Kafka系列(五)、开启SASL安全认证以及配置ACL权限控制
目錄
開啟SASL
控制臺配置用戶
ACL授權
Python客戶端訪問?
ACL常用命令
Kafka系列:
kafka 2.4.1單機版部署及使用
kafka監控系統kafka eagle安裝使用
滴滴開源的kafka-manager編譯及部署使用
kafka管理監控系統 CMAK(yahoo的kafka-manager)部署及使用
Kafka系列(一)、2.6.0版本kafka集群搭建
Kafka系列(二)、架構原理及存儲機制
Kafka系列(三)、生產者分區策略、ISR、ACK機制、一致性語義
Kafka系列(四)、消費者策略、Rebalance機制、Offset存儲機制?
本篇來講解如何給kafka集群開啟SASL安全認證以及配置ACL權限,還是之前搭建的那個kafka2.6集群。
本文主要內容:
- wyk01,wyk02,wyk03 kafka集群配置SASL認證;
- wyk01的消費者和生產者客戶端配置用戶并驗證ACL;
- 利用其它客戶端(Python)訪問開啟了SASL的kafka集群;
開啟SASL
在 wyk01 & wyk02 & wyk03 機器上執行下面的步驟:?
(1)、配置kafka軟連接和環境變量 :
# 創建軟連接
ln -s -f /opt/app/kafka_2.12-2.6.0 /opt/app/kafka# 配置環境變量
vim /etc/profile
# 添加下面的內容
export KAFKA_HOME=/opt/app/kafka# 刷新環境變量
source /etc/profile
(2)、將kafka集群服務關閉:
$KAFKA_HOME/bin/kafka-server-stop.sh
(3)、修改kafka的server.properties 配置文件添加SASL認證,修改listeners中的ip為對應機器的:
cd $KAFKA_HOME/config
vim server.properties?
# 添加下面的內容
#--------------------------------------------------
# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer# SASL_PLAINTEXT
#這里的listener中的wyk01 在三臺機器上換成每臺機器對應的hostname/ip
listeners=SASL_PLAINTEXT://wyk01:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN# 設置admin超級用戶
super.users=User:admin#設置為true,ACL機制為黑名單機制,只有黑名單中的用戶無法訪問
#默認為false,ACL機制為白名單機制,只有白名單中的用戶可以訪問
allow.everyone.if.no.acl.found=false
#--------------------------------------------------
(4)、新增 kafka_server_jaas.conf 配置文件添加用戶,注意最后的兩個分號。
前三行是配置管理員賬戶(該賬戶與上面server.properties中配置的super.users一樣),后面的user_wyk_reader="wyk_reader_pwd"表示添加一個用戶名為wyk_reader對應的密碼為wyk_reader_pwd。即 user_用戶名="該用戶的密碼"。之后配置ACL的時候需要用到這里配置的用戶。
vim kafka_server_jaas.conf
#添加下面的內容
#--------------------------------------------------
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="admin"user_admin="admin"user_wyk_reader="wyk_reader_pwd"user_wyk_writer="wyk_writer_pwd"user_no_acl="no_acl_pwd";};
?(5)、修改kafka-server-start.sh文件,將剛剛配置的用戶列表添加到kafka啟動腳本內:
vim ../bin/kafka-server-start.sh
#修改最后一行,改成下面的內容
#-------------------------------------------
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
source /etc/profile
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf kafka.Kafka "$@"
至此,kafka集群已完成了開啟SASL安全認證并添加了四個用戶admin、wyk_writer、wyk_reader?、no_acl。
(6)、重啟kafka集群
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
控制臺客戶端配置認證文件
集群雖然開啟了SASL認證,接下來我們可以在控制臺使用consumer和producer命令行測試是否有權限,這個是客戶端級別的測試,因此只需要隨便找個kafka客戶端就可以用,不需要在每臺kafka broker機器上修改。如果不想要在控制臺測試可以跳過直接看后面的對比以及python客戶端訪問。
要在控制臺使用命令行需要先配置sasl用戶,步驟如下:
生產者:
(1)、新增配置文件wyk_writer_jaas.conf,這里的用戶名密碼必須和服務器端配置的一樣,我們的客戶端是拿著這個認證信息去和kafka 的服務端做校驗,如果不匹配就等同于登錄失敗。
vim wyk_writer_jaas.conf
#添加下面的內容
#-------------------------------
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="wyk_writer"password="wyk_writer_pwd";};
(2)、修改生產者啟動腳本,在啟動生產者客戶端時會去加載該認證文件。
vim ../bin/kafka-console-producer.sh
#修改最后一行,改成下面的內容
#-------------------------------------------
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
source /etc/profile
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_writer_jaas.conf kafka.tools.ConsoleProducer "$@"
(3)、修改?producer.properties 添加SASL認證。
vim producer.properties
#添加下面的內容
#-------------------------------------------
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
消費者:
(1)、新增配置文件wyk_reader_jaas.conf,這里的用戶名密碼必須和服務器端配置的一樣,我們的客戶端是拿著這個認證信息去和kafka 的服務端做校驗,如果不匹配就等同于登錄失敗。
vim wyk_reader_jaas.conf
#添加下面的內容
#-------------------------------
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="wyk_reader"password="wyk_reader_pwd";
};
(2)、修改消費者啟動腳本,在啟動消費者客戶端時會去加載該認證文件。
vim ../bin/kafka-console-consumer.sh?
#修改最后一行,改成下面的內容
#-------------------------------------------
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
source /etc/profile
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_reader_jaas.conf kafka.tools.ConsoleConsumer "$@"
(3)、修改 consumer.properties 添加SASL認證。需注意,這個配置文件中有一個消費者組group.id=test-consumer-group,待會在給控制臺客戶端授權消費者權限時還需要指定這個消費者組。
vim consumer.properties
#添加下面的內容
#-------------------------------------------
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
?
ACL授權及驗證
我們已經開啟了SASL,并給控制臺客戶端配置了生產者和消費者的認證文件,下面我們來測試授權之后和未授權用戶的區別。
為了起到對比效果,我們先新增一個認證文件用戶為no_acl:
(1)、新增配置文件no_acl_jaas.conf
vim no_acl_jaas.conf
#添加下面的內容
#-------------------------------
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="no_acl"password="no_acl_pwd";
};
(2)、使用下面的命令行授權用戶 wyk_reader 對主題csdn01 的讀權限,授權用戶 wyk_writer 對主題csdn01 的寫權限,no_acl用戶不設置任何權限用做對比。
# 給用戶wyk_writer 添加csdn01主題的 生產者權限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_writer --operation Write --topic csdn01# 給用戶wyk_reader 添加csdn01主題的 消費者權限
#需要注意這里消費者還需要給消費者組配置權限,消費者組名稱要和consumer.properties中配置的group.id一致
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_reader --operation Read --topic csdn01 --group test-consumer-group
?
(3)、驗證ACL權限:
(3.1)、當使用no_acl用戶作為生產者時可以看到啟動時會報錯:Not authorized to access topics:[csdn01]。而使用擁有csdn01主題的生產者權限的用戶wyk_writer啟動時可以正常的使用:
(3.2)、?當使用no_acl用戶作為消費者時可以看到啟動時會報錯:Not authorized to access topics:[csdn01]。而使用擁有csdn01主題的消費者權限的用戶wyk_reader啟動時可以正常的消費數據:
(3.3)、將生產者的sasl認證文件里的密碼 改為錯誤密碼后,啟動時會報錯invalid username or password:
Python客戶端訪問?
接下來我們用python客戶端訪問開啟了sasl認證的kafka,首先我們在kafka_server_jaas.conf 配置文件中添加一個用戶python_wyk (需要重啟kafka集群),并授權對主題csdn01的讀寫權限用作python客戶端的測試:
生產者腳本:
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['wyk01:9092','wyk02:9092','wyk03:9092'],security_protocol="SASL_PLAINTEXT",sasl_mechanism="PLAIN",sasl_plain_username="python_wyk",sasl_plain_password="python_wyk_pwd"
)producer.send('csdn01', json.dumps({"id":"1","name":"wyk1","company":"csdn1"}).encode('utf-8'))
producer.send('csdn01', json.dumps({"id":"2","name":"wyk2","company":"csdn2"}).encode('utf-8'))
producer.send('csdn01', json.dumps({"id":"3","name":"wyk3","company":"csdn3"}).encode('utf-8'))
producer.flush()
消費者腳本:?
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import jsonconsumer = KafkaConsumer('csdn01',bootstrap_servers=['wyk01:9092','wyk02:9092','wyk03:9092'],group_id='test-consumer-group',auto_offset_reset='latest' ,enable_auto_commit=False,security_protocol="SASL_PLAINTEXT",sasl_mechanism="PLAIN",sasl_plain_username="python_wyk",sasl_plain_password="python_wyk_pwd"
)
for msg in consumer:print(msg)
ACL常用命令
查看權限列表:
#查看所有權限列表
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --list#查看指定topic的權限列表
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --list --topic csdn01
添加讀寫權限:
# 給用戶wyk_writer 添加csdn01主題的 生產者權限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_writer --operation Write --topic csdn01# 給用戶wyk_reader 添加csdn01主題的 消費者權限
#需要注意這里消費者還需要給消費者組配置權限,消費者組名稱要和consumer.properties中配置的group.id一致
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=wyk01:2181,wyk02:2181,wyk03:2181 --add --allow-principal User:wyk_reader --operation Read --topic csdn01 --group test-consumer-group
移除權限:
#移除用戶wyk_all 對csdn01的讀寫權限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --remove --allow-principal User:wyk_all --operation Read --operation Write --topic csdn01
添加黑名單:
# 禁止來自ip:192.168.145.100的用戶 wyk_no對csdn01的消費權限,允許其他所有用戶的消費權限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --add --allow-principal User:* --allow-host * --deny-principal User:wyk_no --deny-host 192.168.145.100 --operation Read --topic csdn01
添加白名單:
#添加用戶wyk_yes在ip192.168.145.100 對csdn01的讀寫權限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=wyk01:2181 --add --allow-principal User:wyk_yes --allow-host 192.168.145.100 --operation Read --operation Write --topic csdn01
希望本文對你有幫助,請點個贊鼓勵一下作者吧~ 謝謝!
總結
以上是生活随笔為你收集整理的Kafka系列(五)、开启SASL安全认证以及配置ACL权限控制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Almost Sorted Array
- 下一篇: [Python|生信]从Fasta文件出