From 989042c42c15e746278c98212cd16e655d03f182 Mon Sep 17 00:00:00 2001
From: Augustin Degomme
Date: Fri, 7 May 2021 12:17:54 +0200
Subject: [PATCH] Properly terminate non-blocking collective requests in all
 cases (mpi_test/testall/testany/testsome/waitsome/waitany). Also use flags
 to identify non-blocking collective requests instead of the size of an
 array (which should be turned into a vector soon).

Fix one issue reported in #67
---
 src/smpi/bindings/smpi_pmpi_request.cpp |  7 ++-
 src/smpi/colls/smpi_nbc_impl.cpp        | 34 ++++++------
 src/smpi/include/private.hpp            |  1 +
 src/smpi/include/smpi_request.hpp       |  1 +
 src/smpi/mpi/smpi_request.cpp           | 70 +++++++++++--------------
 5 files changed, 53 insertions(+), 60 deletions(-)

diff --git a/src/smpi/bindings/smpi_pmpi_request.cpp b/src/smpi/bindings/smpi_pmpi_request.cpp
index e0abad66de..19de25e0c9 100644
--- a/src/smpi/bindings/smpi_pmpi_request.cpp
+++ b/src/smpi/bindings/smpi_pmpi_request.cpp
@@ -617,8 +617,7 @@ int PMPI_Wait(MPI_Request * request, MPI_Status * status)
   } else {
     // for tracing, save the handle which might get overridden before we can use the helper on it
     MPI_Request savedreq = *request;
-    if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & MPI_REQ_FINISHED)
-        && not(savedreq->flags() & MPI_REQ_GENERALIZED))
+    if (savedreq != MPI_REQUEST_NULL && not(savedreq->flags() & (MPI_REQ_FINISHED | MPI_REQ_GENERALIZED | MPI_REQ_NBC)))
       savedreq->ref();//don't erase the handle in Request::wait, we'll need it later
     else
       savedreq = MPI_REQUEST_NULL;
@@ -652,7 +651,7 @@ int PMPI_Waitany(int count, MPI_Request requests[], int *index, MPI_Status * sta
   // for tracing, save the handles which might get overridden before we can use the helper on it
   std::vector<MPI_Request> savedreqs(requests, requests + count);
   for (MPI_Request& req : savedreqs) {
-    if (req != MPI_REQUEST_NULL && not(req->flags() & MPI_REQ_FINISHED))
+    if (req != MPI_REQUEST_NULL && not(req->flags() & (MPI_REQ_FINISHED | MPI_REQ_NBC)))
       req->ref();
     else
       req = MPI_REQUEST_NULL;
@@ -683,7 +682,7 @@ int PMPI_Waitall(int count, MPI_Request requests[], MPI_Status status[])
   // for tracing, save the handles which might get overridden before we can use the helper on it
   std::vector<MPI_Request> savedreqs(requests, requests + count);
   for (MPI_Request& req : savedreqs) {
-    if (req != MPI_REQUEST_NULL && not(req->flags() & MPI_REQ_FINISHED))
+    if (req != MPI_REQUEST_NULL && not(req->flags() & (MPI_REQ_FINISHED | MPI_REQ_NBC)))
       req->ref();
     else
       req = MPI_REQUEST_NULL;
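Note on the smpi_pmpi_request.cpp hunks above: the separate per-flag tests collapse into one test against a combined bit mask, and the mask gains the new MPI_REQ_NBC bit so the tracing code also skips non-blocking collective meta-requests. A minimal standalone sketch of why the two forms agree (the FINISHED/GENERALIZED values below are placeholders for this sketch; only 0x10000 for MPI_REQ_NBC comes from this patch):

  #include <cassert>

  // Placeholder bit values for this sketch only; the real constants live in
  // src/smpi/include/private.hpp as constexpr unsigned bit masks.
  constexpr unsigned MPI_REQ_FINISHED    = 0x0100;
  constexpr unsigned MPI_REQ_GENERALIZED = 0x0200;
  constexpr unsigned MPI_REQ_NBC         = 0x10000; // added by this patch

  int main()
  {
    for (unsigned flags = 0; flags < 0x20000; flags++) {
      // Old form: one bitwise test per flag.
      bool old_skip = (flags & MPI_REQ_FINISHED) || (flags & MPI_REQ_GENERALIZED);
      // New form: a single test against the OR-ed mask, now also covering NBC.
      bool new_skip = (flags & (MPI_REQ_FINISHED | MPI_REQ_GENERALIZED | MPI_REQ_NBC)) != 0;
      // The two forms differ exactly on requests carrying MPI_REQ_NBC.
      assert(new_skip == (old_skip || (flags & MPI_REQ_NBC) != 0));
    }
    return 0;
  }
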
diff --git a/src/smpi/colls/smpi_nbc_impl.cpp b/src/smpi/colls/smpi_nbc_impl.cpp
index 6c0c050c0a..f3b4a4672f 100644
--- a/src/smpi/colls/smpi_nbc_impl.cpp
+++ b/src/smpi/colls/smpi_nbc_impl.cpp
@@ -17,7 +17,7 @@ int colls::ibarrier(MPI_Comm comm, MPI_Request* request, int external)
   int rank = comm->rank();
   int system_tag=COLL_TAG_BARRIER-external;
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if (rank > 0) {
     auto* requests = new MPI_Request[2];
     requests[0] = Request::isend (nullptr, 0, MPI_BYTE, 0,
@@ -46,7 +46,7 @@ int colls::ibcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Com
   int rank = comm->rank();
   int system_tag=COLL_TAG_BCAST-external;
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if (rank != root) {
     auto* requests = new MPI_Request[1];
     requests[0] = Request::irecv (buf, count, datatype, root,
@@ -82,7 +82,7 @@ int colls::iallgather(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   // FIXME: check for errors
   recvtype->extent(&lb, &recvext);
   // Local copy from self
@@ -115,7 +115,7 @@ int colls::iscatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype, v
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if(rank != root) {
     auto* requests = new MPI_Request[1];
     // Recv buffer from root
@@ -155,7 +155,7 @@ int colls::iallgatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   recvtype->extent(&lb, &recvext);
   // Local copy from self
   Datatype::copy(sendbuf, sendcount, sendtype,
@@ -191,7 +191,7 @@ int colls::ialltoall(const void* sendbuf, int sendcount, MPI_Datatype sendtype,
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   sendtype->extent(&lb, &sendext);
   recvtype->extent(&lb, &recvext);
   /* simple optimization */
@@ -237,7 +237,7 @@ int colls::ialltoallv(const void* sendbuf, const int* sendcounts, const int* sen
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   sendtype->extent(&lb, &sendext);
   recvtype->extent(&lb, &recvext);
   /* Local copy from self */
@@ -284,7 +284,7 @@ int colls::ialltoallw(const void* sendbuf, const int* sendcounts, const int* sen
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   /* Local copy from self */
   int err = (sendcounts[rank]>0 && recvcounts[rank]) ?
   Datatype::copy(static_cast<const char*>(sendbuf) + senddisps[rank], sendcounts[rank], sendtypes[rank],
                  static_cast<char*>(recvbuf) + recvdisps[rank], recvcounts[rank], recvtypes[rank]): MPI_SUCCESS;
@@ -329,7 +329,7 @@ int colls::igather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, vo
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if(rank != root) {
     // Send buffer to root
     auto* requests = new MPI_Request[1];
@@ -368,7 +368,7 @@ int colls::igatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, v
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if (rank != root) {
     // Send buffer to root
     auto* requests = new MPI_Request[1];
@@ -406,7 +406,7 @@ int colls::iscatterv(const void* sendbuf, const int* sendcounts, const int* disp
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( nullptr, 0, MPI_BYTE,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
   if(rank != root) {
     // Recv buffer from root
     auto* requests = new MPI_Request[1];
@@ -460,11 +460,11 @@ int colls::ireduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype d

   if(rank == root){
     (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   }
   else
     (*request) = new Request( nullptr, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);

   if(rank != root) {
     // Send buffer to root
@@ -507,7 +507,7 @@ int colls::iallreduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatyp
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   // FIXME: check for errors
   datatype->extent(&lb, &dataext);
   // Local copy from self
@@ -539,7 +539,7 @@ int colls::iscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype dat
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   datatype->extent(&lb, &dataext);

   // Local copy from self
@@ -571,7 +571,7 @@ int colls::iexscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype d
   int rank = comm->rank();
   int size = comm->size();
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   datatype->extent(&lb, &dataext);
   if(rank != 0)
     memset(recvbuf, 0, count*dataext);
@@ -605,7 +605,7 @@ int colls::ireduce_scatter(const void* sendbuf, void* recvbuf, const int* recvco
   int size = comm->size();
   int count=recvcounts[rank];
   (*request) = new Request( recvbuf, count, datatype,
-                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
+                         rank,rank, system_tag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC, op);
   datatype->extent(&lb, &dataext);

   // Send/Recv buffers to/from others;
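All of the smpi_nbc_impl.cpp hunks above make the same change: the persistent "meta-request" handed back to the caller is now tagged MPI_REQ_NBC at creation, instead of being recognized later by the count of attached sub-requests. A stripped-down model of that design choice (ToyRequest and its members are illustrative, not SMPI's actual classes):

  #include <vector>

  constexpr unsigned REQ_PERSISTENT = 0x1;     // placeholder value
  constexpr unsigned REQ_NBC        = 0x10000; // value from this patch

  struct ToyRequest {
    unsigned flags = 0;
    std::vector<ToyRequest*> subreqs; // hidden point-to-point requests

    // Old heuristic: couples the test to the sub-request storage (the
    // commit message notes the array should become a vector) and fails
    // on any path that checks before sub-requests are attached.
    bool is_nbc_by_size() const { return !subreqs.empty(); }

    // New test: an explicit, intrinsic property of the request itself.
    bool is_nbc_by_flag() const { return (flags & REQ_NBC) != 0; }
  };

  int main()
  {
    ToyRequest meta;
    meta.flags = REQ_PERSISTENT | REQ_NBC; // as in (*request) = new Request(...)
    return meta.is_nbc_by_flag() ? 0 : 1;
  }
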
diff --git a/src/smpi/include/private.hpp b/src/smpi/include/private.hpp
index dbc9d31bb9..870a39788d 100644
--- a/src/smpi/include/private.hpp
+++ b/src/smpi/include/private.hpp
@@ -29,6 +29,7 @@ constexpr unsigned MPI_REQ_COMPLETE = 0x1000;
 constexpr unsigned MPI_REQ_BSEND     = 0x2000;
 constexpr unsigned MPI_REQ_MATCHED   = 0x4000;
 constexpr unsigned MPI_REQ_CANCELLED = 0x8000;
+constexpr unsigned MPI_REQ_NBC       = 0x10000;

 enum class SmpiProcessState { UNINITIALIZED, INITIALIZING, INITIALIZED /*(=MPI_Init called)*/, FINALIZED };

diff --git a/src/smpi/include/smpi_request.hpp b/src/smpi/include/smpi_request.hpp
index a4c08f9e13..802753370b 100644
--- a/src/smpi/include/smpi_request.hpp
+++ b/src/smpi/include/smpi_request.hpp
@@ -73,6 +73,7 @@ public:
   void init_buffer(int count);
   void ref();
   void set_nbc_requests(MPI_Request* reqs, int size);
+  static int finish_nbc_requests(MPI_Request* req);
   int get_nbc_requests_size() const;
   MPI_Request* get_nbc_requests() const;
   static void finish_wait(MPI_Request* request, MPI_Status* status);
diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp
index 7736157185..b782759c68 100644
--- a/src/smpi/mpi/smpi_request.cpp
+++ b/src/smpi/mpi/smpi_request.cpp
@@ -598,19 +598,7 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
   static int nsleeps = 1;
   int ret = MPI_SUCCESS;
-
-  // Are we testing a request meant for non blocking collectives ?
-  // If so, test all the subrequests.
-  if ((*request)->nbc_requests_size_>0){
-    ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
-    if(*flag){
-      delete[] (*request)->nbc_requests_;
-      (*request)->nbc_requests_size_=0;
-      unref(request);
-    }
-    return ret;
-  }
-
+
   if(smpi_test_sleep > 0)
     simgrid::s4u::this_actor::sleep_for(nsleeps * smpi_test_sleep);
@@ -856,6 +844,30 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   xbt_assert(request == MPI_REQUEST_NULL);
 }

+int Request::finish_nbc_requests(MPI_Request* request){
+  int ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE);
+  XBT_DEBUG("finish non blocking collective request with %d sub-requests", (*request)->nbc_requests_size_);
+  for (int i = 0; i < (*request)->nbc_requests_size_; i++) {
+    if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case
+      void * buf=(*request)->nbc_requests_[i]->buf_;
+      if((*request)->old_type_->flags() & DT_FLAG_DERIVED)
+        buf=(*request)->nbc_requests_[i]->old_buf_;
+      if((*request)->nbc_requests_[i]->flags_ & MPI_REQ_RECV ){
+        if((*request)->op_!=MPI_OP_NULL){
+          int count=(*request)->size_/ (*request)->old_type_->size();
+          (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_);
+        }
+        smpi_free_tmp_buffer(static_cast<unsigned char*>(buf));
+      }
+    }
+    if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL)
+      Request::unref(&((*request)->nbc_requests_[i]));
+  }
+  delete[] (*request)->nbc_requests_;
+  (*request)->nbc_requests_size_=0;
+  return ret;
+}
+
 void Request::finish_wait(MPI_Request* request, MPI_Status * status)
 {
   MPI_Request req = *request;
@@ -869,6 +881,12 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
     return;
   }

+  if ((*request)->flags() & MPI_REQ_NBC){
+    int ret = finish_nbc_requests(request);
+    if (ret != MPI_SUCCESS)
+      xbt_die("error when finishing non blocking collective requests");
+  }
+
   if ((req->flags_ & (MPI_REQ_PREPARED | MPI_REQ_GENERALIZED | MPI_REQ_FINISHED)) == 0) {
     if (status != MPI_STATUS_IGNORE) {
       if (req->src_== MPI_PROC_NULL || req->dst_== MPI_PROC_NULL){
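The structural point of the smpi_request.cpp hunks: the sub-request cleanup that test() and wait() each carried a copy of (and that testall/testany/testsome/waitany/waitsome lacked entirely) now lives in the single helper finish_nbc_requests(), invoked from finish_wait(). Since every completion primitive eventually funnels into finish_wait(), no path can skip the cleanup anymore. A schematic model of that control flow (simplified names, not the actual SMPI code):

  // Schematic: one completion hook shared by all test/wait variants.
  struct Req {
    bool nbc  = false; // request created by a non-blocking collective
    bool done = false;
  };

  static void finish_nbc_requests(Req&)
  {
    // wait on sub-requests, apply pending reductions, free tmp buffers
  }

  static void finish_wait(Req& r) // reached by wait, test, waitall, testany, ...
  {
    if (r.nbc)
      finish_nbc_requests(r); // centralized: runs on every completion path
    r.done = true;
  }

  static void wait(Req& r) { finish_wait(r); }          // had its own copy before
  static bool test(Req& r) { finish_wait(r); return true; } // ditto

  int main()
  {
    Req r;
    r.nbc = true;
    test(r); // paths like testany/testsome previously never cleaned up
    return r.done ? 0 : 1;
  }
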
@@ -969,32 +987,6 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
     (*request)=MPI_REQUEST_NULL;
     return ret;
   }
-  // Are we waiting on a request meant for non blocking collectives ?
-  // If so, wait for all the subrequests.
-  if ((*request)->nbc_requests_size_>0){
-    ret = waitall((*request)->nbc_requests_size_, (*request)->nbc_requests_, MPI_STATUSES_IGNORE);
-    for (int i = 0; i < (*request)->nbc_requests_size_; i++) {
-      if((*request)->buf_!=nullptr && (*request)->nbc_requests_[i]!=MPI_REQUEST_NULL){//reduce case
-        void * buf=(*request)->nbc_requests_[i]->buf_;
-        if((*request)->old_type_->flags() & DT_FLAG_DERIVED)
-          buf=(*request)->nbc_requests_[i]->old_buf_;
-        if((*request)->nbc_requests_[i]->flags_ & MPI_REQ_RECV ){
-          if((*request)->op_!=MPI_OP_NULL){
-            int count=(*request)->size_/ (*request)->old_type_->size();
-            (*request)->op_->apply(buf, (*request)->buf_, &count, (*request)->old_type_);
-          }
-          smpi_free_tmp_buffer(static_cast<unsigned char*>(buf));
-        }
-      }
-      if((*request)->nbc_requests_[i]!=MPI_REQUEST_NULL)
-        Request::unref(&((*request)->nbc_requests_[i]));
-    }
-    delete[] (*request)->nbc_requests_;
-    (*request)->nbc_requests_size_=0;
-    unref(request);
-    (*request)=MPI_REQUEST_NULL;
-    return ret;
-  }

   (*request)->print_request("Waiting");
   if ((*request)->flags_ & (MPI_REQ_PREPARED | MPI_REQ_FINISHED)) {
-- 
2.20.1
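For reference, the class of user code this patch fixes looks like the following: non-blocking collectives completed through MPI_Testall and MPI_Waitany, two of the entry points named in the subject line whose hidden sub-requests were previously never terminated. This is an illustrative reproducer-style sketch, not the actual test from issue #67:

  #include <mpi.h>

  int main(int argc, char* argv[])
  {
    MPI_Init(&argc, &argv);

    // Complete two ibarriers through the testall path.
    MPI_Request reqs[2];
    MPI_Ibarrier(MPI_COMM_WORLD, &reqs[0]);
    MPI_Ibarrier(MPI_COMM_WORLD, &reqs[1]);
    int flag = 0;
    while (!flag)
      MPI_Testall(2, reqs, &flag, MPI_STATUSES_IGNORE);

    // Complete an ibcast through the waitany path.
    int value = 0;
    MPI_Request req;
    MPI_Ibcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD, &req);
    int index;
    MPI_Waitany(1, &req, &index, MPI_STATUS_IGNORE);

    MPI_Finalize();
    return 0;
  }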