生活随笔
收集整理的這篇文章主要介紹了
[Scala] Flink项目小彩蛋(六)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
傳送區
[Scala] Flink項目實例系列(零)
[Scala] Flink項目實時熱門商品統計(一)
[Scala] Flink項目實時流量統計(二)
[Scala] Flink項目惡意登錄監控(三)
[Scala] Flink項目訂單支付失效監控(四)
[Scala] Flink項目訂單支付實時對賬(五)
[Scala] Flink項目小彩蛋(六)
本項目的代碼及文件見這這這,友情碼是:3n9z。
Join
Join官方傳送
戳我
Tumbling Window Join
圖好話少
Sliding Window Join
Session Window Join
Interval Join
示例代碼
數據源結構
orderIdeventTypetxIdtimestamp
| 34729 | pay | sd76f87d6 | 1558430844 |
txIdpayChanneleventTime
| ewr342as4 | wechat | 1558430845 |
import org.apache.flink.streaming.api.
TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.
ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.
Time
import org.apache.flink.util.
Collectorobject TxMatchByJoin {def main(args:
Array[
String]):
Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(
1)env.setStreamTimeCharacteristic(
TimeCharacteristic.
EventTime)// 讀取訂單事件流
val resource = getClass.getResource(
"/OrderLog.csv")
val orderEventStream = env.readTextFile(resource.getPath)// val orderEventStream = env.socketTextStream("localhost", 7777).map(data => {
val dataArray = data.split(
",")
OrderEvent(dataArray(
0).trim.toLong, dataArray(
1).trim, dataArray(
2).trim, dataArray(
3).trim.toLong)}).filter(_.txId !=
"").assignAscendingTimestamps(_.eventTime *
1000L).keyBy(_.txId)// 讀取支付到賬事件流
val receiptResource = getClass.getResource(
"/ReceiptLog.csv")
val receiptEventStream = env.readTextFile(receiptResource.getPath)
// val receiptEventStream = env.socketTextStream("localhost", 8888).map(data => {
val dataArray = data.split(
",")
ReceiptEvent(dataArray(
0).trim, dataArray(
1).trim, dataArray(
2).toLong)}).assignAscendingTimestamps(_.eventTime *
1000L).keyBy(_.txId)// intervalJoin的概念見鏈接// https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html#interval-join
val processedStream = orderEventStream.intervalJoin(receiptEventStream).between(
Time.seconds(-
5),
Time.seconds(
5)).process(
new TxPayMatchByJoin())processedStream.print()env.execute(
"tx pay match by join job")}
}
class TxPayMatchByJoin()
extends ProcessJoinFunction[
OrderEvent,
ReceiptEvent, (
OrderEvent,
ReceiptEvent)] {
override def processElement(left:
OrderEvent, right:
ReceiptEvent, ctx:
ProcessJoinFunction[
OrderEvent,
ReceiptEvent, (
OrderEvent,
ReceiptEvent)]#
Context, out:
Collector[(
OrderEvent,
ReceiptEvent)]):
Unit = {out.collect((left, right))}
}
?
總結
以上是生活随笔為你收集整理的[Scala] Flink项目小彩蛋(六)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。