java实现生产者消费者模式
一: 什么是生產(chǎn)者消費(fèi)者模型
? ? ? ?生產(chǎn)者消費(fèi)者模式是通過一個(gè)容器來解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過阻塞隊(duì)列來進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
簡(jiǎn)單來說:
? ? ? ?生產(chǎn)者消費(fèi)者模型就是指,在一個(gè)系統(tǒng)中,存在兩種角色,一個(gè)為生產(chǎn)者,一個(gè)為消費(fèi)者,通過一個(gè)緩沖區(qū)(倉(cāng)庫(kù))進(jìn)行通信,生產(chǎn)者將生產(chǎn)的產(chǎn)品放入倉(cāng)庫(kù),消費(fèi)者從倉(cāng)庫(kù)中取產(chǎn)品。當(dāng)倉(cāng)庫(kù)滿時(shí),生產(chǎn)者阻塞,當(dāng)倉(cāng)庫(kù)空時(shí),消費(fèi)者阻塞。
二: 關(guān)系圖
三: 實(shí)現(xiàn)方式
3.1 wait—notify 方式
3.1.1 舉例1
生產(chǎn)者類
/*** 生產(chǎn)者類* 實(shí)現(xiàn)runnable接口* @author DH**/ public class Producer implements Runnable{private BufferArea ba;//通過傳入?yún)?shù)的方式是使得對(duì)象相同,具有互斥鎖的效果。public Producer(BufferArea ba){this.ba = ba;}@Overridepublic void run() {while(true){setIntervalTime();ba.set();//生產(chǎn)產(chǎn)品}}//設(shè)置時(shí)間間隔public void setIntervalTime(){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}消費(fèi)者類
/*** 消費(fèi)者類* 實(shí)現(xiàn)runnable接口* @author DH**/ public class Consumer implements Runnable{private BufferArea ba;public Consumer(BufferArea ba){this.ba = ba;}@Overridepublic void run() {while(true){setIntervalTime();ba.get();//消費(fèi)產(chǎn)品}}//設(shè)置時(shí)間間隔public void setIntervalTime(){try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}} }倉(cāng)庫(kù)?
/*** 倉(cāng)庫(kù)* 緩沖區(qū)* wait()/notify()* @author DH**/ public class BufferArea {private int currNum = 0;//當(dāng)前倉(cāng)庫(kù)的產(chǎn)品數(shù)量private int maxNum = 10;//倉(cāng)庫(kù)最大產(chǎn)品容量public synchronized void set(){if(currNum<maxNum){currNum++;System.out.println(Thread.currentThread().getName()+" 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:"+currNum);notifyAll();}else{//當(dāng)前產(chǎn)品數(shù)大于倉(cāng)庫(kù)的最大容量try {System.out.println(Thread.currentThread().getName()+" 開始等待!當(dāng)前倉(cāng)庫(kù)已滿,產(chǎn)品數(shù)為:"+currNum);wait();} catch (InterruptedException e) {e.printStackTrace();}}}public synchronized void get(){if(currNum>0){//倉(cāng)庫(kù)中有產(chǎn)品currNum--;System.out.println(Thread.currentThread().getName()+" 獲得了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:"+currNum);notifyAll();}else{try {System.out.println(Thread.currentThread().getName()+" 開始等待!當(dāng)前倉(cāng)庫(kù)為空,產(chǎn)品數(shù)為:"+currNum);wait();} catch (InterruptedException e) {e.printStackTrace();}}} }測(cè)試類
/*** 測(cè)試類* @author DH**/ public class MainCode {public static void main(String[] args) {//同一個(gè)倉(cāng)庫(kù)BufferArea ba = new BufferArea();//三個(gè)生產(chǎn)者Producer p1 = new Producer(ba);Producer p2 = new Producer(ba);Producer p3 = new Producer(ba);//三個(gè)消費(fèi)者Consumer c1 = new Consumer(ba);Consumer c2 = new Consumer(ba);Consumer c3 = new Consumer(ba);//創(chuàng)建線程,并給線程命名Thread t1 = new Thread(p1,"生產(chǎn)者1");Thread t2 = new Thread(p2,"生產(chǎn)者2");Thread t3 = new Thread(p3,"生產(chǎn)者3");Thread t4 = new Thread(c1,"消費(fèi)者1");Thread t5 = new Thread(c2,"消費(fèi)者2");Thread t6 = new Thread(c3,"消費(fèi)者3");//使線程進(jìn)入就緒狀態(tài)t1.start();t2.start();t3.start();t4.start();t5.start();t6.start();} }通過設(shè)置生產(chǎn)者消費(fèi)者的時(shí)間間隔,可以測(cè)試倉(cāng)庫(kù)滿和空時(shí)的情況,當(dāng)生產(chǎn)者時(shí)間間隔小,表示生產(chǎn)的快,會(huì)出現(xiàn)倉(cāng)庫(kù)滿了的情況。測(cè)試結(jié)果如下。
?生產(chǎn)者2 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:1
生產(chǎn)者1 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:2
生產(chǎn)者3 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:3
生產(chǎn)者3 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:4
生產(chǎn)者1 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:5
生產(chǎn)者2 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:6
生產(chǎn)者1 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:7
生產(chǎn)者3 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:8
生產(chǎn)者2 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:9
生產(chǎn)者3 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:10
生產(chǎn)者2 開始等待!當(dāng)前倉(cāng)庫(kù)已滿,產(chǎn)品數(shù)為:10
生產(chǎn)者1 開始等待!當(dāng)前倉(cāng)庫(kù)已滿,產(chǎn)品數(shù)為:10
消費(fèi)者1 獲得了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:9
消費(fèi)者2 獲得了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:8
當(dāng)消費(fèi)者時(shí)間間隔小,表示消費(fèi)的快,會(huì)出現(xiàn)倉(cāng)庫(kù)為空的情況。測(cè)試結(jié)果如下。
消費(fèi)者2 開始等待!當(dāng)前倉(cāng)庫(kù)為空,產(chǎn)品數(shù)為:0
生產(chǎn)者3 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:1
生產(chǎn)者2 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:2
生產(chǎn)者1 生產(chǎn)了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:3
消費(fèi)者2 獲得了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:2
消費(fèi)者1 獲得了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:1
消費(fèi)者3 獲得了一件產(chǎn)品!當(dāng)前產(chǎn)品數(shù)為:0
消費(fèi)者3 開始等待!當(dāng)前倉(cāng)庫(kù)為空,產(chǎn)品數(shù)為:0
3.1.2 舉例2?
產(chǎn)品類(倉(cāng)庫(kù))
package test.exception.producer_consumer_model;/* 假設(shè)為產(chǎn)品為筆*/public class Production {private String type = "";private String color = "";private long code = 0; // 產(chǎn)品編號(hào)private boolean isProduced = false; // 是否生產(chǎn)完成 初始狀態(tài)為未生產(chǎn)狀態(tài)private boolean isContinueProduce = true; // 是否停產(chǎn)該產(chǎn)品public void setContinueProduce(boolean continueProduce) {isContinueProduce = continueProduce;}public void setCode(long code) {this.code = code;}public Production(){}public boolean isContinueProduce() {return isContinueProduce;}public void setType(String type) {this.type = type;}public void setColor(String color) {this.color = color;}public void setProduced(boolean produced) {isProduced = produced;}public boolean isProduced() {return isProduced;}@Overridepublic String toString() {return color + type + "-" + code;} }生產(chǎn)者
package test.exception.producer_consumer_model;public class Producer implements Runnable {private final Production pen; // 產(chǎn)品public Producer(Production pen) {this.pen = pen;}// 生產(chǎn)public void produce() {long code = 0;while (this.pen.isContinueProduce()) {synchronized (this.pen) {if (this.pen.isProduced()) {try {this.pen.wait(); // 等待消費(fèi)者消費(fèi)} catch (InterruptedException e) {e.printStackTrace();}}// 開始生產(chǎn)this.pen.setType("鉛筆");this.pen.setColor("藍(lán)色");this.pen.setCode(code++);this.pen.setProduced(true);System.out.println(this.pen + " is produced");this.pen.notify();}}System.out.println("finish producing");}@Overridepublic void run() {produce();} }消費(fèi)者
package test.exception.producer_consumer_model;public class Consumer implements Runnable {private final Production pen;public Consumer(Production pen) {this.pen = pen;}// 持續(xù)消費(fèi)public void consumer() {while (this.pen.isContinueProduce()) {synchronized (this.pen) {if (!this.pen.isProduced()) {try {this.pen.wait(); // 等待生產(chǎn)者生產(chǎn)} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(this.pen + " is consumed"); // 使用this.pen.setProduced(false); // 使用完后更新狀態(tài)this.pen.notify();}}// 確保停止生產(chǎn)后,能夠使用最后生產(chǎn)的一支筆if (this.pen.isProduced()) {System.out.println(this.pen + " is consumed");}System.out.println("finish using");}@Overridepublic void run() {consumer();} }測(cè)試類
package test.exception.producer_consumer_model;public class Demo {public static void main(String[] args) throws InterruptedException {Production pen = new Production();Consumer consumer = new Consumer(pen);Producer producer = new Producer(pen);new Thread(producer).start(); // 開啟生產(chǎn)者線程new Thread(consumer).start(); // 開啟消費(fèi)者線程Thread.sleep(10000);pen.setContinueProduce(false); // 10s后停止生產(chǎn)該類型的筆} }運(yùn)行結(jié)果?
3.2 阻塞隊(duì)列方式?
這里因?yàn)槠? 詳細(xì)的實(shí)現(xiàn)方式可以看我的這篇博客;?https://blog.csdn.net/m0_50370837/article/details/124339524
參考文章:?Java的生產(chǎn)者消費(fèi)者模型_Hi--Man的博客-CSDN博客
https://www.jb51.net/article/187908.htm
總結(jié)
以上是生活随笔為你收集整理的java实现生产者消费者模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: androidx使用Toolbar
- 下一篇: 使用 Mailgun 配置 Ghost