1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
// Global state shared by all replay actions of this translation unit.
// NOTE(review): KEY_SIZE looks like a textual key buffer for two ints plus a
// terminator — it is not used in this excerpt; confirm against the full file.
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size as read from the trace's "comm_size" action.
24 int communicator_size = 0;
// Number of replaying processes still running; see smpi_replay_main().
25 static int active_processes = 0;
// Per-actor (keyed by pid) list of outstanding asynchronous MPI requests.
26 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when an action line does not specify one (set by action_init:
// MPI_DOUBLE for MPE-style traces, MPI_BYTE for TAU-style traces).
28 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the current action line (see decode_datatype()).
29 MPI_Datatype MPI_CURRENT_TYPE;
// Grow-only scratch buffers shared by every send/recv action: real payload
// contents are not replayed, only message sizes matter.
31 static int sendbuffer_size=0;
32 char* sendbuffer=nullptr;
33 static int recvbuffer_size=0;
34 char* recvbuffer=nullptr;
// Verbose-log the raw action tokens followed by the simulated time spent on
// the action (elapsed simulated time minus the `clock` captured at its start).
36 static void log_timed_action (const char *const *action, double clock){
37 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// Join all tokens of the action line into one printable string.
38 char *name = xbt_str_join_array(action, " ");
39 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request list of the calling actor.
// Uses at(): throws std::out_of_range if action_init was never replayed
// for this actor (i.e. set_reqq_self() was not called first).
44 static std::vector<MPI_Request>* get_reqq_self()
46 return reqq.at(Actor::self()->getPid());
// Register `mpi_request` as the pending-request list of the calling actor.
// NOTE(review): unordered_map::insert is a no-op when the pid is already
// present — a second call would leak the new vector; confirm callers only
// invoke this once per actor (from action_init).
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
51 reqq.insert({Actor::self()->getPid(), mpi_request});
54 //allocate a single buffer for all sends, growing it if needed
// Outside of replay mode, fall back to a plain per-call allocation that the
// caller must release with smpi_free_tmp_buffer().
55 void* smpi_get_tmp_sendbuffer(int size)
57 if (not smpi_process()->replaying())
58 return xbt_malloc(size);
// Grow the shared scratch buffer only when the request exceeds its capacity.
59 if (sendbuffer_size<size){
60 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
66 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer() for the receive side.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (not smpi_process()->replaying())
69 return xbt_malloc(size);
// Grow the shared scratch buffer only when the request exceeds its capacity.
70 if (recvbuffer_size<size){
71 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release a buffer obtained from smpi_get_tmp_{send,recv}buffer().
// In replay mode the shared scratch buffers are kept for reuse, so freeing
// only happens on the non-replaying (xbt_malloc) path.
77 void smpi_free_tmp_buffer(void* buf){
78 if (not smpi_process()->replaying())
// Parse a trace token as a double; throws (unknown_error) when the token is
// not a well-formed number instead of silently returning garbage.
83 static double parse_double(const char *string)
86 double value = strtod(string, &endptr);
88 THROWF(unknown_error, 0, "%s is not a double", string);
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map the numeric datatype id found on a trace line to an MPI datatype and
// cache it in the global MPI_CURRENT_TYPE. Unknown ids fall back to
// MPI_DEFAULT_TYPE. (Case labels for the ids are not visible in this excerpt.)
94 static MPI_Datatype decode_datatype(const char *const action)
96 switch(atoi(action)) {
98 MPI_CURRENT_TYPE=MPI_DOUBLE;
101 MPI_CURRENT_TYPE=MPI_INT;
104 MPI_CURRENT_TYPE=MPI_CHAR;
107 MPI_CURRENT_TYPE=MPI_SHORT;
110 MPI_CURRENT_TYPE=MPI_LONG;
113 MPI_CURRENT_TYPE=MPI_FLOAT;
116 MPI_CURRENT_TYPE=MPI_BYTE;
119 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
122 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): turn an MPI datatype back into the textual id
// used by the trace format / the tracing layer. (The returned id strings are
// not visible in this excerpt.)
125 const char* encode_datatype(MPI_Datatype datatype)
127 if (datatype==MPI_BYTE)
129 if(datatype==MPI_DOUBLE)
131 if(datatype==MPI_INT)
133 if(datatype==MPI_CHAR)
135 if(datatype==MPI_SHORT)
137 if(datatype==MPI_LONG)
139 if(datatype==MPI_FLOAT)
141 // default - not implemented.
142 // do not warn here as we pass in this function even for other trace formats
// Argument-count guard used at the top of every action handler: counts the
// tokens of the action line and throws arg_error when the count is not
// compatible with `mandatory` required and `optional` extra arguments
// (the first two tokens are always process_id and action name).
// No comments are inserted inside the macro body: a '//' comment on a
// backslash-continued line would swallow the continuation.
146 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
148 while(action[i]!=nullptr)\
151 THROWF(arg_error, 0, "%s replay failed.\n" \
152 "%d items were given on the line. First two should be process_id and action. " \
153 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
154 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for the "init" action: pick the default datatype from the
// trace flavor, start the per-process simulated timer, record the number of
// active processes and create this actor's pending-request list.
160 static void action_init(const char *const *action)
162 XBT_DEBUG("Initialize the counters");
163 CHECK_ACTION_PARAMS(action, 0, 1)
165 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
167 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
169 /* start a simulated timer */
170 smpi_process()->simulated_start();
171 /*initialize the number of active processes */
172 active_processes = smpi_process_count();
// Required before any get_reqq_self() call by the other handlers.
174 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler for the "finalize" action (body not visible in this excerpt;
// the real teardown happens in smpi_replay_main()).
177 static void action_finalize(const char *const *action)
// Replay handler for "comm_size": record the communicator size announced by
// the trace, then log the (zero-duration) action.
182 static void action_comm_size(const char *const *action)
184 communicator_size = parse_double(action[2]);
185 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_split": a no-op beyond logging — communicator
// splitting has no simulated cost in the replay.
188 static void action_comm_split(const char *const *action)
190 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_dup": a no-op beyond logging, like comm_split.
193 static void action_comm_dup(const char *const *action)
195 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "compute": burn the given amount of flops in simulation,
// bracketed by tracing calls so the computation shows up in the timeline.
198 static void action_compute(const char *const *action)
200 CHECK_ACTION_PARAMS(action, 1, 0)
201 double clock = smpi_process()->simulated_elapsed();
202 double flops= parse_double(action[2]);
203 int my_proc_id = Actor::self()->getPid();
205 TRACE_smpi_computing_in(my_proc_id, flops);
206 smpi_execute_flops(flops);
207 TRACE_smpi_computing_out(my_proc_id);
209 log_timed_action (action, clock);
// Replay handler for "send": blocking send of `size` elements to rank `to`.
// Payload is nullptr — only the message size matters to the simulation.
// Optional 3rd argument selects the datatype (defaults to MPI_DEFAULT_TYPE).
212 static void action_send(const char *const *action)
214 CHECK_ACTION_PARAMS(action, 2, 1)
215 int to = atoi(action[2]);
216 double size=parse_double(action[3]);
217 double clock = smpi_process()->simulated_elapsed();
220 MPI_CURRENT_TYPE=decode_datatype(action[4]);
222 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
224 int my_proc_id = Actor::self()->getPid();
// Translate the communicator rank into the traced actor pid.
225 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
227 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
228 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
229 if (not TRACE_smpi_view_internals())
230 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
232 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
234 TRACE_smpi_comm_out(my_proc_id);
236 log_timed_action(action, clock);
// Replay handler for "Isend": non-blocking variant of action_send(); the
// returned request is queued on this actor's list to be matched later by a
// "wait"/"waitAll"/"test" action.
239 static void action_Isend(const char *const *action)
241 CHECK_ACTION_PARAMS(action, 2, 1)
242 int to = atoi(action[2]);
243 double size=parse_double(action[3]);
244 double clock = smpi_process()->simulated_elapsed();
247 MPI_CURRENT_TYPE=decode_datatype(action[4]);
249 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
251 int my_proc_id = Actor::self()->getPid();
252 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
253 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
254 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
255 if (not TRACE_smpi_view_internals())
256 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
258 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
260 TRACE_smpi_comm_out(my_proc_id);
// Remember the request so a later wait/test action can complete it.
262 get_reqq_self()->push_back(request);
264 log_timed_action (action, clock);
// Replay handler for "recv": blocking receive from rank `from`. A probe is
// issued first (the receiver does not know the real size in the trace), then
// the receive itself; payload is nullptr as only sizes are simulated.
267 static void action_recv(const char *const *action) {
268 CHECK_ACTION_PARAMS(action, 2, 1)
269 int from = atoi(action[2]);
270 double size=parse_double(action[3]);
271 double clock = smpi_process()->simulated_elapsed();
275 MPI_CURRENT_TYPE=decode_datatype(action[4]);
277 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
279 int my_proc_id = Actor::self()->getPid();
280 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
282 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
283 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
285 //unknown size from the receiver point of view
287 Request::probe(from, 0, MPI_COMM_WORLD, &status);
291 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
293 TRACE_smpi_comm_out(my_proc_id);
294 if (not TRACE_smpi_view_internals()) {
295 TRACE_smpi_recv(src_traced, my_proc_id, 0);
298 log_timed_action (action, clock);
// Replay handler for "Irecv": non-blocking receive; like action_recv() but
// queues the request for a later wait/waitAll/test action.
301 static void action_Irecv(const char *const *action)
303 CHECK_ACTION_PARAMS(action, 2, 1)
304 int from = atoi(action[2]);
305 double size=parse_double(action[3]);
306 double clock = smpi_process()->simulated_elapsed();
309 MPI_CURRENT_TYPE=decode_datatype(action[4]);
311 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
313 int my_proc_id = Actor::self()->getPid();
314 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
315 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
317 //unknow size from the receiver pov
319 Request::probe(from, 0, MPI_COMM_WORLD, &status);
323 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
325 TRACE_smpi_comm_out(my_proc_id);
// Remember the request so a later wait/test action can complete it.
326 get_reqq_self()->push_back(request);
328 log_timed_action (action, clock);
// Replay handler for "test": pop the most recent pending request, test it,
// and push it back (nullptr after success) so a later wait still matches.
331 static void action_test(const char* const* action)
333 CHECK_ACTION_PARAMS(action, 0, 0)
334 double clock = smpi_process()->simulated_elapsed();
337 MPI_Request request = get_reqq_self()->back();
338 get_reqq_self()->pop_back();
339 //if request is null here, this may mean that a previous test has succeeded
340 //Different times in traced application and replayed version may lead to this
341 //In this case, ignore the extra calls.
342 if(request!=nullptr){
343 int my_proc_id = Actor::self()->getPid();
344 TRACE_smpi_testing_in(my_proc_id);
346 int flag = Request::test(&request, &status);
348 XBT_DEBUG("MPI_Test result: %d", flag);
349 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
350 get_reqq_self()->push_back(request);
352 TRACE_smpi_testing_out(my_proc_id);
354 log_timed_action (action, clock);
// Replay handler for "wait": complete the most recent pending request.
// Source/destination/flags must be captured *before* Request::wait() since
// the request is invalidated by the wait.
357 static void action_wait(const char *const *action){
358 CHECK_ACTION_PARAMS(action, 0, 0)
359 double clock = smpi_process()->simulated_elapsed();
362 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
363 xbt_str_join_array(action," "));
364 MPI_Request request = get_reqq_self()->back();
365 get_reqq_self()->pop_back();
367 if (request==nullptr){
368 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
372 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
374 MPI_Group group = request->comm()->group();
375 int src_traced = group->rank(request->src());
376 int dst_traced = group->rank(request->dst());
377 int is_wait_for_receive = (request->flags() & RECV);
378 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
380 Request::wait(&request, &status);
382 TRACE_smpi_comm_out(rank);
// Only receives produce a matching "recv" event in the trace.
383 if (is_wait_for_receive)
384 TRACE_smpi_recv(src_traced, dst_traced, 0);
385 log_timed_action (action, clock);
// Replay handler for "waitAll": complete every pending request of this actor
// at once. Receive endpoints are recorded before the waitall (requests are
// invalidated by it) so the matching recv events can be traced afterwards.
388 static void action_waitall(const char *const *action){
389 CHECK_ACTION_PARAMS(action, 0, 0)
390 double clock = smpi_process()->simulated_elapsed();
391 const unsigned int count_requests = get_reqq_self()->size();
393 if (count_requests>0) {
394 MPI_Status status[count_requests];
396 int my_proc_id_traced = Actor::self()->getPid();
397 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
398 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
399 int recvs_snd[count_requests];
400 int recvs_rcv[count_requests];
// NOTE(review): the visible code only fills these arrays for RECV requests,
// yet the loop below skips entries equal to -100 — presumably non-RECV slots
// are set to -100 in lines missing from this excerpt; confirm in full source.
401 for (unsigned int i = 0; i < count_requests; i++) {
402 const auto& req = (*get_reqq_self())[i];
403 if (req && (req->flags() & RECV)) {
404 recvs_snd[i] = req->src();
405 recvs_rcv[i] = req->dst();
409 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
411 for (unsigned i = 0; i < count_requests; i++) {
412 if (recvs_snd[i]!=-100)
413 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
415 TRACE_smpi_comm_out(my_proc_id_traced);
417 log_timed_action (action, clock);
// Replay handler for "barrier": a plain MPI_COMM_WORLD barrier wrapped in
// tracing calls.
420 static void action_barrier(const char *const *action){
421 double clock = smpi_process()->simulated_elapsed();
422 int my_proc_id = Actor::self()->getPid();
423 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
425 Colls::barrier(MPI_COMM_WORLD);
427 TRACE_smpi_comm_out(my_proc_id);
428 log_timed_action (action, clock);
// Replay handler for "bcast": broadcast `size` elements from optional root
// (action[3], defaulting to 0 in a line not visible here) with an optional
// datatype (action[4]).
431 static void action_bcast(const char *const *action)
433 CHECK_ACTION_PARAMS(action, 1, 2)
434 double size = parse_double(action[2]);
435 double clock = smpi_process()->simulated_elapsed();
437 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
438 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
441 root= atoi(action[3]);
443 MPI_CURRENT_TYPE=decode_datatype(action[4]);
446 int my_proc_id = Actor::self()->getPid();
447 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
448 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
449 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
451 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
453 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
455 TRACE_smpi_comm_out(my_proc_id);
456 log_timed_action (action, clock);
// Replay handler for "reduce": reduce `comm_size` elements to the root, then
// simulate `comp_size` flops of reduction work. MPI_OP_NULL is used because
// the actual operation result is irrelevant to the simulation.
459 static void action_reduce(const char *const *action)
461 CHECK_ACTION_PARAMS(action, 2, 2)
462 double comm_size = parse_double(action[2]);
463 double comp_size = parse_double(action[3]);
464 double clock = smpi_process()->simulated_elapsed();
466 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
469 root= atoi(action[4]);
471 MPI_CURRENT_TYPE=decode_datatype(action[5]);
474 int my_proc_id = Actor::self()->getPid();
475 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
476 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
477 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
479 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
481 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
482 smpi_execute_flops(comp_size);
484 TRACE_smpi_comm_out(my_proc_id);
485 log_timed_action (action, clock);
// Replay handler for "allReduce": like action_reduce() but every rank gets
// the result; computation cost is simulated separately via comp_size flops.
488 static void action_allReduce(const char *const *action) {
489 CHECK_ACTION_PARAMS(action, 2, 1)
490 double comm_size = parse_double(action[2]);
491 double comp_size = parse_double(action[3]);
494 MPI_CURRENT_TYPE=decode_datatype(action[4]);
496 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
498 double clock = smpi_process()->simulated_elapsed();
499 int my_proc_id = Actor::self()->getPid();
500 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
501 encode_datatype(MPI_CURRENT_TYPE), ""));
503 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
504 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
505 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
506 smpi_execute_flops(comp_size);
508 TRACE_smpi_comm_out(my_proc_id);
509 log_timed_action (action, clock);
// Replay handler for "allToAll": each rank exchanges send_size/recv_size
// elements with every other rank; optional datatypes at action[4]/action[5].
512 static void action_allToAll(const char *const *action) {
513 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
514 double clock = smpi_process()->simulated_elapsed();
515 int comm_size = MPI_COMM_WORLD->size();
516 int send_size = parse_double(action[2]);
517 int recv_size = parse_double(action[3]);
518 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
520 if(action[4] && action[5]) {
521 MPI_CURRENT_TYPE=decode_datatype(action[4]);
522 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
525 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
// Buffers are sized for the whole communicator (count * comm_size).
527 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
528 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
530 int my_proc_id = Actor::self()->getPid();
531 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
532 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
533 encode_datatype(MPI_CURRENT_TYPE),
534 encode_datatype(MPI_CURRENT_TYPE2)));
536 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
538 TRACE_smpi_comm_out(my_proc_id);
539 log_timed_action (action, clock);
// Replay handler for "gather": gather send_size elements from every rank to
// the root; only the root allocates a receive buffer (comm_size slots).
// Note the layout: root id at action[4], datatypes at action[5]/action[6].
542 static void action_gather(const char *const *action) {
543 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
546 1) 68 is the sendcounts
547 2) 68 is the recvcounts
548 3) 0 is the root node
549 4) 0 is the send datatype id, see decode_datatype()
550 5) 0 is the recv datatype id, see decode_datatype()
552 CHECK_ACTION_PARAMS(action, 2, 3)
553 double clock = smpi_process()->simulated_elapsed();
554 int comm_size = MPI_COMM_WORLD->size();
555 int send_size = parse_double(action[2]);
556 int recv_size = parse_double(action[3]);
557 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
558 if(action[4] && action[5]) {
559 MPI_CURRENT_TYPE=decode_datatype(action[5]);
560 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
562 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
564 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
565 void *recv = nullptr;
568 root=atoi(action[4]);
569 int rank = MPI_COMM_WORLD->rank();
572 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
574 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
575 encode_datatype(MPI_CURRENT_TYPE),
576 encode_datatype(MPI_CURRENT_TYPE2)));
578 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
580 TRACE_smpi_comm_out(Actor::self()->getPid());
581 log_timed_action (action, clock);
584 static void action_scatter(const char* const* action)
586 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
589 1) 68 is the sendcounts
590 2) 68 is the recvcounts
591 3) 0 is the root node
592 4) 0 is the send datatype id, see decode_datatype()
593 5) 0 is the recv datatype id, see decode_datatype()
595 CHECK_ACTION_PARAMS(action, 2, 3)
596 double clock = smpi_process()->simulated_elapsed();
597 int comm_size = MPI_COMM_WORLD->size();
598 int send_size = parse_double(action[2]);
599 int recv_size = parse_double(action[3]);
600 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
601 if (action[4] && action[5]) {
602 MPI_CURRENT_TYPE = decode_datatype(action[5]);
603 MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
605 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
607 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
608 void* recv = nullptr;
611 root = atoi(action[4]);
612 int rank = MPI_COMM_WORLD->rank();
615 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
617 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
618 encode_datatype(MPI_CURRENT_TYPE),
619 encode_datatype(MPI_CURRENT_TYPE2)));
621 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
623 TRACE_smpi_comm_out(Actor::self()->getPid());
624 log_timed_action(action, clock);
// Replay handler for "gatherV": gather with per-rank receive counts (one
// count token per rank in the trace line). Only the root allocates the
// receive buffer, sized by the sum of all recvcounts.
627 static void action_gatherv(const char *const *action) {
628 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
629 0 gather 68 68 10 10 10 0 0 0
631 1) 68 is the sendcount
632 2) 68 10 10 10 is the recvcounts
633 3) 0 is the root node
634 4) 0 is the send datatype id, see decode_datatype()
635 5) 0 is the recv datatype id, see decode_datatype()
637 double clock = smpi_process()->simulated_elapsed();
638 int comm_size = MPI_COMM_WORLD->size();
639 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
640 int send_size = parse_double(action[2]);
641 int disps[comm_size];
642 int recvcounts[comm_size];
645 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
646 if(action[4+comm_size] && action[5+comm_size]) {
647 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
648 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
650 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
652 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
653 void *recv = nullptr;
654 for(int i=0;i<comm_size;i++) {
655 recvcounts[i] = atoi(action[i+3]);
656 recv_sum=recv_sum+recvcounts[i];
660 int root=atoi(action[3+comm_size]);
661 int rank = MPI_COMM_WORLD->rank();
664 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
// The tracing layer takes ownership of this vector, hence the copy.
666 std::vector<int>* trace_recvcounts = new std::vector<int>;
667 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
668 trace_recvcounts->push_back(recvcounts[i]);
670 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
671 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
672 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
674 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
676 TRACE_smpi_comm_out(Actor::self()->getPid());
677 log_timed_action (action, clock);
680 static void action_scatterv(const char* const* action)
682 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
683 0 gather 68 10 10 10 68 0 0 0
685 1) 68 10 10 10 is the sendcounts
686 2) 68 is the recvcount
687 3) 0 is the root node
688 4) 0 is the send datatype id, see decode_datatype()
689 5) 0 is the recv datatype id, see decode_datatype()
691 double clock = smpi_process()->simulated_elapsed();
692 int comm_size = MPI_COMM_WORLD->size();
693 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
694 int recv_size = parse_double(action[2 + comm_size]);
695 int disps[comm_size];
696 int sendcounts[comm_size];
699 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
700 if (action[4 + comm_size] && action[5 + comm_size]) {
701 MPI_CURRENT_TYPE = decode_datatype(action[4 + comm_size]);
702 MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
704 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
706 void* send = nullptr;
707 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
708 for (int i = 0; i < comm_size; i++) {
709 sendcounts[i] = atoi(action[i + 2]);
710 send_sum += sendcounts[i];
714 int root = atoi(action[3 + comm_size]);
715 int rank = MPI_COMM_WORLD->rank();
718 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
720 std::vector<int>* trace_sendcounts = new std::vector<int>;
721 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
722 trace_sendcounts->push_back(sendcounts[i]);
724 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
725 "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
726 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
728 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
730 TRACE_smpi_comm_out(Actor::self()->getPid());
731 log_timed_action(action, clock);
// Replay handler for "reduceScatter": reduce + scatter with per-rank counts,
// followed by comp_size simulated flops of reduction work.
734 static void action_reducescatter(const char *const *action) {
735 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
736 0 reduceScatter 275427 275427 275427 204020 11346849 0
738 1) The first four values after the name of the action declare the recvcounts array
739 2) The value 11346849 is the amount of instructions
740 3) The last value corresponds to the datatype, see decode_datatype().
742 double clock = smpi_process()->simulated_elapsed();
743 int comm_size = MPI_COMM_WORLD->size();
744 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
745 int comp_size = parse_double(action[2+comm_size]);
746 int recvcounts[comm_size];
747 int my_proc_id = Actor::self()->getPid();
749 std::vector<int>* trace_recvcounts = new std::vector<int>;
750 if(action[3+comm_size])
751 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
753 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
755 for(int i=0;i<comm_size;i++) {
756 recvcounts[i] = atoi(action[i+2]);
757 trace_recvcounts->push_back(recvcounts[i]);
761 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
762 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
763 std::to_string(comp_size), /* ugly hack to print comp_size */
764 encode_datatype(MPI_CURRENT_TYPE)));
// NOTE(review): `size` has no visible definition in this excerpt — presumably
// the sum of recvcounts accumulated in the loop above; confirm in full source.
766 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
767 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
769 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
770 smpi_execute_flops(comp_size);
772 TRACE_smpi_comm_out(my_proc_id);
773 log_timed_action (action, clock);
// Replay handler for "allGather": fixed per-rank counts, optional datatypes
// at action[4]/action[5].
776 static void action_allgather(const char *const *action) {
777 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
778 0 allGather 275427 275427
780 1) 275427 is the sendcount
781 2) 275427 is the recvcount
782 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
784 double clock = smpi_process()->simulated_elapsed();
786 CHECK_ACTION_PARAMS(action, 2, 2)
787 int sendcount=atoi(action[2]);
788 int recvcount=atoi(action[3]);
790 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
792 if(action[4] && action[5]) {
793 MPI_CURRENT_TYPE = decode_datatype(action[4]);
794 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
796 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
798 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
799 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
801 int my_proc_id = Actor::self()->getPid();
803 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
804 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
805 encode_datatype(MPI_CURRENT_TYPE),
806 encode_datatype(MPI_CURRENT_TYPE2)));
808 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
810 TRACE_smpi_comm_out(my_proc_id);
811 log_timed_action (action, clock);
// Replay handler for "allGatherV": allgather with per-rank receive counts;
// the receive buffer is sized by the sum of all recvcounts.
814 static void action_allgatherv(const char *const *action) {
815 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
816 0 allGatherV 275427 275427 275427 275427 204020
818 1) 275427 is the sendcount
819 2) The next four elements declare the recvcounts array
820 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
822 double clock = smpi_process()->simulated_elapsed();
824 int comm_size = MPI_COMM_WORLD->size();
825 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
826 int sendcount=atoi(action[2]);
827 int recvcounts[comm_size];
828 int disps[comm_size];
830 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
832 if(action[3+comm_size] && action[4+comm_size]) {
833 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
834 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
836 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
838 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
840 for(int i=0;i<comm_size;i++) {
841 recvcounts[i] = atoi(action[i+3]);
842 recv_sum=recv_sum+recvcounts[i];
845 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
847 int my_proc_id = Actor::self()->getPid();
// The tracing layer takes ownership of this vector, hence the copy.
849 std::vector<int>* trace_recvcounts = new std::vector<int>;
850 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
851 trace_recvcounts->push_back(recvcounts[i]);
853 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
854 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
855 encode_datatype(MPI_CURRENT_TYPE),
856 encode_datatype(MPI_CURRENT_TYPE2)));
858 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
861 TRACE_smpi_comm_out(my_proc_id);
862 log_timed_action (action, clock);
865 static void action_allToAllv(const char *const *action) {
866 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
867 0 allToAllV 100 1 7 10 12 100 1 70 10 5
869 1) 100 is the size of the send buffer *sizeof(int),
870 2) 1 7 10 12 is the sendcounts array
871 3) 100*sizeof(int) is the size of the receiver buffer
872 4) 1 70 10 5 is the recvcounts array
874 double clock = smpi_process()->simulated_elapsed();
876 int comm_size = MPI_COMM_WORLD->size();
877 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
880 int sendcounts[comm_size];
881 std::vector<int>* trace_sendcounts = new std::vector<int>;
882 int recvcounts[comm_size];
883 std::vector<int>* trace_recvcounts = new std::vector<int>;
884 int senddisps[comm_size];
885 int recvdisps[comm_size];
887 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
889 int send_buf_size=parse_double(action[2]);
890 int recv_buf_size=parse_double(action[3+comm_size]);
891 if(action[4+2*comm_size] && action[5+2*comm_size]) {
892 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
893 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
896 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
898 int my_proc_id = Actor::self()->getPid();
899 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
900 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
902 for(int i=0;i<comm_size;i++) {
903 sendcounts[i] = atoi(action[i+3]);
904 trace_sendcounts->push_back(sendcounts[i]);
905 send_size += sendcounts[i];
906 recvcounts[i] = atoi(action[i+4+comm_size]);
907 trace_recvcounts->push_back(recvcounts[i]);
908 recv_size += recvcounts[i];
913 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
914 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
915 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
916 encode_datatype(MPI_CURRENT_TYPE2)));
918 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
919 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
921 TRACE_smpi_comm_out(my_proc_id);
922 log_timed_action (action, clock);
925 }} // namespace simgrid::smpi
927 /** @brief Only initialize the replay, don't do it for real */
// Marks the process as initialized and replaying, emits the init tracing
// events, registers every action handler with the replay engine, and honors
// an optional delayed start (3rd argv entry) before forcing a context switch.
928 void smpi_replay_init(int* argc, char*** argv)
930 simgrid::smpi::Process::init(argc, argv);
931 smpi_process()->mark_as_initialized();
932 smpi_process()->set_replaying(true);
934 int my_proc_id = Actor::self()->getPid();
935 TRACE_smpi_init(my_proc_id);
936 TRACE_smpi_computing_init(my_proc_id);
937 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
938 TRACE_smpi_comm_out(my_proc_id);
// Map each trace action keyword to its handler.
939 xbt_replay_action_register("init", simgrid::smpi::action_init);
940 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
941 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
942 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
943 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
944 xbt_replay_action_register("send", simgrid::smpi::action_send);
945 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
946 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
947 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
948 xbt_replay_action_register("test", simgrid::smpi::action_test);
949 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
950 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
951 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
952 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
953 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
954 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
955 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
956 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
957 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
958 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
959 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
960 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
961 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
962 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
963 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
964 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
966 //if we have a delayed start, sleep here.
968 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
969 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
970 smpi_execute_flops(value);
972 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
973 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
974 smpi_execute_flops(0.0);
978 /** @brief actually run the replay after initialization */
// Drives the replay of this process's trace, then drains any leftover
// asynchronous requests, releases per-actor state, and — for the last active
// process — prints the simulated time and frees the shared scratch buffers.
979 void smpi_replay_main(int* argc, char*** argv)
981 simgrid::xbt::replay_runner(*argc, *argv);
983 /* and now, finalize everything */
984 /* One active process will stop. Decrease the counter*/
985 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
986 if (not get_reqq_self()->empty()) {
987 unsigned int count_requests=get_reqq_self()->size();
988 MPI_Request requests[count_requests];
989 MPI_Status status[count_requests];
992 for (auto const& req : *get_reqq_self()) {
996 simgrid::smpi::Request::waitall(count_requests, requests, status);
998 delete get_reqq_self();
1001 if(active_processes==0){
1002 /* Last process alive speaking: end the simulated timer */
1003 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
1004 xbt_free(sendbuffer);
1005 xbt_free(recvbuffer);
1008 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1010 smpi_process()->finalize();
1012 TRACE_smpi_comm_out(Actor::self()->getPid());
1013 TRACE_smpi_finalize(Actor::self()->getPid());
1016 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: full replay = init + main with the same arguments.
1017 void smpi_replay_run(int* argc, char*** argv)
1019 smpi_replay_init(argc, argv);
1020 smpi_replay_main(argc, argv);