Kafka解惑之Old Producer(1)—— Beginning
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/
眾所周知,目前Kafka的最新版本已經(jīng)到達1.0.0,很多公司運行的kafka也大多升級到了0.10.x版本,Kafka的Producer客戶端早已不再使用0.8.2.x就已基本停止維護的Scala版本的Producer了,那么我們還有必要了解它么?當(dāng)然很有必要,通過Kafka Old Producer我們可以了解Kafka變遷升級的歷史:舊版的Old Producer模型相對簡單利于初始了解,通過對Old Producer的了解也可以慢慢的發(fā)現(xiàn)隱患的問題,這樣進一步可以研究探討解決方法,最后再通過對新版Producer的學(xué)習(xí)來提升對Kafka的認知,與此同時也可以讓讀者在遇到相似問題的時候可以借鑒Kafka的優(yōu)化過來來優(yōu)化自己的應(yīng)用。以銅為鑒,可以正衣冠。
在使用Scala版本的Kafka生產(chǎn)者客戶端kafka.javaapi.producer.Producer時,實際上調(diào)用的是kafka.producer.Producer類。
package kafka.javaapi.producer class Producer[K, V](private val underlying : kafka.producer.Producer[K, V]) extends scala.AnyRef {def this(config : kafka.producer.ProducerConfig) = { /* compiled code */ }def send(message : kafka.producer.KeyedMessage[K, V]) : scala.Unit = { /* compiled code */ }def send(messages : java.util.List[kafka.producer.KeyedMessage[K, V]]) : scala.Unit = { /* compiled code */ }def close : scala.Unit = { /* compiled code */ } }包括kafka-console-producer.sh的腳本(常用來測試發(fā)送消息之用)中,對于0.8.2.x版本如果不指定“-- new-producer”參數(shù);或者對于.0.0版本如果指定“-- old-producer”參數(shù)的話,實際上內(nèi)部調(diào)用的都是kafka.producer.Producer這個類。
對于kafka-console-producer.sh腳本的內(nèi)容如下:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx512M" fi exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"我們看到實際上kafka-console-producer.sh的內(nèi)容就是運行kafka.tools.ConsoleProducer而已,可以看到main函數(shù)代碼塊中的config.useOldProducer,這個筆者看的是1.0.0版本的代碼,而0.8.2.2版本中的ConsoleProducer對應(yīng)的是config.useNewProducer,稍有不同而已,不過如果都指定使用舊版的Scala的Producer,那么都是指kafka.producer.OldProducer。
object ConsoleProducer {def main(args: Array[String]) {try {val config = new ProducerConfig(args)val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]reader.init(System.in, getReaderProps(config))val producer =if(config.useOldProducer) {new OldProducer(getOldProducerProps(config))} else {new NewShinyProducer(getNewProducerProps(config))}進一步剖析,kafka.producer.OldProducer的內(nèi)部構(gòu)造很簡單,關(guān)鍵代碼如下:
class OldProducer(producerProps: Properties) extends BaseProducer {// default to byte array partitionerif (producerProps.getProperty("partitioner.class") == null)producerProps.setProperty("partitioner.class", classOf[kafka.producer.ByteArrayPartitioner].getName)val producer = new kafka.producer.Producer[Array[Byte], Array[Byte]](new ProducerConfig(producerProps))可以看到內(nèi)部的producer最終還是實例化的kafka.producer.Producer。最終驗證了開篇所述的舊版的Kafka生產(chǎn)者客戶端即為Kafka.producer.Producer。
新版的Java版的Kafka客戶端是:org.apache.kafka.clients.producer.KafkaProducer,讀者請注意區(qū)分。對于新版的KafkaProducer在以后的文章中會有詳細介紹。
下面就來深入了解下Kafka.producer.Producer(下面如無特殊說明都將Kafka.producer.Producer此簡稱為Producer)了。當(dāng)實例化Producer的時候,首先要讀取、解析以及校驗配置信息的合法性,根據(jù)配置信息來實例化Producer。Producer的配置項有18個,比如設(shè)置分區(qū)器、消息壓縮方式等,這些都比較好理解,而最主要的配置就是request.required.acks和producer.type這兩個配置。
request.required.acks是用來配置生產(chǎn)端消息確認的方式,在0.8.x這個系列的版本之中,可以配置為0,1,-1的值,也可以配置為其他的整數(shù)值,用來控制一條消息經(jīng)由多少個ISR中的副本所在的Broker確認之后才向客戶端發(fā)送確認信息,這個參數(shù)在之后的版本,比如1.0.0版本中就只能設(shè)置0,1,-1(all)這3(4)種取值,分別表示:
有關(guān)kafka的消息可靠性的更深層次的講解可以參考我2017年初的一篇博客:kafka數(shù)據(jù)可靠性深度解讀,這篇博客主要是針對0.8.2.x版本的kafka做深層次的探討,后續(xù)會對1.0.0版本做進一步的說明。
Producer的發(fā)送模式分為同步(sync)和異步(async)兩種情況,這一點可以通過參數(shù)producer.type來配置。同步模式會將消息直接發(fā)往broker中,而異步模式則會將消息存入LinkedBlockingQueue中,然后通過一個ProducerSendThread來專門發(fā)送消息。為了便于說明,筆者這里先對同步模式的情況來做說明,而異步模式只是在同步模式的基礎(chǔ)上做了一些封裝而已。
class Producer[K,V](val config: ProducerConfig,private val eventHandler: EventHandler[K,V]) // only for unit testingextends Logging {private val hasShutdown = new AtomicBoolean(false)private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)private var sync: Boolean = trueprivate var producerSendThread: ProducerSendThread[K,V] = nullprivate val lock = new Object()config.producerType match {case "sync" =>case "async" =>sync = falseproducerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,queue,eventHandler,config.queueBufferingMaxMs,config.batchNumMessages,config.clientId)producerSendThread.start()}在講述Producer的具體行為之前先來看一個發(fā)送方的Demo:
public class ProducerScalaDemo {public static final String brokerList = "xxx.xxx.xxx.xxx:9092";public static final String topic = "topic-zzh";public static void main(String[] args) {Properties properties = new Properties();properties.put("serializer.class", "kafka.serializer.StringEncoder");properties.put("metadata.broker.list", brokerList);properties.put("producer.type", "sync");properties.put("request.required.acks", "1");Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(properties));String message = "kafka_message-" + new Date().getTime() + " edited by hidden.zhu";KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic,null, message);producer.send(keyedMessage);} }我們可以看到再初始化Producer的時候之用了ProducerConfig這一個類型的參數(shù),而在Producer的類定義中還用到了EventHandler這個類型的參數(shù)。在Scala語言中只有一個主構(gòu)造函數(shù),這個主構(gòu)造函數(shù)的參數(shù)列表就是跟在類名后面括號中的各個的參數(shù),如果要重載的話就需要自定義輔助構(gòu)造函數(shù),輔助構(gòu)造函數(shù)必須調(diào)用主構(gòu)造函數(shù)(this方法)。如此上面這個Demo中很顯然的就調(diào)用了輔助構(gòu)造函數(shù)來進行實例化,那么我們再來看下其對應(yīng)的輔助構(gòu)造函數(shù):
def this(config: ProducerConfig) =this(config,new DefaultEventHandler[K,V](config,CoreUtils.createObject[Partitioner](config.partitionerClass, config.props),CoreUtils.createObject[Encoder[V]](config.serializerClass, config.props),CoreUtils.createObject[Encoder[K]](config.keySerializerClass, config.props),new ProducerPool(config)))這里又引入了兩個新的東西:DefaultEventHandler和ProducerPool,這個DefaultEventHandler繼承了EventHandler這個類,這個是消息發(fā)送的關(guān)鍵。而ProducerPool內(nèi)部是一個HashMap,其中的key是broker的id,而value就是每個broker對應(yīng)的SyncProducer,這個SyncProducer就是真正的消息發(fā)送者。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-old-producer-beginning/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計與實踐原理》和《RabbitMQ實戰(zhàn)指南》,同時歡迎關(guān)注筆者的微信公眾號:朱小廝的博客。
總結(jié)
以上是生活随笔為你收集整理的Kafka解惑之Old Producer(1)—— Beginning的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux IO磁盘篇整理小记
- 下一篇: Kafka解惑之Old Producer