Java程序创建Kafka Topic,以及数据生产消费,常用的命令
轉(zhuǎn)自: Java程序創(chuàng)建Kafka Topic,以及數(shù)據(jù)生產(chǎn)消費,常用的命令_Zyy_z_的博客-CSDN博客_java kafka創(chuàng)建topicKafka簡介: Kafka是一個分布式發(fā)布——訂閱消息傳遞系統(tǒng)。Kafka快速、可擴展且耐用。它保留主題中的消息源。生產(chǎn)者將數(shù)據(jù)寫入主題,消費者從主題中讀取數(shù)據(jù)。Kafka的特點: 1. 同時為分布和訂閱提供高吞吐量。據(jù)了解,Kafka每秒可以生產(chǎn)約25萬條消息(50MB),每秒處理55萬條消息...https://blog.csdn.net/Zyy_z_/article/details/101680138
【1】Kafka簡介
Kafka是一個分布式發(fā)布——訂閱消息傳遞系統(tǒng)。Kafka快速、可擴展且耐用。它保留主題中的消息源。生產(chǎn)者將數(shù)據(jù)寫入主題,消費者從主題中讀取數(shù)據(jù)。
1)Kafka的特點:
- 1. 同時為分布和訂閱提供高吞吐量。 據(jù)了解,Kafka每秒可以生產(chǎn)約25萬條消息(50MB),每秒處理55萬條消息(110MB)這里說條數(shù),可能不上特別準(zhǔn)確,因為消息的大小可能不一致;
- 2. 可進行持久化操作,將消息持久化到到磁盤,以日志的形式存儲,因此可用于批量消費,例如ETL,以及實時應(yīng)用程序。 通過將數(shù)據(jù)持久化到硬盤以及replication防止數(shù)據(jù)丟失。
- 3. 分布式系統(tǒng),易于向外拓展。所有的Producer、broker和consumer都會有多個,均為分布式。無需停機即可拓展 機器。
- 4. 消息被處理的狀態(tài)是在consumer端維護,而不是由server端維護,當(dāng)失敗時能自動平衡。
2)Kafka名詞解釋:
3)Kafka常用命令:
【2】kafka java api
【2.1】Java程序操作創(chuàng)建Topic:? ?
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); AdminClient create = KafkaAdminClient.create(props);//創(chuàng)建Topic create.createTopics(Lists.newArrayList(new NewTopic("Topic名稱"),1,(short)1));//一個分區(qū) create.close();//關(guān)閉其他創(chuàng)建Topic得方式Java API:
https://blog.csdn.net/meng984611383/article/details/80500761https://blog.csdn.net/meng984611383/article/details/80500761
【2.2】Kafka生產(chǎn)數(shù)據(jù):
Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) //生產(chǎn)數(shù)據(jù)producer.send(new ProducerRecord<String, String>("Topic名稱", Integer.toString(i), Integer.toString(i))); producer.close(); //關(guān)閉【2.3】消費數(shù)據(jù)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());生產(chǎn)者的緩沖空間池保留尚未發(fā)送到服務(wù)器的消息,后臺I/O線程負責(zé)將這些消息轉(zhuǎn)換成請求發(fā)送到集群。如果使用后不關(guān)閉生產(chǎn)者,則會泄露這些資源。
send()方法是異步的,添加消息到緩沖區(qū)等待發(fā)送,并立即返回。生產(chǎn)者將單個的消息批量在一起發(fā)送來提高效率。
?
?
總結(jié)
以上是生活随笔為你收集整理的Java程序创建Kafka Topic,以及数据生产消费,常用的命令的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 奥特曼大结局 你一定想不到
- 下一篇: 怪物猎人电脑版怎么下载(怪物猎人pc版下