+bool process::receive(bool wait_for_close)
+{
+ bool result = true;
+ message* msg;
+ m_host_t from;
+ while ((ctrl_close_pending ||
+ data_close_pending) && comm.recv(msg, from, wait_for_close)) {
+ switch (msg->get_type()) {
+ case message::INFO:
+ DEBUG0("received INFO");
+ // fixme: update neighbor
+ // need a map m_host_t -> neighbor&
+ break;
+ case message::CREDIT:
+ DEBUG0("received CREDIT");
+ expected_load += msg->get_amount();
+ break;
+ case message::LOAD:
+ DEBUG0("received LOAD");
+ load += msg->get_amount();
+ break;
+ case message::CTRL_CLOSE:
+ DEBUG0("received CTRL_CLOSE");
+ if (--ctrl_close_pending == 1)
+ comm.next_close_on_ctrl_is_last();
+ result = false;
+ break;
+ case message::DATA_CLOSE:
+ DEBUG0("received DATA_CLOSE");
+ if (--data_close_pending == 1)
+ comm.next_close_on_data_is_last();
+ result = false;
+ break;
+ }
+ delete msg;
+ }
+ return result;
+}
+
+void process::finalize()
+{
+ DEBUG2("send CLOSE to %d neighbor%s.",
+ (int )neigh.size(), ESSE(neigh.size()));
+ std::vector<neighbor>::iterator n;
+ for (n = neigh.begin() ; n != neigh.end() ; ++n) {
+ comm.send(n->get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
+ comm.send(n->get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
+ }
+
+ DEBUG2("wait for CLOSE from %d neighbor%s.",
+ (int )neigh.size(), ESSE(neigh.size()));
+ receive(true);
+
+ comm.wait_for_sent();
+}
+