/* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <unordered_map>

#define KEY_SIZE (sizeof(int) * 2 + 1)

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
int communicator_size = 0;
static int active_processes = 0;
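/* Each replaying process keeps the queue of its in-flight requests here, indexed by its
 * process index (see get_reqq_self() below). Isend/Irecv push into it; test/wait/waitAll pop. */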
std::unordered_map<int, std::vector<MPI_Request>*> reqq;

MPI_Datatype MPI_DEFAULT_TYPE;
MPI_Datatype MPI_CURRENT_TYPE;

static int sendbuffer_size = 0;
char* sendbuffer         = nullptr;
static int recvbuffer_size = 0;
char* recvbuffer         = nullptr;
static void log_timed_action(const char* const* action, double clock)
{
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)) {
    char* name = xbt_str_join_array(action, " ");
    XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed() - clock);
    xbt_free(name);
  }
}
static std::vector<MPI_Request>* get_reqq_self()
{
  return reqq.at(smpi_process()->index());
}

static void set_reqq_self(std::vector<MPI_Request>* mpi_request)
{
  reqq.insert({smpi_process()->index(), mpi_request});
}
// allocate a single buffer for all sends, growing it if needed
void* smpi_get_tmp_sendbuffer(int size)
{
  if (not smpi_process()->replaying())
    return xbt_malloc(size);
  if (sendbuffer_size < size) {
    sendbuffer      = static_cast<char*>(xbt_realloc(sendbuffer, size));
    sendbuffer_size = size;
  }
  return sendbuffer;
}
// allocate a single buffer for all receives, growing it if needed
void* smpi_get_tmp_recvbuffer(int size)
{
  if (not smpi_process()->replaying())
    return xbt_malloc(size);
  if (recvbuffer_size < size) {
    recvbuffer      = static_cast<char*>(xbt_realloc(recvbuffer, size));
    recvbuffer_size = size;
  }
  return recvbuffer;
}
void smpi_free_tmp_buffer(void* buf)
{
  if (not smpi_process()->replaying())
    xbt_free(buf);
}
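/* Helper: parse one trace field as a double; the replay aborts on malformed input. */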
static double parse_double(const char* string)
{
  char* endptr;
  double value = strtod(string, &endptr);
  if (*endptr != '\0')
    THROWF(unknown_error, 0, "%s is not a double", string);
  return value;
}
// TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
static MPI_Datatype decode_datatype(const char* const action)
{
  switch (atoi(action)) {
    case 0: MPI_CURRENT_TYPE = MPI_DOUBLE; break;
    case 1: MPI_CURRENT_TYPE = MPI_INT;    break;
    case 2: MPI_CURRENT_TYPE = MPI_CHAR;   break;
    case 3: MPI_CURRENT_TYPE = MPI_SHORT;  break;
    case 4: MPI_CURRENT_TYPE = MPI_LONG;   break;
    case 5: MPI_CURRENT_TYPE = MPI_FLOAT;  break;
    case 6: MPI_CURRENT_TYPE = MPI_BYTE;   break;
    default: MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE; break;
  }
  return MPI_CURRENT_TYPE;
}
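// e.g. decode_datatype("1") selects MPI_INT, while an unknown id falls back to MPI_DEFAULT_TYPE.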
const char* encode_datatype(MPI_Datatype datatype)
{
  if (datatype == MPI_BYTE)   return "";
  if (datatype == MPI_DOUBLE) return "0";
  if (datatype == MPI_INT)    return "1";
  if (datatype == MPI_CHAR)   return "2";
  if (datatype == MPI_SHORT)  return "3";
  if (datatype == MPI_LONG)   return "4";
  if (datatype == MPI_FLOAT)  return "5";
  // default - not implemented.
  // do not warn here as we pass through this function even for other trace formats
  return "-1";
}
#define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
    int i = 0;\
    while (action[i] != nullptr)\
      i++;\
    if (i < mandatory + 2)\
      THROWF(arg_error, 0, "%s replay failed.\n" \
             "%d items were given on the line. First two should be process_id and action. " \
             "This action needs %d mandatory arguments after them, and accepts %d optional ones.\n" \
             "Please contact the SimGrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
  }
namespace simgrid {
namespace smpi {

static void action_init(const char* const* action)
{
  XBT_DEBUG("Initialize the counters");
  CHECK_ACTION_PARAMS(action, 0, 1)
  if (action[2])
    MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
  else
    MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype

  /* start a simulated timer */
  smpi_process()->simulated_start();
  /* initialize the number of active processes */
  active_processes = smpi_process_count();

  set_reqq_self(new std::vector<MPI_Request>);
}
static void action_finalize(const char* const* action)
{
  /* Nothing to do */
}

static void action_comm_size(const char* const* action)
{
  communicator_size = parse_double(action[2]);
  log_timed_action(action, smpi_process()->simulated_elapsed());
}

static void action_comm_split(const char* const* action)
{
  log_timed_action(action, smpi_process()->simulated_elapsed());
}

static void action_comm_dup(const char* const* action)
{
  log_timed_action(action, smpi_process()->simulated_elapsed());
}
static void action_compute(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 1, 0)
  double clock = smpi_process()->simulated_elapsed();
  double flops = parse_double(action[2]);
  int rank     = smpi_process()->index();

  TRACE_smpi_computing_in(rank, flops);
  smpi_execute_flops(flops);
  TRACE_smpi_computing_out(rank);

  log_timed_action(action, clock);
}
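// Sample trace line: "0 compute 1e9" makes process 0 simulate 1e9 flops of computation.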
static void action_send(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 2, 1)
  int to       = atoi(action[2]);
  double size  = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();

  if (action[4])
    MPI_CURRENT_TYPE = decode_datatype(action[4]);
  else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  int rank       = smpi_process()->index();
  int dst_traced = MPI_COMM_WORLD->group()->rank(to);

  TRACE_smpi_comm_in(rank, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("send", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
  if (not TRACE_smpi_view_internals())
    TRACE_smpi_send(rank, rank, dst_traced, 0, size * MPI_CURRENT_TYPE->size());

  Request::send(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);

  log_timed_action(action, clock);
}
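// Sample trace line: "0 send 1 1e6 0" makes process 0 send 1e6 elements of datatype id 0
// (MPI_DOUBLE) to process 1.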
static void action_Isend(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 2, 1)
  int to       = atoi(action[2]);
  double size  = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();

  if (action[4])
    MPI_CURRENT_TYPE = decode_datatype(action[4]);
  else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  int rank       = smpi_process()->index();
  int dst_traced = MPI_COMM_WORLD->group()->rank(to);
  TRACE_smpi_comm_in(rank, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("Isend", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
  if (not TRACE_smpi_view_internals())
    TRACE_smpi_send(rank, rank, dst_traced, 0, size * MPI_CURRENT_TYPE->size());

  MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);

  get_reqq_self()->push_back(request);

  log_timed_action(action, clock);
}
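// The queued request is consumed later by a matching "wait", "test", or "waitAll" action.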
static void action_recv(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 2, 1)
  int from     = atoi(action[2]);
  double size  = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  MPI_Status status;

  if (action[4])
    MPI_CURRENT_TYPE = decode_datatype(action[4]);
  else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  int rank       = smpi_process()->index();
  int src_traced = MPI_COMM_WORLD->group()->rank(from);

  TRACE_smpi_comm_in(rank, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("recv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));

  // unknown size from the receiver point of view
  if (size <= 0.0) {
    Request::probe(from, 0, MPI_COMM_WORLD, &status);
    size = status.count;
  }

  Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);

  TRACE_smpi_comm_out(rank);
  if (not TRACE_smpi_view_internals()) {
    TRACE_smpi_recv(src_traced, rank, 0);
  }

  log_timed_action(action, clock);
}
static void action_Irecv(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 2, 1)
  int from     = atoi(action[2]);
  double size  = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  MPI_Status status;

  if (action[4])
    MPI_CURRENT_TYPE = decode_datatype(action[4]);
  else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  int rank       = smpi_process()->index();
  int src_traced = MPI_COMM_WORLD->group()->rank(from);
  TRACE_smpi_comm_in(rank, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("Irecv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));

  // unknown size from the receiver point of view
  if (size <= 0.0) {
    Request::probe(from, 0, MPI_COMM_WORLD, &status);
    size = status.count;
  }

  MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);
  get_reqq_self()->push_back(request);

  log_timed_action(action, clock);
}
static void action_test(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  MPI_Status status;

  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();
  // If the request is null here, a previous test may already have succeeded:
  // different timings in the traced application and in the replayed version can lead to this.
  // In that case, ignore the extra calls.
  if (request != nullptr) {
    int rank = smpi_process()->index();
    TRACE_smpi_testing_in(rank);

    int flag = Request::test(&request, &status);

    XBT_DEBUG("MPI_Test result: %d", flag);
    /* push the request back into the vector to be caught by a subsequent wait.
     * If the test did succeed, the request is now nullptr. */
    get_reqq_self()->push_back(request);

    TRACE_smpi_testing_out(rank);
  }
  log_timed_action(action, clock);
}
static void action_wait(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  MPI_Status status;

  xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
             xbt_str_join_array(action, " "));
  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();

  if (request == nullptr) {
    /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just return. */
    return;
  }

  int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;

  MPI_Group group         = request->comm()->group();
  int src_traced          = group->rank(request->src());
  int dst_traced          = group->rank(request->dst());
  int is_wait_for_receive = (request->flags() & RECV);
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));

  Request::wait(&request, &status);

  TRACE_smpi_comm_out(rank);
  if (is_wait_for_receive)
    TRACE_smpi_recv(src_traced, dst_traced, 0);
  log_timed_action(action, clock);
}
static void action_waitall(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock                      = smpi_process()->simulated_elapsed();
  const unsigned int count_requests = get_reqq_self()->size();

  if (count_requests > 0) {
    MPI_Status status[count_requests];

    int rank_traced = smpi_process()->index();
    TRACE_smpi_comm_in(rank_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
    int recvs_snd[count_requests];
    int recvs_rcv[count_requests];
    for (unsigned int i = 0; i < count_requests; i++) {
      const auto& req = (*get_reqq_self())[i];
      if (req && (req->flags() & RECV)) {
        recvs_snd[i] = req->src();
        recvs_rcv[i] = req->dst();
      } else
        recvs_snd[i] = -100; // sentinel: not a receive, nothing to trace below
    }
    Request::waitall(count_requests, &(*get_reqq_self())[0], status);

    for (unsigned int i = 0; i < count_requests; i++) {
      if (recvs_snd[i] != -100)
        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i], 0);
    }
    TRACE_smpi_comm_out(rank_traced);
  }
  log_timed_action(action, clock);
}
static void action_barrier(const char* const* action)
{
  double clock = smpi_process()->simulated_elapsed();
  int rank     = smpi_process()->index();
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));

  Colls::barrier(MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
static void action_bcast(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 1, 2)
  double size  = parse_double(action[2]);
  double clock = smpi_process()->simulated_elapsed();
  int root     = 0;
  /* Initialize MPI_CURRENT_TYPE in order to decrease the number of checks */
  MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  if (action[3]) {
    root = atoi(action[3]);
    if (action[4])
      MPI_CURRENT_TYPE = decode_datatype(action[4]);
  }

  int rank = smpi_process()->index();
  TRACE_smpi_comm_in(rank, __FUNCTION__,
                     new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid() - 1, -1.0,
                                                    size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));

  void* sendbuf = smpi_get_tmp_sendbuffer(size * MPI_CURRENT_TYPE->size());

  Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
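// Sample trace line: "0 bcast 5e4 1 0" broadcasts 5e4 elements of datatype id 0 from root 1
// (both trailing fields are optional).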
static void action_reduce(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 2, 2)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  double clock     = smpi_process()->simulated_elapsed();
  int root         = 0;
  MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  if (action[4]) {
    root = atoi(action[4]);
    if (action[5])
      MPI_CURRENT_TYPE = decode_datatype(action[5]);
  }

  int rank = smpi_process()->index();
  TRACE_smpi_comm_in(rank, __FUNCTION__,
                     new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid() - 1,
                                                    comp_size, comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));

  void* recvbuf = smpi_get_tmp_sendbuffer(comm_size * MPI_CURRENT_TYPE->size());
  void* sendbuf = smpi_get_tmp_sendbuffer(comm_size * MPI_CURRENT_TYPE->size());
  Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
static void action_allReduce(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 2, 1)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);

  if (action[4])
    MPI_CURRENT_TYPE = decode_datatype(action[4]);
  else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  double clock = smpi_process()->simulated_elapsed();
  int rank     = smpi_process()->index();
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
                                                                        encode_datatype(MPI_CURRENT_TYPE), ""));

  void* recvbuf = smpi_get_tmp_sendbuffer(comm_size * MPI_CURRENT_TYPE->size());
  void* sendbuf = smpi_get_tmp_sendbuffer(comm_size * MPI_CURRENT_TYPE->size());
  Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
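// Sample trace line: "0 allReduce 5e4 5e9" reduces 5e4 elements across all ranks,
// then simulates 5e9 flops of local computation.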
static void action_allToAll(const char* const* action)
{
  CHECK_ACTION_PARAMS(action, 2, 2) // two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock  = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;

  if (action[4] && action[5]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[4]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  void* send = smpi_get_tmp_sendbuffer(send_size * comm_size * MPI_CURRENT_TYPE->size());
  void* recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());

  int rank = smpi_process()->index();
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                                        encode_datatype(MPI_CURRENT_TYPE),
                                                                        encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
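// Sample trace line: "0 allToAll 500 500" has every rank send and receive 500 elements
// to and from each peer.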
static void action_gather(const char* const* action)
{
  /* The structure of the gather action for rank 0 (4 processes in total) is the following:
     0 gather 68 68 0 0 0
     where:
       1) 68 is the sendcount
       2) 68 is the recvcount
       3) 0 is the root node
       4) 0 is the send datatype id, see decode_datatype()
       5) 0 is the recv datatype id, see decode_datatype()
  */
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock  = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
  if (action[5] && action[6]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[5]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
  void* recv = nullptr;
  int root   = 0;
  if (action[4])
    root = atoi(action[4]);
  int rank = MPI_COMM_WORLD->rank();

  if (rank == root)
    recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        encode_datatype(MPI_CURRENT_TYPE),
                                                                        encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(smpi_process()->index());
  log_timed_action(action, clock);
}
static void action_scatter(const char* const* action)
{
  /* The structure of the scatter action for rank 0 (4 processes in total) is the following:
     0 scatter 68 68 0 0 0
     where:
       1) 68 is the sendcount
       2) 68 is the recvcount
       3) 0 is the root node
       4) 0 is the send datatype id, see decode_datatype()
       5) 0 is the recv datatype id, see decode_datatype()
  */
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock  = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
  if (action[5] && action[6]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[5]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
  void* recv = nullptr;
  int root   = 0;
  if (action[4])
    root = atoi(action[4]);
  int rank = MPI_COMM_WORLD->rank();

  if (rank == root)
    recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
                                                                        encode_datatype(MPI_CURRENT_TYPE),
                                                                        encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(smpi_process()->index());
  log_timed_action(action, clock);
}
static void action_gatherv(const char* const* action)
{
  /* The structure of the gatherv action for rank 0 (4 processes in total) is the following:
     0 gatherV 68 68 10 10 10 0 0 0
     where:
       1) 68 is the sendcount
       2) 68 10 10 10 is the recvcounts array
       3) 0 is the root node
       4) 0 is the send datatype id, see decode_datatype()
       5) 0 is the recv datatype id, see decode_datatype()
  */
  double clock  = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
  int send_size = parse_double(action[2]);
  int disps[comm_size];
  int recvcounts[comm_size];
  int recv_sum = 0;

  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
  if (action[4 + comm_size] && action[5 + comm_size]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[4 + comm_size]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
  void* recv = nullptr;
  for (int i = 0; i < comm_size; i++) {
    recvcounts[i] = atoi(action[i + 3]);
    recv_sum += recvcounts[i];
    disps[i] = 0;
  }

  int root = atoi(action[3 + comm_size]);
  int rank = MPI_COMM_WORLD->rank();

  if (rank == root)
    recv = smpi_get_tmp_recvbuffer(recv_sum * MPI_CURRENT_TYPE2->size());

  std::vector<int>* trace_recvcounts = new std::vector<int>;
  for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
    trace_recvcounts->push_back(recvcounts[i]);

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
                                             encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(smpi_process()->index());
  log_timed_action(action, clock);
}
static void action_scatterv(const char* const* action)
{
  /* The structure of the scatterv action for rank 0 (4 processes in total) is the following:
     0 scatterV 68 10 10 10 68 0 0 0
     where:
       1) 68 10 10 10 is the sendcounts array
       2) 68 is the recvcount
       3) 0 is the root node
       4) 0 is the send datatype id, see decode_datatype()
       5) 0 is the recv datatype id, see decode_datatype()
  */
  double clock  = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
  int recv_size = parse_double(action[2 + comm_size]);
  int disps[comm_size];
  int sendcounts[comm_size];
  int send_sum = 0;

  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
  if (action[4 + comm_size] && action[5 + comm_size]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[4 + comm_size]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  void* send = nullptr;
  void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE2->size());
  for (int i = 0; i < comm_size; i++) {
    sendcounts[i] = atoi(action[i + 2]);
    send_sum += sendcounts[i];
    disps[i] = 0;
  }

  int root = atoi(action[3 + comm_size]);
  int rank = MPI_COMM_WORLD->rank();

  if (rank == root)
    send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE->size());

  std::vector<int>* trace_sendcounts = new std::vector<int>;
  for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
    trace_sendcounts->push_back(sendcounts[i]);

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "scatterV", root, -1, trace_sendcounts, recv_size, nullptr,
                                             encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(smpi_process()->index());
  log_timed_action(action, clock);
}
static void action_reducescatter(const char* const* action)
{
  /* The structure of the reduceScatter action for rank 0 (4 processes in total) is the following:
     0 reduceScatter 275427 275427 275427 204020 11346849 0
     where:
       1) The first four values after the name of the action declare the recvcounts array
       2) The value 11346849 is the number of instructions to simulate
       3) The last value corresponds to the datatype, see decode_datatype().
  */
  double clock  = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
  int comp_size = parse_double(action[2 + comm_size]);
  int recvcounts[comm_size];
  int rank = smpi_process()->index();
  int size = 0;
  std::vector<int>* trace_recvcounts = new std::vector<int>;
  if (action[3 + comm_size])
    MPI_CURRENT_TYPE = decode_datatype(action[3 + comm_size]);
  else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  for (int i = 0; i < comm_size; i++) {
    recvcounts[i] = atoi(action[i + 2]);
    trace_recvcounts->push_back(recvcounts[i]);
    size += recvcounts[i];
  }

  TRACE_smpi_comm_in(rank, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
                                                       std::to_string(comp_size), /* ugly hack to print comp_size */
                                                       encode_datatype(MPI_CURRENT_TYPE)));

  void* sendbuf = smpi_get_tmp_sendbuffer(size * MPI_CURRENT_TYPE->size());
  void* recvbuf = smpi_get_tmp_recvbuffer(size * MPI_CURRENT_TYPE->size());

  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
static void action_allgather(const char* const* action)
{
  /* The structure of the allGather action for rank 0 (4 processes in total) is the following:
     0 allGather 275427 275427
     where:
       1) 275427 is the sendcount
       2) 275427 is the recvcount
       3) No further values mean that the datatype for both the send and receive buffers is the default one, see
          decode_datatype().
  */
  double clock = smpi_process()->simulated_elapsed();

  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount = atoi(action[2]);
  int recvcount = atoi(action[3]);

  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;

  if (action[4] && action[5]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[4]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  void* sendbuf = smpi_get_tmp_sendbuffer(sendcount * MPI_CURRENT_TYPE->size());
  void* recvbuf = smpi_get_tmp_recvbuffer(recvcount * MPI_CURRENT_TYPE2->size());

  int rank = smpi_process()->index();

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                                        encode_datatype(MPI_CURRENT_TYPE),
                                                                        encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
static void action_allgatherv(const char* const* action)
{
  /* The structure of the allGatherV action for rank 0 (4 processes in total) is the following:
     0 allGatherV 275427 275427 275427 275427 204020
     where:
       1) 275427 is the sendcount
       2) The next four elements declare the recvcounts array
       3) No further values mean that the datatype for both the send and receive buffers is the default one, see
          decode_datatype().
  */
  double clock = smpi_process()->simulated_elapsed();

  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
  int sendcount = atoi(action[2]);
  int recvcounts[comm_size];
  int disps[comm_size];
  int recv_sum = 0;

  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;

  if (action[3 + comm_size] && action[4 + comm_size]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[3 + comm_size]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[4 + comm_size]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  void* sendbuf = smpi_get_tmp_sendbuffer(sendcount * MPI_CURRENT_TYPE->size());

  for (int i = 0; i < comm_size; i++) {
    recvcounts[i] = atoi(action[i + 3]);
    recv_sum += recvcounts[i];
    disps[i] = 0;
  }
  void* recvbuf = smpi_get_tmp_recvbuffer(recv_sum * MPI_CURRENT_TYPE2->size());

  int rank = smpi_process()->index();

  std::vector<int>* trace_recvcounts = new std::vector<int>;
  for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
    trace_recvcounts->push_back(recvcounts[i]);

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
                                             encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
                    MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
static void action_allToAllv(const char* const* action)
{
  /* The structure of the allToAllV action for rank 0 (4 processes in total) is the following:
     0 allToAllV 100 1 7 10 12 100 1 70 10 5
     where:
       1) 100 is the size of the send buffer (in number of elements)
       2) 1 7 10 12 is the sendcounts array
       3) 100 is the size of the receive buffer (in number of elements)
       4) 1 70 10 5 is the recvcounts array
  */
  double clock = smpi_process()->simulated_elapsed();

  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
  int send_size = 0;
  int recv_size = 0;
  int sendcounts[comm_size];
  std::vector<int>* trace_sendcounts = new std::vector<int>;
  int recvcounts[comm_size];
  std::vector<int>* trace_recvcounts = new std::vector<int>;
  int senddisps[comm_size];
  int recvdisps[comm_size];

  MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;

  int send_buf_size = parse_double(action[2]);
  int recv_buf_size = parse_double(action[3 + comm_size]);
  if (action[4 + 2 * comm_size] && action[5 + 2 * comm_size]) {
    MPI_CURRENT_TYPE  = decode_datatype(action[4 + 2 * comm_size]);
    MPI_CURRENT_TYPE2 = decode_datatype(action[5 + 2 * comm_size]);
  } else
    MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;

  int rank      = smpi_process()->index();
  void* sendbuf = smpi_get_tmp_sendbuffer(send_buf_size * MPI_CURRENT_TYPE->size());
  void* recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size * MPI_CURRENT_TYPE2->size());

  for (int i = 0; i < comm_size; i++) {
    sendcounts[i] = atoi(action[i + 3]);
    trace_sendcounts->push_back(sendcounts[i]);
    send_size += sendcounts[i];
    recvcounts[i] = atoi(action[i + 4 + comm_size]);
    trace_recvcounts->push_back(recvcounts[i]);
    recv_size += recvcounts[i];
    senddisps[i] = 0;
    recvdisps[i] = 0;
  }

  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
                                             encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));

  Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE, recvbuf, recvcounts, recvdisps,
                   MPI_CURRENT_TYPE2, MPI_COMM_WORLD);

  TRACE_smpi_comm_out(rank);
  log_timed_action(action, clock);
}
}} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real */
void smpi_replay_init(int* argc, char*** argv)
{
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);

  int rank = smpi_process()->index();
  TRACE_smpi_init(rank);
  TRACE_smpi_computing_init(rank);
  TRACE_smpi_comm_in(rank, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(rank);
  xbt_replay_action_register("init", simgrid::smpi::action_init);
  xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
  xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
  xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
  xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
  xbt_replay_action_register("send", simgrid::smpi::action_send);
  xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
  xbt_replay_action_register("recv", simgrid::smpi::action_recv);
  xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
  xbt_replay_action_register("test", simgrid::smpi::action_test);
  xbt_replay_action_register("wait", simgrid::smpi::action_wait);
  xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
  xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather", simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute", simgrid::smpi::action_compute);

  // if we have a delayed start, sleep here.
  if (*argc > 2) {
    double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
    XBT_VERB("Delayed start for instance - Sleeping for %f flops", value);
    smpi_execute_flops(value);
  } else {
    // UGLY: force a context switch to be sure that all MSG_processes begin initialization
    XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops");
    smpi_execute_flops(0.0);
  }
}
/** @brief actually run the replay after initialization */
void smpi_replay_main(int* argc, char*** argv)
{
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter */
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests = get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];
    unsigned int i = 0;

    for (auto const& req : *get_reqq_self()) {
      requests[i] = req;
      i++;
    }
    simgrid::smpi::Request::waitall(count_requests, requests, status);
  }
  delete get_reqq_self();
  active_processes--;

  if (active_processes == 0) {
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    xbt_free(sendbuffer);
    xbt_free(recvbuffer);
  }

  TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));

  smpi_process()->finalize();

  TRACE_smpi_comm_out(smpi_process()->index());
  TRACE_smpi_finalize(smpi_process()->index());
}
/** @brief chain a replay initialization and a replay start */
void smpi_replay_run(int* argc, char*** argv)
{
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);
}