技术停滞_检测和测试停滞的流– RxJava常见问题解答
技術(shù)停滯
假設(shè)您有一個流以不可預(yù)測的頻率發(fā)布事件。 有時您可以預(yù)期每秒會有數(shù)十條消息,但是偶爾幾秒鐘都看不到任何事件。 如果您的流是通過Web套接字,SSE或任何其他網(wǎng)絡(luò)協(xié)議傳輸?shù)?#xff0c;則可能會出現(xiàn)問題。 靜默時間過長(停頓)可以解釋為網(wǎng)絡(luò)問題。 因此,我們經(jīng)常不時發(fā)送人造事件( ping ),以確保:
- 客戶還活著
- 讓客戶知道我們還活著
舉一個更具體的例子,假設(shè)我們有一個Flowable<String>流,它會產(chǎn)生一些事件。 如果沒有事件超過一秒鐘,我們應(yīng)該發(fā)送一個占位符"PING"消息。 當(dāng)靜默時間更長時,應(yīng)該每秒發(fā)出一個"PING"消息。 我們?nèi)绾卧赗xJava中實現(xiàn)這樣的要求? 最明顯但不正確的解決方案是將原始流與ping合并:
Flowable<String> events = //... Flowable<String> pings = Flowable.interval(1, SECONDS).map(x -> "PING");Flowable<String> eventsWithPings = events.mergeWith(pings);mergeWith()運算符至關(guān)重要:它接受真正的events ,并將它們與恒定的ping流合并。 當(dāng)然,當(dāng)沒有真實的事件出現(xiàn)時,將出現(xiàn)"PING"消息。 不幸的是,它們與原始流完全無關(guān)。 這意味著即使有很多正常事件,我們也會繼續(xù)發(fā)送ping命令。 而且,當(dāng)靜默開始時,我們不會在一秒鐘后精確發(fā)送"PING" 。 如果您對這種機制感到滿意,則可以在此處停止閱讀。
一種更復(fù)雜的方法需要發(fā)現(xiàn)持續(xù)超過1秒的靜音。 我們可以使用timeout()運算符。 不幸的是,它會產(chǎn)生TimeoutException并從上游退訂-行為過于激進(jìn)。 我們只想收到某種通知。 事實證明,可以使用debounce()運算符。 通常,此操作員會推遲新事件的發(fā)出,以防萬一有新事件出現(xiàn),從而覆蓋了舊事件。 所以,如果我說:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS);這意味著delayed流僅在1秒內(nèi)未跟隨其他事件時才會發(fā)出事件。 如果events流使產(chǎn)生事件的速度足夠快,從技術(shù)上講, delayed可能永遠(yuǎn)不會發(fā)出任何東西。 我們將使用delayed流通過以下方式發(fā)現(xiàn)沉默:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed.map(ev -> "PING"); Flowable<String> eventsWithPings = Flowable.merge(events, pings);請記住, mergeWith()和它的static merge()對應(yīng)項之間沒有區(qū)別。 所以我們到了某個地方。 如果流繁忙,則delayed流將永遠(yuǎn)不會收到任何事件,因此不會發(fā)送"PING"消息。 但是,當(dāng)原始流不發(fā)送任何事件超過1秒時, delayed接收到最后看到的事件,將其忽略并轉(zhuǎn)換為"PING" 。 聰明,但壞了。 此實現(xiàn)僅在發(fā)現(xiàn)停頓后才發(fā)送一個"PING" ,而不是每秒發(fā)送一次定期ping。 很容易修復(fù)! 除了將最后一次看到的事件轉(zhuǎn)換為單個"PING"我們還可以將其轉(zhuǎn)換為周期性ping序列:
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING")); Flowable<String> eventsWithPings = Flowable.merge(events, pings);您能看到缺陷在哪里嗎? 每當(dāng)原始流中出現(xiàn)一點沉默時,我們就會每秒發(fā)出一次ping 。 但是,一旦出現(xiàn)一些真正的事件,我們應(yīng)該停止這樣做。 我們沒有。 上游的每個停頓都會導(dǎo)致新的無限ping流出現(xiàn)在最終的合并流中。 我們必須以某種方式告訴pings流,因為原始流發(fā)出了真正的事件,所以它應(yīng)該停止發(fā)出ping 。 猜猜是什么,有takeUntil()運算符可以做到這一點!
Flowable<String> events = //... Flowable<String> delayed = events.debounce(1, SECONDS); Flowable<String> pings = delayed.flatMap(x -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)); Flowable<String> eventsWithPings = Flowable.merge(events, pings);花一點時間完全掌握上面的代碼片段。 每當(dāng)原始流上超過1秒沒有任何React時, delayed流就會發(fā)出一個事件。 pings流發(fā)射的序列"PING"每秒從發(fā)射每個事件的事件delayed 。 但是,一旦事件出現(xiàn)在events流上,便會終止pings流。 您甚至可以將所有這些定義為單個表達(dá)式:
Flowable<String> events = //... Flowable<String> eventsWithPings = events.mergeWith(events.debounce(1, SECONDS).flatMap(x1 -> Flowable.interval(0, 1, SECONDS).map(e -> "PING").takeUntil(events)));可測性
好的,我們已經(jīng)編寫了所有這些內(nèi)容,但是我們應(yīng)該如何測試事件驅(qū)動代碼的這個三層嵌套的Blob? 我們?nèi)绾未_保ping在正確的時間出現(xiàn)并在靜默結(jié)束時停止? 如何模擬各種與時間相關(guān)的場景? RxJava具有許多殺手級功能,但是測試時間流逝可能是最大的功能。 首先,讓我們的ping代碼更具可測試性和通用性:
<T> Flowable<T> withPings(Flowable<T> events, Scheduler clock, T ping) {return events.mergeWith(events.debounce(1, SECONDS, clock).flatMap(x1 -> Flowable.interval(0, 1, SECONDS, clock).map(e -> ping).takeUntil(events)));}此實用程序方法采用任意的T流并添加ping ,以防該流在較長時間內(nèi)不產(chǎn)生任何事件。 我們在測試中像這樣使用它:
PublishProcessor<String> events = PublishProcessor.create(); TestScheduler clock = new TestScheduler(); Flowable<String> eventsWithPings = withPings(events, clock, "PING");哦,男孩, PublishProcessor , TestScheduler ? PublishProcessor是一個有趣的類,它是一個亞型Flowable (所以我們可以使用它作為一個普通的流)。 另一方面,我們可以使用其onNext()方法強制發(fā)出事件:
events.onNext("A");如果有人收聽events流,他將立即收到"A"事件。 這clock是怎么回事? RxJava中以任何方式處理時間的每個運算符(例如debounce debounce() , interval() , timeout() , window() )都可以采用可選的Scheduler參數(shù)。 它充當(dāng)時間的外部來源。 特殊的TestScheduler是我們完全控制的人為的時間來源。 也就是說,只要我們不顯式調(diào)用advanceTimeBy()時間就保持靜止:
clock.advanceTimeBy(999, MILLISECONDS);999毫秒不是巧合。 Ping在1秒鐘后開始精確顯示,因此在999毫秒后將不可見。 現(xiàn)在是時候揭示完整的測試用例了:
@Test public void shouldAddPings() throws Exception {PublishProcessor<String> events = PublishProcessor.create();final TestScheduler clock = new TestScheduler();final Flowable<String> eventsWithPings = withPings(events, clock, "PING");final TestSubscriber<String> test = eventsWithPings.test();events.onNext("A");test.assertValues("A");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("B");test.assertValues("A", "B");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING");events.onNext("C");test.assertValues("A", "B", "PING", "C");clock.advanceTimeBy(1000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING");events.onNext("D");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D");clock.advanceTimeBy(999, MILLISECONDS);events.onNext("E");test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(999, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E");clock.advanceTimeBy(1, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING");clock.advanceTimeBy(3_000, MILLISECONDS);test.assertValues("A", "B", "PING", "C", "PING", "PING", "D", "E", "PING", "PING", "PING", "PING"); }看起來像一堵墻,但這實際上是我們邏輯的完整測試方案。 它可以確保ping在1000毫秒后精確顯示,當(dāng)沉默很長時重復(fù)一次,而在真正事件出現(xiàn)時則重復(fù)很短。 但最重要的部分是:該測試是100%可預(yù)測的并且非常快。 沒有Awaitility ,忙等待,輪詢,間歇性測試失敗和緩慢。 我們完全控制的人工時鐘可確保所有這些組合流均按預(yù)期工作。
翻譯自: https://www.javacodegeeks.com/2017/09/detecting-testing-stalled-streams-rxjava-faq.html
技術(shù)停滯
總結(jié)
以上是生活随笔為你收集整理的技术停滞_检测和测试停滞的流– RxJava常见问题解答的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何将手机数据备份到电脑如何将手机导入电
- 下一篇: 苹果WWDC22总结:更个性化的iOS1