gRPC Go Source Code Analysis: The Client (Part 1)
Table of Contents
- 1. Introduction
- 2. Source Tree Overview
- 3. The Client
- 4. Related Links
1. Introduction
gRPC is a general-purpose RPC framework implemented by Google, and it has a Go version. I use this library heavily at work, so I am reading its source to deepen my understanding of the framework. For now the analysis covers the Go version only (I have not looked at the other language implementations). My own understanding is limited and some of the design intent described here is my guesswork, so mistakes are inevitable; if you spot one, please point it out.
2. Source Tree Overview
gRPC uses protobuf (Google's serialization framework) as its wire format and HTTP/2 as the underlying transport. The grpc source implements its own HTTP/2 client and server rather than using the net/http package. HTTP/2 has many features for moving data efficiently; see the related links for details. The grpc tree looks like this, and the directory names mostly speak for themselves:
- documentation: docs
- benchmark: load tests
- credentials: authentication
- examples: sample programs
- grpclb: load balancing
- grpclog: logging
- health: service health checking
- metadata: metadata (lets the client pass extra key/value data to the server; see the related links)
- naming: the interfaces a name-resolution service must implement (roughly a DNS)
- stats: statistics
- transport: the transport layer, i.e. the HTTP/2 client and server implementations (this directory will not be covered in detail)

The remaining directories are minor and are skipped here.
3. The Client
The examples directory contains two fairly simple examples, so let's start there:
```go
func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	// Issue the RPC.
	r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.Message)
}
```
The grpc.WithInsecure option disables transport security for the connection, so the server's certificate is never checked (acceptable only when you trust the server and the network). grpc.Dial establishes the connection to the server; it is a thin wrapper around grpc.DialContext. Every DialOption, WithInsecure included, is simply a function that mutates the connection's option struct.
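Here is a minimal, self-contained sketch of that functional-options pattern. The fields and the option bodies below are illustrative stand-ins, not the actual grpc internals:
```go
package main

import (
	"fmt"
	"time"
)

// dialOptions mirrors the idea behind grpc's unexported dialOptions
// struct; the fields here are invented for illustration.
type dialOptions struct {
	insecure bool
	timeout  time.Duration
}

// DialOption is a function that mutates dialOptions, the same shape
// the `opt(&cc.dopts)` loop in DialContext relies on.
type DialOption func(*dialOptions)

// WithInsecure and WithTimeout are real grpc option names, but these
// bodies are sketches of the pattern, not the library's code.
func WithInsecure() DialOption {
	return func(o *dialOptions) { o.insecure = true }
}

func WithTimeout(d time.Duration) DialOption {
	return func(o *dialOptions) { o.timeout = d }
}

func main() {
	var dopts dialOptions
	for _, opt := range []DialOption{WithInsecure(), WithTimeout(3 * time.Second)} {
		opt(&dopts) // apply each option, as DialContext does
	}
	fmt.Printf("%+v\n", dopts) // {insecure:true timeout:3s}
}
```
DialContext applies the caller's options with exactly this kind of loop, as the real function shows: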
```go
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns:  make(map[Address]*addrConn),
	}
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
		if err != nil {
			cc.Close()
		}
	}()
	// Apply all the grpc options.
	for _, opt := range opts {
		opt(&cc.dopts)
	}
	// Set defaults.
	if cc.dopts.codec == nil {
		// protobuf is the default codec.
		cc.dopts.codec = protoCodec{}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = DefaultBackoffConfig
	}
	creds := cc.dopts.copts.TransportCredentials // transport credentials
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else {
		colonPos := strings.LastIndex(target, ":")
		if colonPos == -1 {
			colonPos = len(target)
		}
		cc.authority = target[:colonPos]
	}
	var ok bool
	waitC := make(chan error, 1)
	// Start a goroutine that resolves the target (similar to DNS) and dials the addresses.
	go func() {
		var addrs []Address
		if cc.dopts.balancer == nil {
			// Connect to target directly if balancer is nil.
			addrs = append(addrs, Address{Addr: target})
		} else {
			var credsClone credentials.TransportCredentials
			if creds != nil {
				credsClone = creds.Clone()
			}
			config := BalancerConfig{
				DialCreds: credsClone,
			}
			// Start the load balancer.
			if err := cc.dopts.balancer.Start(target, config); err != nil {
				waitC <- err
				return
			}
			ch := cc.dopts.balancer.Notify()
			if ch == nil {
				// There is no name resolver installed.
				addrs = append(addrs, Address{Addr: target})
			} else {
				addrs, ok = <-ch
				if !ok || len(addrs) == 0 {
					waitC <- errNoAddr
					return
				}
			}
		}
		for _, a := range addrs {
			// One addrConn per address: a small connection pool.
			if err := cc.resetAddrConn(a, false, nil); err != nil {
				waitC <- err
				return
			}
		}
		close(waitC)
	}()
	var timeoutCh <-chan time.Time
	if cc.dopts.timeout > 0 {
		timeoutCh = time.After(cc.dopts.timeout)
	}
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case err := <-waitC:
		if err != nil {
			return nil, err
		}
	case <-timeoutCh:
		return nil, ErrClientConnTimeout
	}
	// If balancer is nil or balancer.Notify() is nil, ok will be false here.
	// The lbWatcher goroutine will not be created.
	if ok {
		go cc.lbWatcher()
	}
	return cc, nil
}
```
Through Dial, then, grpc has established the connection(s) to the server and started the custom load balancer, if one was configured. Note the last step: when the balancer's Notify() channel is live (ok is true), a lbWatcher goroutine is started to keep the pool of per-address connections, one addrConn per address created via resetAddrConn, in sync with the balancer's address updates.
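What such a Notify-driven watcher does can be shown in a self-contained sketch. The real logic lives in ClientConn.lbWatcher; the types and the reconcile loop below are simplified stand-ins for it:
```go
package main

import "fmt"

// Address stands in for grpc's Address type.
type Address struct{ Addr string }

// watchAddrs mimics the shape of ClientConn.lbWatcher: on every update
// from the balancer's Notify channel, connect to addresses that are new
// and tear down connections whose address has disappeared.
func watchAddrs(notify <-chan []Address, conns map[Address]bool) {
	for addrs := range notify {
		seen := make(map[Address]bool, len(addrs))
		for _, a := range addrs {
			seen[a] = true
			if !conns[a] {
				conns[a] = true // stands in for cc.resetAddrConn(a, ...)
				fmt.Println("connect", a.Addr)
			}
		}
		for a := range conns {
			if !seen[a] {
				delete(conns, a) // stands in for tearing the addrConn down
				fmt.Println("teardown", a.Addr)
			}
		}
	}
}

func main() {
	notify := make(chan []Address, 2)
	notify <- []Address{{"10.0.0.1:50051"}, {"10.0.0.2:50051"}}
	notify <- []Address{{"10.0.0.2:50051"}} // first backend went away
	close(notify)
	watchAddrs(notify, map[Address]bool{})
}
```
Back to the example: the pb.NewGreeterClient line is generated by the protoc tool; it wraps the grpc connection in a struct so that the generated client code is convenient to call. The client then calls SayHello to send the RPC to the server: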
```go
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	out := new(HelloReply)
	// Delegate to the function that actually sends the request.
	err := grpc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// The real work happens in invoke.
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
	c := defaultCallInfo
	for _, o := range opts {
		// Run the pre-call hooks.
		if err := o.before(&c); err != nil {
			return toRPCErr(err)
		}
	}
	defer func() {
		for _, o := range opts {
			// Run the post-call hooks.
			o.after(&c)
		}
	}()
	// Tracing.
	if EnableTracing {
		c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
		defer c.traceInfo.tr.Finish()
		c.traceInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
		}
		c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
		// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
		defer func() {
			if e != nil {
				c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
				c.traceInfo.tr.SetError()
			}
		}()
	}
	// Stats.
	if stats.On() {
		ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
		begin := &stats.Begin{
			Client:    true,
			BeginTime: time.Now(),
			FailFast:  c.failFast,
		}
		stats.HandleRPC(ctx, begin)
	}
	defer func() {
		// Stats once the call has finished.
		if stats.On() {
			end := &stats.End{
				Client:  true,
				EndTime: time.Now(),
				Error:   e,
			}
			stats.HandleRPC(ctx, end)
		}
	}()
	topts := &transport.Options{
		Last:  true,
		Delay: false,
	}
	for {
		var (
			err    error
			t      transport.ClientTransport
			stream *transport.Stream
			// Record the put handler from Balancer.Get(...). It is called once the
			// RPC has completed or failed.
			put func()
		)
		// TODO(zhaoq): Need a formal spec of fail-fast.
		// Transport-level call header.
		callHdr := &transport.CallHdr{
			Host:   cc.authority,
			Method: method,
		}
		if cc.dopts.cp != nil {
			callHdr.SendCompress = cc.dopts.cp.Type()
		}
		gopts := BalancerGetOptions{
			BlockingWait: !c.failFast,
		}
		// Get a transport-layer connection; in HTTP/2 the unit of transfer is a stream.
		t, put, err = cc.getTransport(ctx, gopts)
		if err != nil {
			// TODO(zhaoq): Probably revisit the error handling.
			if _, ok := err.(*rpcError); ok {
				return err
			}
			if err == errConnClosing || err == errConnUnavailable {
				if c.failFast {
					return Errorf(codes.Unavailable, "%v", err)
				}
				continue
			}
			// All the other errors are treated as Internal errors.
			return Errorf(codes.Internal, "%v", err)
		}
		if c.traceInfo.tr != nil {
			c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
		}
		// Send the request.
		stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
		if err != nil {
			if put != nil {
				put()
				put = nil
			}
			// Retry a non-failfast RPC when
			// i) there is a connection error; or
			// ii) the server started to drain before this RPC was initiated.
			if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
				if c.failFast {
					return toRPCErr(err)
				}
				continue
			}
			return toRPCErr(err)
		}
		// Receive the response.
		err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
		if err != nil {
			if put != nil {
				put()
				put = nil
			}
			if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
				if c.failFast {
					return toRPCErr(err)
				}
				continue
			}
			return toRPCErr(err)
		}
		if c.traceInfo.tr != nil {
			c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
		}
		// Close the HTTP/2 stream.
		t.CloseStream(stream, nil)
		if put != nil {
			put()
			put = nil
		}
		// Errorf returns nil when the status code is codes.OK.
		return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
	}
}
```
Apart from the tracing and stats plumbing, three things matter in invoke: the CallOption before/after hooks that run around the call, the for loop that retries non-fail-fast RPCs on connection errors and stream drains, and the two workhorses sendRequest and recvResponse. The hook pattern is sketched next; the workhorses follow after it.
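Here is a self-contained sketch of the before/after hook shape that invoke relies on. FailFast is a real grpc option name, but the types and bodies below are simplified stand-ins:
```go
package main

import "fmt"

// callInfo stands in for grpc's per-call metadata; failFast is the one
// field invoke's retry logic actually consults.
type callInfo struct{ failFast bool }

// CallOption carries the two hooks invoke runs around each RPC.
type CallOption interface {
	before(*callInfo) error
	after(*callInfo)
}

// beforeCall adapts a plain function into a pre-call hook.
type beforeCall func(*callInfo) error

func (f beforeCall) before(c *callInfo) error { return f(c) }
func (f beforeCall) after(*callInfo)          {}

// FailFast flips fail-fast behaviour for a single call.
func FailFast(f bool) CallOption {
	return beforeCall(func(c *callInfo) error { c.failFast = f; return nil })
}

func main() {
	c := callInfo{failFast: true}
	opts := []CallOption{FailFast(false)}
	for _, o := range opts { // the same loop invoke runs up front
		if err := o.before(&c); err != nil {
			fmt.Println("aborted:", err)
			return
		}
	}
	defer func() { // and the mirror-image loop it defers
		for _, o := range opts {
			o.after(&c)
		}
	}()
	fmt.Printf("%+v\n", c) // {failFast:false}
}
```
Now the first workhorse, sendRequest: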
```go
func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
	// Create an HTTP/2 stream.
	stream, err := t.NewStream(ctx, callHdr)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			// If err is connection error, t will be closed, no need to close stream here.
			if _, ok := err.(transport.ConnectionError); !ok {
				t.CloseStream(stream, err)
			}
		}
	}()
	var (
		cbuf       *bytes.Buffer
		outPayload *stats.OutPayload
	)
	// A compressor is configured.
	if compressor != nil {
		cbuf = new(bytes.Buffer)
	}
	// Stats.
	if stats.On() {
		outPayload = &stats.OutPayload{
			Client: true,
		}
	}
	// Encode and compress the payload.
	outBuf, err := encode(codec, args, compressor, cbuf, outPayload)
	if err != nil {
		return nil, Errorf(codes.Internal, "grpc: %v", err)
	}
	// Write it to the stream.
	err = t.Write(stream, outBuf, opts)
	if err == nil && outPayload != nil {
		outPayload.SentTime = time.Now()
		stats.HandleRPC(ctx, outPayload)
	}
	// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
	// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
	// recvResponse to get the final status.
	if err != nil && err != io.EOF {
		return nil, err
	}
	// Sent successfully.
	return stream, nil
}
```
As you can see, this function is quite simple: it does two things, encoding (and optionally compressing) the payload, and writing it to the stream. What encode puts on the wire is the standard gRPC message frame, sketched below.
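The frame layout is part of the gRPC-over-HTTP/2 spec: a 1-byte compression flag, a 4-byte big-endian payload length, then the payload itself. A minimal sketch of the encode side of that framing (the real encode also handles the stats bookkeeping seen above):
```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frame prepends the standard gRPC message header: 1 byte compression
// flag plus a 4-byte big-endian length, followed by the payload.
func frame(payload []byte, compressed bool) []byte {
	buf := make([]byte, 5+len(payload))
	if compressed {
		buf[0] = 1 // the compressionMade flag in the grpc source
	}
	binary.BigEndian.PutUint32(buf[1:5], uint32(len(payload)))
	copy(buf[5:], payload)
	return buf
}

func main() {
	msg := []byte("serialized protobuf bytes")
	fmt.Printf("header: % x\n", frame(msg, false)[:5]) // 00 00 00 00 19
}
```
The receiving side, recvResponse, undoes all of this: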
```go
func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
	// Try to acquire header metadata from the server if there is any.
	defer func() {
		if err != nil {
			if _, ok := err.(transport.ConnectionError); !ok {
				t.CloseStream(stream, err)
			}
		}
	}()
	c.headerMD, err = stream.Header()
	if err != nil {
		return
	}
	p := &parser{r: stream}
	var inPayload *stats.InPayload
	if stats.On() {
		inPayload = &stats.InPayload{
			Client: true,
		}
	}
	for {
		// Keep reading until the stream closes.
		if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
			if err == io.EOF {
				break
			}
			return
		}
	}
	if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
		// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
		// Fix the order if necessary.
		stats.HandleRPC(ctx, inPayload)
	}
	c.trailerMD = stream.Trailer()
	return nil
}

func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
	// Read one message off the wire.
	pf, d, err := p.recvMsg(maxMsgSize)
	if err != nil {
		return err
	}
	if inPayload != nil {
		inPayload.WireLength = len(d)
	}
	if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
		return err
	}
	if pf == compressionMade {
		// Decompress.
		d, err = dc.Do(bytes.NewReader(d))
		if err != nil {
			return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
		}
	}
	if len(d) > maxMsgSize {
		// TODO: Revisit the error code. Currently keep it consistent with java
		// implementation.
		return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)
	}
	// Unmarshal the payload.
	if err := c.Unmarshal(d, m); err != nil {
		return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
	}
	if inPayload != nil {
		inPayload.RecvTime = time.Now()
		inPayload.Payload = m
		// TODO truncate large payload.
		inPayload.Data = d
		inPayload.Length = len(d)
	}
	return nil
}
```
Note that a single recvResponse may consume several messages, all handled in that one for loop until the stream closes, which looks a little odd for a unary call. And that is roughly the whole client-side flow; the code is not too complicated, as long as you stay out of the HTTP/2 internals at first (I started by reading the http2 transport and was completely lost). Two important pieces remain: load balancing, which lets the client pick the IP and port to connect to according to some algorithm, and the use of credentials. Those go in the next post. To close, here is what recv's first step, p.recvMsg, is doing with each frame.
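A standalone sketch of that parse, assuming a plain io.Reader in place of transport.Stream and folding the size check into the read:
```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// recvMsg mirrors what grpc's parser.recvMsg does: read the 5-byte
// message header, then read exactly the announced payload length.
func recvMsg(r io.Reader, maxMsgSize int) (compressed bool, msg []byte, err error) {
	var hdr [5]byte
	if _, err = io.ReadFull(r, hdr[:]); err != nil {
		return false, nil, err
	}
	length := binary.BigEndian.Uint32(hdr[1:5])
	if int(length) > maxMsgSize {
		return false, nil, fmt.Errorf("message of %d bytes exceeds %d limit", length, maxMsgSize)
	}
	msg = make([]byte, length)
	if _, err = io.ReadFull(r, msg); err != nil {
		return false, nil, err
	}
	return hdr[0] == 1, msg, nil
}

func main() {
	// Frame "hello" by hand: flag 0, length 5, then the payload.
	framed := append([]byte{0, 0, 0, 0, 5}, []byte("hello")...)
	_, msg, err := recvMsg(bytes.NewReader(framed), 1<<20)
	fmt.Println(string(msg), err) // hello <nil>
}
```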
4. Related Links