掃二維碼與項(xiàng)目經(jīng)理溝通
我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
延遲消息是實(shí)際開(kāi)發(fā)中一個(gè)非常有用的功能,本文第一部分從整體上介紹秒級(jí)精度延遲消息的實(shí)現(xiàn)思路,在第二部分結(jié)合RocketMQ的延遲消息實(shí)現(xiàn),進(jìn)行細(xì)致的講解,點(diǎn)出關(guān)鍵部分的源碼。第三步介紹延遲消息與消息重試的關(guān)系。

創(chuàng)新互聯(lián)成立以來(lái)不斷整合自身及行業(yè)資源、不斷突破觀念以使企業(yè)策略得到完善和成熟,建立了一套“以技術(shù)為基點(diǎn),以客戶需求中心、市場(chǎng)為導(dǎo)向”的快速反應(yīng)體系。對(duì)公司的主營(yíng)項(xiàng)目,如中高端企業(yè)網(wǎng)站企劃 / 設(shè)計(jì)、行業(yè) / 企業(yè)門戶設(shè)計(jì)推廣、行業(yè)門戶平臺(tái)運(yùn)營(yíng)、手機(jī)APP定制開(kāi)發(fā)、移動(dòng)網(wǎng)站建設(shè)、微信網(wǎng)站制作、軟件開(kāi)發(fā)、服務(wù)器托管等實(shí)行標(biāo)準(zhǔn)化操作,讓客戶可以直觀的預(yù)知到從創(chuàng)新互聯(lián)可以獲得的服務(wù)效果。
1 延遲消息介紹
基本概念:延遲消息是指生產(chǎn)者發(fā)送消息發(fā)送消息后,不能立刻被消費(fèi)者消費(fèi),需要等待指定的時(shí)間后才可以被消費(fèi)。
場(chǎng)景案例:用戶下了一個(gè)訂單之后,需要在指定時(shí)間內(nèi)(例如30分鐘)進(jìn)行支付,在到期之前可以發(fā)送一個(gè)消息提醒用戶進(jìn)行支付。
一些消息中間件的Broker端內(nèi)置了延遲消息支持的能力,如:
Broker端內(nèi)置延遲消息處理能力,核心實(shí)現(xiàn)思路都是一樣:將延遲消息通過(guò)一個(gè)臨時(shí)存儲(chǔ)進(jìn)行暫存,到期后才投遞到目標(biāo)Topic中。如下圖所示:
步驟說(shuō)明如下:
顯然,臨時(shí)存儲(chǔ)模塊和延遲服務(wù)模塊,是延遲消息實(shí)現(xiàn)的關(guān)鍵。上圖中,臨時(shí)存儲(chǔ)和延遲服務(wù)都是在Broker內(nèi)部實(shí)現(xiàn),對(duì)業(yè)務(wù)透明。
此外, 還有一些消息中間件原生并不支持延遲消息,如Kafka。在這種情況下,可以選擇對(duì)Kafka進(jìn)行改造,但是成本較大。另外一種方式是使用第三方臨時(shí)存儲(chǔ),并加一層代理。
第三方存儲(chǔ)選型要求:
對(duì)于第三方臨時(shí)存儲(chǔ),其需要滿足以下幾個(gè)特點(diǎn):
例如,滴滴開(kāi)源的消息中間件DDMQ,底層消息中間件的基礎(chǔ)上加了一層代理,獨(dú)立部署延遲服務(wù)模塊,使用rocksdb進(jìn)行臨時(shí)存儲(chǔ)。rocksdb是一個(gè)高性能的KV存儲(chǔ),并支持排序。
此時(shí)對(duì)于延遲消息的流轉(zhuǎn)如下圖所示:
說(shuō)明如下:
這種方式的好處是,因?yàn)閐elay service的延遲投遞能力是獨(dú)立于broker實(shí)現(xiàn)的,不需要對(duì)broker做任何改造,對(duì)于任意MQ類型都可以提供支持延遲消息的能力。例如DDMQ對(duì)RocketMQ、Kafka都提供了秒級(jí)精度的延遲消息投遞能力,但是Kafka本身并不支持延遲消息,而RocketMQ雖然支持延遲消息,但不支持秒級(jí)精度。
事實(shí)上,DDMQ還提供了很多其他功能,僅僅從延遲消息的角度,完全沒(méi)有必要使用這個(gè)proxy,直接將消息投遞到緩沖Topic中,之后通過(guò)delay service完成延遲投遞邏輯即可。
具體到delay service模塊的實(shí)現(xiàn)上,也有一些重要的細(xì)節(jié):
1. 為了保證服務(wù)的高可用,delay service也是需要部署多個(gè)節(jié)點(diǎn)。
2. 為了保證數(shù)據(jù)不丟失,每個(gè)delay service節(jié)點(diǎn)都需要消費(fèi)緩沖Topic中的全量數(shù)據(jù),保存到各自的持久化存儲(chǔ)中,這樣就有了多個(gè)備份,并需要以時(shí)間為key。不過(guò)因?yàn)槭歉髯岳?,并不能保證強(qiáng)一致。如果一定要強(qiáng)一致,那么delay service就不需要內(nèi)置存儲(chǔ)實(shí)現(xiàn),可以借助于其他支持強(qiáng)一致的存儲(chǔ)。
3. 為了避免重復(fù)投遞,delay service需要進(jìn)行選主,可以借助于zookeeper、etcd等實(shí)現(xiàn)。只有master可以通過(guò)生產(chǎn)者投遞到目標(biāo)Topic中,其他節(jié)點(diǎn)處于備用狀態(tài)。否則,如果每個(gè)節(jié)點(diǎn)進(jìn)行都投遞,那么延遲消息就會(huì)被投遞多次,造成消費(fèi)重復(fù)。
4. master要記錄自己當(dāng)前投遞到的時(shí)間到一個(gè)共享存儲(chǔ)中,如果master掛了,從slave節(jié)點(diǎn)中選出一個(gè)新的master節(jié)點(diǎn),從之前記錄時(shí)間繼續(xù)開(kāi)始投遞。
5. 延遲消息的取消:一些延遲消息在未到期之前,可能希望進(jìn)行取消。通常取消邏輯實(shí)現(xiàn)較為復(fù)雜,且不夠精確。對(duì)于那些已經(jīng)快要到期的消息,可能還未取消之前,已經(jīng)發(fā)送出去了,因此需要在消費(fèi)者端做檢查,才能萬(wàn)無(wú)一失。
2 RocketMQ中的延遲消息
開(kāi)源RocketMQ支持延遲消息,但是不支持秒級(jí)精度。默認(rèn)支持18個(gè)level的延遲消息,這是通過(guò)broker端的messageDelayLevel配置項(xiàng)確定的,如下:
- messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在啟動(dòng)時(shí),內(nèi)部會(huì)創(chuàng)建一個(gè)內(nèi)部主題:SCHEDULE_TOPIC_XXXX,根據(jù)延遲level的個(gè)數(shù),創(chuàng)建對(duì)應(yīng)數(shù)量的隊(duì)列,也就是說(shuō)18個(gè)level對(duì)應(yīng)了18個(gè)隊(duì)列。注意,這并不是說(shuō)這個(gè)內(nèi)部主題只會(huì)有18個(gè)隊(duì)列,因?yàn)锽roker通常是集群模式部署的,因此每個(gè)節(jié)點(diǎn)都有18個(gè)隊(duì)列。
延遲級(jí)別的值可以進(jìn)行修改,以滿足自己的業(yè)務(wù)需求,可以修改/添加新的level。例如:你想支持2天的延遲,修改最后一個(gè)level的值為2d,這個(gè)時(shí)候依然是18個(gè)level;也可以增加一個(gè)2d,這個(gè)時(shí)候總共就有19個(gè)level。
可以看到這里并不支持秒級(jí)精度,按照《rocketmq developer guide》中的說(shuō)法,是為了避免在broker對(duì)消息進(jìn)行排序,造成性能影響。不過(guò)筆者考慮,之所以不支持,更多應(yīng)該是商業(yè)上的考慮。
生產(chǎn)者發(fā)送延遲消息:
生產(chǎn)者在發(fā)送延遲消息非常簡(jiǎn)單,只需要設(shè)置一個(gè)延遲級(jí)別即可,注意不是具體的延遲時(shí)間,如:
- Message msg=new Message();
- msg.setTopic("TopicA");
- msg.setTags("Tag");
- msg.setBody("this is a delay message".getBytes());
- //設(shè)置延遲level為5,對(duì)應(yīng)延遲1分鐘
- msg.setDelayTimeLevel(5);
- producer.send(msg);
如果設(shè)置的延遲level超過(guò)最大值,那么將會(huì)重置最最大值。
Broker端存儲(chǔ)延遲消息:
延遲消息在RocketMQ Broker端的流轉(zhuǎn)如下圖所示:
可以看到,總共有6個(gè)步驟,下面會(huì)對(duì)這6個(gè)步驟進(jìn)行詳細(xì)的講解:
第一步:修改消息Topic名稱和隊(duì)列信息
RocketMQ Broker端在存儲(chǔ)生產(chǎn)者寫入的消息時(shí),首先都會(huì)將其寫入到CommitLog中。之后根據(jù)消息中的Topic信息和隊(duì)列信息,將其轉(zhuǎn)發(fā)到目標(biāo)Topic的指定隊(duì)列(ConsumeQueue)中。
由于消息一旦存儲(chǔ)到ConsumeQueue中,消費(fèi)者就能消費(fèi)到,而延遲消息不能被立即消費(fèi),所以這里將Topic的名稱修改為SCHEDULE_TOPIC_XXXX,并根據(jù)延遲級(jí)別確定要投遞到哪個(gè)隊(duì)列下。
同時(shí),還會(huì)將消息原來(lái)要發(fā)送到的目標(biāo)Topic和隊(duì)列信息存儲(chǔ)到消息的屬性中。相關(guān)源碼如下所示:
org.apache.rocketmq.store.CommitLog#putMessage
第二步:轉(zhuǎn)發(fā)消息到延遲主題的CosumeQueue中
CommitLog中的消息轉(zhuǎn)發(fā)到CosumeQueue中是異步進(jìn)行的。在轉(zhuǎn)發(fā)過(guò)程中,會(huì)對(duì)延遲消息進(jìn)行特殊處理,主要是計(jì)算這條延遲消息需要在什么時(shí)候進(jìn)行投遞。
- 投遞時(shí)間=消息存儲(chǔ)時(shí)間(storeTimestamp) + 延遲級(jí)別對(duì)應(yīng)的時(shí)間
需要注意的是,會(huì)將計(jì)算出的投遞時(shí)間當(dāng)做消息Tag的哈希值存儲(chǔ)到CosumeQueue中,CosumeQueue單個(gè)存儲(chǔ)單元組成結(jié)構(gòu)如下圖所示:
其中:
相關(guān)源碼參見(jiàn):
CommitLog#checkMessageAndReturnSize
第三步:延遲服務(wù)消費(fèi)SCHEDULE_TOPIC_XXXX消息
Broker內(nèi)部有一個(gè)ScheduleMessageService類,其充當(dāng)延遲服務(wù),消費(fèi)SCHEDULE_TOPIC_XXXX中的消息,并投遞到目標(biāo)Topic中。
ScheduleMessageService在啟動(dòng)時(shí),其會(huì)創(chuàng)建一個(gè)定時(shí)器Timer,并根據(jù)延遲級(jí)別的個(gè)數(shù),啟動(dòng)對(duì)應(yīng)數(shù)量的TimerTask,每個(gè)TimerTask負(fù)責(zé)一個(gè)延遲級(jí)別的消費(fèi)與投遞。
相關(guān)源碼如下所示:
ScheduleMessageService#start
需要注意的是,每個(gè)TimeTask在檢查消息是否到期時(shí),首先檢查對(duì)應(yīng)隊(duì)列中尚未投遞第一條消息,如果這條消息沒(méi)到期,那么之后的消息都不會(huì)檢查。如果到期了,則進(jìn)行投遞,并檢查之后的消息是否到期。
第四步:將信息重新存儲(chǔ)到CommitLog中
在將消息到期后,需要投遞到目標(biāo)Topic。由于在第一步已經(jīng)記錄了原來(lái)的Topic和隊(duì)列信息,因此這里重新設(shè)置,再存儲(chǔ)到CommitLog即可。此外,由于之前Message Tag HashCode字段存儲(chǔ)的是消息的投遞時(shí)間,這里需要重新計(jì)算tag的哈希值后再存儲(chǔ)。
源碼參見(jiàn):DeliverDelayedMessageTimerTask的messageTimeup方法。
第五步:將消息投遞到目標(biāo)Topic中
這一步與第二步類似,不過(guò)由于消息的Topic名稱已經(jīng)改為了目標(biāo)Topic。因此消息會(huì)直接投遞到目標(biāo)Topic的ConsumeQueue中,之后消費(fèi)者即消費(fèi)到這條消息。
3 延遲消息與消費(fèi)重試的關(guān)系
RocketMQ提供了消息重試的能力,在并發(fā)模式消費(fèi)消費(fèi)失敗的情況下,可以返回一個(gè)枚舉值RECONSUME_LATER,那么消息之后將會(huì)進(jìn)行重試。如:
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List
msgs, - ConsumeConcurrentlyContext context) {
- //處理消息,失敗,返回RECONSUME_LATER,進(jìn)行重試
- return ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- });
重試默認(rèn)會(huì)進(jìn)行重試16次。使用過(guò)RocketMQ消息重試功能的用戶,可能看到過(guò)以下這張圖:
| 第幾次重試 | 與上次重試的間隔時(shí)間 | 第幾次重試 | 與上次重試的間隔時(shí)間 |
| 1 | 10 秒 | 9 | 7 分鐘 |
| 2 | 30 秒 | 10 | 8 分鐘 |
| 3 | 1 分鐘 | 11 | 9 分鐘 |
| 4 | 2 分鐘 | 12 | 10 分鐘 |
| 5 | 3 分鐘 | 13 | 20 分鐘 |
| 6 | 4 分鐘 | 14 | 30 分鐘 |
| 7 | 5 分鐘 | 15 | 1 小時(shí) |
| 8 | 6 分鐘 | 16 | 2 小時(shí) |
細(xì)心地的讀者發(fā)現(xiàn)了,消息重試的16個(gè)級(jí)別,實(shí)際上是把延遲消息18個(gè)級(jí)別的前兩個(gè)level去掉了。事實(shí)上,RocketMQ的消息重試也是基于延遲消息來(lái)完成的。在消息消費(fèi)失敗的情況下,將其重新當(dāng)做延遲消息投遞回Broker。
在投遞回去時(shí),會(huì)跳過(guò)前兩個(gè)level,因此只重試16次。當(dāng)然,消息重試還有一些其他的設(shè)計(jì)邏輯,在之后的文章將會(huì)進(jìn)行分析。

我們?cè)谖⑿派?4小時(shí)期待你的聲音
解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流