1 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "smpi_coll.hpp"
7 #include "smpi_comm.hpp"
8 #include "smpi_datatype.hpp"
9 #include "smpi_group.hpp"
10 #include "smpi_request.hpp"
11 #include "xbt/replay.hpp"
12 #include <simgrid/smpi/replay.hpp>
13 #include <src/smpi/include/private.hpp>
17 #include <unordered_map>
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay, smpi, "Trace Replay with SMPI");
24 // From https://stackoverflow.com/questions/7110301/generic-hash-for-tuples-in-unordered-map-unordered-set
25 // This is all just to make std::unordered_map work with std::tuple. If we need this in other places,
26 // this could go into a header file.
namespace hash_tuple {

/** Generic case: defer to std::hash for any type that is not a tuple. */
template <typename TT> struct hash {
  size_t operator()(TT const& value) const
  {
    std::hash<TT> hasher;
    return hasher(value);
  }
};

/** Fold the hash of @p v into @p seed (the usual boost-style combiner). */
template <class T> inline void hash_combine(std::size_t& seed, T const& v)
{
  const std::size_t mixed = hash_tuple::hash<T>()(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
  seed ^= mixed;
}

/** Walks the tuple elements from index 0 up to Index, combining each of them into the seed. */
template <class Tuple, size_t Index = std::tuple_size<Tuple>::value - 1> struct HashValueImpl {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    // Lower indices first, so that the elements are combined in declaration order.
    HashValueImpl<Tuple, Index - 1>::apply(seed, tuple);
    hash_combine(seed, std::get<Index>(tuple));
  }
};

/** End of the recursion: only the first element is left to combine. */
template <class Tuple> struct HashValueImpl<Tuple, 0> {
  static void apply(size_t& seed, Tuple const& tuple)
  {
    hash_combine(seed, std::get<0>(tuple));
  }
};

/** Tuple case: start from a zero seed and combine every element in order. */
template <typename... TT> struct hash<std::tuple<TT...>> {
  size_t operator()(std::tuple<TT...> const& tt) const
  {
    size_t seed = 0;
    HashValueImpl<std::tuple<TT...>>::apply(seed, tt);
    return seed;
  }
};
} // namespace hash_tuple
64 typedef std::tuple</*sender*/ int, /* reciever */ int, /* tag */int> req_key_t;
65 typedef std::unordered_map<req_key_t, MPI_Request, hash_tuple::hash<std::tuple<int,int,int>>> req_storage_t;
67 void log_timed_action(simgrid::xbt::ReplayAction& action, double clock)
69 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
70 std::string s = boost::algorithm::join(action, " ");
71 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed() - clock);
76 static double parse_double(const std::string& string)
78 return xbt_str_parse_double(string.c_str(), "%s is not a double");
85 MPI_Datatype MPI_DEFAULT_TYPE;
87 class RequestStorage {
98 req_storage_t& get_store()
103 void get_requests(std::vector<MPI_Request>& vec)
105 for (auto& pair : store) {
106 auto& req = pair.second;
107 auto my_proc_id = simgrid::s4u::this_actor::get_pid();
108 if (req != MPI_REQUEST_NULL && (req->src() == my_proc_id || req->dst() == my_proc_id)) {
109 vec.push_back(pair.second);
110 pair.second->print_request("MM");
115 MPI_Request find(int src, int dst, int tag)
117 req_storage_t::iterator it = store.find(req_key_t(src, dst, tag));
118 return (it == store.end()) ? MPI_REQUEST_NULL : it->second;
121 void remove(MPI_Request req)
123 if (req == MPI_REQUEST_NULL) return;
125 store.erase(req_key_t(req->src()-1, req->dst()-1, req->tag()));
128 void add(MPI_Request req)
130 if (req != MPI_REQUEST_NULL) // Can and does happen in the case of TestAction
131 store.insert({req_key_t(req->src()-1, req->dst()-1, req->tag()), req});
134 /* Sometimes we need to re-insert MPI_REQUEST_NULL but we still need src,dst and tag */
135 void addNullRequest(int src, int dst, int tag)
137 store.insert({req_key_t(
138 MPI_COMM_WORLD->group()->actor(src)->get_pid()-1,
139 MPI_COMM_WORLD->group()->actor(dst)->get_pid()-1,
140 tag), MPI_REQUEST_NULL});
144 void WaitTestParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
146 CHECK_ACTION_PARAMS(action, 3, 0)
147 src = std::stoi(action[2]);
148 dst = std::stoi(action[3]);
149 tag = std::stoi(action[4]);
152 void SendRecvParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
154 CHECK_ACTION_PARAMS(action, 3, 1)
155 partner = std::stoi(action[2]);
156 tag = std::stoi(action[3]);
157 size = parse_double(action[4]);
158 if (action.size() > 5)
159 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
162 void ComputeParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 flops = parse_double(action[2]);
168 void BcastArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
170 CHECK_ACTION_PARAMS(action, 1, 2)
171 size = parse_double(action[2]);
172 root = (action.size() > 3) ? std::stoi(action[3]) : 0;
173 if (action.size() > 4)
174 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
177 void ReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
179 CHECK_ACTION_PARAMS(action, 2, 2)
180 comm_size = parse_double(action[2]);
181 comp_size = parse_double(action[3]);
182 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
183 if (action.size() > 5)
184 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
187 void AllReduceArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
189 CHECK_ACTION_PARAMS(action, 2, 1)
190 comm_size = parse_double(action[2]);
191 comp_size = parse_double(action[3]);
192 if (action.size() > 4)
193 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
196 void AllToAllArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
198 CHECK_ACTION_PARAMS(action, 2, 1)
199 comm_size = MPI_COMM_WORLD->size();
200 send_size = parse_double(action[2]);
201 recv_size = parse_double(action[3]);
203 if (action.size() > 4)
204 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
205 if (action.size() > 5)
206 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
209 void GatherArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
211 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
214 1) 68 is the sendcounts
215 2) 68 is the recvcounts
216 3) 0 is the root node
217 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
218 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
220 CHECK_ACTION_PARAMS(action, 2, 3)
221 comm_size = MPI_COMM_WORLD->size();
222 send_size = parse_double(action[2]);
223 recv_size = parse_double(action[3]);
225 if (name == "gather") {
226 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
227 if (action.size() > 5)
228 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
229 if (action.size() > 6)
230 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
232 if (action.size() > 4)
233 datatype1 = simgrid::smpi::Datatype::decode(action[4]);
234 if (action.size() > 5)
235 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
239 void GatherVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string& name)
241 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
242 0 gather 68 68 10 10 10 0 0 0
244 1) 68 is the sendcount
245 2) 68 10 10 10 is the recvcounts
246 3) 0 is the root node
247 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
248 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
250 comm_size = MPI_COMM_WORLD->size();
251 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
252 send_size = parse_double(action[2]);
253 disps = std::vector<int>(comm_size, 0);
254 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
256 if (name == "gatherv") {
257 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
258 if (action.size() > 4 + comm_size)
259 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
260 if (action.size() > 5 + comm_size)
261 datatype2 = simgrid::smpi::Datatype::decode(action[5 + comm_size]);
263 int datatype_index = 0;
265 /* The 3 comes from "0 gather <sendcount>", which must always be present.
266 * The + comm_size is the recvcounts array, which must also be present
268 if (action.size() > 3 + comm_size + comm_size) { /* datatype + disp are specified */
269 datatype_index = 3 + comm_size;
270 disp_index = datatype_index + 1;
271 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
272 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
273 } else if (action.size() >
274 3 + comm_size + 2) { /* disps specified; datatype is not specified; use the default one */
275 disp_index = 3 + comm_size;
276 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
277 datatype_index = 3 + comm_size;
278 datatype1 = simgrid::smpi::Datatype::decode(action[datatype_index]);
279 datatype2 = simgrid::smpi::Datatype::decode(action[datatype_index]);
282 if (disp_index != 0) {
283 for (unsigned int i = 0; i < comm_size; i++)
284 disps[i] = std::stoi(action[disp_index + i]);
288 for (unsigned int i = 0; i < comm_size; i++) {
289 (*recvcounts)[i] = std::stoi(action[i + 3]);
291 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
294 void ScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
296 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
299 1) 68 is the sendcounts
300 2) 68 is the recvcounts
301 3) 0 is the root node
302 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
303 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
305 CHECK_ACTION_PARAMS(action, 2, 3)
306 comm_size = MPI_COMM_WORLD->size();
307 send_size = parse_double(action[2]);
308 recv_size = parse_double(action[3]);
309 root = (action.size() > 4) ? std::stoi(action[4]) : 0;
310 if (action.size() > 5)
311 datatype1 = simgrid::smpi::Datatype::decode(action[5]);
312 if (action.size() > 6)
313 datatype2 = simgrid::smpi::Datatype::decode(action[6]);
316 void ScatterVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
318 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
319 0 gather 68 10 10 10 68 0 0 0
321 1) 68 10 10 10 is the sendcounts
322 2) 68 is the recvcount
323 3) 0 is the root node
324 4) 0 is the send datatype id, see simgrid::smpi::Datatype::decode()
325 5) 0 is the recv datatype id, see simgrid::smpi::Datatype::decode()
327 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
328 recv_size = parse_double(action[2 + comm_size]);
329 disps = std::vector<int>(comm_size, 0);
330 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
332 if (action.size() > 5 + comm_size)
333 datatype1 = simgrid::smpi::Datatype::decode(action[4 + comm_size]);
334 if (action.size() > 5 + comm_size)
335 datatype2 = simgrid::smpi::Datatype::decode(action[5]);
337 for (unsigned int i = 0; i < comm_size; i++) {
338 (*sendcounts)[i] = std::stoi(action[i + 2]);
340 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
341 root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
344 void ReduceScatterArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
346 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
347 0 reducescatter 275427 275427 275427 204020 11346849 0
349 1) The first four values after the name of the action declare the recvcounts array
350 2) The value 11346849 is the amount of instructions
351 3) The last value corresponds to the datatype, see simgrid::smpi::Datatype::decode().
353 comm_size = MPI_COMM_WORLD->size();
354 CHECK_ACTION_PARAMS(action, comm_size + 1, 1)
355 comp_size = parse_double(action[2 + comm_size]);
356 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
357 if (action.size() > 3 + comm_size)
358 datatype1 = simgrid::smpi::Datatype::decode(action[3 + comm_size]);
360 for (unsigned int i = 0; i < comm_size; i++) {
361 recvcounts->push_back(std::stoi(action[i + 2]));
363 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
366 void AllToAllVArgParser::parse(simgrid::xbt::ReplayAction& action, const std::string&)
368 /* The structure of the alltoallv action for the rank 0 (total 4 processes) is the following:
369 0 alltoallv 100 1 7 10 12 100 1 70 10 5
371 1) 100 is the size of the send buffer *sizeof(int),
372 2) 1 7 10 12 is the sendcounts array
373 3) 100*sizeof(int) is the size of the receiver buffer
374 4) 1 70 10 5 is the recvcounts array
376 comm_size = MPI_COMM_WORLD->size();
377 CHECK_ACTION_PARAMS(action, 2 * comm_size + 2, 2)
378 sendcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
379 recvcounts = std::shared_ptr<std::vector<int>>(new std::vector<int>(comm_size));
380 senddisps = std::vector<int>(comm_size, 0);
381 recvdisps = std::vector<int>(comm_size, 0);
383 if (action.size() > 5 + 2 * comm_size)
384 datatype1 = simgrid::smpi::Datatype::decode(action[4 + 2 * comm_size]);
385 if (action.size() > 5 + 2 * comm_size)
386 datatype2 = simgrid::smpi::Datatype::decode(action[5 + 2 * comm_size]);
388 send_buf_size = parse_double(action[2]);
389 recv_buf_size = parse_double(action[3 + comm_size]);
390 for (unsigned int i = 0; i < comm_size; i++) {
391 (*sendcounts)[i] = std::stoi(action[3 + i]);
392 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
394 send_size_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
395 recv_size_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
398 void WaitAction::kernel(simgrid::xbt::ReplayAction& action)
400 std::string s = boost::algorithm::join(action, " ");
401 xbt_assert(req_storage.size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
402 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
403 req_storage.remove(request);
405 if (request == MPI_REQUEST_NULL) {
406 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just
411 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
413 // Must be taken before Request::wait() since the request may be set to
414 // MPI_REQUEST_NULL by Request::wait!
415 bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
416 // TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
417 TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::WaitTIData(args.src, args.dst, args.tag));
420 Request::wait(&request, &status);
422 TRACE_smpi_comm_out(rank);
423 if (is_wait_for_receive)
424 TRACE_smpi_recv(args.src, args.dst, args.tag);
427 void SendAction::kernel(simgrid::xbt::ReplayAction&)
429 int dst_traced = MPI_COMM_WORLD->group()->actor(args.partner)->get_pid();
431 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
432 args.tag, Datatype::encode(args.datatype1)));
433 if (not TRACE_smpi_view_internals())
434 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, args.tag, args.size * args.datatype1->size());
436 if (name == "send") {
437 Request::send(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
438 } else if (name == "isend") {
439 MPI_Request request = Request::isend(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
440 req_storage.add(request);
442 xbt_die("Don't know this action, %s", name.c_str());
445 TRACE_smpi_comm_out(my_proc_id);
448 void RecvAction::kernel(simgrid::xbt::ReplayAction&)
450 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData(name, args.partner, args.size,
451 args.tag, Datatype::encode(args.datatype1)));
454 // unknown size from the receiver point of view
455 if (args.size <= 0.0) {
456 Request::probe(args.partner, args.tag, MPI_COMM_WORLD, &status);
457 args.size = status.count;
460 bool is_recv = false; // Help analyzers understanding that status is not used unintialized
461 if (name == "recv") {
463 Request::recv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD, &status);
464 } else if (name == "irecv") {
465 MPI_Request request = Request::irecv(nullptr, args.size, args.datatype1, args.partner, args.tag, MPI_COMM_WORLD);
466 req_storage.add(request);
471 TRACE_smpi_comm_out(my_proc_id);
472 if (is_recv && not TRACE_smpi_view_internals()) {
473 int src_traced = MPI_COMM_WORLD->group()->actor(status.MPI_SOURCE)->get_pid();
474 TRACE_smpi_recv(src_traced, my_proc_id, args.tag);
478 void ComputeAction::kernel(simgrid::xbt::ReplayAction&)
480 TRACE_smpi_computing_in(my_proc_id, args.flops);
481 smpi_execute_flops(args.flops);
482 TRACE_smpi_computing_out(my_proc_id);
485 void TestAction::kernel(simgrid::xbt::ReplayAction&)
487 MPI_Request request = req_storage.find(args.src, args.dst, args.tag);
488 req_storage.remove(request);
489 // if request is null here, this may mean that a previous test has succeeded
490 // Different times in traced application and replayed version may lead to this
491 // In this case, ignore the extra calls.
492 if (request != MPI_REQUEST_NULL) {
493 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("test"));
497 Request::test(&request, &status, &flag);
499 XBT_DEBUG("MPI_Test result: %d", flag);
500 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now
502 if (request == MPI_REQUEST_NULL)
503 req_storage.addNullRequest(args.src, args.dst, args.tag);
505 req_storage.add(request);
507 TRACE_smpi_comm_out(my_proc_id);
511 void InitAction::kernel(simgrid::xbt::ReplayAction& action)
513 CHECK_ACTION_PARAMS(action, 0, 1)
514 MPI_DEFAULT_TYPE = (action.size() > 2) ? MPI_DOUBLE // default MPE datatype
515 : MPI_BYTE; // default TAU datatype
517 /* start a simulated timer */
518 smpi_process()->simulated_start();
521 void CommunicatorAction::kernel(simgrid::xbt::ReplayAction&)
526 void WaitAllAction::kernel(simgrid::xbt::ReplayAction&)
528 const unsigned int count_requests = req_storage.size();
530 if (count_requests > 0) {
531 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::Pt2PtTIData("waitall", -1, count_requests, ""));
532 std::vector<std::pair</*sender*/int,/*recv*/int>> sender_receiver;
533 std::vector<MPI_Request> reqs;
534 req_storage.get_requests(reqs);
535 for (const auto& req : reqs) {
536 if (req && (req->flags() & MPI_REQ_RECV)) {
537 sender_receiver.push_back({req->src(), req->dst()});
540 MPI_Status status[count_requests];
541 Request::waitall(count_requests, &(reqs.data())[0], status);
542 req_storage.get_store().clear();
544 for (auto& pair : sender_receiver) {
545 TRACE_smpi_recv(pair.first, pair.second, 0);
547 TRACE_smpi_comm_out(my_proc_id);
551 void BarrierAction::kernel(simgrid::xbt::ReplayAction&)
553 TRACE_smpi_comm_in(my_proc_id, __func__, new simgrid::instr::NoOpTIData("barrier"));
554 Colls::barrier(MPI_COMM_WORLD);
555 TRACE_smpi_comm_out(my_proc_id);
558 void BcastAction::kernel(simgrid::xbt::ReplayAction&)
560 TRACE_smpi_comm_in(my_proc_id, "action_bcast",
561 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
562 -1.0, args.size, -1, Datatype::encode(args.datatype1), ""));
564 Colls::bcast(send_buffer(args.size * args.datatype1->size()), args.size, args.datatype1, args.root, MPI_COMM_WORLD);
566 TRACE_smpi_comm_out(my_proc_id);
569 void ReduceAction::kernel(simgrid::xbt::ReplayAction&)
571 TRACE_smpi_comm_in(my_proc_id, "action_reduce",
572 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(args.root)->get_pid(),
573 args.comp_size, args.comm_size, -1,
574 Datatype::encode(args.datatype1), ""));
576 Colls::reduce(send_buffer(args.comm_size * args.datatype1->size()),
577 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, args.root, MPI_COMM_WORLD);
578 smpi_execute_flops(args.comp_size);
580 TRACE_smpi_comm_out(my_proc_id);
583 void AllReduceAction::kernel(simgrid::xbt::ReplayAction&)
585 TRACE_smpi_comm_in(my_proc_id, "action_allreduce", new simgrid::instr::CollTIData("allreduce", -1, args.comp_size, args.comm_size, -1,
586 Datatype::encode(args.datatype1), ""));
588 Colls::allreduce(send_buffer(args.comm_size * args.datatype1->size()),
589 recv_buffer(args.comm_size * args.datatype1->size()), args.comm_size, args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
590 smpi_execute_flops(args.comp_size);
592 TRACE_smpi_comm_out(my_proc_id);
595 void AllToAllAction::kernel(simgrid::xbt::ReplayAction&)
597 TRACE_smpi_comm_in(my_proc_id, "action_alltoall",
598 new simgrid::instr::CollTIData("alltoall", -1, -1.0, args.send_size, args.recv_size,
599 Datatype::encode(args.datatype1),
600 Datatype::encode(args.datatype2)));
602 Colls::alltoall(send_buffer(args.send_size * args.comm_size * args.datatype1->size()), args.send_size,
603 args.datatype1, recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()),
604 args.recv_size, args.datatype2, MPI_COMM_WORLD);
606 TRACE_smpi_comm_out(my_proc_id);
609 void GatherAction::kernel(simgrid::xbt::ReplayAction&)
611 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::CollTIData(name, (name == "gather") ? args.root : -1, -1.0, args.send_size, args.recv_size,
612 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
614 if (name == "gather") {
615 int rank = MPI_COMM_WORLD->rank();
616 Colls::gather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
617 (rank == args.root) ? recv_buffer(args.recv_size * args.comm_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
620 Colls::allgather(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
621 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, MPI_COMM_WORLD);
623 TRACE_smpi_comm_out(my_proc_id);
626 void GatherVAction::kernel(simgrid::xbt::ReplayAction&)
628 int rank = MPI_COMM_WORLD->rank();
630 TRACE_smpi_comm_in(my_proc_id, name.c_str(), new simgrid::instr::VarCollTIData(
631 name, (name == "gatherv") ? args.root : -1, args.send_size, nullptr, -1, args.recvcounts,
632 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
634 if (name == "gatherv") {
635 Colls::gatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
636 (rank == args.root) ? recv_buffer(args.recv_size_sum * args.datatype2->size()) : nullptr,
637 args.recvcounts->data(), args.disps.data(), args.datatype2, args.root, MPI_COMM_WORLD);
640 Colls::allgatherv(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
641 recv_buffer(args.recv_size_sum * args.datatype2->size()), args.recvcounts->data(),
642 args.disps.data(), args.datatype2, MPI_COMM_WORLD);
645 TRACE_smpi_comm_out(my_proc_id);
648 void ScatterAction::kernel(simgrid::xbt::ReplayAction&)
650 int rank = MPI_COMM_WORLD->rank();
651 TRACE_smpi_comm_in(my_proc_id, "action_scatter", new simgrid::instr::CollTIData(name, args.root, -1.0, args.send_size, args.recv_size,
652 Datatype::encode(args.datatype1),
653 Datatype::encode(args.datatype2)));
655 Colls::scatter(send_buffer(args.send_size * args.datatype1->size()), args.send_size, args.datatype1,
656 (rank == args.root) ? recv_buffer(args.recv_size * args.datatype2->size()) : nullptr, args.recv_size, args.datatype2, args.root, MPI_COMM_WORLD);
658 TRACE_smpi_comm_out(my_proc_id);
661 void ScatterVAction::kernel(simgrid::xbt::ReplayAction&)
663 int rank = MPI_COMM_WORLD->rank();
664 TRACE_smpi_comm_in(my_proc_id, "action_scatterv", new simgrid::instr::VarCollTIData(name, args.root, -1, args.sendcounts, args.recv_size,
665 nullptr, Datatype::encode(args.datatype1),
666 Datatype::encode(args.datatype2)));
668 Colls::scatterv((rank == args.root) ? send_buffer(args.send_size_sum * args.datatype1->size()) : nullptr,
669 args.sendcounts->data(), args.disps.data(), args.datatype1,
670 recv_buffer(args.recv_size * args.datatype2->size()), args.recv_size, args.datatype2, args.root,
673 TRACE_smpi_comm_out(my_proc_id);
676 void ReduceScatterAction::kernel(simgrid::xbt::ReplayAction&)
678 TRACE_smpi_comm_in(my_proc_id, "action_reducescatter",
679 new simgrid::instr::VarCollTIData("reducescatter", -1, 0, nullptr, -1, args.recvcounts,
680 std::to_string(args.comp_size), /* ugly hack to print comp_size */
681 Datatype::encode(args.datatype1)));
683 Colls::reduce_scatter(send_buffer(args.recv_size_sum * args.datatype1->size()),
684 recv_buffer(args.recv_size_sum * args.datatype1->size()), args.recvcounts->data(),
685 args.datatype1, MPI_OP_NULL, MPI_COMM_WORLD);
687 smpi_execute_flops(args.comp_size);
688 TRACE_smpi_comm_out(my_proc_id);
691 void AllToAllVAction::kernel(simgrid::xbt::ReplayAction&)
693 TRACE_smpi_comm_in(my_proc_id, __func__,
694 new simgrid::instr::VarCollTIData(
695 "alltoallv", -1, args.send_size_sum, args.sendcounts, args.recv_size_sum, args.recvcounts,
696 Datatype::encode(args.datatype1), Datatype::encode(args.datatype2)));
698 Colls::alltoallv(send_buffer(args.send_buf_size * args.datatype1->size()), args.sendcounts->data(), args.senddisps.data(), args.datatype1,
699 recv_buffer(args.recv_buf_size * args.datatype2->size()), args.recvcounts->data(), args.recvdisps.data(), args.datatype2, MPI_COMM_WORLD);
701 TRACE_smpi_comm_out(my_proc_id);
703 } // Replay Namespace
704 }} // namespace simgrid::smpi
706 static std::unordered_map<aid_t, simgrid::smpi::replay::RequestStorage> storage;
707 /** @brief Only initialize the replay, don't do it for real */
708 void smpi_replay_init(const char* instance_id, int rank, double start_delay_flops)
710 if (not smpi_process()->initializing()){
711 simgrid::s4u::Actor::self()->set_property("instance_id", instance_id);
712 simgrid::s4u::Actor::self()->set_property("rank", std::to_string(rank));
713 simgrid::smpi::ActorExt::init();
715 smpi_process()->mark_as_initialized();
716 smpi_process()->set_replaying(true);
718 int my_proc_id = simgrid::s4u::this_actor::get_pid();
720 TRACE_smpi_init(my_proc_id);
721 TRACE_smpi_computing_init(my_proc_id);
722 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
723 TRACE_smpi_comm_out(my_proc_id);
724 xbt_replay_action_register("init", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::InitAction().execute(action); });
725 xbt_replay_action_register("finalize", [](simgrid::xbt::ReplayAction&) { /* nothing to do */ });
726 xbt_replay_action_register("comm_size", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
727 xbt_replay_action_register("comm_split",[](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
728 xbt_replay_action_register("comm_dup", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::CommunicatorAction().execute(action); });
729 xbt_replay_action_register("send", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("send", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
730 xbt_replay_action_register("isend", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::SendAction("isend", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
731 xbt_replay_action_register("recv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("recv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
732 xbt_replay_action_register("irecv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::RecvAction("irecv", storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
733 xbt_replay_action_register("test", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::TestAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
734 xbt_replay_action_register("wait", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
735 xbt_replay_action_register("waitall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::WaitAllAction(storage[simgrid::s4u::this_actor::get_pid()]).execute(action); });
736 xbt_replay_action_register("barrier", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BarrierAction().execute(action); });
737 xbt_replay_action_register("bcast", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::BcastAction().execute(action); });
738 xbt_replay_action_register("reduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceAction().execute(action); });
739 xbt_replay_action_register("allreduce", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllReduceAction().execute(action); });
740 xbt_replay_action_register("alltoall", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllAction().execute(action); });
741 xbt_replay_action_register("alltoallv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::AllToAllVAction().execute(action); });
742 xbt_replay_action_register("gather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("gather").execute(action); });
743 xbt_replay_action_register("scatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterAction().execute(action); });
744 xbt_replay_action_register("gatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("gatherv").execute(action); });
745 xbt_replay_action_register("scatterv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ScatterVAction().execute(action); });
746 xbt_replay_action_register("allgather", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherAction("allgather").execute(action); });
747 xbt_replay_action_register("allgatherv", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::GatherVAction("allgatherv").execute(action); });
748 xbt_replay_action_register("reducescatter", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ReduceScatterAction().execute(action); });
749 xbt_replay_action_register("compute", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::replay::ComputeAction().execute(action); });
751 //if we have a delayed start, sleep here.
752 if (start_delay_flops > 0) {
753 XBT_VERB("Delayed start for instance - Sleeping for %f flops ", start_delay_flops);
754 smpi_execute_flops(start_delay_flops);
756 // Wait for the other actors to initialize also
757 simgrid::s4u::this_actor::yield();
761 /** @brief actually run the replay after initialization */
762 void smpi_replay_main(int rank, const char* trace_filename)
764 static int active_processes = 0;
766 storage[simgrid::s4u::this_actor::get_pid()] = simgrid::smpi::replay::RequestStorage();
767 std::string rank_string = std::to_string(rank);
768 simgrid::xbt::replay_runner(rank_string.c_str(), trace_filename);
770 /* and now, finalize everything */
771 /* One active process will stop. Decrease the counter*/
772 unsigned int count_requests = storage[simgrid::s4u::this_actor::get_pid()].size();
773 XBT_DEBUG("There are %ud elements in reqq[*]", count_requests);
774 if (count_requests > 0) {
775 MPI_Request requests[count_requests];
776 MPI_Status status[count_requests];
779 for (auto const& pair : storage[simgrid::s4u::this_actor::get_pid()].get_store()) {
780 requests[i] = pair.second;
783 simgrid::smpi::Request::waitall(count_requests, requests, status);
787 if(active_processes==0){
788 /* Last process alive speaking: end the simulated timer */
789 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
790 smpi_free_replay_tmp_buffers();
793 TRACE_smpi_comm_in(simgrid::s4u::this_actor::get_pid(), "smpi_replay_run_finalize",
794 new simgrid::instr::NoOpTIData("finalize"));
796 smpi_process()->finalize();
798 TRACE_smpi_comm_out(simgrid::s4u::this_actor::get_pid());
799 TRACE_smpi_finalize(simgrid::s4u::this_actor::get_pid());
802 /** @brief chain a replay initialization and a replay start */
803 void smpi_replay_run(const char* instance_id, int rank, double start_delay_flops, const char* trace_filename)
805 smpi_replay_init(instance_id, rank, start_delay_flops);
806 smpi_replay_main(rank, trace_filename);