并发编程-CAS
?
CAS (compareAndSwap),中文叫比較交換,一種無鎖原子算法。
過程是這樣:它包含 3 個參數 CAS(V,E,N),V表示要更新變量的值,E表示預期值,N表示新值。僅當 V值等于E值時,才會將V的值設為N,如果V值和E值不同,則說明已經有其他線程做兩個更新,則當前線程則什么都不做。最后,CAS 返回當前V的真實值。CAS 操作時抱著樂觀的態度進行的,它總是認為自己可以成功完成操作。
CAS是一條CPU的原子指令,其作用是讓CPU先進行比較兩個值是否相等,然后原子地更新某個位置的值,其實現方式是基于硬件平臺的匯編指令,在intel的CPU中,使用的是cmpxchg指令,就是說CAS是靠硬件實現的,從而在硬件層面提升效率。
當多個線程同時使用CAS 操作一個變量時,只有一個會勝出,并成功更新,其余均會失敗。失敗的線程不會掛起,僅是被告知失敗,并且允許再次嘗試,當然也允許實現的線程放棄操作。基于這樣的原理,CAS 操作即使沒有鎖,也可以發現其他線程對當前線程的干擾。
與鎖相比,使用CAS會使程序看起來更加復雜一些,但由于其非阻塞的,它對死鎖問題天生免疫,并且,線程間的相互影響也非常小。更為重要的是,使用無鎖的方式完全沒有鎖競爭帶來的系統開銷,也沒有線程間頻繁調度帶來的開銷,因此,他要比基于鎖的方式擁有更優越的性能。
簡單的說,CAS 需要你額外給出一個期望值,也就是你認為這個變量現在應該是什么樣子的。如果變量不是你想象的那樣,哪說明它已經被別人修改過了。你就需要重新讀取,再次嘗試修改就好了。
CAS底層原理
這樣歸功于硬件指令集的發展,實際上,我們可以使用同步將這兩個操作變成原子的,但是這么做就沒有意義了。所以我們只能靠硬件來完成,硬件保證一個從語義上看起來需要多次操作的行為只通過一條處理器指令就能完成。這類指令常用的有:?
1. 測試并設置(Tetst-and-Set)?
2. 獲取并增加(Fetch-and-Increment)?
3. 交換(Swap)?
4. 比較并交換(Compare-and-Swap)?
5. 加載鏈接/條件存儲(Load-Linked/Store-Conditional)
?
CPU 實現原子指令有2種方式:?
1. 通過總線鎖定來保證原子性。?
總線鎖定其實就是處理器使用了總線鎖,所謂總線鎖就是使用處理器提供的一個 LOCK# 信號,當一個處理器在總線上輸出此信號時,其他處理器的請求將被阻塞住,那么該處理器可以獨占共享內存。但是該方法成本太大。因此有了下面的方式。
2、通過緩存鎖定來保證原子性。?
所謂 緩存鎖定 是指內存區域如果被緩存在處理器的緩存行中,并且在Lock 操作期間被鎖定,那么當他執行鎖操作寫回到內存時,處理器不在總線上聲言 LOCK# 信號,而是修改內部的內存地址,并允許他的緩存一致性機制來保證操作的原子性,因為緩存一致性機制會阻止同時修改兩個以上處理器緩存的內存區域數據(這里和 volatile 的可見性原理相同),當其他處理器回寫已被鎖定的緩存行的數據時,會使緩存行無效。
注意:有兩種情況下處理器不會使用緩存鎖定。?
1. 當操作的數據不能被緩存在處理器內部,或操作的數據跨多個緩存行時,則處理器會調用總線鎖定。?
2. 有些處理器不支持緩存鎖定,對于 Intel 486 和 Pentium 處理器,就是鎖定的內存區域在處理器的緩存行也會調用總線鎖定。
?
CAS舉例
JUC下的atomic類都是通過CAS來實現的,下面就以AtomicInteger為例來闡述CAS的實現。如下:
??
import java.util.concurrent.atomic.AtomicInteger;public class CAS1 {private static volatile int m = 0;private static AtomicInteger atomicI = new AtomicInteger(0);private static final int ThreadsNum = 200000;public static void increase1() {++m; //不具備原子性}public static void increase2() {atomicI.incrementAndGet(); //具備原子性}public static void main(String[] args) throws InterruptedException {Thread[] t = new Thread[ThreadsNum];for(int tf = 0; tf < ThreadsNum; ++tf) {t[tf] = new Thread(() -> {increase1();});t[tf].start();t[tf].join();}System.out.println(m);Thread[] var4 = new Thread[ThreadsNum];for(int i = 0; i < ThreadsNum; ++i) {var4[i] = new Thread(() -> {increase2();});var4[i].start();var4[i].join();}System.out.println("atomic:" + atomicI.get());}}?
Unsafe是CAS的核心類,Java無法直接訪問底層操作系統,而是通過本地(native)方法來訪問。不過盡管如此,JVM還是開了一個后門:Unsafe,它提供了硬件級別的原子操作。
?
內部調用unsafe的getAndAddInt方法,在getAndAddInt方法中主要是看compareAndSwapInt方法:
?
CAS可以保證一次的讀-改-寫操作是原子操作,在單處理器上該操作容易實現,但是在多處理器上實現就有點兒復雜了。
緩存加鎖:其實針對于上面那種情況我們只需要保證在同一時刻對某個內存地址的操作是原子性的即可。緩存加鎖就是緩存在內存區域的數據如果在加鎖期間,當它執行鎖操作寫回內存時,處理器不在輸出LOCK#信號,而是修改內部的內存地址,利用緩存一致性協議來保證原子性。緩存一致性機制可以保證同一個內存區域的數據僅能被一個處理器修改,也就是說當CPU1修改緩存行中的i時使用緩存鎖定,那么CPU2就不能同時緩存了i的緩存行
CAS缺點
CAS雖然高效地解決了原子操作,但是還是存在一些缺陷的,主要表現在三個方法:循環時間太長、只能保證一個共享變量原子操作、ABA問題。
?
循環時間太長
如果CAS一直不成功呢?這種情況絕對有可能發生,如果自旋CAS長時間地不成功,則會給CPU帶來非常大的開銷。在JUC中有些地方就限制了CAS自旋的次數,例如BlockingQueue的SynchronousQueue。
?
只能保證一個共享變量原子操作
看了CAS的實現就知道這只能針對一個共享變量,如果是多個共享變量就只能使用鎖了,當然如果你有辦法把多個變量整成一個變量,利用CAS也不錯。例如讀寫鎖中state的高地位
ABA問題
CAS需要檢查操作值有沒有發生改變,如果沒有發生改變則更新。但是存在這樣一種情況:如果一個值原來是A,變成了B,然后又變成了A,那么在CAS檢查的時候會發現沒有改變,但是實質上它已經發生了改變,這就是所謂的ABA問題。對于ABA問題其解決方案是加上版本號,即在每個變量都加上一個版本號,每次改變時加1,即A —> B —> A,變成1A —> 2B —> 3A。
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicStampedReference;public class CAS2 {private static volatile int m = 0;private static AtomicInteger atomicI = new AtomicInteger(100);//解決ABA問題private static AtomicStampedReference asr = new AtomicStampedReference(Integer.valueOf(100), 1);public static void main(String[] args) throws InterruptedException { // new Thread(() -> { // System.out.println(atomicI.compareAndSet(100, 110)); // }).start(); // new Thread(() -> { // try { // TimeUnit.SECONDS.sleep(2L); // } catch (InterruptedException var1) { // var1.printStackTrace(); // } // // System.out.println(atomicI.compareAndSet(110, 100)); // }).start(); // new Thread(() -> { // try { // TimeUnit.SECONDS.sleep(3L); // } catch (InterruptedException var1) { // var1.printStackTrace(); // } // // System.out.println(atomicI.compareAndSet(100, 120)); // }).start(); // System.out.println("==========================================");Thread tf1 = new Thread(() -> {try {TimeUnit.SECONDS.sleep(2L);} catch (InterruptedException var1) {var1.printStackTrace();}System.out.println(asr.compareAndSet(Integer.valueOf(100), Integer.valueOf(110), asr.getStamp(), asr.getStamp() + 1));System.out.println(asr.compareAndSet(Integer.valueOf(110), Integer.valueOf(100), asr.getStamp(), asr.getStamp() + 1));});Thread tf2 = new Thread(() -> {int stamp = asr.getStamp();try {TimeUnit.SECONDS.sleep(4L);} catch (InterruptedException var2) {var2.printStackTrace();}System.out.println(asr.compareAndSet(Integer.valueOf(100), Integer.valueOf(120), stamp, stamp + 1));});tf1.start();tf2.start();}}
?
轉載于:https://www.cnblogs.com/yintingting/p/11427621.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
- 上一篇: 并发编程-volatile和synchr
- 下一篇: 并发编程-AQS