TitanDB GC详细实现原理 及其 引入的问题
文章目錄
- 1. 為什么要有GC
- 2. GC的觸發條件
- 3. GC的核心邏輯
- 1. blob file形態
- 2. GC Prepare
- 3. GC pick file
- 4. GC run
- 4. GC 引入的問題
- 5. Titan的測試代碼
通過本篇,能夠從TitanDB的源代碼中看到 key/value 分離之后引入的一些復雜性,這個復雜性很難避免。
主要從如下幾點展開描述:
- GC 觸發條件
- GC 的 核心邏輯
- GC 引入的問題
希望通過對Titan源代碼的分析能夠加深各位對key-value分離策略優劣的理解,為今后自己的業務選型提供參考。
1. 為什么要有GC
Titan 從 Wisckey 中借鑒的key-value分離思想來降低LSM-tree的寫放大 以及LSM-tree的 Compaction 中I/O帶寬的占用。這對于引擎層很薄且大value場景 來說 還是比較適用的,比較薄的引擎層能夠讓應用的性能盡可能能接近引擎,而大value場景中傳統的LSM-tree會攜帶著value進行compaction,這對于傳統的NVME-ssd / SATA-ssd 來說簡直是災難,sst的讀寫I/O隊列是共享的,而帶著大value compaction 場景下的I/O很容易達到帶寬瓶頸,從而讓應用的讀長尾不忍直視。。。
跑偏了,以上是titan或者說key-value分離出現的緣由。而引入了key-value分離,也就是僅僅讓較小數據量的key存放在LSM-tree中,并參與LSM-tree的compaction調度,而大value則單獨存放在另一個地方,所以需要對大value的存放文件進行GC,保正compaction對key的清理 能夠同步到大value中,清理對應的過期value,這是titan正確性保障的基本功能,也是基于lsm-tree的key-value分離系統必須要實現的。
接下來我們詳細看看Titan的實踐過程。
2. GC的觸發條件
前面提到了GC 的作用是清理因為compaction而過期的key的value,所以GC的觸發肯定和compaction同步或者先后順序的;而且compaction 的過程才能知道具體的sst文件的變動情況。
所以titan 的GC通過rocksdb的EventListener 來觸發的。
這里又要說一下Rocksdb的靈活擴展性,為用戶提供了多種多樣能夠操作引擎內部核心邏輯的接口,來讓熟悉的用戶更好得使用引擎。
就像titan GC這里用到的EventListener中,titan只需要在OnCompactionCompleted實現觸發GC的邏輯,就能在compaction完成之后嘗試調度GC,這個時候能夠看到comapction的一些內部信息,非常之方便。
rocksdb compaction中調度EventListener 邏輯如下:
Status DBImpl::BackgroundCompaction(bool* made_progress,JobContext* job_context,LogBuffer* log_buffer,PrepickedCompaction* prepicked_compaction,Env::Priority thread_pri) {...if (c != nullptr) {c->ReleaseCompactionFiles(status);*made_progress = true;#ifndef ROCKSDB_LITE// Need to make sure SstFileManager does its bookkeepingauto sfm = static_cast<SstFileManagerImpl*>(immutable_db_options_.sst_file_manager.get());if (sfm && sfm_reserved_compact_space) {sfm->OnCompactionCompletion(c.get());}
#endif // ROCKSDB_LITE// 調度用戶態實現的 OnCompactionCompletedNotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,compaction_job_stats, job_context->job_id);}...
}
其中NotifyOnCompactionCompleted會執行具體的OnCompactionCompleted的邏輯
void DBImpl::NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c, const Status& st,const CompactionJobStats& compaction_job_stats, const int job_id) {...TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");{CompactionJobInfo info{};BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,&info);//執行for (auto listener : immutable_db_options_.listeners) {listener->OnCompactionCompleted(this, info);}}...
}
到現在基本就已經進入了GC的邏輯了,也就是titan實現的void TitanDBImpl::OnCompactionCompleted
3. GC的核心邏輯
講解詳細的GC邏輯之前先整體看看titan的blobfile形態。
1. blob file形態
看看基本的blobfile的內部存儲結構
相關的所有block信息都在blob_format.h中
大體結構和我們rocksdb的sst文件結構類似的,只不過value的存儲上使用的是record,一個record是一個key-value的記錄,而sst中則是一個data-block。
-
blob file header: 整個blob file的header 字段,用來標識當前file的version信息和魔數, 實現了兩個版本,用來兼容rocksdb的blobdb
// Format of blob file header for version 1 (8 bytes): // // +--------------+---------+ // | magic number | version | // +--------------+---------+ // | Fixed32 | Fixed32 | // +--------------+---------+ // // For version 2, there are another 4 bytes for flags: // // +--------------+---------+---------+ // | magic number | version | flags | // +--------------+---------+---------+ // | Fixed32 | Fixed32 | Fixed32 | // +--------------+---------+---------+ -
blob record: 整個blob file存放 key-value的區域
主要包括兩部分:record head 和 record其中 record head 主要保存的是當前 record 內key+value的總大小,還有一個字節標識當前record是否開啟了壓縮
// Format of blob head (9 bytes): // // +---------+---------+-------------+ // | crc | size | compression | // +---------+---------+-------------+ // | Fixed32 | Fixed32 | char | // +---------+---------+-------------+ //這個record head 字段的添加是在 encode recod 的時候寫入的
void BlobEncoder::EncodeRecord(const BlobRecord& record) {record_buffer_.clear();compressed_buffer_.clear();// encode recordCompressionType compression;record.EncodeTo(&record_buffer_);record_ = Compress(compression_info_, record_buffer_, &compressed_buffer_,&compression);// encode record headassert(record_.size() < std::numeric_limits<uint32_t>::max());EncodeFixed32(header_ + 4, static_cast<uint32_t>(record_.size()));header_[8] = compression;// 生成src,并ecode 到頭部字段uint32_t crc = crc32c::Value(header_ + 4, sizeof(header_) - 4);crc = crc32c::Extend(crc, record_.data(), record_.size());EncodeFixed32(header_, crc); }第二部分的record 是主體,存放key-value,是的,沒錯,這里會存放一個key的備份,來所以value。也就是titan相比于rocksdb這里會多存放一份key。
struct BlobRecord {Slice key;Slice value;void EncodeTo(std::string* dst) const;Status DecodeFrom(Slice* src);size_t size() const { return key.size() + value.size(); }friend bool operator==(const BlobRecord& lhs, const BlobRecord& rhs); }; -
blob file meta 這里存放當前blobfile的一些元信息,類似于sst的 properties block(存放當前sst的大小,datablock個數,每個datablock大小,處于哪個層 ,最大最小key等)/filter block(bloom filter)/ compress block/range del block 。blob file這里也是為了方便擴展,也允許加入很多meta block。
// Format of blob file meta (not fixed size): // // +-------------+-----------+--------------+------------+ // | file number | file size | file entries | file level | // +-------------+-----------+--------------+------------+ // | Varint64 | Varint64 | Varint64 | Varint32 | // +-------------+-----------+--------------+------------+ // +--------------------+--------------------+ // | smallest key | largest key | // +--------------------+--------------------+ // | Varint32 + key_len | Varint32 + key_len | // +--------------------+--------------------+ // // The blob file meta is stored in Titan's manifest for quick constructing of // meta infomations of all the blob files in memory. // // Legacy format: // // +-------------+-----------+ // | file number | file size | // +-------------+-----------+ // | Varint64 | Varint64 | // +-------------+-----------+ -
Blob index block,報錯record所處當前blobfile的offset和整個record的size,用來在seek的過程中快速找到對應的blob record
// Format of blob index (not fixed size): // // +------+-------------+------------------------------------+ // | type | file number | blob handle | // +------+-------------+------------------------------------+ // | char | Varint64 | Varint64(offsest) + Varint64(size) | // +------+-------------+------------------------------------+ // // It is stored in LSM-Tree as the value of key, then Titan can use this blob // index to locate actual value from blob file. -
Blob file footer 這是最后一個block,目前只是保存了一個空的index handle 以及
kEncodedLength{kBlobFooterSize}size。
讀blobfile的時候減去kEncodedLength{kBlobFooterSize};之后的偏移地址就是index record的部分。因為現在的實現是沒有meta block的,這里應該是為了預留meta block的index的,所以實際encode到footer的
meta_index_handle內容是空的。
這里說的有點啰嗦了,總之知道底層文件的組織形態,那上層的讀寫細節就很容易把控了。
接下來看看底層的核心設計細節。
2. GC Prepare
前面說了,compaction結束之后會通過OnCompactionCompleted 來觸發。
這個時候正常的GC邏輯中是 要知道 接下來清理哪一些blob,以及清理blob中多少的數據。如果這個時候inplace update,那還需要大量的隨機讀,并且是隨機寫入,后續文件的清理也比較麻煩,哪怕只有一條record也不能清理;所以整體GC的核心讀寫都是和compaction的思想類似的,從文件中讀取record、丟掉過期的record,只保留新寫入到新的blobfile中。勢必消耗較少的cpu,還能保證GC的效率。
這里看一下GC pick file的邏輯,選擇需要參與GC的 blob file。
在Titan中key 從用戶下發,到形成sst/blob file的過程如下:
Flush的過程會根據min_blob_size來區分value的存儲,如果會存放在blobfile中,則它會生成一個上圖中的key-key-index 準備寫入sst文件,這個key-index 包括三列
- 第一列:這個key的value所屬blobfile的filenumber
- 第二列:這個key在blobfile中的record的偏移地址
- 第三列:這個key所在blobfile的record的大小
也就是通過key-index,能夠索引到這個key在blobfile中的record,當這個key-index要被寫入到sst中時 titan為了收集GC需要的信息,會實現一個Rocksdb的TablePropertiesCollector,在flush/compaction形成sst時,將當前sst中key的index信息做一個匯總寫入到table properties block中。這里的匯總信息包括上圖中最左側的部分,當然實際上當前sst中對應多少個blobfile就會有多少條(一個map數據結構,以blob file number為key,對應的record size為value)。
有了properties的信息之后,就能夠知道compaction完成之后輸入的sst文件的properties和輸出的sst文件的blob file中key的變動情況(blob record也存儲了對應的key-value),這樣就能決策哪一些blobfile是否達到GC的觸發條件(關于GC的觸發條件后面會說)。
大體就是輸入的sst文件總的properties信息 和 輸出的propertis的blobfile信息做差值,就知道最后每個blobfile會被丟棄多少的數據。
這個數據會存放在 blob_file_size_diff變量中, 對應的處理代碼如下:
void TitanDBImpl::OnCompactionCompleted(const CompactionJobInfo& compaction_job_info) {...// lamda 表達式,處理inputs 時 to_add為false,ExtractGCStatsFromTableProperty 僅僅是累積// 處理outputs時,會將to_add置為true,則會和之前的做diff,這樣就知道每個blobfile丟棄的record大小情況auto update_diff = [&](const std::vector<std::string>& files, bool to_add) {for (const auto& file_name : files) {auto prop_iter = prop_collection.find(file_name);if (prop_iter == prop_collection.end()) {ROCKS_LOG_WARN(db_options_.info_log,"OnCompactionCompleted[%d]: No table properties for file %s.",compaction_job_info.job_id, file_name.c_str());continue;}Status gc_stats_status = ExtractGCStatsFromTableProperty(prop_iter->second, to_add, &blob_file_size_diff);if (!gc_stats_status.ok()) {// TODO: Should treat it as background error and make DB read-only.ROCKS_LOG_ERROR(db_options_.info_log,"OnCompactionCompleted[%d]: failed to extract GC stats from table ""property: compaction file: %s, error: %s",compaction_job_info.job_id, file_name.c_str(),gc_stats_status.ToString().c_str());assert(false);}}};update_diff(compaction_job_info.input_files, false /*to_add*/);update_diff(compaction_job_info.output_files, true /*to_add*/);...
}
有了每次compaction前后 每個 blobfile中過期數據量的情況,接下來titan會為上面blob_file_size_diff map中的每一個blob file 維護一個state,用來標識這個文件中的“垃圾”數據的比例,計算方式很簡單 ,大體就是1 - live_data_size / file_size:
double GetDiscardableRatio() const {if (file_size_ == 0) {return 0;}// TODO: Exclude meta blocks from file sizereturn 1 - (static_cast<double>(live_data_size_) /(file_size_ - kBlobMaxHeaderSize - kBlobFooterSize));
}
而這個數據則會被當作當前blobfile 是否會被選擇參與GC的score 標準。
計算每個blobfile的gc score 邏輯如下:
需要注意的是如果一個blobfile 過小(默認小于8M),會為其設置0.5的core,從而加快小文件的回收。
gc_score_ 會從大到小排個序,后續的GC過程中的blobfile文件挑選會優先挑選gc_score較高的。
void BlobStorage::ComputeGCScore() {// TODO: no need to recompute all everytimeMutexLock l(&mutex_);gc_score_.clear();for (auto& file : files_) {if (file.second->is_obsolete()) {continue;}gc_score_.push_back({});auto& gcs = gc_score_.back();gcs.file_number = file.first;if (file.second->file_size() < cf_options_.merge_small_file_threshold) {// for the small file or file with gc mark (usually the file that just// recovered) we want gc these file but more hope to gc other file with// more invalid datagcs.score = cf_options_.blob_file_discardable_ratio;} else {gcs.score = file.second->GetDiscardableRatio();}}// gc_score會從大到小排序std::sort(gc_score_.begin(), gc_score_.end(),[](const GCScore& first, const GCScore& second) {return first.score > second.score;});
}
接下來有了每個文件的gc_sore信息就可以正式調度一個gc job 了,默認只能調度一個GCjob。
設置一個的目的并不是說有數據沖突/一致性問題,而是GC的I/O代價太高,需要限制I/O,保證應用的延時
void TitanDBImpl::MaybeScheduleGC() {mutex_.AssertHeld();if (db_options_.disable_background_gc) return;if (shuting_down_.load(std::memory_order_acquire)) return;while (unscheduled_gc_ > 0 &&bg_gc_scheduled_ < db_options_.max_background_gc) {unscheduled_gc_--;bg_gc_scheduled_++;// 調度gc job 開始異步GCthread_pool_->SubmitJob(std::bind(&TitanDBImpl::BGWorkGC, this));}
}
3. GC pick file
接下來順著代碼,會進入到GC 入口函數 TitanDBImpl::BackgroundGC,首先會挑選當前需要參與GC 的blob file。
- 首先根據上面prepare過程中維護的 gc_socre 來確認當前文件是否符合gc的基本條件。即要求每一個core都大于等于
cf_options_.blob_file_discardable_ratio,默認是0.5 - 滿足的話 找到這個文件的標識,并添加到GC文件 數組
blob_files中 - 如果累積的gc file總文件大小超過 gc batch size:
cf_options_.max_gc_batch_size(默認1G) 或者 有效總數據超過cf_options_.blob_file_target_size(默認256M) ,則認為當前job文件挑選夠了
std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(BlobStorage* blob_storage) {Status s;std::vector<std::shared_ptr<BlobFileMeta>> blob_files;...for (auto& gc_score : blob_storage->gc_score()) {// score 是否滿足GC 最低score的要求if (gc_score.score < cf_options_.blob_file_discardable_ratio) {break;}// 確認這個文件存在auto blob_file = blob_storage->FindFile(gc_score.file_number).lock();if (!CheckBlobFile(blob_file.get())) {// Skip this file id this file is being GCed// or this file had been GCedROCKS_LOG_INFO(db_options_.info_log, "Blob file %" PRIu64 " no need gc",blob_file->file_number());continue;}// 繼續挑選當前job參與GC的文件if (!stop_picking) {blob_files.emplace_back(blob_file);batch_size += blob_file->file_size();estimate_output_size += blob_file->live_data_size();// 是否達到了GC的batch大小以及 有效數據量大小閾值的要求,滿足則停止挑選文件if (batch_size >= cf_options_.max_gc_batch_size ||estimate_output_size >= cf_options_.blob_file_target_size) {// Stop pick file for this gc, but still check file for whether need// trigger gc after thisstop_picking = true;}} else { // 完成挑選,將剩下的文件放在下一個job中。next_gc_size += blob_file->file_size();if (next_gc_size > cf_options_.min_gc_batch_size) {maybe_continue_next_time = true;RecordTick(statistics(stats_), TITAN_GC_REMAIN, 1);ROCKS_LOG_INFO(db_options_.info_log,"remain more than %" PRIu64" bytes to be gc and trigger after this gc",next_gc_size);break;}}}...return std::unique_ptr<BlobGC>(new BlobGC(std::move(blob_files), std::move(cf_options_), maybe_continue_next_time));
}
4. GC run
實際函數主體的入口是DoRunGC(), 這一部分就是類似于comapction的調度邏輯了。
-
構造一個最小堆 迭代器,并為參與GC的所有文件也都維護一個迭代器, 每次迭代器的移動(next)都會從 多個blobfile中取一個record,將key 按照compartor放在最小堆里。
-
從迭代器直接取key(最小堆里的堆頂,讀上來的最小的key),去sst中反查key是否存在
- 存在則認為是不可丟棄的,會寫入到新的blob文件中。
- 否則就會被丟棄,迭代器直接跳過這個key的處理。
-
一邊通過迭代器讀取,一邊將從最小堆中取出的key 確認不會被丟棄就寫入到新的blobfile中
-
通過write callback ,將寫入到新的blobfile的key再回寫到lsm-tree中。因為 value已經放到了新的blobfile中了,但是lsm-tree中的key并不知道這一點。所以還需要重寫一次。
其中迭代器處理構造一個最小堆的過程如下:
void BlobFileMergeIterator::SeekToFirst() {for (auto& iter : blob_file_iterators_) {iter->SeekToFirst();// 將每一個blobfile問的迭代器添加到最小堆中if (iter->status().ok() && iter->Valid()) min_heap_.push(iter.get());}if (!min_heap_.empty()) {current_ = min_heap_.top();min_heap_.pop();} else {status_ = Status::Aborted("No iterator is valid");}
}
判斷一個key是否能夠被丟棄,邏輯如下:
Status BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index,bool* discardable) {TitanStopWatch sw(env_, metrics_.gc_read_lsm_micros);assert(discardable != nullptr);PinnableSlice index_entry;bool is_blob_index = false;// 從SST文件先讀一次,確認是否存在Status s = base_db_impl_->GetImpl(ReadOptions(), blob_gc_->column_family_handle(), key, &index_entry,nullptr /*value_found*/, nullptr /*read_callback*/, &is_blob_index);if (!s.ok() && !s.IsNotFound()) {return s;}// count read bytes for checking LSM entrymetrics_.gc_bytes_read += key.size() + index_entry.size();// 找不到,則確認可以丟棄,更新丟棄標記。if (s.IsNotFound() || !is_blob_index) {// Either the key is deleted or updated with a newer version which is// inlined in LSM.*discardable = true;return Status::OK();}......return Status::OK();
}
寫入到新的blobfile邏輯如下:
void BlobFileBuilder::Add(const BlobRecord& record, BlobHandle* handle) {if (!ok()) return;encoder_.EncodeRecord(record);handle->offset = file_->GetFileSize();handle->size = encoder_.GetEncodedSize();live_data_size_ += handle->size;// 追加寫入,先寫每個record的head// 每個record的內容前面已經描述過了。status_ = file_->Append(encoder_.GetHeader());if (ok()) {// 再寫recordstatus_ = file_->Append(encoder_.GetRecord());num_entries_++;// The keys added into blob files are in order.if (smallest_key_.empty()) {smallest_key_.assign(record.key.data(), record.key.size());}assert(cf_options_.comparator->Compare(record.key, Slice(smallest_key_)) >=0);assert(cf_options_.comparator->Compare(record.key, Slice(largest_key_)) >=0);largest_key_.assign(record.key.data(), record.key.size());}
}
重寫回寫key到lsm-tree中的邏輯如下:
new_blob_index.EncodeToBase(&index_entry);
// Store WriteBatch for rewriting new Key-Index pairs to LSM
// 通過writecallback來回寫
GarbageCollectionWriteCallback callback(cfh, blob_record.key.ToString(),std::move(blob_index));
callback.value = index_entry;
rewrite_batches_.emplace_back(std::make_pair(WriteBatch(), std::move(callback)));
auto& wb = rewrite_batches_.back().first;
s = WriteBatchInternal::PutBlobIndex(&wb, cfh->GetID(), blob_record.key,index_entry);
4. GC 引入的問題
從上面的GC核心處理邏輯中,我們能夠看到GC的實現相比于compaction還是很簡單的,可能是因為blobfile的各種meta block功能沒有上全,所以gc過程并不會特別復雜。
需要注意的是GC 的處理細節:
- 從blobfile中取到的record,還需要拿著key再去lsm-tree中讀一下。因為不確定這個key是存在還是不存在,需要額外的一次讀。
- 寫完blobfile之后還需要對lsm-tree中的key進行一次更新,因為lsm-tree中的key不知道它的value已經寫入到了新的blobfile中。又需要一次額外的寫。
- 更嚴重的是GC會帶著大value讀寫,這對于當下 共享讀寫隊列 的硬件來說,是一個長尾災難。寫入延時的增大會造成讀的長尾。
這有錢的可以用intel optane ssd p5800,據說讀寫隊列分離,互不影響;沒錢的加限速唄。。。。
假如我們僅僅是使用titan,并不想改動/優化 一下titan的話,那很簡單,讓value-size : key-size 差異足夠大,這個時候GC的這兩個額外讀和額外寫相比于GC本身的value讀寫,都降低很多了。lsm-tree中key的密度大了,文件個數相對較少。
假如我們key-size:10B ,value size: 64KB,有一塊6T的SSD,不考慮寫放大的情況,整個盤都寫滿,key+key-index形成的sst所占用的空間也還不到2G,LSM-tree也就2層。。。。即使考慮上寫放大, 也就最多到第三層。且寫放大完全可以通過level_compaction_dynamic_level_bytes降低到最小。這樣的LSM-tree中的查找基本不會消耗太多的I/O,很快的。
而如果想要改進Titan,讓它的性能進一步提升,需要對源代碼有足夠深入的了解和思考。
歡迎一起討論!!!
5. Titan的測試代碼
使用如下代碼可以測試使用titan和不使用titan的一些性能對比,使用titan的話-use_titan=true即可
#include <unistd.h>
#include <atomic>
#include <iostream>
#include <random>
#include <string>
#include <thread>#include <sys/time.h>#include "rocksdb/db.h"
#include "rocksdb/table.h"
#include "rocksdb/cache.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/iostats_context.h"
#include "gflags/gflags.h"#include "titan/db.h"
#include "titan/options.h"using namespace google;DEFINE_int64(read_ratio,0,"read threads' num");
DEFINE_int64(time,1200,"read threads' ratio");
DEFINE_int64(thread_num,64,"total threads ");
DEFINE_bool(rate_limiter,false,"use rocksdb ratelimiter");
DEFINE_bool(use_dynamic,false,"use dynamic level size");
DEFINE_int64(limit_size, 512, "rate limiter size");
DEFINE_string(db_dir,"./rocksdb_data","rocksdb data's directory");
DEFINE_int64(multidb_nums,1,"the number of multidb");
DEFINE_int64(value_len, 1024, "value size");
DEFINE_bool(use_titan, false, "use titandb");
DEFINE_bool(disable_wal, false, "disable wal");std::atomic<long> g_op_W;
std::atomic<long> g_op_R;// 這里測試過程中為了區分兩種db,聲明了兩個
// 實際的話可以只用一種std::vector<rocksdb::DB*> src_db;
// 實例化完成一個titandb之后 可以將該titandb 賦值給src_db,
// 即 src_db[0] = titan_db;
std::vector<rocksdb::DB*> src_db;
std::vector<rocksdb::titandb::TitanDB*> titan_db;
rocksdb::Options options;
rocksdb::titandb::TitanOptions titan_options;
std::mt19937 generator_; // 生成偽隨機數static double now()
{struct timeval t;gettimeofday(&t, NULL);return t.tv_sec + t.tv_usec / 1e6;
}void SetOptions() {options.create_if_missing = true;options.compression = rocksdb::kNoCompression;options.statistics = rocksdb::CreateDBStatistics();options.stats_persist_period_sec = 10;options.stats_dump_period_sec = 10;std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(10737418240);rocksdb::BlockBasedTableOptions bbto;bbto.whole_key_filtering = true;bbto.cache_index_and_filter_blocks = true;bbto.filter_policy.reset(rocksdb::NewBloomFilterPolicy(16,false));bbto.block_cache = cache;options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(bbto));options.max_background_compactions = 32;if(FLAGS_rate_limiter) {options.strict_bytes_per_sync = true;options.bytes_per_sync = 1024*1024;options.rate_limiter.reset(rocksdb::NewGenericRateLimiter(FLAGS_limit_size*1024*1024,100*1000,10,rocksdb::RateLimiter::Mode::kWritesOnly,true));}if(FLAGS_use_titan) {titan_options = options;}
}
void OpenDB() {SetOptions();for (int i = 1;i <= FLAGS_multidb_nums; i ++) {if (FLAGS_use_titan) {rocksdb::titandb::TitanDB* tmp_db;auto s = rocksdb::titandb::TitanDB::Open(titan_options, FLAGS_db_dir+std::to_string(i), &tmp_db);if (!s.ok()) {std::cout << "open failed :" << s.ToString() << std::endl;}titan_db.push_back(tmp_db);} else {rocksdb::DB* tmp_db;auto s = rocksdb::DB::Open(options, FLAGS_db_dir+std::to_string(i), &tmp_db);if (!s.ok()) {std::cout << "open failed :" << s.ToString() << std::endl;}src_db.push_back(tmp_db);}}
}void DBWrite(int num) {double ts = now();int db_num = num % FLAGS_multidb_nums;while (true) {std::string key = std::to_string(generator_());std::string value(FLAGS_value_len, 'x');if(num == 0) {rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);rocksdb::get_perf_context()->Reset();rocksdb::get_iostats_context()->Reset();}rocksdb::WriteOptions wo;wo.disableWAL = FLAGS_disable_wal;if(FLAGS_use_titan) {titan_db[db_num]->Put(wo, "test_graph_"+key, value);} else {src_db[db_num]->Put(wo, "test_graph_"+key, value);}++g_op_W;if(num == 0 && now() - ts >= 1) { // 每隔一秒,打印一次0號線程的延時數據rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);std::cout<< "\nwrite_wal_time "<< rocksdb::get_perf_context()->write_wal_time<< "\nwrite_memtable_time "<< rocksdb::get_perf_context()->write_memtable_time<< "\nwrite_delay_time "<< rocksdb::get_perf_context()->write_delay_time<< std::endl;ts = now();}}
}void DBRead(int num) {std::string value;double ts = now();int db_num = num % FLAGS_multidb_nums;while (true) {std::string key = std::to_string(generator_());if(num == 0) { // 為0號線程開啟 perfrocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);rocksdb::get_perf_context()->Reset();rocksdb::get_iostats_context()->Reset();}if(FLAGS_use_titan) {titan_db[db_num]->Get(rocksdb::ReadOptions(), key, &value);} else {src_db[db_num]->Get(rocksdb::ReadOptions(), key, &value);}++g_op_R;if(num == 0 && now() - ts >= 1) {rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);std::cout<< "\nget_from_memtable_time: " << rocksdb::get_perf_context()->get_from_memtable_time<< "\nget_from_output_files_time: " << rocksdb::get_perf_context()->get_from_output_files_time<< "\nread_nanos "<< rocksdb::get_iostats_context()-> read_nanos<< std::endl;ts = now();}}
}int main(int argc, char** argv) {ParseCommandLineFlags(&argc,&argv, true);OpenDB();if (FLAGS_multidb_nums > FLAGS_thread_num) {std::cout << "multidb nums bigger than thread num, invalid" << std::endl;return -1;}int write_threads = (int)(FLAGS_thread_num - (double)FLAGS_read_ratio / 100.0 * FLAGS_thread_num);for (int i = 0;i < FLAGS_thread_num - write_threads ; i++) {new std::thread(DBRead, i);}for(int i = 0;i < write_threads; ++i) {new std::thread(DBWrite, i);}long last_opn_R = 0;long last_opn_W = 0;int count = FLAGS_time;while(count > 0) {sleep(1);long nopn_R = g_op_R;long nopn_W = g_op_W;std::cout << "read_speed : " << nopn_R - last_opn_R << std::endl;std::cout << "write_speed : " << nopn_W - last_opn_W << std::endl;last_opn_R = nopn_R;last_opn_W = nopn_W;count --;std::string out;// src_db[0]->GetProperty("rocksdb.stats", &out); // 每一層的延時信息// fprintf(stdout, "rocksdb.stats: %s\n", out.c_str());}return 0;
}
總結
以上是生活随笔為你收集整理的TitanDB GC详细实现原理 及其 引入的问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 光绪元宝铜块值多少钱?
- 下一篇: “非惜年芳绝”下一句是什么