1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
#include "smpi_datatype.hpp"
#include "smpi_group.hpp"
#include "smpi_process.hpp"
#include "smpi_request.hpp"
#include "xbt/replay.hpp"

#include <boost/algorithm/string/join.hpp>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <vector>
21 using simgrid::s4u::Actor;
23 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
/* Value of the last "comm_size" trace action that was replayed (written by action_comm_size) */
static int communicator_size = 0;
/* Initialized by action_init from smpi_process_count(); smpi_replay_main tests it against 0 to detect the last
 * process that terminates */
static int active_processes = 0;
/* Per-actor list of the requests created by Isend/Irecv that were not waited for yet, indexed by actor PID */
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

/* Datatype used when a trace line does not specify one; selected by action_init */
static MPI_Datatype MPI_DEFAULT_TYPE;
/* Abort the replay (THROWF with arg_error) when a trace line is too short for the action that handles it.
 *
 *  - action:    the tokenized trace line; its first two items are the process id and the action name
 *  - mandatory: amount of arguments that must follow these two items
 *  - optional:  amount of extra arguments that may follow (only reported in the error message)
 *
 * The expansion is a complete compound statement: call sites are written without a trailing semicolon.
 * Macro arguments are parenthesized, and the size is cast explicitly so that it always matches %lu.
 */
#define CHECK_ACTION_PARAMS(action, mandatory, optional)                                                               \
  {                                                                                                                    \
    if ((action).size() < static_cast<unsigned long>((mandatory) + 2))                                                 \
      THROWF(arg_error, 0, "%s replay failed.\n"                                                                       \
                           "%lu items were given on the line. First two should be process_id and action. "             \
                           "This action needs after them %lu mandatory arguments, and accepts %lu optional ones. \n"   \
                           "Please contact the Simgrid team if support is needed",                                     \
             __FUNCTION__, static_cast<unsigned long>((action).size()), static_cast<unsigned long>(mandatory),         \
             static_cast<unsigned long>(optional));                                                                    \
  }
