oracle aq_通过Java 8流使用Oracle AQ
oracle aq
Oracle數據庫最令人敬畏的功能之一是Oracle AQ:Oracle數據庫高級隊列 。 AQ API直接在數據庫中實現了完整的事務性消息傳遞系統。
在數據庫處于系統中心的經典體系結構中,使用AQ進行進程間通信時,多個應用程序(其中一些是用Java編寫的,其他應用是用Perl或PL / SQL編寫的,等等)訪問同一數據庫。太好了 如果您更喜歡Java EE,則可以購買基于Java的MQ解決方案,并將該消息總線/中間件放在系統體系結構的中心。 但是,為什么不使用數據庫呢?
如何在jOOQ中使用PL / SQL AQ API
用于AQ消息入隊和出隊的PL / SQL API非常簡單,可以使用jOOQ的OracleDSL.DBMS_AQ API從Java輕松訪問它。
此處使用的隊列配置如下所示:
CREATE OR REPLACE TYPE message_t AS OBJECT (ID NUMBER(7),title VARCHAR2(100 CHAR) ) /BEGINDBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'message_aq_t',queue_payload_type => 'message_t');DBMS_AQADM.CREATE_QUEUE(queue_name => 'message_q',queue_table => 'message_aq_t');DBMS_AQADM.START_QUEUE(queue_name => 'message_q');COMMIT; END; /并且jOOQ代碼生成器將生成有用的類,并將所有類型信息直接與其關聯(簡化示例):
class Queues {static final Queue<MessageTRecord> MESSAGE_Q = new QueueImpl<>("NEW_AUTHOR_AQ", MESSAGE_T); }class MessageTRecord {void setId(Integer id) { ... }Integer getId() { ... }void setTitle(String title) { ... }String getTitle() { ... }MessageTRecord(Integer id, String title) { ... } }然后,可以使用這些類直接在生成的隊列引用上安全地使消息類型入隊和出隊:
// The jOOQ configuration Configuration c = ...// Enqueue a message DBMS_AQ.enqueue(c, MESSAGE_Q, new MessageTRecord(1, "test"));// Dequeue it again MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);很簡單,不是嗎?
現在,讓我們利用Java 8功能
消息隊列就是無限(阻塞)消息流。 從Java 8開始,我們為此類消息流提供了強大的API,即Stream API。
這就是為什么我們為即將到來的jOOQ 3.8添加了新的API,將現有的jOOQ AQ API與Jav??a 8 Streams相結合的原因:
// The jOOQ configuration Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);上面的流管道將在MESSAGE_Q隊列上偵聽,使用所有消息,過濾掉不包含"test"的消息,并打印其余消息。
阻止流
有趣的是,這是一個無限的阻塞流。 只要隊列中沒有新消息,流管道處理就會簡單地在隊列上阻塞,等待新消息。 這對于順序流不是問題,但是在調用Stream.parallel() ,會發生什么?
jOOQ將消耗事務中的每條消息。 jOOQ 3.8事務在ForkJoinPool.ManagedBlocker運行:
static <T> Supplier<T> blocking(Supplier<T> supplier) {return new Supplier<T>() {volatile T result;@Overridepublic T get() {try {ForkJoinPool.managedBlock(new ManagedBlocker() {@Overridepublic boolean block() {result = supplier.get();return true;}@Overridepublic boolean isReleasable() {return result != null;}});}catch (InterruptedException e) {throw new RuntimeException(e);}return asyncResult;}}; }這不是很多魔術。 當ManagedBlocker由ForkJoinWorkerThread運行時,它會運行一些特殊的代碼,以確保線程的ForkJoinPool不會由于線程耗盡而死鎖。 有關更多信息,請在此處閱讀此有趣的文章:http: //zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health
或以下堆棧溢出答案: http : //stackoverflow.com/a/35272153/521799
因此,如果您想要超快速的并行AQ出隊過程,請運行:
// The jOOQ configuration. Make sure its referenced // ConnectionPool has enough connections Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).parallel().filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);而且您將擁有多個線程,這些線程將使消息并行出隊。
不想等待jOOQ 3.8?
沒問題。 使用當前版本并將dequeue操作包裝在您自己的Stream :
Stream<MessageTRecord> stream = Stream.generate(() ->DSL.using(config).transactionResult(c ->dequeue(c, MESSAGE_Q)) );做完了
獎勵:異步出隊
在我們討論時,排隊系統的另一個很好的功能是它們的異步性。 在Java 8中, CompletionStage是一個非常有用的用于建模(和組合)異步算法的類型,它是默認實現CompletableFuture ,它再次在ForkJoinPool執行任務。
使用jOOQ 3.8,您可以再次簡單地調用
// The jOOQ configuration. Make sure its referenced // ConnectionPool has enough connections Configuration c = ...CompletionStage<MessageTRecord> stage = DBMS_AQ.dequeueAsync(c, MESSAGE_Q).thenCompose(m -> ...)...;敬請期待jOOQ博客上的另一篇文章,我們將研究更復雜的異步用例,用jOOQ 3.8和Java 8阻止SQL語句。
翻譯自: https://www.javacodegeeks.com/2016/02/using-oracle-aq-via-java-8-streams.html
oracle aq
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的oracle aq_通过Java 8流使用Oracle AQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓一键清理内存清理(安卓一键清理内存)
- 下一篇: (备案倒金款)