1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
20 using simgrid::s4u::Actor;
// Log channel for the SMPI trace-replay machinery.
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Global replay state shared by all action handlers below.
24 static int communicator_size = 0;
25 static int active_processes = 0;
// Per-actor queue of outstanding MPI_Requests, keyed by actor PID
// (see set_reqq_self()/get_reqq_self()).
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line carries no explicit datatype id.
28 static MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the trace line currently being replayed.
29 static MPI_Datatype MPI_CURRENT_TYPE;
// Grow-only scratch buffers shared by all sends/recvs during replay
// (see smpi_get_tmp_sendbuffer()/smpi_get_tmp_recvbuffer()).
31 static int sendbuffer_size = 0;
32 static char* sendbuffer = nullptr;
33 static int recvbuffer_size = 0;
34 static char* recvbuffer = nullptr;
// Log the replayed action (its tokens joined with spaces) at VERB level,
// together with the simulated time elapsed since `clock`.
36 static void log_timed_action (const char *const *action, double clock){
37 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
38 char *name = xbt_str_join_array(action, " ");
39 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request vector of the calling actor.
// Uses at(), so it throws std::out_of_range if action_init never registered
// a vector for this PID.
44 static std::vector<MPI_Request>* get_reqq_self()
46 return reqq.at(Actor::self()->getPid());
// Register `mpi_request` as the calling actor's pending-request vector.
// NOTE(review): unordered_map::insert is a no-op when the PID is already
// present — presumably each actor calls this exactly once (from action_init);
// verify, since a second call would leak the new vector.
49 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
51 reqq.insert({Actor::self()->getPid(), mpi_request});
54 //allocate a single buffer for all sends, growing it if needed
55 void* smpi_get_tmp_sendbuffer(int size)
// Outside of replay mode, fall back to a plain per-call allocation
// (the caller then owns the buffer; see smpi_free_tmp_buffer()).
57 if (not smpi_process()->replaying())
58 return xbt_malloc(size);
// Grow the shared scratch buffer when the request exceeds its capacity.
59 if (sendbuffer_size<size){
60 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
66 //allocate a single buffer for all recv
// Same scheme as smpi_get_tmp_sendbuffer(), for the receive side.
67 void* smpi_get_tmp_recvbuffer(int size){
68 if (not smpi_process()->replaying())
69 return xbt_malloc(size);
// Grow the shared scratch buffer when the request exceeds its capacity.
70 if (recvbuffer_size<size){
71 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release `buf` only when it was individually xbt_malloc'ed (non-replay
// mode); during replay the shared scratch buffers are kept for reuse.
77 void smpi_free_tmp_buffer(void* buf){
78 if (not smpi_process()->replaying())
// Parse `string` as a double via strtod; raises an `unknown_error` xbt
// exception when the token is not a valid double.
83 static double parse_double(const char *string)
86 double value = strtod(string, &endptr);
88 THROWF(unknown_error, 0, "%s is not a double", string);
93 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map the numeric datatype id found on a trace line to the corresponding
// MPI_Datatype; ids not handled by the switch fall back to MPI_DEFAULT_TYPE.
94 static MPI_Datatype decode_datatype(const char *const action)
96 switch(atoi(action)) {
119 return MPI_DEFAULT_TYPE;
// Inverse of decode_datatype(): turn a predefined datatype back into its
// numeric trace id, returned as a string (the return statements live on
// lines outside the visible span). Unknown types fall through to the
// "not implemented" default at the bottom.
124 const char* encode_datatype(MPI_Datatype datatype)
126 if (datatype==MPI_BYTE)
128 if(datatype==MPI_DOUBLE)
130 if(datatype==MPI_INT)
132 if(datatype==MPI_CHAR)
134 if(datatype==MPI_SHORT)
136 if(datatype==MPI_LONG)
138 if(datatype==MPI_FLOAT)
140 // default - not implemented.
141 // do not warn here as we pass in this function even for other trace formats
// Argument-count sanity check for a replay action line. The first two
// tokens of `action` are the process id and the action name; after them the
// action must carry `mandatory` required arguments and may carry up to
// `optional` extra ones. Throws arg_error otherwise.
145 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
147 while(action[i]!=nullptr)\
150 THROWF(arg_error, 0, "%s replay failed.\n" \
151 "%d items were given on the line. First two should be process_id and action. " \
152 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
153 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler for the "init" action: pick the default datatype (MPE traces use
// double, TAU traces use byte — the selecting condition is on lines outside
// the visible span), start the per-process simulated clock, record the
// number of active processes, and create this actor's request queue.
159 static void action_init(const char *const *action)
161 XBT_DEBUG("Initialize the counters");
162 CHECK_ACTION_PARAMS(action, 0, 1)
164 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
166 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
168 /* start a simulated timer */
169 smpi_process()->simulated_start();
170 /*initialize the number of active processes */
171 active_processes = smpi_process_count();
173 set_reqq_self(new std::vector<MPI_Request>);
// Handler for the "finalize" action (body lies outside the visible span).
176 static void action_finalize(const char *const *action)
// Handler for "comm_size": record the communicator size announced by the
// trace into the global `communicator_size`.
181 static void action_comm_size(const char *const *action)
183 communicator_size = parse_double(action[2]);
184 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for "comm_split": a no-op during replay — only logged.
187 static void action_comm_split(const char *const *action)
189 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for "comm_dup": a no-op during replay — only logged.
192 static void action_comm_dup(const char *const *action)
194 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler for "compute <flops>": simulate the given amount of computation,
// bracketing it with computing-state trace events.
197 static void action_compute(const char *const *action)
199 CHECK_ACTION_PARAMS(action, 1, 0)
200 double clock = smpi_process()->simulated_elapsed();
201 double flops= parse_double(action[2]);
202 int my_proc_id = Actor::self()->getPid();
204 TRACE_smpi_computing_in(my_proc_id, flops);
205 smpi_execute_flops(flops);
206 TRACE_smpi_computing_out(my_proc_id);
208 log_timed_action (action, clock);
// Handler for "send <to> <size> [datatype]": replay a blocking
// point-to-point send. The payload is fake (nullptr) — during replay only
// the message volume and timing matter, not the data.
211 static void action_send(const char *const *action)
213 CHECK_ACTION_PARAMS(action, 2, 1)
214 int to = atoi(action[2]);
215 double size=parse_double(action[3]);
216 double clock = smpi_process()->simulated_elapsed();
// Optional 4th argument selects the datatype; default otherwise.
218 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
220 int my_proc_id = Actor::self()->getPid();
221 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
223 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
224 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
225 if (not TRACE_smpi_view_internals())
226 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
228 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
230 TRACE_smpi_comm_out(my_proc_id);
232 log_timed_action(action, clock);
// Handler for "Isend <to> <size> [datatype]": non-blocking variant of
// action_send(). The resulting request is pushed onto this actor's queue so
// a later "wait"/"waitAll"/"test" action can complete it.
235 static void action_Isend(const char *const *action)
237 CHECK_ACTION_PARAMS(action, 2, 1)
238 int to = atoi(action[2]);
239 double size=parse_double(action[3]);
240 double clock = smpi_process()->simulated_elapsed();
242 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
244 int my_proc_id = Actor::self()->getPid();
245 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
246 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
247 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
248 if (not TRACE_smpi_view_internals())
249 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
251 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
253 TRACE_smpi_comm_out(my_proc_id);
255 get_reqq_self()->push_back(request);
257 log_timed_action (action, clock);
// Handler for "recv <from> <size> [datatype]": replay a blocking receive.
// A probe precedes the receive (the real size is unknown from the receiver
// side; the branch using the probed status lies outside the visible span).
260 static void action_recv(const char *const *action) {
261 CHECK_ACTION_PARAMS(action, 2, 1)
262 int from = atoi(action[2]);
263 double size=parse_double(action[3]);
264 double clock = smpi_process()->simulated_elapsed();
267 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
269 int my_proc_id = Actor::self()->getPid();
270 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
272 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
273 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
275 //unknown size from the receiver point of view
277 Request::probe(from, 0, MPI_COMM_WORLD, &status);
281 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
283 TRACE_smpi_comm_out(my_proc_id);
284 if (not TRACE_smpi_view_internals()) {
285 TRACE_smpi_recv(src_traced, my_proc_id, 0);
288 log_timed_action (action, clock);
// Handler for "Irecv <from> <size> [datatype]": non-blocking receive. The
// request is queued for a later "wait"/"waitAll"/"test" action.
291 static void action_Irecv(const char *const *action)
293 CHECK_ACTION_PARAMS(action, 2, 1)
294 int from = atoi(action[2]);
295 double size=parse_double(action[3]);
296 double clock = smpi_process()->simulated_elapsed();
298 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
300 int my_proc_id = Actor::self()->getPid();
301 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
302 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
304 //unknow size from the receiver pov
306 Request::probe(from, 0, MPI_COMM_WORLD, &status);
310 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
312 TRACE_smpi_comm_out(my_proc_id);
313 get_reqq_self()->push_back(request);
315 log_timed_action (action, clock);
// Handler for "test": pop the most recent pending request and MPI_Test it,
// then push it back so a later wait can still catch it (Request::test sets
// it to nullptr when the test succeeded).
318 static void action_test(const char* const* action)
320 CHECK_ACTION_PARAMS(action, 0, 0)
321 double clock = smpi_process()->simulated_elapsed();
324 MPI_Request request = get_reqq_self()->back();
325 get_reqq_self()->pop_back();
326 //if request is null here, this may mean that a previous test has succeeded
327 //Different times in traced application and replayed version may lead to this
328 //In this case, ignore the extra calls.
329 if(request!=nullptr){
330 int my_proc_id = Actor::self()->getPid();
331 TRACE_smpi_testing_in(my_proc_id);
333 int flag = Request::test(&request, &status);
335 XBT_DEBUG("MPI_Test result: %d", flag);
336 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
337 get_reqq_self()->push_back(request);
339 TRACE_smpi_testing_out(my_proc_id);
341 log_timed_action (action, clock);
// Handler for "wait": pop the most recent pending request and block on it.
// A nullptr request means an earlier "test" already completed the
// communication, in which case the wait is silently skipped.
344 static void action_wait(const char *const *action){
345 CHECK_ACTION_PARAMS(action, 0, 0)
346 double clock = smpi_process()->simulated_elapsed();
349 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
350 xbt_str_join_array(action," "));
351 MPI_Request request = get_reqq_self()->back();
352 get_reqq_self()->pop_back();
354 if (request==nullptr){
355 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture src/dst/kind before the wait, since Request::wait may free the
// request object.
359 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
361 MPI_Group group = request->comm()->group();
362 int src_traced = group->rank(request->src());
363 int dst_traced = group->rank(request->dst());
364 int is_wait_for_receive = (request->flags() & RECV);
365 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
367 Request::wait(&request, &status);
369 TRACE_smpi_comm_out(rank);
// Only a completed receive produces a matching recv trace event.
370 if (is_wait_for_receive)
371 TRACE_smpi_recv(src_traced, dst_traced, 0);
372 log_timed_action (action, clock);
// Handler for "waitAll": complete every pending request of this actor at
// once, emitting a recv trace event for each completed receive.
// NOTE(review): `MPI_Status status[count_requests]` and the recvs_* arrays
// are variable-length arrays — a GCC/Clang extension, not standard C++.
// NOTE(review): recvs_snd[i] is compared against -100 below, but the
// initialization to that sentinel is on lines outside the visible span —
// confirm non-RECV slots are indeed set to -100.
375 static void action_waitall(const char *const *action){
376 CHECK_ACTION_PARAMS(action, 0, 0)
377 double clock = smpi_process()->simulated_elapsed();
378 const unsigned int count_requests = get_reqq_self()->size();
380 if (count_requests>0) {
381 MPI_Status status[count_requests];
383 int my_proc_id_traced = Actor::self()->getPid();
384 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
385 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
386 int recvs_snd[count_requests];
387 int recvs_rcv[count_requests];
// Record endpoints of receive requests before waitall may free them.
388 for (unsigned int i = 0; i < count_requests; i++) {
389 const auto& req = (*get_reqq_self())[i];
390 if (req && (req->flags() & RECV)) {
391 recvs_snd[i] = req->src();
392 recvs_rcv[i] = req->dst();
396 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
398 for (unsigned i = 0; i < count_requests; i++) {
399 if (recvs_snd[i]!=-100)
400 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
402 TRACE_smpi_comm_out(my_proc_id_traced);
404 log_timed_action (action, clock);
// Handler for "barrier": replay a barrier over MPI_COMM_WORLD.
407 static void action_barrier(const char *const *action){
408 double clock = smpi_process()->simulated_elapsed();
409 int my_proc_id = Actor::self()->getPid();
410 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
412 Colls::barrier(MPI_COMM_WORLD);
414 TRACE_smpi_comm_out(my_proc_id);
415 log_timed_action (action, clock);
// Handler for "bcast <size> [root] [datatype]": replay a broadcast rooted
// at `root` (rank 0 by default) using the shared scratch send buffer.
418 static void action_bcast(const char *const *action)
420 CHECK_ACTION_PARAMS(action, 1, 2)
421 double size = parse_double(action[2]);
422 double clock = smpi_process()->simulated_elapsed();
423 int root = (action[3]) ? atoi(action[3]) : 0;
424 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
425 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
427 int my_proc_id = Actor::self()->getPid();
428 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
430 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
432 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
434 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
436 TRACE_smpi_comm_out(my_proc_id);
437 log_timed_action (action, clock);
// Handler for "reduce <comm_size> <comp_size> [root] [datatype]": replay a
// reduction followed by `comp_size` flops of local computation.
// NOTE(review): both recvbuf and sendbuf are obtained from
// smpi_get_tmp_sendbuffer(), which hands out the *same* shared buffer
// during replay — so they alias. Harmless here since the data is fake, but
// worth confirming this is intentional.
440 static void action_reduce(const char *const *action)
442 CHECK_ACTION_PARAMS(action, 2, 2)
443 double comm_size = parse_double(action[2]);
444 double comp_size = parse_double(action[3]);
445 double clock = smpi_process()->simulated_elapsed();
446 int root = (action[4]) ? atoi(action[4]) : 0;
448 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
450 int my_proc_id = Actor::self()->getPid();
451 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
452 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
453 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
455 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
456 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
457 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
458 smpi_execute_flops(comp_size);
460 TRACE_smpi_comm_out(my_proc_id);
461 log_timed_action (action, clock);
// Handler for "allReduce <comm_size> <comp_size> [datatype]": replay an
// allreduce followed by `comp_size` flops of local computation.
// NOTE(review): recvbuf and sendbuf both come from the shared replay send
// buffer and therefore alias (fake data, so presumably fine — confirm).
464 static void action_allReduce(const char *const *action) {
465 CHECK_ACTION_PARAMS(action, 2, 1)
466 double comm_size = parse_double(action[2]);
467 double comp_size = parse_double(action[3]);
469 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
471 double clock = smpi_process()->simulated_elapsed();
472 int my_proc_id = Actor::self()->getPid();
473 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
474 encode_datatype(MPI_CURRENT_TYPE), ""));
476 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
478 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
479 smpi_execute_flops(comp_size);
481 TRACE_smpi_comm_out(my_proc_id);
482 log_timed_action (action, clock);
// Handler for "allToAll <send_size> <recv_size> [sendtype] [recvtype]":
// replay an all-to-all with per-peer volumes taken from the trace.
485 static void action_allToAll(const char *const *action) {
486 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
487 double clock = smpi_process()->simulated_elapsed();
488 int comm_size = MPI_COMM_WORLD->size();
489 int send_size = parse_double(action[2]);
490 int recv_size = parse_double(action[3]);
// Datatypes are only honored when both optional ids are present.
491 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
492 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
494 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
495 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
497 int my_proc_id = Actor::self()->getPid();
498 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
499 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
500 encode_datatype(MPI_CURRENT_TYPE),
501 encode_datatype(MPI_CURRENT_TYPE2)));
503 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
505 TRACE_smpi_comm_out(my_proc_id);
506 log_timed_action (action, clock);
// Handler for "gather <sendcount> <recvcount> [root] [sendtype] [recvtype]":
// replay a gather; only the root allocates a receive buffer (the rank==root
// condition is on a line outside the visible span).
509 static void action_gather(const char *const *action) {
510 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
513 1) 68 is the sendcounts
514 2) 68 is the recvcounts
515 3) 0 is the root node
516 4) 0 is the send datatype id, see decode_datatype()
517 5) 0 is the recv datatype id, see decode_datatype()
519 CHECK_ACTION_PARAMS(action, 2, 3)
520 double clock = smpi_process()->simulated_elapsed();
521 int comm_size = MPI_COMM_WORLD->size();
522 int send_size = parse_double(action[2]);
523 int recv_size = parse_double(action[3]);
524 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
525 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
527 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
528 void *recv = nullptr;
529 int root = (action[4]) ? atoi(action[4]) : 0;
530 int rank = MPI_COMM_WORLD->rank();
533 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
535 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
536 encode_datatype(MPI_CURRENT_TYPE),
537 encode_datatype(MPI_CURRENT_TYPE2)));
539 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
541 TRACE_smpi_comm_out(Actor::self()->getPid());
542 log_timed_action (action, clock);
// Handler for "scatter <sendcount> <recvcount> [root] [sendtype] [recvtype]":
// replay a scatter rooted at `root` (rank 0 by default).
// FIX: the trace event was mislabeled "gather"; it now correctly reports
// "scatter" so timeline/TI traces name the right collective.
545 static void action_scatter(const char* const* action)
547 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
550 1) 68 is the sendcounts
551 2) 68 is the recvcounts
552 3) 0 is the root node
553 4) 0 is the send datatype id, see decode_datatype()
554 5) 0 is the recv datatype id, see decode_datatype()
556 CHECK_ACTION_PARAMS(action, 2, 3)
557 double clock = smpi_process()->simulated_elapsed();
558 int comm_size = MPI_COMM_WORLD->size();
559 int send_size = parse_double(action[2]);
560 int recv_size = parse_double(action[3]);
561 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
564 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565 void* recv = nullptr;
566 int root = (action[4]) ? atoi(action[4]) : 0;
567 int rank = MPI_COMM_WORLD->rank();
570 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
572 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
573 encode_datatype(MPI_CURRENT_TYPE),
574 encode_datatype(MPI_CURRENT_TYPE2)));
576 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
578 TRACE_smpi_comm_out(Actor::self()->getPid());
579 log_timed_action(action, clock);
// Handler for "gatherV <sendcount> <recvcounts...> <root> [types]": replay a
// gather with per-rank receive counts. Displacements are all zero (fake
// data), and only the root allocates a receive buffer (the rank==root check
// is on a line outside the visible span).
582 static void action_gatherv(const char *const *action) {
583 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
584 0 gather 68 68 10 10 10 0 0 0
586 1) 68 is the sendcount
587 2) 68 10 10 10 is the recvcounts
588 3) 0 is the root node
589 4) 0 is the send datatype id, see decode_datatype()
590 5) 0 is the recv datatype id, see decode_datatype()
592 double clock = smpi_process()->simulated_elapsed();
593 int comm_size = MPI_COMM_WORLD->size();
594 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
595 int send_size = parse_double(action[2]);
596 std::vector<int> disps(comm_size, 0);
597 int recvcounts[comm_size];
601 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
602 MPI_Datatype MPI_CURRENT_TYPE2{
603 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
605 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
606 void *recv = nullptr;
// Parse the per-rank receive counts and total them for buffer sizing.
607 for(int i=0;i<comm_size;i++) {
608 recvcounts[i] = atoi(action[i+3]);
609 recv_sum=recv_sum+recvcounts[i];
612 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
613 int rank = MPI_COMM_WORLD->rank();
616 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
// Ownership of trace_recvcounts passes to the TIData object.
618 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
620 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
621 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
622 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
624 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps.data(), MPI_CURRENT_TYPE2, root,
627 TRACE_smpi_comm_out(Actor::self()->getPid());
628 log_timed_action (action, clock);
// Handler for "scatterV <sendcounts...> <recvcount> <root> [types]": replay
// a scatterv with per-rank send counts; only the root allocates a send
// buffer (the rank==root check is on a line outside the visible span).
// FIX: the trace event was mislabeled "gatherV"; it now correctly reports
// "scatterV" so timeline/TI traces name the right collective.
// NOTE(review): the recv buffer is sized with MPI_CURRENT_TYPE and the send
// buffer with MPI_CURRENT_TYPE2, which looks swapped relative to the
// scatterv call below — confirm against the trace format before changing.
631 static void action_scatterv(const char* const* action)
633 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634 0 gather 68 10 10 10 68 0 0 0
636 1) 68 10 10 10 is the sendcounts
637 2) 68 is the recvcount
638 3) 0 is the root node
639 4) 0 is the send datatype id, see decode_datatype()
640 5) 0 is the recv datatype id, see decode_datatype()
642 double clock = smpi_process()->simulated_elapsed();
643 int comm_size = MPI_COMM_WORLD->size();
644 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645 int recv_size = parse_double(action[2 + comm_size]);
646 std::vector<int> disps(comm_size, 0);
647 int sendcounts[comm_size];
651 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
652 MPI_Datatype MPI_CURRENT_TYPE2{
653 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
655 void* send = nullptr;
656 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
// Parse the per-rank send counts and total them for buffer sizing.
657 for (int i = 0; i < comm_size; i++) {
658 sendcounts[i] = atoi(action[i + 2]);
659 send_sum += sendcounts[i];
662 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
663 int rank = MPI_COMM_WORLD->rank();
666 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
// Ownership of trace_sendcounts passes to the TIData object.
668 std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
670 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
671 "scatterV", root, -1, trace_sendcounts, recv_size, nullptr,
672 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
674 Colls::scatterv(send, sendcounts, disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
676 TRACE_smpi_comm_out(Actor::self()->getPid());
677 log_timed_action(action, clock);
// Handler for "reduceScatter <recvcounts...> <comp_size> [datatype]":
// replay a reduce-scatter with per-rank counts, followed by `comp_size`
// flops of local computation.
680 static void action_reducescatter(const char *const *action) {
681 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
682 0 reduceScatter 275427 275427 275427 204020 11346849 0
684 1) The first four values after the name of the action declare the recvcounts array
685 2) The value 11346849 is the amount of instructions
686 3) The last value corresponds to the datatype, see decode_datatype().
688 double clock = smpi_process()->simulated_elapsed();
689 int comm_size = MPI_COMM_WORLD->size();
690 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
691 int comp_size = parse_double(action[2+comm_size]);
692 int my_proc_id = Actor::self()->getPid();
// Ownership of trace_recvcounts passes to the TIData object below.
693 std::vector<int>* trace_recvcounts = new std::vector<int>;
694 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
696 for(int i=0;i<comm_size;i++) {
697 trace_recvcounts->push_back(atoi(action[i + 2]));
// Total element count, used to size both scratch buffers.
699 int size{std::accumulate(trace_recvcounts->begin(), trace_recvcounts->end(), 0)};
701 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
702 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
703 std::to_string(comp_size), /* ugly hack to print comp_size */
704 encode_datatype(MPI_CURRENT_TYPE)));
706 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
707 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
709 Colls::reduce_scatter(sendbuf, recvbuf, trace_recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
710 smpi_execute_flops(comp_size);
712 TRACE_smpi_comm_out(my_proc_id);
713 log_timed_action (action, clock);
// Handler for "allGather <sendcount> <recvcount> [sendtype] [recvtype]":
// replay an allgather over MPI_COMM_WORLD.
716 static void action_allgather(const char *const *action) {
717 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
718 0 allGather 275427 275427
720 1) 275427 is the sendcount
721 2) 275427 is the recvcount
722 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
724 double clock = smpi_process()->simulated_elapsed();
726 CHECK_ACTION_PARAMS(action, 2, 2)
727 int sendcount=atoi(action[2]);
728 int recvcount=atoi(action[3]);
730 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
731 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
733 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
734 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
736 int my_proc_id = Actor::self()->getPid();
738 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
739 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
740 encode_datatype(MPI_CURRENT_TYPE),
741 encode_datatype(MPI_CURRENT_TYPE2)));
743 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
745 TRACE_smpi_comm_out(my_proc_id);
746 log_timed_action (action, clock);
// Handler for "allGatherV <sendcount> <recvcounts...> [types]": replay an
// allgatherv with per-rank receive counts; displacements are all zero.
749 static void action_allgatherv(const char *const *action) {
750 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
751 0 allGatherV 275427 275427 275427 275427 204020
753 1) 275427 is the sendcount
754 2) The next four elements declare the recvcounts array
755 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
757 double clock = smpi_process()->simulated_elapsed();
759 int comm_size = MPI_COMM_WORLD->size();
760 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
761 int sendcount=atoi(action[2]);
762 int recvcounts[comm_size];
763 std::vector<int> disps(comm_size, 0);
767 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
768 MPI_Datatype MPI_CURRENT_TYPE2{
769 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
771 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
// Parse per-rank receive counts and total them for the recv buffer size.
773 for(int i=0;i<comm_size;i++) {
774 recvcounts[i] = atoi(action[i+3]);
775 recv_sum=recv_sum+recvcounts[i];
777 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
779 int my_proc_id = Actor::self()->getPid();
// Ownership of trace_recvcounts passes to the TIData object.
781 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
783 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
784 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
785 encode_datatype(MPI_CURRENT_TYPE),
786 encode_datatype(MPI_CURRENT_TYPE2)));
788 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps.data(), MPI_CURRENT_TYPE2,
791 TRACE_smpi_comm_out(my_proc_id);
792 log_timed_action (action, clock);
// Handler for "allToAllV <sendbuf_size> <sendcounts...> <recvbuf_size>
// <recvcounts...> [sendtype] [recvtype]": replay an alltoallv with per-peer
// counts; displacements are all zero (data is fake during replay).
// FIX: the recv datatype passed to Colls::alltoallv was MPI_CURRENT_TYPE
// although MPI_CURRENT_TYPE2 is the decoded receive datatype (it is already
// used to size recvbuf and reported in the trace event). Per the MPI
// Alltoallv signature, the 8th argument is the receive datatype.
795 static void action_allToAllv(const char *const *action) {
796 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
797 0 allToAllV 100 1 7 10 12 100 1 70 10 5
799 1) 100 is the size of the send buffer *sizeof(int),
800 2) 1 7 10 12 is the sendcounts array
801 3) 100*sizeof(int) is the size of the receiver buffer
802 4) 1 70 10 5 is the recvcounts array
804 double clock = smpi_process()->simulated_elapsed();
806 int comm_size = MPI_COMM_WORLD->size();
807 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
808 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
809 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
810 std::vector<int> senddisps(comm_size, 0);
811 std::vector<int> recvdisps(comm_size, 0);
813 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
814 ? decode_datatype(action[4 + 2 * comm_size])
816 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
817 ? decode_datatype(action[5 + 2 * comm_size])
820 int send_buf_size=parse_double(action[2]);
821 int recv_buf_size=parse_double(action[3+comm_size]);
822 int my_proc_id = Actor::self()->getPid();
823 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
824 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
// Parse the per-peer counts from the trace line.
826 for(int i=0;i<comm_size;i++) {
827 (*sendcounts)[i] = atoi(action[3 + i]);
828 (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
830 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
831 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
833 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
834 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
835 encode_datatype(MPI_CURRENT_TYPE),
836 encode_datatype(MPI_CURRENT_TYPE2)));
838 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
839 recvdisps.data(), MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
841 TRACE_smpi_comm_out(my_proc_id);
842 log_timed_action (action, clock);
845 }} // namespace simgrid::smpi
847 /** @brief Only initialize the replay, don't do it for real */
// Marks the process as initialized & replaying, sets up tracing, registers
// every action handler with the replay engine, optionally honors a delayed
// start (argv[2], in flops), and forces one context switch so that all
// processes finish their initialization before the replay begins.
848 void smpi_replay_init(int* argc, char*** argv)
850 simgrid::smpi::Process::init(argc, argv);
851 smpi_process()->mark_as_initialized();
852 smpi_process()->set_replaying(true);
854 int my_proc_id = Actor::self()->getPid();
855 TRACE_smpi_init(my_proc_id);
856 TRACE_smpi_computing_init(my_proc_id);
857 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
858 TRACE_smpi_comm_out(my_proc_id);
// Map each trace action name to its handler.
859 xbt_replay_action_register("init", simgrid::smpi::action_init);
860 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
861 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
862 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
863 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
864 xbt_replay_action_register("send", simgrid::smpi::action_send);
865 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
866 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
867 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
868 xbt_replay_action_register("test", simgrid::smpi::action_test);
869 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
870 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
871 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
872 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
873 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
874 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
875 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
876 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
877 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
878 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
879 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
880 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
881 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
882 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
883 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
884 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
886 //if we have a delayed start, sleep here.
888 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
889 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
890 smpi_execute_flops(value);
892 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
893 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
894 smpi_execute_flops(0.0);
898 /** @brief actually run the replay after initialization */
// Runs the replay loop, then drains any requests still pending for this
// actor, frees its request queue, lets the last process report the total
// simulated time and release the shared scratch buffers, and finalizes
// tracing for this actor.
899 void smpi_replay_main(int* argc, char*** argv)
901 simgrid::xbt::replay_runner(*argc, *argv);
903 /* and now, finalize everything */
904 /* One active process will stop. Decrease the counter*/
905 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
906 if (not get_reqq_self()->empty()) {
907 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): VLAs (non-standard C++); fine under GCC/Clang extensions.
908 MPI_Request requests[count_requests];
909 MPI_Status status[count_requests];
912 for (auto const& req : *get_reqq_self()) {
916 simgrid::smpi::Request::waitall(count_requests, requests, status);
918 delete get_reqq_self();
921 if(active_processes==0){
922 /* Last process alive speaking: end the simulated timer */
923 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
924 xbt_free(sendbuffer);
925 xbt_free(recvbuffer);
928 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
930 smpi_process()->finalize();
932 TRACE_smpi_comm_out(Actor::self()->getPid());
933 TRACE_smpi_finalize(Actor::self()->getPid());
936 /** @brief chain a replay initialization and a replay start */
937 void smpi_replay_run(int* argc, char*** argv)
939 smpi_replay_init(argc, argv);
940 smpi_replay_main(argc, argv);