生产者-消费者
生產(chǎn)者消費(fèi)者也是一個非常經(jīng)典的多線程模式,我們在實(shí)際開發(fā)中應(yīng)用非常廣泛的思想理念。在生產(chǎn)者-消費(fèi)模式中:通常由兩類線程,即若干個生產(chǎn)者的線程和若干個消費(fèi)者的線程。生產(chǎn)者線程負(fù)責(zé)提交用戶請求,消費(fèi)者線程則負(fù)責(zé)具體處理生產(chǎn)者提交的任務(wù),在生產(chǎn)者和消費(fèi)者之間通過共享內(nèi)存緩存區(qū)進(jìn)行通信。
MQ:messageQueue消息隊(duì)列,是一個中間件
代碼實(shí)現(xiàn):
Provide:
package com.java.day04_mode_pro_custom;import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;public class Provider implements Runnable{//共享緩存區(qū)private BlockingQueue<Data> queue;//多線程間啟動自動變量,有強(qiáng)制從主內(nèi)存中刷新的功能,即時返回線程的狀態(tài)private volatile boolean isRunning = true;//id生成器private static AtomicInteger count = new AtomicInteger();//隨即對象private static Random r = new Random();public Provider(BlockingQueue<Data> queue){this.queue=queue;}@Overridepublic void run() {while(isRunning){System.out.println("*******開始生產(chǎn)數(shù)據(jù)*********");try {//隨機(jī)休眠0-1000毫秒 表示獲取數(shù)據(jù)(產(chǎn)生數(shù)據(jù)的耗時)Thread.sleep(r.nextInt(1000));//獲取的數(shù)據(jù)進(jìn)行累計(jì)int id = count.incrementAndGet();//比如通過一個getData方法獲取了Data data = new Data(id,"數(shù)據(jù)"+id);System.out.println("當(dāng)前線程:"+Thread.currentThread().getName()+"獲取了數(shù)據(jù),id為:"+id+",進(jìn)行裝載到公共緩沖區(qū)中。。。。");//有可能緩沖區(qū)數(shù)據(jù)滿了的情況,此時會提交失敗if(!this.queue.offer(data,2,TimeUnit.SECONDS)){System.out.println("數(shù)據(jù)裝載到緩沖區(qū)失敗。。。");//do something 比如重新提交。。。。 }System.out.println("*******一次生產(chǎn)完成*********");} catch (InterruptedException e) {e.printStackTrace();}}}public void stop(){this.isRunning=false;}}Consumer:
1 package com.java.day04_mode_pro_custom; 2 3 import java.util.Random; 4 import java.util.concurrent.BlockingQueue; 5 6 public class Consumer implements Runnable{ 7 8 //數(shù)據(jù)緩沖區(qū) 9 private BlockingQueue <Data>queue; 10 11 //隨機(jī)對象 12 private static Random r = new Random(); 13 14 public Consumer(BlockingQueue<Data> queue){ 15 this.queue=queue; 16 } 17 18 @Override 19 public void run() { 20 //線程不停的在處理數(shù)據(jù) 21 while(true){ 22 try { 23 Data data = this.queue.take(); 24 //隨機(jī)休眠時間為0-1000 表示正在處理數(shù)據(jù) 25 Thread.sleep(r.nextInt(1000)); 26 27 System.out.println("當(dāng)前消費(fèi)線程:"+Thread.currentThread().getName()+"消費(fèi)成功,消費(fèi)數(shù)據(jù)為:"+data.getName()); 28 29 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 34 35 } 36 } 37 38 39 40 41 }Main:
1 package com.java.day04_mode_pro_custom; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.ExecutorService; 5 import java.util.concurrent.Executors; 6 import java.util.concurrent.LinkedBlockingQueue; 7 8 public class Main { 9 10 11 public static void main(String[] args) { 12 //內(nèi)存緩沖區(qū) 13 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10); 14 15 //生產(chǎn)者 16 Provider p1 = new Provider(queue); 17 Provider p2 = new Provider(queue); 18 Provider p3 = new Provider(queue); 19 20 //消費(fèi)者 21 Consumer c1 = new Consumer(queue); 22 23 Consumer c2 = new Consumer(queue); 24 Consumer c3 = new Consumer(queue); 25 26 //創(chuàng)建線程池運(yùn)行,這是一個緩存的線程池,可以創(chuàng)建無窮大的線程,沒有任務(wù)的時候不創(chuàng)建線程,空閑線程的存活時間為60s 27 ExecutorService cachePool = Executors.newCachedThreadPool(); 28 //傳的對象要實(shí)現(xiàn)runnable接口 29 cachePool.execute(p1); 30 cachePool.execute(p2); 31 cachePool.execute(p3); 32 cachePool.execute(c1); 33 cachePool.execute(c2); 34 cachePool.execute(c3); 35 36 try { 37 Thread.sleep(3000); 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } 41 42 //不再產(chǎn)生數(shù)據(jù) 43 p1.stop(); 44 p2.stop(); 45 p3.stop(); 46 47 System.out.println("停止生產(chǎn)數(shù)據(jù)。。。"); 48 try { 49 Thread.sleep(2000); 50 } catch (InterruptedException e) { 51 e.printStackTrace(); 52 } 53 54 55 56 57 } 58 59 60 61 62 63 64 65 66 }Data:
1 package com.java.day04_mode_pro_custom; 2 3 public class Data { 4 5 private int id; 6 private String name; 7 8 public Data(int id,String name){ 9 this.id=id; 10 this.name=name; 11 } 12 13 public int getId() { 14 return id; 15 } 16 17 public void setId(int id) { 18 this.id = id; 19 } 20 21 public String getName() { 22 return name; 23 } 24 25 public void setName(String name) { 26 this.name = name; 27 } 28 29 @Override 30 public String toString() { 31 return "Data [id=" + id + ", name=" + name + "]"; 32 } 33 34 35 36 }運(yùn)行結(jié)果:
1 *******開始生產(chǎn)數(shù)據(jù)********* 2 *******開始生產(chǎn)數(shù)據(jù)********* 3 *******開始生產(chǎn)數(shù)據(jù)********* 4 當(dāng)前線程:pool-1-thread-2獲取了數(shù)據(jù),id為:1,進(jìn)行裝載到公共緩沖區(qū)中。。。。 5 *******一次生產(chǎn)完成********* 6 *******開始生產(chǎn)數(shù)據(jù)********* 7 當(dāng)前線程:pool-1-thread-1獲取了數(shù)據(jù),id為:2,進(jìn)行裝載到公共緩沖區(qū)中。。。。 8 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)1 9 *******一次生產(chǎn)完成********* 10 *******開始生產(chǎn)數(shù)據(jù)********* 11 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:3,進(jìn)行裝載到公共緩沖區(qū)中。。。。 12 *******一次生產(chǎn)完成********* 13 *******開始生產(chǎn)數(shù)據(jù)********* 14 當(dāng)前線程:pool-1-thread-2獲取了數(shù)據(jù),id為:4,進(jìn)行裝載到公共緩沖區(qū)中。。。。 15 *******一次生產(chǎn)完成********* 16 *******開始生產(chǎn)數(shù)據(jù)********* 17 當(dāng)前消費(fèi)線程:pool-1-thread-6消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)2 18 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)4 19 當(dāng)前消費(fèi)線程:pool-1-thread-5消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)3 20 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:5,進(jìn)行裝載到公共緩沖區(qū)中。。。。 21 *******一次生產(chǎn)完成********* 22 *******開始生產(chǎn)數(shù)據(jù)********* 23 當(dāng)前線程:pool-1-thread-1獲取了數(shù)據(jù),id為:6,進(jìn)行裝載到公共緩沖區(qū)中。。。。 24 *******一次生產(chǎn)完成********* 25 *******開始生產(chǎn)數(shù)據(jù)********* 26 當(dāng)前線程:pool-1-thread-2獲取了數(shù)據(jù),id為:7,進(jìn)行裝載到公共緩沖區(qū)中。。。。 27 *******一次生產(chǎn)完成********* 28 *******開始生產(chǎn)數(shù)據(jù)********* 29 當(dāng)前消費(fèi)線程:pool-1-thread-6消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)5 30 當(dāng)前消費(fèi)線程:pool-1-thread-5消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)7 31 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)6 32 當(dāng)前線程:pool-1-thread-2獲取了數(shù)據(jù),id為:8,進(jìn)行裝載到公共緩沖區(qū)中。。。。 33 *******一次生產(chǎn)完成********* 34 *******開始生產(chǎn)數(shù)據(jù)********* 35 當(dāng)前消費(fèi)線程:pool-1-thread-6消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)8 36 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:9,進(jìn)行裝載到公共緩沖區(qū)中。。。。 37 *******一次生產(chǎn)完成********* 38 *******開始生產(chǎn)數(shù)據(jù)********* 39 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:10,進(jìn)行裝載到公共緩沖區(qū)中。。。。 40 *******一次生產(chǎn)完成********* 41 *******開始生產(chǎn)數(shù)據(jù)********* 42 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)10 43 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:11,進(jìn)行裝載到公共緩沖區(qū)中。。。。 44 *******一次生產(chǎn)完成********* 45 *******開始生產(chǎn)數(shù)據(jù)********* 46 當(dāng)前線程:pool-1-thread-1獲取了數(shù)據(jù),id為:12,進(jìn)行裝載到公共緩沖區(qū)中。。。。 47 *******一次生產(chǎn)完成********* 48 *******開始生產(chǎn)數(shù)據(jù)********* 49 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)12 50 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:13,進(jìn)行裝載到公共緩沖區(qū)中。。。。 51 *******一次生產(chǎn)完成********* 52 *******開始生產(chǎn)數(shù)據(jù)********* 53 當(dāng)前消費(fèi)線程:pool-1-thread-5消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)9 54 當(dāng)前線程:pool-1-thread-1獲取了數(shù)據(jù),id為:14,進(jìn)行裝載到公共緩沖區(qū)中。。。。 55 *******一次生產(chǎn)完成********* 56 *******開始生產(chǎn)數(shù)據(jù)********* 57 當(dāng)前線程:pool-1-thread-2獲取了數(shù)據(jù),id為:15,進(jìn)行裝載到公共緩沖區(qū)中。。。。 58 *******一次生產(chǎn)完成********* 59 *******開始生產(chǎn)數(shù)據(jù)********* 60 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:16,進(jìn)行裝載到公共緩沖區(qū)中。。。。 61 *******一次生產(chǎn)完成********* 62 *******開始生產(chǎn)數(shù)據(jù)********* 63 當(dāng)前消費(fèi)線程:pool-1-thread-6消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)11 64 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)13 65 停止生產(chǎn)數(shù)據(jù)。。。 66 當(dāng)前消費(fèi)線程:pool-1-thread-6消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)15 67 當(dāng)前消費(fèi)線程:pool-1-thread-5消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)14 68 當(dāng)前線程:pool-1-thread-3獲取了數(shù)據(jù),id為:17,進(jìn)行裝載到公共緩沖區(qū)中。。。。 69 *******一次生產(chǎn)完成********* 70 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)16 71 當(dāng)前線程:pool-1-thread-2獲取了數(shù)據(jù),id為:18,進(jìn)行裝載到公共緩沖區(qū)中。。。。 72 *******一次生產(chǎn)完成********* 73 當(dāng)前線程:pool-1-thread-1獲取了數(shù)據(jù),id為:19,進(jìn)行裝載到公共緩沖區(qū)中。。。。 74 *******一次生產(chǎn)完成********* 75 當(dāng)前消費(fèi)線程:pool-1-thread-6消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)17 76 當(dāng)前消費(fèi)線程:pool-1-thread-4消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)19 77 當(dāng)前消費(fèi)線程:pool-1-thread-5消費(fèi)成功,消費(fèi)數(shù)據(jù)為:數(shù)據(jù)18?
轉(zhuǎn)載于:https://www.cnblogs.com/syousetu/p/6757265.html
總結(jié)
- 上一篇: 自定义弹框(手机端),定时消失
- 下一篇: ES6/ES2015核心内容(上)