#include "messages.h"
// Added by this patch: out-of-line constructor that stores the message
// fields and computes the message size once, at construction time.
//   t - message type; selects the size formula below
//   a - amount carried by the message
//   c - credit carried by the message (the declaration defaults it to 0.0)
// The INFO size depends on the value of opt::bookkeeping at the moment the
// message is constructed.
+message::message(message_type t, double a, double c)
+ : type(t), amount(a) , credit(c)
+{
+ // compute the message size from its type
+ // arbitrary accounting: 8 for the type, and 8 for each double carried
+ switch (type) {
+ case INFO:
+ size = opt::bookkeeping ? 24 : 16; // type + amount, + credit only when bookkeeping
+ break;
+ case LOAD:
+ size = 16 + opt::comm_cost(amount); // type + amount + opt::comm_cost(amount) (presumably the cost of the load data; defined elsewhere)
+ break;
+ default:
+ size = 8; // every other type: type only
+ break;
+ }
+}
+
// Formats the message as "<TYPE NAME>: <amount>".
// Only the type and the amount are printed; the credit field added by this
// patch does not appear in the output.
std::string message::to_string()
{
// Name table indexed by message_type, so its entries must stay in the same
// order as the enumerators. The patch drops "CREDIT" together with the
// CREDIT enumerator and sizes the array as DATA_CLOSE + 1, which relies on
// DATA_CLOSE being the last enumerator.
- static const char* str[] = { "INFO", "CREDIT", "LOAD",
- "CTRL_CLOSE", "DATA_CLOSE" };
+ static const char* str[DATA_CLOSE + 1] = { "INFO", "LOAD",
+ "CTRL_CLOSE", "DATA_CLOSE" };
std::ostringstream oss;
oss << str[type] << ": " << amount;
return oss.str();
}
// Removed by this patch: this out-of-line get_size() recomputed the size on
// every call (16, plus opt::comm_cost(amount) for LOAD messages). The class
// declaration replaces it with an inline accessor returning a stored `size`
// member, which the new constructor fills in.
-double message::get_size() const
-{
- // arbitrary: 8 for type, and 8 for amount
- double size = 16;
- if (type == LOAD)
- size += opt::comm_cost(amount);
- return size;
-}
-
void message_queue::push(m_task_t task)
{
if (queue.push(task)) {
// A typed message carrying an amount and, after this patch, also a credit
// and a size computed once by the constructor.
class message {
public:
// Message kinds. The patch removes CREDIT; the credit value is instead
// carried as a field of the message (see the `credit` member below).
- enum message_type { INFO, CREDIT, LOAD, CTRL_CLOSE, DATA_CLOSE };
+ enum message_type { INFO, LOAD, CTRL_CLOSE, DATA_CLOSE };
// The constructor gains a third parameter `c` (credit) that defaults to 0.0,
// so existing two-argument call sites keep compiling. It is now defined out
// of line.
- message(message_type t, double a): type(t), amount(a) { }
+ message(message_type t, double a, double c = 0.0);
message_type get_type() const { return type; }
double get_amount() const { return amount; }
// get_size() becomes an inline accessor for the stored size instead of an
// out-of-line function.
- double get_size() const;
+ double get_credit() const { return credit; }
+ double get_size() const { return size; }
// Returns a printable form of the message.
std::string to_string();
private:
message_type type; // kind of message
double amount; // value carried by the message
+ double credit; // credit carried alongside the amount; 0.0 unless passed to the constructor
+ double size; // size assigned in the constructor body
};
class message_queue {
// Sends control information to neighbor `nb`.
// Before this patch: an INFO message was sent when the load to advertise
// differed from prev_load_broadcast, and, with opt::bookkeeping, a separate
// CREDIT message was sent when there was a positive debt to transfer.
// After this patch: both values travel in one INFO message, sent when the
// load changed or there is a positive debt to transfer.
void process::ctrl_send(neighbor& nb)
{
double info_to_send = expected_load;
- if (info_to_send != prev_load_broadcast) {
- message* msg = new message(message::INFO, info_to_send);
- add_ctrl_send_mesg(msg->get_size());
- comm.ctrl_send(nb.get_ctrl_mbox(), msg);
- }
- if (opt::bookkeeping) {
- double debt_to_send = nb.get_to_send();
// debt_to_send is hoisted out of the bookkeeping branch so that it is still
// in scope where the merged message is built below.
+ double debt_to_send;
+ if (opt::bookkeeping) { // bookkeeping
+ debt_to_send = nb.get_to_send();
if (debt_to_send > 0.0) {
// Move the pending amount from the neighbor's to_send into its debt.
nb.set_to_send(0.0);
nb.set_debt(nb.get_debt() + debt_to_send);
- message* msg = new message(message::CREDIT, debt_to_send);
- add_ctrl_send_mesg(msg->get_size());
- comm.ctrl_send(nb.get_ctrl_mbox(), msg);
}
+ } else { // !bookkeeping
+ debt_to_send = 0.0;
+ }
// NOTE(review): with bookkeeping on, when get_to_send() is not > 0.0 the
// value is still passed as the credit below if the load changed, and the
// receiving INFO handler adds the credit to expected_load unconditionally.
// Presumably to_send is never negative, so that value is 0.0 - confirm.
+ if (info_to_send != prev_load_broadcast || debt_to_send > 0.0) {
// NOTE(review): msg is allocated with new and not deleted in this function;
// presumably ownership passes to comm.ctrl_send or the receiver - confirm.
+ message* msg = new message(message::INFO, info_to_send, debt_to_send);
+ add_ctrl_send_mesg(msg->get_size());
+ comm.ctrl_send(nb.get_ctrl_mbox(), msg);
}
}
case message::INFO: {
neighbor* n = rev_neigh[from];
n->set_load(msg->get_amount() + n->get_to_send());
+ expected_load += msg->get_credit(); // may be 0.0 if !opt::bookkeeping
break;
}
- case message::CREDIT:
- expected_load += msg->get_amount();
- break;
case message::LOAD: {
double ld = msg->get_amount();
real_load += ld;