RocketMQ消息存储文件的加载与恢复机制源码分析
作者:林师傅
前言
前面文章我们介绍了Broker是如何将消息全量存储到CommitLog文件中,并异步生成dispatchRequest任务更新ConsumeQueue,IndexFile的过程以及ConsumeQueue和IndexFile的文件结构。由于是异步转发消息,就可能出现消息成功存储到CommitLog文件,转发请求任务执行失败,Broker宕机了,此时CommitLog和Index消息并未处理完,导致CommitLog与ConsumeQueue和IndexFile文件中的数据不一致。如果由一部分消息在CommitLog中存在,在ConsumeQueue中不存在,那么这部分消息Consumer将永远无法消费到了,那么Broker是如何保证数据一致性的呢?
StoreCheckPoint介绍
StoreCheckPoint的作用是记录CommitLog,ConsumeQueue和IndexFile的刷盘点,当Broker异常结束时会根据StoreCheckPoint的数据恢复,StoreCheckPoint属性如下
public class StoreCheckpoint { // commitLog最后一条信息的刷盘时间戳 private volatile long physicMsgTimestamp = 0; // consumeQueue最后一个存储单元刷盘时间戳 private volatile long logicsMsgTimestamp = 0; // 最近一个已经写完IndexFile的最后一条记录刷盘时间戳 private volatile long indexMsgTimestamp = 0; }
StoreCheckPoint文件的存储位置是${user.home}/store/checkpoint
,文件的固定长度为4K,但StoreCheckPoint只占用了前24个字节,存储格式如下图所示
StoreCheckPoint时间戳更新时机
physicMsgTimestamp
FlushRealTimeService刷盘时更新
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { // ... // 更新CommitLog刷盘时间戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
GroupCommitService刷盘时更新
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // ... // 更新CommitLog刷盘时间戳 if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } }
logicsMsgTimestamp
ConsumeQueue保存消息存储单元时更新
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) { // ... // 如果consumeQueue保存成功,则更新ConsumeQueue存储点信息 if (result) { this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp()); } }
ConsumeQueue刷盘时更新并触发StoreCheckPoint刷盘
// org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush private void doFlush(int retryTimes) { // ... // 更新ConsumeQueue存储时间戳,并刷盘 if (0 == flushConsumeQueueLeastPages) { if (logicsMsgTimestamp > 0) { DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp); } // 更新存储点 DefaultMessageStore.this.getStoreCheckpoint().flush(); } }
indexMsgTimestamp
// org.apache.rocketmq.store.index.IndexService#getAndCreateLastIndexFile public IndexFile getAndCreateLastIndexFile() { // 获取最新IndexFile,如果IndexFile已经满了,需要创建一个新的IndexFile if (indexFile == null) { indexFile = new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset, lastUpdateIndexTimestamp); // 如果创建新的IndexFile成功,原IndexFile刷盘 if (indexFile != null) { final IndexFile flushThisFile = prevIndexFile; Thread flushThread = new Thread(new Runnable() { @Override public void run() { // indexFile刷盘 IndexService.this.flush(flushThisFile); } }, "FlushIndexFileThread"); flushThread.setDaemon(true); flushThread.start(); } } return indexFile; } // org.apache.rocketmq.store.index.IndexService#flush public void flush(final IndexFile f) { if (null == f) return; long indexMsgTimestamp = 0; if (f.isWriteFull()) { indexMsgTimestamp = f.getEndTimestamp(); } f.flush(); if (indexMsgTimestamp > 0) { // 更新checkPoint的indexMsgTimestamp并触发刷盘 this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp); this.defaultMessageStore.getStoreCheckpoint().flush(); } }
- 保存消息Index,获取最新的IndexFile如果满了,则会创建一个新的IndexFile,并且更新IndexMsgTimestamp并触发StoreCheckPoint刷盘
StoreCheckPoint刷盘源码
StoreCheckPoint刷盘源码如下所示,就是将CommitLog,ConsumeQueue和IndexFile刷盘时间戳持久化到硬盘上,由上面源码可知它的刷盘触发时机
- ConsumeQueue刷盘时触发
- 创建新IndexFile文件时触发
StoreCheckPoint刷盘源码如下
// org.apache.rocketmq.store.StoreCheckpoint#flush public void flush() { this.mappedByteBuffer.putLong(0, this.physicMsgTimestamp); this.mappedByteBuffer.putLong(8, this.logicsMsgTimestamp); this.mappedByteBuffer.putLong(16, this.indexMsgTimestamp); this.mappedByteBuffer.force(); }
消息加载源码分析
在BrokerController启动时会调用DefaultMessageStore#load
加载存储文件加载和恢复过程主要分为下面几步
- 判断Broker上次是否正常退出。这个判断逻辑是根据
${user.home}/store/abort
是否存在。如果文件存在,说明上次是异常退出,如果文件不存在,则说明是正常退出。 - 加载CommitLog
- 加载ConsumeQueue
- 加载StoreCheckPoint
- 加载IndexFile
- 恢复ConsumeQueue与IndexFile
- 加载延迟队列服务
// org.apache.rocketmq.store.DefaultMessageStore#load public boolean load() { boolean result = true; try { // 1. Broker上次是否正常退出 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); // 2. 加载commitLog result = result && this.commitLog.load(); // 3. 加载consumeQueue result = result && this.loadConsumeQueue(); if (result) { // 4. 加载StoreCheckPoint this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); // 5. 加载IndexFile this.indexService.load(lastExitOK); // 6. 恢复ConsumeQueue与IndexFile this.recover(lastExitOK); // 7. 延迟队列服务加载 if (null != scheduleMessageService) { result = this.scheduleMessageService.load(); } } } return result; }
CommitLog加载
前面文章介绍过,CommitLog文件的存储目录是${user.home}/store/commitlog/
,并且CommitLog文件的底层是MappedFile,由MappedFileQueue管理。
CommitLog文件的加载其实调用的是MappedFileQueue#load
方法,代码如下所示,load()中首先加载CommitLog文件目录下的所有文件,并调用doLoad()方法加载CommitLog。
// org.apache.rocketmq.store.MappedFileQueue#load public boolean load() { File dir = new File(this.storePath/*${user.home}/store/commitlog/*/); File[] ls = dir.listFiles(); if (ls != null) { return doLoad(Arrays.asList(ls)); } return true; }
MappedFile的加载过程如下所示,核心逻辑主要分为下面三步
- 按照文件名称将文件排序,排序好的文件就会按照消息保存的先后顺序存放在列表中
- 校验文件大小与mappedFile是否一致,如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改
- 创建mappedFile,并且设置wrotePosition,flushedPosition,committedPosition为mappedFileSize
public boolean doLoad(List<File> files) { // 按照文件名称排序 files.sort(Comparator.comparing(File::getName)); for (File file : files) { // 如果commitLog文件大小与mappedFileSize不一致,则说明配置被改了,或者CommitLog文件被修改 if (file.length() != this.mappedFileSize) { return false; } try { // 创建MappedFile MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize); mappedFile.setWrotePosition(this.mappedFileSize); mappedFile.setFlushedPosition(this.mappedFileSize); mappedFile.setCommittedPosition(this.mappedFileSize); this.mappedFiles.add(mappedFile); } } return true; }
看到这里肯定会有疑问,加载后的MappedFile的wrotePosition,flushedPosition和committedPosition的值都为mappedFileSize,如果最后一个MappedFile没有使用完,Broker启动后还会从最后一个MappedFile开始写么?我们可以在后面消息文件恢复源码分析找到答案。
ConsumeQueue加载
从前面文章我们知道,ConsumeQueue文件底层其实也是MappedFile,因此ConsumeQueue文件的加载与CommitLog加载差别不大。ConsumeQueue加载逻辑为
- 获取ConsumeQueue目录下存储的所有Topic目录,遍历Topic目录
- 遍历每个Topic目录下的所有queueId目录,逐个加载ququeId中的所有MappedFile
// org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue private boolean loadConsumeQueue() { // 获取consumeQueue目录 File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()/*${user.home}/store */)); // topic文件夹数组 File[] fileTopicList = dirLogic.listFiles(); if (fileTopicList != null) { // 遍历topic for (File fileTopic : fileTopicList) { // 获取topic名称 String topic = fileTopic.getName(); // 获取queueId文件夹数组 File[] fileQueueIdList = fileTopic.listFiles(); // 遍历queueId if (fileQueueIdList != null) { for (File fileQueueId : fileQueueIdList) { int queueId; // 文件夹名称就是queueId queueId = Integer.parseInt(fileQueueId.getName()); // 构建consumeQueue ConsumeQueue logic = new ConsumeQueue(/* ... */); this.putConsumeQueue(topic, queueId, logic); // ConsumeQueue加载 if (!logic.load()) { return false; } } } } } return true; }
IndexFile加载
IndexFile文件加载过程调用的是IndexService#load
,首先获取${user.home}/store/index
目录下的所有文件,遍历所有文件,如果IndexFile最后存储时间大于StoreCheckPoint中indexMsgTimestamp,则会先删除IndexFile
// org.apache.rocketmq.store.index.IndexService#load public boolean load(final boolean lastExitOK) { // indexFile文件目录 File dir = new File(this.storePath); // indexFile文件列表 File[] files = dir.listFiles(); if (files != null) { // 文件排序 Arrays.sort(files); for (File file : files) { try { IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0); f.load(); if (!lastExitOK) { // 文件最后存储时间戳大于刷盘点,则摧毁indexFile,重建 if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()/*存储点时间*/ .getIndexMsgTimestamp()) { f.destroy(0); continue; } } this.indexFileList.add(f); } } } return true; }
ConsumeQueue与IndexFile恢复
如果是正常退出,数据都已经正常刷盘,前面我们说到CommitLog在加载时的wrotePosition,flushedPosition,committedPosition都设置为mappedFileSize,
因此即使是正常退出,也会调用CommitLog#recoverNormally
找到最后一条消息的位置,更新这三个属性。
// org.apache.rocketmq.store.DefaultMessageStore#recover private void recover(final boolean lastExitOK) { // consumeQueue中最大物理偏移量 long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); if (lastExitOK) { // 正常退出文件恢复 this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { // 异常退出文件恢复 this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } // 恢复topicQueueTable this.recoverTopicQueueTable(); }
正常恢复的源码如下,由于Broker是正常关闭,因此CommitLog,ConsumeQueue与IndexFile都已经正确刷盘,并且三者的消息是一致的。正常恢复的主要目的是找到找到最后一条消息的偏移量,然后更新CommitLog的MappedFileQueue中的刷盘点(flushWhere)和提交点(committedWhere),
- 从最后3个mappedFile开始恢复,如果mappedFile总数不足3个,则从第0个mappedFile开始恢复
- 逐个遍历mappedFile,找到每个MappedFile的最后一条消息的偏移量,并将其更新到CommitLog中MappedFileQueue的刷盘点和提交点中
- 清除ConsumeQueue冗余数据
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { // 确认消息是否完整,默认是true boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 默认从最后3个mappedFile开始恢复 int index = mappedFiles.size() - 3; // 如果commitLog不足三个,则从第一个文件开始恢复 if (index < 0) index = 0; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); // 最后一个MappedFile的文件起始偏移量 long processOffset = mappedFile.getFileFromOffset(); // mappedFileOffset偏移量 long mappedFileOffset = 0; // 遍历CommitLog文件 while (true) { // 校验消息完整性 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); // 获取消息size int size = dispatchRequest.getMsgSize(); // 返回结果为true并且消息size>0,说明消息是完整的 if (dispatchRequest.isSuccess() && size > 0) { mappedFileOffset += size; } } // 最大物理偏移量 processOffset += mappedFileOffset; // 更新flushedWhere和committedPosition指针 this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue冗余数据 if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset/*CommitLog最大物理偏移量*/); } } }
异常恢复源码如下,由于上次Broker没有正常关闭,因此由可能存在CommitLog、ConsumeQueue与IndexFile不一致的情况,因此在异常恢复时可能需要恢复ConsumeQueue和IndexFile,异常恢复核心逻辑主要包括
- 倒序查CommitLog的mappedFile文件,找到第一条消息存储的时间戳比StoreCheckPoint里的physicMsgTimestamp,logicsMsgTimestamp和indexMsgTimestamp三者都小的最大MappedFile,该mappedFile至少有一部分消息是被正常转发,正常存储,正常刷盘的
- 从该mappedFile开始逐条转发消息,重新恢复ConsumeQueue和IndexFile
- 当遍历到最后一条消息,将其偏移量更新到CommitLog中MappedFileQueue的刷盘点和提交点中
- 清除ConsumeQueue冗余数据
// org.apache.rocketmq.store.CommitLog#recoverAbnormally public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { // 是否CRC校验 boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 最后一个mappedFile的index int index = mappedFiles.size() - 1; MappedFile mappedFile = null; // 倒序遍历mappedFile数组, for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); // 1. 如果第一条消息的时间戳小于存储点时间戳 if (this.isMappedFileMatchedRecover(mappedFile)) { break; } } long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { mappedFileOffset += size; // 2. 转发消息 if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()/*消息是否可以重复,默认是false*/) { if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { this.defaultMessageStore.doDispatch(dispatchRequest); } } else { this.defaultMessageStore.doDispatch(dispatchRequest); } } } // 3. 更新MappedFileQueue中的刷盘位置和提交位置 processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); // 清除ConsumeQueue中的冗余数据 if (maxPhyOffsetOfConsumeQueue >= processOffset) { this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } }
总结
Broker启动时会分别加载CommitLog、ConsumeQueue与IndexFile。加载完成后,如果Broker上次是正常退出,只需要找到CommitLog的最后一条消息,并更新刷盘点和提交点。如果Broker上次是异常退出,就有可能出现ConsumeQueue、IndexFile与CommitLog不一致的情况,需要根据StoreCheckPoint存储的时间戳从CommitLog找到消息,逐条恢复ConsumeQueue与IndexFile。
以上就是RocketMQ | 源码分析】消息存储文件的加载与恢复机制的详细内容,更多关于RocketMQ 消息存储文件加载恢复的资料请关注脚本之家其它相关文章!