av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

ApacheKafka與SparkStreaming的兩種整合方法及其優(yōu)缺點(diǎn)

Apache Kafka與Spark Streaming的兩種整合方法及其優(yōu)缺點(diǎn)

譯文
作者:陳峻編譯 2018-10-24 09:00:26

大數(shù)據(jù)

Kafka

Spark 我們?cè)趯pache Kafka與Spark Streaming整合的實(shí)戰(zhàn)過(guò)程中,一般可以選用兩種方面來(lái)配置Spark Streaming,并接收來(lái)自Kafka的數(shù)據(jù)。第一種是利用接收器和Kafka的高級(jí)API;而第二種新的方法則并不使用接收器。這兩種方法在性能特征和語(yǔ)義保持上,有著不同的編程模式。

10余年的四方臺(tái)網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都全網(wǎng)營(yíng)銷的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整四方臺(tái)建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“四方臺(tái)網(wǎng)站設(shè)計(jì)”,“四方臺(tái)網(wǎng)站推廣”以來(lái),每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

【51CTO.com快譯】Kafka與Spark Streaming的整合

我們?cè)趯pache Kafka與Spark Streaming整合的實(shí)戰(zhàn)過(guò)程中,一般可以選用兩種方面來(lái)配置Spark Streaming,并接收來(lái)自Kafka的數(shù)據(jù)。***種是利用接收器和Kafka的高級(jí)API;而第二種新的方法則并不使用接收器。這兩種方法在性能特征和語(yǔ)義保持上,有著不同的編程模式。

下面讓我們來(lái)詳細(xì)探究一下這兩種方法。

一、基于接收器的方法

此法運(yùn)用接收器(Receiver)來(lái)接收數(shù)據(jù)。而接收器是利用Kafka的高級(jí)消費(fèi)者(consumer)API來(lái)實(shí)現(xiàn)的。此外,接收到的數(shù)據(jù)會(huì)被存儲(chǔ)在Spark的各個(gè)執(zhí)行器(executor)中。然后由Spark Streaming所啟動(dòng)的作業(yè)來(lái)處理數(shù)據(jù)。

但是在出現(xiàn)失敗時(shí),這種方法的默認(rèn)配置可能會(huì)丟失數(shù)據(jù)。因此,我們必須在Spark Streaming中額外地啟用預(yù)寫(xiě)日志(write-ahead log),以確保數(shù)據(jù)的零丟失。它將所有接收到的Kafka數(shù)據(jù),同步地保存到某個(gè)分布式文件系統(tǒng)的預(yù)寫(xiě)日志中,以便在出現(xiàn)失敗時(shí)恢復(fù)所有的數(shù)據(jù)。

下面,我們將討論如何在Kafka-Spark Streaming應(yīng)用中,使用該基于接收器的方法。

1.鏈接

現(xiàn)在,先將您的Kafka Streaming應(yīng)用與如下的artifact相鏈接,對(duì)于Scala和Java類型的應(yīng)用,我們會(huì)用到SBT(Simple Build Tool)和Maven(一種構(gòu)建工具)的各種項(xiàng)目定義。

 

  
 
 
 
  1. groupId = org.apache.spark 
  2. artifactId = spark-streaming-kafka-0-8_2.11 
  3. version = 2.2.0 

而對(duì)于Python類型的應(yīng)用,我們必須在部署自己的應(yīng)用時(shí),額外添加上述庫(kù)、及其各種依賴項(xiàng)。

2.編程

隨后,我們?cè)趕treaming應(yīng)用的代碼中,通過(guò)導(dǎo)入KafkaUtils,來(lái)創(chuàng)建一項(xiàng)DStream輸入:

 

  
 
 
 
  1. import org.apache.spark.streaming.kafka._ 
  2. val kafkaStream = KafkaUtils.createStream(streamingContext, 
  3.     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) 

同樣,通過(guò)使用createStream的各種變形方式,我們可以制定出不同的鍵/值類,及其對(duì)應(yīng)的解碼類。

3.部署

通常情況下,對(duì)于任何Spark應(yīng)用而言,您都可以使用spark-submit來(lái)發(fā)布自己的應(yīng)用。當(dāng)然,就具體的Scala、Java和Python應(yīng)用來(lái)說(shuō),它們?cè)诩?xì)節(jié)上會(huì)略有不同。

