Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: C++-ify (I)Recv, (I)Send
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
43 {
44   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
45     std::string s = boost::algorithm::join(action, " ");
46     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
47   }
48 }
49
50 static std::vector<MPI_Request>* get_reqq_self()
51 {
52   return reqq.at(Actor::self()->getPid());
53 }
54
55 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 {
57    reqq.insert({Actor::self()->getPid(), mpi_request});
58 }
59
60 /* Helper function */
61 static double parse_double(std::string string)
62 {
63   return xbt_str_parse_double(string.c_str(), "%s is not a double");
64 }
65
66 namespace simgrid {
67 namespace smpi {
68
69 namespace Replay {
70 class ActionArgParser {
71 public:
72   virtual void parse(simgrid::xbt::ReplayAction& action){};
73 };
74
75 class SendRecvParser : public ActionArgParser {
76 public:
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 template <class T> class ReplayAction {
92 protected:
93   const std::string name;
94   T args;
95
96   /*
97    * Used to compute the duration of this action.
98    */
99   double start_time;
100
101   int my_proc_id;
102
103 public:
104   explicit ReplayAction(std::string name)
105       : name(name), start_time(smpi_process()->simulated_elapsed()), my_proc_id(simgrid::s4u::Actor::self()->getPid())
106   {
107   }
108
109   virtual void execute(simgrid::xbt::ReplayAction& action)
110   {
111     args.parse(action);
112     kernel(action);
113     log_timed_action(action, start_time);
114   }
115
116   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
117 };
118
119 class WaitAction : public ReplayAction<ActionArgParser> {
120 public:
121   WaitAction() : ReplayAction("Wait") {}
122   void kernel(simgrid::xbt::ReplayAction& action) override
123   {
124     CHECK_ACTION_PARAMS(action, 0, 0)
125     MPI_Status status;
126
127     std::string s = boost::algorithm::join(action, " ");
128     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
129     MPI_Request request = get_reqq_self()->back();
130     get_reqq_self()->pop_back();
131
132     if (request == nullptr) {
133       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
134        * return.*/
135       return;
136     }
137
138     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
139
140     MPI_Group group          = request->comm()->group();
141     int src_traced           = group->rank(request->src());
142     int dst_traced           = group->rank(request->dst());
143     bool is_wait_for_receive = (request->flags() & RECV);
144     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
145     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
146
147     Request::wait(&request, &status);
148
149     TRACE_smpi_comm_out(rank);
150     if (is_wait_for_receive)
151       TRACE_smpi_recv(src_traced, dst_traced, 0);
152   }
153 };
154
155 class SendAction : public ReplayAction<SendRecvParser> {
156 public:
157   SendAction() = delete;
158   SendAction(std::string name) : ReplayAction(name) {}
159   void kernel(simgrid::xbt::ReplayAction& action) override
160   {
161     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
162
163     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
164                                                                                  Datatype::encode(args.datatype1)));
165     if (not TRACE_smpi_view_internals())
166       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
167
168     if (name == "send") {
169       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
170     } else if (name == "Isend") {
171       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
172       get_reqq_self()->push_back(request);
173     } else {
174       xbt_die("Don't know this action, %s", name.c_str());
175     }
176
177     TRACE_smpi_comm_out(my_proc_id);
178   }
179 };
180
181 class RecvAction : public ReplayAction<SendRecvParser> {
182 public:
183   RecvAction() = delete;
184   explicit RecvAction(std::string name) : ReplayAction(name) {}
185   void kernel(simgrid::xbt::ReplayAction& action) override
186   {
187     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
188
189     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
190                                                                                  Datatype::encode(args.datatype1)));
191
192     MPI_Status status;
193     // unknown size from the receiver point of view
194     if (args.size <= 0.0) {
195       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
196       args.size = status.count;
197     }
198
199     if (name == "recv") {
200       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
201     } else if (name == "Irecv") {
202       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
203       get_reqq_self()->push_back(request);
204     }
205
206     TRACE_smpi_comm_out(my_proc_id);
207     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
208     if (name == "recv" && not TRACE_smpi_view_internals()) {
209       TRACE_smpi_recv(src_traced, my_proc_id, 0);
210     }
211   }
212 };
213
214 } // Replay Namespace
215
216 static void action_init(simgrid::xbt::ReplayAction& action)
217 {
218   XBT_DEBUG("Initialize the counters");
219   CHECK_ACTION_PARAMS(action, 0, 1)
220   if (action.size() > 2)
221     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
222   else
223     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
224
225   /* start a simulated timer */
226   smpi_process()->simulated_start();
227   /*initialize the number of active processes */
228   active_processes = smpi_process_count();
229
230   set_reqq_self(new std::vector<MPI_Request>);
231 }
232
233 static void action_finalize(simgrid::xbt::ReplayAction& action)
234 {
235   /* Nothing to do */
236 }
237
238 static void action_comm_size(simgrid::xbt::ReplayAction& action)
239 {
240   communicator_size = parse_double(action[2]);
241   log_timed_action (action, smpi_process()->simulated_elapsed());
242 }
243
244 static void action_comm_split(simgrid::xbt::ReplayAction& action)
245 {
246   log_timed_action (action, smpi_process()->simulated_elapsed());
247 }
248
249 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
250 {
251   log_timed_action (action, smpi_process()->simulated_elapsed());
252 }
253
254 static void action_compute(simgrid::xbt::ReplayAction& action)
255 {
256   CHECK_ACTION_PARAMS(action, 1, 0)
257   double clock = smpi_process()->simulated_elapsed();
258   double flops= parse_double(action[2]);
259   int my_proc_id = Actor::self()->getPid();
260
261   TRACE_smpi_computing_in(my_proc_id, flops);
262   smpi_execute_flops(flops);
263   TRACE_smpi_computing_out(my_proc_id);
264
265   log_timed_action (action, clock);
266 }
267
268 static void action_send(simgrid::xbt::ReplayAction& action)
269 {
270   Replay::SendAction("send").execute(action);
271 }
272
273 static void action_Isend(simgrid::xbt::ReplayAction& action)
274 {
275   Replay::SendAction("Isend").execute(action);
276 }
277
278 static void action_recv(simgrid::xbt::ReplayAction& action)
279 {
280   Replay::RecvAction("recv").execute(action);
281 }
282
283 static void action_Irecv(simgrid::xbt::ReplayAction& action)
284 {
285   Replay::RecvAction("Irecv").execute(action);
286 }
287
288 static void action_test(simgrid::xbt::ReplayAction& action)
289 {
290   CHECK_ACTION_PARAMS(action, 0, 0)
291   double clock = smpi_process()->simulated_elapsed();
292   MPI_Status status;
293
294   MPI_Request request = get_reqq_self()->back();
295   get_reqq_self()->pop_back();
296   //if request is null here, this may mean that a previous test has succeeded
297   //Different times in traced application and replayed version may lead to this
298   //In this case, ignore the extra calls.
299   if(request!=nullptr){
300     int my_proc_id = Actor::self()->getPid();
301     TRACE_smpi_testing_in(my_proc_id);
302
303     int flag = Request::test(&request, &status);
304
305     XBT_DEBUG("MPI_Test result: %d", flag);
306     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
307     get_reqq_self()->push_back(request);
308
309     TRACE_smpi_testing_out(my_proc_id);
310   }
311   log_timed_action (action, clock);
312 }
313
314 static void action_wait(simgrid::xbt::ReplayAction& action)
315 {
316   Replay::WaitAction().execute(action);
317 }
318
319 static void action_waitall(simgrid::xbt::ReplayAction& action)
320 {
321   CHECK_ACTION_PARAMS(action, 0, 0)
322   double clock = smpi_process()->simulated_elapsed();
323   const unsigned int count_requests = get_reqq_self()->size();
324
325   if (count_requests>0) {
326     MPI_Status status[count_requests];
327
328     int my_proc_id_traced = Actor::self()->getPid();
329     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
330                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
331     int recvs_snd[count_requests];
332     int recvs_rcv[count_requests];
333     for (unsigned int i = 0; i < count_requests; i++) {
334       const auto& req = (*get_reqq_self())[i];
335       if (req && (req->flags() & RECV)) {
336         recvs_snd[i] = req->src();
337         recvs_rcv[i] = req->dst();
338       } else
339         recvs_snd[i] = -100;
340    }
341    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
342
343    for (unsigned i = 0; i < count_requests; i++) {
344      if (recvs_snd[i]!=-100)
345        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
346    }
347    TRACE_smpi_comm_out(my_proc_id_traced);
348   }
349   log_timed_action (action, clock);
350 }
351
352 static void action_barrier(simgrid::xbt::ReplayAction& action)
353 {
354   double clock = smpi_process()->simulated_elapsed();
355   int my_proc_id = Actor::self()->getPid();
356   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
357
358   Colls::barrier(MPI_COMM_WORLD);
359
360   TRACE_smpi_comm_out(my_proc_id);
361   log_timed_action (action, clock);
362 }
363
364 static void action_bcast(simgrid::xbt::ReplayAction& action)
365 {
366   CHECK_ACTION_PARAMS(action, 1, 2)
367   double size = parse_double(action[2]);
368   double clock = smpi_process()->simulated_elapsed();
369   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
370   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
371   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
372
373   int my_proc_id = Actor::self()->getPid();
374   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
375                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
376                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
377
378   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
379
380   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
381
382   TRACE_smpi_comm_out(my_proc_id);
383   log_timed_action (action, clock);
384 }
385
386 static void action_reduce(simgrid::xbt::ReplayAction& action)
387 {
388   CHECK_ACTION_PARAMS(action, 2, 2)
389   double comm_size = parse_double(action[2]);
390   double comp_size = parse_double(action[3]);
391   double clock = smpi_process()->simulated_elapsed();
392   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
393
394   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
395
396   int my_proc_id = Actor::self()->getPid();
397   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
398                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
399                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
400
401   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
402   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
403   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
404   smpi_execute_flops(comp_size);
405
406   TRACE_smpi_comm_out(my_proc_id);
407   log_timed_action (action, clock);
408 }
409
410 static void action_allReduce(simgrid::xbt::ReplayAction& action)
411 {
412   CHECK_ACTION_PARAMS(action, 2, 1)
413   double comm_size = parse_double(action[2]);
414   double comp_size = parse_double(action[3]);
415
416   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
417
418   double clock = smpi_process()->simulated_elapsed();
419   int my_proc_id = Actor::self()->getPid();
420   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
421                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
422
423   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
424   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
425   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
426   smpi_execute_flops(comp_size);
427
428   TRACE_smpi_comm_out(my_proc_id);
429   log_timed_action (action, clock);
430 }
431
432 static void action_allToAll(simgrid::xbt::ReplayAction& action)
433 {
434   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
435   double clock = smpi_process()->simulated_elapsed();
436   unsigned long comm_size = MPI_COMM_WORLD->size();
437   int send_size = parse_double(action[2]);
438   int recv_size = parse_double(action[3]);
439   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
440   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
441
442   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
443   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
444
445   int my_proc_id = Actor::self()->getPid();
446   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
447                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
448                                                     Datatype::encode(MPI_CURRENT_TYPE),
449                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
450
451   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
452
453   TRACE_smpi_comm_out(my_proc_id);
454   log_timed_action (action, clock);
455 }
456
457 static void action_gather(simgrid::xbt::ReplayAction& action)
458 {
459   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
460         0 gather 68 68 0 0 0
461       where:
462         1) 68 is the sendcounts
463         2) 68 is the recvcounts
464         3) 0 is the root node
465         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
466         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
467   */
468   CHECK_ACTION_PARAMS(action, 2, 3)
469   double clock = smpi_process()->simulated_elapsed();
470   unsigned long comm_size = MPI_COMM_WORLD->size();
471   int send_size = parse_double(action[2]);
472   int recv_size = parse_double(action[3]);
473   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
474   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
475
476   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
477   void *recv = nullptr;
478   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
479   int rank = MPI_COMM_WORLD->rank();
480
481   if(rank==root)
482     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
483
484   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
485                                                                         Datatype::encode(MPI_CURRENT_TYPE),
486                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
487
488   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
489
490   TRACE_smpi_comm_out(Actor::self()->getPid());
491   log_timed_action (action, clock);
492 }
493
494 static void action_scatter(simgrid::xbt::ReplayAction& action)
495 {
496   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
497         0 gather 68 68 0 0 0
498       where:
499         1) 68 is the sendcounts
500         2) 68 is the recvcounts
501         3) 0 is the root node
502         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
503         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
504   */
505   CHECK_ACTION_PARAMS(action, 2, 3)
506   double clock                   = smpi_process()->simulated_elapsed();
507   unsigned long comm_size        = MPI_COMM_WORLD->size();
508   int send_size                  = parse_double(action[2]);
509   int recv_size                  = parse_double(action[3]);
510   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
511   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
512
513   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
514   void* recv = nullptr;
515   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
516   int rank = MPI_COMM_WORLD->rank();
517
518   if (rank == root)
519     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
520
521   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
522                                                                         Datatype::encode(MPI_CURRENT_TYPE),
523                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
524
525   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
526
527   TRACE_smpi_comm_out(Actor::self()->getPid());
528   log_timed_action(action, clock);
529 }
530
531 static void action_gatherv(simgrid::xbt::ReplayAction& action)
532 {
533   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
534        0 gather 68 68 10 10 10 0 0 0
535      where:
536        1) 68 is the sendcount
537        2) 68 10 10 10 is the recvcounts
538        3) 0 is the root node
539        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
540        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
541   */
542   double clock = smpi_process()->simulated_elapsed();
543   unsigned long comm_size = MPI_COMM_WORLD->size();
544   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
545   int send_size = parse_double(action[2]);
546   std::vector<int> disps(comm_size, 0);
547   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
548
549   MPI_Datatype MPI_CURRENT_TYPE =
550       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
551   MPI_Datatype MPI_CURRENT_TYPE2{
552       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
553
554   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
555   void *recv = nullptr;
556   for (unsigned int i = 0; i < comm_size; i++) {
557     (*recvcounts)[i] = std::stoi(action[i + 3]);
558   }
559   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
560
561   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
562   int rank = MPI_COMM_WORLD->rank();
563
564   if(rank==root)
565     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
566
567   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
568                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
569                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
570
571   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
572                  MPI_COMM_WORLD);
573
574   TRACE_smpi_comm_out(Actor::self()->getPid());
575   log_timed_action (action, clock);
576 }
577
578 static void action_scatterv(simgrid::xbt::ReplayAction& action)
579 {
580   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
581        0 gather 68 10 10 10 68 0 0 0
582      where:
583        1) 68 10 10 10 is the sendcounts
584        2) 68 is the recvcount
585        3) 0 is the root node
586        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
587        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
588   */
589   double clock  = smpi_process()->simulated_elapsed();
590   unsigned long comm_size = MPI_COMM_WORLD->size();
591   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
592   int recv_size = parse_double(action[2 + comm_size]);
593   std::vector<int> disps(comm_size, 0);
594   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
595
596   MPI_Datatype MPI_CURRENT_TYPE =
597       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
598   MPI_Datatype MPI_CURRENT_TYPE2{
599       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
600
601   void* send = nullptr;
602   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
603   for (unsigned int i = 0; i < comm_size; i++) {
604     (*sendcounts)[i] = std::stoi(action[i + 2]);
605   }
606   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
607
608   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
609   int rank = MPI_COMM_WORLD->rank();
610
611   if (rank == root)
612     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
613
614   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
615                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
616                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
617
618   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
619                   MPI_COMM_WORLD);
620
621   TRACE_smpi_comm_out(Actor::self()->getPid());
622   log_timed_action(action, clock);
623 }
624
625 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
626 {
627   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
628        0 reduceScatter 275427 275427 275427 204020 11346849 0
629      where:
630        1) The first four values after the name of the action declare the recvcounts array
631        2) The value 11346849 is the amount of instructions
632        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
633  */
634   double clock = smpi_process()->simulated_elapsed();
635   unsigned long comm_size = MPI_COMM_WORLD->size();
636   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
637   int comp_size = parse_double(action[2+comm_size]);
638   int my_proc_id                     = Actor::self()->getPid();
639   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
640   MPI_Datatype MPI_CURRENT_TYPE =
641       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
642
643   for (unsigned int i = 0; i < comm_size; i++) {
644     recvcounts->push_back(std::stoi(action[i + 2]));
645   }
646   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
647
648   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
649                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
650                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
651                                                        Datatype::encode(MPI_CURRENT_TYPE)));
652
653   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
654   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
655
656   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
657   smpi_execute_flops(comp_size);
658
659   TRACE_smpi_comm_out(my_proc_id);
660   log_timed_action (action, clock);
661 }
662
663 static void action_allgather(simgrid::xbt::ReplayAction& action)
664 {
665   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
666         0 allGather 275427 275427
667     where:
668         1) 275427 is the sendcount
669         2) 275427 is the recvcount
670         3) No more values mean that the datatype for sent and receive buffer is the default one, see
671     simgrid::smpi::Datatype::decode().
672   */
673   double clock = smpi_process()->simulated_elapsed();
674
675   CHECK_ACTION_PARAMS(action, 2, 2)
676   int sendcount = std::stoi(action[2]);
677   int recvcount = std::stoi(action[3]);
678
679   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
680   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
681
682   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
683   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
684
685   int my_proc_id = Actor::self()->getPid();
686
687   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
688                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
689                                                     Datatype::encode(MPI_CURRENT_TYPE),
690                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
691
692   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
693
694   TRACE_smpi_comm_out(my_proc_id);
695   log_timed_action (action, clock);
696 }
697
698 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
699 {
700   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
701         0 allGatherV 275427 275427 275427 275427 204020
702      where:
703         1) 275427 is the sendcount
704         2) The next four elements declare the recvcounts array
705         3) No more values mean that the datatype for sent and receive buffer is the default one, see
706      simgrid::smpi::Datatype::decode().
707   */
708   double clock = smpi_process()->simulated_elapsed();
709
710   unsigned long comm_size = MPI_COMM_WORLD->size();
711   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
712   int sendcount = std::stoi(action[2]);
713   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
714   std::vector<int> disps(comm_size, 0);
715
716   int datatype_index = 0, disp_index = 0;
717   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
718     datatype_index = 3 + comm_size;
719     disp_index     = datatype_index + 1;
720   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
721     datatype_index = -1;
722     disp_index     = 3 + comm_size;
723   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
724     datatype_index = 3 + comm_size;
725   }
726
727   if (disp_index != 0) {
728     for (unsigned int i = 0; i < comm_size; i++)
729       disps[i]          = std::stoi(action[disp_index + i]);
730   }
731
732   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
733                                                      : MPI_DEFAULT_TYPE};
734   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
735                                                       : MPI_DEFAULT_TYPE};
736
737   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
738
739   for (unsigned int i = 0; i < comm_size; i++) {
740     (*recvcounts)[i] = std::stoi(action[i + 3]);
741   }
742   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
743   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
744
745   int my_proc_id = Actor::self()->getPid();
746
747   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
748                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
749                                                        Datatype::encode(MPI_CURRENT_TYPE),
750                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
751
752   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
753                     MPI_COMM_WORLD);
754
755   TRACE_smpi_comm_out(my_proc_id);
756   log_timed_action (action, clock);
757 }
758
759 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
760 {
761   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
762         0 allToAllV 100 1 7 10 12 100 1 70 10 5
763      where:
764         1) 100 is the size of the send buffer *sizeof(int),
765         2) 1 7 10 12 is the sendcounts array
766         3) 100*sizeof(int) is the size of the receiver buffer
767         4)  1 70 10 5 is the recvcounts array
768   */
769   double clock = smpi_process()->simulated_elapsed();
770
771   unsigned long comm_size = MPI_COMM_WORLD->size();
772   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
773   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
774   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
775   std::vector<int> senddisps(comm_size, 0);
776   std::vector<int> recvdisps(comm_size, 0);
777
778   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
779                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
780                                       : MPI_DEFAULT_TYPE;
781   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
782                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
783                                      : MPI_DEFAULT_TYPE};
784
785   int send_buf_size=parse_double(action[2]);
786   int recv_buf_size=parse_double(action[3+comm_size]);
787   int my_proc_id = Actor::self()->getPid();
788   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
789   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
790
791   for (unsigned int i = 0; i < comm_size; i++) {
792     (*sendcounts)[i] = std::stoi(action[3 + i]);
793     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
794   }
795   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
796   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
797
798   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
799                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
800                                                        Datatype::encode(MPI_CURRENT_TYPE),
801                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
802
803   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
804                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
805
806   TRACE_smpi_comm_out(my_proc_id);
807   log_timed_action (action, clock);
808 }
809
810 }} // namespace simgrid::smpi
811
812 /** @brief Only initialize the replay, don't do it for real */
813 void smpi_replay_init(int* argc, char*** argv)
814 {
815   simgrid::smpi::Process::init(argc, argv);
816   smpi_process()->mark_as_initialized();
817   smpi_process()->set_replaying(true);
818
819   int my_proc_id = Actor::self()->getPid();
820   TRACE_smpi_init(my_proc_id);
821   TRACE_smpi_computing_init(my_proc_id);
822   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
823   TRACE_smpi_comm_out(my_proc_id);
824   xbt_replay_action_register("init",       simgrid::smpi::action_init);
825   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
826   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
827   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
828   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
829   xbt_replay_action_register("send",       simgrid::smpi::action_send);
830   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
831   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
832   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
833   xbt_replay_action_register("test",       simgrid::smpi::action_test);
834   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
835   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
836   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
837   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
838   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
839   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
840   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
841   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
842   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
843   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
844   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
845   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
846   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
847   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
848   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
849   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
850
851   //if we have a delayed start, sleep here.
852   if(*argc>2){
853     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
854     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
855     smpi_execute_flops(value);
856   } else {
857     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
858     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
859     smpi_execute_flops(0.0);
860   }
861 }
862
863 /** @brief actually run the replay after initialization */
864 void smpi_replay_main(int* argc, char*** argv)
865 {
866   simgrid::xbt::replay_runner(*argc, *argv);
867
868   /* and now, finalize everything */
869   /* One active process will stop. Decrease the counter*/
870   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
871   if (not get_reqq_self()->empty()) {
872     unsigned int count_requests=get_reqq_self()->size();
873     MPI_Request requests[count_requests];
874     MPI_Status status[count_requests];
875     unsigned int i=0;
876
877     for (auto const& req : *get_reqq_self()) {
878       requests[i] = req;
879       i++;
880     }
881     simgrid::smpi::Request::waitall(count_requests, requests, status);
882   }
883   delete get_reqq_self();
884   active_processes--;
885
886   if(active_processes==0){
887     /* Last process alive speaking: end the simulated timer */
888     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
889     smpi_free_replay_tmp_buffers();
890   }
891
892   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
893
894   smpi_process()->finalize();
895
896   TRACE_smpi_comm_out(Actor::self()->getPid());
897   TRACE_smpi_finalize(Actor::self()->getPid());
898 }
899
900 /** @brief chain a replay initialization and a replay start */
901 void smpi_replay_run(int* argc, char*** argv)
902 {
903   smpi_replay_init(argc, argv);
904   smpi_replay_main(argc, argv);
905 }