跳到主要内容

RocketMQ 消息过滤

在分布式消息系统中,消息过滤是一种重要的机制,它允许生产者将消息发送给特定的消费者,而不是所有消费者。RocketMQ 提供了强大的消息过滤功能,使得开发者可以根据业务需求灵活地控制消息的流向。

什么是消息过滤?

消息过滤是指根据一定的规则或条件,选择性地将消息发送给特定的消费者。在 RocketMQ 中,消息过滤可以通过 TagSQL92 两种方式来实现。

  • Tag 过滤:通过为消息设置标签(Tag),消费者可以订阅特定的标签来接收消息。
  • SQL92 过滤:通过 SQL92 语法,消费者可以根据消息的属性进行更复杂的过滤。

Tag 过滤

1. 基本概念

Tag 是 RocketMQ 中最简单的消息过滤方式。每个消息可以设置一个或多个 Tag,消费者在订阅时可以指定只接收带有特定 Tag 的消息。

2. 代码示例

以下是一个使用 Tag 过滤的示例:

java
// 生产者代码
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.start();

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("消息发送成功: " + sendResult);

producer.shutdown();
java
// 消费者代码
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.subscribe("TopicTest", "TagA || TagB");

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("收到消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();

3. 解释

  • 生产者发送消息时,指定了 TagA 作为消息的标签。
  • 消费者订阅时,使用了 TagA || TagB 的表达式,表示只接收带有 TagATagB 的消息。
提示

Tag 过滤适用于简单的过滤场景,如果需要更复杂的过滤条件,可以考虑使用 SQL92 过滤。

SQL92 过滤

1. 基本概念

SQL92 过滤允许消费者根据消息的属性进行更复杂的过滤。消息的属性可以在生产者发送消息时设置,消费者可以通过 SQL92 语法来过滤这些属性。

2. 代码示例

以下是一个使用 SQL92 过滤的示例:

java
// 生产者代码
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.start();

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
msg.putUserProperty("a", "1");
msg.putUserProperty("b", "2");
SendResult sendResult = producer.send(msg);
System.out.println("消息发送成功: " + sendResult);

producer.shutdown();
java
// 消费者代码
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 0 AND b = 2"));

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
System.out.println("收到消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();

3. 解释

  • 生产者在发送消息时,设置了两个属性 ab
  • 消费者订阅时,使用了 SQL92 表达式 a > 0 AND b = 2,表示只接收属性 a 大于 0 且属性 b 等于 2 的消息。
警告

SQL92 过滤功能需要 RocketMQ Broker 支持,并且可能会对性能产生一定影响,建议在必要时使用。

实际应用场景

1. 电商订单系统

在电商系统中,订单消息可能包含不同的状态(如“待支付”、“已支付”、“已发货”)。通过 Tag 过滤,可以将不同状态的订单消息发送给不同的消费者进行处理。

2. 日志处理系统

在日志处理系统中,日志消息可能包含不同的级别(如“INFO”、“WARN”、“ERROR”)。通过 SQL92 过滤,可以根据日志级别将消息发送给不同的消费者进行处理。

总结

RocketMQ 的消息过滤功能为开发者提供了灵活的消息分发机制。通过 Tag 过滤和 SQL92 过滤,开发者可以根据业务需求精确控制消息的流向。无论是简单的标签过滤还是复杂的属性过滤,RocketMQ 都能满足你的需求。

附加资源

练习

  1. 尝试在本地搭建 RocketMQ 环境,并使用 Tag 过滤实现一个简单的消息发送和接收示例。
  2. 修改上述示例,使用 SQL92 过滤实现更复杂的消息过滤条件。