RMQ延迟队列
创始人
2024-05-02 02:54:46
0

目录

  • 一、场景
  • 二、TTL和DLX
  • 三、开发步骤

一、场景

“订单下单成功后,15分钟未支付自动取消”
1.传统处理超时订单
采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,
并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,
即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,
然后再做其他的业务操作

2.rabbitMQ延时队列方案
一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,
并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失

二、TTL和DLX

rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换机(DLX)和设置过期时间(TTL)结合起来实现延迟队列

1.TTL
TTL是Time To Live的缩写, 也就是生存时间。
RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。
如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。
默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。

 设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。消息:生产者 -> 交换机 消息在生产者制造消息的时候就开始计算了TTL  TTL=5队列:生产者 -> 交换机 -> 路由键 -> 队列 当消息送达到队列的时候才开始计算TTL  TTL=10

2.DLX和死信队列
DLX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

 死信队列是指队列(正常)上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上,这个队列,就是死信队列成为死信一般有以下几种情况:消息被拒绝(basic.reject or basic.nack)且带requeue=false参数消息的TTL-存活时间已经过期队列长度限制被超越(队列满)注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 ,注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键
  1. 延迟队列
    通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费

在这里插入图片描述

三、开发步骤

编写配置类

package com.xlb.rabbitmqprovider.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitDLXConfig {public final static String NORMAL_QUEUE = "normal_queue";public final static String NORMAL_ROUTING_KEY = "normal_routing_key";public final static String NORMAL_EXCHANGE = "normal_exchange";public final static String DELAY_QUEUE = "delay_queue";public final static String DELAY_ROUTING_KEY = "delay_routing_key";public final static String DELAY_EXCHANGE = "delay_exchange";//    普通交换机以及普通队列@Beanpublic Queue normalQueue(){Map map = new HashMap();map.put("x-message-ttl", 20000);//message在该队列queue的存活时间最大为10秒map.put("x-dead-letter-exchange", DELAY_EXCHANGE); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)map.put("x-dead-letter-routing-key", DELAY_ROUTING_KEY);//x-dead-letter-routing-key参数是给这个DLX指定路由键return new Queue(NORMAL_QUEUE, true, false, false, map);}@Beanpublic DirectExchange normalExchange(){return new DirectExchange(NORMAL_EXCHANGE, true, false);}@Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(NORMAL_ROUTING_KEY);}//    死信交换机及延迟队列@Beanpublic Queue delayQueue(){return new Queue(DELAY_QUEUE);}@Beanpublic DirectExchange delayExchange(){return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding(){return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);}}

启动测试
在这里插入图片描述
在这里插入图片描述

编写消费者

package com.xlb.rabbitmqconsumer.conf;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@RabbitListener(queues = {"delay_queue"})
public class DLXReceiver {@RabbitHandlerpublic void handler(Map msg){System.out.println("通过订单编号,查询数据库,修改订单状态为已支付或者待支付,如果为待支付,那么库存要重新加回去");System.out.println(msg);}
}

在这里插入图片描述

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...