1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
using simgrid::s4u::Actor;

XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");

// Communicator size as announced by the "comm_size" trace action.
static int communicator_size = 0;
// Number of replaying processes still running; the last one ends the simulated timer.
static int active_processes = 0;
// Per-actor queue of pending async requests (Isend/Irecv), keyed by actor pid.
static std::unordered_map<int, std::vector<MPI_Request>*> reqq;

// Datatype used when a trace line carries no datatype id (set by action_init).
static MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the trace line currently being replayed.
static MPI_Datatype MPI_CURRENT_TYPE;

// Single shared send/recv scratch buffers reused by all replayed operations,
// grown on demand (see smpi_get_tmp_sendbuffer / smpi_get_tmp_recvbuffer).
static int sendbuffer_size = 0;
static char* sendbuffer = nullptr;
static int recvbuffer_size = 0;
static char* recvbuffer = nullptr;
// Log (at verbose level) the replayed action and the simulated time it took,
// computed as current simulated time minus the 'clock' captured at action start.
static void log_timed_action (const char *const *action, double clock){
  if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
    char *name = xbt_str_join_array(action, " ");
    XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// Return the pending-request queue of the calling actor.
// Throws std::out_of_range if set_reqq_self() was never called for this pid.
static std::vector<MPI_Request>* get_reqq_self()
  return reqq.at(Actor::self()->getPid());
// Register the given request queue for the calling actor.
// NOTE(review): std::unordered_map::insert does not overwrite an existing entry
// for this pid — presumably each actor calls this exactly once (in action_init).
static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
  reqq.insert({Actor::self()->getPid(), mpi_request});
//allocate a single buffer for all sends, growing it if needed
// Outside replay mode a fresh heap buffer is returned instead (caller frees it
// via smpi_free_tmp_buffer).
void* smpi_get_tmp_sendbuffer(int size)
  if (not smpi_process()->replaying())
    return xbt_malloc(size);
  if (sendbuffer_size<size){
    // Grow the shared buffer; it is only released at the end of the replay.
    sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
//allocate a single buffer for all recv
// Same policy as smpi_get_tmp_sendbuffer: shared grow-only buffer while
// replaying, plain xbt_malloc otherwise.
void* smpi_get_tmp_recvbuffer(int size){
  if (not smpi_process()->replaying())
    return xbt_malloc(size);
  if (recvbuffer_size<size){
    recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release a buffer obtained from smpi_get_tmp_*buffer().
// Only frees when not replaying; the shared replay buffers are kept for reuse.
void smpi_free_tmp_buffer(void* buf){
  if (not smpi_process()->replaying())
// Parse a trace token as a double; THROWs unknown_error if the token is not
// (fully) numeric, so malformed trace lines fail loudly instead of silently.
static double parse_double(const char *string)
  double value = strtod(string, &endptr);
  THROWF(unknown_error, 0, "%s is not a double", string);
//TODO: this logic should be moved inside the datatype class, to support all predefined types and get rid of is_replayable.
// Map the numeric datatype id found in a trace line to an MPI datatype.
// Unknown ids fall back to MPI_DEFAULT_TYPE (cases elided in this excerpt).
static MPI_Datatype decode_datatype(const char *const action)
  switch(atoi(action)) {
      return MPI_DEFAULT_TYPE;
// Inverse of decode_datatype: map an MPI datatype back to its trace id string
// (the returned literals are elided in this excerpt).
const char* encode_datatype(MPI_Datatype datatype)
  if (datatype==MPI_BYTE)
  if(datatype==MPI_DOUBLE)
  if(datatype==MPI_INT)
  if(datatype==MPI_CHAR)
  if(datatype==MPI_SHORT)
  if(datatype==MPI_LONG)
  if(datatype==MPI_FLOAT)
  // default - not implemented.
  // do not warn here as we pass in this function even for other trace formats
// Count the tokens of 'action' and THROW arg_error if fewer than the expected
// number are present. Tokens 0 and 1 are always the process id and the action
// name; 'mandatory' and 'optional' count the arguments that follow them.
// (No comments inside the macro: '//' before a '\' would break the continuation.)
#define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
    while(action[i]!=nullptr)\
      THROWF(arg_error, 0, "%s replay failed.\n" \
             "%d items were given on the line. First two should be process_id and action. " \
             "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
             "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// "init" action: set up per-process replay state. The optional argument selects
// the trace dialect, which determines the default datatype.
static void action_init(const char *const *action)
  XBT_DEBUG("Initialize the counters");
  CHECK_ACTION_PARAMS(action, 0, 1)
    MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
    MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype

  /* start a simulated timer */
  smpi_process()->simulated_start();
  /*initialize the number of active processes */
  active_processes = smpi_process_count();

  // Each actor gets its own queue of outstanding asynchronous requests.
  set_reqq_self(new std::vector<MPI_Request>);
// "finalize" action: intentionally a no-op; real teardown happens in smpi_replay_main.
static void action_finalize(const char *const *action)
// "comm_size" action: record the communicator size announced by the trace.
static void action_comm_size(const char *const *action)
  communicator_size = parse_double(action[2]);
  log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_split" action: only logged; the split itself is not simulated.
static void action_comm_split(const char *const *action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
// "comm_dup" action: only logged; the duplication itself is not simulated.
static void action_comm_dup(const char *const *action)
  log_timed_action (action, smpi_process()->simulated_elapsed());
// "compute" action: burn the given number of flops on the simulated host.
// Format: <rank> compute <flops>
static void action_compute(const char *const *action)
  CHECK_ACTION_PARAMS(action, 1, 0)
  double clock = smpi_process()->simulated_elapsed();
  double flops= parse_double(action[2]);
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_computing_in(my_proc_id, flops);
  smpi_execute_flops(flops);
  TRACE_smpi_computing_out(my_proc_id);
  log_timed_action (action, clock);
// "send" action: blocking send of <size> elements to rank <to>.
// Format: <rank> send <dst> <size> [<datatype id>]
static void action_send(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  int to = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  // Optional 5th token selects the datatype; otherwise the trace-format default.
  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("send", to, size, encode_datatype(MPI_CURRENT_TYPE)));
  if (not TRACE_smpi_view_internals())
    TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
  // Payload is nullptr: replay simulates timing only, no real data is moved.
  Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action(action, clock);
// "Isend" action: non-blocking send; the resulting request is queued so a
// later wait/waitAll/test action can complete it.
// Format: <rank> Isend <dst> <size> [<datatype id>]
static void action_Isend(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  int to = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  int dst_traced = MPI_COMM_WORLD->group()->actor(to)->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("Isend", to, size, encode_datatype(MPI_CURRENT_TYPE)));
  if (not TRACE_smpi_view_internals())
    TRACE_smpi_send(my_proc_id, my_proc_id, dst_traced, 0, size * MPI_CURRENT_TYPE->size());
  MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  // Keep the request so the matching wait/test can find it.
  get_reqq_self()->push_back(request);
  log_timed_action (action, clock);
// "recv" action: blocking receive of <size> elements from rank <from>.
// Format: <rank> recv <src> <size> [<datatype id>]
static void action_recv(const char *const *action) {
  CHECK_ACTION_PARAMS(action, 2, 1)
  int from = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  int src_traced = MPI_COMM_WORLD->group()->actor(from)->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("recv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
  //unknown size from the receiver point of view
  // NOTE(review): the guard deciding when to probe is not visible in this excerpt.
  Request::probe(from, 0, MPI_COMM_WORLD, &status);
  Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
  TRACE_smpi_comm_out(my_proc_id);
  if (not TRACE_smpi_view_internals()) {
    TRACE_smpi_recv(src_traced, my_proc_id, 0);
  log_timed_action (action, clock);
// "Irecv" action: non-blocking receive; the request is queued for a later wait/test.
// Format: <rank> Irecv <src> <size> [<datatype id>]
static void action_Irecv(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 1)
  int from = atoi(action[2]);
  double size=parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::Pt2PtTIData("Irecv", from, size, encode_datatype(MPI_CURRENT_TYPE)));
  //unknow size from the receiver pov
  // NOTE(review): the guard deciding when to probe is not visible in this excerpt.
  Request::probe(from, 0, MPI_COMM_WORLD, &status);
  MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  get_reqq_self()->push_back(request);
  log_timed_action (action, clock);
// "test" action: pop the most recent pending request and MPI_Test it once,
// then push the (possibly nulled) request back for a subsequent wait.
static void action_test(const char* const* action)
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();
  //if request is null here, this may mean that a previous test has succeeded
  //Different times in traced application and replayed version may lead to this
  //In this case, ignore the extra calls.
  if(request!=nullptr){
    int my_proc_id = Actor::self()->getPid();
    TRACE_smpi_testing_in(my_proc_id);
    int flag = Request::test(&request, &status);
    XBT_DEBUG("MPI_Test result: %d", flag);
    /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
    get_reqq_self()->push_back(request);
    TRACE_smpi_testing_out(my_proc_id);
  log_timed_action (action, clock);
// "wait" action: complete the most recent pending Isend/Irecv request.
// A nullptr request means an earlier "test" already completed it; just return.
static void action_wait(const char *const *action){
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  // A wait without a preceding non-blocking operation is a malformed trace.
  xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
             xbt_str_join_array(action," "));
  MPI_Request request = get_reqq_self()->back();
  get_reqq_self()->pop_back();
  if (request==nullptr){
    /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
  int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
  // Capture src/dst before waiting: the request may be freed by Request::wait.
  MPI_Group group = request->comm()->group();
  int src_traced = group->rank(request->src());
  int dst_traced = group->rank(request->dst());
  int is_wait_for_receive = (request->flags() & RECV);
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
  Request::wait(&request, &status);
  TRACE_smpi_comm_out(rank);
  if (is_wait_for_receive)
    TRACE_smpi_recv(src_traced, dst_traced, 0);
  log_timed_action (action, clock);
// "waitAll" action: complete every request queued by previous Isend/Irecv actions.
// Receive endpoints are recorded before the wait so TRACE_smpi_recv can be
// emitted afterwards.
static void action_waitall(const char *const *action){
  CHECK_ACTION_PARAMS(action, 0, 0)
  double clock = smpi_process()->simulated_elapsed();
  const unsigned int count_requests = get_reqq_self()->size();
  if (count_requests>0) {
    MPI_Status status[count_requests];
    int my_proc_id_traced = Actor::self()->getPid();
    TRACE_smpi_comm_in(my_proc_id_traced, __FUNCTION__,
                       new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
    int recvs_snd[count_requests];
    int recvs_rcv[count_requests];
    for (unsigned int i = 0; i < count_requests; i++) {
      const auto& req = (*get_reqq_self())[i];
      if (req && (req->flags() & RECV)) {
        recvs_snd[i] = req->src();
        recvs_rcv[i] = req->dst();
    Request::waitall(count_requests, &(*get_reqq_self())[0], status);
    // NOTE(review): -100 is the "not a receive" sentinel; the branch assigning
    // it is not visible in this excerpt — confirm against the full source.
    for (unsigned i = 0; i < count_requests; i++) {
      if (recvs_snd[i]!=-100)
        TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
    TRACE_smpi_comm_out(my_proc_id_traced);
  log_timed_action (action, clock);
// "barrier" action: synchronize all ranks of MPI_COMM_WORLD.
static void action_barrier(const char *const *action){
  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
  Colls::barrier(MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "bcast" action: broadcast <size> elements from <root> (default 0).
// Format: <rank> bcast <size> [<root> [<datatype id>]]
static void action_bcast(const char *const *action)
  CHECK_ACTION_PARAMS(action, 1, 2)
  double size = parse_double(action[2]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action[3]) ? atoi(action[3]) : 0;
  /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
  MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
  // Datatype id is only meaningful if a root was also given (it is token 4).
  MPI_CURRENT_TYPE = (action[3] && action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->actor(root)->getPid(), -1.0, size,
                                                    -1, encode_datatype(MPI_CURRENT_TYPE), ""));
  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "reduce" action: reduce <comm_size> elements to <root>, then burn <comp_size> flops.
// Format: <rank> reduce <comm_size> <comp_size> [<root> [<datatype id>]]
static void action_reduce(const char *const *action)
  CHECK_ACTION_PARAMS(action, 2, 2)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  double clock = smpi_process()->simulated_elapsed();
  int root = (action[4]) ? atoi(action[4]) : 0;
  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->actor(root)->getPid(), comp_size,
                                                    comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  // MPI_OP_NULL: the actual reduction operator is irrelevant for timing.
  Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "allReduce" action: allreduce <comm_size> elements, then burn <comp_size> flops.
// Format: <rank> allReduce <comm_size> <comp_size> [<datatype id>]
static void action_allReduce(const char *const *action) {
  CHECK_ACTION_PARAMS(action, 2, 1)
  double comm_size = parse_double(action[2]);
  double comp_size = parse_double(action[3]);
  MPI_CURRENT_TYPE = (action[4]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  double clock = smpi_process()->simulated_elapsed();
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
                                                                              encode_datatype(MPI_CURRENT_TYPE), ""));
  void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
  // MPI_OP_NULL: the actual reduction operator is irrelevant for timing.
  Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "allToAll" action: every rank sends <send_size> and receives <recv_size>
// elements to/from every other rank.
// Format: <rank> allToAll <send_size> <recv_size> [<send type id> <recv type id>]
static void action_allToAll(const char *const *action) {
  CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  // Both datatype ids must be present for either to be used.
  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
  void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
  void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
                                                    encode_datatype(MPI_CURRENT_TYPE),
                                                    encode_datatype(MPI_CURRENT_TYPE2)));
  Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "gather" action: gather <send_size> elements from every rank onto <root>.
// Only the root allocates a receive buffer (guard not visible in this excerpt).
static void action_gather(const char *const *action) {
  /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
     1) 68 is the sendcounts
     2) 68 is the recvcounts
     3) 0 is the root node
     4) 0 is the send datatype id, see decode_datatype()
     5) 0 is the recv datatype id, see decode_datatype()
  CHECK_ACTION_PARAMS(action, 2, 3)
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  int send_size = parse_double(action[2]);
  int recv_size = parse_double(action[3]);
  MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  int root = (action[4]) ? atoi(action[4]) : 0;
  int rank = MPI_COMM_WORLD->rank();
    recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
                                                                        encode_datatype(MPI_CURRENT_TYPE),
                                                                        encode_datatype(MPI_CURRENT_TYPE2)));
  Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
545 static void action_scatter(const char* const* action)
547 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
550 1) 68 is the sendcounts
551 2) 68 is the recvcounts
552 3) 0 is the root node
553 4) 0 is the send datatype id, see decode_datatype()
554 5) 0 is the recv datatype id, see decode_datatype()
556 CHECK_ACTION_PARAMS(action, 2, 3)
557 double clock = smpi_process()->simulated_elapsed();
558 int comm_size = MPI_COMM_WORLD->size();
559 int send_size = parse_double(action[2]);
560 int recv_size = parse_double(action[3]);
561 MPI_CURRENT_TYPE = (action[5] && action[6]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE;
562 MPI_Datatype MPI_CURRENT_TYPE2{(action[5] && action[6]) ? decode_datatype(action[6]) : MPI_DEFAULT_TYPE};
564 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
565 void* recv = nullptr;
566 int root = (action[4]) ? atoi(action[4]) : 0;
567 int rank = MPI_COMM_WORLD->rank();
570 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
572 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
573 encode_datatype(MPI_CURRENT_TYPE),
574 encode_datatype(MPI_CURRENT_TYPE2)));
576 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
578 TRACE_smpi_comm_out(Actor::self()->getPid());
579 log_timed_action(action, clock);
// "gatherV" action: gather variable-sized contributions onto <root>.
// The trace carries one recvcount per rank plus root and optional datatypes.
static void action_gatherv(const char *const *action) {
  /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
  0 gather 68 68 10 10 10 0 0 0
    1) 68 is the sendcount
    2) 68 10 10 10 is the recvcounts
    3) 0 is the root node
    4) 0 is the send datatype id, see decode_datatype()
    5) 0 is the recv datatype id, see decode_datatype()
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int send_size = parse_double(action[2]);
  int disps[comm_size];
  int recvcounts[comm_size];
      (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
  void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
  void *recv = nullptr;
  // Sum the per-rank contributions to size the root's receive buffer.
  for(int i=0;i<comm_size;i++) {
    recvcounts[i] = atoi(action[i+3]);
    recv_sum=recv_sum+recvcounts[i];
  int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
  int rank = MPI_COMM_WORLD->rank();
    recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
  std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
  TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
                                             "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
                                             encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
  Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(Actor::self()->getPid());
  log_timed_action (action, clock);
631 static void action_scatterv(const char* const* action)
633 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
634 0 gather 68 10 10 10 68 0 0 0
636 1) 68 10 10 10 is the sendcounts
637 2) 68 is the recvcount
638 3) 0 is the root node
639 4) 0 is the send datatype id, see decode_datatype()
640 5) 0 is the recv datatype id, see decode_datatype()
642 double clock = smpi_process()->simulated_elapsed();
643 int comm_size = MPI_COMM_WORLD->size();
644 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
645 int recv_size = parse_double(action[2 + comm_size]);
646 int disps[comm_size];
647 int sendcounts[comm_size];
651 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE;
652 MPI_Datatype MPI_CURRENT_TYPE2{
653 (action[4 + comm_size] && action[5 + comm_size]) ? decode_datatype(action[5 + comm_size]) : MPI_DEFAULT_TYPE};
655 void* send = nullptr;
656 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
657 for (int i = 0; i < comm_size; i++) {
658 sendcounts[i] = atoi(action[i + 2]);
659 send_sum += sendcounts[i];
663 int root = (action[3 + comm_size]) ? atoi(action[3 + comm_size]) : 0;
664 int rank = MPI_COMM_WORLD->rank();
667 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
669 std::vector<int>* trace_sendcounts = new std::vector<int>(sendcounts, sendcounts + comm_size);
671 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
672 "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
673 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
675 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
677 TRACE_smpi_comm_out(Actor::self()->getPid());
678 log_timed_action(action, clock);
// "reduceScatter" action: reduce then scatter per-rank blocks, then burn flops.
// NOTE(review): 'size' used below is accumulated from recvcounts in lines not
// visible in this excerpt — confirm against the full source.
static void action_reducescatter(const char *const *action) {
  /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
0 reduceScatter 275427 275427 275427 204020 11346849 0
  where:
    1) The first four values after the name of the action declare the recvcounts array
    2) The value 11346849 is the amount of instructions
    3) The last value corresponds to the datatype, see decode_datatype().
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 1)
  int comp_size = parse_double(action[2+comm_size]);
  int recvcounts[comm_size];
  int my_proc_id = Actor::self()->getPid();
  std::vector<int>* trace_recvcounts = new std::vector<int>;
  MPI_CURRENT_TYPE = (action[3 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
  for(int i=0;i<comm_size;i++) {
    recvcounts[i] = atoi(action[i+2]);
    trace_recvcounts->push_back(recvcounts[i]);
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
                                                       std::to_string(comp_size), /* ugly hack to print comp_size */
                                                       encode_datatype(MPI_CURRENT_TYPE)));
  void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
  // MPI_OP_NULL: the actual reduction operator is irrelevant for timing.
  Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
  smpi_execute_flops(comp_size);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "allGather" action: every rank contributes <sendcount> and gathers <recvcount>
// elements from all ranks.
static void action_allgather(const char *const *action) {
  /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
0 allGather 275427 275427
  where:
    1) 275427 is the sendcount
    2) 275427 is the recvcount
    3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
  double clock = smpi_process()->simulated_elapsed();
  CHECK_ACTION_PARAMS(action, 2, 2)
  int sendcount=atoi(action[2]);
  int recvcount=atoi(action[3]);
  // Both datatype ids must be present for either to be used.
  MPI_CURRENT_TYPE = (action[4] && action[5]) ? decode_datatype(action[4]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{(action[4] && action[5]) ? decode_datatype(action[5]) : MPI_DEFAULT_TYPE};
  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
                                                    encode_datatype(MPI_CURRENT_TYPE),
                                                    encode_datatype(MPI_CURRENT_TYPE2)));
  Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
