1.10. Flink DataStream API (API abstraction levels, Data Sources, connectors, Source fault-tolerance guarantees, Sink fault-tolerance guarantees, custom sinks, partitioning, etc.)
1.10. Flink DataStream API
1.10.1. Flink API abstraction levels
1.10.2. Data Sources in detail
1.10.2.1. DataStream API: Data Sources
1.10.2.2. Data Sources API
1.10.2.3. DataStream built-in connectors
1.10.2.4. Source fault-tolerance guarantees
1.10.2.5. Sink fault-tolerance guarantees
1.10.2.6. Custom sinks
1.10.2.7. Table & SQL Connectors
1.10.2.8. Custom sources
1.10.2.9. DataStream API: Transformations in detail
1.10.2.10. DataStream API: partitioning
1.10.2.11. DataStream API: Data Sinks
1.10. Flink DataStream API
1.10.1. Flink API abstraction levels
The lowest-level abstraction in the Flink API is stateful real-time stream processing. It is exposed as the Process Function, which the framework integrates into the DataStream API. It lets users freely process events from one or more streams and provides state with global consistency and fault-tolerance guarantees. In addition, users can register event-time and processing-time callbacks at this level, which makes complex computations possible.

The second level of abstraction is the Core APIs. In practice, many applications do not need the lowest-level abstraction described above and can be written against the Core APIs instead: the DataStream API (for bounded/unbounded streams) and the DataSet API (for bounded data sets). These fluent APIs provide common building blocks for data processing, such as user-defined transformations, joins, aggregations, windows and state operations. The data types processed at this level have corresponding classes in each programming language.

The integration of the low-level Process Function with the DataStream API makes it possible to drop down to the lower-level abstraction only where a specific operation needs it. The DataSet API additionally offers primitives such as loops/iterations.
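To make the lowest layer concrete, here is a minimal sketch (not from the original text) of a KeyedProcessFunction that forwards each record and registers a processing-time callback; the class name, key type and messages are illustrative assumptions.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Illustrative only: forwards each record and fires a callback 5 seconds later (processing time). */
public class TimeoutProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        out.collect("seen: " + value.f0);
        // registering time callbacks is one of the capabilities exposed at this abstraction level
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("timer fired at " + timestamp);
    }
}
// usage (assumed): stream.keyBy(t -> t.f0).process(new TimeoutProcessFunction())
```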
The third level of abstraction is the Table API. The Table API is a declarative DSL centred on tables; in a streaming scenario a table represents data that is changing dynamically. The Table API follows the (extended) relational model: a table has a schema (similar to a schema in a relational database) and the API offers comparable operations such as select, project, join, group-by and aggregate. Table API programs declaratively define what logical operation should be performed rather than spelling out exactly how the code should execute. Although the Table API is concise and can be extended with various kinds of user-defined functions, it is less expressive than the Core APIs. In addition, Table API programs are optimised by rules in the optimiser before execution.

Tables and DataStream/DataSet can be converted back and forth seamlessly, and Flink allows Table API code and DataStream/DataSet API code to be mixed in the same program.

The topmost abstraction is SQL. This layer is similar to the Table API in semantics and expressiveness, but programs are written as SQL query expressions. The SQL abstraction is closely tied to the Table API, and SQL queries can be executed over tables defined with the Table API.
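As a rough illustration of the two top layers, the sketch below runs the same word count once through the Table API and once as a SQL query. It assumes the flink-table-api-java-bridge and a table planner dependency are on the classpath (they are not part of the pom.xml below), and the view name and data are made up.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableAndSqlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<String> words = env.fromElements("flink", "table", "flink");
        // register the stream as a view; the single column of an atomic type defaults to f0
        tableEnv.createTemporaryView("words", words);

        // the same logic at the Table API layer and at the SQL layer
        Table viaTableApi = tableEnv.from("words").groupBy($("f0")).select($("f0"), $("f0").count());
        Table viaSql = tableEnv.sqlQuery("SELECT f0, COUNT(f0) AS cnt FROM words GROUP BY f0");

        // a grouped aggregation on a stream produces updates, hence a retract stream
        tableEnv.toRetractStream(viaSql, Row.class).print();
        env.execute("TableAndSqlSketch");
    }
}
```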
The pom.xml used by the examples in this document is shown below:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>xxxxx.demo</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- maven properties -->
        <maven.test.skip>false</maven.test.skip>
        <maven.javadoc.skip>false</maven.javadoc.skip>
        <!-- compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <rocketmq.version>4.7.1</rocketmq.version>
        <flink.version>1.11.1</flink.version>
        <commons-lang.version>2.5</commons-lang.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <distributionManagement>
        <repository><id>releases</id><layout>default</layout><url>http://ip/nexus/content/repositories/releases/</url></repository>
        <snapshotRepository><id>snapshots</id><name>snapshots</name><url>http://ip/nexus/content/repositories/snapshots/</url></snapshotRepository>
    </distributionManagement>

    <repositories>
        <repository><id>releases</id><layout>default</layout><url>http://ip/nexus/content/repositories/releases/</url></repository>
        <repository>
            <id>snapshots</id><name>snapshots</name>
            <url>http://ip/nexus/content/repositories/snapshots/</url>
            <snapshots><enabled>true</enabled><updatePolicy>always</updatePolicy><checksumPolicy>warn</checksumPolicy></snapshots>
        </repository>
        <repository><id>tianque</id><name>tianque</name><url>http://ip/nexus/content/repositories/tianque/</url></repository>
        <repository><id>public</id><name>public</name><url>http://ip/nexus/content/groups/public/</url></repository>
        <!-- newly added -->
        <repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--
            Maven scopes:
            1. compile : the default scope; available at runtime and packaged into the jar.
            2. provided: available at compile time only; not packaged into the jar.
            3. runtime : not needed for compilation but needed at runtime; packaged into the jar (interface/implementation separation).
            4. test    : needed for tests only; not packaged into the jar.
            5. system  : a jar at a fixed path on the local system instead of a repository (rarely used).
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.tianque.doraemon</groupId>
            <artifactId>issue-business-api</artifactId>
            <version>1.0.6.RELEASE</version>
        </dependency>
        <!-- dependencies needed when programming in Scala: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- dependencies needed when programming in Scala: end -->
        <!-- kafka connector, scala 2.12 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>${rocketmq.version}</version>
            <exclusions>
                <exclusion><groupId>io.netty</groupId><artifactId>netty-tcnative</artifactId></exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>${commons-lang.version}</version>
        </dependency>
        <!-- test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.tianque</groupId>
            <artifactId>caterpillar-sdk</artifactId>
            <version>0.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                    <compilerVersion>${maven.compiler.source}</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration><skipTests>${maven.test.skip}</skipTests></configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration><excludes><exclude>README.md</exclude></excludes></configuration>
            </plugin>
            <plugin>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <version>2.17</version>
                <executions>
                    <execution>
                        <id>verify</id>
                        <phase>verify</phase>
                        <configuration>
                            <configLocation>style/rmq_checkstyle.xml</configLocation>
                            <encoding>UTF-8</encoding>
                            <consoleOutput>true</consoleOutput>
                            <failsOnError>true</failsOnError>
                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
                            <includeTestResources>false</includeTestResources>
                        </configuration>
                        <goals><goal>check</goal></goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <aggregate>true</aggregate>
                    <reportOutputDirectory>javadocs</reportOutputDirectory>
                    <locale>en</locale>
                </configuration>
            </plugin>
            <!-- scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals><goal>add-source</goal><goal>compile</goal></goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals><goal>add-source</goal><goal>testCompile</goal></goals>
                    </execution>
                </executions>
            </plugin>
            <!-- assembly plugin for building a fat jar (includes all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- optional entry class of the jar; change to match your project -->
                            <mainClass>xxxxx.SocketWindowWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals><goal>single</goal></goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

1.10.2. Data Sources in detail
1.10.2.1. DataStream API: Data Sources
A source is the data input of a program. You add a source to your program with StreamExecutionEnvironment.addSource(sourceFunction).

Flink ships with a large number of ready-made sources, and you can also implement your own:
- implement the SourceFunction interface to create a non-parallel source;
- or implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction, to create a parallel source.

The following code shows how to implement custom sources.
```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Custom source with parallelism 1 that emits an increasing number starting from 1.
 *
 * Note: SourceFunction and SourceContext both need an explicit data type; without it the job fails at runtime with:
 * Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
 * The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
 * Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
 *
 * Created by xxxx on 2020/10/09.
 */
public class MyNoParalleSource implements SourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method that starts the source.
     * In most cases run() contains a loop so that records are produced continuously.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // emit one record per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the source is cancelled.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

```java
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * Custom source that supports a parallelism greater than 1.
 * Created by xxxx on 2020/10/09.
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // emit one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * Custom source that supports a parallelism greater than 1.
 *
 * RichParallelSourceFunction additionally provides open() and close():
 * if the source needs external resources (e.g. connections), acquire them in open() and release them in close().
 *
 * Created by xxxx on 2020/10/09.
 */
public class MyRichParalleSource extends RichParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // emit one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Called exactly once before the source starts; acquire connections here.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open.............");
        super.open(parameters);
    }

    /**
     * Release connections here.
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
```

Using the custom sources defined above:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses the source with parallelism 1.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // add the source; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses the parallel source.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses the parallel source with open()/close() hooks.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyRichPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
```

The Scala implementations follow:
```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * Custom source with parallelism 1 that emits an increasing number starting from 1.
 * Created by xxxx on 2020/10/09.
 */
class MyNoParallelSourceScala extends SourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}
```

```scala
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * Custom source that supports a parallelism greater than 1.
 * Created by xxxx on 2020/10/09.
 */
class MyParallelSourceScala extends ParallelSourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}
```

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * Custom source that supports a parallelism greater than 1, with open()/close() hooks.
 * Created by xxxx on 2020/10/09.
 */
class MyRichParallelSourceScala extends RichParallelSourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }

  override def open(parameters: Configuration): Unit = super.open(parameters)

  override def close(): Unit = super.close()
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoWithMyNoParallelSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)
    val mapData = text.map(line => {
      println("received: " + line)
      line
    })
    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)
    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyNoParallelSourceScala")
  }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoWithMyParallelSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyParallelSourceScala).setParallelism(2)
    val mapData = text.map(line => {
      println("received: " + line)
      line
    })
    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)
    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyParallelSourceScala")
  }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoWithMyRichParallelSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyRichParallelSourceScala).setParallelism(2)
    val mapData = text.map(line => {
      println("received: " + line)
      line
    })
    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)
    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyRichParallelSourceScala")
  }
}
```

1.10.2.2. Data Sources API
File-based:
- readTextFile(path): reads a text file line by line following the TextInputFormat rules and returns each line as a record.

Socket-based:
- socketTextStream(): reads data from a socket; elements can be split by a delimiter.

Collection-based:
- fromCollection(Collection): creates a data stream from a Java collection; all elements in the collection must be of the same type.

Custom input:
- addSource lets you read data from third-party systems;
- Flink ships with a set of built-in connectors that provide the corresponding sources (e.g. Kafka).
Collection-based example with fromCollection(Collection):
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName StreamingFromCollection
 * @date 2020/9/16 13:49
 **/
public class StreamingFromCollection {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        // use the collection as the data source
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        // transform the data with map
        // DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
        //     @Override
        //     public Integer map(Integer value) throws Exception {
        //         return value + 1;
        //     }
        // });

        // transform the data with map:
        // the String in DataStream<String> is the final output type,
        // the String in MapFunction<Integer, String> must match that output type,
        // and the String in "public String map(Integer value)" is the return type of the function
        DataStream<String> num = collectionData.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer value) throws Exception {
                return value + 1 + "_suffix";
            }
        });

        // print directly
        num.print().setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}
```

1.10.2.3. DataStream built-in connectors
Some fairly basic sources and sinks are built into Flink. The predefined data sources read from files, directories, sockets, collections and iterators. The predefined data sinks write to files, standard output (stdout), standard error (stderr) and sockets. The bundled connectors are:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
- JDBC (sink)
When using one of these connectors you usually need an additional third-party component, e.g. a data store or a message queue. Note that while these connectors are part of the Flink project and are included in the source release, they are not part of the binary distribution.
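For example, the Kafka connector pulled in by the pom.xml above can be used roughly as follows; this is a sketch, and the broker address, consumer group and topic name are assumptions.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop100:9092"); // assumed broker address
        props.setProperty("group.id", "flink-demo-group");        // assumed consumer group

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo_topic", new SimpleStringSchema(), props); // assumed topic
        consumer.setStartFromGroupOffsets(); // default behaviour, shown for clarity

        DataStream<String> stream = env.addSource(consumer);
        stream.print();
        env.execute("KafkaSourceSketch");
    }
}
```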
1.10.2.4. Source fault-tolerance guarantees
| Source | Guarantee | Notes |
| --- | --- | --- |
| Apache Kafka | exactly once | use the Kafka connector appropriate for your version |
| AWS Kinesis Streams | exactly once | |
| RabbitMQ | at most once (v 0.10) / exactly once (v 1.0) | |
| Twitter Streaming API | at most once | |
| Google PubSub | at least once | |
| Collections | exactly once | |
| Files | exactly once | |
| Sockets | at most once | |
To guarantee end-to-end exactly-once delivery (on top of exactly-once state semantics), the sink must take part in the checkpointing mechanism. The following table lists the delivery guarantees of Flink's bundled sinks, assuming exactly-once state updates (a checkpointing sketch follows the table).
1.10.2.5. Sink fault-tolerance guarantees
| Sink | Guarantee | Notes |
| --- | --- | --- |
| HDFS BucketingSink | exactly once | implementation depends on the Hadoop version |
| Elasticsearch | at least once | |
| Kafka producer | at least once / exactly once | exactly once with transactional producers (v 0.11+) |
| Cassandra sink | at least once / exactly once | |
| AWS Kinesis Streams | at least once | |
| File sinks | exactly once | |
| Socket sinks | at least once | |
| Standard output | at least once | |
| Redis sink | at least once | |
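The exactly-once rows above only hold when checkpointing is enabled. A minimal sketch of turning it on; the interval and timeout values are arbitrary example numbers, not recommendations from the original text.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 5 seconds with exactly-once semantics; transactional sinks
        // (e.g. the Kafka producer) need this to provide end-to-end exactly-once delivery
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        env.fromElements(1L, 2L, 3L).print();
        env.execute("CheckpointingSketch");
    }
}
```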
1.10.2.6. Custom sinks
To implement a custom sink:
- implement the SinkFunction interface,
- or extend RichSinkFunction (see the sketch below).
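A minimal sketch of the second option; the class name and the printed message are illustrative, and a real sink would write to an external system from invoke() using a connection managed in open()/close().

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Illustrative custom sink: prints each record; open()/close() would hold a real connection. */
public class MyPrintSink extends RichSinkFunction<Long> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // e.g. create a database or HTTP connection here
    }

    @Override
    public void invoke(Long value, Context context) throws Exception {
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the connection here
    }
}
// usage (assumed): stream.addSink(new MyPrintSink());
```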
1.10.2.7.Table & SQL Connectors
- Formats
- Kafka
- JDBC
- Elasticsearch
- FileSystem
- HBase
- DataGen
- BlackHole
1.10.2.8. Custom sources
To implement a custom source with parallelism 1:
- implement SourceFunction;
- fault-tolerance guarantees usually do not need to be implemented;
- handle the cancel() method properly (it is called when the job is cancelled).

To implement a parallel custom source:
- implement ParallelSourceFunction,
- or extend RichParallelSourceFunction.

SourceFunctions that extend RichParallelSourceFunction run in parallel and may hold resources that need to be opened and closed, which is what the open()/close() methods are for.
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * broadcast partitioning.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyNoPralalleSourceBroadcast {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // add the source; note: for this source the parallelism can only be 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("thread id: " + id + ", received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
        env.execute(jobName);
    }
}
```

1.10.2.9. DataStream API: Transformations in detail
- map: takes one element and returns one element; typically used for cleaning and transforming data.
- flatMap: takes one element and returns zero, one or more elements.
- filter: evaluates a predicate on each element; elements that satisfy the condition are kept.
- keyBy: groups the stream by the given key; records with the same key go to the same partition (typical usages are shown below).
- reduce: aggregates the data by combining the current element with the previously reduced value and returning a new value.
- aggregations: sum(), min(), max(), etc.
- window: covered separately later.
- union: merges multiple streams into one stream containing all their elements; the restriction is that all merged streams must have the same type.
- connect: similar to union, but only connects two streams; the two streams may have different types, and different processing functions can be applied to each of them.
- CoMap, CoFlatMap: the map/flatMap counterparts used on ConnectedStreams.
- split: splits one stream into several streams according to some rule.
- select: used together with split to pick one of the split streams.
Two typical keyBy usages (a complete keyBy/reduce example follows the note below):

dataStream.keyBy("someKey")  // use the "someKey" field of the objects as the grouping key
dataStream.keyBy(0)          // use the first element of the tuple as the grouping key

Note: the following types cannot be used as keys:
- 1: a POJO that does not override hashCode() and relies on Object.hashCode();
- 2: an array of any type;
- 3: basic data types such as int or long.
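Here is a small self-contained sketch (not from the original text) combining keyBy on a tuple field with reduce; the class name and sample data are made up.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByReduceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> words = env.fromElements(
                Tuple2.of("flink", 1), Tuple2.of("spark", 1), Tuple2.of("flink", 1));

        // group by the first tuple field, then fold the counts with reduce
        DataStream<Tuple2<String, Integer>> counts = words
                .keyBy(0)
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));

        counts.print().setParallelism(1);
        env.execute("KeyByReduceSketch");
    }
}
```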
1.10.2.10. DataStream API: partitioning
Random partitioning:
- dataStream.shuffle()

Rebalancing (round-robin repartitioning that removes data skew):
- dataStream.rebalance()

Rescaling (explained below):
- dataStream.rescale()

Custom partitioning:
- implement the Partitioner interface, then
- dataStream.partitionCustom(partitioner, "someKey")
- or dataStream.partitionCustom(partitioner, 0)

Broadcasting: covered separately later.
What rescaling means, by example:

If the upstream operator has parallelism 2 and the downstream operator has parallelism 4, one upstream subtask distributes its output to two of the downstream subtasks and the other upstream subtask distributes to the other two. Conversely, if the upstream has parallelism 4 and the downstream has parallelism 2, two upstream subtasks send to one downstream subtask and the other two upstream subtasks send to the other.

Difference between rescaling and rebalancing: rebalancing redistributes data across all downstream subtasks, whereas rescaling only redistributes within a local group of subtasks (see the sketch below).
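A short sketch of how these repartitioning calls are applied, reusing the MyNoParalleSource from earlier; the commented-out lines show the alternatives.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> src = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // pick one of the built-in repartitioning strategies before the downstream operator
        src.rebalance().print().setParallelism(4);   // full round-robin redistribution across all 4 subtasks
        // src.rescale().print().setParallelism(4);  // round-robin only within a local group of subtasks
        // src.shuffle().print().setParallelism(4);  // random partitioning

        env.execute("PartitioningSketch");
    }
}
```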
Custom partitioning example:
```java
import org.apache.flink.api.common.functions.Partitioner;

/**
 * Created by xxxx on 2020/10/09.
 */
public class MyPartition implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartitions) {
        System.out.println("total number of partitions: " + numPartitions);
        if (key % 2 == 0) {
            return 0;
        } else {
            return 1;
        }
    }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * Uses the custom partitioner: partitions numbers by whether they are odd or even.
 * Created by xxxx on 2020/10/09.
 */
public class SteamingDemoWithMyParitition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());

        // convert the Long values into Tuple1 so that a tuple field can be used as the partition key
        DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });

        // data after repartitioning
        DataStream<Tuple1<Long>> partitionData = tupleData.partitionCustom(new MyPartition(), 0);

        DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
            @Override
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("current thread id: " + Thread.currentThread().getId() + ", value: " + value);
                return value.getField(0);
            }
        });

        result.print().setParallelism(1);
        env.execute("SteamingDemoWithMyParitition");
    }
}
```

Scala example:
```scala
import org.apache.flink.api.common.functions.Partitioner

/**
 * Created by xxxx on 2020/10/09.
 */
class MyPartitionerScala extends Partitioner[Long] {
  override def partition(key: Long, numPartitions: Int) = {
    println("total number of partitions: " + numPartitions)
    if (key % 2 == 0) {
      0
    } else {
      1
    }
  }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import xxx.xxx.streaming.custormSource.MyNoParallelSourceScala

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoMyPartitionerScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    // wrap each Long value in a Tuple1 (note how Tuple1 is created)
    val tupleData = text.map(line => {
      Tuple1(line)
    })

    val partitionData = tupleData.partitionCustom(new MyPartitionerScala, 0)

    val result = partitionData.map(line => {
      println("current thread id: " + Thread.currentThread().getId + ", value: " + line)
      line._1
    })

    result.print().setParallelism(1)
    env.execute("StreamingDemoMyPartitionerScala")
  }
}
```

1.10.2.11. DataStream API: Data Sinks
- writeAsText(): writes elements line by line as strings obtained by calling toString() on each element.
- print() / printToErr(): prints the toString() value of each element to standard output or standard error.
- custom output with addSink (e.g. Kafka, Redis).

A Redis sink example:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Receives socket data and stores it in a Redis list:
 * lpush list_key value
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");

        // lpush l_words word
        // wrap the incoming String into Tuple2<String, String> (key, value)
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        // create the Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();

        // create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());

        l_wordsData.addSink(redisSink);

        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // extract the Redis key from the incoming record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // extract the Redis value from the incoming record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDataToRedisScala {
  def main(args: Array[String]): Unit = {
    // socket port
    val port = 9000
    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // connect to the socket to read the input data
    val text = env.socketTextStream("hadoop100", port, '\n')

    // note: this implicit import is required, otherwise the map call below will not compile
    import org.apache.flink.api.scala._

    val l_wordsData = text.map(line => ("l_words_scala", line))

    val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build()

    val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)

    l_wordsData.addSink(redisSink)

    // run the job
    env.execute("Socket window count")
  }

  class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
    override def getKeyFromData(data: (String, String)) = {
      data._1
    }

    override def getValueFromData(data: (String, String)) = {
      data._2
    }

    override def getCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }
  }
}
```