Tendermint mempool分析
概述
在Tendermint/mempool/doc.go文件里,有對(duì)mempool詳細(xì)的解釋。
- 當(dāng)客戶(hù)端提交一筆交易時(shí),這筆交易首先會(huì)被mempool進(jìn)行打包。而mempool的主要作用之一就是保存從其他peer或者自己收到的交易。以便其它模塊的使用。
- 交易進(jìn)入到mempool之前還會(huì)調(diào)用一次客戶(hù)端的CheckTx() 函數(shù)。并把檢查通過(guò)的交易放入到交易池中。
- 交易被放到交易池保證了交易的順序性。
- 共識(shí)引擎通過(guò)調(diào)用Update() 和 ReapMaxBytesMaxGas() 方法來(lái)更新內(nèi)存池中的交易。
- mempool的底層使用的是鏈表
mempool 接口
我們先看一下在mempool/mempool.go 文件里定義了的mempool功能的接口。
// 對(duì)內(nèi)存池的更新需要與提交區(qū)塊同步,這樣應(yīng)用程序就可以在提交時(shí)重置它們的瞬間狀態(tài)。 type Mempool interface {// CheckTx對(duì)應(yīng)用程序執(zhí)行一個(gè)新事務(wù),來(lái)確定它的有效性,以及是否應(yīng)該將它添加到內(nèi)存池。CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error// ReapMaxBytesMaxGas從內(nèi)存池中獲取最多為maxBytes字節(jié)總數(shù)的事務(wù),條件是總gasWanted必須小于maxGas。// 如果兩個(gè)最大值都是負(fù)數(shù),則所有返回交易的規(guī)模沒(méi)有上限,返回所有可用的交易ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs// ReapMaxTxs從內(nèi)存池中獲取最多的事務(wù)。// 如果兩個(gè)最大值都是負(fù)數(shù),則所有返回交易的規(guī)模沒(méi)有上限,返回所有可用的交易ReapMaxTxs(max int) types.Txs// Lock 鎖定mempool。共識(shí)必須能持鎖才能安全更新。Lock()// 解鎖mempoolUnlock()// Update通知內(nèi)存池,給定的txs已經(jīng)提交,可以被丟棄。// Update 應(yīng)該在區(qū)塊被共識(shí)提交后調(diào)用Update(blockHeight int64,blockTxs types.Txs,deliverTxResponses []*abci.ResponseDeliverTx,newPreFn PreCheckFunc,newPostFn PostCheckFunc,) error// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are// done. E.g. from CheckTx.// NOTE: Lock/Unlock must be managed by callerFlushAppConn() error// Flush removes all transactions from the mempool and cacheFlush()// TxsAvailable returns a channel which fires once for every height,// and only when transactions are available in the mempool.// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.TxsAvailable() <-chan struct{}// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will// trigger once every height when transactions are available.EnableTxsAvailable()// Size returns the number of transactions in the mempool.Size() int// TxsBytes returns the total size of all txs in the mempool.TxsBytes() int64// InitWAL creates a directory for the WAL file and opens a file itself. If// there is an error, it will be of type *PathError.InitWAL() error// CloseWAL closes and discards the underlying WAL file.// Any further writes will not be relayed to disk.CloseWAL() }ClistMempool
ClistMempool.go 實(shí)現(xiàn)了mempool中定義的接口,并添加了一些其他的功能。其主要作用就是檢查交易的合法性并判斷是否加入交易池。
NewCListMempool
NewClistMempool 使用給定的配置和連接的application等參數(shù),創(chuàng)建一個(gè)新的mempool。
func NewCListMempool(config *cfg.MempoolConfig,proxyAppConn proxy.AppConnMempool,height int64,options ...CListMempoolOption, ) *CListMempool {// 初始化相關(guān)的成員變量mempool := &CListMempool{// 給定的配置config: config,// 應(yīng)用層連接proxyAppConn: proxyAppConn,// 創(chuàng)建一個(gè)雙向鏈表,用來(lái)保存交易txs: clist.New(),height: height,recheckCursor: nil,recheckEnd: nil,logger: log.NewNopLogger(),metrics: NopMetrics(),}if config.CacheSize > 0 {// 創(chuàng)建內(nèi)存池緩存mempool.cache = newMapTxCache(config.CacheSize)} else {mempool.cache = nopTxCache{}}// 設(shè)置了代理連接的回調(diào)函數(shù)為globalCb(req *abci.Request, res *abci.Response)// 因?yàn)榻灰壮卦谑盏浇灰缀髸?huì)把交易提交給APP,根據(jù)APP的返回來(lái)決定后續(xù)如何這個(gè)交易// 如何處理,所以在APP處理完后的交易后回調(diào)mempool.globalCb 進(jìn)而讓mempool來(lái)繼續(xù)決定當(dāng)前交易如何處理proxyAppConn.SetResponseCallback(mempool.globalCb)for _, option := range options {option(mempool)}return mempool }globalCb
Global callback 將會(huì)在每次ABCI 響應(yīng)后進(jìn)行調(diào)用。
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {if mem.recheckCursor == nil {// 符合要求,什么也不做return}mem.metrics.RecheckTimes.Add(1)// 也是檢測(cè)交易是否符合要求,符合就什么也不做mem.resCbRecheck(req, res)// update metricsmem.metrics.Size.Set(float64(mem.Size())) }CheckTx
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {// 并發(fā)控制,也就是說(shuō)當(dāng)共識(shí)引擎在進(jìn)行交易池讀取和更新的時(shí)候,此函數(shù)應(yīng)該是阻塞的mem.updateMtx.RLock()// use defer to unlock mutex because application (*local client*) might panicdefer mem.updateMtx.RUnlock()txSize := len(tx)// 判斷交易池是否滿(mǎn)了if err := mem.isFull(txSize); err != nil {return err}// 如果已經(jīng)超過(guò)了設(shè)置的內(nèi)存池則放棄加入if txSize > mem.config.MaxTxBytes {return ErrTxTooLarge{mem.config.MaxTxBytes, txSize}}if mem.preCheck != nil {if err := mem.preCheck(tx); err != nil {return ErrPreCheck{err}}}// NOTE: writing to the WAL and calling proxy must be done before adding tx// to the cache. otherwise, if either of them fails, next time CheckTx is// called with tx, ErrTxInCache will be returned without tx being checked at// all even once.if mem.wal != nil {// TODO: Notify administrators when WAL fails_, err := mem.wal.Write(append([]byte(tx), newline...))if err != nil {return fmt.Errorf("wal.Write: %w", err)}}// NOTE: proxyAppConn may error if tx buffer is fullif err := mem.proxyAppConn.Error(); err != nil {return err}// 先加入交易池cache 如果cache中存在此交易則返回falseif !mem.cache.Push(tx) {// Record a new sender for a tx we've already seen.// Note it's possible a tx is still in the cache but no longer in the mempool// (eg. after committing a block, txs are removed from mempool but not cache),// so we only record the sender for txs still in the mempool.if e, ok := mem.txsMap.Load(TxKey(tx)); ok {memTx := e.(*clist.CElement).Value.(*mempoolTx)_, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true)// TODO: consider punishing peer for dups,// its non-trivial since invalid txs can become valid,// but they can spam the same tx with little cost to them atm.if loaded {return ErrTxInCache}}mem.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash())return nil}ctx := context.Background()if txInfo.Context != nil {ctx = txInfo.Context}// 此時(shí)把交易傳給proxyAppConn,并得到APP檢查的結(jié)果reqRes, err := mem.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})if err != nil {// 如果檢查不通過(guò),就從交易池緩存中移除這筆交易mem.cache.Remove(tx)return err}reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))return nil }Reactor
mempool reactor主要功能是在peer之間廣播包含交易的mempool。
跳過(guò)Reactor結(jié)構(gòu)體和NewReactor(),我們直接來(lái)看OnStart()
OnStart
func (r *Reactor) OnStart() error {if !r.config.Broadcast {r.Logger.Info("tx broadcasting is disabled")}// 處理Mempool通道的消息,最后會(huì)調(diào)用mempool中的CheckTx方法來(lái)判斷是否要加入到內(nèi)存池go r.processMempoolCh()// 處理每個(gè)節(jié)點(diǎn)的更新go r.processPeerUpdates()return nil }下圖為從客戶(hù)端提交的交易添加到交易池的過(guò)程:
最后
至此,Tendermint mempool源碼就分析完了。如果有錯(cuò)誤之處,希望路過(guò)的大佬能夠指點(diǎn)指點(diǎn)。最后推薦一位大佬的公眾號(hào),歡迎關(guān)注哦:區(qū)塊鏈技術(shù)棧
另外這個(gè)地址上還有很多區(qū)塊鏈學(xué)習(xí)資料:https://github.com/mindcarver/blockchain_guide
參考文章:https://gitee.com/wupeaking/tendermint_code_analysis/blob/master/Mempool%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90.md
總結(jié)
以上是生活随笔為你收集整理的Tendermint mempool分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: ipxe u盘启动linux内核,iPX
- 下一篇: Clist循环链表的实现