1 /* Copyright (c) 2009-2013. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include <xbt/replay.h>
/* Log category used by the XBT_* logging calls in this file. */
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Written by action_comm_size() from the third field of a "comm_size" line. */
15 int communicator_size = 0;
/* Set from smpi_process_count() in action_init(); tested in smpi_replay_finalize(). */
16 static int active_processes = 0;
/* Array of dynars holding pending MPI_Request handles, indexed by rank in MPI_COMM_WORLD. */
17 xbt_dynar_t *reqq = NULL;
/* Datatype assigned when an action does not take its type from the trace line; chosen in action_init(). */
19 MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype selected for the action currently being replayed. */
20 MPI_Datatype MPI_CURRENT_TYPE;
/* Emit a verbose log line for a replayed action: the action's fields joined
 * with single spaces, followed by the simulated time elapsed since `clock`.
 * The string is only built when verbose logging is enabled for smpi_replay. */
22 static void log_timed_action (const char *const *action, double clock){
23 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
24 char *name = xbt_str_join_array(action, " ");
25 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
/* Convert `string` to a double with strtod().
 * Throws unknown_error with the message "<string> is not a double"; the test
 * that guards the throw lies outside the lines shown here. */
31 static double parse_double(const char *string)
35 value = strtod(string, &endptr);
37 THROWF(unknown_error, 0, "%s is not a double", string);
/* Select the MPI datatype designated by the trace field `action`.
 * The chosen type is stored in the global MPI_CURRENT_TYPE and also returned.
 * Candidates are MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG, MPI_FLOAT
 * and MPI_BYTE, with MPI_DEFAULT_TYPE as the last alternative; the dispatch
 * that picks between them is not visible here. */
41 static MPI_Datatype decode_datatype(const char *const action)
43 // Declared datatypes,
48 MPI_CURRENT_TYPE=MPI_DOUBLE;
51 MPI_CURRENT_TYPE=MPI_INT;
54 MPI_CURRENT_TYPE=MPI_CHAR;
57 MPI_CURRENT_TYPE=MPI_SHORT;
60 MPI_CURRENT_TYPE=MPI_LONG;
63 MPI_CURRENT_TYPE=MPI_FLOAT;
66 MPI_CURRENT_TYPE=MPI_BYTE;
69 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
72 return MPI_CURRENT_TYPE;
/* Return a string for `datatype`; callers in this file store it in the
 * datatype1 / datatype2 fields of their tracing record.
 * The visible comparisons are against MPI_BYTE, MPI_DOUBLE, MPI_CHAR,
 * MPI_SHORT, MPI_LONG and MPI_FLOAT; the strings returned for each match and
 * for the unmatched case are not visible here. */
76 const char* encode_datatype(MPI_Datatype datatype)
79 //default type for output is set to MPI_BYTE
80 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
81 if (datatype==MPI_BYTE){
84 if(datatype==MPI_DOUBLE)
88 if(datatype==MPI_CHAR)
90 if(datatype==MPI_SHORT)
92 if(datatype==MPI_LONG)
94 if(datatype==MPI_FLOAT)
97 // default - not implemented.
98 // do not warn here as we pass in this function even for other trace formats
/* Handler for the "init" action.
 * Chooses MPI_DEFAULT_TYPE (MPI_DOUBLE when action[2] is present, MPI_BYTE
 * otherwise), starts the simulated timer, records the process count in
 * active_processes and fills reqq with one MPI_Request dynar per process. */
102 static void action_init(const char *const *action)
105 XBT_DEBUG("Initialize the counters");
107 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
108 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
110 /* start a simulated timer */
111 smpi_process_simulated_start();
112 /*initialize the number of active processes */
113 active_processes = smpi_process_count();
116 reqq=xbt_new0(xbt_dynar_t,active_processes);
118 for(i=0;i<active_processes;i++){
119 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
/* Handler registered for the "finalize" action in smpi_replay_init().
 * NOTE(review): the body is not part of the visible lines. */
124 static void action_finalize(const char *const *action)
/* Handler for the "comm_size" action: stores action[2], parsed as a double,
 * into the global communicator_size and logs the time taken. */
128 static void action_comm_size(const char *const *action)
130 double clock = smpi_process_simulated_elapsed();
132 communicator_size = parse_double(action[2]);
133 log_timed_action (action, clock);
/* Handler for the "comm_split" action. The visible lines only sample the
 * simulated clock and log the action with its elapsed time. */
136 static void action_comm_split(const char *const *action)
138 double clock = smpi_process_simulated_elapsed();
140 log_timed_action (action, clock);
/* Handler for the "comm_dup" action. The visible lines only sample the
 * simulated clock and log the action with its elapsed time. */
143 static void action_comm_dup(const char *const *action)
145 double clock = smpi_process_simulated_elapsed();
147 log_timed_action (action, clock);
/* Handler for the "compute" action: executes action[2] flops, parsed as a
 * double, through smpi_execute_flops(). The execution is bracketed by
 * computing-in/out trace events carrying the flop amount. */
150 static void action_compute(const char *const *action)
152 double clock = smpi_process_simulated_elapsed();
153 double flops= parse_double(action[2]);
155 int rank = smpi_comm_rank(MPI_COMM_WORLD);
156 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
157 extra->type=TRACING_COMPUTING;
158 extra->comp_size=flops;
159 TRACE_smpi_computing_in(rank, extra);
161 smpi_execute_flops(flops);
163 TRACE_smpi_computing_out(rank);
166 log_timed_action (action, clock);
/* Handler for the "send" action: blocking send.
 * Fields: action[2] = destination rank, action[3] = element count.
 * MPI_CURRENT_TYPE is set either from action[4] via decode_datatype() or to
 * MPI_DEFAULT_TYPE (the selecting condition is not visible here).
 * The message is sent with a NULL buffer and tag 0 on MPI_COMM_WORLD, between
 * point-to-point trace events. */
169 static void action_send(const char *const *action)
171 int to = atoi(action[2]);
172 double size=parse_double(action[3]);
173 double clock = smpi_process_simulated_elapsed();
176 MPI_CURRENT_TYPE=decode_datatype(action[4]);
178 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
182 int rank = smpi_comm_rank(MPI_COMM_WORLD);
184 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
185 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
186 extra->type = TRACING_SEND;
187 extra->send_size = size;
189 extra->dst = dst_traced;
190 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
191 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
192 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
195 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
197 log_timed_action (action, clock);
200 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* Handler for the "Isend" action: non-blocking send.
 * Fields: action[2] = destination rank, action[3] = element count,
 * action[4] = optional datatype id (MPI_DEFAULT_TYPE when absent).
 * The request returned by smpi_mpi_isend() is pushed onto this rank's queue
 * in reqq so that a later wait/waitAll action can complete it. */
205 static void action_Isend(const char *const *action)
207 int to = atoi(action[2]);
208 double size=parse_double(action[3]);
209 double clock = smpi_process_simulated_elapsed();
212 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
213 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
216 int rank = smpi_comm_rank(MPI_COMM_WORLD);
217 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
218 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
219 extra->type = TRACING_ISEND;
220 extra->send_size = size;
222 extra->dst = dst_traced;
223 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
224 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
225 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
228 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
231 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
235 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
237 log_timed_action (action, clock);
/* Handler for the "recv" action: blocking receive.
 * Fields: action[2] = source rank, action[3] = element count,
 * action[4] = optional datatype id (MPI_DEFAULT_TYPE when absent).
 * A probe on the source is issued for the "unknown size" case; the condition
 * guarding it and the use made of `status` are not visible here. The receive
 * uses a NULL buffer and tag 0 on MPI_COMM_WORLD. */
240 static void action_recv(const char *const *action) {
241 int from = atoi(action[2]);
242 double size=parse_double(action[3]);
243 double clock = smpi_process_simulated_elapsed();
246 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
247 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
250 int rank = smpi_comm_rank(MPI_COMM_WORLD);
251 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
253 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
254 extra->type = TRACING_RECV;
255 extra->send_size = size;
256 extra->src = src_traced;
258 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
259 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
262 //unknown size from the receiver's point of view
264 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
268 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
271 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
272 TRACE_smpi_recv(rank, src_traced, rank);
275 log_timed_action (action, clock);
/* Handler for the "Irecv" action: non-blocking receive.
 * Fields: action[2] = source rank, action[3] = element count,
 * action[4] = optional datatype id (MPI_DEFAULT_TYPE when absent).
 * A probe on the source is issued for the "unknown size" case; its guarding
 * condition is not visible here. The request returned by smpi_mpi_irecv() is
 * pushed onto this rank's queue in reqq for a later wait/waitAll action. */
278 static void action_Irecv(const char *const *action)
280 int from = atoi(action[2]);
281 double size=parse_double(action[3]);
282 double clock = smpi_process_simulated_elapsed();
285 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
286 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
289 int rank = smpi_comm_rank(MPI_COMM_WORLD);
290 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
291 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
292 extra->type = TRACING_IRECV;
293 extra->send_size = size;
294 extra->src = src_traced;
296 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
297 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
300 //unknown size from the receiver's point of view
302 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
306 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
309 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
312 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
314 log_timed_action (action, clock);
/* Handler for the "wait" action: completes one pending request.
 * Asserts that this rank's queue in reqq is non-empty, pops a request from it,
 * asserts the request is non-NULL and waits on it with smpi_mpi_wait().
 * Source, destination and the receive flag are read from the request before
 * the wait so they can be used for the trace events emitted afterwards. */
317 static void action_wait(const char *const *action){
318 double clock = smpi_process_simulated_elapsed();
322 xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
323 "action wait not preceded by any irecv or isend: %s",
324 xbt_str_join_array(action," "));
325 request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
326 xbt_assert(request != NULL, "found null request in reqq");
328 int rank = request->comm != MPI_COMM_NULL
329 ? smpi_comm_rank(request->comm)
332 MPI_Group group = smpi_comm_group(request->comm);
333 int src_traced = smpi_group_rank(group, request->src);
334 int dst_traced = smpi_group_rank(group, request->dst);
335 int is_wait_for_receive = request->recv;
336 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
337 extra->type = TRACING_WAIT;
338 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
340 smpi_mpi_wait(&request, &status);
342 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
343 if (is_wait_for_receive) {
344 TRACE_smpi_recv(rank, src_traced, dst_traced);
348 log_timed_action (action, clock);
/* Handler for the "waitAll" action: completes every pending request of this rank.
 * When the rank's queue in reqq is non-empty, the requests are copied into a
 * local array and passed to smpi_mpi_waitall(). Before the wait, each
 * request's src, dst and recv flag are copied into three int dynars; after
 * the wait a receive trace event is emitted for every entry whose recv flag
 * is set.
 * NOTE(review): the second group of insertions (lines 387-390) stores the
 * contents of `t`, which no visible line initializes - confirm against the
 * full source which branch this belongs to. */
351 static void action_waitall(const char *const *action){
352 double clock = smpi_process_simulated_elapsed();
353 int count_requests=0;
356 count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
358 if (count_requests>0) {
359 MPI_Request requests[count_requests];
360 MPI_Status status[count_requests];
362 /* The reqq is an array of dynars. Its index corresponds to the rank.
363 Thus each rank saves its own requests to the array request. */
364 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
367 //save information from requests
369 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
370 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
371 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
372 for (i = 0; i < count_requests; i++) {
374 int *asrc = xbt_new(int, 1);
375 int *adst = xbt_new(int, 1);
376 int *arecv = xbt_new(int, 1);
377 *asrc = requests[i]->src;
378 *adst = requests[i]->dst;
379 *arecv = requests[i]->recv;
380 xbt_dynar_insert_at(srcs, i, asrc);
381 xbt_dynar_insert_at(dsts, i, adst);
382 xbt_dynar_insert_at(recvs, i, arecv);
387 int *t = xbt_new(int, 1);
388 xbt_dynar_insert_at(srcs, i, t);
389 xbt_dynar_insert_at(dsts, i, t);
390 xbt_dynar_insert_at(recvs, i, t);
394 int rank_traced = smpi_process_index();
395 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
396 extra->type = TRACING_WAITALL;
397 extra->send_size=count_requests;
398 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
401 smpi_mpi_waitall(count_requests, requests, status);
404 for (i = 0; i < count_requests; i++) {
405 int src_traced, dst_traced, is_wait_for_receive;
406 xbt_dynar_get_cpy(srcs, i, &src_traced);
407 xbt_dynar_get_cpy(dsts, i, &dst_traced);
408 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
409 if (is_wait_for_receive) {
410 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
413 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
415 xbt_dynar_free(&srcs);
416 xbt_dynar_free(&dsts);
417 xbt_dynar_free(&recvs);
420 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
422 log_timed_action (action, clock);
/* Handler for the "barrier" action: runs smpi_mpi_barrier() on MPI_COMM_WORLD
 * between collective-in/out trace events, then logs the elapsed time. */
425 static void action_barrier(const char *const *action){
426 double clock = smpi_process_simulated_elapsed();
428 int rank = smpi_comm_rank(MPI_COMM_WORLD);
429 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
430 extra->type = TRACING_BARRIER;
431 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
433 smpi_mpi_barrier(MPI_COMM_WORLD);
435 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
438 log_timed_action (action, clock);
/* Handler for the "bcast" action.
 * Fields: action[2] = element count, action[3] = root rank,
 * action[4] = datatype id. MPI_CURRENT_TYPE starts as MPI_DEFAULT_TYPE and is
 * overridden from action[4]; the conditions guarding the root and datatype
 * reads are not visible here. The broadcast uses a NULL buffer on
 * MPI_COMM_WORLD.
 * NOTE(review): root_traced is computed with smpi_group_index() here, whereas
 * action_reduce() uses smpi_group_rank() - confirm which is intended. */
442 static void action_bcast(const char *const *action)
444 double size = parse_double(action[2]);
445 double clock = smpi_process_simulated_elapsed();
448 * Initialize MPI_CURRENT_TYPE in order to decrease
449 * the number of the checks
451 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
454 root= atoi(action[3]);
456 MPI_CURRENT_TYPE=decode_datatype(action[4]);
461 int rank = smpi_comm_rank(MPI_COMM_WORLD);
462 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
464 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
465 extra->type = TRACING_BCAST;
466 extra->send_size = size;
467 extra->root = root_traced;
468 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
469 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
473 mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
475 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
478 log_timed_action (action, clock);
/* Handler for the "reduce" action.
 * Fields: action[2] = communication size (element count), action[3] =
 * computation size (flops), action[4] = root rank, action[5] = datatype id.
 * MPI_CURRENT_TYPE starts as MPI_DEFAULT_TYPE; the conditions guarding the
 * root and datatype reads are not visible here.
 * The reduction is replayed as mpi_coll_reduce_fun() with NULL buffers and
 * MPI_OP_NULL, followed by smpi_execute_flops(comp_size). */
481 static void action_reduce(const char *const *action)
483 double comm_size = parse_double(action[2]);
484 double comp_size = parse_double(action[3]);
485 double clock = smpi_process_simulated_elapsed();
487 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
490 root= atoi(action[4]);
492 MPI_CURRENT_TYPE=decode_datatype(action[5]);
497 int rank = smpi_comm_rank(MPI_COMM_WORLD);
498 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
499 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
500 extra->type = TRACING_REDUCE;
501 extra->send_size = comm_size;
502 extra->comp_size = comp_size;
503 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
504 extra->root = root_traced;
506 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
508 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
509 smpi_execute_flops(comp_size);
511 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
514 log_timed_action (action, clock);
/* Handler for the "allReduce" action.
 * Fields: action[2] = communication size (element count), action[3] =
 * computation size (flops), action[4] = optional datatype id
 * (MPI_DEFAULT_TYPE when absent).
 * Replayed as a reduce to rank 0, then the computation, then a broadcast
 * from rank 0, all on MPI_COMM_WORLD with NULL buffers. */
517 static void action_allReduce(const char *const *action) {
518 double comm_size = parse_double(action[2]);
519 double comp_size = parse_double(action[3]);
521 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
522 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
524 double clock = smpi_process_simulated_elapsed();
526 int rank = smpi_comm_rank(MPI_COMM_WORLD);
527 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
528 extra->type = TRACING_ALLREDUCE;
529 extra->send_size = comm_size;
530 extra->comp_size = comp_size;
531 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
533 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
535 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
536 smpi_execute_flops(comp_size);
537 mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
539 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
542 log_timed_action (action, clock);
/* Handler for the "allToAll" action.
 * Fields: action[2] = per-peer send count, action[3] = per-peer receive count.
 * The send/receive datatypes are set either from action[4] / action[5] or to
 * MPI_DEFAULT_TYPE (the selecting condition is not visible here).
 * Zeroed buffers of count*comm_size elements are allocated for each side and
 * handed to mpi_coll_alltoall_fun() on MPI_COMM_WORLD; their release is not
 * visible here. */
545 static void action_allToAll(const char *const *action) {
546 double clock = smpi_process_simulated_elapsed();
547 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
548 int send_size = parse_double(action[2]);
549 int recv_size = parse_double(action[3]);
550 MPI_Datatype MPI_CURRENT_TYPE2;
553 MPI_CURRENT_TYPE=decode_datatype(action[4]);
554 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
557 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
558 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
560 void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));
561 void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
564 int rank = smpi_process_index();
565 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
566 extra->type = TRACING_ALLTOALL;
567 extra->send_size = send_size;
568 extra->recv_size = recv_size;
569 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
570 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
572 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
575 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
578 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
581 log_timed_action (action, clock);
/* Handler for the "gather" action.
 * Reads action[2] (send count), action[3] (receive count), action[4] (root)
 * and sets the two datatypes either from action[5] / action[6] or to
 * MPI_DEFAULT_TYPE (the selecting condition is not visible here).
 * A zeroed send buffer is always allocated; the receive buffer allocation of
 * recv_size*comm_size elements is presumably restricted to the root - TODO
 * confirm, the guarding condition is not visible here.
 * NOTE(review): the collective-in event is tagged with `root` while the
 * collective-out event is tagged with -1. */
587 static void action_gather(const char *const *action) {
589 The structure of the gather action for the rank 0 (total 4 processes)
594 1) 68 is the sendcounts
595 2) 68 is the recvcounts
596 3) 0 is the root node
597 4) 0 is the send datatype id, see decode_datatype()
598 5) 0 is the recv datatype id, see decode_datatype()
600 double clock = smpi_process_simulated_elapsed();
601 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
602 int send_size = parse_double(action[2]);
603 int recv_size = parse_double(action[3]);
604 MPI_Datatype MPI_CURRENT_TYPE2;
606 MPI_CURRENT_TYPE=decode_datatype(action[5]);
607 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
609 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
610 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
612 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
615 int root=atoi(action[4]);
616 int rank = smpi_process_index();
619 recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
622 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
623 extra->type = TRACING_GATHER;
624 extra->send_size = send_size;
625 extra->recv_size = recv_size;
627 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
628 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
630 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
632 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
633 recv, recv_size, MPI_CURRENT_TYPE2,
634 root, MPI_COMM_WORLD);
637 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
640 log_timed_action (action, clock);
/* Handler for the "gatherV" action (registered under that name in
 * smpi_replay_init()).
 * Reads action[2] (send count), action[3 .. 2+comm_size] (receive counts),
 * action[3+comm_size] (root) and, when action[4+comm_size] is present, the
 * send/receive datatype ids at action[4+comm_size] / action[5+comm_size];
 * the assignments to MPI_DEFAULT_TYPE are the alternative.
 * `disps` is allocated zero-filled and no visible line writes to it.
 * The receive counts are copied into the tracing record rather than shared
 * with it, because the local array is freed at the end of the handler.
 * NOTE(review): the collective-in event is tagged with `root` while the
 * collective-out event is tagged with -1. */
647 static void action_gatherv(const char *const *action) {
649 The structure of the gatherv action for the rank 0 (total 4 processes)
651 0 gather 68 68 10 10 10 0 0 0
654 1) 68 is the sendcount
655 2) 68 10 10 10 is the recvcounts
656 3) 0 is the root node
657 4) 0 is the send datatype id, see decode_datatype()
658 5) 0 is the recv datatype id, see decode_datatype()
660 double clock = smpi_process_simulated_elapsed();
661 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
662 int send_size = parse_double(action[2]);
663 int *disps = xbt_new0(int, comm_size);
664 int *recvcounts = xbt_new0(int, comm_size);
667 MPI_Datatype MPI_CURRENT_TYPE2;
668 if(action[4+comm_size]) {
669 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
670 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
672 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
673 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
675 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
677 for(i=0;i<comm_size;i++) {
678 recvcounts[i] = atoi(action[i+3]);
679 recv_sum=recv_sum+recvcounts[i];
683 int root=atoi(action[3+comm_size]);
684 int rank = smpi_process_index();
687 recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
690 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
691 extra->type = TRACING_GATHERV;
692 extra->send_size = send_size;
693 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
694 for(i=0; i< comm_size; i++)//copy data to avoid bad free
695 extra->recvcounts[i] = recvcounts[i];
697 extra->num_processes = comm_size;
698 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
699 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
701 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
703 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
704 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
705 root, MPI_COMM_WORLD);
708 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
711 log_timed_action (action, clock);
712 xbt_free(recvcounts);
/* Handler for the "reduceScatter" action.
 * Reads action[2 .. 1+comm_size] (receive counts), action[2+comm_size]
 * (computation size) and the optional datatype id at action[3+comm_size]
 * (MPI_DEFAULT_TYPE is the alternative).
 * Replayed as a reduce of the summed receive counts to `root`, a scatterv
 * from rank 0 and then the computation; all buffers are NULL.
 * `root` and `recv_sum` are declared on lines not visible here, and `disps`
 * is allocated zero-filled with no visible write to it. */
719 static void action_reducescatter(const char *const *action) {
722 The structure of the reducescatter action for the rank 0 (total 4 processes)
724 0 reduceScatter 275427 275427 275427 204020 11346849 0
727 1) The first four values after the name of the action declare the recvcounts array
728 2) The value 11346849 is the amount of instructions
729 3) The last value corresponds to the datatype, see decode_datatype().
731 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
735 double clock = smpi_process_simulated_elapsed();
736 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
737 int comp_size = parse_double(action[2+comm_size]);
738 int *recvcounts = xbt_new0(int, comm_size);
739 int *disps = xbt_new0(int, comm_size);
742 int rank = smpi_process_index();
744 if(action[3+comm_size])
745 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
747 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
749 for(i=0;i<comm_size;i++) {
750 recvcounts[i] = atoi(action[i+2]);
751 recv_sum=recv_sum+recvcounts[i];
756 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
757 extra->type = TRACING_REDUCE_SCATTER;
758 extra->send_size = 0;
759 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
760 for(i=0; i< comm_size; i++)//copy data to avoid bad free
761 extra->recvcounts[i] = recvcounts[i];
762 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
763 extra->comp_size = comp_size;
764 extra->num_processes = comm_size;
767 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
769 mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
770 root, MPI_COMM_WORLD);
771 smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
772 recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
773 smpi_execute_flops(comp_size);
777 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
779 xbt_free(recvcounts);
781 log_timed_action (action, clock);
/* Handler for the "allGatherV" action.
 * Reads action[2] (send count), action[3 .. 2+comm_size] (receive counts)
 * and, when action[3+comm_size] is present, the send/receive datatype ids at
 * action[3+comm_size] / action[4+comm_size]; the assignments to
 * MPI_DEFAULT_TYPE are the alternative.
 * Zeroed send and receive buffers are allocated (the latter sized by the sum
 * of the receive counts) and passed to mpi_coll_allgatherv_fun().
 * `disps` is allocated zero-filled and no visible line writes to it. */
785 static void action_allgatherv(const char *const *action) {
788 The structure of the allgatherv action for the rank 0 (total 4 processes)
790 0 allGatherV 275427 275427 275427 275427 204020
793 1) 275427 is the sendcount
794 2) The next four elements declare the recvcounts array
795 3) No more values mean that the datatype for sent and receive buffer
796 is the default one, see decode_datatype().
800 double clock = smpi_process_simulated_elapsed();
802 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
804 int sendcount=atoi(action[2]);
805 int *recvcounts = xbt_new0(int, comm_size);
806 int *disps = xbt_new0(int, comm_size);
808 MPI_Datatype MPI_CURRENT_TYPE2;
810 if(action[3+comm_size]) {
811 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
812 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
814 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
815 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
817 void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));
819 for(i=0;i<comm_size;i++) {
820 recvcounts[i] = atoi(action[i+3]);
821 recv_sum=recv_sum+recvcounts[i];
823 void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
826 int rank = smpi_process_index();
827 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
828 extra->type = TRACING_ALLGATHERV;
829 extra->send_size = sendcount;
830 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
831 for(i=0; i< comm_size; i++)//copy data to avoid bad free
832 extra->recvcounts[i] = recvcounts[i];
833 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
834 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
835 extra->num_processes = comm_size;
837 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
840 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
843 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
846 log_timed_action (action, clock);
849 xbt_free(recvcounts);
/* Handler for the "allToAllV" action.
 * Reads action[2] (send buffer size), action[3 .. 2+comm_size] (send counts),
 * action[3+comm_size] (receive buffer size), action[4+comm_size ..
 * 3+2*comm_size] (receive counts) and, when action[4+2*comm_size] is present,
 * the send/receive datatype ids at action[4+2*comm_size] /
 * action[5+2*comm_size]; the assignments to MPI_DEFAULT_TYPE are the
 * alternative.
 * Zeroed send/receive buffers are allocated with the respective datatype
 * sizes and passed to mpi_coll_alltoallv_fun(). The displacement arrays are
 * allocated zero-filled and no visible line writes to them.
 * The counts are copied into the tracing record rather than shared with it,
 * because the local arrays are freed at the end of the handler. */
854 static void action_allToAllv(const char *const *action) {
856 The structure of the allToAllV action for the rank 0 (total 4 processes)
858 0 allToAllV 100 1 7 10 12 100 1 70 10 5
861 1) 100 is the size of the send buffer *sizeof(int),
862 2) 1 7 10 12 is the sendcounts array
863 3) 100*sizeof(int) is the size of the receiver buffer
864 4) 1 70 10 5 is the recvcounts array
869 double clock = smpi_process_simulated_elapsed();
871 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
872 int send_buf_size=0,recv_buf_size=0,i=0;
873 int *sendcounts = xbt_new0(int, comm_size);
874 int *recvcounts = xbt_new0(int, comm_size);
875 int *senddisps = xbt_new0(int, comm_size);
876 int *recvdisps = xbt_new0(int, comm_size);
878 MPI_Datatype MPI_CURRENT_TYPE2;
880 send_buf_size=parse_double(action[2]);
881 recv_buf_size=parse_double(action[3+comm_size]);
882 if(action[4+2*comm_size]) {
883 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
884 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
887 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
888 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
891 void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));
892 void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
894 for(i=0;i<comm_size;i++) {
895 sendcounts[i] = atoi(action[i+3]);
896 recvcounts[i] = atoi(action[i+4+comm_size]);
901 int rank = smpi_process_index();
902 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
903 extra->type = TRACING_ALLTOALLV;
904 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
905 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
906 extra->num_processes = comm_size;
908 for(i=0; i< comm_size; i++){//copy data to avoid bad free
909 extra->send_size += sendcounts[i];
910 extra->sendcounts[i] = sendcounts[i];
911 extra->recv_size += recvcounts[i];
912 extra->recvcounts[i] = recvcounts[i];
914 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
915 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
917 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
/* The receive side uses MPI_CURRENT_TYPE2: it is the type recvbuf was sized
 * with and the one recorded in extra->datatype2. */
919 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
920 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE2,
923 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
926 log_timed_action (action, clock);
929 xbt_free(sendcounts);
930 xbt_free(recvcounts);
/* Entry point of the trace replay.
 * Initializes the SMPI process, emits the init trace events, and, on the
 * process whose index is 0 only, initializes the replay machinery and
 * registers one handler per supported action name. Finally starts the
 * action runner with the program arguments. */
935 void smpi_replay_init(int *argc, char***argv){
936 smpi_process_init(argc, argv);
937 smpi_process_mark_as_initialized();
939 int rank = smpi_process_index();
940 TRACE_smpi_init(rank);
941 TRACE_smpi_computing_init(rank);
942 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
943 extra->type = TRACING_INIT;
944 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
945 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
948 if (!smpi_process_index()){
949 _xbt_replay_action_init();
950 xbt_replay_action_register("init", action_init);
951 xbt_replay_action_register("finalize", action_finalize);
952 xbt_replay_action_register("comm_size", action_comm_size);
953 xbt_replay_action_register("comm_split", action_comm_split);
954 xbt_replay_action_register("comm_dup", action_comm_dup);
955 xbt_replay_action_register("send", action_send);
956 xbt_replay_action_register("Isend", action_Isend);
957 xbt_replay_action_register("recv", action_recv);
958 xbt_replay_action_register("Irecv", action_Irecv);
959 xbt_replay_action_register("wait", action_wait);
960 xbt_replay_action_register("waitAll", action_waitall);
961 xbt_replay_action_register("barrier", action_barrier);
962 xbt_replay_action_register("bcast", action_bcast);
963 xbt_replay_action_register("reduce", action_reduce);
964 xbt_replay_action_register("allReduce", action_allReduce);
965 xbt_replay_action_register("allToAll", action_allToAll);
966 xbt_replay_action_register("allToAllV", action_allToAllv);
967 xbt_replay_action_register("gather", action_gather);
968 xbt_replay_action_register("gatherV", action_gatherv);
969 xbt_replay_action_register("allGatherV", action_allgatherv);
970 xbt_replay_action_register("reduceScatter", action_reducescatter);
971 xbt_replay_action_register("compute", action_compute);
974 xbt_replay_action_runner(*argc, *argv);
/* Tear down the replay for the calling process.
 * Any requests still queued for this rank in reqq are completed with
 * smpi_mpi_waitall(), then the rank's queue container is freed. When
 * active_processes is zero, the simulated time is logged and the replay
 * machinery is shut down. A barrier on MPI_COMM_WORLD precedes the final
 * trace events, smpi_process_finalize() and smpi_process_destroy().
 * NOTE(review): the update of active_processes announced by the first comment
 * and the return statement are not part of the visible lines. */
977 int smpi_replay_finalize(){
979 /* One active process will stop. Decrease the counter*/
980 XBT_DEBUG("There are %lu elements in reqq[*]",
981 xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
982 if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
983 int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
984 MPI_Request requests[count_requests];
985 MPI_Status status[count_requests];
988 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
989 smpi_mpi_waitall(count_requests, requests, status);
995 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
997 if(!active_processes){
998 /* Last process alive speaking */
999 /* end the simulated timer */
1000 sim_time = smpi_process_simulated_elapsed();
1001 XBT_INFO("Simulation time %f", sim_time);
1002 _xbt_replay_action_exit();
1006 smpi_mpi_barrier(MPI_COMM_WORLD);
1008 int rank = smpi_process_index();
1009 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1010 extra->type = TRACING_FINALIZE;
1011 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1013 smpi_process_finalize();
1015 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1016 TRACE_smpi_finalize(smpi_process_index());
1018 smpi_process_destroy();