-/* Copyright (c) 2009-2018. The SimGrid Team.
+/* Copyright (c) 2009-2023. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
This is done to prevent SMPI actors from starting at actor_id=0.
3. For each job:
1. Sleep until job's starting time is reached (if needed)
- 2. Launch the replay of the corresponding time-indepent trace.
+ 2. Launch the replay of the corresponding time-independent trace.
3. Create inter-process noise, by spawning useless actors.
4. Wait for completion (via s4u::Engine's run method)
*/
#include <algorithm>
#include <fstream>
+#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>
int unique_job_number; //!< The job unique number in [0, n[.
};
-// ugly globals to avoid creating structures for giving args to processes
-static std::vector<simgrid::s4u::Host*> hosts;
-static int noise_between_jobs;
-
-static bool job_comparator(const Job* j1, const Job* j2)
-{
- if (j1->starting_time == j2->starting_time)
- return j1->smpi_app_name < j2->smpi_app_name;
- return j1->starting_time < j2->starting_time;
-}
-
// Replays the trace of a single rank of an SMPI job, then waits on the given barrier.
//  - job:     job this rank belongs to; its smpi_app_name, unique_job_number and
//             traces_filenames[rank] are read.
//  - barrier: waited on once the replay of this rank has returned.
//  - rank:    rank to replay; also used as the index into job->traces_filenames.
static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank)
{
- // Prepare data for smpi_replay_run
- int argc = 5;
- char** argv = xbt_new(char*, argc);
- argv[0] = xbt_strdup("1"); // log only?
- argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance
- argv[2] = bprintf("%d", rank); // rank
- argv[3] = xbt_strdup(job->traces_filenames[rank].c_str()); // smpi trace file for this rank
- argv[4] = xbt_strdup("0"); // ?
-
XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str());
// NOTE(review): the replay call receives the app name, the rank, a literal 0 and this rank's trace file;
// the meaning of the literal 0 is not visible here -- confirm against the declaration of smpi_replay_run.
- smpi_replay_run(&argc, &argv);
+ smpi_replay_run(job->smpi_app_name.c_str(), rank, 0, job->traces_filenames[rank].c_str());
XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number,
job->smpi_app_name.c_str());
// Presumably lets whoever shares this barrier know that this rank is done -- confirm against the caller.
barrier->wait();
-
- // Memory clean-up — leaks can come from argc/argv modifications from smpi_replay_run
- for (int i = 0; i < argc; ++i)
- xbt_free(argv[i]);
- xbt_free(argv);
}
// Sleeps for a given amount of time
//  - param: duration handed to this_actor::sleep_for (seconds, per the debug message).
// Always returns 0 once the sleep is over.
-static int sleeper_process(int* param)
+static int sleeper_process(int param)
{
- XBT_DEBUG("Sleeping for %d seconds", *param);
- simgrid::s4u::this_actor::sleep_for(*param);
-
- delete param;
-
+ XBT_DEBUG("Sleeping for %d seconds", param);
+ simgrid::s4u::this_actor::sleep_for(param);
return 0;
}
// Creates nb_processes actors named "meh" on the given host, each running sleeper_process.
// The actor created at iteration i (starting from 0) receives i + 1 as its argument, so the
// arguments range over 1..nb_processes. Nothing is created when nb_processes <= 0.
static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host)
{
for (int i = 0; i < nb_processes; ++i) {
- int* param = new int;
- *param = i + 1;
+ int param = i + 1;
simgrid::s4u::Actor::create("meh", host, sleeper_process, param);
}
}
// Function run by the per-job actor: takes the job to execute and starts by logging the job's
// unique number and SMPI application name.
-static int job_executor_process(Job* job)
+static int job_executor_process(const std::vector<simgrid::s4u::Host*>& hosts, Job* job)
{
XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
}
// Executes a workload of SMPI processes
// Jobs are visited in container order; for each one, a job executor actor named "job_NNNN"
// (NNNN = unique_job_number, zero-padded to 4 digits) is created on hosts[job->allocation[0]].
-static int workload_executor_process(std::vector<Job*>* workload)
+static int workload_executor_process(const std::vector<simgrid::s4u::Host*>& hosts,
+ const std::vector<std::unique_ptr<Job>>& workload, int noise_between_jobs)
{
- for (Job* job : *workload) {
+ for (auto const& job : workload) {
// Let's wait until the job's starting time if needed
- double curr_time = simgrid::s4u::Engine::get_clock();
- if (job->starting_time > curr_time) {
+ if (double curr_time = simgrid::s4u::Engine::get_clock(); job->starting_time > curr_time) {
// NOTE(review): the "%d" following "waiting for job" below is fed job->starting_time, not
// job->unique_job_number -- confirm that this is what the message is meant to print.
double time_to_sleep = (double)job->starting_time - curr_time;
XBT_INFO("Sleeping %g seconds (waiting for job %d, app '%s')", time_to_sleep, job->starting_time,
job->smpi_app_name.c_str());
// Let's finally run the job executor
// The actor name is allocated by bprintf and released with xbt_free right after the actor is created.
char* str_pname = bprintf("job_%04d", job->unique_job_number);
XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
// The executor is given the job as a plain pointer; the Job object itself stays in the workload container.
- simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[0]], job_executor_process, job);
+ simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[0]], job_executor_process, std::cref(hosts),
+ job.get());
xbt_free(str_pname);
}
}
// Reads jobs from a workload file and returns them
-static std::vector<Job*> all_jobs(const std::string& workload_file)
+static std::vector<std::unique_ptr<Job>> all_jobs(const std::string& workload_file)
{
std::ifstream f(workload_file);
xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str());
- std::vector<Job*> jobs;
+ std::vector<std::unique_ptr<Job>> jobs;
simgrid::xbt::Path path(workload_file);
std::string dir = path.get_dir_name();
std::istringstream is(line);
if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) {
try {
- Job job;
- job.smpi_app_name = app_name;
- job.filename = dir + "/" + filename_unprefixed;
- job.app_size = app_size;
- job.starting_time = starting_time;
+ auto job = std::make_unique<Job>();
+ job->smpi_app_name = app_name;
+ job->filename = dir + "/" + filename_unprefixed;
+ job->app_size = app_size;
+ job->starting_time = starting_time;
std::vector<std::string> subparts;
boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on);
- if ((int)subparts.size() != job.app_size)
+ if ((int)subparts.size() != job->app_size)
throw std::invalid_argument("size/alloc inconsistency");
- job.allocation.resize(subparts.size());
+ job->allocation.resize(subparts.size());
for (unsigned int i = 0; i < subparts.size(); ++i)
- job.allocation[i] = stoi(subparts[i]);
+ job->allocation[i] = stoi(subparts[i]);
// Let's read the filename
- std::ifstream traces_file(job.filename);
- if (!traces_file.is_open())
- throw std::invalid_argument("Cannot open file " + job.filename);
+ std::ifstream traces_file(job->filename);
+ if (not traces_file.is_open())
+ throw std::invalid_argument("Cannot open file " + job->filename);
std::string traces_line;
while (std::getline(traces_file, traces_line)) {
boost::trim_right(traces_line);
- job.traces_filenames.push_back(dir + "/" + traces_line);
+ job->traces_filenames.push_back(dir + "/" + traces_line);
}
- if (static_cast<int>(job.traces_filenames.size()) < job.app_size)
+ if (static_cast<int>(job->traces_filenames.size()) < job->app_size)
throw std::invalid_argument("size/tracefiles inconsistency");
- job.traces_filenames.resize(job.app_size);
+ job->traces_filenames.resize(job->app_size);
XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, "
"alloc='%s'",
- job.smpi_app_name.c_str(), filename_unprefixed.c_str(), job.app_size, job.starting_time,
+ job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time,
alloc.c_str());
- jobs.push_back(new Job(std::move(job)));
+ jobs.emplace_back(std::move(job));
} catch (const std::invalid_argument& e) {
xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
}
// Jobs are sorted by ascending date, then by lexicographical order of their
// application names
- sort(jobs.begin(), jobs.end(), job_comparator);
-
+ sort(jobs.begin(), jobs.end(), [](auto const& j1, auto const& j2) {
+ if (j1->starting_time == j2->starting_time)
+ return j1->smpi_app_name < j2->smpi_app_name;
+ return j1->starting_time < j2->starting_time;
+ });
for (unsigned int i = 0; i < jobs.size(); ++i)
jobs[i]->unique_job_number = i;
// Simulation setting
simgrid::s4u::Engine e(&argc, argv);
e.load_platform(argv[1]);
- hosts = e.get_all_hosts();
+ const auto hosts = e.get_all_hosts();
xbt_assert(hosts.size() >= 4, "The given platform should contain at least 4 hosts (found %zu).", hosts.size());
// Let's retrieve all SMPI jobs
- std::vector<Job*> jobs = all_jobs(argv[2]);
+ std::vector<std::unique_ptr<Job>> jobs = all_jobs(argv[2]);
// Let's register them
- for (const Job* job : jobs)
+ for (auto const& job : jobs)
SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size);
SMPI_init();
int initial_noise = std::stoi(argv[3]);
xbt_assert(initial_noise >= 0, "Invalid initial_noise argument");
- noise_between_jobs = std::stoi(argv[4]);
+ int noise_between_jobs = std::stoi(argv[4]);
xbt_assert(noise_between_jobs >= 0, "Invalid noise_between_jobs argument");
if (initial_noise > 0) {
}
// Let's execute the workload
- simgrid::s4u::Actor::create("workload", hosts[0], workload_executor_process, &jobs);
+ simgrid::s4u::Actor::create("workload", hosts[0], workload_executor_process, std::cref(hosts), std::cref(jobs),
+ noise_between_jobs);
e.run();
- XBT_INFO("Simulation finished! Final time: %g", e.get_clock());
+ XBT_INFO("Simulation finished! Final time: %g", simgrid::s4u::Engine::get_clock());
SMPI_finalize();
- for (const Job* job : jobs)
- delete job;
-
return 0;
}