一文教你快速上手PyFlink
作者|付典
本文介紹了PyFlink項(xiàng)目的目標(biāo)和發(fā)展歷程,以及PyFlink目前的核心功能,包括Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依賴管理和Python UDF執(zhí)行優(yōu)化,同時(shí)也針對(duì)功能展示了相關(guān)demo。本文主要分為4個(gè)部分:
PyFlink介紹
PyFlink是Flink的一個(gè)子模塊,也是整個(gè)Flink項(xiàng)目的一部分,主要目的是提供Flink的Python語言支持。因?yàn)樵跈C(jī)器學(xué)習(xí)和數(shù)據(jù)分析等領(lǐng)域,Python語言非常重要,甚至是最主要的開發(fā)語言。所以,為了滿足更多用戶需求,拓寬Flink的生態(tài),我們啟動(dòng)了PyFlink項(xiàng)目。
PyFlink項(xiàng)目的目標(biāo)主要有兩點(diǎn),第一點(diǎn)是將Flink的計(jì)算能力輸出給Python用戶,也就是我們會(huì)在Flink中提供一系列的Python API,方便對(duì)Python語言比較熟悉的用戶開發(fā)Flink作業(yè)。
第二點(diǎn),就是將Python生態(tài)基于Flink進(jìn)行分布式化。雖然我們會(huì)在Flink中提供一系列的Python API來給Python用戶來使用,但這對(duì)用戶來說是有學(xué)習(xí)成本的,因?yàn)橛脩粢獙W(xué)習(xí)怎么使用Flink的Python API,了解每一個(gè)API的用途。所以我們希望用戶能在API層使用他們比較熟悉的 Python庫的API,但是底層的計(jì)算引擎使用Flink,從而降低他們的學(xué)習(xí)成本。這是我們未來要做的事情,目前處于啟動(dòng)階段。
下圖是PyFlink項(xiàng)目的發(fā)展情況,目前發(fā)布了3個(gè)版本,支持的內(nèi)容也越來越豐富。
PyFlink相關(guān)功能介紹
我們主要介紹PyFlink以下功能,Python Table API、Python UDF、向量化Python UDF、Python UDF Metrics、PyFlink依賴管理和Python UDF執(zhí)行優(yōu)化。
Python Table API
Python Table API的目的是為了讓用戶可以使用Python語言來開發(fā)Flink作業(yè)。Flink里面有三種類型的API,Process、Function和Table API,前兩者是較為底層的API,基于Process和Function開發(fā)的作業(yè),其邏輯會(huì)嚴(yán)格按照用戶定義的行為進(jìn)行執(zhí)行,而Table API是較為高層的API,基于Table API開發(fā)的作業(yè),其邏輯會(huì)經(jīng)過一系列的優(yōu)化之后進(jìn)行執(zhí)行。
Python Table API,顧名思義就是提供 Table API的Python語言支持。
以下是Python Table API開發(fā)的一個(gè)Flink作業(yè),作業(yè)邏輯是讀取文件,計(jì)算word count,然后再把計(jì)算結(jié)果寫到文件中去。這個(gè)例子雖然簡單,但包括了開發(fā)一個(gè)Python Table API作業(yè)的所有基本流程。
首先我們需要定義作業(yè)的執(zhí)行模式,比如說是批模式還是流模式,作業(yè)的并發(fā)度是多少?作業(yè)的配置是什么。接下來我們需要定義source表和sink表,source表定義了作業(yè)的數(shù)據(jù)源來源于哪里,數(shù)據(jù)的格式是什么;sink表定義了作業(yè)的執(zhí)行結(jié)果寫到哪里去,數(shù)據(jù)格式是什么。最后我們需要定義作業(yè)的執(zhí)行邏輯,在這個(gè)例子中是計(jì)算寫過來的count。
以下是Python Table API的部分截圖,可以看到它的數(shù)量和功能都比較齊全。
Python UDF
Python Table API是一種關(guān)系型的API,其功能可以類比成SQL,而SQL里自定義函數(shù)是非常重要的功能,可以極大地?cái)U(kuò)展SQL的使用范圍。Python UDF的主要目的就是允許用戶使用Python語言來開發(fā)自定義函數(shù),從而擴(kuò)展Python Table API的使用場景。同時(shí),Python UDF除了可以用在Python Table API作業(yè)中之外,還可以用在Java Table API作業(yè)以及SQL作業(yè)中。
在PyFlink中我們支持多種方式來定義Python UDF。用戶可以定義一個(gè)Python類,繼承ScalarFunction,也可以定義一個(gè)普通的Python函數(shù)或者Lambda函數(shù),實(shí)現(xiàn)自定義函數(shù)的邏輯。除此之外,我們還支持通過Callable Function和Partial Function定義Python UDF。用戶可以根據(jù)自己的需要選擇最適合自己的方式。
PyFlink里面提供了多種Python UDF的使用方式,包括Python Table API、Java table API和SQL,我們一一介紹。
在Python Table API中使用Python UDF,在定義完P(guān)ython UDF之后,用戶首先需要注冊(cè)Python UDF,可以調(diào)用table environment register來注冊(cè),然后命名,然后就可以在作業(yè)中通過這個(gè)名字來使用 Python UDF了。
在Java Table API中它的使用方式也比較相似,但是注冊(cè)方式不一樣,Java Table API作業(yè)中需要通過DDL語句來進(jìn)行注冊(cè)。
除此之外,用戶也可以在SQL的作業(yè)中使用Python UDF。與前面兩種方式類似,用戶首先需要注冊(cè)Python UDF,可以在SQL腳本中通過DDL語句來注冊(cè),也可以在SQL Client的環(huán)境配置文件里面注冊(cè)。
Python UDF架構(gòu)
簡單介紹下Python UDF的執(zhí)行架構(gòu)。Flink是用Java語言編寫的,運(yùn)行在Java虛擬機(jī)中,而Python UDF運(yùn)行在 Python虛擬機(jī)中,所以Java進(jìn)程和Python進(jìn)程需要進(jìn)行數(shù)據(jù)通信。 除此之外,兩者間還需要傳輸state、log、metrics,它們的傳輸協(xié)議需要支持4種類型。
向量化Python UDF
向量化Python UDF的主要目的是使 Python用戶可以利用Pandas或者Numpy等數(shù)據(jù)分析領(lǐng)域常用的Python庫,開發(fā)高性能的Python UDF。
向量化Python UDF是相對(duì)于普通Python UDF而言的,我們可以在下圖看到兩者的區(qū)別。
下圖顯示了向量化Python UDF的執(zhí)行過程。首先在Java端,Java在攢完多條數(shù)據(jù)之后會(huì)轉(zhuǎn)換成Arrow格式,然后發(fā)送給Python進(jìn)程。Python進(jìn)程在收到數(shù)據(jù)之后,將其轉(zhuǎn)換成Pandas的數(shù)據(jù)結(jié)構(gòu),然后調(diào)用用戶自定義的向量化Python UDF。同時(shí)向量化Python UDF的執(zhí)行結(jié)果會(huì)再轉(zhuǎn)化成Arrow格式的數(shù)據(jù),再發(fā)送給 Java進(jìn)程。
在使用方式上,向量化Python UDF與普通Python UDF是類似的,只有以下幾個(gè)地方稍有不同。首先向量化Python UDF的聲明方式需要加一個(gè)UDF type,聲明這是一個(gè)向量化Python UDF,同時(shí)UDF的輸入輸出類型是Pandas Series。
Python UDF Metrics
前面我們提到 Python UDF有多種定義方式,但是如果需要在Python UDF中使用Metrics,那么Python UDF必須繼承ScalarFunction來進(jìn)行定義。在Python UDF的 open方法里面提供了一個(gè)Function Context參數(shù),用戶可以通過Function Context參數(shù)來注冊(cè)Metrics,然后就可以通過注冊(cè)的 Metrics對(duì)象來匯報(bào)了。
PyFlink依賴管理
從類型來說,PyFlink依賴主要包括以下幾種類型,普通的PyFlink文件、存檔文件,第三方的庫、PyFlink解釋器,或者Java的Jar包等等。從解決方案來看,針對(duì)每種類型的依賴,PyFlink提供了兩種解決方案,一種是API的解決方案,一種是命令行選項(xiàng)的方式,大家選擇其一即可。
Python UDF執(zhí)行優(yōu)化
Python UDF的執(zhí)行優(yōu)化主要包括兩個(gè)方面,執(zhí)行計(jì)劃優(yōu)化和運(yùn)行時(shí)優(yōu)化。它與SQL非常像,一個(gè)包含Python UDF的作業(yè),首先會(huì)經(jīng)過預(yù)先定義的規(guī)則,生成一個(gè)最優(yōu)的執(zhí)行計(jì)劃。在執(zhí)行計(jì)劃已經(jīng)確定的情況下,在實(shí)際執(zhí)行的時(shí)候,又可以運(yùn)用一些其他的優(yōu)化手段來達(dá)到盡可能高的執(zhí)行效率。
Python UDF執(zhí)行計(jì)劃優(yōu)化
執(zhí)行計(jì)劃的優(yōu)化主要有以下幾個(gè)優(yōu)化思路。一個(gè)是不同類型的 UDF的拆分,由于在一個(gè)節(jié)點(diǎn)中可能同時(shí)包含多種類型的UDF,而不同的類型的UDF是不能放在一塊執(zhí)行的;第二個(gè)方面是Filter下推,其主要目的是盡可能降低含有Python UDF節(jié)點(diǎn)的輸入數(shù)據(jù)量,從而提升整個(gè)作業(yè)的執(zhí)行性能;第三個(gè)優(yōu)化思路是Python UDF Chaining,Java進(jìn)程與Python進(jìn)程之間的通信開銷以及序列化反序列化開銷比較大,而Python UDF Chaining可以盡量減少Java進(jìn)程和Python進(jìn)程之間的通信開銷。
不同類型UDF的拆分
假如有這樣一個(gè)作業(yè),它包含了兩個(gè)UDF,其中add是Python UDF, subtract是向量化Python UDF。默認(rèn)情況下,這個(gè)作業(yè)的執(zhí)行計(jì)劃會(huì)有一個(gè)project節(jié)點(diǎn),這兩個(gè) UDF同時(shí)位于這一project的節(jié)點(diǎn)里面。這個(gè)執(zhí)行計(jì)劃的主要問題是,普通Python UDF每次處理一條數(shù)據(jù),而向量化Python UDF,每次處理多條數(shù)據(jù),所以這樣的一個(gè)執(zhí)行計(jì)劃是沒有辦法執(zhí)行的。
但是通過拆分,我們可以把這一個(gè)project的節(jié)點(diǎn)拆分成了兩個(gè)project的節(jié)點(diǎn),其中第一個(gè)project的節(jié)點(diǎn)只包含普通Python UDF,而第二個(gè)節(jié)點(diǎn)只包含向量化Python UDF。不同類型的Python UDF拆分到不同的節(jié)點(diǎn)之后,每一個(gè)節(jié)點(diǎn)都只包含了一種類型的UDF,所以算子就可以根據(jù)它所包含的UDF的類型選擇最合適的執(zhí)行方式。
Filter下推到Python UDF之前
Filter下推的主要目的是將過濾算子下推到Python UDF節(jié)點(diǎn)之前,盡量減少Python UDF節(jié)點(diǎn)的數(shù)據(jù)量。
假如我們有這樣一個(gè)作業(yè),作業(yè)原始執(zhí)行計(jì)劃里面包括了兩個(gè)Project的節(jié)點(diǎn),一個(gè)是add、 subtract,同時(shí)還包括一個(gè)Filter節(jié)點(diǎn)。這個(gè)執(zhí)行計(jì)劃是可以運(yùn)行的,但需要更優(yōu)化??梢钥吹?#xff0c;因?yàn)镻ython的節(jié)點(diǎn)位于Filter節(jié)點(diǎn)之前,所以在Filter節(jié)點(diǎn)之前Python UDF已經(jīng)計(jì)算完了,但是如果把Filter過濾下,推到Python UDF之前,那么就可以大大降低Python UDF節(jié)點(diǎn)的輸入數(shù)據(jù)量。
Python UDF Chaining
假如我們有這樣一個(gè)作業(yè),里面包含兩種類型的UDF,一個(gè)是add,一個(gè)是subtract,它們都是普通的Python UDF。在一個(gè)執(zhí)行計(jì)劃里面包含兩個(gè)project的節(jié)點(diǎn),其中第一個(gè)project的節(jié)點(diǎn)先算subtract,然后再傳輸給第二個(gè)project節(jié)點(diǎn)進(jìn)行執(zhí)行。
它的主要問題是,由于subtract和add位于兩個(gè)不同的節(jié)點(diǎn),其計(jì)算結(jié)果需要從Python發(fā)送回Java,然后再由Java進(jìn)程發(fā)送給第二個(gè)節(jié)點(diǎn)的Python進(jìn)行執(zhí)行。相當(dāng)于數(shù)據(jù)在Java進(jìn)程和Python進(jìn)程之間轉(zhuǎn)了一圈,所以它帶來了完全沒有必要的通信開銷和序列化反序列化開銷。因此,我們可以將執(zhí)行計(jì)劃優(yōu)化成右圖,就是將add節(jié)點(diǎn)和subtract節(jié)點(diǎn)放在一個(gè)節(jié)點(diǎn)中運(yùn)行,subtract節(jié)點(diǎn)的結(jié)果計(jì)算出來之后直接去調(diào)用add節(jié)點(diǎn)。
Python UDF運(yùn)行時(shí)優(yōu)化
目前提高Python UDF運(yùn)營時(shí)的執(zhí)行效率有三種:一是Cython優(yōu)化,用它來提高Python代碼的執(zhí)行效率;二是自定義Java進(jìn)程和Python進(jìn)程之間的序列化器和反序列化器,提高序列化和反序列化效率;三是提供向量化Python UDF功能。
PyFlink相關(guān)功能演示
首先大家打開這個(gè)頁面,里面提供了PyFlink的一些demo,這些demo是運(yùn)行在docker里面的,所以大家如果要運(yùn)行這些demo就需要在本機(jī)安裝docker環(huán)境。
隨后,我們可以運(yùn)行命令,命令會(huì)啟動(dòng)一個(gè)PyFlink的集群,后面我們運(yùn)行的PyFlink的例子都會(huì)提交到集群去執(zhí)行。
第一個(gè)例子是word count,我們首先在里面定義了環(huán)境、source、sink等,我們可以運(yùn)行一下這個(gè)作業(yè)。
這是作業(yè)的執(zhí)行結(jié)果,可以看到Flink這個(gè)單詞出現(xiàn)了兩次,PyFlink這個(gè)單詞出現(xiàn)了一次。
接下來再運(yùn)行一個(gè)Python UDF的例子。這個(gè)例子和前面有一些類似,首先我們定義它使用PyFlink,運(yùn)行在批這種模式下,同時(shí)作業(yè)的并發(fā)度是1。不一樣的地方是我們?cè)谧鳂I(yè)里定義了一個(gè)UDF,它的輸入包括兩個(gè)列,都是Bigint類型,而且它輸出類型也是對(duì)應(yīng)的。這個(gè)UDF的邏輯是把這兩個(gè)列的相加作為一個(gè)結(jié)果輸出。
我們執(zhí)行一下作業(yè),執(zhí)行結(jié)果是3。
接下來我們?cè)龠\(yùn)行一個(gè)帶有依賴的Python UDF。前面作業(yè)的UDF是不包含任何依賴的,直接就把兩個(gè)輸入列相加起來。而在這個(gè)例子里,UDF引用了一個(gè)第三方的依賴,我們可以通過API set python requirement來執(zhí)行。
接下來我們運(yùn)行作業(yè),它的執(zhí)行結(jié)果和前面是一樣的,因?yàn)檫@兩個(gè)作業(yè)的邏輯是類似的。
接下來我們?cè)倏匆粋€(gè)向量化Python UDF的例子。在 UDF定義的時(shí)候,我們加了一個(gè)UDF的type字段,說明說我們是一個(gè)向量化的Python UDF,其他的邏輯和普通Python UDF的邏輯類似。最后它的執(zhí)行結(jié)果也是3,因?yàn)樗倪壿嫼颓懊媸且粯拥?#xff0c;計(jì)算兩頁的之和。
我們?cè)賮砜匆粋€(gè)例子,在Java的Table作業(yè)里面使用Python。在這個(gè)作業(yè)里面我們又會(huì)用到一個(gè)Python UDF,它通過DDL語句進(jìn)行注冊(cè),然后在execute SQL語句里面進(jìn)行使用。
接下來我們?cè)倏丛诩僑QL作業(yè)中使用Python UDF的例子。在資源文件里面我們聲明了一個(gè)UDF,名字叫add1,它的類型是Python,同時(shí)我們也能看到它的UDF位置。
接下來我們運(yùn)行它,執(zhí)行結(jié)果是234。
PyFlink下一步規(guī)劃
目前PyFlink只支持了Python Table API,我們計(jì)劃在下一個(gè)版本中支持DataStream API,同時(shí)也會(huì)支持Python UDAF以及Pandas UDAF,另外,在執(zhí)行層也會(huì)持續(xù)優(yōu)化PyFlink的執(zhí)行效率。
這是一些資源的鏈接,包括PyFlink的文檔地址。
- Python Table API文檔
https://ci.apache.org/projects/flink/flink-docs-master/api/python/
- PyFlink文檔
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/
- PyFlink playground
https://github.com/pyflink/playgrounds/tree/1.11
好的,我們今天的分享就到這里了,歡迎大家繼續(xù)關(guān)注我們的課程。
活動(dòng)推薦:
僅需99元即可體驗(yàn)阿里云基于 Apache Flink 構(gòu)建的企業(yè)級(jí)產(chǎn)品-實(shí)時(shí)計(jì)算 Flink 版!點(diǎn)擊下方鏈接了解活動(dòng)詳情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506
原文鏈接:https://developer.aliyun.com/article/782821?
版權(quán)聲明:本文內(nèi)容由阿里云實(shí)名注冊(cè)用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請(qǐng)查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識(shí)產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進(jìn)行舉報(bào),一經(jīng)查實(shí),本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的一文教你快速上手PyFlink的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于写文章的一点经验
- 下一篇: Flink SQL 在网易云音乐的产品化