[Annotation-Based] Spring Integration with Kafka
Table of Contents
- 1. Add the Maven Dependency
- 2. Separating Configuration from Code
- 3. Utility Class for Reading Properties
- 4. Producer Configuration
- 5. Consumer Configuration
- 6. Listening for Messages with Annotations
- 7. Request Test
- 8. Test Results
1. Add the Maven Dependency
```xml
<!-- spring-kafka support -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
```

2. Separating Configuration from Code
Putting the settings in a kafka.properties file keeps the configuration separate from the code and makes it easier to manage.

The kafka.properties file looks like the following.
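A plausible reconstruction, based on the keys that the producer and consumer configuration classes below read; the broker address, ids, and tuning values are placeholder assumptions to adjust for your environment:

```properties
# ---- Producer (placeholder values) ----
kafka.producer.bootstrap.servers=localhost:9092
kafka.producer.client.id=demo-producer
kafka.producer.acks=1
kafka.producer.retries=0
kafka.producer.batch.size=16384
kafka.producer.linger.ms=1
kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.producer.defaultTopic=testTopic01

# ---- Consumer (placeholder values) ----
kafka.consumer.bootstrap.servers=localhost:9092
kafka.consumer.group.id=demo-group
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.auto.offset.reset=earliest
```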
3. Utility Class for Reading Properties

Add a PropertiesUtils utility class to read the contents of the properties file:
```java
package com.demo.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class PropertiesUtils {

    private static Logger log = LoggerFactory.getLogger(PropertiesUtils.class);

    /**
     * Load a Properties object from a file on the classpath
     * @param fileName
     * @return
     */
    public static Properties read(String fileName) {
        InputStream in = null;
        try {
            Properties prop = new Properties();
            in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);
            if (in == null) {
                return null;
            }
            prop.load(in);
            return prop;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null;
    }

    /**
     * Read a value by file name and key
     * @param fileName
     * @param key
     * @return
     */
    public static String readKeyValue(String fileName, String key) {
        Properties prop = read(fileName);
        if (prop != null) {
            return prop.getProperty(key);
        }
        return null;
    }

    /**
     * Read a value by key from an already loaded Properties object
     * @param prop
     * @param key
     * @return
     */
    public static String readKeyValue(Properties prop, String key) {
        if (prop != null) {
            return prop.getProperty(key);
        }
        return null;
    }

    /**
     * Write a single key/value pair
     * @param fileName
     * @param key
     * @param value
     */
    public static void writeValueByKey(String fileName, String key, String value) {
        Map<String, String> properties = new HashMap<String, String>();
        properties.put(key, value);
        writeValues(fileName, properties);
    }

    /**
     * Write multiple key/value pairs
     * @param fileName
     * @param properties
     */
    public static void writeValues(String fileName, Map<String, String> properties) {
        InputStream in = null;
        OutputStream out = null;
        try {
            in = PropertiesUtils.class.getClassLoader().getResourceAsStream(fileName);
            if (in == null) {
                throw new RuntimeException("File to read (" + fileName + ") does not exist, please check!");
            }
            Properties prop = new Properties();
            prop.load(in);
            String path = PropertiesUtils.class.getResource("/" + fileName).getPath();
            out = new FileOutputStream(path);
            if (properties != null) {
                Set<String> set = properties.keySet();
                for (String string : set) {
                    prop.setProperty(string, properties.get(string));
                    log.info("Updated key (" + string + ") in " + fileName + " to: " + properties.get(string));
                }
            }
            prop.store(out, "update properties");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (in != null) {
                    in.close();
                }
                if (out != null) {
                    out.flush();
                    out.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        //System.out.println("read=" + read("config.properties"));
        //System.out.println("readKeyValue=" + readKeyValue("config.properties", "superAdmin"));
        //writeValueByKey(CC.WEIXI_PROPERTIES, "access_token", "ddd");
        Map<String, String> properties = new HashMap<String, String>();
        properties.put("access_token", "ddd2");
        properties.put("access_token1", "ee2");
        properties.put("bbbb", "bbbb");
        //writeValues("config.properties", properties);
    }
}
```
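For reference, a minimal usage sketch of the utility, assuming kafka.properties is on the classpath; the class name PropertiesUtilsDemo is just for illustration:

```java
import java.util.Properties;

import com.demo.utils.PropertiesUtils;

public class PropertiesUtilsDemo {
    public static void main(String[] args) {
        // Load the file once, then read individual keys from the loaded Properties
        Properties kafkaProps = PropertiesUtils.read("kafka.properties");
        String servers = PropertiesUtils.readKeyValue(kafkaProps, "kafka.producer.bootstrap.servers");
        System.out.println("bootstrap servers = " + servers);
    }
}
```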
4. Producer Configuration

```java
package com.demo.kafka;

import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka producer configuration
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    Properties properties = PropertiesUtils.read("kafka.properties");

    public KafkaProducerConfig() {
        System.out.println("Kafka producer configuration loading...");
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<String, String>(producerProperties());
    }

    public Map<String, Object> producerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        // Kafka broker addresses
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.producer.bootstrap.servers"));
        // Client id of this producer
        props.put(ProducerConfig.CLIENT_ID_CONFIG, properties.getProperty("kafka.producer.client.id"));
        // Acknowledgement mode: how many broker replicas must confirm a write
        props.put(ProducerConfig.ACKS_CONFIG, properties.getProperty("kafka.producer.acks"));
        // Number of retries on send failure
        props.put(ProducerConfig.RETRIES_CONFIG, properties.getProperty("kafka.producer.retries"));
        // Records headed for the same partition are collected up to batch.size and sent as one batch
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, properties.getProperty("kafka.producer.batch.size"));
        // Used together with batch.size: how long to wait for more records before sending a batch to the broker
        props.put(ProducerConfig.LINGER_MS_CONFIG, properties.getProperty("kafka.producer.linger.ms"));
        // Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.key.serializer"));
        // Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.producer.value.serializer"));
        return props;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory(), true);
        // Set the default topic (other template-level settings can go here)
        kafkaTemplate.setDefaultTopic(properties.getProperty("kafka.producer.defaultTopic"));
        return kafkaTemplate;
    }
}
```
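Since kafkaTemplate.send() is asynchronous, it can be useful to attach a callback to observe whether a send succeeded. A minimal sketch using spring-kafka 2.2's ListenableFuture API, assuming the KafkaTemplate bean configured above; the class name SendWithCallback is just for illustration:

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class SendWithCallback {

    // Assumes a KafkaTemplate<String, String> configured as above is passed in
    public static void send(KafkaTemplate<String, String> template) {
        ListenableFuture<SendResult<String, String>> future =
                template.send("testTopic01", "hello with callback");
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // RecordMetadata carries the partition and offset the broker assigned
                System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Send failed: " + ex.getMessage());
            }
        });
    }
}
```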
5. Consumer Configuration

```java
package com.demo.kafka;

import com.demo.utils.PropertiesUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka consumer configuration
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    Properties properties = PropertiesUtils.read("kafka.properties");

    public KafkaConsumerConfig() {
        System.out.println("Kafka consumer configuration loading...");
    }

    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<String, Object>();
        // Kafka broker addresses
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("kafka.consumer.bootstrap.servers"));
        // Consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getProperty("kafka.consumer.group.id"));
        // Whether offsets are committed automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, properties.getProperty("kafka.consumer.enable.auto.commit"));
        // Interval between automatic offset commits
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, properties.getProperty("kafka.consumer.auto.commit.interval.ms"));
        // Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.key.deserializer"));
        // Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, properties.getProperty("kafka.consumer.value.deserializer"));
        // Where to start when there is no committed offset (e.g. earliest = from the beginning)
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, properties.getProperty("kafka.consumer.auto.offset.reset"));
        return props;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, String>(consumerProperties());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        // Run three consumer threads
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public KafkaConsumerListener kafkaConsumerListener() {
        return new KafkaConsumerListener();
    }
}
```
6. Listening for Messages with Annotations

Consume messages from a topic via the @KafkaListener annotation:
```java
package com.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.messaging.handler.annotation.Payload;

public class KafkaConsumerListener {

    @KafkaListener(topics = "testTopic01")
    public void listen01(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("Start consuming a message from testTopic01");
        System.out.println("Consumer thread: " + Thread.currentThread().getName()
                + " [message from Kafka topic: " + consumerRecord.topic()
                + ", partition: " + consumerRecord.partition()
                + ", timestamp: " + consumerRecord.timestamp() + "] message content:");
        System.out.println(consumerRecord.value());
    }

    @KafkaListener(topics = "testTopic02")
    public void listen02(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("Start consuming a message from testTopic02");
        System.out.println(consumerRecord.value());
    }

    /**
     * Consume messages from a specific partition of a topic
     * (@Payload makes the payload binding of the String parameter explicit)
     */
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "liuzebiao", partitions = {"1"})})
    public void topicMessage(ConsumerRecord<?, ?> record, @Payload String content) {
        System.out.println("Message: " + content);
        System.out.println("Message consumed ------> Topic: " + record.topic()
                + ", ------> Value: " + record.value()
                + ", ------> Partition: " + record.partition());
    }
}
```
7. Request Test

Test by sending requests through a Spring Controller:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send test messages
     */
    @RequestMapping("producer")
    @ResponseBody
    public void producer() {
        kafkaTemplate.send("testTopic01", "producer message 01");
        kafkaTemplate.send("testTopic02", "producer message 02");
    }
}
```
8. Test Results

Call the interface at localhost:8080/kafka/producer to send messages through kafkaTemplate.
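For example, from the command line, assuming the application runs locally on port 8080 with no context path:

```bash
# Trigger the producer endpoint; both test messages are sent in one call
curl http://localhost:8080/kafka/producer
```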
After the messages are sent successfully, the console shows the following output:
```text
Start consuming a message from testTopic01
Consumer thread: org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 [message from Kafka topic: testTopic01, partition: 0, timestamp: 1568107936693] message content:
producer message 01
Start consuming a message from testTopic02
producer message 02
```

Project link:
https://github.com/gb-heima/spring-annotataion-kafka