Java基础教程:多线程基础(3)——阻塞队列
Java基礎(chǔ)教程:多線程基礎(chǔ)(3)——阻塞隊(duì)列
快速開始
引入問題
生產(chǎn)者消費(fèi)者問題是線程模型中的經(jīng)典問題:生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一存儲(chǔ)空間,生產(chǎn)者向空間里生產(chǎn)數(shù)據(jù),而消費(fèi)者取走數(shù)據(jù)。
模擬情景
這里我們實(shí)現(xiàn)如下的情況的生產(chǎn)-消費(fèi)模型:
生產(chǎn)者不斷交替地生產(chǎn)兩組數(shù)據(jù)“姓名--1-->內(nèi)容--1”,“姓名--2-->內(nèi)容--2”,這里的“姓名--1”和“姓名--2”模擬為數(shù)據(jù)的名稱,“內(nèi)容--1 ”和“內(nèi)容--2 ”模擬為數(shù)據(jù)的內(nèi)容。
由于本程序中牽扯到線程運(yùn)行的不確定性,因此可能會(huì)出現(xiàn)以下問題:
1.假設(shè)生產(chǎn)者線程剛向數(shù)據(jù)存儲(chǔ)空間添加了數(shù)據(jù)的名稱,還沒有加入該信息的內(nèi)容,程序就切換到了消費(fèi)者線程,消費(fèi)者線程把信息的名稱和上一個(gè)信息的內(nèi)容聯(lián)系到了一起;
2.生產(chǎn)者生產(chǎn)了若干條數(shù)據(jù),消費(fèi)者才可以取數(shù)據(jù),或者是,消費(fèi)者取完一次數(shù)據(jù)后,還沒等生產(chǎn)者放入新的數(shù)據(jù),又重復(fù)取出了已取過的數(shù)據(jù)。
通過分析我們可知:
第一個(gè)問題可以通過同步來解決,第二個(gè)問題就需要用到線程通信。生產(chǎn)者線程放入數(shù)據(jù)后,通知消費(fèi)者線程取出數(shù)據(jù),消費(fèi)者線程取出數(shù)據(jù)后,通知生產(chǎn)者線程生產(chǎn)數(shù)據(jù),這里用wait\notigy機(jī)制來實(shí)現(xiàn)。
Java代碼
定義信息類
package thread;public class Info {private String name = "name";private String content = "content";//設(shè)置標(biāo)志位,用來進(jìn)行線程通信private boolean flag =true;/*** 設(shè)置消息,此處用到線程同步* @param name* @param content*/public synchronized void set(String name,String content){while (!flag){try {super.wait();} catch (InterruptedException e) {e.printStackTrace();}}this.name=name; //設(shè)置名稱try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}this.content=content; //設(shè)置內(nèi)容flag =false; //設(shè)置標(biāo)志位,表示現(xiàn)在生產(chǎn)停止,可以取走!}public synchronized void get(){while (flag){try {super.wait();} catch (InterruptedException e) {e.printStackTrace();}}try {Thread.sleep(300);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(name +" --> " + content) ;flag = true ; // 改變標(biāo)志位,表示可以生產(chǎn)super.notify();}}定義生產(chǎn)者
public class Producer implements Runnable {private Info info=null;public Producer(Info info){this.info=info;}@Overridepublic void run() {boolean flag = true ; // 定義標(biāo)記位for(int i=0;i<10;i++){if(flag){this.info.set("姓名--1","內(nèi)容--1") ; // 設(shè)置名稱flag = false ;}else{this.info.set("姓名--2","內(nèi)容--2") ; // 設(shè)置名稱flag = true ;}}} }定義消費(fèi)者
public class Consumer implements Runnable {private Info info = null ;public Consumer(Info info){this.info = info ;}public void run(){for(int i=0;i<10;i++){this.info.get() ;}}public static void main(String[] args) {Info info = new Info(); // 實(shí)例化Info對(duì)象Producer pro = new Producer(info) ; // 生產(chǎn)者Consumer con = new Consumer(info) ; // 消費(fèi)者new Thread(pro).start() ;//啟動(dòng)了生產(chǎn)者線程后,再啟動(dòng)消費(fèi)者線程try{Thread.sleep(500) ;}catch(InterruptedException e){e.printStackTrace() ;}new Thread(con).start() ;} }?使用阻塞隊(duì)列來實(shí)現(xiàn)相同功能
引入BlockingQueue
任何有效的生產(chǎn)者-消費(fèi)者問題解決方案都是通過控制生產(chǎn)者put()方法(生產(chǎn)資源)和消費(fèi)者take()方法(消費(fèi)資源)的調(diào)用來實(shí)現(xiàn)的,一旦你實(shí)現(xiàn)了對(duì)方法的阻塞控制,那么你將解決該問題。Java通過BlockingQueue提供了開箱即用的支持來控制這些方法的調(diào)用(一個(gè)線程創(chuàng)建資源,另一個(gè)消費(fèi)資源)。java.util.concurrent包下的BlockingQueue接口是一個(gè)線程安全的可用于存取對(duì)象的隊(duì)列。
BlockingQueue是一種數(shù)據(jù)結(jié)構(gòu),支持一個(gè)線程往里存資源,另一個(gè)線程從里取資源。這正是解決生產(chǎn)者消費(fèi)者問題所需要的,那么讓我們開始解決該問題吧。
Java代碼
消息類
public class InfoPlus {private String name = "name";private String content = "content";public InfoPlus(String name, String content) {this.name = name;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}@Overridepublic String toString() {return "InfoPlus{" +"name='" + name + '\'' +", content='" + content + '\'' +'}';} }生產(chǎn)者
import java.util.concurrent.BlockingQueue;public class ProducerPlus implements Runnable {private BlockingQueue<InfoPlus> queue;public ProducerPlus(BlockingQueue<InfoPlus> queue) {this.queue = queue;}@Overridepublic void run() {for (int i=0;i<10;i++){try {Thread.sleep(1000);queue.put(new InfoPlus("name"+i,"content"+i));} catch (InterruptedException e) {e.printStackTrace();}}} }消費(fèi)者
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque;public class ConsumerPlus implements Runnable{private BlockingQueue<InfoPlus> queue;public ConsumerPlus(BlockingQueue<InfoPlus> queue) {this.queue = queue;}public void run() {while (true) {try {System.out.println(this.queue.take());} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {BlockingQueue<InfoPlus> blockingQueue = new LinkedBlockingDeque<>();ProducerPlus producerPlus = new ProducerPlus(blockingQueue);ConsumerPlus consumerPlus = new ConsumerPlus(blockingQueue);ConsumerPlus consumerPlus1 = new ConsumerPlus(blockingQueue);new Thread(producerPlus).start();new Thread(consumerPlus).start();new Thread(consumerPlus1).start();} }?
轉(zhuǎn)載于:https://www.cnblogs.com/MrSaver/p/9409838.html
總結(jié)
以上是生活随笔為你收集整理的Java基础教程:多线程基础(3)——阻塞队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: chorme调试Paused in de
- 下一篇: 使用fastjson的parseObje