1 /* Copyright (c) 2009-2014. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
/* Log sub-category "smpi_replay" (declared under "smpi"); it is the default
 * category for the XBT_* logging macros used in this file. */
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Value read from the trace by action_comm_size(). */
14 int communicator_size = 0;
/* Set to smpi_process_count() by action_init(); smpi_replay_finalize() tests
 * it against zero to detect the last process. */
15 static int active_processes = 0;
/* Array of dynars indexed by rank in MPI_COMM_WORLD. Each dynar holds the
 * MPI_Request handles pushed by action_Isend()/action_Irecv() and consumed by
 * action_wait()/action_waitall()/smpi_replay_finalize(). */
16 xbt_dynar_t *reqq = NULL;
/* Datatype used when a trace line carries no datatype field; chosen in
 * action_init(). */
18 MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype of the action being replayed; written by decode_datatype() and by
 * the individual action handlers. */
19 MPI_Datatype MPI_CURRENT_TYPE;
/* Log a replayed action together with its simulated duration.
 *
 * Only does work when verbose logging is enabled for smpi_replay: the words of
 * `action` are joined with single spaces and printed followed by the simulated
 * time elapsed since `clock`.
 *
 * action: words of the trace line being replayed.
 * clock:  value of smpi_process_simulated_elapsed() sampled when the action
 *         started. */
