1 /* Copyright (c) 2009-2015. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include <xbt/replay.h>
11 #include <unordered_map>
// Length of a key buffer: two characters per byte of an int, plus one.
// NOTE(review): KEY_SIZE has no visible user in this part of the file - confirm it is still needed.
14 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Log channel behind the XBT_DEBUG, XBT_VERB and XBT_INFO calls of this file.
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size announced by the trace; written by action_comm_size().
18 int communicator_size = 0;
// Initialised to smpi_process_count() by action_init() and compared with 0 in smpi_replay_run()
// to detect the last process still alive.
19 static int active_processes = 0;
// Per-process list of pending requests, keyed by smpi_process_index();
// accessed through get_reqq_self() and set_reqq_self().
20 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype applied when a trace line carries no datatype id; chosen in action_init().
22 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; overwritten by decode_datatype() and by the handlers.
23 MPI_Datatype MPI_CURRENT_TYPE;
// Shared scratch buffers handed out by smpi_get_tmp_sendbuffer() and smpi_get_tmp_recvbuffer()
// while replaying; both are released at the end of smpi_replay_run().
25 static int sendbuffer_size=0;
26 char* sendbuffer=nullptr;
27 static int recvbuffer_size=0;
28 char* recvbuffer=nullptr;
// Emit a verbose-level log line made of the replayed action and the simulated time it took.
//   action: nullptr-terminated word array of the trace line, joined with single spaces for display
//   clock:  value of smpi_process_simulated_elapsed() sampled when the action started
30 static void log_timed_action (const char *const *action, double clock){
// Skip the string construction entirely unless verbose logging is enabled for smpi_replay.
31 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
32 char *name = xbt_str_join_array(action, " ");
// The logged duration is the current simulated time minus clock.
33 XBT_VERB("%s %f", name, smpi_process_simulated_elapsed()-clock);
// Return the request vector stored in reqq for the calling process.
// unordered_map::at() throws std::out_of_range when no vector was registered for this
// process index, so set_reqq_self() (called from action_init()) must have run first.
38 static std::vector<MPI_Request>* get_reqq_self()
40 return reqq.at(smpi_process_index());
// Register mpi_request as the request vector of the calling process.
// unordered_map::insert() keeps an already existing entry, so a second registration for the
// same process index is silently ignored (and the vector passed in is not stored).
43 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
45 reqq.insert({smpi_process_index(), mpi_request});
48 //allocate a single buffer for all sends, growing it if needed
// size is the requested capacity in bytes.
// Outside of replay mode every call returns a fresh xbt_malloc() block of that size.
// In replay mode the shared sendbuffer is reallocated whenever it is smaller than size.
// NOTE(review): if xbt_realloc() behaves like realloc(), growing the buffer may invalidate
// pointers returned by earlier calls - confirm callers never keep two of them alive.
49 void* smpi_get_tmp_sendbuffer(int size)
51 if (!smpi_process_get_replaying())
52 return xbt_malloc(size);
53 if (sendbuffer_size<size){
54 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
60 //allocate a single buffer for all recv
// Receive-side twin of smpi_get_tmp_sendbuffer(): size is the requested capacity in bytes.
// Outside of replay mode a fresh xbt_malloc() block is returned; in replay mode the shared
// recvbuffer is reallocated whenever it is smaller than size.
61 void* smpi_get_tmp_recvbuffer(int size){
62 if (!smpi_process_get_replaying())
63 return xbt_malloc(size);
64 if (recvbuffer_size<size){
65 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release a buffer obtained from smpi_get_tmp_sendbuffer() or smpi_get_tmp_recvbuffer().
// The work is guarded by the not-replaying test, which is the mode where those getters
// return an individually allocated block.
// NOTE(review): presumably the shared replay buffers must not be freed here - confirm.
71 void smpi_free_tmp_buffer(void* buf){
72 if (!smpi_process_get_replaying())
// Convert a trace field to a double with strtod().
// Raises unknown_error (message: the text is not a double) when the conversion check on
// endptr fails.
77 static double parse_double(const char *string)
81 value = strtod(string, &endptr);
83 THROWF(unknown_error, 0, "%s is not a double", string);
// Translate the numeric datatype id found in a trace line into an MPI_Datatype.
//   action: text of the id; it is converted with atoi(), so non numeric text yields 0
// Side effect: the result is also stored in the global MPI_CURRENT_TYPE.
// Ids without a dedicated case fall back on MPI_DEFAULT_TYPE.
87 static MPI_Datatype decode_datatype(const char *const action)
89 // Declared datatypes,
90 switch(atoi(action)) {
92 MPI_CURRENT_TYPE=MPI_DOUBLE;
95 MPI_CURRENT_TYPE=MPI_INT;
98 MPI_CURRENT_TYPE=MPI_CHAR;
101 MPI_CURRENT_TYPE=MPI_SHORT;
104 MPI_CURRENT_TYPE=MPI_LONG;
107 MPI_CURRENT_TYPE=MPI_FLOAT;
110 MPI_CURRENT_TYPE=MPI_BYTE;
113 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
115 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): map an MPI_Datatype to the textual id written in traces.
//   datatype: type to encode; compared against BYTE, DOUBLE, INT, CHAR, SHORT, LONG and FLOAT
//   known:    optional output flag (callers in this file pass nullptr)
// NOTE(review): presumably known is only written when non null - confirm before passing
// a pointer from new call sites.
119 const char* encode_datatype(MPI_Datatype datatype, int* known)
121 //default type for output is set to MPI_BYTE
122 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
125 if (datatype==MPI_BYTE){
128 if(datatype==MPI_DOUBLE)
130 if(datatype==MPI_INT)
132 if(datatype==MPI_CHAR)
134 if(datatype==MPI_SHORT)
136 if(datatype==MPI_LONG)
138 if(datatype==MPI_FLOAT)
140 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
143 // default - not implemented.
144 // do not warn here as we pass in this function even for other trace formats
// Argument-count guard used at the top of the action handlers.
//   action:    nullptr-terminated word array of the trace line
//   mandatory: number of arguments required after the process id and the action name
//   optional:  number of additional arguments that may follow
// The macro counts the entries of action and raises arg_error, naming the calling handler
// through __FUNCTION__, when the count does not fit.
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
150 while(action[i]!=nullptr)\
153 THROWF(arg_error, 0, "%s replay failed.\n" \
154 "%d items were given on the line. First two should be process_id and action. " \
155 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler of the init action: prepares the per-process replay state.
// Accepts no mandatory argument and one optional argument.
159 static void action_init(const char *const *action)
161 XBT_DEBUG("Initialize the counters");
162 CHECK_ACTION_PARAMS(action, 0, 1)
// The default datatype depends on the trace flavour: MPI_DOUBLE for MPE, MPI_BYTE for TAU.
164 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
165 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
167 /* start a simulated timer */
168 smpi_process_simulated_start();
169 /*initialize the number of active processes */
170 active_processes = smpi_process_count();
// Register an empty request list for this process; handlers reach it via get_reqq_self().
// NOTE(review): the vector is allocated with new and no matching delete is visible in
// this part of the file - confirm where it is released.
172 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler registered under the finalize keyword in smpi_replay_run().
175 static void action_finalize(const char *const *action)
// Replay handler of the comm_size action: records the communicator size given in action[2].
// The field is parsed as a double and then stored in the int communicator_size.
180 static void action_comm_size(const char *const *action)
182 double clock = smpi_process_simulated_elapsed();
184 communicator_size = parse_double(action[2]);
185 log_timed_action (action, clock);
// Replay handler of the comm_split action: only samples the simulated clock and logs the action.
188 static void action_comm_split(const char *const *action)
190 double clock = smpi_process_simulated_elapsed();
192 log_timed_action (action, clock);
// Replay handler of the comm_dup action: only samples the simulated clock and logs the action.
195 static void action_comm_dup(const char *const *action)
197 double clock = smpi_process_simulated_elapsed();
199 log_timed_action (action, clock);
// Replay handler of the compute action: simulates the execution of action[2] flops.
// Exactly one argument is expected.
202 static void action_compute(const char *const *action)
204 CHECK_ACTION_PARAMS(action, 1, 0)
205 double clock = smpi_process_simulated_elapsed();
206 double flops= parse_double(action[2]);
207 int rank = smpi_process_index();
// Describe the computation for the tracing layer before running it.
208 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
209 extra->type=TRACING_COMPUTING;
210 extra->comp_size=flops;
211 TRACE_smpi_computing_in(rank, extra);
213 smpi_execute_flops(flops);
215 TRACE_smpi_computing_out(rank);
216 log_timed_action (action, clock);
// Replay handler of the send action (blocking send on MPI_COMM_WORLD).
//   action[2]: destination, action[3]: element count, action[4]: optional datatype id
219 static void action_send(const char *const *action)
221 CHECK_ACTION_PARAMS(action, 2, 1)
222 int to = atoi(action[2]);
223 double size=parse_double(action[3]);
224 double clock = smpi_process_simulated_elapsed();
// Datatype: the id decoded from action[4], or MPI_DEFAULT_TYPE otherwise.
227 MPI_CURRENT_TYPE=decode_datatype(action[4]);
229 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
231 int rank = smpi_process_index();
// Destination expressed as a rank of the MPI_COMM_WORLD group, used for tracing.
233 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
234 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
235 extra->type = TRACING_SEND;
236 extra->send_size = size;
238 extra->dst = dst_traced;
239 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
240 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The message itself is traced here unless the tracing layer shows SMPI internals.
241 if (!TRACE_smpi_view_internals()) {
242 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// The send is issued with a nullptr buffer and tag 0.
245 smpi_mpi_send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
247 log_timed_action (action, clock);
249 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Replay handler of the Isend action (non blocking send on MPI_COMM_WORLD).
//   action[2]: destination, action[3]: element count, action[4]: optional datatype id
// The resulting request is appended to the request list of the calling process so that a
// later test, wait or waitAll action can complete it.
252 static void action_Isend(const char *const *action)
254 CHECK_ACTION_PARAMS(action, 2, 1)
255 int to = atoi(action[2]);
256 double size=parse_double(action[3]);
257 double clock = smpi_process_simulated_elapsed();
// Datatype: the id decoded from action[4], or MPI_DEFAULT_TYPE otherwise.
261 MPI_CURRENT_TYPE=decode_datatype(action[4]);
263 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
265 int rank = smpi_process_index();
266 int dst_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), to);
267 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
268 extra->type = TRACING_ISEND;
269 extra->send_size = size;
271 extra->dst = dst_traced;
272 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
273 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
274 if (!TRACE_smpi_view_internals()) {
275 TRACE_smpi_send(rank, rank, dst_traced, 0, size*smpi_datatype_size(MPI_CURRENT_TYPE));
// The send is posted with a nullptr buffer and tag 0.
278 request = smpi_mpi_isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
280 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
283 get_reqq_self()->push_back(request);
285 log_timed_action (action, clock);
// Replay handler of the recv action (blocking receive on MPI_COMM_WORLD).
//   action[2]: source, action[3]: element count, action[4]: optional datatype id
288 static void action_recv(const char *const *action) {
289 CHECK_ACTION_PARAMS(action, 2, 1)
290 int from = atoi(action[2]);
291 double size=parse_double(action[3]);
292 double clock = smpi_process_simulated_elapsed();
// Datatype: the id decoded from action[4], or MPI_DEFAULT_TYPE otherwise.
296 MPI_CURRENT_TYPE=decode_datatype(action[4]);
298 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
300 int rank = smpi_process_index();
// Source expressed as a rank of the MPI_COMM_WORLD group, used for tracing.
301 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
303 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
304 extra->type = TRACING_RECV;
// The received element count is reported through the send_size field.
305 extra->send_size = size;
306 extra->src = src_traced;
308 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
309 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
311 //unknown size from the receiver point of view
// In that case the pending message is probed first so that its status is available.
313 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// The receive is issued with a nullptr buffer and tag 0.
317 smpi_mpi_recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
319 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
320 if (!TRACE_smpi_view_internals()) {
321 TRACE_smpi_recv(rank, src_traced, rank, 0);
324 log_timed_action (action, clock);
// Replay handler of the Irecv action (non blocking receive on MPI_COMM_WORLD).
//   action[2]: source, action[3]: element count, action[4]: optional datatype id
// The resulting request is appended to the request list of the calling process so that a
// later test, wait or waitAll action can complete it.
327 static void action_Irecv(const char *const *action)
329 CHECK_ACTION_PARAMS(action, 2, 1)
330 int from = atoi(action[2]);
331 double size=parse_double(action[3]);
332 double clock = smpi_process_simulated_elapsed();
// Datatype: the id decoded from action[4], or MPI_DEFAULT_TYPE otherwise.
336 MPI_CURRENT_TYPE=decode_datatype(action[4]);
338 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
340 int rank = smpi_process_index();
341 int src_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), from);
342 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
343 extra->type = TRACING_IRECV;
// The received element count is reported through the send_size field.
344 extra->send_size = size;
345 extra->src = src_traced;
347 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
348 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
350 //unknown size from the receiver point of view
// In that case the pending message is probed first so that its status is available.
352 smpi_mpi_probe(from, 0, MPI_COMM_WORLD, &status);
// The receive is posted with a nullptr buffer and tag 0.
356 request = smpi_mpi_irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
358 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
360 get_reqq_self()->push_back(request);
362 log_timed_action (action, clock);
// Replay handler of the test action: runs smpi_mpi_test() on the most recently queued request.
// The request is popped, tested and pushed back so that a following wait can still find it.
365 static void action_test(const char *const *action){
366 CHECK_ACTION_PARAMS(action, 0, 0)
367 double clock = smpi_process_simulated_elapsed();
// NOTE(review): back() on an empty vector is undefined behaviour and, unlike action_wait(),
// no emptiness check is visible before this line - confirm the list cannot be empty here.
371 MPI_Request request = get_reqq_self()->back();
372 get_reqq_self()->pop_back();
373 //if request is null here, this may mean that a previous test has succeeded
374 //Different times in traced application and replayed version may lead to this
375 //In this case, ignore the extra calls.
376 if(request!=nullptr){
377 int rank = smpi_process_index();
378 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
379 extra->type=TRACING_TEST;
380 TRACE_smpi_testing_in(rank, extra);
382 flag = smpi_mpi_test(&request, &status);
384 XBT_DEBUG("MPI_Test result: %d", flag);
385 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
386 get_reqq_self()->push_back(request);
388 TRACE_smpi_testing_out(rank);
390 log_timed_action (action, clock);
// Replay handler of the wait action: completes the most recently queued request of the process.
// Takes no argument.
393 static void action_wait(const char *const *action){
394 CHECK_ACTION_PARAMS(action, 0, 0)
395 double clock = smpi_process_simulated_elapsed();
// A wait without any pending request means the trace is malformed.
// The joined action string is only built for the failure message.
398 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
399 xbt_str_join_array(action," "));
400 MPI_Request request = get_reqq_self()->back();
401 get_reqq_self()->pop_back();
403 if (request==nullptr){
404 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// The rank falls back on -1 when the request carries MPI_COMM_NULL.
408 int rank = request->comm != MPI_COMM_NULL ? smpi_comm_rank(request->comm) : -1;
// NOTE(review): the group lookup below is not guarded against MPI_COMM_NULL the way the
// rank computation above is - confirm smpi_comm_group() tolerates that value.
410 MPI_Group group = smpi_comm_group(request->comm);
411 int src_traced = smpi_group_rank(group, request->src);
412 int dst_traced = smpi_group_rank(group, request->dst);
// Remember the direction now: the request is handed to smpi_mpi_wait() by address.
413 int is_wait_for_receive = request->recv;
414 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
415 extra->type = TRACING_WAIT;
416 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
418 smpi_mpi_wait(&request, &status);
420 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
// Only completed receives are reported as message arrivals to the tracing layer.
421 if (is_wait_for_receive)
422 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
423 log_timed_action (action, clock);
// Replay handler of the waitAll action: completes every request queued by the calling process.
// Takes no argument; nothing is waited for when the request list is empty.
426 static void action_waitall(const char *const *action){
427 CHECK_ACTION_PARAMS(action, 0, 0)
428 double clock = smpi_process_simulated_elapsed();
429 unsigned int count_requests=get_reqq_self()->size();
431 if (count_requests>0) {
// NOTE(review): variable length arrays are a compiler extension in C++.
432 MPI_Status status[count_requests];
434 int rank_traced = smpi_process_index();
435 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
436 extra->type = TRACING_WAITALL;
437 extra->send_size=count_requests;
438 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
// Source and destination of each pending receive are saved before the wait so that the
// receptions can be traced afterwards.
439 int* recvs_snd= xbt_new0(int,count_requests);
440 int* recvs_rcv= xbt_new0(int,count_requests);
442 for (auto req : *(get_reqq_self())){
443 if (req && req->recv){
444 recvs_snd[i]=req->src;
445 recvs_rcv[i]=req->dst;
// The request vector storage is passed directly as the request array.
450 smpi_mpi_waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 marks the slots that do not correspond to a receive.
452 for (i=0; i<count_requests;i++){
453 if (recvs_snd[i]!=-100)
454 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
458 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
460 log_timed_action (action, clock);
// Replay handler of the barrier action: runs the selected barrier algorithm on MPI_COMM_WORLD.
463 static void action_barrier(const char *const *action){
464 double clock = smpi_process_simulated_elapsed();
465 int rank = smpi_process_index();
466 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
467 extra->type = TRACING_BARRIER;
468 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
470 mpi_coll_barrier_fun(MPI_COMM_WORLD);
472 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
473 log_timed_action (action, clock);
// Replay handler of the bcast action (broadcast on MPI_COMM_WORLD).
//   action[2]: element count, action[3]: optional root, action[4]: optional datatype id
476 static void action_bcast(const char *const *action)
478 CHECK_ACTION_PARAMS(action, 1, 2)
479 double size = parse_double(action[2]);
480 double clock = smpi_process_simulated_elapsed();
482 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
483 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
486 root= atoi(action[3]);
488 MPI_CURRENT_TYPE=decode_datatype(action[4]);
492 int rank = smpi_process_index();
// NOTE(review): this handler uses smpi_group_index() while action_reduce() maps its root
// with smpi_group_rank() - confirm which conversion is intended.
493 int root_traced = smpi_group_index(smpi_comm_group(MPI_COMM_WORLD), root);
495 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
496 extra->type = TRACING_BCAST;
497 extra->send_size = size;
498 extra->root = root_traced;
499 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
500 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
// Scratch buffer large enough for size elements of the current datatype.
501 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
503 mpi_coll_bcast_fun(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
505 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
506 log_timed_action (action, clock);
// Replay handler of the reduce action (reduction on MPI_COMM_WORLD followed by a computation).
//   action[2]: element count, action[3]: flops of the reduction operator,
//   action[4]: optional root, action[5]: optional datatype id
509 static void action_reduce(const char *const *action)
511 CHECK_ACTION_PARAMS(action, 2, 2)
512 double comm_size = parse_double(action[2]);
513 double comp_size = parse_double(action[3]);
514 double clock = smpi_process_simulated_elapsed();
516 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
519 root= atoi(action[4]);
521 MPI_CURRENT_TYPE=decode_datatype(action[5]);
525 int rank = smpi_process_index();
526 int root_traced = smpi_group_rank(smpi_comm_group(MPI_COMM_WORLD), root);
527 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
528 extra->type = TRACING_REDUCE;
529 extra->send_size = comm_size;
530 extra->comp_size = comp_size;
531 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
532 extra->root = root_traced;
534 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): recvbuf is requested from the send buffer getter, whereas other collectives
// in this file use smpi_get_tmp_recvbuffer() for the receive side. In replay mode both
// pointers therefore come from the same shared buffer - confirm this is intended.
536 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
537 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// The reduction is called with MPI_OP_NULL; the cost of the operator is simulated
// separately by executing comp_size flops.
538 mpi_coll_reduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
539 smpi_execute_flops(comp_size);
541 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
542 log_timed_action (action, clock);
// Replay handler of the allReduce action (all-reduce on MPI_COMM_WORLD followed by a computation).
//   action[2]: element count, action[3]: flops of the reduction operator,
//   action[4]: optional datatype id
545 static void action_allReduce(const char *const *action) {
546 CHECK_ACTION_PARAMS(action, 2, 1)
547 double comm_size = parse_double(action[2]);
548 double comp_size = parse_double(action[3]);
// Datatype: the id decoded from action[4], or MPI_DEFAULT_TYPE otherwise.
551 MPI_CURRENT_TYPE=decode_datatype(action[4]);
553 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
555 double clock = smpi_process_simulated_elapsed();
556 int rank = smpi_process_index();
557 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
558 extra->type = TRACING_ALLREDUCE;
559 extra->send_size = comm_size;
560 extra->comp_size = comp_size;
561 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
562 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): recvbuf is requested from the send buffer getter, whereas other collectives
// in this file use smpi_get_tmp_recvbuffer() for the receive side. In replay mode both
// pointers therefore come from the same shared buffer - confirm this is intended.
564 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
565 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
// The collective is called with MPI_OP_NULL; the cost of the operator is simulated
// separately by executing comp_size flops.
566 mpi_coll_allreduce_fun(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
567 smpi_execute_flops(comp_size);
569 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
570 log_timed_action (action, clock);
// Replay handler of the allToAll action (all-to-all exchange on MPI_COMM_WORLD).
//   action[2]: send count per peer, action[3]: receive count per peer,
//   action[4] and action[5]: optional send and receive datatype ids (used only when both are given)
573 static void action_allToAll(const char *const *action) {
574 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes)
575 //two optional (corresponding datatypes)
576 double clock = smpi_process_simulated_elapsed();
577 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// The counts are parsed as doubles and truncated to int.
578 int send_size = parse_double(action[2]);
579 int recv_size = parse_double(action[3]);
580 MPI_Datatype MPI_CURRENT_TYPE2;
582 if(action[4] && action[5]) {
583 MPI_CURRENT_TYPE=decode_datatype(action[4]);
584 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
587 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
588 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
// Each buffer holds one chunk per process of the communicator.
591 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE));
592 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
594 int rank = smpi_process_index();
595 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
596 extra->type = TRACING_ALLTOALL;
597 extra->send_size = send_size;
598 extra->recv_size = recv_size;
599 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
600 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
602 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
604 mpi_coll_alltoall_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
606 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
607 log_timed_action (action, clock);
// Replay handler of the gather action (gather on MPI_COMM_WORLD).
// Two mandatory arguments (send and receive counts) and up to three optional ones
// (root, send datatype id, receive datatype id).
610 static void action_gather(const char *const *action) {
611 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
615 1) 68 is the sendcounts
616 2) 68 is the recvcounts
617 3) 0 is the root node
618 4) 0 is the send datatype id, see decode_datatype()
619 5) 0 is the recv datatype id, see decode_datatype()
621 CHECK_ACTION_PARAMS(action, 2, 3)
622 double clock = smpi_process_simulated_elapsed();
623 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// The counts are parsed as doubles and truncated to int.
624 int send_size = parse_double(action[2]);
625 int recv_size = parse_double(action[3]);
626 MPI_Datatype MPI_CURRENT_TYPE2;
// NOTE(review): the guard tests action[4] and action[5] but the datatypes are decoded from
// action[5] and action[6]. With a line carrying only the root and one datatype id,
// action[6] is the terminating nullptr and decode_datatype() would call atoi() on it.
// The guard presumably should test action[5] and action[6] - confirm and fix.
627 if(action[4] && action[5]) {
628 MPI_CURRENT_TYPE=decode_datatype(action[5]);
629 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
631 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
632 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
634 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
635 void *recv = nullptr;
638 root=atoi(action[4]);
639 int rank = smpi_comm_rank(MPI_COMM_WORLD);
// The receive buffer holds one chunk per process of the communicator.
642 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
644 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
645 extra->type = TRACING_GATHER;
646 extra->send_size = send_size;
647 extra->recv_size = recv_size;
649 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
650 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
652 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
654 mpi_coll_gather_fun(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
// NOTE(review): the collective is opened with root but closed with -1 - confirm the
// tracing layer does not need matching values.
656 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
657 log_timed_action (action, clock);
// Replay handler of the gatherV action (gather with per-process receive counts on MPI_COMM_WORLD).
// The keyword registered in smpi_replay_run() is gatherV; the example line in the comment
// below spells it gather.
660 static void action_gatherv(const char *const *action) {
661 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
662 0 gather 68 68 10 10 10 0 0 0
665 1) 68 is the sendcount
666 2) 68 10 10 10 is the recvcounts
667 3) 0 is the root node
668 4) 0 is the send datatype id, see decode_datatype()
669 5) 0 is the recv datatype id, see decode_datatype()
672 double clock = smpi_process_simulated_elapsed();
673 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// Mandatory arguments: the send count plus one receive count per process.
// NOTE(review): the root at action[3+comm_size] is read unconditionally further down although
// it lies beyond these mandatory arguments, and root plus two datatype ids make three
// trailing fields while only two optional ones are declared - confirm the counts.
674 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
675 int send_size = parse_double(action[2]);
// disps stays zero filled: every contribution is received at displacement 0.
676 int *disps = xbt_new0(int, comm_size);
677 int *recvcounts = xbt_new0(int, comm_size);
680 MPI_Datatype MPI_CURRENT_TYPE2;
681 if(action[4+comm_size] && action[5+comm_size]) {
682 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
683 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
685 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
686 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
688 void *send = smpi_get_tmp_sendbuffer(send_size* smpi_datatype_size(MPI_CURRENT_TYPE));
689 void *recv = nullptr;
// Read the receive counts and accumulate their sum to size the receive buffer.
690 for(i=0;i<comm_size;i++) {
691 recvcounts[i] = atoi(action[i+3]);
692 recv_sum=recv_sum+recvcounts[i];
696 int root=atoi(action[3+comm_size]);
697 int rank = smpi_comm_rank(MPI_COMM_WORLD);
700 recv = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
702 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
703 extra->type = TRACING_GATHERV;
704 extra->send_size = send_size;
// The tracing record gets its own copy of the counts because recvcounts is freed below.
705 extra->recvcounts= xbt_new(int,comm_size);
706 for(i=0; i< comm_size; i++)//copy data to avoid bad free
707 extra->recvcounts[i] = recvcounts[i];
709 extra->num_processes = comm_size;
710 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
711 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
713 TRACE_smpi_collective_in(smpi_process_index(), root, __FUNCTION__, extra);
715 smpi_mpi_gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
717 TRACE_smpi_collective_out(smpi_process_index(), -1, __FUNCTION__);
718 log_timed_action (action, clock);
719 xbt_free(recvcounts);
// Replay handler of the reduceScatter action (reduce-scatter on MPI_COMM_WORLD followed by a computation).
723 static void action_reducescatter(const char *const *action) {
724 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
725 0 reduceScatter 275427 275427 275427 204020 11346849 0
728 1) The first four values after the name of the action declare the recvcounts array
729 2) The value 11346849 is the amount of instructions
730 3) The last value corresponds to the datatype, see decode_datatype().
732 We analyze a MPI_Reduce_scatter call to one MPI_Reduce and one MPI_Scatterv. */
733 double clock = smpi_process_simulated_elapsed();
734 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// Mandatory arguments: one receive count per process plus the computation amount;
// the datatype id is optional.
735 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
// The computation amount follows the receive counts; it is parsed as a double and truncated to int.
736 int comp_size = parse_double(action[2+comm_size]);
737 int *recvcounts = xbt_new0(int, comm_size);
738 int *disps = xbt_new0(int, comm_size);
740 int rank = smpi_process_index();
742 if(action[3+comm_size])
743 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
745 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
747 for(i=0;i<comm_size;i++) {
748 recvcounts[i] = atoi(action[i+2]);
753 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
754 extra->type = TRACING_REDUCE_SCATTER;
755 extra->send_size = 0;
// The tracing record gets its own copy of the counts because recvcounts is freed below.
756 extra->recvcounts= xbt_new(int, comm_size);
757 for(i=0; i< comm_size; i++)//copy data to avoid bad free
758 extra->recvcounts[i] = recvcounts[i];
759 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
760 extra->comp_size = comp_size;
761 extra->num_processes = comm_size;
763 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
765 void *sendbuf = smpi_get_tmp_sendbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
766 void *recvbuf = smpi_get_tmp_recvbuffer(size* smpi_datatype_size(MPI_CURRENT_TYPE));
// The collective is called with MPI_OP_NULL; the cost of the operator is simulated
// separately by executing comp_size flops.
768 mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
769 smpi_execute_flops(comp_size);
771 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
772 xbt_free(recvcounts);
774 log_timed_action (action, clock);
// Replay handler of the allGather action (all-gather on MPI_COMM_WORLD).
777 static void action_allgather(const char *const *action) {
778 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
779 0 allGather 275427 275427
782 1) 275427 is the sendcount
783 2) 275427 is the recvcount
784 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype(). */
785 double clock = smpi_process_simulated_elapsed();
// Two mandatory counts, then two optional datatype ids that are used only when both are given.
787 CHECK_ACTION_PARAMS(action, 2, 2)
788 int sendcount=atoi(action[2]);
789 int recvcount=atoi(action[3]);
791 MPI_Datatype MPI_CURRENT_TYPE2;
793 if(action[4] && action[5]) {
794 MPI_CURRENT_TYPE = decode_datatype(action[4]);
795 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
797 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
798 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
800 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// NOTE(review): the receive buffer is sized for recvcount elements only, while
// action_allToAll() and action_gather() multiply by the communicator size - confirm
// this is large enough for the contributions of all processes.
801 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* smpi_datatype_size(MPI_CURRENT_TYPE2));
803 int rank = smpi_process_index();
804 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
805 extra->type = TRACING_ALLGATHER;
806 extra->send_size = sendcount;
807 extra->recv_size= recvcount;
808 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
809 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
810 extra->num_processes = smpi_comm_size(MPI_COMM_WORLD);
812 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
814 mpi_coll_allgather_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
816 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
817 log_timed_action (action, clock);
// Replay handler of the allGatherV action (all-gather with per-process receive counts on MPI_COMM_WORLD).
820 static void action_allgatherv(const char *const *action) {
821 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
822 0 allGatherV 275427 275427 275427 275427 204020
825 1) 275427 is the sendcount
826 2) The next four elements declare the recvcounts array
827 3) No more values mean that the datatype for sent and receive buffer
828 is the default one, see decode_datatype(). */
829 double clock = smpi_process_simulated_elapsed();
831 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// Mandatory arguments: the send count plus one receive count per process;
// the two datatype ids are optional and used only when both are given.
832 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
834 int sendcount=atoi(action[2]);
835 int *recvcounts = xbt_new0(int, comm_size);
// disps stays zero filled: every contribution is received at displacement 0.
836 int *disps = xbt_new0(int, comm_size);
838 MPI_Datatype MPI_CURRENT_TYPE2;
840 if(action[3+comm_size] && action[4+comm_size]) {
841 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
842 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
844 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
845 MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
847 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* smpi_datatype_size(MPI_CURRENT_TYPE));
// Read the receive counts and accumulate their sum to size the receive buffer.
849 for(i=0;i<comm_size;i++) {
850 recvcounts[i] = atoi(action[i+3]);
851 recv_sum=recv_sum+recvcounts[i];
853 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* smpi_datatype_size(MPI_CURRENT_TYPE2));
855 int rank = smpi_process_index();
856 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
857 extra->type = TRACING_ALLGATHERV;
858 extra->send_size = sendcount;
// The tracing record gets its own copy of the counts because recvcounts is freed below.
859 extra->recvcounts= xbt_new(int, comm_size);
860 for(i=0; i< comm_size; i++)//copy data to avoid bad free
861 extra->recvcounts[i] = recvcounts[i];
862 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
863 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
864 extra->num_processes = comm_size;
866 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
868 mpi_coll_allgatherv_fun(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
871 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
872 log_timed_action (action, clock);
873 xbt_free(recvcounts);
// Replay handler of the allToAllV action (all-to-all with per-process counts on MPI_COMM_WORLD).
877 static void action_allToAllv(const char *const *action) {
878 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
879 0 allToAllV 100 1 7 10 12 100 1 70 10 5
882 1) 100 is the size of the send buffer *sizeof(int),
883 2) 1 7 10 12 is the sendcounts array
884 3) 100*sizeof(int) is the size of the receiver buffer
885 4) 1 70 10 5 is the recvcounts array */
886 double clock = smpi_process_simulated_elapsed();
888 int comm_size = smpi_comm_size(MPI_COMM_WORLD);
// Mandatory arguments: two buffer sizes plus one send count and one receive count per
// process; the two datatype ids are optional and used only when both are given.
889 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
890 int *sendcounts = xbt_new0(int, comm_size);
891 int *recvcounts = xbt_new0(int, comm_size);
// Both displacement arrays stay zero filled.
892 int *senddisps = xbt_new0(int, comm_size);
893 int *recvdisps = xbt_new0(int, comm_size);
895 MPI_Datatype MPI_CURRENT_TYPE2;
// The buffer sizes are parsed as doubles and truncated to int.
897 int send_buf_size=parse_double(action[2]);
898 int recv_buf_size=parse_double(action[3+comm_size]);
899 if(action[4+2*comm_size] && action[5+2*comm_size]) {
900 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
901 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
904 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
905 MPI_CURRENT_TYPE2=MPI_DEFAULT_TYPE;
908 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE));
909 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* smpi_datatype_size(MPI_CURRENT_TYPE2));
// Send counts follow the send buffer size; receive counts follow the receive buffer size.
911 for(int i=0;i<comm_size;i++) {
912 sendcounts[i] = atoi(action[i+3]);
913 recvcounts[i] = atoi(action[i+4+comm_size]);
916 int rank = smpi_process_index();
917 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
918 extra->type = TRACING_ALLTOALLV;
919 extra->recvcounts= xbt_new(int, comm_size);
920 extra->sendcounts= xbt_new(int, comm_size);
921 extra->num_processes = comm_size;
// send_size and recv_size start at 0 (xbt_new0) and accumulate the totals; the counts are
// copied because the local arrays are freed at the end of the handler.
923 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
924 extra->send_size += sendcounts[i];
925 extra->sendcounts[i] = sendcounts[i];
926 extra->recv_size += recvcounts[i];
927 extra->recvcounts[i] = recvcounts[i];
929 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
930 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
932 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): the receive datatype passed below is MPI_CURRENT_TYPE although
// MPI_CURRENT_TYPE2 was decoded for the receive side and used to size recvbuf -
// presumably MPI_CURRENT_TYPE2 was intended; confirm and fix.
934 mpi_coll_alltoallv_fun(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
935 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
937 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
938 log_timed_action (action, clock);
939 xbt_free(sendcounts);
940 xbt_free(recvcounts);
// Entry point of the trace replay for one simulated MPI process.
//   argc, argv: command line of the process, forwarded to smpi_process_init() and to
//               xbt_replay_action_runner(); (*argv)[2] is read as a delayed start amount
// Steps: initialise the process in replay mode, register the action handlers, run the
// trace, complete the requests still pending, then finalise and destroy the process.
945 void smpi_replay_run(int *argc, char***argv){
946 /* First initializes everything */
947 smpi_process_init(argc, argv);
948 smpi_process_mark_as_initialized();
949 smpi_process_set_replaying(true);
951 int rank = smpi_process_index();
952 TRACE_smpi_init(rank);
953 TRACE_smpi_computing_init(rank);
// Trace an empty collective named after this function with an _init suffix.
954 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
955 extra->type = TRACING_INIT;
956 char *operation =bprintf("%s_init",__FUNCTION__);
957 TRACE_smpi_collective_in(rank, -1, operation, extra);
958 TRACE_smpi_collective_out(rank, -1, operation);
// The handlers are registered only when _xbt_replay_action_init() returns 0.
// The keywords below are the action names expected in the trace files.
961 if (_xbt_replay_action_init()==0) {
962 xbt_replay_action_register("init", action_init);
963 xbt_replay_action_register("finalize", action_finalize);
964 xbt_replay_action_register("comm_size", action_comm_size);
965 xbt_replay_action_register("comm_split", action_comm_split);
966 xbt_replay_action_register("comm_dup", action_comm_dup);
967 xbt_replay_action_register("send", action_send);
968 xbt_replay_action_register("Isend", action_Isend);
969 xbt_replay_action_register("recv", action_recv);
970 xbt_replay_action_register("Irecv", action_Irecv);
971 xbt_replay_action_register("test", action_test);
972 xbt_replay_action_register("wait", action_wait);
973 xbt_replay_action_register("waitAll", action_waitall);
974 xbt_replay_action_register("barrier", action_barrier);
975 xbt_replay_action_register("bcast", action_bcast);
976 xbt_replay_action_register("reduce", action_reduce);
977 xbt_replay_action_register("allReduce", action_allReduce);
978 xbt_replay_action_register("allToAll", action_allToAll);
979 xbt_replay_action_register("allToAllV", action_allToAllv);
980 xbt_replay_action_register("gather", action_gather);
981 xbt_replay_action_register("gatherV", action_gatherv);
982 xbt_replay_action_register("allGather", action_allgather);
983 xbt_replay_action_register("allGatherV", action_allgatherv);
984 xbt_replay_action_register("reduceScatter", action_reducescatter);
985 xbt_replay_action_register("compute", action_compute);
988 //if we have a delayed start, sleep here.
// The delay is given as a number of flops in (*argv)[2] and simulated as a computation.
991 double value = strtod((*argv)[2], &endptr);
993 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
994 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
995 smpi_execute_flops(value);
997 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
998 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
999 smpi_execute_flops(0.0);
1002 /* Actually run the replay */
1003 xbt_replay_action_runner(*argc, *argv);
1005 /* and now, finalize everything */
1006 double sim_time= 1.;
1007 /* One active process will stop. Decrease the counter*/
1008 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Requests left in the list at the end of the trace are completed here.
1009 if (!get_reqq_self()->empty()){
1010 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): variable length arrays are a compiler extension in C++.
1011 MPI_Request requests[count_requests];
1012 MPI_Status status[count_requests];
1015 for (auto req: *get_reqq_self()){
1019 smpi_mpi_waitall(count_requests, requests, status);
// Global cleanup is done once, by the process that finds the counter at zero.
1025 if(active_processes==0){
1026 /* Last process alive speaking */
1027 /* end the simulated timer */
1028 sim_time = smpi_process_simulated_elapsed();
1029 XBT_INFO("Simulation time %f", sim_time);
1030 _xbt_replay_action_exit();
1031 xbt_free(sendbuffer);
1032 xbt_free(recvbuffer);
// Trace the finalisation as a collective named after this function with a _finalize suffix.
1035 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
1036 extra_fin->type = TRACING_FINALIZE;
// NOTE(review): operation is reassigned here - confirm the string built for the _init
// trace has been released before this point.
1037 operation =bprintf("%s_finalize",__FUNCTION__);
1038 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
1040 smpi_process_finalize();
1042 TRACE_smpi_collective_out(rank, -1, operation);
1043 TRACE_smpi_finalize(smpi_process_index());
1044 smpi_process_destroy();
1045 xbt_free(operation);