PahoMqttCpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
thread_queue.h
Go to the documentation of this file.
1
8
9/*******************************************************************************
10 * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
11 *
12 * All rights reserved. This program and the accompanying materials
13 * are made available under the terms of the Eclipse Public License v2.0
14 * and Eclipse Distribution License v1.0 which accompany this distribution.
15 *
16 * The Eclipse Public License is available at
17 * http://www.eclipse.org/legal/epl-v20.html
18 * and the Eclipse Distribution License is available at
19 * http://www.eclipse.org/org/documents/edl-v10.php.
20 *
21 * Contributors:
22 * Frank Pagliughi - initial implementation and documentation
23 *******************************************************************************/
24
25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
27
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <limits>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
35
namespace mqtt {

/**
 * Exception reported by a queue that has been closed.
 *
 * It is thrown by `thread_queue::put()` when the queue is closed, and by the
 * value-returning `thread_queue::get()` when the queue is closed and holds
 * no more items.
 */
class queue_closed : public std::runtime_error
{
public:
    /** Creates the exception with the message "queue is closed". */
    queue_closed() : std::runtime_error("queue is closed") {}
};

/////////////////////////////////////////////////////////////////////////////

/**
 * A bounded, mutex-protected FIFO queue for passing items between threads.
 *
 * Every public operation takes the internal mutex. Producers block (or
 * fail, or time out) while the queue holds `capacity()` items; consumers
 * block (or fail, or time out) while it is empty.
 *
 * The queue can be closed. After `close()`:
 *  - no new items are accepted (`put()` throws, the `try_put*()` calls
 *    return `false`);
 *  - items already queued can still be removed;
 *  - once the queue is empty, the blocking getters stop waiting and report
 *    failure, and `done()` returns `true`.
 *
 * @tparam T The type of the items held in the queue.
 * @tparam Container The underlying container handed to `std::queue`.
 */
template <typename T, class Container = std::deque<T>>
class thread_queue
{
public:
    /** The underlying container type. */
    using container_type = Container;
    /** The type of the items held in the queue. */
    using value_type = T;
    /** The type used for sizes and capacities. */
    using size_type = typename Container::size_type;

    /** The largest capacity that can be represented. */
    static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();

private:
    /** Guards every member below. */
    mutable std::mutex lock_;
    /** Signalled when an item is added or the queue is closed. */
    std::condition_variable notEmptyCond_;
    /** Signalled when room becomes available or the queue is closed. */
    std::condition_variable notFullCond_;
    /** Maximum number of items the queue may hold; always at least 1. */
    size_type cap_{MAX_CAPACITY};
    /** Whether the queue has been closed to new items. */
    bool closed_{false};
    /** The queued items, oldest first. */
    std::queue<T, Container> que_;

    /** Simple scoped lock. */
    using guard = std::lock_guard<std::mutex>;
    /** Lock type that can be released by a condition variable. */
    using unique_guard = std::unique_lock<std::mutex>;

    /** Closed and drained. The caller must already hold `lock_`. */
    bool is_done() const {
        return closed_ && que_.empty();
    }

public:
    /**
     * Creates a queue whose capacity is `MAX_CAPACITY`.
     */
    thread_queue() {}
    /**
     * Creates a queue limited to `cap` items.
     * @param cap The maximum number of items. Values below 1 are raised
     *            to 1.
     */
    explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
    /**
     * Determines if the queue currently holds no items.
     * @return `true` if the queue is empty, `false` otherwise.
     */
    bool empty() const {
        guard g{lock_};
        return que_.empty();
    }

    /**
     * Gets the maximum number of items the queue may hold.
     * @return The current capacity.
     */
    size_type capacity() const {
        guard g{lock_};
        return cap_;
    }

    /**
     * Changes the maximum number of items the queue may hold.
     *
     * Items already queued are never discarded, so the queue may hold more
     * than the new capacity until consumers drain it.
     *
     * @param cap The new capacity. Values below 1 are raised to 1, the same
     *            as in the constructor; a capacity of zero would leave
     *            every producer unable to make progress.
     */
    void capacity(size_type cap) {
        guard g{lock_};
        cap_ = std::max<size_type>(cap, 1);
        // Producers that blocked under the old limit only re-check their
        // predicate when signalled, so wake them if there is room now.
        if (que_.size() < cap_)
            notFullCond_.notify_all();
    }

    /**
     * Gets the number of items currently in the queue.
     * @return The number of queued items.
     */
    size_type size() const {
        guard g{lock_};
        return que_.size();
    }

    /**
     * Closes the queue to new items and wakes every waiting thread.
     *
     * Items already in the queue are kept and can still be retrieved.
     */
    void close() {
        guard g{lock_};
        closed_ = true;
        notFullCond_.notify_all();
        notEmptyCond_.notify_all();
    }

    /**
     * Determines if the queue has been closed.
     * @return `true` once `close()` has been called. Items may still be
     *         waiting in the queue.
     */
    bool closed() const {
        guard g{lock_};
        return closed_;
    }

    /**
     * Determines if the queue is closed and fully drained.
     * @return `true` if the queue is closed and empty, `false` otherwise.
     */
    bool done() const {
        guard g{lock_};
        return is_done();
    }

    /**
     * Discards every queued item and wakes all waiting producers.
     */
    void clear() {
        guard g{lock_};
        while (!que_.empty())
            que_.pop();
        notFullCond_.notify_all();
    }

