Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
97938f20de35c0d11c1341b51decb32c2df3df75
[simgrid.git] / examples / cpp / dag-scheduling / s4u-dag-scheduling.cpp
1 /* Copyright (c) 2009-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 /* simple test to schedule a DAX file with the Min-Min algorithm.           */
7 #include <algorithm>
8 #include <simgrid/host.h>
9 #include <simgrid/s4u.hpp>
10 #include <string.h>
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(dag_scheduling, "Logging specific to this example");
13 namespace sg4 = simgrid::s4u;
14
15 struct HostAttribute {
16   /* Earliest time at which a host is ready to execute a task */
17   double available_at                     = 0.0;
18   sg4::Exec* last_scheduled_task          = nullptr;
19 };
20
21 static double sg_host_get_available_at(const sg4::Host* host)
22 {
23   return host->get_data<HostAttribute>()->available_at;
24 }
25
26 static void sg_host_set_available_at(const sg4::Host* host, double time)
27 {
28   host->get_data<HostAttribute>()->available_at = time;
29 }
30
31 static sg4::Exec* sg_host_get_last_scheduled_task(const sg4::Host* host)
32 {
33   return host->get_data<HostAttribute>()->last_scheduled_task;
34 }
35
36 static void sg_host_set_last_scheduled_task(const sg4::Host* host, sg4::ExecPtr task)
37 {
38   host->get_data<HostAttribute>()->last_scheduled_task = task.get();
39 }
40
41 static bool dependency_exists(const sg4::Exec* src, sg4::Exec* dst)
42 {
43   const auto& dependencies = src->get_dependencies();
44   const auto& successors   = src->get_successors();
45   return (std::find(successors.begin(), successors.end(), dst) != successors.end() ||
46           dependencies.find(dst) != dependencies.end());
47 }
48
49 static std::vector<sg4::Exec*> get_ready_tasks(const std::vector<sg4::ActivityPtr>& dax)
50 {
51   std::vector<sg4::Exec*> ready_tasks;
52   std::map<sg4::Exec*, unsigned int> candidate_execs;
53
54   for (auto& a : dax) {
55     // Only look at activity that have their dependencies solved but are not assigned
56     if (a->dependencies_solved() && not a->is_assigned()) {
57       // if it is an exec, it's ready
58       if (auto* exec = dynamic_cast<sg4::Exec*>(a.get()))
59         ready_tasks.push_back(exec);
60       // if it a comm, we consider its successor as a candidate. If a candidate solves all its dependencies,
61       // i.e., get all its input data, it's ready
62       if (const auto* comm = dynamic_cast<sg4::Comm*>(a.get())) {
63         auto* next_exec = static_cast<sg4::Exec*>(comm->get_successors().front().get());
64         candidate_execs[next_exec]++;
65         if (next_exec->get_dependencies().size() == candidate_execs[next_exec])
66           ready_tasks.push_back(next_exec);
67       }
68     }
69   }
70   XBT_DEBUG("There are %zu ready tasks", ready_tasks.size());
71   return ready_tasks;
72 }
73
74 static double finish_on_at(const sg4::ExecPtr task, const sg4::Host* host)
75 {
76   double data_available      = 0.;
77   double last_data_available = -1.0;
78   /* compute last_data_available */
79   for (const auto& parent : task->get_dependencies()) {
80     /* normal case */
81     if (const auto* comm = dynamic_cast<sg4::Comm*>(parent.get())) {
82       auto source = comm->get_source();
83       XBT_DEBUG("transfer from %s to %s", source->get_cname(), host->get_cname());
84       /* Estimate the redistribution time from this parent */
85       double redist_time;
86       if (comm->get_remaining() <= 1e-6) {
87         redist_time = 0;
88       } else {
89         redist_time =
90             sg_host_get_route_latency(source, host) + comm->get_remaining() / sg_host_get_route_bandwidth(source, host);
91       }
92       // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
93       // time
94       data_available = *comm->get_data<double>() + redist_time;
95     }
96
97     /* no transfer, control dependency */
98     if (const auto* exec = dynamic_cast<sg4::Exec*>(parent.get()))
99       data_available = exec->get_finish_time();
100
101     if (last_data_available < data_available)
102       last_data_available = data_available;
103   }
104
105   return std::max(sg_host_get_available_at(host), last_data_available) + task->get_remaining() / host->get_speed();
106 }
107
108 static sg4::Host* get_best_host(const sg4::ExecPtr exec)
109 {
110   std::vector<sg4::Host*> hosts          = sg4::Engine::get_instance()->get_all_hosts();
111   auto best_host                         = hosts.front();
112   double min_EFT                         = finish_on_at(exec, best_host);
113
114   for (const auto& h : hosts) {
115     double EFT = finish_on_at(exec, h);
116     XBT_DEBUG("%s finishes on %s at %f", exec->get_cname(), h->get_cname(), EFT);
117
118     if (EFT < min_EFT) {
119       min_EFT   = EFT;
120       best_host = h;
121     }
122   }
123   return best_host;
124 }
125
126 static void schedule_on(sg4::ExecPtr exec, sg4::Host* host)
127 {
128   exec->set_host(host);
129   // we can also set the destination of all the input comms of this exec
130   for (const auto& pred : exec->get_dependencies()) {
131     auto* comm = dynamic_cast<sg4::Comm*>(pred.get());
132     if (comm != nullptr) {
133       comm->set_destination(host);
134       delete comm->get_data<double>();
135     }
136   }
137   // we can also set the source of all the output comms of this exec
138   for (const auto& succ : exec->get_successors()) {
139     auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
140     if (comm != nullptr)
141       comm->set_source(host);
142   }
143 }
144
145 int main(int argc, char** argv)
146 {
147   sg4::Engine e(&argc, argv);
148   std::set<sg4::Activity*> vetoed;
149   e.track_vetoed_activities(&vetoed);
150
151   sg4::Activity::on_completion_cb([](sg4::Activity const& activity) {
152     // when an Exec completes, we need to set the potential start time of all its ouput comms
153     const auto* exec = dynamic_cast<sg4::Exec const*>(&activity);
154     if (exec == nullptr) // Only Execs are concerned here
155       return;
156     for (const auto& succ : exec->get_successors()) {
157       auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
158       if (comm != nullptr) {
159         auto* finish_time = new double(exec->get_finish_time());
160         // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
161         // time
162         comm->set_data(finish_time);
163       }
164     }
165   });
166
167   e.load_platform(argv[1]);
168   const auto hosts = e.get_all_hosts();
169
170   /* Mark all hosts as sequential, as it ought to be in such a scheduling example.
171    *
172    * It means that the hosts can only compute one thing at a given time. If an execution already takes place on a given
173    * host, any subsequently started execution will be queued until after the first execution terminates */
174   for (auto const& host : hosts)
175     host->set_concurrency_limit(1);
176
177   /*  Allocating the host attribute */
178   unsigned long total_nhosts = e.get_host_count();
179   std::vector<HostAttribute> host_attributes(total_nhosts);
180   for (unsigned long i = 0; i < total_nhosts; i++)
181     hosts[i]->set_data(&host_attributes[i]);
182
183   /* load the DAX file */
184   auto dax = sg4::create_DAG_from_DAX(argv[2]);
185
186   /* Schedule the root first */
187   auto* root = static_cast<sg4::Exec*>(dax.front().get());
188   auto host  = get_best_host(root);
189   schedule_on(root, host);
190
191   e.run();
192
193   while (not vetoed.empty()) {
194     XBT_DEBUG("Start new scheduling round");
195     /* Get the set of ready tasks */
196     auto ready_tasks = get_ready_tasks(dax);
197     vetoed.clear();
198
199     if (ready_tasks.empty()) {
200       /* there is no ready task, let advance the simulation */
201       e.run();
202       continue;
203     }
204     /* For each ready task:
205      * get the host that minimizes the completion time.
206      * select the task that has the minimum completion time on its best host.
207      */
208     double min_finish_time            = -1.0;
209     sg4::Exec* selected_task          = nullptr;
210     sg4::Host* selected_host          = nullptr;
211
212     for (auto task : ready_tasks) {
213       XBT_DEBUG("%s is ready", task->get_cname());
214       host               = get_best_host(task);
215       double finish_time = finish_on_at(task, host);
216       if (min_finish_time < 0 || finish_time < min_finish_time) {
217         min_finish_time = finish_time;
218         selected_task   = task;
219         selected_host   = host;
220       }
221     }
222
223     XBT_INFO("Schedule %s on %s", selected_task->get_cname(), selected_host->get_cname());
224     schedule_on(selected_task, selected_host);
225
226     /*
227      * tasks can be executed concurrently when they can by default.
228      * Yet schedulers take decisions assuming that tasks wait for resource availability to start.
229      * The solution (well crude hack is to keep track of the last task scheduled on a host and add a special type of
230      * dependency if needed to force the sequential execution meant by the scheduler.
231      * If the last scheduled task is already done, has failed or is a predecessor of the current task, no need for a
232      * new dependency
233      */
234
235     if (auto last_scheduled_task = sg_host_get_last_scheduled_task(selected_host);
236         last_scheduled_task && (last_scheduled_task->get_state() != sg4::Activity::State::FINISHED) &&
237         (last_scheduled_task->get_state() != sg4::Activity::State::FAILED) &&
238         not dependency_exists(sg_host_get_last_scheduled_task(selected_host), selected_task))
239       last_scheduled_task->add_successor(selected_task);
240
241     sg_host_set_last_scheduled_task(selected_host, selected_task);
242     sg_host_set_available_at(selected_host, min_finish_time);
243
244     ready_tasks.clear();
245     e.run();
246   }
247
248   XBT_INFO("Simulation Time: %f", simgrid_get_clock());
249
250   return 0;
251 }