Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bca5a431fe59a6b0622ec6fe13e7db0ca66c5e32
[simgrid.git] / examples / cpp / dag-scheduling / s4u-dag-scheduling.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /* simple test to schedule a DAX file with the Min-Min algorithm.           */
8 #include <math.h>
9 #include <simgrid/host.h>
10 #include <simgrid/s4u.hpp>
11 #include <string.h>
12
13 #if SIMGRID_HAVE_JEDULE
14 #include "simgrid/jedule/jedule_sd_binding.h"
15 #endif
16
17 XBT_LOG_NEW_DEFAULT_CATEGORY(dag_scheduling, "Logging specific to this example");
18
19 typedef struct _HostAttribute* HostAttribute;
20 struct _HostAttribute {
21   /* Earliest time at which a host is ready to execute a task */
22   double available_at;
23   simgrid::s4u::Exec* last_scheduled_task;
24 };
25
26 static double sg_host_get_available_at(const simgrid::s4u::Host* host)
27 {
28   return static_cast<HostAttribute>(host->get_data())->available_at;
29 }
30
31 static void sg_host_set_available_at(simgrid::s4u::Host* host, double time)
32 {
33   auto* attr         = static_cast<HostAttribute>(host->get_data());
34   attr->available_at = time;
35   host->set_data(attr);
36 }
37
38 static simgrid::s4u::Exec* sg_host_get_last_scheduled_task(const simgrid::s4u::Host* host)
39 {
40   return static_cast<HostAttribute>(host->get_data())->last_scheduled_task;
41 }
42
43 static void sg_host_set_last_scheduled_task(simgrid::s4u::Host* host, simgrid::s4u::ExecPtr task)
44 {
45   auto* attr                = static_cast<HostAttribute>(host->get_data());
46   attr->last_scheduled_task = task.get();
47   host->set_data(attr);
48 }
49
50 static bool dependency_exists(simgrid::s4u::Exec* src, simgrid::s4u::Exec* dst)
51 {
52   const auto& dependencies = src->get_dependencies();
53   const auto& successors   = src->get_successors();
54   return (std::find(successors.begin(), successors.end(), dst) != successors.end() ||
55           dependencies.find(dst) != dependencies.end());
56 }
57
58 static std::vector<simgrid::s4u::Exec*> get_ready_tasks(const std::vector<simgrid::s4u::ActivityPtr> dax)
59 {
60   std::vector<simgrid::s4u::Exec*> ready_tasks;
61   std::map<simgrid::s4u::Exec*, unsigned int> candidate_execs;
62
63   for (auto& a : dax) {
64     // Only loot at activity that have their dependencies solved but are not assigned
65     if (a->dependencies_solved() && not a->is_assigned()) {
66       // if it is an exec, it's ready
67       auto* exec = dynamic_cast<simgrid::s4u::Exec*>(a.get());
68       if (exec != nullptr)
69         ready_tasks.push_back(exec);
70       // if it a comm, we consider its successor as a candidate. If a candidate solves all its dependencies,
71       // i.e., get all its input data, it's ready
72       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(a.get());
73       if (comm != nullptr) {
74         auto* next_exec = static_cast<simgrid::s4u::Exec*>(comm->get_successors().front().get());
75         candidate_execs[next_exec]++;
76         if (next_exec->get_dependencies().size() == candidate_execs[next_exec])
77           ready_tasks.push_back(next_exec);
78       }
79     }
80   }
81   XBT_DEBUG("There are %zu ready tasks", ready_tasks.size());
82   return ready_tasks;
83 }
84
85 static double finish_on_at(const simgrid::s4u::ExecPtr task, const simgrid::s4u::Host* host)
86 {
87   double result;
88
89   const auto& parents = task->get_dependencies();
90
91   if (not parents.empty()) {
92     double data_available = 0.;
93     double last_data_available;
94     /* compute last_data_available */
95     last_data_available = -1.0;
96     for (const auto& parent : parents) {
97       /* normal case */
98       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(parent.get());
99       if (comm != nullptr) {
100         auto source = comm->get_source();
101         XBT_DEBUG("transfer from %s to %s", source->get_cname(), host->get_cname());
102         /* Estimate the redistribution time from this parent */
103         double redist_time;
104         if (comm->get_remaining() <= 1e-6) {
105           redist_time = 0;
106         } else {
107           redist_time = sg_host_get_route_latency(source, host) +
108                         comm->get_remaining() / sg_host_get_route_bandwidth(source, host);
109         }
110         // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
111         // time
112         data_available = *(static_cast<double*>(comm->get_data())) + redist_time;
113       }
114
115       auto* exec = dynamic_cast<simgrid::s4u::Exec*>(parent.get());
116       /* no transfer, control dependency */
117       if (exec != nullptr) {
118         data_available = exec->get_finish_time();
119       }
120
121       if (last_data_available < data_available)
122         last_data_available = data_available;
123     }
124
125     result = fmax(sg_host_get_available_at(host), last_data_available) + task->get_remaining() / host->get_speed();
126   } else
127     result = sg_host_get_available_at(host) + task->get_remaining() / host->get_speed();
128
129   return result;
130 }
131
132 static simgrid::s4u::Host* get_best_host(const simgrid::s4u::ExecPtr exec)
133 {
134   std::vector<simgrid::s4u::Host*> hosts = simgrid::s4u::Engine::get_instance()->get_all_hosts();
135   auto best_host                         = hosts.front();
136   double min_EFT                         = finish_on_at(exec, best_host);
137
138   for (const auto& h : hosts) {
139     double EFT = finish_on_at(exec, h);
140     XBT_DEBUG("%s finishes on %s at %f", exec->get_cname(), h->get_cname(), EFT);
141
142     if (EFT < min_EFT) {
143       min_EFT   = EFT;
144       best_host = h;
145     }
146   }
147   return best_host;
148 }
149
150 int main(int argc, char** argv)
151 {
152   double min_finish_time            = -1.0;
153   simgrid::s4u::Exec* selected_task = nullptr;
154   simgrid::s4u::Host* selected_host = nullptr;
155   char* tracefilename               = nullptr;
156
157   simgrid::s4u::Engine e(&argc, argv);
158   std::set<simgrid::s4u::Activity*> vetoed;
159   e.track_vetoed_activities(&vetoed);
160
161   simgrid::s4u::Activity::on_completion.connect([](simgrid::s4u::Activity& activity) {
162     // when an Exec completes, we need to set the potential start time of all its ouput comms
163     const auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
164     if (exec == nullptr) // Only Execs are concerned here
165       return;
166     for (const auto& succ : exec->get_successors()) {
167       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(succ.get());
168       if (comm != nullptr) {
169         double* finish_time = new double(exec->get_finish_time());
170         // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
171         // time
172         comm->set_data(finish_time);
173       }
174     }
175   });
176   /* Check our arguments */
177   xbt_assert(argc > 2,
178              "Usage: %s platform_file dax_file [jedule_file]\n"
179              "\tExample: %s simulacrum_7_hosts.xml Montage_25.xml Montage_25.jed",
180              argv[0], argv[0]);
181
182   if (argc == 4)
183     tracefilename = xbt_strdup(argv[3]);
184
185   e.load_platform(argv[1]);
186
187   /*  Allocating the host attribute */
188   unsigned int total_nhosts = e.get_host_count();
189   const auto hosts          = e.get_all_hosts();
190
191   for (unsigned int i = 0; i < total_nhosts; i++)
192     hosts[i]->set_data(xbt_new0(struct _HostAttribute, 1));
193
194   /* load the DAX file */
195   std::vector<simgrid::s4u::ActivityPtr> dax = simgrid::s4u::create_DAG_from_DAX(argv[2]);
196
197   /* Schedule the root first */
198   auto* root = static_cast<simgrid::s4u::Exec*>(dax.front().get());
199   auto host  = get_best_host(root);
200   root->set_host(host);
201   // we can also set the source of all the output comms of the root node
202   for (const auto& succ : root->get_successors()) {
203     auto* comm = dynamic_cast<simgrid::s4u::Comm*>(succ.get());
204     if (comm != nullptr)
205       comm->set_source(host);
206   }
207
208   e.run();
209
210   while (not vetoed.empty()) {
211     XBT_DEBUG("Start new scheduling round");
212     /* Get the set of ready tasks */
213     auto ready_tasks = get_ready_tasks(dax);
214     vetoed.clear();
215
216     if (ready_tasks.empty()) {
217       ready_tasks.clear();
218       /* there is no ready task, let advance the simulation */
219       e.run();
220       continue;
221     }
222     /* For each ready task:
223      * get the host that minimizes the completion time.
224      * select the task that has the minimum completion time on its best host.
225      */
226     for (auto task : ready_tasks) {
227       XBT_DEBUG("%s is ready", task->get_cname());
228       host               = get_best_host(task);
229       double finish_time = finish_on_at(task, host);
230       if (min_finish_time < 0 || finish_time < min_finish_time) {
231         min_finish_time = finish_time;
232         selected_task   = task;
233         selected_host   = host;
234       }
235     }
236
237     XBT_INFO("Schedule %s on %s", selected_task->get_cname(), selected_host->get_cname());
238     selected_task->set_host(selected_host);
239     // we can also set the destination of all the input comms of the selected task
240     for (const auto& pred : selected_task->get_dependencies()) {
241       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(pred.get());
242       if (comm != nullptr) {
243         comm->set_destination(selected_host);
244         delete static_cast<double*>(comm->get_data());
245       }
246     }
247     // we can also set the source of all the output comms of the selected task
248     for (const auto& succ : selected_task->get_successors()) {
249       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(succ.get());
250       if (comm != nullptr)
251         comm->set_source(selected_host);
252     }
253
254     /*
255      * tasks can be executed concurrently when they can by default.
256      * Yet schedulers take decisions assuming that tasks wait for resource availability to start.
257      * The solution (well crude hack is to keep track of the last task scheduled on a host and add a special type of
258      * dependency if needed to force the sequential execution meant by the scheduler.
259      * If the last scheduled task is already done, has failed or is a predecessor of the current task, no need for a
260      * new dependency
261      */
262
263     auto last_scheduled_task = sg_host_get_last_scheduled_task(selected_host);
264     if (last_scheduled_task && (last_scheduled_task->get_state() != simgrid::s4u::Activity::State::FINISHED) &&
265         (last_scheduled_task->get_state() != simgrid::s4u::Activity::State::FAILED) &&
266         not dependency_exists(sg_host_get_last_scheduled_task(selected_host), selected_task))
267       last_scheduled_task->add_successor(selected_task);
268
269     sg_host_set_last_scheduled_task(selected_host, selected_task);
270     sg_host_set_available_at(selected_host, min_finish_time);
271
272     ready_tasks.clear();
273     /* reset the min_finish_time for the next set of ready tasks */
274     min_finish_time = -1.;
275     e.run();
276   }
277
278   XBT_INFO("Simulation Time: %f", simgrid_get_clock());
279   XBT_INFO("------------------- Produce the trace file---------------------------");
280   XBT_INFO("Producing a jedule output (if active) of the run into %s",
281            tracefilename ? tracefilename : "minmin_test.jed");
282 #if SIMGRID_HAVE_JEDULE
283   jedule_sd_dump(tracefilename);
284 #endif
285   free(tracefilename);
286
287   for (auto h : hosts) {
288     xbt_free(h->get_data());
289     h->set_data(nullptr);
290   }
291
292   return 0;
293 }