// "allGatherV" action: every rank contributes <sendcount> elements and gathers
// variable-sized contributions (one recvcount per rank) from all ranks.
static void action_allgatherv(const char *const *action) {
  /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
0 allGatherV 275427 275427 275427 275427 204020
  where:
    1) 275427 is the sendcount
    2) The next four elements declare the recvcounts array
    3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
  double clock = smpi_process()->simulated_elapsed();
  int comm_size = MPI_COMM_WORLD->size();
  CHECK_ACTION_PARAMS(action, comm_size+1, 2)
  int sendcount=atoi(action[2]);
  int recvcounts[comm_size];
  int disps[comm_size];
      (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[3 + comm_size]) : MPI_DEFAULT_TYPE;
  MPI_Datatype MPI_CURRENT_TYPE2{
      (action[3 + comm_size] && action[4 + comm_size]) ? decode_datatype(action[4 + comm_size]) : MPI_DEFAULT_TYPE};
  void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
  // Sum the per-rank counts to size the receive buffer.
  for(int i=0;i<comm_size;i++) {
    recvcounts[i] = atoi(action[i+3]);
    recv_sum=recv_sum+recvcounts[i];
  void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
  int my_proc_id = Actor::self()->getPid();
  std::vector<int>* trace_recvcounts = new std::vector<int>(recvcounts, recvcounts + comm_size);
  TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
                     new simgrid::instr::VarCollTIData("allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
                                                       encode_datatype(MPI_CURRENT_TYPE),
                                                       encode_datatype(MPI_CURRENT_TYPE2)));
  Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
  TRACE_smpi_comm_out(my_proc_id);
  log_timed_action (action, clock);
