From 7233604a5595de9f7bede52733f234615a26504c Mon Sep 17 00:00:00 2001
From: Augustin Degomme
Date: Wed, 17 Apr 2019 13:49:06 +0200
Subject: [PATCH] Read_all, Write_all, Read_at_all, Write_at_all

Imperfect algorithm, needs to be tested
---
 src/smpi/bindings/smpi_mpi.cpp       |   8 +-
 src/smpi/bindings/smpi_pmpi_file.cpp |  79 +++++++++++++++++++
 src/smpi/include/smpi_file.hpp       | 113 +++++++++++++++++++++++++++
 3 files changed, 196 insertions(+), 4 deletions(-)

diff --git a/src/smpi/bindings/smpi_mpi.cpp b/src/smpi/bindings/smpi_mpi.cpp
index 30330d940b..16018837e1 100644
--- a/src/smpi/bindings/smpi_mpi.cpp
+++ b/src/smpi/bindings/smpi_mpi.cpp
@@ -359,17 +359,17 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_get_info,(MPI_File fh, MPI_Info *i
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_set_view,(MPI_File fh, MPI_Offset disp, MPI_Datatype etype, MPI_Datatype filetype, char *datarep, MPI_Info info), (fh, disp, etype, filetype, datarep, info))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_get_view,(MPI_File fh, MPI_Offset *disp, MPI_Datatype *etype, MPI_Datatype *filetype, char *datarep), (fh, disp, etype, filetype, datarep))
 WRAPPED_PMPI_CALL(int, MPI_File_read_at,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, offset, buf, count, datatype, status))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read_at_all,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, offset, buf, count, datatype, status))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_write_at_all,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Status *status), (fh, offset, buf, count, datatype, status))
+WRAPPED_PMPI_CALL(int, MPI_File_read_at_all,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, offset, buf, count, datatype, status))
 WRAPPED_PMPI_CALL(int, MPI_File_write_at,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, offset, buf, count, datatype, status))
+WRAPPED_PMPI_CALL(int, MPI_File_write_at_all,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Status *status), (fh, offset, buf, count, datatype, status))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_at,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_at,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_at_all,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_at_all,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request))
 WRAPPED_PMPI_CALL(int, MPI_File_read,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_write_all,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
+WRAPPED_PMPI_CALL(int, MPI_File_read_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
 WRAPPED_PMPI_CALL(int, MPI_File_write,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
+WRAPPED_PMPI_CALL(int, MPI_File_write_all,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request))
diff --git a/src/smpi/bindings/smpi_pmpi_file.cpp b/src/smpi/bindings/smpi_pmpi_file.cpp
index 5844463120..607ce16303 100644
--- a/src/smpi/bindings/smpi_pmpi_file.cpp
+++ b/src/smpi/bindings/smpi_pmpi_file.cpp
@@ -99,6 +99,42 @@ int PMPI_File_write(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI
   }
 }
 
+int PMPI_File_read_all(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){
+  CHECK_FILE(fh);
+  CHECK_BUFFER(buf, count);
+  CHECK_COUNT(count);
+  CHECK_DATATYPE(datatype, count);
+  CHECK_STATUS(status);
+  CHECK_FLAGS(fh);
+  else {
+    smpi_bench_end();
+    int rank_traced = simgrid::s4u::this_actor::get_pid();
+    TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::CpuTIData("IO - read_all", static_cast<double>(count*datatype->size())));
+    int ret = fh->op_all<simgrid::smpi::File::read>(buf, count, datatype, status);
+    TRACE_smpi_comm_out(rank_traced);
+    smpi_bench_begin();
+    return ret;
+  }
+}
+
+int PMPI_File_write_all(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){
+  CHECK_FILE(fh);
+  CHECK_BUFFER(buf, count);
+  CHECK_COUNT(count);
+  CHECK_DATATYPE(datatype, count);
+  CHECK_STATUS(status);
+  CHECK_FLAGS(fh);
+  else {
+    smpi_bench_end();
+    int rank_traced = simgrid::s4u::this_actor::get_pid();
+    TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::CpuTIData("IO - write_all", static_cast<double>(count*datatype->size())));
+    int ret = fh->op_all<simgrid::smpi::File::write>(buf, count, datatype, status);
+    TRACE_smpi_comm_out(rank_traced);
+    smpi_bench_begin();
+    return ret;
+  }
+}
+
 int PMPI_File_read_at(MPI_File fh, MPI_Offset offset, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){
   CHECK_FILE(fh);
   CHECK_BUFFER(buf, count);
@@ -122,6 +158,27 @@ int PMPI_File_read_at(MPI_File fh, MPI_Offset offset, void *buf, int count,MPI_D
   }
 }
 
