1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
/* Byte count sizeof(int) * 2 + 1.
 * NOTE(review): not referenced anywhere in the visible part of this file. */
12 #define KEY_SIZE (sizeof(int) * 2 + 1)
/* Default log category for every XBT_* logging call below. */
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Written by the "comm_size" action from its first argument. */
16 int communicator_size = 0;
/* Loaded with smpi_process_count() by action_init; smpi_replay_run tears down
 * the shared state only when it reads zero. */
17 static int active_processes = 0;
/* Dictionary keyed by the process index printed as a decimal string; each
 * value is that process's dynar of pending MPI_Request. */
18 xbt_dict_t reqq = NULL;
/* Datatype applied when a trace line names none; selected in action_init. */
20 MPI_Datatype MPI_DEFAULT_TYPE;
/* Datatype of the action currently being replayed. */
21 MPI_Datatype MPI_CURRENT_TYPE;
/* Scratch buffers shared by all replayed sends / receives, see
 * smpi_get_tmp_sendbuffer() and smpi_get_tmp_recvbuffer(). The *_size
 * variables are compared against the requested size before growing;
 * presumably they hold the current capacity (the update is not visible here). */
23 static int sendbuffer_size=0;
24 char* sendbuffer=NULL;
25 static int recvbuffer_size=0;
26 char* recvbuffer=NULL;
/* Emit a verbose-level log line made of the words of `action` joined by single
 * spaces, followed by the simulated time elapsed since `clock`.
 * Nothing is built or printed unless verbose logging is enabled for the
 * smpi_replay category. */
