1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Logging category for the SMPI trace-replay machinery.
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Communicator size as announced by the trace's "comm_size" action.
22 int communicator_size = 0;
// Number of replaying processes still alive; the last one to finish stops the simulated timer.
23 static int active_processes = 0;
// Per-process queue of outstanding asynchronous requests (Isend/Irecv), keyed by process index.
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when an action line carries no explicit datatype
// (MPI_DOUBLE for MPE traces, MPI_BYTE for TAU traces — see action_init()).
26 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype decoded from the current action line (set by decode_datatype()).
27 MPI_Datatype MPI_CURRENT_TYPE;
// Grow-only scratch buffers shared by all sends/recvs during replay:
// payload content is irrelevant to the simulation, so one buffer per direction suffices.
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
// Log the replayed action (its tokens joined with spaces) together with the
// simulated time it consumed, i.e. current simulated time minus 'clock'
// (the simulated timestamp taken when the action started).
// Only does work when verbose logging is enabled for this category.
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// NOTE(review): xbt_str_join_array() allocates 'name'; the matching free is not
// visible in this excerpt — confirm it is released after logging.
// Return the pending-request vector of the calling process.
// Uses at(), which throws if the process never registered a vector
// via set_reqq_self() (done in action_init()).
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(smpi_process()->index());
// Register the pending-request vector for the calling process.
// Note: std::unordered_map::insert does NOT overwrite an existing entry,
// so a second call for the same process index is a silent no-op.
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({smpi_process()->index(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
// Outside of replay mode, fall back to a plain per-call allocation that the
// caller owns (freed via smpi_free_tmp_buffer()).
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (not smpi_process()->replaying())
56 return xbt_malloc(size);
57 if (sendbuffer_size<size){
// Grow the shared buffer; previous contents do not matter for simulation.
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer() for the receive direction.
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (not smpi_process()->replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Release a buffer obtained from smpi_get_tmp_{send,recv}buffer().
// In replay mode the shared scratch buffers are kept alive, so freeing is
// only performed for the non-replaying (plain xbt_malloc) case.
75 void smpi_free_tmp_buffer(void* buf){
76 if (not smpi_process()->replaying())
// Parse a trace token as a double with strtod(); throws (THROWF) when the
// token is not a valid double. The endptr validation lines are not visible
// in this excerpt — presumably the check is that endptr did not stay at 'string'.
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
// Map the numeric datatype id found in a trace line to an MPI datatype,
// storing it in the global MPI_CURRENT_TYPE and returning it.
// The switch's case labels are not visible in this excerpt; the visible
// assignments cover DOUBLE, INT, CHAR, SHORT, LONG, FLOAT, BYTE, with
// MPI_DEFAULT_TYPE as the fallback.
90 static MPI_Datatype decode_datatype(const char *const action)
92 switch(atoi(action)) {
94 MPI_CURRENT_TYPE=MPI_DOUBLE;
97 MPI_CURRENT_TYPE=MPI_INT;
100 MPI_CURRENT_TYPE=MPI_CHAR;
103 MPI_CURRENT_TYPE=MPI_SHORT;
106 MPI_CURRENT_TYPE=MPI_LONG;
109 MPI_CURRENT_TYPE=MPI_FLOAT;
112 MPI_CURRENT_TYPE=MPI_BYTE;
115 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): encode an MPI datatype as the string id used
// in traces. The return statements are not visible in this excerpt; each
// comparison presumably returns the corresponding numeric-id string.
121 const char* encode_datatype(MPI_Datatype datatype)
123 if (datatype==MPI_BYTE)
125 if(datatype==MPI_DOUBLE)
127 if(datatype==MPI_INT)
129 if(datatype==MPI_CHAR)
131 if(datatype==MPI_SHORT)
133 if(datatype==MPI_LONG)
135 if(datatype==MPI_FLOAT)
137 // default - not implemented.
138 // do not warn here as we pass in this function even for other trace formats
// Validate the number of tokens on an action line: counts the entries of the
// null-terminated 'action' array and throws if there are fewer than
// 2 (process id + action name) + 'mandatory' tokens, naming the offending
// handler via __FUNCTION__. Kept as a macro so __FUNCTION__ reports the caller.
// (No comments inside the macro: they would break the backslash continuations.)
142 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144 while(action[i]!=nullptr)\
147 THROWF(arg_error, 0, "%s replay failed.\n" \
148 "%d items were given on the line. First two should be process_id and action. " \
149 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
150 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Replay handler for the "init" action: pick the default datatype based on the
// trace flavor (MPE uses doubles, TAU uses bytes — the selecting condition is
// not visible in this excerpt), start the per-process simulated timer, record
// the number of active processes and create this process's request queue.
156 static void action_init(const char *const *action)
158 XBT_DEBUG("Initialize the counters");
159 CHECK_ACTION_PARAMS(action, 0, 1)
161 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
163 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
165 /* start a simulated timer */
166 smpi_process()->simulated_start();
167 /*initialize the number of active processes */
168 active_processes = smpi_process_count();
// Ownership of this vector is released in smpi_replay_main() (delete get_reqq_self()).
170 set_reqq_self(new std::vector<MPI_Request>);
// Replay handler for the "finalize" action (body not visible in this excerpt).
173 static void action_finalize(const char *const *action)
// Replay handler for "comm_size": record the communicator size announced by
// the trace and log the (zero-cost) action.
178 static void action_comm_size(const char *const *action)
180 communicator_size = parse_double(action[2]);
181 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_split": in this excerpt it only logs the action
// (the split itself has no simulated cost).
184 static void action_comm_split(const char *const *action)
186 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "comm_dup": in this excerpt it only logs the action
// (the duplication itself has no simulated cost).
189 static void action_comm_dup(const char *const *action)
191 log_timed_action (action, smpi_process()->simulated_elapsed());
// Replay handler for "compute <flops>": burn the given amount of simulated
// flops, bracketing the work with computing-state trace events.
194 static void action_compute(const char *const *action)
196 CHECK_ACTION_PARAMS(action, 1, 0)
197 double clock = smpi_process()->simulated_elapsed();
198 double flops= parse_double(action[2]);
199 int rank = smpi_process()->index();
201 TRACE_smpi_computing_in(rank, flops);
202 smpi_execute_flops(flops);
203 TRACE_smpi_computing_out(rank);
205 log_timed_action (action, clock);
// Replay handler for "send <to> <size> [datatype]": blocking send of 'size'
// elements to rank 'to' (tag 0, MPI_COMM_WORLD). The payload pointer is
// nullptr — only the volume matters for simulation. The if/else selecting
// between the optional datatype token and MPI_DEFAULT_TYPE is not visible
// in this excerpt.
208 static void action_send(const char *const *action)
210 CHECK_ACTION_PARAMS(action, 2, 1)
211 int to = atoi(action[2]);
212 double size=parse_double(action[3]);
213 double clock = smpi_process()->simulated_elapsed();
216 MPI_CURRENT_TYPE=decode_datatype(action[4]);
218 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
220 int rank = smpi_process()->index();
221 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
223 TRACE_smpi_comm_in(rank, __FUNCTION__,
224 new simgrid::instr::Pt2PtTIData("send", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
// When internals are not traced, emit the user-level send event ourselves.
225 if (not TRACE_smpi_view_internals())
226 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
228 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
230 TRACE_smpi_comm_out(rank);
232 log_timed_action(action, clock);
// Replay handler for "Isend <to> <size> [datatype]": non-blocking send; the
// resulting request is queued in this process's request vector so a later
// "wait"/"waitAll"/"test" action can complete it. Datatype selection mirrors
// action_send() (the if/else is not visible in this excerpt).
235 static void action_Isend(const char *const *action)
237 CHECK_ACTION_PARAMS(action, 2, 1)
238 int to = atoi(action[2]);
239 double size=parse_double(action[3]);
240 double clock = smpi_process()->simulated_elapsed();
243 MPI_CURRENT_TYPE=decode_datatype(action[4]);
245 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
247 int rank = smpi_process()->index();
248 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
249 TRACE_smpi_comm_in(rank, __FUNCTION__,
250 new simgrid::instr::Pt2PtTIData("Isend", dst_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
251 if (not TRACE_smpi_view_internals())
252 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size())
254 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
256 TRACE_smpi_comm_out(rank);
258 get_reqq_self()->push_back(request);
260 log_timed_action (action, clock);
// Replay handler for "recv <from> <size> [datatype]": blocking receive from
// rank 'from' (tag 0, MPI_COMM_WORLD). A probe is issued first — presumably
// because the traced size is unknown on the receiver side and the real size
// is taken from the probed status (the lines using the status are not visible
// in this excerpt).
263 static void action_recv(const char *const *action) {
264 CHECK_ACTION_PARAMS(action, 2, 1)
265 int from = atoi(action[2]);
266 double size=parse_double(action[3]);
267 double clock = smpi_process()->simulated_elapsed();
271 MPI_CURRENT_TYPE=decode_datatype(action[4]);
273 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
275 int rank = smpi_process()->index();
276 int src_traced = MPI_COMM_WORLD->group()->rank(from);
278 TRACE_smpi_comm_in(rank, __FUNCTION__,
279 new simgrid::instr::Pt2PtTIData("recv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
281 //unknown size from the receiver point of view
283 Request::probe(from, 0, MPI_COMM_WORLD, &status);
287 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
289 TRACE_smpi_comm_out(rank);
290 if (not TRACE_smpi_view_internals()) {
291 TRACE_smpi_recv(src_traced, rank, 0);
294 log_timed_action (action, clock);
// Replay handler for "Irecv <from> <size> [datatype]": non-blocking receive;
// like action_recv() a probe is issued first for the real message size, and
// the request is queued for a later wait/test action.
297 static void action_Irecv(const char *const *action)
299 CHECK_ACTION_PARAMS(action, 2, 1)
300 int from = atoi(action[2]);
301 double size=parse_double(action[3]);
302 double clock = smpi_process()->simulated_elapsed();
305 MPI_CURRENT_TYPE=decode_datatype(action[4]);
307 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
309 int rank = smpi_process()->index();
310 int src_traced = MPI_COMM_WORLD->group()->rank(from);
311 TRACE_smpi_comm_in(rank, __FUNCTION__,
312 new simgrid::instr::Pt2PtTIData("Irecv", src_traced, size, encode_datatype(MPI_CURRENT_TYPE)));
314 //unknow size from the receiver pov
316 Request::probe(from, 0, MPI_COMM_WORLD, &status);
320 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
322 TRACE_smpi_comm_out(rank);
323 get_reqq_self()->push_back(request);
325 log_timed_action (action, clock);
// Replay handler for "test": pop the most recent pending request, test it,
// and push it back (Request::test nulls it out on success so a subsequent
// wait sees nullptr and skips it). A null request popped here means an
// earlier test already completed the communication and is ignored.
328 static void action_test(const char* const* action)
330 CHECK_ACTION_PARAMS(action, 0, 0)
331 double clock = smpi_process()->simulated_elapsed();
334 MPI_Request request = get_reqq_self()->back();
335 get_reqq_self()->pop_back();
336 //if request is null here, this may mean that a previous test has succeeded
337 //Different times in traced application and replayed version may lead to this
338 //In this case, ignore the extra calls.
339 if(request!=nullptr){
340 int rank = smpi_process()->index();
341 TRACE_smpi_testing_in(rank);
343 int flag = Request::test(&request, &status);
345 XBT_DEBUG("MPI_Test result: %d", flag);
346 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
347 get_reqq_self()->push_back(request);
349 TRACE_smpi_testing_out(rank);
351 log_timed_action (action, clock);
// Replay handler for "wait": pop the most recent pending request and wait on
// it. A nullptr request (already completed by a prior "test") is silently
// skipped. Source/destination are captured BEFORE the wait, since the wait
// destroys the request; the recv trace event is emitted only for receive
// requests.
354 static void action_wait(const char *const *action){
355 CHECK_ACTION_PARAMS(action, 0, 0)
356 double clock = smpi_process()->simulated_elapsed();
359 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
360 xbt_str_join_array(action," "));
361 MPI_Request request = get_reqq_self()->back();
362 get_reqq_self()->pop_back();
364 if (request==nullptr){
365 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
369 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
371 MPI_Group group = request->comm()->group();
372 int src_traced = group->rank(request->src());
373 int dst_traced = group->rank(request->dst());
374 int is_wait_for_receive = (request->flags() & RECV);
375 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("wait"));
377 Request::wait(&request, &status);
379 TRACE_smpi_comm_out(rank);
380 if (is_wait_for_receive)
381 TRACE_smpi_recv(src_traced, dst_traced, 0);
382 log_timed_action (action, clock);
// Replay handler for "waitAll": wait for every request queued by previous
// Isend/Irecv actions. For receive requests the (src,dst) pair is recorded
// before the waitall so the matching recv trace events can be emitted after.
// NOTE(review): the post-wait loop compares recvs_snd[i] against the sentinel
// -100, but the initialization of recvs_snd/recvs_rcv to that sentinel is not
// visible in this excerpt — confirm the arrays are pre-filled, otherwise
// non-recv slots are read uninitialized.
385 static void action_waitall(const char *const *action){
386 CHECK_ACTION_PARAMS(action, 0, 0)
387 double clock = smpi_process()->simulated_elapsed();
388 const unsigned int count_requests = get_reqq_self()->size();
390 if (count_requests>0) {
391 MPI_Status status[count_requests];
393 int rank_traced = smpi_process()->index();
394 TRACE_smpi_comm_in(rank_traced, __FUNCTION__, new simgrid::instr::Pt2PtTIData("waitAll", -1, count_requests, ""));
395 int recvs_snd[count_requests];
396 int recvs_rcv[count_requests];
397 for (unsigned int i = 0; i < count_requests; i++) {
398 const auto& req = (*get_reqq_self())[i];
399 if (req && (req->flags () & RECV)){
400 recvs_snd[i]=req->src();
401 recvs_rcv[i]=req->dst();
405 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
407 for (unsigned i = 0; i < count_requests; i++) {
408 if (recvs_snd[i]!=-100)
409 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
411 TRACE_smpi_comm_out(rank_traced);
413 log_timed_action (action, clock);
// Replay handler for "barrier": a collective barrier over MPI_COMM_WORLD,
// bracketed by trace events.
416 static void action_barrier(const char *const *action){
417 double clock = smpi_process()->simulated_elapsed();
418 int rank = smpi_process()->index();
419 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::NoOpTIData("barrier"));
421 Colls::barrier(MPI_COMM_WORLD);
423 TRACE_smpi_comm_out(rank);
424 log_timed_action (action, clock);
// Replay handler for "bcast <size> [root] [datatype]": broadcast 'size'
// elements from 'root' (defaults presumably to 0 — the root-defaulting and
// optional-argument conditions are not visible in this excerpt) over
// MPI_COMM_WORLD, using the shared scratch buffer as payload.
427 static void action_bcast(const char *const *action)
429 CHECK_ACTION_PARAMS(action, 1, 2)
430 double size = parse_double(action[2]);
431 double clock = smpi_process()->simulated_elapsed();
433 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
434 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
437 root= atoi(action[3]);
439 MPI_CURRENT_TYPE=decode_datatype(action[4]);
442 int rank = smpi_process()->index();
443 TRACE_smpi_comm_in(rank, __FUNCTION__,
444 new simgrid::instr::CollTIData("bcast", MPI_COMM_WORLD->group()->index(root), -1.0, size, -1,
445 encode_datatype(MPI_CURRENT_TYPE), ""));
447 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
449 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
451 TRACE_smpi_comm_out(rank);
452 log_timed_action (action, clock);
// Replay handler for "reduce <comm_size> <comp_size> [root] [datatype]":
// perform the communication part of a reduce, then burn 'comp_size' flops
// for the local reduction work. MPI_OP_NULL is passed as the operation —
// presumably because only the communication volume matters during replay,
// not the combined values (buffers hold dummy data).
455 static void action_reduce(const char *const *action)
457 CHECK_ACTION_PARAMS(action, 2, 2)
458 double comm_size = parse_double(action[2]);
459 double comp_size = parse_double(action[3]);
460 double clock = smpi_process()->simulated_elapsed();
462 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
465 root= atoi(action[4]);
467 MPI_CURRENT_TYPE=decode_datatype(action[5]);
470 int rank = smpi_process()->index();
471 TRACE_smpi_comm_in(rank, __FUNCTION__,
472 new simgrid::instr::CollTIData("reduce", MPI_COMM_WORLD->group()->index(root), comp_size,
473 comm_size, -1, encode_datatype(MPI_CURRENT_TYPE), ""));
475 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
476 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
477 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
478 smpi_execute_flops(comp_size);
480 TRACE_smpi_comm_out(rank);
481 log_timed_action (action, clock);
// Replay handler for "allReduce <comm_size> <comp_size> [datatype]": same
// scheme as action_reduce() but rootless; communication first, then
// 'comp_size' simulated flops for the local combination.
484 static void action_allReduce(const char *const *action) {
485 CHECK_ACTION_PARAMS(action, 2, 1)
486 double comm_size = parse_double(action[2]);
487 double comp_size = parse_double(action[3]);
490 MPI_CURRENT_TYPE=decode_datatype(action[4]);
492 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
494 double clock = smpi_process()->simulated_elapsed();
495 int rank = smpi_process()->index();
496 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allReduce", -1, comp_size, comm_size, -1,
497 encode_datatype(MPI_CURRENT_TYPE), ""));
499 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
500 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
501 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
502 smpi_execute_flops(comp_size);
504 TRACE_smpi_comm_out(rank);
505 log_timed_action (action, clock);
// Replay handler for "allToAll <send_size> <recv_size> [sendtype recvtype]":
// each rank exchanges send_size/recv_size elements with every other rank.
// Buffers are sized per-peer (count * comm_size * datatype size).
508 static void action_allToAll(const char *const *action) {
509 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
510 double clock = smpi_process()->simulated_elapsed();
511 int comm_size = MPI_COMM_WORLD->size();
512 int send_size = parse_double(action[2]);
513 int recv_size = parse_double(action[3]);
514 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
516 if(action[4] && action[5]) {
517 MPI_CURRENT_TYPE=decode_datatype(action[4]);
518 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
521 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
523 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
524 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
526 int rank = smpi_process()->index();
527 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allToAll", -1, -1.0, send_size, recv_size,
528 encode_datatype(MPI_CURRENT_TYPE),
529 encode_datatype(MPI_CURRENT_TYPE2)));
531 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
533 TRACE_smpi_comm_out(rank);
534 log_timed_action (action, clock);
// Replay handler for "gather <sendcount> <recvcount> <root> [sendtype recvtype]".
// Only the root allocates a receive buffer (the rank==root condition is not
// visible in this excerpt).
// NOTE(review): the presence check tests action[4] && action[5] but the
// datatypes are decoded from action[5] and action[6] — the check should
// presumably test action[5] && action[6]; confirm against the trace format.
537 static void action_gather(const char *const *action) {
538 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
541 1) 68 is the sendcounts
542 2) 68 is the recvcounts
543 3) 0 is the root node
544 4) 0 is the send datatype id, see decode_datatype()
545 5) 0 is the recv datatype id, see decode_datatype()
547 CHECK_ACTION_PARAMS(action, 2, 3)
548 double clock = smpi_process()->simulated_elapsed();
549 int comm_size = MPI_COMM_WORLD->size();
550 int send_size = parse_double(action[2]);
551 int recv_size = parse_double(action[3]);
552 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
553 if(action[4] && action[5]) {
554 MPI_CURRENT_TYPE=decode_datatype(action[5]);
555 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
557 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
559 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
560 void *recv = nullptr;
563 root=atoi(action[4]);
564 int rank = MPI_COMM_WORLD->rank();
567 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
569 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
570 encode_datatype(MPI_CURRENT_TYPE),
571 encode_datatype(MPI_CURRENT_TYPE2)));
573 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
575 TRACE_smpi_comm_out(smpi_process()->index());
576 log_timed_action (action, clock);
// Replay handler for "scatter <sendcount> <recvcount> <root> [types]".
// NOTE(review): the CollTIData is labeled "gather" although this is the
// scatter action — presumably a copy-paste slip; the trace visualization
// will mislabel this collective. Confirm and change to "scatter".
// NOTE(review): like action_gather(), the presence check uses action[4]/[5]
// while decoding action[5]/[6] — verify.
579 static void action_scatter(const char* const* action)
581 /* The structure of the scatter action for the rank 0 (total 4 processes) is the following:
584 1) 68 is the sendcounts
585 2) 68 is the recvcounts
586 3) 0 is the root node
587 4) 0 is the send datatype id, see decode_datatype()
588 5) 0 is the recv datatype id, see decode_datatype()
590 CHECK_ACTION_PARAMS(action, 2, 3)
591 double clock = smpi_process()->simulated_elapsed();
592 int comm_size = MPI_COMM_WORLD->size();
593 int send_size = parse_double(action[2]);
594 int recv_size = parse_double(action[3]);
595 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
596 if (action[4] && action[5]) {
597 MPI_CURRENT_TYPE = decode_datatype(action[5]);
598 MPI_CURRENT_TYPE2 = decode_datatype(action[6]);
600 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
602 void* send = smpi_get_tmp_sendbuffer(send_size * MPI_CURRENT_TYPE->size());
603 void* recv = nullptr;
606 root = atoi(action[4]);
607 int rank = MPI_COMM_WORLD->rank();
610 recv = smpi_get_tmp_recvbuffer(recv_size * comm_size * MPI_CURRENT_TYPE2->size());
612 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("gather", root, -1.0, send_size, recv_size,
613 encode_datatype(MPI_CURRENT_TYPE),
614 encode_datatype(MPI_CURRENT_TYPE2)));
616 Colls::scatter(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
618 TRACE_smpi_comm_out(smpi_process()->index());
619 log_timed_action(action, clock);
// Replay handler for "gatherV <sendcount> <recvcounts...> <root> [types]":
// variable-count gather; recvcounts are read per rank and summed to size the
// root's receive buffer. The initialization of 'disps' is not visible in this
// excerpt — presumably zero-filled, since displacements are irrelevant for a
// dummy payload. trace_recvcounts is heap-allocated and handed to the TIData,
// which presumably takes ownership (see "copy data to avoid bad free").
622 static void action_gatherv(const char *const *action) {
623 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
624 0 gather 68 68 10 10 10 0 0 0
626 1) 68 is the sendcount
627 2) 68 10 10 10 is the recvcounts
628 3) 0 is the root node
629 4) 0 is the send datatype id, see decode_datatype()
630 5) 0 is the recv datatype id, see decode_datatype()
632 double clock = smpi_process()->simulated_elapsed();
633 int comm_size = MPI_COMM_WORLD->size();
634 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
635 int send_size = parse_double(action[2]);
636 int disps[comm_size];
637 int recvcounts[comm_size];
640 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
641 if(action[4+comm_size] && action[5+comm_size]) {
642 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
643 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
645 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
647 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
648 void *recv = nullptr;
649 for(int i=0;i<comm_size;i++) {
650 recvcounts[i] = atoi(action[i+3]);
651 recv_sum=recv_sum+recvcounts[i];
655 int root=atoi(action[3+comm_size]);
656 int rank = MPI_COMM_WORLD->rank();
659 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
661 std::vector<int>* trace_recvcounts = new std::vector<int>;
662 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
663 trace_recvcounts->push_back(recvcounts[i]);
665 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
666 "gatherV", root, send_size, nullptr, -1, trace_recvcounts,
667 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
669 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
671 TRACE_smpi_comm_out(smpi_process()->index());
672 log_timed_action (action, clock);
// Replay handler for "scatterV <sendcounts...> <recvcount> <root> [types]":
// variable-count scatter, mirror image of action_gatherv(); only the root
// allocates a send buffer sized by the sum of sendcounts.
// NOTE(review): the VarCollTIData is labeled "gatherV" although this is the
// scatterv action — presumably a copy-paste slip; confirm and change to
// "scatterV". 'disps' initialization is not visible in this excerpt.
675 static void action_scatterv(const char* const* action)
677 /* The structure of the scatterv action for the rank 0 (total 4 processes) is the following:
678 0 gather 68 10 10 10 68 0 0 0
680 1) 68 10 10 10 is the sendcounts
681 2) 68 is the recvcount
682 3) 0 is the root node
683 4) 0 is the send datatype id, see decode_datatype()
684 5) 0 is the recv datatype id, see decode_datatype()
686 double clock = smpi_process()->simulated_elapsed();
687 int comm_size = MPI_COMM_WORLD->size();
688 CHECK_ACTION_PARAMS(action, comm_size + 1, 2)
689 int recv_size = parse_double(action[2 + comm_size]);
690 int disps[comm_size];
691 int sendcounts[comm_size];
694 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
695 if (action[4 + comm_size] && action[5 + comm_size]) {
696 MPI_CURRENT_TYPE = decode_datatype(action[4 + comm_size]);
697 MPI_CURRENT_TYPE2 = decode_datatype(action[5 + comm_size]);
699 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
701 void* send = nullptr;
702 void* recv = smpi_get_tmp_recvbuffer(recv_size * MPI_CURRENT_TYPE->size());
703 for (int i = 0; i < comm_size; i++) {
704 sendcounts[i] = atoi(action[i + 2]);
705 send_sum += sendcounts[i];
709 int root = atoi(action[3 + comm_size]);
710 int rank = MPI_COMM_WORLD->rank();
713 send = smpi_get_tmp_sendbuffer(send_sum * MPI_CURRENT_TYPE2->size());
715 std::vector<int>* trace_sendcounts = new std::vector<int>;
716 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
717 trace_sendcounts->push_back(sendcounts[i]);
719 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
720 "gatherV", root, -1, trace_sendcounts, recv_size, nullptr,
721 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
723 Colls::scatterv(send, sendcounts, disps, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
725 TRACE_smpi_comm_out(smpi_process()->index());
726 log_timed_action(action, clock);
// Replay handler for "reduceScatter <recvcounts...> <comp_size> [datatype]":
// communication phase via Colls::reduce_scatter (MPI_OP_NULL — values are
// dummies), then 'comp_size' simulated flops for the reduction work.
// NOTE(review): sendbuf/recvbuf are sized with a variable 'size' whose
// computation is not visible in this excerpt — presumably the sum of
// recvcounts accumulated in the loop above; confirm it is set before use.
729 static void action_reducescatter(const char *const *action) {
730 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
731 0 reduceScatter 275427 275427 275427 204020 11346849 0
733 1) The first four values after the name of the action declare the recvcounts array
734 2) The value 11346849 is the amount of instructions
735 3) The last value corresponds to the datatype, see decode_datatype().
737 double clock = smpi_process()->simulated_elapsed();
738 int comm_size = MPI_COMM_WORLD->size();
739 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
740 int comp_size = parse_double(action[2+comm_size]);
741 int recvcounts[comm_size];
742 int rank = smpi_process()->index();
744 std::vector<int>* trace_recvcounts = new std::vector<int>;
745 if(action[3+comm_size])
746 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
748 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
750 for(int i=0;i<comm_size;i++) {
751 recvcounts[i] = atoi(action[i+2]);
752 trace_recvcounts->push_back(recvcounts[i]);
756 TRACE_smpi_comm_in(rank, __FUNCTION__,
757 new simgrid::instr::VarCollTIData("reduceScatter", -1, 0, nullptr, -1, trace_recvcounts,
758 std::to_string(comp_size), /* ugly hack to print comp_size */
759 encode_datatype(MPI_CURRENT_TYPE)));
761 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
762 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
764 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
765 smpi_execute_flops(comp_size);
767 TRACE_smpi_comm_out(rank);
768 log_timed_action (action, clock);
// Replay handler for "allGather <sendcount> <recvcount> [sendtype recvtype]":
// fixed-count allgather over MPI_COMM_WORLD with dummy payloads.
771 static void action_allgather(const char *const *action) {
772 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
773 0 allGather 275427 275427
775 1) 275427 is the sendcount
776 2) 275427 is the recvcount
777 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
779 double clock = smpi_process()->simulated_elapsed();
781 CHECK_ACTION_PARAMS(action, 2, 2)
782 int sendcount=atoi(action[2]);
783 int recvcount=atoi(action[3]);
785 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
787 if(action[4] && action[5]) {
788 MPI_CURRENT_TYPE = decode_datatype(action[4]);
789 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
791 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
793 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
794 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
796 int rank = smpi_process()->index();
798 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::CollTIData("allGather", -1, -1.0, sendcount, recvcount,
799 encode_datatype(MPI_CURRENT_TYPE),
800 encode_datatype(MPI_CURRENT_TYPE2)));
802 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
804 TRACE_smpi_comm_out(rank);
805 log_timed_action (action, clock);
// Replay handler for "allGatherV <sendcount> <recvcounts...> [types]":
// variable-count allgather; recvcounts are summed to size the receive
// buffer. 'disps' initialization is not visible in this excerpt — presumably
// zero-filled, as displacements are irrelevant for dummy payloads.
808 static void action_allgatherv(const char *const *action) {
809 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
810 0 allGatherV 275427 275427 275427 275427 204020
812 1) 275427 is the sendcount
813 2) The next four elements declare the recvcounts array
814 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
816 double clock = smpi_process()->simulated_elapsed();
818 int comm_size = MPI_COMM_WORLD->size();
819 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
820 int sendcount=atoi(action[2]);
821 int recvcounts[comm_size];
822 int disps[comm_size];
824 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
826 if(action[3+comm_size] && action[4+comm_size]) {
827 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
828 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
830 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
832 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
834 for(int i=0;i<comm_size;i++) {
835 recvcounts[i] = atoi(action[i+3]);
836 recv_sum=recv_sum+recvcounts[i];
839 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
841 int rank = smpi_process()->index();
843 std::vector<int>* trace_recvcounts = new std::vector<int>;
844 for (int i = 0; i < comm_size; i++) // copy data to avoid bad free
845 trace_recvcounts->push_back(recvcounts[i]);
847 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
848 "allGatherV", -1, sendcount, nullptr, -1, trace_recvcounts,
849 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
851 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
854 TRACE_smpi_comm_out(rank);
855 log_timed_action (action, clock);
// Replay handler for "allToAllV <sendbuf_size> <sendcounts...> <recvbuf_size>
// <recvcounts...> [sendtype recvtype]": fully variable all-to-all exchange.
// NOTE(review): the Colls::alltoallv call passes MPI_CURRENT_TYPE for BOTH
// the send and the receive datatype, although MPI_CURRENT_TYPE2 was decoded
// (and used to size recvbuf and in the trace data). The receive side should
// presumably be MPI_CURRENT_TYPE2 — confirm and fix.
// senddisps/recvdisps initialization is not visible in this excerpt.
858 static void action_allToAllv(const char *const *action) {
859 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
860 0 allToAllV 100 1 7 10 12 100 1 70 10 5
862 1) 100 is the size of the send buffer *sizeof(int),
863 2) 1 7 10 12 is the sendcounts array
864 3) 100*sizeof(int) is the size of the receiver buffer
865 4) 1 70 10 5 is the recvcounts array
867 double clock = smpi_process()->simulated_elapsed();
869 int comm_size = MPI_COMM_WORLD->size();
870 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
873 int sendcounts[comm_size];
874 std::vector<int>* trace_sendcounts = new std::vector<int>;
875 int recvcounts[comm_size];
876 std::vector<int>* trace_recvcounts = new std::vector<int>;
877 int senddisps[comm_size];
878 int recvdisps[comm_size];
880 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
882 int send_buf_size=parse_double(action[2]);
883 int recv_buf_size=parse_double(action[3+comm_size]);
884 if(action[4+2*comm_size] && action[5+2*comm_size]) {
885 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
886 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
889 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
891 int rank = smpi_process()->index();
892 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
893 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
895 for(int i=0;i<comm_size;i++) {
896 sendcounts[i] = atoi(action[i+3]);
897 trace_sendcounts->push_back(sendcounts[i]);
898 send_size += sendcounts[i];
899 recvcounts[i] = atoi(action[i+4+comm_size]);
900 trace_recvcounts->push_back(recvcounts[i]);
901 recv_size += recvcounts[i];
906 TRACE_smpi_comm_in(rank, __FUNCTION__, new simgrid::instr::VarCollTIData(
907 "allToAllV", -1, send_size, trace_sendcounts, recv_size, trace_recvcounts,
908 encode_datatype(MPI_CURRENT_TYPE), encode_datatype(MPI_CURRENT_TYPE2)));
910 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
911 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
913 TRACE_smpi_comm_out(rank);
914 log_timed_action (action, clock);
917 }} // namespace simgrid::smpi
919 /** @brief Only initialize the replay, don't do it for real */
// Marks the calling process as initialized and replaying, sets up tracing,
// registers every trace action name with its replay handler, then handles an
// optional delayed start ((*argv)[2], in flops) and forces one context switch
// so all processes finish initialization before the replay begins.
920 void smpi_replay_init(int* argc, char*** argv)
922 simgrid::smpi::Process::init(argc, argv);
923 smpi_process()->mark_as_initialized();
924 smpi_process()->set_replaying(true);
926 int rank = smpi_process()->index();
927 TRACE_smpi_init(rank);
928 TRACE_smpi_computing_init(rank);
929 TRACE_smpi_comm_in(rank, "smpi_replay_run_init", new simgrid::instr::NoOpTIData("init"));
930 TRACE_smpi_comm_out(rank);
// One registration per action keyword appearing in time-independent traces.
931 xbt_replay_action_register("init", simgrid::smpi::action_init);
932 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
933 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
934 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
935 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
936 xbt_replay_action_register("send", simgrid::smpi::action_send);
937 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
938 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
939 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
940 xbt_replay_action_register("test", simgrid::smpi::action_test);
941 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
942 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
943 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
944 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
945 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
946 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
947 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
948 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
949 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
950 xbt_replay_action_register("scatter", simgrid::smpi::action_scatter);
951 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
952 xbt_replay_action_register("scatterV", simgrid::smpi::action_scatterv);
953 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
954 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
955 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
956 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
958 //if we have a delayed start, sleep here.
960 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
961 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
962 smpi_execute_flops(value);
964 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
965 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
966 smpi_execute_flops(0.0);
970 /** @brief actually run the replay after initialization */
// Drives the replay loop, then performs the finalization sequence: drain any
// requests still pending in this process's queue with a waitall, free the
// queue, and — when this is the last active process — report the simulated
// time and release the shared scratch buffers.
971 void smpi_replay_main(int* argc, char*** argv)
973 simgrid::xbt::replay_runner(*argc, *argv);
975 /* and now, finalize everything */
976 /* One active process will stop. Decrease the counter*/
977 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
978 if (not get_reqq_self()->empty()) {
979 unsigned int count_requests=get_reqq_self()->size();
980 MPI_Request requests[count_requests];
981 MPI_Status status[count_requests];
// Copy the queued requests into a flat array for waitall (loop body for the
// copy is not visible in this excerpt).
984 for (auto const& req : *get_reqq_self()) {
988 simgrid::smpi::Request::waitall(count_requests, requests, status);
990 delete get_reqq_self();
993 if(active_processes==0){
994 /* Last process alive speaking: end the simulated timer */
995 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
996 xbt_free(sendbuffer);
997 xbt_free(recvbuffer);
1000 TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", new simgrid::instr::NoOpTIData("finalize"));
1002 smpi_process()->finalize();
1004 TRACE_smpi_comm_out(smpi_process()->index());
1005 TRACE_smpi_finalize(smpi_process()->index());
1008 /** @brief chain a replay initialization and a replay start */
1009 void smpi_replay_run(int* argc, char*** argv)
1011 smpi_replay_init(argc, argv);
1012 smpi_replay_main(argc, argv);