异步CDI事件
幾天前,在我們的常規代碼審查中,我的一位同事提出了一個問題,即如果可能,一次同時調用CDI觀察者(這樣的方法帶有參數@Observes的方法)會發生什么?用于不同的事件實例。 換句話說,在產生少量事件之后,以下方法是否可能同時被多個線程處理:
考慮一下之后,我決定運行一些測試并在本文中描述結果。
最初的結果:發生了CDI事件以同步模式觸發,這讓我有些驚訝。 為什么?
到目前為止,我是這樣看的:CDI觀察者允許我非常干凈地將事件生產者與事件消費者分開,因此我沒有任何硬編碼的偵聽器注冊,維護偵聽器列表并手動通知它們。 CDI容器為我做一切。
因此,如果我們將生產者與消費者完全分開,我認為存在某種事件總線運行在專門的線程執行程序池中,該池負責注冊事件與調用的觀察者方法之間的中介。 我想我是基于其他事件/偵聽器解決方案(例如Google Guava EventBus)的這一假設。 它們使您有機會定義是否要使用同步(默認, EventBus )或異步事件分派器( AsyncEventBus) 。
而且,如果EJB既是生產者又是消費者,那么我認為它具有與異步EJB調用相同的功能。 異步事件觀察器唯一可能的JTA事務屬性是: REQUIRED , REQUIRES_NEW或NOT_SUPPORTED 。
現在,這就是我期望的所有工作方式,這似乎與當前狀態大不相同 。 現實生活表明CDI事件是同步的。
使異步事件在CDI 1.1中可用存在一個問題,但是我不確定此功能的當前狀態如何,并且在CDI 1.1(Java EE 7的一部分)中沒有找到有關此功能的信息。
讓我們看看如何獨自處理它。
目錄
默認同步事件
讓我們從顯示問題的基本示例開始。 看一下代碼–首先,CDI Bean生產者:
@Path("/produce") public class EventGenerator {@Injectprivate Logger logger;@Injectprivate Event<MyEvent> events;@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {for (int i = 0; i < numberOfEventsToGenerate; i++) {MyEvent event = new MyEvent(i);logger.info("Generating Event: " + event);events.fire(event);}return "Finished. Generated " + numberOfEventsToGenerate + " events.";} }MyEvent只是一些事件對象,在這里并不是很重要。 它存儲我們在實例化時傳遞的事件序列號。
消費者是一個非常簡單的CDI Bean:
public class EventConsumer {@Injectprivate Logger logger;public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException {logger.info("Receiving event: " + myEvent);TimeUnit.MILLISECONDS.sleep(500);} }請注意,我已經插入了一個線程睡眠來模擬一些長時間運行的事件接收器進程。
現在,讓我們通過調用EventProducer公開的REST命令來運行此示例。 結果(運行JBoss EAP 6.1 Alpha )將類似于以下內容:
14:15:59,196 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 14:15:59,197 [com.piotrnowicki.EventConsumer](http- / 127.0 .0.1:8080-1)接收事件:MyEvent [ seqNo = 0 ] 14:15:59,697 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 14 :15:59,698 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [ seqNo = 1 ] 14:16:00,199 [com.piotrnowicki.EventGenerator](http- / 127.0。 0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 14:16:00,200 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [ seqNo = 2 ]
它顯示了CDI事件的同步性質–事件的產生和使用發生在同一線程中,一個接一個地發生。
那么,如何使用CDI實現異步事件?
解決方案1 ??– CDI生產者和Singleton EJB作為接收者
生產者堅持使用–純CDI bean:
@Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }現在,如果您將接收器變成@Singleton EJB,并將observes方法標記為@Asynchronous,如下所示:
@Singleton public class EventConsumer {@Asynchronouspublic void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... } }您將得到以下結果:
14:21:19,341 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:21:19,343 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:21:19,343 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :21: 19,347 [com.piotrnowicki.EventConsumer](EJB默認– 2)接收事件:MyEvent [seqNo = 1] 14:21: 19,848 [com.piotrnowicki.EventConsumer](EJB默認– 1)接收事件:MyEvent [seqNo = 0] 14:21: 20,350 [com.piotrnowicki.EventConsumer](EJB默認值– 3)接收事件:MyEvent [seqNo = 2]
事件是一個接一個地產生的,并且是在單獨的線程中產生的。SingletonEJB一次又一次地為它們提供服務(請查看事件處理的時間。)這是因為Singleton EJB的每個業務方法都具有隱式寫鎖定。 因此,這是:
異步: 是
線程安全的觀察者方法: 是
解決方案2 –使用Singleton EJB作為具有讀取鎖定的接收器
這種方法與解決方案1非常相似,但是,由于所有事件處理都是并行進行的,因此它為您提供了更高的吞吐量。
我們的生產者保持不變–它是一個CDI bean:
@Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }我們的使用者將@Lock(READ)添加到其@Lock(READ)方法中; 這使得能夠同時處理多個事件的魔力:
@Singleton public class EventConsumer {@Asynchronous@Lock(LockType.READ)public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... } }結果就是這樣:
14:24:44,202 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:24:44,204 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:24:44,205 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :24: 44,207 [com.piotrnowicki.EventConsumer](EJB默認– 4)接收事件:MyEvent [seqNo = 0] 14:24: 44,207 [com.piotrnowicki.EventConsumer](EJB默認– 6)接收事件:MyEvent [seqNo = 2] 14:24: 44,207 [com.piotrnowicki.EventConsumer](EJB默認值– 5)接收事件:MyEvent [seqNo = 1]
同時服務事件的不同線程為您提供更大的吞吐量。 因此,這是:
異步: 是
線程安全的觀察者方法: 否
解決方案3 – EJB生產者和CDI使用者
CDI允許您觀察特定交易階段的事件。 您可以使用@Observes(during=TransactionPhase...)指定它。 在我們的情況下,我們希望CDI堆疊所有這些事件并僅在事務結束后才調用觀察者。 為此,我們只需將以上屬性添加到我們的CDI Bean觀察器中:
public class EventConsumer { public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) { ... } }現在,我們只需要確保EventGenerator方法中有正在運行的事務EventGenerator 。 我們可以通過將CDI Bean轉換為@Stateless EJB并使用其隱式REQUIRED TransactionAttribute來快速完成此操作:
@Stateless @Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }這是我們可能最終得到的結果:
14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:39:06,776 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :39: 06,778 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)接收事件:MyEvent [seqNo = 2] 14:39: 07,279 [com.piotrnowicki.EventConsumer](http- / 127.0。 0.1:8080-1)接收事件:MyEvent [seqNo = 0] 14:39: 07,780 [com.piotrnowicki.EventConsumer](http- / 127.0.0.1:8080-1)
EJB EventGenerator啟動事務,并且只有在事務完成之后,才會以序列化的方式調用CDI bean觀察器。
異步: 是
線程安全的觀察者方法: 是
解決方案4 – EJB生產者和EJB使用者
這與解決方案3非常相似。我們的生成器保持不變(無狀態EJB):
@Stateless @Path("/produce") public class EventGenerator {@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) { ... } }現在對EventConsumer進行了更改:
@Singleton public class EventConsumer {@Asynchronous@Lock(LockType.READ)public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) throws InterruptedException { ... } }結果可能如下:
14:44:09,363 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 0] 14:44:09,464 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [seqNo = 1] 14:44:09,564 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [seqNo = 2] 14 :44: 09,670 [com.piotrnowicki.EventConsumer](EJB默認– 8)接收事件:MyEvent [seqNo = 2] 14: 44 : 09,670 [com.piotrnowicki.EventConsumer](EJB默認– 2)接收事件:MyEvent [seqNo = 1] 14:44: 09,670 [com.piotrnowicki.EventConsumer](EJB默認– 1)接收事件:MyEvent [seqNo = 0]
我們在這里使用了兩個功能–一個是事件使用者方法是異步的,第二個是在生產者事務完成之前不會通知使用者。 這給我們:
異步: 是
線程安全的觀察者方法: 否
解決方案4與解決方案2
這兩個解決方案似乎是相同的。 它們僅與消費者的注釋不同: @Observes與@Observes(during = TransactionPhase.AFTER_COMPLETION) 。 此外,對于我們的測試用例,它們的行為相同: 它們是異步的,并且多個線程可以同時處理事件接收器 。 但是,它們之間有一個很大的區別。
在我們的測試案例中,我們一個接一個地觸發事件。 想象一下,事件觸發之間還有其他操作。 在這種情況下:
- 解決方案2( @Observes )將在第一個事件觸發后立即開始處理事件,
- 解決方案4( @Observes(during = TransactionPhase.AFTER_COMPLETION) )將在事務完成后立即開始處理,因此將觸發所有事件。
這顯示了這種情況的可能結果:
解決方案2( @Observes )
15:01:34,318 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 15:01:34,320 [com.piotrnowicki.EventConsumer](EJB默認– 3 )接收事件:MyEvent [ seqNo = 0 ] 15:01:34,419 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 15:01:34,420 [com .piotrnowicki.EventConsumer](EJB默認– 6)接收事件:MyEvent [ seqNo = 1 ] 15:01:34,520 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 15:01:34,521 [com.piotrnowicki.EventConsumer](EJB默認值– 9)接收事件:MyEvent [ seqNo = 2 ]
解決方案4( @Observes(during = TransactionPhase.AFTER_COMPLETION) )
15:00:41,126 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 0 ] 15:00:41,226 [com.piotrnowicki.EventGenerator](http- / 127.0 .0.1:8080-1)生成事件:MyEvent [ seqNo = 1 ] 15:00:41,326 [com.piotrnowicki.EventGenerator](http- / 127.0.0.1:8080-1)生成事件:MyEvent [ seqNo = 2 ] 15 :00:41,432 [com.piotrnowicki.EventConsumer](EJB默認– 10)接收事件:MyEvent [ seqNo = 2 ] 15:00:41,432 [com.piotrnowicki.EventConsumer](EJB默認– 4)接收事件:MyEvent [ seqNo = 1 ] 15:00:41,432 [com.piotrnowicki.EventConsumer](EJB默認值– 5)接收事件:MyEvent [ seqNo = 0 ]
解決方案5 – EJB生產者和CDI使用者II
到目前為止,我們已經嘗試使接收器異步。 也有相反的方法–我們可以使事件生成器異步 。 我們可以通過將生產者標記為@Stateless并調用自己的異步方法來觸發事件來實現:
@Stateless @Path("/produce") public class EventGenerator {// ...@Resourceprivate SessionContext sctx;@Path("/cdiBean/{eventsNum}")@GETpublic String generateEvents(@PathParam("eventsNum") int numberOfEventsToGenerate) {for (int i = 0; i < numberOfEventsToGenerate; i++) {sctx.getBusinessObject(EventGenerator.class).fireEvent(new MyEvent(i));}return "Finished. Generated " + numberOfEventsToGenerate + " events.";}@Asynchronouspublic void fireEvent(final MyEvent event) {events.fire(event);} } 使用SessionContext仔細研究EJB自動引用。 在這種情況下,這是必需的,因為我們希望容器分派我們的方法調用并添加它的異步性質。 我們不希望使之成為本地呼叫,所以我們拒絕使用隱含的this對象。
另一方面,事件使用者是純CDI bean:
結果可能如下:
00:40:32,820 [com.piotrnowicki.EventGenerator](EJB默認– 2)正在生成事件:MyEvent [seqNo = 1] 00:40:32,820 [com.piotrnowicki.EventGenerator](EJB默認– 3)正在生成事件:MyEvent [ SEQNO = 2] 00:40:32820 [com.piotrnowicki.EventGenerator](EJB默認- 1)產生事件:MyEvent [SEQNO = 0] 00:40:32821 [com.piotrnowicki.EventConsumer](EJB默認- 1)接收事件:MyEvent [seqNo = 0] 00 : 40 : 32,821 [com.piotrnowicki.EventConsumer](EJB默認– 2)接收事件:MyEvent [seqNo = 1] 00 : 40 : 32,821 [com.piotrnowicki.EventConsumer](EJB默認– 3)接收事件:MyEvent [seqNo = 2]
異步: 是
線程安全的觀察者方法: 否
解決方案6 –使用JMS進行CDI
這是Juliano Viana在他的博客上提出的解決方案。 它使用JMS作為事件總線。 生成CDI事件,然后由負責將該事件放入JMS主題/隊列的某個類獲取。 從主題/隊列中獲取消息的MDB正在生成一個調用實際接收者的事件。 這不僅為您提供了事件的異步傳遞,而且還為其添加了事務性質。 例如,如果事件接收者無法處理該消息–它可以回滾該事務,并且隊列將確保該消息將被重新發送(也許您的事件處理器下次將能夠處理此事件?)
結論
CDI 1.0不支持異步事件生成。 CDI 1.1似乎也沒有這種支持。
但是,這并不意味著您無法實現異步處理。 已經存在基于EJB 3.1或現有CDI觀察器屬性的現有解決方案。 您還應該能夠編寫可移植的CDI擴展 ,以將此功能添加到代碼中。
翻譯自: https://www.javacodegeeks.com/2013/05/asynchronous-cdi-events.html
總結
- 上一篇: 2023 东京电玩展直播时间表公布,将提
- 下一篇: 电影《敢死队 4:最终章》预告片公开,杰