Node.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
20 
21 #ifndef Q_MOC_RUN
22 #include <tbb/task.h>
23 #endif
24 #include <boost/bind.hpp>
25 #include <boost/enable_shared_from_this.hpp>
26 #include <map>
27 #include <list>
28 #include <string>
29 #include <vector>
30 
33 #include "gazebo/util/system.hh"
34 
35 namespace gazebo
36 {
37  namespace transport
38  {
41  class GZ_TRANSPORT_VISIBLE PublishTask : public tbb::task
42  {
46  public: PublishTask(transport::PublisherPtr _pub,
47  const google::protobuf::Message &_message)
48  : pub(_pub)
49  {
50  this->msg = _message.New();
51  this->msg->CopyFrom(_message);
52  }
53 
56  public: tbb::task *execute()
57  {
58  this->pub->WaitForConnection();
59  this->pub->Publish(*this->msg, true);
60  this->pub->SendMessage();
61  delete this->msg;
62  this->pub.reset();
63  return NULL;
64  }
65 
67  private: transport::PublisherPtr pub;
68 
70  private: google::protobuf::Message *msg;
71  };
73 
76 
80  class GZ_TRANSPORT_VISIBLE Node :
81  public boost::enable_shared_from_this<Node>
82  {
84  public: Node();
85 
87  public: virtual ~Node();
88 
97  public: void Init(const std::string &_space ="");
98 
110  public: bool TryInit(
111  const common::Time &_maxWait = common::Time(1, 0));
112 
117  public: bool IsInitialized() const;
118 
120  public: void Fini();
121 
124  public: std::string GetTopicNamespace() const;
125 
129  public: std::string DecodeTopicName(const std::string &_topic);
130 
134  public: std::string EncodeTopicName(const std::string &_topic);
135 
138  public: unsigned int GetId() const;
139 
142  public: void ProcessPublishers();
143 
145  public: void ProcessIncoming();
146 
150  public: bool HasLatchedSubscriber(const std::string &_topic) const;
151 
152 
159  public: template<typename M>
160  void Publish(const std::string &_topic,
161  const google::protobuf::Message &_message)
162  {
163  transport::PublisherPtr pub = this->Advertise<M>(_topic);
164  PublishTask *task = new(tbb::task::allocate_root())
165  PublishTask(pub, _message);
166 
167  tbb::task::enqueue(*task);
168  return;
169  }
170 
178  public: template<typename M>
179  transport::PublisherPtr Advertise(const std::string &_topic,
180  unsigned int _queueLimit = 1000,
181  double _hzRate = 0)
182  {
183  std::string decodedTopic = this->DecodeTopicName(_topic);
184  PublisherPtr publisher =
185  transport::TopicManager::Instance()->Advertise<M>(
186  decodedTopic, _queueLimit, _hzRate);
187 
188  boost::mutex::scoped_lock lock(this->publisherMutex);
189  publisher->SetNode(shared_from_this());
190  this->publishers.push_back(publisher);
191 
192  return publisher;
193  }
194 
201  public: void Publish(const std::string &_topic,
202  const google::protobuf::Message &_message)
203  {
204  transport::PublisherPtr pub = this->Advertise(_topic,
205  _message.GetTypeName());
206  pub->WaitForConnection();
207 
208  pub->Publish(_message, true);
209  }
210 
218  public: transport::PublisherPtr Advertise(const std::string &_topic,
219  const std::string &_msgTypeName,
220  unsigned int _queueLimit = 1000,
221  double _hzRate = 0)
222  {
223  std::string decodedTopic = this->DecodeTopicName(_topic);
224  PublisherPtr publisher =
226  decodedTopic, _msgTypeName, _queueLimit, _hzRate);
227 
228  boost::mutex::scoped_lock lock(this->publisherMutex);
229  publisher->SetNode(shared_from_this());
230  this->publishers.push_back(publisher);
231 
232  return publisher;
233  }
234 
242  public: template<typename M, typename T>
243  SubscriberPtr Subscribe(const std::string &_topic,
244  void(T::*_fp)(const boost::shared_ptr<M const> &), T *_obj,
245  bool _latching = false)
246  {
247  SubscribeOptions ops;
248  std::string decodedTopic = this->DecodeTopicName(_topic);
249  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
250 
251  {
252  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
253  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
254  new CallbackHelperT<M>(boost::bind(_fp, _obj, _1), _latching)));
255  }
256 
257  SubscriberPtr result =
258  transport::TopicManager::Instance()->Subscribe(ops);
259 
260  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
261 
262  return result;
263  }
264 
271  public: template<typename M>
272  SubscriberPtr Subscribe(const std::string &_topic,
273  void(*_fp)(const boost::shared_ptr<M const> &),
274  bool _latching = false)
275  {
276  SubscribeOptions ops;
277  std::string decodedTopic = this->DecodeTopicName(_topic);
278  ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
279 
280  {
281  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
282  this->callbacks[decodedTopic].push_back(
283  CallbackHelperPtr(new CallbackHelperT<M>(_fp, _latching)));
284  }
285 
286  SubscriberPtr result =
287  transport::TopicManager::Instance()->Subscribe(ops);
288 
289  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
290 
291  return result;
292  }
293 
301  template<typename T>
302  SubscriberPtr Subscribe(const std::string &_topic,
303  void(T::*_fp)(const std::string &), T *_obj,
304  bool _latching = false)
305  {
306  SubscribeOptions ops;
307  std::string decodedTopic = this->DecodeTopicName(_topic);
308  ops.Init(decodedTopic, shared_from_this(), _latching);
309 
310  {
311  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
312  this->callbacks[decodedTopic].push_back(CallbackHelperPtr(
313  new RawCallbackHelper(boost::bind(_fp, _obj, _1))));
314  }
315 
316  SubscriberPtr result =
317  transport::TopicManager::Instance()->Subscribe(ops);
318 
319  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
320 
321  return result;
322  }
323 
324 
331  SubscriberPtr Subscribe(const std::string &_topic,
332  void(*_fp)(const std::string &), bool _latching = false)
333  {
334  SubscribeOptions ops;
335  std::string decodedTopic = this->DecodeTopicName(_topic);
336  ops.Init(decodedTopic, shared_from_this(), _latching);
337 
338  {
339  boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
340  this->callbacks[decodedTopic].push_back(
342  }
343 
344  SubscriberPtr result =
345  transport::TopicManager::Instance()->Subscribe(ops);
346 
347  result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
348 
349  return result;
350  }
351 
356  public: bool HandleData(const std::string &_topic,
357  const std::string &_msg);
358 
363  public: bool HandleMessage(const std::string &_topic, MessagePtr _msg);
364 
371  public: void InsertLatchedMsg(const std::string &_topic,
372  const std::string &_msg);
373 
380  public: void InsertLatchedMsg(const std::string &_topic,
381  MessagePtr _msg);
382 
386  public: std::string GetMsgType(const std::string &_topic) const;
387 
393  public: void RemoveCallback(const std::string &_topic, unsigned int _id);
394 
406  private: bool PrivateInit(const std::string &_space,
407  const common::Time &_maxWait,
408  const bool _fallbackToDefault);
409 
410  private: std::string topicNamespace;
411  private: std::vector<PublisherPtr> publishers;
412  private: std::vector<PublisherPtr>::iterator publishersIter;
413  private: static unsigned int idCounter;
414  private: unsigned int id;
415 
416  private: typedef std::list<CallbackHelperPtr> Callback_L;
417  private: typedef std::map<std::string, Callback_L> Callback_M;
418  private: Callback_M callbacks;
419  private: std::map<std::string, std::list<std::string> > incomingMsgs;
420 
422  private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
423 
424  private: boost::mutex publisherMutex;
425  private: boost::mutex publisherDeleteMutex;
426  private: boost::recursive_mutex incomingMutex;
427 
430  private: boost::recursive_mutex processIncomingMutex;
431 
432  private: bool initialized;
433  };
435  }
436 }
437 #endif
void ProcessIncoming()
Process incoming messages.
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:218
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:272
Node()
Constructor.
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:201
Options for a subscription.
Definition: SubscribeOptions.hh:36
virtual ~Node()
Destructor.
Forward declarations for the common classes.
Definition: Animation.hh:27
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A Time class, can be used to hold wall- or sim-time. stored as sec and nano-sec.
Definition: Time.hh:48
#define NULL
Definition: CommonTypes.hh:31
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
Forward declarations for transport.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:160
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:331
A node can advertise and subscribe topics, publish on advertised topics and listen to subscribed topi...
Definition: Node.hh:82
unsigned int GetId() const
Get the unique ID of the node.
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
Callback helper Template.
Definition: CallbackHelper.hh:112
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
void RemoveCallback(const std::string &_topic, unsigned int _id)
void Init(const std::string &_space="")
Init the node.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
transport
Definition: ConnectionManager.hh:35
void ProcessPublishers()
Process all publishers, which has each publisher send it's most recent message over the wire.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:179
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:302
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
boost shared pointer to transport::CallbackHelper
Definition: CallbackHelper.hh:105
std::string GetTopicNamespace() const
Get the topic namespace for this node.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:243
void Fini()
Finalize the node.
bool IsInitialized() const
Check if this Node has been initialized.
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:178