Configuring Dynamic SASL_SCRAM Authentication for Kafka
- 1. Start Zookeeper and Kafka
- 2. Create SCRAM credentials
- 3. Maintain SCRAM credentials
- 3.1 View SCRAM credentials
- 3.2 Delete SCRAM credentials
- 4. Server-side configuration
- 5. Client-side configuration
- 6. Java code test
- 6.1 Producer
- 6.2 Consumer
Kafka needs authentication with the ability to add users dynamically, and SASL/SCRAM supports this.

This article reorganizes and re-records the content of https://blog.csdn.net/qq_38616503/article/details/117529690.
1. Start Zookeeper and Kafka

First, start Kafka and Zookeeper with no security configured at all. If you need to install Kafka from scratch, see the standalone and cluster installation guides for Kafka.
2. Create SCRAM credentials

(1) Create the broker inter-communication user admin (this must be created before enabling SASL, otherwise the brokers fail to start):

```shell
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-sec],SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin
```

(2) Create the producer user producer:

```shell
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=prod-sec],SCRAM-SHA-512=[password=prod-sec]' --entity-type users --entity-name producer
```

(3) Create the consumer user consumer:

```shell
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=cons-sec],SCRAM-SHA-512=[password=cons-sec]' --entity-type users --entity-name consumer
```

SCRAM-SHA-256 and SCRAM-SHA-512 are the password-hashing mechanisms; configuring either one of them is sufficient.
3. Maintain SCRAM credentials

3.1 View SCRAM credentials

```shell
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name consumer
bin/kafka-configs.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --describe --entity-type users --entity-name producer
```

3.2 Delete SCRAM credentials

```shell
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --delete-config 'SCRAM-SHA-512' --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name producer
```

4. Server-side configuration
With the user credentials created, configure the Kafka server side.
(1) Create the JAAS file:
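The file's contents were lost from the original page; a typical config/kafka_server_jaas.conf matching the admin user and admin-sec password created in step 2 would look like the following (a sketch based on the standard ScramLoginModule layout, mirroring the client JAAS files shown in section 6):

```
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec";
};
```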
(2) Pass the JAAS file location to each Kafka broker as a JVM parameter by editing bin/kafka-server-start.sh and adding -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf to the final exec line:

```shell
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf kafka.Kafka "$@"
```

(3) Configure config/server.properties:
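As an alternative to editing the startup script, kafka-run-class.sh also picks up extra JVM options from the KAFKA_OPTS environment variable, so the same flag can be passed without modifying Kafka's scripts (assuming the same file path as above):

```
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_server_jaas.conf"
bin/kafka-server-start.sh -daemon config/server.properties
```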
```properties
# Authentication
listeners=SASL_PLAINTEXT://IP:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
advertised.listeners=SASL_PLAINTEXT://IP:9092

# ACL
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
```

You can choose SASL_SSL or SASL_PLAINTEXT according to your needs; PLAINTEXT transmits data unencrypted and performs somewhat better. After configuring, restart Kafka and Zookeeper.
5. Client-side configuration
(1) Create one JAAS file for each of the three users:

- kafka_client_scram_admin_jaas.conf
- kafka_client_scram_producer_jaas.conf
- kafka_client_scram_consumer_jaas.conf
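The file contents were lost from the original page; each file holds a KafkaClient login section for its user (the producer and consumer versions also appear in section 6). For example, the admin file would plausibly be:

```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec";
};
```

The producer and consumer files are identical apart from using producer/prod-sec and consumer/cons-sec respectively.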
(2) Modify the startup scripts to load the JAAS files.

Producer configuration (bin/kafka-console-producer.sh):

```shell
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_producer_jaas.conf kafka.tools.ConsoleProducer "$@"
```

Consumer configuration (bin/kafka-console-consumer.sh):

```shell
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/opt/kafka_2.12-2.4.1/config/kafka_client_scram_consumer_jaas.conf kafka.tools.ConsoleConsumer "$@"
```

(3) Add the following to both config/consumer.properties and config/producer.properties:
```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092
```

(4) Create a topic:
```shell
bin/kafka-topics.sh --zookeeper 192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --create --topic topictest --partitions 3 --replication-factor 1
```

(5) Start a console producer:
```shell
bin/kafka-console-producer.sh --broker-list 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --producer.config config/producer.properties
```

(6) Grant the producer write permission:
```shell
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:producer --operation Write --topic topictest
```

(7) List the ACLs:
```shell
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --list
```

(8) Grant the consumer read permission:
```shell
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --topic topictest
```

(9) Grant the consumer group permission:
```shell
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=192.168.56.101:2181,192.168.56.102:2181,192.168.56.103:2181 --add --allow-principal User:consumer --operation Read --group test-consumer-group
```

(10) Start a console consumer:
```shell
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092 --topic topictest --from-beginning --consumer.config config/consumer.properties
```

6. Java code test
6.1 Producer

Maven pom.xml dependency:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
    <!-- <version>0.10.2.0</version> -->
</dependency>
```

kafka_client_scram_producer_jaas.conf:
```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="producer"
    password="prod-sec";
};
```

Code:
```java
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MySaslScramProducer {

    private static volatile MySaslScramProducer ins;
    private Producer<String, String> producer;

    private MySaslScramProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        //props.put("compression.type", "gzip");
        //props.put("max.request.size", "5242880");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The JAAS config file is loaded via a classpath-relative path;
        // enable the lines below for SASL authentication
        System.out.println(MySaslScramProducer.class.getResource("/").getPath() + "kafka_client_scram_producer_jaas.conf");
        System.setProperty("java.security.auth.login.config",
                MySaslScramProducer.class.getResource("/").getPath() + "kafka_client_scram_producer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        producer = new KafkaProducer<>(props);
    }

    public static MySaslScramProducer getIns() {
        if (ins == null) {
            synchronized (MySaslScramProducer.class) {
                if (ins == null) {
                    ins = new MySaslScramProducer();
                }
            }
        }
        return ins;
    }

    public Future<RecordMetadata> send(String topic, final String valueStr) {
        // Send asynchronously; log failed sends so they can be checked later
        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>(topic, valueStr), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // Failed sends are printed for later inspection
                    System.out.println("send failed --->>> " + valueStr);
                } else {
                    System.out.println("topic:" + metadata.topic() + " ,partition:" + metadata.partition()
                            + " , offset:" + metadata.offset() + " -> " + valueStr);
                }
            }
        });
        return meta;
    }

    public void close() {
        if (producer != null) producer.close();
    }

    public static void main(String[] args) throws InterruptedException {
        String valueStr = "{\"metric\":\"host.mem.pused\",\"value\":\"97.781098\",\"tags\":{\"resCi\":\"TA_RES_PHYSICALHOST\",\"dataType\":0,\"ip\":\"132.121.93.69\",\"cmd\":\"\",\"resId\":\"auto217A77657DDC70403B949090D3EA5543\",\"itemKey\":\"vm.memory.size[pavailable]\"},\"timestamp\":\"1617673320000\"}";
        MySaslScramProducer.getIns().send("topictest", valueStr);
        MySaslScramProducer.getIns().close();
    }
}
```

6.2 Consumer
kafka_client_scram_consumer_jaas.conf:

```
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="consumer"
    password="cons-sec";
};
```

Code:
```java
package cn.gzsendi;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslScramTopicTest {

    public static boolean stop = false;
    private static Logger logger = LoggerFactory.getLogger(SaslScramTopicTest.class);

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = null;
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092");
        props.put("group.id", "liutest");
        props.put("enable.auto.commit", "true"); // commit offsets automatically
        props.put("auto.offset.reset", "latest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "300000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        System.setProperty("java.security.auth.login.config",
                SaslScramTopicTest.class.getResource("/").getPath() + "kafka_client_scram_consumer_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        consumer = new KafkaConsumer<>(props);

        String topicName = "topictest";
        consumer.subscribe(Arrays.asList(topicName));
        while (!stop) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String valueStr = record.value();
                    try {
                        logger.info(valueStr);
                        logger.info("topic:" + record.topic() + " ,partition:" + record.partition()
                                + " ,offset:" + record.offset() + " -> " + record.value());
                    } catch (Exception e) {
                        System.out.println("error ------->>> " + valueStr);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (consumer != null) consumer.close();
    }

    /**
     * Skip historical data and start consuming from the latest offsets.
     *
     * @param consumer the consumer whose positions should be advanced
     */
    public static void assignOffset(KafkaConsumer<String, String> consumer) {
        if (consumer == null) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        consumer.poll(100);
        Set<TopicPartition> assignment = consumer.assignment();
        consumer.seekToEnd(assignment);
        //consumer.seekToBeginning(assignment);
        for (TopicPartition topicPartition : assignment) {
            long position = consumer.position(topicPartition);
            offsetMap.put(topicPartition, new OffsetAndMetadata(position));
            consumer.commitSync(offsetMap);
        }
    }
}
```