活性卡桑德拉
或是冒險從Cassandra被動地讀取數據。
總覽
讓我們首先嘗試從編程的角度定義什么是反應性。
功能反應式編程是使用功能性編程的構建塊進行反應式編程的編程范例。
函數式編程是一種編程范例,是一種構建計算機程序的結構和元素的樣式,這種處理將計算視為對避免狀態和可變數據的數學函數的評估。 函數式編程強調產生僅取決于其輸入而不取決于程序狀態的結果的函數。
我們如何用Java進行函數式編程? Java是面向對象的編程語言,其中到處都有可變狀態。
世界各地的所有Java開發人員都使用以下任何接口:
java.lang.Runnable,java.util.Comparator,java.util.concurrent.Callable或java.awt.event.ActionListener。 所有這些接口都只聲明了一個方法。 這些接口被稱為單一抽象方法或SAM。 使用這些流行的方法是創建匿名內部類。
public class RunnableTest {public static void main(Sting[] args){new Thread(new Runnable(){@Overridepublic void run(){System.out.println("A new thread is running ...");}}).start();} }由于語言規范中不包含函數,因此Java中的函數式編程非常困難。 通過引入“ lambda”,它將在Java 8中變得更加簡單。 但是我們如何用Java進行函數式編程呢?
讓我們看一個簡單的例子。
@FunctionalInterface public interface Worker {public void doWork(); } public class FunctionalWorker {public static void main(String[] args){// anonymous inner class wayexecute( new Worker(){@Overridepublic void doWork() {System.out.println ("working ...");}});// lambda's wayexecute(() -> System.out.println("working in lambda's way ..."));}public static void execute(Worker worker){worker.doWork();} }響應式編程是一種圍繞數據流和更改傳播的編程范例。 例如,在命令式編程設置中,a:= b + c表示在表達式求值的瞬間,即為a分配了b + c的結果。 以后可以更改b或c的值,而不會影響a。 在反應式編程中,a的值將基于新值自動更新。
因此,我們應該對什么是功能響應式編程有一個很好的了解,讓我們開始構建一個原型……
反應性地從Cassandra讀取數據
卡桑德拉(Cassandra)是其中的NoSql存儲之一,非常受歡迎。
假設我們必須構建一個頭像服務。 該服務將把化身的元信息和內容直接存儲在cassandra中。
我們正在使用的Java驅動程序通過executeAsync()方法為我們提供了查詢cassandra異步的支持 。 調用此方法將返回一個Future。 眾所周知,java Futures是可阻止的并且無法組成。
好的,所以我們有異步支持,但是我們仍然需要一種能夠以被動方式讀取它的方法……
Netflix建立了RxJava庫,并在以后開源了該庫,該庫提供了Java(以及其他JVM語言)的功能性響應編程。
通過提供可過濾,選擇,轉換,組合和組成Observable的運算符的集合,Functional React提供了有效的執行和組合。
可以將Observable數據類型視為等效于Iterable的“推”,即“拉”。 使用Iterable,使用者從生產者和線程塊中提取值,直到這些值到達為止。 與Observable類型相反,只要值可用,生產者就會將值推送給消費者。 這種方法更加靈活,因為值可以同步或異步到達。
Observable類型將四個缺少的語義添加到Gang of Four的Observer模式中,在Iterable類型中可用:
通過這兩個簡單的加法,我們統一了Iterable和Observable類型。 它們之間的唯一區別是數據流動的方向。 這一點非常重要,因為現在我們對Iterable執行的任何操作也可以對Observable執行。
讓我們看看如何結合RxJava和Cassandra異步查詢執行來構建Observable。
executeAsync()方法返回一個Guava可監聽的Future 。 在此將來添加回調可以使我們正確地實現Observer接口。
一個簡單的服務可以實現如下:
package net.devsprint.reactive.cassandra;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;import rx.Observable;import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session;public class AvatarService {private static final String QUERY = "select * avatars";private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());private final Session session;public AvatarService(Session session) {this.session = session;}Observable getAvatars() {return ObservableCassandra.executeAsync(session, QUERY, executorService);}}假設查詢很繁重,我們將提供一個單獨的執行上下文,在該上下文中將執行回調。
通過這兩個類,我們可以啟動Avatar服務,但它不會做任何事情。 僅當至少有一個訂戶時,它才會開始從Cassandra獲取數據。 一個完整的例子可以在Reactive Cassandra中找到。
翻譯自: https://www.javacodegeeks.com/2014/01/reactive-cassandra.html
總結
- 上一篇: Hibernate 4.3 ORM工具
- 下一篇: 笔记本硬盘拆开内部图解电脑硬盘如何拆卸