Writing a Spark Streaming consumer for Kafka data in IDEA [mini case] (4)
Continuing from Writing a consumer in IDEA to receive a producer's continuous log output [mini case] (3):
https://georgedage.blog.csdn.net/article/details/103506165
This time we use Spark Streaming to receive and process the stream.
Remember to add the pom dependencies:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
```

The code is as follows:
```scala
package com.kafka

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object KafkaConsumerStreamingDemo {
  def main(args: Array[String]): Unit = {
    // local[3]: Spark Streaming needs more than one local core,
    // otherwise receiving starves processing
    val conf = new SparkConf().setMaster("local[3]").setAppName("kcs")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val topic = List("george")
    // Minimal Kafka consumer configuration
    val map = Map(
      "bootstrap.servers" -> "henu2:9092",
      "group.id" -> "george",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")

    // One micro-batch every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))
    val ds = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, map))

    // Keep only the record values (the log lines) and print each batch
    val mapDs = ds.map(_.value())
    mapDs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Producer:
https://georgedage.blog.csdn.net/article/details/103504598
The code is as follows:
```scala
package com.kafka

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.mutable.Map
import scala.util.Random

object KafkaProducerDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "henu2:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val kp = new KafkaProducer[String, String](props)

    val provinces = List[String]("henan", "beijing", "guangzhou", "hebei")
    val cities = Map[String, List[String]]()
    cities.put("henan", List[String]("kaifeng", "zhengzhou"))
    cities.put("beijing", List[String]("daxing", "haidian"))
    cities.put("guangzhou", List[String]("zhuhai", "zhongshan"))
    cities.put("hebei", List[String]("shijiazhuang", "handan"))

    val random = new Random()
    while (true) {
      // Build one log line per second: "timestamp userId adId province city"
      val timestamp = System.currentTimeMillis()
      val userId = random.nextInt(1000)
      val adId = random.nextInt(50)
      val proIndex = random.nextInt(4)
      val province = provinces(proIndex)
      val cIndex = random.nextInt(2)
      val city = cities.getOrElse(province, List(""))(cIndex)
      val log = timestamp + " " + userId + " " + adId + " " + province + " " + city
      kp.send(new ProducerRecord[String, String]("george", log))
      Thread.sleep(1000)
    }
  }
}
```

Then start the producer first, and after that start the consumer.
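The consumer above only prints the raw record values. As a hypothetical next step (not part of the original post), the space-delimited log line the producer builds can be parsed into fields and aggregated per batch. The sketch below uses a plain Scala collection standing in for one micro-batch; with the real stream, the same functions could be applied to `ds.map(_.value())` inside `transform` or `foreachRDD`:

```scala
object LogParseSketch {
  // Matches the producer's format: "timestamp userId adId province city"
  case class AdEvent(timestamp: Long, userId: Int, adId: Int, province: String, city: String)

  // Returns None for malformed lines instead of crashing the whole batch
  def parse(line: String): Option[AdEvent] = line.split(" ") match {
    case Array(ts, uid, aid, prov, city) =>
      scala.util.Try(AdEvent(ts.toLong, uid.toInt, aid.toInt, prov, city)).toOption
    case _ => None
  }

  // Number of events per province within one batch
  def countByProvince(batch: Seq[String]): Map[String, Int] =
    batch.flatMap(parse).groupBy(_.province).map { case (p, es) => (p, es.size) }

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      "1575960000000 42 7 henan kaifeng",
      "1575960001000 43 8 hebei handan",
      "1575960002000 44 9 henan zhengzhou",
      "malformed line")
    println(countByProvince(batch))  // henan counted twice, hebei once; bad line skipped
  }
}
```

Keeping the parsing total (returning `Option`) matters in streaming jobs: a single bad record should be dropped, not kill the batch.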
Result display: every 5 seconds the consumer prints a batch header (`Time: ... ms`) followed by the newest log lines from the producer.