1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Number of replaying processes still running; the last one prints the simulated time.
static int active_processes = 0;
// Per-actor (keyed by PID) list of outstanding asynchronous requests (Isend/Irecv),
// consumed later by the wait/test/waitAll actions.
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype used when a trace line does not specify one; set in action_init
// (MPI_DOUBLE for MPE-style traces, MPI_BYTE for TAU-style traces).
static MPI_Datatype MPI_DEFAULT_TYPE;
/* Sanity-check the token count of a replay line: the first two tokens are always
 * <process_id> <action>, followed by `mandatory` required arguments and up to
 * `optional` optional ones. Throws arg_error on a malformed line. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
  if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
    THROWF(arg_error, 0, "%s replay failed.\n" \
           "%lu items were given on the line. First two should be process_id and action. " \
           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
           "Please contact the Simgrid team if support is needed", \
           __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
           static_cast<unsigned long>(optional)); \
/* Log the replay line together with the simulated time it consumed.
 * Guarded so that joining the tokens only happens when verbose logging is on. */
static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
    std::string s = boost::algorithm::join(action, " ");
    XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
/* Return the pending-request list of the calling actor (looked up by PID in reqq;
 * throws std::out_of_range if action_init was never replayed for this actor). */
static std::vector<MPI_Request>* get_reqq_self()
  return reqq.at(Actor::self()->getPid());
/* Register the pending-request list for the calling actor.
 * NOTE(review): unordered_map::insert does NOT overwrite an existing entry —
 * presumably this is only called once per PID (from action_init); confirm. */
static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
  reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
/* Base class for parsing the arguments of a replay line.
 * The default implementation accepts only argument-less actions. */
