1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
/* Presumably the size of a textual key buffer; nothing in the visible code uses it -- TODO confirm. */
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Written by the "comm_size" action handler from its first argument. */
16 int communicator_size = 0;
/* Set to smpi_process_count() by action_init; smpi_replay_run tests it against zero
 * before printing the simulation time and releasing the shared state. */
17 static int active_processes = 0;
/* Dictionary of pending requests: the key is the process index printed as a decimal string,
 * the value is a dynar of MPI_Request (see get_reqq_self / set_reqq_self). */
18 xbt_dict_t reqq = NULL;
/* Datatype used when a trace line carries no datatype id; chosen by action_init. */
20 MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype of the action being replayed; assigned by decode_datatype and by the handlers. */
21 MPI_Datatype MPI_CURRENT_TYPE;
/* Shared scratch buffers handed out by smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer
 * while replaying; the *_size variables are compared with the requested size before growing. */
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
/* Log one replayed action together with the simulated time it consumed.
 *
 * action: NULL-terminated array of trace tokens; joined with single spaces for display.
 * clock:  value of smpi_process_simulated_elapsed() sampled by the caller before the action.
 *
 * The string is only built when the verbose level of the smpi_replay category is enabled.
 * NOTE(review): the release of `name` is not visible in this excerpt -- confirm it is freed. */
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
/* Return the dynar of requests stored in the global reqq dictionary for the calling process.
 * The lookup key is the process index formatted as a decimal string with asprintf.
 * The declaration of `key`, the test on `size` that guards xbt_die, and the release of
 * `key` are not visible in this excerpt. */
