Flink 1.7.2 dataset transformation 示例 源碼 https://github.com/opensourceteams/flink-maven-scala 概述 Flink transformation示例 map,flatMap,filter,reduce,groupBy,reduceGroup,combineGroup,Aggregate(sum,max,min) distinct,join,join funtion,leftOuterJoin,rightOuterJoin,fullOuterJoin,union,first,coGroup,cross transformation map 對集合元素,進行一一遍歷處理 示例功能:給集合中的每一一行,都拼接字符串 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.mapimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements("c a b d a c","d c a b c d")val dataSet2 = dataSet.map(_.toUpperCase + "字符串連接")dataSet2.print()}}
C A B D A C字符串連接
D C A B C D字符串連接
flatMap 對集合元素,進行一一遍歷處理,并把子集合中的數據拉到一個集合中 示例功能:把行進行拆分后,再把不同的行拆分之后的元素,匯總到一個集合中
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.flatmapimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements("c a b d a c","d c a b c d")val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" "))dataSet2.print()}}
C
A
B
D
A
C
D
C
A
B
C
D
filter 對集合元素,進行一一遍歷處理,只過濾滿足條件的元素 示例功能:過濾空格數據 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.filterimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** filter 過濾器,對數據進行過濾處理*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements("c a b d a c","d c a b c d")val dataSet2 = dataSet.flatMap(_.toUpperCase().split(" ")).filter(_.nonEmpty)dataSet2.print()}}
C
A
B
D
A
C
D
C
A
B
C
D
reduce 對集合中所有元素,兩兩之間進行reduce函數表達式的計算 示例功能:統計所有數據的和 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.mappackage com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于進行所有元素的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(3,5,8,9)// 3 + 5 + 8 + 9val dataSet2 = dataSet.reduce((a,b) => {println(s"${a} + ${b} = ${a +b}")a + b})dataSet2.print()}}
3 + 5 = 8
8 + 8 = 16
16 + 9 = 25
25
reduce (先groupBy) 對集合中所有元素,按指定的key分組,按組執行reduce 示例功能:按key分組統計所有數據的和 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object ReduceGroupRun2 {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",1),("b",1),("c",1),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.groupBy(0).reduce((x,y) => {(x._1,x._2 + y._2)})dataSet2.print()}}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
groupBy (class Fields) 對集合中所有元素,按用例類中的屬性,進行分組 示例功能:按key分組統計所有數據的和 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByClassFieldsimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object ReduceGroupRun {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements("a","b","c","a","c","d","f","g","f")/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.map(WordCount(_,1)).groupBy("word").reduce((x,y) => WordCount(x.word, x.count + y.count))dataSet2.print()}case class WordCount(word:String,count:Int)}
WordCount(d,1)
WordCount(a,2)
WordCount(f,2)
WordCount(b,1)
WordCount(c,2)
WordCount(g,1)
groupBy (key Selector) 對集合中所有元素,按key 選擇器進行分組 示例功能:按key分組統計所有數據的和 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.groupByKeySelectorimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object ReduceGroupRun {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements("a","b","c","a","c","d","f","g","f")/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.map((_,1)).groupBy(_._1).reduce((x,y) => (x._1,x._2 +y._2))dataSet2.print()}}
WordCount(d,1)
WordCount(a,2)
WordCount(f,2)
WordCount(b,1)
WordCount(c,2)
WordCount(g,1)
reduceGroup 對集合中所有元素,按指定的key分組,把相同key的元素,做為參數,調用reduceGroup()函數 示例功能:按key分組統計所有數據的和
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.reduceGroupimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector/*** 相同的key的元素,都一次做為參數傳進來了*/
object ReduceGroupRun {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val dataSet = env.fromElements("a","a","c","b","a")/*** 中間數據* (a,1)* (a,1)* (c,1)* (b,1)* (a,1)*/val result = dataSet.map((_,1)).groupBy(0).reduceGroup((in, out: Collector[(String,Int)]) =>{var count = 0 ;var word = "";while (in.hasNext){val next = in.next()word = next._1count = count + next._2}out.collect((word,count))})result.print()}}
(a,3)
(b,1)
(c,1)
combineGroup 對集合中所有元素,按指定的key分組,把相同key的元素,做為參數,調用combineGroup()函數,會在本地進行合并 示例功能:按key分組統計所有數據的和
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.combineGroupimport org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collector/*** 相同的key的元素,都一次做為參數傳進來了*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val dataSet = env.fromElements("a","a","c","b","a")/*** 中間數據* (a,1)* (a,1)* (c,1)* (b,1)* (a,1)*/val result = dataSet.map((_,1)).groupBy(0).combineGroup((in, out: Collector[(String,Int)]) =>{var count = 0 ;var word = "";while (in.hasNext){val next = in.next()word = next._1count = count + next._2}out.collect((word,count))})result.print()}}
(a,3)
(b,1)
(c,1)
Aggregate sum 按key分組 對Tuple2(String,Int) 中value進行求和操作 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sumimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.sum(1)dataSet2.print()}}
(f,15)
Aggregate max 按key分組 對Tuple2(String,Int) 中value進行求最大值操作
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.maximport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.max(1)dataSet2.print()}}
(f,5)
Aggregate min 按key分組 對Tuple2(String,Int) 中value進行求最小值操作
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.minimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.min(1)dataSet2.print()}}
(f,1)
Aggregate sum (groupBy) 按key分組 對Tuple2(String,Int) 中的所有元素進行求和操作 示例功能:按key分組統計所有數據的和 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.sumimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",1),("b",1),("c",1),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.groupBy(0).sum(1)dataSet2.print()}}
(d,1)
(a,2)
(f,2)
(b,1)
(c,2)
(g,1)
Aggregate max (groupBy) 等于 maxBy 按key分組 對Tuple2(String,Int) 中value 進行求最大值 示例功能:按key分組統計最大值 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.maximport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",2),("b",1),("c",4),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.groupBy(0).max(1)dataSet2.print()}}
(d,1)
(a,2)
(f,1)
(b,1)
(c,4)
(g,1)
Aggregate min (groupBy) 等于minBy 按key分組 對Tuple2(String,Int) 中value 進行求最小值 示例功能:按key分組統計最小值 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.maximport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",2),("b",1),("c",4),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.groupBy(0).min(1)dataSet2.print()}}
(d,1)
(a,1)
(f,1)
(b,1)
(c,1)
(g,1)
distinct 去重 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.aggregate.distinctimport org.apache.flink.api.scala.{ExecutionEnvironment, _}/*** 相當于按key進行分組,然后對組內的元素進行的累加操作,求和操作*/
object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))/*** (a,1)* (b,1)* (c,1)* (a,1)* (c,1)* (d,1)* (f,1)* (g,1)*/val dataSet2 = dataSet.distinct(1)dataSet2.print()}}
(a,3)
(b,1)
(c,5)
join package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.joinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))//全外連接val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0)dataSet3.print()}}
((d,1),(d,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((f,1),(f,1))
((g,1),(g,1))
join (Function) package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.joinFunctionimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))val dataSet2 = env.fromElements(("g",1),("f",1))//全外連接val dataSet3 = dataSet.join(dataSet2).where(0).equalTo(0){(x,y) => (x._1,x._2+ y._2)}dataSet3.print()}}
(f,3)
(g,6)
leftOuterJoin 左外連接,左邊的Dataset中的每一個元素,去連接右邊的元素 package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.leftOuterJoinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))val dataSet2 = env.fromElements(("g",1),("f",1))//全外連接val dataSet3 = dataSet.leftOuterJoin(dataSet2).where(0).equalTo(0){(x,y) => {var count = 0;if(y != null ){count = y._2}(x._1,x._2+ count)}}dataSet3.print()}}
(d,1)
(a,3)
(a,1)
(f,3)
(b,1)
(c,5)
(c,1)
(g,6)
rightOuterJoin 右外連接,左邊的Dataset中的每一個元素,去連接左邊的元素
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.rightOuterJoinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))val dataSet2 = env.fromElements(("g",1),("f",1))//全外連接val dataSet3 = dataSet.rightOuterJoin(dataSet2).where(0).equalTo(0){(x,y) => {var count = 0;if(x != null ){count = x._2}(x._1,y._2 + count)}}dataSet3.print()}}
(f,2)
(g,2)
fullOuterJoin
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.fullOuterJoinimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",2),("g",5))val dataSet2 = env.fromElements(("g",1),("f",1))//全外連接val dataSet3 = dataSet.fullOuterJoin(dataSet2).where(0).equalTo(0){(x,y) => {var countY = 0;if(y != null ){countY = y._2}var countX = 0;if(x != null ){countX = x._2}(x._1,countX + countY)}}dataSet3.print()}}
(f,2)
(g,2)
union
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.unionimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",1),("g",1),("f",1))val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))//全外連接val dataSet3 = dataSet.union(dataSet2)dataSet3.print()}}
(a,1)
(d,1)
(g,1)
(f,1)
(f,1)
(g,1)
(f,1)
first n
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.firstimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",3),("b",1),("c",5),("a",1),("c",1),("d",1),("f",1),("g",1),("f",1))//全外連接val dataSet3 = dataSet.first(3)dataSet3.print()}}
(a,3)
(b,1)
(c,5)
coGroup 相當于,取出兩個數據集的所有去重的key,然后,再把第一個DataSet中的這個key的所有元素放到可迭代對象中,再把第二個DataSet中的這個key的所有元素放到可迭代對象中
package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.cogroupimport java.langimport org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.util.Collectorobject Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",1),("g",1),("a",1))val dataSet2 = env.fromElements(("a",1),("f",1))//全外連接val dataSet3 = dataSet.coGroup(dataSet2).where(0).equalTo(0){new CoGroupFunction[(String,Int),(String,Int), Collector[(String,Int)]] {override def coGroup(first: lang.Iterable[(String, Int)], second: lang.Iterable[(String, Int)], out: Collector[Collector[(String, Int)]]): Unit = {println("==============開始")println("first")println(first)val iteratorFirst = first.iterator()while (iteratorFirst.hasNext()){println(iteratorFirst.next())}println("second")println(second)val iteratorSecond = second.iterator()while (iteratorSecond.hasNext()){println(iteratorSecond.next())}println("==============結束")}}}dataSet3.print()}}
==============開始
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@3500e7b0
(a,1)
(a,1)
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@41230ea2
(a,1)
==============結束
==============開始
first
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@14602d0a
(g,1)
second
[]
==============結束
==============開始
first
[]
second
org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator@2b0a15b5
(f,1)
==============結束Process finished with exit code 0
cross package com.opensourceteams.module.bigdata.flink.example.dataset.transformation.crossimport org.apache.flink.api.scala.{ExecutionEnvironment, _}object Run {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval dataSet = env.fromElements(("a",1),("g",1),("f",1))val dataSet2 = env.fromElements(("d",1),("f",1),("g",1),("f",1))//全外連接val dataSet3 = dataSet.cross(dataSet2)dataSet3.print()}}
((a,1),(d,1))
((a,1),(f,1))
((a,1),(g,1))
((a,1),(f,1))
((g,1),(d,1))
((g,1),(f,1))
((g,1),(g,1))
((g,1),(f,1))
((f,1),(d,1))
((f,1),(f,1))
((f,1),(g,1))
((f,1),(f,1))
創作挑戰賽 新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔 為你收集整理的Flink 1.7.2 dataset transformation 示例 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。