Spark SQL: Loading Data from MySQL and Writing Data Back to MySQL (Spark Shell and Spark SQL Program)
1. JDBC
Spark SQL can create a DataFrame by reading from a relational database over JDBC. After a series of computations on the DataFrame, the data can also be written back to the relational database.
1.1. Loading data from MySQL (Spark Shell)
1. Start the Spark shell. The MySQL JDBC driver jar must be specified:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 \
  --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar

2. Load the data from MySQL
In the bigdata database, create the person table
and initialize it with data:
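The creation of the jdbcDF DataFrame queried in the next step is not shown in the text. A minimal sketch of how it would typically be loaded in the spark-shell, assuming the person(id, name, age) schema visible in the query output and the hadoop10 host, bigdata database, and root/123456 credentials used elsewhere in this article:

```scala
// Executed inside spark-shell (Spark 2.1.1); `spark` is the built-in SparkSession.
// Host, database, table, and credentials are assumptions taken from the rest
// of this article -- adjust them for your environment.
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop10:3306/bigdata")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "person")
  .option("user", "root")
  .option("password", "123456")
  .load()
```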
3. Execute a query:

scala> jdbcDF.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 19|
|  2|    lisi| 20|
|  3|  wangwu| 28|
|  4| zhaoliu| 26|
|  5|  tianqi| 55|
+---+--------+---+

1.2. Writing data to MySQL (jar submission)
1.2.1 Write the Spark SQL program
package cn.toto.spark

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by toto on 2017/7/11.
 */
object JdbcRDDDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val connection = () => {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://hadoop10:3306/bigdata", "root", "123456")
    }
    // No data is read at this point (this also uses the person table)
    val jdbcRDD = new JdbcRDD(
      sc,
      connection,
      "SELECT * FROM person WHERE id >= ? AND id <= ?",
      // Fetch rows 1 through 4 from the table, split across 2 partitions
      1, 4, 2,
      r => {
        val id = r.getInt(1)
        val code = r.getString(2)
        (id, code)
      }
    )
    // collect() is the action that actually pulls the data
    val jrdd = jdbcRDD.collect()
    println(jrdd.toBuffer)
    sc.stop()
  }
}

Note that this program still uses the person table; its data is shown above.
If the program is run in IDEA, the result looks like this:
1.2.2 Package the program with Maven
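The packaging command itself is not shown. For a standard Maven project it would typically be the following (assuming the project's pom.xml is configured to produce bigdata-1.0-SNAPSHOT.jar):

```shell
# Run from the project root; the jar is written to target/
mvn clean package -DskipTests
```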
1.2.3 Submit the jar to the Spark cluster
Copy bigdata-1.0-SNAPSHOT.jar to /home/tuzq/software/sparkdata, as shown:
Note: before running, place mysql-connector-java-5.1.38.jar in /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/.
bin/spark-submit --class cn.toto.spark.JdbcRDDDemo --master spark://hadoop1:7077 \
  --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar

Result:
2. Storing data into the database with Spark SQL
2.2.1 The code:
package cn.toto.spark

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by toto on 2017/7/11.
 */
object JdbcRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MySQL-Demo").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Create an RDD by parallelizing a local collection
    val personRDD = sc.parallelize(Array("14 tom 5", "15 jerry 3", "16 kitty 6")).map(_.split(" "))
    // Specify the schema of each field directly with StructType
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // Map the RDD to an RDD of Rows
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Put the database connection properties in a Properties object
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123456")
    // Append the data to the database table
    personDataFrame.write.mode("append").jdbc("jdbc:mysql://hadoop10:3306/bigdata", "bigdata.person", prop)
    // Stop the SparkContext
    sc.stop()
  }
}

Result:
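After the append, the new rows can be verified by reading the table back through the same JDBC connection. A sketch, reusing the connection details assumed in this article (the hadoop10 host and root/123456 credentials):

```scala
// Read the person table back to confirm the three appended rows (ids 14-16).
// sqlContext is the SQLContext created in the program above; connection
// details are assumptions carried over from the rest of this article.
val prop = new java.util.Properties()
prop.put("user", "root")
prop.put("password", "123456")
val checkDF = sqlContext.read.jdbc("jdbc:mysql://hadoop10:3306/bigdata", "person", prop)
checkDF.filter("id >= 14").show()
```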
2.2.2 Package the program with Maven
2.2.3 Submit the jar to the Spark cluster

bin/spark-submit --class cn.toto.spark.JdbcRDD --master spark://hadoop1:7077 \
  --jars /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  --driver-class-path /home/tuzq/software/spark-2.1.1-bin-hadoop2.7/jars/mysql-connector-java-5.1.38.jar \
  /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar