MongoDB 4.2 内核解析 - Change Stream
MongoDB 從3.6版本開始支持了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增強),用于訂閱 MongoDB 內部的修改操作,change stream 可用于 MongoDB 之間的增量數據遷移、同步,也可以將 MongoDB 的增量訂閱應用到其他的關聯系統;比如電商場景里,MongoDB 里存儲新的訂單信息,業務需要根據新增的訂單信息去通知庫存管理系統發貨。
Change Stream 與 Tailing Oplog 對比
在 change stream 功能之前,如果要獲取 MongoDB 增量的修改,可以通過不斷?tailing oplog? 的方式來?拉取增量的 oplog?,然后針對拉取到的 oplog 集合,來過濾滿足條件的 oplog。這種方式也能滿足絕大部分場景的需求,但存在如下的不足。
MongoDB Change Stream 解決了 Tailing oplog 存在的不足
Change Stream 實戰
以 Mongo shell 為例,使用 Change Stream 非常簡單,mongo shell 封裝了針對整個實例、DB、Collection 級別的訂閱操作。
db.getMongo().watch() 訂閱整個實例的修改 db.watch() 訂閱指定DB的修改 db.collection.watch() 訂閱指定Collection的修改? ? ? ? ?
mytest:PRIMARY> db.coll.insert({x: 100}) WriteResult({ "nInserted" : 1 }) mytest:PRIMARY> db.coll.insert({x: 101}) WriteResult({ "nInserted" : 1 }) mytest:PRIMARY> db.coll.insert({x: 102}) WriteResult({ "nInserted" : 1 })?
? ??
mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }}) { "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } } { "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }Change Stream 內部實現
watch() wrapper
db.watch() 實際上是一個 API wrapper,實際上 Change Stream 在 MongoDB 內部實際上是一個 aggregation 命令,只是加了一個特殊的?$changestream? 階段,在發起 change stream 訂閱操作后,可通過 db.currentOp() 看到對應的 aggregation/getMore 操作的詳細參數。
{"op" : "getmore","ns" : "test.coll","command" : {"getMore" : NumberLong("233479991942333714"),"collection" : "coll","maxTimeMS" : 50000,"lsid" : {"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")},},"planSummary" : "COLLSCAN","cursor" : {"cursorId" : NumberLong("233479991942333714"),"createdDate" : ISODate("2019-12-31T06:35:52.479Z"),"lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),"nDocsReturned" : NumberLong(1),"nBatchesReturned" : NumberLong(1),"noCursorTimeout" : false,"tailable" : true,"awaitData" : true,"originatingCommand" : {"aggregate" : "coll","pipeline" : [{"$changeStream" : {"fullDocument" : "default"}}],"cursor" : {},"lsid" : {"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")},"$clusterTime" : {"clusterTime" : Timestamp(1577774144, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}},"$db" : "test"},"operationUsingCursorId" : NumberLong(7019500)},"numYields" : 2,"locks" : {}}resume token
resume token 用來描述一個訂閱點,本質上是 oplog 信息的一個封裝,包含 clusterTime、uuid、documentKey等信息,當訂閱 API 帶上 resume token 時,MongoDB Server 會將 token 轉換為對應的信息,并定位到 oplog 起點繼續訂閱操作。
struct ResumeTokenData {Timestamp clusterTime;int version = 0;size_t applyOpsIndex = 0;Value documentKey;boost::optional<UUID> uuid; };ResumeTokenData 結構里包含 version 信息,在 4.0.7 以前的版本,version 均為0; 4.0.7 引入了一種新的 resume token 格式,version 為 1; 另外在 3.6 版本里,Resume Token 的編碼與 4.0 也有所不同;所以在版本升級后,有可能出現不同版本 token 無法識別的問題,所以盡量要讓 MongoDB Server 所有組件(Replica Set 各個成員,ConfigServer、Mongos)都保持相同的內核版本。
updateLookup
Change Stream 支持針對 update 操作,獲取當前的文檔完整內容,而不是僅更新操作本身,比如
mytest:PRIMARY> db.coll.find({_id: 101}) { "_id" : 101, "name" : "jack", "age" : 18 } mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}}) WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })上面的 update 操作,默認情況下,change stream 會收到??{_id: 101}, {$set: {age: 20}? 的內容,而并不會包含這個文檔其他未更新字段的信息;而加上 fullDocument: "updateLookup" 選項后,Change Stream 會根據文檔 _id 去查找文檔當前的內容并返回。
需要注意的是,updateLookup 選項只能保證最終一致性,比如針對上述文檔,如果連續更新100次,update 的 change stream 并不會按順序收到中間每一次的更新,因為每次都是去查找文檔當前的內容,而當前的內容可能已經被后續的修改覆蓋。
Sharded cluster
Change Stream 支持針對 sharded cluster 進行訂閱,會保證全局有序的返回結果;為了達到全局有序這個目標,mongos 需要從每個 shard 都返回訂閱結果按時間戳進行排序合并返回。
在極端情況下,如果某些 shard 寫入量很少或者沒有寫入,change stream 的返回延時會受到影響,因為需要等到所有 shard 都返回訂閱結果;默認情況下,mongod server 每10s會產生一條 Noop 的特殊oplog,這個機制會間接驅動 sharded cluster 在寫入量不高的情況下也能持續運轉下去。
由于需要全局排序,在 sharded cluster 寫入量很高時,Change Stream 的性能很可能跟不上;如果對性能要求非常高,可以考慮關閉 Balancer,在每個 shard 上各自建立 Change Stream。
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
?
總結
以上是生活随笔為你收集整理的MongoDB 4.2 内核解析 - Change Stream的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云峰会|阿里云数据中台重磅升级后拟扶
- 下一篇: 单人开发场景下的测试环境实践