基于TableStore构建简易海量Topic消息队列
摘要: 前言 消息隊列,通常有兩種場景,一種是發布者訂閱模式,一種是生產者消費者模式。發布者訂閱模式,即發布者生產消息放入隊列,多個監聽的消費者都會收到同一份消息,也就是每個消費者收到的消息是一樣的。生產者消費者模式,生產者生產消息放入隊列,多個消費者同時監聽隊列,誰先搶到消息就會從隊列中取走消息,最終每個消息只會有一個消費者擁有。
前言
消息隊列,通常有兩種場景,一種是發布者訂閱模式,一種是生產者消費者模式。發布者訂閱模式,即發布者生產消息放入隊列,多個監聽的消費者都會收到同一份消息,也就是每個消費者收到的消息是一樣的。生產者消費者模式,生產者生產消息放入隊列,多個消費者同時監聽隊列,誰先搶到消息就會從隊列中取走消息,最終每個消息只會有一個消費者擁有。
在大數據時代,傳統的生產者消費者隊列模式中的Topic數目可能從少量的幾個變為海量topic。例如要實現一個全網爬蟲抓取任務調度系統,每個大型的門戶,SNS都會成為一個topic。在topic內部也會有海量的子網頁需要抓取。在實現這樣的一個任務分發調度系統時可能會遇到以下一些問題:
海量的topic,意味著我們可能會有海量的隊列。針對爬蟲場景,根據網頁類型,一類網站對應到一個任務隊列,不同的任務隊列會有自己的生產者和消費者。
生產者和消費者會有多個,在業務峰值期間,產生較大并發訪問,消息總量也是海量。針對爬蟲任務消息總量可能就是全網的網頁地址數量。
任務可能會有優先級,為了實現優先級高的任務優先調度,我們可能會在一個topic下再細分子隊列。
消息消費不能丟失,如果是作為任務的調度消息,我們的消息丟失失零容忍的。
消費者模式中如果消費者因為種種原因處理失敗或者超時,需要支持消息被重新調度。
在保證消息一定會被處理的前提下,我們也要避免少量消息因為各種原因處理堆積,而影響整個系統的吞吐。因為消息讀區往往是輕量級,消息的處理是資源密集型。我們不希望因為消息讀區堆積導致處理資源閑置。
解決方案
基于TableStore(表格存儲)的跨分區高并發,主鍵自增列這個特性又很好的適配到我們的隊列特性。支持海量,不同分區鍵下使用各自的自增主鍵,可以很好的實現海量隊列。具體我們給出如下方案:
需要設計以下表:
任務消息表
消息消費checkpoint表
全量消息表
在介紹表設計之前,先做一些名詞解釋。
每個任務消息,我們假設已有一個唯一的id。
任務優先級,我們假設優先級范圍是固定并且已經知道,如果任務優先級過多,可以分層,例如優先級1~100的映射到層級1。這里如果我們的任務沒有優先級,那可以根據任務數據量級做一個簡單的分桶,然后輪訓抓取每個分桶中的任務。
兩個游標,對應到每個topic的每個優先層級,我們需要記錄2個游標位移點。一個是抓取掃描游標,一個是完成游標。掃描游標的定義是指當前任務當前優先層級下,被掃描到的最大位移位置。完成位移點表示改任務當前優先層級下,最大的抓取完成位移點,之前的任務都已經完成抓取。
表設計
任務消息表
這里,每一個子任務都會被插入這張表,任務可能由不同的爬蟲端抓取后產生子任務,在子任務產生的同時,任務的訪問地址,訪問優先級已經被固定。我們根據一個分層算法進行映射。所以主鍵前三列已經確定,插入TableStore(表格存儲)后,id會自增生成,用于后續消費者讀任務用。
消息消費checkpoint表
這張表用于消息消費的checkpoint。下面會結合schema具體說下checkpoint的內容。
這張表屬性列上會有兩列,一列用來表示抓取掃描位移點,一列記錄完成位移點。這里checkpoint的記錄需要使用條件更新,即我們只會確保原來值小于待更新的值才會更新。
全量消息表
我們用全量消息表存放我們的消息id以及對應屬性,一個消息任務是否重復處理也通過這張表做判斷。
在全網信息表中,有一列屬性用來表示任務處理狀態,消費者在拿到任務id時需要條件更新這張表對應的這個key,對應行不存在可以直接插入。如果已經存在,需要先讀狀態為非結束狀態,版本為讀到版本情況下再做更新。更新成功者意味著當前id的任務被這個消費者搶占。其中行不存在表示第一次爬取,如果存在非結束狀態,表示之前的任務可能已經失敗。
任務消費處理流程
下面我們用爬蟲抓取全網網頁做為例子來看下具體如何基于TableStore(表格存儲)做消息隊列并最終實現任務的分發:
這張圖展現了我們的整個爬蟲框架,爬蟲具體流程如下
不同的爬蟲端會根據自身爬取進度定時從TableStore的爬蟲任務表進行拉取爬蟲任務,這里一般我們單線程GetRange訪問TableStore,我們認為這里的任務讀區速率會遠大于抓取消費者的速度,從TableStore讀區到的任務數據進入爬蟲內存隊列,然后進行下一輪任務消息讀區。直到當前內存隊列滿后等待下輪喚醒繼續抓取,如果有特殊需求可以并發拉取不同優先級。
初始對于每個任務的各個priority,他們的默認checkpoint都對應于TableStore的一個flag即Inf_Min,也就是第一行。
GetRange拉取到當前任務各優先級抓取任務后(例如我們可以設置從優先級高到低,一次最多200條,抓夠200條進行一次任務搶占),爬蟲會先根據具體優先級排序,然后按照優先級從高到低嘗試更新網頁信息表,進行爬取任務搶占,搶占成功后,該任務會被放進爬蟲的內存任務隊列給抓取線程使用。搶占成功同時我們也會更新一下爬蟲任務表中的狀態,和當前的時間,表示任務最新的更新時間,后續的任務狀態檢驗線程會查看任務是否已經過期需要重新處理。注意這里假如有一個爬蟲線程比較leg,是上一輪搶占任務后卡了很久才嘗試更新這個時間,也沒有問題。這種小概率的leg可能會帶來重復抓取,但是不會影響數據的一致性。并且我們可以在內存中記錄下每一步的時間,如果我們發現每一步內存中的時間超時也可以結束當前任務,進一步減少小概率的重復抓取。
當一輪的任務全部填充后,我們會根據當前拿到的最大任務表id+1(即爬蟲任務表第三個主鍵,也就是自增主鍵)進行嘗試當前任務對應優先級checkpoint表的更新(這里更新頻率可以根據業務自由決定),更新的原則是新的id要大于等于當前id。如果更新成功后,可以使用當前更新值繼續拉取,如果更新失敗,意味著有另一個爬蟲已經取得更新的任務,需要重新讀一下checkpoint表獲得最新的checkpoint id值,從該id繼續拉取。
除了任務抓取線程以為,每個爬蟲端可以有一個頻率更低的任務進行任務完成掃描,這個任務用來最新的完成任務游標。掃描中getrange的最大值為當前拉取的起始位置,掃描的邏輯分以下幾種:
掃描到該行已經更新為完成,此時游標可以直接下移
掃描到任務還是initial狀態,一個任務沒有被任何人設置為running,切被拉去過,原因是這個任務是一個重復抓取的任務,此時可以去url表中檢查這個url是否存在,存在直接跳過。
掃描到任務是running,不超時認為任務還在執行,結束當輪掃描。如果檢查時間戳超時,檢查url表,如果內容已經存在,則有可能是更新狀態回任務表失敗,游標繼續下移。如果內容也不存在,一種簡單做法是直接在表對應優先級中put一個新任務,唯一的問題是如果是并發檢查可能會產生重復的任務(重復任務通過url去重也可以解決)。另一種做法也是通過搶任務一樣更新url表,更新成功者可以新建任務下移坐標。其余的人停止掃描,更新checkpoint為當前位置。更新成功者可以繼續下移掃描直至尾部或者任務正常進行位置,然后更新checkpoint。
爬蟲抓取每個任務完成后,會更新全網url表中的狀態以及對應爬蟲任務表中的狀態,其中全網url的狀態用來給后續抓取任務去重使用,爬蟲任務表中的狀態給上面步驟5的完成游標掃描線程使用判斷一個任務是否已經完成。
整個寫入子任務和讀取我們可以抽象出下面這張圖
新任務會根據優先級并發寫入不同的隊列,其中圖中編號就對應表格存儲中的自增列,用戶按照上面設計表結構的話,不需要自己處理并發寫入的編號,表格存儲服務端會保證唯一且自增,即新任務在對應隊列末尾。爬蟲讀取任務的游標就是圖中紅色,藍色對應完成的任務列表。兩個游標在響應優先級下獨立維護。
下面我們舉個例子,如果一個爬蟲任務拉取線程假如設置一次拉2個任務為例,
我們的爬蟲任務表會從上面切換成下圖,task1 priority=3的掃描游標更新到了10011,priority=2的掃描游標更新到10006。也就意味著掃描優先級3的下次會從10011開始,優先級2的會從10006開始。
并發處理
多爬蟲拉取任務有重復,這部分我們通過條件更新大表決定了同一個網頁不會同時被抓取。
多爬蟲條件更新checkpoint表決定了我們整個拉取任務不會漏過當前拉到的一批任務,如果checkpoint更新如果條件失敗任務繼續進行,其他類型可重試錯誤會繼續重試(例如服務短時間不可用,leg等。)這里只有可能導致其他爬蟲喚醒后拉到重復數據,但是抓取因為搶占失敗也不會重復拉取,并且新喚醒的客戶端也會更新更大的游標,保證系統不會因為一個客戶端leg而任務掃描游標滯后。
任務判定完成邏輯我們可以做分布式互斥,同時只有一個進程在判斷。也可以在判斷任務失敗的時候進行條件更新原表,更新成功后再新插入一條新任務。
總結
最后我們再來看下整個設計中幾個關鍵的問題是否滿足
海量topic,TableStore(表格存儲)天然的以一個分區鍵做為一個隊列的能力使得我們可以很容易的實現海量的隊列,數量級可以在億級別甚至更多。
優先級,優先級對應一個主鍵列,依照優先級進行分層優先級高的會被優先getrange獲得。
系統吞吐,整個系統中兩個游標的設計,使得我們任務掃描游標每輪掃描后都會快速向下走,長尾任務不會阻礙對新任務的掃描。另一方面我們任務會在url大表上做搶占,避免不必要的重復抓取。
子任務不丟失,自增列的保證了新任務會用更大的值即排在當前隊列末尾。另外有一個完成掃描線程,會確保新任務全部完成后才會更新,這個游標代表了最后整個任務是否完成。這個游標也保證了任務不會丟失。這個任務會對長尾的任務重新建一個任務并插入隊列,新任務會被新爬蟲端重新觸發,也避免了因為一個客戶端卡住而餓死的問題。
總結
以上是生活随笔為你收集整理的基于TableStore构建简易海量Topic消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据增长浪潮下,PCIe 6.0的问与R
- 下一篇: 设计方案,拿来吧你!