成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

徹底搞清 Flink 中的 Window 機制

系統 Windows
在流處理應用中,數據是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了我們的網頁。

[[432700]]

一、 為什么需要Window

在流處理應用中,數據是連續不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了我們的網頁。

在這種情況下,我們必須定義一個窗口(window),用來收集最近1分鐘內的數據,并對這個窗口內的數據進行計算

二、Window的分類

2.1 按照time和count分類

time-window:時間窗口:根據時間劃分窗口,如:每xx分鐘統計最近xx分鐘的數據

count-window:數量窗口:根據數量劃分窗口,如:每xx個數據統計最近xx個數據

2.2 按照slide和size分類

窗口有兩個重要的屬性: 窗口大小size和滑動間隔slide,根據它們的大小關系可分為:

tumbling-window:滾動窗口:size=slide,如:每隔10s統計最近10s的數據

sliding-window:滑動窗口:size>slide,如:每隔5s統計最近10s數據

注意:當size<slide的時候,如每隔15s統計最近10s的數據,那么中間5s

小結

按照上面窗口的分類方式進行組合,可以得出如下的窗口:

  • 基于時間的滾動窗口tumbling-time-window--用的較多
  • 基于時間的滑動窗口sliding-time-window--用的較多
  • 基于數量的滾動窗口tumbling-count-window--用的較少
  • 基于數量的滑動窗口sliding-count-window--用的較少

注意:Flink還支持一個特殊的窗口:Session會話窗口,需要設置一個會話超時時間,如30s,則表示30s內沒有數據到來,則觸發上個窗口的計算

三、WindowAPI

3.1 window和windowAll

使用keyby的流,應該使用window方法

未使用keyby的流,應該調用windowAll方法

區別:

Window算子:是可以設置并行度的

WindowAll 算子:并行度始終為1

3.2 WindowAssigner

Windows Assigner的作用是指定窗口的類型,定義如何將數據流分配到一個或者多個窗口,API中通過window (WindowsAssigner assigner)指定。在Flink中支持兩種類型的窗口,一種是基于時間的窗口(TimeWindow),另一種是基于數量的窗口(countWindow)。窗口所表現出的類型特性取決于window assigner的定義。

Flink底層Window模型僅有TimeWindow以及GlobalWindow。

Flink提供了很多各種場景用的WindowAssigner:

如果需要自定制數據分發策略,則可以實現一個 class,繼承自 WindowAssigner。

3.3 evictor

evictor 主要用于做一些數據的自定義操作,可以在執行用戶代碼之前,也可以在執行

用戶代碼之后,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。

Flink 提供了如下三種通用的 evictor:

CountEvictor 保留指定數量的元素

TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 范圍內的元

素,其中 max_ts 是窗口內時間戳的最大值。

DeltaEvictor 通過執行用戶給定的 DeltaFunction 以及預設的 theshold,判斷是否刪 除一個元素。

3.4 trigger

trigger 用來判斷一個窗口是否需要被觸發,每個 WindowAssigner 都自帶一個默認的trigger,

如果默認的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的接口以及含義:

onEventTime() 當 event-time timer 被觸發的時候會調用

onElement() 每次往 window 增加一個元素的時候都會觸發

