17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
23 #include <google/protobuf/message.h>
25 #include <boost/asio.hpp>
26 #include <boost/bind.hpp>
27 #include <boost/function.hpp>
28 #include <boost/thread.hpp>
29 #include <boost/tuple/tuple.hpp>
/// \def HEADER_LENGTH
/// \brief Compile-time constant with the value 8.
/// NOTE(review): presumably the fixed size, in characters, of the header
/// that precedes every message (see inboundHeader / ParseHeader) -- confirm
/// against the implementation before relying on it.
44 #define HEADER_LENGTH 8
/// \brief A tbb::task that pairs a string callback with a string payload
/// and invokes the callback with that payload when the task is executed.
59 class GZ_TRANSPORT_VISIBLE ConnectionReadTask :
public tbb::task
/// \brief Constructor.
/// \param[in] _func Callback with signature void(const std::string &).
/// \param[in] _data String payload associated with the callback.
65 public: ConnectionReadTask(
66 boost::function<
void (
const std::string &)> _func,
67 const std::string &_data) :
/// \brief Task entry point (tbb::task interface).
/// \return A tbb::task pointer; the returned value is not visible here.
75 public: tbb::task *execute()
// Hand the stored payload to the stored callback.
77 this->func(this->data);
/// \brief Callback invoked by execute().
82 private: boost::function<void (
const std::string &)> func;
/// \brief Payload passed to func by execute().
85 private: std::string data;
// Base-class specifier of Connection. Deriving from
// boost::enable_shared_from_this<Connection> lets a Connection that is
// already owned by a boost::shared_ptr obtain another shared_ptr to itself
// through shared_from_this().
104 public boost::enable_shared_from_this<Connection>
/// \brief Connect to a remote host.
/// \param[in] _host Host to connect to, as a string.
/// \param[in] _port Port number to connect to.
/// \return A bool status. NOTE(review): presumably true when the
/// connection was established -- confirm against the implementation.
116 public:
bool Connect(
const std::string &_host,
unsigned int _port);
/// \brief Private helper taking no arguments and returning nothing.
/// NOTE(review): presumably closes the underlying socket -- confirm
/// against the implementation.
146 private:
void Close();
/// \brief Read from the connection.
/// \param[out] _data String passed by non-const reference.
/// NOTE(review): presumably filled with the data that was read -- confirm.
/// \return A bool status. NOTE(review): presumably true on a successful
/// read -- confirm against the implementation.
154 public:
bool Read(std::string &_data);
// Trailing parameters of a member-function declaration:
//   _cb    callback with signature void(uint32_t).
//   _id    a uint32_t value. NOTE(review): presumably the value later
//          handed to _cb -- confirm against the implementation.
//   _force optional flag, defaults to false.
164 boost::function<
void(uint32_t)> _cb, uint32_t _id,
165 bool _force =
false);
/// \brief Enqueue a message buffer.
/// \param[in] _buffer Message contents as a string.
/// \param[in] _force Optional flag, defaults to false.
/// NOTE(review): presumably requests that the message be written
/// immediately rather than only queued -- confirm against the
/// implementation.
171 public:
void EnqueueMsg(
const std::string &_buffer,
bool _force =
false);
/// \brief Public member template, parameterised on the type of the
/// caller's handler, that starts an asynchronous read of a message header.
/// \tparam Handler Type of the handler object (_handler).
207 public:
template<
typename Handler>
// Hold socketMutex for the rest of the enclosing scope.
210 boost::mutex::scoped_lock lock(this->socketMutex);
// Error report for the case where the socket is not usable.
213 gzerr <<
"AsyncRead on a closed socket\n";
// Pointer to the OnReadHeader<Handler> member function.
// NOTE(review): presumably bound as the completion handler of the
// async_read below -- confirm.
217 void (
Connection::*f)(
const boost::system::error_code &,
218 boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
// Start an asynchronous read from the socket into inboundHeader. The error
// placeholder and the caller's handler (wrapped in a one-element tuple)
// are forwarded along with it.
221 boost::asio::async_read(*this->socket,
222 boost::asio::buffer(this->inboundHeader),
224 boost::asio::placeholders::error,
225 boost::make_tuple(_handler)));
/// \brief Handles a header that was read into inboundHeader: parses the
/// payload size from it and starts an asynchronous read of the payload.
/// \tparam Handler Type of the caller's handler object.
/// \param[in] _e Error code of the header read.
/// \param[in] _handler One-element tuple holding the caller's handler.
235 private:
template<
typename Handler>
236 void OnReadHeader(
const boost::system::error_code &_e,
237 boost::tuple<Handler> _handler)
// An end-of-file error marks the connection as no longer open.
241 if (_e.value() == boost::asio::error::eof)
242 this->isOpen =
false;
246 std::size_t inboundData_size = 0;
// Copy the raw header bytes into a string, then empty the header buffer.
// NOTE(review): &this->inboundHeader[0] is only valid while the vector is
// non-empty -- confirm the buffer is always sized before this runs.
// NOTE(review): clear() leaves inboundHeader with size 0; confirm it is
// resized before it is next used as a read buffer.
247 std::string header(&this->inboundHeader[0],
248 this->inboundHeader.size());
249 this->inboundHeader.clear();
// ParseHeader yields the size of the payload that follows the header.
251 inboundData_size = this->ParseHeader(header);
253 if (inboundData_size > 0)
// Size the payload buffer to exactly the announced payload length.
256 this->inboundData.resize(inboundData_size);
// Pointer to the OnReadData<Handler> member function.
// NOTE(review): presumably bound as the completion handler of the
// async_read below -- confirm.
258 void (Connection::*f)(
const boost::system::error_code &e,
259 boost::tuple<Handler>) =
260 &Connection::OnReadData<Handler>;
// Start an asynchronous read of the payload into inboundData.
262 boost::asio::async_read(*this->socket,
263 boost::asio::buffer(this->inboundData),
265 boost::asio::placeholders::error,
// Report an empty header and invoke the caller's handler with an empty
// string. NOTE(review): presumably the branch taken when the parsed size
// is zero -- confirm the surrounding control flow.
270 gzerr <<
"Header is empty\n";
271 boost::get<0>(_handler)(
"");
/// \brief Handles a payload that was read into inboundData: copies it out
/// and schedules the caller's handler on a TBB task with that data.
/// \tparam Handler Type of the caller's handler object.
/// \param[in] _e Error code of the payload read.
/// \param[in] _handler One-element tuple holding the caller's handler.
295 private:
template<
typename Handler>
296 void OnReadData(
const boost::system::error_code &_e,
297 boost::tuple<Handler> _handler)
// An end-of-file error marks the connection as no longer open.
301 if (_e.value() == boost::asio::error::eof)
302 this->isOpen =
false;
// Copy the payload bytes into a string, then empty the payload buffer.
// NOTE(review): &this->inboundData[0] is only valid while the vector is
// non-empty -- confirm this cannot run with an empty buffer.
306 std::string data(&this->inboundData[0],
307 this->inboundData.size());
308 this->inboundData.clear();
// Error report for an empty payload.
311 gzerr <<
"OnReadData got empty data!!!\n";
// Allocate a root TBB task that will call the caller's handler with the
// copied payload, and enqueue it.
315 ConnectionReadTask *task =
new(tbb::task::allocate_root())
316 ConnectionReadTask(boost::get<0>(_handler), data);
317 tbb::task::enqueue(*task);
// Inline body: forwards _subscriber to this->shutdown.Connect() and returns
// whatever that call returns.
329 {
return this->
shutdown.Connect(_subscriber); }
/// \brief Private helper taking no arguments and returning nothing.
/// NOTE(review): presumably posts pending writes -- confirm against the
/// implementation.
350 private:
void PostWrite();
/// \brief Private handler that receives an error code.
/// NOTE(review): presumably the completion handler of a socket write --
/// confirm against the implementation.
/// \param[in] _e Error code passed to the handler.
355 private:
void OnWrite(
const boost::system::error_code &_e);
/// \brief Private handler that receives an error code.
/// NOTE(review): presumably the completion handler of an accept on the
/// acceptor member -- confirm against the implementation.
/// \param[in] _e Error code passed to the handler.
359 private:
void OnAccept(
const boost::system::error_code &_e);
/// \brief Parse a message header.
/// \param[in] _header Header contents as a string.
/// \return A size; OnReadHeader uses it as the number of bytes to which
/// inboundData is resized before the payload is read.
363 private: std::size_t ParseHeader(
const std::string &_header);
/// \brief Static helper that returns a TCP endpoint.
/// NOTE(review): presumably the endpoint of the local machine -- confirm
/// against the implementation.
/// \return A boost::asio::ip::tcp::endpoint, by value.
370 private:
static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
/// \brief Const member that returns a TCP endpoint.
/// NOTE(review): presumably the endpoint of the connected peer -- confirm
/// against the implementation.
/// \return A boost::asio::ip::tcp::endpoint, by value.
374 private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint()
const;
/// \brief Static helper that maps a TCP endpoint to a string.
/// NOTE(review): presumably the hostname or address of the endpoint --
/// confirm against the implementation.
/// \param[in] _ep Endpoint to describe, passed by value.
/// \return A std::string.
378 private:
static std::string GetHostname(
379 boost::asio::ip::tcp::endpoint _ep);
/// \brief Private handler that receives an error code and a resolver
/// iterator.
/// NOTE(review): presumably the completion handler of an asynchronous
/// connect -- confirm against the implementation.
/// \param[in] _error Error code passed to the handler.
/// \param[in] _endPointIter Resolver iterator passed to the handler.
384 private:
void OnConnect(
const boost::system::error_code &_error,
385 boost::asio::ip::tcp::resolver::iterator _endPointIter);
/// \brief TCP socket; the asynchronous header and payload reads are issued
/// on it.
388 private: boost::asio::ip::tcp::socket *socket;
/// \brief TCP acceptor.
/// NOTE(review): presumably used to accept incoming connections -- confirm.
391 private: boost::asio::ip::tcp::acceptor *acceptor;
/// \brief Double-ended queue of strings.
/// NOTE(review): presumably the messages waiting to be written -- confirm.
394 private: std::deque<std::string> writeQueue;
/// \brief Double-ended queue whose elements are vectors of
/// (void(uint32_t) callback, uint32_t) pairs.
/// NOTE(review): presumably the callbacks and ids that accompany queued
/// writes -- confirm.
398 private: std::deque< std::vector<
399 std::pair<boost::function<void(uint32_t)>, uint32_t> > >
/// \brief Mutex. NOTE(review): presumably guards connection setup and is
/// paired with connectCondition -- confirm.
403 private: boost::mutex connectMutex;
/// \brief Recursive mutex. NOTE(review): presumably guards the write
/// path -- confirm.
406 private: boost::recursive_mutex writeMutex;
/// \brief Recursive mutex. NOTE(review): presumably guards the read
/// path -- confirm.
409 private: boost::recursive_mutex readMutex;
/// \brief Mutex locked before the asynchronous header read is started on
/// the socket; declared mutable so it can also be locked in const members.
412 private:
mutable boost::mutex socketMutex;
/// \brief Condition variable. NOTE(review): presumably signalled when a
/// connection attempt completes -- confirm.
415 private: boost::condition_variable connectCondition;
/// \brief Buffer that the asynchronous header read fills; OnReadHeader
/// copies it into a string and then clears it.
421 private: std::vector<char> inboundHeader;
/// \brief Buffer that the asynchronous payload read fills; OnReadHeader
/// resizes it to the parsed payload size, and OnReadData copies it into a
/// string and then clears it.
424 private: std::vector<char> inboundData;
/// \brief Boolean flag. NOTE(review): presumably tells a read loop to
/// stop -- confirm.
427 private:
bool readQuit;
/// \brief Unsigned integer id of this connection.
/// NOTE(review): presumably assigned from idCounter -- confirm.
430 private:
unsigned int id;
/// \brief Static unsigned counter shared by all Connection objects.
/// NOTE(review): presumably the source of per-connection ids -- confirm.
433 private:
static unsigned int idCounter;
/// \brief Unsigned counter. NOTE(review): presumably counts writes that
/// are in flight -- confirm.
445 private:
unsigned int writeCount;
/// \brief String. NOTE(review): presumably the URI of the local end of
/// the connection -- confirm.
448 private: std::string localURI;
/// \brief String. NOTE(review): presumably the address of the local end
/// of the connection -- confirm.
451 private: std::string localAddress;
/// \brief String. NOTE(review): presumably the URI of the remote end of
/// the connection -- confirm.
454 private: std::string remoteURI;
/// \brief String. NOTE(review): presumably the address of the remote end
/// of the connection -- confirm.
457 private: std::string remoteAddress;
/// \brief Boolean flag. NOTE(review): presumably set when a connection
/// attempt failed -- confirm.
460 private:
bool connectError;
/// \brief String. NOTE(review): presumably a list of IP addresses allowed
/// to connect; the format is not visible here -- confirm.
463 private: std::string ipWhiteList;
/// \brief Boolean flag. NOTE(review): presumably records that a dropped
/// message has already been logged -- confirm.
466 private:
bool dropMsgLogged;
/// \brief Boolean flag that OnReadHeader and OnReadData set to false when
/// a read reports an end-of-file error.
469 private:
bool isOpen;