#scala
val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x,1))
b.collect#python
a=sc.parallelize(("dog","tiger","lion","cat","panther","eagle"))
b=a.map(lambda x:(x,1))
b.collect()
演示
加載本地文件
addFile(path, recursive = False)
接收本地文件
通過SparkFiles.get()方法來獲取文件的絕對路徑
addPyFile( path )
加載已存在的文件并調用其中的方法
現在本地創建一個文件:vi sci.py寫入下面兩個方法人,然后保存退出
#sci.py
def sqrt(num):return num * numdef circle_area(r):return 3.14 * sqrt(r)
在pyspark中通過addPyFile加載該文件
#加載預寫入方法的文件
sc.addPyFile("file:///root/sci.py")#導入文件中的方法from sci import circle_area
#創建rdd并使用文件中的方法
sc.parallelize([5,9,21]).map(lambda x : circle_area(x)).collect()
# Pandas DataFrame to Spark DataFrameimport numpy as np
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
pandas_df = pd.read_csv("./products.csv", header=None, usecols=[1,3,5])print(pandas_df)# convert to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)
spark_df.show()
df2 = spark_df.withColumnRenamed("1","id").withColumnRenamed("3","name").withColumnRenamed("5","remark")# convert back to Pandas DataFrame
df2.toPandas()
演示
使用PySpark通過圖形進行數據探索
將數據劃分為多個區間,并統計區間中的數據個數
# 獲取上面演示示例中的第一個df對象
rdd = df.select("LifeExp").rdd.map(lambda x: x[0])#把數據劃為10個區間,并獲得每個區間中的數據個數(countries, bins)= rdd.histogram(10)print(countries)print(bins)#導入圖形生成包import matplotlib.pyplot as plt
import numpy as np plt.hist(rdd.collect(),10)# by default the # of bins is 10
plt.title("Life Expectancy Histogram")
plt.xlabel("Life Expectancy")
plt.ylabel("# of Countries")