1// Copyright 2020 The Marl Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file contains a number of benchmarks that do not use marl.
16// They exist to compare marl's performance against other simple scheduler
17// approaches.
18
19#include "marl_bench.h"
20
21#include "benchmark/benchmark.h"
22
23#include <mutex>
24#include <queue>
25#include <thread>
26
27namespace {
28
// Event is a minimal wait-and-signal primitive built from a mutex, a
// condition variable and a boolean flag. The flag is only ever set, never
// cleared, so once signal() has been called every later wait() returns
// without blocking.
class Event {
 public:
  // signal marks the event as fired and wakes every thread blocked in wait().
  void signal() {
    std::lock_guard<std::mutex> guard(mutex_);
    fired_ = true;
    cv_.notify_all();
  }

  // wait blocks the calling thread until signal() has been called. It returns
  // immediately if the event has already been fired.
  void wait() {
    std::unique_lock<std::mutex> guard(mutex_);
    // Re-check the flag after every wake-up so spurious wake-ups are ignored.
    while (!fired_) {
      cv_.wait(guard);
    }
  }

 private:
  std::condition_variable cv_;
  std::mutex mutex_;
  bool fired_ = false;  // Guarded by mutex_.
};
50
51} // anonymous namespace
52
53// A simple multi-thread, single-queue task executor that shares a single mutex
54// across N threads. This implementation suffers from lock contention.
55static void SingleQueueTaskExecutor(benchmark::State& state) {
56 using Task = std::function<uint32_t(uint32_t)>;
57
58 auto const numTasks = Schedule::numTasks(state);
59 auto const numThreads = Schedule::numThreads(state);
60
61 for (auto _ : state) {
62 state.PauseTiming();
63
64 std::mutex mutex;
65 // Set everything up with the mutex locked to prevent the threads from
66 // performing work while the timing is paused.
67 mutex.lock();
68
69 // Set up the tasks.
70 std::queue<Task> tasks;
71 for (int i = 0; i < numTasks; i++) {
72 tasks.push(Schedule::doSomeWork);
73 }
74
75 auto taskRunner = [&] {
76 while (true) {
77 Task task;
78
79 // Take the next task.
80 // Note that this lock is likely to block while waiting for other
81 // threads.
82 mutex.lock();
83 if (tasks.size() > 0) {
84 task = tasks.front();
85 tasks.pop();
86 }
87 mutex.unlock();
88
89 if (task) {
90 task(123);
91 } else {
92 return; // done.
93 }
94 }
95 };
96
97 // Set up the threads.
98 std::vector<std::thread> threads;
99 for (int i = 0; i < numThreads; i++) {
100 threads.emplace_back(std::thread(taskRunner));
101 }
102
103 state.ResumeTiming();
104 mutex.unlock(); // Go threads, go!
105
106 if (numThreads > 0) {
107 // Wait for all threads to finish.
108 for (auto& thread : threads) {
109 thread.join();
110 }
111 } else {
112 // Single-threaded test - just run the worker.
113 taskRunner();
114 }
115 }
116}
117BENCHMARK(SingleQueueTaskExecutor)->Apply(Schedule::args);
118
119// A simple multi-thread, multi-queue task executor that avoids lock contention.
120// Tasks queues are evenly balanced, and each should take an equal amount of
121// time to execute.
122static void MultiQueueTaskExecutor(benchmark::State& state) {
123 using Task = std::function<uint32_t(uint32_t)>;
124 using TaskQueue = std::vector<Task>;
125
126 auto const numTasks = Schedule::numTasks(state);
127 auto const numThreads = Schedule::numThreads(state);
128 auto const numQueues = std::max(numThreads, 1);
129
130 // Set up the tasks queues.
131 std::vector<TaskQueue> taskQueues(numQueues);
132 for (int i = 0; i < numTasks; i++) {
133 taskQueues[i % numQueues].emplace_back(Schedule::doSomeWork);
134 }
135
136 for (auto _ : state) {
137 if (numThreads > 0) {
138 state.PauseTiming();
139 Event start;
140
141 // Set up the threads.
142 std::vector<std::thread> threads;
143 for (int i = 0; i < numThreads; i++) {
144 threads.emplace_back(std::thread([&, i] {
145 start.wait();
146 for (auto& task : taskQueues[i]) {
147 task(123);
148 }
149 }));
150 }
151
152 state.ResumeTiming();
153 start.signal();
154
155 // Wait for all threads to finish.
156 for (auto& thread : threads) {
157 thread.join();
158 }
159 } else {
160 // Single-threaded test - just run the tasks.
161 for (auto& task : taskQueues[0]) {
162 task(123);
163 }
164 }
165 }
166}
167BENCHMARK(MultiQueueTaskExecutor)->Apply(Schedule::args);