区块链教程Fabric1.0源代码分析流言算法Gossip服务端二
生活随笔
收集整理的這篇文章主要介紹了
区块链教程Fabric1.0源代码分析流言算法Gossip服务端二
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
區塊鏈教程Fabric1.0源代碼分析流言算法Gossip服務端二
Fabric 1.0源代碼筆記 之 gossip(流言算法) #GossipServer(Gossip服務端)
5.2、commImpl結構體方法
//conn.serviceConnection(),啟動連接服務
func (c *commImpl) GossipStream(stream proto.Gossip_GossipStreamServer) error
//return &proto.Empty{}
func (c *commImpl) Ping(context.Context, *proto.Empty) (*proto.Empty, error)
func (c *commImpl) GetPKIid() common.PKIidType
//向指定節點發送消息
func (c *commImpl) Send(msg *proto.SignedGossipMessage, peers ...*RemotePeer)
//探測遠程節點是否有響應,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Probe(remotePeer *RemotePeer) error
//握手驗證遠程節點,_, err = cl.Ping(context.Background(), &proto.Empty{})
func (c *commImpl) Handshake(remotePeer *RemotePeer) (api.PeerIdentityType, error)
func (c *commImpl) Accept(acceptor common.MessageAcceptor) <-chan proto.ReceivedMessage
func (c *commImpl) PresumedDead() <-chan common.PKIidType
func (c *commImpl) CloseConn(peer *RemotePeer)
func (c *commImpl) Stop()

//創建并啟動gRPC Server,以及注冊GossipServer實例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)
//將GossipServer實例注冊至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper, peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)
func extractRemoteAddress(stream stream) string
func readWithTimeout(stream interface{}, timeout time.Duration, address string) (*proto.SignedGossipMessage, error)
//創建gRPC Server,grpc.NewServer(serverOpts...)
func createGRPCLayer(port int) (*grpc.Server, net.Listener, api.PeerSecureDialOpts, []byte)

//創建與服務端連接
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
//向指定節點發送消息
func (c *commImpl) sendToEndpoint(peer *RemotePeer, msg *proto.SignedGossipMessage)
//return atomic.LoadInt32(&c.stopping) == int32(1)
func (c *commImpl) isStopping() bool
func (c *commImpl) emptySubscriptions()
func (c *commImpl) authenticateRemotePeer(stream stream) (*proto.ConnectionInfo, error)
func (c *commImpl) disconnect(pkiID common.PKIidType)
func (c *commImpl) createConnectionMsg(pkiID common.PKIidType, certHash []byte, cert api.PeerIdentityType, signer proto.Signer) (*proto.SignedGossipMessage, error)
//代碼在gossip/comm/comm_impl.go

