成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

以直播平臺監控用戶彈幕為例詳解Flink CEP

開發 架構
本文不分析彈幕數據的應用價值,只通過彈幕內容審核與監控案例來了解下Flink CEP的概念及功能。

[[393484]]

本文轉載自微信公眾號「五分鐘學大數據」,作者五分鐘學大數據。轉載本文請聯系五分鐘學大數據公眾號。

我們在看直播的時候,不管對于主播還是用戶來說,非常重要的一項就是彈幕文化。為了增加直播趣味性和互動性, 各大網絡直播平臺紛紛采用彈窗彈幕作為用戶實時交流的方式,內容豐富且形式多樣的彈幕數據中隱含著復雜的用戶屬性與用戶行為, 研究并理解在線直播平臺用戶具有彈幕內容審核與監控、輿論熱點預測、個性化摘要標注等多方面的應用價值。

本文不分析彈幕數據的應用價值,只通過彈幕內容審核與監控案例來了解下Flink CEP的概念及功能。

在用戶發彈幕時,直播平臺主要實時監控識別兩類彈幕內容:一類是發布不友善彈幕的用戶 ;一類是刷屏的用戶。

我們先記住上述需要實時監控識別的兩類用戶,接下來介紹Flink CEP的API,然后使用CEP解決上述問題。

Flink CEP

Flink CEP 是什么

Flink CEP是一個基于Flink的復雜事件處理庫,可以從多個數據流中發現復雜事件,識別有意義的事件(例如機會或者威脅),并盡快的做出響應,而不是需要等待幾天或則幾個月相當長的時間,才發現問題。

Flink CEP API

CEP API的核心是Pattern(模式) API,它允許你快速定義復雜的事件模式。每個模式包含多個階段(stage)或者我們也可稱為狀態(state)。從一個狀態切換到另一個狀態,用戶可以指定條件,這些條件可以作用在鄰近的事件或獨立事件上。

介紹API之前先來理解幾個概念:

1. 模式與模式序列

  • 簡單模式稱為模式,將最終在數據流中進行搜索匹配的復雜模式序列稱為模式序列,每個復雜模式序列是由多個簡單模式組成。
  • 匹配是一系列輸入事件,這些事件通過一系列有效的模式轉換,能夠訪問復雜模式圖的所有模式。
  • 每個模式必須具有唯一的名稱,我們可以使用模式名稱來標識該模式匹配到的事件。

2. 單個模式

一個模式既可以是單例的,也可以是循環的。單例模式接受單個事件,循環模式可以接受多個事件。

3. 模式示例:

有如下模式:a b+ c?d

其中a,b,c,d這些字母代表的是模式,+代表循環,b+就是循環模式;?代表可選,c?就是可選模式;

所以上述模式的意思就是:a后面可以跟一個或多個b,后面再可選的跟c,最后跟d。

其中a、c? 、d是單例模式,b+是循環模式。

一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉換為循環模式。

每個模式可以帶有一個或多個條件,這些條件是基于事件接收進行定義的。或者說,每個模式通過一個或多個條件來匹配和接收事件。

了解完上述概念后,接下來介紹下案例中需要用到的幾個CEP API:

案例中用到的CEP API:

  • Begin:定義一個起始模式狀態

用法:start = Pattern.<:Event>begin("start");

  • Next:附加一個新的模式狀態。匹配事件必須直接接續上一個匹配事件

用法:next = start.next("next");

  • Where:定義當前模式狀態的過濾條件。僅當事件通過過濾器時,它才能與狀態匹配

用法:patternState.where(_.message == "TMD");

  • Within: 定義事件序列與模式匹配的最大時間間隔。如果未完成的事件序列超過此時間,則將其丟棄

用法:patternState.within(Time.seconds(10));

  • Times:一個給定類型的事件出現了指定次數

用法:patternState.times(5);

API 先介紹以上這幾個,接下來我們解決下文章開頭提到的案例:

監測用戶彈幕行為案例

案例一:監測惡意用戶

規則:用戶如果在10s內,同時輸入 TMD 超過5次,就認為用戶為惡意攻擊,識別出該用戶。

