Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Further cosmetics in that example, adding a helper function to s4u on the way
[simgrid.git] / examples / cpp / dag-scheduling / s4u-dag-scheduling.cpp
1 /* Copyright (c) 2009-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 /* simple test to schedule a DAX file with the Min-Min algorithm.           */
7 #include <algorithm>
8 #include <simgrid/host.h>
9 #include <simgrid/s4u.hpp>
10 #include <string.h>
11
12 XBT_LOG_NEW_DEFAULT_CATEGORY(dag_scheduling, "Logging specific to this example");
13 namespace sg4 = simgrid::s4u;
14
15 static std::vector<sg4::Exec*> get_ready_tasks(const std::vector<sg4::ActivityPtr>& dax)
16 {
17   std::vector<sg4::Exec*> ready_tasks;
18   std::map<sg4::Exec*, unsigned int> candidate_execs;
19
20   for (auto& a : dax) {
21     // Only look at activity that have their dependencies solved but are not assigned
22     if (a->dependencies_solved() && not a->is_assigned()) {
23       // if it is an exec, it's ready
24       if (auto* exec = dynamic_cast<sg4::Exec*>(a.get()))
25         ready_tasks.push_back(exec);
26       // if it a comm, we consider its successor as a candidate. If a candidate solves all its dependencies,
27       // i.e., get all its input data, it's ready
28       if (const auto* comm = dynamic_cast<sg4::Comm*>(a.get())) {
29         auto* next_exec = static_cast<sg4::Exec*>(comm->get_successors().front().get());
30         candidate_execs[next_exec]++;
31         if (next_exec->get_dependencies().size() == candidate_execs[next_exec])
32           ready_tasks.push_back(next_exec);
33       }
34     }
35   }
36   XBT_DEBUG("There are %zu ready tasks", ready_tasks.size());
37   return ready_tasks;
38 }
39
40 static double finish_on_at(const sg4::ExecPtr task, const sg4::Host* host)
41 {
42   double data_available      = 0.;
43   double last_data_available = -1.0;
44   /* compute last_data_available */
45   for (const auto& parent : task->get_dependencies()) {
46     /* normal case */
47     if (const auto* comm = dynamic_cast<sg4::Comm*>(parent.get())) {
48       auto source = comm->get_source();
49       XBT_DEBUG("transfer from %s to %s", source->get_cname(), host->get_cname());
50       /* Estimate the redistribution time from this parent */
51       double redist_time;
52       if (comm->get_remaining() <= 1e-6) {
53         redist_time = 0;
54       } else {
55         double bandwidth      = std::numeric_limits<double>::max();
56         auto [links, latency] = source->route_to(host);
57         for (auto const& link : links)
58           bandwidth = std::min(bandwidth, link->get_bandwidth());
59
60         redist_time = latency + comm->get_remaining() / bandwidth;
61       }
62       // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
63       // time
64       data_available = *comm->get_data<double>() + redist_time;
65     }
66
67     /* no transfer, control dependency */
68     if (const auto* exec = dynamic_cast<sg4::Exec*>(parent.get()))
69       data_available = exec->get_finish_time();
70
71     if (last_data_available < data_available)
72       last_data_available = data_available;
73   }
74   return std::max(*host->get_data<double>(), last_data_available) + task->get_remaining() / host->get_speed();
75 }
76
77 static sg4::Host* get_best_host(const sg4::ExecPtr exec)
78 {
79   sg4::Host* best_host;
80   double min_EFT = std::numeric_limits<double>::max();
81
82   for (const auto& host : sg4::Engine::get_instance()->get_all_hosts()) {
83     double EFT = finish_on_at(exec, host);
84     XBT_DEBUG("%s finishes on %s at %f", exec->get_cname(), host->get_cname(), EFT);
85
86     if (EFT < min_EFT) {
87       min_EFT   = EFT;
88       best_host = host;
89     }
90   }
91   return best_host;
92 }
93
94 static void schedule_on(sg4::ExecPtr exec, sg4::Host* host, double busy_until = 0.0)
95 {
96   exec->set_host(host);
97   // We use the user data field to store up to when the host is busy
98   host->set_data(new double(busy_until));
99   // we can also set the destination of all the input comms of this exec
100   for (const auto& pred : exec->get_dependencies()) {
101     auto* comm = dynamic_cast<sg4::Comm*>(pred.get());
102     if (comm != nullptr) {
103       comm->set_destination(host);
104       delete comm->get_data<double>();
105     }
106   }
107   // we can also set the source of all the output comms of this exec
108   for (const auto& succ : exec->get_successors()) {
109     auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
110     if (comm != nullptr)
111       comm->set_source(host);
112   }
113 }
114
115 int main(int argc, char** argv)
116 {
117   sg4::Engine e(&argc, argv);
118   std::set<sg4::Activity*> vetoed;
119   e.track_vetoed_activities(&vetoed);
120
121   sg4::Activity::on_completion_cb([](sg4::Activity const& activity) {
122     // when an Exec completes, we need to set the potential start time of all its ouput comms
123     const auto* exec = dynamic_cast<sg4::Exec const*>(&activity);
124     if (exec == nullptr) // Only Execs are concerned here
125       return;
126     for (const auto& succ : exec->get_successors()) {
127       auto* comm = dynamic_cast<sg4::Comm*>(succ.get());
128       if (comm != nullptr) {
129         auto* finish_time = new double(exec->get_finish_time());
130         // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
131         // time
132         comm->set_data(finish_time);
133       }
134     }
135   });
136
137   e.load_platform(argv[1]);
138
139   /* Mark all hosts as sequential, as it ought to be in such a scheduling example.
140    *
141    * It means that the hosts can only compute one thing at a given time. If an execution already takes place on a given
142    * host, any subsequently started execution will be queued until after the first execution terminates */
143   for (auto const& host : e.get_all_hosts()) {
144     host->set_concurrency_limit(1);
145     host->set_data(new double(0.0));
146   }
147   /* load the DAX file */
148   auto dax = sg4::create_DAG_from_DAX(argv[2]);
149
150   /* Schedule the root first */
151   auto* root = static_cast<sg4::Exec*>(dax.front().get());
152   auto host  = get_best_host(root);
153   schedule_on(root, host);
154
155   e.run();
156
157   while (not vetoed.empty()) {
158     XBT_DEBUG("Start new scheduling round");
159     /* Get the set of ready tasks */
160     auto ready_tasks = get_ready_tasks(dax);
161     vetoed.clear();
162
163     if (ready_tasks.empty()) {
164       /* there is no ready task, let advance the simulation */
165       e.run();
166       continue;
167     }
168     /* For each ready task:
169      * get the host that minimizes the completion time.
170      * select the task that has the minimum completion time on its best host.
171      */
172     double min_finish_time            = -1.0;
173     sg4::Exec* selected_task          = nullptr;
174     sg4::Host* selected_host          = nullptr;
175
176     for (auto task : ready_tasks) {
177       XBT_DEBUG("%s is ready", task->get_cname());
178       host               = get_best_host(task);
179       double finish_time = finish_on_at(task, host);
180       if (min_finish_time < 0 || finish_time < min_finish_time) {
181         min_finish_time = finish_time;
182         selected_task   = task;
183         selected_host   = host;
184       }
185     }
186
187     XBT_INFO("Schedule %s on %s", selected_task->get_cname(), selected_host->get_cname());
188     schedule_on(selected_task, selected_host, min_finish_time);
189
190     ready_tasks.clear();
191     e.run();
192   }
193
194   XBT_INFO("Simulation Time: %f", simgrid_get_clock());
195
196   return 0;
197 }