Scala/Spark: a typical pom.xml, reading CSV into an RDD, and a final join
This problem is really not difficult and would hardly be worth writing down,
except that join requires data in the form Array((),(),()) (an RDD of tuples, i.e. key-value pairs)
rather than Array(Array(),Array(),Array()) (an RDD of arrays), which makes it slightly tricky.
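To make the difference concrete, here is a minimal sketch (assuming a SparkContext named sc is already in scope): split(",") returns Array[String], so the RDD starts out as RDD[Array[String]]; pattern-matching each array into a tuple yields the pair RDD RDD[(String, String)] that join operates on.

// split(",") yields Array[String]; the pattern match turns each
// Array(k, v) into the Tuple2 (k, v), giving a pair RDD that supports join.
val pairs = sc.textFile("rdd1.csv")
  .map(_.split(","))
  .map { case Array(k, v) => (k, v) }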
---------------------------------------------------------------------------------------------------------------------------------------------------------
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Level, Logger}

// Note: this code connects to a real cluster. Run `mvn package` before
// every run, then click Run in IntelliJ.
object hello {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)

    val package_path = "/home/appleyuchi/桌面/spark_success/Spark數據傾斜處理/Java/hello/target/hello-1.0-SNAPSHOT.jar"
    val conf = new SparkConf()
      .setMaster("spark://Desktop:7077")
      .setJars(Array[String](package_path))
      .setAppName("TestSpark")
    val sc = new SparkContext(conf)

    // Read the data and convert each line into a (key, value) tuple
    val rdd1 = sc.textFile("hdfs://Desktop:9000/rdd1.csv")
      .map(line => line.split(","))
      .map { case Array(f1, f2) => (f1, f2) }
    val rdd2 = sc.textFile("hdfs://Desktop:9000/rdd2.csv")
      .map(line => line.split(","))
      .map { case Array(f1, f2) => (f1, f2) }

    println("---------------------rdd1-----------------------")
    println(rdd1.join(rdd2).collect().mkString)
  }
}
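Alternatively (an assumption on my part; the post itself launches the job from IntelliJ after mvn package), the packaged jar could be submitted from the command line:

spark-submit --master spark://Desktop:7077 --class hello target/hello-1.0-SNAPSHOT.jar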
------------------------------------------------------pom.xml-----------------------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>hello</groupId>
    <artifactId>hello</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.12</artifactId>
            <version>3.0.0</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-graphx -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
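One caveat: the pom above only declares dependencies, so mvn package will not compile src/main/scala by itself. A minimal sketch of the missing <build> section, assuming scala-maven-plugin (the version number here is an assumption):

<build>
    <plugins>
        <!-- Assumed plugin: compiles Scala sources during mvn package -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>4.4.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
---------------------------------------------------------------------------------------------------------------------------------------------------------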
scala> var csv1 = sc.textFile("/file1.csv")
csv1: org.apache.spark.rdd.RDD[String] = /file1.csv MapPartitionsRDD[20] at textFile at <console>:24
scala> var data1 = csv1.map(line => line.split(",")).map{case Array(f1,f2,f3)=>(f1,f2,f3)}
data1: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[22] at map at <console>:25
scala> data1.collect()
res21: Array[(String, String, String)] = Array((user,topic,yuchi), (om,scala,yuchi), (daniel,spark,8099), (3754978,spark,199))
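Note that data1 above is an RDD of 3-tuples, not a pair RDD, so it cannot be joined directly (this is the two-column restriction noted in the appendix below). A sketch of one way around it, assuming the first field is meant to be the join key:

// Re-key the 3-tuple RDD into a pair RDD: the first field becomes the
// key and the remaining fields become the value, so join applies again.
val keyed1 = data1.map { case (f1, f2, f3) => (f1, (f2, f3)) }
// keyed1: RDD[(String, (String, String))] -- now keyed1.join(keyed2) works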
-------------------------------------------------------------------Appendix-------------------------------------------------------------------------------------
Upload all the files to HDFS:
hdfs dfs -put file1.csv /
hdfs dfs -put file2.csv /
--------------------------Note: the files must have exactly two columns, not three; join is only defined on pair RDDs, i.e. RDD[(key, value)]--------------------------------------------
Contents of rdd1.csv:
001,hello
001,hello
001,hello
001,hello
--------------------
Contents of rdd2.csv:
001,world
001,world
001,world
001,world
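For reference, a minimal self-contained sketch (assuming a local master, unlike the cluster setup above) reproducing the join on this sample data. Since all four rows in each file share the key 001, the join emits every pairing: 4 × 4 = 16 records of (001,(hello,world)).

import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("JoinDemo"))
    // Same shape as rdd1.csv / rdd2.csv above: four ("001", ...) rows each
    val rdd1 = sc.parallelize(Seq.fill(4)(("001", "hello")))
    val rdd2 = sc.parallelize(Seq.fill(4)(("001", "world")))
    // join pairs every rdd1 row with every rdd2 row sharing its key:
    // 4 x 4 = 16 records of (001,(hello,world))
    println(rdd1.join(rdd2).collect().mkString("\n"))
    sc.stop()
  }
}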
----------------------------------------------------------------------------------------------------------------------------------------------------------------