Thrift 异步模式
生活随笔
收集整理的這篇文章主要介紹了
Thrift 异步模式
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
我們廣泛使用thrift作為我們內(nèi)部接口調(diào)用的RPC框架,而且基本上都是使用多線程請(qǐng)求等待應(yīng)答的同步模式 。但是在一些情況下(例如大數(shù)據(jù)量同步),如果可以使用異步模式,可以優(yōu)化程序結(jié)構(gòu)和提高模塊性能。
thrift 有提供一套異步模式模式供我們使用,我們跟往常一樣來編寫一個(gè)thrift 協(xié)議文件。
namespace cpp example
service Twitter {string sendString(1:string data);
}不同的是,我們需要加入cpp:cob_type 來生成代碼。
thrift -r -strict –gen cpp:cob_style -o ./ test.thrift
生成的代碼文件表和之前的基本相同,但在Twitter.cpp 和Twitter.h 文件中增加了異步客戶端和異步服務(wù)器使用的類。
$ tree gen-cpp?
|– Test_constants.cpp?
|– Test_constants.h?
|– Test_types.cpp?
|– Test_types.h?
|– Twitter.cpp?
|– Twitter.h?
|–Twitter_server.skeleton.cpp?
|-Twitter_async_server.skeleton.cpp
用戶只要關(guān)心在Twitter.h 中的TwitterCobClient、TwitterCobSvIf和TwitterAsyncProcessor這三個(gè)類。
Thrift 異步Client
異步客戶端代碼有TwitterCobClient 以及它繼承的類。
class TwitterCobClient : virtual public TwitterCobClIf {public:TwitterCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :channel_(channel),itrans_(new ::apache::thrift::transport::TMemoryBuffer()),otrans_(new ::apache::thrift::transport::TMemoryBuffer()),piprot_(protocolFactory->getProtocol(itrans_)),poprot_(protocolFactory->getProtocol(otrans_)) {iprot_ = piprot_.get();oprot_ = poprot_.get();}boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {return channel_;}virtual void completed__(bool /* success */) {}void sendString(tcxx::function<void(TwitterCobClient* client)> cob, const std::string& data);void send_sendString(const std::string& data);void recv_sendString(std::string& _return);protected:boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;::apache::thrift::protocol::TProtocol* iprot_;::apache::thrift::protocol::TProtocol* oprot_;
};從源文件上看,通過類實(shí)現(xiàn)發(fā)現(xiàn):
completed__(bool /* success */)是虛函數(shù),用于通知用戶數(shù)據(jù)接收完成;
sendString函數(shù)帶有回調(diào)參數(shù) function <void(TwitterCobClient* client)> cob,用于數(shù)據(jù)接收時(shí)回調(diào),這是異步的特點(diǎn);
send_sendString和recv_sendString分別用于寫數(shù)據(jù)到輸出緩存和從輸入緩存讀數(shù)據(jù)
列表內(nèi)容擁有TAsyncChannel,異步功能的核心在于TAsyncChannel,它是用于回調(diào)函數(shù)注冊(cè)和異步收發(fā)數(shù)據(jù);
Transport采用TMemoryBuffer,TMemoryBuffer是用于程序內(nèi)部之間通信用的,在這里起到讀寫緩存作用
下面看看關(guān)鍵函數(shù) sendString的實(shí)現(xiàn)
void TwitterCobClient::sendString(t cxx::function<v oid(TwitterCobClient* client)> cob, const std::string& data)
{send_sendString(data);channel_->sendAndRecvMessage(tcxx::bind(cob, this), otrans_.get(), itrans_.get());
}send_sendString函數(shù)是想緩沖區(qū)(TMemoryBuffer)寫入數(shù)據(jù), 而sendString 則通過調(diào)用TAsyncChannel的sendAndRecvMessage接口注冊(cè)回調(diào)函數(shù)。
TAsyncChannel作為接口類定義了三個(gè)接口函數(shù)。
/*** Send a message over the channel.*/virtual void sendMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* message) = 0;/*** Receive a message from the channel.*/virtual void recvMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* message) = 0;/*** Send a message over the channel and receive a response.*/virtual void sendAndRecvMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* sendBuf,apache::thrift::transport::TMemoryBuffer* recvBuf);TAsyncChannel目前為止(0.9.1版本)只有一種客戶端實(shí)現(xiàn)類TEvhttpClientChannel,顧名思義它是基于libevent和http協(xié)議實(shí)現(xiàn)的。 使用libevent的方法就不在這里累贅了,主要看下sendAndRecvMessage的實(shí)現(xiàn)。
void TEvhttpClientChannel::sendAndRecvMessage(const VoidCallback& cob,apache::thrift::transport::TMemoryBuffer* sendBuf,apache::thrift::transport::TMemoryBuffer* recvBuf) {cob_ = cob;// 綁定回調(diào)函數(shù)recvBuf_ = recvBuf;struct evhttp_request* req = evhttp_request_new(response, this);uint8_t* obuf;uint32_t sz;sendBuf->getBuffer(&obuf, &sz);rv = evbuffer_add(req->output_buffer, obuf, sz);rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());// 發(fā)送http 請(qǐng)求
}
從sendAndRecvMessage實(shí)現(xiàn)可看出,TEvhttpClientChannel是用采用http協(xié)議來與服務(wù)器通信,后面介紹異步server時(shí)會(huì)發(fā)現(xiàn),同樣采用是http協(xié)議,它們使用的http庫(kù)是libevent庫(kù)的evhttp。
通過向evhttp_request中注冊(cè)相應(yīng)回調(diào)函數(shù)respones和傳入回調(diào)實(shí)例本身的指針,在相應(yīng)時(shí)候回調(diào)函數(shù)中調(diào)用TEvhttpClientChannel實(shí)例的finish接口完成數(shù)據(jù)接收,并寫入緩存中,供應(yīng)用層獲取使用。?
看下回調(diào)函數(shù)response 的實(shí)現(xiàn):
/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) {TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg;try {self->finish(req);} catch (std::exception& e) {// don't propagate a C++ exception in C code (e.g. libevent)std::cerr << "TEvhttpClientChannel::response exception thrown (ignored): " << e.what()<< std::endl;}
}Thrift 異步server
異步server關(guān)心另外兩個(gè)類:TwitterCobSvIf和TwitterAsyncProcessor。很明顯TwitterCobSvIf是要用戶繼承實(shí)現(xiàn)的,它與同步TwitterSvIf不同的地方是成員函數(shù)多一個(gè)cob回調(diào)函數(shù),在實(shí)現(xiàn)TwitterSvIf時(shí),需要調(diào)用cob。示例如下:
class TwitterCobSvNull : virtual public TwitterCobSvIf {public:virtual ~TwitterCobSvNull() {}void sendString(tcxx::function<void(std::string const& _return)> cob, const std::string& /* data */) {std::string _return;return cob(_return);}
};
那么這個(gè)cob是什么函數(shù),哪里注冊(cè)的?這在thrift lib庫(kù)里的TEvhttpServer和TAsyncProtocolProcessor類里可找到答案,其中TEvhttpServer是異步server,傳輸是采用http協(xié)議,與異步client對(duì)上。
先看看TEvhttpServer實(shí)現(xiàn),同樣采用event_base來異步收發(fā)數(shù)據(jù),收到數(shù)據(jù)時(shí),回調(diào)request函數(shù)。
void TEvhttpServer::request(struct evhttp_request* req, void* self) {try {static_cast<TEvhttpServer*>(self)->process(req);} catch(std::exception& e) {evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0);}
}
void TEvhttpServer::process(struct evhttp_request* req) {RequestContext* ctx = new RequestContext(req);return processor_->process( // 這里的processor_正是TAsyncProtocolProcessorstd::tr1::bind(&TEvhttpServer::complete, // 注冊(cè)completethis,ctx,std::tr1::placeholders::_1),ctx->ibuf,ctx->obuf);
}void TEvhttpServer::complete(RequestContext* ctx, bool success) {std::auto_ptr<RequestContext> ptr(ctx);int code = success ? 200 : 400;const char* reason = success ? "OK" : "Bad Request";int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift");struct evbuffer* buf = evbuffer_new();uint8_t* obuf;uint32_t sz;ctx->obuf->getBuffer(&obuf, &sz); // 從輸出緩沖讀數(shù)據(jù)int ret = evbuffer_add(buf, obuf, sz);evhttp_send_reply(ctx->req, code, reason, buf); // 發(fā)送數(shù)據(jù)}接著看TAsyncProtocolProcessor的process實(shí)現(xiàn)
void TAsyncProtocolProcessor::process(std::tr1::function<void(bool healthy)> _return,boost::shared_ptr<TBufferBase> ibuf,boost::shared_ptr<TBufferBase> obuf) {boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf));boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf));return underlying_->process( // underlying_是生成代碼里的TwitterAsyncProcessorstd::tr1::bind(&TAsyncProtocolProcessor::finish, _return, // compere函數(shù)oprot,std::tr1::placeholders::_1),iprot, oprot);
}/* static */ void TAsyncProtocolProcessor::finish(std::tr1::function<void(bool healthy)> _return,boost::shared_ptr<TProtocol> oprot,bool healthy) {(void) oprot;// This is a stub function to hold a reference to oprot.return _return(healthy); // 回調(diào)compere函數(shù)
}最后看TwitterAsyncProcessor::process,它先寫fname,mtype, seqid然后調(diào)用process_fn,process_fn選擇調(diào)用合理的處理函數(shù)(如process_sendString),看process_sendString實(shí)現(xiàn):
void TwitterAsyncProcessor::process_sendString(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot)void (TwitterAsyncProcessor::*return_fn)(std::tr1::function<void(bool ok)> cob, int32_t seqid, ::apache::thrift::protocol::TProtocol* oprot, void* ctx, const std::string& _return) =&TwitterAsyncProcessor::return_sendString; // return_sendString正是我們要找的cob函數(shù)iface_->sendString( // iface_是TwitterCobSvIf的具體類,用戶實(shí)現(xiàn)的std::tr1::bind(return_fn, this, cob, seqid, oprot, ctx, std::tr1::placeholders::_1), // cob 是 finish函數(shù)args.data);}上面return_sendString是我們要找的cob函數(shù),該函數(shù)將用戶處理的結(jié)果寫入輸出沖緩,并發(fā)送給client。
下面實(shí)現(xiàn)了一個(gè)異步客戶端和異步服務(wù)端?
采用異步時(shí),必須采用http 傳輸層。
異步客戶端的實(shí)現(xiàn)
demo_async_client.cc
#include <string>
#include "boost/shared_ptr.hpp"
#include <thrift/Thrift.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/server/TServer.h>
#include <thrift/async/TAsyncChannel.h>
#include <thrift/async/TEvhttpClientChannel.h>
#include "common/thrift/Twitter.h"
#include "boost/function.hpp"
#include "boost/bind.hpp"
#include <event.h>
#include <stdio.h>
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using std::string;
using boost::shared_ptr;
using namespace example;
using namespace apache::thrift::async;class testClient : public TwitterCobClient
{
public:testClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, TProtocolFactory* protocolFactory): TwitterCobClient(channel, protocolFactory){ };virtual void completed__(bool success){if (success){printf("respone : %s \n", res.c_str()); // 輸出返回結(jié)果}else{printf("failed to respone\n");}fflush(0);};string res;
};//callback function
static void my_recv_sendString(TwitterCobClient *client){client->recv_sendString(dynamic_cast<testClient*>(client)->res);
}static void sendString(testClient & client){
printf("snedstring start\n");
std::function<void(TwitterCobClient*client)>cob = bind(&my_recv_sendString,_1);
client.sendString(cob,"Hello");
printf("sendstring end\n");
}static void DoSimpleTest(const std::string & host, int port){printf("running SimpleTset(%s, %d)..\n", host.c_str(),port);event_base* base = event_base_new();boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel1( new TEvhttpClientChannel( host, "/", host.c_str(), port, base ) );testClient client1( channel1, new TBinaryProtocolFactory() );sendString(client1); // 發(fā)送第一個(gè)請(qǐng)求boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel2( new TEvhttpClientChannel( host, "/", host.c_str(), port, base ) );testClient client2( channel2, new TBinaryProtocolFactory() );sendString(client2); // 發(fā)送第二個(gè)請(qǐng)求event_base_dispatch(base);event_base_free(base);printf( "done DoSimpleTest().\n" );
}int main( int argc, char* argv[] )
{DoSimpleTest( "localhost", 14488 );return 0;}
異步服務(wù)端的實(shí)現(xiàn)
demo_async_serv.cc
#include <string>
#include "boost/shared_ptr.hpp"
#include <thrift/Thrift.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/server/TServer.h>
#include <thrift/async/TAsyncChannel.h>
#include <thrift/async/TEvhttpClientChannel.h>
#include "common/thrift/Twitter.h"
#include <thrift/async/TAsyncProtocolProcessor.h>
#include <thrift/async/TEvhttpServer.h>using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using std::string;
using namespace boost;
using namespace example;
using namespace apache::thrift::async;class TwitterAsyncHandler : public TwitterCobSvIf {public:TwitterAsyncHandler() {// Your initialization goes here}void sendString(std::function<void(std::string const& _return)> cob, const std::string& data) {printf("sendString rec:%s\n", data.c_str()); // 輸出收到的數(shù)據(jù)std::string _return = "world"; // 返回world字符串給客戶端return cob(_return);}};int main(int argc, char **argv) {shared_ptr<TAsyncProcessor> underlying_pro(new TwitterAsyncProcessor( shared_ptr<TwitterCobSvIf>(new TwitterAsyncHandler()) ) );shared_ptr<TAsyncBufferProcessor> processor( new TAsyncProtocolProcessor( underlying_pro, shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory()) ) );TEvhttpServer server(processor, 14488);server.serve();return 0;
}
參考
http://blog.csdn.net/whycold/article/details/10973?
http://tech.uc.cn/?p=2668553
總結(jié)
以上是生活随笔為你收集整理的Thrift 异步模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: browse下载插件DownThemAl
- 下一篇: 简单的docker命令ubuntu系统