Running the Flink WordCount experiment in IntelliJ - Java version
Note: running WordCount from IntelliJ is not one of Flink's cluster deployment modes.
The job is launched as an ordinary Java application, so no Flink cluster needs to be started.
######################## Project structure ########################
├── pom.xml
└── src
    └── main
        ├── java
        │   └── WordCount.java
        └── resources
######################## Experiment steps ########################
① Create a project with the structure above and import it into IntelliJ. Whenever pom.xml changes, IntelliJ downloads the dependencies automatically; the first download takes a long time, so be patient.
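② Run nc -lk 9999
then type
hello hello world world world (you must press Enter: the socket source splits its input on "\n", so Flink only starts counting word frequencies once the line is terminated)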
③ Press Alt+Shift+F10 and select WordCount to run it.
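Note: steps ② and ③ must not be swapped, otherwise the job may fail with an error (how often this happens is not certain). The socket source in WordCount.java connects to the port as soon as the job starts, so nc must already be listening.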
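The ordering matters because the Flink job is the TCP client and nc is the server. The following is a minimal stdlib-only sketch of a hypothetical helper (the class name `SocketReadyCheck`, the default host `localhost`, and the 1000 ms timeout are assumptions, not part of Flink) that tries a plain TCP connect to tell whether something is already listening before you start step ③.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink: checks whether a server such as
// `nc -lk 9999` is already listening before the WordCount job is started.
class SocketReadyCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;  // a server accepted the connection
        } catch (IOException e) {
            return false; // e.g. "Connection refused": nothing is listening yet
        }
    }

    public static void main(String[] args) {
        // Assumption: nc runs on the local machine unless a host is passed in.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;
        if (isListening(host, port, 1000)) {
            System.out.println(host + ":" + port + " is listening; start WordCount now.");
        } else {
            System.out.println("Nothing listens on " + host + ":" + port
                    + "; run `nc -lk " + port + "` first.");
        }
    }
}
```

Run the check before typing the input line, since nc may hand already-typed input to whichever client connects first, and the checker closes its connection immediately.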
Result: (the original screenshot is not preserved)
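Since the screenshot is missing, here is a plain-Java simulation (no Flink dependency) of what `keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).sum("count")` computes for a single typed line. It is a sketch under stated assumptions: Flink 1.10's default processing-time windows with zero offset, and a hypothetical arrival time of 5300 ms; the class `SlidingWindowSim` is illustrative only and is not the author's captured output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative simulation of a sliding window (size 2 s, slide 1 s) with a
// per-word sum, mimicking the window logic in WordCount.java without Flink.
class SlidingWindowSim {

    static final long SIZE_MS = 2000;  // Time.seconds(2): window size
    static final long SLIDE_MS = 1000; // Time.seconds(1): slide interval

    /** Start times of every window that contains timestamp t (assumes t >= 0). */
    static List<Long> windowStarts(long t) {
        List<Long> starts = new ArrayList<>();
        long lastStart = t - (t % SLIDE_MS); // latest window start <= t
        for (long start = lastStart; start > t - SIZE_MS; start -= SLIDE_MS) {
            starts.add(start);
        }
        return starts;
    }

    /** Maps "[start,end)" to (word -> count) for one line arriving at time t. */
    static Map<String, Map<String, Long>> countLine(String line, long t) {
        Map<String, Map<String, Long>> result = new TreeMap<>(); // keys sorted as strings
        for (long start : windowStarts(t)) {
            String window = "[" + start + "," + (start + SIZE_MS) + ")";
            Map<String, Long> counts = result.computeIfAbsent(window, k -> new TreeMap<>());
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum); // like summing WordWithCount(word, 1)
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {[4000,6000)={hello=2, world=3}, [5000,7000)={hello=2, world=3}}
        System.out.println(countLine("hello hello world world world", 5300));
    }
}
```

Because the window is twice as long as the slide, every record falls into two overlapping windows, so under these assumptions the console would show `hello` with count 2 and `world` with count 3 once per window, i.e. twice each.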
######################## Appendix ########################
WordCount.java (change the host to your own IP; the default here is "Desktop")
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Port of the socket to read from
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No port argument given, using default 9999");
            port = 9999;
        }

        // Get the execution environment (a local one when started from IntelliJ)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Connect to the socket and read the input, one record per "\n"-terminated line
        DataStreamSource<String> text = env.socketTextStream("Desktop", port, "\n");

        // Compute the word counts
        DataStream<WordWithCount> windowCount = text
                // flatMap: split each line into words and emit <word, count> records
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                        // "\\s+" treats runs of whitespace as one separator; skip empty tokens
                        for (String word : value.split("\\s+")) {
                            if (!word.isEmpty()) {
                                out.collect(new WordWithCount(word, 1L));
                            }
                        }
                    }
                })
                // Group records that have the same word
                .keyBy("word")
                // Window size 2 s, sliding every 1 s
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");

        // Print the results to the console using a parallelism of 1
        windowCount.print().setParallelism(1);

        // Flink executes lazily: nothing above runs until execute() is called
        env.execute("streaming word count");
    }

    /**
     * Stores a word and the number of times it occurred.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiao</groupId>
    <artifactId>bbb</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Use one Flink version and one Scala suffix for every Flink artifact -->
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>