AND Private Git Repository - loba.git/blob - process.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Rename process::send() -> process::send_all().
[loba.git] / process.cpp
1 #include <algorithm>
2 #include <tr1/functional>
3 #include <iterator>
4 #include <numeric>
5 #include <stdexcept>
6 #include <sstream>
7 #include <xbt/log.h>
8 #include <xbt/time.h>
9
10 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(proc);
11
12 #include "misc.h"
13 #include "options.h"
14 #include "tracing.h"
15
16 #include "process.h"
17
18 double process::total_load_init = 0.0;
19 double process::total_load_running = 0.0;
20 double process::total_load_exit = 0.0;
21
22 process::process(int argc, char* argv[])
23 {
24     if (argc < 2 || !(std::istringstream(argv[1]) >> load))
25         throw std::invalid_argument("bad or missing initial load parameter");
26
27     neigh.assign(argv + 2, argv + argc);
28
29     pneigh.reserve(neigh.size());
30     for (unsigned i = 0 ; i < neigh.size() ; i++) {
31         neighbor* ptr = &neigh[i];
32         m_host_t host = MSG_get_host_by_name(ptr->get_name());
33         pneigh.push_back(ptr);
34         rev_neigh.insert(std::make_pair(host, ptr));
35     }
36
37     comp = 0.0;
38
39     prev_load_broadcast = -1;   // force sending of load on first send_all()
40     expected_load = load;
41     total_load_running += load;
42     total_load_init += load;
43
44     ctrl_close_pending = data_close_pending = neigh.size();
45     close_received = false;
46     finalizing = false;
47
48     e_xbt_log_priority_t logp = xbt_log_priority_verbose;
49     if (!LOG_ISENABLED(logp))
50         return;
51     std::ostringstream oss;
52     oss << neigh.size() << " neighbor";
53     if (!neigh.empty()) {
54         oss << ESSE(neigh.size()) << ": ";
55         std::transform(neigh.begin(), neigh.end() - 1,
56                        std::ostream_iterator<const char*>(oss, ", "),
57                        std::tr1::mem_fn(&neighbor::get_name));
58         oss << neigh.back().get_name();
59     }
60     LOG1(logp, "Got %s.", oss.str().c_str());
61     print_loads(false, logp);
62 }
63
64 process::~process()
65 {
66     total_load_exit += load;
67     if (opt::bookkeeping) {
68         INFO4("Final load after %d:%d iterations: %g ; expected: %g",
69               lb_iter, comp_iter, load, expected_load);
70     } else {
71         INFO2("Final load after %d iterations: %g",
72               lb_iter, load);
73         if (lb_iter != comp_iter)
74             WARN2("lb_iter (%d) and comp_iter (%d) differ!",
75                   lb_iter, comp_iter);
76     }
77     VERB1("Total computation for this process: %g", comp);
78 }
79
80 int process::run()
81 {
82     double next_iter_after_date = 0.0;
83     INFO1("Initial load: %g", load);
84     VERB0("Starting...");
85     comp_iter = lb_iter = 0;
86     while (true) {
87         double ld = lb_load();
88         if (ld > 0.0) {
89             double now = MSG_get_clock();
90             if (now < next_iter_after_date)
91                 MSG_process_sleep(next_iter_after_date - now);
92             next_iter_after_date = MSG_get_clock() + opt::min_iter_duration;
93
94             ++lb_iter;
95
96             if (opt::log_rate && lb_iter % opt::log_rate == 0) {
97                 if (opt::bookkeeping)
98                     INFO4("(%u:%u) current load: %g ; expected: %g",
99                           lb_iter, comp_iter, load, expected_load);
100                 else
101                     INFO2("(%u) current load: %g",
102                           lb_iter, load);
103             }
104
105             ld -= load_balance(ld);
106
107             print_loads(true, xbt_log_priority_debug);
108         }
109         lb_load() = ld;
110
111         // send load information, and load (data) if any
112         send_all();
113         if (load > 0.0) {
114             ++comp_iter;
115             compute();
116         }
117
118         if (opt::lb_maxiter && lb_iter >= opt::lb_maxiter) {
119             VERB2("Reached lb_maxiter: %d/%d", lb_iter, opt::lb_maxiter);
120             break;
121         }
122         if (opt::comp_maxiter && comp_iter >= opt::comp_maxiter) {
123             VERB2("Reached comp_maxiter: %d/%d", comp_iter, opt::comp_maxiter);
124             break;
125         }
126         if (opt::time_limit && MSG_get_clock() >= opt::time_limit) {
127             VERB2("Reached time limit: %g/%g", MSG_get_clock(), opt::time_limit);
128             break;
129         }
130
131         // block on receiving unless there is something to compute or
132         // to send
133         double timeout;
134         if (load != 0 || lb_load() != prev_load_broadcast)
135             timeout = 0.0;
136         else if (opt::min_iter_duration)
137             timeout = opt::min_iter_duration;
138         else
139             timeout = 1.0;
140         receive(timeout);
141
142         // one of our neighbor is finalizing
143         if (opt::exit_on_close && close_received) {
144             VERB0("Close received");
145             break;
146         }
147
148         // have no load and cannot receive anything
149         if (load == 0.0 && !may_receive()) {
150             VERB0("I'm a poor lonesome process, and I have no load...");
151             break;
152         }
153
154         // fixme: this check should be implemented with a distributed
155         // algorithm, and not a shared global variable!
156         // fixme: should this chunk be moved before call to receive() ?
157         if (100.0 * total_load_running / total_load_init <=
158             opt::load_ratio_threshold) {
159             VERB0("No more load to balance in system.");
160             break;
161         } else {
162             DEBUG1("still %g load to balance, continuing...", total_load_running);
163         }
164     }
165     VERB0("Going to finalize...");
166     finalize();
167
168     /* Open Questions :
169      * - definition of load on heterogeneous hosts ?
170      * - how to detect convergence ?
171      * - how to manage link failures ?
172      */
173
174     VERB0("Done.");
175     return 0;
176 }
177
178 double process::sum_of_to_send() const
179 {
180     using std::tr1::bind;
181     using std::tr1::placeholders::_1;
182     using std::tr1::placeholders::_2;
183
184     return std::accumulate(neigh.begin(), neigh.end(), 0.0,
185                            bind(std::plus<double>(),
186                                 _1, bind(&neighbor::get_to_send, _2)));
187 }
188
189 double process::load_balance(double /*my_load*/)
190 {
191     if (lb_iter == 1)           // warn only once
192         WARN0("process::load_balance() is a no-op!");
193     return 0.0;
194 }
195
196 void process::compute()
197 {
198     if (load > 0.0) {
199         double flops = opt::comp_cost(load);
200         m_task_t task = MSG_task_create("computation", flops, 0.0, NULL);
201         TRACE_msg_set_task_category(task, TRACE_CAT_COMP);
202         DEBUG2("compute %g flop%s", flops, ESSE(flops));
203         MSG_task_execute(task);
204         comp += flops;
205         MSG_task_destroy(task);
206     } else {
207         DEBUG0("nothing to compute !");
208     }
209 }
210
211 void process::send1_no_bookkeeping(neighbor& nb)
212 {
213     if (load != prev_load_broadcast)
214         comm.send(nb.get_ctrl_mbox(), new message(message::INFO, load));
215     double load_to_send = nb.get_to_send();
216     if (load_to_send > 0.0) {
217         comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
218         nb.set_to_send(0.0);
219     }
220 }
221
222 void process::send1_bookkeeping(neighbor& nb)
223 {
224     if (expected_load != prev_load_broadcast)
225         comm.send(nb.get_ctrl_mbox(),
226                   new message(message::INFO, expected_load));
227     double load_to_send;
228     double new_debt;
229     double debt_to_send = nb.get_to_send();
230     if (debt_to_send > 0.0) {
231         comm.send(nb.get_ctrl_mbox(),
232                   new message(message::CREDIT, debt_to_send));
233         nb.set_to_send(0.0);
234         new_debt = nb.get_debt() + debt_to_send;
235     } else {
236         new_debt = nb.get_debt();
237     }
238     if (load <= new_debt) {
239         load_to_send = load;
240         nb.set_debt(new_debt - load_to_send);
241         load = 0.0;
242     } else {
243         load_to_send = new_debt;
244         nb.set_debt(0.0);
245         load -= load_to_send;
246     }
247     if (load_to_send > 0.0)
248         comm.send(nb.get_data_mbox(), new message(message::LOAD, load_to_send));
249 }
250
251 void process::send_all()
252 {
253     using std::tr1::bind;
254     using std::tr1::placeholders::_1;
255
256     if (opt::bookkeeping) {
257         std::for_each(neigh.begin(), neigh.end(),
258                       bind(&process::send1_bookkeeping, this, _1));
259         prev_load_broadcast = expected_load;
260     } else {
261         std::for_each(neigh.begin(), neigh.end(),
262                       bind(&process::send1_no_bookkeeping, this, _1));
263         prev_load_broadcast = load;
264     }
265     comm.flush(false);
266 }
267
268 void process::receive(double timeout)
269 {
270     message* msg;
271     m_host_t from;
272
273     DEBUG2("%sblocking receive (%g)", "\0non-" + !timeout, timeout);
274     while (may_receive() && comm.recv(msg, from, timeout)) {
275         switch (msg->get_type()) {
276         case message::INFO: {
277             neighbor* n = rev_neigh[from];
278             n->set_load(msg->get_amount());
279             break;
280         }
281         case message::CREDIT:
282             expected_load += msg->get_amount();
283             break;
284         case message::LOAD: {
285             double ld = msg->get_amount();
286             load += ld;
287             if (finalizing)
288                 total_load_running -= ld;
289             break;
290         }
291         case message::CTRL_CLOSE:
292             ctrl_close_pending--;
293             close_received = true;
294             break;
295         case message::DATA_CLOSE:
296             data_close_pending--;
297             close_received = true;
298             break;
299         }
300         delete msg;
301         timeout = 0.0;          // only wait on first recv
302     }
303     comm.flush(false);
304 }
305
306 void process::finalize1(neighbor& nb)
307 {
308     comm.send(nb.get_ctrl_mbox(), new message(message::CTRL_CLOSE, 0.0));
309     comm.send(nb.get_data_mbox(), new message(message::DATA_CLOSE, 0.0));
310 }
311
312 void process::finalize()
313 {
314     using std::tr1::bind;
315     using std::tr1::placeholders::_1;
316
317     finalizing = true;
318     total_load_running -= load;
319
320     DEBUG2("send CLOSE to %lu neighbor%s",
321            (unsigned long )neigh.size(), ESSE(neigh.size()));
322     std::for_each(neigh.begin(), neigh.end(),
323                   bind(&process::finalize1, this, _1));
324
325     DEBUG2("wait for CLOSE from %lu neighbor%s",
326            (unsigned long )neigh.size(), ESSE(neigh.size()));
327     while (may_receive()) {
328         comm.flush(false);
329         receive(-1.0);
330     }
331
332     comm.flush(true);
333 }
334
335 #define print_loads_generic(vec, verbose, logp, cat)                    \
336     if (_XBT_LOG_ISENABLEDV((*cat), logp)) {                            \
337         using std::tr1::bind;                                           \
338         using std::tr1::placeholders::_1;                               \
339         XCLOG0(cat, logp, "Neighbor loads:");                           \
340         std::for_each(vec.begin(), vec.end(),                           \
341                       bind(&neighbor::print, _1, verbose, logp, cat));  \
342     } else ((void)0)
343
344 void process::print_loads(bool verbose,
345                           e_xbt_log_priority_t logp,
346                           xbt_log_category_t cat) const
347 {
348     print_loads_generic(neigh, verbose, logp, cat);
349 }
350
351 void process::print_loads_p(bool verbose,
352                             e_xbt_log_priority_t logp,
353                             xbt_log_category_t cat) const
354 {
355     print_loads_generic(pneigh, verbose, logp, cat);
356 }
357
358 #undef print_loads_generic
359
360 // Local variables:
361 // mode: c++
362 // End: