跳到主要内容

RocketMQ 事务实现源码

介绍

RocketMQ 是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中。事务消息是 RocketMQ 提供的一种高级特性,用于确保消息发送与本地事务的一致性。本文将深入分析 RocketMQ 事务消息的实现源码,帮助初学者理解其核心机制。

什么是事务消息?

事务消息是指消息发送与本地事务的执行保持一致的机制。在分布式系统中,消息发送和本地事务可能分布在不同的节点上,如何确保它们的一致性是一个复杂的问题。RocketMQ 通过两阶段提交(2PC)的方式来实现事务消息。

RocketMQ 事务消息的核心流程

RocketMQ 事务消息的实现主要分为以下几个步骤:

  1. 发送半消息:生产者发送一条半消息到 RocketMQ 服务器,此时消息对消费者不可见。
  2. 执行本地事务:生产者执行本地事务。
  3. 提交或回滚消息:根据本地事务的执行结果,生产者向 RocketMQ 服务器发送提交或回滚请求。
  4. 消息回查:如果 RocketMQ 服务器未收到提交或回滚请求,会定期回查生产者的本地事务状态。

源码分析

1. 发送半消息

在 RocketMQ 中,事务消息的发送通过 TransactionMQProducer 类实现。以下是发送半消息的关键代码:

java
public class TransactionMQProducer extends DefaultMQProducer {
public TransactionMQProducer(String producerGroup) {
super(producerGroup);
}

public SendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
// 发送半消息
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, arg);
}
}

2. 执行本地事务

本地事务的执行由用户自定义实现,通常通过实现 TransactionListener 接口来完成。以下是一个简单的本地事务实现示例:

java
public class LocalTransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 模拟事务执行
boolean success = doBusiness();
if (success) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
return LocalTransactionState.UNKNOW;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查
return LocalTransactionState.COMMIT_MESSAGE;
}
}

3. 提交或回滚消息

根据本地事务的执行结果,生产者会向 RocketMQ 服务器发送提交或回滚请求。以下是提交消息的关键代码:

java
public class DefaultMQProducerImpl implements MQProducerInner {
public SendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
// 发送半消息
SendResult sendResult = this.send(msg);

// 执行本地事务
LocalTransactionState localTransactionState = transactionListener.executeLocalTransaction(msg, arg);

// 提交或回滚消息
this.endTransaction(sendResult, localTransactionState, null);
return sendResult;
}
}

4. 消息回查

如果 RocketMQ 服务器未收到提交或回滚请求,会定期回查生产者的本地事务状态。以下是回查的关键代码:

java
public class TransactionalMessageServiceImpl implements TransactionalMessageService {
public void check(long transactionTimeout) {
// 查询未完成的事务消息
List<MessageExt> msgList = this.batchQueryUnfinishedTransaction(transactionTimeout);

for (MessageExt msg : msgList) {
// 回查本地事务状态
LocalTransactionState localTransactionState = transactionListener.checkLocalTransaction(msg);
this.endTransaction(msg, localTransactionState, null);
}
}
}

实际案例

假设我们有一个电商系统,用户下单后需要发送订单消息到 RocketMQ,同时需要更新本地数据库中的订单状态。我们可以使用 RocketMQ 的事务消息来确保这两个操作的一致性。

java
public class OrderService {
private TransactionMQProducer producer;

public OrderService() {
producer = new TransactionMQProducer("order_producer_group");
producer.setTransactionListener(new LocalTransactionListenerImpl());
producer.start();
}

public void createOrder(Order order) throws MQClientException {
Message msg = new Message("order_topic", "order_tag", order.toString().getBytes());
producer.sendMessageInTransaction(msg, order);
}
}

在这个案例中,createOrder 方法会发送一条事务消息到 RocketMQ,并在本地事务中更新订单状态。如果本地事务成功,消息会被提交;如果失败,消息会被回滚。

总结

RocketMQ 的事务消息通过两阶段提交的方式,确保了消息发送与本地事务的一致性。本文详细分析了 RocketMQ 事务消息的实现源码,并通过实际案例展示了其应用场景。希望本文能帮助初学者更好地理解 RocketMQ 的事务消息机制。

附加资源

练习

  1. 尝试在本地搭建一个 RocketMQ 环境,并实现一个简单的事务消息示例。
  2. 修改 LocalTransactionListenerImpl 中的 executeLocalTransaction 方法,模拟不同的本地事务执行结果,观察消息的提交与回滚行为。