基于事件驱动架构构建微服务第13部分:使用来自Apache KAFKA的事件并将投影流传输到ElasticSearch...
原文鏈接:https://logcorner.com/building-microservices-through-event-driven-architecture-part13-read-model-projection-project-streams-into-elasticsearch/
在本教程中,我將展示如何從KAFKA讀取流并將流投影到ElasticSearch中。
我必須使用來自KAFKA的消息,我從KAFKA讀取的消息是事件流。所以我必須將這些流投影到結(jié)構(gòu)化表示中。然后我會將投影索引到ElasticSearch中。
所以我會建立一個訂閱KAFKA并監(jiān)聽事件的消費者。如果它接收到一個event,它將使用投影來創(chuàng)建該事件的結(jié)構(gòu)演示。最后將其存儲到一個nosql數(shù)據(jù)庫ElasticSearch。
投影事件
實際上,讀取數(shù)千個事件會花費太長時間,相反我們可以預先計算當前狀態(tài)并將其存儲到nosql數(shù)據(jù)庫中。投影可以定義為從一系列事件中導出的當前狀態(tài)
我定義了一個基本的泛型類Entity,因此每個投影都將從它派生。
我定義了一個抽象的泛型類Projection,它接受一個事件列表并將它們應用于具體類(在我們的例子中是 SpeechProjection)。
SpeechProjection是一個表示我想從事件(SpeechCreatedEvent、SpeechTitleChangedEvent、SpeechDescriptionChangedEvent、SpeechUrlChangedEvent和SpeechTypeChangedEvent)重建其狀態(tài)的實體的類。
因此,對于與給定實體(語音)相關(guān)的每個事件,我必須將事件應用于實體。
ElasticSearch介紹
Elasticsearch是一種分布式RESTful搜索和分析引擎,能夠處理越來越多的用例。作為Elastic Stack的核心,它集中存儲你的數(shù)據(jù),以實現(xiàn)閃電般的快速搜索、微調(diào)相關(guān)性和可輕松擴展的強大分析。https://www.elastic.co/elasticsearch/
轉(zhuǎn)到以下鏈接安裝elasticsearch:https://www.elastic.co/downloads/elasticsearch
你可以通過使用PowerShell運行以下命令 curl http://localhost:9200/ 或 Invoke-RestMethod http://localhost:9200 來驗證安裝是否正常
以下代碼創(chuàng)建一個通用存儲庫以連接到彈性搜索,并執(zhí)行CRUD操作。
創(chuàng)建工作服務
ASP.NET Core Worker Service模板為編寫長時間運行的服務應用程序提供了一個起點。
我們可以使用工作服務來構(gòu)建不需要用戶交互或執(zhí)行定期和長時間運行的工作負載的應用程序。
https://docs.microsoft.com/fr-fr/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-5.0&tabs=visual-studio
我將使用Worker Service構(gòu)建一個消費服務,該服務消費來自APACHE KAFKA的事件并將它們索引到ElasticSearch
ConsumerHostedService
ConsumerHostedService是承載ConsumerService的后臺服務
ConsumerService
ConsumerService調(diào)用服務總線,該總線在產(chǎn)生新事件時從Kafka接收通知。
服務總線
KafkaClient
KafkaClient實現(xiàn)了IServiceBusProvider的ReceiveAsync。它訂閱了一個Kafka主題,因此當一個事件發(fā)布到該主題時,它會通知一個中介服務。
ElasticSearchNotifier實現(xiàn)了INotificationHandler。這個類的職責是反序列化輸入事件并將其索引到elasticsearch。
測試
啟動zookeeper
zookeeper-server-start.bat config\zookeeper.properties
啟動Kafka
kafka-server-start.bat config\server.properties
啟動ElasticSearch
啟動下列工程:
LogCorner.EduSync.SignalR.Server
LogCorner.EduSync.Speech.Producer
LogCorner.EduSync.Speech.Consumer 啟動下列工程:
LogCorner.EduSync.Speech.Presentation
啟動Postman并且post一個新command?
你應該在消費者控制臺上看到以下輸出,使用postman上發(fā)布的命令
代碼源可在此處獲得:
https://github.com/logcorner/LogCorner.EduSync.Speech.Command https://github.com/logcorner/LogCorner.EduSync.Speech.ServiceBus/tree/Feature/Task/IndexMessagesToElasticSearch
總結(jié)
以上是生活随笔為你收集整理的基于事件驱动架构构建微服务第13部分:使用来自Apache KAFKA的事件并将投影流传输到ElasticSearch...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kubernetes + .NET Co
- 下一篇: 利用SOS扩展库进入高阶.NET6程序的