你不知道的gRPC反向代理
一個gRPC Proxy的實現方案
導語
可用性、可靠性和擴展性是衡量后臺服務的基本標準,HTTP反向代理,是任何一個提供大型Web服務后臺所必備的,用以提高服務的這些基礎參數,且通過支持到負載均衡而進一步提升服務性能。然而,隨著微服務框架的盛行,RPC技術也已經開始承載大量的微服務之間的通信,在眾多RPC技術中,gRPC是Google開源的通用高性能RPC框架,因此,一個支持gRPC的反向代理的需求應運而生。
一個基于gRPC的微服務架構示例
背景知識
gRPC底層通信使用了HTTP/2技術,關于HTTP/2相對于HTTP/1.1的區別和優勢,有興趣的同學,可以通過下面的鏈接詳細了解HTTP/2.0 相比1.0有哪些重大改進,有助于增加本文的理解。在此,我們不會詳細的去談HTTP/2的特性,簡單的總結,之所以使用HTTP/2,主要是由于HTTP/2的這兩點:
Streaming
Stream是HTTP/2中是一個邏輯上的概念,指的是在一個TCP連接上,我們可以向對方不斷發送一個個的消息,這里每一個消息即是一個frame,而每一個frame中有個stream identifier的字段標明這一幀屬于哪個“stream”。當對方接收到每一個frame時,根據stream identifier拼接每個“stream”的所有frame組成一整塊數據。
Multiplexing
眾所周知,HTTP/1.1中的已經有了keepAlive功能,相比于KeepAlive,在HTTP/2中的通過分幀技術實現了Multiplexing,從而更高效的實現了數據傳輸,一個速度展示的示例:http2響應速度展示
下圖是HTTP/1.1與HTTP/2的數據傳輸區別圖示:
HTTP/1.1與HTTP/2數據傳輸的比較
需要做什么
接下來的內容,建立在讀者對gRPC已經有了一個很好的使用和了解的基礎上。同時,也依然強烈建議通過閱讀從源碼透析gRPC調用原理一文來較深入地理解gRPC調用過程中的一些細節實現,從而更好地理解為什么會有下文的實現思路。
在說到具體如何實現grpc的proxy之前,我們先來分析一下為了實現gRPC的反向代理,我們需要做到什么。作為代理,一言以蔽之,需要做到的是兩件事:
- 接收請求方的gRPC請求,并透明的數據轉發到對應的響應方;
- 接收響應方的gRPC回復,并透明的將數據回送到原請求方;
在gRPC中,為了實現上述功能,主要有這些特性需要實現:
1. 特殊的編解碼-codec
我們知道,gRPC在收到消息后,會根據protobuf的codec來對消息進行編解碼。然而,在proxy的內部,其既需要作為server端接收數據,又需要作為client端發送數據。而對于數據在proxy內部轉發的時候,不需要對其進行任何形式的編解碼(換句話說,只需要將數據當成原始裸數據,直接轉發即可),不可以采用gRPC自帶的codec。因此,在gRPC中,對于收到的protobuf消息,我們需要采用默認的protobuf codec,而對于proxy內部的數據轉發我們需要一個對協議的無感知僅作轉發的codec。
2. 流控制器-stream director
為了實現在編解碼中說到的,在前后端服務之間做到數據的正確轉發,當proxy接收到任何一個請求流之后,需要根據該流攜帶的信息,判斷出正確的對應的目的方,并建立到該目的方的鏈接connection,從而將請求流通過該connection發送到目的方。
3. 數據處理器-stream handler
當獲得到特性2中的connection之后,便需要一個數據處理的handler,通過該connection去真正操作數據的轉發。關于該handler的實現,根據在上文中分析,需要同時做到一邊接收請求方數據并推送到響應方,一邊接收響應方數據并回復到請求方。
4. 負載均衡-balancer
當實現了上述三點之后,proxy的基本功能已經可以滿足了。但是,負載均衡的功能一般都是會附帶支持的,這里可以可以實現一個balancer的接口,通過不同的負載均衡算法對balancer的實現來支持到不同的負載均衡算法。
實現
codec
無論是請求方的數據流,還是響應方的數據流,對于proxy服務來說都是數據流的進入,也即是proxy需要作為一個server的身份來處理這些請求。因此,proxy需要作為gRPC Server來啟動,也即是:
s := grpc.NewServer()在gRPC中,通過從源碼透析gRPC調用原理一文,我們了解到作為server啟動時,支持一系列的ServerOption來實現gRPC的一些服務端配置。為了實現特性1中說到的特殊的編解碼,我們需要借助grpc.CustomCodec()方法來實現:
func CustomCodec(codec Codec) ServerOption {return func(o *options) {o.codec = codec} }type Codec interface {// Marshal returns the wire format of v.Marshal(v interface{}) ([]byte, error)// Unmarshal parses the wire format into v.Unmarshal(data []byte, v interface{}) error// String returns the name of the Codec implementation. This is unused by// gRPC.String() string }CustomCodec()函數返回一個ServerOption類型的實例在NewServer()時作為參數傳入,從而用于設定grpc中消息的Marshal和Unmarshal,也即是在初始化server時我們需要這樣來寫:
grpc.NewServer(grpc.CustomCodec(myCodec))其中,myCodec即是一個需要我們去實現的,一個支持到了grpc.Codec接口的類型的實例。我們將通過下面的Codec()函數來返回該實例,從而支持到了自定義codec:
// 返回了一個grpc.Codec類型的實例, // 以protobuf原生codec為默認codec,實現了一個透明的Marshal和UnmarshMal func Codec() grpc.Codec {return CodecWithParent(&protoCodec{}) }// 一個協議無感知的codec實現,返回一個grpc.Codec類型的實例 // 該函數嘗試將gRPC消息當作raw bytes來實現,當嘗試失敗后,會有fallback作為一個后退的codec func CodecWithParent(fallback grpc.Codec) grpc.Codec {return &rawCodec{fallback} }// 自定義codec類型, // 實現了grpc.Codec接口中的Marshal和Unmarshal // 成員變量parentCodec用于當自定義Marshal和Unmarshal失敗時的回退codec type rawCodec struct {parentCodec grpc.Codec }type frame struct {payload []byte }// 序列化函數, // 嘗試將消息轉換為*frame類型,并返回frame的payload實現序列化 // 若失敗,則采用變量parentCodec中的Marshal進行序列化 func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {out, ok := v.(*frame)if !ok {return c.parentCodec.Marshal(v)}return out.payload, nil}// 反序列化函數, // 嘗試通過將消息轉為*frame類型,提取出payload到[]byte,實現反序列化 // 若失敗,則采用變量parentCodec中的Unmarshal進行反序列化 func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {dst, ok := v.(*frame)if !ok {return c.parentCodec.Unmarshal(data, v)}dst.payload = datareturn nil }func (c *rawCodec) String() string {return fmt.Sprintf("proxy>%s", c.parentCodec.String()) }//----------------------- // protoCodec實現protobuf的默認的codec type protoCodec struct{}func (protoCodec) Marshal(v interface{}) ([]byte, error) {return proto.Marshal(v.(proto.Message)) }func (protoCodec) Unmarshal(data []byte, v interface{}) error {return proto.Unmarshal(data, v.(proto.Message)) }func (protoCodec) String() string {return "proto" }StreamDirector
關于特性2中的流轉發器,需要根據請求流的meta信息,判斷出正確的對應的目的方,返回一個正確到目的方的connection,從而方便之后的handler在拿到正確的connection之后,實現數據包的透明轉發。
為了去獲取到請求流的meta信息,我們有兩種方式:
第一,我們可以結合gRPC協議的特性,根據grpc.MethodFromServerStream()函數,從grpc client的請求中剝離出調用的接口名fullMethodName:
// 該函數根據請求的strem,返回請求的方法名(string類型) // 返回的字符串格式為:"/service/method". func MethodFromServerStream(stream ServerStream) (string, bool) {return Method(stream.Context()) }第二,同樣是依托于gRPC的特性,我們利用了其metadata,通過在請求方傳入metadata參數,當proxy收到請求流之后在讀取出metadata,從而根據metada來進行endpoint的選擇。
無論采用哪種方式,當我們找到endpoint并與之建立鏈接后,便可以得到期望的connection,下方是一個StreamDirector的實現:
func GetDirector() func(context.Context, string) (context.Context, *grpc.ClientConn, error) {return func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {// 獲取配置信息appConfig := config.GetViper(inited.ConfigPrefix)var cfg lconfig.Configerr := appConfig.Unmarshal(&cfg)if err != nil {log.LoggerWrapperWithCaller().Errorf(err.Error())return nil, nil, err}// 尋找對應到fullMethodName的endpoint for _, backend := range cfg.Backends {// 僅轉發外部的請求if strings.HasPrefix(fullMethodName, backend.Filter) {md, ok := metadata.FromIncomingContext(ctx)if !ok {err := fmt.Errorf("incoming metadata is empty")log.LoggerWrapperWithCaller().Errorf(err.Error())return nil, nil, err}outCtx, _ := context.WithCancel(ctx)outCtx = metadata.NewOutgoingContext(outCtx, md.Copy())if ok {// 根據gRPC中的metadata,獲取到對應的endpointif val, exists := md[MODULE_TEXT]; exists && inSlice(val[0], backend.Module) {log.LoggerWrapperWithCaller().Debugf("Found md caller [%v]", val)// 做一個負載均衡endpoint, err := getBackendByRR(backend)if err != nil {log.LoggerWrapperWithCaller().Errorf("grpc.DialContext failed: ", err.Error())return nil, nil, err}log.LoggerWrapperWithCaller().Debugf("balanced, redirecting to [%v]", endpoint)// 根據獲取到的endpoint,建立到目的方的connection// 同時,需要配置客戶端codec為我們自定義的codecconn, err := grpc.DialContext(ctx, endpoint, grpc.WithCodec(proxy.Codec()), grpc.WithInsecure())if err != nil {log.LoggerWrapperWithCaller().Errorf("grpc.DialContext failed: ", err.Error())return nil, nil, err}return outCtx, conn, err}}}}return nil, nil, fmt.Errorf("Unknown method")}}在上述代碼中,有三點需要額外解釋的:
首先,在調用gRPC.DialContext()創建連接的時候,我們傳入了自定義codec用以配置client端的編解碼,在從源碼透析gRPC調用原理一文中有對該實現的原理的具體分析。
其次,appConfig是從配置文件加載到的數據,下面列了一個示例配置。在demo中,proxy代理了兩種類型的服務:
- 加解密的crypto服務
- 與cos交互的服務
且兩種服務均有兩份實例進行運行著。對于crypto服務,運行在"127.0.0.1:12001"和"127.0.0.1:12002",對于cos服務的兩個運行實例,則是"127.0.0.1:33001"和"127.0.0.1:33002"。多份實例存在的原因是為了實現負載均衡,下文將會單獨介紹這一功能。
[project] name = "grpc-proxy" version = "1.0.0"[server] ip = "0.0.0.0" port = 50051[[backends]] tag = 0 backend = ["127.0.0.1:12001","127.0.0.1:12002"] filter = "/cloud.crypto.pb.CryptoService" module = ["crypto"][[backends]] tag = 1 backend = ["127.0.0.1:33001","127.0.0.1:33002"] filter = "/cloud.cos.pb.CosService" module = ["cos"]最后,需要注意的是,通過gRPC請求的fullMethodName,顯而易見的是我們完全能夠支持到接口級別的代理,根據不同的接口扔到不同的后端服務中。與此同時,通過支持了metadata來進行endpoint的選擇,那么就完全可以實現任意自定義的路由組合。
StreamHandler
作為server來啟動時,為了實現一個協議無感知的代理,我們需要利用到了grpc中的UnknownServiceHandler()接口。同樣地,UnknownServiceHandler()是返回一個ServerOption類型的實例在NewServer()時作為參數傳入,其主要功能是支持了一個自定義的對未知服務的handler。通過配置了該方法,當grpc server接收到一個未注冊的服務時,不再返回一個“unimplemented”的gRPC錯誤,而是通過我們實現的handler來進行服務,從而實現了協議無感知的proxy。也是因此,我們的handler一定是一個bidi-streaming RPC handler。因此,我們啟動服務時的命令變成了這樣:
grpc.NewServer(grpc.CustomCodec(proxy.Codec()),grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))下面,就讓我們來看下,如何實現這樣一個bidi-streaming的handler,來完成流的轉發。
// 該handler以gRPC server的模式來接受數據流,并將受到的數據轉發到指定的connection中 func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error {// 獲取請求流的目的接口名稱fullMethodName, ok := grpc.MethodFromServerStream(serverStream)if !ok {return grpc.Errorf(codes.Internal, "failed to get method from server stream")}// 該director即為上述的StreamDirector,獲取到對應的目的方connectionoutgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName)if err != nil {return err}defer backendConn.Close()// 封裝為clientStreamclientCtx, clientCancel := context.WithCancel(outgoingCtx)clientStream, err := grpc.NewClientStream(clientCtx, clientStreamDescForProxying, backendConn, fullMethodName)if err != nil {return err}// 啟動流控,目的方->請求方s2cErrChan := s.forwardServerToClient(serverStream, clientStream)// 啟動流控,請求方->目的方 c2sErrChan := s.forwardClientToServer(clientStream, serverStream)// 數據流結束處理 & 錯誤處理for i := 0; i < 2; i++ {select {case s2cErr := <-s2cErrChan:if s2cErr == io.EOF {// 正常結束clientStream.CloseSend()break} else {// 錯誤處理 (如鏈接斷開、讀錯誤等) clientCancel()return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)}case c2sErr := <-c2sErrChan:// 設置TrailerserverStream.SetTrailer(clientStream.Trailer())if c2sErr != io.EOF {return c2sErr}return nil}}return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") }func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {ret := make(chan error, 1)go func() {// 設置*frame結構作為RecvMsg的參數,// *frame即為我們自定義codec中使用到的數據結構f := &frame{}for i := 0; ; i++ {if err := src.RecvMsg(f); err != nil {ret <- errbreak}if i == 0 {// grpc中客戶端到服務器的header只能在第一個客戶端消息后才可以讀取到,// 同時又必須在flush第一個msg之前寫入到流中。md, err := src.Header()if err != nil {ret <- errbreak}if err := dst.SendHeader(md); err != nil {ret <- errbreak}}if err := dst.SendMsg(f); err != nil {ret <- errbreak}}}()return ret }func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error {ret := make(chan error, 1)go func() {f := &frame{}for i := 0; ; i++ {if err := src.RecvMsg(f); err != nil {ret <- errbreak}if err := dst.SendMsg(f); err != nil {ret <- errbreak}}}()return ret }其中,實現流控的兩個函數分別是:
- forwardClientToServer(),將請求方的數據發送到相應的目的方,通過src.RecvMsg(f)來接收后請求方的數據,并通過dst.SendMsg(f)將數據再推送到目的方。
- forwardServerToClient(),負責將后臺服務端的數據發送到請求方,通過src.RecvMsg(f)來接收后臺服務的數據,并通過dst.SendMsg(f)將數據再推送到請求方。
我們通過forwardServerToClient()來詳細介紹如何實現轉發的。在forwardServerToClient()中,啟動了一個go程,持續著接收來自于src(也就是server端)的數據流,并在收到后將數據轉發到dst(client端)。
對于src.RecvMsg(f),最終會調用grpc/rpc_util.go中的recv()函數,除去一些不關鍵的代碼,我們看到,其通過codec中的Unmarshal()將接收到的data反序列化到了參數m中,而m就是src.RecvMsg(f)中的f實參。
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload, compressor encoding.Compressor) error {pf, d, err := p.recvMsg(maxReceiveMessageSize)...if err := c.Unmarshal(d, m); err != nil {return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)}... return nil }而在這兒使用到的codec,即是我們上文中實現的codec,因為f是我們定義的frame{}結構,所以自定義的dst, ok := v.(*frame)會得到成功執行,并通過frame.payload承載原始的data數據。
func (c *rawCodec) Unmarshal(data []byte, v interface{}) error {dst, ok := v.(*frame)if !ok {return c.parentCodec.Unmarshal(data, v)}dst.payload = datareturn nil }同樣,在dst.SendMsg(f)時,也將會通過rawCodec實現的Marshal()方法,將frame中的payload取出并發送。從而也就實現了對于proxy內部的數據的透明傳輸。
func (c *rawCodec) Marshal(v interface{}) ([]byte, error) {out, ok := v.(*frame)if !ok {return c.parentCodec.Marshal(v)}return out.payload, nil}Balancer
如上所說,負載均衡并不是proxy所必須的,可以說是一個錦上添花的附加功能。為了實現負責均衡,就需要在director中獲取到connection之前運用負載均衡算法來獲取到endpoint:
// a roundrobin balancer endpoint, err := getBackendByRR(backend)我們在這兒實現的是一個簡單的RoundRobin的負載均衡,通過RoundRobin算法拿到合適的目的方endpoint并通過grpc.Dial()建立到目的方的鏈接,獲取connection。
func getBackendByRR(backend lconfig.BackendConfig) (endpoint string, err error) {tag := backend.Tagendpoints := backend.Backendif rrPicker == nil {err = fmt.Errorf("getBackendByRR failed, picker is nil")log.LoggerWrapperWithCaller().Errorf(err.Error())return}iendpoints := make([]interface{}, 0)for _, v := range endpoints {iendpoints = append(iendpoints, v)}rrPicker.SetCandidates(tag, iendpoints)end, err := rrPicker.Pick(tag)if err != nil {log.LoggerWrapperWithCaller().Errorf(err.Error())return}endpoint, ok := end.(string)if !ok {err = fmt.Errorf("Pick reply format failed, [%v]", end)log.LoggerWrapperWithCaller().Errorf(err.Error())return}return }func (r *RoundRobinPicker) Pick(tag int) (interface{}, error) {r.mu.Lock()defer r.mu.Unlock()candidate, ok := r.candidates[tag]if !ok || len(candidate) <= 0 {err := fmt.Errorf("roundrobin candidates [%v] is empty", tag)return nil, err}if r.next[tag] >= len(candidate) {r.next[tag] = 0}sc := candidate[r.next[tag]]r.next[tag] = (r.next[tag] + 1) % len(candidate)return sc, nil }總結
gRPC,作為google開源的高性能RPC方案,已經日趨成熟且被眾多框架中使用了。這次因為開發項目中有了對gRPC proxy的需求,結合互聯網這一廣大的資源集合處來實現,同時也是借助了這次機會,更多的了解了gRPC的實現。不得不說,gRPC中的知識點還是很多的,其深層的處理數據的方式,諸如數據包的接收、HTTP/2協議的處理等,都是有著很多細節的實現以至于在本文中沒有更多的精力去一一解讀。如果大家在閱讀本文中有一些實現的疑問,可以留言,也可以去讀一讀源碼,我相信,一定是會收獲很多的!
相關閱讀
?
https://cloud.tencent.com/developer/article/1189548
總結
以上是生活随笔為你收集整理的你不知道的gRPC反向代理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一种绕开反病毒引擎的方法
- 下一篇: Shell 企业29道面试题 [转]