Using Flink Operators, with Examples: union and connect
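Flink transformations fall into four main categories: basic single-stream transformations, key-based grouping transformations, multi-stream transformations, and data redistribution transformations. Readers can practice with the Flink Scala Shell or IntelliJ IDEA:
- Flink Scala Shell: learning and debugging Flink in an interactive programming environment
- Flink 01 | Build your first Flink application and a local cluster in ten minutes
- Using Flink operators, with examples: map, filter, and flatMap
- Using Flink operators, with examples: keyBy, reduce, and aggregations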
In many cases we need to combine and process several data streams together. Flink provides multi-stream transformation operators for this, and they are the focus of this article.
union
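Applying the union operator to a DataStream merges several data streams of the same type into one stream of that type; that is, it combines multiple DataStream[T] into a new DataStream[T]. Elements are merged in first-in, first-out (FIFO) order, and duplicates are not removed. In the figure below, union merges a white stream and a dark stream into a single stream.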
[Figure: union diagram]
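To make the merge semantics described above concrete, here is a minimal plain-Scala model with no Flink dependency. `UnionModel`, `Arrival` and `union` are hypothetical names, and each element is tagged with an assumed arrival time; the merged output follows arrival order and keeps duplicates. It only mirrors the behavior described in the text and is not how Flink implements union internally.

```scala
// Plain-Scala model of union semantics; not Flink code.
object UnionModel {
  // A stream element tagged with a hypothetical arrival time.
  final case class Arrival[T](time: Long, value: T)

  // Merge any number of same-typed streams: order by arrival time, keep duplicates.
  // sortBy is stable, so elements with equal arrival times keep their input order.
  def union[T](streams: Seq[Arrival[T]]*): Seq[T] =
    streams.flatten.sortBy(_.time).map(_.value)

  def main(args: Array[String]): Unit = {
    // Made-up prices; 10.5 appears twice and both copies are kept.
    val shenzhen = Seq(Arrival(1L, 10.5), Arrival(4L, 11.0))
    val hongkong = Seq(Arrival(2L, 320.0))
    val shanghai = Seq(Arrival(3L, 10.5))
    println(union(shenzhen, hongkong, shanghai))
  }
}
```

Note that the type parameter `T` is shared by all inputs, which is the same restriction union has in Flink: every merged stream must carry the same element type.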
Suppose stock prices arrive as separate streams from different exchanges; we merge them into one stream:
val shenzhenStockStream: DataStream[StockPrice] = ...
val hongkongStockStream: DataStream[StockPrice] = ...
val shanghaiStockStream: DataStream[StockPrice] = ...
val unionStockStream: DataStream[StockPrice] = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream)

connect
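Although union can merge multiple data streams, it has one restriction: all of the streams must have the same data type. connect offers similar functionality and joins two data streams. It differs from union as follows:
- connect joins exactly two data streams, whereas union can merge more than two.
- The two streams joined by connect may have different data types, whereas all streams merged by union must have the same type.
- After connect, the two DataStreams become a ConnectedStreams, which applies a different processing method to each input, and the two inputs can share state.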
connect is often used when one data stream controls how another is processed, as shown in the figure below. The control stream can carry thresholds, rules, machine learning models, or other parameters.
對(duì)一個(gè)數(shù)據(jù)流進(jìn)行控制處理
對(duì)于ConnectedStreams,我們需要重寫CoMapFunction或CoFlatMapFunction。這兩個(gè)接口都提供了三個(gè)泛型,這三個(gè)泛型分別對(duì)應(yīng)第一個(gè)輸入流的數(shù)據(jù)類型、第二個(gè)輸入流的數(shù)據(jù)類型和輸出流的數(shù)據(jù)類型。在重寫函數(shù)時(shí),對(duì)于CoMapFunction,map1處理第一個(gè)流的數(shù)據(jù),map2處理第二個(gè)流的數(shù)據(jù);對(duì)于CoFlatMapFunction,flatMap1處理第一個(gè)流的數(shù)據(jù),flatMap2處理第二個(gè)流的數(shù)據(jù)。Flink并不能保證兩個(gè)函數(shù)調(diào)用順序,兩個(gè)函數(shù)的調(diào)用依賴于兩個(gè)數(shù)據(jù)流數(shù)據(jù)的流入先后順序,即第一個(gè)數(shù)據(jù)流有數(shù)據(jù)到達(dá)時(shí),map1或flatMap1會(huì)被調(diào)用,第二個(gè)數(shù)據(jù)流有數(shù)據(jù)到達(dá)時(shí),map2或flatMap2會(huì)被調(diào)用。下面的代碼對(duì)一個(gè)整數(shù)流和一個(gè)字符串流進(jìn)行了connect操作。
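import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.co.CoMapFunction

// senv is the streaming environment predefined in the Flink Scala Shell
val intStream: DataStream[Int] = senv.fromElements(1, 0, 9, 2, 3, 6)
val stringStream: DataStream[String] = senv.fromElements("LOW", "HIGH", "LOW", "LOW")
val connectedStream: ConnectedStreams[Int, String] = intStream.connect(stringStream)

// The three type parameters of CoMapFunction: input of the first stream, input of the second stream, output after map
class MyCoMapFunction extends CoMapFunction[Int, String, String] {
  // Called for each element of the first stream
  override def map1(input1: Int): String = input1.toString
  // Called for each element of the second stream
  override def map2(input2: String): String = input2
}

val mapResult = connectedStream.map(new MyCoMapFunction)
mapResult.print()
senv.execute("connect int stream with string stream")

If a DataStream is not grouped by key, its records are spread across the TaskSlots with no relation to their key. In most cases, however, we want to analyze and process data per key, so Flink lets us combine connect with keyBy or broadcast. For example, we can combine the stock price stream from earlier with a stream of media reviews and group both by stock symbol.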
// 先將兩個(gè)流connect,再進(jìn)行keyByval keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream .connect(mediaStatusStream) .keyBy(0,0)// 先keyBy再connectval keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0).connect(mediaStatusStream.keyBy(0))無論先keyBy還是先connect,我們都可以將含有相同Key的數(shù)據(jù)轉(zhuǎn)發(fā)到下游同一個(gè)算子實(shí)例上。這種操作有點(diǎn)像SQL中的join操作。Flink也提供了join算子,join主要在時(shí)間窗口維度上,connect相比而言更廣義一些,關(guān)于join的介紹將在后續(xù)文章中介紹。
The code below combines stock prices with positive and negative media reviews: when the media review is positive and the stock price exceeds a threshold, it emits a positive signal. The complete code is on my GitHub: https://github.com/luweizheng/flink-tutorials
package com.flink.tutorials.demos.stock

import java.util.Calendar

import com.flink.tutorials.demos.stock.StockPriceDemo.{StockPrice, StockPriceSource, StockPriceTimeAssigner}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.Random

object StockMediaConnectedDemo {

  def main(args: Array[String]): Unit = {

    // Set up the execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Generate a watermark every 5 seconds
    env.getConfig.setAutoWatermarkInterval(5000L)

    // Stock price stream
    val stockPriceRawStream: DataStream[StockPrice] = env
      // Randomly generated by the StockPriceSource class
      .addSource(new StockPriceSource)
      // Assign timestamps and watermarks
      .assignTimestampsAndWatermarks(new StockPriceTimeAssigner)

    val mediaStatusStream: DataStream[Media] = env
      .addSource(new MediaSource)

    // Connect the two streams first, then keyBy
    val keyByConnect1: ConnectedStreams[StockPrice, Media] = stockPriceRawStream
      .connect(mediaStatusStream)
      .keyBy(0, 0)

    // keyBy first, then connect
    val keyByConnect2: ConnectedStreams[StockPrice, Media] = stockPriceRawStream.keyBy(0)
      .connect(mediaStatusStream.keyBy(0))

    keyByConnect1.flatMap(new AlertFlatMap).print()
    keyByConnect2.flatMap(new AlertFlatMap).print()

    // Execute the program
    env.execute("connect stock price with media status")
  }

  /** Media review
    *
    * symbol    stock symbol
    * timestamp timestamp
    * status    review: POSITIVE / NORMAL / NEGATIVE
    */
  case class Media(symbol: String, timestamp: Long, status: String)

  class MediaSource extends RichSourceFunction[Media] {

    // cancel() is called from a different thread than run(), so the flag must be volatile
    @volatile var isRunning: Boolean = true

    val rand = new Random()
    var stockId = 0

    override def run(srcCtx: SourceContext[Media]): Unit = {
      while (isRunning) {
        // Pick a random stock from the list each time
        stockId = rand.nextInt(5)
        var status: String = "NORMAL"
        if (rand.nextGaussian() > 0.9) {
          status = "POSITIVE"
        } else if (rand.nextGaussian() < 0.05) {
          status = "NEGATIVE"
        }
        val curTime = Calendar.getInstance.getTimeInMillis
        srcCtx.collect(Media(stockId.toString, curTime, status))
        Thread.sleep(rand.nextInt(100))
      }
    }

    override def cancel(): Unit = {
      isRunning = false
    }
  }

  case class Alert(symbol: String, timestamp: Long, alert: String)

  class AlertFlatMap extends RichCoFlatMapFunction[StockPrice, Media, Alert] {

    // One price threshold per stock, indexed by stock ID (0-4)
    var priceMaxThreshold: List[Double] = List(101.0d, 201.0d, 301.0d, 401.0d, 501.0d)

    // Latest media status seen by this parallel instance (a plain field, not per-key state)
    var mediaLevel: String = "NORMAL"

    // Called for each element of the first stream (stock prices)
    override def flatMap1(stock: StockPrice, collector: Collector[Alert]): Unit = {
      val stockId = stock.symbol.toInt
      if ("POSITIVE".equals(mediaLevel) && stock.price > priceMaxThreshold(stockId)) {
        collector.collect(Alert(stock.symbol, stock.timestamp, "POSITIVE"))
      }
    }

    // Called for each element of the second stream (media reviews); only records the latest status
    override def flatMap2(media: Media, collector: Collector[Alert]): Unit = {
      mediaLevel = media.status
    }
  }
}
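One detail of `AlertFlatMap` is worth checking: `mediaLevel` is an ordinary field, so a parallel instance that handles several symbols keeps a single media status for all of them, and a positive review of one stock can trigger an alert for another. In Flink, per-symbol values would normally live in keyed state, for example a `ValueState` obtained through `getRuntimeContext.getState` in `open`. The plain-Scala model below has no Flink dependency (`SharedVsKeyed` and its members are hypothetical names) and replays one instance's input in arrival order to show the difference; the threshold of 100.0 for every symbol is an assumption.

```scala
// Plain-Scala model of one parallel instance of AlertFlatMap; not Flink code.
object SharedVsKeyed {
  sealed trait In
  final case class Price(symbol: String, price: Double) extends In  // first stream
  final case class Media(symbol: String, status: String) extends In // second stream

  val threshold = 100.0 // assumed threshold, the same for every symbol

  // Like the original: one mediaLevel field for every symbol this instance sees.
  def sharedState(events: Seq[In]): Seq[String] = {
    var mediaLevel = "NORMAL"
    val alerts = Seq.newBuilder[String]
    events.foreach {
      case Media(_, status) => mediaLevel = status
      case Price(s, p) =>
        if (mediaLevel == "POSITIVE" && p > threshold) alerts += s
    }
    alerts.result()
  }

  // Per-key variant: the media status is remembered separately for each symbol,
  // which is what Flink keyed state would provide.
  def keyedState(events: Seq[In]): Seq[String] = {
    val mediaLevel = scala.collection.mutable.Map.empty[String, String]
    val alerts = Seq.newBuilder[String]
    events.foreach {
      case Media(s, status) => mediaLevel(s) = status
      case Price(s, p) =>
        if (mediaLevel.getOrElse(s, "NORMAL") == "POSITIVE" && p > threshold) alerts += s
    }
    alerts.result()
  }

  def main(args: Array[String]): Unit = {
    // Symbols "0" and "3" are assumed to be handled by the same instance.
    val events = Seq(Media("0", "POSITIVE"), Price("3", 150.0), Price("0", 120.0))
    println(sharedState(events)) // flags "3" although only "0" had a positive review
    println(keyedState(events))  // flags only "0"
  }
}
```

With the shared field, symbol "3" is alerted purely because of the review for "0"; keeping the status per key removes that cross-talk.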