RabbitMQ高级应用之消费端限流策略basicQos详解
作者:疯狂的帆
这篇文章主要介绍了RabbitMQ高级应用之消费端限流策略basicQos详解,高并发情况下,队列里面一瞬间就就积累了上万条数据,但是消费者无法同时处理这么多请求,这种场景下我们就需要对消费端进行限流,需要的朋友可以参考下
业务场景
高并发情况下,队列里面一瞬间就就积累了上万条数据,但是消费者无法同时处理这么多请求,这个时候当我们打开客户端,瞬间就有巨量的信息给推送过来
但是客户端是没有办法同时处理这么多数据的,结果就是消费者(客户端)挂掉了…
这种场景下我们就需要对消费端进行限流
限流策略实现
限流策略关键代码:
channel.basicQos();
编写生产者
// 生产者 public class Producer { private static final String QUEUE_NAME = "queue_limit_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { channel.basicPublish("", QUEUE_NAME, null, ("消费端限流策略—测试数据:" + i).getBytes()); } channel.close(); connection.close(); } }
编写消费者1
// 消费者1 public class Consumer { private static final String QUEUE_NAME = "queue_limit_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
编写消费者2
// 消费者2 public class Consumer2 { private static final String QUEUE_NAME = "queue_limit_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** 设置限流机制 * param1: prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制 * param2: prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息 * param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者 */ channel.basicQos(0, 5, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }
运行结果
小结
- 限流的核心代码就是
channel.basicQos();
- 限流情况 ack 不能设置自动签收,一定要手动签收 channel.basicQos()
/** * @param prefetchSize maximum amount of content (measured in * octets) that the server will deliver, 0 if unlimited * @param prefetchCount maximum number of messages that the server * will deliver, 0 if unlimited * @param global true if the settings should be applied to the * entire channel rather than each consumer */ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
该方法的作用是:进行消费端的限流
- param1:prefetchSize,消息本身的大小 如果设置为0 那么表示对消息本身的大小不限制
- param2:prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
- param3:global,是否将上面的设置应用于整个通道
- false:表示只应用于当前消费者
- true:表示当前通道的所有消费者都应用这个限流策略
到此这篇关于RabbitMQ高级应用之消费端限流策略basicQos详解的文章就介绍到这了,更多相关RabbitMQ消费端限流策略basicQos内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!