當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
(转) SpringBoot接入两套kafka集群
生活随笔
收集整理的這篇文章主要介紹了
(转) SpringBoot接入两套kafka集群
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
轉自:
SpringBoot接入兩套kafka集群 - 風小雅 - 博客園：https://www.cnblogs.com/ylty/p/13673357.html
引入依賴
compile 'org.springframework.kafka:spring-kafka'

第一套kafka配置
package myapp.kafka;import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap; import java.util.Map;/*** 默認的kafka配置** @author zhengqian*/ @Slf4j @Configuration @Data public class K1KafkaConfiguration {@Value("${app-name.kafka.k1.consumer.bootstrap-servers}")private String consumerBootstrapServers;@Value("${app-name.kafka.k1.consumer.group-id}")private String groupId;@Value("${app-name.kafka.k1.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${app-name.kafka.k1.consumer.enable-auto-commit}")private Boolean enableAutoCommit;@Value("${app-name.kafka.k2.producer.bootstrap-servers}")private String producerBootstrapServers;@Bean@PrimaryKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactory() {return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> props = new 
HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());} }第二套kafka配置
package myapp.kafka;import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import java.util.HashMap; import java.util.Map;/*** 默認的kafka配置** @author zhengqian*/ @Slf4j @Configuration @Data public class K2KafkaConfiguration {@Value("${app-name.kafka.k2.consumer.bootstrap-servers}")private String consumerBootstrapServers;@Value("${app-name.kafka.k2.consumer.group-id}")private String groupId;@Value("${app-name.kafka.k2.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${app-name.kafka.k2.consumer.enable-auto-commit}")private Boolean enableAutoCommit;@Value("${app-name.kafka.k2.producer.bootstrap-servers}")private String producerBootstrapServers;@Bean@PrimaryKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryK2() {ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactoryK2());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic ConsumerFactory<Integer, String> consumerFactoryK2() {return new DefaultKafkaConsumerFactory<>(consumerConfigsK2());}@Beanpublic Map<String, Object> consumerConfigsK2() {Map<String, Object> 
props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}@Beanpublic Map<String, Object> producerConfigsK2() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> producerFactoryK2() {return new DefaultKafkaProducerFactory<>(producerConfigsK2());}@Beanpublic KafkaTemplate<String, String> kafkaTemplateK2() {return new KafkaTemplate<>(producerFactoryK2());} }配置文件
app-name: kafka:k1:consumer:bootstrap-servers: host1:9092group-id: my-appauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: trueproducer:bootstrap-servers: host1:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerk2:consumer:bootstrap-servers: host2:9092group-id: my-appauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: trueproducer:bootstrap-servers: host2:9092key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer指定消費的kafka集群
@KafkaListener(topics = "topic-name", containerFactory = "kafkaListenerContainerFactoryK2")public void onEvent(ConsumerRecord<String, String> record) {// 省略}指定生產者發生的kafka集群
public class KafkaTest {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Testpublic void test() {ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send("topic", "data");try {SendResult<String, String> value = result.get(2, TimeUnit.SECONDS);System.out.println(value.getProducerRecord());System.out.println(value.getRecordMetadata());} catch (Exception e) {e.printStackTrace();}} }總結
以上是生活随笔為你收集整理的(转) SpringBoot接入两套kafka集群的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 十大智商最高的龟(十大最值得养的龟)
- 下一篇: 华硕路由器设置地址(华硕路由器设置地址r