mysql作为kafka生产者_Kafka之生产者
[TOC]
從編程的角度而言,生產者就是負責向 Kafka 發送消息的應用程序。在 Kafka 的歷史變遷 中, 一共有兩個大版本的生產者客戶端: 第-個是于 Kafka開源之初使用 Scala語言編寫的客戶 端,我們可以稱之為舊生產者客戶端(OldProducer)或 Scala版生產者客戶端;第二個是從 Kafka 0.9.x 版本開始推出的使用 Java 語言編寫的客戶端,我們可以稱之為新生產者客戶端( New Producer)或 Java 版生產者客戶端,它彌補了 舊版客戶端中存在的諸 多設計缺陷
雖然Kafka是用 Java/Scalai吾言編寫的,但這并不妨礙它對于多語言的支持,在 Kafka官網中,“CLIENTS”的入口 l提供了一份多語言的支持列表,其中包括常用的CIC++、 Python、 Go等語 言 ,不過這些其他類語 言 的客戶端并非由 Kafka社區維護,如果使用則需要另行下載 。本章主要針對現下流行的新生產者 CJava語言編寫的)客戶端做詳細介紹,而舊生產者客戶端己被湖汰, 故不再做相應的介紹了。
客戶端開發
一個正常的生產邏輯需要具備以下幾個步驟 :
(1 )配置生產者客戶端參數及創建相應的生產者實例。
(2)構建待發送的消息 。
(3 )發送消息。
(4)關閉生產者實例。
代碼清單 1-2 中己經簡單對生產者客戶端 的編碼做了 一個基本演示 如代碼清單 2-1 所示 。
public class KafkaProducerAnalysis {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static Properties initConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "producer.client.id.demo");
return props;
}
public static Properties initNewConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}
public static Properties initPerferConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
return props;
}
public static void main(String[] args) throws InterruptedException {
Properties props = initConfig();
KafkaProducer producer = new KafkaProducer<>(props);
// KafkaProducer producer = new KafkaProducer<>(props,
// new StringSerializer(), new StringSerializer());
ProducerRecord record = new ProducerRecord<>(topic, "hello, Kafka!");
try {
producer.send(record);
// producer.send(record, new Callback() {
// @Override
// public void onCompletion(RecordMetadata metadata, Exception exception) {
// if (exception == null) {
// System.out.println(metadata.partition() + ":" + metadata.offset());
// }
// }
// });
} catch (Exception e) {
e.printStackTrace();
}
// TimeUnit.SECONDS.sleep(5);
}
}
這里有必要單獨說明的是構建的消息對象 ProducerRecord,它并不是單純意義上的消息, 它包含了多個屬性 , 原本需要發送的與業務 相 關的消息體只是其 中 的一個 value 屬性 ,比 如 “ Hello, Kafka!”只是 ProducerRecord對象中的一個屬性。 ProducerRecord類的定義如下(只截
取成員變量) :
public class ProducerRecord {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
川 省略其他成員方法和構造方法
}
其中 topic 和 partition 字段分別代表消息要發往的主題和分區號。 headers 字段是 消息的頭部, Kafka 0.11.x 版本才引入這個屬性,它大多用來設定 一些與 應用相關的信息,如無 需要也可以不用設置。 key 是用來指定消息的鍵,它不僅是消息的附加信息,還可以用來計算 分區號進而可以讓消息發往特定的分區。前面提及消息以主題為單位進行歸類,而這個 key 可 以讓消息再進行二次歸類,同 一個 key 的消息會被劃分到同一個分區中,value 是指消息體, 一般不為空, 如果為空則表示特定的消息一一墓碑消息,timestamp 是指消息的時間戳, 它有 CreateTime 和 LogAppendTime 兩種類型,前者表示消息創建的時間,后者表示消息追加到日志文件的時間。
2.1.1 必要的參數配置
在創建真正的生產者實例前需要配置相應的參數,比如需要連接的 Kafka集群地址。參照代碼清單 2-1 中的 initConfig()方法,在 Kafka 生產者客戶端 KatkaProducer 中有 3 個參數是必填的。
bootstrap.servers:該參數用來指定生產者客戶端連接 Kafka 集群所需的 broker地址清單,具體的內容格式為hostl:portl,host2:port2,可以設置一個或多個 地址,中間以逗號隔開,此 參數 的默認值為“” 。 注意這里并非需要所有的 broker 地 址,因為生產者會從給定的 broker 里查找到其他 broker 的信息 。不過建議至少要設置 兩個以上的 broker 地址信息,當其中任意 一個巖機時,生產者仍然可以連接到 Kafka 集群上。
key. serializer 和 value . serializer: broker 端接收的消息必須以字節數組 (byte[])的形式存在。代碼清單 2-1 中生產者使用的 KatkaProducer和 ProducerRecord中的泛型 對應 的就是消息中 key 和value 的類型,生產者客戶端使用這種方式可以讓代碼具有良好 的可讀性 ,不過在發 往 broker之前需要將消息中對應的 key 和 value 做相應的序列化操作來轉換成字節 數組。 key . serial工zer 和 value .serializer 這兩個參數分 別用來指定 key 和 value 序列化操作 的序列化器,這兩個參數無默認值。注意這里必須填寫序列化器的 全限定名 ,如代碼清單 2-1 中的org.apache.kafka.common.serialization.StringSerializer, 單單指定 StringSerializer是錯誤的
注意到代碼清單 2-1 中的 initConfig()方法里還設置了 一個參數 client.id,這個參數用來 設定 KafkaProducer 對應的客戶端 id, 默認值為“” 。
注意到代碼清單 2-1 中的 initConfig()方法里還設置了 一個參數 client.id,這個參數用來 設定 KafkaProducer 對應的客戶端 id, 默認值為“” 。 如果客戶端不設置, 則 KafkaProducer 會 自動生成一個非空字符串,內容形式如“producer-I”“producer-2”
KafkaProducer 中的參數眾多,遠非示例 initConfig()方法 中的那樣只有 4 個,開發人員可以 根據業務應用的實際需求來修改這些參數的默認值,以達到靈活調配的目的。一般情況下,普 通開發人員無法記住所有的參數名稱,只能有個大致的印象。在實際使用過程中,諸如
“ key.serializer”“max.request.size” “interceptor.classes” 之類的字符串經常 由于人為因素而書 寫錯誤。為 此,我們可以 直接使用客戶端中的 org.apache.kafka.clients.producer.ProducerConfig 類來做一定程度上的預防措施,每個參數在 ProducerConfig 類中都有對應 的名稱,以代碼清單 2-1 中的 initConfig()方法為例 ,引入 ProducerConfig 后的修改結果如 下:
public static Properties initNewConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
return props;
}
注意到上面的代碼中 key . serializer 和 value . serializer 參數對應類 的全限定名 比較長,也比較容易寫錯, 這里通過 Java 中的技巧來做進一步的改進, 相關代碼如下:
props.put (ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class.getName());
props.put(ProducerConfig .VALUE SERIALIZER CLASS CONFIG, StringSerializer .class .getName()) ;
如此代碼便簡潔了許多,同時進一步降低了人為出錯的可能性。在配置完參數之后,我們就可以使用它來創建一個生產者實例,示例如下:
KafkaProducer producer= new KafkaProducer<>(props) ;
KafkaProducer 是線程安全的,可以在多個線程中共享單個 KafkaProducer 實例,也可以將KafkaProducer 實例進行池化來供其他線程調用。
KafkaProducer 中有多個構造方法,比如在創 建 KafkaProducer 實例時并沒有設定 key.serializer 和 value .serializer 這兩個配置參數,那么就需要在構造方法中添加 對應的序列化器,示例如下:
KafkaProducer producer= new KafkaProducer<>(props, new StringSerializer() , new StringSerializer());
其內部原理和無序列化器的構造方法 - 樣,不過就實際應用而言,一般都選用 KafkaProducer(Properties properties)這個構造方法來創建 KafkaProducer 實例。
2.1.2 消息的發送
在創建完生產者實例之后,接下來的工作就是構建消息,即創建 ProducerRecord 對象 。通 過代碼清單 2-1 中我們己經了解了 ProducerRecord 的屬性結構, 其中 topic 屬性和 value 屬 性是必填項,其余屬性是選填項,對應的 ProducerRecord 的構造方法也有多種,參考如下:
image.png
代碼清單 2-1 中使用的是最后一種構造方法,也是最簡單的一種,這種方式相當于將 ProducerRecord 中除 topic 和 value 外的屬性全部值設置為 null。在實際 的應用中,還會用到 其他構造方法,比如要指定 key,或者添加 headers 等。
創建生產者實例和構建消息之后 , 就可以開始發送消息了。發送消息主要有三種模式 : 發 后 即忘( fire-and-forget〕、同步( sync)及異步 (async)。
代碼清單 2-1 中的這種發送方式就是發后即忘,它只管往 Kafka 中發送消息而并不關心消 息是否正確到達。在大多數情況下,這種發送方式沒有什么 問題 , 不過在某些時候( 比如發生 不可重試異常時〉會造成消息的丟失。這種發送方式的性能最高,可靠性也最差。
KafkaProducer 的 send()方法井非是 void 類型 , 而是 Future類型 , send() 方法有 2 個重載方法,具體定義如下 :
publ工C Future send(ProducerRecord record)
public Future send(ProducerRecord record ,
Callback callback)
要實現同步的發送方式,可以利用返回的 Future對象實現, 示例如下:
try {
producer . send(record) .get() ;
} catch (ExecutionException I InterruptedException e) {
e .printStackTrace() ;
}
實際上 send()方法本身就是異步的, send()方法返回的 Future對象可以使調用方稍后獲得發 送的結果。示例中在執行 send()方法之后直接鏈式調用了 get()方法來阻塞等待 Kafka 的響應, 直到消息發送成功,或者發生異常。如果發生異常,那么就需要捕獲異常并交 由外層邏輯處理。
也可以在執行完 send()方法之后不直接調用 get()方法, 比如下面的一種同步發送方式的實現
try {
Future future= producer.send(record) ;
RecordMetadata metadata =future .get();
System.out.println(metadata . top工c () + ” - " +
metadata.partition() + ”: ” + metadata .offset() );
這樣可以獲取一個 RecordMetadata 對象,在 RecordMetadata 對象里包含了消息的 一 些元數 據信息,比如當前消息的主題、分區號、分區中的偏移量( offset〕、時間戳等。如果在應用代 碼中需要這些信息,則可以使用這個方式 。 如果不需要,則直接采用 producer.send(record).get() 的方式更省事。
Future 表示一個任務的生命周期,并提供了相應的方法來判斷任務是否己經完成或取消, 以及獲取任務的結果和取消任務等 。 既然 KafkaProduc巳r.send()方法的返回值是一個 Future 類型 的對象,那么完全可以用 Java 語言層面的技巧來豐富應用的實現,比如使用 Future 中的 get(long
timeout, TimeUnit unit)方法實現可超時的阻塞。
KafkaProducer 中一般會發生兩種類型的異常 : 可重試的異常和不可重試的異常 。常見的可 重試異常有 : NetworkException、 LeaderNotAvailableException、 UnknownTopicOrPartitionException、 NotEnoughReplicasException、 NotCoordinatorException 等。 比如 NetworkException 表示網絡異 常,這個有可能是由于網絡瞬時故障而導致的異常,可以通過重試解決;又比如 LeaderNotAvailableException表示分區的 leader副本不可用,這個異常通常發生在 leader副本下 線而新的 leader 副本選舉完成之前,重試之后可以重新恢復。不可重試的異常,比如 1.4 節中 提及的 RecordTooLargeException異常,暗示了所發送的消息太大, KafkaProducer對此不會進行 任何重試 , 直接拋 出異常 。
對于可重試的異常,如果配置了 retries 參數,那么只要在規定的重試次數內自行恢復 了,就不會拋出異常 。 retries 參數的默認值為 0,配置方式參考如下:
props . put(ProducerConf 工q . RETRIES_CONFIG, 1 0 ) ;
示例中配置了 10 次重試。如果重試了 10 次之后還沒有恢復,那么仍會拋出異常,進而發 送的外層邏輯就要處理這些異常了。
同步發送的方式可 靠性高,要么消息被發送成功,要么發生異常。如果發生異常 ,則可以 捕獲并進行相應的處理,而不會像“發后即忘”的方式直接造成消息的丟失。不過同步發送的 方式的性能會差很多,需要阻塞等待一條消息發送完之后才能發送下一條。
我們再來了解一下異步發送的方式,一般是在 send()方法里指定一個 Callback 的回調函數,
Kafka在返回響應時調用該函數來實現異步的發送確認。有讀者或許會有疑問, send()方法的返 回值類型就是 Future,而 Future本身就可以用作異步的邏輯處理 。這樣做不是不行,只不過 Future 里的 get()方法在何時調用,以及怎么調用都是需要面對的問題,消息不停地發送,那么諸多消息對應的 Future對象的處理難免會引起代碼處理邏輯的混亂。使用 Callback的方式非常簡潔明了, Kafka 有 響應時就會回調 , 要么發送成功,要么拋出異常。異步發送方式的示例如下 :
image.png
示例代碼中遇到異常時( exception!=null)只是做了 簡單的打印操作,在實際應用中應該使 用更加穩妥的方式來處理,比如可以將異常記錄以便日后分析,也可以做 一定 的處理來進行消 息重發。 onCompletion()方法的兩個參數是互斥的,消息發送成功時, metadata 不為 null 而 exception為 null:消息發送異常時, metadata為 null而 exception不為 null。
producer .send(recordl, callbackl) ;
producer . send (record2 , callback2);
對于同一個分區而言,如果消息 recordl 于 record2 之前先發送(參考上面的示例代碼〉, 那么 KafkaProducer就可以保證對應的 callbackl 在 callback2 之前調用,也就是說,因調函數的 調用也可以保證分 區有序。
通常,一個 KafkaProducer 不會只負 責發送單條消息,更多 的是發送多條消息,在發送完這 些消息之后,需要調用 KafkaProducer 的 close()方法來回收資源。 下面的示例中發送了 100 條消 息,之后就調用了 close()方法來回收所占用的資源:
int i = O;
while (i < 100) {
ProducerRecord record =
new ProducerRecord<>{topic, ” msg” +i++) ;
try {
producer . send(record) .get() ;
} catch (InterruptedExcept工on I ExecutionException e) {
close()方法會阻塞等待之前所有的發送請求完成后再關閉 KafkaProducer。與此同時, KafkaProducer 還提供了一個帶超時時間的 close()方法,具體定義如下 :
public void close(long timeout, TimeUnit timeUnit)
如果調用了帶超時時 間 timeout 的 close()方法,那么只會在等待 timeout 時間 內來完成所有 尚未完成 的請求 處理 , 然后強行退出。在實際應用 中 ,一般使用 的都是無參的 close()方法。
2.1.3 序列化
生產者需要用序列化器(Serializer)把對象轉換成字節數組才能通過網絡發送給 Kafka。而 在對側,消費者需要用反序列化器(Deserializer)把從 Kafka 中收到的字節數組轉換成相應的 對象。在代碼清單2-1 中,為了方便,消息的key和value都使用了字符串, 對應程序中的 序列化器也使用 了客戶端自帶的 org.apache.kafka.common.serialization.StringSerializer, 除了用于 String類型的序列化器, 還有 ByteAη町、 ByteBuffer、 Bytes、 Double、 Integer、 Long這幾種類 型,它們都實現了 org.apache.kafka.common.serialization.Serializer接口, 此接口有 3個方法:
publiC void configure (Map configs , boolean isKey) public byte[] serialize(String topic , T data)
public void close()
configure()方法用來配置當前類 , serialize()方法用來執行序列化操作。而 close()方法用來關 閉當前的序列化器, 一般情況下 close()是一個空方法, 如果實現了此方法,則必須確保此方法 的幕等性,因為這個方法很可能會被 KafkaProducer調用多次。
生產者使用的序列化器和消費者使用的反序列化器是需要一一對應的,如果生產者使用了 某種序列化器,比如 StringSerializer, 而消費者使用了另一種序列化器,比如 IntegerSerializer, 那么是無法解析 出想要的數據的。
package chapter2;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.util.Map;
/**
* 代碼清單2-2
* 摘抄至Kafka源碼:org.apache.kafka.common.serialization.StringSerializer
*
*/
public class StringSerializer implements Serializer {
private String encoding = "UTF8";
@Override
public void configure(Map configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
首先是 configureO方法, 這個方法是在創建 KafkaProducer實例的時候調用的, 主要用來確 定編碼類型, 不過一般客戶端對于 key. serializer.encoding, val ue .seria lizer . encoding和 serializer.e口coding這幾個參數都不會配置,在 KafkaProducer的參數集合( ProducerConfig)里也沒有這幾個 參數( 它們可以看作用戶 自定義的參數〉,所以一般情況下 encoding 的值就為默認的“ UTF-8” 。 serialize()方法非常直觀,就是將 String 類型轉為 byte[] 類型。
如果 Kafka 客戶端提供的幾種序列化器都無法滿足應用需求,則可以選擇使用如 Avro、JSON、 Thri魚、 ProtoBuf和 Protos陽ff等通用的序列化工具來實現 , 或者使用自定義類型的序列 化器來實現 。 下面就以 一個簡單的例子來介紹自定義類型的使用方法 。
假設我們要發送的消息都是 Company對象,這個 Company的 定義很簡單,只有名稱 name和 地址address,示例代碼參考如下(為了構建方便,示例中使用 了lombok1工具) :
package chapter2;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 代碼清單2-3中的Company類
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {
private String name;
private String address;
// private String telphone;
}
下面我們再來看一下 Company對應的序列化器 CompanySerializer,示例代碼如代碼清單 2-3 所示。
package chapter2;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* 代碼清單2-3
*/
public class CompanySerializer implements Serializer {
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Company data) {
if (data == null) {
return null;
}
byte[] name, address;
try {
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
if (data.getAddress() != null) {
address = data.getAddress().getBytes("UTF-8");
} else {
address = new byte[0];
}
ByteBuffer buffer = ByteBuffer.
allocate(4 + 4 + name.length + address.length);
buffer.putInt(name.length);
buffer.put(name);
buffer.putInt(address.length);
buffer.put(address);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public void close() {
}
}
上面的這段代碼的邏輯很簡單, configure()和close()方法也都為空。與此對應的反序列化器 CompanyDeserializer 的詳細實現參見 3.2.3 節。
如何使用自定義的序列化器 CompanySerializer 呢?只需將 KafkaProducer 的 value . serializer 參數設置為 CompanySerializ巳r類的全限定名即可。假如我們要發送一個 Company對象到 Kafka, 關鍵代碼如代碼清單 2-4所示。
package chapter2;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 代碼清單2-4
*/
public class ProducerSelfSerializer {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static void main(String[] args)
throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
CompanySerializer.class.getName());
// properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
// ProtostuffSerializer.class.getName());
properties.put("bootstrap.servers", brokerList);
KafkaProducer producer =
new KafkaProducer<>(properties);
Company company = Company.builder().name("hiddenkafka")
.address("China").build();
// Company company = Company.builder().name("hiddenkafka")
// .address("China").telphone("13000000000").build();
ProducerRecord record =
new ProducerRecord<>(topic, company);
producer.send(record).get();
}
}
2.1.4 分區器
消息在通過 send()方法發往 broker 的過程 中, 有可能需要經過攔截器( Interceptor)、序列 化器(Serializer)和分區器(Partitioner)的一系列作用之后才能被真正地發往 broker。攔截器 (下一章會詳細介紹〉一般不是必需的,而序列化器是必需的。消息經過序列 化 之后就需要確 定它發往的分區,如果消息 ProducerRecord 中指定了 partition 字段, 那么就不需要分區器
的作用 ,因 為 partition 代表的就是所要發往的分區號。
如 果 消 息 ProducerRecord 中沒有指定 partition 字段,那么就需要依賴分區器 , 根據 key這個字段來計算 partition 的值。分區器的作用就是為消息分配分區。
Kafka 中提供的默認分區器是 org.apache.kafka.clients.producer.intemals.DefaultPartitioner, 它 實現了org.apache.kafka.clients.producer.Partitioner接口 , 這個接口中定義了 2 個方法 , 具體如下 所示。
public int partition(Str工ng topic , Object key, byte[] keyBytes ,
Ob] ect value , byte[] valueBytes , Cluster cluster) ;
public void close() ;
其中 partition()方法用來計算分區號,返回值為 int 類型。 partition()方法中的參數分別表示
主題 、鍵、序列化后的 鍵、值、序列 化后的值,以及集群的元數據信息,通過這些信息 可以實 現功能豐富的分區器。 close()方法在關閉分區器的時候用來回收一些資源。
在默認分區器 DefaultPartitioner 的實現中, close()是空方法,而在 partition()方法中定義了 主要的分區分配邏輯 。 如果 key 不為 null,那 么默認的分區器會對 key 進行哈 希(采 用 MurmurHash2 算法 ,具備高運算性能及低碰撞率),最終根據得到 的哈希值來計算分區號, 擁 有相同 key 的消息會被寫入同一個分區 。 如果 key 為 null,那么消息將會以輪詢的方式發往主 題內的各個可用分區。
在不改變主題分區數量的情況下 , key 與分區之間的映射可 以保持不變。不過, 一旦主題中增加了分區,那么就難 以保證 key 與分區之間的映射關系了。
除了使用 Kafka 提供的默認分區器進行分區分配,還可以使用自定義的分區器,只需同
DefaultPartitioner一樣實現 Partitioner接口即可。默認的分區器在 key為 null時不會選擇非可用 的分區,我們可以通過自 定義的分區器 DemoPartitioner來打破這一限制,具體的實現可以參考 下面的示例代碼,如代碼清單 2-5 所示。
package chapter2;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 代碼清單2-5
*/
public class DemoPartitioner implements Partitioner {
private final AtomicInteger counter = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (null == keyBytes) {
return counter.getAndIncrement() % numPartitions;
} else
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map configs) {
}
}
實現 自定義的 DemoPartitioner 類之后 , 需要通過配置參數 partitioner. class 來顯式 指定 這個分區器。示例 如 下 :
props . put (ProducerConfig . PARTITIONER_CLASS_CONFIG , DemoPartitioner .class .getName() ) ;
2.1.5 生產者攔截器
攔截器 (Interceptor)是早在 Kafka0.10.0.0 中就已經引入的一個功能, Kafka一共有兩種攔 截器 : 生產者攔截器和消費者攔截器。本節主要講述生產者攔截器 的相關 內容
生產者攔截器既可以用來在消息發送前做一些準備工作, 比如按照某個規則過濾不符合要 求的消息、修改消息的內容等, 也可以用來在發送回調邏輯前做一些定制化的需求,比如統計 類工作。
生產者攔截器 的 使用 也 很方便,主要是自定義實現 org.apache.kafka.clients.producer. Producerlnterceptor接口。 ProducerInterceptor接口中包含3個方法:
public ProducerRecord onSend (ProducerRecord record);
public void onAcknowledgement(RecordMetadata metadata, Excepti on exception );
public void close() ;
KafkaProducer在將消息序列化和計算分區之前會調用生產者攔截器的 onSend()方法來對消 息進行相應 的定制化操作。一般來說最好不要修改消息 ProducerRecord 的 topic、 key 和partition 等信息,如果要修改,則需確保對其有準確的判斷,否則會與預想的效果出現偏 差。比如修改 key 不僅會影響分區的計算,同樣會影響 broker 端日志壓縮( Log Compaction) 的功能 。
KafkaProducer 會在消息被應答( Acknowledgement)之前或消息發送失敗時調用生產者攔 截器的 onAcknowledgement()方法,優先于用戶設定的 Callback 之前執行。這個方法運行在 Producer 的 I/O 線程中,所以這個方法中實現的代碼邏輯越簡單越好, 否則會影響消息的發送 速度。
下面通過一個示例來 演示生產者攔截器的具體用法 , ProducerlnterceptorPrefix 中通過 onSend()方法來為每條消息添加一個前綴“prefixl-”,井且通過 onAcknowledgement()方法來計 算發送消息的成功率。 ProducerlnterceptorPrefix 類的具體實現如代碼清單 2-6 所示 。
package chapter2;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 代碼清單2-6
*/
public class ProducerInterceptorPrefix implements
ProducerInterceptor {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord onSend(
ProducerRecord record) {
String modifiedValue = "prefix1-" + record.value();
return new ProducerRecord<>(record.topic(),
record.partition(), record.timestamp(),
record.key(), modifiedValue, record.headers());
// if (record.value().length() < 5) {
// throw new RuntimeException();
// }
// return record;
}
@Override
public void onAcknowledgement(
RecordMetadata recordMetadata,
Exception e) {
if (e == null) {
sendSuccess++;
} else {
sendFailure++;
}
}
@Override
public void close() {
double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
System.out.println("[INFO] 發送成功率="
+ String.format("%f", successRatio * 100) + "%");
}
@Override
public void configure(Map map) {
}
}
實現自定義的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置參數 川 terceptor.classes 中指定這個攔截器,此參數的默認值為“”。示例如下:
properties.put(ProducerConfiq .INTERCEPTOR_CLASSES_ CONFIG , ProducerinterceptorPrefix.class.getName());
然后使用指定了 ProducerInterceptorPrefix 的生產者連續發送 10 條內容為“ kafka”的消 息, 在發送完之后客戶端打印出如下信息:
[INFO]發送成功率= 100.000000%
如果消費這 10 條消息,會發現消費了的消息都變成了“ prefix1-kafka”,而不是原來的
“kafka” 。
KafkaProducer 中不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈。攔截鏈 會按照 interceptor.classes 參數配置的攔截器的順序來一一執行(配置的時候,各個攔 截器之間使用逗號隔開)。下面我們再添加一個自定義攔截器 ProducerlnterceptorPrefixPlus,它 只實現了 Interceptor接口中的 onSend()方法,主要用來為每條消息添加另一個前綴“prefix2-”, 具體實現如下:
properties.put(ProducerConfig . INTERCEPTOR CLASSES CONFIG , ProducerinterceptorPrefix.class.getName( ) + ”,”
+ ProducerinterceptorPrefi xPlus . class.getName ());
在攔截鏈中,如果某個攔截器執行失敗,那么下 一個攔截器會接著從上-個執行成功 的攔截器繼續執行 。
原理分析
在前面的章節中,我們己經了解了 KafkaProducer的具體使用方法,而本節的內容主要是對 Kafka 生產者客戶端的內部原理進行分析,通過了解生產者客戶端的整體脈絡可以讓我們更好 地使用它,避免因為一些理解上的偏 差 而造成使用上的錯誤。
2.2.1 整體架構
在 2.1.4 節的 開頭介紹了消息在真正發往 Kafka 之前,有可能 需要經歷 攔截器 Clnterceptor)、 序列化器 (Serializer)和分區器(Partitioner)等一系列的作用,那么在此之后又會發生什么呢? 下面我們來看一下生產者客戶端的整體架構,如圖 2”l 所示。
image.png
整個生產者客戶端由兩個線程協調運行,這兩個線程分別為主線程和 Sender線程 (發送線 程)。在主線程中由 KafkaProducer創建消息,然后通過可能的攔截器、序列化器和分區器的作 用之后緩存到消息累加器( RecordAccumulator,也稱為消息收 集器〉中。 Sender 線程負責從 RecordAccumulator中獲取消息并將其發送到 Kafka中。
RecordAccumulator 主要用來緩存消息 以便 Sender 線程可以批量發送,進而減少網絡傳輸 的資源消耗以提升性能 。 RecordAccumulator 緩存的大 小可以通過生產者客戶端參數 buffer.memory 配置,默認值為 33554432B,即 321\侶。 如果生產者發送消息的速度超過發 送到服務器的速度,則會導致生產者空間不足,這個時候 KafkaProducer的 send()方法調用要么 被阻塞,要么拋出異常,這個取決于參數 max.block.ms 的配置,此參數的默認值為 60000, 即 60 秒 。
主線程中發送過來的消息都會被迫加到 RecordAccumulator 的某個雙端隊列( Deque)中, 在 RecordAccumulator 的內部為每個分區都維護了 一 個雙端隊列,隊列中的內容就是 ProducerBatch,即 Deque。消息寫入緩存 時,追加到雙端隊列的尾部: Sender讀取消息時 ,從雙端隊列的頭部讀取。注意 ProducerBatch 不是 ProducerRecord, ProducerBatch 中可以包含一至多個 ProducerRecord。 通俗地說, ProducerRecord 是生產者中創建的消息,而 ProducerBatch 是指一個消息批次 , ProducerRecord 會被包含在 ProducerBatch 中,這樣可以使 宇 節的使用更加緊湊。與此同時,將較小的 ProducerRecord 拼湊成一個較大 的 ProducerBatch,也 可以減少網絡請求的次數以提升整體的吞吐量 。 ProducerBatch 和消息的具體格式有關,更多的 詳細內容可以參考 5.2 節。如果生產者客戶端需要向很多分區發送消息, 則可以 將buffer .memory 參數適當調大以增加整體的吞吐量 。
ProducerBatch 的大小和 batch . size 參數也有著密切的關系。當一條消息( ProducerRecord ) 流入 RecordAccumulator 時,會先尋找與消息分區所對應的雙端隊列(如果沒有則新建),再從 這個雙端隊列的尾部獲取一個 ProducerBatch (如果沒有則新建),查看 ProducerBatch 中是否 還可以寫入這個 ProducerRecord,如 果可以 則 寫入,如果不可 以則 需要 創 建一個新 的 ProducerBatch。在新建 ProducerBatch時評估這條消息的大小是否超過 batch.size 參數的大 小,如果不超過,那么就以 batch. size 參數的大小來創建 ProducerBatch,這樣在使用完這 段內存區域之后,可以通過 BufferPool 的管理來進行復用;如果超過,那么就以評估的大小來 創建 ProducerBatch, 這段內存區域不會被復用。
Sender 從 RecordAccumulator 中 獲取緩存的消息之后,會進 一 步將原本>的保存形式轉變成的形式,其中 Node 表示 Kafka 集群 的 broker 節點 。對于網絡連接來說,生產者客戶端是與具體 的 broker 節點建立 的連接,也 就是向具體的 broker 節點發送消息,而并不關心消息屬于哪一個分區;而對于 KafkaProducer 的應用邏輯而 言 ,我們只 關注向哪個分區中發送哪些消息,所 以在這里需要做一個應用邏輯層 面到網絡 1/0 層面的轉換。
在轉換成>的形式之后, Sender 還 會進一步封裝成的形式,這樣就可以將 Request 請求發往各個 Nod巳了, 這里 的 Request 是指 Kafka 的 各種協議請求,對于消息發送而言就是指具體的 ProduceRequest
請求在從 Sender 線程發往 Kafka 之前還會保存到 InFlightRequests 中, InFlightRequests 保存對象的具體形式為 Map>,它的主要作用是緩存 了已經發出去但還 沒有收到響應的請求( Nodeld 是一個 String 類型,表示節點的 id 編號)。與此同時, InFlightRequests 還提供了許多管理類 的方法,并且通過配置參數還可 以限制每個連接(也就是 客戶端與 Node 之間的連接)最多緩存的請求數。這個配置參數為 max. 工n . flight.requests . p e r . connection,默認值為 5,即每個連接最多只能緩存 5 個未響應的請求,超過該數值 之后就不能再向這個連接發送更多的請求了,除非有緩存的請求收到了響應( Response)。通 過比較 Deque的 size 與這個參數的大小來判斷對應的 Node 中是否己 經堆積了很多未 響應的消息,如果真是如此,那么說明這個 Node 節點負載較大或網絡連接有問題,再繼續向 其發送請求會增大請求超時的可能。
2.2.2 元數據的更新
2.2.1 節中提及的 InFlightRequests 還可以獲得 leastLoadedNode,即所有 Node 中負載最小的 那一個 。這里 的負載最小是通過每個 Node 在 InFlightRequests 中還未確認的請求決定的,未確 認的請求越多則認為負載越大 。 對于圖 2-2 中的 InFlightRequests 來說,圖中展示了 三個節點 NodeO、Nodel 和 Node2,很明顯 Nodel 的 負載最 小 。也就是說, Nodel 為當前的 leastLoadedNode。 選擇 leastLoadedNode 發送請求可以使它能夠盡快發出,避免因網絡擁塞 等異常 而影響整體的進 度。 leastLoadedNode 的概念可以用于多個應用場合,比如元數據請求、消費者組播協議的交互。
image.png
我們使用如下的方式創建了一條消息 ProducerRecord:
new ProducerRecord<>(topic, ” Hello, Kafka !”) ;
我們只知道主題的名稱,對于其他一些必要的信息卻一無所知 。 KafkaProducer要將此消息 追加到指定主題的某個分區所對應的 leader 副本之前,首先需要知道主題的分區數量,然后經 過計算得出(或者直接指定〉目標分區,之后 KafkaProducer 需要知道目標分區的 leader 副本所 在的 broker 節點的地址、端口等信息才能建立連接,最終才能將消息發送到 Kafka,在這一過 程中 所 需要 的信息都屬于元數據信息。
在 2.1.1 節中我們了解了 bootstrap.servers 參數只需要配置部分 broker節點的地址即 可,不需要配置所有 broker節點的地址,因為客戶端可以自己發現其他 broker節點的地址, 這 一過程也屬于元數據相關的更新操作 。與此同時 ,分區數量及 leader 副本的分布都會動態地變 化, 客戶端也需要動態地捕捉這些變化。
元數據是指 Kafka 集群的元數據,這些元數據具體記錄了集群中有哪些主題,這些主題有 哪些分區,每個分區的 lead巳r副本分配在哪個節點上, follower副本分配在哪些節點上,哪些副 本在 AR、 ISR 等集合中,集群中有哪些節點,控制器節點又是哪一個等信息。
當客戶端中沒有需要使用的元數據信息時,比如沒有指定的主題信息,或者超 過 rnetadata .rnax.age.rns 時間沒有更新元數據都會引起元數據的更新操作 。客 戶端參數 rnetadata .rnax.age.rns 的默認值為 300000,即 5 分鐘。元數據的更新操作是在客戶端 內部 進行的,對客戶端的外部使用者不可見。當需要更新元數據時,會先挑選出 leastLoadedNode, 然后 向這個 Node發送 MetadataRequest請求來獲取具體的元數據信息。這個更新操作是由 Sender 線程發起的, 在創建完 MetadataRequest之后同樣會存入 InF!ightRequests,之后的步驟就和發送 消息時的類似 。 元數據雖然由 Sender 線程負責更新,但是主線程也需要讀取這些信息,這里的 數據同步通過 synchronized 和 final 關鍵字來保障。
2.3 重要的生產者參數
在 KafkaProducer 中 ,除了 2.1.1 節提及的 3 個默認的客戶端參數,大部分 的參數都有合理 的默認值, 一般不需要修改它們。不過了解這些參數可以讓我們更合理地使用生產者客戶端, 其中還有一些重要的參數涉及程序的可用性和性能,如果能夠熟練掌握它 們,也可以讓我們在 編寫相關的程序時能夠更好地進行性能調優與故障排查 。 下面挑選一些重要的參數進行講解。
1. acks
這個參數用來指定分區中必須要有多少個副本收到這條消息,之后生產者才會認為這條消 息是成功寫入的。 acks 是生產者客戶端中一個非常重要 的參數 ,它涉及消息的可靠性和吞吐 量之間的權衡。 acks參數有3種類型的值(都是字符串類型)。
acks = 1。默認值即為 l。生產者發送消息之后,只要分區的 leader副本成功寫入消 息,那么它就會收到來自服務端的成功響應 。 如果消息無法寫入 leader 副本,比如在 leader 副本崩潰、重新選舉新的 leader 副本的過程中,那么生產者就會收到一個錯誤 的響應,為了避免消息丟失,生產者可以選擇重發消息 。如果消息寫入 leader 副本并 返回成功響應給生產者,且在被其他 follower 副本拉取之前 leader 副本崩潰,那么此 時消息還是會丟失,因為新選舉的 leader 副本中并沒有這條對應的消息 。 acks 設置為 l,是消息可 靠性和吞吐量之 間的折中方案。
acks = 0。生產者發送消 息之后不需要等待任何服務端的響應 。如果在消息從發送到 寫入 Kafka 的過程中出現某些異常,導致 Kafka 并沒有收到這條消息,那么生產者也 無從得知,消息也就丟失了。在其他配置環境相同的情況下, acks 設置為 0 可以達 到最大的吞吐量。
acks =一l 或 acks =all。生產者在消息發送之后,需要等待 ISR 中的所有副本都成功 寫入消息之后才能夠收到來自服務端的成功響應。在其他配置環境相同的情況下, acks 設置為 1 (all)可以達到最強的可靠性。但這并不意味著消息就一定可靠,因 為 JSR 中可 能只有 leader 副本,這樣就退化成了 acks=l 的情況。要獲得更高的消息 可靠性需要配合 min.insync.replicas 等參數的聯動,消息可靠性分析的具體內 容可以參考 8.3節。
注意 acks 參數配置的值是一個字符串類型,而不是整數類型。舉個例子,將 acks 參數 設置為 0, 需要采用下面這兩種形式 :
properties . put( " acks ”, ” 。 ” );
2. max.「equest.size
這個參數用來限制生產者客戶端能發送的消息的最大值,默認值為 1048576B,即 lMB。 一般情況下,這個默認值就可以滿足大多數的應用場景了。筆者并不建議讀者盲目地增大這個 參數的配置值,尤其是在對 Kafka 整體脈絡沒有足夠把控的時候。因為這個參數還涉及一些其 他參數 的聯動,比如 broker 端的 message.max .bytes 參數,如果配置錯誤可能會引起一些不 必要的異常。比如將broker端的message.max.bytes參數配置為 10,而max.request.size 參數配置為 20,那么當我們發送一條大小為 15B 的消息時,生產者客戶端就會報出如下的異常:
org . apache . kafka.commo口 . errors.RecordTooLargeExcept工on : The request included a message larger than the max message size the server will accept.
3. retries 和 retry.backoffms
retries 參數用來配置生產者重試的次數,默認值為 0,即在發生異常的時候不進行任何 重試動作。消息在從生產者發出到成功寫入服務器之前可能發生一些臨時性的異常, 比如網 絡 抖動、 leader副本的選舉等,這種異常往往是可以自行恢復的,生產者可以通過配置 retries 大于 0 的值,以此通過 內部重試來恢復而不是一昧地將異常拋給生產者的應用程序。 如果重試 達到設定的 次數 ,那么生產者就會放棄重試并返回異常。不過并不是所有的異常都是可以通過 重試來解決的,比如消息太大,超過 max . request . size 參數配置的值時,這種方式就不可 行了 。
重試還和另一個參數 retry.backoff.ms 有關,這個參數的默認值為 100, 它用來設定 兩次重試之間的時間間隔,避免無效的頻繁重試。在配置 retries 和 retry . backoff.ms 之前,最好先估算一下可能的異常恢復時間,這樣可以設定總的重試時間大于這個異常恢復時 間,以此來避免生產者過早地放棄重試 。
Kafka 可以保證同一個分區中的消息是有序的。如果生產者按照一定的順序發送消息,那 么這些消息也會順序地寫入分區,進而消費者也可以按照同樣的順序消費它們。對于某些應用 來說,順序性非常重要,比如 MySQL 的 binlog傳輸,如果出現錯誤就會造成非常嚴重的后果。 如 果將 acks 參數配置為非零值,并且 max . i口 .flight.requests . per . connection 參數 配置為大于 l 的值,那么就會出現錯序的現象: 如果第一批次消息寫入失敗, 而第二批次消息 寫入成功,那么生產者會重試發送第一批次的消息, 此時如果第一批次的消息寫入成功,那么 這兩個批次的消息就出現了錯序 。 一般而言,在需要保證消息順序的場合建議把參數 max.in.flight . requests .per.connection 配置為 1,而不是把 acks 配置為 0, 不過 這樣也會影響整體的吞吐。
4. compression.type
這個參數用來指定消息的壓縮方式,默認值為“ none”,即默認情況下,消息不會被壓縮。
該參數還可以配置為“ gzip” “snappy” 和“ lz4” 。 對消息進行壓縮可以極大地減少網絡傳輸 量、降低網絡 1/0,從而提高整體的性能。消息壓縮是一種使用時間換空間的優化方式,如果對 時延有一定的要求,則不推薦對消息進行壓縮 。
6. linger.ms
這個參數用來指定生產者發送 ProducerBatch 之前等待更多消息( ProducerRecord)加入 ProducerBatch 的時間,默認值為 0。生產者客戶端會在 ProducerBatch 被填滿或等待時間超過 linger .ms 值時發迭出去。增大這個參數的值會增加消息的延遲,但是同時能提升一定的吞 吐量。 這個linger.ms參數與TCP協議中的Nagle算法有異曲同工之妙。
7. receive.buffe「.bytes
這個參數用來設置 Socket 接收消息緩沖區( SO 阻 CBUF)的大小,默認值為 32768 (島, 即 32阻。如果設置為一l,則使用操作系統的默認值。如果 Producer與 Kafka處于不同的機房, 則可以 適地調大這個參數值 。
8. send.buffer.bytes
這個參數用來設置Socket發送消息緩沖區CSO SNDBUF)的大小,默認值為131072CB),
即 128KB。與 receive . buffer .bytes 參數一樣 , 如果設置為 l,則使用操作系統的默認值。
9. request.timeout.ms
這個參數用來配置 Producer等待請求響應的最長時間,默認值為 30000 (ms)。請求超時 之后可以選擇進行重試 。注意這個參數需要 比 broker 端參數 replica.lag.time.max.ms 的 值要大 ,這樣 可 以減少因客戶端重試而引起的消息重復的概率。
還有一些生產者客戶端的參數在本節中沒有提及,這些 參數同樣非常重要 ,它們 需要單獨 的章節或場景來描述。部分參數在前面的章節 中 己經提及 ,比如 bootstrap . servers,還有 部分參數會在后面的 章節 中提及,比如 transactional . id。表 2-1 中羅列 了一份詳細的參 數列表以供讀者參閱。
image.png
總結
以上是生活随笔為你收集整理的mysql作为kafka生产者_Kafka之生产者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 双曲函数与反双曲函数
- 下一篇: 惠普打印机M180N,不小心点了 升级(