Apache Flink 零基础入门(五)Flink开发实时处理应用程序
使用Flink + java實(shí)現(xiàn)需求
環(huán)境
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
使用上一節(jié)中的springboot-flink-train項(xiàng)目
開(kāi)發(fā)步驟
第一步:創(chuàng)建流處理上下文環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();第二步:讀取數(shù)據(jù),使用socket流方式讀取數(shù)據(jù)
DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);第三步:transform
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();這里我們使用逗號(hào)分隔,然后跟批處理不同的是,這里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久執(zhí)行一次。
第四步:執(zhí)行
env.execute("StreamingWCJavaApp");整體代碼如下:
/*** 使用Java API來(lái)開(kāi)發(fā)Flink的實(shí)時(shí)處理應(yīng)用程序* wc統(tǒng)計(jì)的數(shù)據(jù)源自socket*/ public class StreamingWCJava02App {public static void main(String[] args) throws Exception {// 獲取參數(shù)int port;try{ParameterTool tool = ParameterTool.fromArgs(args);port = tool.getInt("port");} catch (Exception e) {System.out.println("端口未設(shè)置, 使用默認(rèn)端口9999");port = 9999;}// step1: 獲取流處理上下文環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// step2: 讀取數(shù)據(jù)DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port);// step3: transformtext.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();env.execute("StreamingWCJavaApp");}}運(yùn)行
首先在192.168.152.45上運(yùn)行命令
nc -l 9999然后在運(yùn)行main方法。在192.168.152.45的nc上輸入
abc,def,abc,ddd在idea控制臺(tái)輸出如下:
4> (abc,2) 1> (def,1) 4> (ddd,1)這個(gè)前面的"4>"表示并行度。我們可以設(shè)置setParallelism(1)來(lái)忽略這個(gè)問(wèn)題。如下所示:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] tokens = value.toLowerCase().split(",");for(String token: tokens) {if(token.length() > 0) {out.collect(new Tuple2<String, Integer>(token, 1));}}}}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);這樣控制臺(tái)的打印結(jié)果如下:
(abc,2) (ddd,1) (def,1)這樣一個(gè)簡(jiǎn)單的demo就成功了!
重構(gòu)代碼
上面的代碼中l(wèi)ocalhost與port需要用參數(shù)傳遞進(jìn)來(lái)。
代碼如下:
// 獲取參數(shù)int port;try{ParameterTool tool = ParameterTool.fromArgs(args);port = tool.getInt("port");} catch (Exception e) {System.out.println("端口未設(shè)置, 使用默認(rèn)端口9999");port = 9999;}使用Flink提供的ParameterTool來(lái)接收參數(shù)。
我們?cè)谶\(yùn)行時(shí)就可以指定參數(shù)列表了,其中的key必須以“-”或者“--”開(kāi)頭。
在運(yùn)行時(shí),配置參數(shù):
這樣運(yùn)行就可以從外界傳遞參數(shù)了
使用Flink + Scala實(shí)現(xiàn)需求
接下來(lái)使用Scala方式實(shí)現(xiàn),在項(xiàng)目springboot-flink-train-scala中新建StreamingWCScalaApp,內(nèi)容如下:
/*** 使用Scala開(kāi)發(fā)Flink的實(shí)時(shí)處理應(yīng)用程序*/ object StreamingWCScalaApp {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment// 引入隱式轉(zhuǎn)換import org.apache.flink.api.scala._val text = env.socketTextStream("192.168.152.45", 9999)text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1)env.execute("StreamingWCScalaApp");} }這種方式比java實(shí)現(xiàn)更加簡(jiǎn)潔。
與50位技術(shù)專(zhuān)家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的Apache Flink 零基础入门(五)Flink开发实时处理应用程序的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Apache Flink 零基础入门(四
- 下一篇: Apache Flink 零基础入门(六