Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Classify the actions, start with Wait
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <boost/algorithm/string/join.hpp>
16 #include <memory>
17 #include <numeric>
18 #include <unordered_map>
19 #include <vector>
20
21 using simgrid::s4u::Actor;
22
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24
25 static int communicator_size = 0;
26 static int active_processes  = 0;
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28
29 static MPI_Datatype MPI_DEFAULT_TYPE;
30
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
32   {                                                                                                                    \
33     if (action.size() < static_cast<unsigned long>(mandatory + 2))                                                     \
34       THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
35                            "%lu items were given on the line. First two should be process_id and action.  "            \
36                            "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
37                            "Please contact the Simgrid team if support is needed",                                     \
38              __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory),                                       \
39              static_cast<unsigned long>(optional));                                                                    \
40   }
41
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44     std::string s = boost::algorithm::join(action, " ");
45     XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
46   }
47 }
48
49 static std::vector<MPI_Request>* get_reqq_self()
50 {
51   return reqq.at(Actor::self()->getPid());
52 }
53
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
55 {
56    reqq.insert({Actor::self()->getPid(), mpi_request});
57 }
58
59 /* Helper function */
60 static double parse_double(std::string string)
61 {
62   return xbt_str_parse_double(string.c_str(), "%s is not a double");
63 }
64
65 namespace simgrid {
66 namespace smpi {
67
68 namespace Replay {
69 class ActionArgParser {
70 public:
71   virtual void parse(simgrid::xbt::ReplayAction& action){};
72 };
73
74 template<class T> class ReplayAction {
75   protected:
76   const std::string name;
77   T args;
78
79   /*
80    * Used to compute the duration of this action.
81    */
82   double start_time;
83
84   int my_proc_id;
85
86 public:
87   explicit ReplayAction(std::string name)
88       : name(name), start_time(smpi_process()->simulated_elapsed()), my_proc_id(simgrid::s4u::Actor::self()->getPid())
89   {
90   }
91
92   virtual void execute(simgrid::xbt::ReplayAction& action)
93   {
94     args.parse(action);
95     kernel(action);
96     log_timed_action(action, start_time);
97   }
98
99   virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
100 };
101
102 class WaitAction : public ReplayAction<ActionArgParser> {
103 public:
104   WaitAction() : ReplayAction("Wait") {}
105   void kernel(simgrid::xbt::ReplayAction& action) override
106   {
107     CHECK_ACTION_PARAMS(action, 0, 0)
108     MPI_Status status;
109
110     std::string s = boost::algorithm::join(action, " ");
111     xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
112     MPI_Request request = get_reqq_self()->back();
113     get_reqq_self()->pop_back();
114
115     if (request == nullptr) {
116       /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
117        * return.*/
118       return;
119     }
120
121     int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
122
123     MPI_Group group          = request->comm()->group();
124     int src_traced           = group->rank(request->src());
125     int dst_traced           = group->rank(request->dst());
126     bool is_wait_for_receive = (request->flags() & RECV);
127     // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
128     TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
129
130     Request::wait(&request, &status);
131
132     TRACE_smpi_comm_out(rank);
133     if (is_wait_for_receive)
134       TRACE_smpi_recv(src_traced, dst_traced, 0);
135   }
136 };
137
138 static void action_init(simgrid::xbt::ReplayAction& action)
139 {
140   XBT_DEBUG("Initialize the counters");
141   CHECK_ACTION_PARAMS(action, 0, 1)
142   if (action.size() > 2)
143     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
144   else
145     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
146
147   /* start a simulated timer */
148   smpi_process()->simulated_start();
149   /*initialize the number of active processes */
150   active_processes = smpi_process_count();
151
152   set_reqq_self(new std::vector<MPI_Request>);
153 }
154
155 static void action_finalize(simgrid::xbt::ReplayAction& action)
156 {
157   /* Nothing to do */
158 }
159
160 static void action_comm_size(simgrid::xbt::ReplayAction& action)
161 {
162   communicator_size = parse_double(action[2]);
163   log_timed_action (action, smpi_process()->simulated_elapsed());
164 }
165
166 static void action_comm_split(simgrid::xbt::ReplayAction& action)
167 {
168   log_timed_action (action, smpi_process()->simulated_elapsed());
169 }
170
171 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
172 {
173   log_timed_action (action, smpi_process()->simulated_elapsed());
174 }
175
176 static void action_compute(simgrid::xbt::ReplayAction& action)
177 {
178   CHECK_ACTION_PARAMS(action, 1, 0)
179   double clock = smpi_process()->simulated_elapsed();
180   double flops= parse_double(action[2]);
181   int my_proc_id = Actor::self()->getPid();
182
183   TRACE_smpi_computing_in(my_proc_id, flops);
184   smpi_execute_flops(flops);
185   TRACE_smpi_computing_out(my_proc_id);
186
187   log_timed_action (action, clock);
188 }
189
190 static void action_send(simgrid::xbt::ReplayAction& action)
191 {
192   CHECK_ACTION_PARAMS(action, 2, 1)
193   int to       = std::stoi(action[2]);
194   double size=parse_double(action[3]);
195   double clock = smpi_process()->simulated_elapsed();
196
197   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
198
199   int my_proc_id = Actor::self()->getPid();
200   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
201
202   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
203                      new simgrid::instr::Pt2PtTIData("send", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
204   if (not TRACE_smpi_view_internals())
205     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
206
207   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
208
209   TRACE_smpi_comm_out(my_proc_id);
210
211   log_timed_action(action, clock);
212 }
213
214 static void action_Isend(simgrid::xbt::ReplayAction& action)
215 {
216   CHECK_ACTION_PARAMS(action, 2, 1)
217   int to       = std::stoi(action[2]);
218   double size=parse_double(action[3]);
219   double clock = smpi_process()->simulated_elapsed();
220
221   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
222
223   int my_proc_id = Actor::self()->getPid();
224   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
225   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
226                      new simgrid::instr::Pt2PtTIData("Isend", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
227   if (not TRACE_smpi_view_internals())
228     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
229
230   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
231
232   TRACE_smpi_comm_out(my_proc_id);
233
234   get_reqq_self()->push_back(request);
235
236   log_timed_action (action, clock);
237 }
238
239 static void action_recv(simgrid::xbt::ReplayAction& action)
240 {
241   CHECK_ACTION_PARAMS(action, 2, 1)
242   int from     = std::stoi(action[2]);
243   double size=parse_double(action[3]);
244   double clock = smpi_process()->simulated_elapsed();
245   MPI_Status status;
246
247   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
248
249   int my_proc_id = Actor::self()->getPid();
250   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
251
252   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
253                      new simgrid::instr::Pt2PtTIData("recv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
254
255   //unknown size from the receiver point of view
256   if (size <= 0.0) {
257     Request::probe(from, 0, MPI_COMM_WORLD, &status);
258     size=status.count;
259   }
260
261   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
262
263   TRACE_smpi_comm_out(my_proc_id);
264   if (not TRACE_smpi_view_internals()) {
265     TRACE_smpi_recv(src_traced, my_proc_id, 0);
266   }
267
268   log_timed_action (action, clock);
269 }
270
271 static void action_Irecv(simgrid::xbt::ReplayAction& action)
272 {
273   CHECK_ACTION_PARAMS(action, 2, 1)
274   int from     = std::stoi(action[2]);
275   double size=parse_double(action[3]);
276   double clock = smpi_process()->simulated_elapsed();
277
278   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
279
280   int my_proc_id = Actor::self()->getPid();
281   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
282                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
283   MPI_Status status;
284   //unknow size from the receiver pov
285   if (size <= 0.0) {
286     Request::probe(from, 0, MPI_COMM_WORLD, &status);
287     size = status.count;
288   }
289
290   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
291
292   TRACE_smpi_comm_out(my_proc_id);
293   get_reqq_self()->push_back(request);
294
295   log_timed_action (action, clock);
296 }
297
298 static void action_test(simgrid::xbt::ReplayAction& action)
299 {
300   CHECK_ACTION_PARAMS(action, 0, 0)
301   double clock = smpi_process()->simulated_elapsed();
302   MPI_Status status;
303
304   MPI_Request request = get_reqq_self()->back();
305   get_reqq_self()->pop_back();
306   //if request is null here, this may mean that a previous test has succeeded
307   //Different times in traced application and replayed version may lead to this
308   //In this case, ignore the extra calls.
309   if(request!=nullptr){
310     int my_proc_id = Actor::self()->getPid();
311     TRACE_smpi_testing_in(my_proc_id);
312
313     int flag = Request::test(&request, &status);
314
315     XBT_DEBUG("MPI_Test result: %d", flag);
316     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
317     get_reqq_self()->push_back(request);
318
319     TRACE_smpi_testing_out(my_proc_id);
320   }
321   log_timed_action (action, clock);
322 }
323
324 static void action_wait(simgrid::xbt::ReplayAction& action)
325 {
326   Replay::WaitAction().execute(action);
327 }
328
329 static void action_waitall(simgrid::xbt::ReplayAction& action)
330 {
331   CHECK_ACTION_PARAMS(action, 0, 0)
332   double clock = smpi_process()->simulated_elapsed();
333   const unsigned int count_requests = get_reqq_self()->size();
334
335   if (count_requests>0) {
336     MPI_Status status[count_requests];
337
338     int my_proc_id_traced = Actor::self()->getPid();
339     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
340                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
341     int recvs_snd[count_requests];
342     int recvs_rcv[count_requests];
343     for (unsigned int i = 0; i < count_requests; i++) {
344       const auto& req = (*get_reqq_self())[i];
345       if (req && (req->flags() & RECV)) {
346         recvs_snd[i] = req->src();
347         recvs_rcv[i] = req->dst();
348       } else
349         recvs_snd[i] = -100;
350    }
351    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
352
353    for (unsigned i = 0; i < count_requests; i++) {
354      if (recvs_snd[i]!=-100)
355        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
356    }
357    TRACE_smpi_comm_out(my_proc_id_traced);
358   }
359   log_timed_action (action, clock);
360 }
361
362 static void action_barrier(simgrid::xbt::ReplayAction& action)
363 {
364   double clock = smpi_process()->simulated_elapsed();
365   int my_proc_id = Actor::self()->getPid();
366   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
367
368   Colls::barrier(MPI_COMM_WORLD);
369
370   TRACE_smpi_comm_out(my_proc_id);
371   log_timed_action (action, clock);
372 }
373
374 static void action_bcast(simgrid::xbt::ReplayAction& action)
375 {
376   CHECK_ACTION_PARAMS(action, 1, 2)
377   double size = parse_double(action[2]);
378   double clock = smpi_process()->simulated_elapsed();
379   int root     = (action.size() > 3) ? std::stoi(action[3]) : 0;
380   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
381   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
382
383   int my_proc_id = Actor::self()->getPid();
384   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
385                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
386                                                     -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
387
388   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
389
390   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
391
392   TRACE_smpi_comm_out(my_proc_id);
393   log_timed_action (action, clock);
394 }
395
396 static void action_reduce(simgrid::xbt::ReplayAction& action)
397 {
398   CHECK_ACTION_PARAMS(action, 2, 2)
399   double comm_size = parse_double(action[2]);
400   double comp_size = parse_double(action[3]);
401   double clock = smpi_process()->simulated_elapsed();
402   int root         = (action.size() > 4) ? std::stoi(action[4]) : 0;
403
404   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
405
406   int my_proc_id = Actor::self()->getPid();
407   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
408                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
409                                                     comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
410
411   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
412   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
413   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
414   smpi_execute_flops(comp_size);
415
416   TRACE_smpi_comm_out(my_proc_id);
417   log_timed_action (action, clock);
418 }
419
420 static void action_allReduce(simgrid::xbt::ReplayAction& action)
421 {
422   CHECK_ACTION_PARAMS(action, 2, 1)
423   double comm_size = parse_double(action[2]);
424   double comp_size = parse_double(action[3]);
425
426   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
427
428   double clock = smpi_process()->simulated_elapsed();
429   int my_proc_id = Actor::self()->getPid();
430   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
431                                                                               Datatype::encode(MPI_CURRENT_TYPE), ""));
432
433   void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
434   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
435   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
436   smpi_execute_flops(comp_size);
437
438   TRACE_smpi_comm_out(my_proc_id);
439   log_timed_action (action, clock);
440 }
441
442 static void action_allToAll(simgrid::xbt::ReplayAction& action)
443 {
444   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
445   double clock = smpi_process()->simulated_elapsed();
446   unsigned long comm_size = MPI_COMM_WORLD->size();
447   int send_size = parse_double(action[2]);
448   int recv_size = parse_double(action[3]);
449   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
450   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
451
452   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
453   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
454
455   int my_proc_id = Actor::self()->getPid();
456   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
457                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
458                                                     Datatype::encode(MPI_CURRENT_TYPE),
459                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
460
461   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
462
463   TRACE_smpi_comm_out(my_proc_id);
464   log_timed_action (action, clock);
465 }
466
467 static void action_gather(simgrid::xbt::ReplayAction& action)
468 {
469   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
470         0 gather 68 68 0 0 0
471       where:
472         1) 68 is the sendcounts
473         2) 68 is the recvcounts
474         3) 0 is the root node
475         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
476         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
477   */
478   CHECK_ACTION_PARAMS(action, 2, 3)
479   double clock = smpi_process()->simulated_elapsed();
480   unsigned long comm_size = MPI_COMM_WORLD->size();
481   int send_size = parse_double(action[2]);
482   int recv_size = parse_double(action[3]);
483   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
484   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
485
486   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
487   void *recv = nullptr;
488   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
489   int rank = MPI_COMM_WORLD->rank();
490
491   if(rank==root)
492     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
493
494   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
495                                                                         Datatype::encode(MPI_CURRENT_TYPE),
496                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
497
498   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
499
500   TRACE_smpi_comm_out(Actor::self()->getPid());
501   log_timed_action (action, clock);
502 }
503
504 static void action_scatter(simgrid::xbt::ReplayAction& action)
505 {
506   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
507         0 gather 68 68 0 0 0
508       where:
509         1) 68 is the sendcounts
510         2) 68 is the recvcounts
511         3) 0 is the root node
512         4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
513         5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
514   */
515   CHECK_ACTION_PARAMS(action, 2, 3)
516   double clock                   = smpi_process()->simulated_elapsed();
517   unsigned long comm_size        = MPI_COMM_WORLD->size();
518   int send_size                  = parse_double(action[2]);
519   int recv_size                  = parse_double(action[3]);
520   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
521   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
522
523   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
524   void* recv = nullptr;
525   int root   = (action.size() > 4) ? std::stoi(action[4]) : 0;
526   int rank = MPI_COMM_WORLD->rank();
527
528   if (rank == root)
529     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
530
531   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
532                                                                         Datatype::encode(MPI_CURRENT_TYPE),
533                                                                         Datatype::encode(MPI_CURRENT_TYPE2)));
534
535   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
536
537   TRACE_smpi_comm_out(Actor::self()->getPid());
538   log_timed_action(action, clock);
539 }
540
541 static void action_gatherv(simgrid::xbt::ReplayAction& action)
542 {
543   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
544        0 gather 68 68 10 10 10 0 0 0
545      where:
546        1) 68 is the sendcount
547        2) 68 10 10 10 is the recvcounts
548        3) 0 is the root node
549        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
550        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
551   */
552   double clock = smpi_process()->simulated_elapsed();
553   unsigned long comm_size = MPI_COMM_WORLD->size();
554   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
555   int send_size = parse_double(action[2]);
556   std::vector<int> disps(comm_size, 0);
557   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
558
559   MPI_Datatype MPI_CURRENT_TYPE =
560       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
561   MPI_Datatype MPI_CURRENT_TYPE2{
562       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
563
564   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
565   void *recv = nullptr;
566   for (unsigned int i = 0; i < comm_size; i++) {
567     (*recvcounts)[i] = std::stoi(action[i + 3]);
568   }
569   int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
570
571   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
572   int rank = MPI_COMM_WORLD->rank();
573
574   if(rank==root)
575     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
576
577   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
578                                              "gatherV", root, send_size, nullptr, -1, recvcounts,
579                                              Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
580
581   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
582                  MPI_COMM_WORLD);
583
584   TRACE_smpi_comm_out(Actor::self()->getPid());
585   log_timed_action (action, clock);
586 }
587
588 static void action_scatterv(simgrid::xbt::ReplayAction& action)
589 {
590   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
591        0 gather 68 10 10 10 68 0 0 0
592      where:
593        1) 68 10 10 10 is the sendcounts
594        2) 68 is the recvcount
595        3) 0 is the root node
596        4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
597        5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
598   */
599   double clock  = smpi_process()->simulated_elapsed();
600   unsigned long comm_size = MPI_COMM_WORLD->size();
601   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
602   int recv_size = parse_double(action[2 + comm_size]);
603   std::vector<int> disps(comm_size, 0);
604   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
605
606   MPI_Datatype MPI_CURRENT_TYPE =
607       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
608   MPI_Datatype MPI_CURRENT_TYPE2{
609       (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
610
611   void* send = nullptr;
612   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
613   for (unsigned int i = 0; i < comm_size; i++) {
614     (*sendcounts)[i] = std::stoi(action[i + 2]);
615   }
616   int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
617
618   int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
619   int rank = MPI_COMM_WORLD->rank();
620
621   if (rank == root)
622     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
623
624   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size,
625                                                                            nullptr, Datatype::encode(MPI_CURRENT_TYPE),
626                                                                            Datatype::encode(MPI_CURRENT_TYPE2)));
627
628   Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
629                   MPI_COMM_WORLD);
630
631   TRACE_smpi_comm_out(Actor::self()->getPid());
632   log_timed_action(action, clock);
633 }
634
635 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
636 {
637   /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
638        0 reduceScatter 275427 275427 275427 204020 11346849 0
639      where:
640        1) The first four values after the name of the action declare the recvcounts array
641        2) The value 11346849 is the amount of instructions
642        3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
643  */
644   double clock = smpi_process()->simulated_elapsed();
645   unsigned long comm_size = MPI_COMM_WORLD->size();
646   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
647   int comp_size = parse_double(action[2+comm_size]);
648   int my_proc_id                     = Actor::self()->getPid();
649   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
650   MPI_Datatype MPI_CURRENT_TYPE =
651       (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
652
653   for (unsigned int i = 0; i < comm_size; i++) {
654     recvcounts->push_back(std::stoi(action[i + 2]));
655   }
656   int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
657
658   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
659                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
660                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
661                                                        Datatype::encode(MPI_CURRENT_TYPE)));
662
663   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
664   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
665
666   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
667   smpi_execute_flops(comp_size);
668
669   TRACE_smpi_comm_out(my_proc_id);
670   log_timed_action (action, clock);
671 }
672
673 static void action_allgather(simgrid::xbt::ReplayAction& action)
674 {
675   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
676         0 allGather 275427 275427
677     where:
678         1) 275427 is the sendcount
679         2) 275427 is the recvcount
680         3) No more values mean that the datatype for sent and receive buffer is the default one, see
681     simgrid::smpi::Datatype::decode().
682   */
683   double clock = smpi_process()->simulated_elapsed();
684
685   CHECK_ACTION_PARAMS(action, 2, 2)
686   int sendcount = std::stoi(action[2]);
687   int recvcount = std::stoi(action[3]);
688
689   MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
690   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
691
692   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
693   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
694
695   int my_proc_id = Actor::self()->getPid();
696
697   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
698                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
699                                                     Datatype::encode(MPI_CURRENT_TYPE),
700                                                     Datatype::encode(MPI_CURRENT_TYPE2)));
701
702   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
703
704   TRACE_smpi_comm_out(my_proc_id);
705   log_timed_action (action, clock);
706 }
707
708 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
709 {
710   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
711         0 allGatherV 275427 275427 275427 275427 204020
712      where:
713         1) 275427 is the sendcount
714         2) The next four elements declare the recvcounts array
715         3) No more values mean that the datatype for sent and receive buffer is the default one, see
716      simgrid::smpi::Datatype::decode().
717   */
718   double clock = smpi_process()->simulated_elapsed();
719
720   unsigned long comm_size = MPI_COMM_WORLD->size();
721   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
722   int sendcount = std::stoi(action[2]);
723   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
724   std::vector<int> disps(comm_size, 0);
725
726   int datatype_index = 0, disp_index = 0;
727   if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
728     datatype_index = 3 + comm_size;
729     disp_index     = datatype_index + 1;
730   } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
731     datatype_index = -1;
732     disp_index     = 3 + comm_size;
733   } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
734     datatype_index = 3 + comm_size;
735   }
736
737   if (disp_index != 0) {
738     for (unsigned int i = 0; i < comm_size; i++)
739       disps[i]          = std::stoi(action[disp_index + i]);
740   }
741
742   MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
743                                                      : MPI_DEFAULT_TYPE};
744   MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
745                                                       : MPI_DEFAULT_TYPE};
746
747   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
748
749   for (unsigned int i = 0; i < comm_size; i++) {
750     (*recvcounts)[i] = std::stoi(action[i + 3]);
751   }
752   int recv_sum  = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
753   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
754
755   int my_proc_id = Actor::self()->getPid();
756
757   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
758                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
759                                                        Datatype::encode(MPI_CURRENT_TYPE),
760                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
761
762   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
763                     MPI_COMM_WORLD);
764
765   TRACE_smpi_comm_out(my_proc_id);
766   log_timed_action (action, clock);
767 }
768
769 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
770 {
771   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
772         0 allToAllV 100 1 7 10 12 100 1 70 10 5
773      where:
774         1) 100 is the size of the send buffer *sizeof(int),
775         2) 1 7 10 12 is the sendcounts array
776         3) 100*sizeof(int) is the size of the receiver buffer
777         4)  1 70 10 5 is the recvcounts array
778   */
779   double clock = smpi_process()->simulated_elapsed();
780
781   unsigned long comm_size = MPI_COMM_WORLD->size();
782   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
783   std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
784   std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
785   std::vector<int> senddisps(comm_size, 0);
786   std::vector<int> recvdisps(comm_size, 0);
787
788   MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
789                                       ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
790                                       : MPI_DEFAULT_TYPE;
791   MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
792                                      ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
793                                      : MPI_DEFAULT_TYPE};
794
795   int send_buf_size=parse_double(action[2]);
796   int recv_buf_size=parse_double(action[3+comm_size]);
797   int my_proc_id = Actor::self()->getPid();
798   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
799   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
800
801   for (unsigned int i = 0; i < comm_size; i++) {
802     (*sendcounts)[i] = std::stoi(action[3 + i]);
803     (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
804   }
805   int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
806   int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
807
808   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
809                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
810                                                        Datatype::encode(MPI_CURRENT_TYPE),
811                                                        Datatype::encode(MPI_CURRENT_TYPE2)));
812
813   Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
814                    recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
815
816   TRACE_smpi_comm_out(my_proc_id);
817   log_timed_action (action, clock);
818 }
819
820 }} // namespace simgrid::smpi
821
822 /** @brief Only initialize the replay, don't do it for real */
823 void smpi_replay_init(int* argc, char*** argv)
824 {
825   simgrid::smpi::Process::init(argc, argv);
826   smpi_process()->mark_as_initialized();
827   smpi_process()->set_replaying(true);
828
829   int my_proc_id = Actor::self()->getPid();
830   TRACE_smpi_init(my_proc_id);
831   TRACE_smpi_computing_init(my_proc_id);
832   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
833   TRACE_smpi_comm_out(my_proc_id);
834   xbt_replay_action_register("init",       simgrid::smpi::action_init);
835   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
836   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
837   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
838   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
839   xbt_replay_action_register("send",       simgrid::smpi::action_send);
840   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
841   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
842   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
843   xbt_replay_action_register("test",       simgrid::smpi::action_test);
844   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
845   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
846   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
847   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
848   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
849   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
850   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
851   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
852   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
853   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
854   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
855   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
856   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
857   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
858   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
859   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
860
861   //if we have a delayed start, sleep here.
862   if(*argc>2){
863     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
864     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
865     smpi_execute_flops(value);
866   } else {
867     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
868     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
869     smpi_execute_flops(0.0);
870   }
871 }
872
873 /** @brief actually run the replay after initialization */
874 void smpi_replay_main(int* argc, char*** argv)
875 {
876   simgrid::xbt::replay_runner(*argc, *argv);
877
878   /* and now, finalize everything */
879   /* One active process will stop. Decrease the counter*/
880   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
881   if (not get_reqq_self()->empty()) {
882     unsigned int count_requests=get_reqq_self()->size();
883     MPI_Request requests[count_requests];
884     MPI_Status status[count_requests];
885     unsigned int i=0;
886
887     for (auto const& req : *get_reqq_self()) {
888       requests[i] = req;
889       i++;
890     }
891     simgrid::smpi::Request::waitall(count_requests, requests, status);
892   }
893   delete get_reqq_self();
894   active_processes--;
895
896   if(active_processes==0){
897     /* Last process alive speaking: end the simulated timer */
898     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
899     smpi_free_replay_tmp_buffers();
900   }
901
902   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
903
904   smpi_process()->finalize();
905
906   TRACE_smpi_comm_out(Actor::self()->getPid());
907   TRACE_smpi_finalize(Actor::self()->getPid());
908 }
909
910 /** @brief chain a replay initialization and a replay start */
911 void smpi_replay_run(int* argc, char*** argv)
912 {
913   smpi_replay_init(argc, argv);
914   smpi_replay_main(argc, argv);
915 }