Java 并发实践 — ConcurrentHashMap 与 CAS
轉載自?Java 并發(fā)實踐 — ConcurrentHashMap 與 CAS
最近在做接口限流時涉及到了一個有意思問題,牽扯出了關于concurrentHashMap的一些用法,以及CAS的一些概念。限流算法很多,我主要就以最簡單的計數器法來做引。先抽象化一下需求:統(tǒng)計每個接口訪問的次數。一個接口對應一個url,也就是一個字符串,每調用一次對其進行加一處理。可能出現的問題主要有三個:
但這次的博客并不是想描述怎么去實現接口限流,而是主要想描述一下遇到的問題,所以,第二點暫時不考慮,即不使用Redis。
說到并發(fā)的字符串統(tǒng)計,立即讓人聯(lián)想到的數據結構便是ConcurrentHashpMap<String,Long> urlCounter;
如果你剛剛接觸并發(fā)可能會寫出如代碼清單1的代碼
代碼清單1:
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546 | publicclass CounterDemo1 {????privatefinal Map<String, Long> urlCounter = newConcurrentHashMap<>();????//接口調用次數+1????publiclong increase(String url) {????????Long oldValue = urlCounter.get(url);????????Long newValue = (oldValue == null) ? 1L : oldValue + 1;????????urlCounter.put(url, newValue);????????returnnewValue;????}????//獲取調用次數????publicLong getCount(String url){????????returnurlCounter.get(url);????}????publicstatic void main(String[] args) {????????ExecutorService executor = Executors.newFixedThreadPool(10);????????finalCounterDemo1 counterDemo = newCounterDemo1();????????intcallTime = 100000;????????finalString url = "http://localhost:8080/hello";????????CountDownLatch countDownLatch = newCountDownLatch(callTime);????????//模擬并發(fā)情況下的接口調用統(tǒng)計????????for(inti=0;i<callTime;i++){????????????executor.execute(newRunnable() {????????????????@Override????????????????publicvoid run() {????????????????????counterDemo.increase(url);????????????????????countDownLatch.countDown();????????????????}????????????});????????}????????try{????????????countDownLatch.await();????????}catch(InterruptedException e) {????????????e.printStackTrace();????????}????????executor.shutdown();????????//等待所有線程統(tǒng)計完成后輸出調用次數????????System.out.println("調用次數:"+counterDemo.getCount(url));????}}console output:調用次數:96526 |
都說concurrentHashMap是個線程安全的并發(fā)容器,所以沒有顯示加同步,實際效果呢并不如所愿。
問題就出在increase方法,concurrentHashMap能保證的是每一個操作(put,get,delete…)本身是線程安全的,但是我們的increase方法,對concurrentHashMap的操作是一個組合,先get再put,所以多個線程的操作出現了覆蓋。如果對整個increase方法加鎖,那么又違背了我們使用并發(fā)容器的初衷,因為鎖的開銷很大。我們有沒有方法改善統(tǒng)計方法呢?
代碼清單2羅列了concurrentHashMap父接口concurrentMap的一個非常有用但是又常常被忽略的方法。
代碼清單2:
| 1234567891011121314 | /**?* Replaces the entry for a key only if currently mapped to a given value.?* This is equivalent to?*? <pre> {@code?* if (map.containsKey(key) && Objects.equals(map.get(key), oldValue)) {?*?? map.put(key, newValue);?*?? return true;?* } else?*?? return false;?* }</pre>?*?* except that the action is performed atomically.?*/booleanreplace(K key, V oldValue, V newValue); |
這其實就是一個最典型的CAS操作,except that the action is performed atomically.這句話真是幫了大忙,我們可以保證比較和設置是一個原子操作,當A線程嘗試在increase時,舊值被修改的話就回導致replace失效,而我們只需要用一個循環(huán),不斷獲取最新值,直到成功replace一次,即可完成統(tǒng)計。
改進后的increase方法如下
代碼清單3:
| 1234567891011121314151617181920212223 | publiclong increase2(String url) {????????Long oldValue, newValue;????????while(true) {????????????oldValue = urlCounter.get(url);????????????if(oldValue == null) {????????????????newValue = 1l;????????????????//初始化成功,退出循環(huán)????????????????if(urlCounter.putIfAbsent(url, 1l) == null)????????????????????break;????????????????//如果初始化失敗,說明其他線程已經初始化過了????????????}else{????????????????newValue = oldValue + 1;????????????????//+1成功,退出循環(huán)????????????????if(urlCounter.replace(url, oldValue, newValue))????????????????????break;????????????????//如果+1失敗,說明其他線程已經修改過了舊值????????????}????????}????????returnnewValue;????}console output:調用次數:100000 |
再次調用后獲得了正確的結果,上述方案看上去比較繁瑣,因為第一次調用時需要進行一次初始化,所以多了一個判斷,也用到了另一個CAS操作putIfAbsent,他的源代碼描述如下:
代碼清單4:
| 123456789101112131415161718192021222324252627282930313233 | /**?????* If the specified key is not already associated?????* with a value, associate it with the given value.?????* This is equivalent to?????*? <pre> {@code?????* if (!map.containsKey(key))?????*?? return map.put(key, value);?????* else?????*?? return map.get(key);?????* }</pre>?????*?????* except that the action is performed atomically.?????*?????* @implNote This implementation intentionally re-abstracts the?????* inappropriate default provided in {@code Map}.?????*?????* @param key key with which the specified value is to be associated?????* @param value value to be associated with the specified key?????* @return the previous value associated with the specified key, or?????*???????? {@code null} if there was no mapping for the key.?????*???????? (A {@code null} return can also indicate that the map?????*???????? previously associated {@code null} with the key,?????*???????? if the implementation supports null values.)?????* @throws UnsupportedOperationException if the {@code put} operation?????*???????? is not supported by this map?????* @throws ClassCastException if the class of the specified key or value?????*???????? prevents it from being stored in this map?????* @throws NullPointerException if the specified key or value is null,?????*???????? and this map does not permit null keys or values?????* @throws IllegalArgumentException if some property of the specified key?????*???????? or value prevents it from being stored in this map?????*/?????V putIfAbsent(K key, V value); |
簡單翻譯如下:“如果(調用該方法時)key-value 已經存在,則返回那個 value 值。如果調用時 map 里沒有找到 key 的 mapping,返回一個 null 值”。值得注意點的一點就是concurrentHashMap的value是不能存在null值的。實際上呢,上述的方案也可以把Long替換成AtomicLong,可以簡化實現, ConcurrentHashMap
| 1234567891011 | privateAtomicLongMap<String> urlCounter3 = AtomicLongMap.create();publiclong increase3(String url) {????longnewValue = urlCounter3.incrementAndGet(url);????returnnewValue;}publicLong getCount3(String url) {????returnurlCounter3.get(url);} |
看一下他的源碼就會發(fā)現,其實和代碼清單3思路差不多,只不過功能更完善了一點。
和CAS很像的操作,我之前的博客中提到過數據庫的樂觀鎖,用version字段來進行并發(fā)控制,其實也是一種compare and swap的思想。
雜談:網上很多對ConcurrentHashMap的介紹,眾所周知,這是一個用分段鎖實現的一個線程安全的map容器,但是真正對他的使用場景有介紹的少之又少。面試中能知道這個容器的人也確實不少,問出去,也就回答一個分段鎖就沒有下文了,但我覺得吧,有時候一知半解反而會比不知道更可怕。
參考:
[1] https://my.oschina.net/mononite/blog/144329
[2] http://www.tuicool.com/articles/zuui6z
總結
以上是生活随笔為你收集整理的Java 并发实践 — ConcurrentHashMap 与 CAS的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 8新特性探究(二)深入解析默认
- 下一篇: 中国充电联盟:9 月公共充电桩环比增加