Flink SQL Client in Practice: Mock Data (flink-1.13.6)
1. Mock data
```scala
package com.chb.flink.combat.ch1

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.{Properties, Random}

/**
 * Generate mock user-behavior records and write them to Kafka.
 */
object MockData2Kafka {
  def main(args: Array[String]): Unit = {
    val users = Array(1, 2, 3, 4, 5, 6)
    val itemIds = Array(1001, 1002, 1003, 1004)
    val categoryIds = Array(10001, 10002, 10003, 10004)
    val actions = Array("pv", "buy", "cart", "fav")

    val kafkaProps = new Properties()
    kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "chb1:9092")
    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    val producer = new KafkaProducer[String, String](kafkaProps)
    val topic = "user_behavior"
    val random = new Random()

    // Emit one random CSV record roughly every 300 ms.
    while (true) {
      val value = users(random.nextInt(users.length)) + "," +
        itemIds(random.nextInt(itemIds.length)) + "," +
        categoryIds(random.nextInt(categoryIds.length)) + "," +
        actions(random.nextInt(actions.length)) + "," +
        LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
      producer.send(new ProducerRecord[String, String](topic, value))
      Thread.sleep(300)
    }
  }
}
```

CSV format: user_id,item_id,category_id,behavior,event_time

```
4,1002,10001,buy,2022-10-17 19:25:10
6,1002,10004,buy,2022-10-17 19:25:11
5,1004,10004,cart,2022-10-17 19:25:11
1,1004,10004,buy,2022-10-17 19:25:11
5,1004,10001,fav,2022-10-17 19:25:12
5,1001,10001,pv,2022-10-17 19:25:12
1,1004,10002,fav,2022-10-17 19:25:12
1,1004,10003,cart,2022-10-17 19:25:13
2,1004,10001,buy,2022-10-17 19:25:13
```
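To actually query this data from the Flink SQL Client (the subject of this series), the `user_behavior` topic can be registered as a table backed by the Kafka connector with the `csv` format. The DDL below is a minimal sketch for Flink 1.13: the broker address `chb1:9092` and the column layout come from the producer above, while the consumer group id, the `earliest-offset` startup mode, and the 5-second watermark are illustrative assumptions, not settings from the original article.

```sql
-- Sketch: register the mocked Kafka topic as a table in the SQL Client.
-- Broker address and schema follow the producer above; group id, startup
-- mode, and watermark interval are assumptions for illustration.
CREATE TABLE user_behavior (
    user_id     BIGINT,
    item_id     BIGINT,
    category_id BIGINT,
    behavior    STRING,
    event_time  TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'chb1:9092',
    'properties.group.id' = 'sql-client-demo',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv',
    'csv.ignore-parse-errors' = 'true'
);
```

With the table in place, a simple continuous query is enough to confirm that the mock records are flowing:

```sql
SELECT behavior, COUNT(*) AS cnt
FROM user_behavior
GROUP BY behavior;
```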