Reading data from HBase and writing it back to HBase with Spark: two versions of the code
This post shows two ways to read data from HBase and write the result back to HBase with Spark: one built on the Spark 2 SparkSession API, and one on the older SparkContext/SQLContext API. It is shared here as a reference.
Spark 2 version:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat   // old "mapred" API, required by JobConf / saveAsHadoopDataset
import org.apache.hadoop.hbase.mapreduce.TableInputFormat // new "mapreduce" API, required by newAPIHadoopRDD
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    // Build the SparkSession.
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Source (read) table and destination (write) table.
    val readTable: String = "hydrogenation_flow_record"
    val writeTable: String = "test200"

    // Create the HBase read configuration; put the cluster's hbase-site.xml under the resources directory.
    val hBaseConfRead: Configuration = HBaseConfiguration.create()
    // INPUT_TABLE tells TableInputFormat which table to read.
    hBaseConfRead.set(TableInputFormat.INPUT_TABLE, readTable)

    // Configure the output table. Unlike the read side, writing goes through a JobConf.
    val hBaseConfWrite: Configuration = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConfWrite)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, writeTable)

    // newAPIHadoopRDD yields tuples whose first element is an ImmutableBytesWritable,
    // so the RDD written back must use the same key type.
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
      hBaseConfRead,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Import the implicits so toDF (and the Dataset encoders used below) are available.
    import spark.implicits._
    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")

    // Register the DataFrame as a temporary view.
    sps.createOrReplaceTempView("sps")
    // Run the SQL query.
    val frame: DataFrame = spark.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")

    // Wrap the query result into tuples: the first element is the column qualifier,
    // the second is the value read from the DataFrame.
    val tupleDS: Dataset[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))

    // Build the output RDD: create a Put keyed by the row key, then add the column
    // (family, qualifier, value); several columns can be added to the same Put.
    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleDS.rdd.map { a =>
      val put: Put = new Put(Bytes.toBytes("34353454353"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1), Bytes.toBytes(a._2))
      // The first tuple element must be an ImmutableBytesWritable, as the Hadoop output format expects.
      (new ImmutableBytesWritable, put)
    }

    // Write to HBase.
    rdd.saveAsHadoopDataset(jobConf)
    // Stop the session.
    spark.stop()
  }
}
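One refinement worth noting (not part of the original code): the query only aggregates a single column over a bounded rowkey range, so the range and column can be pushed down into the HBase scan itself through TableInputFormat's scan properties, and Spark then only receives the rows it actually sums. The sketch below is a hedged example reusing the table name, column, and rowkeys from the code above; the stop key is an assumption (SCAN_ROW_STOP is exclusive, so it is set one past the upper bound used in the BETWEEN clause).

// Hedged sketch: configure the read side so HBase itself limits the scan.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hBaseConfRead: Configuration = HBaseConfiguration.create()
hBaseConfRead.set(TableInputFormat.INPUT_TABLE, "hydrogenation_flow_record")
// Row range: start is inclusive, stop is exclusive (hence a value one past the upper bound, an assumed key).
hBaseConfRead.set(TableInputFormat.SCAN_ROW_START, "4000069:1618539744390")
hBaseConfRead.set(TableInputFormat.SCAN_ROW_STOP, "4000069:1618539744427")
// Only fetch the single column the job reads ("family:qualifier" format).
hBaseConfRead.set(TableInputFormat.SCAN_COLUMNS, "SPSFlowInfo:SPSFlowTotal")
// Pass this configuration to newAPIHadoopRDD exactly as in the code above.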
Older Spark version (SparkContext + SQLContext instead of SparkSession):

// Same imports as above, plus:
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SparkCoreTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkCoreTest")
    // val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    // The old entry points: a SparkContext plus a SQLContext.
    val sc: SparkContext = new SparkContext(sparkConf)
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "hydrogenation_flow_record")
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ // instead of import spark.implicits._

    // Output configuration, same as in the Spark 2 version.
    val hBaseConf1 = HBaseConfiguration.create()
    val jobConf = new JobConf(hBaseConf1)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test200")

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      hBaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    val sps: DataFrame = hbaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("SPSFlowInfo"), Bytes.toBytes("SPSFlowTotal"))),
      Bytes.toString(r._2.getRow)
    )).toDF("SPSFlowTotal", "row")

    // sps.registerTempTable("sps")       // Spark 1.x name for the call below
    sps.createOrReplaceTempView("sps")
    val frame: DataFrame = sqlContext.sql("SELECT sum(SPSFlowTotal) as A FROM sps WHERE row BETWEEN '4000069:1618539744390' and '4000069:1618539744426'")
    val tupleDS: Dataset[(String, String)] = frame.map(t => ("SPSFlowTotal", t(0).toString))

    val rdd: RDD[(ImmutableBytesWritable, Put)] = tupleDS.rdd.map { a =>
      val put: Put = new Put(Bytes.toBytes("343534543533"))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(a._1), Bytes.toBytes(a._2))
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsHadoopDataset(jobConf)
    // Stop the SparkContext; there is no SparkSession to stop in this version.
    sc.stop()
  }
}
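For reference, the write side can also go through the newer mapreduce API instead of the mapred TableOutputFormat plus JobConf used in both listings above. The following is a hedged sketch, not part of the original article; it assumes the same rdd of (ImmutableBytesWritable, Put) pairs and the same destination table test200.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat // note: mapreduce, not the mapred class used above
import org.apache.hadoop.mapreduce.Job

// Output configuration built on the newer "mapreduce" TableOutputFormat.
val hBaseConfWrite = HBaseConfiguration.create()
hBaseConfWrite.set(TableOutputFormat.OUTPUT_TABLE, "test200")
val job = Job.getInstance(hBaseConfWrite)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// rdd is the RDD[(ImmutableBytesWritable, Put)] built as in either version above.
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

saveAsNewAPIHadoopDataset takes a plain Configuration, so no JobConf is needed on this path.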
Summary

Both versions read the same HBase table into an RDD[(ImmutableBytesWritable, Result)], aggregate it through Spark SQL, and write the result back as Put objects; the main difference is whether the entry point is a SparkSession or a SparkContext/SQLContext pair. Hopefully this helps you solve a similar problem.