/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
- and use them to build a simulation of a stream processing application
+/* This example takes the main concepts of Apache Storm presented here
+ https://storm.apache.org/releases/2.4.0/Concepts.html and uses them to build a simulation of a stream processing
+ application.
Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
Spout SB produces 1e6 bytes every 200ms.
- Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
- Bolt B3 processes data from Spout SB.
- Bolt B4 processes data from Bolt B3.
+ Bolts B1 and B2 process data from Spout SA alternately. The quantity of work to process this data is 10 flops per
+ byte.
+ Bolt B3 processes data from Spout SB.
+ Bolt B4 processes data from Bolt B3.
Fafard
┌────┐
// Retrieve hosts
auto tremblay = e.host_by_name("Tremblay");
auto jupiter = e.host_by_name("Jupiter");
- auto fafard = e.host_by_name("Fafard");
+ auto fafard = e.host_by_name("Fafard");
auto ginette = e.host_by_name("Ginette");
auto bourassa = e.host_by_name("Bourassa");
Alternatively we: remove/add the link between SA and SA_to_B2
add/remove the link between SA and SA_to_B1
*/
- SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) {
+ SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
int count = t->get_count();
sg4::CommTaskPtr comm;
if (count % 2 == 0) {
t->remove_successor(SA_to_B2);
t->add_successor(SA_to_B1);
comm = SA_to_B1;
- }
- else {
+ } else {
t->remove_successor(SA_to_B1);
t->add_successor(SA_to_B2);
comm = SA_to_B2;
}
- std::vector<double> amount = {1e3,1e6,1e9};
+ std::vector<double> amount = {1e3, 1e6, 1e9};
comm->set_amount(amount[count % 3]);
auto token = std::make_shared<sg4::Token>();
token->set_data(new double(amount[count % 3]));
});
// The token sent by SA is forwarded by both communication tasks
- SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
- SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+ SA_to_B1->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+ SA_to_B2->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
/* B1 and B2 read the value of the token received by their predecessors
and use it to adapt their amount of work to do.
SB->enqueue_firings(5);
// Add a function to be called when tasks end for log purpose
- sg4::Task::on_completion_cb([]
- (const sg4::Task* t) {
- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
- });
+ sg4::Task::on_completion_cb(
+ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
// Start the simulation
e.run();
exec2->add_successor(comm2);
// Add a function to be called when tasks end for log purpose
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
- });
+ sg4::Task::on_completion_cb(
+ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
// Add a function to be called before each firing of comm0
// This function modifies the graph of tasks by adding or removing
// successors to comm0
- comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) {
+ comm0->on_this_start_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) {
static int count = 0;
if (count % 2 == 0) {
comm0->set_destination(jupiter);
* It is forbidden to change the amount of work once the Activity is started */
Activity* set_remaining(double remains);
- virtual void fire_on_completion() const = 0;
+ virtual void fire_on_start() const = 0;
+ virtual void fire_on_this_start() const = 0;
+ virtual void fire_on_completion() const = 0;
virtual void fire_on_this_completion() const = 0;
- virtual void fire_on_suspend() const = 0;
- virtual void fire_on_this_suspend() const = 0;
- virtual void fire_on_resume() const = 0;
- virtual void fire_on_this_resume() const = 0;
- virtual void fire_on_veto() const = 0;
- virtual void fire_on_this_veto() const = 0;
+ virtual void fire_on_suspend() const = 0;
+ virtual void fire_on_this_suspend() const = 0;
+ virtual void fire_on_resume() const = 0;
+ virtual void fire_on_this_resume() const = 0;
+ virtual void fire_on_veto() = 0;
+ virtual void fire_on_this_veto() = 0;
public:
void start()
std::string name_ = "unnamed";
std::string tracing_category_ = "";
- protected:
+ inline static xbt::signal<void(AnyActivity const&)> on_start;
+ xbt::signal<void(AnyActivity const&)> on_this_start;
inline static xbt::signal<void(AnyActivity const&)> on_completion;
xbt::signal<void(AnyActivity const&)> on_this_completion;
inline static xbt::signal<void(AnyActivity const&)> on_suspend;
inline static xbt::signal<void(AnyActivity&)> on_veto;
xbt::signal<void(AnyActivity&)> on_this_veto;
+ protected:
+ void fire_on_start() const override { on_start(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_this_start() const override { on_this_start(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_completion() const override { on_completion(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_this_completion() const override { on_this_completion(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_suspend() const override { on_suspend(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_this_suspend() const override { on_this_suspend(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_resume() const override { on_resume(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_this_resume() const override { on_this_resume(static_cast<const AnyActivity&>(*this)); }
+ void fire_on_veto() override { on_veto(static_cast<AnyActivity&>(*this)); }
+ void fire_on_this_veto() override { on_this_veto(static_cast<AnyActivity&>(*this)); }
+
public:
+ /*! \static Add a callback fired when any activity starts (no veto) */
+ static void on_start_cb(const std::function<void(AnyActivity const&)>& cb) { on_start.connect(cb); }
+ /*! Add a callback fired when this specific activity starts (no veto) */
+ void on_this_start_cb(const std::function<void(AnyActivity const&)>& cb) { on_this_start.connect(cb); }
/*! \static Add a callback fired when any activity completes (either normally, cancelled or failed) */
static void on_completion_cb(const std::function<void(AnyActivity const&)>& cb) { on_completion.connect(cb); }
/*! Add a callback fired when this specific activity completes (either normally, cancelled or failed) */
void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
- const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
+ const std::function<void(Activity const&)>& cb)
+ {
+ on_suspend.connect(cb);
+ }
XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
- const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
+ const std::function<void(Activity const&)>& cb)
+ {
+ on_resume.connect(cb);
+ }
AnyActivity* add_successor(ActivityPtr a)
{
std::string name_;
double amount_;
int queued_firings_ = 0;
- int count_ = 0;
- bool working_ = false;
+ int count_ = 0;
+ bool working_ = false;
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
ActivityPtr previous_activity_;
ActivityPtr current_activity_;
+ inline static xbt::signal<void(Task*)> on_start;
+ xbt::signal<void(Task*)> on_this_start;
+ inline static xbt::signal<void(Task*)> on_completion;
+ xbt::signal<void(Task*)> on_this_completion;
+
protected:
explicit Task(const std::string& name);
- virtual ~Task() = default;
+ virtual ~Task() = default;
virtual void fire();
void complete();
- void set_current_activity (ActivityPtr a) { current_activity_ = a; }
+ void set_current_activity(ActivityPtr a) { current_activity_ = a; }
- inline static xbt::signal<void(Task*)> on_start;
- xbt::signal<void(Task*)> on_this_start;
- inline static xbt::signal<void(Task*)> on_completion;
- xbt::signal<void(Task*)> on_this_completion;
-
public:
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
void enqueue_firings(int n);
/** Add a callback fired before this task activity starts */
- void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
+ void on_this_start_cb(const std::function<void(Task*)>& func) { on_this_start.connect(func); }
/** Add a callback fired before a task activity starts.
* Triggered after the on_this_start function**/
static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
#include "simgrid/kernel/ProfileBuilder.hpp"
#include "simgrid/kernel/routing/NetPoint.hpp"
+ #include "simgrid/plugins/load.h"
#include <simgrid/Exception.hpp>
#include <simgrid/s4u/Actor.hpp>
#include <simgrid/s4u/Barrier.hpp>
#include <vector>
namespace py = pybind11;
-using simgrid::s4u::CommTask;
-using simgrid::s4u::CommTaskPtr;
-using simgrid::s4u::ExecTask;
-using simgrid::s4u::ExecTaskPtr;
-using simgrid::s4u::IoTask;
-using simgrid::s4u::IoTaskPtr;
-using simgrid::s4u::Task;
-using simgrid::s4u::TaskPtr;
using simgrid::s4u::Actor;
using simgrid::s4u::ActorPtr;
using simgrid::s4u::Barrier;
using simgrid::s4u::BarrierPtr;
using simgrid::s4u::Comm;
using simgrid::s4u::CommPtr;
+using simgrid::s4u::CommTask;
+using simgrid::s4u::CommTaskPtr;
using simgrid::s4u::Disk;
using simgrid::s4u::Engine;
+using simgrid::s4u::ExecTask;
+using simgrid::s4u::ExecTaskPtr;
using simgrid::s4u::Host;
using simgrid::s4u::Io;
+using simgrid::s4u::IoTask;
+using simgrid::s4u::IoTaskPtr;
using simgrid::s4u::Link;
using simgrid::s4u::Mailbox;
using simgrid::s4u::Mutex;
using simgrid::s4u::MutexPtr;
using simgrid::s4u::Semaphore;
using simgrid::s4u::SemaphorePtr;
+using simgrid::s4u::Task;
+using simgrid::s4u::TaskPtr;
XBT_LOG_NEW_DEFAULT_CATEGORY(python, "python");
py::overload_cast<const std::string&, const std::string&, const std::string&>(&Host::create_disk),
py::call_guard<py::gil_scoped_release>(), "Create a disk")
.def("seal", &Host::seal, py::call_guard<py::gil_scoped_release>(), "Seal this host")
+ .def("turn_off", &Host::turn_off, py::call_guard<py::gil_scoped_release>(), "Turn off this host")
+ .def("turn_on", &Host::turn_on, py::call_guard<py::gil_scoped_release>(), "Turn on this host")
.def_property("pstate", &Host::get_pstate,
py::cpp_function(&Host::set_pstate, py::call_guard<py::gil_scoped_release>()),
"The current pstate (read/write property).")
"")
.def(
"__repr__", [](const Host* h) { return "Host(" + h->get_name() + ")"; },
- "Textual representation of the Host");
+ "Textual representation of the Host.");
+
+ // Expose the host load plugin to Python: calling sg_host_load_plugin_init() from Python runs the C-level
+ // sg_host_load_plugin_init() and then extends the Host binding with a reset_load() method and read-only
+ // properties backed by the sg_host_* load accessors.
+ m.def("sg_host_load_plugin_init", [host]() {
+   sg_host_load_plugin_init();
+
+   // NOTE(review): the cast presumably yields a mutable copy of the by-value captured `host` handle so that
+   // .def() can be chained on it -- confirm.
+   static_cast<pybind11::class_<simgrid::s4u::Host, std::unique_ptr<simgrid::s4u::Host, pybind11::nodelete>>>(host)
+       .def(
+           "reset_load", [](const Host* h) { sg_host_load_reset(h); }, py::call_guard<py::gil_scoped_release>(),
+           "Reset counters of the host load plugin for this host.")
+       .def_property_readonly(
+           "current_load", [](const Host* h) { return sg_host_get_current_load(h); }, "Current load of the host.")
+       .def_property_readonly(
+           "avg_load", [](const Host* h) { return sg_host_get_avg_load(h); }, "Average load of the host.")
+       // Docstring ends with a period like the other properties registered here.
+       .def_property_readonly(
+           "idle_time", [](const Host* h) { return sg_host_get_idle_time(h); }, "Idle time of the host.")
+       .def_property_readonly(
+           "total_idle_time", [](const Host* h) { return sg_host_get_total_idle_time(h); },
+           "Total idle time of the host.")
+       .def_property_readonly(
+           "computed_flops", [](const Host* h) { return sg_host_get_computed_flops(h); },
+           "Computed flops of the host.");
+ });
py::enum_<simgrid::s4u::Host::SharingPolicy>(host, "SharingPolicy")
.value("NONLINEAR", simgrid::s4u::Host::SharingPolicy::NONLINEAR)
public:
void copy_from(const Strategy* strategy) override
{
- const MaxMatchComm* cast_strategy = dynamic_cast<MaxMatchComm const*>(strategy);
+ const auto* cast_strategy = dynamic_cast<MaxMatchComm const*>(strategy);
xbt_assert(cast_strategy != nullptr);
for (auto& [id, val] : cast_strategy->mailbox_)
mailbox_[id] = val;
std::pair<aid_t, int> best_transition(bool must_be_todo) const override
{
- std::pair<aid_t, int> min_found = std::make_pair(-1, value_of_state_+2);
+ std::pair<aid_t, int> min_found = std::make_pair(-1, value_of_state_ + 2);
for (auto const& [aid, actor] : actors_to_run_) {
if ((not actor.is_todo() && must_be_todo) || not actor.is_enabled() || actor.is_done())
continue;
int aid_value = value_of_state_;
const Transition* transition = actor.get_transition(actor.get_times_considered()).get();
-
- const CommRecvTransition* cast_recv = dynamic_cast<CommRecvTransition const*>(transition);
- if (cast_recv != nullptr) {
- if (mailbox_.count(cast_recv->get_mailbox()) > 0 and
- mailbox_.at(cast_recv->get_mailbox()) > 0) {
- aid_value--; // This means we have waiting recv corresponding to this recv
- } else {
+
+ if (auto const* cast_recv = dynamic_cast<CommRecvTransition const*>(transition)) {
- if (mailbox_.count(cast_recv->get_mailbox()) > 0 && mailbox_.at(cast_recv->get_mailbox()) > 0) {
- aid_value--; // This means we have waiting recv corresponding to this recv
- } else {
- aid_value++;
- }
++ if (mailbox_.count(cast_recv->get_mailbox()) > 0 && mailbox_.at(cast_recv->get_mailbox()) > 0) {
++ aid_value--; // This means we have waiting recv corresponding to this recv
++ } else {
+ aid_value++;
- }
++ }
}
-
- const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
- if (cast_send != nullptr) {
- if (mailbox_.count(cast_send->get_mailbox()) > 0 and
- mailbox_.at(cast_send->get_mailbox()) < 0) {
- aid_value--; // This means we have waiting recv corresponding to this send
- }else {
- aid_value++;
- }
+
+ if (auto const* cast_send = dynamic_cast<CommSendTransition const*>(transition)) {
- if (mailbox_.count(cast_send->get_mailbox()) > 0 && mailbox_.at(cast_send->get_mailbox()) < 0) {
- aid_value--; // This means we have waiting recv corresponding to this send
- } else {
- aid_value++;
- }
++ if (mailbox_.count(cast_send->get_mailbox()) > 0 && mailbox_.at(cast_send->get_mailbox()) < 0) {
++ aid_value--; // This means we have waiting recv corresponding to this send
++ } else {
++ aid_value++;
++ }
}
-
+
if (aid_value < min_found.second)
min_found = std::make_pair(aid, aid_value);
}
return min_found;
}
-
void execute_next(aid_t aid, RemoteApp& app) override
{
const Transition* transition = actors_to_run_.at(aid).get_transition(actors_to_run_.at(aid).get_times_considered()).get();
last_transition_ = transition->type_;
- const CommRecvTransition* cast_recv = dynamic_cast<CommRecvTransition const*>(transition);
- if (cast_recv != nullptr)
+ if (auto const* cast_recv = dynamic_cast<CommRecvTransition const*>(transition))
last_mailbox_ = cast_recv->get_mailbox();
- const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
- if (cast_send != nullptr)
+ if (auto const* cast_send = dynamic_cast<CommSendTransition const*>(transition))
last_mailbox_ = cast_send->get_mailbox();
}
};
public:
void copy_from(const Strategy* strategy) override
{
- const MinMatchComm* cast_strategy = dynamic_cast<MinMatchComm const*>(strategy);
- xbt_assert(cast_strategy != nullptr);
- for (auto& [id, val] : cast_strategy->mailbox_)
- mailbox_[id] = val;
- if (cast_strategy->last_transition_ == Transition::Type::COMM_ASYNC_RECV)
- mailbox_[cast_strategy->last_mailbox_]--;
- if (cast_strategy->last_transition_ == Transition::Type::COMM_ASYNC_SEND)
- mailbox_[cast_strategy->last_mailbox_]++;
-
- for (auto const& [_, val] : mailbox_)
- value_of_state_ -= std::abs(val);
- xbt_assert(value_of_state_ > 0, "MinMatchComm value shouldn't reach 0");
+ const auto* cast_strategy = dynamic_cast<MinMatchComm const*>(strategy);
+ xbt_assert(cast_strategy != nullptr);
+ for (auto& [id, val] : cast_strategy->mailbox_)
+ mailbox_[id] = val;
+ if (cast_strategy->last_transition_ == Transition::Type::COMM_ASYNC_RECV)
+ mailbox_[cast_strategy->last_mailbox_]--;
+ if (cast_strategy->last_transition_ == Transition::Type::COMM_ASYNC_SEND)
+ mailbox_[cast_strategy->last_mailbox_]++;
+
+ for (auto const& [_, val] : mailbox_)
+ value_of_state_ -= std::abs(val);
+ xbt_assert(value_of_state_ > 0, "MinMatchComm value shouldn't reach 0");
}
MinMatchComm() = default;
~MinMatchComm() override = default;
std::pair<aid_t, int> best_transition(bool must_be_todo) const override
{
- std::pair<aid_t, int> min_found = std::make_pair(-1, value_of_state_+2);
+ std::pair<aid_t, int> min_found = std::make_pair(-1, value_of_state_ + 2);
for (auto const& [aid, actor] : actors_to_run_) {
if ((not actor.is_todo() && must_be_todo) || not actor.is_enabled() || actor.is_done())
continue;
int aid_value = value_of_state_;
const Transition* transition = actor.get_transition(actor.get_times_considered()).get();
- const CommRecvTransition* cast_recv = dynamic_cast<CommRecvTransition const*>(transition);
- if (cast_recv != nullptr) {
- if ((mailbox_.count(cast_recv->get_mailbox()) > 0 and
- mailbox_.at(cast_recv->get_mailbox()) <= 0) or mailbox_.count(cast_recv->get_mailbox()) == 0)
- aid_value--; // This means we don't have waiting recv corresponding to this recv
- else
- aid_value++;
+ if (auto const* cast_recv = dynamic_cast<CommRecvTransition const*>(transition)) {
- if ((mailbox_.count(cast_recv->get_mailbox()) > 0 && mailbox_.at(cast_recv->get_mailbox()) <= 0) ||
- mailbox_.count(cast_recv->get_mailbox()) == 0)
- aid_value--; // This means we don't have waiting recv corresponding to this recv
- else
- aid_value++;
++ if ((mailbox_.count(cast_recv->get_mailbox()) > 0 && mailbox_.at(cast_recv->get_mailbox()) <= 0) ||
++ mailbox_.count(cast_recv->get_mailbox()) == 0)
++ aid_value--; // This means we don't have waiting recv corresponding to this recv
++ else
++ aid_value++;
}
- const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
- if (cast_send != nullptr) {
- if ((mailbox_.count(cast_send->get_mailbox()) > 0 and
- mailbox_.at(cast_send->get_mailbox()) >= 0) or mailbox_.count(cast_send->get_mailbox()) == 0)
- aid_value--;
- else
- aid_value++;
+ if (auto const* cast_send = dynamic_cast<CommSendTransition const*>(transition)) {
- if ((mailbox_.count(cast_send->get_mailbox()) > 0 && mailbox_.at(cast_send->get_mailbox()) >= 0) ||
- mailbox_.count(cast_send->get_mailbox()) == 0)
- aid_value--;
- else
- aid_value++;
++ if ((mailbox_.count(cast_send->get_mailbox()) > 0 && mailbox_.at(cast_send->get_mailbox()) >= 0) ||
++ mailbox_.count(cast_send->get_mailbox()) == 0)
++ aid_value--;
++ else
++ aid_value++;
}
-
+
if (aid_value < min_found.second)
min_found = std::make_pair(aid, aid_value);
}
const Transition* transition = actors_to_run_.at(aid).get_transition(actors_to_run_.at(aid).get_times_considered()).get();
last_transition_ = transition->type_;
- const CommRecvTransition* cast_recv = dynamic_cast<CommRecvTransition const*>(transition);
- if (cast_recv != nullptr)
+ if (auto const* cast_recv = dynamic_cast<CommRecvTransition const*>(transition))
last_mailbox_ = cast_recv->get_mailbox();
- const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
- if (cast_send != nullptr)
+ if (auto const* cast_send = dynamic_cast<CommSendTransition const*>(transition))
last_mailbox_ = cast_send->get_mailbox();
}
};
virtual std::pair<aid_t, int> best_transition(bool must_be_todo) const = 0;
/** Returns the best transition among those that should be interleaved. */
- std::pair<aid_t, int> next_transition() { return best_transition(true); }
+ std::pair<aid_t, int> next_transition() const { return best_transition(true); }
/** Allows for the strategy to update its fields knowing that the actor aid will
* be executed and a children strategy will then be created. */
/** Ensure at least one transition is marked as todo among the enabled ones not done.
* If required, it marks as todo the best transition according to the strategy. */
void consider_best() {
- for (auto& [_, actor] :actors_to_run_)
- if (actor.is_todo())
- return;
+ if (std::any_of(begin(actors_to_run_), end(actors_to_run_),
+ [](const auto& actor) { return actor.second.is_todo(); }))
+ return;
aid_t best_aid = best_transition(false).first;
if (best_aid != -1)
actors_to_run_.at(best_aid).mark_todo();
// else raise an error
void consider_one(aid_t aid)
{
- xbt_assert(actors_to_run_.at(aid).is_enabled() and not actors_to_run_.at(aid).is_done(),
+ xbt_assert(actors_to_run_.at(aid).is_enabled() && not actors_to_run_.at(aid).is_done(),
"Tried to mark as TODO actor %ld but it is either not enabled or already done", aid);
actors_to_run_.at(aid).mark_todo();
}
{
unsigned long count = 0;
for (auto& [_, actor] : actors_to_run_)
- if (actor.is_enabled() and not actor.is_done()) {
- if (actor.is_enabled() && not actor.is_done()) {
-- actor.mark_todo();
-- count++;
-- }
++ if (actor.is_enabled() && not actor.is_done()) {
++ actor.mark_todo();
++ count++;
++ }
return count;
}
#include "src/mc/transition/Transition.hpp"
#include "xbt/random.hpp"
- #define MAX_RAND 100000
-
namespace simgrid::mc {
/** Guiding strategy that valuate states randomly */
class UniformStrategy : public Strategy {
+ static constexpr int MAX_RAND = 100000;
+
std::map<aid_t, int> valuation;
public:
}
void copy_from(const Strategy* strategy) override
{
- for (auto& [aid, _] : actors_to_run_)
+ for (auto const& [aid, _] : actors_to_run_)
valuation[aid] = xbt::random::uniform_int(0, MAX_RAND);
}
// Consider only valid actors
for (auto const& [aid, actor] : actors_to_run_) {
- if ((actor.is_todo() or not must_be_todo) and (not actor.is_done()) and actor.is_enabled())
+ if ((actor.is_todo() || not must_be_todo) && (not actor.is_done()) && actor.is_enabled())
possibilities++;
}
chosen = xbt::random::uniform_int(0, possibilities-1);
for (auto const& [aid, actor] : actors_to_run_) {
- if (((not actor.is_todo()) and must_be_todo) or actor.is_done() or (not actor.is_enabled()))
- if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
++ if (((not actor.is_todo()) && must_be_todo) || actor.is_done() || (not actor.is_enabled()))
continue;
if (chosen == 0) {
return std::make_pair(aid, valuation.at(aid));
"model-check/strategy",
"Specify the the kind of heuristic to use for guided model-checking",
"none",
- {{"none", "No specific strategy: simply pick the first available transistion and act as a DFS."},
+ {{"none", "No specific strategy: simply pick the first available transition and act as a DFS."},
{"max_match_comm", "Try to minimize the number of in-fly communication by appairing matching send and receive."},
-- {"min_match_comm", "Try to maximize the number of in-fly communication by not appairing matching send and receive."},
-- {"uniform", "No specific strategy: choices are made randomly based on a uniform sampling."}
-- }};
++ {"min_match_comm",
++ "Try to maximize the number of in-fly communication by not appairing matching send and receive."},
++ {"uniform", "No specific strategy: choices are made randomly based on a uniform sampling."}}};
simgrid::config::Flag<int> _sg_mc_random_seed{"model-check/rand-seed",
"give a specific random seed to initialize the uniform distribution", 0,
simgrid::kernel::activity::ActivityImplPtr comm = nullptr;
simgrid::kernel::actor::CommIsendSimcall send_observer{
- sender, mbox->get_impl(), task_size, rate, static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
- nullptr, copy_data_fun, data, false};
+ sender, mbox->get_impl(), task_size, rate, static_cast<unsigned char*>(src_buff),
+ src_buff_size, match_fun, nullptr, copy_data_fun, data,
+ false, "Isend"};
comm = simgrid::kernel::actor::simcall_answered(
[&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer);
- if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout};
+ if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout, "Wait"};
simgrid::kernel::actor::simcall_blocking(
[&wait_observer] {
wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
} else {
simgrid::kernel::actor::CommIsendSimcall observer(sender, mbox->get_impl(), task_size, rate,
static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
- nullptr, copy_data_fun, data, false);
+ nullptr, copy_data_fun, data, false, "Isend");
simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer);
comm->wait_for(observer.get_issuer(), timeout);
match_fun,
copy_data_fun,
data,
- rate};
+ rate,
+ "Irecv"};
comm = simgrid::kernel::actor::simcall_answered(
[&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer);
- if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout};
+ if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout, "wait"};
simgrid::kernel::actor::simcall_blocking(
[&wait_observer] {
wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
comm = nullptr;
} else {
simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox->get_impl(), static_cast<unsigned char*>(dst_buff),
- dst_buff_size, match_fun, copy_data_fun, data, rate);
+ dst_buff_size, match_fun, copy_data_fun, data, rate, "Irecv");
simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer);
comm->wait_for(observer.get_issuer(), timeout);
xbt_assert(src_buff_ == nullptr && dst_buff_ == nullptr,
"Direct host-to-host communications cannot carry any data.");
XBT_DEBUG("host-to-host Comm. Pimpl already created and set, just start it.");
- on_start(*this);
- on_this_start(*this);
+ fire_on_start();
+ fire_on_this_start();
kernel::actor::simcall_answered([this] {
pimpl_->set_state(kernel::activity::State::READY);
boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->start();
clean_fun_,
copy_data_function_,
get_data<void>(),
- detached_};
+ detached_,
+ "Isend"};
pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
&observer);
} else if (dst_buff_ != nullptr) { // Receiver side
match_fun_,
copy_data_function_,
get_data<void>(),
- rate_};
+ rate_,
+ "Irecv"};
pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
&observer);
} else {
pimpl_->set_actor(sender_);
// Only throw the signal when both sides are here and the status is READY
if (pimpl_->get_state() != kernel::activity::State::WAITING) {
- on_start(*this);
- on_this_start(*this);
+ fire_on_start();
+ fire_on_this_start();
}
}
case State::STARTED:
try {
issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
+ kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"};
if (kernel::actor::simcall_blocking(
[&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
&observer)) {
#include <memory>
#include <simgrid/Exception.hpp>
-#include <simgrid/s4u/Task.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Exec.hpp>
#include <simgrid/s4u/Io.hpp>
+#include <simgrid/s4u/Task.hpp>
#include <simgrid/simix.hpp>
#include "src/simgrid/module.hpp"
void Task::receive(Task* source)
{
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
- auto source_count = predecessors_[source]++;
+ auto source_count = predecessors_[source];
+ predecessors_[source]++;
if (tokens_received_.size() <= queued_firings_ + source_count)
- tokens_received_.push_back({});
+ tokens_received_.emplace_back();
tokens_received_[queued_firings_ + source_count][source] = source->token_;
- bool enough_tokens = true;
+ bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
enough_tokens = false;
return tokens_received_.front()[t];
}
-void Task::fire() {
+void Task::fire()
+{
on_this_start(this);
on_start(this);
- working_ = true;
+ working_ = true;
queued_firings_ = std::max(queued_firings_ - 1, 0);
- if (tokens_received_.size() > 0)
+ if (not tokens_received_.empty())
tokens_received_.pop_front();
}