1 /* Copyright (c) 2009-2020. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
// Adapted from https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
// std::hash has no specialization for std::tuple; this namespace supplies one so that std::unordered_map can be
// keyed by a std::tuple. If we need this in other places, this could go into a header file.
namespace hash_tuple {
/** @brief Fallback hasher: delegates to std::hash for every non-tuple type. */
template <typename TT> struct hash {
  size_t operator()(TT const& value) const
  {
    const std::hash<TT> delegate{};
    return delegate(value);
  }
};

/** @brief Mixes the hash of @p v into @p seed (the classic boost::hash_combine formula). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

// Recursive template code derived from Matthieu M.
// HashValueImpl<Tuple, Index> combines elements 0..Index of the tuple into the seed, lowest index first.
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // elements 0 .. Index-1 first
    hash_combine(seed, std::get<Index>(tuple));           // then element Index
  }
};

// Recursion stop: only element 0 is left to combine.
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** @brief Tuple hasher: starts from a zero seed and combines every element in index order. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t accumulated = 0;
    HashValueImpl<std::tuple<TT...>>::apply(accumulated, tt);
    return accumulated;
  }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* receiver */ int, /* tag */ int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(const simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto const& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void SleepParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 1, 0)
171 time = parse_double(action[2]);
174 void LocationParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
176 CHECK_ACTION_PARAMS(action, 2, 0)
177 filename = std::string(action[2]);
178 line = std::stoi(action[3]);
181 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
183 CHECK_ACTION_PARAMS(action, 1, 2)
184 size = parse_double(action[2]);
185 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
186 if (action.size() > 4)
187 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
190 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
192 CHECK_ACTION_PARAMS(action, 2, 2)
193 comm_size = parse_double(action[2]);
194 comp_size = parse_double(action[3]);
195 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
196 if (action.size() > 5)
197 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
200 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
202 CHECK_ACTION_PARAMS(action, 2, 1)
203 comm_size = parse_double(action[2]);
204 comp_size = parse_double(action[3]);
205 if (action.size() > 4)
206 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
209 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 comm_size = MPI_COMM_WORLD->size();
213 send_size = parse_double(action[2]);
214 recv_size = parse_double(action[3]);
216 if (action.size() > 4)
217 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
218 if (action.size() > 5)
219 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
222 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
224 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
227 1) 68 is the sendcounts
228 2) 68 is the recvcounts
229 3) 0 is the root node
230 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
231 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
233 CHECK_ACTION_PARAMS(action, 2, 3)
234 comm_size = MPI_COMM_WORLD->size();
235 send_size = parse_double(action[2]);
236 recv_size = parse_double(action[3]);
238 if (name == "gather") {
239 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
240 if (action.size() > 5)
241 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
242 if (action.size() > 6)
243 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
245 if (action.size() > 4)
246 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
247 if (action.size() > 5)
248 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
252 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
254 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
255 0 gather 68 68 10 10 10 0 0 0
257 1) 68 is the sendcount
258 2) 68 10 10 10 is the recvcounts
259 3) 0 is the root node
260 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
261 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
263 comm_size = MPI_COMM_WORLD->size();
264 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
265 send_size = parse_double(action[2]);
266 disps = std::vector<int>(comm_size, 0);
267 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
269 if (name == "gatherv") {
270 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
271 if (action.size() > 4 + comm_size)
272 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
273 if (action.size() > 5 + comm_size)
274 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
277 /* The 3 comes from "0 gather <sendcount>", which must always be present.
278 * The + comm_size is the recvcounts array, which must also be present
280 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
281 int datatype_index = 3 + comm_size;
282 disp_index = datatype_index + 1;
283 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
284 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
285 } else if (action.size() >
286 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
287 disp_index = 3 + comm_size;
288 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
289 int datatype_index = 3 + comm_size;
290 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
291 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
294 if (disp_index != 0) {
295 for (unsigned int i = 0; i < comm_size; i++)
296 disps[i] = std::stoi(action[disp_index + i]);
300 for (unsigned int i = 0; i < comm_size; i++) {
301 (*recvcounts)[i] = std::stoi(action[i + 3]);
303 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
306 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
308 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
311 1) 68 is the sendcounts
312 2) 68 is the recvcounts
313 3) 0 is the root node
314 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
315 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
317 CHECK_ACTION_PARAMS(action, 2, 3)
318 comm_size = MPI_COMM_WORLD->size();
319 send_size = parse_double(action[2]);
320 recv_size = parse_double(action[3]);
321 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
322 if (action.size() > 5)
323 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
324 if (action.size() > 6)
325 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
328 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
330 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
331 0 gather 68 10 10 10 68 0 0 0
333 1) 68 10 10 10 is the sendcounts
334 2) 68 is the recvcount
335 3) 0 is the root node
336 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
337 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
339 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
340 recv_size = parse_double(action[2 + comm_size]);
341 disps = std::vector<int>(comm_size, 0);
342 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
344 if (action.size() > 5 + comm_size)
345 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
346 if (action.size() > 5 + comm_size)
347 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
349 for (unsigned int i = 0; i < comm_size; i++) {
350 (*sendcounts)[i] = std::stoi(action[i + 2]);
352 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
353 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
356 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
358 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
359 0 reducescatter 275427 275427 275427 204020 11346849 0
361 1) The first four values after the name of the action declare the recvcounts array
362 2) The value 11346849 is the amount of instructions
363 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
365 comm_size = MPI_COMM_WORLD->size();
366 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
367 comp_size = parse_double(action[2 + comm_size]);
368 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
369 if (action.size() > 3 + comm_size)
370 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
372 for (unsigned int i = 0; i < comm_size; i++) {
373 recvcounts->push_back(std::stoi(action[i + 2]));
375 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
378 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
380 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
381 0 alltoallv 100 1 7 10 12 100 1 70 10 5
383 1) 100 is the size of the send buffer *sizeof(int),
384 2) 1 7 10 12 is the sendcounts array
385 3) 100*sizeof(int) is the size of the receiver buffer
386 4) 1 70 10 5 is the recvcounts array
388 comm_size = MPI_COMM_WORLD->size();
389 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
390 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
391 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
392 senddisps = std::vector<int>(comm_size, 0);
393 recvdisps = std::vector<int>(comm_size, 0);
395 if (action.size() > 5 + 2 * comm_size)
396 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
397 if (action.size() > 5 + 2 * comm_size)
398 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
400 send_buf_size = parse_double(action[2]);
401 recv_buf_size = parse_double(action[3 + comm_size]);
402 for (unsigned int i = 0; i < comm_size; i++) {
403 (*sendcounts)[i] = std::stoi(action[3 + i]);
404 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
406 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
407 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
410 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
412 std::string s = boost::algorithm::join(action, " ");
413 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
414 const WaitTestParser& args = get_args();
415 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
416 req_storage.remove(request);
418 if (request == MPI_REQUEST_NULL) {
419 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
424 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
426 // Must be taken before Request::wait() since the request may be set to
427 // MPI_REQUEST_NULL by Request::wait!
428 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
429 // TODO: Here we take the rank while we normally take the process id (look for get_pid())
430 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
433 Request::wait(&request, &status);
435 TRACE_smpi_comm_out(rank);
436 if (is_wait_for_receive)
437 TRACE_smpi_recv(args.src, args.dst, args.tag);
440 void SendAction::kernel(simgrid::xbt::ReplayAction&)
442 const SendRecvParser& args = get_args();
443 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
447 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
448 if (not TRACE_smpi_view_internals())
449 TRACE_smpi_send(get_pid(), get_pid(), dst_traced, args.tag, args.size * args.datatype1->size());
451 if (get_name() == "send") {
452 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
453 } else if (get_name() == "isend") {
454 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
455 req_storage.add(request);
457 xbt_die("Don't know this action, %s", get_name().c_str());
460 TRACE_smpi_comm_out(get_pid());
463 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
465 const SendRecvParser& args = get_args();
468 new simgrid::instr::Pt2PtTIData(get_name(), args.partner, args.size, args.tag, Datatype::encode(args.datatype1)));
471 // unknown size from the receiver point of view
472 double arg_size = args.size;
473 if (arg_size <= 0.0) {
474 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
475 arg_size = status.count;
478 bool is_recv = false; // Help analyzers understanding that status is not used unintialized
479 if (get_name() == "recv") {
481 Request::recv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
482 } else if (get_name() == "irecv") {
483 MPI_Request request = Request::irecv(nullptr, arg_size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
484 req_storage.add(request);
489 TRACE_smpi_comm_out(get_pid());
490 if (is_recv && not TRACE_smpi_view_internals()) {
491 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
492 TRACE_smpi_recv(src_traced, get_pid(), args.tag);
496 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
498 const ComputeParser& args = get_args();
499 if (smpi_cfg_simulate_computation()) {
500 smpi_execute_flops(args.flops/smpi_adjust_comp_speed());
504 void SleepAction::kernel(simgrid::xbt::ReplayAction&)
506 const SleepParser& args = get_args();
507 XBT_DEBUG("Sleep for: %lf secs", args.time);
508 int rank = simgrid::s4u::this_actor::get_pid();
509 TRACE_smpi_sleeping_in(rank, args.time);
510 simgrid::s4u::this_actor::sleep_for(args.time/smpi_adjust_comp_speed());
511 TRACE_smpi_sleeping_out(rank);
514 void LocationAction::kernel(simgrid::xbt::ReplayAction&)
516 const LocationParser& args = get_args();
517 smpi_trace_set_call_location(args.filename.c_str(), args.line);
520 void TestAction::kernel(simgrid::xbt::ReplayAction&)
522 const WaitTestParser& args = get_args();
523 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
524 req_storage.remove(request);
525 // if request is null here, this may mean that a previous test has succeeded
526 // Different times in traced application and replayed version may lead to this
527 // In this case, ignore the extra calls.
528 if (request != MPI_REQUEST_NULL) {
529 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("test"));
533 Request::test(&request, &status, &flag);
535 XBT_DEBUG("MPI_Test result: %d", flag);
536 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
538 if (request == MPI_REQUEST_NULL)
539 req_storage.addNullRequest(args.src, args.dst, args.tag);
541 req_storage.add(request);
543 TRACE_smpi_comm_out(get_pid());
547 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
549 CHECK_ACTION_PARAMS(action, 0, 1)
550 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
551 : MPI_BYTE; // default TAU datatype
553 /* start a simulated timer */
554 smpi_process()->simulated_start();
557 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
562 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
564 const unsigned int count_requests = req_storage.size();
566 if (count_requests > 0) {
567 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
568 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
569 std::vector<MPI_Request> reqs;
570 req_storage.get_requests(reqs);
571 for (auto const& req : reqs) {
572 if (req && (req->flags() & MPI_REQ_RECV)) {
573 sender_receiver.push_back({req->src(), req->dst()});
576 Request::waitall(count_requests, &(reqs.data())[0], MPI_STATUSES_IGNORE);
577 req_storage.get_store().clear();
579 for (auto const& pair : sender_receiver) {
580 TRACE_smpi_recv(pair.first, pair.second, 0);
582 TRACE_smpi_comm_out(get_pid());
586 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
588 TRACE_smpi_comm_in(get_pid(), __func__, new simgrid::instr::NoOpTIData("barrier"));
589 colls::barrier(MPI_COMM_WORLD);
590 TRACE_smpi_comm_out(get_pid());
593 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
595 const BcastArgParser& args = get_args();
596 TRACE_smpi_comm_in(get_pid(), "action_bcast",
597 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(), -1.0,
598 args.size, -1, Datatype::encode(args.datatype1), ""));
600 colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
602 TRACE_smpi_comm_out(get_pid());
605 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
607 const ReduceArgParser& args = get_args();
608 TRACE_smpi_comm_in(get_pid(), "action_reduce",
609 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
610 args.comp_size, args.comm_size, -1,
611 Datatype::encode(args.datatype1), ""));
613 colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
614 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
615 args.root, MPI_COMM_WORLD);
616 private_execute_flops(args.comp_size);
618 TRACE_smpi_comm_out(get_pid());
621 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
623 const AllReduceArgParser& args = get_args();
624 TRACE_smpi_comm_in(get_pid(), "action_allreduce",
625 new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
626 Datatype::encode(args.datatype1), ""));
628 colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
629 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL,
631 private_execute_flops(args.comp_size);
633 TRACE_smpi_comm_out(get_pid());
636 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
638 const AllToAllArgParser& args = get_args();
639 TRACE_smpi_comm_in(get_pid(), "action_alltoall",
640 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
641 Datatype::encode(args.datatype1),
642 Datatype::encode(args.datatype2)));
644 colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size, args.datatype1,
645 recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()), args.recv_size, args.datatype2,
648 TRACE_smpi_comm_out(get_pid());
651 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
653 const GatherArgParser& args = get_args();
654 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
655 new simgrid::instr::CollTIData(get_name(), (get_name() == "gather") ? args.root : -1, -1.0,
656 args.send_size, args.recv_size, Datatype::encode(args.datatype1),
657 Datatype::encode(args.datatype2)));
659 if (get_name() == "gather") {
660 int rank = MPI_COMM_WORLD->rank();
661 colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
662 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr,
663 args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
665 colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
666 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2,
669 TRACE_smpi_comm_out(get_pid());
672 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
674 int rank = MPI_COMM_WORLD->rank();
675 const GatherVArgParser& args = get_args();
676 TRACE_smpi_comm_in(get_pid(), get_name().c_str(),
677 new simgrid::instr::VarCollTIData(
678 get_name(), (get_name() == "gatherv") ? args.root : -1, args.send_size, nullptr, -1,
679 args.recvcounts, Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
681 if (get_name() == "gatherv") {
682 colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
683 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
684 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
686 colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
687 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
688 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
691 TRACE_smpi_comm_out(get_pid());
694 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
696 int rank = MPI_COMM_WORLD->rank();
697 const ScatterArgParser& args = get_args();
698 TRACE_smpi_comm_in(get_pid(), "action_scatter",
699 new simgrid::instr::CollTIData(get_name(), args.root, -1.0, args.send_size, args.recv_size,
700 Datatype::encode(args.datatype1),
701 Datatype::encode(args.datatype2)));
703 colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
704 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size,
705 args.datatype2, args.root, MPI_COMM_WORLD);
707 TRACE_smpi_comm_out(get_pid());
710 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
712 int rank = MPI_COMM_WORLD->rank();
713 const ScatterVArgParser& args = get_args();
714 TRACE_smpi_comm_in(get_pid(), "action_scatterv",
715 new simgrid::instr::VarCollTIData(get_name(), args.root, -1, args.sendcounts, args.recv_size,
716 nullptr, Datatype::encode(args.datatype1),
717 Datatype::encode(args.datatype2)));
719 colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
720 args.sendcounts->data(), args.disps.data(), args.datatype1,
721 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
724 TRACE_smpi_comm_out(get_pid());
727 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
729 const ReduceScatterArgParser& args = get_args();
731 get_pid(), "action_reducescatter",
732 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
733 std::to_string(args.comp_size), /* ugly hack to print comp_size */
734 Datatype::encode(args.datatype1)));
736 colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
737 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
738 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
740 private_execute_flops(args.comp_size);
741 TRACE_smpi_comm_out(get_pid());
744 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
746 const AllToAllVArgParser& args = get_args();
747 TRACE_smpi_comm_in(get_pid(), __func__,
748 new simgrid::instr::VarCollTIData(
749 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
750 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
752 colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(),
753 args.senddisps.data(), args.datatype1, recv_buffer(args.recv_buf_size * args.datatype2->size()),
754 args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
756 TRACE_smpi_comm_out(get_pid());
758 } // Replay Namespace
759 }} // namespace simgrid::smpi
761 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
762 /** @brief Only initialize the replay, don't do it for real */
763 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
765 xbt_assert(not smpi_process()->initializing());
767 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
768 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
769 simgrid::smpi::ActorExt::init();
771 smpi_process()->mark_as_initialized();
772 smpi_process()->set_replaying(true);
774 int my_proc_id = simgrid::s4u::this_actor::get_pid();
776 TRACE_smpi_init(my_proc_id);
777 TRACE_smpi_computing_init(my_proc_id);
778 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
779 TRACE_smpi_comm_out(my_proc_id);
780 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
781 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction const&) { /* nothing to do */ });
782 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
783 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
784 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
785 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
786 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
787 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
788 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
789 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
790 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
791 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
792 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
793 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
794 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
795 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
796 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
797 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
798 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
799 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
800 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
801 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
802 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
803 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
804 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
805 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
806 xbt_replay_action_register("sleep", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SleepAction().execute(action); });
807 xbt_replay_action_register("location", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::LocationAction().execute(action); });
809 //if we have a delayed start, sleep here.
810 if (start_delay_flops > 0) {
811 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
812 private_execute_flops(start_delay_flops);
814 // Wait for the other actors to initialize also
815 simgrid::s4u::this_actor::yield();
819 /** @brief actually run the replay after initialization */
820 void smpi_replay_main(int rank, const char* trace_filename)
822 static int active_processes = 0;
824 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
825 std::string rank_string = std::to_string(rank);
826 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
828 /* and now, finalize everything */
829 /* One active process will stop. Decrease the counter*/
830 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
831 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
832 if (count_requests > 0) {
833 MPI_Request* requests= new MPI_Request[count_requests];
836 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
837 requests[i] = pair.second;
840 simgrid::smpi::Request::waitall(count_requests, requests, MPI_STATUSES_IGNORE);
845 if(active_processes==0){
846 /* Last process alive speaking: end the simulated timer */
847 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
848 smpi_free_replay_tmp_buffers();
851 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
852 new simgrid::instr::NoOpTIData("finalize"));
854 smpi_process()->finalize();
856 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
859 /** @brief chain a replay initialization and a replay start */
860 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
862 smpi_replay_init(instance_id, rank, start_delay_flops);
863 smpi_replay_main(rank, trace_filename);