From: Augustin Degomme Date: Sun, 20 Mar 2022 20:40:34 +0000 (+0100) Subject: SMPI: add utility to keep the order of collective calls performed by each process... X-Git-Tag: v3.31~21 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/74208c99e01890f3da51863d290d577392f9bce5 SMPI: add utility to keep the order of collective calls performed by each MPI process and validate that all processes do the same. It's not activated by default, and needs the --cfg=smpi/pedantic:true option, as it may store too much data in memory for now. For each comm we maintain a vector of encountered collective calls. Each process stores the number of calls performed in each communicator. At each new one we compare the number of calls to the size of the corresponding vector: if we are the first process, add the call to the list; if not, compare its name to the one at the corresponding place in the vector, and cry for help if it's not the same. Kudos to mquinson and MBI for the idea. --- diff --git a/src/smpi/include/smpi_comm.hpp b/src/smpi/include/smpi_comm.hpp index 87ff9b2053..82bf722e88 100644 --- a/src/smpi/include/smpi_comm.hpp +++ b/src/smpi/include/smpi_comm.hpp @@ -42,6 +42,9 @@ class Comm : public F2C, public Keyval{ std::unordered_map sent_messages_; std::unordered_map recv_messages_; + unsigned int collectives_count_ = 0; + unsigned int* collectives_counts_ = nullptr; //for MPI_COMM_WORLD only + public: static std::unordered_map keyvals_; @@ -97,6 +100,9 @@ public: void increment_sent_messages_count(int src, int dst, int tag); unsigned int get_received_messages_count(int src, int dst, int tag); void increment_received_messages_count(int src, int dst, int tag); + unsigned int get_collectives_count(); + void increment_collectives_count(); + }; } // namespace smpi diff --git a/src/smpi/include/smpi_utils.hpp b/src/smpi/include/smpi_utils.hpp index a53149410d..1125ad1dfa 100644 --- a/src/smpi/include/smpi_utils.hpp +++ b/src/smpi/include/smpi_utils.hpp @@ 
-8,6 +8,7 @@ #include #include "smpi_f2c.hpp" +#include "smpi_comm.hpp" #include #include @@ -35,6 +36,7 @@ XBT_PUBLIC void set_current_handle(F2C* handle); XBT_PUBLIC void set_current_buffer(int i, const char* name, const void* handle); XBT_PUBLIC size_t get_buffer_size(const void* ptr); XBT_PUBLIC void account_free(const void* ptr); +XBT_PUBLIC int check_collectives_ordering(MPI_Comm comm, std::string call); } // namespace utils } // namespace smpi diff --git a/src/smpi/internals/smpi_utils.cpp b/src/smpi/internals/smpi_utils.cpp index 837cedf699..686bc14605 100644 --- a/src/smpi/internals/smpi_utils.cpp +++ b/src/smpi/internals/smpi_utils.cpp @@ -11,6 +11,7 @@ #include "src/surf/xml/platf.hpp" #include "xbt/file.hpp" #include "xbt/log.h" +#include "xbt/ex.h" #include "xbt/parse_units.hpp" #include "xbt/sysdep.h" #include @@ -48,6 +49,8 @@ current_buffer_metadata_t current_buffer2; std::unordered_map allocs; +std::unordered_map> collective_calls; + std::vector parse_factor(const std::string& smpi_coef_string) { std::vector smpi_factor; @@ -345,6 +348,30 @@ void account_free(const void* ptr){ } } +int check_collectives_ordering(MPI_Comm comm, std::string call){ + if(_smpi_cfg_pedantic){ + unsigned int count = comm->get_collectives_count(); + comm->increment_collectives_count(); + auto vec = collective_calls.find(comm->id()); + if (vec == collective_calls.end()) { + collective_calls.emplace(comm->id(), std::vector{call}); + }else{ + //are we the first ? add the call + if (vec->second.size() == (count)){ + vec->second.push_back(call); + } else if (vec->second.size() > count){ + if (vec->second[count] != call){ + XBT_WARN("Collective communication mismatch. 
For process %ld, expected %s, got %s", simgrid::s4u::this_actor::get_pid(), vec->second[count].c_str(), call.c_str()); + return MPI_ERR_OTHER; + } + } else { + THROW_IMPOSSIBLE; + } + } + } + return MPI_SUCCESS; +} + } } } // namespace simgrid diff --git a/src/smpi/mpi/smpi_comm.cpp b/src/smpi/mpi/smpi_comm.cpp index b482e21449..84f604dba8 100644 --- a/src/smpi/mpi/smpi_comm.cpp +++ b/src/smpi/mpi/smpi_comm.cpp @@ -365,6 +365,8 @@ void Comm::unref(Comm* comm){ delete[] comm->errhandlers_; } else if (comm->errhandler_ != MPI_ERRHANDLER_NULL) simgrid::smpi::Errhandler::unref(comm->errhandler_); + if(comm->collectives_counts_!=nullptr) + delete[] comm->collectives_counts_; } Group::unref(comm->group_); if(comm->refcount_==0) @@ -650,5 +652,31 @@ void Comm::increment_received_messages_count(int src, int dst, int tag) recv_messages_[hash_message(src, dst, tag)]++; } +unsigned int Comm::get_collectives_count() +{ + if (this==MPI_COMM_UNINITIALIZED){ + return smpi_process()->comm_world()->get_collectives_count(); + }else if(this == MPI_COMM_WORLD || this == smpi_process()->comm_world()){ + if(collectives_counts_==nullptr) + collectives_counts_=new unsigned int[this->size()]{0}; + return collectives_counts_[this->rank()]; + }else{ + return collectives_count_; + } +} + +void Comm::increment_collectives_count() +{ + if (this==MPI_COMM_UNINITIALIZED){ + smpi_process()->comm_world()->increment_collectives_count(); + }else if (this == MPI_COMM_WORLD || this == smpi_process()->comm_world()){ + if(collectives_counts_==nullptr) + collectives_counts_=new unsigned int[this->size()]{0}; + collectives_counts_[this->rank()]++; + }else{ + collectives_count_++; + } +} + } // namespace smpi } // namespace simgrid