1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
17 #include <unordered_map>
20 using simgrid::s4u::Actor;
22 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
24 static int communicator_size = 0;
25 static int active_processes = 0;
26 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
28 static MPI_Datatype MPI_DEFAULT_TYPE;
30 static int sendbuffer_size = 0;
31 static char* sendbuffer = nullptr;
32 static int recvbuffer_size = 0;
33 static char* recvbuffer = nullptr;
35 class ReplayActionArg {
// Log (at verbose level) the replayed action line plus the simulated time it
// consumed, computed as simulated_elapsed() minus the start timestamp `clock`.
39 static void log_timed_action (const char *const *action, double clock){
40 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
// Join the action tokens back into one printable string.
41 char *name = xbt_str_join_array(action, " ");
42 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending MPI_Request vector of the calling actor.
// Uses at(): throws std::out_of_range if action_init never registered one.
47 static std::vector<MPI_Request>* get_reqq_self()
49 return reqq.at(Actor::self()->getPid());
// Register the request vector for the calling actor.
// NOTE(review): unordered_map::insert is a no-op if the pid is already
// present — a second call would leak `mpi_request`; confirm callers only
// call this once per actor.
52 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
54 reqq.insert({Actor::self()->getPid(), mpi_request});
57 //allocate a single buffer for all sends, growing it if needed
// When not replaying, fall back to a plain allocation the caller must free;
// when replaying, all sends share one process-global grow-only buffer.
58 void* smpi_get_tmp_sendbuffer(int size)
60 if (not smpi_process()->replaying())
61 return xbt_malloc(size);
62 if (sendbuffer_size<size){
// Grow the shared buffer; it is never shrunk during replay.
63 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
69 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer for the receive side: real allocation
// outside replay, shared grow-only buffer during replay.
70 void* smpi_get_tmp_recvbuffer(int size){
71 if (not smpi_process()->replaying())
72 return xbt_malloc(size);
73 if (recvbuffer_size<size){
74 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Free a buffer obtained from the smpi_get_tmp_*buffer helpers.
// Only frees when not replaying: during replay the buffers are the shared
// static ones and must outlive individual actions (free happens at teardown).
80 void smpi_free_tmp_buffer(void* buf){
81 if (not smpi_process()->replaying())
// Parse `string` as a double via strtod; throws if the text is not a valid
// double (the endptr declaration/check lives on lines elided from this view).
86 static double parse_double(const char *string)
89 double value = strtod(string, &endptr);
91 THROWF(unknown_error, 0, "%s is not a double", string);
96 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map a trace datatype token to the corresponding MPI_Datatype.
97 static MPI_Datatype decode_datatype(const char *const action)
99 return simgrid::smpi::Datatype::decode(action);
// Inverse of decode_datatype: produce the trace token for a datatype.
// A null datatype is tolerated (observed in the scatter2 test).
102 const char* encode_datatype(MPI_Datatype datatype)
104 if (datatype == nullptr) /* this actually does seem to be possible, had this in the scatter2 test */
107 return datatype->encode();
// Validate an action line's argument count: counts the tokens in `action`
// (first two are process id and action name) and throws arg_error when fewer
// than `mandatory` arguments follow. No comments inside the macro body:
// a // comment would swallow the trailing line-continuation backslash.
110 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
112 while(action[i]!=nullptr)\
115 THROWF(arg_error, 0, "%s replay failed.\n" \
116 "%d items were given on the line. First two should be process_id and action. " \
117 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
118 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for "init": choose the default datatype (MPI_DOUBLE for MPE
// traces, MPI_BYTE for TAU — the selecting condition is on elided lines),
// start the simulated timer, count this actor as active, and register its
// empty pending-request vector.
124 static void action_init(const char *const *action)
126 XBT_DEBUG("Initialize the counters");
127 CHECK_ACTION_PARAMS(action, 0, 1)
129 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
131 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
133 /* start a simulated timer */
134 smpi_process()->simulated_start();
135 /*initialize the number of active processes */
136 active_processes = smpi_process_count();
// Ownership of this vector is kept in the global `reqq` map until
// smpi_replay_main deletes it at finalization.
138 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler for "finalize" (body elided from this view; upstream it is
// intentionally a no-op — the real teardown happens in smpi_replay_main).
141 static void action_finalize(const char *const *action)
// Replay handler for "comm_size": record the communicator size from the trace.
146 static void action_comm_size(const char *const *action)
148 communicator_size = parse_double(action[2]);
149 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_split": only logged, no actual split is simulated.
152 static void action_comm_split(const char *const *action)
154 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_dup": only logged, no actual duplication is done.
157 static void action_comm_dup(const char *const *action)
159 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "compute <flops>": burn the given amount of simulated
// flops, bracketed by computing-state trace events.
162 static void action_compute(const char *const *action)
164 CHECK_ACTION_PARAMS(action, 1, 0)
165 double clock = smpi_process()->simulated_elapsed();
166 double flops= parse_double(action[2]);
167 int my_proc_id = Actor::self()->getPid();
169 TRACE_smpi_computing_in(my_proc_id, flops);
170 smpi_execute_flops(flops);
171 TRACE_smpi_computing_out(my_proc_id);
173 log_timed_action (action, clock);
// Replay handler for "send <dst> <size> [datatype]": blocking point-to-point
// send of `size` elements (payload is nullptr — only timing is simulated).
176 static void action_send(const char *const *action)
178 CHECK_ACTION_PARAMS(action, 2, 1)
179 int to = atoi(action[2]);
180 double size=parse_double(action[3]);
181 double clock = smpi_process()->simulated_elapsed();
// Optional 4th argument selects the datatype; otherwise use the trace default.
183 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
185 int my_proc_id = Actor::self()->getPid();
// Translate the communicator rank `to` into the traced actor pid.
186 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
188 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
189 new simgrid::instr::Pt2PtTIData("send", to, size, MPI_CURRENT_TYPE->encode()));
190 if (not TRACE_smpi_view_internals())
191 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
193 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
195 TRACE_smpi_comm_out(my_proc_id);
197 log_timed_action(action, clock);
// Replay handler for "Isend <dst> <size> [datatype]": non-blocking send;
// the resulting request is queued for a later test/wait/waitAll action.
200 static void action_Isend(const char *const *action)
202 CHECK_ACTION_PARAMS(action, 2, 1)
203 int to = atoi(action[2]);
204 double size=parse_double(action[3]);
205 double clock = smpi_process()->simulated_elapsed();
207 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
209 int my_proc_id = Actor::self()->getPid();
210 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
211 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
212 new simgrid::instr::Pt2PtTIData("Isend", to, size, MPI_CURRENT_TYPE->encode()));
213 if (not TRACE_smpi_view_internals())
214 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
216 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
218 TRACE_smpi_comm_out(my_proc_id);
// Remember the request so wait/test actions can complete it later.
220 get_reqq_self()->push_back(request);
222 log_timed_action (action, clock);
// Replay handler for "recv <src> <size> [datatype]": blocking receive.
// Probes first because the receiver side of the trace does not know the
// real message size.
225 static void action_recv(const char *const *action) {
226 CHECK_ACTION_PARAMS(action, 2, 1)
227 int from = atoi(action[2]);
228 double size=parse_double(action[3]);
229 double clock = smpi_process()->simulated_elapsed();
232 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
234 int my_proc_id = Actor::self()->getPid();
235 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
237 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
238 new simgrid::instr::Pt2PtTIData("recv", from, size, MPI_CURRENT_TYPE->encode()));
240 //unknown size from the receiver point of view
242 Request::probe(from, 0, MPI_COMM_WORLD, &status);
246 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
248 TRACE_smpi_comm_out(my_proc_id);
249 if (not TRACE_smpi_view_internals()) {
250 TRACE_smpi_recv(src_traced, my_proc_id, 0);
253 log_timed_action (action, clock);
// Replay handler for "Irecv <src> <size> [datatype]": non-blocking receive;
// probes for the real size first, then queues the request for wait/test.
256 static void action_Irecv(const char *const *action)
258 CHECK_ACTION_PARAMS(action, 2, 1)
259 int from = atoi(action[2]);
260 double size=parse_double(action[3]);
261 double clock = smpi_process()->simulated_elapsed();
263 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
265 int my_proc_id = Actor::self()->getPid();
266 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
267 new simgrid::instr::Pt2PtTIData("Irecv", from, size, MPI_CURRENT_TYPE->encode()));
269 //unknow size from the receiver pov
271 Request::probe(from, 0, MPI_COMM_WORLD, &status);
275 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
277 TRACE_smpi_comm_out(my_proc_id);
278 get_reqq_self()->push_back(request);
280 log_timed_action (action, clock);
// Replay handler for "test": pop the most recent pending request, MPI_Test
// it, and push it back (nullptr after a successful test) so a later wait can
// still find it.
283 static void action_test(const char* const* action)
285 CHECK_ACTION_PARAMS(action, 0, 0)
286 double clock = smpi_process()->simulated_elapsed();
289 MPI_Request request = get_reqq_self()->back();
290 get_reqq_self()->pop_back();
291 //if request is null here, this may mean that a previous test has succeeded
292 //Different times in traced application and replayed version may lead to this
293 //In this case, ignore the extra calls.
294 if(request!=nullptr){
295 int my_proc_id = Actor::self()->getPid();
296 TRACE_smpi_testing_in(my_proc_id);
298 int flag = Request::test(&request, &status);
300 XBT_DEBUG("MPI_Test result: %d", flag);
301 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
302 get_reqq_self()->push_back(request);
304 TRACE_smpi_testing_out(my_proc_id);
306 log_timed_action (action, clock);
// Replay handler for "wait": pop the most recent pending request and block
// until it completes. A nullptr request means a previous "test" already
// completed it, in which case this is a no-op.
309 static void action_wait(const char *const *action){
310 CHECK_ACTION_PARAMS(action, 0, 0)
311 double clock = smpi_process()->simulated_elapsed();
314 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
315 xbt_str_join_array(action," "));
316 MPI_Request request = get_reqq_self()->back();
317 get_reqq_self()->pop_back();
319 if (request==nullptr){
320 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture tracing info before the wait: the request may be freed by it.
324 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
326 MPI_Group group = request->comm()->group();
327 int src_traced = group->rank(request->src());
328 int dst_traced = group->rank(request->dst());
329 int is_wait_for_receive = (request->flags() & RECV);
330 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
332 Request::wait(&request, &status);
334 TRACE_smpi_comm_out(rank);
// Only receives generate a matching recv trace event.
335 if (is_wait_for_receive)
336 TRACE_smpi_recv(src_traced, dst_traced, 0);
337 log_timed_action (action, clock);
// Replay handler for "waitAll": complete every pending request of this actor
// in one MPI_Waitall, tracing a recv event for each receive request.
340 static void action_waitall(const char *const *action){
341 CHECK_ACTION_PARAMS(action, 0, 0)
342 double clock = smpi_process()->simulated_elapsed();
343 const unsigned int count_requests = get_reqq_self()->size();
345 if (count_requests>0) {
346 MPI_Status status[count_requests];
348 int my_proc_id_traced = Actor::self()->getPid();
349 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
350 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// Record src/dst of each receive before waitall invalidates the requests.
// NOTE(review): entries are later compared against the -100 sentinel, but no
// visible code sets non-recv slots to -100 — presumably done on an elided
// else branch; confirm against upstream.
351 int recvs_snd[count_requests];
352 int recvs_rcv[count_requests];
353 for (unsigned int i = 0; i < count_requests; i++) {
354 const auto& req = (*get_reqq_self())[i];
355 if (req && (req->flags() & RECV)) {
356 recvs_snd[i] = req->src();
357 recvs_rcv[i] = req->dst();
361 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
363 for (unsigned i = 0; i < count_requests; i++) {
364 if (recvs_snd[i]!=-100)
365 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
367 TRACE_smpi_comm_out(my_proc_id_traced);
369 log_timed_action (action, clock);
// Replay handler for "barrier": collective barrier on MPI_COMM_WORLD.
372 static void action_barrier(const char *const *action){
373 double clock = smpi_process()->simulated_elapsed();
374 int my_proc_id = Actor::self()->getPid();
375 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
377 Colls::barrier(MPI_COMM_WORLD);
379 TRACE_smpi_comm_out(my_proc_id);
380 log_timed_action (action, clock);
// Replay handler for "bcast <size> [root] [datatype]": broadcast from `root`
// (default 0) using the shared temporary send buffer as payload.
383 static void action_bcast(const char *const *action)
385 CHECK_ACTION_PARAMS(action, 1, 2)
386 double size = parse_double(action[2]);
387 double clock = smpi_process()->simulated_elapsed();
388 int root = (action[3]) ? atoi(action[3]) : 0;
389 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
390 MPI_Datatype MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
392 int my_proc_id = Actor::self()->getPid();
393 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
394 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
395 -1, MPI_CURRENT_TYPE->encode(), ""));
397 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
399 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
401 TRACE_smpi_comm_out(my_proc_id);
402 log_timed_action (action, clock);
// Replay handler for "reduce <comm_size> <comp_size> [root] [datatype]":
// perform the communication part of a reduce, then burn comp_size flops for
// the computation part. MPI_OP_NULL is used since only timing matters here.
405 static void action_reduce(const char *const *action)
407 CHECK_ACTION_PARAMS(action, 2, 2)
408 double comm_size = parse_double(action[2]);
409 double comp_size = parse_double(action[3]);
410 double clock = smpi_process()->simulated_elapsed();
411 int root = (action[4]) ? atoi(action[4]) : 0;
413 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
415 int my_proc_id = Actor::self()->getPid();
416 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
417 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
418 comm_size, -1, MPI_CURRENT_TYPE->encode(), ""));
// Both buffers come from the send-side pool; contents are irrelevant.
420 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
421 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
422 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
423 smpi_execute_flops(comp_size);
425 TRACE_smpi_comm_out(my_proc_id);
426 log_timed_action (action, clock);
// Replay handler for "allReduce <comm_size> <comp_size> [datatype]":
// communication via Colls::allreduce (MPI_OP_NULL — timing only) followed by
// comp_size simulated flops.
429 static void action_allReduce(const char *const *action) {
430 CHECK_ACTION_PARAMS(action, 2, 1)
431 double comm_size = parse_double(action[2]);
432 double comp_size = parse_double(action[3]);
434 MPI_Datatype MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
436 double clock = smpi_process()->simulated_elapsed();
437 int my_proc_id = Actor::self()->getPid();
438 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
439 MPI_CURRENT_TYPE->encode(), ""));
441 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
442 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
443 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
444 smpi_execute_flops(comp_size);
446 TRACE_smpi_comm_out(my_proc_id);
447 log_timed_action (action, clock);
// Replay handler for "allToAll <send_size> <recv_size> [sendtype recvtype]":
// all-to-all with per-peer counts; both datatypes must be given together.
450 static void action_allToAll(const char *const *action) {
451 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
452 double clock = smpi_process()->simulated_elapsed();
453 int comm_size = MPI_COMM_WORLD->size();
454 int send_size = parse_double(action[2]);
455 int recv_size = parse_double(action[3]);
456 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
457 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
// Buffers sized for the full communicator (count * comm_size elements each).
459 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
460 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
462 int my_proc_id = Actor::self()->getPid();
463 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
464 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
465 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
467 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
469 TRACE_smpi_comm_out(my_proc_id);
470 log_timed_action (action, clock);
// Replay handler for "gather": only the root allocates a receive buffer
// (comm_size * recv_size elements); everyone else passes nullptr.
473 static void action_gather(const char *const *action) {
474 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
477 1) 68 is the sendcounts
478 2) 68 is the recvcounts
479 3) 0 is the root node
480 4) 0 is the send datatype id, see decode_datatype()
481 5) 0 is the recv datatype id, see decode_datatype()
483 CHECK_ACTION_PARAMS(action, 2, 3)
484 double clock = smpi_process()->simulated_elapsed();
485 int comm_size = MPI_COMM_WORLD->size();
486 int send_size = parse_double(action[2]);
487 int recv_size = parse_double(action[3]);
488 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
489 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
491 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
492 void *recv = nullptr;
493 int root = (action[4]) ? atoi(action[4]) : 0;
494 int rank = MPI_COMM_WORLD->rank();
// Only the root needs the gather destination buffer (guard is on an elided line).
497 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
499 TRACE_smpi_comm_in(rank, __FUNCTION__,
500 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
501 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
503 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
505 TRACE_smpi_comm_out(Actor::self()->getPid());
506 log_timed_action (action, clock);
// Replay handler for "scatter": mirror of action_gather with the data flowing
// from root to everyone.
509 static void action_scatter(const char* const* action)
511 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
514 1) 68 is the sendcounts
515 2) 68 is the recvcounts
516 3) 0 is the root node
517 4) 0 is the send datatype id, see decode_datatype()
518 5) 0 is the recv datatype id, see decode_datatype()
520 CHECK_ACTION_PARAMS(action, 2, 3)
521 double clock = smpi_process()->simulated_elapsed();
522 int comm_size = MPI_COMM_WORLD->size();
523 int send_size = parse_double(action[2]);
524 int recv_size = parse_double(action[3]);
525 MPI_Datatype MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
526 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
528 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
529 void* recv = nullptr;
530 int root = (action[4]) ? atoi(action[4]) : 0;
531 int rank = MPI_COMM_WORLD->rank();
534 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// FIXME(review): the trace event is labeled "gather" but this is a scatter —
// looks like a copy-paste slip; visualization will mislabel this collective.
536 TRACE_smpi_comm_in(rank, __FUNCTION__,
537 new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
538 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
540 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
542 TRACE_smpi_comm_out(Actor::self()->getPid());
543 log_timed_action(action, clock);
// Replay handler for "gatherV": per-rank receive counts follow the sendcount
// on the action line; all displacements are zero.
546 static void action_gatherv(const char *const *action) {
547 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
548 0 gather 68 68 10 10 10 0 0 0
550 1) 68 is the sendcount
551 2) 68 10 10 10 is the recvcounts
552 3) 0 is the root node
553 4) 0 is the send datatype id, see decode_datatype()
554 5) 0 is the recv datatype id, see decode_datatype()
556 double clock = smpi_process()->simulated_elapsed();
557 int comm_size = MPI_COMM_WORLD->size();
558 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
559 int send_size = parse_double(action[2]);
560 std::vector<int> disps(comm_size, 0);
// shared_ptr because the TIData object keeps a reference for tracing output.
561 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
563 MPI_Datatype MPI_CURRENT_TYPE =
564 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
565 MPI_Datatype MPI_CURRENT_TYPE2{
566 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
568 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
569 void *recv = nullptr;
570 for(int i=0;i<comm_size;i++) {
571 (*recvcounts)[i] = atoi(action[i + 3]);
573 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
575 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
576 int rank = MPI_COMM_WORLD->rank();
579 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
581 TRACE_smpi_comm_in(rank, __FUNCTION__,
582 new simgrid::instr::VarCollTIData("gatherV", root, send_size, nullptr, -1, recvcounts,
583 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
585 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2, root,
588 TRACE_smpi_comm_out(Actor::self()->getPid());
589 log_timed_action (action, clock);
// Replay handler for "scatterV": per-rank send counts precede the recvcount;
// only the root allocates the aggregated send buffer.
592 static void action_scatterv(const char* const* action)
594 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
595 0 gather 68 10 10 10 68 0 0 0
597 1) 68 10 10 10 is the sendcounts
598 2) 68 is the recvcount
599 3) 0 is the root node
600 4) 0 is the send datatype id, see decode_datatype()
601 5) 0 is the recv datatype id, see decode_datatype()
603 double clock = smpi_process()->simulated_elapsed();
604 int comm_size = MPI_COMM_WORLD->size();
605 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
606 int recv_size = parse_double(action[2 + comm_size]);
607 std::vector<int> disps(comm_size, 0);
608 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
610 MPI_Datatype MPI_CURRENT_TYPE =
611 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
612 MPI_Datatype MPI_CURRENT_TYPE2{
613 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
615 void* send = nullptr;
616 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
617 for (int i = 0; i < comm_size; i++) {
618 (*sendcounts)[i] = atoi(action[i + 2]);
620 int send_sum = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
622 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
623 int rank = MPI_COMM_WORLD->rank();
626 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
// FIXME(review): event labeled "gatherV" but this is a scatterv (copy-paste);
// visualization will mislabel this collective.
628 TRACE_smpi_comm_in(rank, __FUNCTION__,
629 new simgrid::instr::VarCollTIData("gatherV", root, -1, sendcounts, recv_size, nullptr,
630 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
632 Colls::scatterv(send, sendcounts->data(), disps.data(), MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root,
635 TRACE_smpi_comm_out(Actor::self()->getPid());
636 log_timed_action(action, clock);
// Replay handler for "reduceScatter": per-rank recvcounts, then a flop count;
// communication via Colls::reduce_scatter followed by simulated computation.
639 static void action_reducescatter(const char *const *action) {
640 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
641 0 reduceScatter 275427 275427 275427 204020 11346849 0
643 1) The first four values after the name of the action declare the recvcounts array
644 2) The value 11346849 is the amount of instructions
645 3) The last value corresponds to the datatype, see decode_datatype().
647 double clock = smpi_process()->simulated_elapsed();
648 int comm_size = MPI_COMM_WORLD->size();
649 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
650 int comp_size = parse_double(action[2+comm_size]);
651 int my_proc_id = Actor::self()->getPid();
652 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>);
653 MPI_Datatype MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
655 for(int i=0;i<comm_size;i++) {
656 recvcounts->push_back(atoi(action[i + 2]));
// Total element count across all ranks, used to size both buffers.
658 int size{std::accumulate(recvcounts->begin(), recvcounts->end(), 0)};
660 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
661 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, recvcounts,
662 std::to_string(comp_size), /* ugly hack to print comp_size */
663 MPI_CURRENT_TYPE->encode()));
665 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
666 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
668 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts->data(), MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
669 smpi_execute_flops(comp_size);
671 TRACE_smpi_comm_out(my_proc_id);
672 log_timed_action (action, clock);
// Replay handler for "allGather <sendcount> <recvcount> [sendtype recvtype]".
675 static void action_allgather(const char *const *action) {
676 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
677 0 allGather 275427 275427
679 1) 275427 is the sendcount
680 2) 275427 is the recvcount
681 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
683 double clock = smpi_process()->simulated_elapsed();
685 CHECK_ACTION_PARAMS(action, 2, 2)
686 int sendcount=atoi(action[2]);
687 int recvcount=atoi(action[3]);
689 MPI_Datatype MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
690 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
692 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
693 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
695 int my_proc_id = Actor::self()->getPid();
697 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
698 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
699 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
701 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
703 TRACE_smpi_comm_out(my_proc_id);
704 log_timed_action (action, clock);
// Replay handler for "allGatherV": sendcount followed by per-rank recvcounts,
// with optional trailing datatype and displacement fields.
707 static void action_allgatherv(const char *const *action) {
708 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
709 0 allGatherV 275427 275427 275427 275427 204020
711 1) 275427 is the sendcount
712 2) The next four elements declare the recvcounts array
713 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
715 double clock = smpi_process()->simulated_elapsed();
717 int comm_size = MPI_COMM_WORLD->size();
718 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
719 int sendcount=atoi(action[2]);
720 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
721 std::vector<int> disps(comm_size, 0);
723 int datatype_index = 0, disp_index = 0;
724 if (action[3 + 2 * comm_size]) { /* datatype + disp are specified */
725 datatype_index = 3 + comm_size;
726 disp_index = datatype_index + 1;
// FIXME(review): this condition is identical to the branch above, so the
// "disps specified, no datatype" case is unreachable dead code — presumably
// it should test a different index.
727 } else if (action[3 + 2 * comm_size]) { /* disps specified; datatype is not specified; use the default one */
729 disp_index = 3 + comm_size;
730 } else if (action[3 + comm_size]) { /* only datatype, no disp specified */
731 datatype_index = 3 + comm_size;
734 if (disp_index != 0) {
// FIXME(review): std::copy over action[disp_index] (a char*) copies raw
// characters between two unrelated C strings into an int vector — this cannot
// parse displacement integers; each token should go through atoi instead.
735 std::copy(action[disp_index], action[disp_index + comm_size], disps.begin());
// NOTE(review): both types decode the SAME token (action[datatype_index]);
// confirm whether recv type should come from the following token.
738 MPI_Datatype MPI_CURRENT_TYPE{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
739 MPI_Datatype MPI_CURRENT_TYPE2{(datatype_index > 0) ? decode_datatype(action[datatype_index]) : MPI_DEFAULT_TYPE};
741 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
743 for(int i=0;i<comm_size;i++) {
744 (*recvcounts)[i] = atoi(action[i + 3]);
746 int recv_sum = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
747 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
749 int my_proc_id = Actor::self()->getPid();
751 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
752 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, recvcounts,
753 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()))
755 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts->data(), disps.data(), MPI_CURRENT_TYPE2,
758 TRACE_smpi_comm_out(my_proc_id);
759 log_timed_action (action, clock);
// Replay handler for "allToAllV": buffer sizes plus per-rank send and recv
// count arrays; displacements are all zero.
762 static void action_allToAllv(const char *const *action) {
763 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
764 0 allToAllV 100 1 7 10 12 100 1 70 10 5
766 1) 100 is the size of the send buffer *sizeof(int),
767 2) 1 7 10 12 is the sendcounts array
768 3) 100*sizeof(int) is the size of the receiver buffer
769 4) 1 70 10 5 is the recvcounts array
771 double clock = smpi_process()->simulated_elapsed();
773 int comm_size = MPI_COMM_WORLD->size();
774 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
775 std::shared_ptr<std::vector<int>> sendcounts(new std::vector<int>(comm_size));
776 std::shared_ptr<std::vector<int>> recvcounts(new std::vector<int>(comm_size));
777 std::vector<int> senddisps(comm_size, 0);
778 std::vector<int> recvdisps(comm_size, 0);
780 MPI_Datatype MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
781 ? decode_datatype(action[4 + 2 * comm_size])
783 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
784 ? decode_datatype(action[5 + 2 * comm_size])
787 int send_buf_size=parse_double(action[2]);
788 int recv_buf_size=parse_double(action[3+comm_size]);
789 int my_proc_id = Actor::self()->getPid();
790 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
791 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
793 for(int i=0;i<comm_size;i++) {
794 (*sendcounts)[i] = atoi(action[3 + i]);
795 (*recvcounts)[i] = atoi(action[4 + comm_size + i]);
797 int send_size = std::accumulate(sendcounts->begin(), sendcounts->end(), 0);
798 int recv_size = std::accumulate(recvcounts->begin(), recvcounts->end(), 0);
800 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
801 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, sendcounts, recv_size, recvcounts,
802 MPI_CURRENT_TYPE->encode(), MPI_CURRENT_TYPE2->encode()));
// FIXME(review): the receive datatype argument passes MPI_CURRENT_TYPE —
// MPI_CURRENT_TYPE2 (the decoded recv type) appears to be intended, matching
// how recvbuf was sized above.
804 Colls::alltoallv(sendbuf, sendcounts->data(), senddisps.data(), MPI_CURRENT_TYPE, recvbuf, recvcounts->data(),
805 recvdisps.data(), MPI_CURRENT_TYPE, MPI_COMM_WORLD);
807 TRACE_smpi_comm_out(my_proc_id);
808 log_timed_action (action, clock);
811 }} // namespace simgrid::smpi
813 /** @brief Only initialize the replay, don't do it for real */
// Set up the replaying process, emit init trace events, register every
// action keyword with the xbt replay engine, honor an optional delayed
// start (3rd argv entry), and force one context switch so all processes
// finish initialization before the trace is consumed.
814 void smpi_replay_init(int* argc, char*** argv)
816 simgrid::smpi::Process::init(argc, argv);
817 smpi_process()->mark_as_initialized();
818 smpi_process()->set_replaying(true);
820 int my_proc_id = Actor::self()->getPid();
821 TRACE_smpi_init(my_proc_id);
822 TRACE_smpi_computing_init(my_proc_id);
823 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
824 TRACE_smpi_comm_out(my_proc_id);
// Map each trace keyword to its replay handler.
825 xbt_replay_action_register("init", simgrid::smpi::action_init);
826 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
827 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
828 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
829 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
830 xbt_replay_action_register("send", simgrid::smpi::action_send);
831 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
832 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
833 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
834 xbt_replay_action_register("test", simgrid::smpi::action_test);
835 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
836 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
837 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
838 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
839 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
840 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
841 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
842 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
843 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
844 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
845 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
846 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
847 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
848 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
849 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
850 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
852 //if we have a delayed start, sleep here.
854 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
855 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
856 smpi_execute_flops(value);
858 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
859 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
860 smpi_execute_flops(0.0);
864 /** @brief actually run the replay after initialization */
// Drive the trace through the xbt replay runner, then finalize: drain any
// still-pending requests with waitall, free the per-actor request vector,
// and — when this is the last active process — print the simulated time and
// release the shared temporary buffers.
865 void smpi_replay_main(int* argc, char*** argv)
867 simgrid::xbt::replay_runner(*argc, *argv);
869 /* and now, finalize everything */
870 /* One active process will stop. Decrease the counter*/
871 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
872 if (not get_reqq_self()->empty()) {
873 unsigned int count_requests=get_reqq_self()->size();
874 MPI_Request requests[count_requests];
875 MPI_Status status[count_requests];
878 for (auto const& req : *get_reqq_self()) {
882 simgrid::smpi::Request::waitall(count_requests, requests, status);
// The vector was allocated in action_init and owned via the global reqq map.
884 delete get_reqq_self();
887 if(active_processes==0){
888 /* Last process alive speaking: end the simulated timer */
889 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
890 xbt_free(sendbuffer);
891 xbt_free(recvbuffer);
894 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
896 smpi_process()->finalize();
898 TRACE_smpi_comm_out(Actor::self()->getPid());
899 TRACE_smpi_finalize(Actor::self()->getPid());
902 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: initialize then run the replay in one call.
903 void smpi_replay_run(int* argc, char*** argv)
905 smpi_replay_init(argc, argv);
906 smpi_replay_main(argc, argv);