兄弟连区块链教程Fabric1.0源代码分析configupdate处理通道配置更新
區塊鏈教程Fabric1.0源代碼分析configupdate處理通道配置更新。2018年下半年,區塊鏈行業正逐漸褪去發展之初的浮躁、回歸理性,表面上看相關人才需求與身價似乎正在回落。但事實上,正是初期泡沫的漸退,讓人們把更多的關注點放在了區塊鏈真正的技術之上。
Fabric 1.0源代碼筆記 之 Orderer #configupdate(處理通道配置更新)
1、configupdate概述
configupdate,用于接收配置交易,并處理通道配置更新。
相關代碼在orderer/configupdate目錄。
2、SupportManager接口定義及實現
2.1、SupportManager接口定義
type SupportManager interface {GetChain(chainID string) (Support, bool)NewChannelConfig(envConfigUpdate *cb.Envelope) (configtxapi.Manager, error) } //代碼在orderer/configupdate/configupdate.go2.2、SupportManager接口實現
SupportManager接口實現,即configUpdateSupport結構體及方法。
type configUpdateSupport struct {multichain.Manager //type multiLedger struct }func (cus configUpdateSupport) GetChain(chainID string) (configupdate.Support, bool) {return cus.Manager.GetChain(chainID) } //代碼在orderer/server.gomultichain.Manager接口及實現multiLedger,見Fabric 1.0源代碼筆記 之 Orderer #multichain(多鏈支持包)
3、Support接口定義及實現
3.1、Support接口定義
type Support interface {ProposeConfigUpdate(env *cb.Envelope) (*cb.ConfigEnvelope, error) } //代碼在orderer/configupdate/configupdate.go3.2、Support接口實現
Support接口實現,即configManager結構體及方法。
type configManager struct {api.ResourcescallOnUpdate []func(api.Manager)initializer api.Initializercurrent *configSet }func (cm *configManager) processConfig(channelGroup *cb.ConfigGroup) (*configResult, error) //./config.go func (cm *configManager) commitCallbacks() //./manager.go func (cm *configManager) ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) //./manager.go func (cm *configManager) proposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) //./manager.go func (cm *configManager) prepareApply(configEnv *cb.ConfigEnvelope) (*configResult, error) //./manager.go func (cm *configManager) Validate(configEnv *cb.ConfigEnvelope) error //./manager.go func (cm *configManager) Apply(configEnv *cb.ConfigEnvelope) error //./manager.go func (cm *configManager) ChainID() string //./manager.go func (cm *configManager) Sequence() uint64 //./manager.go func (cm *configManager) ConfigEnvelope() *cb.ConfigEnvelope //./manager.go func (cm *configManager) verifyDeltaSet(deltaSet map[string]comparable, signedData []*cb.SignedData) error //./update.go func (cm *configManager) authorizeUpdate(configUpdateEnv *cb.ConfigUpdateEnvelope) (map[string]comparable, error) //./update.go func (cm *configManager) policyForItem(item comparable) (policies.Policy, bool) //./update.go func (cm *configManager) computeUpdateResult(updatedConfig map[string]comparable) map[string]comparable //./update.go //代碼在common/configtx/manager.go4、ConfigUpdateProcessor接口定義及實現
4.1、ConfigUpdateProcessor接口定義
type ConfigUpdateProcessor interface {Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error) } //代碼在orderer/common/broadcast/broadcast.go4.2、ConfigUpdateProcessor接口實現
ConfigUpdateProcessor接口實現,即Processor結構體及方法。
type Processor struct {signer crypto.LocalSignermanager SupportManager //即type configUpdateSupport struct,或者即multichain.multiLedgersystemChannelID stringsystemChannelSupport Support }//構造Processor func New(systemChannelID string, supportManager SupportManager, signer crypto.LocalSigner) *Processor //獲取channelID func channelID(env *cb.Envelope) (string, error) //處理通道配置更新 func (p *Processor) Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error) func (p *Processor) existingChannelConfig(envConfigUpdate *cb.Envelope, channelID string, support Support) (*cb.Envelope, error) func (p *Processor) proposeNewChannelToSystemChannel(newChannelEnvConfig *cb.Envelope) (*cb.Envelope, error) func (p *Processor) newChannelConfig(channelID string, envConfigUpdate *cb.Envelope) (*cb.Envelope, error) //代碼在orderer/configupdate/configupdate.go4.2.1、func New(systemChannelID string, supportManager SupportManager, signer crypto.LocalSigner) *Processor
func New(systemChannelID string, supportManager SupportManager, signer crypto.LocalSigner) *Processor {support, ok := supportManager.GetChain(systemChannelID)return &Processor{systemChannelID: systemChannelID,manager: supportManager,signer: signer,systemChannelSupport: support,} } //代碼在orderer/configupdate/configupdate.go4.2.2、func (p Processor) Process(envConfigUpdate cb.Envelope) (*cb.Envelope, error)
func (p *Processor) Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error) {channelID, err := channelID(envConfigUpdate)support, ok := p.manager.GetChain(channelID) //存在if ok {return p.existingChannelConfig(envConfigUpdate, channelID, support)}return p.newChannelConfig(channelID, envConfigUpdate) //不存在 } //代碼在orderer/configupdate/configupdate.go4.2.3、func (p Processor) existingChannelConfig(envConfigUpdate cb.Envelope, channelID string, support Support) (*cb.Envelope, error)
func (p *Processor) existingChannelConfig(envConfigUpdate *cb.Envelope, channelID string, support Support) (*cb.Envelope, error) {configEnvelope, err := support.ProposeConfigUpdate(envConfigUpdate)return utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, p.signer, configEnvelope, msgVersion, epoch) } //代碼在orderer/configupdate/configupdate.go4.2.4、func (p Processor) newChannelConfig(channelID string, envConfigUpdate cb.Envelope) (*cb.Envelope, error)
func (p *Processor) newChannelConfig(channelID string, envConfigUpdate *cb.Envelope) (*cb.Envelope, error) {ctxm, err := p.manager.NewChannelConfig(envConfigUpdate) //創建新的通道newChannelConfigEnv, err := ctxm.ProposeConfigUpdate(envConfigUpdate) //創建新的通道后處理通道配置newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, p.signer, newChannelConfigEnv, msgVersion, epoch)return p.proposeNewChannelToSystemChannel(newChannelEnvConfig) } //代碼在orderer/configupdate/configupdate.go5、詳解configManager結構體
5.1、configManager結構體定義及方法
type configManager struct {api.ResourcescallOnUpdate []func(api.Manager)initializer api.Initializercurrent *configSet }func validateConfigID(configID string) error func validateChannelID(channelID string) error func NewManagerImpl(envConfig *cb.Envelope, initializer api.Initializer, callOnUpdate []func(api.Manager)) (api.Manager, error) func (cm *configManager) commitCallbacks() func (cm *configManager) ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) func (cm *configManager) proposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) func (cm *configManager) prepareApply(configEnv *cb.ConfigEnvelope) (*configResult, error) func (cm *configManager) Validate(configEnv *cb.ConfigEnvelope) error func (cm *configManager) Apply(configEnv *cb.ConfigEnvelope) error func (cm *configManager) ChainID() string func (cm *configManager) Sequence() uint64 func (cm *configManager) ConfigEnvelope() *cb.ConfigEnvelopefunc proposeGroup(result *configResult) error func processConfig(channelGroup *cb.ConfigGroup, proposer api.Proposer) (*configResult, error) func (cm *configManager) processConfig(channelGroup *cb.ConfigGroup) (*configResult, error)func (c *configSet) verifyReadSet(readSet map[string]comparable) error func ComputeDeltaSet(readSet, writeSet map[string]comparable) map[string]comparable func validateModPolicy(modPolicy string) error func (cm *configManager) verifyDeltaSet(deltaSet map[string]comparable, signedData []*cb.SignedData) error func verifyFullProposedConfig(writeSet, fullProposedConfig map[string]comparable) error //驗證所有修改的配置都有相應的修改策略,返回修改過的配置的映射map[string]comparable func (cm *configManager) authorizeUpdate(configUpdateEnv *cb.ConfigUpdateEnvelope) (map[string]comparable, error) func (cm *configManager) policyForItem(item comparable) (policies.Policy, bool) func (cm *configManager) computeUpdateResult(updatedConfig map[string]comparable) map[string]comparable //Envelope轉換為ConfigUpdateEnvelope func envelopeToConfigUpdate(configtx 
*cb.Envelope) (*cb.ConfigUpdateEnvelope, error) //代碼在common/configtx/manager.go5.2、func (cm configManager) ProposeConfigUpdate(configtx cb.Envelope) (*cb.ConfigEnvelope, error)
func (cm *configManager) ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {return cm.proposeConfigUpdate(configtx) }func (cm *configManager) proposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error) {//Envelope轉換為ConfigUpdateEnvelopeconfigUpdateEnv, err := envelopeToConfigUpdate(configtx)//驗證所有修改的配置都有相應的修改策略,返回修改過的配置的映射map[string]comparableconfigMap, err := cm.authorizeUpdate(configUpdateEnv)channelGroup, err := configMapToConfig(configMap) //ConfigGroup//實際調用processConfig(channelGroup, cm.initializer),并最終調用proposeGroup(configResult)result, err := cm.processConfig(channelGroup)result.rollback()return &cb.ConfigEnvelope{Config: &cb.Config{Sequence: cm.current.sequence + 1,ChannelGroup: channelGroup,},LastUpdate: configtx,}, nil } //代碼在common/configtx/manager.go補充ConfigUpdateEnvelope:
type ConfigUpdateEnvelope struct {ConfigUpdate []byte //type ConfigUpdate structSignatures []*ConfigSignature }type ConfigUpdate struct {ChannelId stringReadSet *ConfigGroupWriteSet *ConfigGroup }type ConfigGroup struct {Version uint64Groups map[string]*ConfigGroupValues map[string]*ConfigValuePolicies map[string]*ConfigPolicyModPolicy string } //代碼在protos/common/configtx.pb.go補充ConfigGroup:
### 5.3、func (cm *configManager) authorizeUpdate(configUpdateEnv *cb.ConfigUpdateEnvelope) (map[string]comparable, error)func (cm configManager) authorizeUpdate(configUpdateEnv cb.ConfigUpdateEnvelope) (map[string]comparable, error) {
????//反序列化configUpdateEnv.ConfigUpdate
????configUpdate, err := UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
????readSet, err := MapConfig(configUpdate.ReadSet) //map[string]comparable
????err = cm.current.verifyReadSet(readSet)
????writeSet, err := MapConfig(configUpdate.WriteSet) //map[string]comparable
????//從writeSet中逐一對比readSet,去除沒有發生變更的
????deltaSet := ComputeDeltaSet(readSet, writeSet)
????signedData, err := configUpdateEnv.AsSignedData() //轉換為SignedData
????err = cm.verifyDeltaSet(deltaSet, signedData) //校驗deltaSet
????fullProposedConfig := cm.computeUpdateResult(deltaSet) //合并為fullProposedConfig
????err := verifyFullProposedConfig(writeSet, fullProposedConfig)
????return fullProposedConfig, nil
}
//代碼在common/configtx/update.go
type comparable struct {
????*cb.ConfigGroup
????*cb.ConfigValue
????*cb.ConfigPolicy
????key string
????path []string
}
//代碼在common/configtx/compare.go
func (cm configManager) processConfig(channelGroup cb.ConfigGroup) (*configResult, error) {
????configResult, err := processConfig(channelGroup, cm.initializer)
????err = configResult.preCommit()
????return configResult, nil
}
//代碼在common/configtx/config.go
type configResult struct {
????tx interface{}
????groupName string
????groupKey string
????group *cb.ConfigGroup
????valueHandler config.ValueProposer
????policyHandler policies.Proposer
????subResults []*configResult
????deserializedValues map[string]proto.Message
????deserializedPolicies map[string]proto.Message
}
func NewConfigResult(config *cb.ConfigGroup, proposer api.Proposer) (ConfigResult, error)
func (cr *configResult) JSON() string
func (cr *configResult) bufferJSON(buffer *bytes.Buffer)

// preCommit calls cr.valueHandler.PreCommit(cr.tx).
func (cr *configResult) preCommit() error

// commit calls cr.valueHandler.CommitProposals(cr.tx) and
// cr.policyHandler.CommitProposals(cr.tx).
func (cr *configResult) commit()

// rollback calls cr.valueHandler.RollbackProposals(cr.tx) and
// cr.policyHandler.RollbackProposals(cr.tx).
func (cr *configResult) rollback()

func proposeGroup(result *configResult) error
func processConfig(channelGroup *cb.ConfigGroup, proposer api.Proposer) (*configResult, error)

// Source: common/configtx/config.go
func processConfig(channelGroup cb.ConfigGroup, proposer api.Proposer) (configResult, error) {
????helperGroup := cb.NewConfigGroup()
????helperGroup.Groups[RootGroupKey] = channelGroup
????configResult := &configResult{
????????group: helperGroup,
????????valueHandler: proposer.ValueProposer(),
????????policyHandler: proposer.PolicyProposer(),
????}
????err := proposeGroup(configResult)
????return configResult, nil
}
//代碼在common/configtx/config.go
func proposeGroup(result *configResult) error {
????subGroups := make([]string, len(result.group.Groups))
????i := 0
????for subGroup := range result.group.Groups {
????????subGroups[i] = subGroup
????????i++
????}
????valueDeserializer, subValueHandlers, err := result.valueHandler.BeginValueProposals(result.tx, subGroups)
????subPolicyHandlers, err := result.policyHandler.BeginPolicyProposals(result.tx, subGroups)
????for key, value := range result.group.Values {
????????msg, err := valueDeserializer.Deserialize(key, value.Value)
????????result.deserializedValues[key] = msg
????}
????for key, policy := range result.group.Policies {
????????policy, err := result.policyHandler.ProposePolicy(result.tx, key, policy)
????????result.deserializedPolicies[key] = policy
????}
????result.subResults = make([]*configResult, 0, len(subGroups))
????for i, subGroup := range subGroups {
????????result.subResults = append(result.subResults, &configResult{
????????????tx: result.tx,
????????????groupKey: subGroup,
????????????groupName: result.groupName + "/" + subGroup,
????????????group: result.group.Groups[subGroup],
????????????valueHandler: subValueHandlers[i],
????????????policyHandler: subPolicyHandlers[i],
????????????deserializedValues: make(map[string]proto.Message),
????????????deserializedPolicies: make(map[string]proto.Message),
????????})
????????err := proposeGroup(result.subResults[i])
????}
????return nil
}
//代碼在common/configtx/config.go
總結
以上是生活随笔為你收集整理的兄弟连区块链教程Fabric1.0源代码分析configupdate处理通道配置更新的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java泛型通用常量类案例
- 下一篇: 实时计算 Flink性能调优