掃二維碼與項(xiàng)目經(jīng)理溝通
我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
我們知道RocketMQ主要分為消息 生產(chǎn)、存儲(chǔ)(消息堆積)、消費(fèi) 三大塊領(lǐng)域。

前面已經(jīng)介紹了 生產(chǎn)消息、存儲(chǔ)消息 兩大塊內(nèi)容,那接下來,我們白話一下RocketMQ是如何消費(fèi)消息的,揭秘消息消費(fèi)全過程。
注意,如果白話中不小心提到相關(guān)代碼配置與類名,請參考RocketMQ 4.9.4版本
消費(fèi)者與消費(fèi)組、訂閱關(guān)系
消息消費(fèi)以 組 的模式開展。每個(gè)消費(fèi)組ConsumerGroup可以包含多個(gè)消費(fèi)者Consumer,并且可以訂閱多個(gè)主題Topic。
如果多個(gè)消費(fèi)者設(shè)置了相同的ConsumerGroup,我們認(rèn)為這些消費(fèi)者在同一個(gè)消費(fèi)組ConsumerGroup內(nèi)。
訂閱關(guān)系Subscription由消費(fèi)者組ConsumerGroup動(dòng)態(tài)注冊到服務(wù)端系統(tǒng),并在后續(xù)的消息傳輸中按照訂閱關(guān)系中的過濾規(guī)則進(jìn)行 消息過濾與匹配。
原則:
消費(fèi)組之間有兩種消費(fèi)模式:「集群模式」和「廣播模式」。
在「集群模式」下,同一主題下的消息只能被消費(fèi)組內(nèi)的某一個(gè)消費(fèi)者處理,一條消息會(huì)被 1 個(gè)消費(fèi)組內(nèi)的 N 個(gè)消費(fèi)者消費(fèi) 1 次。
在「廣播模式」下,同一主題下的消息將會(huì)被消費(fèi)組內(nèi)的所有消費(fèi)者處理一次,一條消息會(huì)被 1 個(gè)消費(fèi)組內(nèi)的 N 個(gè)消費(fèi)者消費(fèi) N 次。
如果消息消費(fèi)是「集群模式」,那么消息進(jìn)度保存在Broker上; 如果是「廣播模式」,那么消息消費(fèi)進(jìn)度存儲(chǔ)在Consumer端本地。
整體流程包括:
消息服務(wù)器與消費(fèi)者之間有兩種消息傳送方式:「推模式」和「拉模式」。
「拉模式」是消費(fèi)者主動(dòng)向消息服務(wù)器請求拉取消息。「推模式」是消息到達(dá)消息服務(wù)器后,由服務(wù)器主動(dòng)推送給消息消費(fèi)者。
在 RocketMQ 中,Consumer端的兩種消費(fèi)模式(Push/Pull)底層其實(shí)都是基于「拉模式」來獲取消息的。
具體實(shí)現(xiàn)方式是,消息拉取線程從服務(wù)器 拉取 一批消息后,將其提交給消息消費(fèi)線程池,并立即繼續(xù)向服務(wù)器嘗試?yán)∠?,以保持消息的連續(xù)性。
那如果拉取消息時(shí),Broker端暫時(shí)沒有新消息可以返回怎么辦?會(huì)一直無腦發(fā)送拉取請求嗎?
嗯,一定不會(huì)啦。
RocketMQ默認(rèn)會(huì)開啟「長輪詢機(jī)制」,這個(gè)機(jī)制能夠平衡 輪詢壓力 與 新消息的實(shí)時(shí)性 :
這就需要聊一聊消息消費(fèi)的「負(fù)載均衡機(jī)制」了。
注意,RocketMQ 5.x版本,對「推模式」底層增加了一種「Pop模式」的實(shí)現(xiàn)。Pop和Pull區(qū)別在于,Pop消費(fèi)的重平衡是在 Broker 端做的,而之前的 Pull 消費(fèi)都是由客戶端完成重平衡。本文還是介紹4.x版本。
消費(fèi)端的負(fù)載均衡是指將Broker端中多個(gè)隊(duì)列queue按照某種算法分配給同一個(gè)消費(fèi)組中的不同消費(fèi)者,負(fù)載均衡是客戶端開始消費(fèi)的起點(diǎn)。
注意,從RocketMQ服務(wù)端5.0版本開始額外支持了「消息粒度」的負(fù)載均衡策略,4.x/3.x版本僅支持「隊(duì)列粒度」的負(fù)載均衡策略。本文只介紹4.x的「隊(duì)列粒度」的。
RocketMQ「隊(duì)列粒度」的負(fù)載均衡的核心設(shè)計(jì)理念是:
負(fù)載均衡基本流程:
特別注意,無論是消息粒度負(fù)載均衡策略還是隊(duì)列粒度負(fù)載均衡策略,在消費(fèi)者上線或下線、服務(wù)端擴(kuò)縮容等場景下,都會(huì)觸發(fā)短暫的重新負(fù)載均衡動(dòng)作,可能會(huì)存在短暫的負(fù)載不一致情況,出現(xiàn)少量消息重復(fù)的現(xiàn)象。
因此,需要在下游消費(fèi)邏輯中做好消息「冪等去重」處理。
消息消費(fèi),主要關(guān)注兩個(gè)事情:
怎么保證消息消費(fèi)不丟失?
其實(shí)思路是比較直接的,就是 「消息確認(rèn)機(jī)制」和「失敗重試機(jī)制」。
消費(fèi)者從RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"來表示業(yè)務(wù)方已經(jīng)正常完成消費(fèi)。只有返回"CONSUME_SUCCESS"才算作消費(fèi)完成。這就是消費(fèi)時(shí)的「消息確認(rèn)機(jī)制」。
如果返回"CONSUME_LATER",則會(huì)按照不同的消息延遲級別進(jìn)行再次消費(fèi),延遲級別從秒到小時(shí)不等,最長延遲時(shí)間為2個(gè)小時(shí)后再次嘗試消費(fèi)。這就是消費(fèi)時(shí)的「失敗重試機(jī)制」。
重試消息會(huì)被存入名為 "%RETRY%+消費(fèi)組名稱" 的Topic中,原始主題Topic會(huì)存入屬性中。然后會(huì)基于定時(shí)任務(wù)機(jī)制,在到期時(shí)將任務(wù)再次拉取出來。
注意,從重試Topic的名稱我們可以了解到,RocketMQ消息重試是以消費(fèi)組為單位,而不是
Topic。
另外,RocketMQ跟kafka不同的是,天然支持了 「死信隊(duì)列機(jī)制」。
如果在嘗試消費(fèi)的過程中達(dá)到了最大重試次數(shù)(通常為16次),仍然無法成功消費(fèi),則消息將被發(fā)送到死信隊(duì)列,以確保消息存儲(chǔ)的可靠性。后續(xù)業(yè)務(wù)可以根據(jù)死信隊(duì)列,來做相關(guān)補(bǔ)償措施。
怎么保證消息消費(fèi)不重復(fù)?
其實(shí)思路也很直接,就是不保證不重復(fù)。
所有消息隊(duì)列的設(shè)計(jì),都是不保證消息消費(fèi)不重復(fù)的。所以使用消息隊(duì)列時(shí),要特別注意,如果有唯一性要求,必須做好消費(fèi)端的「冪等設(shè)計(jì)」。

我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流