python-kafka 常用 api 汇总
簡(jiǎn)介
???? python連接kafka的標(biāo)準(zhǔn)庫,kafka-python和pykafka。kafka-python使用的人多是比較成熟的庫,kafka-python并沒有zk的支持。pykafka是Samsa的升級(jí)版本,使用samsa連接zookeeper,生產(chǎn)者直接連接kafka服務(wù)器列表,消費(fèi)者才用zookeeper。
安裝
# PyPI安裝 pip install kafka-python# conda安裝 conda install -c conda-forge kafka-python# anaconda自帶pip安裝 /root/anaconda3/bin/pip install kafka-python官方鏈接
- 官網(wǎng):https://kafka-python.readthedocs.io/en/master/index.html
- git:https://github.com/dpkp/kafka-python
注意:1.4.0 以上的 kafka-python 版本使用了獨(dú)立的心跳線程去上報(bào)心跳
生產(chǎn)者
API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html。
生產(chǎn)者代碼是線程安全的,支持多線程,而消費(fèi)者則不然。
類 KafkaProducer
class kafka.KafkaProducer(**configs)
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺(tái)服務(wù)器)地址,默認(rèn)值為 localhost, port默認(rèn)值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個(gè)broker)
- key_serializer (可調(diào)用對(duì)象) –用于轉(zhuǎn)換用戶提供的key值為字節(jié),必須返回字節(jié)數(shù)據(jù)。 如果為None,則等同調(diào)用f(key)。 默認(rèn)值: None.
- value_serializer(可調(diào)用對(duì)象) – 用于轉(zhuǎn)換用戶提供的value消息值為字節(jié),必須返回字節(jié)數(shù)據(jù)。 如果為None,則等同調(diào)用f(value)。 默認(rèn)值: None.
方法
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)
函數(shù)返回FutureRecordMetadata類型的RecordMetadata數(shù)據(jù)
- topic(str) – 設(shè)置消息將要發(fā)布到的主題,即消息所屬主題
- value(可選) – 消息內(nèi)容,必須為字節(jié)數(shù)據(jù),或者通過value_serializer序列化后的字節(jié)數(shù)據(jù)。如果為None,則key必填,消息等同于“刪除”。( If value is None, key is required and message acts as a ‘delete’)
- partition (int, 可選) – 指定分區(qū)。如果未設(shè)置,則使用配置的partitioner
- key (可選) – 和消息對(duì)應(yīng)的key,可用于決定消息發(fā)送到哪個(gè)分區(qū)。如果partition為None,則相同key的消息會(huì)被發(fā)布到相同分區(qū)(但是如果key為None,則隨機(jī)選取分區(qū))(If partition is None (and producer’s partitioner config is left as default), then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly)). 必須為字節(jié)數(shù)據(jù)或者通過配置的key_serializer序列化后的字節(jié)數(shù)據(jù).
- headers (可選) – 設(shè)置消息header,header-value鍵值對(duì)表示的list。list項(xiàng)為元組:格式 (str_header,bytes_value)
- timestamp_ms (int, 可選) –毫秒數(shù) (從1970 1月1日 UTC算起) ,作為消息時(shí)間戳。默認(rèn)為當(dāng)前時(shí)間
flush(timeout=None)
發(fā)送所有可以立即獲取的緩沖消息(即時(shí)linger_ms大于0),線程block直到這些記錄發(fā)送完成。當(dāng)一個(gè)線程等待flush調(diào)用完成而block時(shí),其它線程可以繼續(xù)發(fā)送消息。
注意:flush調(diào)用不保證記錄發(fā)送成功
metrics(raw=False)
獲取生產(chǎn)者性能指標(biāo)。
#-*- encoding:utf-8 -*- from kafka import KafkaProducer import jsonproducer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) for i in range(0, 100):producer.send('MY_TOPIC1', value=b'lai zi shouke de msg', key=None, headers=None, partition=None, timestamp_ms=None)# Block直到單條消息發(fā)送完或者超時(shí) future = producer.send('MY_TOPIC1', value=b'another msg',key=b'othermsg') result = future.get(timeout=60) print(result) # future.get函數(shù)等待單條消息發(fā)送完成或超時(shí),經(jīng)測(cè)試,必須有這個(gè)函數(shù),不然發(fā)送不出去,或用time.sleep代替,待驗(yàn)證# Block直到所有阻塞的消息發(fā)送到網(wǎng)絡(luò) # 注意: 該操作不保證傳輸或者消息發(fā)送成功,僅在配置了linger_ms的情況下有用。(It is really only useful if you configure internal batching using linger_ms# 序列化json數(shù)據(jù) producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8')) producer.send('MY_TOPIC1', {'shouke':'kafka'})# 序列化字符串key producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092', key_serializer=str.encode) producer.send('MY_TOPIC1', b'shouke', key='strKey')# 壓縮 producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092',compression_type='gzip') for i in range(2):producer.send('MY_TOPIC1', ('msg %d' % i).encode('utf-8'))# 消息記錄攜帶header producer.send('MY_TOPIC1', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64'),])# 獲取性能數(shù)據(jù)(注意,實(shí)踐發(fā)現(xiàn)分區(qū)較多的情況下,該操作比較耗時(shí) metrics = producer.metrics() print(metrics) producer.flush()實(shí)踐中遇到錯(cuò)誤: kafka.errors.NoBrokersAvailable: NoBrokersAvailable,解決方案如下:
進(jìn)入到配置目錄(config),編輯server.properties文件,查找并設(shè)置listener,配置監(jiān)聽端口,格式:listeners = listener_name://host_name:port,供kafka客戶端連接用的ip和端口,例中配置如下:
listeners=PLAINTEXT://127.0.0.1:9092
消費(fèi)者
參考API:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
消費(fèi)者代碼不是線程安全的,最好不要用多線程
類KafkaConsumer
class kafka.KafkaConsumer(*topics, **configs)
*topics (str) – 可選,設(shè)置需要訂閱的topic,如果未設(shè)置,需要在消費(fèi)記錄前調(diào)用subscribe或者assign。
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺(tái)服務(wù)器)地址,默認(rèn)值為 localhost, port默認(rèn)值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個(gè)broker)
- client_id (str) – 客戶端名稱,默認(rèn)值: ‘kafka-python-{version}’
- group_id (str or None) – 消費(fèi)組名稱。如果為None,則通過group coordinator auto-partition分區(qū)分配,offset提交被禁用。默認(rèn)為None
- auto_offset_reset (str) – 重置offset策略: 'earliest'將移動(dòng)到最老的可用消息, 'latest'將移動(dòng)到最近消息。 設(shè)置為其它任何值將拋出異常。默認(rèn)值:'latest'。
- enable_auto_commit (bool) – 如果為True,將自動(dòng)定時(shí)提交消費(fèi)者offset。默認(rèn)為True。
- auto_commit_interval_ms (int) – 自動(dòng)提交offset之間的間隔毫秒數(shù)。如果enable_auto_commit 為true,默認(rèn)值為: 5000。
- value_deserializer(可調(diào)用對(duì)象) - 攜帶原始消息value并返回反序列化后的value
- consumer_timeout_ms – 毫秒數(shù),若不指定 consumer_timeout_ms,默認(rèn)一直循環(huán)等待接收,若指定,則超時(shí)返回,不再等待
- max_poll_interval_ms – 毫秒數(shù),它表示最大的poll數(shù)據(jù)間隔,如果超過這個(gè)間隔沒有發(fā)起pool請(qǐng)求,但heartbeat仍舊在發(fā),就認(rèn)為該 consumer 處于 livelock 狀態(tài),進(jìn)行 reblancing
- session_timout_ms – 毫秒數(shù),控制心跳超時(shí)時(shí)間。在分布式系統(tǒng)中,由于網(wǎng)絡(luò)問題你不清楚沒接收到心跳,是因?yàn)閷?duì)方真正掛了還是只是因?yàn)樨?fù)載過重沒來得及發(fā)生心跳或是網(wǎng)絡(luò)堵塞。所以一般會(huì)約定一個(gè)時(shí)間,超時(shí)即判定對(duì)方掛了
- heartbeat_interval_ms – 毫秒數(shù),控制心跳發(fā)送頻率,頻率越高越不容易被誤判,但也會(huì)消耗更多資源。
- max_pool_record(int),kafka 每次 pool 拉取消息的最大數(shù)量
subscribe(topics=(), pattern=None, listener=None)
訂閱需要的主題
- topics (list) – 需要訂閱的主題列表
- pattern (str) – 用于匹配可用主題的模式,即正則表達(dá)式。注意:必須提供topics、pattern兩者參數(shù)之一,但不能同時(shí)提供兩者。
metrics(raw=False)
獲取消費(fèi)者性能指標(biāo)。
#-*- encoding:utf-8 -*- from kafka import KafkaConsumer from kafka import TopicPartition import json consumer = KafkaConsumer('MY_TOPIC1',bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest', # 消費(fèi) kafka 中最近的數(shù)據(jù),如果設(shè)置為 earliest 則消費(fèi)最早的未被消費(fèi)的數(shù)據(jù)enable_auto_commit=True, # 自動(dòng)提交消費(fèi)者的 offsetauto_commit_interval_ms=3000, # 自動(dòng)提交消費(fèi)者 offset 的時(shí)間間隔group_id='MY_GROUP1',consumer_timeout_ms= 10000, # 如果 10 秒內(nèi) kafka 中沒有可供消費(fèi)的數(shù)據(jù),自動(dòng)退出client_id='consumer-python3' )for msg in consumer:print (msg)print('topic: ', msg.topic)print('partition: ', msg.partition)print('key: ', msg.key, 'value: ', msg.value)print('offset:', msg.offset)print('headers:', msg.headers)# Get consumer metrics metrics = consumer.metrics() print(metrics)# 通過assign、subscribe兩者之一為消費(fèi)者設(shè)置消費(fèi)的主題 consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'],auto_offset_reset='latest',enable_auto_commit=True, # 自動(dòng)提交消費(fèi)數(shù)據(jù)的 offsetconsumer_timeout_ms= 10000, # 如果 10 秒內(nèi) kafka 中沒有可供消費(fèi)的數(shù)據(jù),自動(dòng)退出value_deserializer=lambda m: json.loads(m.decode('ascii')), #消費(fèi)json 格式的消息client_id='consumer-python3' )# consumer.assign([TopicPartition('MY_TOPIC1', 0)]) # msg = next(consumer) # print(msg) consumer.subscribe('MY_TOPIC1') for msg in consumer:print (msg)客戶端
- 參考API: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html
- 用于異步請(qǐng)求/響應(yīng)網(wǎng)絡(luò)I / O的網(wǎng)絡(luò)客戶端。
- 這是一個(gè)內(nèi)部類,用于實(shí)現(xiàn)面向用戶的生產(chǎn)者和消費(fèi)者客戶端。
- 此類不是線程安全的!
- 參考API:?https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html?
- 管理Kafka集群
類?KafkaClient
class kafka.client.KafkaClient(**configs)
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺(tái)服務(wù)器)地址,默認(rèn)值為 localhost, port默認(rèn)值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個(gè)broker)
- client_id (str) – 客戶端名稱,默認(rèn)值: ‘kafka-python-{version}’
- request_timeout_ms (int) – 客戶端請(qǐng)求超時(shí)時(shí)間,單位毫秒。默認(rèn)值: 30000.
方法
brokers()
獲取所有broker元數(shù)據(jù)
available_partitions_for_topic(topic)
返回主題的所有分區(qū)
#-*- encoding:utf-8 -*- from kafka.client import KafkaClientclient = KafkaClient(bootstrap_servers=['127.0.0.1:9092'], request_timeout_ms=3000)# 獲取所有broker brokers = client.cluster.brokers() for broker in brokers:print('broker: ', broker) # broker: BrokerMetadata(nodeId=0, host='127.0.0.1', port=9092, rack=None)print('broker nodeId: ', broker.nodeId) # broker nodeId: 0# 獲取主題的所有分區(qū) topic = 'MY_TOPIC1' partitions = client.cluster.available_partitions_for_topic(topic) print(partitions) # {0}partition_dict = {} partition_dict[topic] = [partition for partition in partitions] print(partition_dict) # {'MY_TOPIC1': [0]}?
類?KafkaAdminClient
class kafka.client.KafkaAdminClient(**configs)
- bootstrap_servers –'host[:port]'字符串,或者由'host[:port]'組成的字符串,形如['10.202.24.5:9096', '10.202.24.6:9096', '10.202.24.7:9096']),其中,host為broker(Broker:緩存代理,Kafka集群中的單臺(tái)服務(wù)器)地址,默認(rèn)值為 localhost, port默認(rèn)值為9092,這里可以不用填寫所有broker的host和port,但必須保證至少有一個(gè)broker)
- client_id (str) – 客戶端名稱,默認(rèn)值: ‘kafka-python-{version}’
- request_timeout_ms (int) – 客戶端請(qǐng)求超時(shí)時(shí)間,單位毫秒。默認(rèn)值: 30000.
方法
list_topics()
獲取所有的 topic
create_partitions(topic_partitions,timeout_ms = None,validate_only = False )
為現(xiàn)有主題創(chuàng)建其他分區(qū)。返回值:合適版本的CreatePartitionsResponse類。
- topic_partitions –主題名稱字符串到NewPartition對(duì)象的映射。
- timeout_ms –代理返回之前等待創(chuàng)建新分區(qū)的毫秒數(shù)。
- validate_only –如果為True,則實(shí)際上不創(chuàng)建新分區(qū)。默認(rèn)值:False
create_topics(new_topics,timeout_ms = None,validate_only = False )
在集群中創(chuàng)建新主題。返回值:合適版本的CreateTopicResponse類。
- new_topics – NewTopic對(duì)象的列表。
- timeout_ms –代理返回之前等待創(chuàng)建新主題的毫秒。
- validate_only –如果為True,則實(shí)際上不創(chuàng)建新主題。并非所有版本都支持。默認(rèn)值:False
delete_topics(主題,timeout_ms =無)
從集群中刪除主題。返回值:合適版本的DeleteTopicsResponse類。
- 主題-主題名稱的字符串列表。
- timeout_ms –代理返回之前等待刪除主題的毫秒數(shù)。
describe_consumer_groups(group_ids,group_coordinator_id = None,include_authorized_operations = False)
描述一組消費(fèi)者group。返回值:組說明列表。目前,組描述是DescribeGroupsResponse的原始結(jié)果。
- group_ids –消費(fèi)者組ID的列表。這些通常是作為字符串的組名。
- group_coordinator_id –組的協(xié)調(diào)器代理的node_id。如果設(shè)置為None,它將查詢?nèi)杭械拿總€(gè)組以找到該組的協(xié)調(diào)器。如果您已經(jīng)知道組協(xié)調(diào)器,則明確指定此選項(xiàng)對(duì)于避免額外的網(wǎng)絡(luò)往返很有用。這僅在所有g(shù)roup_id具有相同的協(xié)調(diào)器時(shí)才有用,否則會(huì)出錯(cuò)。默認(rèn)值:無。
- include_authorized_operations –是否包括有關(guān)允許組執(zhí)行的操作的信息。僅在API版本> = v3上受支持。默認(rèn)值:False。
list_consumer_group_offsets(group_id,group_coordinator_id = None,partitions = None)
獲取單個(gè)消費(fèi)者組的消費(fèi)者offset。注意:這不會(huì)驗(yàn)證group_id或分區(qū)在集群中是否實(shí)際存在。一旦遇到任何錯(cuò)誤,就會(huì)立即報(bào)錯(cuò)。? ?返回字典:具有TopicPartition鍵和OffsetAndMetada值的字典。省略未指定且group_id沒有記錄偏移的分區(qū)。偏移值-1表示group_id對(duì)于該TopicPartition沒有偏移。一個(gè)-1只能發(fā)生于顯式指定的分區(qū)。?
- group_id –要獲取其偏移量的消費(fèi)者組ID名稱。
- group_coordinator_id –組的協(xié)調(diào)代理的node_id。如果設(shè)置為None,將查詢?nèi)杭圆檎医M協(xié)調(diào)器。如果您已經(jīng)知道組協(xié)調(diào)器,則明確指定此選項(xiàng)對(duì)于防止額外的網(wǎng)絡(luò)往返很有用。默認(rèn)值:無。
- partitions –要獲取其偏移量的TopicPartitions列表。在> = 0.10.2上,可以將其設(shè)置為“無”以獲取使用者組的所有已知偏移量。默認(rèn)值:無。
list_consumer_groups(broker_ids = None)
列出集群已知的所有消費(fèi)者組。這將返回消費(fèi)者組元組的列表。元組由使用者組名稱和使用者組協(xié)議類型組成。僅返回將偏移量存儲(chǔ)在Kafka中的消費(fèi)者組。對(duì)于使用Kafka <0.9 API創(chuàng)建的群組,協(xié)議類型將為空字符串,因?yàn)楸M管它們將偏移量存儲(chǔ)在Kafka中,但它們并不使用Kafka進(jìn)行群組協(xié)調(diào)。對(duì)于使用Kafka> = 0.9創(chuàng)建的群組,協(xié)議類型通常為“消費(fèi)者”。
- broker_ids –用于查詢使用者組的代理節(jié)點(diǎn)ID的列表。如果設(shè)置為None,將查詢集群中的所有代理。明確指定經(jīng)紀(jì)人對(duì)于確定哪些消費(fèi)者組由這些經(jīng)紀(jì)人進(jìn)行協(xié)調(diào)很有用。默認(rèn)值:無
?
總結(jié)
以上是生活随笔為你收集整理的python-kafka 常用 api 汇总的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: STM32读取伺服电机编码器信号
- 下一篇: 机器学习算法之线性回归