redis streams_初步了解Redis Streams以及如何在Java中使用它们
redis streams
自今年年初以來,Redis Streams已進入Redis的unstable分支,并且第一個客戶開始采用Redis Streams API。 因此,這是一個絕佳的時間,可以從客戶端的角度看一下Redis Streams提供的功能以及如何使用它們。
免責聲明:Redis Streams作為初稿提供,尚未成為穩定版本的一部分。 API可能會更改。
什么是Redis Stream?
Redis流是一種類似于日志/日志的數據結構,它按順序表示事件日志。 消息(事件)可以附加到流中。 然后可以以獨立方式或通過在消費者組中閱讀來使用這些消息。 使用者組是一個概念,其中可以將多個使用者(例如應用程序實例)分組為一個使用者組,其流偏移量(讀取進度)保留在服務器端。 由于不需要在用戶端保留流偏移,因此此概念簡化了構建客戶端的過程。
流消息由Redis在提交時生成的消息ID和表示為哈希(映射)的正文組成-基本上是一組鍵和值。
流本身由一個密鑰標識,并保存零到許多流消息以及一些元數據,例如消費者組。
Redis Stream API
到目前為止,所有流命令都以X為前綴。 流允許使用添加,讀取,自省和維護命令。 在下一部分中,您將看到最常見的命令:
- XADD key * field1 value1 [field2 value2] [fieldN valueN] :將消息附加(提交)到Redis流。
- XREAD [BLOCK timeout] [COUNT n] STREAMS key1 [keyN] offset1 [offsetN] :從Redis流中讀取消息。
- XRANGE key from to [COUNT n] :掃描( XRANGE key from to [COUNT n] )Redis流中的消息
此外,在使用使用者組時,還有其他命令在起作用:
- XREADGROUP GROUP name consumer [BLOCK timeout] [COUNT n] [NOACK] STREAMS key1 [keyN] offset1 [offsetN] :在使用者及其組的上下文中從Redis流中讀取消息。
- XACK key group messageId1 [messageId2] [messageIdN] :在使用者的上下文中讀取后確認消息。
- XPENDING key group [from to COUNT n] :枚舉未決(未確認的消息)。
- XGROUP和子命令:用于創建和刪除使用者組的API。
注意:為簡潔起見,以上命令被截斷了。 有關所有可能選項和組合的說明,請參見Redis Streams文檔 。
使用Redis流
讓我們來看一下如何通過redis-cli應用我們之前看到的命令來使用Redis Stream。 讓我們向新流添加(并最初創建流)消息。
127.0.0.1:6379> XADD my-stream * key value 1527062149743-0我們正在使用XADD通過鍵值元組向流my-stream添加新消息。 注意* (星號)? 這是用于控制ID生成的字段。 如果要由服務器生成消息ID(在99.5%的用例中都是如此,除非您是要復制的Redis服務器),請始終在此放置* 。 Redis回復消息ID 1527062149743-0 。
現在,我們的信息流包含一條消息。 讓我們用XREAD閱讀它。
127.0.0.1:6379> XREAD COUNT 1 STREAMS my-stream 0 1) 1) "my-stream"2) 1) 1) 1527062149743-02) 1) "key"2) "value"我們現在已經閱讀了該消息,并沿著讀取的內容檢索了正文。 讀取消息會將消息保留在流中。 我們可以使用XRANGE驗證這XRANGE :
127.0.0.1:6379> XRANGE my-stream - + 1) 1) 1527068644230-02) 1) "key"2) "value"發出具有相同流偏移量的后續讀取將返回相同的消息。 您有不同的選擇來避免此行為:
讓我們仔細看看這些選項。
MessageId追蹤
每個讀取操作都將返回消息ID和流消息。 如果您只有一個客戶端(沒有并發讀取),則可以在應用程序中保留最新消息ID的引用,并在后續的讀取調用中重用該消息ID。 讓我們針對我們之前看到的1527068644230-0的消息ID進行此1527068644230-0 :
127.0.0.1:6379> XADD my-stream * key value 1527069672240-0 127.0.0.1:6379> XREAD COUNT 1 STREAMS my-stream 1527068644230-0 1) 1) "my-stream"2) 1) 1) 1527069672240-02) 1) "key"2) "value"我們使用1527068644230-0作為流偏移并接收下一條添加的消息。 這種方法允許恢復讀取較舊的(可能已經消耗的消息),但是需要在客戶端進行一些協調,以免讀取重復的消息。
如果您不想跟蹤消息ID,而僅對最新消息感興趣,則可以使用阻塞讀取。
阻止讀取
通過XREAD讀取允許以阻塞方式從流讀取。 XREAD與BLPOP和BRPOP操作的行為類似,在BLPOP和BRPOP操作中,您指定超時,并且如果消息可用或讀取超時,則調用將返回。 但是,Stream API允許更多選項。 對于此示例,我們需要兩個單獨的參與方:生產者和消費者。 如果您從頭開始閱讀,您將看到使用單個客戶端執行的示例。 我們首先從消費者開始,否則產生的消息將到達流中而沒有機會通知正在等待的消費者。
消費者
我們正在將XREAD與BLOCK 10000一起使用以等待10000毫秒(10秒)。 請注意,我們使用的符號流偏移量$指向流的開頭。
127.0.0.1:6379> XREAD COUNT 1 BLOCK 10000 STREAMS my-stream $使用者現在被阻止,等待消息到達。
制片人
127.0.0.1:6379> XADD my-stream * key value 1527070630698-0Redis將消息寫入流中。 現在,讓我們切換回消費者。
消費者
消息寫入我們的流之后,消費者收到一條消息并再次被解除阻止。 您可以開始處理該消息,并可能發出其他讀取。
1) 1) "my-stream"2) 1) 1) 1527070630698-02) 1) "key"2) "value" (1.88s) 使用流偏移量$發出另一個讀取將再次等待到達該流的下一條消息。 但是,使用$會給我們提供一段時間,在此期間我們不會消耗其他消息。 為了避免這些漏洞,您應該跟蹤上一次閱讀的消息ID,并將其重新用于下一個XREAD調用。
還要注意ist并發性。 我們已經看到一個單個消費者的例子。 如果增加消費者數量怎么辦?
在這種情況下,例如,如果您有兩個使用方發出阻塞讀取,那么兩個使用方都會收到同一條消息,這又使我們不得不承擔協調讀取的任務,因此流消息不會被多次處理。
從流中刪除消息
可以從流中刪除消息,但是不建議這樣做。 我們還沒有看到XDEL ,但是從名稱上可以明顯看出我們可以從流中刪除消息:
127.0.0.1:6379> XDEL my-stream 1527070789716-0 (integer) 1該消息現在消失了。 不建議刪除,因為操作成本很高:流使用帶有宏節點的基數樹。 刪除是一種安全的操作,但是在與多個使用者一起使用一條消息時,您需要同步訪問權限,因為刪除不會阻止多次讀取消息。
限制流大小
將消息追加到流時,可以指定最大流大小。 發出XADD命令時,使用MAXLEN選項會發生這種情況。
127.0.0.1:6379> XADD my-stream MAXLEN 4 * key value 1527071269045-0消息將添加到流中,并且將盡最大努力將流修剪到大小限制。 這也意味著較舊的消息將被修剪并且不再可讀。
消費群體
解決重復消息處理的最后一種方法是利用使用者組。 消費者群體的想法是跟蹤確認。 確認允許將消息標記為消費者確認。 XACK命令返回是否已確認該消息或先前的使用者是否已確認該消息。
要使用消費者組,我們需要首先創建一個消費者組。 請注意,自撰寫本文時起,必須先存在一個流,然后才能創建消費者組。 這個問題可能將通過https://github.com/antirez/redis/issues/4824解決。
到目前為止,如果您遵循前面的示例,我們可以重用我們的流my-stream 。
我們正在創建一個名為my-group的使用者組,它僅對流my-stream有效。 請注意,最后一個參數是用于跟蹤讀取進度的流偏移量。 我們使用$指向流頭。
127.0.0.1:6379> XGROUP CREATE my-stream my-group $ OK現在,向流中添加一條消息:
127.0.0.1:6379> XADD my-stream * key value 1527072009813-0并通過XREADGROUP發出非阻塞讀取:
127.0.0.1:6379> XREADGROUP GROUP my-group c1 COUNT 1 STREAMS my-stream > 1) 1) "my-stream"2) 1) 1) 1527072009813-02) 1) "key"2) "value" XREADGROUP接受組名和使用者名來跟蹤閱讀進度。 另請注意,流偏移量> 。 此符號流偏移量指向使用者組my-group讀取的最新消息ID。
您可能已經注意到該組中有一個消費者名稱。 消費者群體旨在跟蹤消息傳遞并區分消費者。 如果您還記得上面的阻塞閱讀示例,您已經看到兩個使用者同時收到一條消息。 要更改(或保留)此行為,可以指定使用者名稱:
根據您使用消息的方式,您可能想重新啟動處理或使用多個客戶端使用消息,而無需建立自己的同步機制。 Redis流允許您通過確認消息來做到這一點。 默認情況下, XREADGROUP確認消息,表明該消息已被處理并且可以被逐出。 您可以指定NOACK在閱讀消息時不確認消息。 處理完消息后,確認消息發出XACK 。 根據返回的命令,您可以查看您是確認消息的對象還是其他客戶端已經確認的消息。
現在讓我們在這里暫停,不要再討論恢復和更高級的主題。 Redis網站在https://redis.io/topics/streams-intro提供了有關Redis Streams的完整文檔。
使用Java消耗Redis流
注意:在編寫本文時,唯一支持Redis Streams的Java客戶端是Lettuce預覽版本5.1.0.M1。
Redis Streams帶有新的服務器端API,該API也需要在客戶端采用。 讓我們使用Java客戶端重播以上示例。
首先,我們需要一個客戶端實例來準備連接。 我們將使用同步API。 但是,異步和React式API也支持Redis Stream API。
RedisClient client = RedisClient.create("redis://localhost"); StatefulRedisConnection<String, String> connection = client.connect(); RedisStreamCommands<String, String> streamCommands = connection.sync();Lettuce引入了一個新的命令接口RedisStreamCommands ,該接口聲明Redis Stream API方法及其各種類型(例如StreamOffset , Consumer和命令參數對象)。
我們要做的第一件事是向流中添加新消息:
Map<String, String> body = Collections.singletonMap("key", "value"); String messageId = streamCommands.xadd("my-stream", body);本示例使用UTF-8編碼的字符串表示鍵和值。 主體本身作為Map傳輸,并發出命令XADD my-stream * key value 。
現在,讓我們使用與XREAD COUNT 1 STREAMS my-stream 0相對應的命令從流中讀取一條消息:
List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1), StreamOffset.from("my-stream", "0"));if(messages.size() == 1) { // a message was read} else { // no message was read}所述xread(…)方法接受XReadArgs和StreamOffset并返回的列表StreamMessage<K, V>包含消息ID與主體一起對象。 現在可以處理消息了,隨后的讀取將包括最后一個messageId以讀取新消息:
StreamMessage<String, String> message = …; List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1), StreamOffset.from("my-stream", message.getId()));if(messages.size() == 1) { // a message was read} else { // no message was read}阻塞讀取需要將額外的持續時間傳遞到參數對象中。 添加BLOCK選項會將非阻塞調用(從Redis的角度來看)變成阻塞調用:
List<StreamMessage<String, String>> messages = streamCommands.xread(XReadArgs.Builder.count(1).block(Duration.ofSeconds(10)), StreamOffset.from("my-stream", "0"));在最后一個示例中,讓我們看一下消費者群體。 RedisStreamCommands提供了用于創建使用者的方法-截至撰寫本文時,Redis中尚未實現刪除使用者和使用者組的方法。
streamCommands.xadd("my-stream", Collections.singletonMap("key", "value")); // add a message to create the stream data structurestreamCommands.xgroupCreate("my-stream", "my-group", "$"); // add a group pointing to the stream headList<StreamMessage<String, String>> messages = streamCommands.xreadgroup(Consumer.from("my-group", "c1"),StreamOffset.lastConsumed("my-stream"));使用使用者組my-group和使用者c1從my-stream中讀取消息。 使用者組和使用者名稱是字節安全編碼的,因此在使用ASCII或UTF-8字符串時區分大小寫。
結論
這篇博客文章概述了Redis 5附帶的Redis Streams的初步外觀,以及如何在Lettuce Redis客戶端上使用Stream API。 該API尚未完全實現,因此我們應該期待更改。
翻譯自: https://www.javacodegeeks.com/2018/05/a-first-look-at-redis-streams-and-how-to-use-them-with-java.html
redis streams
總結
以上是生活随笔為你收集整理的redis streams_初步了解Redis Streams以及如何在Java中使用它们的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 淘宝客教学视频怎么下载地址(淘宝客视频教
- 下一篇: 织梦手机版怎么更新(织梦手机版怎么更新游