2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
目錄
整合Kafka 0-10-開發使用
原理
1.Direct方式
2.簡單的并行度1?:?1
???????API
注意
???????代碼實現-自動提交偏移量到默認主題
???????代碼實現-手動提交偏移量到默認主題
???????代碼實現-手動提交偏移量到MySQL-擴展
整合Kafka 0-10-開發使用
原理
目前企業中基本都使用New Consumer API集成,優勢如下:
1.Direct方式
直接到Kafka Topic中依據偏移量范圍獲取數據,進行處理分析;
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach;
2.簡單的并行度1?:?1
每批次中RDD的分區與Topic分區一對一關系;
It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata;
獲取Topic中數據的同時,還可以獲取偏移量和元數據信息;
?
?
采用Direct方式消費數據時,可以設置每批次處理數據的最大量,防止【波峰】時數據太多,導致批次數據處理有性能問題:
- ?參數:spark.streaming.kafka.maxRatePerPartition
- ?含義:Topic中每個分區每秒中消費數據的最大值
- ?舉例說明:
- BatchInterval:5s、Topic-Partition:3、maxRatePerPartition: 10000
- 最大消費數據量:10000 * 3 * 5 = 150000 條
?
???????API
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
添加相關Maven依賴:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>
注意
?
?
???????代碼實現-自動提交偏移量到默認主題
package cn.itcast.streamingimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Author itcast* Desc 使用spark-streaming-kafka-0-10版本中的Direct模式連接Kafka并自動提交偏移量*/
object SparkStreaming_Kafka_01 {def main(args: Array[String]): Unit = {//1.準備SparkStreaming執行環境--StreamingContextval conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//2.準備Kafka的連接參數,如集群地址,主題,消費者組名稱,是否自動提交,offset重置位置,kv序列化val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",//集群地址"key.deserializer" -> classOf[StringDeserializer],//key的反序列化規則"value.deserializer" -> classOf[StringDeserializer],//value的反序列化規則"group.id" -> "spark",//消費者組名稱//earliest:表示如果有offset記錄從offset記錄開始消費,如果沒有從最早的消息開始消費//latest:表示如果有offset記錄從offset記錄開始消費,如果沒有從最后/最新的消息開始消費//none:表示如果有offset記錄從offset記錄開始消費,如果沒有就報錯"auto.offset.reset" -> "latest",//offset重置位置"auto.commit.interval.ms"->"1000",//自動提交的時間間隔"enable.auto.commit" -> (true: java.lang.Boolean)//是否自動提交偏移量)val topics = Array("spark_kafka")//要消費哪個主題//3.使用spark-streaming-kafka-0-10中的Direct模式連接Kafka// ssc: StreamingContext,// locationStrategy: LocationStrategy,位置策略,直接使用源碼推薦的優先一致性策略即可,在大多數情況下,它將一致地在所有執行器之間分配分區// consumerStrategy: ConsumerStrategy[K, V],消費策略,直接使用源碼推薦的訂閱模式,通過參數訂閱主題即可//kafkaDS就是從Kafka中消費到的完整的消息記錄!val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//4.從kafkaDS中獲取發送的valueval valuesDS: DStream[String] = kafkaDS.map(_.value)//5.輸出valuesDS.print()//6.啟動并等待結束ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)//注意://1.啟動kafka//2.準備主題:/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1?--partitions 3 --topic spark_kafka//3.開啟控制臺生產者:/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka}
}
?
???????代碼實現-手動提交偏移量到默認主題
?
package cn.itcast.streamingimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** Author itcast* Desc 使用spark-streaming-kafka-0-10版本中的Direct模式連接Kafka并手動提交偏移量*/
object SparkStreaming_Kafka_02 {def main(args: Array[String]): Unit = {//1.準備SparkStreaming執行環境--StreamingContextval conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//2.準備Kafka的連接參數,如集群地址,主題,消費者組名稱,是否自動提交,offset重置位置,kv序列化val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",//集群地址"key.deserializer" -> classOf[StringDeserializer],//key的反序列化規則"value.deserializer" -> classOf[StringDeserializer],//value的反序列化規則"group.id" -> "spark",//消費者組名稱//earliest:表示如果有offset記錄從offset記錄開始消費,如果沒有從最早的消息開始消費//latest:表示如果有offset記錄從offset記錄開始消費,如果沒有從最后/最新的消息開始消費//none:表示如果有offset記錄從offset記錄開始消費,如果沒有就報錯"auto.offset.reset" -> "latest",//offset重置位置//"auto.commit.interval.ms"->"1000",//自動提交的時間間隔"enable.auto.commit" -> (false: java.lang.Boolean)//是否自動提交偏移量)val topics = Array("spark_kafka")//要消費哪個主題//3.使用spark-streaming-kafka-0-10中的Direct模式連接Kafka// ssc: StreamingContext,// locationStrategy: LocationStrategy,位置策略,直接使用源碼推薦的優先一致性策略即可,在大多數情況下,它將一致地在所有執行器之間分配分區// consumerStrategy: ConsumerStrategy[K, V],消費策略,直接使用源碼推薦的訂閱模式,通過參數訂閱主題即可//kafkaDS就是從Kafka中消費到的完整的消息記錄!val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//4.處理從Kafka中消費到的value//手動提交偏移量的時機://1.每隔一段時間提交一次:可以,但是和自動提交一樣了,那還不如直接自動提交!//2.消費一條消息就提交一次offset:可以但是提交的太頻繁了,可能會影響效率!除非對數據安全要求特別高!//3.消費一小批消息就提交一次offset:可以!一小批數據在SparkStreaming里面就是DStream底層的RDD(微批)!kafkaDS.foreachRDD(rdd=>{//該如何消費/處理就如何消費/處理//完事之后就應該提交該批次的offset!if(!rdd.isEmpty()){//當前批次的rdd不為空,那么就消費該批次數據并提交偏移量rdd.foreach(r=>{println(s"消費到的消息記錄的分區為:${r.partition()},offset為:${r.offset()},key為:${r.key()},value為:${r.value()}")})//代碼走到這里說明該批次數據已經消費并處理了,那么應該手動提交偏移量了!//要手動提交的偏移量信息都在rdd中,但是我們要提交的僅僅是offset相關的信息,所以將rdd轉為方便我們提交的Array[OffsetRange]類型val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//上面的offsetRanges數組中就記錄了各個分區的偏移量信息!offsetRanges.foreach(o=>{println(s"offsetRanges中記錄的分區為:${o.partition},開始offset為:${o.fromOffset},結束offset為${o.untilOffset}")})//手動提交--提交到Kafka的默認主題中!(注:如果設置了Checkpoint,還會儲存一份到Checkpoint中)kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)println("當前批次的offset已經提交到默認主題中")}})//5.輸出//6.啟動并等待結束ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)//注意://1.啟動kafka//2.準備主題:/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1?--partitions 3 --topic spark_kafka//3.開啟控制臺生產者:/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka}
}
?
?
???????代碼實現-手動提交偏移量到MySQL-擴展
package cn.itcast.streamingimport java.sql.{DriverManager, ResultSet}import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable/*** Author itcast* Desc 使用spark-streaming-kafka-0-10版本中的Direct模式連接Kafka并手動提交偏移量到MySQL*/
object SparkStreaming_Kafka_03 {def main(args: Array[String]): Unit = {//1.準備SparkStreaming執行環境--StreamingContextval conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//2.準備Kafka的連接參數,如集群地址,主題,消費者組名稱,是否自動提交,offset重置位置,kv序列化val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node1:9092,node2:9092,node3:9092", //集群地址"key.deserializer" -> classOf[StringDeserializer], //key的反序列化規則"value.deserializer" -> classOf[StringDeserializer], //value的反序列化規則"group.id" -> "spark", //消費者組名稱//earliest:表示如果有offset記錄從offset記錄開始消費,如果沒有從最早的消息開始消費//latest:表示如果有offset記錄從offset記錄開始消費,如果沒有從最后/最新的消息開始消費//none:表示如果有offset記錄從offset記錄開始消費,如果沒有就報錯"auto.offset.reset" -> "latest", //offset重置位置//"auto.commit.interval.ms"->"1000",//自動提交的時間間隔"enable.auto.commit" -> (false: java.lang.Boolean) //是否自動提交偏移量)val topics = Array("spark_kafka") //要消費哪個主題//3.使用spark-streaming-kafka-0-10中的Direct模式連接Kafka//連接kafka之前,要先去MySQL看下有沒有該消費者組的offset記錄,如果有從記錄的位置開始消費,如果沒有從"auto.offset.reset" -> "latest"位置開始消費!//Map[主題分區為key, offset為value]val offsetMap: mutable.Map[TopicPartition, Long] = OffsetUtil.getOffsetMap("spark", "spark_kafka")val kafkaDS: InputDStream[ConsumerRecord[String, String]] = if (offsetMap.size > 0) {println("MySQL中有記錄該消費者消費該主題的各個分區的offset信息,所以接著該記錄開始消費")KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsetMap))} else {println("MySQL沒有記錄該消費者消費該主題的各個分區的offset信息,所以從auto.offset.reset配置的latest開始消費")KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))}//4.處理從Kafka中消費到的value//手動提交偏移量的時機://1.每隔一段時間提交一次:可以,但是和自動提交一樣了,那還不如直接自動提交!//2.消費一條消息就提交一次offset:可以但是提交的太頻繁了,可能會影響效率!除非對數據安全要求特別高!//3.消費一小批消息就提交一次offset:可以!一小批數據在SparkStreaming里面就是DStream底層的RDD(微批)!kafkaDS.foreachRDD(rdd => {//該如何消費/處理就如何消費/處理//完事之后就應該提交該批次的offset!if (!rdd.isEmpty()) { //當前批次的rdd不為空,那么就消費該批次數據并提交偏移量rdd.foreach(r => {println(s"消費到的消息記錄的分區為:${r.partition()},offset為:${r.offset()},key為:${r.key()},value為:${r.value()}")})//代碼走到這里說明該批次數據已經消費并處理了,那么應該手動提交偏移量了!//要手動提交的偏移量信息都在rdd中,但是我們要提交的僅僅是offset相關的信息,所以將rdd轉為方便我們提交的Array[OffsetRange]類型val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//上面的offsetRanges數組中就記錄了各個分區的偏移量信息!offsetRanges.foreach(o => {println(s"offsetRanges中記錄的分區為:${o.partition},開始offset為:${o.fromOffset},結束offset為${o.untilOffset}")})//手動提交--提交到Kafka的默認主題中!(注:如果設置了Checkpoint,還會儲存一份到Checkpoint中)//kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)OffsetUtil.saveOffsetRanges("spark", offsetRanges)println("當前批次的offset已經提交到MySQL中")}})//5.輸出//6.啟動并等待結束ssc.start()ssc.awaitTermination()ssc.stop(stopSparkContext = true, stopGracefully = true)//注意://1.啟動kafka//2.準備主題:/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1?--partitions 3 --topic spark_kafka//3.開啟控制臺生產者:/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic spark_kafka}/*手動維護offset的工具類首先在MySQL創建如下表CREATE TABLE `t_offset` (`topic` varchar(255) NOT NULL,`partition` int(11) NOT NULL,`groupid` varchar(255) NOT NULL,`offset` bigint(20) DEFAULT NULL,PRIMARY KEY (`topic`,`partition`,`groupid`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;*/object OffsetUtil {//1.將偏移量保存到數據庫def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")//replace into表示之前有就替換,沒有就插入val ps = connection.prepareStatement("replace into t_offset (`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)")for (o <- offsetRange) {ps.setString(1, o.topic)ps.setInt(2, o.partition)ps.setString(3, groupid)ps.setLong(4, o.untilOffset)ps.executeUpdate()}ps.close()connection.close()}//2.從數據庫讀取偏移量def getOffsetMap(groupid: String, topic: String) = {val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "root")val ps = connection.prepareStatement("select * from t_offset where groupid=? and topic=?")ps.setString(1, groupid)ps.setString(2, topic)val rs: ResultSet = ps.executeQuery()val offsetMap = mutable.Map[TopicPartition, Long]()while (rs.next()) {offsetMap += new TopicPartition(rs.getString("topic"), rs.getInt("partition")) -> rs.getLong("offset")}rs.close()ps.close()connection.close()offsetMap}}
}
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(四十二):S
- 下一篇: 2021年大数据Spark(四十四):S