bind(&process::send1_no_bookkeeping, this, _1));
prev_load_broadcast = load;
}
+ comm.flush(false);
}
void process::receive(bool wait)
message* msg;
m_host_t from;
- if (may_receive() && comm.recv(msg, from, wait)) {
+ while (may_receive() && comm.recv(msg, from, wait)) {
switch (msg->get_type()) {
case message::INFO: {
neighbor* n = rev_neigh[from];
break;
}
delete msg;
+ wait = false; // only wait on first recv
}
+ comm.flush(false);
}
void process::finalize1(neighbor& nb)
DEBUG2("wait for CLOSE from %lu neighbor%s",
(unsigned long )neigh.size(), ESSE(neigh.size()));
- while (may_receive())
+ while (may_receive()) {
+ comm.flush(false);
receive(true);
+ }
comm.flush(true);
}