1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <boost/algorithm/string/join.hpp>

#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
/* Validate that the current replay line carries enough arguments for the action.
 * A trace line is "<process_id> <action> <args...>", hence the "+ 2" below.
 * Throws arg_error when fewer than `mandatory` action arguments are present;
 * `optional` only feeds the diagnostic message. */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
      THROWF(arg_error, 0,                                                                                             \
             "%s replay failed.\n"                                                                                     \
             "%lu items were given on the line. First two should be process_id and action. "                           \
             "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"                 \
             "Please contact the Simgrid team if support is needed",                                                   \
             __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
             static_cast<unsigned long>(optional));                                                                    \
  }
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
74 class SendRecvParser : public ActionArgParser {
76 /* communication partner; if we send, this is the receiver and vice versa */
79 MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81 void parse(simgrid::xbt::ReplayAction& action) override
83 CHECK_ACTION_PARAMS(action, 2, 1)
84 partner = std::stoi(action[2]);
85 size = parse_double(action[3]);
86 if (action.size() > 4)
87 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
91 class ComputeParser : public ActionArgParser {
93 /* communication partner; if we send, this is the receiver and vice versa */
96 void parse(simgrid::xbt::ReplayAction& action) override
98 CHECK_ACTION_PARAMS(action, 1, 0)
99 flops = parse_double(action[2]);
103 class BcastArgParser : public ActionArgParser {
107 MPI_Datatype datatype = MPI_DEFAULT_TYPE;
108 void parse(simgrid::xbt::ReplayAction& action) override
110 CHECK_ACTION_PARAMS(action, 1, 2)
111 size = parse_double(action[2]);
112 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
113 if (action.size() > 4)
114 datatype = simgrid::smpi::Datatype::decode(action[4]);
118 template <class T> class ReplayAction {
120 const std::string name;
126 explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
128 virtual void execute(simgrid::xbt::ReplayAction& action)
130 // Needs to be re-initialized for every action, hence here
131 double start_time = smpi_process()->simulated_elapsed();
135 log_timed_action(action, start_time);
138 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
141 class WaitAction : public ReplayAction<ActionArgParser> {
143 WaitAction() : ReplayAction("Wait") {}
144 void kernel(simgrid::xbt::ReplayAction& action) override
146 std::string s = boost::algorithm::join(action, " ");
147 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
148 MPI_Request request = get_reqq_self()->back();
149 get_reqq_self()->pop_back();
151 if (request == nullptr) {
152 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
157 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
159 // Must be taken before Request::wait() since the request may be set to
160 // MPI_REQUEST_NULL by Request::wait!
161 int src = request->comm()->group()->rank(request->src());
162 int dst = request->comm()->group()->rank(request->dst());
163 bool is_wait_for_receive = (request->flags() & RECV);
164 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
165 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
168 Request::wait(&request, &status);
170 TRACE_smpi_comm_out(rank);
171 if (is_wait_for_receive)
172 TRACE_smpi_recv(src, dst, 0);
176 class SendAction : public ReplayAction<SendRecvParser> {
178 SendAction() = delete;
179 SendAction(std::string name) : ReplayAction(name) {}
180 void kernel(simgrid::xbt::ReplayAction& action) override
182 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
184 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
185 Datatype::encode(args.datatype1)));
186 if (not TRACE_smpi_view_internals())
187 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
189 if (name == "send") {
190 Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
191 } else if (name == "Isend") {
192 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
193 get_reqq_self()->push_back(request);
195 xbt_die("Don't know this action, %s", name.c_str());
198 TRACE_smpi_comm_out(my_proc_id);
202 class RecvAction : public ReplayAction<SendRecvParser> {
204 RecvAction() = delete;
205 explicit RecvAction(std::string name) : ReplayAction(name) {}
206 void kernel(simgrid::xbt::ReplayAction& action) override
208 int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
210 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
211 Datatype::encode(args.datatype1)));
214 // unknown size from the receiver point of view
215 if (args.size <= 0.0) {
216 Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
217 args.size = status.count;
220 if (name == "recv") {
221 Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
222 } else if (name == "Irecv") {
223 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
224 get_reqq_self()->push_back(request);
227 TRACE_smpi_comm_out(my_proc_id);
228 // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
229 if (name == "recv" && not TRACE_smpi_view_internals()) {
230 TRACE_smpi_recv(src_traced, my_proc_id, 0);
235 class ComputeAction : public ReplayAction<ComputeParser> {
237 ComputeAction() : ReplayAction("compute") {}
238 void kernel(simgrid::xbt::ReplayAction& action) override
240 TRACE_smpi_computing_in(my_proc_id, args.flops);
241 smpi_execute_flops(args.flops);
242 TRACE_smpi_computing_out(my_proc_id);
246 class TestAction : public ReplayAction<ActionArgParser> {
248 TestAction() : ReplayAction("Test") {}
249 void kernel(simgrid::xbt::ReplayAction& action) override
251 MPI_Request request = get_reqq_self()->back();
252 get_reqq_self()->pop_back();
253 // if request is null here, this may mean that a previous test has succeeded
254 // Different times in traced application and replayed version may lead to this
255 // In this case, ignore the extra calls.
256 if (request != nullptr) {
257 TRACE_smpi_testing_in(my_proc_id);
260 int flag = Request::test(&request, &status);
262 XBT_DEBUG("MPI_Test result: %d", flag);
263 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
265 get_reqq_self()->push_back(request);
267 TRACE_smpi_testing_out(my_proc_id);
272 class InitAction : public ReplayAction<ActionArgParser> {
274 InitAction() : ReplayAction("Init") {}
275 void kernel(simgrid::xbt::ReplayAction& action) override
277 CHECK_ACTION_PARAMS(action, 0, 1)
278 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
279 : MPI_BYTE; // default TAU datatype
281 /* start a simulated timer */
282 smpi_process()->simulated_start();
283 /*initialize the number of active processes */
284 active_processes = smpi_process_count();
286 set_reqq_self(new std::vector<MPI_Request>);
290 class CommunicatorAction : public ReplayAction<ActionArgParser> {
292 CommunicatorAction() : ReplayAction("Comm") {}
293 void kernel(simgrid::xbt::ReplayAction& action) override { /* nothing to do */}
296 class WaitAllAction : public ReplayAction<ActionArgParser> {
298 WaitAllAction() : ReplayAction("waitAll") {}
299 void kernel(simgrid::xbt::ReplayAction& action) override
301 const unsigned int count_requests = get_reqq_self()->size();
303 if (count_requests > 0) {
304 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
305 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
306 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
307 for (const auto& req : (*get_reqq_self())) {
308 if (req && (req->flags() & RECV)) {
309 sender_receiver.push_back({req->src(), req->dst()});
312 MPI_Status status[count_requests];
313 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
315 for (auto& pair : sender_receiver) {
316 TRACE_smpi_recv(pair.first, pair.second, 0);
318 TRACE_smpi_comm_out(my_proc_id);
323 class BarrierAction : public ReplayAction<ActionArgParser> {
325 BarrierAction() : ReplayAction("barrier") {}
326 void kernel(simgrid::xbt::ReplayAction& action) override
328 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
329 Colls::barrier(MPI_COMM_WORLD);
330 TRACE_smpi_comm_out(my_proc_id);
334 class BcastAction : public ReplayAction<BcastArgParser> {
336 BcastAction() : ReplayAction("bcast") {}
337 void kernel(simgrid::xbt::ReplayAction& action) override
339 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
340 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->getPid(),
341 -1.0, args.size, -1, Datatype::encode(args.datatype), ""));
343 void* sendbuf = smpi_get_tmp_sendbuffer(args.size * args.datatype->size());
345 Colls::bcast(sendbuf, args.size, args.datatype, args.root, MPI_COMM_WORLD);
347 TRACE_smpi_comm_out(my_proc_id);
352 static void action_reduce(simgrid::xbt::ReplayAction& action)
354 CHECK_ACTION_PARAMS(action, 2, 2)
355 double comm_size = parse_double(action[2]);
356 double comp_size = parse_double(action[3]);
357 double clock = smpi_process()->simulated_elapsed();
358 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
360 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
362 int my_proc_id = Actor::self()->getPid();
363 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
364 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
365 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
367 void* recvbuf = smpi_get_tmp_recvbuffer(comm_size * MPI_CURRENT_TYPE->size());
368 void* sendbuf = smpi_get_tmp_sendbuffer(comm_size * MPI_CURRENT_TYPE->size());
369 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
370 smpi_execute_flops(comp_size);
372 TRACE_smpi_comm_out(my_proc_id);
373 log_timed_action (action, clock);
376 static void action_allReduce(simgrid::xbt::ReplayAction& action)
378 CHECK_ACTION_PARAMS(action, 2, 1)
379 double comm_size = parse_double(action[2]);
380 double comp_size = parse_double(action[3]);
382 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
384 double clock = smpi_process()->simulated_elapsed();
385 int my_proc_id = Actor::self()->getPid();
386 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
387 Datatype::encode(MPI_CURRENT_TYPE), ""));
389 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
390 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
391 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
392 smpi_execute_flops(comp_size);
394 TRACE_smpi_comm_out(my_proc_id);
395 log_timed_action (action, clock);
398 static void action_allToAll(simgrid::xbt::ReplayAction& action)
400 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
401 double clock = smpi_process()->simulated_elapsed();
402 unsigned long comm_size = MPI_COMM_WORLD->size();
403 int send_size = parse_double(action[2]);
404 int recv_size = parse_double(action[3]);
405 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
406 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
408 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
409 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
411 int my_proc_id = Actor::self()->getPid();
412 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
413 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
414 Datatype::encode(MPI_CURRENT_TYPE),
415 Datatype::encode(MPI_CURRENT_TYPE2)));
417 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
419 TRACE_smpi_comm_out(my_proc_id);
420 log_timed_action (action, clock);
423 static void action_gather(simgrid::xbt::ReplayAction& action)
425 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
428 1) 68 is the sendcounts
429 2) 68 is the recvcounts
430 3) 0 is the root node
431 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
432 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
434 CHECK_ACTION_PARAMS(action, 2, 3)
435 double clock = smpi_process()->simulated_elapsed();
436 unsigned long comm_size = MPI_COMM_WORLD->size();
437 int send_size = parse_double(action[2]);
438 int recv_size = parse_double(action[3]);
439 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
440 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
442 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
443 void *recv = nullptr;
444 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
445 int rank = MPI_COMM_WORLD->rank();
448 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
450 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
451 Datatype::encode(MPI_CURRENT_TYPE),
452 Datatype::encode(MPI_CURRENT_TYPE2)));
454 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
456 TRACE_smpi_comm_out(Actor::self()->getPid());
457 log_timed_action (action, clock);
460 static void action_scatter(simgrid::xbt::ReplayAction& action)
462 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
465 1) 68 is the sendcounts
466 2) 68 is the recvcounts
467 3) 0 is the root node
468 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
469 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
471 CHECK_ACTION_PARAMS(action, 2, 3)
472 double clock = smpi_process()->simulated_elapsed();
473 unsigned long comm_size = MPI_COMM_WORLD->size();
474 int send_size = parse_double(action[2]);
475 int recv_size = parse_double(action[3]);
476 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
477 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
479 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
480 void* recv = nullptr;
481 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
482 int rank = MPI_COMM_WORLD->rank();
485 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
487 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
488 Datatype::encode(MPI_CURRENT_TYPE),
489 Datatype::encode(MPI_CURRENT_TYPE2)));
491 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
493 TRACE_smpi_comm_out(Actor::self()->getPid());
494 log_timed_action(action, clock);
497 static void action_gatherv(simgrid::xbt::ReplayAction& action)
499 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
500 0 gather 68 68 10 10 10 0 0 0
502 1) 68 is the sendcount
503 2) 68 10 10 10 is the recvcounts
504 3) 0 is the root node
505 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
506 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
508 double clock = smpi_process()->simulated_elapsed();
509 unsigned long comm_size = MPI_COMM_WORLD->size();
510 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
511 int send_size = parse_double(action[2]);
512 std::vector<int> disps(comm_size, 0);
513 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
515 MPI_Datatype MPI_CURRENT_TYPE =
516 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
517 MPI_Datatype MPI_CURRENT_TYPE2{
518 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
520 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
521 void *recv = nullptr;
522 for (unsigned int i = 0; i < comm_size; i++) {
523 (*recvcounts)[i] = std::stoi(action[i + 3]);
525 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
527 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
528 int rank = MPI_COMM_WORLD->rank();
531 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
533 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
534 "gatherV", root, send_size, nullptr, -1, recvcounts,
535 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
537 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
540 TRACE_smpi_comm_out(Actor::self()->getPid());
541 log_timed_action (action, clock);
544 static void action_scatterv(simgrid::xbt::ReplayAction& action)
546 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
547 0 gather 68 10 10 10 68 0 0 0
549 1) 68 10 10 10 is the sendcounts
550 2) 68 is the recvcount
551 3) 0 is the root node
552 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
553 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
555 double clock = smpi_process()->simulated_elapsed();
556 unsigned long comm_size = MPI_COMM_WORLD->size();
557 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
558 int recv_size = parse_double(action[2 + comm_size]);
559 std::vector<int> disps(comm_size, 0);
560 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
562 MPI_Datatype MPI_CURRENT_TYPE =
563 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
564 MPI_Datatype MPI_CURRENT_TYPE2{
565 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
567 void* send = nullptr;
568 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
569 for (unsigned int i = 0; i < comm_size; i++) {
570 (*sendcounts)[i] = std::stoi(action[i + 2]);
572 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
574 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
575 int rank = MPI_COMM_WORLD->rank();
578 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
580 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
581 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
582 Datatype::encode(MPI_CURRENT_TYPE2)));
584 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
587 TRACE_smpi_comm_out(Actor::self()->getPid());
588 log_timed_action(action, clock);
591 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
593 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
594 0 reduceScatter 275427 275427 275427 204020 11346849 0
596 1) The first four values after the name of the action declare the recvcounts array
597 2) The value 11346849 is the amount of instructions
598 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
600 double clock = smpi_process()->simulated_elapsed();
601 unsigned long comm_size = MPI_COMM_WORLD->size();
602 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
603 int comp_size = parse_double(action[2+comm_size]);
604 int my_proc_id = Actor::self()->getPid();
605 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
606 MPI_Datatype MPI_CURRENT_TYPE =
607 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
609 for (unsigned int i = 0; i < comm_size; i++) {
610 recvcounts->push_back(std::stoi(action[i + 2]));
612 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
614 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
615 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
616 std::to_string(comp_size), /* ugly hack to print comp_size */
617 Datatype::encode(MPI_CURRENT_TYPE)));
619 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
620 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
622 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
623 smpi_execute_flops(comp_size);
625 TRACE_smpi_comm_out(my_proc_id);
626 log_timed_action (action, clock);
629 static void action_allgather(simgrid::xbt::ReplayAction& action)
631 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
632 0 allGather 275427 275427
634 1) 275427 is the sendcount
635 2) 275427 is the recvcount
636 3) No more values mean that the datatype for sent and receive buffer is the default one, see
637 simgrid::smpi::Datatype::decode().
639 double clock = smpi_process()->simulated_elapsed();
641 CHECK_ACTION_PARAMS(action, 2, 2)
642 int sendcount = std::stoi(action[2]);
643 int recvcount = std::stoi(action[3]);
645 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
646 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
648 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
649 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
651 int my_proc_id = Actor::self()->getPid();
653 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
654 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
655 Datatype::encode(MPI_CURRENT_TYPE),
656 Datatype::encode(MPI_CURRENT_TYPE2)));
658 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
660 TRACE_smpi_comm_out(my_proc_id);
661 log_timed_action (action, clock);
664 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
666 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
667 0 allGatherV 275427 275427 275427 275427 204020
669 1) 275427 is the sendcount
670 2) The next four elements declare the recvcounts array
671 3) No more values mean that the datatype for sent and receive buffer is the default one, see
672 simgrid::smpi::Datatype::decode().
674 double clock = smpi_process()->simulated_elapsed();
676 unsigned long comm_size = MPI_COMM_WORLD->size();
677 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
678 int sendcount = std::stoi(action[2]);
679 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
680 std::vector<int> disps(comm_size, 0);
682 int datatype_index = 0, disp_index = 0;
683 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
684 datatype_index = 3 + comm_size;
685 disp_index = datatype_index + 1;
686 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
688 disp_index = 3 + comm_size;
689 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
690 datatype_index = 3 + comm_size;
693 if (disp_index != 0) {
694 for (unsigned int i = 0; i < comm_size; i++)
695 disps[i] = std::stoi(action[disp_index + i]);
698 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
700 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
703 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
705 for (unsigned int i = 0; i < comm_size; i++) {
706 (*recvcounts)[i] = std::stoi(action[i + 3]);
708 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
709 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
711 int my_proc_id = Actor::self()->getPid();
713 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
714 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
715 Datatype::encode(MPI_CURRENT_TYPE),
716 Datatype::encode(MPI_CURRENT_TYPE2)));
718 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
721 TRACE_smpi_comm_out(my_proc_id);
722 log_timed_action (action, clock);
725 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
727 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
728 0 allToAllV 100 1 7 10 12 100 1 70 10 5
730 1) 100 is the size of the send buffer *sizeof(int),
731 2) 1 7 10 12 is the sendcounts array
732 3) 100*sizeof(int) is the size of the receiver buffer
733 4) 1 70 10 5 is the recvcounts array
735 double clock = smpi_process()->simulated_elapsed();
737 unsigned long comm_size = MPI_COMM_WORLD->size();
738 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
739 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
740 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
741 std::vector<int> senddisps(comm_size, 0);
742 std::vector<int> recvdisps(comm_size, 0);
744 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
745 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
747 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
748 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
751 int send_buf_size=parse_double(action[2]);
752 int recv_buf_size=parse_double(action[3+comm_size]);
753 int my_proc_id = Actor::self()->getPid();
754 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
755 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
757 for (unsigned int i = 0; i < comm_size; i++) {
758 (*sendcounts)[i] = std::stoi(action[3 + i]);
759 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
761 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
762 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
764 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
765 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
766 Datatype::encode(MPI_CURRENT_TYPE),
767 Datatype::encode(MPI_CURRENT_TYPE2)));
769 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
770 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
772 TRACE_smpi_comm_out(my_proc_id);
773 log_timed_action (action, clock);
776 }} // namespace simgrid::smpi
778 /** @brief Only initialize the replay, don't do it for real */
779 void smpi_replay_init(int* argc, char*** argv)
781 simgrid::smpi::Process::init(argc, argv);
782 smpi_process()->mark_as_initialized();
783 smpi_process()->set_replaying(true);
785 int my_proc_id = Actor::self()->getPid();
786 TRACE_smpi_init(my_proc_id);
787 TRACE_smpi_computing_init(my_proc_id);
788 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
789 TRACE_smpi_comm_out(my_proc_id);
790 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::InitAction().execute(action); });
791 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction& action) { /* nothing to do */ });
792 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
793 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
794 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::CommunicatorAction().execute(action); });
796 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("send").execute(action); });
797 xbt_replay_action_register("Isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::SendAction("Isend").execute(action); });
798 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("recv").execute(action); });
799 xbt_replay_action_register("Irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::RecvAction("Irecv").execute(action); });
800 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::TestAction().execute(action); });
801 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAction().execute(action); });
802 xbt_replay_action_register("waitAll", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::WaitAllAction().execute(action); });
803 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BarrierAction().execute(action); });
804 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::BcastAction().execute(action); });
805 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
806 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
807 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
808 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
809 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
810 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
811 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
812 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
813 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
814 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
815 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
816 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::Replay::ComputeAction().execute(action); });
818 //if we have a delayed start, sleep here.
820 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
821 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
822 smpi_execute_flops(value);
824 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
825 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
826 smpi_execute_flops(0.0);
830 /** @brief actually run the replay after initialization */
831 void smpi_replay_main(int* argc, char*** argv)
833 simgrid::xbt::replay_runner(*argc, *argv);
835 /* and now, finalize everything */
836 /* One active process will stop. Decrease the counter*/
837 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
838 if (not get_reqq_self()->empty()) {
839 unsigned int count_requests=get_reqq_self()->size();
840 MPI_Request requests[count_requests];
841 MPI_Status status[count_requests];
844 for (auto const& req : *get_reqq_self()) {
848 simgrid::smpi::Request::waitall(count_requests, requests, status);
850 delete get_reqq_self();
853 if(active_processes==0){
854 /* Last process alive speaking: end the simulated timer */
855 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
856 smpi_free_replay_tmp_buffers();
859 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
861 smpi_process()->finalize();
863 TRACE_smpi_comm_out(Actor::self()->getPid());
864 TRACE_smpi_finalize(Actor::self()->getPid());
867 /** @brief chain a replay initialization and a replay start */
868 void smpi_replay_run(int* argc, char*** argv)
870 smpi_replay_init(argc, argv);
871 smpi_replay_main(argc, argv);