【2】flink数据流转换算子
【README】
本文記錄了flink對數(shù)據(jù)的轉(zhuǎn)換操作,包括
本文使用的flink為 1.14.4 版本;maven依賴如下:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.4</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.4</version></dependency>本文部分內(nèi)容參考了 flink 官方文檔:
概覽 | Apache Flink算子 # 用戶通過算子能將一個或多個 DataStream 轉(zhuǎn)換成新的 DataStream,在應用程序中可以將多個數(shù)據(jù)轉(zhuǎn)換算子合并成一個復雜的數(shù)據(jù)流拓撲。這部分內(nèi)容將描述 Flink DataStream API 中基本的數(shù)據(jù)轉(zhuǎn)換 API,數(shù)據(jù)轉(zhuǎn)換后各種數(shù)據(jù)分區(qū)方式,以及算子的鏈接策略。數(shù)據(jù)流轉(zhuǎn)換 # Map # DataStream → DataStream # 輸入一個元素同時輸出一個元素。下面是將輸入流中元素數(shù)值加倍的 map function:Java DataStream dataStream = //... dataStream.map(new MapFunction() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } }); Scala dataStream.map { x => x * 2 } Python data_stream = env.from_collection(collection=[1, 2, 3, 4, 5]) data_stream.map(lambda x: 2 * x, output_type=Types.,>https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/datastream/operators/overview/
【1】基本轉(zhuǎn)換算子
包括 map-轉(zhuǎn)換, flatMap-打散,filter-過濾;
1)代碼如下:
/*** @Description flink對數(shù)據(jù)流的基本轉(zhuǎn)換* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/ public class TransformTest1_Base {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從kafka讀取數(shù)據(jù)DataStream<String> baseStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 1-map-轉(zhuǎn)換或映射或函數(shù); 把string轉(zhuǎn)為長度輸出 // DataStream<Integer> mapStream = baseStream.map(x->x.length());DataStream<Integer> mapStream = baseStream.map(String::length);// 2-flatMap-打散-按照逗號分割字段DataStream<String> flatMapStream = baseStream.flatMap((String raw, Collector<String> collector)->{for (String rd : raw.split(",")) {collector.collect(rd);}}).returns(Types.STRING);// 3-filter-過濾-篩選 sensor_1 開頭的結(jié)束DataStream<String> filterStream = baseStream.filter(x->x.startsWith("sensor_1"));// 打印輸出mapStream.print("mapStream");flatMapStream.print("flatMapStream");filterStream.print("filterStream");// 執(zhí)行env.execute("BaseTransformStreamJob");} }sensor 文本文件如下:
sensor_1,12341561,36.1 sensor_2,12341562,33.5 sensor_3,12341563,39.9 sensor_1,12341573,43.1打印結(jié)果:
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341561
flatMapStream> 36.1
filterStream> sensor_1,12341561,36.1
mapStream> 22
flatMapStream> sensor_2
flatMapStream> 12341562
flatMapStream> 33.5
mapStream> 22
flatMapStream> sensor_3
flatMapStream> 12341563
flatMapStream> 39.9
mapStream> 22
flatMapStream> sensor_1
flatMapStream> 12341573
flatMapStream> 43.1
filterStream> sensor_1,12341573,43.1
【2】滾動聚合算子
keyBy算子-根據(jù)key對數(shù)據(jù)流分組,因為聚合前必須前分組,類似于sql的group by;
keyBy算子的作用:
- 邏輯把一個數(shù)據(jù)流拆分為多個分區(qū)(但還是同一個流),每個分區(qū)包含相同key(相同hash)的元素,底層對key求hash來實現(xiàn);
- 在邏輯上將流劃分為不相交的分區(qū)。具有相同 key 的記錄都分配到同一個分區(qū)。在內(nèi)部, keyBy() 是通過哈希分區(qū)實現(xiàn)的。
keyBy可以形成 KeyedStream;
然后滾動聚合算子可以對 KeyStream 進行操作,滾動聚合算子如下:
- sum
- min
- max
- minBy
- maxBy
【2.1】代碼示例
/*** @Description 滾動聚合算子* @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/ public class TransformTest2_RollingAgg {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從kafka讀取數(shù)據(jù)DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 轉(zhuǎn)換為 SensorReader pojo類型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// keyBy算子對數(shù)據(jù)流分組,并做滾動聚合(單字段分組)KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);// keyBy 多字段分組 // KeyedStream<SensorReading, Tuple1<String>> keyedStream = sensorStream.keyBy(new KeySelector<SensorReading, Tuple1<String>>() { // @Override // public Tuple1<String> getKey(SensorReading sensorReading) throws Exception { // return Tuple1.of(sensorReading.getId()); // } // });// max聚合DataStream<SensorReading> maxTempratureStream = keyedStream.max("temperature");// maxBy 聚合DataStream<SensorReading> maxbyTempratureStream = keyedStream.maxBy("temperature");// 打印輸出maxTempratureStream.print("maxTempratureStream");// 打印輸出maxbyTempratureStream.print("maxbyTempratureStream");// 執(zhí)行env.execute("maxTempratureStreamJob");} }sensor文本內(nèi)容:
sensor_1,11,36.1 sensor_2,21,33.1 sensor_1,12,36.2 sensor_1,13,36.3 sensor_2,22,33.2max聚合打印結(jié)果:
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.2}
max> SensorRd{id='sensor_1', timestamp=11, temperature=36.3}
max> SensorRd{id='sensor_2', timestamp=21, temperature=33.2}
maxBy聚合打印結(jié)果:
maxBy> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
maxBy> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
maxBy> SensorRd{id='sensor_1', timestamp=12, temperature=36.2}
maxBy> SensorRd{id='sensor_1', timestamp=13, temperature=36.3}
maxBy> SensorRd{id='sensor_2', timestamp=22, temperature=33.2}
小結(jié),max與maxBy區(qū)別:
- max:把聚合字段(最大溫度值)取出來,其他字段和第一條記錄保持一致;
- maxBy:把聚合字段(最大溫度值)取出來,且連同最大溫度值所在記錄的其他字段一并取出來;
同理 min與minby,本文不再演示;
補充: 聚合時要先分組,可以根據(jù)單字段分組,也可以根據(jù)多個字段分組;
上述代碼注釋部分給出了多個字段分組的例子,一個組記錄稱為Tuple,元組;
1個字段叫 Tuple1,2個字段叫Tuple2;....
【2.2】規(guī)約聚合-reduce
定義:
在相同 key 的數(shù)據(jù)流上“滾動”執(zhí)行 reduce。將當前元素與最后一次 reduce 得到的值組合然后輸出新值。
場景:根據(jù)sensorid分組后,形成keyedStream,然后查詢最大溫度,且最新時間戳;即多個聚合算子;
代碼
/*** @Description reduce規(guī)約聚合算子 * @author xiao tang* @version 1.0.0* @createTime 2022年04月15日*/ public class TransformTest3_Reduce {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從kafka讀取數(shù)據(jù)DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 轉(zhuǎn)換為 SensorReader pojo類型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// keyBy算子對數(shù)據(jù)流分組,并做滾動聚合(單字段分組)KeyedStream<SensorReading, String> keyedStream = sensorStream.keyBy(SensorReading::getId);// reduce規(guī)約聚合-查詢最大溫度,且最新時間戳DataStream<SensorReading> reduceStream = keyedStream.reduce((a,b)->new SensorReading(a.getId(), Math.max(a.getTimestamp(),b.getTimestamp()), Math.max(a.getTemperature(),b.getTemperature())));// 打印輸出reduceStream.print("reduceStream");// 執(zhí)行env.execute("reduceStreamJob");} }sensor文本:
sensor_1,11,36.1 sensor_2,21,33.1 sensor_1,32,36.2 sensor_1,23,30.3 sensor_2,22,31.2打印結(jié)果:
reduceStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
reduceStream> SensorRd{id='sensor_2', timestamp=21, temperature=33.1}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
reduceStream> SensorRd{id='sensor_2', timestamp=22, temperature=33.1}
【3】分流(把一個流切分為多個流)
flink 1.14.4 移除了 split 算子,refer2 ?https://issues.apache.org/jira/browse/FLINK-19083
轉(zhuǎn)而使用 side output 側(cè)輸出實現(xiàn),refer2
Side Outputs | Apache Flink
【3.1】 切分流(flink移除了split方法,需要使用 side output 來實現(xiàn)流切分)
1)代碼,啟動大于30度算高溫,否則低溫;
通過實現(xiàn)? ProcessFunction 來實現(xiàn);
public class TransformTest4_SplitStream {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從kafka讀取數(shù)據(jù)DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 轉(zhuǎn)換為 SensorReader pojo類型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 1,分流,按照溫度值是否大于30度,分為兩條流-高溫和低溫OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {@Overridepublic void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {// 把數(shù)據(jù)發(fā)送到側(cè)輸出context.output(record.getTemperature()>30? highTag : lowTag, record);// 把數(shù)據(jù)發(fā)送到常規(guī)輸出collector.collect(record);}});// 2, 選擇流打印輸出splitStream.getSideOutput(highTag).print("high");splitStream.getSideOutput(lowTag).print("low");// 執(zhí)行env.execute("reduceStreamJob");} }sensor文本:
sensor_1,11,36.1 sensor_2,21,23.1 sensor_1,32,36.2 sensor_1,23,30.3 sensor_2,22,11.2打印結(jié)果:
high> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
low> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
high> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
high> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
low> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
以上分流代碼refer2 ?Process function: a versatile tool in Flink datastream API | Develop Paper
【4】connect 連接流
1)定義: 把多個流連接為一個流,叫做連接流,連接流中的子流的各自元素類型可以不同;
2)步驟:
- 把2個流 connect 連接再一起形成 ConnectedStream;
- 把連接流 通過 map 得到數(shù)據(jù)流;
代碼:
/*** @Description connect-連接流* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/ public class TransformTest5_ConnectStream {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 從kafka讀取數(shù)據(jù)DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 轉(zhuǎn)換為 SensorReader pojo類型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 1,分流,按照溫度值是否大于30度,分為兩條流-高溫和低溫OutputTag<SensorReading> highTag = new OutputTag<SensorReading>("high") {};OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};SingleOutputStreamOperator<SensorReading> splitStream = sensorStream.process(new ProcessFunction<SensorReading, SensorReading>() {@Overridepublic void processElement(SensorReading record, Context context, Collector<SensorReading> collector) throws Exception {// 把數(shù)據(jù)發(fā)送到側(cè)輸出context.output(record.getTemperature() > 30 ? highTag : lowTag, record);// 把數(shù)據(jù)發(fā)送到常規(guī)輸出collector.collect(record);}});// 得到高溫和低溫流DataStream<SensorReading> highStream = splitStream.getSideOutput(highTag);DataStream<SensorReading> lowStream = splitStream.getSideOutput(lowTag);// 2 把2個流連接為1個流(子流1的元素為3元組,子流2的元素為2元組)ConnectedStreams<SensorReading, SensorReading> connectedStreams = highStream.connect(lowStream);DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<SensorReading, SensorReading, Object>() {@Overridepublic Object map1(SensorReading rd) throws Exception {return new Tuple3<>(rd.getId(), rd.getTemperature(), "high"); // map1 作用于第1個流 highStream}@Overridepublic Object map2(SensorReading rd) throws Exception {return new Tuple2<>(rd.getId(), rd.getTemperature()); // map2 作用于第2個流 lowStream}});// 3 打印結(jié)果resultStream.print("connectedStream");// 執(zhí)行env.execute("connectedStreamJob");} }sensor文本:
sensor_1,11,36.1 sensor_2,21,23.1 sensor_1,32,36.2 sensor_1,23,30.3 sensor_2,22,11.2打印結(jié)果:
connectedStream> (sensor_1,36.1,high)
connectedStream> (sensor_2,23.1)
connectedStream> (sensor_1,36.2,high)
connectedStream> (sensor_2,11.2)
connectedStream> (sensor_1,30.3,high)
【5】合流-union
上述connect,只能連接兩條流,如果要合并多條流,connect需要多次連接,不太適合;
如果要合并多條流,需要用 union,前提是 多個流的元素數(shù)據(jù)類型需要相同;
1)代碼
// 2 把3個流合并為1個流DataStream<SensorReading> inputStream2 = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor2.txt").map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});DataStream<SensorReading> unionStream = highStream.union(lowStream,inputStream2);// 3 打印結(jié)果unionStream.print("unionStream");// 執(zhí)行env.execute("unionStreamJob");打印結(jié)果:
unionStream> SensorRd{id='sensor2_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor2_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor2_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor2_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor2_2', timestamp=22, temperature=11.2}
unionStream> SensorRd{id='sensor_1', timestamp=11, temperature=36.1}
unionStream> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}
unionStream> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}
unionStream> SensorRd{id='sensor_2', timestamp=21, temperature=23.1}
unionStream> SensorRd{id='sensor_2', timestamp=22, temperature=11.2}
【6】自定義函數(shù) UDF user-defined function
flink 暴露了所有udf 函數(shù)的接口,如MapFunction, FilterFunction, ProcessFunction等;可以理解為 java8引入的 函數(shù)式接口;
可以參考官方的udf文檔:
ck自定義函數(shù) | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/
【6.1】富函數(shù)
1)復函數(shù)可以獲取上下文信息,而普通函數(shù)則不行;
代碼:
/*** @Description 富函數(shù)* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/ public class TransformTest7_RichFunction {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 從文件讀取數(shù)據(jù)DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 轉(zhuǎn)換為 SensorReader pojo類型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 自定義富函數(shù)DataStream<Tuple3<String,Integer,Integer>> richMapStream = sensorStream.map(new MyRichMapFunction());// 3 打印結(jié)果richMapStream.print("richMapStream");// 執(zhí)行env.execute("richMapStreamJob");}// 富函數(shù)類static class MyRichMapFunction extends RichMapFunction<SensorReading, Tuple3<String, Integer, Integer>> {@Overridepublic Tuple3<String, Integer, Integer> map(SensorReading record) throws Exception {// 富函數(shù)可以獲取運行時上下文的屬性 getRuntimeContext() ,普通map函數(shù)則不行return new Tuple3<String, Integer, Integer>(record.getId(), record.getId().length(), getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void open(Configuration parameters) throws Exception {// 初始化工作,一般是定義狀態(tài), 或者建立數(shù)據(jù)庫連接System.out.println("open db conn");}@Overridepublic void close() throws Exception {// 關(guān)閉連接,清空狀態(tài)System.out.println("close db conn");}} }sensor文本:
sensor_1,11,36.1 sensor_2,21,23.1 sensor_1,32,36.2 sensor_1,23,30.3 sensor_2,22,11.2?打印結(jié)果:
open db conn
open db conn
richMapStream:1> (sensor_1,8,0)
richMapStream:2> (sensor_1,8,1)
richMapStream:1> (sensor_2,8,0)
richMapStream:2> (sensor_2,8,1)
richMapStream:1> (sensor_1,8,0)
close db conn
close db conn
從打印結(jié)果可以看出,每個子任務(線程)都會執(zhí)行 open close方法 ,tuple3中的第3個字段是 執(zhí)行上下文的任務id(這是富函數(shù)才可以獲得上下文);
【7】flink中的數(shù)據(jù)重分區(qū)
1)flink中的分區(qū)指的是: taskmanager中的槽,即線程;
分區(qū)操作有:
- shuffle-洗牌亂分區(qū);
- keyBy-按照key分區(qū);
- global 把數(shù)據(jù)轉(zhuǎn)到第1個分區(qū)
2)代碼 :
/*** @Description 重分區(qū)* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/ public class TransformTest8_Partition2 {public static void main(String[] args) throws Exception {// 創(chuàng)建執(zhí)行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 從文件讀取數(shù)據(jù)DataStream<String> inputStream = env.readTextFile("D:\\workbench_idea\\diydata\\flinkdemo2\\src\\main\\resources\\sensor.txt");// 轉(zhuǎn)換為 SensorReader pojo類型DataStream<SensorReading> sensorStream = inputStream.map(x -> {String[] arr = x.split(",");return new SensorReading(arr[0], Long.valueOf(arr[1]), Double.valueOf(arr[2]));});// 1-shuffle 洗牌(亂分區(qū))DataStream<SensorReading> shuffleStream = sensorStream.shuffle();shuffleStream.print("shuffleStream");// 2-keyby 按照key分區(qū)DataStream<SensorReading> keybyStream = sensorStream.keyBy(SensorReading::getId); // keybyStream.print("keybyStream");// 3-global 把數(shù)據(jù)轉(zhuǎn)到第1個分區(qū)DataStream<SensorReading> globalStream = sensorStream.global(); // globalStream.print("globalStream");// 執(zhí)行env.execute("partitionJob");} }sensor文本:
sensor_1,11,36.1 sensor_2,21,23.1 sensor_1,32,36.2 sensor_1,23,30.3 sensor_2,22,11.2原生分區(qū)結(jié)果:(重分區(qū)前)
rawStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1} rawStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3} rawStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1} rawStream:2> SensorRd{id='sensor_2', timestamp=22, temperature=11.2} rawStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}shuffle-洗牌亂分區(qū)結(jié)果:
shuffleStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1} shuffleStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2} shuffleStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2} shuffleStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1} shuffleStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3}keyby-按照key進行分區(qū)的結(jié)果:
keybyStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1} keybyStream:2> SensorRd{id='sensor_1', timestamp=23, temperature=30.3} keybyStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2} keybyStream:2> SensorRd{id='sensor_1', timestamp=11, temperature=36.1} keybyStream:2> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}global-把數(shù)據(jù)轉(zhuǎn)到第1個分區(qū)的打印結(jié)果:
globalStream:1> SensorRd{id='sensor_1', timestamp=23, temperature=30.3} globalStream:1> SensorRd{id='sensor_2', timestamp=22, temperature=11.2} globalStream:1> SensorRd{id='sensor_1', timestamp=11, temperature=36.1} globalStream:1> SensorRd{id='sensor_2', timestamp=21, temperature=23.1} globalStream:1> SensorRd{id='sensor_1', timestamp=32, temperature=36.2}總結(jié)
以上是生活随笔為你收集整理的【2】flink数据流转换算子的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 华为手机如何删除app软件
- 下一篇: 优化差遭吐槽,《CS2(反恐精英 2)》