Kafka: Consuming Kafka with Spark Streaming
With the newer version of the Kafka API (the spark-streaming-kafka-0-10 direct stream approach), the consumer is created as follows:
```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Broker list and topic are read from an external properties file
val brokers = properties.getProperty("kafka.host.list")
val topics  = Set(properties.getProperty("kafka.application.topic"))

// Kafka consumer configuration
val kafkaParams = Map[String, String](
  "bootstrap.servers"       -> brokers,
  "group.id"                -> "ntaflowgroup",
  "auto.commit.interval.ms" -> "1000",
  "key.deserializer"        -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer"      -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset"       -> "latest",
  "enable.auto.commit"      -> "true"
)

// Create the direct stream against the given topics
val ntaflowCache: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
```
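As a rough follow-up (a minimal sketch; the already-created `ssc` StreamingContext and the simple print action are assumptions not shown in the original post), the resulting DStream can then be processed and the streaming job started:

```scala
// Minimal sketch of consuming the stream (assumed follow-up, not from the original post):
// extract the record values, print a sample of each micro-batch, then start the job.
val values = ntaflowCache.map(record => record.value())
values.print()          // prints the first few values of every micro-batch

ssc.start()             // begin receiving data from Kafka
ssc.awaitTermination()  // block until the streaming job is stopped
```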
Reposted from: https://www.cnblogs.com/RzCong/p/8630043.html