ASP.NET Core 2.0利用MassTransit集成RabbitMQ
在ASP.NET Core上利用MassTransit來集成使用RabbitMQ真的很簡單,代碼也很簡潔。近期因為項目需要,我便在這基礎上再次進行了封裝,抽成了公共方法,使得使用RabbitMQ的調用變得更方便簡潔。那么,就讓咱們來瞧瞧其魅力所在吧。
?
MassTransit
先看看MassTransit是個什么寶貝(MassTransit官網的簡介):
MassTransit是一個免費的開源輕量級消息總線,用于使用.NET框架創建分布式應用程序。MassTransit在現有的頂級消息傳輸上提供了一系列廣泛的功能,從而以開發人員友好的方式使用基于消息的會話模式異步連接服務。基于消息的通信是實現面向服務的體系結構的可靠且可擴展的方式。
通俗描述:
MassTransit就是一套基于消息服務的高級封裝類庫,下游可聯接RabbitMQ、Redis、MongoDb等服務。
github官網:https://github.com/MassTransit/MassTransit
?
RabbitMQ
RabbitMQ是成熟的MQ隊列服務,是由 Erlang 語言開發的 AMQP 的開源實現。關于介紹RabbitMQ的中文資料也很多,有需要可以自行查找。我這里貼出其官網與下載安裝的鏈接,如下:
官網:http://www.rabbitmq.com
下載與安裝:http://www.rabbitmq.com/download.html
?
實現代碼
通過上面的介紹,咱們已對MassTransit與RabbitMQ有了初步了解,那么現在來看看如何在ASP.NET Core上優雅的使用RabbitMQ吧。
1、創建一個名為“RabbitMQHelp.cs”公共類,用于封裝操作RabbitMQ的公共方法,并通過Nuget來管理并引用“MassTransit”與“MassTransit.RabbitMQ”類庫。
2、“RabbitMQHelp.cs”公共類主要對外封裝兩個靜態方法,其代碼如下:
using MassTransit;
using MassTransit.RabbitMqTransport;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Lezhima.Comm
{
? ? /// <summary>
? ? /// RabbitMQ公共操作類,基于MassTransit庫
? ? /// </summary>
? ? public class RabbitMQHelp
? ? {
? ? ? ? #region 交換器
? ? ? ? /// <summary>
? ? ? ? /// 操作日志交換器
? ? ? ? /// 同時需在RabbitMQ的管理后臺創建同名交換器
? ? ? ? /// </summary>
? ? ? ? public static readonly string actionLogExchange = "Lezhima.ActionLogExchange";
? ? ? ? #endregion
? ? ? ? #region 聲明變量
? ? ? ? /// <summary>
? ? ? ? /// MQ聯接地址,建議放到配置文件
? ? ? ? /// </summary>
? ? ? ? private static readonly string mqUrl = "rabbitmq://192.168.6.181/";
? ? ? ? /// <summary>
? ? ? ? /// MQ聯接賬號,建議放到配置文件
? ? ? ? /// </summary>
? ? ? ? private static readonly string mqUser = "admin";
? ? ? ? /// <summary>
? ? ? ? /// MQ聯接密碼,建議放到配置文件
? ? ? ? /// </summary>
? ? ? ? private static readonly string mqPwd = "admin";
? ? ? ? #endregion
? ? ? ? /// <summary>
? ? ? ? /// 創建連接對象
? ? ? ? /// 不對外公開
? ? ? ? /// </summary>
? ? ? ? private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null)
? ? ? ? {
? ? ? ? ? ? //通過MassTransit創建MQ聯接工廠
? ? ? ? ? ? return Bus.Factory.CreateUsingRabbitMq(cfg =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? var host = cfg.Host(new Uri(mqUrl), hst =>
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? hst.Username(mqUser);
? ? ? ? ? ? ? ? ? ? hst.Password(mqPwd);
? ? ? ? ? ? ? ? });
? ? ? ? ? ? ? ? registrationAction?.Invoke(cfg, host);
? ? ? ? ? ? });
? ? ? ? }
? ? ? ? /// <summary>
? ? ? ? /// MQ生產者
? ? ? ? /// 這里使用fanout的交換類型
? ? ? ? /// </summary>
? ? ? ? /// <param name="obj"></param>
? ? ? ? public async static Task PushMessage(string exchange, object obj)
? ? ? ? {
? ? ? ? ? ? var bus = CreateBus();
? ? ? ? ? ? var sendToUri = new Uri($"{mqUrl}{exchange}");
? ? ? ? ? ? var endPoint = await bus.GetSendEndpoint(sendToUri);
? ? ? ? ? ? await endPoint.Send(obj);
? ? ? ? }
? ? ? ? /// <summary>
? ? ? ? /// MQ消費者
? ? ? ? /// 這里使用fanout的交換類型
? ? ? ? /// consumer必需是實現IConsumer接口的類實例
? ? ? ? /// </summary>
? ? ? ? /// <param name="obj"></param>
? ? ? ? public static void ReceiveMessage(string exchange, object consumer)
? ? ? ? {
? ? ? ? ? ? var bus = CreateBus((cfg, host) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //從指定的消息隊列獲取消息 通過consumer來實現消息接收
? ? ? ? ? ? ? ? cfg.ReceiveEndpoint(host, exchange, e =>
? ? ? ? ? ? ? ? {
? ? ? ? ? ? ? ? ? ? e.Instance(consumer);
? ? ? ? ? ? ? ? });
? ? ? ? ? ? });
? ? ? ? ? ? bus.Start();
? ? ? ? }
? ? }
}
3、“RabbitMQHelp.cs”公共類已經有了MQ“生產者”與“消費者”兩個對外的靜態公共方法,其中“生產者”方法可以在業務代碼中直接調用,可傳遞JSON、對象等類型的參數向指定的交換器發送數據。而“消費者”方法是從指定交換器中進行接收綁定,但接收到的數據處理功能則交給了“consumer”類(因為在實際項目中,不同的數據有不同的業務處理邏輯,所以這里我們直接就通過IConsumer接口交給具體的實現類去做了)。那么,下面我們再來看看消費者里傳遞進來的“consumer”類的代碼吧:
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Lezhima.Storage.Consumer
{
? ? /// <summary>
? ? /// 從MQ接收并處理數據
? ? /// 實現MassTransit的IConsumer接口
? ? /// </summary>
? ? public class LogConsumer : IConsumer<ActionLog>
? ? {
? ? ? ? /// <summary>
? ? ? ? /// 重寫Consume方法
? ? ? ? /// 接收并處理數據
? ? ? ? /// </summary>
? ? ? ? /// <param name="context"></param>
? ? ? ? /// <returns></returns>
? ? ? ? public Task Consume(ConsumeContext<ActionLog> context)
? ? ? ? {
? ? ? ? ? ? return Task.Run(async () =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? //獲取接收到的對象
? ? ? ? ? ? ? ? var amsg = context.Message;
? ? ? ? ? ? ? ? Console.WriteLine($"Recevied By Consumer:{amsg}");
? ? ? ? ? ? ? ? Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}");
? ? ? ? ? ? });
? ? ? ? }
? ? }
}
調用代碼
1、生產者調用代碼如下:
/// <summary>
/// 測試MQ生產者
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<MobiResult> AddMessageTest()
{
? ? //聲明一個實體對象
? ? var model = new ActionLog();
? ? model.ActionLogId = Guid.NewGuid();
? ? model.CreateTime = DateTime.Now;
? ? model.UpdateTime = DateTime.Now;
? ? //調用MQ
? ? await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model);
? ? return new MobiResult(1000, "操作成功");
}
2、消費者調用代碼如下:
using Lezhima.Storage.Consumer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
namespace Lezhima.Storage
{
? ? class Program
? ? {
? ? ? ? static void Main(string[] args)
? ? ? ? {
? ? ? ? ? ? var conf = new ConfigurationBuilder()
? ? ? ? ? ? ? .SetBasePath(Directory.GetCurrentDirectory())
? ? ? ? ? ? ? .AddJsonFile("appsettings.json", true, true)
? ? ? ? ? ? ? .Build();
? ? ? ? ? ? //調用接收者
? ? ? ? ? ? RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange,
? ? ? ? ? ? ?new LogConsumer()
? ? ? ? ? ? );
? ? ? ? ? ? Console.ReadLine();
? ? ? ? }
? ? }
}
總結
1、基于MassTransit庫使得我們使用RabbitMQ變得更簡潔、方便。而基于再次封裝后,生產者與消費者將不需要關注具體的業務,也跟業務代碼解耦了,更能適應項目的需要。
2、RabbitMQ的交換器需在其管理后臺自行創建,而這里使用的fanout類型是因為其發送速度最快,且能滿足我的項目需要,各位可視自身情況選用不同的類型。fanout類型不會存儲消息,必需要消費者綁定交換器后才會發送給消費者。
原文鏈接:https://www.cnblogs.com/Andre/p/9579764.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com
總結
以上是生活随笔為你收集整理的ASP.NET Core 2.0利用MassTransit集成RabbitMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: asp.net core添加全局异常处理
- 下一篇: 《通过C#学Proto.Actor模型》