class ActionArgParser {
  virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
/* Parses the <partner> <size> [datatype] arguments shared by all point-to-point actions. */
class SendRecvParser : public ActionArgParser {
  /* communication partner; if we send, this is the receiver and vice versa */
  MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;  // falls back to the trace-wide default type
  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 2, 1)
    partner = std::stoi(action[2]);
    size = parse_double(action[3]);
    if (action.size() > 4)  // optional explicit datatype
      datatype1 = simgrid::smpi::Datatype::decode(action[4]);
/* Parses the single <flops> argument of a compute action. */
class ComputeParser : public ActionArgParser {
  /* amount of computation to simulate, in flops */
  void parse(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 1, 0)
    flops = parse_double(action[2]);
/* Base class of all replay actions. T is the argument parser; execute() records the
 * simulated start time, runs kernel(), and logs how long the action took. */
template <class T> class ReplayAction {
  const std::string name;  // action name as it appears in the trace (e.g. "send", "Wait")
  explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
  virtual void execute(simgrid::xbt::ReplayAction& action)
    // Needs to be re-initialized for every action, hence here
    double start_time = smpi_process()->simulated_elapsed();
    log_timed_action(action, start_time);
  /* Subclasses implement the actual effect of the action here. */
  virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
/* Replays a "wait" line: pops the most recently posted pending request and waits on it. */
class WaitAction : public ReplayAction<ActionArgParser> {
  WaitAction() : ReplayAction("Wait") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    CHECK_ACTION_PARAMS(action, 0, 0)
    std::string s = boost::algorithm::join(action, " ");
    xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();
    // A null request means a previous MPI_Test already completed (and nulled) it.
    if (request == nullptr) {
      /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
    int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
    // NOTE(review): the line above guards against MPI_COMM_NULL, yet the next line
    // dereferences request->comm() unconditionally — confirm comm() cannot be null here.
    MPI_Group group = request->comm()->group();
    int src_traced = group->rank(request->src());
    int dst_traced = group->rank(request->dst());
    bool is_wait_for_receive = (request->flags() & RECV);  // decide before wait() may free the request
    // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
    TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
    Request::wait(&request, &status);
    TRACE_smpi_comm_out(rank);
    if (is_wait_for_receive)
      TRACE_smpi_recv(src_traced, dst_traced, 0);
/* Replays "send" (blocking) and "Isend" (non-blocking; the request is pushed
 * onto the actor's pending list for a later wait/test). */
class SendAction : public ReplayAction<SendRecvParser> {
  SendAction() = delete;
  SendAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    // Translate the partner's rank in MPI_COMM_WORLD into its actor PID for tracing.
    int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));
    if (not TRACE_smpi_view_internals())
      TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
    // Payload is nullptr: replay only simulates timing, no real data is moved.
    if (name == "send") {
      Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
    } else if (name == "Isend") {
      MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);
      xbt_die("Don't know this action, %s", name.c_str());
    TRACE_smpi_comm_out(my_proc_id);
/* Replays "recv" (blocking) and "Irecv" (non-blocking; request pushed for a later wait/test). */
class RecvAction : public ReplayAction<SendRecvParser> {
  RecvAction() = delete;
  explicit RecvAction(std::string name) : ReplayAction(name) {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
    TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
                                                                                 Datatype::encode(args.datatype1)));
    // unknown size from the receiver point of view
    if (args.size <= 0.0) {
      // Probe the matching send to discover the actual message size.
      Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
      args.size = status.count;
    if (name == "recv") {
      Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
    } else if (name == "Irecv") {
      MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
      get_reqq_self()->push_back(request);
    TRACE_smpi_comm_out(my_proc_id);
    // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
    if (name == "recv" && not TRACE_smpi_view_internals()) {
      TRACE_smpi_recv(src_traced, my_proc_id, 0);
/* Replays a "compute" line by advancing the simulated clock by the given flop amount. */
class ComputeAction : public ReplayAction<ComputeParser> {
  ComputeAction() : ReplayAction("compute") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    TRACE_smpi_computing_in(my_proc_id, args.flops);
    smpi_execute_flops(args.flops);  // no real computation, only simulated time
    TRACE_smpi_computing_out(my_proc_id);
/* Replays a "test" line: pops the latest pending request, tests it, and pushes it back
 * (nulled if it completed) so that a later wait can still find it. */
class TestAction : public ReplayAction<ActionArgParser> {
  TestAction() : ReplayAction("Test") {}
  void kernel(simgrid::xbt::ReplayAction& action) override
    MPI_Request request = get_reqq_self()->back();
    get_reqq_self()->pop_back();
    // if request is null here, this may mean that a previous test has succeeded
    // Different times in traced application and replayed version may lead to this
    // In this case, ignore the extra calls.
    if (request != nullptr) {
      TRACE_smpi_testing_in(my_proc_id);
      int flag = Request::test(&request, &status);
      XBT_DEBUG("MPI_Test result: %d", flag);
      /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
      get_reqq_self()->push_back(request);
      TRACE_smpi_testing_out(my_proc_id);
257 } // Replay Namespace
/* Replays the "init" line: chooses the default datatype (MPE vs TAU trace flavor),
 * starts the per-process simulated timer and allocates the pending-request list. */
static void action_init(simgrid::xbt::ReplayAction& action)
  XBT_DEBUG("Initialize the counters");
  CHECK_ACTION_PARAMS(action, 0, 1)
  if (action.size() > 2)
    MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
    MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
  /* start a simulated timer */
  smpi_process()->simulated_start();
  /*initialize the number of active processes */
  active_processes = smpi_process_count();
  set_reqq_self(new std::vector<MPI_Request>);
/* Replays the "finalize" line: intentionally a no-op (teardown happens in smpi_replay_main). */
static void action_finalize(simgrid::xbt::ReplayAction& action)
/* Replays "comm_size": only logged, the communicator size is fixed by the platform. */
static void action_comm_size(simgrid::xbt::ReplayAction& action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
/* Replays "comm_split": only logged, no communicator is actually created. */
static void action_comm_split(simgrid::xbt::ReplayAction& action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
/* Replays "comm_dup": only logged, no communicator is actually duplicated. */
static void action_comm_dup(simgrid::xbt::ReplayAction& action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
/* C-style trampoline so "compute" can be registered like the other plain handlers. */
static void action_compute(simgrid::xbt::ReplayAction& action)
  Replay::ComputeAction().execute(action);
/* Replays a "test" line.
 * NOTE(review): this duplicates Replay::TestAction::kernel above — presumably kept
 * during the migration to the class-based actions; consider removing one of the two. */
static void action_test(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();
  //if request is null here, this may mean that a previous test has succeeded
  //Different times in traced application and replayed version may lead to this
  //In this case, ignore the extra calls.
  if(request!=nullptr){
    int my_proc_id = Actor::self()->getPid();
    TRACE_smpi_testing_in(my_proc_id);
    int flag = Request::test(&request, &status);
    XBT_DEBUG("MPI_Test result: %d", flag);
    /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
    get_reqq_self()->push_back(request);
    TRACE_smpi_testing_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays "waitAll": waits for every pending request of this actor at once,
 * recording a recv trace event for each request that was a receive. */
static void action_waitall(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  const unsigned int count_requests = get_reqq_self()->size();
  if (count_requests>0) {
    MPI_Status status[count_requests];
    int my_proc_id_traced = Actor::self()->getPid();
    TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
                       new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
    // Capture src/dst of receive requests before waitall() may free them.
    // NOTE(review): entries for non-receive requests must be set to -100 (the sentinel
    // checked below) — presumably done in an else branch elided from this listing; confirm.
    int recvs_snd[count_requests];
    int recvs_rcv[count_requests];
    for (unsigned int i = 0; i < count_requests; i++) {
      const auto& req = (*get_reqq_self())[i];
      if (req && (req->flags() & RECV)) {
        recvs_snd[i] = req->src();
        recvs_rcv[i] = req->dst();
    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
    for (unsigned i = 0; i < count_requests; i++) {
      if (recvs_snd[i]!=-100)  // -100 marks "not a receive"
        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
    TRACE_smpi_comm_out(my_proc_id_traced);
  log_timed_action (action, clock);
/* Replays a "barrier" line with a collective barrier on MPI_COMM_WORLD. */
static void action_barrier(simgrid::xbt::ReplayAction& action)
  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
  Colls::barrier(MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays "bcast <size> [root] [datatype]": broadcast from `root` (default 0). */
static void action_bcast(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 1, 2)
  double size = parse_double(action[2]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
  /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
                                                    -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays "reduce <comm_size> <comp_size> [root] [datatype]": a reduce to `root`
 * followed by `comp_size` flops of local reduction work. */
static void action_reduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
                                                    comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
  // NOTE(review): recvbuf is taken from the *send* buffer pool — harmless for a
  // simulation-only buffer, but smpi_get_tmp_recvbuffer looks intended; confirm.
  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays "allReduce <comm_size> <comp_size> [datatype]": an allreduce plus
 * `comp_size` flops of local reduction work. */
static void action_allReduce(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
                                                                              Datatype::encode(MPI_CURRENT_TYPE), ""));
  void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays "allToAll <send_size> <recv_size> [sendtype recvtype]". */
static void action_allToAll(simgrid::xbt::ReplayAction& action)
  CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // NOTE(review): both ternaries test `> 5`, so a line giving only the send datatype
  // (size == 5) is ignored — confirm whether such traces exist.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
  void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));
  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays "gather <send_size> <recv_size> [root] [sendtype recvtype]":
 * only the root allocates (and fills) the aggregated receive buffer. */
static void action_gather(simgrid::xbt::ReplayAction& action)
  /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
    1) 68 is the sendcounts
    2) 68 is the recvcounts
    3) 0 is the root node
    4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
    5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();
  // Root-only receive buffer sized for the whole communicator.
  recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        Datatype::encode(MPI_CURRENT_TYPE),
                                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
502 static void action_scatter(simgrid::xbt::ReplayAction& action)
504 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
507 1) 68 is the sendcounts
508 2) 68 is the recvcounts
509 3) 0 is the root node
510 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
511 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
513 CHECK_ACTION_PARAMS(action, 2, 3)
514 double clock = smpi_process()->simulated_elapsed();
515 unsigned long comm_size = MPI_COMM_WORLD->size();
516 int send_size = parse_double(action[2]);
517 int recv_size = parse_double(action[3]);
518 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
519 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
521 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
522 void* recv = nullptr;
523 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
524 int rank = MPI_COMM_WORLD->rank();
527 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
529 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
530 Datatype::encode(MPI_CURRENT_TYPE),
531 Datatype::encode(MPI_CURRENT_TYPE2)));
533 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
535 TRACE_smpi_comm_out(Actor::self()->getPid());
536 log_timed_action(action, clock);
/* Replays "gatherV <send_size> <recvcounts...> [root] [sendtype recvtype]":
 * per-rank receive counts, aggregation buffer allocated at the root only. */
static void action_gatherv(simgrid::xbt::ReplayAction& action)
  /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
  0 gather 68 68 10 10 10 0 0 0
    1) 68 is the sendcount
    2) 68 10 10 10 is the recvcounts
    3) 0 is the root node
    4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
    5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int send_size = parse_double(action[2]);
  std::vector<int> disps(comm_size, 0);  // no displacements in the trace: contiguous layout
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  for (unsigned int i = 0; i < comm_size; i++) {
    (*recvcounts)[i] = std::stoi(action[i + 3]);
  int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
  int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();
  // Root-only aggregation buffer, sized by the sum of all receive counts.
  recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, recvcounts,
                                             Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
586 static void action_scatterv(simgrid::xbt::ReplayAction& action)
588 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
589 0 gather 68 10 10 10 68 0 0 0
591 1) 68 10 10 10 is the sendcounts
592 2) 68 is the recvcount
593 3) 0 is the root node
594 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
595 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
597 double clock = smpi_process()->simulated_elapsed();
598 unsigned long comm_size = MPI_COMM_WORLD->size();
599 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
600 int recv_size = parse_double(action[2 + comm_size]);
601 std::vector<int> disps(comm_size, 0);
602 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
604 MPI_Datatype MPI_CURRENT_TYPE =
605 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
606 MPI_Datatype MPI_CURRENT_TYPE2{
607 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
609 void* send = nullptr;
610 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
611 for (unsigned int i = 0; i < comm_size; i++) {
612 (*sendcounts)[i] = std::stoi(action[i + 2]);
614 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
616 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
617 int rank = MPI_COMM_WORLD->rank();
620 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
622 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
623 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
624 Datatype::encode(MPI_CURRENT_TYPE2)));
626 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
629 TRACE_smpi_comm_out(Actor::self()->getPid());
630 log_timed_action(action, clock);
/* Replays "reduceScatter <recvcounts...> <comp_size> [datatype]": a reduce-scatter
 * followed by `comp_size` flops of local reduction work. */
static void action_reducescatter(simgrid::xbt::ReplayAction& action)
  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
  0 reduceScatter 275427 275427 275427 204020 11346849 0
    1) The first four values after the name of the action declare the recvcounts array
    2) The value 11346849 is the amount of instructions
    3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
  double clock = smpi_process()->simulated_elapsed();
  unsigned long comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 1)
  int comp_size = parse_double(action[2+comm_size]);
  int my_proc_id = Actor::self()->getPid();
  std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
  MPI_Datatype MPI_CURRENT_TYPE =
      (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
  for (unsigned int i = 0; i < comm_size; i++) {
    recvcounts->push_back(std::stoi(action[i + 2]));
  int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
                                                       std::to_string(comp_size), /* ugly hack to print comp_size */
                                                       Datatype::encode(MPI_CURRENT_TYPE)));
  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
/* Replays "allGather <sendcount> <recvcount> [sendtype recvtype]". */
static void action_allgather(simgrid::xbt::ReplayAction& action)
  /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
  0 allGather 275427 275427
    1) 275427 is the sendcount
    2) 275427 is the recvcount
    3) No more values mean that the datatype for sent and receive buffer is the default one, see
    simgrid::smpi::Datatype::decode().
  double clock = smpi_process()->simulated_elapsed();
  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount = std::stoi(action[2]);
  int recvcount = std::stoi(action[3]);
  // NOTE(review): both ternaries test `> 5`, so a line giving only the send datatype
  // (size == 5) is ignored — same pattern as action_allToAll; confirm intent.
  MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
  MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                    Datatype::encode(MPI_CURRENT_TYPE),
                                                    Datatype::encode(MPI_CURRENT_TYPE2)));
  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
