掃二維碼與項目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
作者:Java互聯(lián)網(wǎng)架構(gòu) 2019-12-10 09:08:29
開源
分布式 我們實現(xiàn)本地延時比較簡單,直接使用Java中現(xiàn)成的即可,那我們分布式消息隊列的實現(xiàn)有哪些難點呢?

背景
開源版的RocketMQ只提供了18個層級的消息隊列延時,這個功能在開源版中顯得特別雞肋,但是在阿里云中的RocketMQ卻提供了支持40天之內(nèi)任意秒級延時隊列,果然有些功能你只能充錢才能擁有。當然你或許想換一個開源的消息隊列,在開源社區(qū)中消息隊列延時消息很多都沒有被支持比如:RabbitMQ,Kafka等,都只能通過一些特殊方法才能完成延時的功能。為什么這么多都沒有實現(xiàn)這個功能呢?是因為技術(shù)難度比較復(fù)雜嗎?接下來我們分析一下如何才能實現(xiàn)一個延時消息。
本地延時
在實現(xiàn)分布式消息隊列的延時消息之前,我們想想我們平時是如何在自己的應(yīng)用程序上實現(xiàn)一些延時功能的?在Java中可以通過下面的方式來完成我們延時功能:
分布式消息隊列延時
我們實現(xiàn)本地延時比較簡單,直接使用Java中現(xiàn)成的即可,那我們分布式消息隊列的實現(xiàn)有哪些難點呢?
有很多同學(xué)首先會想到我們實現(xiàn)分布式消息隊列的延時任務(wù),可不可以直接使用本地的那一套,用ScheduledThreadPoolExecutor,Timer,當然這是可以的,前提是你的消息量很小,但是我們分布式消息隊列往往都是企業(yè)級別的中間件,數(shù)據(jù)量都是非常的大,那么我們純內(nèi)存的方案肯定是行不通的。所以我們就有了下面這幾個方案來解決我們這個問題。
數(shù)據(jù)庫
數(shù)據(jù)庫一般來說是我們很容易想到的一個辦法,我們通??梢越⑾旅孢@樣一個表:
- CREATE TABLE `delay_message` (
- `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
- `excute_time` bigint(16) DEFAULT NULL COMMENT '執(zhí)行時間,ms級別',
- `body` varchar(4096) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '消息體',
- PRIMARY KEY (`id`),
- KEY `time_index` (`excute_time`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
這個表中我們使用excute_time代表我們真實的執(zhí)行時間,并且對其建立索引,然后在我們的消息服務(wù)中,啟動一個定時任務(wù),定時從數(shù)據(jù)庫中掃描已經(jīng)可以執(zhí)行的消息,然后開始執(zhí)行,具體流程如下面所示:
使用數(shù)據(jù)庫的方法是一個比較原始的方法,在沒有延時消息這個概念之前,要做一個訂單多少分鐘過期的這種功能,通常使用這個方法去完成。而這個方法通常也比較局限于我們單個業(yè)務(wù),如果想擴展為我們企業(yè)級的一個中間件的話是不行的,因為mysql由于BTree的特性,會隨著維護二級索引的開銷越來越大,導(dǎo)致寫入會越來越慢,所以這個方案通常不會被考慮。
RocksDB/LevelDB
我們之前介紹RocketMQ在開源版本中只實現(xiàn)了18個Level的延時消息,但是有很多公司基于RocketMQ做了自己的一套支持任意時間的延時消息,在美團內(nèi)部封裝了RocketMQ使用LevelDB做了對延時消息的封裝,在滴滴開源的DDMQ中,使用了RocksDB對RocketMQ的延時消息部分進行了封裝。
其原理基本和Mysql類似,如下圖所示:
為什么同樣是數(shù)據(jù)庫RocksDB會比Mysql更加合適呢?因為RocksDB的特性是LSM樹,其使用場景適用于大量寫入,和消息隊列的場景更加契合,所以這個也是滴滴和美團選擇其作為延時消息封裝的存儲介質(zhì)。
3.2 時間輪+磁盤存儲
再說時間輪之前,讓我們再次回到我們的實現(xiàn)本地延時的時候使用的ScheduledThreadPoolExecutor還有Timer,他們都是使用的優(yōu)先級隊列完成的,優(yōu)先級隊列本質(zhì)上也就是堆結(jié)構(gòu),堆結(jié)構(gòu)的插入的時間復(fù)雜度是O(LogN),如果未來我們的內(nèi)存可以做到無限,我們使用使用優(yōu)先級隊列去做延時消息的存儲,但是隨著消息的增多,我們的插入消息的效率也會越來越低,那么怎么才能讓我們的插入消息的效率不隨著消息的增多而變低呢?答案就是時間輪。
什么是時間輪呢?其實我們可以簡單的將其看做是一個多維數(shù)組。在很多框架中都使用了時間輪來做一些定時的任務(wù),用來替代我們的Timer,比如我之前講過的有關(guān)本地緩存Caffeine一篇文章,在Caffeine中是一個二層時間輪,也就是二維數(shù)組,其一維的數(shù)據(jù)表示較大的時間維度比如,秒,分,時,天等,其二維的數(shù)據(jù)表示該時間維度較小的時間維度,比如秒內(nèi)的某個區(qū)間段。當定位到一個TimeWhile[i][j]之后,其數(shù)據(jù)結(jié)構(gòu)其實是一個鏈表,記錄著我們的Node。在Caffeine利用時間輪記錄我們在某個時間過期的數(shù)據(jù),然后去處理。
由于時間輪是一個數(shù)組的結(jié)構(gòu),那么其插入復(fù)雜度是O(1)。我們解決了效率之后,但是我們的內(nèi)存依舊不是無限的,我們時間輪如何使用呢?答案當然就是磁盤,在去哪兒開源的QMQ中已經(jīng)實現(xiàn)了時間輪+磁盤存儲,這里為了方便描述我將其轉(zhuǎn)化為RocketMQ中的結(jié)構(gòu)來進行講解,實現(xiàn)圖如下:
時間輪+磁盤存儲我個人覺得比上面的RocksDB要更加正統(tǒng)一點,不依賴其他的中間件就可以完成,可用性自然也就更高,當然阿里云的RocketMQ具體怎么實現(xiàn)的這個兩種方案都有可能。
3.3 redis
在社區(qū)中也有很多公司使用的Redis做的延時消息,在Redis中有一個數(shù)據(jù)結(jié)構(gòu)是Zest,也就是有序集合,他可以實現(xiàn)類似我們的優(yōu)先級隊列的功能,同樣的他也是堆結(jié)構(gòu),所以插入算法復(fù)雜度依然是O(logN),但是由于Redis足夠快,所以這一塊可以忽略。(這塊沒有做對比的基準測試,只是猜測)。有同學(xué)會問,redis不是純內(nèi)存的k,v嗎,同樣的應(yīng)該也會受到內(nèi)存限制啊,為什么還會選擇他呢?
其實在這個場景中,Redis是很容易水平擴展的當一個Redis內(nèi)存不夠,這里可以使用兩個甚至更多,來滿足我們的需要,redis延時消息的原理圖(原圖出自:https://www.cnblogs.com/lylife/p/7881950.html)如下:
我們怎么才能知道Delayed Queue中的消息到期了呢?這里有兩種方法:
總結(jié)
本文介紹了三種方式實現(xiàn)分布式延時消息,希望能在你實現(xiàn)自己的延遲消息的時候提供一點思路。總的來說可能前兩種方法來說適用面更加廣一點,畢竟在RocketMQ這些大型的消息隊列中間件,還有一些其他的集成功能,比如順序消息,事務(wù)消息等,延時消息可能更加傾向于是分布式消息隊列中的一個功能,而不是作為一個獨立的組件存在。當然其中還有一些細節(jié)并沒有一一介紹,具體細節(jié)可以去參考QMQ和DDMQ的源碼。

我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流