Kafka Message Serialization and Deserialization (Part 1)
You are welcome to support the author's new books, 《深入理解Kafka:核心設計與實踐原理》 and 《RabbitMQ實戰指南》, and to follow the author's WeChat public account: 朱小廝的博客.
The original version of this article is available at: https://honeypps.com/mq/kafka-message-serialize-and-deserialize-1/
The parameters that must be configured when sending messages with a Kafka producer are bootstrap.servers, key.serializer, and value.serializer. Serialization takes place after the interceptors (Interceptor) have run and before the partition is assigned.
First, let's look at how an ordinary Kafka producer is written, through a sample program:
```java
public class ProducerJavaDemo {
    public static final String brokerList = "192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092";
    public static final String topic = "hidden-topic";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "hidden-producer-client-id-1");
        properties.put("bootstrap.servers", brokerList);

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);

        while (true) {
            String message = "kafka_message-" + new Date().getTime() + "-edited by hidden.zhu";
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, message);
            try {
                Future<RecordMetadata> future = producer.send(producerRecord, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.print(metadata.offset() + " ");
                        System.out.print(metadata.topic() + " ");
                        System.out.println(metadata.partition());
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
```
The client used here is not the Scala client from the 0.8.x.x era, but the new Kafka producer written in Java. The corresponding Maven dependency is:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
```
The program above uses org.apache.kafka.common.serialization.StringSerializer, which ships with the Kafka client. Besides this serializer for the String type, the client also provides serializers for ByteArray, ByteBuffer, Bytes, Double, Integer, and Long. All of them implement the org.apache.kafka.common.serialization.Serializer interface, which defines three methods: configure(Map<String, ?> configs, boolean isKey), serialize(String topic, T data), and close().
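To make that three-method contract concrete, here is a minimal sketch. The interface is redeclared locally under the name MiniSerializer (so the snippet compiles without the kafka-clients jar), and ToyUppercaseSerializer is a made-up example for illustration, not part of Kafka:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// A local stand-in for org.apache.kafka.common.serialization.Serializer,
// declared here only so this sketch compiles without kafka-clients.
interface MiniSerializer<T> {
    void configure(Map<String, ?> configs, boolean isKey); // called once when the producer is created
    byte[] serialize(String topic, T data);                // called for every record sent
    void close();                                          // called when the producer is closed
}

// A toy implementation: upper-cases the value before encoding it as UTF-8.
public class ToyUppercaseSerializer implements MiniSerializer<String> {
    public void configure(Map<String, ?> configs, boolean isKey) {}

    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }

    public void close() {}
}
```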
Let's now look at the concrete implementation of org.apache.kafka.common.serialization.StringSerializer in Kafka. The source code is as follows:
```java
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue != null && encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}
```
Look first at the configure(Map<String, ?> configs, boolean isKey) method. It is invoked when the KafkaProducer instance is created, i.e. when the code Producer<String,String> producer = new KafkaProducer<String,String>(properties) executes, and its job is to determine the character encoding. In practice, neither key.serializer.encoding nor serializer.encoding is usually configured; more precisely, neither appears in the official Kafka Producer Configs list, so in the common case the encoding is simply UTF-8. The serialize(String topic, String data) method is straightforward: it converts the String data into a byte[].
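The lookup order in configure() — a type-specific key first, then the shared serializer.encoding, then the UTF8 default — can be sketched in plain JDK code. EncodingDemo and resolveEncoding are names made up for this illustration; the logic mirrors the configure() body above:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of StringSerializer's encoding-selection logic, extracted into a
// plain static method so it can be exercised without a running producer.
public class EncodingDemo {
    public static String resolveEncoding(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding"); // shared fallback key
        return (encodingValue instanceof String) ? (String) encodingValue : "UTF8";
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        System.out.println(resolveEncoding(configs, false)); // UTF8 (the default)
        configs.put("serializer.encoding", "GBK");
        System.out.println(resolveEncoding(configs, false)); // GBK (shared fallback)
        configs.put("value.serializer.encoding", "UTF-16");
        System.out.println(resolveEncoding(configs, false)); // UTF-16 (type-specific key wins)
    }
}
```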
If none of the serializers Kafka provides out of the box (String, ByteArray, ByteBuffer, Bytes, Double, Integer, Long) meets your needs, you can turn to a general-purpose serialization library such as Avro, JSON, Thrift, ProtoBuf, or Protostuff, or implement a Serializer for a custom type yourself. The following simple example shows how to use a custom type.
Suppose every message we send is a Company object. The Company class is very simple, with only a name and an address:
```java
public class Company {
    private String name;
    private String address;
    // Getters, setters, constructors, and toString() omitted
}
```
Next we implement a Serializer for the Company type, the DemoSerializer shown below.
```java
package com.hidden.client;

public class DemoSerializer implements Serializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {}

    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    public void close() {}
}
```
To use it, simply change the value.serializer property in the producer's config, as in the following example:
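As a sanity check on the layout DemoSerializer produces (a 4-byte length prefix before each field), here is a stand-alone sketch that recreates the same byte layout and parses it back. WireFormatDemo is a hypothetical helper written for this article; the Kafka Serializer interface and the Company class are left out so it runs on its own:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-alone sketch of DemoSerializer's wire format: [len][name][len][address],
// plus a manual parse to verify the layout round-trips.
public class WireFormatDemo {
    public static byte[] serialize(String name, String address) {
        byte[] n = name == null ? new byte[0] : name.getBytes(StandardCharsets.UTF_8);
        byte[] a = address == null ? new byte[0] : address.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + n.length + a.length);
        buffer.putInt(n.length);
        buffer.put(n);
        buffer.putInt(a.length);
        buffer.put(a);
        return buffer.array();
    }

    public static String[] parse(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte[] n = new byte[buffer.getInt()]; // read the 4-byte length, then that many bytes
        buffer.get(n);
        byte[] a = new byte[buffer.getInt()];
        buffer.get(a);
        return new String[] {
            new String(n, StandardCharsets.UTF_8),
            new String(a, StandardCharsets.UTF_8)
        };
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("hidden.cooperation", "Shanghai, China");
        String[] fields = parse(bytes);
        System.out.println(fields[0] + " / " + fields[1]);
    }
}
```

Note that this length-prefixed layout is exactly why the consumer side will need a matching deserializer that reads the lengths back in the same order, which is the subject of Part 2.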
```java
properties.put("value.serializer", "com.hidden.client.DemoSerializer");
// Remember to change the corresponding String types to Company as well, e.g.:
// Producer<String, Company> producer = new KafkaProducer<String, Company>(properties);
// Company company = new Company();
// company.setName("hidden.cooperation-" + new Date().getTime());
// company.setAddress("Shanghai, China");
// ProducerRecord<String, Company> producerRecord = new ProducerRecord<String, Company>(topic, company);
```
The example changes only value.serializer, but there is no essential difference between key.serializer and value.serializer; if you really need to, changing the key serializer in the same way works just as well.
Next: Kafka Message Serialization and Deserialization (Part 2)