XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
-static int match_recv(void* a, void* b, smx_action_t ignored) {
+static int match_recv(void* a, void* b, smx_synchro_t ignored) {
MPI_Request ref = (MPI_Request)a;
MPI_Request req = (MPI_Request)b;
XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
}else return 0;
}
-static int match_send(void* a, void* b,smx_action_t ignored) {
+static int match_send(void* a, void* b,smx_synchro_t ignored) {
MPI_Request ref = (MPI_Request)a;
MPI_Request req = (MPI_Request)b;
XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
void smpi_mpi_start(MPI_Request request)
{
smx_rdv_t mailbox;
-
+
xbt_assert(!request->action, "Cannot (re)start a non-finished communication");
request->flags &= ~PREPARED;
request->flags &= ~FINISHED;
if (request->flags & RECV) {
print_request("New recv", request);
- //FIXME: if receive is posted with a large size, but send is smaller, mailboxes may not match !
- if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres"))
+
+ if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")){
+ //We have to check both mailboxes (because SSEND messages are sent to the large mbox). begin with the more appropriate one : the small one.
mailbox = smpi_process_mailbox_small();
- else
- mailbox = smpi_process_mailbox();
+ XBT_DEBUG("Is there a corresponding send already posted the small mailbox %p (in case of SSEND)?", mailbox);
+ smx_synchro_t action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
+
+ // The first probe (on the small mailbox) found nothing: try the large mailbox as well.
+ if (action == NULL) {
+   mailbox = smpi_process_mailbox();
+   XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox);
+   action = simcall_comm_iprobe(mailbox, 0, request->src, request->tag, &match_recv, (void*)request);
+   if (action == NULL) {
+     // Neither probe matched: fall back to the small mailbox. Log only after
+     // switching so that the printed pointer is the mailbox actually kept.
+     mailbox = smpi_process_mailbox_small();
+     XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
+   }
+ } else {
+   // The first probe matched, and that probe targeted the small mailbox.
+   XBT_DEBUG("yes there was something for us in the small mailbox");
+ }
+ }else{
+ mailbox = smpi_process_mailbox_small();
+ XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
+ smx_synchro_t action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
+
+ if(action ==NULL){
+ XBT_DEBUG("No, nothing in the permanent receive mailbox");
+ mailbox = smpi_process_mailbox();
+ }else{
+ XBT_DEBUG("yes there was something for us in the small mailbox");
+ }
+ }
+
+ //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
+ double sleeptime = request->detached ? smpi_or(request->size) : 0.0;
+ if(sleeptime!=0.0){
+ simcall_process_sleep(sleeptime);
+ XBT_DEBUG("receiving size of %zu : sleep %f ", request->size, smpi_or(request->size));
+ }
+
// we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
request->real_size=request->size;
smpi_datatype_use(request->old_type);
smpi_comm_use(request->comm);
request->action = simcall_comm_irecv(mailbox, request->buf,
&request->real_size, &match_recv,
- &smpi_comm_copy_buffer_callback,
+ !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+ : &smpi_comm_null_copy_buffer_callback,
request, -1.0);
+ XBT_DEBUG("recv simcall posted");
- //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
- double sleeptime = request->detached ? smpi_or(request->size) : 0.0;
- if(sleeptime!=0.0){
- simcall_process_sleep(sleeptime);
- XBT_DEBUG("receiving size of %zu : sleep %f ", request->size, smpi_or(request->size));
- }
} else {
- int receiver = request->dst;//smpi_group_index(smpi_comm_group(request->comm), request->dst);
+ int receiver = request->dst;
#ifdef HAVE_TRACING
int rank = request->src;
TRACE_smpi_send(rank, rank, receiver,request->size);
}
#endif
-/* if(receiver == MPI_UNDEFINED) {*/
-/* XBT_WARN("Trying to send a message to a wrong rank");*/
-/* return;*/
-/* }*/
print_request("New send", request);
+
+ // If we are giving back the control to the user without waiting for completion,
+ // we have to inject timings before the send is posted.
+ double sleeptime = 0.0;
+ if (request->detached || (request->flags & (ISEND | SSEND))) { // issend should be treated as isend
+   // isend and send timings may be different
+   sleeptime = (request->flags & ISEND) ? smpi_ois(request->size) : smpi_os(request->size);
+ }
+
+ // Do not bother to execute the simcall if there is nothing to sleep.
+ if (sleeptime != 0.0) {
+   simcall_process_sleep(sleeptime);
+   // Report the duration that was actually slept: it may come from smpi_ois()
+   // rather than smpi_os(), depending on the ISEND flag above.
+   XBT_DEBUG("sending size of %zu : sleep %f ", request->size, sleeptime);
+ }
+
if (request->flags & RMA || request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
- mailbox = smpi_process_remote_mailbox_small(receiver);
+ // Probe the receiver's large mailbox first for an already-posted matching recv.
+ mailbox = smpi_process_remote_mailbox(receiver);
+ XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
+ smx_synchro_t action = simcall_comm_iprobe(mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
+ if(action ==NULL){
+ if (! (request->flags & SSEND)){
+ // Not an SSEND and no match in the large mailbox: post on the small mailbox.
+ mailbox = smpi_process_remote_mailbox_small(receiver);
+ XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox);
+ } else{
+ // SSEND: also probe the small mailbox; if nothing matches there either,
+ // the message goes to the large mailbox.
+ mailbox = smpi_process_remote_mailbox_small(receiver);
+ XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox);
+ action = simcall_comm_iprobe(mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
+ if(action ==NULL){
+ XBT_DEBUG("No, we are first, send to large mailbox");
+ mailbox = smpi_process_remote_mailbox(receiver);
+ }
+ }
+ }else{
+ XBT_DEBUG("Yes there was something for us in the large mailbox");
+ }
}else{
- XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
mailbox = smpi_process_remote_mailbox(receiver);
+ // Argument order must follow the format string: request, then mailbox, then buffer.
+ XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)", request, mailbox, request->buf);
}
void* buf = request->buf;
request->refcount++;
if(request->old_type->has_subtype == 0){
oldbuf = request->buf;
- if (!_xbt_replay_is_active() && oldbuf && request->size!=0){
+ if (!smpi_process_get_replaying() && oldbuf && request->size!=0){
if((smpi_privatize_global_variables)
&& ((char*)request->buf >= start_data_exe)
&& ((char*)request->buf < start_data_exe + size_data_exe )){
XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
- switch_data_segment(request->src);
+ smpi_switch_data_segment(request->src);
}
buf = xbt_malloc(request->size);
memcpy(buf,oldbuf,request->size);
request->real_size=request->size;
smpi_datatype_use(request->old_type);
smpi_comm_use(request->comm);
-
- //if we are giving back the control to the user without waiting for completion, we have to inject timings
- double sleeptime = 0.0;
- if(request->detached || (request->flags & (ISEND|SSEND))){// issend should be treated as isend
- //isend and send timings may be different
- sleeptime = (request->flags & ISEND)? smpi_ois(request->size) : smpi_os(request->size);
- }
-
- if(sleeptime != 0.0){
- simcall_process_sleep(sleeptime);
- XBT_DEBUG("sending size of %zu : sleep %f ", request->size, smpi_os(request->size));
- }
request->action =
simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0,
buf, request->real_size,
&match_send,
&xbt_free_f, // how to free the userdata if a detached send fails
- &smpi_comm_copy_buffer_callback,
+ !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
+ : &smpi_comm_null_copy_buffer_callback,
request,
// detach if msg size < eager/rdv switch limit
request->detached);
+ XBT_DEBUG("send simcall posted");
+
+
#ifdef HAVE_TRACING
/* FIXME: detached sends are not traceable (request->action == NULL) */
MPI_Datatype datatype = req->old_type;
if((req->flags & ACCUMULATE) || (datatype->has_subtype == 1)){
- if (!_xbt_replay_is_active()){
+ if (!smpi_process_get_replaying()){
if( smpi_privatize_global_variables
&& ((char*)req->old_buf >= start_data_exe)
&& ((char*)req->old_buf < start_data_exe + size_data_exe )
){
XBT_VERB("Privatization : We are unserializing to a zone in global memory - Switch data segment ");
- switch_data_segment(smpi_process_index());
+ smpi_switch_data_segment(smpi_process_index());
}
}
*index = MPI_UNDEFINED;
flag = 0;
- comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
+ comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
map = xbt_new(int, count);
size = 0;
for(i = 0; i < count; i++) {
if (sg_cfg_get_int("smpi/async_small_thres")>0){
mailbox = smpi_process_mailbox_small();
XBT_DEBUG("trying to probe the perm recv mailbox");
- request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
+ request->action = simcall_comm_iprobe(mailbox, 0, request->src, request->tag, &match_recv, (void*)request);
}
if (request->action==NULL){
mailbox = smpi_process_mailbox();
XBT_DEBUG("trying to probe the other mailbox");
- request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
+ request->action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
}
if(request->action){
index = MPI_UNDEFINED;
if(count > 0) {
// Wait for a request to complete
- comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
+ comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
map = xbt_new(int, count);
size = 0;
XBT_DEBUG("Wait for one of %d", count);
displs[i] = count;
count += recvcounts[i];
}
- tmpbuf=(void*)xbt_malloc(count*smpi_datatype_get_extent(datatype));
+ tmpbuf=(void*)smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype));
+
mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf,
recvcounts[rank], datatype, 0, comm);
xbt_free(displs);
- xbt_free(tmpbuf);
+ smpi_free_tmp_buffer(tmpbuf);
}
void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
char* sendtmpbuf = (char*) sendbuf;
if( sendbuf == MPI_IN_PLACE ) {
- sendtmpbuf = (char *)xbt_malloc(count*smpi_datatype_get_extent(datatype));
+ sendtmpbuf = (char *)smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype));
smpi_datatype_copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
}
if(src != root) {
// FIXME: possibly overkill we we have contiguous/noncontiguous data
// mapping...
- tmpbufs[index] = xbt_malloc(count * dataext);
+ if (!smpi_process_get_replaying())
+ tmpbufs[index] = xbt_malloc(count * dataext);
+ else
+ tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
requests[index] =
smpi_irecv_init(tmpbufs[index], count, datatype, src,
system_tag, comm);
if(op) /* op can be MPI_OP_NULL that does nothing */
smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
}
- for(index = 0; index < size - 1; index++) {
- xbt_free(tmpbufs[index]);
- }
+ for(index = 0; index < size - 1; index++) {
+ smpi_free_tmp_buffer(tmpbufs[index]);
+ }
xbt_free(tmpbufs);
xbt_free(requests);
if( sendbuf == MPI_IN_PLACE ) {
- xbt_free(sendtmpbuf);
+ smpi_free_tmp_buffer(sendtmpbuf);
}
}
}
for(other = 0; other < rank; other++) {
// FIXME: possibly overkill we we have contiguous/noncontiguous data
// mapping...
- tmpbufs[index] = xbt_malloc(count * dataext);
+ tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
requests[index] =
smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
comm);
}
}
for(index = 0; index < rank; index++) {
- xbt_free(tmpbufs[index]);
+ smpi_free_tmp_buffer(tmpbufs[index]);
}
for(index = 0; index < size-1; index++) {
smpi_mpi_request_free(&requests[index]);
for(other = 0; other < rank; other++) {
// FIXME: possibly overkill we we have contiguous/noncontiguous data
// mapping...
- tmpbufs[index] = xbt_malloc(count * dataext);
+ tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
requests[index] =
smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
comm);
}
}
for(index = 0; index < rank; index++) {
- xbt_free(tmpbufs[index]);
+ smpi_free_tmp_buffer(tmpbufs[index]);
}
for(index = 0; index < size-1; index++) {
smpi_mpi_request_free(&requests[index]);
xbt_free(tmpbufs);
xbt_free(requests);
}
+