通过Java 8流使用Oracle AQ
Oracle數(shù)據(jù)庫最令人敬畏的功能之一是Oracle AQ:Oracle數(shù)據(jù)庫高級隊(duì)列 。 AQ API直接在數(shù)據(jù)庫中實(shí)現(xiàn)了完整的事務(wù)性消息傳遞系統(tǒng)。
在數(shù)據(jù)庫位于系統(tǒng)中心的經(jīng)典體系結(jié)構(gòu)中,使用AQ進(jìn)行進(jìn)程間通信時(shí),多個(gè)應(yīng)用程序(其中一些應(yīng)用程序用Java編寫,其他應(yīng)用程序用Perl或PL / SQL編寫)訪問同一數(shù)據(jù)庫。太好了 如果您更喜歡Java EE,則可以購買基于Java的MQ解決方案,并將該消息總線/中間件放在系統(tǒng)體系結(jié)構(gòu)的中心。 但是,為什么不使用數(shù)據(jù)庫呢?
如何在jOOQ中使用PL / SQL AQ API
用于AQ消息入隊(duì)和出隊(duì)的PL / SQL API非常簡單,可以使用jOOQ的OracleDSL.DBMS_AQ API從Java輕松訪問它。
此處使用的隊(duì)列配置如下所示:
CREATE OR REPLACE TYPE message_t AS OBJECT (ID NUMBER(7),title VARCHAR2(100 CHAR) ) /BEGINDBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'message_aq_t',queue_payload_type => 'message_t');DBMS_AQADM.CREATE_QUEUE(queue_name => 'message_q',queue_table => 'message_aq_t');DBMS_AQADM.START_QUEUE(queue_name => 'message_q');COMMIT; END; /并且jOOQ代碼生成器將生成有用的類,并將所有類型信息直接與它們相關(guān)聯(lián)(簡化示例):
class Queues {static final Queue<MessageTRecord> MESSAGE_Q = new QueueImpl<>("NEW_AUTHOR_AQ", MESSAGE_T); }class MessageTRecord {void setId(Integer id) { ... }Integer getId() { ... }void setTitle(String title) { ... }String getTitle() { ... }MessageTRecord(Integer id, String title) { ... } }然后,可以使用這些類直接在生成的隊(duì)列引用上安全地使消息類型入隊(duì)和出隊(duì):
// The jOOQ configuration Configuration c = ...// Enqueue a message DBMS_AQ.enqueue(c, MESSAGE_Q, new MessageTRecord(1, "test"));// Dequeue it again MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);很簡單,不是嗎?
現(xiàn)在,讓我們利用Java 8功能
消息隊(duì)列就是無限(阻塞)消息流。 從Java 8開始,我們?yōu)榇祟愊⒘魈峁┝藦?qiáng)大的API,即Stream API。
這就是為什么我們?yōu)榧磳⒌絹淼膉OOQ 3.8添加了新的API,將現(xiàn)有的jOOQ AQ API與Jav??a 8 Streams相結(jié)合的原因:
// The jOOQ configuration Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);上面的流管道將偵聽MESSAGE_Q隊(duì)列,使用所有消息,過濾掉不包含"test"的消息,并打印其余消息。
阻止流
有趣的是,這是一個(gè)無限的阻塞流。 只要隊(duì)列中沒有新消息,流管道處理就會簡單地在隊(duì)列上阻塞,等待新消息。 這對于順序流來說不是問題,但是在調(diào)用Stream.parallel()時(shí)會發(fā)生什么呢?
jOOQ將消耗事務(wù)中的每個(gè)消息。 jOOQ 3.8事務(wù)在ForkJoinPool.ManagedBlocker運(yùn)行:
static <T> Supplier<T> blocking(Supplier<T> supplier) {return new Supplier<T>() {volatile T result;@Overridepublic T get() {try {ForkJoinPool.managedBlock(new ManagedBlocker() {@Overridepublic boolean block() {result = supplier.get();return true;}@Overridepublic boolean isReleasable() {return result != null;}});}catch (InterruptedException e) {throw new RuntimeException(e);}return asyncResult;}}; }這不是很多魔術(shù)。 當(dāng)ManagedBlocker由ForkJoinWorkerThread運(yùn)行時(shí),它會運(yùn)行一些特殊的代碼,以確保線程的ForkJoinPool不會由于線程耗盡而死鎖。 有關(guān)更多信息,請?jiān)诖颂庨喿x此有趣的文章:http: //zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health
或以下堆棧溢出答案: http : //stackoverflow.com/a/35272153/521799
因此,如果您想要超快速的并行AQ出隊(duì)過程,請運(yùn)行:
// The jOOQ configuration. Make sure its referenced // ConnectionPool has enough connections Configuration c = ...DBMS_AQ.dequeueStream(c, MESSAGE_Q).parallel().filter(m -> "test".equals(m.getTitle())).forEach(System.out::println);而且您將擁有幾個(gè)線程,這些線程將使消息并行出隊(duì)。
不想等待jOOQ 3.8?
沒問題。 使用當(dāng)前版本并將dequeue操作包裝在您自己的Stream :
Stream<MessageTRecord> stream = Stream.generate(() ->DSL.using(config).transactionResult(c ->dequeue(c, MESSAGE_Q)) );做完了
獎(jiǎng)勵(lì):異步出隊(duì)
在我們討論時(shí),排隊(duì)系統(tǒng)的另一個(gè)很好的功能是它們的異步性。 在Java 8中, CompletionStage是一個(gè)非常有用的用于建模(和組合)異步算法的類型,它是默認(rèn)實(shí)現(xiàn)CompletableFuture ,它再次在ForkJoinPool執(zhí)行任務(wù)。
使用jOOQ 3.8,您可以再次簡單地調(diào)用
// The jOOQ configuration. Make sure its referenced // ConnectionPool has enough connections Configuration c = ...CompletionStage<MessageTRecord> stage = DBMS_AQ.dequeueAsync(c, MESSAGE_Q).thenCompose(m -> ...)...;敬請期待jOOQ博客上的另一篇文章,我們將研究更復(fù)雜的異步用例,并使用jOOQ 3.8和Java 8阻止SQL語句。
翻譯自: https://www.javacodegeeks.com/2016/02/using-oracle-aq-via-java-8-streams.html
總結(jié)
以上是生活随笔為你收集整理的通过Java 8流使用Oracle AQ的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 食用螺类大全 食用螺类的简介
- 下一篇: 使您的Java 8方法引用生效