Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SMPI] Replay: Remove side effect from decode_datatype
[simgrid.git] / src / smpi / internals / smpi_replay.cpp
1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "private.hpp"
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
14
15 #include <unordered_map>
16 #include <vector>
17
18 using simgrid::s4u::Actor;
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
21
22 static int communicator_size = 0;
23 static int active_processes  = 0;
24 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
25
26 static MPI_Datatype MPI_DEFAULT_TYPE;
27 static MPI_Datatype MPI_CURRENT_TYPE;
28
29 static int sendbuffer_size = 0;
30 static char* sendbuffer    = nullptr;
31 static int recvbuffer_size = 0;
32 static char* recvbuffer    = nullptr;
33
34 static void log_timed_action (const char *const *action, double clock){
35   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36     char *name = xbt_str_join_array(action, " ");
37     XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
38     xbt_free(name);
39   }
40 }
41
42 static std::vector<MPI_Request>* get_reqq_self()
43 {
44   return reqq.at(Actor::self()->getPid());
45 }
46
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
48 {
49    reqq.insert({Actor::self()->getPid(), mpi_request});
50 }
51
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
54 {
55   if (not smpi_process()->replaying())
56     return xbt_malloc(size);
57   if (sendbuffer_size<size){
58     sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
59     sendbuffer_size=size;
60   }
61   return sendbuffer;
62 }
63
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66   if (not smpi_process()->replaying())
67     return xbt_malloc(size);
68   if (recvbuffer_size<size){
69     recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
70     recvbuffer_size=size;
71   }
72   return recvbuffer;
73 }
74
75 void smpi_free_tmp_buffer(void* buf){
76   if (not smpi_process()->replaying())
77     xbt_free(buf);
78 }
79
80 /* Helper function */
81 static double parse_double(const char *string)
82 {
83   char *endptr;
84   double value = strtod(string, &endptr);
85   if (*endptr != '\0')
86     THROWF(unknown_error, 0, "%s is not a double", string);
87   return value;
88 }
89
90
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
92 static MPI_Datatype decode_datatype(const char *const action)
93 {
94   switch(atoi(action)) {
95     case 0:
96       return MPI_DOUBLE;
97       break;
98     case 1:
99       return MPI_INT;
100       break;
101     case 2:
102       return MPI_CHAR;
103       break;
104     case 3:
105       return MPI_SHORT;
106       break;
107     case 4:
108       return MPI_LONG;
109       break;
110     case 5:
111       return MPI_FLOAT;
112       break;
113     case 6:
114       return MPI_BYTE;
115       break;
116     default:
117       return MPI_DEFAULT_TYPE;
118       break;
119   }
120 }
121
122 const char* encode_datatype(MPI_Datatype datatype)
123 {
124   if (datatype==MPI_BYTE)
125       return "";
126   if(datatype==MPI_DOUBLE)
127       return "0";
128   if(datatype==MPI_INT)
129       return "1";
130   if(datatype==MPI_CHAR)
131       return "2";
132   if(datatype==MPI_SHORT)
133       return "3";
134   if(datatype==MPI_LONG)
135     return "4";
136   if(datatype==MPI_FLOAT)
137       return "5";
138   // default - not implemented.
139   // do not warn here as we pass in this function even for other trace formats
140   return "-1";
141 }
142
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144     int i=0;\
145     while(action[i]!=nullptr)\
146      i++;\
147     if(i<mandatory+2)                                           \
148     THROWF(arg_error, 0, "%s replay failed.\n" \
149           "%d items were given on the line. First two should be process_id and action.  " \
150           "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151           "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
152   }
153
154 namespace simgrid {
155 namespace smpi {
156
157 static void action_init(const char *const *action)
158 {
159   XBT_DEBUG("Initialize the counters");
160   CHECK_ACTION_PARAMS(action, 0, 1)
161   if(action[2])
162     MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
163   else
164     MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
165
166   /* start a simulated timer */
167   smpi_process()->simulated_start();
168   /*initialize the number of active processes */
169   active_processes = smpi_process_count();
170
171   set_reqq_self(new std::vector<MPI_Request>);
172 }
173
174 static void action_finalize(const char *const *action)
175 {
176   /* Nothing to do */
177 }
178
179 static void action_comm_size(const char *const *action)
180 {
181   communicator_size = parse_double(action[2]);
182   log_timed_action (action, smpi_process()->simulated_elapsed());
183 }
184
185 static void action_comm_split(const char *const *action)
186 {
187   log_timed_action (action, smpi_process()->simulated_elapsed());
188 }
189
190 static void action_comm_dup(const char *const *action)
191 {
192   log_timed_action (action, smpi_process()->simulated_elapsed());
193 }
194
195 static void action_compute(const char *const *action)
196 {
197   CHECK_ACTION_PARAMS(action, 1, 0)
198   double clock = smpi_process()->simulated_elapsed();
199   double flops= parse_double(action[2]);
200   int my_proc_id = Actor::self()->getPid();
201
202   TRACE_smpi_computing_in(my_proc_id, flops);
203   smpi_execute_flops(flops);
204   TRACE_smpi_computing_out(my_proc_id);
205
206   log_timed_action (action, clock);
207 }
208
209 static void action_send(const char *const *action)
210 {
211   CHECK_ACTION_PARAMS(action, 2, 1)
212   int to = atoi(action[2]);
213   double size=parse_double(action[3]);
214   double clock = smpi_process()->simulated_elapsed();
215
216   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
217
218   int my_proc_id = Actor::self()->getPid();
219   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
220
221   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
222                      new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
223   if (not TRACE_smpi_view_internals())
224     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
225
226   Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
227
228   TRACE_smpi_comm_out(my_proc_id);
229
230   log_timed_action(action, clock);
231 }
232
233 static void action_Isend(const char *const *action)
234 {
235   CHECK_ACTION_PARAMS(action, 2, 1)
236   int to = atoi(action[2]);
237   double size=parse_double(action[3]);
238   double clock = smpi_process()->simulated_elapsed();
239
240   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
241
242   int my_proc_id = Actor::self()->getPid();
243   int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
244   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
245                      new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
246   if (not TRACE_smpi_view_internals())
247     TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
248
249   MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
250
251   TRACE_smpi_comm_out(my_proc_id);
252
253   get_reqq_self()->push_back(request);
254
255   log_timed_action (action, clock);
256 }
257
258 static void action_recv(const char *const *action) {
259   CHECK_ACTION_PARAMS(action, 2, 1)
260   int from = atoi(action[2]);
261   double size=parse_double(action[3]);
262   double clock = smpi_process()->simulated_elapsed();
263   MPI_Status status;
264
265   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
266
267   int my_proc_id = Actor::self()->getPid();
268   int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
269
270   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
271                      new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
272
273   //unknown size from the receiver point of view
274   if (size <= 0.0) {
275     Request::probe(from, 0, MPI_COMM_WORLD, &status);
276     size=status.count;
277   }
278
279   Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
280
281   TRACE_smpi_comm_out(my_proc_id);
282   if (not TRACE_smpi_view_internals()) {
283     TRACE_smpi_recv(src_traced, my_proc_id, 0);
284   }
285
286   log_timed_action (action, clock);
287 }
288
289 static void action_Irecv(const char *const *action)
290 {
291   CHECK_ACTION_PARAMS(action, 2, 1)
292   int from = atoi(action[2]);
293   double size=parse_double(action[3]);
294   double clock = smpi_process()->simulated_elapsed();
295
296   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
297
298   int my_proc_id = Actor::self()->getPid();
299   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
300                      new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
301   MPI_Status status;
302   //unknow size from the receiver pov
303   if (size <= 0.0) {
304     Request::probe(from, 0, MPI_COMM_WORLD, &status);
305     size = status.count;
306   }
307
308   MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
309
310   TRACE_smpi_comm_out(my_proc_id);
311   get_reqq_self()->push_back(request);
312
313   log_timed_action (action, clock);
314 }
315
316 static void action_test(const char* const* action)
317 {
318   CHECK_ACTION_PARAMS(action, 0, 0)
319   double clock = smpi_process()->simulated_elapsed();
320   MPI_Status status;
321
322   MPI_Request request = get_reqq_self()->back();
323   get_reqq_self()->pop_back();
324   //if request is null here, this may mean that a previous test has succeeded
325   //Different times in traced application and replayed version may lead to this
326   //In this case, ignore the extra calls.
327   if(request!=nullptr){
328     int my_proc_id = Actor::self()->getPid();
329     TRACE_smpi_testing_in(my_proc_id);
330
331     int flag = Request::test(&request, &status);
332
333     XBT_DEBUG("MPI_Test result: %d", flag);
334     /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
335     get_reqq_self()->push_back(request);
336
337     TRACE_smpi_testing_out(my_proc_id);
338   }
339   log_timed_action (action, clock);
340 }
341
342 static void action_wait(const char *const *action){
343   CHECK_ACTION_PARAMS(action, 0, 0)
344   double clock = smpi_process()->simulated_elapsed();
345   MPI_Status status;
346
347   xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
348       xbt_str_join_array(action," "));
349   MPI_Request request = get_reqq_self()->back();
350   get_reqq_self()->pop_back();
351
352   if (request==nullptr){
353     /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
354     return;
355   }
356
357   int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
358
359   MPI_Group group = request->comm()->group();
360   int src_traced = group->rank(request->src());
361   int dst_traced = group->rank(request->dst());
362   int is_wait_for_receive = (request->flags() & RECV);
363   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
364
365   Request::wait(&request, &status);
366
367   TRACE_smpi_comm_out(rank);
368   if (is_wait_for_receive)
369     TRACE_smpi_recv(src_traced, dst_traced, 0);
370   log_timed_action (action, clock);
371 }
372
373 static void action_waitall(const char *const *action){
374   CHECK_ACTION_PARAMS(action, 0, 0)
375   double clock = smpi_process()->simulated_elapsed();
376   const unsigned int count_requests = get_reqq_self()->size();
377
378   if (count_requests>0) {
379     MPI_Status status[count_requests];
380
381     int my_proc_id_traced = Actor::self()->getPid();
382     TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
383                        new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
384     int recvs_snd[count_requests];
385     int recvs_rcv[count_requests];
386     for (unsigned int i = 0; i < count_requests; i++) {
387       const auto& req = (*get_reqq_self())[i];
388       if (req && (req->flags() & RECV)) {
389         recvs_snd[i] = req->src();
390         recvs_rcv[i] = req->dst();
391       } else
392         recvs_snd[i] = -100;
393    }
394    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
395
396    for (unsigned i = 0; i < count_requests; i++) {
397      if (recvs_snd[i]!=-100)
398        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
399    }
400    TRACE_smpi_comm_out(my_proc_id_traced);
401   }
402   log_timed_action (action, clock);
403 }
404
405 static void action_barrier(const char *const *action){
406   double clock = smpi_process()->simulated_elapsed();
407   int my_proc_id = Actor::self()->getPid();
408   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
409
410   Colls::barrier(MPI_COMM_WORLD);
411
412   TRACE_smpi_comm_out(my_proc_id);
413   log_timed_action (action, clock);
414 }
415
416 static void action_bcast(const char *const *action)
417 {
418   CHECK_ACTION_PARAMS(action, 1, 2)
419   double size = parse_double(action[2]);
420   double clock = smpi_process()->simulated_elapsed();
421   int root     = (action[3]) ? atoi(action[3]) : 0;
422   /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
423   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
424
425   MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
426
427   int my_proc_id = Actor::self()->getPid();
428   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429                      new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
430                                                     -1, encode_datatype(MPI_CURRENT_TYPE), ""));
431
432   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
433
434   Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
435
436   TRACE_smpi_comm_out(my_proc_id);
437   log_timed_action (action, clock);
438 }
439
440 static void action_reduce(const char *const *action)
441 {
442   CHECK_ACTION_PARAMS(action, 2, 2)
443   double comm_size = parse_double(action[2]);
444   double comp_size = parse_double(action[3]);
445   double clock = smpi_process()->simulated_elapsed();
446   int root         = (action[4]) ? atoi(action[4]) : 0;
447
448   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
449
450   int my_proc_id = Actor::self()->getPid();
451   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
452                      new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
453                                                     comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
454
455   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457   Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
458   smpi_execute_flops(comp_size);
459
460   TRACE_smpi_comm_out(my_proc_id);
461   log_timed_action (action, clock);
462 }
463
464 static void action_allReduce(const char *const *action) {
465   CHECK_ACTION_PARAMS(action, 2, 1)
466   double comm_size = parse_double(action[2]);
467   double comp_size = parse_double(action[3]);
468
469   MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
470
471   double clock = smpi_process()->simulated_elapsed();
472   int my_proc_id = Actor::self()->getPid();
473   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
474                                                                               encode_datatype(MPI_CURRENT_TYPE), ""));
475
476   void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477   void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478   Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
479   smpi_execute_flops(comp_size);
480
481   TRACE_smpi_comm_out(my_proc_id);
482   log_timed_action (action, clock);
483 }
484
485 static void action_allToAll(const char *const *action) {
486   CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
487   double clock = smpi_process()->simulated_elapsed();
488   int comm_size = MPI_COMM_WORLD->size();
489   int send_size = parse_double(action[2]);
490   int recv_size = parse_double(action[3]);
491   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
492   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
493
494   void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
495   void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
496
497   int my_proc_id = Actor::self()->getPid();
498   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
499                      new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
500                                                     encode_datatype(MPI_CURRENT_TYPE),
501                                                     encode_datatype(MPI_CURRENT_TYPE2)));
502
503   Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
504
505   TRACE_smpi_comm_out(my_proc_id);
506   log_timed_action (action, clock);
507 }
508
509 static void action_gather(const char *const *action) {
510   /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
511         0 gather 68 68 0 0 0
512       where:
513         1) 68 is the sendcounts
514         2) 68 is the recvcounts
515         3) 0 is the root node
516         4) 0 is the send datatype id, see decode_datatype()
517         5) 0 is the recv datatype id, see decode_datatype()
518   */
519   CHECK_ACTION_PARAMS(action, 2, 3)
520   double clock = smpi_process()->simulated_elapsed();
521   int comm_size = MPI_COMM_WORLD->size();
522   int send_size = parse_double(action[2]);
523   int recv_size = parse_double(action[3]);
524   MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
525   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
526
527   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528   void *recv = nullptr;
529   int root   = (action[4]) ? atoi(action[4]) : 0;
530   int rank = MPI_COMM_WORLD->rank();
531
532   if(rank==root)
533     recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
534
535   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
536                                                                         encode_datatype(MPI_CURRENT_TYPE),
537                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
538
539   Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
540
541   TRACE_smpi_comm_out(Actor::self()->getPid());
542   log_timed_action (action, clock);
543 }
544
545 static void action_scatter(const char* const* action)
546 {
547   /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
548         0 gather 68 68 0 0 0
549       where:
550         1) 68 is the sendcounts
551         2) 68 is the recvcounts
552         3) 0 is the root node
553         4) 0 is the send datatype id, see decode_datatype()
554         5) 0 is the recv datatype id, see decode_datatype()
555   */
556   CHECK_ACTION_PARAMS(action, 2, 3)
557   double clock                   = smpi_process()->simulated_elapsed();
558   int comm_size                  = MPI_COMM_WORLD->size();
559   int send_size                  = parse_double(action[2]);
560   int recv_size                  = parse_double(action[3]);
561   MPI_CURRENT_TYPE               = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562   MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
563
564   void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565   void* recv = nullptr;
566   int root   = (action[4]) ? atoi(action[4]) : 0;
567   int rank = MPI_COMM_WORLD->rank();
568
569   if (rank == root)
570     recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
571
572   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573                                                                         encode_datatype(MPI_CURRENT_TYPE),
574                                                                         encode_datatype(MPI_CURRENT_TYPE2)));
575
576   Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
577
578   TRACE_smpi_comm_out(Actor::self()->getPid());
579   log_timed_action(action, clock);
580 }
581
582 static void action_gatherv(const char *const *action) {
583   /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
584        0 gather 68 68 10 10 10 0 0 0
585      where:
586        1) 68 is the sendcount
587        2) 68 10 10 10 is the recvcounts
588        3) 0 is the root node
589        4) 0 is the send datatype id, see decode_datatype()
590        5) 0 is the recv datatype id, see decode_datatype()
591   */
592   double clock = smpi_process()->simulated_elapsed();
593   int comm_size = MPI_COMM_WORLD->size();
594   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
595   int send_size = parse_double(action[2]);
596   int disps[comm_size];
597   int recvcounts[comm_size];
598   int recv_sum=0;
599
600   MPI_CURRENT_TYPE =
601       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
602   MPI_Datatype MPI_CURRENT_TYPE2{
603       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
604
605   void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
606   void *recv = nullptr;
607   for(int i=0;i<comm_size;i++) {
608     recvcounts[i] = atoi(action[i+3]);
609     recv_sum=recv_sum+recvcounts[i];
610     disps[i]=0;
611   }
612
613   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
614   int rank = MPI_COMM_WORLD->rank();
615
616   if(rank==root)
617     recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
618
619   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
620
621   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
622                                              "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
623                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
624
625   Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
626
627   TRACE_smpi_comm_out(Actor::self()->getPid());
628   log_timed_action (action, clock);
629 }
630
631 static void action_scatterv(const char* const* action)
632 {
633   /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634        0 gather 68 10 10 10 68 0 0 0
635      where:
636        1) 68 10 10 10 is the sendcounts
637        2) 68 is the recvcount
638        3) 0 is the root node
639        4) 0 is the send datatype id, see decode_datatype()
640        5) 0 is the recv datatype id, see decode_datatype()
641   */
642   double clock  = smpi_process()->simulated_elapsed();
643   int comm_size = MPI_COMM_WORLD->size();
644   CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645   int recv_size = parse_double(action[2 + comm_size]);
646   int disps[comm_size];
647   int sendcounts[comm_size];
648   int send_sum = 0;
649
650   MPI_CURRENT_TYPE =
651       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
652   MPI_Datatype MPI_CURRENT_TYPE2{
653       (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
654
655   void* send = nullptr;
656   void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
657   for (int i = 0; i < comm_size; i++) {
658     sendcounts[i] = atoi(action[i + 2]);
659     send_sum += sendcounts[i];
660     disps[i] = 0;
661   }
662
663   int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
664   int rank = MPI_COMM_WORLD->rank();
665
666   if (rank == root)
667     send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
668
669   std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
670
671   TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
672                                              "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
673                                              encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
674
675   Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
676
677   TRACE_smpi_comm_out(Actor::self()->getPid());
678   log_timed_action(action, clock);
679 }
680
681 static void action_reducescatter(const char *const *action) {
682  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
683       0 reduceScatter 275427 275427 275427 204020 11346849 0
684     where:
685       1) The first four values after the name of the action declare the recvcounts array
686       2) The value 11346849 is the amount of instructions
687       3) The last value corresponds to the datatype, see decode_datatype().
688 */
689   double clock = smpi_process()->simulated_elapsed();
690   int comm_size = MPI_COMM_WORLD->size();
691   CHECK_ACTION_PARAMS(action, comm_size+1, 1)
692   int comp_size = parse_double(action[2+comm_size]);
693   int recvcounts[comm_size];
694   int my_proc_id                     = Actor::self()->getPid();
695   int size = 0;
696   std::vector<int>* trace_recvcounts = new std::vector<int>;
697   MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
698
699   for(int i=0;i<comm_size;i++) {
700     recvcounts[i] = atoi(action[i+2]);
701     trace_recvcounts->push_back(recvcounts[i]);
702     size+=recvcounts[i];
703   }
704
705   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
706                      new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
707                                                        std::to_string(comp_size), /* ugly hack to print comp_size */
708                                                        encode_datatype(MPI_CURRENT_TYPE)));
709
710   void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
711   void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
712
713   Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
714   smpi_execute_flops(comp_size);
715
716   TRACE_smpi_comm_out(my_proc_id);
717   log_timed_action (action, clock);
718 }
719
720 static void action_allgather(const char *const *action) {
721   /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
722         0 allGather 275427 275427
723     where:
724         1) 275427 is the sendcount
725         2) 275427 is the recvcount
726         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
727   */
728   double clock = smpi_process()->simulated_elapsed();
729
730   CHECK_ACTION_PARAMS(action, 2, 2)
731   int sendcount=atoi(action[2]);
732   int recvcount=atoi(action[3]);
733
734   MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
735   MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
736
737   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
738   void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
739
740   int my_proc_id = Actor::self()->getPid();
741
742   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
743                      new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
744                                                     encode_datatype(MPI_CURRENT_TYPE),
745                                                     encode_datatype(MPI_CURRENT_TYPE2)));
746
747   Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
748
749   TRACE_smpi_comm_out(my_proc_id);
750   log_timed_action (action, clock);
751 }
752
753 static void action_allgatherv(const char *const *action) {
754   /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
755         0 allGatherV 275427 275427 275427 275427 204020
756      where:
757         1) 275427 is the sendcount
758         2) The next four elements declare the recvcounts array
759         3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
760   */
761   double clock = smpi_process()->simulated_elapsed();
762
763   int comm_size = MPI_COMM_WORLD->size();
764   CHECK_ACTION_PARAMS(action, comm_size+1, 2)
765   int sendcount=atoi(action[2]);
766   int recvcounts[comm_size];
767   int disps[comm_size];
768   int recv_sum=0;
769
770   MPI_CURRENT_TYPE =
771       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
772   MPI_Datatype MPI_CURRENT_TYPE2{
773       (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
774
775   void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
776
777   for(int i=0;i<comm_size;i++) {
778     recvcounts[i] = atoi(action[i+3]);
779     recv_sum=recv_sum+recvcounts[i];
780     disps[i] = 0;
781   }
782   void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
783
784   int my_proc_id = Actor::self()->getPid();
785
786   std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
787
788   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
789                      new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
790                                                        encode_datatype(MPI_CURRENT_TYPE),
791                                                        encode_datatype(MPI_CURRENT_TYPE2)));
792
793   Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
794                           MPI_COMM_WORLD);
795
796   TRACE_smpi_comm_out(my_proc_id);
797   log_timed_action (action, clock);
798 }
799
800 static void action_allToAllv(const char *const *action) {
801   /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
802         0 allToAllV 100 1 7 10 12 100 1 70 10 5
803      where:
804         1) 100 is the size of the send buffer *sizeof(int),
805         2) 1 7 10 12 is the sendcounts array
806         3) 100*sizeof(int) is the size of the receiver buffer
807         4)  1 70 10 5 is the recvcounts array
808   */
809   double clock = smpi_process()->simulated_elapsed();
810
811   int comm_size = MPI_COMM_WORLD->size();
812   CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
813   int send_size = 0;
814   int recv_size = 0;
815   int sendcounts[comm_size];
816   std::vector<int>* trace_sendcounts = new std::vector<int>;
817   int recvcounts[comm_size];
818   std::vector<int>* trace_recvcounts = new std::vector<int>;
819   int senddisps[comm_size];
820   int recvdisps[comm_size];
821
822   MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
823                          ? decode_datatype(action[4 + 2 * comm_size])
824                          : MPI_DEFAULT_TYPE;
825   MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
826                                      ? decode_datatype(action[5 + 2 * comm_size])
827                                      : MPI_DEFAULT_TYPE};
828
829   int send_buf_size=parse_double(action[2]);
830   int recv_buf_size=parse_double(action[3+comm_size]);
831   int my_proc_id = Actor::self()->getPid();
832   void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
833   void *recvbuf  = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
834
835   for(int i=0;i<comm_size;i++) {
836     sendcounts[i] = atoi(action[i+3]);
837     trace_sendcounts->push_back(sendcounts[i]);
838     send_size += sendcounts[i];
839     recvcounts[i] = atoi(action[i+4+comm_size]);
840     trace_recvcounts->push_back(recvcounts[i]);
841     recv_size += recvcounts[i];
842     senddisps[i] = 0;
843     recvdisps[i] = 0;
844   }
845
846   TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
847                      new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
848                                                        trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
849                                                        encode_datatype(MPI_CURRENT_TYPE2)));
850
851   Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
852                          MPI_CURRENT_TYPE, MPI_COMM_WORLD);
853
854   TRACE_smpi_comm_out(my_proc_id);
855   log_timed_action (action, clock);
856 }
857
858 }} // namespace simgrid::smpi
859
860 /** @brief Only initialize the replay, don't do it for real */
861 void smpi_replay_init(int* argc, char*** argv)
862 {
863   simgrid::smpi::Process::init(argc, argv);
864   smpi_process()->mark_as_initialized();
865   smpi_process()->set_replaying(true);
866
867   int my_proc_id = Actor::self()->getPid();
868   TRACE_smpi_init(my_proc_id);
869   TRACE_smpi_computing_init(my_proc_id);
870   TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
871   TRACE_smpi_comm_out(my_proc_id);
872   xbt_replay_action_register("init",       simgrid::smpi::action_init);
873   xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
874   xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
875   xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
876   xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
877   xbt_replay_action_register("send",       simgrid::smpi::action_send);
878   xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
879   xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
880   xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
881   xbt_replay_action_register("test",       simgrid::smpi::action_test);
882   xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
883   xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
884   xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
885   xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
886   xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
887   xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
888   xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
889   xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
890   xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
891   xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
892   xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
893   xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
894   xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
895   xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
896   xbt_replay_action_register("reduceScatter",  simgrid::smpi::action_reducescatter);
897   xbt_replay_action_register("compute",    simgrid::smpi::action_compute);
898
899   //if we have a delayed start, sleep here.
900   if(*argc>2){
901     double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
902     XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
903     smpi_execute_flops(value);
904   } else {
905     //UGLY: force a context switch to be sure that all MSG_processes begin initialization
906     XBT_DEBUG("Force context switch by smpi_execute_flops  - Sleeping for 0.0 flops ");
907     smpi_execute_flops(0.0);
908   }
909 }
910
911 /** @brief actually run the replay after initialization */
912 void smpi_replay_main(int* argc, char*** argv)
913 {
914   simgrid::xbt::replay_runner(*argc, *argv);
915
916   /* and now, finalize everything */
917   /* One active process will stop. Decrease the counter*/
918   XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
919   if (not get_reqq_self()->empty()) {
920     unsigned int count_requests=get_reqq_self()->size();
921     MPI_Request requests[count_requests];
922     MPI_Status status[count_requests];
923     unsigned int i=0;
924
925     for (auto const& req : *get_reqq_self()) {
926       requests[i] = req;
927       i++;
928     }
929     simgrid::smpi::Request::waitall(count_requests, requests, status);
930   }
931   delete get_reqq_self();
932   active_processes--;
933
934   if(active_processes==0){
935     /* Last process alive speaking: end the simulated timer */
936     XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
937     xbt_free(sendbuffer);
938     xbt_free(recvbuffer);
939   }
940
941   TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
942
943   smpi_process()->finalize();
944
945   TRACE_smpi_comm_out(Actor::self()->getPid());
946   TRACE_smpi_finalize(Actor::self()->getPid());
947 }
948
949 /** @brief chain a replay initialization and a replay start */
950 void smpi_replay_run(int* argc, char*** argv)
951 {
952   smpi_replay_init(argc, argv);
953   smpi_replay_main(argc, argv);
954 }