跳到主要内容

RocketMQ 消息发送源码分析

RocketMQ 是一款高性能、高吞吐量的分布式消息中间件,广泛应用于大规模分布式系统中。消息发送是 RocketMQ 的核心功能之一,理解其源码实现对于掌握 RocketMQ 的工作原理至关重要。本文将逐步解析 RocketMQ 消息发送的源码,并通过实际案例展示其应用场景。

1. 消息发送的基本流程

在 RocketMQ 中,消息发送的基本流程可以分为以下几个步骤:

  1. 创建消息生产者(Producer):首先需要创建一个消息生产者实例。
  2. 构建消息(Message):定义消息的内容、主题(Topic)、标签(Tag)等属性。
  3. 发送消息:将消息发送到指定的主题。
  4. 处理发送结果:根据发送结果进行相应的处理,如重试、日志记录等。

1.1 创建消息生产者

在 RocketMQ 中,消息生产者是通过 DefaultMQProducer 类来创建的。以下是一个简单的示例:

java
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
备注

DefaultMQProducer 是 RocketMQ 提供的默认消息生产者实现,ProducerGroupName 是生产者组的名称,NamesrvAddr 是 NameServer 的地址。

1.2 构建消息

消息是通过 Message 类来构建的。以下是一个构建消息的示例:

java
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
提示

TopicTest 是消息的主题,TagA 是消息的标签,Hello RocketMQ 是消息的内容。

1.3 发送消息

消息发送是通过 send 方法完成的。以下是一个发送消息的示例:

java
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
警告

send 方法是同步发送消息,会阻塞直到消息发送完成。RocketMQ 还提供了异步发送和单向发送的方式。

1.4 处理发送结果

发送结果通过 SendResult 类返回,包含了消息的发送状态、消息ID等信息。以下是一个处理发送结果的示例:

java
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}

2. 消息发送的源码解析

2.1 消息发送的核心方法

RocketMQ 消息发送的核心方法是 DefaultMQProducerImpl#sendDefaultImpl。该方法负责处理消息发送的整个流程,包括消息的校验、路由信息的获取、消息的发送等。

java
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 校验消息
Validators.checkMessage(msg, this.defaultMQProducer);

// 获取路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

// 选择消息队列
MessageQueue mq = topicPublishInfo.selectOneMessageQueue();

// 发送消息
SendResult sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);

return sendResult;
}
备注

sendDefaultImpl 方法是 RocketMQ 消息发送的核心逻辑,包含了消息的校验、路由信息的获取、消息队列的选择以及消息的发送。

2.2 消息发送的底层实现

消息发送的底层实现是通过 DefaultMQProducerImpl#sendKernelImpl 方法完成的。该方法负责将消息发送到指定的消息队列。

java
private SendResult sendKernelImpl(
final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 构建发送消息的请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(0);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));

// 发送消息
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
request.setBody(msg.getBody());

RemotingCommand response = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
BrokerVIPChannelUtil.getBrokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel()),
mq.getBrokerName(),
request,
timeout
);

// 处理发送结果
SendResult sendResult = MQClientAPIImpl.processSendResponse(response);
return sendResult;
}
提示

sendKernelImpl 方法是 RocketMQ 消息发送的底层实现,负责将消息发送到指定的消息队列,并处理发送结果。

3. 实际案例

3.1 电商系统中的订单消息发送

在电商系统中,订单创建后需要发送消息通知其他系统进行处理。以下是一个订单消息发送的示例:

java
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

// 构建订单消息
Order order = new Order("123456", "1001", new Date());
Message msg = new Message("OrderTopic", "CreateOrder", JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));

// 发送订单消息
SendResult sendResult = producer.send(msg);
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("订单消息发送成功");
} else {
System.out.println("订单消息发送失败");
}

producer.shutdown();
警告

在实际应用中,消息发送可能会遇到网络波动、Broker 宕机等问题,因此需要做好异常处理和消息重试机制。

4. 总结

本文详细解析了 RocketMQ 消息发送的源码实现,从消息生产者的创建、消息的构建、消息的发送到发送结果的处理,逐步讲解了 RocketMQ 消息发送的核心流程。通过实际案例展示了 RocketMQ 消息发送在电商系统中的应用场景。

5. 附加资源与练习

  • 附加资源

  • 练习

    • 尝试使用 RocketMQ 发送不同类型的消息(如同步消息、异步消息、单向消息)。
    • 实现一个简单的消息重试机制,处理消息发送失败的情况。
注意

在进行练习时,请确保已经正确配置了 RocketMQ 的环境,并熟悉基本的 Java 编程知识。