redis队列
1、lpush+rpop
采用rpop需要不停調用rpop方法查看list中是否有待處理消息。每調用一次都會發起一次連接,造成不必要浪費
? ? ? ? ??
代碼:
producer: ? ? ??
package com.eval.mind.service.redis;import java.util.UUID; import java.util.concurrent.TimeUnit;import redis.clients.jedis.Jedis; public class Producer_Lpush extends Thread{public static final String MESSAGE_KEY = "message:queue";private Jedis jedis;private String producerName;private volatile int count;public Producer_Lpush(String name) {this.producerName = name;init();}private void init() {jedis=new Jedis("192.168.80.4",6379);}public void putMessage(String message) {Long size = jedis.lpush(MESSAGE_KEY, message);System.out.println(producerName + ": 當前未被處理消息條數為:" + size);count++;}public int getCount() {return count;}@Overridepublic void run() {try {while (true) {putMessage(UUID.randomUUID().toString());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException{Producer_Lpush producer = new Producer_Lpush("myProducer");producer.start();for(; ;) {System.out.println("main : 已存儲消息條數:" + producer.getCount());TimeUnit.SECONDS.sleep(10);}} } View Code
? ?? consumer:
? ? ? ? ? ? ? ?
package com.eval.mind.service.redis;import redis.clients.jedis.Jedis;/** rpop從redis隊列中pop處一個String類型元素,String mes=jedis.rpop(key)* 缺點是消費端需要不停的調用rpop方法查看list是否有待處理消息。每調用依次都會發起依次連接,這會造成不必要浪費;為解決此參考Consumer_Brpop.java方法*/ public class Consumer_Rpop extends Thread {private String customerName;private volatile int count;private Jedis jedis;public Consumer_Rpop(String name) {this.customerName = name;init();}private void init() {jedis = new Jedis("192.168.80.4", 6379);}public void processMessage() {String message = jedis.rpop(Producer_Lpush.MESSAGE_KEY); //如果redis隊列中沒有數據,也會一直調用if (message != null) {count++;handle(message);}}public void handle(String message) {System.out.println(customerName + " 正在處理消息,消息內容是: " + message + " 這是第" + count + "條");}@Overridepublic void run() {while (true) {processMessage();}}public static void main(String[] args) {Consumer_Rpop customer = new Consumer_Rpop("yamikaze");customer.start();} } View Code?
2、lpush+brpop
brpop:blocking rpop,采用brpop時,如果redis隊列中不存在數據則調用List<String> messages=jedis.brpop(int i,key1,key2)時會阻塞,不會往下執行,一直等待redis隊列中再次push進數據后繼續執行pop操作
? ? ?* brpop支持多個列表(隊列)
* brpop指令是支持隊列優先級的,比如這個例子中MESSAGE_KEY的優先級大于testKey(依據brpop中順序決定)。* 如果兩個列表中都有元素,會優先返回優先級高的列表中的元素,所以這兒優先返回MESSAGE_KEY* 0表示不限制等待,會一直阻塞在這兒代碼:
producer: ? ? ? ? ? ? ? ? ? ? ? ? ? ??
package com.eval.mind.service.redis;import java.util.UUID; import java.util.concurrent.TimeUnit;import redis.clients.jedis.Jedis; public class Producer_Lpush extends Thread{public static final String MESSAGE_KEY = "message:queue";private Jedis jedis;private String producerName;private volatile int count;public Producer_Lpush(String name) {this.producerName = name;init();}private void init() {jedis=new Jedis("192.168.80.4",6379);}public void putMessage(String message) {Long size = jedis.lpush(MESSAGE_KEY, message);System.out.println(producerName + ": 當前未被處理消息條數為:" + size);count++;}public int getCount() {return count;}@Overridepublic void run() {try {while (true) {putMessage(UUID.randomUUID().toString());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException{Producer_Lpush producer = new Producer_Lpush("myProducer");producer.start();for(; ;) {System.out.println("main : 已存儲消息條數:" + producer.getCount());TimeUnit.SECONDS.sleep(10);}} } View Code?
? ?consumer:
package com.eval.mind.service.redis;import java.util.List;import org.springframework.util.CollectionUtils;import redis.clients.jedis.Jedis;/** brpop:blocking rpop,這個指令只有在有元素時才返回,沒有則會阻塞到超時返回null;* brpop支持多個列表(隊列),支持隊列優先級,比如messagekey優先級大于testkey,如果兩個列表中都有元素,會優先返回messagekey列表中元素* 返回List<String>類型 List<String> messages=jedis.brpop(key);*/ public class Consumer_Brpop extends Thread{private volatile int count;String name;private Jedis jedis;public Consumer_Brpop(String name) {this.name=name;init();}private void init() {jedis = new Jedis("192.168.80.4", 6379);}private void processMessage() {List<String> messages=jedis.brpop(0, "message:queue","message:tmp"); //如果redis隊列中沒有數據,此處會阻塞不會往下執行,直到redis隊列又push進數據if(!CollectionUtils.isEmpty(messages)) {String keyName=messages.get(0);String messageValue=messages.get(1);count++;handle(messageValue);}}public void handle(String message) {System.out.println(name + " 正在處理消息,消息內容是: " + message + " 這是第" + count + "條");}@Overridepublic void run() {while (true) {processMessage();}}public static void main(String[] args) {Consumer_Brpop customer = new Consumer_Brpop("yamikaze");customer.start();} } View Code?
?
3、publish+subscribe
redis除了對消息隊列提供支持外,還提供一組命令用于支持發布/訂閱模式
發布:
publish指令可用于發布一條消息:publish channel message
訂閱:
subscribe指令用于接收一條消息:subscribe channel
? ? 可以看到使用subscribe指令進入訂閱模式后,并沒有接收到publish發送的消息,這是因為只有在消息發送出去前才會收到,也就是說訂閱subscribe啟動要在? ? ? ?publish之前執行
訂閱發布模式和消息隊列模式區別:消息隊列模式是通過key方式實現,取出就刪除了,其他進程取不到。訂閱發布可以支持多客戶端獲取同一個頻道? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (channel)發布的消息
? ? 代碼:
publish: ? ?
package com.eval.mind.service.redis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;public class Producer_Publish {public static final String Channel_key="channel:message";private Jedis jedis;public Producer_Publish() {jedis=new Jedis("192.168.80.4",6379);}public void publishMessage(String message) {if(StringUtils.isNoneBlank(message)) {return;}jedis.publish(Channel_key, message); //publish方法 發布消息 }public static void main(String[] args) {Producer_Publish publisher=new Producer_Publish();publisher.publishMessage("Hello Publish Redis");} } View Code?
subscribe:
package com.eval.mind.service.redis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub;public class Consumer_Subscribe {private Jedis jedis;private static final String EXIT_COMMAND = "exit";public Consumer_Subscribe() {jedis = new Jedis("192.168.80.4", 6379);}public void subscribe(String channel) { // subscribe方法訂閱消息if (StringUtils.isBlank(channel)) {return;}// JedisPubSub類是一個沒有抽象方法的抽象類,里邊方法時一些空實現,可以選擇需要的方法覆蓋,這里使用的是subscribe指令,所以覆蓋了onMessage// 如果使用pubsubscribe指令則覆蓋onPmessage方法 JedisPubSub jps = new JedisPubSub() {@Overridepublic void onMessage(String channel, String message) {while (true) { //相對于redis隊列只能消費一次,此處會對channel_key一直訂閱,if (Producer_Publish.Channel_key.equals(channel)) {System.out.println("接收到消息: channel : " + message);// 接收到exit消息后退出if (EXIT_COMMAND.equals(message)) {System.exit(0);}}}}/*** 訂閱時*/@Overridepublic void onSubscribe(String channel, int subscribedChannels) {if (Producer_Publish.Channel_key.equals(channel)) {System.out.println("訂閱了頻道:" + channel);}}};jedis.subscribe(jps, channel);}public static void main(String[] args) {Consumer_Subscribe client = new Consumer_Subscribe();client.subscribe(Producer_Publish.Channel_key);} } View Code?
?
轉載于:https://www.cnblogs.com/enhance/p/11118714.html
總結
- 上一篇: 导数,微积分,牛顿运动学制作创意地图
- 下一篇: 杭电acm2015偶数求和