基于Consul的分布式信号量实现
本文將繼續討論基于Consul的分布式鎖實現。信號量是我們在實現并發控制時會經常使用的手段,主要用來限制同時并發線程或進程的數量,比如:Zuul默認情況下就使用信號量來限制每個路由的并發數,以實現不同路由間的資源隔離。
信號量(Semaphore),有時被稱為信號燈,是在多線程環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被并發調用。在進入一個關鍵代碼段之前,線程必須獲取一個信號量;一旦該關鍵代碼段完成了,那么該線程必須釋放信號量。其它想進入該關鍵代碼段的線程必須等待直到第一個線程釋放信號量。為了完成這個過程,需要創建一個信號量VI,然后將Acquire Semaphore VI以及Release Semaphore VI分別放置在每個關鍵代碼段的首末端,確認這些信號量VI引用的是初始創建的信號量。如在這個停車場系統中,車位是公共資源,每輛車好比一個線程,看門人起的就是信號量的作用。
實現思路
- 信號量存儲:semaphore/key
- acquired操作:
- 創建session
- 鎖定key競爭者:semaphore/key/session
- 查詢信號量:semaphore/key/.lock,可以獲得如下內容(如果是第一次創建信號量,將獲取不到,這個時候就直接創建)
| { "limit": 3, "holders": [ "90c0772a-4bd3-3a3c-8215-3b8937e36027", "93e5611d-5365-a374-8190-f80c4a7280ab" ] } |
- 如果持有者已達上限,返回false,如果阻塞模式,就繼續嘗試acquired操作
- 如果持有者未達上限,更新semaphore/key/.lock的內容,將當前線程的sessionId加入到holders中。注意:更新的時候需要設置cas,它的值是“查詢信號量”步驟獲得的“ModifyIndex”值,該值用于保證更新操作的基礎沒有被其他競爭者更新。如果更新成功,就開始執行具體邏輯。如果沒有更新成功,說明有其他競爭者搶占了資源,返回false,阻塞模式下繼續嘗試acquired操作
- release操作:
- 從semaphore/key/.lock的holders中移除當前sessionId
- 刪除semaphore/key/session
- 刪除當前的session
流程圖
代碼實現
| public class Semaphore { private Logger logger = Logger.getLogger(getClass()); private static final String prefix = "semaphore/"; // 信號量參數前綴 private ConsulClient consulClient; private int limit; private String keyPath; private String sessionId = null; private boolean acquired = false; /** * * @param consulClient consul客戶端實例 * @param limit 信號量上限值 * @param keyPath 信號量在consul中存儲的參數路徑 */ public Semaphore(ConsulClient consulClient, int limit, String keyPath) { this.consulClient = consulClient; this.limit = limit; this.keyPath = prefix + keyPath; } /** * acquired信號量 * * @param block 是否阻塞。如果為true,那么一直嘗試,直到獲取到該資源為止。 * @return * @throws IOException */ public Boolean acquired(boolean block) throws IOException { if(acquired) { logger.error(sessionId + " - Already acquired"); throw new RuntimeException(sessionId + " - Already acquired"); } // create session clearSession(); this.sessionId = createSessionId("semaphore"); logger.debug("Create session : " + sessionId); // add contender entry String contenderKey = keyPath + "/" + sessionId; logger.debug("contenderKey : " + contenderKey); PutParams putParams = new PutParams(); putParams.setAcquireSession(sessionId); Boolean b = consulClient.setKVValue(contenderKey, "", putParams).getValue(); if(!b) { logger.error("Failed to add contender entry : " + contenderKey + ", " + sessionId); throw new RuntimeException("Failed to add contender entry : " + contenderKey + ", " + sessionId); } while(true) { // try to take the semaphore String lockKey = keyPath + "/.lock"; String lockKeyValue; GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue(); if (lockKeyContent != null) { // lock值轉換 lockKeyValue = lockKeyContent.getValue(); BASE64Decoder decoder = new BASE64Decoder(); byte[] v = decoder.decodeBuffer(lockKeyValue); String lockKeyValueDecode = new String(v); logger.debug("lockKey=" + lockKey + ", lockKeyValueDecode=" + lockKeyValueDecode); Gson gson = new Gson(); ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class); // 當前信號量已滿 if(contenderValue.getLimit() == contenderValue.getHolders().size()) { logger.debug("Semaphore limited " + contenderValue.getLimit() + ", waiting..."); if(block) { // 如果是阻塞模式,再嘗試 try { Thread.sleep(100L); } catch (InterruptedException e) { } continue; } // 非阻塞模式,直接返回沒有獲取到信號量 return false; } // 信號量增加 contenderValue.getHolders().add(sessionId); putParams = new PutParams(); putParams.setCas(lockKeyContent.getModifyIndex()); boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue(); if(c) { acquired = true; return true; } else continue; } else { // 當前信號量還沒有,所以創建一個,并馬上搶占一個資源 ContenderValue contenderValue = new ContenderValue(); contenderValue.setLimit(limit); contenderValue.getHolders().add(sessionId); putParams = new PutParams(); putParams.setCas(0L); boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue(); if (c) { acquired = true; return true; } continue; } } } /** * 創建sessionId * @param sessionName * @return */ public String createSessionId(String sessionName) { NewSession newSession = new NewSession(); newSession.setName(sessionName); return consulClient.sessionCreate(newSession, null).getValue(); } /** * 釋放session、并從lock中移除當前的sessionId * @throws IOException */ public void release() throws IOException { if(this.acquired) { // remove session from lock while(true) { String contenderKey = keyPath + "/" + sessionId; String lockKey = keyPath + "/.lock"; String lockKeyValue; GetValue lockKeyContent = consulClient.getKVValue(lockKey).getValue(); if (lockKeyContent != null) { // lock值轉換 lockKeyValue = lockKeyContent.getValue(); BASE64Decoder decoder = new BASE64Decoder(); byte[] v = decoder.decodeBuffer(lockKeyValue); String lockKeyValueDecode = new String(v); Gson gson = new Gson(); ContenderValue contenderValue = gson.fromJson(lockKeyValueDecode, ContenderValue.class); contenderValue.getHolders().remove(sessionId); PutParams putParams = new PutParams(); putParams.setCas(lockKeyContent.getModifyIndex()); consulClient.deleteKVValue(contenderKey); boolean c = consulClient.setKVValue(lockKey, contenderValue.toString(), putParams).getValue(); if(c) { break; } } } // remove session key } this.acquired = false; clearSession(); } public void clearSession() { if(sessionId != null) { consulClient.sessionDestroy(sessionId, null); sessionId = null; } } class ContenderValue implements Serializable { private Integer limit; private List<String> holders = new ArrayList<>(); public Integer getLimit() { return limit; } public void setLimit(Integer limit) { this.limit = limit; } public List<String> getHolders() { return holders; } public void setHolders(List<String> holders) { this.holders = holders; } public String toString() { return new Gson().toJson(this); } } } |
單元測試
下面單元測試的邏輯:通過線程的方式來模擬不同的分布式服務來獲取信號量執行業務邏輯。由于信號量與簡單的分布式互斥鎖有所不同,它不是只限定一個線程可以操作,而是可以控制多個線程的并發,所以通過下面的單元測試,我們設置信號量為3,然后同時啟動15個線程來競爭的情況,來觀察分布式信號量實現的結果如何。
| public class TestLock { private Logger logger = Logger.getLogger(getClass()); public void testSemaphore() throws Exception { new Thread(new SemaphoreRunner(1)).start(); new Thread(new SemaphoreRunner(2)).start(); new Thread(new SemaphoreRunner(3)).start(); new Thread(new SemaphoreRunner(4)).start(); new Thread(new SemaphoreRunner(5)).start(); new Thread(new SemaphoreRunner(6)).start(); new Thread(new SemaphoreRunner(7)).start(); new Thread(new SemaphoreRunner(8)).start(); new Thread(new SemaphoreRunner(9)).start(); new Thread(new SemaphoreRunner(10)).start(); Thread.sleep(1000000L); } } public class SemaphoreRunner implements Runnable { private Logger logger = Logger.getLogger(getClass()); private int flag; public SemaphoreRunner(int flag) { this.flag = flag; } public void run() { Semaphore semaphore = new Semaphore(new ConsulClient(), 3, "mg-init"); try { if (semaphore.acquired(true)) { // 獲取到信號量,執行業務邏輯 logger.info("Thread " + flag + " start!"); Thread.sleep(new Random().nextInt(10000)); logger.info("Thread " + flag + " end!"); } } catch (Exception e) { e.printStackTrace(); } finally { try { // 信號量釋放、Session鎖釋放、Session刪除 semaphore.release(); } catch (IOException e) { e.printStackTrace(); } } } } |
| INFO [Thread-6] SemaphoreRunner - Thread 7 start! INFO [Thread-2] SemaphoreRunner - Thread 3 start! INFO [Thread-7] SemaphoreRunner - Thread 8 start! INFO [Thread-2] SemaphoreRunner - Thread 3 end! INFO [Thread-5] SemaphoreRunner - Thread 6 start! INFO [Thread-6] SemaphoreRunner - Thread 7 end! INFO [Thread-9] SemaphoreRunner - Thread 10 start! INFO [Thread-5] SemaphoreRunner - Thread 6 end! INFO [Thread-1] SemaphoreRunner - Thread 2 start! INFO [Thread-7] SemaphoreRunner - Thread 8 end! INFO [Thread-10] SemaphoreRunner - Thread 11 start! INFO [Thread-10] SemaphoreRunner - Thread 11 end! INFO [Thread-12] SemaphoreRunner - Thread 13 start! INFO [Thread-1] SemaphoreRunner - Thread 2 end! INFO [Thread-3] SemaphoreRunner - Thread 4 start! INFO [Thread-9] SemaphoreRunner - Thread 10 end! INFO [Thread-0] SemaphoreRunner - Thread 1 start! INFO [Thread-3] SemaphoreRunner - Thread 4 end! INFO [Thread-14] SemaphoreRunner - Thread 15 start! INFO [Thread-12] SemaphoreRunner - Thread 13 end! INFO [Thread-0] SemaphoreRunner - Thread 1 end! INFO [Thread-13] SemaphoreRunner - Thread 14 start! INFO [Thread-11] SemaphoreRunner - Thread 12 start! INFO [Thread-13] SemaphoreRunner - Thread 14 end! INFO [Thread-4] SemaphoreRunner - Thread 5 start! INFO [Thread-4] SemaphoreRunner - Thread 5 end! INFO [Thread-8] SemaphoreRunner - Thread 9 start! INFO [Thread-11] SemaphoreRunner - Thread 12 end! INFO [Thread-14] SemaphoreRunner - Thread 15 end! INFO [Thread-8] SemaphoreRunner - Thread 9 end! |
從測試結果,我們可以發現當信號量持有者數量達到信號量上限3的時候,其他競爭者就開始進行等待了,只有當某個持有者釋放信號量之后,才會有新的線程變成持有者,從而開始執行自己的業務邏輯。所以,分布式信號量可以幫助我們有效的控制同時操作某個共享資源的并發數。
優化建議
同前文一樣,這里只是做了簡單的實現。線上應用還必須加入TTL的session清理以及對.lock資源中的無效holder進行清理的機制。
參考文檔:https://www.consul.io/docs/guides/semaphore.html
實現代碼
- GitHub:https://github.com/dyc87112/consul-distributed-lock
- 開源中國:http://git.oschina.net/didispace/consul-distributed-lock
總結
以上是生活随笔為你收集整理的基于Consul的分布式信号量实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如果你跟夕小瑶恋爱了...(上)
- 下一篇: 深入理解 Objective-C:方法缓