Datastream 开发打包问题
簡介:Datastream作業(yè)開發(fā)時往往會遇到一些jar包沖突等問題,本文主要講解作業(yè)開發(fā)時需要引入哪些依賴以及哪些需要被打包進(jìn)作業(yè)的jar中,從而避免不必要的依賴被打入了作業(yè)jar中以及可能產(chǎn)生的依賴沖突。
Datastream作業(yè)開發(fā)時往往會遇到一些jar包沖突等問題,本文主要講解作業(yè)開發(fā)時需要引入哪些依賴以及哪些需要被打包進(jìn)作業(yè)的jar中,從而避免不必要的依賴被打入了作業(yè)jar中以及可能產(chǎn)生的依賴沖突。
一個Datastream作業(yè)主要涉及下述依賴:
Flink的核心依賴以及應(yīng)用程序自身的依賴
每一個Flink應(yīng)用程序都依賴于一系列相關(guān)的庫,其中至少應(yīng)該包括Flink的API. 許多應(yīng)用程序還依賴于連接器相關(guān)的庫(如 Kafka, Cassandra等).在運(yùn)行Flink應(yīng)用程序時,無論是在運(yùn)行在分布式的環(huán)境下還是在本地IDE進(jìn)行測試,Flink的運(yùn)行時相關(guān)依賴都是必須的。
與大多數(shù)運(yùn)行用戶自定義應(yīng)用程序的系統(tǒng)一樣,Flink 中有兩大類依賴項(xiàng):
- Flink核心依賴:Flink 本身由一組運(yùn)行系統(tǒng)所必需的類和依賴項(xiàng)組成,例如協(xié)調(diào)器、網(wǎng)絡(luò)、檢查點(diǎn)、容錯、API、算子(例如窗口)、資源管理等。 所有這些類和依賴項(xiàng)的集合構(gòu)成了 Flink 運(yùn)行時的核心,在 Flink 應(yīng)用程序啟動時必須存在。這些核心類和依賴項(xiàng)都被打包在 flink-dist jar 中。 它們是 Flink 的 lib 文件夾的一部分,也是Flink基礎(chǔ)容器鏡像的一部分。這些依賴之于Flink就像Java 運(yùn)行所需的包含 String 和 List 等類的核心庫(rt.jar、charsets.jar 等)之于Java。Flink的核心依賴不包含任何連接器或擴(kuò)展庫(CEP、SQL、ML等),這使得Flink的核心依賴盡可能小,以避免默認(rèn)情況下類路徑中有過多的依賴項(xiàng),同時減少依賴沖突。
- 用戶應(yīng)用程序依賴項(xiàng):指特定用戶應(yīng)用程序所需的所有連接器、Format或擴(kuò)展庫。用戶應(yīng)用程序通常被打包成一個 jar文件,其中包含應(yīng)用程序代碼以及所需的連接器和庫依賴項(xiàng)。用戶應(yīng)用程序依賴項(xiàng)不應(yīng)包括 Flink DataStream API 和運(yùn)行時依賴項(xiàng),因?yàn)檫@些已經(jīng)被包含在了Flink 的核心依賴中。
依賴配置步驟
1.添加基礎(chǔ)依賴
每一個Flink應(yīng)用程序的開發(fā)至少需要添加對相關(guān)API的基礎(chǔ)依賴。
手動配置項(xiàng)目時,需要添加對Java/Scala API的依賴(這里以Maven為例,在其他構(gòu)建工具(Gradle,SBT等)中可以使用同樣的依賴)。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.12.3</version><scope>provided</scope> </dependency>重要提示:請注意,所有這些依賴項(xiàng)都將其范圍設(shè)置為"provided"。這意味著需要對它們進(jìn)行編譯,但不應(yīng)將它們打包到項(xiàng)目生成的應(yīng)用程序jar文件中——這些依賴項(xiàng)是Flink核心依賴項(xiàng),在實(shí)際運(yùn)行時已經(jīng)被加載。
強(qiáng)烈建議將依賴項(xiàng)設(shè)置成"provided"的范圍,如果未將它們設(shè)置為"provided",最好的情況下會導(dǎo)致生成的jar變得臃腫,因?yàn)樗€包含所有Flink核心依賴項(xiàng)。而最懷的情況下,添加到應(yīng)用程序jar文件中的Flink核心依賴項(xiàng)與您自己的一些依賴項(xiàng)會發(fā)生版本沖突(通常通過Flink的反向類加載機(jī)制來避免)。
關(guān)于IntelliJ的注意事項(xiàng):為了使應(yīng)用程序在IntelliJ IDEA中運(yùn)行,有必要在運(yùn)行配置中勾選"Include dependencies with "Provided" scope"選項(xiàng)框。如果沒有該選項(xiàng)(可能是由于使用較舊的IntelliJ IDEA版本),那么一個簡單的解決方法是創(chuàng)建一個調(diào)用應(yīng)用程序 main() 方法的測試用例。
2.添加連接器和庫的依賴
大多數(shù)應(yīng)用程序的運(yùn)行需要特定的連接器或庫,例如Kafka、Cassandra等連接器。這些連接器不是Flink核心依賴項(xiàng)的一部分,必須作為額外依賴項(xiàng)添加到應(yīng)用程序中。
下述代碼是添加Kafka連接器依賴項(xiàng)的示例(Maven語法):
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.12.3</version> </dependency>我們建議將應(yīng)用程序代碼和它所有的依賴以jar-with-dependencies?的形式打包到一個application jar中。這個應(yīng)用程序jar包可以被提交到已經(jīng)存在的Flink集群上去,或者被加入到Flink應(yīng)用程序的容器鏡像中去。
從Maven作業(yè)模版(見下文Maven作業(yè)模版部分)創(chuàng)建的項(xiàng)目,通過mvn clean package命令會自動把依賴打到應(yīng)用程序的jar包中去。對于沒有使用模版進(jìn)行配置的情況,建議使用Maven Shade Plugin (配置如附錄所示) 來構(gòu)建包含依賴的jar包。
重要提示:對于Maven(和其他構(gòu)建工具)來說,要將依賴項(xiàng)正確打包到應(yīng)用程序jar中,這些應(yīng)用程序依賴項(xiàng)的scope必須指定為"compile"(與核心依賴項(xiàng)不同,核心依賴項(xiàng)的scope必須指定為"provided")。
注意事項(xiàng)
Scala版本
Scala的不同版本(2.11,2.12等)相互之間是不兼容的。因此,Scala 2.11對應(yīng)的Flink版本不能用于使用Scala 2.12的應(yīng)用程序。
所有依賴(或傳遞依賴)于Scala的Flink依賴項(xiàng)都以構(gòu)建它們的Scala版本作為后綴,例如flink-streaming-scala_2.11。
只使用Java進(jìn)行開發(fā)時可以選擇任何Scala版本,使用Scala開發(fā)時需要選擇與其應(yīng)用程序的Scala版本匹配的Flink依賴版本。
注:2.12.8之后的Scala版本與之前的2.12.x版本不兼容,因此Flink項(xiàng)目無法將其2.12.x版本升級到2.12.8之后的版本。用戶可以在本地自己編譯對應(yīng)Scala版本的Flink。為了使其能夠正常工作,需要添加-Djapicmp.skip以在構(gòu)建時跳過二進(jìn)制兼容性檢查。
Hadoop依賴
一般的規(guī)則: 永遠(yuǎn)不要將Hadoop相關(guān)依賴直接添加到應(yīng)用程序中.?(唯一的例外是將現(xiàn)有的Hadoop輸入/輸出Format與Flink的Hadoop兼容包一起使用時)
如果希望將Flink與Hadoop結(jié)合使用,則需要包含Hadoop依賴的Flink啟動項(xiàng),而不是將Hadoop添加為應(yīng)用程序依賴項(xiàng)。Flink將使用HADOOP_CLASSPATH環(huán)境變量指定的Hadoop依賴項(xiàng),可通過以下方式進(jìn)行設(shè)置:
export HADOOP_CLASSPATH**=**hadoop classpath``
這種設(shè)計(jì)有兩個主要原因:
- 一些與Hadoop的交互可能發(fā)生在Flink的核心模塊中,并且在用戶應(yīng)用程序啟動之前,例如為檢查點(diǎn)設(shè)置HDFS、通過Hadoop的Kerberos令牌進(jìn)行身份驗(yàn)證,或者在YARN上進(jìn)行部署等。
- Flink的反向類加載機(jī)制從核心依賴項(xiàng)中隱藏了許多可傳遞的依賴項(xiàng)。這不僅適用于Flink自己的核心依賴項(xiàng),而且適用于Hadoop的依賴項(xiàng)。這樣,應(yīng)用程序就可以使用相同依賴項(xiàng)的不同版本,而不會發(fā)生依賴項(xiàng)沖突(相信我們,這是一件大事,因?yàn)镠adoop依賴樹非常龐大。)
如果在IDE內(nèi)部的測試或開發(fā)過程中需要Hadoop依賴項(xiàng)(例如HDFS訪問),請將這些依賴項(xiàng)的scope配置為
test?或則?provided。
Transform table connector/format resources?#
Flink使用Java的Service Provider Interfaces (SPI)?機(jī)制通過特定標(biāo)識符加載table的connector/format工廠。由于每個table的connector/format的名為org.apache.flink.table.factories.Factory的SPI資源文件位于同一目錄:META-INF/services下,因此在構(gòu)建使用多個table connector/format的項(xiàng)目的uber jar時,這些資源文件將相互覆蓋,這將導(dǎo)致Flink無法正確加載工廠類。
在這種情況下,推薦的方法是通過maven shade插件的ServicesResourceTransformer轉(zhuǎn)換META-INF/services目錄下的這些資源文件。給定示例的pom.xml文件內(nèi)容如下,其中包含連接器flink-sql-connector-hive-3.1.2和flink-parquet format。
<modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>myProject</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- other project dependencies ...--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-hive-3.1.2__2.11</artifactId><version>1.13.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-parquet__2.11<</artifactId><version>1.13.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><executions><execution><id>shade</id><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers combine.children="append"><!-- The service transformer is needed to merge META-INF/services files --><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><!-- ... --></transformers></configuration></execution></executions></plugin></plugins></build>在配置了ServicesResourceTransformer之后, 項(xiàng)目構(gòu)建uber-jar時,META-INF/services目錄下的這些資源文件會被整合在一起而不是相互覆蓋。
Maven作業(yè)模版
強(qiáng)烈建議使用該方式進(jìn)行配置,可以減少很多重復(fù)的配置工作。
前置要求
唯一的環(huán)境要求是安裝了Maven 3.0.4(或更高版本)和Java 8.x。
創(chuàng)建項(xiàng)目
使用以下兩種方式中的一種創(chuàng)建項(xiàng)目:
- 使用Maven archetypes
這允許您命名新創(chuàng)建的項(xiàng)目。它將以交互方式要求您輸入groupId、artifactId和包名。
- 運(yùn)行quickstart腳本
我們建議您將此項(xiàng)目導(dǎo)入IDE以開發(fā)和測試它。IntelliJ IDEA原生支持Maven項(xiàng)目。如果使用Eclipse,可以使用m2e插件導(dǎo)入Maven項(xiàng)目。默認(rèn)情況下,某些Eclipse捆綁包包含該插件,否則需要您手動安裝。
請注意:默認(rèn)的Java JVM heap size對于Flink來說可能太小了。你必須手動增加它。在Eclipse中,選擇RunConfigurations->Arguments并寫入VM Arguments框:-Xmx800m。在IntelliJ IDEA中,更改JVM選項(xiàng)的推薦方法是使用Help | Edit Custom VM Options選項(xiàng)菜單。細(xì)節(jié)見這篇文章.
構(gòu)建項(xiàng)目
如果要生成/打包項(xiàng)目,請轉(zhuǎn)到項(xiàng)目目錄并運(yùn)行"mvn clean package"命令。執(zhí)行后將會得到一個JAR文件:target/-.jar,其中包含您的應(yīng)用程序,以及作為依賴項(xiàng)添加到應(yīng)用程序的連接器和庫。
注意:如果使用與StreamingJob不同的類作為應(yīng)用程序的主類/入口點(diǎn),我們建議您相應(yīng)地更改pom.xml文件中的mainClass設(shè)置。這樣,Flink就可以直接從JAR文件運(yùn)行應(yīng)用程序,而無需另外指定主類。
附錄: 構(gòu)建帶依賴的jar包的模版
要構(gòu)建包含連接器和庫所需的所有依賴項(xiàng)的應(yīng)用程序JAR,可以使用以下shade插件定義:
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>my.programs.main.clazz</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins> </build>原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。?
總結(jié)
以上是生活随笔為你收集整理的Datastream 开发打包问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何通过任务调度实现百万规则报警
- 下一篇: 技术分享:从双11看实时数仓Hologr