如何从 0 到 1 开发 PyFlink API 作业
Apache Flink 作為當前最流行的流批統一的計算引擎,在實時 ETL、事件處理、數據分析、CEP、實時機器學習等領域都有著廣泛的應用。從 Flink 1.9 開始,Apache Flink 社區開始在原有的 Java、Scala、SQL 等編程語言的基礎之上,提供對于 Python 語言的支持。經過 Flink 1.9 ~ 1.12 以及即將發布的 1.13 版本的多個版本的開發,目前 PyFlink API 的功能已經日趨完善,可以滿足絕大多數情況下 Python 用戶的需求。接下來,我們以 Flink 1.12 為例,介紹如何使用 Python 語言,通過 PyFlink API 來開發 Flink 作業。內容包括:
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
環境準備
第一步:安裝 Python
PyFlink 僅支持 Python 3.5+,您首先需要確認您的開發環境是否已安裝了 Python 3.5+,如果沒有的話,首先需要安裝 Python 3.5+。
第二步:安裝 JDK
我們知道 Flink 的運行時是使用 Java 語言開發的,所以為了執行 Flink 作業,您還需要安裝 JDK。Flink 提供了對于 JDK 8 以及 JDK 11 的全面支持,您需要確認您的開發環境中是否已經安裝了上述版本的 JDK,如果沒有的話,首先需要安裝 JDK。
第三步:安裝 PyFlink
接下來需要安裝 PyFlink,可以通過以下命令進行安裝:
# 創建 Python 虛擬環境 python3 -m pip install virtualenv virtualenv -p `which python3` venv# 使用上述創建的 Python 虛擬環境 ./venv/bin/activate# 安裝 PyFlink 1.12 python3 -m pip install apache-flink==1.12.2作業開發
PyFlink Table API 作業
我們首先介紹一下如何開發 PyFlink Table API 作業。
■ 1)創建 TableEnvironment 對象
對于 Table API 作業來說,用戶首先需要創建一個 TableEnvironment 對象。以下示例定義了一個 TableEnvironment 對象,使用該對象的定義的作業,運行在流模式,且使用 blink planner 執行。
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() t_env = StreamTableEnvironment.create(environment_settings=env_settings)■ 2)配置作業的執行參數
可以通過以下方式,配置作業的執行參數。以下示例將作業的默認并發度設置為4。
t_env.get_config().get_configuration().set_string('parallelism.default', '4')■ 3)創建數據源表
接下來,需要為作業創建一個數據源表。PyFlink 中提供了多種方式來定義數據源表。
方式一:from_elements
PyFlink 支持用戶從一個給定列表,創建源表。以下示例定義了包含了 3 行數據的表:[("hello", 1), ("world", 2), ("flink", 3)],該表有 2 列,列名分別為 a 和 b,類型分別為 VARCHAR 和 BIGINT。
tab = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])說明:
- 這種方式通常用于測試階段,可以快速地創建一個數據源表,驗證作業邏輯
- from_elements 方法可以接收多個參數,其中第一個參數用于指定數據列表,列表中的每一個元素必須為 tuple 類型;第二個參數用于指定表的 schema
方式二:DDL
除此之外,數據也可以來自于一個外部的數據源。以下示例定義了一個名字為my_source,類型為 datagen 的表,表中有兩個類型為 VARCHAR 的字段。
t_env.execute_sql("""CREATE TABLE my_source (a VARCHAR,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")tab = t_env.from_path('my_source')說明:
- 通過 DDL 的方式來定義數據源表是目前最推薦的方式,且所有 Java Table API & SQL 中支持的 connector,都可以通過 DDL 的方式,在 PyFlink Table API 作業中使用,詳細的 connector 列表請參見 Flink 官方文檔 [1]。
- 當前僅有部分 connector 的實現包含在 Flink 官方提供的發行包中,比如 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的實現當前沒有包含在 Flink 官方提供的發行包中,比如 Kafka、ES 等。針對沒有包含在 Flink 官方提供的發行包中的 connector,如果需要在 PyFlink 作業中使用,用戶需要顯式地指定相應 FAT JAR,比如針對 Kafka,需要使用 JAR 包 [2],JAR 包可以通過如下方式指定:
方式三:catalog
hive_catalog = HiveCatalog("hive_catalog") t_env.register_catalog("hive_catalog", hive_catalog) t_env.use_catalog("hive_catalog")# 假設hive catalog中已經定義了一個名字為source_table的表 tab = t_env.from_path('source_table')這種方式和 DDL 的方式類似,只不過表的定義事先已經注冊到了 catalog 中了,不需要在作業中重新再定義一遍了。
■ 4)定義作業的計算邏輯
方式一:通過 Table API
得到 source 表之后,接下來就可以使用 Table API 中提供的各種操作,定義作業的計算邏輯,對表進行各種變換了,比如:
@udf(result_type=DataTypes.STRING()) def sub_string(s: str, begin: int, end: int):return s[begin:end]transformed_tab = tab.select(sub_string(col('a'), 2, 4))方式二:通過 SQL 語句
除了可以使用 Table API 中提供的各種操作之外,也可以直接通過 SQL 語句來對表進行變換,比如上述邏輯,也可以通過 SQL 語句來實現:
t_env.create_temporary_function("sub_string", sub_string) transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)說明:
- TableEnvironment 中提供了多種方式用于執行 SQL 語句,其用途略有不同:
■ 5)查看執行計劃
用戶在開發或者調試作業的過程中,可能需要查看作業的執行計劃,可以通過如下方式。
方式一:Table.explain
比如,當我們需要知道 transformed_tab 當前的執行計劃時,可以執行:print(transformed_tab.explain()),可以得到如下輸出:
== Abstract Syntax Tree == LogicalProject(EXPR$0=[sub_string($0, 2, 4)]) +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])== Optimized Logical Plan == PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])== Physical Execution Plan == Stage 1 : Data Sourcecontent : Source: PythonInputFormatTableSource(a)Stage 2 : Operatorcontent : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a])ship_strategy : FORWARDStage 3 : Operatorcontent : StreamExecPythonCalcship_strategy : FORWARD方式二:TableEnvironment.explain_sql
方式一適用于查看某一個 table 的執行計劃,有時候并沒有一個現成的 table 對象可用,比如:
print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))其執行計劃如下所示:
== Abstract Syntax Tree == LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0]) +- LogicalProject(EXPR$0=[sub_string($0, 2, 4)])+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])== Optimized Logical Plan == Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0]) +- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])+- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])== Physical Execution Plan == Stage 1 : Data Sourcecontent : Source: PythonInputFormatTableSource(a)Stage 2 : Operatorcontent : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a])ship_strategy : FORWARDStage 3 : Operatorcontent : StreamExecPythonCalcship_strategy : FORWARDStage 4 : Data Sinkcontent : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])ship_strategy : FORWARD■ 6)寫出結果數據
方式一:通過 DDL
和創建數據源表類似,也可以通過 DDL 的方式來創建結果表。
t_env.execute_sql("""CREATE TABLE my_sink (`sum` VARCHAR) WITH ('connector' = 'print')""")table_result = transformed_tab.execute_insert('my_sink')說明:
- 當使用 print 作為 sink 時,作業結果會打印到標準輸出中。如果不需要查看輸出,也可以使用 blackhole 作為 sink。
方式二:collect
也可以通過 collect 方法,將 table 的結果收集到客戶端,并逐條查看。
table_result = transformed_tab.execute() with table_result.collect() as results:for result in results:print(result)說明:
- 該方式可以方便地將 table 的結果收集到客戶端并查看
- 由于數據最終會收集到客戶端,所以最好限制一下數據條數,比如:
transformed_tab.limit(10).execute(),限制只收集 10 條數據到客戶端
方式三:to_pandas
也可以通過 to_pandas 方法,將 table 的結果轉換成 pandas.DataFrame 并查看。
result = transformed_tab.to_pandas() print(result)可以看到如下輸出:
_c0 0 32 1 e6 2 8b 3 be 4 4f 5 b4 6 a6 7 49 8 35 9 6b說明:
- 該方式與 collect 類似,也會將 table 的結果收集到客戶端,所以最好限制一下結果數據的條數
■ 7)總結
完整的作業示例如下:
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment from pyflink.table.expressions import col from pyflink.table.udf import udfdef table_api_demo():env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()t_env = StreamTableEnvironment.create(environment_settings=env_settings)t_env.get_config().get_configuration().set_string('parallelism.default', '4')t_env.execute_sql("""CREATE TABLE my_source (a VARCHAR,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")tab = t_env.from_path('my_source')@udf(result_type=DataTypes.STRING())def sub_string(s: str, begin: int, end: int):return s[begin:end]transformed_tab = tab.select(sub_string(col('a'), 2, 4))t_env.execute_sql("""CREATE TABLE my_sink (`sum` VARCHAR) WITH ('connector' = 'print')""")table_result = transformed_tab.execute_insert('my_sink')# 1)等待作業執行結束,用于local執行,否則可能作業尚未執行結束,該腳本已退出,會導致minicluster過早退出# 2)當作業通過detach模式往remote集群提交時,比如YARN/Standalone/K8s等,需要移除該方法table_result.wait()if __name__ == '__main__':table_api_demo()執行結果如下:
4> +I(a1) 3> +I(b0) 2> +I(b1) 1> +I(37) 3> +I(74) 4> +I(3d) 1> +I(07) 2> +I(f4) 1> +I(7f) 2> +I(da)PyFlink DataStream API 作業
■ 1)創建 StreamExecutionEnvironment 對象
對于 DataStream API 作業來說,用戶首先需要定義一個 StreamExecutionEnvironment 對象。
env = StreamExecutionEnvironment.get_execution_environment()■ 2)配置作業的執行參數
可以通過以下方式,配置作業的執行參數。以下示例將作業的默認并發度設置為4。
env.set_parallelism(4)■ 3)創建數據源
接下來,需要為作業創建一個數據源。PyFlink 中提供了多種方式來定義數據源。
方式一:from_collection
PyFlink 支持用戶從一個列表創建源表。以下示例定義了包含了 3 行數據的表:[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],該表有 2 列,列名分別為 a 和 b,類型分別為 VARCHAR 和 BIGINT。
ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))說明:
- 這種方式通常用于測試階段,可以方便地創建一個數據源
- from_collection 方法可以接收兩個參數,其中第一個參數用于指定數據列表;第二個參數用于指定數據的類型
方式二:使用 PyFlink DataStream API 中定義的 connector
此外,也可以使用 PyFlink DataStream API 中已經支持的 connector,需要注意的是,1.12 中僅提供了 Kafka connector 的支持。
deserialization_schema = JsonRowDeserializationSchema.builder() \.type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_consumer = FlinkKafkaConsumer(topics='test_source_topic',deserialization_schema=deserialization_schema,properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds = env.add_source(kafka_consumer)說明:
- Kafka connector 當前沒有包含在 Flink 官方提供的發行包中,如果需要在PyFlink 作業中使用,用戶需要顯式地指定相應 FAT JAR [2],JAR 包可以通過如下方式指定:
- 即使是 PyFlink DataStream API 作業,也推薦使用 Table & SQL connector 中打包出來的 FAT JAR,可以避免遞歸依賴的問題。
方式三:使用 PyFlink Table API 中定義的 connector
以下示例定義了如何將 Table & SQL 中支持的 connector 用于 PyFlink DataStream API 作業。
t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql("""CREATE TABLE my_source (a INT,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")ds = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(), Types.STRING()]))說明:
- 由于當前 PyFlink DataStream API 中 built-in 支持的 connector 種類還比較少,推薦通過這種方式來創建 PyFlink DataStream API 作業中使用的數據源表,這樣的話,所有 PyFlink Table API 中可以使用的 connector,都可以在 PyFlink DataStream API 作業中使用。
- 需要注意的是,TableEnvironment 需要通過以下方式創建 StreamTableEnvironment.create(stream_execution_environment=env),以使得 PyFlink DataStream API 與 PyFlink Table API 共享同一個 StreamExecutionEnvironment 對象。
■ 4)定義計算邏輯
生成數據源對應的 DataStream 對象之后,接下來就可以使用 PyFlink DataStream API 中定義的各種操作,定義計算邏輯,對 DataStream 對象進行變換了,比如:
def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))■ 5)寫出結果數據
方式一:print
可以調用 DataStream 對象上的 print 方法,將 DataStream 的結果打印到標準輸出中,比如:
ds.print()方式二:使用 PyFlink DataStream API 中定義的 connector
可以直接使用 PyFlink DataStream API 中已經支持的 connector,需要注意的是,1.12 中提供了對于 FileSystem、JDBC、Kafka connector 的支持,以 Kafka 為例:
serialization_schema = JsonRowSerializationSchema.builder() \.with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()kafka_producer = FlinkKafkaProducer(topic='test_sink_topic',serialization_schema=serialization_schema,producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})ds.add_sink(kafka_producer)說明:
- JDBC、Kafka connector 當前沒有包含在 Flink 官方提供的發行包中,如果需要在 PyFlink 作業中使用,用戶需要顯式地指定相應 FAT JAR,比如 Kafka connector 可以使用 JAR 包 [2],JAR 包可以通過如下方式指定:
- 推薦使用 Table & SQL connector 中打包出來的 FAT JAR,可以避免遞歸依賴的問題。
方式三:使用 PyFlink Table API 中定義的 connector
以下示例展示了如何將 Table & SQL 中支持的 connector,用作 PyFlink DataStream API 作業的 sink。
# 寫法一:ds類型為Types.ROW def split(s):splits = s[1].split("|")for sp in splits:yield Row(s[0], sp)ds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: Row(i[0] + j[0], i[1]))# 寫法二:ds類型為Types.TUPLE def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))# 將ds寫出到sink t_env.execute_sql("""CREATE TABLE my_sink (a INT,b VARCHAR) WITH ('connector' = 'print')""")table = t_env.from_data_stream(ds) table_result = table.execute_insert("my_sink")說明:
- 需要注意的是,t_env.from_data_stream(ds) 中的 ds 對象的 result type 類型必須是復合類型 Types.ROW 或者 Types.TUPLE,這也就是為什么需要顯式聲明作業計算邏輯中 flat_map 操作的 result 類型
- 作業的提交,需要通過 PyFlink Table API 中提供的作業提交方式進行提交
- 由于當前 PyFlink DataStream API 中支持的 connector 種類還比較少,推薦通過這種方式來定義 PyFlink DataStream API 作業中使用的數據源表,這樣的話,所有 PyFlink Table API 中可以使用的 connector,都可以作為 PyFlink DataStream API 作業的 sink。
■ 7)總結
完整的作業示例如下:
方式一(適合調試):
from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironmentdef data_stream_api_demo():env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(4)ds = env.from_collection(collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],type_info=Types.ROW([Types.INT(), Types.STRING()]))def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))ds.print()env.execute()if __name__ == '__main__':data_stream_api_demo()執行結果如下:
3> (2, 'aaa') 3> (2, 'bb') 3> (6, 'aaa') 3> (4, 'a') 3> (5, 'bb') 3> (7, 'a')方式二(適合線上作業):
from pyflink.common.typeinfo import Types from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironmentdef data_stream_api_demo():env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)env.set_parallelism(4)t_env.execute_sql("""CREATE TABLE my_source (a INT,b VARCHAR) WITH ('connector' = 'datagen','number-of-rows' = '10')""")ds = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.INT(), Types.STRING()]))def split(s):splits = s[1].split("|")for sp in splits:yield s[0], spds = ds.map(lambda i: (i[0] + 1, i[1])) \.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \.key_by(lambda i: i[1]) \.reduce(lambda i, j: (i[0] + j[0], i[1]))t_env.execute_sql("""CREATE TABLE my_sink (a INT,b VARCHAR) WITH ('connector' = 'print')""")table = t_env.from_data_stream(ds)table_result = table.execute_insert("my_sink")# 1)等待作業執行結束,用于local執行,否則可能作業尚未執行結束,該腳本已退出,會導致minicluster過早退出# 2)當作業通過detach模式往remote集群提交時,比如YARN/Standalone/K8s等,需要移除該方法table_result.wait()if __name__ == '__main__':data_stream_api_demo()作業提交
Flink 提供了多種作業部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作業部署方式,請參考 Flink 官方文檔 [3],了解更多詳細信息。
local
說明:使用該方式執行作業時,會啟動一個 minicluster,作業會提交到minicluster 中執行,該方式適合作業開發階段。
示例:python3 table_api_demo.py
standalone
說明:使用該方式執行作業時,作業會提交到一個遠端的 standalone 集群。
示例:
./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py
YARN Per-Job
說明:使用該方式執行作業時,作業會提交到一個遠端的 YARN 集群。
示例:
./bin/flink run --target yarn-per-job --python table_api_demo.py
K8s application mode
說明:使用該方式執行作業時,作業會提交到 K8s 集群,以 application mode 的方式執行。
示例:
./bin/flink run-application \
--target kubernetes-application \ --parallelism 8 \ -Dkubernetes.cluster-id**=**<ClusterId> \ -Dtaskmanager.memory.process.size**=**4096m \ -Dkubernetes.taskmanager.cpu**=**2 \ -Dtaskmanager.numberOfTaskSlots**=**4 \ -Dkubernetes.container.image**=**<PyFlinkImageName> \--pyModule table_api_demo \
--pyFiles file:///path/to/table_api_demo.py參數說明
除了上面提到的參數之外,通過 flink run 提交的時候,還有其它一些和 PyFlink 作業相關的參數。
| -py / --python | 指定作業的入口文件 | -py file:///path/to/table_api_demo.py |
| -pym / --pyModule | 指定作業的 entry module,功能和--python類似,可用于當作業的 Python 文件為 zip 包,無法通過--python 指定時,相比--python 來說,更通用 | -pym table_api_demo -pyfs file:///path/to/table_api_demo.py |
| -pyfs / --pyFiles | 指定一個到多個 Python 文件(.py/.zip等,逗號分割),這些 Python 文件在作業執行的時候,會放到 Python 進程的 PYTHONPATH 中,可以在 Python 自定義函數中訪問到 | -pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip |
| -pyarch / --pyArchives | 指定一個到多個存檔文件(逗號分割),這些存檔文件,在作業執行的時候,會被解壓之后,放到 Python 進程的 workspace 目錄,可以通過相對路徑的方式進行訪問 | -pyarch file:///path/to/venv.zip |
| -pyexec / --pyExecutable | 指定作業執行的時候,Python 進程的路徑 | -pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3 |
| -pyreq / --pyRequirements | 指定 requirements 文件,requirements 文件中定義了作業的依賴 | -pyreq requirements.txt |
問題排查
當我們剛剛上手 PyFlink 作業開發的時候,難免會遇到各種各樣的問題,學會如何排查問題是非常重要的。接下來,我們介紹一些常見的問題排查手段。
client 端異常輸出
PyFlink 作業也遵循 Flink 作業的提交方式,作業首先會在 client 端編譯成 JobGraph,然后提交到 Flink 集群執行。如果作業編譯有問題,會導致在 client 端提交作業的時候就拋出異常,此時可以在 client 端看到類似這樣的輸出:
Traceback (most recent call last):File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module>data_stream_api_demo()File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demotable_result = table.execute_insert("my_")File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insertreturn TableResult(self._j_table.executeInsert(table_path, overwrite))File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__return_value = get_return_value(File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in decoraise java_exception pyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not existsat org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:159)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)at scala.collection.Iterator$class.foreach(Iterator.scala:891)at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)at scala.collection.AbstractIterable.foreach(Iterable.scala:54)at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)at scala.collection.AbstractTraversable.map(Traversable.scala:104)at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)at java.lang.Thread.run(Thread.java:748)Process finished with exit code 1比如上述報錯說明作業中使用的名字為"my_"的表不存在。
TaskManager 日志文件
有些錯誤直到作業運行的過程中才會發生,比如臟數據或者 Python 自定義函數的實現問題等,針對這種錯誤,通常需要查看 TaskManager 的日志文件,比如以下錯誤反映用戶在 Python 自定義函數中訪問的 opencv 庫不存在。
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _executeresponse = task()File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>lambda: self.create_worker().do_instruction(request), request)File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instructionreturn getattr(self, request_type)(File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundlebundle_processor.process_bundle(instruction_id))File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundleinput_op_by_transform_id[element.transform_id].process_encoded(File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encodedself.output(decoded_value)File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.outputFile "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.outputFile "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receiveFile "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.processFile "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.processFile "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_streamFile "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in splitimport cv2 ModuleNotFoundError: No module named 'cv2'at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)... 1 more說明:
- local 模式下,TaskManager 的 log 位于 PyFlink 的安裝目錄下:site-packages/pyflink/log/,也可以通過如下命令找到:
\>>> import pyflink
\>>> print(pyflink.__path__)
['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],則log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目錄下
自定義日志
有時候,異常日志的內容并不足以幫助我們定位問題,此時可以考慮在 Python 自定義函數中打印一些日志信息。PyFlink 支持用戶在 Python 自定義函數中通過 logging 的方式輸出 log,比如:
def split(s):import logginglogging.info("s: " + str(s))splits = s[1].split("|")for sp in splits:yield s[0], sp通過上述方式,split 函數的輸入參數,會打印到 TaskManager 的日志文件中。
遠程調試
PyFlink 作業,在運行過程中,會啟動一個獨立的 Python 進程執行 Python 自定義函數,所以如果需要調試 Python 自定義函數,需要通過遠程調試的方式進行,可以參見[4],了解如何在 Pycharm 中進行 Python 遠程調試。
1)在 Python 環境中安裝 pydevd-pycharm:
pip install pydevd-pycharm~=203.7717.65
2)在 Python 自定義函數中設置遠程調試參數:
def split(s):import pydevd_pycharmpydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)splits = s[1].split("|")for sp in splits:yield s[0], sp3)按照 Pycharm 中遠程調試的步驟,進行操作即可,可以參見[4],也可以參考博客[5]中“代碼調試”部分的介紹。
說明:Python 遠程調試功能只在 Pycharm 的 professional 版才支持。
社區用戶郵件列表
如果通過以上步驟之后,問題還未解決,也可以訂閱 Flink 用戶郵件列表 [6],將問題發送到 Flink 用戶郵件列表。需要注意的是,將問題發送到郵件列表時,盡量將問題描述清楚,最好有可復現的代碼及數據,可以參考一下這個郵件[7]。
總結
在這篇文章中,我們主要介紹了 PyFlink API 作業的環境準備、作業開發、作業提交、問題排查等方面的信息,希望可以幫助用戶使用 Python 語言快速構建一個 Flink 作業,希望對大家有所幫助。接下來,我們會繼續推出 PyFlink 系列文章,幫助 PyFlink 用戶深入了解 PyFlink 中各種功能、應用場景、最佳實踐等。
為此我們推出一個調查問卷,希望大家積極參與這個問卷,幫助我們更好的去整理 PyFlink 相關學習資料。填完問卷后即可參與抽獎,Flink 定制款 Polo 衫送送送!4月30日中午12:00準時開獎哦 ~
引用鏈接
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/
[2] https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.12.0/flink-sql-connector-kafka_2.11-1.12.0.jar
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#submitting-pyflink-jobs
[4] https://www.jetbrains.com/help/pycharm/remote-debugging-with-product.html#remote-debug-config
[5] https://mp.weixin.qq.com/s?__biz=MzIzMDMwNTg3MA==&mid=2247485386&idx=1&sn=da24e5200d72e0627717494c22d0372e&chksm=e8b43eebdfc3b7fdbd10b49e6749cb761b7aa5f8ddc90b34eb3170119a8bbb3ddd7327acb712&scene=178&cur_album_id=1386152464113811456#rd
[6] https://flink.apache.org/community.html#mailing-lists
[7] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-called-already-closed-and-NullPointerException-td42997.html
活動推薦:
僅需99元即可體驗阿里云基于 Apache Flink 構建的企業級產品-實時計算 Flink 版!點擊下方鏈接了解活動詳情:https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506
原文鏈接:https://developer.aliyun.com/article/783823?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的如何从 0 到 1 开发 PyFlink API 作业的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年中国云原生用户调查问卷
- 下一篇: 因云而生 | 阿里云发布云服务器操作系统