RabbitMQ操作代码封装
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ操作代码封装
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、Message.cs
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace XFC.RabbitMQ.Domain {/// <summary>/// 消息實體/// </summary>public class Message{/// <summary>/// 消息創建/// </summary>/// <param name="value">值</param>/// <param name="headers">頭信息</param>/// <param name="contentType">MIME content type,缺省值為 text/plain</param>public Message(string value, IDictionary<string, object> headers, string contentType){Value = value;Headers = headers;ContentType = contentType;}/// <summary>/// 消息創建/// </summary>/// <param name="value">值</param>/// <param name="headers">頭信息</param>public Message(string value, IDictionary<string, object> headers): this(value, headers, "text/plain"){}/// <summary>/// 消息創建/// </summary>/// <param name="value">值</param>/// <param name="contentType">MIME content type</param>public Message(string value, string contentType): this(value, null, contentType){}/// <summary>/// 消息創建/// </summary>/// <param name="value">值</param>public Message(string value): this(value, null, "text/plain"){}/// <summary>/// 消息創建/// </summary>public Message(): this("", null, "text/plain"){}/// <summary>/// 消息值/// </summary>public string Value { get; set; }/// <summary>/// 消息頭/// </summary>public IDictionary<string, object> Headers { get; set; }/// <summary>/// MIME content type/// </summary>public string ContentType { get; set; }} } View Code2、RabbitMqPublisher.cs
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using XFC.RabbitMQ.Domain;namespace XFC.RabbitMQ {public class RabbitMqPublisher{private readonly string _rabbitMqUri;/// <summary>/// 構造函數/// </summary>/// <param name="rabbitMqUri">連接串,如 amqp://guest:guest@localhost:5672/</param>public RabbitMqPublisher(string rabbitMqUri){this._rabbitMqUri = rabbitMqUri;}/// <summary>/// 創建連接/// </summary>private IConnection CreateConnection(){var factory = new ConnectionFactory{Uri = new Uri(_rabbitMqUri)};return factory.CreateConnection();}/// <summary>/// 創建信道/// </summary>private IModel CreateChannel(IConnection con, string exchangeName, string exchangeType, string queueName, string routeKey){var channel = con.CreateModel();channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);if (!string.IsNullOrEmpty(queueName)){channel.QueueDeclare(queueName, true, false, false, null); //創建一個消息隊列,用來存儲消息channel.QueueBind(queueName, exchangeName, routeKey, null);}channel.BasicQos(0, 3, true); //在非自動確認消息的前提下,如果一定數目的消息(通過基于consume或者channel設置Qos的值)未被確認前,不進行消費新的消息return channel;}/// <summary>/// 發送ExchangeType類型為Direct的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="routeKey">消息路由key</param>/// <param name="message">消息實體</param>/// <param name="queueName">缺省隊列名(不存在則自動創建),設置后可避免消息發送后由于沒有隊列接收而丟失的問題</param>/// <returns></returns>public bool PublishDirectMessage(string exchangeName, string routeKey, Message message, string queueName = ""){return this.PublishMessage(exchangeName, ExchangeType.Direct, queueName, routeKey, new[] { message });}/// <summary>/// 批量發送ExchangeType類型為Direct的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="routeKey">消息路由key</param>/// <param name="messages">消息實體</param>/// <param name="queueName">缺省隊列名(不存在則自動創建),設置后可避免消息發送后由于沒有隊列接收而丟失的問題</param>/// <returns></returns>public bool PublishDirectMessages(string exchangeName, string routeKey, IEnumerable<Message> messages, string queueName = ""){return this.PublishMessage(exchangeName, ExchangeType.Direct, queueName, routeKey, messages);}/// <summary>/// 發送ExchangeType類型為Fanout的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="message">消息實體</param>/// <param name="queueName">缺省隊列名(不存在則自動創建),設置后可避免消息發送后由于沒有隊列接收而丟失的問題</param>/// <returns></returns>public bool PublishFanoutMessage(string exchangeName, Message message, string queueName = ""){return this.PublishMessage(exchangeName, ExchangeType.Fanout, queueName, "", new[] { message });}/// <summary>/// 批量發送ExchangeType類型為Fanout的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="messages">消息實體</param>/// <param name="queueName">缺省隊列名(不存在則自動創建),設置后可避免消息發送后由于沒有隊列接收而丟失的問題</param>/// <returns></returns>public bool PublishFanoutMessages(string exchangeName, IEnumerable<Message> messages, string queueName = ""){return this.PublishMessage(exchangeName, ExchangeType.Fanout, queueName, "", messages);}private bool PublishMessage(string exchangeName, string exchangeType, string queueName, string routeKey, IEnumerable<Message> messages){using (var con = CreateConnection()){using (var channel = CreateChannel(con, exchangeName, exchangeType, queueName, routeKey)){channel.ConfirmSelect();//啟用消息發送確認機制foreach (var message in messages){var body = Encoding.UTF8.GetBytes(message.Value);var properties = channel.CreateBasicProperties();properties.Persistent = true; //使消息持久化properties.Headers = message.Headers;properties.ContentType = string.IsNullOrEmpty(message.ContentType) ? "text/plain" : message.ContentType;channel.BasicPublish(exchangeName, routeKey, properties, body);}return channel.WaitForConfirms();}}}} } View Code3、RabbitMqQuery.cs
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using XFC.RabbitMQ.Domain;namespace XFC.RabbitMQ {public class RabbitMqQuery{private readonly string _rabbitMqUri;/// <summary>/// 構造函數/// </summary>/// <param name="rabbitMqUri">連接串,如 amqp://guest:guest@localhost:5672/</param>public RabbitMqQuery(string rabbitMqUri){this._rabbitMqUri = rabbitMqUri;}/// <summary>/// 創建連接/// </summary>private IConnection CreateConnection(){var factory = new ConnectionFactory{Uri = new Uri(_rabbitMqUri)};return factory.CreateConnection();}/// <summary>/// 拉取隊列中的數據/// </summary>/// <param name="queueName">隊列名</param>/// <returns></returns>public Message GetMessage(string queueName){using (var con = this.CreateConnection()){var channel = con.CreateModel();var rs = channel.BasicGet(queueName, true);return ResultToMessage(rs);} }/// <summary>/// 批量拉取隊列中的數據/// </summary>/// <param name="queueName">隊列名</param>/// <param name="queryCount">拉取數據的條數,默認為1</param>/// <returns></returns>public Message[] GetMessages(string queueName, int queryCount = 1){if (queryCount <= 0){ queryCount = 1; }var msgLst = new List<Message>();using (var con = this.CreateConnection()){var channel = con.CreateModel();for (int i = 0; i < queryCount; i++){var rs = channel.BasicGet(queueName, true);if (rs != null){msgLst.Add(ResultToMessage(rs));}else{break;}} }return msgLst.ToArray();}private Message ResultToMessage(BasicGetResult rs){var msg = new Message();if (rs != null){var body = rs.Body;msg.Value = Encoding.UTF8.GetString(body);msg.ContentType = rs.BasicProperties.ContentType;msg.Headers = rs.BasicProperties.Headers;}return msg;}} } View Code4、RabbitMqListener.cs
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; using XFC.Log; using XFC.RabbitMQ.Domain;namespace XFC.RabbitMQ {/// <summary>/// RabbitMq消息監聽器/// </summary>public class RabbitMqListener : IDisposable{private ConnectionFactory _factory;private IConnection _con;private IModel _channel;private EventingBasicConsumer _consumer;private readonly string _rabbitMqUri;private readonly string _queueName;private Func<Message, bool> _messageHandler;/// <summary>/// 釋放標記/// </summary>private bool disposed;~RabbitMqListener(){Dispose(false);}/// <summary>/// RabbitMQ消息監聽器/// </summary>/// <param name="rabbitMqUri">連接串,如 amqp://guest:guest@localhost:5672/</param>/// <param name="queueName">要監聽的隊列</param>public RabbitMqListener(string rabbitMqUri, string queueName){this._rabbitMqUri = rabbitMqUri;this._queueName = queueName;}/// <summary>/// 創建連接/// </summary>private void CreateConnection(){_factory = new ConnectionFactory{Uri = new Uri(_rabbitMqUri),RequestedHeartbeat = 20,//與服務器協商使用的心跳超時間隔(以秒為單位)。AutomaticRecoveryEnabled = true,//開啟網絡異常重連機制NetworkRecoveryInterval = TimeSpan.FromSeconds(10),//設置每10s重連一次網絡TopologyRecoveryEnabled = true //開啟重連后恢復拓撲(交換,隊列,綁定等等)。 };_con = _factory.CreateConnection();_con.ConnectionShutdown += (sender, e) => ReMessageListen();//掉線重新連接并監聽隊列消息 }/// <summary>/// 創建信道/// </summary>private void CreateChannel(){_channel = _con.CreateModel();_channel.BasicQos(0, 3, true); //在非自動確認消息的前提下,如果一定數目的消息(通過基于consume或者channel設置Qos的值)未被確認前,不進行消費新的消息 }private Message ResultToMessage(BasicDeliverEventArgs rs){var msg = new Message();if (rs != null){var body = rs.Body;msg.Value = Encoding.UTF8.GetString(body);msg.ContentType = rs.BasicProperties.ContentType;msg.Headers = rs.BasicProperties.Headers;}return msg;}/// <summary>/// 監聽隊列消息/// </summary>/// <param name="messageHandler">消息處理器,當監測到隊列消息時回調該處理器</param>/// <returns>監聽狀態</returns>public bool MessageListen(Func<Message, bool> messageHandler){try{this.CreateConnection();this.CreateChannel();_consumer = new EventingBasicConsumer(_channel); //基于事件的消息推送方式_consumer.Received += (sender, e) =>{var message = this.ResultToMessage(e);if (messageHandler != null){this._messageHandler = messageHandler;try{var isOk = this._messageHandler(message);if (isOk){_channel.BasicAck(e.DeliveryTag, false);}}catch (Exception ex){LoggerManager.ErrorLog.Error("消息處理器執行異常:" + ex.Message, ex);} }};_channel.BasicConsume(_queueName, false, _consumer); //手動確認return true;}catch (Exception ex){LoggerManager.ErrorLog.Error("嘗試監聽隊列消息出現錯誤:" + ex.Message, ex);}return false;}private void ReMessageListen(){try{//清除連接及頻道 CleanupResource();var mres = new ManualResetEventSlim(false); //初始化狀態為falsewhile (!mres.Wait(3000)) //每3秒監測一次狀態,直到狀態為true {if (MessageListen(_messageHandler)){mres.Set(); //設置狀態為true并跳出循環 }}}catch (Exception ex){LoggerManager.ErrorLog.Error("嘗試連接RabbitMQ服務器出現錯誤:" + ex.Message, ex);}}/// <summary>/// 清理資源/// </summary>private void CleanupResource(){if (_channel != null && _channel.IsOpen){try{_channel.Close();}catch (Exception ex){LoggerManager.ErrorLog.Error("嘗試關閉RabbitMQ信道遇到錯誤", ex);}_channel = null;}if (_con != null && _con.IsOpen){try{_con.Close();}catch (Exception ex){LoggerManager.ErrorLog.Error("嘗試關閉RabbitMQ連接遇到錯誤", ex);}_con = null;}}protected virtual void Dispose(bool disposing){if (disposed){return;}CleanupResource();disposed = true;}public void Dispose(){Dispose(true);GC.SuppressFinalize(this);}} } View Code5、RabbitMqDelayPublisher
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using XFC.Exceptions; using XFC.RabbitMQ.Domain;namespace XFC.RabbitMQ {public class RabbitMqDelayPublisher{private readonly string _rabbitMqUri;private readonly string _dlxExchangeName;private readonly string _dlxQueueName;private readonly string _dlxRouteKey;/// <summary>/// 構造函數/// </summary>/// <param name="rabbitMqUri">連接串,如 amqp://guest:guest@localhost:5672/</param>public RabbitMqDelayPublisher(string rabbitMqUri){this._rabbitMqUri = rabbitMqUri;}/// <summary>/// 構造函數/// </summary>/// <param name="rabbitMqUri">連接串,如 amqp://guest:guest@localhost:5672/</param>/// <param name="dlxExchangeName">死信隊列交換機名稱</param>/// <param name="dlxQueueName">死信隊列名稱</param>/// <param name="dlxRouteKey">死信隊列消息路由key</param>public RabbitMqDelayPublisher(string rabbitMqUri, string dlxExchangeName, string dlxQueueName, string dlxRouteKey): this(rabbitMqUri){this._dlxExchangeName = dlxExchangeName;this._dlxQueueName = dlxQueueName;this._dlxRouteKey = dlxRouteKey;}/// <summary>/// 創建連接/// </summary>private IConnection CreateConnection(){var factory = new ConnectionFactory{Uri = new Uri(_rabbitMqUri)};return factory.CreateConnection();}/// <summary>/// 創建信道/// </summary>private IModel CreateChannel(IConnection con, string exchangeName, string exchangeType, string queueName, string routeKey, int delayTime){if (string.IsNullOrEmpty(queueName)){throw new XFCException("queueName不能為空");}var channel = con.CreateModel();var dlxExchangeName = string.IsNullOrEmpty(this._dlxExchangeName) ? string.Format("dlx_{0}", exchangeName) : this._dlxExchangeName;var dlxQueueName = string.IsNullOrEmpty(this._dlxQueueName) ? string.Format("dlx_{0}", queueName) : this._dlxQueueName;var dlxRouteKey = string.IsNullOrEmpty(this._dlxRouteKey) ? (string.IsNullOrEmpty(routeKey) ? Guid.NewGuid().ToString().Replace("-", "") : string.Format("dlx_{0}", routeKey)) : this._dlxRouteKey;channel.ExchangeDeclare(dlxExchangeName, ExchangeType.Direct, true, false, null);channel.QueueDeclare(dlxQueueName, true, false, false, null);channel.QueueBind(dlxQueueName, dlxExchangeName, dlxRouteKey, null);var args = new Dictionary<string, object>{{"x-message-ttl", delayTime},{"x-dead-letter-exchange", dlxExchangeName},{"x-dead-letter-routing-key", dlxRouteKey}};channel.ExchangeDeclare(exchangeName, exchangeType, true, false, null);channel.QueueDeclare(queueName, true, false, false, args);channel.QueueBind(queueName, exchangeName, routeKey, null);channel.BasicQos(0, 1, true);return channel;}/// <summary>/// 發送ExchangeType類型為Direct的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="routeKey">消息路由key</param>/// <param name="expireTime">過期時間,單位為秒</param>/// <param name="message">消息實體</param>/// <param name="queueName">隊列名(必填)</param>/// <returns></returns>public bool PublishDirectMessage(string exchangeName, string queueName, string routeKey, int expireTime, Message message){return this.PublishMessage(exchangeName, ExchangeType.Direct, queueName, routeKey, expireTime, new[] { message });}/// <summary>/// 批量發送ExchangeType類型為Direct的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="routeKey">消息路由key</param>/// <param name="expireTime">過期時間,單位為秒</param>/// <param name="messages">消息實體</param>/// <param name="queueName">隊列名(必填)</param>/// <returns></returns>public bool PublishDirectMessages(string exchangeName, string queueName, string routeKey, int expireTime, IEnumerable<Message> messages){return this.PublishMessage(exchangeName, ExchangeType.Direct, queueName, routeKey, expireTime, messages);}/// <summary>/// 發送ExchangeType類型為Fanout的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="expireTime">過期時間,單位為秒</param>/// <param name="message">消息實體</param>/// <param name="queueName">隊列名(必填)</param>/// <returns></returns>public bool PublishFanoutMessage(string exchangeName, string queueName, int expireTime, Message message){return this.PublishMessage(exchangeName, ExchangeType.Fanout, queueName, "", expireTime, new[] { message });}/// <summary>/// 批量發送ExchangeType類型為Fanout的消息/// </summary>/// <param name="exchangeName">交換機名稱</param>/// <param name="expireTime">過期時間,單位為秒</param>/// <param name="messages">消息實體</param>/// <param name="queueName">隊列名(必填)</param>/// <returns></returns>public bool PublishFanoutMessages(string exchangeName, string queueName, int expireTime, IEnumerable<Message> messages){return this.PublishMessage(exchangeName, ExchangeType.Fanout, queueName, "", expireTime, messages);}private bool PublishMessage(string exchangeName, string exchangeType, string queueName, string routeKey, int expireTime, IEnumerable<Message> messages){using (var con = CreateConnection()){using (var channel = CreateChannel(con, exchangeName, exchangeType, queueName, routeKey, expireTime * 1000)){channel.ConfirmSelect();//啟用消息發送確認機制foreach (var message in messages){var body = Encoding.UTF8.GetBytes(message.Value);var properties = channel.CreateBasicProperties();properties.Persistent = true; //使消息持久化properties.Headers = message.Headers;properties.ContentType = string.IsNullOrEmpty(message.ContentType) ? "text/plain" : message.ContentType;channel.BasicPublish(exchangeName, routeKey, properties, body);}var state = channel.WaitForConfirms();return state;}}}} } View Code6、Test
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using XFC.RabbitMQ.Domain;namespace XFC.RabbitMQ.Test {class Program{private const string RabbitHostUri = "amqp://guest:guest@localhost:5672/";private const string ExchangeName = "xfc.mq.test";private const string QueueName = "xfc.mq.test.queue";private const string RouteKey = "xfc.mq.test.key";static void Main(string[] args){var publisher = new XFC.RabbitMQ.RabbitMqPublisher(RabbitHostUri);var dic = new Dictionary<string, object>();dic.Add("uniquekey", "s34sdf3423234523sdfdsgf");dic.Add("callbackurl", "http://wwww.1234.com/callback");var ms =new List<Message>();for (int i = 0; i < 1000; i++){ms.Add(new Message("hello...", dic, "application/json"));}publisher.PublishDirectMessages(ExchangeName, RouteKey, ms, QueueName);Console.WriteLine("is ok");Console.ReadKey();var mqQuery = new XFC.RabbitMQ.RabbitMqQuery(RabbitHostUri);var ss = mqQuery.GetMessages(QueueName, 1000);foreach (var s in ss){Console.WriteLine(s.Value);Console.WriteLine(s.ContentType);foreach (var header in s.Headers){Console.WriteLine(header.Key + ":" + Encoding.UTF8.GetString((Byte[])header.Value));}}Console.ReadKey();using (var mqListener = new XFC.RabbitMQ.RabbitMqListener(RabbitHostUri, QueueName)){mqListener.MessageListen(msg =>{Console.WriteLine(msg.Value);return true;});Console.WriteLine("按任意鍵退出程序...");Console.ReadKey();}}} } View Code轉載于:https://www.cnblogs.com/huangzelin/p/11202627.html
總結
以上是生活随笔為你收集整理的RabbitMQ操作代码封装的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: VueJS教程4
- 下一篇: 【leetcode】股票买卖系列总结