跳到主要内容

RocketMQ 延迟消息

介绍

RocketMQ 是一个分布式消息中间件,广泛应用于异步通信、解耦系统、流量削峰等场景。RocketMQ 提供了延迟消息的功能,允许消息在发送后延迟一段时间再被消费者消费。这种机制在需要定时任务、延迟处理等场景中非常有用。

延迟消息的核心思想是:消息在发送时指定一个延迟时间,消息队列会在延迟时间到达后,再将消息投递给消费者。RocketMQ 支持多个延迟级别,每个级别对应一个固定的延迟时间。

延迟消息的工作原理

RocketMQ 的延迟消息是通过消息的 delayTimeLevel 属性来实现的。当生产者发送消息时,可以设置 delayTimeLevel,RocketMQ 会根据这个值将消息存储在对应的延迟队列中。当延迟时间到达后,消息会被转移到正常的消息队列中,供消费者消费。

RocketMQ 默认支持 18 个延迟级别,每个级别对应一个固定的延迟时间:

延迟级别延迟时间
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h
备注

延迟级别是固定的,无法自定义延迟时间。如果需要更灵活的延迟时间,可以考虑使用定时消息。

如何使用延迟消息

1. 发送延迟消息

在发送消息时,可以通过设置 delayTimeLevel 来指定延迟级别。以下是一个 Java 示例:

java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
// 实例化一个生产者
DefaultMQProducer producer = new DefaultMQProducer("DelayMessageProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();

// 创建消息,指定Topic和消息体
Message msg = new Message("TestTopic", "Hello, RocketMQ!".getBytes());
// 设置延迟级别为3,即延迟10秒
msg.setDelayTimeLevel(3);

// 发送消息
producer.send(msg);
System.out.println("消息已发送,延迟10秒后投递");

// 关闭生产者
producer.shutdown();
}
}

在这个示例中,我们创建了一个消息并设置了 delayTimeLevel 为 3,表示消息将在 10 秒后投递给消费者。

2. 消费延迟消息

消费者在消费延迟消息时,与消费普通消息的方式相同。以下是一个简单的消费者示例:

java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class DelayMessageConsumer {
public static void main(String[] args) throws Exception {
// 实例化一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayMessageConsumerGroup");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic
consumer.subscribe("TestTopic", "*");

// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到消息: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();
System.out.println("消费者已启动,等待消息...");
}
}

在这个示例中,消费者会订阅 TestTopic,并在收到消息后打印消息内容。由于消息设置了延迟时间,消费者将在 10 秒后收到消息。

实际应用场景

1. 订单超时取消

在电商系统中,用户下单后如果未在规定时间内支付,订单会被自动取消。这种场景可以使用延迟消息来实现。当用户下单时,发送一条延迟消息,延迟时间为订单超时时间。如果用户在延迟时间内未支付,消费者会收到消息并执行取消订单的操作。

2. 定时任务

在某些系统中,可能需要定时执行某些任务,例如每天凌晨生成报表。可以使用延迟消息来实现定时任务。发送一条延迟消息,延迟时间为任务执行时间,消费者在收到消息后执行相应的任务。

总结

RocketMQ 的延迟消息功能为开发者提供了一种简单而强大的工具,用于处理需要延迟执行的任务。通过设置 delayTimeLevel,可以轻松实现消息的延迟投递。在实际应用中,延迟消息可以用于订单超时取消、定时任务等场景。

附加资源

练习

  1. 修改上述生产者代码,尝试发送不同延迟级别的消息,观察消费者的接收时间。
  2. 设计一个场景,使用延迟消息实现一个简单的定时任务系统。