1 | /* |
2 | * Copyright (c) Glow Contributors. See CONTRIBUTORS file. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | #include "glow/Support/ThreadPool.h" |
17 | #include "folly/system/ThreadName.h" |
18 | |
19 | namespace glow { |
20 | |
namespace threads {

/// Counter shared by every thread in the process; each id handed out below is
/// the value the counter held just before being incremented.
static std::atomic<std::size_t> thread_idx{0};

/// \returns the id of the calling thread. The id is drawn from the shared
/// counter the first time a thread calls this function and is then cached in
/// thread-local storage, so repeated calls on one thread yield the same value.
size_t getThreadId() {
  thread_local const std::size_t cachedId = thread_idx.fetch_add(1);
  return cachedId;
}

/// \returns a new id drawn from the shared counter. Unlike getThreadId(),
/// nothing is cached: every call consumes a fresh counter value.
size_t createThreadId() {
  const std::size_t freshId = thread_idx.fetch_add(1);
  return freshId;
}

} // namespace threads
33 | |
34 | ThreadExecutor::ThreadExecutor(const std::string &name) |
35 | : shouldStop_(false), worker_([this, name]() { |
36 | if (!name.empty()) { |
37 | folly::setThreadName(name); |
38 | } |
39 | threadPoolWorkerMain(); |
40 | }) {} |
41 | |
42 | ThreadExecutor::~ThreadExecutor() { stop(true); } |
43 | |
44 | void ThreadExecutor::stop(bool block) { |
45 | // Lock mutex before signalling for threads to stop to make sure |
46 | // // a thread can't wait on the condition variable after checking the |
47 | // // *old* value of shouldStop_. |
48 | std::unique_lock<std::mutex> lock(workQueueMtx_); |
49 | |
50 | shouldStop_ = true; |
51 | lock.unlock(); |
52 | queueNotEmpty_.notify_all(); |
53 | |
54 | if (block && worker_.joinable()) { |
55 | worker_.join(); |
56 | } |
57 | } |
58 | |
59 | void ThreadExecutor::threadPoolWorkerMain() { |
60 | std::unique_lock<std::mutex> lock(workQueueMtx_, std::defer_lock); |
61 | |
62 | while (!shouldStop_) { |
63 | // Lock the lock after processing a work item. |
64 | lock.lock(); |
65 | |
66 | // If work queue is empty, wait to be signalled when |
67 | // a work item is submitted. |
68 | while (workQueue_.empty() && !shouldStop_) { |
69 | queueNotEmpty_.wait(lock); |
70 | } |
71 | |
72 | // If shouldStop_ was set to false while the thread |
73 | // was asleep, break out of the main loop. |
74 | if (shouldStop_) { |
75 | break; |
76 | } |
77 | |
78 | // Pop a work item from the queue, and make sure to unlock |
79 | // the lock before processing it. |
80 | auto workItem = std::move(workQueue_.front()); |
81 | workQueue_.pop(); |
82 | lock.unlock(); |
83 | |
84 | // Process work item. |
85 | workItem(); |
86 | } |
87 | } |
88 | |
89 | std::future<void> |
90 | ThreadExecutor::submit(std::packaged_task<void(void)> &&task) { |
91 | std::unique_lock<std::mutex> lock(workQueueMtx_); |
92 | auto future = task.get_future(); |
93 | workQueue_.push(std::move(task)); |
94 | lock.unlock(); |
95 | queueNotEmpty_.notify_one(); |
96 | return future; |
97 | } |
98 | |
99 | ThreadPool::ThreadPool(unsigned numWorkers, const std::string &name) { |
100 | // Intialize all workers and make each one run threadPoolWorkerMain. |
101 | workers_.reserve(kNumWorkers); |
102 | for (unsigned i = 0; i < numWorkers; i++) { |
103 | workers_.push_back(new ThreadExecutor(name)); |
104 | size_t threadId{0}; |
105 | workers_.back() |
106 | ->submit([&threadId] { threadId = threads::getThreadId(); }) |
107 | .wait(); |
108 | threadIds_.insert(threadId); |
109 | } |
110 | } |
111 | |
112 | ThreadPool::~ThreadPool() { |
113 | stop(true); |
114 | for (auto *w : workers_) { |
115 | delete w; |
116 | } |
117 | workers_.clear(); |
118 | } |
119 | |
120 | void ThreadPool::stop(bool block) { |
121 | // Signal to workers to stop. |
122 | for (auto *w : workers_) { |
123 | w->stop(block); |
124 | } |
125 | } |
126 | |
127 | std::future<void> ThreadPool::submit(std::packaged_task<void(void)> &&task) { |
128 | ThreadExecutor *ex = getExecutor(); |
129 | return ex->submit(std::move(task)); |
130 | } |
131 | |
132 | } // namespace glow |
133 | |