7. TTL 延迟队列
创始人
2024-03-16 15:24:01
0

二八佳人体似酥,腰间仗剑斩愚夫。虽然不见人头落,暗里教君骨髓枯。

创建两个队列 QA和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X和死信交
换机 Y,它们的类型都是 direct,创建一个死信队列QD,它们的绑定关系如下:

image.png

死信队列

ttl 配置 TtlConfig

package com.yjl.amqp.config.ttl;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** 列信队列配置** @author yuejianli* @date 2022-11-23*/
@Component
public class TtlConfig {@Value("${rabbit.ttl.queue_a}")private String queuea;@Value("${rabbit.ttl.queue_b}")private String queueb;@Value("${rabbit.ttl.x_exchange}")private String xexchange;@Value("${rabbit.ttl.y_dead_queue_d}")private String deadQueued;@Value("${rabbit.ttl.y_dead_exchange}")private String ydeadExchange;@Value("${rabbit.ttl.queue_c}")private String queuec;// 定义队列和 交换机,并进行绑定@Bean(value = "direct_exchange")DirectExchange directExchange() {return new DirectExchange(xexchange);}@Bean(value = "direct_queuea")public Queue queuea() {//return new Queue(queuea);Map args = new HashMap<>();args.put("x-dead-letter-exchange", ydeadExchange);args.put("x-dead-letter-routing-key", "YD");// 声明 ttl  10sargs.put("x-message-ttl", 10000);return QueueBuilder.durable(queuea).withArguments(args).build();}//进行绑定@BeanBinding bindingDirectExchangea(@Qualifier("direct_queuea") Queue queue,@Qualifier("direct_exchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("XA");}@Bean(value = "direct_queueb")public Queue queueb() {// return new Queue(queueb);Map args = new HashMap<>();args.put("x-dead-letter-exchange", ydeadExchange);args.put("x-dead-letter-routing-key", "YD");// 声明 ttl  40sargs.put("x-message-ttl", 40000);return QueueBuilder.durable(queueb).withArguments(args).build();}@BeanBinding bindingDirectExchangeb(@Qualifier("direct_queueb") Queue queue,@Qualifier("direct_exchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("XB");}@Bean(value = "direct_dead_exchange")DirectExchange directDeadExchange() {return new DirectExchange(ydeadExchange);}@Bean(value = "direct_dead_queued")public Queue deadQueued() {return new Queue(deadQueued);}// 进行绑定@BeanBinding bindingDeadDirectExchange(@Qualifier("direct_dead_queued") Queue queue,@Qualifier("direct_dead_exchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("YD");}
}

死信消费者 TtlConsumer

@Component
@Slf4j
public class TtlConsumer {/*** 死信队列*/@RabbitListener(queues = {"${rabbit.ttl.y_dead_queue_d}"})public void deadQueueListener(String message) {log.info("死信队列获取消息:" + message);// 执行具体的业务操作}
}

生产者 TtlProduer

@RestController
@Slf4j
public class TtlProduer {@Value("${rabbit.ttl.x_exchange}")private String xexchange;@Value("${rabbit.ttl.delayed_exchange}")private String deadExchange;@Value("${rabbit.ttl.delayed_routing_key}")private String deadRoutingKey;@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMessage/{message}")public String sendMessage(@PathVariable("message") String message) {log.info("接收到消息:{}", message);// 发送到 A 队列rabbitTemplate.convertAndSend(xexchange, "XA", "消息来自ttl 10s 的队列消息:" + message);//发送到B队列rabbitTemplate.convertAndSend(xexchange, "XB", "消息来自ttl 40s 的队列消息:" + message);return message;}
}

验证

输入网址, 发送消息

http://localhost:8088/Server/sendMessage/hello,RabbitMQ

image.png

死信队列 A 10s 接到消息, 队列B 40s 后接收到消息。

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,
然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S
两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

延时队列优化

在这里新增了一个队列QC,绑定关系如下,该队列不设置 TTL 时间

image.png

ttl 配置QC 队列 TtlConfig

    // 声明一个队列 C, 没有过期时间的。@Bean(value = "direct_queuec")public Queue queuec() {// return new Queue(queueb);Map args = new HashMap<>();args.put("x-dead-letter-exchange", ydeadExchange);args.put("x-dead-letter-routing-key", "YD");return QueueBuilder.durable(queuec).withArguments(args).build();}@BeanBinding bindingDirectExchangec(@Qualifier("direct_queuec") Queue queue,@Qualifier("direct_exchange") DirectExchange directExchange) {return BindingBuilder.bind(queue).to(directExchange).with("XC");}

死信消费者 TtlConsumer 保持不变

生产者 TtlProducer

时间由前端传入, 设置时间往 C 队列里面放置。

