Flink Streaming WordCount Code Example
Code
package com.zxl.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Flink streaming WordCount
 */
object FlinkStreamWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Flink streaming environment
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Change the parallelism
    streamEnv.setParallelism(1) // by default, all operators get parallelism 1

    // 2. Import the implicit conversions
    import org.apache.flink.streaming.api.scala._

    // 3. Read data from a socket stream
    // val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101", 8888) // DataStream ==> DStream in Spark
    // nc -lk 8888
    val stream: DataStream[String] = streamEnv.socketTextStream("localhost", 8888) // DataStream ==> DStream in Spark

    // 4. Transform and process the data
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
      .map((_, 1)).setParallelism(2)
      .keyBy(0) // keying operator: 0 or 1 is a tuple index; in DataStream[(String, Int)], 0 is the word and 1 is its count
      .sum(1).setParallelism(2) // aggregation (accumulation) operator

    // 5. Print the result
    result.print("result").setParallelism(1)

    // 6. Start the streaming job
    streamEnv.execute("wordcount")
  }
}
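The keyBy(0) call above keys the stream by tuple index; later Flink versions deprecated positional keyBy in favor of a key selector function. As a minimal sketch (not from the original article), step 4 of the program above could equivalently be written with a selector, reusing the stream value and implicit imports already defined there:

// Hedged alternative to step 4: key by a selector function instead of tuple index 0.
val result: DataStream[(String, Int)] = stream
  .flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1) // key by the word itself rather than by its position in the tuple
  .sum(1)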
Testing

Send data

Start a socket server on the source host with nc -lk 8888 (localhost in the code above) and type a few lines of space-separated words into it.
Console log output

result> (scala,1)
result> (hello,1)
result> (zxl,1)
result> (hello,2)
result> (hello,3)
result> (flink,1)
result> (hello,4)
result> (spark,1)
result> (filne,1)
result> (flink,2)

Because the stream is unbounded, sum(1) emits an updated running count for a word each time that word arrives, which is why hello shows up as 1, 2, 3 and finally 4.
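If netcat is not handy, the same pipeline can be sanity-checked with a bounded in-memory source. This is a minimal sketch, not part of the original article; the sample lines are made up and the job simply finishes once the bounded source is exhausted:

package com.zxl.flink

import org.apache.flink.streaming.api.scala._

object FlinkStreamWordCountLocalTest {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    // Bounded source with a few hypothetical test lines instead of a socket
    val stream: DataStream[String] = streamEnv.fromElements(
      "hello scala",
      "hello flink",
      "hello spark"
    )

    stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print("result")

    streamEnv.execute("wordcount-local-test")
  }
}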
Attached: pom.xml

Only the flink-scala_2.11 and flink-streaming-scala_2.11 dependencies are used by this example; the remaining connector, Hadoop, table and CEP modules are not required for it.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zxl</groupId>
    <artifactId>Flink-SXT</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- This plugin compiles the Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <!-- Bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
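The program above hardcodes localhost:8888; when the jar built by the assembly plugin is submitted to a cluster, the host and port are more convenient as program arguments. A minimal sketch (not from the original article) using Flink's ParameterTool; the --host/--port argument names and their defaults are assumptions:

package com.zxl.flink

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object FlinkStreamWordCountArgs {
  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: --host somehost --port 8888
    val params = ParameterTool.fromArgs(args)
    val host = params.get("host", "localhost")
    val port = params.getInt("port", 8888)

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setParallelism(1)

    streamEnv.socketTextStream(host, port)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
      .print("result")

    streamEnv.execute("wordcount-args")
  }
}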