706 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
708 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
709 0 allGatherV 275427 275427 275427 275427 204020
711 1) 275427 is the sendcount
712 2) The next four elements declare the recvcounts array
713 3) No more values mean that the datatype for sent and receive buffer is the default one, see
714 simgrid::smpi::Datatype::decode().
716 double clock = smpi_process()->simulated_elapsed();
718 unsigned long comm_size = MPI_COMM_WORLD->size();
719 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
720 int sendcount = std::stoi(action[2]);
721 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
722 std::vector<int> disps(comm_size, 0);
724 int datatype_index = 0, disp_index = 0;
725 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
726 datatype_index = 3 + comm_size;
727 disp_index = datatype_index + 1;
728 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
730 disp_index = 3 + comm_size;
731 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
732 datatype_index = 3 + comm_size;
735 if (disp_index != 0) {
736 for (unsigned int i = 0; i < comm_size; i++)
737 disps[i] = std::stoi(action[disp_index + i]);
740 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
742 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
745 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
747 for (unsigned int i = 0; i < comm_size; i++) {
748 (*recvcounts)[i] = std::stoi(action[i + 3]);
750 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
751 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
753 int my_proc_id = Actor::self()->getPid();
755 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
756 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
757 Datatype::encode(MPI_CURRENT_TYPE),
758 Datatype::encode(MPI_CURRENT_TYPE2)));
760 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
763 TRACE_smpi_comm_out(my_proc_id);
764 log_timed_action (action, clock);
767 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
769 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
770 0 allToAllV 100 1 7 10 12 100 1 70 10 5
772 1) 100 is the size of the send buffer *sizeof(int),
773 2) 1 7 10 12 is the sendcounts array
774 3) 100*sizeof(int) is the size of the receiver buffer
775 4) 1 70 10 5 is the recvcounts array
777 double clock = smpi_process()->simulated_elapsed();
779 unsigned long comm_size = MPI_COMM_WORLD->size();
780 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
781 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
782 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
783 std::vector<int> senddisps(comm_size, 0);
784 std::vector<int> recvdisps(comm_size, 0);
786 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
787 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
789 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
790 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
793 int send_buf_size=parse_double(action[2]);
794 int recv_buf_size=parse_double(action[3+comm_size]);
795 int my_proc_id = Actor::self()->getPid();
796 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
797 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
799 for (unsigned int i = 0; i < comm_size; i++) {
800 (*sendcounts)[i] = std::stoi(action[3 + i]);
801 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
803 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
804 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
806 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
807 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
808 Datatype::encode(MPI_CURRENT_TYPE),
809 Datatype::encode(MPI_CURRENT_TYPE2)));
811 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
812 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
814 TRACE_smpi_comm_out(my_proc_id);
815 log_timed_action (action, clock);
818 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real */
void smpi_replay_init(int* argc, char*** argv)
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Register one handler per keyword appearing in trace files. Plain functions are
  // registered directly; the class-based actions are wrapped via std::bind on a
  // shared instance (one instance per keyword, shared by all replaying actors).
  xbt_replay_action_register("init", simgrid::smpi::action_init);
  xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
  xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
  xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
  xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
  std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
  std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
  std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
  std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
  std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
  xbt_replay_action_register("send",
                             std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
  xbt_replay_action_register("Isend",
                             std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
  xbt_replay_action_register("recv",
                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
  xbt_replay_action_register("Irecv",
                             std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
  xbt_replay_action_register("test", simgrid::smpi::action_test);
  xbt_replay_action_register("wait",
                             std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
  xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
  xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", simgrid::smpi::action_compute);
  //if we have a delayed start, sleep here.
  double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
  XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
  smpi_execute_flops(value);
  //UGLY: force a context switch to be sure that all MSG_processes begin initialization
  XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
  smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization */
void smpi_replay_main(int* argc, char*** argv)
  simgrid::xbt::replay_runner(*argc, *argv);
  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  // Drain any requests the trace left pending before tearing the actor down.
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests=get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];
    for (auto const& req : *get_reqq_self()) {
    simgrid::smpi::Request::waitall(count_requests, requests, status);
  delete get_reqq_self();
  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    smpi_free_replay_tmp_buffers();
  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
  smpi_process()->finalize();
  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start */
void smpi_replay_run(int* argc, char*** argv)
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);