SpringBoot集成RocketMQ的使用示例
作者:小月亮与六便士
RocketMQ是阿里巴巴开源的一款消息中间件,性能优秀,功能齐全,被广泛应用在各种业务场景,本文就来介绍一下SpringBoot集成RocketMQ的使用示例,感兴趣的可以了解一下
一、RocketMQ基本概念
消息模型(Message Model)
RocketMQ主要由Producer、Broker、Consumer三部分组成,其中Producer负责生产消息,Consumer负责消费消息,Broker负责存储消息。Broker在实际部署过程中对应一台服务器,每个Broker可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的Broker。MessageQueue用于存储消息的物理地址,每个Topic中的消息地址存储于多个MessageQueue中。ConsumerGroup由多个Consumer实例构成。
1、在springBoot项目中添加Maven依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
2、添加配置:
application.yml 文件中添加如下配置:
rocketmq: name-server: 192.168.152.165:9876 producer: group: my-group
SpringBoot 集成 RocketMQ代码:
生产者: 消息发送的三种方式
package com.rocketmq.springbootrocketmq; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.TimeUnit; @RunWith(SpringRunner.class) @SpringBootTest public class T { @Autowired private RocketMQTemplate rocketMQTemplate; //同步消息 @Test public void testRocketMQ() { Message msg = MessageBuilder.withPayload("boot发送同步消息").build(); rocketMQTemplate.send("helloTopicBoot", msg); System.out.println("success send"); } //异步消息 @Test public void sendASYCMsg() throws InterruptedException { Message message = MessageBuilder.withPayload("boot发送异步消息").build(); rocketMQTemplate.asyncSend("helloTopicBoot", message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送状态:"+sendResult.getSendStatus()); } @Override public void onException(Throwable throwable) { System.out.println("消息发送失败"); } }); TimeUnit.SECONDS.sleep(5); } //一次性消息 @Test public void sendOneWayRocketMQ() { Message msg = MessageBuilder.withPayload("boot发送一次性消息").build(); rocketMQTemplate.sendOneWay("helloTopicBoot", msg); } }
消费者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; @Component @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot") public class HelloTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset())); } }
消息消费的两种模式
集群模式:默认模式
广播模式:
消费者:messageModel = MessageModel.BROADCASTING
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; @Component @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot",messageModel = MessageModel.BROADCASTING) public class HelloTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:"+new String(messageExt.getBody(), Charset.defaultCharset())); } }
顺序消息
生产者:
//顺序消息 @Test public void sendOrderlyMsg(){ //设置队列选择器 rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> list, org.apache.rocketmq.common.message.Message message, Object o) { String orderIdStr = (String) o; long orderId = Long.parseLong(orderIdStr); int index = (int)orderId % list.size(); return list.get(index); } }); List<OrderStep> orderSteps = OrderUtil.buildOrders(); for (OrderStep orderStep : orderSteps) { Message msg = MessageBuilder.withPayload(orderStep.toString()).build(); rocketMQTemplate.sendOneWayOrderly("orderlyTopicBoot",msg,String.valueOf(orderStep.getOrderId())); } }
消费者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; @Component @RocketMQMessageListener(consumerGroup = "orderlyConsumerBoot",topic = "orderlyTopicBoot",consumeMode = ConsumeMode.ORDERLY) public class OrderlyTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("当前线程:" + Thread.currentThread() + "队列ID"+messageExt.getQueueId() + ",消息内容:" + new String(messageExt.getBody(),Charset.defaultCharset())); } }
延迟消息
生产者:
//延迟消息 @Test public void sendDelayRocketMQ() { Message msg = MessageBuilder.withPayload("boot发送延时消息,发送时间:"+new Date()).build(); rocketMQTemplate.syncSend("helloTopicBoot", msg,3000,3); }
消费者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; import java.util.Date; @Component @RocketMQMessageListener(consumerGroup = "htbConsumerGroup",topic = "helloTopicBoot") public class DelayTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset())); } }
消息Tag条件过滤
生成者
//Tag消息 @Test public void sendTagFilterRocketMQ() { Message msg1 = MessageBuilder.withPayload("消息A").build(); rocketMQTemplate.sendOneWay("tagFilterBoot:TagA", msg1); Message msg2 = MessageBuilder.withPayload("消息B").build(); rocketMQTemplate.sendOneWay("tagFilterBoot:TagB", msg2); Message msg3 = MessageBuilder.withPayload("消息C").build(); rocketMQTemplate.sendOneWay("tagFilterBoot:TagC", msg3); }
消费者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; import java.util.Date; @Component @RocketMQMessageListener(consumerGroup = "tagFilterGroupBoot",topic = "tagFilterBoot",selectorExpression = "TagA || TagC") public class TagFilterTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset())); } }
SQL92消息过滤
生产者:
//SQL92消息 @Test public void sendSQL92FilterRocketMQ() { Message msg1 = MessageBuilder.withPayload("小红,年龄22,体重45").setHeader("age","22").setHeader("weight",45).build(); rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg1); Message msg2 = MessageBuilder.withPayload("小明,年龄25,体重60").setHeader("age","25").setHeader("weight",60).build(); rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg2); Message msg3 = MessageBuilder.withPayload("小蓝,年龄40,体重70").setHeader("age","40").setHeader("weight",70).build(); rocketMQTemplate.sendOneWay("SQL92FilterBoot", msg3); }
消费者:
package com.example.springbooTRocketMQConsumer.listener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.nio.charset.Charset; import java.util.Date; @Component @RocketMQMessageListener(consumerGroup = "SQL92FilterGroupBoot",topic = "SQL92FilterBoot",selectorType = SelectorType.SQL92,selectorExpression = "age > 23 and weight > 60") public class SQL92FilterTopicListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { System.out.println("success get:发送时间"+new Date()+new String(messageExt.getBody(), Charset.defaultCharset())); } }
到此这篇关于SpringBoot集成RocketMQ的使用示例的文章就介绍到这了,更多相关SpringBoot集成RocketMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!