生活随笔
收集整理的這篇文章主要介紹了
RPC框架系列——Protocol Buffers
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
http://blog.jeoygin.org/2011/09/rpc-framework-protocol-buffers.html
1.下載與安裝
官方網站:http://code.google.com/p/protobuf/
下載地址:http://protobuf.googlecode.com/files/protobuf-2.4.1.tar.bz2
protocol buffers并沒有實現(xiàn)RPC通信,可以使用第三方的RPC實現(xiàn)protobuf-socket-rpc,下載地址是:http://protobuf-socket-rpc.googlecode.com/files/protobuf-socket-rpc-2.0.jar
cd /usr/local/src wget http://protobuf.googlecode.com/files/protobuf-2.4.1.tar.bz2 tar jxvf protobuf-2.4.1.tar.bz2 cd protobuf-2.4.1 ./configure make make check make install
下面將編譯生成jar包,以便在java中使用Protocol Buffers,需確保已安裝了maven。
cd java mvn test mvn install mvn package
安裝、編譯后在target/目錄下會生成protobuf-java-2.4.1.jar。
2.消息結構與服務接口
首先需要編寫一個.proto文件,結構化數(shù)據被稱為Message。
package protobuf; ? option java_package = "protobuf"; option java_outer_classname = "PersonProtos"; option java_generic_services = true; ? message Person { ?? ?required string name = 1; ?? ?required int32 id = 2; ?? ?optional string email = 3; ? ?? ?enum PhoneType { ?? ? ? ?MOBILE = 0; ?? ? ? ?HOME = 1; ?? ? ? ?WORK = 2; ?? ?} ? ?? ?message PhoneNumber { ?? ? ? ?required string number = 1; ?? ? ? ?optional PhoneType type = 2 [default = HOME]; ?? ?} ? ?? ?repeated PhoneNumber phone = 4; ? ?? ?service PhoneService { ?? ? ? ?rpc GetPhone (Phone) returns (Phone); ?? ?} }
消息的成員需要指定其規(guī)則:
(1) required:這個域在消息中必須剛好有1個;
(2) optional:這個域在消息中可以有0或1個;
(3) repeated:這個域在消息中可以有從多個,包括0個。
Protobuf的類型與Java類型的映射關系:
double? ?->? double float? ? ->? float int32? ? ->? int int64? ? ->? long uint32? ?->? int[1] uint64? ?->? long[1] sint32? ?->? int sint64? ?->? long fixed32? ->? int[1] fixed64? ->? long[1] sfixed32 ->? int sfixed64 ->? long bool? ? ?->? boolean string? ?->? String bytes? ? ->? ByteString
編寫完.proto文件后,就可以使用下面的命令將會在protobuf目錄中生成源文件PersonProtos.java
protoc –java_out=. person.proto
3.序列化
先看下面一個例子:
message Test1 { ?? ?required int32 a = 1; }
創(chuàng)建一個Test1消息,并且把a設置為150,那么序列化后有如下3個字節(jié):
08 96 01
3.1.varint編碼
varint編碼的序列化使用一個或多個字節(jié),數(shù)字越大使用的字節(jié)數(shù)越多。對于序列化后的字節(jié),除了最后一個字節(jié),都有一個most significant bit(msb):表示后邊是否有更多的字節(jié)。整數(shù)序列化時按7位一組,每個字節(jié)的低7位保存一組,第一個字節(jié)存儲最低位一組,即使用little endian。
比如300序列化后的字節(jié)序列是:
10101100 00000010
先去掉每個字節(jié)的msb:
0101100 0000010
交換字節(jié)的順序:
0000010 0101100 -> 100101100 -> 256 + 32 + 8 + 4 = 300
3.2.消息結構
一個protocol buffer message是一個key/value對序列。每一key/value對的key實際是兩個值:.proto文件中的field number以及wire type。可用的wire type如下所示:
| Type | Meaning | Used For |
| 0 | Varint | int32, int64, uint32, uint64, sint32, sint64, bool, enum |
| 1 | 64-bit | fixed64, sfixed64, double |
| 2 | Length-delimited | string, bytes, embedded messages, packed repeated fields |
| 3 | Start group | groups (deprecated) |
| 4 | End group | groups (deprecated) |
| 5 | 32-bit | fixed32, sfixed32, float |
每一個key是一個varint,值是(field_number << 3) | wire_type,即低三位存儲wire type。
3.3.有符號整數(shù)
有符號整數(shù)使用ZigZag編碼來將有符號整數(shù)映射到無符號整數(shù)。
| Signed Original | Encoded As |
| 0 | 0 |
| -1 | 1 |
| 1 | 2 |
| -2 | 3 |
| 2147483647 | 4294967294 |
| -2147483648 | 4294967294 |
3.4.非varint編碼
message Test2 { ?? ?required string b = 2; }
將b的值設置為“testing”,編碼結果為:
12 07 74 65 73 74 69 6e 67
這里的key是0×12:field_number = 2, type = 2。字符串的長度是7。
3.5.嵌套消息
message Tes3 { ?? ?required Test1 c = 3; }
c的成員a的值設置為150,編碼結果為:
1a 03 08 96 01
后三個字節(jié)和Test1一樣,之前的數(shù)字3表示長度。
3.5.Repeated域
message Test4 { ?? ?repeated int32 d = 4; }
{3, 270, 86942}編碼結果為:
22? ? ? ? // tag (field number 4, wire type 2) 06? ? ? ? // payload size (6 bytes) 03? ? ? ? // first element (varint 3) 8E 02? ? ?// second element (varint 270) 9E A7 05? // third element (varint 86942)
4.rpc通信實現(xiàn)
使用protocol buffers的第三方rpc實現(xiàn)protobuf-socket-rpc。
假設protocol buffers生成的類是protobuf. MessageProtos,其中定義了一個消息類Message和一個服務類MessageService,MessageService中定義了一個接口getMessage(RpcController, Message request)。
服務接口實現(xiàn)MessageServiceImpl.java:
package?protobuf; ? import?com.google.protobuf.RpcController; import?com.google.protobuf.ServiceException; import?protobuf.MessageProtos.Message; import?protobuf.MessageProtos.MessageService.BlockingInterface; ? public?class?MessageServiceImpl?implements?BlockingInterface?{ ? ? @Override ? ??public?Message?getMessage(RpcController?controller,?Message?request) ? ? ? ? ? ??throws?ServiceException?{ ? ? ? ??// process request? ? ? ? …… ? ? ? ??return?request; ? ??} }
服務端實現(xiàn)Server.java:
package?protobuf; ? import?java.util.concurrent.Executors; ? import?com.googlecode.protobuf.socketrpc.RpcServer; import?com.googlecode.protobuf.socketrpc.ServerRpcConnectionFactory; import?com.googlecode.protobuf.socketrpc.SocketRpcConnectionFactories; import?protobuf.MessageProtos.MessageService; ? public?class?Server?{ ? ??private?int?port; ? ??private?int?threadPoolSize; ? ? ??public?Server(int?port,?int?threadPoolSize)?{ ? ? ? ??this.port?=?port; ? ? ? ??this.threadPoolSize?=?threadPoolSize; ? ??} ? ? ??public?void?run()?{ ? ? ? ??// Start server ? ? ? ??ServerRpcConnectionFactory?rpcConnectionFactory?=SocketRpcConnectionFactories ? ? ? ? ? ? ? ? .createServerRpcConnectionFactory(port); ? ? ? ??RpcServer?server?=?new?RpcServer(rpcConnectionFactory, ? ? ? ? ? ? ? ??Executors.newFixedThreadPool(threadPoolSize),?true); ? ? ? ??server.registerBlockingService(MessageService ? ? ? ? ? ? ? ? .newReflectiveBlockingService(new?MessageServiceImpl())); ? ? ? ??server.run(); ? ??} ? ? ??public?static?void?main(String[]?args)?{ ? ? ? ??if?(args.length?!=?2)?{ ? ? ? ? ? ??System.out.println("Usage: Server port thread_pool_size"); ? ? ? ? ? ??return; ? ? ? ??} ? ? ? ? ??int?port?=?Integer.parseInt(args[0]); ? ? ? ??int?size?=?Integer.parseInt(args[1]); ? ? ? ? ??new?Server(port,?size).run(); ? ??} }
客戶端實現(xiàn)Client.java:
package?protobuf; ? import?protobuf.MessageProtos.Message; import?protobuf.MessageProtos.MessageService; import?protobuf.MessageProtos.MessageService.BlockingInterface; ? import?com.google.protobuf.BlockingRpcChannel; import?com.google.protobuf.ByteString; import?com.google.protobuf.RpcController; import?com.google.protobuf.ServiceException; import?com.googlecode.protobuf.socketrpc.RpcChannels; import?com.googlecode.protobuf.socketrpc.RpcConnectionFactory; import?com.googlecode.protobuf.socketrpc.SocketRpcConnectionFactories; import?com.googlecode.protobuf.socketrpc.SocketRpcController; ? public?class?Client?{ ? ??private?int?port; ? ??private?String?host; ? ??private?int?size; ? ??private?int?count; ? ? ??public?Client(int?port,?String?host,?int?size,?int?count)?{ ? ? ? ??super(); ? ? ? ??this.port?=?port; ? ? ? ??this.host?=?host; ? ? ? ??this.size?=?size; ? ? ? ??this.count?=?count; ? ??} ? ? ??public?long?run()?{ ? ? ? ??// Create channel ? ? ? ??RpcConnectionFactory?connectionFactory?=?SocketRpcConnectionFactories ? ? ? ? ? ? ? ? .createRpcConnectionFactory(host,?port); ? ? ? ??BlockingRpcChannel?channel?=?RpcChannels ? ? ? ? ? ? ? ? .newBlockingRpcChannel(connectionFactory); ? ? ? ? ??// Call service ? ? ? ??BlockingInterface?service?=?MessageService.newBlockingStub(channel); ? ? ? ??RpcController?controller?=?new?SocketRpcController(); ? ? ? ??Message.Builder?message?=?Message.newBuilder(); ? ? ? ??// initiate the message ? ? ? ? … ? ? ? ? ??long?start?=?0; ? ? ? ??long?end?=?0; ? ? ? ??try?{ ? ? ? ? ? ??start?=?System.currentTimeMillis(); ? ? ? ? ? ??for?(int?i?=?0;?i?<?count;?i++)?{ ? ? ? ? ? ? ? ??service.getMessage(controller,?message.build()); ? ? ? ? ? ??} ? ? ? ? ? ??end?=?System.currentTimeMillis(); ? ? ? ? ? ??System.out.println(end?-?start); ? ? ? ??}?catch?(ServiceException?e)?{ ? ? ? ? ? ??e.printStackTrace(); ? ? ? ??} ? ? ? ? ??// Check success ? ? ? ??if?(controller.failed())?{ ? ? ? ? ? ??System.err.println(String.format("Rpc failed %s : %s", ? ? ? ? ? ? ? ? ? ??((SocketRpcController)?controller).errorReason(), ? ? ? ? ? ? ? ? ? ??controller.errorText())); ? ? ? ??} ? ? ? ? ??return?end?-?start; ? ??} ? ? ??public?static?void?main(String[]?args)?{ ? ? ? ??if?(args.length?!=?4)?{ ? ? ? ? ? ??System.out.println("Usage: Client host port dataSize count"); ? ? ? ? ? ??return; ? ? ? ??} ? ? ? ??String?host?=?args[0]; ? ? ? ??int?port?=?Integer.parseInt(args[1]); ? ? ? ??int?size?=?Integer.parseInt(args[2]); ? ? ? ??int?count?=?Integer.parseInt(args[3]); ? ? ? ? ??new?Client(port,?host,?size,?count).run(); ? ??} }
5.參考資料
(1) Protocol Buffers Documentation:?http://code.google.com/apis/protocolbuffers/docs/overview.html
總結
以上是生活随笔為你收集整理的RPC框架系列——Protocol Buffers的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。