Fawkes API  Fawkes Development Version
syncpoint.cpp
1 /***************************************************************************
2  * syncpoint.cpp - Fawkes SyncPoint
3  *
4  * Created: Thu Jan 09 12:35:57 2014
5  * Copyright 2014-2018 Till Hofmann
6  *
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include <core/threading/mutex_locker.h>
23 #include <syncpoint/exceptions.h>
24 #include <syncpoint/syncpoint.h>
25 #include <utils/time/time.h>
26 
27 #include <algorithm>
28 #include <pthread.h>
29 #include <sstream>
30 #include <string.h>
31 
32 using namespace std;
33 
34 namespace fawkes {
35 
36 /** @class SyncPoint <syncpoint/syncpoint.h>
37  * The SyncPoint class.
38  * This class is used for dynamic synchronization of threads which depend
39  * on each other, e.g. threads which are part of a processing chain.
40  *
41  * As an example, thread E generates data which is needed by thread W.
42  * Therefore, both threads share a SyncPoint.
43  * Thread W wait()s for the SyncPoint to be emitted.
44  * Once thread E is done, it emit()s the SyncPoint, which wakes up thread W.
45  *
46  * @author Till Hofmann
47  * @see SyncPointManager
48  */
49 
50 /** Constructor.
51  * @param identifier The identifier of the SyncPoint. This must be in absolute
52  * path style, e.g. '/some/syncpoint'.
53  * @param logger The logger to use for error messages.
54  * @param max_waittime_sec the maximum number of seconds to wait until a timeout
55  * is triggered
56  * @param max_waittime_nsec the maximum number of nanoseconds to wait until a
57  * timeout is triggered
58  */
59 SyncPoint::SyncPoint(string identifier,
60  MultiLogger *logger,
61  uint max_waittime_sec /* = 0 */,
62  uint max_waittime_nsec /* = 0 */)
63 : identifier_(identifier),
64  emit_calls_(CircularBuffer<SyncPointCall>(1000)),
65  wait_for_one_calls_(CircularBuffer<SyncPointCall>(1000)),
66  wait_for_all_calls_(CircularBuffer<SyncPointCall>(1000)),
67  creation_time_(Time()),
68  mutex_(new Mutex()),
69  mutex_next_wait_(new Mutex()),
70  cond_next_wait_(new WaitCondition(mutex_next_wait_)),
71  mutex_wait_for_one_(new Mutex()),
72  cond_wait_for_one_(new WaitCondition(mutex_wait_for_one_)),
73  mutex_wait_for_all_(new Mutex()),
74  cond_wait_for_all_(new WaitCondition(mutex_wait_for_all_)),
75  wait_for_all_timer_running_(false),
76  max_waittime_sec_(max_waittime_sec),
77  max_waittime_nsec_(max_waittime_nsec),
78  logger_(logger),
79  last_emitter_reset_(Time(0l))
80 {
81  if (identifier.empty()) {
82  cleanup();
83  throw SyncPointInvalidIdentifierException(identifier.c_str());
84  }
85  if (identifier.compare(0, 1, "/")) {
86  cleanup();
87  throw SyncPointInvalidIdentifierException(identifier.c_str());
88  }
89  // check if last charater is '/'
90  // The identifier may only end in '/' if '/' is the complete identifier.
91  // '/' is allowed, '/some/' is not allowed
92  if (identifier != "/" && !identifier.compare(identifier.size() - 1, 1, "/")) {
93  cleanup();
94  throw SyncPointInvalidIdentifierException(identifier.c_str());
95  }
96 }
97 
98 SyncPoint::~SyncPoint()
99 {
100  cleanup();
101 }
102 
103 /**
104  * @return the identifier of the SyncPoint
105  */
106 string
108 {
109  return identifier_;
110 }
111 
112 /** EqualOperator.
113  * Two SyncPoints are considered equal iff they have the same identifier
114  * @param other The other SyncPoint
115  * @return true if the identifiers of the SyncPoints are equal
116  */
117 bool
118 SyncPoint::operator==(const SyncPoint &other) const
119 {
120  return identifier_ == other.get_identifier();
121 }
122 
123 /** EqualOperator.
124  * A SyncPoint is equal to a given string iff the string is equal to the
125  * SyncPoint's identifier.
126  * @param other the string to compare
127  * @return true if the identifier of the SyncPoint matches the given string
128  */
129 bool
130 SyncPoint::operator==(const string &other) const
131 {
132  return identifier_ == other;
133 }
134 
135 /** LessThan Operator.
136  * Compare two SyncPoints using their identifiers.
137  * @param other The other SyncPoint
138  * @return true if strcmp returns a value < 0 for the identifiers
139  */
140 bool
141 SyncPoint::operator<(const SyncPoint &other) const
142 {
143  return identifier_ < other.get_identifier();
144 }
145 
146 /** Wake up all components which are waiting for this SyncPoint
147  * @param component The identifier of the component emitting the SyncPoint
148  */
149 void
150 SyncPoint::emit(const std::string &component)
151 {
152  emit(component, true);
153 }
154 
155 /** Wake up all components which are waiting for this SyncPoint
156  * @param component The identifier of the component emitting the SyncPoint
157  * @param remove_from_pending if set to true, the component will be removed
158  * from the pending emitters for this syncpoint
159  */
160 void
161 SyncPoint::emit(const std::string &component, bool remove_from_pending)
162 {
164  if (!emit_locker_.empty()) {
166  }
168  MutexLocker ml(mutex_);
169  if (!watchers_.count(component)) {
170  throw SyncPointNonWatcherCalledEmitException(component.c_str(), get_identifier().c_str());
171  }
172 
173  // unlock all wait_for_one waiters
174  watchers_wait_for_one_.clear();
178 
179  if (!emitters_.count(component)) {
180  throw SyncPointNonEmitterCalledEmitException(component.c_str(), get_identifier().c_str());
181  }
182 
183  /* 1. remember whether the component was pending; if so, it may be removed
184  * from the pending components of the predecessor. Otherwise, it should
185  * not be removed
186  * 2. only erase the component once; it may be registered multiple times
187  */
188  bool pred_remove_from_pending = false;
189  if (remove_from_pending) {
190  multiset<string>::iterator it_pending = pending_emitters_.find(component);
191  if (it_pending != pending_emitters_.end()) {
192  pending_emitters_.erase(it_pending);
193  if (predecessor_) {
194  if (last_emitter_reset_ <= predecessor_->last_emitter_reset_) {
195  pred_remove_from_pending = true;
196  }
197  }
198 
199  // unlock all wait_for_all waiters if all pending emitters have emitted
200  if (pending_emitters_.empty()) {
201  watchers_wait_for_all_.clear();
205  reset_emitters();
206  }
207  }
208  }
209 
210  emit_calls_.push_back(SyncPointCall(component));
211 
212  if (predecessor_) {
213  predecessor_->emit(component, pred_remove_from_pending);
214  }
215 }
216 
217 /** Wait until SyncPoint is emitted.
218  * Either wait until a single emitter has emitted the SyncPoint, or wait
219  * until all registered emitters have emitted the SyncPoint.
220  * If wait_sec != 0 or wait_nsec !=0, then only wait for
221  * wait_sec + wait_nsec*10^-9 seconds and set the SyncPoint's maximum waiting
222  * time to the specified time (i.e., on any subsequent wait calls, wait for
223  * the specified time until a timeout is triggered).
224  * If the maximal wait time has been exceeded, a warning is shown and the
225  * SyncPoint is released.
226  * If the WakeupType is WAIT_FOR_ALL, then the time limit is only used if there
227  * is currently no other component waiting in WAIT_FOR_ALL mode. If there is
228  * already a component waiting, that component's wait_time is used to compute
229  * the timeout. This ensures that in case a timeout occurs, all waiting
230  * components in WAIT_FOR_ALL mode are released simultaneously. Components in
231  * WAIT_FOR_ONE mode are treated separately and have their own timeouts.
232  * @param component The identifier of the component waiting for the SyncPoint
233  * @param type the wakeup type. If this is set to WAIT_FOR_ONE, wait returns
234  * when a single emitter has emitted the SyncPoint. If set to WAIT_FOR_ALL, wait
235  * until all registered emitters have emitted the SyncPoint.
236  * @param wait_sec number of seconds to wait for the SyncPoint
237  * @param wait_nsec number of nanoseconds to wait for the SyncPoint
238  * @see SyncPoint::WakeupType
239  */
240 void
241 SyncPoint::wait(const std::string &component,
242  WakeupType type /* = WAIT_FOR_ONE */,
243  uint wait_sec /* = 0 */,
244  uint wait_nsec /* = 0 */)
245 {
246  MutexLocker ml(mutex_);
247 
248  std::set<std::string> * watchers;
249  WaitCondition * cond;
251  Mutex * mutex_cond;
252  bool * timer_running;
253  string * timer_owner;
254  // set watchers, cond and calls depending of the Wakeup type
255  if (type == WAIT_FOR_ONE) {
256  watchers = &watchers_wait_for_one_;
257  cond = cond_wait_for_one_;
258  mutex_cond = mutex_wait_for_one_;
259  calls = &wait_for_one_calls_;
260  timer_running = NULL;
261  } else if (type == WAIT_FOR_ALL) {
262  watchers = &watchers_wait_for_all_;
263  cond = cond_wait_for_all_;
264  mutex_cond = mutex_wait_for_all_;
265  timer_running = &wait_for_all_timer_running_;
266  timer_owner = &wait_for_all_timer_owner_;
267  calls = &wait_for_all_calls_;
268  } else {
270  }
271 
272  Time start;
273  mutex_cond->lock();
274 
275  // check if calling component is registered for this SyncPoint
276  if (!watchers_.count(component)) {
277  mutex_cond->unlock();
278  throw SyncPointNonWatcherCalledWaitException(component.c_str(), get_identifier().c_str());
279  }
280  // check if calling component is not already waiting
281  if (watchers->count(component)) {
282  mutex_cond->unlock();
283  throw SyncPointMultipleWaitCallsException(component.c_str(), get_identifier().c_str());
284  }
285 
286  /* if type == WAIT_FOR_ALL but no emitter has registered, we can
287  * immediately return
288  * if type == WAIT_FOR_ONE, we always wait
289  */
290  bool need_to_wait = !emitters_.empty() || type == WAIT_FOR_ONE;
291  if (need_to_wait) {
292  watchers->insert(component);
293  }
294 
296  if (emit_locker_ == component) {
297  emit_locker_ = "";
299  }
301  if (need_to_wait) {
302  if (type == WAIT_FOR_ONE) {
303  ml.unlock();
304  bool timeout;
305  pthread_cleanup_push(cleanup_mutex, mutex_cond);
306  timeout = !cond->reltimed_wait(wait_sec, wait_nsec);
307  pthread_cleanup_pop(1);
308  if (timeout) {
309  ml.relock();
310  handle_default(component, type);
311  ml.unlock();
312  }
313  } else {
314  if (*timer_running) {
315  ml.unlock();
316  pthread_cleanup_push(cleanup_mutex, mutex_cond);
317  cond->wait();
318  pthread_cleanup_pop(1);
319  } else {
320  *timer_running = true;
321  *timer_owner = component;
322  if (wait_sec != 0 || wait_nsec != 0) {
323  max_waittime_sec_ = wait_sec;
324  max_waittime_nsec_ = wait_nsec;
325  }
326  ml.unlock();
327  bool timeout;
328  pthread_cleanup_push(cleanup_mutex, mutex_cond);
330  pthread_cleanup_pop(1);
331  ml.relock();
332  *timer_running = false;
333  if (timeout) {
334  // wait failed, handle default
335  handle_default(component, type);
336  mutex_cond->lock();
337  cond->wake_all();
338  mutex_cond->unlock();
339  }
340  ml.unlock();
341  }
342  }
343  } else {
344  ml.unlock();
345  mutex_cond->unlock();
346  }
347  Time wait_time = Time() - start;
348  ml.relock();
349  calls->push_back(SyncPointCall(component, start, wait_time));
350 }
351 
352 /** Wait for a single emitter.
353  * @param component The identifier of the calling component.
354  */
355 void
356 SyncPoint::wait_for_one(const string &component)
357 {
358  wait(component, WAIT_FOR_ONE);
359 }
360 
361 /** Wait for all registered emitters.
362  * @param component The identifier of the calling component.
363  */
364 void
365 SyncPoint::wait_for_all(const string &component)
366 {
367  wait(component, WAIT_FOR_ALL);
368 }
369 
370 /** Wait for a single emitter for the given time.
371  * @param component The identifier of the calling component.
372  * @param wait_sec number of seconds to wait
373  * @param wait_nsec number of nanoseconds to wait additionally to wait_sec
374  */
375 void
376 SyncPoint::reltime_wait_for_one(const string &component, uint wait_sec, uint wait_nsec)
377 {
378  wait(component, SyncPoint::WAIT_FOR_ONE, wait_sec, wait_nsec);
379 }
380 
381 /** Wait for all registered emitters for the given time.
382  * @param component The identifier of the calling component.
383  * @param wait_sec number of seconds to wait
384  * @param wait_nsec number of nanoseconds to wait additionally to wait_sec
385  */
386 void
387 SyncPoint::reltime_wait_for_all(const string &component, uint wait_sec, uint wait_nsec)
388 {
389  wait(component, SyncPoint::WAIT_FOR_ALL, wait_sec, wait_nsec);
390 }
391 
392 /** Do not wait for the SyncPoint any longer.
393  * Removes the component from the list of waiters. If the given component is
394  * not waiting, do nothing.
395  * @param component the component to remove from the waiters
396  */
397 void
398 SyncPoint::unwait(const string &component)
399 {
400  MutexLocker ml(mutex_);
401  watchers_wait_for_one_.erase(component);
402  watchers_wait_for_all_.erase(component);
403  if (wait_for_all_timer_owner_ == component) {
404  // TODO: this lets the other waiting components wait indefinitely, even on
405  // a timed wait.
407  }
408 }
409 
410 /** Lock the SyncPoint for emitters until the specified component does the next
411  * wait() call. This forces an emitter of this SyncPoint to wait during the
412  * emit until the waiter calls wait(). This is useful if you want to guarantee
413  * that the waiter does not call wait() immediately after the emitter has
414  * called emit().
415  * @param component the component locking the SyncPoint
416  */
417 void
418 SyncPoint::lock_until_next_wait(const string &component)
419 {
420  MutexLocker ml(mutex_);
422  if (emit_locker_.empty()) {
423  emit_locker_ = component;
424  } else {
425  logger_->log_warn("SyncPoints",
426  "%s tried to call lock_until_next_wait, "
427  "but %s already did the same. Ignoring.",
428  component.c_str(),
429  emit_locker_.c_str());
430  }
432 }
433 
434 /** Register an emitter. A thread can only emit the barrier if it has been
435  * registered.
436  * @param component The identifier of the registering component.
437  */
438 void
439 SyncPoint::register_emitter(const string &component)
440 {
441  MutexLocker ml(mutex_);
442  emitters_.insert(component);
443  pending_emitters_.insert(component);
444  if (predecessor_) {
445  predecessor_->register_emitter(component);
446  }
447 }
448 
449 /** Unregister an emitter. This removes the component from the syncpoint, thus
450  * other components will not wait for it anymore.
451  * @param component The identifier of the component which is unregistered.
452  * @param emit_if_pending if this is set to true and the component is a
453  * pending emitter, emit the syncpoint before releasing it.
454  */
455 void
456 SyncPoint::unregister_emitter(const string &component, bool emit_if_pending)
457 {
458  // TODO should this throw if the calling component is not registered?
459  multiset<string>::iterator it_emitter = emitters_.find(component);
460  if (it_emitter == emitters_.end()) {
461  // component is not an emitter
462  return;
463  }
464  MutexLocker ml(mutex_);
465  if (emit_if_pending && is_pending(component)) {
466  ml.unlock();
467  emit(component);
468  ml.relock();
469  }
470 
471  // erase a single element from the set of emitters
472  emitters_.erase(it_emitter);
473  if (predecessor_) {
474  // never emit the predecessor if it's pending; it is already emitted above
475  predecessor_->unregister_emitter(component, false);
476  }
477 }
478 
479 /** Check if the given component is an emitter.
480  * @param component The name of the component.
481  * @return True iff the given component is an emitter of this syncpoint.
482  */
483 bool
484 SyncPoint::is_emitter(const string &component) const
485 {
486  MutexLocker ml(mutex_);
487  return emitters_.count(component) > 0;
488 }
489 
490 /** Check if the given component is a watch.
491  * @param component The name of the component.
492  * @return True iff the given component is a watcher.
493  */
494 bool
495 SyncPoint::is_watcher(const string &component) const
496 {
497  MutexLocker ml(mutex_);
498  return watchers_.count(component) > 0;
499 }
500 
501 /** Add a watcher to the watch list
502  * @param watcher the new watcher
503  * @return A pair, of which the first element is an iterator that points
504  * to the possibly inserted element, and the second is a bool
505  * that is true if the element was actually inserted.
506  */
507 pair<set<string>::iterator, bool>
508 SyncPoint::add_watcher(string watcher)
509 {
510  MutexLocker ml(mutex_);
511  return watchers_.insert(watcher);
512 }
513 
514 /**
515  * @return all watchers of the SyncPoint
516  */
517 std::set<std::string>
519 {
520  MutexLocker ml(mutex_);
521  return watchers_;
522 }
523 
524 /**
525  * @return a copy of the wait call buffer with the given type
526  * @param type the type of the wait call buffer
527  */
529 SyncPoint::get_wait_calls(WakeupType type /* = WAIT_FOR_ONE */) const
530 {
531  MutexLocker ml(mutex_);
532  if (type == WAIT_FOR_ONE) {
533  return wait_for_one_calls_;
534  } else if (type == WAIT_FOR_ALL) {
535  return wait_for_all_calls_;
536  } else {
538  }
539 }
540 
541 /**
542  * @return a copy of the set of registered emitters
543  */
544 multiset<string>
546 {
547  return emitters_;
548 }
549 
550 /**
551  * @return a copy of the emit call buffer
552  */
555 {
556  MutexLocker ml(mutex_);
557  return emit_calls_;
558 }
559 
560 /**
561  * Check if the given waiter is currently waiting with the given type
562  * @param watcher the string identifier of the watcher to check
563  * @param type the type of call to check
564  * @return true if the waiter is currently waiting
565  */
566 bool
567 SyncPoint::watcher_is_waiting(std::string watcher, WakeupType type) const
568 {
569  switch (type) {
570  case SyncPoint::WAIT_FOR_ONE: {
572  return watchers_wait_for_one_.count(watcher);
573  }
574  case SyncPoint::WAIT_FOR_ALL: {
576  return watchers_wait_for_all_.count(watcher);
577  }
578  default: throw Exception("Unknown watch type %u for syncpoint %s", type, identifier_.c_str());
579  }
580 }
581 
582 void
583 SyncPoint::reset_emitters()
584 {
585  last_emitter_reset_ = Time();
586  pending_emitters_ = emitters_;
587 }
588 
589 bool
590 SyncPoint::is_pending(string component)
591 {
592  return pending_emitters_.count(component) > 0;
593 }
594 
595 void
596 SyncPoint::handle_default(string component, WakeupType type)
597 {
598  logger_->log_warn(component.c_str(),
599  "Thread time limit exceeded while waiting for syncpoint '%s'. "
600  "Time limit: %f sec.",
601  get_identifier().c_str(),
602  max_waittime_sec_ + static_cast<float>(max_waittime_nsec_) / 1000000000.f);
603  bad_components_.insert(pending_emitters_.begin(), pending_emitters_.end());
604  if (!bad_components_.empty()) {
605  stringstream message;
606  for (set<string>::const_iterator it = bad_components_.begin(); it != bad_components_.end();
607  it++) {
608  message << " " << *it;
609  const auto &last_call =
610  std::find_if(emit_calls_.rbegin(), emit_calls_.rend(), [&](const SyncPointCall &call) {
611  return call.get_caller() == *it;
612  });
613  if (last_call != emit_calls_.rend()) {
614  message << " (" << Time().in_sec() - last_call->get_call_time().in_sec() << "s)";
615  }
616  }
617  logger_->log_warn(component.c_str(), "bad components:%s", message.str().c_str());
618  } else if (type == SyncPoint::WAIT_FOR_ALL) {
619  throw Exception("SyncPoints: component %s defaulted, "
620  "but there is no pending emitter. This is probably a bug.",
621  component.c_str());
622  }
623 
624  watchers_wait_for_all_.erase(component);
625  watchers_wait_for_one_.erase(component);
626 }
627 
628 void
629 SyncPoint::cleanup()
630 {
631  delete cond_wait_for_one_;
632  delete mutex_wait_for_one_;
633  delete cond_wait_for_all_;
634  delete mutex_wait_for_all_;
635  delete mutex_next_wait_;
636  delete mutex_;
637 }
638 } // namespace fawkes
Circular buffer with a fixed size.
void push_back(const Type &val)
Insert an element at the end of the buffer and delete the first element if necessary.
Base class for exceptions in Fawkes.
Definition: exception.h:36
Log through multiple loggers.
Definition: multi.h:35
virtual void log_warn(const char *component, const char *format,...)
Log warning message.
Definition: multi.cpp:216
Mutex locking helper.
Definition: mutex_locker.h:34
void relock()
Lock this mutex, again.
void unlock()
Unlock the mutex.
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
A call (wait() or emit()) to a SyncPoint.
Invalid identifier used (i.e.
Definition: exceptions.h:122
A component called wait() but is already waiting.
Definition: exceptions.h:156
Emit was called on a SyncBarrier but the calling component is not registered as emitter.
Definition: exceptions.h:174
Emit was called by a component which isn't in the watcher set (or wrong component argument was passed...
Definition: exceptions.h:52
Emit was called by a component which isn't in the watcher set (or wrong component argument was passed...
Definition: exceptions.h:70
The SyncPoint class.
Definition: syncpoint.h:50
bool operator<(const SyncPoint &other) const
LessThan Operator.
Definition: syncpoint.cpp:141
virtual void reltime_wait_for_one(const std::string &component, uint wait_sec, uint wait_nsec)
wait for the sync point, but abort after given time
Definition: syncpoint.cpp:376
CircularBuffer< SyncPointCall > get_wait_calls(WakeupType type=WAIT_FOR_ONE) const
Definition: syncpoint.cpp:529
virtual void wait_for_one(const std::string &component)
Wait for a single emitter.
Definition: syncpoint.cpp:356
virtual void wait(const std::string &component, WakeupType=WAIT_FOR_ONE, uint wait_sec=0, uint wait_nsec=0)
wait for the sync point to be emitted by any other component
Definition: syncpoint.cpp:241
std::multiset< std::string > get_emitters() const
Definition: syncpoint.cpp:545
virtual void reltime_wait_for_all(const std::string &component, uint wait_sec, uint wait_nsec)
Wait for all registered emitters for the given time.
Definition: syncpoint.cpp:387
std::set< std::string > watchers_wait_for_one_
Set of all components which are currently waiting for a single emitter.
Definition: syncpoint.h:117
bool wait_for_all_timer_running_
true if the wait for all timer is running
Definition: syncpoint.h:145
virtual void unwait(const std::string &component)
abort waiting
Definition: syncpoint.cpp:398
CircularBuffer< SyncPointCall > wait_for_all_calls_
A buffer of the most recent wait calls of type WAIT_FOR_ALL.
Definition: syncpoint.h:126
Mutex * mutex_wait_for_one_
Mutex used for cond_wait_for_one_.
Definition: syncpoint.h:137
WaitCondition * cond_wait_for_one_
WaitCondition which is used for wait_for_one()
Definition: syncpoint.h:139
virtual void unregister_emitter(const std::string &component, bool emit_if_pending=true)
unregister as emitter
Definition: syncpoint.cpp:456
uint max_waittime_sec_
maximum waiting time in secs
Definition: syncpoint.h:149
std::string wait_for_all_timer_owner_
the component that started the wait-for-all timer
Definition: syncpoint.h:147
Mutex * mutex_next_wait_
Mutex used to allow lock_until_next_wait.
Definition: syncpoint.h:133
uint max_waittime_nsec_
maximum waiting time in nsecs
Definition: syncpoint.h:151
Mutex * mutex_wait_for_all_
Mutex used for cond_wait_for_all_.
Definition: syncpoint.h:141
std::set< std::string > get_watchers() const
Definition: syncpoint.cpp:518
std::set< std::string > watchers_wait_for_all_
Set of all components which are currently waiting on the barrier.
Definition: syncpoint.h:119
bool is_emitter(const std::string &component) const
Check if the given component is an emitter.
Definition: syncpoint.cpp:484
std::pair< std::set< std::string >::iterator, bool > add_watcher(std::string watcher)
Add a watcher to the watch list.
Definition: syncpoint.cpp:508
virtual void register_emitter(const std::string &component)
register as emitter
Definition: syncpoint.cpp:439
bool watcher_is_waiting(std::string watcher, WakeupType type) const
Check if the given waiter is currently waiting with the given type.
Definition: syncpoint.cpp:567
virtual void emit(const std::string &component)
send a signal to all waiting threads
Definition: syncpoint.cpp:150
WaitCondition * cond_next_wait_
WaitCondition used for lock_until_next_wait.
Definition: syncpoint.h:135
MultiLogger * logger_
Logger.
Definition: syncpoint.h:154
CircularBuffer< SyncPointCall > get_emit_calls() const
Definition: syncpoint.cpp:554
void lock_until_next_wait(const std::string &component)
Lock the SyncPoint for emitters until the specified component does the next wait() call.
Definition: syncpoint.cpp:418
std::set< std::string > watchers_
Set of all components which use this SyncPoint.
Definition: syncpoint.h:115
CircularBuffer< SyncPointCall > emit_calls_
A buffer of the most recent emit calls.
Definition: syncpoint.h:122
CircularBuffer< SyncPointCall > wait_for_one_calls_
A buffer of the most recent wait calls of type WAIT_FOR_ONE.
Definition: syncpoint.h:124
std::string get_identifier() const
Definition: syncpoint.cpp:107
const std::string identifier_
The unique identifier of the SyncPoint.
Definition: syncpoint.h:113
bool is_watcher(const std::string &component) const
Check if the given component is a watch.
Definition: syncpoint.cpp:495
virtual void wait_for_all(const std::string &component)
Wait for all registered emitters.
Definition: syncpoint.cpp:365
WakeupType
Type to define when a thread wakes up after waiting for a SyncPoint.
Definition: syncpoint.h:56
WaitCondition * cond_wait_for_all_
WaitCondition which is used for wait_for_all()
Definition: syncpoint.h:143
bool operator==(const SyncPoint &other) const
EqualOperator.
Definition: syncpoint.cpp:118
Mutex * mutex_
Mutex used to protect all member variables.
Definition: syncpoint.h:131
A class for handling time.
Definition: time.h:93
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
bool reltimed_wait(unsigned int sec, unsigned int nanosec)
Wait with relative timeout.
Fawkes library namespace.