用Java解决生产者-消费者问题
生活随笔
收集整理的這篇文章主要介紹了
用Java解决生产者-消费者问题
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
當我們嘗試多線程編程時,生產者-消費者問題是最常見的問題之一。 盡管不像多線程編程中的其他一些問題那樣具有挑戰性,但是錯誤地實現此問題可能會造成應用程序混亂。 生產的物品將不使用,開始的物品將被跳過,消耗量取決于生產是在消耗嘗試之前還是之后開始的,等等。此外,您可能會在異常發生后很長時間注意到異常,最重要的是,幾乎所有異常線程程序,這一程序也很難調試和復制。 因此,在這篇文章中,我認為我將嘗試借助Java出色的java.util.concurrent包及其類來解決Java中的此問題。 首先,讓我們看一下生產者-消費者問題的特征:
- 生產者生產物品。
- 消費者消費生產者生產的物品。
- 生產者完成生產,并讓消費者知道他們已經完成了。
- 消耗該項目的步驟獨立產生,而不依賴于其他項目。
- 處理項目的時間大于生產項目的時間。
- 可以有多個生產者。
- 將有多個消費者。
- 一旦完成新物品的生產,生產者將告知消費者,以便消費者在消費并加工完最后一件物品后退出。
在這里,可以由多個類似商品的生產者共享消費者。 類似的項目,我的意思是生產者,其生產“項目”類型的對象。 Item的定義如下:
package com.maximus.consumer;public interface Item {public void process(); }現在我們來看一下Consumer接口的實現:
package com.maximus.consumer;import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue;public class ConsumerImpl implements Consumer {private BlockingQueue< Item > itemQueue = new LinkedBlockingQueue<Item>();private ExecutorService executorService = Executors.newCachedThreadPool();private List<ItemProcessor> jobList = new LinkedList<ItemProcessor>();private volatile boolean shutdownCalled = false;public ConsumerImpl(int poolSize){for(int i = 0; i < poolSize; i++){ItemProcessor jobThread = new ItemProcessor(itemQueue);jobList.add(jobThread);executorService.submit(jobThread);}}public boolean consume(Item j){if(!shutdownCalled){try{itemQueue.put(j);}catch(InterruptedException ie){Thread.currentThread().interrupt();return false;}return true;}else{return false;}}public void finishConsumption(){for(ItemProcessor j : jobList){j.cancelExecution();}executorService.shutdown();} }現在,唯一感興趣的點是消費者內部用于處理傳入商品的ItemProcessor。 ItemProcessor的編碼如下:
package com.maximus.consumer;import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit;public class ItemProcessor implements Runnable {private BlockingQueue<Item> jobQueue;private volatile boolean keepProcessing;public ItemProcessor(BlockingQueue<Item> queue){jobQueue = queue;keepProcessing = true;}public void run(){while(keepProcessing || !jobQueue.isEmpty()){try{Item j = jobQueue.poll(10, TimeUnit.SECONDS);if(j != null){j.process();}}catch(InterruptedException ie){Thread.currentThread().interrupt();return;}}}public void cancelExecution(){this.keepProcessing = false;} } 上面唯一的挑戰是while循環中的條件。 這樣編寫while循環,即使在生產者完成生產并通知消費者生產完成之后,也可以支持項目消耗的繼續。 上面的while循環可確保在線程退出之前完成所有項目的消耗。 上面的使用者是線程安全的,可以共享多個生產者,以便每個生產者可以并發調用consumer.consume(),而不必擔心同步和其他多線程警告。 生產者只需要提交Item接口的實現,其process()方法將包含如何完成消耗的邏輯。 作為閱讀本文的獎勵,我提出了一個測試程序,演示了如何使用上述類: package com.maximus.consumer;import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader;public class Test {public static void main(String[] args) throws Exception{Consumer consumer = new ConsumerImpl(10);BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(args[0]))));String line = "";while((line = br.readLine()) != null){System.out.println("Producer producing: " + line);consumer.consume(new PrintJob(line));}consumer.finishConsumption();} }class PrintJob implements Item {private String line;public PrintJob(String s){line = s;}public void process(){System.out.println(Thread.currentThread().getName() + " consuming :" + line);} } 可以通過多種不同的方式來調整上述消費者,使其更加靈活。 我們可以定義生產完成后消費者將做什么。 可能對其進行了調整,以允許批處理,但我將其留給用戶使用。 隨意使用它,并以任何想要的方式扭曲它。 編碼愉快!參考: The Java HotSpot博客上的JCG合作伙伴 Sarma Swaranga 解決了Java中的生產者-消費者問題 。
翻譯自: https://www.javacodegeeks.com/2012/05/solving-producer-consumer-problem-in.html
總結
以上是生活随笔為你收集整理的用Java解决生产者-消费者问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 动态ADF火车:以编程方式添加火车停靠站
- 下一篇: 超出了GC开销限制– Java堆分析