一、延时任务适用场景
1. 电商系统中,用户提交订单之后,如果30分钟内没有付款,则取消订单;
2. 业务操作失败后,间隔一定的时间进行失败重试;
3. 滴滴打车完成匿名评价之后,延时推送给司机;
4. ......
二、RabbitMQ延时队列相关概念
死信:关于死信的定义,官方给出的定义如下
1. 消费者拒绝处理并设置requeue=false
2. 消息过期
3. 队列达到最大长度,排在队列前面的消息会被丢弃或者转发到死信路由上
Time-To-Live : 消息在队列中的存活时间,过期的消息会成为死信,该属性可以设置给消息或队列;
Dead Letter Exchanges:死信交换机,死信将会发送给该交换机;
三、工作流程图
四、编码实现
RabbitMQ配置类
import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * 配置类 * @author lijinshan * @date 2018/07/13 */@Configurationpublic class RmqConfig { /** * 任务延时时间、失败重试间隔时间 */ @Value("${delay.task.interval}") private int delayTaskInterval; /** * 延时任务暂存队列名称 */ public final static String DELAY_TASK_TEMP_QUEUE = "bl_delay_task_temp_queue"; /** * 延时MQ交换机 */ public final static String DELAY_TASK_EXCHANGE = "bl_delay_task_exchange"; /** * 延时MQ路由键 */ public final static String DELAY_TASK_ROUTING_KEY = "bl_delay_task_routing_key"; /** * 最终处理延时任务的队列名称 */ public final static String DELAY_TASK_HANDLER_QUEUE = "bl_delay_handler_queue"; /** * 声明延时任务暂存队列 */ @Bean public Queue delayTaskTempQueue() { Mapparams = new HashMap<>(); params.put("x-message-ttl", delayTaskInterval); params.put("x-dead-letter-exchange", DELAY_TASK_EXCHANGE); params.put("x-dead-letter-routing-key", DELAY_TASK_ROUTING_KEY); return new Queue(DELAY_TASK_TEMP_QUEUE, true, false, false, params); } /** * 声明死信交换机 */ @Bean public DirectExchange delayTaskExchange() { return new DirectExchange(DELAY_TASK_EXCHANGE); } /** * 声明最终处理延时任务的队列 */ @Bean public Queue delayTaskHandlerQueue() { return new Queue(DELAY_TASK_HANDLER_QUEUE); } /** * 最终处理延时任务的队列和死信交换机绑定起来 */ @Bean public Binding delayTaskBinding(Queue delayTaskHandlerQueue, DirectExchange delayTaskExchange) { return BindingBuilder.bind(delayTaskHandlerQueue).to(delayTaskExchange).with(DELAY_TASK_ROUTING_KEY); }}
消费者
import com.alibaba.fastjson.JSONObject;import com.kingdee.cloudhub.newscard.proto.DelayTaskDTO;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * 延时任务消费者类 * @author lijinshan * @date 2018/07/13 */@Component@RabbitListener(queues = RmqConfig.DELAY_TASK_HANDLER_QUEUE)public class DelayTaskRmqConsumer { /** * 失败重试次数,如果大于0,表示开启失败重试 */ @Value("${delay.task.retry-times}") private int maxRetryTime; @RabbitHandler public void process(String msg) { try { DelayTaskDTO delayTaskDTO = JSONObject.parseObject(msg, DelayTaskDTO.class); // 如果失败次数大于指定值,则抛弃该任务 if (delayTaskDTO.getHandlerCount() > maxRetryTime) { //记录入库,人工排查 return; } // 如果业务执行失败,重新发回延时队列 delay_task_temp_queue } catch (Exception e) { // 如果出现异常,重新发回延时队列 delay_task_temp_queue } }}
五、启动应用程序,打开RabbitMQ管理页面,查看关键配置
1. 临时队列配信息
2. 死信交换机配置信息
由以上信息可以看出,延时队列的ttl被设置成1800000ms(30分钟),死信交换机为bl_delay_task_exchange,消息在延时队列中过期之后,将会进入死信交换机,由交换机将其路由到真正的消费队列bl_delay_handler_queue。