1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24 static int communicator_size = 0;
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
29 static MPI_Datatype MPI_CURRENT_TYPE;
31 static int sendbuffer_size = 0;
32 static char* sendbuffer = nullptr;
33 static int recvbuffer_size = 0;
34 static char* recvbuffer = nullptr;
// Log, at VERBOSE level, how long (in simulated time) the given replayed action took.
// NOTE(review): this listing is truncated here — the joined `name` string is presumably
// freed and the braces closed on lines elided from this excerpt; confirm in the full file.
36 static void log_timed_action (const char *const *action, double clock){
37 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38 char *name = xbt_str_join_array(action, " ");
39 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request vector registered for the calling actor (keyed by PID).
// at() throws if no vector was registered — action_init() must have run first.
44 static std::vector<MPI_Request>* get_reqq_self()
46 return reqq.at(Actor::self()->getPid());
// Register `mpi_request` as the calling actor's pending-request vector.
// NOTE(review): unordered_map::insert does NOT overwrite an existing entry for this PID.
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
51 reqq.insert({Actor::self()->getPid(), mpi_request});
54 //allocate a single buffer for all sends, growing it if needed
// Outside replay mode the caller gets (and must free) a fresh allocation;
// in replay mode one shared static buffer is reused and grown on demand.
55 void* smpi_get_tmp_sendbuffer(int size)
57 if (not smpi_process()->replaying())
58 return xbt_malloc(size);
59 if (sendbuffer_size<size){
60 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
// NOTE(review): truncated — `sendbuffer_size` is presumably updated and
// `sendbuffer` returned on lines elided from this excerpt.
66 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer() for the receive side: plain malloc
// outside replay, a reused growing static buffer during replay.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (not smpi_process()->replaying())
69 return xbt_malloc(size);
70 if (recvbuffer_size<size){
71 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// NOTE(review): truncated — `recvbuffer_size` update and the return of
// `recvbuffer` are on elided lines.
// Free a buffer obtained from smpi_get_tmp_{send,recv}buffer().
// NOTE(review): truncated — presumably xbt_free(buf) only outside replay mode,
// since in replay mode the shared static buffers are freed at teardown.
77 void smpi_free_tmp_buffer(void* buf){
78 if (not smpi_process()->replaying())
// Parse `string` as a double; raises unknown_error when the text is not numeric.
// NOTE(review): the `endptr` declaration and the condition guarding the THROWF
// are on lines elided from this excerpt.
83 static double parse_double(const char *string)
86 double value = strtod(string, &endptr);
88 THROWF(unknown_error, 0, "%s is not a double", string);
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map the numeric datatype id carried by a trace field to an MPI datatype,
// caching the choice in the global MPI_CURRENT_TYPE (returned to the caller).
// NOTE(review): the switch's case labels and break statements are elided here.
94 static MPI_Datatype decode_datatype(const char *const action)
96 switch(atoi(action)) {
98 MPI_CURRENT_TYPE=MPI_DOUBLE;
101 MPI_CURRENT_TYPE=MPI_INT;
104 MPI_CURRENT_TYPE=MPI_CHAR;
107 MPI_CURRENT_TYPE=MPI_SHORT;
110 MPI_CURRENT_TYPE=MPI_LONG;
113 MPI_CURRENT_TYPE=MPI_FLOAT;
116 MPI_CURRENT_TYPE=MPI_BYTE;
119 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
122 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): turn an MPI datatype back into the string id
// used in trace files. NOTE(review): the return statements for each branch
// (and the fallback return) are on lines elided from this excerpt.
125 const char* encode_datatype(MPI_Datatype datatype)
127 if (datatype==MPI_BYTE)
129 if(datatype==MPI_DOUBLE)
131 if(datatype==MPI_INT)
133 if(datatype==MPI_CHAR)
135 if(datatype==MPI_SHORT)
137 if(datatype==MPI_LONG)
139 if(datatype==MPI_FLOAT)
141 // default - not implemented.
142 // do not warn here as we pass in this function even for other trace formats
// Validate that a trace line carries between `mandatory` and mandatory+optional
// arguments beyond the two leading fields (process_id and action name); throws
// arg_error otherwise. No comments can be placed inside the macro body because
// every line ends in a backslash continuation. The counting loop body and the
// comparison guarding the THROWF are on lines elided from this excerpt.
146 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
148 while(action[i]!=nullptr)\
151 THROWF(arg_error, 0, "%s replay failed.\n" \
152 "%d items were given on the line. First two should be process_id and action. " \
153 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
154 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay an "init" action: pick the default datatype (MPE vs TAU trace flavor),
// start the per-process simulated timer, count active processes, and create
// this actor's pending-request vector.
// NOTE(review): the condition selecting MPI_DOUBLE vs MPI_BYTE (presumably the
// optional argument test) is on lines elided from this excerpt.
160 static void action_init(const char *const *action)
162 XBT_DEBUG("Initialize the counters");
163 CHECK_ACTION_PARAMS(action, 0, 1)
165 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
167 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
169 /* start a simulated timer */
170 smpi_process()->simulated_start();
171 /*initialize the number of active processes */
172 active_processes = smpi_process_count();
174 set_reqq_self(new std::vector<MPI_Request>);
// Replay a "finalize" action (body elided from this excerpt — visibly a no-op here).
177 static void action_finalize(const char *const *action)
// Replay a "comm_size" action: record the communicator size announced by the trace.
182 static void action_comm_size(const char *const *action)
184 communicator_size = parse_double(action[2]);
185 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay a "comm_split" action: only logged, no communicator is actually split.
188 static void action_comm_split(const char *const *action)
190 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay a "comm_dup" action: only logged, no communicator is actually duplicated.
193 static void action_comm_dup(const char *const *action)
195 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay a "compute" action: burn the traced number of flops in simulation,
// bracketing the work with tracing events.
198 static void action_compute(const char *const *action)
200 CHECK_ACTION_PARAMS(action, 1, 0)
201 double clock = smpi_process()->simulated_elapsed();
202 double flops= parse_double(action[2]);
203 int my_proc_id = Actor::self()->getPid();
205 TRACE_smpi_computing_in(my_proc_id, flops);
206 smpi_execute_flops(flops);
207 TRACE_smpi_computing_out(my_proc_id);
209 log_timed_action (action, clock);
// Replay a blocking "send" action: fields are destination rank, message size,
// and an optional datatype id. The payload buffer is nullptr — only the
// simulated transfer matters, not the data.
212 static void action_send(const char *const *action)
214 CHECK_ACTION_PARAMS(action, 2, 1)
215 int to = atoi(action[2]);
216 double size=parse_double(action[3]);
217 double clock = smpi_process()->simulated_elapsed();
219 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
221 int my_proc_id = Actor::self()->getPid();
222 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
224 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
225 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
226 if (not TRACE_smpi_view_internals())
227 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
229 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
231 TRACE_smpi_comm_out(my_proc_id);
233 log_timed_action(action, clock);
// Replay a non-blocking "Isend" action: like action_send() but the request is
// stored in this actor's pending vector for a later test/wait/waitAll.
236 static void action_Isend(const char *const *action)
238 CHECK_ACTION_PARAMS(action, 2, 1)
239 int to = atoi(action[2]);
240 double size=parse_double(action[3]);
241 double clock = smpi_process()->simulated_elapsed();
243 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
245 int my_proc_id = Actor::self()->getPid();
246 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
247 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
248 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
249 if (not TRACE_smpi_view_internals())
250 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
252 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
254 TRACE_smpi_comm_out(my_proc_id);
256 get_reqq_self()->push_back(request);
258 log_timed_action (action, clock);
// Replay a blocking "recv" action: probe first (the receiver does not know the
// real size), then receive into a nullptr buffer.
// NOTE(review): the MPI_Status declaration and any use of the probed size are
// on lines elided from this excerpt.
261 static void action_recv(const char *const *action) {
262 CHECK_ACTION_PARAMS(action, 2, 1)
263 int from = atoi(action[2]);
264 double size=parse_double(action[3]);
265 double clock = smpi_process()->simulated_elapsed();
268 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
270 int my_proc_id = Actor::self()->getPid();
271 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
273 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
274 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
276 //unknown size from the receiver point of view
278 Request::probe(from, 0, MPI_COMM_WORLD, &status);
282 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
284 TRACE_smpi_comm_out(my_proc_id);
285 if (not TRACE_smpi_view_internals()) {
286 TRACE_smpi_recv(src_traced, my_proc_id, 0);
289 log_timed_action (action, clock);
// Replay a non-blocking "Irecv" action: probe for the real size, post the
// irecv, and store the request for a later test/wait/waitAll.
// NOTE(review): the MPI_Status declaration is on an elided line.
292 static void action_Irecv(const char *const *action)
294 CHECK_ACTION_PARAMS(action, 2, 1)
295 int from = atoi(action[2]);
296 double size=parse_double(action[3]);
297 double clock = smpi_process()->simulated_elapsed();
299 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
301 int my_proc_id = Actor::self()->getPid();
302 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
303 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
305 //unknow size from the receiver pov
307 Request::probe(from, 0, MPI_COMM_WORLD, &status);
311 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
313 TRACE_smpi_comm_out(my_proc_id);
314 get_reqq_self()->push_back(request);
316 log_timed_action (action, clock);
// Replay a "test" action on the most recently queued request. A nullptr
// request means an earlier test already succeeded (replay timing can differ
// from the traced run), so the extra call is ignored.
// NOTE(review): the MPI_Status declaration is on an elided line.
319 static void action_test(const char* const* action)
321 CHECK_ACTION_PARAMS(action, 0, 0)
322 double clock = smpi_process()->simulated_elapsed();
325 MPI_Request request = get_reqq_self()->back();
326 get_reqq_self()->pop_back();
327 //if request is null here, this may mean that a previous test has succeeded
328 //Different times in traced application and replayed version may lead to this
329 //In this case, ignore the extra calls.
330 if(request!=nullptr){
331 int my_proc_id = Actor::self()->getPid();
332 TRACE_smpi_testing_in(my_proc_id);
334 int flag = Request::test(&request, &status);
336 XBT_DEBUG("MPI_Test result: %d", flag);
337 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
338 get_reqq_self()->push_back(request);
340 TRACE_smpi_testing_out(my_proc_id);
342 log_timed_action (action, clock);
// Replay a "wait" action on the most recently queued request. A nullptr
// request means a prior MPI_Test already completed the communication.
// NOTE(review): the MPI_Status declaration and the early return for the
// nullptr case are on lines elided from this excerpt.
345 static void action_wait(const char *const *action){
346 CHECK_ACTION_PARAMS(action, 0, 0)
347 double clock = smpi_process()->simulated_elapsed();
350 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
351 xbt_str_join_array(action," "));
352 MPI_Request request = get_reqq_self()->back();
353 get_reqq_self()->pop_back();
355 if (request==nullptr){
356 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture src/dst/flags BEFORE waiting: Request::wait may destroy the request.
360 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
362 MPI_Group group = request->comm()->group();
363 int src_traced = group->rank(request->src());
364 int dst_traced = group->rank(request->dst());
365 int is_wait_for_receive = (request->flags() & RECV);
366 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
368 Request::wait(&request, &status);
370 TRACE_smpi_comm_out(rank);
371 if (is_wait_for_receive)
372 TRACE_smpi_recv(src_traced, dst_traced, 0);
373 log_timed_action (action, clock);
// Replay a "waitAll" action: wait on every queued request at once, recording
// the src/dst of receive requests beforehand so the matching TRACE_smpi_recv
// events can be emitted after completion.
// NOTE(review): the else-branch that presumably initializes recvs_snd[i] to the
// -100 sentinel for non-receive requests is on lines elided from this excerpt.
376 static void action_waitall(const char *const *action){
377 CHECK_ACTION_PARAMS(action, 0, 0)
378 double clock = smpi_process()->simulated_elapsed();
379 const unsigned int count_requests = get_reqq_self()->size();
381 if (count_requests>0) {
382 MPI_Status status[count_requests];
384 int my_proc_id_traced = Actor::self()->getPid();
385 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
386 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
387 int recvs_snd[count_requests];
388 int recvs_rcv[count_requests];
389 for (unsigned int i = 0; i < count_requests; i++) {
390 const auto& req = (*get_reqq_self())[i];
391 if (req && (req->flags() & RECV)) {
392 recvs_snd[i] = req->src();
393 recvs_rcv[i] = req->dst();
397 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
399 for (unsigned i = 0; i < count_requests; i++) {
400 if (recvs_snd[i]!=-100)
401 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
403 TRACE_smpi_comm_out(my_proc_id_traced);
405 log_timed_action (action, clock);
// Replay a "barrier" action on MPI_COMM_WORLD, bracketed with tracing events.
408 static void action_barrier(const char *const *action){
409 double clock = smpi_process()->simulated_elapsed();
410 int my_proc_id = Actor::self()->getPid();
411 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
413 Colls::barrier(MPI_COMM_WORLD);
415 TRACE_smpi_comm_out(my_proc_id);
416 log_timed_action (action, clock);
// Replay a "bcast" action: mandatory size, optional root (default 0) and
// optional datatype id (only honored when a root field is also present).
419 static void action_bcast(const char *const *action)
421 CHECK_ACTION_PARAMS(action, 1, 2)
422 double size = parse_double(action[2]);
423 double clock = smpi_process()->simulated_elapsed();
424 int root = (action[3]) ? atoi(action[3]) : 0;
425 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
426 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
428 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
430 int my_proc_id = Actor::self()->getPid();
431 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
432 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
433 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
435 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
437 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
439 TRACE_smpi_comm_out(my_proc_id);
440 log_timed_action (action, clock);
// Replay a "reduce" action: communication size, computation size, optional
// root and datatype. MPI_OP_NULL is used since only timing matters; the
// traced computation cost is then simulated explicitly.
443 static void action_reduce(const char *const *action)
445 CHECK_ACTION_PARAMS(action, 2, 2)
446 double comm_size = parse_double(action[2]);
447 double comp_size = parse_double(action[3]);
448 double clock = smpi_process()->simulated_elapsed();
449 int root = (action[4]) ? atoi(action[4]) : 0;
451 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
453 int my_proc_id = Actor::self()->getPid();
454 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
455 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
456 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
458 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
459 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
460 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
461 smpi_execute_flops(comp_size);
463 TRACE_smpi_comm_out(my_proc_id);
464 log_timed_action (action, clock);
// Replay an "allReduce" action: same scheme as action_reduce() but rootless;
// MPI_OP_NULL plus an explicit simulated flop count stand in for the real op.
467 static void action_allReduce(const char *const *action) {
468 CHECK_ACTION_PARAMS(action, 2, 1)
469 double comm_size = parse_double(action[2]);
470 double comp_size = parse_double(action[3]);
472 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
474 double clock = smpi_process()->simulated_elapsed();
475 int my_proc_id = Actor::self()->getPid();
476 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
477 encode_datatype(MPI_CURRENT_TYPE), ""));
479 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
480 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
481 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
482 smpi_execute_flops(comp_size);
484 TRACE_smpi_comm_out(my_proc_id);
485 log_timed_action (action, clock);
// Replay an "allToAll" action: per-peer send/recv volumes plus two optional
// datatype ids (both must be present for either to be honored).
488 static void action_allToAll(const char *const *action) {
489 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
490 double clock = smpi_process()->simulated_elapsed();
491 int comm_size = MPI_COMM_WORLD->size();
492 int send_size = parse_double(action[2]);
493 int recv_size = parse_double(action[3]);
494 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
495 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
497 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
498 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
500 int my_proc_id = Actor::self()->getPid();
501 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
502 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
503 encode_datatype(MPI_CURRENT_TYPE),
504 encode_datatype(MPI_CURRENT_TYPE2)));
506 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
508 TRACE_smpi_comm_out(my_proc_id);
509 log_timed_action (action, clock);
512 static void action_gather(const char *const *action) {
513 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
516 1) 68 is the sendcounts
517 2) 68 is the recvcounts
518 3) 0 is the root node
519 4) 0 is the send datatype id, see decode_datatype()
520 5) 0 is the recv datatype id, see decode_datatype()
// Only the root allocates a receive buffer; the guard `rank == root` that
// wraps the recv allocation is on a line elided from this excerpt.
522 CHECK_ACTION_PARAMS(action, 2, 3)
523 double clock = smpi_process()->simulated_elapsed();
524 int comm_size = MPI_COMM_WORLD->size();
525 int send_size = parse_double(action[2]);
526 int recv_size = parse_double(action[3]);
527 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
528 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
530 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
531 void *recv = nullptr;
532 int root = (action[4]) ? atoi(action[4]) : 0;
533 int rank = MPI_COMM_WORLD->rank();
536 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
538 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
539 encode_datatype(MPI_CURRENT_TYPE),
540 encode_datatype(MPI_CURRENT_TYPE2)));
542 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
544 TRACE_smpi_comm_out(Actor::self()->getPid());
545 log_timed_action (action, clock);
// Replay a "scatter" action (root distributes equal chunks to all ranks).
// FIX: the trace event was mislabeled "gather" — it now correctly reports
// "scatter", matching the collective actually executed below.
// Only the root allocates a receive buffer; the `rank == root` guard around
// the recv allocation is on a line elided from this excerpt.
548 static void action_scatter(const char* const* action)
550 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
553 1) 68 is the sendcounts
554 2) 68 is the recvcounts
555 3) 0 is the root node
556 4) 0 is the send datatype id, see decode_datatype()
557 5) 0 is the recv datatype id, see decode_datatype()
559 CHECK_ACTION_PARAMS(action, 2, 3)
560 double clock = smpi_process()->simulated_elapsed();
561 int comm_size = MPI_COMM_WORLD->size();
562 int send_size = parse_double(action[2]);
563 int recv_size = parse_double(action[3]);
564 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
565 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
567 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
568 void* recv = nullptr;
569 int root = (action[4]) ? atoi(action[4]) : 0;
570 int rank = MPI_COMM_WORLD->rank();
573 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
575 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
576 encode_datatype(MPI_CURRENT_TYPE),
577 encode_datatype(MPI_CURRENT_TYPE2)));
579 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
581 TRACE_smpi_comm_out(Actor::self()->getPid());
582 log_timed_action(action, clock);
585 static void action_gatherv(const char *const *action) {
586 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
587 0 gather 68 68 10 10 10 0 0 0
589 1) 68 is the sendcount
590 2) 68 10 10 10 is the recvcounts
591 3) 0 is the root node
592 4) 0 is the send datatype id, see decode_datatype()
593 5) 0 is the recv datatype id, see decode_datatype()
// NOTE(review): `recv_sum` initialization, the `rank == root` guard around the
// recv allocation, and the filling of `disps` are on lines elided from this
// excerpt; trace_recvcounts ownership passes to the VarCollTIData.
595 double clock = smpi_process()->simulated_elapsed();
596 int comm_size = MPI_COMM_WORLD->size();
597 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
598 int send_size = parse_double(action[2]);
599 int disps[comm_size];
600 int recvcounts[comm_size];
604 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
605 MPI_Datatype MPI_CURRENT_TYPE2{
606 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
608 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
609 void *recv = nullptr;
610 for(int i=0;i<comm_size;i++) {
611 recvcounts[i] = atoi(action[i+3]);
612 recv_sum=recv_sum+recvcounts[i];
616 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
617 int rank = MPI_COMM_WORLD->rank();
620 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
622 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
624 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
625 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
626 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
628 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
630 TRACE_smpi_comm_out(Actor::self()->getPid());
631 log_timed_action (action, clock);
// Replay a "scatterV" action (root sends per-rank variable-sized chunks).
// FIX: the trace event was mislabeled "gatherV" — it now correctly reports
// "scatterV", matching the Colls::scatterv call below.
// NOTE(review): `send_sum` initialization, the `rank == root` guard around the
// send allocation, and the filling of `disps` are on lines elided from this
// excerpt; trace_sendcounts ownership passes to the VarCollTIData.
634 static void action_scatterv(const char* const* action)
636 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
637 0 gather 68 10 10 10 68 0 0 0
639 1) 68 10 10 10 is the sendcounts
640 2) 68 is the recvcount
641 3) 0 is the root node
642 4) 0 is the send datatype id, see decode_datatype()
643 5) 0 is the recv datatype id, see decode_datatype()
645 double clock = smpi_process()->simulated_elapsed();
646 int comm_size = MPI_COMM_WORLD->size();
647 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
648 int recv_size = parse_double(action[2 + comm_size]);
649 int disps[comm_size];
650 int sendcounts[comm_size];
654 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
655 MPI_Datatype MPI_CURRENT_TYPE2{
656 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
658 void* send = nullptr;
659 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
660 for (int i = 0; i < comm_size; i++) {
661 sendcounts[i] = atoi(action[i + 2]);
662 send_sum += sendcounts[i];
666 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
667 int rank = MPI_COMM_WORLD->rank();
670 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
672 std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
674 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
675 "scatterV", root, -1, trace_sendcounts, recv_size, nullptr,
676 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
678 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
680 TRACE_smpi_comm_out(Actor::self()->getPid());
681 log_timed_action(action, clock);
684 static void action_reducescatter(const char *const *action) {
685 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
686 0 reduceScatter 275427 275427 275427 204020 11346849 0
688 1) The first four values after the name of the action declare the recvcounts array
689 2) The value 11346849 is the amount of instructions
690 3) The last value corresponds to the datatype, see decode_datatype().
// NOTE(review): `size` (used to dimension both temp buffers below) is not
// declared in the visible lines — it is presumably computed from recvcounts on
// lines elided from this excerpt; verify against the full file.
692 double clock = smpi_process()->simulated_elapsed();
693 int comm_size = MPI_COMM_WORLD->size();
694 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
695 int comp_size = parse_double(action[2+comm_size]);
696 int recvcounts[comm_size];
697 int my_proc_id = Actor::self()->getPid();
699 std::vector<int>* trace_recvcounts = new std::vector<int>;
700 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
702 for(int i=0;i<comm_size;i++) {
703 recvcounts[i] = atoi(action[i+2]);
704 trace_recvcounts->push_back(recvcounts[i]);
708 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
709 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
710 std::to_string(comp_size), /* ugly hack to print comp_size */
711 encode_datatype(MPI_CURRENT_TYPE)));
713 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
714 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
716 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
717 smpi_execute_flops(comp_size);
719 TRACE_smpi_comm_out(my_proc_id);
720 log_timed_action (action, clock);
723 static void action_allgather(const char *const *action) {
724 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
725 0 allGather 275427 275427
727 1) 275427 is the sendcount
728 2) 275427 is the recvcount
729 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
// Both optional datatype ids must be present for either to be honored.
731 double clock = smpi_process()->simulated_elapsed();
733 CHECK_ACTION_PARAMS(action, 2, 2)
734 int sendcount=atoi(action[2]);
735 int recvcount=atoi(action[3]);
737 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
738 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
740 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
741 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
743 int my_proc_id = Actor::self()->getPid();
745 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
746 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
747 encode_datatype(MPI_CURRENT_TYPE),
748 encode_datatype(MPI_CURRENT_TYPE2)));
750 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
752 TRACE_smpi_comm_out(my_proc_id);
753 log_timed_action (action, clock);
756 static void action_allgatherv(const char *const *action) {
757 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
758 0 allGatherV 275427 275427 275427 275427 204020
760 1) 275427 is the sendcount
761 2) The next four elements declare the recvcounts array
762 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
// NOTE(review): `recv_sum` initialization, the filling of `disps`, and the
// trailing MPI_COMM_WORLD argument of the allgatherv call are on lines elided
// from this excerpt; trace_recvcounts ownership passes to the VarCollTIData.
764 double clock = smpi_process()->simulated_elapsed();
766 int comm_size = MPI_COMM_WORLD->size();
767 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
768 int sendcount=atoi(action[2]);
769 int recvcounts[comm_size];
770 int disps[comm_size];
774 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
775 MPI_Datatype MPI_CURRENT_TYPE2{
776 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
778 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
780 for(int i=0;i<comm_size;i++) {
781 recvcounts[i] = atoi(action[i+3]);
782 recv_sum=recv_sum+recvcounts[i];
785 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
787 int my_proc_id = Actor::self()->getPid();
789 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
791 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
792 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
793 encode_datatype(MPI_CURRENT_TYPE),
794 encode_datatype(MPI_CURRENT_TYPE2)));
796 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
799 TRACE_smpi_comm_out(my_proc_id);
800 log_timed_action (action, clock);
// Replay an "allToAllV" action: per-peer variable send/recv counts, optional
// send/recv datatype ids after both count arrays.
// FIX: Colls::alltoallv was passed MPI_CURRENT_TYPE (the *send* datatype) as
// its receive datatype, while the receive buffer was sized with
// MPI_CURRENT_TYPE2. MPI_Alltoallv takes distinct sendtype/recvtype
// parameters; the recv side now uses MPI_CURRENT_TYPE2 consistently.
// NOTE(review): `send_size`/`recv_size` initialization and the third (default)
// operand of the two ?: datatype expressions are on lines elided from this
// excerpt; trace_{send,recv}counts ownership passes to the VarCollTIData.
803 static void action_allToAllv(const char *const *action) {
804 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
805 0 allToAllV 100 1 7 10 12 100 1 70 10 5
807 1) 100 is the size of the send buffer *sizeof(int),
808 2) 1 7 10 12 is the sendcounts array
809 3) 100*sizeof(int) is the size of the receiver buffer
810 4) 1 70 10 5 is the recvcounts array
812 double clock = smpi_process()->simulated_elapsed();
814 int comm_size = MPI_COMM_WORLD->size();
815 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
818 int sendcounts[comm_size];
819 std::vector<int>* trace_sendcounts = new std::vector<int>;
820 int recvcounts[comm_size];
821 std::vector<int>* trace_recvcounts = new std::vector<int>;
822 int senddisps[comm_size];
823 int recvdisps[comm_size];
825 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
826 ? decode_datatype(action[4 + 2 * comm_size])
828 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
829 ? decode_datatype(action[5 + 2 * comm_size])
832 int send_buf_size=parse_double(action[2]);
833 int recv_buf_size=parse_double(action[3+comm_size]);
834 int my_proc_id = Actor::self()->getPid();
835 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
836 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
838 for(int i=0;i<comm_size;i++) {
839 sendcounts[i] = atoi(action[i+3]);
840 trace_sendcounts->push_back(sendcounts[i]);
841 send_size += sendcounts[i];
842 recvcounts[i] = atoi(action[i+4+comm_size]);
843 trace_recvcounts->push_back(recvcounts[i]);
844 recv_size += recvcounts[i];
849 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
850 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
851 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
852 encode_datatype(MPI_CURRENT_TYPE2)));
854 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
855 MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
857 TRACE_smpi_comm_out(my_proc_id);
858 log_timed_action (action, clock);
861 }} // namespace simgrid::smpi
863 /** @brief Only initialize the replay, don't do it for real */
// Sets up the SMPI process in replay mode, emits the init tracing events,
// registers one handler per trace action name, honors an optional delayed
// start (argv[2]), and forces a context switch so all processes finish
// initialization before the replay proper begins.
// NOTE(review): the guard testing whether argv carries a delayed-start value
// is on a line elided from this excerpt.
864 void smpi_replay_init(int* argc, char*** argv)
866 simgrid::smpi::Process::init(argc, argv);
867 smpi_process()->mark_as_initialized();
868 smpi_process()->set_replaying(true);
870 int my_proc_id = Actor::self()->getPid();
871 TRACE_smpi_init(my_proc_id);
872 TRACE_smpi_computing_init(my_proc_id);
873 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
874 TRACE_smpi_comm_out(my_proc_id);
875 xbt_replay_action_register("init", simgrid::smpi::action_init);
876 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
877 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
878 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
879 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
880 xbt_replay_action_register("send", simgrid::smpi::action_send);
881 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
882 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
883 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
884 xbt_replay_action_register("test", simgrid::smpi::action_test);
885 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
886 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
887 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
888 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
889 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
890 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
891 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
892 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
893 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
894 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
895 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
896 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
897 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
898 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
899 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
900 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
902 //if we have a delayed start, sleep here.
904 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
905 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
906 smpi_execute_flops(value);
908 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
909 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
910 smpi_execute_flops(0.0);
914 /** @brief actually run the replay after initialization */
// Drives the trace through the registered handlers, then drains any requests
// still pending (draining keeps the simulation consistent before finalizing),
// decrements the active-process count, and lets the last process report the
// total simulated time and release the shared replay buffers.
// NOTE(review): the loop body copying each pending request into `requests`,
// and the active_processes decrement, are on lines elided from this excerpt.
915 void smpi_replay_main(int* argc, char*** argv)
917 simgrid::xbt::replay_runner(*argc, *argv);
919 /* and now, finalize everything */
920 /* One active process will stop. Decrease the counter*/
921 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
922 if (not get_reqq_self()->empty()) {
923 unsigned int count_requests=get_reqq_self()->size();
924 MPI_Request requests[count_requests];
925 MPI_Status status[count_requests];
928 for (auto const& req : *get_reqq_self()) {
932 simgrid::smpi::Request::waitall(count_requests, requests, status);
934 delete get_reqq_self();
937 if(active_processes==0){
938 /* Last process alive speaking: end the simulated timer */
939 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
940 xbt_free(sendbuffer);
941 xbt_free(recvbuffer);
944 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
946 smpi_process()->finalize();
948 TRACE_smpi_comm_out(Actor::self()->getPid());
949 TRACE_smpi_finalize(Actor::self()->getPid());
952 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: init then main, sharing the same argc/argv.
953 void smpi_replay_run(int* argc, char*** argv)
955 smpi_replay_init(argc, argv);
956 smpi_replay_main(argc, argv);