java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Springboot rabbitmq死信队列

Springboot结合rabbitmq实现的死信队列

作者:西安未央

为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,本文主要介绍了Springboot结合rabbitmq实现的死信队列,具有一定的参考价值,感兴趣的可以了解一下

概述

RabbitMQ是流行的开源消息队列系统,使用erlang语言开发。为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。但由于对死信队列的概念及配置不熟悉,导致曾一度陷入百度的汪洋大海,无法自拔,很多文章都看起来可行,但是实际上却并不能帮我解决实际问题。最终,在官网文档中找到了我想要的答案,通过官网文档的学习,才发现对于死信队列存在一些误解,导致配置死信队列之路困难重重。

详细

一、运行效果

image.png

image.png

二、实现过程

①、先创建一个Springboot项目。然后在pom文件中添加  spring-boot-starter-amqp  和  spring-boot-starter-web 的依赖,接下来创建一个Config类,这里是关键:

package com.zyf.rabbitmqdeadletterdemo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.dead.letter.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.dead.letter.business.queueA";
    public static final String BUSINESS_QUEUEB_NAME = "rabbitmq.dead.letter.business.queueB";
    public static final String DEAD_LETTER_EXCHANGE = "rabbitmq.dead.letter.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueA.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "rabbitmq.dead.letter.deadletter.queueB.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "rabbitmq.dead.letter.deadletter.queueA";
    public static final String DEAD_LETTER_QUEUEB_NAME = "rabbitmq.dead.letter.deadletter.queueB";
    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }
    // 声明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }
    // 声明业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }
    // 声明业务队列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }
    // 声明死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }
    // 声明死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }
    // 声明业务队列A绑定关系
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
    // 声明业务队列B绑定关系
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }
    // 声明死信队列A绑定关系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }
    // 声明死信队列B绑定关系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

②、接下来,是业务队列的消费代码:

@Slf4j@Componentpublic class BusinessMessageReceiver {    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息A:{}", msg);        boolean ack = true;
        Exception exception = null;        try {            if (msg.contains("deadletter")){                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }        if (!ack){
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到业务消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

③、然后配置死信队列的消费者:

@Componentpublic class DeadLetterMessageReceiver {    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

④、为了方便测试,写一个简单的消息生产者,并通过controller层来生产消息。

@Componentpublic class BusinessMessageSender {    @Autowired
    private RabbitTemplate rabbitTemplate;    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
@RequestMapping("rabbitmq")@RestControllerpublic class RabbitMQMsgController {    @Autowired
    private BusinessMessageSender sender;    @RequestMapping("sendmsg")
    public void sendMsg(String msg){
        sender.sendMsg(msg);
    }
}

三、项目结构图

image.png

四、补充总结

死信队列其实并没有什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

总结一下死信消息的生命周期:

死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,当你明白了这些之后,这些Exchange和Queue想怎样配合就能怎么配合。比如从死信队列拉取消息,然后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。

到此这篇关于Springboot结合rabbitmq实现的死信队列的文章就介绍到这了,更多相关Springboot rabbitmq死信队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文