java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > RocketMQ 消息Message

RocketMQ 消息Message的结构和使用方式详解

作者:csdn_tom_168

Message是RocketMQ的数据包,它不仅是业务数据的载体,更是路由、过滤、追踪、延迟、事务等功能的基础,掌握Message,你就掌握了RocketMQ的语言,本文给大家介绍什么是 Message及理解Message的结构、属性、生命周期和使用方式,感兴趣的朋友一起看看吧

🚀 RocketMQ 消息(Message)详解

在 Apache RocketMQ 中,消息(Message) 是数据传输的最小单元,是生产者与消费者之间通信的“载体”。理解 Message 的结构、属性、生命周期和使用方式,是掌握 RocketMQ 的核心基础。

推荐阅读:深入理解Apache RocketMQ 中Message 消息的核心概念

一、什么是 Message?

✅ 定义:

Message 是 RocketMQ 中封装实际业务数据的对象,包含消息体(Body)和一系列元数据(如 Topic、Tag、Key、Properties 等),用于在生产者与消费者之间传递信息。

类比:就像一封信,信纸是内容(Body),信封上写着收件人(Topic)、标签(Tag)、编号(Key)等信息。

二、Message 的核心结构

一个 Message 对象主要由以下几个部分组成:

字段类型是否必填说明
TopicString✅ 必填消息所属的主题,用于路由和分类
Bodybyte[]✅ 必填消息的实际内容,通常为序列化后的 JSON、Protobuf 等
TagsString❌ 可选子分类标签,用于消费者过滤(如 CREATE, CANCEL
KeysString❌ 可选消息的唯一键或业务主键(如订单号),用于排查、索引
Flagint❌ 可选消息标志位(如是否压缩)
DelayTimeLevelint❌ 可选延迟消息级别(1~18),实现定时投递
PropertiesMap<String, String>❌ 可选自定义属性,RocketMQ 内部也使用它存储系统属性

三、Message 各字段详解

1.Topic(主题)

new Message("ORDER_TOPIC", ...);

2.Body(消息体)

String content = "{\"orderId\":\"1001\",\"userId\":10086}";
Message msg = new Message(topic, tag, content.getBytes(StandardCharsets.UTF_8));

⚠️ 注意:

  • 单条消息大小默认最大 4MB(可配置)
  • 过大消息会影响性能,建议拆分或使用外部存储(如上传文件后传 URL)

3.Tags(标签)

// 发送
new Message("ORDER_TOPIC", "CREATE", "创建订单".getBytes());
new Message("ORDER_TOPIC", "PAY", "支付完成".getBytes());
// 订阅 CREATE 类型消息
consumer.subscribe("ORDER_TOPIC", "CREATE");

✅ 优势:轻量级过滤,避免消费者接收无关消息。

⚠️ 注意:Tags 是字符串匹配,不支持正则(但支持 * 通配和 || 多选)

4.Keys(消息键)

Message msg = new Message(...);
msg.setKeys("ORDER_20240501001");

✅ 建议:关键业务消息务必设置 Keys,便于追踪。

5.Properties(属性)

msg.putUserProperty("traceId", "abc123");
msg.putUserProperty("source", "web");

⚠️ 注意:系统属性以 PREFIX_SYS_PROP 开头,不要冲突。

6.DelayTimeLevel(延迟级别)

级别时间
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h
Message msg = new Message("DELAY_TOPIC", "TAG", "延迟消息".getBytes());
msg.setDelayTimeLevel(5); // 延迟1分钟
producer.send(msg);

⚠️ 注意:延迟消息不保证精确时间,存在轻微误差。

四、Message 的生命周期

1. 生产者创建 Message 对象
   ↓
2. 发送到 Broker(写入 CommitLog)
   ↓
3. 构建 ConsumeQueue 和 IndexFile
   ↓
4. 消费者拉取消息(根据 Topic + Queue)
   ↓
5. 处理成功 → 提交 Offset
   ↓
6. 消息过期(默认 72 小时)→ 被删除

✅ 消息是持久化存储的,即使消费者未上线,消息也不会丢失。

五、Message 的存储机制

虽然 Message 是逻辑对象,但在 Broker 端有严格的物理存储结构:

1.CommitLog

2.ConsumeQueue

ConsumeQueue/{Topic}/{QueueId}/
   ├── 00000000000000000000
   └── ...

3.IndexFile

IndexFile/index_1714567890000

六、Message 的发送方式回顾

方式说明
同步发送阻塞等待结果,适用于关键消息
异步发送回调通知结果,高吞吐场景
单向发送不关心结果,日志类消息
事务消息半消息 + 本地事务 + 提交/回滚

所有方式发送的都是 Message 对象。

七、最佳实践与注意事项

实践说明
✅ 设置 Topic 和 Tag合理分类,便于管理和过滤
✅ 关键消息设置 Keys便于通过 mqadmin 查询
✅ 控制 Body 大小≤ 4MB,避免影响性能
✅ 使用 UTF-8 编码防止乱码
✅ 避免空 Body可能导致异常
✅ 合理使用延迟消息替代部分定时任务,但不要滥用
✅ 自定义属性用 putUserProperty避免覆盖系统属性

八、常见问题排查

问题原因解决方案
MessageExt is null拉取超时或无消息正常现象,重试即可
msg put message to store error消息过大或磁盘满检查大小限制和磁盘空间
延迟消息未按时投递时间误差或 Broker 压力大接受轻微延迟,或使用外部调度系统
通过 Key 查不到消息IndexFile 未生成或已过期检查 messageIndexEnable 配置
消息重复网络重试、Rebalance消费者做幂等处理

✅ 总结:Message 核心要点

维度说明
角色消息传输的基本单元
组成Topic + Body + Tags + Keys + Properties + Delay
大小限制默认 ≤ 4MB
存储方式顺序写 CommitLog,索引通过 ConsumeQueue 和 IndexFile
可查询性支持按 Key、时间、Offset 查询
扩展性支持自定义属性,灵活传递上下文
高级功能支持延迟、事务、顺序消息

🚀 一句话总结:

Message 是 RocketMQ 的“数据包” —— 它不仅是业务数据的载体,更是路由、过滤、追踪、延迟、事务等功能的基础。
设计好 Message 的结构与属性,才能让消息系统真正高效、可靠、易维护。

掌握 Message,你就掌握了 RocketMQ 的“语言”。

到此这篇关于RocketMQ 消息Message的结构和使用方式详解的文章就介绍到这了,更多相关RocketMQ 消息Message内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文