X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/97e2219ed6c0e511f6165460cec79afadf42f589..b2852b7c61948f495d7437ffaa7fd9aced12849c:/src/smpi/mpi/smpi_file.cpp diff --git a/src/smpi/mpi/smpi_file.cpp b/src/smpi/mpi/smpi_file.cpp index 531832243d..47c73ebf2f 100644 --- a/src/smpi/mpi/smpi_file.cpp +++ b/src/smpi/mpi/smpi_file.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -17,14 +17,15 @@ #include "simgrid/s4u/Host.hpp" #include "simgrid/plugins/file_system.h" +#include // std::scoped_lock + #define FP_SIZE sizeof(MPI_Offset) XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_io, smpi, "Logging specific to SMPI (RMA operations)"); MPI_Errhandler SMPI_default_File_Errhandler = _smpi_cfg_default_errhandler_is_error ? MPI_ERRORS_ARE_FATAL : MPI_ERRORS_RETURN;; -namespace simgrid{ -namespace smpi{ +namespace simgrid::smpi { File::File(MPI_Comm comm, const char* filename, int amode, MPI_Info info) : comm_(comm), flags_(amode), info_(info) { @@ -34,10 +35,9 @@ File::File(MPI_Comm comm, const char* filename, int amode, MPI_Info info) : comm xbt_assert(not simgrid::s4u::Host::current()->get_disks().empty(), "SMPI/IO : Trying to open file on a diskless host ! Add one to your platform file"); - size_t found = fullname.find('/'); // in case no fullpath is provided ... just pick the first mountpoint. - if (found == std::string::npos || fullname.rfind("./", 1) != std::string::npos) { - auto disk = simgrid::s4u::Host::current()->get_disks().front(); + if (size_t found = fullname.find('/'); found == std::string::npos || fullname.rfind("./", 1) != std::string::npos) { + const auto* disk = simgrid::s4u::Host::current()->get_disks().front(); std::string mount; if (disk->get_host() != simgrid::s4u::Host::current()) mount = disk->extension()->get_mount_point(disk->get_host()); @@ -51,15 +51,17 @@ File::File(MPI_Comm comm, const char* filename, int amode, MPI_Info info) : comm fullname.insert(0, mount); } } - + XBT_DEBUG("Opening %s", fullname.c_str()); file_ = simgrid::s4u::File::open(fullname, nullptr); list_ = nullptr; + disp_ = 0; + etype_ = MPI_BYTE; + atomicity_ = true; if (comm_->rank() == 0) { int size = comm_->size() + FP_SIZE; - list_ = new char[size]; + list_ = new char[size](); errhandler_ = SMPI_default_File_Errhandler; errhandler_->ref(); - memset(list_, 0, size); shared_file_pointer_ = new MPI_Offset(); shared_mutex_ = s4u::Mutex::create(); *shared_file_pointer_ = 0; @@ -110,15 +112,14 @@ int File::del(const char* filename, const Info*) int File::get_position(MPI_Offset* offset) const { - *offset = file_->tell(); + *offset = file_->tell()/etype_->get_extent(); return MPI_SUCCESS; } int File::get_position_shared(MPI_Offset* offset) const { - shared_mutex_->lock(); - *offset = *shared_file_pointer_; - shared_mutex_->unlock(); + const std::scoped_lock lock(*shared_mutex_); + *offset = *shared_file_pointer_/etype_->get_extent(); return MPI_SUCCESS; } @@ -145,10 +146,9 @@ int File::seek(MPI_Offset offset, int whence) int File::seek_shared(MPI_Offset offset, int whence) { - shared_mutex_->lock(); + const std::scoped_lock lock(*shared_mutex_); seek(offset, whence); *shared_file_pointer_ = file_->tell(); - shared_mutex_->unlock(); return MPI_SUCCESS; } @@ -158,9 +158,9 @@ int File::read(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype, MPI_Offset position = fh->file_->tell(); MPI_Offset movesize = datatype->get_extent() * count; MPI_Offset readsize = datatype->size() * count; - XBT_DEBUG("Position before read in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell()); + XBT_DEBUG("Position before read in MPI_File %s : %llu, size %llu", fh->file_->get_path(), fh->file_->tell(), fh->file_->size()); MPI_Offset read = fh->file_->read(readsize); - XBT_VERB("Read in MPI_File %s, %lld bytes read, readsize %lld bytes, movesize %lld", fh->file_->get_path(), read, + XBT_VERB("Read in MPI_File %s, %lld bytes read, count %d, readsize %lld bytes, movesize %lld", fh->file_->get_path(), read, count, readsize, movesize); if (readsize != movesize) { fh->file_->seek(position + movesize, SEEK_SET); @@ -183,11 +183,12 @@ int File::read(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype, /* }*/ int File::read_shared(MPI_File fh, void* buf, int count, const Datatype* datatype, MPI_Status* status) { - fh->shared_mutex_->lock(); + if (const std::scoped_lock lock(*fh->shared_mutex_); true) { + fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET); + read(fh, buf, count, datatype, status); + *(fh->shared_file_pointer_) = fh->file_->tell(); + } fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET); - read(fh, buf, count, datatype, status); - *(fh->shared_file_pointer_) = fh->file_->tell(); - fh->shared_mutex_->unlock(); return MPI_SUCCESS; } @@ -203,15 +204,17 @@ int File::read_ordered(MPI_File fh, void* buf, int count, const Datatype* dataty MPI_Offset result; simgrid::smpi::colls::scan(&val, &result, 1, MPI_OFFSET, MPI_SUM, fh->comm_); + MPI_Offset prev; + fh->get_position(&prev); fh->seek(result, MPI_SEEK_SET); int ret = fh->op_all(buf, count, datatype, status); if (fh->comm_->rank() == fh->comm_->size() - 1) { - fh->shared_mutex_->lock(); + const std::scoped_lock lock(*fh->shared_mutex_); *(fh->shared_file_pointer_)=fh->file_->tell(); - fh->shared_mutex_->unlock(); } char c; simgrid::smpi::colls::bcast(&c, 1, MPI_BYTE, fh->comm_->size() - 1, fh->comm_); + fh->seek(prev, MPI_SEEK_SET); return ret; } @@ -221,10 +224,10 @@ int File::write(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype, MPI_Offset position = fh->file_->tell(); MPI_Offset movesize = datatype->get_extent() * count; MPI_Offset writesize = datatype->size() * count; - XBT_DEBUG("Position before write in MPI_File %s : %llu", fh->file_->get_path(), fh->file_->tell()); + XBT_DEBUG("Position before write in MPI_File %s : %llu, size %llu", fh->file_->get_path(), fh->file_->tell(), fh->file_->size()); MPI_Offset write = fh->file_->write(writesize, true); - XBT_VERB("Write in MPI_File %s, %lld bytes written, readsize %lld bytes, movesize %lld", fh->file_->get_path(), write, - writesize, movesize); + XBT_VERB("Write in MPI_File %s, %lld bytes written, count %d, writesize %lld bytes, movesize %lld", fh->file_->get_path(), write, + count, writesize, movesize); if (writesize != movesize) { fh->file_->seek(position + movesize, SEEK_SET); } @@ -236,13 +239,13 @@ int File::write(MPI_File fh, void* /*buf*/, int count, const Datatype* datatype, int File::write_shared(MPI_File fh, const void* buf, int count, const Datatype* datatype, MPI_Status* status) { - fh->shared_mutex_->lock(); + const std::scoped_lock lock(*fh->shared_mutex_); XBT_DEBUG("Write shared on %s - Shared ptr before : %lld", fh->file_->get_path(), *(fh->shared_file_pointer_)); fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET); write(fh, const_cast(buf), count, datatype, status); *(fh->shared_file_pointer_) = fh->file_->tell(); XBT_DEBUG("Write shared on %s - Shared ptr after : %lld", fh->file_->get_path(), *(fh->shared_file_pointer_)); - fh->shared_mutex_->unlock(); + fh->seek(*(fh->shared_file_pointer_), MPI_SEEK_SET); return MPI_SUCCESS; } @@ -257,29 +260,39 @@ int File::write_ordered(MPI_File fh, const void* buf, int count, const Datatype* } MPI_Offset result; simgrid::smpi::colls::scan(&val, &result, 1, MPI_OFFSET, MPI_SUM, fh->comm_); + MPI_Offset prev; + fh->get_position(&prev); fh->seek(result, MPI_SEEK_SET); int ret = fh->op_all(const_cast(buf), count, datatype, status); if (fh->comm_->rank() == fh->comm_->size() - 1) { - fh->shared_mutex_->lock(); + const std::scoped_lock lock(*fh->shared_mutex_); *(fh->shared_file_pointer_)=fh->file_->tell(); - fh->shared_mutex_->unlock(); } char c; simgrid::smpi::colls::bcast(&c, 1, MPI_BYTE, fh->comm_->size() - 1, fh->comm_); + fh->seek(prev, MPI_SEEK_SET); return ret; } -int File::set_view(MPI_Offset /*disp*/, MPI_Datatype etype, MPI_Datatype filetype, const char* datarep, const Info*) +int File::set_view(MPI_Offset disp, MPI_Datatype etype, MPI_Datatype filetype, const char* datarep, const Info*) { etype_ = etype; filetype_ = filetype; - datarep_ = std::string(datarep); - seek_shared(0, MPI_SEEK_SET); + datarep_ = datarep; + disp_ = disp; + if (comm_->rank() == 0){ + if(disp != MPI_DISPLACEMENT_CURRENT) + seek_shared(disp, MPI_SEEK_SET); + else + seek_shared(0, MPI_SEEK_CUR); + } + sync(); return MPI_SUCCESS; } -int File::get_view(MPI_Offset* /*disp*/, MPI_Datatype* etype, MPI_Datatype* filetype, char* datarep) const +int File::get_view(MPI_Offset* disp, MPI_Datatype* etype, MPI_Datatype* filetype, char* datarep) const { + *disp = disp_; *etype = etype_; *filetype = filetype_; snprintf(datarep, MPI_MAX_NAME_STRING + 1, "%s", datarep_.c_str()); @@ -301,6 +314,11 @@ int File::flags() const return flags_; } +MPI_Datatype File::etype() const +{ + return etype_; +} + int File::sync() { // no idea @@ -346,5 +364,14 @@ File* File::f2c(int id) { return static_cast(F2C::f2c(id)); } -} // namespace smpi -} // namespace simgrid + +void File::set_atomicity(bool a){ + atomicity_ = a; +} + +bool File::get_atomicity() const +{ + return atomicity_; +} + +} // namespace simgrid::smpi