1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
18 using simgrid::s4u::Actor;
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// --- File-scope replay state ---
// Size of the communicator announced by a "comm_size" trace action.
22 static int communicator_size = 0;
// Number of replaying processes still alive; the last one prints the simulated time.
23 static int active_processes = 0;
// Per-actor (keyed by PID) vector of outstanding non-blocking requests (Isend/Irecv).
24 static std::unordered_map<int, std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line omits the datatype field (MPI_DOUBLE for MPE traces,
// MPI_BYTE for TAU traces — selected in action_init).
26 static MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the current trace line.
27 static MPI_Datatype MPI_CURRENT_TYPE;
// Single grow-only scratch buffers shared by all replayed sends/recvs (see
// smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer); freed at end of simulation.
29 static int sendbuffer_size = 0;
30 static char* sendbuffer = nullptr;
31 static int recvbuffer_size = 0;
32 static char* recvbuffer = nullptr;
// Log (at VERBOSE level) the replayed action line together with the simulated
// time it consumed, i.e. elapsed-simulated-time minus the 'clock' captured when
// the action started. The join is only performed when the log level is enabled.
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request vector of the calling actor (looked up by PID in
// 'reqq'). Throws std::out_of_range if action_init was never replayed for it.
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(Actor::self()->getPid());
// Register 'mpi_request' as the pending-request vector of the calling actor.
// Note: unordered_map::insert is a no-op if a vector is already registered for this PID.
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({Actor::self()->getPid(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
// Outside of replay mode this degenerates to a plain malloc that the caller
// owns; in replay mode all sends share the file-scope 'sendbuffer', grown
// (never shrunk) via realloc to the largest size requested so far.
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (not smpi_process()->replaying())
56 return xbt_malloc(size);
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer for the receive side: plain malloc when
// not replaying, otherwise the shared grow-only 'recvbuffer'.
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (not smpi_process()->replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release a buffer obtained from smpi_get_tmp_{send,recv}buffer. Only frees
// when not replaying: in replay mode the buffers are the shared file-scope
// scratch areas and must NOT be freed here (they are released at simulation end).
75 void smpi_free_tmp_buffer(void* buf){
76 if (not smpi_process()->replaying())
// Parse a trace field as a double with strtod; throws (unknown_error) instead
// of silently returning 0 when the string is not a valid double.
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
91 //TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map the numeric datatype id found in a trace line onto an MPI_Datatype.
// Unknown ids fall through to MPI_DEFAULT_TYPE rather than failing.
92 static MPI_Datatype decode_datatype(const char *const action)
94 switch(atoi(action)) {
117 return MPI_DEFAULT_TYPE;
// Inverse of decode_datatype: map a handful of predefined MPI datatypes back
// to their numeric trace id (as a string) for the tracing/TI output.
122 const char* encode_datatype(MPI_Datatype datatype)
124 if (datatype==MPI_BYTE)
126 if(datatype==MPI_DOUBLE)
128 if(datatype==MPI_INT)
130 if(datatype==MPI_CHAR)
132 if(datatype==MPI_SHORT)
134 if(datatype==MPI_LONG)
136 if(datatype==MPI_FLOAT)
138 // default - not implemented.
139 // do not warn here as we pass in this function even for other trace formats
// Sanity-check the argument count of a trace line: counts the nullptr-terminated
// 'action' array and throws arg_error when it does not carry the two leading
// fields (process_id, action name) plus 'mandatory' arguments, allowing up to
// 'optional' extra ones. Kept as a macro so __FUNCTION__ names the caller.
143 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
145 while(action[i]!=nullptr)\
148 THROWF(arg_error, 0, "%s replay failed.\n" \
149 "%d items were given on the line. First two should be process_id and action. " \
150 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
151 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay "init": choose the default datatype from the trace flavor (MPE vs TAU),
// start the per-process simulated timer, record the number of active processes,
// and allocate this actor's pending-request vector.
157 static void action_init(const char *const *action)
159 XBT_DEBUG("Initialize the counters");
160 CHECK_ACTION_PARAMS(action, 0, 1)
162 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
164 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
166 /* start a simulated timer */
167 smpi_process()->simulated_start();
168 /*initialize the number of active processes */
169 active_processes = smpi_process_count();
171 set_reqq_self(new std::vector<MPI_Request>);
// Replay "finalize": intentionally a no-op — real teardown happens in smpi_replay_main.
174 static void action_finalize(const char *const *action)
// Replay "comm_size": record the communicator size announced by the trace.
179 static void action_comm_size(const char *const *action)
181 communicator_size = parse_double(action[2]);
182 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "comm_split": no simulated work, only logged.
185 static void action_comm_split(const char *const *action)
187 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "comm_dup": no simulated work, only logged.
190 static void action_comm_dup(const char *const *action)
192 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay "compute": burn the given amount of flops in simulation, bracketed by
// computing-in/out trace events.
195 static void action_compute(const char *const *action)
197 CHECK_ACTION_PARAMS(action, 1, 0)
198 double clock = smpi_process()->simulated_elapsed();
199 double flops= parse_double(action[2]);
200 int my_proc_id = Actor::self()->getPid();
202 TRACE_smpi_computing_in(my_proc_id, flops);
203 smpi_execute_flops(flops);
204 TRACE_smpi_computing_out(my_proc_id);
206 log_timed_action (action, clock);
// Replay "send <to> <size> [datatype]": blocking send of 'size' elements to
// rank 'to' on MPI_COMM_WORLD, with tag 0 and a null payload (only timing is
// simulated). Datatype defaults to MPI_DEFAULT_TYPE when absent.
209 static void action_send(const char *const *action)
211 CHECK_ACTION_PARAMS(action, 2, 1)
212 int to = atoi(action[2]);
213 double size=parse_double(action[3]);
214 double clock = smpi_process()->simulated_elapsed();
216 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
218 int my_proc_id = Actor::self()->getPid();
// Translate the communicator rank into the traced actor PID.
219 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
221 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
222 new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
223 if (not TRACE_smpi_view_internals())
224 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
226 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
228 TRACE_smpi_comm_out(my_proc_id);
230 log_timed_action(action, clock);
// Replay "Isend <to> <size> [datatype]": non-blocking variant of action_send;
// the resulting request is pushed on this actor's queue for a later test/wait.
233 static void action_Isend(const char *const *action)
235 CHECK_ACTION_PARAMS(action, 2, 1)
236 int to = atoi(action[2]);
237 double size=parse_double(action[3]);
238 double clock = smpi_process()->simulated_elapsed();
240 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
242 int my_proc_id = Actor::self()->getPid();
243 int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
244 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
245 new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
246 if (not TRACE_smpi_view_internals())
247 TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
249 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
251 TRACE_smpi_comm_out(my_proc_id);
// Remember the request so a subsequent "test"/"wait"/"waitAll" action can complete it.
253 get_reqq_self()->push_back(request);
255 log_timed_action (action, clock);
// Replay "recv <from> <size> [datatype]": blocking receive from rank 'from'
// (tag 0, MPI_COMM_WORLD). A probe is issued first since the receiver does not
// know the size from its own point of view.
258 static void action_recv(const char *const *action) {
259 CHECK_ACTION_PARAMS(action, 2, 1)
260 int from = atoi(action[2]);
261 double size=parse_double(action[3]);
262 double clock = smpi_process()->simulated_elapsed();
265 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
267 int my_proc_id = Actor::self()->getPid();
268 int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
270 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
271 new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
273 //unknown size from the receiver point of view
275 Request::probe(from, 0, MPI_COMM_WORLD, &status);
279 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
281 TRACE_smpi_comm_out(my_proc_id);
282 if (not TRACE_smpi_view_internals()) {
283 TRACE_smpi_recv(src_traced, my_proc_id, 0);
286 log_timed_action (action, clock);
// Replay "Irecv <from> <size> [datatype]": non-blocking receive; like recv,
// probes first, then queues the request for a later test/wait.
289 static void action_Irecv(const char *const *action)
291 CHECK_ACTION_PARAMS(action, 2, 1)
292 int from = atoi(action[2]);
293 double size=parse_double(action[3]);
294 double clock = smpi_process()->simulated_elapsed();
296 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
298 int my_proc_id = Actor::self()->getPid();
299 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
300 new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
302 //unknow size from the receiver pov
304 Request::probe(from, 0, MPI_COMM_WORLD, &status);
308 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
310 TRACE_smpi_comm_out(my_proc_id);
311 get_reqq_self()->push_back(request);
313 log_timed_action (action, clock);
// Replay "test": pop the most recent pending request, MPI_Test it, and push it
// back (a successful test leaves it nullptr, which a later "wait" tolerates).
316 static void action_test(const char* const* action)
318 CHECK_ACTION_PARAMS(action, 0, 0)
319 double clock = smpi_process()->simulated_elapsed();
322 MPI_Request request = get_reqq_self()->back();
323 get_reqq_self()->pop_back();
324 //if request is null here, this may mean that a previous test has succeeded
325 //Different times in traced application and replayed version may lead to this
326 //In this case, ignore the extra calls.
327 if(request!=nullptr){
328 int my_proc_id = Actor::self()->getPid();
329 TRACE_smpi_testing_in(my_proc_id);
331 int flag = Request::test(&request, &status);
333 XBT_DEBUG("MPI_Test result: %d", flag);
334 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
335 get_reqq_self()->push_back(request);
337 TRACE_smpi_testing_out(my_proc_id);
339 log_timed_action (action, clock);
// Replay "wait": pop the most recent pending request and MPI_Wait on it.
// A nullptr request (already completed by a replayed "test") is silently skipped.
// For receives, emit the matching TRACE_smpi_recv event after completion.
342 static void action_wait(const char *const *action){
343 CHECK_ACTION_PARAMS(action, 0, 0)
344 double clock = smpi_process()->simulated_elapsed();
347 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
348 xbt_str_join_array(action," "));
349 MPI_Request request = get_reqq_self()->back();
350 get_reqq_self()->pop_back();
352 if (request==nullptr){
353 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// Capture tracing info before the wait, since the request may be freed by it.
357 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
359 MPI_Group group = request->comm()->group();
360 int src_traced = group->rank(request->src());
361 int dst_traced = group->rank(request->dst());
362 int is_wait_for_receive = (request->flags() & RECV);
363 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
365 Request::wait(&request, &status);
367 TRACE_smpi_comm_out(rank);
368 if (is_wait_for_receive)
369 TRACE_smpi_recv(src_traced, dst_traced, 0);
370 log_timed_action (action, clock);
// Replay "waitAll": wait on every pending request of this actor at once.
// The src/dst of receive requests are recorded beforehand so the matching
// TRACE_smpi_recv events can be emitted after waitall completes.
373 static void action_waitall(const char *const *action){
374 CHECK_ACTION_PARAMS(action, 0, 0)
375 double clock = smpi_process()->simulated_elapsed();
376 const unsigned int count_requests = get_reqq_self()->size();
378 if (count_requests>0) {
379 MPI_Status status[count_requests];
381 int my_proc_id_traced = Actor::self()->getPid();
382 TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
383 new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
// NOTE(review): slots for non-receive requests are presumably set to the -100
// sentinel tested below — confirm that initialization exists in the full file.
384 int recvs_snd[count_requests];
385 int recvs_rcv[count_requests];
386 for (unsigned int i = 0; i < count_requests; i++) {
387 const auto& req = (*get_reqq_self())[i];
388 if (req && (req->flags() & RECV)) {
389 recvs_snd[i] = req->src();
390 recvs_rcv[i] = req->dst();
394 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
396 for (unsigned i = 0; i < count_requests; i++) {
397 if (recvs_snd[i]!=-100)
398 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
400 TRACE_smpi_comm_out(my_proc_id_traced);
402 log_timed_action (action, clock);
// Replay "barrier": collective barrier on MPI_COMM_WORLD, bracketed by trace events.
405 static void action_barrier(const char *const *action){
406 double clock = smpi_process()->simulated_elapsed();
407 int my_proc_id = Actor::self()->getPid();
408 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
410 Colls::barrier(MPI_COMM_WORLD);
412 TRACE_smpi_comm_out(my_proc_id);
413 log_timed_action (action, clock);
// Replay "bcast <size> [root] [datatype]": broadcast from 'root' (default 0)
// using a shared scratch buffer; the datatype field is only meaningful when a
// root was also given (hence the action[3] && action[4] guard).
416 static void action_bcast(const char *const *action)
418 CHECK_ACTION_PARAMS(action, 1, 2)
419 double size = parse_double(action[2]);
420 double clock = smpi_process()->simulated_elapsed();
421 int root = (action[3]) ? atoi(action[3]) : 0;
422 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
423 MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
425 int my_proc_id = Actor::self()->getPid();
426 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
427 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
428 -1, encode_datatype(MPI_CURRENT_TYPE), ""));
430 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
432 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
434 TRACE_smpi_comm_out(my_proc_id);
435 log_timed_action (action, clock);
// Replay "reduce <comm_size> <comp_size> [root] [datatype]": a reduce towards
// 'root' followed by 'comp_size' simulated flops for the local reduction work.
// MPI_OP_NULL is used since only timing matters, not the reduction result.
438 static void action_reduce(const char *const *action)
440 CHECK_ACTION_PARAMS(action, 2, 2)
441 double comm_size = parse_double(action[2]);
442 double comp_size = parse_double(action[3]);
443 double clock = smpi_process()->simulated_elapsed();
444 int root = (action[4]) ? atoi(action[4]) : 0;
446 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
448 int my_proc_id = Actor::self()->getPid();
449 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
450 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
451 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
453 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
454 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
455 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
456 smpi_execute_flops(comp_size);
458 TRACE_smpi_comm_out(my_proc_id);
459 log_timed_action (action, clock);
// Replay "allReduce <comm_size> <comp_size> [datatype]": allreduce with
// MPI_OP_NULL (timing only) followed by the simulated local computation.
462 static void action_allReduce(const char *const *action) {
463 CHECK_ACTION_PARAMS(action, 2, 1)
464 double comm_size = parse_double(action[2]);
465 double comp_size = parse_double(action[3]);
467 MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
469 double clock = smpi_process()->simulated_elapsed();
470 int my_proc_id = Actor::self()->getPid();
471 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
472 encode_datatype(MPI_CURRENT_TYPE), ""));
474 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
475 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
477 smpi_execute_flops(comp_size);
479 TRACE_smpi_comm_out(my_proc_id);
480 log_timed_action (action, clock);
// Replay "allToAll <send_size> <recv_size> [sendtype recvtype]": all-to-all of
// per-peer blocks; both optional datatypes must be present to be honored.
483 static void action_allToAll(const char *const *action) {
484 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
485 double clock = smpi_process()->simulated_elapsed();
486 int comm_size = MPI_COMM_WORLD->size();
487 int send_size = parse_double(action[2]);
488 int recv_size = parse_double(action[3]);
489 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
490 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
// Buffers sized for one block per peer in the communicator.
492 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
493 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
495 int my_proc_id = Actor::self()->getPid();
496 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
497 new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
498 encode_datatype(MPI_CURRENT_TYPE),
499 encode_datatype(MPI_CURRENT_TYPE2)));
501 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
503 TRACE_smpi_comm_out(my_proc_id);
504 log_timed_action (action, clock);
// Replay "gather": gather fixed-size blocks to 'root'; only the root allocates
// a receive buffer (comm_size blocks).
507 static void action_gather(const char *const *action) {
508 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
511 1) 68 is the sendcounts
512 2) 68 is the recvcounts
513 3) 0 is the root node
514 4) 0 is the send datatype id, see decode_datatype()
515 5) 0 is the recv datatype id, see decode_datatype()
517 CHECK_ACTION_PARAMS(action, 2, 3)
518 double clock = smpi_process()->simulated_elapsed();
519 int comm_size = MPI_COMM_WORLD->size();
520 int send_size = parse_double(action[2]);
521 int recv_size = parse_double(action[3]);
522 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
523 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
525 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
526 void *recv = nullptr;
527 int root = (action[4]) ? atoi(action[4]) : 0;
528 int rank = MPI_COMM_WORLD->rank();
// Only the root needs room for everyone's contribution.
531 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
533 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
534 encode_datatype(MPI_CURRENT_TYPE),
535 encode_datatype(MPI_CURRENT_TYPE2)));
537 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
539 TRACE_smpi_comm_out(Actor::self()->getPid());
540 log_timed_action (action, clock);
// Replay "scatter": scatter fixed-size blocks from 'root'; mirrors action_gather.
543 static void action_scatter(const char* const* action)
545 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
548 1) 68 is the sendcounts
549 2) 68 is the recvcounts
550 3) 0 is the root node
551 4) 0 is the send datatype id, see decode_datatype()
552 5) 0 is the recv datatype id, see decode_datatype()
554 CHECK_ACTION_PARAMS(action, 2, 3)
555 double clock = smpi_process()->simulated_elapsed();
556 int comm_size = MPI_COMM_WORLD->size();
557 int send_size = parse_double(action[2]);
558 int recv_size = parse_double(action[3]);
559 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
560 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
562 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
563 void* recv = nullptr;
564 int root = (action[4]) ? atoi(action[4]) : 0;
565 int rank = MPI_COMM_WORLD->rank();
568 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
// NOTE(review): TIData is labeled "gather" in a scatter action — looks like a
// copy-paste mistake; should presumably read "scatter". Verify against the tracing tools.
570 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
571 encode_datatype(MPI_CURRENT_TYPE),
572 encode_datatype(MPI_CURRENT_TYPE2)));
574 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
576 TRACE_smpi_comm_out(Actor::self()->getPid());
577 log_timed_action(action, clock);
// Replay "gatherV": gather with per-rank receive counts parsed from the trace
// line; the root's buffer is sized by the sum of all recvcounts.
580 static void action_gatherv(const char *const *action) {
581 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
582 0 gather 68 68 10 10 10 0 0 0
584 1) 68 is the sendcount
585 2) 68 10 10 10 is the recvcounts
586 3) 0 is the root node
587 4) 0 is the send datatype id, see decode_datatype()
588 5) 0 is the recv datatype id, see decode_datatype()
590 double clock = smpi_process()->simulated_elapsed();
591 int comm_size = MPI_COMM_WORLD->size();
592 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
593 int send_size = parse_double(action[2]);
594 int disps[comm_size];
595 int recvcounts[comm_size];
599 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
600 MPI_Datatype MPI_CURRENT_TYPE2{
601 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
603 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
604 void *recv = nullptr;
// Parse per-rank counts and accumulate the total for the root's buffer.
605 for(int i=0;i<comm_size;i++) {
606 recvcounts[i] = atoi(action[i+3]);
607 recv_sum=recv_sum+recvcounts[i];
611 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
612 int rank = MPI_COMM_WORLD->rank();
615 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
617 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
619 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
620 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
621 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
623 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
625 TRACE_smpi_comm_out(Actor::self()->getPid());
626 log_timed_action (action, clock);
// Replay "scatterV": scatter with per-rank send counts parsed from the trace
// line; the root's buffer is sized by the sum of all sendcounts.
629 static void action_scatterv(const char* const* action)
631 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
632 0 gather 68 10 10 10 68 0 0 0
634 1) 68 10 10 10 is the sendcounts
635 2) 68 is the recvcount
636 3) 0 is the root node
637 4) 0 is the send datatype id, see decode_datatype()
638 5) 0 is the recv datatype id, see decode_datatype()
640 double clock = smpi_process()->simulated_elapsed();
641 int comm_size = MPI_COMM_WORLD->size();
642 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
643 int recv_size = parse_double(action[2 + comm_size]);
644 int disps[comm_size];
645 int sendcounts[comm_size];
649 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
650 MPI_Datatype MPI_CURRENT_TYPE2{
651 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
653 void* send = nullptr;
654 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
655 for (int i = 0; i < comm_size; i++) {
656 sendcounts[i] = atoi(action[i + 2]);
657 send_sum += sendcounts[i];
661 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
662 int rank = MPI_COMM_WORLD->rank();
665 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
667 std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
// NOTE(review): TIData is labeled "gatherV" in a scatterv action — looks like a
// copy-paste mistake; should presumably read "scatterV". Verify against the tracing tools.
669 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
670 "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
671 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
673 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
675 TRACE_smpi_comm_out(Actor::self()->getPid());
676 log_timed_action(action, clock);
// Replay "reduceScatter": reduce-scatter with per-rank counts from the trace,
// followed by the simulated local computation ('comp_size' flops).
679 static void action_reducescatter(const char *const *action) {
680 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
681 0 reduceScatter 275427 275427 275427 204020 11346849 0
683 1) The first four values after the name of the action declare the recvcounts array
684 2) The value 11346849 is the amount of instructions
685 3) The last value corresponds to the datatype, see decode_datatype().
687 double clock = smpi_process()->simulated_elapsed();
688 int comm_size = MPI_COMM_WORLD->size();
689 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
690 int comp_size = parse_double(action[2+comm_size]);
691 int recvcounts[comm_size];
692 int my_proc_id = Actor::self()->getPid();
694 std::vector<int>* trace_recvcounts = new std::vector<int>;
695 MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
697 for(int i=0;i<comm_size;i++) {
698 recvcounts[i] = atoi(action[i+2]);
699 trace_recvcounts->push_back(recvcounts[i]);
703 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
704 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
705 std::to_string(comp_size), /* ugly hack to print comp_size */
706 encode_datatype(MPI_CURRENT_TYPE)));
// NOTE(review): 'size' has no declaration visible here — presumably the sum of
// recvcounts accumulated alongside the loop above; confirm in the full file.
708 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
709 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
711 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
712 smpi_execute_flops(comp_size);
714 TRACE_smpi_comm_out(my_proc_id);
715 log_timed_action (action, clock);
// Replay "allGather <sendcount> <recvcount> [sendtype recvtype]": fixed-size
// allgather on MPI_COMM_WORLD using the shared scratch buffers.
718 static void action_allgather(const char *const *action) {
719 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
720 0 allGather 275427 275427
722 1) 275427 is the sendcount
723 2) 275427 is the recvcount
724 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
726 double clock = smpi_process()->simulated_elapsed();
728 CHECK_ACTION_PARAMS(action, 2, 2)
729 int sendcount=atoi(action[2]);
730 int recvcount=atoi(action[3]);
732 MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
733 MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
735 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
736 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
738 int my_proc_id = Actor::self()->getPid();
740 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
741 new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
742 encode_datatype(MPI_CURRENT_TYPE),
743 encode_datatype(MPI_CURRENT_TYPE2)));
745 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
747 TRACE_smpi_comm_out(my_proc_id);
748 log_timed_action (action, clock);
// Replay "allGatherV": allgather with per-rank receive counts from the trace;
// every rank's receive buffer is sized by the sum of all recvcounts.
751 static void action_allgatherv(const char *const *action) {
752 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
753 0 allGatherV 275427 275427 275427 275427 204020
755 1) 275427 is the sendcount
756 2) The next four elements declare the recvcounts array
757 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
759 double clock = smpi_process()->simulated_elapsed();
761 int comm_size = MPI_COMM_WORLD->size();
762 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
763 int sendcount=atoi(action[2]);
764 int recvcounts[comm_size];
765 int disps[comm_size];
769 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
770 MPI_Datatype MPI_CURRENT_TYPE2{
771 (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
773 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
775 for(int i=0;i<comm_size;i++) {
776 recvcounts[i] = atoi(action[i+3]);
777 recv_sum=recv_sum+recvcounts[i];
780 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
782 int my_proc_id = Actor::self()->getPid();
784 std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
786 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
787 new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
788 encode_datatype(MPI_CURRENT_TYPE),
789 encode_datatype(MPI_CURRENT_TYPE2)));
791 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
794 TRACE_smpi_comm_out(my_proc_id);
795 log_timed_action (action, clock);
// Replay "allToAllV": all-to-all with per-rank send and receive counts parsed
// from the trace line; buffers are sized by the explicit buffer-size fields.
798 static void action_allToAllv(const char *const *action) {
799 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
800 0 allToAllV 100 1 7 10 12 100 1 70 10 5
802 1) 100 is the size of the send buffer *sizeof(int),
803 2) 1 7 10 12 is the sendcounts array
804 3) 100*sizeof(int) is the size of the receiver buffer
805 4) 1 70 10 5 is the recvcounts array
807 double clock = smpi_process()->simulated_elapsed();
809 int comm_size = MPI_COMM_WORLD->size();
810 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
813 int sendcounts[comm_size];
814 std::vector<int>* trace_sendcounts = new std::vector<int>;
815 int recvcounts[comm_size];
816 std::vector<int>* trace_recvcounts = new std::vector<int>;
817 int senddisps[comm_size];
818 int recvdisps[comm_size];
820 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
821 ? decode_datatype(action[4 + 2 * comm_size])
823 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
824 ? decode_datatype(action[5 + 2 * comm_size])
827 int send_buf_size=parse_double(action[2]);
828 int recv_buf_size=parse_double(action[3+comm_size]);
829 int my_proc_id = Actor::self()->getPid();
830 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
831 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
833 for(int i=0;i<comm_size;i++) {
834 sendcounts[i] = atoi(action[i+3]);
835 trace_sendcounts->push_back(sendcounts[i]);
836 send_size += sendcounts[i];
837 recvcounts[i] = atoi(action[i+4+comm_size]);
838 trace_recvcounts->push_back(recvcounts[i]);
839 recv_size += recvcounts[i];
844 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
845 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
846 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
847 encode_datatype(MPI_CURRENT_TYPE2)));
// NOTE(review): the receive datatype passed to alltoallv is MPI_CURRENT_TYPE,
// while the buffers/trace use MPI_CURRENT_TYPE2 — likely should be MPI_CURRENT_TYPE2.
849 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
850 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
852 TRACE_smpi_comm_out(my_proc_id);
853 log_timed_action (action, clock);
856 }} // namespace simgrid::smpi
858 /** @brief Only initialize the replay, don't do it for real.
859  *
860  * Marks the process as initialized/replaying, emits the init tracing events,
861  * registers every replayable action with the xbt replay engine, honors an
862  * optional delayed start (argv[2], in flops), and forces one context switch so
863  * that all processes finish their initialization before actions run. */
859 void smpi_replay_init(int* argc, char*** argv)
861 simgrid::smpi::Process::init(argc, argv);
862 smpi_process()->mark_as_initialized();
863 smpi_process()->set_replaying(true);
865 int my_proc_id = Actor::self()->getPid();
866 TRACE_smpi_init(my_proc_id);
867 TRACE_smpi_computing_init(my_proc_id);
868 TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
869 TRACE_smpi_comm_out(my_proc_id);
870 xbt_replay_action_register("init", simgrid::smpi::action_init);
871 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
872 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
873 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
874 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
875 xbt_replay_action_register("send", simgrid::smpi::action_send);
876 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
877 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
878 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
879 xbt_replay_action_register("test", simgrid::smpi::action_test);
880 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
881 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
882 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
883 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
884 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
885 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
886 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
887 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
888 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
889 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
890 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
891 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
892 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
893 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
894 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
895 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
897 //if we have a delayed start, sleep here.
899 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
900 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
901 smpi_execute_flops(value);
903 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
904 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
905 smpi_execute_flops(0.0);
909 /** @brief actually run the replay after initialization.
910  *
911  * Runs the replay engine on this process's trace, then finalizes: waits on any
912  * leftover non-blocking requests, frees the request queue, lets the last active
913  * process report the simulated time and free the shared scratch buffers, and
914  * emits the finalize tracing events. */
910 void smpi_replay_main(int* argc, char*** argv)
912 simgrid::xbt::replay_runner(*argc, *argv);
914 /* and now, finalize everything */
915 /* One active process will stop. Decrease the counter*/
916 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
917 if (not get_reqq_self()->empty()) {
// Drain requests left pending by the trace (e.g. Isend with no matching wait).
918 unsigned int count_requests=get_reqq_self()->size();
919 MPI_Request requests[count_requests];
920 MPI_Status status[count_requests];
923 for (auto const& req : *get_reqq_self()) {
927 simgrid::smpi::Request::waitall(count_requests, requests, status);
929 delete get_reqq_self();
932 if(active_processes==0){
933 /* Last process alive speaking: end the simulated timer */
934 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
935 xbt_free(sendbuffer);
936 xbt_free(recvbuffer);
939 TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
941 smpi_process()->finalize();
943 TRACE_smpi_comm_out(Actor::self()->getPid());
944 TRACE_smpi_finalize(Actor::self()->getPid());
947 /** @brief chain a replay initialization and a replay start — convenience
948  *  entry point combining smpi_replay_init and smpi_replay_main. */
948 void smpi_replay_run(int* argc, char*** argv)
950 smpi_replay_init(argc, argv);
951 smpi_replay_main(argc, argv);