X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/2750bde086c69304587076350db922bfd9da04f6..fff2630cdfff384800b2002b20553788fd78bad1:/src/smpi/mpi/smpi_request.cpp

diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp
index 99badd85ae..93098dc510 100644
--- a/src/smpi/mpi/smpi_request.cpp
+++ b/src/smpi/mpi/smpi_request.cpp
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -18,6 +18,7 @@
 #include "src/smpi/include/smpi_actor.hpp"
 
 #include <algorithm>
+#include <array>
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
@@ -61,7 +62,6 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, int src, int
   else
     refcount_ = 0;
   cancelled_ = 0;
-  generalized_funcs=nullptr;
   nbc_requests_=nullptr;
   nbc_requests_size_=0;
   init_buffer(count);
@@ -82,7 +82,6 @@ void Request::unref(MPI_Request* request)
   if((*request)->refcount_==0){
     if ((*request)->flags_ & MPI_REQ_GENERALIZED){
       ((*request)->generalized_funcs)->free_fn(((*request)->generalized_funcs)->extra_state);
-      delete (*request)->generalized_funcs;
     }else{
       Comm::unref((*request)->comm_);
       Datatype::unref((*request)->old_type_);
@@ -119,8 +118,9 @@ bool Request::match_common(MPI_Request req, MPI_Request sender, MPI_Request rece
       receiver->real_src_ = sender->src_;
     if (receiver->tag_ == MPI_ANY_TAG)
       receiver->real_tag_ = sender->tag_;
-    if (receiver->real_size_ < sender->real_size_)
+    if (receiver->real_size_ < sender->real_size_){
       receiver->truncated_ = true;
+    }
     if (sender->detached_)
       receiver->detached_sender_ = sender; // tie the sender to the receiver, as it is detached and has to be freed in
                                            // the receiver
@@ -337,8 +337,8 @@ void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype
                        void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
                        MPI_Comm comm, MPI_Status * status)
 {
-  MPI_Request requests[2];
-  MPI_Status stats[2];
+  std::array<MPI_Request, 2> requests;
+  std::array<MPI_Status, 2> stats;
   int myid = simgrid::s4u::this_actor::get_pid();
   if ((comm->group()->actor(dst)->get_pid() == myid) && (comm->group()->actor(src)->get_pid() == myid)) {
     Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
@@ -352,8 +352,8 @@ void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype
   }
   requests[0] = isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
   requests[1] = irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
-  startall(2, requests);
-  waitall(2, requests, stats);
+  startall(2, requests.data());
+  waitall(2, requests.data(), stats.data());
   unref(&requests[0]);
   unref(&requests[1]);
   if(status != MPI_STATUS_IGNORE) {
@@ -376,6 +376,8 @@ void Request::start()
   flags_ &= ~MPI_REQ_FINISHED;
   this->ref();
 
+  // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
+  real_size_=size_;
   if ((flags_ & MPI_REQ_RECV) != 0) {
     this->print_request("New recv");
 
@@ -419,8 +421,6 @@ void Request::start()
       }
     }
 
-    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
-    real_size_=size_;
     action_   = simcall_comm_irecv(
         process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv,
         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0);
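
The sendrecv() hunks above swap C arrays for std::array while still handing raw pointers to the C-style startall()/waitall() interfaces through .data(). A minimal standalone sketch of that pattern; takes_raw() is an illustrative stand-in, not an SMPI function:

```cpp
#include <array>
#include <cstdio>

// hypothetical C-style sink, standing in for startall()/waitall()
static void takes_raw(const int* v, int n)
{
  for (int i = 0; i < n; i++)
    std::printf("%d\n", v[i]);
}

int main()
{
  std::array<int, 2> values{{1, 2}}; // replaces "int values[2];"
  // .data() yields the same raw pointer a C array would decay to
  takes_raw(values.data(), static_cast<int>(values.size()));
}
```
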
@@ -511,8 +511,6 @@ void Request::start()
       XBT_DEBUG("Send request %p is in the large mailbox %s (buf: %p)", this, mailbox->get_cname(), buf_);
     }
 
-    // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
-    real_size_=size_;
     size_t payload_size_ = size_ + 16;//MPI enveloppe size (tag+dest+communicator)
     action_   = simcall_comm_isend(
         simgrid::s4u::Actor::by_pid(src_)->get_impl(), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_, &match_send,
@@ -552,11 +550,13 @@ void Request::cancel()
 }
 
 int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
-  //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
+  // assume that *request is not MPI_REQUEST_NULL (filtered in PMPI_Test or testall before)
   // to avoid deadlocks if used as a break condition, such as
   //    while (MPI_Test(request, flag, status) && flag) dostuff...
   // because the time will not normally advance when only calls to MPI_Test are made -> deadlock
   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
+  xbt_assert(*request != MPI_REQUEST_NULL);
+
   static int nsleeps = 1;
   int ret = MPI_SUCCESS;
 
@@ -586,23 +586,20 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
       return ret;
     }
   }
-  if (*request != MPI_REQUEST_NULL &&
-      ((*request)->flags_ & MPI_REQ_GENERALIZED)
-      && !((*request)->flags_ & MPI_REQ_COMPLETE))
+  if (((*request)->flags_ & MPI_REQ_GENERALIZED) && !((*request)->flags_ & MPI_REQ_COMPLETE))
     *flag=0;
   if (*flag) {
-    finish_wait(request,status);
+    finish_wait(request, status); // may invalidate *request
     if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
+      MPI_Status tmp_status;
       MPI_Status* mystatus;
-      if(status==MPI_STATUS_IGNORE){
-        mystatus=new MPI_Status();
+      if (status == MPI_STATUS_IGNORE) {
+        mystatus = &tmp_status;
         Status::empty(mystatus);
-      }else{
-        mystatus=status;
+      } else {
+        mystatus = status;
       }
       ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
-      if(status==MPI_STATUS_IGNORE)
-        delete mystatus;
     }
     nsleeps=1;//reset the number of sleeps we will do next time
     if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_PERSISTENT) == 0)
@@ -685,16 +682,15 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M
   } else {
     finish_wait(&requests[*index],status);
     if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_GENERALIZED)){
+      MPI_Status tmp_status;
       MPI_Status* mystatus;
-      if(status==MPI_STATUS_IGNORE){
-        mystatus=new MPI_Status();
+      if (status == MPI_STATUS_IGNORE) {
+        mystatus = &tmp_status;
         Status::empty(mystatus);
-      }else{
-        mystatus=status;
+      } else {
+        mystatus = status;
       }
       ret=(requests[*index]->generalized_funcs)->query_fn((requests[*index]->generalized_funcs)->extra_state, mystatus);
-      if(status==MPI_STATUS_IGNORE)
-        delete mystatus;
     }
 
     if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT))
@@ -895,11 +891,31 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
 
   if (req->flags_ & MPI_REQ_PERSISTENT)
     req->action_ = nullptr;
   req->flags_ |= MPI_REQ_FINISHED;
+
+  if (req->truncated_) {
+    char error_string[MPI_MAX_ERROR_STRING];
+    int error_size;
+    PMPI_Error_string(MPI_ERR_TRUNCATE, error_string, &error_size);
+    MPI_Errhandler err = (req->comm_) ? (req->comm_)->errhandler() : MPI_ERRHANDLER_NULL;
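
The test()/testany() hunks above drop a new/delete pair by pointing at a stack temporary whenever the caller passes MPI_STATUS_IGNORE. A self-contained sketch of the idiom; Status, STATUS_IGNORE and query() are illustrative stand-ins for the MPI types, not SMPI internals:

```cpp
#include <cstdio>

struct Status { int value = 0; };
static Status* const STATUS_IGNORE = nullptr; // stand-in for MPI_STATUS_IGNORE

static void query(Status* out) { out->value = 42; } // stand-in for query_fn

static int run(Status* status)
{
  Status tmp_status; // stack scratch space, released automatically
  Status* mystatus = (status == STATUS_IGNORE) ? &tmp_status : status;
  query(mystatus);        // always has valid storage to write to
  return mystatus->value; // no delete needed in either branch
}

int main()
{
  Status s;
  std::printf("%d %d\n", run(&s), run(STATUS_IGNORE));
}
```
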
+    if (err == MPI_ERRHANDLER_NULL || err == MPI_ERRORS_RETURN)
+      XBT_WARN("recv - returned %.*s instead of MPI_SUCCESS", error_size, error_string);
+    else if (err == MPI_ERRORS_ARE_FATAL)
+      xbt_die("recv - returned %.*s instead of MPI_SUCCESS", error_size, error_string);
+    else
+      err->call((req->comm_), MPI_ERR_TRUNCATE);
+    if (err != MPI_ERRHANDLER_NULL)
+      simgrid::smpi::Errhandler::unref(err);
+    MC_assert(not MC_is_active()); /* Only fail in MC mode */
+  }
   unref(request);
+
 }
 
 int Request::wait(MPI_Request * request, MPI_Status * status)
 {
+  // assume that *request is not MPI_REQUEST_NULL (filtered in PMPI_Wait before)
+  xbt_assert(*request != MPI_REQUEST_NULL);
+
   int ret=MPI_SUCCESS;
   // Are we waiting on a request meant for non blocking collectives ?
   // If so, wait for all the subrequests.
@@ -943,25 +959,24 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
       }
     }
 
-  if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_GENERALIZED)){
-    MPI_Status* mystatus;
+  if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
     if(!((*request)->flags_ & MPI_REQ_COMPLETE)){
       ((*request)->generalized_funcs)->mutex->lock();
       ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
       ((*request)->generalized_funcs)->mutex->unlock();
-      }
-    if(status==MPI_STATUS_IGNORE){
-      mystatus=new MPI_Status();
+    }
+    MPI_Status tmp_status;
+    MPI_Status* mystatus;
+    if (status == MPI_STATUS_IGNORE) {
+      mystatus = &tmp_status;
       Status::empty(mystatus);
-    }else{
-      mystatus=status;
+    } else {
+      mystatus = status;
     }
     ret = ((*request)->generalized_funcs)->query_fn(((*request)->generalized_funcs)->extra_state, mystatus);
-    if(status==MPI_STATUS_IGNORE)
-      delete mystatus;
   }
 
-  finish_wait(request,status);
+  finish_wait(request, status); // may invalidate *request
   if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
     *request = MPI_REQUEST_NULL;
   return ret;
@@ -969,12 +984,11 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
 
 int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
 {
-  std::vector<simgrid::kernel::activity::CommImpl*> comms;
-  comms.reserve(count);
-
   int index = MPI_UNDEFINED;
 
   if(count > 0) {
     // Wait for a request to complete
+    std::vector<simgrid::kernel::activity::CommImpl*> comms;
     std::vector<int> map;
     XBT_DEBUG("Wait for one of %d", count);
     for(int i = 0; i < count; i++) {
@@ -986,7 +1000,7 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
         map.push_back(i);
       } else {
         // This is a finished detached request, let's return this one
-        comms.clear(); // so we free don't do the waitany call
+        comms.clear(); // don't do the waitany call afterwards
         index = i;
         finish_wait(&requests[i], status); // cleanup if refcount = 0
         if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
@@ -997,13 +1011,12 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
     }
     if (not comms.empty()) {
       XBT_DEBUG("Enter waitany for %zu comms", comms.size());
-      int i=MPI_UNDEFINED;
+      int i;
       try{
-        // this is not a detached send
         i = simcall_comm_waitany(comms.data(), comms.size(), -1);
       } catch (const Exception&) {
-        XBT_INFO("request %d cancelled ", i);
-        return i;
+        XBT_INFO("request cancelled");
+        i = -1;
       }
 
       // not MPI_UNDEFINED, as this is a simix return code
@@ -1117,17 +1130,15 @@ int Request::waitsome(int incount, MPI_Request requests[], int *indices, MPI_Sta
 
 MPI_Request Request::f2c(int id)
 {
-  char key[KEY_SIZE];
   if(id==MPI_FORTRAN_REQUEST_NULL)
     return MPI_REQUEST_NULL;
-  return static_cast<MPI_Request>(F2C::f2c_lookup()->at(get_key(key,id)));
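
In the waitany() hunk above, the catch block previously logged the index variable (which the throwing call never got to assign) and returned it directly, skipping the shared cleanup; the rewrite records a sentinel and falls through. A plain C++ analog of that fix, with may_cancel() as a hypothetical stand-in for simcall_comm_waitany():

```cpp
#include <cstdio>
#include <stdexcept>

// hypothetical stand-in for simcall_comm_waitany(), which may throw on cancellation
static int may_cancel(bool cancel)
{
  if (cancel)
    throw std::runtime_error("cancelled");
  return 0;
}

static int wait_one(bool cancel)
{
  int i;
  try {
    i = may_cancel(cancel);
  } catch (const std::exception&) {
    std::printf("request cancelled\n");
    i = -1; // sentinel handled by the common code below, instead of "return i;"
  }
  return i; // both paths now reach the shared cleanup
}

int main()
{
  std::printf("%d %d\n", wait_one(false), wait_one(true));
}
```
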
+  return static_cast<MPI_Request>(F2C::lookup()->at(id));
 }
 
 void Request::free_f(int id)
 {
   if (id != MPI_FORTRAN_REQUEST_NULL) {
-    char key[KEY_SIZE];
-    F2C::f2c_lookup()->erase(get_key(key, id));
+    F2C::lookup()->erase(id);
   }
 }
 
@@ -1166,7 +1177,7 @@ int Request::grequest_start(MPI_Grequest_query_function* query_fn, MPI_Grequest_
   (*request)->flags_ |= MPI_REQ_GENERALIZED;
   (*request)->flags_ |= MPI_REQ_PERSISTENT;
   (*request)->refcount_ = 1;
-  ((*request)->generalized_funcs) = new s_smpi_mpi_generalized_request_funcs_t;
+  ((*request)->generalized_funcs) = std::make_unique<s_smpi_mpi_generalized_request_funcs_t>();
   ((*request)->generalized_funcs)->query_fn=query_fn;
   ((*request)->generalized_funcs)->free_fn=free_fn;
   ((*request)->generalized_funcs)->cancel_fn=cancel_fn;
@@ -1178,7 +1189,7 @@ int Request::grequest_start(MPI_Grequest_query_function* query_fn, MPI_Grequest_
 
 int Request::grequest_complete(MPI_Request request)
 {
-  if ((!(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex==NULL)
+  if ((!(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr)
     return MPI_ERR_REQUEST;
   request->generalized_funcs->mutex->lock();
   request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
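
The grequest_start() hunk above, together with the deleted `delete` in unref() and the removed nullptr initialization in the constructor, suggests generalized_funcs is now a std::unique_ptr. A minimal sketch of that ownership change, with illustrative stand-in types rather than SMPI's:

```cpp
#include <memory>

struct generalized_funcs_t { int extra_state = 0; }; // illustrative stand-in

struct RequestLike {
  std::unique_ptr<generalized_funcs_t> generalized_funcs; // was a raw pointer
};

int main()
{
  RequestLike req;
  req.generalized_funcs = std::make_unique<generalized_funcs_t>();
  // no explicit delete and no "= nullptr" in the constructor:
  // unique_ptr default-constructs empty and releases on destruction
}
```
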