From: Arnaud Giersch Date: Thu, 10 Feb 2011 16:43:09 +0000 (+0100) Subject: Separate ctrl and data communications. X-Git-Tag: v0.1~149 X-Git-Url: https://bilbo.iut-bm.univ-fcomte.fr/and/gitweb/loba.git/commitdiff_plain/271654d06dedba35eab7c26069a42bac89b0cee4?ds=sidebyside Separate ctrl and data communications. This is a preparation for making computations in a separate thread in process. Note: results may be different with this commit. --- diff --git a/communicator.cpp b/communicator.cpp index fd01ac9..ecb207a 100644 --- a/communicator.cpp +++ b/communicator.cpp @@ -12,6 +12,16 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm); #include "communicator.h" +namespace { + + void check_for_lost_messages(size_t size, const char* descr) + { + if (size) + XBT_WARN("lost %zu %s message%s!", size, descr, ESSE(size)); + } + +} + communicator::communicator() : host(static_cast(MSG_host_get_data(MSG_host_self()))) { @@ -36,15 +46,13 @@ communicator::~communicator() receiver_thread->wait(); delete receiver_thread; - if (!received.empty()) - XBT_WARN("lost %zu received message%s!", - received.size(), ESSE(received.size())); - if (!sent_comm.empty()) - XBT_WARN("lost %zu sent message%s!", - sent_comm.size(), ESSE(sent_comm.size())); + check_for_lost_messages(ctrl_received.size(), "received ctrl"); + check_for_lost_messages(data_received.size(), "received data"); + check_for_lost_messages(ctrl_sent.size(), "sent ctrl"); + check_for_lost_messages(data_sent.size(), "sent data"); } -void communicator::send(const char* dest, message* msg) +msg_comm_t communicator::real_send(const char* dest, message* msg) { XBT_DEBUG("send %s to %s", msg->to_string().c_str(), dest); double msg_size = sizeof *msg; @@ -54,24 +62,24 @@ void communicator::send(const char* dest, message* msg) TRACE_msg_set_task_category(task, msg->get_type() == message::LOAD ? TRACE_CAT_DATA : TRACE_CAT_CTRL); - msg_comm_t comm = MSG_task_isend(task, dest); - sent_comm.push_back(comm); + return MSG_task_isend(task, dest); } -void communicator::flush(bool wait) +void communicator::real_flush(sent_comm_type& sent_comm, bool wait) { sent_comm_type::iterator bound; bound = std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy); sent_comm.erase(bound, sent_comm.end()); if (wait && !sent_comm.empty()) { - msg_comm_t comms[sent_comm.size()]; + size_t size = sent_comm.size(); + msg_comm_t* comms = new msg_comm_t[size]; std::copy(sent_comm.begin(), sent_comm.end(), comms); - MSG_comm_waitall(comms, sent_comm.size(), -1.0); - if (!MSG_WAIT_DESTROYS_COMMS) - std::for_each(sent_comm.begin(), sent_comm.end(), - comm_check_n_destroy); sent_comm.clear(); + MSG_comm_waitall(comms, size, -1.0); + if (!MSG_WAIT_DESTROYS_COMMS) + std::for_each(comms, comms + size, comm_check_n_destroy); + delete[] comms; } } @@ -82,9 +90,10 @@ void communicator::receiver() msg_comm_t comm; m_task_t task; const char* mbox; + message_queue& received; }; - channel chan[] = { { NULL, NULL, host->get_ctrl_mbox() }, - { NULL, NULL, host->get_data_mbox() } }; + channel chan[] = { { NULL, NULL, host->get_ctrl_mbox(), ctrl_received }, + { NULL, NULL, host->get_data_mbox(), data_received } }; const int chan_size = (sizeof chan) / (sizeof chan[0]); for (int i = 0 ; i < chan_size ; ++i) { @@ -104,7 +113,7 @@ void communicator::receiver() comm_check_n_destroy(ch->comm); if (strcmp(MSG_task_get_name(ch->task), "finalize")) { XBT_DEBUG("received message on %s", ch->mbox); - received.push(ch->task); + ch->received.push(ch->task); ch->task = NULL; ch->comm = MSG_task_irecv(&ch->task, ch->mbox); xbt_dynar_set_as(comms, index, msg_comm_t, ch->comm); diff --git a/communicator.h b/communicator.h index f9d096a..b79210c 100644 --- a/communicator.h +++ b/communicator.h @@ -15,17 +15,33 @@ public: ~communicator(); // Send a message to the "dest" mailbox - void send(const char* dest, message* msg); + void ctrl_send(const char* dest, message* msg) { + ctrl_sent.push_back(real_send(dest, msg)); + } + void data_send(const char* dest, message* msg) { + data_sent.push_back(real_send(dest, msg)); + } // Try to flush pending sending communications. // If "wait" is true, blocks until success. - void flush(bool wait); + void ctrl_flush(bool wait) { + real_flush(ctrl_sent, wait); + } + void data_flush(bool wait) { + real_flush(data_sent, wait); + } + + // Flush all sending_communications. Blocking. + void flush_all(); // Try to get a message. Returns true on success. // Parameter "timeout" may be 0 for non-blocking operation, -1 for // infinite waiting, or any positive timeout. - bool recv(message*& msg, m_host_t& from, double timeout) { - return received.pop(msg, from, timeout); + bool ctrl_recv(message*& msg, m_host_t& from, double timeout) { + return ctrl_received.pop(msg, from, timeout); + } + bool data_recv(message*& msg, m_host_t& from, double timeout) { + return data_received.pop(msg, from, timeout); } private: @@ -34,10 +50,15 @@ private: // List of pending send communications typedef std::vector sent_comm_type; - sent_comm_type sent_comm; + sent_comm_type ctrl_sent; + sent_comm_type data_sent; + + msg_comm_t real_send(const char* dest, message* msg); + void real_flush(sent_comm_type& sent_comm, bool wait); - // Queue of received messages - message_queue received; + // Queues of received messages + message_queue ctrl_received; + message_queue data_received; // Handling of receiving thread msg_thread* receiver_thread; diff --git a/process.cpp b/process.cpp index 34ddbea..39646cd 100644 --- a/process.cpp +++ b/process.cpp @@ -204,10 +204,12 @@ void process::send(neighbor& nb, double amount) void process::send1_no_bookkeeping(neighbor& nb) { if (real_load != prev_load_broadcast) - comm.send(nb.get_ctrl_mbox(), new message(message::INFO, real_load)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::INFO, real_load)); double load_to_send = nb.get_to_send(); if (load_to_send > 0.0) { - comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); + comm.data_send(nb.get_data_mbox(), + new message(message::LOAD, load_to_send)); nb.set_to_send(0.0); } } @@ -215,14 +217,14 @@ void process::send1_no_bookkeeping(neighbor& nb) void process::send1_bookkeeping(neighbor& nb) { if (expected_load != prev_load_broadcast) - comm.send(nb.get_ctrl_mbox(), - new message(message::INFO, expected_load)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::INFO, expected_load)); double load_to_send; double new_debt; double debt_to_send = nb.get_to_send(); if (debt_to_send > 0.0) { - comm.send(nb.get_ctrl_mbox(), - new message(message::CREDIT, debt_to_send)); + comm.ctrl_send(nb.get_ctrl_mbox(), + new message(message::CREDIT, debt_to_send)); nb.set_to_send(0.0); new_debt = nb.get_debt() + debt_to_send; } else { @@ -238,7 +240,8 @@ void process::send1_bookkeeping(neighbor& nb) real_load -= load_to_send; } if (load_to_send > 0.0) - comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send)); + comm.data_send(nb.get_data_mbox(), + new message(message::LOAD, load_to_send)); } void process::send_all() @@ -255,7 +258,8 @@ void process::send_all() bind(&process::send1_no_bookkeeping, this, _1)); prev_load_broadcast = real_load; } - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); } void process::receive(double timeout) @@ -264,7 +268,8 @@ void process::receive(double timeout) m_host_t from; XBT_DEBUG("%sblocking receive (%g)", "\0non-" + !timeout, timeout); - while (may_receive() && comm.recv(msg, from, timeout)) { + while (may_receive() && (comm.ctrl_recv(msg, from, timeout) || + comm.data_recv(msg, from, timeout))) { switch (msg->get_type()) { case message::INFO: { neighbor* n = rev_neigh[from]; @@ -293,13 +298,14 @@ void process::receive(double timeout) delete msg; timeout = 0.0; // only wait on first recv } - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); } void process::finalize1(neighbor& nb) { - comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); - comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); + comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0)); + comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0)); } void process::finalize() @@ -315,13 +321,15 @@ void process::finalize() bind(&process::finalize1, this, _1)); while (may_receive()) { - comm.flush(false); + comm.ctrl_flush(false); + comm.data_flush(false); XBT_DEBUG("waiting for %d CTRL and %d DATA CLOSE", ctrl_close_pending, data_close_pending); receive(-1.0); } - comm.flush(true); + comm.ctrl_flush(true); + comm.data_flush(true); } #define print_loads_generic(vec, verbose, logp, cat) \