1 | /* Standard C headers */ |
2 | #include <assert.h> |
3 | #include <limits.h> |
4 | #include <stdbool.h> |
5 | #include <stdint.h> |
6 | #include <stdlib.h> |
7 | #include <string.h> |
8 | |
9 | /* Configuration header */ |
10 | #include "threadpool-common.h" |
11 | |
12 | /* POSIX headers */ |
13 | #include <pthread.h> |
14 | #include <unistd.h> |
15 | |
16 | /* Futex-specific headers */ |
17 | #if PTHREADPOOL_USE_FUTEX |
18 | #if defined(__linux__) |
19 | #include <sys/syscall.h> |
20 | #include <linux/futex.h> |
21 | |
22 | /* Old Android NDKs do not define SYS_futex and FUTEX_PRIVATE_FLAG */ |
23 | #ifndef SYS_futex |
24 | #define SYS_futex __NR_futex |
25 | #endif |
26 | #ifndef FUTEX_PRIVATE_FLAG |
27 | #define FUTEX_PRIVATE_FLAG 128 |
28 | #endif |
29 | #elif defined(__EMSCRIPTEN__) |
30 | /* math.h for INFINITY constant */ |
31 | #include <math.h> |
32 | |
33 | #include <emscripten/threading.h> |
34 | #else |
35 | #error "Platform-specific implementation of futex_wait and futex_wake_all required" |
36 | #endif |
37 | #endif |
38 | |
39 | /* Windows-specific headers */ |
40 | #ifdef _WIN32 |
41 | #include <sysinfoapi.h> |
42 | #endif |
43 | |
44 | /* Dependencies */ |
45 | #if PTHREADPOOL_USE_CPUINFO |
46 | #include <cpuinfo.h> |
47 | #endif |
48 | |
49 | /* Public library header */ |
50 | #include <pthreadpool.h> |
51 | |
52 | /* Internal library headers */ |
53 | #include "threadpool-atomics.h" |
54 | #include "threadpool-object.h" |
55 | #include "threadpool-utils.h" |
56 | |
57 | |
#if PTHREADPOOL_USE_FUTEX
#if defined(__linux__)
/*
 * Block the calling thread until the 32-bit word at @address changes away from
 * @value (or a spurious wakeup occurs — callers re-check in a loop).
 * FUTEX_PRIVATE_FLAG is used because the futex words live in process-private
 * memory (inside the pthreadpool structure), never in shared mappings.
 * NULL timeout means wait indefinitely.
 */
static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
	return syscall(SYS_futex, address, FUTEX_WAIT | FUTEX_PRIVATE_FLAG, value, NULL);
}

/* Wake all threads blocked in futex_wait on @address (INT_MAX = no limit on the number of waiters). */
static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
	return syscall(SYS_futex, address, FUTEX_WAKE | FUTEX_PRIVATE_FLAG, INT_MAX);
}
#elif defined(__EMSCRIPTEN__)
/* Emscripten equivalent: INFINITY timeout = wait indefinitely for *address to leave @value. */
static int futex_wait(pthreadpool_atomic_uint32_t* address, uint32_t value) {
	return emscripten_futex_wait((volatile void*) address, value, INFINITY);
}

/* Wake all threads blocked in futex_wait on @address. */
static int futex_wake_all(pthreadpool_atomic_uint32_t* address) {
	return emscripten_futex_wake((volatile void*) address, INT_MAX);
}
#else
#error "Platform-specific implementation of futex_wait and futex_wake_all required"
#endif
#endif
79 | |
/*
 * Called by each worker (and, during pool creation, only by the spawned
 * threads) to report that it has finished processing the current command.
 * The last thread to check in (active_threads reaches 0) wakes the master
 * thread blocked in wait_worker_threads.
 */
