SpringBoot中的多RabbitMQ数据源配置实现
作者:阿劲
简介
在构建复杂的应用程序时,经常需要与多个数据源进行交互。这可能包括连接多个数据库、消息队列或其他数据存储系统。RabbitMQ 是一个流行的消息队列系统,它通过消息队列实现了应用程序之间的松耦合,适用于异步任务处理、解耦、削峰填谷等场景。本篇博客将介绍如何在 Spring Boot 中配置和管理多个 RabbitMQ 数据源,以满足不同的应用需求,并提供示例代码使用
1. 依赖引入
首先,在 pom.xml 文件中添加 RabbitMQ 的 Spring Boot Starter 依赖,以便引入 RabbitMQ 相关的库和功能。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 抽象类
创建一个抽象类 AbstractRabbitConfiguration,其中包含了RabbitMQ的基本配置信息。这些信息包括主机、端口、用户名、密码、虚拟主机、队列名、交换机名、确认机制和消费条数等。这个抽象类的目的是为了让子类继承这些基本配置信息,并根据不同的数据源创建相应的RabbitMQ连接和管理器。
@Data public abstract class AbstractRabbitConfiguration { protected String host; protected Integer port; protected String userName; protected String password; protected String virtualHost; protected String queueName; protected String exchangeName; protected String routingKey; protected String acknowledge = "manual"; protected Integer prefetch = 1; public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setUsername(userName); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); connectionFactory.setPublisherReturns(Boolean.TRUE); return connectionFactory; } }
3. 子类
在抽象类的基础上,我们可以创建多个子类,每个子类对应一个不同的RabbitMQ数据源配置。以一个名为 RabbitConfig 的子类为例,假设它是用于主数据源的配置。
@Configuration @ConfigurationProperties(prefix = "kxj.rabbit") public class RabbitConfig extends AbstractRabbitConfiguration { @Bean("primaryConnectionFactory") @Primary public ConnectionFactory primaryConnectionFactory() { return super.connectionFactory(); } @Bean @Primary public RabbitTemplate rabbitTemplate(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory, @Qualifier("confirmCallback") ConfirmCallback confirmCallback, @Qualifier("returnCallback") ReturnCallback returnCallback) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean(name = "primaryContainerFactory") public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, @Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); // 设置ACK确认机制 factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase())); // 设置消费者消费条数 factory.setPrefetchCount(prefetch); configurer.configure(factory, connectionFactory); return factory; } @Bean(name = "primaryRabbitAdmin") public RabbitAdmin rabbitAdmin(@Qualifier("primaryConnectionFactory") ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); // 声明交换机,队列及对应绑定关系 Queue queue = RabbitmqUtil.createQueue(queueName); FanoutExchange exchange = RabbitmqUtil.createFanoutExchange(exchangeName); Binding binding = RabbitmqUtil.createBinding(queue, exchange, ""); RabbitmqUtil.createRabbitAdmin(queue, exchange, binding, rabbitAdmin); return rabbitAdmin; } }
在子类中,我们使用 @Configuration 注解将它标记为Spring的配置类,并使用 @ConfigurationProperties 注解将以 kxj.rabbit 为前缀的配置属性注入到类中。这使得我们可以在配置文件中为不同的数据源配置不同的RabbitMQ属性。
在子类中,我们定义多个Bean来配置RabbitMQ的连接、管理和消息处理等,以满足不同数据源的需求。在这里创建主数据源的连接工厂,并使用 @Primary 注解将其标记为默认的连接工厂。
除了连接工厂之外,我们还可以配置其他与RabbitMQ相关的Bean,如 RabbitTemplate、RabbitAdmin 以及回调类等。这些Bean可以根据不同数据源的需求进行配置,例如设置消息确认机制、消息返回机制和消息转换器等。
另外,我们在 rabbitTemplate 方法中也进行了一些配置,如设置 mandatory 为 true,设置消息转换器为 Jackson2JsonMessageConverter 等。
4. 配置回调类
在处理消息时,我们通常需要设置确认回调(ConfirmCallback)和返回回调(ReturnCallback)。这些回调类可以用于处理消息的确认和返回情况。
@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("传递消息到交换机成功,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause); } else { log.error("传递消息到交换机失败,correlationData:{}, cause:{}", JSON.toJSONString(correlationData), cause); } } } @Slf4j @Component public class ReturnCallback implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { String msg = new String(message.getBody()); log.error(String.format("消息{%s}不能被正确路由,routingKey为{%s}", msg, routingKey)); } }
5. 配置文件
server.port=8895 kxj.rabbit.host=MQ地址 kxj.rabbit.port=MQ端口 kxj.rabbit.virtualHost=/ kxj.rabbit.userName=guest kxj.rabbit.password=guest kxj.rabbit.queueName=test.queue kxj.rabbit.exchangeName=test.exchange kxj.rabbit.routingKey=test-routing-key
6. 工具类
在 RabbitMQ 的配置过程中,我们需要声明交换机、队列和绑定关系等,这些操作可以通过一个工具类 RabbitmqUtil 来实现。
public class RabbitmqUtil { public static DirectExchange createDirectExchange(String exchangeName) { if (StringUtils.isNotBlank(exchangeName)) { return new DirectExchange(exchangeName, true, false); } return null; } public static TopicExchange createTopicExchange(String exchangeName) { if (StringUtils.isNotBlank(exchangeName)) { return new TopicExchange(exchangeName, true, false); } return null; } public static FanoutExchange createFanoutExchange(String exchangeName) { if (StringUtils.isNotBlank(exchangeName)) { return new FanoutExchange(exchangeName, true, false); } return null; } public static Queue createQueue(String queueName) { if (StringUtils.isNotBlank(queueName)) { return new Queue(queueName, true); } return null; } public static Binding createBinding(Queue queueName, Exchange exchangeName, String routingKeyName) { if (Objects.nonNull(queueName) && Objects.nonNull(exchangeName)) { return BindingBuilder.bind(queueName).to(exchangeName).with(routingKeyName).noargs(); } return null; } // public static void createRabbitAdmin(Queue queue, DirectExchange exchange, Binding binding, RabbitAdmin rabbitAdmin) { // rabbitAdmin.declareQueue(queue); // rabbitAdmin.declareExchange(exchange); // rabbitAdmin.declareBinding(binding); // } public static void createRabbitAdmin(Queue queue, Exchange exchange, Binding binding, RabbitAdmin rabbitAdmin) { if (queue != null) { rabbitAdmin.declareQueue(queue); } if (exchange != null) { rabbitAdmin.declareExchange(exchange); } if (binding != null) { rabbitAdmin.declareBinding(binding); } } }
7. 测试用例
我们可以编写一些测试用例来验证以上配置是否正确。下面是一个发送消息到主数据源的示例:
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqTest { @Autowired @Qualifier("primaryRabbitAdmin") private RabbitAdmin primaryRabbitAdmin; @Autowired @Qualifier("primaryContainerFactory") private SimpleRabbitListenerContainerFactory primaryContainerFactory; @Autowired @Qualifier("primaryConnectionFactory") private ConnectionFactory primaryConnectionFactory; @Autowired private RabbitTemplate primaryRabbitTemplate; @Test public void testSend() { String message = "Hello, World!"; primaryRabbitTemplate.convertAndSend("test.exchange", "test.routingKey", message); String receivedMessage = (String) primaryRabbitTemplate.receiveAndConvert("test.queue"); assertEquals(message, receivedMessage); } }
在上面的测试用例中,我们使用了 @Qualifier 注解来指定主数据源的 Bean,然后通过 RabbitTemplate 发送消息到 test.exchange,并在队列 test.queue 中接收到消息。我们可以通过断言来判断发送和接收的消息是否一致,以此验证配置是否正确。
总结
通过使用抽象类和子类的方式,我们可以轻松地配置和管理多个RabbitMQ数据源,每个数据源可以有不同的属性配置。这种方法使得我们的应用程序更具灵活性,能够与多个RabbitMQ实例交互,满足不同数据源的需求。同时,回调类的使用也可以帮助我们处理消息的确认和返回情况,确保消息的可靠性传递。
到此这篇关于SpringBoot中的多RabbitMQ数据源配置实现的文章就介绍到这了,更多相关SpringBoot 多RabbitMQ数据源配置内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!