第二十节: 深入理解并发机制以及解决方案(锁机制、EF自有机制、队列模式等)
一. 理解并發(fā)機制
1. 什么是并發(fā),并發(fā)與多線程有什么關系?
①. 先從廣義上來說,或者從實際場景上來說.
高并發(fā)通常是海量用戶同時訪問(比如:12306買票、淘寶的雙十一搶購),如果把一個用戶看做一個線程的話那么并發(fā)可以理解成多線程同時訪問,高并發(fā)即海量線程同時訪問。
? ? ??(ps:我們在這里模擬高并發(fā)可以for循環(huán)多個線程即可)
②.從代碼或數(shù)據(jù)的層次上來說.
多個線程同時在一條相同的數(shù)據(jù)上執(zhí)行多個數(shù)據(jù)庫操作。
2. 從代碼層次上來說,給并發(fā)分類。
①.積極并發(fā)(樂觀并發(fā)、樂觀鎖):無論何時從數(shù)據(jù)庫請求數(shù)據(jù),數(shù)據(jù)都會被讀取并保存到應用內(nèi)存中。數(shù)據(jù)庫級別沒有放置任何顯式鎖。數(shù)據(jù)操作會按照數(shù)據(jù)層接收到的先后順序來執(zhí)行。
積極并發(fā)本質就是允許沖突發(fā)生,然后在代碼本身采取一種合理的方式去解決這個并發(fā)沖突,常見的方式有:
a.忽略沖突強制更新:數(shù)據(jù)庫會保存最后一次更新操作(以更新為例),會損失很多用戶的更新操作。
b.部分更新:允許所有的更改,但是不允許更新完整的行,只有特定用戶擁有的列更新了。這就意味著,如果兩個用戶更新相同的記錄但卻不同的列,那么這兩個更新都會成功,而且來自這兩個用戶的更改都是可見的。(EF默認實現(xiàn)不了這種情況)
c.詢問用戶:當一個用戶嘗試更新一個記錄時,但是該記錄自從他讀取之后已經(jīng)被別人修改了,這時應用程序就會警告該用戶該數(shù)據(jù)已經(jīng)被某人更改了,然后詢問他是否仍然要重寫該數(shù)據(jù)還是首先檢查已經(jīng)更新的數(shù)據(jù)。(EF可以實現(xiàn)這種情況,在后面詳細介紹)
d.拒絕修改:當一個用戶嘗試更新一個記錄時,但是該記錄自從他讀取之后已經(jīng)被別人修改了,此時告訴該用戶不允許更新該數(shù)據(jù),因為數(shù)據(jù)已經(jīng)被某人更新了。
(EF可以實現(xiàn)這種情況,在后面詳細介紹)
②.消極并發(fā)(悲觀并發(fā)、悲觀鎖):無論何時從數(shù)據(jù)庫請求數(shù)據(jù),數(shù)據(jù)都會被讀取,然后該數(shù)據(jù)上就會加鎖,因此沒有人能訪問該數(shù)據(jù)。這會降低并發(fā)出現(xiàn)問題的機會,缺點是加鎖是一個昂貴的操作,會降低整個應用程序的性能。
消極并發(fā)的本質就是永遠不讓沖突發(fā)生,通常的處理凡是是只讀鎖和更新鎖。
a. 當把只讀鎖放到記錄上時,應用程序只能讀取該記錄。如果應用程序要更新該記錄,它必須獲取到該記錄上的更新鎖。如果記錄上加了只讀鎖,那么該記錄仍然能夠被想要只讀鎖的請求使用。然而,如果需要更新鎖,該請求必須等到所有的只讀鎖釋放。同樣,如果記錄上加了更新鎖,那么其他的請求不能再在這個記錄上加鎖,該請求必須等到已存在的更新鎖釋放才能加鎖。
總結,這里我們可以簡單理解把并發(fā)業(yè)務部分用一個鎖(如:lock,實質是數(shù)據(jù)庫鎖,后面章節(jié)單獨介紹)鎖住,使其同時只允許一個線程訪問即可。
b. 加鎖會帶來很多弊端:
(1):應用程序必須管理每個操作正在獲取的所有鎖;
(2):加鎖機制的內(nèi)存需求會降低應用性能
(3):多個請求互相等待需要的鎖,會增加死鎖的可能性。
總結:盡量不要使用消極并發(fā),EF默認是不支持消極并發(fā)的!!!
注意:EF默認就是積極并發(fā),當然EF也可以配置成消極并發(fā)。
二. 并發(fā)機制的解決方案
1. 從架構的角度去解決(大層次 如:12306買票)
nginx負載均衡、數(shù)據(jù)庫讀寫分離、多個業(yè)務服務器、多個數(shù)據(jù)庫服務器、NoSQL, 使用隊列來處理業(yè)務,將高并發(fā)的業(yè)務依次放到隊列中,然后按照先進先出的原則,?逐個處理(隊列的處理可以采用 Redis、RabbitMq等等)
(PS:在后面的框架篇章里詳細介紹該方案)
2. 從代碼的角度去解決(在服務器能承載壓力的情況下,并發(fā)訪問同一條數(shù)據(jù))
實際的業(yè)務場景:如進銷存類的項目,涉及到同一個物品的出庫、入庫、庫存,我們都知道庫存在數(shù)據(jù)庫里對應了一條記錄,入庫要查出現(xiàn)在庫存的數(shù)量,然后加上入庫的數(shù)量,假設兩個線程同時入庫,假設查詢出來的庫存數(shù)量相同,但是更新庫存數(shù)量在數(shù)據(jù)庫層次上是有先后,最終就保留了后更新的數(shù)據(jù),顯然是不正確的,應該保留的是兩次入庫的數(shù)量和。
(該案例的實質:多個線程同時在一條相同的數(shù)據(jù)上執(zhí)行多個數(shù)據(jù)庫操作)
事先準備一張數(shù)據(jù)庫表:
解決方案一:(最常用的方式)
給入庫和出庫操作加一個鎖,使其同時只允許一個線程訪問,這樣即使兩個線程同時訪問,但在代碼層次上,由于鎖的原因,還是有先有后的,這樣就保證了入庫操作的線程唯一性,當然庫存量就不會出錯了.
總結:該方案可以說是適合處理小范圍的并發(fā)且鎖內(nèi)的業(yè)務執(zhí)行不是很復雜。假設一萬線程同時入庫,每次入庫要等2s,那么這一萬個線程執(zhí)行完成需要的總時間非常多,顯然不適合。
? ? (這種方式的實質就是給核心業(yè)務加了個lock鎖,這里就不做測試了)
?
解決方案二:EF處理積極并發(fā)帶來的沖突
1. 配置準備
(1). 針對DBFirst模式,可以給相應的表額外加一列RowVersion,數(shù)據(jù)庫中為timestamp類型,對應的類中為byte[]類型,并且在Edmx模型上給該字段的并發(fā)模式設置為fixed(默認為None),這樣該表中所有字段都監(jiān)控并發(fā)。
如果不想監(jiān)視所有列(在不添加RowVersion的情況下),只需在Edmx模型是給特定的字段的并發(fā)模式設置為fixed,這樣只有被設置的字段被監(jiān)測并發(fā)。
測試結果: (DBFirst模式下的并發(fā)測試)
事先在UserInfor1表中插入一條id、userName、userSex、userAge均為1的數(shù)據(jù)(清空數(shù)據(jù))。
測試情況1:
在不設置RowVersion并發(fā)模式為Fixed的情況下,兩個線程修改不同字段(修改同一個字段一個道理),后執(zhí)行的線程的結果覆蓋前面的線程結果.
發(fā)現(xiàn)測試結果為:1,1,男,1 ; 顯然db1線程修改的結果被db2線程給覆蓋了. (修改同一個字段一個道理)
?View Code
測試情況2:
設置RowVersion并發(fā)模式為Fixed的情況下,兩個線程修改不同字段(修改同一個字段一個道理),如果該條數(shù)據(jù)已經(jīng)被修改,利用DbUpdateConcurrencyException可以捕獲異常,進行積極并發(fā)的沖突處理。測試結果如下:
a.RefreshMode.ClientWins: 1,1,男,1
b.RefreshMode.StoreWins: 1,ypf,1,1
c.ex.Entries.Single().Reload(); 1,ypf,1,1
1 {2 //1.創(chuàng)建兩個EF上下文,模擬代表兩個線程3 var db1 = new ConcurrentTestDBEntities();4 var db2 = new ConcurrentTestDBEntities();5 6 UserInfor1 user1 = db1.UserInfor1.Find("1");7 UserInfor1 user2 = db2.UserInfor1.Find("1");8 9 //2. 執(zhí)行修改操作 10 //(db1的線程先執(zhí)行完修改操作,并保存) 11 user1.userName = "ypf"; 12 db1.Entry(user1).State = EntityState.Modified; 13 db1.SaveChanges(); 14 15 //(db2的線程在db1線程修改完成后,執(zhí)行修改操作) 16 try 17 { 18 user2.userSex = "男"; 19 db2.Entry(user2).State = EntityState.Modified; 20 db2.SaveChanges(); 21 22 Console.WriteLine("測試成功"); 23 } 24 catch (DbUpdateConcurrencyException ex) 25 { 26 Console.WriteLine("測試失敗:" + ex.Message); 27 28 //1. 保留上下文中的現(xiàn)有數(shù)據(jù)(即最新,最后一次輸入) 29 //var oc = ((IObjectContextAdapter)db2).ObjectContext; 30 //oc.Refresh(RefreshMode.ClientWins, user2); 31 //oc.SaveChanges(); 32 33 //2. 保留原始數(shù)據(jù)(即數(shù)據(jù)源中的數(shù)據(jù)代替當前上下文中的數(shù)據(jù)) 34 //var oc = ((IObjectContextAdapter)db2).ObjectContext; 35 //oc.Refresh(RefreshMode.StoreWins, user2); 36 //oc.SaveChanges(); 37 38 //3. 保留原始數(shù)據(jù)(而Reload處理也就是StoreWins,意味著放棄當前內(nèi)存中的實體,重新到數(shù)據(jù)庫中加載當前實體) 39 ex.Entries.Single().Reload(); 40 db2.SaveChanges(); 41 } 42 }測試情況3:
在不設置RowVersion并發(fā)模式為Fixed的情況下(也不需要RowVersion這個字段),單獨設置userName字段的并發(fā)模式為Fixed,兩個線程同時修改該字段,利用DbUpdateConcurrencyException可以捕獲異常,進行積極并發(fā)的沖突處理,但如果是兩個線程同時修改userName以外的字段,將不能捕獲異常,將走EF默認的處理方式,后執(zhí)行的覆蓋先執(zhí)行的。
a.RefreshMode.ClientWins: 1,ypf2,1,1
b.RefreshMode.StoreWins: 1,ypf,1,1
c.ex.Entries.Single().Reload(); 1,ypf,1,1
?View Code
(2). 針對CodeFirst模式,需要有這樣的一個屬性 public byte[] RowVersion { get; set; },并且給屬性加上特性1702220118,這樣該表中所有字段都監(jiān)控并發(fā)。如果不想監(jiān)視所有列(在不添加RowVersion的情況下),只需給特定的字段加上特性 [ConcurrencyCheck],這樣只有被設置的字段被監(jiān)測并發(fā)。
除了再配置上不同于DBFirst模式以為,是通過加特性的方式來標記并發(fā),其它捕獲并發(fā)和積極并發(fā)的幾類處理方式均同DBFirst模式相同。(這里不做測試了)
2. 積極并發(fā)處理的三種形式總結:
利用DbUpdateConcurrencyException可以捕獲異常,然后:
a. RefreshMode.ClientWins:保留上下文中的現(xiàn)有數(shù)據(jù)(即最新,最后一次輸入)
b. RefreshMode.StoreWins:保留原始數(shù)據(jù)(即數(shù)據(jù)源中的數(shù)據(jù)代替當前上下文中的數(shù)據(jù))
c.ex.Entries.Single().Reload(); 保留原始數(shù)據(jù)(而Reload處理也就是StoreWins,意味著放棄當前內(nèi)存中的實體,重新到數(shù)據(jù)庫中加載當前實體)
3. 該方案總結:
這種模式實質上就是獲取異常告訴程序,讓開發(fā)人員結合需求自己選擇怎么處理,但這種模式是解決代碼層次上的并發(fā)沖突,并不是解決大數(shù)量同時訪問崩潰問題的。
解決方案三:利用隊列來解決業(yè)務上的并發(fā)(架構層次上其實也是這種思路解決的)
1.先分析:
前面說過所謂的高并發(fā),就是海量的用戶同時向服務器發(fā)送請求,進行某個業(yè)務處理(比如定時秒殺的搶單),而這個業(yè)務處理是需要?一定時間的。
2.處理思路:
將海量用戶的請求放到一個隊列里(如:Queue),先不進行業(yè)務處理,然后另外一個服務器從線程中讀取這個請求(MVC框架可以放到Global全局里),依次進行業(yè)務處理,至于處理完成后,是否需要告訴客戶端,可以根據(jù)實際需求來定,如果需要的話(可以借助Socket、Signalr、推送等技術來進行).
特別注意:讀取隊列的線程是一直在運行,只要隊列中有數(shù)據(jù),就給他拿出來.
這里使用Queue隊列,可以參考:http://www.cnblogs.com/yaopengfei/p/8322016.html
(PS:架構層次上的處理方案無非隊列是單獨一臺服務器,執(zhí)行從隊列讀取的是另外一臺業(yè)務服務器,處理思想是相同的)
隊列單例類的代碼:
?View Code
PS:這里的入隊和出隊都要加鎖,因為Queue默認不是線程安全的,不加鎖會存在資源競用問題從而業(yè)務出錯,或者直接使用ConcurrentQueue線程安全的隊列,就不需要加鎖了,關于隊列線程安全問題詳見:http://www.cnblogs.com/yaopengfei/p/8322016.html
臨時存儲數(shù)據(jù)類的代碼:
1 /// <summary>2 /// 該類用來存儲請求信息3 /// </summary>4 public class TempInfor5 {6 /// <summary>7 /// 用戶編號8 /// </summary>9 public string userId { get; set; } 10 }模擬高并發(fā)入隊,單獨線程出隊的代碼:
1 {2 //3.1 模擬高并發(fā)請求 寫入隊列3 {4 for (int i = 0; i < 100; i++)5 {6 Task.Run(() =>7 {8 TempInfor tempInfor = new TempInfor();9 tempInfor.userId = Guid.NewGuid().ToString("N"); 10 //下面進行入隊操作 11 QueueUtils.instanse.Enqueue(tempInfor); 12 13 }); 14 } 15 } 16 //3.2 模擬另外一個線程隊列中讀取數(shù)據(jù)請求標記,進行相應的業(yè)務處理(該線程一直運行,不停止) 17 Task.Run(() => 18 { 19 while (true) 20 { 21 if (QueueUtils.instanse.getCount() > 0) 22 { 23 //下面進行出隊操作 24 TempInfor tempInfor2 = (TempInfor)QueueUtils.instanse.Dequeue(); 25 26 //拿到請求標記,進行相應的業(yè)務處理 27 Console.WriteLine("id={0}的業(yè)務執(zhí)行成功", tempInfor2.userId); 28 } 29 } 30 }); 31 //3.3 模擬過了一段時間(6s后),又有新的請求寫入 32 Thread.Sleep(6000); 33 Console.WriteLine("6s的時間已經(jīng)過去了"); 34 { 35 for (int j = 0; j < 100; j++) 36 { 37 Task.Run(() => 38 { 39 TempInfor tempInfor = new TempInfor(); 40 tempInfor.userId = Guid.NewGuid().ToString("N"); 41 //下面進行入隊操作 42 QueueUtils.instanse.Enqueue(tempInfor); 43 44 }); 45 } 46 } 47 }3.下面案例的測試結果:
一次輸出100條數(shù)據(jù),6s過后,再一次輸出100條數(shù)據(jù)。
4. 總結:
該方案是一種迂回的方式處理高并發(fā),在業(yè)內(nèi)這種思想也是非常常見,但該方案也有一個弊端,客戶端請求的實時性很難保證,或者即使要保證(比如引入實時通訊技術),
?也要付出不少代價.
?
解決方案四: 利用數(shù)據(jù)庫自有的鎖機制進行處理
? (在后面數(shù)據(jù)鎖機制章節(jié)進行介紹)
總結
以上是生活随笔為你收集整理的第二十节: 深入理解并发机制以及解决方案(锁机制、EF自有机制、队列模式等)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 银行将有大动作,不良贷款余额开始增加,新
- 下一篇: 光大银行信用卡现金分期怎么办理