.net 使用阿里云RocketMQ
生活随笔
收集整理的這篇文章主要介紹了
.net 使用阿里云RocketMQ
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1.首先我們來講解一下消息隊列的作用
比如說我們的訂單系統,再客戶訂單生成了以后,可能會有
快遞系統,通知系統,和打印系統需要用到當前訂單的詳細內容
所以這個時候常規的操作是在A里面通過代碼調用B,C? ,D系統的接口來通知他們有新訂單了
?
如果此時有個E系統呢
那我們的做法可能只能在A系統中增加代碼來通知E系統,但是如果后期我們的E系統又不要了呢,豈不是我們又要在A系統中去除掉這一部分代碼所以說這樣的代碼冗余就很高,對后期的性能也很有影響,因為系統A中通知B,C,D系統還要判斷 B,C,D返回值中是否成功,如果沒有成功還要子再次請求這樣系統性能就非常的低下
所以說我們使用了MQ來解決這個問題
我們A系統秩序通知MQ系統 后面的B,C,D要信息的話直接找MQ系統調用就行
接下來我這里講解下如何把MQ集成到.net 項目中
?
生產者端:
using Aliyun.MQ; using Aliyun.MQ.Model; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks;namespace CSDNSign.Controllers {[Route("api/[controller]")][ApiController]public class MQController : ControllerBase{readonly IFreeSql _sql;private readonly ILogger<MQController> _logger;private const string _endpoint = "*************";// AccessKey ID阿里云身份驗證,在阿里云服務器管理控制臺創建。private const string _accessKeyId = "*************";// AccessKey Secret阿里云身份驗證,在阿里云服務器管理控制臺創建。private const string _secretAccessKey = "*************";// 所屬的Topic。private const string _topicName = "xsw";// Topic所屬實例ID,默認實例為空。private const string _instanceId = "*************";private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);static MQProducer producer = _client.GetProducer(_instanceId, _topicName);/// <summary>/// /// </summary>/// <param name="sql"></param>/// <param name="logger"></param>public MQController(IFreeSql sql, ILogger<MQController> logger){_sql = sql;_logger = logger;}#region /// <summary>/// /// </summary>/// <param name="json"></param>/// <param name="key"></param>/// <param name="tag"></param>/// <returns></returns>[HttpPost][Route("TestMQ")]public string TestMQ(string json, string key, string tag){try{TopicMessage sendMsg = new TopicMessage(json);// 設置屬性。sendMsg.PutProperty("a", "a");// 設置Key。sendMsg.MessageKey = key;TopicMessage result = producer.PublishMessage(sendMsg);return JsonConvert.SerializeObject(result) ;}catch (Exception ex){return ex.Message.ToString();}}#endregion} }?
其中的endpoint可以在阿里云中接入點中查看
由于我們用的http的方式接受所以我們復制下面的地址
?
消費者端:
using Aliyun.MQ; using Aliyun.MQ.Model; using Aliyun.MQ.Model.Exp; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Security.Permissions; using System.Threading; using System.Threading.Tasks; using System.Windows; using System.Windows.Controls; using System.Windows.Data; using System.Windows.Documents; using System.Windows.Input; using System.Windows.Media; using System.Windows.Media.Imaging; using System.Windows.Navigation; using System.Windows.Shapes;namespace WPFTest {/// <summary>/// Interaction logic for MainWindow.xaml/// </summary>public partial class MainWindow : Window{private const string _endpoint = "********";// AccessKey ID阿里云身份驗證,在阿里云服務器管理控制臺創建。private const string _accessKeyId = "*****";// AccessKey Secret阿里云身份驗證,在阿里云服務器管理控制臺創建。private const string _secretAccessKey = "*****";// 所屬的Topic。private const string _topicName = "*****";// Topic所屬實例ID,默認實例為空。private const string _instanceId = "*******";private const string _groupId = "*********";private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);private Thread thread;public MainWindow(){InitializeComponent();this.button_Stop.IsEnabled = false;}private void button_Click(object sender, RoutedEventArgs e){thread = new Thread(new ThreadStart(TaskStart));thread.IsBackground = true;thread.Start();this.button_get.IsEnabled = false;this.button_Stop.IsEnabled = true;}/// <summary>/// 開始任務/// </summary>private void TaskStart(){Dispatcher.Invoke(() => this.textBox.Text = "====== 開始獲取MQ任務 ====== \n");// 在當前線程循環消費消息,建議多開個幾個線程并發消費消息。while (true){try{// 長輪詢消費消息。// 長輪詢表示如果Topic沒有消息,則請求會在服務端掛起3s,3s內如果有消息可以消費則立即返回客戶端。List<Message> messages = null;try{messages = consumer.ConsumeMessage(3, // 一次最多消費3條(最多可設置為16條)。3 // 長輪詢時間3秒(最多可設置為30秒)。);}catch (Exception exp1){if (exp1 is MessageNotExistException){Dispatcher.Invoke(() => textBox.Text += string.Format(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId + "\n"));continue;}Console.WriteLine(exp1);Thread.Sleep(2000);}if (messages == null){continue;}List<string> handlers = new List<string>();List<string> mqmessage = new List<string>();Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");// 處理業務邏輯。foreach (Message message in messages){Console.WriteLine(message);Console.WriteLine("Property a is:" + message.GetProperty("a"));handlers.Add(message.ReceiptHandle);mqmessage.Add(message.Body);}// Message.nextConsumeTime前若不確認消息消費成功,則消息會被重復消費。// 消息句柄有時間戳,同一條消息每次消費拿到的都不一樣。try{consumer.AckMessage(handlers);Console.WriteLine("Ack message success:");foreach (string handle in mqmessage){Dispatcher.Invoke(() => textBox.Text += handle + "\n");}Console.WriteLine();return;}catch (Exception exp2){// 某些消息的句柄可能超時,會導致消息消費狀態確認不成功。if (exp2 is AckMessageException){AckMessageException ackExp = (AckMessageException)exp2;Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems){Dispatcher.Invoke(() => textBox.Text += string.Format("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage));}}}}catch (Exception ex){Console.WriteLine(ex);Thread.Sleep(2000);}}}private void button_Clean_Click(object sender, RoutedEventArgs e){this.textBox.Text = "";}private void button_Stop_Click(object sender, RoutedEventArgs e){this.button_get.IsEnabled = true;this.button_Stop.IsEnabled = false;thread.Interrupt();// Wait for newThread to end.//thread.Join();}} }?最后項目效果如下所示:
?
?
總結
以上是生活随笔為你收集整理的.net 使用阿里云RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: github客户端从gitLab下拉取代
- 下一篇: 【软件测试】美团一面、阿里一面复盘总结