《Pyflink》Flink集群安装,Python+Flink调研
Flink集群安裝,Python+Flink調研
Flink集群部署
下載對應版本安裝包:https://flink.apache.org/downloads.html
實驗環境為hadoop2.7, scala2.11 所以下載flink-1.7.1-bin-hadoop27-scala_2.11.tgz
配置conf/flink-conf.yaml
jobmanager.rpc.address : master 節點 jobmanager.heap.mb : JobManager可用的內存數量 taskmanager.heap.mb : 每個TaskManager可以用內存數量 taskmanager.numberOfTaskSlots : 每個機器可用的CPU數量 parallelism.default : 集群中總的CPU數量 taskmanager.tmp.dirs : 臨時目錄配置conf/slaves
slave1 slave2點擊查看更多配置項
把在master上配置好的,文件夾發送到各個worker節點上
scp -r flink-1.7.1 hadoop@slavle1:~ scp -r flink-1.7.1 hadoop@slavle2:~啟動/終止 Flink
# 啟動一個JobManager,并通過SSH連接列在slaves文件中的所有節點以便在每個節點上啟動TaskManager flink-1.7.1/bin/start-cluster.sh # 停止flink集群,直接在master節點運行bin/stop-cluster.sh flink-1.7.1/bin/stop-cluster.sh啟動后在web界面輸入:master:8081 查看Web-UI
運行Python腳本
-
以官網的一個示例進行測試,可以復制粘貼這些代碼存儲為wordcount.py并在本地運行。
-
wordcount.py
from flink.plan.Environment import get_environment from flink.functions.GroupReduceFunction import GroupReduceFunctionclass Adder(GroupReduceFunction):def reduce(self, iterator, collector):count, word = iterator.next()count += sum([x[0] for x in iterator])collector.collect((count, word)) # 1. 獲取一個運行環境 env = get_environment() # 2. 加載/創建初始數據 data = env.from_elements("Who's there?","I think I hear them. Stand, ho! Who's there?")# 3. 指定對這些數據的操作 data \.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \.group_by(1) \.reduce_group(Adder(), combinable=True) \.output()# 4. 運行程序 env.execute(local=True) # 設置execute(local=True)強制程序在本機運行 -
執行方法:
為了在Flink中運行計劃任務,到Flink目錄下,運行/bin文件夾下的pyflink.sh腳本。對于python2.7版本,運行pyflink2.sh;對于python3.4版本,運行pyflink3.sh。包含計劃任務的腳本應當作為第一個輸入參數,其后可添加一些另外的python包,最后,在“-”之后,輸入其他附加參數。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]] -
針對上面的示例,在master-shell輸入:
flink-1.7.1/bin/pyflink.sh ./wordcount.py -
vi flink-1.7.1/log/flink-hadoop-taskexecutor-0-slave1.out查看輸出:
任務詳解
-
從示例程序可以看出,Flink程序看起來就像普通的python程序一樣。每個程序都包含相同的基本組成部分:
- 獲取一個運行環境
- 加載/創建初始數據
- 指定對這些數據的操作
- 指定計算結果的存放位置
- 運行程序
-
Environment(運行環境)是所有Flink程序的基礎。通過調用Environment類中的一些靜態方法來建立一個環境:
get_environment() -
運行環境可通過多種讀文件的方式來指定數據源。如果是簡單的按行讀取文本文件:
env = get_environment() text = env.read_text("file:///path/to/file")這樣,你就獲得了可以進行操作(apply transformations)的數據集。關于數據源和輸入格式的更多信息,請參考Data Sources
一旦你獲得了一個數據集DataSet,你就可以通過transformations來創建一個新的數據集,并把它寫入到文件,再次transform,或者與其他數據集相結合。你可以通過對數據集調用自己個性化定制的函數來進行數據操作。例如,一個類似這樣的數據映射操作:
data.map(lambda x: x*2)這將會創建一個新的數據集,其中的每個數據都是原來數據集中的2倍。若要獲取關于所有transformations的更多信息,及所有數據操作的列表,請參考Transformations。
-
當需要將所獲得的數據集寫入到磁盤時,調用下面三種函數的其中一個即可。
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)output()其中,最后一種方法僅適用于在本機上進行開發/調試,它會將數據集的內容輸出到標準輸出。(請注意,當函數在集群上運行時,結果將會輸出到整個集群節點的標準輸出流,即輸出到workers的.out文件。)前兩種方法,能夠將數據集寫入到對應的文件中。關于寫入到文件的更多信息,請參考Data Sinks。
當設計好了程序之后,你需要在環境中執行execute命令來運行程序。可以選擇在本機運行,也可以提交到集群運行,這取決于Flink的創建方式。你可以通過設置execute(local=True)強制程序在本機運行。
創建項目
-
除了搭建好Flink運行環境,就無需進行其他準備工作了。Python包可以從你的Flink版本對應的/resource文件夾找到。在執行工作任務時,Flink 包,plan包和optional包均可以通過HDFS自動分發。
Python API官方已經在安裝了Python2.7或3.4的Linux/Windows系統上測試過。本次我是在安裝了Python3.6的Linux環境進行測試。
默認情況下,Flink通過調用”python”或”python3″來啟動python進程,這取決于使用了哪種啟動腳本。通過在 flink-conf.yaml 中設置 “python.binary.python[2/3]”對應的值,來設定你所需要的啟動方式。
惰性評價
-
所有的Flink程序都是延遲執行的。當程序的主函數執行時,數據的載入和操作并沒有在當時發生。與此相反,每一個被創建出來的操作都被加入到程序的計劃中。當程序環境中的某個對象調用了execute()函數時,這些操作才會被真正的執行。不論該程序是在本地運行還是集群上運行。
延遲求值能夠讓你建立復雜的程序,并在Flink上以一個整體的計劃單元來運行。
數據變換
- 數據變換(Data transformations)可以將一個或多個數據集映射為一個新的數據集。程序能夠將多種變換結合到一起來進行復雜的整合變換。
該小節將概述各種可以實現的數據變換。transformations documentation數據變換文檔中,有關于所有數據變換和示例的全面介紹。
Map:輸入一個元素,輸出一個元素
data.map(lambda x: x * 2)FlatMap:輸入一個元素,輸出0,1,或多個元素
data.flat_map( lambda x,c: [(1,word) for word in line.lower().split() for line in x])MapPartition:通過一次函數調用實現并行的分割操作。該函數將分割變換作為一個”迭代器”,并且能夠產生任意數量的輸出值。每次分割變換的元素數量取決于變換的并行性和之前的操作結果。
data.map_partition(lambda x,c: [value * 2 for value in x])Filter:對每一個元素,計算一個布爾表達式的值,保留函數計算結果為true的元素。
data.filter(lambda x: x > 1000)Reduce:通過不斷的將兩個元素組合為一個,來將一組元素結合為一個單一的元素。這種縮減變換可以應用于整個數據集,也可以應用于已分組的數據集。
data.reduce(lambda x,y : x + y)ReduceGroup:將一組元素縮減為1個或多個元素。縮減分組變換可以被應用于一個完整的數據集,或者一個分組數據集。
lass Adder(GroupReduceFunction): def reduce(self, iterator, collector):count, word = iterator.next()count += sum([x[0] for x in iterator) collector.collect((count, word))data.reduce_group(Adder())Aggregate:對一個數據集包含所有元組的一個域,或者數據集的每個數據組,執行某項built-in操作(求和,求最小值,求最大值)。聚集變換可以被應用于一個完整的數據集,或者一個分組數據集。
# This code finds the sum of all of the values in the first field and the maximum of all of the values in the second field data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)# min(), max(), and sum() syntactic sugar functions are also available data.sum(0).and_agg(Aggregation.Max, 1)Join:對兩個數據集進行聯合變換,將得到一個新的數據集,其中包含在兩個數據集中擁有相等關鍵字的所有元素對。也可通過JoinFunction來把成對的元素變為單獨的元素。關于join keys的更多信息請查看 keys
# In this case tuple fields are used as keys. # "0" is the join field on the first tuple # "1" is the join field on the second tuple. result = input1.join(input2).where(0).equal_to(1)CoGroup:是Reduce變換在二維空間的一個變體。將來自一個或多個域的數據加入數據組。變換函數transformation function將被每一對數據組調用。關于定義coGroup keys的更多信息,請查看 keys 。
data1.co_group(data2).where(0).equal_to(1)Cross:計算兩個輸入數據集的笛卡爾乘積(向量叉乘),得到所有元素對。也可通過CrossFunction實現將一對元素轉變為一個單獨的元素。
result = data1.cross(data2)Union:將兩個數據集進行合并。
data.union(data2)ZipWithIndex:為數據組中的元素逐個分配連續的索引。了解更多信息,請參考 【Zip Elements Guide】(zip_elements_guide.html#zip-with-a-dense-index).
data.zip_with_index()指定keys
-
一些變換(例如Join和CoGroup),需要在進行變換前,為作為輸入參數的數據集指定一個關鍵字,而另一些變換(例如Reduce和GroupReduce),則允許在變換操作之前,對數據集根據某個關鍵字進行分組。
數據集可通過如下方式分組
reduced = data \ .group_by(<define key here>) \ .reduce_group(<do something>)Flink中的數據模型并不是基于鍵-值對。你無需將數據集整理為keys和values的形式。鍵是”虛擬的”:它們被定義為在真實數據之上,引導分組操作的函數。
為元組定義keys
-
最簡單的情形是對一個數據集中的元組按照一個或多個域進行分組:
grouped = data \ .group_by(0) \ .reduce(/*do something*/)數據集中的元組被按照第一個域分組。對于接下來的group-reduce函數,輸入的數據組中,每個元組的第一個域都有相同的值。
grouped = data \ .group_by(0,1) \ .reduce(/*do something*/)在上面的例子中,數據集的分組基于第一個和第二個域形成的復合關鍵字,因此,reduce函數輸入數據組中,每個元組兩個域的值均相同。
關于嵌套元組需要注意:如果你有一個使用了嵌套元組的數據集,指定group_by()操作,系統將把整個元組作為關鍵字使用。
向Flink傳遞函數
-
一些特定的操作需要采用用戶自定義的函數,因此它們都接受lambda表達式和rich functions作為輸入參數。
data.filter(lambda x: x > 5)class Filter(FilterFunction):def filter(self, value):return value > 5data.filter(Filter())
Rich functions可以將函數作為輸入參數,允許使用broadcast-variables(廣播變量),能夠由init()函數參數化,是復雜函數的一個可考慮的實現方式。它們也是在reduce操作中,定義一個可選的combine function的唯一方式。
Lambda表達式可以讓函數在一行代碼上實現,非常便捷。需要注意的是,如果某個操作會返回多個數值,則其使用的lambda表達式應當返回一個迭代器。(所有函數將接收一個collector輸入 參數)。
數據類型
-
Flink的Python API目前僅支持python中的基本數據類型(int,float,bool,string)以及byte arrays。
class MyObj(object):def __init__(self, i):self.value = iclass MySerializer(object):def serialize(self, value):return struct.pack(">i", value.value)class MyDeserializer(object):def _deserialize(self, read):i = struct.unpack(">i", read(4))[0]return MyObj(i)env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
運行環境對數據類型的支持,包括序列化器serializer,反序列化器deserializer,以及自定義類型的類。Tuples/Lists
可以使用元組(或列表)來表示復雜類型。Python中的元組可以轉換為Flink中的Tuple類型,它們包含數量固定的不同類型的域(最多25個)。每個域的元組可以是基本數據類型,也可以是其他的元組類型,從而形成嵌套元組類型。
word_counts = env.from_elements(("hello", 1), ("world",2))counts = word_counts.map(lambda x: x[1])當進行一些要求指定關鍵字的操作時,例如對數據記錄進行分組或配對。通過設定關鍵字,可以非常便捷地指定元組中各個域的位置。你可以指定多個位置,從而實現復合關鍵字(更多信息,查閱Section Data Transformations)。
wordCounts \ .group_by(0) \ .reduce(MyReduceFunction())
數據源
-
數據源創建了初始的數據集,包括來自文件,以及來自數據接口/集合兩種方式。
-
基于文件的:
read_text(path) – 按行讀取文件,并將每一行以String形式返回。
read_csv(path,type) – 解析以逗號(或其他字符)劃分數據域的文件。
返回一個包含若干元組的數據集。支持基本的java數據類型作為字段類型。 -
基于數據集合的:
from_elements(*args) – 基于一系列數據創建一個數據集,包含所有元素。
generate_sequence(from, to) – 按照指定的間隔,生成一系列數據。 -
Examples
env = get_environment\# read text file from local files system localLiens = env.read_text("file:#/path/to/my/textfile")\# read text file from a HDFS running at nnHost:nnPort hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))\# create a set from some given elements values = env.from_elements("Foo", "bar", "foobar", "fubar")\# generate a number sequence numbers = env.generate_sequence(1, 10000000)
數據接收器
-
數據接收器可以接受DataSet,并用來存儲和返回它們:
-
write_text() –按行以String形式寫入數據。可通過對每個數據項調用str()函數獲取String。
-
write_csv(…) – 將元組寫入逗號分隔數值文件。行數和數據字段均可配置。每個字段的值可通過對數據項調用str()方法得到。
-
output() – 在標準輸出上打印每個數據項的str()字符串。
一個數據集可以同時作為多個操作的輸入數據。程序可以在寫入或打印一個數據集的同時,對其進行其他的變換操作。 -
標準數據池相關方法示例如下:
write DataSet to a file on the local file system textData.write_text("file:///my/result/on/localFS")write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")write DataSet to a file and overwrite the file if it exists textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)tuples as lines with pipe as the separator "a|b|c" values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.write_text("file:///path/to/the/result/file")
廣播變量
-
使用廣播變量,能夠在使用普通輸入參數的基礎上,使得一個數據集同時被多個并行的操作所使用。這對于實現輔助數據集,或者是基于數據的參數化法非常有用。這樣,數據集就可以以集合的形式被訪問。
-
注冊廣播變量:廣播數據集可通過調用with_broadcast_set(DataSet,String)函數,按照名字注冊廣播變量。
-
訪問廣播變量:通過對調用self.context.get_broadcast_variable(String)可獲取廣播變量。
class MapperBcv(MapFunction): def map(self, value):factor = self.context.get_broadcast_variable("bcv")[0][0]return value * factor# 1. The DataSet to be broadcasted toBroadcast = env.from_elements(1, 2, 3) data = env.from_elements("a", "b")# 2. Broadcast the DataSet data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast) -
確保在進行廣播變量的注冊和訪問時,應當采用相同的名字(示例中的”bcv”)。
注意:由于廣播變量的內容被保存在每個節點的內部存儲中,不適合包含過多內容。一些簡單的參數,例如標量值,可簡單地通過參數化rich function來實現。
并行執行
- 該章節將描述如何在Flink中配置程序的并行執行。一個Flink程序可以包含多個任務(操作,數據源和數據池)。一個任務可以被劃分為多個可并行運行的部分,每個部分處理輸入數據的一個子集。并行運行的實例數量被稱作它的并行性或并行度degree of parallelism (DOP)。
在Flink中可以為任務指定不同等級的并行度。
運行環境級
-
Flink程序可在一個運行環境execution environment的上下文中運行。一個運行環境為其中運行的所有操作,數據源和數據池定義了一個默認的并行度。運行環境的并行度可通過對某個操作的并行度進行配置來修改。
一個運行環境的并行度可通過調用set_parallelism()方法來指定。例如,為了將WordCount示例程序中的所有操作,數據源和數據池的并行度設置為3,可以通過如下方式設置運行環境的默認并行度。
env = get_environment() env.set_parallelism(3)text.flat_map(lambda x,c: x.lower().split()) \.group_by(1) \.reduce_group(Adder(), combinable=True) \.output()env.execute()
系統級
- 通過設置位于./conf/flink-conf.yaml.文件的parallelism.default屬性,改變系統級的默認并行度,可設置所有運行環境的默認并行度。具體細節可查閱Configuration文檔。
執行方法
-
為了在Flink中運行計劃任務,到Flink目錄下,運行/bin文件夾下的pyflink.sh腳本。對于python2.7版本,運行pyflink2.sh;對于python3.4版本,運行pyflink3.sh。包含計劃任務的腳本應當作為第一個輸入參數,其后可添加一些另外的python包,最后,在“-”之后,輸入其他附加參數。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
總結
以上是生活随笔為你收集整理的《Pyflink》Flink集群安装,Python+Flink调研的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Boosting集合算法详解(一)
- 下一篇: 基于应用日志的扫描器检测实践