生产者/消费者模式(阻塞队列)
生活随笔
收集整理的這篇文章主要介紹了
生产者/消费者模式(阻塞队列)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
?
生產消費者模式? 貌似也是阻塞的問題?花了一些時間終于弄明白這個鳥東東,以前還以為是不復雜的一個東西的,以前一直以為和觀察者模式差不多(其實也是差不多的,呵呵),生產消費者模式應該是可以通過觀察者模式來實現的,對于在什么環境下使用現在想的還不是特別清楚,主要是在實際中還沒使用過這個。?
需要使用到同步,以及線程,屬于多并發行列,和觀察者模式的差異也就在于此吧,所以實現起來也主要在這里的差異。 在實際的軟件開發過程中,經常會碰到如下場景:某個模塊負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。?
單單抽象出生產者和消費者,還夠不上是生產者/消費者模式。該模式還需要有一個緩沖區處于生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據?
◇解耦?
假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對于消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會影響到生產者。而如果兩者都依賴于某個緩沖區,兩者之間不直接依賴,耦合也就相應降低了。?
◇支持并發(concurrency)?
生產者直接調用消費者的某個方法,還有另一個弊端。由于函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者就會白白糟蹋大好時光。?
使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的并發主體(常見并發類型有進程和線程兩種,后面的帖子會講兩種并發類型下的應用)。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。其實當初這個模式,主要就是用來處理并發問題的。?
◇支持忙閑不均?
緩沖區還有另一個好處。如果制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。?
用了兩種方式實現了一下這個模式,主要參考了網上的一些例子才弄明白,這里對隊列的實現有很多種方法,需要和具體的應用相結合吧,隊列緩沖區很簡單,現在已有大量的實現,缺點是在性能上面(內存分配的開銷和同步/互斥的開銷),下面的實現都是這種方式;環形緩沖區(減少了內存分配的開銷),雙緩沖區(減少了同步/互斥的開銷)。?
第一個例子是使用的信號量的東東,沒有執行具體的東西,只是實現了這個例子,要做復雜的業務邏輯的話需要自己在某些方法內去具體實現?
代碼如下:?
消費者:?
public class TestConsumer implements Runnable { TestQueue obj; public TestConsumer(TestQueue tq){ this.obj=tq; } public void run() { try { for(int i=0;i<10;i++){ obj.consumer(); } } catch (Exception e) { e.printStackTrace(); } } }
?
生產者:?
public class TestProduct implements Runnable { TestQueue obj; public TestProduct(TestQueue tq){ this.obj=tq; } public void run() { for(int i=0;i<10;i++){ try { obj.product("test"+i); } catch (Exception e) { e.printStackTrace(); } } } }
?
隊列(使用了信號量,采用synchronized進行同步,采用lock進行同步會出錯,或許是還不知道實現的方法):?
public static Object signal=new Object(); boolean bFull=false; private List thingsList=new ArrayList(); private final ReentrantLock lock = new ReentrantLock(true); BlockingQueue q = new ArrayBlockingQueue(10); /** * 生產 * @param thing * @throws Exception */ public void product(String thing) throws Exception{ synchronized(signal){ if(!bFull){ bFull=true; //產生一些東西,放到 thingsList 共享資源中 System.out.println("product"); System.out.println("倉庫已滿,正等待消費..."); thingsList.add(thing); signal.notify(); //然后通知消費者 } signal.wait(); // 然后自己進入signal待召隊列 } } /** * 消費 * @return * @throws Exception */ public String consumer()throws Exception{ synchronized(signal){ if(!bFull) { signal.wait(); // 進入signal待召隊列,等待生產者的通知 } bFull=false; // 讀取buf 共享資源里面的東西 System.out.println("consume"); System.out.println("倉庫已空,正等待生產..."); signal.notify(); // 然后通知生產者 } String result=""; if(thingsList.size()>0){ result=thingsList.get(thingsList.size()-1).toString(); thingsList.remove(thingsList.size()-1); } return result; }
?
測試代碼:?
public class TestMain { public static void main(String[] args) throws Exception{ TestQueue tq=new TestQueue(); TestProduct tp=new TestProduct(tq); TestConsumer tc=new TestConsumer(tq); Thread t1=new Thread(tp); Thread t2=new Thread(tc); t1.start(); t2.start(); } }
?
運行結果:?
product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產... product 倉庫已滿,正等待消費... consume 倉庫已空,正等待生產...
?
第二種發放使用java.util.concurrent.BlockingQueue類來重寫的隊列那個類,使用這個方法比較簡單,并且性能上也沒有什么問題。?這是jdk里面的例子? * class Producer implements Runnable { * private final BlockingQueue queue; * Producer(BlockingQueue q) { queue = q; } * public void run() { * try { * while(true) { queue.put(produce()); } * } catch (InterruptedException ex) { ... handle ...} * } * Object produce() { ... } * } * * class Consumer implements Runnable { * private final BlockingQueue queue; * Consumer(BlockingQueue q) { queue = q; } * public void run() { * try { * while(true) { consume(queue.take()); } * } catch (InterruptedException ex) { ... handle ...} * } * void consume(Object x) { ... } * } * * class Setup { * void main() { * BlockingQueue q = new SomeQueueImplementation(); * Producer p = new Producer(q); * Consumer c1 = new Consumer(q); * Consumer c2 = new Consumer(q); * new Thread(p).start(); * new Thread(c1).start(); * new Thread(c2).start(); * } * }
?
jdk1.5以上的一個實現,使用了Lock以及條件變量等東西?
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
?
參考1:生產者/消費者模式(阻塞隊列) 參考2:生產者/消費者模式(阻塞隊列)總結
以上是生活随笔為你收集整理的生产者/消费者模式(阻塞队列)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 段子
- 下一篇: 工匠精神,缔造美国净水传奇