.Net Core 集成 Kafka
最近維護的一個系統并發有點高,所以想引入一個消息隊列來進行削峰。考察了一些產品,最終決定使用kafka來當做消息隊列。以下是關于kafka的一些知識的整理筆記。
kafka
kafka 是分布式流式平臺。它由linkedin開發,后貢獻給了Apache開源組織并成為頂級開源項目。它可以應用在高并發場景下的日志系統,也可以當作消息隊列來使用,也可以當作消息服務對系統進行解耦。
流處理平臺有以下三種特性:
1.可以讓你發布和訂閱流式的記錄。這一方面與消息隊列或者企業消息系統類似。2.可以儲存流式的記錄,并且有較好的容錯性。3.可以在流式記錄產生時就進行處理。
一般它可以應用于兩個場景:
1.構造實時流數據管道,它可以在系統或應用之間可靠地獲取數據。(相當于message queue)2.構建實時流式應用程序,對這些流數據進行轉換或者影響。(就是流處理,通過kafka stream topic和topic之間內部進行變化)
broker
kafka中的每個節點即每個服務器就是一個broker 。
topic
kafka中的topic是一個分類的概念,表示一類消息。生產者在生產消息的時候需要指定topic,消費者在消費消息的時候也需要指定topic。
partition
partition是分區的概念。kafka的一個topic可以有多個partition。每個partition會分散到不同的broker上,起到負載均衡的作用。生產者的消息會通過算法均勻的分散在各個partition上。
consumer group
kafka的消費者有個組的概念。一個partition可以被多consumer group訂閱。每個消息會廣播到每一個group中。但是每個消息只會被group中的一個consumer消費。相當于每個group,一個partition只能有一個consumer訂閱,所以group中的consumer數量不可以超過topic中partition的數量。并且消息的消費的順序在每個partition中是保證有序的,但是在多個partition之間是不保證的,因為consumer的消費速度是有快慢的。
所以如果要用kafka實現嚴格的消息隊列點對點模式那么我們可以設置一個partition并且設置一個consumer。如果對消息消費的順序不是那么敏感,那么可以設置多個partition來并行消費消息,提高吞吐量。
安裝kafka
為了能體驗下kafka,我們還是要實際安裝一下kafka,畢竟空想是沒有用的。現在有了docker,安裝起來也是相當滴簡單。我們只需要定義好docker-compose的yml就行了。
version: '3' services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117KAFKA_CREATE_TOPICS: "test:3:1"KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181我們在yml里定義2個service:
1.zookeeper,kafka的分布式依賴zookeeper,所以我需要先定義它。2.kafka ,kafka的定義有幾個地方要注意的。
?depends_on:zookeeper 指定kafka依賴zookeeper這個service,當啟動kafka的時候自動會啟動zookeeper。?KAFKA_ADVERTISED_HOST_NAME 這里要指定宿主機的ip?KAFKA_CREATE_TOPICS 這個變量只是的默認創建的topic。"test:3:1"代表創建一個名為test的topic并且創建3個分區1個復制。
定義好這些之后我們只需要使用docker-compose命令運行它:
sudo docker-compose up -d.net 操作 kafka
安裝好kafka的docker環境之后,下面演示下如何使用.net操作kafka,進行消息的生產與消費。
生產者
static async Task Main(string[] args){Console.WriteLine("Hello World Producer!");var config = new ProducerConfig{BootstrapServers = "192.168.0.117:9092",ClientId = Dns.GetHostName(),};using (var producer = new ProducerBuilder<Null, string>(config).Build()){string topic = "test";for (int i = 0; i < 100; i++){var msg = "message " + i;Console.WriteLine($"Send message: value {msg}");var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");Thread.Sleep(500);}}Console.ReadLine();}新建一個控制臺項目,從nuget安裝kafka的官方client。
Install-Package Confluent.Kafka代碼非常簡單,使用ProducerBuilder構造一個producer,然后調用ProduceAsync方法發送消息。
其中需要注意的是如果你的場景并發非常之高,官方文檔推薦的方法是Produce而不是ProduceAsync。這是一個比較迷的地方。按常理使用ProduceAsync應該比使用同步方法Produce能獲得更高的并發才對。但是文檔確確實實說高并發場景請使用Produce。可能是為了避免ProduceAsync結果返回的時候異步線程上下文切換造成的性能開銷。
原文:
消費者
static void Main(string[] args){Console.WriteLine("Hello World kafka consumer !");var config = new ConsumerConfig{BootstrapServers = "192.168.0.117:9092",GroupId = "foo",AutoOffsetReset = AutoOffsetReset.Earliest};var cancel = false;using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()){var topic = "test";consumer.Subscribe(topic);while (!cancel){var consumeResult = consumer.Consume(CancellationToken.None);Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");}consumer.Close();}}消費者的演示代碼同樣很簡單。我們需要指定groupId,然后訂閱topic。使用ConsumerBuilder構造一個consumer,然后調用Consume方法進行消費就可以。
注意:
這里默認是自動commit消費。你也可以根據情況手動提交commit。
運行一下
我們運行一個生產者進程,按照500ms的速度生產消息。運行三個consumer進行消費,可以看到消息被均勻的推送到三個consumer上去。
總結
以上簡單的介紹了kafka的背景、安裝方法、使用場景。還簡單演示了如何使用.net來操作kafka。它可以當作流式計算平臺來使用,也可以當作傳統的消息隊列使用。它當前非常流行,網上的資料也多如牛毛。官方也提供了簡單易用的.net sdk ,為.net 平臺集成kafka提供了便利。
關注我的公眾號一起玩轉技術
總結
以上是生活随笔為你收集整理的.Net Core 集成 Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C# 离线使用nuget
- 下一篇: Microsoft宣布将停止支持多个 .