Flink word-count program code (batch and real-time)
Batch processing
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Needed to run jobs locally from the IDE -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>
</project>

BatchWordCount program code
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file line by line (each element is one line of text)
        DataSource<String> lineDS = env.readTextFile("input/words.txt");

        // 3. Transform each line into (word, 1) tuples
        FilterOperator<Tuple2<String, Long>> wordAndOne = lineDS
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    // Strip commas, double quotes, periods and hyphens, then split on single spaces
                    String[] words = line.replace(",", "").replace("\"", "")
                            .replace(".", "").replace("-", "").split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // When a lambda uses Java generics, type erasure removes the type information,
                // so the result type must be declared explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // Drop empty tokens (e.g. "--" becomes "" once hyphens are removed)
                .filter(new FilterFunction<Tuple2<String, Long>>() {
                    @Override
                    public boolean filter(Tuple2<String, Long> wordCount) throws Exception {
                        return !wordCount.f0.isEmpty();
                    }
                });

        // 4. Group by word (field 0)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. Sum the counts (field 1) within each group
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. Print the result (in the DataSet API, print() also triggers execution)
        sum.print();
    }
}

Add a file named log4j.properties under src/main/resources with the following configuration:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

Create an input folder in the project root and, inside it, a text file named words.txt with the following content:
For a long time, the US Central Intelligence Agency (CIA) has plotted "peaceful evolution" and "color revolutions" as well as spying activities around the world. Although details about these operations have always been murky, a new report released by China's National Computer Virus Emergency Response Center and Chinese cybersecurity company 360 on Thursday unveiled the main technical means the CIA has used to scheme and promote unrest around the world. According to the report, since the beginning of the 21st century, the rapid development of the internet offered "new opportunity" for CIA's infiltration activities in other countries and regions. Any institutions or individuals from anywhere in the world that use US digital equipment or software could be turned into the CIA's "puppet agent." For decades, the CIA has overthrown or attempted to overthrow at least 50 legitimate governments abroad (the CIA has only recognized seven of these instances), causing turmoil in related countries. Whether it is the "color revolution" in Ukraine in 2014, the "sunflower revolution" in Taiwan island, China, or the "saffron revolution" in Myanmar in 2007, the "green revolution" in Iran in 2009, and other attempted "color revolutions" -- the US intelligence agencies are behind them all, according to the report. The US' leading position in technologies of telecommunication and on-site command has provided unprecedented possibilities for the US intelligence community to launch "color revolutions" abroad. The report released by the National Computer Virus Emergency Response Center and 360 disclosed five methods commonly used by the CIA. The first is to provide encrypted network communication services. In order to help protesters in some countries in the Middle East keep in touch and avoid being tracked and arrested, an American company, which, reportedly, has a US military background, developed TOR technology that can stealthily access the internet -- the Onion Router technology. 
The servers encrypt all information that flows through them to help certain users to surf the web anonymously. After the project was launched by American companies, it was immediately provided free of charge to anti-government elements in Iran, Tunisia, Egypt and other countries and regions to ensure that those "young dissidents who want to shake their own government's rule" can avoid the scrutiny of the government, according to the report. The second method is to provide offline communication services. For example, in order to ensure that anti-government personnel in Tunisia, Egypt and other countries can still keep in touch with the outside world when the internet is disconnected, Google and Twitter quickly launched a special service called "Speak2Tweet," which allows users to dial and upload voice notes for free. These messages are automatically converted into tweets and then uploaded to the internet, and publicly released through Twitter and other platforms to complete the "real-time reporting" of the event on site, said the report.
Execution result
(China,1) (TOR,1) (Tunisia,2) (allows,1) (an,1) (encrypt,1) (plotted,1) (promote,1) (provide,2) (since,1) (technology,2) (unrest,1) (users,2) (50,1) (According,1) (American,2) (a,4) (been,1) (encrypted,1) (first,1) (five,1) (flows,1) (free,2) (in,18) (launch,1) (network,1) (or,4) (outside,1) (reportedly,1) (special,1) (unveiled,1) (360,2) (US',1) (Whether,1) (automatically,1) (century,1) (development,1) (elements,1) (government,1) (green,1) (means,1) (on,2) (own,1) (position,1) (spying,1) (the,36) (world,4) (young,1) (Computer,2) (For,3) (according,2) (agent,1) (attempted,2) (internet,4) (rapid,1) (said,1) (site,1) (when,1) (which,2) (Myanmar,1) (Taiwan,1) (Virus,2) (and,17) (avoid,2) (background,1) (be,1) (community,1) (into,2) (of,7) (touch,2) (tracked,1) (unprecedented,1) (use,1) (After,1) (Egypt,2) (agencies,1) (anywhere,1) (charge,1) (company,2) (order,2) (platforms,1) (puppet,1) (revolutions,3) (still,1) ((CIA),1) ((the,1) (2007,1) (US,5) (being,1) (commonly,1) (ensure,2) (governments,1) (has,6) (island,1) (new,2) (offline,1) (peaceful,1) (publicly,1) (regions,2) (technologies,1) (through,2) (turned,1) (21st,1) (In,1) (The,5) (all,2) (by,4) (companies,1) (developed,1) (digital,1) (evolution,1) (from,1) (help,2) (individuals,1) (information,1) (it,2) (then,1) (Although,1) (CIA,4) (access,1) (around,2) (arrested,1) (called,1) (causing,1) (certain,1) (is,4) (long,1) (method,1) (opportunity,1) (recognized,1) (services,2) (surf,1) (these,2) (upload,1) (voice,1) (with,1) (Twitter,2) (Ukraine,1) (about,1) (abroad,2) (communication,2) (complete,1) (converted,1) (decades,1) (details,1) (dissidents,1) (equipment,1) (messages,1) (methods,1) (offered,1)
(other,5) (personnel,1) (project,1) (protesters,1) (related,1) (second,1) (servers,1) (shake,1) (some,1) (want,1) (Central,1) (Chinese,1) (Google,1) (National,2) (Router,1) (activities,2) (could,1) (notes,1) (report,6) (service,1) (seven,1) (software,1) (those,1) (turmoil,1) (tweets,1) (well,1) (2009,1) (2014,1) (CIA's,2) (Center,2) (Onion,1) (beginning,1) (behind,1) (command,1) (disconnected,1) (for,3) (murky,1) (only,1) (onsite,1) (overthrow,1) (released,3) (to,18) (used,2) (Middle,1) (anonymously,1) (disclosed,1) (event,1) (government's,1) (immediately,1) (keep,2) (overthrown,1) (quickly,1) (uploaded,1) (web,1) (China's,1) (Response,2) (antigovernment,2) (are,2) (as,2) (color,4) (dial,1) (instances),1) (intelligence,2) (launched,2) (least,1) (legitimate,1) (realtime,1) (scheme,1) (stealthily,1) (technical,1) (time,1) (who,1) (Agency,1) (Emergency,2) (Intelligence,1) (at,1) (can,3) (countries,5) (example,1) (infiltration,1) (institutions,1) (leading,1) (military,1) (provided,2) (reporting,1) (rule,1) (sunflower,1) (telecommunication,1) (them,2) (Any,1) (East,1) (Iran,2) (Speak2Tweet,1) (These,1) (Thursday,1) (always,1) (cybersecurity,1) (have,1) (main,1) (operations,1) (possibilities,1) (revolution,4) (saffron,1) (scrutiny,1) (that,5) (their,1) (was,2)
Process finished with exit code 0

Real-time (streaming) code
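import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class BoundedStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file (a file source produces a bounded stream)
        DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");

        // 3. Split each line into words and map every word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                // Type erasure: declare the lambda's output type explicitly
                .returns(Types.STRING)
                // Skip empty tokens (e.g. from a leading space), as the batch version does
                .filter(word -> !word.isEmpty())
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Key the stream by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);

        // 5. Rolling sum of the counts (field 1) per word
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);

        // 6. Print
        result.print();

        // 7. Unlike the DataSet API, a streaming job only runs once execute() is called
        env.execute();
    }
}

Note that this version splits lines on spaces only and does not strip punctuation the way the batch version does, so its counts differ from the batch output above.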
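Unlike the batch job, which prints one final count per word, `KeyedStream.sum(1)` is a rolling aggregation: for every incoming tuple it updates the per-key total and emits it, so the console shows intermediate results such as (the,1), (the,2) and so on, typically prefixed with a subtask index such as `3>` when parallelism is greater than 1. Also, because `readTextFile` yields a bounded stream, the job above ends once the file is consumed; for a genuinely unbounded input one could swap in a source such as `env.socketTextStream("localhost", 7777)` (host and port are assumptions for illustration). The sketch below is plain Java with no Flink dependency; the class `RollingSumSimulation` is hypothetical and only imitates what a keyed rolling sum emits, using a `HashMap` in place of Flink's keyed state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java imitation of keyBy(t -> t.f0).sum(1); not Flink code. */
class RollingSumSimulation {

    // Emits "(word,count)" after every element, like a keyed rolling aggregation.
    static List<String> rollingCounts(List<String> words) {
        Map<String, Long> state = new HashMap<>();   // stands in for Flink's per-key state
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            long updated = state.merge(word, 1L, Long::sum);   // update this key's running total
            emitted.add("(" + word + "," + updated + ")");     // emit the new total immediately
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "flink", "hello", "world", "hello");
        // Prints (hello,1) (flink,1) (hello,2) (world,1) (hello,3), one per line
        rollingCounts(words).forEach(System.out::println);
    }
}
```

In a real Flink run with parallelism greater than 1, the relative order of updates for different words can vary, but each word's own totals still increase by one per occurrence.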
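The batch output above exposes the limits of the simple `replace(...).split(" ")` tokenization: parentheses survive (`((CIA),1)`, `((the,1)`, `(instances),1)`), counting is case-sensitive (`(the,36)` and `(The,5)` are separate entries), and removing hyphens merges words (`antigovernment`, `onsite`). The streaming version, which splits on spaces only, keeps all punctuation. The sketch below is a hypothetical helper, not the author's code: `batchTokens` mirrors the batch job's rule, and `normalizedTokens` is one possible alternative that lower-cases the line and splits on any run of characters other than letters, digits and apostrophes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper comparing the batch job's tokenization with a normalizing alternative. */
class TokenizerComparison {

    // Mirrors BatchWordCount: strip , " . - then split on single spaces and drop empty tokens.
    static List<String> batchTokens(String line) {
        String cleaned = line.replace(",", "").replace("\"", "")
                .replace(".", "").replace("-", "");
        return nonEmpty(cleaned.split(" "));
    }

    // Alternative: lower-case, then split on runs of characters that are not
    // letters, digits or apostrophes (so "CIA's" stays one token).
    static List<String> normalizedTokens(String line) {
        return nonEmpty(line.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}']+"));
    }

    // A leading separator or consecutive spaces can produce empty strings; drop them.
    private static List<String> nonEmpty(String[] tokens) {
        List<String> result = new ArrayList<>();
        for (String token : tokens) {
            if (!token.isEmpty()) {
                result.add(token);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String line = "The CIA (the agency) used anti-government, on-site tools.";
        System.out.println(batchTokens(line));
        // [The, CIA, (the, agency), used, antigovernment, onsite, tools]
        System.out.println(normalizedTokens(line));
        // [the, cia, the, agency, used, anti, government, on, site, tools]
    }
}
```

Switching rules changes the counts (for example, "anti-government" becomes two words instead of "antigovernment"), so which one is appropriate depends on the analysis. Either method could be called inside the `flatMap` lambda in place of the inline `replace`/`split` chain.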