42 static void log_timed_action (simgrid::xbt::ReplayAction& action, double clock){
43 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
44 std::string s = boost::algorithm::join(action, " ");
45 XBT_VERB("%s %f", s.c_str(), smpi_process()->simulated_elapsed()-clock);
49 static std::vector<MPI_Request>* get_reqq_self()
51 return reqq.at(Actor::self()->getPid());
54 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
56 reqq.insert({Actor::self()->getPid(), mpi_request});
60 static double parse_double(std::string string)
62 return xbt_str_parse_double(string.c_str(), "%s is not a double");
66 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
67 static MPI_Datatype decode_datatype(std::string action)
69 return simgrid::smpi::Datatype::decode(const_cast<const char* const>(action.c_str()));
72 const char* encode_datatype(MPI_Datatype datatype)
74 if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
77 return datatype->encode();
83 static void action_init(simgrid::xbt::ReplayAction& action)
85 XBT_DEBUG("Initialize the counters");
86 CHECK_ACTION_PARAMS(action, 0, 1)
87 if (action.size() > 2)
88 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
90 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
92 /* start a simulated timer */
93 smpi_process()->simulated_start();
94 /*initialize the number of active processes */
95 active_processes = smpi_process_count();
97 set_reqq_self(new std::vector<MPI_Request>);
100 static void action_finalize(simgrid::xbt::ReplayAction& action)
105 static void action_comm_size(simgrid::xbt::ReplayAction& action)
107 communicator_size = parse_double(action[2]);
108 log_timed_action (action, smpi_process()->simulated_elapsed());
111 static void action_comm_split(simgrid::xbt::ReplayAction& action)
113 log_timed_action (action, smpi_process()->simulated_elapsed());
116 static void action_comm_dup(simgrid::xbt::ReplayAction& action)
118 log_timed_action (action, smpi_process()->simulated_elapsed());
121 static void action_compute(simgrid::xbt::ReplayAction& action)
123 CHECK_ACTION_PARAMS(action, 1, 0)
124 double clock = smpi_process()->simulated_elapsed();
125 double flops= parse_double(action[2]);
126 int my_proc_id = Actor::self()->getPid();
128 TRACE_smpi_computing_in(my_proc_id, flops);
129 smpi_execute_flops(flops);
130 TRACE_smpi_computing_out(my_proc_id);
132 log_timed_action (action, clock);
135 static void action_send(simgrid::xbt::ReplayAction& action)
137 CHECK_ACTION_PARAMS(action, 2, 1)
138 int to = std::stoi(action[2]);
139 double size=parse_double(action[3]);
140 double clock = smpi_process()->simulated_elapsed();
142 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
144 int my_proc_id = Actor::self()->getPid();
145 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
147 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
148 new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
149 if (not TRACE_smpi_view_internals())
150 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
152 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
154 TRACE_smpi_comm_out(my_proc_id);
156 log_timed_action(action, clock);
159 static void action_Isend(simgrid::xbt::ReplayAction& action)
161 CHECK_ACTION_PARAMS(action, 2, 1)
162 int to = std::stoi(action[2]);
163 double size=parse_double(action[3]);
164 double clock = smpi_process()->simulated_elapsed();
166 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
168 int my_proc_id = Actor::self()->getPid();
169 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
170 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
171 new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
172 if (not TRACE_smpi_view_internals())
173 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
175 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
177 TRACE_smpi_comm_out(my_proc_id);
179 get_reqq_self()->push_back(request);
181 log_timed_action (action, clock);
184 static void action_recv(simgrid::xbt::ReplayAction& action)
186 CHECK_ACTION_PARAMS(action, 2, 1)
187 int from = std::stoi(action[2]);
188 double size=parse_double(action[3]);
189 double clock = smpi_process()->simulated_elapsed();
192 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
194 int my_proc_id = Actor::self()->getPid();
195 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
197 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
198 new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
200 //unknown size from the receiver point of view
202 Request::probe(from, 0, MPI_COMM_WORLD, &status);
206 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
208 TRACE_smpi_comm_out(my_proc_id);
209 if (not TRACE_smpi_view_internals()) {
210 TRACE_smpi_recv(src_traced, my_proc_id, 0);
213 log_timed_action (action, clock);
216 static void action_Irecv(simgrid::xbt::ReplayAction& action)
218 CHECK_ACTION_PARAMS(action, 2, 1)
219 int from = std::stoi(action[2]);
220 double size=parse_double(action[3]);
221 double clock = smpi_process()->simulated_elapsed();
223 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
225 int my_proc_id = Actor::self()->getPid();
226 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
227 new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
229 //unknow size from the receiver pov
231 Request::probe(from, 0, MPI_COMM_WORLD, &status);
235 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
237 TRACE_smpi_comm_out(my_proc_id);
238 get_reqq_self()->push_back(request);
240 log_timed_action (action, clock);
243 static void action_test(simgrid::xbt::ReplayAction& action)
245 CHECK_ACTION_PARAMS(action, 0, 0)
246 double clock = smpi_process()->simulated_elapsed();
249 MPI_Request request = get_reqq_self()->back();
250 get_reqq_self()->pop_back();
251 //if request is null here, this may mean that a previous test has succeeded
252 //Different times in traced application and replayed version may lead to this
253 //In this case, ignore the extra calls.
254 if(request!=nullptr){
255 int my_proc_id = Actor::self()->getPid();
256 TRACE_smpi_testing_in(my_proc_id);
258 int flag = Request::test(&request, &status);
260 XBT_DEBUG("MPI_Test result: %d", flag);
261 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
262 get_reqq_self()->push_back(request);
264 TRACE_smpi_testing_out(my_proc_id);
266 log_timed_action (action, clock);
269 static void action_wait(simgrid::xbt::ReplayAction& action)
271 CHECK_ACTION_PARAMS(action, 0, 0)
272 double clock = smpi_process()->simulated_elapsed();
275 std::string s = boost::algorithm::join(action, " ");
276 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s", s.c_str());
277 MPI_Request request = get_reqq_self()->back();
278 get_reqq_self()->pop_back();
280 if (request==nullptr){
281 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
285 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
287 MPI_Group group = request->comm()->group();
288 int src_traced = group->rank(request->src());
289 int dst_traced = group->rank(request->dst());
290 int is_wait_for_receive = (request->flags() & RECV);
291 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
293 Request::wait(&request, &status);
295 TRACE_smpi_comm_out(rank);
296 if (is_wait_for_receive)
297 TRACE_smpi_recv(src_traced, dst_traced, 0);
298 log_timed_action (action, clock);
301 static void action_waitall(simgrid::xbt::ReplayAction& action)
303 CHECK_ACTION_PARAMS(action, 0, 0)
304 double clock = smpi_process()->simulated_elapsed();
305 const unsigned int count_requests = get_reqq_self()->size();
307 if (count_requests>0) {
308 MPI_Status status[count_requests];
310 int my_proc_id_traced = Actor::self()->getPid();
311 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
312 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
313 int recvs_snd[count_requests];
314 int recvs_rcv[count_requests];
315 for (unsigned int i = 0; i < count_requests; i++) {
316 const auto& req = (*get_reqq_self())[i];
317 if (req && (req->flags() & RECV)) {
318 recvs_snd[i] = req->src();
319 recvs_rcv[i] = req->dst();
323 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
325 for (unsigned i = 0; i < count_requests; i++) {
326 if (recvs_snd[i]!=-100)
327 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
329 TRACE_smpi_comm_out(my_proc_id_traced);
331 log_timed_action (action, clock);
334 static void action_barrier(simgrid::xbt::ReplayAction& action)
336 double clock = smpi_process()->simulated_elapsed();
337 int my_proc_id = Actor::self()->getPid();
338 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
340 Colls::barrier(MPI_COMM_WORLD);
342 TRACE_smpi_comm_out(my_proc_id);
343 log_timed_action (action, clock);
346 static void action_bcast(simgrid::xbt::ReplayAction& action)
348 CHECK_ACTION_PARAMS(action, 1, 2)
349 double size = parse_double(action[2]);
350 double clock = smpi_process()->simulated_elapsed();
351 int root = (action.size() > 3) ? std::stoi(action[3]) : 0;
352 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
353 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
355 int my_proc_id = Actor::self()->getPid();
356 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
357 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
358 -1, MPI_CURRENT_TYPE->encode(), ""));
360 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
362 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
364 TRACE_smpi_comm_out(my_proc_id);
365 log_timed_action (action, clock);
368 static void action_reduce(simgrid::xbt::ReplayAction& action)
370 CHECK_ACTION_PARAMS(action, 2, 2)
371 double comm_size = parse_double(action[2]);
372 double comp_size = parse_double(action[3]);
373 double clock = smpi_process()->simulated_elapsed();
374 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
376 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
378 int my_proc_id = Actor::self()->getPid();
379 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
380 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
381 comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
383 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
384 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
385 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
386 smpi_execute_flops(comp_size);
388 TRACE_smpi_comm_out(my_proc_id);
389 log_timed_action (action, clock);
392 static void action_allReduce(simgrid::xbt::ReplayAction& action)
394 CHECK_ACTION_PARAMS(action, 2, 1)
395 double comm_size = parse_double(action[2]);
396 double comp_size = parse_double(action[3]);
398 MPI_Datatype MPI_CURRENT_TYPE = (action.size() > 4) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
400 double clock = smpi_process()->simulated_elapsed();
401 int my_proc_id = Actor::self()->getPid();
402 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
403 MPI_CURRENT_TYPE->encode(), ""));
405 void *recvbuf = smpi_get_tmp_recvbuffer(comm_size* MPI_CURRENT_TYPE->size());
406 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
407 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
408 smpi_execute_flops(comp_size);
410 TRACE_smpi_comm_out(my_proc_id);
411 log_timed_action (action, clock);
414 static void action_allToAll(simgrid::xbt::ReplayAction& action)
416 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
417 double clock = smpi_process()->simulated_elapsed();
418 unsigned long comm_size = MPI_COMM_WORLD->size();
419 int send_size = parse_double(action[2]);
420 int recv_size = parse_double(action[3]);
421 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
422 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
424 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
425 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
427 int my_proc_id = Actor::self()->getPid();
428 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
429 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
430 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
432 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
434 TRACE_smpi_comm_out(my_proc_id);
435 log_timed_action (action, clock);
438 static void action_gather(simgrid::xbt::ReplayAction& action)
440 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
443 1) 68 is the sendcounts
444 2) 68 is the recvcounts
445 3) 0 is the root node
446 4) 0 is the send datatype id, see decode_datatype()
447 5) 0 is the recv datatype id, see decode_datatype()
449 CHECK_ACTION_PARAMS(action, 2, 3)
450 double clock = smpi_process()->simulated_elapsed();
451 unsigned long comm_size = MPI_COMM_WORLD->size();
452 int send_size = parse_double(action[2]);
453 int recv_size = parse_double(action[3]);
454 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
455 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
457 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
458 void *recv = nullptr;
459 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
460 int rank = MPI_COMM_WORLD->rank();
463 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
465 TRACE_smpi_comm_in(rank, __FUNCTION__,
466 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
467 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
469 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
471 TRACE_smpi_comm_out(Actor::self()->getPid());
472 log_timed_action (action, clock);
475 static void action_scatter(simgrid::xbt::ReplayAction& action)
477 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
480 1) 68 is the sendcounts
481 2) 68 is the recvcounts
482 3) 0 is the root node
483 4) 0 is the send datatype id, see decode_datatype()
484 5) 0 is the recv datatype id, see decode_datatype()
486 CHECK_ACTION_PARAMS(action, 2, 3)
487 double clock = smpi_process()->simulated_elapsed();
488 unsigned long comm_size = MPI_COMM_WORLD->size();
489 int send_size = parse_double(action[2]);
490 int recv_size = parse_double(action[3]);
491 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 6) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
492 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 6) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
494 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
495 void* recv = nullptr;
496 int root = (action.size() > 4) ? std::stoi(action[4]) : 0;
497 int rank = MPI_COMM_WORLD->rank();
500 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
502 TRACE_smpi_comm_in(rank, __FUNCTION__,
503 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
504 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
506 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
508 TRACE_smpi_comm_out(Actor::self()->getPid());
509 log_timed_action(action, clock);
512 static void action_gatherv(simgrid::xbt::ReplayAction& action)
514 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
515 0 gather 68 68 10 10 10 0 0 0
517 1) 68 is the sendcount
518 2) 68 10 10 10 is the recvcounts
519 3) 0 is the root node
520 4) 0 is the send datatype id, see decode_datatype()
521 5) 0 is the recv datatype id, see decode_datatype()
523 double clock = smpi_process()->simulated_elapsed();
524 unsigned long comm_size = MPI_COMM_WORLD->size();
525 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
526 int send_size = parse_double(action[2]);
527 std::vector<int> disps(comm_size, 0);
528 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
530 MPI_Datatype MPI_CURRENT_TYPE =
531 (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
532 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
535 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
536 void *recv = nullptr;
537 for (unsigned int i = 0; i < comm_size; i++) {
538 (*recvcounts)[i] = std::stoi(action[i + 3]);
540 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
542 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
543 int rank = MPI_COMM_WORLD->rank();
546 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
548 TRACE_smpi_comm_in(rank, __FUNCTION__,
549 new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
550 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
552 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
555 TRACE_smpi_comm_out(Actor::self()->getPid());
556 log_timed_action (action, clock);
559 static void action_scatterv(simgrid::xbt::ReplayAction& action)
561 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
562 0 gather 68 10 10 10 68 0 0 0
564 1) 68 10 10 10 is the sendcounts
565 2) 68 is the recvcount
566 3) 0 is the root node
567 4) 0 is the send datatype id, see decode_datatype()
568 5) 0 is the recv datatype id, see decode_datatype()
570 double clock = smpi_process()->simulated_elapsed();
571 unsigned long comm_size = MPI_COMM_WORLD->size();
572 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
573 int recv_size = parse_double(action[2 + comm_size]);
574 std::vector<int> disps(comm_size, 0);
575 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
577 MPI_Datatype MPI_CURRENT_TYPE =
578 (action.size() > 5 + comm_size) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
579 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + comm_size) ? decode_datatype(action[5 + comm_size])
582 void* send = nullptr;
583 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
584 for (unsigned int i = 0; i < comm_size; i++) {
585 (*sendcounts)[i] = std::stoi(action[i + 2]);
587 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
589 int root = (action.size() > 3 + comm_size) ? std::stoi(action[3 + comm_size]) : 0;
590 int rank = MPI_COMM_WORLD->rank();
593 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
595 TRACE_smpi_comm_in(rank, __FUNCTION__,
596 new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
597 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
599 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
602 TRACE_smpi_comm_out(Actor::self()->getPid());
603 log_timed_action(action, clock);
606 static void action_reducescatter(simgrid::xbt::ReplayAction& action)
608 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
609 0 reduceScatter 275427 275427 275427 204020 11346849 0
611 1) The first four values after the name of the action declare the recvcounts array
612 2) The value 11346849 is the amount of instructions
613 3) The last value corresponds to the datatype, see decode_datatype().
615 double clock = smpi_process()->simulated_elapsed();
616 unsigned long comm_size = MPI_COMM_WORLD->size();
617 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
618 int comp_size = parse_double(action[2+comm_size]);
619 int my_proc_id = Actor::self()->getPid();
620 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
621 MPI_Datatype MPI_CURRENT_TYPE =
622 (action.size() > 3 + comm_size) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
624 for (unsigned int i = 0; i < comm_size; i++) {
625 recvcounts->push_back(std::stoi(action[i + 2]));
627 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
629 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
630 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
631 std::to_string(comp_size), /* ugly hack to print comp_size */
632 MPI_CURRENT_TYPE->encode()));
634 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
635 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
637 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
638 smpi_execute_flops(comp_size);
640 TRACE_smpi_comm_out(my_proc_id);
641 log_timed_action (action, clock);
644 static void action_allgather(simgrid::xbt::ReplayAction& action)
646 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
647 0 allGather 275427 275427
649 1) 275427 is the sendcount
650 2) 275427 is the recvcount
651 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
653 double clock = smpi_process()->simulated_elapsed();
655 CHECK_ACTION_PARAMS(action, 2, 2)
656 int sendcount = std::stoi(action[2]);
657 int recvcount = std::stoi(action[3]);
659 MPI_Datatype MPI_CURRENT_TYPE{(action.size() > 5) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE};
660 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
662 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
663 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
665 int my_proc_id = Actor::self()->getPid();
667 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
668 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
669 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
671 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
673 TRACE_smpi_comm_out(my_proc_id);
674 log_timed_action (action, clock);
677 static void action_allgatherv(simgrid::xbt::ReplayAction& action)
679 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
680 0 allGatherV 275427 275427 275427 275427 204020
682 1) 275427 is the sendcount
683 2) The next four elements declare the recvcounts array
684 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
686 double clock = smpi_process()->simulated_elapsed();
688 unsigned long comm_size = MPI_COMM_WORLD->size();
689 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
690 int sendcount = std::stoi(action[2]);
691 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
692 std::vector<int> disps(comm_size, 0);
694 int datatype_index = 0, disp_index = 0;
695 if (action.size() > 3 + 2 * comm_size) { /* datatype + disp are specified */
696 datatype_index = 3 + comm_size;
697 disp_index = datatype_index + 1;
698 } else if (action.size() > 3 + 2 * comm_size) { /* disps specified; datatype is not specified; use the default one */
700 disp_index = 3 + comm_size;
701 } else if (action.size() > 3 + comm_size) { /* only datatype, no disp specified */
702 datatype_index = 3 + comm_size;
705 if (disp_index != 0) {
706 for (unsigned int i = 0; i < comm_size; i++)
707 disps[i] = std::stoi(action[disp_index + i]);
710 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
711 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
713 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
715 for (unsigned int i = 0; i < comm_size; i++) {
716 (*recvcounts)[i] = std::stoi(action[i + 3]);
718 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
719 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
721 int my_proc_id = Actor::self()->getPid();
723 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
724 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
725 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
727 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
730 TRACE_smpi_comm_out(my_proc_id);
731 log_timed_action (action, clock);
734 static void action_allToAllv(simgrid::xbt::ReplayAction& action)
736 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
737 0 allToAllV 100 1 7 10 12 100 1 70 10 5
739 1) 100 is the size of the send buffer *sizeof(int),
740 2) 1 7 10 12 is the sendcounts array
741 3) 100*sizeof(int) is the size of the receiver buffer
742 4) 1 70 10 5 is the recvcounts array
744 double clock = smpi_process()->simulated_elapsed();
746 unsigned long comm_size = MPI_COMM_WORLD->size();
747 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
748 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
749 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
750 std::vector<int> senddisps(comm_size, 0);
751 std::vector<int> recvdisps(comm_size, 0);
753 MPI_Datatype MPI_CURRENT_TYPE =
754 (action.size() > 5 + 2 * comm_size) ? decode_datatype(action[4 + 2 * comm_size]) : MPI_DEFAULT_TYPE;
755 MPI_Datatype MPI_CURRENT_TYPE2{(action.size() > 5 + 2 * comm_size) ? decode_datatype(action[5 + 2 * comm_size])
758 int send_buf_size=parse_double(action[2]);
759 int recv_buf_size=parse_double(action[3+comm_size]);
760 int my_proc_id = Actor::self()->getPid();
761 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
762 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
764 for (unsigned int i = 0; i < comm_size; i++) {
765 (*sendcounts)[i] = std::stoi(action[3 + i]);
766 (*recvcounts)[i] = std::stoi(action[4 + comm_size + i]);
768 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
769 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
771 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
772 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
773 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
775 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
776 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
778 TRACE_smpi_comm_out(my_proc_id);
779 log_timed_action (action, clock);
782 }} // namespace simgrid::smpi
784 /** @brief Only initialize the replay, don't do it for real */
785 void smpi_replay_init(int* argc, char*** argv)
787 simgrid::smpi::Process::init(argc, argv);
788 smpi_process()->mark_as_initialized();
789 smpi_process()->set_replaying(true);
791 int my_proc_id = Actor::self()->getPid();
792 TRACE_smpi_init(my_proc_id);
793 TRACE_smpi_computing_init(my_proc_id);
794 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
795 TRACE_smpi_comm_out(my_proc_id);
796 xbt_replay_action_register("init", simgrid::smpi::action_init);
797 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
798 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
799 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
800 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
801 xbt_replay_action_register("send", simgrid::smpi::action_send);
802 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
803 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
804 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
805 xbt_replay_action_register("test", simgrid::smpi::action_test);
806 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
807 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
808 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
809 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
810 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
811 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
812 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
813 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
814 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
815 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
816 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
817 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
818 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
819 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
820 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
821 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
823 //if we have a delayed start, sleep here.
825 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
826 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
827 smpi_execute_flops(value);
829 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
830 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
831 smpi_execute_flops(0.0);
835 /** @brief actually run the replay after initialization */
836 void smpi_replay_main(int* argc, char*** argv)
838 simgrid::xbt::replay_runner(*argc, *argv);
840 /* and now, finalize everything */
841 /* One active process will stop. Decrease the counter*/
842 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
843 if (not get_reqq_self()->empty()) {
844 unsigned int count_requests=get_reqq_self()->size();
845 MPI_Request requests[count_requests];
846 MPI_Status status[count_requests];
849 for (auto const& req : *get_reqq_self()) {
853 simgrid::smpi::Request::waitall(count_requests, requests, status);
855 delete get_reqq_self();
858 if(active_processes==0){
859 /* Last process alive speaking: end the simulated timer */
860 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
861 smpi_free_replay_tmp_buffers();
864 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
866 smpi_process()->finalize();
868 TRACE_smpi_comm_out(Actor::self()->getPid());
869 TRACE_smpi_finalize(Actor::self()->getPid());
872 /** @brief chain a replay initialization and a replay start */
873 void smpi_replay_run(int* argc, char*** argv)
875 smpi_replay_init(argc, argv);
876 smpi_replay_main(argc, argv);