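[Microsoft Azure Learning Journey] Testing whether a message queue (Service Bus Queue) loses messages
Our team recently ran into a question: is Microsoft's Azure Service Bus Queue reliable? Can it lose messages?
The background is as follows.
Our product is a SaaS product. To prevent message loss, messages between modules are passed through a Microsoft Azure message queue (Service Bus Queue). However, we hit a problem: one module sent a message to the queue, but the other module never received it. Since no exception was thrown while sending, people began to suspect that Azure Service Bus Queue was unreliable and had lost some of our messages.
The official line, as we understood it, is that messages are not lost with 99.5% probability.
Still, I did not think we could be that unlucky. We are still in the testing phase and our message volume is small, so hitting such a case by chance seemed unlikely. Following a colleague's suggestion, I wrote a test program to find out whether Service Bus Queue loses messages, or loses them easily.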
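1. Test program overview
Principle: send a fixed number of messages to the queue and check whether all of them can be received. If they can, the queue can be considered basically reliable and the problem lies in our own code.
Process:
First, create a queue. The program uses the Azure .NET SDK to send messages to the queue and receive messages from it. After a message is received, the program calls a method to delete it from the queue; a message counts as received only if the deletion succeeds.
When the main program runs, it starts two threads.
Thread 1 keeps sending messages to the queue. The total is fixed, assumed here to be 10000 messages. Because the SDK's Send method has no return value that indicates success, a send counts as successful if it throws no exception.
Thread 2 keeps receiving messages from the queue. After fetching a message locally, it deletes that message from the queue. A message that is fetched and successfully deleted counts as received, and the counter is incremented by 1.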
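A count comparison shows that something is missing but not which message. The following is a minimal sketch of reconciling by message body, assuming every body is unique (the test program uses a new GUID per message). `DeliveryLedger` and its method names are hypothetical and not part of the Azure SDK.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the Azure SDK): records which message
// bodies were sent and which were received, so that a mismatch can be traced
// to specific messages instead of just a differing count.
public class DeliveryLedger
{
    private readonly ConcurrentDictionary<string, byte> sent =
        new ConcurrentDictionary<string, byte>();
    private readonly ConcurrentDictionary<string, byte> received =
        new ConcurrentDictionary<string, byte>();

    // Call from the sender thread after a send completed without an exception.
    public void RecordSent(string body) { sent.TryAdd(body, 0); }

    // Call from the receiver thread after the message was deleted from the queue.
    public void RecordReceived(string body) { received.TryAdd(body, 0); }

    // Bodies that were sent but never received.
    public List<string> Missing()
    {
        return sent.Keys.Where(k => !received.ContainsKey(k)).OrderBy(k => k).ToList();
    }

    // Bodies that were received without a matching send record.
    public List<string> Unexpected()
    {
        return received.Keys.Where(k => !sent.ContainsKey(k)).OrderBy(k => k).ToList();
    }
}
```

Both threads can share one ledger instance; once the receiver stops, `Missing()` lists the bodies to search for in the logs.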
Logging module:
Log4net is used for logging.
2. Code implementation
The class ServiceBusQueueHandler wraps the .NET SDK calls for sending and receiving messages.
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Runtime.Serialization;
    using System.Threading;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;

    class ServiceBusQueueHandler
    {
        public static readonly log4net.ILog log = log4net.LogManager.GetLogger(
            System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

        public ServiceBusQueueHandler()
        {
            /* For most scenarios, it is recommended that you keep Mode to Auto.
             * This indicates that your application will attempt to use TCP to connect
             * to the Windows Azure Service Bus, but will use HTTP if unable to do so.
             * In general, this allows your connection to be more efficient.
             * However, if TCP is always unavailable to your application, you can save
             * some time on your connection if you globally set the mode to HTTP. */
            ServiceBusEnvironment.SystemConnectivity.Mode = ConnectivityMode.AutoDetect;
        }

        // Send a single message. Returns true if Send threw no exception.
        public bool SendMessage(string strMessageBody, QueueClient client, int idelayTime = 0)
        {
            bool bRet = false;
            try
            {
                BrokeredMessage message = new BrokeredMessage(strMessageBody);
                // Set the time at which this message becomes visible in the queue.
                DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);
                message.ScheduledEnqueueTimeUtc = utcEnqueueTime;

                // http://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.send.aspx
                client.Send(message);
                log.Debug(string.Format("Success send! Send Time = {0}, Body = {1}",
                    DateTime.UtcNow.ToString(), strMessageBody));
                bRet = true;
            }
            catch (TimeoutException e)
            {
                // Thrown when the operation times out. The timeout period is initialized through
                // MessagingFactorySettings; you may need to increase OperationTimeout to avoid
                // this exception if the timeout value is relatively low.
                log.Debug(string.Format("TimeoutException: {0}", e.Message));
                return bRet;
            }
            catch (ArgumentException e)
            {
                // Thrown when the BrokeredMessage is null.
                log.Debug(string.Format("ArgumentException: {0}", e.Message));
                return bRet;
            }
            catch (InvalidOperationException e)
            {
                // Thrown if the message has already been sent by a QueueClient or MessageSender.
                log.Debug(string.Format("InvalidOperationException: {0}", e.Message));
                return bRet;
            }
            catch (OperationCanceledException e)
            {
                // Thrown if the client entity has been closed or aborted.
                log.Debug(string.Format("OperationCanceledException: {0}", e.Message));
                return bRet;
            }
            catch (UnauthorizedAccessException e)
            {
                // Thrown if there is an I/O or security error.
                log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));
                return bRet;
            }
            catch (SerializationException e)
            {
                // Thrown when an error occurs during serialization or deserialization.
                log.Debug(string.Format("SerializationException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingEntityNotFoundException e)
            {
                // Thrown if the queue does not exist.
                log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingException e)
            {
                log.Debug(string.Format("MessagingException: {0}", e.Message));
                if (e.IsTransient)
                {
                    // IsTransient indicates whether the operation is worth retrying.
                    HandleTransientErrors(e);
                }
                return bRet;
            }
            catch (Exception e)
            {
                log.Debug(string.Format("Exception: {0}", e.Message));
                return bRet;
            }
            return bRet;
        }

        // Send a batch of messages. The maximum size of the batch is the same as the
        // maximum size of a single message (currently 256 KB).
        public bool SendMessages(List<string> arrayMessages, QueueClient client, int idelayTime = 0)
        {
            bool bRet = false;
            int i = 0;

            // Prepare the data.
            List<BrokeredMessage> arrayBrokedMessages = new List<BrokeredMessage>();
            DateTime utcEnqueueTime = DateTime.UtcNow.AddSeconds(idelayTime);
            log.Debug(string.Format("DateTime.UtcNow = {0}", DateTime.UtcNow.ToString()));
            log.Debug(string.Format("EnqueuedTimeUtc = {0}", utcEnqueueTime.ToString()));
            foreach (string strMessageBody in arrayMessages)
            {
                BrokeredMessage message = new BrokeredMessage(strMessageBody);
                // The Id of the message must be assigned.
                // NOTE: this numbering restarts at 1 for every batch, so IDs repeat across
                // batches. If uniqueness matters, use a globally unique value instead.
                message.MessageId = "Message_" + (++i).ToString();
                message.ScheduledEnqueueTimeUtc = utcEnqueueTime;
                arrayBrokedMessages.Add(message);
            }

            // Send the messages.
            try
            {
                client.SendBatch(arrayBrokedMessages);
                log.Debug("Success send batch messages!");
                bRet = true;
            }
            catch (TimeoutException e)
            {
                // Thrown when the operation times out. The timeout period is initialized through
                // MessagingFactorySettings; you may need to increase OperationTimeout to avoid
                // this exception if the timeout value is relatively low.
                log.Debug(string.Format("TimeoutException: {0}", e.Message));
                return bRet;
            }
            catch (ArgumentException e)
            {
                // Thrown when the BrokeredMessage is null.
                log.Debug(string.Format("ArgumentException: {0}", e.Message));
                return bRet;
            }
            catch (InvalidOperationException e)
            {
                // Thrown if the message has already been sent by a QueueClient or MessageSender.
                log.Debug(string.Format("InvalidOperationException: {0}", e.Message));
                return bRet;
            }
            catch (OperationCanceledException e)
            {
                // Thrown if the client entity has been closed or aborted.
                log.Debug(string.Format("OperationCanceledException: {0}", e.Message));
                return bRet;
            }
            catch (UnauthorizedAccessException e)
            {
                // Thrown if there is an I/O or security error.
                log.Debug(string.Format("UnauthorizedAccessException: {0}", e.Message));
                return bRet;
            }
            catch (SerializationException e)
            {
                // Thrown when an error occurs during serialization or deserialization.
                log.Debug(string.Format("SerializationException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingEntityNotFoundException e)
            {
                // Thrown if the queue does not exist.
                log.Debug(string.Format("MessagingEntityNotFoundException: {0}", e.Message));
                return bRet;
            }
            catch (MessagingException e)
            {
                log.Debug(string.Format("MessagingException: {0}", e.Message));
                if (e.IsTransient)
                {
                    // IsTransient indicates whether the operation is worth retrying.
                    HandleTransientErrors(e);
                }
                return bRet;
            }
            catch (Exception e)
            {
                log.Debug(string.Format("Exception: {0}", e.Message));
                return bRet;
            }
            log.Debug("<=SendMessages");
            return bRet;
        }

        // Get messages from a queue. Returns null if an exception occurred.
        // iWaitTimeout: the time span (in seconds) that the server will wait for the
        // message batch to arrive before it times out.
        public List<BrokeredMessage> GetMessages(int iMaxNumMsg, int iWaitTimeout, QueueClient client)
        {
            List<BrokeredMessage> list = new List<BrokeredMessage>();
            try
            {
                // Receive a batch of messages from the queue.
                list = client.ReceiveBatch(iMaxNumMsg, TimeSpan.FromSeconds(iWaitTimeout))
                             .ToList<BrokeredMessage>();
            }
            catch (MessagingException e)
            {
                log.Debug(string.Format("ReceiveMessages, MessagingException: {0}", e.Message));
                if (e.IsTransient)
                {
                    // IsTransient indicates whether the operation is worth retrying.
                    HandleTransientErrors(e);
                }
                return null;
            }
            catch (Exception e)
            {
                log.Debug(string.Format("ReceiveMessages, Exception: {0}", e.Message));
                return null;
            }
            return list;
        }

        // Delete a received message from the queue by completing it.
        public bool DeleteMessage(BrokeredMessage message)
        {
            bool bRet = false;
            try
            {
                message.Complete();
                bRet = true;
                log.Debug("Delete Message Successfully");
            }
            catch (Exception e)
            {
                log.Debug(e.Message);
                return bRet;
            }
            return bRet;
        }

        // On a transient error, back off for 2 seconds. This method only waits;
        // the caller is responsible for retrying the operation.
        private void HandleTransientErrors(MessagingException e)
        {
            log.Debug(e.Message);
            log.Debug("Transient error happened! Will retry in 2 seconds");
            Thread.Sleep(2000);
        }
    }
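Note that HandleTransientErrors only sleeps for 2 seconds; SendMessage and SendMessages still return false afterwards, so any retry has to happen in the caller. The following is a minimal sketch of such a caller-side retry with a fixed delay. `RetryHelper` is a hypothetical name and not part of the SDK.

```csharp
using System;
using System.Threading;

// Hypothetical helper (not part of the Azure SDK): repeats an operation that
// reports success as a bool, waiting a fixed delay between attempts.
public static class RetryHelper
{
    // Runs `operation` until it returns true or `maxAttempts` is reached.
    // Returns the number of attempts used, or -1 if every attempt failed.
    public static int Execute(Func<bool> operation, int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (operation())
            {
                return attempt;
            }
            if (attempt < maxAttempts)
            {
                Thread.Sleep(delay);
            }
        }
        return -1;
    }
}
```

A call could look like `RetryHelper.Execute(() => handler.SendMessage(body, client, 10), 3, TimeSpan.FromSeconds(2))`. Keep in mind that a retried send may deliver the same body twice if an earlier attempt actually reached the queue (for example after a timeout), so the receiver should tolerate duplicates.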
The Main method and the implementation of thread 1 and thread 2.
    // The methods below are members of the Program class. The static fields log,
    // connectionString and queueName are defined elsewhere in that class (not shown).

    // Thread 1: sends a fixed number of messages to the queue.
    public static void SendMessageToQueue()
    {
        int sendMessageNum = 10000;
        log.Debug(string.Format("=> SendMessageToQueue, send message number = {0}", sendMessageNum));

        // Prepare the handler and the client.
        ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
        QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

        // Number of messages sent successfully.
        int count = 0;
        for (int i = 0; i < sendMessageNum; i++)
        {
            // Send a message whose body is a new GUID.
            string strMessageBody = System.Guid.NewGuid().ToString();
            bool bRet = handler.SendMessage(strMessageBody, client, 10);
            if (bRet)
            {
                count++;
            }
            // Wait 2 s, then send the next message.
            Thread.Sleep(2000);
        }
        log.Debug(string.Format("<= SendMessageToQueue, success sent message number = {0}", count));
    }

    // Thread 2: receives messages from the queue and deletes them.
    public static void ReceiveMessageFromQueue()
    {
        log.Debug("=> ReceiveMessageFromQueue");

        // Prepare the handler and the client.
        ServiceBusQueueHandler handler = new ServiceBusQueueHandler();
        QueueClient client = QueueClient.CreateFromConnectionString(connectionString, queueName);

        // Number of messages received successfully.
        int count = 0;
        // If no message arrives for 1 hour (60 * 60 s = 30 polls * 120 s),
        // assume there are no more messages in the queue.
        int failCount = 0;
        while (failCount < 30)
        {
            List<BrokeredMessage> list = handler.GetMessages(10, 120, client);
            // GetMessages returns null after an exception; treat that like an empty batch.
            if (list != null && list.Count > 0)
            {
                // A message arrived, so restart the idle count.
                failCount = 0;
                foreach (BrokeredMessage e in list)
                {
                    log.Debug(string.Format("Received 1 Message, Time = {0}, Message Body = {1}",
                        DateTime.UtcNow.ToString(), e.GetBody<string>()));
                    // Delete the message from the queue.
                    bool bRet = handler.DeleteMessage(e);
                    if (bRet)
                    {
                        count++;
                    }
                }
                log.Debug(string.Format("Current Count Number = {0}", count));
            }
            else
            {
                failCount++;
                log.Debug(string.Format("Didn't Receive any Message this time, fail count number = {0}", failCount));
            }
            // Wait 1 s, then receive the next batch.
            Thread.Sleep(1000);
        }
        log.Debug(string.Format("<= ReceiveMessageFromQueue, success received message number = {0}", count));
    }

    static void Main(string[] args)
    {
        log4net.GlobalContext.Properties["LogName"] = "TestServiceBus.log";
        log4net.Config.XmlConfigurator.Configure();
        Console.WriteLine("Start");

        Thread threadSendMessage = new Thread(SendMessageToQueue);
        Thread threadReceMessage = new Thread(ReceiveMessageFromQueue);
        threadSendMessage.Start();
        threadReceMessage.Start();

        Console.ReadLine();
    }

One small detail here: thread 1 sends only 10000 messages while thread 2 keeps receiving. When thread 2 has received no message for one hour, it assumes the queue will not get any more messages and stops receiving.
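One way to express this stop rule is to count consecutive empty polls and reset the count whenever a non-empty batch arrives. The sketch below assumes that "one hour without messages" means 30 empty polls in a row, each waiting up to 120 seconds. `IdleDetector` is a hypothetical helper and not part of the SDK.

```csharp
using System;

// Hypothetical helper (not part of the Azure SDK): decides when the receiver
// may stop, based on how many polls in a row came back empty.
public class IdleDetector
{
    private readonly int maxEmptyPolls;
    private int emptyPolls;

    public IdleDetector(int maxEmptyPolls)
    {
        if (maxEmptyPolls <= 0)
        {
            throw new ArgumentOutOfRangeException("maxEmptyPolls");
        }
        this.maxEmptyPolls = maxEmptyPolls;
    }

    // Report the size of the batch returned by one poll.
    // Any non-empty batch resets the idle count.
    public void Report(int receivedCount)
    {
        emptyPolls = receivedCount > 0 ? 0 : emptyPolls + 1;
    }

    // True once maxEmptyPolls polls in a row were empty.
    public bool ShouldStop
    {
        get { return emptyPolls >= maxEmptyPolls; }
    }
}
```

The receive loop would then run `while (!idle.ShouldStop)` and call `idle.Report(batchSize)` after each poll, with `new IdleDetector(30)` matching the numbers used in this test.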
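3. Test results
According to the log, the program ran for nearly 8 hours, with the following final result.
10000 messages were sent successfully:
2015-04-30 15:01:49,576 [3] DEBUG TestServiceBus.Program <= SendMessageToQueue, success sent message number = 10000
10000 messages were received successfully:
2015-04-30 15:02:03,638 [4] DEBUG TestServiceBus.Program Current Count Number = 10000
So, judging from this test alone, Service Bus Queue did not lose any messages. For the message problem our team ran into, my suggestion is to start by checking our own code and see whether the fault is ours rather than Service Bus Queue's.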
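As a rough sanity check on the 99.5% figure quoted earlier: if it were read as an independent per-message delivery probability p = 0.995 (an assumption made only for this illustration), a run of N = 10000 messages with no loss at all would be extremely unlikely:

```latex
\begin{align*}
\mathbb{E}[\text{lost}] &= N(1-p) = 10000 \times 0.005 = 50, \\
P(\text{no loss}) &= p^{N} = 0.995^{10000} = e^{10000 \ln 0.995} \approx e^{-50.1} \approx 1.7 \times 10^{-22}.
\end{align*}
```

Under that reading, receiving 10000 out of 10000 suggests that the actual per-message loss rate in this setup is far below 0.5%.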
---------------------------------------------------------------
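Update, May 5, 2015: we finally found why Service Bus appeared to lose messages, and the problem was indeed on our side. When sending, message IDs could repeat, which could cause messages to be lost. The message ID should be unique.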
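Why a repeated message ID can look like a lost message: when duplicate detection is enabled on a queue, the broker discards a message whose MessageId it has already seen within the detection window, while the send still completes without an exception. Whether that setting was enabled in our case is not stated above, so treat this as the likely mechanism only. The sketch below simulates the behavior in memory; `DedupQueue` is a hypothetical stand-in rather than the Service Bus API, and it ignores the time window for simplicity.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in (not the Service Bus API) for a queue with
// duplicate detection: a message whose ID was seen before is dropped silently.
public class DedupQueue
{
    private readonly HashSet<string> seenIds = new HashSet<string>();
    private readonly Queue<string> bodies = new Queue<string>();

    // From the sender's point of view this always succeeds: no return value
    // and no exception, even when the message is discarded as a duplicate.
    public void Send(string messageId, string body)
    {
        if (seenIds.Add(messageId))
        {
            bodies.Enqueue(body);
        }
    }

    // Number of messages actually available to a receiver.
    public int Count
    {
        get { return bodies.Count; }
    }
}
```

Sending two different bodies with the ID "Message_1" leaves only the first one in the queue. Giving every message its own ID, for example `Guid.NewGuid().ToString()`, avoids the collision.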
I offer this only as a starting point for discussion. Thanks :-)
Kevin Song
May 2, 2015
Reposted from: https://www.cnblogs.com/KevinSong/p/Azure.html