MassTransit Get Started-
MassTransit:是一款.NET的分布式應(yīng)用程序框架(開(kāi)源、免費(fèi))。通過(guò)MassTransit,可以輕松創(chuàng)建利用基于消息的、松耦合異步通信的應(yīng)用程序和服務(wù),以提高可用性,可靠性和可伸縮性。
MassTransit本身定位輕量級(jí)的服務(wù)總線,并支持多種傳輸方式如:RabbitMQ、Azure Service Bus、ActiveMQ、Amazon SQS、Kafka、Azure Event Hub。消息異常處理:重試配置、重新交付、erro管道、死信管道。分布式事務(wù)處理:sagas、Courier。容器支持:.NETcore自身的、autofac、castle windsor等、調(diào)度支持:Quartz?、hangfire。更多功能參考官網(wǎng)文檔。
MassTransit目前已經(jīng)發(fā)布到了第7個(gè)版本了,7.0版本新增了對(duì)Kafka 的支持,構(gòu)建僅支持.NET Standard 2.0...其他改動(dòng)不大。MassTransit社區(qū)使用也是很活躍的,對(duì)于首次接觸的,通過(guò)本篇文章(基于rabbitmq)幫你快速入門(mén)!
一個(gè)應(yīng)用程序或服務(wù)可以使用兩種不同的方法來(lái)生產(chǎn)消息,主要區(qū)別是sent需要指定具體的端點(diǎn)地址,而pub不需要,下面的代碼會(huì)演示這兩種方式。
發(fā)布事件(多個(gè)接收者)
發(fā)送命令(一個(gè)接收者)
發(fā)布事件(事件消息)
場(chǎng)景假設(shè):在xx項(xiàng)目中,需要與第三方進(jìn)行交互。比如:訂單發(fā)貨之后,把發(fā)貨的信息的推送給第三方、把訂單的狀態(tài)變化也推送過(guò)去。我們分析下需求,系統(tǒng)要求在發(fā)貨之后,需要做若干事情。可以解讀為,發(fā)貨這個(gè)動(dòng)作已經(jīng)發(fā)生了,需要做的事情不確定。這不是典型的發(fā)布訂閱模式嘛!好了,那使用masstransit如何實(shí)現(xiàn)呢?
1.創(chuàng)建一個(gè)類庫(kù)項(xiàng)目定義消息體,命名為contract
public interface OrderShipped{public Guid OrderId { get; set; }//訂單號(hào)}2.創(chuàng)建一個(gè)api項(xiàng)目作為消息的生產(chǎn)方,命名為Delivery,然后安裝nuget包:
Install-Package MassTransit.AspNetCore Install-Package MassTransit.RabbitMQ在Startup類的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>{x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostConfig =>{hostConfig.Username("Amq");//填寫(xiě)你的用戶名hostConfig.Password("mq123");//填寫(xiě)你的用戶名});});});services.AddMassTransitHostedService();在ValueController中,進(jìn)行發(fā)布消息
readonly IPublishEndpoint _publishEndpoint;public ValuesController(IPublishEndpoint publishEndpoint){_publishEndpoint = publishEndpoint;}/// <summary>/// 測(cè)試發(fā)布/// </summary>/// <returns></returns>[HttpPost]public async Task<ActionResult> OrderDelivery(){//其他await _publishEndpoint.Publish<OrderShipped>(new{OrderId = Guid.NewGuid()});return Ok();}通過(guò)使用IPublishEndpoint實(shí)例的Publish方法,發(fā)布一個(gè)事件。
3.創(chuàng)建一個(gè)api項(xiàng)目作為消息的消費(fèi)方,命名為L(zhǎng)istener,然后安裝nuget包:
Install-Package MassTransit.AspNetCore Install-Package MassTransit.RabbitMQ在Startup類的ConfigureServices中,添加以下配置
services.AddMassTransit(x =>{x.AddConsumer<DeliveryExpressNotifyConsumer>();x.UsingRabbitMq((context, config) =>{config.Host("rabbitmq://localhost:5672", hostConfig =>{hostConfig.Username("Amq");hostConfig.Password("mq123");});config.ReceiveEndpoint("Order.Shipped", e =>{e.ConfigureConsumer<DeliveryExpressNotifyConsumer>(context);});});});services.AddMassTransitHostedService();實(shí)現(xiàn)接口IConsumer即可完成消費(fèi)邏輯的過(guò)程。
public class DeliveryExpressNotifyConsumer : IConsumer<OrderShipped>{ILogger<DeliveryExpressNotifyConsumer> _logger;public DeliveryExpressNotifyConsumer(ILogger<DeliveryExpressNotifyConsumer> logger){_logger = logger;}public async Task Consume(ConsumeContext<OrderShipped> context){_logger.LogInformation("訂單id: {Value}已發(fā)貨,通知第三方", context.Message.OrderId);}} 到此,消息生產(chǎn)方和消費(fèi)方代碼都已經(jīng)實(shí)現(xiàn)了,運(yùn)行一下,效果如下發(fā)送消息(命令消息)
發(fā)送消息適用的場(chǎng)景,常常是一種命令,并且期望消息只被一個(gè)接收者或服務(wù)實(shí)例進(jìn)行處理。masstransit使用發(fā)送消息和發(fā)布消息,在消息生產(chǎn)方不同之處,sent消息需要指定目標(biāo)地址,使用ISendEndpoint的Send方法,消費(fèi)者代碼一樣的配置。以下代碼演示發(fā)送一個(gè)創(chuàng)建發(fā)貨單的指令消息,比較簡(jiǎn)單直接貼出源碼:
1.定義一個(gè)消息SubmitShippingOrder
public class SubmitShippingOrder{public Guid OrderId { get; set; }}2.sent消息
readonly IPublishEndpoint _publishEndpoint;readonly ISendEndpointProvider _sendEndpointProvider;public ValuesController(IPublishEndpoint publishEndpoint, ISendEndpointProvider sendEndpointProvider){_publishEndpoint = publishEndpoint;_sendEndpointProvider = sendEndpointProvider;}/// <summary>/// 測(cè)試發(fā)送/// </summary>/// <returns></returns>[HttpPost]public async Task<ActionResult> Delivery(){var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("queue:Submit.Shipping.Order")); //獲取發(fā)送端點(diǎn)await endpoint.Send(new SubmitShippingOrder{OrderId = Guid.NewGuid()});return Ok();}3.消費(fèi)
public class SubmitShippingOrderConsumer : IConsumer<SubmitShippingOrder>{ILogger<SubmitShippingOrderConsumer> _logger;public SubmitShippingOrderConsumer(ILogger<SubmitShippingOrderConsumer> logger){_logger = logger;}public async Task Consume(ConsumeContext<SubmitShippingOrder> context){_logger.LogInformation("創(chuàng)建發(fā)貨單,訂單id: {Value}", context.Message.OrderId);}}運(yùn)行一下效果:
官網(wǎng)文檔:
http://masstransit-project.com/
https://masstransit-project.com/advanced/courier/
總結(jié)
以上是生活随笔為你收集整理的MassTransit Get Started-的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 在腾讯云云函数计算上部署.NET Cor
- 下一篇: ASP.NET Core Cookie