Apache Flink 零基础入门(七)Flink中keyBy三种方式指定key
keyBy 如何指定key
不管是stream還是batch處理,都有一個keyBy(stream)和groupBy(batch)操作。那么該如何指定key?
Some transformations (join, coGroup, keyBy, groupBy) require that a key be defined on a collection of elements. Other transformations (Reduce, GroupReduce, Aggregate, Windows) allow data being grouped on a key before they are applied.
?一些算子(transformations)例如join,coGroup,keyBy,groupBy往往需要定義一個key。其他的算子例如Reduce, GroupReduce, Aggregate, Windows,也允許數據按照key進行分組。
DataSet
DataSet<...> input = // [...] DataSet<...> reduced = input.groupBy(/*define key here*/).reduceGroup(/*do something*/);DataStream
DataStream<...> input = // [...] DataStream<...> windowed = input.keyBy(/*define key here*/).window(/*window specification*/);類似于mysql中的join操作:select a.* , b.* from a join b on a.id=b.id
這里的keyBy就是a.id=b.id
有哪幾種方式定義Key?
方式一:Tuple
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)可以傳字段的位置
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)可以傳字段位置的組合
這對于簡單的使用時沒問題的。但是對于內嵌的Tuple,如下所示:
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;如果使用keyBy(0),那么他就會使用整個Tuple2<Integer, Float>作為key,(因為Tuple2<Integer, Float>是Tuple3<Tuple2<Integer, Float>,String,Long>的0號位置)。如果想要指定key到Tuple2<Integer, Float>內部中,可以使用下面的方式。
方式二:字段表達式
我們可以使用基于字符串字段表達式來引用內嵌字段去定義key。
之前我們的算子寫法是這樣的:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);其中的new FlatMapFunction<String, Tuple2<String, Integer>>表示輸入是一個String,輸出是一個Tuple2<String, Integer>。這里我們重新定義一個內部類:
public static class WC {private String word;private int count;public WC() {}public WC(String word, int count) {this.word = word;this.count = count;}@Overridepublic String toString() {return "WC{" +"word='" + word + '\'' +", count=" + count +'}';}public String getWord() {return word;}public void setWord(String word) {this.word = word;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}}修改算子的寫法:
text.flatMap(new FlatMapFunction<String, WC>() {@Overridepublic void flatMap(String value, Collector<WC> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for (String token : tokens) {if (token.length() > 0) {out.collect(new WC(token, 1));}}}}).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);將原來的輸出Tuple2<String, Integer>,修改為輸出WC類型;將原來的keyBy(0)修改為keyBy("word");將原來的sum(1)修改為sum("count")
因此,在這個例子中我們有一個POJO類,有兩個字段分別是"word"和"count",可以傳遞字段名到keyBy("")中。
語法:
- 字段名一定要與POJO類中的字段名一致。一定要提供默認的構造函數,和get與set方法。
- 使用Tuple時,0表示第一個字段
- 可以使用嵌套方式,舉例如下:
- "count",指向的是WC中的字段count
- "complex",指向的是復雜數據類型,會遞歸選擇所有ComplexNestedClass的字段
- "complex.word.f2",指向的是Tuple3中的最后一個字段。
- "complex.hadoopCitizen",指向的是Hadoop?IntWritable?type
scala寫法:
object StreamingWCScalaApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 引入隱式轉換import org.apache.flink.api.scala._val text = env.socketTextStream("192.168.152.45", 9999)text.flatMap(_.split(",")).map(x => WC(x,1)).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1)env.execute("StreamingWCScalaApp");}case class WC(word: String, count: Int) }?方式三:key選擇器函數
.keyBy(new KeySelector<WC, Object>() {@Overridepublic Object getKey(WC value) throws Exception {return value.word;}})總結
以上是生活随笔為你收集整理的Apache Flink 零基础入门(七)Flink中keyBy三种方式指定key的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 零基础入门(六
- 下一篇: Apache Flink 零基础入门(八