Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
71db9bfb316f1cc5025dc0b75fbced9917e04cc5
[simgrid.git] / src / smpi / smpi_replay.c
1 /* Copyright (c) 2009-2013. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7
8 #include "private.h"
9 #include <stdio.h>
10 #include <xbt.h>
11 #include <xbt/replay.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
14
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
18
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
21
22 static void log_timed_action (const char *const *action, double clock){
23   if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24     char *name = xbt_str_join_array(action, " ");
25     XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
26     free(name);
27   }
28 }
29
30 /* Helper function */
31 static double parse_double(const char *string)
32 {
33   double value;
34   char *endptr;
35   value = strtod(string, &endptr);
36   if (*endptr != '\0')
37     THROWF(unknown_error, 0, "%s is not a double", string);
38   return value;
39 }
40
41 static MPI_Datatype decode_datatype(const char *const action)
42 {
43 // Declared datatypes,
44
45   switch(atoi(action))
46   {
47     case 0:
48       MPI_CURRENT_TYPE=MPI_DOUBLE;
49       break;
50     case 1:
51       MPI_CURRENT_TYPE=MPI_INT;
52       break;
53     case 2:
54       MPI_CURRENT_TYPE=MPI_CHAR;
55       break;
56     case 3:
57       MPI_CURRENT_TYPE=MPI_SHORT;
58       break;
59     case 4:
60       MPI_CURRENT_TYPE=MPI_LONG;
61       break;
62     case 5:
63       MPI_CURRENT_TYPE=MPI_FLOAT;
64       break;
65     case 6:
66       MPI_CURRENT_TYPE=MPI_BYTE;
67       break;
68     default:
69       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
70
71   }
72    return MPI_CURRENT_TYPE;
73 }
74
75
76 const char* encode_datatype(MPI_Datatype datatype)
77 {
78
79   //default type for output is set to MPI_BYTE
80   // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
81   if (datatype==MPI_BYTE){
82       return "";
83   }
84   if(datatype==MPI_DOUBLE)
85       return "0";
86   if(datatype==MPI_INT)
87       return "1";
88   if(datatype==MPI_CHAR)
89       return "2";
90   if(datatype==MPI_SHORT)
91       return "3";
92   if(datatype==MPI_LONG)
93     return "4";
94   if(datatype==MPI_FLOAT)
95       return "5";
96
97   // default - not implemented.
98   // do not warn here as we pass in this function even for other trace formats
99   return "-1";
100 }
101
102 static void action_init(const char *const *action)
103 {
104   int i;
105   XBT_DEBUG("Initialize the counters");
106
107   if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype 
108   else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
109
110   /* start a simulated timer */
111   smpi_process_simulated_start();
112   /*initialize the number of active processes */
113   active_processes = smpi_process_count();
114
115   if (!reqq) {
116     reqq=xbt_new0(xbt_dynar_t,active_processes);
117
118     for(i=0;i<active_processes;i++){
119       reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
120     }
121   }
122 }
123
124 static void action_finalize(const char *const *action)
125 {
126 }
127
128 static void action_comm_size(const char *const *action)
129 {
130   double clock = smpi_process_simulated_elapsed();
131
132   communicator_size = parse_double(action[2]);
133   log_timed_action (action, clock);
134 }
135
136 static void action_comm_split(const char *const *action)
137 {
138   double clock = smpi_process_simulated_elapsed();
139
140   log_timed_action (action, clock);
141 }
142
143 static void action_comm_dup(const char *const *action)
144 {
145   double clock = smpi_process_simulated_elapsed();
146
147   log_timed_action (action, clock);
148 }
149
150 static void action_compute(const char *const *action)
151 {
152   double clock = smpi_process_simulated_elapsed();
153   double flops= parse_double(action[2]);
154 #ifdef HAVE_TRACING
155   int rank = smpi_comm_rank(MPI_COMM_WORLD);
156   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
157   extra->type=TRACING_COMPUTING;
158   extra->comp_size=flops;
159   TRACE_smpi_computing_in(rank, extra);
160 #endif
161   smpi_execute_flops(flops);
162 #ifdef HAVE_TRACING
163   TRACE_smpi_computing_out(rank);
164 #endif
165
166   log_timed_action (action, clock);
167 }
168
169 static void action_send(const char *const *action)
170 {
171   int to = atoi(action[2]);
172   double size=parse_double(action[3]);
173   double clock = smpi_process_simulated_elapsed();
174
175   if(action[4]) {
176     MPI_CURRENT_TYPE=decode_datatype(action[4]);
177   } else {
178     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
179   }
180
181 #ifdef HAVE_TRACING
182   int rank = smpi_comm_rank(MPI_COMM_WORLD);
183
184   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
185   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
186   extra->type = TRACING_SEND;
187   extra->send_size = size;
188   extra->src = rank;
189   extra->dst = dst_traced;
190   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
191   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
192   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
193 #endif
194
195   smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
196
197   log_timed_action (action, clock);
198
199   #ifdef HAVE_TRACING
200   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
201 #endif
202
203 }
204
205 static void action_Isend(const char *const *action)
206 {
207   int to = atoi(action[2]);
208   double size=parse_double(action[3]);
209   double clock = smpi_process_simulated_elapsed();
210   MPI_Request request;
211
212   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
213   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
214
215 #ifdef HAVE_TRACING
216   int rank = smpi_comm_rank(MPI_COMM_WORLD);
217   int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
218   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
219   extra->type = TRACING_ISEND;
220   extra->send_size = size;
221   extra->src = rank;
222   extra->dst = dst_traced;
223   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
224   TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
225   TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
226 #endif
227
228   request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
229
230 #ifdef HAVE_TRACING
231   TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
232   request->send = 1;
233 #endif
234
235   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
236
237   log_timed_action (action, clock);
238 }
239
240 static void action_recv(const char *const *action) {
241   int from = atoi(action[2]);
242   double size=parse_double(action[3]);
243   double clock = smpi_process_simulated_elapsed();
244   MPI_Status status;
245
246   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
247   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
248
249 #ifdef HAVE_TRACING
250   int rank = smpi_comm_rank(MPI_COMM_WORLD);
251   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
252
253   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
254   extra->type = TRACING_RECV;
255   extra->send_size = size;
256   extra->src = src_traced;
257   extra->dst = rank;
258   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
259   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
260 #endif
261
262   //unknow size from the receiver pov
263   if(size==-1){
264       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
265       size=status.count;
266   }
267
268   smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
269
270 #ifdef HAVE_TRACING
271   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
272   TRACE_smpi_recv(rank, src_traced, rank);
273 #endif
274
275   log_timed_action (action, clock);
276 }
277
278 static void action_Irecv(const char *const *action)
279 {
280   int from = atoi(action[2]);
281   double size=parse_double(action[3]);
282   double clock = smpi_process_simulated_elapsed();
283   MPI_Request request;
284
285   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
286   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
287
288 #ifdef HAVE_TRACING
289   int rank = smpi_comm_rank(MPI_COMM_WORLD);
290   int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
291   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
292   extra->type = TRACING_IRECV;
293   extra->send_size = size;
294   extra->src = src_traced;
295   extra->dst = rank;
296   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
297   TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
298 #endif
299   MPI_Status status;
300   //unknow size from the receiver pov
301   if(size==-1){
302       smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
303       size=status.count;
304   }
305
306   request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
307
308 #ifdef HAVE_TRACING
309   TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
310   request->recv = 1;
311 #endif
312   xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
313
314   log_timed_action (action, clock);
315 }
316
317 static void action_wait(const char *const *action){
318   double clock = smpi_process_simulated_elapsed();
319   MPI_Request request;
320   MPI_Status status;
321
322   xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
323       "action wait not preceded by any irecv or isend: %s",
324       xbt_str_join_array(action," "));
325   request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
326   xbt_assert(request != NULL, "found null request in reqq");
327 #ifdef HAVE_TRACING
328   int rank = request->comm != MPI_COMM_NULL
329       ? smpi_comm_rank(request->comm)
330       : -1;
331
332   MPI_Group group = smpi_comm_group(request->comm);
333   int src_traced = smpi_group_rank(group, request->src);
334   int dst_traced = smpi_group_rank(group, request->dst);
335   int is_wait_for_receive = request->recv;
336   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
337   extra->type = TRACING_WAIT;
338   TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
339 #endif
340   smpi_mpi_wait(&request, &status);
341 #ifdef HAVE_TRACING
342   TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
343   if (is_wait_for_receive) {
344     TRACE_smpi_recv(rank, src_traced, dst_traced);
345   }
346 #endif
347
348   log_timed_action (action, clock);
349 }
350
351 static void action_waitall(const char *const *action){
352   double clock = smpi_process_simulated_elapsed();
353   int count_requests=0;
354   unsigned int i=0;
355
356   count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
357
358   if (count_requests>0) {
359     MPI_Request requests[count_requests];
360     MPI_Status status[count_requests];
361
362     /*  The reqq is an array of dynars. Its index corresponds to the rank.
363      Thus each rank saves its own requests to the array request. */
364     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]); 
365
366   #ifdef HAVE_TRACING
367    //save information from requests
368
369    xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
370    xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
371    xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
372    for (i = 0; i < count_requests; i++) {
373     if(requests[i]){
374       int *asrc = xbt_new(int, 1);
375       int *adst = xbt_new(int, 1);
376       int *arecv = xbt_new(int, 1);
377       *asrc = requests[i]->src;
378       *adst = requests[i]->dst;
379       *arecv = requests[i]->recv;
380       xbt_dynar_insert_at(srcs, i, asrc);
381       xbt_dynar_insert_at(dsts, i, adst);
382       xbt_dynar_insert_at(recvs, i, arecv);
383       xbt_free(asrc);
384       xbt_free(adst);
385       xbt_free(arecv);
386     }else {
387       int *t = xbt_new(int, 1);
388       xbt_dynar_insert_at(srcs, i, t);
389       xbt_dynar_insert_at(dsts, i, t);
390       xbt_dynar_insert_at(recvs, i, t);
391       xbt_free(t);
392     }
393    }
394    int rank_traced = smpi_process_index();
395    instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
396    extra->type = TRACING_WAITALL;
397    extra->send_size=count_requests;
398    TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
399  #endif
400
401     smpi_mpi_waitall(count_requests, requests, status);
402
403   #ifdef HAVE_TRACING
404    for (i = 0; i < count_requests; i++) {
405     int src_traced, dst_traced, is_wait_for_receive;
406     xbt_dynar_get_cpy(srcs, i, &src_traced);
407     xbt_dynar_get_cpy(dsts, i, &dst_traced);
408     xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
409     if (is_wait_for_receive) {
410       TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
411     }
412    }
413    TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
414    //clean-up of dynars
415    xbt_dynar_free(&srcs);
416    xbt_dynar_free(&dsts);
417    xbt_dynar_free(&recvs);
418   #endif
419
420    xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
421   }
422   log_timed_action (action, clock);
423 }
424
425 static void action_barrier(const char *const *action){
426   double clock = smpi_process_simulated_elapsed();
427 #ifdef HAVE_TRACING
428   int rank = smpi_comm_rank(MPI_COMM_WORLD);
429   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
430   extra->type = TRACING_BARRIER;
431   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
432 #endif
433   smpi_mpi_barrier(MPI_COMM_WORLD);
434 #ifdef HAVE_TRACING
435   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
436 #endif
437
438   log_timed_action (action, clock);
439 }
440
441
442 static void action_bcast(const char *const *action)
443 {
444   double size = parse_double(action[2]);
445   double clock = smpi_process_simulated_elapsed();
446   int root=0;
447   /*
448    * Initialize MPI_CURRENT_TYPE in order to decrease
449    * the number of the checks
450    * */
451   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;  
452
453   if(action[3]) {
454     root= atoi(action[3]);
455     if(action[4]) {
456       MPI_CURRENT_TYPE=decode_datatype(action[4]);   
457     }
458   }
459
460 #ifdef HAVE_TRACING
461   int rank = smpi_comm_rank(MPI_COMM_WORLD);
462   int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
463
464   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
465   extra->type = TRACING_BCAST;
466   extra->send_size = size;
467   extra->root = root_traced;
468   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
469   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
470
471 #endif
472
473   mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
474 #ifdef HAVE_TRACING
475   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
476 #endif
477
478   log_timed_action (action, clock);
479 }
480
481 static void action_reduce(const char *const *action)
482 {
483   double comm_size = parse_double(action[2]);
484   double comp_size = parse_double(action[3]);
485   double clock = smpi_process_simulated_elapsed();
486   int root=0;
487   MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
488
489   if(action[4]) {
490     root= atoi(action[4]);
491     if(action[5]) {
492       MPI_CURRENT_TYPE=decode_datatype(action[5]);
493     }
494   }
495
496 #ifdef HAVE_TRACING
497   int rank = smpi_comm_rank(MPI_COMM_WORLD);
498   int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
499   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
500   extra->type = TRACING_REDUCE;
501   extra->send_size = comm_size;
502   extra->comp_size = comp_size;
503   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
504   extra->root = root_traced;
505
506   TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
507 #endif
508    mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
509    smpi_execute_flops(comp_size);
510 #ifdef HAVE_TRACING
511   TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
512 #endif
513
514   log_timed_action (action, clock);
515 }
516
517 static void action_allReduce(const char *const *action) {
518   double comm_size = parse_double(action[2]);
519   double comp_size = parse_double(action[3]);
520
521   if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
522   else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
523
524   double clock = smpi_process_simulated_elapsed();
525 #ifdef HAVE_TRACING
526   int rank = smpi_comm_rank(MPI_COMM_WORLD);
527   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
528   extra->type = TRACING_ALLREDUCE;
529   extra->send_size = comm_size;
530   extra->comp_size = comp_size;
531   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
532
533   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
534 #endif
535   mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
536   smpi_execute_flops(comp_size);
537   mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
538 #ifdef HAVE_TRACING
539   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
540 #endif
541
542   log_timed_action (action, clock);
543 }
544
545 static void action_allToAll(const char *const *action) {
546   double clock = smpi_process_simulated_elapsed();
547   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
548   int send_size = parse_double(action[2]);
549   int recv_size = parse_double(action[3]);
550   MPI_Datatype MPI_CURRENT_TYPE2;
551
552   if(action[4]) {
553     MPI_CURRENT_TYPE=decode_datatype(action[4]);
554     MPI_CURRENT_TYPE2=decode_datatype(action[5]);
555   }
556   else {
557     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
558     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
559   }
560   void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
561   void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
562
563 #ifdef HAVE_TRACING
564   int rank = smpi_process_index();
565   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
566   extra->type = TRACING_ALLTOALL;
567   extra->send_size = send_size;
568   extra->recv_size = recv_size;
569   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
570   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
571
572   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
573 #endif
574
575   mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
576
577 #ifdef HAVE_TRACING
578   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
579 #endif
580
581   log_timed_action (action, clock);
582   xbt_free(send);
583   xbt_free(recv);
584 }
585
586
587 static void action_gather(const char *const *action) {
588   /*
589  The structure of the gather action for the rank 0 (total 4 processes) 
590  is the following:   
591  0 gather 68 68 0 0 0
592
593   where: 
594   1) 68 is the sendcounts
595   2) 68 is the recvcounts
596   3) 0 is the root node
597   4) 0 is the send datatype id, see decode_datatype()
598   5) 0 is the recv datatype id, see decode_datatype()
599   */
600   double clock = smpi_process_simulated_elapsed();
601   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
602   int send_size = parse_double(action[2]);
603   int recv_size = parse_double(action[3]);
604   MPI_Datatype MPI_CURRENT_TYPE2;
605   if(action[5]) {
606     MPI_CURRENT_TYPE=decode_datatype(action[5]);
607     MPI_CURRENT_TYPE2=decode_datatype(action[6]);
608   } else {
609     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
610     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
611   }
612   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
613   void *recv = NULL;
614
615   int root=atoi(action[4]);
616   int rank = smpi_process_index();
617
618   if(rank==root)
619     recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
620
621 #ifdef HAVE_TRACING
622   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
623   extra->type = TRACING_GATHER;
624   extra->send_size = send_size;
625   extra->recv_size = recv_size;
626   extra->root = root;
627   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
628   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
629
630   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
631 #endif
632 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
633                 recv, recv_size, MPI_CURRENT_TYPE2,
634                 root, MPI_COMM_WORLD);
635
636 #ifdef HAVE_TRACING
637   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
638 #endif
639
640   log_timed_action (action, clock);
641   xbt_free(send);
642   xbt_free(recv);
643 }
644
645
646
647 static void action_gatherv(const char *const *action) {
648   /*
649  The structure of the gatherv action for the rank 0 (total 4 processes)
650  is the following:
651  0 gather 68 68 10 10 10 0 0 0
652
653   where:
654   1) 68 is the sendcount
655   2) 68 10 10 10 is the recvcounts
656   3) 0 is the root node
657   4) 0 is the send datatype id, see decode_datatype()
658   5) 0 is the recv datatype id, see decode_datatype()
659   */
660   double clock = smpi_process_simulated_elapsed();
661   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
662   int send_size = parse_double(action[2]);
663   int *disps = xbt_new0(int, comm_size);
664   int *recvcounts = xbt_new0(int, comm_size);
665   int i=0,recv_sum=0;
666
667   MPI_Datatype MPI_CURRENT_TYPE2;
668   if(action[4+comm_size]) {
669     MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
670     MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
671   } else {
672     MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
673     MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
674   }
675   void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
676   void *recv = NULL;
677   for(i=0;i<comm_size;i++) {
678     recvcounts[i] = atoi(action[i+3]);
679     recv_sum=recv_sum+recvcounts[i];
680     disps[i] = 0;
681   }
682
683   int root=atoi(action[3+comm_size]);
684   int rank = smpi_process_index();
685
686   if(rank==root)
687     recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
688
689 #ifdef HAVE_TRACING
690   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
691   extra->type = TRACING_GATHERV;
692   extra->send_size = send_size;
693   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
694   for(i=0; i< comm_size; i++)//copy data to avoid bad free
695     extra->recvcounts[i] = recvcounts[i];
696   extra->root = root;
697   extra->num_processes = comm_size;
698   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
699   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
700
701   TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
702 #endif
703 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
704                 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
705                 root, MPI_COMM_WORLD);
706
707 #ifdef HAVE_TRACING
708   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
709 #endif
710
711   log_timed_action (action, clock);
712   xbt_free(recvcounts);
713   xbt_free(send);
714   xbt_free(recv);
715   xbt_free(disps);
716
717 }
718
719 static void action_reducescatter(const char *const *action) {
720
721     /*
722  The structure of the reducescatter action for the rank 0 (total 4 processes) 
723  is the following:   
724 0 reduceScatter 275427 275427 275427 204020 11346849 0
725
726   where: 
727   1) The first four values after the name of the action declare the recvcounts array
728   2) The value 11346849 is the amount of instructions
729   3) The last value corresponds to the datatype, see decode_datatype().
730
731   We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
732
733    */
734
735   double clock = smpi_process_simulated_elapsed();
736   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
737   int comp_size = parse_double(action[2+comm_size]);
738   int *recvcounts = xbt_new0(int, comm_size);  
739   int *disps = xbt_new0(int, comm_size);  
740   int i=0,recv_sum=0;
741   int root=0;
742   int rank = smpi_process_index();
743
744   if(action[3+comm_size])
745     MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
746   else
747     MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
748
749   for(i=0;i<comm_size;i++) {
750     recvcounts[i] = atoi(action[i+2]);
751     recv_sum=recv_sum+recvcounts[i];
752     disps[i] = 0;
753   }
754
755 #ifdef HAVE_TRACING
756   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
757   extra->type = TRACING_REDUCE_SCATTER;
758   extra->send_size = 0;
759   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
760   for(i=0; i< comm_size; i++)//copy data to avoid bad free
761     extra->recvcounts[i] = recvcounts[i];
762   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
763   extra->comp_size = comp_size;
764   extra->num_processes = comm_size;
765
766
767   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
768 #endif
769    mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
770        root, MPI_COMM_WORLD);
771    smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
772                       recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
773    smpi_execute_flops(comp_size);
774
775
776 #ifdef HAVE_TRACING
777   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
778 #endif
779   xbt_free(recvcounts);
780   xbt_free(disps);
781   log_timed_action (action, clock);
782 }
783
784
785 static void action_allgatherv(const char *const *action) {
786
787   /*
788  The structure of the allgatherv action for the rank 0 (total 4 processes) 
789  is the following:   
790 0 allGatherV 275427 275427 275427 275427 204020
791
792   where: 
793   1) 275427 is the sendcount
794   2) The next four elements declare the recvcounts array
795   3) No more values mean that the datatype for sent and receive buffer
796   is the default one, see decode_datatype().
797
798    */
799
800   double clock = smpi_process_simulated_elapsed();
801
802   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
803   int i=0;
804   int sendcount=atoi(action[2]);
805   int *recvcounts = xbt_new0(int, comm_size);  
806   int *disps = xbt_new0(int, comm_size);  
807   int recv_sum=0;  
808   MPI_Datatype MPI_CURRENT_TYPE2;
809
810   if(action[3+comm_size]) {
811     MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
812     MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
813   } else {
814     MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
815     MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;    
816   }
817   void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));    
818
819   for(i=0;i<comm_size;i++) {
820     recvcounts[i] = atoi(action[i+3]);
821     recv_sum=recv_sum+recvcounts[i];
822   }
823   void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));  
824
825 #ifdef HAVE_TRACING
826   int rank = smpi_process_index();
827   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
828   extra->type = TRACING_ALLGATHERV;
829   extra->send_size = sendcount;
830   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
831   for(i=0; i< comm_size; i++)//copy data to avoid bad free
832     extra->recvcounts[i] = recvcounts[i];
833   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
834   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
835   extra->num_processes = comm_size;
836
837   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
838 #endif
839
840 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
841
842 #ifdef HAVE_TRACING
843   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
844 #endif
845
846   log_timed_action (action, clock);
847   xbt_free(sendbuf);
848   xbt_free(recvbuf);
849   xbt_free(recvcounts);
850   xbt_free(disps);
851 }
852
853
854 static void action_allToAllv(const char *const *action) {
855   /*
856  The structure of the allToAllV action for the rank 0 (total 4 processes) 
857  is the following:   
858   0 allToAllV 100 1 7 10 12 100 1 70 10 5
859
860   where: 
861   1) 100 is the size of the send buffer *sizeof(int),
862   2) 1 7 10 12 is the sendcounts array
863   3) 100*sizeof(int) is the size of the receiver buffer
864   4)  1 70 10 5 is the recvcounts array
865
866    */
867
868
869   double clock = smpi_process_simulated_elapsed();
870
871   int comm_size = smpi_comm_size(MPI_COMM_WORLD);
872   int send_buf_size=0,recv_buf_size=0,i=0;
873   int *sendcounts = xbt_new0(int, comm_size);  
874   int *recvcounts = xbt_new0(int, comm_size);  
875   int *senddisps = xbt_new0(int, comm_size);  
876   int *recvdisps = xbt_new0(int, comm_size);  
877
878   MPI_Datatype MPI_CURRENT_TYPE2;
879
880   send_buf_size=parse_double(action[2]);
881   recv_buf_size=parse_double(action[3+comm_size]);
882   if(action[4+2*comm_size]) {
883     MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
884     MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
885   }
886   else {
887       MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
888       MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
889   }
890
891   void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));  
892   void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));  
893
894   for(i=0;i<comm_size;i++) {
895     sendcounts[i] = atoi(action[i+3]);
896     recvcounts[i] = atoi(action[i+4+comm_size]);
897   }
898
899
900 #ifdef HAVE_TRACING
901   int rank = smpi_process_index();
902   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
903   extra->type = TRACING_ALLTOALLV;
904   extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
905   extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
906   extra->num_processes = comm_size;
907
908   for(i=0; i< comm_size; i++){//copy data to avoid bad free
909     extra->send_size += sendcounts[i];
910     extra->sendcounts[i] = sendcounts[i];
911     extra->recv_size += recvcounts[i];
912     extra->recvcounts[i] = recvcounts[i];
913   }
914   extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
915   extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
916
917   TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
918 #endif
919     mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
920                                recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
921                                MPI_COMM_WORLD);
922 #ifdef HAVE_TRACING
923   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
924 #endif
925
926   log_timed_action (action, clock);
927   xbt_free(sendbuf);
928   xbt_free(recvbuf);
929   xbt_free(sendcounts);
930   xbt_free(recvcounts);
931   xbt_free(senddisps);
932   xbt_free(recvdisps);
933 }
934
935 void smpi_replay_init(int *argc, char***argv){
936   smpi_process_init(argc, argv);
937   smpi_process_mark_as_initialized();
938 #ifdef HAVE_TRACING
939   int rank = smpi_process_index();
940   TRACE_smpi_init(rank);
941   TRACE_smpi_computing_init(rank);
942   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
943   extra->type = TRACING_INIT;
944   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
945   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
946 #endif
947
948   if (!smpi_process_index()){
949     _xbt_replay_action_init();
950     xbt_replay_action_register("init",       action_init);
951     xbt_replay_action_register("finalize",   action_finalize);
952     xbt_replay_action_register("comm_size",  action_comm_size);
953     xbt_replay_action_register("comm_split", action_comm_split);
954     xbt_replay_action_register("comm_dup",   action_comm_dup);
955     xbt_replay_action_register("send",       action_send);
956     xbt_replay_action_register("Isend",      action_Isend);
957     xbt_replay_action_register("recv",       action_recv);
958     xbt_replay_action_register("Irecv",      action_Irecv);
959     xbt_replay_action_register("wait",       action_wait);
960     xbt_replay_action_register("waitAll",    action_waitall);
961     xbt_replay_action_register("barrier",    action_barrier);
962     xbt_replay_action_register("bcast",      action_bcast);
963     xbt_replay_action_register("reduce",     action_reduce);
964     xbt_replay_action_register("allReduce",  action_allReduce);
965     xbt_replay_action_register("allToAll",   action_allToAll);
966     xbt_replay_action_register("allToAllV",  action_allToAllv);
967     xbt_replay_action_register("gather",  action_gather);
968     xbt_replay_action_register("gatherV",  action_gatherv);
969     xbt_replay_action_register("allGatherV",  action_allgatherv);
970     xbt_replay_action_register("reduceScatter",  action_reducescatter);
971     xbt_replay_action_register("compute",    action_compute);
972   }
973
974   xbt_replay_action_runner(*argc, *argv);
975 }
976
977 int smpi_replay_finalize(){
978   double sim_time= 1.;
979   /* One active process will stop. Decrease the counter*/
980   XBT_DEBUG("There are %lu elements in reqq[*]",
981             xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
982   if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
983     int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
984     MPI_Request requests[count_requests];
985     MPI_Status status[count_requests];
986     unsigned int i;
987
988     xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
989     smpi_mpi_waitall(count_requests, requests, status);
990     active_processes--;
991   } else {
992     active_processes--;
993   }
994
995   xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
996
997   if(!active_processes){
998     /* Last process alive speaking */
999     /* end the simulated timer */
1000     sim_time = smpi_process_simulated_elapsed();
1001     XBT_INFO("Simulation time %f", sim_time);
1002     _xbt_replay_action_exit();
1003     xbt_free(reqq);
1004     reqq = NULL;
1005   }
1006   smpi_mpi_barrier(MPI_COMM_WORLD);
1007 #ifdef HAVE_TRACING
1008   int rank = smpi_process_index();
1009   instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1010   extra->type = TRACING_FINALIZE;
1011   TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1012 #endif
1013   smpi_process_finalize();
1014 #ifdef HAVE_TRACING
1015   TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1016   TRACE_smpi_finalize(smpi_process_index());
1017 #endif
1018   smpi_process_destroy();
1019   return MPI_SUCCESS;
1020 }