1 /* Copyright (c) 2004-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "simgrid/Exception.hpp"
9 #include "simgrid/s4u/Actor.hpp"
10 #include "simgrid/s4u/Comm.hpp"
11 #include "simgrid/s4u/Exec.hpp"
12 #include "simgrid/s4u/Mailbox.hpp"
13 #include "src/instr/instr_private.hpp"
14 #include "src/kernel/activity/ExecImpl.hpp"
15 #include "src/msg/msg_private.hpp"
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)");
20 * @brief Executes a parallel task and waits for its termination.
22 * @param task a #msg_task_t to execute on the location on which the process is running.
24 * @return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED or #MSG_HOST_FAILURE otherwise
26 msg_error_t MSG_parallel_task_execute(msg_task_t task)
28 return MSG_parallel_task_execute_with_timeout(task, -1);
31 msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout)
33 msg_error_t status = MSG_OK;
35 xbt_assert((not task->compute) && not task->is_used(), "This task is executed somewhere else. Go fix your code!");
37 XBT_DEBUG("Computing on %s", MSG_process_get_name(MSG_process_self()));
39 if (TRACE_actor_is_enabled())
40 simgrid::instr::Container::by_name(instr_pid(MSG_process_self()))->get_state("ACTOR_STATE")->push_event("execute");
44 simgrid::s4u::ExecPtr e =
45 simgrid::s4u::this_actor::exec_init(task->hosts_, task->flops_parallel_amount, task->bytes_parallel_amount)
46 ->set_name(task->get_name())
47 ->set_tracing_category(task->get_tracing_category())
48 ->set_timeout(timeout)
50 task->compute = boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(e->get_impl());
52 XBT_DEBUG("Parallel execution action created: %p", task->compute.get());
58 XBT_DEBUG("Execution task '%s' finished", task->get_cname());
59 } catch (simgrid::HostFailureException& e) {
60 status = MSG_HOST_FAILURE;
61 } catch (simgrid::TimeoutError& e) {
63 } catch (simgrid::CancelException& e) {
64 status = MSG_TASK_CANCELED;
67 /* action ended, set comm and compute = nullptr, the actions is already destroyed in the main function */
68 task->flops_amount = 0.0;
70 task->compute = nullptr;
72 if (TRACE_actor_is_enabled())
73 simgrid::instr::Container::by_name(instr_pid(MSG_process_self()))->get_state("ACTOR_STATE")->pop_event();
79 * @brief Receives a task from a mailbox.
81 * This is a blocking function, the execution flow will be blocked until the task is received. See #MSG_task_irecv
82 * for receiving tasks asynchronously.
84 * @param task a memory location for storing a #msg_task_t.
85 * @param alias name of the mailbox to receive the task from
88 * #MSG_OK if the task was successfully received,
89 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
91 msg_error_t MSG_task_receive(msg_task_t * task, const char *alias)
93 return MSG_task_receive_with_timeout(task, alias, -1);
97 * @brief Receives a task from a mailbox at a given rate.
99 * @param task a memory location for storing a #msg_task_t.
100 * @param alias name of the mailbox to receive the task from
101 * @param rate limit the reception to rate bandwidth (byte/sec)
103 * The rate parameter can be used to receive a task with a limited bandwidth (smaller than the physical available
104 * value). Use MSG_task_receive() if you don't limit the rate (or pass -1 as a rate value do disable this feature).
107 * #MSG_OK if the task was successfully received,
108 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
110 msg_error_t MSG_task_receive_bounded(msg_task_t * task, const char *alias, double rate)
112 return MSG_task_receive_with_timeout_bounded(task, alias, -1, rate);
116 * @brief Receives a task from a mailbox with a given timeout.
118 * This is a blocking function with a timeout, the execution flow will be blocked until the task is received or the
119 * timeout is achieved. See #MSG_task_irecv for receiving tasks asynchronously. You can provide a -1 timeout
120 * to obtain an infinite timeout.
122 * @param task a memory location for storing a #msg_task_t.
123 * @param alias name of the mailbox to receive the task from
124 * @param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_receive)
127 * #MSG_OK if the task was successfully received,
128 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
130 msg_error_t MSG_task_receive_with_timeout(msg_task_t * task, const char *alias, double timeout)
132 return MSG_task_receive_ext(task, alias, timeout, nullptr);
136 * @brief Receives a task from a mailbox with a given timeout and at a given rate.
138 * @param task a memory location for storing a #msg_task_t.
139 * @param alias name of the mailbox to receive the task from
140 * @param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_receive)
141 * @param rate limit the reception to rate bandwidth (byte/sec)
143 * The rate parameter can be used to send a task with a limited
144 * bandwidth (smaller than the physical available value). Use
145 * MSG_task_receive() if you don't limit the rate (or pass -1 as a
146 * rate value do disable this feature).
149 * #MSG_OK if the task was successfully received,
150 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
152 msg_error_t MSG_task_receive_with_timeout_bounded(msg_task_t* task, const char* alias, double timeout, double rate)
154 return MSG_task_receive_ext_bounded(task, alias, timeout, nullptr, rate);
158 * @brief Receives a task from a mailbox from a specific host with a given timeout.
160 * This is a blocking function with a timeout, the execution flow will be blocked until the task is received or the
161 * timeout is achieved. See #MSG_task_irecv for receiving tasks asynchronously. You can provide a -1 timeout
162 * to obtain an infinite timeout.
164 * @param task a memory location for storing a #msg_task_t.
165 * @param alias name of the mailbox to receive the task from
166 * @param timeout is the maximum wait time for completion (provide -1 for no timeout)
167 * @param host a #msg_host_t host from where the task was sent
170 * #MSG_OK if the task was successfully received,
171 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
173 msg_error_t MSG_task_receive_ext(msg_task_t * task, const char *alias, double timeout, msg_host_t host)
175 XBT_DEBUG("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", alias);
176 return MSG_task_receive_ext_bounded(task, alias, timeout, host, -1.0);
180 * @brief Receives a task from a mailbox from a specific host with a given timeout and at a given rate.
182 * @param task a memory location for storing a #msg_task_t.
183 * @param alias name of the mailbox to receive the task from
184 * @param timeout is the maximum wait time for completion (provide -1 for no timeout)
185 * @param host a #msg_host_t host from where the task was sent
186 * @param rate limit the reception to rate bandwidth (byte/sec)
188 * The rate parameter can be used to receive a task with a limited bandwidth (smaller than the physical available
189 * value). Use MSG_task_receive_ext() if you don't limit the rate (or pass -1 as a rate value do disable this feature).
192 * #MSG_OK if the task was successfully received,
193 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
195 msg_error_t MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, double timeout, msg_host_t host,
198 XBT_DEBUG("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", alias);
199 msg_error_t ret = MSG_OK;
200 /* We no longer support getting a task from a specific host */
205 xbt_assert(task, "Null pointer for the task storage");
208 XBT_WARN("Asked to write the received task in a non empty struct -- proceeding.");
210 /* Try to receive it by calling SIMIX network layer */
213 simgrid::s4u::Mailbox::by_name(alias)
215 ->set_dst_data(&payload, sizeof(msg_task_t*))
218 *task = static_cast<msg_task_t>(payload);
219 XBT_DEBUG("Got task %s from %s", (*task)->get_cname(), alias);
220 (*task)->set_not_used();
221 } catch (simgrid::HostFailureException& e) {
222 ret = MSG_HOST_FAILURE;
223 } catch (simgrid::TimeoutError& e) {
225 } catch (simgrid::CancelException& e) {
226 ret = MSG_TASK_CANCELED;
227 } catch (xbt_ex& e) {
228 if (e.category == network_error)
229 ret = MSG_TRANSFER_FAILURE;
234 if (TRACE_actor_is_enabled() && ret != MSG_HOST_FAILURE && ret != MSG_TRANSFER_FAILURE && ret != MSG_TIMEOUT) {
235 container_t process_container = simgrid::instr::Container::by_name(instr_pid(MSG_process_self()));
237 std::string key = std::string("p") + std::to_string((*task)->get_id());
238 simgrid::instr::Container::get_root()->get_link("ACTOR_TASK_LINK")->end_event(process_container, "SR", key);
245 * @brief Starts listening for receiving a task from an asynchronous communication.
247 * This is a non blocking function: use MSG_comm_wait() or MSG_comm_test() to end the communication.
249 * @param task a memory location for storing a #msg_task_t. has to be valid until the end of the communication.
250 * @param name of the mailbox to receive the task on
251 * @return the msg_comm_t communication created
253 msg_comm_t MSG_task_irecv(msg_task_t *task, const char *name)
255 return MSG_task_irecv_bounded(task, name, -1.0);
259 * @brief Starts listening for receiving a task from an asynchronous communication at a given rate.
261 * The rate parameter can be used to receive a task with a limited
262 * bandwidth (smaller than the physical available value). Use
263 * MSG_task_irecv() if you don't limit the rate (or pass -1 as a rate
264 * value do disable this feature).
266 * @param task a memory location for storing a #msg_task_t. has to be valid until the end of the communication.
267 * @param name of the mailbox to receive the task on
268 * @param rate limit the bandwidth to the given rate (byte/sec)
269 * @return the msg_comm_t communication created
271 msg_comm_t MSG_task_irecv_bounded(msg_task_t *task, const char *name, double rate)
273 simgrid::s4u::MailboxPtr mbox = simgrid::s4u::Mailbox::by_name(name);
275 /* FIXME: these functions are not traceable */
277 xbt_assert(task, "Null pointer for the task storage");
280 XBT_CRITICAL("MSG_task_irecv() was asked to write in a non empty task struct.");
282 /* Try to receive it by calling SIMIX network layer */
283 simgrid::s4u::CommPtr comm = simgrid::s4u::Mailbox::by_name(name)
285 ->set_dst_data((void**)task, sizeof(msg_task_t*))
289 return new simgrid::msg::Comm(nullptr, task, comm);
293 * @brief Checks whether a communication is done, and if yes, finalizes it.
294 * @param comm the communication to test
295 * @return 'true' if the communication is finished
296 * (but it may have failed, use MSG_comm_get_status() to know its status)
297 * or 'false' if the communication is not finished yet
298 * If the status is 'false', don't forget to use MSG_process_sleep() after the test.
300 int MSG_comm_test(msg_comm_t comm)
302 bool finished = false;
305 finished = comm->s_comm->test();
306 if (finished && comm->task_received != nullptr) {
307 /* I am the receiver */
308 (*comm->task_received)->set_not_used();
310 } catch (simgrid::TimeoutError& e) {
311 comm->status = MSG_TIMEOUT;
313 } catch (simgrid::CancelException& e) {
314 comm->status = MSG_TASK_CANCELED;
318 if (e.category == network_error) {
319 comm->status = MSG_TRANSFER_FAILURE;
330 * @brief This function checks if a communication is finished.
331 * @param comms a vector of communications
332 * @return the position of the finished communication if any
333 * (but it may have failed, use MSG_comm_get_status() to know its status), or -1 if none is finished
335 int MSG_comm_testany(xbt_dynar_t comms)
337 int finished_index = -1;
339 /* Create the equivalent array with SIMIX objects: */
340 std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
341 s_comms.reserve(xbt_dynar_length(comms));
344 xbt_dynar_foreach(comms, cursor, comm) {
345 s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl().get()));
348 msg_error_t status = MSG_OK;
350 finished_index = simcall_comm_testany(s_comms.data(), s_comms.size());
351 } catch (simgrid::TimeoutError& e) {
352 finished_index = e.value;
353 status = MSG_TIMEOUT;
354 } catch (simgrid::CancelException& e) {
355 finished_index = e.value;
356 status = MSG_TASK_CANCELED;
359 if (e.category != network_error)
361 finished_index = e.value;
362 status = MSG_TRANSFER_FAILURE;
365 if (finished_index != -1) {
366 comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
367 /* the communication is finished */
368 comm->status = status;
370 if (status == MSG_OK && comm->task_received != nullptr) {
371 /* I am the receiver */
372 (*comm->task_received)->set_not_used();
376 return finished_index;
379 /** @brief Destroys the provided communication. */
380 void MSG_comm_destroy(msg_comm_t comm)
385 /** @brief Wait for the completion of a communication.
387 * It takes two parameters.
388 * @param comm the communication to wait.
389 * @param timeout Wait until the communication terminates or the timeout occurs.
390 * You can provide a -1 timeout to obtain an infinite timeout.
391 * @return msg_error_t
393 msg_error_t MSG_comm_wait(msg_comm_t comm, double timeout)
396 comm->s_comm->wait_for(timeout);
398 if (comm->task_received != nullptr) {
399 /* I am the receiver */
400 (*comm->task_received)->set_not_used();
403 /* FIXME: these functions are not traceable */
404 } catch (simgrid::TimeoutError& e) {
405 comm->status = MSG_TIMEOUT;
406 } catch (simgrid::CancelException& e) {
407 comm->status = MSG_TASK_CANCELED;
410 if (e.category == network_error)
411 comm->status = MSG_TRANSFER_FAILURE;
419 /** @brief This function is called by a sender and permit to wait for each communication
421 * @param comm a vector of communication
422 * @param nb_elem is the size of the comm vector
423 * @param timeout for each call of MSG_comm_wait
425 void MSG_comm_waitall(msg_comm_t * comm, int nb_elem, double timeout)
427 for (int i = 0; i < nb_elem; i++)
428 MSG_comm_wait(comm[i], timeout);
431 /** @brief This function waits for the first communication finished in a list.
432 * @param comms a vector of communications
433 * @return the position of the first finished communication
434 * (but it may have failed, use MSG_comm_get_status() to know its status)
436 int MSG_comm_waitany(xbt_dynar_t comms)
438 int finished_index = -1;
440 /* Create the equivalent array with SIMIX objects: */
441 std::vector<simgrid::kernel::activity::CommImpl*> s_comms;
442 s_comms.reserve(xbt_dynar_length(comms));
445 xbt_dynar_foreach(comms, cursor, comm) {
446 s_comms.push_back(static_cast<simgrid::kernel::activity::CommImpl*>(comm->s_comm->get_impl().get()));
449 msg_error_t status = MSG_OK;
451 finished_index = simcall_comm_waitany(s_comms.data(), s_comms.size(), -1);
452 } catch (simgrid::TimeoutError& e) {
453 finished_index = e.value;
454 status = MSG_TIMEOUT;
455 } catch (simgrid::CancelException& e) {
456 finished_index = e.value;
457 status = MSG_TASK_CANCELED;
460 if (e.category == network_error) {
461 finished_index = e.value;
462 status = MSG_TRANSFER_FAILURE;
468 xbt_assert(finished_index != -1, "WaitAny returned -1");
470 comm = xbt_dynar_get_as(comms, finished_index, msg_comm_t);
471 /* the communication is finished */
472 comm->status = status;
474 if (comm->task_received != nullptr) {
475 /* I am the receiver */
476 (*comm->task_received)->set_not_used();
479 return finished_index;
483 * @brief Returns the error (if any) that occurred during a finished communication.
484 * @param comm a finished communication
485 * @return the status of the communication, or #MSG_OK if no error occurred during the communication
487 msg_error_t MSG_comm_get_status(msg_comm_t comm) {
492 /** @brief Get a task (#msg_task_t) from a communication
494 * @param comm the communication where to get the task
495 * @return the task from the communication
497 msg_task_t MSG_comm_get_task(msg_comm_t comm)
499 xbt_assert(comm, "Invalid parameter");
501 return comm->task_received ? *comm->task_received : comm->task_sent;
505 * @brief This function is called by SIMIX in kernel mode to copy the data of a comm.
506 * @param comm the comm
507 * @param buff the data copied
508 * @param buff_size size of the buffer
510 void MSG_comm_copy_data_from_SIMIX(simgrid::kernel::activity::CommImpl* comm, void* buff, size_t buff_size)
512 SIMIX_comm_copy_pointer_callback(comm, buff, buff_size);
514 // notify the user callback if any
515 if (msg_global->task_copy_callback) {
516 msg_task_t task = static_cast<msg_task_t>(buff);
517 msg_global->task_copy_callback(task, comm->src_actor_->ciface(), comm->dst_actor_->ciface());
522 * @brief Sends a task to a mailbox
524 * This is a blocking function, the execution flow will be blocked until the task is sent (and received on the other
525 * side if #MSG_task_receive is used).
526 * See #MSG_task_isend for sending tasks asynchronously.
528 * @param task the task to be sent
529 * @param alias the mailbox name to where the task is sent
531 * @return Returns #MSG_OK if the task was successfully sent,
532 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
534 msg_error_t MSG_task_send(msg_task_t task, const char *alias)
536 XBT_DEBUG("MSG_task_send: Trying to send a message on mailbox '%s'", alias);
537 return MSG_task_send_with_timeout(task, alias, -1);
541 * @brief Sends a task to a mailbox with a maximum rate
543 * This is a blocking function, the execution flow will be blocked until the task is sent. The maxrate parameter allows
544 * the application to limit the bandwidth utilization of network links when sending the task.
546 * The maxrate parameter can be used to send a task with a limited bandwidth (smaller than the physical available
547 * value). Use MSG_task_send() if you don't limit the rate (or pass -1 as a rate value do disable this feature).
549 * @param task the task to be sent
550 * @param alias the mailbox name to where the task is sent
551 * @param maxrate the maximum communication rate for sending this task (byte/sec)
553 * @return Returns #MSG_OK if the task was successfully sent,
554 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
556 msg_error_t MSG_task_send_bounded(msg_task_t task, const char *alias, double maxrate)
558 task->set_rate(maxrate);
559 return MSG_task_send(task, alias);
563 * @brief Sends a task to a mailbox with a timeout
565 * This is a blocking function, the execution flow will be blocked until the task is sent or the timeout is achieved.
567 * @param task the task to be sent
568 * @param alias the mailbox name to where the task is sent
569 * @param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_send)
571 * @return Returns #MSG_OK if the task was successfully sent,
572 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
574 msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, double timeout)
576 msg_error_t ret = MSG_OK;
579 simgrid::s4u::CommPtr comm = task->send_async(alias, nullptr, false);
581 comm->wait_for(timeout);
582 } catch (simgrid::TimeoutError& e) {
584 } catch (simgrid::CancelException& e) {
585 ret = MSG_HOST_FAILURE;
586 } catch (xbt_ex& e) {
587 if (e.category == network_error)
588 ret = MSG_TRANSFER_FAILURE;
592 /* If the send failed, it is not used anymore */
593 task->set_not_used();
600 * @brief Sends a task to a mailbox with a timeout and with a maximum rate
602 * This is a blocking function, the execution flow will be blocked until the task is sent or the timeout is achieved.
604 * The maxrate parameter can be used to send a task with a limited bandwidth (smaller than the physical available
605 * value). Use MSG_task_send_with_timeout() if you don't limit the rate (or pass -1 as a rate value do disable this
608 * @param task the task to be sent
609 * @param alias the mailbox name to where the task is sent
610 * @param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_send)
611 * @param maxrate the maximum communication rate for sending this task (byte/sec)
613 * @return Returns #MSG_OK if the task was successfully sent,
614 * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
616 msg_error_t MSG_task_send_with_timeout_bounded(msg_task_t task, const char *alias, double timeout, double maxrate)
618 task->set_rate(maxrate);
619 return MSG_task_send_with_timeout(task, alias, timeout);
623 * @brief Look if there is a communication on a mailbox and return the PID of the sender process.
625 * @param alias the name of the mailbox to be considered
627 * @return Returns the PID of sender process,
628 * -1 if there is no communication in the mailbox.#include <cmath>
631 int MSG_task_listen_from(const char *alias)
633 /* looks inside the rdv directly. Not clean. */
634 simgrid::kernel::activity::CommImplPtr comm = simgrid::s4u::Mailbox::by_name(alias)->front();
636 if (comm && comm->src_actor_)
637 return comm->src_actor_->get_pid();