一、延迟消息适应场景

一般延迟队列用于特定事件发生后隔一段时间需要做特定处理的场景,下面举几个常见的栗子

1.电商系统中,若用户下单后30min不支付,自动取消订单
2.用户登录APP浏览特定商品20min后还没下单,自动推送商品评测信息的消息并发放商品相关优惠券


二、rabbitMQ的延迟消息

Rabbitmq本身是没有延迟队列的,要实现延迟消息,一般有两种方式:

1.通过Rabbitmq本身队列的特性来实现,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)
2.在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上

一个个来看

三、rabbitMQ不使用插件实现延迟消息

3.1 原理图

image.png

若想不借助插件实现rabbitMQ的延迟消息,实际就是利用一个没有消费者的Queue1,等待消息过期后,通过交换机转发到Queue2来进行消费,消息的延迟时间就是消息在Queue1中的存活时间

3.2 不使用插件实现延迟消息的局限性

可以看到如果不使用插件,延迟消息的延迟时间是依赖于Queue1的x-message-ttl的,也就是说,需要支持多少种延迟的时间,就得提前设置好多少个无消费类的Queue,而且由于转发绑定的Queue2需要配到交换机中,比较死板,而真实的业务中消费类肯定是不一样的,故我真正在实现的时候在发到Queue1之前把消息体及需要消费的Queue(假设名称为Queue3)进行了落库,Queue2中进行消费时,实际是查库把消息读出来然后发送到了Queue3

而如果使用插件就没有以上的局限

四、rabbitMQ使用插件实现延迟消息

4.1 插件简介

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

4.2 插件使用方式

4.2.1 安装

进入插件安装目录

{rabbitmq-server}/plugins/(可以查看一下当前已存在的插件)

下载插件

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下载的文件名称不规则就手动重命名一下如:rabbitmq_delayed_message_exchange-0.0.1.ez)

4.2.2 启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(关闭插件)

rabbitmq-plugins disable rabbitmq_delayed_message_exchange


4.2.3 插件使用

在需要发送延迟消息队列的项目中,声明一个x-delayed-message类型的交换机来使用delayed-messaging特性,注意这个交换机并不是rabbitmq本身的,而是插件提供的,一定要是x-delayed-message类型,绑定的queue就是正常的queue即可,不需要额外多余的queue(这是和不用插件方式的最大区别及好处)

消息发送时,在header添加"x-delay"参数来控制消息的延时时间,如果使用的是spring-rabbit中的RabbitTemplate,只需要通过messageProperties.setDelay(delay)方法set上延迟时间即可(单位为毫秒)