今天介绍下如何在 Spring Boot 3 中与 RocketMQ 整合实现分布式事务。RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案,一种分布式事务处理模式。下面详细介绍下 RocketMQ 如何实现事务消息。
1. 基础概念
在 RocketMQ 中,半消息(Half Message)主要用于实现事务消息。它是指生产者在发送事务消息时,RocketMQ 会先将消息保存为 半消息,等待事务状态的最终确认,确保消息的可靠性和一致性。
半消息的工作流程:
- 发送半消息:生产者首先将消息发送到 RocketMQ,RocketMQ 将其标记为半消息,并暂时存储到消息队列中,但这时消费者不会收到该消息。
- 执行本地事务:生产者在发送半消息后,开始执行自己的本地事务操作。
- 提交或回滚消息:
- 如果本地事务成功,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费者。
- 如果本地事务失败,生产者会通知 RocketMQ回滚消息,RocketMQ 会删除该半消息。
- 事务状态回查:如果 RocketMQ 没有收到生产者的事务状态确认,RocketMQ 会通过回查机制询问生产者事务的最终状态,确保消息的一致性。
现有一个案例,后端文件上传接口,同时上传 OSS 和 MySQL,数据库负责文件元信息的增删改查, OSS负责存储文件对象,如何保证最终一致性?下面用RocketMQ 的事务消息来实现最终一致性。
2. 准备工作
请参考《重学SpringBoot3-集成RocketMQ(一)》进行环境搭建和配置工作。配置文件新增如下配置:
consumer2:
group: springboot-consumer-group2 # 新的消费者组名称
topic: transaction-topic # 订阅新的主题
access-key: RocketMQ # 若启用了 ACL 功能
secret-key: 12345678 # 若启用了 ACL 功能
3. 实现事务消息的生产者
创建一个事务消息的生产者类,通过事务生产者发送消息,并处理本地事务逻辑。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class TransactionalMessageProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
*/
public void sendTransactionMessage(String topic, String message) {
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);
System.out.println("Transaction message sent: " + sendResult.getLocalTransactionState());
}
}
4. 事务监听器实现
通过实现 RocketMQLocalTransactionListener
接口,定义事务的提交或回滚逻辑。
package com.example.boot308rocketmq;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* @author CoderJia
* @create 2024/09/12 15:06
* @Description
**/
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务逻辑,根据业务情况返回事务的提交或回滚状态
try {
// 模拟本地事务处理逻辑
System.out.println("Executing local transaction...");
boolean success = performLocalTransaction();
if (success) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 事务回查逻辑,确认本地事务的最终状态
System.out.println("Checking local transaction...");
// 根据本地事务的处理结果返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGE
System.out.println("local transaction check success");
return RocketMQLocalTransactionState.COMMIT;
}
private boolean performLocalTransaction() {
// TODO 模拟本地事务处理文件上传OSS
try {
System.out.println("Upload files to OSS...");
Thread.sleep(3000);
System.out.println("File upload to OSS completed");
return true;
} catch (InterruptedException e) {
System.out.println("Failed to upload file to OSS");
return false;
}
}
}
5. 消费者示例
创建一个消费者,订阅并消费事务消息。
RocketMQListener<String>
是一个接口类型,用于定义一个 RocketMQ 消息监听器,它指定接收的消息类型为 String。在 RocketMQ 中,消费者可以通过实现 RocketMQListener 接口来自动处理接收到的消息。
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionalMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理接收到的消息
System.out.printf("Received message: %s%n", message);
}
}
6. 发送事务消息
在服务中调用事务消息生产者:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Resource
private RocketMQProducer rocketMQProducer;
@GetMapping("/sendTransactionMessage")
public ResponseEntity<String> sendTransactionMessage(@RequestParam String message) {
rocketMQProducer.sendTransactionMessage("transaction-topic", message);
return ResponseEntity.ok("Transaction message sent: " + message);
}
}
7. 测试
7.1 模拟本地事务正常提交
如下图观察到,当本地事务即文件上传完成之后,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费进行消费。
7.2 模拟本地事务提交失败,未回查
在 RocketMQ 中,如果消息生产者没有在规定的时间内向消息队列确认事务状态,RocketMQ 会通过回查机制(即“回滚检查”或“事务回查”)来询问生产者事务的最终状态,从而确保消息的一致性。broker.conf 中配置 transactionCheckMax=10000
表示 RocketMQ 最长等待 10 秒后进行事务状态回查。
7.3 模拟本地事务提交失败,回查成功
本人搭建 RocketMQ 设置的回查时间为15s,所以将本地事务执行时间修改为 16s,这样会触发 RocketMQ 进行事务状态回查。
7.4 模拟本地事务提交失败,回查失败
修改回查方法的返回值,让RocketMQ 回查本地状态值将消息进行回滚,消费者同样不会消费消息。
7.5 模拟本地事务提交成功,消费失败
例如生产者本地事务执行成功,但是消费者消费失败的情况,RocketMQ 会进行消息重试。 修改一下消费者处理逻辑:
package com.example.boot308rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* @author CoderJia
* @create 2024/09/12 15:06
* @Description
**/
@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "springboot-consumer-group2")
public class TransactionalMessageConsumer02 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
try {
// 处理消息的业务逻辑
String msgBody = new String(message.getBody(), "UTF-8");
log.info("Received message:{}", msgBody);
// 模拟业务处理
boolean success = processBusinessLogic(msgBody);
if (!success) {
throw new RuntimeException("Business processing failure");
}
log.info("Business processing successful");
} catch (Exception e) {
log.error("MsgID:{},reconsumeTimes:{},e:{}", message.getMsgId(), message.getReconsumeTimes(), e.getMessage());
// 重新抛出异常,让 RocketMQ 进行重试
throw new RuntimeException("Message consumption failed, retrying");
}
}
private boolean processBusinessLogic(String message) {
// 这里是业务逻辑,返回 true 表示成功,false 表示失败
// 例如:数据库操作或其他远程调用
return Math.random() > 0.5; // 模拟随机成功或失败
}
}
关键点总结
- 事务消息发送:通过
RocketMQTemplate.sendMessageInTransaction()
方法发送事务消息。 - 本地事务处理:实现
TransactionListener
接口的executeLocalTransaction()
方法处理本地事务逻辑。 - 事务回查:在
checkLocalTransaction()
方法中定义如何检查事务消息的最终状态。
这个示例展示了如何在 Spring Boot 3 中整合 RocketMQ,并实现事务消息的生产和消费。