RabbitMQ使用SpringAMQP的配置方法
作者:在无清风
这篇文章主要介绍了RabbitMQ使用SpringAMQP的配置方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
简介
绝对的简单,绝对的易懂,方便初学者,更加利于理解和上手使用(代码可直接复制粘贴进行使用)
如有其它问题,大家可以留言或私聊。
主要为了给大家展示各个代码使用
如果需要更加完整的文档,可以点击下方连接进行阅读http://t.csdnimg.cn/WWdhG
配置
消息发送配置
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 8.137.59.245 port: 5672 username: itcast password: 123321 # 设置虚拟主机路径 virtual-host: /
消息接收配置
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 8.137.59.245 port: 5672 username: itcast password: 123321 virtual-host: / listener: simple: prefetch: 1 #每次只能接收一个消息,处理完后在队列中删除,在获取下一个消息
Basic Queue简单队列
简单介绍
消息发送者直接发送消息给队列,消息接收者接收队列中发过来的信息,实现消息的接收。
消息发送者
// Basic Queue简单队列 @Test public void testSendMessage2SimpleQueue(){ //队列名称 String queueName = "simple.queue"; //消息 String message = "Hello World!spring amqp!!"; //发送消息 rabbitTemplate.convertAndSend(queueName, message); /** * *rabbitTemplate.convertAndSend(queueName, routingKey, message); * 发送消息,参数分别是:队列名称,RoutingKey(暂时为空),消息 */ }
消息接收者
// Basic Queue简单队列 @RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String msg){ System.out.println("消费者接收到simple.queue的消息:{"+msg+"}"); // 通过simple.queue队列接收消息,并答应出接收的消息 }
Work Queue队列
简单介绍
Work Queue队列和Basic Queue队列比较。当发送多个信息需要处理Work Queue队列可以分别交给多个队列处理,增加处理消息速度
消息发送者
//WorkQueue队列 @Test public void testSendMessage2WorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "Hello World!spring amqp!!__"; for (int i = 1; i < 50; i++) { // 发送消息 rabbitTemplate.convertAndSend(queueName, message+i); Thread.sleep(20); } // 发送50个消息到simple.queue队列中 }
消息接收者
// Work Queue工作队列 @RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.out.println("消费者2接收到simple.queue的消息:{"+msg+"}"+ LocalTime.now()); Thread.sleep(50); } // 分别用两个消息接收者来接收消息,防止同一个队列处理太快,所以使用sleep来减慢处理速度
发布订阅模型-Fanout交换机
简单介绍
这里添加交换机(主要将不同消息推送到不同队列中进行消费)
消息发送者
// 发布订阅模型-Fanout交换机 @Test public void testSendFanoutExchange(){ //交换机名称 String exchangeName = "itcast.fanout"; //消息 String message = "hello,every one!"; //发送消息,参数分别是:交换机名称,RoutingKey(暂时为空),消息 rabbitTemplate.convertAndSend(exchangeName, "", message); // 这里我们不给队列名,而给交换机名称,通过交换机来发送消息给不同的队列 }
消息接收者
这里添加了交换机所以我们先写交换机和不同的队列进行绑定
这里我们使用类来操作,后面两不会使用这种比较繁琐的绑定方式
交换机
//itcast.fanout(交换机) @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } //itcast.queue1(队列一) @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } //绑定队列1到交换机 @Bean public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue1). to(fanoutExchange); } //itcast.queue2(队列二) @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } //绑定队列2到交换机 @Bean public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder .bind(fanoutQueue2). to(fanoutExchange); } /** * 这里先定义一个队列,用于接收消息发送者发送的消息 * 然后再定义一个交换机,用于接收队列1和队列2的消息 * 这里的交换机和前面消息发送者的交换机一样名字。 */
消息接收者
// 发布订阅模型-Fanout交换机 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg){ System.out.println("消费者1接收到fanout.queue1的消息:{"+msg+"}"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg){ System.out.println("消费者2接收到fanout.queue2的消息:{"+msg+"}"); }
订阅发布模型-Direct交换机
简单介绍
Direct交换机和Fanout交换机比较,不一样点在于RoutingKey给的值不一样
消息发送者
// 订阅,发布模型-Direct交换机 @Test public void testSendDirectExchange(){ //交换机名称 String exchangeName = "itcast.direct"; //消息 String message = "hello,blue one!"; //发送消息,参数分别是:交换机名称,RoutingKey(暂时为空),消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message); /** * 消息发送,参数分别是:交换机名称,RoutingKey,消息 * routingKey: 用来区分不同的队列 * 上面routingKey给“blue”的意义是将消息发送给blue队列 * */ }
消息接收者
// 订阅,发布模型-Direct交换机 @RabbitListener(bindings = @QueueBinding( //队列 value = @Queue(name = "direct.queue1"), //交换机 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定机置 key = {"red","blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:{"+msg+"}"); // 接收red和blue的消息并传入队列进行消费 } @RabbitListener(bindings = @QueueBinding( //队列 value = @Queue(name = "direct.queue2"), //交换机 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定机置 key = {"red","yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息:{"+msg+"}"); // 接收red和yellow的消息并传入队列进行消费 }
发布订阅模型-Topic交换机
简单介绍
个人理解:算是对Direct交换机进行扩展吧,可以实现更多样性的订阅发布
消息发送者
// 订阅,发布模型-Direct交换机 @Test public void testSendDirectExchange(){ //交换机名称 String exchangeName = "itcast.direct"; //消息 String message = "hello,blue one!"; //发送消息,参数分别是:交换机名称,RoutingKey(暂时为空),消息 rabbitTemplate.convertAndSend(exchangeName, "blue", message); /** * 消息发送,参数分别是:交换机名称,RoutingKey,消息 * routingKey: 用来区分不同的队列 * 上面routingKey给“blue”的意义是将消息发送给blue队列 * */ }
消息接收者
// 订阅,发布模型-Direct交换机 @RabbitListener(bindings = @QueueBinding( //队列 value = @Queue(name = "direct.queue1"), //交换机 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定机置 key = {"red","blue"} )) public void listenDirectQueue1(String msg){ System.out.println("消费者接收到direct.queue1的消息:{"+msg+"}"); // 接收red和blue的消息并传入队列进行消费 } @RabbitListener(bindings = @QueueBinding( //队列 value = @Queue(name = "direct.queue2"), //交换机 exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT), //邦定机置 key = {"red","yellow"} )) public void listenDirectQueue2(String msg){ System.out.println("消费者接收到direct.queue2的消息:{"+msg+"}"); // 接收red和yellow的消息并传入队列进行消费 }
Json序列化
简单介绍
为什么要添加呢:大致就是实现可消息发送类型的更多
添加Maven
<!--rabbitmq使用json序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
消息发送者
// 使用json发送对象队列 @Test public void testSendObjectQueue() { Map<String, Object> msg = new HashMap<>(); msg.put("name", "留言"); msg.put("age", 21); rabbitTemplate.convertAndSend("object.queue", msg); // 发送Map类型的消息到object.queue队列中 }
消息接收者
@RabbitListener(queues = "object.queue") public void listenObjectQueue(Map<String,Object> msg){ System.out.println("消费者接收到object.queue的消息:{"+msg+"}"); msg.forEach((k,v)->{ System.out.println(k+"="+v); System.out.println("{"+k+"}"); }); // 接收消息并循环打印接收到的消息 }
感悟
个人感觉使用其实不难,如果要看源码分析源码,可能就难起来了,希望以后有机会和大家一起分析分析其它项目的源码!谢谢!!!
到此这篇关于RabbitMQ使用SpringAMQP的文章就介绍到这了,更多相关RabbitMQ使用SpringAMQP内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!