Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Change the way MPI_Accumulate works with serialization, to handle complex datatypes.
[simgrid.git] / src / smpi / smpi_base.c
index 830cd2d623fcf0e56e24851b2229b71bdb875328..562ddde3f11ebed870a4ffb0779b655b42c0e038 100644 (file)
@@ -197,11 +197,11 @@ static MPI_Request build_request(void *buf, int count,
 
   s_smpi_subtype_t *subtype = datatype->substruct;
 
-  if(datatype->has_subtype == 1){
+  if(((flags & RECV) && (flags & ACCUMULATE)) || (datatype->has_subtype == 1)){
     // This part handles the problem of non-contiguous memory
     old_buf = buf;
     buf = count==0 ? NULL : xbt_malloc(count*smpi_datatype_size(datatype));
-    if (flags & SEND) {
+    if ((datatype->has_subtype == 1) && (flags & SEND)) {
       subtype->serialize(old_buf, buf, count, datatype->substruct);
     }
   }
@@ -230,7 +230,7 @@ static MPI_Request build_request(void *buf, int count,
     request->refcount = 1;
   else
     request->refcount = 0;
-
+  request->op = MPI_REPLACE;
 #ifdef HAVE_TRACING
   request->send = 0;
   request->recv = 0;
@@ -336,7 +336,7 @@ void smpi_mpi_start(MPI_Request request)
     smpi_comm_use(request->comm);
     request->action = simcall_comm_irecv(mailbox, request->buf,
                                          &request->real_size, &match_recv,
-                                         (request->flags & ACCUMULATE)? NULL : &smpi_comm_copy_buffer_callback,
+                                         &smpi_comm_copy_buffer_callback,
                                          request, -1.0);
 
     //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
@@ -412,7 +412,7 @@ void smpi_mpi_start(MPI_Request request)
                          buf, request->real_size,
                          &match_send,
                          &xbt_free_f, // how to free the userdata if a detached send fails
-                         (request->flags & ACCUMULATE)? NULL : &smpi_comm_copy_buffer_callback,
+                         &smpi_comm_copy_buffer_callback,
                          request,
                          // detach if msg size < eager/rdv switch limit
                          request->detached);
@@ -458,20 +458,32 @@ void smpi_mpi_request_free(MPI_Request * request)
 
 
 MPI_Request smpi_rma_send_init(void *buf, int count, MPI_Datatype datatype,
-                            int src, int dst, int tag, MPI_Comm comm)
+                            int src, int dst, int tag, MPI_Comm comm, MPI_Op op)
 {
   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
-  request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf , count, datatype, src, dst, tag,
-                          comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED);
+  if(op==MPI_OP_NULL){
+    request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf , count, datatype, src, dst, tag,
+                            comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED);
+  }else{
+    request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
+                            comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED | ACCUMULATE);
+    request->op = op;
+  }
   return request;
 }
 
 MPI_Request smpi_rma_recv_init(void *buf, int count, MPI_Datatype datatype,
-                            int src, int dst, int tag, MPI_Comm comm)
+                            int src, int dst, int tag, MPI_Comm comm, MPI_Op op)
 {
   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
-  request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
-                          comm, RMA | NON_PERSISTENT | RECV | PREPARED);
+  if(op==MPI_OP_NULL){
+    request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
+                            comm, RMA | NON_PERSISTENT | RECV | PREPARED);
+  }else{
+    request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
+                            comm, RMA | NON_PERSISTENT | RECV | PREPARED | ACCUMULATE);
+    request->op = op;
+  }
   return request;
 }
 
@@ -612,7 +624,7 @@ static void finish_wait(MPI_Request * request, MPI_Status * status)
     print_request("Finishing", req);
     MPI_Datatype datatype = req->old_type;
 
-    if(datatype->has_subtype == 1){
+    if((req->flags & ACCUMULATE) || (datatype->has_subtype == 1)){
       if (!_xbt_replay_is_active()){
         if( smpi_privatize_global_variables
             && ((char*)req->old_buf >= start_data_exe)
@@ -622,13 +634,19 @@ static void finish_wait(MPI_Request * request, MPI_Status * status)
             switch_data_segment(smpi_process_index());
         }
       }
-      // This part handles the problem of non-contignous memory
-      // the unserialization at the reception
-      s_smpi_subtype_t *subtype = datatype->substruct;
       if(req->flags & RECV) {
-        subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
+
+        if(datatype->has_subtype == 1){
+          // This part handles the problem of non-contignous memory
+          // the unserialization at the reception
+          s_smpi_subtype_t *subtype = datatype->substruct;
+            subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct, req->op);
+          if(req->detached == 0) free(req->buf);
+        }else{//apply op on contiguous buffer for accumulate
+            int n =req->real_size/smpi_datatype_size(datatype);
+            smpi_op_apply(req->op, req->buf, req->old_buf, &n, &datatype);
+        }
       }
-      if(req->detached == 0) free(req->buf);
     }
     smpi_comm_unuse(req->comm);
     smpi_datatype_unuse(datatype);