redis杂谈
使用緩存,可以 提升應用程序性能、提高讀取吞吐量(IOPS)、消除數據庫熱點、可預測的性能、減少后端負載、降低數據庫成本
Redis 相關概念
1、緩存穿透
? 緩存穿透是指查詢一個根本不存在的數據, 緩存層和存儲層都不會命中, 通常出于容錯的考慮, 如果從存儲層查不到數據則不寫入緩存層。
問題:緩存穿透將導致不存在的數據每次請求都要到存儲層去查詢, 失去了緩存保護后端存儲的意義。
造成緩存穿透的基本原因有兩個:
-
自身業務代碼或者數據出現問題。
-
一些惡意攻擊、 爬蟲等造成大量空命中。
#### 解決方案 -
緩存空對象:當沒有命中緩存,從數據庫中查詢數據為空后則緩存空對象,注意為了避免redis內存緩存空對象的浪費,需要為該空對象設置過期時間(過期時間能一定程度上解決頻繁地用不存在的數據的Key來進行請求)。
-
布隆過濾器:、某個值存在時,這個值可能不存在;當不存在時,那就肯定不存在,布隆過濾器解決的問題是:如何準確快速的判斷某個數據是否在大數據量集合中
2、緩存失效(擊穿)
? 由于大批量緩存在同一時間失效可能導致大量請求同時穿透緩存直達數據庫,可能會造成數據庫瞬間壓力過大甚至掛掉。
解決方案
- 數據設置不同的緩存時間()
- 根據不同的緩存使用次數延長其過期時間(具體的實現呢?)
3、緩存雪崩
緩存雪崩:就是redis緩存直接掛掉了,請求穿過緩存直接到達數據庫,最終導致數據庫宕機,服務不可用
解決方案
-
保證緩存層服務高可用性,比如使用Redis Sentinel或Redis Cluster。
-
依賴隔離組件為后端限流熔斷并降級。比如使用Sentinel或Hystrix限流降級組件。
4、數據一致性
-
[1.方式一:先更新數據庫,再更新緩存場景]
并發訪問會出現數據不一致的問題
-
[2.方式二:先更新緩存,再更新數據庫場景]
? 同方式一,并發訪問出現數據不一致
-
[3.方式三:先刪除緩存,再更新數據庫的場景]
? 同方式一,并發訪問出現數據不一致
-
[4.方式四:先更新數據庫,在刪除緩存場景]
并發訪問可能會短暫出現數據不一致情況,但最終都會一致。推薦
-
[5.方式五:最佳實現,數據異步同步]
canal:基于數據庫增量日志解析,提供增量數據訂閱和消費
mysql會將操作記錄在Binary log日志中,通過canal去監聽數據庫日志二進制文件,解析log日志,同步到redis中進行增刪改操作。
canal的工作原理:canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議;MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal );canal 解析 binary log 對象(原始為 byte 流)。
5、緩存過期淘汰策略
1. Redis緩存淘汰策略工作流程
- 首先,客戶端會發起需要更多內存的申請;
- 其次,Redis檢查內存使用情況,如果實際使用內存已經超出maxmemory,Redis就會根據用戶配置的淘汰策略選出無用的key;
- 最后,確認選中數據沒有問題,成功執行淘汰任務。
2. Redis3.0版本支持淘汰策略有6種
- no-eviction:當內存不足以容納新寫入數據時,新寫入操作會報錯。
- allkeys-lru:當內存不足以容納新寫入數據時,在鍵空間中,移除最近最少使用的key。
- allkeys-random:當內存不足以容納新寫入數據時,在鍵空間中,隨機移除某個key。
- volatile-lru:當內存不足以容納新寫入數據時,在設置了過期時間的鍵空間中,移除最近最少使用的key。
- volatile-random:當內存不足以容納新寫入數據時,在設置了過期時間的鍵空間中,隨機移除某個key。
- volatile-ttl:當內存不足以容納新寫入數據時,在設置了過期時間的鍵空間中,有更早過期時間的key優先移除。
7、redis應用場景
(1)熱點數據的緩存
? 這個應用場景我們比較常見的使用方式,為了降低對數據庫的訪問,會將對應數據添加到緩存中,提供并發訪問的能力,從而提高系統吞吐量。
(2)限時業務的運用
-
驗證碼,二維碼生存周期
手機號(唯一標識) 生成的驗證碼、二維碼信息保存在redis中指定過期時間(如果用戶輸入后 redis中的驗證碼過期 需要重新輸入),在一定時間內如果redis有信息,用戶頻繁獲取則將該信息直接返回并不調用真實獲取驗證碼,二維碼的接口。
-
接口api防刷,訂單重復提交問題
ip+api接口 作為key存儲并設置過期時間,value為請求次數 如果請求次數到達閾值則禁止請求。
訂單重復提交類似
(3)計數器相關問題
? 文章的點贊數、頁面的瀏覽數、網站的訪客數、視頻的播放數這些數據增長量很快,一旦數據規模上來后,對 mysql 讀寫都有很大的壓力,這時就要考慮 memcache、redis 進行存儲或 cache,同時定時同步到DB層。
(4)排行榜相關問題
? 對于千萬級別的數據、大量并發情況下,基于redis可靠的讀寫請求以及其zset數據結構 可以考慮使用redis來實現相關排行榜功能。新增數據后并在zset中添加數據(需要考慮排行榜的維度作為score)。
(5)分布式鎖
? redis是一個分布式存儲系統,同時其setNx命令是阻塞的(key存在則設置不成功) 可以很好的用來進行分布式的鎖操作處理,從而實現 秒殺、模擬搶單、搶紅包等關鍵資源的并發場景下的有序訪問。
(6)延時操作
- 訂單超過 30 分鐘未支付,則自動取消。
- 外賣商家超時未接單,則自動取消。
- 醫生搶單電話點診,超過 30 分鐘未打電話,則自動退款
針對如上場景 :我們可以使用 zset(sortedset)這個命令,用設置好的時間戳作為score進行排序,使用 zadd score1 value1 …命令就可以一直往內存中生產消息。再利用 zrangebysocre 查詢符合條件的所有待處理的任務,通過循環執行隊列任務即可。也可以通過 zrangebyscore key min max withscores limit 0 1 查詢最早的一條任務,來進行消費。
(7) 隊列
? redis支持list數據結果 使用LPUSH 和RPUSH、LPOP和RPOP可以很輕松的實現棧、隊列等數據結構
(8) 分布式應用session(redis實現)
? redis是分布式存儲且支持讀寫很快,所有用戶登錄后的相關用戶信息可以保存到redis中便于在后續分布式應用中進行使用。
8、redis高級用法
創建redis連接
@BeforeEach public void createMasterSlaveClient(){JedisPoolConfig config = new JedisPoolConfig();config.setMaxTotal(20);config.setMaxIdle(10);config.setMinIdle(5);//timeout,這里既是連接超時又是讀寫超時,從Jedis 2.8開始有區分connectionTimeout和soTimeout的構造函數jedisPool = new JedisPool(config, "IP", port,3000, "password"); }Pipeline管道的使用
-
首先Redis的管道(pipeline)并不是Redis服務端提供的功能,而是Redis客戶端為了減少網絡交互而提供的一種功能。
-
pipeline主要就是將多個請求合并,進行一次提交給Redis服務器,Redis服務器將所有請求處理完成之后,再一次性返回給客戶端。
-
pipeline執行的操作,和mget,mset,hmget這樣的操作不同,pipeline的操作是不具備原子性的。還有在集群模式下因為數據是被分散在不同的slot里面的,因此在進行批量操作的時候,不能保證操作的數據都在同一臺服務器的slot上,所以集群模式下是禁止執行像mget、mset、pipeline等批量操作的,如果非要使用批量操作,需要自己維護key與slot的關系。
-
pipeline也不能保證批量操作中有命令執行失敗了而中斷,也不能讓下一個指令依賴上一個指令, 如果非要這樣的復雜邏輯,建議使用lua腳本來完成操作。
位圖bitmap使用
-
redis的bitMap是使用一個bit來表示某個狀態,通常用于統計實時用戶登錄,活躍用戶數、用戶簽到、用戶在線狀態、統計活躍用戶、各種狀態值、自定義布隆過濾器、點贊功能
據悉 統計一億用戶實時登錄個數 使用內存為11.9M
使用位圖主要是考慮其如下優點
Redis分布式鎖
redis能用的的加鎖命令分表是INCR、SETNX、SET
1、INCR鎖
這種加鎖的思路是, key 不存在,那么 key 的值會先被初始化為 0 ,然后再執行 INCR 操作進行加一。
然后其它用戶在執行 INCR 操作進行加一時,如果返回的數大于 1 ,說明這個鎖正在被使用當中。例子如下:
redis實現的IncrLock鎖
class IncrLock {//加鎖public boolean trylock(String key,Long lockTime) {Jedis client = jedisPool.getResource();Long incrLock = client.incr(key);if(incrLock <=1){//加鎖成功System.out.println("incr 加鎖成功");//設置過期時間client.expire("incrLock",lockTime);return true;}else{System.out.println("incr 加鎖失敗");return false;}}//解鎖public void unLock(String key) {Jedis client = jedisPool.getResource();if(client.exists(key)){client.del(key);}}}模擬線程
class IncrTask implements Runnable {@Overridepublic void run() {IncrLock incrLock = new IncrLock();String key = "incrLock";try{//獲取鎖boolean incrLockFlag = incrLock.trylock(key,60L);if(incrLockFlag){//鎖資源獲取成功后的相關操作System.out.println("模擬鎖操作");Thread.sleep(50*1000);}}catch (Exception e){System.out.println("異常處理");}finally {incrLock.unLock(key);}} }測試
@Test public void testRedisIncrLock(){ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10,10,100L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(20));for(int i = 0; i<10;i++){poolExecutor.submit(new IncrTask());}try {Thread.sleep(6000L);} catch (InterruptedException e) {e.printStackTrace();} }2、setNx鎖
在指定的 key 不存在時,為 key 設置指定的值設置成功,返回 1 。 設置失敗,返回 0,key的測試編寫和ince一致,這里只提供加解鎖代碼
class SetNxLock {//加鎖public String trylock(String key,Long lockTime) {Jedis client = jedisPool.getResource();String value = UUID.randomUUID().toString();Long setNxLock = client.setnx(key,value);if(setNxLock == 1){//加鎖成功System.out.println("setNx 加鎖成功");//設置過期時間client.expire(key,lockTime);return value;}else{System.out.println("setNx 加鎖失敗");return null;}}//解鎖public void unLock(String key,String value) {Jedis client = jedisPool.getResource();if(value.equals(client.get(key))){client.del(key);}} }3、set鎖
? 上述的兩個鎖獲取成功后會設置鎖過期時間(防止程序異常退出使鎖無法釋放導致后續的操作無法再獲取到鎖),但是獲取鎖和設置過期時間會兩者組合會破壞其原子性,需要通過事務來確保原子性,但是還是有些問題,所以SET命令本身已經從版本 2.6.12 開始包含了不存在設置nx和設置過期時間的功能。如下為demo實現:
//redis的set鎖操作 class SetLock {//加鎖public String trylock(String key,Long lockTime) {Jedis client = jedisPool.getResource();String value = UUID.randomUUID().toString();SetParams params = new SetParams();//設置nx等同于setNx()方法 同時設置ex添加鎖過期時間params.nx().ex(lockTime);String setLock = client.set(key, value, params);if(StringUtils.isNotBlank(setLock)){//加鎖成功System.out.println("set 加鎖成功");//設置過期時間client.expire(key,lockTime);return value;}else{System.out.println("set 加鎖失敗");return null;}}//解鎖public void unLock(String key,String value) {Jedis client = jedisPool.getResource();if(value.equals(client.get(key))){client.del(key);}}在上述實現中我們只是簡單的對redis鎖進行了實現,但是上述代碼并非標準的實現方式,上述實現有一些問題,我們來一一解決
Redis事務
redis事務提供了一種“將多個命令打包, 然后一次性、按順序地執行”的機制, 并且事務在執行的期間不會主動中斷 —— 服務器在執行完事務中的所有命令之后, 才會繼續處理其他客戶端的其他命令。
若在事務隊列中存在語法性錯誤,則執行EXEC命令時,其他正確命令會被執行,錯誤命令拋出異常(redis不能保證事務的原子性)。
//解鎖public boolean unLock(String key,String value) {Jedis client = jedisPool.getResource();//監聽key 事務執行前key被其他命令修改則事務會被打斷client.watch(key);//開啟事務Transaction transaction = client.multi();try{//判斷是否為自己的鎖才進行釋放if(value.equals(transaction.get(key))){transaction.del(key);}//執行事務List<Object> results = transaction.exec();if (results == null) {return false;}}catch (Exception e){//事務回滾transaction.discard();}client.unwatch();return true;}redis使用lua腳本
Lua 是一種輕量小巧的腳本語言,用標準C語言編寫并以源代碼形式開放, 其設計目的是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。
LUA腳本的融合將使Redis數據庫產生更多的使用場景,迸發更多新的優勢:
- **高效性:**減少網絡開銷及時延,多次redis服務器網絡請求的操作,使用LUA腳本可以用一個請求完成
- **數據可靠性:**Redis會將整個腳本作為一個整體執行,中間不會被其他命令插入。
- **復用性:**LUA腳本執行后會永久存儲在Redis服務器端,其他客戶端可以直接復用
所以通常情況下redis+lua用來解決redis事務不能解決保證的原子性操作(redis事務本身是有問題的)
class LuaLock {// 加鎖腳本private static final String SCRIPT_LOCK = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 2 end";// 解鎖腳本private static final String SCRIPT_UNLOCK = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";private static final String SCRIPT_TEMP = "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}";// 加鎖腳本sha1值 // private static final String SCRIPT_LOCK_SHA1 = Sha1Util.encrypt(SCRIPT_LOCK);// 解鎖腳本sha1值 // private static final String SCRIPT_UNLOCK_SHA1 = Sha1Util.encrypt(SCRIPT_UNLOCK);//加鎖public String trylock(String key,Long lockTime) {Jedis client = jedisPool.getResource();String value = UUID.randomUUID().toString();//判斷lua腳本是否存在,不存在加載,存在使用 // if(client.scriptExists(SCRIPT_LOCK_SHA1)){ // client.evalsha(SCRIPT_LOCK_SHA1); // } // Long result = (Long) client.eval(SCRIPT_LOCK,2,key,value,key,"60000");Long result = (Long) client.eval(LuaLock.SCRIPT_LOCK, Lists.newArrayList("1"),Lists.newArrayList(key,"40000"));System.out.println("lua 腳本加鎖 "+result);if(result == 0){return null;}return value;}//解鎖public void unLock(String key,String value) {Jedis client = jedisPool.getResource();Object result = client.eval(SCRIPT_UNLOCK, Lists.newArrayList(key), Lists.newArrayList(value));System.out.println("lua 腳本解鎖 "+result);}}發布/訂閱
| 1 | [PSUBSCRIBE pattern [pattern …]] 訂閱一個或多個符合給定模式的頻道。 |
| 2 | [PUBSUB subcommand [argument [argument …]]] 查看訂閱與發布系統狀態。 |
| 3 | [PUBLISH channel message] 將信息發送到指定的頻道。 |
| 4 | [PUNSUBSCRIBE [pattern [pattern …]]] 退訂所有給定模式的頻道。 |
| 5 | [SUBSCRIBE channel [channel …]] 訂閱給定的一個或多個頻道的信息。 |
| 6 | [UNSUBSCRIBE [channel [channel …]]] 指退訂給定的頻道。 |
總結
- 上一篇: 【Zookeeper】源码分析之服务器(
- 下一篇: python实现抓取网页上的内容并发送到