1// Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
2// Distributed under the MIT License (http://opensource.org/licenses/MIT)
3
4#pragma once
5
// multi producer-multi consumer blocking queue.
// enqueue(..) - will block until room is found to put the new message.
// enqueue_nowait(..) - will enqueue immediately, overrunning the oldest
// message in the queue if no room is left.
// dequeue_for(..) - will block until the queue is not empty or the timeout
// has passed.
12
13#include <spdlog/details/circular_q.h>
14
15#include <condition_variable>
16#include <mutex>
17
18namespace spdlog {
19namespace details {
20
21template<typename T>
22class mpmc_blocking_queue
23{
24public:
25 using item_type = T;
26 explicit mpmc_blocking_queue(size_t max_items)
27 : q_(max_items)
28 {}
29
30#ifndef __MINGW32__
31 // try to enqueue and block if no room left
32 void enqueue(T &&item)
33 {
34 {
35 std::unique_lock<std::mutex> lock(queue_mutex_);
36 pop_cv_.wait(lock, [this] { return !this->q_.full(); });
37 q_.push_back(std::move(item));
38 }
39 push_cv_.notify_one();
40 }
41
42 // enqueue immediately. overrun oldest message in the queue if no room left.
43 void enqueue_nowait(T &&item)
44 {
45 {
46 std::unique_lock<std::mutex> lock(queue_mutex_);
47 q_.push_back(std::move(item));
48 }
49 push_cv_.notify_one();
50 }
51
52 // try to dequeue item. if no item found. wait up to timeout and try again
53 // Return true, if succeeded dequeue item, false otherwise
54 bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
55 {
56 {
57 std::unique_lock<std::mutex> lock(queue_mutex_);
58 if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
59 {
60 return false;
61 }
62 popped_item = std::move(q_.front());
63 q_.pop_front();
64 }
65 pop_cv_.notify_one();
66 return true;
67 }
68
69#else
70 // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
71 // so release the mutex at the very end each function.
72
73 // try to enqueue and block if no room left
74 void enqueue(T &&item)
75 {
76 std::unique_lock<std::mutex> lock(queue_mutex_);
77 pop_cv_.wait(lock, [this] { return !this->q_.full(); });
78 q_.push_back(std::move(item));
79 push_cv_.notify_one();
80 }
81
82 // enqueue immediately. overrun oldest message in the queue if no room left.
83 void enqueue_nowait(T &&item)
84 {
85 std::unique_lock<std::mutex> lock(queue_mutex_);
86 q_.push_back(std::move(item));
87 push_cv_.notify_one();
88 }
89
90 // try to dequeue item. if no item found. wait up to timeout and try again
91 // Return true, if succeeded dequeue item, false otherwise
92 bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
93 {
94 std::unique_lock<std::mutex> lock(queue_mutex_);
95 if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
96 {
97 return false;
98 }
99 popped_item = std::move(q_.front());
100 q_.pop_front();
101 pop_cv_.notify_one();
102 return true;
103 }
104
105#endif
106
107 size_t overrun_counter()
108 {
109 std::unique_lock<std::mutex> lock(queue_mutex_);
110 return q_.overrun_counter();
111 }
112
113 size_t size()
114 {
115 std::unique_lock<std::mutex> lock(queue_mutex_);
116 return q_.size();
117 }
118
119 void reset_overrun_counter()
120 {
121 std::unique_lock<std::mutex> lock(queue_mutex_);
122 q_.reset_overrun_counter();
123 }
124
125private:
126 std::mutex queue_mutex_;
127 std::condition_variable push_cv_;
128 std::condition_variable pop_cv_;
129 spdlog::details::circular_q<T> q_;
130};
131} // namespace details
132} // namespace spdlog
133