Fawkes API  Fawkes Development Version
pcl_db_retrieve_thread.cpp
1 
2 /***************************************************************************
3  * pcl_db_retrieve_thread.cpp - Restore and retrieve point clouds from MongoDB
4  *
5  * Created: Thu Aug 22 12:04:09 2013
6  * Copyright 2012 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "pcl_db_retrieve_thread.h"
23 
24 #include <blackboard/utils/on_message_waker.h>
25 #include <interfaces/PclDatabaseRetrieveInterface.h>
26 
27 using namespace fawkes;
28 
29 /** @class PointCloudDBRetrieveThread "pcl_db_retrieve_thread.h"
30  * Thread to retrieve point clouds from database on request.
31  * @author Tim Niemueller
32  */
33 
34 /** Constructor. */
36 : Thread("PointCloudDBRetrieveThread", Thread::OPMODE_WAITFORWAKEUP), MongoDBAspect("default")
37 {
38 }
39 
40 /** Destructor. */
42 {
43 }
44 
45 void
47 {
48  pl_xyz_ = NULL;
49  pl_xyzrgb_ = NULL;
50  retrieve_if_ = NULL;
51  msg_waker_ = NULL;
52 
53  cfg_database_ = config->get_string(CFG_PREFIX "database-name");
54  cfg_output_id_ = config->get_string(CFG_PREFIX_RETRV "output-pcl-id");
55  cfg_original_id_ = config->get_string(CFG_PREFIX_RETRV "original-pcl-id");
56 
57  foutput_ = new pcl::PointCloud<pcl::PointXYZRGB>();
58  foutput_->is_dense = false;
59  pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_output_id_.c_str(), foutput_);
60  output_ = pcl_utils::cloudptr_from_refptr(foutput_);
61 
62  foriginal_ = new pcl::PointCloud<pcl::PointXYZRGB>();
63  foriginal_->is_dense = false;
64  pcl_manager->add_pointcloud<pcl::PointXYZRGB>(cfg_original_id_.c_str(), foriginal_);
65  original_ = pcl_utils::cloudptr_from_refptr(foriginal_);
66 
68  mongodb_client, config, logger, tf_listener, original_, output_);
69 
71  mongodb_client, config, logger, tf_listener, original_, output_);
72 
73  try {
74  retrieve_if_ =
75  blackboard->open_for_writing<PclDatabaseRetrieveInterface>("PCL Database Retrieve");
76 
77  msg_waker_ = new BlackBoardOnMessageWaker(blackboard, retrieve_if_, this);
78  } catch (Exception &e) {
79  finalize();
80  throw;
81  }
82 }
83 
84 void
86 {
87  delete msg_waker_;
88  blackboard->close(retrieve_if_);
89 
90  delete pl_xyz_;
91  delete pl_xyzrgb_;
92 
93  output_.reset();
94  pcl_manager->remove_pointcloud(cfg_output_id_.c_str());
95  foutput_.reset();
96 
97  original_.reset();
98  pcl_manager->remove_pointcloud(cfg_original_id_.c_str());
99  foriginal_.reset();
100 }
101 
102 void
104 {
105  long timestamp = 0;
106  std::vector<long long> times(1);
107  std::string database;
108  std::string collection;
109  std::string target_frame;
110  bool original_timestamp;
111 
112  if (retrieve_if_->msgq_empty())
113  return;
114 
115  if (PclDatabaseRetrieveInterface::RetrieveMessage *msg = retrieve_if_->msgq_first_safe(msg)) {
116  retrieve_if_->set_final(false);
117  retrieve_if_->set_msgid(msg->id());
118  retrieve_if_->set_error("");
119  retrieve_if_->write();
120 
121  timestamp = msg->timestamp();
122  times[0] = timestamp;
123  database = (strcmp(msg->database(), "") != 0) ? msg->database() : cfg_database_;
124  collection = msg->collection();
125  target_frame = msg->target_frame();
126  original_timestamp = msg->is_original_timestamp();
127  } else {
128  logger->log_warn(name(), "Unhandled message received");
129  retrieve_if_->msgq_pop();
130  return;
131  }
132  retrieve_if_->msgq_pop();
133 
134  logger->log_info(name(), "Restoring from '%s' for the time %li", collection.c_str(), timestamp);
135 
136  ApplicabilityStatus st_xyz, st_xyzrgb;
137  long actual_time = 0;
138 
139  pl_xyz_->applicable(times, database, collection);
140  if ((st_xyz = pl_xyz_->applicable(times, database, collection)) == APPLICABLE) {
141  logger->log_info(name(), "Restoring XYZ");
142  pl_xyz_->retrieve(timestamp, database, collection, target_frame, actual_time);
143  } else if ((st_xyzrgb = pl_xyzrgb_->applicable(times, database, collection)) == APPLICABLE) {
144  logger->log_info(name(), "Restoring XYZRGB");
145  pl_xyzrgb_->retrieve(timestamp, database, collection, target_frame, actual_time);
146  if (!original_timestamp) {
147  Time now(clock);
148  pcl_utils::set_time(foutput_, now);
149  }
150  } else {
151  logger->log_warn(name(), "No applicable merging pipeline known:");
152  logger->log_warn(name(), " XYZ: %s", to_string(st_xyz));
153  logger->log_warn(name(), " XYZ/RGB: %s", to_string(st_xyzrgb));
154  retrieve_if_->set_error("No applicable merging pipeline known (for details see log)");
155  }
156 
157  if (actual_time != 0) {
158  if (original_timestamp) {
159  Time now((long)actual_time);
160  Time last;
161  pcl_utils::get_time(foutput_, last);
162  // force sending with one microsecond offset if same than last time
163  if (last == now)
164  now += 1L;
165  pcl_utils::set_time(foutput_, now);
166  } else {
167  Time now(clock);
168  pcl_utils::set_time(foutput_, now);
169  }
170  }
171 
172  retrieve_if_->set_final(true);
173  retrieve_if_->write();
174 }
ApplicabilityStatus applicable(std::vector< long long > &times, std::string &database, std::string &collection)
Check if this pipeline instance is suitable for the given times.
void retrieve(long timestamp, std::string &database, std::string &collection, std::string &target_frame, long &actual_time)
Retrieve point clouds.
virtual ~PointCloudDBRetrieveThread()
Destructor.
virtual void loop()
Code to execute in the thread.
virtual void finalize()
Finalize the thread.
virtual void init()
Initialize the thread.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
Wake threads on receiving a blackboard message.
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
Thread aspect to access MongoDB.
Definition: mongodb.h:39
mongocxx::client * mongodb_client
MongoDB client to use to interact with the database.
Definition: mongodb.h:54
PointCloudManager * pcl_manager
Manager to distribute and access point clouds.
Definition: pointcloud.h:47
void remove_pointcloud(const char *id)
Remove the point cloud.
void add_pointcloud(const char *id, RefPtr< pcl::PointCloud< PointT >> cloud)
Add point cloud.
void reset()
Reset pointer.
Definition: refptr.h:455
Thread class encapsulation of pthreads.
Definition: thread.h:46
const char * name() const
Get name of thread.
Definition: thread.h:100
A class for handling time.
Definition: time.h:93
tf::Transformer * tf_listener
This is the transform listener which saves transforms published by other threads in the system.
Definition: tf.h:67
Fawkes library namespace.