RabbitMq(具体怎么用,看这一篇即可)
创始人
2024-05-28 09:14:16
0

RabbitMq汇总

  • 1.RabbitMq的传统实现方式
  • 2.SpringAMQP简化RabbitMq开发
    • 2.1 基本消息队列(BasicQueue)
    • 2.2 工作消息队列(WorkQueue)
    • 2.3 发布订阅 -- 广播(Fanout)
    • 2.4 发布订阅 -- 路由(Direct)
    • 2.5 发布订阅 -- 主题(Topic)
  • 2.SpringAMQP声明交换机和队列
    • 2.1 使用bean的方式声明交换机和队列
    • 2.2 使用注解的方式声明交换机和队列

1.RabbitMq的传统实现方式

动手实现一个简单的消息队列
在这里插入图片描述

无论时发布消息还是消费消息,都要建立连接, 所以我们可以将这个步骤抽取出来

public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {// 定义连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置服务地址factory.setHost("192.168.202.128");// 端口factory.setPort(5672);// 设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}
}

一、发布消息

  1. 建立连接
  2. 创建通道
  3. 创建队列
  4. 发送消息
  5. 关闭通道和连接
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接Connection connection = ConnectionUtil.getConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

二、订阅消息

  1. 建立连接
  2. 创建通道
  3. 创建队列
  4. 订阅消息
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接Connection connection = ConnectionUtil.getConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

2.SpringAMQP简化RabbitMq开发

一、引入依赖


org.springframework.bootspring-boot-starter-amqp

二、在发布者消费者两端,都要配置MQ地址
SpringAMQP提供了配置来简化手动创建连接这一复杂的过程

spring:rabbitmq:host: 192.168.202.128 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码

三、简化发送消息
SpringAMQP提供了RabbitTemplate类来简化发送消息的步骤

@Autowired
private RabbitTemplate rabbitTemplate;

四、简化订阅消息
SpringAMQP提供了@RabbitListener注解来简化订阅消息的步骤

@RabbitListener(queues = "simple.queue")
public void listenMessage(String msg) throws InterruptedException {}

2.1 基本消息队列(BasicQueue)

最基本的队列模型:一个生产者发送消息到一个队列,一个消费者从队列中取消息

在这里插入图片描述

实际开发中,我们通常事先在rabbitMq界面创建好队列,然后只要记住队列的名称

一、发布消息
  注入RabbitTemplate来简化操作, RabbitTemplate在执行convertAndSend方法时,会自动开启通道, 往指定名称的队列中发送消息, 并在方法结束后关闭连接和通道

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

二、订阅消息
使用@RabbitListener注解实现对队列的订阅

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

总结:
基本消息队列模型中,在生产者和消费者之间,只有队列这一个媒介

  • 生产者只要知道往哪个队列发送消息
  • 消费者只要知道订阅哪个队列中的消息

2.2 工作消息队列(WorkQueue)

在基本消息队列(BasicQueue)中,我们只有一个消费者, 在工作消息队列模型下,我们可以设置多个消费者同时订阅一个队列
在这里插入图片描述
一、发布消息
  实现和基本消息队列时是一样的,只要知道往哪个队列发送消息, 这里我们演示发送多条信息

@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 20; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

二、订阅消息

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

总结:
工作消息队列模型中,在生产者和消费者之间,只有队列这一个媒介(跟基本模型一样)

  • 生产者只要知道往哪个队列发送消息
  • 消费者只要知道订阅哪个队列中的消息(同一个队列)

2.3 发布订阅 – 广播(Fanout)

广播模式下,引入了一个新的概念:交换机

  交换机是一个消息的中转站, 它可以实现广播、定向、通配符等不同形式的消息递交方式; 交换机只负责递交消息, 并不具备存储消息的能力, 消息最终存储媒介, 依旧是队列, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

在这里插入图片描述

一、发布消息
  现在就不是往队列中发消息了, 发送者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列

@Test
public void testFanoutExchange() {// 队列名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

  如果你想了解交换机会往哪些队列中发送消息,可以登录rabbitMq的界面,查看交换机详情, 里面会详细罗列当前交换机绑定的队列
在这里插入图片描述

二、订阅消息
消息的最终存储媒介,依旧是队列.消费者只要知道订阅哪个队列中的消息

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

总结:

  • 生产者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列
  • 消费者只要知道订阅哪个队列中的消息

2.4 发布订阅 – 路由(Direct)

  之前我们学习了广播(Fanout), 在广播模式下, 只要往交换机发送消息, 那么交换机会将消息转发给所有绑定的队列, 而路由(Direct)又称为定向
  交换机绑定了A、B两个队列,我们往交换机中发消息时, 最终希望交换机只把消息转发给B队列,这个过程即路由
在这里插入图片描述

一、发布消息
往交换机发消息的时候,需要指定交换机转发给哪个队列(拥有通用routingKey的队列), 此处我们设置的routingKey为red

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

我们发现这个交换机绑定了两个队列, 每个队列都设置了两个routingKey, 其中都有red, 所以这两个队列都能收到消息
在这里插入图片描述

二、订阅消息

@RabbitListener(queues =  "direct.queue1")
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues =  "direct.queue2")
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

总结:

  • 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
  • 消费者只要知道订阅哪个队列中的消息(一直都没变过)

2.5 发布订阅 – 主题(Topic)

  主题(Topic)是对路由(Direct)的一种补充, 我们希望某一个组的队列都能收到消息, 但是在rabbitMq中无法将队列编组, 有了主题(Topic)后,我们可以将这一组的队列的routingKey都设置为china.#, 那只会Routingkey只要符合通配符规则, 例如china.jiangsuchina.jiangsu.suzhou 这个组都可以接收到消息
在这里插入图片描述

  • #:匹配一个或多个词
    item.#:能够匹配item.spu.insert 或者 item.spu
  • *:匹配不多不少恰好1个词
    item.*:只能匹配item.spu

一、发布消息
跟路由时一样, 交换机拿到routingKey后会去匹配对应的队列

@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

当下判断两个队列都符合routingKey
在这里插入图片描述

二、订阅消息

@RabbitListener(queues =  "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues =  "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

总结:

  • 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
  • 交换机会根据routingKey来匹配符合条件的队列(这个过程是交换机来完成,所以我们不用关心)
  • 消费者只要知道订阅哪个队列中的消息(一直都没变过)

2.SpringAMQP声明交换机和队列

2.1 使用bean的方式声明交换机和队列

启动项目后, 就会在rabbitMq中创建交换机和队列, 包括两者的绑定关系

package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

2.2 使用注解的方式声明交换机和队列

Direct定向
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

Topic主题

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...