Kafka解析之topic创建(3)——合法性验证
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/
前文摘要
在《Kafka解析之Topic創建(1)》這篇文章中,我們講述了創建Topic的方式有兩種:
在學習了KafkaAdminClient之后我們發現它也可以用來創建Topic,即通過發送CreateTopicsRequest請求的方式來創建。KafkaAdminClient的詳細內容可以參考:《集群管理工具KafkaAdminClient——原理與示例》和《集群管理工具KafkaAdminClient——改造》。
一般情況下,Kafka生產環境中的 auto.create.topics.enable參數會被修改為false,即自動創建Topic這條路會被堵住。kafka-topics.sh腳本創建的方式一般由運維人員操作,普通用戶無權過問。那么KafkaAdminClient就為普通用戶提供了一個口子,或者將其集成到公司內部的資源申請、審核系統中更加的方便。普通用戶在創建Topic的時候,有可能由于誤操作或者其他原因而創建了不符合運維規范的Topic,比如命名不規范,副本因子數太低等,這些都會影響后期的系統運維。如果創建Topic的操作是封裝在資源申請、審核系統中的話,那么可以在前端就可以根據規則過濾掉不符合規范的申請操作。然而如果用戶就是用了KafkaAdminClient或者類似的工具來創建了一個錯誤的Topic,我們有什么辦法可以做相應的規范處理呢?
在Kafka服務端中提供了這樣一個參數:create.topic.policy.class.name,其提供了一個入口用來驗證Topic創建的合法性。使用方式是自定義實現org.apache.kafka.server.policy.CreateTopicPolicy接口,比如下面的PolicyDemo,然后在kafka broker中的config/server.properties配置文件中配置參數create.topic.policy.class.name=org.apache.kafka.server.policy.PolicyDemo,然后啟動Kafka服務即可。PolicyDemo的代碼參考如下,主要實現接口中的configure、close以及validate方法,configure方法會在Kafka服務啟動的時候執行,validate方法用來鑒定Topic參數的合法性,其在創建Topic的時候執行,close方法在關閉Kafka服務的時候執行。
public class PolicyDemo implements CreateTopicPolicy{public void configure(Map<String, ?> configs) {}public void close() throws Exception {}public void validate(RequestMetadata requestMetadata)throws PolicyViolationException {if(requestMetadata.numPartitions()!=null || requestMetadata.replicationFactor()!=null){if(requestMetadata.numPartitions()< 5){throw new PolicyViolationException("Topic should have at least 5 partitions, received: "+ requestMetadata.numPartitions());}if(requestMetadata.replicationFactor()<= 1){throw new PolicyViolationException("Topic should have at least 2 replication factor, recevied: "+ requestMetadata.replicationFactor());}}}}采用文章《集群管理工具KafkaAdminClient——原理與示例》中的所提及的關于KafkaAdminClient來創建Topic,測試代碼如下,創建一個分區數為4,副本數為1的Topic:
@Test public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);CreateTopicsResult result = adminClient.createTopics(newTopicList);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} }測試結果如期報錯:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4相應的Kafka服務端的日志如下:
CreateTopicPolicy.RequestMetadata(topic=topic-test2, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={}) [2018-04-18 19:52:02,747] INFO [Admin Manager on Broker 0]: Error processing create topic request for topic topic-test2 with arguments (numPartitions=4, replicationFactor=1, replicasAssignments={}, configs={}) (kafka.server.AdminManager) org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4客戶端向Kafka服務端發送了CreateTopicsRequest請求之后,會經過KafkaApis:
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)然后調用handleCreateTopicsRequest()方法,Topic最終在服務端的創建是在AdminManager中的createTopics方法中實現的。而CreateTopicPolicy的作用域也限定在這個createTopics方法之內,故只有通過CreateTopicsRequest請求的方式才能促使CreateTopicPolicy有效,而對于類似于kafka-topics.sh腳本的創建方式無效。不過在正文開頭就提及了在運維規范的情況下,一般是通過KafkaAdminClient進行操作,或者更加規范的話直接通過申請頁面來創建,這樣就可以在前端規避風險,這樣顯得更加的專業。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生
總結
以上是生活随笔為你收集整理的Kafka解析之topic创建(3)——合法性验证的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 集群管理工具KafkaAdminClie
- 下一篇: 再看Kafka Lag