SpringCloud Stream 整合RabbitMQ的基本步骤
作者:毛毛的猫毛
本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤:
- 引入SpringCloud
- 引入SpringCloud Stream相关依赖
- 定义绑定接口: 消息生产者(Output…Binding) 、消息消费者(Input…Binding)
- @EnableBinding 在对应类上进行定义
- @StreamListener 在对应方法上创建监听用来消费消息
- 调用output的send()方法生产消息
一、项目介绍
演示SpringCloud Stream 整合RabbitMQ,项目可以在一个工程里完成,本次建立了一个工程mq-service,其中包含三个Module:
- mq-service-base :基础模块(包含了共用依赖、共用变量)
- mq-service-producer :生产者
- mq-service-consumer :消费者
注: 完全可以在一个工程里实现,这里为了区分,并为了后续单独启动或停止生产者或消费者做实验,也为了适应实际应用项目,所以创建了不同Module
(1)版本
- SpringBoot : 2.0.6.RELEASE
- SpringCloud : Finchley.SR2
- RabbitMQ : 3.8.1
(2)项目整体结构
(3)基础模块
1)pom.xml
这里作为公共模块引入SpringCloud、Spring Cloud Stream等,其中也再此引入fastjson、lombok等工具依赖
(完整代码见文章最下面)
其中Spring Cloud Stream如下:
<!-- Spring Cloud Stream, 用于MQ消息发送--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
2) model
定义共用的变量,如CollectionRequest.java
二、生产者
(1)结构
(2)pom.xml
导入base的依赖即可,因为相关共用依赖在base中已经引入
<dependency> <groupId>com.zrk</groupId> <artifactId>mq-service-base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
(3)定义绑定(接口)
OutputMessageBinding.java
public interface OutputMessageBinding { /** Topic 名称*/ String OUTPUT = "message-center-out"; @Output(OUTPUT) MessageChannel output(); }
(4)添加配置
# rabbitmq连接信息 spring.rabbitmq.addresses=192.168.1.125 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.cloud.stream.bindings.message-center-out.destination=message-center spring.cloud.stream.rabbit.bindings.message-center-out.consumer.exchangeType=fanout
(5) 调用方法
CollectionServiceImpl.java
@Service @EnableBinding(OutputMessageBinding.class) public class CollectionServiceImpl implements CollectionService{ @Resource private OutputMessageBinding outputMessageBinding; /** * @param schoolName * @param content */ @Override public void getCollection(String schoolName, String content) { CollectionRequest request = new CollectionRequest(); request.setSchoolName(schoolName); request.setContent(content); outputMessageBinding.output().send(MessageBuilder.withPayload(request).build()); } }
注: 主要是两点
- @EnableBinding 定义
- outputMessageBinding.output().send(MessageBuilder.withPayload(request).build()); 生产消息
三、消费者
(1)结构
(2)pom.xml
导入base的依赖即可,因为相关共用依赖在base中已经引入
<dependency> <groupId>com.zrk</groupId> <artifactId>mq-service-base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
(3)定义绑定(接口)
InputMessageBinding.java
public interface InputMessageBinding { String INPUT = "message-center-input"; @Input(INPUT) SubscribableChannel input(); }
注: 消费者这里与生产者不同,用的是SubscribableChannel ,而生产者用的是MessageChannel
(4)添加配置
# rabbitmq连接信息 spring.rabbitmq.addresses=192.168.1.125 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 spring.cloud.stream.bindings.message-center-input.destination=message-center spring.cloud.stream.bindings.message-center-input.group=${spring.application.name}
(5) 调用方法
CollectionReceiver.java
@Slf4j @EnableBinding(InputMessageBinding.class) public class CollectionReceiver { @StreamListener(InputMessageBinding.INPUT) public void handle(String value){ log.info("[消息] 接收到发送消息MQ: {}", value); CollectionRequest request = JSON.parseObject(value, CollectionRequest.class); log.info("处理收集信息:" + request.toString()); } }
注: 主要是两点
- @EnableBinding 定义
- @StreamListener 注册监听
至此,生产者与消费者都创建完成,分别启动两个项目,并调用生产者接口进行验证:
四、验证 在postman 访问生产者接口:
localhost:30110/collection/getCollectionschoolName=‘zrk’&content=‘send message to rabbitmq’
观察消费者日志:
查看rabbitmq首页
则证明已经整合成功,接下来将研究一下更多的配置与用法。
如果有需要,可以参考项目完整代码:https://github.com/zrk333/mq-service
到此这篇关于SpringCloud Stream 整合RabbitMQ的文章就介绍到这了,更多相关SpringCloud Stream 整合RabbitMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!