高性能线程间队列 DISRUPTOR 简介
disruptor簡介
背景
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題。與Kafka(Apache Kafka)、RabbitMQ(RabbitMQ)用于服務間的消息隊列不同,disruptor一般用于線程間消息的傳遞。基于Disruptor開發的系統單線程能支撐每秒600萬訂單,2010年在QCon演講后,獲得了業界關注。2011年,企業應用軟件專家Martin Fowler專門撰寫長文介紹The LMAX Architecture。同年它還獲得了Oracle官方的Duke大獎。其他關于disruptor的背景就不在此多言,可以自己google。
官方資料
disruptor github wiki有關于disruptor相關概念和原理的介紹,該wiki已經很久沒有更新。像Design and Implementation,對于想了解disruptor的人是很有吸引力的,但是只有題目沒有內容,還是很遺憾的。本文稍后會對其內部原理做一個介紹性的描述。
disruptor github wiki:
Home · LMAX-Exchange/disruptor Wiki
disruptor github:
LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library
這個地方也有很多不錯的資料:Disruptor by LMAX-Exchange
性能
disruptor是用于一個JVM中多個線程之間的消息隊列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、性能都遠好于ArrayBlockingQueue,當多個線程之間傳遞大量數據或對性能要求較高時,可以考慮使用disruptor作為ArrayBlockingQueue的替代者。
官方也對disruptor和ArrayBlockingQueue的性能在不同的應用場景下做了對比,本文列出其中一組數據,數據中P代表producer,C代表consumer,ABS代表ArrayBlockingQueue:
完整的官方性能測試數據在Performance Results · LMAX-Exchange/disruptor Wiki可以看到,性能測試的代碼已經包含在disruptor的代碼中,你完全可以git下來在自己的主機上測試一下看看
如何使用
單生產者,單消費者
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 | //聲明disruptor中事件類型及對應的事件工廠private class LongEvent {????????private long value;?????????????????public LongEvent() {????????????this.value = 0L;????????}?????????????????public void set(long value) {????????????this.value = value;????????}?????????????????public long get() {????????????return this.value;????????}????}private EventFactory<LongEvent> eventFactory = new EventFactory<LongEvent>() {????? ????????public LongEvent newInstance() {????????????return new LongEvent();????????}};//聲明disruptor,private int ringBufferSize = 1024;private Executor executor = Executors.newFixedThreadPool(8);private Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor);//pubisher邏輯,將原始數據轉換為event,publish到ringbufferprivate class Publisher implements EventTranslatorOneArg<LongEvent , String> {????????public void translateTo(LongEvent event, long sequence, String arg0) {????????????event.set(Long.parseLong(arg0));????????}?????? ????}//consumer邏輯,獲取event進行處理private class Consumer implements EventHandler<LongEvent> {????????public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {????????????long value = event.get();?????????? ????????????int index = (int) (value % Const.NUM_OF_FILE);????????????fileWriter[index].write("" + value + "\n");?????????????????????????if(value == Long.MAX_VALUE) {????????????????isFinish = true;????????????}????????}?????????????}//注冊consumer啟動disruptordisruptor.handleEventsWith(new Consumer());disruptor.start();//獲取disruptor的ringbuffer,用于生產數據private RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ringBuffer.publishEvent(new Publisher(), line); |
多生產者
多生產者的改動相對簡單,只需將disruptor的聲明換一個構造函數即可,但是多生產者ringbuffer的處理邏輯完全不同,只是這些不同對使用者透明,本文將在后邊討論單生產者,多生產者ringbuffer邏輯的不同
| 1 | private Disruptor<LongEvent> disruptor1 = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy()); |
多消費者
多消費者的情況分為兩類:
- 廣播:對于多個消費者,每條信息會達到所有的消費者,被多次處理,一般每個消費者業務邏輯不通,用于同一個消息的不同業務邏輯處理
- 分組:對于同一組內的多個消費者,每條信息只會被組內一個消費者處理,每個消費者業務邏輯一般相同,用于多消費者并發處理一組消息
廣播
- 消費者之間無依賴關系
假設目前有handler1,handler2,handler3三個消費者處理一批消息,每個消息都要被三個消費者處理到,三個消費者無依賴關系,則如下所示即可
disruptor.handleEventsWith(handler1,handler2,handler3);
- 消費者之間有依賴關系
假設handler3必須在handler1,handler2處理完成后進行處理
disruptor.handleEventsWith(handler1,handler2).then(handler3);
其他情況可視為以上兩種情況的排列組合
分組
分組情況稍微不同,對于消費者,需要實現WorkHandler而不是EventHandler,借口定義分別如下所示:
| 123456789101112 | public interface EventHandler<T>{????/**?????* Called when a publisher has published an event to the {@link RingBuffer}?????*?????* @param event????? published to the {@link RingBuffer}?????* @param sequence?? of the event being processed?????* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}?????* @throws Exception if the EventHandler would like the exception handled further up the chain.?????*/????void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;} |
| 12345678910 | public interface WorkHandler<T>{????/**?????* Callback to indicate a unit of work needs to be processed.?????*?????* @param event published to the {@link RingBuffer}?????* @throws Exception if the {@link WorkHandler} would like the exception handled further up the chain.?????*/????void onEvent(T event) throws Exception;} |
假設handler1,handler2,handler3都實現了WorkHandler,則調用以下代碼就可以實現分組
| 1 | disruptor.handleEventsWithWorkerPool(handler1, handler2, handler3); |
廣播和分組之間也是可以排列組合的
tips
disruptor也提供了函數讓你自定義消費者之間的關系,如
public EventHandlerGroup<T> handleEventsWith(final EventProcessor… processors)
當然,必須對disruptor有足夠的了解才能正確的在EventProcessor中實現多消費者正確的邏輯
實現原理
為何高效
事件預分配
在定義disruptor的時候我們需要指定事件工廠EventFactory的邏輯,disruptor內部的ringbuffer的數據結構是數組,EventFactory就用于disruptor初始化時數組每個元素的填充。生產者開始后,是通過獲取對應位置的Event,調用Event的setter函數更新Event達到生產數據的目的的。為什么這樣?假設使用LinkedList,在生產消費的場景下生產者會產生大量的新節點,新節點被消費后又需要被回收,頻繁的生產消費給GC帶來很大的壓力。使用數組后,在內存中存在的是一塊大小穩定的內存,頻繁的生產消費對GC并沒有什么影響,大大減小了系統的最慢響應時間,更不會因為消費者的滯后導致OOM的發生。因此這種事件預分配的方法對于減輕GC壓力可以說是一種簡單有效的方法,日常工作中的借鑒意義還是很大的。
無鎖算法
先看一段ABQ put算法的實現:
- 每個對象一個鎖,首先加鎖
- 如果數組是滿的,加入鎖的notFull條件等待隊列。(notFull的具體機制可以看這里的一篇文章wait、notify與Condition | forever)
- 元素加入數組
- 釋放鎖
| 123456789101112 | public void put(E e) throws InterruptedException {????????checkNotNull(e);????????final ReentrantLock lock = this.lock;????????lock.lockInterruptibly();????????try {????????????while (count == items.length)????????????????notFull.await();????????????enqueue(e);????????} finally {????????????lock.unlock();????????}????} |
通過以上代碼說明兩點:
- ABQ是通過lock機制實現的線程同步
- ABQ的所有操作共用同一個lock,故所有操作均是互斥的
這篇文章中講述了一個實驗, 測試程序調用了一個函數,該函數會對一個64位的計數器循環自增5億次,在2.4G 6核機器上得到了如下的實驗數據:
| Single thread | 300 |
| Single thread with CAS | 5,700 |
| Single thread with lock | 10,000 |
| Single thread with volatile write | 4,700 |
| Two threads with CAS | 30,000 |
| Two threads with lock | 224,000 |
實驗數據說明,使用CAS機制比使用lock機制快了一個數量級
另一方面,ABQ的所有操作都是互斥的,這點其實不是必要的,尤其像put和get操作,沒必要共享一個lock,完全可以降低鎖的粒度提高性能。
disruptor則與之不同:
disruptor使用了CAS機制同步線程,線程同步代價小于lock
disruptor遵守single writer原則,一塊內存對應單個線程,不僅produce和consume不是互斥的,多線程的produce也不是互斥的
偽共享
偽共享一直是一個比較高級的話題,Doug lea在JDK的Concurrent使用了大量的緩存行機制避免偽共享,disruptor也是用了這樣的機制。但是對于廣大的碼農而言,實際工作中我們可能很少會需要使用這樣的機制。畢竟對于大部分人而言,與避免偽共享帶來的性能提升而言,優化工程架構,算法,io等可能會給我們帶來更大的性能提升。所以本文只簡單提到這個話題,并不深入講解,畢竟我也沒有實際的應用經驗去講解這個話題。
單生產者模式
如圖所示,圖中數組代表ringbuffer,紅色元素代表已經發布過的事件槽,綠色元素代表將要發布的事件槽,白色元素代表尚未利用的事件槽。disruptor生產時間包括三個階段:申請事件槽,更新數據,發布事件槽。單生產者相對簡單,
- 申請事件槽:此時,ringbuffer會將cursor后的一個事件槽返回給用戶,但不更新cursor,所以對于消費者而言,該事件還是不可見的。
- 更新數據:生產者對該事件槽數據進行更新,
- 發布事件槽:發布的過程就是移動cursor的過程,完成移動cursor后,發布完成,該事件對生產者可見。
多生產者模式
多生產者的模式相對就比較復雜,也體現了disuptor是如何利用CAS機制進行的線程間同步,并保證多個生產者的生產不互斥。如圖所示,紅色的代表已經發布的事件,淡綠色代表生產者1申請的事件槽,淡黃色代表生產者2申請的事件槽。
- 申請事件槽:多生產者生產數據的過程就是移動cursor的過程,多個線程同時使用CAS操作更新cursor的值,哪個線程成功的更新了cursor的值哪個線程就成功申請了事件槽,而其他的線程則利用CAS操作繼續嘗試更新cursor的值。申請成功后cursor的值已經發生了改變,那怎么保證在該事件槽發布之前對消費者不可見呢?disruptor額外利用了一個數組,如圖中所示。深黃色代表相應的事件槽已經發布,白色代表相應的事件槽尚未發布。disruptor使用了UNSAFE類對該數組進行操作,從而保證數組值更新的高效性。
- 更新數據:生產者按序將成功申請到的事件槽數據進行更新
- 發布事件槽:生產者將對應數組的標志位更新
多個生產者生產數據唯一的競爭就發生在cursor值的更新,disruptor使用CAS操作更新cursor的值從而避免使用了鎖。申請數據之后,多個生產者可以并發更新數據,發布事件槽,互不影響。需要說明的是,如圖中所示,生產者1申請了三個事件槽,發布了一個事件槽,生產者2申請了兩個事件槽,發布了一個事件槽。時間上,在生產者1發布其剩余的兩個事件槽之前,生產者2發布的事件槽對于消費則也還是不可見的。所以,每個生產者一定要保證即便發生異常也要發布事件槽,避免其后的生產者發布的事件槽對消費者不可見。所以生產則更新數據和發布事件槽一般是一個try…finally結構。或者使用disruptor提供的EventTranslator機制發布事件,EventTranslator自動封裝了try…finally結構
tips
消費者的機制與生產者非常類似,本文不再贅述。
使用案例
LMAX應用場景
第一個講LMAX的應用場景,畢竟是催生disruptor的應用場景,所以非常典型。同時,disruptor作為內存消息隊列,怎么保證宕機的情況下數據不丟失這一關鍵問題在LMAX自身的應用中可以得到一點啟示。
LMAX的機構如圖所示,共包括三部分,Input Disruptor,Business Processor,Output Disruptor。
Input Disruptor從網絡接收到消息,在Business Processor處理之前需要完成三種操作:
- Journal:將收到的信息持久化,在Business Processor線程崩潰的時候恢復數據
- Replicate:復制信息到其他Business Processor節點
- Unmarshall:重組信息數據格式,便于Business Processor處理
Business Processor負責業務邏輯處理,并將結果寫入Output Disruptor
Output Disruptor負責讀取Business Processor處理結果,重組數據格式進行網絡傳輸。
重點介紹一下Input Disruptor,Input Disruptor的依賴關系如圖所示:
用disruptor的語言編寫就是:
disruptor.handleWith(journal, replacate, unmarshall).then(business)
LMAX為了避免business processor出現異常導致消息的丟失,在business processor處理前將消息全部持久化存儲。當business processor出現異常時,重新處理持久化的數據即可。我們可以借鑒LMAX的這種方式,來避免消息的丟失。更詳細關于LMAX的業務架構介紹可以參考The LMAX Architecture
log4j 2
以下一段文字引用自Apache log4j 2官網,這段文字足以說明disruptor對log4j 2的性能提升的巨大貢獻。
Log4j 2 contains next-generation Asynchronous Loggers based on the LMAX Disruptor library. In multi-threaded scenarios Asynchronous Loggers have 18 times higher throughput and orders of magnitude lower latency than Log4j 1.x and Logback.
log4j2性能的優越主要體現在異步日志記錄方面,以下兩個圖片摘自官網分別從吞吐率和響應時間兩個方面體現了log4j2異步日志性能的強悍。
log4j2異步日志的實現就是每次調用將待記錄的日志寫入disruptor后迅速返回,這樣無需等待信息落盤從而大大提高相應時間。同時,disruptor的事件槽重用機制避免產生大量Java對象,進而避免GC對相應時間和吞吐率的影響,也就是log4j2官網提到的Garbage-free。
文件hash
還有一種比較常見的應用場景是文件hash。如圖所示,需要對大文件進行hash以方便后續處理,由于文件太大,所以把文件分給四個線程分別處理,每個線程讀取相應信息,計算hash值,寫入相應文件。
這樣的方法有兩個弊端:
- 同一個線程內,讀寫相互依賴,互相等待
- 不同線程可能爭奪同一個輸出文件,需要lock同步
于是改為如下方法,四個線程讀取數據,計算hash值,將信息寫入相應disruptor。每個disruptor對應一個消費者,將disruptor中的信息落盤持久化。對于四個讀取線程而言,只有讀取文件操作,沒有寫文件操作,因此不存在讀寫互相依賴的問題。對于寫線程而言,只存在寫文件操作,沒有讀文件,因此也不存在讀寫互相依賴的問題。同時disruptor的存在又很好的解決了多個線程互相競爭同一個文件的問題,因此可以大大提高程序的吞吐率。
總結
以上是生活随笔為你收集整理的高性能线程间队列 DISRUPTOR 简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用spring boot +WebSo
- 下一篇: ES-Hadoop学习之ES和HDFS数