800 static void action_allToAllv(const char *const *action) {
801 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
802 0 allToAllV 100 1 7 10 12 100 1 70 10 5
804 1) 100 is the size of the send buffer *sizeof(int),
805 2) 1 7 10 12 is the sendcounts array
806 3) 100*sizeof(int) is the size of the receiver buffer
807 4) 1 70 10 5 is the recvcounts array
809 double clock = smpi_process()->simulated_elapsed();
811 int comm_size = MPI_COMM_WORLD->size();
812 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
815 int sendcounts[comm_size];
816 std::vector<int>* trace_sendcounts = new std::vector<int>;
817 int recvcounts[comm_size];
818 std::vector<int>* trace_recvcounts = new std::vector<int>;
819 int senddisps[comm_size];
820 int recvdisps[comm_size];
822 MPI_CURRENT_TYPE = (action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
823 ? decode_datatype(action[4 + 2 * comm_size])
825 MPI_Datatype MPI_CURRENT_TYPE2{(action[4 + 2 * comm_size] && action[5 + 2 * comm_size])
826 ? decode_datatype(action[5 + 2 * comm_size])
829 int send_buf_size=parse_double(action[2]);
830 int recv_buf_size=parse_double(action[3+comm_size]);
831 int my_proc_id = Actor::self()->getPid();
832 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
833 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
835 for(int i=0;i<comm_size;i++) {
836 sendcounts[i] = atoi(action[i+3]);
837 trace_sendcounts->push_back(sendcounts[i]);
838 send_size += sendcounts[i];
839 recvcounts[i] = atoi(action[i+4+comm_size]);
840 trace_recvcounts->push_back(recvcounts[i]);
841 recv_size += recvcounts[i];
846 TRACE_smpi_comm_in(my_proc_id, __FUNCTION__,
847 new simgrid::instr::VarCollTIData("allToAllV", -1, send_size, trace_sendcounts, recv_size,
848 trace_recvcounts, encode_datatype(MPI_CURRENT_TYPE),
849 encode_datatype(MPI_CURRENT_TYPE2)));
851 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
852 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
854 TRACE_smpi_comm_out(my_proc_id);
855 log_timed_action (action, clock);
858 }} // namespace simgrid::smpi
/** @brief Only initialize the replay, don't do it for real */
// Marks the process as replaying, emits the init trace events, registers every
// trace action handler, then optionally sleeps if a delayed start was requested.
void smpi_replay_init(int* argc, char*** argv)
  simgrid::smpi::Process::init(argc, argv);
  smpi_process()->mark_as_initialized();
  smpi_process()->set_replaying(true);
  int my_proc_id = Actor::self()->getPid();
  TRACE_smpi_init(my_proc_id);
  TRACE_smpi_computing_init(my_proc_id);
  TRACE_smpi_comm_in(my_proc_id, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
  TRACE_smpi_comm_out(my_proc_id);
  // Map every action keyword found in the trace to its replay handler.
  xbt_replay_action_register("init",       simgrid::smpi::action_init);
  xbt_replay_action_register("finalize",   simgrid::smpi::action_finalize);
  xbt_replay_action_register("comm_size",  simgrid::smpi::action_comm_size);
  xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
  xbt_replay_action_register("comm_dup",   simgrid::smpi::action_comm_dup);
  xbt_replay_action_register("send",       simgrid::smpi::action_send);
  xbt_replay_action_register("Isend",      simgrid::smpi::action_Isend);
  xbt_replay_action_register("recv",       simgrid::smpi::action_recv);
  xbt_replay_action_register("Irecv",      simgrid::smpi::action_Irecv);
  xbt_replay_action_register("test",       simgrid::smpi::action_test);
  xbt_replay_action_register("wait",       simgrid::smpi::action_wait);
  xbt_replay_action_register("waitAll",    simgrid::smpi::action_waitall);
  xbt_replay_action_register("barrier",    simgrid::smpi::action_barrier);
  xbt_replay_action_register("bcast",      simgrid::smpi::action_bcast);
  xbt_replay_action_register("reduce",     simgrid::smpi::action_reduce);
  xbt_replay_action_register("allReduce",  simgrid::smpi::action_allReduce);
  xbt_replay_action_register("allToAll",   simgrid::smpi::action_allToAll);
  xbt_replay_action_register("allToAllV",  simgrid::smpi::action_allToAllv);
  xbt_replay_action_register("gather",     simgrid::smpi::action_gather);
  xbt_replay_action_register("scatter",    simgrid::smpi::action_scatter);
  xbt_replay_action_register("gatherV",    simgrid::smpi::action_gatherv);
  xbt_replay_action_register("scatterV",   simgrid::smpi::action_scatterv);
  xbt_replay_action_register("allGather",  simgrid::smpi::action_allgather);
  xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
  xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
  xbt_replay_action_register("compute",    simgrid::smpi::action_compute);

  //if we have a delayed start, sleep here.
    double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
    XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
    smpi_execute_flops(value);
    //UGLY: force a context switch to be sure that all MSG_processes begin initialization
    XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
    smpi_execute_flops(0.0);
/** @brief actually run the replay after initialization */
// Runs the trace to completion, drains any requests still pending, frees the
// per-actor queue, and lets the last surviving process print the simulated time.
void smpi_replay_main(int* argc, char*** argv)
  simgrid::xbt::replay_runner(*argc, *argv);

  /* and now, finalize everything */
  /* One active process will stop. Decrease the counter*/
  XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
  if (not get_reqq_self()->empty()) {
    unsigned int count_requests=get_reqq_self()->size();
    MPI_Request requests[count_requests];
    MPI_Status status[count_requests];
    // Copy the queued requests into a contiguous array for waitall.
    for (auto const& req : *get_reqq_self()) {
    simgrid::smpi::Request::waitall(count_requests, requests, status);
  delete get_reqq_self();
  if(active_processes==0){
    /* Last process alive speaking: end the simulated timer */
    XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
    // Release the shared replay scratch buffers.
    xbt_free(sendbuffer);
    xbt_free(recvbuffer);
  TRACE_smpi_comm_in(Actor::self()->getPid(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
  smpi_process()->finalize();
  TRACE_smpi_comm_out(Actor::self()->getPid());
  TRACE_smpi_finalize(Actor::self()->getPid());
/** @brief chain a replay initialization and a replay start */
// Convenience entry point: init then main, forwarding argc/argv to both.
void smpi_replay_run(int* argc, char*** argv)
  smpi_replay_init(argc, argv);
  smpi_replay_main(argc, argv);