static void checkin_worker_thread(struct pthreadpool* threadpool) {
#if PTHREADPOOL_USE_FUTEX
	/*
	 * Relaxed decrement suffices here: the master synchronizes with workers
	 * through the release store to has_active_threads below, not through
	 * active_threads itself.
	 */
	if (pthreadpool_decrement_fetch_relaxed_size_t(&threadpool->active_threads) == 0) {
		/* Last thread: clear the futex word with release semantics, then wake the master. */
		pthreadpool_store_release_uint32_t(&threadpool->has_active_threads, 0);
		futex_wake_all(&threadpool->has_active_threads);
	}
#else
	/*
	 * The completion mutex must be held around the decrement so the master
	 * cannot miss the signal between loading active_threads and blocking in
	 * pthread_cond_wait.
	 */
	pthread_mutex_lock(&threadpool->completion_mutex);
	if (pthreadpool_decrement_fetch_release_size_t(&threadpool->active_threads) == 0) {
		pthread_cond_signal(&threadpool->completion_condvar);
	}
	pthread_mutex_unlock(&threadpool->completion_mutex);
#endif
}
94 | |
/*
 * Block the master thread until every worker has checked in for the current
 * command (active_threads reaches 0).  Uses a three-stage strategy: an
 * immediate check, a bounded spin-wait, and finally a futex/condvar sleep.
 * Acquire loads pair with the workers' release operations in
 * checkin_worker_thread.
 */
static void wait_worker_threads(struct pthreadpool* threadpool) {
	/* Initial check */
#if PTHREADPOOL_USE_FUTEX
	uint32_t has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
	if (has_active_threads == 0) {
		return;
	}
#else
	size_t active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
	if (active_threads == 0) {
		return;
	}
#endif

	/* Spin-wait: avoids the syscall/sleep cost when workers finish quickly */
	for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
		pthreadpool_yield();

#if PTHREADPOOL_USE_FUTEX
		has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads);
		if (has_active_threads == 0) {
			return;
		}
#else
		active_threads = pthreadpool_load_acquire_size_t(&threadpool->active_threads);
		if (active_threads == 0) {
			return;
		}
#endif
	}

	/* Fall-back to mutex/futex wait */
#if PTHREADPOOL_USE_FUTEX
	/*
	 * futex_wait returns immediately if has_active_threads is no longer 1;
	 * re-check after every (possibly spurious) wakeup.
	 */
	while ((has_active_threads = pthreadpool_load_acquire_uint32_t(&threadpool->has_active_threads)) != 0) {
		futex_wait(&threadpool->has_active_threads, 1);
	}
#else
	/* Standard condvar wait loop: the predicate is re-tested under the mutex */
	pthread_mutex_lock(&threadpool->completion_mutex);
	while (pthreadpool_load_acquire_size_t(&threadpool->active_threads) != 0) {
		pthread_cond_wait(&threadpool->completion_condvar, &threadpool->completion_mutex);
	};
	pthread_mutex_unlock(&threadpool->completion_mutex);
#endif
}
139 | |
/*
 * Block a worker thread until the master publishes a command word different
 * from @last_command, and return the new command word.
 *
 * The command word carries change-detection bits outside
 * THREADPOOL_COMMAND_MASK, so a full 32-bit comparison is what detects a new
 * command (see the command-update logic in pthreadpool_parallelize).
 *
 * If the previous command carried PTHREADPOOL_FLAG_YIELD_WORKERS (@last_flags),
 * the spin-wait phase is skipped and the thread sleeps immediately.
 */
static uint32_t wait_for_new_command(
	struct pthreadpool* threadpool,
	uint32_t last_command,
	uint32_t last_flags)
{
	uint32_t command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
	if (command != last_command) {
		return command;
	}

	if ((last_flags & PTHREADPOOL_FLAG_YIELD_WORKERS) == 0) {
		/* Spin-wait loop: cheap when a new command arrives promptly */
		for (uint32_t i = PTHREADPOOL_SPIN_WAIT_ITERATIONS; i != 0; i--) {
			pthreadpool_yield();

			command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
			if (command != last_command) {
				return command;
			}
		}
	}

	/* Spin-wait disabled or timed out, fall back to mutex/futex wait */
#if PTHREADPOOL_USE_FUTEX
	do {
		/* futex_wait returns immediately if the command already changed; re-check after every (possibly spurious) wakeup */
		futex_wait(&threadpool->command, last_command);
		command = pthreadpool_load_acquire_uint32_t(&threadpool->command);
	} while (command == last_command);
#else
	/* Lock the command mutex */
	pthread_mutex_lock(&threadpool->command_mutex);
	/* Read the command; re-test the predicate under the mutex after every wakeup */
	while ((command = pthreadpool_load_acquire_uint32_t(&threadpool->command)) == last_command) {
		/* Wait for new command */
		pthread_cond_wait(&threadpool->command_condvar, &threadpool->command_mutex);
	}
	/* Read a new command */
	pthread_mutex_unlock(&threadpool->command_mutex);
#endif
	return command;
}
181 | |
/*
 * Entry point of every system-created worker thread (workers #1..N-1;
 * the caller thread serves as worker #0 and never runs this function).
 *
 * The thread checks in once after startup (to release the master blocked in
 * pthreadpool_create), then loops: wait for a new command word, execute it,
 * check in, and remember the command word so the next change is detectable.
 *
 * @arg: pointer to this thread's struct thread_info (owned by the pool).
 * Returns NULL when the shutdown command is received.
 */
