如果被问到分布式锁,应该怎样回答?
作者 | tech-bus.七十一
來源 | 程序員巴士
說到鎖,在平時的工作中,主要是使用synchronized關鍵字,或者相關的一些類庫來實現同步,但這都是基于單機應用而言的,當我們的應用多實例部署時,這時候就需要用到分布式鎖了,常用的分布式鎖主要是基于redis的分布式鎖和基于zookeeper的分布式鎖及基數據庫的分布式鎖,前倆個主要基于中間件的特性來實現,今天介紹一下基于數據庫的分布式鎖的實現,在一些并發不高的場景下比較適用。
首先需要在數據庫中建立好數據表,相關的字段如下所示:
CREATE TABLE IF NOT EXISTS `lock_tbl`(`lock_id` INT NOT NULL, -- 主鍵且主要字段不可少`des_one` VARCHAR(20), -- 可有可無`des_two` VARCHAR(20), -- 可有可無PRIMARY KEY ( `lock_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8;接著我們使用單實例應用,編寫一個接口,去買一個表里的商品,大致思路就是:讀取庫存,庫存減一,回寫數據庫,返回成功,其核心代碼如下:
public class StockServiceImpl implements StockService{@AutowiredStockMapper stockMapper;@Overridepublic Stock selectByPrimaryKey(Integer goodsId) {return stockMapper.selectByPrimaryKey(goodsId);}// 加鎖也只能保證單個實例線程安全性public synchronized void byGoods() throws InterruptedException {// 這里寫死,數據庫里就一條記錄且ID為1,拿到數據Stock stock = selectByPrimaryKey(1);// 獲取到商品的庫存Long goodsStock = stock.getGoodsStock();// 減庫存goodsStock -= 1;stock.setGoodsStock(goodsStock);// 為了將問題放大這里睡上幾秒 拉長查庫存和更新庫存的之間的時間間隔Thread.sleep(3000);// 更新updateByPrimaryKeySelective(stock);// 輸出System.out.println("更新后庫存為:" + goodsStock);}@Overridepublic int updateByPrimaryKeySelective(Stock record) {return stockMapper.updateByPrimaryKeySelective(record);} }在單個實例里面加個synchronized后完全正常的減庫存,然后我們啟動兩個實例后使用postman對接口進行壓測,出現如下情況:
實例1打印日志實例2打印日志通過截圖可知上述程序已經出現超賣現象,接下來進行改造,使用數據庫層面的鎖,我們知道向一張表中插入倆條相同主鍵的數據,只可能成功一條,因為主鍵具有約束性,所以利用這個特點,當我們向數據庫插入成功時,即代表獲取到鎖,從而去運行我們的業務代碼,當我們的業務代碼運行完時,我們把數據庫的該條記錄進行刪除,即代表釋放鎖,從而其他線程即有機會獲取到鎖,再去跑業務代碼,這樣即使運行的是倆個實例,同一時間也只能一個線程去運行業務代碼,也就不會出現超賣這種情況了。下面給出加鎖和解鎖的代碼:
// 上鎖。由于上鎖失敗的話會直接返回失敗,并不會再次獲取 // 是非阻塞的,這里利用循環實現阻塞。@Overridepublic boolean tryLock() {// 這里的Lock就是簡單的一個POJO對象映射到數據庫中一張表的字段Lock lock = new Lock();lock.setLockId(1);// 通過while循環來實現阻塞while (true) {try {// 首先查詢一下主鍵為1的數據是否存在,如果存在則說明鎖已經被占用了if (lockMapper.selectByPrimaryKey(1) == null) {// 不存在則嘗試加鎖即向數據庫中插入數據int i = lockMapper.insert(lock);if (i == 1) {return true;}}Thread.sleep(1000);} catch (InterruptedException e) {}}}// 解鎖代碼@Overridepublic void unLock() {deleteByPrimaryKey(1);}對service層的購買商品的代碼就進行加鎖
// 買商品public void byWithLock() throws InterruptedException {// 上鎖lockService.tryLock();// 業務代碼byGoods();// 釋放鎖并跳出循環lockService.unLock();}對于controller層的代碼
@RestController public class LoadBalance {@AutowiredStockServiceImpl stockService;@RequestMapping("/balance")public String balance() {try {stockService.byWithLock();} catch (InterruptedException e) {e.printStackTrace();}return "success";} }再次將程序啟動,使用postman簡單做下壓測,發現已經正常進行減庫存了。結果如下圖所示
實例1日志實例2日志??
存在的問題
如果有一臺實例拿到鎖后宕機了,鎖未能及時釋放,那么其他實例將永遠無法獲取到鎖。
不可重入,一臺實例拿到鎖后,想再次獲取該鎖時會失敗
如何解決
對于存在實例宕機導致鎖無法釋放的問題,可以在插入數據的時候將當前的一個時間戳也插入數據庫中,然后啟一個定時任務,定期去掃表,同時設定一個鎖的超時時間(該超時時間一定要大于正常的接口調用時間),將超時的記錄進行刪除。
對于不可重入,可以在表中插入數據的時候增加實例和線程相關的信息,當獲取鎖時進行判斷,如果相符則直接獲取鎖。
悲觀鎖
悲觀鎖簡單理解就是在任何情況下都是悲觀的認為請求臨界資源的時候都會與其他線程發生沖突,因此每次都是加悲觀鎖,這種鎖具有強烈的侵占性和排他性。上述的例子中所加的鎖就是悲觀鎖即先取鎖再訪問,MySql自帶的悲觀鎖是For Update,使用For Update可以顯示的增加行鎖,但悲觀鎖會讓數據庫額外的開銷,同時增加死鎖的風險。
樂觀鎖
樂觀鎖簡單理解就是每次線程請求臨界資源時都認為不會有其他線程與其競爭,只有在數據進行提交的時候才進行競爭,在檢測數據沖突時并不依賴數據庫本身的鎖機制,不影響請求的性能。上述例子我們可以在數據庫表中增加一個Version版本號,對于要進行修改的數據,先從數據庫中將改Version的版本號查出來,然后修改的時候帶上該版本號一起修改?
SELECT VERSION FROM TABLE_A -- 假設這里查出來version的值是OldVersion UPDATE TABLE_A SET COUNT = COUNT -1, VERSION = VERSION + 1 WHERE VERSION = OldVersion總結
并發不是特別高的情況下可以考慮使用基于數據庫的分布式鎖,盡量采用樂觀鎖的方式以提高應用的吞吐量。
往期推薦
為什么大家都在抵制用定時任務實現「關閉超時訂單」功能?
Gartner 發布 2022 年汽車行業五大技術趨勢
別再用 Redis List 實現消息隊列了,Stream 專為隊列而生
OpenStack 如何跨版本升級
點分享
點收藏
點點贊
點在看
總結
以上是生活随笔為你收集整理的如果被问到分布式锁,应该怎样回答?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 5G 落地进入爆发期,是时候让毫米波登场
- 下一篇: 可怕!CPU暗藏了这些未公开的指令!