mysql和redis库存扣减和优化
前言
大流量情況下的庫存是老生常談的問題了,在這里我整理一下mysql和redis應(yīng)對扣除庫存的方案,采用jmeter進(jìn)行壓測。
JMETER設(shè)置
庫存初始值50,線程數(shù)量1000個,1秒以內(nèi)啟動全部,一個線程循環(huán)2次,共2000個請求
MySQL方案
初始方案
<update id="decreaseStock">
UPDATE stock
SET stock_num = stock_num - 1
WHERE id = #{id}
</update>
這種情況下,在并發(fā)條件肯定會出現(xiàn)超賣的
進(jìn)行修改:
<update id="decreaseStock">
UPDATE stock
SET stock_num = stock_num - 1
WHERE id = #{id} AND stock_num >= 1
</update>
增加AND stock_num >= 1條件,即可避免超賣。
相關(guān)代碼:
@PostMapping(value = "/decreaseStock/{id}")
public ResponseEntity<Object> decreaseStock(@PathVariable("id") Integer id) {
int result = stockService.decreaseStock(id);
return result == 1 ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
壓測情況:
根據(jù)Throught可知一秒可以處理200個事務(wù)(TPS)
如果說系統(tǒng)的并發(fā)量不高,則可以以這種方案進(jìn)行防止庫存超賣,但要注意,在可重復(fù)讀隔離級別情況下,如果where的條件字段沒有索引的話,進(jìn)行update語句會使整個表被鎖住,如果這里使用的where條件不是主鍵id而是product_name,那么需要給這個字段加索引。
在RR可重復(fù)讀隔離級別下,如果where條件沒有命中索引,那么會基于next-key lock(記錄鎖和間隙鎖的組合)對整個表的所有記錄加上這個鎖,進(jìn)行全表掃描,這個時候其他記錄想要更新就會被阻塞。
但是不一定是有了索引就不會鎖住整個表,這是由優(yōu)化器決定的,可以使用Explain語句來查看當(dāng)前語句是走的索引還是全表掃描,如果優(yōu)化器走的還是全標(biāo)掃描,可以使用 force index([index_name]) 強(qiáng)制使用某個索引。
改進(jìn)
在MySQL情況下還能有其他方案來提升性能嗎,在不借助Redis的情況(曾經(jīng)面試招銀網(wǎng)絡(luò)被問了這道題)
我當(dāng)時給出的回答是,把單個商品的庫存比如50個庫存,拆分成好幾份,一份10個,5份庫存,由于秒殺情況下流量很大,可以把這五份庫存分別放到五個數(shù)據(jù)庫里面,這樣性能至少是原先方案的5倍,那么還會出現(xiàn)新的問題,就是有些問題,負(fù)載均衡上的問題,可能會出現(xiàn)某些庫里還存在庫存,但是請求卻沒有打進(jìn)這個數(shù)據(jù)庫,而是打到庫存已經(jīng)沒有的數(shù)據(jù)庫里面。我當(dāng)時的想法是再搞個庫存表,這個庫存表采集各個商品的總庫存以及商品在各個分庫里面的庫存數(shù)量,然后再寫個服務(wù),包含負(fù)載均衡的算法,將用戶的請求平均打到各個分庫去,當(dāng)某個分庫的庫存達(dá)到0的時候,去通知該服務(wù),服務(wù)將這個庫剔除,使新的請求不會轉(zhuǎn)發(fā)過去。實際這種情況也是存在問題的,高并發(fā)下庫存為0的庫來不及被剔除,也會導(dǎo)致請求被打到庫存0的庫。
Redis方案
將庫存暫時放到Redis,然后從Redis進(jìn)行庫存扣減,能大大提升性能
壓測結(jié)果:
可見性能幾乎是MySQL的10倍了,但是這樣子在Redis里面會導(dǎo)致超賣
要確保Redis不超買,需要先查詢當(dāng)前的數(shù)量,如果大于0則進(jìn)行扣減,并且查詢和扣減需要為原子性,這里就需要借助lua腳本,將這兩次操作寫到一起。
加了Lua腳本的代碼:
private static final String LUA_DECRESE_STOCK_PATH = "lua/decreseStock.lua";
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") Integer id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 執(zhí)行Lua腳本
Long result = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
// 返回結(jié)果判斷
return (result != null && result == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
lua腳本放在resource/lua/decreseStock.lua
local key = KEYS[1]
-- 檢查鍵是否存在
local exists = redis.call('EXISTS', key)
if exists == 1 then
-- 鍵存在,獲取值
local value = redis.call('GET', key)
if tonumber(value) > 0 then
-- 如果值大于0,則遞減
redis.call('DECR', key)
return 1 -- 表示遞減成功
else
return 0 -- 表示遞減失敗,值不大于0
end
else
return -1 -- 表示遞減失敗,鍵不存在
end
Redis同步庫存到MySQL
但是在Redis扣減了庫存,總需要同步到MySQL里面
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") Integer id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 執(zhí)行Lua腳本
Long redisResult = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
int dataBaselResult = 0;
if (redisResult == 1) {
dataBaselResult = stockService.decreaseStock(id);
}
// 返回結(jié)果判斷
return (dataBaselResult == 1 && redisResult == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
直接按照上述代碼來寫,刪Redis后同時將庫存同步到MySQL,相當(dāng)于使用了Redis性能又沒有提升。
其實選擇了Redis來進(jìn)行庫存扣減,那么MySQL的庫存并不需要去實時進(jìn)行更新,只需要庫存達(dá)到最終一致性即可,即先對Redis的庫存進(jìn)行更新,然后再異步同步到MySQL的庫存。
如果使用spring的異步線程來解決,會不會出現(xiàn)同步MySQL失敗導(dǎo)致數(shù)據(jù)最終不一致呢,在流量很多的情況下,系統(tǒng)本身就處于壓力大的情況,再使用異步線程會占用額外的資源,最好的方法是引入MQ,把庫存的同步信息交給MQ,MQ再交到消費系統(tǒng),進(jìn)行減庫存的操作,由MQ保證消息被消費,實現(xiàn)最終一致性。
部分代碼如下,由MQ product發(fā)出,再由consumer進(jìn)行消費:
private final DecreaseStockProduce decreaseStockProduce;
@PostMapping(value = "/decreaseStockByRedis/{id}")
public ResponseEntity<Object> decreaseStockByRedis(@PathVariable("id") String id) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
redisScript.setResultType(Long.class);
// 執(zhí)行Lua腳本
Long redisResult = (Long) redisTemplate.execute(redisScript, Collections.singletonList(id));
if (redisResult == 1) {
// 發(fā)送消息
try {
DecreaseStockEvent decreaseStockEvent = DecreaseStockEvent.builder()
.id(id)
.build();
SendResult sendResult = decreaseStockProduce.sendMessage(decreaseStockEvent);
if (!Objects.equals(sendResult.getSendStatus(), SendStatus.SEND_OK)) {
log.error("消息發(fā)送錯誤,請求參數(shù):{}", id);
}
} catch (Exception e) {
log.error("消息發(fā)送錯誤,請求參數(shù):{}", id, e);
}
}
// 返回結(jié)果判斷
return (redisResult == 1) ? new ResponseEntity<>("decreaseStock successfully", HttpStatus.OK) : new ResponseEntity<>("decreaseStock failed", HttpStatus.OK);
}
MQ [TIMEOUT_CLEAN_QUEUE] broker busy問題
這里直接壓測會報下面的錯誤,并且這個時候查看redis庫存已經(jīng)減到0,到是MySQL只減到了37
針對MQ [TIMEOUT_CLEAN_QUEUE] broker busy問題,需要去修改MQ的broker.conf文件
針對TIMEOUT_CLEAN_QUEUE broker busy問題,需要去修改MQ的broker.conf文件,上述的201ms超時了,我這里將等待時間改為400,并且將線程數(shù)設(shè)置為64,這個線程數(shù)可以根據(jù)實際壓測情況進(jìn)行調(diào)整。
# 發(fā)消息線程池數(shù)量
sendMessageThreadPoolNums=64
# 拉消息線程池數(shù)量
pullMessageThreadPoolNums=64
waitTimeMillsInSendQueue=400
現(xiàn)在再進(jìn)行壓測,發(fā)現(xiàn)tps能跑到1000,相比直接入庫mysql的200已經(jīng)是提升很大了。
雖然性能提高,也實現(xiàn)庫存的同步,但這個性能下還是會存在一些問題:
比如MQ消息發(fā)送失敗、或者M(jìn)ySQL庫存扣減失敗,并且實際情況還有訂單的生成和庫存之間的一致性也要考慮。
對于上述這些問題,可以查看我的另外一篇博客:
RocketMQ事務(wù)消息在訂單創(chuàng)建和庫存扣減的使用 - Scotyzh - 博客園 (cnblogs.com)
總結(jié)
以上是生活随笔為你收集整理的mysql和redis库存扣减和优化的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 目录文件夹和根目录
- 下一篇: 【scikit-learn基础】--『监