static void* thread_main(void* arg) {
	struct thread_info* thread = (struct thread_info*) arg;
	struct pthreadpool* threadpool = thread->threadpool;
	uint32_t last_command = threadpool_command_init;
	struct fpu_state saved_fpu_state = { 0 };
	uint32_t flags = 0;

	/* Check in */
	checkin_worker_thread(threadpool);

	/* Monitor new commands and act accordingly */
	for (;;) {
		uint32_t command = wait_for_new_command(threadpool, last_command, flags);
		/* Pair with the master's release store: make the command parameters visible to this thread */
		pthreadpool_fence_acquire();

		flags = pthreadpool_load_relaxed_uint32_t(&threadpool->flags);

		/* Process command */
		switch (command & THREADPOOL_COMMAND_MASK) {
			case threadpool_command_parallelize:
			{
				const thread_function_t thread_function =
					(thread_function_t) pthreadpool_load_relaxed_void_p(&threadpool->thread_function);
				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
					/* Save FPU state so it can be restored after the task runs */
					saved_fpu_state = get_fpu_state();
					disable_fpu_denormals();
				}

				thread_function(threadpool, thread);
				if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
					set_fpu_state(saved_fpu_state);
				}
				break;
			}
			case threadpool_command_shutdown:
				/* Exit immediately: the master thread is waiting on pthread_join */
				return NULL;
			case threadpool_command_init:
				/* To inhibit compiler warning */
				break;
		}
		/* Notify the master thread that we finished processing */
		checkin_worker_thread(threadpool);
		/* Update last command */
		last_command = command;
	};
}
229 | |
230 | struct pthreadpool* pthreadpool_create(size_t threads_count) { |
231 | #if PTHREADPOOL_USE_CPUINFO |
232 | if (!cpuinfo_initialize()) { |
233 | return NULL; |
234 | } |
235 | #endif |
236 | |
237 | if (threads_count == 0) { |
238 | #if PTHREADPOOL_USE_CPUINFO |
239 | threads_count = cpuinfo_get_processors_count(); |
240 | #elif defined(_SC_NPROCESSORS_ONLN) |
241 | threads_count = (size_t) sysconf(_SC_NPROCESSORS_ONLN); |
242 | #if defined(__EMSCRIPTEN_PTHREADS__) |
243 | /* Limit the number of threads to 8 to match link-time PTHREAD_POOL_SIZE option */ |
244 | if (threads_count >= 8) { |
245 | threads_count = 8; |
246 | } |
247 | #endif |
248 | #elif defined(_WIN32) |
249 | SYSTEM_INFO system_info; |
250 | ZeroMemory(&system_info, sizeof(system_info)); |
251 | GetSystemInfo(&system_info); |
252 | threads_count = (size_t) system_info.dwNumberOfProcessors; |
253 | #else |
254 | #error "Platform-specific implementation of sysconf(_SC_NPROCESSORS_ONLN) required" |
255 | #endif |
256 | } |
257 | |
258 | struct pthreadpool* threadpool = pthreadpool_allocate(threads_count); |
259 | if (threadpool == NULL) { |
260 | return NULL; |
261 | } |
262 | threadpool->threads_count = fxdiv_init_size_t(threads_count); |
263 | for (size_t tid = 0; tid < threads_count; tid++) { |
264 | threadpool->threads[tid].thread_number = tid; |
265 | threadpool->threads[tid].threadpool = threadpool; |
266 | } |
267 | |
268 | /* Thread pool with a single thread computes everything on the caller thread. */ |
269 | if (threads_count > 1) { |
270 | pthread_mutex_init(&threadpool->execution_mutex, NULL); |
271 | #if !PTHREADPOOL_USE_FUTEX |
272 | pthread_mutex_init(&threadpool->completion_mutex, NULL); |
273 | pthread_cond_init(&threadpool->completion_condvar, NULL); |
274 | pthread_mutex_init(&threadpool->command_mutex, NULL); |
275 | pthread_cond_init(&threadpool->command_condvar, NULL); |
276 | #endif |
277 | |
278 | #if PTHREADPOOL_USE_FUTEX |
279 | pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1); |
280 | #endif |
281 | pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */); |
282 | |
283 | /* Caller thread serves as worker #0. Thus, we create system threads starting with worker #1. */ |
284 | for (size_t tid = 1; tid < threads_count; tid++) { |
285 | pthread_create(&threadpool->threads[tid].thread_object, NULL, &thread_main, &threadpool->threads[tid]); |
286 | } |
287 | |
288 | /* Wait until all threads initialize */ |
289 | wait_worker_threads(threadpool); |
290 | } |
291 | return threadpool; |
292 | } |
293 | |
/*
 * Execute @thread_function on every thread of the pool (the caller serves as
 * worker #0), splitting @linear_range work items contiguously and near-evenly
 * across threads, and block until all threads complete.
 *
 * @params/@params_size: optional per-call parameter block copied into
 *     threadpool->params before the command is published (0 = none).
 * @task/@context: opaque pointers published for @thread_function to read.
 * @linear_range: total number of work items; must be > 1.
 * @flags: PTHREADPOOL_FLAG_* bits (denormals control, worker yielding).
 *
 * Concurrent parallelize calls are serialized by execution_mutex.
 */
