Kafka Notes: Building a Spring Boot Consumer
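Screenshots of the running program follow:
Producer side
Consumer output:
The HOW ARE YOU line is the message that was received!
The key point here is:
The @KafkaListener annotation is what listens for the data.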
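As a rough illustration of what such a listener might look like, here is a minimal sketch. The class name, package, and method name are hypothetical and not taken from the original project. It assumes the `kafka.consumer.topic` property and the `kafkaListenerContainerFactory` bean defined in this post's configuration, plus the project's own `MessageEntity` class.

```java
package com.kafkatest.kafkatest.consumer; // hypothetical package

import com.kafkatest.kafkatest.common.MessageEntity;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener class, for illustration only.
@Component
public class MessageListenerSketch {

    // Subscribes to the topic named by kafka.consumer.topic and uses the
    // container factory bean declared in KafkaConsumerConfig.
    @KafkaListener(topics = "${kafka.consumer.topic}",
                   containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(MessageEntity message) {
        // The value has already been deserialized from JSON by the consumer factory.
        System.out.println("Received: " + message);
    }
}
```

The method is invoked once per record. Naming the container factory explicitly is optional here, since `kafkaListenerContainerFactory` is also the bean name Spring Kafka looks up by default.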
The relevant configuration class is as follows:
package com.kafkatest.kafkatest.config;

import com.kafkatest.kafkatest.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;

    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    // Container factory used by @KafkaListener methods.
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    private ConsumerFactory<String, MessageEntity> consumerFactory() {
        // The deserializer instances passed here take precedence over the
        // deserializer classes set in consumerConfigs(), so values are read as JSON.
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(MessageEntity.class));
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}

application.properties is as follows:
kafka.producer.servers=122.51.245.141:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
kafka.topic.default=TEST

kafka.consumer.zookeeper.connect=122.51.245.141:2181
kafka.consumer.servers=122.51.245.141:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=TEST
kafka.consumer.group.id=TEST
kafka.consumer.concurrency=10

Source code download:
https://github.com/fengfanchen/Java/tree/master/kafkatest_web_consumer%26producer
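To make the relationship between the application properties and the Kafka client settings more concrete, the following standalone sketch maps the `kafka.consumer.*` names used in this post to the configuration keys behind the `ConsumerConfig` constants. It uses only the JDK; the class name `ConsumerPropsSketch` and its helper methods are hypothetical and not part of the original project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, for illustration only.
public class ConsumerPropsSketch {

    // Application property name -> Kafka consumer configuration key.
    private static final Map<String, String> KEY_MAPPING = new LinkedHashMap<>();
    static {
        KEY_MAPPING.put("kafka.consumer.servers", "bootstrap.servers");
        KEY_MAPPING.put("kafka.consumer.enable.auto.commit", "enable.auto.commit");
        KEY_MAPPING.put("kafka.consumer.auto.commit.interval", "auto.commit.interval.ms");
        KEY_MAPPING.put("kafka.consumer.session.timeout", "session.timeout.ms");
        KEY_MAPPING.put("kafka.consumer.group.id", "group.id");
        KEY_MAPPING.put("kafka.consumer.auto.offset.reset", "auto.offset.reset");
    }

    // Parses properties-file text into a Properties object.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Copies only the mapped properties, renamed to their Kafka keys.
    public static Map<String, Object> toConsumerConfigs(Properties app) {
        Map<String, Object> configs = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : KEY_MAPPING.entrySet()) {
            String value = app.getProperty(entry.getKey());
            if (value != null) {
                configs.put(entry.getValue(), value.trim());
            }
        }
        return configs;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "kafka.consumer.servers=122.51.245.141:9092",
                "kafka.consumer.enable.auto.commit=true",
                "kafka.consumer.session.timeout=6000",
                "kafka.consumer.auto.commit.interval=100",
                "kafka.consumer.auto.offset.reset=latest",
                "kafka.consumer.topic=TEST",
                "kafka.consumer.group.id=TEST",
                "kafka.consumer.concurrency=10");
        toConsumerConfigs(parse(text))
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Note that `kafka.consumer.topic` and `kafka.consumer.concurrency` are deliberately left out of the map: in the configuration class, concurrency is a listener-container setting rather than a Kafka client property, and `kafka.consumer.zookeeper.connect` is not read by that class at all.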