使用Infinispan进行Camel的事务性缓存
不久前,我為Camel 創(chuàng)建了Redis連接器。 Redis是很棒的鍵值存儲(還有更多),但是隨后我需要一個在與Camel相同的JVM中運行的緩存,并注意到Infinispan已切換到ASL v2 。 Camel中已經(jīng)有其他用于在JVM上進行緩存的連接器,例如Hazelcast和EHCache,但是如果您已經(jīng)將Camel用作其他Red Hat產(chǎn)品的一部分,或者想了解LIRS驅(qū)逐如何勝過LRU,那么Infinispan值得嘗試。
簡而言之,Infinispan是事務(wù)性內(nèi)存鍵值存儲和數(shù)據(jù)網(wǎng)格。 在嵌入式模式下使用時,Infinispan與Camel駐留在同一JVM中,并允許Camel使用者接收緩存更改通知:
在上面的示例中,當(dāng)創(chuàng)建緩存條目時,Infinispan將觸發(fā)兩個事件-一個事件在創(chuàng)建緩存條目之前和之后。 也可以同步接收事件,即在處理高速緩存操作的同一線程中接收事件,或在不阻止高速緩存操作的情況下在單獨的線程中異步接收事件。
將Infinispan用作本地緩存很簡單,它公開了ConcurrentMap接口,并具有通常的到期,收回,鈍化,持久存儲,查詢等功能。 使Infinispan成為數(shù)據(jù)網(wǎng)格的是節(jié)點發(fā)現(xiàn)其他節(jié)點以及在它們之間復(fù)制或分發(fā)數(shù)據(jù)的能力。 復(fù)制允許跨集群共享數(shù)據(jù),而分發(fā)使用一致的哈希算法來實現(xiàn)更好的可伸縮性。
在客戶端-服務(wù)器模式下,Infinispan作為獨立應(yīng)用程序運行,并且Camel生產(chǎn)者可以使用Infinispan的Hot Rod客戶端發(fā)送消息。 Hot Rod是一種二進制,語言無關(guān)的智能協(xié)議,允許以拓撲結(jié)構(gòu)和散列分布感知方式與Infinisnap服務(wù)器進行交互。
位于駱駝的Infinispan生產(chǎn)商目前提供GET , PUT , REMOVE和CLEAR操作。 這是生產(chǎn)者將數(shù)據(jù)放入訂單緩存的示例:
<route><from uri="direct:orderCache"/><setHeader headerName="CamelInfinispanKey"><simple>${in.header.orderId}</simple></setHeader><setHeader headerName="CamelInfinispanValue"><simple>${in.header.orderTotal}</simple></setHeader><setHeader headerName="CamelInfinispanOperation"><simple>CamelInfinispanOperationPut</simple></setHeader><to uri="infinispan://localhost?caseName=orders"/> </route>讓我們創(chuàng)建一個更有趣的示例。 Infinispan也符合JTA規(guī)范,可以參與交易。 我們將創(chuàng)建一個用于注冊人員的REST API,該API將首先使用Camel sql組件將該人員持久保存在關(guān)系數(shù)據(jù)庫中,然后在同一事務(wù)中將firstName放入Infinispan緩存中。 我們將使用事務(wù)處理的 Camel路由來做到這一點,因此,如果在路由過程中發(fā)生錯誤,在任何階段,Camel都會確保回滾事務(wù)(用于緩存和數(shù)據(jù)庫),以便數(shù)據(jù)庫和緩存始終處于一致的狀態(tài)。
<route><from uri="restlet:/persons?restletMethod=POST"/><transacted/><!-- PERSIST TO DB --><to uri="sql:insert into person(firstName, lastName) values(:#firstName,:#lastName)?dataSource=#dataSource"/><!-- DAMN EXCEPTION THROWER--><filter><simple>${in.header.lastName} == "damn"</simple><throwException ref="damn"/></filter><!-- PUT TO CACHE --><to uri="sql:select id from person WHERE id = (select max(id) from person)?dataSource=#dataSource"/><setHeader headerName="personId"><simple>${body[0][ID]}</simple></setHeader><setHeader headerName="CamelInfinispanKey"><simple>${headerAs(personId, String)}</simple></setHeader><setHeader headerName="CamelInfinispanValue"><simple>${in.header.firstName}</simple></setHeader><setHeader headerName="CamelInfinispanOperation"><simple>CamelInfinispanOperationPut</simple></setHeader><to uri="infinispan://localhost?cacheContainer=#cacheContainer&caseName=orders"/> </route>如您所見,路由中沒有任何魔術(shù)或額外的配置,這是一條標(biāo)準路由。 我們只有一小段代碼,當(dāng)該人的lastName被該死以模擬路線中間的錯誤時,將引發(fā)異常。
該應(yīng)用程序使用atomikos JTA事務(wù)管理器以獨立模式運行。 首先,我們創(chuàng)建一個JtaTransactionManager ,以供交易路線使用:
<bean id="userTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"/> <bean id="userTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"/> <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <constructor-arg ref="userTransaction"/><constructor-arg ref="userTransactionManager"/> </bean>然后用它包裝我們的數(shù)據(jù)源 :
public AtomikosDataSourceBean atomikosDataSourceBean() throws Exception {EmbeddedXADataSource ds = new EmbeddedXADataSource();ds.setCreateDatabase("create");ds.setDatabaseName("target/testdb");ds.setUser("");ds.setPassword("");AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();xaDataSource.setXaDataSource(ds);xaDataSource.setUniqueResourceName("xaDerby");return xaDataSource; }并使用TransactionManagerLookup告訴Infinispan參與同一筆交易:
public BasicCacheContainer basicCacheContainer() throws Throwable {GlobalConfiguration glob = new GlobalConfigurationBuilder().nonClusteredDefault().build();Configuration loc = new ConfigurationBuilder().transaction().transactionMode(TransactionMode.TRANSACTIONAL).transactionManagerLookup(new TransactionManagerLookup() {@Overridepublic TransactionManager getTransactionManager() throws Exception {return jtaTransactionManager.getTransactionManager();}}).build();return new DefaultCacheManager(glob, loc, true); }完成所有這些樣板代碼之后,我們就有了數(shù)據(jù)源,緩存和Camel路由參與同一事務(wù)。 要查看具有兩個階段提交和回滾的完整REST示例,請從github獲取源代碼并進行使用。
BTW Camel-infinispan組件仍然不是Camel主干的一部分,要運行示例,您也將需要它 。
翻譯自: https://www.javacodegeeks.com/2013/08/transactional-caching-for-camel-with-infinispan.html
總結(jié)
以上是生活随笔為你收集整理的使用Infinispan进行Camel的事务性缓存的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 社保工伤备案流程(社保工伤备案)
- 下一篇: ddos能攻击手机吗(怎么ddos攻击手