Spring Boot: Custom Kafka Message Serialization and Deserialization
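1. Overview
When Kafka carries a custom DTO object, the usual StringSerializer and StringDeserializer cannot be used. In that case you need to implement a serializer and a deserializer for the DTO yourself. Suppose there is a KafkaMsgDto class, defined as follows: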
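import lombok.Data;

// Message payload sent through Kafka. Lombok's @Data generates the getters, setters and toString().
@Data
public class KafkaMsgDto {
    private String id;
    private ActionEnum action;

    // No-argument constructor, needed when the object is rebuilt from JSON.
    public KafkaMsgDto() {}

    public KafkaMsgDto(String id, ActionEnum action) {
        this.id = id;
        this.action = action;
    }

    public enum ActionEnum {
        SAVE, DELETE;
    }
}

2. Serializer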
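import com.alibaba.fastjson.JSON; // assumption: JSON is fastjson's class; the original omits its imports
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.util.Map;

// Serializes a KafkaMsgDto to JSON bytes.
public class KafkaMsgSerializer implements Serializer<KafkaMsgDto> {
    private String encoding = "UTF8";

    public KafkaMsgSerializer() {}

    // Reads an optional charset: the key/value-specific property first, then the shared one.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null) {
            encodingValue = configs.get("serializer.encoding");
        }
        if (encodingValue instanceof String) {
            this.encoding = (String) encodingValue;
        }
    }

    @Override
    public byte[] serialize(String topic, KafkaMsgDto data) {
        try {
            if (data == null) {
                return null;
            }
            return JSON.toJSONString(data).getBytes(this.encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing KafkaMsgDto to byte[] due to unsupported encoding " + this.encoding);
        }
    }
}

3. Deserializer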
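The charset lookup that configure performs in both classes (the key- or value-specific property first, then the shared property, then the UTF8 default) can be tried on its own without Kafka on the classpath. The following is a minimal sketch: EncodingLookup and resolve are hypothetical names introduced for illustration, while the property names are the ones read by the deserializer in this section.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the article's classes) that mirrors
// the lookup order used inside configure().
final class EncodingLookup {
    private static final String DEFAULT_ENCODING = "UTF8";

    static String resolve(Map<String, ?> configs, boolean isKey) {
        // 1. Role-specific property, depending on key or value usage.
        String roleProperty = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object value = configs.get(roleProperty);
        // 2. Shared property, consulted only when the role-specific one is absent.
        if (value == null) {
            value = configs.get("deserializer.encoding");
        }
        // 3. Anything that is not a String leaves the default in place.
        return (value instanceof String) ? (String) value : DEFAULT_ENCODING;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("deserializer.encoding", "UTF-16");
        configs.put("key.deserializer.encoding", "ISO-8859-1");
        System.out.println(resolve(configs, true));   // prints ISO-8859-1
        System.out.println(resolve(configs, false));  // prints UTF-16
        System.out.println(resolve(new HashMap<String, Object>(), false)); // prints UTF8
    }
}
```

If neither property is set, both sides stay on UTF8, so producer and consumer agree on the charset by default.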
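import com.alibaba.fastjson.JSON; // assumption: JSON is fastjson's class; the original omits its imports
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.util.Map;

// Rebuilds a KafkaMsgDto from JSON bytes.
public class KafkaMsgDeserializer implements Deserializer<KafkaMsgDto> {
    private String encoding = "UTF8";

    public KafkaMsgDeserializer() {}

    // Reads an optional charset: the key/value-specific property first, then the shared one.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null) {
            encodingValue = configs.get("deserializer.encoding");
        }
        if (encodingValue instanceof String) {
            this.encoding = (String) encodingValue;
        }
    }

    @Override
    public KafkaMsgDto deserialize(String topic, byte[] data) {
        try {
            if (data == null) {
                return null;
            }
            return JSON.parseObject(new String(data, this.encoding), KafkaMsgDto.class);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to KafkaMsgDto due to unsupported encoding " + this.encoding);
        }
    }
}

4. application.properties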
Point the value serializer and deserializer settings at the custom classes; the keys remain plain strings:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.train.kafka.serialization.KafkaMsgSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.train.kafka.serialization.KafkaMsgDeserializer

5. Sending messages
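// kafkaTemplate is assumed to be an injected KafkaTemplate<String, KafkaMsgDto>;
// id is the identifier of the record being saved.
KafkaMsgDto dto = new KafkaMsgDto(id, KafkaMsgDto.ActionEnum.SAVE);
kafkaTemplate.send("test-topic", dto);

6. Receiving messages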
@KafkaListener(topics = "test-topic")
public void onListener(KafkaMsgDto dto) {
    if (dto == null) {
        System.out.println("Received an empty message");
        return;
    }
    System.out.println(String.format("Received message: %s", dto.toString()));
    if (dto.getAction() == KafkaMsgDto.ActionEnum.DELETE) {
        // run the delete business logic
    } else {
        // run the save business logic
    }
}
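Before wiring the pair into Spring, it can help to check the contract both classes rely on: null passes through unchanged, and bytes written with one charset are read back with the same one. The sketch below runs on the JDK alone, so it makes two loud substitutions: Msg stands in for KafkaMsgDto, and a hand-written "ACTION:id" text format replaces the JSON produced by the JSON library. Msg, MsgCodec and RoundTripDemo are hypothetical names, not part of the article's code.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Stand-in for KafkaMsgDto, written by hand so Lombok is not required.
final class Msg {
    enum Action { SAVE, DELETE }

    final String id;
    final Action action;

    Msg(String id, Action action) {
        this.id = id;
        this.action = action;
    }
}

// Hypothetical codec: "ACTION:id" text is used here instead of JSON.
final class MsgCodec {
    private final Charset charset;

    MsgCodec(Charset charset) {
        this.charset = charset;
    }

    byte[] serialize(Msg msg) {
        if (msg == null) {
            return null; // same null handling as KafkaMsgSerializer
        }
        return (msg.action.name() + ":" + msg.id).getBytes(charset);
    }

    Msg deserialize(byte[] data) {
        if (data == null) {
            return null; // same null handling as KafkaMsgDeserializer
        }
        String text = new String(data, charset);
        // Enum names contain no ':', so the first ':' always ends the action.
        int sep = text.indexOf(':');
        return new Msg(text.substring(sep + 1), Msg.Action.valueOf(text.substring(0, sep)));
    }
}

final class RoundTripDemo {
    public static void main(String[] args) {
        MsgCodec codec = new MsgCodec(StandardCharsets.UTF_8);
        Msg copy = codec.deserialize(codec.serialize(new Msg("order:42", Msg.Action.DELETE)));
        System.out.println(copy.action + " " + copy.id);               // prints DELETE order:42
        System.out.println(codec.deserialize(codec.serialize(null))); // prints null
    }
}
```

The same round trip can be repeated against KafkaMsgSerializer and KafkaMsgDeserializer once kafka-clients and the JSON library are on the classpath.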