PTHREADPOOL_INTERNAL void pthreadpool_parallelize(
	struct pthreadpool* threadpool,
	thread_function_t thread_function,
	const void* params,
	size_t params_size,
	void* task,
	void* context,
	size_t linear_range,
	uint32_t flags)
{
	assert(threadpool != NULL);
	assert(thread_function != NULL);
	assert(task != NULL);
	assert(linear_range > 1);

	/* Protect the global threadpool structures */
	pthread_mutex_lock(&threadpool->execution_mutex);

#if !PTHREADPOOL_USE_FUTEX
	/* Lock the command variables to ensure that threads don't start processing before they observe complete command with all arguments */
	pthread_mutex_lock(&threadpool->command_mutex);
#endif

	/* Setup global arguments */
	pthreadpool_store_relaxed_void_p(&threadpool->thread_function, (void*) thread_function);
	pthreadpool_store_relaxed_void_p(&threadpool->task, task);
	pthreadpool_store_relaxed_void_p(&threadpool->argument, context);
	pthreadpool_store_relaxed_uint32_t(&threadpool->flags, flags);

	/* Locking of completion_mutex not needed: readers are sleeping on command_condvar */
	const struct fxdiv_divisor_size_t threads_count = threadpool->threads_count;
	pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count.value - 1 /* caller thread */);
#if PTHREADPOOL_USE_FUTEX
	pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);
#endif

	if (params_size != 0) {
		memcpy(&threadpool->params, params, params_size);
		/* Ensure the params copy is ordered before the release store to the command word below */
		pthreadpool_fence_release();
	}

	/* Spread the work between threads */
	const struct fxdiv_result_size_t range_params = fxdiv_divide_size_t(linear_range, threads_count);
	size_t range_start = 0;
	for (size_t tid = 0; tid < threads_count.value; tid++) {
		struct thread_info* thread = &threadpool->threads[tid];
		/* The first (linear_range % threads_count) threads each get one extra item */
		const size_t range_length = range_params.quotient + (size_t) (tid < range_params.remainder);
		const size_t range_end = range_start + range_length;
		pthreadpool_store_relaxed_size_t(&thread->range_start, range_start);
		pthreadpool_store_relaxed_size_t(&thread->range_end, range_end);
		pthreadpool_store_relaxed_size_t(&thread->range_length, range_length);

		/* The next subrange starts where the previous ended */
		range_start = range_end;
	}

	/*
	 * Update the threadpool command.
	 * Importantly, do it after initializing command parameters (range, task, argument, flags).
	 * ~(threadpool->command | THREADPOOL_COMMAND_MASK) flips the bits not in the command mask
	 * to ensure the unmasked command is different than the last command, because worker threads
	 * monitor for change in the unmasked command.
	 */
	const uint32_t old_command = pthreadpool_load_relaxed_uint32_t(&threadpool->command);
	const uint32_t new_command = ~(old_command | THREADPOOL_COMMAND_MASK) | threadpool_command_parallelize;

	/*
	 * Store the command with release semantics to guarantee that if a worker thread observes
	 * the new command value, it also observes the updated command parameters.
	 *
	 * Note: release semantics is necessary even with a conditional variable, because the workers might
	 * be waiting in a spin-loop rather than the conditional variable.
	 */
	pthreadpool_store_release_uint32_t(&threadpool->command, new_command);
