目录
  • 简介
  • 原理
  • 具体实现
    • 消费者
    • 消费者
    • 生产者消息监听器
  • 消息事务测试
    • 正常测试
    • 异常测试
    • 代码调整
    • 执行结果
  • 总结

    简介

    RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

    原理

    RocketMQ事务消息通过异步确保方式,保证事务的最终一致性。设计的思想可以借鉴两个阶段提交事务。其执行流程图如下:

    SpringBoot集成RocketMQ发送事务消息的原理解析

    • 发送方向MQ服务端发送消息。
    • MQ Server将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
    • 发送方开始执行本地事务逻辑。
    • 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
    • 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
    • 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    • 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

    具体实现

    消费者

    @Component
    public class TransactionProduce
    {
        private Logger logger = LoggerFactory.getLogger(getClass());
        
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        
        public void sendTransactionMessage(String msg)
        {
            logger.info("start sendTransMessage hashKey:{}",msg);
           
             Message message =new Message();
             message.setBody("this is tx message".getBytes());
             TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq", 
                     MessageBuilder.withPayload(message).build(), msg);
             
             //发送状态
             String sendStatus = result.getSendStatus().name();
             // 本地事务执行状态
             String localTxState = result.getLocalTransactionState().name();
             logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
        } 
    }

    说明:发送事务消息采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等。

    消费者

    @Component
    @RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING)
    public class TransactionConsumer implements RocketMQListener<String>
    {
        private Logger logger =LoggerFactory.getLogger(getClass());
        @Override
        public void onMessage(String message)
        {
            logger.info("send transaction mssage parma is:{}", message);
        }
    }

    说明:发送事务消息的消费者与普通的消费者一样没有太大的区别。

    生产者消息监听器

    发送事务消息除了生产者和消费者以外,我们还需要创建生产者的消息监听器,来监听本地事务执行的状态和检查本地事务状态。

    @RocketMQTransactionListener
    public class TransactionMsgListener implements RocketMQLocalTransactionListener
    {
        private Logger logger = LoggerFactory.getLogger(getClass());
        /**
         * 执行本地事务
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
                Object obj)
        {
            logger.info("start invoke local rocketMQ transaction");
            RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
            
            try
            {
                //处理业务
                String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                logger.info("invoke msg content:{}",jsonStr);
            }
            catch (Exception e)
            {
                logger.error("invoke local mq trans error",e);
                resultState = RocketMQLocalTransactionState.UNKNOWN;
            }
            
            return resultState;
        }
    
        /**
         * 检查本地事务的状态
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
        {
            logger.info("start check Local rocketMQ transaction");
            
            RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
            
            try
            {
                String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                logger.info("check trans msg content:{}",jsonStr);
            }
            catch (Exception e)
            {
                resultState  = RocketMQLocalTransactionState.ROLLBACK;
            }
            return resultState;
        }
    }
    

    说明:RocketMQ本地事务状态由如下几种:

    • RocketMQLocalTransactionState.COMMIT:提交事务,允许消费者消费此消息。
    • RocketMQLocalTransactionState.ROLLBACK: 回滚事务,消息将被删除,不允许被消费。
    • RocketMQLocalTransactionState.UNKNOWN:中间状态,代表需要进行检查来确定状态。

    注意:Spring Boot2.0的版本之后,@RocketMQTransactionListener 已经没有了txProducerGroup属性,且sendMessageInTransaction方法也将其移除。所以在同一项目中只能有一个@RocketMQTransactionListener,不能出现多个,否则会报如下错误:

    java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener

    消息事务测试

    正常测试

    c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"}
    c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction
    c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
    c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE
    c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
    

    说明:通过日志我们可以看出,执行的流程与上述的一致,执行成功后,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为COMMIT_MESSAGE。

    异常测试

    如果在执行本地消息时出现异常,那么执行结果会是怎样?修改下本地事务执行的方法,让其出现异常。

    代码调整

      @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
                Object obj)
        {
            logger.info("start invoke local rocketMQ transaction");
            RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
            
            try
            {
                //处理业务
                String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                logger.info("invoke local transaction msg content:{}",jsonStr);
                 int c=1/0;
            }
            catch (Exception e)
            {
                logger.error("invoke local mq trans error",e);
                resultState = RocketMQLocalTransactionState.UNKNOWN;
            }
            
            return resultState;
        }
    

    执行结果

    c.s.fw.mq.produce.TransactionProduce – send tx message sendStatus:SEND_OK,localTXState:UNKNOW

    从执行的结果可以看出,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为:UNKNOW.所以消费端无法消费此消息。

    总结

    声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。