生产者消费者模型java实现
做題的時候遇到了生產者消費者問題,這個問題可以說是線程學習的經典題目了,就忍不住研究了一波。它描述是有一塊緩沖區(qū)(隊列實現(xiàn))作為倉庫,生產者可以將產品放入倉庫,消費者則可以從倉庫中取走產品。在Java中這個數(shù)組線程阻塞的問題,多個用戶同時發(fā)送多個請求,怎么保證不發(fā)生線程死鎖,是我們要考慮的問題。
生產者消費者模式說明:
1.生產者只在倉庫未滿時進行生產,倉庫滿時生產者進程被阻塞;
2.消費者只在倉庫非空時進行消費,倉庫為空時消費者進程被阻塞;
3.當消費者發(fā)現(xiàn)倉庫為空時會通知生產者生產;
3.當生產者發(fā)現(xiàn)倉庫滿時會通知消費者消費;
實現(xiàn)的關鍵:
我們知道在JAVA環(huán)境中,線程Thread有如下幾個狀態(tài):
1.新建狀態(tài)
2.就緒狀態(tài)
3.運行狀態(tài)
4.阻塞狀態(tài)
5.死亡狀態(tài)
生產者消費者問題就是要控制線程的阻塞狀態(tài),保證生產者和消費者進程在一定條件下,一直穩(wěn)定運行,不出現(xiàn)沒有商品但是消費者還是一直購買,商品滿了但是生產者還是不斷生產導致浪費的情況。
?
我們考慮線程常用的Sychronized、RetrenLock還有阻塞隊列來實現(xiàn)。
(1)Object的wait() / notify()方法?
wait(): wait()方法可以讓線程進入等待狀態(tài),當緩沖區(qū)已滿/空時,生產者/消費者線程停止自己的執(zhí)行,放棄鎖,使自己處于等待狀態(tài),讓其他線程執(zhí)行。
notify():notify隨機選擇一個在該對象上調用wait方法的線程,解除其阻塞狀態(tài)。當生產者/消費者向緩沖區(qū)放入/取出一個產品時,向其他等待的線程發(fā)出可執(zhí)行的通知,同時放棄鎖,使自己處于等待狀態(tài)。
?
?
代碼實現(xiàn):
import java.util.LinkedList; import java.util.Queue; import java.util.Random;/*** 生產者消費者模式:使用Object.wait() / notify()方法實現(xiàn)*/ public class ProducerConsumer {private static final int CAPACITY = 5; //申請一個容量最大的倉庫public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生產者*/public static class Producer extends Thread{private Queue<Integer> queue;//隊列作為倉庫String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){ //while(condition)為自旋鎖,為防止該線程沒有收到notify()調用也從wait()中返回 //(也稱作虛假喚醒),這個線程會重新去檢查condition條件以決定當前是否可以安全 //地繼續(xù)執(zhí)行還是需要重新保持等待,而不是認為線程被喚醒了就可以安全地繼續(xù)執(zhí)行 //了,自旋鎖當終止條件滿足時,才會停止自旋,這里設置了一直執(zhí)行,直到程序手動停 //止。synchronized(queue){//給隊列加鎖,保證線程安全while(queue.size() == maxSize){//當隊列是滿的時候,生產者線程等待,由消費者線程進行操作try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}//隊列不為空的時候,生產者被喚醒進行操作System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//因此如果想在一個滿的隊列中加入一個新項,調用 add() 方法就會拋出一//個 unchecked 異常,而調用 offer() 方法會返回 falsequeue.notifyAll();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}/*** 消費者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){synchronized(queue){while(queue.isEmpty()){try {//隊列為空,說明沒有生產者生產的商品,消費者進行等待System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");queue.wait();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();//如果隊列元素為空,調用remove() 的行為與 Collection 接口的版本相似會拋出異常,這里是模擬消費者取走商品的過程// 但是新的 poll() 方法在用空集合調用時只是返回 null。因此新的方法更適合容易出現(xiàn)異常條件的情況。System.out.println("[" + name + "] Consuming value : " + x);queue.notifyAll();//喚醒所有隊列,消費者和生產者根據(jù)隊列情況進行操作try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}} }?
2. 使用Lock和Condition的await() / signal()方法
Condition接口的await()和signal()是用來做同步的兩種方法,它們的功能基本上和Object的wait()/?nofity()相同,完全可以取代它們,但是它們和新引入的鎖定機制Lock直接掛鉤,具有更大的靈活性。通過在Lock對象上調用newCondition()方法,將條件變量和一個鎖對象進行綁定,進而控制并發(fā)程序訪問競爭資源的安全。
代碼實現(xiàn):
import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生產者消費者模式:使用Lock和Condition實現(xiàn)*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//隊列滿的條件private static final Condition emptyCondition = lock.newCondition();//隊列空的條件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生產者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//獲得鎖lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//這里可以和wait()進行對比,兩種控制線程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++);//喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖,Lock不同于Sychronized,需要手動釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消費者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//獲得鎖lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer");//隊列為空滿足條件,消費者線程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}(3)BlockingQueue阻塞隊列方法?
我們采用一個阻塞隊列來實現(xiàn)。
?
通過隊列可以很便利地實現(xiàn)兩者之間的數(shù)據(jù)共享。假設我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把準備好的數(shù)據(jù)共享給消費者線程,利用隊列的方式來傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問題。但如果生產者和消費者在某個時間段內,萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下,如果生產者產出數(shù)據(jù)的速度大于消費者消費的速度,并且當生產出來的數(shù)據(jù)累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數(shù)據(jù)處理完畢,反之亦然。
我們這里使用LinkedBlockingQueue,它是一個已經在內部實現(xiàn)了同步的隊列,實現(xiàn)方式采用的是我們第2種await()/?signal()方法。它可以在生成對象時指定容量大小。它用于阻塞操作的是put()和take()方法。
- put()方法:類似于我們上面的生產者線程,容量達到最大時,自動阻塞。
- take()方法:類似于我們上面的消費者線程,容量為0時,自動阻塞。
?
?
代碼實現(xiàn):
import java.util.LinkedList;import java.util.Queue;import java.util.Random;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/*** 生產者消費者模式:使用Lock和Condition實現(xiàn)*/public class ProducerConsumer {private static final int CAPACITY = 5;private static final Lock lock = new ReentrantLock();private static final Condition fullCondition = lock.newCondition();//隊列滿的條件private static final Condition emptyCondition = lock.newCondition();//隊列空的條件public static void main(String args[]){Queue<Integer> queue = new LinkedList<Integer>();Thread producer1 = new Producer("P1", queue, CAPACITY);Thread producer2 = new Producer("P2", queue, CAPACITY);Thread consumer1 = new Consumer("C1", queue, CAPACITY);Thread consumer2 = new Consumer("C2", queue, CAPACITY);Thread consumer3 = new Consumer("C3", queue, CAPACITY);producer1.start();producer2.start();consumer1.start();consumer2.start();consumer3.start();}/*** 生產者*/public static class Producer extends Thread{private Queue<Integer> queue;String name;int maxSize;int i = 0;public Producer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){//獲得鎖lock.lock();while(queue.size() == maxSize){try {System.out .println("Queue is full, Producer[" + name + "] thread waiting for " + "consumer to take something from queue.");//這里可以和wait()進行對比,兩種控制線程阻塞的方式fullCondition.await();} catch (InterruptedException ex) {ex.printStackTrace();}}System.out.println("[" + name + "] Producing value : +" + i);queue.offer(i++); //喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖,Lock不同于Sychronized,需要手動釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}/*** 消費者*/public static class Consumer extends Thread{private Queue<Integer> queue;String name;int maxSize;public Consumer(String name, Queue<Integer> queue, int maxSize){super(name);this.name = name;this.queue = queue;this.maxSize = maxSize;}@Overridepublic void run(){while(true){lock.lock();while(queue.isEmpty()){try {System.out.println("Queue is empty, Consumer[" + name + "] thread is waiting for Producer"); //隊列為空滿足條件,消費者線程阻塞emptyCondition.await();} catch (Exception ex) {ex.printStackTrace();}}int x = queue.poll();System.out.println("[" + name + "] Consuming value : " + x);//喚醒其他所有生產者、消費者fullCondition.signalAll();emptyCondition.signalAll();//釋放鎖lock.unlock();try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}}}小結:三種實現(xiàn)形式,其實理念都是相同的,都是控制阻塞狀態(tài),根據(jù)條件去控制線程的運行狀態(tài)和阻塞狀態(tài)。生產者消費者模式 為信息傳輸開辟了一個嶄新的概念,因為它的優(yōu)先級最高,所以即使網絡發(fā)生堵塞時它也會最先通過,最大程度的保證了設備的安全。也有缺點,就是在網絡中的個數(shù)是有限制的。生產者消費者模式在設置時比較簡單,使用方便安全,在將來的自動化行業(yè)必定會大大被人們所認同。
參考資料:
https://blog.csdn.net/u010983881/article/details/78554671#commentBox
總結
以上是生活随笔為你收集整理的生产者消费者模型java实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 5月6日----5月9日二年级课程表
- 下一篇: 前端之CSS