From e2d7449be5c9b3b78d266db18435864237c4ac6c Mon Sep 17 00:00:00 2001 From: markls Date: Mon, 23 Jul 2007 23:21:44 +0000 Subject: [PATCH 1/1] everything compiles and basic program runs again, but still getting a pointer error when I try to actually send data between nodes. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@3887 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/smpi/include/smpi.h | 2 +- src/smpi/src/smpi_base.c | 58 +++++++++++++++++++++++++++++++----- src/smpi/src/smpi_mpi.c | 64 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 8 deletions(-) diff --git a/src/smpi/include/smpi.h b/src/smpi/include/smpi.h index 99a4643db8..a007c9c659 100644 --- a/src/smpi/include/smpi.h +++ b/src/smpi/include/smpi.h @@ -17,7 +17,7 @@ #define MPI_ERR_RANK 7 #define MPI_ERR_TAG 8 -#include +#include #include // MPI_Comm diff --git a/src/smpi/src/smpi_base.c b/src/smpi/src/smpi_base.c index 44698c78c8..57763a2294 100644 --- a/src/smpi/src/smpi_base.c +++ b/src/smpi/src/smpi_base.c @@ -177,6 +177,13 @@ int smpi_sender(int argc, char **argv) SIMIX_mutex_unlock(smpi_running_hosts_mutex); } + SIMIX_mutex_lock(init_mutex); + smpi_ready_count--; + if (smpi_ready_count <= 0) { + SIMIX_cond_broadcast(init_cond); + } + SIMIX_mutex_unlock(init_mutex); + return 0; } @@ -230,6 +237,9 @@ int smpi_receiver(int argc, char **argv) while (0 < running_hosts) { + request = NULL; + message = NULL; + // FIXME: better algorithm, maybe some kind of balanced tree? or a heap? // FIXME: not the best way to request multiple locks... @@ -279,6 +289,13 @@ stopsearch: SIMIX_mutex_unlock(smpi_running_hosts_mutex); } + SIMIX_mutex_lock(init_mutex); + smpi_ready_count--; + if (smpi_ready_count <= 0) { + SIMIX_cond_broadcast(init_cond); + } + SIMIX_mutex_unlock(init_mutex); + return 0; } @@ -353,6 +370,11 @@ void *smpi_new_message() return xbt_new(smpi_received_message_t, 1); } +void smpi_do_nothing(void *pointer) +{ + return; +} + void smpi_mpi_init() { int i; @@ -397,8 +419,8 @@ void smpi_mpi_init() smpi_mpi_sum.func = &smpi_mpi_sum_func; // smpi globals - smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, &smpi_new_request, &xbt_free, NULL); - smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, &smpi_new_message, &xbt_free, NULL); + smpi_request_mallocator = xbt_mallocator_new(SMPI_REQUEST_MALLOCATOR_SIZE, smpi_new_request, xbt_free, smpi_do_nothing); + smpi_message_mallocator = xbt_mallocator_new(SMPI_MESSAGE_MALLOCATOR_SIZE, smpi_new_message, xbt_free, smpi_do_nothing); smpi_pending_send_requests = xbt_new(xbt_fifo_t, size); smpi_pending_send_requests_mutex = xbt_new(smx_mutex_t, size); smpi_pending_recv_requests = xbt_new(xbt_fifo_t, size); @@ -459,8 +481,30 @@ void smpi_mpi_finalize() i = --smpi_running_hosts; SIMIX_mutex_unlock(smpi_running_hosts_mutex); + SIMIX_mutex_lock(init_mutex); + smpi_ready_count--; + SIMIX_mutex_unlock(init_mutex); + if (0 >= i) { + // wake up senders/receivers + for (i = 0; i < smpi_mpi_comm_world.size; i++) { + if (SIMIX_process_is_suspended(smpi_sender_processes[i])) { + SIMIX_process_resume(smpi_sender_processes[i]); + } + if (SIMIX_process_is_suspended(smpi_receiver_processes[i])) { + SIMIX_process_resume(smpi_receiver_processes[i]); + } + } + + // wait for senders/receivers to exit... + SIMIX_mutex_lock(init_mutex); + if (smpi_ready_count > 0) { + SIMIX_cond_wait(init_cond, init_mutex); + } + SIMIX_mutex_unlock(init_mutex); + + SIMIX_mutex_destroy(init_mutex); SIMIX_mutex_destroy(smpi_running_hosts_mutex); for (i = 0 ; i < smpi_mpi_comm_world.size; i++) { @@ -592,8 +636,8 @@ int smpi_isend(smpi_mpi_request_t *request) xbt_fifo_push(smpi_pending_send_requests[rank], request); SIMIX_mutex_unlock(smpi_pending_send_requests_mutex[rank]); - if (MSG_process_is_suspended(smpi_sender_processes[rank])) { - MSG_process_resume(smpi_sender_processes[rank]); + if (SIMIX_process_is_suspended(smpi_sender_processes[rank])) { + SIMIX_process_resume(smpi_sender_processes[rank]); } } @@ -605,8 +649,8 @@ int smpi_irecv(smpi_mpi_request_t *request) xbt_fifo_push(smpi_pending_recv_requests[rank], request); SIMIX_mutex_unlock(smpi_pending_recv_requests_mutex[rank]); - if (MSG_process_is_suspended(smpi_receiver_processes[rank])) { - MSG_process_resume(smpi_receiver_processes[rank]); + if (SIMIX_process_is_suspended(smpi_receiver_processes[rank])) { + SIMIX_process_resume(smpi_receiver_processes[rank]); } } @@ -624,7 +668,7 @@ void smpi_wait(smpi_mpi_request_t *request, smpi_mpi_status_t *status) } SIMIX_mutex_unlock(request->mutex); if (suspend) { - SIMIX_suspend(self); + SIMIX_process_suspend(self); } if (NULL != status && MPI_STATUS_IGNORE != status) { SIMIX_mutex_lock(request->mutex); diff --git a/src/smpi/src/smpi_mpi.c b/src/smpi/src/smpi_mpi.c index e5af0573e0..37ce0afa36 100644 --- a/src/smpi/src/smpi_mpi.c +++ b/src/smpi/src/smpi_mpi.c @@ -87,3 +87,67 @@ int MPI_Barrier(MPI_Comm comm) smpi_bench_begin(); return MPI_SUCCESS; } + +int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request) +{ + int retval = MPI_SUCCESS; + int dst; + smpi_bench_end(); + dst = smpi_mpi_comm_rank_self(comm); + retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, request); + if (NULL != *request) { + smpi_irecv(*request); + } + smpi_bench_begin(); + return retval; +} + +int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status) +{ + int retval = MPI_SUCCESS; + int dst; + smpi_mpi_request_t *request; + smpi_bench_end(); + dst = smpi_mpi_comm_rank_self(comm); + retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &request); + if (NULL != request) { + smpi_irecv(request); + smpi_wait(request, status); + // FIXME: mallocator + //xbt_free(request); + } + smpi_bench_begin(); + return retval; +} + +int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm, MPI_Request *request) +{ + int retval = MPI_SUCCESS; + int src; + smpi_bench_end(); + src = smpi_mpi_comm_rank_self(comm); + retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, request); + if (NULL != *request) { + smpi_isend(*request); + } + smpi_bench_begin(); + return retval; +} + +int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) +{ + int retval = MPI_SUCCESS; + int src; + smpi_mpi_request_t *request; + smpi_bench_end(); + src = smpi_mpi_comm_rank_self(comm); + retval = smpi_create_request(buf, count, datatype, src, dst, tag, comm, &request); + if (NULL != request) { + smpi_isend(request); + smpi_wait(request, MPI_STATUS_IGNORE); + // FIXME: mallocator + //xbt_free(request) + } + smpi_bench_begin(); + return retval; +} -- 2.39.2