Flink ProcessFunction: Introduction and Usage
Contents
Goal
Code
Testing
The Problem
Official description: https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html
The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications:
- events (stream elements)
- state (fault-tolerant, consistent, only on keyed stream)
- timers (event time and processing time, only on keyed stream)
The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).
For fault-tolerant state, the ProcessFunction gives access to Flink's keyed state, accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state.
The timers allow applications to react to changes in processing time and in event time. Every call to the function processElement(...) gets a Context object which gives access to the element's event time timestamp, and to the TimerService. The TimerService can be used to register callbacks for future event-/processing-time instants. With event-time timers, the onTimer(...) method is called when the current watermark is advanced up to or beyond the timestamp of the timer, while with processing-time timers, onTimer(...) is called when wall clock time reaches the specified time. During that call, all states are again scoped to the key with which the timer was created, allowing timers to manipulate keyed state.
In short: a ProcessFunction is a low-level stream processing operation with direct access to events (stream elements), to fault-tolerant, consistent keyed state, and to timers. Because state and timers are only available on keyed streams, this post focuses on KeyedProcessFunction.
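Before the full example, here is a minimal sketch of the KeyedProcessFunction surface we will use. The class name and the key/input/output types here are illustrative; the Flink methods are real. processElement runs once per record, and onTimer runs when a registered timer fires:

```scala
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Illustrative skeleton: key type String, input (String, Int), output String.
class SkeletonFunction extends KeyedProcessFunction[String, (String, Int), String] {

  override def processElement(value: (String, Int),
                              ctx: KeyedProcessFunction[String, (String, Int), String]#Context,
                              out: Collector[String]): Unit = {
    // Called once per record; ctx exposes the current key, the record's
    // event-time timestamp, and the TimerService.
    val timers = ctx.timerService()
    // Processing-time timer: onTimer fires when wall-clock time reaches it.
    timers.registerProcessingTimeTimer(timers.currentProcessingTime() + 10000L)
    // Event-time timer: onTimer fires when the watermark passes it
    // (requires timestamps/watermarks assigned upstream):
    // timers.registerEventTimeTimer(ctx.timestamp() + 10000L)
    out.collect(s"saw $value for key ${ctx.getCurrentKey}")
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, (String, Int), String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // Any state read here is scoped to the key that registered the timer.
    out.collect(s"timer fired at $timestamp for key ${ctx.getCurrentKey}")
  }
}
```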
Goal
We read data from port 9999 via socketTextStream and aggregate, per product type, the total sales amount over a fixed interval. If a type's sales stay at 0 for a whole interval, a timer fires an alert to tell the boss that the employee selling that product type may be slacking off. (This is purely a feature demo; adapt the pattern to your own business, e.g., counting UVs.)
Code
```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFuncationScala {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
    // Input lines look like "hat,12": split into (type, amount)
    val typeAndData: DataStream[(String, String)] =
      stream.map(x => (x.split(",")(0), x.split(",")(1))).setParallelism(4)
    typeAndData
      .keyBy(0) // tuple-index keyBy: the key type is the Java Tuple below
      .process(new MyprocessFunction())
      .print("result")
    env.execute()
  }

  /**
   * Keys the stream by product type, accumulates the amount per key,
   * reports the totals periodically, and alerts when a total is 0.
   */
  class MyprocessFunction extends KeyedProcessFunction[Tuple, (String, String), String] {

    // reporting interval: ten seconds
    val delayTime: Long = 1000 * 10

    // (type, running total) for the current key
    lazy val state: ValueState[(String, Long)] = getRuntimeContext
      .getState(new ValueStateDescriptor[(String, Long)]("cjcount", classOf[Tuple2[String, Long]]))

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      printf("timer fired, time: %d, state: %s, key: %s\n", timestamp, state.value(), ctx.getCurrentKey)
      if (state.value()._2 == 0) {
        // no sales for this type during the interval: alert
        printf("type: %s, amount is 0, alert\n", state.value()._1)
      }
      // reset the counter once the interval has been reported
      state.update((state.value()._1, 0L))
      // re-register so the timer fires again after the next interval
      val currentTime: Long = ctx.timerService().currentProcessingTime()
      ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
    }

    override def processElement(value: (String, String),
                                ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context,
                                out: Collector[String]): Unit = {
      printf("state value: %s, state is null: %s\n", state.value(), state.value() == null)
      if (state.value() == null) {
        // first element for this key: start the periodic timer
        val currentTime: Long = ctx.timerService().currentProcessingTime()
        ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
        printf("timer registered for: %d\n", currentTime + delayTime)
        state.update((value._1, value._2.toLong))
      } else {
        // accumulate the amount into the running total
        val key: String = state.value()._1
        var count: Long = state.value()._2
        count += value._2.toInt
        state.update((key, count))
      }
      println(getRuntimeContext.getTaskNameWithSubtasks + "->" + value)
      printf("state value: %s\n", state.value())
      // emit the processed element downstream
      out.collect("processed -> " + value)
    }
  }
}
```
The code keeps the running total in a ValueState; every incoming record adds to its key's total. The first record for a given product type registers a processing-time timer, and each firing re-registers itself, so it runs every ten seconds. The timer does the alerting: if a type's total over the last ten seconds is 0, we raise an alert.
Testing
Send data to the port (for example by running `nc -lk 9999` and typing into it, assuming netcat is available). Enter four records within ten seconds:

```
hat,12
hat,12
shoe,10
shoe,10
```

The printed output shows the aggregation completing:
```
timer fired, time: 1586005420511, state: (shoe,20), key: (shoe)
timer fired, time: 1586005421080, state: (hat,24), key: (hat)
```

If we then send no data for ten seconds, the totals are 0 and the alert fires:
```
timer fired, time: 1586005406244, state: (hat,0), key: (hat)
type: hat, amount is 0, alert
timer fired, time: 1586005406244, state: (shoe,0), key: (shoe)
type: shoe, amount is 0, alert
```

The Problem
At this point the periodic aggregation works. But notice: if hats are assigned to task 1 and shoes to task 2, and shoes receive 100 million records a day while hats receive one, we have a severe data skew problem.
Let's look at the problem in practice. We'll skip the computed results and look only at how the records are distributed. The job has three task stages; the socket source is non-parallel, so we set the parallelism of the downstream map to 4.
Input: 1 record of hat,10 and 50 records of shoe,10.
In the Map stage the records are evenly distributed, because no keyBy has happened yet.
Now look at the tasks after the keyBy.
All 50 shoe records land in the subtask with ID 3: a severe data skew problem.
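Why does everything for one key land on one subtask? keyBy assigns each key to a key group by hashing the key value, and each key group maps to exactly one subtask, so records with the same key are always processed together regardless of volume. A small sketch of that assignment, assuming the flink-runtime dependency is on the classpath; since the job above uses keyBy(0), the real key is a Tuple wrapper rather than the raw string, so the exact indices printed here are illustrative:

```scala
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

// Which subtask would each key land on? Same key -> same subtask, always.
object KeyAssignmentDemo {
  def main(args: Array[String]): Unit = {
    val parallelism = 4
    val maxParallelism = 128 // Flink's default when not configured
    for (key <- Seq("hat", "shoe")) {
      val subtask = KeyGroupRangeAssignment
        .assignKeyToParallelOperator(key, maxParallelism, parallelism)
      println(s"key '$key' always goes to subtask $subtask")
    }
  }
}
```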
This kind of problem can be mitigated with a two-stage keyBy, as sketched below.
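The idea, as a sketch rather than the exact solution from the linked article: salt each key with a random prefix so the heavy key spreads over several subtasks, pre-aggregate the salted keys over a window, then strip the salt and combine the partial sums by the original key. The bucket count `saltBuckets` is an illustrative parameter:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random

object TwoStageKeyBy {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val saltBuckets = 8 // illustrative: how many ways to split each hot key

    val typeAndAmount: DataStream[(String, Long)] = env
      .socketTextStream("localhost", 9999)
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1).toLong)
      }

    // Stage 1: key by (salt + originalKey) so one hot key spreads over
    // several subtasks, and pre-aggregate per salted key.
    val partials: DataStream[(String, Long)] = typeAndAmount
      .map(x => (Random.nextInt(saltBuckets) + "_" + x._1, x._2))
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce((a, b) => (a._1, a._2 + b._2))
      // strip the salt to recover the original key
      .map(x => (x._1.split("_", 2)(1), x._2))

    // Stage 2: key by the original key and combine the partial sums.
    partials
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print("two-stage total")

    env.execute("two-stage keyBy demo")
  }
}
```

The trade-off is an extra shuffle and slightly delayed totals, but in stage one the hot key's work is now spread across up to `saltBuckets` subtasks.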
For more on data skew, see: https://datamining.blog.csdn.net/article/details/105322423