欧美一区二区三区,国内熟女精品熟女A片视频小说,日本av网,小鲜肉男男GAY做受XXX网站

rocketmq延時隊列實現原理

林子帆2年前81瀏覽0評論

rocketmq延時隊列實現原理?

RocketMQ是一款開源的分布式消息系統,基于高可用分布式集群技術,提供低延時的、高可靠、萬億級容量、靈活可伸縮的消息發布與訂閱服務。

它前身是MetaQ,是阿里基于Kafka的設計使用Java進行自主研發的。在2012年,阿里將其開源, 在2016年,阿里將其捐獻給Apache軟件基金會(Apache Software Foundation,簡稱為ASF),正式成為孵化項目。2017 年,Apache軟件基金會宣布RocketMQ已孵化成為 Apache頂級項目(Top Level Project,簡稱為TLP ),是國內首個互聯網中間件在 Apache上的頂級項目。

延遲消息

生產者把消息發送到消息隊列中以后,并不期望被立即消費,而是等待指定時間后才可以被消費者消費,這類消息通常被稱為延遲消息。

在RocketMQ中,支持延遲消息,但是不支持任意時間精度的延遲消息,只支持特定級別的延遲消息。如果要支持任意時間精度,不能避免在Broker層面做消息排序,再涉及到持久化的考量,那么消息排序就不可避免產生巨大的性能開銷。

消息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。在發送消息時,設置消息延遲級別即可,設置消息延遲級別時有以下3種情況:

設置消息延遲級別等于0時,則該消息為非延遲消息。設置消息延遲級別大于等于1并且小于等于18時,消息延遲特定時間,如:設置消息延遲級別等于1,則延遲1s;設置消息延遲級別等于2,則延遲5s,以此類推。設置消息延遲級別大于18時,則該消息延遲級別為18,如:設置消息延遲級別等于20,則延遲2h。

延遲消息示例

首先,寫一個消費者,用于消費延遲消息:

再寫一個延遲消息的生產者,用于發送延遲消息:

運行生產者以后,就會發送一條延遲消息:

10秒鐘后,消費者收到的這條延遲消息:

延遲消息的原理分析

以下分析的RocketMQ源碼的版本號是4.7.1,版本不同源碼略有差別。

CommitLog

在org.apache.rocketmq.store.CommitLog中,針對延遲消息做了一些處理:

可以看到,每一個延遲消息的主題都被暫時更改為SCHEDULE_TOPIC_XXXX,并且根據延遲級別延遲消息變更了新的隊列Id。接下來,處理延遲消息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。

ScheduleMessageService

ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore進行初始化的,初始化包括構造對象和調用load方法。最后,再執行ScheduleMessageService的start方法:

遍歷所有延遲級別,根據延遲級別獲得對應隊列的偏移量,如果偏移量不存在,則設置為0。然后為每個延遲級別創建定時任務,第一次啟動任務延遲為1秒,第二次及以后的啟動任務延遲才是延遲級別相應的延遲時間。

然后,又創建了一個定時任務,用于持久化每個隊列消費的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進行配置,默認為10秒。

定時任務

ScheduleMessageService的start方法執行之后,每個延遲級別都創建自己的定時任務,這里的定時任務的具體實現就在DeliverDelayedMessageTimerTask類之中,它核心代碼是executeOnTimeup方法之中,我們來看一下主要部分:

如果沒有獲取到對應的消息隊列,則在DELAY_FOR_A_WHILE(默認為100)毫秒后再執行任務。如果獲取到了,就繼續執行下面操作:

如果沒有獲取到有效消息,則在DELAY_FOR_A_WHILE(默認為100)毫秒后再執行任務。如果獲取到了,就繼續執行下面操作:

如果當前消息不到消費的時間,則在countdown毫秒后再執行任務。如果到消費的時間,就繼續執行下面操作:

如果獲取到消息,則繼續執行下面操作:

清除了消息的延遲級別,并且恢復了真正的消息主題和隊列Id,重新把消息發送到真正的消息隊列上以后,消費者就可以立即消費了。

總結

經過以上對源碼的分析,可以總結出延遲消息的實現步驟:

如果消息的延遲級別大于0,則表示該消息為延遲消息,修改該消息的主題為SCHEDULE_TOPIC_XXXX,隊列Id為延遲級別減1。消息進入SCHEDULE_TOPIC_XXXX的隊列中。定時任務根據上次拉取的偏移量不斷從隊列中取出所有消息。根據消息的物理偏移量和大小再次獲取消息。根據消息屬性重新創建消息,清除延遲級別,恢復原主題和隊列Id。重新發送消息到原主題的隊列中,供消費者進行消費。

java毫秒,rocketmq延時隊列實現原理