[一起读源码]走进C#并发队列ConcurrentQueue的内部世界 — .NET Core篇
在上一篇《走進(jìn)C#并發(fā)隊(duì)列ConcurrentQueue的內(nèi)部世界》中解析了Framework下的ConcurrentQueue實(shí)現(xiàn)原理,經(jīng)過(guò)拋磚引玉,得到了一眾大佬的指點(diǎn),找到了.NET Core版本下的ConcurrentQueue源碼,位于以下地址:
https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueue.cs
https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Collections/Concurrent/ConcurrentQueueSegment.cs
我大致看了一下,雖然兩者的實(shí)現(xiàn)有不少相似的地方,不過(guò)在細(xì)節(jié)上新增了許多有意思的東西,還是覺(jué)得要單獨(dú)拉出來(lái)說(shuō)一下。畫(huà)外音:誰(shuí)叫我上篇立了flag,現(xiàn)在跪著也要寫(xiě)完。。????
必須要吐糟的是,代碼中ConcurrentQueue類(lèi)明明是包含在System.Collections.Concurrent命名空間下,但是源碼結(jié)構(gòu)中的文件卻放在System.Private.CoreLib目錄中,這是鬧哪出~
存儲(chǔ)結(jié)構(gòu)
從上面給出的源碼地址可以猜測(cè)出整個(gè)結(jié)構(gòu)依然是Segment+Queue的組合,通過(guò)一個(gè)Segment鏈表實(shí)現(xiàn)了Queue結(jié)構(gòu),但實(shí)際上內(nèi)部又加了新的設(shè)計(jì)。拋去Queue先不看的話,Segment本身就是一個(gè)實(shí)現(xiàn)了多生產(chǎn)者多消費(fèi)者的線程安全集合,甚至可以直接拿它當(dāng)一個(gè)固定容量的線程安全隊(duì)列使用,這點(diǎn)與之前Framework中差別很大。如果結(jié)合Queue整體來(lái)看,Segment不再是固定容量,而是可以由Queue來(lái)控制每個(gè)Segment的容量大小(最小是32,上限是1024 * 1024)。
在Framework中,隊(duì)列會(huì)給每個(gè)Segment分配一個(gè)索引,雖然這個(gè)索引是long類(lèi)型的,但理論上說(shuō)隊(duì)列容量還是存在上限。在Core中就不一樣了,它取消了這個(gè)索引,真正實(shí)現(xiàn)了一個(gè)無(wú)邊界(unbounded)隊(duì)列。
我猜測(cè)的原因是,在Framework中由于每個(gè)Segment是固定大小的,維護(hù)一個(gè)索引可以很方便的計(jì)算隊(duì)列里的元素?cái)?shù)量,但是Core中的Segment大小不是固定的,使用索引并不能加快計(jì)算速度,使得這個(gè)索引不再有意義,這也意味著計(jì)算元素?cái)?shù)量變得非常復(fù)雜。
一張圖看清它的真實(shí)面目,這里繼續(xù)沿用上一篇的結(jié)構(gòu)圖稍作修改:
從圖中可以看到,整體結(jié)構(gòu)上基本一致,核心改動(dòng)就是Segment中增加了Slot(槽)的概念,這是真正存儲(chǔ)數(shù)據(jù)的地方,同時(shí)有一個(gè)序列號(hào)與之對(duì)應(yīng)。
從代碼來(lái)看一下Segment的核心定義:
internal sealed class ConcurrentQueueSegment<T> {//存放數(shù)據(jù)的容器internal readonly Slot[] _slots;//這個(gè)mask用來(lái)計(jì)算槽點(diǎn),可以防止查找越界internal readonly int _slotsMask;//首尾位置指針internal PaddedHeadAndTail _headAndTail;//觀察保留標(biāo)記,表示當(dāng)前段在出隊(duì)時(shí)能否刪除數(shù)據(jù)internal bool _preservedForObservation;//標(biāo)記當(dāng)前段是否被鎖住internal bool _frozenForEnqueues;//下一段的指針internal ConcurrentQueueSegment<T>? _nextSegment; }其中_preservedForObservation和_frozenForEnqueues會(huì)比較難理解,后面再詳細(xì)介紹。
再看一下隊(duì)列的核心定義:
public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> {//每一段的初始化長(zhǎng)度,也是最小長(zhǎng)度private const int InitialSegmentLength = 32;//每一段的最大長(zhǎng)度private const int MaxSegmentLength = 1024 * 1024;//操作多個(gè)段時(shí)的鎖對(duì)象private readonly object _crossSegmentLock;//尾段指針private volatile ConcurrentQueueSegment<T> _tail;//首段指針private volatile ConcurrentQueueSegment<T> _head; }常規(guī)操作
還是按上一篇的套路為主線循序漸進(jìn)。
創(chuàng)建實(shí)例
ConcurrentQueue依然提供了2個(gè)構(gòu)造函數(shù),分別可以創(chuàng)建一個(gè)空隊(duì)列和指定數(shù)據(jù)集的隊(duì)列。
/// <summary> /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class. /// </summary> public ConcurrentQueue() {_crossSegmentLock = new object();_tail = _head = new ConcurrentQueueSegment<T>(InitialSegmentLength); }還是熟悉的操作,創(chuàng)建了一個(gè)長(zhǎng)度是32的Segment并把隊(duì)列的首尾指針都指向它,同時(shí)創(chuàng)建了鎖對(duì)象實(shí)例,僅此而已。
進(jìn)一步看看Segment是怎么創(chuàng)建的:
再看看怎么用集合初始化隊(duì)列,這個(gè)過(guò)程稍微麻煩點(diǎn),但是很有意思:
public ConcurrentQueue(IEnumerable<T> collection) {if (collection == null){ThrowHelper.ThrowArgumentNullException(ExceptionArgument.collection);}_crossSegmentLock = new object();//計(jì)算得到第一段的長(zhǎng)度int length = InitialSegmentLength;if (collection is ICollection<T> c){int count = c.Count;if (count > length){length = Math.Min(ConcurrentQueueSegment<T>.RoundUpToPowerOf2(count), MaxSegmentLength);}}//根據(jù)前面計(jì)算出來(lái)的長(zhǎng)度創(chuàng)建一個(gè)Segment,再把數(shù)據(jù)依次入隊(duì)_tail = _head = new ConcurrentQueueSegment<T>(length);foreach (T item in collection){Enqueue(item);} }可以看到,第一段的大小是根據(jù)初始集合的大小確定的,如果集合大小count大于32就對(duì)count進(jìn)行向上取2的N次冪(RoundUpToPowerOf2)得到實(shí)際大小(但是不能超過(guò)最大值),否則就按默認(rèn)值32來(lái)初始化。
向上取2的N次冪到底是啥意思??例如count是5,那得到的結(jié)果就是8(2×2×2);如果count是9,那結(jié)果就是16(2×2×2×2);如果剛好count是8那結(jié)果就是8(2×2×2),具體算法是通過(guò)位運(yùn)算實(shí)現(xiàn)的很有意思。至于為什么一定要是2的N次冪,中間的玄機(jī)我也沒(méi)搞明白。。
順藤摸瓜,再看看進(jìn)隊(duì)操作如何實(shí)現(xiàn)。
元素進(jìn)隊(duì)
/// <summary>在隊(duì)尾追加一個(gè)元素</summary> public void Enqueue(T item) {// 先嘗試在尾段插入一個(gè)元素if (!_tail.TryEnqueue(item)){// 如果插入失敗,就意味著尾段已經(jīng)填滿,需要往后擴(kuò)容EnqueueSlow(item);} }private void EnqueueSlow(T item) {while (true){ConcurrentQueueSegment<T> tail = _tail;// 先嘗試再隊(duì)尾插入元素,如果擴(kuò)容完成了就會(huì)成功if (tail.TryEnqueue(item)){return;}// 獲得一把鎖,避免多個(gè)線程同時(shí)進(jìn)行擴(kuò)容lock (_crossSegmentLock){//檢查是否擴(kuò)容過(guò)了if (tail == _tail){// 尾段凍結(jié)tail.EnsureFrozenForEnqueues();// 計(jì)算下一段的長(zhǎng)度int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);var newTail = new ConcurrentQueueSegment<T>(nextSize);// 改變隊(duì)尾指向tail._nextSegment = newTail;// 指針交換_tail = newTail;}}} }從以上流程可以看到,擴(kuò)容的主動(dòng)權(quán)不再由Segment去控制,而是交給了隊(duì)列。正因?yàn)槿绱?#xff0c;所以在跨段操作時(shí)要先加鎖,在Framework版本中是在原子操作獲得指針后進(jìn)行的擴(kuò)容所以不會(huì)有這個(gè)問(wèn)題,后面的出隊(duì)操作也是一樣的道理。擴(kuò)容過(guò)程中有兩個(gè)細(xì)節(jié)需要重點(diǎn)關(guān)注,那就是SegmentFrozen和下一段的長(zhǎng)度計(jì)算。
從前面Segment的定義中我們看到它維護(hù)了一個(gè)_frozenForEnqueues標(biāo)記字段,表示當(dāng)前段是否被凍結(jié)鎖定,在被鎖住的情況下會(huì)讓其他入隊(duì)操作失敗,看一下實(shí)現(xiàn)過(guò)程:
首先判斷當(dāng)前凍結(jié)狀態(tài),然后把它設(shè)置為true,再使用原子操作把尾指針增加了2倍段長(zhǎng)的偏移量,這個(gè)尾指針才是真正限制當(dāng)前段不可新增元素的關(guān)鍵點(diǎn),后面講段的元素追加再關(guān)聯(lián)起來(lái)詳細(xì)介紹。而為什么要指定2倍段長(zhǎng)這么一個(gè)特殊值呢,目的是為了把尾指針和mask做運(yùn)算后落在同一個(gè)slot上,也就是說(shuō)雖然兩個(gè)指針位置不一樣但是都指向的是同一個(gè)槽。
再說(shuō)說(shuō)下一段長(zhǎng)度的計(jì)算問(wèn)題,它主要是受_preservedForObservation這個(gè)字段影響,正常情況下一段的長(zhǎng)度是尾段的2倍,但如果尾段正好被標(biāo)記為觀察保留(類(lèi)似于上一篇的截取快照),那么下一段的長(zhǎng)度依然是初始值32,原作者認(rèn)為入隊(duì)操作不是很頻繁,這樣做主要是為了避免浪費(fèi)空間。
接著是重頭戲,看一下如何給段追加元素:
public bool TryEnqueue(T item) {Slot[] slots = _slots;// 如果發(fā)生競(jìng)爭(zhēng)就自旋等待SpinWait spinner = default;while (true){// 獲取當(dāng)前段的尾指針int currentTail = Volatile.Read(ref _headAndTail.Tail);// 計(jì)算槽點(diǎn)int slotsIndex = currentTail & _slotsMask;// 讀取對(duì)應(yīng)槽的序列號(hào)int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);// 判斷槽點(diǎn)序列號(hào)和指針是否匹配int diff = sequenceNumber - currentTail;if (diff == 0){// 通過(guò)原子操作比較交換,保證了只有一個(gè)入隊(duì)者獲得可用空間if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail){// 把數(shù)據(jù)存入對(duì)應(yīng)的槽點(diǎn),以及更新序列號(hào)slots[slotsIndex].Item = item;Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentTail + 1);return true;}}else if (diff < 0){// 序列號(hào)小于指針就說(shuō)明該段已經(jīng)裝滿了,直接返回falsereturn false;}// 這次競(jìng)爭(zhēng)失敗了,只好等下去spinner.SpinOnce(sleep1Threshold: -1);} }整個(gè)流程的核心就是借助槽點(diǎn)序列號(hào)和尾指針的匹配情況判斷是否有可用空間,因?yàn)樵诔跏蓟臅r(shí)候序列號(hào)是從0遞增,正常情況下尾指針和序列號(hào)肯定是匹配的,只有在整個(gè)段被裝滿時(shí)尾指針才會(huì)大于序列號(hào),因?yàn)榍懊娴膬鼋Y(jié)操作會(huì)給尾指針追加2倍段長(zhǎng)的偏移量。要重點(diǎn)提出的是,只有在數(shù)據(jù)被寫(xiě)入并且序列號(hào)更新完成后才表示整個(gè)位置的元素有效,才能有出隊(duì)的機(jī)會(huì),在Framework是通過(guò)維護(hù)一個(gè)狀態(tài)位來(lái)實(shí)現(xiàn)這個(gè)功能。整個(gè)設(shè)計(jì)很有意思,要慢慢品。
這里我們可以總結(jié)一下序列號(hào)的核心作用:假設(shè)一個(gè)槽點(diǎn)N,對(duì)應(yīng)序列號(hào)是Q,它能允許入隊(duì)的必要條件之一就是N==Q,由于入隊(duì)操作把位置N的序列號(hào)修改成N+1,那么可以猜測(cè)出在出隊(duì)時(shí)的必要條件之一就是滿足Q==N+1。
代碼中的CompareExchange在上一篇中有介紹,這里不再重復(fù)。另外關(guān)于Volatile相關(guān)的稍微提一下,它的核心作用是避免內(nèi)存與CPU之間的高速緩存帶來(lái)的數(shù)據(jù)不一致問(wèn)題,告訴編譯器直接讀寫(xiě)原始數(shù)據(jù),有興趣的可以找資料了解,限于篇幅不過(guò)多介紹。
元素出隊(duì)
可以猜測(cè)到,入隊(duì)的時(shí)候要根據(jù)容量大小進(jìn)行擴(kuò)容,那么與之對(duì)應(yīng)的,出隊(duì)的時(shí)候就需要對(duì)它進(jìn)行壓縮,也就是丟棄沒(méi)有數(shù)據(jù)的段。
/// <summary>從隊(duì)首移除一個(gè)元素</summary> public bool TryDequeue([MaybeNullWhen(false)] out T result) =>_head.TryDequeue(out result) ||TryDequeueSlow(out result);private bool TryDequeueSlow([MaybeNullWhen(false)] out T item) {// 不斷循環(huán)嘗試出隊(duì),直到成功或失敗為止while (true){ConcurrentQueueSegment<T> head = _head;// 嘗試從隊(duì)首移除,如果成功就直接返回了if (head.TryDequeue(out item)){return true;}// 如果首段為空并且沒(méi)有下一段了,則說(shuō)明整個(gè)隊(duì)列都沒(méi)有數(shù)據(jù)了,返回失敗if (head._nextSegment == null){item = default!;return false;}// 既然下一段不為空,那就再次確認(rèn)本段是否還能出隊(duì)成功,否則就要把它給移除了,等待下次循環(huán)從下一段出隊(duì)if (head.TryDequeue(out item)){return true;}// 首段指針要往后移動(dòng),表示當(dāng)前首段已丟棄,跨段操作要先加鎖lock (_crossSegmentLock){if (head == _head){_head = head._nextSegment;}}} }整體流程基本和入隊(duì)一樣,外層通過(guò)一個(gè)死循環(huán)不斷嘗試操作,直到出隊(duì)成功或者隊(duì)列為空返回失敗為止。釋放空間的操作也從Segment轉(zhuǎn)移到隊(duì)列上,所以要加鎖保證線程安全。這一步我在代碼注釋中寫(xiě)的很詳細(xì)就不多解釋了,再看一下核心操作Segment是如何移除元素的:
public bool TryDequeue([MaybeNullWhen(false)] out T item) {Slot[] slots = _slots;// 遇到競(jìng)爭(zhēng)時(shí)自旋等待SpinWait spinner = default;while (true){// 獲取頭指針地址int currentHead = Volatile.Read(ref _headAndTail.Head);// 計(jì)算槽點(diǎn)int slotsIndex = currentHead & _slotsMask;// 獲取槽點(diǎn)對(duì)應(yīng)的序列號(hào)int sequenceNumber = Volatile.Read(ref slots[slotsIndex].SequenceNumber);// 比較序列號(hào)是否和期望值一樣,為什么要加1的原因前面入隊(duì)時(shí)說(shuō)過(guò)int diff = sequenceNumber - (currentHead + 1);if (diff == 0){// 通過(guò)原子操作比較交換得到可以出隊(duì)的槽點(diǎn),并把頭指針往后移動(dòng)一位if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead){// 取出數(shù)據(jù)item = slots[slotsIndex].Item!;// 此時(shí)如果該段沒(méi)有被標(biāo)記觀察保護(hù),要把這個(gè)槽點(diǎn)的數(shù)據(jù)清空if (!Volatile.Read(ref _preservedForObservation)){slots[slotsIndex].Item = default;Volatile.Write(ref slots[slotsIndex].SequenceNumber, currentHead + slots.Length);}return true;}}else if (diff < 0){// 這種情況說(shuō)明該段已經(jīng)沒(méi)有有效數(shù)據(jù)了,直接返回失敗。bool frozen = _frozenForEnqueues;int currentTail = Volatile.Read(ref _headAndTail.Tail);if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0))){item = default!;return false;}}// 競(jìng)爭(zhēng)失敗進(jìn)入下一輪等待spinner.SpinOnce(sleep1Threshold: -1);} }流程和追加元素類(lèi)似,大部分都寫(xiě)在備注里面了,這里只額外提一下為空的情況。Segment為空只有一種情況,那就是頭尾指針落在了同一個(gè)槽點(diǎn),但這是會(huì)出現(xiàn)兩種可能性:
第一種是都落在了非最后一個(gè)槽點(diǎn),意味著該段沒(méi)有被裝滿,拿首尾指針相減即可判斷。
第二種是都落在了最后一個(gè)槽點(diǎn),意味著該段已經(jīng)被裝滿了,如果此時(shí)正在進(jìn)行擴(kuò)容(frozen),那么必須要在尾指針的基礎(chǔ)上減去FreezeOffset再去和頭指針判斷,原因前面有說(shuō)過(guò);
是不是感覺(jué)環(huán)環(huán)相扣、相輔相成、如膠似漆、balabala.....????
統(tǒng)計(jì)元素?cái)?shù)量
前面也預(yù)告過(guò),因?yàn)殛?duì)列不再維護(hù)段索引,這樣會(huì)導(dǎo)致計(jì)算元素?cái)?shù)量變得非常復(fù)雜,復(fù)雜到我都不想說(shuō)這一部分了????。簡(jiǎn)單描述一下就跳過(guò)了:核心思路就是一段一段來(lái)遍歷,然后計(jì)算出每段的大小最后把結(jié)果累加,如果涉及多個(gè)段還得加鎖,具體到段內(nèi)部就要根據(jù)首尾指針計(jì)算槽點(diǎn)得出實(shí)際數(shù)量等等等等,代碼很長(zhǎng)就不貼出來(lái)了。
這里也嚴(yán)重提醒一句,非必要情況下不要調(diào)用Count不要調(diào)用Count不要調(diào)用Count。
接下來(lái)重點(diǎn)說(shuō)一下隊(duì)列的IsEmpty。由于Segment不再維護(hù)IsEmpty信息,所以實(shí)現(xiàn)方式就有點(diǎn)曲線救國(guó)了,通過(guò)嘗試能否從隊(duì)首位置獲取一個(gè)元素來(lái)判斷是否隊(duì)列為空,也就是常說(shuō)的TryPeek操作,但細(xì)節(jié)上稍有不同。
/// <summary> /// 判斷隊(duì)列是否為空,千萬(wàn)不要使用Count==0來(lái)判斷,也不要直接TryPeek /// </summary> public bool IsEmpty => !TryPeek(out _, resultUsed: false);private bool TryPeek([MaybeNullWhen(false)] out T result, bool resultUsed) {ConcurrentQueueSegment<T> s = _head;while (true){ConcurrentQueueSegment<T>? next = Volatile.Read(ref s._nextSegment);// 從首段中獲取頭部元素,成功的話直接返回true,獲取失敗就意味著首段為空了if (s.TryPeek(out result, resultUsed)){return true;}// 如果下一段不為空那就再嘗試從下一段重新獲取if (next != null){s = next;}//如果下一段為空就說(shuō)明整個(gè)隊(duì)列為空,跳出循環(huán)直接返回false了else if (Volatile.Read(ref s._nextSegment) == null){break;}}result = default!;return false; }上面的代碼可以看到有一個(gè)特殊的參數(shù)resultUsed,它具體會(huì)有什么影響呢,那就得看看Segment是如何peek的:
除了最開(kāi)始的resultUsed判斷,其他的基本和出隊(duì)的邏輯一致,前面說(shuō)的很詳細(xì),這里不多介紹了。
枚舉轉(zhuǎn)換數(shù)據(jù)
前面反復(fù)的提到觀察保護(hù),這究竟是個(gè)啥意思??為什么要有這個(gè)操作??
其實(shí)看過(guò)上一篇文章的話就比較好理解一點(diǎn),這里稍微回顧一下方便對(duì)比。在Framework中會(huì)有截取快照的操作,也就是類(lèi)似ToArray\ToList\GetEnumerator這種要做數(shù)據(jù)迭代,它是通過(guò)原子操作維護(hù)一個(gè)m_numSnapshotTakers字段來(lái)實(shí)現(xiàn)對(duì)數(shù)據(jù)的保護(hù),目的是為了告訴其他出隊(duì)的線程我正在遍歷數(shù)據(jù),你們執(zhí)行出隊(duì)的時(shí)候不要把數(shù)據(jù)給刪了我要用的。在Core中也是為了實(shí)現(xiàn)同樣的功能才引入了觀察保護(hù)的概念,換了一種實(shí)現(xiàn)方式而已。
那么就以ToArray為例是怎么和其他操作交互的:
public T[] ToArray() {// 這一步可以理解為保護(hù)現(xiàn)場(chǎng)SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail);// 計(jì)算隊(duì)列長(zhǎng)度,這也是要返回的數(shù)組大小long count = GetCount(head, headHead, tail, tailTail);T[] arr = new T[count];// 開(kāi)始迭代數(shù)據(jù)塞到目標(biāo)數(shù)組中using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail)){int i = 0;while (e.MoveNext()){arr[i++] = e.Current;}Debug.Assert(count == i);}return arr; }上面的代碼中,有一次獲取隊(duì)列長(zhǎng)度的操作,還有一次獲取迭代數(shù)據(jù)的操作,這兩步邏輯比較相似都是對(duì)整個(gè)隊(duì)列進(jìn)行遍歷,所以做一次數(shù)據(jù)轉(zhuǎn)換的開(kāi)銷(xiāo)非常非常大,使用的時(shí)候一定要謹(jǐn)慎。別的不多說(shuō),重點(diǎn)介紹一下如何實(shí)現(xiàn)保護(hù)現(xiàn)場(chǎng)的過(guò)程:
private void SnapForObservation(out ConcurrentQueueSegment<T> head, out int headHead, out ConcurrentQueueSegment<T> tail, out int tailTail) {// 要保護(hù)現(xiàn)場(chǎng)肯定要先來(lái)一把鎖lock (_crossSegmentLock){head = _head;tail = _tail;// 一段一段進(jìn)行遍歷for (ConcurrentQueueSegment<T> s = head; ; s = s._nextSegment!){// 把每一段的觀察保護(hù)標(biāo)記設(shè)置成trues._preservedForObservation = true;// 遍歷到最后一段了就結(jié)束if (s == tail) break;}// 尾段凍結(jié),這樣就不能新增元素tail.EnsureFrozenForEnqueues();// 返回兩個(gè)指針地址用來(lái)對(duì)每一個(gè)元素進(jìn)行遍歷headHead = Volatile.Read(ref head._headAndTail.Head);tailTail = Volatile.Read(ref tail._headAndTail.Tail);} }可以看到上來(lái)就是一把鎖,如果此時(shí)正在進(jìn)行擴(kuò)容或者收容的操作會(huì)直接阻塞掉,運(yùn)氣好沒(méi)有阻塞的話你也不能有新元素入隊(duì)了,因?yàn)槲捕我呀?jīng)凍結(jié)鎖死只能自旋等待,而出隊(duì)也不能釋放空間了。原話是:
At this point, any dequeues from any segment won't overwrite the value, and none of the existing segments can have new items enqueued.
有人就要問(wèn),這里把尾段鎖死那等ToArray()完成后豈不是也不能有新元素入隊(duì)了?不用擔(dān)心,前面入隊(duì)邏輯提到過(guò)如果該段被鎖住隊(duì)列會(huì)新創(chuàng)建一個(gè)段然后再嘗試入隊(duì),這樣就能成功了。但是問(wèn)題又來(lái)了,假如前面的段還有很多空位,那豈不是有浪費(fèi)空間的嫌疑?我們知道沒(méi)有觀察保護(hù)的時(shí)候每段會(huì)以2倍長(zhǎng)度遞增,這樣的話空間浪費(fèi)率還是挺高的。帶著疑問(wèn)提了個(gè)Issue問(wèn)一下:
https://github.com/dotnet/runtime/issues/35094
到這里就基本把.NET Core ConcurrentQueue說(shuō)完了。
總結(jié)
對(duì)比Framework下的并發(fā)隊(duì)列,Core里面的改動(dòng)還是不小的,盡管保留了SpinWait和Interlocked相關(guān)操作,但是也加入了lock,邏輯上也復(fù)雜了很多,我一步步分析和寫(xiě)文章搞了好幾天。
至于性能對(duì)比,我找到一個(gè)官方給出的測(cè)試結(jié)果,有興趣的可以看看:
https://github.com/dotnet/runtime/issues/27458#issuecomment-423964046
最后強(qiáng)行打個(gè)廣告,基于.NET Core平臺(tái)的開(kāi)源分布式任務(wù)調(diào)度系統(tǒng)ScheduleMaster有興趣的star支持一下,2.0版本即將上線:
https://github.com/hey-hoho/ScheduleMasterCore
https://gitee.com/hey-hoho/ScheduleMasterCore(只從github同步)
總結(jié)
以上是生活随笔為你收集整理的[一起读源码]走进C#并发队列ConcurrentQueue的内部世界 — .NET Core篇的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 同步异步多线程这三者关系,你能给面试官一
- 下一篇: EFCore.Sharding(EFCo