1/*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <folly/CancellationToken.h>
18#include <folly/Optional.h>
19#include <folly/synchronization/detail/Sleeper.h>
20
21#include <glog/logging.h>
22
23#include <algorithm>
24#include <new>
25#include <thread>
26#include <tuple>
27
28namespace folly {
29namespace detail {
30
// Destructor. Debug-only sanity checks that the state object is quiescent
// before it is freed:
//  - no callbacks remain registered (head_ is empty),
//  - the spin-lock bit packed into state_ is clear,
//  - the packed token reference count has dropped below one full increment,
//    i.e. no CancellationToken still references this object.
// Relaxed loads are sufficient: by the time the destructor runs, no other
// thread may be accessing this object.
CancellationState::~CancellationState() {
  DCHECK(head_ == nullptr);
  DCHECK(!isLocked(state_.load(std::memory_order_relaxed)));
  DCHECK(
      state_.load(std::memory_order_relaxed) < kTokenReferenceCountIncrement);
}
37
// Attempts to register 'callback' so it will be invoked when cancellation
// is requested.
//
// Returns true if the callback was pushed onto the intrusive list headed
// at head_. Returns false without registering when either:
//  - cancellation has already been requested: the callback is invoked
//    synchronously, inline, before returning; or
//  - cancellation can never be requested for this state, in which case
//    registration would be pointless.
//
// If 'incrementRefCountIfSuccessful' is true, a token reference is taken
// as part of the same atomic operation that releases the lock.
bool CancellationState::tryAddCallback(
    CancellationCallback* callback,
    bool incrementRefCountIfSuccessful) noexcept {
  // Try to acquire the lock, but abandon trying to acquire the lock if
  // cancellation has already been requested (we can just immediately invoke
  // the callback) or if cancellation can never be requested (we can just
  // skip registration).
  if (!tryLock([callback](std::uint64_t oldState) noexcept {
        if (isCancellationRequested(oldState)) {
          callback->invokeCallback();
          return false;
        }
        return canBeCancelled(oldState);
      })) {
    return false;
  }

  // We've acquired the lock and cancellation has not yet been requested.
  // Push this callback onto the head of the list.
  // 'prevNext_' is a back-pointer to whichever pointer points at this node
  // (either head_ or the previous node's next_), enabling O(1) removal.
  if (head_ != nullptr) {
    head_->prevNext_ = &callback->next_;
  }
  callback->next_ = head_;
  callback->prevNext_ = &head_;
  head_ = callback;

  if (incrementRefCountIfSuccessful) {
    // Combine multiple atomic operations into a single atomic operation.
    unlockAndIncrementTokenCount();
  } else {
    unlock();
  }

  // Successfully added the callback.
  return true;
}
74
// Deregisters 'callback' — presumably called from CancellationCallback's
// destructor (TODO confirm against the header).
//
// Handles three situations:
//  1. Callback still registered in the list: unlink it under the lock.
//  2. Callback already dequeued by requestCancellation() on this same
//     thread (i.e. we are being destroyed from within the callback):
//     set the stack flag so requestCancellation() won't touch the
//     callback object after the callback returns.
//  3. Callback executing concurrently on another thread: spin until that
//     thread signals completion via callbackCompleted_.
//
// In every case one token reference is dropped before returning.
void CancellationState::removeCallback(
    CancellationCallback* callback) noexcept {
  DCHECK(callback != nullptr);

  lock();

  if (callback->prevNext_ != nullptr) {
    // Still registered in the list => not yet executed.
    // Just remove it from the list.
    *callback->prevNext_ = callback->next_;
    if (callback->next_ != nullptr) {
      callback->next_->prevNext_ = callback->prevNext_;
    }

    // Drop the lock and the callback's token reference in one atomic op.
    unlockAndDecrementTokenCount();
    return;
  }

  unlock();

  // Callback has either already executed or is executing concurrently on
  // another thread. (requestCancellation() sets prevNext_ to nullptr when
  // it dequeues a callback.)

  if (signallingThreadId_ == std::this_thread::get_id()) {
    // Callback executed on this thread or is still currently executing
    // and is deregistering itself from within the callback.
    if (callback->destructorHasRunInsideCallback_ != nullptr) {
      // Currently inside the callback, let the requestCancellation() method
      // know the object is about to be destructed and that it should
      // not try to access the object when the callback returns.
      *callback->destructorHasRunInsideCallback_ = true;
    }
  } else {
    // Callback is currently executing on another thread, block until it
    // finishes executing.
    folly::detail::Sleeper sleeper;
    while (!callback->callbackCompleted_.load(std::memory_order_acquire)) {
      sleeper.wait();
    }
  }

  removeTokenReference();
}
118
// Transitions the state to 'cancellation requested' and invokes every
// registered callback, draining the list one node at a time.
//
// Returns true if cancellation had already been requested by an earlier
// call (in which case this call does nothing), false if this call
// performed the transition.
//
// The lock is deliberately dropped around each callback invocation so
// other threads can deregister callbacks concurrently; see the TRICKY
// comment below for how inline destruction during a callback is handled.
bool CancellationState::requestCancellation() noexcept {
  if (!tryLockAndCancelUnlessCancelled()) {
    // Was already marked as cancelled
    return true;
  }

  // This thread marked as cancelled and acquired the lock

  // Recorded so removeCallback() can detect that it is running on the
  // signalling thread (i.e. re-entrantly from inside a callback).
  signallingThreadId_ = std::this_thread::get_id();

  while (head_ != nullptr) {
    // Dequeue the first item on the queue.
    CancellationCallback* callback = head_;
    head_ = callback->next_;
    const bool anyMore = head_ != nullptr;
    if (anyMore) {
      head_->prevNext_ = &head_;
    }
    // Mark this item as removed from the list.
    callback->prevNext_ = nullptr;

    // Don't hold the lock while executing the callback
    // as we don't want to block other threads from
    // deregistering callbacks.
    unlock();

    // TRICKY: Need to store a flag on the stack here that the callback
    // can use to signal that the destructor was executed inline
    // during the call.
    // If the destructor was executed inline then it's not safe to
    // dereference 'callback' after 'invokeCallback()' returns.
    // If the destructor runs on some other thread then the other
    // thread will block waiting for this thread to signal that the
    // callback has finished executing.
    bool destructorHasRunInsideCallback = false;
    callback->destructorHasRunInsideCallback_ = &destructorHasRunInsideCallback;

    callback->invokeCallback();

    if (!destructorHasRunInsideCallback) {
      callback->destructorHasRunInsideCallback_ = nullptr;
      // Release-publish completion so a concurrent removeCallback() on
      // another thread can stop spinning and safely destroy the callback.
      callback->callbackCompleted_.store(true, std::memory_order_release);
    }

    if (!anyMore) {
      // This was the last item in the queue when we dequeued it.
      // No more items should be added to the queue after we have
      // marked the state as cancelled, only removed from the queue.
      // Avoid acquiring/releasing the lock in this case.
      return false;
    }

    lock();
  }

  unlock();

  return false;
}
178
179void CancellationState::lock() noexcept {
180 folly::detail::Sleeper sleeper;
181 std::uint64_t oldState = state_.load(std::memory_order_relaxed);
182 do {
183 while (isLocked(oldState)) {
184 sleeper.wait();
185 oldState = state_.load(std::memory_order_relaxed);
186 }
187 } while (!state_.compare_exchange_weak(
188 oldState,
189 oldState | kLockedFlag,
190 std::memory_order_acquire,
191 std::memory_order_relaxed));
192}
193
// Releases the spin lock by clearing kLockedFlag from state_.
// Release ordering publishes any modifications made while the lock was
// held to the next thread that acquires it.
void CancellationState::unlock() noexcept {
  state_.fetch_sub(kLockedFlag, std::memory_order_release);
}
197
// Releases the spin lock and takes one token reference in a single atomic
// RMW: subtracting (kLockedFlag - kTokenReferenceCountIncrement) clears
// the lock bit while simultaneously adding one reference-count increment.
void CancellationState::unlockAndIncrementTokenCount() noexcept {
  state_.fetch_sub(
      kLockedFlag - kTokenReferenceCountIncrement, std::memory_order_release);
}
202
// Releases the spin lock and drops one token reference in a single atomic
// RMW. If the pre-subtraction value held the lock bit plus fewer than two
// reference increments, the reference just dropped was the last one and
// the state object destroys itself. acq_rel ordering ensures all prior
// writes are visible to whichever thread ends up running the delete.
void CancellationState::unlockAndDecrementTokenCount() noexcept {
  auto oldState = state_.fetch_sub(
      kLockedFlag + kTokenReferenceCountIncrement, std::memory_order_acq_rel);
  if (oldState < (kLockedFlag + 2 * kTokenReferenceCountIncrement)) {
    delete this;
  }
}
210
211bool CancellationState::tryLockAndCancelUnlessCancelled() noexcept {
212 folly::detail::Sleeper sleeper;
213 std::uint64_t oldState = state_.load(std::memory_order_acquire);
214 while (true) {
215 if (isCancellationRequested(oldState)) {
216 return false;
217 } else if (isLocked(oldState)) {
218 sleeper.wait();
219 oldState = state_.load(std::memory_order_acquire);
220 } else if (state_.compare_exchange_weak(
221 oldState,
222 oldState | kLockedFlag | kCancellationRequestedFlag,
223 std::memory_order_acq_rel,
224 std::memory_order_acquire)) {
225 return true;
226 }
227 }
228}
229
230template <typename Predicate>
231bool CancellationState::tryLock(Predicate predicate) noexcept {
232 folly::detail::Sleeper sleeper;
233 std::uint64_t oldState = state_.load(std::memory_order_acquire);
234 while (true) {
235 if (!predicate(oldState)) {
236 return false;
237 } else if (isLocked(oldState)) {
238 sleeper.wait();
239 oldState = state_.load(std::memory_order_acquire);
240 } else if (state_.compare_exchange_weak(
241 oldState,
242 oldState | kLockedFlag,
243 std::memory_order_acquire)) {
244 return true;
245 }
246 }
247}
248
249} // namespace detail
250} // namespace folly
251