1 /* Copyright (c) 2009-2017. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "smpi_coll.hpp"
8 #include "smpi_comm.hpp"
9 #include "smpi_datatype.hpp"
10 #include "smpi_group.hpp"
11 #include "smpi_process.hpp"
12 #include "smpi_request.hpp"
13 #include "xbt/replay.hpp"
15 #include <unordered_map>
// Twice the size of an int plus one (presumably room for hex digits and a terminator - TODO confirm;
// no use of KEY_SIZE is visible in this part of the file).
18 #define KEY_SIZE (sizeof(int) * 2 + 1)
// Log category behind the XBT_DEBUG / XBT_VERB / XBT_INFO calls of this file.
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_replay,smpi,"Trace Replay with SMPI");
// Value recorded by the comm_size action.
22 int communicator_size = 0;
// Set to smpi_process_count() by the init action and compared against 0 in smpi_replay_main.
23 static int active_processes = 0;
// Pending requests of each replayed process, keyed by smpi_process()->index().
24 std::unordered_map<int,std::vector<MPI_Request>*> reqq;
// Datatype applied when a trace line names none; chosen by the init action.
26 MPI_Datatype MPI_DEFAULT_TYPE;
// Datatype of the action being replayed; also overwritten by every decode_datatype() call.
27 MPI_Datatype MPI_CURRENT_TYPE;
// Size that requests are compared against before the shared send buffer is reallocated.
29 static int sendbuffer_size=0;
// Shared scratch buffer grown by smpi_get_tmp_sendbuffer() in replay mode.
30 char* sendbuffer=nullptr;
// Receive-side counterparts, used by smpi_get_tmp_recvbuffer().
31 static int recvbuffer_size=0;
32 char* recvbuffer=nullptr;
// Logs the action line (fields joined with spaces) followed by the simulated time elapsed since
// clock. Nothing is built or printed unless the verbose level of the smpi_replay category is enabled.
34 static void log_timed_action (const char *const *action, double clock){
35 if (XBT_LOG_ISENABLED(smpi_replay, xbt_log_priority_verbose)){
36 char *name = xbt_str_join_array(action, " ");
37 XBT_VERB("%s %f", name, smpi_process()->simulated_elapsed()-clock);
// NOTE(review): confirm that name is released after this point.
// Returns the request list registered for the calling process.
// at() throws std::out_of_range when set_reqq_self() was never called for this process index.
42 static std::vector<MPI_Request>* get_reqq_self()
44 return reqq.at(smpi_process()->index());
// Registers mpi_request as the request list of the calling process.
// insert() keeps an already registered list: a second call for the same index changes nothing.
47 static void set_reqq_self(std::vector<MPI_Request> *mpi_request)
49 reqq.insert({smpi_process()->index(), mpi_request});
52 //allocate a single buffer for all sends, growing it if needed
// Outside of replay mode a fresh xbt_malloc block of size bytes is returned instead.
// In replay mode the shared sendbuffer is reallocated when sendbuffer_size is smaller than size.
53 void* smpi_get_tmp_sendbuffer(int size)
55 if (not smpi_process()->replaying())
56 return xbt_malloc(size);
57 if (sendbuffer_size<size){
58 sendbuffer=static_cast<char*>(xbt_realloc(sendbuffer,size));
64 //allocate a single buffer for all recv
// Mirror of smpi_get_tmp_sendbuffer() working on recvbuffer / recvbuffer_size:
// a fresh xbt_malloc block outside of replay mode, the shared buffer grown on demand otherwise.
65 void* smpi_get_tmp_recvbuffer(int size){
66 if (not smpi_process()->replaying())
67 return xbt_malloc(size);
68 if (recvbuffer_size<size){
69 recvbuffer=static_cast<char*>(xbt_realloc(recvbuffer,size));
// Counterpart of the two smpi_get_tmp_*buffer() helpers.
// Only acts when the process is not replaying, which is the case where those helpers return a
// private xbt_malloc block (presumably buf is released here - TODO confirm).
75 void smpi_free_tmp_buffer(void* buf){
76 if (not smpi_process()->replaying())
// Converts a trace field to a double with strtod.
// Throws unknown_error through THROWF for a field that is not a double (presumably detected
// through endptr - TODO confirm).
81 static double parse_double(const char *string)
84 double value = strtod(string, &endptr);
86 THROWF(unknown_error, 0, "%s is not a double", string);
// Translates the numeric datatype id of a trace field (read with atoi) into an MPI datatype.
// Side effect: the result is stored in the global MPI_CURRENT_TYPE before being returned, so a
// call made to fill another variable still overwrites MPI_CURRENT_TYPE.
// An id that matches no case yields MPI_DEFAULT_TYPE (presumably the default branch - TODO confirm).
90 static MPI_Datatype decode_datatype(const char *const action)
92 switch(atoi(action)) {
94 MPI_CURRENT_TYPE=MPI_DOUBLE;
97 MPI_CURRENT_TYPE=MPI_INT;
100 MPI_CURRENT_TYPE=MPI_CHAR;
103 MPI_CURRENT_TYPE=MPI_SHORT;
106 MPI_CURRENT_TYPE=MPI_LONG;
109 MPI_CURRENT_TYPE=MPI_FLOAT;
112 MPI_CURRENT_TYPE=MPI_BYTE;
115 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
118 return MPI_CURRENT_TYPE;
// Inverse of decode_datatype(): returns the textual id written to traces for a datatype.
// The datatype is compared in turn with MPI_BYTE, MPI_DOUBLE, MPI_INT, MPI_CHAR, MPI_SHORT, MPI_LONG
// and MPI_FLOAT. The handlers of this file always pass nullptr for known.
// NOTE(review): known presumably reports whether the datatype is handled by replay - TODO confirm.
121 const char* encode_datatype(MPI_Datatype datatype, int* known)
123 //default type for output is set to MPI_BYTE
124 // MPI_DEFAULT_TYPE is not set for output, use directly MPI_BYTE
127 if (datatype==MPI_BYTE)
129 if(datatype==MPI_DOUBLE)
131 if(datatype==MPI_INT)
133 if(datatype==MPI_CHAR)
135 if(datatype==MPI_SHORT)
137 if(datatype==MPI_LONG)
139 if(datatype==MPI_FLOAT)
141 //tell that the datatype is not handled by replay, and that its size should be measured and replayed as size*MPI_BYTE
144 // default - not implemented.
145 // do not warn here as we pass in this function even for other trace formats
// Argument check shared by the action handlers.
// Walks the nullptr-terminated action array to count its items and throws arg_error when the line
// does not carry enough of them. mandatory and optional count the arguments that follow the two
// leading items (process id and action name); both values are reported in the error message.
// The macro expands to a braced block, which is why call sites do not end with a semicolon.
149 #define CHECK_ACTION_PARAMS(action, mandatory, optional) {\
151 while(action[i]!=nullptr)\
154 THROWF(arg_error, 0, "%s replay failed.\n" \
155 "%d items were given on the line. First two should be process_id and action. " \
156 "This action needs after them %d mandatory arguments, and accepts %d optional ones. \n" \
157 "Please contact the Simgrid team if support is needed", __FUNCTION__, i, mandatory, optional);\
// Handler of the init action: sets up the replay state of the calling process.
// The single optional argument selects the default datatype: MPI_DOUBLE (MPE traces) or
// MPI_BYTE (TAU traces); the test choosing between the two is presumably on action[2] - TODO confirm.
163 static void action_init(const char *const *action)
165 XBT_DEBUG("Initialize the counters");
166 CHECK_ACTION_PARAMS(action, 0, 1)
168 MPI_DEFAULT_TYPE=MPI_DOUBLE; // default MPE datatype
169 else MPI_DEFAULT_TYPE= MPI_BYTE; // default TAU datatype
171 /* start a simulated timer */
172 smpi_process()->simulated_start();
173 /*initialize the number of active processes */
174 active_processes = smpi_process_count();
// Register an empty request list for this process; it is deleted at the end of smpi_replay_main.
176 set_reqq_self(new std::vector<MPI_Request>);
// Handler registered for the finalize action.
// NOTE(review): no statement of this handler is visible here; the actual finalization is done at
// the end of smpi_replay_main.
179 static void action_finalize(const char *const *action)
// Handler of the comm_size action: stores action[2] in communicator_size.
// NOTE(review): unlike most handlers there is no CHECK_ACTION_PARAMS guarding action[2], and the
// double returned by parse_double() is implicitly converted to int.
184 static void action_comm_size(const char *const *action)
186 communicator_size = parse_double(action[2]);
// The clock is sampled in the call itself, so the logged duration is zero.
187 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler of the comm_split action: the action is only logged (with a zero duration, as the clock
// is sampled in the call itself); no communicator is created here.
190 static void action_comm_split(const char *const *action)
192 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler of the comm_dup action: the action is only logged (with a zero duration, as the clock
// is sampled in the call itself); no communicator is duplicated here.
195 static void action_comm_dup(const char *const *action)
197 log_timed_action (action, smpi_process()->simulated_elapsed());
// Handler of the compute action: simulates the execution of action[2] flops.
// The computation is bracketed by TRACE_smpi_computing_in / _out and the elapsed simulated time
// is logged at the end.
200 static void action_compute(const char *const *action)
202 CHECK_ACTION_PARAMS(action, 1, 0)
203 double clock = smpi_process()->simulated_elapsed();
204 double flops= parse_double(action[2]);
205 int rank = smpi_process()->index();
// Tracing record describing the computation.
206 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
207 extra->type=TRACING_COMPUTING;
208 extra->comp_size=flops;
209 TRACE_smpi_computing_in(rank, extra);
211 smpi_execute_flops(flops);
213 TRACE_smpi_computing_out(rank);
214 log_timed_action (action, clock);
// Handler of the send action: Request::send of action[3] elements to rank action[2] on
// MPI_COMM_WORLD with tag 0. No payload is involved: the buffer argument is nullptr.
// The datatype comes from the optional action[4] when present, MPI_DEFAULT_TYPE otherwise
// (presumably selected by a test on action[4] - TODO confirm).
217 static void action_send(const char *const *action)
219 CHECK_ACTION_PARAMS(action, 2, 1)
220 int to = atoi(action[2]);
221 double size=parse_double(action[3]);
222 double clock = smpi_process()->simulated_elapsed();
225 MPI_CURRENT_TYPE=decode_datatype(action[4]);
227 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
229 int rank = smpi_process()->index();
231 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
// Tracing record describing the message.
232 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
233 extra->type = TRACING_SEND;
234 extra->send_size = size;
236 extra->dst = dst_traced;
237 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
238 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
// The send event is only emitted here when internals are not traced.
239 if (not TRACE_smpi_view_internals())
240 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
242 Request::send(nullptr, size, MPI_CURRENT_TYPE, to , 0, MPI_COMM_WORLD);
244 log_timed_action (action, clock);
246 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Handler of the Isend action: same arguments as the send action, but the message is posted with
// Request::isend and the resulting request is appended to the request list of the process, where a
// later test / wait / waitAll action picks it up.
249 static void action_Isend(const char *const *action)
251 CHECK_ACTION_PARAMS(action, 2, 1)
252 int to = atoi(action[2]);
253 double size=parse_double(action[3]);
254 double clock = smpi_process()->simulated_elapsed();
// Datatype from the optional action[4], MPI_DEFAULT_TYPE otherwise (guard presumably on action[4]).
257 MPI_CURRENT_TYPE=decode_datatype(action[4]);
259 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
261 int rank = smpi_process()->index();
262 int dst_traced = MPI_COMM_WORLD->group()->rank(to);
263 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
264 extra->type = TRACING_ISEND;
265 extra->send_size = size;
267 extra->dst = dst_traced;
268 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
269 TRACE_smpi_ptp_in(rank, rank, dst_traced, __FUNCTION__, extra);
270 if (not TRACE_smpi_view_internals())
271 TRACE_smpi_send(rank, rank, dst_traced, 0, size*MPI_CURRENT_TYPE->size());
273 MPI_Request request = Request::isend(nullptr, size, MPI_CURRENT_TYPE, to, 0,MPI_COMM_WORLD);
275 TRACE_smpi_ptp_out(rank, rank, dst_traced, __FUNCTION__);
// Keep the request so that it can be completed later.
277 get_reqq_self()->push_back(request);
279 log_timed_action (action, clock);
// Handler of the recv action: Request::recv of action[3] elements from rank action[2] on
// MPI_COMM_WORLD with tag 0, into a nullptr buffer.
// The datatype comes from the optional action[4] when present, MPI_DEFAULT_TYPE otherwise.
282 static void action_recv(const char *const *action) {
283 CHECK_ACTION_PARAMS(action, 2, 1)
284 int from = atoi(action[2]);
285 double size=parse_double(action[3]);
286 double clock = smpi_process()->simulated_elapsed();
290 MPI_CURRENT_TYPE=decode_datatype(action[4]);
292 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
294 int rank = smpi_process()->index();
295 int src_traced = MPI_COMM_WORLD->group()->rank(from);
297 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
298 extra->type = TRACING_RECV;
299 extra->send_size = size;
300 extra->src = src_traced;
302 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
303 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
305 //unknown size from the receiver point of view
// NOTE(review): the probe presumably only runs when the trace gives no usable size, the size being
// then taken from status - TODO confirm.
307 Request::probe(from, 0, MPI_COMM_WORLD, &status);
311 Request::recv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD, &status);
313 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
// The receive event is only emitted here when internals are not traced.
314 if (not TRACE_smpi_view_internals()) {
315 TRACE_smpi_recv(rank, src_traced, rank, 0);
318 log_timed_action (action, clock);
// Handler of the Irecv action: same arguments as the recv action, but the receive is posted with
// Request::irecv and the resulting request is appended to the request list of the process, where a
// later test / wait / waitAll action picks it up.
321 static void action_Irecv(const char *const *action)
323 CHECK_ACTION_PARAMS(action, 2, 1)
324 int from = atoi(action[2]);
325 double size=parse_double(action[3]);
326 double clock = smpi_process()->simulated_elapsed();
// Datatype from the optional action[4], MPI_DEFAULT_TYPE otherwise (guard presumably on action[4]).
329 MPI_CURRENT_TYPE=decode_datatype(action[4]);
331 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
333 int rank = smpi_process()->index();
334 int src_traced = MPI_COMM_WORLD->group()->rank(from);
335 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
336 extra->type = TRACING_IRECV;
337 extra->send_size = size;
338 extra->src = src_traced;
340 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
341 TRACE_smpi_ptp_in(rank, src_traced, rank, __FUNCTION__, extra);
343 //unknown size from the receiver point of view
// NOTE(review): as in action_recv, the probe is presumably conditional on the size - TODO confirm.
345 Request::probe(from, 0, MPI_COMM_WORLD, &status);
349 MPI_Request request = Request::irecv(nullptr, size, MPI_CURRENT_TYPE, from, 0, MPI_COMM_WORLD);
351 TRACE_smpi_ptp_out(rank, src_traced, rank, __FUNCTION__);
352 get_reqq_self()->push_back(request);
354 log_timed_action (action, clock);
// Handler of the test action: runs Request::test on the most recently posted request of the process.
// The request is popped from the list, tested, then pushed back so that a later wait can find it.
// NOTE(review): back() is called without checking that the list is non-empty, whereas action_wait
// asserts it; a test action with no pending request is undefined behaviour on the vector.
357 static void action_test(const char *const *action){
358 CHECK_ACTION_PARAMS(action, 0, 0)
359 double clock = smpi_process()->simulated_elapsed();
362 MPI_Request request = get_reqq_self()->back();
363 get_reqq_self()->pop_back();
364 //if request is null here, this may mean that a previous test has succeeded
365 //Different times in traced application and replayed version may lead to this
366 //In this case, ignore the extra calls.
367 if(request!=nullptr){
368 int rank = smpi_process()->index();
369 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
370 extra->type=TRACING_TEST;
371 TRACE_smpi_testing_in(rank, extra);
373 int flag = Request::test(&request, &status);
375 XBT_DEBUG("MPI_Test result: %d", flag);
376 /* push back request in vector to be caught by a subsequent wait. if the test did succeed, the request is now nullptr.*/
377 get_reqq_self()->push_back(request);
379 TRACE_smpi_testing_out(rank);
381 log_timed_action (action, clock);
// Handler of the wait action: completes the most recently posted request of the process.
// The request is popped from the list for good. A null request is tolerated, since a previous test
// action may already have completed it.
384 static void action_wait(const char *const *action){
385 CHECK_ACTION_PARAMS(action, 0, 0)
386 double clock = smpi_process()->simulated_elapsed();
// NOTE(review): the string built by xbt_str_join_array for the message is never released
// (compare with log_timed_action, which keeps the pointer in a variable).
389 xbt_assert(get_reqq_self()->size(), "action wait not preceded by any irecv or isend: %s",
390 xbt_str_join_array(action," "));
391 MPI_Request request = get_reqq_self()->back();
392 get_reqq_self()->pop_back();
394 if (request==nullptr){
395 /* Assume that the trace is well formed, meaning the comm might have been caught by a MPI_test. Then just return.*/
// NOTE(review): the next line guards against MPI_COMM_NULL but the one after dereferences
// request->comm() unconditionally; one of the two is inconsistent.
399 int rank = request->comm() != MPI_COMM_NULL ? request->comm()->rank() : -1;
401 MPI_Group group = request->comm()->group();
402 int src_traced = group->rank(request->src());
403 int dst_traced = group->rank(request->dst());
// Read before the wait: Request::wait receives the address of request and may reset it.
404 int is_wait_for_receive = (request->flags() & RECV);
405 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
406 extra->type = TRACING_WAIT;
407 TRACE_smpi_ptp_in(rank, src_traced, dst_traced, __FUNCTION__, extra);
409 Request::wait(&request, &status);
411 TRACE_smpi_ptp_out(rank, src_traced, dst_traced, __FUNCTION__);
// A completed receive also produces a receive event in the trace.
412 if (is_wait_for_receive)
413 TRACE_smpi_recv(rank, src_traced, dst_traced, 0);
414 log_timed_action (action, clock);
// Handler of the waitAll action: completes every request pending in the list of the process.
// Source and destination of each pending receive are recorded before the wait so that the matching
// receive events can be emitted afterwards.
// NOTE(review): status, recvs_snd and recvs_rcv are variable-length arrays, a compiler extension
// in C++; a std::vector would be the portable choice.
417 static void action_waitall(const char *const *action){
418 CHECK_ACTION_PARAMS(action, 0, 0)
419 double clock = smpi_process()->simulated_elapsed();
420 unsigned int count_requests=get_reqq_self()->size();
422 if (count_requests>0) {
423 MPI_Status status[count_requests];
425 int rank_traced = smpi_process()->index();
426 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
427 extra->type = TRACING_WAITALL;
428 extra->send_size=count_requests;
429 TRACE_smpi_ptp_in(rank_traced, -1, -1, __FUNCTION__,extra);
430 int recvs_snd[count_requests];
431 int recvs_rcv[count_requests];
433 for (auto req : *(get_reqq_self())){
434 if (req && (req->flags () & RECV)){
435 recvs_snd[i]=req->src();
436 recvs_rcv[i]=req->dst();
// The vector storage is handed to waitall directly as the request array.
441 Request::waitall(count_requests, &(*get_reqq_self())[0], status);
443 for (i=0; i<count_requests;i++){
// -100 marks the entries that are not receives (presumably stored by the loop above - TODO confirm).
444 if (recvs_snd[i]!=-100)
445 TRACE_smpi_recv(rank_traced, recvs_snd[i], recvs_rcv[i],0);
447 TRACE_smpi_ptp_out(rank_traced, -1, -1, __FUNCTION__);
449 log_timed_action (action, clock);
// Handler of the barrier action: runs Colls::barrier on MPI_COMM_WORLD between the collective
// tracing events, then logs the elapsed simulated time. The action takes no argument.
452 static void action_barrier(const char *const *action){
453 double clock = smpi_process()->simulated_elapsed();
454 int rank = smpi_process()->index();
455 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
456 extra->type = TRACING_BARRIER;
457 TRACE_smpi_collective_in(rank, -1, __FUNCTION__, extra);
459 Colls::barrier(MPI_COMM_WORLD);
461 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
462 log_timed_action (action, clock);
// Handler of the bcast action: broadcast of action[2] elements on MPI_COMM_WORLD.
// Optional arguments: action[3] is the root rank and action[4] the datatype id; the datatype is
// MPI_DEFAULT_TYPE unless overridden.
465 static void action_bcast(const char *const *action)
467 CHECK_ACTION_PARAMS(action, 1, 2)
468 double size = parse_double(action[2]);
469 double clock = smpi_process()->simulated_elapsed();
471 /* Initialize MPI_CURRENT_TYPE in order to decrease the number of the checks */
472 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
475 root= atoi(action[3]);
477 MPI_CURRENT_TYPE=decode_datatype(action[4]);
480 int rank = smpi_process()->index();
// NOTE(review): group()->index(root) is used here while action_reduce translates its root with
// group()->rank(root); confirm which of the two is intended for tracing.
481 int root_traced = MPI_COMM_WORLD->group()->index(root);
483 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
484 extra->type = TRACING_BCAST;
485 extra->send_size = size;
486 extra->root = root_traced;
487 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
488 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__, extra);
489 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
491 Colls::bcast(sendbuf, size, MPI_CURRENT_TYPE, root, MPI_COMM_WORLD);
493 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
494 log_timed_action (action, clock);
// Handler of the reduce action: reduction of action[2] elements on MPI_COMM_WORLD followed by the
// simulation of action[3] flops for the computation part.
// Optional arguments: action[4] is the root rank and action[5] the datatype id.
// The reduction is run with MPI_OP_NULL.
497 static void action_reduce(const char *const *action)
499 CHECK_ACTION_PARAMS(action, 2, 2)
500 double comm_size = parse_double(action[2]);
501 double comp_size = parse_double(action[3]);
502 double clock = smpi_process()->simulated_elapsed();
504 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
507 root= atoi(action[4]);
509 MPI_CURRENT_TYPE=decode_datatype(action[5]);
512 int rank = smpi_process()->index();
513 int root_traced = MPI_COMM_WORLD->group()->rank(root);
514 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
515 extra->type = TRACING_REDUCE;
516 extra->send_size = comm_size;
517 extra->comp_size = comp_size;
518 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
519 extra->root = root_traced;
521 TRACE_smpi_collective_in(rank, root_traced, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer(), so in replay mode they are
// presumably the same shared block; smpi_get_tmp_recvbuffer() looks intended for recvbuf.
523 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
524 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
525 Colls::reduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, root, MPI_COMM_WORLD);
526 smpi_execute_flops(comp_size);
528 TRACE_smpi_collective_out(rank, root_traced, __FUNCTION__);
529 log_timed_action (action, clock);
// Handler of the allReduce action: all-reduction of action[2] elements on MPI_COMM_WORLD followed
// by the simulation of action[3] flops for the computation part.
// The datatype comes from the optional action[4] when present, MPI_DEFAULT_TYPE otherwise.
// The reduction is run with MPI_OP_NULL.
532 static void action_allReduce(const char *const *action) {
533 CHECK_ACTION_PARAMS(action, 2, 1)
534 double comm_size = parse_double(action[2]);
535 double comp_size = parse_double(action[3]);
538 MPI_CURRENT_TYPE=decode_datatype(action[4]);
540 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
542 double clock = smpi_process()->simulated_elapsed();
543 int rank = smpi_process()->index();
544 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
545 extra->type = TRACING_ALLREDUCE;
546 extra->send_size = comm_size;
547 extra->comp_size = comp_size;
548 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
549 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): both buffers come from smpi_get_tmp_sendbuffer(), so in replay mode they are
// presumably the same shared block; smpi_get_tmp_recvbuffer() looks intended for recvbuf.
551 void *recvbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
552 void *sendbuf = smpi_get_tmp_sendbuffer(comm_size* MPI_CURRENT_TYPE->size());
553 Colls::allreduce(sendbuf, recvbuf, comm_size, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
554 smpi_execute_flops(comp_size);
556 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
557 log_timed_action (action, clock);
560 static void action_allToAll(const char *const *action) {
561 CHECK_ACTION_PARAMS(action, 2, 2) //two mandatory (send and recv volumes) and two optional (corresponding datatypes)
562 double clock = smpi_process()->simulated_elapsed();
563 int comm_size = MPI_COMM_WORLD->size();
564 int send_size = parse_double(action[2]);
565 int recv_size = parse_double(action[3]);
566 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
568 if(action[4] && action[5]) {
569 MPI_CURRENT_TYPE=decode_datatype(action[4]);
570 MPI_CURRENT_TYPE2=decode_datatype(action[5]);
573 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
575 void *send = smpi_get_tmp_sendbuffer(send_size*comm_size* MPI_CURRENT_TYPE->size());
576 void *recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
578 int rank = smpi_process()->index();
579 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
580 extra->type = TRACING_ALLTOALL;
581 extra->send_size = send_size;
582 extra->recv_size = recv_size;
583 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
584 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
586 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
588 Colls::alltoall(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
590 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
591 log_timed_action (action, clock);
594 static void action_gather(const char *const *action) {
595 /* The structure of the gather action for the rank 0 (total 4 processes) is the following:
598 1) 68 is the sendcounts
599 2) 68 is the recvcounts
600 3) 0 is the root node
601 4) 0 is the send datatype id, see decode_datatype()
602 5) 0 is the recv datatype id, see decode_datatype()
604 CHECK_ACTION_PARAMS(action, 2, 3)
605 double clock = smpi_process()->simulated_elapsed();
606 int comm_size = MPI_COMM_WORLD->size();
607 int send_size = parse_double(action[2]);
608 int recv_size = parse_double(action[3]);
609 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
610 if(action[4] && action[5]) {
611 MPI_CURRENT_TYPE=decode_datatype(action[5]);
612 MPI_CURRENT_TYPE2=decode_datatype(action[6]);
614 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
616 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
617 void *recv = nullptr;
620 root=atoi(action[4]);
621 int rank = MPI_COMM_WORLD->rank();
624 recv = smpi_get_tmp_recvbuffer(recv_size*comm_size* MPI_CURRENT_TYPE2->size());
626 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
627 extra->type = TRACING_GATHER;
628 extra->send_size = send_size;
629 extra->recv_size = recv_size;
631 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
632 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
634 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
636 Colls::gather(send, send_size, MPI_CURRENT_TYPE, recv, recv_size, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
638 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
639 log_timed_action (action, clock);
642 static void action_gatherv(const char *const *action) {
643 /* The structure of the gatherv action for the rank 0 (total 4 processes) is the following:
644 0 gather 68 68 10 10 10 0 0 0
646 1) 68 is the sendcount
647 2) 68 10 10 10 is the recvcounts
648 3) 0 is the root node
649 4) 0 is the send datatype id, see decode_datatype()
650 5) 0 is the recv datatype id, see decode_datatype()
652 double clock = smpi_process()->simulated_elapsed();
653 int comm_size = MPI_COMM_WORLD->size();
654 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
655 int send_size = parse_double(action[2]);
656 int disps[comm_size];
657 int recvcounts[comm_size];
660 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
661 if(action[4+comm_size] && action[5+comm_size]) {
662 MPI_CURRENT_TYPE=decode_datatype(action[4+comm_size]);
663 MPI_CURRENT_TYPE2=decode_datatype(action[5+comm_size]);
665 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
667 void *send = smpi_get_tmp_sendbuffer(send_size* MPI_CURRENT_TYPE->size());
668 void *recv = nullptr;
669 for(int i=0;i<comm_size;i++) {
670 recvcounts[i] = atoi(action[i+3]);
671 recv_sum=recv_sum+recvcounts[i];
675 int root=atoi(action[3+comm_size]);
676 int rank = MPI_COMM_WORLD->rank();
679 recv = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
681 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
682 extra->type = TRACING_GATHERV;
683 extra->send_size = send_size;
684 extra->recvcounts= xbt_new(int,comm_size);
685 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
686 extra->recvcounts[i] = recvcounts[i];
688 extra->num_processes = comm_size;
689 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
690 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
692 TRACE_smpi_collective_in(smpi_process()->index(), root, __FUNCTION__, extra);
694 Colls::gatherv(send, send_size, MPI_CURRENT_TYPE, recv, recvcounts, disps, MPI_CURRENT_TYPE2, root, MPI_COMM_WORLD);
696 TRACE_smpi_collective_out(smpi_process()->index(), -1, __FUNCTION__);
697 log_timed_action (action, clock);
// Handler of the reduceScatter action: Colls::reduce_scatter on MPI_COMM_WORLD with MPI_OP_NULL,
// followed by the simulation of the computation part.
// Field layout: action[2] .. action[1+comm_size] are the receive counts, action[2+comm_size] is the
// amount of computation and the optional action[3+comm_size] is the datatype id.
700 static void action_reducescatter(const char *const *action) {
701 /* The structure of the reducescatter action for the rank 0 (total 4 processes) is the following:
702 0 reduceScatter 275427 275427 275427 204020 11346849 0
704 1) The first four values after the name of the action declare the recvcounts array
705 2) The value 11346849 is the amount of instructions
706 3) The last value corresponds to the datatype, see decode_datatype().
708 double clock = smpi_process()->simulated_elapsed();
709 int comm_size = MPI_COMM_WORLD->size();
710 CHECK_ACTION_PARAMS(action, comm_size+1, 1)
711 int comp_size = parse_double(action[2+comm_size]);
// NOTE(review): variable-length array, a compiler extension in C++.
712 int recvcounts[comm_size];
713 int rank = smpi_process()->index();
715 if(action[3+comm_size])
716 MPI_CURRENT_TYPE=decode_datatype(action[3+comm_size]);
718 MPI_CURRENT_TYPE= MPI_DEFAULT_TYPE;
720 for(int i=0;i<comm_size;i++) {
721 recvcounts[i] = atoi(action[i+2]);
725 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
726 extra->type = TRACING_REDUCE_SCATTER;
727 extra->send_size = 0;
728 extra->recvcounts= xbt_new(int, comm_size);
729 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
730 extra->recvcounts[i] = recvcounts[i];
731 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
732 extra->comp_size = comp_size;
733 extra->num_processes = comm_size;
735 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
// NOTE(review): size is presumably the sum of the receive counts accumulated by the loop above -
// TODO confirm.
737 void *sendbuf = smpi_get_tmp_sendbuffer(size* MPI_CURRENT_TYPE->size());
738 void *recvbuf = smpi_get_tmp_recvbuffer(size* MPI_CURRENT_TYPE->size());
740 Colls::reduce_scatter(sendbuf, recvbuf, recvcounts, MPI_CURRENT_TYPE, MPI_OP_NULL, MPI_COMM_WORLD);
741 smpi_execute_flops(comp_size);
743 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
744 log_timed_action (action, clock);
747 static void action_allgather(const char *const *action) {
748 /* The structure of the allgather action for the rank 0 (total 4 processes) is the following:
749 0 allGather 275427 275427
751 1) 275427 is the sendcount
752 2) 275427 is the recvcount
753 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
755 double clock = smpi_process()->simulated_elapsed();
757 CHECK_ACTION_PARAMS(action, 2, 2)
758 int sendcount=atoi(action[2]);
759 int recvcount=atoi(action[3]);
761 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
763 if(action[4] && action[5]) {
764 MPI_CURRENT_TYPE = decode_datatype(action[4]);
765 MPI_CURRENT_TYPE2 = decode_datatype(action[5]);
767 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
769 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
770 void *recvbuf = smpi_get_tmp_recvbuffer(recvcount* MPI_CURRENT_TYPE2->size());
772 int rank = smpi_process()->index();
773 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
774 extra->type = TRACING_ALLGATHER;
775 extra->send_size = sendcount;
776 extra->recv_size= recvcount;
777 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
778 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
779 extra->num_processes = MPI_COMM_WORLD->size();
781 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
783 Colls::allgather(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcount, MPI_CURRENT_TYPE2, MPI_COMM_WORLD);
785 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
786 log_timed_action (action, clock);
789 static void action_allgatherv(const char *const *action) {
790 /* The structure of the allgatherv action for the rank 0 (total 4 processes) is the following:
791 0 allGatherV 275427 275427 275427 275427 204020
793 1) 275427 is the sendcount
794 2) The next four elements declare the recvcounts array
795 3) No more values mean that the datatype for sent and receive buffer is the default one, see decode_datatype().
797 double clock = smpi_process()->simulated_elapsed();
799 int comm_size = MPI_COMM_WORLD->size();
800 CHECK_ACTION_PARAMS(action, comm_size+1, 2)
801 int sendcount=atoi(action[2]);
802 int recvcounts[comm_size];
803 int disps[comm_size];
805 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
807 if(action[3+comm_size] && action[4+comm_size]) {
808 MPI_CURRENT_TYPE = decode_datatype(action[3+comm_size]);
809 MPI_CURRENT_TYPE2 = decode_datatype(action[4+comm_size]);
811 MPI_CURRENT_TYPE = MPI_DEFAULT_TYPE;
813 void *sendbuf = smpi_get_tmp_sendbuffer(sendcount* MPI_CURRENT_TYPE->size());
815 for(int i=0;i<comm_size;i++) {
816 recvcounts[i] = atoi(action[i+3]);
817 recv_sum=recv_sum+recvcounts[i];
820 void *recvbuf = smpi_get_tmp_recvbuffer(recv_sum* MPI_CURRENT_TYPE2->size());
822 int rank = smpi_process()->index();
823 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
824 extra->type = TRACING_ALLGATHERV;
825 extra->send_size = sendcount;
826 extra->recvcounts= xbt_new(int, comm_size);
827 for(int i=0; i< comm_size; i++)//copy data to avoid bad free
828 extra->recvcounts[i] = recvcounts[i];
829 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
830 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
831 extra->num_processes = comm_size;
833 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
835 Colls::allgatherv(sendbuf, sendcount, MPI_CURRENT_TYPE, recvbuf, recvcounts, disps, MPI_CURRENT_TYPE2,
838 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
839 log_timed_action (action, clock);
842 static void action_allToAllv(const char *const *action) {
843 /* The structure of the allToAllV action for the rank 0 (total 4 processes) is the following:
844 0 allToAllV 100 1 7 10 12 100 1 70 10 5
846 1) 100 is the size of the send buffer *sizeof(int),
847 2) 1 7 10 12 is the sendcounts array
848 3) 100*sizeof(int) is the size of the receiver buffer
849 4) 1 70 10 5 is the recvcounts array
851 double clock = smpi_process()->simulated_elapsed();
853 int comm_size = MPI_COMM_WORLD->size();
854 CHECK_ACTION_PARAMS(action, 2*comm_size+2, 2)
855 int sendcounts[comm_size];
856 int recvcounts[comm_size];
857 int senddisps[comm_size];
858 int recvdisps[comm_size];
860 MPI_Datatype MPI_CURRENT_TYPE2 = MPI_DEFAULT_TYPE;
862 int send_buf_size=parse_double(action[2]);
863 int recv_buf_size=parse_double(action[3+comm_size]);
864 if(action[4+2*comm_size] && action[5+2*comm_size]) {
865 MPI_CURRENT_TYPE=decode_datatype(action[4+2*comm_size]);
866 MPI_CURRENT_TYPE2=decode_datatype(action[5+2*comm_size]);
869 MPI_CURRENT_TYPE=MPI_DEFAULT_TYPE;
871 void *sendbuf = smpi_get_tmp_sendbuffer(send_buf_size* MPI_CURRENT_TYPE->size());
872 void *recvbuf = smpi_get_tmp_recvbuffer(recv_buf_size* MPI_CURRENT_TYPE2->size());
874 for(int i=0;i<comm_size;i++) {
875 sendcounts[i] = atoi(action[i+3]);
876 recvcounts[i] = atoi(action[i+4+comm_size]);
881 int rank = smpi_process()->index();
882 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
883 extra->type = TRACING_ALLTOALLV;
884 extra->recvcounts= xbt_new(int, comm_size);
885 extra->sendcounts= xbt_new(int, comm_size);
886 extra->num_processes = comm_size;
888 for(int i=0; i< comm_size; i++){//copy data to avoid bad free
889 extra->send_size += sendcounts[i];
890 extra->sendcounts[i] = sendcounts[i];
891 extra->recv_size += recvcounts[i];
892 extra->recvcounts[i] = recvcounts[i];
894 extra->datatype1 = encode_datatype(MPI_CURRENT_TYPE, nullptr);
895 extra->datatype2 = encode_datatype(MPI_CURRENT_TYPE2, nullptr);
897 TRACE_smpi_collective_in(rank, -1, __FUNCTION__,extra);
899 Colls::alltoallv(sendbuf, sendcounts, senddisps, MPI_CURRENT_TYPE,recvbuf, recvcounts, recvdisps,
900 MPI_CURRENT_TYPE, MPI_COMM_WORLD);
902 TRACE_smpi_collective_out(rank, -1, __FUNCTION__);
903 log_timed_action (action, clock);
906 }} // namespace simgrid::smpi
908 /** @brief Only initialize the replay, don't do it for real */
// Initializes the SMPI process in replay mode, emits the tracing events of the initialization and
// registers the handler of every supported trace action under the name used in trace files.
909 void smpi_replay_init(int* argc, char*** argv)
911 simgrid::smpi::Process::init(argc, argv);
912 smpi_process()->mark_as_initialized();
913 smpi_process()->set_replaying(true);
915 int rank = smpi_process()->index();
916 TRACE_smpi_init(rank);
917 TRACE_smpi_computing_init(rank);
918 instr_extra_data extra = xbt_new0(s_instr_extra_data_t,1);
919 extra->type = TRACING_INIT;
920 TRACE_smpi_collective_in(rank, -1, "smpi_replay_run_init", extra);
921 TRACE_smpi_collective_out(rank, -1, "smpi_replay_run_init");
// Action names are case sensitive: they must match the second field of the trace lines.
922 xbt_replay_action_register("init", simgrid::smpi::action_init);
923 xbt_replay_action_register("finalize", simgrid::smpi::action_finalize);
924 xbt_replay_action_register("comm_size", simgrid::smpi::action_comm_size);
925 xbt_replay_action_register("comm_split", simgrid::smpi::action_comm_split);
926 xbt_replay_action_register("comm_dup", simgrid::smpi::action_comm_dup);
927 xbt_replay_action_register("send", simgrid::smpi::action_send);
928 xbt_replay_action_register("Isend", simgrid::smpi::action_Isend);
929 xbt_replay_action_register("recv", simgrid::smpi::action_recv);
930 xbt_replay_action_register("Irecv", simgrid::smpi::action_Irecv);
931 xbt_replay_action_register("test", simgrid::smpi::action_test);
932 xbt_replay_action_register("wait", simgrid::smpi::action_wait);
933 xbt_replay_action_register("waitAll", simgrid::smpi::action_waitall);
934 xbt_replay_action_register("barrier", simgrid::smpi::action_barrier);
935 xbt_replay_action_register("bcast", simgrid::smpi::action_bcast);
936 xbt_replay_action_register("reduce", simgrid::smpi::action_reduce);
937 xbt_replay_action_register("allReduce", simgrid::smpi::action_allReduce);
938 xbt_replay_action_register("allToAll", simgrid::smpi::action_allToAll);
939 xbt_replay_action_register("allToAllV", simgrid::smpi::action_allToAllv);
940 xbt_replay_action_register("gather", simgrid::smpi::action_gather);
941 xbt_replay_action_register("gatherV", simgrid::smpi::action_gatherv);
942 xbt_replay_action_register("allGather", simgrid::smpi::action_allgather);
943 xbt_replay_action_register("allGatherV", simgrid::smpi::action_allgatherv);
944 xbt_replay_action_register("reduceScatter", simgrid::smpi::action_reducescatter);
945 xbt_replay_action_register("compute", simgrid::smpi::action_compute);
947 //if we have a delayed start, sleep here.
// The delay is read from (*argv)[2] and expressed in flops (presumably only when that argument
// was given - TODO confirm the guard).
949 double value = xbt_str_parse_double((*argv)[2], "%s is not a double");
950 XBT_VERB("Delayed start for instance - Sleeping for %f flops ",value );
951 smpi_execute_flops(value);
953 //UGLY: force a context switch to be sure that all MSG_processes begin initialization
954 XBT_DEBUG("Force context switch by smpi_execute_flops - Sleeping for 0.0 flops ");
955 smpi_execute_flops(0.0);
959 /** @brief actually run the replay after initialization */
// Runs the trace through simgrid::xbt::replay_runner, then completes the requests that are still
// pending, releases the request list of the process and finalizes the SMPI process.
// The process for which active_processes is 0 also prints the simulated time and frees the shared
// scratch buffers.
960 void smpi_replay_main(int* argc, char*** argv)
962 simgrid::xbt::replay_runner(*argc, *argv);
964 /* and now, finalize everything */
965 /* One active process will stop. Decrease the counter*/
966 XBT_DEBUG("There are %zu elements in reqq[*]", get_reqq_self()->size());
967 if (not get_reqq_self()->empty()) {
968 unsigned int count_requests=get_reqq_self()->size();
// NOTE(review): variable-length arrays, a compiler extension in C++.
969 MPI_Request requests[count_requests];
970 MPI_Status status[count_requests];
973 for (auto req: *get_reqq_self()){
977 simgrid::smpi::Request::waitall(count_requests, requests, status);
// NOTE(review): the list is deleted but no erase of its reqq entry is visible here, so a later
// get_reqq_self() for this process would return a dangling pointer - TODO confirm.
979 delete get_reqq_self();
982 if(active_processes==0){
983 /* Last process alive speaking: end the simulated timer */
984 XBT_INFO("Simulation time %f", smpi_process()->simulated_elapsed());
985 xbt_free(sendbuffer);
986 xbt_free(recvbuffer);
989 instr_extra_data extra_fin = xbt_new0(s_instr_extra_data_t,1);
990 extra_fin->type = TRACING_FINALIZE;
991 TRACE_smpi_collective_in(smpi_process()->index(), -1, "smpi_replay_run_finalize", extra_fin);
993 smpi_process()->finalize();
995 TRACE_smpi_collective_out(smpi_process()->index(), -1, "smpi_replay_run_finalize");
996 TRACE_smpi_finalize(smpi_process()->index());
999 /** @brief chain a replay initialization and a replay start */
// Convenience entry point: smpi_replay_init() followed by smpi_replay_main() with the same arguments.
1000 void smpi_replay_run(int* argc, char*** argv)
1002 smpi_replay_init(argc, argv);
1003 smpi_replay_main(argc, argv);