Go to the documentation of this file.
18 #ifndef GAZEBO_TRANSPORT_NODE_HH_
19 #define GAZEBO_TRANSPORT_NODE_HH_
24 #include <boost/bind.hpp>
25 #include <boost/enable_shared_from_this.hpp>
41 class GZ_TRANSPORT_VISIBLE PublishTask :
public tbb::task
47 const google::protobuf::Message &_message)
50 this->msg = _message.New();
51 this->msg->CopyFrom(_message);
56 public: tbb::task *execute()
58 this->pub->WaitForConnection();
59 this->pub->Publish(*this->msg,
true);
60 this->pub->SendMessage();
70 private: google::protobuf::Message *msg;
80 class GZ_TRANSPORT_VISIBLE
Node :
81 public boost::enable_shared_from_this<Node>
97 public:
void Init(
const std::string &_space =
"");
159 public:
template<
typename M>
161 const google::protobuf::Message &_message)
164 PublishTask *task =
new(tbb::task::allocate_root())
165 PublishTask(pub, _message);
167 tbb::task::enqueue(*task);
178 public:
template<
typename M>
180 unsigned int _queueLimit = 1000,
183 std::string decodedTopic = this->DecodeTopicName(_topic);
186 decodedTopic, _queueLimit, _hzRate);
188 boost::mutex::scoped_lock lock(this->publisherMutex);
189 publisher->SetNode(shared_from_this());
190 this->publishers.push_back(publisher);
201 public:
void Publish(
const std::string &_topic,
202 const google::protobuf::Message &_message)
205 _message.GetTypeName());
206 pub->WaitForConnection();
208 pub->Publish(_message,
true);
219 const std::string &_msgTypeName,
220 unsigned int _queueLimit = 1000,
223 std::string decodedTopic = this->DecodeTopicName(_topic);
226 decodedTopic, _msgTypeName, _queueLimit, _hzRate);
228 boost::mutex::scoped_lock lock(this->publisherMutex);
229 publisher->SetNode(shared_from_this());
230 this->publishers.push_back(publisher);
242 public:
template<
typename M,
typename T>
244 void(T::*_fp)(
const boost::shared_ptr<M const> &), T *_obj,
245 bool _latching =
false)
248 std::string decodedTopic = this->DecodeTopicName(_topic);
249 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
252 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
260 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
271 public:
template<
typename M>
273 void(*_fp)(
const boost::shared_ptr<M const> &),
274 bool _latching =
false)
277 std::string decodedTopic = this->DecodeTopicName(_topic);
278 ops.template Init<M>(decodedTopic, shared_from_this(), _latching);
281 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
282 this->callbacks[decodedTopic].push_back(
289 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
303 void(T::*_fp)(
const std::string &), T *_obj,
304 bool _latching =
false)
307 std::string decodedTopic = this->DecodeTopicName(_topic);
308 ops.
Init(decodedTopic, shared_from_this(), _latching);
311 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
319 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
332 void(*_fp)(
const std::string &),
bool _latching =
false)
335 std::string decodedTopic = this->DecodeTopicName(_topic);
336 ops.
Init(decodedTopic, shared_from_this(), _latching);
339 boost::recursive_mutex::scoped_lock lock(this->incomingMutex);
340 this->callbacks[decodedTopic].push_back(
347 result->SetCallbackId(this->callbacks[decodedTopic].back()->GetId());
357 const std::string &_msg);
372 const std::string &_msg);
386 public: std::string
GetMsgType(
const std::string &_topic)
const;
406 private:
bool PrivateInit(
const std::string &_space,
408 const bool _fallbackToDefault);
410 private: std::string topicNamespace;
411 private: std::vector<PublisherPtr> publishers;
412 private: std::vector<PublisherPtr>::iterator publishersIter;
413 private:
static unsigned int idCounter;
414 private:
unsigned int id;
416 private:
typedef std::list<CallbackHelperPtr> Callback_L;
417 private:
typedef std::map<std::string, Callback_L> Callback_M;
418 private: Callback_M callbacks;
419 private: std::map<std::string, std::list<std::string> > incomingMsgs;
422 private: std::map<std::string, std::list<MessagePtr> > incomingMsgsLocal;
424 private: boost::mutex publisherMutex;
425 private: boost::mutex publisherDeleteMutex;
426 private: boost::recursive_mutex incomingMutex;
430 private: boost::recursive_mutex processIncomingMutex;
432 private:
bool initialized;
void ProcessIncoming()
Process incoming messages.
transport::PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:218
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const boost::shared_ptr< M const > &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:272
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:201
Options for a subscription.
Definition: SubscribeOptions.hh:36
virtual ~Node()
Destructor.
Forward declarations for the common classes.
Definition: Animation.hh:27
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
A Time class that can be used to hold wall- or sim-time, stored as seconds and nanoseconds.
Definition: Time.hh:48
#define NULL
Definition: CommonTypes.hh:31
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
bool HandleMessage(const std::string &_topic, MessagePtr _msg)
Handle incoming msg.
std::string GetMsgType(const std::string &_topic) const
Get the message type for a topic.
Forward declarations for transport.
bool HandleData(const std::string &_topic, const std::string &_msg)
Handle incoming data.
std::string EncodeTopicName(const std::string &_topic)
Encode a topic name.
void InsertLatchedMsg(const std::string &_topic, const std::string &_msg)
Add a latched message to the node for publication.
std::string DecodeTopicName(const std::string &_topic)
Decode a topic name.
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
void Publish(const std::string &_topic, const google::protobuf::Message &_message)
A convenience function for a one-time publication of a message.
Definition: Node.hh:160
SubscriberPtr Subscribe(const std::string &_topic, void(*_fp)(const std::string &), bool _latching=false)
Subscribe to a topic using a bare function as the callback.
Definition: Node.hh:331
A node can advertise and subscribe to topics, publish on advertised topics, and listen to subscribed topics.
Definition: Node.hh:82
unsigned int GetId() const
Get the unique ID of the node.
bool TryInit(const common::Time &_maxWait=common::Time(1, 0))
Try to initialize the node to use the global namespace, and specify the maximum wait time.
Callback helper Template.
Definition: CallbackHelper.hh:112
void InsertLatchedMsg(const std::string &_topic, MessagePtr _msg)
Add a latched message to the node for publication.
static TopicManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
void RemoveCallback(const std::string &_topic, unsigned int _id)
void Init(const std::string &_space="")
Init the node.
void Init(const std::string &_topic, NodePtr _node, bool _latching)
Initialize the options.
Definition: SubscribeOptions.hh:48
transport
Definition: ConnectionManager.hh:35
void ProcessPublishers()
Process all publishers, which has each publisher send its most recent message over the wire.
transport::PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit=1000, double _hzRate=0)
Advertise a topic.
Definition: Node.hh:179
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const std::string &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:302
boost::shared_ptr< CallbackHelper > CallbackHelperPtr
Boost shared pointer to transport::CallbackHelper.
Definition: CallbackHelper.hh:105
std::string GetTopicNamespace() const
Get the topic namespace for this node.
bool HasLatchedSubscriber(const std::string &_topic) const
Return true if a subscriber on a specific topic is latched.
SubscriberPtr Subscribe(const std::string &_topic, void(T::*_fp)(const boost::shared_ptr< M const > &), T *_obj, bool _latching=false)
Subscribe to a topic using a class method as the callback.
Definition: Node.hh:243
void Fini()
Finalize the node.
bool IsInitialized() const
Check if this Node has been initialized.
Used to connect publishers to subscribers, where the subscriber wants the raw data from the publisher...
Definition: CallbackHelper.hh:178