使用 Flink CEP 檢測惡意用戶:

  1. import org.apache.flink.api.scala._ 
  2. import org.apache.flink.cep.PatternSelectFunction 
  3. import org.apache.flink.cep.scala.{CEP, PatternStream} 
  4. import org.apache.flink.cep.scala.pattern.Pattern 
  5. import org.apache.flink.streaming.api.TimeCharacteristic 
  6. import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8.  
  9. object BarrageBehavior01 { 
  10.   case class  LoginEvent(userId:String, message:String, timestamp:Long){ 
  11.     override def toString: String = userId 
  12.   } 
  13.  
  14.   def main(args: Array[String]): Unit = { 
  15.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  16.  
  17.     // 使用IngestionTime作為EventTime 
  18.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  19.  
  20.     // 用于觀察測試數據處理順序 
  21.     env.setParallelism(1) 
  22.  
  23.     // 模擬數據源 
  24.     val loginEventStream: DataStream[LoginEvent] = env.fromCollection( 
  25.       List( 
  26.         LoginEvent("1""TMD", 1618498576), 
  27.         LoginEvent("1""TMD", 1618498577), 
  28.         LoginEvent("1""TMD", 1618498579), 
  29.         LoginEvent("1""TMD", 1618498582), 
  30.         LoginEvent("2""TMD", 1618498583),  
  31.         LoginEvent("1""TMD", 1618498585) 
  32.       ) 
  33.     ).assignAscendingTimestamps(_.timestamp * 1000) 
  34.  
  35.     //定義模式 
  36.     val loginEventPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("begin"
  37.       .where(_.message == "TMD"
  38.       .times(5) 
  39.       .within(Time.seconds(10)) 
  40.  
  41.     //匹配模式 
  42.     val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), loginEventPattern) 
  43.  
  44.     import scala.collection.Map 
  45.     val result = patternStream.select((pattern:Map[String, Iterable[LoginEvent]])=> { 
  46.       val first = pattern.getOrElse("begin"null).iterator.next() 
  47.       (first.userId, first.timestamp
  48.     }) 
  49.     //惡意用戶,實際處理可將按用戶進行禁言等處理,為簡化此處僅打印出該用戶 
  50.     result.print("惡意用戶>>>"
  51.     env.execute("BarrageBehavior01"
  52.   } 

實例二:監測刷屏用戶

規則:用戶如果在10s內,同時連續輸入同樣一句話超過5次,就認為是惡意刷屏。

使用 Flink CEP檢測刷屏用戶

  1. object BarrageBehavior02 { 
  2.   case class Message(userId: String, ip: String, msg: String) 
  3.  
  4.   def main(args: Array[String]): Unit = { 
  5.     //初始化運行環境 
  6.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  7.  
  8.     //設置并行度 
  9.     env.setParallelism(1) 
  10.  
  11.     // 模擬數據源 
  12.     val loginEventStream: DataStream[Message] = env.fromCollection( 
  13.       List( 
  14.         Message("1""192.168.0.1""beijing"), 
  15.         Message("1""192.168.0.2""beijing"), 
  16.         Message("1""192.168.0.3""beijing"), 
  17.         Message("1""192.168.0.4""beijing"), 
  18.         Message("2""192.168.10.10""shanghai"), 
  19.         Message("3""192.168.10.10""beijing"), 
  20.         Message("3""192.168.10.11""beijing"), 
  21.         Message("4""192.168.10.10""beijing"), 
  22.         Message("5""192.168.10.11""shanghai"), 
  23.         Message("4""192.168.10.12""beijing"), 
  24.         Message("5""192.168.10.13""shanghai"), 
  25.         Message("5""192.168.10.14""shanghai"), 
  26.         Message("5""192.168.10.15""beijing"), 
  27.         Message("6""192.168.10.16""beijing"), 
  28.         Message("6""192.168.10.17""beijing"), 
  29.         Message("6""192.168.10.18""beijing"), 
  30.         Message("5""192.168.10.18""shanghai"), 
  31.         Message("6""192.168.10.19""beijing"), 
  32.         Message("6""192.168.10.19""beijing"), 
  33.         Message("5""192.168.10.18""shanghai"
  34.       ) 
  35.     ) 
  36.  
  37.     //定義模式 
  38.     val loginbeijingPattern = Pattern.begin[Message]("start"
  39.       .where(_.msg != null) //一條登錄失敗 
  40.       .times(5).optional  //將滿足五次的數據配對打印 
  41.       .within(Time.seconds(10)) 
  42.  
  43.     //進行分組匹配 
  44.     val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern) 
  45.  
  46.     //查找符合規則的數據 
  47.     val loginbeijingResult: DataStream[Option[Iterable[Message]]] = loginbeijingDataPattern.select(patternSelectFun = (pattern: collection.Map[String, Iterable[Message]]) => { 
  48.       var loginEventList: Option[Iterable[Message]] = null 
  49.       loginEventList = pattern.get("start") match { 
  50.         case Some(value) => { 
  51.           if (value.toList.map(x => (x.userId, x.msg)).distinct.size == 1) { 
  52.             Some(value) 
  53.           } else { 
  54.             None 
  55.           } 
  56.         } 
  57.       } 
  58.       loginEventList 
  59.     }) 
  60.  
  61.     //打印測試 
  62.     loginbeijingResult.filter(x=>x!=None).map(x=>{ 
  63.       x match { 
  64.         case Some(value)=> value 
  65.       } 
  66.     }).print() 
  67.  
  68.     env.execute("BarrageBehavior02) 
  69.   } 

Flink CEP API

除了案例中介紹的幾個API外,我們在介紹下其他的常用API:

1. 條件 API

為了讓傳入事件被模式所接受,給模式指定傳入事件必須滿足的條件,這些條件由事件本身的屬性或者前面匹配過的事件的屬性統計量等來設定。比如,事件的某個值大于5,或者大于先前接受事件的某個值的平均值。

可以使用pattern.where()、pattern.or()、pattern.until()方法來指定條件。條件既可以是迭代條件IterativeConditions,也可以是簡單條件SimpleConditions。

FlinkCEP支持事件之間的三種臨近條件:

  • next():嚴格的滿足條件

示例:模式為begin("first").where(_.name='a').next("second").where(.name='b')當且僅當數據為a,b時,模式才會被命中。如果數據為a,c,b,由于a的后面跟了c,所以a會被直接丟棄,模式不會命中。

  • followedBy():松散的滿足條件

示例:模式為begin("first").where(_.name='a').followedBy("second").where(.name='b')當且僅當數據為a,b或者為a,c,b,模式均被命中,中間的c會被忽略掉。

  • followedByAny():非確定的松散滿足條件

示例:模式為begin("first").where(_.name='a').followedByAny("second").where(.name='b')當且僅當數據為a,c,b,b時,對于followedBy模式而言命中的為{a,b},對于followedByAny而言會有兩次命中{a,b},{a,b}。

2. 量詞 API

還記得我們在上面講解模式概念時說過的一句話:一般情況下,模式都是單例模式,可以使用量詞(Quantifiers)將其轉換為循環模式。這里的量詞就是指的量詞API。

以下這幾個量詞API,可以將模式指定為循環模式:

  • pattern.oneOrMore():一個給定的事件有一次或多次出現,例如上面提到的b+。
  • pattern.times(#ofTimes):一個給定類型的事件出現了指定次數,例如4次。
  • pattern.times(#fromTimes, #toTimes):一個給定類型的事件出現的次數在指定次數范圍內,例如2~4次。
  • 可以使用pattern.greedy()方法將模式變成循環模式,但是不能讓一組模式都變成循環模式。greedy:就是盡可能的重復。
  • 使用pattern.optional()方法將循環模式變成可選的,即可以是循環模式也可以是單個模式。

3. 匹配后的跳過策略

所謂的匹配跳過策略,是對多個成功匹配的模式進行篩選。也就是說如果多個匹配成功,可能我不需要這么多,按照匹配策略,過濾下就可以。

Flink中有五種跳過策略:

  • NO_SKIP: 不過濾,所有可能的匹配都會被發出。
  • SKIP_TO_NEXT: 丟棄與開始匹配到的事件相同的事件,發出開始匹配到的事件,即直接跳到下一個模式匹配到的事件,以此類推。
  • SKIP_PAST_LAST_EVENT: 丟棄匹配開始后但結束之前匹配到的事件。
  • SKIP_TO_FIRST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的第一個事件之前匹配到的事件。
  • SKIP_TO_LAST[PatternName]: 丟棄匹配開始后但在PatternName模式匹配到的最后一個事件之前匹配到的事件。

怎么理解上述策略,我們以NO_SKIP和SKIP_PAST_LAST_EVENT為例講解下:

在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b')中,我們輸入數據:a,a,a,a,b ,如果是NO_SKIP策略,即不過濾策略,模式匹配到的是:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b};如果是SKIP_PAST_LAST_EVENT策略,即丟棄匹配開始后但結束之前匹配到的事件,模式匹配到的是:{a,a,a,a,b}。

Flink CEP 的使用場景

除上述案例場景外,Flink CEP 還廣泛用于網絡欺詐,故障檢測,風險規避,智能營銷等領域。

1. 實時反作弊和風控

對于電商來說,羊毛黨是必不可少的,國內拼多多曾爆出 100 元的無門檻券隨便領,當晚被人褥幾百億,對于這種情況肯定是沒有做好及時的風控。另外還有就是商家上架商品時通過頻繁修改商品的名稱和濫用標題來提高搜索關鍵字的排名、批量注冊一批機器賬號快速刷單來提高商品的銷售量等作弊行為,各種各樣的作弊手法也是需要不斷的去制定規則去匹配這種行為。

2. 實時營銷

分析用戶在手機 APP 的實時行為,統計用戶的活動周期,通過為用戶畫像來給用戶進行推薦。比如用戶在登錄 APP 后 1 分鐘內只瀏覽了商品沒有下單;用戶在瀏覽一個商品后,3 分鐘內又去查看其他同類的商品,進行比價行為;用戶商品下單后 1 分鐘內是否支付了該訂單。如果這些數據都可以很好的利用起來,那么就可以給用戶推薦瀏覽過的類似商品,這樣可以大大提高購買率。

3. 實時網絡攻擊檢測

當下互聯網安全形勢仍然嚴峻,網絡攻擊屢見不鮮且花樣眾多,這里我們以 DDOS(分布式拒絕服務攻擊)產生的流入流量來作為遭受攻擊的判斷依據。對網絡遭受的潛在攻擊進行實時檢測并給出預警,云服務廠商的多個數據中心會定時向監控中心上報其瞬時流量,如果流量在預設的正常范圍內則認為是正常現象,不做任何操作;如果某數據中心在 10 秒內連續 5 次上報的流量超過正常范圍的閾值,則觸發一條警告的事件;如果某數據中心 30 秒內連續出現 30 次上報的流量超過正常范圍的閾值,則觸發嚴重的告警。

Flink CEP 的原理簡單介紹

Apache Flink在實現CEP時借鑒了Efficient Pattern Matching over Event Streams論文中NFA的模型,在這篇論文中,還提到了一些優化,我們在這里先跳過,只說下NFA的概念。

在這篇論文中,提到了NFA,也就是Non-determined Finite Automaton,叫做不確定的有限狀態機,指的是狀態有限,但是每個狀態可能被轉換成多個狀態(不確定)。

非確定有限自動狀態機:

先介紹兩個概念:

  • 狀態:狀態分為三類,起始狀態、中間狀態和最終狀態。
  • 轉換:take/ignore/proceed都是轉換的名稱。

在NFA匹配規則里,本質上是一個狀態轉換的過程。三種轉換的含義如下所示:

  • Take: 主要是條件的判斷,當過來一條數據進行判斷,一旦滿足條件,獲取當前元素,放入到結果集中,然后將當前狀態轉移到下一個的狀態。
  • Proceed:當前的狀態可以不依賴任何的事件轉移到下一個狀態,比如說透傳的意思。
  • Ignore:當一條數據到來的時候,可以忽略這個消息事件,當前的狀態保持不變,相當于自己到自己的一個狀態。

NFA的特點:在NFA中,給定當前狀態,可能有多個下一個狀態。可以隨機選擇下一個狀態,也可以并行(同時)選擇下一個狀態。輸入符號可以為空。

規則引擎

規則引擎:將業務決策從應用程序代碼中分離出來,并使用預定義的語義模塊編寫業務決策。接受數據輸入,解釋業務規則,并根據業務規則做出業務決策。

使用規則引擎可以通過降低實現復雜業務邏輯的組件的復雜性,降低應用程序的維護和可擴展性成本。

1. Drools

Drools 是一款使用 Java 編寫的開源規則引擎,通常用來解決業務代碼與業務規則的分離,它內置的 Drools Fusion 模塊也提供 CEP 的功能。

優勢:

  • 功能較為完善,具有如系統監控、操作平臺等功能。
  • 規則支持動態更新。

劣勢:

  • 以內存實現時間窗功能,無法支持較長跨度的時間窗。
  • 無法有效支持定時觸達(如用戶在瀏覽發生一段時間后觸達條件判斷)。

2. Aviator

Aviator 是一個高性能、輕量級的 Java 語言實現的表達式求值引擎,主要用于各種表達式的動態求值。

優勢:

  • 支持大部分運算操作符。
  • 支持函數調用和自定義函數。
  • 支持正則表達式匹配。
  • 支持傳入變量并且性能優秀。

劣勢:

沒有 if else、do while 等語句,沒有賦值語句,沒有位運算符。

3. EasyRules

EasyRules 集成了 MVEL 和 SpEL 表達式的一款輕量級規則引擎。

優勢:

  • 輕量級框架,學習成本低。
  • 基于 POJO。
  • 為定義業務引擎提供有用的抽象和簡便的應用。
  • 支持從簡單的規則組建成復雜規則。

4. Esper

Esper 設計目標為 CEP 的輕量級解決方案,可以方便的嵌入服務中,提供 CEP 功能。

優勢:

  • 輕量級可嵌入開發,常用的 CEP 功能簡單好用。
  • EPL 語法與 SQL 類似,學習成本較低。

劣勢:

  • 單機全內存方案,需要整合其他分布式和存儲。
  • 以內存實現時間窗功能,無法支持較長跨度的時間窗。
  • 無法有效支持定時觸達(如用戶在瀏覽發生一段時間后觸達條件判斷)。

5. Flink CEP

Flink 是一個流式系統,具有高吞吐低延遲的特點,Flink CEP 是一套極具通用性、易于使用的實時流式事件處理方案。

優勢:

  • 繼承了 Flink 高吞吐的特點。
  • 事件支持存儲到外部,可以支持較長跨度的時間窗。
  • 可以支持定時觸達(用 followedBy + PartternTimeoutFunction 實現)。

 

責任編輯:武曉燕 來源: 五分鐘學大數據
相關推薦

2013-02-18 10:12:58

Apache服務器訪問動態網站

2016-12-05 14:03:07

Flink大數據

2016-12-06 20:03:48

Flink流處理謬見

2021-08-02 09:50:47

Vetur源碼SMART

2016-08-27 20:40:02

直播彈幕

2009-06-05 10:06:19

互聯網

2016-12-20 12:34:46

存儲MySQL流程

2015-12-01 18:08:26

EMUI 4.0

2021-05-31 08:00:00

消息隊列架構Rabbit MQ

2018-08-22 16:40:51

前端JavascriptVue

2019-05-20 08:20:40

數據集數據可視化數據

2017-12-18 18:13:54

聯想

2018-12-14 09:39:07

軟件開發用戶迭代

2011-07-08 09:55:02

數據中心防震

2018-11-13 14:47:13

2017-08-15 11:04:05

機器學習實際場景

2015-12-11 10:27:50

易維幫助臺/Helpd

2022-02-14 14:28:57

驅動開發鴻蒙系統

2025-05-13 08:09:56

2022-01-10 12:23:00

TypeScript ESLint前端
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 激情的网站 | 亚洲天堂av网 | 日韩精品视频网 | 国产精品精品久久久久久 | 男女精品网站 | 免费在线观看av网站 | 久久青视频 | 一区二区高清不卡 | 黄在线 | 成人久久久 | 久久久久久国产 | 日韩av成人在线 | 国产一区二区视频免费在线观看 | 亚洲视频一区在线观看 | 一区中文字幕 | 欧美一区二区三区大片 | 在线啊v| 国产精品成人一区二区三区夜夜夜 | 精品九九久久 | 日韩一区二区免费视频 | 日韩欧美一二三区 | 日韩网站在线 | 日韩欧美一区二区三区 | 久久久久一区二区三区 | 99精品一区二区三区 | 欧美在线综合 | 国产美女黄色片 | 日本三级网址 | 国产精品毛片无码 | 久久精品国产99国产精品 | 成人国产精品久久 | 久久精品国产一区二区三区 | 在线视频91| 不卡的av在线 | 精品免费视频一区二区 | 亚洲精品区 | caoporn国产精品免费公开 | av大片| h视频免费在线观看 | 久久一区二区av | 日本亚洲精品成人欧美一区 |