Connection.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef _CONNECTION_HH_
18 #define _CONNECTION_HH_
19 
20 #ifndef Q_MOC_RUN
21 #include <tbb/task.h>
22 #endif
23 #include <google/protobuf/message.h>
24 
25 #include <boost/asio.hpp>
26 #include <boost/bind.hpp>
27 #include <boost/function.hpp>
28 #include <boost/thread.hpp>
29 #include <boost/tuple/tuple.hpp>
30 
31 #include <string>
32 #include <vector>
33 #include <iostream>
34 #include <iomanip>
35 #include <deque>
36 #include <utility>
37 
38 #include "gazebo/common/Event.hh"
39 #include "gazebo/common/Console.hh"
42 #include "gazebo/util/system.hh"
43 
/// \def HEADER_LENGTH
/// \brief Number of bytes in a message header. Connection::AsyncRead sizes
/// its inbound header buffer to this many bytes before reading from the
/// socket, and the bytes read are then handed to Connection::ParseHeader.
44 #define HEADER_LENGTH 8
45 
46 namespace gazebo
47 {
48  namespace transport
49  {
/// \brief Is the transport system stopped?
/// \return True if the transport system is stopped.
50  extern GZ_TRANSPORT_VISIBLE bool is_stopped();
51 
/// \brief Forward declaration; IOManager manages boost::asio IO.
52  class IOManager;
/// \brief Forward declaration so ConnectionPtr can be named before the
/// class definition below.
53  class Connection;
/// \brief Shared pointer to a Connection.
54  typedef boost::shared_ptr<Connection> ConnectionPtr;
55 
59  class GZ_TRANSPORT_VISIBLE ConnectionReadTask : public tbb::task
60  {
65  public: ConnectionReadTask(
66  boost::function<void (const std::string &)> _func,
67  const std::string &_data) :
68  func(_func),
69  data(_data)
70  {
71  }
72 
75  public: tbb::task *execute()
76  {
77  this->func(this->data);
78  return NULL;
79  }
80 
82  private: boost::function<void (const std::string &)> func;
83 
85  private: std::string data;
86  };
88 
103  class GZ_TRANSPORT_VISIBLE Connection :
104  public boost::enable_shared_from_this<Connection>
105  {
107  public: Connection();
108 
110  public: virtual ~Connection();
111 
116  public: bool Connect(const std::string &_host, unsigned int _port);
117 
119  typedef boost::function<void(const ConnectionPtr&)> AcceptCallback;
120 
125  public: void Listen(unsigned int _port, const AcceptCallback &_acceptCB);
126 
128  typedef boost::function<void(const std::string &_data)> ReadCallback;
129 
133  public: void StartRead(const ReadCallback &_cb);
134 
136  public: void StopRead();
137 
139  public: void Shutdown();
140 
143  public: bool IsOpen() const;
144 
146  private: void Close();
147 
149  public: void Cancel();
150 
154  public: bool Read(std::string &_data);
155 
163  public: void EnqueueMsg(const std::string &_buffer,
164  boost::function<void(uint32_t)> _cb, uint32_t _id,
165  bool _force = false);
166 
171  public: void EnqueueMsg(const std::string &_buffer, bool _force = false);
172 
175  public: std::string GetLocalURI() const;
176 
179  public: std::string GetRemoteURI() const;
180 
183  public: std::string GetLocalAddress() const;
184 
187  public: unsigned int GetLocalPort() const;
188 
191  public: std::string GetRemoteAddress() const;
192 
195  public: unsigned int GetRemotePort() const;
196 
199  public: std::string GetRemoteHostname() const;
200 
203  public: static std::string GetLocalHostname();
204 
207  public: template<typename Handler>
208  void AsyncRead(Handler _handler)
209  {
210  boost::mutex::scoped_lock lock(this->socketMutex);
211  if (!this->IsOpen())
212  {
213  gzerr << "AsyncRead on a closed socket\n";
214  return;
215  }
216 
217  void (Connection::*f)(const boost::system::error_code &,
218  boost::tuple<Handler>) = &Connection::OnReadHeader<Handler>;
219 
220  this->inboundHeader.resize(HEADER_LENGTH);
221  boost::asio::async_read(*this->socket,
222  boost::asio::buffer(this->inboundHeader),
223  common::weakBind(f, this->shared_from_this(),
224  boost::asio::placeholders::error,
225  boost::make_tuple(_handler)));
226  }
227 
235  private: template<typename Handler>
236  void OnReadHeader(const boost::system::error_code &_e,
237  boost::tuple<Handler> _handler)
238  {
239  if (_e)
240  {
241  if (_e.value() == boost::asio::error::eof)
242  this->isOpen = false;
243  }
244  else
245  {
246  std::size_t inboundData_size = 0;
247  std::string header(&this->inboundHeader[0],
248  this->inboundHeader.size());
249  this->inboundHeader.clear();
250 
251  inboundData_size = this->ParseHeader(header);
252 
253  if (inboundData_size > 0)
254  {
255  // Start the asynchronous call to receive data
256  this->inboundData.resize(inboundData_size);
257 
258  void (Connection::*f)(const boost::system::error_code &e,
259  boost::tuple<Handler>) =
260  &Connection::OnReadData<Handler>;
261 
262  boost::asio::async_read(*this->socket,
263  boost::asio::buffer(this->inboundData),
264  common::weakBind(f, this->shared_from_this(),
265  boost::asio::placeholders::error,
266  _handler));
267  }
268  else
269  {
270  gzerr << "Header is empty\n";
271  boost::get<0>(_handler)("");
272  // This code tries to read the header again. We should
273  // never get here.
274  // this->inboundHeader.resize(HEADER_LENGTH);
275 
276  // void (Connection::*f)(const boost::system::error_code &,
277  // boost::tuple<Handler>) =
278  // &Connection::OnReadHeader<Handler>;
279 
280  // boost::asio::async_read(*this->socket,
281  // boost::asio::buffer(this->inboundHeader),
282  // common::weakBind(f, this->shared_from_this(),
283  // boost::asio::placeholders::error, _handler));
284  }
285  }
286  }
287 
295  private: template<typename Handler>
296  void OnReadData(const boost::system::error_code &_e,
297  boost::tuple<Handler> _handler)
298  {
299  if (_e)
300  {
301  if (_e.value() == boost::asio::error::eof)
302  this->isOpen = false;
303  }
304 
305  // Inform caller that data has been received
306  std::string data(&this->inboundData[0],
307  this->inboundData.size());
308  this->inboundData.clear();
309 
310  if (data.empty())
311  gzerr << "OnReadData got empty data!!!\n";
312 
313  if (!_e && !transport::is_stopped())
314  {
315  ConnectionReadTask *task = new(tbb::task::allocate_root())
316  ConnectionReadTask(boost::get<0>(_handler), data);
317  tbb::task::enqueue(*task);
318 
319  // Non-tbb version:
320  // boost::get<0>(_handler)(data);
321  }
322  }
323 
327  public: event::ConnectionPtr ConnectToShutdown(boost::function<void()>
328  _subscriber)
329  { return this->shutdown.Connect(_subscriber); }
330 
332  public: void ProcessWriteQueue(bool _blocking = false);
333 
336  public: unsigned int GetId() const;
337 
341  public: static bool ValidateIP(const std::string &_ip);
342 
346  public: std::string GetIPWhiteList() const;
347 
350  private: void PostWrite();
351 
355  private: void OnWrite(const boost::system::error_code &_e);
356 
359  private: void OnAccept(const boost::system::error_code &_e);
360 
363  private: std::size_t ParseHeader(const std::string &_header);
364 
366  private: void ReadLoop(const ReadCallback &_cb);
367 
370  private: static boost::asio::ip::tcp::endpoint GetLocalEndpoint();
371 
374  private: boost::asio::ip::tcp::endpoint GetRemoteEndpoint() const;
375 
378  private: static std::string GetHostname(
379  boost::asio::ip::tcp::endpoint _ep);
380 
384  private: void OnConnect(const boost::system::error_code &_error,
385  boost::asio::ip::tcp::resolver::iterator _endPointIter);
386 
388  private: boost::asio::ip::tcp::socket *socket;
389 
391  private: boost::asio::ip::tcp::acceptor *acceptor;
392 
394  private: std::deque<std::string> writeQueue;
395 
398  private: std::deque< std::vector<
399  std::pair<boost::function<void(uint32_t)>, uint32_t> > >
400  callbacks;
401 
403  private: boost::mutex connectMutex;
404 
406  private: boost::recursive_mutex writeMutex;
407 
409  private: boost::recursive_mutex readMutex;
410 
412  private: mutable boost::mutex socketMutex;
413 
415  private: boost::condition_variable connectCondition;
416 
418  private: AcceptCallback acceptCB;
419 
421  private: std::vector<char> inboundHeader;
422 
424  private: std::vector<char> inboundData;
425 
427  private: bool readQuit;
428 
430  private: unsigned int id;
431 
433  private: static unsigned int idCounter;
434 
436  private: ConnectionPtr acceptConn;
437 
439  private: event::EventT<void()> shutdown;
440 
442  private: static IOManager *iomanager;
443 
445  private: unsigned int writeCount;
446 
448  private: std::string localURI;
449 
451  private: std::string localAddress;
452 
454  private: std::string remoteURI;
455 
457  private: std::string remoteAddress;
458 
460  private: bool connectError;
461 
463  private: std::string ipWhiteList;
464 
466  private: bool dropMsgLogged;
467 
469  private: bool isOpen;
470  };
472  }
473 }
474 #endif
unsigned int GetRemotePort() const
Get the remote port number.
void EnqueueMsg(const std::string &_buffer, boost::function< void(uint32_t)> _cb, uint32_t _id, bool _force=false)
Write data to the socket.
void Listen(unsigned int _port, const AcceptCallback &_acceptCB)
Start a server that listens on a port.
Forward declarations for the common classes.
Definition: Animation.hh:27
#define NULL
Definition: CommonTypes.hh:31
boost::function< void(const std::string &_data)> ReadCallback
The signature of a connection read callback.
Definition: Connection.hh:128
void StartRead(const ReadCallback &_cb)
Start a thread that reads from the connection and passes each new message to the ReadCallback.
void AsyncRead(Handler _handler)
Perform an asynchronous read. param[in] _handler: Callback to invoke on received data.
Definition: Connection.hh:208
void StopRead()
Stop the read loop.
std::string GetRemoteAddress() const
Get the remote address.
static bool ValidateIP(const std::string &_ip)
Return true if _ip is valid.
bool Connect(const std::string &_host, unsigned int _port)
Connect to a remote host.
bool is_stopped()
Is the transport system stopped?
unsigned int GetLocalPort() const
Get the port of this connection.
#define HEADER_LENGTH
Definition: Connection.hh:44
#define gzerr
Output an error message.
Definition: Console.hh:50
A class for event processing.
Definition: Event.hh:98
void EnqueueMsg(const std::string &_buffer, bool _force=false)
Write data to the socket.
std::string GetRemoteURI() const
Get the remote URI.
std::string GetLocalAddress() const
Get the local address of this connection.
virtual ~Connection()
Destructor.
event::ConnectionPtr ConnectToShutdown(boost::function< void()> _subscriber)
Register a function to be called when the connection is shut down.
Definition: Connection.hh:327
ConnectionPtr Connect(const std::function< T > &_subscriber)
Connect a callback to this event.
Definition: Event.hh:558
void Cancel()
Cancel all async operations on an open socket.
boost::shared_ptr< Connection > ConnectionPtr
Definition: Connection.hh:53
boost::shared_ptr< Connection > ConnectionPtr
Definition: CommonTypes.hh:134
bool Read(std::string &_data)
Read data from the socket.
GAZEBO_VISIBLE bool shutdown()
Stop and cleanup simulation.
Manages boost::asio IO.
Definition: IOManager.hh:34
void Shutdown()
Shutdown the socket.
Connection()
Constructor.
transport
Definition: ConnectionManager.hh:35
Single TCP/IP connection manager.
Definition: Connection.hh:105
std::string GetLocalURI() const
Get the local URI.
void ProcessWriteQueue(bool _blocking=false)
Handle on-write callbacks.
unsigned int GetId() const
Get the ID of the connection.
std::string GetRemoteHostname() const
Get the remote hostname.
boost::function< void(const ConnectionPtr &)> AcceptCallback
The signature of a connection accept callback.
Definition: Connection.hh:119
static std::string GetLocalHostname()
Get the local hostname.
bool IsOpen() const
Is the connection open?
std::string GetIPWhiteList() const
Get the IP white list, from GAZEBO_IP_WHITE_LIST environment variable.
auto weakBind(Func _func, boost::shared_ptr< T > _ptr, Args... _args) -> decltype(details::makeWeakBinder(boost::bind(_func, _ptr.get(), _args...), boost::weak_ptr< T >(_ptr)))
Definition: WeakBind.hh:110