跳到主要内容

RocketMQ 消息消费源码

介绍

RocketMQ 是一个分布式消息中间件,广泛应用于大规模分布式系统中。消息消费是 RocketMQ 的核心功能之一,它允许消费者从消息队列中拉取消息并进行处理。本文将深入分析 RocketMQ 消息消费的源码实现,帮助初学者理解其工作原理。

消息消费的基本流程

RocketMQ 的消息消费流程可以分为以下几个步骤:

  1. 消费者启动:消费者启动时,会初始化相关的配置和资源。
  2. 订阅主题:消费者需要订阅一个或多个主题,以接收这些主题下的消息。
  3. 拉取消息:消费者从 Broker 拉取消息。
  4. 消息处理:消费者处理拉取到的消息。
  5. 提交消费进度:消费者处理完消息后,会向 Broker 提交消费进度。

消费者启动

消费者启动时,会初始化 DefaultMQPushConsumerDefaultMQPullConsumer 实例。以 DefaultMQPushConsumer 为例,启动代码如下:

java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

订阅主题

消费者通过 subscribe 方法订阅主题。subscribe 方法的第一个参数是主题名称,第二个参数是消息过滤表达式(如 * 表示接收所有消息)。

拉取消息

RocketMQ 提供了两种消息拉取模式:Push 模式Pull 模式。Push 模式下,Broker 会主动将消息推送给消费者;Pull 模式下,消费者需要主动从 Broker 拉取消息。

在 Push 模式下,消费者通过 MessageListener 接口处理消息。MessageListenerConcurrently 是并发处理消息的接口,MessageListenerOrderly 是顺序处理消息的接口。

消息处理

消费者在 MessageListenerconsumeMessage 方法中处理消息。处理完成后,返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 表示消费成功,返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 表示消费失败,稍后重新消费。

提交消费进度

消费者处理完消息后,会向 Broker 提交消费进度。消费进度是消费者消费消息的偏移量,Broker 会根据消费进度来判断哪些消息已经被消费。

源码分析

消费者启动源码

DefaultMQPushConsumerstart 方法会调用 start 方法启动消费者。start 方法会初始化 MQClientInstance,并启动 PullMessageServiceRebalanceService

java
public void start() throws MQClientException {
this.defaultMQPushConsumerImpl.start();
}

消息拉取源码

PullMessageService 是负责拉取消息的服务。它会从 RebalanceService 获取消息队列,并调用 PullAPIWrapper 从 Broker 拉取消息。

java
public void run() {
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException e) {
// 处理异常
}
}
}

消息处理源码

ConsumeMessageService 是负责处理消息的服务。它会调用 MessageListenerconsumeMessage 方法处理消息。

java
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
// 提交消费请求
}

提交消费进度源码

OffsetStore 是负责管理消费进度的组件。消费者处理完消息后,会调用 OffsetStoreupdateOffset 方法更新消费进度。

java
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
// 更新消费进度
}

实际案例

假设我们有一个订单系统,订单创建后会发送一条消息到 RocketMQ。消费者订阅该消息并处理订单。

java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 处理订单
System.out.println("处理订单: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

总结

本文详细分析了 RocketMQ 消息消费的源码实现,包括消费者启动、订阅主题、拉取消息、消息处理和提交消费进度等关键步骤。通过源码分析,我们可以更好地理解 RocketMQ 消息消费的工作原理。

附加资源

练习

  1. 尝试修改 MessageListener 的实现,使其在消费失败时返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 使用 DefaultMQPullConsumer 实现一个简单的消息拉取程序,并观察其与 DefaultMQPushConsumer 的区别。