PySpark summary notes
20220402
Spark error: Total size of serialized results of 12189 tasks is bigger than spark.driver.maxResultSize
https://blog.csdn.net/qq_27600723/article/details/107023574
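This error means too many task results are being pulled back to the driver (for example a large collect()). A minimal sketch of the usual mitigation, with an illustrative value that is not from the original post: raise the cap, or better, avoid collecting raw rows to the driver.
from pyspark.sql import SparkSession

# Illustrative value; "0" would remove the cap entirely (risky for driver memory).
spark = (SparkSession.builder
         .appName("maxResultSize-demo")
         .config("spark.driver.maxResultSize", "4g")
         .getOrCreate())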
Reading and writing Iceberg with PySpark
# code:utf-8
import findspark
findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
# Specify the path to pyspark here; on a server, prefer the pyspark path under the Spark installation.
import os
java8_location = r'D:\Java\jdk1.8.0_301/'  # set this to your own path
os.environ['JAVA_HOME'] = java8_location
from pyspark.sql import SparkSession


def get_spark():
    # Read an Iceberg table from pyspark
    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
    spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")
    # For a different target address or server cluster, copy that cluster's two Hive
    # config files into the local client's pyspark conf folder.
    return spark


if __name__ == '__main__':
    spark = get_spark()
    pdf = spark.sql("select shangpgg from iceberg.test.end_spec limit 10")
    spark.sql("insert into iceberg.test.end_spec values ('aa','bb')")
    pdf.show()
    print()
1. Create a conf folder under the pyspark installation and put the two Hive config files from the Iceberg cluster into it:
hdfs-site.xml
hive-site.xml
2. Put iceberg-spark3-runtime-0.13.1.jar into pyspark's jars folder (an alternative is sketched below).
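As an alternative to copying the jar, it can also be supplied through the spark.jars config when building the session. This is a sketch; the jar path below is illustrative and should point at wherever you keep the runtime jar.
from pyspark.sql import SparkSession

# Illustrative path; point it at your local copy of the Iceberg runtime jar.
spark = (SparkSession.builder
         .config("spark.jars", r"D:\jars\iceberg-spark3-runtime-0.13.1.jar")
         .getOrCreate())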
org.apache.iceberg.exceptions.RuntimeIOException: Failed to open input stream for file: hdfs://ns1/warehouse/test.db/end_spec/metadata/00025-73e8d58b-c4f1-4c81-b0a8-f1a8a12090b1.metadata.json
Cause: the two Hive config files were not found. Fixed by pointing findspark.init at the pyspark installation that holds them:
findspark.init(r"D:\Python37\Lib\site-packages\pyspark")
od_all = spark.createDataFrame(od)
od_all.createOrReplaceTempView('od_all')
od_duplicate = spark.sql("select distinct user_id,goods_id,category_second_id from od_all;")
od_duplicate.createOrReplaceTempView('od_duplicate')
od_goods_group = spark.sql(" select user_id,count(goods_id) goods_nums_total from od_duplicate group by user_id ;")
Every table referenced in the SQL must first be registered with createOrReplaceTempView.
Running the SQL raised the error: extraneous input ';' expecting EOF near '<EOF>'
https://blog.csdn.net/xieganyu3460/article/details/83055935
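The parser trips over the trailing semicolon inside the string passed to spark.sql. A minimal sketch of the fix, reusing the query from above: drop the ';' at the end of the SQL string.
od_duplicate = spark.sql(
    "select distinct user_id, goods_id, category_second_id from od_all"  # no trailing ';'
)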
https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/types.html?highlight=type
PySpark data types
TypeError: field id: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.LongType'>
https://blog.csdn.net/weixin_40983094/article/details/115630358
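This merge error typically appears when one pandas column holds mixed Python types (for example both int and str), so Spark cannot infer a single field type. A minimal sketch of the workaround, assuming a SparkSession named spark and using made-up column data: unify the dtype (or pass an explicit schema) before calling createDataFrame.
import pandas as pd

pdf = pd.DataFrame({"id": [1, "2", 3]})   # mixed int/str in one column triggers the error
pdf["id"] = pdf["id"].astype(str)         # force one consistent type
sdf = spark.createDataFrame(pdf)          # schema inference now succeeds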
# code:utf-8
from pathlib import Path
import pandas as pd
from pyspark.ml.fpm import FPGrowth
import datetime
import platform
import os
import warnings
warnings.filterwarnings("ignore")
from utils_ import usetime,log_generate
from param_config import config

logger = log_generate(config.log["name"], config.log["date"])
sys = platform.system()
if sys == "Windows":
    PATH = os.path.abspath(str(Path("").absolute())) + "/"
else:
    PATH = "/home/guanlian_algo_confirm3/"
os.environ["JAVA_HOME"] = r"D:\Java\jdk1.8.0_301"
t1 = datetime.datetime.now()


@usetime
def calculate_fpgrowth(spark, data, total_nums):
    data = spark.createDataFrame(data)
    data.createOrReplaceTempView("all_data")
    part_data = spark.sql("select * from all_data ")
    all_record = part_data.select("goods_huizong")  # one or more columns can be selected
    all_record.show(5)

    def transform_to_list(col):
        per_row = col.split("|")  # receives the column value; every row is processed automatically
        return per_row

    all_record = all_record.rdd.map(
        lambda row: (row["goods_huizong"], transform_to_list(row["goods_huizong"])))
    all_record = spark.createDataFrame(all_record, ["goods_huizong", "goods_huizong_list"])
    all_record.show(5)
    all_record = all_record.select("goods_huizong_list")
    all_record = all_record.withColumnRenamed("goods_huizong_list", "items")
    logger.debug("total number of rows: {}".format(total_nums))
    fp = FPGrowth(minSupport=0.0001, minConfidence=0.8)
    fpm = fp.fit(all_record)  # fit the model
    fpm.freqItemsets.show(5)  # show the first five frequent itemsets
    fp_count = fpm.freqItemsets.count()
    if fp_count == 0:
        return pd.DataFrame()
    logger.debug("*" * 100)
    logger.debug("number of frequent itemsets: {}".format(fp_count))
    ass_rule = fpm.associationRules  # strong association rules
    ass_rule.show()
    rule_nums = ass_rule.count()
    if rule_nums == 0:
        return pd.DataFrame()
    logger.debug("number of rules: {}".format(rule_nums))
    ass_rule = ass_rule.select(["antecedent", "consequent", "confidence"])
    ass_rule.show(5)
    ass_rule_df = ass_rule.toPandas()
    ass_rule_df["antecedent_str"] = ass_rule_df["antecedent"].apply(lambda x: str(x))
    ass_rule_df.sort_values(["antecedent_str", "confidence"], ascending=[True, False], inplace=True)
    t2 = datetime.datetime.now()
    logger.debug("spent ts: {}".format(t2 - t1))
    return ass_rule_df

A simple FPGrowth example
20220314
Parameters set in code take precedence over parameters passed on the command line; the values set in code are the ones that actually take effect.
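A minimal sketch of what that means in practice (the config key and values are illustrative, not from the original post): for the same key, the value set on the builder wins over the spark-submit flag.
# spark-submit --conf spark.sql.shuffle.partitions=200 my_job.py
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.shuffle.partitions", "300")  # this 300 is what the job actually uses
         .getOrCreate())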
py4j.protocol.Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'iceberg': org.apache.iceberg.spark.SparkCatalog
Fix: download iceberg-spark-runtime-3.2_2.12-0.13.1.jar from the Iceberg site
and put it under Spark's jars directory. https://iceberg.apache.org/docs/latest/getting-started/
# code:utf-8
import findspark
import pandas as pd
findspark.init()
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
# from out_udf import outer_udf
# /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
# --master local --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")

# UDF - anonymous function
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')

# UDF - named function
def xtrim2(record):
    return re.sub('[ \n\r\t]', '', record)

# Read an Iceberg table from pyspark
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")spark.udf.register('xtrim2', xtrim2, 'string')# spark.udf.register('outer_udf', outer_udf)if __name__ == '__main__':df.show()spark.sql("select * from t1").show()spark.sql("select xtrim2('測試 數據 你好') ").show()spark.sql("use iceberg").show()spark.sql("show databases").show()pyspark讀取iceberg
from datetime import datetime, date
import re
from pyspark.sql import SparkSession
from out_udf import outer_udf
# /home/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \
# --master local --py-files /root/bin/python_job/pyspark/out_udf.py hello_spark.py
# from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df.createOrReplaceTempView("t1")

# UDF - anonymous function
spark.udf.register('xtrim', lambda x: re.sub('[ \n\r\t]', '', x), 'string')

# UDF - named function
def xtrim2(record):
    return re.sub('[ \n\r\t]', '', record)

# Read an Iceberg table from pyspark
spark.conf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.iceberg.type", "hive")
spark.conf.set("spark.sql.catalog.iceberg.uri", "thrift://192.168.1.54:9083")spark.udf.register('xtrim2', xtrim2, 'string')
spark.udf.register('outer_udf', outer_udf)if __name__ == '__main__':df.show()spark.sql("select * from t1").show()spark.sql("select xtrim2('測試 數據 你好') ").show()spark.sql("select outer_udf('測試數據你好') ").show()spark.sql("use iceberg").show()spark.sql("show databases").show()pyspark對iceberg(hive)進行操作
20220311
AttributeError: 'NoneType' object has no attribute 'sc' and how to fix it:
move the construction of the SparkSession outside the loop (or perhaps create a temporary sc object).
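A minimal sketch of that fix, assuming the all_data temp view from the earlier snippets is registered and using illustrative loop values: build the session once, reuse it for every iteration, and stop it only when everything is done.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()   # build once, outside the loop
for cus_type in [1, 2, 3]:                   # illustrative loop values
    part = spark.sql("select * from all_data where user_type = " + str(cus_type))
    part.show(5)
spark.stop()                                  # stop only after all iterations finish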
Spark is essentially the same data-processing code expressed in another language, just a different way of writing it.
Parameter tuning
Reducing the number of executors while increasing the other resource values makes errors less likely.
Spark job fails with java.lang.StackOverflowError
https://blog.csdn.net/u010936936/article/details/88363449
Spark: java.io.IOException: No space left on device
https://blog.csdn.net/dupihua/article/details/51133551
ass_rule = ass_rule.filter('antecedent_len == 1')
ass_rule = ass_rule.filter('consequent_len == 1')
DataFrame filtering: https://blog.csdn.net/qq_40006058/article/details/88931884
Various DataFrame operations
20220310
data = spark.createDataFrame(data)  # convert an ordinary (pandas) dataframe into a Spark DataFrame
data.createOrReplaceTempView("all_data")  # register it as a SQL table so it can be queried
part_data = spark.sql("select * from all_data where user_type= " + str(cus_type))  # SQL operation
https://blog.csdn.net/zhurui_idea/article/details/73090951
ass_rule = ass_rule.rdd.map(
    lambda row: (row["antecedent"], row['consequent'], calculate_len(row['antecedent']))
)  # after going through .rdd.map it becomes a PipelinedRDD
ass_rule = spark.createDataFrame(ass_rule)  # calling createDataFrame again turns it back into a DataFrame
Converting between DataFrame and RDD
Processing each row automatically while keeping the other original fields
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the sc
Strangely, when splitting the string into a list, there must be at least one other field before it in the output tuple, otherwise the error above is raised.
part_data = spark.sql("select * from all_data where user_type= " + str(cus_type))
part_data.show()
all_record = part_data.select("user_type", 'goods_huizong')  # multiple fields can be selected
all_record = all_record.rdd.map(lambda row: (row['user_type'], transform_to_list(row['goods_huizong'])))
Multiple fields can also be selected afterwards.
File "/usr/local/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 875, in subimport__import__(name)
ModuleNotFoundError: No module named 'utils_'
Do the functions used in the pyspark job have to live in the current module? When they are imported from another module, the executors apparently cannot find them?
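One workaround, sketched here under the assumption that utils_.py sits next to the driver script, is to ship the module to the executors explicitly instead of relying on a plain import:
# either on the command line:
#   spark-submit --py-files utils_.py your_job.py
# or from code, before the functions are used on executors:
spark.sparkContext.addPyFile("utils_.py")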
20211231
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Cause here: the cluster resources were occupied by someone else.
20211230
Spark 2.0.x dump a csv file from a dataframe containing one array of type string
https://stackoverflow.com/questions/40426106/spark-2-0-x-dump-a-csv-file-from-a-dataframe-containing-one-array-of-type-string
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def array_to_string(my_list):
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'

array_to_string_udf = udf(array_to_string, StringType())
df = df.withColumn('column_as_str', array_to_string_udf(df["column_as_array"]))
df.drop("column_as_array").write.csv(...)
The approach above has a problem: the values written into the column come out as a generator-expression string. Casting the array column to string instead does work; the snippet below is the PySpark form of the Scala answer from the link above, applied to the ass_rule dataframe:
dump_csv = ass_rule.withColumn("ArrayOfString", ass_rule["ArrayOfString"].cast("string"))
dump_csv.write.csv(path="/home/me/saveDF")
This approach works.
https://www.jianshu.com/p/3735b5e2c540
https://www.jianshu.com/p/80964332b3c4
Writing an RDD or Spark DataFrame to CSV; an ordinary pandas DataFrame cannot write to HDFS.
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
import datetime
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
from tqdm import tqdm
import platform
import os
os.environ['JAVA_HOME']=r'/usr/local/jdk1.8.0_212'
t1 = datetime.datetime.now()
appname = "FPgrowth"
#master = "local[6]"spark = SparkSession.Builder().appName(appname)\.config('spark.num-executors','50')\.config('spark.executor.memory','4g')\.config('spark.executor.cores','3')\.config('spark.driver.memory','1g')\.config('spark.default.parallelism','1000')\.config('spark.storage.memoryFraction','0.5')\.config('spark.shuffle.memoryFraction','0.3')\.config("spark.speculation",'True')\.config("spark.speculation.interval",'100')\.config("spark.speculation.quantile","0.75")\.config("spark.speculation.multiplier",'1.5')\.config("spark.scheduler.mode",'FAIR')\.getOrCreate()
df = spark.read.format("csv"). \option("header", "true") \.load("/data/tb_order_user_sec_type_group.csv")df.createOrReplaceTempView('all_data')
sec_type=spark.sql("select sec_type from all_data ")
https://hub.mybinder.turing.ac.uk/user/apache-spark-sjqwupmp/notebooks/python/docs/source/getting_started/quickstart_ps.ipynb
Quickstart: Pandas API on Spark (getting started with the pandas API on PySpark)
part_data=spark.sql("select * from all_data where sec_type= "+ cus_type)
part_data.count()  # count the number of elements (rows) in the RDD
lines.first()  # the first element of the RDD, i.e. the first line of README.md
http://spark.apache.org/docs/latest/api/python/getting_started/index.html
Official PySpark documentation; refer to it for both Spark SQL and Spark DataFrame usage.
Quickly converting to pandas for further manipulation
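A minimal sketch of that conversion, reusing the all_data table name from earlier snippets; the limit is there because toPandas pulls everything back to the driver:
pdf = spark.sql("select * from all_data limit 1000").toPandas()  # now a regular pandas DataFrame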
20210831
Windows 10 Spark warning: WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
https://blog.csdn.net/weixin_43748432/article/details/107378033
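The warning itself is harmless: Spark simply walks up to 4041, 4042 and so on. A sketch, with an illustrative port number, for pinning the UI to a known free port instead:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.ui.port", "4050")  # illustrative free port
         .getOrCreate())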
java.lang.OutOfMemoryError: GC overhead limit exceeded
https://blog.csdn.net/gaokao2011/article/details/51707163
Increase the parameters below.
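A sketch of the kind of adjustment meant here, with illustrative values: give the executors and driver more heap so the JVM spends less time in garbage collection relative to useful work.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.memory", "4g")  # illustrative values; size them to your cluster
         .config("spark.driver.memory", "2g")
         .getOrCreate())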
Spark operators: basic RDD transformations (5): mapPartitions
http://lxw1234.com/archives/2015/07/348.htm
mapPartitions maps a whole partition at a time instead of mapping each element individually,
which improves efficiency.
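A minimal self-contained sketch of the difference, with made-up data: mapPartitions calls the function once per partition with an iterator over that partition's elements, so any per-partition setup cost is paid once rather than once per element as with map.
rdd = spark.sparkContext.parallelize(range(10), 2)  # 2 partitions, illustrative data

def per_partition(rows):
    rows = list(rows)                  # all elements of one partition
    return [(len(rows), sum(rows))]    # illustrative per-partition work

print(rdd.mapPartitions(per_partition).collect())   # one tuple per partition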
spark = (SparkSession.Builder().appName(appname).master(master)
    .config('spark.some.config.option0', 'some_value')
    .config('spark.executor.memory', '2g')              # executor memory
    .config('spark.executor.cores', '2')                # CPU cores available to each executor
    .config("spark.executor.instances", '10')           # total number of executors
    .config('spark.driver.memory', '1g')                # driver memory; should it be smaller than the executors'?
    .config('spark.default.parallelism', '1000')        # number of tasks
    .config('spark.sql.shuffle.partitions', '300')      # number of shuffle partitions
    .config("spark.driver.extraJavaOptions", "-Xss2048M")  # JVM-related setting
    .config("spark.speculation", 'True')                 # the speculation settings help avoid
    .config("spark.speculation.interval", '100')         # getting stuck in a single stage
    .config("spark.speculation.quantile", "0.1")
    .config("spark.speculation.multiplier", '1')
    .config("spark.scheduler.mode", 'FAIR')               # scheduling mode
    .getOrCreate())
Parameter settings
spark = (SparkSession.Builder().appName(appname).master(master)
    .config('spark.some.config.option0', 'some_value')
    .config('spark.executor.memory', '2g')
    .config('spark.executor.cores', '2')
    .config("spark.executor.instances", '10')
    .config('spark.driver.memory', '3g')
    .config('spark.default.parallelism', '1000')         # this parameter matters a lot
    .config('spark.sql.shuffle.partitions', '300')       # this parameter matters a lot
    .config("spark.driver.extraJavaOptions", "-Xss3072M")
    .config("spark.speculation", 'True')                  # this parameter matters a lot
    .config("spark.speculation.interval", '100')
    .config("spark.speculation.quantile", "0.1")
    .config("spark.speculation.multiplier", '1')
    .config("spark.scheduler.mode", 'FAIR')
    .getOrCreate())
With 32 GB of memory in total, this configuration produces results quickly.
https://blog.csdn.net/lotusws/article/details/52423254
spark master local parameters
Then open http://192.168.1.116:4040 in a browser
Spark UI
Spark dashboard address
The configured parameters can be viewed there
Active: stages currently running
Pending: stages that have not run yet
Completed: finished stages
12/69 (13): 69 stages in total, 12 already finished, 13 currently running
In the dashboard, mainly watch the Stages and Executors pages
The timeline reads from left to right
Look under Jobs for the concrete reason a job failed
https://blog.csdn.net/weixin_42340179/article/details/82415085
https://blog.csdn.net/whgyxy/article/details/88779965
Stuck at a certain stage
Analysis of the case where Spark runs normally but one stage gets stuck and makes no progress
https://blog.csdn.net/yf_bit/article/details/93610829
Important
https://www.cnblogs.com/candlia/p/11920289.html
https://www.cnblogs.com/xiao02fang/p/13197877.html
Factors that affect Spark performance
https://www.csdn.net/tags/OtDaUgysMTk3Mi1ibG9n.html
https://www.cnblogs.com/yangsy0915/p/6060532.html
Important
PySpark configuration parameters
https://www.javaroad.cn/questions/15705
Looping over rows
http://www.sofasofa.io/forum_main_post.php?postid=1005461
Getting the total number of rows and columns
https://blog.csdn.net/qq_40006058/article/details/88822268
PySpark study notes: 68 commonly used functions, with explanations and Python code
https://blog.csdn.net/qq_29153321/article/details/88648948
RDD operations
https://www.jianshu.com/p/55efdcabd163
Some simple, commonly used PySpark functions and methods
http://sofasofa.io/forum_main_post.php?postid=1002482
Renaming DataFrame columns
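A minimal sketch of the usual renaming options (the column names are illustrative):
df2 = df.withColumnRenamed("old_name", "new_name")   # rename one column
df3 = df.selectExpr("old_name as new_name")          # rename while selecting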