28 static void log_timed_action (const char *const *action, double clock){
29 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
30 char *name = xbt_str_join_array(action, " ");
/* duration = current simulated time minus the timestamp taken by the caller */
31 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
/* Return the request dynar stored in the global `reqq` dictionary for the
 * calling process. The dictionary key is the process index formatted with
 * "%d". */
36 static xbt_dynar_t get_reqq_self()
38 char * key = bprintf("%d", smpi_process_index());
39 xbt_dynar_t dynar_mpi_request = (xbt_dynar_t) xbt_dict_get(reqq, key);
42 return dynar_mpi_request;
/* Store `mpi_request` in the global `reqq` dictionary under the calling
 * process's index (formatted with "%d").
 * NOTE(review): plain `free` is registered as the value destructor although
 * the callers in this file pass dynars created with xbt_dynar_new - confirm
 * this is the intended way to release them. */
45 static void set_reqq_self(xbt_dynar_t mpi_request)
47 char * key = bprintf("%d", smpi_process_index());
48 xbt_dict_set(reqq, key, mpi_request, free);
52 //allocate a single buffer for all sends, growing it if needed
// Outside replay mode each call returns a fresh xbt_malloc(size) block.
// In replay mode the global `sendbuffer` is reused and enlarged with
// xbt_realloc whenever `size` exceeds sendbuffer_size.
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (!smpi_process_get_replaying())
56 return xbt_malloc(size);
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
// Outside replay mode each call returns a fresh xbt_malloc(size) block.
// In replay mode the global `recvbuffer` is reused and enlarged with
// xbt_realloc whenever `size` exceeds recvbuffer_size.
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (!smpi_process_get_replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
/* Counterpart of smpi_get_tmp_sendbuffer() / smpi_get_tmp_recvbuffer().
 * The action taken is conditional on NOT being in replay mode.
 * NOTE(review): presumably `buf` is released only in that case, because in
 * replay mode the getters hand out the shared global buffers - confirm. */
75 void smpi_free_tmp_buffer(void* buf){
76 if (!smpi_process_get_replaying())
/* Convert `string` to a double with strtod.
 * Throws unknown_error with the message "<string> is not a double"; the
 * condition guarding that throw is not visible here (smpi_replay_run performs
 * the same conversion and tests *endptr != '\0'). */
81 static double parse_double(const char *string)
85 value = strtod(string, &endptr);
87 THROWF(unknown_error, 0, "%s is not a double", string);
/* Translate the datatype id found in a trace line into an MPI_Datatype.
 * `action` is the textual id; it is parsed with atoi and dispatched through a
 * switch. The selected type is written to the global MPI_CURRENT_TYPE and
 * also returned. The types that can be selected are MPI_DOUBLE, MPI_INT,
 * MPI_CHAR, MPI_SHORT, MPI_LONG, MPI_FLOAT, MPI_BYTE and MPI_DEFAULT_TYPE
 * (the latter presumably for ids with no dedicated case - the case labels are
 * not visible here). */
91 static MPI_Datatype decode_datatype(const char *const action)
93 // Declared datatypes,
94 switch(atoi(action)) {
96 MPI_CURRENT_TYPE=MPI_DOUBLE;
99 MPI_CURRENT_TYPE=MPI_INT;
102 MPI_CURRENT_TYPE=MPI_CHAR;
105 MPI_CURRENT_TYPE=MPI_SHORT;
108 MPI_CURRENT_TYPE=MPI_LONG;
111 MPI_CURRENT_TYPE=MPI_FLOAT;
114 MPI_CURRENT_TYPE=MPI_BYTE;
117 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
119 return MPI_CURRENT_TYPE;
/* Inverse of decode_datatype(): map an MPI_Datatype to the textual id written
 * to traces. The datatype is compared in turn against MPI_BYTE, MPI_DOUBLE,
 * MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT; the strings returned
 * for each are not visible here.
 * `known`: every caller in this file passes NULL. NOTE(review): judging from
 * the comment near the end, it is presumably an optional out-flag telling
 * whether the datatype is handled by replay - confirm. */
123 const char* encode_datatype(MPI_Datatype datatype, int* known)
125 //default type for output is set to MPI_BYTE
126 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
128 if (datatype==MPI_BYTE){
131 if(datatype==MPI_DOUBLE)
133 if(datatype==MPI_INT)
135 if(datatype==MPI_CHAR)
137 if(datatype==MPI_SHORT)
139 if(datatype==MPI_LONG)
141 if(datatype==MPI_FLOAT)
143 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
145 // default - not implemented.
146 // do not warn here as we pass in this function even for other trace formats
/* Argument-count guard used at the top of the action handlers.
 * Counts the entries of the NULL-terminated `action` array into `i` and
 * throws arg_error when the count is unacceptable (the exact test is not
 * visible here). Per the message, the first two entries are the process id
 * and the action name, so `mandatory` and `optional` count only what follows
 * them. The message embeds __FUNCTION__, i.e. the name of the handler that
 * expands the macro. */
150 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
152 while(action[i]!=NULL)\
155 THROWF(arg_error, 0, "%s replay failed.\n" \
156 "%d items were given on the line. First two should be process_id and action. " \
157 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
158 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
/* Replay handler for "init" (no mandatory argument, one optional).
 * Chooses the default datatype, starts the simulated timer, records the number
 * of processes, creates the global request dictionary and registers an empty
 * request dynar for the calling process.
 * NOTE(review): whether the dictionary creation is guarded so that it happens
 * only once is not visible here. */
161 static void action_init(const char *const *action)
163 XBT_DEBUG("Initialize the counters");
164 CHECK_ACTION_PARAMS(action, 0, 1);
/* the mere presence of the optional argument selects the MPE default */
165 if(action[2]) MPI_DEFAULT_TYPE= MPI_DOUBLE; // default MPE datatype
166 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
168 /* start a simulated timer */
169 smpi_process_simulated_start();
170 /*initialize the number of active processes */
171 active_processes = smpi_process_count();
174 reqq = xbt_dict_new();
/* fresh, empty list of pending requests for this process */
177 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
/* Replay handler registered for the "finalize" action. */
180 static void action_finalize(const char *const *action)
/* Replay handler for "comm_size": stores the first argument (parsed as a
 * double, then converted to int) in the global communicator_size and logs the
 * elapsed simulated time. */
184 static void action_comm_size(const char *const *action)
186 double clock = smpi_process_simulated_elapsed();
188 communicator_size = parse_double(action[2]);
189 log_timed_action (action, clock);
/* Replay handler for "comm_split": the visible code only timestamps the call
 * and logs it through log_timed_action(). */
192 static void action_comm_split(const char *const *action)
194 double clock = smpi_process_simulated_elapsed();
196 log_timed_action (action, clock);
/* Replay handler for "comm_dup": the visible code only timestamps the call
 * and logs it through log_timed_action(). */
199 static void action_comm_dup(const char *const *action)
201 double clock = smpi_process_simulated_elapsed();
203 log_timed_action (action, clock);
/* Replay handler for "compute" (one mandatory argument: the flop amount).
 * Executes that many flops with smpi_execute_flops(), bracketed by the
 * computing-state trace events, then logs the elapsed simulated time. */
206 static void action_compute(const char *const *action)
208 CHECK_ACTION_PARAMS(action, 1, 0);
209 double clock = smpi_process_simulated_elapsed();
210 double flops= parse_double(action[2]);
211 int rank = smpi_process_index();
/* tracing payload describing this computation */
212 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
213 extra->type=TRACING_COMPUTING;
214 extra->comp_size=flops;
215 TRACE_smpi_computing_in(rank, extra);
217 smpi_execute_flops(flops);
219 TRACE_smpi_computing_out(rank);
220 log_timed_action (action, clock);
/* Replay handler for "send".
 * Arguments: action[2] destination, action[3] element count, optional
 * action[4] datatype id. Performs a blocking smpi_mpi_send with a NULL buffer
 * and tag 0 on MPI_COMM_WORLD, surrounded by point-to-point trace events. */
223 static void action_send(const char *const *action)
225 CHECK_ACTION_PARAMS(action, 2, 1);
226 int to = atoi(action[2]);
227 double size=parse_double(action[3]);
228 double clock = smpi_process_simulated_elapsed();
/* explicit datatype if one was given, otherwise the default one */
231 MPI_CURRENT_TYPE=decode_datatype(action[4]);
233 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
236 int rank = smpi_process_index();
238 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
239 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
240 extra->type = TRACING_SEND;
241 extra->send_size = size;
243 extra->dst = dst_traced;
244 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
245 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
246 if (!TRACE_smpi_view_internals()) {
/* traced volume is in bytes: element count times element size */
247 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
250 smpi_mpi_send(NULL, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
252 log_timed_action (action, clock);
254 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* Replay handler for "Isend".
 * Arguments: action[2] destination, action[3] element count, optional
 * action[4] datatype id. Starts a non-blocking smpi_mpi_isend with a NULL
 * buffer and tag 0 on MPI_COMM_WORLD and appends the resulting request to
 * this process's pending-request list, to be completed by a later
 * test/wait/waitAll action. */
257 static void action_Isend(const char *const *action)
259 CHECK_ACTION_PARAMS(action, 2, 1);
260 int to = atoi(action[2]);
261 double size=parse_double(action[3]);
262 double clock = smpi_process_simulated_elapsed();
265 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
266 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
268 int rank = smpi_process_index();
269 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
270 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
271 extra->type = TRACING_ISEND;
272 extra->send_size = size;
274 extra->dst = dst_traced;
275 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
276 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
277 if (!TRACE_smpi_view_internals()) {
278 TRACE_smpi_send(rank, rank, dst_traced, size*smpi_datatype_size(MPI_CURRENT_TYPE));
281 request = smpi_mpi_isend(NULL, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
283 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
/* remember the request so that a later wait/test can find it */
286 xbt_dynar_push(get_reqq_self(),&request);
288 log_timed_action (action, clock);
/* Replay handler for "recv".
 * Arguments: action[2] source, action[3] element count, optional action[4]
 * datatype id. Performs a blocking smpi_mpi_recv with a NULL buffer and tag 0
 * on MPI_COMM_WORLD, surrounded by point-to-point trace events.
 * A probe is issued first in the "unknown size" case; the condition selecting
 * that case and what is done with the probed status are not visible here. */
291 static void action_recv(const char *const *action) {
292 CHECK_ACTION_PARAMS(action, 2, 1);
293 int from = atoi(action[2]);
294 double size=parse_double(action[3]);
295 double clock = smpi_process_simulated_elapsed();
298 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
299 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
301 int rank = smpi_process_index();
302 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
304 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
305 extra->type = TRACING_RECV;
/* the expected volume is recorded in the send_size field */
306 extra->send_size = size;
307 extra->src = src_traced;
309 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
310 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
312 //unknown size from the receiver's point of view
314 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
318 smpi_mpi_recv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
320 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
321 if (!TRACE_smpi_view_internals()) {
322 TRACE_smpi_recv(rank, src_traced, rank);
325 log_timed_action (action, clock);
/* Replay handler for "Irecv".
 * Arguments: action[2] source, action[3] element count, optional action[4]
 * datatype id. Starts a non-blocking smpi_mpi_irecv with a NULL buffer and
 * tag 0 on MPI_COMM_WORLD and appends the resulting request to this process's
 * pending-request list.
 * A probe is issued first in the "unknown size" case; the condition selecting
 * that case and what is done with the probed status are not visible here. */
328 static void action_Irecv(const char *const *action)
330 CHECK_ACTION_PARAMS(action, 2, 1);
331 int from = atoi(action[2]);
332 double size=parse_double(action[3]);
333 double clock = smpi_process_simulated_elapsed();
336 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
337 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
339 int rank = smpi_process_index();
340 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
341 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
342 extra->type = TRACING_IRECV;
/* the expected volume is recorded in the send_size field */
343 extra->send_size = size;
344 extra->src = src_traced;
346 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
347 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
349 //unknown size from the receiver's point of view
351 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
355 request = smpi_mpi_irecv(NULL, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
357 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
/* remember the request so that a later wait/test can find it */
359 xbt_dynar_push(get_reqq_self(),&request);
361 log_timed_action (action, clock);
/* Replay handler for "test" (no argument).
 * Pops the most recently queued request of this process, runs smpi_mpi_test
 * on it, and pushes the (possibly now NULL) request back so that a following
 * wait still finds an entry. */
364 static void action_test(const char *const *action){
365 CHECK_ACTION_PARAMS(action, 0, 0);
366 double clock = smpi_process_simulated_elapsed();
/* take the last request pushed by Isend/Irecv */
371 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
372 //if request is null here, this may mean that a previous test has succeeded
373 //Different times in traced application and replayed version may lead to this
374 //In this case, ignore the extra calls.
376 int rank = smpi_process_index();
377 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
378 extra->type=TRACING_TEST;
379 TRACE_smpi_testing_in(rank, extra);
381 flag = smpi_mpi_test(&request, &status);
383 XBT_DEBUG("MPI_Test result: %d", flag);
384 /* push back request in dynar to be caught by a subsequent wait. if the test did succeed, the request is now NULL.*/
385 xbt_dynar_push_as(get_reqq_self(),MPI_Request, request);
387 TRACE_smpi_testing_out(rank);
389 log_timed_action (action, clock);
/* Replay handler for "wait" (no argument).
 * Asserts that this process has at least one queued request, pops the most
 * recent one and completes it with smpi_mpi_wait, emitting point-to-point
 * trace events (plus a receive event when the request was a receive). */
392 static void action_wait(const char *const *action){
393 CHECK_ACTION_PARAMS(action, 0, 0);
394 double clock = smpi_process_simulated_elapsed();
398 xbt_assert(xbt_dynar_length(get_reqq_self()),
399 "action wait not preceded by any irecv or isend: %s",
400 xbt_str_join_array(action," "));
401 request = xbt_dynar_pop_as(get_reqq_self(),MPI_Request);
404 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
/* rank is -1 when the request carries no communicator */
408 int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
/* read everything needed for tracing before the wait, which takes the
 * request by address and may therefore modify it */
410 MPI_Group group = smpi_comm_group(request->comm);
411 int src_traced = smpi_group_rank(group, request->src);
412 int dst_traced = smpi_group_rank(group, request->dst);
413 int is_wait_for_receive = request->recv;
414 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415 extra->type = TRACING_WAIT;
416 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
418 smpi_mpi_wait(&request, &status);
420 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
421 if (is_wait_for_receive)
422 TRACE_smpi_recv(rank, src_traced, dst_traced);
423 log_timed_action (action, clock);
/* Replay handler for "waitAll" (no argument).
 * Copies every queued request of this process into a local array, saves the
 * src/dst/recv fields needed for tracing, completes all requests with
 * smpi_mpi_waitall, emits a receive trace event for each request that was a
 * receive, and finally installs a fresh empty request list for the process. */
426 static void action_waitall(const char *const *action){
427 CHECK_ACTION_PARAMS(action, 0, 0);
428 double clock = smpi_process_simulated_elapsed();
429 int count_requests=0;
432 count_requests=xbt_dynar_length(get_reqq_self());
434 if (count_requests>0) {
/* variable-length arrays sized by the number of pending requests */
435 MPI_Request requests[count_requests];
436 MPI_Status status[count_requests];
438 /* The reqq is an array of dynars. Its index corresponds to the rank.
439 Thus each rank saves its own requests to the array request. */
440 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
442 //save information from requests
443 xbt_dynar_t srcs = xbt_dynar_new(sizeof(int), NULL);
444 xbt_dynar_t dsts = xbt_dynar_new(sizeof(int), NULL);
445 xbt_dynar_t recvs = xbt_dynar_new(sizeof(int), NULL);
446 for (i = 0; (int)i < count_requests; i++) {
/* NOTE(review): the dynars hold ints by value (element size sizeof(int)),
 * so xbt_dynar_insert_at presumably copies from these heap ints, which
 * are then never freed in the visible code - confirm. */
448 int *asrc = xbt_new(int, 1);
449 int *adst = xbt_new(int, 1);
450 int *arecv = xbt_new(int, 1);
451 *asrc = requests[i]->src;
452 *adst = requests[i]->dst;
453 *arecv = requests[i]->recv;
454 xbt_dynar_insert_at(srcs, i, asrc);
455 xbt_dynar_insert_at(dsts, i, adst);
456 xbt_dynar_insert_at(recvs, i, arecv);
/* alternative path: a placeholder int is inserted in all three arrays.
 * NOTE(review): `t` is not assigned a value in the visible code before
 * being inserted - confirm. */
461 int *t = xbt_new(int, 1);
462 xbt_dynar_insert_at(srcs, i, t);
463 xbt_dynar_insert_at(dsts, i, t);
464 xbt_dynar_insert_at(recvs, i, t);
468 int rank_traced = smpi_process_index();
469 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
470 extra->type = TRACING_WAITALL;
/* send_size carries the number of requests for this event */
471 extra->send_size=count_requests;
472 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
474 smpi_mpi_waitall(count_requests, requests, status);
/* replay the saved per-request information into the trace */
476 for (i = 0; (int)i < count_requests; i++) {
477 int src_traced, dst_traced, is_wait_for_receive;
478 xbt_dynar_get_cpy(srcs, i, &src_traced);
479 xbt_dynar_get_cpy(dsts, i, &dst_traced);
480 xbt_dynar_get_cpy(recvs, i, &is_wait_for_receive);
481 if (is_wait_for_receive) {
482 TRACE_smpi_recv(rank_traced, src_traced, dst_traced);
485 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
487 xbt_dynar_free(&srcs);
488 xbt_dynar_free(&dsts);
489 xbt_dynar_free(&recvs);
491 //TODO xbt_dynar_free_container(get_reqq_self());
/* replace this process's request list with a new, empty one */
492 set_reqq_self(xbt_dynar_new(sizeof(MPI_Request),&xbt_free_ref));
494 log_timed_action (action, clock);
/* Replay handler for "barrier": runs the selected barrier algorithm on
 * MPI_COMM_WORLD between the collective trace events and logs the elapsed
 * simulated time. */
497 static void action_barrier(const char *const *action){
498 double clock = smpi_process_simulated_elapsed();
499 int rank = smpi_process_index();
500 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
501 extra->type = TRACING_BARRIER;
502 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
504 mpi_coll_barrier_fun(MPI_COMM_WORLD);
506 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
507 log_timed_action (action, clock);
/* Replay handler for "bcast".
 * Arguments: action[2] element count (mandatory), then optionally action[3]
 * root and action[4] datatype id. Broadcasts from `root` on MPI_COMM_WORLD
 * using a buffer obtained from smpi_get_tmp_sendbuffer(). */
510 static void action_bcast(const char *const *action)
512 CHECK_ACTION_PARAMS(action, 1, 2);
513 double size = parse_double(action[2]);
514 double clock = smpi_process_simulated_elapsed();
516 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
517 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
520 root= atoi(action[3]);
522 MPI_CURRENT_TYPE=decode_datatype(action[4]);
526 int rank = smpi_process_index();
/* NOTE(review): uses smpi_group_index here whereas action_reduce uses
 * smpi_group_rank for the same purpose - confirm which one is intended. */
527 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
529 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
530 extra->type = TRACING_BCAST;
531 extra->send_size = size;
532 extra->root = root_traced;
533 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
534 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
/* buffer size in bytes: element count times element size */
535 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
537 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
539 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
540 log_timed_action (action, clock);
/* Replay handler for "reduce".
 * Arguments: action[2] communication size (element count), action[3]
 * computation size (flops), then optionally action[4] root and action[5]
 * datatype id. Runs the selected reduce algorithm on MPI_COMM_WORLD with
 * MPI_OP_NULL, then executes `comp_size` flops to account for the reduction
 * work. */
543 static void action_reduce(const char *const *action)
545 CHECK_ACTION_PARAMS(action, 2, 2);
546 double comm_size = parse_double(action[2]);
547 double comp_size = parse_double(action[3]);
548 double clock = smpi_process_simulated_elapsed();
550 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
553 root= atoi(action[4]);
555 MPI_CURRENT_TYPE=decode_datatype(action[5]);
559 int rank = smpi_process_index();
560 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
561 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
562 extra->type = TRACING_REDUCE;
563 extra->send_size = comm_size;
564 extra->comp_size = comp_size;
565 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
566 extra->root = root_traced;
568 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
/* The receive side uses the dedicated receive scratch buffer: in replay mode
 * two calls to smpi_get_tmp_sendbuffer() hand back the same global buffer,
 * which would make sendbuf and recvbuf alias each other. */
570 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
571 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
572 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
573 smpi_execute_flops(comp_size);
575 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
576 log_timed_action (action, clock);
/* Replay handler for "allReduce".
 * Arguments: action[2] communication size (element count), action[3]
 * computation size (flops), optional action[4] datatype id. Runs the selected
 * allreduce algorithm on MPI_COMM_WORLD with MPI_OP_NULL, then executes
 * `comp_size` flops to account for the reduction work. */
579 static void action_allReduce(const char *const *action) {
580 CHECK_ACTION_PARAMS(action, 2, 1);
581 double comm_size = parse_double(action[2]);
582 double comp_size = parse_double(action[3]);
584 if(action[4]) MPI_CURRENT_TYPE=decode_datatype(action[4]);
585 else MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
587 double clock = smpi_process_simulated_elapsed();
588 int rank = smpi_process_index();
589 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
590 extra->type = TRACING_ALLREDUCE;
591 extra->send_size = comm_size;
592 extra->comp_size = comp_size;
593 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
594 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
/* The receive side uses the dedicated receive scratch buffer: in replay mode
 * two calls to smpi_get_tmp_sendbuffer() hand back the same global buffer,
 * which would make sendbuf and recvbuf alias each other. */
596 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
597 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
598 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
599 smpi_execute_flops(comp_size);
601 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
602 log_timed_action (action, clock);
/* Replay handler for "allToAll".
 * Arguments: action[2] per-peer send count, action[3] per-peer receive count,
 * and optionally action[4] / action[5] send and receive datatype ids (used
 * only when both are present). Buffers are sized count * communicator size *
 * element size. */
605 static void action_allToAll(const char *const *action) {
606 CHECK_ACTION_PARAMS(action, 2, 2); //two mandatory (send and recv volumes)
607 //two optional (corresponding datatypes)
608 double clock = smpi_process_simulated_elapsed();
609 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
610 int send_size = parse_double(action[2]);
611 int recv_size = parse_double(action[3]);
612 MPI_Datatype MPI_CURRENT_TYPE2;
614 if(action[4] && action[5]) {
615 MPI_CURRENT_TYPE=decode_datatype(action[4]);
616 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
619 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
620 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
623 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
624 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
626 int rank = smpi_process_index();
627 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
628 extra->type = TRACING_ALLTOALL;
629 extra->send_size = send_size;
630 extra->recv_size = recv_size;
631 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
632 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
634 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
636 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
638 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
639 log_timed_action (action, clock);
/* Replay handler for "gather".
 * Arguments: action[2] send count, action[3] receive count, then optionally
 * action[4] root, action[5] send datatype id and action[6] receive datatype
 * id. The explicit datatypes are honoured only when both ids are present;
 * otherwise both sides use MPI_DEFAULT_TYPE. */
642 static void action_gather(const char *const *action) {
643 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
647 1) 68 is the sendcounts
648 2) 68 is the recvcounts
649 3) 0 is the root node
650 4) 0 is the send datatype id, see decode_datatype()
651 5) 0 is the recv datatype id, see decode_datatype()
653 CHECK_ACTION_PARAMS(action, 2, 3);
654 double clock = smpi_process_simulated_elapsed();
655 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
656 int send_size = parse_double(action[2]);
657 int recv_size = parse_double(action[3]);
658 MPI_Datatype MPI_CURRENT_TYPE2;
// The datatype ids live in action[5] and action[6] (action[4] is the root), so
// both must be non-NULL before being decoded. The checks are chained left to
// right so that no entry beyond the NULL terminator is ever read.
659 if(action[4] && action[5] && action[6]) {
660 MPI_CURRENT_TYPE=decode_datatype(action[5]);
661 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
663 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
664 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
666 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
670 root=atoi(action[4]);
671 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// receive buffer holds recv_size elements from each of the comm_size ranks
674 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
676 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
677 extra->type = TRACING_GATHER;
678 extra->send_size = send_size;
679 extra->recv_size = recv_size;
681 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
682 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
684 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
686 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
688 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
689 log_timed_action (action, clock);
/* Replay handler for "gatherV".
 * Arguments: action[2] send count, action[3 .. 2+comm_size] the receive count
 * of every rank, action[3+comm_size] root, then optionally
 * action[4+comm_size] / action[5+comm_size] send and receive datatype ids
 * (used only when both are present). */
692 static void action_gatherv(const char *const *action) {
693 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
694 0 gatherV 68 68 10 10 10 0 0 0
697 1) 68 is the sendcount
698 2) 68 10 10 10 is the recvcounts
699 3) 0 is the root node
700 4) 0 is the send datatype id, see decode_datatype()
701 5) 0 is the recv datatype id, see decode_datatype()
704 double clock = smpi_process_simulated_elapsed();
705 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// The root is read unconditionally below, so it is mandatory: one send count,
// comm_size receive counts and the root. Requiring it here turns a NULL
// dereference in atoi() into a regular arg_error.
706 CHECK_ACTION_PARAMS(action, comm_size+2, 2);
707 int send_size = parse_double(action[2]);
708 int *disps = xbt_new0(int, comm_size);
709 int *recvcounts = xbt_new0(int, comm_size);
712 MPI_Datatype MPI_CURRENT_TYPE2;
713 if(action[4+comm_size] && action[5+comm_size]) {
714 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
715 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
717 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
718 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
720 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// collect the per-rank receive counts and their total
722 for(i=0;i<comm_size;i++) {
723 recvcounts[i] = atoi(action[i+3]);
724 recv_sum=recv_sum+recvcounts[i];
728 int root=atoi(action[3+comm_size]);
729 int rank = smpi_comm_rank(MPI_COMM_WORLD);
732 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
734 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
735 extra->type = TRACING_GATHERV;
736 extra->send_size = send_size;
737 extra->recvcounts= xbt_new(int,comm_size);
738 for(i=0; i< comm_size; i++)//copy data to avoid bad free
739 extra->recvcounts[i] = recvcounts[i];
741 extra->num_processes = comm_size;
742 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
743 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
745 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
747 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
749 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
750 log_timed_action (action, clock);
751 xbt_free(recvcounts);
/* Replay handler for "reduceScatter".
 * Arguments: action[2 .. 1+comm_size] the receive count of every rank,
 * action[2+comm_size] the computation amount, optional action[3+comm_size]
 * datatype id. Runs the selected reduce_scatter algorithm on MPI_COMM_WORLD
 * with MPI_OP_NULL, then executes `comp_size` flops. */
755 static void action_reducescatter(const char *const *action) {
756 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
757 0 reduceScatter 275427 275427 275427 204020 11346849 0
760 1) The first four values after the name of the action declare the recvcounts array
761 2) The value 11346849 is the amount of instructions
762 3) The last value corresponds to the datatype, see decode_datatype().
764 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
765 double clock = smpi_process_simulated_elapsed();
766 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
767 CHECK_ACTION_PARAMS(action, comm_size+1, 1);
768 int comp_size = parse_double(action[2+comm_size]);
769 int *recvcounts = xbt_new0(int, comm_size);
770 int *disps = xbt_new0(int, comm_size);
772 int rank = smpi_process_index();
774 if(action[3+comm_size])
775 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
777 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
779 for(i=0;i<comm_size;i++) {
780 recvcounts[i] = atoi(action[i+2]);
785 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
786 extra->type = TRACING_REDUCE_SCATTER;
787 extra->send_size = 0;
788 extra->recvcounts= xbt_new(int, comm_size);
789 for(i=0; i< comm_size; i++)//copy data to avoid bad free
790 extra->recvcounts[i] = recvcounts[i];
791 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
792 extra->comp_size = comp_size;
793 extra->num_processes = comm_size;
795 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
/* `size` is declared and computed on lines not visible here; presumably
 * it is the sum of the receive counts - confirm. */
797 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
798 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
800 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
801 smpi_execute_flops(comp_size);
803 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
804 xbt_free(recvcounts);
806 log_timed_action (action, clock);
/* Replay handler for "allGather".
 * Arguments: action[2] send count, action[3] receive count, and optionally
 * action[4] / action[5] send and receive datatype ids (used only when both
 * are present). */
809 static void action_allgather(const char *const *action) {
810 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
811 0 allGather 275427 275427
814 1) 275427 is the sendcount
815 2) 275427 is the recvcount
816 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype(). */
817 double clock = smpi_process_simulated_elapsed();
819 CHECK_ACTION_PARAMS(action, 2, 2);
820 int sendcount=atoi(action[2]);
821 int recvcount=atoi(action[3]);
823 MPI_Datatype MPI_CURRENT_TYPE2;
825 if(action[4] && action[5]) {
826 MPI_CURRENT_TYPE = decode_datatype(action[4]);
827 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
829 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
830 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
832 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
/* NOTE(review): the receive buffer is sized for recvcount elements only,
 * whereas action_gather and action_allToAll multiply by the communicator
 * size - confirm whether this is intended. */
833 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
835 int rank = smpi_process_index();
836 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
837 extra->type = TRACING_ALLGATHER;
838 extra->send_size = sendcount;
839 extra->recv_size= recvcount;
840 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
841 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
842 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
844 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
846 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
848 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
849 log_timed_action (action, clock);
/* Replay handler for "allGatherV".
 * Arguments: action[2] send count, action[3 .. 2+comm_size] the receive count
 * of every rank, and optionally action[3+comm_size] / action[4+comm_size]
 * send and receive datatype ids (used only when both are present). */
852 static void action_allgatherv(const char *const *action) {
853 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
854 0 allGatherV 275427 275427 275427 275427 204020
857 1) 275427 is the sendcount
858 2) The next four elements declare the recvcounts array
859 3) No more values mean that the datatype for sent and receive buffer
860 is the default one, see decode_datatype(). */
861 double clock = smpi_process_simulated_elapsed();
863 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
864 CHECK_ACTION_PARAMS(action, comm_size+1, 2);
866 int sendcount=atoi(action[2]);
867 int *recvcounts = xbt_new0(int, comm_size);
/* displacements are allocated zero-filled; no visible line changes them */
868 int *disps = xbt_new0(int, comm_size);
870 MPI_Datatype MPI_CURRENT_TYPE2;
872 if(action[3+comm_size] && action[4+comm_size]) {
873 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
874 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
876 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
877 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
879 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
/* collect the per-rank receive counts and their total */
881 for(i=0;i<comm_size;i++) {
882 recvcounts[i] = atoi(action[i+3]);
883 recv_sum=recv_sum+recvcounts[i];
885 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
887 int rank = smpi_process_index();
888 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
889 extra->type = TRACING_ALLGATHERV;
890 extra->send_size = sendcount;
891 extra->recvcounts= xbt_new(int, comm_size);
892 for(i=0; i< comm_size; i++)//copy data to avoid bad free
893 extra->recvcounts[i] = recvcounts[i];
894 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
895 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
896 extra->num_processes = comm_size;
898 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
900 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
903 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
904 log_timed_action (action, clock);
905 xbt_free(recvcounts);
/* Replay handler for "allToAllV".
 * Arguments: action[2] send buffer size, action[3 .. 2+comm_size] send
 * counts, action[3+comm_size] receive buffer size,
 * action[4+comm_size .. 3+2*comm_size] receive counts, and optionally
 * action[4+2*comm_size] / action[5+2*comm_size] send and receive datatype ids
 * (used only when both are present). */
909 static void action_allToAllv(const char *const *action) {
910 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
911 0 allToAllV 100 1 7 10 12 100 1 70 10 5
914 1) 100 is the size of the send buffer *sizeof(int),
915 2) 1 7 10 12 is the sendcounts array
916 3) 100*sizeof(int) is the size of the receiver buffer
917 4) 1 70 10 5 is the recvcounts array */
918 double clock = smpi_process_simulated_elapsed();
920 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
921 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2);
922 int send_buf_size=0,recv_buf_size=0,i=0;
923 int *sendcounts = xbt_new0(int, comm_size);
924 int *recvcounts = xbt_new0(int, comm_size);
925 int *senddisps = xbt_new0(int, comm_size);
926 int *recvdisps = xbt_new0(int, comm_size);
928 MPI_Datatype MPI_CURRENT_TYPE2;
930 send_buf_size=parse_double(action[2]);
931 recv_buf_size=parse_double(action[3+comm_size]);
932 if(action[4+2*comm_size] && action[5+2*comm_size]) {
933 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
934 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
937 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
938 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
941 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
942 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
944 for(i=0;i<comm_size;i++) {
945 sendcounts[i] = atoi(action[i+3]);
946 recvcounts[i] = atoi(action[i+4+comm_size]);
949 int rank = smpi_process_index();
950 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
951 extra->type = TRACING_ALLTOALLV;
952 extra->recvcounts= xbt_new(int, comm_size);
953 extra->sendcounts= xbt_new(int, comm_size);
954 extra->num_processes = comm_size;
/* send_size / recv_size start at zero (xbt_new0) and accumulate the totals */
956 for(i=0; i< comm_size; i++){//copy data to avoid bad free
957 extra->send_size += sendcounts[i];
958 extra->sendcounts[i] = sendcounts[i];
959 extra->recv_size += recvcounts[i];
960 extra->recvcounts[i] = recvcounts[i];
962 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, NULL);
963 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, NULL);
965 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
/* The receive side must use the receive datatype: it is the one that sized
 * recvbuf above and that is reported as datatype2 in the trace. */
967 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
968 MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
970 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
971 log_timed_action (action, clock);
972 xbt_free(sendcounts);
973 xbt_free(recvcounts);
/* Entry point of the trace replay for one simulated MPI process.
 * Steps, in order:
 *   1. initialise the SMPI process and switch it to replay mode;
 *   2. emit the "<function>_init" trace events;
 *   3. register one handler per supported trace action;
 *   4. optionally burn a start delay read from (*argv)[2], then force a
 *      context switch;
 *   5. run the trace through xbt_replay_action_runner();
 *   6. complete any request still pending, release the shared state when
 *      active_processes is zero, and finalise/destroy the SMPI process.
 * argc/argv are forwarded to smpi_process_init() and to the action runner. */
978 void smpi_replay_run(int *argc, char***argv){
979 /* First initializes everything */
980 smpi_process_init(argc, argv);
981 smpi_process_mark_as_initialized();
982 smpi_process_set_replaying(1);
984 int rank = smpi_process_index();
985 TRACE_smpi_init(rank);
986 TRACE_smpi_computing_init(rank);
987 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
988 extra->type = TRACING_INIT;
/* trace event name is "<this function's name>_init" */
989 char *operation =bprintf("%s_init",__FUNCTION__);
990 TRACE_smpi_collective_in(rank, -1, operation, extra);
991 TRACE_smpi_collective_out(rank, -1, operation);
/* map each action keyword found in trace files to its handler */
994 if (!_xbt_replay_action_init()) {
995 xbt_replay_action_register("init", action_init);
996 xbt_replay_action_register("finalize", action_finalize);
997 xbt_replay_action_register("comm_size", action_comm_size);
998 xbt_replay_action_register("comm_split", action_comm_split);
999 xbt_replay_action_register("comm_dup", action_comm_dup);
1000 xbt_replay_action_register("send", action_send);
1001 xbt_replay_action_register("Isend", action_Isend);
1002 xbt_replay_action_register("recv", action_recv);
1003 xbt_replay_action_register("Irecv", action_Irecv);
1004 xbt_replay_action_register("test", action_test);
1005 xbt_replay_action_register("wait", action_wait);
1006 xbt_replay_action_register("waitAll", action_waitall);
1007 xbt_replay_action_register("barrier", action_barrier);
1008 xbt_replay_action_register("bcast", action_bcast);
1009 xbt_replay_action_register("reduce", action_reduce);
1010 xbt_replay_action_register("allReduce", action_allReduce);
1011 xbt_replay_action_register("allToAll", action_allToAll);
1012 xbt_replay_action_register("allToAllV", action_allToAllv);
1013 xbt_replay_action_register("gather", action_gather);
1014 xbt_replay_action_register("gatherV", action_gatherv);
1015 xbt_replay_action_register("allGather", action_allgather);
1016 xbt_replay_action_register("allGatherV", action_allgatherv);
1017 xbt_replay_action_register("reduceScatter", action_reducescatter);
1018 xbt_replay_action_register("compute", action_compute);
1021 //if we have a delayed start, sleep here.
/* the delay is expressed in flops and must be a fully numeric string */
1024 double value = strtod((*argv)[2], &endptr);
1025 if (*endptr != '\0')
1026 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
1027 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
1028 smpi_execute_flops(value);
1030 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
1031 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
1032 smpi_execute_flops(0.0);
1035 /* Actually run the replay */
1036 xbt_replay_action_runner(*argc, *argv);
1038 /* and now, finalize everything */
1039 double sim_time= 1.;
1040 /* One active process will stop. Decrease the counter*/
1041 XBT_DEBUG("There are %lu elements in reqq[*]", xbt_dynar_length(get_reqq_self()));
/* complete whatever non-blocking requests the trace left pending */
1042 if (!xbt_dynar_is_empty(get_reqq_self())){
1043 int count_requests=xbt_dynar_length(get_reqq_self());
1044 MPI_Request requests[count_requests];
1045 MPI_Status status[count_requests];
1048 xbt_dynar_foreach(get_reqq_self(),i,requests[i]);
1049 smpi_mpi_waitall(count_requests, requests, status);
1055 if(!active_processes){
1056 /* Last process alive speaking */
1057 /* end the simulated timer */
1058 sim_time = smpi_process_simulated_elapsed();
1061 //TODO xbt_dynar_free_container(get_reqq_self()));
/* global teardown, executed only when active_processes is zero */
1063 if(!active_processes){
1064 XBT_INFO("Simulation time %f", sim_time);
1065 _xbt_replay_action_exit();
1066 xbt_free(sendbuffer);
1067 xbt_free(recvbuffer);
1069 xbt_dict_free(&reqq); //not need, data have been freed ???
1073 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1074 extra_fin->type = TRACING_FINALIZE;
/* trace event name is "<this function's name>_finalize" */
1075 operation =bprintf("%s_finalize",__FUNCTION__);
1076 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1078 smpi_process_finalize();
1080 TRACE_smpi_collective_out(rank, -1, operation);
1081 TRACE_smpi_finalize(smpi_process_index());
1082 smpi_process_destroy();