KAFKA Integration with SpringBoot2: Sending and Consuming Messages (Basics)
Table of Contents
- 1. Technology Stack
- 2. Import Dependencies
- 3. Kafka Configuration
- 4. Producer (Synchronous)
- 5. Producer (Asynchronous)
- 6. Consumer
1. Technology Stack
| Component | Version |
| --- | --- |
| jdk | 1.8.0_202 |
| springboot | 2.5.4 |
| kafka server | kafka_2.12-2.8.0 |
| kafka client | 2.7.1 |
| zookeeper | 3.7.0 |
2. Import Dependencies
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
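Both entries omit `<version>` tags, which only resolves if the project inherits dependency management, typically from the Spring Boot parent POM. A minimal sketch of that parent declaration, matching the 2.5.4 release in the stack table (illustrative, not the article's original build file):

```xml
<!-- Spring Boot parent: supplies managed versions for the starters above -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.4</version>
</parent>
```

With this parent, Spring Boot 2.5.4 manages a Spring Kafka 2.7.x build on kafka-clients 2.7.x, which is consistent with the kafka client 2.7.1 entry in the stack table.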
3. Kafka Configuration

properties version
```properties
spring.application.name=springboot-kafka
server.port=8080

# kafka configuration
spring.kafka.bootstrap-servers=node1:9092

# producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# maximum size, in bytes, of a single producer batch (16 KB)
spring.kafka.producer.batch-size=16384
# total buffer memory available on the producer side: 32 MB = 32 * 1024 * 1024
spring.kafka.producer.buffer-memory=33554432

# consumer configuration
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=springboot-consumer-02
# earliest: when no valid offset exists for this consumer group, reset to the beginning
spring.kafka.consumer.auto-offset-reset=earliest
# whether the consumer commits offsets automatically or manually; automatic here
spring.kafka.consumer.enable-auto-commit=true
# interval, in ms, between automatic offset commits
spring.kafka.consumer.auto-commit-interval=1000
```

yml version
```yaml
server:
  port: 8080
spring:
  application:
    name: springboot-kafka
  kafka:
    bootstrap-servers: 192.168.92.104:9092
    consumer:
      auto-commit-interval: 1000
      auto-offset-reset: earliest
      enable-auto-commit: true
      group-id: springboot-consumer-02
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
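The producer and consumer below assume the topic topic-springboot-01 already exists on the broker. If it was not created ahead of time (for example with kafka-topics.sh) and broker-side auto-creation is disabled, a NewTopic bean, picked up by Spring Boot's auto-configured KafkaAdmin, can declare it at startup. A minimal sketch; the class name and the partition/replica counts are illustrative assumptions:

```java
package com.gblfy.demo.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // Declared topics are created (if absent) by the auto-configured KafkaAdmin at startup.
    @Bean
    public NewTopic topicSpringboot01() {
        return TopicBuilder.name("topic-springboot-01")
                .partitions(1)  // illustrative; the controllers below send to partition 0
                .replicas(1)    // single-broker setup, per the stack table
                .build();
    }
}
```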
4. Producer (Synchronous)

```java
package com.gblfy.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncController {
    private final static Logger log = LoggerFactory.getLogger(KafkaSyncController.class);

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @RequestMapping("/send/sync/{message}")
    public String send(@PathVariable String message) {
        // send the message, then block on the future to make the call synchronous
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send("topic-springboot-01", 0, 0, message);
        try {
            SendResult<Integer, String> sendResult = future.get();
            RecordMetadata metadata = sendResult.getRecordMetadata();
            log.info("sent to topic: {}, partition: {}, offset: {}",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
```
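Once the application is running, a request such as GET http://localhost:8080/send/sync/hello publishes one record and logs its metadata. One design note: the (topic, partition, key, value) overload used above pins every record to partition 0. To let Kafka's default partitioner derive the partition from the key instead, the overload without the partition argument can be used; a minimal sketch (sendByKey is a hypothetical helper, reusing the same KafkaTemplate bean):

```java
// Hypothetical helper: omit the partition argument so the default partitioner
// chooses a partition from the record key (same KafkaTemplate<Integer, String> bean).
public void sendByKey(Integer key, String message) {
    kafkaTemplate.send("topic-springboot-01", key, message);
}
```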
5. Producer (Asynchronous)

```java
package com.gblfy.demo.controller;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaAsyncController {
    private final static Logger log = LoggerFactory.getLogger(KafkaAsyncController.class);

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    // register a callback and handle the broker's acknowledgement asynchronously
    @RequestMapping("/send/async/{message}")
    public String sendAsync(@PathVariable String message) {
        // send asynchronously; the callback fires once the broker responds
        ListenableFuture<SendResult<Integer, String>> future =
                kafkaTemplate.send("topic-springboot-01", 0, 1, message);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable e) {
                log.info("failed to send message: {}", e.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                log.info("sent to topic: {}, partition: {}, offset: {}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
        return "success";
    }
}
```
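For reference beyond the versions used here: as of Spring Kafka 3.0 (the Spring Boot 3 line), KafkaTemplate.send() returns a CompletableFuture rather than a ListenableFuture, so the callback would be attached roughly as below. A sketch only; it does not apply to the 2.7.1 client this article targets:

```java
// Spring Kafka 3.x sketch (not the API of the 2.7.1 client used in this article):
// send() returns CompletableFuture<SendResult<Integer, String>>.
kafkaTemplate.send("topic-springboot-01", 0, 1, message)
        .whenComplete((result, ex) -> {
            if (ex != null) {
                log.info("failed to send message: {}", ex.getMessage());
            } else {
                RecordMetadata metadata = result.getRecordMetadata();
                log.info("sent to topic: {}, partition: {}, offset: {}",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
```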
6. Consumer

```java
package com.gblfy.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    private final static Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = {"topic-springboot-01"})
    public void onMessage(ConsumerRecord<Integer, String> record) {
        log.info("received message, topic: {}, partition: {}, offset: {}, key: {}, value: {}",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}
```
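The configuration in section 3 commits offsets automatically every second (enable-auto-commit=true). Where explicit commits after successful processing are preferred, Spring Kafka supports manual acknowledgement. A hedged sketch of an alternative listener method; it additionally assumes spring.kafka.consumer.enable-auto-commit=false and spring.kafka.listener.ack-mode=manual_immediate in the configuration:

```java
// Hypothetical variant of onMessage above with manual offset commits, inside the
// same @Component. Needs org.springframework.kafka.support.Acknowledgment plus:
//   spring.kafka.consumer.enable-auto-commit=false
//   spring.kafka.listener.ack-mode=manual_immediate
@KafkaListener(topics = {"topic-springboot-01"})
public void onMessageManualAck(ConsumerRecord<Integer, String> record, Acknowledgment ack) {
    log.info("received key: {}, value: {}", record.key(), record.value());
    ack.acknowledge(); // commit this record's offset only after processing succeeds
}
```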