21 static void log_timed_action (const char *const *action, double clock){
22 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
23 char *name = xbt_str_join_array(action, " ");
24 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
/* Convert `string` to a double using strtod().
 *
 * Throws unknown_error with the message "<string> is not a double" for
 * rejected input.
 * NOTE(review): confirm the exact rejection condition (presumably characters
 * left over after the number, detected through endptr). */
30 static double parse_double(const char *string)
34 value = strtod(string, &endptr);
36 THROWF(unknown_error, 0, "%s is not a double", string);
/* Translate the datatype field of a trace line into an MPI_Datatype.
 *
 * The selected type is stored in the global MPI_CURRENT_TYPE and that same
 * value is returned. The types that can be selected are MPI_DOUBLE, MPI_INT,
 * MPI_CHAR, MPI_SHORT, MPI_LONG, MPI_FLOAT and MPI_BYTE; MPI_DEFAULT_TYPE is
 * the remaining choice (presumably the fallback for unrecognized ids).
 * NOTE(review): confirm which id in `action` selects which type.
 *
 * action: the datatype field as text. */
40 static MPI_Datatype decode_datatype(const char *const action)
42 // Declared datatypes,
47 MPI_CURRENT_TYPE=MPI_DOUBLE;
50 MPI_CURRENT_TYPE=MPI_INT;
53 MPI_CURRENT_TYPE=MPI_CHAR;
56 MPI_CURRENT_TYPE=MPI_SHORT;
59 MPI_CURRENT_TYPE=MPI_LONG;
62 MPI_CURRENT_TYPE=MPI_FLOAT;
65 MPI_CURRENT_TYPE=MPI_BYTE;
68 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
71 return MPI_CURRENT_TYPE;
/* Return the textual encoding of `datatype` used when writing traces
 * (counterpart of decode_datatype()).
 *
 * MPI_BYTE, MPI_DOUBLE, MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT are
 * compared explicitly. Any other datatype ends up in the "not implemented"
 * default case, which deliberately emits no warning because this function is
 * also reached for other trace formats.
 * NOTE(review): confirm the exact strings returned for each case. */
75 const char* encode_datatype(MPI_Datatype datatype)
78 //default type for output is set to MPI_BYTE
79 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
80 if (datatype==MPI_BYTE){
83 if(datatype==MPI_DOUBLE)
87 if(datatype==MPI_CHAR)
89 if(datatype==MPI_SHORT)
91 if(datatype==MPI_LONG)
93 if(datatype==MPI_FLOAT)
96 // default - not implemented.
97 // do not warn here as we pass in this function even for other trace formats
/* Replay handler for the "init" action.
 *
 * - Chooses MPI_DEFAULT_TYPE: MPI_DOUBLE when the trace line has a third word
 *   (action[2]), MPI_BYTE otherwise.
 * - Starts the simulated timer of the calling process.
 * - Records the number of SMPI processes in active_processes.
 * - Allocates reqq with one slot per process and creates in each slot a dynar
 *   of MPI_Request elements (free function: xbt_free_ref).
 * NOTE(review): confirm whether the reqq allocation is guarded so that it
 * happens only once across all processes. */
101 static void action_init(const char *const *action)
104 XBT_DEBUG("Initialize the counters");
106 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
107 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
109 /* start a simulated timer */
110 smpi_process_simulated_start();
111 /*initialize the number of active processes */
112 active_processes = smpi_process_count();
115 reqq=xbt_new0(xbt_dynar_t,active_processes);
117 for(i=0;i<active_processes;i++){
118 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
/* Replay handler registered for the "finalize" action by smpi_replay_init(). */
123 static void action_finalize(const char *const *action)
/* Replay handler for the "comm_size" action.
 *
 * Parses action[2] as a double and stores it in the int global
 * communicator_size (the fractional part is lost in the conversion), then
 * logs the action with its simulated duration. */
127 static void action_comm_size(const char *const *action)
129 double clock = smpi_process_simulated_elapsed();
131 communicator_size = parse_double(action[2]);
132 log_timed_action (action, clock);
/* Replay handler for the "comm_split" action: samples the simulated clock and
 * logs the action through log_timed_action(). */
135 static void action_comm_split(const char *const *action)
137 double clock = smpi_process_simulated_elapsed();
139 log_timed_action (action, clock);
/* Replay handler for the "comm_dup" action: samples the simulated clock and
 * logs the action through log_timed_action(). */
142 static void action_comm_dup(const char *const *action)
144 double clock = smpi_process_simulated_elapsed();
146 log_timed_action (action, clock);
/* Replay handler for the "compute" action.
 *
 * action[2] is the amount of flops to execute. The computation is simulated
 * with smpi_execute_flops() and bracketed by TRACE_smpi_computing_in/out
 * events; the "in" event carries the flop amount in extra->comp_size. The
 * action is finally logged with its simulated duration. */
149 static void action_compute(const char *const *action)
151 double clock = smpi_process_simulated_elapsed();
152 double flops= parse_double(action[2]);
154 int rank = smpi_comm_rank(MPI_COMM_WORLD);
155 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
156 extra->type=TRACING_COMPUTING;
157 extra->comp_size=flops;
158 TRACE_smpi_computing_in(rank, extra);
160 smpi_execute_flops(flops);
162 TRACE_smpi_computing_out(rank);
165 log_timed_action (action, clock);
/* Replay handler for the "send" action (blocking send).
 *
 * Trace fields:
 *   action[2]  destination rank
 *   action[3]  number of elements
 *   action[4]  datatype id for decode_datatype(); MPI_DEFAULT_TYPE is the
 *              alternative (presumably when the field is absent, as in
 *              action_Isend())
 *
 * The message is sent with a NULL buffer, tag 0, on MPI_COMM_WORLD. Tracing
 * records a point-to-point "in" event, a send event sized
 * size * smpi_datatype_size(type), and a matching "out" event. */
168 static void action_send(const char *const *action)
170 int to = atoi(action[2]);
171 double size=parse_double(action[3]);
172 double clock = smpi_process_simulated_elapsed();
175 MPI_CURRENT_TYPE=decode_datatype(action[4]);
177 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
181 int rank = smpi_comm_rank(MPI_COMM_WORLD);
183 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
184 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
185 extra->type = TRACING_SEND;
186 extra->send_size = size;
188 extra->dst = dst_traced;
189 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
190 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
191 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
194 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
196 log_timed_action (action, clock);
199 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* Replay handler for the "Isend" action (non-blocking send).
 *
 * Trace fields:
 *   action[2]  destination rank
 *   action[3]  number of elements
 *   action[4]  optional datatype id; MPI_DEFAULT_TYPE when absent
 *
 * The request returned by smpi_mpi_isend() (NULL buffer, tag 0,
 * MPI_COMM_WORLD) is pushed onto the caller's dynar in reqq so that a later
 * "wait"/"waitAll" action can complete it. */
204 static void action_Isend(const char *const *action)
206 int to = atoi(action[2]);
207 double size=parse_double(action[3]);
208 double clock = smpi_process_simulated_elapsed();
211 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
212 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
215 int rank = smpi_comm_rank(MPI_COMM_WORLD);
216 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
217 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
218 extra->type = TRACING_ISEND;
219 extra->send_size = size;
221 extra->dst = dst_traced;
222 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
223 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
224 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
227 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
230 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
234 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
236 log_timed_action (action, clock);
/* Replay handler for the "recv" action (blocking receive).
 *
 * Trace fields:
 *   action[2]  source rank
 *   action[3]  number of elements
 *   action[4]  optional datatype id; MPI_DEFAULT_TYPE when absent
 *
 * A probe on (from, tag 0, MPI_COMM_WORLD) is issued for the case where the
 * receiver does not know the size.
 * NOTE(review): confirm the condition guarding the probe and how `size` is
 * updated from `status` afterwards.
 * The message is then received into a NULL buffer, and the "out" and receive
 * tracing events are emitted. */
239 static void action_recv(const char *const *action) {
240 int from = atoi(action[2]);
241 double size=parse_double(action[3]);
242 double clock = smpi_process_simulated_elapsed();
245 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
246 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
249 int rank = smpi_comm_rank(MPI_COMM_WORLD);
250 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
252 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
253 extra->type = TRACING_RECV;
254 extra->send_size = size;
255 extra->src = src_traced;
257 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
258 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
261 //unknown size from the receiver's point of view
263 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
267 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
270 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
271 TRACE_smpi_recv(rank, src_traced, rank);
274 log_timed_action (action, clock);
/* Replay handler for the "Irecv" action (non-blocking receive).
 *
 * Trace fields:
 *   action[2]  source rank
 *   action[3]  number of elements
 *   action[4]  optional datatype id; MPI_DEFAULT_TYPE when absent
 *
 * A probe on (from, tag 0, MPI_COMM_WORLD) is issued for the case where the
 * receiver does not know the size.
 * NOTE(review): confirm the condition guarding the probe.
 * The request returned by smpi_mpi_irecv() (NULL buffer) is pushed onto the
 * caller's dynar in reqq for a later "wait"/"waitAll" action. */
277 static void action_Irecv(const char *const *action)
279 int from = atoi(action[2]);
280 double size=parse_double(action[3]);
281 double clock = smpi_process_simulated_elapsed();
284 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
285 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
288 int rank = smpi_comm_rank(MPI_COMM_WORLD);
289 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
290 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291 extra->type = TRACING_IRECV;
292 extra->send_size = size;
293 extra->src = src_traced;
295 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
296 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
299 //unknown size from the receiver's point of view
301 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
305 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
308 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
311 xbt_dynar_push(reqq[smpi_comm_rank(MPI_COMM_WORLD)],&request);
313 log_timed_action (action, clock);
/* Replay handler for the "wait" action.
 *
 * Asserts that the caller's dynar in reqq is not empty (a "wait" must follow
 * an Isend/Irecv), pops one request from it and asserts it is not NULL.
 * The request's communicator, src, dst and recv flag are read BEFORE calling
 * smpi_mpi_wait() (presumably because the request may no longer be usable
 * afterwards -- TODO confirm). After the wait, a receive event is traced when
 * the request was a receive. */
316 static void action_wait(const char *const *action){
317 double clock = smpi_process_simulated_elapsed();
321 xbt_assert(xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]),
322 "action wait not preceded by any irecv or isend: %s",
323 xbt_str_join_array(action," "));
324 request = xbt_dynar_pop_as(reqq[smpi_comm_rank(MPI_COMM_WORLD)],MPI_Request);
325 xbt_assert(request != NULL, "found null request in reqq");
327 int rank = request->comm != MPI_COMM_NULL
328 ? smpi_comm_rank(request->comm)
331 MPI_Group group = smpi_comm_group(request->comm);
332 int src_traced = smpi_group_rank(group, request->src);
333 int dst_traced = smpi_group_rank(group, request->dst);
334 int is_wait_for_receive = request->recv;
335 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
336 extra->type = TRACING_WAIT;
337 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
339 smpi_mpi_wait(&request, &status);
341 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
342 if (is_wait_for_receive) {
343 TRACE_smpi_recv(rank, src_traced, dst_traced);
347 log_timed_action (action, clock);
/* Replay handler for the "waitAll" action.
 *
 * When the caller's dynar in reqq holds at least one request:
 *  1. the requests are copied into a local variable-length array;
 *  2. src, dst and the recv flag of each request are saved in three int
 *     dynars, since they are needed for tracing after the wait;
 *  3. smpi_mpi_waitall() completes all requests;
 *  4. a receive event is traced for every request that was a receive;
 *  5. the helper dynars and the caller's reqq container are released.
 *
 * NOTE(review): the int temporaries allocated with xbt_new (asrc, adst, arecv,
 * t) have no matching free among these statements; if xbt_dynar_insert_at()
 * copies the element, they are leaked -- confirm.
 * NOTE(review): `t` is inserted without any value being assigned to it here;
 * confirm the entries it produces are never interpreted.
 * NOTE(review): confirm that reqq[rank] is re-created after
 * xbt_dynar_free_container(), otherwise a later Isend/Irecv would push onto a
 * released dynar. */
350 static void action_waitall(const char *const *action){
351 double clock = smpi_process_simulated_elapsed();
352 int count_requests=0;
355 count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
357 if (count_requests>0) {
358 MPI_Request requests[count_requests];
359 MPI_Status status[count_requests];
361 /* The reqq is an array of dynars. Its index corresponds to the rank.
362 Thus each rank saves its own requests to the array request. */
363 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
366 //save information from requests
368 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
369 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
370 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
371 for (i = 0; i < count_requests; i++) {
373 int *asrc = xbt_new(int, 1);
374 int *adst = xbt_new(int, 1);
375 int *arecv = xbt_new(int, 1);
376 *asrc = requests[i]->src;
377 *adst = requests[i]->dst;
378 *arecv = requests[i]->recv;
379 xbt_dynar_insert_at(srcs, i, asrc);
380 xbt_dynar_insert_at(dsts, i, adst);
381 xbt_dynar_insert_at(recvs, i, arecv);
386 int *t = xbt_new(int, 1);
387 xbt_dynar_insert_at(srcs, i, t);
388 xbt_dynar_insert_at(dsts, i, t);
389 xbt_dynar_insert_at(recvs, i, t);
393 int rank_traced = smpi_process_index();
394 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
395 extra->type = TRACING_WAITALL;
396 extra->send_size=count_requests;
397 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
400 smpi_mpi_waitall(count_requests, requests, status);
403 for (i = 0; i < count_requests; i++) {
404 int src_traced, dst_traced, is_wait_for_receive;
405 xbt_dynar_get_cpy(srcs, i, &src_traced);
406 xbt_dynar_get_cpy(dsts, i, &dst_traced);
407 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
408 if (is_wait_for_receive) {
409 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
412 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
414 xbt_dynar_free(&srcs);
415 xbt_dynar_free(&dsts);
416 xbt_dynar_free(&recvs);
419 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
421 log_timed_action (action, clock);
/* Replay handler for the "barrier" action: runs smpi_mpi_barrier() on
 * MPI_COMM_WORLD between collective "in"/"out" tracing events (no root, hence
 * -1) and logs the action with its simulated duration. */
424 static void action_barrier(const char *const *action){
425 double clock = smpi_process_simulated_elapsed();
427 int rank = smpi_comm_rank(MPI_COMM_WORLD);
428 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
429 extra->type = TRACING_BARRIER;
430 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
432 smpi_mpi_barrier(MPI_COMM_WORLD);
434 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
437 log_timed_action (action, clock);
/* Replay handler for the "bcast" action.
 *
 * Trace fields:
 *   action[2]  number of elements
 *   action[3]  root rank (presumably optional -- TODO confirm the default)
 *   action[4]  datatype id (presumably optional; MPI_DEFAULT_TYPE is preset)
 *
 * The broadcast is simulated with mpi_coll_bcast_fun() on a NULL buffer over
 * MPI_COMM_WORLD.
 * NOTE(review): root_traced is computed with smpi_group_index() here whereas
 * action_reduce() uses smpi_group_rank() for the same purpose; confirm which
 * one is intended. */
441 static void action_bcast(const char *const *action)
443 double size = parse_double(action[2]);
444 double clock = smpi_process_simulated_elapsed();
447 * Initialize MPI_CURRENT_TYPE in order to decrease
448 * the number of the checks
450 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
453 root= atoi(action[3]);
455 MPI_CURRENT_TYPE=decode_datatype(action[4]);
460 int rank = smpi_comm_rank(MPI_COMM_WORLD);
461 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
463 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
464 extra->type = TRACING_BCAST;
465 extra->send_size = size;
466 extra->root = root_traced;
467 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
468 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
472 mpi_coll_bcast_fun(NULL, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
474 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
477 log_timed_action (action, clock);
/* Replay handler for the "reduce" action.
 *
 * Trace fields:
 *   action[2]  number of elements communicated
 *   action[3]  amount of flops for the reduction computation
 *   action[4]  root rank (presumably optional -- TODO confirm the default)
 *   action[5]  datatype id (presumably optional; MPI_DEFAULT_TYPE is preset)
 *
 * The communication is simulated with mpi_coll_reduce_fun() on NULL buffers
 * with MPI_OP_NULL; the computation cost is then simulated separately with
 * smpi_execute_flops(comp_size). */
480 static void action_reduce(const char *const *action)
482 double comm_size = parse_double(action[2]);
483 double comp_size = parse_double(action[3]);
484 double clock = smpi_process_simulated_elapsed();
486 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
489 root= atoi(action[4]);
491 MPI_CURRENT_TYPE=decode_datatype(action[5]);
496 int rank = smpi_comm_rank(MPI_COMM_WORLD);
497 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
498 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
499 extra->type = TRACING_REDUCE;
500 extra->send_size = comm_size;
501 extra->comp_size = comp_size;
502 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
503 extra->root = root_traced;
505 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
507 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
508 smpi_execute_flops(comp_size);
510 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
513 log_timed_action (action, clock);
/* Replay handler for the "allReduce" action.
 *
 * Trace fields:
 *   action[2]  number of elements communicated
 *   action[3]  amount of flops for the reduction computation
 *   action[4]  optional datatype id; MPI_DEFAULT_TYPE when absent
 *
 * The all-reduce is simulated as three steps on MPI_COMM_WORLD: a reduce to
 * rank 0 (NULL buffers, MPI_OP_NULL), the computation cost
 * (smpi_execute_flops), then a broadcast from rank 0. */
516 static void action_allReduce(const char *const *action) {
517 double comm_size = parse_double(action[2]);
518 double comp_size = parse_double(action[3]);
520 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
521 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
523 double clock = smpi_process_simulated_elapsed();
525 int rank = smpi_comm_rank(MPI_COMM_WORLD);
526 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
527 extra->type = TRACING_ALLREDUCE;
528 extra->send_size = comm_size;
529 extra->comp_size = comp_size;
530 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
532 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
534 mpi_coll_reduce_fun(NULL, NULL, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, 0, MPI_COMM_WORLD);
535 smpi_execute_flops(comp_size);
536 mpi_coll_bcast_fun(NULL, comm_size, MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
538 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
541 log_timed_action (action, clock);
/* Replay handler for the "allToAll" action.
 *
 * Trace fields:
 *   action[2]  per-peer send count
 *   action[3]  per-peer receive count
 *   action[4]  send datatype id  } both decoded together; MPI_DEFAULT_TYPE is
 *   action[5]  recv datatype id  } used for both otherwise (TODO confirm the
 *                                  exact condition)
 *
 * Zero-filled send and receive buffers of count * comm_size elements are
 * allocated with calloc() and handed to mpi_coll_alltoall_fun() on
 * MPI_COMM_WORLD.
 * NOTE(review): the calloc() results are not checked among these statements. */
544 static void action_allToAll(const char *const *action) {
545 double clock = smpi_process_simulated_elapsed();
546 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
547 int send_size = parse_double(action[2]);
548 int recv_size = parse_double(action[3]);
549 MPI_Datatype MPI_CURRENT_TYPE2;
552 MPI_CURRENT_TYPE=decode_datatype(action[4]);
553 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
556 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
557 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
559 void *send = calloc(send_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE));
560 void *recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
563 int rank = smpi_process_index();
564 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
565 extra->type = TRACING_ALLTOALL;
566 extra->send_size = send_size;
567 extra->recv_size = recv_size;
568 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
569 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
571 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
574 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
577 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
580 log_timed_action (action, clock);
/* Replay handler for the "gather" action.
 *
 * Trace fields as read by the code:
 *   action[2]  send count
 *   action[3]  receive count (per process)
 *   action[4]  root rank
 *   action[5]  send datatype id  } MPI_DEFAULT_TYPE is used for both when not
 *   action[6]  recv datatype id  } given (TODO confirm the exact condition)
 *
 * A zero-filled send buffer of send_size elements is allocated; a receive
 * buffer of recv_size * comm_size elements is allocated as well (presumably
 * only on the root -- TODO confirm). The gather itself is simulated with
 * smpi_mpi_gather() on MPI_COMM_WORLD.
 * NOTE(review): the collective "in" event is traced with `root` while the
 * matching "out" event passes -1. */
586 static void action_gather(const char *const *action) {
588 The structure of the gather action for the rank 0 (total 4 processes)
593 1) 68 is the sendcounts
594 2) 68 is the recvcounts
595 3) 0 is the root node
596 4) 0 is the send datatype id, see decode_datatype()
597 5) 0 is the recv datatype id, see decode_datatype()
599 double clock = smpi_process_simulated_elapsed();
600 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
601 int send_size = parse_double(action[2]);
602 int recv_size = parse_double(action[3]);
603 MPI_Datatype MPI_CURRENT_TYPE2;
605 MPI_CURRENT_TYPE=decode_datatype(action[5]);
606 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
608 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
609 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
611 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
614 int root=atoi(action[4]);
615 int rank = smpi_process_index();
618 recv = calloc(recv_size*comm_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
621 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
622 extra->type = TRACING_GATHER;
623 extra->send_size = send_size;
624 extra->recv_size = recv_size;
626 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
627 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
629 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
631 smpi_mpi_gather(send, send_size, MPI_CURRENT_TYPE,
632 recv, recv_size, MPI_CURRENT_TYPE2,
633 root, MPI_COMM_WORLD);
636 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
639 log_timed_action (action, clock);
/* Replay handler for the "gatherV" action.
 *
 * Trace fields as read by the code (N = size of MPI_COMM_WORLD):
 *   action[2]            send count
 *   action[3 .. 2+N]     recvcounts, one per process
 *   action[3+N]          root rank
 *   action[4+N]          optional send datatype id
 *   action[5+N]          recv datatype id (read whenever action[4+N] exists)
 * MPI_DEFAULT_TYPE is used for both datatypes when action[4+N] is absent.
 *
 * disps is allocated zero-filled and is not written by the statements below,
 * so every displacement handed to smpi_mpi_gatherv() is 0. The receive buffer
 * holds the sum of all recvcounts (presumably allocated only on the root --
 * TODO confirm). extra->recvcounts receives its own copy of the counts
 * because the local array is freed at the end of this function.
 * NOTE(review): the collective "in" event is traced with `root` while the
 * matching "out" event passes -1. */
646 static void action_gatherv(const char *const *action) {
648 The structure of the gatherv action for the rank 0 (total 4 processes)
650 0 gather 68 68 10 10 10 0 0 0
653 1) 68 is the sendcount
654 2) 68 10 10 10 is the recvcounts
655 3) 0 is the root node
656 4) 0 is the send datatype id, see decode_datatype()
657 5) 0 is the recv datatype id, see decode_datatype()
659 double clock = smpi_process_simulated_elapsed();
660 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
661 int send_size = parse_double(action[2]);
662 int *disps = xbt_new0(int, comm_size);
663 int *recvcounts = xbt_new0(int, comm_size);
666 MPI_Datatype MPI_CURRENT_TYPE2;
667 if(action[4+comm_size]) {
668 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
669 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
671 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
672 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
674 void *send = calloc(send_size, smpi_datatype_size(MPI_CURRENT_TYPE));
676 for(i=0;i<comm_size;i++) {
677 recvcounts[i] = atoi(action[i+3]);
678 recv_sum=recv_sum+recvcounts[i];
682 int root=atoi(action[3+comm_size]);
683 int rank = smpi_process_index();
686 recv = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
689 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
690 extra->type = TRACING_GATHERV;
691 extra->send_size = send_size;
692 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
693 for(i=0; i< comm_size; i++)//copy data to avoid bad free
694 extra->recvcounts[i] = recvcounts[i];
696 extra->num_processes = comm_size;
697 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
698 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
700 TRACE_smpi_collective_in(rank, root, __FUNCTION__, extra);
702 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
703 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
704 root, MPI_COMM_WORLD);
707 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
710 log_timed_action (action, clock);
711 xbt_free(recvcounts);
/* Replay handler for the "reduceScatter" action.
 *
 * Trace fields as read by the code (N = size of MPI_COMM_WORLD):
 *   action[2 .. 1+N]   recvcounts, one per process
 *   action[2+N]        amount of computation (flops)
 *   action[3+N]        optional datatype id; MPI_DEFAULT_TYPE when absent
 *
 * The operation is simulated as: a reduce of sum(recvcounts) elements
 * (NULL buffers, MPI_OP_NULL) to `root`, a scatterv from rank 0 delivering
 * recvcounts[rank] elements to each process, then the computation cost.
 * disps is allocated zero-filled and is not written by the statements below.
 * NOTE(review): confirm the value of `root` used by the reduce; the scatterv
 * uses a literal 0. */
718 static void action_reducescatter(const char *const *action) {
721 The structure of the reducescatter action for the rank 0 (total 4 processes)
723 0 reduceScatter 275427 275427 275427 204020 11346849 0
726 1) The first four values after the name of the action declare the recvcounts array
727 2) The value 11346849 is the amount of instructions
728 3) The last value corresponds to the datatype, see decode_datatype().
730 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
734 double clock = smpi_process_simulated_elapsed();
735 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
736 int comp_size = parse_double(action[2+comm_size]);
737 int *recvcounts = xbt_new0(int, comm_size);
738 int *disps = xbt_new0(int, comm_size);
741 int rank = smpi_process_index();
743 if(action[3+comm_size])
744 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
746 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
748 for(i=0;i<comm_size;i++) {
749 recvcounts[i] = atoi(action[i+2]);
750 recv_sum=recv_sum+recvcounts[i];
755 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
756 extra->type = TRACING_REDUCE_SCATTER;
757 extra->send_size = 0;
758 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
759 for(i=0; i< comm_size; i++)//copy data to avoid bad free
760 extra->recvcounts[i] = recvcounts[i];
761 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
762 extra->comp_size = comp_size;
763 extra->num_processes = comm_size;
766 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
768 mpi_coll_reduce_fun(NULL, NULL, recv_sum, MPI_CURRENT_TYPE, MPI_OP_NULL,
769 root, MPI_COMM_WORLD);
770 smpi_mpi_scatterv(NULL, recvcounts, disps, MPI_CURRENT_TYPE, NULL,
771 recvcounts[rank], MPI_CURRENT_TYPE, 0, MPI_COMM_WORLD);
772 smpi_execute_flops(comp_size);
776 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
778 xbt_free(recvcounts);
780 log_timed_action (action, clock);
/* Replay handler for the "allGatherV" action.
 *
 * Trace fields as read by the code (N = size of MPI_COMM_WORLD):
 *   action[2]            send count
 *   action[3 .. 2+N]     recvcounts, one per process
 *   action[3+N]          optional send datatype id
 *   action[4+N]          recv datatype id (read whenever action[3+N] exists)
 * MPI_DEFAULT_TYPE is used for both datatypes when action[3+N] is absent.
 *
 * Zero-filled buffers are allocated (sendcount elements to send,
 * sum(recvcounts) elements to receive) and passed to
 * mpi_coll_allgatherv_fun() on MPI_COMM_WORLD. disps is allocated zero-filled
 * and is not written by the statements below. extra->recvcounts receives its
 * own copy of the counts because the local array is freed at the end. */
784 static void action_allgatherv(const char *const *action) {
787 The structure of the allgatherv action for the rank 0 (total 4 processes)
789 0 allGatherV 275427 275427 275427 275427 204020
792 1) 275427 is the sendcount
793 2) The next four elements declare the recvcounts array
794 3) No more values mean that the datatype for sent and receive buffer
795 is the default one, see decode_datatype().
799 double clock = smpi_process_simulated_elapsed();
801 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
803 int sendcount=atoi(action[2]);
804 int *recvcounts = xbt_new0(int, comm_size);
805 int *disps = xbt_new0(int, comm_size);
807 MPI_Datatype MPI_CURRENT_TYPE2;
809 if(action[3+comm_size]) {
810 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
811 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
813 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
814 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
816 void *sendbuf = calloc(sendcount, smpi_datatype_size(MPI_CURRENT_TYPE));
818 for(i=0;i<comm_size;i++) {
819 recvcounts[i] = atoi(action[i+3]);
820 recv_sum=recv_sum+recvcounts[i];
822 void *recvbuf = calloc(recv_sum, smpi_datatype_size(MPI_CURRENT_TYPE2));
825 int rank = smpi_process_index();
826 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
827 extra->type = TRACING_ALLGATHERV;
828 extra->send_size = sendcount;
829 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
830 for(i=0; i< comm_size; i++)//copy data to avoid bad free
831 extra->recvcounts[i] = recvcounts[i];
832 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
833 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
834 extra->num_processes = comm_size;
836 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
839 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
842 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
845 log_timed_action (action, clock);
848 xbt_free(recvcounts);
/* Replay handler for the "allToAllV" action.
 *
 * Trace fields as read by the code (N = size of MPI_COMM_WORLD):
 *   action[2]              size of the send buffer (elements)
 *   action[3 .. 2+N]       sendcounts, one per process
 *   action[3+N]            size of the receive buffer (elements)
 *   action[4+N .. 3+2N]    recvcounts, one per process
 *   action[4+2N]           optional send datatype id
 *   action[5+2N]           recv datatype id (read whenever action[4+2N] exists)
 * MPI_DEFAULT_TYPE is used for both datatypes when action[4+2N] is absent.
 *
 * senddisps and recvdisps are allocated zero-filled and are not written by
 * the statements below. The tracing record gets its own copies of the count
 * arrays (the local ones are freed at the end) and accumulates their sums in
 * send_size/recv_size, which start at 0 because `extra` comes from xbt_new0.
 *
 * The receive side of the collective uses MPI_CURRENT_TYPE2: recvbuf is sized
 * with that datatype and it is the one reported as datatype2, so passing the
 * send datatype there would mis-describe the receive buffer whenever the two
 * datatypes differ. */
853 static void action_allToAllv(const char *const *action) {
855 The structure of the allToAllV action for the rank 0 (total 4 processes)
857 0 allToAllV 100 1 7 10 12 100 1 70 10 5
860 1) 100 is the size of the send buffer *sizeof(int),
861 2) 1 7 10 12 is the sendcounts array
862 3) 100*sizeof(int) is the size of the receiver buffer
863 4) 1 70 10 5 is the recvcounts array
868 double clock = smpi_process_simulated_elapsed();
870 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
871 int send_buf_size=0,recv_buf_size=0,i=0;
872 int *sendcounts = xbt_new0(int, comm_size);
873 int *recvcounts = xbt_new0(int, comm_size);
874 int *senddisps = xbt_new0(int, comm_size);
875 int *recvdisps = xbt_new0(int, comm_size);
877 MPI_Datatype MPI_CURRENT_TYPE2;
879 send_buf_size=parse_double(action[2]);
880 recv_buf_size=parse_double(action[3+comm_size]);
881 if(action[4+2*comm_size]) {
882 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
883 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
886 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
887 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
890 void *sendbuf = calloc(send_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE));
891 void *recvbuf = calloc(recv_buf_size, smpi_datatype_size(MPI_CURRENT_TYPE2));
893 for(i=0;i<comm_size;i++) {
894 sendcounts[i] = atoi(action[i+3]);
895 recvcounts[i] = atoi(action[i+4+comm_size]);
900 int rank = smpi_process_index();
901 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
902 extra->type = TRACING_ALLTOALLV;
903 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
904 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
905 extra->num_processes = comm_size;
907 for(i=0; i< comm_size; i++){//copy data to avoid bad free
908 extra->send_size += sendcounts[i];
909 extra->sendcounts[i] = sendcounts[i];
910 extra->recv_size += recvcounts[i];
911 extra->recvcounts[i] = recvcounts[i];
913 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
914 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
916 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
918 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
919 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE2,
922 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
925 log_timed_action (action, clock);
928 xbt_free(sendcounts);
929 xbt_free(recvcounts);
/* Entry point of the trace replay for one SMPI process.
 *
 * Initializes the SMPI process and marks it as initialized, emits the tracing
 * initialization events (TRACING_INIT collective in/out), and, in the process
 * whose index is 0, initializes the replay module and registers the handler
 * of every supported trace action keyword. The trace is then replayed by
 * xbt_replay_action_runner().
 *
 * argc, argv: pointers to the program arguments; forwarded to
 *             smpi_process_init() and, dereferenced, to the action runner. */
934 void smpi_replay_init(int *argc, char***argv){
935 smpi_process_init(argc, argv);
936 smpi_process_mark_as_initialized();
938 int rank = smpi_process_index();
939 TRACE_smpi_init(rank);
940 TRACE_smpi_computing_init(rank);
941 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
942 extra->type = TRACING_INIT;
943 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
944 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
947 if (!smpi_process_index()){
948 _xbt_replay_action_init();
949 xbt_replay_action_register("init", action_init);
950 xbt_replay_action_register("finalize", action_finalize);
951 xbt_replay_action_register("comm_size", action_comm_size);
952 xbt_replay_action_register("comm_split", action_comm_split);
953 xbt_replay_action_register("comm_dup", action_comm_dup);
954 xbt_replay_action_register("send", action_send);
955 xbt_replay_action_register("Isend", action_Isend);
956 xbt_replay_action_register("recv", action_recv);
957 xbt_replay_action_register("Irecv", action_Irecv);
958 xbt_replay_action_register("wait", action_wait);
959 xbt_replay_action_register("waitAll", action_waitall);
960 xbt_replay_action_register("barrier", action_barrier);
961 xbt_replay_action_register("bcast", action_bcast);
962 xbt_replay_action_register("reduce", action_reduce);
963 xbt_replay_action_register("allReduce", action_allReduce);
964 xbt_replay_action_register("allToAll", action_allToAll);
965 xbt_replay_action_register("allToAllV", action_allToAllv);
966 xbt_replay_action_register("gather", action_gather);
967 xbt_replay_action_register("gatherV", action_gatherv);
968 xbt_replay_action_register("allGatherV", action_allgatherv);
969 xbt_replay_action_register("reduceScatter", action_reducescatter);
970 xbt_replay_action_register("compute", action_compute);
973 xbt_replay_action_runner(*argc, *argv);
/* Tear down the trace replay for the calling SMPI process.
 *
 * Steps, in order:
 *  - any requests still pending in the caller's dynar of reqq are completed
 *    with smpi_mpi_waitall(), then the dynar container is released;
 *  - when active_processes is 0 (last process), the simulated time is logged
 *    at INFO level and the replay module is shut down;
 *  - all processes synchronize on a barrier over MPI_COMM_WORLD;
 *  - the TRACING_FINALIZE events are emitted around smpi_process_finalize(),
 *    and the process is destroyed.
 * NOTE(review): confirm where active_processes is decremented and what value
 * is returned. */
976 int smpi_replay_finalize(){
978 /* One active process will stop. Decrease the counter*/
979 XBT_DEBUG("There are %lu elements in reqq[*]",
980 xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
981 if (!xbt_dynar_is_empty(reqq[smpi_comm_rank(MPI_COMM_WORLD)])){
982 int count_requests=xbt_dynar_length(reqq[smpi_comm_rank(MPI_COMM_WORLD)]);
983 MPI_Request requests[count_requests];
984 MPI_Status status[count_requests];
987 xbt_dynar_foreach(reqq[smpi_comm_rank(MPI_COMM_WORLD)],i,requests[i]);
988 smpi_mpi_waitall(count_requests, requests, status);
994 xbt_dynar_free_container(&(reqq[smpi_comm_rank(MPI_COMM_WORLD)]));
996 if(!active_processes){
997 /* Last process alive speaking */
998 /* end the simulated timer */
999 sim_time = smpi_process_simulated_elapsed();
1000 XBT_INFO("Simulation time %f", sim_time);
1001 _xbt_replay_action_exit();
1005 smpi_mpi_barrier(MPI_COMM_WORLD);
1007 int rank = smpi_process_index();
1008 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1009 extra->type = TRACING_FINALIZE;
1010 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
1012 smpi_process_finalize();
1014 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1015 TRACE_smpi_finalize(smpi_process_index());
1017 smpi_process_destroy();