Fawkes API  Fawkes Development Version
tf_thread.cpp
1 
2 /***************************************************************************
3  * tf_thread.cpp - Thread to exchange transforms
4  *
5  * Created: Wed Oct 26 01:02:59 2011
6  * Copyright 2011 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "tf_thread.h"
23 
24 #include <core/threading/mutex_locker.h>
25 #include <ros/this_node.h>
26 
27 using namespace fawkes;
28 
29 /** @class RosTfThread "tf_thread.h"
30  * Thread to exchange transforms between Fawkes and ROS.
31  * This threads connects to Fawkes and ROS to read and write transforms.
32  * Transforms received on one end are republished to the other side. To
33  * Fawkes new frames are published during the sensor hook.
34  * @author Tim Niemueller
35  */
36 
37 /** Constructor. */
39 : Thread("RosTfThread", Thread::OPMODE_WAITFORWAKEUP),
40  BlockedTimingAspect(BlockedTimingAspect::WAKEUP_HOOK_SENSOR_ACQUIRE),
41  TransformAspect(TransformAspect::DEFER_PUBLISHER),
42  BlackBoardInterfaceListener("RosTfThread")
43 {
44  tf_msg_queue_mutex_ = new Mutex();
45  seq_num_mutex_ = new Mutex();
46  last_update_ = new Time();
47 }
48 
49 /** Destructor. */
51 {
52  delete tf_msg_queue_mutex_;
53  delete seq_num_mutex_;
54  delete last_update_;
55 }
56 
57 void
59 {
60  active_queue_ = 0;
61  seq_num_ = 0;
62  last_update_->set_clock(clock);
63  last_update_->set_time(0, 0);
64 
65  cfg_use_tf2_ = config->get_bool("/ros/tf/use_tf2");
66  cfg_update_interval_ = 1.0;
67  try {
68  cfg_update_interval_ = config->get_float("/ros/tf/static-update-interval");
69  } catch (Exception &e) {
70  } // ignored, use default
71 
72  // Must do that before registering listener because we might already
73  // get events right away
74  if (cfg_use_tf2_) {
75 #ifndef HAVE_TF2_MSGS
76  throw Exception("tf2 enabled in config but not available at compile time");
77 #else
78  sub_tf_ = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb_dynamic, this);
79  sub_static_tf_ = rosnode->subscribe("tf_static", 100, &RosTfThread::tf_message_cb_static, this);
80  pub_tf_ = rosnode->advertise<tf2_msgs::TFMessage>("tf", 100);
81  pub_static_tf_ = rosnode->advertise<tf2_msgs::TFMessage>("tf_static", 100, /* latch */ true);
82 #endif
83  } else {
84  sub_tf_ = rosnode->subscribe("tf", 100, &RosTfThread::tf_message_cb, this);
85  pub_tf_ = rosnode->advertise<::tf::tfMessage>("tf", 100);
86  }
87 
89  std::list<TransformInterface *>::iterator i;
90  for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
94  }
96 
97  publish_static_transforms_to_ros();
98 
99  bbio_add_observed_create("TransformInterface", "/tf*");
101 }
102 
103 void
105 {
108 
109  sub_tf_.shutdown();
110  pub_tf_.shutdown();
111 
112  std::list<TransformInterface *>::iterator i;
113  for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
114  blackboard->close(*i);
115  }
116  tfifs_.clear();
117 }
118 
119 void
121 {
122  tf_msg_queue_mutex_->lock();
123  unsigned int queue = active_queue_;
124  active_queue_ = 1 - active_queue_;
125  tf_msg_queue_mutex_->unlock();
126 
127  if (cfg_use_tf2_) {
128 #ifdef HAVE_TF2_MSGS
129  while (!tf2_msg_queues_[queue].empty()) {
130  const std::pair<bool, tf2_msgs::TFMessage::ConstPtr> &q = tf2_msg_queues_[queue].front();
131  const tf2_msgs::TFMessage::ConstPtr & msg = q.second;
132  const size_t tsize = msg->transforms.size();
133  for (size_t i = 0; i < tsize; ++i) {
134  publish_transform_to_fawkes(msg->transforms[i], q.first);
135  }
136  tf2_msg_queues_[queue].pop();
137  }
138 #endif
139  } else {
140  while (!tf_msg_queues_[queue].empty()) {
141  const ::tf::tfMessage::ConstPtr &msg = tf_msg_queues_[queue].front();
142  const size_t tsize = msg->transforms.size();
143  for (size_t i = 0; i < tsize; ++i) {
144  geometry_msgs::TransformStamped ts = msg->transforms[i];
145  if (!ts.header.frame_id.empty() && ts.header.frame_id[0] == '/') {
146  ts.header.frame_id = ts.header.frame_id.substr(1);
147  }
148  if (!ts.child_frame_id.empty() && ts.child_frame_id[0] == '/') {
149  ts.child_frame_id = ts.child_frame_id.substr(1);
150  }
151  publish_transform_to_fawkes(ts);
152  }
153  tf_msg_queues_[queue].pop();
154  }
155 
156  fawkes::Time now(clock);
157  if ((now - last_update_) > cfg_update_interval_) {
158  last_update_->stamp();
159 
160  publish_static_transforms_to_ros();
161  }
162  }
163 }
164 
165 void
167 {
168  TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
169  if (!tfif)
170  return;
171 
172  tfif->read();
173 
174  if (cfg_use_tf2_ && tfif->is_static_transform()) {
175  publish_static_transforms_to_ros();
176  } else {
177  if (cfg_use_tf2_) {
178 #ifdef HAVE_TF2_MSGS
179  tf2_msgs::TFMessage tmsg;
180  tmsg.transforms.push_back(create_transform_stamped(tfif));
181  pub_tf_.publish(tmsg);
182 #endif
183  } else {
184  ::tf::tfMessage tmsg;
185  if (tfif->is_static_transform()) {
186  // date time stamps slightly into the future so they are valid
187  // for longer and need less frequent updates.
188  fawkes::Time timestamp = fawkes::Time(clock) + (cfg_update_interval_ * 1.1);
189 
190  tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
191  } else {
192  tmsg.transforms.push_back(create_transform_stamped(tfif));
193  }
194  pub_tf_.publish(tmsg);
195  }
196  }
197 }
198 
199 void
200 RosTfThread::bb_interface_created(const char *type, const char *id) noexcept
201 {
202  if (strncmp(type, "TransformInterface", INTERFACE_TYPE_SIZE_) != 0)
203  return;
204 
205  for (const auto &f : ros_frames_) {
206  // ignore interfaces that we publish ourself
207  if (f == id)
208  return;
209  }
210 
211  TransformInterface *tfif;
212  try {
213  //logger->log_info(name(), "Opening %s:%s", type, id);
214  tfif = blackboard->open_for_reading<TransformInterface>(id);
215  } catch (Exception &e) {
216  // ignored
217  logger->log_warn(name(), "Failed to open %s:%s: %s", type, id, e.what());
218  return;
219  }
220 
221  try {
222  bbil_add_data_interface(tfif);
223  bbil_add_reader_interface(tfif);
224  bbil_add_writer_interface(tfif);
225  blackboard->update_listener(this);
226  tfifs_.push_back(tfif);
227  } catch (Exception &e) {
228  blackboard->close(tfif);
229  logger->log_warn(name(), "Failed to register for %s:%s: %s", type, id, e.what());
230  return;
231  }
232 }
233 
234 void
236  fawkes::Uuid instance_serial) noexcept
237 {
238  conditional_close(interface);
239 }
240 
241 void
243  fawkes::Uuid instance_serial) noexcept
244 {
245  conditional_close(interface);
246 }
247 
248 void
249 RosTfThread::conditional_close(Interface *interface) noexcept
250 {
251  // Verify it's a TransformInterface
252  TransformInterface *tfif = dynamic_cast<TransformInterface *>(interface);
253  if (!tfif)
254  return;
255 
256  std::list<TransformInterface *>::iterator i;
257  for (i = tfifs_.begin(); i != tfifs_.end(); ++i) {
258  if (*interface == **i) {
259  if (!interface->has_writer() && (interface->num_readers() == 1)) {
260  // It's only us
261  logger->log_info(name(), "Last on %s, closing", interface->uid());
262  bbil_remove_data_interface(*i);
263  bbil_remove_reader_interface(*i);
264  bbil_remove_writer_interface(*i);
265  blackboard->update_listener(this);
266  blackboard->close(*i);
267  tfifs_.erase(i);
268  break;
269  }
270  }
271  }
272 }
273 
274 geometry_msgs::TransformStamped
275 RosTfThread::create_transform_stamped(TransformInterface *tfif, const Time *time)
276 {
277  double *translation = tfif->translation();
278  double *rotation = tfif->rotation();
279  if (!time)
280  time = tfif->timestamp();
281 
282  geometry_msgs::Vector3 t;
283  t.x = translation[0];
284  t.y = translation[1];
285  t.z = translation[2];
286  geometry_msgs::Quaternion r;
287  r.x = rotation[0];
288  r.y = rotation[1];
289  r.z = rotation[2];
290  r.w = rotation[3];
291  geometry_msgs::Transform tr;
292  tr.translation = t;
293  tr.rotation = r;
294 
295  geometry_msgs::TransformStamped ts;
296  seq_num_mutex_->lock();
297  ts.header.seq = ++seq_num_;
298  seq_num_mutex_->unlock();
299  ts.header.stamp = ros::Time(time->get_sec(), time->get_nsec());
300  ts.header.frame_id = tfif->frame();
301  ts.child_frame_id = tfif->child_frame();
302  ts.transform = tr;
303 
304  return ts;
305 }
306 
307 void
308 RosTfThread::publish_static_transforms_to_ros()
309 {
310  std::list<fawkes::TransformInterface *>::iterator t;
311  fawkes::Time now(clock);
312 
313  if (cfg_use_tf2_) {
314 #ifdef HAVE_TF2_MSGS
315  tf2_msgs::TFMessage tmsg;
316  for (t = tfifs_.begin(); t != tfifs_.end(); ++t) {
317  fawkes::TransformInterface *tfif = *t;
318  tfif->read();
319  if (tfif->is_static_transform()) {
320  tmsg.transforms.push_back(create_transform_stamped(tfif, &now));
321  }
322  }
323  pub_static_tf_.publish(tmsg);
324 #endif
325  } else {
326  // date time stamps slightly into the future so they are valid
327  // for longer and need less frequent updates.
328  fawkes::Time timestamp = now + (cfg_update_interval_ * 1.1);
329 
330  ::tf::tfMessage tmsg;
331  for (t = tfifs_.begin(); t != tfifs_.end(); ++t) {
332  fawkes::TransformInterface *tfif = *t;
333  tfif->read();
334  if (tfif->is_static_transform()) {
335  tmsg.transforms.push_back(create_transform_stamped(tfif, &timestamp));
336  }
337  }
338  pub_tf_.publish(tmsg);
339  }
340 }
341 
342 void
343 RosTfThread::publish_transform_to_fawkes(const geometry_msgs::TransformStamped &ts, bool static_tf)
344 {
345  const geometry_msgs::Vector3 & t = ts.transform.translation;
346  const geometry_msgs::Quaternion &r = ts.transform.rotation;
347 
348  fawkes::Time time(ts.header.stamp.sec, ts.header.stamp.nsec / 1000);
349 
350  fawkes::tf::Transform tr(fawkes::tf::Quaternion(r.x, r.y, r.z, r.w),
351  fawkes::tf::Vector3(t.x, t.y, t.z));
352  fawkes::tf::StampedTransform st(tr, time, ts.header.frame_id, ts.child_frame_id);
353 
354  if (tf_publishers.find(ts.child_frame_id) == tf_publishers.end()) {
355  try {
356  ros_frames_.push_back(std::string("/tf/") + ts.child_frame_id);
357  tf_add_publisher("%s", ts.child_frame_id.c_str());
358  tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
359  } catch (Exception &e) {
360  ros_frames_.pop_back();
361  logger->log_warn(name(),
362  "Failed to create Fawkes transform publisher for frame %s from ROS",
363  ts.child_frame_id.c_str());
364  logger->log_warn(name(), e);
365  }
366  } else {
367  tf_publishers[ts.child_frame_id]->send_transform(st, static_tf);
368  }
369 }
370 
371 /** Callback function for ROS tf message subscription.
372  * @param msg incoming message
373  */
374 void
375 RosTfThread::tf_message_cb(const ros::MessageEvent<::tf::tfMessage const> &msg_evt)
376 {
377  MutexLocker lock(tf_msg_queue_mutex_);
378 
379  const ::tf::tfMessage::ConstPtr &msg = msg_evt.getConstMessage();
380  std::string authority = msg_evt.getPublisherName();
381 
382  if (authority == "") {
383  logger->log_warn(name(), "Message received without callerid");
384  } else if (authority != ros::this_node::getName()) {
385  tf_msg_queues_[active_queue_].push(msg);
386  }
387 }
388 
389 #ifdef HAVE_TF2_MSGS
390 void
391 RosTfThread::tf_message_cb_static(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
392 {
393  tf_message_cb(msg_evt, true);
394 }
395 
396 void
397 RosTfThread::tf_message_cb_dynamic(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt)
398 {
399  tf_message_cb(msg_evt, false);
400 }
401 
402 void
403 RosTfThread::tf_message_cb(const ros::MessageEvent<tf2_msgs::TFMessage const> &msg_evt,
404  bool static_tf)
405 {
406  MutexLocker lock(tf_msg_queue_mutex_);
407 
408  const tf2_msgs::TFMessage::ConstPtr &msg = msg_evt.getConstMessage();
409  std::string authority = msg_evt.getPublisherName();
410 
411  if (authority == "") {
412  logger->log_warn(name(), "Message received without callerid");
413  } else if (authority != ros::this_node::getName()) {
414  tf2_msg_queues_[active_queue_].push(std::make_pair(static_tf, msg));
415  }
416 }
417 #endif
virtual void bb_interface_reader_removed(fawkes::Interface *interface, fawkes::Uuid instance_serial) noexcept
A reading instance has been closed for a watched interface.
Definition: tf_thread.cpp:242
RosTfThread()
Constructor.
Definition: tf_thread.cpp:38
virtual void loop()
Code to execute in the thread.
Definition: tf_thread.cpp:120
virtual void init()
Initialize the thread.
Definition: tf_thread.cpp:58
virtual void bb_interface_writer_removed(fawkes::Interface *interface, fawkes::Uuid instance_serial) noexcept
A writing instance has been closed for a watched interface.
Definition: tf_thread.cpp:235
virtual void finalize()
Finalize the thread.
Definition: tf_thread.cpp:104
virtual void bb_interface_created(const char *type, const char *id) noexcept
BlackBoard interface created notification.
Definition: tf_thread.cpp:200
virtual ~RosTfThread()
Destructor.
Definition: tf_thread.cpp:50
virtual void bb_interface_data_refreshed(fawkes::Interface *interface) noexcept
BlackBoard data refreshed notification.
Definition: tf_thread.cpp:166
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
BlackBoard interface listener.
void bbil_add_reader_interface(Interface *interface)
Add an interface to the reader addition/removal watch list.
void bbil_add_writer_interface(Interface *interface)
Add an interface to the writer addition/removal watch list.
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
void bbio_add_observed_create(const char *type_pattern, const char *id_pattern="*") noexcept
Add interface creation type to watch list.
virtual void unregister_observer(BlackBoardInterfaceObserver *observer)
Unregister BB interface observer.
Definition: blackboard.cpp:240
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void update_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Update BB event listener.
Definition: blackboard.cpp:197
virtual void register_observer(BlackBoardInterfaceObserver *observer)
Register BB interface observer.
Definition: blackboard.cpp:225
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
virtual std::list< Interface * > open_multiple_for_reading(const char *type_pattern, const char *id_pattern="*", const char *owner=NULL)=0
Open multiple interfaces for reading.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Thread aspect to use blocked timing.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual bool get_bool(const char *path)=0
Get value from configuration which is of type bool.
virtual float get_float(const char *path)=0
Get value from configuration which is of type float.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual const char * what() const noexcept
Get primary string.
Definition: exception.cpp:639
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
const Time * timestamp() const
Get timestamp of last write.
Definition: interface.cpp:714
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:479
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
virtual void log_info(const char *component, const char *format,...)
Log informational message.
Definition: multi.cpp:195
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
Mutex locking helper.
Definition: mutex_locker.h:34
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
LockPtr< ros::NodeHandle > rosnode
Central ROS node handle.
Definition: ros.h:47
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
A class for handling time.
Definition: time.h:93
void set_clock(Clock *clock)
Set clock for this instance.
Definition: time.cpp:308
Time & stamp()
Set this time to the current time.
Definition: time.cpp:704
long get_nsec() const
Get nanoseconds.
Definition: time.h:132
void set_time(const timeval *tv)
Sets the time.
Definition: time.cpp:246
long get_sec() const
Get seconds.
Definition: time.h:117
Thread aspect to access the transform system.
Definition: tf.h:39
std::map< std::string, tf::TransformPublisher * > tf_publishers
Map of transform publishers created through the aspect.
Definition: tf.h:70
void tf_add_publisher(const char *frame_id_format,...)
Late add of publisher.
Definition: tf.cpp:186
TransformInterface Fawkes BlackBoard Interface.
char * frame() const
Get frame value.
char * child_frame() const
Get child_frame value.
bool is_static_transform() const
Get static_transform value.
double * rotation() const
Get rotation value.
double * translation() const
Get translation value.
A convenience class for universally unique identifiers (UUIDs).
Definition: uuid.h:29
Transform that contains a timestamp and frame IDs.
Definition: types.h:92
Fawkes library namespace.