+int PMPI_File_read_at_all(MPI_File fh, MPI_Offset offset, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){
+  CHECK_FILE(fh);
+  CHECK_BUFFER(buf, count);
+  CHECK_OFFSET(offset);
+  CHECK_COUNT(count);
+  CHECK_DATATYPE(datatype, count);
+  CHECK_STATUS(status);
+  CHECK_FLAGS(fh);
+  else {
+    smpi_bench_end();
+    int rank_traced = simgrid::s4u::this_actor::get_pid();
+    TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::CpuTIData("IO - read_at_all", static_cast<double>(count*datatype->size())));
+    int ret = fh->seek(offset,SEEK_SET);
+    if(ret!=MPI_SUCCESS)
+      return ret;
+    ret = fh->op_all<simgrid::smpi::File::read>(buf, count, datatype, status);
+    TRACE_smpi_comm_out(rank_traced);
+    smpi_bench_begin();
+    return ret;
+  }
+}
 
 int PMPI_File_write_at(MPI_File fh, MPI_Offset offset, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){
   CHECK_FILE(fh);
@@ -146,6 +203,28 @@ int PMPI_File_write_at(MPI_File fh, MPI_Offset offset, void *buf, int count,MPI_
   }
 }
 
+int PMPI_File_write_at_all(MPI_File fh, MPI_Offset offset, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){
+  CHECK_FILE(fh);
+  CHECK_BUFFER(buf, count);
+  CHECK_OFFSET(offset);
+  CHECK_COUNT(count);
+  CHECK_DATATYPE(datatype, count);
+  CHECK_STATUS(status);
+  CHECK_FLAGS(fh);
+  else {
+    smpi_bench_end();
+    int rank_traced = simgrid::s4u::this_actor::get_pid();
+    TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::CpuTIData("IO - write_at_all", static_cast<double>(count*datatype->size())));
+    int ret = fh->seek(offset,SEEK_SET);
+    if(ret!=MPI_SUCCESS)
+      return ret;
+    ret = fh->op_all<simgrid::smpi::File::write>(buf, count, datatype, status);
+    TRACE_smpi_comm_out(rank_traced);
+    smpi_bench_begin();
+    return ret;
+  }
+}
+
 int PMPI_File_delete(char *filename, MPI_Info info){
   if (filename == nullptr) {
     return MPI_ERR_FILE;
diff --git a/src/smpi/include/smpi_file.hpp b/src/smpi/include/smpi_file.hpp
index 45552ca694..7b2cdff441 100644
--- a/src/smpi/include/smpi_file.hpp
+++ b/src/smpi/include/smpi_file.hpp
@@ -7,6 +7,11 @@
 #ifndef SMPI_FILE_HPP_INCLUDED
 #define SMPI_FILE_HPP_INCLUDED
 #include "simgrid/plugins/file_system.h"
+#include "smpi_comm.hpp"
+#include "smpi_coll.hpp"
+#include "smpi_datatype.hpp"
+#include "smpi_info.hpp"
+#include <vector>
 
 namespace simgrid{
@@ -25,9 +30,117 @@ class File{
   int seek(MPI_Offset offset, int whence);
   static int read(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
   static int write(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
+  template <int (*T)(MPI_File, void *, int, MPI_Datatype, MPI_Status *)> int op_all(void *buf, int count,MPI_Datatype datatype, MPI_Status *status);
   static int close(MPI_File *fh);
   static int del(char *filename, MPI_Info info);
 };
+
+  template <int (*T)(MPI_File, void *, int, MPI_Datatype, MPI_Status *)>
+  int File::op_all(void *buf, int count, MPI_Datatype datatype, MPI_Status *status){
+    //get min and max offsets from everyone.
+    int size = comm_->size();
+    int rank = comm_-> rank();
+    MPI_Offset min_offset = file_->tell();
+    MPI_Offset max_offset = min_offset + count * datatype->size();//cheating, as we don't care about exact data location, we can skip extent
+    MPI_Offset* min_offsets = xbt_new(MPI_Offset, size);
+    MPI_Offset* max_offsets = xbt_new(MPI_Offset, size);
+    simgrid::smpi::Colls::allgather(&min_offset, 1, MPI_OFFSET, min_offsets, 1, MPI_OFFSET, comm_);
+    simgrid::smpi::Colls::allgather(&max_offset, 1, MPI_OFFSET, max_offsets, 1, MPI_OFFSET, comm_);
+    MPI_Offset min=min_offset;
+    MPI_Offset max=max_offset;
+    MPI_Offset tot= 0;
+    for(int i=0;i<size;i++){
+      tot+=(max_offsets[i]-min_offsets[i]);
+      if(min_offsets[i]<min)
+        min=min_offsets[i];
+      if(max_offsets[i]>max)
+        max=max_offsets[i];
+    }
+    MPI_Offset total = max-min;
+    if(total==tot && (datatype->flags() & DT_FLAG_CONTIGUOUS)){
+      //contiguous. Just have each proc perform its read
+      return T(this,buf,count,datatype, status);
+    }
+
+    //Interleaved case : How much do I need to read, and whom to send it ?
+    MPI_Offset my_chunk_start=(max-min)/size*rank;
+    MPI_Offset my_chunk_end=((max-min)/size*(rank+1))-1;
+    int* send_sizes = xbt_new0(int, size);
+    int* recv_sizes = xbt_new(int, size);
+    int* send_disps = xbt_new(int, size);
+    int* recv_disps = xbt_new(int, size);
+    int total_sent=0;
+    for(int i=0;i<size;i++){
+      if((my_chunk_start>=min_offsets[i] && my_chunk_start < max_offsets[i])||
+          ((my_chunk_end<=max_offsets[i]) && my_chunk_end> min_offsets[i])){
+        send_sizes[i]=(std::min(max_offsets[i], my_chunk_end+1)-std::max(min_offsets[i], my_chunk_start));
+        //store min and max offest to actually read
+        min_offset=std::max(min_offsets[i], my_chunk_start);
+        max_offset=std::min(max_offsets[i], my_chunk_end+1);
+        send_disps[i]=0;//send_sizes[i]; cheat to avoid issues when send>recv as we use recv buffer
+        total_sent+=send_sizes[i];
+      }
+    }
+
+    //merge the ranges of every process
+    std::vector<std::pair<MPI_Offset, MPI_Offset>> ranges;
+    for(int i=0; i<size; i++)
+      ranges.push_back(std::make_pair(min_offsets[i],max_offsets[i]));
+    std::sort(ranges.begin(), ranges.end());
+    std::vector<std::pair<MPI_Offset, MPI_Offset>> chunks;
+    chunks.push_back(ranges[0]);
+
+    unsigned int nchunks=0;
+    unsigned int i=1;
+    while(i < ranges.size()){
+      if(ranges[i].second>chunks[nchunks].second){
+        // else range included - ignore
+        if(ranges[i].first>chunks[nchunks].second){
+          //new disjoint range
+          chunks.push_back(ranges[i]);
+          nchunks++;
+        } else {
+          //merge ranges
+          chunks[nchunks].second=ranges[i].second;
+        }
+      }
+      i++;
+    }
+    //what do I need to read ?
+    MPI_Offset totreads=0;
+    for(i=0; i<chunks.size(); i++){
+      if(chunks[i].second < my_chunk_start || chunks[i].first > my_chunk_end)
+        continue;
+      else
+        totreads += (std::min(chunks[i].second, my_chunk_end+1)-std::max(chunks[i].first, my_chunk_start));
+    }
+    char* sendbuf= static_cast<char*>(smpi_get_tmp_sendbuffer(totreads));
+
+    if(totreads>0){
+      seek(min_offset, MPI_SEEK_SET);
+      T(this,sendbuf,totreads/datatype->size(),datatype, status);
+    }
+    simgrid::smpi::Colls::alltoall(send_sizes, 1, MPI_INT, recv_sizes, 1, MPI_INT, comm_);
+    int total_recv=0;
+    for(int i=0;i