异步广度优先搜索算法
為什么要異步?
CPU的工藝越來越小,Cannon Lake架構的Intel CPU已經達到10nm技術,因此在面積不變的情況下,核心數可以明顯提升。單純的提升主頻將造成發熱量大、需要的電壓大、功耗大的問題。而傳統的算法與數據結構是針對單核心單線程同步而言的,因此,傳統的算法無法將CPU利用率達到最大。
廣度優先搜索
首先我們先了解一下與之對應的深度優先搜索(DFS),深度優先搜索即像走迷宮時,始終保持左手與左側墻壁接觸,換言之即遇到岔路時永遠向左拐,從而尋找出口。而廣度優先搜索,則在每個岔路時,變出一個分身,繼續前進。
但是,實際上是這樣嗎?答案是否定的,剛剛已經講到,傳統的算法與數據結構是建立在單線程同步基礎上的,因此傳統算法只能夠模擬分身在同時前進,這時就要引入隊列來保存和展開岔路節點(當遇到新岔路時,將這個節點放入隊列。隊列頭部元素進行展開尋找新的岔路并放入隊列尾部)。
基于Parallel的并行廣度優先搜索
而在并行或異步以及多線程的環境下,我們可以真的讓“分身”們同時前進。首先使用并行廣度優先搜索的前提是你不在意真的保證了廣度是同步的,雖然并行廣度優先搜索能夠尋找到全部解,但是無法保證同一時刻進行搜索的任務是在同一深度的。
在這一前提下,我們以遍歷圖為例,首先定義鄰接表的數據結構:
public class Node{ ? ?public string Value { get; set; } ? ?public LinkedList<Node> Nodes { get; set; } =
? ? ? ?new LinkedList<Node>(); }
假設我們的圖結構如下:
進行數據的初始化:
public static void Main(string[] args){ ???var A = new Node { Value = "A" }; ? ?
? ?var B = new Node { Value = "B" }; ? ?
? ?var C = new Node { Value = "C" };
??var D = new Node { Value = "D" }; ?
?? var E = new Node { Value = "E" }; ? ?
?? var F = new Node { Value = "F" }; ? ?
?? var G = new Node { Value = "G" };A.Nodes.AddLast(B);A.Nodes.AddLast(C);A.Nodes.AddLast(D);B.Nodes.AddLast(A);B.Nodes.AddLast(D);C.Nodes.AddLast(A);D.Nodes.AddLast(A);D.Nodes.AddLast(B);D.Nodes.AddLast(E);E.Nodes.AddLast(D);E.Nodes.AddLast(F);F.Nodes.AddLast(E);F.Nodes.AddLast(G);G.Nodes.AddLast(F); ? ?
?? // TODO: Async visit}
在此處,姑且認為Node.GetHashCode()可以作為Node的唯一標識,我們來定義一個HashSet來存儲已經訪問過的Node標識:
private static HashSet<int> Visited = new HashSet<int>();此時,我們只需要按照正常編寫深度優先搜索的遞歸方法編寫即可,但其中的循環使用Parallel提供的循環方法,這樣即可實現廣度搜索:
public void Visit(Node n){ ? ?lock (Visited){ ? ? ? ?if (Visited.Contains(n.GetHashCode())){ ? ? ? ? ? ?return;}Visited.Add(n.GetHashCode());}Console.WriteLine($"{ n.Value } ");Parallel.ForEach(n.Nodes, x => {Visit(x);}); }
基于Task的異步廣度優先搜索
如果我們需要在進行搜索時,保持同一個時間點的任務所涉及到的節點的深度一致,我們就需要將上述方法改寫成異步方式,并使用異步信號量來使處于同一深度的Task等待同深度其他Task完成:
首先定義一個異步信號控制器類AsyncSemaphore,其中包含一個公共構造方法和兩個公共方法:
public class AsyncSemaphore{ ? ?public void AddTaskCount(int) ? ?public Task WaitAsync(); ? ?public void Release(); }該類被初始化時,認為需要等待的任務數量為0,通過調用AddTaskCount來增加需要等待的任務數,WaitAsync被調用時,將先判斷需要等待的任務數量是否與已經完成的任務數量相等,如果相等則不等待,不相等則返回一個等待信號。當Release被調用后,判斷需要等待的任務數量是否與已經完成的任務數量相等,如果相等則置所有等待信號放行。
因此這個類的具體實現如下:
public class AsyncSemaphore{ ??private int m_totalCount = 0; ?
?private int m_finishedCount = 0; ?
?
? ?private readonly List<TaskCompletionSource<bool>> m_waiters =
? ? new List<TaskCompletionSource<bool>>(); ?
? ?
? ?private readonly static Task s_completed =
? ? ? ? ? ? ? ? ? ? ? ? ?Task.FromResult(true); ? ?
? ?
? ?public void AddTaskCount(int count) ? ?{m_totalCount += count;} ?
? ?
? ??public Task WaitAsync() ? ?{ ? ?
? ??? ?lock (m_waiters){ ? ? ? ? ?
? ??? ? ? ?if (m_finishedCount == m_totalCount){ ? ? ? ? ?
? ??? ? ? ? ? ? ?return s_completed;} ? ? ? ? ?
? ??? ? ? ? ?else{ ? ? ? ?
? ??? ? ? ? ?? ?var waiter = new TaskCompletionSource<bool>();m_waiters.Add(waiter); ? ? ? ? ?
? ??? ? ? ? ?? ??return waiter.Task;}}} ?
?
? ??public void Release() ? ?{ ? ? ? ?lock (m_waiters){m_finishedCount++; ? ? ?
? ?? ? ? ? ?if (m_finishedCount == m_totalCount){Parallel.ForEach(m_waiters, x => {x.SetResult(true);});m_waiters.Clear();}}} }
在編寫Visit方法之前,我們需要對每個深度設置一個鎖,因此我們需要定義一個Dictionary來存儲各個深度(或叫層)的鎖:
private static Dictionary<int, AsyncSemaphore> Lockers =new Dictionary<int, AsyncSemaphore>();
同時,需要為起點層預設一個鎖:
Lockers.Add(0, new AsyncSemaphore()); Lockers[0].AddTaskCount(1);接下來編寫VisitAsync方法,該方法是一個異步函數,第一個參數接收節點,第二個參數為當前深度,起點深度為0。
public static async Task VisitAsync(Node n, int deep = 0)在VisitAsync方法被調用時,應先檢查該節點是否被訪問:
lock (Visited) { ?? ?if (Visited.Contains(n.GetHashCode())) ? ?{ ? ? ? ?Lockers[deep].Release();return;}Visited.Add(n.GetHashCode()); }
這里需要額外說明的一點就是,如果這個節點被訪問過,也是需要釋放鎖的。因為在后面的節點展開代碼中,我們并沒有過濾節點是否被訪問過,因此訪問過的節點也包含在了AddTaskCount()的參數中。
接下來,我們需要檢查下一層的鎖有無被初始化:
lock(Lockers) { ? ?if (!Lockers.ContainsKey(deep + 1)) ? ?{ ? ? ? ?Lockers.Add(deep + 1, new AsyncSemaphore());} }這些準備工作完成后,即可輸出當前節點的Value,輸出后,我們計算一下當前節點有多少子節點,將這個數值累加到下一層的異步鎖中,添加完畢后,通知本層鎖已經完成了一個任務并等待本層其他任務完成后,繼續展開本節點:
Console.Write($"{ n.Value } "); Lockers[deep + 1].AddTaskCount(n.Nodes.Count); Lockers[deep].Release(); await Lockers[deep].WaitAsync(); Parallel.ForEach(n.Nodes, x => {VisitAsync(x, deep + 1); });運行調試時,我們可以觀察到A永遠是第一個輸出的,結尾順序永遠是EFG,而第二層的順序是不固定的。因此證明了廣度優先搜索是成功的。以上即為基于異步實現的廣度優先搜索。
原文地址:http://www.1234.sh/post/async-bfs-algorithm
.NET社區新聞,深度好文,微信中搜索dotNET跨平臺或掃描二維碼關注
贊賞
人贊賞
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的异步广度优先搜索算法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [Asp.Net Core轻量级Aop解
- 下一篇: 开箱即用 - jwt 无状态分布式授权