    /**
     * Adds an item, blocking while the queue is full.
     * @param val The item to add.
     * @throws queue_closed if the queue is closed, whether it was already
     *         closed on entry or was closed while waiting.
     */
    void put(value_type val) {
        unique_guard g{lock_};
        notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; });
        if (closed_) throw queue_closed{};

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
    }

    /**
     * Adds an item without blocking.
     * @param val The item to add.
     * @return `true` if the item was added, `false` if the queue is full
     *         or closed.
     */
    bool try_put(value_type val) {
        guard g{lock_};
        if (que_.size() >= cap_ || closed_)
            return false;

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
        return true;
    }

    /**
     * Adds an item, waiting up to `relTime` for room to become available.
     * @param val The item to add.
     * @param relTime The longest time to wait.
     * @return `true` if the item was added, `false` if the wait timed out
     *         or the queue is closed.
     */
    template <typename Rep, class Period>
    bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
        unique_guard g{lock_};
        bool to = !notFullCond_.wait_for(
            g, relTime,
            [this] { return que_.size() < cap_ || closed_; }
        );
        if (to || closed_)
            return false;

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
        return true;
    }

    /**
     * Adds an item, waiting until `absTime` for room to become available.
     * @param val The item to add.
     * @param absTime The point in time at which to stop waiting.
     * @return `true` if the item was added, `false` if the wait timed out
     *         or the queue is closed.
     */
    template <class Clock, class Duration>
    bool try_put_until(
        value_type val, const std::chrono::time_point<Clock, Duration>& absTime
    ) {
        unique_guard g{lock_};
        bool to = !notFullCond_.wait_until(
            g, absTime,
            [this] { return que_.size() < cap_ || closed_; }
        );

        if (to || closed_)
            return false;

        que_.emplace(std::move(val));
        notEmptyCond_.notify_one();
        return true;
    }

    /**
     * Removes the oldest item, blocking while the queue is empty and open.
     * @param val Receives the item. A null pointer makes the call return
     *            `false` immediately.
     * @return `true` if an item was stored in `*val`, `false` if `val` is
     *         null or the queue is closed and empty.
     */
    bool get(value_type* val) {
        if (!val)
            return false;

        unique_guard g{lock_};
        notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
        if (que_.empty())  // Woken with nothing queued: the queue is closed
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }

    /**
     * Removes and returns the oldest item, blocking while the queue is
     * empty and open.
     * @return The item removed from the queue.
     * @throws queue_closed if the queue is closed and empty.
     */
    value_type get() {
        unique_guard g{lock_};
        notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
        if (que_.empty())  // Woken with nothing queued: the queue is closed
            throw queue_closed{};

        value_type val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return val;
    }

    /**
     * Removes the oldest item without blocking.
     * @param val Receives the item. A null pointer makes the call return
     *            `false` immediately.
     * @return `true` if an item was stored in `*val`, `false` if `val` is
     *         null or the queue is empty.
     */
    bool try_get(value_type* val) {
        if (!val)
            return false;

        guard g{lock_};
        if (que_.empty())
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }

    /**
     * Removes the oldest item, waiting up to `relTime` for one to arrive.
     * @param val Receives the item. A null pointer makes the call return
     *            `false` immediately.
     * @param relTime The longest time to wait.
     * @return `true` if an item was stored in `*val`, `false` if `val` is
     *         null or the queue was still empty when the wait ended
     *         (timeout, or the queue is closed).
     */
    template <typename Rep, class Period>
    bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
        if (!val)
            return false;

        unique_guard g{lock_};
        notEmptyCond_.wait_for(
            g, relTime,
            [this] { return !que_.empty() || closed_; }
        );

        // Timeout and closure both leave the queue empty, so one check
        // covers both.
        if (que_.empty())
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }

    /**
     * Removes the oldest item, waiting until `absTime` for one to arrive.
     * @param val Receives the item. A null pointer makes the call return
     *            `false` immediately.
     * @param absTime The point in time at which to stop waiting.
     * @return `true` if an item was stored in `*val`, `false` if `val` is
     *         null or the queue was still empty when the wait ended
     *         (timeout, or the queue is closed).
     */
    template <class Clock, class Duration>
    bool try_get_until(
        value_type* val, const std::chrono::time_point<Clock, Duration>& absTime
    ) {
        if (!val)
            return false;

        unique_guard g{lock_};
        notEmptyCond_.wait_until(
            g, absTime, [this] { return !que_.empty() || closed_; }
        );
        // Timeout and closure both leave the queue empty, so one check
        // covers both.
        if (que_.empty())
            return false;

        *val = std::move(que_.front());
        que_.pop();
        notFullCond_.notify_one();
        return true;
    }
};

/////////////////////////////////////////////////////////////////////////////
}  // namespace mqtt
413
414#endif // __mqtt_thread_queue_h
Definition thread_queue.h:43
queue_closed()
Definition thread_queue.h:45
typename Container::size_type size_type
Definition thread_queue.h:93
T value_type
Definition thread_queue.h:91
size_type size() const
Definition thread_queue.h:167
void capacity(size_type cap)
Definition thread_queue.h:159
bool done() const
Definition thread_queue.h:200
Container container_type
Definition thread_queue.h:89
bool try_put(value_type val)
Definition thread_queue.h:234
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:362
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:253
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:96
bool try_get(value_type *val)
Definition thread_queue.h:338
thread_queue()
Definition thread_queue.h:128
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:277
void clear()
Definition thread_queue.h:208
bool closed() const
Definition thread_queue.h:190
bool get(value_type *val)
Definition thread_queue.h:299
bool empty() const
Definition thread_queue.h:141
void close()
Definition thread_queue.h:177
size_type capacity() const
Definition thread_queue.h:149
thread_queue(size_t cap)
Definition thread_queue.h:135
void put(value_type val)
Definition thread_queue.h:220
value_type get()
Definition thread_queue.h:319
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:391
Definition async_client.h:60