// ©2013-2015 Cameron Desrochers.
// Distributed under the simplified BSD license (see the license file that
// should have come with this header).

#pragma once

#include "atomicops.h"
#include <type_traits>
#include <utility>
#include <cassert>
#include <stdexcept>
#include <cstdint>
#include <cstdlib> // For malloc/free & size_t


// A lock-free queue for a single-consumer, single-producer architecture.
// The queue is also wait-free in the common path (except if more memory
// needs to be allocated, in which case malloc is called).
// Allocates memory sparingly (O(lg(n)) times, amortized), and only once if
// the original maximum size estimate is never exceeded.
// Tested on x86/x64 processors, but semantics should be correct for all
// architectures (given the right implementations in atomicops.h), provided
// that aligned integer and pointer accesses are naturally atomic.
// Note that there should only be one consumer thread and one producer thread;
// switching roles of the threads, or using multiple consecutive threads for
// one role, is not safe unless properly synchronized.
// Using the queue exclusively from one thread is fine, though a bit silly.

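// Illustrative usage sketch (not part of the API below; `q` and the element
// type are arbitrary). One thread produces while another consumes:
//
//     moodycamel::ReaderWriterQueue<int> q(100); // holds at least 100 ints before allocating
//
//     // Producer thread:
//     q.enqueue(17);     // allocates a new block if the queue is full
//     q.try_enqueue(18); // fails instead of allocating
//
//     // Consumer thread:
//     int item;
//     if (q.try_dequeue(item)) { /* use item */ }
//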
#define CACHE_LINE_SIZE 64

#ifdef AE_VCPP
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#pragma warning(disable: 4820) // padding was added
#pragma warning(disable: 4127) // conditional expression is constant
#endif

namespace moodycamel {

template<typename T, size_t MAX_BLOCK_SIZE = 512>
class ReaderWriterQueue
{
    // Design: Based on a queue-of-queues. The low-level queues are just
    // circular buffers with front and tail indices indicating where the
    // next element to dequeue is and where the next element can be enqueued,
    // respectively. Each low-level queue is called a "block". Each block
    // wastes exactly one element's worth of space to keep the design simple
    // (if front == tail then the queue is empty, and can't be full).
    // The high-level queue is a circular linked list of blocks; again there
    // is a front and tail, but this time they are pointers to the blocks.
    // The front block is where the next element to be dequeued is, provided
    // the block is not empty. The back block is where elements are to be
    // enqueued, provided the block is not full.
    // The producer thread owns all the tail indices/pointers. The consumer
    // thread owns all the front indices/pointers. Both threads read each
    // other's variables, but only the owning thread updates them. E.g. after
    // the consumer reads the producer's tail, the tail may change before the
    // consumer is done dequeuing an object, but the consumer knows the tail
    // will never go backwards, only forwards.
    // If there is no room to enqueue an object, an additional block (of
    // equal size to the last block) is added. Blocks are never removed.

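    // For illustration, an assumed 4-slot block (indices wrap via sizeMask):
    //
    //     index:   0    1    2    3
    //     slots: [ A ][ B ][   ][   ]      front == 0 (next dequeue reads A)
    //                                      tail  == 2 (next enqueue writes slot 2)
    //     front == tail                    -> block is empty
    //     ((tail + 1) & sizeMask) == front -> block is full (one slot always unused)
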
public:
    // Constructs a queue that can hold maxSize elements without further
    // allocations. If more than MAX_BLOCK_SIZE elements are requested,
    // then several blocks of MAX_BLOCK_SIZE each are reserved (including
    // at least one extra buffer block).
    explicit ReaderWriterQueue(size_t maxSize = 15)
#ifndef NDEBUG
        : enqueuing(false)
        , dequeuing(false)
#endif
    {
        assert(maxSize > 0);
        assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
        assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");

        Block* firstBlock = nullptr;

        largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
        if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
            // We need a spare block in case the producer is writing to a different block
            // than the one the consumer is reading from, and wants to enqueue the maximum
            // number of elements. We also need a spare element in each block to avoid the
            // ambiguity between front == tail meaning "empty" and "full".
            // So the effective number of slots that are guaranteed to be usable at any
            // time is the block size - 1 times the number of blocks - 1. Solving for
            // maxSize and applying a ceiling to the division gives us (after simplifying):
            size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
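            // Worked example (assumed values): MAX_BLOCK_SIZE == 512 and maxSize == 1200 give
            // initialBlockCount == (1200 + 1024 - 3) / 511 == 4, and those 4 blocks guarantee
            // (4 - 1) * (512 - 1) == 1533 usable slots, comfortably covering 1200.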
            largestBlockSize = MAX_BLOCK_SIZE;
            Block* lastBlock = nullptr;
            for (size_t i = 0; i != initialBlockCount; ++i) {
                auto block = make_block(largestBlockSize);
                if (block == nullptr) {
                    throw std::bad_alloc();
                }
                if (firstBlock == nullptr) {
                    firstBlock = block;
                }
                else {
                    lastBlock->next = block;
                }
                lastBlock = block;
                block->next = firstBlock;
            }
        }
        else {
            firstBlock = make_block(largestBlockSize);
            if (firstBlock == nullptr) {
                throw std::bad_alloc();
            }
            firstBlock->next = firstBlock;
        }
        frontBlock = firstBlock;
        tailBlock = firstBlock;

        // Make sure the reader/writer threads will have the initialized memory setup above:
        fence(memory_order_sync);
    }

    // Note: The queue should not be accessed concurrently while it's
    // being deleted. It's up to the user to synchronize this.
    ~ReaderWriterQueue()
    {
        // Make sure we get the latest version of all variables from other CPUs:
        fence(memory_order_sync);

        // Destroy any remaining objects in queue and free memory
        Block* frontBlock_ = frontBlock;
        Block* block = frontBlock_;
        do {
            Block* nextBlock = block->next;
            size_t blockFront = block->front;
            size_t blockTail = block->tail;

            for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
                auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
                element->~T();
                (void)element;
            }

            auto rawBlock = block->rawThis;
            block->~Block();
            std::free(rawBlock);
            block = nextBlock;
        } while (block != frontBlock_);
    }


    // Enqueues a copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T const& element)
    {
        return inner_enqueue<CannotAlloc>(element);
    }

    // Enqueues a moved copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T&& element)
    {
        return inner_enqueue<CannotAlloc>(std::forward<T>(element));
    }


    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T const& element)
    {
        return inner_enqueue<CanAlloc>(element);
    }

    // Enqueues a moved copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T&& element)
    {
        return inner_enqueue<CanAlloc>(std::forward<T>(element));
    }


    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    template<typename U>
    bool try_dequeue(U& result)
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif

        // High-level pseudocode:
        // Remember where the tail block is
        // If the front block has an element in it, dequeue it
        // Else
        //     If front block was the tail block when we entered the function, return false
        //     Else advance to next block and dequeue the item there

        // Note that we have to use the value of the tail block from before we check if the front
        // block is full or not, in case the front block is empty and then, before we check if the
        // tail block is at the front block or not, the producer fills up the front block *and
        // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
        // reproducible in practice.
        // In order to avoid overhead in the common case, though, we do a double-checked pattern
        // where we have the fast path if the front block is not empty, then read the tail block,
        // then re-read the front block and check if it's not empty again, then check if the tail
        // block has advanced.

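        // A concrete interleaving of the race just described (illustrative):
        //   1. Consumer observes the front block as empty.
        //   2. Producer fills the front block and advances tailBlock to the next block.
        //   3. Had the consumer now compared against the *new* tailBlock, it would see
        //      frontBlock != tailBlock and advance past the block whose elements were
        //      just enqueued. Capturing the tail before the emptiness check avoids this.
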
        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);

        non_empty_front_block:
            // Front block not empty, dequeue from here
            auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
            result = std::move(*element);
            element->~T();

            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);

            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                // Oh look, the front block isn't empty after all
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;
            // Don't need an acquire fence here since next can only ever be set on the tailBlock,
            // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
            // ensures next is up-to-date on this CPU in case we recently were at tailBlock.

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            // Since the tailBlock is only ever advanced after being written to,
            // we know there's for sure an element to dequeue on it
            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            // We're done with this block, let the producer use it if it needs
            fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release); // Not strictly needed

            auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));

            result = std::move(*element);
            element->~T();

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }


    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    T* peek()
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);
        non_empty_front_block:
            return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                goto non_empty_front_block;
            }

            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlock->tail.load());
            return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
        }

        return nullptr;
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    bool pop()
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);

        non_empty_front_block:
            auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
            element->~T();

            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            fence(memory_order_release);
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release); // Not strictly needed

            auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
            element->~T();

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    inline size_t size_approx() const
    {
        size_t result = 0;
        Block* frontBlock_ = frontBlock.load();
        Block* block = frontBlock_;
        do {
            fence(memory_order_acquire);
            size_t blockFront = block->front.load();
            size_t blockTail = block->tail.load();
            result += (blockTail - blockFront) & block->sizeMask;
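            // The masked subtraction handles wraparound: e.g. in an 8-slot block
            // (sizeMask == 7), front == 6 and tail == 2 gives (2 - 6) & 7 == 4
            // elements (occupying slots 6, 7, 0 and 1).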
            block = block->next.load();
        } while (block != frontBlock_);
        return result;
    }


private:
    enum AllocationMode { CanAlloc, CannotAlloc };

    template<AllocationMode canAlloc, typename U>
    bool inner_enqueue(U&& element)
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->enqueuing);
#endif

        // High-level pseudocode (assuming we're allowed to alloc a new block):
        // If room in tail block, add to tail
        // Else check next block
        //     If next block is not the head block, enqueue on next block
        //     Else create a new block and enqueue there
        // Advance tail to the block we just enqueued to

        Block* tailBlock_ = tailBlock.load();
        size_t blockFront = tailBlock_->localFront;
        size_t blockTail = tailBlock_->tail.load();

        size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
        if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
            fence(memory_order_acquire);
            // This block has room for at least one more element
            char* location = tailBlock_->data + blockTail * sizeof(T);
            new (location) T(std::forward<U>(element));

            fence(memory_order_release);
            tailBlock_->tail = nextBlockTail;
        }
        else {
            fence(memory_order_acquire);
            if (tailBlock_->next.load() != frontBlock) {
                // Note that the reason we can't advance to the frontBlock and start adding new entries there
                // is because if we did, then dequeue would stay in that block, eventually reading the new values,
                // instead of advancing to the next full block (whose values were enqueued first and so should be
                // consumed first).

                fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock

                // tailBlock is full, but there's a free block ahead, use it
                Block* tailBlockNext = tailBlock_->next.load();
                size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
                nextBlockTail = tailBlockNext->tail.load();
                fence(memory_order_acquire);

                // This block must be empty since it's not the head block and we
                // go through the blocks in a circle
                assert(nextBlockFront == nextBlockTail);
                tailBlockNext->localFront = nextBlockFront;

                char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
                new (location) T(std::forward<U>(element));

                tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;

                fence(memory_order_release);
                tailBlock = tailBlockNext;
            }
            else if (canAlloc == CanAlloc) {
                // tailBlock is full and there's no free block ahead; create a new block
                auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
                auto newBlock = make_block(newBlockSize);
                if (newBlock == nullptr) {
                    // Could not allocate a block!
                    return false;
                }
                largestBlockSize = newBlockSize;

                new (newBlock->data) T(std::forward<U>(element));

                assert(newBlock->front == 0);
                newBlock->tail = newBlock->localTail = 1;

                newBlock->next = tailBlock_->next.load();
                tailBlock_->next = newBlock;

                // Might be possible for the dequeue thread to see the new tailBlock->next
                // *without* seeing the new tailBlock value, but this is OK since it can't
                // advance to the next block until tailBlock is set anyway (because the only
                // case where it could try to read the next is if it's already at the tailBlock,
                // and it won't advance past tailBlock in any circumstance).

                fence(memory_order_release);
                tailBlock = newBlock;
            }
            else if (canAlloc == CannotAlloc) {
                // Would have had to allocate a new block to enqueue, but not allowed
                return false;
            }
            else {
                assert(false && "Should be unreachable code");
                return false;
            }
        }

        return true;
    }


    // Disable copying
    ReaderWriterQueue(ReaderWriterQueue const&) { }

    // Disable assignment
    ReaderWriterQueue& operator=(ReaderWriterQueue const&) { }


    AE_FORCEINLINE static size_t ceilToPow2(size_t x)
    {
        // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
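        // Sample values for reference: ceilToPow2(15) == 16, ceilToPow2(16) == 16,
        // ceilToPow2(17) == 32.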
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }

    // Rounds ptr up to the next multiple of U's alignment (a no-op if already aligned)
    template<typename U>
    static AE_FORCEINLINE char* align_for(char* ptr)
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }
private:
#ifndef NDEBUG
    struct ReentrantGuard
    {
        ReentrantGuard(bool& _inSection)
            : inSection(_inSection)
        {
            assert(!inSection);
            if (inSection) {
                throw std::runtime_error("ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
            }

            inSection = true;
        }

        ~ReentrantGuard() { inSection = false; }

    private:
        ReentrantGuard& operator=(ReentrantGuard const&);

    private:
        bool& inSection;
    };
#endif

    struct Block
    {
        // Avoid false-sharing by putting highly contended variables on their own cache lines
        weak_atomic<size_t> front; // (Atomic) Elements are read from here
        size_t localTail; // An uncontended shadow copy of tail, owned by the consumer

        char cachelineFiller0[CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
        weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
        size_t localFront; // An uncontended shadow copy of front, owned by the producer

        char cachelineFiller1[CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is)
        weak_atomic<Block*> next; // (Atomic)

        char* data; // Contents (on heap) are aligned to T's alignment

        const size_t sizeMask;


        // size must be a power of two (and greater than 0)
        Block(size_t const& _size, char* _rawThis, char* _data)
            : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
        {
        }

    private:
        // C4512 - Assignment operator could not be generated
        Block& operator=(Block const&);

    public:
        char* rawThis;
    };


    static Block* make_block(size_t capacity)
    {
        // Allocate enough memory for the block itself, as well as all the elements it will contain
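        // Resulting layout of the single allocation (illustrative):
        //
        //     newBlockRaw
        //     v
        //     [ pad | Block header (aligned for Block) | pad | element storage (aligned for T) ]
        //            ^ newBlockAligned                        ^ newBlockData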
        auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
        size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
        auto newBlockRaw = static_cast<char*>(std::malloc(size));
        if (newBlockRaw == nullptr) {
            return nullptr;
        }

        auto newBlockAligned = align_for<Block>(newBlockRaw);
        auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
        return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
    }

private:
    weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block

    char cachelineFiller[CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
    weak_atomic<Block*> tailBlock; // (Atomic) Elements are enqueued to this block

    size_t largestBlockSize;

#ifndef NDEBUG
    bool enqueuing;
    bool dequeuing;
#endif
};

// Like ReaderWriterQueue, but also provides blocking operations
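// Illustrative usage sketch (names are arbitrary); the consumer can block
// instead of spinning when the queue is empty:
//
//     moodycamel::BlockingReaderWriterQueue<int> q;
//
//     // Producer thread:
//     q.enqueue(17);        // signals the semaphore after a successful enqueue
//
//     // Consumer thread:
//     int item;
//     q.wait_dequeue(item); // sleeps on the semaphore until an element arrives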
template<typename T, size_t MAX_BLOCK_SIZE = 512>
class BlockingReaderWriterQueue
{
private:
    typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;

public:
    explicit BlockingReaderWriterQueue(size_t maxSize = 15)
        : inner(maxSize)
    { }


    // Enqueues a copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T const& element)
    {
        if (inner.try_enqueue(element)) {
            sema.signal();
            return true;
        }
        return false;
    }

    // Enqueues a moved copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T&& element)
    {
        if (inner.try_enqueue(std::forward<T>(element))) {
            sema.signal();
            return true;
        }
        return false;
    }


    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T const& element)
    {
        if (inner.enqueue(element)) {
            sema.signal();
            return true;
        }
        return false;
    }

    // Enqueues a moved copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T&& element)
    {
        if (inner.enqueue(std::forward<T>(element))) {
            sema.signal();
            return true;
        }
        return false;
    }


    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    template<typename U>
    bool try_dequeue(U& result)
    {
        if (sema.tryWait()) {
            bool success = inner.try_dequeue(result);
            assert(success);
            AE_UNUSED(success);
            return true;
        }
        return false;
    }


    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available, then dequeues it.
    template<typename U>
    void wait_dequeue(U& result)
    {
        sema.wait();
        bool success = inner.try_dequeue(result);
        AE_UNUSED(result);
        assert(success);
        AE_UNUSED(success);
    }


    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    AE_FORCEINLINE T* peek()
    {
        return inner.peek();
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    AE_FORCEINLINE bool pop()
    {
        if (sema.tryWait()) {
            bool result = inner.pop();
            assert(result);
            AE_UNUSED(result);
            return true;
        }
        return false;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    AE_FORCEINLINE size_t size_approx() const
    {
        return sema.availableApprox();
    }


private:
    // Disable copying & assignment
    BlockingReaderWriterQueue(ReaderWriterQueue const&) { }
    BlockingReaderWriterQueue& operator=(ReaderWriterQueue const&) { }

private:
    ReaderWriterQueue inner;
    spsc_sema::LightweightSemaphore sema;
};

} // end namespace moodycamel

#ifdef AE_VCPP
#pragma warning(pop)
#endif