5.2.1、func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error)
創建并啟動gRPC Server,以及注冊GossipServer實例
func NewCommInstanceWithServer(port int, idMapper identity.Mapper, peerIdentity api.PeerIdentityType,secureDialOpts api.PeerSecureDialOpts, dialOpts ...grpc.DialOption) (Comm, error) {var ll net.Listenervar s *grpc.Servervar certHash []byteif len(dialOpts) == 0 {//peer.gossip.dialTimeout,gRPC連接撥號的超時dialOpts = []grpc.DialOption{grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout))}}if port > 0 {//創建gRPC Server,grpc.NewServer(serverOpts...)s, ll, secureDialOpts, certHash = createGRPCLayer(port)}commInst := &commImpl{selfCertHash: certHash,PKIID: idMapper.GetPKIidOfCert(peerIdentity),idMapper: idMapper,logger: util.GetLogger(util.LoggingCommModule, fmt.Sprintf("%d", port)),peerIdentity: peerIdentity,opts: dialOpts,secureDialOpts: secureDialOpts,port: port,lsnr: ll,gSrv: s,msgPublisher: NewChannelDemultiplexer(),lock: &sync.RWMutex{},deadEndpoints: make(chan common.PKIidType, 100),stopping: int32(0),exitChan: make(chan struct{}, 1),subscriptions: make([]chan proto.ReceivedMessage, 0),}commInst.connStore = newConnStore(commInst, commInst.logger)if port > 0 {commInst.stopWG.Add(1)go func() {defer commInst.stopWG.Done()s.Serve(ll) //啟動gRPC Server}()//commInst注冊到gRPC Serverproto.RegisterGossipServer(s, commInst)}return commInst, nil }//代碼在gossip/comm/comm_impl.go5.2.2、func NewCommInstance(s grpc.Server, cert tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error)
將GossipServer實例注冊至peerServer
func NewCommInstance(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,peerIdentity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts,dialOpts ...grpc.DialOption) (Comm, error) {dialOpts = append(dialOpts, grpc.WithTimeout(util.GetDurationOrDefault("peer.gossip.dialTimeout", defDialTimeout)))//構造commImplcommInst, err := NewCommInstanceWithServer(-1, idStore, peerIdentity, secureDialOpts, dialOpts...)if cert != nil {inst := commInst.(*commImpl)inst.selfCertHash = certHashFromRawCert(cert.Certificate[0])}proto.RegisterGossipServer(s, commInst.(*commImpl))return commInst, nil }//代碼在gossip/comm/comm_impl.go//創建與服務端連接
5.2.3、func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error)
func (c *commImpl) createConnection(endpoint string, expectedPKIID common.PKIidType) (*connection, error) {var err errorvar cc *grpc.ClientConnvar stream proto.Gossip_GossipStreamClientvar pkiID common.PKIidTypevar connInfo *proto.ConnectionInfovar dialOpts []grpc.DialOptiondialOpts = append(dialOpts, c.secureDialOpts()...)dialOpts = append(dialOpts, grpc.WithBlock())dialOpts = append(dialOpts, c.opts...)cc, err = grpc.Dial(endpoint, dialOpts...)cl := proto.NewGossipClient(cc)if _, err = cl.Ping(context.Background(), &proto.Empty{}); err != nil {cc.Close()return nil, err}ctx, cf := context.WithCancel(context.Background())stream, err = cl.GossipStream(ctx)connInfo, err = c.authenticateRemotePeer(stream)pkiID = connInfo.IDconn := newConnection(cl, cc, stream, nil)conn.pkiID = pkiIDconn.info = connInfoconn.logger = c.loggerconn.cancel = cfh := func(m *proto.SignedGossipMessage) {c.msgPublisher.DeMultiplex(&ReceivedMessageImpl{conn: conn,lock: conn,SignedGossipMessage: m,connInfo: connInfo,})}conn.handler = hreturn conn, nil } //代碼在gossip/comm/comm_impl.go6、connectionStore和connection結構體及方法
6.1、connection結構體及方法
// connection holds the state of one link to a remote endpoint: the gRPC
// client connection and stub, the client-side and server-side streams, the
// outgoing message buffer and the handler invoked for received messages.
type connection struct {
	cancel       context.CancelFunc
	info         *proto.ConnectionInfo
	outBuff      chan *msgSending
	logger       *logging.Logger                 // logger
	pkiID        common.PKIidType                // pkiID of the remote endpoint
	handler      handler                         // function to invoke upon a message reception
	conn         *grpc.ClientConn                // gRPC connection to remote endpoint
	cl           proto.GossipClient              // gRPC stub of remote endpoint
	clientStream proto.Gossip_GossipStreamClient // client-side stream to remote endpoint
	serverStream proto.Gossip_GossipStreamServer // server-side stream to remote endpoint
	stopFlag     int32                           // indicates whether this connection is in process of stopping
	stopChan     chan struct{}                   // a method to stop the server-side gRPC call from a different go-routine
	sync.RWMutex                                 // synchronizes access to shared variables
}

// Constructs a connection.
func newConnection(cl proto.GossipClient, c *grpc.ClientConn, cs proto.Gossip_GossipStreamClient, ss proto.Gossip_GossipStreamServer) *connection

// Closes the connection.
func (conn *connection) close()

// atomic.LoadInt32(&(conn.stopFlag)) == int32(1)
func (conn *connection) toDie() bool

// conn.outBuff <- m, where m is msgSending{envelope: msg.Envelope, onErr: onErr}
func (conn *connection) send(msg *proto.SignedGossipMessage, onErr func(error))

// Starts go conn.readFromStream(errChan, msgChan) and go conn.writeToStream();
// meanwhile msg := <-msgChan, conn.handler(msg).
func (conn *connection) serviceConnection() error

// Loops continuously, taking data from conn.outBuff and then calling stream.Send(m.envelope).
func (conn *connection) writeToStream()

// Loops continuously: envelope, err := stream.Recv(); msg, err := envelope.ToGossipMessage(); msgChan <- msg.
func (conn *connection) readFromStream(errChan chan error, msgChan chan *proto.SignedGossipMessage)

// Gets conn.serverStream.
func (conn *connection) getStream() stream

// Code is in gossip/comm/conn.go

6.2、connectionStore結構體及方法
// connectionStore keeps the connections to remote peers keyed by pkiID,
// together with per-pkiID locks and the factory used to create new
// connections.
type connectionStore struct {
	logger           *logging.Logger          // logger
	isClosing        bool                     // whether this connection store is shutting down
	connFactory      connFactory              // creates a connection to remote peer
	sync.RWMutex                              // synchronize access to shared variables
	pki2Conn         map[string]*connection   // connection map: key is pkiID, value is connection
	destinationLocks map[string]*sync.RWMutex // mapping between pkiIDs and locks,
	// used to prevent concurrent connection establishment to the same remote endpoint
}

// Constructs a connectionStore.
func newConnStore(connFactory connFactory, logger *logging.Logger) *connectionStore

// Gets the connection from the connection map; if there is none, creates and
// starts a connection and writes it into the connection map.
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error)

// Number of connections.
func (cs *connectionStore) connNum() int

// Closes the specified connection.
func (cs *connectionStore) closeConn(peer *RemotePeer)

// Closes all connections.
func (cs *connectionStore) shutdown()

func (cs *connectionStore) onConnected(serverStream proto.Gossip_GossipStreamServer, connInfo *proto.ConnectionInfo) *connection

// Registers a connection.
func (cs *connectionStore) registerConn(connInfo *proto.ConnectionInfo, serverStream proto.Gossip_GossipStreamServer) *connection

// Closes the specified connection.
func (cs *connectionStore) closeByPKIid(pkiID common.PKIidType)

// Code is in gossip/comm/conn.go

6.2.1、func (cs connectionStore) getConnection(peer RemotePeer) (*connection, error)
func (cs *connectionStore) getConnection(peer *RemotePeer) (*connection, error) {cs.RLock()isClosing := cs.isClosingcs.RUnlock()pkiID := peer.PKIIDendpoint := peer.Endpointcs.Lock()destinationLock, hasConnected := cs.destinationLocks[string(pkiID)]if !hasConnected {destinationLock = &sync.RWMutex{}cs.destinationLocks[string(pkiID)] = destinationLock}cs.Unlock()destinationLock.Lock()cs.RLock()//從connection map中獲取conn, exists := cs.pki2Conn[string(pkiID)]if exists {cs.RUnlock()destinationLock.Unlock()return conn, nil}cs.RUnlock()//創建連接createdConnection, err := cs.connFactory.createConnection(endpoint, pkiID)destinationLock.Unlock()conn = createdConnectioncs.pki2Conn[string(createdConnection.pkiID)] = conngo conn.serviceConnection() //啟動連接的消息接收處理、以及向對方節點發送消息return conn, nil } //代碼在gossip/comm/conn.go7、ChannelDeMultiplexer結構體及方法(多路復用器)
// ChannelDeMultiplexer holds a list of channels, a lock, and a closed flag
// that isClosed reads with atomic.LoadInt32.
type ChannelDeMultiplexer struct {
	channels []*channel
	lock     *sync.RWMutex
	closed   int32
}

// Constructs a ChannelDeMultiplexer.
func NewChannelDemultiplexer() *ChannelDeMultiplexer

// atomic.LoadInt32(&m.closed) == int32(1)
func (m *ChannelDeMultiplexer) isClosed() bool

// Closes the demultiplexer.
func (m *ChannelDeMultiplexer) Close()

// Adds a channel.
func (m *ChannelDeMultiplexer) AddChannel(predicate common.MessageAcceptor) chan interface{}

// Sends the message to the channels one by one.
func (m *ChannelDeMultiplexer) DeMultiplex(msg interface{})

轉載于:https://blog.51cto.com/14041296/2311323
總結
以上是生活随笔為你收集整理的区块链教程Fabric1.0源代码分析流言算法Gossip服务端二的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php htts cookies,Htt
- 下一篇: 台湾19大IT业营收连衰 全球产业景气警