因为日常开发中遇见几次使用延时消息的场景,但是目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。
其中
kafka延时消息通过在消费端判别消息是否达到消费时间,决定是否进行消费实现。未达到延时时间则暂停消费。
无论是rabbitmq的死信还是kafka消费端控制,基本上都是每位topic只能使用固定的延时时间。但现实中,也存在一些同一个业务场景使用不同延时时间的消息的场景:
于是督查了一下其他的消息中间件,发觉rocketmq其实是支持延时消息的,尽管也只有固定的18个延时等级,但相比rabbitmq和kafka固定的延时消息,要好好多了。于是开始学习探究rocketmq延时消息的实现。
正文开始
先了解一下rocketmqrocketmq的构架
整个构架如图,简单描述一下:
消息的储存
如图,rocketmq的所有消息都储存在commitlog中,之后ConsumerQueue作为逻辑消费队列消息队列使用流程图,维护一个topic消息的索引,记录topic内消息在commitlog中的一些信息。其中ConsumeQueue的储存单元为8字节的offset+4字节的size+8字节的tagshashcode,对于延时消息,最后8字节则用于储存消息计划投递时间。
之后关于rocketmq的延时消息的使用
rocketmq只支持固定18个等级的延时消息:
1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h
发送延时消息只须要setDelayTimeLevel就可以,(连个延时等级相关的常量都没有。。。)
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
之后关于延时消息的实现
先看下流程图
之后文字+代码介绍
总结
rocketmq先将不同延时等级的消息存入内部对应延时队列中消息队列使用流程图,之后不断的从延时队列中拉取消息判定是否到期,之后进行投递到对应的topic中。
通过固定延时等级的形式,同一个队列中的消息都是相同的延时等级,不须要对消息进行排序,只须要按次序拉取消息判定是否可以投递就行了。但也限制了延时时间。
另外,由于只要延时消息存入延时队列中,都会写入commitlog文件中,之后rocketmq的高可用(同步复制或异步复制)都会将消息复制到slave中,进而保证延时消息的可靠性。
尽管rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍旧提供了18个延时等级,基本也能覆盖好多场景了。
另外:前面又见到了去哪了开源的qmq,其实支持任意延后时间,觉得也可以学习一波。
暂无评论内容