1 /* Copyright (c) 2009-2013. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include <xbt/replay.h>
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
15 int communicator_size = 0;
16 static int active_processes = 0;
17 xbt_dynar_t *reqq = NULL;
19 MPI_Datatype MPI_DEFAULT_TYPE;
20 MPI_Datatype MPI_CURRENT_TYPE;
22 static void log_timed_action (const char *const *action, double clock){
23 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24 char *name = xbt_str_join_array(action, " ");
25 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
31 static double parse_double(const char *string)
35 value = strtod(string, &endptr);
37 THROWF(unknown_error, 0, "%s is not a double", string);
41 static MPI_Datatype decode_datatype(const char *const action)
43 // Declared datatypes,
48 MPI_CURRENT_TYPE=MPI_DOUBLE;
51 MPI_CURRENT_TYPE=MPI_INT;
54 MPI_CURRENT_TYPE=MPI_CHAR;
57 MPI_CURRENT_TYPE=MPI_SHORT;
60 MPI_CURRENT_TYPE=MPI_LONG;
63 MPI_CURRENT_TYPE=MPI_FLOAT;
66 MPI_CURRENT_TYPE=MPI_BYTE;
69 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
72 return MPI_CURRENT_TYPE;
76 const char* encode_datatype(MPI_Datatype datatype)
79 //default type for output is set to MPI_BYTE
80 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
81 if (datatype==MPI_BYTE){
84 if(datatype==MPI_DOUBLE)
88 if(datatype==MPI_CHAR)
90 if(datatype==MPI_SHORT)
92 if(datatype==MPI_LONG)
94 if(datatype==MPI_FLOAT)
97 // default - not implemented.
98 // do not warn here as we pass in this function even for other trace formats
102 static void action_init(const char *const *action)
105 XBT_DEBUG("Initialize the counters");
107 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
108 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
110 /* start a simulated timer */
111 smpi_process_simulated_start();
112 /*initialize the number of active processes */
113 active_processes = smpi_process_count();
116 reqq=xbt_new0(xbt_dynar_t,active_processes);
118 for(i=0;i<active_processes;i++){
119 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
124 static void action_finalize(const char *const *action)
128 static void action_comm_size(const char *const *action)
130 double clock = smpi_process_simulated_elapsed();
132 communicator_size = parse_double(action[2]);
133 log_timed_action (action, clock);
136 static void action_comm_split(const char *const *action)
138 double clock = smpi_process_simulated_elapsed();
140 log_timed_action (action, clock);
143 static void action_comm_dup(const char *const *action)
145 double clock = smpi_process_simulated_elapsed();
147 log_timed_action (action, clock);
150 static void action_compute(const char *const *action)
152 double clock = smpi_process_simulated_elapsed();
153 double flops= parse_double(action[2]);
155 int rank = smpi_comm_rank(MPI_COMM_WORLD);
156 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
157 extra->type=TRACING_COMPUTING;
158 extra->comp_size=flops;
159 TRACE_smpi_computing_in(rank, extra);
161 smpi_execute_flops(flops);
163 TRACE_smpi_computing_out(rank);
166 log_timed_action (action, clock);
169 static void action_send(const char *const *action)
171 int to = atoi(action[2]);
172 double size=parse_double(action[3]);
173 double clock = smpi_process_simulated_elapsed();
176 MPI_CURRENT_TYPE=decode_datatype(action[4]);
178 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
182 int rank = smpi_comm_rank(MPI_COMM_WORLD);
184 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
185 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
186 extra->type = TRACING_SEND;
187 extra->send_size = size;
189 extra->dst = dst_traced;
190 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
191 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
192 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
195 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
197 log_timed_action (action, clock);
200 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
205 static void action_Isend(const char *const *action)
207 int to = atoi(action[2]);
208 double size=parse_double(action[3]);
209 double clock = smpi_process_simulated_elapsed();
212 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
213 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
216 int rank = smpi_comm_rank(MPI_COMM_WORLD);
217 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
218 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
219 extra->type = TRACING_ISEND;
220 extra->send_size = size;
222 extra->dst = dst_traced;
223 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
224 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
225 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
228 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
231 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
235 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
237 log_timed_action (action, clock);
240 static void action_recv(const char *const *action) {
241 int from = atoi(action[2]);
242 double size=parse_double(action[3]);
243 double clock = smpi_process_simulated_elapsed();
246 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
247 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250 int rank = smpi_comm_rank(MPI_COMM_WORLD);
251 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
253 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
254 extra->type = TRACING_RECV;
255 extra->send_size = size;
256 extra->src = src_traced;
258 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
259 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
262 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
265 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
266 TRACE_smpi_recv(rank, src_traced, rank);
269 log_timed_action (action, clock);
272 static void action_Irecv(const char *const *action)
274 int from = atoi(action[2]);
275 double size=parse_double(action[3]);
276 double clock = smpi_process_simulated_elapsed();
279 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
280 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
283 int rank = smpi_comm_rank(MPI_COMM_WORLD);
284 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
285 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
286 extra->type = TRACING_IRECV;
287 extra->send_size = size;
288 extra->src = src_traced;
290 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
291 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
294 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
297 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
300 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
302 log_timed_action (action, clock);
305 static void action_wait(const char *const *action){
306 double clock = smpi_process_simulated_elapsed();
310 xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
311 "action wait not preceded by any irecv or isend: %s",
312 xbt_str_join_array(action," "));
313 request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
314 xbt_assert(request != NULL, "found null request in reqq");
316 int rank = request->comm != MPI_COMM_NULL
317 ? smpi_comm_rank(request->comm)
320 MPI_Group group = smpi_comm_group(request->comm);
321 int src_traced = smpi_group_rank(group, request->src);
322 int dst_traced = smpi_group_rank(group, request->dst);
323 int is_wait_for_receive = request->recv;
324 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
325 extra->type = TRACING_WAIT;
326 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
328 smpi_mpi_wait(&request, &status);
330 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
331 if (is_wait_for_receive) {
332 TRACE_smpi_recv(rank, src_traced, dst_traced);
336 log_timed_action (action, clock);
339 static void action_waitall(const char *const *action){
340 double clock = smpi_process_simulated_elapsed();
341 int count_requests=0;
344 count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
346 if (count_requests>0) {
347 MPI_Request requests[count_requests];
348 MPI_Status status[count_requests];
350 /* The reqq is an array of dynars. Its index corresponds to the rank.
351 Thus each rank saves its own requests to the array request. */
352 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
355 //save information from requests
357 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
358 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
359 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
360 for (i = 0; i < count_requests; i++) {
362 int *asrc = xbt_new(int, 1);
363 int *adst = xbt_new(int, 1);
364 int *arecv = xbt_new(int, 1);
365 *asrc = requests[i]->src;
366 *adst = requests[i]->dst;
367 *arecv = requests[i]->recv;
368 xbt_dynar_insert_at(srcs, i, asrc);
369 xbt_dynar_insert_at(dsts, i, adst);
370 xbt_dynar_insert_at(recvs, i, arecv);
375 int *t = xbt_new(int, 1);
376 xbt_dynar_insert_at(srcs, i, t);
377 xbt_dynar_insert_at(dsts, i, t);
378 xbt_dynar_insert_at(recvs, i, t);
382 int rank_traced = smpi_process_index();
383 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
384 extra->type = TRACING_WAITALL;
385 extra->send_size=count_requests;
386 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
389 smpi_mpi_waitall(count_requests, requests, status);
392 for (i = 0; i < count_requests; i++) {
393 int src_traced, dst_traced, is_wait_for_receive;
394 xbt_dynar_get_cpy(srcs, i, &src_traced);
395 xbt_dynar_get_cpy(dsts, i, &dst_traced);
396 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
397 if (is_wait_for_receive) {
398 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
401 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
403 xbt_dynar_free(&srcs);
404 xbt_dynar_free(&dsts);
405 xbt_dynar_free(&recvs);
408 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
410 log_timed_action (action, clock);
413 static void action_barrier(const char *const *action){
414 double clock = smpi_process_simulated_elapsed();
416 int rank = smpi_comm_rank(MPI_COMM_WORLD);
417 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
418 extra->type = TRACING_BARRIER;
419 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
421 smpi_mpi_barrier(MPI_COMM_WORLD);
423 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
426 log_timed_action (action, clock);
430 static void action_bcast(const char *const *action)
432 double size = parse_double(action[2]);
433 double clock = smpi_process_simulated_elapsed();
436 * Initialize MPI_CURRENT_TYPE in order to decrease
437 * the number of the checks
439 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
442 root= atoi(action[3]);
444 MPI_CURRENT_TYPE=decode_datatype(action[4]);
449 int rank = smpi_comm_rank(MPI_COMM_WORLD);
450 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
452 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
453 extra->type = TRACING_BCAST;
454 extra->send_size = size;
455 extra->root = root_traced;
456 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
457 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
461 mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
463 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
466 log_timed_action (action, clock);
469 static void action_reduce(const char *const *action)
471 double comm_size = parse_double(action[2]);
472 double comp_size = parse_double(action[3]);
473 double clock = smpi_process_simulated_elapsed();
475 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
478 root= atoi(action[4]);
480 MPI_CURRENT_TYPE=decode_datatype(action[5]);
485 int rank = smpi_comm_rank(MPI_COMM_WORLD);
486 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
487 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
488 extra->type = TRACING_REDUCE;
489 extra->send_size = comm_size;
490 extra->comp_size = comp_size;
491 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
492 extra->root = root_traced;
494 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
496 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
497 smpi_execute_flops(comp_size);
499 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
502 log_timed_action (action, clock);
505 static void action_allReduce(const char *const *action) {
506 double comm_size = parse_double(action[2]);
507 double comp_size = parse_double(action[3]);
509 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
510 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
512 double clock = smpi_process_simulated_elapsed();
514 int rank = smpi_comm_rank(MPI_COMM_WORLD);
515 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
516 extra->type = TRACING_ALLREDUCE;
517 extra->send_size = comm_size;
518 extra->comp_size = comp_size;
519 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
521 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
523 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
524 smpi_execute_flops(comp_size);
525 mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
527 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
530 log_timed_action (action, clock);
533 static void action_allToAll(const char *const *action) {
534 double clock = smpi_process_simulated_elapsed();
535 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
536 int send_size = parse_double(action[2]);
537 int recv_size = parse_double(action[3]);
538 MPI_Datatype MPI_CURRENT_TYPE2;
541 MPI_CURRENT_TYPE=decode_datatype(action[4]);
542 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
545 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
546 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
548 void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));
549 void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
552 int rank = smpi_process_index();
553 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
554 extra->type = TRACING_ALLTOALL;
555 extra->send_size = send_size;
556 extra->recv_size = recv_size;
557 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
558 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
560 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
563 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
566 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
569 log_timed_action (action, clock);
575 static void action_gather(const char *const *action) {
577 The structure of the gather action for the rank 0 (total 4 processes)
582 1) 68 is the sendcounts
583 2) 68 is the recvcounts
584 3) 0 is the root node
585 4) 0 is the send datatype id, see decode_datatype()
586 5) 0 is the recv datatype id, see decode_datatype()
588 double clock = smpi_process_simulated_elapsed();
589 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
590 int send_size = parse_double(action[2]);
591 int recv_size = parse_double(action[3]);
592 MPI_Datatype MPI_CURRENT_TYPE2;
594 MPI_CURRENT_TYPE=decode_datatype(action[5]);
595 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
597 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
598 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
600 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
603 int root=atoi(action[4]);
604 int rank = smpi_process_index();
607 recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
610 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
611 extra->type = TRACING_GATHER;
612 extra->send_size = send_size;
613 extra->recv_size = recv_size;
615 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
616 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
618 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
620 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
621 recv, recv_size, MPI_CURRENT_TYPE2,
622 root, MPI_COMM_WORLD);
625 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
628 log_timed_action (action, clock);
635 static void action_gatherv(const char *const *action) {
637 The structure of the gatherv action for the rank 0 (total 4 processes)
639 0 gather 68 68 10 10 10 0 0 0
642 1) 68 is the sendcount
643 2) 68 10 10 10 is the recvcounts
644 3) 0 is the root node
645 4) 0 is the send datatype id, see decode_datatype()
646 5) 0 is the recv datatype id, see decode_datatype()
648 double clock = smpi_process_simulated_elapsed();
649 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
650 int send_size = parse_double(action[2]);
651 int *disps = xbt_new0(int, comm_size);
652 int *recvcounts = xbt_new0(int, comm_size);
655 MPI_Datatype MPI_CURRENT_TYPE2;
656 if(action[4+comm_size]) {
657 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
658 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
660 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
661 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
663 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
665 for(i=0;i<comm_size;i++) {
666 recvcounts[i] = atoi(action[i+3]);
667 recv_sum=recv_sum+recvcounts[i];
671 int root=atoi(action[3+comm_size]);
672 int rank = smpi_process_index();
675 recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
678 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
679 extra->type = TRACING_GATHERV;
680 extra->send_size = send_size;
681 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
682 for(i=0; i< comm_size; i++)//copy data to avoid bad free
683 extra->recvcounts[i] = recvcounts[i];
685 extra->num_processes = comm_size;
686 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
687 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
689 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
691 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
692 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
693 root, MPI_COMM_WORLD);
696 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
699 log_timed_action (action, clock);
700 xbt_free(recvcounts);
707 static void action_reducescatter(const char *const *action) {
710 The structure of the reducescatter action for the rank 0 (total 4 processes)
712 0 reduceScatter 275427 275427 275427 204020 11346849 0
715 1) The first four values after the name of the action declare the recvcounts array
716 2) The value 11346849 is the amount of instructions
717 3) The last value corresponds to the datatype, see decode_datatype().
719 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
723 double clock = smpi_process_simulated_elapsed();
724 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
725 int comp_size = parse_double(action[2+comm_size]);
726 int *recvcounts = xbt_new0(int, comm_size);
727 int *disps = xbt_new0(int, comm_size);
730 int rank = smpi_process_index();
732 if(action[3+comm_size])
733 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
735 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
737 for(i=0;i<comm_size;i++) {
738 recvcounts[i] = atoi(action[i+2]);
739 recv_sum=recv_sum+recvcounts[i];
744 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
745 extra->type = TRACING_REDUCE_SCATTER;
746 extra->send_size = 0;
747 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
748 for(i=0; i< comm_size; i++)//copy data to avoid bad free
749 extra->recvcounts[i] = recvcounts[i];
750 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
751 extra->comp_size = comp_size;
752 extra->num_processes = comm_size;
755 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
757 mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
758 root, MPI_COMM_WORLD);
759 smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
760 recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
761 smpi_execute_flops(comp_size);
765 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
767 xbt_free(recvcounts);
769 log_timed_action (action, clock);
773 static void action_allgatherv(const char *const *action) {
776 The structure of the allgatherv action for the rank 0 (total 4 processes)
778 0 allGatherV 275427 275427 275427 275427 204020
781 1) 275427 is the sendcount
782 2) The next four elements declare the recvcounts array
783 3) No more values mean that the datatype for sent and receive buffer
784 is the default one, see decode_datatype().
788 double clock = smpi_process_simulated_elapsed();
790 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
792 int sendcount=atoi(action[2]);
793 int *recvcounts = xbt_new0(int, comm_size);
794 int *disps = xbt_new0(int, comm_size);
796 MPI_Datatype MPI_CURRENT_TYPE2;
798 if(action[3+comm_size]) {
799 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
800 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
802 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
803 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
805 void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));
807 for(i=0;i<comm_size;i++) {
808 recvcounts[i] = atoi(action[i+3]);
809 recv_sum=recv_sum+recvcounts[i];
811 void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
814 int rank = smpi_process_index();
815 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
816 extra->type = TRACING_ALLGATHERV;
817 extra->send_size = sendcount;
818 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
819 for(i=0; i< comm_size; i++)//copy data to avoid bad free
820 extra->recvcounts[i] = recvcounts[i];
821 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
822 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
823 extra->num_processes = comm_size;
825 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
828 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
831 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
834 log_timed_action (action, clock);
837 xbt_free(recvcounts);
842 static void action_allToAllv(const char *const *action) {
844 The structure of the allToAllV action for the rank 0 (total 4 processes)
846 0 allToAllV 100 1 7 10 12 100 1 70 10 5
849 1) 100 is the size of the send buffer *sizeof(int),
850 2) 1 7 10 12 is the sendcounts array
851 3) 100*sizeof(int) is the size of the receiver buffer
852 4) 1 70 10 5 is the recvcounts array
857 double clock = smpi_process_simulated_elapsed();
859 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
860 int send_buf_size=0,recv_buf_size=0,i=0;
861 int *sendcounts = xbt_new0(int, comm_size);
862 int *recvcounts = xbt_new0(int, comm_size);
863 int *senddisps = xbt_new0(int, comm_size);
864 int *recvdisps = xbt_new0(int, comm_size);
866 MPI_Datatype MPI_CURRENT_TYPE2;
868 send_buf_size=parse_double(action[2]);
869 recv_buf_size=parse_double(action[3+comm_size]);
870 if(action[4+2*comm_size]) {
871 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
872 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
875 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
876 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
879 void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));
880 void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
882 for(i=0;i<comm_size;i++) {
883 sendcounts[i] = atoi(action[i+3]);
884 recvcounts[i] = atoi(action[i+4+comm_size]);
889 int rank = smpi_process_index();
890 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
891 extra->type = TRACING_ALLTOALLV;
892 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
893 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
894 extra->num_processes = comm_size;
896 for(i=0; i< comm_size; i++){//copy data to avoid bad free
897 extra->send_size += sendcounts[i];
898 extra->sendcounts[i] = sendcounts[i];
899 extra->recv_size += recvcounts[i];
900 extra->recvcounts[i] = recvcounts[i];
902 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
903 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
905 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
907 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
908 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
911 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
914 log_timed_action (action, clock);
917 xbt_free(sendcounts);
918 xbt_free(recvcounts);
923 void smpi_replay_init(int *argc, char***argv){
924 smpi_process_init(argc, argv);
925 smpi_process_mark_as_initialized();
927 int rank = smpi_process_index();
928 TRACE_smpi_init(rank);
929 TRACE_smpi_computing_init(rank);
930 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
931 extra->type = TRACING_INIT;
932 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
933 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
936 if (!smpi_process_index()){
937 _xbt_replay_action_init();
938 xbt_replay_action_register("init", action_init);
939 xbt_replay_action_register("finalize", action_finalize);
940 xbt_replay_action_register("comm_size", action_comm_size);
941 xbt_replay_action_register("comm_split", action_comm_split);
942 xbt_replay_action_register("comm_dup", action_comm_dup);
943 xbt_replay_action_register("send", action_send);
944 xbt_replay_action_register("Isend", action_Isend);
945 xbt_replay_action_register("recv", action_recv);
946 xbt_replay_action_register("Irecv", action_Irecv);
947 xbt_replay_action_register("wait", action_wait);
948 xbt_replay_action_register("waitAll", action_waitall);
949 xbt_replay_action_register("barrier", action_barrier);
950 xbt_replay_action_register("bcast", action_bcast);
951 xbt_replay_action_register("reduce", action_reduce);
952 xbt_replay_action_register("allReduce", action_allReduce);
953 xbt_replay_action_register("allToAll", action_allToAll);
954 xbt_replay_action_register("allToAllV", action_allToAllv);
955 xbt_replay_action_register("gather", action_gather);
956 xbt_replay_action_register("gatherV", action_gatherv);
957 xbt_replay_action_register("allGatherV", action_allgatherv);
958 xbt_replay_action_register("reduceScatter", action_reducescatter);
959 xbt_replay_action_register("compute", action_compute);
962 xbt_replay_action_runner(*argc, *argv);
965 int smpi_replay_finalize(){
967 /* One active process will stop. Decrease the counter*/
968 XBT_DEBUG("There are %lu elements in reqq[*]",
969 xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
970 if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
971 int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
972 MPI_Request requests[count_requests];
973 MPI_Status status[count_requests];
976 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
977 smpi_mpi_waitall(count_requests, requests, status);
983 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
985 if(!active_processes){
986 /* Last process alive speaking */
987 /* end the simulated timer */
988 sim_time = smpi_process_simulated_elapsed();
989 XBT_INFO("Simulation time %g", sim_time);
990 _xbt_replay_action_exit();
994 smpi_mpi_barrier(MPI_COMM_WORLD);
996 int rank = smpi_process_index();
997 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
998 extra->type = TRACING_FINALIZE;
999 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1001 smpi_process_finalize();
1003 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1004 TRACE_smpi_finalize(smpi_process_index());
1006 smpi_process_destroy();