掃二維碼與項目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
原創(chuàng)
作者:凌晶 2020-03-17 07:41:50
開發(fā)
架構(gòu)
Kafka apache Kafka 是一個快速、可擴展的、高吞吐的、可容錯的分布式“發(fā)布-訂閱”消息系統(tǒng), 使用 Scala 與 Java 語言編寫,能夠?qū)⑾囊粋€端點傳遞到另一個端點。

創(chuàng)新互聯(lián)堅持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站設(shè)計、網(wǎng)站制作、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時代的灤南網(wǎng)站設(shè)計、移動媒體設(shè)計的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
【51CTO.com原創(chuàng)稿件】Apache Kafka 是一個快速、可擴展的、高吞吐的、可容錯的分布式“發(fā)布-訂閱”消息系統(tǒng), 使用 Scala 與 Java 語言編寫,能夠?qū)⑾囊粋€端點傳遞到另一個端點。
較之傳統(tǒng)的消息中間件(例如 ActiveMQ、RabbitMQ),Kafka 具有高吞吐量、內(nèi)置分區(qū)、支持消息副本和高容錯的特性,非常適合大規(guī)模消息處理應(yīng)用程序。
Kafka 官網(wǎng):http://kafka.apache.org/
Kafka 主要設(shè)計目標(biāo)如下:
Kafka 通常用于兩大類應(yīng)用程序:
要了解 Kafka 如何執(zhí)行這些操作,讓我們從頭開始深入研究 Kafka 的功能。
首先幾個概念:
Kafka 架構(gòu)體系如下圖:
Kafka 的應(yīng)用場景非常多, 下面我們就來舉幾個我們最常見的場景:
①用戶的活動跟蹤:用戶在網(wǎng)站的不同活動消息發(fā)布到不同的主題中心,然后可以對這些消息進行實時監(jiān)測、實時處理。
當(dāng)然,也可以加載到 Hadoop 或離線處理數(shù)據(jù)倉庫,對用戶進行畫像。像淘寶、天貓、京東這些大型電商平臺,用戶的所有活動都要進行追蹤的。
②日志收集如下圖:
③限流削峰如下圖:
④高吞吐率實現(xiàn):Kafka 與其他 MQ 相比,最大的特點就是高吞吐率。為了增加存儲能力,Kafka 將所有的消息都寫入到了低速大容量的硬盤。
按理說,這將導(dǎo)致性能損失,但實際上,Kafka 仍然可以保持超高的吞吐率,并且其性能并未受到影響。
其主要采用如下方式實現(xiàn)了高吞吐率:
Kafka的優(yōu)點如下:
①解耦:在項目啟動之初來預(yù)測將來項目會碰到什么需求,是極其困難的。
消息系統(tǒng)在處理過程中間插入了一個隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實現(xiàn)這一接口。
這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
②冗余(副本):有些情況下,處理數(shù)據(jù)的過程會失敗。除非數(shù)據(jù)被持久化,否則將造成丟失。
消息隊列把數(shù)據(jù)進行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險。
許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
③擴展性:因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調(diào)節(jié)參數(shù)。擴展就像調(diào)大電力按鈕一樣簡單。
④靈活性&峰值處理能力:在訪問量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見;如果為以能處理這類峰值訪問為標(biāo)準(zhǔn)來投入資源隨時待命無疑是巨大的浪費。
使用消息隊列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負(fù)荷的請求而完全崩潰。
⑤可恢復(fù)性:系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
⑥順序保證:在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。Kafka 保證一個 Partition 內(nèi)的消息的有序性。
⑦緩沖:在任何重要的系統(tǒng)中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應(yīng)用過濾器花費更少的時間。
消息隊列通過一個緩沖層來幫助任務(wù)最高效率的執(zhí)行,寫入隊列的處理會盡可能的快速。該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。
⑧異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
Kafka 于其他 MQ 對比如下:
①RabbitMQ:RabbitMQ 是使用 Erlang 編寫的一個開源的消息隊列,本身支持很多的協(xié)議:AMQP,XMPP,SMTP,STOMP,也正因如此,它非常重量級,更適合于企業(yè)級的開發(fā)。
同時實現(xiàn)了 Broker 構(gòu)架,這意味著消息在發(fā)送給客戶端時先在中心隊列排隊。對路由,負(fù)載均衡或者數(shù)據(jù)持久化都有很好的支持。
②Redis:Redis 是一個基于 Key-Value 對的 NoSQL 數(shù)據(jù)庫,開發(fā)維護很活躍。
雖然它是一個 Key-Value 數(shù)據(jù)庫存儲系統(tǒng),但它本身支持 MQ 功能,所以完全可以當(dāng)做一個輕量級的隊列服務(wù)來使用。
對于 RabbitMQ 和 Redis 的入隊和出隊操作,各執(zhí)行 100 萬次,每 10 萬次記錄一次執(zhí)行時間。測試數(shù)據(jù)分為 128Bytes、512Bytes、1K 和 10K 四個不同大小的數(shù)據(jù)。
實驗表明:入隊時,當(dāng)數(shù)據(jù)比較小時 Redis 的性能要高于 RabbitMQ,而如果數(shù)據(jù)大小超過了 10K,Redis 則慢的無法忍受;出隊時,無論數(shù)據(jù)大小,Redis 都表現(xiàn)出非常好的性能,而 RabbitMQ 的出隊性能則遠(yuǎn)低于 Redis。
③ZeroMQ:ZeroMQ 號稱最快的消息隊列系統(tǒng),尤其針對大吞吐量的需求場景。
ZeroMQ 能夠?qū)崿F(xiàn) RabbitMQ 不擅長的高級/復(fù)雜的隊列,但是開發(fā)人員需要自己組合多種技術(shù)框架,技術(shù)上的復(fù)雜度是對這 MQ 能夠應(yīng)用成功的挑戰(zhàn)。
ZeroMQ 具有一個獨特的非中間件的模式,你不需要安裝和運行一個消息服務(wù)器或中間件,因為你的應(yīng)用程序?qū)缪葸@個服務(wù)器角色。
你只需要簡單的引用 ZeroMQ 程序庫,可以使用 NuGet 安裝,然后你就可以愉快的在應(yīng)用程序之間發(fā)送消息了。
但是 ZeroMQ 僅提供非持久性的隊列,也就是說如果宕機,數(shù)據(jù)將會丟失。其中,Twitter 的 Storm 0.9.0 以前的版本中默認(rèn)使用 ZeroMQ 作為數(shù)據(jù)流的傳輸(Storm 從 0.9 版本開始同時支持 ZeroMQ 和 Netty 作為傳輸模塊)。
④ActiveMQ:ActiveMQ 是 Apache 下的一個子項目。類似于 ZeroMQ,它能夠以代理人和點對點的技術(shù)實現(xiàn)隊列。同時類似于 RabbitMQ,它少量代碼就可以高效地實現(xiàn)高級應(yīng)用場景。
⑤Kafka/Jafka:Kafka 是 Apache 下的一個子項目,是一個高性能跨語言分布式發(fā)布/訂閱消息隊列系統(tǒng),而 Jafka 是在 Kafka 之上孵化而來的,即 Kafka 的一個升級版。
具有以下特性:
Kafka 通過 Hadoop 的并行加載機制統(tǒng)一了在線和離線的消息處理。Apache Kafka 相對于 ActiveMQ 是一個非常輕量級的消息系統(tǒng),除了性能非常好之外,還是一個工作良好的分布式系統(tǒng)。
Kafka的幾種重要角色如下:
①Kafka 作為存儲系統(tǒng):任何允許發(fā)布與使用無關(guān)的消息發(fā)布的消息隊列都有效地充當(dāng)了運行中消息的存儲系統(tǒng)。Kafka 的不同之處在于它是一個非常好的存儲系統(tǒng)。
寫入 Kafka 的數(shù)據(jù)將寫入磁盤并進行復(fù)制以實現(xiàn)容錯功能。Kafka 允許生產(chǎn)者等待確認(rèn),以便直到完全復(fù)制并確保即使寫入服務(wù)器失敗的情況下寫入也不會完成。
Kafka 的磁盤結(jié)構(gòu)可以很好地擴展使用-無論服務(wù)器上有 50KB 還是 50TB 的持久數(shù)據(jù),Kafka 都將執(zhí)行相同的操作。
由于認(rèn)真對待存儲并允許客戶端控制其讀取位置,因此您可以將 Kafka 視為一種專用于高性能,低延遲提交日志存儲,復(fù)制和傳播的專用分布式文件系統(tǒng)。
②Kafka 作為消息傳遞系統(tǒng):Kafka 的流概念與傳統(tǒng)的企業(yè)消息傳遞系統(tǒng)相比如何?
傳統(tǒng)上,消息傳遞具有兩種模型:排隊和發(fā)布訂閱。在隊列中,一組使用者可以從服務(wù)器中讀取內(nèi)容,并且每條記錄都將轉(zhuǎn)到其中一個。
在發(fā)布-訂閱記錄中廣播給所有消費者。這兩個模型中的每一個都有優(yōu)點和缺點。
排隊的優(yōu)勢在于,它允許您將數(shù)據(jù)處理劃分到多個使用者實例上,從而擴展處理量。
不幸的是,隊列不是多用戶的—一次進程讀取了丟失的數(shù)據(jù)。發(fā)布-訂閱允許您將數(shù)據(jù)廣播到多個進程,但是由于每條消息都傳遞給每個訂閱者,因此無法擴展處理。
Kafka 的消費者群體概念概括了這兩個概念。與隊列一樣,使用者組允許您將處理劃分為一組進程(使用者組的成員)。與發(fā)布訂閱一樣,Kafka 允許您將消息廣播到多個消費者組。
Kafka 模型的優(yōu)點在于,每個主題都具有這些屬性-可以擴展處理范圍,并且是多訂閱者,無需選擇其中一個。
與傳統(tǒng)的消息傳遞系統(tǒng)相比,Kafka 還具有更強的訂購保證。傳統(tǒng)隊列將記錄按順序保留在服務(wù)器上,如果多個使用者從隊列中消費,則服務(wù)器將按記錄的存儲順序分發(fā)記錄。
但是,盡管服務(wù)器按順序分發(fā)記錄,但是這些記錄是異步傳遞給使用者的,因此它們可能在不同的使用者上亂序到達(dá)。
這實際上意味著在并行使用的情況下會丟失記錄的順序。消息傳遞系統(tǒng)通常通過“專有使用者”的概念來解決此問題,該概念僅允許一個進程從隊列中使用,但是,這當(dāng)然意味著在處理中沒有并行性。
Kafka 做得更好,通過在主題內(nèi)具有并行性(即分區(qū))的概念,Kafka 能夠在用戶進程池中提供排序保證和負(fù)載均衡。
這是通過將主題中的分區(qū)分配給消費者組中的消費者來實現(xiàn)的,以便每個分區(qū)都由組中的一個消費者完全消費。
通過這樣做,我們確保使用者是該分區(qū)的唯一讀取器,并按順序使用數(shù)據(jù)。由于存在許多分區(qū),因此仍然可以平衡許多使用者實例上的負(fù)載。但是請注意,使用者組中的使用者實例不能超過分區(qū)。
③Kafka 用作流處理:僅讀取,寫入和存儲數(shù)據(jù)流是不夠的,目的是實現(xiàn)對流的實時處理。
在 Kafka 中,流處理器是指從輸入主題中獲取連續(xù)數(shù)據(jù)流,對該輸入進行一些處理并生成連續(xù)數(shù)據(jù)流以輸出主題的任何東西。
例如,零售應(yīng)用程序可以接受銷售和裝運的輸入流,并輸出根據(jù)此數(shù)據(jù)計算出的重新訂購和價格調(diào)整流。
可以直接使用生產(chǎn)者和消費者 API 進行簡單處理。但是,對于更復(fù)雜的轉(zhuǎn)換,Kafka 提供了完全集成的 Streams API。
這允許構(gòu)建執(zhí)行非重要處理的應(yīng)用程序,這些應(yīng)用程序計算流的聚合或?qū)⒘鬟B接在一起。
該功能有助于解決此類應(yīng)用程序所面臨的難題:處理無序數(shù)據(jù),在代碼更改時重新處理輸入,執(zhí)行狀態(tài)計算等。
流 API 建立在 Kafka 提供的核心原語之上:它使用生產(chǎn)者和使用者 API 進行輸入,使用 Kafka 進行狀態(tài)存儲,并使用相同的組機制來實現(xiàn)流處理器實例之間的容錯。
Kafka 中的關(guān)鍵術(shù)語解釋
Topic:主題。在 Kafka 中,使用一個類別屬性來劃分消息的所屬類,劃分消息的這個類稱為 Topic。Topic 相當(dāng)于消息的分類標(biāo)簽,是一個邏輯概念。
物理上不同 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存于一個或多個 Broker 上但用戶只需指定消息的 Topic 即可生產(chǎn)或消費數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處。
Partition:分區(qū)。Topic 中的消息被分割為一個或多個 Partition,其是一個物理概念,對應(yīng)到系統(tǒng)上 就是一個或若干個目錄。Partition 內(nèi)部的消息是有序的,但 Partition 間的消息是無序的。
Segment 段。將 Partition 進一步細(xì)分為了若干的 Segment,每個 Segment 文件的大小相等。
Broker:Kafka 集群包含一個或多個服務(wù)器,每個服務(wù)器節(jié)點稱為一個 Broker。
Broker 存儲 Topic 的數(shù)據(jù)。如果某 Topic 有 N 個 Partition,集群有 N 個 Broker,那么每個 Broker 存儲該 Topic 的一個 Partition。
如果某 Topic 有 N 個 Partition,集群有(N+M)個 Broker,那么其中有 N 個 Broker 存儲該 Topic 的一個 Partition,剩下的 M 個 Broker 不存儲該 Topic 的 Partition 數(shù)據(jù)。
如果某 Topic 有 N 個 Partition,集群中 Broker 數(shù)目少于 N 個,那么一個 Broker 存儲該 Topic 的一個或多個 Partition。
在實際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導(dǎo)致 Kafka 集群數(shù)據(jù)不均衡。
Producer:生產(chǎn)者。即消息的發(fā)布者,生產(chǎn)者將數(shù)據(jù)發(fā)布到他們選擇的主題。
生產(chǎn)者負(fù)責(zé)選擇將哪個記錄分配給主題中的哪個分區(qū)。即:生產(chǎn)者生產(chǎn)的一條消息,會被寫入到某一個 Partition。
Consumer:消費者??梢詮?Broker 中讀取消息。一個消費者可以消費多個 Topic 的消息;一個消費者可以消費同一個 Topic 中的多個 Partition 中的消息;一個 Partiton 允許多個 Consumer 同時消費。
Consumer Group:Consumer Group 是 Kafka 提供的可擴展且具有容錯性的消費者機制。
組內(nèi)可以有多個消費者,它們共享一個公共的 ID,即 Group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費訂閱主題 的所有分區(qū)。
Kafka 保證同一個 Consumer Group 中只有一個 Consumer 會消費某條消息。
實際上,Kafka 保證的是穩(wěn)定狀態(tài)下每一個 Consumer 實例只會消費某一個或多個特定的 Partition,而某個 Partition 的數(shù)據(jù)只會被某一個特定的 Consumer 實例所消費。
下面我們用官網(wǎng)的一張圖, 來標(biāo)識 Consumer 數(shù)量和 Partition 數(shù)量的對應(yīng)關(guān)系。
由兩臺服務(wù)器組成的 Kafka 群集,其中包含四個帶有兩個使用者組的分區(qū)(P0-P3)。消費者組 A 有兩個消費者實例,組 B 有四個。
對于這個消費組, 以前一直搞不明白,我自己的總結(jié)是:Topic 中的 Partitoin 到 Group 是發(fā)布訂閱的通信方式。
即一條 Topic 的 Partition 的消息會被所有的 Group 消費,屬于一對多模式;Group 到 Consumer 是點對點通信方式,屬于一對一模式。
舉個例子:不使用 Group 的話,啟動 10 個 Consumer 消費一個 Topic,這 10 個 Consumer 都能得到 Topic 的所有數(shù)據(jù),相當(dāng)于這個 Topic 中的任一條消息被消費 10 次。
使用 Group 的話,連接時帶上 groupid,Topic 的消息會分發(fā)到 10 個 Consumer 上,每條消息只被消費 1 次。
Replizcas of partition:分區(qū)副本。副本是一個分區(qū)的備份,是為了防止消息丟失而創(chuàng)建的分區(qū)的備份。
Partition Leader:每個 Partition 有多個副本,其中有且僅有一個作為 Leader,Leader 是當(dāng)前負(fù)責(zé)消息讀寫 的 Partition。即所有讀寫操作只能發(fā)生于 Leader 分區(qū)上。
Partition Follower:所有 Follower 都需要從 Leader 同步消息,F(xiàn)ollower 與 Leader 始終保持消息同步。Leader 與 Follower 的關(guān)系是主備關(guān)系,而非主從關(guān)系。
ISR:
Offset:偏移量。每條消息都有一個當(dāng)前 Partition 下唯一的 64 字節(jié)的 Offset,它是相當(dāng)于當(dāng)前分區(qū)第一條消息的偏移量。
Broker Controller:Kafka集群的多個 Broker 中,有一個會被選舉 Controller,負(fù)責(zé)管理整個集群中 Partition 和 Replicas 的狀態(tài)。
只有 Broker Controller 會向 Zookeeper 中注冊 Watcher,其他 Broker 及分區(qū)無需注冊。即 Zookeeper 僅需監(jiān)聽 Broker Controller 的狀態(tài)變化即可。
HW 與 LEO:
我相信你看完上面的概念還是懵逼的,好吧!下面我們就用圖來形象話的表示兩者的關(guān)系吧:
Zookeeper:Zookeeper 負(fù)責(zé)維護和協(xié)調(diào) Broker,負(fù)責(zé) Broker Controller 的選舉。在 Kafka 0.9 之前版本,Offset 是由 ZK 負(fù)責(zé)管理的。
總結(jié):ZK 負(fù)責(zé) Controller 的選舉,Controller 負(fù)責(zé) Leader 的選舉。
Coordinator:一般指的是運行在每個 Broker 上的 Group Coordinator 進程,用于管理 Consumer Group 中的各個成員,主要用于 Offset 位移管理和 Rebalance。一個 Coordinator 可以同時管理多個消費者組。
Rebalance:當(dāng)消費者組中的數(shù)量發(fā)生變化,或者 Topic 中的 Partition 數(shù)量發(fā)生了變化時,Partition 的所有權(quán)會在消費者間轉(zhuǎn)移,即 Partition 會重新分配,這個過程稱為再均衡 Rebalance。
再均衡能夠給消費者組及 Broker 帶來高性能、高可用性和伸縮,但在再均衡期間消費者是無法讀取消息的,即整個 Broker 集群有小一段時間是不可用的。因此要避免不必要的再均衡。
Offset Commit:Consumer 從 Broker 中取一批消息寫入 Buffer 進行消費,在規(guī)定的時間內(nèi)消費完消息后,會自動將其消費消息的 Offset 提交給 Broker,以記錄下哪些消息是消費過的。當(dāng)然,若在時限內(nèi)沒有消費完畢,其是不會提交 Offset 的。
Kafka的工作原理和過程
①消息寫入算法
消息發(fā)送者將消息發(fā)送給 Broker, 并形成最終的可供消費者消費的 log,是已給比較復(fù)雜的過程:
②消息路由策略
在通過 API 方式發(fā)布消息時,生產(chǎn)者是以 Record 為消息進行發(fā)布的。
Record 中包含 Key 與 Value,Value 才是我們真正的消息本身,而 Key 用于路由消息所要存放的 Partition。
消息要寫入到哪個 Partition 并不是隨機的,而是有路由策略的:
③HW 截斷機制
如果 Partition Leader 接收到了新的消息, ISR 中其它 Follower 正在同步過程中,還未同步完畢時 leader 宕機。
此時就需要選舉出新的 Leader。若沒有 HW 截斷機制,將會導(dǎo)致 Partition 中 Leader 與 Follower 數(shù)據(jù)的不一致。
當(dāng)原 Leader 宕機后又恢復(fù)時,將其 LEO 回退到其宕機時的 HW,然后再與新的 Leader 進行數(shù)據(jù)同步,這樣就可以保證老 Leader 與新 Leader 中數(shù)據(jù)一致了,這種機制稱為 HW 截斷機制。
④消息發(fā)送的可靠性
生產(chǎn)者向 Kafka 發(fā)送消息時,可以選擇需要的可靠性級別。通過 request.required.acks 參數(shù)的值進行設(shè)置。
0 值:異步發(fā)送。生產(chǎn)者向 Kafka 發(fā)送消息而不需要 Kafka 反饋成功 Ack。該方式效率最高,但可靠性最低。
其可能會存在消息丟失的情況:
1 值:同步發(fā)送。生產(chǎn)者發(fā)送消息給 Kafka,Broker 的 Partition Leader 在收到消息后馬上發(fā)送成功 Ack(無需等等 ISR 中的 Follower 同步)。
生產(chǎn)者收到后知道消息發(fā)送成功,然后會再發(fā)送消息。如果一直未收到 Kafka 的 Ack,則生產(chǎn)者會認(rèn)為消息發(fā)送失敗,會重發(fā)消息。
該方式對于 Producer 來說,若沒有收到 Ack,一定可以確認(rèn)消息發(fā)送失敗了,然后可以重發(fā)。
但是,即使收到了 ACK,也不能保證消息一定就發(fā)送成功了。故,這種情況,也可能會發(fā)生消息丟失的情況。
-1 值:同步發(fā)送。生產(chǎn)者發(fā)送消息給 Kafka,Kafka 收到消息后要等到 ISR 列表中的所有副本都 同步消息完成后,才向生產(chǎn)者發(fā)送成功 Ack。
如果一直未收到 Kafka 的 Ack,則認(rèn)為消息發(fā)送 失敗,會自動重發(fā)消息。該方式會出現(xiàn)消息重復(fù)接收的情況。
⑤消費者消費過程解析
生產(chǎn)者將消息發(fā)送到 Topitc 中,消費者即可對其進行消費,其消費過程如下:
⑥Partition Leader 選舉范圍
當(dāng) Leader 宕機后,Broker Controller 會從 ISR 中挑選一個 Follower 成為新的 Leader。
如果 ISR 中沒有其他副本怎么辦?可以通過 unclean.leader.election.enable 的值來設(shè)置 Leader 選舉范圍。
False:必須等到 ISR 列表中所有的副本都活過來才進行新的選舉。該策略可靠性有保證,但可用性低。
True:在 ISR 列表中沒有副本的情況下,可以選擇任意一個沒有宕機的主機作為新的 Leader,該策略可用性高,但可靠性沒有保證。
⑦重復(fù)消費問題的解決方案
同一個 Consumer 重復(fù)消費:當(dāng) Consumer 由于消費能力低而引發(fā)了消費超時,則可能會形成重復(fù)消費。
在某數(shù)據(jù)剛好消費完畢,但是正準(zhǔn)備提交 Offset 時候,消費時間超時,則 Broker 認(rèn)為這條消息未消費成功。這時就會產(chǎn)生重復(fù)消費問題。其解決方案:延長 Offset 提交時間。
不同的 Consumer 重復(fù)消費:當(dāng) Consumer 消費了消息,但還沒有提交 Offset 時宕機,則這些已經(jīng)被消費過的消息會被重復(fù)消費。其解決方案:將自動提交改為手動提交。
⑧從架構(gòu)設(shè)計上解決 Kafka 重復(fù)消費的問題
我們在設(shè)計程序的時候,比如考慮到網(wǎng)絡(luò)故障等一些異常的情況,我們都會設(shè)置消息的重試次數(shù),可能還有其他可能出現(xiàn)消息重復(fù),那我們應(yīng)該如何解決呢?下面提供三個方案:
方案一:保存并查詢
給每個消息都設(shè)置一個獨一無二的 uuid,所有的消息,我們都要存一個 uuid。
我們在消費消息的時候,首先去持久化系統(tǒng)中查詢一下看這個看是否以前消費過,如沒有消費過,在進行消費,如果已經(jīng)消費過,丟棄就好了。
下圖表明了這種方案:
方案二:利用冪等
冪等(Idempotence)在數(shù)學(xué)上是這樣定義的,如果一個函數(shù) f(x) 滿足:f(f(x)) = f(x),則函數(shù) f(x) 滿足冪等性。
這個概念被拓展到計算機領(lǐng)域,被用來描述一個操作、方法或者服務(wù)。一個冪等操作的特點是,其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
一個冪等的方法,使用同樣的參數(shù),對它進行多次調(diào)用和一次調(diào)用,對系統(tǒng)產(chǎn)生的影響是一樣的。所以,對于冪等的方法,不用擔(dān)心重復(fù)執(zhí)行會對系統(tǒng)造成任何改變。
我們舉個例子來說明一下。在不考慮并發(fā)的情況下,“將 X 老師的賬戶余額設(shè)置為 100 萬元”,執(zhí)行一次后對系統(tǒng)的影響是,X 老師的賬戶余額變成了 100 萬元。
只要提供的參數(shù) 100 萬元不變,那即使再執(zhí)行多少次,X 老師的賬戶余額始終都是 100 萬元,不會變化,這個操作就是一個冪等的操作。
再舉一個例子,“將 X 老師的余額加 100 萬元”,這個操作它就不是冪等的,每執(zhí)行一次,賬戶余額就會增加 100 萬元,執(zhí)行多次和執(zhí)行一次對系統(tǒng)的影響(也就是賬戶的余額)是不一樣的。
所以,通過這兩個例子,我們可以想到如果系統(tǒng)消費消息的業(yè)務(wù)邏輯具備冪等性,那就不用擔(dān)心消息重復(fù)的問題了,因為同一條消息,消費一次和消費多次對系統(tǒng)的影響是完全一樣的。也就可以認(rèn)為,消費多次等于消費一次。
那么,如何實現(xiàn)冪等操作呢?最好的方式就是,從業(yè)務(wù)邏輯設(shè)計上入手,將消費的業(yè)務(wù)邏輯設(shè)計成具備冪等性的操作。
但是,不是所有的業(yè)務(wù)都能設(shè)計成天然冪等的,這里就需要一些方法和技巧來實現(xiàn)冪等。
下面我們介紹一種常用的方法:利用數(shù)據(jù)庫的唯一約束實現(xiàn)冪等。
例如,我們剛剛提到的那個不具備冪等特性的轉(zhuǎn)賬的例子:將 X 老師的賬戶余額加 100 萬元。在這個例子中,我們可以通過改造業(yè)務(wù)邏輯,讓它具備冪等性。
首先,我們可以限定,對于每個轉(zhuǎn)賬單每個賬戶只可以執(zhí)行一次變更操作,在分布式系統(tǒng)中,這個限制實現(xiàn)的方法非常多,最簡單的是我們在數(shù)據(jù)庫中建一張轉(zhuǎn)賬流水表。
這個表有三個字段:轉(zhuǎn)賬單 ID、賬戶 ID 和變更金額,然后給轉(zhuǎn)賬單 ID 和賬戶 ID 這兩個字段聯(lián)合起來創(chuàng)建一個唯一約束,這樣對于相同的轉(zhuǎn)賬單 ID 和賬戶 ID,表里至多只能存在一條記錄。
這樣,我們消費消息的邏輯可以變?yōu)椋骸霸谵D(zhuǎn)賬流水表中增加一條轉(zhuǎn)賬記錄,然后再根據(jù)轉(zhuǎn)賬記錄,異步操作更新用戶余額即可?!?/p>
在轉(zhuǎn)賬流水表增加一條轉(zhuǎn)賬記錄這個操作中,由于我們在這個表中預(yù)先定義了“賬戶 ID 轉(zhuǎn)賬單 ID”的唯一約束,對于同一個轉(zhuǎn)賬單同一個賬戶只能插入一條記錄,后續(xù)重復(fù)的插入操作都會失敗,這樣就實現(xiàn)了一個冪等的操作。
方案三:設(shè)置前提條件
為更新的數(shù)據(jù)設(shè)置前置條件另外一種實現(xiàn)冪等的思路是,給數(shù)據(jù)變更設(shè)置一個前置條件,如果滿足條件就更新數(shù)據(jù),否則拒絕更新數(shù)據(jù),在更新數(shù)據(jù)的時候,同時變更前置條件中需要判斷的數(shù)據(jù)。
這樣,重復(fù)執(zhí)行這個操作時,由于第一次更新數(shù)據(jù)的時候已經(jīng)變更了前置條件中需要判斷的數(shù)據(jù),不滿足前置條件,則不會重復(fù)執(zhí)行更新數(shù)據(jù)操作。
比如,剛剛我們說過,“將 X 老師的賬戶的余額增加 100 萬元”這個操作并不滿足冪等性,我們可以把這個操作加上一個前置條件,變?yōu)椋骸叭绻?X 老師的賬戶當(dāng)前的余額為 500 萬元,將余額加 100 萬元”,這個操作就具備了冪等性。
對應(yīng)到消息隊列中的使用時,可以在發(fā)消息時在消息體中帶上當(dāng)前的余額,在消費的時候進行判斷數(shù)據(jù)庫中,當(dāng)前余額是否與消息中的余額相等,只有相等才執(zhí)行變更操作。
但是,如果我們要更新的數(shù)據(jù)不是數(shù)值,或者我們要做一個比較復(fù)雜的更新操作怎么辦?用什么作為前置判斷條件呢?
更加通用的方法是,給你的數(shù)據(jù)增加一個版本號屬性,每次更數(shù)據(jù)前,比較當(dāng)前數(shù)據(jù)的版本號是否和消息中的版本號一致,如果不一致就拒絕更新數(shù)據(jù),更新數(shù)據(jù)的同時將版本號 +1,一樣可以實現(xiàn)冪等。
Kafka 集群搭建
我們在工作中,為了保證環(huán)境的高可用,防止單點,Kafka 都是以集群的方式出現(xiàn)的,下面就帶領(lǐng)大家一起搭建一套 Kafka 集群環(huán)境。
我們在官網(wǎng)下載 Kafka,下載地址為:http://kafka.apache.org/downloads,下載我們需要的版本,推薦使用穩(wěn)定的版本。
搭建集群
①下載并解壓
- cd /usr/local/src
- wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
- mkdir /data/servers
- tar xzvf kafka_2.11-2.4.0.tgz -C /data/servers/
- cd /data/servers/kafka_2.11-2.4.0
②修改配置文件
Kafka 的配置文件 $KAFKA_HOME/config/server.properties,主要修改一下下面幾項:
- 確保每個機器上的id不一樣
- broker.id=0
- 配置服務(wù)端的監(jiān)控地址
- listeners=PLAINTEXT://192.168.51.128:9092
- kafka 日志目錄
- log.dirs=/data/servers/kafka_2.11-2.4.0/logs
- #kafka設(shè)置的partitons的個數(shù)
- num.partitions=1
- zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群
- zookeeper.connect=192.168.51.128:2181
因為我自己是本機做實驗,所有使用的是一個主機的不同端口,在線上,就是不同的機器,大家參考即可。
我們這里使用 Kafka 的 Zookeeper,只啟動一個節(jié)點,但是正真的生產(chǎn)過程中,是需要 Zookeeper 集群,自己搭建就好,后期我們也會出 Zookeeper 的教程,大家請關(guān)注就好了。
③拷貝 3 份配置文件
- #創(chuàng)建對應(yīng)的日志目錄
- mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092
- mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093
- mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094
- #拷貝三份配置文件
- cp server.properties server_9092.properties
- cp server.properties server_9093.properties
- cp server.properties server_9094.properties
④修改不同端口對應(yīng)的文件
- #9092的id為0, 9093的id為1, 9094的id為2
- broker.id=0
- # 配置服務(wù)端的監(jiān)控地址, 分別在不通的配置文件中寫入不同的端口
- listeners=PLAINTEXT://192.168.51.128:9092
- # kafka 日志目錄, 目錄也是對應(yīng)不同的端口
- log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092
- # kafka設(shè)置的partitons的個數(shù)
- num.partitions=1
- # zookeeper的連接地址, 如果有自己的zookeeper集群, 請直接使用自己搭建的zookeeper集群
- zookeeper.connect=192.168.51.128:2181
修改 Zookeeper 的配置文件:
- dataDir=/data/servers/zookeeper
- server.1=192.168.51.128:2888:3888
然后創(chuàng)建 Zookeeper 的 myid 文件:
- echo "1"> /data/servers/zookeeper/myid
⑤啟動 Zookeeper
使用 Kafka 內(nèi)置的 Zookeeper:
- cd /data/servers/kafka_2.11-2.4.0/bin
- zookeeper-server-start.sh -daemon ../config/zookeeper.properties
- netstat -anp |grep 2181
啟動 Kafka:
- ./kafka-server-start.sh -daemon ../config/server_9092.properties
- ./kafka-server-start.sh -daemon ../config/server_9093.properties
- ./kafka-server-start.sh -daemon ../config/server_9094.properties
Kafka 的操作
①Topic
我們先來看一下創(chuàng)建 Topic 常用的參數(shù)吧:
示例:
- cd /data/servers/kafka_2.11-2.4.0/bin
- # 創(chuàng)建topic test1
- kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1
- # 創(chuàng)建topic test2
- kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2
- # 查看topic
- kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
②自動創(chuàng)建 Topic
我們在工作中,如果我們不想去管理 Topic,可以通過 Kafka 的配置文件來管理。
我們可以讓 Kafka 自動創(chuàng)建 Topic,需要在我們的 Kafka 配置文件中加入如下配置文件:
- auto.create.topics.enable=true
如果刪除 Topic 想達(dá)到物理刪除的目的,也是需要配置的:
- delete.topic.enable=true
③發(fā)送消息
他們可以通過客戶端的命令生產(chǎn)消息,先來看看 kafka-console-producer.sh 常用的幾個參數(shù)吧:
這個參數(shù)是必須的:
- kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1
④消費消息
我們也還是先來看看 kafka-console-consumer.sh 的參數(shù)吧:
- kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning
Kafka 的日志
Kafka 的日志分兩種:
那我們就來說說備份和分區(qū)吧:我們創(chuàng)建一個分區(qū),一個備份,那么 test 就應(yīng)該在三臺機器上或者三個數(shù)據(jù)目錄只有一個 test-0。(分區(qū)的下標(biāo)是從 0 開始的)
如果我們創(chuàng)建 N 個分區(qū),我們就會在三個服務(wù)器上發(fā)現(xiàn),test_0-n,如果我們創(chuàng)建 M 個備份,我們就會在發(fā)現(xiàn),test_0 到 test_n 每一個都是 M 個。
Kafka API
使用 Kafka 原生的 API
①消費者自動提交
定義自己的生產(chǎn)者:
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import java.util.Properties;
- /**
- * @ClassName MyKafkaProducer
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 3:37 PM
- * @Version 1.0
- **/
- public class MyKafkaProducer {
- private org.apache.kafka.clients.producer.KafkaProducer
producer; - public MyKafkaProducer() {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
- properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- // 設(shè)置批量發(fā)送
- properties.put("batch.size", 16384);
- // 批量發(fā)送的等待時間50ms, 超過50ms, 不足批量大小也發(fā)送
- properties.put("linger.ms", 50);
- this.producer = new org.apache.kafka.clients.producer.KafkaProducer
(properties); - }
- public boolean sendMsg() {
- boolean result = true;
- try {
- // 正常發(fā)送, test2是topic, 0代表的是分區(qū), 1代表的是key, hello world是發(fā)送的消息內(nèi)容
- final ProducerRecord
record = new ProducerRecord ("test2", 0, 1, "hello world"); - producer.send(record);
- // 有回調(diào)函數(shù)的調(diào)用
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- System.out.println(recordMetadata.topic());
- System.out.println(recordMetadata.partition());
- System.out.println(recordMetadata.offset());
- }
- });
- // 自己定義一個類
- producer.send(record, new MyCallback(record));
- } catch (Exception e) {
- result = false;
- }
- return result;
- }
- }
定義生產(chǎn)者發(fā)送成功的回調(diào)函數(shù):
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.RecordMetadata;
- /**
- * @ClassName MyCallback
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 3:51 PM
- * @Version 1.0
- **/
- public class MyCallback implements Callback {
- private Object msg;
- public MyCallback(Object msg) {
- this.msg = msg;
- }
- @Override
- public void onCompletion(RecordMetadata metadata, Exception e) {
- System.out.println("topic = " + metadata.topic());
- System.out.println("partiton = " + metadata.partition());
- System.out.println("offset = " + metadata.offset());
- System.out.println(msg);
- }
- }
生產(chǎn)者測試類:在生產(chǎn)者測試類中,自己遇到一個坑,就是最后自己沒有加 sleep,就是怎么檢查自己的代碼都沒有問題,但是最后就是沒法發(fā)送成功消息,最后加了一個 sleep 就可以了。
因為主函數(shù) main 已經(jīng)執(zhí)行完退出,但是消息并沒有發(fā)送完成,需要進行等待一下。當(dāng)然,你在生產(chǎn)環(huán)境中可能不會遇到這樣問題,呵呵!
代碼如下:
- import static java.lang.Thread.sleep;
- /**
- * @ClassName MyKafkaProducerTest
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 3:46 PM
- * @Version 1.0
- **/
- public class MyKafkaProducerTest {
- public static void main(String[] args) throws InterruptedException {
- MyKafkaProducer producer = new MyKafkaProducer();
- boolean result = producer.sendMsg();
- System.out.println("send msg " + result);
- sleep(1000);
- }
- }
消費者類:
- import kafka.utils.ShutdownableThread;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.Properties;
- /**
- * @ClassName MyKafkaConsumer
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 4:12 PM
- * @Version 1.0
- **/
- public class MyKafkaConsumer extends ShutdownableThread {
- private KafkaConsumer
consumer; - public MyKafkaConsumer() {
- super("KafkaConsumerTest", false);
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
- properties.put("group.id", "mygroup");
- properties.put("enable.auto.commit", "true");
- properties.put("auto.commit.interval.ms", "1000");
- properties.put("session.timeout.ms", "30000");
- properties.put("heartbeat.interval.ms", "10000");
- properties.put("auto.offset.reset", "earliest");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- this.consumer = new KafkaConsumer
(properties); - }
- @Override
- public void doWork() {
- consumer.subscribe(Arrays.asList("test2"));
- ConsumerRecords
records = consumer.poll(1000); - for (ConsumerRecord record : records) {
- System.out.println("topic = " + record.topic());
- System.out.println("partition = " + record.partition());
- System.out.println("key = " + record.key());
- System.out.println("value = " + record.value());
- }
- }
- }
消費者的測試類:
- /**
- * @ClassName MyConsumerTest
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 4:23 PM
- * @Version 1.0
- **/
- public class MyConsumerTest {
- public static void main(String[] args) {
- MyKafkaConsumer consumer = new MyKafkaConsumer();
- consumer.start();
- System.out.println("==================");
- }
- }
②消費者同步手動提交
前面的消費者都是以自動提交 Offset 的方式對 Broker 中的消息進行消費的,但自動提交 可能會出現(xiàn)消息重復(fù)消費的情況。
所以在生產(chǎn)環(huán)境下,很多時候需要對 Offset 進行手動提交, 以解決重復(fù)消費的問題。
手動提交又可以劃分為同步提交、異步提交,同異步聯(lián)合提交。這些提交方式僅僅是 doWork() 方法不相同,其構(gòu)造器是相同的。
所以下面首先在前面消費者類的基礎(chǔ)上進行構(gòu)造器的修改,然后再分別實現(xiàn)三種不同的提交方式。
同步提交方式是,消費者向 Broker 提交 Offset 后等待 Broker 成功響應(yīng)。若沒有收到響應(yīng),則會重新提交,直到獲取到響應(yīng)。
而在這個等待過程中,消費者是阻塞的。其嚴(yán)重影響了消費者的吞吐量。
修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:
- import kafka.utils.ShutdownableThread;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.util.Arrays;
- import java.util.Collections;
- import java.util.Properties;
- /**
- * @ClassName MyKafkaConsumer
- * @Description TODO
- * @Author lingxiangxiang
- * @Date 4:12 PM
- * @Version 1.0
- **/
- public class MyKafkaConsumer extends ShutdownableThread {
- private KafkaConsumer
consumer; - public MyKafkaConsumer() {
- super("KafkaConsumerTest", false);
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
- properties.put("group.id", "mygroup");
- // 這里要修改成手動提交
- properties.put("enable.auto.commit", "false");
- // properties.put("auto.commit.interval.ms", "1000");
- properties.put("session.timeout.ms", "30000");
- properties.put("heartbeat.interval.ms", "10000");
- properties.put("auto.offset.reset", "earliest");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- this.consumer = new KafkaConsumer
(properties); - }
- @Override
- public void doWork() {
- consumer.subscribe(Arrays.asList("test2"));
- ConsumerRecords
records = consumer.poll(1000); - for (ConsumerRecord record : records) {
- System.out.println("topic = " + record.topic());
- System.out.println("partition = " + record.partition());
- &nbs
當(dāng)前標(biāo)題:從未如此簡單:10分鐘帶你逆襲Kafka!
標(biāo)題路徑:http://uogjgqi.cn/article/dhcdchp.html

我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流