同一个事务里面对同一条数据做2次修改_要我说,多线程事务它必须就是个伪命题!
這是why技術的第 74 篇原創文章
別問,問就是不行
分布式事務你應該是知道的。但是這個多線程事務......
沒事,我慢慢給你說。
如圖所示,有個小伙伴想要實現多線程事務。
這個需求其實我在不同的地方看到過很多次,所以我才說:這個問題又出現了。
那么有解決方案嗎?
在此之前,我的回答都是非常的肯定:毋庸置疑,肯定是沒有的。
為什么呢?
我們先從理論上去推理一下。
來,首先我問你,事務的特性是什么?
這個不難吧?八股文必背內容之一,ACID 必須張口就來:
- 原子性(Atomicity)
- 一致性(Consistency)
- 隔離性(Isolation)
- 持久性(Durability)
那么問題又來了,你覺得如果有多線程事務,那么我們破壞了哪個特性?
多線程事務你也別想的多深奧,你就想,兩個不同的用戶各自發起了一個下單請求,這個請求對應的后臺實現邏輯中是有事務存在的。
這不就是多線程事務嗎?
這種場景下你沒有想過怎么分別去控制兩個用戶的事務操作吧?
因為這兩個操作之間就是完全隔離的,各自拿著各自的鏈接玩兒。
所以多個事務之間的最基本的原則是什么?
隔離性。兩個事務操作之間不應該相互干擾。
而多線程事務想要實現的是 A 線程異常了。A,B 線程的事務一起回滾。
事務的特性里面就卡的死死的。所以,多線程事務從理論上就是行不通的。
通過理論指導實踐,那么多線程事務的代碼也就是寫不出來的。
前面說到隔離性。那么請問,Spring 的源碼里面,對于事務的隔離性是如何保證的呢?
答案就是 ThreadLocal。
在事務開啟的時候,把當前的鏈接保存在了 ThreadLocal 里面,從而保證了多線程之間的隔離性:
可以看到,這個 resource 對象是一個 ThreadLocal 對象。
在下面這個方法中進行了賦值操作:
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
其中的 bindResource 方法中,就是把當前鏈接綁定到當前線程中,其中的 resource 就是我們剛剛說的 ThreadLocal:
就是每個線程里面都各自玩自己的,我們不可能打破 ThreadLocal 的使用規則,讓各個線程共享同一個 ThreadLocal 吧?
鐵子,你要是這樣去做的話,那豈不是走遠了?
所以,無論從理論上,還是代碼實現上,我都認為這個需求是不能實現的。
至少我之前是這樣想的。
但是事情,稍稍的發生了一點點的變化。
說個場景,常規實現
任何脫離場景討論技術實現的行為都是耍流氓。
所以,我們先看一下場景是什么。
假設我們有一個大數據系統,每天指定時間,我們就需要從大數據系統中拉取 50w 條數據,對數據進行一個清洗操作,然后把數據保存到我們業務系統的數據庫中。
對于業務系統而言,這 50w 條數據,必須全部落庫,差一條都不行。要么就是一條都不插入。
在這個過程中,不會去調用其他的外部接口,也不會有其他的流程去操作這個表的數據。
既然說到一條不差了,那么對于大家直觀而言,想到的肯定是兩個解決方案:
對于這種需求,開啟事務,然后在 for 循環中一條條的插入可以說是非常 low 的解決方案了。
效率非常的低下,給大家演示一下。
比如,我們有一個 Student 表,表結構非常簡單,如下:
CREATE TABLE `student` (`id` bigint(63) NOT NULL AUTO_INCREMENT,`name` varchar(32) DEFAULT NULL,`home` varchar(64) DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;在我們的項目中,我們通過 for 循環插入數據,同時該方法上有 @Transactional 注解:
num 參數是我們通過前端請求傳遞過來的數據,代表要插入 num 條數據:
這種情況下,我們可以通過下面的鏈接,模擬插入指定數量的數據:
http://127.0.0.1:8081/insertOneByOne?num=xxx
我嘗試了把 num 設置為 50w,讓它慢慢的跑著,但是我還是太年輕了,等了非常長的時間都沒有等到結果。
于是我把 num 改為了 5000,運行結果如下:
insertOneByOne執行耗時:133449ms,num=5000
一條條的插入 5000 條數據,耗時 133.5 s 的樣子。
按照這個速度,插入 50w 條數據得 13350s,大概也是這么多小時:
這誰頂得住啊。
所以,這方案擁有巨大的優化空間。
比如我們優化為這樣批量插入:
其對應的 sql 語句是這樣的:
insert into table ([列名],[列名]) VALUES ([列值],[列值]), ([列值],[列值]);
我們還是通過前端接口調用:
當我們的 num 設置為 5000 的時候,我頁面刷新了 10 次,你看耗時基本上在 200ms 毫秒以內:
從 133.5s 到 200ms,朋友們,這是什么東西?
這是質的飛躍啊。性能提升了近 667 倍的樣子。
為什么批量插入能有這么大的飛躍呢?
你想啊,之前 for 循環插入,雖然 SpringBoot 2.0 默認使用了 HikariPool,連接池里面默認給你搞 10 個連接。
但是你只需要一個連接,開啟一次事務。這個不耗時。
耗時的地方是你 5000 次 IO 呀。
所以,耗時長是必然的。
而批量插入只是一條 sql 語句,所以只需要一個連接,還不需要開啟事務。
為啥不用開啟事務?
你一條 sql 開啟事務有錘子用啊?
那么,如果我們一口氣插入 50w 條數據,會是怎么樣的呢?
來,搞一波,試一下:
http://127.0.0.1:8081/insertBatch?num=500000
可以看到拋出了一個異常。而且錯誤信息非常的清晰:
Packet for query is too large (42777840 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.; nested exception is com.mysql.jdbc.PacketTooBigException: Packet for query is too large (42777840 > 1048576).You can change this value on the server by setting the max_allowed_packet' variable.說你這個包太大了??梢酝ㄟ^設置 max_allowed_packet 來改變包大小。
我們可以通過下面的語句查詢當前的配置大小:
select @@max_allowed_packet;
可以看到是 1048576,即 1024*1024,1M 大小。
而我們需要傳輸的包大小是 42777840 字節,大概是 41M 的樣子。
所以我們需要修改配置大小。
這個地方也給大家提了個醒:如果你的 sql 語句非常大,里面有大字段,記得調整一下 mysql 的這個參數。
可以通過修改配置文件或者直接執行 sql 語句的方式進行修改。
我這里就使用 sql 語句修改為 64M:
set global max_allowed_packet = 1024*1024*64;
然后再次執行,可以看到插入成功了:
50w 的數據,74s 的樣子。
數據要么全部提交,要么一條也沒有,需求也實現了。
時間上呢,是有點長,但是好像也想不到什么好的提升方案。
那么我們怎么還能再縮短點時間呢?
騷想法出現了
我能想到的,只能是祭出多線程了。
50w 數據。我們開五個線程,一個線程處理 10w 數據,沒有異常就保存入庫,出現問題就回滾。
這個需求很好實現。分分鐘就能寫出來。
但是再加上一個需求:這 5 個線程的數據,如果有一個線程出現問題了,需要全部回滾。
順著思路慢慢擼,我們發現這個時候就是所謂的多線程事務了。
我之前說完全不可能實現是因為提到事務我就想到了 @Transactional 注解去實現了。
我們只需要正確使用它,然后關系業務邏輯即可,不需要也根本插手不了事務的開啟和提交或者回滾。
這種代碼的寫法我們叫做聲明式事務。
和聲明式事務對應的就是編程式事務了。
通過編程式事務,我們就能完全掌控事務的開啟和提交或者回滾操作。
能想到編程式事務,這事基本上就成了一半了。
你想,首先我們有一個全局變量為 Boolean 類型,默認為可以提交。
在子線程里面,我們可以先通過編程式事務開啟事務,然后插入 10w 條數據后,但是不提交。同時告訴主線程,我這邊準備好了,進入等待。
如果子線程里面出現了異常,那么我就告訴主線程,我這邊出問題了,然后自己進行回滾。
最后主線程收集到了 5 個子線程的狀態。
如果有一個線程出現了問題,那么設置全局變量為不可提交。
然后喚醒所有等待的子線程,進行回滾。
根據上面的流程,寫出模擬代碼就是這樣的,大家可以直接復制出來運行:
public class MainTest {//是否可以提交public static volatile boolean IS_OK = true;public static void main(String[] args) {//子線程等待主線程通知CountDownLatch mainMonitor = new CountDownLatch(1);int threadCount = 5;CountDownLatch childMonitor = new CountDownLatch(threadCount);//子線程運行結果List<Boolean> childResponse = new ArrayList<Boolean>();ExecutorService executor = Executors.newCachedThreadPool();for (int i = 0; i < threadCount; i++) {int finalI = i;executor.execute(() -> {try {System.out.println(Thread.currentThread().getName() + ":開始執行"); // if (finalI == 4) { // throw new Exception("出現異常"); // }TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));childResponse.add(Boolean.TRUE);childMonitor.countDown();System.out.println(Thread.currentThread().getName() + ":準備就緒,等待其他線程結果,判斷是否事務提交");mainMonitor.await();if (IS_OK) {System.out.println(Thread.currentThread().getName() + ":事務提交");} else {System.out.println(Thread.currentThread().getName() + ":事務回滾");}} catch (Exception e) {childResponse.add(Boolean.FALSE);childMonitor.countDown();System.out.println(Thread.currentThread().getName() + ":出現異常,開始事務回滾");}});}//主線程等待所有子線程執行responsetry {childMonitor.await();for (Boolean resp : childResponse) {if (!resp) {//如果有一個子線程執行失敗了,則改變mainResult,讓所有子線程回滾System.out.println(Thread.currentThread().getName()+":有線程執行失敗,標志位設置為false");IS_OK = false;break;}}//主線程獲取結果成功,讓子線程開始根據主線程的結果執行(提交或回滾)mainMonitor.countDown();//為了讓主線程阻塞,讓子線程執行。Thread.currentThread().join();} catch (Exception e) {e.printStackTrace();}} }在所有子線程都正常的情況下,輸出結果是這樣的:
從結果看,是符合我們的預期的。
假設有子線程出現了異常,那么運行結果是這樣的:
一個線程出現異常,全部線程都進行回滾,這樣看來也是符合預期的。
如果你根據前面的需求寫出了這樣的代碼,那么恭喜你,一不留神實現了一個類似于兩階段提交(2PC)的一致性協議。
我前面說的能想到編程式事務,這事基本上就成了一半了。
而另外一半,就是兩階段提交(2PC)。
依瓢畫葫蘆
有了前面的瓢,你照著畫個葫蘆不是很簡單的事情嗎?
就不大段上代碼了,示例代碼可以點擊這里獲取到,所以我這里截個圖吧:
上面的代碼應該是非常好理解的,開啟五個線程,每個線程插入 10w 條數據。
這個不用說,用腳趾頭想也能知道,肯定是比一次性批量插入 50w 條數據快的。
至于快多少,不廢話了,直接看執行效果吧。
由于我們的 controller 是這樣的:
所以調用鏈接:
http://127.0.0.1:8081/batchHandle
輸出結果如下:
還記得我們批量插入的耗時嗎?
73791ms。
從 73791ms 到 15719ms。快了 58s 的樣子。
已經非常不錯了。
那么如果是某個線程拋出了異常呢?比如這樣:
我們看看日志輸出:
通過日志分析,看起來也是符合要求的。
而從讀者反饋的實際測試效果來看,也是非常顯著的:
真的符合要求嗎?
符合要求,只是看起來而已。
經驗老道的讀者朋友們肯定早就看到問題所在了。已經把手舉得高高的:老師,這題我知道。
我之前說了,這個實現方式實際上就是編程式事務配合二階段提交(2PC)使用。
破綻就出在 2PC 上。
就像我和讀者討論這樣的:
不能再往后扯了,再往后就是 3PC,TCC,Seata 這一套分布式事務的東西了。
這套東西寫下來,就得上萬字了。所以我從海神那邊轉了一篇文章,放在第二條推送里面了。如果大家有興趣的可以去看一下。干貨滿滿。
其實當我們把一個個子線程理解為微服務中的一個個子系統的時候,這就是一個分布式事務的場景了。
而我們拿出來的解決方案,并不是一個完美的解決方案。
雖然,從某種角度上,我們繞開了事務的隔離性,但是有一定概率出現數據一致性問題,雖然概率比較小。
所以我稱之為這種方案叫做:基于運氣編程,用運氣換時間。
注意事項
關于上面的代碼,其實還有幾個需要注意的地方。
給大家提個醒。
第一個:啟用多少線程進行分配數據插入,這個參數是可以進行調整的。
比如我修改為 10 個線程,每個線程插入 5w 條數據。那么執行時間又快了 2s:
但是一定記得不是越大越好,同時記得調整數據庫連接池的最大連接數。不然白搭。
第二個:正是因為啟動多少線程是可以進行調整的,甚至是可以每次進行計算的。
那么必須要注意的一個問題是不能讓任何一個任務進入隊列里面。一旦進入隊列,程序立馬就涼。
你想,如果我們需要開啟 5 個子線程,但是核心線程數只有 4 個,有一個任務進入隊列了。
那么這 4 個核心線程會一直阻塞住,等待主線程喚醒。
而主線程這個時候在干什么?
在等 5 個線程的運行結果,但是它只能收集到 4 個結果。
所以它會一直等下去。
第三個:這里是多個線程開啟了事務在往表里插入數據,謹防數據庫死鎖。
第四個:注意程序里面的代碼,countDown 安裝標準寫法上是要放到 finally 代碼塊里面的,我這里為了截圖的美觀度,省去了這個步驟:
你如果真的要用,得注意一下。而且這個finally你得想清楚了寫,不是隨便寫的。
第五個:我這里只是提供一個思路,而且它也根本不是什么多線程事務。
也再次證明了,多線程事務就是一個偽命題。
所以我給出一個基于運氣的偽一致性的回答也不過分吧。
第六個:多線程事務換個角度想,可以理解為分布式事務。,可以借助這個案例去了解分布式事務。但是解決分布式事務的最好的方法就是:不要有分布式事務!
而解決分布式事務的絕大部分落地方案都是:最終一致性。
性價比高,大多數業務上也能接受。
第七個:這個解決方案你要拿到生產用的話,記得先和業務同事溝通好,能不能接受這種情況。速度和安全之間的兩難抉擇。
同時自己留好人工修數的接口:
最后說一句
才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,可以在留言區提出來,我對其加以修改。 感謝您的閱讀,我堅持原創,十分歡迎并感謝您的關注。
我是 why,一個被代碼耽誤的文學創作者,不是大佬,但是喜歡分享,是一個又暖又有料的四川好男人。
還有,歡迎關注我呀。
總結
以上是生活随笔為你收集整理的同一个事务里面对同一条数据做2次修改_要我说,多线程事务它必须就是个伪命题!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python能爬视频吗_Python爬取
- 下一篇: 币未来趋势分析_分析:中国便利店零售市场