java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > channel接口常用参数

RabbitMq中channel接口的几种常用参数详解

作者:Alan0517

这篇文章主要介绍了RabbitMq中channel接口的几种常用参数详解,RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,需要的朋友可以参考下

1. 背景概述

为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制( message acknowledgement), 消费者在订阅队列时,可以指定autoAck参数,

采用消息确认机制后,只要设置autoAck 参数为false ,消费者就有足够的时间处理消息(任务) ,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ 会一直等待持有消息直到消费者显式调用Basic.Ack 命令为止。

当autoAck 参数置为false ,对于RabbitMQ 服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息:一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接, 则RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开, 这么设计的原因是RabbitMQ 允许消费者消费一条消息的时间可以很久很久。

RabbtiMQ 的Web 管理平台(15672端口)上可以看到当前队列中的" Ready" 状态和"Unacknowledged" 状态的消息数,分别对应上文中的等待投递给消费者的消息数和己经投递给消费者但是未收到确认信号的消息数

也可以通过相应的命令来查看上述信息:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?

RabbitMQ 在2 .0.0 版本开始引入了Basic.Reject 这个命令,消费者客户端可以调用与其对应的channel.basicReject 方法来告诉RabbitMQ 拒绝这个消息。

Channel 类中的basicReject 方法定义如下:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

其中deliveryTag 可以看作消息的编号,它是一个64 位的长整型值,最大值是9223372036854775807, 如果requeue 参数设置为true ,则RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者, 如果requeue 参数设置为false ,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack 这个命令, 消费者客户端可以调用channel.basicNack 方法来实现,方法定义如下:

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

其中deliveryTag 和requeue 的含义可以参考basicReject 方法。

注意要点:

将channel.basicReject 或者channel.basicNack 中的requeue 设置为false ,可以启用" 死信队列 "的功能。

死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。 对于requeue , AMQP 中还有一个命令Basic.Recover 具备可重入队列的特性。其对应的客户端方法为:

1.Basic.RecoverOk basicRecover() throws IOException;

2.Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

这个channel.basicRecover 方法用来请求RabbitMQ 重新发送还未被确认的消息。

如果requeue 参数设置为true, 则未被确认的消息会被重新加入到队列中, 这样对于同一条消息来说,可能会被分配给与之前不同的消费者。

如果requeue 参数设置为false ,那么同一条消息会被分配给与之前相同的消费者, 默认情况下,如果不设置requeue 这个参数,相当于channel.basicRecover(true) ,即requeue 默认为true

2. 通常参数解释

因此 deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。

3. Channel一些Api解释

3.1. basicNack 不确认消息

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

简单理解就是: 不确认deliveryTag对应的消息

第二个参数,怎么理解basic.nack多消息,比如现在有多条消息去调用这个nack方法,他是怎么执行的?

注意: nack后的消息也会被自己消费;

3.2. basicReject 拒绝消息

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

简单理解就是:拒绝deliveryTag对应的消息

区别在于:

3.3. RecoverOk 是否恢复消息到队列

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

是否恢复消息到队列,参数是是否requeue,true则重新入队列,并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费。 false则消息会重新被投递给自己。

3.4. exchangeDeclare 声明交换机

有多个重载方法,这些方法都是由下面这个方法中的缺省参数构成的

Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments) throws IOException;

3.5. queueDeclare 声明队列

  Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map&lt;String, Object&gt; arguments) throws IOException;

3.6. queueBind 绑定队列

 Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;

3.7. queueUnbind 解绑队列

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;

3.8. exchangeBind 绑定交换机

  Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

消息从source交换器转发到destination交换器存储在destination绑定的队列queue中

3.9. exchangeUnbind 解绑交换机

 Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map&lt;String, Object&gt; arguments) throws IOException;

3.10. basicQos 消息流量

有多个重载方法,这些方法都是由下面这个方法中的缺省参数构成的,

void basicQos(int prefetchSize, int prefetchCount, boolean global)

消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者。

队列中没有被消费的消息不会被删除,还是存在于队列中。

一般和channel.basicAck配套使用

3.11. basicAck 消息确认

 void basicAck(long deliveryTag, boolean multiple) throws IOException

3.12. basicConsume 消息消费

该重载方法有点多,具体我就不列举了,参数解释一下:

启动一个消费者,并返回服务端生成的消费者标识

3.13. basicPublish 发布消息

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)

3.14. basicGet 主动拉取队列中的一条消息

GetResponse basicGet(String queue, boolean autoAck)

3.15. basicCancel 取消消费者对队列的订阅关系

void basicCancel(String consumerTag)

consumerTag:服务器端生成的消费者标识

4. 消息确认一些观点

到此这篇关于RabbitMq中channel接口的几种常用参数详解的文章就介绍到这了,更多相关channel接口常用参数内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文