JDK并发包1-2
我們看一下容器的并發情況,首先我們介紹一下HashMap,HashMap并不是一個安全的集合,如果大家在多線程中使用HashMap的話,那就會產生一些稀奇古怪的現象,最簡單的把HashMap變成線程安全的方式呢,Collections.synchronizedMap進行一個包裝
同樣對于其他的集合,List,Set,都提供了SynchronizedList方法,和SynchronizedSet方法,把任意非線程安全的集合,變成線程安全的集合,這個可以作為一個實用的工具類,但是這種只適合于并發量比較小的情況,我們可以看一下Collection內部是如何實現的,/*** Returns a synchronized (thread-safe) map backed by the specified* map. In order to guarantee serial access, it is critical that* <strong>all</strong> access to the backing map is accomplished* through the returned map.<p>** It is imperative that the user manually synchronize on the returned* map when iterating over any of its collection views:* <pre>* Map m = Collections.synchronizedMap(new HashMap());* ...* Set s = m.keySet(); // Needn't be in synchronized block* ...* synchronized (m) { // Synchronizing on m, not s!* Iterator i = s.iterator(); // Must be in synchronized block* while (i.hasNext())* foo(i.next());* }* </pre>* Failure to follow this advice may result in non-deterministic behavior.** <p>The returned map will be serializable if the specified map is* serializable.** @param <K> the class of the map keys* @param <V> the class of the map values* @param m the map to be "wrapped" in a synchronized map.* @return a synchronized view of the specified map.*/
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {return new SynchronizedMap<>(m);
}他把這個map包裝在SynchronizedMap里面,Map的每一個操作,他都會去做一個同步,put是同步,get也會做一個同步/*** @serial include*/
private static class SynchronizedMap<K,V>implements Map<K,V>, Serializable {private static final long serialVersionUID = 1978198479659022715L;private final Map<K,V> m; // Backing Mapfinal Object mutex; // Object on which to synchronizeSynchronizedMap(Map<K,V> m) {this.m = Objects.requireNonNull(m);mutex = this;}SynchronizedMap(Map<K,V> m, Object mutex) {this.m = m;this.mutex = mutex;}public int size() {synchronized (mutex) {return m.size();}}public boolean isEmpty() {synchronized (mutex) {return m.isEmpty();}}public boolean containsKey(Object key) {synchronized (mutex) {return m.containsKey(key);}}public boolean containsValue(Object value) {synchronized (mutex) {return m.containsValue(value);}}public V get(Object key) {synchronized (mutex) {return m.get(key);}}public V put(K key, V value) {synchronized (mutex) {return m.put(key, value);}}public V remove(Object key) {synchronized (mutex) {return m.remove(key);}}從而使得map變得線程安全的一個包裝,但是他的問題在哪里呢,就讓put和ge都變得非常串行的一個實現,get和get之間也得做一個等待,如果并發很高,其實是線程一個一個去做這個事情的,所以他的并發量并不會非常大,所以他只是一個并行的解決方案,并不是一個高并發的解決方案,這幾個也都是一樣的,這里給大家介紹一個高并發的解決方案呢,是ConcurrentHashMap,它是一個高性能的高并發的解決方案,那這里有關Map的使用,我就不介紹了,和普通的HashMap是一樣的,為什么ConcurrentHashMap是一個高并發的解決方案,而普通的HashMap做一個簡單的包裝呢,他只是一個簡單的并發方案呢,我想把普通的HashMap做一個介紹,內部實現其實是一個數組每一個表象放的是Entry,每一個Entry里面放的key和value,當我一個key進來的時候,我要放到哪一個表項里面去呢,放到數組的哪一個槽位當中呢,通過哈希算法得到的,映射到同一個槽位當中稱之為哈希沖突,雖然你映射到同一個槽位里,那我就把你放到同一個槽位里,一個entry數組槽位當中,我怎么放兩個entry呢,我這個entry槽位你有一個next,這樣我這里就變成了一個鏈表,他基本實現是一個數組,數組里面放的是entry,每一個entry都是鏈表當中的一環,當哈希發生大量的哈希沖突的時候呢,他又退化成一個鏈表,這個就是HashMap的一個基本實現,一般來說HashMap也不會放滿,因為你放滿之后必然會產生沖突,如果你所有的元素都放滿了,那你就沖突了,沖突是我們不想看見的,一旦HashMap發生沖突以后,后果性能就會降低,下面我們來看一下ConcurrentHashMap怎么做的,我們來看put操作,當你put一個key進去的時候,/*** Maps the specified key to the specified value in this table.* Neither the key nor the value can be null.** <p>The value can be retrieved by calling the {@code get} method* with a key that is equal to the original key.** @param key key with which the specified value is to be associated* @param value value to be associated with the specified key* @return the previous value associated with {@code key}, or* {@code null} if there was no mapping for {@code key}* @throws NullPointerException if the specified key or value is null*/
public V put(K key, V value) {return putVal(key, value, false);
}他有一個Segment,因為現在要一個高并發的HashMap,并不是一個普通的HashMap,如果有大量線程產生的時候,意味著我大量的線程,我可以把一個大的HashMap切分成若干個小的HashMap,然后我每一個線程進來的時候呢,先把當前的key映射到小HashMap里面去,然后在小HashMap里面做一個HashMap做的事情,假如我有16個小HashMap,意味著我可以接受16個線程的訪問,相比于我之前只能接受一個線程去訪問他,性能就提高了16倍了,就提高了16倍,而小的HashMap就是Segment,所以Segment也是key里面的對,但是它是一個小HashMap,所以當你put一個key value進去的時候呢,減少了多個線程的沖突,這個Segment繼承了ReentrantLock重入鎖,static class Segment<K,V> extends ReentrantLock implements Serializable {tryLock就是重入鎖tryLock,tryLock做什么呢,就是做CAS操作,因為我使用的是tryLock,所以不會出現等待的情況,lock才會等待,并沒有簡單的使用lock方法,而是使用tryLock嘗試拿鎖,拿不到我們在應用層做處理,tryLock在里面是一個CAS的實現,我不停的try,我有一個try的次數,相當于是一個自循,rehash會把空間翻倍,rehash可能是一個比較耗時的操作,/*** {@inheritDoc}*/
public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}你有多個個段你就有多少個lock,有多少個段就做多少個unlock,size是把所有的segment數據都要加進來,如果還在進行修改的話,那我是沒有辦法統計的,所以在統計之前我要把所有的segment鎖都要拿到,然后再做這個數據的統計,看看到底有多少數據,然后再把這個鎖釋放掉,這就是一個小小的弊端,當你要取得全局鎖的時候你要把所有的鎖都獲取到,但是size并不是一個高頻率調用的一個方法,應該說也是可以接受的,我們再來看一個比較重要的東西,BlockQueue阻塞隊列,阻塞隊列在這里是一個接口,并不是一個實際的類,阻塞隊列首先明確一點,它是線程安全的,在并發容器當中,他的性能其實并不好,但為什么他重要呢,它是一個非常好的共享數據的,共享數據的一個容器,他有什么好處呢,如果你的隊列為空,然后你還試圖從這里讀一個數據的時候呢,讀的線程就會做一個等待,等待另外一個線程往里面寫數據的時候,等待線程就會被喚醒,拿到數據,反過來如果數據已滿,你還往隊列里面寫數據,那寫的線程就會等待,等待有人把這個數據拿掉之后呢,才能寫進去,這就是BlockQueue阻塞隊列,他就是取數據,拿數據,拿數據還可以設置一個時間,拿不到可以拋出一個異常,/*** Retrieves and removes the head of this queue, waiting up to the* specified wait time if necessary for an element to become available.** @param timeout how long to wait before giving up, in units of* {@code unit}* @param unit a {@code TimeUnit} determining how to interpret the* {@code timeout} parameter* @return the head of this queue, or {@code null} if the* specified waiting time elapses before an element is available* @throws InterruptedException if interrupted while waiting*/
E poll(long timeout, TimeUnit unit)throws InterruptedException;/*** Retrieves and removes the head of this queue, waiting if necessary* until an element becomes available.** @return the head of this queue* @throws InterruptedException if interrupted while waiting*/
E take() throws InterruptedException;我們介紹ArrayBlockingQueue,他內部就是使用數組來實現的,LinkedBlockingQueue他就使用鏈表來實現,ArrayBlockingQueue他用到了兩個工具,/** Main lock guarding all access */
final ReentrantLock lock;一個是用來進行訪問控制,我要保證他的線程安全,另外一個是用來做通知/** Condition for waiting takes */
private final Condition notEmpty;/** Condition for waiting puts */
private final Condition notFull;如果你要拿數據,你這個數據為空,我就需要在我有數據的時候通知你,這個時候使用的就是notEmpty這個condition,但反過來你要寫數據,你需要等待,怎么通知你數據已經不滿了呢,就通過notFull,/*** Inserts the specified element at the tail of this queue, waiting* for space to become available if the queue is full.** @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/
public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}
}首先做一個加鎖,為什么這里不是一個高性能的并發呢,但凡你看到他毫無顧忌的做加鎖操作的時候,他必然不是高性能的,像ConcurrentHashMap這種寫法才是高性能的寫法,不會隨隨便便去做一個lock操作,它會在應用層面做自循等待,我lock的時候會判斷是否滿了,如果我滿了的話我該做什么呢,我就會等,我不能再插入了,我需要插入,但是滿了我不需要插,public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}
}/*** Extracts element at current take position, advances, and signals.* Call only when holding lock.*/
private E dequeue() {// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null;if (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();notFull.signal();return x;
}BlockingQueue適合生產者消費者模式,一個生產者往隊列里放數據,往隊列里take數據,拿不到數據我就等,拿到數據我就處理,ConcurrentLinkedQueue,也是一個隊列,它是個鏈表,這個是一個高性能的隊列,在高并發的情況下他可以保持一個高吞吐量,BlockQueue是不行的,它是一個高性能隊列,他的接口和BlokingQueue是一樣的,因為他們都是隊列,插入數據,拿取數據,知道有這么一個工具就可以了,如果需要在高并發的有一個性能表現的隊列,那么你就使用他就可以了,內部也是大量的使用了無鎖的算法,沒有把線程掛起的操作
?
總結