av激情亚洲男人的天堂国语,日韩欧美精品一中文字幕,无码av一区二区三区无码,国产又色又爽又刺激的a片,国产又色又爽又刺激的a片

詳解Flink的窗口操作

我們經(jīng)常需要在一個(gè)時(shí)間窗口維度上對(duì)數(shù)據(jù)進(jìn)行聚合,窗口是流處理應(yīng)用中經(jīng)常需要解決的問(wèn)題。Flink的窗口算子為我們提供了方便易用的API,我們可以將數(shù)據(jù)流切分成一個(gè)個(gè)窗口,對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行處理,下面為大家詳細(xì)講解一下Flink的窗口操作。

資源ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書(shū)未來(lái)市場(chǎng)廣闊!成為成都創(chuàng)新互聯(lián)公司的ssl證書(shū)銷(xiāo)售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18980820575(備注:SSL證書(shū)合作)期待與您的合作!

一、窗口(window)的類(lèi)型

對(duì)于窗口的操作主要分為兩種,分別對(duì)于Keyedstream和Datastream。他們的主要區(qū)別也僅僅在于建立窗口的時(shí)候一個(gè)為.window(…),一個(gè)為.windowAll(…)。對(duì)于Keyedstream的窗口來(lái)說(shuō),他可以使得多任務(wù)并行計(jì)算,每一個(gè)logical key stream將會(huì)被獨(dú)立的進(jìn)行處理。

stream
      .keyBy(...)               "assigner"
     [.trigger(...)]            "trigger" (else default trigger)
     [.evictor(...)]            "evictor" (else no evictor)
     [.allowedLateness(...)]    "lateness" (else zero)
     [.sideOutputLateData(...)] "output tag" (else no side output for late data)
      .reduce/aggregate/fold/apply()      "function"
     [.getSideOutput(...)]      "output tag"

按照窗口的Assigner來(lái)分,窗口可以分為

Tumbling window, sliding window,session window,global window,custom window

每種窗口又可分別基于processing time和event time,這樣的話,窗口的類(lèi)型嚴(yán)格來(lái)說(shuō)就有很多。

還有一種window叫做count window,依據(jù)元素到達(dá)的數(shù)量進(jìn)行分配,之后也會(huì)提到。

窗口的生命周期開(kāi)始在第一個(gè)屬于這個(gè)窗口的元素到達(dá)的時(shí)候,結(jié)束于第一個(gè)不屬于這個(gè)窗口的元素到達(dá)的時(shí)候。

二、窗口的操作

2.1 Tumbling window

固定相同間隔分配窗口,每個(gè)窗口之間沒(méi)有重疊看圖一眼明白。

下面的例子定義了每隔3毫秒一個(gè)窗口的流:

WindowedStream
  
    Rates = rates    .keyBy(MovieRate::getUserId)    .window(TumblingEventTimeWindows.of(Time.milliseconds(3))); 
  

2.2 Sliding Windows

跟上面一樣,固定相同間隔分配窗口,只不過(guò)每個(gè)窗口之間有重疊。窗口重疊的部分如果比窗口小,窗口將會(huì)有多個(gè)重疊,即一個(gè)元素可能被分配到多個(gè)窗口里去。

下面的例子給出窗口大小為10毫秒,重疊為5毫秒的流:

WindowedStream
  
    Rates = rates                .keyBy(MovieRate::getUserId)                .window(SlidingEventTimeWindows.of(Time.milliseconds(10), Time.milliseconds(5))); 
  

2.3 Session window

這種窗口主要是根據(jù)活動(dòng)的事件進(jìn)行窗口化,他們通常不重疊,也沒(méi)有一個(gè)固定的開(kāi)始和結(jié)束時(shí)間。一個(gè)session window關(guān)閉通常是由于一段時(shí)間沒(méi)有收到元素。在這種用戶(hù)交互事件流中,我們首先想到的是將事件聚合到會(huì)話窗口中(一段用戶(hù)持續(xù)活躍的周期),由非活躍的間隙分隔開(kāi)。

// 靜態(tài)間隔時(shí)間
WindowedStream
  
    Rates = rates                .keyBy(MovieRate::getUserId)                .window(EventTimeSessionWindows.withGap(Time.milliseconds(10))); // 動(dòng)態(tài)時(shí)間 WindowedStream
   
     Rates = rates                .keyBy(MovieRate::getUserId)                .window(EventTimeSessionWindows.withDynamicGap(())); 
   
  

2.4 Global window

將所有相同keyed的元素分配到一個(gè)窗口里。好吧,就這樣:

WindowedStream
  
    Rates = rates    .keyBy(MovieRate::getUserId)    .window(GlobalWindows.create()); 
  

三、窗口函數(shù)

窗口函數(shù)就是這四個(gè):ReduceFunction,AggregateFunction,F(xiàn)oldFunction,ProcessWindowFunction。前兩個(gè)執(zhí)行得更有效,因?yàn)镕link可以增量地聚合每個(gè)到達(dá)窗口的元素。

Flink必須在調(diào)用函數(shù)之前在內(nèi)部緩沖窗口中的所有元素,所以使用ProcessWindowFunction進(jìn)行操作效率不高。不過(guò)ProcessWindowFunction可以跟其他的窗口函數(shù)結(jié)合使用,其他函數(shù)接受增量信息,ProcessWindowFunction接受窗口的元數(shù)據(jù)。

舉一個(gè)AggregateFunction的例子吧,下面代碼為MovieRate按user分組,且分配5毫秒的Tumbling窗口,返回每個(gè)user在窗口內(nèi)評(píng)分的所有分?jǐn)?shù)的平均值。

