1 | /* |
2 | * Copyright (c) Facebook, Inc. and its affiliates. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <algorithm> |
20 | #include <atomic> |
21 | #include <cassert> |
22 | #include <cstring> |
23 | #include <limits> |
24 | #include <type_traits> |
25 | |
26 | #include <folly/Traits.h> |
27 | #include <folly/concurrency/CacheLocality.h> |
28 | #include <folly/detail/TurnSequencer.h> |
29 | #include <folly/portability/Unistd.h> |
30 | |
31 | namespace folly { |
32 | |
33 | namespace detail { |
34 | |
35 | template <typename T, template <typename> class Atom> |
36 | struct SingleElementQueue; |
37 | |
38 | template <typename T> |
39 | class MPMCPipelineStageImpl; |
40 | |
41 | /// MPMCQueue base CRTP template |
42 | template <typename> |
43 | class MPMCQueueBase; |
44 | |
45 | } // namespace detail |
46 | |
47 | /// MPMCQueue<T> is a high-performance bounded concurrent queue that |
48 | /// supports multiple producers, multiple consumers, and optional blocking. |
49 | /// The queue has a fixed capacity, for which all memory will be allocated |
50 | /// up front. The bulk of the work of enqueuing and dequeuing can be |
51 | /// performed in parallel. |
52 | /// |
53 | /// MPMCQueue is linearizable. That means that if a call to write(A) |
54 | /// returns before a call to write(B) begins, then A will definitely end up |
55 | /// in the queue before B, and if a call to read(X) returns before a call |
/// to read(Y) is started, then X will be something from earlier in the
57 | /// queue than Y. This also means that if a read call returns a value, you |
58 | /// can be sure that all previous elements of the queue have been assigned |
59 | /// a reader (that reader might not yet have returned, but it exists). |
60 | /// |
61 | /// The underlying implementation uses a ticket dispenser for the head and |
62 | /// the tail, spreading accesses across N single-element queues to produce |
63 | /// a queue with capacity N. The ticket dispensers use atomic increment, |
64 | /// which is more robust to contention than a CAS loop. Each of the |
65 | /// single-element queues uses its own CAS to serialize access, with an |
66 | /// adaptive spin cutoff. When spinning fails on a single-element queue |
67 | /// it uses futex()'s _BITSET operations to reduce unnecessary wakeups |
68 | /// even if multiple waiters are present on an individual queue (such as |
69 | /// when the MPMCQueue's capacity is smaller than the number of enqueuers |
70 | /// or dequeuers). |
71 | /// |
72 | /// In benchmarks (contained in tao/queues/ConcurrentQueueTests) |
73 | /// it handles 1 to 1, 1 to N, N to 1, and N to M thread counts better |
74 | /// than any of the alternatives present in fbcode, for both small (~10) |
75 | /// and large capacities. In these benchmarks it is also faster than |
76 | /// tbb::concurrent_bounded_queue for all configurations. When there are |
77 | /// many more threads than cores, MPMCQueue is _much_ faster than the tbb |
78 | /// queue because it uses futex() to block and unblock waiting threads, |
79 | /// rather than spinning with sched_yield. |
80 | /// |
/// NOEXCEPT INTERACTION: tl;dr: if it compiles, you're fine. Ticket-based
82 | /// queues separate the assignment of queue positions from the actual |
83 | /// construction of the in-queue elements, which means that the T |
84 | /// constructor used during enqueue must not throw an exception. This is |
85 | /// enforced at compile time using type traits, which requires that T be |
86 | /// adorned with accurate noexcept information. If your type does not |
87 | /// use noexcept, you will have to wrap it in something that provides |
88 | /// the guarantee. We provide an alternate safe implementation for types |
89 | /// that don't use noexcept but that are marked folly::IsRelocatable |
90 | /// and std::is_nothrow_constructible, which is common for folly types. |
91 | /// In particular, if you can declare FOLLY_ASSUME_FBVECTOR_COMPATIBLE |
92 | /// then your type can be put in MPMCQueue. |
93 | /// |
94 | /// If you have a pool of N queue consumers that you want to shut down |
95 | /// after the queue has drained, one way is to enqueue N sentinel values |
/// to the queue. If the producer doesn't know how many consumers there
/// are, you can enqueue one sentinel and then have each consumer requeue
/// two sentinels after it receives one (by requeuing 2 the shutdown can
99 | /// complete in O(log P) time instead of O(P)). |
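///
/// A minimal usage sketch (illustrative values; the error-handling
/// branches are placeholders, not part of the API):
///
///   folly::MPMCQueue<int> q(1024);   // capacity fixed at construction
///   q.blockingWrite(1);              // blocks while the queue is full
///   if (!q.write(2)) { /* queue was full; 2 was not consumed */ }
///   int v;
///   q.blockingRead(v);               // blocks while the queue is empty
///   if (!q.read(v)) { /* queue was empty */ }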
100 | template < |
101 | typename T, |
102 | template <typename> class Atom = std::atomic, |
103 | bool Dynamic = false> |
104 | class MPMCQueue : public detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>> { |
105 | friend class detail::MPMCPipelineStageImpl<T>; |
106 | using Slot = detail::SingleElementQueue<T, Atom>; |
107 | |
108 | public: |
109 | explicit MPMCQueue(size_t queueCapacity) |
110 | : detail::MPMCQueueBase<MPMCQueue<T, Atom, Dynamic>>(queueCapacity) { |
111 | this->stride_ = this->computeStride(queueCapacity); |
112 | this->slots_ = new Slot[queueCapacity + 2 * this->kSlotPadding]; |
113 | } |
114 | |
115 | MPMCQueue() noexcept {} |
116 | }; |
117 | |
118 | /// *** The dynamic version of MPMCQueue is deprecated. *** |
119 | /// Use UnboundedQueue instead. |
120 | |
121 | /// The dynamic version of MPMCQueue allows dynamic expansion of queue |
122 | /// capacity, such that a queue may start with a smaller capacity than |
123 | /// specified and expand only if needed. Users may optionally specify |
124 | /// the initial capacity and the expansion multiplier. |
125 | /// |
126 | /// The design uses a seqlock to enforce mutual exclusion among |
127 | /// expansion attempts. Regular operations read up-to-date queue |
128 | /// information (slots array, capacity, stride) inside read-only |
129 | /// seqlock sections, which are unimpeded when no expansion is in |
130 | /// progress. |
131 | /// |
132 | /// An expansion computes a new capacity, allocates a new slots array, |
133 | /// and updates stride. No information needs to be copied from the |
134 | /// current slots array to the new one. When this happens, new slots |
135 | /// will not have sequence numbers that match ticket numbers. The |
136 | /// expansion needs to compute a ticket offset such that operations |
137 | /// that use new arrays can adjust the calculations of slot indexes |
138 | /// and sequence numbers that take into account that the new slots |
139 | /// start with sequence numbers of zero. The current ticket offset is |
140 | /// packed with the seqlock in an atomic 64-bit integer. The initial |
141 | /// offset is zero. |
142 | /// |
143 | /// Lagging write and read operations with tickets lower than the |
144 | /// ticket offset of the current slots array (i.e., the minimum ticket |
145 | /// number that can be served by the current array) must use earlier |
146 | /// closed arrays instead of the current one. Information about closed |
147 | /// slots arrays (array address, capacity, stride, and offset) is |
148 | /// maintained in a logarithmic-sized structure. Each entry in that |
149 | /// structure never needs to be changed once set. The number of closed |
150 | /// arrays is half the value of the seqlock (when unlocked). |
151 | /// |
152 | /// The acquisition of the seqlock to perform an expansion does not |
153 | /// prevent the issuing of new push and pop tickets concurrently. The |
154 | /// expansion must set the new ticket offset to a value that couldn't |
155 | /// have been issued to an operation that has already gone through a |
156 | /// seqlock read-only section (and hence obtained information for |
157 | /// older closed arrays). |
158 | /// |
159 | /// Note that the total queue capacity can temporarily exceed the |
160 | /// specified capacity when there are lagging consumers that haven't |
161 | /// yet consumed all the elements in closed arrays. Users should not |
162 | /// rely on the capacity of dynamic queues for synchronization, e.g., |
163 | /// they should not expect that a thread will definitely block on a |
164 | /// call to blockingWrite() when the queue size is known to be equal |
165 | /// to its capacity. |
166 | /// |
167 | /// Note that some writeIfNotFull() and tryWriteUntil() operations may |
168 | /// fail even if the size of the queue is less than its maximum |
169 | /// capacity and despite the success of expansion, if the operation |
170 | /// happens to acquire a ticket that belongs to a closed array. This |
171 | /// is a transient condition. Typically, one or two ticket values may |
/// be subject to such a condition per expansion.
173 | /// |
174 | /// The dynamic version is a partial specialization of MPMCQueue with |
175 | /// Dynamic == true |
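///
/// A construction sketch (capacities and multiplier are illustrative):
///
///   // may start below 1000 elements and expand on demand
///   folly::MPMCQueue<int, std::atomic, true> q(1000);
///   // initial capacity 16, expansion multiplier 4, maximum 1000
///   folly::MPMCQueue<int, std::atomic, true> q2(1000, 16, 4);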
176 | template <typename T, template <typename> class Atom> |
177 | class MPMCQueue<T, Atom, true> |
178 | : public detail::MPMCQueueBase<MPMCQueue<T, Atom, true>> { |
179 | friend class detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>; |
180 | using Slot = detail::SingleElementQueue<T, Atom>; |
181 | |
182 | struct ClosedArray { |
183 | uint64_t offset_{0}; |
184 | Slot* slots_{nullptr}; |
185 | size_t capacity_{0}; |
186 | int stride_{0}; |
187 | }; |
188 | |
189 | public: |
190 | explicit MPMCQueue(size_t queueCapacity) |
191 | : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) { |
192 | size_t cap = std::min<size_t>(kDefaultMinDynamicCapacity, queueCapacity); |
193 | initQueue(cap, kDefaultExpansionMultiplier); |
194 | } |
195 | |
196 | explicit MPMCQueue( |
197 | size_t queueCapacity, |
198 | size_t minCapacity, |
199 | size_t expansionMultiplier) |
200 | : detail::MPMCQueueBase<MPMCQueue<T, Atom, true>>(queueCapacity) { |
201 | minCapacity = std::max<size_t>(1, minCapacity); |
202 | size_t cap = std::min<size_t>(minCapacity, queueCapacity); |
203 | expansionMultiplier = std::max<size_t>(2, expansionMultiplier); |
204 | initQueue(cap, expansionMultiplier); |
205 | } |
206 | |
207 | MPMCQueue() noexcept { |
208 | dmult_ = 0; |
209 | closed_ = nullptr; |
210 | } |
211 | |
212 | MPMCQueue(MPMCQueue<T, Atom, true>&& rhs) noexcept { |
213 | this->capacity_ = rhs.capacity_; |
214 | new (&this->dslots_) |
215 | Atom<Slot*>(rhs.dslots_.load(std::memory_order_relaxed)); |
216 | new (&this->dstride_) |
217 | Atom<int>(rhs.dstride_.load(std::memory_order_relaxed)); |
218 | this->dstate_.store( |
219 | rhs.dstate_.load(std::memory_order_relaxed), std::memory_order_relaxed); |
220 | this->dcapacity_.store( |
221 | rhs.dcapacity_.load(std::memory_order_relaxed), |
222 | std::memory_order_relaxed); |
223 | this->pushTicket_.store( |
224 | rhs.pushTicket_.load(std::memory_order_relaxed), |
225 | std::memory_order_relaxed); |
226 | this->popTicket_.store( |
227 | rhs.popTicket_.load(std::memory_order_relaxed), |
228 | std::memory_order_relaxed); |
229 | this->pushSpinCutoff_.store( |
230 | rhs.pushSpinCutoff_.load(std::memory_order_relaxed), |
231 | std::memory_order_relaxed); |
232 | this->popSpinCutoff_.store( |
233 | rhs.popSpinCutoff_.load(std::memory_order_relaxed), |
234 | std::memory_order_relaxed); |
235 | dmult_ = rhs.dmult_; |
236 | closed_ = rhs.closed_; |
237 | |
238 | rhs.capacity_ = 0; |
239 | rhs.dslots_.store(nullptr, std::memory_order_relaxed); |
240 | rhs.dstride_.store(0, std::memory_order_relaxed); |
241 | rhs.dstate_.store(0, std::memory_order_relaxed); |
242 | rhs.dcapacity_.store(0, std::memory_order_relaxed); |
243 | rhs.pushTicket_.store(0, std::memory_order_relaxed); |
244 | rhs.popTicket_.store(0, std::memory_order_relaxed); |
245 | rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed); |
246 | rhs.popSpinCutoff_.store(0, std::memory_order_relaxed); |
247 | rhs.dmult_ = 0; |
248 | rhs.closed_ = nullptr; |
249 | } |
250 | |
251 | MPMCQueue<T, Atom, true> const& operator=(MPMCQueue<T, Atom, true>&& rhs) { |
252 | if (this != &rhs) { |
253 | this->~MPMCQueue(); |
254 | new (this) MPMCQueue(std::move(rhs)); |
255 | } |
256 | return *this; |
257 | } |
258 | |
259 | ~MPMCQueue() { |
260 | if (closed_ != nullptr) { |
261 | for (int i = getNumClosed(this->dstate_.load()) - 1; i >= 0; --i) { |
262 | delete[] closed_[i].slots_; |
263 | } |
264 | delete[] closed_; |
265 | } |
266 | using AtomInt = Atom<int>; |
267 | this->dstride_.~AtomInt(); |
268 | using AtomSlot = Atom<Slot*>; |
269 | // Sort of a hack to get ~MPMCQueueBase to free dslots_ |
270 | auto slots = this->dslots_.load(); |
271 | this->dslots_.~AtomSlot(); |
272 | this->slots_ = slots; |
273 | } |
274 | |
275 | size_t allocatedCapacity() const noexcept { |
276 | return this->dcapacity_.load(std::memory_order_relaxed); |
277 | } |
278 | |
279 | template <typename... Args> |
280 | void blockingWrite(Args&&... args) noexcept { |
281 | uint64_t ticket = this->pushTicket_++; |
282 | Slot* slots; |
283 | size_t cap; |
284 | int stride; |
285 | uint64_t state; |
286 | uint64_t offset; |
287 | do { |
288 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
289 | asm_volatile_pause(); |
290 | continue; |
291 | } |
292 | if (maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride)) { |
293 | // There was an expansion after this ticket was issued. |
294 | break; |
295 | } |
296 | if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( |
297 | this->turn(ticket - offset, cap))) { |
298 | // A slot is ready. No need to expand. |
299 | break; |
300 | } else if ( |
301 | this->popTicket_.load(std::memory_order_relaxed) + cap > ticket) { |
302 | // May block, but a pop is in progress. No need to expand. |
303 | // Get seqlock read section info again in case an expansion |
304 | // occurred with an equal or higher ticket. |
305 | continue; |
306 | } else { |
307 | // May block. See if we can expand. |
308 | if (tryExpand(state, cap)) { |
309 | // This or another thread started an expansion. Get updated info. |
310 | continue; |
311 | } else { |
312 | // Can't expand. |
313 | break; |
314 | } |
315 | } |
316 | } while (true); |
317 | this->enqueueWithTicketBase( |
318 | ticket - offset, slots, cap, stride, std::forward<Args>(args)...); |
319 | } |
320 | |
321 | void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept { |
322 | ticket = this->popTicket_++; |
323 | Slot* slots; |
324 | size_t cap; |
325 | int stride; |
326 | uint64_t state; |
327 | uint64_t offset; |
328 | while (!trySeqlockReadSection(state, slots, cap, stride)) { |
329 | asm_volatile_pause(); |
330 | } |
331 | // If there was an expansion after the corresponding push ticket |
332 | // was issued, adjust accordingly |
333 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
334 | this->dequeueWithTicketBase(ticket - offset, slots, cap, stride, elem); |
335 | } |
336 | |
337 | private: |
338 | enum { |
339 | kSeqlockBits = 6, |
340 | kDefaultMinDynamicCapacity = 10, |
341 | kDefaultExpansionMultiplier = 10, |
342 | }; |
343 | |
344 | size_t dmult_; |
345 | |
346 | // Info about closed slots arrays for use by lagging operations |
347 | ClosedArray* closed_; |
348 | |
349 | void initQueue(const size_t cap, const size_t mult) { |
350 | new (&this->dstride_) Atom<int>(this->computeStride(cap)); |
351 | Slot* slots = new Slot[cap + 2 * this->kSlotPadding]; |
352 | new (&this->dslots_) Atom<Slot*>(slots); |
353 | this->dstate_.store(0); |
354 | this->dcapacity_.store(cap); |
355 | dmult_ = mult; |
356 | size_t maxClosed = 0; |
357 | for (size_t expanded = cap; expanded < this->capacity_; expanded *= mult) { |
358 | ++maxClosed; |
359 | } |
360 | closed_ = (maxClosed > 0) ? new ClosedArray[maxClosed] : nullptr; |
361 | } |
362 | |
363 | bool tryObtainReadyPushTicket( |
364 | uint64_t& ticket, |
365 | Slot*& slots, |
366 | size_t& cap, |
367 | int& stride) noexcept { |
368 | uint64_t state; |
369 | do { |
370 | ticket = this->pushTicket_.load(std::memory_order_acquire); // A |
371 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
372 | asm_volatile_pause(); |
373 | continue; |
374 | } |
375 | |
376 | // If there was an expansion with offset greater than this ticket, |
377 | // adjust accordingly |
378 | uint64_t offset; |
379 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
380 | |
381 | if (slots[this->idx((ticket - offset), cap, stride)].mayEnqueue( |
382 | this->turn(ticket - offset, cap))) { |
383 | // A slot is ready. |
384 | if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
385 | // Adjust ticket |
386 | ticket -= offset; |
387 | return true; |
388 | } else { |
389 | continue; |
390 | } |
391 | } else { |
392 | if (ticket != this->pushTicket_.load(std::memory_order_relaxed)) { // B |
393 | // Try again. Ticket changed. |
394 | continue; |
395 | } |
396 | // Likely to block. |
397 | // Try to expand unless the ticket is for a closed array |
398 | if (offset == getOffset(state)) { |
399 | if (tryExpand(state, cap)) { |
400 | // This or another thread started an expansion. Get up-to-date info. |
401 | continue; |
402 | } |
403 | } |
404 | return false; |
405 | } |
406 | } while (true); |
407 | } |
408 | |
409 | bool tryObtainPromisedPushTicket( |
410 | uint64_t& ticket, |
411 | Slot*& slots, |
412 | size_t& cap, |
413 | int& stride) noexcept { |
414 | uint64_t state; |
415 | do { |
416 | ticket = this->pushTicket_.load(std::memory_order_acquire); |
417 | auto numPops = this->popTicket_.load(std::memory_order_acquire); |
418 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
419 | asm_volatile_pause(); |
420 | continue; |
421 | } |
422 | |
423 | const auto curCap = cap; |
424 | // If there was an expansion with offset greater than this ticket, |
425 | // adjust accordingly |
426 | uint64_t offset; |
427 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
428 | |
429 | int64_t n = ticket - numPops; |
430 | |
431 | if (n >= static_cast<ssize_t>(cap)) { |
432 | if ((cap == curCap) && tryExpand(state, cap)) { |
433 | // This or another thread started an expansion. Start over. |
434 | continue; |
435 | } |
436 | // Can't expand. |
437 | ticket -= offset; |
438 | return false; |
439 | } |
440 | |
441 | if (this->pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
442 | // Adjust ticket |
443 | ticket -= offset; |
444 | return true; |
445 | } |
446 | } while (true); |
447 | } |
448 | |
449 | bool tryObtainReadyPopTicket( |
450 | uint64_t& ticket, |
451 | Slot*& slots, |
452 | size_t& cap, |
453 | int& stride) noexcept { |
454 | uint64_t state; |
455 | do { |
456 | ticket = this->popTicket_.load(std::memory_order_relaxed); |
457 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
458 | asm_volatile_pause(); |
459 | continue; |
460 | } |
461 | |
462 | // If there was an expansion after the corresponding push ticket |
463 | // was issued, adjust accordingly |
464 | uint64_t offset; |
465 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
466 | |
467 | if (slots[this->idx((ticket - offset), cap, stride)].mayDequeue( |
468 | this->turn(ticket - offset, cap))) { |
469 | if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
470 | // Adjust ticket |
471 | ticket -= offset; |
472 | return true; |
473 | } |
474 | } else { |
475 | return false; |
476 | } |
477 | } while (true); |
478 | } |
479 | |
480 | bool tryObtainPromisedPopTicket( |
481 | uint64_t& ticket, |
482 | Slot*& slots, |
483 | size_t& cap, |
484 | int& stride) noexcept { |
485 | uint64_t state; |
486 | do { |
487 | ticket = this->popTicket_.load(std::memory_order_acquire); |
488 | auto numPushes = this->pushTicket_.load(std::memory_order_acquire); |
489 | if (!trySeqlockReadSection(state, slots, cap, stride)) { |
490 | asm_volatile_pause(); |
491 | continue; |
492 | } |
493 | |
494 | uint64_t offset; |
495 | // If there was an expansion after the corresponding push |
496 | // ticket was issued, adjust accordingly |
497 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
498 | |
499 | if (ticket >= numPushes) { |
500 | ticket -= offset; |
501 | return false; |
502 | } |
503 | if (this->popTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
504 | ticket -= offset; |
505 | return true; |
506 | } |
507 | } while (true); |
508 | } |
509 | |
510 | /// Enqueues an element with a specific ticket number |
511 | template <typename... Args> |
512 | void enqueueWithTicket(const uint64_t ticket, Args&&... args) noexcept { |
513 | Slot* slots; |
514 | size_t cap; |
515 | int stride; |
516 | uint64_t state; |
517 | uint64_t offset; |
518 | |
519 | while (!trySeqlockReadSection(state, slots, cap, stride)) { |
520 | } |
521 | |
522 | // If there was an expansion after this ticket was issued, adjust |
523 | // accordingly |
524 | maybeUpdateFromClosed(state, ticket, offset, slots, cap, stride); |
525 | |
526 | this->enqueueWithTicketBase( |
527 | ticket - offset, slots, cap, stride, std::forward<Args>(args)...); |
528 | } |
529 | |
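  /// dstate_ packs the seqlock and the current ticket offset into a
  /// single 64-bit word (layout inferred from getOffset(),
  /// getNumClosed(), and tryExpand() below):
  ///   bit  0    : seqlock bit, set while an expansion is in progress
  ///   bits 1-5  : number of closed arrays
  ///   bits 6-63 : current ticket offset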
530 | uint64_t getOffset(const uint64_t state) const noexcept { |
531 | return state >> kSeqlockBits; |
532 | } |
533 | |
534 | int getNumClosed(const uint64_t state) const noexcept { |
535 | return (state & ((1 << kSeqlockBits) - 1)) >> 1; |
536 | } |
537 | |
538 | /// Try to expand the queue. Returns true if this expansion was |
  /// successful or a concurrent expansion is in progress. Returns
540 | /// false if the queue has reached its maximum capacity or |
541 | /// allocation has failed. |
542 | bool tryExpand(const uint64_t state, const size_t cap) noexcept { |
543 | if (cap == this->capacity_) { |
544 | return false; |
545 | } |
546 | // Acquire seqlock |
547 | uint64_t oldval = state; |
548 | assert((state & 1) == 0); |
549 | if (this->dstate_.compare_exchange_strong(oldval, state + 1)) { |
550 | assert(cap == this->dcapacity_.load()); |
551 | uint64_t ticket = |
552 | 1 + std::max(this->pushTicket_.load(), this->popTicket_.load()); |
553 | size_t newCapacity = std::min(dmult_ * cap, this->capacity_); |
554 | Slot* newSlots = |
555 | new (std::nothrow) Slot[newCapacity + 2 * this->kSlotPadding]; |
556 | if (newSlots == nullptr) { |
557 | // Expansion failed. Restore the seqlock |
558 | this->dstate_.store(state); |
559 | return false; |
560 | } |
561 | // Successful expansion |
562 | // calculate the current ticket offset |
563 | uint64_t offset = getOffset(state); |
564 | // calculate index in closed array |
565 | int index = getNumClosed(state); |
566 | assert((index << 1) < (1 << kSeqlockBits)); |
567 | // fill the info for the closed slots array |
568 | closed_[index].offset_ = offset; |
569 | closed_[index].slots_ = this->dslots_.load(); |
570 | closed_[index].capacity_ = cap; |
571 | closed_[index].stride_ = this->dstride_.load(); |
572 | // update the new slots array info |
573 | this->dslots_.store(newSlots); |
574 | this->dcapacity_.store(newCapacity); |
575 | this->dstride_.store(this->computeStride(newCapacity)); |
576 | // Release the seqlock and record the new ticket offset |
577 | this->dstate_.store((ticket << kSeqlockBits) + (2 * (index + 1))); |
578 | return true; |
579 | } else { // failed to acquire seqlock |
      // Someone acquired the seqlock. Go back to the caller and get
581 | // up-to-date info. |
582 | return true; |
583 | } |
584 | } |
585 | |
586 | /// Seqlock read-only section |
587 | bool trySeqlockReadSection( |
588 | uint64_t& state, |
589 | Slot*& slots, |
590 | size_t& cap, |
591 | int& stride) noexcept { |
592 | state = this->dstate_.load(std::memory_order_acquire); |
593 | if (state & 1) { |
594 | // Locked. |
595 | return false; |
596 | } |
597 | // Start read-only section. |
598 | slots = this->dslots_.load(std::memory_order_relaxed); |
599 | cap = this->dcapacity_.load(std::memory_order_relaxed); |
600 | stride = this->dstride_.load(std::memory_order_relaxed); |
601 | // End of read-only section. Validate seqlock. |
602 | std::atomic_thread_fence(std::memory_order_acquire); |
603 | return (state == this->dstate_.load(std::memory_order_relaxed)); |
604 | } |
605 | |
606 | /// If there was an expansion after ticket was issued, update local variables |
607 | /// of the lagging operation using the most recent closed array with |
  /// offset <= ticket and return true. Otherwise, return false.
609 | bool maybeUpdateFromClosed( |
610 | const uint64_t state, |
611 | const uint64_t ticket, |
612 | uint64_t& offset, |
613 | Slot*& slots, |
614 | size_t& cap, |
615 | int& stride) noexcept { |
616 | offset = getOffset(state); |
617 | if (ticket >= offset) { |
618 | return false; |
619 | } |
620 | for (int i = getNumClosed(state) - 1; i >= 0; --i) { |
621 | offset = closed_[i].offset_; |
622 | if (offset <= ticket) { |
623 | slots = closed_[i].slots_; |
624 | cap = closed_[i].capacity_; |
625 | stride = closed_[i].stride_; |
626 | return true; |
627 | } |
628 | } |
629 | // A closed array with offset <= ticket should have been found |
630 | assert(false); |
631 | return false; |
632 | } |
633 | }; |
634 | |
635 | namespace detail { |
636 | |
637 | /// CRTP specialization of MPMCQueueBase |
638 | template < |
639 | template <typename T, template <typename> class Atom, bool Dynamic> |
640 | class Derived, |
641 | typename T, |
642 | template <typename> class Atom, |
643 | bool Dynamic> |
644 | class MPMCQueueBase<Derived<T, Atom, Dynamic>> { |
645 | // Note: Using CRTP static casts in several functions of this base |
646 | // template instead of making called functions virtual or duplicating |
647 | // the code of calling functions in the derived partially specialized |
648 | // template |
649 | |
  static_assert(
      std::is_nothrow_constructible<T, T&&>::value ||
          folly::IsRelocatable<T>::value,
      "T must be relocatable or have a noexcept move constructor");
654 | |
655 | public: |
656 | typedef T value_type; |
657 | |
658 | using Slot = detail::SingleElementQueue<T, Atom>; |
659 | |
660 | explicit MPMCQueueBase(size_t queueCapacity) |
661 | : capacity_(queueCapacity), |
662 | dstate_(0), |
663 | dcapacity_(0), |
664 | pushTicket_(0), |
665 | popTicket_(0), |
666 | pushSpinCutoff_(0), |
667 | popSpinCutoff_(0) { |
668 | if (queueCapacity == 0) { |
669 | throw std::invalid_argument( |
670 | "MPMCQueue with explicit capacity 0 is impossible" |
671 | // Stride computation in derived classes would sigfpe if capacity is 0 |
672 | ); |
673 | } |
674 | |
675 | // ideally this would be a static assert, but g++ doesn't allow it |
676 | assert( |
677 | alignof(MPMCQueue<T, Atom>) >= hardware_destructive_interference_size); |
678 | assert( |
679 | static_cast<uint8_t*>(static_cast<void*>(&popTicket_)) - |
680 | static_cast<uint8_t*>(static_cast<void*>(&pushTicket_)) >= |
681 | static_cast<ptrdiff_t>(hardware_destructive_interference_size)); |
682 | } |
683 | |
684 | /// A default-constructed queue is useful because a usable (non-zero |
685 | /// capacity) queue can be moved onto it or swapped with it |
686 | MPMCQueueBase() noexcept |
687 | : capacity_(0), |
688 | slots_(nullptr), |
689 | stride_(0), |
690 | dstate_(0), |
691 | dcapacity_(0), |
692 | pushTicket_(0), |
693 | popTicket_(0), |
694 | pushSpinCutoff_(0), |
695 | popSpinCutoff_(0) {} |
696 | |
697 | /// IMPORTANT: The move constructor is here to make it easier to perform |
698 | /// the initialization phase, it is not safe to use when there are any |
699 | /// concurrent accesses (this is not checked). |
700 | MPMCQueueBase(MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) noexcept |
701 | : capacity_(rhs.capacity_), |
702 | slots_(rhs.slots_), |
703 | stride_(rhs.stride_), |
704 | dstate_(rhs.dstate_.load(std::memory_order_relaxed)), |
705 | dcapacity_(rhs.dcapacity_.load(std::memory_order_relaxed)), |
706 | pushTicket_(rhs.pushTicket_.load(std::memory_order_relaxed)), |
707 | popTicket_(rhs.popTicket_.load(std::memory_order_relaxed)), |
708 | pushSpinCutoff_(rhs.pushSpinCutoff_.load(std::memory_order_relaxed)), |
709 | popSpinCutoff_(rhs.popSpinCutoff_.load(std::memory_order_relaxed)) { |
710 | // relaxed ops are okay for the previous reads, since rhs queue can't |
711 | // be in concurrent use |
712 | |
713 | // zero out rhs |
714 | rhs.capacity_ = 0; |
715 | rhs.slots_ = nullptr; |
716 | rhs.stride_ = 0; |
717 | rhs.dstate_.store(0, std::memory_order_relaxed); |
718 | rhs.dcapacity_.store(0, std::memory_order_relaxed); |
719 | rhs.pushTicket_.store(0, std::memory_order_relaxed); |
720 | rhs.popTicket_.store(0, std::memory_order_relaxed); |
721 | rhs.pushSpinCutoff_.store(0, std::memory_order_relaxed); |
722 | rhs.popSpinCutoff_.store(0, std::memory_order_relaxed); |
723 | } |
724 | |
725 | /// IMPORTANT: The move operator is here to make it easier to perform |
726 | /// the initialization phase, it is not safe to use when there are any |
727 | /// concurrent accesses (this is not checked). |
728 | MPMCQueueBase<Derived<T, Atom, Dynamic>> const& operator=( |
729 | MPMCQueueBase<Derived<T, Atom, Dynamic>>&& rhs) { |
730 | if (this != &rhs) { |
731 | this->~MPMCQueueBase(); |
732 | new (this) MPMCQueueBase(std::move(rhs)); |
733 | } |
734 | return *this; |
735 | } |
736 | |
737 | MPMCQueueBase(const MPMCQueueBase&) = delete; |
738 | MPMCQueueBase& operator=(const MPMCQueueBase&) = delete; |
739 | |
740 | /// MPMCQueue can only be safely destroyed when there are no |
741 | /// pending enqueuers or dequeuers (this is not checked). |
742 | ~MPMCQueueBase() { |
743 | delete[] slots_; |
744 | } |
745 | |
746 | /// Returns the number of writes (including threads that are blocked waiting |
747 | /// to write) minus the number of reads (including threads that are blocked |
748 | /// waiting to read). So effectively, it becomes: |
749 | /// elements in queue + pending(calls to write) - pending(calls to read). |
750 | /// If nothing is pending, then the method returns the actual number of |
751 | /// elements in the queue. |
752 | /// The returned value can be negative if there are no writers and the queue |
753 | /// is empty, but there is one reader that is blocked waiting to read (in |
754 | /// which case, the returned size will be -1). |
755 | ssize_t size() const noexcept { |
756 | // since both pushes and pops increase monotonically, we can get a |
757 | // consistent snapshot either by bracketing a read of popTicket_ with |
758 | // two reads of pushTicket_ that return the same value, or the other |
759 | // way around. We maximize our chances by alternately attempting |
760 | // both bracketings. |
761 | uint64_t pushes = pushTicket_.load(std::memory_order_acquire); // A |
762 | uint64_t pops = popTicket_.load(std::memory_order_acquire); // B |
763 | while (true) { |
764 | uint64_t nextPushes = pushTicket_.load(std::memory_order_acquire); // C |
765 | if (pushes == nextPushes) { |
766 | // pushTicket_ didn't change from A (or the previous C) to C, |
767 | // so we can linearize at B (or D) |
768 | return ssize_t(pushes - pops); |
769 | } |
770 | pushes = nextPushes; |
771 | uint64_t nextPops = popTicket_.load(std::memory_order_acquire); // D |
772 | if (pops == nextPops) { |
        // popTicket_ didn't change from B (or the previous D), so we
        // can linearize at C
775 | return ssize_t(pushes - pops); |
776 | } |
777 | pops = nextPops; |
778 | } |
779 | } |
780 | |
781 | /// Returns true if there are no items available for dequeue |
782 | bool isEmpty() const noexcept { |
783 | return size() <= 0; |
784 | } |
785 | |
786 | /// Returns true if there is currently no empty space to enqueue |
787 | bool isFull() const noexcept { |
788 | // careful with signed -> unsigned promotion, since size can be negative |
789 | return size() >= static_cast<ssize_t>(capacity_); |
790 | } |
791 | |
  /// Returns a guess at size() for contexts that don't need a precise
  /// value, such as stats. More specifically, it returns the number of writes
  /// minus the number of reads, but after reading the number of writes, more
  /// writes could have come in before the number of reads was sampled,
  /// and this method doesn't protect against such a case.
797 | /// The returned value can be negative. |
798 | ssize_t sizeGuess() const noexcept { |
799 | return writeCount() - readCount(); |
800 | } |
801 | |
802 | /// Doesn't change |
803 | size_t capacity() const noexcept { |
804 | return capacity_; |
805 | } |
806 | |
807 | /// Doesn't change for non-dynamic |
808 | size_t allocatedCapacity() const noexcept { |
809 | return capacity_; |
810 | } |
811 | |
812 | /// Returns the total number of calls to blockingWrite or successful |
813 | /// calls to write, including those blockingWrite calls that are |
814 | /// currently blocking |
815 | uint64_t writeCount() const noexcept { |
816 | return pushTicket_.load(std::memory_order_acquire); |
817 | } |
818 | |
819 | /// Returns the total number of calls to blockingRead or successful |
820 | /// calls to read, including those blockingRead calls that are currently |
821 | /// blocking |
822 | uint64_t readCount() const noexcept { |
823 | return popTicket_.load(std::memory_order_acquire); |
824 | } |
825 | |
826 | /// Enqueues a T constructed from args, blocking until space is |
827 | /// available. Note that this method signature allows enqueue via |
828 | /// move, if args is a T rvalue, via copy, if args is a T lvalue, or |
829 | /// via emplacement if args is an initializer list that can be passed |
830 | /// to a T constructor. |
831 | template <typename... Args> |
832 | void blockingWrite(Args&&... args) noexcept { |
833 | enqueueWithTicketBase( |
834 | pushTicket_++, slots_, capacity_, stride_, std::forward<Args>(args)...); |
835 | } |
836 | |
837 | /// If an item can be enqueued with no blocking, does so and returns |
838 | /// true, otherwise returns false. This method is similar to |
839 | /// writeIfNotFull, but if you don't have a specific need for that |
840 | /// method you should use this one. |
841 | /// |
842 | /// One of the common usages of this method is to enqueue via the |
843 | /// move constructor, something like q.write(std::move(x)). If write |
844 | /// returns false because the queue is full then x has not actually been |
845 | /// consumed, which looks strange. To understand why it is actually okay |
846 | /// to use x afterward, remember that std::move is just a typecast that |
847 | /// provides an rvalue reference that enables use of a move constructor |
848 | /// or operator. std::move doesn't actually move anything. It could |
849 | /// more accurately be called std::rvalue_cast or std::move_permission. |
850 | template <typename... Args> |
851 | bool write(Args&&... args) noexcept { |
852 | uint64_t ticket; |
853 | Slot* slots; |
854 | size_t cap; |
855 | int stride; |
856 | if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPushTicket( |
857 | ticket, slots, cap, stride)) { |
858 | // we have pre-validated that the ticket won't block |
859 | enqueueWithTicketBase( |
860 | ticket, slots, cap, stride, std::forward<Args>(args)...); |
861 | return true; |
862 | } else { |
863 | return false; |
864 | } |
865 | } |
866 | |
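  /// Tries to enqueue a T constructed from args before the deadline
  /// `when`, waiting for space if necessary. Returns true on success;
  /// may briefly block on a dequeue that is still in progress.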
867 | template <class Clock, typename... Args> |
868 | bool tryWriteUntil( |
869 | const std::chrono::time_point<Clock>& when, |
870 | Args&&... args) noexcept { |
871 | uint64_t ticket; |
872 | Slot* slots; |
873 | size_t cap; |
874 | int stride; |
875 | if (tryObtainPromisedPushTicketUntil(ticket, slots, cap, stride, when)) { |
876 | // we have pre-validated that the ticket won't block, or rather that |
877 | // it won't block longer than it takes another thread to dequeue an |
878 | // element from the slot it identifies. |
879 | enqueueWithTicketBase( |
880 | ticket, slots, cap, stride, std::forward<Args>(args)...); |
881 | return true; |
882 | } else { |
883 | return false; |
884 | } |
885 | } |
886 | |
887 | /// If the queue is not full, enqueues and returns true, otherwise |
888 | /// returns false. Unlike write this method can be blocked by another |
889 | /// thread, specifically a read that has linearized (been assigned |
890 | /// a ticket) but not yet completed. If you don't really need this |
891 | /// function you should probably use write. |
892 | /// |
893 | /// MPMCQueue isn't lock-free, so just because a read operation has |
894 | /// linearized (and isFull is false) doesn't mean that space has been |
895 | /// made available for another write. In this situation write will |
896 | /// return false, but writeIfNotFull will wait for the dequeue to finish. |
897 | /// This method is required if you are composing queues and managing |
898 | /// your own wakeup, because it guarantees that after every successful |
899 | /// write a readIfNotEmpty will succeed. |
900 | template <typename... Args> |
901 | bool writeIfNotFull(Args&&... args) noexcept { |
902 | uint64_t ticket; |
903 | Slot* slots; |
904 | size_t cap; |
905 | int stride; |
906 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
907 | ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) { |
908 | // some other thread is already dequeuing the slot into which we |
909 | // are going to enqueue, but we might have to wait for them to finish |
910 | enqueueWithTicketBase( |
911 | ticket, slots, cap, stride, std::forward<Args>(args)...); |
912 | return true; |
913 | } else { |
914 | return false; |
915 | } |
916 | } |
917 | |
918 | /// Moves a dequeued element onto elem, blocking until an element |
919 | /// is available |
920 | void blockingRead(T& elem) noexcept { |
921 | uint64_t ticket; |
922 | static_cast<Derived<T, Atom, Dynamic>*>(this)->blockingReadWithTicket( |
923 | ticket, elem); |
924 | } |
925 | |
  /// Same as blockingRead() but also records the ticket number
927 | void blockingReadWithTicket(uint64_t& ticket, T& elem) noexcept { |
928 | assert(capacity_ != 0); |
929 | ticket = popTicket_++; |
930 | dequeueWithTicketBase(ticket, slots_, capacity_, stride_, elem); |
931 | } |
932 | |
933 | /// If an item can be dequeued with no blocking, does so and returns |
934 | /// true, otherwise returns false. |
935 | bool read(T& elem) noexcept { |
936 | uint64_t ticket; |
937 | return readAndGetTicket(ticket, elem); |
938 | } |
939 | |
  /// Same as read() but also records the ticket number
941 | bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept { |
942 | Slot* slots; |
943 | size_t cap; |
944 | int stride; |
945 | if (static_cast<Derived<T, Atom, Dynamic>*>(this)->tryObtainReadyPopTicket( |
946 | ticket, slots, cap, stride)) { |
947 | // the ticket has been pre-validated to not block |
948 | dequeueWithTicketBase(ticket, slots, cap, stride, elem); |
949 | return true; |
950 | } else { |
951 | return false; |
952 | } |
953 | } |
954 | |
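  /// Tries to dequeue an element into elem before the deadline `when`,
  /// waiting for an element if necessary. Returns true on success; may
  /// briefly block on an enqueue that is still in progress.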
955 | template <class Clock, typename... Args> |
956 | bool tryReadUntil( |
957 | const std::chrono::time_point<Clock>& when, |
958 | T& elem) noexcept { |
959 | uint64_t ticket; |
960 | Slot* slots; |
961 | size_t cap; |
962 | int stride; |
963 | if (tryObtainPromisedPopTicketUntil(ticket, slots, cap, stride, when)) { |
964 | // we have pre-validated that the ticket won't block, or rather that |
965 | // it won't block longer than it takes another thread to enqueue an |
966 | // element on the slot it identifies. |
967 | dequeueWithTicketBase(ticket, slots, cap, stride, elem); |
968 | return true; |
969 | } else { |
970 | return false; |
971 | } |
972 | } |
973 | |
974 | /// If the queue is not empty, dequeues and returns true, otherwise |
975 | /// returns false. If the matching write is still in progress then this |
976 | /// method may block waiting for it. If you don't rely on being able |
  /// to dequeue (such as by counting completed writes) then you should
978 | /// prefer read. |
979 | bool readIfNotEmpty(T& elem) noexcept { |
980 | uint64_t ticket; |
981 | Slot* slots; |
982 | size_t cap; |
983 | int stride; |
984 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
985 | ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) { |
986 | // the matching enqueue already has a ticket, but might not be done |
987 | dequeueWithTicketBase(ticket, slots, cap, stride, elem); |
988 | return true; |
989 | } else { |
990 | return false; |
991 | } |
992 | } |
993 | |
994 | protected: |
995 | enum { |
996 | /// Once every kAdaptationFreq we will spin longer, to try to estimate |
997 | /// the proper spin backoff |
998 | kAdaptationFreq = 128, |
999 | |
1000 | /// To avoid false sharing in slots_ with neighboring memory |
1001 | /// allocations, we pad it with this many SingleElementQueue-s at |
1002 | /// each end |
1003 | kSlotPadding = |
1004 | (hardware_destructive_interference_size - 1) / sizeof(Slot) + 1 |
1005 | }; |
1006 | |
1007 | /// The maximum number of items in the queue at once |
1008 | alignas(hardware_destructive_interference_size) size_t capacity_; |
1009 | |
1010 | /// Anonymous union for use when Dynamic = false and true, respectively |
1011 | union { |
1012 | /// An array of capacity_ SingleElementQueue-s, each of which holds |
1013 | /// either 0 or 1 item. We over-allocate by 2 * kSlotPadding and don't |
1014 | /// touch the slots at either end, to avoid false sharing |
1015 | Slot* slots_; |
1016 | /// Current dynamic slots array of dcapacity_ SingleElementQueue-s |
1017 | Atom<Slot*> dslots_; |
1018 | }; |
1019 | |
1020 | /// Anonymous union for use when Dynamic = false and true, respectively |
1021 | union { |
1022 | /// The number of slots_ indices that we advance for each ticket, to |
1023 | /// avoid false sharing. Ideally slots_[i] and slots_[i + stride_] |
1024 | /// aren't on the same cache line |
1025 | int stride_; |
1026 | /// Current stride |
1027 | Atom<int> dstride_; |
1028 | }; |
1029 | |
  /// The following two members are used by dynamic MPMCQueue.
1031 | /// Ideally they should be in MPMCQueue<T,Atom,true>, but we get |
1032 | /// better cache locality if they are in the same cache line as |
1033 | /// dslots_ and dstride_. |
1034 | /// |
1035 | /// Dynamic state. A packed seqlock and ticket offset |
1036 | Atom<uint64_t> dstate_; |
1037 | /// Dynamic capacity |
1038 | Atom<size_t> dcapacity_; |
1039 | |
1040 | /// Enqueuers get tickets from here |
1041 | alignas(hardware_destructive_interference_size) Atom<uint64_t> pushTicket_; |
1042 | |
1043 | /// Dequeuers get tickets from here |
1044 | alignas(hardware_destructive_interference_size) Atom<uint64_t> popTicket_; |
1045 | |
1046 | /// This is how many times we will spin before using FUTEX_WAIT when |
1047 | /// the queue is full on enqueue, adaptively computed by occasionally |
1048 | /// spinning for longer and smoothing with an exponential moving average |
1049 | alignas( |
1050 | hardware_destructive_interference_size) Atom<uint32_t> pushSpinCutoff_; |
1051 | |
1052 | /// The adaptive spin cutoff when the queue is empty on dequeue |
1053 | alignas(hardware_destructive_interference_size) Atom<uint32_t> popSpinCutoff_; |
1054 | |
1055 | /// Alignment doesn't prevent false sharing at the end of the struct, |
1056 | /// so fill out the last cache line |
1057 | char pad_[hardware_destructive_interference_size - sizeof(Atom<uint32_t>)]; |
1058 | |
1059 | /// We assign tickets in increasing order, but we don't want to |
1060 | /// access neighboring elements of slots_ because that will lead to |
1061 | /// false sharing (multiple cores accessing the same cache line even |
1062 | /// though they aren't accessing the same bytes in that cache line). |
1063 | /// To avoid this we advance by stride slots per ticket. |
1064 | /// |
1065 | /// We need gcd(capacity, stride) to be 1 so that we will use all |
1066 | /// of the slots. We ensure this by only considering prime strides, |
1067 | /// which either have no common divisors with capacity or else have |
1068 | /// a zero remainder after dividing by capacity. That is sufficient |
1069 | /// to guarantee correctness, but we also want to actually spread the |
1070 | /// accesses away from each other to avoid false sharing (consider a |
1071 | /// stride of 7 with a capacity of 8). To that end we try a few taking |
1072 | /// care to observe that advancing by -1 is as bad as advancing by 1 |
  /// when it comes to false sharing.
1074 | /// |
1075 | /// The simple way to avoid false sharing would be to pad each |
1076 | /// SingleElementQueue, but since we have capacity_ of them that could |
1077 | /// waste a lot of space. |
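  ///
  /// For example, with a capacity of 10: strides 2 and 5 are rejected
  /// because they divide the capacity; stride 3 yields a separation of
  /// min(3, 10 - 3) = 3 slots, which none of the later primes beat, so
  /// computeStride(10) returns 3.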
1078 | static int computeStride(size_t capacity) noexcept { |
1079 | static const int smallPrimes[] = {2, 3, 5, 7, 11, 13, 17, 19, 23}; |
1080 | |
1081 | int bestStride = 1; |
1082 | size_t bestSep = 1; |
1083 | for (int stride : smallPrimes) { |
1084 | if ((stride % capacity) == 0 || (capacity % stride) == 0) { |
1085 | continue; |
1086 | } |
1087 | size_t sep = stride % capacity; |
1088 | sep = std::min(sep, capacity - sep); |
1089 | if (sep > bestSep) { |
1090 | bestStride = stride; |
1091 | bestSep = sep; |
1092 | } |
1093 | } |
1094 | return bestStride; |
1095 | } |
1096 | |
1097 | /// Returns the index into slots_ that should be used when enqueuing or |
1098 | /// dequeuing with the specified ticket |
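  /// (e.g., with cap = 10 and stride = 3, tickets 0, 1, 2, ... visit
  /// slots 0, 3, 6, 9, 2, 5, 8, 1, 4, 7, each offset by kSlotPadding)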
1099 | size_t idx(uint64_t ticket, size_t cap, int stride) noexcept { |
1100 | return ((ticket * stride) % cap) + kSlotPadding; |
1101 | } |
1102 | |
  /// Maps an enqueue or dequeue ticket to the turn that should be used at
  /// the corresponding SingleElementQueue
1105 | uint32_t turn(uint64_t ticket, size_t cap) noexcept { |
1106 | assert(cap != 0); |
1107 | return uint32_t(ticket / cap); |
1108 | } |
1109 | |
1110 | /// Tries to obtain a push ticket for which SingleElementQueue::enqueue |
1111 | /// won't block. Returns true on immediate success, false on immediate |
1112 | /// failure. |
1113 | bool tryObtainReadyPushTicket( |
1114 | uint64_t& ticket, |
1115 | Slot*& slots, |
1116 | size_t& cap, |
1117 | int& stride) noexcept { |
1118 | ticket = pushTicket_.load(std::memory_order_acquire); // A |
1119 | slots = slots_; |
1120 | cap = capacity_; |
1121 | stride = stride_; |
1122 | while (true) { |
1123 | if (!slots[idx(ticket, cap, stride)].mayEnqueue(turn(ticket, cap))) { |
1124 | // if we call enqueue(ticket, ...) on the SingleElementQueue |
1125 | // right now it would block, but this might no longer be the next |
1126 | // ticket. We can increase the chance of tryEnqueue success under |
1127 | // contention (without blocking) by rechecking the ticket dispenser |
1128 | auto prev = ticket; |
1129 | ticket = pushTicket_.load(std::memory_order_acquire); // B |
1130 | if (prev == ticket) { |
1131 | // mayEnqueue was bracketed by two reads (A or prev B or prev |
1132 | // failing CAS to B), so we are definitely unable to enqueue |
1133 | return false; |
1134 | } |
1135 | } else { |
1136 | // we will bracket the mayEnqueue check with a read (A or prev B |
1137 | // or prev failing CAS) and the following CAS. If the CAS fails |
1138 | // it will effect a load of pushTicket_ |
1139 | if (pushTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
1140 | return true; |
1141 | } |
1142 | } |
1143 | } |
1144 | } |
1145 | |
  /// Tries, until the deadline `when`, to obtain a push ticket for which
  /// SingleElementQueue::enqueue won't block. Returns true on success, false
  /// on failure.
1149 | /// ticket is filled on success AND failure. |
1150 | template <class Clock> |
1151 | bool tryObtainPromisedPushTicketUntil( |
1152 | uint64_t& ticket, |
1153 | Slot*& slots, |
1154 | size_t& cap, |
1155 | int& stride, |
1156 | const std::chrono::time_point<Clock>& when) noexcept { |
1157 | bool deadlineReached = false; |
1158 | while (!deadlineReached) { |
1159 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
1160 | ->tryObtainPromisedPushTicket(ticket, slots, cap, stride)) { |
1161 | return true; |
1162 | } |
1163 | // ticket is a blocking ticket until the preceding ticket has been |
1164 | // processed: wait until this ticket's turn arrives. We have not reserved |
1165 | // this ticket so we will have to re-attempt to get a non-blocking ticket |
1166 | // if we wake up before we time-out. |
1167 | deadlineReached = |
1168 | !slots[idx(ticket, cap, stride)].tryWaitForEnqueueTurnUntil( |
1169 | turn(ticket, cap), |
1170 | pushSpinCutoff_, |
1171 | (ticket % kAdaptationFreq) == 0, |
1172 | when); |
1173 | } |
1174 | return false; |
1175 | } |
1176 | |
1177 | /// Tries to obtain a push ticket which can be satisfied if all |
1178 | /// in-progress pops complete. This function does not block, but |
1179 | /// blocking may be required when using the returned ticket if some |
1180 | /// other thread's pop is still in progress (ticket has been granted but |
1181 | /// pop has not yet completed). |
1182 | bool tryObtainPromisedPushTicket( |
1183 | uint64_t& ticket, |
1184 | Slot*& slots, |
1185 | size_t& cap, |
1186 | int& stride) noexcept { |
1187 | auto numPushes = pushTicket_.load(std::memory_order_acquire); // A |
1188 | slots = slots_; |
1189 | cap = capacity_; |
1190 | stride = stride_; |
1191 | while (true) { |
1192 | ticket = numPushes; |
1193 | const auto numPops = popTicket_.load(std::memory_order_acquire); // B |
1194 | // n will be negative if pops are pending |
1195 | const int64_t n = int64_t(numPushes - numPops); |
1196 | if (n >= static_cast<ssize_t>(capacity_)) { |
1197 | // Full, linearize at B. We don't need to recheck the read we |
1198 | // performed at A, because if numPushes was stale at B then the |
1199 | // real numPushes value is even worse |
1200 | return false; |
1201 | } |
1202 | if (pushTicket_.compare_exchange_strong(numPushes, numPushes + 1)) { |
1203 | return true; |
1204 | } |
1205 | } |
1206 | } |
1207 | |
1208 | /// Tries to obtain a pop ticket for which SingleElementQueue::dequeue |
1209 | /// won't block. Returns true on immediate success, false on immediate |
1210 | /// failure. |
1211 | bool tryObtainReadyPopTicket( |
1212 | uint64_t& ticket, |
1213 | Slot*& slots, |
1214 | size_t& cap, |
1215 | int& stride) noexcept { |
1216 | ticket = popTicket_.load(std::memory_order_acquire); |
1217 | slots = slots_; |
1218 | cap = capacity_; |
1219 | stride = stride_; |
1220 | while (true) { |
1221 | if (!slots[idx(ticket, cap, stride)].mayDequeue(turn(ticket, cap))) { |
1222 | auto prev = ticket; |
1223 | ticket = popTicket_.load(std::memory_order_acquire); |
1224 | if (prev == ticket) { |
1225 | return false; |
1226 | } |
1227 | } else { |
1228 | if (popTicket_.compare_exchange_strong(ticket, ticket + 1)) { |
1229 | return true; |
1230 | } |
1231 | } |
1232 | } |
1233 | } |
1234 | |
  /// Tries, until the deadline `when`, to obtain a pop ticket for which
  /// SingleElementQueue::dequeue won't block. Returns true on success, false
  /// on failure.
1238 | /// ticket is filled on success AND failure. |
1239 | template <class Clock> |
1240 | bool tryObtainPromisedPopTicketUntil( |
1241 | uint64_t& ticket, |
1242 | Slot*& slots, |
1243 | size_t& cap, |
1244 | int& stride, |
1245 | const std::chrono::time_point<Clock>& when) noexcept { |
1246 | bool deadlineReached = false; |
1247 | while (!deadlineReached) { |
1248 | if (static_cast<Derived<T, Atom, Dynamic>*>(this) |
1249 | ->tryObtainPromisedPopTicket(ticket, slots, cap, stride)) { |
1250 | return true; |
1251 | } |
1252 | // ticket is a blocking ticket until the preceding ticket has been |
1253 | // processed: wait until this ticket's turn arrives. We have not reserved |
1254 | // this ticket so we will have to re-attempt to get a non-blocking ticket |
1255 | // if we wake up before we time-out. |
1256 | deadlineReached = |
1257 | !slots[idx(ticket, cap, stride)].tryWaitForDequeueTurnUntil( |
1258 | turn(ticket, cap), |
              popSpinCutoff_,
1260 | (ticket % kAdaptationFreq) == 0, |
1261 | when); |
1262 | } |
1263 | return false; |
1264 | } |
1265 | |
1266 | /// Similar to tryObtainReadyPopTicket, but returns a pop ticket whose |
1267 | /// corresponding push ticket has already been handed out, rather than |
1268 | /// returning one whose corresponding push ticket has already been |
1269 | /// completed. This means that there is a possibility that the caller |
1270 | /// will block when using the ticket, but it allows the user to rely on |
1271 | /// the fact that if enqueue has succeeded, tryObtainPromisedPopTicket |
1272 | /// will return true. The "try" part of this is that we won't have |
1273 | /// to block waiting for someone to call enqueue, although we might |
1274 | /// have to block waiting for them to finish executing code inside the |
1275 | /// MPMCQueue itself. |
1276 | bool tryObtainPromisedPopTicket( |
1277 | uint64_t& ticket, |
1278 | Slot*& slots, |
1279 | size_t& cap, |
1280 | int& stride) noexcept { |
1281 | auto numPops = popTicket_.load(std::memory_order_acquire); // A |
1282 | slots = slots_; |
1283 | cap = capacity_; |
1284 | stride = stride_; |
1285 | while (true) { |
1286 | ticket = numPops; |
1287 | const auto numPushes = pushTicket_.load(std::memory_order_acquire); // B |
1288 | if (numPops >= numPushes) { |
1289 | // Empty, or empty with pending pops. Linearize at B. We don't |
1290 | // need to recheck the read we performed at A, because if numPops |
1291 | // is stale then the fresh value is larger and the >= is still true |
1292 | return false; |
1293 | } |
1294 | if (popTicket_.compare_exchange_strong(numPops, numPops + 1)) { |
1295 | return true; |
1296 | } |
1297 | } |
1298 | } |
1299 | |
1300 | // Given a ticket, constructs an enqueued item using args |
1301 | template <typename... Args> |
1302 | void enqueueWithTicketBase( |
1303 | uint64_t ticket, |
1304 | Slot* slots, |
1305 | size_t cap, |
1306 | int stride, |
1307 | Args&&... args) noexcept { |
1308 | slots[idx(ticket, cap, stride)].enqueue( |
1309 | turn(ticket, cap), |
1310 | pushSpinCutoff_, |
1311 | (ticket % kAdaptationFreq) == 0, |
1312 | std::forward<Args>(args)...); |
1313 | } |
1314 | |
1315 | // To support tracking ticket numbers in MPMCPipelineStageImpl |
1316 | template <typename... Args> |
1317 | void enqueueWithTicket(uint64_t ticket, Args&&... args) noexcept { |
1318 | enqueueWithTicketBase( |
1319 | ticket, slots_, capacity_, stride_, std::forward<Args>(args)...); |
1320 | } |
1321 | |
1322 | // Given a ticket, dequeues the corresponding element |
1323 | void dequeueWithTicketBase( |
1324 | uint64_t ticket, |
1325 | Slot* slots, |
1326 | size_t cap, |
1327 | int stride, |
1328 | T& elem) noexcept { |
1329 | assert(cap != 0); |
1330 | slots[idx(ticket, cap, stride)].dequeue( |
1331 | turn(ticket, cap), |
1332 | popSpinCutoff_, |
1333 | (ticket % kAdaptationFreq) == 0, |
1334 | elem); |
1335 | } |
1336 | }; |
1337 | |
1338 | /// SingleElementQueue implements a blocking queue that holds at most one |
1339 | /// item, and that requires its users to assign incrementing identifiers |
1340 | /// (turns) to each enqueue and dequeue operation. Note that the turns |
1341 | /// used by SingleElementQueue are doubled inside the TurnSequencer |
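/// (an enqueue with turn t waits for sequencer turn 2*t, and the matching
/// dequeue waits for sequencer turn 2*t + 1, as in mayEnqueue() and
/// mayDequeue() below)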
1342 | template <typename T, template <typename> class Atom> |
1343 | struct SingleElementQueue { |
1344 | ~SingleElementQueue() noexcept { |
1345 | if ((sequencer_.uncompletedTurnLSB() & 1) == 1) { |
1346 | // we are pending a dequeue, so we have a constructed item |
1347 | destroyContents(); |
1348 | } |
1349 | } |
1350 | |
1351 | /// enqueue using in-place noexcept construction |
1352 | template < |
1353 | typename... Args, |
1354 | typename = typename std::enable_if< |
1355 | std::is_nothrow_constructible<T, Args...>::value>::type> |
1356 | void enqueue( |
1357 | const uint32_t turn, |
1358 | Atom<uint32_t>& spinCutoff, |
1359 | const bool updateSpinCutoff, |
1360 | Args&&... args) noexcept { |
1361 | sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); |
1362 | new (&contents_) T(std::forward<Args>(args)...); |
1363 | sequencer_.completeTurn(turn * 2); |
1364 | } |
1365 | |
1366 | /// enqueue using move construction, either real (if |
1367 | /// is_nothrow_move_constructible) or simulated using relocation and |
1368 | /// default construction (if IsRelocatable and is_nothrow_constructible) |
1369 | template < |
1370 | typename = typename std::enable_if< |
1371 | (folly::IsRelocatable<T>::value && |
1372 | std::is_nothrow_constructible<T>::value) || |
1373 | std::is_nothrow_constructible<T, T&&>::value>::type> |
1374 | void enqueue( |
1375 | const uint32_t turn, |
1376 | Atom<uint32_t>& spinCutoff, |
1377 | const bool updateSpinCutoff, |
1378 | T&& goner) noexcept { |
1379 | enqueueImpl( |
1380 | turn, |
1381 | spinCutoff, |
1382 | updateSpinCutoff, |
1383 | std::move(goner), |
1384 | typename std::conditional< |
1385 | std::is_nothrow_constructible<T, T&&>::value, |
1386 | ImplByMove, |
1387 | ImplByRelocation>::type()); |
1388 | } |
1389 | |
1390 | /// Waits until either: |
1391 | /// 1: the dequeue turn preceding the given enqueue turn has arrived |
1392 | /// 2: the given deadline has arrived |
1393 | /// Case 1 returns true, case 2 returns false. |
1394 | template <class Clock> |
1395 | bool tryWaitForEnqueueTurnUntil( |
1396 | const uint32_t turn, |
1397 | Atom<uint32_t>& spinCutoff, |
1398 | const bool updateSpinCutoff, |
1399 | const std::chrono::time_point<Clock>& when) noexcept { |
1400 | return sequencer_.tryWaitForTurn( |
1401 | turn * 2, spinCutoff, updateSpinCutoff, &when) != |
1402 | TurnSequencer<Atom>::TryWaitResult::TIMEDOUT; |
1403 | } |
1404 | |
1405 | bool mayEnqueue(const uint32_t turn) const noexcept { |
1406 | return sequencer_.isTurn(turn * 2); |
1407 | } |
1408 | |
1409 | void dequeue( |
1410 | uint32_t turn, |
1411 | Atom<uint32_t>& spinCutoff, |
1412 | const bool updateSpinCutoff, |
1413 | T& elem) noexcept { |
1414 | dequeueImpl( |
1415 | turn, |
1416 | spinCutoff, |
1417 | updateSpinCutoff, |
1418 | elem, |
1419 | typename std::conditional< |
1420 | folly::IsRelocatable<T>::value, |
1421 | ImplByRelocation, |
1422 | ImplByMove>::type()); |
1423 | } |
1424 | |
1425 | /// Waits until either: |
1426 | /// 1: the enqueue turn preceding the given dequeue turn has arrived |
1427 | /// 2: the given deadline has arrived |
1428 | /// Case 1 returns true, case 2 returns false. |
1429 | template <class Clock> |
1430 | bool tryWaitForDequeueTurnUntil( |
1431 | const uint32_t turn, |
1432 | Atom<uint32_t>& spinCutoff, |
1433 | const bool updateSpinCutoff, |
1434 | const std::chrono::time_point<Clock>& when) noexcept { |
1435 | return sequencer_.tryWaitForTurn( |
1436 | turn * 2 + 1, spinCutoff, updateSpinCutoff, &when) != |
1437 | TurnSequencer<Atom>::TryWaitResult::TIMEDOUT; |
1438 | } |
1439 | |
1440 | bool mayDequeue(const uint32_t turn) const noexcept { |
1441 | return sequencer_.isTurn(turn * 2 + 1); |
1442 | } |
1443 | |
1444 | private: |
1445 | /// Storage for a T constructed with placement new |
1446 | aligned_storage_for_t<T> contents_; |
1447 | |
1448 | /// Even turns are pushes, odd turns are pops |
1449 | TurnSequencer<Atom> sequencer_; |
1450 | |
1451 | T* ptr() noexcept { |
1452 | return static_cast<T*>(static_cast<void*>(&contents_)); |
1453 | } |
1454 | |
1455 | void destroyContents() noexcept { |
1456 | try { |
1457 | ptr()->~T(); |
1458 | } catch (...) { |
1459 | // g++ doesn't seem to have std::is_nothrow_destructible yet |
1460 | } |
1461 | if (kIsDebug) { |
1462 | memset(&contents_, 'Q', sizeof(T)); |
1463 | } |
1464 | } |
1465 | |
1466 | /// Tag classes for dispatching to enqueue/dequeue implementation. |
1467 | struct ImplByRelocation {}; |
1468 | struct ImplByMove {}; |
1469 | |
1470 | /// enqueue using nothrow move construction. |
1471 | void enqueueImpl( |
1472 | const uint32_t turn, |
1473 | Atom<uint32_t>& spinCutoff, |
1474 | const bool updateSpinCutoff, |
1475 | T&& goner, |
1476 | ImplByMove) noexcept { |
1477 | sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); |
1478 | new (&contents_) T(std::move(goner)); |
1479 | sequencer_.completeTurn(turn * 2); |
1480 | } |
1481 | |
  /// enqueue by simulating a nothrow move via relocation (memcpy),
  /// followed by default construction of the vacated source object.
1484 | void enqueueImpl( |
1485 | const uint32_t turn, |
1486 | Atom<uint32_t>& spinCutoff, |
1487 | const bool updateSpinCutoff, |
1488 | T&& goner, |
1489 | ImplByRelocation) noexcept { |
1490 | sequencer_.waitForTurn(turn * 2, spinCutoff, updateSpinCutoff); |
1491 | memcpy( |
1492 | static_cast<void*>(&contents_), |
1493 | static_cast<void const*>(&goner), |
1494 | sizeof(T)); |
1495 | sequencer_.completeTurn(turn * 2); |
1496 | new (&goner) T(); |
1497 | } |
1498 | |
1499 | /// dequeue by destructing followed by relocation. This version is preferred, |
1500 | /// because as much work as possible can be done before waiting. |
1501 | void dequeueImpl( |
1502 | uint32_t turn, |
1503 | Atom<uint32_t>& spinCutoff, |
1504 | const bool updateSpinCutoff, |
1505 | T& elem, |
1506 | ImplByRelocation) noexcept { |
1507 | try { |
1508 | elem.~T(); |
1509 | } catch (...) { |
1510 | // unlikely, but if we don't complete our turn the queue will die |
1511 | } |
1512 | sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); |
1513 | memcpy( |
1514 | static_cast<void*>(&elem), |
1515 | static_cast<void const*>(&contents_), |
1516 | sizeof(T)); |
1517 | sequencer_.completeTurn(turn * 2 + 1); |
1518 | } |
1519 | |
1520 | /// dequeue by nothrow move assignment. |
1521 | void dequeueImpl( |
1522 | uint32_t turn, |
1523 | Atom<uint32_t>& spinCutoff, |
1524 | const bool updateSpinCutoff, |
1525 | T& elem, |
1526 | ImplByMove) noexcept { |
1527 | sequencer_.waitForTurn(turn * 2 + 1, spinCutoff, updateSpinCutoff); |
1528 | elem = std::move(*ptr()); |
1529 | destroyContents(); |
1530 | sequencer_.completeTurn(turn * 2 + 1); |
1531 | } |
1532 | }; |
1533 | |
1534 | } // namespace detail |
1535 | |
1536 | } // namespace folly |
1537 | |