    @GetMapping("/sendMessage/{message}/{ttl}")public String sendMessageWithTtl(@PathVariable("message") String message, @PathVariable("ttl") String ttl) {log.info("接收到消息:{}, 发送的时长 是{} ms", message, ttl);//
//        // 发送到 A 队列
//        rabbitTemplate.convertAndSend(xexchange,"XA","消息来自ttl 10s 的队列消息:"+message);
//        //发送到B队列
//        rabbitTemplate.convertAndSend(xexchange,"XB","消息来自ttl 40s 的队列消息:"+message);// 发送给 C 队列rabbitTemplate.convertAndSend(xexchange, "XC", message, correlationData -> {correlationData.getMessageProperties().setExpiration(ttl);return correlationData;});return message;}

验证

发送消息,携带着时间:

http://localhost:8088/Server/sendMessage/sendA5s/5000

http://localhost:8088/Server/sendMessage/sendB20s/20000

image.png

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消
息可能并不会按时“死亡“,因为 RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

如,先执行发送 20s 的,再执行发送 5s的

http://localhost:8088/Server/sendMessage/sendB20s/20000
http://localhost:8088/Server/sendMessage/sendA5s/5000

image.png

消息消费时有顺序了。

Rabbitmq 插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间
及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题

在官网上下载 https://www.rabbitmq.com/community-plugins.html,
下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ的插件目录。
进入 RabbitMQ的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image.png

我本地可能是由于版本的原因,一直不成功

D:\job\RabbitMQ Server\rabbitmq_server-3.10.11\sbin>rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange

image.png

image.png

虽然有这个 x-delayed-message

架构优化

image.png

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并
不会立即投递到目标队列中,而是存储在mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中

配置 DelayedQueueConfig

package com.yjl.amqp.config.ttl;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;/*** 延迟队列处理** @author yuejianli* @date 2022-11-23*/
@Component
public class DelayedQueueConfig {@Value("${rabbit.ttl.delayed_queue}")private String deadQueued;@Value("${rabbit.ttl.delayed_exchange}")private String deadExchange;@Value("${rabbit.ttl.delayed_routing_key}")private String deadRoutingKey;// 自定义交换机, 定义延迟交换机@Bean("delayedExchange")public CustomExchange delayedExchange() {Map args = new HashMap<>();// 自定义交换机的类型args.put("x-delayed-type", "direct");return new CustomExchange(deadExchange, "x-delayed-message", true, false, args);}@Bean("delayedQueue")public Queue delayedQueue() {return new Queue(deadQueued);}//进行绑定@BeanBinding bindingDelayedExchange(@Qualifier("delayedQueue") Queue queue,@Qualifier("delayedExchange") CustomExchange customExchange) {return BindingBuilder.bind(queue).to(customExchange).with(deadRoutingKey).noargs();}
}

延迟消费者扩展 TtlConsumer

    /*** 延迟队列*/@RabbitListener(queues = {"${rabbit.ttl.delayed_queue}"})public void delayedQueueListener(String message) {log.info("延迟队列获取消息:" + message);// 执行具体的业务操作}

生产者 TtlProducer

 @GetMapping("/sendDelayMessage/{message}/{ttl}")public String sendDelayMessageWithTtl(@PathVariable("message") String message, @PathVariable("ttl") String ttl) {log.info("接收到消息:{}, 发送的时长 是{} ms", message, ttl);// 发送给 C 队列rabbitTemplate.convertAndSend(deadExchange, deadRoutingKey, message, correlationData -> {correlationData.getMessageProperties().setExpiration(ttl);return correlationData;});return message;}

验证

http://localhost:8088/Server/sendDelayMessage/sendB20s/20000
http://localhost:8088/Server/sendDelayMessage/sendA5s/5000

我本地是不行, 有发现问题的,可以告诉我。
image.png

想要的效果是这样:

image.png

第二个消息被先消费掉了,符合预期

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ来实现延时队列可以很好的利用
RabbitMQ的特性,如:
消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。
另外,通过 RabbitMQ集群的特性,可以很好的解决单点故障问题,
不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,
比如利用 Java 的DelayQueue,利用 Redis 的 zset,
利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...