1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Number of replaying processes still running; set by the "init" action and checked at finalization. */
static int active_processes = 0;
/* Per-actor queue of pending asynchronous requests (pushed by Isend/Irecv, consumed by wait/waitAll/test), keyed by actor pid. */
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
/* Datatype assumed when a trace line carries no explicit datatype id.
 * Chosen by InitAction: MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces. */
static MPI_Datatype MPI_DEFAULT_TYPE;
/* Validate the field count of a trace line: fields 0 and 1 are the process id
 * and the action name, followed by `mandatory` required arguments and up to
 * `optional` optional ones. Throws arg_error on a malformed line. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional) \
if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
THROWF(arg_error, 0, "%s replay failed.\n" \
"%lu items were given on the line. First two should be process_id and action. " \
"This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
"Please contact the Simgrid team if support is needed", \
__FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
static_cast<unsigned long>(optional)); \
/* Log the full trace line at verbose level, followed by the simulated duration
 * of the action (current simulated time minus its start time `clock`). */
static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// Only pay for the string join when verbose logging is actually enabled.
std::string s = boost::algorithm::join(action, " ");
XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
/* Base class of the per-action argument parsers. The default parse() only
 * checks that the line has no fields beyond process_id and action name. */
class ActionArgParser {
virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
/* Arguments of point-to-point actions (send/Isend/recv/Irecv):
 * expected line: <pid> <action> <partner> <size> [<datatype>] */
class SendRecvParser : public ActionArgParser {
/* communication partner; if we send, this is the receiver and vice versa */
MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
void parse(simgrid::xbt::ReplayAction& action) override
CHECK_ACTION_PARAMS(action, 2, 1)
partner = std::stoi(action[2]);
size = parse_double(action[3]);
// Optional explicit datatype; otherwise MPI_DEFAULT_TYPE stands.
if (action.size() > 4)
datatype1 = simgrid::smpi::Datatype::decode(action[4]);
/* Arguments of the "compute" action: expected line: <pid> compute <flops> */
class ComputeParser : public ActionArgParser {
/* amount of computation to simulate, in flops */
void parse(simgrid::xbt::ReplayAction& action) override
CHECK_ACTION_PARAMS(action, 1, 0)
flops = parse_double(action[2]);
/* Common arguments of collective actions: sizes, root and up to two
 * datatypes (datatype1 = send side, datatype2 = receive side). */
class CollCommParser : public ActionArgParser {
MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
MPI_Datatype datatype2 = MPI_DEFAULT_TYPE;
/* bcast line: <pid> bcast <size> [<root> [<datatype>]] */
class BcastArgParser : public CollCommParser {
void parse(simgrid::xbt::ReplayAction& action) override
CHECK_ACTION_PARAMS(action, 1, 2)
size = parse_double(action[2]);
// Root defaults to rank 0 when the trace omits it.
root = (action.size() > 3) ? std::stoi(action[3]) : 0;
if (action.size() > 4)
datatype1 = simgrid::smpi::Datatype::decode(action[4]);
/* reduce line: <pid> reduce <comm_size> <comp_size> [<root> [<datatype>]] */
class ReduceArgParser : public CollCommParser {
void parse(simgrid::xbt::ReplayAction& action) override
CHECK_ACTION_PARAMS(action, 2, 2)
comm_size = parse_double(action[2]);
comp_size = parse_double(action[3]);
// Root defaults to rank 0 when the trace omits it.
root = (action.size() > 4) ? std::stoi(action[4]) : 0;
if (action.size() > 5)
datatype1 = simgrid::smpi::Datatype::decode(action[5]);
/* allReduce line: <pid> allReduce <comm_size> <comp_size> [<datatype>] */
class AllReduceArgParser : public CollCommParser {
void parse(simgrid::xbt::ReplayAction& action) override
CHECK_ACTION_PARAMS(action, 2, 1)
comm_size = parse_double(action[2]);
comp_size = parse_double(action[3]);
if (action.size() > 4)
datatype1 = simgrid::smpi::Datatype::decode(action[4]);
/* allToAll line: <pid> allToAll <send_size> <recv_size> [<send dt> [<recv dt>]] */
class AllToAllArgParser : public CollCommParser {
void parse(simgrid::xbt::ReplayAction& action) override
// NOTE(review): two optional datatype fields are read below, so the declared
// optional count of 1 looks off by one; only the THROWF message is affected.
CHECK_ACTION_PARAMS(action, 2, 1)
comm_size = MPI_COMM_WORLD->size();
send_size = parse_double(action[2]);
recv_size = parse_double(action[3]);
if (action.size() > 4)
datatype1 = simgrid::smpi::Datatype::decode(action[4]);
if (action.size() > 5)
datatype2 = simgrid::smpi::Datatype::decode(action[5]);
/* Parser shared by "gather" and "allGather"; root is only used by "gather". */
class GatherParser : public CollCommParser {
void parse(simgrid::xbt::ReplayAction& action) override
/* The structure of the gather action for the rank 0 (total 4 processes) is the following:
1) 68 is the sendcounts
2) 68 is the recvcounts
3) 0 is the root node
4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
CHECK_ACTION_PARAMS(action, 2, 3)
comm_size = MPI_COMM_WORLD->size();
send_size = parse_double(action[2]);
recv_size = parse_double(action[3]);
// Root defaults to rank 0 when the trace omits it.
root = (action.size() > 4) ? std::stoi(action[4]) : 0;
if (action.size() > 5)
datatype1 = simgrid::smpi::Datatype::decode(action[5]);
if (action.size() > 6)
datatype2 = simgrid::smpi::Datatype::decode(action[6]);
/* Base class of every replay action. T is the matching ActionArgParser
 * subclass; subclasses implement kernel() with the actual MPI behavior.
 * NOTE(review): the argument-parsing and kernel() invocation inside execute()
 * are not visible in this excerpt. */
template <class T> class ReplayAction {
const std::string name;
explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
/* Entry point called by the replay engine: records the start time and logs the timed line afterwards. */
virtual void execute(simgrid::xbt::ReplayAction& action)
// Needs to be re-initialized for every action, hence here
double start_time = smpi_process()->simulated_elapsed();
log_timed_action(action, start_time);
virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
/* Replay moves no real payload: hand out shared scratch buffers of the requested size. */
void* send_buffer(int size)
return smpi_get_tmp_sendbuffer(size);
void* recv_buffer(int size)
return smpi_get_tmp_recvbuffer(size);
/* Replay "wait": complete the most recently queued asynchronous request. */
class WaitAction : public ReplayAction<ActionArgParser> {
WaitAction() : ReplayAction("Wait") {}
void kernel(simgrid::xbt::ReplayAction& action) override
std::string s = boost::algorithm::join(action, " ");
xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
// LIFO: the wait applies to the last Isend/Irecv pushed by this process.
MPI_Request request = get_reqq_self()->back();
get_reqq_self()->pop_back();
if (request == nullptr) {
/* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
// Must be taken before Request::wait() since the request may be set to
// MPI_REQUEST_NULL by Request::wait!
int src = request->comm()->group()->rank(request->src());
int dst = request->comm()->group()->rank(request->dst());
bool is_wait_for_receive = (request->flags() & RECV);
// TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
Request::wait(&request, &status);
TRACE_smpi_comm_out(rank);
// For receives, emit the matching recv trace event only once completed.
if (is_wait_for_receive)
TRACE_smpi_recv(src, dst, 0);
/* Replay "send" (blocking) and "Isend" (non-blocking; the request is queued
 * for a later wait/waitAll/test). Selected via the `name` constructor argument. */
class SendAction : public ReplayAction<SendRecvParser> {
SendAction() = delete;
SendAction(std::string name) : ReplayAction(name) {}
void kernel(simgrid::xbt::ReplayAction& action) override
int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
Datatype::encode(args.datatype1)));
if (not TRACE_smpi_view_internals())
TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
// No real payload is transferred during replay, hence the nullptr buffers.
if (name == "send") {
Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
} else if (name == "Isend") {
MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
get_reqq_self()->push_back(request);
xbt_die("Don't know this action, %s", name.c_str());
TRACE_smpi_comm_out(my_proc_id);
/* Replay "recv" (blocking) and "Irecv" (non-blocking; request queued for a
 * later wait/waitAll/test). Selected via the `name` constructor argument. */
class RecvAction : public ReplayAction<SendRecvParser> {
RecvAction() = delete;
explicit RecvAction(std::string name) : ReplayAction(name) {}
void kernel(simgrid::xbt::ReplayAction& action) override
int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
Datatype::encode(args.datatype1)));
// unknown size from the receiver point of view
// Probe the matching message so that the size recorded below is the sender's.
if (args.size <= 0.0) {
Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
args.size = status.count;
if (name == "recv") {
Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
} else if (name == "Irecv") {
MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
get_reqq_self()->push_back(request);
TRACE_smpi_comm_out(my_proc_id);
// TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
if (name == "recv" && not TRACE_smpi_view_internals()) {
TRACE_smpi_recv(src_traced, my_proc_id, 0);
/* Replay "compute": burn the requested amount of simulated flops. */
class ComputeAction : public ReplayAction<ComputeParser> {
ComputeAction() : ReplayAction("compute") {}
void kernel(simgrid::xbt::ReplayAction& action) override
TRACE_smpi_computing_in(my_proc_id, args.flops);
smpi_execute_flops(args.flops);
TRACE_smpi_computing_out(my_proc_id);
/* Replay "test": MPI_Test the most recently queued asynchronous request. */
class TestAction : public ReplayAction<ActionArgParser> {
TestAction() : ReplayAction("Test") {}
void kernel(simgrid::xbt::ReplayAction& action) override
MPI_Request request = get_reqq_self()->back();
get_reqq_self()->pop_back();
// if request is null here, this may mean that a previous test has succeeded
// Different times in traced application and replayed version may lead to this
// In this case, ignore the extra calls.
if (request != nullptr) {
TRACE_smpi_testing_in(my_proc_id);
int flag = Request::test(&request, &status);
XBT_DEBUG("MPI_Test result: %d", flag);
/* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
get_reqq_self()->push_back(request);
TRACE_smpi_testing_out(my_proc_id);
/* Replay "init": pick the default datatype (an extra field marks an MPE trace
 * using MPI_DOUBLE, none marks a TAU trace using MPI_BYTE), start the
 * simulated clock and create this process' request queue. */
class InitAction : public ReplayAction<ActionArgParser> {
InitAction() : ReplayAction("Init") {}
void kernel(simgrid::xbt::ReplayAction& action) override
CHECK_ACTION_PARAMS(action, 0, 1)
MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
: MPI_BYTE; // default TAU datatype
/* start a simulated timer */
smpi_process()->simulated_start();
/*initialize the number of active processes */
active_processes = smpi_process_count();
set_reqq_self(new std::vector<MPI_Request>);
376 class CommunicatorAction : public ReplayAction<ActionArgParser> {
378 CommunicatorAction() : ReplayAction("Comm") {}
379 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
382 class WaitAllAction : public ReplayAction<ActionArgParser> {
384 WaitAllAction() : ReplayAction("waitAll") {}
385 void kernel(simgrid::xbt::ReplayAction& action) override
387 const unsigned int count_requests = get_reqq_self()->size();
389 if (count_requests > 0) {
390 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
391 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
392 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
393 for (const auto& req : (*get_reqq_self())) {
394 if (req && (req->flags() & RECV)) {
395 sender_receiver.push_back({req->src(), req->dst()});
398 MPI_Status status[count_requests];
399 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
401 for (auto& pair : sender_receiver) {
402 TRACE_smpi_recv(pair.first, pair.second, 0);
404 TRACE_smpi_comm_out(my_proc_id);
/* Replay "barrier": a barrier over MPI_COMM_WORLD. */
class BarrierAction : public ReplayAction<ActionArgParser> {
BarrierAction() : ReplayAction("barrier") {}
void kernel(simgrid::xbt::ReplayAction& action) override
TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
Colls::barrier(MPI_COMM_WORLD);
TRACE_smpi_comm_out(my_proc_id);
/* Replay "bcast": broadcast args.size elements of args.datatype1 from args.root. */
class BcastAction : public ReplayAction<BcastArgParser> {
BcastAction() : ReplayAction("bcast") {}
void kernel(simgrid::xbt::ReplayAction& action) override
TRACE_smpi_comm_in(my_proc_id, "action_bcast",
new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
-1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
// Scratch buffer only: replay carries no real payload.
Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
TRACE_smpi_comm_out(my_proc_id);
/* Replay "reduce": reduce args.comm_size elements toward args.root, then
 * account for the reduction computation with args.comp_size flops.
 * The op is MPI_OP_NULL: no actual arithmetic is performed on the scratch buffers. */
class ReduceAction : public ReplayAction<ReduceArgParser> {
ReduceAction() : ReplayAction("reduce") {}
void kernel(simgrid::xbt::ReplayAction& action) override
TRACE_smpi_comm_in(my_proc_id, "action_reduce",
new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->getPid(), args.comp_size,
args.comm_size, -1, Datatype::encode(args.datatype1), ""));
Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
smpi_execute_flops(args.comp_size);
TRACE_smpi_comm_out(my_proc_id);
/* Replay "allReduce": like ReduceAction but with every rank receiving the
 * result; the computation cost is likewise simulated via smpi_execute_flops. */
class AllReduceAction : public ReplayAction<AllReduceArgParser> {
AllReduceAction() : ReplayAction("allReduce") {}
void kernel(simgrid::xbt::ReplayAction& action) override
TRACE_smpi_comm_in(my_proc_id, "action_allReduce", new simgrid::instr::CollTIData("allReduce", -1, args.comp_size, args.comm_size, -1,
Datatype::encode(args.datatype1), ""));
Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
smpi_execute_flops(args.comp_size);
TRACE_smpi_comm_out(my_proc_id);
/* Replay "allToAll": each rank sends args.send_size and receives
 * args.recv_size elements to/from every other rank. */
class AllToAllAction : public ReplayAction<AllToAllArgParser> {
AllToAllAction() : ReplayAction("allToAll") {}
void kernel(simgrid::xbt::ReplayAction& action) override
TRACE_smpi_comm_in(my_proc_id, "action_allToAll",
new simgrid::instr::CollTIData("allToAll", -1, -1.0, args.send_size, args.recv_size,
Datatype::encode(args.datatype1),
Datatype::encode(args.datatype2)));
// Buffers are sized for the whole communicator (count * comm_size * type size).
Colls::alltoall(send_buffer(args.send_size*args.comm_size* args.datatype1->size()),
args.send_size, args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
args.recv_size, args.datatype2, MPI_COMM_WORLD);
TRACE_smpi_comm_out(my_proc_id);
/* Replay "gather" and "allGather", selected by the `name` constructor argument.
 * NOTE(review): `rank` used below is not declared in this excerpt; its
 * declaration (and the `else` of the if) are presumably on elided lines. */
class GatherAction : public ReplayAction<GatherParser> {
GatherAction(std::string name) : ReplayAction(name) {}
void kernel(simgrid::xbt::ReplayAction& action) override
TRACE_smpi_comm_in(my_proc_id, name, new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
if (name == "gather")
// Only the root needs a receive buffer for a plain gather.
Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
(rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
TRACE_smpi_comm_out(my_proc_id);
504 } // Replay Namespace
506 static void action_scatter(simgrid::xbt::ReplayAction& action)
508 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
511 1) 68 is the sendcounts
512 2) 68 is the recvcounts
513 3) 0 is the root node
514 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
515 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
517 CHECK_ACTION_PARAMS(action, 2, 3)
518 double clock = smpi_process()->simulated_elapsed();
519 unsigned long comm_size = MPI_COMM_WORLD->size();
520 int send_size = parse_double(action[2]);
521 int recv_size = parse_double(action[3]);
522 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
523 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
525 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
526 void* recv = nullptr;
527 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
528 int rank = MPI_COMM_WORLD->rank();
531 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
533 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
534 Datatype::encode(MPI_CURRENT_TYPE),
535 Datatype::encode(MPI_CURRENT_TYPE2)));
537 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
539 TRACE_smpi_comm_out(Actor::self()->getPid());
540 log_timed_action(action, clock);
/* Replay "gatherV": gather variable-sized contributions toward the root.
 * NOTE(review): the root-only guard expected before the recv allocation is on
 * an elided line of this excerpt. */
static void action_gatherv(simgrid::xbt::ReplayAction& action)
/* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
0 gather 68 68 10 10 10 0 0 0
1) 68 is the sendcount
2) 68 10 10 10 is the recvcounts
3) 0 is the root node
4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
double clock = smpi_process()->simulated_elapsed();
unsigned long comm_size = MPI_COMM_WORLD->size();
CHECK_ACTION_PARAMS(action, comm_size+1, 2)
int send_size = parse_double(action[2]);
std::vector<int> disps(comm_size, 0);
// shared_ptr: the recvcounts vector is also handed to the tracing TIData below.
std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
MPI_Datatype MPI_CURRENT_TYPE =
(action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
MPI_Datatype MPI_CURRENT_TYPE2{
(action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
void *recv = nullptr;
// Per-rank receive counts start at field 3.
for (unsigned int i = 0; i < comm_size; i++) {
(*recvcounts)[i] = std::stoi(action[i + 3]);
int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
int rank = MPI_COMM_WORLD->rank();
recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
"gatherV", root, send_size, nullptr, -1, recvcounts,
Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
TRACE_smpi_comm_out(Actor::self()->getPid());
log_timed_action (action, clock);
590 static void action_scatterv(simgrid::xbt::ReplayAction& action)
592 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
593 0 gather 68 10 10 10 68 0 0 0
595 1) 68 10 10 10 is the sendcounts
596 2) 68 is the recvcount
597 3) 0 is the root node
598 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
599 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
601 double clock = smpi_process()->simulated_elapsed();
602 unsigned long comm_size = MPI_COMM_WORLD->size();
603 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
604 int recv_size = parse_double(action[2 + comm_size]);
605 std::vector<int> disps(comm_size, 0);
606 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
608 MPI_Datatype MPI_CURRENT_TYPE =
609 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
610 MPI_Datatype MPI_CURRENT_TYPE2{
611 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
613 void* send = nullptr;
614 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
615 for (unsigned int i = 0; i < comm_size; i++) {
616 (*sendcounts)[i] = std::stoi(action[i + 2]);
618 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
620 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
621 int rank = MPI_COMM_WORLD->rank();
624 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
626 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
627 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
628 Datatype::encode(MPI_CURRENT_TYPE2)));
630 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
633 TRACE_smpi_comm_out(Actor::self()->getPid());
634 log_timed_action(action, clock);
/* Replay "reduceScatter": reduce variable-sized blocks and scatter the result;
 * the reduction cost itself is simulated with comp_size flops. */
static void action_reducescatter(simgrid::xbt::ReplayAction& action)
/* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
0 reduceScatter 275427 275427 275427 204020 11346849 0
1) The first four values after the name of the action declare the recvcounts array
2) The value 11346849 is the amount of instructions
3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
double clock = smpi_process()->simulated_elapsed();
unsigned long comm_size = MPI_COMM_WORLD->size();
CHECK_ACTION_PARAMS(action, comm_size+1, 1)
int comp_size = parse_double(action[2+comm_size]);
int my_proc_id = Actor::self()->getPid();
// shared_ptr: the recvcounts vector is also handed to the tracing TIData below.
std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
MPI_Datatype MPI_CURRENT_TYPE =
(action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
for (unsigned int i = 0; i < comm_size; i++) {
recvcounts->push_back(std::stoi(action[i + 2]));
int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
std::to_string(comp_size), /* ugly hack to print comp_size */
Datatype::encode(MPI_CURRENT_TYPE)));
void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
// MPI_OP_NULL: no actual arithmetic is performed on the scratch buffers.
Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
smpi_execute_flops(comp_size);
TRACE_smpi_comm_out(my_proc_id);
log_timed_action (action, clock);
675 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
677 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
678 0 allGatherV 275427 275427 275427 275427 204020
680 1) 275427 is the sendcount
681 2) The next four elements declare the recvcounts array
682 3) No more values mean that the datatype for sent and receive buffer is the default one, see
683 simgrid::smpi::Datatype::decode().
685 double clock = smpi_process()->simulated_elapsed();
687 unsigned long comm_size = MPI_COMM_WORLD->size();
688 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
689 int sendcount = std::stoi(action[2]);
690 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
691 std::vector<int> disps(comm_size, 0);
693 int datatype_index = 0, disp_index = 0;
694 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
695 datatype_index = 3 + comm_size;
696 disp_index = datatype_index + 1;
697 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
699 disp_index = 3 + comm_size;
700 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
701 datatype_index = 3 + comm_size;
704 if (disp_index != 0) {
705 for (unsigned int i = 0; i < comm_size; i++)
706 disps[i] = std::stoi(action[disp_index + i]);
709 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
711 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
714 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
716 for (unsigned int i = 0; i < comm_size; i++) {
717 (*recvcounts)[i] = std::stoi(action[i + 3]);
719 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
720 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
722 int my_proc_id = Actor::self()->getPid();
724 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
725 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
726 Datatype::encode(MPI_CURRENT_TYPE),
727 Datatype::encode(MPI_CURRENT_TYPE2)));
729 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
732 TRACE_smpi_comm_out(my_proc_id);
733 log_timed_action (action, clock);
736 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
738 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
739 0 allToAllV 100 1 7 10 12 100 1 70 10 5
741 1) 100 is the size of the send buffer *sizeof(int),
742 2) 1 7 10 12 is the sendcounts array
743 3) 100*sizeof(int) is the size of the receiver buffer
744 4) 1 70 10 5 is the recvcounts array
746 double clock = smpi_process()->simulated_elapsed();
748 unsigned long comm_size = MPI_COMM_WORLD->size();
749 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
750 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
751 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
752 std::vector<int> senddisps(comm_size, 0);
753 std::vector<int> recvdisps(comm_size, 0);
755 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
756 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
758 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
759 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
762 int send_buf_size=parse_double(action[2]);
763 int recv_buf_size=parse_double(action[3+comm_size]);
764 int my_proc_id = Actor::self()->getPid();
765 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
766 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
768 for (unsigned int i = 0; i < comm_size; i++) {
769 (*sendcounts)[i] = std::stoi(action[3 + i]);
770 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
772 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
773 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
775 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
776 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
777 Datatype::encode(MPI_CURRENT_TYPE),
778 Datatype::encode(MPI_CURRENT_TYPE2)));
780 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
781 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
783 TRACE_smpi_comm_out(my_proc_id);
784 log_timed_action (action, clock);
787 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real */
void smpi_replay_init(int* argc, char*** argv)
simgrid::smpi::Process::init(argc, argv);
smpi_process()->mark_as_initialized();
smpi_process()->set_replaying(true);
int my_proc_id = Actor::self()->getPid();
TRACE_smpi_init(my_proc_id);
TRACE_smpi_computing_init(my_proc_id);
TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
TRACE_smpi_comm_out(my_proc_id);
// Register every trace action name with its replay handler.
xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ReduceAction().execute(action); });
xbt_replay_action_register("allReduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllReduceAction().execute(action); });
xbt_replay_action_register("allToAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::AllToAllAction().execute(action); });
xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("gather").execute(action); });
xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
xbt_replay_action_register("allGather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::GatherAction("allGather").execute(action); });
xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
//if we have a delayed start, sleep here.
// NOTE(review): the guard checking that (*argv)[2] exists is on an elided line.
double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
smpi_execute_flops(value);
//UGLY: force a context switch to be sure that all MSG_processes begin initialization
XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization */
void smpi_replay_main(int* argc, char*** argv)
simgrid::xbt::replay_runner(*argc, *argv);
/* and now, finalize everything */
/* One active process will stop. Decrease the counter*/
XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Drain any requests the trace left pending before finalizing.
if (not get_reqq_self()->empty()) {
unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): these are variable-length arrays, a compiler extension rather
// than standard C++; the loop filling `requests` is on elided lines.
MPI_Request requests[count_requests];
MPI_Status status[count_requests];
for (auto const& req : *get_reqq_self()) {
simgrid::smpi::Request::waitall(count_requests, requests, status);
// Free this process' request queue (the reqq map entry itself is not erased).
delete get_reqq_self();
if(active_processes==0){
/* Last process alive speaking: end the simulated timer */
XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
smpi_free_replay_tmp_buffers();
TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
smpi_process()->finalize();
TRACE_smpi_comm_out(Actor::self()->getPid());
TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start */
// Convenience entry point: argc/argv are forwarded untouched to both phases.
void smpi_replay_run(int* argc, char*** argv)
smpi_replay_init(argc, argv);
smpi_replay_main(argc, argv);