掃二維碼與項目經(jīng)理溝通
我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流
消息的消費一般有兩種模式,推模式和拉模式。推模式是服務(wù)端主動將消息推送給消費者,而拉模式是消費者主動向服務(wù)端發(fā)起請求來拉取消息。kakfa采用的是拉模式,這樣可以很好的控制消費速率。那么kafka消費的具體工作流程是什么樣的呢?kafka的位移管理又是怎么樣的呢?

kafka是以消費者組進行消費,一個消費者組,由多個consumer組成,他們和topic的消費規(guī)則如下:
通過這種分組、分區(qū)的消費方式,可以提高消費者的吞吐量,同時也能夠?qū)崿F(xiàn)消息的發(fā)布/訂閱模式和點對點兩種模式。
消費者消費總體分為兩個步驟,第一步是制定消費的方案,就是這個組下哪個消費者消費哪個分區(qū),第二個是建立網(wǎng)絡(luò)連接,獲取消息數(shù)據(jù)。
現(xiàn)在已經(jīng)初始化消費者組信息,知道哪個消費者消費哪個分區(qū),接著我們來看看消費者細節(jié)。
前面簡單提到了消費者組初始化的時候會對分區(qū)進行分配,那么具體的分配策略是什么呢,也就是哪個消費者消費哪個分區(qū)數(shù)據(jù)?
kafka有四種主流的分區(qū)分配策略: Range、RoundRobin、Sticky、CooperativeSticky??梢酝ㄟ^配置參數(shù)partition.assignment.strategy,修改分區(qū)的分配策略。默認策略是Range + CooperativeSticky。Kafka可以同時使用多個分區(qū)分配策略。
如上圖所示:有 7 個分區(qū),3 個消費者,排序后的分區(qū)將會是0,1,2,3,4,5,6;消費者排序完之后將會是C0,C1,C2。7/3 = 2 余 1 ,除不盡,那么 消費者 C0 便會多消費 1 個分區(qū)。 8/3=2余2,除不盡,那么C0和C1分別多消費一個。
這種方式容易造成數(shù)據(jù)傾斜!如果有 N 多個 topic,那么針對每個 topic,消費者 C0都將多消費 1 個分區(qū),topic越多,C0消費的分區(qū)會比其他消費者明顯多消費 N 個分區(qū)。
RoundRobin 針對集群中所有topic而言,RoundRobin 輪詢分區(qū)策略,是把所有的 partition 和所有的consumer 都列出來,然后按照 hashcode 進行排序,最后通過輪詢算法來分配 partition 給到各個消費者。
Sticky是粘性的意思,它是從 0.11.x 版本開始引入這種分配策略,首先會盡量均衡的放置分區(qū)到消費者上面,在出現(xiàn)同一消費者組內(nèi)消費者出現(xiàn)問題的時候,在rebalance會盡量保持原有分配的分區(qū)不變化,這樣可以節(jié)省開銷。
Cooperative Sticky和Sticky類似,但是它會將原來的一次大規(guī)模rebalance操作,拆分成了多次小規(guī)模的rebalance,直至最終平衡完成,所以體驗上會更好。
關(guān)于什么是rebalance繼續(xù)往下看你就知道了。
上面也提到了rebalance,也就是再均衡。當(dāng)kafka發(fā)生下面的情況會進行在均衡,也就是重新給消費者分配分區(qū):
消費者需要保存當(dāng)前消費到分區(qū)的什么位置了,這樣哪怕消費者故障,重啟后也能繼續(xù)消費,這就是消費者的維護offset管理。
消費者位移offset存儲在哪呢?
如何查看__consumer_offsets主題內(nèi)容?
bin/kafka-console-consumer.sh --topic
__consumer_offsets --bootstrap-server hadoop102:9092 --
consumer.config config/consumer.properties --formatter
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageForm
atter" --from-beginning
## topic1 1號分區(qū)
[offset,topic1,1]::OffsetAndMetadata(offset=7,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)
## topic1 0號分區(qū)
[offset,topic1,0]::OffsetAndMetadata(offset=8,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)
消費者是如何提交保存位移offset呢?
為了使我們能夠?qū)W⒂谧约旱臉I(yè)務(wù)邏輯,kafka默認提供了自動提交offset的功能。這個由消費者客戶端參數(shù) enable.auto.commit 配置, 默認值為 true 。當(dāng)然這個默認的自動提交不是每消費一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數(shù) auto.commit.interval.ms 配置,默認值為 5 秒。
自動提交會帶來什么問題?
自動提交消費位移的方式非常簡便,但會帶來是重復(fù)消費的問題。
假設(shè)剛剛提交完一次消費位移,然后拉取一批消息進行消費,在下一次自動提交消費位移之前,消費者崩潰了,那么又得從上一次位移提交的地方重新開始消費,這樣便發(fā)生了重復(fù)消費的現(xiàn)象。
我們可以通過減小位移提交的時間間隔來減小重復(fù)消息的窗口大小,但這樣 并不能避免重復(fù)消費的發(fā)送,而且也會使位移提交更加頻繁。
很多時候并不是說拉取到消息就算消費完成,而是需要將消息寫入數(shù)據(jù)庫、寫入本地緩存,或者是更 加復(fù)雜的業(yè)務(wù)處理。在這些場景下,所有的業(yè)務(wù)處理完成才能認為消息被成功消費。手動的提交方式可以讓開發(fā)人員根據(jù)程序的邏輯在合適的地方進行位移提交。
// 是否自動提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);手動提交可以細分為同步提交和異步提交,對應(yīng)于 KafkaConsumer 中的 commitSync()和 commitAsync()兩種類型的方法。
同步提交會阻塞當(dāng)前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失?。?,它必須等待offset提交完畢,再去消費下一批數(shù)據(jù)。
// 同步提交 offset
consumer.commitSync();
異步提交則沒有失敗重試機制,故有可能提交失敗。它發(fā)送完提交offset請求后,就開始消費下一批數(shù)據(jù)了。
// 異步提交 offset
consumer.commitAsync();那么手動提交會帶來什么問題呢?可能會出現(xiàn)"漏消息"的情況。
設(shè)置offset為手動提交,當(dāng)offset被提交時,數(shù)據(jù)還在內(nèi)存中未落盤,此時剛好消費者線程被kill掉,那么offset已經(jīng)提交,但是數(shù)據(jù)未處理,導(dǎo)致這部分內(nèi)存中的數(shù)據(jù)丟失。
我們可以通過消費者事物來解決這樣的問題。
其實無論是手動提交還是自動提交,都有可能出現(xiàn)消息重復(fù)和是漏消息,與我們的編程模型有關(guān),需要我們開發(fā)的時候根據(jù)消息的重要程度來選擇合適的消費方案。
一個正常的消費邏輯需要具備以下幾個步驟:
(1)配置消費者客戶端參數(shù)及創(chuàng)建相應(yīng)的消費者實例;
(2)訂閱主題;
(3)拉取消息并消費;
(4)提交消費位移 offset;
(5)關(guān)閉消費者實例。
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定義 kakfa 服務(wù)的地址,不需要將所有 broker 指定上
props.put("bootstrap.servers", "doitedu01:9092");
// 制定 consumer group
props.put("group.id", "g1");
// 是否自動提交 offset
props.put("enable.auto.commit", "true");
// 自動提交 offset 的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key 的反序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value 的反序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 如果沒有消費偏移量記錄,則自動重設(shè)為起始 offset:latest, earliest, none
props.put("auto.offset.reset","earliest");
// 定義 consumer
KafkaConsumer consumer = new KafkaConsumer<>(props);
// 消費者訂閱的 topic, 可同時訂閱多個
consumer.subscribe(Arrays.asList("first", "test","test1"));
while (true) {
// 讀取數(shù)據(jù),讀取超時時間為 100ms
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
consumer.subscribe(Arrays.asList(topicl ));
consumer subscribe(Arrays.asList(topic2))
如果消費者采用的是正則表達式的方式(subscribe(Pattern))訂閱, 在之后的過程中,如果 有人又創(chuàng)建了新的主題,并且主題名字與正表達式相匹配,那么這個消費者就可以消費到 新添加的主題中的消息。
consumer.subscribe(Pattern.compile ("topic.*" ));
消費者不僅可以通過 KafkaConsumer.subscribe()方法訂閱主題,還可直接訂閱某些主題的指定分區(qū)。
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;
通過unsubscribe()方法采取消主題的訂閱。
consumer.unsubscribe();
kafka 中的消息消費是一個不斷輪詢的過程,消費者所要做的就是重復(fù)地調(diào)用 poll() 方法, poll()方法返回的是所訂閱的主題(分區(qū))上的一組消息。
對于 poll () 方法而言,如果某些分區(qū)中沒有可供消費的消息,那么此分區(qū)對應(yīng)的消息拉取的結(jié)果就為空。
public ConsumerRecords poll(final Duration timeout) 超時時間參數(shù) timeout ,用來控制 poll() 方法的阻塞時間,在消費者的緩沖區(qū)里沒有可用數(shù)據(jù)時會發(fā)生阻塞。
有些時候,我們需要一種更細粒度的掌控,可以讓我們從特定的位移處開始拉取消息,而 KafkaConsumer 中的 seek( 方法正好提供了這個功能,讓我們可以追前消費或回溯消費。
public void seek(TopicPartiton partition,long offset)
最后我們總結(jié)一下消費者中重要的參數(shù)配置。
|
參數(shù)名稱 |
描述 |
|
bootstrap.servers |
向 Kafka 集群建立初始連接用到的 host/port 列表。 |
|
key.deserializer 和value.deserializer |
指定接收消息的 key 和 value 的反序列化類型。一定要寫全類名。 |
|
group.id |
標記消費者所屬的消費者組。 |
|
enable.auto.commit |
默認值為 true,消費者會自動周期性地向服務(wù)器提交偏移量。 |
|
auto.commit.interval.ms |
如果設(shè)置了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 提交的頻率,默認 5s。 |
|
auto.offset.reset |
當(dāng) Kafka 中沒有初始偏移量或當(dāng)前偏移量在服務(wù)器中不存在(如,數(shù)據(jù)被刪除了),該如何處理? earliest:自動重置偏移量到最早的偏移量。 latest:默認,自動重置偏移量為最新的偏移量。 none:如果消費組原來的(previous)偏移量不存在,則向消費者拋異常。 anything:向消費者拋異常。 |
|
offsets.topic.num.partitions |
__consumer_offsets 的分區(qū)數(shù),默認是 50 個分區(qū)。 |
|
heartbeat.interval.ms |
Kafka 消費者和 coordinator 之間的心跳時間,默認 3s。該條目的值必須小于 session.timeout.ms ,也不應(yīng)該高于session.timeout.ms 的 1/3。 |
|
session.timeout.ms |
Kafka 消費者和 coordinator 之間連接超時時間,默認 45s。超過該值,該消費者被移除,消費者組執(zhí)行再平衡。 |
|
max.poll.interval.ms |
消費者處理消息的最大時長,默認是 5 分鐘。超過該值,該消費者被移除,消費者組執(zhí)行再平衡。 |
|
fetch.min.bytes |
默認 1 個字節(jié)。消費者獲取服務(wù)器端一批消息最小的字節(jié)數(shù)。 |
|
fetch.max.wait.ms |
默認 500ms。如果沒有從服務(wù)器端獲取到一批數(shù)據(jù)的最小字節(jié)數(shù)。該時間到,仍然會返回數(shù)據(jù)。 |
|
fetch.max.bytes |
默認 Default: 52428800(50 m)。消費者獲取服務(wù)器端一批消息最大的字節(jié)數(shù)。如果服務(wù)器端一批次的數(shù)據(jù)大于該值(50m)仍然可以拉取回來這批數(shù)據(jù),因此,這不是一個絕對最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影響。 |
|
max.poll.records |
一次 poll 拉取數(shù)據(jù)返回消息的最大條數(shù),默認是 500 條。 |
kafka消費是很重要的一個環(huán)節(jié),本文總結(jié)kafka消費者的一些重要機制,包括消費者的整個流程,消費的分區(qū)策略,消費的再平衡以及消費的位移管理。在明白這些機制以后,簡單講解了如何使用消費者consumer的API以及消費者中重要的參數(shù)。

我們在微信上24小時期待你的聲音
解答本文疑問/技術(shù)咨詢/運營咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流