消息中间件 --- Kafka快速入门
生活随笔
收集整理的這篇文章主要介紹了
消息中间件 --- Kafka快速入门
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
消息中間件 --- Kafka 快速入門
消息中間件:https://blog.51cto.com/u_9291927/category33
GitHub: GitHub - scorpiostudio/HelloKafka: HelloKafka
- Kafka快速入門(一)--- Kafka簡介:https://blog.51cto.com/9291927/2493953
- Kafka快速入門(二)--- Kafka架構:https://blog.51cto.com/9291927/2497814
- Kafka快速入門(三)--- Kafka核心技術:https://blog.51cto.com/9291927/2497820
- Kafka快速入門(四)--- Kafka高級功能:https://blog.51cto.com/9291927/2497828
- Kafka快速入門(五)--- Kafka管理:https://blog.51cto.com/9291927/2497842
- Kafka快速入門(六)--- Kafka集群部署:https://blog.51cto.com/9291927/2498428
- Kafka快速入門(七)--- Kafka監控:https://blog.51cto.com/9291927/2498434
- Kafka快速入門(八)--- Confluent Kafka簡介:https://blog.51cto.com/9291927/2499090
- Kafka快速入門(九)--- C客戶端:https://blog.51cto.com/9291927/2502001
- Kafka快速入門(十)--- C++客戶端:https://blog.51cto.com/9291927/2502063
- Kafka快速入門(十一)--- RdKafka源碼分析:https://blog.51cto.com/9291927/2504489
- Kafka快速入門(十二)--- Python客戶端:https://blog.51cto.com/9291927/2504495
Python3 學習(五十四):confluent-kafka 模塊的使用
From:https://blog.csdn.net/liao392781/article/details/90487438
coufluent-kafka 是 Python 模塊,是對 librdkafka 的輕量級封裝,librdkafka 又是基于 c/c++ 的kafka 庫,性能上不必多說。使用上要優于 kafka-python。confluent-kafka-python 是 Confluent 用于 Apache Kafka(?Apache Kafka ) 和 Confluent Platform(?Data in Motion Platform for the Enterprise | Confluent )?的 Python 客戶端。
特征:
- 高性能 :?confluent-kafka-python 是 librdkafka(?https://github.com/edenhill/librdkafka ) 的一個輕量級包裝器,librdkafka是一個 經過精心調優的C客戶端。
- 可靠性 :?在編寫Apache Kafka客戶端時,有很多細節要做。我們將它們放在一個地方(librdkafka)并在我們所有客戶中利用這項工作(也是匯合 - kafka-go (?https://github.com/confluentinc/confluent-kafka-go ) 和 confluent-kafka-dotnet (?GitHub - confluentinc/confluent-kafka-dotnet: Confluent's Apache Kafka .NET client ))
示例代碼:
# -*- coding: utf-8 -*- # @Author : # @Date : # @File : kafka_operate.py # @description : XXXimport time import datetime from confluent_kafka import avro from confluent_kafka.avro import AvroProducer, AvroConsumer from confluent_kafka.avro.serializer import SerializerError from confluent_kafka import Producer, Consumer, KafkaErrordef delivery_report(err, msg):""" Called once for each message produced to indicate delivery result.Triggered by poll() or flush(). """if err is not None:print('Message delivery failed: {}'.format(err))else:print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))def kafka_producer():p = Producer({'bootstrap.servers': 'mybroker1,mybroker2'})while True:try:current_date = str(datetime.datetime.now().replace(microsecond=0))data = current_date# Trigger any available delivery report callbacks from previous produce() callsp.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.p.produce('my_topic', data.encode('utf-8'), callback=delivery_report)time.sleep(1)except BaseException as be:print(be)break# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.p.flush()def kafka_consumer():c = Consumer({'bootstrap.servers': 'mybroker','group.id': 'mygroup','auto.offset.reset': 'earliest'})c.subscribe(['my_topic'])while True:msg = c.poll(1.0)if msg is None:continueif msg.error():print("Consumer error: {}".format(msg.error()))continueprint('Received message: {}'.format(msg.value().decode('utf-8')))c.close()def kafka_avro_producer():value_schema_str = """{"namespace": "my.test","name": "value","type": "record","fields" : [{"name" : "name","type" : "string"}]}"""key_schema_str = """{"namespace": "my.test","name": "key","type": "record","fields" : [{"name" : "name","type" : "string"}]}"""value_schema = avro.loads(value_schema_str)key_schema = avro.loads(key_schema_str)value = {"name": "Value"}key = {"name": "Key"}avro_producer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2','schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)avro_producer.produce(topic='my_topic', value=value, key=key)avro_producer.flush()def kafka_avro_consumer():c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2','group.id': 'groupid','schema.registry.url': 'http://127.0.0.1:8081'})c.subscribe(['my_topic'])while True:try:msg = c.poll(10)except SerializerError as e:print("Message deserialization failed for {}: {}".format(msg, e))breakif msg is None:continueif msg.error():print("AvroConsumer error: {}".format(msg.error()))continueprint(msg.value())c.close()if __name__ == '__main__':pass 創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的消息中间件 --- Kafka快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 辗转相除法--最大公约数/最大公倍数
- 下一篇: Linux dd 命令