无锁-1
我發(fā)現(xiàn)JDK當(dāng)中的一些工具類(lèi),大量的都是用了無(wú)鎖的一些工具,其實(shí)我們已經(jīng)有提到他的基本概念,首先無(wú)鎖呢,他就是無(wú)障礙的運(yùn)行,它是指所有的線(xiàn)程,都能夠同時(shí)進(jìn)入臨界區(qū),但是無(wú)鎖在無(wú)障礙的基礎(chǔ)上呢,他增加了一條,我每一次競(jìng)爭(zhēng)的時(shí)候呢,都能夠決定出一個(gè)優(yōu)勝者,并不是大家都失敗,因此相對(duì)于無(wú)障礙來(lái)講呢,可能是一個(gè)更加切實(shí)可行的方案,但是事情也不是絕對(duì)這樣子的,因?yàn)楦鶕?jù)有些理論來(lái)說(shuō)呢,在實(shí)踐中,其實(shí)無(wú)障礙和無(wú)等待是比較接近的,只是從理論上來(lái)講,無(wú)障礙看起來(lái)呢,似乎線(xiàn)程有點(diǎn)出不去,但是實(shí)際上呢,運(yùn)氣不會(huì)這么差,那么無(wú)鎖這個(gè)實(shí)現(xiàn)原理呢,是使用了CAS的指令,那么CAS是一個(gè)什么樣的思路呢,是說(shuō)我現(xiàn)在要對(duì)這個(gè)數(shù)據(jù)進(jìn)行一個(gè)賦值,這個(gè)數(shù)據(jù)本來(lái)是在臨界區(qū)當(dāng)中,一個(gè)被保護(hù)的一個(gè)數(shù)據(jù),那我要對(duì)她賦值,如果多個(gè)線(xiàn)程同時(shí)進(jìn)來(lái),我應(yīng)該只有一個(gè)線(xiàn)程能夠成功的,那么我怎么判斷的,這個(gè)線(xiàn)程可以成功呢,他這里就要求說(shuō),線(xiàn)程在對(duì)數(shù)據(jù)進(jìn)行操作的時(shí)候呢,你需要給出一個(gè)值,如果期望值和數(shù)據(jù)實(shí)際的值,是相符的,那么你就可以把這個(gè)數(shù)據(jù)設(shè)置下去,否則你就失敗,因?yàn)槿绻f(shuō)你這個(gè)期望的數(shù)值,和他實(shí)際的數(shù)值不相符,那么我們就可能覺(jué)得呢,你這個(gè)數(shù)據(jù),在你操作的限期當(dāng)中呢,已經(jīng)被其他線(xiàn)程給修改過(guò)了,因?yàn)檫@個(gè)數(shù)據(jù)已經(jīng)被其他數(shù)據(jù)修改過(guò)了,所以你這一次就不能修改了,你要修改這個(gè)數(shù)據(jù)呢,要等到下一次去修改,CAS就是Compare And Swap,比較和交換,可以說(shuō)每一次操作的時(shí)候呢,它會(huì)先去讀一下,讀一下當(dāng)前值是多少,把這個(gè)期望值,讀出來(lái)的數(shù)據(jù),跟你現(xiàn)在的數(shù)據(jù)做一個(gè)比較,CAS總是抱著一個(gè)樂(lè)觀的操作,完不成就重試,他認(rèn)為這種概率是很小的,JAVA虛擬機(jī)的時(shí)候也介紹過(guò)無(wú)鎖的算法,CAS基本的思想,他是不是有一點(diǎn)小的bug在里面,因?yàn)槟惆岩粋€(gè)值先去讀,比了之后你再設(shè),如果說(shuō)你這個(gè)數(shù)據(jù)讀出來(lái)的時(shí)候,比較完了之后,還沒(méi)有設(shè)進(jìn)去之前,有一個(gè)新的線(xiàn)程進(jìn)來(lái)了,這個(gè)數(shù)據(jù)不就有一些沖突,他就認(rèn)為這個(gè)CAS步驟太多,那么在步驟之間呢,有可能會(huì)被其他線(xiàn)程干擾,那么這是不是一個(gè)合適的方案呢,上面的這個(gè)擔(dān)心是多余的,為什么呢,因?yàn)镃AS整的一個(gè)操作過(guò)程,它是一個(gè)原子操作,是由一條CPU執(zhí)行完成的,不是由好多CPU去完成的,他只是由一條CPU去完成的,比較交換,他的邏輯是這樣子的
我目標(biāo)值是不是和我的值是不是相等,相等我就設(shè)一個(gè)標(biāo)志,并且把原始值設(shè)置到目標(biāo)里面去,否則我就不設(shè)了,CAS它是從指令層面,來(lái)保證他這個(gè)操作是可靠的,是有效的,那么JAVA當(dāng)中呢,他提供了一些有關(guān)無(wú)鎖類(lèi)的使用,提供比較交換指令,來(lái)實(shí)現(xiàn)的,與阻塞的相比呢,有鎖的方式呢,因?yàn)槟阌辛随i之后呢,你會(huì)導(dǎo)致這個(gè)鎖被阻塞,會(huì)被掛起,甚至他進(jìn)入臨界區(qū)之前,系統(tǒng)對(duì)他進(jìn)行了掛起,無(wú)鎖的方式性能會(huì)更好一些,因?yàn)橐话銇?lái)說(shuō),除非你認(rèn)為的掛起這個(gè)線(xiàn)程,否則在你通過(guò)無(wú)鎖的方式呢,這個(gè)線(xiàn)程是不會(huì)被掛起的,會(huì)不斷地做重試,根據(jù)我們之前有說(shuō)過(guò),一個(gè)線(xiàn)程如果被掛起,如果我只是做一次重試操作,很有可能只是一個(gè)循環(huán)體,如果我這個(gè)循環(huán)體不是太復(fù)雜的話(huà)呢,可能也就是2到3條指令,10條以?xún)?nèi)的指令,因此在這種情況之下,你相當(dāng)于在拿一個(gè)很少的成本,讀鎖的操作要比阻塞的效果要好很多,在無(wú)鎖類(lèi)當(dāng)中最有名的一個(gè)呢,應(yīng)該就是AtomicInteger,這個(gè)無(wú)鎖的整數(shù),這個(gè)無(wú)鎖的整數(shù)呢,他其實(shí)就是一個(gè)Integer,它是在concurrent的atomic的包當(dāng)中,他繼承與這個(gè)Number,其實(shí)Integer也是繼承與Number這個(gè)類(lèi)的public class AtomicInteger extends Number implements java.io.Serializable {他的主要接口有這么多
第一個(gè)是get,取得當(dāng)前這個(gè)數(shù)字,set就是把這個(gè)值設(shè)下去,getAndSet,設(shè)置一個(gè)新的值,返回一個(gè)舊的值,CompareAndSet,舊的期望值,我要設(shè)置的這個(gè)值,如果期望值等于當(dāng)前值,那我就設(shè)置成功,失敗就返回false,getAndIncrement,我取得舊的值,并且把新的值加1,這是一個(gè)線(xiàn)程安全的操作,這是減1,這也是一個(gè)線(xiàn)程安全的操作,這里是加上一個(gè)delta,這兩個(gè)函數(shù)都可以那他來(lái)實(shí)現(xiàn),一個(gè)傳1,一個(gè)傳-1,那這里是添加和獲取,他返回的是新值,這個(gè)和上面也是類(lèi)似的,先增加再去get,這就是主要的一些接口,那我們先來(lái)看一下,接口做了哪些具體的實(shí)現(xiàn),他內(nèi)部有一個(gè)非常重要的字段,一個(gè)value,他這個(gè)Integer封裝的一個(gè)類(lèi)型,所有的操作都是對(duì)value去做的,value才是他內(nèi)部真正的值,Atomic只是對(duì)他的一個(gè)包裝而已private volatile int value;看一個(gè)比較具有代表性的,/*** Atomically sets the value to the given updated value* if the current value {@code ==} the expected value.** @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that* the actual value was not equal to the expected value.*/
public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}compareAndSet,他做了一個(gè)什么事情呢,所期望值是這么多,我要更新的新值是這么多,如果成功了我就返回true,如果失敗就返回false,如果返回false呢,就表示實(shí)際的值和期望的值是不相等的,是這么個(gè)意思,這里還用到了unsafe,從這個(gè)名詞可以看到,它是一個(gè)不安全的,為什么JAVA相對(duì)于C++和C要比較安全呢,因?yàn)樗延嘘P(guān)指針,一些內(nèi)容呢給屏蔽掉了,那么unsafe他恰恰相反,相較于JAVA比較底層呢,它會(huì)去提供一些,類(lèi)似于指針的一些操作,看這個(gè)unsafe的compareAndSet,他就是說(shuō),我要對(duì)于這個(gè)類(lèi),這個(gè)偏移量上的,這個(gè)數(shù)據(jù),看他的期望值是多少,非常有C感覺(jué)的一個(gè)操作,那么我們看看valueOffset是什么東西呢private static final long valueOffset;他是一個(gè)偏移量,那這個(gè)偏移量是哪里來(lái)的呢,它是從objectFieldOffset里面拿到的static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }
}在對(duì)象當(dāng)中的一個(gè)偏移量,這里內(nèi)部肯定是一些C的實(shí)現(xiàn),我們來(lái)看一下getAndIncrement/*** Atomically increments by one the current value.** @return the previous value*/
public final int getAndIncrement() {return unsafe.getAndAddInt(this, valueOffset, 1);
}這個(gè)是指說(shuō),我返回一個(gè)當(dāng)前值,并且把這個(gè)值加1,我先把這個(gè)值get出來(lái),就把這個(gè)值加1,因此我加1加出的數(shù)據(jù),一定是我當(dāng)前這個(gè)值的加1,不可能是別人改過(guò)的數(shù)據(jù),然后我會(huì)做比較,我比較的期望值是多少呢,是當(dāng)前值,然后我的目標(biāo)呢是next,如果有其他線(xiàn)程先于我修改這個(gè)數(shù)據(jù),那就會(huì)導(dǎo)致我這個(gè)current,跟實(shí)際的期望值不相等,所以我這個(gè)設(shè)置必然失敗,如果我這個(gè)設(shè)置失敗,這個(gè)函數(shù)我返回false,我就走不到return,走不到return怎么辦呢,我就走到for循環(huán)重置一次,再把它拿出來(lái),再去嘗試做一個(gè)設(shè)置,直到我設(shè)置成功為止,就在我加和進(jìn)一步設(shè)置之間,沒(méi)有人來(lái)妨礙我,所以我設(shè)置成功,一旦我設(shè)置成功呢,我就把舊的那個(gè)值給返回,從這個(gè)代碼當(dāng)中呢,所謂的無(wú)鎖算法,基本上它會(huì)套在一個(gè)死循環(huán)當(dāng)中,死循環(huán)里面一直做重試,知道成功為止,這個(gè)也是一個(gè)非常通用的一個(gè)思路,其他一些應(yīng)該也是差不多的,/*** Atomically adds the given value to the current value.** @param delta the value to add* @return the previous value*/
public final int getAndAdd(int delta) {return unsafe.getAndAddInt(this, valueOffset, delta);
}基本上都是一個(gè)死循環(huán),下面我們來(lái)看一下unsafe
Unsafe提供了一些不太安全的一些操作,在JDK內(nèi)部,并不是對(duì)外提供的,如果你要是想去拿unsafe實(shí)例呢,實(shí)際上是需要?jiǎng)右恍┦帜_的,他提供的一些操作呢,根據(jù)偏移量去設(shè)置一個(gè)數(shù)據(jù),什么叫根據(jù)偏移量去設(shè)置一個(gè)數(shù)據(jù)呢,就是我們剛才在這里有看過(guò),這也是unsafe里面的一個(gè)函數(shù)static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }
}我拿到AtomicInteger實(shí)例,這個(gè)實(shí)例他的字段,他在這個(gè)類(lèi)上的偏移量是多少,這個(gè)怎么理解呢,如果說(shuō)我們有寫(xiě)過(guò)C的代碼呢,就會(huì)比較清楚,我們C里面有一個(gè)結(jié)構(gòu)體,結(jié)構(gòu)體如果你定義兩個(gè),一個(gè)int a,第二個(gè)是int b,這個(gè)時(shí)候如果說(shuō),a的偏移量是0,int是占4個(gè)字節(jié)的,b的偏移量就是4,你只要給我結(jié)構(gòu)的基地址,加上偏移量呢,offset就是4,我加上偏移量4之后呢,拿到的就是b所在的地址,我拿到b這個(gè)地址之后呢,我就可以對(duì)b進(jìn)行操作,到JAVA當(dāng)中也是一樣,只不過(guò)我是一個(gè)class,因?yàn)槲疫€有對(duì)象頭部的一些信息,然后才可能是我的字段,比如我有個(gè)int value,或者我還有其他的亂七八糟的東西,那么object的offset方法呢,要拿的就是類(lèi)class的一個(gè)偏移量,所以我們后面就可以根據(jù)這個(gè)偏移量呢,去做一些設(shè)置,/*** Atomically sets the value to the given updated value* if the current value {@code ==} the expected value.** @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that* the actual value was not equal to the expected value.*/
public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}對(duì)于這個(gè)實(shí)例,在C里面有一個(gè)首地址,在這個(gè)偏移量的位置上,我去做一個(gè)int的一個(gè)設(shè)置,我希望它是expect的,然后他還提供一個(gè)park,就是把這個(gè)線(xiàn)程停下來(lái),我們?cè)诤竺娴恼n程當(dāng)中會(huì)講到,然后底層的CAS操作,是在unsafe里面實(shí)現(xiàn)的,因?yàn)閁nsafe這個(gè)類(lèi)呢,是非公開(kāi)的API,可能會(huì)出現(xiàn)比較大的差異,因?yàn)樗槐WC是向前或向后兼容的,因?yàn)樗⒉幌M阌眠@個(gè)東西,這里就是unsafe的一些函數(shù),getint什么意思呢,我把這個(gè)對(duì)象的offset,把它看成一個(gè)整數(shù),Unsafe在JDK內(nèi)部是被大量使用的,而且包括其他第三方的一些框架,高性能的一些框架,內(nèi)部也會(huì)使用Unsafe這個(gè)類(lèi),我們?cè)賮?lái)看一下AtomicReference
和AtomicInteger相比,AtomicIngeger他封裝的是一個(gè)整數(shù),而Reference他是引用,他是一個(gè)對(duì)象的引用,你只要對(duì)這個(gè)對(duì)象的引用進(jìn)行修改,那你就可以用AtomicReference進(jìn)行修改,我們可以看一下,這個(gè)AtomicReferecne他是一個(gè)模板/*** An object reference that may be updated atomically. See the {@link* java.util.concurrent.atomic} package specification for description* of the properties of atomic variables.* @since 1.5* @author Doug Lea* @param <V> The type of object referred to by this reference*/
public class AtomicReference<V> implements java.io.Serializable它帶有一個(gè)目標(biāo)的變量V,他可以封裝任意類(lèi)型的數(shù)據(jù),他和Integer非常類(lèi)似private volatile V value;他也有一個(gè)valueOffsetprivate static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicReference.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }
}他也有g(shù)e方法/*** Gets the current value.** @return the current value*/
public final V get() {return value;
}/*** Sets to the given value.** @param newValue the new value*/
public final void set(V newValue) {value = newValue;
}/*** Atomically sets the value to the given updated value* if the current value {@code ==} the expected value.* @param expect the expected value* @param update the new value* @return {@code true} if successful. False return indicates that* the actual value was not equal to the expected value.*/
public final boolean compareAndSet(V expect, V update) {return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}這些是比較常用的,/*** Atomically sets to the given value and returns the old value.** @param newValue the new value* @return the previous value*/
@SuppressWarnings("unchecked")
public final V getAndSet(V newValue) {return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}Integer我們這里使用10個(gè)線(xiàn)程,對(duì)Integer做add操作,如果說(shuō)它是線(xiàn)程安全的話(huà)呢,每個(gè)線(xiàn)程加1萬(wàn)次,那10個(gè)線(xiàn)程就是10萬(wàn)
package com.learn.thread;import java.util.concurrent.atomic.AtomicInteger;public class AtomicIntegerDemo {static AtomicInteger i = new AtomicInteger();public static class AddThread implements Runnable{@Overridepublic void run() {for(int k=0;k<1000;k++) {i.incrementAndGet();}}}public static void main(String[] args) throws InterruptedException {Thread[] ts = new Thread[10];for(int k=0;k<10;k++) {ts[k] = new Thread(new AddThread());}for(int k=0;k<10;k++){ts[k].start();}for(int k=0;k<10;k++){ts[k].join();}System.out.println(i);}}
證明他的線(xiàn)程安全性是沒(méi)有問(wèn)題的,下面我們來(lái)看一下Reference,這里是使用Reference來(lái)封裝一個(gè)Stringhttps://www.jianshu.com/p/9de799ef9742我把String指向abc這個(gè)字符串,然后嘗試把它修改成def,大家可以看到,這段代碼我開(kāi)啟的是10個(gè)線(xiàn)程,很顯然在這個(gè)修改過(guò)程當(dāng)中,只有一個(gè)線(xiàn)程是能夠成功的,這里讓每個(gè)線(xiàn)程休眠若干秒,都是有一個(gè)隨機(jī)的線(xiàn)程,而其他線(xiàn)程由于被改成def之后呢,就不可能執(zhí)行修改這個(gè)操作了,這里第10個(gè)線(xiàn)程修改這個(gè)操作
package com.learn.thread;import java.util.concurrent.atomic.AtomicReference;public class AtomicReferenceTest {public static final AtomicReference<String> atomicStr=new AtomicReference<String>("abc");public static void main(String[] args) {for(int i=0;i<10;i++) {new Thread(){public void run() {try {Thread.sleep((int)Math.abs(Math.random()*100));}catch (InterruptedException e) {e.printStackTrace();}if(atomicStr.compareAndSet("abc", "def")) {System.out.println("Thread:"+Thread.currentThread().getId()+"change value to def");}else {System.out.println("Thread:"+Thread.currentThread().getId()+"failed");}}}.start();}}
}
Thread:10change value to def
Thread:16failed
Thread:18failed
Thread:17failed
Thread:19failed
Thread:13failed
Thread:14failed
Thread:15failed
Thread:11failed
Thread:12failed
那么其他線(xiàn)程只能宣告失敗,因?yàn)閏ompare是不滿(mǎn)足要求的,從這里我們就能夠看到,如果你有一個(gè)對(duì)象的引用,你希望他可以在多個(gè)線(xiàn)程,修改這個(gè)引用的時(shí)候呢,希望特可以保持線(xiàn)程的安全,你就可以使用AtomicReference,下面再來(lái)看一下AtomicStampReference,他也是一個(gè)對(duì)象的引用,Stample我們一般認(rèn)為是一個(gè)郵戳,時(shí)間戳,也就是一個(gè)有唯一性標(biāo)識(shí)的一個(gè)字段,比如說(shuō)我們的時(shí)間戳,它具有一個(gè)唯一性標(biāo)識(shí),或者我們一個(gè)遞增的數(shù)據(jù),這樣遞增上去的,沒(méi)有重復(fù)的,這個(gè)東西我們就認(rèn)為他是一個(gè)stample,這個(gè)東西是干什么用的呢,他是為了解決這么一個(gè)問(wèn)題,如果我們現(xiàn)在有一個(gè)數(shù)值,Reference是A,然后我們把它該成了B,然后我們?cè)侔阉某闪薃,這種情況下,有一個(gè)線(xiàn)程,他首先拿到Referece是A,接著他通過(guò)compare操作呢,接著他拿到A之后呢,他可能會(huì)做一些自己額外的一些操作,比如他會(huì)做一些計(jì)算,然后他開(kāi)始準(zhǔn)備賦值了,這個(gè)時(shí)候有另外一個(gè)線(xiàn)程,把這個(gè)A呢改成了B,又有一個(gè)線(xiàn)程把這個(gè)B又改成了A,然后這個(gè)線(xiàn)程計(jì)算完畢之后呢,開(kāi)始做這個(gè)操作,他也做這個(gè)CS操作,他讀了一下發(fā)現(xiàn)還是A,因?yàn)樗J(rèn)為A這個(gè)數(shù)據(jù)沒(méi)有被人改過(guò),所以他就成功的把這個(gè)數(shù)據(jù)設(shè)置回去了,設(shè)置成C,那么這么一個(gè)過(guò)程,如果只是一個(gè)簡(jiǎn)單的加法,那么問(wèn)題不大,因?yàn)槭呛瓦^(guò)程狀態(tài)無(wú)關(guān)的,只和最終結(jié)果是相關(guān)的,答案也不會(huì)錯(cuò),但是有些情況之下呢,我們?cè)O(shè)置的這個(gè)數(shù)據(jù),可能是和他的過(guò)程相關(guān)的,打個(gè)比方比如說(shuō),我們要給哪個(gè)用戶(hù),充10塊錢(qián),那么每個(gè)人只能充值一次,不能因?yàn)槟慊?0塊錢(qián),我就再給你充10塊錢(qián),這個(gè)不行的,當(dāng)你對(duì)這個(gè)數(shù)據(jù)的變化過(guò)程,是敏感的時(shí)候,其實(shí)用普通的CS操作呢,是沒(méi)有辦法來(lái)區(qū)別這個(gè)A和這個(gè)A的,如果說(shuō)我們要區(qū)別這個(gè)A,那我們有一個(gè)很聰明的辦法呢,就是我們給每一個(gè)對(duì)象加上一個(gè)time,加上一個(gè)時(shí)間戳,假設(shè)我們給他加上一個(gè)s,改了B之后呢,比如叫s+1,當(dāng)再次改成A的時(shí)候呢,這個(gè)time就改成s+2,我不僅僅看A,我還要這個(gè)time是否是s,我最開(kāi)始把A拿出來(lái)的時(shí)候呢,我不僅僅把A拿出來(lái)了,我還要把這個(gè)S拿出來(lái)了,我既檢查A是否相等,并且還要檢查S是否相等,我發(fā)現(xiàn)s不等于s+2的,所以我這個(gè)設(shè)置就會(huì)失敗,StampleReference,他就是解決這個(gè)ABA的問(wèn)題,確保在狀態(tài)過(guò)程敏感的程序當(dāng)中呢,不會(huì)出現(xiàn)類(lèi)似的問(wèn)題
假設(shè)這里有一個(gè)程序,給用戶(hù)充錢(qián),這個(gè)money表示用戶(hù)的余額,有3個(gè)線(xiàn)程不斷地去更新,只要用戶(hù)的錢(qián)小于20呢,馬上給用戶(hù)加20塊錢(qián),這個(gè)線(xiàn)程模擬用戶(hù)的消費(fèi),只要用戶(hù)大于10塊錢(qián)呢,我就消費(fèi)10塊錢(qián),這里有三個(gè)線(xiàn)程,3個(gè)充值線(xiàn)程,和1個(gè)消費(fèi)線(xiàn)程,那么我們可以看到,因?yàn)樵瓉?lái)是19塊,你充了20塊之后呢,就變成了39,然后用戶(hù)消費(fèi)掉兩個(gè)10之后呢,也還會(huì)變成19塊,還是變成ABA的問(wèn)題,我們只希望用戶(hù)一次充值的機(jī)會(huì),不能給他多次充值,所以和過(guò)程狀態(tài)是有關(guān)系的,就不能滿(mǎn)足你充值的要求了,這里對(duì)后臺(tái)進(jìn)程來(lái)講,給用戶(hù)充值20塊錢(qián)在這里,然后他在充值的同時(shí),除了設(shè)置金額,還設(shè)置了timestamp,就是我們說(shuō)的Stamp,它會(huì)在這上面+1,每一次我修改我都會(huì)加1,使得這個(gè)stamp永遠(yuǎn)都不會(huì)重復(fù)的,用戶(hù)的修改也要對(duì)他加1,那么我們來(lái)運(yùn)行這個(gè)程序,來(lái)看一下結(jié)果,這個(gè)用戶(hù)在整個(gè)過(guò)程當(dāng)中只充值一次
package com.learn.thread;import java.util.concurrent.atomic.AtomicStampedReference;public class AtomicStampedReferenceDemo {static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19, 0) ;public static void main(String[] args) {//模擬多個(gè)線(xiàn)程同時(shí)更新后臺(tái)數(shù)據(jù)庫(kù),為用戶(hù)充值for(int i=0;i<3;i++) {final int timestamp=money.getStamp();new Thread() {public void run() {while(true) {while(true) {Integer m=money.getReference();if(m<20) {if(money.compareAndSet(m, m+20, timestamp, timestamp+1)) {System.out.println("余額小于20,充值成功,余額:"+money.getReference()+"元");break;}}else {break;}}}}}.start();}//用戶(hù)消費(fèi)線(xiàn)程,模擬消費(fèi)行為new Thread() {public void run() {for(int i=0;i<100;i++) {while(true) {int timeStamp=money.getStamp();Integer m=money.getReference();if(m>10) {System.out.println("大于10元");if(money.compareAndSet(m, m-10, timeStamp, timeStamp+1)) {System.out.println("成功消費(fèi)10元,余額:"+money.getReference());break;}}}}}}.start();}
}
余額小于20,充值成功,余額:39元
大于10元
成功消費(fèi)10元,余額:29
大于10元
成功消費(fèi)10元,余額:19
大于10元
成功消費(fèi)10元,余額:9
只充值了一次,但是到了19元沒(méi)有再次充值的發(fā)生,那我們可以想想,如果我們不給他加1,我下面也不給他加1,那這樣的情況呢,就相當(dāng)于把StampleReference,他不會(huì)去判斷stamp有沒(méi)有修改過(guò),我們會(huì)發(fā)現(xiàn)它會(huì)不停的運(yùn)行,只要用戶(hù)的錢(qián)少了,他就立即沖進(jìn)去,就是stamp導(dǎo)致的問(wèn)題,從內(nèi)部實(shí)現(xiàn)上來(lái)看呢,他的實(shí)現(xiàn)也沒(méi)有讓人太驚奇的地方,即使封裝也是在JAVA基礎(chǔ)上做的,/*** Atomically sets the value of both the reference and stamp* to the given update values if the* current reference is {@code ==} to the expected reference* and the current stamp is equal to the expected stamp.** @param expectedReference the expected value of the reference* @param newReference the new value for the reference* @param expectedStamp the expected value of the stamp* @param newStamp the new value for the stamp* @return {@code true} if successful*/
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp) {Pair<V> current = pair;returnexpectedReference == current.reference &&expectedStamp == current.stamp &&((newReference == current.reference &&newStamp == current.stamp) ||casPair(current, Pair.of(newReference, newStamp)));
}StampedReferece呢,他內(nèi)部封裝了一個(gè)Pairprivate static class Pair<T> {final T reference;final int stamp;private Pair(T reference, int stamp) {this.reference = reference;this.stamp = stamp;}static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);}
}這一對(duì)是什么呢,首先是我們要用的數(shù)據(jù)reference,對(duì)于以前的value來(lái)講,這個(gè)AtomicReference來(lái)講呢,線(xiàn)程把這個(gè)value進(jìn)行一個(gè)封裝,這個(gè)Value包裝在Pair里面,然后還給他一個(gè)stamp,這個(gè)stamp就是我們說(shuō)的,這個(gè)stamp就是決定你這個(gè)數(shù)據(jù)能不能設(shè)置成功,這個(gè)of方法是靜態(tài)的工廠(chǎng)方法,他生成一個(gè)Pair的實(shí)例,StampedReference里面是包裝了Pair實(shí)例,當(dāng)你去拿Reference的時(shí)候呢,當(dāng)然就返回pair的reference,/*** Returns the current value of the reference.** @return the current value of the reference*/
public V getReference() {return pair.reference;
}/*** Returns the current value of the stamp.** @return the current value of the stamp*/
public int getStamp() {return pair.stamp;
}當(dāng)你視圖去修改reference的時(shí)候呢,你要提供一個(gè)期望的引用數(shù)據(jù),你要設(shè)置你這個(gè)對(duì)象的引用,以及你的stamp值,你什么情況下能夠設(shè)置成功呢,當(dāng)你期望的這個(gè)值和你當(dāng)前的這個(gè)值相等,并且這個(gè)stamp值也是一樣的情況下呢,這兩個(gè)都相等的情況下,才有可能設(shè)置的成功,并且說(shuō)如果你期望的這個(gè)值,新的值和當(dāng)前的值,也可以認(rèn)為是成功的一個(gè)標(biāo)志,private boolean casPair(Pair<V> cmp, Pair<V> val) {return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}下面我們?cè)賮?lái)看一下數(shù)組,我們有了引用,有了普通的Integer,當(dāng)然還有一個(gè)Long,還有對(duì)數(shù)組的一個(gè)支持,那么數(shù)組的支持是什么樣的一個(gè)情況呢,數(shù)組相對(duì)來(lái)講,里面就是有若干個(gè)成員,整數(shù)的數(shù)組,這里必然需要一些API,來(lái)取得他下標(biāo)的一個(gè)值
相對(duì)于Integer相比呢,他要多傳入一個(gè)下標(biāo),其他都是非常非常類(lèi)似的,因?yàn)檫@個(gè)數(shù)組是AtomicIntegerArray,所以它里面的每一項(xiàng),每一個(gè)元素,他都是線(xiàn)程安全的,只要你調(diào)用這個(gè)方法去做,這里我們開(kāi)了10個(gè)線(xiàn)程,一運(yùn)行這個(gè)程序之后呢,所有的數(shù)據(jù)都是10萬(wàn)
package com.learn.thread;import java.util.concurrent.atomic.AtomicIntegerArray;public class AtomicIntegerArrayDemo {static AtomicIntegerArray arr=new AtomicIntegerArray(10);public static class AddThread implements Runnable{@Overridepublic void run() {for(int k=0;k<100000;k++) {arr.getAndIncrement(k%arr.length());}}}public static void main(String[] args) throws InterruptedException {Thread[] ts=new Thread[10];for(int k=0;k<10;k++) {ts[k]=new Thread(new AddThread());}for(int k=0;k<10;k++) {ts[k].start();}for(int k=0;k<10;k++) {ts[k].join();}System.out.println(arr);}
}
[100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000, 100000]
其實(shí)每個(gè)位都是線(xiàn)程安全的,那么像這樣的AtomicIntegerArray,它是怎么實(shí)現(xiàn)的呢,它內(nèi)部會(huì)封裝一個(gè)普通的array,private final int[] array;他也沒(méi)有volatile,也不需要,當(dāng)你試圖去get的時(shí)候呢,它里面有一個(gè)方法getRaw/*** Gets the current value at position {@code i}.** @param i the index* @return the current value*/
public final int get(int i) {return getRaw(checkedByteOffset(i));
}private int getRaw(long offset) {return unsafe.getIntVolatile(array, offset);
}也就是unsafe當(dāng)中,去取得一個(gè)整數(shù)的一個(gè)數(shù)值,從哪里取呢,從我們這個(gè)array對(duì)象當(dāng)中取,取哪個(gè)位置呢,取這個(gè)offset,取這個(gè)偏移量,也就是對(duì)這個(gè)array來(lái)講,對(duì)這個(gè)數(shù)組來(lái)講,偏移為offset的整數(shù),他把他拿出來(lái),private long checkedByteOffset(int i) {if (i < 0 || i >= array.length)throw new IndexOutOfBoundsException("index " + i);return byteOffset(i);
}private static long byteOffset(int i) {return ((long) i << shift) + base;
}對(duì)于一些高性能的實(shí)現(xiàn)當(dāng)中,并不會(huì)使用synchronized,它會(huì)用一些相對(duì)比較復(fù)雜,偏移量的方式,這里有一個(gè)checkedByteOffset,去拿第i個(gè)下標(biāo),他的偏移量是多少,那么這個(gè)偏移量是多少呢,從這里這么算出來(lái)的,當(dāng)然你不可以越界,越界就直接拋異常了,他通過(guò)byteOffset去算,這里我們?cè)敿?xì)解釋一下private static final int base = unsafe.arrayBaseOffset(int[].class);第一個(gè)元素的基地址,其他元素的首地址,static {int scale = unsafe.arrayIndexScale(int[].class);if ((scale & (scale - 1)) != 0)throw new Error("data type scale not a power of two");shift = 31 - Integer.numberOfLeadingZeros(scale);
}
IntegerFieldUpdater,整數(shù)的字段的更新,比如我們?cè)谙到y(tǒng)中使用成員變量,在我們最開(kāi)始使用的時(shí)候呢,我們只是簡(jiǎn)單地把它定義為int,但是你在后面的過(guò)程當(dāng)中呢,我可能不想去改這個(gè)數(shù)據(jù)類(lèi)型,這個(gè)時(shí)候就可以選擇IntegerFieldUpdater,普通變量也享受原子操作
這里分?jǐn)?shù)顯然不是Atomic的一個(gè)定義,但是是volatile,如果我們沒(méi)有把它定義成volatile沒(méi)有關(guān)系,如果你把int改成AtomicInteger,那這個(gè)改動(dòng)相對(duì)而言會(huì)比較大,AtomicIntegerFieldUpdater,我們來(lái)修改這個(gè)int和score,在內(nèi)部他也是使用反射/*** Creates and returns an updater for objects with the given field.* The Class argument is needed to check that reflective types and* generic types match.** @param tclass the class of the objects holding the field* @param fieldName the name of the field to be updated* @param <U> the type of instances of tclass* @return the updater* @throws IllegalArgumentException if the field is not a* volatile integer type* @throws RuntimeException with a nested reflection-based* exception if the class does not hold field or is the wrong type,* or the field is inaccessible to the caller according to Java language* access control*/
@CallerSensitive
public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,String fieldName) {return new AtomicIntegerFieldUpdaterImpl<U>(tclass, fieldName, Reflection.getCallerClass());
}AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,final String fieldName,final Class<?> caller) {final Field field;final int modifiers;try {field = AccessController.doPrivileged(new PrivilegedExceptionAction<Field>() {public Field run() throws NoSuchFieldException {return tclass.getDeclaredField(fieldName);}});modifiers = field.getModifiers();sun.reflect.misc.ReflectUtil.ensureMemberAccess(caller, tclass, null, modifiers);ClassLoader cl = tclass.getClassLoader();ClassLoader ccl = caller.getClassLoader();if ((ccl != null) && (ccl != cl) &&((cl == null) || !isAncestor(cl, ccl))) {sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);}} catch (PrivilegedActionException pae) {throw new RuntimeException(pae.getException());} catch (Exception ex) {throw new RuntimeException(ex);}if (field.getType() != int.class)throw new IllegalArgumentException("Must be integer type");if (!Modifier.isVolatile(modifiers))throw new IllegalArgumentException("Must be volatile type");// Access to protected field members is restricted to receivers only// of the accessing class, or one of its subclasses, and the// accessing class must in turn be a subclass (or package sibling)// of the protected member's defining class.// If the updater refers to a protected field of a declaring class// outside the current package, the receiver argument will be// narrowed to the type of the accessing class.this.cclass = (Modifier.isProtected(modifiers) &&tclass.isAssignableFrom(caller) &&!isSamePackage(tclass, caller))? caller : tclass;this.tclass = tclass;this.offset = U.objectFieldOffset(field);
}
package com.learn.thread;import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;public class AtomicIntegerFieldUpdaterDemo {public static class Candidate{int id;volatile int score;}public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater=AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");//檢查Updater是否工作正確public static AtomicInteger allScore=new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {final Candidate stu=new Candidate();Thread[] ts=new Thread[10000];for(int i=0;i<10000;i++) {ts[i]=new Thread() {public void run() {if(Math.random()>0.4) {scoreUpdater.incrementAndGet(stu);allScore.incrementAndGet();}}};ts[i].start();}for(int i=0;i<10000;i++) {ts[i].join();}System.out.println("score="+stu.score);System.out.println("allscore="+allScore);}
}
大家如果對(duì)JDK自帶的Vector比較屬性的話(huà)呢,Vector是一個(gè)向量,是一個(gè)數(shù)組,我們介紹add方法的一個(gè)實(shí)現(xiàn),/*** Adds the specified component to the end of this vector,* increasing its size by one. The capacity of this vector is* increased if its size becomes greater than its capacity.** <p>This method is identical in functionality to the* {@link #add(Object) add(E)}* method (which is part of the {@link List} interface).** @param obj the component to be added*/
public synchronized void addElement(E obj) {modCount++;ensureCapacityHelper(elementCount + 1);elementData[elementCount++] = obj;
}他是個(gè)同步方法,只有一個(gè)線(xiàn)程能夠?qū)?shù)據(jù)進(jìn)行操作,/*** The array buffer into which the components of the vector are* stored. The capacity of the vector is the length of this array buffer,* and is at least large enough to contain all the vector's elements.** <p>Any array elements following the last element in the Vector are null.** @serial*/
protected Object[] elementData;所有的元素實(shí)際上都保存在object數(shù)組當(dāng)中,當(dāng)add一個(gè)元素進(jìn)去的時(shí)候呢,ensureCapacityHelper對(duì)容量的檢查,數(shù)組容量是固定的,所以當(dāng)你初始化Vector的時(shí)候呢,他元素的格式實(shí)際上已經(jīng)確定在里面了,所以每次添加之前,他都得看,我現(xiàn)在會(huì)不會(huì)越界,如果越界我就要做擴(kuò)展/*** This implements the unsynchronized semantics of ensureCapacity.* Synchronized methods in this class can internally call this* method for ensuring capacity without incurring the cost of an* extra synchronization.** @see #ensureCapacity(int)*/
private void ensureCapacityHelper(int minCapacity) {// overflow-conscious codeif (minCapacity - elementData.length > 0)grow(minCapacity);
}private void grow(int minCapacity) {// overflow-conscious codeint oldCapacity = elementData.length;int newCapacity = oldCapacity + ((capacityIncrement > 0) ? capacityIncrement : oldCapacity);if (newCapacity - minCapacity < 0)newCapacity = minCapacity;if (newCapacity - MAX_ARRAY_SIZE > 0)newCapacity = hugeCapacity(minCapacity);elementData = Arrays.copyOf(elementData, newCapacity);
}擴(kuò)展的時(shí)候我怎么擴(kuò)呢,capacityIncrement這個(gè)你在構(gòu)造函數(shù)中是可以指定他的,不指定他就是0,就返回oldCapacity老的容量,新的容量是誰(shuí)決定的呢,絕大部分是不指定的,成倍的增長(zhǎng)可能并不是特別的合適,下面我們來(lái)看一下無(wú)鎖的Vector,這里是為無(wú)鎖的Vector做一些鋪墊,這里用到的是ReferenceArray,對(duì)象的數(shù)組,多線(xiàn)程考慮的,并且是無(wú)鎖的,并行程序就有這個(gè)問(wèn)題,當(dāng)你進(jìn)行修改,你多個(gè)線(xiàn)程的同步時(shí)非常困難的,多線(xiàn)程的難度就是當(dāng)你多個(gè)線(xiàn)程去修改的時(shí)候呢,當(dāng)我第一個(gè)元素存儲(chǔ)滿(mǎn)了,當(dāng)我第一個(gè)數(shù)組存儲(chǔ)滿(mǎn)了,如果你再往后追加元素,不夠用的時(shí)候呢,我第一個(gè)元素,第一個(gè)數(shù)組,我是不會(huì)去動(dòng)他的,我就往后擴(kuò)充,當(dāng)我第二個(gè)滿(mǎn)了我也不動(dòng)他,buckets籃子,他有好多個(gè)籃子,第i個(gè)籃子能放多少籃子呢,是前面籃子的個(gè)數(shù)乘以2,無(wú)鎖去做會(huì)有一個(gè)好處,你可以毫無(wú)顧忌的不斷地去做這個(gè)重試,在程序的各個(gè)地方你覺(jué)得有需要的地方,你就可以調(diào)用compareAndSet,失敗就失敗了,失敗也沒(méi)有什么關(guān)系
?
總結(jié)
- 上一篇: Java高并发程序设计前言
- 下一篇: JDK并发包1-1