onMerge() 對兩個 `rigger 的 state 進行 merge 操作

clear() window 銷毀的時候被調用

上面的接口中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選 擇:

  • CONTINUE 不做任何事情
  • FIRE 觸發 window
  • PURGE 清空整個 window 的元素并銷毀窗口
  • PURGE 清空整個 window 的元素并銷毀窗口

四、WindowAPI調用案例示例

4.1 基于時間的滾動和滑動窗口

測試數據

  1. 信號燈編號和通過該信號燈的車的數量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量--基于時間的滾動窗口

需求2:每5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量--基于時間的滑動窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5. import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows} 
  6. import org.apache.flink.streaming.api.windowing.time.Time
  7.  
  8. /** 
  9.  * @Package com.flink.source 
  10.  * @File :WindowDemo_TimeWindow.java 
  11.  * @author 大數據老哥 
  12.  * @date 2021/10/26 10:50 
  13.  * @version V1.0 
  14.  */ 
  15. object WindowDemo_TimeWindow { 
  16.   def main(args: Array[String]): Unit = { 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.  
  19.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  20.     val socketMap = socketData.map(new MapFunction[String, CartInfo]() { 
  21.       override def map(t: String): CartInfo = { 
  22.         val arr = t.split(","
  23.         CartInfo(arr(0), arr(1).toInt) 
  24.       } 
  25.     }) 
  26.     //需求1:每5秒鐘統計一次,最近5秒鐘內,各個路口通過紅綠燈汽車的數量 
  27.     val result = socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count"
  28.     //需求2:每5秒鐘統計一次,最近10秒鐘內,各個路口通過紅綠燈汽車的數量 
  29.     val result2 = socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(10))).sum("count"
  30.     result.print() 
  31.     result2.print() 
  32.     env.execute("winds"
  33.   } 
  34.  
  35.  
  36. case class CartInfo(var sensorId: String, var countInt

4.2 基于數量的滾動和滑動窗口

測試數據

  1. 信號燈編號和通過該信號燈的車的數量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

需求1:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現5次進行統計--基于數量的滾動窗口

需求2:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現3次進行統計--基于數量的滑動窗口

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.scala._ 
  5.  
  6. /** 
  7.  * @Package com.flink.source 
  8.  * @File :WindosDemoo_CountWindos.java 
  9.  * @author 大數據老哥 
  10.  * @date 2021/10/26 14:04 
  11.  * @version V1.0 
  12.  */ 
  13. object WindowDemo_CountWindow { 
  14.   def main(args: Array[String]): Unit = { 
  15.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  16.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  17.     val socketMap = socketData.map(new MapFunction[String, CartInfo] { 
  18.       override def map(t: String): CartInfo = { 
  19.         val arr = t.split(","
  20.         CartInfo(arr(0), arr(1).toInt) 
  21.       } 
  22.     }) 
  23.      // 需求1:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現5次進行統計 
  24.     val result = socketMap.keyBy(_.sensorId).countWindow(5L).sum("count"
  25.      // 需求2:統計在最近5條消息中,各自路口通過的汽車數量,相同的key每出現3次進行統計 
  26.     val result2 = socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count"
  27.     result.print("result"
  28.     result2.print("result2"
  29.     env.execute() 
  30.  
  31.   } 
  32. case class CartInfo(var sensorId: String, var countInt

case class CartInfo(var sensorId: String, var count: Int)

4.3 會話窗口

測試數據

  1. 信號燈編號和通過該信號燈的車的數量 
  2. 9,3 
  3. 9,2 
  4. 9,7 
  5. 4,9 
  6. 2,6 
  7. 1,5 
  8. 2,3 
  9. 5,7 
  10. 5,4 

設置會話超時時間為10s,10s內沒有數據到來,則觸發上個窗口的計算

  1. package com.flink.source 
  2.  
  3. import org.apache.flink.api.common.functions.MapFunction 
  4. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator 
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
  6. import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8.  
  9.   
  10. /** 
  11.  * @Package com.flink.source 
  12.  * @File :WindowDemo_SessionWindow.java 
  13.  * @author 大數據老哥 
  14.  * @date 2021/11/1 16:10 
  15.  * @version V1.0 
  16.  */ 
  17. object WindowDemo_SessionWindow { 
  18.   def main(args: Array[String]): Unit = { 
  19.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  20.  
  21.     val socketData = env.socketTextStream("192.168.100.101", 9999) 
  22.     val socketMap: SingleOutputStreamOperator[CartInfo] = socketData.map(new MapFunction[String, CartInfo]() { 
  23.       override def map(t: String): CartInfo = { 
  24.         val arr = t.split(","
  25.         CartInfo(arr(0), arr(1).toInt) 
  26.       } 
  27.     }) 
  28.     //設置會話超時時間為10s,10s內沒有數據到來,則觸發上個窗口的計算 
  29.     val result = socketMap.keyBy(0) 
  30.       .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))) 
  31.       .sum("count"
  32.     result.print() 
  33.     env.execute("winds"
  34.   } 
  35.  
  36.  
  37. case class CartInfo(var sensorId: String, var countInt

 

責任編輯:武曉燕 來源: 大數據老哥
相關推薦

2024-02-27 08:05:32

Flink分區機制數據傳輸

2020-11-02 11:40:24

Node.jsRequire前端

2022-01-14 07:56:38

Checkpoint機制Flink

2024-06-21 08:32:24

2020-06-03 08:19:00

Kubernetes

2022-04-25 09:03:16

JavaScript代碼

2024-05-11 08:31:20

中斷機制插隊機制React

2023-03-22 18:34:30

Flink調度部署

2021-09-12 07:01:07

Flink SQL ETL datastream

2022-05-19 08:47:30

Flinkwatermark窗口計算

2021-12-29 17:29:07

KubernetesEvents集群

2020-10-14 09:11:44

IO 多路復用實現機

2024-05-28 08:02:08

Vue3父組件子組件

2018-07-19 10:16:25

華光昱能

2022-08-16 09:03:01

JavaScript前端

2021-09-04 07:29:57

Android

2023-04-12 08:38:44

函數參數Context

2018-11-30 09:03:55

HTTP緩存Web

2024-05-17 10:05:06

Java機制應用

2024-04-09 07:50:59

Flink語義Watermark
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 伊人久久大香线 | 日本电影韩国电影免费观看 | 国产一区精品在线 | 狠狠操电影 | 欧美一区二区免费 | 成人在线视频免费观看 | 国产精品成人一区二区三区 | 亚洲精品乱码久久久久久黑人 | 亚洲 日本 欧美 中文幕 | 羞羞视频网页 | 日韩高清av | 欧美日韩在线观看视频网站 | 国产一级特黄真人毛片 | 欧美在线一区二区三区 | 国产一区二区三区四区在线观看 | 亚洲电影一区 | 久久高清免费视频 | 日韩中文字幕区 | 伊人一二三 | 亚洲一区 | a在线视频| 免费成人高清在线视频 | 夜夜爽99久久国产综合精品女不卡 | 麻豆va| 四季久久免费一区二区三区四区 | 91极品尤物在线播放国产 | 成人自拍av| 精品国产一区探花在线观看 | 伊人春色在线观看 | 欧美成年黄网站色视频 | 欧美在线观看免费观看视频 | 欧美13videosex性极品 | 国产激情视频在线 | 国产高清久久 | 国产精品久久久久一区二区三区 | 在线播放91 | 黄色大全免费看 | 久久久九九九九 | 久久蜜桃av一区二区天堂 | 国产精品黄 | 天堂影院av |