Java多线程——生产者消费者问题
創建多個線程去執行不同的任務,如果這些任務之間有著某種關系,那么線程之間必須能夠通信來協調完成工作。
生產者消費者問題(英語:Producer-consumer problem)就是典型的多線程同步案例,它也被稱為有限緩沖問題(英語:Bounded-buffer problem)。該問題描述了共享固定大小緩沖區的兩個線程——即所謂的“生產者”和“消費者”——在實際運行時會發生的問題。生產者的主要作用是生成一定量的數據放到緩沖區中,然后重復此過程。與此同時,消費者也在緩沖區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩沖區滿時加入數據,消費者也不會在緩沖區中空時消耗數據
注意:生產者-消費者模式中的內存緩存區的主要功能是數據在多線程間的共享,此外,通過該緩沖區,可以緩解生產者和消費者的性能差;
準備基礎代碼:無通信的生產者消費者
我們來自己編寫一個例子:一個生產者,一個消費者,并且讓他們讓他們使用同一個共享資源,并且我們期望的是生產者生產一條放到共享資源中,消費者就會對應地消費一條。
我們先來模擬一個簡單的共享資源對象:
publicclass ShareResource {private String name;private String gender;/*** 模擬生產者向共享資源對象中存儲數據** @param name* @param gender*/public void push(String name, String gender) {this.name = name;this.gender = gender;}/*** 模擬消費者從共享資源中取出數據*/public void popup() {System.out.println(this.name + "-" + this.gender);} }然后來編寫我們的生產者,使用循環來交替地向共享資源中添加不同的數據:
publicclass Producer implements Runnable {private ShareResource shareResource;public Producer(ShareResource shareResource) {this.shareResource = shareResource;}@Overridepublic void run() {for (int i = 0; i < 50; i++) {if (i % 2 == 0) {shareResource.push("鳳姐", "女");} else {shareResource.push("張三", "男");}}} }接著讓我們的消費者不停地消費生產者產生的數據:
publicclass Consumer implements Runnable {private ShareResource shareResource;public Consumer(ShareResource shareResource) {this.shareResource = shareResource;}@Overridepublic void run() {for (int i = 0; i < 50; i++) {shareResource.popup();}} }然后我們寫一段測試代碼,來看看效果:
public static void main(String[] args) {// 創建生產者和消費者的共享資源對象ShareResource shareResource = new ShareResource();// 啟動生產者線程new Thread(new Producer(shareResource)).start();// 啟動消費者線程new Thread(new Consumer(shareResource)).start(); }我們運行發現出現了詭異的現象,所有的生產者都似乎消費到了同一條數據:
張三-男 張三-男 ....以下全是張三-男....為什么會出現這樣的情況呢?照理說,我的生產者在交替地向共享資源中生產數據,消費者也應該交替消費才對呀…我們大膽猜測一下,會不會是因為消費者是直接循環了 30 次打印共享資源中的數據,而此時生產者還沒有來得及更新共享資源中的數據,消費者就已經連續打印了 30 次了,所以我們讓消費者消費的時候以及生產者生產的時候都小睡個 10 ms 來緩解消費太快 or 生產太快帶來的影響,也讓現象更明顯一些:
/*** 模擬生產者向共享資源對象中存儲數據** @param name* @param gender*/ public void push(String name, String gender) {try {Thread.sleep(10);} catch (InterruptedException ignored) {}this.name = name;this.gender = gender; }/*** 模擬消費者從共享資源中取出數據*/ public void popup() {try {Thread.sleep(10);} catch (InterruptedException ignored) {}System.out.println(this.name + "-" + this.gender); }再次運行代碼,發現了出現了以下的幾種情況:
重復消費:消費者連續地出現兩次相同的消費情況(張三-男/ 張三-男);
性別紊亂:消費者消費到了臟數據(張三-女/ 鳳姐-男);
分析出現問題的原因
重復消費:我們先來看看重復消費的問題,當生產者生產出一條數據的時候,消費者正確地消費了一條,但是當消費者再來共享資源中消費的時候,生產者還沒有準備好新的一條數據,所以消費者就又消費到老數據了,這其中的根本原因是生產者和消費者的速率不一致。
性別紊亂:再來分析第二種情況。不同于上面的情況,消費者在消費第二條數據時,生產者也正在生產新的數據,但是尷尬的是,生產者只生產了一半兒(也就是該執行完 this.name = name),也就是還沒有來得及給 gender 賦值就被消費者給取走消費了… 造成這樣情況的根本原因是沒有保證生產者生產數據的原子性。
解決出現的問題
加鎖解決性別紊亂
我們先來解決性別紊亂,也就是原子性的問題吧,上一篇文章里我們也提到了,對于這樣的原子性操作,解決方法也很簡單:加鎖。稍微改造一下就好了:
/*** 模擬生產者向共享資源對象中存儲數據** @param name* @param gender*/ synchronized public void push(String name, String gender) {this.name = name;try {Thread.sleep(10);} catch (InterruptedException ignored) {}this.gender = gender; }/*** 模擬消費者從共享資源中取出數據*/ synchronized public void popup() {try {Thread.sleep(10);} catch (InterruptedException ignored) {}System.out.println(this.name + "-" + this.gender); }我們在方法前面都加上了 synchronized 關鍵字,來保證每一次讀取和修改都只能是一個線程,這是因為當 synchronized 修飾在普通同步方法上時,它會自動鎖住當前實例對象,也就是說這樣改造之后讀/ 寫操作同時只能進行其一;
我把 push 方法小睡的代碼改在了賦值 name 和 gender 的中間,以強化驗證原子性操作是否成功,因為如果不是原子性的話,就很可能出現賦值 name 還沒賦值給 gender 就被取走的情況,小睡一會兒是為了加強這種情況的出現概率(可以試著把 synchronized 去掉看看效果);
運行代碼后發現,并沒有出現性別紊亂的現象了,但是重復消費仍然存在。
等待喚醒機制解決重復消費
我們期望的是 張三-男 和 鳳姐-女 交替出現,而不是有重復消費的情況,所以我們的生產者和消費者之間需要一點溝通,最容易想到的解決方法是,我們新增加一個標志位,然后在消費者中使用 while 循環判斷,不滿足條件則不消費,條件滿足則退出 while 循環,從而完成消費者的工作。
while (value != desire) {Thread.sleep(10); } doSomething();這樣做的目的就是為了防止「過快的無效嘗試」,這種方法看似能夠實現所需的功能,但是卻存在如下的問題:
1)難以確保及時性。在睡眠時,基本不消耗處理器的資源,但是如果睡得過久,就不能及時發現條件已經變化,也就是及時性難以保證;
2)難以降低開銷。如果降低睡眠的時間,比如休眠 1 毫秒,這樣消費者能夠更加迅速地發現條件變化,但是卻可能消耗更多的處理資源,造成了無端的浪費。
以上兩個問題嗎,看似矛盾難以調和,但是 Java 通過內置的等待/ 通知機制能夠很好地解決這個矛盾并實現所需的功能。
等待/ 通知機制,是指一個線程 A 調用了對象 O 的 wait() 方法進入等待狀態,而另一個線程 B 調用了對象 O 的 notifyAll() 方法,線程 A 收到通知后從對象 O 的 wait() 方法返回,進而執行后續操作。上述兩個線程都是通過對象 O 來完成交互的,而對象上的 wait 和 notify/ notifyAll 的關系就如同開關信號一樣,用來完成等待方和通知方之間的交互工作。
這里有一個比較奇怪的點是,為什么看起來像是線程之間操作的 wait 和 notify/ notifyAll 方法會是 Object 類中的方法,而不是 Thread 類中的方法呢?
簡單來說:因為 synchronized 中的這把鎖可以是任意對象,因為要滿足任意對象都能夠調用,所以屬于 Object 類;
專業點說:因為這些方法在操作同步線程時,都必須要標識它們操作線程的鎖,只有同一個鎖上的被等待線程,可以被同一個鎖上的 notify 喚醒,不可以對不同鎖中的線程進行喚醒。也就是說,等待和喚醒必須是同一個鎖。而鎖可以是任意對象,所以可以被任意對象調用的方法是定義在 Object 類中。
好,簡單介紹完等待/ 通知機制,我們開始改造吧:
publicclass ShareResource {private String name;private String gender;// 新增加一個標志位,表示共享資源是否為空,默認為 trueprivateboolean isEmpty = true;/*** 模擬生產者向共享資源對象中存儲數據** @param name* @param gender*/synchronized public void push(String name, String gender) {try {while (!isEmpty) {// 當前共享資源不為空的時,則等待消費者來消費// 使用同步鎖對象來調用,表示當前線程釋放同步鎖,進入等待池,只能被其他線程所喚醒this.wait();}// 開始生產this.name = name;Thread.sleep(10);this.gender = gender;// 生產結束isEmpty = false;// 生產結束喚醒一個消費者來消費this.notify();} catch (Exception ignored) {}}/*** 模擬消費者從共享資源中取出數據*/synchronized public void popup() {try {while (isEmpty) {// 為空則等著生產者進行生產// 使用同步鎖對象來調用,表示當前線程釋放同步鎖,進入等待池,只能被其他線程所喚醒this.wait();}// 消費開始Thread.sleep(10);System.out.println(this.name + "-" + this.gender);// 消費結束isEmpty = true;// 消費結束喚醒一個生產者去生產this.notify();} catch (InterruptedException ignored) {}} }我們期望生產者生產一條,然后就去通知消費者消費一條,那么在生產和消費之前,都需要考慮當前是否需要生產 or 消費,所以我們新增了一個標志位來判斷,如果不滿足則等待;
被通知后仍然要檢查條件,條件滿足,則執行我們相應的生產 or 消費的邏輯,然后改變條件(這里是 isEmpty),并且通知所有等待在對象上的線程;
注意:上面的代碼中通知使用的 notify() 方法,這是因為例子中寫死了只有一個消費者和生產者,在實際情況中建議還是使用 notifyAll() 方法,這樣多個消費和生產者邏輯也能夠保證(可以自己試一下);
小結
通過初始版本一步步地分析問題和解決問題,我們就差不多寫出了我們經典生產者消費者的經典代碼,但通常消費和生產的邏輯是寫在各自的消費者和生產者代碼里的,這里我為了方便閱讀,把他們都抽離到了共享資源上,我們可以簡單地再來回顧一下這個消費生產和等待通知的整個過程:
以上就是關于生產者生產一條數據,消費者消費一次的過程了,涉及的一些具體細節我們下面來說。
總結
以上是生活随笔為你收集整理的Java多线程——生产者消费者问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 判断某点是否在三角形内
- 下一篇: 拒绝垃圾专业化学:选择正确的专业远比多考