将java.util.concurrent.BlockingQueue用作rx.Observable
在Java中,經(jīng)典的生產(chǎn)者-消費者模式相對簡單,因為我們有java.util.concurrent.BlockingQueue 。 為了避免繁忙的等待和容易出錯的手動鎖定,我們只需利用put()和take() 。 如果隊列已滿或為空,它們都將阻塞。 我們需要的是一堆線程共享對同一隊列的引用:一些正在生產(chǎn)而其他正在消耗。 當(dāng)然,隊列必須具有有限的容量,否則,如果生產(chǎn)者的表現(xiàn)優(yōu)于消費者,我們很快就會用光內(nèi)存。 格雷格·揚(yáng)(Greg Young)在波蘭Devoxx期間對這條規(guī)則的強(qiáng)調(diào)不夠:
永遠(yuǎn)不要創(chuàng)建無限隊列
使用
這是最簡單的例子。 首先,我們需要一個將對象放在共享隊列中的生產(chǎn)者:
import lombok.Value; import lombok.extern.slf4j.Slf4j;@Slf4j @Value class Producer implements Runnable {private final BlockingQueue<User> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user = new User("User " + System.currentTimeMillis());log.info("Producing {}", user);queue.put(user);TimeUnit.SECONDS.sleep(1);}} catch (Exception e) {log.error("Interrupted", e);}} }生產(chǎn)者只需每秒將User類的實例(無論它是什么)發(fā)布到給定隊列。 顯然,在現(xiàn)實生活中,將User在隊列中是系統(tǒng)中某些操作(例如用戶登錄)的結(jié)果。 同樣,消費者從隊列中獲取新項目并進(jìn)行處理:
@Slf4j @Value class Consumer implements Runnable {private final BlockingQueue<User> queue;@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {final User user = queue.take();log.info("Consuming: {}", user);}} catch (Exception e) {log.error("Interrupted", e);}} }再次,在現(xiàn)實生活中,處理將意味著存儲在數(shù)據(jù)庫中或?qū)τ脩暨\行某些欺詐檢測。 我們使用隊列將處理線程與消耗線程解耦,例如減少延遲。 為了運行一個簡單的測試,讓我們啟動幾個生產(chǎn)者和消費者線程:
BlockingQueue<User> queue = new ArrayBlockingQueue<>(1_000); final List<Runnable> runnables = Arrays.asList(new Producer(queue),new Producer(queue),new Consumer(queue),new Consumer(queue),new Consumer(queue) );final List<Thread> threads = runnables.stream().map(runnable -> new Thread(runnable, threadName(runnable))).peek(Thread::start).collect(toList());TimeUnit.SECONDS.sleep(5); threads.forEach(Thread::interrupt);//...private static String threadName(Runnable runnable) {return runnable.getClass().getSimpleName() + "-" + System.identityHashCode(runnable); }我們有2個生產(chǎn)者和3個消費者,似乎一切正常。 在現(xiàn)實生活中,您可能會有一些隱式生產(chǎn)者線程,例如HTTP請求處理線程。 在使用者方面,您很可能會使用線程池。 這種模式效果很好,但是特別是在消費方面是很底層的。
介紹
本文的目的是介紹一種抽象,其行為類似于生產(chǎn)者方的隊列,但表現(xiàn)為來自消費者方的RxJava的Observable 。 換句話說,我們可以將添加到隊列中的對象視為可以在客戶端映射,過濾,撰寫等的流。 有趣的是,這不再是排在后面的隊列。 ObservableQueue<T>僅將所有新對象直接轉(zhuǎn)發(fā)給訂閱的使用者,并且在沒有人監(jiān)聽(“可觀察到的” 熱 )的情況下不緩沖事件。 ObservableQueue<T>本身并不是隊列,它只是一個API與另一個API之間的橋梁。 它類似于java.util.concurrent.SynchronousQueue ,但是如果沒有人對使用感興趣,則將對象簡單地丟棄。
這是第一個實驗性實現(xiàn)。 這只是一個玩具代碼,不要認(rèn)為它已準(zhǔn)備就緒。 另外,我們稍后將對其進(jìn)行簡化:
public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {private final Set<Subscriber<? super T>> subscribers = Collections.newSetFromMap(new ConcurrentHashMap<>());private final Observable<T> observable = Observable.create(subscriber -> {subscriber.add(new Subscription() {@Overridepublic void unsubscribe() {subscribers.remove(subscriber);}@Overridepublic boolean isUnsubscribed() {return false;}});subscribers.add(subscriber);});public Observable<T> observe() {return observable;}@Overridepublic boolean add(T t) {return offer(t);}@Overridepublic boolean offer(T t) {subscribers.forEach(subscriber -> subscriber.onNext(t));return true;}@Overridepublic T remove() {return noSuchElement();}@Overridepublic T poll() {return null;}@Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}@Overridepublic T peek() {return null;}@Overridepublic void put(T t) throws InterruptedException {offer(t);}@Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}@Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException("Use observe() instead");}@Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}@Overridepublic int remainingCapacity() {return 0;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends T> c) {c.forEach(this::offer);return true;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return true;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<T> iterator() {return Collections.emptyIterator();}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return a;}@Overridepublic int drainTo(Collection<? super T> c) {return 0;}@Overridepublic int drainTo(Collection<? super T> c, int maxElements) {return 0;}@Overridepublic void close() throws IOException {subscribers.forEach(rx.Observer::onCompleted);} }關(guān)于它有兩個有趣的事實:
假設(shè)我正確實現(xiàn)了隊列協(xié)定,生產(chǎn)者可以像使用其他BlockingQueue<T>一樣使用ObservableQueue<T> 。 但是,消費者看起來更輕巧,更聰明:
final ObservableQueue<User> users = new ObservableQueue<>(); final Observable<User> observable = users.observe();users.offer(new User("A")); observable.subscribe(user -> log.info("User logged in: {}", user)); users.offer(new User("B")); users.offer(new User("C"));上面的代碼僅打印"B"和"C" 。 由于ObservableQueue會在沒有人監(jiān)聽的情況下丟棄項目,因此設(shè)計會丟失"A" 。 顯然, Producer類現(xiàn)在使用users隊列。 一切正常,您可以隨時調(diào)用users.observe()并應(yīng)用數(shù)十個Observable運算符之一。 但是有一個警告:默認(rèn)情況下,RxJava不執(zhí)行任何線程處理,因此消耗與產(chǎn)生線程在同一線程中發(fā)生! 我們失去了生產(chǎn)者-消費者模式的最重要特征,即線程去耦。 幸運的是,RxJava中的所有內(nèi)容都是聲明性的,線程調(diào)度也是如此:
users.observe().observeOn(Schedulers.computation()).forEach(user ->log.info("User logged in: {}", user));現(xiàn)在讓我們看一下RxJava的真正功能。 假設(shè)您要計算每秒登錄的用戶數(shù),其中每個登錄都作為事件放入隊列中:
users.observe().map(User::getName).filter(name -> !name.isEmpty()).window(1, TimeUnit.SECONDS).flatMap(Observable::count).doOnCompleted(() -> log.info("System shuts down")).forEach(c -> log.info("Logins in last second: {}", c));性能也是可以接受的,這樣的隊列每秒可以在我的一個訂戶的筆記本電腦上接受約300萬個對象。 將此類視為使用隊列到現(xiàn)代反應(yīng)世界的舊系統(tǒng)的適配器。 可是等等! 使用ObservableQueue<T>很容易,但是使用subscribers同步集的實現(xiàn)似乎太底層了。 幸運的是有Subject<T, T> 。 Subject是Observable “另一面” –您可以將事件推送到Subject但是它仍然實現(xiàn)Observable ,因此您可以輕松地創(chuàng)建任意Observable 。 使用Subject實現(xiàn)之一, ObservableQueue外觀如何:
public class ObservableQueue<T> implements BlockingQueue<T>, Closeable {private final Subject<T, T> subject = PublishSubject.create();public Observable<T> observe() {return subject;}@Overridepublic boolean add(T t) {return offer(t);}@Overridepublic boolean offer(T t) {subject.onNext(t);return true;}@Overridepublic void close() throws IOException {subject.onCompleted();}@Overridepublic T remove() {return noSuchElement();}@Overridepublic T poll() {return null;}@Overridepublic T element() {return noSuchElement();}private T noSuchElement() {throw new NoSuchElementException();}@Overridepublic T peek() {return null;}@Overridepublic void put(T t) throws InterruptedException {offer(t);}@Overridepublic boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {return offer(t);}@Overridepublic T take() throws InterruptedException {throw new UnsupportedOperationException("Use observe() instead");}@Overridepublic T poll(long timeout, TimeUnit unit) throws InterruptedException {return null;}@Overridepublic int remainingCapacity() {return 0;}@Overridepublic boolean remove(Object o) {return false;}@Overridepublic boolean containsAll(Collection<?> c) {return false;}@Overridepublic boolean addAll(Collection<? extends T> c) {c.forEach(this::offer);return true;}@Overridepublic boolean removeAll(Collection<?> c) {return false;}@Overridepublic boolean retainAll(Collection<?> c) {return false;}@Overridepublic void clear() {}@Overridepublic int size() {return 0;}@Overridepublic boolean isEmpty() {return true;}@Overridepublic boolean contains(Object o) {return false;}@Overridepublic Iterator<T> iterator() {return Collections.emptyIterator();}@Overridepublic Object[] toArray() {return new Object[0];}@Overridepublic <T> T[] toArray(T[] a) {return a;}@Overridepublic int drainTo(Collection<? super T> c) {return 0;}@Overridepublic int drainTo(Collection<? super T> c, int maxElements) {return 0;}}上面的實現(xiàn)更加簡潔,我們完全不必?fù)?dān)心線程同步。
翻譯自: https://www.javacodegeeks.com/2015/07/consuming-java-util-concurrent-blockingqueue-as-rx-observable.html
總結(jié)
以上是生活随笔為你收集整理的将java.util.concurrent.BlockingQueue用作rx.Observable的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (linux utf)
- 下一篇: linux的消息队列参数(linux的消