Writing in-memory data to a Hive table with Spark
hive-site.xml:

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://master:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://master:3306/metastore?createDatabaseIfNotExist=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>username to use against metastore database</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
    <description>password to use against metastore database</description>
  </property>
  <property>
    <name>hive.cli.print.header</name>
    <value>true</value>
    <description>Whether to print the names of the columns in query output.</description>
  </property>
  <property>
    <name>hive.cli.print.current.db</name>
    <value>true</value>
    <description>Whether to include the current database in the Hive prompt.</description>
  </property>
</configuration>
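For these settings to take effect, hive-site.xml must be on Spark's classpath (for example, copied into $SPARK_HOME/conf, or into src/main/resources for a local IDE run), and the metastore service that hive.metastore.uris points to must be running. A minimal sketch, assuming a Hive installation on the master host with its bin directory on the PATH:

  # initialize the metastore schema in MySQL (first run only)
  schematool -dbType mysql -initSchema
  # start the remote metastore service (listens on port 9083 by default)
  hive --service metastore &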
Below is the sample code:
package spark_sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

import test.ProductData

/**
 * @Program: spark01
 * @Author: 努力就是魅力
 * @Since: 2018-10-19 08:30
 * Description:
 *
 * Writes in-memory data into a Hive table with Spark; this is a complete,
 * runnable example.
 *
 * Below is the result of querying the Hive table:
 * hive (hadoop10)> select * from data_block;
 * OK
 * data_block.ip    data_block.time         data_block.phonenum
 * 40.234.66.122    2018-10-12 09:35:21
 * 5.150.203.160    2018-10-03 14:41:09     13389202989
 **/

case class Datablock(ip: String, time: String, phoneNum: String)

object WriteTabletoHive {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("WriteTableToHive")
      .config("spark.sql.warehouse.dir", "D:\\reference-data\\spark01\\spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Explicit schema built from a string. The case-class Dataset below
    // infers its schema automatically, so this is only needed for the
    // Row-based route -- see the sketch after this listing.
    val schemaString = "ip time phoneNum"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // One row of random test data from the author's ProductData helper:
    val datablockDS = Seq(Datablock(
      ProductData.getRandomIp,
      ProductData.getRecentAMonthRandomTime("yyyy-MM-dd HH:mm:ss"),
      ProductData.getRandomPhoneNumber)).toDS()
    // Fixed-value alternative that needs no helper class:
    // val datablockDS = Seq(Datablock("192.168.40.122", "2018-01-01 12:25:25", "18866556699")).toDS()

    datablockDS.show()

    datablockDS.toDF().createOrReplaceTempView("dataBlock")
    spark.sql("select * from dataBlock")
      .write.mode("append")
      .saveAsTable("hadoop10.data_block")
  }
}
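The schemaString/StructType built in the listing is not consumed by the case-class route; it belongs to Spark's Row-based construction API. A minimal sketch of that alternative, assuming the same spark session and the schema value from the listing:

  import org.apache.spark.sql.Row

  // Build rows matching the three StringType fields of `schema`
  val rowRDD = spark.sparkContext.parallelize(Seq(
    Row("192.168.40.122", "2018-01-01 12:25:25", "18866556699")))
  // createDataFrame applies the explicit schema instead of inferring one
  val dfFromSchema = spark.createDataFrame(rowRDD, schema)
  dfFromSchema.show()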
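Besides the Hive CLI query shown in the header comment, the write can be verified from the same Spark session, for example:

  // Read the Hive table back and confirm the appended row
  spark.table("hadoop10.data_block").show()
  println(spark.table("hadoop10.data_block").count())

Because the job uses mode("append"), each run adds another row, which is consistent with the two rows in the sample query output.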
Summary
As long as hive-site.xml is on the classpath and enableHiveSupport() is called when building the SparkSession, an in-memory Dataset can be registered as a temporary view and persisted into a Hive table with write.mode("append").saveAsTable.