Apache Flink Basics for Beginners (11): Flink Transformations
The previous article covered the common DataSource APIs. A DataSource only loads data into the program; once the data is loaded, the next step is to apply transformations to it.
Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated assemblies.
In other words, a transformation turns one or more DataSets into a new DataSet, and multiple transformations can be chained together into a complete program.
Map Function
Scala
Create a new object:
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment
    mapFunction(environment)
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  }
}

The data source here is a list of the integers 1 to 10. The idea behind map is: given a data set with N elements, transform every single element:
data.map { x => x.toInt }

This works just like y = f(x): each input element is passed through the function and produces exactly one output element.
// add 1 to every element in data
data.map((x: Int) => x + 1).print()

Here every element gets 1 added to it.
Shorter forms:
Since the function takes only a single parameter, it can be written directly as:
data.map((x) => x + 1).print()

More concisely:
data.map(x => x + 1).print()

Or, most concisely, with the placeholder syntax:
data.map(_ + 1).print()

Output:
2
3
4
5
6
7
8
9
10
11

Java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

import java.util.ArrayList;
import java.util.List;

public static void main(String[] args) throws Exception {
    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    mapFunction(executionEnvironment);
}

public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i + "");
    }
    DataSource<String> data = executionEnvironment.fromCollection(list);
    data.map(new MapFunction<String, Integer>() {
        public Integer map(String input) {
            return Integer.parseInt(input) + 1;
        }
    }).print();
}

Because the list was declared with String elements, the MapFunction is typed as <String, Integer>: the first type parameter is the input type and the second is the output type, an Integer.
Filter Function
Add 1 to each element, then keep only the elements greater than 5.
Scala
def filterFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  data.map(_ + 1).filter(_ > 5).print()
}

filter only returns the records that satisfy the predicate.
Java
public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);
    data.map(new MapFunction<Integer, Integer>() {
        public Integer map(Integer input) {
            return input + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer input) throws Exception {
            return input > 5;
        }
    }).print();
}

MapPartition Function
What is the difference between map and mapPartition?
Requirement: the DataSource contains 100 elements, and every element has to be written to a database.
With a plain map function, the implementation looks like this:
// the DataSource holds 100 elements that need to be stored in a database
def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
  val students = new ListBuffer[String]
  for (i <- 1 to 100) {
    students.append("Student" + i)
  }
  val data = env.fromCollection(students)

  data.map(x => {
    // every single element needs a connection before it can be saved
    val connection = DBUtils.getConnection()
    println(connection + " ... ")
    // TODO: save the record to the DB
    DBUtils.returnConnection(connection)
    x
  }).print()
}

Running this prints 100 DBUtils.getConnection() calls, one per element. As the data volume grows, acquiring a new connection for every single element is clearly not workable.
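The DBUtils helper referenced in these snippets is not shown in the original post. A minimal stand-in (purely illustrative: it fakes connection handling with plain strings so the examples can be run locally) could look like this:

// Hypothetical stub for the DBUtils helper used in the examples above.
// It only simulates acquiring and releasing a connection.
object DBUtils {

  // pretend to acquire a connection and return an identifier for it
  def getConnection(): String = {
    "connection-" + System.nanoTime()
  }

  // pretend to hand the connection back to a pool
  def returnConnection(connection: String): Unit = {
    // no-op in this stub
  }
}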
This is exactly what mapPartition is for: it transforms the data of a whole partition at once, i.e. the function is invoked once per partition rather than once per element.
First, set the number of partitions:
val data = env.fromCollection(students).setParallelism(4)

This sets the parallelism to 4, i.e. four partitions. Then process the data with mapPartition:
data.mapPartition(x => {
  val connection = DBUtils.getConnection()
  println(connection + " ... ")
  // TODO: save the partition's records to the DB
  DBUtils.returnConnection(connection)
  x
}).print()

Now there are only four connection requests, one per partition.
Java
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        list.add("student:" + i);
    }
    DataSource<String> data = env.fromCollection(list);

    /*
    data.map(new MapFunction<String, String>() {
        @Override
        public String map(String input) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
            return input;
        }
    }).print();
    */

    data.mapPartition(new MapPartitionFunction<String, Object>() {
        @Override
        public void mapPartition(Iterable<String> values, Collector<Object> out) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
        }
    }).print();
}

first, groupBy and sortGroup
Scala
first takes the first N elements, groupBy groups the data set by a key, and sortGroup sorts the elements within each group.
def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[(Int, String)]()
  info.append((1, "hadoop"))
  info.append((1, "spark"))
  info.append((1, "flink"))
  info.append((2, "java"))
  info.append((2, "springboot"))
  info.append((3, "linux"))
  info.append((4, "vue"))
  val data = env.fromCollection(info)

  data.first(3).print()
  // output:
  // (1,hadoop)
  // (1,spark)
  // (1,flink)

  // group by the first field and take the first two records of each group
  data.groupBy(0).first(2).print()
  // (3,linux)
  // (1,hadoop)
  // (1,spark)
  // (2,java)
  // (2,springboot)
  // (4,vue)

  // group by the first field, sort each group by the second field in ascending order,
  // then take the first two records of each group
  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  // (3,linux)
  // (1,flink)
  // (1,hadoop)
  // (2,java)
  // (2,springboot)
  // (4,vue)
}

Java
public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = new ArrayList<>();
    info.add(new Tuple2<>(1, "hadoop"));
    info.add(new Tuple2<>(1, "spark"));
    info.add(new Tuple2<>(1, "flink"));
    info.add(new Tuple2<>(2, "java"));
    info.add(new Tuple2<>(2, "springboot"));
    info.add(new Tuple2<>(3, "linux"));
    info.add(new Tuple2<>(4, "vue"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);

    data.first(3).print();
    data.groupBy(0).first(2).print();
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
}

FlatMap Function
Takes one element and produces zero, one, or more elements.
Scala
def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).print()
}

Output:
hadoop
spark
hadoop
flink
flink
flink

flatMap splits each element on the comma, so one input element becomes several output elements.
The classic example, word count:
data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()

Each element is split on the comma, every resulting word is mapped to a (word, 1) tuple, the tuples are grouped by the first field, and the second field is summed within each group.
Output:
(hadoop,2)
(flink,3)
(spark,1)

Java
The same classic word count example in Java:
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);

    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit every word downstream
                out.collect(split);
            }
        }
    }).map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            return new Tuple2<>(value, 1);
        }
    }).groupBy(0).sum(1).print();
}

Distinct
Removes duplicate elements.
Scala
def distinctFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).distinct().print()
}

This de-duplicates the flattened elements. Output:
hadoop
flink
spark

Java
public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);

    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit every word downstream
                out.collect(split);
            }
        }
    }).distinct().print();
}

Join
Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section of the Flink documentation to learn how to define join keys.
result = input1.join(input2)
               .where(0)    // key of the first input (tuple field 0)
               .equalTo(1); // key of the second input (tuple field 1)

This joins field 0 of the first tuple data set (input1) with field 1 of the second tuple data set (input2).
Scala
def joinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))

  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "廣州"))

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}

Output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)

Java
public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "廣州"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    data1.join(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
             @Override
             public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                 return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
             }
         }).print();
}

The two Tuple2<Integer, String> type parameters are the element types of the two inputs, and Tuple3<Integer, String, String> is the element type of the output.
OuterJoin
The join above is an inner join. An outer join can be a left outer join, a right outer join or a full outer join over the two data sets.
Scala
Left outer join:
def outJoinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))

  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "廣州"))

  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)

  // left outer join: every record from the left side is emitted
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
    if (second == null) {
      (first._1, first._2, "-")
    } else {
      (first._1, first._2, second._2)
    }
  }).print()
}

In a left outer join, a record on the left may have no match on the right; in that case second is null and must be handled explicitly, otherwise a NullPointerException is thrown. Output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)

Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else {
    (first._1, first._2, second._2)
  }
}).print()

In a right outer join, every record from the right side is kept. Output:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)

Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else if (second == null) {
    // no match on the right side: fill the city with a placeholder
    (first._1, first._2, "-")
  } else {
    (first._1, first._2, second._2)
  }
}).print()

Output:

(3,flink,深圳)
(1,hadoop,北京)
(5,-,廣州)
(2,spark,上海)
(4,java,-)

Java
Left outer join:
public static void outjoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));

    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "廣州"));

    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);

    data1.leftOuterJoin(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
             @Override
             public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                 if (second == null) {
                     return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
                 }
                 return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
             }
         }).print();
}

Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0)
     .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
         @Override
         public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
             if (first == null) {
                 return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
             }
             return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
         }
     }).print();

Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0)
     .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
         @Override
         public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
             if (first == null) {
                 return new Tuple3<Integer, String, String>(second.f0, "-", second.f1);
             } else if (second == null) {
                 return new Tuple3<Integer, String, String>(first.f0, first.f1, "-");
             }
             return new Tuple3<Integer, String, String>(first.f0, first.f1, second.f1);
         }
     }).print();

Cross Function
Scala
Cross builds the Cartesian product: every element of the left data set is paired with every element of the right one.
def crossFunction(env: ExecutionEnvironment): Unit = {
  val info1 = List("喬峰", "慕容復")
  val info2 = List(3, 1, 0)
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}

Output:
(喬峰,3)
(喬峰,1)
(喬峰,0)
(慕容復,3)
(慕容復,1)
(慕容復,0)

Java
public static void crossFunction(ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<>();
    info1.add("喬峰");
    info1.add("慕容復");

    List<String> info2 = new ArrayList<>();
    info2.add("3");
    info2.add("1");
    info2.add("0");

    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<String> data2 = env.fromCollection(info2);
    data1.cross(data2).print();
}

Broadcast Variables
A broadcast variable is an extra data set that is made available to every parallel instance of an operator, for example for enriching or cleansing the main data. It is kept in memory on each node, so it must not be too large.
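The original text stops here without a code sample, so the following is only a minimal sketch of the DataSet broadcast-variable API; the sample values and the broadcast name "cityInfo" are invented for illustration. The broadcast set is registered on an operator with withBroadcastSet and read inside a rich function via getRuntimeContext.getBroadcastVariable:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

def broadcastFunction(env: ExecutionEnvironment): Unit = {
  // a small lookup data set that every parallel task should see in full
  val toBroadcast = env.fromElements((1, "北京"), (2, "上海"))

  val data = env.fromElements((1, "hadoop"), (2, "spark"))

  data.map(new RichMapFunction[(Int, String), String] {
    var cityById: Map[Int, String] = Map.empty

    // the broadcast variable is read once per task in open(), not per record
    override def open(parameters: Configuration): Unit = {
      cityById = getRuntimeContext
        .getBroadcastVariable[(Int, String)]("cityInfo")
        .asScala
        .toMap
    }

    override def map(value: (Int, String)): String =
      value._2 + " -> " + cityById.getOrElse(value._1, "-")
  }).withBroadcastSet(toBroadcast, "cityInfo") // register the broadcast data set under a name
    .print()
}

Each record is then enriched with its lookup value (hadoop -> 北京, spark -> 上海), while the lookup data lives in memory on every task instead of being shipped with every record.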
總結