Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
read, seek + init
authordegomme <adegomme@users.noreply.github.com>
Mon, 15 Apr 2019 14:02:38 +0000 (16:02 +0200)
committerdegomme <adegomme@users.noreply.github.com>
Mon, 15 Apr 2019 14:02:38 +0000 (16:02 +0200)
TODO: properly init plugin, only when needed.

src/smpi/bindings/smpi_mpi.cpp
src/smpi/bindings/smpi_pmpi_file.cpp
src/smpi/include/smpi_file.hpp
src/smpi/internals/instr_smpi.cpp
src/smpi/internals/smpi_global.cpp
src/smpi/mpi/smpi_file.cpp

index 0305b39..83386ad 100644 (file)
@@ -366,7 +366,7 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_at,(MPI_File fh, MPI_Offset
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_at,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_at_all,(MPI_File fh, MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_at_all,(MPI_File fh, MPI_Offset offset, void *buf,int count, MPI_Datatype datatype, MPI_Request *request), (fh, offset, buf, count, datatype, request))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
+WRAPPED_PMPI_CALL(int, MPI_File_read,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_write,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_write_all,(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
@@ -374,7 +374,7 @@ UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread,(MPI_File fh, void *buf, int
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iread_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_iwrite_all,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Request *request), (fh, buf, count, datatype, request))
-UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_seek,(MPI_File fh, MPI_Offset offset, int whenace), (fh, offset, whenace))
+WRAPPED_PMPI_CALL(int, MPI_File_seek,(MPI_File fh, MPI_Offset offset, int whenace), (fh, offset, whenace))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_get_position,(MPI_File fh, MPI_Offset *offset), (fh, offset))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_get_byte_offset,(MPI_File fh, MPI_Offset offset, MPI_Offset *disp), (fh, offset, disp))
 UNIMPLEMENTED_WRAPPED_PMPI_CALL(int, MPI_File_read_shared,(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status), (fh, buf, count, datatype, status))
index 8af99f1..5c14cdf 100644 (file)
@@ -5,6 +5,7 @@
 \r
 #include "private.hpp"\r
 #include "smpi_file.hpp"\r
+#include "smpi_datatype.hpp"\r
 \r
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(smpi_pmpi);\r
 \r
@@ -19,10 +20,13 @@ int PMPI_File_open(MPI_Comm comm, char *filename, int amode, MPI_Info info, MPI_
     smpi_bench_end();\r
     *fh =  new simgrid::smpi::File(comm, filename, amode, info);\r
     smpi_bench_begin();\r
-    if((*fh)->size()==0 && not amode & MPI_MODE_CREATE){\r
+    if (((*fh)->size() == 0 && (not amode & MPI_MODE_CREATE)) ||\r
+       ((*fh)->size() != 0 && (amode & MPI_MODE_EXCL))){\r
       delete fh;\r
       return MPI_ERR_AMODE;\r
     }\r
+    if(amode & MPI_MODE_APPEND)\r
+      (*fh)->seek(0,MPI_SEEK_END);\r
     return MPI_SUCCESS;\r
   }\r
 }\r
@@ -39,6 +43,42 @@ int PMPI_File_close(MPI_File *fh){
   }\r
 }\r
 \r
+int PMPI_File_seek(MPI_File fh, MPI_Offset offset, int whence){\r
+  if (fh==MPI_FILE_NULL){\r
+    return MPI_ERR_FILE;\r
+  } else {\r
+    smpi_bench_end();\r
+    int ret = fh->seek(offset,whence);\r
+    smpi_bench_begin();\r
+    return ret;\r
+  }\r
+}\r
+\r
+int PMPI_File_read(MPI_File fh, void *buf, int count,MPI_Datatype datatype, MPI_Status *status){\r
+  if (fh==MPI_FILE_NULL){\r
+    return MPI_ERR_FILE;\r
+  } else if (buf==nullptr && count > 0){\r
+    return MPI_ERR_BUFFER;\r
+  } else if ( count < 0){\r
+    return MPI_ERR_COUNT;\r
+  } else if ( datatype == MPI_DATATYPE_NULL && count > 0){\r
+    return MPI_ERR_TYPE;\r
+  } else if (status == nullptr){\r
+    return MPI_ERR_ARG;\r
+  } else if (fh->flags() & MPI_MODE_SEQUENTIAL){\r
+    return MPI_ERR_AMODE;\r
+  } else {\r
+    smpi_bench_end();\r
+    int rank_traced = simgrid::s4u::this_actor::get_pid();\r
+    TRACE_smpi_comm_in(rank_traced, __func__, new simgrid::instr::CpuTIData("IO - read", static_cast<double>(count*datatype->size())));\r
+    int ret = fh->read(buf, count, datatype, status);\r
+    TRACE_smpi_comm_out(rank_traced);\r
+    smpi_bench_begin();\r
+    return ret;\r
+  }\r
+}\r
+\r
+\r
 int PMPI_File_delete(char *filename, MPI_Info info){\r
   if (filename == nullptr) {\r
     return MPI_ERR_FILE;\r
@@ -48,4 +88,5 @@ int PMPI_File_delete(char *filename, MPI_Info info){
     smpi_bench_begin();\r
     return ret;\r
   }\r
-}
\ No newline at end of file
+}\r
+\r
index 9236747..9172071 100644 (file)
@@ -22,9 +22,11 @@ class File{
   int size();\r
   int flags();\r
   int sync();\r
+  int seek(MPI_Offset offset, int whence);\r
+  int read(void *buf, int count,MPI_Datatype datatype, MPI_Status *status);\r
   static int close(MPI_File *fh);\r
   static int del(char *filename, MPI_Info info);\r
 };\r
 }\r
 }\r
