1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
namespace hash_tuple {
/** Default hasher: delegates to std::hash for every non-tuple type. */
template <typename TT> struct hash {
  size_t operator()(TT const& value) const
  {
    std::hash<TT> std_hasher;
    return std_hasher(value);
  }
};

/** Mixes the hash of @p v into @p seed, using the constants and shifts of boost::hash_combine. */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t element_hash = hash_tuple::hash<T>()(v);
  seed ^= element_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}

/** Folds elements 0..Index of @p tuple into @p seed, lowest index first (recursion derived from Matthieu M.). */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple); // every element before Index goes in first
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only element 0 remains. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple) { hash_combine(seed, std::get<0>(tuple)); }
};

/** Tuple hasher: combines all elements, in order, into a seed that starts at 0. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t combined = 0;
    HashValueImpl<std::tuple<TT...>>::apply(combined, tt);
    return combined;
  }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
// Trace-wide default datatype. InitAction::kernel sets it to MPI_DOUBLE when the "init" line carries an extra
// field (MPE traces) and to MPI_BYTE otherwise (TAU traces). Presumably the argument parsers fall back to it
// when a line omits a datatype id — TODO confirm against the parser declarations in replay.hpp.
85 MPI_Datatype MPI_DEFAULT_TYPE;
// Per-actor table of pending point-to-point requests, keyed by req_key_t = (sender, receiver, tag).
// add()/remove() build the key from the request itself as (src()-1, dst()-1, tag()); addNullRequest() builds it
// from MPI_COMM_WORLD ranks as (actor pid - 1); find() looks up its src/dst/tag arguments unchanged.
87 class RequestStorage {
// Direct access to the underlying map (used, for instance, to clear it after a waitall).
98 req_storage_t& get_store()
// Appends to vec every non-null stored request whose src() or dst() equals the calling actor's pid,
// logging each appended request through print_request("MM").
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
// Returns the request stored under (src, dst, tag), or MPI_REQUEST_NULL when there is none.
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
// Erases the entry keyed by the request's own (src()-1, dst()-1, tag()); a null request is ignored.
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
// Stores req under (src()-1, dst()-1, tag()). unordered_map::insert leaves an existing entry with the same key
// untouched.
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
// src and dst are ranks in MPI_COMM_WORLD; they are translated to (actor pid - 1) so that the key has the same
// shape as the keys built by add()/remove().
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 1, 2)
171 size = parse_double(action[2]);
172 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173 if (action.size() > 4)
174 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
179 CHECK_ACTION_PARAMS(action, 2, 2)
180 comm_size = parse_double(action[2]);
181 comp_size = parse_double(action[3]);
182 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
183 if (action.size() > 5)
184 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
189 CHECK_ACTION_PARAMS(action, 2, 1)
190 comm_size = parse_double(action[2]);
191 comp_size = parse_double(action[3]);
192 if (action.size() > 4)
193 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
198 CHECK_ACTION_PARAMS(action, 2, 1)
199 comm_size = MPI_COMM_WORLD->size();
200 send_size = parse_double(action[2]);
201 recv_size = parse_double(action[3]);
203 if (action.size() > 4)
204 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205 if (action.size() > 5)
206 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
211 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
214 1) 68 is the sendcounts
215 2) 68 is the recvcounts
216 3) 0 is the root node
217 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
220 CHECK_ACTION_PARAMS(action, 2, 3)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_double(action[2]);
223 recv_size = parse_double(action[3]);
225 if (name == "gather") {
226 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
227 if (action.size() > 5)
228 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229 if (action.size() > 6)
230 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
232 if (action.size() > 4)
233 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234 if (action.size() > 5)
235 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
241 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242 0 gather 68 68 10 10 10 0 0 0
244 1) 68 is the sendcount
245 2) 68 10 10 10 is the recvcounts
246 3) 0 is the root node
247 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250 comm_size = MPI_COMM_WORLD->size();
251 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252 send_size = parse_double(action[2]);
253 disps = std::vector<int>(comm_size, 0);
254 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
256 if (name == "gatherv") {
257 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258 if (action.size() > 4 + comm_size)
259 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260 if (action.size() > 5 + comm_size)
261 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
263 int datatype_index = 0;
265 /* The 3 comes from "0 gather <sendcount>", which must always be present.
266 * The + comm_size is the recvcounts array, which must also be present
268 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269 datatype_index = 3 + comm_size;
270 disp_index = datatype_index + 1;
271 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
272 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
273 } else if (action.size() >
274 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275 disp_index = 3 + comm_size;
276 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277 datatype_index = 3 + comm_size;
278 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
282 if (disp_index != 0) {
283 for (unsigned int i = 0; i < comm_size; i++)
284 disps[i] = std::stoi(action[disp_index + i]);
288 for (unsigned int i = 0; i < comm_size; i++) {
289 (*recvcounts)[i] = std::stoi(action[i + 3]);
291 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
296 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
299 1) 68 is the sendcounts
300 2) 68 is the recvcounts
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, 2, 3)
306 comm_size = MPI_COMM_WORLD->size();
307 send_size = parse_double(action[2]);
308 recv_size = parse_double(action[3]);
309 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
310 if (action.size() > 5)
311 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312 if (action.size() > 6)
313 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
318 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319 0 gather 68 10 10 10 68 0 0 0
321 1) 68 10 10 10 is the sendcounts
322 2) 68 is the recvcount
323 3) 0 is the root node
324 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
327 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328 recv_size = parse_double(action[2 + comm_size]);
329 disps = std::vector<int>(comm_size, 0);
330 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
332 if (action.size() > 5 + comm_size)
333 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334 if (action.size() > 5 + comm_size)
335 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
337 for (unsigned int i = 0; i < comm_size; i++) {
338 (*sendcounts)[i] = std::stoi(action[i + 2]);
340 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
346 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347 0 reducescatter 275427 275427 275427 204020 11346849 0
349 1) The first four values after the name of the action declare the recvcounts array
350 2) The value 11346849 is the amount of instructions
351 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
353 comm_size = MPI_COMM_WORLD->size();
354 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355 comp_size = parse_double(action[2 + comm_size]);
356 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357 if (action.size() > 3 + comm_size)
358 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
360 for (unsigned int i = 0; i < comm_size; i++) {
361 recvcounts->push_back(std::stoi(action[i + 2]));
363 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
368 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369 0 alltoallv 100 1 7 10 12 100 1 70 10 5
371 1) 100 is the size of the send buffer *sizeof(int),
372 2) 1 7 10 12 is the sendcounts array
373 3) 100*sizeof(int) is the size of the receiver buffer
374 4) 1 70 10 5 is the recvcounts array
376 comm_size = MPI_COMM_WORLD->size();
377 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380 senddisps = std::vector<int>(comm_size, 0);
381 recvdisps = std::vector<int>(comm_size, 0);
383 if (action.size() > 5 + 2 * comm_size)
384 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385 if (action.size() > 5 + 2 * comm_size)
386 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
388 send_buf_size = parse_double(action[2]);
389 recv_buf_size = parse_double(action[3 + comm_size]);
390 for (unsigned int i = 0; i < comm_size; i++) {
391 (*sendcounts)[i] = std::stoi(action[3 + i]);
392 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
394 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
400 std::string s = boost::algorithm::join(action, " ");
401 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403 req_storage.remove(request);
405 if (request == MPI_REQUEST_NULL) {
406 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
411 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
413 // Must be taken before Request::wait() since the request may be set to
414 // MPI_REQUEST_NULL by Request::wait!
415 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
420 Request::wait(&request, &status);
422 TRACE_smpi_comm_out(rank);
423 if (is_wait_for_receive)
424 TRACE_smpi_recv(args.src, args.dst, args.tag);
427 void SendAction::kernel(simgrid::xbt::ReplayAction&)
429 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
431 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432 args.tag, Datatype::encode(args.datatype1)));
433 if (not TRACE_smpi_view_internals())
434 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
436 if (name == "send") {
437 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438 } else if (name == "isend") {
439 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440 req_storage.add(request);
442 xbt_die("Don't know this action, %s", name.c_str());
445 TRACE_smpi_comm_out(my_proc_id);
// Replays a blocking "recv" or a non-blocking "irecv" from rank args.partner with tag args.tag.
448 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
450 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
451 args.tag, Datatype::encode(args.datatype1)));
454 // unknown size from the receiver point of view
// A non-positive size means the trace did not record it: probe the pending message and take its count.
455 if (args.size <= 0.0) {
456 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
457 args.size = status.count;
// is_recv is meant to be true only on the blocking "recv" path, where Request::recv fills status; it gates
// the receive trace emitted after TRACE_smpi_comm_out.
460 bool is_recv = false; // Helps analyzers understand that status is not used uninitialized
461 if (name == "recv") {
463 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
464 } else if (name == "irecv") {
// Keep the pending request so that a later wait/test/waitall can retrieve it.
465 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
466 req_storage.add(request);
471 TRACE_smpi_comm_out(my_proc_id);
// status.MPI_SOURCE is a rank of MPI_COMM_WORLD; the receive trace designates the sender by its actor pid.
472 if (is_recv && not TRACE_smpi_view_internals()) {
473 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
474 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
478 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
480 TRACE_smpi_computing_in(my_proc_id, args.flops);
481 smpi_execute_flops(args.flops);
482 TRACE_smpi_computing_out(my_proc_id);
485 void TestAction::kernel(simgrid::xbt::ReplayAction&)
487 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
488 req_storage.remove(request);
489 // if request is null here, this may mean that a previous test has succeeded
490 // Different times in traced application and replayed version may lead to this
491 // In this case, ignore the extra calls.
492 if (request != MPI_REQUEST_NULL) {
493 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
496 int flag = Request::test(&request, &status);
498 XBT_DEBUG("MPI_Test result: %d", flag);
499 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
501 if (request == MPI_REQUEST_NULL)
502 req_storage.addNullRequest(args.src, args.dst, args.tag);
504 req_storage.add(request);
506 TRACE_smpi_comm_out(my_proc_id);
510 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
512 CHECK_ACTION_PARAMS(action, 0, 1)
513 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
514 : MPI_BYTE; // default TAU datatype
516 /* start a simulated timer */
517 smpi_process()->simulated_start();
// Kernel shared by the "comm_size", "comm_split" and "comm_dup" trace actions, which smpi_replay_init
// registers to CommunicatorAction.
520 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
525 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
527 const unsigned int count_requests = req_storage.size();
529 if (count_requests > 0) {
530 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
531 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
532 std::vector<MPI_Request> reqs;
533 req_storage.get_requests(reqs);
534 for (const auto& req : reqs) {
535 if (req && (req->flags() & MPI_REQ_RECV)) {
536 sender_receiver.push_back({req->src(), req->dst()});
539 MPI_Status status[count_requests];
540 Request::waitall(count_requests, &(reqs.data())[0], status);
541 req_storage.get_store().clear();
543 for (auto& pair : sender_receiver) {
544 TRACE_smpi_recv(pair.first, pair.second, 0);
546 TRACE_smpi_comm_out(my_proc_id);
550 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
552 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
553 Colls::barrier(MPI_COMM_WORLD);
554 TRACE_smpi_comm_out(my_proc_id);
557 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
559 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
560 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
561 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
563 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
565 TRACE_smpi_comm_out(my_proc_id);
568 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
570 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
571 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
572 args.comp_size, args.comm_size, -1,
573 Datatype::encode(args.datatype1), ""));
575 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
576 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
577 smpi_execute_flops(args.comp_size);
579 TRACE_smpi_comm_out(my_proc_id);
582 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
584 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
585 Datatype::encode(args.datatype1), ""));
587 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
588 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
589 smpi_execute_flops(args.comp_size);
591 TRACE_smpi_comm_out(my_proc_id);
594 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
596 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
597 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
598 Datatype::encode(args.datatype1),
599 Datatype::encode(args.datatype2)));
601 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
602 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
603 args.recv_size, args.datatype2, MPI_COMM_WORLD);
605 TRACE_smpi_comm_out(my_proc_id);
608 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
610 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
611 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
613 if (name == "gather") {
614 int rank = MPI_COMM_WORLD->rank();
615 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
616 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
619 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
620 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
622 TRACE_smpi_comm_out(my_proc_id);
625 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
627 int rank = MPI_COMM_WORLD->rank();
629 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
630 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
631 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
633 if (name == "gatherv") {
634 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
635 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
636 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
639 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
640 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
641 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
644 TRACE_smpi_comm_out(my_proc_id);
647 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
649 int rank = MPI_COMM_WORLD->rank();
650 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
651 Datatype::encode(args.datatype1),
652 Datatype::encode(args.datatype2)));
654 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
655 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
657 TRACE_smpi_comm_out(my_proc_id);
660 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
662 int rank = MPI_COMM_WORLD->rank();
663 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
664 nullptr, Datatype::encode(args.datatype1),
665 Datatype::encode(args.datatype2)));
667 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
668 args.sendcounts->data(), args.disps.data(), args.datatype1,
669 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
672 TRACE_smpi_comm_out(my_proc_id);
675 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
677 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
678 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
679 std::to_string(args.comp_size), /* ugly hack to print comp_size */
680 Datatype::encode(args.datatype1)));
682 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
683 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
684 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
686 smpi_execute_flops(args.comp_size);
687 TRACE_smpi_comm_out(my_proc_id);
690 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
692 TRACE_smpi_comm_in(my_proc_id, __func__,
693 new simgrid::instr::VarCollTIData(
694 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
695 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
697 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
698 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
700 TRACE_smpi_comm_out(my_proc_id);
702 } // Replay Namespace
703 }} // namespace simgrid::smpi
// Pending point-to-point requests of each actor, keyed by actor pid. smpi_replay_main resets the calling
// actor's entry before replaying; the send/isend/recv/irecv/test/wait/waitall handlers registered in
// smpi_replay_init use the entry of the actor that executes them.
705 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
706 /** @brief Only initialize the replay, don't do it for real */
707 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
709 if (not smpi_process()->initializing()){
710 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
711 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
712 simgrid::smpi::ActorExt::init();
714 smpi_process()->mark_as_initialized();
715 smpi_process()->set_replaying(true);
717 int my_proc_id = simgrid::s4u::this_actor::get_pid();
719 TRACE_smpi_init(my_proc_id);
720 TRACE_smpi_computing_init(my_proc_id);
721 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
722 TRACE_smpi_comm_out(my_proc_id);
723 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
724 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
725 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
726 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
727 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
728 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
729 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
730 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
731 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
732 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
733 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
734 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
735 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
736 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
737 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
738 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
739 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
740 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
741 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
742 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
743 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
744 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
745 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
746 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
747 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
748 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
750 //if we have a delayed start, sleep here.
751 if (start_delay_flops > 0) {
752 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
753 smpi_execute_flops(start_delay_flops);
755 // Wait for the other actors to initialize also
756 simgrid::s4u::this_actor::yield();
760 /** @brief actually run the replay after initialization */
761 void smpi_replay_main(int rank, const char* trace_filename)
763 static int active_processes = 0;
765 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
766 std::string rank_string = std::to_string(rank);
767 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
769 /* and now, finalize everything */
770 /* One active process will stop. Decrease the counter*/
771 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
772 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
773 if (count_requests > 0) {
774 MPI_Request requests[count_requests];
775 MPI_Status status[count_requests];
778 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
779 requests[i] = pair.second;
782 simgrid::smpi::Request::waitall(count_requests, requests, status);
786 if(active_processes==0){
787 /* Last process alive speaking: end the simulated timer */
788 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
789 smpi_free_replay_tmp_buffers();
792 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
793 new simgrid::instr::NoOpTIData("finalize"));
795 smpi_process()->finalize();
797 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
798 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
801 /** @brief chain a replay initialization and a replay start */
802 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
804 smpi_replay_init(instance_id, rank, start_delay_flops);
805 smpi_replay_main(rank, trace_filename);