java并发包消息队列
消息隊列常用于有生產者和消費者兩類角色的多線程同步場景
?
BlockingQueue也是java.util.concurrent下的主要用來控制線程同步的工具。
主要的方法是:put、take一對阻塞存取;add、poll一對非阻塞存取。
???????? 插入:
?????????????????? 1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則拋出異常
??????? 2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.
??????? 3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻塞直到BlockingQueue里面有空間再繼續.
???????? 讀取:
?????? ?4)poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null
??????? 5)take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止
???????? 其他
int remainingCapacity();返回隊列剩余的容量,在隊列插入和獲取的時候,不要瞎搞,數 據可能不準
boolean remove(Object o); 從隊列移除元素,如果存在,即移除一個或者更多,隊列改??? 變了返回true
public boolean contains(Object o); 查看隊列是否存在這個元素,存在返回true
int drainTo(Collection<? super E> c); 傳入的集合中的元素,如果在隊列中存在,那么將???? 隊列中的元素移動到集合中
int drainTo(Collection<? super E> c, int maxElements); 和上面方法的區別在于,制定了移?? 動的數量
案例:
package blockingqueue; ? import java.util.concurrent.BlockingQueue; ? public class Consumer implements Runnable { ??? BlockingQueue<String> queue; ??? ??? public Consumer(BlockingQueue<String> queue) { ????? this.queue = queue; ?? } ??? ?? @Override ?? public void run() { ????? try { ???????? String consumer = Thread.currentThread().getName(); ???????? System.out.println(consumer); ???????? //如果隊列為空,會阻塞當前線程 ???????? String temp = queue.take(); ???????? System.out.println(consumer + "消費者? get a product:" + temp); ????? } catch (Exception e) { ???????? e.printStackTrace(); ????? } ?? } ? } |
package blockingqueue; ? import java.util.concurrent.BlockingQueue; ? public class Producer implements Runnable { ?? BlockingQueue<String> queue;??? ??? public Producer(BlockingQueue<String> queue) {? ??????? this.queue = queue;? ??? }??? ??? @Override? ??? public void run() {? ??????? try {? ??????????? String temp = "A Product, 生產線程:"? ??????????????????? + Thread.currentThread().getName();? ??????????? queue.put(temp);//如果隊列是滿的話,會阻塞當前線程? ??????????? System.out.println("生產者 I have made a product: "? ??????????? ???? + Thread.currentThread().getName()); ??????? } catch (InterruptedException e) {? ??????????? e.printStackTrace();? ??????? }? ??? } } |
package blockingqueue; ? import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; ? public class Test { ? ?? public static void main(String[] args) throws Exception { ????? BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2); ????? // BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); ????? // 不設置的話,LinkedBlockingQueue默認大小為Integer.MAX_VALUE ????? // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2); ????? Consumer consumer = new Consumer(queue); ????? Producer producer = new Producer(queue); ????? for (int i = 0; i < 3; i++) { ???????? new Thread(producer, "Producer" + (i + 1)).start(); ????? } ????? for (int i = 0; i < 5; i++) { ???????? new Thread(consumer, "Consumer" + (i + 1)).start(); ????? } ????? ????? Thread.sleep(5000); ????? ????? new Thread(producer, "Producer" + (5)).start(); ?? } } |
?
BlockingQueue有四個具體的實現類,常用的兩種實現類為:
?
1、ArrayBlockingQueue:一個由數組支持的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。
?
2、LinkedBlockingQueue:大小不定的BlockingQueue,若其構造函數帶一個規定大小的參數,生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的對象是以FIFO(先入先出)順序排序的。
???????? LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
?
LinkedBlockingQueue和ArrayBlockingQueue區別:
?
LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背后所用的數據結構不一樣,導致LinkedBlockingQueue的數據吞吐量要大于ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低于ArrayBlockingQueue.
?
生產者消費者的示例代碼:
見代碼? TestBlockingQueue? TestBlockingQueueConsumer?? TestBlockingQueueProducer
package blockingqueue; ? import java.util.Random; import java.util.concurrent.BlockingQueue; ? public class TestBlockingQueueProducer implements Runnable { ?? BlockingQueue<String> queue; ?? Random random = new Random(); ? ?? public TestBlockingQueueProducer(BlockingQueue<String> queue) { ????? this.queue = queue; ?? } ? ?? @Override ?? public void run() { ? ????? for (int i = 0; i < 10; i++) { ???????? try { ??????????? Thread.sleep(random.nextInt(10)); ??????????? String task = Thread.currentThread().getName() + " made a product " + i; ? ??????????? System.out.println(task); ??????????? queue.put(task);? //阻塞方法 ???????? } catch (InterruptedException e) { ??????????? ? ??????????? e.printStackTrace(); ???????? } ? ????? } ?? } } |
package blockingqueue; ? import java.util.Random; import java.util.concurrent.BlockingQueue; ? public class TestBlockingQueueConsumer implements Runnable { ?? BlockingQueue<String> queue; ??? Random random = new Random(); ??? ??? public TestBlockingQueueConsumer(BlockingQueue<String> queue){? ??????? this.queue = queue;? ??? }??????? ??? @Override? ??? public void run() {? ??????? try {? ??????? ? Thread.sleep(random.nextInt(10)); ??????? ? System.out.println(Thread.currentThread().getName()+ "trying..."); ??????????? String temp = queue.take();//如果隊列為空,會阻塞當前線程? ??????????? int remainingCapacity = queue.remainingCapacity(); ??????????? System.out.println(Thread.currentThread().getName() + " get a job " +temp); ??????????? // System.out.println("隊列中的元素個數: "+ remainingCapacity); ??????? } catch (InterruptedException e) {? ??????????? e.printStackTrace(); ??????? }? ??? } } |
package blockingqueue; ? import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; ? public class TestBlockingQueue { ? ?? public static void main(String[] args) { ????? BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2); ????? // BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); ????? // 不設置的話,LinkedBlockingQueue默認大小為Integer.MAX_VALUE ????? // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2); ????? TestBlockingQueueConsumer consumer = new TestBlockingQueueConsumer(queue); ????? TestBlockingQueueProducer producer = new TestBlockingQueueProducer(queue); ????? for (int i = 0; i < 3; i++) { ???????? new Thread(producer, "Producer" + (i + 1)).start(); ????? } ????? for (int i = 0; i < 5; i++) { ???????? new Thread(consumer, "Consumer" + (i + 1)).start(); ????? } ?? } } |
?
總結
以上是生活随笔為你收集整理的java并发包消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java同步关键词解释、synchron
- 下一篇: Linux下MongoDB的安装,通过配