跳到主要内容

RocketMQ 消息发送方式

RocketMQ 是一个分布式消息中间件,广泛应用于高并发、高吞吐量的场景。作为 RocketMQ 的生产者,了解如何发送消息是至关重要的。RocketMQ 提供了三种主要的消息发送方式:同步发送异步发送单向发送。本文将详细介绍这三种方式,并通过代码示例帮助您理解它们的使用场景。

1. 同步发送

同步发送是最常用的消息发送方式。在这种模式下,生产者发送消息后会等待 Broker 的响应,确保消息成功发送到 Broker 后再继续执行后续代码。这种方式适合对消息可靠性要求较高的场景。

代码示例

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();

for (int i = 0; i < 10; i++) {
// 创建消息,指定Topic、Tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并获取发送结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}

// 关闭生产者
producer.shutdown();
}
}

输出示例

SendResult [sendStatus=SEND_OK, msgId=7F0000010A6418B4AAC2F3D7B7D80000, offsetMsgId=7F00000100002A9F0000000000000000, queueId=0, queueOffset=0]
备注

同步发送的优点是消息发送的可靠性高,缺点是发送速度较慢,因为需要等待 Broker 的响应。

2. 异步发送

异步发送允许生产者在发送消息后立即返回,而不需要等待 Broker 的响应。Broker 的响应会通过回调函数返回给生产者。这种方式适合对发送速度要求较高,但对消息可靠性要求相对较低的场景。

代码示例

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();

for (int i = 0; i < 10; i++) {
// 创建消息,指定Topic、Tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息,并注册回调函数
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("消息发送成功: %s%n", sendResult);
}

@Override
public void onException(Throwable e) {
System.out.printf("消息发送失败: %s%n", e);
}
});
}

// 关闭生产者
producer.shutdown();
}
}

输出示例

消息发送成功: SendResult [sendStatus=SEND_OK, msgId=7F0000010A6418B4AAC2F3D7B7D80000, offsetMsgId=7F00000100002A9F0000000000000000, queueId=0, queueOffset=0]
提示

异步发送的优点是发送速度快,适合高吞吐量的场景。缺点是消息发送的可靠性较低,因为无法立即知道消息是否成功发送到 Broker。

3. 单向发送

单向发送是一种“发后即忘”的发送方式。生产者发送消息后不会等待 Broker 的响应,也不会收到任何回调。这种方式适合对消息可靠性要求较低,但对发送速度要求极高的场景。

代码示例

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 实例化生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();

for (int i = 0; i < 10; i++) {
// 创建消息,指定Topic、Tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 单向发送消息
producer.sendOneway(msg);
System.out.printf("单向发送消息: %s%n", msg);
}

// 关闭生产者
producer.shutdown();
}
}

输出示例

单向发送消息: Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=0}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48]}
警告

单向发送的优点是发送速度最快,缺点是消息发送的可靠性最低,因为无法确认消息是否成功发送到 Broker。

实际应用场景

  1. 同步发送:适用于金融交易、订单处理等对消息可靠性要求极高的场景。
  2. 异步发送:适用于日志收集、监控数据上报等对发送速度要求较高,但对消息可靠性要求相对较低的场景。
  3. 单向发送:适用于广告推送、通知消息等对发送速度要求极高,但对消息可靠性要求较低的场景。

总结

RocketMQ 提供了三种消息发送方式,每种方式都有其适用的场景。同步发送适合对消息可靠性要求高的场景,异步发送适合对发送速度要求高的场景,而单向发送则适合对发送速度要求极高的场景。根据实际需求选择合适的发送方式,可以显著提升系统的性能和可靠性。

附加资源

练习

  1. 尝试使用同步发送方式发送一条消息,并观察发送结果。
  2. 使用异步发送方式发送多条消息,并实现回调函数处理发送结果。
  3. 使用单向发送方式发送消息,并观察消息是否成功发送到 Broker。

通过以上练习,您将更好地理解 RocketMQ 的消息发送方式及其适用场景。