java使用RabbitMQ实现延迟消息示例
作者:java炒饭小能手
在分布式系统中,消息队列通常用于解耦服务,RabbitMQ是一个广泛使用的消息队列服务。延迟消息(也称为延时队列或TTL消息)是一种常见的场景应用,特别适合处理某些任务在一段时间后执行的需求,如订单超时处理、延时通知等。
本文将以具体代码为例,展示如何使用RabbitMQ来实现延迟消息处理,涵盖队列和交换机的配置、消息的发送与接收以及死信队列的处理。
什么是延迟消息?
延迟消息是指消息在发送到队列后,经过设定的时间延迟再被消费。RabbitMQ 本身没有直接支持延迟队列的功能,但可以通过 TTL(Time To Live)+ 死信队列(Dead Letter Queue, DLQ) 的组合来实现。当消息超过TTL(消息存活时间)后,不会被立即消费,而是会被转发到绑定的死信队列,从而实现延迟处理。
RabbitMQ中的延迟消息原理
在RabbitMQ中,我们可以通过以下几个概念来实现延迟消息:
- TTL(Time To Live):可以为队列设置TTL,消息超过该时间后会被标记为“死信”。
- 死信队列(Dead Letter Queue):当消息在正常队列中过期或处理失败时,RabbitMQ可以将它们路由到一个死信队列,死信队列可以用来处理这些过期或未处理的消息。
- x-dead-letter-exchange 和 x-dead-letter-routing-key:可以通过配置队列的参数,将过期消息发送到一个专门的死信交换器,并根据指定的路由键转发到死信队列。
消息来到ttl.queue消息队列,过期时间内无人消费,消息来到死信交换机hmall.direct,在direct.queue消息队列无需等待。
1. RabbitMQ的配置
首先,我们需要配置两个队列和两个交换机:一个用于存放延时消息,另一个用于处理超时的死信消息。
package com.heima.stroke.configuration; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitConfig { // 延迟时间 单位:毫秒 (这里设为30秒) private static final long DELAY_TIME = 1000 * 30; // 行程超时队列 public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE"; // 行程死信队列 public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE"; // 行程超时队列交换机 public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE"; // 行程死信队列交换机 public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE"; // 行程超时交换机 Routing Key public static final String STROKE_OVER_KEY = "STROKE_OVER_KEY"; // 行程死信交换机 Routing Key public static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY"; /** * 声明行程超时队列,并设置其参数 * x-dead-letter-exchange:绑定的死信交换机 * x-dead-letter-routing-key:死信路由Key * x-message-ttl:消息的过期时间 */ @Bean public Queue strokeOverQueue() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE); args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY); args.put("x-message-ttl", DELAY_TIME); // 设置TTL为30秒 return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build(); } @Bean public DirectExchange strokeOverQueueExchange() { return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE); } @Bean public Binding bindingStrokeOverDirect() { return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY); } }
解释:
TTL设置:我们通过x-message-ttl
设置消息的过期时间为30秒。
死信队列绑定:通过x-dead-letter-exchange
和x-dead-letter-routing-key
设置,当消息过期时,它会被转发到死信交换机,再路由到死信队列。
2. 生产者发送延迟消息
接下来,我们通过生产者向超时队列发送消息,这些消息将在TTL过期后转发到死信队列。
package com.heima.stroke.rabbitmq; import com.alibaba.fastjson.JSON; import com.heima.modules.vo.StrokeVO; import com.heima.stroke.configuration.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MQProducer { private final static Logger logger = LoggerFactory.getLogger(MQProducer.class); @Autowired RabbitTemplate rabbitTemplate; /** * 发送延时消息到行程超时队列 * * @param strokeVO 消息体 */ public void sendOver(StrokeVO strokeVO) { String mqMessage = JSON.toJSONString(strokeVO); logger.info("send timeout msg:{}", mqMessage); rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage); } }
解释:
sendOver
方法将消息发送到超时队列,消息将在超时后进入死信队列。生产者不需要额外处理TTL或死信的配置,只需发送消息即可。
3. 消费者监听死信队列
当消息超过TTL后,将会被转发到死信队列。消费者需要监听死信队列并处理这些消息。
package com.heima.stroke.rabbitmq; import com.alibaba.fastjson.JSON; import com.heima.modules.vo.StrokeVO; import com.heima.stroke.configuration.RabbitConfig; import com.heima.stroke.handler.StrokeHandler; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class MQConsumer { private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class); @Autowired private StrokeHandler strokeHandler; /** * 监听死信队列 * * @param message 消息体 * @param channel RabbitMQ的Channel * @param tag 消息的Delivery Tag */ @RabbitListener( bindings = { @QueueBinding( value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE), key = RabbitConfig.STROKE_DEAD_KEY) }) @RabbitHandler public void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class); logger.info("get dead msg:{}", message.getBody()); if (strokeVO == null) { return; } try { // 处理超时的行程消息 strokeHandler.timeoutHandel(strokeVO); // 手动确认消息 channel.basicAck(tag, false); } catch (Exception e) { e.printStackTrace(); } } }
解释:
@RabbitListener
注解绑定了死信队列的监听器。当消息被转发到死信队列时,该消费者会接收到消息。
使用 channel.basicAck(tag, false)
手动确认消息处理成功,确保消息不会重复消费。
4. 处理超时业务逻辑
在我们的业务中,当消息超时未处理时,将其状态设置为超时。
public void timeoutHandel(StrokeVO strokeVO) { // 获取司机行程ID和乘客行程ID String inviterTripId = strokeVO.getInviterTripId(); String inviteeTripId = strokeVO.getInviteeTripId(); // 检查邀请状态是否为未确认 String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId); String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId); if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) && String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) { // 更新为超时状态 redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode())); redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode())); } }
到此这篇关于java使用RabbitMQ实现延迟消息示例的文章就介绍到这了,更多相关java RabbitMQ延迟消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!