pyspark 并行调用udf函数
生活随笔
收集整理的這篇文章主要介紹了
pyspark 并行调用udf函数
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
背景:在 pyspark中udf寫法及其使用中我們使用pyspark定義好的udf逐條處理數據(dataframe)。這篇文章提供一種“并行”調用udf的方法。
from pyspark.sql import Row from pyspark.sql.types import StringType, StructField, StructType# udf定義 def proc_func(row):row_dict = row.asDict(True) # 將rdd轉化成字典格式col_1 = row_dict['col_1']new_col = 'proc_{}'.format(col_1)row_dict['new_col'] = new_colreturn Row(**row_dict)# 定義輸出字段 output_struct = StructType([StructField('col_1', StringType()),StructField('new_col', StringType()) ]) # 調用udf your_df = your_df.rdd.map(proc_func) # 需要將DataFrame轉為rdd# 將處理結果再解析為Dataframe your_df = spark.createDataFrame(your_df.map(lambda row: Row(row.col_1,row.new_col)), output_struct)其他補充,以下是幾個常見的類型
from pyspark.sql.types import IntegerType, StringType, ArrayType# int型 IntegerType()# list型 ArrayType(IntegerType()) # int list ArrayType(StringType()) # string list使用這種方式,會大大減少數據計算時間。
總結
以上是生活随笔為你收集整理的pyspark 并行调用udf函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【C语言】qsort函数的使用和模拟实现
- 下一篇: 华硕Z390P主板-win10-ubun