Fawkes API  Fawkes Development Version
interruptible_barrier.cpp
1 
2 /***************************************************************************
3  * interruptible_barrier.cpp - Interruptible Barrier
4  *
5  * Created: Sat Jan 31 12:30:32 2009
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exceptions/system.h>
25 #include <core/macros.h>
26 #include <core/threading/interruptible_barrier.h>
27 #include <core/threading/mutex.h>
28 #include <core/threading/thread_list.h>
29 #include <core/threading/wait_condition.h>
30 
31 namespace fawkes {
32 
33 /// @cond INTERNALS
34 class InterruptibleBarrierData
35 {
36 public:
37  unsigned int threads_left;
38  Mutex * mutex;
39  WaitCondition *waitcond;
40  bool own_mutex;
41 
42  InterruptibleBarrierData(Mutex *mutex)
43  {
44  if (mutex) {
45  this->mutex = mutex;
46  own_mutex = false;
47  } else {
48  this->mutex = new Mutex();
49  own_mutex = true;
50  }
51  waitcond = new WaitCondition(this->mutex);
52  }
53 
54  ~InterruptibleBarrierData()
55  {
56  if (own_mutex)
57  delete mutex;
58  delete waitcond;
59  }
60 };
61 /// @endcond
62 
63 /** @class InterruptibleBarrier <core/threading/barrier.h>
64  * A barrier is a synchronization tool which blocks until a given number of
65  * threads have reached the barrier. This particular implementations allows for
66  * giving a timeout after which the waiting is aborted.
67  *
68  * For general information when a barrier is useful see the Barrier class.
69  *
70  * Additionally to the general barrier features the InterruptibleBarrier::wait()
71  * can be given a timeout after which the waiting is aborted.
72  * Since the POSIX standard does not provide a timed wait for barriers this
73  * implementation uses a Mutex and WaitCondition internally to achieve the
74  * desired result.
75  *
76  * @see Barrier
77  * @ingroup Threading
78  * @author Tim Niemueller
79  */
80 
81 /** Constructor.
82  * @param count the number of threads to wait for
83  */
85 {
86  _count = count;
87  if (_count == 0) {
88  throw Exception("Barrier count must be at least 1");
89  }
90  data_ = new InterruptibleBarrierData(NULL);
91  data_->threads_left = 0;
92  passed_threads_ = RefPtr<ThreadList>(new ThreadList());
93 
94  interrupted_ = false;
95  timeout_ = false;
96  num_threads_in_wait_function_ = 0;
97 }
98 
99 /** Constructor with custom mutex.
100  * Use this constructor only if you really know what you are doing. This constructor
101  * allows to pass a mutex that is used internally for the barrier. Note that in
102  * this case it is your duty to lock the mutex before the wait() and unlock
103  * afterwards! It combines features of a barrier and a wait condition.
104  * @param mutex Mutex to use
105  * @param count the number of threads to wait for
106  */
107 InterruptibleBarrier::InterruptibleBarrier(Mutex *mutex, unsigned int count) : Barrier(count)
108 {
109  _count = count;
110  if (_count == 0) {
111  throw Exception("Barrier count must be at least 1");
112  }
113  data_ = new InterruptibleBarrierData(mutex);
114  data_->threads_left = 0;
115  passed_threads_ = RefPtr<ThreadList>(new ThreadList());
116 
117  interrupted_ = false;
118  timeout_ = false;
119  num_threads_in_wait_function_ = 0;
120 }
121 
122 /** Invalid constructor.
123  * This will throw an exception if called as it is illegal to copy
124  * a barrier.
125  * @param barrier to copy
126  */
128 {
129  throw Exception("Barriers cannot be copied");
130 }
131 
132 /** Invalid constructor.
133  * This will throw an exception if called as it is illegal to copy
134  * a barrier.
135  * @param barrier to copy
136  */
137 InterruptibleBarrier::InterruptibleBarrier(const InterruptibleBarrier *b) : Barrier()
138 {
139  throw Exception("Barriers cannot be copied");
140 }
141 
142 /** Invalid assignment operator.
143  * This will throw an exception if called as it is illegal to assign
144  * a barrier.
145  * @param barrier to copy
146  */
147 InterruptibleBarrier &
148 InterruptibleBarrier::operator=(const InterruptibleBarrier &b)
149 {
150  throw Exception("Barriers cannot be assigned");
151 }
152 
153 /** Invalid assignment operator.
154  * This will throw an exception if called as it is illegal to assign
155  * a barrier.
156  * @param barrier to copy
157  */
158 InterruptibleBarrier &
159 InterruptibleBarrier::operator=(const InterruptibleBarrier *b)
160 {
161  throw Exception("Barriers cannot be assigned");
162 }
163 
164 /** Destructor */
166 {
167  delete data_;
168 }
169 
170 /** Get a list of threads that passed the barrier.
171  * The list contains the threads that passed the barrier. With some book keeping
172  * outside of the barrier you can determine which threads you expected at the
173  * barrier but did not pass it.
174  * @return refptr to list of threads that passed the barrier.
175  */
178 {
179  return passed_threads_;
180 }
181 
182 /** Interrupt the barrier.
183  * This will cause all threads currently waiting on the barrier to
184  * throw an exception and no further thread will wait.
185  * You have to call reset() the before you use this barrier
186  * the next time.
187  */
188 void
190 {
191  if (likely(data_->own_mutex))
192  data_->mutex->lock();
193  interrupted_ = true;
194  data_->waitcond->wake_all();
195  if (likely(data_->own_mutex))
196  data_->mutex->unlock();
197 }
198 
199 /** Clears the barrier.
200  * Call this method when you want to use the barrier the next time after
201  * an interrupt or timeout occured. Make sure all threads that should have
202  * passed the barrier the last time did pass it.
203  */
204 void
206 {
207  if (likely(data_->own_mutex))
208  data_->mutex->lock();
209  interrupted_ = false;
210  timeout_ = false;
211  data_->threads_left = _count;
212  passed_threads_.clear();
213  if (likely(data_->own_mutex))
214  data_->mutex->unlock();
215 }
216 
217 /** Wait for other threads.
218  * This method will block until as many threads have called wait as you have
219  * given count to the constructor. Note that if the barrier is interrupted or
220  * times out you need to call reset() to get the barrier into a re-usable state.
221  * It is your duty to make sure that all threads using the barrier are in a
222  * cohesive state.
223  * @param timeout_sec relative timeout in seconds, added to timeout_nanosec
224  * @param timeout_nanosec timeout in nanoseconds
225  * @return true, if the barrier was properly reached, false if the barrier timeout
226  * was reached and the wait did not finish properly.
227  * @exception InterruptedException thrown if the barrier was forcefully interrupted
228  * by calling interrupt().
229  */
230 bool
231 InterruptibleBarrier::wait(unsigned int timeout_sec, unsigned int timeout_nanosec)
232 {
233  if (likely(data_->own_mutex))
234  data_->mutex->lock();
235  num_threads_in_wait_function_++;
236 
237  if (data_->threads_left == 0) {
238  // first to come
239  timeout_ = interrupted_ = wait_at_barrier_ = false;
240  data_->threads_left = _count;
241  passed_threads_->clear();
242  } else {
243  if (interrupted_ || timeout_) {
244  // interrupted or timed out threads need to be reset if they should be reused
245  num_threads_in_wait_function_--;
246  if (likely(data_->own_mutex))
247  data_->mutex->unlock();
248  return true;
249  }
250  }
251 
252  --data_->threads_left;
253  try {
254  passed_threads_->push_back_locked(Thread::current_thread());
255  } catch (Exception &e) {
256  // Cannot do anything more useful :-/
257  // to stay fully compatible with Barrier we do *not* re-throw
258  e.print_trace();
259  }
260 
261  bool local_timeout = false;
262 
263  //Am I the last thread the interruptable barrier is waiting for? Then I can wake the others up.
264  bool waker = (data_->threads_left == 0);
265 
266  while (data_->threads_left && !interrupted_ && !timeout_ && !local_timeout) {
267  //Here, the threads are waiting for the barrier
268  //pthread_cond_timedwait releases data_->mutex if it is not external
269  local_timeout = !data_->waitcond->reltimed_wait(timeout_sec, timeout_nanosec);
270  //before continuing, pthread_cond_timedwait locks data_->mutex again if it is not external
271  }
272 
273  if (local_timeout) {
274  //set timeout flag of the interruptable barrier so the other threads can continue
275  timeout_ = true;
276  }
277 
278  if (interrupted_) {
279  if (likely(data_->own_mutex))
280  data_->mutex->unlock();
281  throw InterruptedException("InterruptibleBarrier forcefully interrupted, only "
282  "%u of %u threads reached the barrier",
283  _count - data_->threads_left,
284  _count);
285  }
286 
287  if (waker) {
288  //all threads of this barrier have to synchronize at the standard Barrier
289  wait_at_barrier_ = true;
290  }
291 
292  if (waker || local_timeout) {
293  //the other threads can stop waiting in th while-loop
294  data_->waitcond->wake_all();
295  }
296 
297  if (likely(data_->own_mutex))
298  data_->mutex->unlock();
299 
300  if (wait_at_barrier_) {
301  //hard synchronization
302  Barrier::wait();
303  }
304 
305  if (likely(data_->own_mutex))
306  data_->mutex->lock();
307  //increment is not threadsafe
308  num_threads_in_wait_function_--;
309  if (likely(data_->own_mutex))
310  data_->mutex->unlock();
311 
312  return !timeout_;
313 }
314 
315 /** Checks if there are no more threads in the wait() function.
316  * This method is used to prevent the destruction of the barrier
317  * while there are threads in wait().
318  * @return true, if no thread currently is in wait()
319  */
320 bool
322 {
323  if (likely(data_->own_mutex))
324  data_->mutex->lock();
325  bool res = num_threads_in_wait_function_ == 0;
326  if (likely(data_->own_mutex))
327  data_->mutex->unlock();
328 
329  return res;
330 }
331 
332 } // end namespace fawkes
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
Definition: barrier.h:32
unsigned int _count
Number of threads that are expected to wait for the barrier.
Definition: barrier.h:47
virtual void wait()
Wait for other threads.
Definition: barrier.cpp:153
unsigned int count()
Get number of threads this barrier will wait for.
Definition: barrier.cpp:176
Base class for exceptions in Fawkes.
Definition: exception.h:36
void print_trace() noexcept
Prints trace to stderr.
Definition: exception.cpp:601
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
A barrier is a synchronization tool which blocks until a given number of threads have reached the bar...
void interrupt() noexcept
Interrupt the barrier.
virtual void wait()
Wait for other threads.
InterruptibleBarrier(unsigned int count)
Constructor.
void reset() noexcept
Clears the barrier.
virtual ~InterruptibleBarrier()
Destructor.
bool no_threads_in_wait()
Checks if there are no more threads in the wait() function.
RefPtr< ThreadList > passed_threads()
Get a list of threads that passed the barrier.
Mutex mutual exclusion lock.
Definition: mutex.h:33
RefPtr<> is a reference-counting shared smartpointer.
Definition: refptr.h:50
List of threads.
Definition: thread_list.h:56
static Thread * current_thread()
Get the Thread instance of the currently running thread.
Definition: thread.cpp:1366
Fawkes library namespace.