Kafka安全认证授权配置
Kafka安全認證授權配置
- 一 概述
- 1.1 Kafka的權限分類
- 1.2 實現方式
- 二 安全認證授權配置
- 2.1 zookeeper配置
- 2.1.1 引入kafka依賴包
- 2.1.2修改ZK配置文件
- 2.1.3 創建jaas文件
- 2.1.4 修改zkEnv.sh
- 2.1.5 啟動zk
- 2.2 Kafka配置
- 2.2.1 創建超級用戶
- 2.2.2 創建jaas文件
- 2.2.3 修改kafka配置文件
- 2.2.4啟動kafka
- 三 測試連接
- 3.1 生產者測試
- 3.2 消費者測試
- 3.3 beats工具連接
- 3.4 logstash消費
一 概述
1.1 Kafka的權限分類
1)身份認證(Authentication):對client 與服務器的連接進行身份認證,brokers和zookeeper之間的連接進行Authentication(producer 和 consumer)、其他 brokers、tools與 brokers 之間連接的認證。
2)權限控制(Authorization):實現對于消息級別的權限控制,clients的讀寫操作進行Authorization:(生產/消費/group)數據權限。
1.2 實現方式
自0.9.0.0版本開始Kafka社區添加了許多功能用于提高Kafka集群的安全性,Kafka提供SSL或者SASL兩種安全策略。SSL方式主要是通過CA令牌實現,此方案主要介紹SASL方式。
1)SASL驗證:
| SASL/PLAIN | 0.10.0.0 | 不能動態添加用戶 |
| SASL/SCRAM | 0.10.2.0 | 可以動態添加用戶 |
| SASL/Kerberos | 0.9.0.0 | 需要獨立部署驗證服務 |
| SASL/OAUTHBEARER | 2.0.0 | 需自己實現接口實現token的創建和驗證,需要額外的oauth服務 |
2)SSL加密: 使用SSL加密在代理和客戶端之間,代理之間或代理和工具之間傳輸的數據。
二 安全認證授權配置
本文檔主要演示SASL/SCRAM和SASL/PLAIN 搭配使用的方案。版本:kafka_2.12-2.5.1.tgz、apache-zookeeper-3.5.8-bin.tar.gz
2.1 zookeeper配置
2.1.1 引入kafka依賴包
將kafka包下面的相關依賴包復制到zookeeper的目錄下,不同的kafka
版本jar包版本會不一樣。
2.1.2修改ZK配置文件
Zookeeper的配置文件zoo.cfg中添加如下內容:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=36000002.1.3 創建jaas文件
在zookeeper的conf目錄下添加安全認證的jaas文件,該文件定義需要鏈接到Zookeeper服務器的用戶名和密碼。這里定義文件名為zk-server-jaas.conf,文件內容為:
Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="zoo_admin" password="zoo_admin" user_kafka="kafka"; };該文件中username和password是定義zookeeper集群節點之間認證的用戶名和密碼;user_開頭配置的用戶kafka和密碼kafka是提供給外部應用連接zookeeper使用的,后面kafka使用此用戶連接zookeeper。
2.1.4 修改zkEnv.sh
將上一步添加的jaas配置文件添加到zookeeper的環境變量中,zkEnv.sh文件最后添加一行:
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=/mnt/datadisk/zookeeper/conf/zk_server_jaas.conf"2.1.5 啟動zk
nohup bin/zkServer.sh start &2.2 Kafka配置
Kafka和生產者/消費者之間采用SASL/PLAIN和SASL/SCRAM兩種方式共同完成認證,授權使用ACL方式。PLAIN方式的用戶是在jaas文件中寫死的,不能動態的添加;SCRAM支持動態的添加用戶。
2.2.1 創建超級用戶
配置SASL/SCRAM認證的第一步,是配置可以連接到kafka集群的用戶。本方案演示創建3個用戶:kafka_server_admin,writer,reader。kafka_server_admin用戶用于broker之間的認證通信,writer用戶用于生產者連接kafka,reader用戶用于消費者連接kafka。
bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name kafka_server_admin正常情況打印信息:Completed updating config for entity: user-principal ‘admin’.
bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --alter --add-config 'SCRAM-SHA-256=[password=writer],SCRAM-SHA-512=[password=writer]' --entity-type users --entity-name writer bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader],SCRAM-SHA-512=[password=reader]' --entity-type users --entity-name reader在zk客戶端中可以查看創建成功的用戶:
bin/zkCli.sh # 客戶端打開后執行下面命令,查看用戶節點,可以看到創建成功的用戶 ls /config/userskafka-configs 腳本是用來設置主題級別參數的。其實,它的功能還有很多。比如在這個例子中,我們使用它來創建 SASL/SCRAM 認證中的用戶信息。可以使用下列命令來查看剛才創建的用戶數據。
bin/kafka-configs.sh --zookeeper 192.168.13.202:2181 --describe --entity-type users --entity-name writer這段命令包含了 writer 用戶加密算法 SCRAM-SHA-256 以及 SCRAM-SHA-512 對應的鹽值 (Salt)、ServerKey 和 StoreKey,這些都是 SCRAM 機制的術語。
2.2.2 創建jaas文件
配置了用戶之后,我們需要為 Broker 創建一個對應的 JAAS 文件。在實際場景中,需要為每臺單獨的物理 Broker 機器都創建一份 JAAS 文件。
# 在kafka目錄下創建文件 vi config/kafka_server_jaas.confkafka_server_jaas.conf文件的內容為:
KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka_server_admin" password="admin";org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_filebeat="123456"; };Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka"; };KafkaServer配置的是kafka的賬號和密碼。Client配置了broker到Zookeeper的連接用戶名密碼,這里要和前面2.1.3節zookeeper配置中的zk_server_jaas.conf中user_kafka的賬號和密碼相同。中間部分配置的是PLAIN認證方式的賬戶和密碼,其中filebeat是賬戶名,123456是密碼。
關于這個文件內容,需要注意以下兩點:
2.2.3 修改kafka配置文件
server.properties文件中添加如下內容:
# 啟用ACL allow.everyone.if.no.acl.found=false authorizer.class.name=kafka.security.authorizer.AclAuthorizer # 設置本例中admin為超級用戶;在Zookeeper的“/kafka/config/users”下存在用戶 super.users=User:kafka_server_admin # 同時啟用SCRAM和PLAIN機制 sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN # 為broker間通訊開啟SCRAM機制,采用SCRAM-SHA-512算法 sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 # broker間通訊使用PLAINTEXT,本例中不演示SSL配置 security.inter.broker.protocol=SASL_PLAINTEXT # 配置listeners使用SASL_PLAINTEXT listeners=SASL_PLAINTEXT://:9092 # 配置advertised.listeners advertised.listeners=SASL_PLAINTEXT://192.168.13.202:9092啟動腳本kafka-server-start.sh文件最后一行注釋掉,修改為:
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/mnt/datadisk/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"kafka配置文件server.properties完整配置如下:
# kafka不同節點的唯一標識 broker.id=0#認證配置 listeners=SASL_PLAINTEXT://:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN #ACL配置 allow.everyone.if.no.acl.found=false super.users=User:kafka_server_admin authorizer.class.name=kafka.security.authorizer.AclAuthorizer# ip為本機ip advertised.listeners=SASL_PLAINTEXT://192.168.13.202:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 必改項 數據和日志分離, kafka中數據的存放目錄 log.dirs=/mnt/datadisk/kafka/data # 默認的分區數量 num.partitions=3 # 默認的副本數量 default.replication.factor=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 # zookeeper集群地址 zookeeper.connect=192.168.13.202:2181 zookeeper.connection.timeout.ms=90000 # 可以刪除topic delete.topic.enable=true group.initial.rebalance.delay.ms=0 # 消息體大小 message.max.bytes=10485760 socket.request.max.bytes=524288000 replica.lag.time.max.ms=1200002.2.4啟動kafka
nohup bin/kafka-server-start.sh config/server.properties &三 測試連接
3.1 生產者測試
使用kafka-console-producer.sh腳本測試生產者,由于開啟安全認證和授權,因此客戶端需要做相應的配置。config目錄下創建一個producer.conf的文件,文件內容如下:
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer";創建一個測試主題test_topic:
bin/kafka-topics.sh --zookeeper 192.168.13.202:2181 --create --replication-factor 1 --partitions 1 --topic test_topic這里使用writer用戶認證授權,通過ACL為writer用戶分配操作test_topic權限(下面兩個命令二選一即可):
1.分配寫的權限
2.分配producer權限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --producer --topic 'test_topic'啟動生產者發送消息:
bin/kafka-console-producer.sh --broker-list 192.168.13.202:9092 --topic test_topic --producer.config /mnt/datadisk/kafka/config/producer.conf3.2 消費者測試
使用kafka-console-consumer.sh腳本測試生產者,由于開啟安全認證和授權,因此客戶端需要做相應的配置。config目錄下創建一個consumer.conf的文件,文件內容如下:
security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader";這里使用reader用戶認證授權,通過ACL為reader用戶分配操作test_topic權限(下面兩個命令二選一即可):
1.分配讀的權限
2.分配consumer的權限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --consumer --topic test_topic --group test_group啟動消費者消費消息:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.13.202:9092 --topic test_topic --group 'test_group' --from-beginning --consumer.config /mnt/datadisk/kafka/config/consumer.conf3.3 beats工具連接
kafka官網提供了許多beats采集工具,用于不同場景下采集數據,感興趣的可以去官網了解下,這里就不介紹了。筆者這里使用的是beats工具是7.9版本的,采集的數據如果要發送到kafka,只支持SASL/PLAIN認證方式,據說高版本的beats工具支持SCRAM方式的。這里介紹下filebeat-7.9.1的配置方式。PLAIN類型的用戶在2.2.2章節已經創建,在filebeat中用此用戶名和密碼即可完成認證,授權參考3.1章節。
filebeat的配置文件filebeat.yml中輸出kafka配置中加入用戶名和密碼,如下:
可以啟動一個消費者監控filebeat采集的數據,啟動filebeat,并在filebeat采集的文件中輸入內容,可以看到消費者可以獲取對應的消息:
3.4 logstash消費
logstash可以使用SCRAM方式認證,logstash的config目錄下新增kafka-client-jaas.conf文件,文件內容如下:
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader"; };修改congfig目錄下對應的conf處理文件,在input模塊中引入SCRAM認證:
security_protocol => "SASL_PLAINTEXT" sasl_mechanism => "SCRAM-SHA-256" jaas_path => "/mnt/datadisk/logstash/config/kafka-client-jaas.conf"
啟動生產者往logstash監控的topic中發送數據,測試如下:
下一篇:【Java版 Kafka ACL使用實戰】
總結
以上是生活随笔為你收集整理的Kafka安全认证授权配置的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python之多张图片拼接
- 下一篇: 模拟集成电路学习