class message {
public:
- enum message_type { INFO, CREDIT, LOAD, CLOSE };
+ enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE };
message(message_type t, double a): type(t), amount(a) { }
~communicator();
void send(const char* dest, message* msg);
- bool recv(message*& msg, m_host_t& from);
+ bool recv(message*& msg, m_host_t& from, bool wait);
+ void wait_for_sent();
+
+ void next_close_on_ctrl_is_last();
+ void next_close_on_data_is_last();
int send_backlog();
std::string ctrl_mbox;
msg_comm_t ctrl_comm;
m_task_t ctrl_task;
+ bool ctrl_close_is_last;
// Data channel for receiving
std::string data_mbox;
msg_comm_t data_comm;
m_task_t data_task;
+ bool data_close_is_last;
const char* get_ctrl_mbox() const { return ctrl_mbox.c_str(); }
const char* get_data_mbox() const { return data_mbox.c_str(); }
+ static bool comm_test_n_destroy(msg_comm_t& comm);
void flush_sent();
};