Golang实现RabbitMQ中死信队列几种情况
作者:ccgkk
本文主要介绍了Golang实现RabbitMQ中死信队列几种情况,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
下面这段教程针对是你已经有一些基本的MQ的知识,比如说能够很清楚的理解queue、exchange等概念,如果你还不是很理解,我建议你先访问官网查看基本的教程。
1、造成死信队列的主要原因
- 消费者超时未应答
- 队列的容量有限
- 消费者拒绝了的消息
2、操作逻辑图
3、代码实战
其实整体的思路就是分别创建一个normal_exchange、dead_exchange、normal_queue、dead_queue,然后将normal_exchange与normal_queue进行绑定,将dead_exchange与dead_queue进行绑定,这里比较关键的一个点在于说如何将normal_queue与dead_exchange进行绑定,这样才能将错误的消息传递过来。下面就是这段代码的关键。
// 声明一个normal队列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定过期时间 //"x-max-length": 6, // 指定长度。超过这个长度的消息会发送到dead_exchange中 "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
3.1 针对原因1:消费者超出时间未应答
consumer1.go
package day07 import ( amqp "github.com/rabbitmq/amqp091-go" "log" "v1/utils" ) type Constant struct { NormalExchange string DeadExchange string NormalQueue string DeadQueue string NormalRoutingKey string DeadRoutingKey string } func Consumer1() { // 获取连接 ch := utils.GetChannel() // 创建一个变量常量 constant := Constant{ NormalExchange: "normal_exchange", DeadExchange: "dead_exchange", NormalQueue: "normal_queue", DeadQueue: "dead_queue", NormalRoutingKey: "normal_key", DeadRoutingKey: "dead_key", } // 声明normal交换机 err := ch.ExchangeDeclare( constant.NormalExchange, amqp.ExchangeDirect, true, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a normal exchange") // 声明一个dead交换机 err = ch.ExchangeDeclare( constant.DeadExchange, amqp.ExchangeDirect, true, false, false, false, nil, ) utils.FailOnError(err, "Failed to declare a dead exchange") // 声明一个normal队列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ "x-message-ttl": 5000, // 指定过期时间 //"x-max-length": 6, "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key }) utils.FailOnError(err, "Failed to declare a normal queue") // 声明一个dead队列:注意不要给死信队列设置消息时间,否者死信队列里面的信息会再次过期 _, err = ch.QueueDeclare( constant.DeadQueue, true, false, false, false, nil) utils.FailOnError(err, "Failed to declare a dead queue") // 将normal_exchange与normal_queue进行绑定 err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil) utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue") // 将dead_exchange与dead_queue进行绑定 err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil) utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue") // 消费消息 msgs, err := ch.Consume(constant.NormalQueue, "", false, // 这个地方一定要关闭自动应答 false, false, false, nil) utils.FailOnError(err, "Failed to consume in Consumer1") var forever chan struct{} go func() { for d := range msgs { if err := d.Reject(false); err != nil { utils.FailOnError(err, "Failed to Reject a message") } } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
consumer2.go
package day07 import ( amqp "github.com/rabbitmq/amqp091-go" "log" "v1/utils" ) func Consumer2() { // 拿取信道 ch := utils.GetChannel() // 声明一个交换机 err := ch.ExchangeDeclare( "dead_exchange", amqp.ExchangeDirect, true, false, false, false, nil) utils.FailOnError(err, "Failed to Declare a exchange") // 接收消息的应答 msgs, err := ch.Consume("dead_queue", "", false, false, false, false, nil, ) var forever chan struct{} go func() { for d := range msgs { log.Printf("[x] %s", d.Body) // 开启手动应答ß d.Ack(false) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
produce.go
package day07 import ( "context" amqp "github.com/rabbitmq/amqp091-go" "strconv" "time" "v1/utils" ) func Produce() { // 获取信道 ch := utils.GetChannel() // 声明一个交换机 err := ch.ExchangeDeclare( "normal_exchange", amqp.ExchangeDirect, true, false, false, false, nil) utils.FailOnError(err, "Failed to declare a exchange") ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second) defer cancer() // 发送了10条消息 for i := 0; i < 10; i++ { msg := "Info:" + strconv.Itoa(i) ch.PublishWithContext(ctx, "normal_exchange", "normal_key", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(msg), }) } }
3.2 针对原因2:限制一定的长度
只需要改变consumer1.go中的对normal_queue的声明
// 声明一个normal队列 _, err = ch.QueueDeclare( constant.NormalQueue, true, false, false, false, amqp.Table{ //"x-message-ttl": 5000, // 指定过期时间 "x-max-length": 6, "x-dead-letter-exchange": constant.DeadExchange, // 指定死信交换机 "x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key })
3.3 针对原因3:消费者拒绝的消息回到死信队列中
这里需要完成两点工作
工作1:需要在consumer1中作出拒绝的操作
go func() { for d := range msgs { if err := d.Reject(false); err != nil { utils.FailOnError(err, "Failed to Reject a message") } } }()
工作2:如果你consume的时候开启了自动应答一定要关闭
// 消费消息 msgs, err := ch.Consume(constant.NormalQueue, "", false, // 这个地方一定要关闭自动应答 false, false, false, nil)
其他的部分不需要改变,按照问题1中的设计即可。
到此这篇关于Golang实现RabbitMQ中死信队列几种情况的文章就介绍到这了,更多相关Golang RabbitMQ死信队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!