... and insert appropriate calls in process methods.
The goal is here to destroy achieved communications as soon as
possible, and avoid a bug in SimGrid 3.5 that make the simulation
very slow when there are many communications.
-int communicator::send_count_before_flush = 4;
-
communicator::communicator()
: host((hostdata* )MSG_host_get_data(MSG_host_self()))
, mutex(xbt_mutex_init())
, cond(xbt_cond_init())
communicator::communicator()
: host((hostdata* )MSG_host_get_data(MSG_host_self()))
, mutex(xbt_mutex_init())
, cond(xbt_cond_init())
, ctrl_task(NULL)
, ctrl_comm(NULL)
, data_task(NULL)
, ctrl_task(NULL)
, ctrl_comm(NULL)
, data_task(NULL)
m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);
msg_comm_t comm = MSG_task_isend(task, dest);
sent_comm.push_back(comm);
m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);
msg_comm_t comm = MSG_task_isend(task, dest);
sent_comm.push_back(comm);
-
- if (++send_counter >= send_count_before_flush) {
- flush(false);
- send_counter = 0;
- }
}
bool communicator::recv(message*& msg, m_host_t& from, bool wait)
}
bool communicator::recv(message*& msg, m_host_t& from, bool wait)
// List of pending send communications
std::list<msg_comm_t> sent_comm;
// List of pending send communications
std::list<msg_comm_t> sent_comm;
- static int send_count_before_flush;
- int send_counter;
// Queue of received messages
std::queue<m_task_t> received;
// Queue of received messages
std::queue<m_task_t> received;
// Used to test if a communication is over, and to destroy it if it is
static bool comm_test_n_destroy(msg_comm_t comm);
// Used to test if a communication is over, and to destroy it if it is
static bool comm_test_n_destroy(msg_comm_t comm);
-
- // Make opt::* functions our friends to provide them an access to
- // send_count_before_flush
- friend bool opt::parse_args(int*, char* []);
- friend void opt::print();
- friend void opt::usage();
};
#endif // !COMMUNICATOR_H
};
#endif // !COMMUNICATOR_H
case 'N':
std::istringstream(optarg) >> opt::auto_depl::nhosts;
break;
case 'N':
std::istringstream(optarg) >> opt::auto_depl::nhosts;
break;
- case 's':
- std::istringstream(optarg) >> communicator::send_count_before_flush;
- break;
case 'T':
opt::auto_depl::topology = optarg;
result = opt_helper::nol_find_prefix(opt::topologies, "topology",
case 'T':
opt::auto_depl::topology = optarg;
result = opt_helper::nol_find_prefix(opt::topologies, "topology",
DESCR("maximum number of lb. iterations", "%s",
h.val_or_string(lb_maxiter, "infinity"));
DESCR("exit on close", "%s", h.on_off(exit_on_close));
DESCR("maximum number of lb. iterations", "%s",
h.val_or_string(lb_maxiter, "infinity"));
DESCR("exit on close", "%s", h.on_off(exit_on_close));
- DESCR("send count before flush", "%d",
- communicator::send_count_before_flush);
INFO0("`----");
#undef DESCR
INFO0("`----");
#undef DESCR
<< " proc : messages from base process class\n"
<< " loba : messages from load-balancer\n";
<< " proc : messages from base process class\n"
<< " loba : messages from load-balancer\n";
- std::clog << "\nMiscellaneous low-level parameters\n";
- std::clog << o("-s count")
- << "check for finished comm. every `count' send operation"
- << " (" << communicator::send_count_before_flush << ")\n";
+ // std::clog << "\nMiscellaneous low-level parameters\n";
bind(&process::send1_no_bookkeeping, this, _1));
prev_load_broadcast = load;
}
bind(&process::send1_no_bookkeeping, this, _1));
prev_load_broadcast = load;
}
}
void process::receive(bool wait)
}
void process::receive(bool wait)
delete msg;
wait = false; // only wait on first recv
}
delete msg;
wait = false; // only wait on first recv
}
}
void process::finalize1(neighbor& nb)
}
void process::finalize1(neighbor& nb)
DEBUG2("wait for CLOSE from %lu neighbor%s",
(unsigned long )neigh.size(), ESSE(neigh.size()));
DEBUG2("wait for CLOSE from %lu neighbor%s",
(unsigned long )neigh.size(), ESSE(neigh.size()));
+ while (may_receive()) {
+ comm.flush(false);