博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ实现延时任务
阅读量:6856 次
发布时间:2019-06-26

本文共 3654 字,大约阅读时间需要 12 分钟。

hot3.png

一、延时任务适用场景

    1. 电商系统中,用户提交订单之后,如果30分钟内没有付款,则取消订单;

    2. 业务操作失败后,间隔一定的时间进行失败重试;

    3. 滴滴打车完成匿名评价之后,延时推送给司机;

    4. ......

二、RabbitMQ延时队列相关概念

       死信:关于死信的定义,官方给出的定义如下

            1. 消费者拒绝处理并设置requeue=false

            2. 消息过期

            3. 队列达到最大长度,排在队列前面的消息会被丢弃或者转发到死信路由上

       Time-To-Live :  消息在队列中的存活时间,过期的消息会成为死信,该属性可以设置给消息或队列;

       Dead Letter Exchanges:死信交换机,死信将会发送给该交换机;

三、工作流程图

四、编码实现

RabbitMQ配置类

import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * 配置类 * @author lijinshan * @date 2018/07/13 */@Configurationpublic class RmqConfig {    /**     * 任务延时时间、失败重试间隔时间     */    @Value("${delay.task.interval}")    private int delayTaskInterval;    /**     * 延时任务暂存队列名称     */    public final static String DELAY_TASK_TEMP_QUEUE = "bl_delay_task_temp_queue";    /**     * 延时MQ交换机     */    public final static String DELAY_TASK_EXCHANGE = "bl_delay_task_exchange";    /**     * 延时MQ路由键     */    public final static String DELAY_TASK_ROUTING_KEY = "bl_delay_task_routing_key";    /**     * 最终处理延时任务的队列名称     */    public final static String DELAY_TASK_HANDLER_QUEUE = "bl_delay_handler_queue";    /**     * 声明延时任务暂存队列     */    @Bean    public Queue delayTaskTempQueue() {        Map
params = new HashMap<>(); params.put("x-message-ttl", delayTaskInterval); params.put("x-dead-letter-exchange", DELAY_TASK_EXCHANGE); params.put("x-dead-letter-routing-key", DELAY_TASK_ROUTING_KEY); return new Queue(DELAY_TASK_TEMP_QUEUE, true, false, false, params); } /** * 声明死信交换机 */ @Bean public DirectExchange delayTaskExchange() { return new DirectExchange(DELAY_TASK_EXCHANGE); } /** * 声明最终处理延时任务的队列 */ @Bean public Queue delayTaskHandlerQueue() { return new Queue(DELAY_TASK_HANDLER_QUEUE); } /** * 最终处理延时任务的队列和死信交换机绑定起来 */ @Bean public Binding delayTaskBinding(Queue delayTaskHandlerQueue, DirectExchange delayTaskExchange) { return BindingBuilder.bind(delayTaskHandlerQueue).to(delayTaskExchange).with(DELAY_TASK_ROUTING_KEY); }}

消费者

import com.alibaba.fastjson.JSONObject;import com.kingdee.cloudhub.newscard.proto.DelayTaskDTO;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * 延时任务消费者类 * @author lijinshan * @date 2018/07/13 */@Component@RabbitListener(queues = RmqConfig.DELAY_TASK_HANDLER_QUEUE)public class DelayTaskRmqConsumer {    /**     * 失败重试次数,如果大于0,表示开启失败重试     */    @Value("${delay.task.retry-times}")    private int maxRetryTime;    @RabbitHandler    public void process(String msg) {        try {            DelayTaskDTO delayTaskDTO = JSONObject.parseObject(msg, DelayTaskDTO.class);            // 如果失败次数大于指定值,则抛弃该任务            if (delayTaskDTO.getHandlerCount() > maxRetryTime) {                //记录入库,人工排查                return;            }            // 如果业务执行失败,重新发回延时队列 delay_task_temp_queue        } catch (Exception e) {            // 如果出现异常,重新发回延时队列 delay_task_temp_queue        }    }}

五、启动应用程序,打开RabbitMQ管理页面,查看关键配置

    1. 临时队列配信息

2. 死信交换机配置信息

    由以上信息可以看出,延时队列的ttl被设置成1800000ms(30分钟),死信交换机为bl_delay_task_exchange,消息在延时队列中过期之后,将会进入死信交换机,由交换机将其路由到真正的消费队列bl_delay_handler_queue。

转载于:https://my.oschina.net/lyyjason/blog/1846173

你可能感兴趣的文章
-webkit-overflow-scrolling
查看>>
钉钉开发系列(十一)钉钉网页扫码登录
查看>>
什么是ERP
查看>>
linux ./configure 的参数详解
查看>>
Github 上 Star 最多的个人 Spring Boot 开源学习项目
查看>>
企业级大数据平台构建
查看>>
0302作业.
查看>>
关于:target与定位动画的奇怪现象
查看>>
linq
查看>>
css设置height 100%
查看>>
数据结构与算法基本学习笔记(5)
查看>>
【2-SAT】【DFS】【分类讨论】Gym - 101617K - Unsatisfying
查看>>
Eclipse+Tomcat+Ant 小记
查看>>
[转载]ubuntu防火墙设置
查看>>
poj3080
查看>>
java-注释、API之字符串(String)
查看>>
jQuery函数attr()和prop()的区别
查看>>
mysql 查询
查看>>
SAS9.4安装
查看>>
UIPageViewController-浅析
查看>>