Scala WordCount on Flink, and several reasons why Flink produces no output
###########################################Experiment steps######################################
① Start the cluster
$FLINK_HOME/bin/start-cluster.sh
② Open a listening socket on port 9999 and type some input:
nc -lk 9999
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 muamua mua2 mua3 mua2 mua
mua mua2 mua3 mua2 mua
③ Build command:
mvn scala:compile package
④ Submit the Flink job, which opens a second socket:
flink run -c WordCount target/scala-module-dependency-sample-1.0-SNAPSHOT.jar --port 9999
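The two sockets then talk to each other over TCP. nc listens on port 9999, and the job's socket source connects to it and receives every line you type into nc.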
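The following sketch shows the client side of that connection. It is plain JVM code, not Flink's own implementation. It connects to a listening socket such as nc -lk 9999 and reads newline-delimited text, which is roughly what the Flink socket source does with each line you type. SocketLineClient and readLines are hypothetical names.

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Hypothetical client that connects to a listening socket (e.g. `nc -lk 9999`)
// and reads up to maxLines newline-delimited lines, or until the server closes.
object SocketLineClient {
  def readLines(host: String, port: Int, maxLines: Int): List[String] = {
    val socket = new Socket(host, port) // we are the client; nc is the server
    try {
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      // readLine() strips the '\n' delimiter and returns null at end of stream
      Iterator.continually(reader.readLine()).takeWhile(_ != null).take(maxLines).toList
    } finally socket.close()
  }

  def main(args: Array[String]): Unit =
    readLines("localhost", 9999, 3).foreach(println)
}
```

Run it while nc is listening. Each line typed into nc is printed until three lines have arrived.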
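###########################Experiment results###################################################
The output is written to the TaskManager's .out file. The 3> or 4> prefix on each line is the index of the parallel subtask that printed it.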
$FLINK_HOME/log/flink-appleyuchi-taskexecutor-2-Desktop.out
3> WordWithCount(mua3,2)
4> WordWithCount(mua2,4)
4> WordWithCount(mua2,14)
3> WordWithCount(mua3,7)
3> WordWithCount(mua3,12)
4> WordWithCount(mua2,24)
3> WordWithCount(muamua,1)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,24)
3> WordWithCount(mua3,12)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,24)
3> WordWithCount(mua3,12)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,20)
3> WordWithCount(mua3,10)
3> WordWithCount(muamua,1)
4> WordWithCount(mua2,10)
3> WordWithCount(mua3,5)
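This explains why each word shows up several times and why the counts rise and then fall. The window is 5 s long and slides every 1 s, so every record belongs to 5 / 1 = 5 overlapping windows and is counted once in each of them.

The pure-Scala sketch below (no Flink dependency) models the job's steps on ordinary collections. It makes these assumptions:
- the timestamps are hand-picked;
- window starts are aligned to multiples of the slide with zero offset;
- SlidingWindowDemo is a hypothetical name.

It also shows where the muamua key comes from. In one of the input lines, two lines were typed without a newline between them, so split(" ") produces the token muamua.

```scala
// Pure-Scala model (hypothetical, no Flink) of the job's pipeline:
// flatMap(_.split(" ")) -> map(WordWithCount(_, 1)) -> keyBy("word")
// -> sliding window (size 5 s, slide 1 s) -> sum("count").
// Timestamps are hand-picked processing times in milliseconds.
object SlidingWindowDemo {
  final case class WordWithCount(word: String, count: Long)

  /** Start times of all windows [start, start + size) that contain timestamp ts. */
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide) // latest slide boundary <= ts
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  /** Word count per (window start, word), following the job's transformations. */
  def countPerWindow(lines: Seq[(Long, String)], size: Long, slide: Long): Map[(Long, String), Long] =
    lines
      .flatMap { case (ts, line) => line.split(" ").map(w => (ts, WordWithCount(w, 1))) }
      .flatMap { case (ts, wc) => windowStarts(ts, size, slide).map(start => ((start, wc.word), wc.count)) }
      .groupBy(_._1)
      .map { case (key, hits) => key -> hits.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      0L -> "mua mua2 mua3 mua2 mua",
      1500L -> "mua mua2 mua3 mua2 muamua mua2 mua3 mua2 mua"
    )
    val counts = countPerWindow(input, size = 5000L, slide = 1000L)
    // Print windows in time order, like successive firings of the sliding window
    counts.toSeq.sortBy { case ((start, word), _) => (start, word) }.foreach {
      case ((start, word), n) => println(s"[$start, ${start + 5000}) WordWithCount($word,$n)")
    }
  }
}
```

In this model the mua2 count moves through three stages:
- it is 2 in the earliest window, which holds only the first line;
- it is 6 while both lines are inside the window;
- it drops to 4 once the first line has slid out.

This is the same ramp-up and ramp-down pattern visible in the output above.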
#####################################Appendix: project structure##################################
├── pom.xml
├── src
│   └── main
│       └── scala
│           └── WordCount.scala
└── 運行方法.txt (notes on how to run)
#########################Appendix: code#######################################
Complete pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sample</groupId>
  <artifactId>scala-module-dependency-sample</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>
    -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.11</artifactId>
      <version>1.6.2</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <goals>
              <goal>compile</goal>
            </goals>
            <configuration>
              <includes>
                <include>**/*.scala</include>
              </includes>
            </configuration>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

WordCount.scala:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCount {

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    // Get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read data from the socket on host "Desktop", port 9999, one record per line.
    // Host and port are hard-coded, so the --port argument passed to `flink run` is not read.
    val textDataStream = env.socketTextStream("Desktop", 9999, '\n')

    // Split each line into words and pair every word with a count of 1
    val tupDataStream = textDataStream.flatMap(_.split(" ")).map(WordWithCount(_, 1))

    // Group by the "word" field; sliding window of size 5 s that slides every 1 s
    val windowDstram = tupDataStream.keyBy("word").timeWindow(Time.seconds(5), Time.seconds(1))

    // Sum the counts in each window and print to the TaskManager's stdout (the .out file)
    windowDstram.sum("count").print()

    // Start the job
    env.execute("Socket Window WordCount")
  }
}

####################################Appendix: troubleshooting######################################################
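If no output can be found after running the experiment, the cause is usually one of the following:
① A previous job that was never cancelled is interfering with the next one. Run flink list to find it, then remove it with flink cancel <jobID>.
② The YARN queue's resources are fully occupied (Flink on YARN mode).
③ The output is on one of the cluster nodes. Look for it in the YARN web UI, or under $FLINK_HOME/log on that node.
④ Do not delete flink-<username>-taskexecutor-2-Desktop.out under $FLINK_HOME/log. It is not recreated when a new job is submitted, only when the cluster is started (start-cluster.sh). If you delete it, you will not see any results until you restart the Flink cluster.
⑤ Port 6123 was still in use when the cluster was started, so after the restart the job could not be submitted: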
Caused by: java.net.BindException: Could not start actor system on any port in port range 6123
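For reason ⑤, you can check whether port 6123 (Flink's default jobmanager.rpc.port) is still held by an old process by trying to bind it yourself. Here is a minimal JVM sketch. It assumes the cluster runs on the local machine, and PortCheck and isPortFree are hypothetical names.

```scala
import java.io.IOException
import java.net.ServerSocket

// Hypothetical helper: returns true if a local TCP port can be bound right now,
// false if binding fails (typically because another process is listening on it).
object PortCheck {
  def isPortFree(port: Int): Boolean =
    try {
      val socket = new ServerSocket(port)
      socket.close()
      true
    } catch {
      case _: IOException => false // BindException is a subclass of IOException
    }

  def main(args: Array[String]): Unit = {
    val port = 6123
    if (isPortFree(port)) println(s"port $port is free")
    else println(s"port $port is already in use")
  }
}
```

If the port is reported as in use, do one of the following before running start-cluster.sh again:
- stop the old cluster with $FLINK_HOME/bin/stop-cluster.sh;
- or kill the leftover process.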
Note: before submitting a job, make sure that running grep -ri error in $FLINK_HOME/log reports no errors.
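The same check can be done on the JVM. This sketch is a rough equivalent of running grep -ri error in $FLINK_HOME/log: it walks the directory recursively and reports every line that contains "error" in any letter case. LogErrorScan and errorLines are hypothetical names. FLINK_HOME is read from the environment and falls back to the current directory.

```scala
import java.io.File
import java.nio.charset.CodingErrorAction
import java.util.Locale
import scala.io.{Codec, Source}

// Hypothetical rough equivalent of `grep -ri error $FLINK_HOME/log`:
// returns (file path, line) for every line containing "error", case-insensitively.
object LogErrorScan {
  // A fresh UTF-8 codec that replaces malformed bytes instead of throwing
  private val codec: Codec = Codec("UTF-8")
    .onMalformedInput(CodingErrorAction.REPLACE)
    .onUnmappableCharacter(CodingErrorAction.REPLACE)

  // All regular files under dir, recursively (empty if dir does not exist)
  private def allFiles(dir: File): Seq[File] =
    Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty).flatMap { f =>
      if (f.isDirectory) allFiles(f) else Seq(f)
    }

  def errorLines(dir: File): Seq[(String, String)] =
    allFiles(dir).flatMap { f =>
      val src = Source.fromFile(f)(codec)
      try src.getLines().filter(_.toLowerCase(Locale.ROOT).contains("error")).map(line => (f.getPath, line)).toList
      finally src.close()
    }

  def main(args: Array[String]): Unit = {
    val logDir = new File(sys.env.getOrElse("FLINK_HOME", "."), "log")
    val hits = errorLines(logDir)
    if (hits.isEmpty) println(s"no lines containing 'error' under $logDir")
    else hits.foreach { case (path, line) => println(s"$path: $line") }
  }
}
```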
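Summary
The above is the complete content of "Scala WordCount on Flink, and several reasons why Flink produces no output", collected and organized by 生活随笔. We hope it helps you solve the problems you have run into.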