1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size announced by the "comm_size" trace action.
25 static int communicator_size = 0;
// Number of replaying processes still alive (set in action_init from smpi_process_count()).
26 static int active_processes = 0;
// Per-actor queue of pending asynchronous requests (Isend/Irecv), keyed by actor pid.
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when an action line carries none: MPI_DOUBLE for MPE traces, MPI_BYTE for TAU (see action_init).
29 static MPI_Datatype MPI_DEFAULT_TYPE;
// Validate an action line: the first two fields are process_id and action name, followed by
// `mandatory` required arguments and up to `optional` optional ones. Throws arg_error when
// fewer than (2 + mandatory) fields are present.
// NOTE: comments must stay outside the macro body — a `//` before a trailing backslash
// would swallow the line continuation.
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
33   if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
34     THROWF(arg_error, 0, "%s replay failed.\n" \
35            "%lu items were given on the line. First two should be process_id and action. " \
36            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
37            "Please contact the Simgrid team if support is needed", \
38            __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
39            static_cast<unsigned long>(optional));  \
// Log the action (re-joined into one string) together with its simulated duration,
// computed against `clock`, the simulated timestamp at which the action started.
// Guarded so the (non-free) string join only happens when verbose logging is on.
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
// Return the pending-request queue of the calling actor.
// Throws std::out_of_range if the actor never ran the "init" action (which registers the queue).
50 static std::vector<MPI_Request>* get_reqq_self()
52   return reqq.at(Actor::self()->getPid());
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
57 reqq.insert({Actor::self()->getPid(), mpi_request});
61 static double parse_double(std::string string)
63 return xbt_str_parse_double(string.c_str(), "%s is not a double");
70 class ActionArgParser {
72 virtual void parse(simgrid::xbt::ReplayAction& action){};
// Argument parser for point-to-point actions (send/Isend/recv/Irecv).
// Line layout after pid+action: <partner rank> <size> [datatype id].
75 class SendRecvParser : public ActionArgParser {
// Falls back to the trace-wide default when the line has no datatype field.
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
81   void parse(simgrid::xbt::ReplayAction& action) override
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size = parse_double(action[3]);
// Optional field 4: datatype id, decoded via Datatype::decode().
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
// Base template for replay actions: ties an action name to an argument parser T
// (stored in a member declared on lines not visible in this chunk).
91 template <class T> class ReplayAction {
93   const std::string name;
97    * Used to compute the duration of this action.
// Captures the simulated start time and the calling actor's pid at construction.
104   explicit ReplayAction(std::string name)
105       : name(name), start_time(smpi_process()->simulated_elapsed()), my_proc_id(simgrid::s4u::Actor::self()->getPid())
// Template method: presumably parses the arguments and invokes kernel() on lines
// not visible here (TODO confirm), then logs the action against start_time.
109   virtual void execute(simgrid::xbt::ReplayAction& action)
113     log_timed_action(action, start_time);
// Subclasses implement the actual MPI call here.
116   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replays a "wait": pop the most recently queued pending request and wait on it.
119 class WaitAction : public ReplayAction<ActionArgParser> {
121   WaitAction() : ReplayAction("Wait") {}
122   void kernel(simgrid::xbt::ReplayAction& action) override
124     CHECK_ACTION_PARAMS(action, 0, 0)
127     std::string s = boost::algorithm::join(action, " ");
128     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
// LIFO matching: wait completes the most recently posted Isend/Irecv.
129     MPI_Request request = get_reqq_self()->back();
130     get_reqq_self()->pop_back();
// A null request means an earlier "test" action already completed it; bail out early.
132     if (request == nullptr) {
133       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
// NOTE(review): `rank` is guarded against MPI_COMM_NULL, but the group() call on the
// following line dereferences request->comm() unconditionally — confirm a null comm
// cannot reach this point.
138     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
140     MPI_Group group = request->comm()->group();
141     int src_traced = group->rank(request->src());
142     int dst_traced = group->rank(request->dst());
143     bool is_wait_for_receive = (request->flags() & RECV);
144     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
145     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
147     Request::wait(&request, &status);
149     TRACE_smpi_comm_out(rank);
// Only completed receives emit a matching recv event in the trace.
150     if (is_wait_for_receive)
151       TRACE_smpi_recv(src_traced, dst_traced, 0);
// Replays "send" (blocking) and "Isend" (non-blocking); `name` selects the variant.
155 class SendAction : public ReplayAction<SendRecvParser> {
157   SendAction() = delete;
158   SendAction(std::string name) : ReplayAction(name) {}
159   void kernel(simgrid::xbt::ReplayAction& action) override
// Translate the partner's rank in MPI_COMM_WORLD into its actor pid for tracing.
161     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
163     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
164                                                                                  Datatype::encode(args.datatype1)));
165     if (not TRACE_smpi_view_internals())
166       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
// Buffers are nullptr: presumably the replay only simulates the transfer and no
// payload is touched — TODO confirm against Request::send's handling of null buffers.
168     if (name == "send") {
169       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
170     } else if (name == "Isend") {
171       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
// Queue the pending request so a later wait/waitAll/test action can complete it.
172       get_reqq_self()->push_back(request);
174       xbt_die("Don't know this action, %s", name.c_str());
177     TRACE_smpi_comm_out(my_proc_id);
// Replays "recv" (blocking) and "Irecv" (non-blocking); `name` selects the variant.
181 class RecvAction : public ReplayAction<SendRecvParser> {
183   RecvAction() = delete;
184   explicit RecvAction(std::string name) : ReplayAction(name) {}
185   void kernel(simgrid::xbt::ReplayAction& action) override
187     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
189     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
190                                                                                  Datatype::encode(args.datatype1)));
193     // unknown size from the receiver point of view
// Probe the matching send first so the receive is posted with the sender's real size.
194     if (args.size <= 0.0) {
195       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
196       args.size = status.count;
199     if (name == "recv") {
200       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
201     } else if (name == "Irecv") {
202       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
203       get_reqq_self()->push_back(request);
206     TRACE_smpi_comm_out(my_proc_id);
207     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
208     if (name == "recv" && not TRACE_smpi_view_internals()) {
209       TRACE_smpi_recv(src_traced, my_proc_id, 0);
214 } // Replay Namespace
// "init" action: pick the trace-wide default datatype, start the simulated timer,
// and register this actor's pending-request queue.
216 static void action_init(simgrid::xbt::ReplayAction& action)
218   XBT_DEBUG("Initialize the counters");
219   CHECK_ACTION_PARAMS(action, 0, 1)
// An extra field marks an MPE trace (datatype unit = double); otherwise TAU (bytes).
// The `else` sits on a line not visible in this chunk.
220   if (action.size() > 2)
221     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
223     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
225   /* start a simulated timer */
226   smpi_process()->simulated_start();
227   /*initialize the number of active processes */
228   active_processes = smpi_process_count();
// Freshly allocated per actor; released in smpi_replay_main().
230   set_reqq_self(new std::vector<MPI_Request>);
// "finalize" action: no per-action work; the actual teardown (draining pending
// requests, finalizing the process) happens in smpi_replay_main().
233 static void action_finalize(simgrid::xbt::ReplayAction& action)
// "comm_size" action: record the communicator size announced by the trace.
238 static void action_comm_size(simgrid::xbt::ReplayAction& action)
240   communicator_size = parse_double(action[2]);
241   log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_split" action: communicator splits are not simulated; only log the action.
244 static void action_comm_split(simgrid::xbt::ReplayAction& action)
246   log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_dup" action: communicator duplication is not simulated; only log the action.
249 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
251   log_timed_action (action, smpi_process()->simulated_elapsed());
// "compute" action: burn the given number of flops on the simulated host,
// bracketed by computing-state trace events.
254 static void action_compute(simgrid::xbt::ReplayAction& action)
256   CHECK_ACTION_PARAMS(action, 1, 0)
257   double clock = smpi_process()->simulated_elapsed();
258   double flops= parse_double(action[2]);
259   int my_proc_id = Actor::self()->getPid();
261   TRACE_smpi_computing_in(my_proc_id, flops);
262   smpi_execute_flops(flops);
263   TRACE_smpi_computing_out(my_proc_id);
265   log_timed_action (action, clock);
// Thin dispatcher: replay a blocking send via the SendAction class.
268 static void action_send(simgrid::xbt::ReplayAction& action)
270   Replay::SendAction("send").execute(action);
// Thin dispatcher: replay a non-blocking send via the SendAction class.
273 static void action_Isend(simgrid::xbt::ReplayAction& action)
275   Replay::SendAction("Isend").execute(action);
// Thin dispatcher: replay a blocking receive via the RecvAction class.
278 static void action_recv(simgrid::xbt::ReplayAction& action)
280   Replay::RecvAction("recv").execute(action);
// Thin dispatcher: replay a non-blocking receive via the RecvAction class.
283 static void action_Irecv(simgrid::xbt::ReplayAction& action)
285   Replay::RecvAction("Irecv").execute(action);
// "test" action: MPI_Test the most recently queued request, then push it back so
// a later wait can still find it (Request::test nulls it out when it completed).
288 static void action_test(simgrid::xbt::ReplayAction& action)
290   CHECK_ACTION_PARAMS(action, 0, 0)
291   double clock = smpi_process()->simulated_elapsed();
294   MPI_Request request = get_reqq_self()->back();
295   get_reqq_self()->pop_back();
296   //if request is null here, this may mean that a previous test has succeeded
297   //Different times in traced application and replayed version may lead to this
298   //In this case, ignore the extra calls.
299   if(request!=nullptr){
300     int my_proc_id = Actor::self()->getPid();
301     TRACE_smpi_testing_in(my_proc_id);
303     int flag = Request::test(&request, &status);
305     XBT_DEBUG("MPI_Test result: %d", flag);
306     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
307     get_reqq_self()->push_back(request);
309     TRACE_smpi_testing_out(my_proc_id);
311   log_timed_action (action, clock);
// Thin dispatcher: replay a wait via the WaitAction class.
314 static void action_wait(simgrid::xbt::ReplayAction& action)
316   Replay::WaitAction().execute(action);
// "waitAll" action: wait on every pending request of this actor at once,
// remembering src/dst of the receive requests so their recv events can be traced
// after completion.
319 static void action_waitall(simgrid::xbt::ReplayAction& action)
321   CHECK_ACTION_PARAMS(action, 0, 0)
322   double clock = smpi_process()->simulated_elapsed();
323   const unsigned int count_requests = get_reqq_self()->size();
325   if (count_requests>0) {
326     MPI_Status status[count_requests];
328     int my_proc_id_traced = Actor::self()->getPid();
329     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
330                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
331     int recvs_snd[count_requests];
332     int recvs_rcv[count_requests];
// Record endpoints of receive requests before waitall invalidates them.
// The -100 sentinel for non-receive slots is presumably assigned in an `else`
// branch on lines not visible in this chunk — TODO confirm.
333     for (unsigned int i = 0; i < count_requests; i++) {
334       const auto& req = (*get_reqq_self())[i];
335       if (req && (req->flags() & RECV)) {
336         recvs_snd[i] = req->src();
337         recvs_rcv[i] = req->dst();
341     Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// Emit one recv trace event per completed receive (slots left at -100 are skipped).
343     for (unsigned i = 0; i < count_requests; i++) {
344       if (recvs_snd[i]!=-100)
345         TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
347     TRACE_smpi_comm_out(my_proc_id_traced);
349   log_timed_action (action, clock);
// "barrier" action: run a collective barrier on MPI_COMM_WORLD, traced as such.
352 static void action_barrier(simgrid::xbt::ReplayAction& action)
354   double clock = smpi_process()->simulated_elapsed();
355   int my_proc_id = Actor::self()->getPid();
356   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
358   Colls::barrier(MPI_COMM_WORLD);
360   TRACE_smpi_comm_out(my_proc_id);
361   log_timed_action (action, clock);
// "bcast" action: fields are <size> [root] [datatype]; root defaults to 0.
364 static void action_bcast(simgrid::xbt::ReplayAction& action)
366   CHECK_ACTION_PARAMS(action, 1, 2)
367   double size = parse_double(action[2]);
368   double clock = smpi_process()->simulated_elapsed();
369   int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
370   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
371   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
373   int my_proc_id = Actor::self()->getPid();
// The trace wants the root's actor pid, not its rank.
374   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
375                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
376                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
// Scratch buffer only: replay simulates the transfer, the content is irrelevant.
378   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
380   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
382   TRACE_smpi_comm_out(my_proc_id);
383   log_timed_action (action, clock);
// "reduce" action: fields are <comm size> <comp size> [root] [datatype];
// performs the communication then burns comp_size flops for the reduction work.
386 static void action_reduce(simgrid::xbt::ReplayAction& action)
388   CHECK_ACTION_PARAMS(action, 2, 2)
389   double comm_size = parse_double(action[2]);
390   double comp_size = parse_double(action[3]);
391   double clock = smpi_process()->simulated_elapsed();
392   int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
394   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
396   int my_proc_id = Actor::self()->getPid();
397   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
398                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
399                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
401   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
402   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
// MPI_OP_NULL: presumably only timing matters in replay, so no real operator is
// applied — TODO confirm Colls::reduce tolerates it.
403   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
404   smpi_execute_flops(comp_size);
406   TRACE_smpi_comm_out(my_proc_id);
407   log_timed_action (action, clock);
// "allReduce" action: fields are <comm size> <comp size> [datatype];
// communication followed by comp_size flops of simulated reduction work.
410 static void action_allReduce(simgrid::xbt::ReplayAction& action)
412   CHECK_ACTION_PARAMS(action, 2, 1)
413   double comm_size = parse_double(action[2]);
414   double comp_size = parse_double(action[3]);
416   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
418   double clock = smpi_process()->simulated_elapsed();
419   int my_proc_id = Actor::self()->getPid();
420   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
421                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
423   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
424   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
425   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
426   smpi_execute_flops(comp_size);
428   TRACE_smpi_comm_out(my_proc_id);
429   log_timed_action (action, clock);
// "allToAll" action: fields are <send size> <recv size> [send datatype] [recv datatype].
432 static void action_allToAll(simgrid::xbt::ReplayAction& action)
434   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
435   double clock = smpi_process()->simulated_elapsed();
436   unsigned long comm_size = MPI_COMM_WORLD->size();
437   int send_size = parse_double(action[2]);
438   int recv_size = parse_double(action[3]);
// NOTE(review): both decodes are gated on `action.size() > 5`, so a line carrying
// only the send datatype (size == 5) has that field ignored — confirm the trace
// format always provides both datatypes or neither.
439   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
440   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
442   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
443   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
445   int my_proc_id = Actor::self()->getPid();
446   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
447                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
448                                                     Datatype::encode(MPI_CURRENT_TYPE),
449                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
451   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
453   TRACE_smpi_comm_out(my_proc_id);
454   log_timed_action (action, clock);
// "gather" action: fields are <send size> <recv size> [root] [send dt] [recv dt].
457 static void action_gather(simgrid::xbt::ReplayAction& action)
459   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
462       1) 68 is the sendcounts
463       2) 68 is the recvcounts
464       3) 0 is the root node
465       4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
466       5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
468   CHECK_ACTION_PARAMS(action, 2, 3)
469   double clock = smpi_process()->simulated_elapsed();
470   unsigned long comm_size = MPI_COMM_WORLD->size();
471   int send_size = parse_double(action[2]);
472   int recv_size = parse_double(action[3]);
// NOTE(review): both decodes require `action.size() > 6`; a line with only the send
// datatype (size == 6) has it ignored — confirm against the trace format.
473   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
474   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
476   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
477   void *recv = nullptr;
478   int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
479   int rank = MPI_COMM_WORLD->rank();
// Only the root allocates a receive buffer (the guard sits on lines not shown here).
482     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
484   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
485                                                                         Datatype::encode(MPI_CURRENT_TYPE),
486                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
488   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
490   TRACE_smpi_comm_out(Actor::self()->getPid());
491   log_timed_action (action, clock);
494 static void action_scatter(simgrid::xbt::ReplayAction& action)
496 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
499 1) 68 is the sendcounts
500 2) 68 is the recvcounts
501 3) 0 is the root node
502 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
503 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
505 CHECK_ACTION_PARAMS(action, 2, 3)
506 double clock = smpi_process()->simulated_elapsed();
507 unsigned long comm_size = MPI_COMM_WORLD->size();
508 int send_size = parse_double(action[2]);
509 int recv_size = parse_double(action[3]);
510 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
511 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
513 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
514 void* recv = nullptr;
515 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
516 int rank = MPI_COMM_WORLD->rank();
519 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
521 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
522 Datatype::encode(MPI_CURRENT_TYPE),
523 Datatype::encode(MPI_CURRENT_TYPE2)));
525 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
527 TRACE_smpi_comm_out(Actor::self()->getPid());
528 log_timed_action(action, clock);
// "gatherV" action: one sendcount, then per-rank recvcounts, then [root] [send dt] [recv dt].
531 static void action_gatherv(simgrid::xbt::ReplayAction& action)
533   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
534      0 gather 68 68 10 10 10 0 0 0
536        1) 68 is the sendcount
537        2) 68 10 10 10 is the recvcounts
538        3) 0 is the root node
539        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
540        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
542   double clock = smpi_process()->simulated_elapsed();
543   unsigned long comm_size = MPI_COMM_WORLD->size();
544   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
545   int send_size = parse_double(action[2]);
// Displacements stay zero: the trace format does not carry them.
546   std::vector<int> disps(comm_size, 0);
// shared_ptr because VarCollTIData keeps a reference for the trace output.
547   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
549   MPI_Datatype MPI_CURRENT_TYPE =
550       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
551   MPI_Datatype MPI_CURRENT_TYPE2{
552       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
554   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
555   void *recv = nullptr;
556   for (unsigned int i = 0; i < comm_size; i++) {
557     (*recvcounts)[i] = std::stoi(action[i + 3]);
559   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
561   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
562   int rank = MPI_COMM_WORLD->rank();
// Only the root allocates a receive buffer (the guard sits on lines not shown here).
565     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
567   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
568                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
569                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
571   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
574   TRACE_smpi_comm_out(Actor::self()->getPid());
575   log_timed_action (action, clock);
578 static void action_scatterv(simgrid::xbt::ReplayAction& action)
580 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
581 0 gather 68 10 10 10 68 0 0 0
583 1) 68 10 10 10 is the sendcounts
584 2) 68 is the recvcount
585 3) 0 is the root node
586 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
587 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
589 double clock = smpi_process()->simulated_elapsed();
590 unsigned long comm_size = MPI_COMM_WORLD->size();
591 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
592 int recv_size = parse_double(action[2 + comm_size]);
593 std::vector<int> disps(comm_size, 0);
594 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
596 MPI_Datatype MPI_CURRENT_TYPE =
597 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
598 MPI_Datatype MPI_CURRENT_TYPE2{
599 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
601 void* send = nullptr;
602 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
603 for (unsigned int i = 0; i < comm_size; i++) {
604 (*sendcounts)[i] = std::stoi(action[i + 2]);
606 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
608 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
609 int rank = MPI_COMM_WORLD->rank();
612 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
614 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
615 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
616 Datatype::encode(MPI_CURRENT_TYPE2)));
618 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
621 TRACE_smpi_comm_out(Actor::self()->getPid());
622 log_timed_action(action, clock);
// "reduceScatter" action: per-rank recvcounts, then <comp size> [datatype].
625 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
627   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
628      0 reduceScatter 275427 275427 275427 204020 11346849 0
630        1) The first four values after the name of the action declare the recvcounts array
631        2) The value 11346849 is the amount of instructions
632        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
634   double clock = smpi_process()->simulated_elapsed();
635   unsigned long comm_size = MPI_COMM_WORLD->size();
636   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
637   int comp_size = parse_double(action[2+comm_size]);
638   int my_proc_id = Actor::self()->getPid();
// shared_ptr because VarCollTIData keeps a reference for the trace output.
639   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
640   MPI_Datatype MPI_CURRENT_TYPE =
641       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
643   for (unsigned int i = 0; i < comm_size; i++) {
644     recvcounts->push_back(std::stoi(action[i + 2]));
// Total element count: both scratch buffers are sized for the full exchange.
646   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
648   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
649                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
650                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
651                                                        Datatype::encode(MPI_CURRENT_TYPE)));
653   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
654   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
656   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
657   smpi_execute_flops(comp_size);
659   TRACE_smpi_comm_out(my_proc_id);
660   log_timed_action (action, clock);
// "allGather" action: fields are <sendcount> <recvcount> [send dt] [recv dt].
663 static void action_allgather(simgrid::xbt::ReplayAction& action)
665   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
666      0 allGather 275427 275427
668        1) 275427 is the sendcount
669        2) 275427 is the recvcount
670        3) No more values mean that the datatype for sent and receive buffer is the default one, see
671        simgrid::smpi::Datatype::decode().
673   double clock = smpi_process()->simulated_elapsed();
675   CHECK_ACTION_PARAMS(action, 2, 2)
676   int sendcount = std::stoi(action[2]);
677   int recvcount = std::stoi(action[3]);
// NOTE(review): both decodes are gated on `action.size() > 5` — a lone 5th field
// (send datatype only) is ignored; confirm against the trace format.
679   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
680   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
682   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
683   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
685   int my_proc_id = Actor::self()->getPid();
687   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
688                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
689                                                     Datatype::encode(MPI_CURRENT_TYPE),
690                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
692   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
694   TRACE_smpi_comm_out(my_proc_id);
695   log_timed_action (action, clock);
// "allGatherV" action: one sendcount, per-rank recvcounts, optional datatype and
// optional per-rank displacements.
698 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
700   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
701      0 allGatherV 275427 275427 275427 275427 204020
703        1) 275427 is the sendcount
704        2) The next four elements declare the recvcounts array
705        3) No more values mean that the datatype for sent and receive buffer is the default one, see
706        simgrid::smpi::Datatype::decode().
708   double clock = smpi_process()->simulated_elapsed();
710   unsigned long comm_size = MPI_COMM_WORLD->size();
711   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
712   int sendcount = std::stoi(action[2]);
713   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
714   std::vector<int> disps(comm_size, 0);
716   int datatype_index = 0, disp_index = 0;
// NOTE(review): the first two conditions are identical (`> 3 + 2 * comm_size`),
// so the "disps without datatype" branch is unreachable dead code. The correct
// discriminating condition is unclear without the trace-format spec — flagged
// rather than changed.
717   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
718     datatype_index = 3 + comm_size;
719     disp_index = datatype_index + 1;
720   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
722     disp_index = 3 + comm_size;
723   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
724     datatype_index = 3 + comm_size;
727   if (disp_index != 0) {
728     for (unsigned int i = 0; i < comm_size; i++)
729       disps[i] = std::stoi(action[disp_index + i]);
// Both datatypes come from the same field (the continuation lines carrying the
// `: MPI_DEFAULT_TYPE` fallback are not visible in this chunk).
732   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
734   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
737   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
739   for (unsigned int i = 0; i < comm_size; i++) {
740     (*recvcounts)[i] = std::stoi(action[i + 3]);
742   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
743   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
745   int my_proc_id = Actor::self()->getPid();
747   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
748                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
749                                                        Datatype::encode(MPI_CURRENT_TYPE),
750                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
752   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
755   TRACE_smpi_comm_out(my_proc_id);
756   log_timed_action (action, clock);
759 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
761 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
762 0 allToAllV 100 1 7 10 12 100 1 70 10 5
764 1) 100 is the size of the send buffer *sizeof(int),
765 2) 1 7 10 12 is the sendcounts array
766 3) 100*sizeof(int) is the size of the receiver buffer
767 4) 1 70 10 5 is the recvcounts array
769 double clock = smpi_process()->simulated_elapsed();
771 unsigned long comm_size = MPI_COMM_WORLD->size();
772 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
773 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
774 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
775 std::vector<int> senddisps(comm_size, 0);
776 std::vector<int> recvdisps(comm_size, 0);
778 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
779 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
781 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
782 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
785 int send_buf_size=parse_double(action[2]);
786 int recv_buf_size=parse_double(action[3+comm_size]);
787 int my_proc_id = Actor::self()->getPid();
788 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
789 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
791 for (unsigned int i = 0; i < comm_size; i++) {
792 (*sendcounts)[i] = std::stoi(action[3 + i]);
793 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
795 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
796 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
798 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
799 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
800 Datatype::encode(MPI_CURRENT_TYPE),
801 Datatype::encode(MPI_CURRENT_TYPE2)));
803 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
804 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
806 TRACE_smpi_comm_out(my_proc_id);
807 log_timed_action (action, clock);
810 }} // namespace simgrid::smpi
812 /** @brief Only initialize the replay, don't do it for real */
// Set up the SMPI process for replay, emit the init trace events, register every
// action handler with the replay engine, and honor an optional delayed start.
813 void smpi_replay_init(int* argc, char*** argv)
815   simgrid::smpi::Process::init(argc, argv);
816   smpi_process()->mark_as_initialized();
// Replaying mode: MPI calls are driven by the trace, not by application code.
817   smpi_process()->set_replaying(true);
819   int my_proc_id = Actor::self()->getPid();
820   TRACE_smpi_init(my_proc_id);
821   TRACE_smpi_computing_init(my_proc_id);
822   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
823   TRACE_smpi_comm_out(my_proc_id);
// Map every action keyword that may appear in a trace line onto its handler.
824   xbt_replay_action_register("init", simgrid::smpi::action_init);
825   xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
826   xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
827   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
828   xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
829   xbt_replay_action_register("send", simgrid::smpi::action_send);
830   xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
831   xbt_replay_action_register("recv", simgrid::smpi::action_recv);
832   xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
833   xbt_replay_action_register("test", simgrid::smpi::action_test);
834   xbt_replay_action_register("wait", simgrid::smpi::action_wait);
835   xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
836   xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
837   xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
838   xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
839   xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
840   xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
841   xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
842   xbt_replay_action_register("gather", simgrid::smpi::action_gather);
843   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
844   xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
845   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
846   xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
847   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
848   xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
849   xbt_replay_action_register("compute", simgrid::smpi::action_compute);
851   //if we have a delayed start, sleep here.
// The guard selecting this branch (presumably a check on argc/argv) sits on a line
// not visible in this chunk — TODO confirm.
853     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
854     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
855     smpi_execute_flops(value);
857     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
858     XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
859     smpi_execute_flops(0.0);
863 /** @brief actually run the replay after initialization */
// Run the replay for this process, then drain any still-pending requests,
// release the per-actor queue, and finalize the SMPI process.
864 void smpi_replay_main(int* argc, char*** argv)
866   simgrid::xbt::replay_runner(*argc, *argv);
868   /* and now, finalize everything */
869   /* One active process will stop. Decrease the counter*/
870   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// A trace may end with outstanding Isend/Irecv; complete them before finalizing.
871   if (not get_reqq_self()->empty()) {
872     unsigned int count_requests=get_reqq_self()->size();
873     MPI_Request requests[count_requests];
874     MPI_Status status[count_requests];
// The copy of each req into `requests` happens on lines not visible in this chunk.
877     for (auto const& req : *get_reqq_self()) {
881     simgrid::smpi::Request::waitall(count_requests, requests, status);
// The queue was allocated in action_init; release it now that the replay is over.
883   delete get_reqq_self();
// Last process alive does the global cleanup (active_processes is decremented on
// a line not visible in this chunk).
886   if(active_processes==0){
887     /* Last process alive speaking: end the simulated timer */
888     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
889     smpi_free_replay_tmp_buffers();
892   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
894   smpi_process()->finalize();
896   TRACE_smpi_comm_out(Actor::self()->getPid());
897   TRACE_smpi_finalize(Actor::self()->getPid());
900 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: initialize the replay, then run it to completion.
901 void smpi_replay_run(int* argc, char*** argv)
903   smpi_replay_init(argc, argv);
904   smpi_replay_main(argc, argv);