使用java.util.concurrent包处理多线程
出處:http://blog.csdn.net/hjl_168562/article/details/8158023
1、使用擁有固定的線程數的線程池執行線程任務
package com.justin.thread.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestThreadPool {
?public static void main(String args[]) throws InterruptedException {
??// only two threads
??ExecutorService exec = Executors.newFixedThreadPool(5);
??for (int index = 0; index < 10; index++) {
???Runnable run = new Runnable() {
????public void run() {
?????long time = (long) (Math.random() * 1000);
?????System.out.println(Thread.currentThread().getName() + ":Sleeping " + time + "ms");
?????try {
??????Thread.sleep(time);
?????} catch (InterruptedException e) {
?????}
????}
???};
???exec.execute(run);
??}
??// must shutdown
??exec.shutdown();
?}
}
2、執行定期任務
public class TestScheduledThread {
?public static void main(String[] args) {
??final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
??final Runnable beeper = new Runnable() {
???int count = 0;
???public void run() {
????System.out.println(new Date() + " beep " + (++count));
???}
??};
??// 1秒鐘后運行,并每隔2秒運行一次
??final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, TimeUnit.SECONDS);
??// 2秒鐘后運行,并每次在上次任務運行完后等待5秒后重新運行
??final ScheduledFuture<?> beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, TimeUnit.SECONDS);
??// 30秒后結束關閉任務,并且關閉Scheduler
??scheduler.schedule(new Runnable() {
???public void run() {
????beeperHandle.cancel(true);
????beeperHandle2.cancel(true);
????scheduler.shutdown();
???}
??}, 30, TimeUnit.SECONDS);
?}
}
3、多線程工程以完成同一件事情,而且在完成過程中,往往會等待其他線程都完成某一階段后再執行,等所有線程都到達某一個階段后再統一執行(比如有幾個旅行團需要途經深圳、廣州、韶關、長沙最后到達武漢。旅行團中有自駕游的,有徒步的,有乘坐旅游大巴的;這些旅行團同時出發,并且每到一個目的地,都要等待其他旅行團到達此地后再同時出發,直到都到達終點站武漢)
package com.justin.thread.concurrent;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCyclicBarrier {
?// 徒步需要的時間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
?private static int[] timeWalk = { 5, 8, 15, 15, 10 };
?// 自駕游
?private static int[] timeSelf = { 1, 3, 4, 4, 5 };
?// 旅游大巴
?private static int[] timeBus = { 2, 4, 6, 6, 7 };
?static String now() {
??SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
??return sdf.format(new Date()) + ": ";
?}
?static class Tour implements Runnable {
??private int[] times;
??private CyclicBarrier barrier;
??private String tourName;
??public Tour(CyclicBarrier barrier, String tourName, int[] times) {
???this.times = times;
???this.tourName = tourName;
???this.barrier = barrier;
??}
??public void run() {
???try {
????Thread.sleep(times[0] * 1000);
????System.out.println(now() + tourName + " Reached Shenzhen");
????barrier.await();
????Thread.sleep(times[1] * 1000);
????System.out.println(now() + tourName + " Reached Guangzhou");
????barrier.await();
????Thread.sleep(times[2] * 1000);
????System.out.println(now() + tourName + " Reached Shaoguan");
????barrier.await();
????Thread.sleep(times[3] * 1000);
????System.out.println(now() + tourName + " Reached Changsha");
????barrier.await();
????Thread.sleep(times[4] * 1000);
????System.out.println(now() + tourName + " Reached Wuhan");
????barrier.await();
???} catch (InterruptedException e) {
???} catch (BrokenBarrierException e) {
???}
??}
?}
?public static void main(String[] args) {
??// 三個旅行團
??CyclicBarrier barrier = new CyclicBarrier(3);
??ExecutorService exec = Executors.newFixedThreadPool(3);
??exec.submit(new Tour(barrier, "WalkTour", timeWalk));
??exec.submit(new Tour(barrier, "SelfTour", timeSelf));
??exec.submit(new Tour(barrier, "BusTour", timeBus));
??exec.shutdown();
?}
}
4、BlockingQueue,該類主要提供了兩個方法put()和take(),前者將一個對象放到隊列中,如果隊列已經滿了,就等待直到有空閑節點;后者從head取一個對象,如果沒有對象,就等待直到有可取的對象
package com.justin.thread.concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestBlockingQueue {
?static long randomTime() {
??return (long) (Math.random() * 1000);
?}
?public static void main(String[] args) {
??// 能容納100個文件
??final BlockingQueue queue = new LinkedBlockingQueue(100);
??// 線程池
??final ExecutorService exec = Executors.newFixedThreadPool(5);
??final File root = new File("D:\\Workspace\\Study");
??// 完成標志
??final File exitFile = new File("");
??// 讀個數
??final AtomicInteger rc = new AtomicInteger();
??// 寫個數
??final AtomicInteger wc = new AtomicInteger();
??// 讀線程
??Runnable read = new Runnable() {
???public void run() {
????scanFile(root);
????scanFile(exitFile);
???}
???public void scanFile(File file) {
????if (file.isDirectory()) {
?????File[] files = file.listFiles(new FileFilter() {
??????public boolean accept(File pathname) {
???????return pathname.isDirectory() || pathname.getPath().endsWith(".java");
??????}
?????});
?????for (File one : files)
??????scanFile(one);
????} else {
?????try {
??????int index = rc.incrementAndGet();
??????System.out.println("Read0: " + index + " " + file.getPath());
??????queue.put(file);
?????} catch (InterruptedException e) {
?????}
????}
???}
??};
??exec.submit(read);
??// 四個寫線程
??for (int index = 0; index < 4; index++) {
???// write thread
???final int NO = index;
???Runnable write = new Runnable() {
????String threadName = "Write" + NO;
????public void run() {
?????while (true) {
??????try {
???????Thread.sleep(randomTime());
???????int index = wc.incrementAndGet();
???????File file = (File) queue.take();
???????// 隊列已經無對象
???????if (file == exitFile) {
????????// 再次添加"標志",以讓其他線程正常退出
????????queue.put(exitFile);
????????break;
???????}
???????System.out.println(threadName + ": " + index + " " + file.getPath());
??????} catch (InterruptedException e) {
??????}
?????}
????}
???};
???exec.submit(write);
??}
??exec.shutdown();
?}
}
5、CountDownLatch,下面的例子簡單的說明了CountDownLatch的使用方法,模擬了100米賽跑,10名選手已經準備就緒,只等裁判一聲令下。當所有人都到達終點時,比賽結束
package com.justin.thread.concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCountDownLatch {
?public static void main(String[] args) throws InterruptedException {
??// 開始的倒數鎖
??final CountDownLatch begin = new CountDownLatch(1);
??// 結束的倒數鎖
??final CountDownLatch end = new CountDownLatch(10);
??// 十名選手
??final ExecutorService exec = Executors.newFixedThreadPool(10);
??for (int index = 0; index < 10; index++) {
???final int NO = index + 1;
???Runnable run = new Runnable() {
????public void run() {
?????try {
??????begin.await();
??????Thread.sleep((long) (Math.random() * 10000));
??????System.out.println("No." + NO + " arrived");
?????} catch (InterruptedException e) {
?????} finally {
??????end.countDown();
?????}
????}
???};
???exec.submit(run);
??}
??System.out.println("Game Start");
??begin.countDown();
??end.await();
??System.out.println("Game Over");
??exec.shutdown();
?}
}
6、Future,比如用網頁瀏覽器瀏覽新聞時,最重要的是要顯示文字內容,至于與新聞相匹配的圖片就沒有那么重要的,所以此時首先保證文字信息先顯示,而圖片信息會后顯示,但又不能不顯示,由于下載圖片是一個耗時的操作,所以必須一開始就得下載
package com.justin.thread.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class TestFutureTask {
?public static void main(String[] args) throws InterruptedException, ExecutionException {
??final ExecutorService exec = Executors.newFixedThreadPool(5);
??Callable call = new Callable() {
???public String call() throws Exception {
????Thread.sleep(1000 * 5);
????return "Other less important but longtime things.";
???}
??};
??Future task = exec.submit(call);
??// 重要的事情
??Thread.sleep(1000 * 3);
??System.out.println("Let’s do important things.");
??// 其他不重要的事情
??String obj = (String) task.get();
??System.out.println(obj);
??// 關閉線程池
??exec.shutdown();
?}
}
7、ExecutorCompletionService, 考慮以下場景:瀏覽網頁時,瀏覽器了5個線程下載網頁中的圖片文件,由于圖片大小、網站訪問速度等諸多因素的影響,完成圖片下載的時間就會有很大的不同。如果先下載完成的圖片就會被先顯示到界面上,反之,后下載的圖片就后顯示。
Java的并發庫的CompletionService可以滿足這種場景要求。該接口有兩個重要方法:submit()和take()。submit用于提交一個runnable或者callable,一般會提交給一個線程池處理;而take就是取出已經執行完畢runnable或者callable實例的Future對象,如果沒有滿足要求的,就等待了。 CompletionService還有一個對應的方法poll,該方法與take類似,只是不會等待,如果沒有滿足要求,就返回null對象
package com.justin.thread.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CompletionService {
?public static void main(String[] args) throws InterruptedException, ExecutionException {
??ExecutorService exec = Executors.newFixedThreadPool(10);
??ExecutorCompletionService serv = new ExecutorCompletionService(exec);
??for (int index = 0; index < 5; index++) {
???final int NO = index;
???Callable downImg = new Callable() {
????public String call() throws Exception {
?????Thread.sleep((long) (Math.random() * 10000));
?????return "Downloaded Image " + NO;
????}
???};
???serv.submit(downImg);
??}
??Thread.sleep(1000 * 2);
??System.out.println("Show web content");
??for (int index = 0; index < 5; index++) {
???Future task = serv.take();
???String img = (String) task.get();
???System.out.println(img);
??}
??System.out.println("End");
??// 關閉線程池
??exec.shutdown();
?}
}
8、Semaphore,下面的Demo中申明了一個只有5個許可的Semaphore,而有20個線程要訪問這個資源,通過acquire()和release()獲取和釋放訪問許可
package com.justin.thread.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class TestSemaphore {
?public static void main(String[] args) {
??// 線程池
??ExecutorService exec = Executors.newCachedThreadPool();
??// 只能5個線程同時訪問
??final Semaphore semp = new Semaphore(5);
??// 模擬20個客戶端訪問
??for (int index = 0; index < 20; index++) {
???final int NO = index;
???Runnable run = new Runnable() {
????public void run() {
?????try {
??????// 獲取許可
??????semp.acquire();
??????System.out.println("Accessing: " + NO);
??????Thread.sleep((long) (Math.random() * 10000));
??????// 訪問完后,釋放
??????semp.release();
?????} catch (InterruptedException e) {
?????}
????}
???};
???exec.execute(run);
??}
??// 退出線程池
??exec.shutdown();
?}
}
總結
以上是生活随笔為你收集整理的使用java.util.concurrent包处理多线程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 二分查找法---java实现
- 下一篇: java并发面试题(一)基础