#if PTHREADPOOL_USE_FUTEX
	/* Wake up the threads */
	futex_wake_all(&threadpool->command);
#else
	/* Unlock the command variables before waking up the threads for better performance */
	pthread_mutex_unlock(&threadpool->command_mutex);

	/* Wake up the threads */
	pthread_cond_broadcast(&threadpool->command_condvar);
#endif

	/* Save and modify FPU denormals control, if needed */
	struct fpu_state saved_fpu_state = { 0 };
	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
		saved_fpu_state = get_fpu_state();
		disable_fpu_denormals();
	}

	/* Do computations as worker #0 */
	thread_function(threadpool, &threadpool->threads[0]);

	/* Restore FPU denormals control, if needed */
	if (flags & PTHREADPOOL_FLAG_DISABLE_DENORMALS) {
		set_fpu_state(saved_fpu_state);
	}

	/* Wait until the threads finish computation */
	wait_worker_threads(threadpool);

	/* Make changes by other threads visible to this thread */
	pthreadpool_fence_acquire();

	/* Unprotect the global threadpool structures */
	pthread_mutex_unlock(&threadpool->execution_mutex);
}
403 | |
/*
 * Shut down and deallocate the pool. A NULL @threadpool is accepted and
 * ignored.
 *
 * For pools with more than one thread: publishes the shutdown command with
 * the same release/wake protocol as pthreadpool_parallelize, joins every
 * system thread, destroys the synchronization primitives, and finally
 * releases the pool memory.
 */
void pthreadpool_destroy(struct pthreadpool* threadpool) {
	if (threadpool != NULL) {
		const size_t threads_count = threadpool->threads_count.value;
		if (threads_count > 1) {
#if PTHREADPOOL_USE_FUTEX
			/* Re-arm the completion counters so workers can check in once more after shutdown */
			pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);
			pthreadpool_store_relaxed_uint32_t(&threadpool->has_active_threads, 1);

			/*
			 * Store the command with release semantics to guarantee that if a worker thread observes
			 * the new command value, it also observes the updated active_threads/has_active_threads values.
			 */
			pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);

			/* Wake up worker threads */
			futex_wake_all(&threadpool->command);
#else
			/* Lock the command variable to ensure that threads don't shutdown until both command and active_threads are updated */
			pthread_mutex_lock(&threadpool->command_mutex);

			pthreadpool_store_relaxed_size_t(&threadpool->active_threads, threads_count - 1 /* caller thread */);

			/*
			 * Store the command with release semantics to guarantee that if a worker thread observes
			 * the new command value, it also observes the updated active_threads value.
			 *
			 * Note: the release fence inside pthread_mutex_unlock is insufficient,
			 * because the workers might be waiting in a spin-loop rather than the conditional variable.
			 */
			pthreadpool_store_release_uint32_t(&threadpool->command, threadpool_command_shutdown);

			/* Wake up worker threads */
			pthread_cond_broadcast(&threadpool->command_condvar);

			/* Commit the state changes and let workers start processing */
			pthread_mutex_unlock(&threadpool->command_mutex);
#endif

			/* Wait until all threads return */
			for (size_t thread = 1; thread < threads_count; thread++) {
				pthread_join(threadpool->threads[thread].thread_object, NULL);
			}

			/* Release resources */
			pthread_mutex_destroy(&threadpool->execution_mutex);
#if !PTHREADPOOL_USE_FUTEX
			pthread_mutex_destroy(&threadpool->completion_mutex);
			pthread_cond_destroy(&threadpool->completion_condvar);
			pthread_mutex_destroy(&threadpool->command_mutex);
			pthread_cond_destroy(&threadpool->command_condvar);
#endif
		}
#if PTHREADPOOL_USE_CPUINFO
		cpuinfo_deinitialize();
#endif
		pthreadpool_deallocate(threadpool);
	}
}
462 | |