使用Canal监听MySQL Binlog日志的实现方案
作者:Hello Dam
引入
原始实现
首先看下面的代码,这段代码的作用是关闭超时未支付的订单,包括两个步骤
1、将订单状态修改为取消状态
2、调用远程服务,恢复数据库和缓存中的库存
@Override @Transactional(rollbackFor = Throwable.class) public void closeOrder(String orderSn) { OrderDO orderDO = baseMapper.selectByOrderSn(orderSn); if (orderDO.getOrderStatus().equals(OrderStatusConstant.UN_PAID)) { // --if-- 到时间了,订单还没有支付,取消该订单 // 修改订单状态为取消状态 orderDO.setOrderStatus(OrderStatusConstant.CANCEL); // 分片键不能更新 orderDO.setVenueId(null); baseMapper.updateByOrderSn(orderDO); // 还原数据库和缓存中的库存 Result<OrderDO> result = null; try { result = timePeriodFeignService.release(TimePeriodStockRestoreReqDTO.builder() .timePeriodId(orderDO.getTimePeriodId()) .partitionId(orderDO.getPartitionId()) .courtIndex(orderDO.getCourtIndex()) .userId(orderDO.getUserId()) .build()); } catch (Exception e) { // --if-- 库存恢复远程接口调用失败 throw new ServiceException(BaseErrorCode.REMOTE_ERROR); } if (result != null && !result.isSuccess()) { // 因为使用了Transactional,如果这里出现了异常,订单的关闭修改会回退 throw new ServiceException("调用远程服务释放时间段数据库库存失败", BaseErrorCode.SERVICE_ERROR); } } }
存在问题
为了确保这两个步骤要么全部成功,要么全部失败,在这段代码中,使用了@Transactional
注解来管理本地数据库事务。如果说调用远程服务恢复库存时,调用失败,事务会进行回滚,即订单状态还是保持原样,不会被取消。然而,在分布式环境中,当涉及到调用远程服务时,@Transactional
只能保证本地事务的一致性,而不能保证跨服务的一致性。例如在极端情况下会出现如下问题:
- 远程服务实际上已经成功处理了请求,完成了库存的恢复。
- 但由于网络延迟或中断,本地服务未能接收到远程服务的成功响应。
- 结果是本地服务认为库存恢复失败,触发了本地事务的回滚,使订单状态回到未取消的状态。
这种情况下,就会产生事务不一致的问题:库存已经被正确地恢复,但订单仍然处于可支付状态。这可能导致客户继续尝试支付一个实际上应该被取消的订单,或者导致库存数据与订单状态之间的不匹配。
替代方案
开启 MySQL 的 Binlog 日志,通过 Canal 监听订单状态的变化并异步发送消息至消息队列。消费者从队列中接收消息后,如果检测到订单的状态是从未支付
修改为已取消
,就负责调用库存服务恢复商品库存。
这种方式解耦了订单服务与库存服务,提高了系统的容错性和处理效率,支持异步操作和流量削峰,确保了最终一致性,并通过幂等性设计保障了数据的准确性和系统的稳定性。
**为什么说确保了最终一致性?**当订单关闭之后,消息队列会保证消息至少被成功消费一次,即库存如果还原失败,消息队列会多次重发消息,如果达到重发上限可以接入人工来处理死信队列的消息
操作
MySQL 开启 Binlog
log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
通过 CMD 命令行窗口重启 MySQL 数据库,让配置生效
C:\Windows\System32>net stop mysql8 mysql8 服务正在停止.. mysql8 服务已成功停止。 C:\Windows\System32>net start mysql8 mysql8 服务正在启动 . mysql8 服务已经启动成功。
连接进入MySQL之后,使用show variables like 'log_%';
查看BinLog启动是否成功,如果查询出来log_bin
对应的值为ON,说明启动成功
C:\Windows\System32>mysql -u root -p12345678 mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 9 Server version: 8.0.27 MySQL Community Server - GPL Copyright (c) 2000, 2021, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> show variables like 'log_%'; +----------------------------------------+----------------------------------------------------------------------------------+ | Variable_name | Value | +----------------------------------------+----------------------------------------------------------------------------------+ | log_bin | ON | | log_bin_basename | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin | | log_bin_index | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\mysql-bin.index | | log_bin_trust_function_creators | OFF | | log_bin_use_v1_row_events | OFF | | log_error | D:\Development\Sql\Mysql\mysql8\exe\mysql-8.0.27-winx64\data\DESKTOP-TQSE9JO.err | | log_error_services | log_filter_internal; log_sink_internal | | log_error_suppression_list | | | log_error_verbosity | 2 | | log_output | FILE | | log_queries_not_using_indexes | OFF | | log_raw | OFF | | log_replica_updates | ON | | log_slave_updates | ON | | log_slow_admin_statements | OFF | | log_slow_extra | OFF | | log_slow_replica_statements | OFF | | log_slow_slave_statements | OFF | | log_statements_unsafe_for_binlog | ON | | log_throttle_queries_not_using_indexes | 0 | | log_timestamps | UTC | +----------------------------------------+----------------------------------------------------------------------------------+ 21 rows in set, 1 warning (0.01 sec)
给canal创建一个单独使用的账号来进行 Binlog 的同步和监听
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
执行成功,MySQL的user表就多了一天canal的记录
mysql> CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES; Query OK, 0 rows affected (0.01 sec) Query OK, 0 rows affected (0.00 sec) Query OK, 0 rows affected (0.00 sec)
Canal 中间件
下载方式
源码地址:https://github.com/alibaba/canal
下载地址:https://github.com/alibaba/canal/releases
解压之后,目录如下:
修改配置文件
instance.properties
首先修改配置文件instance.properties
- 修改
canal.instance.master.address
,指向真正的 MySQL 的 IP 和端口 - 修改
canal.mq.topic
,声明发送到消息队列的消息的 Topic
如果你给canal提供的账号密码不是canal,需要修改
最后,并不是所有数据库或数据表改动我们都需要做出反应的,这里我只针对我需要监听的数据库和数据表即可,通过设置canal.instance.filter.regex
进行过滤
我的设置为canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))$
,解释如下:
^
表示字符串的开始。(venue-reservation)
匹配名为venue-reservation
的数据库。这里使用了括号()
来创建一个捕获组。如果说项目使用分库,需要匹配多个数据库的话,可以这样写(venue-reservation_0|venue-reservation_1|venue-reservation_2)
\\.
匹配实际的点号,这是数据库名和表名之间的分隔符。(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1]))
匹配特定模式的表名。这里也是用括号创建了一个捕获组。因为项目对time_period_order
进行了分表,所以需要这样设置。具体地:time_period_order_
匹配固定前缀time_period_order_
。([0-9]|1[0-9]|2[0-9]|3[0-1])
这一部分是用来匹配数字部分,看起来像是为了匹配类似于时间间隔或者编号的表。更具体地说:[0-9]
匹配从0到9的任何数字。1[0-9]
匹配从10到19的两位数。2[0-9]
匹配从20到29的两位数。3[0-1]
匹配30或31。
$
表示字符串的结束。
canal.properties
修改配置文件canal.properties
- 因为我所使用的消息队列是RocketMQ,首先将模式
canal.serverMode
设置为RocketMQ - 将
rocketmq.namesrv.addr
指向的RocketMQ服务器指向正确的IP和端口
启动
启动Canal,如果是win,直接双击startup.bat
启动即可
下图启动之后弹出的窗口,如果要关闭 canal ,就点右上角的 x 即可
想要查看 canal 是否启动成功,可以通过日志文件查看,如果出现如下红色部分的输出,说明启动成功
测试
将订单状态从0改成2
去RocketMQ中查看收到的消息,topic选择刚刚配置文件中设置的vrs_canal_common_topic
刚刚接收到的消息详情如下
使用json格式化工具查看
{ "data": [ { "order_sn": "1866821518450221056850432", "is_deleted": "0", "order_time": "2024-12-11 20:25:09", "venue_id": "1865271207637635072", "partition_id": "1865276571322015744", "partition_index": "0", "time_period_id": "1866776397058904064", "user_id": "1864637732760850432", "order_status": "2", "payment_method": null, "transaction_id": null, "pay_time": null, "pay_amount": null, "refund_status": null, "refund_amount": null, "refund_time": null } ], "database": "venue-reservation", "es": 1734228903000, "gtid": "", "id": 16, "isDdl": false, "mysqlType": { "order_sn": "varchar(30)", "is_deleted": "tinyint", "order_time": "datetime", "venue_id": "bigint", "partition_id": "bigint", "partition_index": "int", "time_period_id": "bigint", "user_id": "bigint", "order_status": "tinyint", "payment_method": "tinyint", "transaction_id": "varchar(255)", "pay_time": "datetime", "pay_amount": "decimal(10,2)", "refund_status": "tinyint", "refund_amount": "decimal(10,2)", "refund_time": "datetime" }, "old": [ { "order_status": "0" } ], "pkNames": [ "order_sn" ], "sql": "", "sqlType": { "order_sn": 12, "is_deleted": -6, "order_time": 93, "venue_id": -5, "partition_id": -5, "partition_index": 4, "time_period_id": -5, "user_id": -5, "order_status": -6, "payment_method": -6, "transaction_id": 12, "pay_time": 93, "pay_amount": 3, "refund_status": -6, "refund_amount": 3, "refund_time": 93 }, "table": "time_period_order_0", "ts": 1734228903999, "type": "UPDATE" }
消息监听处理
实体类
首先定义一个实体类,用来接收Canal推送过来的消息
import lombok.Data; import java.util.List; import java.util.Map; /** * 用来接收canal发送过来的消息的数据 * @Author dam * @create 2024/12/10 14:11 */ @Data public class CanalBinlogDTO { /** * 变更之后的数据 */ private List<Map<String, Object>> data; /** * 数据库名称 */ private String database; /** * es 是指 Mysql Binlog 里原始的时间戳,也就是数据原始变更的时间 * Canal 的消费延迟 = ts - es */ private Long es; /** * 递增 ID,从 1 开始 */ private Long id; /** * 当前变更是否是 DDL 语句 */ private Boolean isDdl; /** * 表结构字段类型 */ private Map<String, Object> mysqlType; /** * 修改之前的旧数据 */ private List<Map<String, Object>> old; /** * 主键名称 */ private List<String> pkNames; /** * SQL 语句 */ private String sql; /** * SQL 类型 */ private Map<String, Object> sqlType; /** * 表名 */ private String table; /** * ts 是指 Canal 收到这个 Binlog,构造为自己协议对象的时间 * 应用消费的延迟 = now - ts */ private Long ts; /** * INSERT(新增)、UPDATE(更新)、DELETE(删除)等等 */ private String type; }
监听
获取到消息之后,如果判断到所做的修改是UPDATE
类型,而且修改的是订单号,即oldDataMap.containsKey("order_status")
,则进一步判断是否为将订单号从0
修改为2
,如果是则调用恢复库存方法
import cn.hutool.core.util.ObjectUtil; import com.vrs.annotation.Idempotent; import com.vrs.constant.OrderStatusConstant; import com.vrs.constant.RocketMqConstant; import com.vrs.domain.dto.mq.CanalBinlogDTO; import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO; import com.vrs.enums.IdempotentSceneEnum; import com.vrs.service.TimePeriodService; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import java.util.Map; /** * @Author dam * @create 2024/12/10 14:12 */ @Slf4j(topic = RocketMqConstant.CANAL_TOPIC) @Component @RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP, messageModel = MessageModel.CLUSTERING ) @RequiredArgsConstructor public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> { private final TimePeriodService timePeriodService; /** * 消费消息的方法 * 方法报错就会拒收消息 * * @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数 */ @Idempotent( uniqueKeyPrefix = "canal_binlog_common:", key = "#canalBinlogDTO.getId()+''", scene = IdempotentSceneEnum.MQ, keyTimeout = 3600L ) @SneakyThrows @Override public void onMessage(CanalBinlogDTO canalBinlogDTO) { if (canalBinlogDTO.getOld() == null) { return; } Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0); Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0); if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) { log.info("[消费者] 消费canal的消息,恢复时间段的库存和空闲场号,时间段ID:{}", alterDataMap.get("time_period_id")); Long userId = Long.parseLong(alterDataMap.get("user_id").toString()); Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString()); Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString()); Long courtIndex; if (alterDataMap.containsKey("partition_index")) { courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString()); } else { courtIndex = Long.parseLong(alterDataMap.get("court_index").toString()); } Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString()); Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString()); if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) { // 恢复库存 timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder() .userId(userId) .courtIndex(courtIndex) .timePeriodId(timePeriodId) .partitionId(partitionId) .build()); } } } }
以上就是使用Canal监听MySQL Binlog日志的实现方案的详细内容,更多关于Canal监听MySQL Binlog的资料请关注脚本之家其它相关文章!