SpringBoot整合RabbitMQ的5种模式的注解绑定详解
作者:小爽帅到拖网速
这篇文章主要介绍了SpringBoot整合RabbitMQ的5种模式的注解绑定详解,RabbitMQ 是一个消息中间件,它接收消息并且转发,是"消费-生产者模型"的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息,需要的朋友可以参考下
RabbitMQ 5种模式的注解绑定
1、导入依赖
<!--AMQP依赖,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--对象转换--> <dependency> <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-xml</artifactId> </dependency>
2、配置连接信息
spring: rabbitmq: host: localhost port: 5672 username: root password: root virtual-host: / # listener: # simple: # prefetch: 1 # 工作队列能者多劳模式
3、5种使用模式
1、HelloWorld模式
一个队列一个消费者
// 消息发布 @Test void testHelloWorldMode(){ rabbitTemplate.convertAndSend("helloworld_queue","hello world!"); } // 消息订阅 @RabbitListener(queuesToDeclare = {@Queue(name = "helloworld_queue")}) public void helloWorldC1(String msg){ System.out.println("helloWorldC1:-------->"+msg); }
2、Work模式 按均分配
一个队列,多个消费者
注意:一条消息只能被消费一次,默认是按均分配,在消费者开始消费之前队列中的消息就已经分配好了
往队列中放入50条消息
@Test void testWorkMode(){ String msg = "work mode"; for (int i = 1; i <= 50; i++) { rabbitTemplate.convertAndSend("work_zs_queue","work mode"+i); } }
创建两个消费者,并且设置20ms 和 200ms 的延迟
@RabbitListener(queuesToDeclare = {@Queue(name = "work_zs_queue")}) @SneakyThrows public void workc1(String msg){ Thread.sleep(20); System.out.println("workc1:---------------->"+msg); } @RabbitListener(queuesToDeclare = {@Queue(name="work_zs_queue")}) @SneakyThrows public void workc2(String msg){ Thread.sleep(200); System.err.println("workc2:------------------>"+msg); }
运行结果:
可以发现workc1比workc2提前完成消费任务,并且c1 c2是按照奇偶数顺序消费的任务,这也进一步验证了在消费开始前就已经分配好了任务
这种按均分配的效果效率低下,我们应该遵循能者多劳的方式去分配任务
能者多劳
修改配置文件,让消费者一次只能接收一个任务,当前任务消费完才可以接收下一个任务
spring: rabbitmq: host: 192.168.137.7 port: 5672 username: root password: root virtual-host: / listener: # 消息确认机制 simple: prefetch: 1
重新启动运行代码
3、Fanout模式
fanout模式也叫广播模式,每一条消息多可以被绑定在同一个交换机上的所有队列的消费者消费
参数1:交换机:fanout_exchange
参数2:routingkey 在fanout模式不使用,会在direct和topic模式使用
参数3:发送的消息
@Test void testFanoutMode(){ rabbitTemplate.convertAndSend("fanout_exchange","","fanout mode 1"); rabbitTemplate.convertAndSend("fanout_exchange","","fanout mode 2"); }
定义消费者
使用@RabbitListener注解中的bindings声明并绑定交换机和队列
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "fanout_queue1"), exchange = @Exchange(name = "fanout_exchange2",type = ExchangeTypes.FANOUT) )) public void fanoutc1(String msg){ System.out.println("fanoutc1:------------------>"+msg); } @RabbitListener(bindings=@QueueBinding( value = @Queue(name = "fanout_queue2"), exchange = @Exchange(name = "fanout_exchange2",type = ExchangeTypes.FANOUT) )) public void fanoutc2(String msg){ System.out.println("fanoutc1:------------------>"+msg); }
运行结果:
每一条消息都会被所有消费者消费
4、direct模式
direct模式与fanout模式的区别在于,队列都是绑定同一个交换机,但是在队列上会添加routingkey标识
消费者会根据不同的表示去消费对应队列中的消息
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct_queue1"), exchange = @Exchange(name = "direct_zs_exchange",type = ExchangeTypes.DIRECT), key = "zs_news1" )) public void direct1(String msg){ System.out.println("direct_queuq1:---------------------->"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct_queue2"), exchange = @Exchange(name = "direct_zs_exchange",type = ExchangeTypes.DIRECT), key = "zs_news2" )) public void direct2(String msg){ System.out.println("direct_queuq2:---------------------->"+msg); }
@Test void testDirectMode(){ rabbitTemplate.convertAndSend("direct_zs_exchange","zs_news1","direct mode1"); rabbitTemplate.convertAndSend("direct_zs_exchange","zs_news2","direct mode2"); }
5、topic模式
- Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
- #:代表0个或多个词
- *:代表1个词
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic_queue1"), exchange = @Exchange(name = "topic_zs_exchange",type = ExchangeTypes.TOPIC), key = "zs_new.#" )) public void topic1(String msg){ User user = JSONUtil.toBean(msg, User.class); System.out.println(user.toString()); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topic_queue1"), exchange = @Exchange(name = "topic_zs_exchange",type = ExchangeTypes.TOPIC), key = "#.zs_new" )) public void topic2(String msg){ User user = JSONUtil.toBean(msg, User.class); System.out.println(user.toString()); }
@Test void testTopicMode(){ String jsonStr1 = JSONUtil.toJsonStr(new User("小爽", 22)); rabbitTemplate.convertAndSend("topic_zs_exchange","zs_new.user",jsonStr1); String jsonStr2 = JSONUtil.toJsonStr(new User("路飞", 17)); rabbitTemplate.convertAndSend("topic_zs_exchange","lufei.zs_new",jsonStr2); }
到此这篇关于SpringBoot整合RabbitMQ的5种模式的注解绑定详解的文章就介绍到这了,更多相关RabbitMQ 5种模式的注解绑定内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!