ASP.NET Core 3.0 gRPC 双向流
目錄
ASP.NET Core 3.0 使用gRPC
ASP.NET Core 3.0 gRPC 雙向流
ASP.NET Core 3.0 gRPC 認證授權
一.前言
在前一文 《
二. 什么是 gRPC 流
gRPC 有四種服務類型,分別是:簡單 RPC(Unary RPC)、服務端流式 RPC (Server streaming RPC)、客戶端流式 RPC (Client streaming RPC)、雙向流式 RPC(Bi-directional streaming RPC)。它們主要有以下特點:
| 簡單 RPC | 一般的rpc調用,傳入一個請求對象,返回一個返回對象 |
| 服務端流式 RPC | 傳入一個請求對象,服務端可以返回多個結果對象 |
| 客戶端流式 RPC | 客戶端傳入多個請求對象,服務端返回一個結果對象 |
| 雙向流式 RPC | 結合客戶端流式RPC和服務端流式RPC,可以傳入多個請求對象,返回多個結果對象 |
三.為什么 gRPC 支持流
gRPC 通信是基于 HTTP/2 實現的,它的雙向流映射到 HTTP/2 流。HTTP/2 具有流的概念,流是為了實現HTTP/2的多路復用。流是服務器和客戶端在HTTP/2連接內用于交換幀數據的獨立雙向序列,邏輯上可看做一個較為完整的交互處理單元,即表達一次完整的資源請求、響應數據交換流程;一個業務處理單元,在一個流內進行處理完畢,這個流生命周期完結。
特點如下:
一個HTTP/2連接可同時保持多個打開的流,任一端點交換幀
流可被客戶端或服務器單獨或共享創建和使用
流可被任一端關閉
在流內發送和接收數據都要按照順序
流的標識符自然數表示,1~2^31-1區間,有創建流的終端分配
流與流之間邏輯上是并行、獨立存在
摘自?HTTP/2筆記之流和多路復用?by 聶永
四.gRPC中使用雙向流調用
我們在前文中編寫的RPC屬于簡單RPC,沒有包含流調用,下面直接講一下雙向流,根據第二小節列舉的四種服務類型,如果我們掌握了簡單RPC和雙向流RPC,那么服務端流式 RPC和客戶端流式 RPC自然也就沒有問題了。
這里我們繼續使用前文的代碼,要實現的目標是一次給多個貓洗澡。
① 首先在?LuCat.proto?定義兩個rpc,一個 Count 用于統計貓的數量,一個 雙向流 RPC BathTheCat 用于給貓洗澡
syntax = "proto3";option csharp_namespace = "AspNetCoregRpcService";import "google/protobuf/empty.proto"; package LuCat;service LuCat{rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);rpc Count(google.protobuf.Empty) returns (CountCatResult); }message SuckingCatResult{string message=1; } message BathTheCatReq{int32 id=1; } message BathTheCatResp{string message=1; } message CountCatResult{int32 Count=1; }② 添加服務的實現
這里安利下Resharper,非常方便
private readonly ILogger<LuCatService> _logger; private static readonly List<string> Cats=new List<string>(){"英短銀漸層","英短金漸層","美短","藍貓","貍花貓","橘貓"}; private static readonly Random Rand=new Random(DateTime.Now.Millisecond);public LuCatService(ILogger<LuCatService> logger) {_logger = logger; }public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context) {var bathQueue=new Queue<int>();while (await requestStream.MoveNext()){bathQueue.Enqueue(requestStream.Current.Id);_logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");}while (bathQueue.TryDequeue(out var catId)){await responseStream.WriteAsync(new BathTheCatResp() { Message = $"鏟屎的成功給一只{Cats[catId]}洗了澡!" });await Task.Delay(500);} }public override Task<CountCatResult> Count(Empty request, ServerCallContext context) {return Task.FromResult(new CountCatResult(){Count = Cats.Count}); }BathTheCat 方法會接收多個客戶端發來的CatId,然后將他們加入隊列中,等客戶端發送完成后開始依次洗澡并返回給客戶端。
③ 客戶端實現
隨機發送10個貓Id給服務端,然后接收10個洗澡結果。
var channel = GrpcChannel.ForAddress("https://localhost:5001"); var catClient = new LuCat.LuCatClient(channel);var catCount = await catClient.CountAsync(new Empty()); Console.WriteLine($"一共{catCount.Count}只貓。"); var rand = new Random(DateTime.Now.Millisecond);var bathCat = catClient.BathTheCat();var bathCatRespTask = Task.Run(async() => {await foreach (var resp in bathCat.ResponseStream.ReadAllAsync()){Console.WriteLine(resp.Message);} });for (int i = 0; i < 10; i++) {await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)}); }await bathCat.RequestStream.CompleteAsync(); Console.WriteLine("客戶端已發送完10個需要洗澡的貓id"); Console.WriteLine("接收洗澡結果:");await bathCatRespTask;Console.WriteLine("洗澡完畢");④ 運行
可以看到雙向流調用成功,客戶端發送了10個貓洗澡請求對象,服務端返回了10個貓洗澡結果對象。且是實時推送的,這就是 gRPC 的雙向流調用。
這里大家可以自行改進來演變成客戶端流式或者服務端流式調用。客戶端發送一個貓Id列表,然后服務端返回每個貓洗澡結果,這就是服務端流式調用。客戶端依次發送貓Id,然后服務端一次性返回所有貓的洗澡結果(給所有貓洗澡看做是一個業務,返回這個業務的結果),就是客戶端流式調用。這里我就不再演示了。
五.流控制
gRPC 的流式調用支持對流進行主動取消的控制,進而可以衍生出流超時限制等控制。
在流式調用是,可以傳一個 CancellationToken 參數,它就是我們用來對流進行取消控制的:
改造一下我們在第四小節的代碼:
① 客戶端
var cts = new CancellationTokenSource();cts.CancelAfter(TimeSpan.FromSeconds(2.5)); var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);var bathCatRespTask = Task.Run(async() => {try{await foreach (var resp in bathCat.ResponseStream.ReadAllAsync()){Console.WriteLine(resp.Message);}}catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled){Console.WriteLine("Stream cancelled.");} });② 服務端
while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId)) {_logger.LogInformation($"Cat {catId} Dequeue.");await responseStream.WriteAsync(new BathTheCatResp() { Message = $"鏟屎的成功給一只{Cats[catId]}洗了澡!" });await Task.Delay(500); }③ 運行
設置的是雙向流式調用2.5s后取消流,從客戶端調用結果看到,并沒有收到全部10個貓的洗澡返回結果,流就已經被取消了,這就是 gRPC 的流控制。
六.結束
這里流式調用可以實現實時推送,服務端到客戶端或者客戶端到服務端短實時推送消息,但是這個和傳統意義上的長連接主動推送、廣播消息不一樣,不管你是服務端流式、客戶端流式還是雙向流式,必須要由客戶端進行發起,通過客戶端請求來建立流通信。
七.參考資料
GRPC的四種服務類型?by twtydgo
HTTP/2筆記之流和多路復用?by 聶永
本文所用代碼
原文鏈接:https://www.cnblogs.com/stulzq/p/11590088.html
.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總?http://www.csharpkit.com?
總結
以上是生活随笔為你收集整理的ASP.NET Core 3.0 gRPC 双向流的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Asp.Net Core Mvc Raz
- 下一篇: 开源公司被云厂商“寄生”,咋整?