1 /* Copyright (c) 2009-2021. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/smpi_replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
namespace hash_tuple {
/** Generic hasher: forwards to std::hash for any type that is not a std::tuple. */
template <typename TT> class hash {
public:
  size_t operator()(TT const& tt) const { return std::hash<TT>()(tt); }
};

/** Mix the hash of @p v into @p seed. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  seed ^= hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

// Recursive template code derived from Matthieu M.
/** Folds the elements 0..Index of a tuple into the seed, lowest index first. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> class HashValueImpl {
public:
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only element 0 remains to be combined. */
template <class Tuple> class HashValueImpl<Tuple, 0> {
public:
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Specialization for tuples: combines the hashes of all elements, starting from a zero seed. */
template <typename... TT> class hash<std::tuple<TT...>> {
public:
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
// Key identifying a pending request: the (sender, receiver, tag) triple.
64 using req_key_t = std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int>;
// Hash map from such a key to the MPI_Request, hashed with hash_tuple::hash.
65 using req_storage_t = std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int, int, int>>>;
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Datatype selected by InitAction::kernel (MPI_DOUBLE or MPI_BYTE, depending on the number of fields of "init").
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
92 RequestStorage() = default;
93 int size() const { return store.size(); }
95 req_storage_t& get_store() { return store; }
97 void get_requests(std::vector<MPI_Request>& vec) const
99 for (auto const& pair : store) {
100 auto& req = pair.second;
101 aid_t my_proc_id = simgrid::s4u::this_actor::get_pid();
102 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
103 vec.push_back(pair.second);
104 pair.second->print_request("MM");
109 MPI_Request find(int src, int dst, int tag)
111 auto it = store.find(req_key_t(src, dst, tag));
112 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
115 void remove(const Request* req)
117 if (req == MPI_REQUEST_NULL) return;
119 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
122 void add(MPI_Request req)
124 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
125 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
128 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
129 void addNullRequest(int src, int dst, int tag)
131 store.insert({req_key_t(MPI_COMM_WORLD->group()->actor(src) - 1, MPI_COMM_WORLD->group()->actor(dst) - 1, tag),
136 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
138 CHECK_ACTION_PARAMS(action, 3, 0)
139 src = std::stoi(action[2]);
140 dst = std::stoi(action[3]);
141 tag = std::stoi(action[4]);
144 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 1)
147 partner = std::stoi(action[2]);
148 tag = std::stoi(action[3]);
149 size = parse_double(action[4]);
150 if (action.size() > 5)
151 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
154 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
156 CHECK_ACTION_PARAMS(action, 1, 0)
157 flops = parse_double(action[2]);
160 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
162 CHECK_ACTION_PARAMS(action, 1, 0)
163 time = parse_double(action[2]);
166 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
168 CHECK_ACTION_PARAMS(action, 2, 0)
169 filename = std::string(action[2]);
170 line = std::stoi(action[3]);
173 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
175 CHECK_ACTION_PARAMS(action, 1, 2)
176 size = parse_double(action[2]);
177 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
178 if (action.size() > 4)
179 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
182 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
184 CHECK_ACTION_PARAMS(action, 2, 2)
185 comm_size = parse_double(action[2]);
186 comp_size = parse_double(action[3]);
187 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
188 if (action.size() > 5)
189 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
192 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
194 CHECK_ACTION_PARAMS(action, 2, 1)
195 comm_size = parse_double(action[2]);
196 comp_size = parse_double(action[3]);
197 if (action.size() > 4)
198 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
201 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
203 CHECK_ACTION_PARAMS(action, 2, 1)
204 comm_size = MPI_COMM_WORLD->size();
205 send_size = parse_double(action[2]);
206 recv_size = parse_double(action[3]);
208 if (action.size() > 4)
209 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
210 if (action.size() > 5)
211 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
214 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
216 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
219 1) 68 is the sendcounts
220 2) 68 is the recvcounts
221 3) 0 is the root node
222 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
223 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
225 CHECK_ACTION_PARAMS(action, 2, 3)
226 comm_size = MPI_COMM_WORLD->size();
227 send_size = parse_double(action[2]);
228 recv_size = parse_double(action[3]);
230 if (name == "gather") {
231 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
232 if (action.size() > 5)
233 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
234 if (action.size() > 6)
235 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
237 if (action.size() > 4)
238 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
239 if (action.size() > 5)
240 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
244 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
246 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
247 0 gather 68 68 10 10 10 0 0 0
249 1) 68 is the sendcount
250 2) 68 10 10 10 is the recvcounts
251 3) 0 is the root node
252 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
253 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
255 comm_size = MPI_COMM_WORLD->size();
256 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
257 send_size = parse_double(action[2]);
258 disps = std::vector<int>(comm_size, 0);
259 recvcounts = std::make_shared<std::vector<int>>(comm_size);
261 if (name == "gatherv") {
262 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
263 if (action.size() > 4 + comm_size)
264 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
265 if (action.size() > 5 + comm_size)
266 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
269 /* The 3 comes from "0 gather <sendcount>", which must always be present.
270 * The + comm_size is the recvcounts array, which must also be present
272 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
273 int datatype_index = 3 + comm_size;
274 disp_index = datatype_index + 1;
275 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
276 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
277 } else if (action.size() >
278 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
279 disp_index = 3 + comm_size;
280 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
281 int datatype_index = 3 + comm_size;
282 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
283 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
286 if (disp_index != 0) {
287 for (unsigned int i = 0; i < comm_size; i++)
288 disps[i] = std::stoi(action[disp_index + i]);
292 for (unsigned int i = 0; i < comm_size; i++) {
293 (*recvcounts)[i] = std::stoi(action[i + 3]);
295 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
298 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
300 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
303 1) 68 is the sendcounts
304 2) 68 is the recvcounts
305 3) 0 is the root node
306 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
307 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
309 CHECK_ACTION_PARAMS(action, 2, 3)
310 comm_size = MPI_COMM_WORLD->size();
311 send_size = parse_double(action[2]);
312 recv_size = parse_double(action[3]);
313 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
314 if (action.size() > 5)
315 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
316 if (action.size() > 6)
317 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
320 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
322 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
323 0 gather 68 10 10 10 68 0 0 0
325 1) 68 10 10 10 is the sendcounts
326 2) 68 is the recvcount
327 3) 0 is the root node
328 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
329 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
331 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
332 recv_size = parse_double(action[2 + comm_size]);
333 disps = std::vector<int>(comm_size, 0);
334 sendcounts = std::make_shared<std::vector<int>>(comm_size);
336 if (action.size() > 5 + comm_size)
337 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
338 if (action.size() > 5 + comm_size)
339 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
341 for (unsigned int i = 0; i < comm_size; i++) {
342 (*sendcounts)[i] = std::stoi(action[i + 2]);
344 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
345 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
348 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
350 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
351 0 reducescatter 275427 275427 275427 204020 11346849 0
353 1) The first four values after the name of the action declare the recvcounts array
354 2) The value 11346849 is the amount of instructions
355 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
357 comm_size = MPI_COMM_WORLD->size();
358 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
359 comp_size = parse_double(action[2 + comm_size]);
360 recvcounts = std::make_shared<std::vector<int>>(comm_size);
361 if (action.size() > 3 + comm_size)
362 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
364 for (unsigned int i = 0; i < comm_size; i++) {
365 recvcounts->push_back(std::stoi(action[i + 2]));
367 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
370 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
372 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
373 0 alltoallv 100 1 7 10 12 100 1 70 10 5
375 1) 100 is the size of the send buffer *sizeof(int),
376 2) 1 7 10 12 is the sendcounts array
377 3) 100*sizeof(int) is the size of the receiver buffer
378 4) 1 70 10 5 is the recvcounts array
380 comm_size = MPI_COMM_WORLD->size();
381 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
382 sendcounts = std::make_shared<std::vector<int>>(comm_size);
383 recvcounts = std::make_shared<std::vector<int>>(comm_size);
384 senddisps = std::vector<int>(comm_size, 0);
385 recvdisps = std::vector<int>(comm_size, 0);
387 if (action.size() > 5 + 2 * comm_size)
388 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
389 if (action.size() > 5 + 2 * comm_size)
390 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
392 send_buf_size = parse_double(action[2]);
393 recv_buf_size = parse_double(action[3 + comm_size]);
394 for (unsigned int i = 0; i < comm_size; i++) {
395 (*sendcounts)[i] = std::stoi(action[3 + i]);
396 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
398 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
399 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
402 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
404 std::string s = boost::algorithm::join(action, " ");
405 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
406 const WaitTestParser& args = get_args();
407 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
408 req_storage.remove(request);
410 if (request == MPI_REQUEST_NULL) {
411 /* Assume that the trace is well formed, meaning the comm might have been caught by an MPI_test. Then just
416 aid_t rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
418 // Must be taken before Request::wait() since the request may be set to
419 // MPI_REQUEST_NULL by Request::wait!
420 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
421 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
422 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
425 Request::wait(&request, &status);
427 TRACE_smpi_comm_out(rank);
428 if (is_wait_for_receive)
429 TRACE_smpi_recv(args.src, args.dst, args.tag);
432 void SendAction::kernel(simgrid::xbt::ReplayAction&)
434 const SendRecvParser& args = get_args();
435 aid_t dst_traced = MPI_COMM_WORLD->group()->actor(args.partner);
439 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
440 if (not TRACE_smpi_view_internals())
441 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
443 if (get_name() == "send") {
444 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
445 } else if (get_name() == "isend") {
446 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
447 req_storage.add(request);
449 xbt_die("Don't know this action, %s", get_name().c_str());
452 TRACE_smpi_comm_out(get_pid());
455 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
457 const SendRecvParser& args = get_args();
460 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
463 // unknown size from the receiver point of view
464 double arg_size = args.size;
465 if (arg_size <= 0.0) {
466 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
467 arg_size = status.count;
470 bool is_recv = false; // Help analyzers understanding that status is not used uninitialized
471 if (get_name() == "recv") {
473 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
474 } else if (get_name() == "irecv") {
475 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
476 req_storage.add(request);
481 TRACE_smpi_comm_out(get_pid());
482 if (is_recv && not TRACE_smpi_view_internals()) {
483 aid_t src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE);
484 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
488 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
490 const ComputeParser& args = get_args();
491 if (smpi_cfg_simulate_computation()) {
492 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
496 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
498 const SleepParser& args = get_args();
499 XBT_DEBUG("Sleep for: %lf secs", args.time);
500 aid_t pid = simgrid::s4u::this_actor::get_pid();
501 TRACE_smpi_sleeping_in(pid, args.time);
502 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
503 TRACE_smpi_sleeping_out(pid);
506 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
508 const LocationParser& args = get_args();
509 smpi_trace_set_call_location(args.filename.c_str(), args.line);
512 void TestAction::kernel(simgrid::xbt::ReplayAction&)
514 const WaitTestParser& args = get_args();
515 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
516 req_storage.remove(request);
517 // if request is null here, this may mean that a previous test has succeeded
518 // Different times in traced application and replayed version may lead to this
519 // In this case, ignore the extra calls.
520 if (request != MPI_REQUEST_NULL) {
521 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
525 Request::test(&request, &status, &flag);
527 XBT_DEBUG("MPI_Test result: %d", flag);
528 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
530 if (request == MPI_REQUEST_NULL)
531 req_storage.addNullRequest(args.src, args.dst, args.tag);
533 req_storage.add(request);
535 TRACE_smpi_comm_out(get_pid());
539 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
541 CHECK_ACTION_PARAMS(action, 0, 1)
542 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
543 : MPI_BYTE; // default TAU datatype
545 /* start a simulated timer */
546 smpi_process()->simulated_start();
549 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
554 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
556 const unsigned int count_requests = req_storage.size();
558 if (count_requests > 0) {
559 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
560 std::vector<std::pair</*sender*/ aid_t, /*recv*/ aid_t>> sender_receiver;
561 std::vector<MPI_Request> reqs;
562 req_storage.get_requests(reqs);
563 for (auto const& req : reqs) {
564 if (req && (req->flags() & MPI_REQ_RECV)) {
565 sender_receiver.emplace_back(req->src(), req->dst());
568 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
569 req_storage.get_store().clear();
571 for (auto const& pair : sender_receiver) {
572 TRACE_smpi_recv(pair.first, pair.second, 0);
574 TRACE_smpi_comm_out(get_pid());
578 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
580 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
581 colls::barrier(MPI_COMM_WORLD);
582 TRACE_smpi_comm_out(get_pid());
585 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
587 const BcastArgParser& args = get_args();
588 TRACE_smpi_comm_in(get_pid(), "action_bcast",
589 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root), -1.0, args.size,
590 -1, Datatype::encode(args.datatype1), ""));
592 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
594 TRACE_smpi_comm_out(get_pid());
597 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
599 const ReduceArgParser& args = get_args();
600 TRACE_smpi_comm_in(get_pid(), "action_reduce",
601 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root), args.comp_size,
602 args.comm_size, -1, Datatype::encode(args.datatype1), ""));
604 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
605 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
606 args.root, MPI_COMM_WORLD);
607 private_execute_flops(args.comp_size);
609 TRACE_smpi_comm_out(get_pid());
612 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
614 const AllReduceArgParser& args = get_args();
615 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
616 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
617 Datatype::encode(args.datatype1), ""));
619 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
620 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
622 private_execute_flops(args.comp_size);
624 TRACE_smpi_comm_out(get_pid());
627 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
629 const AllToAllArgParser& args = get_args();
630 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
631 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
632 Datatype::encode(args.datatype1),
633 Datatype::encode(args.datatype2)));
635 colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
636 recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
639 TRACE_smpi_comm_out(get_pid());
642 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
644 const GatherArgParser& args = get_args();
645 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
646 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
647 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
648 Datatype::encode(args.datatype2)));
650 if (get_name() == "gather") {
651 int rank = MPI_COMM_WORLD->rank();
652 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
653 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
654 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
656 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
657 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
660 TRACE_smpi_comm_out(get_pid());
663 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
665 int rank = MPI_COMM_WORLD->rank();
666 const GatherVArgParser& args = get_args();
667 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
668 new simgrid::instr::VarCollTIData(
669 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
670 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
672 if (get_name() == "gatherv") {
673 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
674 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
675 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
677 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
678 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
679 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
682 TRACE_smpi_comm_out(get_pid());
685 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
687 int rank = MPI_COMM_WORLD->rank();
688 const ScatterArgParser& args = get_args();
689 TRACE_smpi_comm_in(get_pid(), "action_scatter",
690 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
691 Datatype::encode(args.datatype1),
692 Datatype::encode(args.datatype2)));
694 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
695 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
696 args.datatype2, args.root, MPI_COMM_WORLD);
698 TRACE_smpi_comm_out(get_pid());
701 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
703 int rank = MPI_COMM_WORLD->rank();
704 const ScatterVArgParser& args = get_args();
705 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
706 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
707 nullptr, Datatype::encode(args.datatype1),
708 Datatype::encode(args.datatype2)));
710 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
711 args.sendcounts->data(), args.disps.data(), args.datatype1,
712 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
715 TRACE_smpi_comm_out(get_pid());
718 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
720 const ReduceScatterArgParser& args = get_args();
722 get_pid(), "action_reducescatter",
723 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
724 std::to_string(args.comp_size), /* ugly hack to print comp_size */
725 Datatype::encode(args.datatype1)));
727 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
728 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
729 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
731 private_execute_flops(args.comp_size);
732 TRACE_smpi_comm_out(get_pid());
735 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
737 const AllToAllVArgParser& args = get_args();
738 TRACE_smpi_comm_in(get_pid(), __func__,
739 new simgrid::instr::VarCollTIData(
740 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
741 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
743 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
744 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
745 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
747 TRACE_smpi_comm_out(get_pid());
749 } // Replay Namespace
750 }} // namespace simgrid::smpi
// One RequestStorage per actor, indexed by the pid returned by simgrid::s4u::this_actor::get_pid().
752 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
753 /** @brief Only initialize the replay, don't do it for real */
754 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
756 xbt_assert(not smpi_process()->initializing());
758 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
759 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
760 simgrid::smpi::ActorExt::init();
762 smpi_process()->mark_as_initialized();
763 smpi_process()->set_replaying(true);
765 TRACE_smpi_init(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_init");
766 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
767 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
768 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
769 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
770 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
771 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
772 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
773 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
774 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
775 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
776 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
777 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
778 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
779 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
780 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
781 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
782 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
783 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
784 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
785 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
786 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
787 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
788 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
789 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
790 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
791 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
792 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
793 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
795 //if we have a delayed start, sleep here.
796 if (start_delay_flops > 0) {
797 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
798 private_execute_flops(start_delay_flops);
800 // Wait for the other actors to initialize also
801 simgrid::s4u::this_actor::yield();
805 /** @brief actually run the replay after initialization */
806 void smpi_replay_main(int rank, const char* private_trace_filename)
808 static int active_processes = 0;
810 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
811 std::string rank_string = std::to_string(rank);
812 simgrid::xbt::replay_runner(rank_string.c_str(), private_trace_filename);
814 /* and now, finalize everything */
815 /* One active process will stop. Decrease the counter*/
816 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
817 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
818 if (count_requests > 0) {
819 std::vector<MPI_Request> requests(count_requests);
822 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
823 requests[i] = pair.second;
826 simgrid::smpi::Request::waitall(count_requests, requests.data(), MPI_STATUSES_IGNORE);
830 if(active_processes==0){
831 /* Last process alive speaking: end the simulated timer */
832 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
833 smpi_free_replay_tmp_buffers();
836 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
837 new simgrid::instr::NoOpTIData("finalize"));
839 smpi_process()->finalize();
841 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
844 /** @brief chain a replay initialization and a replay start */
845 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* private_trace_filename)
847 smpi_replay_init(instance_id, rank, start_delay_flops);
848 smpi_replay_main(rank, private_trace_filename);