java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > RocketMQ延迟消息

RocketMQ-延迟消息的处理流程介绍

作者:pigcoffee

这篇文章主要介绍了RocketMQ-延迟消息的处理流程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

概述

RocketMQ 支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息;

预设值的延迟时间间隔为:

1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;

在消息创建的时候,调用 setDelayTimeLevel(int level) 方法设置延迟时间;

broker在接收到延迟消息的时候会把对应延迟级别的消息先存储到对应的延迟队列中,等延迟消息时间到达时,会把消息重新存储到对应的topic的queue里面。

Broker处理延迟消息

CommitLog.putMessage()

//获取消息的sysflag
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //非事务消息 或 已commit事务消息
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 判断消息是否设置延迟
            if (msg.getDelayTimeLevel() > 0) {
                //判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                //延迟消息的topic为 SCHEDULE_TOPIC_XXXX
                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                //获取延迟级别,一个延迟级别对应一个Queue
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 
                // Backup real topic, queueId
                //消息原始的topic,queueid保存到消息的property中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

1、判断消息类型,如果是非事务消息、已commit事务消息,才能处理延迟消息

2、判断消息是否设置延迟级别,如果延迟级别大于0,则该消息为延迟消息

3、判断延迟级别是否大于最大级别,如果大于最大值,则将延迟级别设置为最大级

4、延迟消息的topic为 SCHEDULE_TOPIC_XXXX

5、获取延迟级别,一个延迟级别对应一个Queue

6、消息原始的topic,queueid保存到消息的property中

7、修改消息的topci、queueid

启动延迟消息定时任务

ScheduleMessageService.start()

延迟消息投递

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文