RocketMQ事务消息机制详解
作者:智由静生
RocketMQ事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致,从而实现了可靠消息服务。
一、事务消息的实现步骤
事务消息发送步骤:
1. 发送方将半事务消息发送至RocketMQ服务端。
2. RocketMQ服务端将消息持久化之后,向发送方返回Ack确认消息已经发送成功。由于消息为半事务消息,在未收到生产者对该消息的二次确认前,此消息被标记成“暂不能投递”状态。
3. 发送方开始执行本地事务逻辑。
4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。
事务消息回查步骤:
1. 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行操作。
二、程序实现
事务消息处理类需要继承RocketMQLocalTransactionListener类。该类的executeLocalTransaction方法负责在接到RocketMQ服务端的Ack确认消息后执行本地方法,也就是事务消息发送步骤中的步骤3。该类的checkLocalTransaction方法负责,在断网或者是应用重启的特殊情况下,执行RocketMQ服务端的消息回查,也就是事务消息回查步骤中的步骤2。
此外,要使该类生效,还需要加@RocketMQTransactionListener注解。这里有个要特别注意的地方。在2.1.0版本前,这个注解有一个属性txProducerGroup,可以用多个@RocketMQTransactionListener来监听不同的txProducerGroup来发送不同类型的事务消息到topic。但是现在在一个项目中,如果你在一个project中写了多个@RocketMQTransactionListener,项目将不能启动,启动会报错。产生这个问题的原因据说是,当使用RocketMQTemplate并发的执行事务时,非常容易出现"illegal state"的异常,原因是一个TransactionProducer在执行事务时不能被共享。所以,必须使用同一个TransactionMQProducer来发送所有类型的事务消息。当然同理也就必须使用一个侦听器处理所有的消息了。
既然必须使用同一个TransactionMQProducer,对于比较大的应用,业务场景很多,就会造成混乱。这里我给出一个方案抛砖引玉。TransactionMQProducer在发送消息时,是可以传递参数对象和指定消息头的。可以把要执行的本地方法的bean名和方法名放进去。
//发送半事务消息 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( topicAndTag, MessageBuilder.withPayload(msg) .setHeader(Constants.TX_ID_HEADER_NAME, msg.getTxId()) .setHeader(Constants.CHECK_BEAN_ID_HEADER_NAME, def.getCheckBeanId()) .setHeader(Constants.BIZ_ID_HEADER_NAME, msg.getBizId()) .build(), def );
其中def就是参数对象,可以自定义对象,这里是我自定义的TransactionMsgDefinationDto类,可以把想传递的信息放进去,最重要的是要执行的本地方法的bean名和方法名和方法执行参数:executeBeanId(bean名)、executeBeanMethod(方法名)、executeBeanParams(方法执行参数)。该对象可以传给RocketMQLocalTransactionListener的executeLocalTransaction方法,然后通过反射执行。
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { //保存消息记录 String body = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8); JSONObject jsonBody = JSONObject.parseObject(body); BaseMsgDto dto = JSONObject.toJavaObject(jsonBody, BaseMsgDto.class);//(BaseMsgDto)msg.getPayload(); TransactionMsgDefinationDto def = (TransactionMsgDefinationDto)arg; ProducerLog producerLog = BeanCopyUtils.copyProperties(def, ProducerLog::new); String[] tags = def.getMsgTags(); if(tags !=null && tags.length > 0) { StringBuilder tag = new StringBuilder(); for(int i = 0; i<tags.length; i++) { tag.append(tags[0]); if(i != tags.length-1) { tag.append("||"); } } producerLog.setMsgTag(tag.toString()); } producerLog.setBizId(dto.getBizId()); producerLog.setTxId(dto.getTxId()); producerLog.setBizType(dto.getBizType()); producerLog.setGroupName(dto.getProducerGroup()); producerLog.setMsgBody(body); producerLogService.save(producerLog); //执行事务方法 SpringUtil.invokeBeanMethod(def.getExecuteBeanId(), def.getExecuteBeanMethod(), def.getExecuteBeanParams()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { logger.error("发生错误:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
放在消息头header中的数据可以传递给RocketMQLocalTransactionListener的checkLocalTransaction方法,然后同样通过反射执行。
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { try { String txId = (String)msg.getHeaders().get(Constants.TX_ID_HEADER_NAME); String checkBeanId = (String)msg.getHeaders().get(Constants.CHECK_BEAN_ID_HEADER_NAME); Long bizId = Long.parseLong((String)msg.getHeaders().get(Constants.BIZ_ID_HEADER_NAME)); //执行检查方法 Boolean ret = (Boolean)SpringUtil.invokeBeanMethod(checkBeanId, "check", new Object[]{bizId, txId}); if(ret.booleanValue()) return RocketMQLocalTransactionState.COMMIT; else return RocketMQLocalTransactionState.ROLLBACK; } catch (Exception e) { logger.error("发生错误:", e); return RocketMQLocalTransactionState.UNKNOWN; } }
到此这篇关于RocketMQ事务消息机制详解的文章就介绍到这了,更多相关RocketMQ事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!