其中,由于Python應(yīng)用缺少SBT和Maven的項(xiàng)目管理,我們可以使用–packages spark-streaming-kafka-0-8_2.11、及其各個(gè)依賴項(xiàng),直接添加到spark-submit處。

  
 
 
 
  1. ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ... 

此外,我們還可以從Maven的存儲(chǔ)庫(kù)中下載Maven artifact的spark-streaming-Kafka-0-8-assembly所對(duì)應(yīng)的JAR包,然后使用-jars,將其添加到spark-submit處。

二、直接方法(無(wú)接收器)

在基于接收器的方法之后,新的一種無(wú)接收器式“直接”方法誕生了。此法提供了更強(qiáng)大的端到端保證。它定期查詢Kafka在每個(gè)topic+分區(qū)(partition)中的***偏移量,而不再使用接收器去接收數(shù)據(jù)。同時(shí),它也定義了要在每個(gè)批次中處理的不同偏移范圍。特別是在那些處理數(shù)據(jù)的作業(yè)被啟動(dòng)時(shí),其簡(jiǎn)單消費(fèi)者(consumer)API就會(huì)被用于讀取Kafka中預(yù)定義的偏移范圍。可見(jiàn),此過(guò)程類似于從某個(gè)文件系統(tǒng)中讀取各種文件。

注:針對(duì)Scala和Java API,Spark在其1.3版本中引入了此功能;而針對(duì)Python API,它在其1.4版本中同樣引入了該功能。

下面,我們將討論如何在Streaming應(yīng)用中使用該方法,并深入了解更多有關(guān)消費(fèi)者API的鏈接:

1.鏈接

當(dāng)然,這種方法僅被Scala和Java應(yīng)用所支持,并且通過(guò)如下artifact來(lái)鏈接STB和Maven項(xiàng)目。

 

  
 
 
 
  1. groupId = org.apache.spark 
  2. artifactId = spark-streaming-kafka-0-8_2.11 
  3. version = 2.2.0 

2.編程

隨后,我們?cè)赟treaming應(yīng)用的代碼中,通過(guò)導(dǎo)入KafkaUtils,來(lái)創(chuàng)建一項(xiàng)DStream輸入:

 

  
 
 
 
  1. import org.apache.spark.streaming.kafka._ 
  2. val directKafkaStream = KafkaUtils.createDirectStream[ 
  3.     [key class], [value class], [key decoder class], [value decoder class] ]( 
  4.     streamingContext, [map of Kafka parameters], [set of topics to consume]) 

我們必須在Kafka的參數(shù)中,指定metadata.broker.list或bootstrap.servers,以便它能夠在默認(rèn)情況下,從各個(gè)Kafka分區(qū)的***偏移量開(kāi)始消費(fèi)。當(dāng)然,如果您在Kafka的參數(shù)中將auto.offset.reset配置為最小,那么它就會(huì)從最小的偏移開(kāi)始消費(fèi)。

