java并发之SynchronousQueue实现原理
前言
SynchronousQueue是一個(gè)比較特別的隊(duì)列,由于在線程池方面有所應(yīng)用,為了更好的理解線程池的實(shí)現(xiàn)原理,筆者花了些時(shí)間學(xué)習(xí)了一下該隊(duì)列源碼(JDK1.8),此隊(duì)列源碼中充斥著大量的CAS語(yǔ)句,理解起來(lái)是有些難度的,為了方便日后回顧,本篇文章會(huì)以簡(jiǎn)潔的圖形化方式展示該隊(duì)列底層的實(shí)現(xiàn)原理。
SynchronousQueue簡(jiǎn)單使用
經(jīng)典的生產(chǎn)者-消費(fèi)者模式,操作流程是這樣的:
有多個(gè)生產(chǎn)者,可以并發(fā)生產(chǎn)產(chǎn)品,把產(chǎn)品置入隊(duì)列中,如果隊(duì)列滿了,生產(chǎn)者就會(huì)阻塞;
有多個(gè)消費(fèi)者,并發(fā)從隊(duì)列中獲取產(chǎn)品,如果隊(duì)列空了,消費(fèi)者就會(huì)阻塞;
如下面的示意圖所示:
SynchronousQueue 也是一個(gè)隊(duì)列來(lái)的,但它的特別之處在于它內(nèi)部沒(méi)有容器,一個(gè)生產(chǎn)線程,當(dāng)它生產(chǎn)產(chǎn)品(即put的時(shí)候),如果當(dāng)前沒(méi)有人想要消費(fèi)產(chǎn)品(即當(dāng)前沒(méi)有線程執(zhí)行take),此生產(chǎn)線程必須阻塞,等待一個(gè)消費(fèi)線程調(diào)用take操作,take操作將會(huì)喚醒該生產(chǎn)線程,同時(shí)消費(fèi)線程會(huì)獲取生產(chǎn)線程的產(chǎn)品(即數(shù)據(jù)傳遞),這樣的一個(gè)過(guò)程稱為一次配對(duì)過(guò)程(當(dāng)然也可以先take后put,原理是一樣的)。
我們用一個(gè)簡(jiǎn)單的代碼來(lái)驗(yàn)證一下,如下所示:
package com.concurrent;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
? ? public static void main(String[] args) throws InterruptedException {
? ? ? ? final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
? ? ? ? Thread putThread = new Thread(new Runnable() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? System.out.println("put thread start");
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? queue.put(1);
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? System.out.println("put thread end");
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? Thread takeThread = new Thread(new Runnable() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void run() {
? ? ? ? ? ? ? ? System.out.println("take thread start");
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? System.out.println("take from putThread: " + queue.take());
? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? System.out.println("take thread end");
? ? ? ? ? ? }
? ? ? ? });
? ? ? ? putThread.start();
? ? ? ? Thread.sleep(1000);
? ? ? ? takeThread.start();
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
一種輸出結(jié)果如下:
put thread start
take thread start
take from putThread: 1
put thread end
take thread end
1
2
3
4
5
從結(jié)果可以看出,put線程執(zhí)行queue.put(1) 后就被阻塞了,只有take線程進(jìn)行了消費(fèi),put線程才可以返回。可以認(rèn)為這是一種線程與線程間一對(duì)一傳遞消息的模型。
SynchronousQueue實(shí)現(xiàn)原理
不像ArrayBlockingQueue、LinkedBlockingDeque之類的阻塞隊(duì)列依賴AQS實(shí)現(xiàn)并發(fā)操作,SynchronousQueue直接使用CAS實(shí)現(xiàn)線程的安全訪問(wèn)。由于源碼中充斥著大量的CAS代碼,不易于理解,所以按照筆者的風(fēng)格,接下來(lái)會(huì)使用簡(jiǎn)單的示例來(lái)描述背后的實(shí)現(xiàn)模型。
隊(duì)列的實(shí)現(xiàn)策略通常分為公平模式和非公平模式,接下來(lái)將分別進(jìn)行說(shuō)明。
公平模式下的模型:
公平模式下,底層實(shí)現(xiàn)使用的是TransferQueue這個(gè)內(nèi)部隊(duì)列,它有一個(gè)head和tail指針,用于指向當(dāng)前正在等待匹配的線程節(jié)點(diǎn)。
初始化時(shí),TransferQueue的狀態(tài)如下:
接著我們進(jìn)行一些操作:
1、線程put1執(zhí)行 put(1)操作,由于當(dāng)前沒(méi)有配對(duì)的消費(fèi)線程,所以put1線程入隊(duì)列,自旋一小會(huì)后睡眠等待,這時(shí)隊(duì)列狀態(tài)如下:
2、接著,線程put2執(zhí)行了put(2)操作,跟前面一樣,put2線程入隊(duì)列,自旋一小會(huì)后睡眠等待,這時(shí)隊(duì)列狀態(tài)如下:
3、這時(shí)候,來(lái)了一個(gè)線程take1,執(zhí)行了 take操作,由于tail指向put2線程,put2線程跟take1線程配對(duì)了(一put一take),這時(shí)take1線程不需要入隊(duì),但是請(qǐng)注意了,這時(shí)候,要喚醒的線程并不是put2,而是put1。為何? 大家應(yīng)該知道我們現(xiàn)在講的是公平策略,所謂公平就是誰(shuí)先入隊(duì)了,誰(shuí)就優(yōu)先被喚醒,我們的例子明顯是put1應(yīng)該優(yōu)先被喚醒。至于讀者可能會(huì)有一個(gè)疑問(wèn),明明是take1線程跟put2線程匹配上了,結(jié)果是put1線程被喚醒消費(fèi),怎么確保take1線程一定可以和次首節(jié)點(diǎn)(head.next)也是匹配的呢?其實(shí)大家可以拿個(gè)紙畫一畫,就會(huì)發(fā)現(xiàn)真的就是這樣的。
公平策略總結(jié)下來(lái)就是:隊(duì)尾匹配隊(duì)頭出隊(duì)。
執(zhí)行后put1線程被喚醒,take1線程的 take()方法返回了1(put1線程的數(shù)據(jù)),這樣就實(shí)現(xiàn)了線程間的一對(duì)一通信,這時(shí)候內(nèi)部狀態(tài)如下:
4、最后,再來(lái)一個(gè)線程take2,執(zhí)行take操作,這時(shí)候只有put2線程在等候,而且兩個(gè)線程匹配上了,線程put2被喚醒,
take2線程take操作返回了2(線程put2的數(shù)據(jù)),這時(shí)候隊(duì)列又回到了起點(diǎn),如下所示:
以上便是公平模式下,SynchronousQueue的實(shí)現(xiàn)模型。總結(jié)下來(lái)就是:隊(duì)尾匹配隊(duì)頭出隊(duì),先進(jìn)先出,體現(xiàn)公平原則。
非公平模式下的模型:
我們還是使用跟公平模式下一樣的操作流程,對(duì)比兩種策略下有何不同。非公平模式底層的實(shí)現(xiàn)使用的是TransferStack,
一個(gè)棧,實(shí)現(xiàn)中用head指針指向棧頂,接著我們看看它的實(shí)現(xiàn)模型:
1、線程put1執(zhí)行 put(1)操作,由于當(dāng)前沒(méi)有配對(duì)的消費(fèi)線程,所以put1線程入棧,自旋一小會(huì)后睡眠等待,這時(shí)棧狀態(tài)如下:
2、接著,線程put2再次執(zhí)行了put(2)操作,跟前面一樣,put2線程入棧,自旋一小會(huì)后睡眠等待,這時(shí)棧狀態(tài)如下:
3、這時(shí)候,來(lái)了一個(gè)線程take1,執(zhí)行了take操作,這時(shí)候發(fā)現(xiàn)棧頂為put2線程,匹配成功,但是實(shí)現(xiàn)會(huì)先把take1線程入棧,然后take1線程循環(huán)執(zhí)行匹配put2線程邏輯,一旦發(fā)現(xiàn)沒(méi)有并發(fā)沖突,就會(huì)把棧頂指針直接指向 put1線程
4、最后,再來(lái)一個(gè)線程take2,執(zhí)行take操作,這跟步驟3的邏輯基本是一致的,take2線程入棧,然后在循環(huán)中匹配put1線程,最終全部匹配完畢,棧變?yōu)榭?#xff0c;恢復(fù)初始狀態(tài),如下圖所示:
可以從上面流程看出,雖然put1線程先入棧了,但是卻是后匹配,這就是非公平的由來(lái)。
總結(jié)
SynchronousQueue由于其獨(dú)有的線程一一配對(duì)通信機(jī)制,在大部分平常開(kāi)發(fā)中,可能都不太會(huì)用到,但線程池技術(shù)中會(huì)有所使用,由于內(nèi)部沒(méi)有使用AQS,而是直接使用CAS,所以代碼理解起來(lái)會(huì)比較困難,但這并不妨礙我們理解底層的實(shí)現(xiàn)模型,在理解了模型的基礎(chǔ)上,有興趣的話再查閱源碼,就會(huì)有方向感,看起來(lái)也會(huì)比較容易,希望本文有所借鑒意義。
————————————————
from
java并發(fā)之SynchronousQueue實(shí)現(xiàn)原理_yanyan19880509的專欄-CSDN博客_synchronousqueue
總結(jié)
以上是生活随笔為你收集整理的java并发之SynchronousQueue实现原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Mock工具之Mockito实战
- 下一篇: 《漫画算法》源码整理-1 时间复杂度 空