if(datatype->has_subtype == 1){
// This part handles the problem of non-contiguous memory
old_buf = buf;
- buf = xbt_malloc(count*smpi_datatype_size(datatype));
+ buf = count==0 ? NULL : xbt_malloc(count*smpi_datatype_size(datatype));
if (flags & SEND) {
subtype->serialize(old_buf, buf, count, datatype->substruct);
}
int dst, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
+ build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
comm, PERSISTENT | SEND);
request->refcount++;
return request;
int dst, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
+ build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
comm, PERSISTENT | SSEND | SEND);
request->refcount++;
return request;
int src, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
+ build_request(buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
comm, PERSISTENT | RECV);
request->refcount++;
return request;
} else {
- int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
+ int receiver = request->dst;//smpi_group_index(smpi_comm_group(request->comm), request->dst);
#ifdef HAVE_TRACING
int rank = smpi_process_index();
request->refcount++;
if(request->old_type->has_subtype == 0){
oldbuf = request->buf;
- if (oldbuf){
+ if (oldbuf && request->size!=0){
request->buf = xbt_malloc(request->size);
memcpy(request->buf,oldbuf,request->size);
}
}
XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
}
+
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
request->real_size=request->size;
smpi_datatype_use(request->old_type);
if((*request)->refcount<0) xbt_die("wrong refcount");
if((*request)->refcount==0){
+ print_request("Destroying", (*request));
xbt_free(*request);
*request = MPI_REQUEST_NULL;
+ }else{
+ print_request("Decrementing", (*request));
+
}
}else{
xbt_die("freeing an already free request");
int dst, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
+ build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
comm, NON_PERSISTENT | SEND);
return request;
int dst, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
+ build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
comm, NON_PERSISTENT | ISEND | SEND);
smpi_mpi_start(request);
int dst, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
+ build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
comm, NON_PERSISTENT | ISEND | SSEND | SEND);
smpi_mpi_start(request);
return request;
int src, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
+ build_request(buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
comm, NON_PERSISTENT | RECV);
return request;
}
int src, int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
+ build_request(buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
comm, NON_PERSISTENT | RECV);
smpi_mpi_start(request);
int tag, MPI_Comm comm)
{
MPI_Request request =
- build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
+ build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
comm, NON_PERSISTENT | SEND);
-
smpi_mpi_start(request);
smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
void smpi_mpi_ssend(void *buf, int count, MPI_Datatype datatype,
int dst, int tag, MPI_Comm comm)
{
- MPI_Request request = smpi_mpi_issend(buf, count, datatype, dst, tag, comm);
+ MPI_Request request =
+ build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
+ comm, NON_PERSISTENT | SSEND | SEND);
+
+ smpi_mpi_start(request);
smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
}
if(!(req->detached && req->flags & SEND)){
if(status != MPI_STATUS_IGNORE) {
- status->MPI_SOURCE = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
+ int src = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
+ status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(req->comm), src);
status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
status->MPI_ERROR = req->truncated ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
// this handles the case were size in receive differs from size in send
flag = simcall_comm_test((*request)->action);
if(flag) {
finish_wait(request, status);
+ request=MPI_REQUEST_NULL;
}else{
smpi_empty_status(status);
}
while(flag==0){
smpi_mpi_iprobe(source, tag, comm, &flag, status);
XBT_DEBUG("Busy Waiting on probing : %d", flag);
- if(!flag) {
- simcall_process_sleep(0.0001);
- }
}
}
void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
- MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
+
+ MPI_Request request =build_request(NULL, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), source), smpi_comm_rank(comm), tag,
comm, NON_PERSISTENT | RECV);
+ //to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
+ double sleeptime= sg_cfg_get_double("smpi/iprobe");
+ //multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
+ static int nsleeps = 1;
+
+ simcall_process_sleep(sleeptime);
+
// behave like a receive, but don't do it
smx_rdv_t mailbox;
MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
*flag = 1;
if(status != MPI_STATUS_IGNORE) {
- status->MPI_SOURCE = req->src;
+ status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(comm), req->src);
status->MPI_TAG = req->tag;
status->MPI_ERROR = MPI_SUCCESS;
status->count = req->real_size;
}
+ nsleeps=1;//reset the number of sleeps we will do next time
+ }
+ else {
+ *flag = 0;
+ nsleeps++;
}
- else *flag = 0;
smpi_mpi_request_free(&request);
return;
index = smpi_mpi_waitany(count, requests, pstat);
if (index == MPI_UNDEFINED)
break;
+ requests[index]=MPI_REQUEST_NULL;
}
if (status != MPI_STATUSES_IGNORE) {
status[index] = *pstat;
if(status != MPI_STATUSES_IGNORE) {
status[index] = *pstat;
}
+ requests[index]=MPI_REQUEST_NULL;
}else{
return MPI_UNDEFINED;
}
if(status != MPI_STATUSES_IGNORE) {
status[i] = *pstat;
}
+ requests[i]=MPI_REQUEST_NULL;
+
}
}else{
count_dead++;
}
}
+
+void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts,
+ MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
+{
+ int i, size, count;
+ int *displs;
+ int rank = smpi_process_index();
+ /* arbitrarily choose root as rank 0 */
+ size = smpi_comm_size(comm);
+ count = 0;
+ displs = xbt_new(int, size);
+ for (i = 0; i < size; i++) {
+ displs[i] = count;
+ count += recvcounts[i];
+ }
+ mpi_coll_reduce_fun(sendbuf, recvbuf, count, datatype, op, 0, comm);
+ smpi_mpi_scatterv(recvbuf, recvcounts, displs, datatype, recvbuf,
+ recvcounts[rank], datatype, 0, comm);
+ xbt_free(displs);
+}
+
void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int *recvcounts, int *displs,
MPI_Datatype recvtype, int root, MPI_Comm comm)