Asynchronous TCP communication with Boost.Asio
I. Preface
Boost.Asio is an easy-to-use yet powerful cross-platform C++ networking library that also performs well; on Linux it is built on epoll and on Windows on IOCP. TCP is one of the most common communication methods in projects, and there are many ways to implement it, so it is worth distilling an approach that suits your own projects. With luck, the next project can reuse it directly.
II. Implementation approach
1. Packet data structure
Tag: used to check whether a packet is valid; explained in detail below;
Length: the length of the Body;
Command: the packet type. 0 is a heartbeat packet (a long-lived connection needs heartbeats to detect whether it is still healthy), 1 is a registration packet (after connecting, the client must register its information with the server), 2 is a business message packet;
business_type: the business message type; the server routes data to the matching client according to this type (clients are classified by business type);
app_id: the client's unique identifier;
Data: the message data;
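The framing itself, a 3-byte Tag followed by a 4-byte Length in front of the body, can be sketched with the standard library alone. This is only an illustration under assumptions: `kTag` and `encode_frame` are hypothetical names rather than part of the article's code, and the length is copied in the sender's native byte order (as the article's own async_write does), so both peers are assumed to share the same endianness.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical name for illustration; the article's own constant is `tag`.
static const std::string kTag = "KDS";

// Build one frame: 3-byte tag, 4-byte body length, then the body itself.
// The length is copied in native byte order, so both peers are assumed
// to use the same endianness.
std::string encode_frame(const std::string& body)
{
    std::uint32_t length = static_cast<std::uint32_t>(body.size());
    std::string frame;
    frame.reserve(kTag.size() + sizeof(length) + body.size());
    frame.append(kTag);
    frame.append(reinterpret_cast<const char*>(&length), sizeof(length));
    frame.append(body);
    return frame;
}
```

A 5-byte body therefore yields a 12-byte frame: 3 bytes of tag, 4 bytes of length, 5 bytes of payload.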
2. Connection object
Once a client has connected to the server, each side holds a socket connection object through which it sends and receives data, so I define it as socket_session.
//socket_session.h
#pragma once
#include <ctime>
#include <string>
#include <iostream>
#include <list>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/detail/atomic_count.hpp>
#include <firebird/log/logger_log4.hpp>
#include <firebird/detail/config.hpp>
#include <firebird/socket_utils/message_archive.hpp>

using boost::asio::ip::tcp;

namespace firebird{
    // Packet types carried in the Command field.
    enum command{ heartbeat = 0, regist, normal};

    // Every valid packet starts with these 3 bytes.
    const std::string tag = "KDS";

    class FIREBIRD_DECL socket_session;
    typedef boost::shared_ptr<socket_session> socket_session_ptr;

    class FIREBIRD_DECL socket_session:
        public boost::enable_shared_from_this<socket_session>,
        private boost::noncopyable
    {
    public:
        typedef boost::function<void(socket_session_ptr)> close_callback;
        typedef boost::function<void(
            const boost::system::error_code&,
            socket_session_ptr, message&)> read_data_callback;

        socket_session(boost::asio::io_service& io_service);
        ~socket_session(void);

        DWORD id() { return m_id; }
        WORD get_business_type(){ return m_business_type; }
        void set_business_type(WORD type) { m_business_type = type; }
        DWORD get_app_id(){ return m_app_id; }
        void set_app_id(DWORD app_id) { m_app_id = app_id; }
        std::string& get_remote_addr() { return m_name; }
        // Takes a const reference so that temporaries can be passed.
        void set_remote_addr(const std::string& name) { m_name = name; }
        tcp::socket& socket() { return m_socket; }

        void installCloseCallBack(close_callback cb){ close_cb = cb; }
        void installReadDataCallBack(read_data_callback cb) { read_data_cb = cb; }

        void start();
        void close();
        void async_write(const std::string& sMsg);
        void async_write(message& msg);

        bool is_timeout();
        void set_op_time(){std::time(&m_last_op_time);}

    private:
        static boost::detail::atomic_count m_last_id;

        DWORD m_id;
        WORD  m_business_type;
        DWORD m_app_id;
        std::string m_name;
        boost::array<char, 7> sHeader;   // 3-byte tag + 4-byte length
        std::string sBody;

        tcp::socket m_socket;
        boost::asio::io_service& m_io_service;

        std::time_t m_last_op_time;

        close_callback close_cb;
        read_data_callback read_data_cb;

        // Completion handler for async_write; frees the heap-allocated buffer.
        void handle_write(const boost::system::error_code& e,
            std::size_t bytes_transferred, std::string* pmsg);

        // Called when a body has been read; starts reading the next header.
        void handle_read_header(const boost::system::error_code& error);

        // Called when a header has been read; validates it and reads the body.
        void handle_read_body(const boost::system::error_code& error);

        void handle_close();
    };
}
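Note that a tag = "KDS" is defined in order to check whether a received packet is valid: any packet whose first 3 bytes are not "KDS" is treated as an illegal request. You can set tag to any other string, as long as packets are sent according to the protocol. This is, of course, a fairly simple way to validate packets. A more rigorous approach is for both sides to verify packets with a hash algorithm; how to do that is not covered here.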
//socket_session.cpp
#include "socket_session.h"
#include <cstring>

namespace firebird{
    boost::detail::atomic_count socket_session::m_last_id(0);

    socket_session::socket_session(boost::asio::io_service& io_srv)
        :m_business_type(0), m_app_id(0),
        m_socket(io_srv), m_io_service(io_srv),
        m_last_op_time(0)
    {
        m_id = ++socket_session::m_last_id;
    }

    socket_session::~socket_session(void)
    {
        // Use the non-throwing overload: a destructor must not throw.
        boost::system::error_code ignored;
        m_socket.close(ignored);
    }

    void socket_session::start()
    {
        m_socket.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));
        m_socket.set_option(boost::asio::socket_base::keep_alive(true));
        std::time(&m_last_op_time);
        // Kick off the read loop with a "no error" code.
        const boost::system::error_code error;
        handle_read_header(error);
    }

    void socket_session::handle_close()
    {
        try{
            m_socket.close();
            close_cb(shared_from_this());
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[unknown exception]");
        }
    }

    void socket_session::close()
    {
        // Close on the io_service thread rather than on the caller's thread.
        m_io_service.post(boost::bind(&socket_session::handle_close, shared_from_this()));
    }

    static int connection_timeout = 60;   // seconds

    bool socket_session::is_timeout()
    {
        std::time_t now;
        std::time(&now);

        return now - m_last_op_time > connection_timeout;
    }

    // Runs after a body has been read: starts reading the next header, then
    // hands the body that was just read to the registered callback.
    void socket_session::handle_read_header(const boost::system::error_code& error)
    {
        LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO  << "enter.");

        try{
            if(error)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO  << "remote address:[" << get_remote_addr() << "], socket exception:[" << error.message().c_str() << "]");
                close();
                return;
            }

            std::string data;
            data.swap(sBody);
            boost::asio::async_read(m_socket,
                boost::asio::buffer(sHeader),
                boost::bind(&socket_session::handle_read_body, shared_from_this(),
                boost::asio::placeholders::error));

            if (!data.empty())
            {
                message msg;
                message_iarchive(msg, data);

                read_data_cb(error, shared_from_this(), msg);
            }
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[" << e.what() << "]");
            close();
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[unknown exception]");
            close();
        }
    }

    // Runs after a header has been read: checks the tag, extracts the length
    // and reads that many bytes of body.
    void socket_session::handle_read_body(const boost::system::error_code& error)
    {
        LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");

        try{
            if(error)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[" << error.message().c_str() << "]");
                close();
                return;
            }

            // Compare against exactly tag.length() bytes of the header; the
            // header is not NUL-terminated, so it must not be treated as a C string.
            if (tag.compare(0, tag.length(), sHeader.data(), tag.length()))
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO <<  "remote address:[" << get_remote_addr() << "], socket exception:[illegal connection!]");
                close();
                return;
            }

            DWORD dwLength = 0;

            char* len = (char*)&dwLength;
            memcpy(len, &sHeader[tag.length()], sizeof(dwLength));

            // NOTE: dwLength comes straight from the peer and is not bounds-checked here.
            sBody.resize(dwLength);
            char* pBody = &sBody[0];

            boost::asio::async_read(m_socket,
                boost::asio::buffer(pBody, dwLength),
                boost::bind(&socket_session::handle_read_header, shared_from_this(),
                boost::asio::placeholders::error));
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[" << e.what() << "]");
            close();
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[unknown exception]");
            close();
        }
    }

    void socket_session::handle_write(const boost::system::error_code& error,
        std::size_t bytes_transferred, std::string* pmsg)
    {
        // The buffer was allocated in async_write and is released here.
        if (pmsg != NULL)
        {
            delete pmsg;
        }

        if(error)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[" << error.message().c_str() << "]");
            close();
            return;
        }
    }

    void socket_session::async_write(const std::string& sMsg)
    {
        LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO  << "enter.");

        try
        {
            DWORD dwLength = sMsg.size();
            char* pLen = (char*)&dwLength;

            // The buffer must outlive the asynchronous write, so it lives on
            // the heap until handle_write deletes it.
            std::string* msg = new std::string();
            msg->append(tag);
            msg->append(pLen, sizeof(dwLength));
            msg->append(sMsg);

            boost::asio::async_write(m_socket,boost::asio::buffer(*msg, msg->size()),
                boost::bind(&socket_session::handle_write, shared_from_this(),
                boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred,
                msg));

        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[" << e.what() << "]");
            close();
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << get_remote_addr() << "], socket exception:[unknown exception]");
            close();
        }
    }

    void socket_session::async_write(message& msg)
    {
        std::string data;
        message_oarchive(data, msg);

        async_write(data);
    }
}
When receiving data, socket_session first reads the 7-byte header and compares its first 3 bytes against "KDS". It then extracts the 4-byte Length, reads Length bytes of data, and finally passes that data to the read_data_cb callback for processing. The read_data_cb callback is registered from outside the class.
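The receive sequence described above (read 7 bytes, check the tag, take the length, read the body) can be sketched without Asio as a small decoder fed with raw bytes. This is a sketch under assumptions, not the article's implementation: `frame_decoder` is a hypothetical class, it buffers input instead of issuing asynchronous reads, and it assumes the length is in native byte order.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical helper, not part of the article's code: accumulates raw bytes
// and extracts complete bodies framed as "KDS" + 4-byte length + body.
class frame_decoder
{
public:
    frame_decoder() : m_bad(false) {}

    // Append bytes as they arrive from the socket.
    void feed(const std::string& bytes) { m_buf.append(bytes); }

    // True once a header with a wrong tag has been seen.
    bool bad() const { return m_bad; }

    // Try to extract one complete body. Returns false if more data is
    // needed or if the stream has been found invalid.
    bool next(std::string& body)
    {
        static const std::size_t kHeaderSize = 7;  // 3-byte tag + 4-byte length
        if (m_bad || m_buf.size() < kHeaderSize) return false;
        if (m_buf.compare(0, 3, "KDS") != 0) { m_bad = true; return false; }

        std::uint32_t length = 0;  // native byte order (assumption)
        std::memcpy(&length, m_buf.data() + 3, sizeof(length));
        if (m_buf.size() - kHeaderSize < length) return false;  // body incomplete

        body.assign(m_buf, kHeaderSize, length);
        m_buf.erase(0, kHeaderSize + length);
        return true;
    }

private:
    std::string m_buf;
    bool m_bad;
};
```

Because TCP is a byte stream, a frame may arrive in several pieces; the decoder simply reports "not yet" until the whole body is buffered.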
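3. Connection manager
A server serves many clients at the same time, so it needs a connection manager to keep track of them effectively; I define it as session_manager. session_manager is mainly responsible for adding, deleting, updating and looking up socket_session objects, and for checking that they are still valid.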
//session_manager.h
#pragma once
#include "socket_session.h"
#include "filter_container.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/typeof/typeof.hpp>
#include <boost/random.hpp>
#include <boost/pool/detail/singleton.hpp>

namespace firebird{
    template<typename T>
    class var_gen_wraper
    {
    public:
        var_gen_wraper(): gen(boost::mt19937((boost::int32_t)std::time(0)),
            boost::uniform_smallint<>(1, 100)) {}
        typename T::result_type operator() () { return gen(); }
    private:
        T gen;
    };

    // One row of the session table; each field below is also an index key.
    struct  session_stu
    {
        DWORD   id;
        WORD    business_type;
        std::string address;
        DWORD   app_id;
        socket_session_ptr session;
    };

    // Index tags.
    struct sid{};
    struct sbusiness_type{};
    struct saddress{};
    struct sapp_id{};

    enum session_idx_member{ session_id = 0, session_business_type, session_address, app_id};
#define CLIENT 0
#define SERVER 1

    typedef boost::multi_index::multi_index_container<
        session_stu,
        boost::multi_index::indexed_by<
        boost::multi_index::ordered_unique<
        boost::multi_index::tag<sid>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, id)>,
        boost::multi_index::ordered_non_unique<
        boost::multi_index::tag<sbusiness_type>, BOOST_MULTI_INDEX_MEMBER(session_stu, WORD, business_type)>,
        boost::multi_index::ordered_non_unique<
        boost::multi_index::tag<saddress>, BOOST_MULTI_INDEX_MEMBER(session_stu, std::string, address)>,
        boost::multi_index::ordered_non_unique<
        boost::multi_index::tag<sapp_id>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, app_id)>
        >
    > session_set;

#define MULTI_MEMBER_CON(Tag) boost::multi_index::index<session_set,Tag>::type&
#define MULTI_MEMBER_ITR(Tag) boost::multi_index::index<session_set,Tag>::type::iterator

    // Predicate: does a row belong to the given business type?
    struct is_business_type {
        is_business_type(WORD type)
            :m_type(type)
        {

        }
        bool operator()(const session_stu& s) const
        {
            return (s.business_type == m_type);
        }

        WORD m_type;
    };

    class session_manager
    {
    public:
        typedef boost::shared_lock<boost::shared_mutex> readLock;
        typedef boost:: unique_lock<boost::shared_mutex> writeLock;

        session_manager(boost::asio::io_service& io_srv, int type, int expires_time);
        ~session_manager();

        void add_session(socket_session_ptr p);
        void update_session(socket_session_ptr p);

        // Remove the session whose key in index Tag equals m.
        template<typename Tag, typename Member>
        void del_session(Member m)
        {
            writeLock lock(m_mutex);
            if (m_sessions.empty())
            {
                return ;
            }

            // `typename` is required here because the index type depends on Tag.
            typename MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

            BOOST_AUTO_TPL(iter, idx.find(m));

            if (iter != idx.end())
            {
                idx.erase(iter);
            }
        }

        // Look up a session by its key in index Tag.
        template<typename Tag, typename Member>
        socket_session_ptr get_session(Member m)
        {
            readLock lock(m_mutex);

            if (m_sessions.empty())
            {
                return socket_session_ptr();
            }

            typename MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);
            BOOST_AUTO_TPL(iter, idx.find(m));
            return iter != idx.end() ? iter->session : socket_session_ptr();
        }

        // Pick one session of the given business type, rotating through the
        // matches on successive calls.
        template<typename Tag>
        socket_session_ptr get_session_by_business_type(WORD m)
        {
            typedef filter_container<is_business_type, typename MULTI_MEMBER_ITR(Tag)> FilterContainer;
            readLock lock(m_mutex);

            if (m_sessions.empty())
            {
                return socket_session_ptr();
            }

            typename MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

            is_business_type predicate(m);
            FilterContainer fc(predicate, idx.begin(), idx.end());
            typename FilterContainer::FilterIter iter = fc.begin();

            if (fc.begin() == fc.end())
            {
                return socket_session_ptr();
            }

            // NOTE: m_next_session is modified while only a shared lock is held.
            int step = m_next_session % fc.szie();
            ++m_next_session;

            for (int i = 0; i < step; ++i)
            {
                iter++;
            }

            return iter != fc.end() ? iter->session : socket_session_ptr();
        }

        // Pick the first session of the given business type whose remote
        // address contains ip.
        template<typename Tag>
        socket_session_ptr get_session_by_type_ip(WORD m, std::string& ip)
        {
            typedef filter_container<is_business_type, typename MULTI_MEMBER_ITR(Tag)> FilterContainer;
            readLock lock(m_mutex);

            if (m_sessions.empty())
            {
                return socket_session_ptr();
            }

            typename MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

            is_business_type predicate(m);
            FilterContainer fc(predicate, idx.begin(), idx.end());
            typename FilterContainer::FilterIter iter = fc.begin();

            if (fc.begin() == fc.end())
            {
                return socket_session_ptr();
            }

            while (iter != fc.end())
            {
                if (iter->session->get_remote_addr().find(ip) != std::string::npos)
                {
                    break;
                }

                iter++;
            }

            return iter != fc.end() ? iter->session : socket_session_ptr();
        }

        // Pick the first session of the given business type with the given app_id.
        template<typename Tag>
        socket_session_ptr get_session_by_type_appid(WORD m, DWORD app_id)
        {
            typedef filter_container<is_business_type, typename MULTI_MEMBER_ITR(Tag)> FilterContainer;
            readLock lock(m_mutex);

            if (m_sessions.empty())
            {
                return socket_session_ptr();
            }

            typename MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);

            is_business_type predicate(m);
            FilterContainer fc(predicate, idx.begin(), idx.end());
            typename FilterContainer::FilterIter iter = fc.begin();

            if (fc.begin() == fc.end())
            {
                return socket_session_ptr();
            }

            while (iter != fc.end())
            {
                if (iter->session->get_app_id() == app_id)
                {
                    break;
                }

                iter++;
            }

            return iter != fc.end() ? iter->session : socket_session_ptr();
        }

    private:
        int m_type;
        int m_expires_time;
        boost::asio::io_service& m_io_srv;
        boost::asio::deadline_timer m_check_tick;
        boost::shared_mutex m_mutex;
        unsigned short m_next_session;

        session_set m_sessions;

        void check_connection();
    };
}
This relies mainly on Boost's multi_index container, a very handy container that lets a single container be indexed on multiple columns. Its usage is not covered in detail here.
//session_manager.cpp
#include "session_manager.h"

namespace firebird{
    namespace {
        // Applies new key values through multi_index's modify() so that the
        // ordered indexes are re-sorted after the change.
        struct update_session_keys
        {
            update_session_keys(WORD type, DWORD id) : m_type(type), m_app_id(id) {}
            void operator()(session_stu& s) const
            {
                s.business_type = m_type;
                s.app_id = m_app_id;
            }
            WORD m_type;
            DWORD m_app_id;
        };
    }

    session_manager::session_manager(boost::asio::io_service& io_srv, int type, int expires_time)
        :m_type(type), m_expires_time(expires_time), m_io_srv(io_srv), m_check_tick(io_srv), m_next_session(0)
    {
        check_connection();
    }

    session_manager::~session_manager()
    {

    }

    // Periodic check: clients send heartbeats, servers drop closed or
    // timed-out sessions. Re-arms its own timer at the end.
    void session_manager::check_connection()
    {
        try{
            writeLock lock(m_mutex);

            session_set::iterator iter = m_sessions.begin();
            while (iter != m_sessions.end())
            {
                LOG4CXX_DEBUG(firebird_log, "loop");
                if (CLIENT == m_type)
                {
                    if (!iter->session->socket().is_open())
                    {
                        LOG4CXX_INFO(firebird_log, "reconnecting [" << iter->address << "]");
                        iter->session->close();
                    }
                    else{
                        // Send a heartbeat packet.
                        message msg;
                        msg.command = heartbeat;
                        msg.business_type = iter->session->get_business_type();
                        msg.app_id = iter->session->get_app_id();
                        msg.data() = "H";

                        iter->session->async_write(msg);
                        iter->session->set_op_time();
                    }
                }
                else if (SERVER == m_type)
                {
                    if (!iter->session->socket().is_open())
                    {
                        LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "removing closed session:[" << iter->session->get_remote_addr() << "]");
                        iter = m_sessions.erase(iter);
                        continue;
                    }
                    else{
                        if (iter->session->is_timeout())
                        {
                            LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "removing timed-out session:[" << iter->session->get_remote_addr() << "]");
                            iter->session->close();
                        }
                    }

                    // NOTE: this refreshes the timestamp on every tick, so with a
                    // tick shorter than the 60-second timeout is_timeout() cannot
                    // fire; the refresh probably belongs in the read path instead.
                    iter->session->set_op_time();
                }
                else{
                    LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown manager_type");
                }
                ++iter;
            }

            LOG4CXX_DEBUG(firebird_log, "periodic check");
            m_check_tick.expires_from_now(boost::posix_time::seconds(m_expires_time));
            m_check_tick.async_wait(boost::bind(&session_manager::check_connection, this));
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown exception.");
        }
    }

    void session_manager::add_session(socket_session_ptr p)
    {
        writeLock lock(m_mutex);
        session_stu stuSession;
        stuSession.id = p->id();
        stuSession.business_type = 0;
        stuSession.address = p->get_remote_addr();
        stuSession.app_id = p->get_app_id();
        stuSession.session = p;
        m_sessions.insert(stuSession);
    }

    void session_manager::update_session(socket_session_ptr p)
    {
        writeLock lock(m_mutex);
        if (m_sessions.empty())
        {
            return ;
        }

        MULTI_MEMBER_CON(sid) idx = boost::multi_index::get<sid>(m_sessions);
        BOOST_AUTO(iter, idx.find(p->id()));

        if (iter != idx.end())
        {
            // business_type and app_id are index keys, so they are changed
            // through modify() rather than via const_cast, which would leave
            // the ordered indexes inconsistent.
            idx.modify(iter, update_session_keys(p->get_business_type(), p->get_app_id()));
        }
    }
}
With this in place, I can look up a socket_session using id, business_type, address or app_id as the key, which a single map container cannot do.
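To make the contrast concrete, here is a standard-library-only sketch of what multi_index automates: two separate containers, a unique index on id and a non-unique index on business_type, kept in step by hand. The names `session_rec` and `session_table` are hypothetical and the record is deliberately simplified; this is an illustration of the idea, not a replacement for the container used in the article.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record and table types, for illustration only.
struct session_rec
{
    unsigned id;
    unsigned short business_type;
    std::string address;
};

class session_table
{
public:
    // Insert into both indexes; fails if the id is already present.
    bool add(const session_rec& r)
    {
        if (!m_by_id.insert(std::make_pair(r.id, r)).second) return false;
        m_by_type.insert(std::make_pair(r.business_type, r.id));
        return true;
    }

    // Erase from both indexes; every index must be touched on each change.
    void remove(unsigned id)
    {
        auto it = m_by_id.find(id);
        if (it == m_by_id.end()) return;
        auto range = m_by_type.equal_range(it->second.business_type);
        for (auto t = range.first; t != range.second; ++t)
        {
            if (t->second == id) { m_by_type.erase(t); break; }
        }
        m_by_id.erase(it);
    }

    // Lookup by the unique key; returns nullptr when absent.
    const session_rec* find_by_id(unsigned id) const
    {
        auto it = m_by_id.find(id);
        return it == m_by_id.end() ? nullptr : &it->second;
    }

    // All ids registered under one business type.
    std::vector<unsigned> ids_by_type(unsigned short type) const
    {
        std::vector<unsigned> ids;
        auto range = m_by_type.equal_range(type);
        for (auto t = range.first; t != range.second; ++t) ids.push_back(t->second);
        return ids;
    }

private:
    std::map<unsigned, session_rec> m_by_id;            // unique index
    std::multimap<unsigned short, unsigned> m_by_type;  // non-unique index
};
```

Every additional key (address, app_id) would need another container and more bookkeeping in add and remove, which is the work multi_index takes over.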
Lookups also need a conditional filter:
//filter_container.h
#pragma once
#include <boost/iterator/filter_iterator.hpp>

namespace firebird{
    // A view over [begin, end) that only exposes elements matching Predicate.
    template <class Predicate, class Iterator>
    class filter_container
    {
    public:
        typedef boost::filter_iterator<Predicate, Iterator> FilterIter;

        filter_container(Predicate p, Iterator begin, Iterator end)
            :m_begin(p, begin, end),
            m_end(p, end, end)
        {

        }
        ~filter_container() {}

        FilterIter begin() { return m_begin; }
        FilterIter end()   { return m_end; }

        // Number of matching elements; walks the whole range, so it is O(n).
        int size() {
            int i = 0;
            FilterIter fi = m_begin;
            while(fi != m_end)
            {
                ++i;
                ++fi;
            }

            return i;
        }

        // Original (misspelled) name, kept so that existing callers still compile.
        int szie() { return size(); }

    private:
        FilterIter m_begin;
        FilterIter m_end;
    };
}
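The way get_session_by_business_type combines this filter with a rotating counter can be sketched with plain standard containers: collect the entries that match the type, then pick the one at position counter modulo the number of matches. The names `entry` and `pick_round_robin` are hypothetical, and returning 0 for "no match" is an assumption that works only because ids in the article start at 1.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the article's session_stu.
struct entry
{
    unsigned id;
    unsigned short business_type;
};

// Return the id of the next entry of the given type, rotating through all
// matches on successive calls; returns 0 when nothing matches.
// `counter` plays the role of m_next_session in the article.
unsigned pick_round_robin(const std::vector<entry>& entries,
                          unsigned short type, unsigned& counter)
{
    std::vector<unsigned> matches;
    for (std::size_t i = 0; i < entries.size(); ++i)
    {
        if (entries[i].business_type == type) matches.push_back(entries[i].id);
    }
    if (matches.empty()) return 0;

    unsigned chosen = matches[counter % matches.size()];
    ++counter;  // only advanced when a session was actually handed out
    return chosen;
}
```

Successive calls for the same type therefore spread requests evenly across all sessions registered under it.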
4. Server-side implementation
I define the server as server_socket_utils. It owns a session_manager; every time accept succeeds and yields a socket_session, the session is added to session_manager for management and the relevant callbacks are registered on it.
read_data_callback: the callback invoked when data is received
Once data has been received (that is, the body of the packet), it is deserialized into command, business_type, app_id and data (I use Thrift for this). If command == normal, meaning an ordinary business packet, handle_read_data is called with the data.
close_callback: the callback triggered when a socket_session is closed
It removes the connection from session_manager by its id.
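The decision read_data_callback makes for each packet can be sketched as a small pure function: a heartbeat is echoed back, a registration records the client's business_type and app_id and is echoed back, a normal packet goes to the business handler, and anything else is rejected. The types `packet` and `session_info` and the `action` enum are hypothetical simplifications introduced for this sketch; the real code works on message and socket_session objects.

```cpp
#include <cassert>
#include <string>

// Mirrors the article's command enum.
enum command { heartbeat = 0, regist, normal };

// Hypothetical, simplified stand-ins for the article's message and session.
struct packet
{
    int command;
    unsigned short business_type;
    unsigned app_id;
    std::string data;
};

struct session_info
{
    unsigned short business_type;
    unsigned app_id;
};

enum action { echo_back, registered_and_echo, handle_business, reject };

// Decide what to do with one decoded packet.
action dispatch(const packet& p, session_info& s)
{
    switch (p.command)
    {
    case heartbeat:
        return echo_back;                   // reply so the peer sees the link is alive
    case regist:
        s.business_type = p.business_type;  // remember who this client is
        s.app_id = p.app_id;
        return registered_and_echo;
    case normal:
        return handle_business;             // pass p.data to application code
    default:
        return reject;                      // unknown command
    }
}
```

Keeping the decision separate from the I/O makes the protocol rules easy to test without opening a socket.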
//server_socket_utils.h
#pragma once
#include "socket_session.h"
#include "session_manager.h"
#include <boost/format.hpp>
#include <firebird/message/message.hpp>

namespace firebird{
    using boost::asio::ip::tcp;

    class FIREBIRD_DECL server_socket_utils
    {
    private:
        boost::asio::io_service m_io_srv;
        boost::asio::io_service::work m_work;
        tcp::acceptor m_acceptor;

        void handle_accept(socket_session_ptr session, const boost::system::error_code& error);

        void close_callback(socket_session_ptr session);
        void read_data_callback(const boost::system::error_code& e,
            socket_session_ptr session, message& msg);

    protected:
        // Implemented by the application to process ordinary business packets.
        virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;

    public:
        server_socket_utils(int port);
        // Virtual because the class is meant to be derived from.
        virtual ~server_socket_utils(void);

        void start();
        boost::asio::io_service& get_io_service() { return m_io_srv; }

        session_manager m_manager;
    };
}
//server_socket_utils.cpp
#include "server_socket_utils.h"

namespace firebird{
    server_socket_utils::server_socket_utils(int port)
        :m_work(m_io_srv),
        m_acceptor(m_io_srv, tcp::endpoint(tcp::v4(), port)),
        m_manager(m_io_srv, SERVER, 3)
    {
    }

    server_socket_utils::~server_socket_utils(void)
    {
    }

    // Start accepting: create a session and wait for the first connection.
    void server_socket_utils::start()
    {
        try{
            socket_session_ptr new_session(new socket_session(m_io_srv));
            m_acceptor.async_accept(new_session->socket(),
                boost::bind(&server_socket_utils::handle_accept, this, new_session,
                boost::asio::placeholders::error));
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[unknown exception]");
        }
    }

    void server_socket_utils::handle_accept(socket_session_ptr session, const boost::system::error_code& error)
    {
        if (!error)
        {
            try{
                // Queue the next accept before handling the session just accepted.
                socket_session_ptr new_session(new socket_session(m_io_srv));
                m_acceptor.async_accept(new_session->socket(),
                    boost::bind(&server_socket_utils::handle_accept, this, new_session,
                    boost::asio::placeholders::error));

                if (session)
                {
                    // Register the close callback.
                    session->installCloseCallBack(boost::bind(&server_socket_utils::close_callback, this, _1));
                    // Register the read-data callback.
                    session->installReadDataCallBack(boost::bind(&server_socket_utils::read_data_callback, this, _1, _2, _3));

                    boost::format fmt("%1%:%2%");
                    fmt % session->socket().remote_endpoint().address().to_string();
                    fmt % session->socket().remote_endpoint().port();
                    // Use a named string rather than passing a temporary.
                    std::string addr = fmt.str();
                    session->set_remote_addr(addr);

                    session->start();
                    m_manager.add_session(session);
                }
            }
            catch(std::exception& e)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[" << e.what() << "]");
            }
            catch(...)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[unknown exception]");
            }

        }
    }

    void server_socket_utils::close_callback(socket_session_ptr session)
    {
        LOG4CXX_DEBUG(firebird_log, "close_callback");
        try{
            m_manager.del_session<sid>(session->id());
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[unknown exception]");
        }

    }

    void server_socket_utils::read_data_callback(const boost::system::error_code& e,
        socket_session_ptr session, message& msg)
    {
        try{
            LOG4CXX_DEBUG(firebird_log, "command =[" << msg.command << "],["
                << msg.business_type << "],[" << msg.data() << "]");

            if (msg.command == heartbeat)
            {
                // Heartbeat: echo it back.
                session->async_write(msg);
            }
            else if (msg.command == regist)
            {
                // Registration: record the client's type and id, then echo.
                session->set_business_type(msg.business_type);
                session->set_app_id(msg.app_id);
                m_manager.update_session(session);

                session->async_write(msg);
                LOG4CXX_FATAL(firebird_log, "remote address:[" << session->get_remote_addr() << "], server type:[" <<
                    session->get_business_type() << "], server ID:[" << session->get_app_id() << "] registered successfully!");
            }
            else if (msg.command == normal)
            {
                // Ordinary business packet: hand it to the derived class.
                handle_read_data(msg, session);
            }
            else
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "received an illegal message packet!");
            }
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket exception:[unknown exception]");
        }
    }
}
5. Client
The client logic is much the same as the server's. The difference is that the client obtains its socket_session through connect, whereas the server obtains it through accept.
//client_socket_utils.h
#pragma once
#include "socket_session.h"
#include "session_manager.h"
#include <string>
#include <vector>
#include <boost/algorithm/string.hpp>
#include <firebird/message/message.hpp>

namespace firebird{
    class FIREBIRD_DECL client_socket_utils
    {
    public:
        client_socket_utils();
        ~client_socket_utils();

        void session_connect(std::vector<socket_session_ptr>& vSession);
        void session_connect(socket_session_ptr pSession);

        boost::asio::io_service& get_io_service() { return m_io_srv; }

    protected:
        // Implemented by the application to process ordinary business packets.
        virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;

    private:
        boost::asio::io_service m_io_srv;
        boost::asio::io_service::work m_work;
        session_manager m_manager;

        void handle_connect(const boost::system::error_code& error,
            tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession);

        void close_callback(socket_session_ptr session);
        void read_data_callback(const boost::system::error_code& e,
            socket_session_ptr session, message& msg);
    };
}
//client_socket_utils.cpp
#include "client_socket_utils.h"

namespace firebird{
    client_socket_utils::client_socket_utils()
        :m_work(m_io_srv), m_manager(m_io_srv, CLIENT, 3)
    {
    }

    client_socket_utils::~client_socket_utils()
    {
    }

    void client_socket_utils::session_connect(std::vector<socket_session_ptr>& vSession)
    {
        for (std::size_t i = 0; i < vSession.size(); ++i)
        {
            session_connect(vSession[i]);
        }
    }

    void client_socket_utils::session_connect(socket_session_ptr pSession)
    {
        std::string& addr = pSession->get_remote_addr();
        try{
            // Register the close callback.
            pSession->installCloseCallBack(boost::bind(&client_socket_utils::close_callback, this, _1));
            // Register the read-data callback.
            pSession->installReadDataCallBack(boost::bind(&client_socket_utils::read_data_callback, this, _1, _2, _3));

            // The remote address is expected in "host:port" form.
            std::vector<std::string> ip_port;
            boost::split(ip_port, addr, boost::is_any_of(":"));

            if (ip_port.size() < 2)
            {
                LOG4CXX_ERROR(firebird_log, "[" << addr << "] bad ip format!");
                return;
            }

            tcp::resolver resolver(pSession->socket().get_io_service());
            tcp::resolver::query query(ip_port[0], ip_port[1]);
            tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);

            // Hand the session to the manager before connecting.
            m_manager.add_session(pSession);

            tcp::endpoint endpoint = *endpoint_iterator;
            pSession->socket().async_connect(endpoint,
                boost::bind(&client_socket_utils::handle_connect, this,
                boost::asio::placeholders::error, ++endpoint_iterator, pSession));
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << addr << "], socket exception:[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << addr << "], socket exception:[unknown exception]");
        }
    }

    void client_socket_utils::handle_connect(const boost::system::error_code& error,
        tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession)
    {
        LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << " enter.");
        std::string sLog;
        try{
            if (!error)
            {
                LOG4CXX_FATAL(firebird_log, "server:[" << pSession->get_business_type() <<"], connected to remote address:[" << pSession->get_remote_addr().c_str() << "] successfully!");
                pSession->start();

                // Register this client with the server.
                message msg;
                msg.command = regist;
                msg.business_type = pSession->get_business_type();
                msg.app_id = pSession->get_app_id();
                msg.data() = "R";

                pSession->async_write(msg);
            }
            else if (endpoint_iterator != tcp::resolver::iterator())
            {
                LOG4CXX_ERROR(firebird_log, "failed to connect to remote address:[" << pSession->get_remote_addr().c_str() << "], trying the next address.");
                pSession->socket().close();
                tcp::endpoint endpoint = *endpoint_iterator;
                pSession->socket().async_connect(endpoint,
                    boost::bind(&client_socket_utils::handle_connect, this,
                    boost::asio::placeholders::error, ++endpoint_iterator, pSession));
            }
            else
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "failed to connect to remote address:[" << pSession->get_remote_addr().c_str() << "]!");
                pSession->socket().close();
            }
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "remote address:[" << pSession->get_remote_addr().c_str() <<"], socket exception:[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_lo
g,?KDS_CODE_INFO?<<?"連接遠(yuǎn)程地址:["?<<?pSession->get_remote_addr().c_str()?<<"],socket異常:[未知異常]");??????????}??????}????????void?client_socket_utils::read_data_callback(const?boost::system::error_code&?e,???????????socket_session_ptr?session,?message&?msg)??????{??????????LOG4CXX_DEBUG(firebird_log,?"command?=["?<<?msg.command?<<?"],["???????????????<<?msg.business_type?<<?"],["?<<?msg.data()?<<?"]");????????????if?(msg.command?==?heartbeat)??????????{??????????}??????????else?if?(msg.command?==?regist)??????????{??????????????LOG4CXX_FATAL(firebird_log,?"服務(wù)器:["?<<?session->get_business_type()?<<"]注冊(cè)成功。");??????????}??????????else?if?(msg.command?==?normal)??????????{??????????????handle_read_data(msg,?session);??????????}??????????else???????????{??????????????LOG4CXX_ERROR(firebird_log,?KDS_CODE_INFO?<<?"收到非法消息包!");??????????}??????}??????????????void?client_socket_utils::close_callback(socket_session_ptr?session)??????{??????????LOG4CXX_DEBUG(firebird_log,?KDS_CODE_INFO?<<?"enter.");????????????try{??????????????????????????????std::string&?addr?=?session->get_remote_addr();????????????????std::vector<std::string>?ip_port;??????????????boost::split(ip_port,?addr,?boost::is_any_of(":"));????????????????if?(ip_port.size()?<?2)??????????????{??????????????????LOG4CXX_ERROR(firebird_log,?"["?<<?addr?<<?"]?ip?格式不正確!");??????????????????return;??????????????}????????????????tcp::resolver?resolver(session->socket().get_io_service());??????????????tcp::resolver::query?query(ip_port[0],?ip_port[1]);??????????????tcp::resolver::iterator?endpoint_iterator?=?resolver.resolve(query);????????????????tcp::endpoint?endpoint?=?*endpoint_iterator;??????????????session->socket().async_connect(endpoint,??????????????????boost::bind(&client_socket_utils::handle_connect,?this,??????????????????boost::asio::placeholders::error,?++endpoint_iterator,?session));??????????}??????????catch(std::exception&?e)??????????{??????????????LOG4CXX_ERROR(firebird_log,?KDS_CODE_INFO?<<?"連接遠(
yuǎn)程地址:["?<<?session->get_remote_addr().c_str()?<<"],socket異常:["?<<?e.what()?<<?"]");??????????}??????????catch(...)??????????{??????????????LOG4CXX_ERROR(firebird_log,?KDS_CODE_INFO?<<?"連接遠(yuǎn)程地址:["?<<?session->get_remote_addr().c_str()?<<"],socket異常:[未知異常]");??????????}??????}??}??
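Both session_connect and close_callback split the remote address on ":" and reject it when fewer than two parts come back. That check can be sketched with the standard library alone. Note that split_host_port is a hypothetical name, not part of the code above, and it behaves a little differently from boost::split: it cuts at the last colon and also rejects an empty host or port.

```cpp
#include <string>

// Hypothetical helper (not from the article): split "host:port" at the
// last ':' and reject inputs whose host or port part is empty.
bool split_host_port(const std::string& addr, std::string& host, std::string& port)
{
    const std::string::size_type pos = addr.rfind(':');
    if (pos == std::string::npos || pos == 0 || pos + 1 == addr.size())
        return false;               // no colon, empty host, or empty port
    host = addr.substr(0, pos);     // everything before the last ':'
    port = addr.substr(pos + 1);    // everything after it
    return true;
}
```

A helper like this would let both functions share one validation path instead of repeating the split and the size check.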
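5. Object serialization
socket_session relies on object serialization when it sends and receives packets. I implemented this with thrift. Boost's serialization library offers the same capability and is more convenient to use, but in my tests thrift performed considerably better, so I stayed with thrift. If you are interested, see my earlier posts "Serializing objects with thrift" and "The lightweight serialization library boost serialization".
5.1 Converting between strings and thrift objects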
#pragma once
#include <stdint.h>
#include <string>
#include <boost/shared_ptr.hpp>
#include <transport/TBufferTransports.h>
#include <protocol/TProtocol.h>
#include <protocol/TBinaryProtocol.h>

namespace firebird{
    using namespace apache::thrift;
    using namespace apache::thrift::transport;
    using namespace apache::thrift::protocol;

    // Deserialize: fill the thrift object stu from the bytes held in s.
    // The memory buffer wraps the string's storage, so s must stay alive during the call.
    template<typename T>
    void thrift_iserialize(T& stu, std::string& s)
    {
        boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer((uint8_t*)&s[0], s.size()));
        boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));
        stu.read(proto.get());
    }

    // Serialize: write the thrift object stu into an in-memory buffer and copy the bytes into s.
    template<typename T>
    void thrift_oserialize(T& stu, std::string& s)
    {
        boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer());
        boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));
        stu.write(proto.get());
        s = trans->getBufferAsString();
    }
}
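The two templates above turn an object into a byte string and back. To try that round trip without thrift installed, here is a minimal standard-library sketch. It is a stand-in under stated assumptions: toy_message, toy_oserialize and toy_iserialize are hypothetical names, and the fixed-width big-endian layout is made up for illustration. It is not thrift's binary protocol.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical stand-in for a generated thrift struct (not the article's TMessage).
struct toy_message {
    std::uint16_t command;
    std::uint16_t business_type;
    std::uint32_t app_id;
    std::string   data;
};

// Append the low `bytes` bytes of v to s in big-endian order.
static void put_uint(std::string& s, std::uint32_t v, int bytes)
{
    for (int shift = (bytes - 1) * 8; shift >= 0; shift -= 8)
        s.push_back(static_cast<char>((v >> shift) & 0xFF));
}

// Read `bytes` big-endian bytes at pos into v and advance pos.
// Returns false if fewer than `bytes` bytes remain.
static bool get_uint(const std::string& s, std::size_t& pos, int bytes, std::uint32_t& v)
{
    if (s.size() - pos < static_cast<std::size_t>(bytes))
        return false;
    v = 0;
    for (int i = 0; i < bytes; ++i)
        v = (v << 8) | static_cast<unsigned char>(s[pos++]);
    return true;
}

// Object -> string: three fixed-width fields, then a length-prefixed payload.
void toy_oserialize(const toy_message& m, std::string& s)
{
    s.clear();
    put_uint(s, m.command, 2);
    put_uint(s, m.business_type, 2);
    put_uint(s, m.app_id, 4);
    put_uint(s, static_cast<std::uint32_t>(m.data.size()), 4);
    s += m.data;
}

// String -> object: returns false on truncated or oversized input.
bool toy_iserialize(toy_message& m, const std::string& s)
{
    std::size_t pos = 0;
    std::uint32_t command = 0, business_type = 0, app_id = 0, len = 0;
    if (!get_uint(s, pos, 2, command)) return false;
    if (!get_uint(s, pos, 2, business_type)) return false;
    if (!get_uint(s, pos, 4, app_id)) return false;
    if (!get_uint(s, pos, 4, len)) return false;
    if (s.size() - pos != len) return false;   // payload must match its declared length
    m.command = static_cast<std::uint16_t>(command);
    m.business_type = static_cast<std::uint16_t>(business_type);
    m.app_id = app_id;
    m.data.assign(s, pos, len);
    return true;
}
```

Unlike thrift_iserialize, the toy reader reports malformed input through its return value. With thrift, errors during read would normally surface as exceptions, so callers of the templates above may want a try/catch around them.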
5.2 Converting between plain objects and strings via thrift objects
#pragma once

#include "message_archive.hpp"
#include <firebird/archive/thrift_archive.hpp>
#include <firebird/message/TMessage_types.h>

namespace firebird
{
    // Copy a message into its thrift counterpart.
    void msg_to_tmsg(TMessage& tmsg, message& msg)
    {
        tmsg.command = msg.command;
        tmsg.business_type = msg.business_type;
        tmsg.app_id = msg.app_id;

        tmsg.context.cmdVersion = msg.context().cmdVersion;
        // swap avoids copying the strings, but it leaves msg holding whatever
        // tmsg held before, so msg no longer carries cpid and remote_ip afterwards.
        tmsg.context.cpid.swap(msg.context().cpid);
        tmsg.context.remote_ip.swap(msg.context().remote_ip);
        tmsg.context.wSerialNumber = msg.context().wSerialNumber;
        tmsg.context.session_id = msg.context().session_id;

        for (int i = 0; i < msg.source().size(); ++i)
        {
            tmsg.source.push_back(msg.source()[i]);
        }

        for (int i = 0; i < msg.destination().size(); ++i)
        {
            tmsg.destination.push_back(msg.destination()[i]);
        }

        tmsg.data = msg.data();
    }

    // Copy a thrift message back into a plain message.
    void tmsg_to_msg(message& msg, TMessage& tmsg)
    {
        msg.command = tmsg.command;
        msg.business_type = tmsg.business_type;
        msg.app_id = tmsg.app_id;

        msg.context().cmdVersion = tmsg.context.cmdVersion;
        msg.context().cpid = tmsg.context.cpid;
        msg.context().remote_ip = tmsg.context.remote_ip;
        msg.context().wSerialNumber = tmsg.context.wSerialNumber;
        msg.context().session_id = tmsg.context.session_id;

        for (int i = 0; i < tmsg.source.size(); ++i)
        {
            msg.source() << tmsg.source[i];
        }

        for (int i = 0; i < tmsg.destination.size(); ++i)
        {
            msg.destination() << tmsg.destination[i];
        }

        msg.data() = tmsg.data;
    }

    // String -> message: deserialize into a TMessage, then convert.
    void message_iarchive(message& msg, std::string& s)
    {
        TMessage tmsg;
        thrift_iserialize(tmsg, s);
        tmsg_to_msg(msg, tmsg);
    }

    // Message -> string: convert to a TMessage, then serialize.
    void message_oarchive(std::string& s, message& msg)
    {
        TMessage tmsg;
        msg_to_tmsg(tmsg, msg);
        thrift_oserialize(tmsg, s);
    }
}
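One detail in msg_to_tmsg is easy to miss: cpid and remote_ip are transferred with swap, while tmsg_to_msg assigns them. The sketch below contrasts the two on a single string field. The structs src_ctx and dst_ctx and both function names are hypothetical and exist only for this illustration.

```cpp
#include <string>

// Hypothetical one-field stand-ins for the message and TMessage contexts.
struct src_ctx { std::string cpid; };
struct dst_ctx { std::string cpid; };

// Transfer by swap: no copy of the characters, but src ends up with
// whatever dst held before (an empty string for a fresh dst).
void transfer_by_swap(dst_ctx& dst, src_ctx& src)
{
    dst.cpid.swap(src.cpid);
}

// Transfer by assignment: copies the characters and leaves src unchanged.
void transfer_by_copy(dst_ctx& dst, const src_ctx& src)
{
    dst.cpid = src.cpid;
}
```

The swap form is only safe when the caller does not read those fields from the source message again after serializing it, which appears to be the assumption message_oarchive makes.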