1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/smpi/private.h"
7 #include "src/smpi/smpi_coll.hpp"
8 #include "src/smpi/smpi_comm.hpp"
9 #include "src/smpi/smpi_datatype.hpp"
10 #include "src/smpi/smpi_group.hpp"
11 #include "src/smpi/smpi_process.hpp"
12 #include "src/smpi/smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
// Evaluates to sizeof(int) * 2 + 1; no use of it is visible in this section.
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Logging category used by the XBT_VERB/XBT_DEBUG/XBT_INFO calls of this file.
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Last value given to the "comm_size" action (written by action_comm_size).
22 int communicator_size = 0;
// Set to smpi_process_count() by action_init; smpi_replay_run compares it to 0 to detect the last process.
23 static int active_processes = 0;
// Pending requests of each replayed process, keyed by smpi_process()->index().
// The vectors are allocated in action_init and deleted in smpi_replay_run.
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line does not specify one (chosen in action_init).
26 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype selected for the action currently being replayed (also written by decode_datatype).
27 MPI_Datatype MPI_CURRENT_TYPE;
// Temporary send buffer handed out by smpi_get_tmp_sendbuffer() in replay mode, and the size it is compared against.
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
// Temporary receive buffer handed out by smpi_get_tmp_recvbuffer() in replay mode, and the size it is compared against.
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
// Log, at verbose level, the fields of `action` joined with spaces followed by the simulated time
// elapsed since `clock`. The joined string is only built when verbose logging is enabled.
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the request vector registered for the calling process.
// unordered_map::at() throws std::out_of_range when no vector was registered for this index.
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(smpi_process()->index());
// Register `mpi_request` as the request vector of the calling process.
// unordered_map::insert() keeps an already existing entry for the same index unchanged.
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({smpi_process()->index(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
// Outside replay mode each call returns a fresh xbt_malloc'ed block of `size` bytes.
// In replay mode the shared `sendbuffer` is reallocated when the recorded size is smaller than `size`
// (presumably the shared buffer is then returned - confirm).
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (!smpi_process()->replaying())
56 return xbt_malloc(size);
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
// Outside replay mode each call returns a fresh xbt_malloc'ed block of `size` bytes.
// In replay mode the shared `recvbuffer` is reallocated when the recorded size is smaller than `size`
// (presumably the shared buffer is then returned - confirm).
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (!smpi_process()->replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Counterpart of smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer().
// Only acts when the process is not replaying; presumably it then frees `buf`, since the
// replay-mode buffers are shared and released in smpi_replay_run - confirm.
75 void smpi_free_tmp_buffer(void* buf){
76 if (!smpi_process()->replaying())
// Convert `string` to a double with strtod().
// Throws unknown_error ("... is not a double"); presumably when strtod() leaves unparsed characters - confirm.
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
// Translate the numeric datatype id found in a trace line (parsed with atoi) into an MPI datatype.
// The result is stored in the global MPI_CURRENT_TYPE and also returned.
// Presumably ids without a dedicated case map to MPI_DEFAULT_TYPE - confirm against the case labels.
90 static MPI_Datatype decode_datatype(const char *const action)
92 switch(atoi(action)) {
94 MPI_CURRENT_TYPE=MPI_DOUBLE;
97 MPI_CURRENT_TYPE=MPI_INT;
100 MPI_CURRENT_TYPE=MPI_CHAR;
103 MPI_CURRENT_TYPE=MPI_SHORT;
106 MPI_CURRENT_TYPE=MPI_LONG;
109 MPI_CURRENT_TYPE=MPI_FLOAT;
112 MPI_CURRENT_TYPE=MPI_BYTE;
115 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
117 return MPI_CURRENT_TYPE;
// Return the textual id written to traces for `datatype`; the handled types are MPI_BYTE, MPI_DOUBLE,
// MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG and MPI_FLOAT.
// `known` is an optional output (the callers in this file pass nullptr); presumably it reports whether
// the datatype is one of the handled types - confirm.
120 const char* encode_datatype(MPI_Datatype datatype, int* known)
122 //default type for output is set to MPI_BYTE
123 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
126 if (datatype==MPI_BYTE)
128 if(datatype==MPI_DOUBLE)
130 if(datatype==MPI_INT)
132 if(datatype==MPI_CHAR)
134 if(datatype==MPI_SHORT)
136 if(datatype==MPI_LONG)
138 if(datatype==MPI_FLOAT)
140 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
143 // default - not implemented.
144 // do not warn here as we pass in this function even for other trace formats
// Argument-count check used at the top of the action handlers.
// Walks the nullptr-terminated `action` array and throws arg_error, naming the calling function,
// when the number of items does not fit: the first two items are the process id and the action name,
// followed by `mandatory` required and up to `optional` optional arguments
// (the exact comparison should be confirmed against the macro body).
148 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
150 while(action[i]!=nullptr)\
153 THROWF(arg_error, 0, "%s replay failed.\n" \
154 "%d items were given on the line. First two should be process_id and action. " \
155 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
156 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for "init" (no mandatory argument, one optional).
// Chooses the default datatype, starts the simulated timer, records the number of processes and
// allocates the request vector of the calling process.
162 static void action_init(const char *const *action)
164 XBT_DEBUG("Initialize the counters");
165 CHECK_ACTION_PARAMS(action, 0, 1)
// Presumably the optional argument selects between the two defaults below - confirm.
167 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
168 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
170 /* start a simulated timer */
171 smpi_process()->simulated_start();
172 /*initialize the number of active processes */
173 active_processes = smpi_process_count();
// This vector is released with delete in smpi_replay_run.
175 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler registered for the "finalize" action.
178 static void action_finalize(const char *const *action)
// Replay handler for "comm_size": stores action[2] (parsed as a double, converted to int) in communicator_size.
183 static void action_comm_size(const char *const *action)
185 communicator_size = parse_double(action[2]);
// The reference clock is sampled in the call itself, so the logged duration is essentially zero.
186 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_split": the visible code only logs the action.
189 static void action_comm_split(const char *const *action)
// The reference clock is sampled in the call itself, so the logged duration is essentially zero.
191 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_dup": the visible code only logs the action.
194 static void action_comm_dup(const char *const *action)
// The reference clock is sampled in the call itself, so the logged duration is essentially zero.
196 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "compute": executes action[2] flops (one mandatory argument),
// bracketed by computing-in/out trace events, then logs the elapsed simulated time.
199 static void action_compute(const char *const *action)
201 CHECK_ACTION_PARAMS(action, 1, 0)
202 double clock = smpi_process()->simulated_elapsed();
203 double flops= parse_double(action[2]);
204 int rank = smpi_process()->index();
// Extra data attached to the trace event: event type and amount of computation.
205 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
206 extra->type=TRACING_COMPUTING;
207 extra->comp_size=flops;
208 TRACE_smpi_computing_in(rank, extra);
210 smpi_execute_flops(flops);
212 TRACE_smpi_computing_out(rank);
213 log_timed_action (action, clock);
// Replay handler for "send": blocking send of action[3] elements to process action[2]
// on MPI_COMM_WORLD with tag 0. An optional action[4] gives the datatype id.
216 static void action_send(const char *const *action)
218 CHECK_ACTION_PARAMS(action, 2, 1)
219 int to = atoi(action[2]);
220 double size=parse_double(action[3]);
221 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the default one (presumably depending on whether action[4] is present).
224 MPI_CURRENT_TYPE=decode_datatype(action[4]);
226 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
228 int rank = smpi_process()->index();
230 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
231 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
232 extra->type = TRACING_SEND;
233 extra->send_size = size;
235 extra->dst = dst_traced;
236 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
237 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is emitted here only when internals are not traced.
238 if (!TRACE_smpi_view_internals())
239 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
// No payload buffer is supplied (nullptr).
241 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
243 log_timed_action (action, clock);
245 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Replay handler for "Isend": non-blocking send of action[3] elements to process action[2]
// on MPI_COMM_WORLD with tag 0. An optional action[4] gives the datatype id.
// The resulting request is appended to the calling process's request vector.
248 static void action_Isend(const char *const *action)
250 CHECK_ACTION_PARAMS(action, 2, 1)
251 int to = atoi(action[2]);
252 double size=parse_double(action[3]);
253 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the default one (presumably depending on whether action[4] is present).
256 MPI_CURRENT_TYPE=decode_datatype(action[4]);
258 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
260 int rank = smpi_process()->index();
261 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
262 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
263 extra->type = TRACING_ISEND;
264 extra->send_size = size;
266 extra->dst = dst_traced;
267 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
268 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is emitted here only when internals are not traced.
269 if (!TRACE_smpi_view_internals())
270 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
272 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
274 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Kept so that a following test/wait/waitAll action can pick it up.
276 get_reqq_self()->push_back(request);
278 log_timed_action (action, clock);
// Replay handler for "recv": blocking receive of action[3] elements from process action[2]
// on MPI_COMM_WORLD with tag 0. An optional action[4] gives the datatype id.
281 static void action_recv(const char *const *action) {
282 CHECK_ACTION_PARAMS(action, 2, 1)
283 int from = atoi(action[2]);
284 double size=parse_double(action[3]);
285 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the default one (presumably depending on whether action[4] is present).
289 MPI_CURRENT_TYPE=decode_datatype(action[4]);
291 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
293 int rank = smpi_process()->index();
294 int src_traced = MPI_COMM_WORLD->group()->rank(from);
296 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
297 extra->type = TRACING_RECV;
298 extra->send_size = size;
299 extra->src = src_traced;
301 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
302 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
304 //unknown size from the receiver point of view
// Presumably the probe is used to learn the message size when the trace does not give one - confirm.
306 Request::probe(from, 0, MPI_COMM_WORLD, &status);
// No payload buffer is supplied (nullptr).
310 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
312 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// The receive event is emitted here only when internals are not traced.
313 if (!TRACE_smpi_view_internals()) {
314 TRACE_smpi_recv(rank, src_traced, rank, 0);
317 log_timed_action (action, clock);
// Replay handler for "Irecv": non-blocking receive of action[3] elements from process action[2]
// on MPI_COMM_WORLD with tag 0. An optional action[4] gives the datatype id.
// The resulting request is appended to the calling process's request vector.
320 static void action_Irecv(const char *const *action)
322 CHECK_ACTION_PARAMS(action, 2, 1)
323 int from = atoi(action[2]);
324 double size=parse_double(action[3]);
325 double clock = smpi_process()->simulated_elapsed();
// Datatype: decoded from action[4], otherwise the default one (presumably depending on whether action[4] is present).
328 MPI_CURRENT_TYPE=decode_datatype(action[4]);
330 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
332 int rank = smpi_process()->index();
333 int src_traced = MPI_COMM_WORLD->group()->rank(from);
334 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
335 extra->type = TRACING_IRECV;
336 extra->send_size = size;
337 extra->src = src_traced;
339 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
340 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
342 //unknown size from the receiver's point of view
// Presumably the probe is used to learn the message size when the trace does not give one - confirm.
344 Request::probe(from, 0, MPI_COMM_WORLD, &status);
348 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
350 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// Kept so that a following test/wait/waitAll action can pick it up.
351 get_reqq_self()->push_back(request);
353 log_timed_action (action, clock);
// Replay handler for "test" (no argument): tests the most recently queued request of the calling process.
356 static void action_test(const char *const *action){
357 CHECK_ACTION_PARAMS(action, 0, 0)
358 double clock = smpi_process()->simulated_elapsed();
// Take the last request off the vector; it is pushed back below after the test.
361 MPI_Request request = get_reqq_self()->back();
362 get_reqq_self()->pop_back();
363 //if request is null here, this may mean that a previous test has succeeded
364 //Different times in traced application and replayed version may lead to this
365 //In this case, ignore the extra calls.
366 if(request!=nullptr){
367 int rank = smpi_process()->index();
368 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
369 extra->type=TRACING_TEST;
370 TRACE_smpi_testing_in(rank, extra);
372 int flag = Request::test(&request, &status);
374 XBT_DEBUG("MPI_Test result: %d", flag);
375 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
376 get_reqq_self()->push_back(request);
378 TRACE_smpi_testing_out(rank);
380 log_timed_action (action, clock);
// Replay handler for "wait" (no argument): waits for the most recently queued request of the calling process.
// Asserts that at least one request was queued by a previous Isend/Irecv.
383 static void action_wait(const char *const *action){
384 CHECK_ACTION_PARAMS(action, 0, 0)
385 double clock = smpi_process()->simulated_elapsed();
388 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
389 xbt_str_join_array(action," "));
390 MPI_Request request = get_reqq_self()->back();
391 get_reqq_self()->pop_back();
393 if (request==nullptr){
394 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// NOTE(review): the rank computation guards against MPI_COMM_NULL, but request->comm() is
// dereferenced without that guard on the following line - confirm a null communicator cannot reach here.
398 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
400 MPI_Group group = request->comm()->group();
401 int src_traced = group->rank(request->src());
402 int dst_traced = group->rank(request->dst());
// Read before the wait and used afterwards to decide whether a receive event must be traced.
403 int is_wait_for_receive = (request->flags() & RECV);
404 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
405 extra->type = TRACING_WAIT;
406 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
408 Request::wait(&request, &status);
410 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
411 if (is_wait_for_receive)
412 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
413 log_timed_action (action, clock);
// Replay handler for "waitAll" (no argument): waits for every request queued by the calling process.
// Does nothing but logging when no request is queued.
416 static void action_waitall(const char *const *action){
417 CHECK_ACTION_PARAMS(action, 0, 0)
418 double clock = smpi_process()->simulated_elapsed();
419 unsigned int count_requests=get_reqq_self()->size();
421 if (count_requests>0) {
// Variable-length arrays sized by the number of queued requests.
422 MPI_Status status[count_requests];
424 int rank_traced = smpi_process()->index();
425 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
426 extra->type = TRACING_WAITALL;
427 extra->send_size=count_requests;
428 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
// Source and destination of each receive request are recorded before waiting, so that the
// receive events can be traced after the wait.
429 int recvs_snd[count_requests];
430 int recvs_rcv[count_requests];
432 for (auto req : *(get_reqq_self())){
433 if (req && (req->flags () & RECV)){
434 recvs_snd[i]=req->src();
435 recvs_rcv[i]=req->dst();
// The wait operates directly on the storage of the request vector.
440 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 is the sentinel for entries that are not receives (presumably assigned in the loop above - confirm).
442 for (i=0; i<count_requests;i++){
443 if (recvs_snd[i]!=-100)
444 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
446 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
448 log_timed_action (action, clock);
// Replay handler for "barrier": runs a barrier on MPI_COMM_WORLD between collective-in/out trace events.
451 static void action_barrier(const char *const *action){
452 double clock = smpi_process()->simulated_elapsed();
453 int rank = smpi_process()->index();
454 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
455 extra->type = TRACING_BARRIER;
456 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
458 Colls::barrier(MPI_COMM_WORLD);
460 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
461 log_timed_action (action, clock);
// Replay handler for "bcast": broadcasts action[2] elements on MPI_COMM_WORLD.
// Optional arguments: action[3] is the root, action[4] the datatype id.
464 static void action_bcast(const char *const *action)
466 CHECK_ACTION_PARAMS(action, 1, 2)
467 double size = parse_double(action[2]);
468 double clock = smpi_process()->simulated_elapsed();
470 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
471 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
474 root= atoi(action[3]);
476 MPI_CURRENT_TYPE=decode_datatype(action[4]);
479 int rank = smpi_process()->index();
// NOTE(review): this uses group()->index(root) whereas action_reduce uses group()->rank(root)
// for the same purpose - confirm which lookup is intended.
480 int root_traced = MPI_COMM_WORLD->group()->index(root);
482 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
483 extra->type = TRACING_BCAST;
484 extra->send_size = size;
485 extra->root = root_traced;
486 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
487 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
488 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
490 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
492 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
493 log_timed_action (action, clock);
// Replay handler for "reduce": reduces action[2] elements on MPI_COMM_WORLD, then executes action[3] flops.
// Optional arguments: action[4] is the root, action[5] the datatype id.
496 static void action_reduce(const char *const *action)
498 CHECK_ACTION_PARAMS(action, 2, 2)
499 double comm_size = parse_double(action[2]);
500 double comp_size = parse_double(action[3]);
501 double clock = smpi_process()->simulated_elapsed();
503 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
506 root= atoi(action[4]);
508 MPI_CURRENT_TYPE=decode_datatype(action[5]);
511 int rank = smpi_process()->index();
512 int root_traced = MPI_COMM_WORLD->group()->rank(root);
513 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
514 extra->type = TRACING_REDUCE;
515 extra->send_size = comm_size;
516 extra->comp_size = comp_size;
517 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
518 extra->root = root_traced;
520 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): both buffers are obtained from smpi_get_tmp_sendbuffer(); in replay mode that helper
// manages a single shared buffer, so the two pointers presumably alias - confirm this is intended.
522 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
523 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
// MPI_OP_NULL is passed as the reduction operation; the computation cost is simulated separately below.
524 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
525 smpi_execute_flops(comp_size);
527 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
528 log_timed_action (action, clock);
// Replay handler for "allReduce": all-reduces action[2] elements on MPI_COMM_WORLD, then executes action[3] flops.
// An optional action[4] gives the datatype id.
531 static void action_allReduce(const char *const *action) {
532 CHECK_ACTION_PARAMS(action, 2, 1)
533 double comm_size = parse_double(action[2]);
534 double comp_size = parse_double(action[3]);
// Datatype: decoded from action[4], otherwise the default one (presumably depending on whether action[4] is present).
537 MPI_CURRENT_TYPE=decode_datatype(action[4]);
539 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
541 double clock = smpi_process()->simulated_elapsed();
542 int rank = smpi_process()->index();
543 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
544 extra->type = TRACING_ALLREDUCE;
545 extra->send_size = comm_size;
546 extra->comp_size = comp_size;
547 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
548 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): both buffers are obtained from smpi_get_tmp_sendbuffer(); in replay mode that helper
// manages a single shared buffer, so the two pointers presumably alias - confirm this is intended.
550 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
551 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
// MPI_OP_NULL is passed as the reduction operation; the computation cost is simulated separately below.
552 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
553 smpi_execute_flops(comp_size);
555 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
556 log_timed_action (action, clock);
// Replay handler for "allToAll": all-to-all exchange on MPI_COMM_WORLD with action[2] elements sent to
// and action[3] elements received from each process. Datatype ids in action[4] and action[5] are only
// honoured when both are present.
559 static void action_allToAll(const char *const *action) {
560 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
561 double clock = smpi_process()->simulated_elapsed();
562 int comm_size = MPI_COMM_WORLD->size();
563 int send_size = parse_double(action[2]);
564 int recv_size = parse_double(action[3]);
565 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
567 if(action[4] && action[5]) {
568 MPI_CURRENT_TYPE=decode_datatype(action[4]);
569 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
572 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
// Buffers hold one chunk per process of the communicator.
574 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
575 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
577 int rank = smpi_process()->index();
578 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
579 extra->type = TRACING_ALLTOALL;
580 extra->send_size = send_size;
581 extra->recv_size = recv_size;
582 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
583 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
585 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
587 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
589 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
590 log_timed_action (action, clock);
// Replay handler for "gather": gathers action[2] elements from every process of MPI_COMM_WORLD on the root.
// Two mandatory arguments (send and receive counts) and three optional ones (root, send and receive datatype ids).
593 static void action_gather(const char *const *action) {
594 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
597 1) 68 is the sendcounts
598 2) 68 is the recvcounts
599 3) 0 is the root node
600 4) 0 is the send datatype id, see decode_datatype()
601 5) 0 is the recv datatype id, see decode_datatype()
603 CHECK_ACTION_PARAMS(action, 2, 3)
604 double clock = smpi_process()->simulated_elapsed();
605 int comm_size = MPI_COMM_WORLD->size();
606 int send_size = parse_double(action[2]);
607 int recv_size = parse_double(action[3]);
608 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
// NOTE(review): the guard tests action[4] and action[5], but the datatype ids are read from action[5]
// and action[6]; with exactly four arguments action[6] would be nullptr - confirm the guard should
// test action[5] && action[6].
609 if(action[4] && action[5]) {
610 MPI_CURRENT_TYPE=decode_datatype(action[5]);
611 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
613 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
615 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
616 void *recv = nullptr;
619 root=atoi(action[4]);
620 int rank = MPI_COMM_WORLD->rank();
// Receive buffer sized for one chunk per process (presumably allocated on the root only - confirm).
623 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
625 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
626 extra->type = TRACING_GATHER;
627 extra->send_size = send_size;
628 extra->recv_size = recv_size;
630 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
631 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
633 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
635 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
// NOTE(review): the closing trace event passes -1 where the opening one passes root - confirm.
637 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
638 log_timed_action (action, clock);
// Replay handler for "gatherV": like gather, but with one receive count per process.
// Argument layout used below: action[2] is the send count, action[3 .. 2+comm_size] the receive counts,
// action[3+comm_size] the root, action[4+comm_size] and action[5+comm_size] the datatype ids.
// NOTE(review): the example in the comment below spells the action "gather" although this handler is
// registered under the name "gatherV".
641 static void action_gatherv(const char *const *action) {
642 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
643 0 gather 68 68 10 10 10 0 0 0
645 1) 68 is the sendcount
646 2) 68 10 10 10 is the recvcounts
647 3) 0 is the root node
648 4) 0 is the send datatype id, see decode_datatype()
649 5) 0 is the recv datatype id, see decode_datatype()
651 double clock = smpi_process()->simulated_elapsed();
652 int comm_size = MPI_COMM_WORLD->size();
653 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
654 int send_size = parse_double(action[2]);
// Variable-length arrays sized by the communicator size.
655 int disps[comm_size];
656 int recvcounts[comm_size];
659 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
660 if(action[4+comm_size] && action[5+comm_size]) {
661 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
662 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
664 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
666 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
667 void *recv = nullptr;
// Read the per-process receive counts and accumulate their sum to size the receive buffer.
668 for(int i=0;i<comm_size;i++) {
669 recvcounts[i] = atoi(action[i+3]);
670 recv_sum=recv_sum+recvcounts[i];
674 int root=atoi(action[3+comm_size]);
675 int rank = MPI_COMM_WORLD->rank();
678 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
680 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
681 extra->type = TRACING_GATHERV;
682 extra->send_size = send_size;
683 extra->recvcounts= xbt_new(int,comm_size);
684 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
685 extra->recvcounts[i] = recvcounts[i];
687 extra->num_processes = comm_size;
688 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
689 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
691 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
693 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
// NOTE(review): the closing trace event passes -1 where the opening one passes root - confirm.
695 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
696 log_timed_action (action, clock);
// Replay handler for "reduceScatter": reduce-scatter on MPI_COMM_WORLD followed by a simulated computation.
// Argument layout used below: action[2 .. 1+comm_size] are the receive counts, action[2+comm_size] the
// amount of computation, and the optional action[3+comm_size] the datatype id.
699 static void action_reducescatter(const char *const *action) {
700 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
701 0 reduceScatter 275427 275427 275427 204020 11346849 0
703 1) The first four values after the name of the action declare the recvcounts array
704 2) The value 11346849 is the amount of instructions
705 3) The last value corresponds to the datatype, see decode_datatype().
707 double clock = smpi_process()->simulated_elapsed();
708 int comm_size = MPI_COMM_WORLD->size();
709 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
710 int comp_size = parse_double(action[2+comm_size]);
// Variable-length array sized by the communicator size.
711 int recvcounts[comm_size];
712 int rank = smpi_process()->index();
714 if(action[3+comm_size])
715 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
717 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
719 for(int i=0;i<comm_size;i++) {
720 recvcounts[i] = atoi(action[i+2]);
724 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
725 extra->type = TRACING_REDUCE_SCATTER;
726 extra->send_size = 0;
727 extra->recvcounts= xbt_new(int, comm_size);
728 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
729 extra->recvcounts[i] = recvcounts[i];
730 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
731 extra->comp_size = comp_size;
732 extra->num_processes = comm_size;
734 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// `size` is presumably the sum of the receive counts accumulated above - confirm.
736 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
737 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
// MPI_OP_NULL is passed as the reduction operation; the computation cost is simulated separately below.
739 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
740 smpi_execute_flops(comp_size);
742 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
743 log_timed_action (action, clock);
// Replay handler for "allGather": all-gather on MPI_COMM_WORLD with send count action[2] and receive
// count action[3]. Datatype ids in action[4] and action[5] are only honoured when both are present.
746 static void action_allgather(const char *const *action) {
747 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
748 0 allGather 275427 275427
750 1) 275427 is the sendcount
751 2) 275427 is the recvcount
752 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
754 double clock = smpi_process()->simulated_elapsed();
756 CHECK_ACTION_PARAMS(action, 2, 2)
757 int sendcount=atoi(action[2]);
758 int recvcount=atoi(action[3]);
760 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
762 if(action[4] && action[5]) {
763 MPI_CURRENT_TYPE = decode_datatype(action[4]);
764 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
766 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
768 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// NOTE(review): the receive buffer is sized from recvcount alone, whereas action_allToAll and
// action_gather multiply the per-process count by the communicator size - confirm this is sufficient.
769 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
771 int rank = smpi_process()->index();
772 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
773 extra->type = TRACING_ALLGATHER;
774 extra->send_size = sendcount;
775 extra->recv_size= recvcount;
776 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
777 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
778 extra->num_processes = MPI_COMM_WORLD->size();
780 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
782 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
784 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
785 log_timed_action (action, clock);
// Replay handler for "allGatherV": all-gather with one receive count per process.
// Argument layout used below: action[2] is the send count, action[3 .. 2+comm_size] the receive counts,
// action[3+comm_size] and action[4+comm_size] the optional datatype ids (honoured only when both are present).
788 static void action_allgatherv(const char *const *action) {
789 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
790 0 allGatherV 275427 275427 275427 275427 204020
792 1) 275427 is the sendcount
793 2) The next four elements declare the recvcounts array
794 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
796 double clock = smpi_process()->simulated_elapsed();
798 int comm_size = MPI_COMM_WORLD->size();
799 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
800 int sendcount=atoi(action[2]);
// Variable-length arrays sized by the communicator size.
801 int recvcounts[comm_size];
802 int disps[comm_size];
804 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
806 if(action[3+comm_size] && action[4+comm_size]) {
807 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
808 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
810 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
812 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// Read the per-process receive counts and accumulate their sum to size the receive buffer.
814 for(int i=0;i<comm_size;i++) {
815 recvcounts[i] = atoi(action[i+3]);
816 recv_sum=recv_sum+recvcounts[i];
819 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
821 int rank = smpi_process()->index();
822 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
823 extra->type = TRACING_ALLGATHERV;
824 extra->send_size = sendcount;
825 extra->recvcounts= xbt_new(int, comm_size);
826 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
827 extra->recvcounts[i] = recvcounts[i];
828 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
829 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
830 extra->num_processes = comm_size;
832 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
834 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
837 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
838 log_timed_action (action, clock);
// Replay handler for "allToAllV": all-to-all exchange with per-process send and receive counts.
// Argument layout used below: action[2] is the send buffer size, action[3 .. 2+comm_size] the send counts,
// action[3+comm_size] the receive buffer size, action[4+comm_size .. 3+2*comm_size] the receive counts,
// action[4+2*comm_size] and action[5+2*comm_size] the optional datatype ids (honoured only when both are present).
841 static void action_allToAllv(const char *const *action) {
842 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
843 0 allToAllV 100 1 7 10 12 100 1 70 10 5
845 1) 100 is the size of the send buffer *sizeof(int),
846 2) 1 7 10 12 is the sendcounts array
847 3) 100*sizeof(int) is the size of the receiver buffer
848 4) 1 70 10 5 is the recvcounts array
850 double clock = smpi_process()->simulated_elapsed();
852 int comm_size = MPI_COMM_WORLD->size();
853 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
// Variable-length arrays sized by the communicator size.
854 int sendcounts[comm_size];
855 int recvcounts[comm_size];
856 int senddisps[comm_size];
857 int recvdisps[comm_size];
859 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
861 int send_buf_size=parse_double(action[2]);
862 int recv_buf_size=parse_double(action[3+comm_size]);
863 if(action[4+2*comm_size] && action[5+2*comm_size]) {
864 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
865 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
868 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
870 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
871 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
873 for(int i=0;i<comm_size;i++) {
874 sendcounts[i] = atoi(action[i+3]);
875 recvcounts[i] = atoi(action[i+4+comm_size]);
880 int rank = smpi_process()->index();
881 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
882 extra->type = TRACING_ALLTOALLV;
883 extra->recvcounts= xbt_new(int, comm_size);
884 extra->sendcounts= xbt_new(int, comm_size);
885 extra->num_processes = comm_size;
// send_size and recv_size start at zero (xbt_new0) and accumulate the total counts.
887 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
888 extra->send_size += sendcounts[i];
889 extra->sendcounts[i] = sendcounts[i];
890 extra->recv_size += recvcounts[i];
891 extra->recvcounts[i] = recvcounts[i];
893 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
894 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
896 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): MPI_CURRENT_TYPE is passed as the receive datatype, although MPI_CURRENT_TYPE2 is the
// one decoded for the receive side and used to size recvbuf - confirm whether MPI_CURRENT_TYPE2 was meant.
898 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
899 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
901 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
902 log_timed_action (action, clock);
905 }} // namespace simgrid::smpi
// Entry point of the trace replay for one process.
// Initializes the SMPI process in replay mode, registers the handler of every supported action name,
// optionally delays the start, runs the replay, waits for the requests still pending, releases the
// per-process request vector and finalizes the process.
907 void smpi_replay_run(int *argc, char***argv){
908 /* First initializes everything */
909 simgrid::smpi::Process::init(argc, argv);
910 smpi_process()->mark_as_initialized();
911 smpi_process()->set_replaying(true);
913 int rank = smpi_process()->index();
914 TRACE_smpi_init(rank);
915 TRACE_smpi_computing_init(rank);
916 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
917 extra->type = TRACING_INIT;
// bprintf() returns a newly allocated string; its release is not visible here - confirm it is freed.
918 char *operation =bprintf("%s_init",__FUNCTION__);
919 TRACE_smpi_collective_in(rank, -1, operation, extra);
920 TRACE_smpi_collective_out(rank, -1, operation);
// Map each action name appearing in the trace to its handler.
922 xbt_replay_action_register("init", simgrid::smpi::action_init);
923 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
924 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
925 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
926 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
927 xbt_replay_action_register("send", simgrid::smpi::action_send);
928 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
929 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
930 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
931 xbt_replay_action_register("test", simgrid::smpi::action_test);
932 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
933 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
934 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
935 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
936 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
937 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
938 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
939 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
940 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
941 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
942 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
943 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
944 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
945 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
947 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and executed as that many flops
// (presumably only when that argument is supplied - confirm).
950 double value = strtod((*argv)[2], &endptr);
952 THROWF(unknown_error, 0, "%s is not a double", (*argv)[2]);
953 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
954 smpi_execute_flops(value);
956 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
957 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
958 smpi_execute_flops(0.0);
961 /* Actually run the replay */
962 simgrid::xbt::replay_runner(*argc, *argv);
964 /* and now, finalize everything */
965 /* One active process will stop. Decrease the counter*/
966 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
// Requests left in the vector at the end of the trace are copied into a local array and waited for.
967 if (!get_reqq_self()->empty()){
968 unsigned int count_requests=get_reqq_self()->size();
// Variable-length arrays sized by the number of remaining requests.
969 MPI_Request requests[count_requests];
970 MPI_Status status[count_requests];
973 for (auto req: *get_reqq_self()){
977 simgrid::smpi::Request::waitall(count_requests, requests, status);
// Releases the vector allocated in action_init.
979 delete get_reqq_self();
982 if(active_processes==0){
983 /* Last process alive speaking: end the simulated timer */
984 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
// The shared temporary buffers are released by the last process only.
985 xbt_free(sendbuffer);
986 xbt_free(recvbuffer);
989 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
990 extra_fin->type = TRACING_FINALIZE;
991 operation =bprintf("%s_finalize",__FUNCTION__);
992 TRACE_smpi_collective_in(rank, -1, operation, extra_fin);
994 smpi_process()->finalize();
996 TRACE_smpi_collective_out(rank, -1, operation);
997 TRACE_smpi_finalize(smpi_process()->index());