Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: C++-ify TestAction.
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int active_processes  = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
27
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29
30 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
31   {                                                                                                                    \
32     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
33       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
34                            "%lu items were given on the line. First two should be process_id and action.  "            \
35                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
36                            "Please contact the Simgrid team if support is needed",                                     \
37              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
38              static_cast<unsigned long>(optional));                                                                    \
39   }
40
41 static void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
42 {
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action) { CHECK_ACTION_PARAMS(action, 0, 0) }
72 };
73
74 class SendRecvParser : public ActionArgParser {
75 public:
76   /* communication partner; if we send, this is the receiver and vice versa */
77   int partner;
78   double size;
79   MPI_Datatype datatype1 = MPI_DEFAULT_TYPE;
80
81   void parse(simgrid::xbt::ReplayAction& action) override
82   {
83     CHECK_ACTION_PARAMS(action, 2, 1)
84     partner = std::stoi(action[2]);
85     size    = parse_double(action[3]);
86     if (action.size() > 4)
87       datatype1 = simgrid::smpi::Datatype::decode(action[4]);
88   }
89 };
90
91 class ComputeParser : public ActionArgParser {
92 public:
93   /* communication partner; if we send, this is the receiver and vice versa */
94   double flops;
95
96   void parse(simgrid::xbt::ReplayAction& action) override
97   {
98     CHECK_ACTION_PARAMS(action, 1, 0)
99     flops = parse_double(action[2]);
100   }
101 };
102
103 template <class T> class ReplayAction {
104 protected:
105   const std::string name;
106   T args;
107
108   int my_proc_id;
109
110 public:
111   explicit ReplayAction(std::string name) : name(name), my_proc_id(simgrid::s4u::Actor::self()->getPid()) {}
112
113   virtual void execute(simgrid::xbt::ReplayAction& action)
114   {
115     // Needs to be re-initialized for every action, hence here
116     double start_time = smpi_process()->simulated_elapsed();
117     args.parse(action);
118     kernel(action);
119     log_timed_action(action, start_time);
120   }
121
122   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
123 };
124
125 class WaitAction : public ReplayAction<ActionArgParser> {
126 public:
127   WaitAction() : ReplayAction("Wait") {}
128   void kernel(simgrid::xbt::ReplayAction& action) override
129   {
130     CHECK_ACTION_PARAMS(action, 0, 0)
131     MPI_Status status;
132
133     std::string s = boost::algorithm::join(action, " ");
134     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
135     MPI_Request request = get_reqq_self()->back();
136     get_reqq_self()->pop_back();
137
138     if (request == nullptr) {
139       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
140        * return.*/
141       return;
142     }
143
144     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
145
146     MPI_Group group          = request->comm()->group();
147     int src_traced           = group->rank(request->src());
148     int dst_traced           = group->rank(request->dst());
149     bool is_wait_for_receive = (request->flags() & RECV);
150     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
151     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
152
153     Request::wait(&request, &status);
154
155     TRACE_smpi_comm_out(rank);
156     if (is_wait_for_receive)
157       TRACE_smpi_recv(src_traced, dst_traced, 0);
158   }
159 };
160
161 class SendAction : public ReplayAction<SendRecvParser> {
162 public:
163   SendAction() = delete;
164   SendAction(std::string name) : ReplayAction(name) {}
165   void kernel(simgrid::xbt::ReplayAction& action) override
166   {
167     int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
168
169     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
170                                                                                  Datatype::encode(args.datatype1)));
171     if (not TRACE_smpi_view_internals())
172       TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, args.size * args.datatype1->size());
173
174     if (name == "send") {
175       Request::send(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
176     } else if (name == "Isend") {
177       MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
178       get_reqq_self()->push_back(request);
179     } else {
180       xbt_die("Don't know this action, %s", name.c_str());
181     }
182
183     TRACE_smpi_comm_out(my_proc_id);
184   }
185 };
186
187 class RecvAction : public ReplayAction<SendRecvParser> {
188 public:
189   RecvAction() = delete;
190   explicit RecvAction(std::string name) : ReplayAction(name) {}
191   void kernel(simgrid::xbt::ReplayAction& action) override
192   {
193     int src_traced = MPI_COMM_WORLD->group()->actor(args.partner)->getPid();
194
195     TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
196                                                                                  Datatype::encode(args.datatype1)));
197
198     MPI_Status status;
199     // unknown size from the receiver point of view
200     if (args.size <= 0.0) {
201       Request::probe(args.partner, 0, MPI_COMM_WORLD, &status);
202       args.size = status.count;
203     }
204
205     if (name == "recv") {
206       Request::recv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD, &status);
207     } else if (name == "Irecv") {
208       MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, 0, MPI_COMM_WORLD);
209       get_reqq_self()->push_back(request);
210     }
211
212     TRACE_smpi_comm_out(my_proc_id);
213     // TODO: Check why this was only activated in the "recv" case and not in the "Irecv" case
214     if (name == "recv" && not TRACE_smpi_view_internals()) {
215       TRACE_smpi_recv(src_traced, my_proc_id, 0);
216     }
217   }
218 };
219
220 class ComputeAction : public ReplayAction<ComputeParser> {
221 public:
222   ComputeAction() : ReplayAction("compute") {}
223   void kernel(simgrid::xbt::ReplayAction& action) override
224   {
225     TRACE_smpi_computing_in(my_proc_id, args.flops);
226     smpi_execute_flops(args.flops);
227     TRACE_smpi_computing_out(my_proc_id);
228   }
229 };
230
231 class TestAction : public ReplayAction<ActionArgParser> {
232 public:
233   TestAction() : ReplayAction("Test") {}
234   void kernel(simgrid::xbt::ReplayAction& action) override
235   {
236     MPI_Request request = get_reqq_self()->back();
237     get_reqq_self()->pop_back();
238     // if request is null here, this may mean that a previous test has succeeded
239     // Different times in traced application and replayed version may lead to this
240     // In this case, ignore the extra calls.
241     if (request != nullptr) {
242       TRACE_smpi_testing_in(my_proc_id);
243
244       MPI_Status status;
245       int flag = Request::test(&request, &status);
246
247       XBT_DEBUG("MPI_Test result: %d", flag);
248       /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
249        * nullptr.*/
250       get_reqq_self()->push_back(request);
251
252       TRACE_smpi_testing_out(my_proc_id);
253     }
254   }
255 };
256
257 } // Replay Namespace
258
259 static void action_init(simgrid::xbt::ReplayAction& action)
260 {
261   XBT_DEBUG("Initialize the counters");
262   CHECK_ACTION_PARAMS(action, 0, 1)
263   if (action.size() > 2)
264     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
265   else
266     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
267
268   /* start a simulated timer */
269   smpi_process()->simulated_start();
270   /*initialize the number of active processes */
271   active_processes = smpi_process_count();
272
273   set_reqq_self(new std::vector<MPI_Request>);
274 }
275
276 static void action_finalize(simgrid::xbt::ReplayAction& action)
277 {
278   /* Nothing to do */
279 }
280
281 static void action_comm_size(simgrid::xbt::ReplayAction& action)
282 {
283   log_timed_action (action, smpi_process()->simulated_elapsed());
284 }
285
286 static void action_comm_split(simgrid::xbt::ReplayAction& action)
287 {
288   log_timed_action (action, smpi_process()->simulated_elapsed());
289 }
290
291 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
292 {
293   log_timed_action (action, smpi_process()->simulated_elapsed());
294 }
295
296 static void action_compute(simgrid::xbt::ReplayAction& action)
297 {
298   Replay::ComputeAction().execute(action);
299 }
300
301 static void action_test(simgrid::xbt::ReplayAction& action)
302 {
303   CHECK_ACTION_PARAMS(action, 0, 0)
304   double clock = smpi_process()->simulated_elapsed();
305   MPI_Status status;
306
307   MPI_Request request = get_reqq_self()->back();
308   get_reqq_self()->pop_back();
309   //if request is null here, this may mean that a previous test has succeeded
310   //Different times in traced application and replayed version may lead to this
311   //In this case, ignore the extra calls.
312   if(request!=nullptr){
313     int my_proc_id = Actor::self()->getPid();
314     TRACE_smpi_testing_in(my_proc_id);
315
316     int flag = Request::test(&request, &status);
317
318     XBT_DEBUG("MPI_Test result: %d", flag);
319     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
320     get_reqq_self()->push_back(request);
321
322     TRACE_smpi_testing_out(my_proc_id);
323   }
324   log_timed_action (action, clock);
325 }
326
327 static void action_waitall(simgrid::xbt::ReplayAction& action)
328 {
329   CHECK_ACTION_PARAMS(action, 0, 0)
330   double clock = smpi_process()->simulated_elapsed();
331   const unsigned int count_requests = get_reqq_self()->size();
332
333   if (count_requests>0) {
334     MPI_Status status[count_requests];
335
336     int my_proc_id_traced = Actor::self()->getPid();
337     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
338                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
339     int recvs_snd[count_requests];
340     int recvs_rcv[count_requests];
341     for (unsigned int i = 0; i < count_requests; i++) {
342       const auto& req = (*get_reqq_self())[i];
343       if (req && (req->flags() & RECV)) {
344         recvs_snd[i] = req->src();
345         recvs_rcv[i] = req->dst();
346       } else
347         recvs_snd[i] = -100;
348    }
349    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
350
351    for (unsigned i = 0; i < count_requests; i++) {
352      if (recvs_snd[i]!=-100)
353        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
354    }
355    TRACE_smpi_comm_out(my_proc_id_traced);
356   }
357   log_timed_action (action, clock);
358 }
359
360 static void action_barrier(simgrid::xbt::ReplayAction& action)
361 {
362   double clock = smpi_process()->simulated_elapsed();
363   int my_proc_id = Actor::self()->getPid();
364   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
365
366   Colls::barrier(MPI_COMM_WORLD);
367
368   TRACE_smpi_comm_out(my_proc_id);
369   log_timed_action (action, clock);
370 }
371
372 static void action_bcast(simgrid::xbt::ReplayAction& action)
373 {
374   CHECK_ACTION_PARAMS(action, 1, 2)
375   double size = parse_double(action[2]);
376   double clock = smpi_process()->simulated_elapsed();
377   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
378   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
379   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
380
381   int my_proc_id = Actor::self()->getPid();
382   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
383                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
384                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
385
386   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
387
388   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
389
390   TRACE_smpi_comm_out(my_proc_id);
391   log_timed_action (action, clock);
392 }
393
394 static void action_reduce(simgrid::xbt::ReplayAction& action)
395 {
396   CHECK_ACTION_PARAMS(action, 2, 2)
397   double comm_size = parse_double(action[2]);
398   double comp_size = parse_double(action[3]);
399   double clock = smpi_process()->simulated_elapsed();
400   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
401
402   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
403
404   int my_proc_id = Actor::self()->getPid();
405   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
406                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
407                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
408
409   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
410   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
411   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
412   smpi_execute_flops(comp_size);
413
414   TRACE_smpi_comm_out(my_proc_id);
415   log_timed_action (action, clock);
416 }
417
418 static void action_allReduce(simgrid::xbt::ReplayAction& action)
419 {
420   CHECK_ACTION_PARAMS(action, 2, 1)
421   double comm_size = parse_double(action[2]);
422   double comp_size = parse_double(action[3]);
423
424   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
425
426   double clock = smpi_process()->simulated_elapsed();
427   int my_proc_id = Actor::self()->getPid();
428   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
429                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
430
431   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
432   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
433   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
434   smpi_execute_flops(comp_size);
435
436   TRACE_smpi_comm_out(my_proc_id);
437   log_timed_action (action, clock);
438 }
439
440 static void action_allToAll(simgrid::xbt::ReplayAction& action)
441 {
442   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
443   double clock = smpi_process()->simulated_elapsed();
444   unsigned long comm_size = MPI_COMM_WORLD->size();
445   int send_size = parse_double(action[2]);
446   int recv_size = parse_double(action[3]);
447   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
448   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
449
450   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
451   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
452
453   int my_proc_id = Actor::self()->getPid();
454   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
455                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
456                                                     Datatype::encode(MPI_CURRENT_TYPE),
457                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
458
459   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
460
461   TRACE_smpi_comm_out(my_proc_id);
462   log_timed_action (action, clock);
463 }
464
465 static void action_gather(simgrid::xbt::ReplayAction& action)
466 {
467   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
468         0 gather 68 68 0 0 0
469       where:
470         1) 68 is the sendcounts
471         2) 68 is the recvcounts
472         3) 0 is the root node
473         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
474         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
475   */
476   CHECK_ACTION_PARAMS(action, 2, 3)
477   double clock = smpi_process()->simulated_elapsed();
478   unsigned long comm_size = MPI_COMM_WORLD->size();
479   int send_size = parse_double(action[2]);
480   int recv_size = parse_double(action[3]);
481   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
482   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
483
484   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
485   void *recv = nullptr;
486   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
487   int rank = MPI_COMM_WORLD->rank();
488
489   if(rank==root)
490     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
491
492   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
493                                                                         Datatype::encode(MPI_CURRENT_TYPE),
494                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
495
496   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
497
498   TRACE_smpi_comm_out(Actor::self()->getPid());
499   log_timed_action (action, clock);
500 }
501
502 static void action_scatter(simgrid::xbt::ReplayAction& action)
503 {
504   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
505         0 gather 68 68 0 0 0
506       where:
507         1) 68 is the sendcounts
508         2) 68 is the recvcounts
509         3) 0 is the root node
510         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
511         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
512   */
513   CHECK_ACTION_PARAMS(action, 2, 3)
514   double clock                   = smpi_process()->simulated_elapsed();
515   unsigned long comm_size        = MPI_COMM_WORLD->size();
516   int send_size                  = parse_double(action[2]);
517   int recv_size                  = parse_double(action[3]);
518   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
519   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
520
521   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
522   void* recv = nullptr;
523   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
524   int rank = MPI_COMM_WORLD->rank();
525
526   if (rank == root)
527     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
528
529   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
530                                                                         Datatype::encode(MPI_CURRENT_TYPE),
531                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
532
533   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
534
535   TRACE_smpi_comm_out(Actor::self()->getPid());
536   log_timed_action(action, clock);
537 }
538
539 static void action_gatherv(simgrid::xbt::ReplayAction& action)
540 {
541   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
542        0 gather 68 68 10 10 10 0 0 0
543      where:
544        1) 68 is the sendcount
545        2) 68 10 10 10 is the recvcounts
546        3) 0 is the root node
547        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
548        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
549   */
550   double clock = smpi_process()->simulated_elapsed();
551   unsigned long comm_size = MPI_COMM_WORLD->size();
552   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
553   int send_size = parse_double(action[2]);
554   std::vector<int> disps(comm_size, 0);
555   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
556
557   MPI_Datatype MPI_CURRENT_TYPE =
558       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
559   MPI_Datatype MPI_CURRENT_TYPE2{
560       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
561
562   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
563   void *recv = nullptr;
564   for (unsigned int i = 0; i < comm_size; i++) {
565     (*recvcounts)[i] = std::stoi(action[i + 3]);
566   }
567   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
568
569   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
570   int rank = MPI_COMM_WORLD->rank();
571
572   if(rank==root)
573     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
574
575   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
576                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
577                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
578
579   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
580                  MPI_COMM_WORLD);
581
582   TRACE_smpi_comm_out(Actor::self()->getPid());
583   log_timed_action (action, clock);
584 }
585
586 static void action_scatterv(simgrid::xbt::ReplayAction& action)
587 {
588   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
589        0 gather 68 10 10 10 68 0 0 0
590      where:
591        1) 68 10 10 10 is the sendcounts
592        2) 68 is the recvcount
593        3) 0 is the root node
594        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
595        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
596   */
597   double clock  = smpi_process()->simulated_elapsed();
598   unsigned long comm_size = MPI_COMM_WORLD->size();
599   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
600   int recv_size = parse_double(action[2 + comm_size]);
601   std::vector<int> disps(comm_size, 0);
602   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
603
604   MPI_Datatype MPI_CURRENT_TYPE =
605       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
606   MPI_Datatype MPI_CURRENT_TYPE2{
607       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
608
609   void* send = nullptr;
610   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
611   for (unsigned int i = 0; i < comm_size; i++) {
612     (*sendcounts)[i] = std::stoi(action[i + 2]);
613   }
614   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
615
616   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
617   int rank = MPI_COMM_WORLD->rank();
618
619   if (rank == root)
620     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
621
622   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
623                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
624                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
625
626   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
627                   MPI_COMM_WORLD);
628
629   TRACE_smpi_comm_out(Actor::self()->getPid());
630   log_timed_action(action, clock);
631 }
632
633 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
634 {
635   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
636        0 reduceScatter 275427 275427 275427 204020 11346849 0
637      where:
638        1) The first four values after the name of the action declare the recvcounts array
639        2) The value 11346849 is the amount of instructions
640        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
641  */
642   double clock = smpi_process()->simulated_elapsed();
643   unsigned long comm_size = MPI_COMM_WORLD->size();
644   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
645   int comp_size = parse_double(action[2+comm_size]);
646   int my_proc_id                     = Actor::self()->getPid();
647   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
648   MPI_Datatype MPI_CURRENT_TYPE =
649       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
650
651   for (unsigned int i = 0; i < comm_size; i++) {
652     recvcounts->push_back(std::stoi(action[i + 2]));
653   }
654   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
655
656   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
657                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
658                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
659                                                        Datatype::encode(MPI_CURRENT_TYPE)));
660
661   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
662   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
663
664   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
665   smpi_execute_flops(comp_size);
666
667   TRACE_smpi_comm_out(my_proc_id);
668   log_timed_action (action, clock);
669 }
670
671 static void action_allgather(simgrid::xbt::ReplayAction& action)
672 {
673   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
674         0 allGather 275427 275427
675     where:
676         1) 275427 is the sendcount
677         2) 275427 is the recvcount
678         3) No more values mean that the datatype for sent and receive buffer is the default one, see
679     simgrid::smpi::Datatype::decode().
680   */
681   double clock = smpi_process()->simulated_elapsed();
682
683   CHECK_ACTION_PARAMS(action, 2, 2)
684   int sendcount = std::stoi(action[2]);
685   int recvcount = std::stoi(action[3]);
686
687   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
688   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
689
690   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
691   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
692
693   int my_proc_id = Actor::self()->getPid();
694
695   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
696                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
697                                                     Datatype::encode(MPI_CURRENT_TYPE),
698                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
699
700   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
701
702   TRACE_smpi_comm_out(my_proc_id);
703   log_timed_action (action, clock);
704 }
705
706 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
707 {
708   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
709         0 allGatherV 275427 275427 275427 275427 204020
710      where:
711         1) 275427 is the sendcount
712         2) The next four elements declare the recvcounts array
713         3) No more values mean that the datatype for sent and receive buffer is the default one, see
714      simgrid::smpi::Datatype::decode().
715   */
716   double clock = smpi_process()->simulated_elapsed();
717
718   unsigned long comm_size = MPI_COMM_WORLD->size();
719   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
720   int sendcount = std::stoi(action[2]);
721   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
722   std::vector<int> disps(comm_size, 0);
723
724   int datatype_index = 0, disp_index = 0;
725   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
726     datatype_index = 3 + comm_size;
727     disp_index     = datatype_index + 1;
728   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
729     datatype_index = -1;
730     disp_index     = 3 + comm_size;
731   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
732     datatype_index = 3 + comm_size;
733   }
734
735   if (disp_index != 0) {
736     for (unsigned int i = 0; i < comm_size; i++)
737       disps[i]          = std::stoi(action[disp_index + i]);
738   }
739
740   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
741                                                      : MPI_DEFAULT_TYPE};
742   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
743                                                       : MPI_DEFAULT_TYPE};
744
745   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
746
747   for (unsigned int i = 0; i < comm_size; i++) {
748     (*recvcounts)[i] = std::stoi(action[i + 3]);
749   }
750   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
751   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
752
753   int my_proc_id = Actor::self()->getPid();
754
755   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
756                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
757                                                        Datatype::encode(MPI_CURRENT_TYPE),
758                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
759
760   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
761                     MPI_COMM_WORLD);
762
763   TRACE_smpi_comm_out(my_proc_id);
764   log_timed_action (action, clock);
765 }
766
767 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
768 {
769   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
770         0 allToAllV 100 1 7 10 12 100 1 70 10 5
771      where:
772         1) 100 is the size of the send buffer *sizeof(int),
773         2) 1 7 10 12 is the sendcounts array
774         3) 100*sizeof(int) is the size of the receiver buffer
775         4)  1 70 10 5 is the recvcounts array
776   */
777   double clock = smpi_process()->simulated_elapsed();
778
779   unsigned long comm_size = MPI_COMM_WORLD->size();
780   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
781   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
782   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
783   std::vector<int> senddisps(comm_size, 0);
784   std::vector<int> recvdisps(comm_size, 0);
785
786   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
787                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
788                                       : MPI_DEFAULT_TYPE;
789   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
790                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
791                                      : MPI_DEFAULT_TYPE};
792
793   int send_buf_size=parse_double(action[2]);
794   int recv_buf_size=parse_double(action[3+comm_size]);
795   int my_proc_id = Actor::self()->getPid();
796   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
797   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
798
799   for (unsigned int i = 0; i < comm_size; i++) {
800     (*sendcounts)[i] = std::stoi(action[3 + i]);
801     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
802   }
803   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
804   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
805
806   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
807                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
808                                                        Datatype::encode(MPI_CURRENT_TYPE),
809                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
810
811   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
812                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
813
814   TRACE_smpi_comm_out(my_proc_id);
815   log_timed_action (action, clock);
816 }
817
818 }} // namespace simgrid::smpi
819
820 /** @brief Only initialize the replay, don't do it for real */
821 void smpi_replay_init(int* argc, char*** argv)
822 {
823   simgrid::smpi::Process::init(argc, argv);
824   smpi_process()->mark_as_initialized();
825   smpi_process()->set_replaying(true);
826
827   int my_proc_id = Actor::self()->getPid();
828   TRACE_smpi_init(my_proc_id);
829   TRACE_smpi_computing_init(my_proc_id);
830   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
831   TRACE_smpi_comm_out(my_proc_id);
832   xbt_replay_action_register("init",       simgrid::smpi::action_init);
833   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
834   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
835   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
836   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
837
838   std::shared_ptr<simgrid::smpi::Replay::SendAction> isend(new simgrid::smpi::Replay::SendAction("Isend"));
839   std::shared_ptr<simgrid::smpi::Replay::SendAction> send(new simgrid::smpi::Replay::SendAction("send"));
840   std::shared_ptr<simgrid::smpi::Replay::RecvAction> irecv(new simgrid::smpi::Replay::RecvAction("Irecv"));
841   std::shared_ptr<simgrid::smpi::Replay::RecvAction> recv(new simgrid::smpi::Replay::RecvAction("recv"));
842   std::shared_ptr<simgrid::smpi::Replay::WaitAction> wait(new simgrid::smpi::Replay::WaitAction());
843
844   xbt_replay_action_register("send",
845                              std::bind(&simgrid::smpi::Replay::SendAction::execute, send, std::placeholders::_1));
846   xbt_replay_action_register("Isend",
847                              std::bind(&simgrid::smpi::Replay::SendAction::execute, isend, std::placeholders::_1));
848   xbt_replay_action_register("recv",
849                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, recv, std::placeholders::_1));
850   xbt_replay_action_register("Irecv",
851                              std::bind(&simgrid::smpi::Replay::RecvAction::execute, irecv, std::placeholders::_1));
852   xbt_replay_action_register("test", simgrid::smpi::action_test);
853   xbt_replay_action_register("wait",
854                              std::bind(&simgrid::smpi::Replay::WaitAction::execute, wait, std::placeholders::_1));
855   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
856   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
857   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
858   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
859   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
860   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
861   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
862   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
863   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
864   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
865   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
866   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
867   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
868   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
869   xbt_replay_action_register("compute", simgrid::smpi::action_compute);
870
871   //if we have a delayed start, sleep here.
872   if(*argc>2){
873     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
874     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
875     smpi_execute_flops(value);
876   } else {
877     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
878     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
879     smpi_execute_flops(0.0);
880   }
881 }
882
883 /** @brief actually run the replay after initialization */
884 void smpi_replay_main(int* argc, char*** argv)
885 {
886   simgrid::xbt::replay_runner(*argc, *argv);
887
888   /* and now, finalize everything */
889   /* One active process will stop. Decrease the counter*/
890   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
891   if (not get_reqq_self()->empty()) {
892     unsigned int count_requests=get_reqq_self()->size();
893     MPI_Request requests[count_requests];
894     MPI_Status status[count_requests];
895     unsigned int i=0;
896
897     for (auto const& req : *get_reqq_self()) {
898       requests[i] = req;
899       i++;
900     }
901     simgrid::smpi::Request::waitall(count_requests, requests, status);
902   }
903   delete get_reqq_self();
904   active_processes--;
905
906   if(active_processes==0){
907     /* Last process alive speaking: end the simulated timer */
908     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
909     smpi_free_replay_tmp_buffers();
910   }
911
912   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
913
914   smpi_process()->finalize();
915
916   TRACE_smpi_comm_out(Actor::self()->getPid());
917   TRACE_smpi_finalize(Actor::self()->getPid());
918 }
919
920 /** @brief chain a replay initialization and a replay start */
921 void smpi_replay_run(int* argc, char*** argv)
922 {
923   smpi_replay_init(argc, argv);
924   smpi_replay_main(argc, argv);
925 }