Kafka Partition Assignment Calculation (Partitioners)
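You are welcome to support the author's new books, 《深入理解Kafka:核心设计与实践原理》 (Understanding Kafka in Depth: Core Design and Practice Principles) and 《RabbitMQ实战指南》 (RabbitMQ in Practice), and to follow the author's WeChat official account: 朱小厮的博客 (Zhu Xiaosi's Blog).
Link to the original article: https://honeypps.com/mq/kafka-partitions-distributed-calculation/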
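When KafkaProducer sends a message to the broker through its send() method, the message first passes through the interceptors, then through the serializers, and then reaches the partitioner stage, where the target partition is calculated. In some scenarios the business logic needs to control which partition each message lands in; in others the default assignment rule is enough. When KafkaProducer computes the assignment, it first checks the partition number specified in the partition field of the ProducerRecord. If ProducerRecord looks familiar but you can't quite place it (perhaps you just woke up), no problem. Here is a snippet from a Kafka producer example: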
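```java
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// properties (the producer configuration) and topic are assumed to be defined elsewhere
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
String message = "kafka producer demo";
// Neither a partition nor a key is given, so the partitioner decides where this record goes
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, message);
try {
    // Block until the broker acknowledges the record
    producer.send(producerRecord).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
```

That's right: ProducerRecord is just an object that wraps a message. It has six member variables: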
```java
private final String topic;       // the topic to send to
private final Integer partition;  // the specified partition number
private final Headers headers;    // a set of key-value pairs, similar to headers in RabbitMQ; introduced in Kafka 0.11.x
private final K key;              // the message key
private final V value;            // the message value, i.e. the message body
private final Long timestamp;     // the message timestamp, either CreateTime or LogAppendTime (covered in a later article)
```

In the KafkaProducer source code (version 1.0.0), the partition is computed by calling the following partition() method:
```java
/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
```

As you can see, it first checks whether the partition field of the ProducerRecord is set, and only if it is not does it go on to compute a partition. By default, the partitioner in this code is Kafka's own implementation, org.apache.kafka.clients.producer.internals.DefaultPartitioner, whose partition() method is implemented as follows:
```java
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
```

From the source above, the partition is computed as follows:
- If the key is null, records are assigned round-robin: a per-topic counter, started at a random value, is incremented for every record, and its positive value modulo the number of available partitions (those that currently have a leader) selects one of them. If no partition is available, the value is taken modulo the total number of partitions instead.
- If the key is not null, the serialized key is hashed with murmur2, and the positive hash modulo the total number of partitions (available or not) gives the partition. Records with the same key therefore always go to the same partition, as long as the number of partitions does not change.
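To make the two branches concrete, here is a JDK-only sketch that mirrors the DefaultPartitioner logic above without depending on kafka-clients. The class name DefaultPartitionerSketch and its method signatures are hypothetical; murmur2 and toPositive are transcribed from Kafka's org.apache.kafka.common.utils.Utils and are assumed to match it, and the Cluster metadata is reduced to a partition count plus a list of available partition numbers.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only sketch of the DefaultPartitioner (Kafka 1.0.0) logic.
final class DefaultPartitionerSketch {
    // One counter per topic, seeded with a random value as in nextValue().
    private final Map<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    // Equivalent of Utils.toPositive: clears the sign bit (Math.abs would fail for Integer.MIN_VALUE).
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Transcription of Utils.murmur2 (32-bit MurmurHash2 with a fixed seed); assumed to match kafka-clients.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // numPartitions stands in for cluster.partitionsForTopic(topic).size(); availablePartitions
    // holds the partition numbers that cluster.availablePartitionsForTopic(topic) would return.
    int partition(String topic, byte[] keyBytes, int numPartitions, List<Integer> availablePartitions) {
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            if (!availablePartitions.isEmpty()) {
                // Round-robin over the partitions that currently have a leader.
                int part = toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part);
            }
            // No partition is available: pick one among all partitions anyway.
            return toPositive(nextValue) % numPartitions;
        }
        // Keyed record: same key, same partition, while numPartitions stays unchanged.
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    // computeIfAbsent replaces the original get/putIfAbsent sequence; both are thread-safe.
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }
}
```

Because the counter starts at a random value, producers restarted at different times do not all begin on partition 0; the keyed branch, by contrast, is fully deterministic.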
KafkaProducer also supports custom partition assignment. As with org.apache.kafka.clients.producer.internals.DefaultPartitioner, first implement the org.apache.kafka.clients.producer.Partitioner interface, then set partitioner.class in the KafkaProducer configuration to your custom partitioner class, i.e.:
```java
properties.put("partitioner.class", "com.hidden.partitioner.DemoPartitioner");
```

The custom DemoPartitioner mainly implements the Partitioner interface method public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster). It slightly modifies the calculation used by DefaultPartitioner; the details are as follows:
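For context, a fuller producer configuration could look like the sketch below. It uses only java.util.Properties; bootstrap.servers, key.serializer, value.serializer and partitioner.class are standard Kafka producer config keys, while the helper class ProducerConfigSketch is hypothetical and the broker address passed in (e.g. localhost:9092) is a placeholder. With kafka-clients on the classpath, the constant ProducerConfig.PARTITIONER_CLASS_CONFIG can be used instead of the literal "partitioner.class".

```java
import java.util.Properties;

// Hypothetical helper that builds a producer configuration using the custom partitioner.
final class ProducerConfigSketch {
    static Properties producerProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Replaces the default partitioner; the class must be on the producer's classpath.
        properties.put("partitioner.class", "com.hidden.partitioner.DemoPartitioner");
        return properties;
    }
}
```

The resulting Properties object can be passed straight to new KafkaProducer<String, String>(...), as in the producer example earlier.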
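```java
package com.hidden.partitioner;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class DemoPartitioner implements Partitioner {
    // Counter used to spread records without a key across partitions
    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes || keyBytes.length < 1) {
            // Round-robin; toPositive keeps the result valid after the counter overflows
            return Utils.toPositive(atomicInteger.getAndIncrement()) % numPartitions;
        }
        // Borrow the hash computation used by String.hashCode()
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        // The hash can be negative, so clear the sign bit before taking the modulo
        return Utils.toPositive(hash) % numPartitions;
    }

    @Override
    public void close() {
    }
}
```

This custom partitioner is fairly simple, and you can implement the partition calculation flexibly to suit your own business needs. For example, large e-commerce companies usually run multiple warehouses, so the warehouse name or ID can be used as the key to organize product information by warehouse.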
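Whatever key you choose, keep in mind that Java's % operator keeps the sign of the dividend. A 31-based hash goes negative for keys containing bytes of 0x80 or above (negative as Java bytes) or for long keys that overflow, and an AtomicInteger counter wraps to Integer.MIN_VALUE after 2^31 increments; in both cases hash % numPartitions yields an invalid, negative partition number. The JDK-only sketch below (class and method names are hypothetical) contrasts the raw modulo with the sign-bit-clearing approach that Kafka's Utils.toPositive uses.

```java
// Hypothetical demo of why partitioners must not take a raw modulo of a signed hash.
final class SignedModuloDemo {
    // The String.hashCode()-style hash over raw key bytes.
    static int hash31(byte[] keyBytes) {
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        return hash;
    }

    // Unsafe: returns a negative number whenever hash is negative.
    static int rawModulo(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Safe: clear the sign bit first, as Kafka's Utils.toPositive does.
    static int safeModulo(int hash, int numPartitions) {
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

For example, the single-byte key {(byte) 0x80} hashes to -128; with three partitions rawModulo(-128, 3) returns -2, while safeModulo(-128, 3) returns 0. Math.floorMod(hash, numPartitions) would also avoid negative results, but it maps keys differently from Kafka's own partitioner.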