掃二維碼與項(xiàng)目經(jīng)理溝通
我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
作者: 微觀技術(shù) 2021-07-30 07:28:15
開發(fā)
架構(gòu)
Kafka Kafka作為一款開源的消息引擎,很多人并不陌生,但深入其源碼的同學(xué)估計(jì)不多,除非你是中間件團(tuán)隊(duì)消息系統(tǒng)維護(hù)者。

本文轉(zhuǎn)載自微信公眾號「微觀技術(shù)」,作者微觀技術(shù)。轉(zhuǎn)載本文請聯(lián)系微觀技術(shù)公眾號。
大家好,我是Tom哥~
Kafka作為一款開源的消息引擎,很多人并不陌生,但深入其源碼的同學(xué)估計(jì)不多,除非你是中間件團(tuán)隊(duì)消息系統(tǒng)維護(hù)者。但術(shù)業(yè)有專攻,市面上那么多開源框架且每個(gè)框架又經(jīng)常迭代升級,花精力深入了解每一個(gè)框架源碼不太現(xiàn)實(shí),本文會以業(yè)務(wù)視角羅列工作中大家需要熟知的一些知識
本篇文章的目錄:
它是一個(gè)分布式協(xié)調(diào)框架,負(fù)責(zé)協(xié)調(diào)管理并保存 Kafka 集群的所有元數(shù)據(jù)信息,比如集群都有哪些 Broker 在運(yùn)行、創(chuàng)建了哪些 Topic,每個(gè) Topic 都有多少分區(qū)以及這些分區(qū)的 Leader 副本都在哪些機(jī)器上等信息。
純二進(jìn)制的字節(jié)序列。當(dāng)然消息還是結(jié)構(gòu)化的,只是在使用之前都要將其轉(zhuǎn)換成二進(jìn)制的字節(jié)序列。
生產(chǎn)者程序中配置compression.type 參數(shù)即表示啟用指定類型的壓縮算法。
props.put(“compression.type”, “gzip”),它表明該 Producer 的壓縮算法使用的是GZIP。這樣 Producer 啟動后生產(chǎn)的每個(gè)消息集合都是經(jīng) GZIP 壓縮過的,故而能很好地節(jié)省網(wǎng)絡(luò)傳輸帶寬以及 Kafka Broker 端的磁盤占用。
但如果Broker又指定了不同的壓縮算法,如:Snappy,會將生產(chǎn)端的消息解壓然后按自己的算法重新壓縮。
各壓縮算法比較:吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
kafka默認(rèn)不指定壓縮算法。
當(dāng) Consumer pull消息時(shí),Broker 會原樣發(fā)送出去,當(dāng)消息到達(dá) Consumer 端后,由 Consumer 自行解壓縮還原成之前的消息。
編寫一個(gè)類實(shí)現(xiàn)org.apache.kafka.clients.Partitioner接口。實(shí)現(xiàn)內(nèi)部兩個(gè)方法:partition()和close()。然后顯式地配置生產(chǎn)者端的參數(shù)partitioner.class
常見的策略:
在new KafkaProducer 實(shí)例時(shí),生產(chǎn)者應(yīng)用會在后臺創(chuàng)建并啟動一個(gè)名為 Sender 的線程,該 Sender 線程開始運(yùn)行時(shí)首先會創(chuàng)建與 Broker 的連接。此時(shí)還不知道給哪個(gè)topic發(fā)消息,所以Producer 啟動時(shí)會發(fā)起與所有的 Broker 的連接。
Producer 通過metadata.max.age.ms 參數(shù)定期地去更新元數(shù)據(jù)信息,默認(rèn)值是 300000,即 5 分鐘,不管集群那邊是否有變化,Producer 每 5 分鐘都會強(qiáng)制刷新一次元數(shù)據(jù)以保證它是最新的數(shù)據(jù)。
Producer 使用帶回調(diào)通知的發(fā)送 API, producer.send(msg, callback)。
設(shè)置 acks = all。Producer 的一個(gè)參數(shù),表示所有副本都成功接收到消息,該消息才算是“已提交”,最高等級,acks的其它值說明。min.insync.replicas > 1,表示消息至少要被寫入到多少個(gè)副本才算是“已提交”
retries 是 Producer 的參數(shù)。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動時(shí),消息發(fā)送可能會失敗,此時(shí)配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失。
設(shè)置參數(shù)props.put(“enable.idempotence”, ture),Producer 自動升級成冪等性 Producer,其他所有的代碼邏輯都不需要改變。Kafka 自動幫你做消息的重復(fù)去重。
原理很簡單,就是經(jīng)典的空間換時(shí)間,即在 Broker 端多保存一些字段。當(dāng) Producer 發(fā)送了具有相同字段值的消息后,Broker 能夠自動知曉這些消息已經(jīng)重復(fù)了,可以在后臺默默地把它們“丟棄”掉。
只能保證單分區(qū)、單會話上的消息冪等性。一個(gè)冪等性 Producer 能夠保證某個(gè)topic的一個(gè)分區(qū)上不出現(xiàn)重復(fù)消息,但無法實(shí)現(xiàn)多個(gè)分區(qū)的冪等性。比如采用輪詢,下一次提交換了一個(gè)分區(qū)就無法解決
能夠保證將消息原子性地寫入到多個(gè)分區(qū)中。這批消息要么全部寫入成功,要么全部失敗。能夠保證跨分區(qū)、跨會話間的冪等性。
- producer.initTransactions();
- try {
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- //提交事務(wù)
- producer.commitTransaction();
- } catch (KafkaException e) {
- //事務(wù)終止
- producer.abortTransaction();
- }
實(shí)際上即使寫入失敗,Kafka 也會把它們寫入到底層的日志中,也就是說 Consumer 還是會看到這些消息。要不要處理在 Consumer 端設(shè)置 isolation.level ,這個(gè)參數(shù)有兩個(gè)值:
Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個(gè)日志就是磁盤上一個(gè)只能追加寫(Append-only)消息的物理文件。因?yàn)橹荒茏芳訉懭?,故避免了緩慢的隨機(jī) I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實(shí)現(xiàn) Kafka 高吞吐量特性的一個(gè)重要手段。
不過如果你不停地向一個(gè)日志寫入消息,最終也會耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?
簡單來說就是通過日志段(Log Segment)機(jī)制。在 Kafka 底層,一個(gè)日志又近一步細(xì)分成多個(gè)日志段,消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿了一個(gè)日志段后,Kafka 會自動切分出一個(gè)新的日志段,并將老的日志段封存起來。Kafka 在后臺還有定時(shí)任務(wù)會定期地檢查老的日志段是否能夠被刪除,從而實(shí)現(xiàn)回收磁盤空間的目的。
相同的數(shù)據(jù)拷貝到多臺機(jī)器上。副本的數(shù)量是可以配置的。Kafka 中follow副本不會對外提供服務(wù)。
副本的工作機(jī)制也很簡單:生產(chǎn)者總是向leader副本寫消息;而消費(fèi)者總是從leader副本讀消息。至于follow副本,它只做一件事:向leader副本以異步方式發(fā)送pull請求,請求leader把最新的消息同步給它,必然有一個(gè)時(shí)間窗口導(dǎo)致它和leader中的數(shù)據(jù)是不一致的,或者說它是落后于leader。
主要是為了提升消費(fèi)者端的吞吐量。多個(gè)消費(fèi)者實(shí)例同時(shí)消費(fèi),加速整個(gè)消費(fèi)端的吞吐量(TPS)。
在一個(gè)消費(fèi)者組下,一個(gè)分區(qū)只能被一個(gè)消費(fèi)者消費(fèi),但一個(gè)消費(fèi)者可能被分配多個(gè)分區(qū),因而在提交位移時(shí)也就能提交多個(gè)分區(qū)的位移。如果1個(gè)topic有2個(gè)分區(qū),消費(fèi)者組有3個(gè)消費(fèi)者,有一個(gè)消費(fèi)者將無法分配到任何分區(qū),處于idle狀態(tài)。
理想情況下,Consumer 實(shí)例的數(shù)量應(yīng)該等于該 Group 訂閱topic(可能多個(gè))的分區(qū)總數(shù)。
消費(fèi)端先拉取并消費(fèi)消息,然后再ack更新offset。
1)消費(fèi)者程序啟動多個(gè)線程,每個(gè)線程維護(hù)專屬的 KafkaConsumer 實(shí)例,負(fù)責(zé)完整的消息拉取、消息處理流程。一個(gè)KafkaConsumer負(fù)責(zé)一個(gè)分區(qū),能保證分區(qū)內(nèi)的消息消費(fèi)順序。
缺點(diǎn):線程數(shù)受限于 Consumer 訂閱topic的總分區(qū)數(shù)。
2)任務(wù)切分成了消息獲取和消息處理兩個(gè)部分。消費(fèi)者程序使用單或多線程拉取消息,同時(shí)創(chuàng)建專門線程池執(zhí)行業(yè)務(wù)邏輯。優(yōu)點(diǎn):可以靈活調(diào)節(jié)消息獲取的線程數(shù),以及消息處理的線程數(shù)。
缺點(diǎn):無法保證分區(qū)內(nèi)的消息消費(fèi)順序。另外引入了多組線程,使得整個(gè)消息消費(fèi)鏈路被拉長,最終導(dǎo)致正確位移提交會變得異常困難,可能會出現(xiàn)消息的重復(fù)消費(fèi)或丟失。
1)老版本的 Consumer組把位移保存在 ZooKeeper 中,但很快發(fā)現(xiàn)zk并不適合頻繁的寫更新。
2)在新版本的 Consumer Group 中,Kafka 社區(qū)重新設(shè)計(jì)了 Consumer組的位移管理方式,采用了將位移保存在 Broker端的內(nèi)部topic中,也稱為“位移主題”,由kafka自己來管理。原理很簡單, Consumer的位移數(shù)據(jù)作為一條條普通的 Kafka 消息,提交到__consumer_offsets 中。它的消息格式由 Kafka 自己定義,用戶不能修改。位移主題的 Key 主要包括 3 部分內(nèi)容:
Kafka Consumer 提交位移的方式有兩種:自動提交位移和手動提交位移。
Kafka 使用Compact策略來刪除位移主題中的過期消息,避免該topic無限期膨脹。提供了專門的后臺線程定期地巡檢待 Compact 的主題,看看是否存在滿足條件的可刪除數(shù)據(jù)。
1)組成員數(shù)發(fā)生變更。比如有新的 Consumer 實(shí)例加入組或者離開組,又或是有 Consumer 實(shí)例崩潰被“踢出”組。(99%原因是由它導(dǎo)致)
2) 訂閱topic數(shù)發(fā)生變更。Consumer Group 可以使用正則表達(dá)式的方式訂閱topic,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明該 Group 訂閱所有以字母 t 開頭、字母 c 結(jié)尾的topic。在 Consumer Group 的運(yùn)行過程中,你新創(chuàng)建了一個(gè)滿足這樣條件的topic,那么該 Group 就會發(fā)生 Rebalance。
3) 訂閱topic的分區(qū)數(shù)發(fā)生變化。Kafka 目前只允許增加topic的分區(qū)數(shù)。當(dāng)分區(qū)數(shù)增加時(shí),也會觸發(fā)訂閱該topic的所有 Group 開啟 Rebalance。
Kafka的設(shè)計(jì)中多個(gè)分區(qū)的話無法保證全局的消息順序。如果一定要實(shí)現(xiàn)全局的消息順序,只能單分區(qū)。
方法二:通過有key分組,同一個(gè)key的消息放入同一個(gè)分區(qū),保證局部有序
基于保存時(shí)間,log.retention.hours
基于日志大小的清理策略。通過log.retention.bytes控制
組合方式

我們在微信上24小時(shí)期待你的聲音
解答本文疑問/技術(shù)咨詢/運(yùn)營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流