-#endif
\ No newline at end of file
+#endif\r
index 4d3ab5a..50112e0 100644 (file)
@@ -74,7 +74,9 @@ static std::map<std::string, std::string> smpi_colors = {{"recv", "1 0 0"},
   {"win_flush", "1 0 0.3"},
   {"win_flush_local", "1 0 0.8"},
   {"win_flush_all", "1 0.8 0"},
-  {"win_flush_local_all", "1 0 0.3"}
+  {"win_flush_local_all", "1 0 0.3"},
+  
+  {"file_read", "1 1 0.3"}
 };
 
 static const char* instr_find_color(const char* c_state)
index 9ad0c6d..5f0e988 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "mc/mc.h"
 #include "simgrid/s4u/Engine.hpp"
+#include "simgrid/plugins/file_system.h"
 #include "smpi_coll.hpp"
 #include "smpi_f2c.hpp"
 #include "smpi_host.hpp"
@@ -663,7 +664,7 @@ int smpi_main(const char* executable, int argc, char* argv[])
   SIMIX_global_init(&argc, argv);
 
   SMPI_switch_data_segment = &smpi_switch_data_segment;
-
+  sg_storage_file_system_init();
   // parse the platform file: get the host list
   simgrid::s4u::Engine::get_instance()->load_platform(argv[1]);
   SIMIX_comm_set_copy_data_callback(smpi_comm_copy_buffer_callback);
index 4cfbe05..35e36d4 100644 (file)
@@ -6,6 +6,7 @@
 \r
 #include "smpi_comm.hpp"\r
 #include "smpi_coll.hpp"\r
+#include "smpi_datatype.hpp"\r
 #include "smpi_info.hpp"\r
 #include "smpi_file.hpp"\r
 #include "simgrid/plugins/file_system.h"\r
@@ -23,31 +24,69 @@ namespace smpi{
   File::~File(){\r
     delete file_;\r
   }\r
-  \r
+\r
   int File::close(MPI_File *fh){\r
+    XBT_DEBUG("Closing MPI_File %s", (*fh)->file_->get_path());\r
     (*fh)->sync();\r
     if((*fh)->flags() & MPI_MODE_DELETE_ON_CLOSE)\r
       (*fh)->file_->unlink();\r
-    delete fh;\r
+    delete (*fh);\r
     return MPI_SUCCESS;\r
   }\r
-  \r
+\r
   int File::del(char *filename, MPI_Info info){\r
+    //get the file with MPI_MODE_DELETE_ON_CLOSE and then close it\r
     File* f = new File(MPI_COMM_SELF,filename,MPI_MODE_DELETE_ON_CLOSE|MPI_MODE_RDWR, nullptr);\r
     close(&f);\r
     return MPI_SUCCESS;\r
   }\r
+\r
+  int File::seek(MPI_Offset offset, int whence){\r
+    switch(whence){\r
+      case(MPI_SEEK_SET):\r
+        XBT_DEBUG("Seeking in MPI_File %s, setting offset %lld", file_->get_path(), offset);\r
+        file_->seek(offset,SEEK_SET);\r
+        break;\r
+      case(MPI_SEEK_CUR):\r
+        XBT_DEBUG("Seeking in MPI_File %s, current offset + %lld", file_->get_path(), offset);\r
+        file_->seek(offset,SEEK_CUR);\r
+        break;\r
+      case(MPI_SEEK_END):\r
+        XBT_DEBUG("Seeking in MPI_File %s, end offset + %lld", file_->get_path(), offset);\r
+        file_->seek(offset,SEEK_END);\r
+        break;\r
+      default:\r
+        return MPI_ERR_FILE;\r
+    }\r
+    return MPI_SUCCESS;\r
+  }\r
   \r
+  int File::read(void *buf, int count, MPI_Datatype datatype, MPI_Status *status){\r
+    //get position first as we may be doing non contiguous reads and it will probably be updated badly\r
+    MPI_Offset position = file_->tell();\r
+    MPI_Offset movesize = datatype->get_extent()*count;\r
+    MPI_Offset readsize = datatype->size()*count;\r
+    XBT_DEBUG("Position before read in MPI_File %s : %llu",file_->get_path(),file_->tell());\r
+    MPI_Offset read = file_->read(readsize);\r
+    XBT_DEBUG("Read in MPI_File %s, %lld bytes read, readsize %lld bytes, movesize %lld", file_->get_path(), read, readsize, movesize);\r
+    if(readsize!=movesize){\r
+      file_->seek(position+movesize, SEEK_SET);\r
+    }\r
+    XBT_DEBUG("Position after read in MPI_File %s : %llu",file_->get_path(), file_->tell());\r
+    return MPI_SUCCESS;\r
+  }\r
+\r
   int File::size(){\r
     return file_->size();\r
   }\r
-  \r
+\r
   int File::flags(){\r
     return flags_;\r
   }\r
+\r
   int File::sync(){\r
     //no idea\r
     return simgrid::smpi::Colls::barrier(comm_);\r
   }\r
 }\r
-}
\ No newline at end of file
+}\r