RocketMQ 延迟消息
介绍
RocketMQ 是一个分布式消息中间件,广泛应用于异步通信、解耦系统、流量削峰等场景。RocketMQ 提供了延迟消息的功能,允许消息在发送后延迟一段时间再被消费者消费。这种机制在需要定时任务、延迟处理等场景中非常有用。
延迟消息的核心思想是:消息在发送时指定一个延迟时间,消息队列会在延迟时间到达后,再将消息投递给消费者。RocketMQ 支持多个延迟级别,每个级别对应一个固定的延迟时间。
延迟消息的工作原理
RocketMQ 的延迟消息是通过消息的 delayTimeLevel
属性来实现的。当生产者发送消息时,可以设置 delayTimeLevel
,RocketMQ 会根据这个值将消息存储在对应的延迟队列中。当延迟时间到达后,消息会被转移到正常的消息队列中,供消费者消费。
RocketMQ 默认支持 18 个延迟级别,每个级别对应一个固定的延迟时间:
延迟级别 | 延迟时间 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
延迟级别是固定的,无法自定义延迟时间。如果需要更灵活的延迟时间,可以考虑使用定时消息。
如何使用延迟消息
1. 发送延迟消息
在发送消息时,可以通过设置 delayTimeLevel
来指定延迟级别。以下是一个 Java 示例:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者
DefaultMQProducer producer = new DefaultMQProducer("DelayMessageProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息,指定Topic和消息体
Message msg = new Message("TestTopic", "Hello, RocketMQ!".getBytes());
// 设置延迟级别为3,即延迟10秒
msg.setDelayTimeLevel(3);
// 发送消息
producer.send(msg);
System.out.println("消息已发送,延迟10秒后投递");
// 关闭生产者
producer.shutdown();
}
}
在这个示例中,我们创建了一个消息并设置了 delayTimeLevel
为 3,表示消息将在 10 秒后投递给消费者。
2. 消费延迟消息
消费者在消费延迟消息时,与消费普通消息的方式相同。以下是一个简单的消费者示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class DelayMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayMessageConsumerGroup");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("TestTopic", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动,等待消息...");
}
}
在这个示例中,消费者会订阅 TestTopic
,并在收到消息后打印消息内容。由于消息设置了延迟时间,消费者将在 10 秒后收到消息。
实际应用场景
1. 订单超时取消
在电商系统中,用户下单后如果未在规定时间内支付,订单会被自动取消。这种场景可以使用延迟消息来实现。当用户下单时,发送一条延迟消息,延迟时间为订单超时时间。如果用户在延迟时间内未支付,消费者会收到消息并执行取消订单的操作。
2. 定时任务
在某些系统中,可能需要定时执行某些任务,例如每天凌晨生成报表。可以使用延迟消息来实现定时任务。发送一条延迟消息,延迟时间为任务执行时间,消费者在收到消息后执行相应的任务。
总结
RocketMQ 的延迟消息功能为开发者提供了一种简单而强大的工具,用于处理需要延迟执行的任务。通过设置 delayTimeLevel
,可以轻松实现消息的延迟投递。在实际应用中,延迟消息可以用于订单超时取消、定时任务等场景。
附加资源
练习
- 修改上述生产者代码,尝试发送不同延迟级别的消息,观察消费者的接收时间。
- 设计一个场景,使用延迟消息实现一个简单的定时任务系统。