/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>
#include <chrono>
#include <memory>

#include <glog/logging.h>

#include <folly/ConstexprMath.h>
#include <folly/Optional.h>
#include <folly/Traits.h>
#include <folly/concurrency/CacheLocality.h>
#include <folly/lang/Align.h>
#include <folly/synchronization/Hazptr.h>
#include <folly/synchronization/SaturatingSemaphore.h>
#include <folly/synchronization/WaitOptions.h>
#include <folly/synchronization/detail/Spin.h>

namespace folly {

/// UnboundedQueue supports a variety of options for unbounded,
/// dynamically expanding and shrinking queues, including variations of:
/// - Single vs. multiple producers
/// - Single vs. multiple consumers
/// - Blocking vs. spin-waiting
/// - Non-waiting, timed, and waiting consumer operations.
/// Producer operations never wait or fail (unless out-of-memory).
///
/// Template parameters:
/// - T: element type
/// - SingleProducer: true if there can be only one producer at a
///   time.
/// - SingleConsumer: true if there can be only one consumer at a
///   time.
/// - MayBlock: true if consumers may block, false if they only
///   spin. A performance tuning parameter.
/// - LgSegmentSize (default 8): Log base 2 of number of elements per
///   segment. A performance tuning parameter. See below.
/// - LgAlign (default 7): Log base 2 of alignment directive; can be
///   used to balance scalability (avoidance of false sharing) with
///   memory efficiency.
///
/// When to use UnboundedQueue:
/// - If a small bound may lead to deadlock or performance degradation
///   under bursty patterns.
/// - If there is no risk of the queue growing too much.
///
/// When not to use UnboundedQueue:
/// - If there is risk of the queue growing too much and a large bound
///   is acceptable, then use DynamicBoundedQueue.
/// - If the queue must not allocate on enqueue or it must have a
///   small bound, then use fixed-size MPMCQueue or (if non-blocking
///   SPSC) ProducerConsumerQueue.
///
/// Template Aliases:
///   USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///   UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///   USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///   UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign>
///
/// Functions:
///   Producer operations never wait or fail (unless OOM):
///     void enqueue(const T&);
///     void enqueue(T&&);
///       Adds an element to the end of the queue.
///
///   Consumer operations:
///     void dequeue(T&);
///     T dequeue();
///       Extracts an element from the front of the queue. Waits
///       until an element is available if needed.
///     bool try_dequeue(T&);
///     folly::Optional<T> try_dequeue();
///       Tries to extract an element from the front of the queue
///       if available.
///     bool try_dequeue_until(T&, time_point& deadline);
///     folly::Optional<T> try_dequeue_until(time_point& deadline);
///       Tries to extract an element from the front of the queue
///       if available until the specified deadline.
///     bool try_dequeue_for(T&, duration&);
///     folly::Optional<T> try_dequeue_for(duration&);
///       Tries to extract an element from the front of the queue if
///       available until the expiration of the specified duration.
///     const T* try_peek();
///       Returns a pointer to the element at the front of the queue
///       if available, or nullptr if the queue is empty. Only for
///       SPSC and MPSC.
///
/// Secondary functions:
///   size_t size();
///     Returns an estimate of the size of the queue.
///   bool empty();
///     Returns true only if the queue was empty during the call.
///   Note: size() and empty() are guaranteed to be accurate only if
///   the queue is not changed concurrently.
///
/// Usage examples:
/// @code
///   /* UMPSC, doesn't block, 1024 int elements per segment */
///   UMPSCQueue<int, false, 10> q;
///   q.enqueue(1);
///   q.enqueue(2);
///   q.enqueue(3);
///   ASSERT_FALSE(q.empty());
///   ASSERT_EQ(q.size(), 3);
///   int v;
///   q.dequeue(v);
///   ASSERT_EQ(v, 1);
///   ASSERT_TRUE(q.try_dequeue(v));
///   ASSERT_EQ(v, 2);
///   ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
///   ASSERT_EQ(v, 3);
///   ASSERT_TRUE(q.empty());
///   ASSERT_EQ(q.size(), 0);
///   ASSERT_FALSE(q.try_dequeue(v));
///   ASSERT_FALSE(q.try_dequeue_for(v, microseconds(100)));
/// @endcode
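///
/// A minimal two-thread sketch (an illustration only, assuming <thread>
/// is available; with MayBlock=true the consumer's dequeue() blocks
/// until an element arrives):
/// @code
///   /* UMPMC, blocking consumers, 256 int elements per segment */
///   UMPMCQueue<int, true> q;
///   std::thread producer([&] {
///     for (int i = 0; i < 1000; ++i) {
///       q.enqueue(i); /* never waits or fails (unless OOM) */
///     }
///   });
///   std::thread consumer([&] {
///     for (int i = 0; i < 1000; ++i) {
///       int v;
///       q.dequeue(v); /* blocks until an element is available */
///       DCHECK_EQ(v, i); /* one producer and one consumer, so FIFO holds */
///     }
///   });
///   producer.join();
///   consumer.join();
/// @endcode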
///
/// Design:
/// - The queue is composed of one or more segments. Each segment has
///   a fixed size of 2^LgSegmentSize entries. Each segment is used
///   exactly once.
/// - Each entry is composed of a futex and a single element.
/// - The queue contains two 64-bit ticket variables. The producer
///   ticket counts the number of producer tickets issued so far, and
///   the same for the consumer ticket. Each ticket number corresponds
///   to a specific entry in a specific segment (see the mapping
///   sketch after this list).
/// - The queue maintains two pointers, head and tail. Head points to
///   the segment that corresponds to the current consumer
///   ticket. Similarly, the tail pointer points to the segment that
///   corresponds to the producer ticket.
/// - Segments are organized as a singly linked list.
/// - The producer with the first ticket in the current producer
///   segment has primary responsibility for allocating and linking
///   the next segment. Other producers and consumers may help do so
///   when needed if that thread is delayed.
/// - The producer with the last ticket in the current producer
///   segment is primarily responsible for advancing the tail pointer
///   to the next segment. Other producers and consumers may help do
///   so when needed if that thread is delayed.
/// - Similarly, the consumer with the last ticket in the current
///   consumer segment is primarily responsible for advancing the head
///   pointer to the next segment. Other consumers may help do so when
///   needed if that thread is delayed.
/// - The tail pointer must not lag behind the head pointer.
///   Otherwise, the algorithm cannot be certain about the removal of
///   segments and would have to incur higher costs to ensure safe
///   reclamation. Consumers must ensure that head never overtakes
///   tail.
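///
/// For illustration, the ticket-to-entry mapping (mirroring index() in
/// the implementation below): a segment holds the SegmentSize consecutive
/// tickets starting at its minTicket(), and within a segment the entry
/// slot is computed with an odd stride, which spreads consecutive tickets
/// across the segment's entries to reduce false sharing:
/// @code
///   /* e.g., LgSegmentSize = 8, so SegmentSize = 256 and Stride = 27 */
///   size_t idx = (t * Stride) & (SegmentSize - 1);
///   /* tickets 0, 1, 2 land in entries 0, 27, 54 of segment 0; Stride
///      is odd, hence coprime with SegmentSize, so the map is a bijection
///      over each segment's tickets; ticket 256 starts segment 1 */
/// @endcode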
///
/// Memory Usage:
/// - An empty queue contains one segment. A nonempty queue contains
///   one or two segments more than needed to fit its contents.
/// - Removed segments are not reclaimed until there are no threads,
///   producers or consumers, with references to them or their
///   predecessors. That is, a lagging thread may delay the reclamation
///   of a chain of removed segments.
/// - The template parameter LgAlign can be used to reduce memory usage
///   at the cost of increased chance of false sharing.
///
/// Performance considerations:
/// - All operations take constant time, excluding the costs of
///   allocation, reclamation, interference from other threads, and
///   waiting for actions by other threads.
/// - In general, using the single-producer and/or single-consumer
///   variants yields better performance than the MP and MC
///   alternatives.
/// - SPSC without blocking is the fastest configuration. It doesn't
///   include any read-modify-write atomic operations, full fences, or
///   system calls in the critical path.
/// - MP adds a fetch_add to the critical path of each producer operation.
/// - MC adds a fetch_add or compare_exchange to the critical path of
///   each consumer operation.
/// - The possibility of consumers blocking, even if they never do,
///   adds a compare_exchange to the critical path of each producer
///   operation.
/// - MPMC, SPMC, MPSC require the use of a deferred reclamation
///   mechanism to guarantee that segments removed from the linked
///   list, i.e., unreachable from the head pointer, are reclaimed
///   only after they are no longer needed by any lagging producers or
///   consumers.
/// - The overheads of segment allocation and reclamation are intended
///   to be mostly out of the critical path of the queue's throughput.
/// - If the template parameter LgSegmentSize is changed, it should be
///   set adequately high to keep the amortized cost of allocation and
///   reclamation low.
/// - It is recommended to measure performance with different variants
///   when applicable, e.g., UMPMC vs UMPSC, as shown in the sketch
///   below. Depending on the use case, sometimes the variant with the
///   higher sequential overhead may yield better results due to, for
///   example, more favorable producer-consumer balance or favorable
///   timing for avoiding costly blocking.
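///
/// A rough measurement sketch (an illustration only; assumes <thread>
/// and <chrono> are available; the queue variant, element type, and
/// iteration count are arbitrary choices):
/// @code
///   template <typename Q>
///   std::chrono::nanoseconds timedEnqueueDequeue(Q& q, int n) {
///     auto start = std::chrono::steady_clock::now();
///     std::thread producer([&] {
///       for (int i = 0; i < n; ++i) {
///         q.enqueue(i);
///       }
///     });
///     int v;
///     for (int i = 0; i < n; ++i) {
///       q.dequeue(v); /* run the consumer loop on this thread */
///     }
///     producer.join();
///     return std::chrono::duration_cast<std::chrono::nanoseconds>(
///         std::chrono::steady_clock::now() - start);
///   }
///   /* Compare, e.g., UMPSCQueue<int, true> vs UMPMCQueue<int, true>
///      on the same workload before committing to a variant. */
/// @endcode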

template <
    typename T,
    bool SingleProducer,
    bool SingleConsumer,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
class UnboundedQueue {
  using Ticket = uint64_t;
  class Entry;
  class Segment;

  static constexpr bool SPSC = SingleProducer && SingleConsumer;
  // An odd Stride spreads consecutive tickets across a segment's entries,
  // reducing false sharing; SPSC and tiny segments use the natural order.
  static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27;
  static constexpr size_t SegmentSize = 1u << LgSegmentSize;
  static constexpr size_t Align = 1u << LgAlign;

  static_assert(
      std::is_nothrow_destructible<T>::value,
      "T must be nothrow_destructible");
  static_assert((Stride & 1) == 1, "Stride must be odd");
  static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32");
  static_assert(LgAlign < 16, "LgAlign must be < 16");

  using Sem = folly::SaturatingSemaphore<MayBlock, Atom>;

  struct Consumer {
    Atom<Segment*> head;
    Atom<Ticket> ticket;
    hazptr_obj_cohort<Atom> cohort;
    explicit Consumer(Segment* s) : head(s), ticket(0) {
      s->set_cohort_no_tag(&cohort); // defined in hazptr_obj
    }
  };
  struct Producer {
    Atom<Segment*> tail;
    Atom<Ticket> ticket;
    explicit Producer(Segment* s) : tail(s), ticket(0) {}
  };

  alignas(Align) Consumer c_;
  alignas(Align) Producer p_;

 public:
  /** constructor */
  UnboundedQueue()
      : c_(new Segment(0)), p_(c_.head.load(std::memory_order_relaxed)) {}

  /** destructor */
  ~UnboundedQueue() {
    cleanUpRemainingItems();
    reclaimRemainingSegments();
  }

  /** enqueue */
  FOLLY_ALWAYS_INLINE void enqueue(const T& arg) {
    enqueueImpl(arg);
  }

  FOLLY_ALWAYS_INLINE void enqueue(T&& arg) {
    enqueueImpl(std::move(arg));
  }

  /** dequeue */
  FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept {
    item = dequeueImpl();
  }

  FOLLY_ALWAYS_INLINE T dequeue() noexcept {
    return dequeueImpl();
  }

  /** try_dequeue */
  FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept {
    auto o = try_dequeue();
    if (LIKELY(o.has_value())) {
      item = std::move(*o);
      return true;
    }
    return false;
  }

  FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue() noexcept {
    return tryDequeueUntil(std::chrono::steady_clock::time_point::min());
  }

  /** try_dequeue_until */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE bool try_dequeue_until(
      T& item,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    folly::Optional<T> o = try_dequeue_until(deadline);

    if (LIKELY(o.has_value())) {
      item = std::move(*o);
      return true;
    }

    return false;
  }

  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_until(
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    return tryDequeueUntil(deadline);
  }

  /** try_dequeue_for */
  template <typename Rep, typename Period>
  FOLLY_ALWAYS_INLINE bool try_dequeue_for(
      T& item,
      const std::chrono::duration<Rep, Period>& duration) noexcept {
    folly::Optional<T> o = try_dequeue_for(duration);

    if (LIKELY(o.has_value())) {
      item = std::move(*o);
      return true;
    }

    return false;
  }

  template <typename Rep, typename Period>
  FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_for(
      const std::chrono::duration<Rep, Period>& duration) noexcept {
    folly::Optional<T> o = try_dequeue();
    if (LIKELY(o.has_value())) {
      return o;
    }
    return tryDequeueUntil(std::chrono::steady_clock::now() + duration);
  }

  /** try_peek */
  FOLLY_ALWAYS_INLINE const T* try_peek() noexcept {
    /* This function is supported only for USPSC and UMPSC queues. */
    DCHECK(SingleConsumer);
    return tryPeekUntil(std::chrono::steady_clock::time_point::min());
  }

  /** size */
  size_t size() const noexcept {
    auto p = producerTicket();
    auto c = consumerTicket();
    return p > c ? p - c : 0;
  }

  /** empty */
  bool empty() const noexcept {
    auto c = consumerTicket();
    auto p = producerTicket();
    return p <= c;
  }

 private:
  /** enqueueImpl */
  template <typename Arg>
  FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) {
    if (SPSC) {
      Segment* s = tail();
      enqueueCommon(s, std::forward<Arg>(arg));
    } else {
      // Using hazptr_holder instead of hazptr_local because it is
      // possible that the T ctor happens to use hazard pointers.
      hazptr_holder<Atom> hptr;
      Segment* s = hptr.get_protected(p_.tail);
      enqueueCommon(s, std::forward<Arg>(arg));
    }
  }

  /** enqueueCommon */
  template <typename Arg>
  FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) {
    Ticket t = fetchIncrementProducerTicket();
    if (!SingleProducer) {
      s = findSegment(s, t);
    }
    DCHECK_GE(t, s->minTicket());
    DCHECK_LT(t, s->minTicket() + SegmentSize);
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    e.putItem(std::forward<Arg>(arg));
    if (responsibleForAlloc(t)) {
      allocNextSegment(s);
    }
    if (responsibleForAdvance(t)) {
      advanceTail(s);
    }
  }

  /** dequeueImpl */
  FOLLY_ALWAYS_INLINE T dequeueImpl() noexcept {
    if (SPSC) {
      Segment* s = head();
      return dequeueCommon(s);
    } else {
      // Using hazptr_holder instead of hazptr_local because it is
      // possible to call the T dtor and it may happen to use hazard
      // pointers.
      hazptr_holder<Atom> hptr;
      Segment* s = hptr.get_protected(c_.head);
      return dequeueCommon(s);
    }
  }

  /** dequeueCommon */
  FOLLY_ALWAYS_INLINE T dequeueCommon(Segment* s) noexcept {
    Ticket t = fetchIncrementConsumerTicket();
    if (!SingleConsumer) {
      s = findSegment(s, t);
    }
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    auto res = e.takeItem();
    if (responsibleForAdvance(t)) {
      advanceHead(s);
    }
    return res;
  }

  /** tryDequeueUntil */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntil(
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    if (SingleConsumer) {
      Segment* s = head();
      return tryDequeueUntilSC(s, deadline);
    } else {
      // Using hazptr_holder instead of hazptr_local because it is
      // possible to call ~T() and it may happen to use hazard pointers.
      hazptr_holder<Atom> hptr;
      Segment* s = hptr.get_protected(c_.head);
      return tryDequeueUntilMC(s, deadline);
    }
  }

  /** tryDequeueUntilSC */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilSC(
      Segment* s,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    Ticket t = consumerTicket();
    DCHECK_GE(t, s->minTicket());
    DCHECK_LT(t, (s->minTicket() + SegmentSize));
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
      return folly::Optional<T>();
    }
    setConsumerTicket(t + 1);
    folly::Optional<T> ret = e.takeItem();
    if (responsibleForAdvance(t)) {
      advanceHead(s);
    }
    return ret;
  }

  /** tryDequeueUntilMC */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilMC(
      Segment* s,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    while (true) {
      Ticket t = consumerTicket();
      if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
        s = getAllocNextSegment(s, t);
        DCHECK(s);
        continue;
      }
      size_t idx = index(t);
      Entry& e = s->entry(idx);
      if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
        return folly::Optional<T>();
      }
      if (!c_.ticket.compare_exchange_weak(
              t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) {
        continue;
      }
      folly::Optional<T> ret = e.takeItem();
      if (responsibleForAdvance(t)) {
        advanceHead(s);
      }
      return ret;
    }
  }

  /** tryDequeueWaitElem */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem(
      Entry& e,
      Ticket t,
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    if (LIKELY(e.tryWaitUntil(deadline))) {
      return true;
    }
    // The deadline passed, but if a producer has already claimed ticket t,
    // the element is in flight; report it available and let takeItem()
    // complete the wait.
    return t < producerTicket();
  }

  /** tryPeekUntil */
  template <typename Clock, typename Duration>
  FOLLY_ALWAYS_INLINE const T* tryPeekUntil(
      const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
    Segment* s = head();
    Ticket t = consumerTicket();
    DCHECK_GE(t, s->minTicket());
    DCHECK_LT(t, (s->minTicket() + SegmentSize));
    size_t idx = index(t);
    Entry& e = s->entry(idx);
    if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) {
      return nullptr;
    }
    return e.peekItem();
  }

  /** findSegment */
  FOLLY_ALWAYS_INLINE
  Segment* findSegment(Segment* s, const Ticket t) noexcept {
    while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) {
      s = getAllocNextSegment(s, t);
      DCHECK(s);
    }
    return s;
  }

  /** getAllocNextSegment */
  Segment* getAllocNextSegment(Segment* s, Ticket t) noexcept {
    Segment* next = s->nextSegment();
    if (!next) {
      DCHECK_GE(t, s->minTicket() + SegmentSize);
      auto diff = t - (s->minTicket() + SegmentSize);
      if (diff > 0) {
        // This ticket is well past the end of s, so another thread is
        // likely linking the next segment already. Spin briefly, in
        // proportion to the distance, before allocating it ourselves.
        auto dur = std::chrono::microseconds(diff);
        auto deadline = std::chrono::steady_clock::now() + dur;
        WaitOptions opt;
        opt.spin_max(dur);
        detail::spin_pause_until(
            deadline, opt, [s] { return s->nextSegment(); });
        next = s->nextSegment();
        if (next) {
          return next;
        }
      }
      next = allocNextSegment(s);
    }
    DCHECK(next);
    return next;
  }

  /** allocNextSegment */
  Segment* allocNextSegment(Segment* s) {
    auto t = s->minTicket() + SegmentSize;
    Segment* next = new Segment(t);
    next->set_cohort_no_tag(&c_.cohort); // defined in hazptr_obj
    next->acquire_ref_safe(); // defined in hazptr_obj_base_linked
    if (!s->casNextSegment(next)) {
      delete next;
      next = s->nextSegment();
    }
    DCHECK(next);
    return next;
  }

  /** advanceTail */
  void advanceTail(Segment* s) noexcept {
    if (SPSC) {
      Segment* next = s->nextSegment();
      DCHECK(next);
      setTail(next);
    } else {
      Ticket t = s->minTicket() + SegmentSize;
      advanceTailToTicket(t);
    }
  }

  /** advanceTailToTicket */
  void advanceTailToTicket(Ticket t) noexcept {
    Segment* s = tail();
    while (s->minTicket() < t) {
      Segment* next = s->nextSegment();
      if (!next) {
        next = allocNextSegment(s);
      }
      DCHECK(next);
      casTail(s, next);
      s = tail();
    }
  }

  /** advanceHead */
  void advanceHead(Segment* s) noexcept {
    if (SPSC) {
      while (tail() == s) {
        /* Wait for producer to advance tail. */
        asm_volatile_pause();
      }
      Segment* next = s->nextSegment();
      DCHECK(next);
      setHead(next);
      reclaimSegment(s);
    } else {
      Ticket t = s->minTicket() + SegmentSize;
      advanceHeadToTicket(t);
    }
  }

  /** advanceHeadToTicket */
  void advanceHeadToTicket(Ticket t) noexcept {
    /* Tail must not lag behind head. Otherwise, the algorithm cannot
       be certain about removal of segments. */
    advanceTailToTicket(t);
    Segment* s = head();
    if (SingleConsumer) {
      DCHECK_EQ(s->minTicket() + SegmentSize, t);
      Segment* next = s->nextSegment();
      DCHECK(next);
      setHead(next);
      reclaimSegment(s);
    } else {
      while (s->minTicket() < t) {
        Segment* next = s->nextSegment();
        DCHECK(next);
        if (casHead(s, next)) {
          reclaimSegment(s);
          s = next;
        }
      }
    }
  }

  /** reclaimSegment */
  void reclaimSegment(Segment* s) noexcept {
    if (SPSC) {
      delete s;
    } else {
      s->retire(); // defined in hazptr_obj_base_linked
    }
  }

  /** cleanUpRemainingItems */
  void cleanUpRemainingItems() {
    auto end = producerTicket();
    auto s = head();
    for (auto t = consumerTicket(); t < end; ++t) {
      if (t >= s->minTicket() + SegmentSize) {
        s = s->nextSegment();
      }
      DCHECK_LT(t, (s->minTicket() + SegmentSize));
      auto idx = index(t);
      auto& e = s->entry(idx);
      e.destroyItem();
    }
  }

  /** reclaimRemainingSegments */
  void reclaimRemainingSegments() {
    auto h = head();
    auto s = h->nextSegment();
    h->setNextSegment(nullptr);
    reclaimSegment(h);
    while (s) {
      auto next = s->nextSegment();
      delete s;
      s = next;
    }
  }

  /** index: maps a ticket to its entry slot within a segment. The odd
      Stride spreads consecutive tickets across the segment's entries
      to reduce false sharing. */
  FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept {
    return (t * Stride) & (SegmentSize - 1);
  }

  /** The first ticket of a segment is responsible for allocating and
      linking the next segment. */
  FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept {
    return (t & (SegmentSize - 1)) == 0;
  }

  /** The last ticket of a segment is responsible for advancing the
      head or tail pointer to the next segment. */
  FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept {
    return (t & (SegmentSize - 1)) == (SegmentSize - 1);
  }

  FOLLY_ALWAYS_INLINE Segment* head() const noexcept {
    return c_.head.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE Segment* tail() const noexcept {
    return p_.tail.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept {
    return p_.ticket.load(std::memory_order_acquire);
  }

  FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept {
    return c_.ticket.load(std::memory_order_acquire);
  }

  void setHead(Segment* s) noexcept {
    DCHECK(SingleConsumer);
    c_.head.store(s, std::memory_order_relaxed);
  }

  void setTail(Segment* s) noexcept {
    DCHECK(SPSC);
    p_.tail.store(s, std::memory_order_release);
  }

  bool casHead(Segment*& s, Segment* next) noexcept {
    DCHECK(!SingleConsumer);
    return c_.head.compare_exchange_strong(
        s, next, std::memory_order_release, std::memory_order_acquire);
  }

  void casTail(Segment*& s, Segment* next) noexcept {
    DCHECK(!SPSC);
    p_.tail.compare_exchange_strong(
        s, next, std::memory_order_release, std::memory_order_relaxed);
  }

  FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept {
    p_.ticket.store(t, std::memory_order_release);
  }

  FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept {
    c_.ticket.store(t, std::memory_order_release);
  }

  FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept {
    if (SingleConsumer) {
      Ticket oldval = consumerTicket();
      setConsumerTicket(oldval + 1);
      return oldval;
    } else { // MC
      return c_.ticket.fetch_add(1, std::memory_order_acq_rel);
    }
  }

  FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept {
    if (SingleProducer) {
      Ticket oldval = producerTicket();
      setProducerTicket(oldval + 1);
      return oldval;
    } else { // MP
      return p_.ticket.fetch_add(1, std::memory_order_acq_rel);
    }
  }

  /**
   *  Entry
   *
   *  Pairs a saturating semaphore with storage for a single element.
   *  putItem() constructs the element and then posts the flag;
   *  takeItem() and peekItem() wait on the flag before touching the
   *  element.
   */
  class Entry {
    Sem flag_;
    aligned_storage_for_t<T> item_;

   public:
    template <typename Arg>
    FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) {
      new (&item_) T(std::forward<Arg>(arg));
      flag_.post();
    }

    FOLLY_ALWAYS_INLINE T takeItem() noexcept {
      flag_.wait();
      return getItem();
    }

    FOLLY_ALWAYS_INLINE const T* peekItem() noexcept {
      flag_.wait();
      return itemPtr();
    }

    template <typename Clock, typename Duration>
    FOLLY_EXPORT FOLLY_ALWAYS_INLINE bool tryWaitUntil(
        const std::chrono::time_point<Clock, Duration>& deadline) noexcept {
      // wait-options from benchmarks on contended queues:
      static constexpr auto const opt =
          Sem::wait_options().spin_max(std::chrono::microseconds(10));
      return flag_.try_wait_until(deadline, opt);
    }

    FOLLY_ALWAYS_INLINE void destroyItem() noexcept {
      itemPtr()->~T();
    }

   private:
    FOLLY_ALWAYS_INLINE T getItem() noexcept {
      T ret = std::move(*(itemPtr()));
      destroyItem();
      return ret;
    }

    FOLLY_ALWAYS_INLINE T* itemPtr() noexcept {
      return static_cast<T*>(static_cast<void*>(&item_));
    }
  }; // Entry

  /**
   *  Segment
   */
  class Segment : public hazptr_obj_base_linked<Segment, Atom> {
    Atom<Segment*> next_{nullptr};
    const Ticket min_;
    alignas(Align) Entry b_[SegmentSize];

   public:
    explicit Segment(const Ticket t) noexcept : min_(t) {}

    Segment* nextSegment() const noexcept {
      return next_.load(std::memory_order_acquire);
    }

    void setNextSegment(Segment* next) {
      next_.store(next, std::memory_order_relaxed);
    }

    bool casNextSegment(Segment* next) noexcept {
      Segment* expected = nullptr;
      return next_.compare_exchange_strong(
          expected, next, std::memory_order_release, std::memory_order_relaxed);
    }

    FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept {
      DCHECK_EQ((min_ & (SegmentSize - 1)), Ticket(0));
      return min_;
    }

    FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept {
      return b_[index];
    }

    template <typename S>
    void push_links(bool m, S& s) {
      if (m == false) { // next_ is immutable
        auto p = nextSegment();
        if (p) {
          s.push(p);
        }
      }
    }
  }; // Segment

}; // UnboundedQueue

/* Aliases */

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPSCQueue =
    UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPSCQueue =
    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPMCQueue =
    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;

template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPMCQueue =
    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;

} // namespace folly