2 #include <tr1/functional>
9 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);
17 double process::total_load_init = 0.0;
18 double process::total_load_running = 0.0;
19 double process::total_load_exit = 0.0;
23 void sleep_until_date(double& date, double duration = 0.0)
25 double sleep_duration = date - MSG_get_clock();
26 if (sleep_duration > 0.0)
27 MSG_process_sleep(sleep_duration);
28 date = MSG_get_clock() + duration;
33 process::process(int argc, char* argv[])
35 if (argc < 2 || !(std::istringstream(argv[1]) >> real_load))
36 throw std::invalid_argument("bad or missing initial load parameter");
38 neigh.assign(argv + 2, argv + argc);
40 pneigh.reserve(neigh.size());
41 for (unsigned i = 0 ; i < neigh.size() ; i++) {
42 neighbor* ptr = &neigh[i];
43 m_host_t host = MSG_get_host_by_name(ptr->get_name());
44 pneigh.push_back(ptr);
45 rev_neigh.insert(std::make_pair(host, ptr));
50 prev_load_broadcast = -1; // force sending of load on first send_all()
51 expected_load = real_load;
52 total_load_running += real_load;
53 total_load_init += real_load;
55 ctrl_close_pending = data_close_pending = neigh.size();
56 close_received = false;
59 comp_iter = lb_iter = 0;
61 compute_thread = new_msg_thread("compute",
62 std::tr1::bind(&process::compute_loop,
65 e_xbt_log_priority_t logp = xbt_log_priority_verbose;
66 if (!LOG_ISENABLED(logp))
68 std::ostringstream oss;
69 oss << neigh.size() << " neighbor";
71 oss << ESSE(neigh.size()) << ": ";
72 std::transform(neigh.begin(), neigh.end() - 1,
73 std::ostream_iterator<const char*>(oss, ", "),
74 std::tr1::mem_fn(&neighbor::get_name));
75 oss << neigh.back().get_name();
77 XBT_LOG(logp, "Got %s.", oss.str().c_str());
78 print_loads(false, logp);
83 delete compute_thread;
84 total_load_exit += real_load;
85 if (opt::log_rate < 0)
87 if (opt::bookkeeping) {
88 XBT_INFO("Final load after %d:%d iterations: %g ; expected: %g",
89 lb_iter, comp_iter, real_load, expected_load);
91 XBT_INFO("Final load after %d:%d iterations: %g",
92 lb_iter, comp_iter, real_load);
94 XBT_VERB("Total computation for this process: %g", comp);
99 if (opt::log_rate >= 0)
100 XBT_INFO("Initial load: %g", real_load);
101 XBT_VERB("Starting...");
102 compute_thread->start();
104 compute_thread->wait();
109 void process::load_balance_loop()
111 using std::tr1::bind;
112 using std::tr1::placeholders::_1;
114 double next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration;
115 while (still_running()) {
118 if (opt::log_rate && lb_iter % opt::log_rate == 0) {
119 if (opt::bookkeeping)
120 XBT_INFO("(%u:%u) current load: %g ; expected: %g",
121 lb_iter, comp_iter, real_load, expected_load);
123 XBT_INFO("(%u:%u) current load: %g",
124 lb_iter, comp_iter, real_load);
127 if (get_load() > 0.0)
130 print_loads(true, xbt_log_priority_debug);
133 std::for_each(neigh.begin(), neigh.end(),
134 bind(&process::ctrl_send, this, _1));
135 prev_load_broadcast = get_load();
137 sleep_until_date(next_iter_after_date, opt::min_lb_iter_duration);
140 comm.ctrl_flush(false);
143 XBT_VERB("Going to finalize for %s...", __func__);
144 XBT_DEBUG("send CTRL_CLOSE to %zu neighbor%s",
145 neigh.size(), ESSE(neigh.size()));
146 std::for_each(neigh.begin(), neigh.end(),
147 bind(&process::ctrl_close, this, _1));
148 while (ctrl_close_pending) {
149 comm.ctrl_flush(false);
150 XBT_DEBUG("waiting for %d CTRL CLOSE", ctrl_close_pending);
153 comm.ctrl_flush(true);
156 void process::compute_loop()
158 using std::tr1::bind;
159 using std::tr1::placeholders::_1;
161 double next_iter_after_date = MSG_get_clock() + opt::min_comp_iter_duration;
162 while (still_running()) {
167 data_receive(opt::min_comp_iter_duration);
169 comm.data_flush(false);
171 if (real_load == 0.0)
175 std::for_each(neigh.begin(), neigh.end(),
176 bind(&process::data_send, this, _1));
180 double flops = opt::comp_cost(real_load);
181 m_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
182 TRACE_msg_set_task_category(task, TRACE_CAT_COMP);
183 XBT_DEBUG("compute %g flop%s", flops, ESSE(flops));
184 MSG_task_execute(task);
186 MSG_task_destroy(task);
188 sleep_until_date(next_iter_after_date, opt::min_comp_iter_duration);
191 XBT_VERB("Going to finalize for %s...", __func__);
192 // last send, for not losing load scheduled to be sent
193 std::for_each(neigh.begin(), neigh.end(),
194 bind(&process::data_send, this, _1));
196 total_load_running -= real_load;
197 XBT_DEBUG("send DATA_CLOSE to %zu neighbor%s",
198 neigh.size(), ESSE(neigh.size()));
199 std::for_each(neigh.begin(), neigh.end(),
200 bind(&process::data_close, this, _1));
201 while (data_close_pending) {
202 comm.data_flush(false);
203 XBT_DEBUG("waiting for %d DATA CLOSE", data_close_pending);
206 comm.data_flush(true);
209 bool process::still_running()
211 static bool last_status = true;
216 } else if (opt::time_limit && MSG_get_clock() >= opt::time_limit) {
217 XBT_VERB("Reached time limit: %g/%g", MSG_get_clock(), opt::time_limit);
220 } else if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) {
221 XBT_VERB("Reached lb_maxiter: %d/%d", lb_iter, opt::lb_maxiter);
224 } else if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) {
225 XBT_VERB("Reached comp_maxiter: %d/%d", comp_iter, opt::comp_maxiter);
228 } else if (opt::exit_on_close && close_received) {
229 XBT_VERB("Close received");
232 } else if (real_load == 0.0 && !data_close_pending) {
233 XBT_VERB("I'm a poor lonesome process, and I have no load...");
236 } else if (100.0 * total_load_running / total_load_init <=
237 opt::load_ratio_threshold) {
238 // fixme: this check should be implemented with a distributed
239 // algorithm, and not a shared global variable!
240 XBT_VERB("No more load to balance in system.");
247 void process::load_balance()
249 if (lb_iter == 1) // warn only once
250 XBT_WARN("process::load_balance() is a no-op!");
253 void process::send(neighbor& nb, double amount)
255 set_load(get_load() - amount);
256 nb.set_to_send(nb.get_to_send() + amount);
257 nb.set_load(nb.get_load() + amount);
260 #define print_loads_generic(vec, verbose, logp, cat) \
261 if (_XBT_LOG_ISENABLEDV((*cat), logp)) { \
262 using std::tr1::bind; \
263 using std::tr1::placeholders::_1; \
264 XBT_XCLOG(cat, logp, "Neighbor loads:"); \
265 std::for_each(vec.begin(), vec.end(), \
266 bind(&neighbor::print, _1, verbose, logp, cat)); \
269 void process::print_loads(bool verbose,
270 e_xbt_log_priority_t logp,
271 xbt_log_category_t cat) const
273 print_loads_generic(neigh, verbose, logp, cat);
276 void process::print_loads_p(bool verbose,
277 e_xbt_log_priority_t logp,
278 xbt_log_category_t cat) const
280 print_loads_generic(pneigh, verbose, logp, cat);
283 #undef print_loads_generic
285 void process::ctrl_send(neighbor& nb)
287 double info_to_send = get_load();
288 if (info_to_send != prev_load_broadcast)
289 comm.ctrl_send(nb.get_ctrl_mbox(),
290 new message(message::INFO, info_to_send));
291 if (opt::bookkeeping) {
292 double debt_to_send = nb.get_to_send();
293 if (debt_to_send > 0.0) {
295 nb.set_debt(nb.get_debt() + debt_to_send);
296 comm.ctrl_send(nb.get_ctrl_mbox(),
297 new message(message::CREDIT, debt_to_send));
302 void process::data_send(neighbor& nb)
305 if (opt::bookkeeping) {
306 if (real_load <= nb.get_debt()) {
307 load_to_send = real_load;
308 nb.set_debt(nb.get_debt() - load_to_send);
311 load_to_send = nb.get_debt();
313 real_load -= load_to_send;
316 load_to_send = nb.get_to_send();
318 // do not update real_load here
320 if (load_to_send > 0.0)
321 comm.data_send(nb.get_data_mbox(),
322 new message(message::LOAD, load_to_send));
325 void process::ctrl_close(neighbor& nb)
327 comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
330 void process::data_close(neighbor& nb)
332 comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
335 void process::ctrl_receive(double timeout)
340 XBT_DEBUG("%sblocking receive on ctrl (%g)", "\0non-" + !timeout, timeout);
341 while (ctrl_close_pending && comm.ctrl_recv(msg, from, timeout)) {
342 handle_message(msg, from);
347 void process::data_receive(double timeout)
352 XBT_DEBUG("%sblocking receive on data (%g)", "\0non-" + !timeout, timeout);
353 while (data_close_pending && comm.data_recv(msg, from, timeout)) {
354 handle_message(msg, from);
359 void process::handle_message(message* msg, m_host_t from)
361 switch (msg->get_type()) {
362 case message::INFO: {
363 neighbor* n = rev_neigh[from];
364 n->set_load(msg->get_amount());
367 case message::CREDIT:
368 expected_load += msg->get_amount();
370 case message::LOAD: {
371 double ld = msg->get_amount();
374 total_load_running -= ld;
377 case message::CTRL_CLOSE:
378 ctrl_close_pending--;
379 close_received = true;
381 case message::DATA_CLOSE:
382 data_close_pending--;
383 close_received = true;