Spark 2.11: Two Streaming Approaches + Kafka
Since Spark 2.x introduced Structured Streaming, data processing is gradually shifting toward the DataFrame/Dataset API. The following shows how Spark 2.x integrates with Kafka 0.10+.
Structured Streaming + Kafka
Dependencies
To make the package dependencies easier to see, here is the sbt build file of my project:
```scala
name := "spark-test"

version := "1.0"

scalaVersion := "2.11.7"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.1" % "provided"
// https://mvnrepository.com/artifact/org.apache.spark/spark-mllib_2.11
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.1" % "provided"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.1" % "provided"
// https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.3"
// https://mvnrepository.com/artifact/mysql/mysql-connector-java
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.2.1"
// DStream (Spark Streaming) Kafka connector:
//libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.1"
// Structured Streaming Kafka connector:
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.1.1"
```

Connecting Structured Streaming to Kafka
The schema of the resulting stream is as follows (a minimal read sketch follows the table):
| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
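As a minimal sketch of loading such a stream (the app name is hypothetical; the broker address and topic "test" mirror the DStream examples further down), the Kafka source is read as a DataFrame and its binary key/value columns are cast to strings:

```scala
import org.apache.spark.sql.SparkSession

object StructuredKafkaRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-kafka-read") // hypothetical app name
      .getOrCreate()

    // The source DataFrame carries the columns listed above:
    // key, value, topic, partition, offset, timestamp, timestampType
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder brokers
      .option("subscribe", "test")                          // placeholder topic
      .load()

    // key and value arrive as binary; cast them to strings before processing
    val kv = df.selectExpr(
      "CAST(key AS STRING)", "CAST(value AS STRING)",
      "topic", "partition", "offset")

    val query = kv.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```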
Configurable options
| Option | Value | Description |
| --- | --- | --- |
| assign | JSON string, e.g. {"topicA":[0,1],"topicB":[2,4]} | Specifies the exact TopicPartitions to consume. assign, subscribe, and subscribePattern are three mutually exclusive subscription modes; only one may be set at a time. |
| subscribe | A comma-separated list of topics | Specifies the topics to consume |
| subscribePattern | Java regex string | Consumes all topics whose names match the regular expression |
| kafka.bootstrap.servers | A comma-separated list of host:port | The Kafka brokers |
Options that cannot be set
group.id: a unique consumer group is generated automatically for each query.
auto.offset.reset: use startingOffsets instead; Structured Streaming tracks the offsets of all streamed data itself in order to deliver its exactly-once guarantee (see the sketch after this list).
key.deserializer: deserialization is done on the DataFrame instead; the default is ByteArrayDeserializer.
value.deserializer: deserialization is done on the DataFrame instead; the default is ByteArrayDeserializer.
enable.auto.commit: the Kafka source does not commit any offsets back to Kafka.
interceptor.classes: consumer interceptors are not supported, because keys and values are always handled as byte arrays.
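As a sketch of how these settings translate into Structured Streaming options (the partitions and offset values below are illustrative; the offset echoes the one used in the DStream example further down): the subscription mode and starting position are passed as DataFrame options, and no group.id, deserializers, or auto-commit settings are configured.

```scala
import org.apache.spark.sql.SparkSession

object StructuredKafkaOptions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-kafka-options") // hypothetical app name
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder brokers
      // exactly one of the three subscription modes per query:
      .option("assign", """{"test":[0,1]}""")                 // explicit TopicPartitions
      // .option("subscribe", "test")                         // comma-separated topic list
      // .option("subscribePattern", "test.*")                // regex on topic names
      // replaces auto.offset.reset: "earliest", "latest", or per-partition offsets
      // (-2 means earliest, -1 means latest)
      .option("startingOffsets", """{"test":{"0":1100449855,"1":-2}}""")
      .load()

    // No group.id, key/value deserializers, or enable.auto.commit are set here:
    // Spark generates a unique group id, reads key/value as binary,
    // and tracks offsets itself.
    df.printSchema()
  }
}
```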
Spark Streaming (DStream) + Kafka
Consuming from the latest offsets
```scala
// imports used by both DStream examples below
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  // OpContext.sc is the author's project-specific SparkContext holder
  val ssc = new StreamingContext(OpContext.sc, Seconds(2))
  val topics = Array("test")
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  stream.foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition(iter => {
      // offset ranges of a direct stream are indexed by partition id
      val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    })
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
  // stream.map(record => (record.key, record.value)).print(1)
  ssc.start()
  ssc.awaitTermination()
}
```

Consuming from specified offsets
```scala
def main(args: Array[String]): Unit = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    // "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val ssc = new StreamingContext(OpContext.sc, Seconds(2))
  // start reading partition 0 of topic "test" from this offset
  val fromOffsets = Map(new TopicPartition("test", 0) -> 1100449855L)
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
  )
  stream.foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    for (o <- offsetRanges) {
      println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    }
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })
  // stream.map(record => (record.key, record.value)).print(1)
  ssc.start()
  ssc.awaitTermination()
}
```

Originally published at: https://blog.51cto.com/13064681/1943431