NET Core微服务之路:让我们对上一个Demo通讯进行修改,完成RPC通讯
生活随笔
收集整理的這篇文章主要介紹了
NET Core微服务之路:让我们对上一个Demo通讯进行修改,完成RPC通讯
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
最近一段時(shí)間有些事情耽擱了更新,抱歉各位了。 上一篇我們簡(jiǎn)單的介紹了DotNetty通信框架,并簡(jiǎn)單的介紹了基于DotNetty實(shí)現(xiàn)了回路(Echo)通信過程。 我們來回憶一下上一個(gè)項(xiàng)目的整個(gè)流程: 當(dāng)服務(wù)端啟動(dòng)后,綁定并監(jiān)聽(READ)設(shè)定的端口,比如1889。 當(dāng)客戶端啟動(dòng)后,綁定指定端口,等待用戶輸入。 當(dāng)用戶輸入任意字符串?dāng)?shù)據(jù)后,客戶端將這組數(shù)據(jù)進(jìn)行轉(zhuǎn)碼為byte格式進(jìn)行傳輸?shù)椒?wù)端。 當(dāng)服務(wù)端收到客戶端傳來的數(shù)據(jù),進(jìn)行轉(zhuǎn)碼后輸出控制臺(tái),并將這組數(shù)據(jù)再次回傳到客戶端。 客戶端收到數(shù)據(jù),也打印出來。 服務(wù)端不可能都通過這樣笨拙的過濾方式來調(diào)用方法吧?是的,這只是DEMO,為了演示和理解基礎(chǔ)概念而已,而是要?jiǎng)舆^動(dòng)態(tài)代理來實(shí)現(xiàn)方法Invoke。 這個(gè)DEMO只是一個(gè)點(diǎn)對(duì)點(diǎn)的遠(yuǎn)程調(diào)用,不會(huì)涉及到任何服務(wù)路由和轉(zhuǎn)發(fā)等高級(jí)特性。 有新的接口的時(shí)候時(shí)候,需要重新編譯和暴露,如果有上萬個(gè)新的接口,這樣的重復(fù)工作豈不是瘋了。 ...etc 這里推薦一下最近構(gòu)建的一個(gè)小框架:Easy.Rpc(連接點(diǎn)我),實(shí)現(xiàn)了路由,轉(zhuǎn)發(fā),代理,動(dòng)態(tài)編譯的特性。這里也幫朋友們推薦一個(gè)同樣基于DotNetty的RPC框架(連接點(diǎn)我)張隊(duì)推薦我加入他們,可我不知道怎么加入他們的團(tuán)隊(duì),悲催啊... 簡(jiǎn)單介紹一下使用方法,本篇不詳細(xì)介紹這個(gè)框架是如何實(shí)現(xiàn)的,估計(jì)會(huì)好幾十萬字,單獨(dú)擰出來做個(gè)系列會(huì)更好,框架設(shè)計(jì)需要哪些原則,需要考慮到的問題,包含設(shè)計(jì)模式、依賴注入、動(dòng)態(tài)代理、動(dòng)態(tài)編譯、路由轉(zhuǎn)發(fā)等等特性。 默認(rèn)例子中該類沒有任何繼承,因此不會(huì)存在一個(gè)妖孽問題,但如果UserModel是一個(gè)子類,他繼承于一個(gè)父類,而這個(gè)父類也同樣擁有多個(gè)子類,直接ProtoContract參與序列化將會(huì)報(bào)錯(cuò),需要在特性上增加DataMemberOffset = x,此處的x不是字母,而是這個(gè)子類的一個(gè)序列化順序。比如有3個(gè)子類繼承同一個(gè)父類,前面兩個(gè)子類的偏移量分別是1和2,那么這個(gè)類的偏移量將設(shè)置為3,以此類推。 默認(rèn)的數(shù)據(jù)類型中,系統(tǒng)定義的標(biāo)準(zhǔn)類型沒問題,但有個(gè)妖孽的int[]這樣的數(shù)組類型,那也將是個(gè)噩夢(mèng),官網(wǎng)團(tuán)隊(duì)沒有解釋為何不支持?jǐn)?shù)組的序列化,我猜測(cè)估計(jì)是因?yàn)閿?shù)組的不規(guī)則性(比如多維數(shù)組、甚至不規(guī)則的多維數(shù)組)而放棄了這個(gè)類型的序列化,畢竟序列化是不能影響性能的。
?
很簡(jiǎn)單的實(shí)現(xiàn)了一個(gè)點(diǎn)對(duì)點(diǎn)的通信例子。接下來我們將對(duì)這個(gè)DEMO進(jìn)行簡(jiǎn)單的修改,模擬最簡(jiǎn)單的gRPC通信的一個(gè)構(gòu)造過程。 本篇很簡(jiǎn)單,只要實(shí)現(xiàn)了上一個(gè)demo,稍作修改,就能實(shí)現(xiàn)gRPC了(當(dāng)然實(shí)際構(gòu)建gRPC根本不會(huì)這么簡(jiǎn)單),本篇也是順帶一下這幾天搞出來的一個(gè)輕量級(jí)RPC框架,先接上一個(gè)例子。服務(wù)端
增加兩個(gè)靜態(tài)方法SayHello和SayByebye,用于提供遠(yuǎn)程調(diào)用,超級(jí)簡(jiǎn)單,不解釋。
public static class Say {public static string SayHello(string content){return $"hello {content}";}public static string SayByebye(string content){return $"byebye {content}";} }?
?
在我們?cè)瓉淼腃hannelRead函數(shù)中,將原有的Echo回路傳輸,直接替換成如下內(nèi)容。
1 public override void ChannelRead(IChannelHandlerContext context, object message) 2 { 3 if (message is IByteBuffer buffer) 4 { 5 Console.WriteLine($"message length is {buffer.Capacity}"); 6 var obj = JsonConvert.DeserializeObject<Dictionary<string, string>>(buffer.ToString(Encoding.UTF8).Replace(")", "")); // (1) 7 8 byte[] msg = null; 9 if (obj["func"].Contains("sayHello")) // (2) 10 { 11 msg = Encoding.UTF8.GetBytes(Say.SayHello(json["username"])); 12 } 13 14 if (obj["func"].Contains("sayByebye")) // (2) 15 { 16 msg = Encoding.UTF8.GetBytes(Say.SayByebye(json["username"])); 17 } 18 19 if (msg == null) return; 20 // 設(shè)置Buffer大小 21 var b = Unpooled.Buffer(msg.Length, msg.Length); // (3) 22 IByteBuffer byteBuffer = b.WriteBytes(msg); // (4) 23 context.WriteAsync(byteBuffer); // (5) 24 } 25 }?
?
(1):有這樣一句話Replace(")", ""),筆者不知為何每次傳送過來從buffer里轉(zhuǎn)義出來的字符串,始終會(huì)有一個(gè)左括號(hào)在里面,也許是消息頭,也許是protobuf-net的標(biāo)記頭,因?yàn)槎际莃yte格式,在服務(wù)端偷懶就沒有再進(jìn)行一次protobuf的反序列化了。 為何要用Dictionary來作為中間對(duì)象轉(zhuǎn)換,因?yàn)樾蛄谢枰獙?shí)體對(duì)象作為類型,為了簡(jiǎn)單的介紹RPC,目前也就這么干了,例如上面代碼所示。 (2):通過判斷“func”字段中的內(nèi)容進(jìn)行方法調(diào)用,并將調(diào)用過程的返回結(jié)果轉(zhuǎn)為BYTE格式。 (3):設(shè)置本次傳輸中的Buffer大小。 (4):將消息(數(shù)據(jù))寫入到DotNetty的Buffer。 (5):最終將Buffer寫入到當(dāng)前上下文(包含通道,傳輸對(duì)象,連接對(duì)象等等)。客戶端
我們將上一個(gè)demo中的EchoClientHandler做如下修改,以完成一個(gè)簡(jiǎn)單的請(qǐng)求
1 public EchoClientHandler() 2 { 3 var hello = new Dictionary<string, string> // (1) 4 { 5 {"func", "sayHello"}, 6 {"username", "stevelee"} 7 }; 8 SendMessage(ToStream(JsonConvert.SerializeObject(hello))); 9 } 10 11 private byte[] ToStream(string msg) 12 { 13 Console.WriteLine($"string length is {msg.Length}"); 14 using (var stream = new MemoryStream()) // (2) 15 { 16 Serializer.Serialize(stream, msg); 17 return stream.ToArray(); 18 } 19 } 20 21 private void SendMessage(byte[] msg) 22 { 23 Console.WriteLine($"byte length is {msg.Length}"); 24 _initialMessage = Unpooled.Buffer(msg.Length, msg.Length); 25 _initialMessage.WriteBytes(msg); // (3) 26 }?
(1):建立與服務(wù)端相關(guān)的通信數(shù)據(jù)。 (2):將數(shù)據(jù)序列化為二進(jìn)制流。 (3):將數(shù)據(jù)寫入到ByteBuffer中。啟動(dòng)一下
由于在客戶端明文標(biāo)注了使用sayHello這個(gè)方法,客戶端會(huì)收到服務(wù)端返回的"hello stevelee"。
這樣一個(gè)最簡(jiǎn)單的RPC遠(yuǎn)程調(diào)用就完成了(其實(shí)上一篇就也屬于RPC,只是這里用方法和過濾來指定調(diào)用)。
?
?問題
Esay.Rpc
正如上面提到問題,需要解決這些問題,就需要修改諸多內(nèi)容, 例如把函數(shù)改為接口,把接口的定義放置服務(wù)端并對(duì)外開放相應(yīng)端口,把接口的實(shí)現(xiàn)同樣放置服務(wù)端,提供接口的調(diào)用,客戶端通過類似API的方式進(jìn)行遠(yuǎn)程接口調(diào)用,因此這個(gè)接口的定義必須單列的一個(gè)項(xiàng)目; 如何將接口自動(dòng)部署(暴露)出來,可以通過中間協(xié)調(diào)器(也叫服務(wù)注冊(cè)中心,如ETCD,consul,zookeeper),如何將這些接口自動(dòng)注冊(cè)到服務(wù)中心呢,需要實(shí)現(xiàn)反射自動(dòng)掃描并添加到注冊(cè)中心。 我們添加一個(gè)Rpc.Common的中間通用庫,當(dāng)然Easy.Rpc的框架源碼也在這個(gè)里面(框架目前不探討),添加IUserService接口,UserModel實(shí)體類,UserServiceImpl實(shí)現(xiàn)類。其實(shí)通用類庫只需要接口和實(shí)體就行,接口實(shí)現(xiàn)完全放置服務(wù)端,這樣這個(gè)庫也能完全分離出來。(不過筆者偷懶都寫到Rpc.Common庫中去了,實(shí)際生產(chǎn)決不能這么膜,分離,分離,分離,這也是微服務(wù)的主要概念之一) DEMO結(jié)構(gòu)如下(Easy.Rpc源碼目前也包含在這個(gè)里面,過兩天單獨(dú)拎出來做成框架,方便調(diào)用)?
?
先看看接口定義了些什么:
1 /// <summary> 2 /// 接口UserService的定義 3 /// </summary> 4 [RpcTagBundle] 5 public interface IUserService 6 { 7 Task<string> GetUserName(int id); 8 9 Task<int> GetUserId(string userName); 10 11 Task<DateTime> GetUserLastSignInTime(int id); 12 13 Task<UserModel> GetUser(int id); 14 15 Task<bool> Update(int id, UserModel model); 16 17 Task<IDictionary<string, string>> GetDictionary(); 18 19 Task Try(); 20 21 Task TryThrowException(); 22 } 8個(gè)接口,幾乎囊括了目前RPC調(diào)用測(cè)試的所有方法場(chǎng)景。接口實(shí)現(xiàn)就不貼了,你完全可以自定義接口的任何實(shí)現(xiàn),或者就一句Console.Write("哇涼哇涼完啦")都可以。 接口參數(shù)中有個(gè)UserModel的實(shí)體對(duì)象,這里也貼上來。 1 [ProtoContract] 2 public class UserModel 3 { 4 [ProtoMember(1)] public string Name { get; set; } 5 6 [ProtoMember(2)] public int Age { get; set; } 7 }?
上面有兩個(gè)不一樣的標(biāo)記,也是protobuf-net中獨(dú)有的特性。
ProtoContract標(biāo)記:該類是參與序列化內(nèi)容的數(shù)據(jù)類。 ProtoMember標(biāo)題:該類需要序列化的字段和順序。?
protobuf-net的坑
接下來繼續(xù)服務(wù)端的代碼
1 static void Main() 2 { 3 var bTime = DateTime.Now; 4 5 // 實(shí)現(xiàn)自動(dòng)裝配 6 var serviceCollection = new ServiceCollection(); 7 { 8 serviceCollection 9 .AddLogging() 10 .AddRpcCore() 11 .AddService() 12 .UseSharedFileRouteManager("d:\\routes.txt") 13 .UseDotNettyTransport(); 14 15 // ** 注入本地測(cè)試類 16 serviceCollection.AddSingleton<IUserService, UserServiceImpl>(); 17 } 18 19 // 構(gòu)建當(dāng)前容器 20 var buildServiceProvider = serviceCollection.BuildServiceProvider(); 21 22 // 獲取服務(wù)管理實(shí)體類 23 var serviceEntryManager = buildServiceProvider.GetRequiredService<IServiceEntryManager>(); 24 var addressDescriptors = serviceEntryManager.GetEntries().Select(i => new ServiceRoute 25 { 26 Address = new[] 27 { 28 new IpAddressModel {Ip = "127.0.0.1", Port = 9881} 29 }, 30 ServiceDescriptor = i.Descriptor 31 }); 32 var serviceRouteManager = buildServiceProvider.GetRequiredService<IServiceRouteManager>(); 33 serviceRouteManager.SetRoutesAsync(addressDescriptors).Wait(); 34 35 // 構(gòu)建內(nèi)部日志處理 36 buildServiceProvider.GetRequiredService<ILoggerFactory>().AddConsole((console, logLevel) => (int) logLevel >= 0); 37 38 // 獲取服務(wù)宿主 39 var serviceHost = buildServiceProvider.GetRequiredService<IServiceHost>(); 40 41 Task.Factory.StartNew(async () => 42 { 43 //啟動(dòng)主機(jī) 44 await serviceHost.StartAsync(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9881)); 45 }); 46 47 Console.ReadLine(); 48 } 全程基于serviceCollection實(shí)現(xiàn)自動(dòng)裝配和構(gòu)造,相信用過Ioc容器都能明白這上面幾條依賴注入和自動(dòng)構(gòu)建服務(wù)的含義。 再添加客戶端代碼: 1 static void Main() 2 { 3 var serviceCollection = new ServiceCollection(); 4 { 5 serviceCollection 6 .AddLogging() // 添加日志 7 .AddClient() // 添加客戶端 8 .UseSharedFileRouteManager(@"d:\routes.txt") // 添加共享路由 9 .UseDotNettyTransport(); // 添加DotNetty通信傳輸 10 } 11 12 var serviceProvider = serviceCollection.BuildServiceProvider(); 13 14 serviceProvider.GetRequiredService<ILoggerFactory>().AddConsole((console, logLevel) => (int) logLevel >= 0); 15 16 var services = serviceProvider.GetRequiredService<IServiceProxyGenerater>() 17 .GenerateProxys(new[] {typeof(IUserService)}).ToArray(); 18 19 var userService = serviceProvider.GetRequiredService<IServiceProxyFactory>().CreateProxy<IUserService>( 20 services.Single(typeof(IUserService).GetTypeInfo().IsAssignableFrom) 21 ); 22 23 while (true) 24 { 25 Task.Run(async () => 26 { 27 Console.WriteLine($"userService.GetUserName:{await userService.GetUserName(1)}"); 28 Console.WriteLine($"userService.GetUserId:{await userService.GetUserId("rabbit")}"); 29 Console.WriteLine($"userService.GetUserLastSignInTime:{await userService.GetUserLastSignInTime(1)}"); 30 var user = await userService.GetUser(1); 31 Console.WriteLine($"userService.GetUser:name={user.Name},age={user.Age}"); 32 Console.WriteLine($"userService.Update:{await userService.Update(1, user)}"); 33 Console.WriteLine($"userService.GetDictionary:{(await userService.GetDictionary())["key"]}"); 34 await userService.Try(); 35 Console.WriteLine("client function completed!"); 36 }).Wait(); 37 Console.ReadKey(); 38 } 39 }?
我想看到這里,明白上面代碼的作用,也就明白了這個(gè)框架的作用,客戶端能像調(diào)用本地方法一樣去調(diào)用遠(yuǎn)程方法,并且中間過程是完全透明的,分離,分離,分離。 微服務(wù)的作用不再介紹,呵呵。 感謝閱讀!?
總結(jié)
以上是生活随笔為你收集整理的NET Core微服务之路:让我们对上一个Demo通讯进行修改,完成RPC通讯的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: matlab程序生成.dll,matla
- 下一篇: oracle中blob转换,BLOB转换