java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > RabbitMQ交换机

Java中RabbitMQ消息队列的交换机详解

作者:迷鹿小女子

这篇文章主要介绍了Java中的RabbitMQ交换机详解,消息队列是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成,是在消息的传输过程中保存消息的容器,需要的朋友可以参考下

RabbitMQ交换机

在这里插入图片描述

交换机属性

直流交换机

直连交换机Direct Exchange(完全匹配路由key)

所有发送到Direct Exchange的消息会被转发到RouteKey中指定的Queue

注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃;

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4DirectExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";
        //表示声明了一个交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        //表示声明了一个队列
        channel.queueDeclare(queueName, false, false, false, null);
        //建立一个绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author 小李飞刀
 * @site www.javaxl.com
 * @company
 * @create  2019-11-18 10:22
 */
public class Producer4DirectExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
//        String routingKey = "test.direct111"; //收不到
        //5 发送
        String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
        channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
    }
}

代码的区别: 一条消息只会发送在一个队列里

在这里插入图片描述

创建一个交换机与队列

在这里插入图片描述

所绑定的交换机

在这里插入图片描述

控制台输出

在这里插入图片描述

主题交换机

主题交换机Topic Exchange(匹配路由规则的交换机)

所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定Topic的Queue上;

Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic;

注意:可以使用通配符进行模糊匹配

列如:

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4TopicExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.#";
//        String routingKey = "user.*";
        // 1 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 2 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 3 建立交换机和队列的绑定关系:
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4TopicExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";
        //5 发送
        String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
        channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
        channel.close();
        connection.close();
    }
}

代码的区别: 一条消息会发送在多个队列里 消费端:

在这里插入图片描述

生产端:

在这里插入图片描述

控制台输出

在这里插入图片描述

并且可以同时绑定多个交换机

在这里插入图片描述

输出交换机

输出交换机Fanout Exchange(不做路由)

在这里插入图片描述

消费端代码

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory() ;
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setNetworkRecoveryInterval(3000);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";    //不设置路由键
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //durable 是否持久化消息
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, true, consumer);
        //循环获取消息
        while(true){
            //获取消息,如果没有消息,这一步将会一直阻塞
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息:" + msg);
        }
    }
}

生产端代码

package com.xieminglu.rabbitmqapi.exchange.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer4FanoutExchange {
    public static void main(String[] args) throws Exception {
        //1 创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.248.134");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //2 创建Connection
        Connection connection = connectionFactory.newConnection();
        //3 创建Channel
        Channel channel = connection.createChannel();
        //4 声明
        String exchangeName = "test_fanout_exchange";
        //5 发送
        for(int i = 0; i < 10; i ++) {
            String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
            channel.basicPublish(exchangeName, "", null , msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费端:

在这里插入图片描述

生产端:

在这里插入图片描述

在这里插入图片描述

控制台输出

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

Binding-绑定

Queue-消息队列

Message-消息

Message-其他属性

Virtual host-虚拟主机

总结一下

交换机的概念

在没有交换机的时候,我们的消息队列会处理所有发给这个消息队列的消息,然后由消费者一个一个消费这个队列里面的消息,如果由集群的话还会分摊对这个消息队列的处理。只不过这里面有一个

Message acknowledgment的概念

这将会导致严重的bug——Queue中堆积的消息会越来越多

在这里插入图片描述

当然一般的消息中间件都不会这么干,我们使用了交换机后,我们看到我们的三种策略,其实都可以说由交换机去找跟它所绑定的消息队列,如果生产端的路由键不符合要求或找不到消息队列定好的路由键的话就会进行其他处理。

在这里插入图片描述

到此这篇关于Java中RabbitMQ消息队列的交换机详解的文章就介绍到这了,更多相关RabbitMQ交换机内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文