37 static xbt_dynar_t get_reqq_self(){
40 int size = asprintf(&key, "%d", smpi_process_index());
42 xbt_die("could not allocate memory for asprintf");
43 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
46 return dynar_mpi_request;
/* Store `mpi_request` in the global reqq dictionary under the calling process's index
 * (decimal string key built with asprintf), with `free` registered as the value destructor.
 * NOTE(review): the destructor is plain free(), not a dynar destructor -- confirm that
 * replacing or dropping an entry does not leak the dynar's element storage. */
49 static void set_reqq_self(xbt_dynar_t mpi_request){
52 int size = asprintf(&key, "%d", smpi_process_index());
54 xbt_die("could not allocate memory for asprintf");
55 xbt_dict_set(reqq, key, mpi_request, free);
/* Provide a send-side scratch buffer of at least `size` bytes.
 * Outside of replay mode a fresh xbt_malloc() block is returned. In replay mode the shared
 * `sendbuffer` is grown with xbt_realloc() when `size` exceeds sendbuffer_size; the update
 * of sendbuffer_size and the return statement are not visible here (presumably the shared
 * buffer is returned). */
60 //allocate a single buffer for all sends, growing it if needed
61 void* smpi_get_tmp_sendbuffer(int size){
62 if (!smpi_process_get_replaying())
63 return xbt_malloc(size);
64 if (sendbuffer_size<size){
65 sendbuffer=xbt_realloc(sendbuffer,size);
/* Receive-side counterpart of smpi_get_tmp_sendbuffer: returns a fresh xbt_malloc() block
 * outside of replay mode, otherwise grows the shared `recvbuffer` with xbt_realloc() when
 * `size` exceeds recvbuffer_size. The size update and return are not visible here. */
70 //allocate a single buffer for all recv
71 void* smpi_get_tmp_recvbuffer(int size){
72 if (!smpi_process_get_replaying())
73 return xbt_malloc(size);
74 if (recvbuffer_size<size){
75 recvbuffer=xbt_realloc(recvbuffer,size);
/* Release a buffer obtained from smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer.
 * Only the "not replaying" test is visible; presumably `buf` is freed in that case, since
 * in replay mode the getters hand out the shared buffers -- TODO confirm. */
81 void smpi_free_tmp_buffer(void* buf){
82 if (!smpi_process_get_replaying())
/* Convert a trace token to a double with strtod().
 * Raises unknown_error ("<string> is not a double") through THROWF; the condition guarding
 * the throw and the return statement are not visible in this excerpt. */
87 static double parse_double(const char *string)
91 value = strtod(string, &endptr);
93 THROWF(unknown_error, 0, "%s is not a double", string);
/* Translate the datatype token of a trace line into an MPI_Datatype.
 * The result is stored in the global MPI_CURRENT_TYPE and also returned. The visible
 * assignments cover MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG, MPI_FLOAT,
 * MPI_BYTE and MPI_DEFAULT_TYPE; the conditions selecting each one are not visible here. */
97 static MPI_Datatype decode_datatype(const char *const action)
99 // Declared datatypes,
104 MPI_CURRENT_TYPE=MPI_DOUBLE;
107 MPI_CURRENT_TYPE=MPI_INT;
110 MPI_CURRENT_TYPE=MPI_CHAR;
113 MPI_CURRENT_TYPE=MPI_SHORT;
116 MPI_CURRENT_TYPE=MPI_LONG;
119 MPI_CURRENT_TYPE=MPI_FLOAT;
122 MPI_CURRENT_TYPE=MPI_BYTE;
125 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
128 return MPI_CURRENT_TYPE;
/* Inverse of decode_datatype: map an MPI_Datatype to the string written in traces.
 * `datatype` is compared against MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT,
 * MPI_LONG and MPI_FLOAT; the strings returned for each are not visible here.
 * `known` is an output flag; every caller in this file passes NULL. Per the comments below,
 * it reports datatypes that replay does not handle so that their size can be replayed as
 * size*MPI_BYTE (how it is written is not visible here). */
132 const char* encode_datatype(MPI_Datatype datatype, int* known)
135 //default type for output is set to MPI_BYTE
136 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
138 if (datatype==MPI_BYTE){
141 if(datatype==MPI_DOUBLE)
143 if(datatype==MPI_INT)
145 if(datatype==MPI_CHAR)
147 if(datatype==MPI_SHORT)
149 if(datatype==MPI_LONG)
151 if(datatype==MPI_FLOAT)
153 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
155 // default - not implemented.
156 // do not warn here as we pass in this function even for other trace formats
/* Validate the number of tokens of a trace line.
 * Walks `action` until its NULL terminator to count the items, then raises arg_error with
 * a message naming the calling function, the item count and the expected numbers of
 * mandatory and optional arguments. The comparison that triggers the throw is not visible
 * here; per the message, the first two items (process id and action name) are not counted
 * as arguments. The macro uses a variable named `i`. */
160 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
162 while(action[i]!=NULL)\
165 THROWF(arg_error, 0, "%s replay failed.\n" \
166 "%d items were given on the line. First two should be process_id and action. " \
167 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
168 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
/* Handler for the "init" trace action (no mandatory argument, one optional).
 * When the optional argument is present the default datatype becomes MPI_DOUBLE, otherwise
 * MPI_BYTE. It then starts the simulated timer, records the process count in
 * active_processes, creates the reqq dictionary and registers an empty request dynar for
 * the calling process.
 * NOTE(review): the condition guarding xbt_dict_new() is not visible here -- confirm the
 * dictionary is only created once across processes.
 * NOTE(review): the last three lines index reqq as an array although it is declared as
 * xbt_dict_t; they look like legacy code that is presumably commented out -- confirm. */
172 static void action_init(const char *const *action)
174 XBT_DEBUG("Initialize the counters");
175 CHECK_ACTION_PARAMS(action, 0, 1);
176 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE dataype
177 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
179 /* start a simulated timer */
180 smpi_process_simulated_start();
181 /*initialize the number of active processes */
182 active_processes = smpi_process_count();
185 reqq = xbt_dict_new();
188 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
191 reqq=xbt_new0(xbt_dynar_t,active_processes);
193 for(i=0;i<active_processes;i++){
194 reqq[i]=xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref);
/* Handler registered for the "finalize" trace action; its body is not visible here. */
200 static void action_finalize(const char *const *action)
/* Handler for the "comm_size" trace action: stores action[2], parsed as a double and
 * converted to int, in the global communicator_size, then logs the elapsed time.
 * NOTE(review): no CHECK_ACTION_PARAMS call is visible before action[2] is read -- confirm. */
204 static void action_comm_size(const char *const *action)
206 double clock = smpi_process_simulated_elapsed();
208 communicator_size = parse_double(action[2]);
209 log_timed_action (action, clock);
/* Handler for the "comm_split" trace action: in the visible lines it only samples the
 * simulated clock and logs the action. */
212 static void action_comm_split(const char *const *action)
214 double clock = smpi_process_simulated_elapsed();
216 log_timed_action (action, clock);
/* Handler for the "comm_dup" trace action: in the visible lines it only samples the
 * simulated clock and logs the action. */
219 static void action_comm_dup(const char *const *action)
221 double clock = smpi_process_simulated_elapsed();
223 log_timed_action (action, clock);
/* Handler for the "compute" trace action.
 * action[2] (mandatory) is the amount of flops to execute. The computation is bracketed by
 * TRACE_smpi_computing_in/out events carrying the flop count, and the elapsed simulated
 * time is logged afterwards. */
226 static void action_compute(const char *const *action)
228 CHECK_ACTION_PARAMS(action, 1, 0);
229 double clock = smpi_process_simulated_elapsed();
230 double flops= parse_double(action[2]);
231 int rank = smpi_process_index();
232 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233 extra->type=TRACING_COMPUTING;
234 extra->comp_size=flops;
235 TRACE_smpi_computing_in(rank, extra);
237 smpi_execute_flops(flops);
239 TRACE_smpi_computing_out(rank);
240 log_timed_action (action, clock);
/* Handler for the "send" trace action (blocking send).
 * action[2] is the destination, action[3] the element count, action[4] an optional
 * datatype id (MPI_DEFAULT_TYPE otherwise; the test selecting between the two assignments
 * is not visible here). The message is sent with a NULL buffer and tag 0 on
 * MPI_COMM_WORLD, bracketed by point-to-point trace events. */
243 static void action_send(const char *const *action)
245 CHECK_ACTION_PARAMS(action, 2, 1);
246 int to = atoi(action[2]);
247 double size=parse_double(action[3]);
248 double clock = smpi_process_simulated_elapsed();
251 MPI_CURRENT_TYPE=decode_datatype(action[4]);
253 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
256 int rank = smpi_process_index();
258 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
259 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
260 extra->type = TRACING_SEND;
261 extra->send_size = size;
263 extra->dst = dst_traced;
264 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
265 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
266 if (!TRACE_smpi_view_internals()) {
267 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
270 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
272 log_timed_action (action, clock);
274 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* Handler for the "Isend" trace action (non-blocking send).
 * action[2] is the destination, action[3] the element count, action[4] an optional
 * datatype id. The request returned by smpi_mpi_isend (NULL buffer, tag 0,
 * MPI_COMM_WORLD) is pushed onto the calling process's request dynar so that a later
 * test/wait/waitAll action can consume it. */
277 static void action_Isend(const char *const *action)
279 CHECK_ACTION_PARAMS(action, 2, 1);
280 int to = atoi(action[2]);
281 double size=parse_double(action[3]);
282 double clock = smpi_process_simulated_elapsed();
285 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
286 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
288 int rank = smpi_process_index();
289 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
290 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
291 extra->type = TRACING_ISEND;
292 extra->send_size = size;
294 extra->dst = dst_traced;
295 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
296 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
297 if (!TRACE_smpi_view_internals()) {
298 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
301 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
303 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
306 xbt_dynar_push(get_reqq_self(),&request);
308 log_timed_action (action, clock);
/* Handler for the "recv" trace action (blocking receive).
 * action[2] is the source, action[3] the element count, action[4] an optional datatype id.
 * A probe on (from, tag 0, MPI_COMM_WORLD) is issued when the size is unknown to the
 * receiver; the condition guarding it and the use made of `status` are not visible here.
 * The receive itself uses a NULL buffer and is bracketed by point-to-point trace events. */
311 static void action_recv(const char *const *action) {
312 CHECK_ACTION_PARAMS(action, 2, 1);
313 int from = atoi(action[2]);
314 double size=parse_double(action[3]);
315 double clock = smpi_process_simulated_elapsed();
318 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
319 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
321 int rank = smpi_process_index();
322 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
324 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
325 extra->type = TRACING_RECV;
326 extra->send_size = size;
327 extra->src = src_traced;
329 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
330 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
332 //unknown size from the receiver's point of view
334 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
338 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
340 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
341 if (!TRACE_smpi_view_internals()) {
342 TRACE_smpi_recv(rank, src_traced, rank);
345 log_timed_action (action, clock);
/* Handler for the "Irecv" trace action (non-blocking receive).
 * Same arguments as "recv". A probe is issued when the size is unknown to the receiver
 * (guard not visible here). The request returned by smpi_mpi_irecv (NULL buffer, tag 0,
 * MPI_COMM_WORLD) is pushed onto the calling process's request dynar for a later
 * test/wait/waitAll action. */
348 static void action_Irecv(const char *const *action)
350 CHECK_ACTION_PARAMS(action, 2, 1);
351 int from = atoi(action[2]);
352 double size=parse_double(action[3]);
353 double clock = smpi_process_simulated_elapsed();
356 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
357 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
359 int rank = smpi_process_index();
360 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
361 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
362 extra->type = TRACING_IRECV;
363 extra->send_size = size;
364 extra->src = src_traced;
366 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
367 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
369 //unknown size from the receiver's point of view
371 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
375 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
377 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
379 xbt_dynar_push(get_reqq_self(),&request);
381 log_timed_action (action, clock);
/* Handler for the "test" trace action (no argument).
 * Pops the most recent request of the calling process, runs smpi_mpi_test on it between
 * TRACE_smpi_testing_in/out events, and pushes the request back so that a subsequent wait
 * can find it. Per the comments below, a NULL request means an earlier test already
 * completed it and the extra call is ignored (the guard itself is not visible here). */
384 static void action_test(const char *const *action){
385 CHECK_ACTION_PARAMS(action, 0, 0);
386 double clock = smpi_process_simulated_elapsed();
391 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
392 //if request is null here, this may mean that a previous test has succeeded
393 //Different times in traced application and replayed version may lead to this
394 //In this case, ignore the extra calls.
396 int rank = smpi_process_index();
397 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
398 extra->type=TRACING_TEST;
399 TRACE_smpi_testing_in(rank, extra);
401 flag = smpi_mpi_test(&request, &status);
403 XBT_DEBUG("MPI_Test result: %d", flag);
404 /* push back request in dynar to be caught by a subsequent wait. if the test
405 * did succeed, the request is now NULL.
407 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
409 TRACE_smpi_testing_out(rank);
411 log_timed_action (action, clock);
/* Handler for the "wait" trace action (no argument).
 * Asserts that the calling process has at least one queued request, pops the most recent
 * one and waits for it between point-to-point trace events; a receive completion is also
 * reported with TRACE_smpi_recv. Source, destination and the receive flag are read from
 * the request before smpi_mpi_wait is called on it.
 * NOTE(review): `rank` guards against request->comm being MPI_COMM_NULL, yet
 * smpi_comm_group(request->comm) is called unconditionally right after -- confirm.
 * NOTE(review): the string built by xbt_str_join_array for the assertion message is never
 * released in the visible lines. */
414 static void action_wait(const char *const *action){
415 CHECK_ACTION_PARAMS(action, 0, 0);
416 double clock = smpi_process_simulated_elapsed();
420 xbt_assert(xbt_dynar_length(get_reqq_self()),
421 "action wait not preceded by any irecv or isend: %s",
422 xbt_str_join_array(action," "));
423 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
426 /* Assuming that the trace is well formed, this mean the comm might have
427 * been caught by a MPI_test. Then just return.
432 int rank = request->comm != MPI_COMM_NULL
433 ? smpi_comm_rank(request->comm)
436 MPI_Group group = smpi_comm_group(request->comm);
437 int src_traced = smpi_group_rank(group, request->src);
438 int dst_traced = smpi_group_rank(group, request->dst);
439 int is_wait_for_receive = request->recv;
440 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
441 extra->type = TRACING_WAIT;
442 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
444 smpi_mpi_wait(&request, &status);
446 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
447 if (is_wait_for_receive)
448 TRACE_smpi_recv(rank, src_traced, dst_traced);
449 log_timed_action (action, clock);
/* Handler for the "waitAll" trace action (no argument).
 * Copies every queued request of the calling process into a local array, records each
 * request's src/dst/recv fields in three int dynars, waits for all of them with
 * smpi_mpi_waitall, then emits a TRACE_smpi_recv event for every entry whose recv flag is
 * set. Finally the process's request dynar is replaced by a fresh empty one. */
452 static void action_waitall(const char *const *action){
453 CHECK_ACTION_PARAMS(action, 0, 0);
454 double clock = smpi_process_simulated_elapsed();
455 int count_requests=0;
458 count_requests=xbt_dynar_length(get_reqq_self());
460 if (count_requests>0) {
461 MPI_Request requests[count_requests];
462 MPI_Status status[count_requests];
464 /* The reqq is an array of dynars. Its index corresponds to the rank.
465 Thus each rank saves its own requests to the array request. */
466 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
468 //save information from requests
470 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
471 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
472 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
473 for (i = 0; i < count_requests; i++) {
/* NOTE(review): the temporaries below are heap-allocated only to be handed to
 * xbt_dynar_insert_at; their release is not visible here -- confirm they are freed. */
475 int *asrc = xbt_new(int, 1);
476 int *adst = xbt_new(int, 1);
477 int *arecv = xbt_new(int, 1);
478 *asrc = requests[i]->src;
479 *adst = requests[i]->dst;
480 *arecv = requests[i]->recv;
481 xbt_dynar_insert_at(srcs, i, asrc);
482 xbt_dynar_insert_at(dsts, i, adst);
483 xbt_dynar_insert_at(recvs, i, arecv);
/* NOTE(review): `t` is inserted without being initialized in the visible lines, so the
 * recorded recv flag for this slot is indeterminate -- confirm against the full source. */
488 int *t = xbt_new(int, 1);
489 xbt_dynar_insert_at(srcs, i, t);
490 xbt_dynar_insert_at(dsts, i, t);
491 xbt_dynar_insert_at(recvs, i, t);
495 int rank_traced = smpi_process_index();
496 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
497 extra->type = TRACING_WAITALL;
498 extra->send_size=count_requests;
499 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
501 smpi_mpi_waitall(count_requests, requests, status);
503 for (i = 0; i < count_requests; i++) {
504 int src_traced, dst_traced, is_wait_for_receive;
505 xbt_dynar_get_cpy(srcs, i, &src_traced);
506 xbt_dynar_get_cpy(dsts, i, &dst_traced);
507 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
508 if (is_wait_for_receive) {
509 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
512 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
514 xbt_dynar_free(&srcs);
515 xbt_dynar_free(&dsts);
516 xbt_dynar_free(&recvs);
518 //TODO xbt_dynar_free_container(get_reqq_self());
519 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
521 log_timed_action (action, clock);
/* Handler for the "barrier" trace action: runs the selected barrier implementation on
 * MPI_COMM_WORLD between collective trace events and logs the elapsed simulated time. */
524 static void action_barrier(const char *const *action){
525 double clock = smpi_process_simulated_elapsed();
526 int rank = smpi_process_index();
527 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
528 extra->type = TRACING_BARRIER;
529 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
531 mpi_coll_barrier_fun(MPI_COMM_WORLD);
533 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
534 log_timed_action (action, clock);
/* Handler for the "bcast" trace action.
 * action[2] is the element count (mandatory); action[3] is an optional root and action[4]
 * an optional datatype id (the tests guarding both reads are not visible here). The
 * datatype defaults to MPI_DEFAULT_TYPE. The broadcast runs on MPI_COMM_WORLD with a
 * scratch buffer of size * datatype-size bytes.
 * NOTE(review): root_traced is computed with smpi_group_index here, whereas the other
 * handlers in this file use smpi_group_rank -- confirm which one is intended. */
538 static void action_bcast(const char *const *action)
540 CHECK_ACTION_PARAMS(action, 1, 2);
541 double size = parse_double(action[2]);
542 double clock = smpi_process_simulated_elapsed();
545 * Initialize MPI_CURRENT_TYPE in order to decrease
546 * the number of the checks
548 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
551 root= atoi(action[3]);
553 MPI_CURRENT_TYPE=decode_datatype(action[4]);
557 int rank = smpi_process_index();
558 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
560 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
561 extra->type = TRACING_BCAST;
562 extra->send_size = size;
563 extra->root = root_traced;
564 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
565 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
566 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
568 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
570 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
571 log_timed_action (action, clock);
/* Handler for the "reduce" trace action.
 * action[2] is the communication size (elements), action[3] the computation size (flops);
 * action[4] is an optional root and action[5] an optional datatype id (guards not visible
 * here). The reduction runs on MPI_COMM_WORLD with MPI_OP_NULL, followed by
 * smpi_execute_flops(comp_size) to account for the computation.
 * NOTE(review): recvbuf and sendbuf are both obtained from smpi_get_tmp_sendbuffer, so in
 * replay mode they presumably alias the same shared buffer -- confirm this is intended. */
574 static void action_reduce(const char *const *action)
576 CHECK_ACTION_PARAMS(action, 2, 2);
577 double comm_size = parse_double(action[2]);
578 double comp_size = parse_double(action[3]);
579 double clock = smpi_process_simulated_elapsed();
581 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
584 root= atoi(action[4]);
586 MPI_CURRENT_TYPE=decode_datatype(action[5]);
592 int rank = smpi_process_index();
593 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
594 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
595 extra->type = TRACING_REDUCE;
596 extra->send_size = comm_size;
597 extra->comp_size = comp_size;
598 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
599 extra->root = root_traced;
601 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
603 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
604 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
605 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
606 smpi_execute_flops(comp_size);
608 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
609 log_timed_action (action, clock);
/* Handler for the "allReduce" trace action.
 * action[2] is the communication size (elements), action[3] the computation size (flops),
 * action[4] an optional datatype id. The all-reduce runs on MPI_COMM_WORLD with
 * MPI_OP_NULL, followed by smpi_execute_flops(comp_size).
 * NOTE(review): recvbuf and sendbuf are both obtained from smpi_get_tmp_sendbuffer, so in
 * replay mode they presumably alias the same shared buffer -- confirm this is intended. */
612 static void action_allReduce(const char *const *action) {
613 CHECK_ACTION_PARAMS(action, 2, 1);
614 double comm_size = parse_double(action[2]);
615 double comp_size = parse_double(action[3]);
617 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
618 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
620 double clock = smpi_process_simulated_elapsed();
621 int rank = smpi_process_index();
622 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
623 extra->type = TRACING_ALLREDUCE;
624 extra->send_size = comm_size;
625 extra->comp_size = comp_size;
626 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
627 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
629 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
630 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
631 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
632 smpi_execute_flops(comp_size);
634 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
635 log_timed_action (action, clock);
/* Handler for the "allToAll" trace action.
 * action[2] and action[3] are the per-peer send and receive counts; action[4] and
 * action[5] are optional send and receive datatype ids (guards not visible here), both
 * defaulting to MPI_DEFAULT_TYPE. Scratch buffers are sized count * comm_size *
 * datatype-size and the exchange runs on MPI_COMM_WORLD between collective trace events. */
638 static void action_allToAll(const char *const *action) {
639 CHECK_ACTION_PARAMS(action, 2, 2); //two mandatory (send and recv volumes)
640 //two optional (corresponding datatypes)
641 double clock = smpi_process_simulated_elapsed();
642 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
643 int send_size = parse_double(action[2]);
644 int recv_size = parse_double(action[3]);
645 MPI_Datatype MPI_CURRENT_TYPE2;
648 MPI_CURRENT_TYPE=decode_datatype(action[4]);
651 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
655 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
658 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
660 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
661 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
663 int rank = smpi_process_index();
664 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
665 extra->type = TRACING_ALLTOALL;
666 extra->send_size = send_size;
667 extra->recv_size = recv_size;
668 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
669 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
671 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
673 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
675 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
676 log_timed_action (action, clock);
/* Handler for the "gather" trace action.
 * action[2] is the send count, action[3] the receive count, action[4] the root,
 * action[5]/action[6] the send/receive datatype ids (see the parameter list below).
 * The receive buffer is sized recv_size * comm_size * datatype-size; the condition under
 * which it is allocated is not visible here. The gather runs on MPI_COMM_WORLD between
 * collective trace events.
 * NOTE(review): the datatype branch tests action[4] && action[5] but decodes action[5]
 * and action[6]; a line with a root and a single datatype id would pass a NULL action[6]
 * to decode_datatype -- confirm and tighten the test. */
680 static void action_gather(const char *const *action) {
682 The structure of the gather action for the rank 0 (total 4 processes)
687 1) 68 is the sendcounts
688 2) 68 is the recvcounts
689 3) 0 is the root node
690 4) 0 is the send datatype id, see decode_datatype()
691 5) 0 is the recv datatype id, see decode_datatype()
693 CHECK_ACTION_PARAMS(action, 2, 3);
694 double clock = smpi_process_simulated_elapsed();
695 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
696 int send_size = parse_double(action[2]);
697 int recv_size = parse_double(action[3]);
698 MPI_Datatype MPI_CURRENT_TYPE2;
699 if(action[4] && action[5]) {
700 MPI_CURRENT_TYPE=decode_datatype(action[5]);
701 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
703 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
704 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
706 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
710 root=atoi(action[4]);
711 int rank = smpi_comm_rank(MPI_COMM_WORLD);
714 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
716 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
717 extra->type = TRACING_GATHER;
718 extra->send_size = send_size;
719 extra->recv_size = recv_size;
721 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
722 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
724 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
726 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE,
727 recv, recv_size, MPI_CURRENT_TYPE2,
728 root, MPI_COMM_WORLD);
730 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
731 log_timed_action (action, clock);
/* Handler for the "gatherV" trace action.
 * action[2] is the send count, action[3 .. 2+comm_size] the per-rank receive counts,
 * action[3+comm_size] the root, and action[4+comm_size] / action[5+comm_size] the
 * send/receive datatype ids. The receive buffer is sized with the sum of the receive
 * counts; `disps` is allocated zero-filled and passed as the displacement array.
 * NOTE(review): only action[4+comm_size] is tested before action[5+comm_size] is also
 * decoded; a line with a single datatype id would pass NULL to decode_datatype -- confirm.
 * NOTE(review): the release of `disps` is not visible in this excerpt -- confirm. */
736 static void action_gatherv(const char *const *action) {
738 The structure of the gatherv action for the rank 0 (total 4 processes)
740 0 gather 68 68 10 10 10 0 0 0
743 1) 68 is the sendcount
744 2) 68 10 10 10 is the recvcounts
745 3) 0 is the root node
746 4) 0 is the send datatype id, see decode_datatype()
747 5) 0 is the recv datatype id, see decode_datatype()
750 double clock = smpi_process_simulated_elapsed();
751 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
752 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
753 int send_size = parse_double(action[2]);
754 int *disps = xbt_new0(int, comm_size);
755 int *recvcounts = xbt_new0(int, comm_size);
758 MPI_Datatype MPI_CURRENT_TYPE2;
759 if(action[4+comm_size]) {
760 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
761 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
763 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
764 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
766 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
768 for(i=0;i<comm_size;i++) {
769 recvcounts[i] = atoi(action[i+3]);
770 recv_sum=recv_sum+recvcounts[i];
774 int root=atoi(action[3+comm_size]);
775 int rank = smpi_comm_rank(MPI_COMM_WORLD);;
778 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
780 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
781 extra->type = TRACING_GATHERV;
782 extra->send_size = send_size;
783 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
784 for(i=0; i< comm_size; i++)//copy data to avoid bad free
785 extra->recvcounts[i] = recvcounts[i];
787 extra->num_processes = comm_size;
788 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
789 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
791 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
793 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE,
794 recv, recvcounts, disps, MPI_CURRENT_TYPE2,
795 root, MPI_COMM_WORLD);
797 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
798 log_timed_action (action, clock);
799 xbt_free(recvcounts);
/* Handler for the "reduceScatter" trace action.
 * action[2 .. 1+comm_size] are the per-rank receive counts, action[2+comm_size] the
 * computation size and action[3+comm_size] an optional datatype id. The reduce-scatter
 * runs with MPI_OP_NULL between collective trace events, followed by
 * smpi_execute_flops(comp_size).
 * The variable `size` used for the two scratch buffers is defined on lines not visible in
 * this excerpt (presumably the sum of the receive counts -- TODO confirm), as is the end
 * of the mpi_coll_reduce_scatter_fun argument list.
 * NOTE(review): `disps` is allocated but neither used nor released in the visible lines. */
803 static void action_reducescatter(const char *const *action) {
806 The structure of the reducescatter action for the rank 0 (total 4 processes)
808 0 reduceScatter 275427 275427 275427 204020 11346849 0
811 1) The first four values after the name of the action declare the recvcounts array
812 2) The value 11346849 is the amount of instructions
813 3) The last value corresponds to the datatype, see decode_datatype().
815 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv.
819 double clock = smpi_process_simulated_elapsed();
820 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
821 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
822 int comp_size = parse_double(action[2+comm_size]);
823 int *recvcounts = xbt_new0(int, comm_size);
824 int *disps = xbt_new0(int, comm_size);
826 int rank = smpi_process_index();
828 if(action[3+comm_size])
829 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
831 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
833 for(i=0;i<comm_size;i++) {
834 recvcounts[i] = atoi(action[i+2]);
839 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
840 extra->type = TRACING_REDUCE_SCATTER;
841 extra->send_size = 0;
842 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
843 for(i=0; i< comm_size; i++)//copy data to avoid bad free
844 extra->recvcounts[i] = recvcounts[i];
845 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
846 extra->comp_size = comp_size;
847 extra->num_processes = comm_size;
849 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
851 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
852 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
854 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL,
856 smpi_execute_flops(comp_size);
859 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
860 xbt_free(recvcounts);
862 log_timed_action (action, clock);
/* Handler for the "allGather" trace action.
 * action[2] is the send count and action[3] the receive count; up to two optional
 * arguments select the send and receive datatypes, both defaulting to MPI_DEFAULT_TYPE.
 * The all-gather runs on MPI_COMM_WORLD between collective trace events.
 * NOTE(review): the datatypes are decoded from action[3] and action[4], but action[3] is
 * the receive count read just above; with two mandatory arguments the optional datatype
 * ids would sit at action[4] and action[5] -- confirm and fix the indices.
 * NOTE(review): the receive buffer is sized recvcount * datatype-size, whereas
 * action_gather multiplies by comm_size for the same kind of buffer -- confirm. */
865 static void action_allgather(const char *const *action) {
867 The structure of the allgather action for the rank 0 (total 4 processes)
869 0 allGather 275427 275427
872 1) 275427 is the sendcount
873 2) 275427 is the recvcount
874 3) No more values mean that the datatype for sent and receive buffer
875 is the default one, see decode_datatype().
879 double clock = smpi_process_simulated_elapsed();
881 CHECK_ACTION_PARAMS(action, 2, 2);
882 int sendcount=atoi(action[2]);
883 int recvcount=atoi(action[3]);
885 MPI_Datatype MPI_CURRENT_TYPE2;
888 MPI_CURRENT_TYPE = decode_datatype(action[3]);
889 MPI_CURRENT_TYPE2 = decode_datatype(action[4]);
891 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
892 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
894 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
895 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
897 int rank = smpi_process_index();
898 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
899 extra->type = TRACING_ALLGATHER;
900 extra->send_size = sendcount;
901 extra->recv_size= recvcount;
902 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
903 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
904 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
906 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
908 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
910 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
911 log_timed_action (action, clock);
/* Handler for the "allGatherV" trace action.
 * action[2] is the send count, action[3 .. 2+comm_size] the per-rank receive counts, and
 * action[3+comm_size] / action[4+comm_size] optional send/receive datatype ids (both must
 * be present to be used; otherwise MPI_DEFAULT_TYPE applies to both). The receive buffer
 * is sized with the sum of the receive counts; `disps` is allocated zero-filled and passed
 * as the displacement array.
 * NOTE(review): the release of `disps` is not visible in this excerpt -- confirm. */
914 static void action_allgatherv(const char *const *action) {
917 The structure of the allgatherv action for the rank 0 (total 4 processes)
919 0 allGatherV 275427 275427 275427 275427 204020
922 1) 275427 is the sendcount
923 2) The next four elements declare the recvcounts array
924 3) No more values mean that the datatype for sent and receive buffer
925 is the default one, see decode_datatype().
929 double clock = smpi_process_simulated_elapsed();
931 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
932 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
934 int sendcount=atoi(action[2]);
935 int *recvcounts = xbt_new0(int, comm_size);
936 int *disps = xbt_new0(int, comm_size);
938 MPI_Datatype MPI_CURRENT_TYPE2;
940 if(action[3+comm_size] && action[4+comm_size]) {
941 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
942 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
944 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
945 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
947 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
949 for(i=0;i<comm_size;i++) {
950 recvcounts[i] = atoi(action[i+3]);
951 recv_sum=recv_sum+recvcounts[i];
953 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
955 int rank = smpi_process_index();
956 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
957 extra->type = TRACING_ALLGATHERV;
958 extra->send_size = sendcount;
959 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
960 for(i=0; i< comm_size; i++)//copy data to avoid bad free
961 extra->recvcounts[i] = recvcounts[i];
962 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
963 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
964 extra->num_processes = comm_size;
966 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
968 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
970 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
971 log_timed_action (action, clock);
972 xbt_free(recvcounts);
976 static void action_allToAllv(const char *const *action) {
978 The structure of the allToAllV action for the rank 0 (total 4 processes)
980 0 allToAllV 100 1 7 10 12 100 1 70 10 5
983 1) 100 is the size of the send buffer *sizeof(int),
984 2) 1 7 10 12 is the sendcounts array
985 3) 100*sizeof(int) is the size of the receiver buffer
986 4) 1 70 10 5 is the recvcounts array
991 double clock = smpi_process_simulated_elapsed();
993 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
994 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
995 int send_buf_size=0,recv_buf_size=0,i=0;
996 int *sendcounts = xbt_new0(int, comm_size);
997 int *recvcounts = xbt_new0(int, comm_size);
998 int *senddisps = xbt_new0(int, comm_size);
999 int *recvdisps = xbt_new0(int, comm_size);
1001 MPI_Datatype MPI_CURRENT_TYPE2;
1003 send_buf_size=parse_double(action[2]);
1004 recv_buf_size=parse_double(action[3+comm_size]);
1005 if(action[4+2*comm_size] && action[5+2*comm_size]) {
1006 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
1007 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
1010 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
1011 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
1014 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
1015 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
1017 for(i=0;i<comm_size;i++) {
1018 sendcounts[i] = atoi(action[i+3]);
1019 recvcounts[i] = atoi(action[i+4+comm_size]);
1023 int rank = smpi_process_index();
1024 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1025 extra->type = TRACING_ALLTOALLV;
1026 extra->recvcounts= xbt_malloc(comm_size*sizeof(int));
1027 extra->sendcounts= xbt_malloc(comm_size*sizeof(int));
1028 extra->num_processes = comm_size;
1030 for(i=0; i< comm_size; i++){//copy data to avoid bad free
1031 extra->send_size += sendcounts[i];
1032 extra->sendcounts[i] = sendcounts[i];
1033 extra->recv_size += recvcounts[i];
1034 extra->recvcounts[i] = recvcounts[i];
1036 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
1037 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
1039 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
1041 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,
1042 recvbuf, recvcounts, recvdisps, MPI_CURRENT_TYPE,
1045 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
1046 log_timed_action (action, clock);
1047 xbt_free(sendcounts);
1048 xbt_free(recvcounts);
1049 xbt_free(senddisps);
1050 xbt_free(recvdisps);
/* Entry point of the trace replay for the calling process.
 *
 * Steps visible in this excerpt:
 *  1. initialize the SMPI process, mark it as initialized and switch it to replay mode;
 *  2. emit the tracing "init" event pair;
 *  3. register every action handler when _xbt_replay_action_init() returns zero;
 *  4. for a delayed start, parse (*argv)[2] as a double and burn that many flops
 *     (the test selecting this path is not visible here); otherwise force a context
 *     switch with a zero-flop execution;
 *  5. run the replay with xbt_replay_action_runner;
 *  6. wait for any request still queued for this process;
 *  7. when active_processes is zero, print the simulation time and release the shared
 *     buffers and the reqq dictionary (the decrement of the counter is not visible here);
 *  8. emit the tracing "finalize" event pair around smpi_process_finalize and destroy the
 *     process.
 *
 * argc/argv are forwarded to smpi_process_init and xbt_replay_action_runner.
 * NOTE(review): `operation` is assigned from bprintf twice; its release is not visible in
 * this excerpt -- confirm both strings are freed. */
1053 void smpi_replay_run(int *argc, char***argv){
1054 /* First initializes everything */
1055 smpi_process_init(argc, argv);
1056 smpi_process_mark_as_initialized();
1057 smpi_process_set_replaying(1);
1059 int rank = smpi_process_index();
1060 TRACE_smpi_init(rank);
1061 TRACE_smpi_computing_init(rank);
1062 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
1063 extra->type = TRACING_INIT;
1064 char *operation =bprintf("%s_init",__FUNCTION__);
1065 TRACE_smpi_collective_in(rank, -1, operation, extra);
1066 TRACE_smpi_collective_out(rank, -1, operation);
1069 if (!_xbt_replay_action_init()) {
1070 xbt_replay_action_register("init", action_init);
1071 xbt_replay_action_register("finalize", action_finalize);
1072 xbt_replay_action_register("comm_size", action_comm_size);
1073 xbt_replay_action_register("comm_split", action_comm_split);
1074 xbt_replay_action_register("comm_dup", action_comm_dup);
1075 xbt_replay_action_register("send", action_send);
1076 xbt_replay_action_register("Isend", action_Isend);
1077 xbt_replay_action_register("recv", action_recv);
1078 xbt_replay_action_register("Irecv", action_Irecv);
1079 xbt_replay_action_register("test", action_test);
1080 xbt_replay_action_register("wait", action_wait);
1081 xbt_replay_action_register("waitAll", action_waitall);
1082 xbt_replay_action_register("barrier", action_barrier);
1083 xbt_replay_action_register("bcast", action_bcast);
1084 xbt_replay_action_register("reduce", action_reduce);
1085 xbt_replay_action_register("allReduce", action_allReduce);
1086 xbt_replay_action_register("allToAll", action_allToAll);
1087 xbt_replay_action_register("allToAllV", action_allToAllv);
1088 xbt_replay_action_register("gather", action_gather);
1089 xbt_replay_action_register("gatherV", action_gatherv);
1090 xbt_replay_action_register("allGather", action_allgather);
1091 xbt_replay_action_register("allGatherV", action_allgatherv);
1092 xbt_replay_action_register("reduceScatter", action_reducescatter);
1093 xbt_replay_action_register("compute", action_compute);
1096 //if we have a delayed start, sleep here.
/* Same strtod + trailing-character validation as parse_double, applied to (*argv)[2]. */
1099 double value = strtod((*argv)[2], &endptr);
1100 if (*endptr != '\0')
1101 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1102 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1103 smpi_execute_flops(value);
1105 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1106 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1107 smpi_execute_flops(0.0);
1110 /* Actually run the replay */
1111 xbt_replay_action_runner(*argc, *argv);
1113 /* and now, finalize everything */
1114 double sim_time= 1.;
1115 /* One active process will stop. Decrease the counter*/
1116 XBT_DEBUG("There are %lu elements in reqq[*]",
1117 xbt_dynar_length(get_reqq_self()));
/* Drain requests that the trace never waited for, so that they complete before finalization. */
1118 if (!xbt_dynar_is_empty(get_reqq_self())){
1119 int count_requests=xbt_dynar_length(get_reqq_self());
1120 MPI_Request requests[count_requests];
1121 MPI_Status status[count_requests];
1124 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1125 smpi_mpi_waitall(count_requests, requests, status);
1131 if(!active_processes){
1132 /* Last process alive speaking */
1133 /* end the simulated timer */
1134 sim_time = smpi_process_simulated_elapsed();
1138 //TODO xbt_dynar_free_container(get_reqq_self()));
1140 if(!active_processes){
1141 XBT_INFO("Simulation time %f", sim_time);
1142 _xbt_replay_action_exit();
1143 xbt_free(sendbuffer);
1144 xbt_free(recvbuffer);
1146 xbt_dict_free(&reqq); //not need, data have been freed ???
1150 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1151 extra_fin->type = TRACING_FINALIZE;
1152 operation =bprintf("%s_finalize",__FUNCTION__);
1153 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1155 smpi_process_finalize();
1157 TRACE_smpi_collective_out(rank, -1, operation);
1158 TRACE_smpi_finalize(smpi_process_index());
1159 smpi_process_destroy();