生活随笔
收集整理的這篇文章主要介紹了
disruptor笔记之六:常见场景
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
歡迎訪問我的GitHub
這里分類和匯總了欣宸的全部原創(chuàng)(含配套源碼):https://github.com/zq2599/blog_demos
《disruptor筆記》系列鏈接
快速入門Disruptor類分析環(huán)形隊(duì)列的基礎(chǔ)操作(不用Disruptor類)事件消費(fèi)知識(shí)點(diǎn)小結(jié)事件消費(fèi)實(shí)戰(zhàn)常見場(chǎng)景等待策略知識(shí)點(diǎn)補(bǔ)充(終篇)
本篇概覽
- 本文是《disruptor筆記》系列的第六篇,主要內(nèi)容是將一些常用的消費(fèi)模式做匯總,后續(xù)日常開發(fā)中如果有需要就能拿來即用;
- 以下是常用的模式:
多個(gè)消費(fèi)者獨(dú)立消費(fèi),前文已實(shí)現(xiàn),本篇跳過多個(gè)消費(fèi)者共同消費(fèi),前文已實(shí)現(xiàn),本篇跳過既有獨(dú)立消費(fèi),也有共同消費(fèi),前文已實(shí)現(xiàn),本篇跳過多個(gè)生產(chǎn)者和多個(gè)獨(dú)立消費(fèi)者:
C1、C2獨(dú)立消費(fèi),C3依賴C1和C2
C1獨(dú)立消費(fèi),C2和C3也獨(dú)立消費(fèi),但依賴C1,C4依賴C2和C3:
C1和C2獨(dú)立消費(fèi),C3和C4也是獨(dú)立消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
C1和C2共同消費(fèi),C3和C4也是共同消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
C1和C2共同消費(fèi),C3和C4獨(dú)立消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
C1和C2獨(dú)立消費(fèi),C3和C4是共同消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4:
關(guān)于本篇代碼
- 為了省事兒,本次不會(huì)新建工程,而是直接使用前文的consume-mode模塊,因此,下面這些類直接就直接使用了,無需重寫代碼:
事件定義:OrderEvent事件工廠:OrderEventFactory事件生產(chǎn)者:OrderEventProducer用在獨(dú)立消費(fèi)場(chǎng)景的事件消費(fèi)者:MailEventHandler用在共同消費(fèi)場(chǎng)景的事件消費(fèi)者:MailWorkHandler
源碼下載
- 本篇實(shí)戰(zhàn)中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
名稱鏈接備注
| 項(xiàng)目主頁(yè) | https://github.com/zq2599/blog_demos | 該項(xiàng)目在GitHub上的主頁(yè) |
| git倉(cāng)庫(kù)地址(https) | https://github.com/zq2599/blog_demos.git | 該項(xiàng)目源碼的倉(cāng)庫(kù)地址,https協(xié)議 |
| git倉(cāng)庫(kù)地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項(xiàng)目源碼的倉(cāng)庫(kù)地址,ssh協(xié)議 |
- 這個(gè)git項(xiàng)目中有多個(gè)文件夾,本次實(shí)戰(zhàn)的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:
- disruptor-tutorials是個(gè)父工程,里面有多個(gè)module,本篇實(shí)戰(zhàn)的module是consume-mode,如下圖紅框所示:
多個(gè)生產(chǎn)者和多個(gè)獨(dú)立消費(fèi)者
咱們即將實(shí)現(xiàn)下圖的邏輯:
- 前面幾篇文章所有實(shí)戰(zhàn)的生產(chǎn)者都只有一個(gè),到了本篇,為了讓consume-mode模塊的代碼能夠支持多生產(chǎn)者,咱們要對(duì)功能業(yè)務(wù)的抽象父類做以下兩處改動(dòng):
init方法原本為private型,現(xiàn)在為了能讓子類重此方法,將其改為protected類型;增加名為publishWithProducer2的方法,可見內(nèi)部只有拋出異常,要想其正常工作,需要子類自己來實(shí)現(xiàn):
public void publishWithProducer2(String value
) throws Exception {throw new Exception("父類未實(shí)現(xiàn)此方法,請(qǐng)?jiān)谧宇愔兄貙懘朔椒ê笤僬{(diào)用");
}
- 為了實(shí)現(xiàn)多生產(chǎn)者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍后會(huì)提到:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.*;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;@Service("multiProducerService")
public class MultiProducerServiceImpl extends ConsumeModeService {@Setterprotected OrderEventProducer producer2
;@PostConstruct@Overrideprotected void init() {disruptor
= new Disruptor<>(new OrderEventFactory(),BUFFER_SIZE
,new CustomizableThreadFactory("event-handler-"),ProducerType.MULTI
,new BlockingWaitStrategy());disruptorOperate();disruptor
.start();setProducer(new OrderEventProducer(disruptor
.getRingBuffer()));setProducer2(new OrderEventProducer(disruptor
.getRingBuffer()));}@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
);}@Overridepublic void publishWithProducer2(String value
) throws Exception {producer2
.onData(value
);}
}
重寫父類的init方法,主要是實(shí)例化Disruptor的時(shí)候,多傳入兩個(gè)參數(shù):ProducerType.MULTI表示生產(chǎn)類型是多生產(chǎn)者,BlockingWaitStrategy是等待策略,之前的代碼中咱們沒有傳此參數(shù)時(shí),默認(rèn)的就是BlockingWaitStrategyinit方法中還執(zhí)行了setProducer2方法,設(shè)置成員變量producer2重寫publishWithProducer2方法,調(diào)用成員變量producer2發(fā)表事件重寫disruptorOperate方法,里面設(shè)置了兩個(gè)獨(dú)立消費(fèi)者
- 驗(yàn)證上述代碼的方式依舊是單元測(cè)試,打開ConsumeModeServiceTest.java,新增以下代碼,可見新增了兩個(gè)線程同時(shí)執(zhí)行發(fā)布事件的操作:
@Autowired@Qualifier("multiProducerService")ConsumeModeService multiProducerService
;@Testpublic void testMultiProducerService() throws InterruptedException {log
.info("start testMultiProducerService");CountDownLatch countDownLatch
= new CountDownLatch(1);int expectEventCount
= EVENT_COUNT
*4;multiProducerService
.setCountDown(countDownLatch
, expectEventCount
);new Thread(() -> {for(int i
=0;i
<EVENT_COUNT
;i
++) {log
.info("publich {}", i
);multiProducerService
.publish(String.valueOf(i
));}}).start();new Thread(() -> {for(int i
=0;i
<EVENT_COUNT
;i
++) {log
.info("publishWithProducer2 {}", i
);try {multiProducerService
.publishWithProducer2(String.valueOf(i
));} catch (Exception e
) {e
.printStackTrace();}}}).start();countDownLatch
.await();assertEquals(expectEventCount
, multiProducerService
.eventCount());}
- 測(cè)試結(jié)果如下,測(cè)試通過,符合預(yù)期:
C1、C2獨(dú)立消費(fèi),C3依賴C1和C2
- 邏輯圖如下:
- 實(shí)現(xiàn)代碼如下,非常簡(jiǎn)單,依賴關(guān)系用then即可實(shí)現(xiàn):
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;@Service("scene5")
public class Scene5 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
).then(c3
);}
}
@Autowired@Qualifier("scene5")Scene5 scene5
;@Testpublic void testScene5
() throws InterruptedException {log
.info("start testScene5");testConsumeModeService(scene5
,EVENT_COUNT
,EVENT_COUNT
* 3);}
- 為了節(jié)省篇幅,測(cè)試結(jié)果就不貼了,要注意的是,每個(gè)事件都一定是C1和C2先消費(fèi)過,才會(huì)被C3消費(fèi)到;
C1獨(dú)立消費(fèi),C2和C3也獨(dú)立消費(fèi),但依賴C1,C4依賴C2和C3
- 邏輯圖如下:
- 實(shí)現(xiàn)代碼如下:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;@Service("scene6")
public class Scene6 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);MailEventHandler c4
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
).then(c2
, c3
).then(c4
);}
}
@Autowired@Qualifier("scene6")Scene6 scene6
;@Testpublic void testScene6
() throws InterruptedException {log
.info("start testScene6");testConsumeModeService(scene6
,EVENT_COUNT
,EVENT_COUNT
* 4);}
C1和C2獨(dú)立消費(fèi),C3和C4也是獨(dú)立消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4
- 邏輯圖如下:
- 實(shí)現(xiàn)代碼如下:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;@Service("scene7")
public class Scene7 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);MailEventHandler c4
= new MailEventHandler(eventCountPrinter
);MailEventHandler c5
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
).then(c3
, c4
).then(c5
);}
}
@Autowired@Qualifier("scene7")Scene7 scene7
;@Testpublic void testScene7
() throws InterruptedException {log
.info("start testScene7");testConsumeModeService(scene7
,EVENT_COUNT
,EVENT_COUNT
* 5);}
C1和C2共同消費(fèi),C3和C4也是共同消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4
- 邏輯圖如下:
- 實(shí)現(xiàn)代碼如下:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;
@Service("scene8")
public class Scene8 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailWorkHandler c1
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c2
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c3
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c4
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c5
= new MailWorkHandler(eventCountPrinter
);disruptor
.handleEventsWithWorkerPool(c1
, c2
).thenHandleEventsWithWorkerPool(c3
, c4
).thenHandleEventsWithWorkerPool(c5
);}
}
@Autowired@Qualifier("scene8")Scene8 scene8
;@Testpublic void testScene8
() throws InterruptedException {log
.info("start testScene8");testConsumeModeService(scene8
,EVENT_COUNT
,EVENT_COUNT
* 3);}
C1和C2共同消費(fèi),C3和C4獨(dú)立消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4
- 邏輯圖如下:
- 實(shí)現(xiàn)代碼如下:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;@Service("scene9")
public class Scene9 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailWorkHandler c1
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c2
= new MailWorkHandler(eventCountPrinter
);MailEventHandler c3
= new MailEventHandler(eventCountPrinter
);MailEventHandler c4
= new MailEventHandler(eventCountPrinter
);MailEventHandler c5
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWithWorkerPool(c1
, c2
).then(c3
, c4
).then(c5
);}
}
@Autowired@Qualifier("scene9")Scene9 scene9
;@Testpublic void testScene9
() throws InterruptedException {log
.info("start testScene9");testConsumeModeService(scene9
,EVENT_COUNT
,EVENT_COUNT
* 4);}
C1和C2獨(dú)立消費(fèi),C3和C4是共同消費(fèi),但C3和C4都依賴C1和C2,然后C5依賴C3和C4
- 邏輯圖如下:
- 實(shí)現(xiàn)代碼如下:
package com.bolingcavalry.service.impl;import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;@Service("scene10")
public class Scene10 extends ConsumeModeService {@Overrideprotected void disruptorOperate() {MailEventHandler c1
= new MailEventHandler(eventCountPrinter
);MailEventHandler c2
= new MailEventHandler(eventCountPrinter
);MailWorkHandler c3
= new MailWorkHandler(eventCountPrinter
);MailWorkHandler c4
= new MailWorkHandler(eventCountPrinter
);MailEventHandler c5
= new MailEventHandler(eventCountPrinter
);disruptor
.handleEventsWith(c1
, c2
).thenHandleEventsWithWorkerPool(c3
, c4
).then(c5
);}
}
@Testpublic void testScene10
() throws InterruptedException {log
.info("start testScene10");testConsumeModeService(scene10
,EVENT_COUNT
,EVENT_COUNT
* 4);}
- 至此,一些常見場(chǎng)景的代碼已完成,希望本文能給您一些參考,幫您更得心應(yīng)手的用好這個(gè)優(yōu)秀的工具;
你不孤單,欣宸原創(chuàng)一路相伴
Java系列Spring系列Docker系列kubernetes系列數(shù)據(jù)庫(kù)+中間件系列DevOps系列
總結(jié)
以上是生活随笔為你收集整理的disruptor笔记之六:常见场景的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。