跳到主要内容

RocketMQ 普通消息

RocketMQ 是一个分布式消息中间件,广泛应用于大规模分布式系统中。普通消息是 RocketMQ 中最基础的消息类型,适用于大多数场景。本文将详细介绍 RocketMQ 普通消息的概念、使用方式以及实际应用场景。

什么是普通消息?

普通消息是 RocketMQ 中最简单的消息类型,也称为“点对点消息”或“队列消息”。它的特点是:

  • 无序性:普通消息在队列中是无序的,消费者按照消息到达的顺序进行消费。
  • 可靠性:RocketMQ 保证消息的可靠传递,确保消息不会丢失。
  • 高吞吐量:普通消息适用于高吞吐量的场景,能够处理大量的消息。

普通消息适用于不需要严格顺序的场景,例如日志收集、通知推送等。

普通消息的使用

发送普通消息

在 RocketMQ 中,发送普通消息非常简单。以下是一个使用 Java 客户端发送普通消息的示例:

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();

for (int i = 0; i < 10; i++) {
// 创建消息,指定Topic、Tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
producer.send(msg);
System.out.printf("发送消息:%s%n", new String(msg.getBody()));
}

// 关闭生产者
producer.shutdown();
}
}
备注

在上面的代码中,我们创建了一个生产者实例,并通过 send 方法发送了10条普通消息。每条消息都包含一个主题(Topic)、标签(Tag)和消息体(Body)。

消费普通消息

消费普通消息同样简单。以下是一个使用 Java 客户端消费普通消息的示例:

java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化一个消费者,指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic和Tag
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("收到消息:%s%n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动,等待消息...");
}
}
提示

在上面的代码中,我们创建了一个消费者实例,并通过 registerMessageListener 方法注册了一个消息监听器。当有消息到达时,监听器会自动处理消息。

实际应用场景

普通消息在以下场景中非常有用:

  1. 日志收集:将应用程序的日志发送到 RocketMQ,然后由消费者进行集中处理和分析。
  2. 通知推送:将通知消息发送到 RocketMQ,然后由消费者推送给用户。
  3. 异步任务处理:将任务消息发送到 RocketMQ,然后由消费者异步处理任务。

总结

RocketMQ 的普通消息是最基础的消息类型,适用于大多数不需要严格顺序的场景。通过本文的介绍,你应该已经掌握了如何发送和消费普通消息,并了解了其实际应用场景。

附加资源

练习

  1. 尝试修改上面的代码,发送和消费带有不同标签(Tag)的消息。
  2. 编写一个生产者,发送100条普通消息,并编写一个消费者,统计接收到的消息数量。

通过实践,你将更好地理解 RocketMQ 普通消息的工作原理。