现代的缓存设计方案:Window-TinyLFU
文章目錄
- 前導知識
- 傳統緩存的置換算法
- LRU
- LFU
- TinyLFU
- 哈希計數
- 哈希沖突的處理
- 保鮮機制
- Window-TinyLFU
- 架構設計
- 執行流程
- 查詢
- 寫入
- 代碼實現
- Window-LRU
- Segment-LRU
- BloomFilter
- CountMin-Sketch
- Window-TinyLFU
前導知識
- 位圖與布隆過濾器的概念以及實現
- LRU緩存機制(Least Recently Used)
- LFU緩存機制(Least Frequently Used)
- TinyLFU: A Highly Efficient Cache Admission Policy
本篇博客僅介紹算法原理,實現原理參考文末代碼中的注釋。
傳統緩存的置換算法
LRU
當我們提到緩存置換算法時,第一反應想到的就是 LRU(Least Recently Used) 算法。LRU 的原理很簡單,就是將最不常用的數據淘汰出緩存。
雖然這種方法非常簡單有效,但是我們發現 LRU 在淘汰緩存時僅僅依據上一次使用時間這一維度。那其就會存在這樣一個問題,大量訪問低頻的數據突發到來,就會把高頻數據給淘汰出緩存。 這里舉一個簡單的例子:
示例:LRU 中大量訪問低頻的數據突發到來,就會把高頻數據給淘汰出緩存如上圖所示,盡管 key3 訪問次數較多,但由于 LRU 僅使用時間作為維度,因此當只出現一次的新數據 key4 和 key5 到來時它仍然被淘汰出緩存。而這違背了我們緩存原本的目的:緩存高頻數據以降低下層(數據庫、文件系統等)查詢負擔。
為了解決這個問題,我們就必須提到 LFU。
LFU
LFU(Least Frequently Used)算法在 LRU 的基礎記錄了每個 key 的訪問頻次。其依據訪問頻次來作為淘汰的依據,原理如下:如果一個數據在最近一段時間很少被訪問到,那么可以認為在將來它被訪問的可能性也很小。因此,當空間滿時,將命中頻次最少的數據淘汰。
LFU 解決了 LRU 中低頻數據淘汰高頻數據的問題,但是以頻次作為淘汰依據,又帶來了新的問題。
- 曾經的高頻數據始終占據緩存,即使已經很有沒有訪問該 key 的請求。
- 突發的稀疏流量由于頻次較低,無法占據緩存。(導致短期內大量的數據庫查詢)
- 需要對每一個數據維護一個計數器,占用了大量的空間。
TinyLFU
TinyLRU 通過引入 Count-Min Sketch 算法來解決 LFU 面臨的其中兩個問題:
- LFU 需要大量空間來統計頻次信息(哈希計數)
- LFU 存在高頻舊數據長期不被淘汰(保鮮機制)
下面就來介紹一下 Count-Min Sketch 算法的核心原理。
哈希計數
在 LFU 中我們需要統計每一個 key 的訪問頻次,這時就需要使用一個整型或者長整型來存儲這個計數。但是,我們真的需要這么大的空間來計數嗎?在很多情況下,即使是熱點數據也并不會被過多的訪問,那么我們能否將進一步的壓縮計數所占據的存儲空間呢?
在之前的博客中我介紹過一種利用位圖來進行計數的場景:
給定100億個整數,設計算法找到只出現一次的整數?
通常我們在使用位圖時,由于一個位的值只有 0 或 1 ,因此大部分情況下用其來做一些 bool 類型的狀態標記(例如數據是否存在、用戶是否登錄等場景)。而如果我們將每個數據所占據的位數擴大,就可以標記更多的狀態。
假設我們認為緩存中一個 key 被訪問 15 次即作為高頻數據,那么我們是不是能利用 4 個 bit 位來標記一項數據?這樣就能夠將原先的 32/64 bit 壓縮至 4 bit,大大的減少了計數占據的空間。
此時我們將一個 32 位的 int 劃分為 8 個 4 bit 計數器,通過計算出數值對應的 int 下標以及其在 int 中偏移的位置,再加上位操作,就可以實現這樣的計數。偽代碼如下:
std::vector<int> _bits;int get(int num) const {int countIndex = num / 8;int countOffset = num % 8 * 4;return (_bits[countIndex] >> countOffset) & 0x7; }//增加計數 void increment(int num) {int countIndex = num / 8;int countOffset = num % 8 * 4;int value = (_bits[countIndex] >> countOffset) & 0x7;if (value < 15) {_bits[countIndex] += (1 << countOffset);} }哈希沖突的處理
既然采用了哈希,就必定會出現沖突的風險,那我們該如何去處理這個問題呢?
我們可以對同一個 key 設置多個計數器,將同一個 key 用不同的哈希函數(或者同一個哈希函數用不同 seed)映射到多個計數器中。然后選取其中最小的值作為計數的結果返回。(防止某些計數器中計數被其他 key 影響而帶來的結果偏大)
問題:這樣做能保證結果的準確性嗎?
答案是不能,我們只能得到一個近似的結果。因為我們最終應用的地方是緩存,我們唯一需要得到的結論就是訪問是否高頻,而不關心其準確的訪問次數是多少,因此在這種場景下我們能夠容忍結果的存在一定的偏差。
偽代碼如下:
//增加計數long long seedArr[]; void Increment(key) {//在每個計數器中使用不同的哈希進行計數,防止沖突for (auto& cmRow : cmRows) {cmRow.Increment(hash(key, seedArr[i]))} }//遍歷多個計數器,選取其中最小的一個計數返回 int getCountMin(hash) {for (auto& cmRow : cmRows) {value = cmRows.get(hash)if(min > value){min = value}}return min; }保鮮機制
LFU 所存在的一大問題就是,其僅僅使用頻次作為淘汰的維度,因此即使有些數據已經很久沒有使用,當由于其具有較高的訪問頻次而無法被淘汰出緩存。
為了解決這一問題,我們就需要定期去降低一些較老數據的頻次(例如設定訪問閾值,當訪問次數達到閾值時觸發保鮮),偽代碼如下:
void get(key, value) {......訪問總頻次++;if(訪問總頻次 < 保鮮閾值){//頻次減半countMinSketch.reset();訪問總頻次 = 0;}...... }通過這種定期的保鮮機制,就能夠防止數據在緩存中大量駐留。
Window-TinyLFU
雖然 TinyLFU 已經足夠優秀,但是它仍然沒有解決突發稀疏流量的問題,因為稀疏流量頻次較低,始終無法在緩存中占據一席之地。但這時我們再思考一下,這不正是 LRU 解決的問題嗎?如果我們在 TinyLRU 中再集合一個 LRU,是不是就完美的解決這個問題了?這也就是其名字中 Window 的由來,其正是 TinyLFU 中集合的一個 Window-LRU 緩存。
架構設計
比起之前的算法,W-TinyLFU 要復雜的多,其架構圖如下:
Window TinyLFU 架構圖其架構主要由以下幾部分組成
- Window LRU:窗口 LRU 緩存,用于存放突發稀疏流量。
- BloomFilter:布隆過濾器,用于避免不必要的緩存置換。
- CountMin Sketch:用于記錄緩存的訪問次數。
- Segmented LRU:主緩存,用于存儲多次命中的數據(即高頻次)。分為 peotected 區和 probation 區。
當數據進來時,首先訪問的是最前端的窗口緩存 W-LRU,這個緩存占據總大小的 1%,主要用于存儲突發的稀疏流量。
緊接著,就到了 TinyLFU 中 BloomFilter 與 CountMin Sketch 的部分。其主要用來記錄緩存是否第一次訪問,以及記錄緩存命中的次數。
主緩存 SLRU 則存儲訪問頻次較高的數據,占用空間 99%,其主要分為兩個區域:protected 和 probation。probation 區即非熱門數據區(命中一次),其占據主緩存大小的 20%,而 protected 則是熱門緩存區(命中兩次及以上),其占用了主緩存大小的 80%。
protected 與 probation 的數據是怎樣流動的呢?
我們在進行 put 操作時,數據只能被放入 probation。只有在 probation 中 get 命中緩存時,才會將數據轉移到 protected 中。此時我們會將新數據與 protected 末尾的數據進行位置交換。
執行流程
查詢
寫入
代碼實現
我用 C++ 實現了一個 Window-TinyLFU 的 demo,github 地址如下:Window-TinyLFU-Cache
具體的代碼實現如下,關鍵處步驟都已寫上注釋。
Window-LRU
enum SEGMENT_ZONE {PROBATION,PROTECTION };template<typename T> struct LRUNode { public:uint32_t _key;T _value;uint32_t _conflict; //在key出現沖突時的輔助hash值bool _flag; //用于標記在緩存中的位置explicit LRUNode(uint32_t key = -1, const T& value = T(), uint32_t conflict = 0, bool flag = PROBATION): _key(key), _value(value), _conflict(conflict), _flag(flag){}};template<typename T> class LRUCache { public:typedef LRUNode<T> LRUNode;explicit LRUCache(size_t capacity): _capacity(capacity), _hashmap(capacity){}std::pair<LRUNode, bool> get(const LRUNode& node){auto res = _hashmap.find(node._key);if (res == _hashmap.end()){return std::make_pair(LRUNode(), false);}//獲取數據的位置typename std::list<LRUNode>::iterator pos = res->second;LRUNode curNode = *pos;//將數據移動到隊首_lrulist.erase(pos);_lrulist.push_front(curNode);res->second = _lrulist.begin();return std::make_pair(curNode, true);}std::pair<LRUNode, bool> put(const LRUNode& node){bool flag = false; //是否置換數據LRUNode delNode;//數據已滿,淘汰末尾元素if (_lrulist.size() == _capacity){delNode = _lrulist.back();_lrulist.pop_back();_hashmap.erase(delNode._key);flag = true;}//插入數據_lrulist.push_front(node);_hashmap.insert(make_pair (node._key, _lrulist.begin()));return std::make_pair(delNode, flag);}size_t capacity() const{return _capacity;}size_t size() const {return _lrulist.size();}private:size_t _capacity;//利用哈希表來存儲數據以及迭代器,來實現o(1)的get和putstd::unordered_map<int, typename std::list<LRUNode>::iterator> _hashmap;//利用雙向鏈表來保存緩存使用情況,并保證o(1)的插入刪除std::list<LRUNode> _lrulist; };Segment-LRU
template<typename T> class SegmentLRUCache { public:typedef LRUNode<T> LRUNode;explicit SegmentLRUCache(size_t probationCapacity, size_t protectionCapacity): _probationCapacity(probationCapacity), _protectionCapacity(protectionCapacity), _hashmap(probationCapacity + protectionCapacity){}std::pair<LRUNode, bool> get(LRUNode& node){//找不到則返回空auto res = _hashmap.find(node._key);if (res == _hashmap.end()){return std::make_pair(LRUNode(), false);}//獲取數據的位置typename std::list<LRUNode>::iterator pos = res->second;//如果查找的值在PROTECTION區中,則直接移動到首部if (node._flag == PROTECTION){_protectionList.erase(pos);_protectionList.push_front(node);res->second = _protectionList.begin();return std::make_pair(node, true);}//如果是PROBATION區的數據,如果PROTECTION還有空位,則將其移過去if (_protectionList.size() < _probationCapacity){node._flag = PROTECTION;_probationList.erase(pos);_protectionList.push_front(node);res->second = _protectionList.begin();return std::make_pair(node, true);}//如果PROTECTION沒有空位,此時就將其最后一個與PROBATION當前元素進行交換位置LRUNode backNode = _protectionList.back();std::swap(backNode._flag, node._flag);_probationList.erase(_hashmap[node._key]);_protectionList.erase(_hashmap[backNode._key]);_probationList.push_front(backNode);_protectionList.push_front(node);_hashmap[backNode._key] = _probationList.begin();_hashmap[node._key] = _protectionList.begin();return std::make_pair(node, true);}std::pair<LRUNode, bool> put(LRUNode& newNode){//新節點放入PROBATION段中newNode._flag = PROBATION;LRUNode delNode;//如果還有剩余空間就直接插入if (_probationList.size() < _probationCapacity || size() < _probationCapacity + _protectionCapacity){_probationList.push_front(newNode);_hashmap.insert(make_pair(newNode._key, _probationList.begin()));return std::make_pair(delNode, false);}//如果沒有剩余空間,就需要淘汰掉末尾元素,然后再將元素插入首部delNode = _probationList.back();_hashmap.erase(delNode._key);_probationList.pop_back();_probationList.push_front(newNode);_hashmap.insert(make_pair(newNode._key, _probationList.begin()));return std::make_pair(delNode, true);}std::pair<LRUNode, bool> victim(){//如果還有剩余的空間,就不需要淘汰if (_probationCapacity + _protectionCapacity > size()){return std::make_pair(LRUNode(), false);}//否則淘汰_probationList的最后一個元素return std::make_pair(_probationList.back(), true);}int size() const{return _protectionList.size() + _probationList.size();}private:size_t _probationCapacity;size_t _protectionCapacity;std::unordered_map<int, typename std::list<LRUNode>::iterator> _hashmap;std::list<LRUNode> _probationList;std::list<LRUNode> _protectionList; };BloomFilter
class BloomFilter { public:explicit BloomFilter(size_t capacity, double fp): _bitsPerKey((getBitsPerKey(capacity, fp) < 32 ? 32 : _bitsPerKey) + 31), _hashCount(getHashCount(capacity, _bitsPerKey)), _bits(_bitsPerKey, 0){//如果哈希函數的數量超過上限,此時準確率就非常低if (_hashCount > MAX_HASH_COUNT){_hashCount = MAX_HASH_COUNT;}}void put(uint32_t hash){int delta = hash >> 17 | hash << 15;for (int i = 0; i < _hashCount; i++){int bitPos = hash % _bitsPerKey;_bits[bitPos / 32] |= (1 << (bitPos % 32));hash += delta;}}bool contains(uint32_t hash) const {//參考leveldb的做法,通過delta來打亂哈希值,模擬進行多次哈希函數int delta = hash >> 17 | hash << 15; //右旋17位,打亂哈希值int bitPos = 0;for (int i = 0; i < _hashCount; i++) {bitPos = hash % _bitsPerKey;if ((_bits[bitPos / 32] & (1 << (bitPos % 32))) == 0) {return false;}hash += delta;}return true;}bool allow(uint32_t hash){if (contains(hash) == false){put(hash);return false;}return true;}void clear() {for (auto& it : _bits) {it = 0;}}private:size_t _bitsPerKey;size_t _hashCount;std::vector<int> _bits;static const int MAX_HASH_COUNT = 30;int getBitsPerKey(int capacity, double fp) const{return std::ceil(-1 * capacity * std::log(fp) / std::pow(std::log(2), 2));}int getHashCount(int capacity, int bitsPerKey) const{return std::round(std::log(2) * bitsPerKey / double(capacity));} };CountMin-Sketch
class CountMinRow { public:explicit CountMinRow(size_t countNum) : _bits((countNum < 8 ? 8 : countNum) / 8, 0){}private:friend class CountMinSketch;std::vector<int> _bits;int get(int num) const {int countIndex = num / 8;int countOffset = num % 8 * 4;return (_bits[countIndex] >> countOffset) & 0x7;}//增加計數void increment(int num) {int countIndex = num / 8;int countOffset = num % 8 * 4;int value = (_bits[countIndex] >> countOffset) & 0x7;if (value < 15) {_bits[countIndex] += (1 << countOffset);}}void clear() {for (auto& it : _bits) {it = 0;}}//保鮮算法,使計數值減半void reset() {for (auto& it : _bits) {it = (it >> 1) & 0x77777777;}} };class CountMinSketch { public:explicit CountMinSketch(size_t countNum): _seed(4){countNum = next2Power(countNum);if (countNum < 8) {countNum = 8;}_cmRows.resize(4, CountMinRow(countNum));_mask = countNum - 1;//利用當前時間作為generator的seedunsigned time = std::chrono::system_clock::now().time_since_epoch().count();std::mt19937 generator(time);for (int i = 0; i < COUNT_MIN_SKETCH_DEPTH; i++) {//隨機生成一個32位seedgenerator.discard(generator.state_size);_seed[i] = generator();}}//選擇幾個cmRow最小的一個作為有效值返回int getCountMin(uint32_t hash) {int min = 16, value = 0;for (int i = 0; i < _cmRows.size(); i++) {value = _cmRows[i].get((hash ^ _seed[i]) & _mask);min = (value < min) ? value : min;}return min;}void Increment(uint32_t hash) {for (int i = 0; i < _cmRows.size(); i++) {_cmRows[i].increment((hash ^ _seed[i]) & _mask);}}void Reset() {for (auto& cmRow : _cmRows) {cmRow.reset();}}void Clear() {for (auto& cmRow : _cmRows) {cmRow.clear();}}private:std::vector<CountMinRow> _cmRows;std::vector<uint32_t> _seed; //用于打散哈希值uint32_t _mask; //掩碼用于取低n位static const int COUNT_MIN_SKETCH_DEPTH = 4;//用于計算下一個最接近x的二次冪int next2Power(int x) {x--;x |= x >> 1;x |= x >> 2;x |= x >> 4;x |= x >> 8;x |= x >> 16;x |= x >> 32;x++;return x;} };Window-TinyLFU
enum CACHE_ZONE {SEGMENT_LRU,Window_LRU };template<typename V> class WindowTinyLFU { public:typedef LRUCache<V> LRUCache;typedef LRUNode<V> LRUNode;typedef SegmentLRUCache<V> SegmentLRUCache;explicit WindowTinyLFU(size_t capacity): _wlru(std::ceil(capacity * 0.01)), _slru(std::ceil(0.2 * (capacity * (1 - 0.01))), std::ceil((1 - 0.2) * (capacity * (1 - 0.01)))), _bloomFilter(capacity, BLOOM_FALSE_POSITIVE_RATE / 100), _cmSketch(capacity), _dataMap(capacity), _totalVisit(0), _threshold(100){}std::pair<V, bool> Get(const std::string& key){uint32_t keyHash = Hash(key.c_str(), key.size(), KEY_HASH_SEED);uint32_t conflictHash = Hash(key.c_str(), key.size(), CONFLICT_HASH_SEED);std::shared_lock<std::shared_mutex> rLock(_rwMutex);return get(keyHash, conflictHash);}std::pair<V, bool> Del(const std::string& key){uint32_t keyHash = Hash(key.c_str(), key.size(), KEY_HASH_SEED);uint32_t conflictHash = Hash(key.c_str(), key.size(), CONFLICT_HASH_SEED);std::unique_lock<std::shared_mutex> wLock(_rwMutex); return del(keyHash, conflictHash);}bool Put(const std::string& key, const V& value){uint32_t keyHash = Hash(key.c_str(), key.size(), KEY_HASH_SEED);uint32_t conflictHash = Hash(key.c_str(), key.size(), CONFLICT_HASH_SEED);std::unique_lock<std::shared_mutex> wLock(_rwMutex);return put(keyHash, value, conflictHash);}static unsigned int Hash(const void* key, int len, int seed){const unsigned int m = 0x5bd1e995;const int r = 24;unsigned int h = seed ^ len;const unsigned char* data = (const unsigned char*)key;while (len >= 4){unsigned int k = *(unsigned int*)data;k *= m;k ^= k >> r;k *= m;h *= m;h ^= k;data += 4;len -= 4;}switch (len){case 3: h ^= data[2] << 16;case 2: h ^= data[1] << 8;case 1: h ^= data[0];h *= m;};h ^= h >> 13;h *= m;h ^= h >> 15;return h;}private:LRUCache _wlru;SegmentLRUCache _slru;BloomFilter _bloomFilter;CountMinSketch _cmSketch;std::shared_mutex _rwMutex; //讀寫鎖,保證線程安全 std::unordered_map<int, LRUNode> _dataMap; //用于記錄數據所在的區域size_t _totalVisit;size_t _threshold; //保鮮機制static const int BLOOM_FALSE_POSITIVE_RATE = 1; static const int KEY_HASH_SEED = 0xbc9f1d34;static const int CONFLICT_HASH_SEED = 0x9ae16a3b;std::pair<V, bool> get(uint32_t keyHash, uint32_t conflictHash){//判斷訪問次數,如果訪問次數達到限制,則觸發保鮮機制++_totalVisit;if (_totalVisit >= _threshold){_cmSketch.Reset();_bloomFilter.clear();_totalVisit = 0;}//在cmSketch對訪問計數_cmSketch.Increment(keyHash);//首先查找map,如果map中沒有找到則說明不存在auto res = _dataMap.find(keyHash);if (res == _dataMap.end()){return std::make_pair(V(), false);}LRUNode node = res->second;V value;//校驗conflictHash是否一致,防止keyHash沖突查到錯誤數據if (node._conflict != conflictHash){return std::make_pair (V(), false);}//判斷數據位于slru還是wlru,進入對應的緩存中查詢if (node._flag == Window_LRU){_wlru.get(node);}else{_slru.get(node);}return std::make_pair(node._value, true);}std::pair<V, bool> del(uint32_t keyHash, uint32_t conflictHash){auto res = _dataMap.find(keyHash);if (res == _dataMap.end()){return std::make_pair(V(), false);}LRUNode node = res->second;//再次驗證conflictHash是否相同,防止由于keyHash沖突導致的誤刪除if (node._conflict != conflictHash){return std::make_pair(V(), false);}_dataMap.erase(keyHash);return std::make_pair(node._value, true);}bool put(uint32_t keyHash, const V& value, uint32_t conflictHash){LRUNode newNode(keyHash, value, conflictHash, Window_LRU);//將數據放入wlru,如果wlru沒滿,則直接返回std::pair<LRUNode, bool> res = _wlru.put(newNode);if (res.second == false){_dataMap[keyHash] = newNode;return true;}//如果此時發生了淘汰,將淘汰節點刪去if (_dataMap[res.first._key]._flag == Window_LRU){_dataMap.erase(res.first._key);}//如果slru沒滿,則插入到slru中。newNode._flag = SEGMENT_LRU;std::pair<LRUNode, bool> victimRes = _slru.victim();if (victimRes.second == false){_dataMap[keyHash] = newNode;_slru.put(newNode);return true;}//如果該值沒有在布隆過濾器中出現過,其就不可能比victimNode高頻,則插入布隆過濾器后返回if (!_bloomFilter.allow(keyHash)){return false;}//如果其在布隆過濾器出現過,此時就對比其和victim在cmSketch中的計數,保留大的那一個int victimCount = _cmSketch.getCountMin(victimRes.first._key);int newNodeCount = _cmSketch.getCountMin(newNode._key);//如果victim大于當前節點,則沒有插入的必要if (newNodeCount < victimCount){return false;}_dataMap[keyHash] = newNode;_slru.put(newNode);//如果滿了,此時發生了淘汰,將淘汰節點刪去if (_dataMap[res.first._key]._flag == SEGMENT_LRU){_dataMap.erase(victimRes.first._key);}return true;} };總結
以上是生活随笔為你收集整理的现代的缓存设计方案:Window-TinyLFU的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ElasticSearch探索之路(六)
- 下一篇: LevelDB是什么?为什么我们需要K-