Spark On Yarn 运行项目
在spark中,支持4中運行模式:
- Local:往往使用本地開發的時候使用。
- StandAlone:是spark自帶的,如果一個集群是StandAlone模式的話,那么就需要在多臺機器上同時部署Spark環境。缺點是改一個機器的配置,其余所有機器的配置都需要同步才生效。
- YARN:推薦使用YARN,統一使用YARN進行整個集群作業(MR、Spark)的資源調度。
- Mesos
不管使用什么模式,spark應用程序的代碼是一樣的,只需要在提交的時候通過--master參數來指定運行模式即可。
Spark支持可插拔的集群管理模式。
對于Yarn而言,Spark Application僅僅是一個客戶端而已。
Spark On Yarn兩種模式
Driver的運行位置
- Client:Driver運行在Client端(提交Spark作業的機器),client會和請求到的container進行通信,來完成作業的調度和執行,Client是不能退出的。ApplicationMaster的職責就是到Yarn Resource Manager中申請資源。
- Cluster:Driver運行在ApplicationMaster中,Client只要提交完作業就可以關掉,因為作業已經在Yarn中運行了。ApplicationMaster的職責不僅要去Yarn Resource Manager中申請資源,還要處理作業的調度。
運行輸出日志位置
- client:日志信息在控制臺,便于測試
- Cluster:運行在終端,看不到的。因為日志在Driver上,只能通過yarn -logs applicationId來進行查看。
Spark On Yarn執行命令
命令:
spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ # 省略,則默認是Client模式 --executor-memory 1G \ --num-executors 1 \ --conf spark.sql.shuffle.partitions=100 \ # 設置partition數量,partition代表并行度,默認200 /home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \ 4首先啟動Yarn
在~/app/hadoop-2.6.0-cdh5.15.1/sbin中啟動start-all.sh,瀏覽器中訪問:
Client運行模式
執行:
spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ --conf spark.sql.shuffle.partitions=100 \ # 設置partition數量,partition代表并行度,默認200 /home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \ 4出現報錯信息:
Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.如果想運行在Yarn之上,那么就必須設置HADOOP_CONF_DIR 或者是YARN_CONF_DIR
兩種解決方式:
- 方式一:添加環境變量HADOOP_CONF_DIR=/home/iie4bu/app/hadoop-2.6.0-cdh5.15.1/etc
- 方式二:修改spark-env.sh,在這個文件中添加HADOOP_CONF_DIR
修改完之后,再次運行上面的命令,控制臺輸出結果:
Cluster運行模式
執行:
spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --executor-memory 1G \ --num-executors 1 \ --conf spark.sql.shuffle.partitions=100 \ # 設置partition數量,partition代表并行度,默認200 /home/iie4bu/app/spark-2.4.5-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.5.jar \ 4控制臺中沒有輸出結果:
但是可以看到applicationId
通過執行命令:yarn logs -applicationId application_1586230035025_0002,可以看到執行結果:
將之前的項目打包
之前的項目代碼SparkStatCleanJobYARN修改如下:
package cn.ac.iie.logimport org.apache.spark.sql.{SaveMode, SparkSession}/*** 使用Spark完成我們的數據清洗操作, 運行在Yarn之上*/ object SparkStatCleanJobYARN {def main(args: Array[String]): Unit = {if(args.length != 2){println("Usage: SparkStatCleanJobYARN <inputPath> <outputPath>")System.exit(1)}val Array(inputPath, outputPath) = argsval spark = SparkSession.builder().getOrCreate()val acessRDD = spark.sparkContext.textFile(inputPath)// acessRDD.take(10).foreach(println)// RDD => DFval accessDF = spark.createDataFrame(acessRDD.map(x => AccessConvertUtil.parseLog(x)), AccessConvertUtil.struct)// accessDF.printSchema()// accessDF.show(false)accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite).partitionBy("day").save(outputPath)spark.stop()}}對于TopNStatJobYARN代碼修改如下:
package cn.ac.iie.logimport org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.{DataFrame, SparkSession}import scala.collection.mutable.ListBuffer/*** TopN 統計spark作業, 運行在Yarn上*/ object TopNStatJobYARN {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().config("spark.sql.sources.partitionColumnTypeInference.enabled", "false").getOrCreate()if(args.length != 2){println("Usage: TopNStatJobYARN <inputPath> <day>")System.exit(1)}val Array(inputPath, day) = argsval accessDF = spark.read.format("parquet").load(inputPath)// accessDF.printSchema()// accessDF.show(false)//val day = "20190702"StatDao.deleteDay(day)// 最受歡迎的TopN netTypenetTypeAccessTopNStat(spark, accessDF, day)// 按照地市進行統計TopN課程cityTypeAccessTopNStat(spark, accessDF,day)// 按照流量進行統計netTypeTrafficTopNStat(spark, accessDF, day)spark.stop}/*** 按流量進行統計* @param spark* @param accessDF*/def netTypeTrafficTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val trafficsTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid").agg(sum("num").as("traffics")).orderBy(desc("traffics"))// .show(false)// 將統計結果寫入到Mysql中try {trafficsTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeTrafficsStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval traffics = info.getAs[Long]("traffics")list.append(DayNetTypeTrafficsStat(day, uid, traffics))})StatDao.insertDayNetTypeTrafficsAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}/*** 按照地市進行統計Top3課程** @param spark* @param accessDF*/def cityTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid", "city").agg(count("uid").as("times")).orderBy(desc("times"))cityAccessTopNDF.show(false)// window 函數在Spark SQL的使用val top3DF = cityAccessTopNDF.select(cityAccessTopNDF("day"), cityAccessTopNDF("uid"), cityAccessTopNDF("city"), cityAccessTopNDF("times"), row_number().over(Window.partitionBy("city").orderBy(cityAccessTopNDF("times").desc)).as("times_rank")).filter("times_rank <= 3")//.show(false)// 將統計結果寫入到Mysql中try {top3DF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayCityNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval city = info.getAs[String]("city")val times = info.getAs[Long]("times")val timesRank = info.getAs[Int]("times_rank")list.append(DayCityNetTypeAccessStat(day, uid, city, times, timesRank))})StatDao.insertDayNetTypeCityAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}}/*** 最受歡迎的TopN netType** @param spark* @param accessDF*/def netTypeAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {val wifiAccessTopNDF = accessDF.filter(accessDF.col("day") === day && accessDF.col("netType") === "wifi").groupBy("day", "uid").agg(count("uid").as("times")).orderBy(desc("times"))wifiAccessTopNDF.show(false)// accessDF.createOrReplaceTempView("access_logs") // val wifiAccessTopNDF = spark.sql("select day,uid,count(1) as times from access_logs where day='20190702' and netType='wifi' group by day,uid order by times desc") // wifiAccessTopNDF.show(false)// 將統計結果寫入到Mysql中try {wifiAccessTopNDF.foreachPartition(partitionOfRecords => {val list = new ListBuffer[DayNetTypeAccessStat]partitionOfRecords.foreach(info => {val day = info.getAs[String]("day")val uid = info.getAs[String]("uid").toLongval times = info.getAs[Long]("times")list.append(DayNetTypeAccessStat(day, uid, times))})StatDao.insertNetTypeAccessTopN(list)})} catch {case e: Exception => e.printStackTrace()}} }appName和master,可以在命令行端進行指定。
因為我們的后臺環境中,已經有相關的spark環境,沒有ipdatabase,mysql等我們環境的依賴庫,所以需要把spark依賴去掉。
對應的pom.xml文件內容如下:
而且需要添加相應的plugin
使用命令mvn assembly:assembly
將jar文件上傳到服務器中
將原始數據上傳到hdfs中
執行命令:
注意:--files的使用
查看我們導入的文件內容:
使用spark-shell:./spark-shell --master local[2] --jars /home/iie4bu/software/mysql-connector-java-5.1.35.jar
說明數據清洗成功。
接下來統計作業:
可以看到mysql中的數據已經成功添加進去了。
總結
以上是生活随笔為你收集整理的Spark On Yarn 运行项目的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: zeppelin安装使用
- 下一篇: ubuntu Mysql乱码解决