AND Private Git Repository - loba.git/blob - process.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Experimental tracing support.
[loba.git] / process.cpp
1 #include <algorithm>
2 #include <tr1/functional>
3 #include <iterator>
4 #include <numeric>
5 #include <stdexcept>
6 #include <sstream>
7 #include <xbt/log.h>
8 #include <xbt/time.h>
9
10 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);
11
12 #include "misc.h"
13 #include "options.h"
14 #include "tracing.h"
15
16 #include "process.h"
17
// Load counters shared by all process instances:
//  - total_load_init accumulates the initial load of every constructed
//    process;
//  - total_load_running is raised by each constructor, and lowered by
//    finalize() and by receive() when load arrives while finalizing;
//  - total_load_exit accumulates the load held by each process when it is
//    destroyed.
double process::total_load_init = 0.0;
double process::total_load_running = 0.0;
double process::total_load_exit = 0.0;
21
22 process::process(int argc, char* argv[])
23 {
24     if (argc < 2 || !(std::istringstream(argv[1]) >> load))
25         throw std::invalid_argument("bad or missing initial load parameter");
26
27     neigh.assign(argv + 2, argv + argc);
28
29     pneigh.reserve(neigh.size());
30     for (unsigned i = 0 ; i < neigh.size() ; i++) {
31         neighbor* ptr = &neigh[i];
32         m_host_t host = MSG_get_host_by_name(ptr->get_name());
33         pneigh.push_back(ptr);
34         rev_neigh.insert(std::make_pair(host, ptr));
35     }
36
37     comp = 0.0;
38
39     prev_load_broadcast = -1;   // force sending of load on first send()
40     expected_load = load;
41     total_load_running += load;
42     total_load_init += load;
43
44     ctrl_close_pending = data_close_pending = neigh.size();
45     close_received = false;
46     finalizing = false;
47
48     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
49     if (!LOG_ISENABLED(logp))
50         return;
51     std::ostringstream oss;
52     oss << neigh.size() << " neighbor";
53     if (!neigh.empty()) {
54         oss << ESSE(neigh.size()) << ": ";
55         std::transform(neigh.begin(), neigh.end() - 1,
56                        std::ostream_iterator<const char*>(oss, ", "),
57                        std::tr1::mem_fn(&neighbor::get_name));
58         oss << neigh.back().get_name();
59     }
60     LOG1(logp, "Got %s.", oss.str().c_str());
61     print_loads(false, logp);
62 }
63
64 process::~process()
65 {
66     total_load_exit += load;
67 }
68
69 int process::run()
70 {
71     double next_iter_after_date = 0.0;
72     INFO1("Initial load: %g", load);
73     VERB0("Starting...");
74     comp_iter = lb_iter = 0;
75     while (true) {
76         double ld = lb_load();
77         if (ld > 0.0) {
78             double now = MSG_get_clock();
79             if (now < next_iter_after_date)
80                 MSG_process_sleep(next_iter_after_date - now);
81             next_iter_after_date = MSG_get_clock() + opt::min_iter_duration;
82
83             ++lb_iter;
84
85             if (opt::log_rate && lb_iter % opt::log_rate == 0) {
86                 if (opt::bookkeeping)
87                     INFO4("(%u:%u) current load: %g ; expected: %g",
88                           lb_iter, comp_iter, load, expected_load);
89                 else
90                     INFO2("(%u) current load: %g",
91                           lb_iter, load);
92             }
93
94             ld -= load_balance(ld);
95
96             print_loads(true, xbt_log_priority_debug);
97         }
98         lb_load() = ld;
99
100         // send load information, and load (data) if any
101         send();
102         if (load > 0.0) {
103             ++comp_iter;
104             compute();
105         }
106
107         if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) {
108             VERB2("Reached lb_maxiter: %d/%d", lb_iter, opt::lb_maxiter);
109             break;
110         }
111         if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) {
112             VERB2("Reached comp_maxiter: %d/%d", comp_iter, opt::comp_maxiter);
113             break;
114         }
115         if (opt::time_limit && MSG_get_clock() >= opt::time_limit) {
116             VERB2("Reached time limit: %g/%g", MSG_get_clock(), opt::time_limit);
117             break;
118         }
119
120         // block on receiving unless there is something to compute or
121         // to send
122         double timeout;
123         if (load != 0 || lb_load() != prev_load_broadcast)
124             timeout = 0.0;
125         else if (opt::min_iter_duration)
126             timeout = opt::min_iter_duration;
127         else
128             timeout = 1.0;
129         receive(timeout);
130
131         // one of our neighbor is finalizing
132         if (opt::exit_on_close && close_received) {
133             VERB0("Close received");
134             break;
135         }
136
137         // have no load and cannot receive anything
138         if (load == 0.0 && !may_receive()) {
139             VERB0("I'm a poor lonesome process, and I have no load...");
140             break;
141         }
142
143         // fixme: this check should be implemented with a distributed
144         // algorithm, and not a shared global variable!
145         // fixme: should this chunk be moved before call to receive() ?
146         if (100.0 * total_load_running / total_load_init <=
147             opt::load_ratio_threshold) {
148             VERB0("No more load to balance in system.");
149             break;
150         } else {
151             DEBUG1("still %g load to balance, continuing...", total_load_running);
152         }
153     }
154     VERB0("Going to finalize...");
155     finalize();
156
157     /* Open Questions :
158      * - definition of load on heterogeneous hosts ?
159      * - how to detect convergence ?
160      * - how to manage link failures ?
161      */
162
163     VERB0("Done.");
164     if (opt::bookkeeping) {
165         INFO4("Final load after %d:%d iterations: %g ; expected: %g",
166               lb_iter, comp_iter, load, expected_load);
167     } else {
168         INFO2("Final load after %d iterations: %g",
169               lb_iter, load);
170         if (lb_iter != comp_iter)
171             WARN2("lb_iter (%d) and comp_iter (%d) differ!",
172                   lb_iter, comp_iter);
173     }
174     return 0;
175 }
176
177 double process::sum_of_to_send() const
178 {
179     using std::tr1::bind;
180     using std::tr1::placeholders::_1;
181     using std::tr1::placeholders::_2;
182
183     return std::accumulate(neigh.begin(), neigh.end(), 0.0,
184                            bind(std::plus<double>(),
185                                 _1, bind(&neighbor::get_to_send, _2)));
186 }
187
188 double process::load_balance(double /*my_load*/)
189 {
190     if (lb_iter == 1)           // warn only once
191         WARN0("process::load_balance() is a no-op!");
192     return 0.0;
193 }
194
195 void process::compute()
196 {
197     if (load > 0.0) {
198         double flops = opt::comp_cost(load);
199         m_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
200         TRACE_msg_set_task_category(task, TRACE_CAT_COMP);
201         DEBUG2("compute %g flop%s", flops, ESSE(flops));
202         MSG_task_execute(task);
203         comp += flops;
204         MSG_task_destroy(task);
205     } else {
206         DEBUG0("nothing to compute !");
207     }
208 }
209
210 void process::send1_no_bookkeeping(neighbor& nb)
211 {
212     if (load != prev_load_broadcast)
213         comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load));
214     double load_to_send = nb.get_to_send();
215     if (load_to_send > 0.0) {
216         comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
217         nb.set_to_send(0.0);
218     }
219 }
220
221 void process::send1_bookkeeping(neighbor& nb)
222 {
223     if (expected_load != prev_load_broadcast)
224         comm.send(nb.get_ctrl_mbox(),
225                   new message(message::INFO, expected_load));
226     double load_to_send;
227     double new_debt;
228     double debt_to_send = nb.get_to_send();
229     if (debt_to_send > 0.0) {
230         comm.send(nb.get_ctrl_mbox(),
231                   new message(message::CREDIT, debt_to_send));
232         nb.set_to_send(0.0);
233         new_debt = nb.get_debt() + debt_to_send;
234     } else {
235         new_debt = nb.get_debt();
236     }
237     if (load <= new_debt) {
238         load_to_send = load;
239         nb.set_debt(new_debt - load_to_send);
240         load = 0.0;
241     } else {
242         load_to_send = new_debt;
243         nb.set_debt(0.0);
244         load -= load_to_send;
245     }
246     if (load_to_send > 0.0)
247         comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
248 }
249
250 void process::send()
251 {
252     using std::tr1::bind;
253     using std::tr1::placeholders::_1;
254
255     if (opt::bookkeeping) {
256         std::for_each(neigh.begin(), neigh.end(),
257                       bind(&process::send1_bookkeeping, this, _1));
258         prev_load_broadcast = expected_load;
259     } else {
260         std::for_each(neigh.begin(), neigh.end(),
261                       bind(&process::send1_no_bookkeeping, this, _1));
262         prev_load_broadcast = load;
263     }
264     comm.flush(false);
265 }
266
267 void process::receive(double timeout)
268 {
269     message* msg;
270     m_host_t from;
271
272     DEBUG2("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
273     while (may_receive() && comm.recv(msg, from, timeout)) {
274         switch (msg->get_type()) {
275         case message::INFO: {
276             neighbor* n = rev_neigh[from];
277             n->set_load(msg->get_amount());
278             break;
279         }
280         case message::CREDIT:
281             expected_load += msg->get_amount();
282             break;
283         case message::LOAD: {
284             double ld = msg->get_amount();
285             load += ld;
286             if (finalizing)
287                 total_load_running -= ld;
288             break;
289         }
290         case message::CTRL_CLOSE:
291             ctrl_close_pending--;
292             close_received = true;
293             break;
294         case message::DATA_CLOSE:
295             data_close_pending--;
296             close_received = true;
297             break;
298         }
299         delete msg;
300         timeout = 0.0;          // only wait on first recv
301     }
302     comm.flush(false);
303 }
304
305 void process::finalize1(neighbor& nb)
306 {
307     comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
308     comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
309 }
310
311 void process::finalize()
312 {
313     using std::tr1::bind;
314     using std::tr1::placeholders::_1;
315
316     finalizing = true;
317     total_load_running -= load;
318
319     DEBUG2("send CLOSE to %lu neighbor%s",
320            (unsigned long )neigh.size(), ESSE(neigh.size()));
321     std::for_each(neigh.begin(), neigh.end(),
322                   bind(&process::finalize1, this, _1));
323
324     DEBUG2("wait for CLOSE from %lu neighbor%s",
325            (unsigned long )neigh.size(), ESSE(neigh.size()));
326     while (may_receive()) {
327         comm.flush(false);
328         receive(-1.0);
329     }
330
331     comm.flush(true);
332 }
333
334 #define print_loads_generic(vec, verbose, logp, cat)                    \
335     if (_XBT_LOG_ISENABLEDV((*cat), logp)) {                            \
336         using std::tr1::bind;                                           \
337         using std::tr1::placeholders::_1;                               \
338         XCLOG0(cat, logp, "Neighbor loads:");                           \
339         std::for_each(vec.begin(), vec.end(),                           \
340                       bind(&neighbor::print, _1, verbose, logp, cat));  \
341     } else ((void)0)
342
343 void process::print_loads(bool verbose,
344                           e_xbt_log_priority_t logp,
345                           xbt_log_category_t cat) const
346 {
347     print_loads_generic(neigh, verbose, logp, cat);
348 }
349
350 void process::print_loads_p(bool verbose,
351                             e_xbt_log_priority_t logp,
352                             xbt_log_category_t cat) const
353 {
354     print_loads_generic(pneigh, verbose, logp, cat);
355 }
356
357 #undef print_loads_generic
358
359 // Local variables:
360 // mode: c++
361 // End: