#include "src/internal_config.h"
#include <algorithm>
#include <map>
+#include <fstream>
+#include <simgrid/s4u/Host.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Engine.hpp>
#include <simgrid/s4u/Exec.hpp>
#include "dax_dtd.h"
#include "dax_dtd.c"
+#include <nlohmann/json.hpp>
+
#if HAVE_GRAPHVIZ
#include <graphviz/cgraph.h>
#endif
static std::map<std::string, Comm*, std::less<>> files;
static ExecPtr current_job;
+/** @brief loads a JSON file describing a DAG
+ *
+ * See https://github.com/wfcommons/wfformat for more details.
+ */
+std::vector<ActivityPtr> create_DAG_from_json(const std::string& filename)
+{
+ std::ifstream f(filename);
+ auto data = nlohmann::json::parse(f);
+ std::vector<ActivityPtr> dag = {};
+ std::map<std::string, std::vector<ActivityPtr>> successors = {};
+ std::map<ActivityPtr, Host*> comms_destinations = {};
+ ActivityPtr current;
+
+ for (auto const& task: data["workflow"]["tasks"]) {
+ if (task["type"] == "compute") {
+ current = Exec::init()->set_name(task["name"])->set_flops_amount(task["runtime"]);
+ if (task.contains("machine"))
+ dynamic_cast<Exec*>(current.get())->set_host(simgrid::s4u::Engine::get_instance()->host_by_name(task["machine"]));
+ }
+ else if (task["type"] == "transfer"){
+ current = Comm::sendto_init()->set_name(task["name"])->set_payload_size(task["bytesWritten"]);
+ if (task.contains("machine"))
+ comms_destinations[current] = simgrid::s4u::Engine::get_instance()->host_by_name(task["machine"]);
+ if (task["parents"].size() == 1) {
+ ActivityPtr parent_activity;
+ for (auto const& activity: dag) {
+ if (activity->get_name() == task["parents"][0]) {
+ parent_activity = activity;
+ break;
+ }
+ }
+ if (dynamic_cast<Exec*>(parent_activity.get()) != nullptr)
+ dynamic_cast<Comm*>(current.get())->set_source(dynamic_cast<Exec*>(parent_activity.get())->get_host());
+ else if (dynamic_cast<Comm*>(parent_activity.get()) != nullptr)
+ dynamic_cast<Comm*>(current.get())->set_source(dynamic_cast<Comm*>(parent_activity.get())->get_destination());
+ }
+ }
+ else
+ XBT_DEBUG("Task type \"%s\" not supported.", task["type"]);
+
+ dag.push_back(current);
+ for (auto const& parent: task["parents"]) {
+ auto it = successors.find(parent);
+ if (it == successors.end())
+ successors[parent] = {};
+ successors[parent].push_back(current);
+ }
+ }
+ // Assign successors
+ for (auto const& [parent, successors_list] : successors)
+ for (auto const& activity: dag)
+ if (activity->get_name() == parent) {
+ for (auto const& successor: successors_list)
+ activity->add_successor(successor);
+ break;
+ }
+ // Assign destinations of Comms (if done before successors are assigned there is a bug)
+ for (auto const& [comm, destination]: comms_destinations)
+ dynamic_cast<Comm*>(comm.get())->set_destination(destination);
+
+ // Start only Activities with dependencies solved
+ for (auto const& activity: dag) {
+ if (dynamic_cast<Exec*>(activity.get()) != nullptr and activity->dependencies_solved())
+ activity->start();
+ }
+ return dag;
+}
+
/** @brief loads a DAX file describing a DAG
*
* See https://confluence.pegasus.isi.edu/display/pegasus/WorkflowGenerator for more details.