java.util.concurrent包
本文是我們學院課程中名為Java Concurrency Essentials的一部分 。
在本課程中,您將深入探討并發的魔力。 將向您介紹并發和并發代碼的基礎知識,并學習諸如原子性,同步和線程安全之類的概念。 在這里查看 !
目錄
1.簡介 2. java.util.concurrent1.簡介
下一章介紹java.util.concurrent包。 在該程序包中包含許多有趣的類,這些類提供了實現多線程應用程序所需的必要和有用的功能。 在討論了如何使用Executor接口及其實現之后,本章介紹了原子數據類型和并發數據結構。 最后一部分向信號燈和倒數鎖存器發出信號。
2. java.util.concurrent
閱讀了先前關于并發和多線程的文章之后,您可能會覺得編寫在多線程環境中執行良好的健壯代碼并不總是那么簡單。 有一個諺語可以說明這一點(來源未知):
- 初級程序員認為并發很難。
- 經驗豐富的程序員認為并發很容易。
- 高級程序員認為并發很難。
因此,一個可靠的數據結構和類庫可提供經過良好測試的線程安全性,對于編寫使用并發性程序的任何人都非常有幫助。 幸運的是,JDK為此目的提供了一組現成的數據結構和功能。 所有這些類都位于包java.util.concurrent中。
執行者
java.util.concurrent包定義了一組接口,這些接口的實現執行任務。 其中最簡單的一個是Executor接口:
public interface Executor {void execute(Runnable command); }因此,執行器實現采用給定的Runnable實例并執行它。 該接口不對執行方式進行任何假設,javadoc僅聲明“將來某個時候執行給定命令”。 因此,一個簡單的實現可以是:
public class MyExecutor implements Executor {public void execute(Runnable r) {(new Thread(r)).start();} }除了純接口外,JDK還提供了一個成熟且可擴展的實現,名為ThreadPoolExecutor 。 在后臺, ThreadPoolExecutor維護線程池,并在給定execute()方法的情況下將Runnable實例調度到該池。 傳遞給構造函數的參數控制線程池的行為。 參數最多的構造函數如下:
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit單位,BlockingQueue <Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler處理程序)
讓我們逐步研究不同的參數:
- corePoolSize : ThreadPoolExecutor具有一個corePoolSize屬性,該屬性確定只有在隊列已滿時才啟動新線程,直到啟動新線程為止。
- maximumPoolSize :此屬性確定最大啟動多少線程。 您可以將其設置為Integer.MAX_VALUE ,以使其沒有上限。
- keepAliveTime :當ThreadPoolExecutor創建的ThreadPoolExecutor數超過corePoolSize ,當線程在給定的時間內空閑時,該線程將從池中刪除。
- unit :這只是keepAliveTime的TimeUnit 。
- workQueue :此隊列保存通過execute()方法給定的Runnable實例,直到它們實際啟動為止。
- threadFactory :此接口的實現使您可以控制ThreadPoolExecutor使用的線程的創建。
- handler :當您為workQueue指定固定大小并提供maximumPoolSize時,可能會發生ThreadPoolExecutor由于飽和而無法執行您的Runnable實例的情況。 在這種情況下,將調用提供的處理程序,并讓您控制在這種情況下應該發生的情況。
由于有許多參數需要調整,讓我們檢查一些使用它們的代碼:
public class ThreadPoolExecutorExample implements Runnable {private static AtomicInteger counter = new AtomicInteger();private final int taskId;public int getTaskId() {return taskId;}public ThreadPoolExecutorExample(int taskId) {this.taskId = taskId;}public void run() {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10);ThreadFactory threadFactory = new ThreadFactory() {public Thread newThread(Runnable r) {int currentCount = counter.getAndIncrement();System.out.println("Creating new thread: " + currentCount);return new Thread(r, "mythread" + currentCount);}};RejectedExecutionHandler rejectedHandler = new RejectedExecutionHandler() {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (r instanceof ThreadPoolExecutorExample) {ThreadPoolExecutorExample example = (ThreadPoolExecutorExample) r;System.out.println("Rejecting task with id " + example.getTaskId());}}};ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);for (int i = 0; i < 100; i++) {executor.execute(new ThreadPoolExecutorExample(i));}executor.shutdown();} }我們的run()實現僅睡5秒鐘,但這不是此代碼的主要重點。 ThreadPoolExecutor從5個核心線程開始,并允許池最多擴展到10個線程。 出于演示目的,我們僅將未使用的線程閑置大約1秒鐘。 這里的隊列實現是LinkedBlockingQueue與10個的容量Runnable實例。 我們還實現了一個簡單的ThreadFactory以便跟蹤線程的創建。 對于RejectedExecutionHandler也是如此。
在環路main()方法現在發出100 Runnable實例很短的時間量內該池。 該示例的輸出顯示我們必須創建10個線程(最多)來處理所有未決的Runnables :
Creating new thread: 0 ... Creating new thread: 9 Rejecting task with id 20 ... Rejecting task with id 99但它也顯示所有taskId大于19的任務都轉發到RejectedExecutionHandler 。 這是因為我們的Runnable實現休眠了5秒鐘。 第10個線程已經啟動后,隊列只能持有另外10個Runnable實例。 然后,必須拒絕所有其他實例。
最后, shutdown()方法使ThreadPoolExecutor拒絕所有其他任務,并等待直到已執行的任務已執行。 您可以將調用shutdown()替換為shutdownNow() 。 后者嘗試中斷所有正在運行的線程并關閉線程池,而不等待所有線程完成。 在上面的示例中,您會看到十個InterruptedException異常,因為我們的十個睡眠線程被立即喚醒。
執行器服務
Executor接口非常簡單,它僅強制底層實現實現execute()方法。 ExecutorService進一步擴展了Executor接口,并添加了一系列實用程序方法(例如,添加了完整的任務集合),關閉線程池的方法以及查詢實現以獲取執行結果的能力執行一項任務。 我們已經看到, Runnable接口僅定義一個run()方法作為返回值是無效的。 因此,有必要引入一個名為Callable的新接口,該接口類似于Runnable定義也只有一個方法,但是此方法返回一個值:
V call();但是,JDK如何處理任務返回一個值但提交給線程池以執行的事實呢?
任務的提交者無法提前知道任務何時執行以及執行的持續時間。 讓當前線程等待結果顯然不是解決方案。 在另一個類java.util.concurrent.Future<V>實現了檢查結果是否已經可用的功能,該功能可以阻止或等待一定時間。 此類只有幾種方法可以檢查任務是否已完成,取消任務以及檢索其結果。
最后但并非最不重要的一點是,我們還有另一個接口,該接口通過某些方法擴展了Executor接口和ExecutorService接口,以在給定的時間點計劃任務。 接口的名稱為ScheduledExecutorService ,它基本上提供了一個schedule()方法,該方法使用一個參數來等待任務執行之前需要等待多長時間:
schedule(Callable<V> callable, long delay, TimeUnit unit); schedule(Runnable command, long delay, TimeUnit unit);就像ExecutorService一樣, schedule()方法有兩種變體:一種用于Runnable接口,一種用于使用Callable接口返回值的任務。 ScheduledExecutorService還提供了一種定期執行任務的方法:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);在初始延遲旁邊,我們可以指定任務應運行的時間。
最后一個示例已經展示了如何創建ThreadPoolExecutor 。 ScheduledExecutorService的實現名為ScheduledThreadPoolExecutor ,其處理方式與上面使用的ThreadPoolExecutor非常相似。 但是通常不需要完全控制ExecutorService的所有功能。 試想一下,一個簡單的測試客戶端應該使用一個簡單的ThreadPool調用一些服務器方法。
因此,JDK的創建者創建了一個名為Executors的簡單工廠類(請注意結尾的)。 此類提供了一些靜態方法來創建可使用的ThreadPoolExecutor 。 所有這些使我們能夠實現一個簡單的線程池,該線程池執行一堆計算一些數字的任務(這里的數字運算操作是出于演示目的,由一個簡單的Thread.sleep()代替):
public class ExecutorsExample implements Callable<Integer> {private static Random random = new Random(System.currentTimeMillis());public Integer call() throws Exception {Thread.sleep(1000);return random.nextInt(100);}public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(5);Future<Integer>[] futures = new Future[5];for (int i = 0; i < futures.length; i++) {futures[i] = executorService.submit(new ExecutorsExample());}for (int i = 0; i < futures.length; i++) {Integer retVal = futures[i].get();System.out.println(retVal);}executorService.shutdown();} }ExecutorService的創建是ExecutorService 。 要執行一些任務,我們只需要一個for循環,即可創建ExecutorsExample的一些新實例并將返回的Future存儲在數組中。 將任務提交給服務后,我們只需等待結果。 Future get()方法正在阻塞,即當前線程進入睡眠狀態直到結果可用。 如果任務未在定義的時間段內完成,則此方法的重寫版本采用超時規范,以便等待線程繼續進行。
并發集合
Java集合框架包含每個Java程序員在日常工作中使用的各種數據結構。 此集合由java.util.concurrent包中的數據結構擴展。 這些實現提供了在多線程環境中使用的線程安全集合。
許多Java程序員甚至不知不覺地使用線程安全的數據結構。 “舊”類Hashtable和Vector是此類的示例。 自1.0版以來,它們是JDK的一部分,這些基本數據結構在設計時考慮了線程安全性。 盡管此處的線程安全性僅意味著所有方法都在實例級別上同步。 以下代碼取自Oracle的JDK實現:
public synchronized void clear() {Entry tab[] = table;modCount++;for (int index = tab.length; --index >= 0; )tab[index] = null;count = 0; }這與諸如HashMap或ArrayList (自JDK 1.2起都提供)之類的“較新”集合類(它們本身都不是線程安全的)的關鍵區別。 但是,有一種方便的方法可以檢索此類“較新”的集合類的線程安全實例:
HashMap<Long,String> map = new HashMap<Long, String>(); Map<Long, String> synchronizedMap = Collections.synchronizedMap(map);正如我們在上面的代碼中看到的那樣, Collections類使我們可以在運行時創建以前未同步的collections類的同步版本。
如前所述,將關鍵字sync同步到方法會導致在每個時間點只有一個線程執行所研究對象的方法。 當然,這是使簡單集合類具有線程安全性的最簡單方法。 更高級的技術包括專為并發訪問而設計的特殊算法。 這些算法在java.util.concurrent包的集合類java.util.concurrent實現。
此類的一個示例是ConcurrentHashMap :
ConcurrentHashMap<Long,String> map = new ConcurrentHashMap<Long,String>(); map.put(key, value); String value2 = map.get(key);上面的代碼看起來與普通的HashMap幾乎相同,但是底層實現卻完全不同。 ConcurrentHashMap不是將整個表僅使用一個鎖,而是將整個表細分為許多小分區。 每個分區都有自己的鎖。 因此,假設不同線程在表的不同分區上進行寫入,則它們從不同線程對該映射的寫入操作不會競爭,并且可以使用自己的鎖。
該實現還引入了提交寫操作的想法,以減少讀操作的等待時間。 這將略微更改讀取操作的語義,因為它將返回已完成的最新寫入操作的結果。 這意味著在執行read方法之前和之后,條目的數量可能不一樣,就像使用同步方法時一樣,但是對于并發應用程序,這并不總是很重要。 ConcurrentHashMap的迭代器實現也是如此。
為了更好地了解Hashtable性能,同步的HashMap和ConcurrentHashMap的性能,讓我們實現一個簡單的性能測試。 以下代碼啟動了幾個線程,并允許每個線程從映射中的一個隨機位置檢索一個值,然后在另一個隨機位置更新一個值:
public class MapComparison implements Runnable {private static Map<Integer, String> map;private Random random = new Random(System.currentTimeMillis());public static void main(String[] args) throws InterruptedException {runPerfTest(new Hashtable<Integer, String>());runPerfTest(Collections.synchronizedMap(new HashMap<Integer,String>()));runPerfTest(new ConcurrentHashMap<Integer, String>());runPerfTest(new ConcurrentSkipListMap<Integer, String>());}private static void runPerfTest(Map<Integer, String> map) throws InterruptedException {MapComparison.map = map;fillMap(map);ExecutorService executorService = Executors.newFixedThreadPool(10);long startMillis = System.currentTimeMillis();for (int i = 0; i < 10; i++) {executorService.execute(new MapComparison());}executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);System.out.println(map.getClass().getSimpleName() + " took " + (System.currentTimeMillis() - startMillis) + " ms");}private static void fillMap(Map<Integer, String> map) {for (int i = 0; i < 100; i++) {map.put(i, String.valueOf(i));}}public void run() {for (int i = 0; i < 100000; i++) {int randomInt = random.nextInt(100);map.get(randomInt);randomInt = random.nextInt(100);map.put(randomInt, String.valueOf(randomInt));}} }該程序的輸出如下:
Hashtable took 436 ms SynchronizedMap took 433 ms ConcurrentHashMap took 75 ms ConcurrentSkipListMap took 89 ms正如我們所期望的, Hashtable和同步的HashMap實現遠遠落后于并HashMap實現。 本示例還介紹了HashMap的跳過列表實現,其中一個存儲桶中的鏈接項形成一個跳過列表,這意味著對列表進行了排序,并且列表中鏈接項的級別不同。 最高級別的指針直接指向列表中間的某個項目。 如果該項目已經大于當前項目,則迭代器必須采用下一個較低的鏈接級別,以跳過比最高級別更少的元素。 跳過列表的詳細說明可以在此處找到。 關于跳過列表的有趣之處在于,即使所有項目都存儲在同一存儲桶中,所有讀取訪問也要花費log(n)時間。
原子變量
當多個線程共享一個變量時,我們需要同步對該變量的訪問。 原因是這樣的事實,即使像i ++這樣的簡單指令也不是原子的。 它基本上由以下字節碼指令組成:
iload_1 iinc 1, 1 istore_1在不了解Java字節碼的情況下,人們看到了局部變量1的當前值被壓入操作數堆棧,它以常數1遞增,然后從堆棧中彈出并存儲在局部變量號1中。 。 這意味著我們需要三個原子操作才能將局部變量加1。 在多線程環境中,這還意味著調度程序可以停止在這些指令中的每條指令之間執行當前線程,并啟動一個新線程,然后該新線程又可以在同一變量上工作。
為了應對這種情況,您當然可以同步對此特定變量的訪問:
synchronized(i) {i++; }但這也意味著當前線程必須獲取i的鎖,這需要在JVM中進行一些內部同步和計算。 這種方法也稱為悲觀鎖定,因為我們認為另一個線程當前持有我們想要獲取的鎖定的可能性很高。 另一種稱為樂觀鎖定的方法假定沒有太多線程爭用資源,因此我們只是嘗試更新該值并查看是否起作用。 此方法的一種實現是比較交換(CAS)方法。 此操作在許多現代CPU上實現為原子操作。 它將給定存儲位置的內容與給定值(“期望值”)進行比較,如果當前值等于期望值,則將其更新為新值。 用偽代碼看起來像:
int currentValue = getValueAtMemoryPosition(pos); if(currentValue == expectedValue) {setValueAtMemoryPosition(pos, newValue); }CAS操作將上述代碼實現為一個原子操作。 因此,它可以用來查看某個變量的值是否仍為當前線程持有的值,并在這種情況下將其更新為遞增的值。 由于CAS操作的使用需要硬件支持,因此JDK提供了特殊的類來支持這些操作。 它們都位于包java.util.concurrent.atomic中。
這些類的代表是java.util.concurrent.atomic.AtomicInteger 。 上面討論的CAS操作是通過該方法實現的
boolean compareAndSet(int expect, int update)布爾值返回值指示更新操作是否成功。 基于此功能,可以實現進一步的操作,例如原子增量操作(此處取自Oracle的JDK實現):
public final int getAndIncrement() {for (;;) {int current = get();int next = current + 1;if (compareAndSet(current, next))return current;}}現在我們可以通過不同的線程遞增整數變量,而無需使用悲觀鎖:
public class AtomicIntegerExample implements Runnable {private static final AtomicInteger atomicInteger = new AtomicInteger();public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(5);for (int i = 0; i < 5; i++) {executorService.execute(new AtomicIntegerExample());}executorService.shutdown();}public void run() {for (int i = 0; i < 10; i++) {int newValue = atomicInteger.getAndIncrement();if (newValue == 42) {System.out.println("[" + Thread.currentThread().getName() + "]: " + newValue);}}} }上面的代碼啟動了五個線程,并讓每個線程遞增AtomicInteger變量。 得到答案的幸運線42將其打印到控制臺。 重復執行此示例代碼時,輸??出將僅由一個線程創建。
在AtomicInteger旁邊,JDK還提供了用于對長值,整數和長數組以及引用進行原子操作的類。
信號
信號量用于控制對共享資源的訪問。 與簡單的同步塊相反,信號量具有一個內部計數器,該內部計數器在線程每次獲取鎖時增加,而在線程釋放其之前獲得的鎖時減少。 遞增和遞減操作當然是同步的,因此可以使用信號量來控制同時通過關鍵部分的線程數。 線程的兩個基本操作是:
void acquire(); void release();構造函數采用并發鎖定公平性參數的數量。 fairness參數決定是否在等待線程列表的開頭或結尾設置嘗試獲取鎖的新線程。 將新線程放在線程末尾可確保所有線程在一段時間后將獲得鎖,因此不會出現線程饑餓的情況。
Semaphore(int permits, boolean fair)為了說明所描述的行為,讓我們建立一個具有五個線程的簡單線程池,但通過一個信號量進行控制,在每個時間點運行的信號不超過三個:
public class SemaphoreExample implements Runnable {private static final Semaphore semaphore = new Semaphore(3, true);private static final AtomicInteger counter = new AtomicInteger();private static final long endMillis = System.currentTimeMillis() + 10000;public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(5);for (int i = 0; i < 5; i++) {executorService.execute(new SemaphoreExample());}executorService.shutdown();}public void run() {while(System.currentTimeMillis() < endMillis) {try {semaphore.acquire();} catch (InterruptedException e) {System.out.println("["+Thread.currentThread().getName()+"] Interrupted in acquire().");}int counterValue = counter.incrementAndGet();System.out.println("["+Thread.currentThread().getName()+"] semaphore acquired: "+counterValue);if(counterValue > 3) {throw new IllegalStateException("More than three threads acquired the lock.");}counter.decrementAndGet();semaphore.release();}} }通過將3作為并發許可的數量來構造信號量。 當嘗試獲取鎖時,被阻止的線程可能會遇到必須捕獲的InterruptedException 。 或者,也可以調用實用程序方法acquireUninterruptibly()來繞過try-catch構造。
為確保關鍵部分中的并發線程不超過三個,我們使用AtomicInteger ,每次進程進入該部分時該AtomicInteger都會遞增,而在離開該部分之前會遞減。 當計數器的值大于4時,將引發IllegalStateException 。 最后,我們release()信號量,然后讓另一個等待線程進入臨界區。
CountDownLatch
CountDownLatch類是另一個有助于從JDK進行線程同步的類。 類似于Semaphore類,它提供了一個計數器,但是CountDownLatch的計數器只能減少到零為止。 一旦計數器達到零,所有等待CountDownLatch線程都可以繼續。 當池中的所有線程必須在某個點進行同步才能繼續進行時,通常需要這種功能。 一個簡單的示例是一個應用程序,該應用程序必須先從不同來源收集數據,然后才能將新數據集存儲到數據庫中。
以下代碼演示了五個線程如何在隨機時間內睡眠。 喚醒的每個線程都會對閂鎖進行遞減計數,然后等待閂鎖變為零。 最后,所有線程輸出它們已完成的輸出。
public class CountDownLatchExample implements Runnable {private static final int NUMBER_OF_THREADS = 5;private static final CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);private static Random random = new Random(System.currentTimeMillis());public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);for (int i = 0; i < NUMBER_OF_THREADS; i++) {executorService.execute(new CountDownLatchExample());}executorService.shutdown();}public void run() {try {int randomSleepTime = random.nextInt(20000);System.out.println("[" + Thread.currentThread().getName() + "] Sleeping for " + randomSleepTime);Thread.sleep(randomSleepTime);latch.countDown();System.out.println("[" + Thread.currentThread().getName() + "] Waiting for latch.");latch.await();System.out.println("[" + Thread.currentThread().getName() + "] Finished.");} catch (InterruptedException e) {e.printStackTrace();}} }運行此示例時,您將看到輸出“ Waiting for閂鎖”。 在不同的時間點出現,但“完成”。 每個線程的消息立即一個接一個地打印。
循環屏障
與CountDownLatch , CyclicBarrier類實現了一個計數器,該計數器在遞減為零后可以重置。 所有線程必須調用其方法await()直到內部計數器設置為零為止。 等待的線程然后被喚醒并可以繼續。 然后在內部將計數器重置為其原始值,并且整個過程可以再次開始:
public class CyclicBarrierExample implements Runnable {private static final int NUMBER_OF_THREADS = 5;private static AtomicInteger counter = new AtomicInteger();private static Random random = new Random(System.currentTimeMillis());private static final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {public void run() {counter.incrementAndGet();}});public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS);for (int i = 0; i < NUMBER_OF_THREADS; i++) {executorService.execute(new CyclicBarrierExample());}executorService.shutdown();}public void run() {try {while(counter.get() < 3) {int randomSleepTime = random.nextInt(10000);System.out.println("[" + Thread.currentThread().getName() + "] Sleeping for " + randomSleepTime);Thread.sleep(randomSleepTime);System.out.println("[" + Thread.currentThread().getName() + "] Waiting for barrier.");barrier.await();System.out.println("[" + Thread.currentThread().getName() + "] Finished.");}} catch (Exception e) {e.printStackTrace();}} }上面的示例與CountDownLatch非常相似,但是與前面的示例相反,我向run()方法添加了while循環。 這種run()實現使每個線程都能繼續進行sleeping和await()過程,直到計數器為三。 還要注意提供給CyclicBarrier的構造函數的匿名Runnable()實現。 每當障礙被觸發時,其run()方法都會執行。 在這里,我們增加了并發線程檢查的計數器。
3.下載源代碼
您可以下載本課程的源代碼: concurrency-4.zip
翻譯自: https://www.javacodegeeks.com/2015/09/the-java-util-concurrent-package.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的java.util.concurrent包的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android允许后台活动管理,安卓基础
- 下一篇: 外币兑换人民币公式?