此外,通過(guò)使用KafkaUtils.createDirectStream的各種變形方式,我們能夠從任意偏移量開(kāi)始消費(fèi)。當(dāng)然,我們也可以在每一個(gè)批次中,按照如下的方式去消費(fèi)Kafka的偏移量。

 

  
 
 
 
  1. // Hold a reference to the current offset ranges, so downstream can use it 
  2. var offsetRanges = Array.empty[OffsetRange] 
  3. directKafkaStream.transform { rdd => 
  4.   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
  5.   rdd 
  6. }.map { 
  7.           ... 
  8. }.foreachRDD { rdd => 
  9.   for (o <- offsetRanges) { 
  10.     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 
  11.   } 
  12.   ... 

如果您想使用基于Zookeeper的Kafka監(jiān)控工具(https://data-flair.training/blogs/zookeeper-in-kafka/),來(lái)顯示Streaming應(yīng)用的進(jìn)度,那么您也可以自行將其更新到Zookeeper中。

3.部署

該方面的部署過(guò)程與基于接收器的方法類似,此處就不贅述了。

三、直接方法的優(yōu)點(diǎn)

就Spark Streaming與Kafka整合的角度而言,第二種方法較***種方法有著如下的優(yōu)點(diǎn):

1.簡(jiǎn)化并行

無(wú)需創(chuàng)建與合并多個(gè)輸入的Kafka Streams(https://data-flair.training/blogs/kafka-streams/)。但是,Sparking Streaming會(huì)創(chuàng)建同樣多的RDD(Resilient Distributed Datasets,彈性分布式數(shù)據(jù)集)分區(qū),以供多個(gè)Kafka分區(qū)使用直接的方法進(jìn)行消費(fèi)。這些分區(qū)也會(huì)并行地從Kafka中讀取數(shù)據(jù)。因此我們可以說(shuō):在Kafka和RDD分區(qū)之間存在更容易被理解和調(diào)整的、一對(duì)一的映射關(guān)系。

2.效率

為了實(shí)現(xiàn)數(shù)據(jù)的零丟失,***種方法需要將數(shù)據(jù)存儲(chǔ)在預(yù)寫(xiě)日志中,以供進(jìn)一步復(fù)制數(shù)據(jù)。此方法的效率實(shí)際上是比較低的,因?yàn)閿?shù)據(jù)被Kafka和預(yù)寫(xiě)日志實(shí)際復(fù)制了兩次。而在直接的方法中,由于沒(méi)有了接收器,因此不需要預(yù)先寫(xiě)入日志,此問(wèn)題也就迎刃而解了。只要您擁有足夠多的Kafka數(shù)據(jù)保留,各種消息就能夠從Kafka中被恢復(fù)回來(lái)。

3.準(zhǔn)確到位的語(yǔ)義

在***種方法中,我們使用Kafka的高級(jí)API,在Zookeeper中存儲(chǔ)被消費(fèi)的偏移量。然而,這種傳統(tǒng)的、從Kafka中消費(fèi)數(shù)據(jù)的方式,雖然能夠確保數(shù)據(jù)的零丟失,但是在某些失敗情況下,數(shù)據(jù)可能會(huì)被小概率地消費(fèi)兩次。實(shí)際上,這種情況源自那些被Spark Streaming可靠地接收到的數(shù)據(jù),與Zookeeper跟蹤到的偏移量之間所產(chǎn)生的不一致。因此在第二種方法中,我們不再使用Zookeeper,而只是使用一個(gè)簡(jiǎn)單的Kafka API。Spark Streaming通過(guò)其各個(gè)檢查點(diǎn)(checkpoints),來(lái)跟蹤不同的偏移量,籍此消除了Spark Streaming和Zookeeper之間的不一致性。

可見(jiàn),就算出現(xiàn)了失敗的情況,那些記錄也都會(huì)被Spark Streaming有效地、準(zhǔn)確地一次性接收。它能夠確保我們的輸出操作,即:將數(shù)據(jù)保存到外部數(shù)據(jù)存儲(chǔ)庫(kù)時(shí),各種保存結(jié)果和偏移量的冪等性、和原子事務(wù)性,這同時(shí)也有助于實(shí)現(xiàn)準(zhǔn)確到位的語(yǔ)義。

不過(guò),這種方法也有一個(gè)缺點(diǎn):由于它不會(huì)在Zookeeper中更新各種偏移量,因此那些基于Zookeeper的Kafka監(jiān)控工具將無(wú)法顯示進(jìn)度。當(dāng)然,您也可以自行訪問(wèn)每個(gè)批次中由此方法處理的偏移量,并更新到Zookeeper之中。

結(jié)論

通過(guò)上述討論,我們學(xué)到了Kafka與Spark Streaming整合的全體概念。同時(shí),我們也討論了Kafka-Spark Streaming的兩種不同配置方法:接收器方法和直接方法,以及直接方法的幾項(xiàng)優(yōu)點(diǎn)。

原文標(biāo)題:Apache Kafka + Spark Streaming Integration,作者:Rinu Gour

【51CTO譯稿,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文譯者和出處為51CTO.com】


新聞標(biāo)題:ApacheKafka與SparkStreaming的兩種整合方法及其優(yōu)缺點(diǎn)
本文網(wǎng)址:http://uogjgqi.cn/article/dpisdpp.html
掃二維碼與項(xiàng)目經(jīng)理溝通

我們?cè)谖⑿派?4小時(shí)期待你的聲音

解答本文疑問(wèn)/技術(shù)咨詢/運(yùn)營(yíng)咨詢/技術(shù)建議/互聯(lián)網(wǎng)交流