Redis中的发布订阅模式
列表的局限
前面我們說通過隊列的rpush 和lpop 可以實現(xiàn)消息隊列(隊尾進(jìn)隊頭出),但是消費者需要不停地調(diào)用lpop 查看List 中是否有等待處理的消息(比如寫一個while 循環(huán))。為了減少通信的消耗,可以sleep()一段時間再消費,但是會有兩個問題:
1、如果生產(chǎn)者生產(chǎn)消息的速度遠(yuǎn)大于消費者消費消息的速度,List 會占用大量的內(nèi)存。
2、消息的實時性降低。
list 還提供了一個阻塞的命令:blpop,沒有任何元素可以彈出的時候,連接會被阻塞。
blpop queue 5基于list 實現(xiàn)的消息隊列,不支持一對多的消息分發(fā)。
發(fā)布訂閱模式
除了通過list 實現(xiàn)消息隊列之外,Redis 還提供了一組命令實現(xiàn)發(fā)布/訂閱模式。
這種方式,發(fā)送者和接收者沒有直接關(guān)聯(lián)(實現(xiàn)了解耦),接收者也不需要持續(xù)嘗試獲取消息。
?
訂閱頻道
首先,我們有很多的頻道(channel),我們也可以把這個頻道理解成queue。訂閱者可以訂閱一個或者多個頻道。消息的發(fā)布者(生產(chǎn)者)可以給指定的頻道發(fā)布消息。
只要有消息到達(dá)了頻道,所有訂閱了這個頻道的訂閱者都會收到這條消息。
需要注意的注意是,發(fā)出去的消息不會被持久化,因為它已經(jīng)從隊列里面移除了,所以消費者只能收到它開始訂閱這個頻道之后發(fā)布的消息。
下面我們來看一下發(fā)布訂閱命令的使用方法。
訂閱者訂閱頻道:可以一次訂閱多個,比如這個客戶端訂閱了3 個頻道。
subscribe channel-1 channel-2 channel-3發(fā)布者可以向指定頻道發(fā)布消息(并不支持一次向多個頻道發(fā)送消息):
publish channel-1 2673取消訂閱(不能在訂閱狀態(tài)下使用):
unsubscribe channel-1按規(guī)則(Pattern)訂閱頻道
支持?和*占位符。?代表一個字符,*代表0 個或者多個字符。
消費端1,關(guān)注運動信息:
psubscribe *sport消費端2,關(guān)注所有新聞:
psubscribe news*消費端3,關(guān)注天氣新聞:
psubscribe news-weather生產(chǎn)者,發(fā)布3 條信息
publish news-sport yaoming publish news-music jaychou publish news-weather rain public class MyListener extends JedisPubSub {// 取得訂閱的消息后的處理public void onMessage(String channel, String message) {System.out.println(channel + "=" + message);}// 初始化訂閱時候的處理public void onSubscribe(String channel, int subscribedChannels) {// System.out.println(channel + "=" + subscribedChannels);}// 取消訂閱時候的處理public void onUnsubscribe(String channel, int subscribedChannels) {// System.out.println(channel + "=" + subscribedChannels);}// 初始化按表達(dá)式的方式訂閱時候的處理public void onPSubscribe(String pattern, int subscribedChannels) {// System.out.println(pattern + "=" + subscribedChannels);}// 取消按表達(dá)式的方式訂閱時候的處理public void onPUnsubscribe(String pattern, int subscribedChannels) {// System.out.println(pattern + "=" + subscribedChannels);}// 取得按表達(dá)式的方式訂閱的消息后的處理public void onPMessage(String pattern, String channel, String message) {System.out.println(pattern + "=" + channel + "=" + message);} } public class ListenTest {public static void main(String[] args) {Jedis jedis = new Jedis("127.0.0.1", 6379);final MyListener listener = new MyListener();// 使用模式匹配的方式設(shè)置頻道// 會阻塞jedis.psubscribe(listener, new String[]{"qingshan-*"});} } public class PublishTest {public static void main(String[] args) {Jedis jedis = new Jedis("127.0.0.1", 6379);jedis.publish("qingshan-123", "666");jedis.publish("qingshan-abc", "pengyuyan");} }?
總結(jié)
以上是生活随笔為你收集整理的Redis中的发布订阅模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Redis中的zset 存储结构(实现)
- 下一篇: Redis中的Lua 脚本