javascript
Apache Kafka-SpringBoot整合Kafka发送复杂对象
文章目錄
- Spring Kafka概述
- Code
- 依賴
- 配置文件
- 消息
- 生產者
- 消費者
- 單元測試: 同步發送
- 測試
- 單元測試: 異步發送
- 源碼地址
Spring Kafka概述
Spring提供了 Spring-Kafka 項目來操作 Kafka。
https://spring.io/projects/spring-kafka
Code
我們先對 Kafka-Spring 做個快速入門,實現 Producer發送消息 ,同時Consumer 消費消息。
依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 引入 Spring-Kafka 依賴 --> <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies>Spring Boot 已經提供了 Kafka 的自動化配置的支持,但沒有提供 spring-boot-kafka-starter 包…
配置文件
spring:# Kafka 配置項,對應 KafkaProperties 配置類kafka:bootstrap-servers: 192.168.126.140:9092 # 指定 Kafka Broker 地址,可以設置多個,以逗號分隔# Kafka Producer 配置項producer:acks: 1 # 0-不應答。1-leader 應答。all-所有 leader 和 follower 應答。retries: 3 # 發送失敗時,重試發送的次數key-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息的 key 的序列化value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化# Kafka Consumer 配置項consumer:auto-offset-reset: earliest # 設置消費者分組最初的消費進度為 earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:spring:json:trusted:packages: com.artisan.springkafka.domain# Kafka Consumer Listener 監聽器配置listener:missing-topics-fatal: false # 消費監聽接口監聽的主題不存在時,默認會報錯。所以通過設置為 false ,解決報錯logging:level:org:springframework:kafka: ERROR # spring-kafka apache:kafka: ERROR # kafkaspring.kafka 配置項, 對應 KafkaProperties 配置類 。Spring Boot 提供的 KafkaAutoConfiguration 自動化配置類,實現 Kafka 的自動配置,創建相應的 Producer 和 Consumer 。
特別說明一下: 生產者 的value-serializer 配置了 Spring-Kafka 提供的 JsonSerializer 序列化類, 使用 JSON 的方式,序列化復雜的 Message 消息。
消費者的 value-serializer 配置,同樣使用了 JsonDeserializer 反序列化類,因為稍后我們要使用 JSON 的方式,反序列化復雜的 Message 消息。
properties.spring.json.trusted.packages 需要配置com.artisan.springkafka.domain 包下的 Message 類們。因為 JsonDeserializer 在反序列化消息時,考慮到安全性,只反序列化成信任的 Message 類。 務必配置
在序列化時,使用了 JsonSerializer 序列化 Message 消息對象,它會在 Kafka 消息 Headers 的 TypeId 上,值為 Message 消息對應的類全名。
在反序列化時,使用了 JsonDeserializer 序列化出 Message 消息對象,它會根據 Kafka 消息 Headers 的 TypeId 的值,反序列化消息內容成該 Message 對象。
消息
package com.artisan.springkafka.domain;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:27* @mark: show me the code , change the world*/ public class MessageMock {private Integer id ;private String name ;public MessageMock() {}public MessageMock(Integer id, String name) {this.id = id;this.name = name;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return "MessageMock{" +"id=" + id +", name='" + name + '\'' +'}';} }生產者
package com.artisan.springkafka.producer;import com.artisan.springkafka.constants.TOPIC; import com.artisan.springkafka.domain.MessageMock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component;import java.util.Random; import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:25* @mark: show me the code , change the world*/@Component public class ArtisanProducerMock {@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate ;/*** 同步發送* @return* @throws ExecutionException* @throws InterruptedException*/public SendResult sendMsgSync() throws ExecutionException, InterruptedException {// 模擬發送的消息Integer id = new Random().nextInt(100);MessageMock messageMock = new MessageMock(id,"artisanTestMessage-" + id);// 同步等待return kafkaTemplate.send(TOPIC.TOPIC, messageMock).get();}}消費者
模擬兩個不同消費組下的消費者 ,測試消費情況
【消費者A 】
package com.artisan.springkafka.consumer;import com.artisan.springkafka.domain.MessageMock; import com.artisan.springkafka.constants.TOPIC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:33* @mark: show me the code , change the world*/@Component public class ArtisanCosumerMock {private Logger logger = LoggerFactory.getLogger(getClass());private static final String CONSUMER_GROUP_PREFIX = "MOCK-A" ;@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)public void onMessage(MessageMock messageMock){logger.info("【接受到消息][線程:{} 消息內容:{}]", Thread.currentThread().getName(), messageMock);}}在方法上添加了 @KafkaListener 注解,指定了消費的 Topic 和 消費者分組 。
建議:建 一個類,對應一個方法。一個消費者分組,僅消費一個 Topic ,確保每個消費者分組職責單一。
【消費者B 】
package com.artisan.springkafka.consumer;import com.artisan.springkafka.domain.MessageMock; import com.artisan.springkafka.constants.TOPIC; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:33* @mark: show me the code , change the world*/@Component public class ArtisanCosumerMockDiffConsumeGroup {private Logger logger = LoggerFactory.getLogger(getClass());private static final String CONSUMER_GROUP_PREFIX = "MOCK-B" ;@KafkaListener(topics = TOPIC.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPIC.TOPIC)public void onMessage(MessageMock messageMock){logger.info("【接受到消息][線程:{} 消息內容:{}]", Thread.currentThread().getName(), messageMock);}}在方法上添加了 @KafkaListener 注解,指定了消費的 Topic 和 消費者分組 。
消費組和第一個消費者屬于不同的消費組,請注意。
單元測試: 同步發送
package com.artisan.springkafka.produceTest;import com.artisan.springkafka.SpringkafkaApplication; import com.artisan.springkafka.producer.ArtisanProducerMock; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.support.SendResult; import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/2/17 22:40* @mark: show me the code , change the world*/@RunWith(SpringRunner.class) @SpringBootTest(classes = SpringkafkaApplication.class) public class ProduceMockTest {private Logger logger = LoggerFactory.getLogger(getClass());@Autowiredprivate ArtisanProducerMock artisanProducerMock;@Testpublic void testSyncSend() throws ExecutionException, InterruptedException {SendResult sendResult = artisanProducerMock.sendMsgSync();logger.info("testSyncSend Result = topic:[{}] , partition:[{}], offset:[{}]",sendResult.getRecordMetadata().topic(),sendResult.getRecordMetadata().partition(),sendResult.getRecordMetadata().offset());// 阻塞等待,保證消費new CountDownLatch(1).await();}}在方法內部,調用 KafkaTemplate#send(topic, data) 方法,異步發送消息。不過,因為后面調用了 ListenableFuture#get() 方法,阻塞等待發送結果,實現了同步的效果。
測試
運行上面的單元測試 ,
2021-02-18 00:13:50.789 INFO 24768 --- [ main] c.a.s.produceTest.ProduceMockTest : testSyncSend Result = topic:[MOCK_TOPIC] , partition:[0], offset:[12] 2021-02-18 00:13:50.849 INFO 24768 --- [ntainer#1-0-C-1] a.s.c.ArtisanCosumerMockDiffConsumeGroup : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1 消息內容:MessageMock{id=51, name='artisanTestMessage-51'}] 2021-02-18 00:13:50.849 INFO 24768 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=51, name='artisanTestMessage-51'}]可以看到我們發送了一個消息到MOCK_TOPIC上, 兩個消費者屬于不同的消費者組,均訂閱了該TOPIC, 從結果上可以看到 該消息 可以分別被消費者組 “MOCK-ATOPIC” 和消費者組 “MOCK-BTOPIC” 都消費一次。
但是,如果我們啟動多個該示例的實例,則消費者分組 “MOCK-ATOPIC” 和 “MOCK-BTOPIC” 都會有多個 Consumer實例, 結果會怎樣呢?
我們再運行一個, 上次的不要關哈
此時,我們再發送一條消息到MOCK_TOPIC,只會被 "MOCK-ATOPIC"的一個 Consumer 消費一次,也同樣只會被 “MOCK-BTOPIC” 的一個 Consumer 消費一次。
這個有啥用呢? 舉個例子
通過集群消費的機制,可以實現針對相同 Topic ,不同消費者分組實現各自的業務邏輯。
比如說用戶注冊成功時,發送一條 Topic 為 “XXXX” 的消息。 不同模塊使用不同的消費者分組,訂閱該 Topic ,實現各自的拓展邏輯:
- 積分模塊:給用戶增加 積分
- 優惠劵模塊:發放新用戶專享優惠
- …
這樣,就可以將注冊成功后的業務拓展邏輯,實現業務上的解耦,未來也更加容易拓展。同時,也提高了注冊接口的性能。
單元測試: 異步發送
com.artisan.springkafka.producer.ArtisanProducerMock
public ListenableFuture<SendResult<Object, Object>> sendMsgASync() throws ExecutionException, InterruptedException {// 模擬發送的消息Integer id = new Random().nextInt(100);MessageMock messageMock = new MessageMock(id,"messageSendByAsync-" + id);// 異步發送消息ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPIC.TOPIC, messageMock);return result ;}單元測試
com.artisan.springkafka.produceTest.ProduceMockTest
@Testpublic void testAsynSend() throws ExecutionException, InterruptedException {artisanProducerMock.sendMsgASync().addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {@Overridepublic void onFailure(Throwable throwable) {logger.info(" 發送異常{}]]", throwable);}@Overridepublic void onSuccess(SendResult<Object, Object> objectObjectSendResult) {logger.info("回調結果 Result = topic:[{}] , partition:[{}], offset:[{}]",objectObjectSendResult.getRecordMetadata().topic(),objectObjectSendResult.getRecordMetadata().partition(),objectObjectSendResult.getRecordMetadata().offset());}});// 阻塞等待,保證消費new CountDownLatch(1).await();}測試結果 同上
2021-02-18 00:40:22.443 INFO 24056 --- [ad | producer-1] c.a.s.produceTest.ProduceMockTest : 回調結果 Result = topic:[MOCK_TOPIC] , partition:[0], offset:[17] 2021-02-18 00:40:22.504 INFO 24056 --- [ntainer#0-0-C-1] c.a.s.consumer.ArtisanCosumerMock : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1 消息內容:MessageMock{id=37, name='messageSendByAsync-37'}] 2021-02-18 00:40:22.504 INFO 24056 --- [ntainer#1-0-C-1] a.s.c.ArtisanCosumerMockDiffConsumeGroup : 【接受到消息][線程:org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1 消息內容:MessageMock{id=37, name='messageSendByAsync-37'}]-
通過日志可以看到,發送的消息,分別被 ArtisanCosumerMock 和 ArtisanCosumerMockDiffConsumeGroup 兩個消費者(位于不同的消費者分組)均消費了一次。
-
兩個消費者在不同的線程中,消費了該條消息
源碼地址
https://github.com/yangshangwei/boot2/tree/master/springkafka
總結
以上是生活随笔為你收集整理的Apache Kafka-SpringBoot整合Kafka发送复杂对象的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Kafka-生产消费基础篇
- 下一篇: Apache Kafka-生产者_批量发