09_Flink入门案例、word-count程序(java和scala版本)、添加依赖、Flink Streaming和Batch的区别 、在集群上执行程序等
1.9.Flink入門案例-wordCount
1.9.1.開發工具
1.9.2.編寫java版本word-count程序
1.9.2.1.添加Flink Maven依賴
1.9.2.2.編寫wordcount的java代碼
1.9.2.3.數據準備
1.9.2.4.執行結果
1.9.3.編寫scala版本word-count程序
1.9.3.1.添加Flink依賴
1.9.3.2.編寫wordcount的scala程序
1.9.4.Flink StreamingWindowWordCount
1.9.5.Flink程序開發步驟
1.9.6.Flink Streaming和Batch的區別 (API使用層面)
1.9.7.在集群上執行程序
1.9.8.集群跑jar的時候pom文件中需要進行的build配置
1.9. Flink getting-started example: word count
1.9.1. Development tools
- The official recommendation is IntelliJ IDEA, because it integrates Scala and Maven support out of the box, which makes development more convenient.
- Flink programs can be written in Java or Scala. Personally I suggest Scala: the implementation is more concise, and writing functional-style code in Java is rather awkward.
- Consider using a domestic (Chinese) Maven mirror: downloads from the default repositories can be slow, so Aliyun's Maven mirror is a good alternative. Note: if the domestic mirror reports that a dependency cannot be found, switch back to the default repositories.
- The mirror configuration is described in the remarks.
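For reference, a typical Aliyun mirror entry in Maven's settings.xml looks roughly like this (a sketch: the `id` and `name` values are arbitrary, and the URL is Aliyun's public repository at the time of writing):

```xml
<!-- In ~/.m2/settings.xml, inside the <mirrors> section -->
<mirror>
    <id>aliyun-public</id>
    <!-- Redirect requests for Maven Central to the Aliyun mirror -->
    <mirrorOf>central</mirrorOf>
    <name>Aliyun public mirror</name>
    <url>https://maven.aliyun.com/repository/public</url>
</mirror>
```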
1.9.2. The Java word-count program
Create a new Flink project in IDEA, as follows:
1.9.2.1. Adding the Flink Maven dependencies
Adding the following dependencies to pom.xml pulls Apache Flink into the project. They include the local execution environment, so the program can be tested locally.
Scala API: to use the Scala API, replace the flink-java artifact with flink-scala_2.12, and flink-streaming-java_2.12 with flink-streaming-scala_2.12.
The full pom.xml is as follows:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.toto.learn</groupId>
    <artifactId>flink-learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!--
            Maven dependency scopes:
            1. compile  : the default; needed at runtime, packaged into the jar.
            2. provided : needed at compile time; the runtime supplies it, so it is not packaged.
            3. runtime  : not needed to compile, needed at runtime; packaged (interface/implementation separation).
            4. test     : only needed for tests; not packaged.
            5. system   : a jar at a fixed path on the system, not from a repository (rarely used).
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <!-- Dependencies needed when programming in Scala: start -->
        <!--
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        -->
        <!-- Dependencies needed when programming in Scala: end -->
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <!-- Must match the Scala version of the _2.12 Flink artifacts above -->
                    <scalaCompatVersion>2.12</scalaCompatVersion>
                    <scalaVersion>2.12.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Assembly plugin: builds a jar containing all dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Optionally set the jar's entry class -->
                            <mainClass>com.toto.learn.batch.BatchWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
1.9.2.2. The Java word-count code
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count in Java.
 *
 * @author tuzuoquan
 */
public class BatchWordCountJava {

    public static void main(String[] args) throws Exception {
        String inputPath = "E:/workspace/wordcount/input";
        String outputPath = "E:/workspace/wordcount/output";

        // Get the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the content of the input files
        DataSource<String> text = env.readTextFile(inputPath);

        // groupBy(0): group by the first field (the word);
        // sum(1): sum the second field (the count) within each group
        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .groupBy(0)
                .sum(1);

        counts.writeAsCsv(outputPath, "\n", " ").setParallelism(1);
        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // \w matches [A-Za-z0-9_]; \W is its negation,
            // so split("\\W+") splits on runs of non-word characters
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    // Emit (word, 1) for every word occurrence
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
```
1.9.2.3. Preparing the data
The content of words.txt is as follows:
1.9.2.4. Execution result
The output is as follows:
1.9.3. The Scala word-count program
1.9.3.1. Adding the Flink dependencies
The pom.xml is the same as in the Java version above, except that the two Scala dependencies (previously commented out) are now enabled. The main addition is:
```xml
<!-- Dependencies needed when programming in Scala: start -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<!-- Dependencies needed when programming in Scala: end -->
```
1.9.3.2. The Scala word-count code
```scala
import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    val inputPath = "E:/workspace/wordcount/input"
    val outPut = "E:/workspace/wordcount/output"

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)

    // Import the implicit conversions needed by the Scala API
    import org.apache.flink.api.scala._

    val counts = text.flatMap(_.toLowerCase.split("\\W"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    counts.writeAsCsv(outPut, "\n", " ").setParallelism(1)
    env.execute("batch word count")
  }
}
```
Run the program; the result is the same as for the Java version above.
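Independent of Flink, the tokenize-and-count logic that both versions share can be exercised in plain Java. This is a sketch for illustration only (the class and method names here are hypothetical, not part of the original program):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the word-count core used by both jobs above:
// lowercase, split on non-word characters, count occurrences.
public class WordCountSketch {

    static Map<String, Integer> countWords(String line) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        // split("\\W+") splits on runs of characters outside [A-Za-z0-9_]
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {hello=2, world=1, flink=1}
        System.out.println(countWords("Hello world, hello Flink!"));
    }
}
```

In the Flink jobs, this per-word emission and summation is distributed across the cluster by `flatMap` plus `groupBy(0).sum(1)`; the sketch only shows the single-machine semantics.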
1.9.4. Flink StreamingWindowWordCount
- Requirement analysis
Produce words by hand over a socket, have Flink receive the data in real time, aggregate the data within a given time window (for example, 2 seconds), and print the result of each window computation.
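Before looking at the Flink code, the window semantics can be made concrete with a plain-Java sketch (all names here are hypothetical; Flink computes this incrementally, while the sketch only spells out what a 2-second window sliding by 1 second contains):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of a sliding count window: size 2000 ms, one window starting
// every 1000 ms, so consecutive windows overlap by 1000 ms.
public class SlidingWindowSketch {

    static class Event {
        final long ts;      // arrival time in milliseconds
        final String word;
        Event(long ts, String word) { this.ts = ts; this.word = word; }
    }

    // Count the words whose timestamps fall in [windowStart, windowStart + sizeMillis)
    static Map<String, Long> countInWindow(List<Event> events, long windowStart, long sizeMillis) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (Event e : events) {
            if (e.ts >= windowStart && e.ts < windowStart + sizeMillis) {
                counts.merge(e.word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, "a"), new Event(500, "b"),
                new Event(1500, "a"), new Event(2500, "b"));
        System.out.println(countInWindow(events, 0, 2000));    // window [0, 2000): {a=2, b=1}
        System.out.println(countInWindow(events, 1000, 2000)); // window [1000, 3000): {a=1, b=1}
    }
}
```

Note how the event at 1500 ms appears in both windows: that overlap is exactly what distinguishes a sliding window from a tumbling one.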
- Code development
Add the corresponding Java or Scala dependencies.
- Execution
1: Run nc -l 9000 on the hadoop machine.
2: Start the code in IDEA on the local machine.
The program:
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Sliding-window word count over a socket stream.
 *
 * @author tuzuoquan
 */
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {
        // Read the port from the arguments, falling back to 9000
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port set. Using default port 9000");
            port = 9000;
        }

        // 1. Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "127.0.0.1";
        String delimiter = "\n";

        // 2. Get the data source: connect to the socket and read its input
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // 3. Apply the operators: flatMap splits each line into (word, 1) records
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    // Each occurrence counts as 1
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")                                // group by the "word" field
          .timeWindow(Time.seconds(2), Time.seconds(1)) // window size 2 s, slide interval 1 s
          .sum("count");                                // sum (or reduce) the counts

        // 4. Specify the sink: print to the console with parallelism 1
        windowCounts.print().setParallelism(1);

        // 5. Trigger execution and name the job.
        //    Without this call the program does not run.
        env.execute("Socket window count");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
```
The Scala version is as follows:
```scala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Sliding-window computation:
 * every 1 second, count the data of the last 2 seconds
 * and print the result to the console.
 *
 * Created by xxx.xxx on 2018/10/8.
 */
object SocketWindowWordCountScala {

  def main(args: Array[String]): Unit = {
    // Read the socket port from the arguments, falling back to 9000
    val port: Int = try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
      case e: Exception =>
        System.err.println("No port set. Using default port 9000")
        9000
    }

    // Get the streaming execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to the socket and read its input
    val text = env.socketTextStream("127.0.0.1", port, '\n')

    // Note: this implicit-conversion import is required,
    // otherwise the flatMap call below will not compile
    import org.apache.flink.api.scala._

    // Split each line into words, map to (word, 1), key by word,
    // apply the sliding window, and aggregate with sum
    val windowCounts = text.flatMap(line => line.split("\\s"))
      .map(w => WordWithCount(w, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(2), Time.seconds(1)) // window size, slide interval
      .sum("count")
      // .reduce((a, b) => WordWithCount(a.word, a.count + b.count)) // reduce works too

    // Print to the console
    windowCounts.print().setParallelism(1)

    // Trigger execution
    env.execute("Socket window count")
  }

  case class WordWithCount(word: String, count: Long)
}
```
1.9.5. Flink program development steps
1: Obtain an execution environment.
2: Load/create the initial data.
3: Specify the transformation operators that process the data.
4: Specify where to put the computed results.
5: Call execute() to trigger execution.
Note: Flink programs are lazily evaluated; the program only actually runs when execute() is finally called.
The benefit of lazy evaluation: you can build up a complex program, and Flink turns it into a single plan that is executed as one unit.
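The lazy-execution idea can be illustrated with a toy plan builder (entirely hypothetical names, no Flink involved; Flink's real plan representation and optimizer are far richer): transformations are merely recorded, and only execute() runs them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy illustration of lazy execution: map() only records a step in the
// plan; the data is touched only when execute() is called.
public class LazyPlanSketch {

    private final List<Function<Integer, Integer>> plan = new ArrayList<>();
    private final List<Integer> source;

    LazyPlanSketch(List<Integer> source) { this.source = source; }

    // Record a transformation without running it
    LazyPlanSketch map(Function<Integer, Integer> f) {
        plan.add(f);
        return this;
    }

    // Apply the whole recorded plan to every element
    List<Integer> execute() {
        List<Integer> out = new ArrayList<>();
        for (Integer v : source) {
            for (Function<Integer, Integer> f : plan) {
                v = f.apply(v);
            }
            out.add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        LazyPlanSketch pipeline = new LazyPlanSketch(Arrays.asList(1, 2, 3))
                .map(x -> x * 2)
                .map(x -> x + 1);
        // Nothing has run so far; execute() triggers the whole plan:
        System.out.println(pipeline.execute()); // [3, 5, 7]
    }
}
```

Because the whole plan is known before anything runs, a real engine like Flink can optimize it (fuse operators, choose shipping strategies) and then execute it as one unit.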
1.9.6. The difference between Flink Streaming and Batch (at the API level)
- Stream processing (Streaming):
StreamExecutionEnvironment
DataStream
- Batch processing (Batch):
ExecutionEnvironment
DataSet
1.9.7. Running programs on a cluster
- Build
- Add the build configuration to the pom file, and specify the fully qualified name of the entry class when packaging (or specify it dynamically when submitting the job).
- Mark the Flink dependencies as provided.
- mvn clean package
- Execution
1: Start a local Flink cluster on the flink machine.
2: Run nc -l 9000 on the flink machine.
3: On the flink machine, run ./bin/flink run -c xxx.xxx.xxx.MainClass FlinkExample-xxxxx.jar --port 9000
4: On the flink machine, run tail -f log/flink-*-taskexecutor-*.out to watch the output.
5: Stopping the job:
A: Cancel it from the web UI.
B: Run bin/flink cancel <jobId>
1.9.8. Build configuration required in the pom file when running a jar on the cluster

```xml
<build>
    <plugins>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <!-- Must match the Scala version of the _2.12 Flink artifacts -->
                <scalaCompatVersion>2.12</scalaCompatVersion>
                <scalaVersion>2.12.12</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Assembly plugin: builds a jar containing all dependencies -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- Optionally set the jar's entry class -->
                        <mainClass>xxx.xxx.SocketWindowWordCountJava</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```