DataStream
  
   > Rates = rates                .keyBy(MovieRate::getUserId)                .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))                .aggregate(new AggregateFunction
   
    >() {                    @Override                    public AverageAccumulator 
    createAccumulator() {                        
    return new AverageAccumulator();                    }                    @Override                    public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {                        acc.userId = movieRate.userId;                        acc.sum += movieRate.rate;                        acc.count++;                        
    return acc;                    }                    @Override                    public Tuple2
    
      getResult(AverageAccumulator acc) {                        
     return  Tuple2.of(acc.userId, acc.sum/(double)acc.count);                    }                    @Override                    public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {                        acc0.count += acc1.count;                        acc0.sum += acc1.sum;                        
     return acc0;                    }                }); public static class AverageAccumulator{        int userId;        int count;        double sum;    } 
    
   
  

以下是部分輸出:

...
1> (44,3.0)
4> (96,0.5)
2> (51,0.5)
3> (90,2.75)
...

看上面的代碼,會(huì)發(fā)現(xiàn)add()函數(shù)特別生硬,因?yàn)槲覀兿敕祷豑uple2 類(lèi)型,即Integer為key,但AggregateFunction似乎沒(méi)有提供這個(gè)機(jī)制可以讓AverageAccumulator的構(gòu)造函數(shù)提供參數(shù)。所以,這里引入ProcessWindowFunction與AggregateFunction的結(jié)合版,AggregateFunction進(jìn)行增量疊加,當(dāng)窗口關(guān)閉時(shí),ProcessWindowFunction將會(huì)被提供AggregateFunction返回的結(jié)果,進(jìn)行Tuple封裝:

DataStream
  
   > Rates = rates    .keyBy(MovieRate::getUserId)    .window(TumblingEventTimeWindows.of(Time.milliseconds(5)))    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction()); public static class MyAggregateFunction implements AggregateFunction
   
     {    @Override    public AverageAccumulator 
    createAccumulator() {        
    return new AverageAccumulator();    }    @Override    public AverageAccumulator add(MovieRate movieRate, AverageAccumulator acc) {        acc.sum += movieRate.rate;        acc.count++;        
    return acc;    }    @Override    public Double getResult(AverageAccumulator acc) {        
    return  acc.sum/(double)acc.count;    }    @Override    public AverageAccumulator merge(AverageAccumulator acc0, AverageAccumulator acc1) {        acc0.count += acc1.count;        acc0.sum += acc1.sum;        
    return acc0;    } } public static class MyProcessWindowFunction extends    ProcessWindowFunction
    
     , Integer, TimeWindow> {    @Override    public void process(Integer key,                        Context context,                        Iterable
     
       results,                        Collector
      
       > out) throws Exception {        Double result = results.iterator().next();        out.collect(new Tuple2(key, result));    } } public static class AverageAccumulator{    int count;    double sum; } 
      
     
    
   
  

可以得到,結(jié)果與上面一樣,但代碼好看了很多。

四、其他操作

4.1 Triggers(觸發(fā)器)

觸發(fā)器定義了窗口何時(shí)準(zhǔn)備好被窗口處理。每個(gè)窗口分配器默認(rèn)都有一個(gè)觸發(fā)器,如果默認(rèn)的觸發(fā)器不符合你的要求,就可以使用trigger(…)自定義觸發(fā)器。

通常來(lái)說(shuō),默認(rèn)的觸發(fā)器適用于多種場(chǎng)景。例如,多有的event-time窗口分配器都有一個(gè)EventTimeTrigger作為默認(rèn)觸發(fā)器。該觸發(fā)器在watermark通過(guò)窗口末尾時(shí)出發(fā)。

PS:GlobalWindow默認(rèn)的觸發(fā)器時(shí)NeverTrigger,該觸發(fā)器從不出發(fā),所以在使用GlobalWindow時(shí)必須自定義觸發(fā)器。

4.2 Evictors(驅(qū)逐器)

Evictors可以在觸發(fā)器觸發(fā)之后以及窗口函數(shù)被應(yīng)用之前和/或之后可選擇的移除元素。使用Evictor可以防止預(yù)聚合,因?yàn)榇翱诘乃性囟急仨氃趹?yīng)用計(jì)算邏輯之前先傳給Evictor進(jìn)行處理

4.3 Allowed Lateness

當(dāng)使用event-time窗口時(shí),元素可能會(huì)晚到,例如Flink用于跟蹤event-time進(jìn)度的watermark已經(jīng)超過(guò)了窗口的結(jié)束時(shí)間戳。

默認(rèn)來(lái)說(shuō),當(dāng)watermark超過(guò)窗口的末尾時(shí),晚到的元素會(huì)被丟棄。但是flink也允許為窗口operator指定最大的allowed lateness,以至于可以容忍在徹底刪除元素之前依然接收晚到的元素,其默認(rèn)值是0。

為了支持該功能,F(xiàn)link會(huì)保持窗口的狀態(tài),知道allowed lateness到期。一旦到期,flink會(huì)刪除窗口并刪除其狀態(tài)。

把晚到的元素當(dāng)作side output。

SingleOutputStreamOperator
  
    result = input    .keyBy(
   
    )    .window(
    
     )    .allowedLateness(
     )    .sideOutputLateData(lateOutputTag)    .
      
       (
       
        function>); 
       
      
    
   
  

網(wǎng)頁(yè)名稱(chēng):詳解Flink的窗口操作
文章來(lái)源:http://uogjgqi.cn/article/dpidpsh.html
掃二維碼與項(xiàng)目經(jīng)理溝通

我們?cè)谖⑿派?4小時(shí)期待你的聲音

解答本文疑問(wèn)/技術(shù)咨詢(xún)/運(yùn)營(yíng)咨詢(xún)/技術(shù)建議/互聯(lián)網(wǎng)交流