(转)生产者/消费者问题的多种Java实现方式 (待整理)
背景:生產(chǎn)者消費者的問題真的是繞不開,面試時候很可能讓手寫此代碼,需要深入總結(jié)下。
實質(zhì)上,很多后臺服務(wù)程序并發(fā)控制的基本原理都可以歸納為生產(chǎn)者/消費者模式,而這是恰恰是在本科操作系統(tǒng)課堂上老師反復(fù)講解,而我們卻視而不見不以為然的。在博文《一種面向作業(yè)流(工作流)的輕量級可復(fù)用的異步流水開發(fā)框架的設(shè)計與實現(xiàn)》中將介紹一種生產(chǎn)者/消費者模式的具體應(yīng)用。
生產(chǎn)者消費者問題是研究多線程程序時繞不開的經(jīng)典問題之一,它描述是有一塊緩沖區(qū)作為倉庫,生產(chǎn)者可以將產(chǎn)品放入倉庫,消費者則可以從倉庫中取走產(chǎn)品。
解決生產(chǎn)者/消費者問題的方法可分為兩類:
(1)采用某種機制保護(hù)生產(chǎn)者和消費者之間的同步;
(2)在生產(chǎn)者和消費者之間建立一個管道。
第一種方式有較高的效率,并且易于實現(xiàn),代碼的可控制性較好,屬于常用的模式。第二種管道緩沖區(qū)不易控制,被傳輸數(shù)據(jù)對象不易于封裝等,實用性不強。
因此本文只介紹同步機制實現(xiàn)的生產(chǎn)者/消費者問題。
同步問題核心在于:如何保證同一資源被多個線程并發(fā)訪問時的完整性。常用的同步方法是采用信號或加鎖機制,保證資源在任意時刻至多被一個線程訪問。Java語言在多線程編程上實現(xiàn)了完全對象化,提供了對同步機制的良好支持。
在Java中一共有四種方法支持同步,其中前三個是同步方法,一個是管道方法。
(1)wait() / notify()方法
(2)await() / signal()方法
(3)BlockingQueue阻塞隊列方法
(4)PipedInputStream / PipedOutputStream
本文只介紹最常用的前三種,第四種暫不做討論,有興趣的讀者可以自己去網(wǎng)上找答案。
wait()和notify()方法的實現(xiàn)
wait() / nofity()方法是基類Object的兩個方法,也就意味著所有Java類都會擁有這兩個方法,這樣,我們就可以為任何對象實現(xiàn)同步機制。
調(diào)用 wait() 使得線程等待某個條件滿足,線程在等待時會被掛起,當(dāng)其他線程的運行使得這個條件滿足時,其它線程會調(diào)用 notify() 或者 notifyAll() 來喚醒掛起的線程。
它們都屬于 Object 的一部分,而不屬于 Thread。
只能用在同步方法或者同步控制塊中使用,否則會在運行時拋出 IllegalMonitorStateException。
使用 wait() 掛起期間,線程會釋放鎖。這是因為,如果沒有釋放鎖,那么其它線程就無法進(jìn)入對象的同步方法或者同步控制塊中,那么就無法執(zhí)行 notify() 或者 notifyAll() 來喚醒掛起的線程,造成死鎖。
1 /** 2 * Project Name:basic 3 * File Name:ProducerAndConsumerWaitNotifyAll.java 4 * Package Name:com.forwork.com.basic.thread0411 5 * Date:2019年4月11日上午6:45:33 6 * Copyright (c) 2019, 深圳金融電子結(jié)算中心 All Rights Reserved. 7 * 8 */ 9 10 package com.forwork.com.basic.thread0411; 11 12 /** 13 * ClassName:ProducerAndConsumerWaitNotifyAll <br/> 14 * Function: TODO <br/> 15 * Date: 2019年4月11日 上午6:45:33 <br/> 16 * @author Administrator 17 * @version 1.0 18 * @since JDK 1.7 19 * @see 20 */ 21 public class ProducerAndConsumerWaitNotifyAll { 22 23 private static int count = 0; 24 private static int FULL = 3; //等待條件 25 private static int EMPTY = 0; 26 private static String LOCK = "lock"; 27 28 private static class Producer implements Runnable { 29 public void run() { 30 for (int i = 0; i < 3; i++) { 31 try { 32 Thread.sleep(1000); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 synchronized (LOCK) { 37 if (count == FULL) { 38 System.out.println(Thread.currentThread().getName() + "producelock:" + count); 39 try { 40 LOCK.wait(); 41 } catch (InterruptedException e) { 42 e.printStackTrace(); 43 } 44 } 45 count++; 46 System.out.println(Thread.currentThread().getName() + "produce:" + count); 47 LOCK.notifyAll(); 48 49 } 50 } 51 } 52 } 53 54 private static class Consumer implements Runnable { 55 public void run() { 56 for (int i = 0; i < 3; i++) { 57 try { 58 Thread.sleep(1000); 59 } catch (InterruptedException e) { 60 e.printStackTrace(); 61 } 62 63 synchronized (LOCK) { 64 if (count == EMPTY) { 65 try { 66 System.out.println(Thread.currentThread().getName() + "consumerlock:" + count); 67 LOCK.wait(); 68 } catch (Exception e) { 69 e.printStackTrace(); 70 } 71 }// (count == EMPTY) 72 count--; 73 System.out.println(Thread.currentThread().getName() + "consumer:" + count); 74 LOCK.notifyAll(); 75 } 76 } 77 } 78 } 79 80 public static void main(String[] args) { 81 for (int i = 0; i < 5; i++) { 82 Producer producer = new Producer(); 83 new Thread(producer).start(); 84 } 85 86 for (int i = 0; i < 5; i++) { 87 Consumer consumer = new Consumer(); 88 new Thread(consumer).start(); 89 } 90 } 91 92 }?
?
結(jié)果:
1 Thread-1produce:1 2 Thread-6consumer:0 3 Thread-5consumerlock:0 4 Thread-8consumerlock:0 5 Thread-9consumerlock:0 6 Thread-7consumerlock:0 7 Thread-4produce:1 8 Thread-0produce:2 9 Thread-3produce:3 10 Thread-2producelock:3 11 Thread-7consumer:2 12 Thread-9consumer:1 13 Thread-8consumer:0 14 Thread-5consumer:-1 15 Thread-2produce:0 16 Thread-1produce:1 17 Thread-6consumer:0 18 Thread-0produce:1 19 Thread-3produce:2 20 Thread-4produce:3 21 Thread-9consumer:2 22 Thread-7consumer:1 23 Thread-2produce:2 24 Thread-8consumer:1 25 Thread-5consumer:0 26 Thread-1produce:1 27 Thread-6consumer:0 28 Thread-0produce:1 29 Thread-4produce:2 30 Thread-3produce:3 31 Thread-2producelock:3 32 Thread-9consumer:2 33 Thread-8consumer:1 34 Thread-7consumer:0 35 Thread-5consumerlock:0 36 Thread-2produce:1 37 Thread-5consumer:0 View Code?
生產(chǎn)者在緩沖區(qū)full后wait,等待消費者調(diào)用notifyAll()喚醒后繼續(xù)生產(chǎn);消費者在緩沖區(qū)empty后wait,等待生產(chǎn)者調(diào)用notifyAll()喚醒后繼續(xù)消費。
wait() 和 sleep() 的區(qū)別
- wait() 是 Object 的方法,而 sleep() 是 Thread 的靜態(tài)方法;
- wait() 會釋放鎖,sleep() 不會。
可重入鎖ReentrantLock的實現(xiàn)
java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,通過對lock的lock()方法和unlock()方法實現(xiàn)了對鎖的顯示控制,而synchronize()則是對鎖的隱性控制。
可重入鎖,也叫做遞歸鎖,指的是同一線程 外層函數(shù)獲得鎖之后 ,內(nèi)層遞歸函數(shù)仍然有獲取該鎖的代碼,但不受影響,簡單來說,該鎖維護(hù)這一個與獲取鎖相關(guān)的計數(shù)器,如果擁有鎖的某個線程再次得到鎖,那么獲取計數(shù)器就加1,函數(shù)調(diào)用結(jié)束計數(shù)器就減1,然后鎖需要被釋放兩次才能獲得真正釋放。已經(jīng)獲取鎖的線程進(jìn)入其他需要相同鎖的同步代碼塊不會被阻塞。
java.util.concurrent 類庫中提供了 Condition 類來實現(xiàn)線程之間的協(xié)調(diào),可以在 Condition 上調(diào)用 await() 方法使線程等待,其它線程調(diào)用 signal() 或 signalAll() 方法喚醒等待的線程。
相比于 wait() 這種等待方式,await() 可以指定等待的條件,因此更加靈活。
使用 Lock 來獲取一個 Condition 對象。
?
1 package com.forwork.com.basic.thread0411; 2 3 import java.util.concurrent.locks.Condition; 4 import java.util.concurrent.locks.Lock; 5 import java.util.concurrent.locks.ReentrantLock; 6 7 /** 8 * ClassName:ProduceAndConsumerReenTrantLock <br/> 9 * Function: ReenTrantLock實現(xiàn) 10 * Date: 2019年4月11日 上午7:55:20 <br/> 11 * @author Administrator 12 * @version 1.0 13 * @since JDK 1.7 14 * @see 15 */ 16 public class ProduceAndConsumerReenTrantLock { 17 18 private static int count = 0; 19 private static int FULL = 3; //等待條件 20 private static int EMPTY = 0; 21 private static Lock clock = new ReentrantLock(); 22 private static Condition empty = clock.newCondition(); 23 private static Condition full = clock.newCondition(); 24 25 private static class Producer implements Runnable { 26 public void run() { 27 for (int i = 0; i < 3; i++) { 28 try { 29 Thread.sleep(1000); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 34 clock.lock(); 35 try { 36 if (count == FULL) { 37 System.out.println(Thread.currentThread().getName() + " producelock:" + count); 38 try { 39 full.await(); 40 } catch (InterruptedException e) { 41 e.printStackTrace(); 42 } 43 } 44 45 count++; 46 System.out.println(Thread.currentThread().getName() + " produce:" + count); 47 empty.signalAll(); //喚醒消費者 48 } finally { 49 clock.unlock(); 50 } 51 } 52 } 53 } 54 55 private static class Consumer implements Runnable { 56 public void run() { 57 for (int i = 0; i < 3; i++) { 58 try { 59 Thread.sleep(1000); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 clock.lock(); 64 try { 65 if (count == EMPTY) { 66 try { 67 System.out.println(Thread.currentThread().getName() + " consumerlock:" + count); 68 empty.await(); 69 } catch (Exception e) { 70 e.printStackTrace(); 71 } 72 }// (count == EMPTY) 73 count--; 74 System.out.println(Thread.currentThread().getName() + " consumer:" + count); 75 full.signalAll(); //喚醒生產(chǎn)者 76 } finally { 77 clock.unlock(); 78 } 79 } 80 } 81 } 82 83 public static void main(String[] args) { 84 for (int i = 0; i < 5; i++) { 85 Producer producer = new Producer(); 86 new Thread(producer).start(); 87 } 88 89 for (int i = 0; i < 5; i++) { 90 Consumer consumer = new Consumer(); 91 new Thread(consumer).start(); 92 } 93 } 94 95 }?
結(jié)果:
Thread-1 produce:1 Thread-4 produce:2 Thread-0 produce:3 Thread-2 producelock:3 Thread-6 consumer:2 Thread-5 consumer:1 Thread-7 consumer:0 Thread-3 produce:1 Thread-9 consumer:0 Thread-8 consumerlock:0 Thread-2 produce:1 Thread-8 consumer:0 Thread-0 produce:1 Thread-1 produce:2 Thread-4 produce:3 Thread-5 consumer:2 Thread-6 consumer:1 Thread-9 consumer:0 Thread-7 consumerlock:0 Thread-3 produce:1 Thread-7 consumer:0 Thread-2 produce:1 Thread-8 consumer:0 Thread-4 produce:1 Thread-0 produce:2 Thread-1 produce:3 Thread-6 consumer:2 Thread-5 consumer:1 Thread-3 produce:2 Thread-7 consumer:1 Thread-9 consumer:0 Thread-2 produce:1 Thread-8 consumer:0 View Code?通過clock來newCondition()。
在try finally塊中釋放lock鎖。
Condition 類上通過await()和signal() signalAll()實現(xiàn)線程的協(xié)同
三、BlockingQueue阻塞隊列方法
BlockingQueue是JDK5.0的新增內(nèi)容,它是一個已經(jīng)在內(nèi)部實現(xiàn)了同步的隊列,實現(xiàn)方式采用的是我們第2種await() / signal()方法。它可以在生成對象時指定容量大小。它用于阻塞操作的是put()和take()方法。
put()方法:類似于我們上面的生產(chǎn)者線程,容量達(dá)到最大時,自動阻塞。
take()方法:類似于我們上面的消費者線程,容量為0時,自動阻塞。
public class ProducerConsumer {private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);private static class Producer extends Thread {@Overridepublic void run() {try {queue.put("product");} catch (InterruptedException e) {e.printStackTrace();}System.out.print("produce..");}}private static class Consumer extends Thread {@Overridepublic void run() {try {String product = queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.print("consume..");}} }?
?
public static void main(String[] args) {for (int i = 0; i < 2; i++) {Producer producer = new Producer();producer.start();}for (int i = 0; i < 5; i++) {Consumer consumer = new Consumer();consumer.start();}for (int i = 0; i < 3; i++) {Producer producer = new Producer();producer.start();} }?
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..?
?
BlockingQueue即阻塞隊列,從阻塞這個詞可以看出,在某些情況下對阻塞隊列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:
因此,當(dāng)一個線程對已經(jīng)滿了的阻塞隊列進(jìn)行入隊操作時會阻塞,除非有另外一個線程進(jìn)行了出隊操作,當(dāng)一個線程對一個空的阻塞隊列進(jìn)行出隊操作時也會阻塞,除非有另外一個線程進(jìn)行了入隊操作。
從上可知,阻塞隊列是線程安全的。
下面是BlockingQueue接口的一些方法:
其實阻塞隊列實現(xiàn)阻塞同步的方式很簡單,使用的就是是lock鎖的多條件(condition)阻塞控制。使用BlockingQueue封裝了根據(jù)條件阻塞線程的過程,而我們就不用關(guān)心繁瑣的await/signal操作了。
package com.forwork.com.basic.thread0411;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue;/*** ClassName:ProduceAndConsumerBlockQueue <br/>* Function: TODO <br/>* Date: 2019年4月12日 上午6:50:14 <br/>* @author Administrator* @version 1.0* @since JDK 1.7* @see */ public class ProduceAndConsumerBlockQueue {private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);private static class Producer implements Runnable {public void run() {for (int i = 0; i < 3; i++) {try {Thread.sleep(1000);queue.put(i); //入隊System.out.println(Thread.currentThread().getName() + " produce:" + queue.size());} catch (Exception e) {e.printStackTrace();}}}}private static class Consumer implements Runnable {public void run() {for (int i = 0; i < 3; i++) {try {Thread.sleep(1000);queue.take(); //出隊System.out.println(Thread.currentThread().getName() + " consumer:" + queue.size());} catch (Exception e) {e.printStackTrace();}}}}public static void main(String[] args) {for (int i = 0; i < 3; i++) {Producer producer = new Producer();new Thread(producer).start();}for (int i = 0; i < 3; i++) {Consumer consumer = new Consumer();new Thread(consumer).start();}}}?
1 Thread-0 produce:3 2 Thread-2 produce:0 3 Thread-1 produce:0 4 Thread-4 consumer:0 5 Thread-5 consumer:1 6 Thread-3 consumer:2 7 Thread-0 produce:1 8 Thread-3 consumer:0 9 Thread-5 consumer:0 10 Thread-4 consumer:0 11 Thread-1 produce:1 12 Thread-2 produce:2 13 Thread-0 produce:1 14 Thread-3 consumer:0 15 Thread-1 produce:1 16 Thread-2 produce:2 17 Thread-4 consumer:0 18 Thread-5 consumer:1 View Codeput和take采用阻塞的方式插入和取出元素。
當(dāng)隊列為空或者滿的時候,線程會掛起,直到有元素放入或者取出時候才會繼續(xù)執(zhí)行。
?Java并發(fā)編程-阻塞隊列(BlockingQueue)的實現(xiàn)原理
信號量Semaphore的實現(xiàn)
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源,在操作系統(tǒng)中是一個非常重要的問題,可以用來解決哲學(xué)家就餐問題。Java中的Semaphore維護(hù)了一個許可集,一開始先設(shè)定這個許可集的數(shù)量,可以使用acquire()方法獲得一個許可,當(dāng)許可不足時會被阻塞,release()添加一個許可。在下列代碼中,還加入了另外一個mutex信號量,維護(hù)生產(chǎn)者消費者之間的同步關(guān)系,保證生產(chǎn)者和消費者之間的交替進(jìn)行
?
/*** Project Name:basic* File Name:ProduceAndConsumerSemaphore.java* Package Name:com.forwork.com.basic.thread0411* Date:2019年4月12日上午8:05:07* Copyright (c) 2019, 深圳金融電子結(jié)算中心 All Rights Reserved.* */package com.forwork.com.basic.thread0411;import java.util.concurrent.Semaphore;/*** ClassName:ProduceAndConsumerSemaphore <br/>* Function: TODO <br/>* Date: 2019年4月12日 上午8:05:07 <br/>* @author Administrator* @version 1.0* @since JDK 1.7* @see */ public class ProduceAndConsumerSemaphore {private static Semaphore sp = new Semaphore(3);private static class Producer implements Runnable {public void run() {for (int i = 0; i < 3; i++) {try {Thread.sleep(1000);sp.acquire(); //入隊System.out.println(Thread.currentThread().getName() + " produce:" + sp.availablePermits());} catch (Exception e) {e.printStackTrace();}}}}private static class Consumer implements Runnable {public void run() {for (int i = 0; i < 3; i++) {try {Thread.sleep(1000);sp.release(); //出隊System.out.println(Thread.currentThread().getName() + " consumer:" + sp.availablePermits());} catch (Exception e) {e.printStackTrace();}}}}public static void main(String[] args) {for (int i = 0; i < 3; i++) {Producer producer = new Producer();new Thread(producer).start();}for (int i = 0; i < 3; i++) {Consumer consumer = new Consumer();new Thread(consumer).start();}}}?
結(jié)果:
1 Thread-0 produce:0 2 Thread-2 produce:0 3 Thread-1 produce:0 4 Thread-3 consumer:1 5 Thread-4 consumer:3 6 Thread-5 consumer:3 7 Thread-2 produce:0 8 Thread-1 produce:0 9 Thread-0 produce:0 10 Thread-3 consumer:1 11 Thread-4 consumer:3 12 Thread-5 consumer:3 13 Thread-0 produce:1 14 Thread-2 produce:1 15 Thread-1 produce:0 16 Thread-3 consumer:1 17 Thread-4 consumer:3 18 Thread-5 consumer:3 View Code?
?
/*** Project Name:basic* File Name:SemaphoreTest.java* Package Name:com.forwork.com.basic.thread0411* Date:2019年4月12日上午8:07:48* Copyright (c) 2019, 深圳金融電子結(jié)算中心 All Rights Reserved.* */package com.forwork.com.basic.thread0411;import java.util.concurrent.Semaphore;/*** ClassName:SemaphoreTest <br/>* Function: Semaphore相當(dāng)于一個隊列,隊列中可用的信號量為初始化分配的數(shù)量n。* 每次release就多分配一個,acquire就消耗一個 <br/>* Date: 2019年4月12日 上午8:07:48 <br/>* @author Administrator* @version 1.0* @since JDK 1.7* @see */ public class SemaphoreTest {private static Semaphore sp = new Semaphore(0);public static void main(String[] args) {try {for (int i = 0; i < 3; i++) {System.out.println(sp.availablePermits() + ":one");sp.release();sp.release();System.out.println(sp.availablePermits() + ":two");sp.acquire();System.out.println(sp.availablePermits() + ":three");sp.acquire();System.out.println(sp.availablePermits() + ":four");}} catch (Exception e) {e.printStackTrace();}}}?
結(jié)果:
1 0:one 2 2:two 3 1:three 4 0:four 5 0:one 6 2:two 7 1:three 8 0:four 9 0:one 10 2:two 11 1:three 12 0:four View Code如何取得可用數(shù)量集的個數(shù):sp.availablePermits()
每次release可用數(shù)量集會增加?是的,相當(dāng)于BlockingQueue中的put操作
管道輸入輸出流PipedInputStream和PipedOutputStream實現(xiàn)
ps:了解
Java里的管道輸入流 PipedInputStream與管道輸出流 PipedOutputStream
感覺不是很好用~
在java的io包下,PipedOutputStream和PipedInputStream分別是管道輸出流和管道輸入流。
它們的作用是讓多線程可以通過管道進(jìn)行線程間的通訊。在使用管道通信時,必須將PipedOutputStream和PipedInputStream配套使用。
使用方法:先創(chuàng)建一個管道輸入流和管道輸出流,然后將輸入流和輸出流進(jìn)行連接,用生產(chǎn)者線程往管道輸出流中寫入數(shù)據(jù),消費者在管道輸入流中讀取數(shù)據(jù),這樣就可以實現(xiàn)了不同線程間的相互通訊,但是這種方式在生產(chǎn)者和生產(chǎn)者、消費者和消費者之間不能保證同步,也就是說在一個生產(chǎn)者和一個消費者的情況下是可以生產(chǎn)者和消費者之間交替運行的,多個生成者和多個消費者者之間則不行
?
/*** 使用管道實現(xiàn)生產(chǎn)者消費者模型* @author ZGJ* @date 2017年6月30日*/ public class Test5 {final PipedInputStream pis = new PipedInputStream();final PipedOutputStream pos = new PipedOutputStream();{try {pis.connect(pos);} catch (IOException e) {e.printStackTrace();}}class Producer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = (int) (Math.random() * 255);System.out.println(Thread.currentThread().getName() + "生產(chǎn)者生產(chǎn)了一個數(shù)字,該數(shù)字為: " + num);pos.write(num);pos.flush();} } catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = pis.read();System.out.println("消費者消費了一個數(shù)字,該數(shù)字為:" + num);}} catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {Test5 test5 = new Test5();new Thread(test5.new Producer()).start();new Thread(test5.new Consumer()).start();} }?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/lixuwu/p/5676112.html
總結(jié)
以上是生活随笔為你收集整理的(转)生产者/消费者问题的多种Java实现方式 (待整理)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux命令(常用)
- 下一篇: SCI科技论文写作技巧-核心价值