Add ability to delay the beginning of computations.
[loba.git] / process.cpp
#include <algorithm>
#include <tr1/functional>
#include <iterator>
#include <stdexcept>
#include <sstream>
#include <xbt/log.h>
#include <xbt/time.h>

XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);

#include "misc.h"
#include "options.h"
#include "tracing.h"

#include "process.h"

double process::total_load_init = 0.0;
double process::total_load_running = 0.0;
double process::total_load_exit = 0.0;

namespace {

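    // Sleep until the simulated clock reaches `date', then reset `date'
    // to the current clock plus `duration'.  Used to enforce a minimum
    // duration for each load-balancing / computation iteration.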
    void sleep_until_date(double& date, double duration = 0.0)
    {
        double sleep_duration = date - MSG_get_clock();
        if (sleep_duration > 0.0)
            MSG_process_sleep(sleep_duration);
        date = MSG_get_clock() + duration;
    }

}

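// Build a process from its MSG command line: argv[1] is the initial
// load, the remaining arguments are the names of the neighbor hosts.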
process::process(int argc, char* argv[])
{
    if (argc < 2 || !(std::istringstream(argv[1]) >> real_load))
        throw std::invalid_argument("bad or missing initial load parameter");

    neigh.assign(argv + 2, argv + argc);

    pneigh.reserve(neigh.size());
    for (unsigned i = 0 ; i < neigh.size() ; i++) {
        neighbor* ptr = &neigh[i];
        m_host_t host = MSG_get_host_by_name(ptr->get_name());
        pneigh.push_back(ptr);
        rev_neigh.insert(std::make_pair(host, ptr));
    }

    comp = 0.0;

    prev_load_broadcast = -1;   // force sending of load on first send_all()
    expected_load = real_load;
    total_load_running += real_load;
    total_load_init += real_load;

    ctrl_close_pending = data_close_pending = neigh.size();
    close_received = false;
    finalizing = false;

    comp_iter = lb_iter = 0;

    lb_thread = new_msg_thread("loba",
                               std::tr1::bind(&process::load_balance_loop,
                                              this));

    e_xbt_log_priority_t logp = xbt_log_priority_verbose;
    if (!LOG_ISENABLED(logp))
        return;
    std::ostringstream oss;
    oss << neigh.size() << " neighbor";
    if (!neigh.empty()) {
        oss << ESSE(neigh.size()) << ": ";
        std::transform(neigh.begin(), neigh.end() - 1,
                       std::ostream_iterator<const char*>(oss, ", "),
                       std::tr1::mem_fn(&neighbor::get_name));
        oss << neigh.back().get_name();
    }
    XBT_LOG(logp, "Got %s.", oss.str().c_str());
    print_loads(false, logp);
}

process::~process()
{
    delete lb_thread;
    total_load_exit += real_load;
    if (opt::log_rate < 0)
        return;
    if (opt::bookkeeping) {
        XBT_INFO("Final load after %d:%d iterations: %g ; expected: %g",
                 lb_iter, comp_iter, real_load, expected_load);
    } else {
        XBT_INFO("Final load after %d:%d iterations: %g",
                 lb_iter, comp_iter, real_load);
    }
    XBT_VERB("Total computation for this process: %g", comp);
}

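// Main entry point of the process.  The computations are delayed until
// the load-balancing thread has gone through opt::comp_iter_delay
// iterations, and until the simulated clock has reached
// opt::comp_time_delay.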
int process::run()
{
    if (opt::log_rate >= 0)
        XBT_INFO("Initial load: %g", real_load);
    XBT_VERB("Starting...");
    mutex.acquire();
    lb_thread->start();
    while (lb_iter <= opt::comp_iter_delay)
        cond.wait(mutex);
    mutex.release();
    double sleep_duration = opt::comp_time_delay - MSG_get_clock();
    if (sleep_duration > 0.0)
        MSG_process_sleep(sleep_duration);
    compute_loop();
    lb_thread->wait();
    XBT_VERB("Done.");
    return 0;
}

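// Load-balancing loop: at each iteration, run the load-balancing
// algorithm, broadcast the local load to the neighbors on the control
// channel, then handle incoming control messages.  On exit, send
// CTRL_CLOSE to every neighbor and drain the control channel.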
void process::load_balance_loop()
{
    using std::tr1::bind;
    using std::tr1::placeholders::_1;

    double next_iter_after_date = MSG_get_clock() + opt::min_lb_iter_duration;
    while (still_running()) {
        if (lb_iter == opt::comp_iter_delay) {
            mutex.acquire();
            ++lb_iter;
            cond.signal();
            mutex.release();
        } else {
            ++lb_iter;
        }

        if (opt::log_rate && lb_iter % opt::log_rate == 0) {
            if (opt::bookkeeping)
                XBT_INFO("(%u:%u) current load: %g ; expected: %g",
                         lb_iter, comp_iter, real_load, expected_load);
            else
                XBT_INFO("(%u:%u) current load: %g",
                         lb_iter, comp_iter, real_load);
        }

        if (get_load() > 0.0)
            load_balance();

        print_loads(true, xbt_log_priority_debug);

        // send
        std::for_each(neigh.begin(), neigh.end(),
                      bind(&process::ctrl_send, this, _1));
        prev_load_broadcast = get_load();

        sleep_until_date(next_iter_after_date, opt::min_lb_iter_duration);
        ctrl_receive(0.0);

        comm.ctrl_flush(false);
    }

    XBT_VERB("Going to finalize for %s...", __func__);
    XBT_DEBUG("send CTRL_CLOSE to %zu neighbor%s",
              neigh.size(), ESSE(neigh.size()));
    std::for_each(neigh.begin(), neigh.end(),
                  bind(&process::ctrl_close, this, _1));
    while (ctrl_close_pending) {
        comm.ctrl_flush(false);
        XBT_DEBUG("waiting for %d CTRL CLOSE", ctrl_close_pending);
        ctrl_receive(-1.0);
    }
    comm.ctrl_flush(true);
}

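// Computation loop: at each iteration, receive incoming load on the
// data channel, ship the load scheduled for the neighbors, then
// execute a computation task whose cost depends on the local load.
// On exit, send DATA_CLOSE to every neighbor and drain the data channel.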
void process::compute_loop()
{
    using std::tr1::bind;
    using std::tr1::placeholders::_1;

    double next_iter_after_date = MSG_get_clock() + opt::min_comp_iter_duration;
    while (still_running()) {
        // receive
        if (real_load > 0.0)
            data_receive(0.0);
        else
            data_receive(opt::min_comp_iter_duration);

        comm.data_flush(false);

        if (real_load == 0.0)
            continue;

        // send
        std::for_each(neigh.begin(), neigh.end(),
                      bind(&process::data_send, this, _1));

        // compute
        ++comp_iter;
        double flops = opt::comp_cost(real_load);
        m_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
        TRACE_msg_set_task_category(task, TRACE_CAT_COMP);
        XBT_DEBUG("compute %g flop%s", flops, ESSE(flops));
        MSG_task_execute(task);
        comp += flops;
        MSG_task_destroy(task);

        sleep_until_date(next_iter_after_date, opt::min_comp_iter_duration);
    }

    XBT_VERB("Going to finalize for %s...", __func__);
    // last send, so as not to lose load already scheduled to be sent
    std::for_each(neigh.begin(), neigh.end(),
                  bind(&process::data_send, this, _1));
    finalizing = true;
    total_load_running -= real_load;
    XBT_DEBUG("send DATA_CLOSE to %zu neighbor%s",
              neigh.size(), ESSE(neigh.size()));
    std::for_each(neigh.begin(), neigh.end(),
                  bind(&process::data_close, this, _1));
    while (data_close_pending) {
        comm.data_flush(false);
        XBT_DEBUG("waiting for %d DATA CLOSE", data_close_pending);
        data_receive(-1.0);
    }
    comm.data_flush(true);
}

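// Tell whether the process should keep running.  The answer is sticky:
// once one of the stop conditions has been met, the function keeps
// returning false.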
bool process::still_running()
{
    static bool last_status = true;

    if (!last_status) {
        /* nop */

    } else if (opt::time_limit && MSG_get_clock() >= opt::time_limit) {
        XBT_VERB("Reached time limit: %g/%g", MSG_get_clock(), opt::time_limit);
        last_status = false;

    } else if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) {
        XBT_VERB("Reached lb_maxiter: %d/%d", lb_iter, opt::lb_maxiter);
        last_status = false;

    } else if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) {
        XBT_VERB("Reached comp_maxiter: %d/%d", comp_iter, opt::comp_maxiter);
        last_status = false;

    } else if (opt::exit_on_close && close_received) {
        XBT_VERB("Close received");
        last_status = false;

    } else if (real_load == 0.0 && !data_close_pending) {
        XBT_VERB("I'm a poor lonesome process, and I have no load...");
        last_status = false;

    } else if (100.0 * total_load_running / total_load_init <=
               opt::load_ratio_threshold) {
        // fixme: this check should be implemented with a distributed
        // algorithm, and not a shared global variable!
        XBT_VERB("No more load to balance in system.");
        last_status = false;
    }

    return last_status;
}

void process::load_balance()
{
    if (lb_iter == 1)           // warn only once
        XBT_WARN("process::load_balance() is a no-op!");
}

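// Schedule a transfer of `amount' units of load towards neighbor `nb'.
// The load is not sent immediately: see ctrl_send() and data_send().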
void process::send(neighbor& nb, double amount)
{
    set_load(get_load() - amount);
    nb.set_to_send(nb.get_to_send() + amount);
    nb.set_load(nb.get_load() + amount);
}

#define print_loads_generic(vec, verbose, logp, cat)                    \
    if (_XBT_LOG_ISENABLEDV((*cat), logp)) {                            \
        using std::tr1::bind;                                           \
        using std::tr1::placeholders::_1;                               \
        XBT_XCLOG(cat, logp, "Neighbor loads:");                        \
        std::for_each(vec.begin(), vec.end(),                           \
                      bind(&neighbor::print, _1, verbose, logp, cat));  \
    } else ((void)0)

void process::print_loads(bool verbose,
                          e_xbt_log_priority_t logp,
                          xbt_log_category_t cat) const
{
    print_loads_generic(neigh, verbose, logp, cat);
}

void process::print_loads_p(bool verbose,
                            e_xbt_log_priority_t logp,
                            xbt_log_category_t cat) const
{
    print_loads_generic(pneigh, verbose, logp, cat);
}

#undef print_loads_generic

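// Control channel: send the local load to neighbor `nb' if it changed
// since the last broadcast; with bookkeeping, also turn the scheduled
// load (to_send) into a debt and announce it with a CREDIT message.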
void process::ctrl_send(neighbor& nb)
{
    double info_to_send = get_load();
    if (info_to_send != prev_load_broadcast)
        comm.ctrl_send(nb.get_ctrl_mbox(),
                       new message(message::INFO, info_to_send));
    if (opt::bookkeeping) {
        double debt_to_send = nb.get_to_send();
        if (debt_to_send > 0.0) {
            nb.set_to_send(0.0);
            nb.set_debt(nb.get_debt() + debt_to_send);
            comm.ctrl_send(nb.get_ctrl_mbox(),
                           new message(message::CREDIT, debt_to_send));
        }
    }
}

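// Data channel: actually ship load to neighbor `nb'.  With
// bookkeeping, pay back (part of) the debt owed to this neighbor from
// the real load; otherwise, send the amount scheduled by send().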
void process::data_send(neighbor& nb)
{
    double load_to_send;
    if (opt::bookkeeping) {
        if (real_load <= nb.get_debt()) {
            load_to_send = real_load;
            nb.set_debt(nb.get_debt() - load_to_send);
            real_load = 0.0;
        } else {
            load_to_send = nb.get_debt();
            nb.set_debt(0.0);
            real_load -= load_to_send;
        }
    } else {
        load_to_send = nb.get_to_send();
        nb.set_to_send(0.0);
        // do not update real_load here
    }
    if (load_to_send > 0.0)
        comm.data_send(nb.get_data_mbox(),
                       new message(message::LOAD, load_to_send));
}

void process::ctrl_close(neighbor& nb)
{
    comm.ctrl_send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
}

void process::data_close(neighbor& nb)
{
    comm.data_send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
}

void process::ctrl_receive(double timeout)
{
    message* msg;
    m_host_t from;

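    // Note: "\0non-" + !timeout evaluates to "" when timeout is non-zero
    // (blocking receive) and to "non-" when timeout is zero
    // (non-blocking receive).  Same trick in data_receive() below.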
    XBT_DEBUG("%sblocking receive on ctrl (%g)", "\0non-" + !timeout, timeout);
    while (ctrl_close_pending && comm.ctrl_recv(msg, from, timeout)) {
        handle_message(msg, from);
        timeout = 0.0;
    }
}

void process::data_receive(double timeout)
{
    message* msg;
    m_host_t from;

    XBT_DEBUG("%sblocking receive on data (%g)", "\0non-" + !timeout, timeout);
    while (data_close_pending && comm.data_recv(msg, from, timeout)) {
        handle_message(msg, from);
        timeout = 0.0;
    }
}

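// Dispatch a received message: INFO updates the known load of the
// sending neighbor, CREDIT increases the expected load (bookkeeping),
// LOAD carries real load, and the *_CLOSE messages count down the
// number of neighbors still connected on each channel.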
void process::handle_message(message* msg, m_host_t from)
{
    switch (msg->get_type()) {
    case message::INFO: {
        neighbor* n = rev_neigh[from];
        n->set_load(msg->get_amount());
        break;
    }
    case message::CREDIT:
        expected_load += msg->get_amount();
        break;
    case message::LOAD: {
        double ld = msg->get_amount();
        real_load += ld;
        if (finalizing)
            total_load_running -= ld;
        break;
    }
    case message::CTRL_CLOSE:
        ctrl_close_pending--;
        close_received = true;
        break;
    case message::DATA_CLOSE:
        data_close_pending--;
        close_received = true;
        break;
    }
    delete msg;
}

// Local variables:
// mode: c++
// End: