1 | /* |
2 | * Copyright (c) Facebook, Inc. and its affiliates. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #pragma once |
18 | |
19 | #include <atomic> |
20 | #include <chrono> |
21 | #include <memory> |
22 | |
23 | #include <glog/logging.h> |
24 | |
25 | #include <folly/ConstexprMath.h> |
26 | #include <folly/Optional.h> |
27 | #include <folly/Traits.h> |
28 | #include <folly/concurrency/CacheLocality.h> |
29 | #include <folly/lang/Align.h> |
30 | #include <folly/synchronization/Hazptr.h> |
31 | #include <folly/synchronization/SaturatingSemaphore.h> |
32 | #include <folly/synchronization/WaitOptions.h> |
33 | #include <folly/synchronization/detail/Spin.h> |
34 | |
35 | namespace folly { |
36 | |
37 | /// UnboundedQueue supports a variety of options for unbounded |
/// dynamically expanding and shrinking queues, including variations of:
39 | /// - Single vs. multiple producers |
40 | /// - Single vs. multiple consumers |
41 | /// - Blocking vs. spin-waiting |
42 | /// - Non-waiting, timed, and waiting consumer operations. |
43 | /// Producer operations never wait or fail (unless out-of-memory). |
44 | /// |
45 | /// Template parameters: |
46 | /// - T: element type |
47 | /// - SingleProducer: true if there can be only one producer at a |
48 | /// time. |
49 | /// - SingleConsumer: true if there can be only one consumer at a |
50 | /// time. |
51 | /// - MayBlock: true if consumers may block, false if they only |
52 | /// spin. A performance tuning parameter. |
53 | /// - LgSegmentSize (default 8): Log base 2 of number of elements per |
54 | /// segment. A performance tuning parameter. See below. |
55 | /// - LgAlign (default 7): Log base 2 of alignment directive; can be |
56 | /// used to balance scalability (avoidance of false sharing) with |
57 | /// memory efficiency. |
58 | /// |
59 | /// When to use UnboundedQueue: |
60 | /// - If a small bound may lead to deadlock or performance degradation |
61 | /// under bursty patterns. |
62 | /// - If there is no risk of the queue growing too much. |
63 | /// |
64 | /// When not to use UnboundedQueue: |
65 | /// - If there is risk of the queue growing too much and a large bound |
66 | /// is acceptable, then use DynamicBoundedQueue. |
67 | /// - If the queue must not allocate on enqueue or it must have a |
68 | /// small bound, then use fixed-size MPMCQueue or (if non-blocking |
69 | /// SPSC) ProducerConsumerQueue. |
70 | /// |
71 | /// Template Aliases: |
72 | /// USPSCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
73 | /// UMPSCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
74 | /// USPMCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
75 | /// UMPMCQueue<T, MayBlock, LgSegmentSize, LgAlign> |
76 | /// |
77 | /// Functions: |
78 | /// Producer operations never wait or fail (unless OOM) |
79 | /// void enqueue(const T&); |
80 | /// void enqueue(T&&); |
81 | /// Adds an element to the end of the queue. |
82 | /// |
83 | /// Consumer operations: |
84 | /// void dequeue(T&); |
85 | /// T dequeue(); |
86 | /// Extracts an element from the front of the queue. Waits |
87 | /// until an element is available if needed. |
88 | /// bool try_dequeue(T&); |
89 | /// folly::Optional<T> try_dequeue(); |
90 | /// Tries to extract an element from the front of the queue |
91 | /// if available. |
92 | /// bool try_dequeue_until(T&, time_point& deadline); |
93 | /// folly::Optional<T> try_dequeue_until(time_point& deadline); |
94 | /// Tries to extract an element from the front of the queue |
95 | /// if available until the specified deadline. |
96 | /// bool try_dequeue_for(T&, duration&); |
97 | /// folly::Optional<T> try_dequeue_for(duration&); |
98 | /// Tries to extract an element from the front of the queue if |
99 | /// available until the expiration of the specified duration. |
100 | /// const T* try_peek(); |
101 | /// Returns pointer to the element at the front of the queue |
102 | /// if available, or nullptr if the queue is empty. Only for |
103 | /// SPSC and MPSC. |
104 | /// |
105 | /// Secondary functions: |
106 | /// size_t size(); |
107 | /// Returns an estimate of the size of the queue. |
108 | /// bool empty(); |
109 | /// Returns true only if the queue was empty during the call. |
110 | /// Note: size() and empty() are guaranteed to be accurate only if |
111 | /// the queue is not changed concurrently. |
112 | /// |
113 | /// Usage examples: |
114 | /// @code |
115 | /// /* UMPSC, doesn't block, 1024 int elements per segment */ |
116 | /// UMPSCQueue<int, false, 10> q; |
117 | /// q.enqueue(1); |
118 | /// q.enqueue(2); |
119 | /// q.enqueue(3); |
120 | /// ASSERT_FALSE(q.empty()); |
121 | /// ASSERT_EQ(q.size(), 3); |
122 | /// int v; |
123 | /// q.dequeue(v); |
124 | /// ASSERT_EQ(v, 1); |
///   ASSERT_TRUE(q.try_dequeue(v));
///   ASSERT_EQ(v, 2);
///   ASSERT_TRUE(q.try_dequeue_until(v, now() + seconds(1)));
///   ASSERT_EQ(v, 3);
///   ASSERT_TRUE(q.empty());
///   ASSERT_EQ(q.size(), 0);
///   ASSERT_FALSE(q.try_dequeue(v));
///   ASSERT_FALSE(q.try_dequeue_for(v, microseconds(100)));
133 | /// @endcode |
134 | /// |
135 | /// Design: |
136 | /// - The queue is composed of one or more segments. Each segment has |
137 | /// a fixed size of 2^LgSegmentSize entries. Each segment is used |
138 | /// exactly once. |
139 | /// - Each entry is composed of a futex and a single element. |
140 | /// - The queue contains two 64-bit ticket variables. The producer |
141 | /// ticket counts the number of producer tickets issued so far, and |
142 | /// the same for the consumer ticket. Each ticket number corresponds |
143 | /// to a specific entry in a specific segment. |
144 | /// - The queue maintains two pointers, head and tail. Head points to |
145 | /// the segment that corresponds to the current consumer |
146 | /// ticket. Similarly, tail pointer points to the segment that |
147 | /// corresponds to the producer ticket. |
148 | /// - Segments are organized as a singly linked list. |
149 | /// - The producer with the first ticket in the current producer |
150 | /// segment has primary responsibility for allocating and linking |
///   the next segment. Other producers and consumers may help do so
152 | /// when needed if that thread is delayed. |
153 | /// - The producer with the last ticket in the current producer |
154 | /// segment is primarily responsible for advancing the tail pointer |
155 | /// to the next segment. Other producers and consumers may help do |
156 | /// so when needed if that thread is delayed. |
157 | /// - Similarly, the consumer with the last ticket in the current |
158 | /// consumer segment is primarily responsible for advancing the head |
159 | /// pointer to the next segment. Other consumers may help do so when |
160 | /// needed if that thread is delayed. |
161 | /// - The tail pointer must not lag behind the head pointer. |
162 | /// Otherwise, the algorithm cannot be certain about the removal of |
163 | /// segment and would have to incur higher costs to ensure safe |
164 | /// reclamation. Consumers must ensure that head never overtakes |
165 | /// tail. |
166 | /// |
167 | /// Memory Usage: |
168 | /// - An empty queue contains one segment. A nonempty queue contains |
///   one or two segments more than fit its contents.
170 | /// - Removed segments are not reclaimed until there are no threads, |
171 | /// producers or consumers, with references to them or their |
172 | /// predecessors. That is, a lagging thread may delay the reclamation |
173 | /// of a chain of removed segments. |
174 | /// - The template parameter LgAlign can be used to reduce memory usage |
175 | /// at the cost of increased chance of false sharing. |
176 | /// |
177 | /// Performance considerations: |
178 | /// - All operations take constant time, excluding the costs of |
179 | /// allocation, reclamation, interference from other threads, and |
180 | /// waiting for actions by other threads. |
181 | /// - In general, using the single producer and or single consumer |
182 | /// variants yield better performance than the MP and MC |
183 | /// alternatives. |
184 | /// - SPSC without blocking is the fastest configuration. It doesn't |
185 | /// include any read-modify-write atomic operations, full fences, or |
186 | /// system calls in the critical path. |
187 | /// - MP adds a fetch_add to the critical path of each producer operation. |
188 | /// - MC adds a fetch_add or compare_exchange to the critical path of |
189 | /// each consumer operation. |
190 | /// - The possibility of consumers blocking, even if they never do, |
191 | /// adds a compare_exchange to the critical path of each producer |
192 | /// operation. |
193 | /// - MPMC, SPMC, MPSC require the use of a deferred reclamation |
194 | /// mechanism to guarantee that segments removed from the linked |
195 | /// list, i.e., unreachable from the head pointer, are reclaimed |
196 | /// only after they are no longer needed by any lagging producers or |
197 | /// consumers. |
198 | /// - The overheads of segment allocation and reclamation are intended |
199 | /// to be mostly out of the critical path of the queue's throughput. |
200 | /// - If the template parameter LgSegmentSize is changed, it should be |
201 | /// set adequately high to keep the amortized cost of allocation and |
202 | /// reclamation low. |
203 | /// - It is recommended to measure performance with different variants |
204 | /// when applicable, e.g., UMPMC vs UMPSC. Depending on the use |
205 | /// case, sometimes the variant with the higher sequential overhead |
206 | /// may yield better results due to, for example, more favorable |
207 | /// producer-consumer balance or favorable timing for avoiding |
208 | /// costly blocking. |
209 | |
210 | template < |
211 | typename T, |
212 | bool SingleProducer, |
213 | bool SingleConsumer, |
214 | bool MayBlock, |
215 | size_t LgSegmentSize = 8, |
216 | size_t LgAlign = constexpr_log2(hardware_destructive_interference_size), |
217 | template <typename> class Atom = std::atomic> |
218 | class UnboundedQueue { |
219 | using Ticket = uint64_t; |
220 | class Entry; |
221 | class Segment; |
222 | |
223 | static constexpr bool SPSC = SingleProducer && SingleConsumer; |
224 | static constexpr size_t Stride = SPSC || (LgSegmentSize <= 1) ? 1 : 27; |
225 | static constexpr size_t SegmentSize = 1u << LgSegmentSize; |
226 | static constexpr size_t Align = 1u << LgAlign; |
227 | |
228 | static_assert( |
229 | std::is_nothrow_destructible<T>::value, |
230 | "T must be nothrow_destructible" ); |
231 | static_assert((Stride & 1) == 1, "Stride must be odd" ); |
232 | static_assert(LgSegmentSize < 32, "LgSegmentSize must be < 32" ); |
233 | static_assert(LgAlign < 16, "LgAlign must be < 16" ); |
234 | |
235 | using Sem = folly::SaturatingSemaphore<MayBlock, Atom>; |
236 | |
237 | struct Consumer { |
238 | Atom<Segment*> head; |
239 | Atom<Ticket> ticket; |
240 | hazptr_obj_cohort<Atom> cohort; |
241 | explicit Consumer(Segment* s) : head(s), ticket(0) { |
242 | s->set_cohort_no_tag(&cohort); // defined in hazptr_obj |
243 | } |
244 | }; |
245 | struct Producer { |
246 | Atom<Segment*> tail; |
247 | Atom<Ticket> ticket; |
248 | explicit Producer(Segment* s) : tail(s), ticket(0) {} |
249 | }; |
250 | |
251 | alignas(Align) Consumer c_; |
252 | alignas(Align) Producer p_; |
253 | |
254 | public: |
255 | /** constructor */ |
256 | UnboundedQueue() |
257 | : c_(new Segment(0)), p_(c_.head.load(std::memory_order_relaxed)) {} |
258 | |
259 | /** destructor */ |
260 | ~UnboundedQueue() { |
261 | cleanUpRemainingItems(); |
262 | reclaimRemainingSegments(); |
263 | } |
264 | |
265 | /** enqueue */ |
266 | FOLLY_ALWAYS_INLINE void enqueue(const T& arg) { |
267 | enqueueImpl(arg); |
268 | } |
269 | |
270 | FOLLY_ALWAYS_INLINE void enqueue(T&& arg) { |
271 | enqueueImpl(std::move(arg)); |
272 | } |
273 | |
274 | /** dequeue */ |
275 | FOLLY_ALWAYS_INLINE void dequeue(T& item) noexcept { |
276 | item = dequeueImpl(); |
277 | } |
278 | |
279 | FOLLY_ALWAYS_INLINE T dequeue() noexcept { |
280 | return dequeueImpl(); |
281 | } |
282 | |
283 | /** try_dequeue */ |
284 | FOLLY_ALWAYS_INLINE bool try_dequeue(T& item) noexcept { |
285 | auto o = try_dequeue(); |
286 | if (LIKELY(o.has_value())) { |
287 | item = std::move(*o); |
288 | return true; |
289 | } |
290 | return false; |
291 | } |
292 | |
293 | FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue() noexcept { |
294 | return tryDequeueUntil(std::chrono::steady_clock::time_point::min()); |
295 | } |
296 | |
297 | /** try_dequeue_until */ |
298 | template <typename Clock, typename Duration> |
299 | FOLLY_ALWAYS_INLINE bool try_dequeue_until( |
300 | T& item, |
301 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
302 | folly::Optional<T> o = try_dequeue_until(deadline); |
303 | |
304 | if (LIKELY(o.has_value())) { |
305 | item = std::move(*o); |
306 | return true; |
307 | } |
308 | |
309 | return false; |
310 | } |
311 | |
312 | template <typename Clock, typename Duration> |
313 | FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_until( |
314 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
315 | return tryDequeueUntil(deadline); |
316 | } |
317 | |
318 | /** try_dequeue_for */ |
319 | template <typename Rep, typename Period> |
320 | FOLLY_ALWAYS_INLINE bool try_dequeue_for( |
321 | T& item, |
322 | const std::chrono::duration<Rep, Period>& duration) noexcept { |
323 | folly::Optional<T> o = try_dequeue_for(duration); |
324 | |
325 | if (LIKELY(o.has_value())) { |
326 | item = std::move(*o); |
327 | return true; |
328 | } |
329 | |
330 | return false; |
331 | } |
332 | |
333 | template <typename Rep, typename Period> |
334 | FOLLY_ALWAYS_INLINE folly::Optional<T> try_dequeue_for( |
335 | const std::chrono::duration<Rep, Period>& duration) noexcept { |
336 | folly::Optional<T> o = try_dequeue(); |
337 | if (LIKELY(o.has_value())) { |
338 | return o; |
339 | } |
340 | return tryDequeueUntil(std::chrono::steady_clock::now() + duration); |
341 | } |
342 | |
343 | /** try_peek */ |
344 | FOLLY_ALWAYS_INLINE const T* try_peek() noexcept { |
345 | /* This function is supported only for USPSC and UMPSC queues. */ |
346 | DCHECK(SingleConsumer); |
347 | return tryPeekUntil(std::chrono::steady_clock::time_point::min()); |
348 | } |
349 | |
350 | /** size */ |
351 | size_t size() const noexcept { |
352 | auto p = producerTicket(); |
353 | auto c = consumerTicket(); |
354 | return p > c ? p - c : 0; |
355 | } |
356 | |
357 | /** empty */ |
358 | bool empty() const noexcept { |
359 | auto c = consumerTicket(); |
360 | auto p = producerTicket(); |
361 | return p <= c; |
362 | } |
363 | |
364 | private: |
365 | /** enqueueImpl */ |
366 | template <typename Arg> |
367 | FOLLY_ALWAYS_INLINE void enqueueImpl(Arg&& arg) { |
368 | if (SPSC) { |
369 | Segment* s = tail(); |
370 | enqueueCommon(s, std::forward<Arg>(arg)); |
371 | } else { |
372 | // Using hazptr_holder instead of hazptr_local because it is |
373 | // possible that the T ctor happens to use hazard pointers. |
374 | hazptr_holder<Atom> hptr; |
375 | Segment* s = hptr.get_protected(p_.tail); |
376 | enqueueCommon(s, std::forward<Arg>(arg)); |
377 | } |
378 | } |
379 | |
380 | /** enqueueCommon */ |
381 | template <typename Arg> |
382 | FOLLY_ALWAYS_INLINE void enqueueCommon(Segment* s, Arg&& arg) { |
383 | Ticket t = fetchIncrementProducerTicket(); |
384 | if (!SingleProducer) { |
385 | s = findSegment(s, t); |
386 | } |
387 | DCHECK_GE(t, s->minTicket()); |
388 | DCHECK_LT(t, s->minTicket() + SegmentSize); |
389 | size_t idx = index(t); |
390 | Entry& e = s->entry(idx); |
391 | e.putItem(std::forward<Arg>(arg)); |
392 | if (responsibleForAlloc(t)) { |
393 | allocNextSegment(s); |
394 | } |
395 | if (responsibleForAdvance(t)) { |
396 | advanceTail(s); |
397 | } |
398 | } |
399 | |
400 | /** dequeueImpl */ |
401 | FOLLY_ALWAYS_INLINE T dequeueImpl() noexcept { |
402 | if (SPSC) { |
403 | Segment* s = head(); |
404 | return dequeueCommon(s); |
405 | } else { |
406 | // Using hazptr_holder instead of hazptr_local because it is |
407 | // possible to call the T dtor and it may happen to use hazard |
408 | // pointers. |
409 | hazptr_holder<Atom> hptr; |
410 | Segment* s = hptr.get_protected(c_.head); |
411 | return dequeueCommon(s); |
412 | } |
413 | } |
414 | |
415 | /** dequeueCommon */ |
416 | FOLLY_ALWAYS_INLINE T dequeueCommon(Segment* s) noexcept { |
417 | Ticket t = fetchIncrementConsumerTicket(); |
418 | if (!SingleConsumer) { |
419 | s = findSegment(s, t); |
420 | } |
421 | size_t idx = index(t); |
422 | Entry& e = s->entry(idx); |
423 | auto res = e.takeItem(); |
424 | if (responsibleForAdvance(t)) { |
425 | advanceHead(s); |
426 | } |
427 | return res; |
428 | } |
429 | |
430 | /** tryDequeueUntil */ |
431 | template <typename Clock, typename Duration> |
432 | FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntil( |
433 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
434 | if (SingleConsumer) { |
435 | Segment* s = head(); |
436 | return tryDequeueUntilSC(s, deadline); |
437 | } else { |
438 | // Using hazptr_holder instead of hazptr_local because it is |
439 | // possible to call ~T() and it may happen to use hazard pointers. |
440 | hazptr_holder<Atom> hptr; |
441 | Segment* s = hptr.get_protected(c_.head); |
442 | return tryDequeueUntilMC(s, deadline); |
443 | } |
444 | } |
445 | |
446 | /** tryDequeueUntilSC */ |
447 | template <typename Clock, typename Duration> |
448 | FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilSC( |
449 | Segment* s, |
450 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
451 | Ticket t = consumerTicket(); |
452 | DCHECK_GE(t, s->minTicket()); |
453 | DCHECK_LT(t, (s->minTicket() + SegmentSize)); |
454 | size_t idx = index(t); |
455 | Entry& e = s->entry(idx); |
456 | if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { |
457 | return folly::Optional<T>(); |
458 | } |
459 | setConsumerTicket(t + 1); |
460 | folly::Optional<T> ret = e.takeItem(); |
461 | if (responsibleForAdvance(t)) { |
462 | advanceHead(s); |
463 | } |
464 | return ret; |
465 | } |
466 | |
467 | /** tryDequeueUntilMC */ |
468 | template <typename Clock, typename Duration> |
469 | FOLLY_ALWAYS_INLINE folly::Optional<T> tryDequeueUntilMC( |
470 | Segment* s, |
471 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
472 | while (true) { |
473 | Ticket t = consumerTicket(); |
474 | if (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { |
475 | s = getAllocNextSegment(s, t); |
476 | DCHECK(s); |
477 | continue; |
478 | } |
479 | size_t idx = index(t); |
480 | Entry& e = s->entry(idx); |
481 | if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { |
482 | return folly::Optional<T>(); |
483 | } |
484 | if (!c_.ticket.compare_exchange_weak( |
485 | t, t + 1, std::memory_order_acq_rel, std::memory_order_acquire)) { |
486 | continue; |
487 | } |
488 | folly::Optional<T> ret = e.takeItem(); |
489 | if (responsibleForAdvance(t)) { |
490 | advanceHead(s); |
491 | } |
492 | return ret; |
493 | } |
494 | } |
495 | |
496 | /** tryDequeueWaitElem */ |
497 | template <typename Clock, typename Duration> |
498 | FOLLY_ALWAYS_INLINE bool tryDequeueWaitElem( |
499 | Entry& e, |
500 | Ticket t, |
501 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
502 | if (LIKELY(e.tryWaitUntil(deadline))) { |
503 | return true; |
504 | } |
505 | return t < producerTicket(); |
506 | } |
507 | |
508 | /** tryPeekUntil */ |
509 | template <typename Clock, typename Duration> |
510 | FOLLY_ALWAYS_INLINE const T* tryPeekUntil( |
511 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
512 | Segment* s = head(); |
513 | Ticket t = consumerTicket(); |
514 | DCHECK_GE(t, s->minTicket()); |
515 | DCHECK_LT(t, (s->minTicket() + SegmentSize)); |
516 | size_t idx = index(t); |
517 | Entry& e = s->entry(idx); |
518 | if (UNLIKELY(!tryDequeueWaitElem(e, t, deadline))) { |
519 | return nullptr; |
520 | } |
521 | return e.peekItem(); |
522 | } |
523 | |
524 | /** findSegment */ |
525 | FOLLY_ALWAYS_INLINE |
526 | Segment* findSegment(Segment* s, const Ticket t) noexcept { |
527 | while (UNLIKELY(t >= (s->minTicket() + SegmentSize))) { |
528 | s = getAllocNextSegment(s, t); |
529 | DCHECK(s); |
530 | } |
531 | return s; |
532 | } |
533 | |
534 | /** getAllocNextSegment */ |
535 | Segment* getAllocNextSegment(Segment* s, Ticket t) noexcept { |
536 | Segment* next = s->nextSegment(); |
537 | if (!next) { |
538 | DCHECK_GE(t, s->minTicket() + SegmentSize); |
539 | auto diff = t - (s->minTicket() + SegmentSize); |
540 | if (diff > 0) { |
541 | auto dur = std::chrono::microseconds(diff); |
542 | auto deadline = std::chrono::steady_clock::now() + dur; |
543 | WaitOptions opt; |
544 | opt.spin_max(dur); |
545 | detail::spin_pause_until( |
546 | deadline, opt, [s] { return s->nextSegment(); }); |
547 | next = s->nextSegment(); |
548 | if (next) { |
549 | return next; |
550 | } |
551 | } |
552 | next = allocNextSegment(s); |
553 | } |
554 | DCHECK(next); |
555 | return next; |
556 | } |
557 | |
558 | /** allocNextSegment */ |
559 | Segment* allocNextSegment(Segment* s) { |
560 | auto t = s->minTicket() + SegmentSize; |
561 | Segment* next = new Segment(t); |
562 | next->set_cohort_no_tag(&c_.cohort); // defined in hazptr_obj |
563 | next->acquire_ref_safe(); // defined in hazptr_obj_base_linked |
564 | if (!s->casNextSegment(next)) { |
565 | delete next; |
566 | next = s->nextSegment(); |
567 | } |
568 | DCHECK(next); |
569 | return next; |
570 | } |
571 | |
572 | /** advanceTail */ |
573 | void advanceTail(Segment* s) noexcept { |
574 | if (SPSC) { |
575 | Segment* next = s->nextSegment(); |
576 | DCHECK(next); |
577 | setTail(next); |
578 | } else { |
579 | Ticket t = s->minTicket() + SegmentSize; |
580 | advanceTailToTicket(t); |
581 | } |
582 | } |
583 | |
584 | /** advanceTailToTicket */ |
585 | void advanceTailToTicket(Ticket t) noexcept { |
586 | Segment* s = tail(); |
587 | while (s->minTicket() < t) { |
588 | Segment* next = s->nextSegment(); |
589 | if (!next) { |
590 | next = allocNextSegment(s); |
591 | } |
592 | DCHECK(next); |
593 | casTail(s, next); |
594 | s = tail(); |
595 | } |
596 | } |
597 | |
598 | /** advanceHead */ |
599 | void advanceHead(Segment* s) noexcept { |
600 | if (SPSC) { |
601 | while (tail() == s) { |
602 | /* Wait for producer to advance tail. */ |
603 | asm_volatile_pause(); |
604 | } |
605 | Segment* next = s->nextSegment(); |
606 | DCHECK(next); |
607 | setHead(next); |
608 | reclaimSegment(s); |
609 | } else { |
610 | Ticket t = s->minTicket() + SegmentSize; |
611 | advanceHeadToTicket(t); |
612 | } |
613 | } |
614 | |
615 | /** advanceHeadToTicket */ |
616 | void advanceHeadToTicket(Ticket t) noexcept { |
617 | /* Tail must not lag behind head. Otherwise, the algorithm cannot |
618 | be certain about removal of segments. */ |
619 | advanceTailToTicket(t); |
620 | Segment* s = head(); |
621 | if (SingleConsumer) { |
622 | DCHECK_EQ(s->minTicket() + SegmentSize, t); |
623 | Segment* next = s->nextSegment(); |
624 | DCHECK(next); |
625 | setHead(next); |
626 | reclaimSegment(s); |
627 | } else { |
628 | while (s->minTicket() < t) { |
629 | Segment* next = s->nextSegment(); |
630 | DCHECK(next); |
631 | if (casHead(s, next)) { |
632 | reclaimSegment(s); |
633 | s = next; |
634 | } |
635 | } |
636 | } |
637 | } |
638 | |
639 | /** reclaimSegment */ |
640 | void reclaimSegment(Segment* s) noexcept { |
641 | if (SPSC) { |
642 | delete s; |
643 | } else { |
644 | s->retire(); // defined in hazptr_obj_base_linked |
645 | } |
646 | } |
647 | |
648 | /** cleanUpRemainingItems */ |
649 | void cleanUpRemainingItems() { |
650 | auto end = producerTicket(); |
651 | auto s = head(); |
652 | for (auto t = consumerTicket(); t < end; ++t) { |
653 | if (t >= s->minTicket() + SegmentSize) { |
654 | s = s->nextSegment(); |
655 | } |
656 | DCHECK_LT(t, (s->minTicket() + SegmentSize)); |
657 | auto idx = index(t); |
658 | auto& e = s->entry(idx); |
659 | e.destroyItem(); |
660 | } |
661 | } |
662 | |
663 | /** reclaimRemainingSegments */ |
664 | void reclaimRemainingSegments() { |
665 | auto h = head(); |
666 | auto s = h->nextSegment(); |
667 | h->setNextSegment(nullptr); |
668 | reclaimSegment(h); |
669 | while (s) { |
670 | auto next = s->nextSegment(); |
671 | delete s; |
672 | s = next; |
673 | } |
674 | } |
675 | |
676 | FOLLY_ALWAYS_INLINE size_t index(Ticket t) const noexcept { |
677 | return (t * Stride) & (SegmentSize - 1); |
678 | } |
679 | |
680 | FOLLY_ALWAYS_INLINE bool responsibleForAlloc(Ticket t) const noexcept { |
681 | return (t & (SegmentSize - 1)) == 0; |
682 | } |
683 | |
684 | FOLLY_ALWAYS_INLINE bool responsibleForAdvance(Ticket t) const noexcept { |
685 | return (t & (SegmentSize - 1)) == (SegmentSize - 1); |
686 | } |
687 | |
688 | FOLLY_ALWAYS_INLINE Segment* head() const noexcept { |
689 | return c_.head.load(std::memory_order_acquire); |
690 | } |
691 | |
692 | FOLLY_ALWAYS_INLINE Segment* tail() const noexcept { |
693 | return p_.tail.load(std::memory_order_acquire); |
694 | } |
695 | |
696 | FOLLY_ALWAYS_INLINE Ticket producerTicket() const noexcept { |
697 | return p_.ticket.load(std::memory_order_acquire); |
698 | } |
699 | |
700 | FOLLY_ALWAYS_INLINE Ticket consumerTicket() const noexcept { |
701 | return c_.ticket.load(std::memory_order_acquire); |
702 | } |
703 | |
704 | void setHead(Segment* s) noexcept { |
705 | DCHECK(SingleConsumer); |
706 | c_.head.store(s, std::memory_order_relaxed); |
707 | } |
708 | |
709 | void setTail(Segment* s) noexcept { |
710 | DCHECK(SPSC); |
711 | p_.tail.store(s, std::memory_order_release); |
712 | } |
713 | |
714 | bool (Segment*& s, Segment* next) noexcept { |
715 | DCHECK(!SingleConsumer); |
716 | return c_.head.compare_exchange_strong( |
717 | s, next, std::memory_order_release, std::memory_order_acquire); |
718 | } |
719 | |
720 | void casTail(Segment*& s, Segment* next) noexcept { |
721 | DCHECK(!SPSC); |
722 | p_.tail.compare_exchange_strong( |
723 | s, next, std::memory_order_release, std::memory_order_relaxed); |
724 | } |
725 | |
726 | FOLLY_ALWAYS_INLINE void setProducerTicket(Ticket t) noexcept { |
727 | p_.ticket.store(t, std::memory_order_release); |
728 | } |
729 | |
730 | FOLLY_ALWAYS_INLINE void setConsumerTicket(Ticket t) noexcept { |
731 | c_.ticket.store(t, std::memory_order_release); |
732 | } |
733 | |
734 | FOLLY_ALWAYS_INLINE Ticket fetchIncrementConsumerTicket() noexcept { |
735 | if (SingleConsumer) { |
736 | Ticket oldval = consumerTicket(); |
737 | setConsumerTicket(oldval + 1); |
738 | return oldval; |
739 | } else { // MC |
740 | return c_.ticket.fetch_add(1, std::memory_order_acq_rel); |
741 | } |
742 | } |
743 | |
744 | FOLLY_ALWAYS_INLINE Ticket fetchIncrementProducerTicket() noexcept { |
745 | if (SingleProducer) { |
746 | Ticket oldval = producerTicket(); |
747 | setProducerTicket(oldval + 1); |
748 | return oldval; |
749 | } else { // MP |
750 | return p_.ticket.fetch_add(1, std::memory_order_acq_rel); |
751 | } |
752 | } |
753 | |
754 | /** |
755 | * Entry |
756 | */ |
757 | class Entry { |
758 | Sem flag_; |
759 | aligned_storage_for_t<T> item_; |
760 | |
761 | public: |
762 | template <typename Arg> |
763 | FOLLY_ALWAYS_INLINE void putItem(Arg&& arg) { |
764 | new (&item_) T(std::forward<Arg>(arg)); |
765 | flag_.post(); |
766 | } |
767 | |
768 | FOLLY_ALWAYS_INLINE T takeItem() noexcept { |
769 | flag_.wait(); |
770 | return getItem(); |
771 | } |
772 | |
773 | FOLLY_ALWAYS_INLINE const T* peekItem() noexcept { |
774 | flag_.wait(); |
775 | return itemPtr(); |
776 | } |
777 | |
778 | template <typename Clock, typename Duration> |
779 | FOLLY_EXPORT FOLLY_ALWAYS_INLINE bool tryWaitUntil( |
780 | const std::chrono::time_point<Clock, Duration>& deadline) noexcept { |
781 | // wait-options from benchmarks on contended queues: |
782 | static constexpr auto const opt = |
783 | Sem::wait_options().spin_max(std::chrono::microseconds(10)); |
784 | return flag_.try_wait_until(deadline, opt); |
785 | } |
786 | |
787 | FOLLY_ALWAYS_INLINE void destroyItem() noexcept { |
788 | itemPtr()->~T(); |
789 | } |
790 | |
791 | private: |
792 | FOLLY_ALWAYS_INLINE T getItem() noexcept { |
793 | T ret = std::move(*(itemPtr())); |
794 | destroyItem(); |
795 | return ret; |
796 | } |
797 | |
798 | FOLLY_ALWAYS_INLINE T* itemPtr() noexcept { |
799 | return static_cast<T*>(static_cast<void*>(&item_)); |
800 | } |
801 | }; // Entry |
802 | |
803 | /** |
804 | * Segment |
805 | */ |
806 | class Segment : public hazptr_obj_base_linked<Segment, Atom> { |
807 | Atom<Segment*> next_{nullptr}; |
808 | const Ticket min_; |
809 | alignas(Align) Entry b_[SegmentSize]; |
810 | |
811 | public: |
812 | explicit Segment(const Ticket t) noexcept : min_(t) {} |
813 | |
814 | Segment* nextSegment() const noexcept { |
815 | return next_.load(std::memory_order_acquire); |
816 | } |
817 | |
818 | void setNextSegment(Segment* next) { |
819 | next_.store(next, std::memory_order_relaxed); |
820 | } |
821 | |
822 | bool casNextSegment(Segment* next) noexcept { |
823 | Segment* expected = nullptr; |
824 | return next_.compare_exchange_strong( |
825 | expected, next, std::memory_order_release, std::memory_order_relaxed); |
826 | } |
827 | |
828 | FOLLY_ALWAYS_INLINE Ticket minTicket() const noexcept { |
829 | DCHECK_EQ((min_ & (SegmentSize - 1)), Ticket(0)); |
830 | return min_; |
831 | } |
832 | |
833 | FOLLY_ALWAYS_INLINE Entry& entry(size_t index) noexcept { |
834 | return b_[index]; |
835 | } |
836 | |
837 | template <typename S> |
838 | void push_links(bool m, S& s) { |
839 | if (m == false) { // next_ is immutable |
840 | auto p = nextSegment(); |
841 | if (p) { |
842 | s.push(p); |
843 | } |
844 | } |
845 | } |
846 | }; // Segment |
847 | |
848 | }; // UnboundedQueue |
849 | |
850 | /* Aliases */ |
851 | |
/// Unbounded single-producer, single-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPSCQueue =
    UnboundedQueue<T, true, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
860 | |
/// Unbounded multi-producer, single-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPSCQueue =
    UnboundedQueue<T, false, true, MayBlock, LgSegmentSize, LgAlign, Atom>;
869 | |
/// Unbounded single-producer, multi-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using USPMCQueue =
    UnboundedQueue<T, true, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
878 | |
/// Unbounded multi-producer, multi-consumer queue.
template <
    typename T,
    bool MayBlock,
    size_t LgSegmentSize = 8,
    size_t LgAlign = constexpr_log2(hardware_destructive_interference_size),
    template <typename> class Atom = std::atomic>
using UMPMCQueue =
    UnboundedQueue<T, false, false, MayBlock, LgSegmentSize, LgAlign, Atom>;
887 | |
888 | } // namespace folly |
889 | |