1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
// NOTE(review): KEY_SIZE is not referenced anywhere in the visible part of this file.
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value announced by the last "comm_size" trace action (written by action_comm_size).
22 int communicator_size = 0;
// Set to smpi_process_count() by action_init; smpi_replay_main compares it with 0 to detect the last process alive.
23 static int active_processes = 0;
// Per-process queue of pending non-blocking requests, keyed by smpi_process()->index().
// The vectors are heap-allocated by action_init and deleted by smpi_replay_main.
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype used when a trace line carries no datatype id (chosen by action_init).
26 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action currently replayed; note that decode_datatype() also writes it as a side effect.
27 MPI_Datatype MPI_CURRENT_TYPE;
// Scratch buffers shared by all replayed sends/receives, grown on demand by smpi_get_tmp_sendbuffer() /
// smpi_get_tmp_recvbuffer() and freed by the last process in smpi_replay_main(). Their content is never inspected here.
29 static int sendbuffer_size=0;
30 char* sendbuffer=nullptr;
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(smpi_process()->index());
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({smpi_process()->index(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (not smpi_process()->replaying())
56 return xbt_malloc(size);
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (not smpi_process()->replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
75 void smpi_free_tmp_buffer(void* buf){
76 if (not smpi_process()->replaying())
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
// Translate a trace datatype id (a decimal string, read with atoi) into an MPI datatype.
// Side effect: the selected type is stored in the global MPI_CURRENT_TYPE, which is then returned.
// Callers that decode two ids in a row (send then receive type) therefore find MPI_CURRENT_TYPE holding the
// *second* type afterwards, unless they decode the receive type first.
// NOTE(review): the case labels (embedded lines 93-117) are not visible in this view; the order of the
// assignments suggests 0=DOUBLE, 1=INT, 2=CHAR, 3=SHORT, 4=LONG, 5=FLOAT, 6=BYTE and a default branch
// selecting MPI_DEFAULT_TYPE - confirm against the full file.
90 static MPI_Datatype decode_datatype(const char *const action)
92 switch(atoi(action)) {
94 MPI_CURRENT_TYPE=MPI_DOUBLE;
97 MPI_CURRENT_TYPE=MPI_INT;
100 MPI_CURRENT_TYPE=MPI_CHAR;
103 MPI_CURRENT_TYPE=MPI_SHORT;
106 MPI_CURRENT_TYPE=MPI_LONG;
109 MPI_CURRENT_TYPE=MPI_FLOAT;
112 MPI_CURRENT_TYPE=MPI_BYTE;
115 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118 return MPI_CURRENT_TYPE;
// Map an MPI datatype to the string stored in the tracing extra data (datatype1 / datatype2).
// Only the predefined types tested below are recognized.
// NOTE(review): the return statements (embedded lines 124-136 and 139-140) are not visible in this view;
// presumably each test returns the id that decode_datatype() maps back to the same type - confirm.
121 const char* encode_datatype(MPI_Datatype datatype)
123 if (datatype==MPI_BYTE)
125 if(datatype==MPI_DOUBLE)
127 if(datatype==MPI_INT)
129 if(datatype==MPI_CHAR)
131 if(datatype==MPI_SHORT)
133 if(datatype==MPI_LONG)
135 if(datatype==MPI_FLOAT)
137 // default - not implemented.
138 // do not warn here as we pass in this function even for other trace formats
// Validate the number of tokens of a trace line before an action handler reads them.
// `action` is the nullptr-terminated token array (process id, action name, then the arguments). The macro counts
// its entries in `i` and throws arg_error naming the calling function (__FUNCTION__) when there are too few.
// In the visible lines, `optional` only appears in the error message.
// NOTE(review): the declaration/increment of `i` and the comparison (embedded lines 143, 145, 146) are not visible
// in this view; the message suggests the test is i < mandatory + 2 - confirm.
142 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
144 while(action[i]!=nullptr)\
147 THROWF(arg_error, 0, "%s replay failed.\n" \
148 "%d items were given on the line. First two should be process_id and action. " \
149 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
150 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// "init [x]" action, replayed first by every process. It selects MPI_DEFAULT_TYPE (MPI_DOUBLE, the MPE default,
// or MPI_BYTE, the TAU default), starts the simulated timer, stores smpi_process_count() in active_processes and
// registers a new, empty pending-request queue for the calling process.
// NOTE(review): embedded lines 160 and 162 are not visible in this view; the two MPI_DEFAULT_TYPE assignments
// below are presumably the branches of a test on the optional argument action[2] - confirm.
156 static void action_init(const char *const *action)
158 XBT_DEBUG("Initialize the counters");
159 CHECK_ACTION_PARAMS(action, 0, 1)
161 MPI_DEFAULT_TYPE = MPI_DOUBLE; // default MPE datatype
163 MPI_DEFAULT_TYPE = MPI_BYTE; // default TAU datatype
165 /* start a simulated timer */
166 smpi_process()->simulated_start();
167 /*initialize the number of active processes */
168 active_processes = smpi_process_count();
// The queue is heap-allocated; smpi_replay_main() deletes it at the end of the replay
170 set_reqq_self(new std::vector<MPI_Request>);
// "finalize" action.
// NOTE(review): the body (embedded lines 174-176) is not visible in this view. The per-process cleanup visible in
// this file (waiting for pending requests, freeing the request queue) is done by smpi_replay_main() instead.
173 static void action_finalize(const char *const *action)
178 static void action_comm_size(const char *const *action)
180 communicator_size = parse_double(action[2]);
181 log_timed_action (action, smpi_process()->simulated_elapsed());
184 static void action_comm_split(const char *const *action)
186 log_timed_action (action, smpi_process()->simulated_elapsed());
189 static void action_comm_dup(const char *const *action)
191 log_timed_action (action, smpi_process()->simulated_elapsed());
194 static void action_compute(const char *const *action)
196 CHECK_ACTION_PARAMS(action, 1, 0)
197 double clock = smpi_process()->simulated_elapsed();
198 double flops= parse_double(action[2]);
199 int rank = smpi_process()->index();
200 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
201 extra->type=TRACING_COMPUTING;
202 extra->comp_size=flops;
203 TRACE_smpi_computing_in(rank, extra);
205 smpi_execute_flops(flops);
207 TRACE_smpi_computing_out(rank);
208 log_timed_action (action, clock);
// "send <dst> <size> [datatype]": blocking send of <size> elements of MPI_CURRENT_TYPE to process <dst> on
// MPI_COMM_WORLD with tag 0. The buffer passed to Request::send is nullptr (no payload data).
// NOTE(review): embedded lines 218, 220 and 229 are not visible in this view; the two MPI_CURRENT_TYPE assignments
// are presumably the branches of a test on the optional action[4] - confirm.
211 static void action_send(const char *const *action)
213 CHECK_ACTION_PARAMS(action, 2, 1)
214 int to = atoi(action[2]);
215 double size=parse_double(action[3]);
216 double clock = smpi_process()->simulated_elapsed();
219 MPI_CURRENT_TYPE=decode_datatype(action[4]);
221 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
223 int rank = smpi_process()->index();
225 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
226 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
227 extra->type = TRACING_SEND;
228 extra->send_size = size;
230 extra->dst = dst_traced;
231 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
232 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
// The point-to-point trace event is emitted here only when TRACE_smpi_view_internals() is false
233 if (not TRACE_smpi_view_internals())
234 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
236 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
238 log_timed_action (action, clock);
240 TRACE_smpi_comm_out(rank);
// "Isend <dst> <size> [datatype]": non-blocking send of <size> elements of MPI_CURRENT_TYPE to process <dst> on
// MPI_COMM_WORLD with tag 0 (nullptr buffer). The request is appended to the caller's pending-request queue, where
// a later test/wait/waitAll action picks it up.
// NOTE(review): embedded lines 250, 252 and 260 are not visible in this view; the two MPI_CURRENT_TYPE assignments
// are presumably the branches of a test on the optional action[4] - confirm.
243 static void action_Isend(const char *const *action)
245 CHECK_ACTION_PARAMS(action, 2, 1)
246 int to = atoi(action[2]);
247 double size=parse_double(action[3]);
248 double clock = smpi_process()->simulated_elapsed();
251 MPI_CURRENT_TYPE=decode_datatype(action[4]);
253 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
255 int rank = smpi_process()->index();
256 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
257 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
258 extra->type = TRACING_ISEND;
259 extra->send_size = size;
261 extra->dst = dst_traced;
262 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
263 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
// The point-to-point trace event is emitted here only when TRACE_smpi_view_internals() is false
264 if (not TRACE_smpi_view_internals())
265 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
267 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0, MPI_COMM_WORLD);
269 TRACE_smpi_comm_out(rank);
271 get_reqq_self()->push_back(request);
273 log_timed_action (action, clock);
// "recv <src> <size> [datatype]": blocking receive of <size> elements of MPI_CURRENT_TYPE from process <src> on
// MPI_COMM_WORLD with tag 0, into a nullptr buffer.
// NOTE(review): several embedded lines are not visible in this view (281-283, 285, 295, 300, 302-303, 310-311),
// including the declaration of `status` and whatever guards the probe below. Presumably the probe only runs when
// the trace gives no usable size, and `size` is then taken from `status` - confirm.
276 static void action_recv(const char *const *action) {
277 CHECK_ACTION_PARAMS(action, 2, 1)
278 int from = atoi(action[2]);
279 double size=parse_double(action[3]);
280 double clock = smpi_process()->simulated_elapsed();
284 MPI_CURRENT_TYPE=decode_datatype(action[4]);
286 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
288 int rank = smpi_process()->index();
289 int src_traced = MPI_COMM_WORLD->group()->rank(from);
291 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
292 extra->type = TRACING_RECV;
293 extra->send_size = size;
294 extra->src = src_traced;
296 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
297 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
299 //unknown size from the receiver point of view
301 Request::probe(from, 0, MPI_COMM_WORLD, &status);
305 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
307 TRACE_smpi_comm_out(rank);
// The point-to-point trace event is emitted here only when TRACE_smpi_view_internals() is false
308 if (not TRACE_smpi_view_internals()) {
309 TRACE_smpi_recv(src_traced, rank, 0);
312 log_timed_action (action, clock);
// "Irecv <src> <size> [datatype]": non-blocking receive of <size> elements of MPI_CURRENT_TYPE from process <src>
// on MPI_COMM_WORLD with tag 0 (nullptr buffer). The request is appended to the caller's pending-request queue.
// NOTE(review): several embedded lines are not visible in this view (321-322, 324, 326, 333, 336, 338, 340-342),
// including the declaration of `status` and whatever guards the probe below. Presumably the probe only runs when
// the trace gives no usable size, and `size` is then taken from `status` - confirm.
315 static void action_Irecv(const char *const *action)
317 CHECK_ACTION_PARAMS(action, 2, 1)
318 int from = atoi(action[2]);
319 double size=parse_double(action[3]);
320 double clock = smpi_process()->simulated_elapsed();
323 MPI_CURRENT_TYPE=decode_datatype(action[4]);
325 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
327 int rank = smpi_process()->index();
328 int src_traced = MPI_COMM_WORLD->group()->rank(from);
329 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
330 extra->type = TRACING_IRECV;
331 extra->send_size = size;
332 extra->src = src_traced;
334 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
335 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
337 //unknown size from the receiver point of view
339 Request::probe(from, 0, MPI_COMM_WORLD, &status);
343 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
345 TRACE_smpi_comm_out(rank);
346 get_reqq_self()->push_back(request);
348 log_timed_action (action, clock);
351 static void action_test(const char *const *action){
352 CHECK_ACTION_PARAMS(action, 0, 0)
353 double clock = smpi_process()->simulated_elapsed();
356 MPI_Request request = get_reqq_self()->back();
357 get_reqq_self()->pop_back();
358 //if request is null here, this may mean that a previous test has succeeded
359 //Different times in traced application and replayed version may lead to this
360 //In this case, ignore the extra calls.
361 if(request!=nullptr){
362 int rank = smpi_process()->index();
363 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
364 extra->type=TRACING_TEST;
365 TRACE_smpi_testing_in(rank, extra);
367 int flag = Request::test(&request, &status);
369 XBT_DEBUG("MPI_Test result: %d", flag);
370 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
371 get_reqq_self()->push_back(request);
373 TRACE_smpi_testing_out(rank);
375 log_timed_action (action, clock);
378 static void action_wait(const char *const *action){
379 CHECK_ACTION_PARAMS(action, 0, 0)
380 double clock = smpi_process()->simulated_elapsed();
383 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
384 xbt_str_join_array(action," "));
385 MPI_Request request = get_reqq_self()->back();
386 get_reqq_self()->pop_back();
388 if (request==nullptr){
389 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
393 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
395 MPI_Group group = request->comm()->group();
396 int src_traced = group->rank(request->src());
397 int dst_traced = group->rank(request->dst());
398 int is_wait_for_receive = (request->flags() & RECV);
399 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
400 extra->type = TRACING_WAIT;
401 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
403 Request::wait(&request, &status);
405 TRACE_smpi_comm_out(rank);
406 if (is_wait_for_receive)
407 TRACE_smpi_recv(src_traced, dst_traced, 0);
408 log_timed_action (action, clock);
411 static void action_waitall(const char *const *action){
412 CHECK_ACTION_PARAMS(action, 0, 0)
413 double clock = smpi_process()->simulated_elapsed();
414 const unsigned int count_requests = get_reqq_self()->size();
416 if (count_requests>0) {
417 MPI_Status status[count_requests];
419 int rank_traced = smpi_process()->index();
420 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
421 extra->type = TRACING_WAITALL;
422 extra->send_size=count_requests;
423 TRACE_smpi_comm_in(rank_traced, __FUNCTION__, extra);
424 int recvs_snd[count_requests];
425 int recvs_rcv[count_requests];
426 for (unsigned int i = 0; i < count_requests; i++) {
427 const auto& req = (*get_reqq_self())[i];
428 if (req && (req->flags () & RECV)){
429 recvs_snd[i]=req->src();
430 recvs_rcv[i]=req->dst();
434 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
436 for (unsigned i = 0; i < count_requests; i++) {
437 if (recvs_snd[i]!=-100)
438 TRACE_smpi_recv(recvs_snd[i], recvs_rcv[i],0);
440 TRACE_smpi_comm_out(rank_traced);
442 log_timed_action (action, clock);
445 static void action_barrier(const char *const *action){
446 double clock = smpi_process()->simulated_elapsed();
447 int rank = smpi_process()->index();
448 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
449 extra->type = TRACING_BARRIER;
450 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
452 Colls::barrier(MPI_COMM_WORLD);
454 TRACE_smpi_comm_out(rank);
455 log_timed_action (action, clock);
// "bcast <size> [root [datatype]]": replay an MPI_Bcast of <size> elements of MPI_CURRENT_TYPE from <root> on
// MPI_COMM_WORLD, using the shared send scratch buffer.
// NOTE(review): embedded lines 463, 467, 469 and 471 (declaration of root and the tests guarding the optional
// arguments) are not visible in this view.
// NOTE(review): root_traced uses group()->index(root) while action_reduce uses group()->rank(root); both are
// presumably identical on MPI_COMM_WORLD, but confirm which one the tracing layer expects.
458 static void action_bcast(const char *const *action)
460 CHECK_ACTION_PARAMS(action, 1, 2)
461 double size = parse_double(action[2]);
462 double clock = smpi_process()->simulated_elapsed();
464 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
465 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
468 root= atoi(action[3]);
470 MPI_CURRENT_TYPE=decode_datatype(action[4]);
473 int rank = smpi_process()->index();
474 int root_traced = MPI_COMM_WORLD->group()->index(root);
476 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
477 extra->type = TRACING_BCAST;
478 extra->send_size = size;
479 extra->root = root_traced;
480 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
481 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
482 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
484 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
486 TRACE_smpi_comm_out(rank);
487 log_timed_action (action, clock);
490 static void action_reduce(const char *const *action)
492 CHECK_ACTION_PARAMS(action, 2, 2)
493 double comm_size = parse_double(action[2]);
494 double comp_size = parse_double(action[3]);
495 double clock = smpi_process()->simulated_elapsed();
497 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
500 root= atoi(action[4]);
502 MPI_CURRENT_TYPE=decode_datatype(action[5]);
505 int rank = smpi_process()->index();
506 int root_traced = MPI_COMM_WORLD->group()->rank(root);
507 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
508 extra->type = TRACING_REDUCE;
509 extra->send_size = comm_size;
510 extra->comp_size = comp_size;
511 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
512 extra->root = root_traced;
514 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
516 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
517 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
518 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
519 smpi_execute_flops(comp_size);
521 TRACE_smpi_comm_out(rank);
522 log_timed_action (action, clock);
525 static void action_allReduce(const char *const *action) {
526 CHECK_ACTION_PARAMS(action, 2, 1)
527 double comm_size = parse_double(action[2]);
528 double comp_size = parse_double(action[3]);
531 MPI_CURRENT_TYPE=decode_datatype(action[4]);
533 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
535 double clock = smpi_process()->simulated_elapsed();
536 int rank = smpi_process()->index();
537 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
538 extra->type = TRACING_ALLREDUCE;
539 extra->send_size = comm_size;
540 extra->comp_size = comp_size;
541 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
542 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
544 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
545 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
546 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
547 smpi_execute_flops(comp_size);
549 TRACE_smpi_comm_out(rank);
550 log_timed_action (action, clock);
553 static void action_allToAll(const char *const *action) {
554 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
555 double clock = smpi_process()->simulated_elapsed();
556 int comm_size = MPI_COMM_WORLD->size();
557 int send_size = parse_double(action[2]);
558 int recv_size = parse_double(action[3]);
559 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
561 if(action[4] && action[5]) {
562 MPI_CURRENT_TYPE=decode_datatype(action[4]);
563 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
566 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
568 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
569 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
571 int rank = smpi_process()->index();
572 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
573 extra->type = TRACING_ALLTOALL;
574 extra->send_size = send_size;
575 extra->recv_size = recv_size;
576 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
577 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
579 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
581 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
583 TRACE_smpi_comm_out(rank);
584 log_timed_action (action, clock);
587 static void action_gather(const char *const *action) {
588 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
591 1) 68 is the sendcounts
592 2) 68 is the recvcounts
593 3) 0 is the root node
594 4) 0 is the send datatype id, see decode_datatype()
595 5) 0 is the recv datatype id, see decode_datatype()
597 CHECK_ACTION_PARAMS(action, 2, 3)
598 double clock = smpi_process()->simulated_elapsed();
599 int comm_size = MPI_COMM_WORLD->size();
600 int send_size = parse_double(action[2]);
601 int recv_size = parse_double(action[3]);
602 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
603 if(action[4] && action[5]) {
604 MPI_CURRENT_TYPE=decode_datatype(action[5]);
605 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
607 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
609 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
610 void *recv = nullptr;
613 root=atoi(action[4]);
614 int rank = MPI_COMM_WORLD->rank();
617 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
619 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
620 extra->type = TRACING_GATHER;
621 extra->send_size = send_size;
622 extra->recv_size = recv_size;
624 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
625 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
627 TRACE_smpi_comm_in(smpi_process()->index(), __FUNCTION__, extra);
629 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
631 TRACE_smpi_comm_out(smpi_process()->index());
632 log_timed_action (action, clock);
635 static void action_gatherv(const char *const *action) {
636 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
637 0 gather 68 68 10 10 10 0 0 0
639 1) 68 is the sendcount
640 2) 68 10 10 10 is the recvcounts
641 3) 0 is the root node
642 4) 0 is the send datatype id, see decode_datatype()
643 5) 0 is the recv datatype id, see decode_datatype()
645 double clock = smpi_process()->simulated_elapsed();
646 int comm_size = MPI_COMM_WORLD->size();
647 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
648 int send_size = parse_double(action[2]);
649 int disps[comm_size];
650 int recvcounts[comm_size];
653 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
654 if(action[4+comm_size] && action[5+comm_size]) {
655 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
656 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
658 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
660 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
661 void *recv = nullptr;
662 for(int i=0;i<comm_size;i++) {
663 recvcounts[i] = atoi(action[i+3]);
664 recv_sum=recv_sum+recvcounts[i];
668 int root=atoi(action[3+comm_size]);
669 int rank = MPI_COMM_WORLD->rank();
672 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
674 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
675 extra->type = TRACING_GATHERV;
676 extra->send_size = send_size;
677 extra->recvcounts = new int[comm_size];
678 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
679 extra->recvcounts[i] = recvcounts[i];
681 extra->num_processes = comm_size;
682 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
683 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
685 TRACE_smpi_comm_in(smpi_process()->index(), __FUNCTION__, extra);
687 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
689 TRACE_smpi_comm_out(smpi_process()->index());
690 log_timed_action (action, clock);
// "reduceScatter <recvcounts...> <comp_size> [datatype]": replay an MPI_Reduce_scatter on MPI_COMM_WORLD with the
// comm_size per-process <recvcounts>, then execute <comp_size> flops.
// NOTE(review): `size`, used below for both scratch buffers, is declared and accumulated on embedded lines that are
// not visible in this view (presumably 707 and 715, as the sum of recvcounts) - confirm. The closing of the header
// comment (embedded line 700) and the else branch (line 710) are not visible either.
// NOTE(review): recvcounts is a variable-length array, a compiler extension rather than ISO C++.
693 static void action_reducescatter(const char *const *action) {
694 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
695 0 reduceScatter 275427 275427 275427 204020 11346849 0
697 1) The first four values after the name of the action declare the recvcounts array
698 2) The value 11346849 is the amount of instructions
699 3) The last value corresponds to the datatype, see decode_datatype().
701 double clock = smpi_process()->simulated_elapsed();
702 int comm_size = MPI_COMM_WORLD->size();
703 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
704 int comp_size = parse_double(action[2+comm_size]);
705 int recvcounts[comm_size];
706 int rank = smpi_process()->index();
708 if(action[3+comm_size])
709 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
711 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
713 for(int i=0;i<comm_size;i++) {
714 recvcounts[i] = atoi(action[i+2]);
718 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
719 extra->type = TRACING_REDUCE_SCATTER;
720 extra->send_size = 0;
721 extra->recvcounts = new int[comm_size];
722 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
723 extra->recvcounts[i] = recvcounts[i];
724 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
725 extra->comp_size = comp_size;
726 extra->num_processes = comm_size;
728 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
730 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
731 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
733 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
734 smpi_execute_flops(comp_size);
736 TRACE_smpi_comm_out(rank);
737 log_timed_action (action, clock);
740 static void action_allgather(const char *const *action) {
741 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
742 0 allGather 275427 275427
744 1) 275427 is the sendcount
745 2) 275427 is the recvcount
746 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
748 double clock = smpi_process()->simulated_elapsed();
750 CHECK_ACTION_PARAMS(action, 2, 2)
751 int sendcount=atoi(action[2]);
752 int recvcount=atoi(action[3]);
754 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
756 if(action[4] && action[5]) {
757 MPI_CURRENT_TYPE = decode_datatype(action[4]);
758 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
760 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
762 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
763 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
765 int rank = smpi_process()->index();
766 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
767 extra->type = TRACING_ALLGATHER;
768 extra->send_size = sendcount;
769 extra->recv_size= recvcount;
770 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
771 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
772 extra->num_processes = MPI_COMM_WORLD->size();
774 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
776 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
778 TRACE_smpi_comm_out(rank);
779 log_timed_action (action, clock);
782 static void action_allgatherv(const char *const *action) {
783 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
784 0 allGatherV 275427 275427 275427 275427 204020
786 1) 275427 is the sendcount
787 2) The next four elements declare the recvcounts array
788 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
790 double clock = smpi_process()->simulated_elapsed();
792 int comm_size = MPI_COMM_WORLD->size();
793 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
794 int sendcount=atoi(action[2]);
795 int recvcounts[comm_size];
796 int disps[comm_size];
798 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
800 if(action[3+comm_size] && action[4+comm_size]) {
801 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
802 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
804 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
806 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
808 for(int i=0;i<comm_size;i++) {
809 recvcounts[i] = atoi(action[i+3]);
810 recv_sum=recv_sum+recvcounts[i];
813 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
815 int rank = smpi_process()->index();
816 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
817 extra->type = TRACING_ALLGATHERV;
818 extra->send_size = sendcount;
819 extra->recvcounts = new int[comm_size];
820 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
821 extra->recvcounts[i] = recvcounts[i];
822 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
823 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
824 extra->num_processes = comm_size;
826 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
828 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
831 TRACE_smpi_comm_out(rank);
832 log_timed_action (action, clock);
835 static void action_allToAllv(const char *const *action) {
836 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
837 0 allToAllV 100 1 7 10 12 100 1 70 10 5
839 1) 100 is the size of the send buffer *sizeof(int),
840 2) 1 7 10 12 is the sendcounts array
841 3) 100*sizeof(int) is the size of the receiver buffer
842 4) 1 70 10 5 is the recvcounts array
844 double clock = smpi_process()->simulated_elapsed();
846 int comm_size = MPI_COMM_WORLD->size();
847 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
848 int sendcounts[comm_size];
849 int recvcounts[comm_size];
850 int senddisps[comm_size];
851 int recvdisps[comm_size];
853 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
855 int send_buf_size=parse_double(action[2]);
856 int recv_buf_size=parse_double(action[3+comm_size]);
857 if(action[4+2*comm_size] && action[5+2*comm_size]) {
858 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
859 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
862 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
864 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
865 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
867 for(int i=0;i<comm_size;i++) {
868 sendcounts[i] = atoi(action[i+3]);
869 recvcounts[i] = atoi(action[i+4+comm_size]);
874 int rank = smpi_process()->index();
875 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
876 extra->type = TRACING_ALLTOALLV;
877 extra->recvcounts = new int[comm_size];
878 extra->sendcounts = new int[comm_size];
879 extra->num_processes = comm_size;
881 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
882 extra->send_size += sendcounts[i];
883 extra->sendcounts[i] = sendcounts[i];
884 extra->recv_size += recvcounts[i];
885 extra->recvcounts[i] = recvcounts[i];
887 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE);
888 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2);
890 TRACE_smpi_comm_in(rank, __FUNCTION__, extra);
892 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
893 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
895 TRACE_smpi_comm_out(rank);
896 log_timed_action (action, clock);
899 }} // namespace simgrid::smpi
// Initializes the calling SMPI process in replay mode, emits the tracing "init" events and registers the handler
// of every trace action keyword. It then either executes (*argv)[2] flops (delayed start) or 0 flops to force a
// context switch.
// NOTE(review): the test choosing between those two paths (embedded lines 941 and 945) is not visible in this view;
// presumably it checks whether a third command-line argument was given - confirm.
901 /** @brief Only initialize the replay, don't do it for real */
902 void smpi_replay_init(int* argc, char*** argv)
904 simgrid::smpi::Process::init(argc, argv);
905 smpi_process()->mark_as_initialized();
906 smpi_process()->set_replaying(true);
908 int rank = smpi_process()->index();
909 TRACE_smpi_init(rank);
910 TRACE_smpi_computing_init(rank);
911 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
912 extra->type = TRACING_INIT;
913 TRACE_smpi_comm_in(rank, "smpi_replay_run_init", extra);
914 TRACE_smpi_comm_out(rank);
// Map each trace keyword to its handler; the keyword is the second token of every trace line.
915 xbt_replay_action_register("init", simgrid::smpi::action_init);
916 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
917 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
918 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
919 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
920 xbt_replay_action_register("send", simgrid::smpi::action_send);
921 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
922 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
923 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
924 xbt_replay_action_register("test", simgrid::smpi::action_test);
925 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
926 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
927 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
928 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
929 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
930 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
931 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
932 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
933 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
934 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
935 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
936 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
937 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
938 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
940 //if we have a delayed start, sleep here.
942 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
943 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
944 smpi_execute_flops(value);
946 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
947 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
948 smpi_execute_flops(0.0);
952 /** @brief actually run the replay after initialization */
953 void smpi_replay_main(int* argc, char*** argv)
955 simgrid::xbt::replay_runner(*argc, *argv);
957 /* and now, finalize everything */
958 /* One active process will stop. Decrease the counter*/
959 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
960 if (not get_reqq_self()->empty()) {
961 unsigned int count_requests=get_reqq_self()->size();
962 MPI_Request requests[count_requests];
963 MPI_Status status[count_requests];
966 for (auto const& req : *get_reqq_self()) {
970 simgrid::smpi::Request::waitall(count_requests, requests, status);
972 delete get_reqq_self();
975 if(active_processes==0){
976 /* Last process alive speaking: end the simulated timer */
977 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
978 xbt_free(sendbuffer);
979 xbt_free(recvbuffer);
982 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
983 extra_fin->type = TRACING_FINALIZE;
984 TRACE_smpi_comm_in(smpi_process()->index(), "smpi_replay_run_finalize", extra_fin);
986 smpi_process()->finalize();
988 TRACE_smpi_comm_out(smpi_process()->index());
989 TRACE_smpi_finalize(smpi_process()->index());
992 /** @brief chain a replay initialization and a replay start */
993 void smpi_replay_run(int* argc, char*** argv)
995 smpi_replay_init(argc, argv);
996 smpi_replay_main(argc, argv);