bucket list 函数解析
cls_bucket_list 函數
librados::IoCtx index_ctx;
? // key?? - oid (for different shards if there is any)
? // value - list result for the corresponding oid (shard), it is filled by the AIO callback
? map<int, string> oids;
? map<int, struct rgw_cls_list_ret> list_results;
? int r = open_bucket_index(bucket, index_ctx, oids, shard_id);
? if (r < 0)
??? return r;
? cls_rgw_obj_key start_key(start.name, start.instance);
? r = CLSRGWIssueBucketList(index_ctx, start_key, prefix, num_entries, list_versions,
??????????????????????????? oids, list_results, cct->_conf->rgw_bucket_index_max_aio)();
? if (r < 0)
??? return r;
獲取桶的shard 對象,存入oids map中. 獲取的內容存儲到list_results
? // Create a list of iterators that are used to iterate each shard
? vector<map<string, struct rgw_bucket_dir_entry>::iterator> vcurrents(list_results.size());
? vector<map<string, struct rgw_bucket_dir_entry>::iterator> vends(list_results.size());
? vector<string> vnames(list_results.size());
? map<int, struct rgw_cls_list_ret>::iterator iter = list_results.begin();
? *is_truncated = false;
? for (; iter != list_results.end(); ++iter) {
? ??vcurrents.push_back(iter->second.dir.m.begin());
??? vends.push_back(iter->second.dir.m.end());
??? vnames.push_back(oids[iter->first]);
??? *is_truncated = (*is_truncated || iter->second.is_truncated);
? }
處理list_results: list_results容器中存放的是桶各個shard的list結果, 這里為每個shard記錄其條目的起始/結束迭代器以及對應的oid.
? // Create a map to track the next candidate entry from each shard, if the entry
? // from a specified shard is selected/erased, the next entry from that shard will
? // be inserted for next round selection
? map<string, size_t> candidates;
? for (size_t i = 0; i < vcurrents.size(); ++i) {
??? if (vcurrents[i] != vends[i]) {
????? candidates[vcurrents[i]->first] = i;
??? }
? }
創建一個map(candidates)用于跟蹤各個bucket shard的下一個候選條目: key為條目名, value為該shard在vcurrents中的下標.
??? // Select the next one
??? int pos = candidates.begin()->second;
??? const string& name = vcurrents[pos]->first;
??? struct rgw_bucket_dir_entry& dirent = vcurrents[pos]->second;
?
??? // fill it in with initial values; we may correct later
??? RGWObjEnt e;
??? e.key.set(dirent.key.name, dirent.key.instance);
??? e.size = dirent.meta.size;
??? e.accounted_size = dirent.meta.accounted_size;
??? e.mtime = dirent.meta.mtime;
??? e.etag = dirent.meta.etag;
??? e.owner = dirent.meta.owner;
??? e.owner_display_name = dirent.meta.owner_display_name;
??? e.content_type = dirent.meta.content_type;
??? e.tag = dirent.tag;
??? e.flags = dirent.flags;
e.versioned_epoch = dirent.versioned_epoch;
獲取到對應的bucket_entry的值.
??? bool force_check = force_check_filter && force_check_filter(dirent.key.name);
??? if ((!dirent.exists && !dirent.is_delete_marker()) || !dirent.pending_map.empty() || force_check) {
????? /* there are uncommitted ops. We need to check the current state,
?????? * and if the tags are old we need to do cleanup as well. */
????? librados::IoCtx sub_ctx;
????? sub_ctx.dup(index_ctx);
????? r = check_disk_state(sub_ctx, bucket, dirent, e, updates[vnames[pos]]);
????? if (r < 0 && r != -ENOENT) {
????????? return r;
????? }
}
判斷是否需要調用check_disk_state檢查磁盤狀態, 判斷條件: 條目不存在且不是delete marker, 或者pending_map非空, 或者force_check為真.
??? if (r >= 0) {
???? ?ldout(cct, 10) << "RGWRados::cls_bucket_list: got " << e.key.name << "[" << e.key.instance << "]" << dendl;
????? m[name] = std::move(e);
????? ++count;
}
檢查完成(r >= 0), 將bucket_index entry 移入(std::move)結果表m, 并累加count.
?? // Refresh the candidates map
candidates.erase(candidates.begin());
?
??? ++vcurrents[pos];
??? if (vcurrents[pos] != vends[pos]) {
????? candidates[vcurrents[pos]->first] = pos;
}
刷新候選表(candidates), 繼續解析下一個對象.
?
? // Suggest updates if there is any
? map<string, bufferlist>::iterator miter = updates.begin();
? for (; miter != updates.end(); ++miter) {
??? if (miter->second.length()) {
????? ObjectWriteOperation o;
????? cls_rgw_suggest_changes(o, miter->second);
????? // we don't care if we lose suggested updates, send them off blindly
????? AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
????? index_ctx.aio_operate(miter->first, c, &o);
??????? c->release();
??? }
? }
更新對象操作, 重點參考 cls_rgw_suggest_changes 函數
? // Check if all the returned entries are consumed or not
? for (size_t i = 0; i < vcurrents.size(); ++i) {
??? if (vcurrents[i] != vends[i])
????? *is_truncated = true;
? }
? if (!m.empty())
*last_entry = m.rbegin()->first;
最后設置is_truncated的值, 并設置last_entry的值.
?
?
check_disk_state
函數說明: 檢查磁盤上的對象的狀態
? rgw_obj obj;
? std::string oid, instance, loc, ns;
? rgw_obj_key key;
? key.set(list_state.key);
? oid = key.name;
? if (!rgw_obj::strip_namespace_from_object(oid, ns, instance)) {
??? // well crap
??? assert(0 == "got bad object name off disk");
? }
? obj.init(bucket, oid);
? obj.set_loc(list_state.locator);
? obj.set_ns(ns);
? obj.set_instance(key.instance);
? get_obj_bucket_and_oid_loc(obj, bucket, oid, loc);
? io_ctx.locator_set_key(loc);
?
? RGWObjState *astate = NULL;
? RGWObjectCtx rctx(this);
? int r = get_obj_state(&rctx, obj, &astate, NULL);
? if (r < 0)
??? return r;
?
? list_state.pending_map.clear(); // we don't need this and it inflates size
? if (!astate->exists) {
????? /* object doesn't exist right now -- hopefully because it's
?????? * marked as !exists and got deleted */
?? ?if (list_state.exists) {
????? /* FIXME: what should happen now? Work out if there are any
?????? * non-bad ways this could happen (there probably are, but annoying
?????? * to handle!) */
??? }
??? // encode a suggested removal of that key
??? list_state.ver.epoch = io_ctx.get_last_version();
??? list_state.ver.pool = io_ctx.get_id();
??? cls_rgw_encode_suggestion(CEPH_RGW_REMOVE, list_state, suggested_updates);
??? return -ENOENT;
? }
?
? string etag;
? string content_type;
? ACLOwner owner;
?
? object.size = astate->size;
? object.mtime = astate->mtime;
?
? map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_ETAG);
? if (iter != astate->attrset.end()) {
??? etag = iter->second.c_str();
? }
? iter = astate->attrset.find(RGW_ATTR_CONTENT_TYPE);
? if (iter != astate->attrset.end()) {
??? content_type = iter->second.c_str();
? }
? iter = astate->attrset.find(RGW_ATTR_ACL);
? if (iter != astate->attrset.end()) {
??? r = decode_policy(iter->second, &owner);
??? if (r < 0) {
????? dout(0) << "WARNING: could not decode policy for object: " << obj << dendl;
??? }
? }
?
? if (astate->has_manifest) {
??? RGWObjManifest::obj_iterator miter;
??? RGWObjManifest& manifest = astate->manifest;
??? for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
????? rgw_obj loc = miter.get_location();
?
????? if (loc.ns == RGW_OBJ_NS_MULTIPART) {
??????? //dout(10) << "check_disk_state(): removing manifest part from index: " << loc << dendl;
??????? r = delete_obj_index(loc);
??????? if (r < 0) {
????????? dout(0) << "WARNING: delete_obj_index() returned r=" << r << dendl;
??????? }
????? }
??? }
? }
?
? object.etag = etag;
? object.content_type = content_type;
? object.owner = owner.get_id();
? object.owner_display_name = owner.get_display_name();
?
? // encode suggested updates
? list_state.ver.pool = io_ctx.get_id();
? list_state.ver.epoch = astate->epoch;
? list_state.meta.size = object.size;
? list_state.meta.mtime = object.mtime;
? list_state.meta.category = main_category;
? list_state.meta.etag = etag;
? list_state.meta.content_type = content_type;
? if (astate->obj_tag.length() > 0)
??? list_state.tag = astate->obj_tag.c_str();
? list_state.meta.owner = owner.get_id().to_str();
? list_state.meta.owner_display_name = owner.get_display_name();
?
? list_state.exists = true;
? cls_rgw_encode_suggestion(CEPH_RGW_UPDATE, list_state, suggested_updates);
? return 0;
?
cls_rgw_suggest_changes
cls_rgw.cc: rgw_dir_suggest_changes
struct rgw_bucket_dir_header {
? map<uint8_t, rgw_bucket_category_stats> stats;
? uint64_t tag_timeout;
? uint64_t ver;
? uint64_t master_ver;
? string max_marker;
?
? rgw_bucket_dir_header() : tag_timeout(0), ver(0), master_ver(0) {}
};
Bucket header的結構體:
tag_timeout : pending_map項的時間戳和當前時間相比, 相差超過tag_timeout, 則刪除該pending_map項.
ver : 每次更新都會增加1
master_ver : 檢查代碼中只有獲取, 沒有賦值的地方.
max_marker : 用途待確認.
rgw_bucket_dir_header持久化為omap header
? CLS_LOG(1, "rgw_dir_suggest_changes()");
? bufferlist header_bl;
? struct rgw_bucket_dir_header header;
? bool header_changed = false;
? int rc = read_bucket_header(hctx, &header);
? if (rc < 0) {
??? CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to read header\n");
??? return rc;
? }
取出桶的header信息.
?
timespan tag_timeout(header.tag_timeout ? header.tag_timeout : CEPH_RGW_TAG_TIMEOUT);
計算超時時長.
? while (!in_iter.end()) {
??? __u8 op;
??? rgw_bucket_dir_entry cur_change;
??? rgw_bucket_dir_entry cur_disk;
??? try {
????? ::decode(op, in_iter);
????? ::decode(cur_change, in_iter);
??? } catch (buffer::error& err) {
????? CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to decode request\n");
????? return -EINVAL;
??? }
??? //decode dir_key
??? bufferlist cur_disk_bl;
??? string cur_change_key;
??? encode_obj_index_key(cur_change.key, &cur_change_key);
??? int ret = cls_cxx_map_get_val(hctx, cur_change_key, &cur_disk_bl);
??? if (ret < 0 && ret != -ENOENT)
????? return -EINVAL;
??? //獲取osd中對象信息
??? if (cur_disk_bl.length()) {
????? bufferlist::iterator cur_disk_iter = cur_disk_bl.begin();
????? try {
??????? ::decode(cur_disk, cur_disk_iter);
????? } catch (buffer::error& error) {
??????? CLS_LOG(1, "ERROR: rgw_dir_suggest_changes(): failed to decode cur_disk\n");
??????? return -EINVAL;
????? }
?
????? real_time cur_time = real_clock::now();
????? map<string, struct rgw_bucket_pending_info>::iterator iter =
??????????????? cur_disk.pending_map.begin();
????? while(iter != cur_disk.pending_map.end()) {
??????? map<string, struct rgw_bucket_pending_info>::iterator cur_iter=iter++;
??????? if (cur_time > (cur_iter->second.timestamp + tag_timeout)) {
????????? cur_disk.pending_map.erase(cur_iter);
??????? }
????? ??//如果超時了.則刪除這個pending_map,這個可能是安全性的檢查.
????? }
??? }
?
??? CLS_LOG(20, "cur_disk.pending_map.empty()=%d op=%d cur_disk.exists=%d cur_change.pending_map.size()=%d cur_change.exists=%d\n",
???????? ??? cur_disk.pending_map.empty(), (int)op, cur_disk.exists,
???????? ??? (int)cur_change.pending_map.size(), cur_change.exists);
?
??? if (cur_disk.pending_map.empty()) {
????? if (cur_disk.exists) {
??????? struct rgw_bucket_category_stats& old_stats = header.stats[cur_disk.meta.category];
??????? CLS_LOG(10, "total_entries: %" PRId64 " -> %" PRId64 "\n", old_stats.num_entries, old_stats.num_entries - 1);
??????? old_stats.num_entries--;
??????? old_stats.total_size -= cur_disk.meta.accounted_size;
??????? old_stats.total_size_rounded -= get_rounded_size(cur_disk.meta.accounted_size);
??????? header_changed = true;
????? }
????? struct rgw_bucket_category_stats& stats =
????????? header.stats[cur_change.meta.category];
????? switch(op) {
????? case CEPH_RGW_REMOVE:
??????? CLS_LOG(10, "CEPH_RGW_REMOVE name=%s instance=%s\n", cur_change.key.name.c_str(), cur_change.key.instance.c_str());
???????? ret = cls_cxx_map_remove_key(hctx, cur_change_key);
???????? if (ret < 0)
???????? ? return ret;
??????? break;
?? ???case CEPH_RGW_UPDATE:
??????? CLS_LOG(10, "CEPH_RGW_UPDATE name=%s instance=%s total_entries: %" PRId64 " -> %" PRId64 "\n",
??????????????? cur_change.key.name.c_str(), cur_change.key.instance.c_str(), stats.num_entries, stats.num_entries + 1);
??? ????//統計更新
??????? stats.num_entries++;
??????? stats.total_size += cur_change.meta.accounted_size;
??????? stats.total_size_rounded += get_rounded_size(cur_change.meta.accounted_size);
??????? header_changed = true;
??????? cur_change.index_ver = header.ver;
??????? bufferlist cur_state_bl;
??????? ::encode(cur_change, cur_state_bl);
??????? ret = cls_cxx_map_set_val(hctx, cur_change_key, &cur_state_bl);
??????? if (ret < 0)
???????? ? return ret;
??????? break;
????? }
??? }
? }
轉載于:https://www.cnblogs.com/damizhou/p/6248204.html
總結
以上是生活随笔為你收集整理的bucket list 函数解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: fetch用英语解释_fetch是什么意
- 下一篇: 什么是域名邮箱?发出的邮件可以撤回吗?