1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <boost/algorithm/string/join.hpp>
18 #include <unordered_map>
// Shorthand for simgrid::s4u::Actor; used in this file as Actor::self()->getPid().
21 using simgrid::s4u::Actor;
// Declares the "smpi_replay" logging subcategory (child of "smpi") used by the XBT_* log calls below.
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value parsed from the "comm_size" trace action (written by action_comm_size).
25 static int communicator_size = 0;
// Set to smpi_process_count() by action_init; smpi_replay_main compares it with 0 before printing the final time.
26 static int active_processes = 0;
// Outstanding non-blocking requests, one heap-allocated vector per actor PID.
// Entries are created by set_reqq_self() and read back by get_reqq_self().
27 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line does not name one; assigned in action_init.
29 static MPI_Datatype MPI_DEFAULT_TYPE;
// Validates the field count of a replay line.
// `action` must hold at least `mandatory` + 2 entries (process id, action name, then the mandatory
// arguments); otherwise an arg_error is raised through THROWF. `optional` only appears in the message.
// The message reports the calling function (__FUNCTION__) and the number of fields actually given.
31 #define CHECK_ACTION_PARAMS(action, mandatory, optional) \
33 if (action.size() < static_cast<unsigned long>(mandatory + 2)) \
34 THROWF(arg_error, 0, "%s replay failed.\n" \
35 "%lu items were given on the line. First two should be process_id and action. " \
36 "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n" \
37 "Please contact the Simgrid team if support is needed", \
38 __FUNCTION__, action.size(), static_cast<unsigned long>(mandatory), \
39 static_cast<unsigned long>(optional)); \
// Logs, at verbose level, the replayed line followed by the simulated time spent on it.
// `action` is joined with single spaces; `clock` is the simulated_elapsed() value sampled when the
// action started, so the printed number is simulated_elapsed() - clock.
// The join is only performed when verbose logging is enabled for the smpi_replay category.
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
// Returns the pending-request vector registered for the calling actor (looked up by its PID).
// unordered_map::at() throws std::out_of_range when no vector was registered for this PID.
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
// Registers `mpi_request` as the pending-request vector of the calling actor (keyed by its PID).
// NOTE(review): unordered_map::insert() leaves an existing entry untouched, so a second call for the
// same PID would not replace the stored pointer — confirm that each actor registers only once.
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
// Converts one trace field to double via xbt_str_parse_double(); the second argument is the
// error-message format handed to that helper.
// NOTE(review): the string is taken by value, so every call copies it; a const reference would suffice.
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Base type for per-action argument parsers. The visible parse() hook is virtual and does nothing
// by default. (Other members, if any, are not visible in this listing.)
69 class ActionArgParser {
71 virtual void parse(simgrid::xbt::ReplayAction& action){};
// Common skeleton for class-based replay actions.
// The constructor records the action name, the simulated time at construction (start_time) and the
// PID of the calling actor (my_proc_id).
// execute() ends by logging the elapsed simulated time through log_timed_action(); kernel() is the
// pure-virtual hook that concrete actions implement.
// NOTE(review): presumably execute() invokes kernel() before logging — those lines are not visible here.
74 template<class T> class ReplayAction {
76 const std::string name;
80 * Used to compute the duration of this action.
87 explicit ReplayAction(std::string name)
88 : name(name), start_time(smpi_process()->simulated_elapsed()), my_proc_id(simgrid::s4u::Actor::self()->getPid())
92 virtual void execute(simgrid::xbt::ReplayAction& action)
96 log_timed_action(action, start_time);
99 virtual void kernel(simgrid::xbt::ReplayAction& action) = 0;
// Replays a "wait" trace line: completes the most recently queued non-blocking request of this actor.
// kernel() asserts that the queue is non-empty, pops its last request, and — when the request is
// non-null — waits on it between TRACE_smpi_comm_in/out. A TRACE_smpi_recv event is emitted afterwards
// if the request carried the RECV flag.
102 class WaitAction : public ReplayAction<ActionArgParser> {
104 WaitAction() : ReplayAction("Wait") {}
105 void kernel(simgrid::xbt::ReplayAction& action) override
107 CHECK_ACTION_PARAMS(action, 0, 0)
110 std::string s = boost::algorithm::join(action, " ");
111 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
112 MPI_Request request = get_reqq_self()->back();
113 get_reqq_self()->pop_back();
115 if (request == nullptr) {
116 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
// NOTE(review): the next line guards against MPI_COMM_NULL, but the line after it dereferences
// request->comm() unconditionally — confirm a null communicator cannot reach this point.
121 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
123 MPI_Group group = request->comm()->group();
// Source/destination ranks and the RECV flag are read before Request::wait() is called;
// presumably the request is no longer usable afterwards — TODO confirm.
124 int src_traced = group->rank(request->src());
125 int dst_traced = group->rank(request->dst());
126 bool is_wait_for_receive = (request->flags() & RECV);
127 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
128 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
130 Request::wait(&request, &status);
132 TRACE_smpi_comm_out(rank);
133 if (is_wait_for_receive)
134 TRACE_smpi_recv(src_traced, dst_traced, 0);
// Replays an "init" trace line: <pid> init [extra].
// Chooses the default datatype (MPI_DOUBLE when an extra field is present, MPI_BYTE otherwise — the
// `else` itself sits on a line not shown in this listing), starts the simulated timer, records the
// number of processes in active_processes and registers an empty request vector for this actor.
138 static void action_init(simgrid::xbt::ReplayAction& action)
140 XBT_DEBUG("Initialize the counters");
141 CHECK_ACTION_PARAMS(action, 0, 1)
142 if (action.size() > 2)
143 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
145 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
147 /* start a simulated timer */
148 smpi_process()->simulated_start();
149 /*initialize the number of active processes */
150 active_processes = smpi_process_count();
// The vector allocated here is released with `delete get_reqq_self()` in smpi_replay_main.
152 set_reqq_self(new std::vector<MPI_Request>);
// Handler registered for the "finalize" trace action (see smpi_replay_init).
// Its body is not visible in this listing.
155 static void action_finalize(simgrid::xbt::ReplayAction& action)
// Replays a "comm_size" trace line: stores the third field in communicator_size and logs the line.
// The field is parsed as a double and implicitly converted to int.
// NOTE(review): no CHECK_ACTION_PARAMS precedes the action[2] access, unlike the other handlers.
160 static void action_comm_size(simgrid::xbt::ReplayAction& action)
162 communicator_size = parse_double(action[2]);
163 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replays a "comm_split" trace line. The visible code only logs the line; since the clock passed is
// sampled at the call itself, the logged duration is (approximately) zero.
166 static void action_comm_split(simgrid::xbt::ReplayAction& action)
168 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replays a "comm_dup" trace line. The visible code only logs the line; since the clock passed is
// sampled at the call itself, the logged duration is (approximately) zero.
171 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
173 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replays a "compute" trace line: <pid> compute <flops>.
// Executes the given amount of flops through smpi_execute_flops(), bracketed by
// TRACE_smpi_computing_in/out keyed by the actor PID, then logs the elapsed simulated time.
176 static void action_compute(simgrid::xbt::ReplayAction& action)
178 CHECK_ACTION_PARAMS(action, 1, 0)
179 double clock = smpi_process()->simulated_elapsed();
180 double flops= parse_double(action[2]);
181 int my_proc_id = Actor::self()->getPid();
183 TRACE_smpi_computing_in(my_proc_id, flops);
184 smpi_execute_flops(flops);
185 TRACE_smpi_computing_out(my_proc_id);
187 log_timed_action (action, clock);
// Replays a blocking "send" trace line: <pid> send <dst> <size> [datatype].
// action[2] is the destination rank in MPI_COMM_WORLD, action[3] the element count and the optional
// action[4] a datatype id decoded by Datatype::decode() (MPI_DEFAULT_TYPE when absent).
// The message is sent with a null buffer and tag 0.
190 static void action_send(simgrid::xbt::ReplayAction& action)
192 CHECK_ACTION_PARAMS(action, 2, 1)
193 int to = std::stoi(action[2]);
194 double size=parse_double(action[3]);
195 double clock = smpi_process()->simulated_elapsed();
197 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
199 int my_proc_id = Actor::self()->getPid();
// PID of the destination actor, used only for the TRACE_smpi_send event below.
200 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
202 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
203 new simgrid::instr::Pt2PtTIData("send", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
// The send event is emitted here only when internals are not being traced.
204 if (not TRACE_smpi_view_internals())
205 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
207 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
209 TRACE_smpi_comm_out(my_proc_id);
211 log_timed_action(action, clock);
// Replays a non-blocking "Isend" trace line: <pid> Isend <dst> <size> [datatype].
// Same field layout as action_send; the request returned by Request::isend() is appended to this
// actor's pending-request vector so that a later test/wait/waitAll action can complete it.
214 static void action_Isend(simgrid::xbt::ReplayAction& action)
216 CHECK_ACTION_PARAMS(action, 2, 1)
217 int to = std::stoi(action[2]);
218 double size=parse_double(action[3]);
219 double clock = smpi_process()->simulated_elapsed();
221 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
223 int my_proc_id = Actor::self()->getPid();
224 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
225 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
226 new simgrid::instr::Pt2PtTIData("Isend", to, size, Datatype::encode(MPI_CURRENT_TYPE)));
227 if (not TRACE_smpi_view_internals())
228 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
230 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
232 TRACE_smpi_comm_out(my_proc_id);
234 get_reqq_self()->push_back(request);
236 log_timed_action (action, clock);
// Replays a blocking "recv" trace line: <pid> recv <src> <size> [datatype].
// action[2] is the source rank in MPI_COMM_WORLD, action[3] the element count and the optional
// action[4] a datatype id. The message is received into a null buffer with tag 0.
239 static void action_recv(simgrid::xbt::ReplayAction& action)
241 CHECK_ACTION_PARAMS(action, 2, 1)
242 int from = std::stoi(action[2]);
243 double size=parse_double(action[3]);
244 double clock = smpi_process()->simulated_elapsed();
247 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
249 int my_proc_id = Actor::self()->getPid();
250 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
252 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
253 new simgrid::instr::Pt2PtTIData("recv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
// NOTE(review): the probe below is presumably guarded by a check on `size` and used to recover the
// real message size — the guard and the `status` declaration are on lines not shown here.
255 //unknown size from the receiver point of view
257 Request::probe(from, 0, MPI_COMM_WORLD, &status);
261 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
263 TRACE_smpi_comm_out(my_proc_id);
264 if (not TRACE_smpi_view_internals()) {
265 TRACE_smpi_recv(src_traced, my_proc_id, 0);
268 log_timed_action (action, clock);
// Replays a non-blocking "Irecv" trace line: <pid> Irecv <src> <size> [datatype].
// Same field layout as action_recv; the request returned by Request::irecv() is appended to this
// actor's pending-request vector for a later test/wait/waitAll action.
271 static void action_Irecv(simgrid::xbt::ReplayAction& action)
273 CHECK_ACTION_PARAMS(action, 2, 1)
274 int from = std::stoi(action[2]);
275 double size=parse_double(action[3]);
276 double clock = smpi_process()->simulated_elapsed();
278 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
280 int my_proc_id = Actor::self()->getPid();
281 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
282 new simgrid::instr::Pt2PtTIData("Irecv", from, size, Datatype::encode(MPI_CURRENT_TYPE)));
// NOTE(review): as in action_recv, the probe is presumably conditional on `size`; the guard is on
// lines not shown in this listing.
284 //unknow size from the receiver pov
286 Request::probe(from, 0, MPI_COMM_WORLD, &status);
290 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
292 TRACE_smpi_comm_out(my_proc_id);
293 get_reqq_self()->push_back(request);
295 log_timed_action (action, clock);
// Replays a "test" trace line: tests the most recently queued request of this actor.
// The request is popped, tested when non-null, and pushed back afterwards so that a later wait can
// pick it up (it is nullptr at that point if the test succeeded, per the comment below).
// NOTE(review): unlike WaitAction::kernel there is no visible check that the queue is non-empty
// before back()/pop_back(); calling back() on an empty vector is undefined behaviour.
298 static void action_test(simgrid::xbt::ReplayAction& action)
300 CHECK_ACTION_PARAMS(action, 0, 0)
301 double clock = smpi_process()->simulated_elapsed();
304 MPI_Request request = get_reqq_self()->back();
305 get_reqq_self()->pop_back();
306 //if request is null here, this may mean that a previous test has succeeded
307 //Different times in traced application and replayed version may lead to this
308 //In this case, ignore the extra calls.
309 if(request!=nullptr){
310 int my_proc_id = Actor::self()->getPid();
311 TRACE_smpi_testing_in(my_proc_id);
313 int flag = Request::test(&request, &status);
315 XBT_DEBUG("MPI_Test result: %d", flag);
316 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
317 get_reqq_self()->push_back(request);
319 TRACE_smpi_testing_out(my_proc_id);
321 log_timed_action (action, clock);
// Handler registered for the "wait" trace action: delegates to a temporary Replay::WaitAction.
324 static void action_wait(simgrid::xbt::ReplayAction& action)
326 Replay::WaitAction().execute(action);
// Replays a "waitAll" trace line: waits for every request currently queued for this actor.
// Before waiting, the source/destination of each receive request is saved so that a TRACE_smpi_recv
// event can be emitted for it once Request::waitall() has returned.
// NOTE(review): status/recvs_snd/recvs_rcv are variable-length arrays, which is a compiler extension
// in C++ rather than standard C++.
329 static void action_waitall(simgrid::xbt::ReplayAction& action)
331 CHECK_ACTION_PARAMS(action, 0, 0)
332 double clock = smpi_process()->simulated_elapsed();
333 const unsigned int count_requests = get_reqq_self()->size();
335 if (count_requests>0) {
336 MPI_Status status[count_requests];
338 int my_proc_id_traced = Actor::self()->getPid();
339 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
340 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
341 int recvs_snd[count_requests];
342 int recvs_rcv[count_requests];
343 for (unsigned int i = 0; i < count_requests; i++) {
344 const auto& req = (*get_reqq_self())[i];
345 if (req && (req->flags() & RECV)) {
346 recvs_snd[i] = req->src();
347 recvs_rcv[i] = req->dst();
351 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
// -100 is the marker for "not a receive"; presumably it is stored in the else-branch of the loop
// above, on lines not shown in this listing — TODO confirm.
353 for (unsigned i = 0; i < count_requests; i++) {
354 if (recvs_snd[i]!=-100)
355 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
357 TRACE_smpi_comm_out(my_proc_id_traced);
359 log_timed_action (action, clock);
// Replays a "barrier" trace line: runs Colls::barrier() on MPI_COMM_WORLD between
// TRACE_smpi_comm_in/out, then logs the elapsed simulated time.
362 static void action_barrier(simgrid::xbt::ReplayAction& action)
364 double clock = smpi_process()->simulated_elapsed();
365 int my_proc_id = Actor::self()->getPid();
366 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
368 Colls::barrier(MPI_COMM_WORLD);
370 TRACE_smpi_comm_out(my_proc_id);
371 log_timed_action (action, clock);
// Replays a "bcast" trace line: <pid> bcast <size> [root [datatype]].
// action[2] is the element count, action[3] the root rank (0 when absent) and action[4] a datatype id
// (MPI_DEFAULT_TYPE when absent). The broadcast runs on MPI_COMM_WORLD from a temporary send buffer.
374 static void action_bcast(simgrid::xbt::ReplayAction& action)
376 CHECK_ACTION_PARAMS(action, 1, 2)
377 double size = parse_double(action[2]);
378 double clock = smpi_process()->simulated_elapsed();
379 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
380 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
381 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
383 int my_proc_id = Actor::self()->getPid();
// The trace record stores the PID of the root actor rather than its rank.
384 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
385 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
386 -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
388 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
390 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
392 TRACE_smpi_comm_out(my_proc_id);
393 log_timed_action (action, clock);
// Replays a "reduce" trace line: <pid> reduce <comm_size> <comp_size> [root [datatype]].
// action[2] is the number of elements communicated, action[3] the amount of flops executed after the
// reduction, action[4] the root rank (0 when absent) and action[5] a datatype id (MPI_DEFAULT_TYPE
// when absent). The reduction runs on MPI_COMM_WORLD with MPI_OP_NULL.
// The receive buffer is taken from smpi_get_tmp_recvbuffer(), as in action_allReduce, instead of
// requesting the temporary send buffer twice.
396 static void action_reduce(simgrid::xbt::ReplayAction& action)
398 CHECK_ACTION_PARAMS(action, 2, 2)
399 double comm_size = parse_double(action[2]);
400 double comp_size = parse_double(action[3]);
401 double clock = smpi_process()->simulated_elapsed();
402 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
404 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE;
406 int my_proc_id = Actor::self()->getPid();
// The trace record stores the PID of the root actor rather than its rank.
407 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
408 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
409 comm_size, -1, Datatype::encode(MPI_CURRENT_TYPE), ""));
411 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
412 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
413 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
// The computation part of the reduction is simulated separately, after the communication.
414 smpi_execute_flops(comp_size);
416 TRACE_smpi_comm_out(my_proc_id);
417 log_timed_action (action, clock);
// Replays an "allReduce" trace line: <pid> allReduce <comm_size> <comp_size> [datatype].
// action[2] is the number of elements communicated, action[3] the amount of flops executed after the
// reduction and action[4] an optional datatype id. Runs on MPI_COMM_WORLD with MPI_OP_NULL.
420 static void action_allReduce(simgrid::xbt::ReplayAction& action)
422 CHECK_ACTION_PARAMS(action, 2, 1)
423 double comm_size = parse_double(action[2]);
424 double comp_size = parse_double(action[3]);
426 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE;
428 double clock = smpi_process()->simulated_elapsed();
429 int my_proc_id = Actor::self()->getPid();
430 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
431 Datatype::encode(MPI_CURRENT_TYPE), ""));
433 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
434 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
435 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
// The computation part of the reduction is simulated separately, after the communication.
436 smpi_execute_flops(comp_size);
438 TRACE_smpi_comm_out(my_proc_id);
439 log_timed_action (action, clock);
// Replays an "allToAll" trace line: <pid> allToAll <send_size> <recv_size> [send_type recv_type].
// The two datatype ids are only honoured when both are present (action.size() > 5); otherwise both
// sides use MPI_DEFAULT_TYPE. Buffers are sized for comm_size blocks of send_size / recv_size elements.
442 static void action_allToAll(simgrid::xbt::ReplayAction& action)
444 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
445 double clock = smpi_process()->simulated_elapsed();
446 unsigned long comm_size = MPI_COMM_WORLD->size();
447 int send_size = parse_double(action[2]);
448 int recv_size = parse_double(action[3]);
449 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
450 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
452 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
453 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
455 int my_proc_id = Actor::self()->getPid();
456 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
457 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
458 Datatype::encode(MPI_CURRENT_TYPE),
459 Datatype::encode(MPI_CURRENT_TYPE2)));
461 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
463 TRACE_smpi_comm_out(my_proc_id);
464 log_timed_action (action, clock);
// Replays a "gather" trace line: <pid> gather <send_size> <recv_size> [root [send_type recv_type]].
// action[4] is the root rank (0 when absent); the datatype ids at action[5]/action[6] are only
// honoured when both are present (action.size() > 6).
// NOTE(review): the trace-in event is keyed by the MPI rank while the trace-out event is keyed by the
// actor PID — confirm both identify the same trace container.
467 static void action_gather(simgrid::xbt::ReplayAction& action)
469 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
472 1) 68 is the sendcounts
473 2) 68 is the recvcounts
474 3) 0 is the root node
475 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
476 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
478 CHECK_ACTION_PARAMS(action, 2, 3)
479 double clock = smpi_process()->simulated_elapsed();
480 unsigned long comm_size = MPI_COMM_WORLD->size();
481 int send_size = parse_double(action[2]);
482 int recv_size = parse_double(action[3]);
483 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
484 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
486 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
487 void *recv = nullptr;
488 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
489 int rank = MPI_COMM_WORLD->rank();
// Receive buffer for comm_size blocks; presumably allocated on the root only (the guard is on a line
// not shown in this listing) — TODO confirm.
492 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
494 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
495 Datatype::encode(MPI_CURRENT_TYPE),
496 Datatype::encode(MPI_CURRENT_TYPE2)));
498 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
500 TRACE_smpi_comm_out(Actor::self()->getPid());
501 log_timed_action (action, clock);
// Replays a "scatter" trace line: <pid> scatter <send_size> <recv_size> [root [send_type recv_type]].
// action[4] is the root rank (0 when absent); the datatype ids at action[5]/action[6] are only
// honoured when both are present (action.size() > 6).
// The trace record is labelled "scatter", matching the name this handler is registered under; it
// used to be recorded as "gather".
// NOTE(review): the trace-in event is keyed by the MPI rank while the trace-out event is keyed by the
// actor PID — confirm both identify the same trace container.
// NOTE(review): buffer sizing mirrors action_gather (one block sent, comm_size blocks received);
// for a scatter the root would be expected to send comm_size blocks — confirm whether this matters
// for the temporary replay buffers.
504 static void action_scatter(simgrid::xbt::ReplayAction& action)
506 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
509 1) 68 is the sendcounts
510 2) 68 is the recvcounts
511 3) 0 is the root node
512 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
513 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
515 CHECK_ACTION_PARAMS(action, 2, 3)
516 double clock = smpi_process()->simulated_elapsed();
517 unsigned long comm_size = MPI_COMM_WORLD->size();
518 int send_size = parse_double(action[2]);
519 int recv_size = parse_double(action[3]);
520 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
521 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? simgrid::smpi::Datatype::decode(action[6]) : MPI_DEFAULT_TYPE};
523 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
524 void* recv = nullptr;
525 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
526 int rank = MPI_COMM_WORLD->rank();
529 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
531 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("scatter", root, -1.0, send_size, recv_size,
532 Datatype::encode(MPI_CURRENT_TYPE),
533 Datatype::encode(MPI_CURRENT_TYPE2)));
535 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
537 TRACE_smpi_comm_out(Actor::self()->getPid());
538 log_timed_action(action, clock);
// Replays a "gatherV" trace line.
// Field layout as read by the code: action[2] is the send count, action[3 .. 3+comm_size-1] the
// per-rank receive counts, action[3+comm_size] the root rank (0 when absent), and
// action[4+comm_size] / action[5+comm_size] the send/recv datatype ids (honoured only when both are
// present). All displacements passed to Colls::gatherv() are zero.
// NOTE(review): the example line in the comment below says `gather`, but this handler is registered
// under "gatherV".
// NOTE(review): the trace-in event is keyed by the MPI rank while the trace-out event is keyed by the
// actor PID — confirm both identify the same trace container.
541 static void action_gatherv(simgrid::xbt::ReplayAction& action)
543 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
544 0 gather 68 68 10 10 10 0 0 0
546 1) 68 is the sendcount
547 2) 68 10 10 10 is the recvcounts
548 3) 0 is the root node
549 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
550 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
552 double clock = smpi_process()->simulated_elapsed();
553 unsigned long comm_size = MPI_COMM_WORLD->size();
554 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
555 int send_size = parse_double(action[2]);
556 std::vector<int> disps(comm_size, 0);
// Held in a shared_ptr because the same vector is also handed to the VarCollTIData trace record.
557 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
559 MPI_Datatype MPI_CURRENT_TYPE =
560 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
561 MPI_Datatype MPI_CURRENT_TYPE2{
562 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
564 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
565 void *recv = nullptr;
566 for (unsigned int i = 0; i < comm_size; i++) {
567 (*recvcounts)[i] = std::stoi(action[i + 3]);
569 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
571 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
572 int rank = MPI_COMM_WORLD->rank();
// Receive buffer large enough for the sum of all receive counts; presumably allocated on the root
// only (the guard is on a line not shown in this listing) — TODO confirm.
575 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
577 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
578 "gatherV", root, send_size, nullptr, -1, recvcounts,
579 Datatype::encode(MPI_CURRENT_TYPE), Datatype::encode(MPI_CURRENT_TYPE2)));
581 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
584 TRACE_smpi_comm_out(Actor::self()->getPid());
585 log_timed_action (action, clock);
// Replays a "scatterV" trace line.
// Field layout as read by the code: action[2 .. 2+comm_size-1] are the per-rank send counts,
// action[2+comm_size] the receive count, action[3+comm_size] the root rank (0 when absent), and
// action[4+comm_size] / action[5+comm_size] the send/recv datatype ids (honoured only when both are
// present). All displacements passed to Colls::scatterv() are zero.
// Changes against the previous version:
//  - the trace record is labelled "scatterV" (it used to be recorded as "gatherV");
//  - the receive buffer is sized with the receive datatype (MPI_CURRENT_TYPE2) and the send buffer
//    with the send datatype (MPI_CURRENT_TYPE), matching the argument order of Colls::scatterv();
//    the two were swapped, which only went unnoticed while both types had the same size.
// NOTE(review): the trace-in event is keyed by the MPI rank while the trace-out event is keyed by the
// actor PID — confirm both identify the same trace container.
588 static void action_scatterv(simgrid::xbt::ReplayAction& action)
590 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
591 0 scatterV 68 10 10 10 68 0 0 0
593 1) 68 10 10 10 is the sendcounts
594 2) 68 is the recvcount
595 3) 0 is the root node
596 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
597 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
599 double clock = smpi_process()->simulated_elapsed();
600 unsigned long comm_size = MPI_COMM_WORLD->size();
601 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
602 int recv_size = parse_double(action[2 + comm_size]);
603 std::vector<int> disps(comm_size, 0);
// Held in a shared_ptr because the same vector is also handed to the VarCollTIData trace record.
604 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
606 MPI_Datatype MPI_CURRENT_TYPE =
607 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
608 MPI_Datatype MPI_CURRENT_TYPE2{
609 (action.size() > 5 + comm_size) ? simgrid::smpi::Datatype::decode(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
611 void* send = nullptr;
612 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE2->size());
613 for (unsigned int i = 0; i < comm_size; i++) {
614 (*sendcounts)[i] = std::stoi(action[i + 2]);
616 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
618 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
619 int rank = MPI_COMM_WORLD->rank();
// Send buffer large enough for the sum of all send counts; presumably allocated on the root only
// (the guard is on a line not shown in this listing) — TODO confirm.
622 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE->size());
624 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData("scatterV", root, -1, sendcounts, recv_size,
625 nullptr, Datatype::encode(MPI_CURRENT_TYPE),
626 Datatype::encode(MPI_CURRENT_TYPE2)));
628 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
631 TRACE_smpi_comm_out(Actor::self()->getPid());
632 log_timed_action(action, clock);
// Replays a "reduceScatter" trace line.
// Field layout as read by the code: action[2 .. 2+comm_size-1] are the per-rank receive counts,
// action[2+comm_size] the amount of flops executed after the communication, and the optional
// action[3+comm_size] a datatype id. Both temporary buffers are sized for the sum of the counts.
635 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
637 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
638 0 reduceScatter 275427 275427 275427 204020 11346849 0
640 1) The first four values after the name of the action declare the recvcounts array
641 2) The value 11346849 is the amount of instructions
642 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
644 double clock = smpi_process()->simulated_elapsed();
645 unsigned long comm_size = MPI_COMM_WORLD->size();
646 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
647 int comp_size = parse_double(action[2+comm_size]);
648 int my_proc_id = Actor::self()->getPid();
// Held in a shared_ptr because the same vector is also handed to the VarCollTIData trace record.
649 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
650 MPI_Datatype MPI_CURRENT_TYPE =
651 (action.size() > 3 + comm_size) ? simgrid::smpi::Datatype::decode(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
653 for (unsigned int i = 0; i < comm_size; i++) {
654 recvcounts->push_back(std::stoi(action[i + 2]));
656 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
// The send-datatype slot of the trace record is reused to carry comp_size as a string.
658 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
659 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
660 std::to_string(comp_size), /* ugly hack to print comp_size */
661 Datatype::encode(MPI_CURRENT_TYPE)));
663 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
664 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
666 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
667 smpi_execute_flops(comp_size);
669 TRACE_smpi_comm_out(my_proc_id);
670 log_timed_action (action, clock);
// Replays an "allGather" trace line: <pid> allGather <sendcount> <recvcount> [send_type recv_type].
// The two datatype ids are only honoured when both are present (action.size() > 5).
// NOTE(review): the receive buffer is sized for a single block of recvcount elements, whereas
// MPI_Allgather receives recvcount elements from every rank — confirm this is acceptable for the
// temporary replay buffers (action_allToAll multiplies by comm_size).
673 static void action_allgather(simgrid::xbt::ReplayAction& action)
675 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
676 0 allGather 275427 275427
678 1) 275427 is the sendcount
679 2) 275427 is the recvcount
680 3) No more values mean that the datatype for sent and receive buffer is the default one, see
681 simgrid::smpi::Datatype::decode().
683 double clock = smpi_process()->simulated_elapsed();
685 CHECK_ACTION_PARAMS(action, 2, 2)
686 int sendcount = std::stoi(action[2]);
687 int recvcount = std::stoi(action[3]);
689 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[4]) : MPI_DEFAULT_TYPE};
690 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? simgrid::smpi::Datatype::decode(action[5]) : MPI_DEFAULT_TYPE};
692 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
693 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
695 int my_proc_id = Actor::self()->getPid();
697 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
698 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
699 Datatype::encode(MPI_CURRENT_TYPE),
700 Datatype::encode(MPI_CURRENT_TYPE2)));
702 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
704 TRACE_smpi_comm_out(my_proc_id);
705 log_timed_action (action, clock);
// Replays an "allGatherV" trace line.
// Field layout as read by the code: action[2] is the send count and action[3 .. 3+comm_size-1] the
// per-rank receive counts; what follows may be a datatype id and/or comm_size displacements,
// selected by the if/else-if chain on action.size() below.
// NOTE(review): the first two conditions of that chain are identical
// (action.size() > 3 + 2 * comm_size), so the "disps specified; datatype is not specified" branch
// can never be taken. The intended threshold for it cannot be determined from this listing.
// NOTE(review): MPI_CURRENT_TYPE and MPI_CURRENT_TYPE2 are both decoded from action[datatype_index],
// i.e. send and receive always use the same datatype here.
708 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
710 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
711 0 allGatherV 275427 275427 275427 275427 204020
713 1) 275427 is the sendcount
714 2) The next four elements declare the recvcounts array
715 3) No more values mean that the datatype for sent and receive buffer is the default one, see
716 simgrid::smpi::Datatype::decode().
718 double clock = smpi_process()->simulated_elapsed();
720 unsigned long comm_size = MPI_COMM_WORLD->size();
721 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
722 int sendcount = std::stoi(action[2]);
// Held in a shared_ptr because the same vector is also handed to the VarCollTIData trace record.
723 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
724 std::vector<int> disps(comm_size, 0);
// An index of 0 means "not present on the line"; the corresponding default is then used.
726 int datatype_index = 0, disp_index = 0;
727 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
728 datatype_index = 3 + comm_size;
729 disp_index = datatype_index + 1;
730 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
732 disp_index = 3 + comm_size;
733 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
734 datatype_index = 3 + comm_size;
737 if (disp_index != 0) {
738 for (unsigned int i = 0; i < comm_size; i++)
739 disps[i] = std::stoi(action[disp_index + i]);
742 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
744 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? simgrid::smpi::Datatype::decode(action[datatype_index])
747 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
749 for (unsigned int i = 0; i < comm_size; i++) {
750 (*recvcounts)[i] = std::stoi(action[i + 3]);
752 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
753 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
755 int my_proc_id = Actor::self()->getPid();
757 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
758 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
759 Datatype::encode(MPI_CURRENT_TYPE),
760 Datatype::encode(MPI_CURRENT_TYPE2)));
762 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
765 TRACE_smpi_comm_out(my_proc_id);
766 log_timed_action (action, clock);
// Replays an "allToAllV" trace line.
// Field layout as read by the code (n = comm_size): action[2] is the send buffer size,
// action[3 .. 3+n-1] the send counts, action[3+n] the receive buffer size,
// action[4+n .. 4+2n-1] the receive counts, and action[4+2n] / action[5+2n] the optional send/recv
// datatype ids (honoured only when both are present). All displacements are zero.
// The receive side of Colls::alltoallv() now gets MPI_CURRENT_TYPE2, the datatype the receive buffer
// is sized with and that the trace record reports; it previously received the send datatype.
769 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
771 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
772 0 allToAllV 100 1 7 10 12 100 1 70 10 5
774 1) 100 is the size of the send buffer *sizeof(int),
775 2) 1 7 10 12 is the sendcounts array
776 3) 100*sizeof(int) is the size of the receiver buffer
777 4) 1 70 10 5 is the recvcounts array
779 double clock = smpi_process()->simulated_elapsed();
781 unsigned long comm_size = MPI_COMM_WORLD->size();
782 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
// Held in shared_ptrs because the same vectors are also handed to the VarCollTIData trace record.
783 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
784 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
785 std::vector<int> senddisps(comm_size, 0);
786 std::vector<int> recvdisps(comm_size, 0);
788 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5 + 2 * comm_size)
789 ? simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size])
791 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size)
792 ? simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size])
795 int send_buf_size=parse_double(action[2]);
796 int recv_buf_size=parse_double(action[3+comm_size]);
797 int my_proc_id = Actor::self()->getPid();
798 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
799 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
801 for (unsigned int i = 0; i < comm_size; i++) {
802 (*sendcounts)[i] = std::stoi(action[3 + i]);
803 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
// The totals are only used for the trace record.
805 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
806 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
808 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
809 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
810 Datatype::encode(MPI_CURRENT_TYPE),
811 Datatype::encode(MPI_CURRENT_TYPE2)));
813 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
814 recvdisps.data(), MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
816 TRACE_smpi_comm_out(my_proc_id);
817 log_timed_action (action, clock);
820 }} // namespace simgrid::smpi
822 /** @brief Only initialize the replay, don't do it for real */
// Prepares the calling actor for trace replay without running it.
// Steps visible here: initialise the SMPI process and flag it as initialised and replaying; emit the
// "init" trace events; register one handler per trace action name; optionally burn a start-up delay;
// finally execute 0 flops to force a context switch.
// argc/argv are forwarded to simgrid::smpi::Process::init(); (*argv)[2] is read as the delayed-start
// amount (the condition guarding that read is on a line not shown in this listing).
823 void smpi_replay_init(int* argc, char*** argv)
825 simgrid::smpi::Process::init(argc, argv);
826 smpi_process()->mark_as_initialized();
827 smpi_process()->set_replaying(true);
829 int my_proc_id = Actor::self()->getPid();
830 TRACE_smpi_init(my_proc_id);
831 TRACE_smpi_computing_init(my_proc_id);
832 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
833 TRACE_smpi_comm_out(my_proc_id);
// Map every action keyword that may appear in a trace file to its handler.
834 xbt_replay_action_register("init", simgrid::smpi::action_init);
835 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
836 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
837 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
838 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
839 xbt_replay_action_register("send", simgrid::smpi::action_send);
840 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
841 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
842 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
843 xbt_replay_action_register("test", simgrid::smpi::action_test);
844 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
845 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
846 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
847 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
848 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
849 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
850 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
851 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
852 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
853 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
854 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
855 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
856 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
857 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
858 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
859 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
861 //if we have a delayed start, sleep here.
// The delay is expressed in flops and simulated with smpi_execute_flops().
863 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
864 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
865 smpi_execute_flops(value);
867 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
868 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
869 smpi_execute_flops(0.0);
873 /** @brief actually run the replay after initialization */
// Runs the replay for the calling actor and then tears it down.
// After simgrid::xbt::replay_runner() returns, any request still queued for this actor is completed
// with Request::waitall(), the actor's request vector is deleted, and the SMPI process is finalised
// between "finalize" trace events. When active_processes is 0 the total simulated time is printed
// and the temporary replay buffers are freed.
// NOTE(review): `requests` and `status` are variable-length arrays, a compiler extension in C++.
// NOTE(review): the pointer stored in reqq for this actor is deleted but no erase of the map entry is
// visible here — confirm nothing calls get_reqq_self() afterwards.
// NOTE(review): the decrement of active_processes announced by the comment below is on a line not
// shown in this listing.
874 void smpi_replay_main(int* argc, char*** argv)
876 simgrid::xbt::replay_runner(*argc, *argv);
878 /* and now, finalize everything */
879 /* One active process will stop. Decrease the counter*/
880 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
881 if (not get_reqq_self()->empty()) {
882 unsigned int count_requests=get_reqq_self()->size();
883 MPI_Request requests[count_requests];
884 MPI_Status status[count_requests];
// Presumably copies each queued request into `requests` (loop body not shown) — TODO confirm.
887 for (auto const& req : *get_reqq_self()) {
891 simgrid::smpi::Request::waitall(count_requests, requests, status);
893 delete get_reqq_self();
896 if(active_processes==0){
897 /* Last process alive speaking: end the simulated timer */
898 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
899 smpi_free_replay_tmp_buffers();
902 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
904 smpi_process()->finalize();
906 TRACE_smpi_comm_out(Actor::self()->getPid());
907 TRACE_smpi_finalize(Actor::self()->getPid());
910 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: calls smpi_replay_init() and then smpi_replay_main() with the same
// argc/argv.
911 void smpi_replay_run(int* argc, char*** argv)
913 smpi_replay_init(argc, argv);
914 smpi_replay_main(argc, argv);