1/*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <algorithm>
20#include <atomic>
21#include <cassert>
22#include <cstring>
23#include <limits>
24#include <type_traits>
25
26#include <folly/Traits.h>
27#include <folly/concurrency/CacheLocality.h>
28#include <folly/detail/TurnSequencer.h>
29#include <folly/portability/Unistd.h>
30
31namespace folly {
32
33namespace detail {
34
35template <typename T, template <typename> class Atom>
36struct SingleElementQueue;
37
38template <typename T>
39class MPMCPipelineStageImpl;
40
41/// MPMCQueue base CRTP template
42template <typename>
43class MPMCQueueBase;
44
45} // namespace detail
46
47/// MPMCQueue<T> is a high-performance bounded concurrent queue that
48/// supports multiple producers, multiple consumers, and optional blocking.
49/// The queue has a fixed capacity, for which all memory will be allocated
50/// up front. The bulk of the work of enqueuing and dequeuing can be
51/// performed in parallel.
52///
53/// MPMCQueue is linearizable. That means that if a call to write(A)
54/// returns before a call to write(B) begins, then A will definitely end up
55/// in the queue before B, and if a call to read(X) returns before a call
/// to read(Y) is started, then X will be something from earlier in the
57/// queue than Y. This also means that if a read call returns a value, you
58/// can be sure that all previous elements of the queue have been assigned
59/// a reader (that reader might not yet have returned, but it exists).
60///
61/// The underlying implementation uses a ticket dispenser for the head and
62/// the tail, spreading accesses across N single-element queues to produce
63/// a queue with capacity N. The ticket dispensers use atomic increment,
64/// which is more robust to contention than a CAS loop. Each of the
65/// single-element queues uses its own CAS to serialize access, with an
66/// adaptive spin cutoff. When spinning fails on a single-element queue
67/// it uses futex()'s _BITSET operations to reduce unnecessary wakeups
68/// even if multiple waiters are present on an individual queue (such as
69/// when the MPMCQueue's capacity is smaller than the number of enqueuers
70/// or dequeuers).
71///
72/// In benchmarks (contained in tao/queues/ConcurrentQueueTests)
73/// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better
74/// than any of the alternatives present in fbcode, for both small (~10)
75/// and large capacities. In these benchmarks it is also faster than
76/// tbb::concurrent_bounded_queue for all configurations. When there are
77/// many more threads than cores, MPMCQueue is _much_ faster than the tbb
78/// queue because it uses futex() to block and unblock waiting threads,
79/// rather than spinning with sched_yield.
80///
/// NOEXCEPT INTERACTION: tl;dr: If it compiles you're fine. Ticket-based
82/// queues separate the assignment of queue positions from the actual
83/// construction of the in-queue elements, which means that the T
84/// constructor used during enqueue must not throw an exception. This is
85/// enforced at compile time using type traits, which requires that T be
86/// adorned with accurate noexcept information. If your type does not
87/// use noexcept, you will have to wrap it in something that provides
88/// the guarantee. We provide an alternate safe implementation for types
89/// that don't use noexcept but that are marked folly::IsRelocatable
90/// and std::is_nothrow_constructible, which is common for folly types.
91/// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE
92/// then your type can be put in MPMCQueue.
93///
94/// If you have a pool of N queue consumers that you want to shut down
95/// after the queue has drained, one way is to enqueue N sentinel values
96/// to the queue. If the producer doesn't know how many consumers there
97/// are you can enqueue one sentinel and then have each consumer requeue
98/// two sentinels after it receives it (by requeuing 2 the shutdown can
99/// complete in O(log P) time instead of O(P)).
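///
/// Example usage (an illustrative sketch, not part of this header):
///
///   folly::MPMCQueue<int> queue(64);   // capacity fixed at construction
///   queue.blockingWrite(42);           // producer: blocks while full
///   int item;
///   queue.blockingRead(item);          // consumer: blocks until available
///   bool ok = queue.write(7);          // returns false instead of blocking
///   bool got = queue.read(item);       // false if nothing ready to dequeue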
100template <
101 typename T,
102 template <typename> class Atom = std::atomic,
103 bool Dynamic = false>
104class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> {
105 friend class detail::MPMCPipelineStageImpl<T>;
106 using Slot = detail::SingleElementQueue<T, Atom>;
107
108 public:
109 explicit MPMCQueue(size_t queueCapacity)
110 : detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>>(queueCapacity) {
111 this->stride_ = this->computeStride(queueCapacity);
112 this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding];
113 }
114
115 MPMCQueue() noexcept {}
116};
117
118/// *** The dynamic version of MPMCQueue is deprecated. ***
119/// Use UnboundedQueue instead.
120
121/// The dynamic version of MPMCQueue allows dynamic expansion of queue
122/// capacity, such that a queue may start with a smaller capacity than
123/// specified and expand only if needed. Users may optionally specify
124/// the initial capacity and the expansion multiplier.
125///
126/// The design uses a seqlock to enforce mutual exclusion among
127/// expansion attempts. Regular operations read up-to-date queue
128/// information (slots array, capacity, stride) inside read-only
129/// seqlock sections, which are unimpeded when no expansion is in
130/// progress.
131///
132/// An expansion computes a new capacity, allocates a new slots array,
133/// and updates stride. No information needs to be copied from the
134/// current slots array to the new one. When this happens, new slots
135/// will not have sequence numbers that match ticket numbers. The
136/// expansion needs to compute a ticket offset such that operations
137/// that use new arrays can adjust the calculations of slot indexes
138/// and sequence numbers that take into account that the new slots
139/// start with sequence numbers of zero. The current ticket offset is
140/// packed with the seqlock in an atomic 64-bit integer. The initial
141/// offset is zero.
142///
143/// Lagging write and read operations with tickets lower than the
144/// ticket offset of the current slots array (i.e., the minimum ticket
145/// number that can be served by the current array) must use earlier
146/// closed arrays instead of the current one. Information about closed
147/// slots arrays (array address, capacity, stride, and offset) is
148/// maintained in a logarithmic-sized structure. Each entry in that
149/// structure never needs to be changed once set. The number of closed
150/// arrays is half the value of the seqlock (when unlocked).
151///
152/// The acquisition of the seqlock to perform an expansion does not
153/// prevent the issuing of new push and pop tickets concurrently. The
154/// expansion must set the new ticket offset to a value that couldn't
155/// have been issued to an operation that has already gone through a
156/// seqlock read-only section (and hence obtained information for
157/// older closed arrays).
158///
159/// Note that the total queue capacity can temporarily exceed the
160/// specified capacity when there are lagging consumers that haven't
161/// yet consumed all the elements in closed arrays. Users should not
162/// rely on the capacity of dynamic queues for synchronization, e.g.,
163/// they should not expect that a thread will definitely block on a
164/// call to blockingWrite() when the queue size is known to be equal
165/// to its capacity.
166///
167/// Note that some writeIfNotFull() and tryWriteUntil() operations may
168/// fail even if the size of the queue is less than its maximum
169/// capacity and despite the success of expansion, if the operation
170/// happens to acquire a ticket that belongs to a closed array. This
171/// is a transient condition. Typically, one or two ticket values may
/// be subject to this condition per expansion.
173///
174/// The dynamic version is a partial specialization of MPMCQueue with
175/// Dynamic == true
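///
/// Example (an illustrative sketch using the constructors declared below):
///
///   // Maximum capacity 1024, initial capacity 16, expand 4x when needed.
///   folly::MPMCQueue<int, std::atomic, true> q(1024, 16, 4);
///   q.blockingWrite(1);   // may expand the slots array instead of blocking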
176template <typename T, template <typename> class Atom>
177class MPMCQueue<T, Atom, true>
178 : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> {
179 friend class detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>;
180 using Slot = detail::SingleElementQueue<T, Atom>;
181
182 struct ClosedArray {
183 uint64_t offset_{0};
184 Slot* slots_{nullptr};
185 size_t capacity_{0};
186 int stride_{0};
187 };
188
189 public:
190 explicit MPMCQueue(size_t queueCapacity)
191 : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
192 size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity);
193 initQueue(cap, kDefaultExpansionMultiplier);
194 }
195
196 explicit MPMCQueue(
197 size_t queueCapacity,
198 size_t minCapacity,
199 size_t expansionMultiplier)
200 : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) {
201 minCapacity = std::max<size_t>(1, minCapacity);
202 size_t cap = std::min<size_t>(minCapacity, queueCapacity);
203 expansionMultiplier = std::max<size_t>(2, expansionMultiplier);
204 initQueue(cap, expansionMultiplier);
205 }
206
207 MPMCQueue() noexcept {
208 dmult_ = 0;
209 closed_ = nullptr;
210 }
211
212 MPMCQueue(MPMCQueue<T, Atom, true>&& rhs) noexcept {
213 this->capacity_ = rhs.capacity_;
214 new (&this->dslots_)
215 Atom<Slot*>(rhs.dslots_.load(std::memory_order_relaxed));
216 new (&this->dstride_)
217 Atom<int>(rhs.dstride_.load(std::memory_order_relaxed));
218 this->dstate_.store(
219 rhs.dstate_.load(std::memory_order_relaxed), std::memory_order_relaxed);
220 this->dcapacity_.store(
221 rhs.dcapacity_.load(std::memory_order_relaxed),
222 std::memory_order_relaxed);
223 this->pushTicket_.store(
224 rhs.pushTicket_.load(std::memory_order_relaxed),
225 std::memory_order_relaxed);
226 this->popTicket_.store(
227 rhs.popTicket_.load(std::memory_order_relaxed),
228 std::memory_order_relaxed);
229 this->pushSpinCutoff_.store(
230 rhs.pushSpinCutoff_.load(std::memory_order_relaxed),
231 std::memory_order_relaxed);
232 this->popSpinCutoff_.store(
233 rhs.popSpinCutoff_.load(std::memory_order_relaxed),
234 std::memory_order_relaxed);
235 dmult_ = rhs.dmult_;
236 closed_ = rhs.closed_;
237
238 rhs.capacity_ = 0;
239 rhs.dslots_.store(nullptr, std::memory_order_relaxed);
240 rhs.dstride_.store(0, std::memory_order_relaxed);
241 rhs.dstate_.store(0, std::memory_order_relaxed);
242 rhs.dcapacity_.store(0, std::memory_order_relaxed);
243 rhs.pushTicket_.store(0, std::memory_order_relaxed);
244 rhs.popTicket_.store(0, std::memory_order_relaxed);
245 rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
246 rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
247 rhs.dmult_ = 0;
248 rhs.closed_ = nullptr;
249 }
250
251 MPMCQueue<T, Atom, true> const& operator=(MPMCQueue<T, Atom, true>&& rhs) {
252 if (this != &rhs) {
253 this->~MPMCQueue();
254 new (this) MPMCQueue(std::move(rhs));
255 }
256 return *this;
257 }
258
259 ~MPMCQueue() {
260 if (closed_ != nullptr) {
261 for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) {
262 delete[] closed_[i].slots_;
263 }
264 delete[] closed_;
265 }
266 using AtomInt = Atom<int>;
267 this->dstride_.~AtomInt();
268 using AtomSlot = Atom<Slot*>;
269 // Sort of a hack to get ~MPMCQueueBase to free dslots_
270 auto slots = this->dslots_.load();
271 this->dslots_.~AtomSlot();
272 this->slots_ = slots;
273 }
274
275 size_t allocatedCapacity() const noexcept {
276 return this->dcapacity_.load(std::memory_order_relaxed);
277 }
278
279 template <typename... Args>
280 void blockingWrite(Args&&... args) noexcept {
281 uint64_t ticket = this->pushTicket_++;
282 Slot* slots;
283 size_t cap;
284 int stride;
285 uint64_t state;
286 uint64_t offset;
287 do {
288 if (!trySeqlockReadSection(state, slots, cap, stride)) {
289 asm_volatile_pause();
290 continue;
291 }
292 if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) {
293 // There was an expansion after this ticket was issued.
294 break;
295 }
296 if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
297 this->turn(ticket - offset, cap))) {
298 // A slot is ready. No need to expand.
299 break;
300 } else if (
301 this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) {
302 // May block, but a pop is in progress. No need to expand.
303 // Get seqlock read section info again in case an expansion
304 // occurred with an equal or higher ticket.
305 continue;
306 } else {
307 // May block. See if we can expand.
308 if (tryExpand(state, cap)) {
309 // This or another thread started an expansion. Get updated info.
310 continue;
311 } else {
312 // Can't expand.
313 break;
314 }
315 }
316 } while (true);
317 this->enqueueWithTicketBase(
318 ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
319 }
320
321 void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
322 ticket = this->popTicket_++;
323 Slot* slots;
324 size_t cap;
325 int stride;
326 uint64_t state;
327 uint64_t offset;
328 while (!trySeqlockReadSection(state, slots, cap, stride)) {
329 asm_volatile_pause();
330 }
331 // If there was an expansion after the corresponding push ticket
332 // was issued, adjust accordingly
333 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
334 this->dequeueWithTicketBase(ticket - offset, slots, cap, stride, elem);
335 }
336
337 private:
338 enum {
339 kSeqlockBits = 6,
340 kDefaultMinDynamicCapacity = 10,
341 kDefaultExpansionMultiplier = 10,
342 };
343
344 size_t dmult_;
345
346 // Info about closed slots arrays for use by lagging operations
347 ClosedArray* closed_;
348
349 void initQueue(const size_t cap, const size_t mult) {
350 new (&this->dstride_) Atom<int>(this->computeStride(cap));
351 Slot* slots = new Slot[cap + 2 * this->kSlotPadding];
352 new (&this->dslots_) Atom<Slot*>(slots);
353 this->dstate_.store(0);
354 this->dcapacity_.store(cap);
355 dmult_ = mult;
356 size_t maxClosed = 0;
357 for (size_t expanded = cap; expanded < this->capacity_; expanded *= mult) {
358 ++maxClosed;
359 }
360 closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr;
361 }
362
363 bool tryObtainReadyPushTicket(
364 uint64_t& ticket,
365 Slot*& slots,
366 size_t& cap,
367 int& stride) noexcept {
368 uint64_t state;
369 do {
370 ticket = this->pushTicket_.load(std::memory_order_acquire); // A
371 if (!trySeqlockReadSection(state, slots, cap, stride)) {
372 asm_volatile_pause();
373 continue;
374 }
375
376 // If there was an expansion with offset greater than this ticket,
377 // adjust accordingly
378 uint64_t offset;
379 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
380
381 if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue(
382 this->turn(ticket - offset, cap))) {
383 // A slot is ready.
384 if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
385 // Adjust ticket
386 ticket -= offset;
387 return true;
388 } else {
389 continue;
390 }
391 } else {
392 if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B
393 // Try again. Ticket changed.
394 continue;
395 }
396 // Likely to block.
397 // Try to expand unless the ticket is for a closed array
398 if (offset == getOffset(state)) {
399 if (tryExpand(state, cap)) {
400 // This or another thread started an expansion. Get up-to-date info.
401 continue;
402 }
403 }
404 return false;
405 }
406 } while (true);
407 }
408
409 bool tryObtainPromisedPushTicket(
410 uint64_t& ticket,
411 Slot*& slots,
412 size_t& cap,
413 int& stride) noexcept {
414 uint64_t state;
415 do {
416 ticket = this->pushTicket_.load(std::memory_order_acquire);
417 auto numPops = this->popTicket_.load(std::memory_order_acquire);
418 if (!trySeqlockReadSection(state, slots, cap, stride)) {
419 asm_volatile_pause();
420 continue;
421 }
422
423 const auto curCap = cap;
424 // If there was an expansion with offset greater than this ticket,
425 // adjust accordingly
426 uint64_t offset;
427 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
428
429 int64_t n = ticket - numPops;
430
431 if (n >= static_cast<ssize_t>(cap)) {
432 if ((cap == curCap) && tryExpand(state, cap)) {
433 // This or another thread started an expansion. Start over.
434 continue;
435 }
436 // Can't expand.
437 ticket -= offset;
438 return false;
439 }
440
441 if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
442 // Adjust ticket
443 ticket -= offset;
444 return true;
445 }
446 } while (true);
447 }
448
449 bool tryObtainReadyPopTicket(
450 uint64_t& ticket,
451 Slot*& slots,
452 size_t& cap,
453 int& stride) noexcept {
454 uint64_t state;
455 do {
456 ticket = this->popTicket_.load(std::memory_order_relaxed);
457 if (!trySeqlockReadSection(state, slots, cap, stride)) {
458 asm_volatile_pause();
459 continue;
460 }
461
462 // If there was an expansion after the corresponding push ticket
463 // was issued, adjust accordingly
464 uint64_t offset;
465 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
466
467 if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue(
468 this->turn(ticket - offset, cap))) {
469 if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
470 // Adjust ticket
471 ticket -= offset;
472 return true;
473 }
474 } else {
475 return false;
476 }
477 } while (true);
478 }
479
480 bool tryObtainPromisedPopTicket(
481 uint64_t& ticket,
482 Slot*& slots,
483 size_t& cap,
484 int& stride) noexcept {
485 uint64_t state;
486 do {
487 ticket = this->popTicket_.load(std::memory_order_acquire);
488 auto numPushes = this->pushTicket_.load(std::memory_order_acquire);
489 if (!trySeqlockReadSection(state, slots, cap, stride)) {
490 asm_volatile_pause();
491 continue;
492 }
493
494 uint64_t offset;
495 // If there was an expansion after the corresponding push
496 // ticket was issued, adjust accordingly
497 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
498
499 if (ticket >= numPushes) {
500 ticket -= offset;
501 return false;
502 }
503 if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
504 ticket -= offset;
505 return true;
506 }
507 } while (true);
508 }
509
510 /// Enqueues an element with a specific ticket number
511 template <typename... Args>
512 void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept {
513 Slot* slots;
514 size_t cap;
515 int stride;
516 uint64_t state;
517 uint64_t offset;
518
519 while (!trySeqlockReadSection(state, slots, cap, stride)) {
520 }
521
522 // If there was an expansion after this ticket was issued, adjust
523 // accordingly
524 maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride);
525
526 this->enqueueWithTicketBase(
527 ticket - offset, slots, cap, stride, std::forward<Args>(args)...);
528 }
529
530 uint64_t getOffset(const uint64_t state) const noexcept {
531 return state >> kSeqlockBits;
532 }
533
534 int getNumClosed(const uint64_t state) const noexcept {
535 return (state & ((1 << kSeqlockBits) - 1)) >> 1;
536 }
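
  // Worked example of the dstate_ packing implied by the two accessors above
  // (illustrative): bit 0 is the seqlock "locked" bit, bits 1..5 hold the
  // number of closed arrays, and the remaining high bits hold the current
  // ticket offset. After a single expansion performed at ticket 100, dstate_
  // holds (100 << kSeqlockBits) + 2, so getOffset() == 100 and
  // getNumClosed() == 1.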
537
538 /// Try to expand the queue. Returns true if this expansion was
  /// successful or a concurrent expansion is in progress. Returns
540 /// false if the queue has reached its maximum capacity or
541 /// allocation has failed.
542 bool tryExpand(const uint64_t state, const size_t cap) noexcept {
543 if (cap == this->capacity_) {
544 return false;
545 }
546 // Acquire seqlock
547 uint64_t oldval = state;
548 assert((state & 1) == 0);
549 if (this->dstate_.compare_exchange_strong(oldval, state + 1)) {
550 assert(cap == this->dcapacity_.load());
551 uint64_t ticket =
552 1 + std::max(this->pushTicket_.load(), this->popTicket_.load());
553 size_t newCapacity = std::min(dmult_ * cap, this->capacity_);
554 Slot* newSlots =
555 new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding];
556 if (newSlots == nullptr) {
557 // Expansion failed. Restore the seqlock
558 this->dstate_.store(state);
559 return false;
560 }
561 // Successful expansion
562 // calculate the current ticket offset
563 uint64_t offset = getOffset(state);
564 // calculate index in closed array
565 int index = getNumClosed(state);
566 assert((index << 1) < (1 << kSeqlockBits));
567 // fill the info for the closed slots array
568 closed_[index].offset_ = offset;
569 closed_[index].slots_ = this->dslots_.load();
570 closed_[index].capacity_ = cap;
571 closed_[index].stride_ = this->dstride_.load();
572 // update the new slots array info
573 this->dslots_.store(newSlots);
574 this->dcapacity_.store(newCapacity);
575 this->dstride_.store(this->computeStride(newCapacity));
576 // Release the seqlock and record the new ticket offset
577 this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1)));
578 return true;
579 } else { // failed to acquire seqlock
      // Someone acquired the seqlock. Go back to the caller and get
581 // up-to-date info.
582 return true;
583 }
584 }
585
586 /// Seqlock read-only section
587 bool trySeqlockReadSection(
588 uint64_t& state,
589 Slot*& slots,
590 size_t& cap,
591 int& stride) noexcept {
592 state = this->dstate_.load(std::memory_order_acquire);
593 if (state & 1) {
594 // Locked.
595 return false;
596 }
597 // Start read-only section.
598 slots = this->dslots_.load(std::memory_order_relaxed);
599 cap = this->dcapacity_.load(std::memory_order_relaxed);
600 stride = this->dstride_.load(std::memory_order_relaxed);
601 // End of read-only section. Validate seqlock.
602 std::atomic_thread_fence(std::memory_order_acquire);
603 return (state == this->dstate_.load(std::memory_order_relaxed));
604 }
605
606 /// If there was an expansion after ticket was issued, update local variables
607 /// of the lagging operation using the most recent closed array with
  /// offset <= ticket and return true. Otherwise, return false.
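  ///
  /// For example (illustrative): if getOffset(state) == 100 but a lagging
  /// operation still holds ticket 40, the loop below walks closed_ from the
  /// newest entry down and picks the most recent closed array whose
  /// offset_ <= 40, so the ticket is resolved against the array for which it
  /// was originally issued.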
609 bool maybeUpdateFromClosed(
610 const uint64_t state,
611 const uint64_t ticket,
612 uint64_t& offset,
613 Slot*& slots,
614 size_t& cap,
615 int& stride) noexcept {
616 offset = getOffset(state);
617 if (ticket >= offset) {
618 return false;
619 }
620 for (int i = getNumClosed(state) - 1; i >= 0; --i) {
621 offset = closed_[i].offset_;
622 if (offset <= ticket) {
623 slots = closed_[i].slots_;
624 cap = closed_[i].capacity_;
625 stride = closed_[i].stride_;
626 return true;
627 }
628 }
629 // A closed array with offset <= ticket should have been found
630 assert(false);
631 return false;
632 }
633};
634
635namespace detail {
636
637/// CRTP specialization of MPMCQueueBase
638template <
639 template <typename T, template <typename> class Atom, bool Dynamic>
640 class Derived,
641 typename T,
642 template <typename> class Atom,
643 bool Dynamic>
644class MPMCQueueBase<Derived<T, Atom, Dynamic>> {
645 // Note: Using CRTP static casts in several functions of this base
646 // template instead of making called functions virtual or duplicating
647 // the code of calling functions in the derived partially specialized
648 // template
649
650 static_assert(
651 std::is_nothrow_constructible<T, T&&>::value ||
652 folly::IsRelocatable<T>::value,
653 "T must be relocatable or have a noexcept move constructor");
654
655 public:
656 typedef T value_type;
657
658 using Slot = detail::SingleElementQueue<T, Atom>;
659
660 explicit MPMCQueueBase(size_t queueCapacity)
661 : capacity_(queueCapacity),
662 dstate_(0),
663 dcapacity_(0),
664 pushTicket_(0),
665 popTicket_(0),
666 pushSpinCutoff_(0),
667 popSpinCutoff_(0) {
668 if (queueCapacity == 0) {
669 throw std::invalid_argument(
670 "MPMCQueue with explicit capacity 0 is impossible"
671 // Stride computation in derived classes would sigfpe if capacity is 0
672 );
673 }
674
675 // ideally this would be a static assert, but g++ doesn't allow it
676 assert(
677 alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size);
678 assert(
679 static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) -
680 static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >=
681 static_cast<ptrdiff_t>(hardware_destructive_interference_size));
682 }
683
684 /// A default-constructed queue is useful because a usable (non-zero
685 /// capacity) queue can be moved onto it or swapped with it
686 MPMCQueueBase() noexcept
687 : capacity_(0),
688 slots_(nullptr),
689 stride_(0),
690 dstate_(0),
691 dcapacity_(0),
692 pushTicket_(0),
693 popTicket_(0),
694 pushSpinCutoff_(0),
695 popSpinCutoff_(0) {}
696
697 /// IMPORTANT: The move constructor is here to make it easier to perform
698 /// the initialization phase, it is not safe to use when there are any
699 /// concurrent accesses (this is not checked).
700 MPMCQueueBase(MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) noexcept
701 : capacity_(rhs.capacity_),
702 slots_(rhs.slots_),
703 stride_(rhs.stride_),
704 dstate_(rhs.dstate_.load(std::memory_order_relaxed)),
705 dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)),
706 pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)),
707 popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)),
708 pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)),
709 popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed)) {
710 // relaxed ops are okay for the previous reads, since rhs queue can't
711 // be in concurrent use
712
713 // zero out rhs
714 rhs.capacity_ = 0;
715 rhs.slots_ = nullptr;
716 rhs.stride_ = 0;
717 rhs.dstate_.store(0, std::memory_order_relaxed);
718 rhs.dcapacity_.store(0, std::memory_order_relaxed);
719 rhs.pushTicket_.store(0, std::memory_order_relaxed);
720 rhs.popTicket_.store(0, std::memory_order_relaxed);
721 rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed);
722 rhs.popSpinCutoff_.store(0, std::memory_order_relaxed);
723 }
724
725 /// IMPORTANT: The move operator is here to make it easier to perform
726 /// the initialization phase, it is not safe to use when there are any
727 /// concurrent accesses (this is not checked).
728 MPMCQueueBase<Derived<T, Atom, Dynamic>> const& operator=(
729 MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) {
730 if (this != &rhs) {
731 this->~MPMCQueueBase();
732 new (this) MPMCQueueBase(std::move(rhs));
733 }
734 return *this;
735 }
736
737 MPMCQueueBase(const MPMCQueueBase&) = delete;
738 MPMCQueueBase& operator=(const MPMCQueueBase&) = delete;
739
740 /// MPMCQueue can only be safely destroyed when there are no
741 /// pending enqueuers or dequeuers (this is not checked).
742 ~MPMCQueueBase() {
743 delete[] slots_;
744 }
745
746 /// Returns the number of writes (including threads that are blocked waiting
747 /// to write) minus the number of reads (including threads that are blocked
748 /// waiting to read). So effectively, it becomes:
749 /// elements in queue + pending(calls to write) - pending(calls to read).
750 /// If nothing is pending, then the method returns the actual number of
751 /// elements in the queue.
752 /// The returned value can be negative if there are no writers and the queue
753 /// is empty, but there is one reader that is blocked waiting to read (in
754 /// which case, the returned size will be -1).
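  ///
  /// For example (an illustrative sketch):
  ///
  ///   MPMCQueue<int> q(8);
  ///   q.blockingWrite(1);
  ///   q.blockingWrite(2);
  ///   assert(q.size() == 2);   // two writes, no reads, nothing pending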
755 ssize_t size() const noexcept {
756 // since both pushes and pops increase monotonically, we can get a
757 // consistent snapshot either by bracketing a read of popTicket_ with
758 // two reads of pushTicket_ that return the same value, or the other
759 // way around. We maximize our chances by alternately attempting
760 // both bracketings.
761 uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A
762 uint64_t pops = popTicket_.load(std::memory_order_acquire); // B
763 while (true) {
764 uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C
765 if (pushes == nextPushes) {
766 // pushTicket_ didn't change from A (or the previous C) to C,
767 // so we can linearize at B (or D)
768 return ssize_t(pushes - pops);
769 }
770 pushes = nextPushes;
771 uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D
772 if (pops == nextPops) {
        // popTicket_ didn't change from B (or the previous D), so we
774 // can linearize at C
775 return ssize_t(pushes - pops);
776 }
777 pops = nextPops;
778 }
779 }
780
781 /// Returns true if there are no items available for dequeue
782 bool isEmpty() const noexcept {
783 return size() <= 0;
784 }
785
786 /// Returns true if there is currently no empty space to enqueue
787 bool isFull() const noexcept {
788 // careful with signed -> unsigned promotion, since size can be negative
789 return size() >= static_cast<ssize_t>(capacity_);
790 }
791
  /// Returns a guess at size() for contexts that don't need a precise
  /// value, such as stats. More specifically, it returns the number of writes
  /// minus the number of reads, but after reading the number of writes, more
  /// writers could have come before the number of reads was sampled,
  /// and this method doesn't protect against that case.
797 /// The returned value can be negative.
798 ssize_t sizeGuess() const noexcept {
799 return writeCount() - readCount();
800 }
801
802 /// Doesn't change
803 size_t capacity() const noexcept {
804 return capacity_;
805 }
806
807 /// Doesn't change for non-dynamic
808 size_t allocatedCapacity() const noexcept {
809 return capacity_;
810 }
811
812 /// Returns the total number of calls to blockingWrite or successful
813 /// calls to write, including those blockingWrite calls that are
814 /// currently blocking
815 uint64_t writeCount() const noexcept {
816 return pushTicket_.load(std::memory_order_acquire);
817 }
818
819 /// Returns the total number of calls to blockingRead or successful
820 /// calls to read, including those blockingRead calls that are currently
821 /// blocking
822 uint64_t readCount() const noexcept {
823 return popTicket_.load(std::memory_order_acquire);
824 }
825
826 /// Enqueues a T constructed from args, blocking until space is
827 /// available. Note that this method signature allows enqueue via
828 /// move, if args is a T rvalue, via copy, if args is a T lvalue, or
829 /// via emplacement if args is an initializer list that can be passed
830 /// to a T constructor.
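  ///
  /// For example (an illustrative sketch, assuming a type Point with a
  /// noexcept (int, int) constructor):
  ///
  ///   MPMCQueue<Point> q(4);
  ///   Point p(1, 2);
  ///   q.blockingWrite(p);            // copy
  ///   q.blockingWrite(std::move(p)); // move
  ///   q.blockingWrite(3, 4);         // emplaces Point(3, 4) in the queue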
831 template <typename... Args>
832 void blockingWrite(Args&&... args) noexcept {
833 enqueueWithTicketBase(
834 pushTicket_++, slots_, capacity_, stride_, std::forward<Args>(args)...);
835 }
836
837 /// If an item can be enqueued with no blocking, does so and returns
838 /// true, otherwise returns false. This method is similar to
839 /// writeIfNotFull, but if you don't have a specific need for that
840 /// method you should use this one.
841 ///
842 /// One of the common usages of this method is to enqueue via the
843 /// move constructor, something like q.write(std::move(x)). If write
844 /// returns false because the queue is full then x has not actually been
845 /// consumed, which looks strange. To understand why it is actually okay
846 /// to use x afterward, remember that std::move is just a typecast that
847 /// provides an rvalue reference that enables use of a move constructor
848 /// or operator. std::move doesn't actually move anything. It could
849 /// more accurately be called std::rvalue_cast or std::move_permission.
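  ///
  /// For example (an illustrative sketch, for an MPMCQueue<std::vector<int>>
  /// named q):
  ///
  ///   std::vector<int> v = {1, 2, 3};
  ///   if (!q.write(std::move(v))) {
  ///     // The queue was full; v was never moved from and still holds
  ///     // {1, 2, 3}, so it is safe to retry or use it elsewhere.
  ///   }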
850 template <typename... Args>
851 bool write(Args&&... args) noexcept {
852 uint64_t ticket;
853 Slot* slots;
854 size_t cap;
855 int stride;
856 if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPushTicket(
857 ticket, slots, cap, stride)) {
858 // we have pre-validated that the ticket won't block
859 enqueueWithTicketBase(
860 ticket, slots, cap, stride, std::forward<Args>(args)...);
861 return true;
862 } else {
863 return false;
864 }
865 }
866
867 template <class Clock, typename... Args>
868 bool tryWriteUntil(
869 const std::chrono::time_point<Clock>& when,
870 Args&&... args) noexcept {
871 uint64_t ticket;
872 Slot* slots;
873 size_t cap;
874 int stride;
875 if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) {
876 // we have pre-validated that the ticket won't block, or rather that
877 // it won't block longer than it takes another thread to dequeue an
878 // element from the slot it identifies.
879 enqueueWithTicketBase(
880 ticket, slots, cap, stride, std::forward<Args>(args)...);
881 return true;
882 } else {
883 return false;
884 }
885 }
886
887 /// If the queue is not full, enqueues and returns true, otherwise
888 /// returns false. Unlike write this method can be blocked by another
889 /// thread, specifically a read that has linearized (been assigned
890 /// a ticket) but not yet completed. If you don't really need this
891 /// function you should probably use write.
892 ///
893 /// MPMCQueue isn't lock-free, so just because a read operation has
894 /// linearized (and isFull is false) doesn't mean that space has been
895 /// made available for another write. In this situation write will
896 /// return false, but writeIfNotFull will wait for the dequeue to finish.
897 /// This method is required if you are composing queues and managing
898 /// your own wakeup, because it guarantees that after every successful
899 /// write a readIfNotEmpty will succeed.
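  ///
  /// For example (illustrative): with capacity 1, one element enqueued, and a
  /// reader that has been assigned a pop ticket but has not finished its
  /// dequeue, write() returns false immediately, whereas writeIfNotFull()
  /// waits for that dequeue to complete and then succeeds.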
900 template <typename... Args>
901 bool writeIfNotFull(Args&&... args) noexcept {
902 uint64_t ticket;
903 Slot* slots;
904 size_t cap;
905 int stride;
906 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
907 ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
908 // some other thread is already dequeuing the slot into which we
909 // are going to enqueue, but we might have to wait for them to finish
910 enqueueWithTicketBase(
911 ticket, slots, cap, stride, std::forward<Args>(args)...);
912 return true;
913 } else {
914 return false;
915 }
916 }
917
918 /// Moves a dequeued element onto elem, blocking until an element
919 /// is available
920 void blockingRead(T& elem) noexcept {
921 uint64_t ticket;
922 static_cast<Derived<T, Atom, Dynamic>*>(this)->blockingReadWithTicket(
923 ticket, elem);
924 }
925
  /// Same as blockingRead() but also records the ticket number
927 void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept {
928 assert(capacity_ != 0);
929 ticket = popTicket_++;
930 dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem);
931 }
932
933 /// If an item can be dequeued with no blocking, does so and returns
934 /// true, otherwise returns false.
935 bool read(T& elem) noexcept {
936 uint64_t ticket;
937 return readAndGetTicket(ticket, elem);
938 }
939
  /// Same as read() but also records the ticket number
941 bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
942 Slot* slots;
943 size_t cap;
944 int stride;
945 if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPopTicket(
946 ticket, slots, cap, stride)) {
947 // the ticket has been pre-validated to not block
948 dequeueWithTicketBase(ticket, slots, cap, stride, elem);
949 return true;
950 } else {
951 return false;
952 }
953 }
954
955 template <class Clock, typename... Args>
956 bool tryReadUntil(
957 const std::chrono::time_point<Clock>& when,
958 T& elem) noexcept {
959 uint64_t ticket;
960 Slot* slots;
961 size_t cap;
962 int stride;
963 if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) {
964 // we have pre-validated that the ticket won't block, or rather that
965 // it won't block longer than it takes another thread to enqueue an
966 // element on the slot it identifies.
967 dequeueWithTicketBase(ticket, slots, cap, stride, elem);
968 return true;
969 } else {
970 return false;
971 }
972 }
973
974 /// If the queue is not empty, dequeues and returns true, otherwise
975 /// returns false. If the matching write is still in progress then this
976 /// method may block waiting for it. If you don't rely on being able
977 /// to dequeue (such as by counting completed write) then you should
978 /// prefer read.
979 bool readIfNotEmpty(T& elem) noexcept {
980 uint64_t ticket;
981 Slot* slots;
982 size_t cap;
983 int stride;
984 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
985 ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
986 // the matching enqueue already has a ticket, but might not be done
987 dequeueWithTicketBase(ticket, slots, cap, stride, elem);
988 return true;
989 } else {
990 return false;
991 }
992 }
993
994 protected:
995 enum {
996 /// Once every kAdaptationFreq we will spin longer, to try to estimate
997 /// the proper spin backoff
998 kAdaptationFreq = 128,
999
1000 /// To avoid false sharing in slots_ with neighboring memory
1001 /// allocations, we pad it with this many SingleElementQueue-s at
1002 /// each end
1003 kSlotPadding =
1004 (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1
1005 };
1006
1007 /// The maximum number of items in the queue at once
1008 alignas(hardware_destructive_interference_size) size_t capacity_;
1009
1010 /// Anonymous union for use when Dynamic = false and true, respectively
1011 union {
1012 /// An array of capacity_ SingleElementQueue-s, each of which holds
1013 /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't
1014 /// touch the slots at either end, to avoid false sharing
1015 Slot* slots_;
1016 /// Current dynamic slots array of dcapacity_ SingleElementQueue-s
1017 Atom<Slot*> dslots_;
1018 };
1019
1020 /// Anonymous union for use when Dynamic = false and true, respectively
1021 union {
1022 /// The number of slots_ indices that we advance for each ticket, to
1023 /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_]
1024 /// aren't on the same cache line
1025 int stride_;
1026 /// Current stride
1027 Atom<int> dstride_;
1028 };
1029
  /// The following two members are used by dynamic MPMCQueue.
1031 /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get
1032 /// better cache locality if they are in the same cache line as
1033 /// dslots_ and dstride_.
1034 ///
1035 /// Dynamic state. A packed seqlock and ticket offset
1036 Atom<uint64_t> dstate_;
1037 /// Dynamic capacity
1038 Atom<size_t> dcapacity_;
1039
1040 /// Enqueuers get tickets from here
1041 alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_;
1042
1043 /// Dequeuers get tickets from here
1044 alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_;
1045
1046 /// This is how many times we will spin before using FUTEX_WAIT when
1047 /// the queue is full on enqueue, adaptively computed by occasionally
1048 /// spinning for longer and smoothing with an exponential moving average
1049 alignas(
1050 hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_;
1051
1052 /// The adaptive spin cutoff when the queue is empty on dequeue
1053 alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_;
1054
1055 /// Alignment doesn't prevent false sharing at the end of the struct,
1056 /// so fill out the last cache line
1057 char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)];
1058
1059 /// We assign tickets in increasing order, but we don't want to
1060 /// access neighboring elements of slots_ because that will lead to
1061 /// false sharing (multiple cores accessing the same cache line even
1062 /// though they aren't accessing the same bytes in that cache line).
1063 /// To avoid this we advance by stride slots per ticket.
1064 ///
1065 /// We need gcd(capacity, stride) to be 1 so that we will use all
1066 /// of the slots. We ensure this by only considering prime strides,
1067 /// which either have no common divisors with capacity or else have
1068 /// a zero remainder after dividing by capacity. That is sufficient
1069 /// to guarantee correctness, but we also want to actually spread the
1070 /// accesses away from each other to avoid false sharing (consider a
1071 /// stride of 7 with a capacity of 8). To that end we try a few taking
1072 /// care to observe that advancing by -1 is as bad as advancing by 1
  /// when it comes to false sharing.
1074 ///
1075 /// The simple way to avoid false sharing would be to pad each
1076 /// SingleElementQueue, but since we have capacity_ of them that could
1077 /// waste a lot of space.
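  ///
  /// For example (worked from the loop below): with a capacity of 8, stride 2
  /// is skipped because 8 % 2 == 0, stride 3 gives a separation of
  /// min(3, 8 - 3) == 3, and no larger prime in the list does strictly
  /// better, so the stride is 3 and tickets visit slots in the order
  /// 0, 3, 6, 1, 4, 7, 2, 5.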
1078 static int computeStride(size_t capacity) noexcept {
1079 static const int smallPrimes[] = {2, 3, 5, 7, 11, 13, 17, 19, 23};
1080
1081 int bestStride = 1;
1082 size_t bestSep = 1;
1083 for (int stride : smallPrimes) {
1084 if ((stride % capacity) == 0 || (capacity % stride) == 0) {
1085 continue;
1086 }
1087 size_t sep = stride % capacity;
1088 sep = std::min(sep, capacity - sep);
1089 if (sep > bestSep) {
1090 bestStride = stride;
1091 bestSep = sep;
1092 }
1093 }
1094 return bestStride;
1095 }
1096
1097 /// Returns the index into slots_ that should be used when enqueuing or
1098 /// dequeuing with the specified ticket
1099 size_t idx(uint64_t ticket, size_t cap, int stride) noexcept {
1100 return ((ticket * stride) % cap) + kSlotPadding;
1101 }
1102
  /// Maps an enqueue or dequeue ticket to the turn that should be used at
  /// the corresponding SingleElementQueue
1105 uint32_t turn(uint64_t ticket, size_t cap) noexcept {
1106 assert(cap != 0);
1107 return uint32_t(ticket / cap);
1108 }
1109
1110 /// Tries to obtain a push ticket for which SingleElementQueue::enqueue
1111 /// won't block. Returns true on immediate success, false on immediate
1112 /// failure.
1113 bool tryObtainReadyPushTicket(
1114 uint64_t& ticket,
1115 Slot*& slots,
1116 size_t& cap,
1117 int& stride) noexcept {
1118 ticket = pushTicket_.load(std::memory_order_acquire); // A
1119 slots = slots_;
1120 cap = capacity_;
1121 stride = stride_;
1122 while (true) {
1123 if (!slots[idx(ticket, cap, stride)].mayEnqueue(turn(ticket, cap))) {
1124 // if we call enqueue(ticket, ...) on the SingleElementQueue
1125 // right now it would block, but this might no longer be the next
1126 // ticket. We can increase the chance of tryEnqueue success under
1127 // contention (without blocking) by rechecking the ticket dispenser
1128 auto prev = ticket;
1129 ticket = pushTicket_.load(std::memory_order_acquire); // B
1130 if (prev == ticket) {
1131 // mayEnqueue was bracketed by two reads (A or prev B or prev
1132 // failing CAS to B), so we are definitely unable to enqueue
1133 return false;
1134 }
1135 } else {
1136 // we will bracket the mayEnqueue check with a read (A or prev B
1137 // or prev failing CAS) and the following CAS. If the CAS fails
1138 // it will effect a load of pushTicket_
1139 if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) {
1140 return true;
1141 }
1142 }
1143 }
1144 }
1145
1146 /// Tries until when to obtain a push ticket for which
1147 /// SingleElementQueue::enqueue won't block. Returns true on success, false
1148 /// on failure.
1149 /// ticket is filled on success AND failure.
1150 template <class Clock>
1151 bool tryObtainPromisedPushTicketUntil(
1152 uint64_t& ticket,
1153 Slot*& slots,
1154 size_t& cap,
1155 int& stride,
1156 const std::chrono::time_point<Clock>& when) noexcept {
1157 bool deadlineReached = false;
1158 while (!deadlineReached) {
1159 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
1160 ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) {
1161 return true;
1162 }
1163 // ticket is a blocking ticket until the preceding ticket has been
1164 // processed: wait until this ticket's turn arrives. We have not reserved
1165 // this ticket so we will have to re-attempt to get a non-blocking ticket
1166 // if we wake up before we time-out.
1167 deadlineReached =
1168 !slots[idx(ticket, cap, stride)].tryWaitForEnqueueTurnUntil(
1169 turn(ticket, cap),
1170 pushSpinCutoff_,
1171 (ticket % kAdaptationFreq) == 0,
1172 when);
1173 }
1174 return false;
1175 }
1176
1177 /// Tries to obtain a push ticket which can be satisfied if all
1178 /// in-progress pops complete. This function does not block, but
1179 /// blocking may be required when using the returned ticket if some
1180 /// other thread's pop is still in progress (ticket has been granted but
1181 /// pop has not yet completed).
1182 bool tryObtainPromisedPushTicket(
1183 uint64_t& ticket,
1184 Slot*& slots,
1185 size_t& cap,
1186 int& stride) noexcept {
1187 auto numPushes = pushTicket_.load(std::memory_order_acquire); // A
1188 slots = slots_;
1189 cap = capacity_;
1190 stride = stride_;
1191 while (true) {
1192 ticket = numPushes;
1193 const auto numPops = popTicket_.load(std::memory_order_acquire); // B
1194 // n will be negative if pops are pending
1195 const int64_t n = int64_t(numPushes - numPops);
1196 if (n >= static_cast<ssize_t>(capacity_)) {
1197 // Full, linearize at B. We don't need to recheck the read we
1198 // performed at A, because if numPushes was stale at B then the
1199 // real numPushes value is even worse
1200 return false;
1201 }
1202 if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) {
1203 return true;
1204 }
1205 }
1206 }
1207
1208 /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue
1209 /// won't block. Returns true on immediate success, false on immediate
1210 /// failure.
1211 bool tryObtainReadyPopTicket(
1212 uint64_t& ticket,
1213 Slot*& slots,
1214 size_t& cap,
1215 int& stride) noexcept {
1216 ticket = popTicket_.load(std::memory_order_acquire);
1217 slots = slots_;
1218 cap = capacity_;
1219 stride = stride_;
1220 while (true) {
1221 if (!slots[idx(ticket, cap, stride)].mayDequeue(turn(ticket, cap))) {
1222 auto prev = ticket;
1223 ticket = popTicket_.load(std::memory_order_acquire);
1224 if (prev == ticket) {
1225 return false;
1226 }
1227 } else {
1228 if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) {
1229 return true;
1230 }
1231 }
1232 }
1233 }
1234
1235 /// Tries until when to obtain a pop ticket for which
1236 /// SingleElementQueue::dequeue won't block. Returns true on success, false
1237 /// on failure.
1238 /// ticket is filled on success AND failure.
1239 template <class Clock>
1240 bool tryObtainPromisedPopTicketUntil(
1241 uint64_t& ticket,
1242 Slot*& slots,
1243 size_t& cap,
1244 int& stride,
1245 const std::chrono::time_point<Clock>& when) noexcept {
1246 bool deadlineReached = false;
1247 while (!deadlineReached) {
1248 if (static_cast<Derived<T, Atom, Dynamic>*>(this)
1249 ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) {
1250 return true;
1251 }
1252 // ticket is a blocking ticket until the preceding ticket has been
1253 // processed: wait until this ticket's turn arrives. We have not reserved
1254 // this ticket so we will have to re-attempt to get a non-blocking ticket
1255 // if we wake up before we time-out.
1256 deadlineReached =
1257 !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil(
1258 turn(ticket, cap),
1259 pushSpinCutoff_,
1260 (ticket % kAdaptationFreq) == 0,
1261 when);
1262 }
1263 return false;
1264 }
1265
1266 /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose
1267 /// corresponding push ticket has already been handed out, rather than
1268 /// returning one whose corresponding push ticket has already been
1269 /// completed. This means that there is a possibility that the caller
1270 /// will block when using the ticket, but it allows the user to rely on
1271 /// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket
1272 /// will return true. The "try" part of this is that we won't have
1273 /// to block waiting for someone to call enqueue, although we might
1274 /// have to block waiting for them to finish executing code inside the
1275 /// MPMCQueue itself.
1276 bool tryObtainPromisedPopTicket(
1277 uint64_t& ticket,
1278 Slot*& slots,
1279 size_t& cap,
1280 int& stride) noexcept {
1281 auto numPops = popTicket_.load(std::memory_order_acquire); // A
1282 slots = slots_;
1283 cap = capacity_;
1284 stride = stride_;
1285 while (true) {
1286 ticket = numPops;
1287 const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B
1288 if (numPops >= numPushes) {
1289 // Empty, or empty with pending pops. Linearize at B. We don't
1290 // need to recheck the read we performed at A, because if numPops
1291 // is stale then the fresh value is larger and the >= is still true
1292 return false;
1293 }
1294 if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) {
1295 return true;
1296 }
1297 }
1298 }
1299
1300 // Given a ticket, constructs an enqueued item using args
1301 template <typename... Args>
1302 void enqueueWithTicketBase(
1303 uint64_t ticket,
1304 Slot* slots,
1305 size_t cap,
1306 int stride,
1307 Args&&... args) noexcept {
1308 slots[idx(ticket, cap, stride)].enqueue(
1309 turn(ticket, cap),
1310 pushSpinCutoff_,
1311 (ticket % kAdaptationFreq) == 0,
1312 std::forward<Args>(args)...);
1313 }
1314
1315 // To support tracking ticket numbers in MPMCPipelineStageImpl
1316 template <typename... Args>
1317 void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept {
1318 enqueueWithTicketBase(
1319 ticket, slots_, capacity_, stride_, std::forward<Args>(args)...);
1320 }
1321
1322 // Given a ticket, dequeues the corresponding element
1323 void dequeueWithTicketBase(
1324 uint64_t ticket,
1325 Slot* slots,
1326 size_t cap,
1327 int stride,
1328 T& elem) noexcept {
1329 assert(cap != 0);
1330 slots[idx(ticket, cap, stride)].dequeue(
1331 turn(ticket, cap),
1332 popSpinCutoff_,
1333 (ticket % kAdaptationFreq) == 0,
1334 elem);
1335 }
1336};
1337
1338/// SingleElementQueue implements a blocking queue that holds at most one
1339/// item, and that requires its users to assign incrementing identifiers
1340/// (turns) to each enqueue and dequeue operation. Note that the turns
1341/// used by SingleElementQueue are doubled inside the TurnSequencer
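/// For example (illustrative): the enqueue for queue turn t waits for
/// sequencer turn 2*t and the matching dequeue waits for sequencer turn
/// 2*t + 1, so a single slot's sequencer completes turns in the order
/// push(0), pop(0), push(1), pop(1), ...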
1342template <typename T, template <typename> class Atom>
1343struct SingleElementQueue {
1344 ~SingleElementQueue() noexcept {
1345 if ((sequencer_.uncompletedTurnLSB() & 1) == 1) {
1346 // we are pending a dequeue, so we have a constructed item
1347 destroyContents();
1348 }
1349 }
1350
1351 /// enqueue using in-place noexcept construction
1352 template <
1353 typename... Args,
1354 typename = typename std::enable_if<
1355 std::is_nothrow_constructible<T, Args...>::value>::type>
1356 void enqueue(
1357 const uint32_t turn,
1358 Atom<uint32_t>& spinCutoff,
1359 const bool updateSpinCutoff,
1360 Args&&... args) noexcept {
1361 sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
1362 new (&contents_) T(std::forward<Args>(args)...);
1363 sequencer_.completeTurn(turn * 2);
1364 }
1365
1366 /// enqueue using move construction, either real (if
1367 /// is_nothrow_move_constructible) or simulated using relocation and
1368 /// default construction (if IsRelocatable and is_nothrow_constructible)
1369 template <
1370 typename = typename std::enable_if<
1371 (folly::IsRelocatable<T>::value &&
1372 std::is_nothrow_constructible<T>::value) ||
1373 std::is_nothrow_constructible<T, T&&>::value>::type>
1374 void enqueue(
1375 const uint32_t turn,
1376 Atom<uint32_t>& spinCutoff,
1377 const bool updateSpinCutoff,
1378 T&& goner) noexcept {
1379 enqueueImpl(
1380 turn,
1381 spinCutoff,
1382 updateSpinCutoff,
1383 std::move(goner),
1384 typename std::conditional<
1385 std::is_nothrow_constructible<T, T&&>::value,
1386 ImplByMove,
1387 ImplByRelocation>::type());
1388 }
1389
1390 /// Waits until either:
1391 /// 1: the dequeue turn preceding the given enqueue turn has arrived
1392 /// 2: the given deadline has arrived
1393 /// Case 1 returns true, case 2 returns false.
1394 template <class Clock>
1395 bool tryWaitForEnqueueTurnUntil(
1396 const uint32_t turn,
1397 Atom<uint32_t>& spinCutoff,
1398 const bool updateSpinCutoff,
1399 const std::chrono::time_point<Clock>& when) noexcept {
1400 return sequencer_.tryWaitForTurn(
1401 turn * 2, spinCutoff, updateSpinCutoff, &when) !=
1402 TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
1403 }
1404
1405 bool mayEnqueue(const uint32_t turn) const noexcept {
1406 return sequencer_.isTurn(turn * 2);
1407 }
1408
1409 void dequeue(
1410 uint32_t turn,
1411 Atom<uint32_t>& spinCutoff,
1412 const bool updateSpinCutoff,
1413 T& elem) noexcept {
1414 dequeueImpl(
1415 turn,
1416 spinCutoff,
1417 updateSpinCutoff,
1418 elem,
1419 typename std::conditional<
1420 folly::IsRelocatable<T>::value,
1421 ImplByRelocation,
1422 ImplByMove>::type());
1423 }
1424
1425 /// Waits until either:
1426 /// 1: the enqueue turn preceding the given dequeue turn has arrived
1427 /// 2: the given deadline has arrived
1428 /// Case 1 returns true, case 2 returns false.
1429 template <class Clock>
1430 bool tryWaitForDequeueTurnUntil(
1431 const uint32_t turn,
1432 Atom<uint32_t>& spinCutoff,
1433 const bool updateSpinCutoff,
1434 const std::chrono::time_point<Clock>& when) noexcept {
1435 return sequencer_.tryWaitForTurn(
1436 turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) !=
1437 TurnSequencer<Atom>::TryWaitResult::TIMEDOUT;
1438 }
1439
1440 bool mayDequeue(const uint32_t turn) const noexcept {
1441 return sequencer_.isTurn(turn * 2 + 1);
1442 }
1443
1444 private:
1445 /// Storage for a T constructed with placement new
1446 aligned_storage_for_t<T> contents_;
1447
1448 /// Even turns are pushes, odd turns are pops
1449 TurnSequencer<Atom> sequencer_;
1450
1451 T* ptr() noexcept {
1452 return static_cast<T*>(static_cast<void*>(&contents_));
1453 }
1454
1455 void destroyContents() noexcept {
1456 try {
1457 ptr()->~T();
1458 } catch (...) {
      // ignore exceptions thrown by a misbehaving (throwing) destructor
1460 }
1461 if (kIsDebug) {
1462 memset(&contents_, 'Q', sizeof(T));
1463 }
1464 }
1465
1466 /// Tag classes for dispatching to enqueue/dequeue implementation.
1467 struct ImplByRelocation {};
1468 struct ImplByMove {};
1469
1470 /// enqueue using nothrow move construction.
1471 void enqueueImpl(
1472 const uint32_t turn,
1473 Atom<uint32_t>& spinCutoff,
1474 const bool updateSpinCutoff,
1475 T&& goner,
1476 ImplByMove) noexcept {
1477 sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
1478 new (&contents_) T(std::move(goner));
1479 sequencer_.completeTurn(turn * 2);
1480 }
1481
  /// enqueue by simulating a nothrow move via relocation of the contents,
  /// followed by default construction of the moved-from source.
1484 void enqueueImpl(
1485 const uint32_t turn,
1486 Atom<uint32_t>& spinCutoff,
1487 const bool updateSpinCutoff,
1488 T&& goner,
1489 ImplByRelocation) noexcept {
1490 sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff);
1491 memcpy(
1492 static_cast<void*>(&contents_),
1493 static_cast<void const*>(&goner),
1494 sizeof(T));
1495 sequencer_.completeTurn(turn * 2);
1496 new (&goner) T();
1497 }
1498
1499 /// dequeue by destructing followed by relocation. This version is preferred,
1500 /// because as much work as possible can be done before waiting.
1501 void dequeueImpl(
1502 uint32_t turn,
1503 Atom<uint32_t>& spinCutoff,
1504 const bool updateSpinCutoff,
1505 T& elem,
1506 ImplByRelocation) noexcept {
1507 try {
1508 elem.~T();
1509 } catch (...) {
1510 // unlikely, but if we don't complete our turn the queue will die
1511 }
1512 sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
1513 memcpy(
1514 static_cast<void*>(&elem),
1515 static_cast<void const*>(&contents_),
1516 sizeof(T));
1517 sequencer_.completeTurn(turn * 2 + 1);
1518 }
1519
1520 /// dequeue by nothrow move assignment.
1521 void dequeueImpl(
1522 uint32_t turn,
1523 Atom<uint32_t>& spinCutoff,
1524 const bool updateSpinCutoff,
1525 T& elem,
1526 ImplByMove) noexcept {
1527 sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff);
1528 elem = std::move(*ptr());
1529 destroyContents();
1530 sequencer_.completeTurn(turn * 2 + 1);
1531 }
1532};
1533
1534} // namespace detail
1535
1536} // namespace folly
1537