1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "internal_config.h"
10 #include <sys/syscall.h>
14 #include <linux/futex.h>
18 #include "xbt/parmap.h"
20 #include "xbt/function_types.h"
21 #include "xbt/dynar.h"
22 #include "xbt/xbt_os_thread.h"
23 #include "xbt/sysdep.h"
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
26 XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing");
31 } e_xbt_parmap_flag_t;
33 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
34 static void *xbt_parmap_worker_main(void *parmap);
35 static void xbt_parmap_work(xbt_parmap_t parmap);
37 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
38 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
39 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
40 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
43 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
44 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
46 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
47 static void futex_wait(unsigned *uaddr, unsigned val);
48 static void futex_wake(unsigned *uaddr, unsigned val);
51 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
52 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
53 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
54 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
57 * \brief Parallel map structure
59 typedef struct s_xbt_parmap {
60 e_xbt_parmap_flag_t status; /**< is the parmap active or being destroyed? */
61 unsigned work; /**< index of the current round */
62 unsigned thread_counter; /**< number of workers that have done the work */
64 unsigned int num_workers; /**< total number of worker threads including the controller */
65 void_f_pvoid_t fun; /**< function to run in parallel on each element of data */
66 xbt_dynar_t data; /**< parameters to pass to fun in parallel */
67 unsigned int index; /**< index of the next element of data to pick */
70 xbt_os_cond_t ready_cond;
71 xbt_os_mutex_t ready_mutex;
72 xbt_os_cond_t done_cond;
73 xbt_os_mutex_t done_mutex;
75 /* fields that depend on the synchronization mode */
76 e_xbt_parmap_mode_t mode; /**< synchronization mode */
77 void (*master_wait_f)(xbt_parmap_t); /**< wait for the workers to have done the work */
78 void (*worker_signal_f)(xbt_parmap_t); /**< signal the master that a worker has done the work */
79 void (*master_signal_f)(xbt_parmap_t); /**< wakes the workers threads to process tasks */
80 void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
84 * \brief Creates a parallel map object
85 * \param num_workers number of worker threads to create
86 * \param mode how to synchronize the worker threads
87 * \return the parmap created
89 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
92 xbt_os_thread_t worker = NULL;
94 XBT_DEBUG("Create new parmap (%u workers)", num_workers);
96 /* Initialize the thread pool data structure */
97 xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
99 parmap->num_workers = num_workers;
100 parmap->status = XBT_PARMAP_WORK;
101 xbt_parmap_set_mode(parmap, mode);
103 /* Create the pool of worker threads */
104 for (i = 1; i < num_workers; i++) {
105 worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
106 xbt_os_thread_detach(worker);
112 * \brief Destroys a parmap
113 * \param parmap the parmap to destroy
115 void xbt_parmap_destroy(xbt_parmap_t parmap)
121 parmap->status = XBT_PARMAP_DESTROY;
122 parmap->master_signal_f(parmap);
123 parmap->master_wait_f(parmap);
125 xbt_os_cond_destroy(parmap->ready_cond);
126 xbt_os_mutex_destroy(parmap->ready_mutex);
127 xbt_os_cond_destroy(parmap->done_cond);
128 xbt_os_mutex_destroy(parmap->done_mutex);
134 * \brief Sets the synchronization mode of a parmap.
135 * \param parmap a parallel map object
136 * \param mode the synchronization mode
138 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
140 if (mode == XBT_PARMAP_DEFAULT) {
142 mode = XBT_PARMAP_FUTEX;
144 mode = XBT_PARMAP_POSIX;
151 case XBT_PARMAP_POSIX:
152 parmap->master_wait_f = xbt_parmap_posix_master_wait;
153 parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
154 parmap->master_signal_f = xbt_parmap_posix_master_signal;
155 parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
157 parmap->ready_cond = xbt_os_cond_init();
158 parmap->ready_mutex = xbt_os_mutex_init();
159 parmap->done_cond = xbt_os_cond_init();
160 parmap->done_mutex = xbt_os_mutex_init();
164 case XBT_PARMAP_FUTEX:
166 parmap->master_wait_f = xbt_parmap_futex_master_wait;
167 parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
168 parmap->master_signal_f = xbt_parmap_futex_master_signal;
169 parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
171 xbt_os_cond_destroy(parmap->ready_cond);
172 xbt_os_mutex_destroy(parmap->ready_mutex);
173 xbt_os_cond_destroy(parmap->done_cond);
174 xbt_os_mutex_destroy(parmap->done_mutex);
177 xbt_die("Futex is not available on this OS.");
180 case XBT_PARMAP_BUSY_WAIT:
181 parmap->master_wait_f = xbt_parmap_busy_master_wait;
182 parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
183 parmap->master_signal_f = xbt_parmap_busy_master_signal;
184 parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
186 xbt_os_cond_destroy(parmap->ready_cond);
187 xbt_os_mutex_destroy(parmap->ready_mutex);
188 xbt_os_cond_destroy(parmap->done_cond);
189 xbt_os_mutex_destroy(parmap->done_mutex);
192 case XBT_PARMAP_DEFAULT:
199 * \brief Applies a list of tasks in parallel.
200 * \param parmap a parallel map object
201 * \param fun the function to call in parallel
202 * \param data each element of this dynar will be passed as an argument to fun
204 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
206 /* Assign resources to worker threads */
210 parmap->master_signal_f(parmap);
211 xbt_parmap_work(parmap);
212 parmap->master_wait_f(parmap);
213 XBT_DEBUG("Job done");
217 * \brief Returns a next task to process.
219 * Worker threads call this function to get more work.
221 * \return the next task to process, or NULL if there is no more work
223 void* xbt_parmap_next(xbt_parmap_t parmap)
225 unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
226 if (index < xbt_dynar_length(parmap->data)) {
227 return xbt_dynar_get_as(parmap->data, index, void*);
232 static void xbt_parmap_work(xbt_parmap_t parmap)
235 while ((index = __sync_fetch_and_add(&parmap->index, 1))
236 < xbt_dynar_length(parmap->data))
237 parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
241 * \brief Main function of a worker thread.
242 * \param arg the parmap
244 static void *xbt_parmap_worker_main(void *arg)
246 xbt_parmap_t parmap = (xbt_parmap_t) arg;
249 XBT_DEBUG("New worker thread created");
251 /* Worker's main loop */
253 parmap->worker_wait_f(parmap, ++round);
254 if (parmap->status == XBT_PARMAP_WORK) {
256 XBT_DEBUG("Worker got a job");
258 xbt_parmap_work(parmap);
259 parmap->worker_signal_f(parmap);
261 XBT_DEBUG("Worker has finished");
263 /* We are destroying the parmap */
265 parmap->worker_signal_f(parmap);
272 static void futex_wait(unsigned *uaddr, unsigned val)
274 XBT_VERB("Waiting on futex %p", uaddr);
275 syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
278 static void futex_wake(unsigned *uaddr, unsigned val)
280 XBT_VERB("Waking futex %p", uaddr);
281 syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
286 * \brief Starts the parmap: waits for all workers to be ready and returns.
288 * This function is called by the controller thread.
290 * \param parmap a parmap
292 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
294 xbt_os_mutex_acquire(parmap->done_mutex);
295 if (parmap->thread_counter < parmap->num_workers) {
296 /* wait for all workers to be ready */
297 xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
299 xbt_os_mutex_release(parmap->done_mutex);
303 * \brief Ends the parmap: wakes the controller thread when all workers terminate.
305 * This function is called by all worker threads when they end (not including
308 * \param parmap a parmap
310 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
312 xbt_os_mutex_acquire(parmap->done_mutex);
313 if (++parmap->thread_counter == parmap->num_workers) {
314 /* all workers have finished, wake the controller */
315 xbt_os_cond_signal(parmap->done_cond);
317 xbt_os_mutex_release(parmap->done_mutex);
321 * \brief Wakes all workers and waits for them to finish the tasks.
323 * This function is called by the controller thread.
325 * \param parmap a parmap
327 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
329 xbt_os_mutex_acquire(parmap->ready_mutex);
330 parmap->thread_counter = 1;
332 /* wake all workers */
333 xbt_os_cond_broadcast(parmap->ready_cond);
334 xbt_os_mutex_release(parmap->ready_mutex);
338 * \brief Waits for some work to process.
340 * This function is called by each worker thread (not including the controller)
341 * when it has no more work to do.
343 * \param parmap a parmap
344 * \param round the expected round number
346 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
348 xbt_os_mutex_acquire(parmap->ready_mutex);
349 /* wait for more work */
350 if (parmap->work != round) {
351 xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
353 xbt_os_mutex_release(parmap->ready_mutex);
358 * \brief Starts the parmap: waits for all workers to be ready and returns.
360 * This function is called by the controller thread.
362 * \param parmap a parmap
364 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
366 unsigned count = parmap->thread_counter;
367 while (count < parmap->num_workers) {
368 /* wait for all workers to be ready */
369 futex_wait(&parmap->thread_counter, count);
370 count = parmap->thread_counter;
375 * \brief Ends the parmap: wakes the controller thread when all workers terminate.
377 * This function is called by all worker threads when they end (not including
380 * \param parmap a parmap
382 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
384 unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
385 if (count == parmap->num_workers) {
386 /* all workers have finished, wake the controller */
387 futex_wake(&parmap->thread_counter, INT_MAX);
392 * \brief Wakes all workers and waits for them to finish the tasks.
394 * This function is called by the controller thread.
396 * \param parmap a parmap
398 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
400 parmap->thread_counter = 1;
401 __sync_add_and_fetch(&parmap->work, 1);
402 /* wake all workers */
403 futex_wake(&parmap->work, INT_MAX);
407 * \brief Waits for some work to process.
409 * This function is called by each worker thread (not including the controller)
410 * when it has no more work to do.
412 * \param parmap a parmap
413 * \param round the expected round number
415 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
417 unsigned work = parmap->work;
418 /* wait for more work */
419 while (work != round) {
420 futex_wait(&parmap->work, work);
427 * \brief Starts the parmap: waits for all workers to be ready and returns.
429 * This function is called by the controller thread.
431 * \param parmap a parmap
433 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
435 while (parmap->thread_counter < parmap->num_workers) {
436 xbt_os_thread_yield();
441 * \brief Ends the parmap: wakes the controller thread when all workers terminate.
443 * This function is called by all worker threads when they end.
445 * \param parmap a parmap
447 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
449 __sync_add_and_fetch(&parmap->thread_counter, 1);
453 * \brief Wakes all workers and waits for them to finish the tasks.
455 * This function is called by the controller thread.
457 * \param parmap a parmap
459 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
461 parmap->thread_counter = 1;
462 __sync_add_and_fetch(&parmap->work, 1);
466 * \brief Waits for some work to process.
468 * This function is called by each worker thread (not including the controller)
469 * when it has no more work to do.
471 * \param parmap a parmap
472 * \param round the expected round number
474 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
476 /* wait for more work */
477 while (parmap->work != round) {
478 xbt_os_thread_yield();
485 #include "xbt/xbt_os_thread.h"
486 #include "xbt/xbt_os_time.h"
487 #include "internal_config.h" /* HAVE_FUTEX_H */
489 XBT_TEST_SUITE("parmap", "Parallel Map");
490 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit);
493 #define TEST_PARMAP_SKIP_TEST(mode) 0
495 #define TEST_PARMAP_SKIP_TEST(mode) ((mode) == XBT_PARMAP_FUTEX)
498 #define TEST_PARMAP_VALIDATE_MODE(mode) \
499 if (TEST_PARMAP_SKIP_TEST(mode)) { xbt_test_skip(); return; } else ((void)0)
501 static void fun_double(void *arg)
507 /* Check that the computations are correctly done. */
508 static void test_parmap_basic(e_xbt_parmap_mode_t mode)
510 unsigned num_workers;
512 for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
513 const unsigned len = 1033;
514 const unsigned num = 5;
520 xbt_test_add("Basic parmap usage (%u workers)", num_workers);
522 TEST_PARMAP_VALIDATE_MODE(mode);
523 parmap = xbt_parmap_new(num_workers, mode);
525 a = xbt_malloc(len * sizeof *a);
526 data = xbt_dynar_new(sizeof a, NULL);
527 for (i = 0; i < len; i++) {
529 xbt_dynar_push_as(data, void *, &a[i]);
532 for (i = 0; i < num; i++)
533 xbt_parmap_apply(parmap, fun_double, data);
535 for (i = 0; i < len; i++) {
536 unsigned expected = (1U << num) * (i + 1) - 1;
537 xbt_test_assert(a[i] == expected,
538 "a[%u]: expected %u, got %u", i, expected, a[i]);
541 xbt_dynar_free(&data);
543 xbt_parmap_destroy(parmap);
547 XBT_TEST_UNIT("basic_posix", test_parmap_basic_posix, "Basic usage: posix")
549 test_parmap_basic(XBT_PARMAP_POSIX);
552 XBT_TEST_UNIT("basic_futex", test_parmap_basic_futex, "Basic usage: futex")
554 test_parmap_basic(XBT_PARMAP_FUTEX);
557 XBT_TEST_UNIT("basic_busy_wait", test_parmap_basic_busy_wait, "Basic usage: busy_wait")
559 test_parmap_basic(XBT_PARMAP_BUSY_WAIT);
562 static void fun_get_id(void *arg)
564 *(uintptr_t *)arg = (uintptr_t)xbt_os_thread_self();
568 static int fun_compare(const void *pa, const void *pb)
570 uintptr_t a = *(uintptr_t *)pa;
571 uintptr_t b = *(uintptr_t *)pb;
572 return a < b ? -1 : a > b ? 1 : 0;
575 /* Check that all threads are working. */
576 static void test_parmap_extended(e_xbt_parmap_mode_t mode)
578 unsigned num_workers;
580 for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
581 const unsigned len = 2 * num_workers;
588 xbt_test_add("Extended parmap usage (%u workers)", num_workers);
590 TEST_PARMAP_VALIDATE_MODE(mode);
591 parmap = xbt_parmap_new(num_workers, mode);
593 a = xbt_malloc(len * sizeof *a);
594 data = xbt_dynar_new(sizeof a, NULL);
595 for (i = 0; i < len; i++)
596 xbt_dynar_push_as(data, void *, &a[i]);
598 xbt_parmap_apply(parmap, fun_get_id, data);
600 qsort(a, len, sizeof a[0], fun_compare);
602 for (i = 1; i < len; i++)
603 if (a[i] != a[i - 1])
605 xbt_test_assert(count == num_workers,
606 "only %u/%u threads did some work", count, num_workers);
608 xbt_dynar_free(&data);
610 xbt_parmap_destroy(parmap);
614 XBT_TEST_UNIT("extended_posix", test_parmap_extended_posix, "Extended usage: posix")
616 test_parmap_extended(XBT_PARMAP_POSIX);
619 XBT_TEST_UNIT("extended_futex", test_parmap_extended_futex, "Extended usage: futex")
621 test_parmap_extended(XBT_PARMAP_FUTEX);
624 XBT_TEST_UNIT("extended_busy_wait", test_parmap_extended_busy_wait, "Extended usage: busy_wait")
626 test_parmap_extended(XBT_PARMAP_BUSY_WAIT);
629 #endif /* SIMGRID_TEST */