1/*
2 * Copyright (c) Glow Contributors. See CONTRIBUTORS file.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#include "glow/Support/ThreadPool.h"
17#include "folly/system/ThreadName.h"
18
19namespace glow {
20
namespace threads {

/// Process-wide counter from which every thread id is drawn. It is shared by
/// getThreadId() and createThreadId(), so ids handed out by either function
/// never collide with each other.
static std::atomic<std::size_t> thread_idx{0};

/// \returns the id of the calling thread. The id is taken from the shared
/// counter the first time a given thread calls this function and is cached in
/// a thread_local, so later calls from that thread return the same value.
size_t getThreadId() {
  thread_local const std::size_t cachedId = thread_idx.fetch_add(1);
  return cachedId;
}

/// \returns a fresh id taken from the shared counter. Unlike getThreadId(),
/// nothing is cached: every call consumes and returns a new value.
size_t createThreadId() {
  const std::size_t freshId = thread_idx.fetch_add(1);
  return freshId;
}

} // namespace threads
33
34ThreadExecutor::ThreadExecutor(const std::string &name)
35 : shouldStop_(false), worker_([this, name]() {
36 if (!name.empty()) {
37 folly::setThreadName(name);
38 }
39 threadPoolWorkerMain();
40 }) {}
41
42ThreadExecutor::~ThreadExecutor() { stop(true); }
43
44void ThreadExecutor::stop(bool block) {
45 // Lock mutex before signalling for threads to stop to make sure
46 // // a thread can't wait on the condition variable after checking the
47 // // *old* value of shouldStop_.
48 std::unique_lock<std::mutex> lock(workQueueMtx_);
49
50 shouldStop_ = true;
51 lock.unlock();
52 queueNotEmpty_.notify_all();
53
54 if (block && worker_.joinable()) {
55 worker_.join();
56 }
57}
58
59void ThreadExecutor::threadPoolWorkerMain() {
60 std::unique_lock<std::mutex> lock(workQueueMtx_, std::defer_lock);
61
62 while (!shouldStop_) {
63 // Lock the lock after processing a work item.
64 lock.lock();
65
66 // If work queue is empty, wait to be signalled when
67 // a work item is submitted.
68 while (workQueue_.empty() && !shouldStop_) {
69 queueNotEmpty_.wait(lock);
70 }
71
72 // If shouldStop_ was set to false while the thread
73 // was asleep, break out of the main loop.
74 if (shouldStop_) {
75 break;
76 }
77
78 // Pop a work item from the queue, and make sure to unlock
79 // the lock before processing it.
80 auto workItem = std::move(workQueue_.front());
81 workQueue_.pop();
82 lock.unlock();
83
84 // Process work item.
85 workItem();
86 }
87}
88
89std::future<void>
90ThreadExecutor::submit(std::packaged_task<void(void)> &&task) {
91 std::unique_lock<std::mutex> lock(workQueueMtx_);
92 auto future = task.get_future();
93 workQueue_.push(std::move(task));
94 lock.unlock();
95 queueNotEmpty_.notify_one();
96 return future;
97}
98
99ThreadPool::ThreadPool(unsigned numWorkers, const std::string &name) {
100 // Intialize all workers and make each one run threadPoolWorkerMain.
101 workers_.reserve(kNumWorkers);
102 for (unsigned i = 0; i < numWorkers; i++) {
103 workers_.push_back(new ThreadExecutor(name));
104 size_t threadId{0};
105 workers_.back()
106 ->submit([&threadId] { threadId = threads::getThreadId(); })
107 .wait();
108 threadIds_.insert(threadId);
109 }
110}
111
112ThreadPool::~ThreadPool() {
113 stop(true);
114 for (auto *w : workers_) {
115 delete w;
116 }
117 workers_.clear();
118}
119
120void ThreadPool::stop(bool block) {
121 // Signal to workers to stop.
122 for (auto *w : workers_) {
123 w->stop(block);
124 }
125}
126
127std::future<void> ThreadPool::submit(std::packaged_task<void(void)> &&task) {
128 ThreadExecutor *ex = getExecutor();
129 return ex->submit(std::move(task));
130}
131
132} // namespace glow
133