用idea编写代码作为生产者,Kafka接收其【持续】发来的广告日志信息【小案例】(二)
生活随笔
收集整理的這篇文章主要介紹了
用idea编写代码作为生产者,Kafka接收其【持续】发来的广告日志信息【小案例】(二)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
接我們上一篇使用idea編寫代碼作為生產(chǎn)者,Kafka接收其發(fā)來的信息【小案例】(一)
https://georgedage.blog.csdn.net/article/details/103503400
上篇直接生產(chǎn)一個字符串,進行消費。
這一篇做一個改進,對字符串進行高級化。比如,時間戳+用戶ip+廣告ip+省+市
代碼如下:
package com.kafkaimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import scala.collection.mutable.Map import scala.util.Randomobject KafkaDemo {def main(args: Array[String]): Unit = {val props = new Properties()props.setProperty("bootstrap.servers","henu2:9092")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val provinces = List[String]("henan","beijing","guangzhou","hebei")val cities = Map[String,List[String]]()cities.put("henan",List[String]("kaifeng","zhengzhou"))cities.put("beijing",List[String]("daxing","haidian"))cities.put("guangzhou",List[String]("zhuhai","zhongshan"))cities.put("hebei",List[String]("shijiazhuang","handan"))val random = new Random()val timestamp = System.currentTimeMillis()val userId = random.nextInt(1000)val adId = random.nextInt(50)val proIndex = random.nextInt(4)val privince = provinces(proIndex)val cIndex = random.nextInt(2)val city = cities.getOrElse(privince,List(""))(cIndex)val log = timestamp + " " + userId + " " + adId + " " + privince + " " + cityprintln(log)val kp = new KafkaProducer[String,String](props)kp.send(new ProducerRecord[String,String]("george",log))kp.close()} }可以看到輸出結(jié)果:
那我們希望的是持續(xù)產(chǎn)生數(shù)據(jù),如何操作,很簡單,就是循環(huán),死循環(huán)。
代碼如下:
package com.kafkaimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import scala.collection.mutable.Map import scala.util.Randomobject KafkaDemo {def main(args: Array[String]): Unit = {val props = new Properties()props.setProperty("bootstrap.servers","henu2:9092")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val kp = new KafkaProducer[String,String](props)val provinces = List[String]("henan", "beijing", "guangzhou", "hebei")val cities = Map[String, List[String]]()cities.put("henan", List[String]("kaifeng", "zhengzhou"))cities.put("beijing", List[String]("daxing", "haidian"))cities.put("guangzhou", List[String]("zhuhai", "zhongshan"))cities.put("hebei", List[String]("shijiazhuang", "handan"))val random = new Random()while (true) {val timestamp = System.currentTimeMillis()val userId = random.nextInt(1000)val adId = random.nextInt(50)val proIndex = random.nextInt(4)val privince = provinces(proIndex)val cIndex = random.nextInt(2)val city = cities.getOrElse(privince, List(""))(cIndex)val log = timestamp + " " + userId + " " + adId + " " + privince + " " + citykp.send(new ProducerRecord[String,String]("george",log))}} }代碼并不繁雜,都是些我們常用的基本語法。
結(jié)果如下:
如果覺得速度太快,可以在循環(huán)結(jié)尾加上線程休眠
Thread.sleep(1000)?
總結(jié)
以上是生活随笔為你收集整理的用idea编写代码作为生产者,Kafka接收其【持续】发来的广告日志信息【小案例】(二)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用idea编写代码作为生产者,Kafk
- 下一篇: 使用idea编写消费者,接收生产者的持续