RabbitMQ 快速入门-消息的收发
创始人
2024-02-28 21:25:17
0

RabbitMQ 快速入门-消息的收发

  • 准备工作
  • 一、Connection 方式
    • 1. 生产者测试类
    • 2. 消费者测试类
    • 注意
  • 二、RabbitTemplate 方式
    • 1. 生产者测试类
    • 2. 创建队列
    • 3. 消费者
    • 注意

准备工作

推荐创建两个 SpringBoot 项目,一个作为生产者,另一个作为消费者

也可使用 Maven 的继承聚合模式管理两个项目

项目中需要引入下面的依赖

		org.springframework.bootspring-boot-starter-amqp

为便于运行,在测试类中编写代码对消息队列进行操作

一、Connection 方式

RabbitMQ 中有几种概念,分别是:虚拟主机(virtualHost),通道(channel),队列(queue),还有一个交换机(exchanges)的概念在之后会遇到

服务先与虚拟主机建立连接,然后创建通道,声明或创建队列之后发送或接收消息,消息最终会在队列中传输

下面使用 connection 的方式来实现接发消息,以便于理解 RabbitMQ 的模式(不常用到,了解即可)

1. 生产者测试类

@SpringBootTest
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.0.102");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道 ChannelChannel channel = connection.createChannel();// 3. 声明队列(不存在则创建)String queueName = "simple.queue";	// 队列名channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:" + message);// 5.关闭通道和连接channel.close();connection.close();}
}

2. 消费者测试类

@SpringBootTest
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1. 建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.0.102");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("root");factory.setPassword("123456");// 1.2 建立连接Connection connection = factory.newConnection();// 2. 创建通道 ChannelChannel channel = connection.createChannel();// 3. 声明队列(不存在则创建)String queueName = "simple.queue";	// 队列名channel.queueDeclare(queueName, false, false, false, null);// 4. 获取消息channel.basicConsume(queueName, true, new DefaultConsumer(channel) {@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) {// 5.处理消息String message = new String(body);System.out.println("接收到消息:" + message);}});System.out.println("等待接收消息......");}
}

我们可以在 RabbitMQ 的管理页面查看相关信息:

在这里插入图片描述

注意

  1. 消费者接收消息是异步过程,而不会阻塞主线程
  2. queueDeclare 在不存在该队列时会创建队列,否则不创建
  3. 队列不会被自动删除,可以在管理页删除(点击队列名称,点击 Delete 选项)
  4. 消息只会被读取一次,未被读取的消息存放在队列中等待被消费
  5. 上例消费者没有关闭通道和连接的操作,不会只读取一条消息,而是一直等待不停读取
  6. RabbitMQ 重启后,队列因未持久化被删除,将 queueDeclare 第二个参数改为 true 以创建持久化队列(已存在的队列不可更改)
  7. RabbitMQ 重启后,虽有队列但消息没了,因为消息未持久化,发送消息时将 basicPublish 方法第三个参数改为 MessageProperties.PERSISTENT_TEXT_PLAIN 以持久化消息

持久化的队列在 Features 栏会有字母 D 标示,如图:

在这里插入图片描述
有持久化的消息可以看到 Properties 信息,未持久化则没有,如图:

在这里插入图片描述

二、RabbitTemplate 方式

上面的例子可以看出,大多数代码是重复的,所以 SpringAMQP 中封装了 RabbitTemplate 以便于进行消息队列的操作

首先在项目 yaml 配置文件中假如 RabbitMQ 的连接相关配置

spring:rabbitmq:host: 192.168.0.102	# RabbitMQ 服务 ip 地址port: 5672			# 消息服务端口username: root		# 用户名password: "123456"	# 密码virtual-host: /		#虚拟主机

然后就能自动装配 RabbitTemplate 类了

1. 生产者测试类

@RunWith(SpringRunner.class)
@SpringBootTest()
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message   = "Hello, springAMQP!";rabbitTemplate.convertAndSend(queueName, message);}
}

只需调用 convertAndSend 方法即可发送消息

注意:此操作不会创建队列,如果队列不存在则没有效果

2. 创建队列

若要创建队列,需要声明一个 Queue 类型的 bean 并受到 Spring 的管理

通常放在一个 Configuration 配置类中,示例如下:

@Configuration
public class RabbitMqConfig {@Beanpublic Queue simpleQueue() {return new Queue("simple.queue");	// 队列名与函数名无关}
}

如此启动项目时,bean 被创建,就会创建一个队列(若已存在则不再创建)

3. 消费者

消费者不再在测试类中演示,而是使用监听队列的方式

只需在一个方法上注解 @RabbitListener,并指定队列名
同时方法所在的类也要被 Spring 管理(注解 @Component)

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String message) {System.out.printf("消费者接收到 simple.queue 的消息:【 %s 】\n", message);}}

启动项目即可监听队列并处理接收到的消息

注意:如果监听的队列名不存在,则会报错Failed to declare queue(s):[simple.queue],解决方法同前面的配置里创建队列

注意

  1. 此方式创建的队列默认持久化
  2. 此方式生产的消息默认持久化

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...