Fawkes API  Fawkes Development Version
sync_thread.cpp
1 
2 /***************************************************************************
3  * sync_thread.cpp - Fawkes BlackBoard Synchronization Thread
4  *
5  * Created: Thu Jun 04 18:13:06 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18  * GNU Library General Public License for more details.
19  *
20  * Read the full text in the LICENSE.GPL file in the doc directory.
21  */
22 
23 #include "sync_thread.h"
24 
25 #include <blackboard/remote.h>
26 #include <core/threading/mutex_locker.h>
27 #include <utils/time/wait.h>
28 
29 #include <cstring>
30 
31 using namespace std;
32 using namespace fawkes;
33 
34 /** @class BlackBoardSynchronizationThread "sync_thread.h"
35  * Thread to synchronize two BlackBoards.
36  * @author Tim Niemueller
37  */
38 
39 /** Constructor.
40  * @param bbsync_cfg_prefix Configuration prefix for the whole bbsync plugin
41  * @param peer_cfg_prefix The configuration prefix for the peer this sync thread
42  * has been created for.
43  * @param peer name of the peer configuration for this thread
44  */
46  std::string &peer_cfg_prefix,
47  std::string &peer)
48 : Thread("", Thread::OPMODE_CONTINUOUS)
49 {
50  set_name("BBSyncThread[%s]", peer.c_str());
52 
53  bbsync_cfg_prefix_ = bbsync_cfg_prefix;
54  peer_cfg_prefix_ = peer_cfg_prefix;
55  peer_ = peer;
56 
57  remote_bb_ = NULL;
58 }
59 
60 /** Destructor. */
62 {
63 }
64 
65 void
67 {
68  logger->log_debug(name(), "Initializing");
69  unsigned int check_interval = 0;
70  try {
71  host_ = config->get_string((peer_cfg_prefix_ + "host").c_str());
72  port_ = config->get_uint((peer_cfg_prefix_ + "port").c_str());
73 
74  check_interval = config->get_uint((bbsync_cfg_prefix_ + "check_interval").c_str());
75  } catch (Exception &e) {
76  e.append("Host or port not specified for peer");
77  throw;
78  }
79 
80  try {
81  check_interval = config->get_uint((peer_cfg_prefix_ + "check_interval").c_str());
82  logger->log_debug(name(), "Peer check interval set, overriding default.");
83  } catch (Exception &e) {
84  logger->log_debug(name(), "No per-peer check interval set, using default");
85  }
86 
87  read_config_combos(peer_cfg_prefix_ + "reading/", /* writing */ false);
88  read_config_combos(peer_cfg_prefix_ + "writing/", /* writing */ true);
89 
90  for (ComboMap::iterator i = combos_.begin(); i != combos_.end(); ++i) {
92  "Combo: %s, %s (%s, R) -> %s (%s, W)",
93  i->second.type.c_str(),
94  i->second.reader_id.c_str(),
95  i->second.remote_writer ? "local" : "remote",
96  i->second.writer_id.c_str(),
97  i->second.remote_writer ? "remote" : "local");
98  }
99 
100  wsl_local_ = new SyncWriterInterfaceListener(this, logger, (peer_ + "/local").c_str());
101  wsl_remote_ = new SyncWriterInterfaceListener(this, logger, (peer_ + "/remote").c_str());
102 
103  if (!check_connection()) {
104  logger->log_warn(name(), "Remote peer not reachable, will keep trying");
105  }
106 
107  logger->log_debug(name(), "Checking for remote aliveness every %u ms", check_interval);
108  timewait_ = new TimeWait(clock, check_interval * 1000);
109 }
110 
111 void
113 {
114  delete timewait_;
115 
116  close_interfaces();
117 
118  delete wsl_local_;
119  delete wsl_remote_;
120  delete remote_bb_;
121  remote_bb_ = NULL;
122 }
123 
124 void
126 {
127  timewait_->mark_start();
128  check_connection();
129  timewait_->wait_systime();
130 }
131 
132 bool
133 BlackBoardSynchronizationThread::check_connection()
134 {
135  if (!remote_bb_ || !remote_bb_->is_alive()) {
136  if (remote_bb_) {
137  logger->log_warn(name(),
138  "Lost connection via remote BB to %s (%s:%u), will try to re-establish",
139  peer_.c_str(),
140  host_.c_str(),
141  port_);
142  blackboard->unregister_listener(wsl_local_);
143  remote_bb_->unregister_listener(wsl_remote_);
144  close_interfaces();
145  delete remote_bb_;
146  remote_bb_ = NULL;
147  }
148 
149  try {
150  remote_bb_ = new RemoteBlackBoard(host_.c_str(), port_);
151  logger->log_info(name(),
152  "Successfully connected via remote BB to %s (%s:%u)",
153  peer_.c_str(),
154  host_.c_str(),
155  port_);
156 
157  open_interfaces();
158  blackboard->register_listener(wsl_local_, BlackBoard::BBIL_FLAG_WRITER);
159  remote_bb_->register_listener(wsl_remote_, BlackBoard::BBIL_FLAG_WRITER);
160  } catch (Exception &e) {
161  e.print_trace();
162  return false;
163  }
164  }
165  return true;
166 }
167 
168 void
169 BlackBoardSynchronizationThread::read_config_combos(std::string prefix, bool writing)
170 {
171  Configuration::ValueIterator *i = config->search(prefix.c_str());
172  while (i->next()) {
173  if (strcmp(i->type(), "string") != 0) {
174  TypeMismatchException e("Only values of type string may occur in %s, "
175  "but found value of type %s",
176  prefix.c_str(),
177  i->type());
178  delete i;
179  throw e;
180  }
181 
182  std::string varname = std::string(i->path()).substr(prefix.length());
183  std::string uid = i->get_string();
184  size_t sf;
185 
186  if ((sf = uid.find("::")) == std::string::npos) {
187  delete i;
188  throw Exception("Interface UID '%s' at %s is not valid, missing double colon",
189  uid.c_str(),
190  i->path());
191  }
192 
193  std::string type = uid.substr(0, sf);
194  std::string id = uid.substr(sf + 2);
195  combo_t combo = {type, id, id, writing};
196 
197  if ((sf = id.find("=")) != std::string::npos) {
198  // we got a mapping
199  combo.reader_id = id.substr(0, sf);
200  combo.writer_id = id.substr(sf + 1);
201  }
202 
203  combos_[varname] = combo;
204  }
205  delete i;
206 }
207 
208 void
209 BlackBoardSynchronizationThread::open_interfaces()
210 {
211  logger->log_debug(name(), "Opening interfaces");
212  MutexLocker lock(interfaces_.mutex());
213 
214  ComboMap::iterator i;
215  for (i = combos_.begin(); i != combos_.end(); ++i) {
216  Interface *iface_reader = NULL, *iface_writer = NULL;
217 
218  BlackBoard *writer_bb = i->second.remote_writer ? remote_bb_ : blackboard;
219  BlackBoard *reader_bb = i->second.remote_writer ? blackboard : remote_bb_;
220 
221  try {
222  logger->log_debug(name(),
223  "Opening reading %s (%s:%s)",
224  i->second.remote_writer ? "locally" : "remotely",
225  i->second.type.c_str(),
226  i->second.reader_id.c_str());
227  iface_reader =
228  reader_bb->open_for_reading(i->second.type.c_str(), i->second.reader_id.c_str());
229  logger->log_debug(name(),
230  "Opened interface with serial %s",
231  iface_reader->serial().get_string().c_str());
232 
233  if (iface_reader->has_writer()) {
234  logger->log_debug(name(),
235  "Opening writing on %s (%s:%s)",
236  i->second.remote_writer ? "remotely" : "locally",
237  i->second.type.c_str(),
238  i->second.writer_id.c_str());
239  iface_writer =
240  writer_bb->open_for_writing(i->second.type.c_str(), i->second.writer_id.c_str());
241  }
242 
243  InterfaceInfo ii(&i->second, iface_writer, reader_bb, writer_bb);
244  interfaces_[iface_reader] = ii;
245 
246  } catch (Exception &e) {
247  reader_bb->close(iface_reader);
248  writer_bb->close(iface_writer);
249  throw;
250  }
251 
252  SyncInterfaceListener *sync_listener = NULL;
253  if (iface_writer) {
254  logger->log_debug(name(), "Creating sync listener");
255  sync_listener =
256  new SyncInterfaceListener(logger, iface_reader, iface_writer, reader_bb, writer_bb);
257  }
258  sync_listeners_[iface_reader] = sync_listener;
259 
260  if (i->second.remote_writer) {
261  wsl_local_->add_interface(iface_reader);
262  } else {
263  wsl_remote_->add_interface(iface_reader);
264  }
265  }
266 }
267 
268 void
269 BlackBoardSynchronizationThread::close_interfaces()
270 {
271  SyncListenerMap::iterator s;
272  for (s = sync_listeners_.begin(); s != sync_listeners_.end(); ++s) {
273  if (s->second) {
274  logger->log_debug(name(), "Closing sync listener %s", s->second->bbil_name());
275  delete s->second;
276  }
277  }
278  MutexLocker lock(interfaces_.mutex());
279  InterfaceMap::iterator i;
280  for (i = interfaces_.begin(); i != interfaces_.end(); ++i) {
281  logger->log_debug(name(),
282  "Closing %s reading interface %s",
283  i->second.combo->remote_writer ? "local" : "remote",
284  i->first->uid());
285  if (i->second.combo->remote_writer) {
286  wsl_local_->remove_interface(i->first);
287  blackboard->close(i->first);
288  } else {
289  wsl_remote_->remove_interface(i->first);
290  remote_bb_->close(i->first);
291  }
292  if (i->second.writer) {
293  logger->log_debug(name(),
294  "Closing %s writing interface %s",
295  i->second.combo->remote_writer ? "remote" : "local",
296  i->second.writer->uid());
297  if (i->second.combo->remote_writer) {
298  remote_bb_->close(i->second.writer);
299  } else {
300  blackboard->close(i->second.writer);
301  }
302  }
303  }
304  interfaces_.clear();
305  sync_listeners_.clear();
306 }
307 
308 /** A writer has been added for an interface.
309  * To be called only by SyncWriterInterfaceListener.
310  * @param interface the interface a writer has been added for.
311  */
312 void
314 {
315  MutexLocker lock(interfaces_.mutex());
316 
317  if (interfaces_[interface].writer) {
318  // There exists a writer!?
319  logger->log_warn(name(),
320  "Writer added for %s, but relay exists already. Bug?",
321  interface->uid());
322  } else {
323  logger->log_warn(name(), "Writer added for %s, opening relay writer", interface->uid());
324 
325  Interface * iface = NULL;
326  SyncInterfaceListener *sync_listener = NULL;
327  InterfaceInfo & ii = interfaces_[interface];
328  try {
329  iface = ii.writer_bb->open_for_writing(ii.combo->type.c_str(), ii.combo->writer_id.c_str());
330 
331  logger->log_debug(name(),
332  "Creating sync listener for %s:%s-%s",
333  ii.combo->type.c_str(),
334  ii.combo->reader_id.c_str(),
335  ii.combo->writer_id.c_str());
336 
337  sync_listener =
338  new SyncInterfaceListener(logger, interface, iface, ii.reader_bb, ii.writer_bb);
339 
340  sync_listeners_[interface] = sync_listener;
341  ii.writer = iface;
342 
343  } catch (Exception &e) {
344  delete sync_listener;
345  ii.writer_bb->close(iface);
346  logger->log_error(name(),
347  "Failed to open writer for %s:%s-%s, sync broken",
348  ii.combo->type.c_str(),
349  ii.combo->reader_id.c_str(),
350  ii.combo->writer_id.c_str());
351  logger->log_error(name(), e);
352  }
353  }
354 }
355 
356 /** A writer has been removed for an interface.
357  * To be called only by SyncWriterInterfaceListener.
358  * @param interface the interface a writer has been removed for.
359  */
360 void
362 {
363  MutexLocker lock(interfaces_.mutex());
364 
365  if (!interfaces_[interface].writer) {
366  // We do not have a writer!?
367  logger->log_warn(name(), "Writer removed for %s, but no relay exists. Bug?", interface->uid());
368  } else {
369  logger->log_warn(name(), "Writer removed for %s, closing relay writer", interface->uid());
370 
371  InterfaceInfo &ii = interfaces_[interface];
372  try {
373  delete sync_listeners_[interface];
374  sync_listeners_[interface] = NULL;
375 
376  ii.writer_bb->close(ii.writer);
377  ii.writer = NULL;
378 
379  } catch (Exception &e) {
380  logger->log_error(name(),
381  "Failed to close writer for %s:%s-%s, sync broken",
382  ii.combo->type.c_str(),
383  ii.combo->reader_id.c_str(),
384  ii.combo->writer_id.c_str());
385  logger->log_error(name(), e);
386  }
387  }
388 }
void writer_removed(fawkes::Interface *interface) noexcept
A writer has been removed for an interface.
virtual void init()
Initialize the thread.
Definition: sync_thread.cpp:66
virtual void finalize()
Finalize the thread.
virtual void loop()
Code to execute in the thread.
void writer_added(fawkes::Interface *interface) noexcept
A writer has been added for an interface.
virtual ~BlackBoardSynchronizationThread()
Destructor.
Definition: sync_thread.cpp:61
BlackBoardSynchronizationThread(std::string &bbsync_cfg_prefix, std::string &peer_cfg_prefix, std::string &peer)
Constructor.
Definition: sync_thread.cpp:45
Synchronize two interfaces.
Definition: sync_listener.h:34
Listener for writer events in bbsync plugin.
void add_interface(fawkes::Interface *interface)
Add an interface to listen to.
void remove_interface(fawkes::Interface *interface)
Remove an interface to listen to.
BlackBoard * blackboard
This is the BlackBoard instance you can use to interact with the BlackBoard.
Definition: blackboard.h:44
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual bool is_alive() const noexcept=0
Check if the BlackBoard is still alive.
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Clock * clock
By means of this member access to the clock is given.
Definition: clock.h:42
Configuration * config
This is the Configuration member used to access the configuration.
Definition: configurable.h:41
Iterator interface to iterate over config values.
Definition: config.h:75
virtual const char * path() const =0
Path of value.
virtual bool next()=0
Check if there is another element and advance to this if possible.
virtual const char * type() const =0
Type of value.
virtual std::string get_string() const =0
Get string value.
virtual unsigned int get_uint(const char *path)=0
Get value from configuration which is of type unsigned int.
virtual ValueIterator * search(const char *path)=0
Iterator with search results.
virtual std::string get_string(const char *path)=0
Get value from configuration which is of type string.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void print_trace() noexcept
Prints trace to stderr.
Definition: exception.cpp:601
void append(const char *format,...) noexcept
Append messages to the message list.
Definition: exception.cpp:333
Interface info.
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
Uuid serial() const
Get instance serial of interface.
Definition: interface.cpp:695
bool has_writer() const
Check if there is a writer for the interface.
Definition: interface.cpp:848
RefPtr< Mutex > mutex() const
Get access to the internal mutex.
Definition: lock_map.h:133
virtual void log_debug(const char *component, const char *format,...)=0
Log debug message.
virtual void log_warn(const char *component, const char *format,...)=0
Log warning message.
virtual void log_info(const char *component, const char *format,...)=0
Log informational message.
Logger * logger
This is the Logger member used to access the logger.
Definition: logging.h:41
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
virtual void log_debug(const char *component, const char *format,...)
Log debug message.
Definition: multi.cpp:174
virtual void log_error(const char *component, const char *format,...)
Log error message.
Definition: multi.cpp:237
Mutex locking helper.
Definition: mutex_locker.h:34
Remote BlackBoard.
Definition: remote.h:50
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
const char * name() const
Get name of thread.
Definition: thread.h:100
void set_name(const char *format,...)
Set name of thread.
Definition: thread.cpp:748
Time wait utility.
Definition: wait.h:33
void mark_start()
Mark start of loop.
Definition: wait.cpp:68
void wait_systime()
Wait until minimum loop time has been reached in real time.
Definition: wait.cpp:96
std::string get_string() const
Get the string representation of the Uuid.
Definition: uuid.cpp:107
Fawkes library namespace.