在Asp.Net Core中集成Kafka
在我們的業務中,我們通常需要在自己的業務子系統之間相互發送消息,一端去發送消息另一端去消費當前消息,這就涉及到使用消息隊列MQ的一些內容,消息隊列成熟的框架有多種,這里你可以讀這篇文章來了解這些MQ的不同,這篇文章的主要目的是用來系統講述如何在Asp.Net Core中使用Kafka,整篇文章將介紹如何寫消息發送方代碼、消費方代碼、配套的工具的使用,希望讀完這篇文章之后對整個消息的運行機制有一定的理解,在這里通過一張圖來簡要了解一下消息隊列中的一些概念。
圖一 Kafka消息隊列
一 安裝NUGET包
在寫代碼之前首先要做的就是安裝nuget包了,我們這里使用的是Confluent.Kafka 1.0.0-RC4版本,具體項目要根據具體的時間來確定引用包的版本,這些包可能更新比較快。
圖二 引用Kafka包依賴
二?消息發送方(Producer)
1 在項目中添加所有觸發事件的接口 IIntegrationEvent,后面所有的觸發事件都是繼承自這個接口。
| /// <summary>/// 集成事件的接口定義/// </summary>public?interface?IIntegrationEvent {string?Key {?get;?set; }} |
2 定義Kafka生產者
| /// <summary>/// Kafka 生產者的 Domain Service/// </summary>public?class?KafkaProducer : DomainService {private?readonly?IConfiguration _config;private?readonly?ILogger<KafkaProducer> _logger;public?KafkaProducer(IConfiguration config,ILogger<KafkaProducer> logger) {_config = config;_logger = logger;}/// <summary>/// 發送事件/// </summary>/// <param name="event"></param>public?void?Produce(IIntegrationEvent @event) {var?topic = _config.GetValue<string>($"Kafka:Topics:{@event.GetType().Name}");var?producerConfig =?new?ProducerConfig {BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")};var?builder =?new?ProducerBuilder<string,?string>(producerConfig);using?(var?producer = builder.Build()) {try?{var?json = JsonConvert.SerializeObject(@event);var?dr = producer.ProduceAsync(topic,?new?Message<string,?string> { Key = @event.Key, Value = json }).GetAwaiter().GetResult();_logger.LogDebug("發送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);}?catch?(ProduceException<string,?string> ex) {_logger.LogError(ex,?"發送事件到 {0} 失敗,原因 {1} ", topic, ex.Error.Reason);}}}} |
在這里我們的Producer根據業務的需要定義在領域服務中,這里面最關鍵的就是Produce方法了,該方法的參數是繼承自IIntegrationEvent 接口的各種各樣事件,在這個方法中,我們獲取配置在appsetting.json中配置的各種Topic以及Kafka服務器的地址,具體的配置如下方截圖所示。
圖三 配置服務器地址以及各種Topic
通過當前配置我們就知道我們的消息要發往何處,然后我們就可以創建一個producer來將我們的事件(實際上是定義的數據結構)序列化成Json,然后通過異步的方式發送出去,這里需要注意我們創建的Producer要放在一個using塊中,這樣在創建完成并發送消息之后就會釋放當前生產者。這里如果發送失敗會在當前日志中記錄發送的值以及錯誤的原因從而便于進行調試。這里舉出其中的一個事件RepairContractFinishedEvent為例來說明。
| /// <summary>/// 維修合同完成的事件/// </summary>public?class?RepairContractFinishedEvent : IIntegrationEvent {public?RepairContract RepairContract {?get;?set; }//一個維修合同會對應多個調整單public?List<RepairContractAdjust> RepairContractAdjusts {?get;?set; }public?string?Key {?get;?set; }} |
這個里面RepairContract以及List集合都是我們定義的一種數據結構。
最后我們來看看在具體的領域層中我們該如何觸發此事件的,這里我們也定義了一個叫做IRepairContractEventManager接口的領域服務,并在里面定義了一個叫做Finished的接口,然后在RepairContractEventManager中實現該方法。
| public?class?RepairContractEventManager : DomainService, IRepairContractEventManager {private?readonly?KafkaProducer _producer;private?readonly?IRepository<RepairContract, Guid> _repairContractRepository;private?readonly?IRepository<RepairContractAdjust, Guid> _repairContractAdjustRepository;public?RepairContractEventManager(KafkaProducer producer,IRepository<RepairContract, Guid> repairContractRepository,IRepository<RepairContractAdjust, Guid> repairContractAdjustRepository) {_producer = producer;_repairContractRepository = repairContractRepository;_repairContractAdjustRepository = repairContractAdjustRepository;}public?void?Finished(Guid repairContractId) {var?repairContract = _repairContractRepository.GetAll().Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials).SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();var?repairContractAdjusts = _repairContractAdjustRepository.GetAll().Include(a => a.WorkItems).ThenInclude(w => w.Materials).Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult();var?@event?=?new?RepairContractFinishedEvent {Key = repairContract?.Code,RepairContract = repairContract,RepairContractAdjusts = repairContractAdjusts};_producer.Produce(@event);}} |
這段代碼就是組裝RepairContractFinishedEvent的具體實現過程,然后調用我們之前創建的KafkaProducer對象然后將消息發送出去,這樣在需要觸發當前RepairContractFinishedEvent 的地方來注入IRepairContractEventManager接口,然后調對應的Finished方法,這樣就完成了整個消息的發送的過程了。
三 查看消息的發送
在發送完消息后我們可以到Kafka 集群 Control Center中查找我們發送的所有消息。選擇其中的一條消息,雙擊,然后選擇INSPECT來查看發送的消息
圖四 Kafka Control Center中查看發送消息
四 消息的接收方(Consumer)
在正確創建消息的發送方后緊接著就是定義消息的接收方了,消息的接收方顧名思義就是消費剛才消息的一方,這里的步驟和發送類似,但是也有很大的不同,消息的消費方核心是一個后臺服務,并且在單獨的線程中監聽來自發送方的消息,并進行消費,這里我們先定義一個叫做KafkaConsumerHostedService的基類,我們具體來看看代碼。
| /// <summary>/// Kafka 消費者的后臺服務基礎類/// </summary>/// <typeparam name="T">事件類型</typeparam>public?abstract?class?KafkaConsumerHostedService<T> : BackgroundService?where?T : IIntegrationEvent {protected?readonly?IServiceProvider _services;protected?readonly?IConfiguration _config;protected?readonly?ILogger<KafkaConsumerHostedService<T>> _logger;public?KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger<KafkaConsumerHostedService<T>> logger) {_services = services;_config = config;_logger = logger;}/// <summary>/// 消費該事件,比如調用 Application Service 持久化數據等/// </summary>/// <param name="event">事件內容</param>protected?abstract?void?DoWork(T @event);/// <summary>/// 構造 Kafka 消費者實例,監聽指定 Topic,獲得最新的事件/// </summary>/// <param name="stoppingToken">終止標識</param>/// <returns></returns>protected?override?async Task ExecuteAsync(CancellationToken stoppingToken) {await Task.Factory.StartNew(() => {var?topic = _config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");var?consumerConfig =?new?ConsumerConfig {BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),AutoOffsetReset = AutoOffsetReset.Earliest,GroupId = _config.GetValue<string>("Application:Name"),EnableAutoCommit =?true,};var?builder =?new?ConsumerBuilder<string,?string>(consumerConfig);using?(var?consumer = builder.Build()) {consumer.Subscribe(topic);while?(!stoppingToken.IsCancellationRequested) {try?{var?result = consumer.Consume(stoppingToken);var?@event?= JsonConvert.DeserializeObject<T>(result.Value);DoWork(@event);//consumer.StoreOffset(result);}?catch?(OperationCanceledException ex) {consumer.Close();_logger.LogDebug(ex,?"Kafka 消費者結束,退出后臺線程");}?catch?(AbpValidationException ex) {_logger.LogError(ex, $"Kafka {GetValidationErrorNarrative(ex)}");}?catch?(ConsumeException ex) {_logger.LogError(ex,?"Kafka 消費者產生異常");}?catch?(KafkaException ex) {_logger.LogError(ex,?"Kafka 產生異常");}?catch?(ValidationException ex) {_logger.LogError(ex,?"Kafka 消息驗證失敗");}?catch?(Exception ex) {_logger.LogError(ex,?"Kafka 捕獲意外異常");}}}}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);}private?string?GetValidationErrorNarrative(AbpValidationException validationException) {var?detailBuilder =?new?StringBuilder();detailBuilder.AppendLine("驗證過程中檢測到以下錯誤");foreach?(var?validationResult?in?validationException.ValidationErrors) {detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);detailBuilder.AppendLine();}return?detailBuilder.ToString();}} |
這段代碼中我們會創建一個consumer,這里我們會在一個While循環中去訂閱特定Topic消息,這里的BootstrapServers是和發送方保持一致,并且也是在當前應用程序中的appsetting.json中進行配置的,而且這里的consumer.Consume方法是一個阻塞式方法,當發送方發送特定事件后,這里會接收到同樣名稱的Topic的消息,然后將接收到的Json數據進行反序列化,然后交由后面的DoWork方法進行處理。這里還是以之前生成者發送的RepairContractFinished事件為例,這里也需要定義一個RepairContractFinishedEventHandler來處理生產者發送的消息。
| public?class?RepairContractFinishedEventHandler : KafkaConsumerHostedService<RepairContractFinishedEvent> {public?RepairContractFinishedEventHandler(IServiceProvider services,IConfiguration config, ILogger<KafkaConsumerHostedService<RepairContractFinishedEvent>> logger):?base(services, config, logger) {}/// <summary>/// 調用 Application Service,新增或更新維修合同及關聯實體/// </summary>/// <param name="event">待消費的事件</param>protected?override?void?DoWork(RepairContractFinishedEvent @event) {using?(var?scope = _services.CreateScope()) {var?service = scope.ServiceProvider.GetRequiredService<IRepairContractAppService>();service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);}}} |
這里需要特別注意的是在這里我么也需要定義一個繼承自IIntegrationEvent接口的事件,這里也是定義一種數據結構,并且這里的數據結構和生成者定義的要保持一致,否則消費方在反序列化的時候會丟失不能夠匹配的信息。
| public?class?RepairContractFinishedEvent : IIntegrationEvent {public?RepairContractDto RepairContract {?get;?set; }public?List<RepairContractAdjustDto> RepairContractAdjusts {?get;?set; }public?string?Key {?get;?set; }} |
另外在DoWork方法中我們也需要注意代碼也需要用using包裹,從而在消費方消費完后釋放掉當前的應用服務。最后需要注意的就是我們的每一個Handle都是一個后臺服務,我們需要在Asp.Net Core的Startup的ConfigureServices進行配置,從而將當前的后臺服務添加到Asp.Net Core依賴注入容器中。
| /// <summary>/// 注冊集成事件的處理器/// </summary>/// <param name="services"></param>private?void?AddIntegrationEventHandlers(IServiceCollection services) {services.AddHostedService<RepairContractFinishedEventHandler>();services.AddHostedService<ProductTransferDataEventHandler>();services.AddHostedService<PartUpdateEventHandler>();services.AddHostedService<VehicleSoldFinishedEventHandler>();services.AddHostedService<AddOrUpdateDealerEventHandler>();services.AddHostedService<AddOrUpdateProductCategoryEventHandler>();services.AddHostedService<CustomerFinishedEventHandler>();services.AddHostedService<VehicleSoldUpdateStatusEventHandler>();services.AddHostedService<AddCustomerEventHandler>();} |
最后我們也看看我們的appsetting.json的配置文件關于kafka的配置。
| "Kafka": {"BootstrapServers":?"127.0.0.1:9092","MessageTimeoutMs": 5000,"Topics": {"RepairContractFinishedEvent":?"repair-contract-finished","AddOrUpdateProductCategoryEvent":?"add-update-product-category","AddOrUpdateDealerEvent":?"add-update-dealer","ClaimApproveEvent":?"claim-approve","ProductTransferDataEvent":?"product-update","PartUpdateEvent":?"part-update","VehicleSoldFinishedEvent":?"vehiclesold-finished","CustomerFinishedEvent":?"customer-update","VehicleInformationUpdateStatusEvent":?"add-update-vehicle-info","AddCustomerEvent":?"add-customer"}}, |
這里需要注意的是發送方和接收方必須保證Topic一致,并且配置的服務器名稱端口保持一致,這樣才能夠保證消息的準確發送和接收。最后對于服務端,這里推薦一個VSCode的插件kafka,能夠創建并發送消息,這樣就方便我們來發送我們需要的數據了,這里同樣需要我們先建立一個.kafka的文件,然后配置Kafka服務的地址和端口號。
圖五 利用VSCode Kafka插件發送消息
原文地址:https://www.cnblogs.com/seekdream/p/10757541.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?
總結
以上是生活随笔為你收集整理的在Asp.Net Core中集成Kafka的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 请给你的短信验证码接口加上SSL双向验证
- 下一篇: 分享一个.NET平台开源免费跨平台的大数