RocketMQ offset确认机制示例详解
作者:土豆肉丝盖浇饭
消息存储
offset?
本文要探讨的offset指的是上图中的Queue Offset。
为了保存消费的消费进度,避免重复消费,我们需要将offset保存下来。
针对集群消费,offset保存在broker,在客户端使用RemoteBrokerOffsetStore。
针对广播消费,offset保存在本地,在客户端使用LocalFileOffsetStore。
最后,比较重要的一点是,保存的offset指的是下一条消息的offset,而不是消费完最后一条消息的offset。
比如,你消费了上图中第一个Queue的offset为0的消息,其实保存的offset为1,表示下次我从offset=1的位置进行消费。
broker端
在broker端,通过ConsumerOffsetManager中的offsetTable来保存Topic下各个ConsumerGroup的消费进度。
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
从offsetTable的双层Map结构也是能够看出,我上面说的消费进度,细指为ConsumerGroup在Topic下每个queue的消费进度。
offsetTable毕竟只是内存结构,因此ConsumerOffsetManager继承了ConfigManager实现了持久化功能。
实现了encode,decode,configFilePath三个模板方法。用于指定序列化,反序列化的逻辑以及保存位置
public String encode() { return this.encode(false); } @Override public String configFilePath() { return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); } @Override public void decode(String jsonString) { if (jsonString != null) { ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); if (obj != null) { this.offsetTable = obj.offsetTable; } } } public String encode(final boolean prettyFormat) { return RemotingSerializable.toJson(this, prettyFormat); }
其中序列化,反序列化的逻辑很简单,就是使用到了我们的FastJson。
保存文件名为consumerOffset.json。
offset加载
broker启动时从本地文件加载
org.apache.rocketmq.broker.BrokerController#initialize
result = result && this.consumerOffsetManager.load();
offset持久化
定时触发,持久化到磁盘
org.apache.rocketmq.broker.BrokerController#initialize
//定期将consumeroffset持久化到磁盘 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- shutdown触发
BrokerController#shutdown
offset更新
- ConsumerManageProcessor#updateConsumerOffset
用于consumer定时同步offset
- PullMessageProcessor
拉取消息时会顺带确认offset
- TransactionalMessageBridge
事务回查触发,暂不深入研究
consumer端
本文只讨论PUSH模式的集群消费,本地的offset缓存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();
因为consumer每次重启都会重新拉取offset,只是一个临时存储,因此RemoteBrokerOffsetStore的offsetTable的设计没有像ConsumerOffsetManager那么复杂。
offset拉取
consumer启动后会进行第一次rebalance,并且之后都会定期rebalance。
在rebalance分配好messagequeue之后,会根据messagequeue生成processqueue进行消息拉取。
而在进行消息拉取前,有一个关键的操作,拉取对应messagequeue的offset。
RebalanceImpl#updateProcessQueueTableInRebalance
for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { //如果是顺序消费 但是lock失败 那么跳过 if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } //从offsetstore删除之前的数据,可能之前有一段时间属于该消费者 this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); // 获取该mq应该从哪里开始消费 // pull模式 默认是0 // push模式 动态计算 long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //创建新的pullRequest PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } }
public long computePullFromWhere(MessageQueue mq) { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } case CONSUME_FROM_FIRST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } case CONSUME_FROM_TIMESTAMP: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } default: break; } return result; }
其中获取消息拉取初始位置有三种策略
CONSUME_FROM_LAST_OFFSET 最新的offset
CONSUME_FROM_FIRST_OFFSET 第一个offset
CONSUME_FROM_TIMESTAMP 根据时间戳获取offset
但是从源码中可以看出来,实际上的逻辑和我们想象的有点不同,上面三个的逻辑的触发前提是,从broker拉取不到offset进度。
这应该是为了防止重复消费以及少消费,毕竟rocketmq是业务相关的mq。
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; }
offset更新
在consumer端,针对offsetTable的更新,当然通过消费消息触发。
并发消费
ConsumeMessageConcurrentlyService#processConsumeResult
//removeMessage会返回offset //不管消费成功还是失败 都会确认offset //失败的消息会在重试topic long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); //更新offsetstore if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { //increaseOnly 表示offset只能增加 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }
针对并发消费的offset,更新值来源于ProcessQueue#removeMessage方法
/** * 移除pq中的msg,并且返回剩下第一条消息的offset * 如果消息全部消费完,返回this.queueOffsetMax + 1 * 如果msgs为空 返回-1 * @param msgs * @return */ public long removeMessage(final List<MessageExt> msgs) { long result = -1; final long now = System.currentTimeMillis(); try { this.lockTreeMap.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!msgTreeMap.isEmpty()) { result = this.queueOffsetMax + 1; int removedCnt = 0; for (MessageExt msg : msgs) { //通过map的key移除,也就是queueoffset MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; msgSize.addAndGet(0 - msg.getBody().length); } } msgCount.addAndGet(removedCnt); //如果消息没有全部消费完毕,剩下消息中最小的那个 if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (Throwable t) { log.error("removeMessage exception", t); } return result; }
removeMessage的逻辑,用到了滑动窗口的算法。
比如10条消息,offset为 0 - 9。
在多线程并发消费的场景下
比如我第一个线程消费了offset为0的消息,那么offsetTable中的offset更新为1
然后我第二个线程消费了offset为5的消息,removeMessage返回的offset还是为1
只有前面的消息全被消费了,窗口才会滑动
顺序消费
ConsumeMessageOrderlyService#processConsumeResult
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); }
顺序消费,暂不研究。
offset持久化
最终的offset以broker为准,因此本地的offset要定期持久化到offset。
主要持久化逻辑在persistAll和persist方法。
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
public void persist(MessageQueue mq) { AtomicLong offset = this.offsetTable.get(mq); if (offset != null) { try { this.updateConsumeOffsetToBroker(mq, offset.get()); log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, offset.get()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } } }
persistAll和persist逻辑大致相同,核心逻辑都是通过updateConsumeOffsetToBroker持久化到broker。
触发持久化逻辑的时机有以下4个
定时同步
MQClientInstance#startScheduledTask
//定时同步offset到broker //除了拉消息的时候会同步下 也有定时 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //持久化消费进度 MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
shutdown
DefaultMQPullConsumerImpl#shutdown
public synchronized void shutdown() { switch (this.serviceState) { case CREATE_JUST: break; case RUNNING: this.persistConsumerOffset();//here this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup()); this.mQClientFactory.shutdown(); log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; case SHUTDOWN_ALREADY: break; default: break; } }
DefaultMQPushConsumerImpl#shutdown
public synchronized void shutdown() { switch (this.serviceState) { case CREATE_JUST: break; case RUNNING: this.consumeMessageService.shutdown(); this.persistConsumerOffset();//here this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); this.mQClientFactory.shutdown(); log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.destroy(); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; case SHUTDOWN_ALREADY: break; default: break; } }
Rebalance
当一个queue不再属于当前consumer的时候,需要同步进步给broker,以便于新拿到queue的consumer从最新未消费的消息开始拉取
RebalancePullImpl#removeUnnecessaryMessageQueue
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq); this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); return true; }
拉取消息
拉取消息的时候会顺带commit offset
DefaultMQPushConsumerImpl#pullMessage
// 拉取消息的时候顺带ack消息进度 boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { //READ_FROM_MEMORY 获取本地缓存的消费进度 commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } }
this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(),//需要拉取的下个offset ,拉取到了 ,不代表被消费到 this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue,//确认的offset BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, //异步拉取消息 pullCallback //注意这个回调也传过去了 );
PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend; //需要hasCommitOffsetFlag=true storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; //需要当前broker是master storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if (storeOffsetEnable) { //这边应该是保存改group在该topic下面的消费进度 this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); }
总结
- 针对集群消费offset保存在broker,针对广播消费offset保存在本地
- offset保存的值为消息下次开始消费的位置,而不是上次消费结束的位置
- 针对offset的提交采取基于TreeMap的滑动窗口机制
- 消费者启动会先从broker拉取对应Topic的offset进度,然后在进行消息拉取
- offset持久化触发的几种方式
问题
消息消费失败是否影响窗口滑动
正常情况下,消息消费失败不会影响窗口滑动,因为针对消费失败的消息,client会进行sendback。
sendback之后,消息经过延迟之后会发往Topic=%RETRY%{CONSUMERGROUP}的Retry队列
每个ConsumerGroup会强制监听Retry队列的消息
ConsumeMessageConcurrentlyService#processConsumeResult
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); //因为msgsize=1,所以只有失败的时候才会进入下面的循环 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); //消费失败的消息 重新插入到commitlog 发送到group对应重试topic boolean result = this.sendMessageBack(msg, context); //如果发送请求失败 那么本地再消费一次试试 if (!result) { //本地的重试也算重试次数!!! msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } //发送到重试队列失败的消息,重新消费 if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); //如果消费失败 重新消费 this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); }
而sendMessageBack失败的消息,会重新封装成另一个ConsumeRequest在本地再次消费。
这些失败的消息会从之前的consumeRequest移除,因此也就影响到了ProcessQueue#removeMessage的返回值。
但是这是一个优化,重试之后窗口大概率上还是会正常滑动。
消费者并发消费如何保证提交位置偏移量正确,保证消费消费不丢失? 比如a线程消费msgid=1,b线程消费msgid=2,b线程消费速度比a线程快。如何避免a线程消费失败,消息不丢失?
如何保证并发消费提交偏移量正确?
基于TreeMap的滑动窗口
如何保证消息消费不丢失?
滑动窗口+broker远端保存+sendback+本地重试兜底
应用重启,消息从哪里开始消费
如果broker保存了offset
那么从对应offset重新拉取消息
如果broker没有保存offset,或者其他情况丢失
那么根据配置的策略,从对应的offset开始拉取
以上就是RocketMQ offset确认机制示例详解的详细内容,更多关于RocketMQ offset确认机制的资料请关注脚本之家其它相关文章!