Kafka消息模拟器
生活随笔
收集整理的這篇文章主要介紹了
Kafka消息模拟器
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
package clickstream
import java.util.{Properties, Random, UUID}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.codehaus.jettison.json.JSONObject/** *
Created by 郭飛 on 2016/5/31.
*/
object KafkaMessageGenerator {private val random = new Random()private var pointer = -1private val os_type = Array("Android", "IPhone OS","None", "Windows Phone")def click() : Double = {random.nextInt(10)}def getOsType() : String = {pointer = pointer + 1if(pointer >= os_type.length) {pointer = 0os_type(pointer)} else {os_type(pointer)}}def main(args: Array[String]): Unit = {val topic = "user_events"//本地虛擬機ZK地址val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"val props = new Properties()props.put("metadata.broker.list", brokers)props.put("serializer.class", "kafka.serializer.StringEncoder")val kafkaConfig = new ProducerConfig(props)val producer = new Producer[String, String](kafkaConfig)while(true) {// prepare event dataval event = new JSONObject()event.put("uid", UUID.randomUUID())//隨機生成用戶id.put("event_time", System.currentTimeMillis.toString) //記錄時間發(fā)生時間.put("os_type", getOsType) //設備類型.put("click_count", click) //點擊次數(shù)// produce event messageproducer.send(new KeyedMessage[String, String](topic, event.toString))println("Message sent: " + event)Thread.sleep(200)}}
}作者:MichaelFly
鏈接:http://www.jianshu.com/p/ccba410462ba
來源:簡書
著作權歸作者所有。商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。
?
轉載于:https://www.cnblogs.com/rocky-AGE-24/p/7404754.html
總結
以上是生活随笔為你收集整理的Kafka消息模拟器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux文件系统命令 cat
- 下一篇: Spring事务源码分析