-* segfault with ./loba cluster1000.xml -N64 -a fairstrategy
-
- this is a deadlock occuring when:
- - a process is in the finalize stage;
- - all processes but one are blocked on receive;
- - the process that is still running owns all the remaining load,
- and sends it all to the finalizing process, and then goes in
- blocking receive.
- The finalizing process receives the load, and blocks again,
- waiting for a close message.
- All processes are then blocked, and non-one is able to see that
- there is no more load in the system!
+* review receive with timeout.
* verify bookkeeping version.
sent_comm.push_back(comm);
}
-bool communicator::recv(message*& msg, m_host_t& from, bool wait)
+bool communicator::recv(message*& msg, m_host_t& from, double timeout)
{
- if (wait) {
+ if (timeout != 0) {
+ volatile double deadline =
+ timeout > 0 ? MSG_get_clock() + timeout : 0.0;
xbt_mutex_acquire(mutex);
- while (received.empty()) {
+ while (received.empty() && (!deadline || deadline > MSG_get_clock())) {
+ xbt_ex_t e;
DEBUG0("waiting for a message to come");
- xbt_cond_wait(cond, mutex);
+ TRY {
+ if (deadline)
+ xbt_cond_timedwait(cond, mutex, deadline - MSG_get_clock());
+ else
+ xbt_cond_wait(cond, mutex);
+ }
+ CATCH (e) {
+ if (e.category != timeout_error)
+ RETHROW;
+ xbt_ex_free(e);
+ }
}
xbt_mutex_release(mutex);
}
void send(const char* dest, message* msg);
// Try to get a message. Returns true on success.
- // If "wait" is true, blocks until success.
- bool recv(message*& msg, m_host_t& from, bool wait);
+ // Parameter "timeout" may be 0 for non-blocking operation, any
+ // negative value (e.g. -1) for infinite waiting, or any positive
+ // timeout, expressed in the same unit as MSG_get_clock() (the
+ // deadline is computed as MSG_get_clock() + timeout).
+ bool recv(message*& msg, m_host_t& from, double timeout);
// Try to flush pending sending communications.
// If "wait" is true, blocks until success.
// block on receiving unless there is something to compute or
// to send
- bool wait = (load == 0 && lb_load() == prev_load_broadcast);
- receive(wait);
+ double timeout;
+ if (load != 0 || lb_load() != prev_load_broadcast)
+ timeout = 0.0;
+ else if (opt::min_iter_duration)
+ timeout = opt::min_iter_duration;
+ else
+ timeout = 1.0;
+ receive(timeout);
// one of our neighbor is finalizing
if (opt::exit_on_close && close_received) {
comm.flush(false);
}
-void process::receive(bool wait)
+void process::receive(double timeout)
{
message* msg;
m_host_t from;
- DEBUG1("%sblocking receive", "\0non-" + !wait);
- while (may_receive() && comm.recv(msg, from, wait)) {
+ DEBUG2("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
+ while (may_receive() && comm.recv(msg, from, timeout)) {
switch (msg->get_type()) {
case message::INFO: {
neighbor* n = rev_neigh[from];
break;
}
delete msg;
- wait = false; // only wait on first recv
+ timeout = 0.0; // only wait on first recv
}
comm.flush(false);
}
(unsigned long )neigh.size(), ESSE(neigh.size()));
while (may_receive()) {
comm.flush(false);
- receive(true);
+ receive(-1.0);
}
comm.flush(true);
// Returns true if there remains neighbors to listen for
bool may_receive() { return ctrl_close_pending || data_close_pending; }
- // Receive procedure: wait (or not) for a message to come
- void receive(bool wait);
+ // Receive procedure: wait at most "timeout" for a first message,
+ // then keep handling already-queued messages without blocking, as
+ // long as may_receive() holds; pending sends are flushed
+ // (non-blocking) before returning.
+ // Parameter "timeout" may be 0 for non-blocking operation, any
+ // negative value (e.g. -1) for infinite waiting, or any positive
+ // timeout (same unit as MSG_get_clock()).
+ void receive(double timeout);
// Finalize sends a "close" message to each neighbor and wait for
// all of them to answer.