[Spark Streaming] Saving a DStream to MySQL
package SparkDemo
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DStreamToMySQL {
  // Update function for updateStateByKey: adds the counts from the current
  // batch to the running total kept in the state
  def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] = {
    val currentCount = newValues.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }
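  // Example: if a key receives the values Seq(1, 1, 1) in the current batch and
  // its previous state is Some(2), updateFunc returns Some(5)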
  def main(args: Array[String]): Unit = {
    // Create the StreamingContext with a 1-second batch interval
    val conf = new SparkConf().setAppName("DStreamToMySQL")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Set the log level (helper defined elsewhere in the project)
    StreamingLoggingExample.setStreamingLogLevels()
    // updateStateByKey requires a checkpoint directory for the state;
    // any writable local or HDFS path will do
    ssc.checkpoint("/tmp/spark-checkpoint")
    // Monitor a directory for new text files
    val lines = ssc.textFileStream("/tmp/yuhang.zhang/data")
    // Split each line into words and pair every word with a count of 1
    val words = lines.flatMap(_.split(" "))
    val pairWord = words.map((_, 1))
    // Accumulate counts across batches
    val stateWordCount = pairWord.updateStateByKey[Int](updateFunc)
    // Save stateWordCount to the database.
    // stateWordCount is a stream of RDDs, so every record of every RDD
    // has to be processed and stored.
    stateWordCount.foreachRDD(rdd => {
      // Each RDD holds records of type (String, Int).
      // func receives the records of one partition as an Iterator so that
      // they can be traversed and written to MySQL one by one.
      def func(records: Iterator[(String, Int)]): Unit = {
        // conn and stmt must be vars (not vals) so they can be assigned
        // inside try and closed in finally
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          // Connect to the database (URL = host + database name)
          val url = "jdbc:mysql://localhost:3306/spark"
          val user = "root"
          val password = ""
          conn = DriverManager.getConnection(url, user, password)
          // wordcount is the table name; word and count are the columns to insert.
          // Prepare the insert statement once and reuse it for every record.
          val sql = "insert into wordcount(word, count) values (?, ?)"
          stmt = conn.prepareStatement(sql)
          records.foreach(p => {
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null)
            stmt.close()
          if (conn != null)
            conn.close()
        }
      }
      // Repartition the RDD, then run func on each partition of the
      // repartitioned RDD
      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })
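    // Design note: the JDBC connection is created inside func, i.e. on the
    // executors once per partition, because connection objects are not
    // serializable and cannot be created on the driver and shipped to the
    // workers; opening a connection per record would also be far more costly.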
    ssc.start()            // start the streaming computation
    ssc.awaitTermination() // wait for the computation to terminate
  }
}
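The insert statement above assumes that a wordcount table already exists in the spark database. Below is a minimal sketch of creating that table over the same JDBC settings; the column types are an assumption, not taken from the original code, and can be changed as long as they stay compatible with setString and setInt.

// Minimal sketch: create the wordcount table the streaming job writes to.
// The column types (varchar(100), int) are assumptions.
import java.sql.DriverManager

object CreateWordCountTable {
  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://localhost:3306/spark"
    val conn = DriverManager.getConnection(url, "root", "")
    try {
      val stmt = conn.createStatement()
      stmt.executeUpdate(
        "create table if not exists wordcount(word varchar(100), count int)")
      stmt.close()
    } finally {
      conn.close()
    }
  }
}

Once the table exists, start the job and drop text files into /tmp/yuhang.zhang/data; each 1-second batch updates the accumulated counts and appends the current totals for every word to the table.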