2021年大数据Spark(二十):Spark Core外部数据源引入
目錄
外部數據源
MySQL 數據源
演示代碼
HBase 數據源
HBase Sink
???????HBase Source
外部數據源
?
Spark可以從外部存儲系統讀取數據,比如RDBMs表中或者HBase表中讀寫數據,這也是企業中常常使用,如:
?1)、要分析的數據存儲在HBase表中,需要從其中讀取數據數據分析
日志數據:電商網站的商家操作日志
訂單數據:保險行業訂單數據
?2)、使用Spark進行離線分析以后,往往將報表結果保存到MySQL表中
網站基本分析(pv、uv。。。。。)
注意:實際開發中會封裝為工具類直接使用
https://github.com/teeyog/blog/issues/22
https://blog.csdn.net/u011817217/article/details/81667115
?
?
?
MySQL 數據源
?????實際開發中常常將分析結果RDD保存至MySQL表中,使用foreachPartition函數;此外Spark中提供JdbcRDD用于從MySQL表中讀取數據。
調用RDD#foreachPartition函數將每個分區數據保存至MySQL表中,保存時考慮降低RDD分區數目和批量插入,提升程序性能。
演示代碼
package cn.itcast.coreimport java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}/*** Author itcast* Desc 演示使用Spark將數據寫入到MySQL,再從MySQL讀取出來*/
object SparkJdbcDataSource {def main(args: Array[String]): Unit = {//1.創建SparkContextval sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")//2.準備數據val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))//3.將RDD中的數據保存到MySQL中去//將每一個分區中的數據保存到MySQL中去,有幾個分區,就會開啟關閉連接幾次//data.foreachPartition(itar=>dataToMySQL(itar))data.foreachPartition(dataToMySQL) //方法即函數,函數即對象//4.從MySQL讀取數據/*class JdbcRDD[T: ClassTag](sc: SparkContext,getConnection: () => Connection,sql: String,lowerBound: Long,upperBound: Long,numPartitions: Int,mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)*/val getConnection = ()=> DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"val mapRow = (rs:ResultSet) => {val id: Int = rs.getInt(1)val name: String = rs.getString(2)val age: Int = rs.getInt("age")(id,name,age)}val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,getConnection,sql,4,5,2,mapRow)println(studentRDD.collect().toBuffer)}/*** 將分區中的數據保存到MySQL* @param itar 傳過來的每個分區有多條數據*/def dataToMySQL(itar: Iterator[(String, Int)]): Unit = {//0.加載驅動//Class.forName("") //源碼中已經加載了//1.獲取連接val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")//2.編寫sqlval sql:String = "INSERT INTO `t_student` (`name`, `age`) VALUES (?, ?);"//3.獲取psval ps: PreparedStatement = connection.prepareStatement(sql)itar.foreach(data=>{//4.設置參數ps.setString(1,data._1)ps.setInt(2,data._2)//5.執行sqlps.addBatch()})ps.executeBatch()ps.close()connection.close()}
}
?
???????HBase 數據源
Spark可以從HBase表中讀寫(Read/Write)數據,底層采用TableInputFormat和TableOutputFormat方式,與MapReduce與HBase集成完全一樣,使用輸入格式InputFormat和輸出格式OutputFoamt。
?
?
???????HBase Sink
回顧MapReduce向HBase表中寫入數據,使用TableReducer,其中OutputFormat為TableOutputFormat,讀取數據Key:ImmutableBytesWritable(Rowkey),Value:Put(Put對象)。
寫入數據時,需要將RDD轉換為RDD[(ImmutableBytesWritable, Put)]類型,調用saveAsNewAPIHadoopFile方法數據保存至HBase表中。
HBase Client連接時,需要設置依賴Zookeeper地址相關信息及表的名稱,通過Configuration設置屬性值進行傳遞。
?
范例演示:將詞頻統計結果保存HBase表,表的設計
?
代碼如下:
package cn.itcast.coreimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 將RDD數據保存至HBase表中*/
object SparkWriteHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 構建RDDval list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)// 將數據寫入到HBase表中, 使用saveAsNewAPIHadoopFile函數,要求RDD是(key, Value)// ?組裝RDD[(ImmutableBytesWritable, Put)]/*** HBase表的設計:* 表的名稱:htb_wordcount* Rowkey: ?word* 列簇: ???info* 字段名稱: count*/val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions { iter =>iter.map { case (word, count) =>// 創建Put實例對象val put = new Put(Bytes.toBytes(word))// 添加列put.addColumn(// 實際項目中使用HBase時,插入數據,先將所有字段的值轉為String,再使用Bytes轉換為字節數組Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString))// 返回二元組(new ImmutableBytesWritable(put.getRow), put)}}// 構建HBase Client配置信息val conf: Configuration = HBaseConfiguration.create()// 設置連接Zookeeper屬性conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 設置將數據保存的HBase表的名稱conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")/*def saveAsNewAPIHadoopFile(path: String,// 保存的路徑keyClass: Class[_], // Key類型valueClass: Class[_], // Value類型outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 輸出格式OutputFormat實現conf: Configuration = self.context.hadoopConfiguration // 配置信息): Unit*/putsRDD.saveAsNewAPIHadoopFile("datas/spark/htb-output-" + System.nanoTime(), //classOf[ImmutableBytesWritable], //classOf[Put], //classOf[TableOutputFormat[ImmutableBytesWritable]], //conf)// 應用程序運行結束,關閉資源sc.stop()}
}
運行完成以后,使用hbase shell查看數據:
?
?
???????HBase Source
回顧MapReduce從讀HBase表中的數據,使用TableMapper,其中InputFormat為TableInputFormat,讀取數據Key:ImmutableBytesWritable,Value:Result。
???從HBase表讀取數據時,同樣需要設置依賴Zookeeper地址信息和表的名稱,使用Configuration設置屬性,形式如下:
?
?????此外,讀取的數據封裝到RDD中,Key和Value類型分別為:ImmutableBytesWritable和Result,不支持Java Serializable導致處理數據時報序列化異常。設置Spark Application使用Kryo序列化,性能要比Java 序列化要好,創建SparkConf對象設置相關屬性,如下所示:
?
范例演示:從HBase表讀取詞頻統計結果,代碼如下
package cn.itcast.coreimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 從HBase 表中讀取數據,封裝到RDD數據集*/
object SparkReadHBase {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 讀取HBase Client 配置信息val conf: Configuration = HBaseConfiguration.create()conf.set("hbase.zookeeper.quorum", "node1")conf.set("hbase.zookeeper.property.clientPort", "2181")conf.set("zookeeper.znode.parent", "/hbase")// 設置讀取的表的名稱conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")/*def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](conf: Configuration = hadoopConfiguration,fClass: Class[F],kClass: Class[K],vClass: Class[V]): RDD[(K, V)]*/val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])println(s"Count = ${resultRDD.count()}")resultRDD.take(5).foreach { case (rowKey, result) =>println(s"RowKey = ${Bytes.toString(rowKey.get())}")// HBase表中的每條數據封裝在result對象中,解析獲取每列的值result.rawCells().foreach { cell =>val cf = Bytes.toString(CellUtil.cloneFamily(cell))val column = Bytes.toString(CellUtil.cloneQualifier(cell))val value = Bytes.toString(CellUtil.cloneValue(cell))val version = cell.getTimestampprintln(s"\t $cf:$column?= $value, version = $version")}}// 應用程序運行結束,關閉資源sc.stop()}
}
運行結果:
?
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(二十):Spark Core外部数据源引入的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(十九):Sp
- 下一篇: 2021年大数据Spark(二十一):S