1 | /* |
2 | * Copyright (c) Facebook, Inc. and its affiliates. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | #include <folly/CancellationToken.h> |
18 | #include <folly/Optional.h> |
19 | #include <folly/synchronization/detail/Sleeper.h> |
20 | |
21 | #include <glog/logging.h> |
22 | |
23 | #include <algorithm> |
24 | #include <new> |
25 | #include <thread> |
26 | #include <tuple> |
27 | |
28 | namespace folly { |
29 | namespace detail { |
30 | |
// Destructor. The body consists solely of debug-only DCHECK assertions on
// the final state of the object; it performs no cleanup of its own.
//
// The assertions verify that, at destruction time:
//  - no callback is still linked into the callback list (head_ is null),
//  - the lock flag is not set in state_, and
//  - state_ is numerically below kTokenReferenceCountIncrement.
//    NOTE(review): presumably this means no reference counts remain encoded
//    in state_ -- confirm against the bit layout declared in the header.
//
// state_ is read with relaxed loads in both checks.
31 | CancellationState::~CancellationState() { |
32 | DCHECK(head_ == nullptr); |
33 | DCHECK(!isLocked(state_.load(std::memory_order_relaxed))); |
34 | DCHECK( |
35 | state_.load(std::memory_order_relaxed) < kTokenReferenceCountIncrement); |
36 | } |
37 | |
38 | bool CancellationState::tryAddCallback( |
39 | CancellationCallback* callback, |
40 | bool incrementRefCountIfSuccessful) noexcept { |
41 | // Try to acquire the lock, but abandon trying to acquire the lock if |
42 | // cancellation has already been requested (we can just immediately invoke |
43 | // the callback) or if cancellation can never be requested (we can just |
44 | // skip registration). |
45 | if (!tryLock([callback](std::uint64_t oldState) noexcept { |
46 | if (isCancellationRequested(oldState)) { |
47 | callback->invokeCallback(); |
48 | return false; |
49 | } |
50 | return canBeCancelled(oldState); |
51 | })) { |
52 | return false; |
53 | } |
54 | |
55 | // We've acquired the lock and cancellation has not yet been requested. |
56 | // Push this callback onto the head of the list. |
57 | if (head_ != nullptr) { |
58 | head_->prevNext_ = &callback->next_; |
59 | } |
60 | callback->next_ = head_; |
61 | callback->prevNext_ = &head_; |
62 | head_ = callback; |
63 | |
64 | if (incrementRefCountIfSuccessful) { |
65 | // Combine multiple atomic operations into a single atomic operation. |
66 | unlockAndIncrementTokenCount(); |
67 | } else { |
68 | unlock(); |
69 | } |
70 | |
71 | // Successfully added the callback. |
72 | return true; |
73 | } |
74 | |
75 | void CancellationState::removeCallback( |
76 | CancellationCallback* callback) noexcept { |
77 | DCHECK(callback != nullptr); |
78 | |
79 | lock(); |
80 | |
81 | if (callback->prevNext_ != nullptr) { |
82 | // Still registered in the list => not yet executed. |
83 | // Just remove it from the list. |
84 | *callback->prevNext_ = callback->next_; |
85 | if (callback->next_ != nullptr) { |
86 | callback->next_->prevNext_ = callback->prevNext_; |
87 | } |
88 | |
89 | unlockAndDecrementTokenCount(); |
90 | return; |
91 | } |
92 | |
93 | unlock(); |
94 | |
95 | // Callback has either already executed or is executing concurrently on |
96 | // another thread. |
97 | |
98 | if (signallingThreadId_ == std::this_thread::get_id()) { |
99 | // Callback executed on this thread or is still currently executing |
100 | // and is deregistering itself from within the callback. |
101 | if (callback->destructorHasRunInsideCallback_ != nullptr) { |
102 | // Currently inside the callback, let the requestCancellation() method |
103 | // know the object is about to be destructed and that it should |
104 | // not try to access the object when the callback returns. |
105 | *callback->destructorHasRunInsideCallback_ = true; |
106 | } |
107 | } else { |
108 | // Callback is currently executing on another thread, block until it |
109 | // finishes executing. |
110 | folly::detail::Sleeper sleeper; |
111 | while (!callback->callbackCompleted_.load(std::memory_order_acquire)) { |
112 | sleeper.wait(); |
113 | } |
114 | } |
115 | |
116 | removeTokenReference(); |
117 | } |
118 | |
119 | bool CancellationState::requestCancellation() noexcept { |
120 | if (!tryLockAndCancelUnlessCancelled()) { |
121 | // Was already marked as cancelled |
122 | return true; |
123 | } |
124 | |
125 | // This thread marked as cancelled and acquired the lock |
126 | |
127 | signallingThreadId_ = std::this_thread::get_id(); |
128 | |
129 | while (head_ != nullptr) { |
130 | // Dequeue the first item on the queue. |
131 | CancellationCallback* callback = head_; |
132 | head_ = callback->next_; |
133 | const bool anyMore = head_ != nullptr; |
134 | if (anyMore) { |
135 | head_->prevNext_ = &head_; |
136 | } |
137 | // Mark this item as removed from the list. |
138 | callback->prevNext_ = nullptr; |
139 | |
140 | // Don't hold the lock while executing the callback |
141 | // as we don't want to block other threads from |
142 | // deregistering callbacks. |
143 | unlock(); |
144 | |
145 | // TRICKY: Need to store a flag on the stack here that the callback |
146 | // can use to signal that the destructor was executed inline |
147 | // during the call. |
148 | // If the destructor was executed inline then it's not safe to |
149 | // dereference 'callback' after 'invokeCallback()' returns. |
150 | // If the destructor runs on some other thread then the other |
151 | // thread will block waiting for this thread to signal that the |
152 | // callback has finished executing. |
153 | bool destructorHasRunInsideCallback = false; |
154 | callback->destructorHasRunInsideCallback_ = &destructorHasRunInsideCallback; |
155 | |
156 | callback->invokeCallback(); |
157 | |
158 | if (!destructorHasRunInsideCallback) { |
159 | callback->destructorHasRunInsideCallback_ = nullptr; |
160 | callback->callbackCompleted_.store(true, std::memory_order_release); |
161 | } |
162 | |
163 | if (!anyMore) { |
164 | // This was the last item in the queue when we dequeued it. |
165 | // No more items should be added to the queue after we have |
166 | // marked the state as cancelled, only removed from the queue. |
167 | // Avoid acquring/releasing the lock in this case. |
168 | return false; |
169 | } |
170 | |
171 | lock(); |
172 | } |
173 | |
174 | unlock(); |
175 | |
176 | return false; |
177 | } |
178 | |
179 | void CancellationState::lock() noexcept { |
180 | folly::detail::Sleeper sleeper; |
181 | std::uint64_t oldState = state_.load(std::memory_order_relaxed); |
182 | do { |
183 | while (isLocked(oldState)) { |
184 | sleeper.wait(); |
185 | oldState = state_.load(std::memory_order_relaxed); |
186 | } |
187 | } while (!state_.compare_exchange_weak( |
188 | oldState, |
189 | oldState | kLockedFlag, |
190 | std::memory_order_acquire, |
191 | std::memory_order_relaxed)); |
192 | } |
193 | |
194 | void CancellationState::unlock() noexcept { |
195 | state_.fetch_sub(kLockedFlag, std::memory_order_release); |
196 | } |
197 | |
198 | void CancellationState::unlockAndIncrementTokenCount() noexcept { |
199 | state_.fetch_sub( |
200 | kLockedFlag - kTokenReferenceCountIncrement, std::memory_order_release); |
201 | } |
202 | |
203 | void CancellationState::unlockAndDecrementTokenCount() noexcept { |
204 | auto oldState = state_.fetch_sub( |
205 | kLockedFlag + kTokenReferenceCountIncrement, std::memory_order_acq_rel); |
206 | if (oldState < (kLockedFlag + 2 * kTokenReferenceCountIncrement)) { |
207 | delete this; |
208 | } |
209 | } |
210 | |
211 | bool CancellationState::tryLockAndCancelUnlessCancelled() noexcept { |
212 | folly::detail::Sleeper sleeper; |
213 | std::uint64_t oldState = state_.load(std::memory_order_acquire); |
214 | while (true) { |
215 | if (isCancellationRequested(oldState)) { |
216 | return false; |
217 | } else if (isLocked(oldState)) { |
218 | sleeper.wait(); |
219 | oldState = state_.load(std::memory_order_acquire); |
220 | } else if (state_.compare_exchange_weak( |
221 | oldState, |
222 | oldState | kLockedFlag | kCancellationRequestedFlag, |
223 | std::memory_order_acq_rel, |
224 | std::memory_order_acquire)) { |
225 | return true; |
226 | } |
227 | } |
228 | } |
229 | |
230 | template <typename Predicate> |
231 | bool CancellationState::tryLock(Predicate predicate) noexcept { |
232 | folly::detail::Sleeper sleeper; |
233 | std::uint64_t oldState = state_.load(std::memory_order_acquire); |
234 | while (true) { |
235 | if (!predicate(oldState)) { |
236 | return false; |
237 | } else if (isLocked(oldState)) { |
238 | sleeper.wait(); |
239 | oldState = state_.load(std::memory_order_acquire); |
240 | } else if (state_.compare_exchange_weak( |
241 | oldState, |
242 | oldState | kLockedFlag, |
243 | std::memory_order_acquire)) { |
244 | return true; |
245 | } |
246 | } |
247 | } |
248 | |
249 | } // namespace detail |
250 | } // namespace folly |
251 | |