read->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- sg4::Task::on_end_cb([](const sg4::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
comm->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- sg4::Task::on_end_cb([](const sg4::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
SB->enqueue_execs(5);
// Add a function to be called when tasks end for log purpose
- sg4::Task::on_end_cb([]
+ sg4::Task::on_completion_cb([]
(const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
exec2->add_successor(comm2);
// Add a function to be called when tasks end for log purpose
- sg4::Task::on_end_cb([](const sg4::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
comm->add_successor(exec);
// Add a function to be called when tasks end for log purpose
- sg4::Task::on_end_cb([](const sg4::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
read.add_successor(exec2)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
# Enqueue two executions for task exec1
exec1.enqueue_execs(2)
comm.add_successor(exec2)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
# Enqueue two executions for task exec1
exec1.enqueue_execs(2)
exec2.add_successor(comm2)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
# Add a function to be called before each executions of comm0
# This function modifies the graph of tasks by adding or removing
comm.add_successor(exec)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
# Create the actor that will inject load during the simulation
Actor.create("input", tremblay, variable_load, comm)
class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
class Task {
+ std::string name_;
+ double amount_;
+ int queued_execs_ = 0;
+ int count_ = 0;
+ bool working_ = false;
+
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
+ std::atomic_int_fast32_t refcount_{0};
bool ready_to_run() const;
void receive(Task* source);
-protected:
- std::string name_;
- double amount_;
std::shared_ptr<Token> token_ = nullptr;
std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
- int queued_execs_ = 0;
- int count_ = 0;
- bool working_ = false;
- s4u::ActivityPtr previous_activity_;
- s4u::ActivityPtr current_activity_;
- xbt::signal<void(Task*)> on_this_start_;
- xbt::signal<void(Task*)> on_this_end_;
+ ActivityPtr previous_activity_;
+ ActivityPtr current_activity_;
+
+protected:
explicit Task(const std::string& name);
virtual ~Task() = default;
- virtual void fire() = 0;
+
+ virtual void fire();
void complete();
- static xbt::signal<void(Task*)> on_start;
- static xbt::signal<void(Task*)> on_end;
- std::atomic_int_fast32_t refcount_{0};
+ /** Store @p a in current_activity_. The fire() overrides of ExecTask, CommTask and IoTask call this
+  * with the activity they have just started. */
+ void set_current_activity (ActivityPtr a) { current_activity_ = a; }
+
+ inline static xbt::signal<void(Task*)> on_start;
+ xbt::signal<void(Task*)> on_this_start;
+ inline static xbt::signal<void(Task*)> on_completion;
+ xbt::signal<void(Task*)> on_this_completion;
public:
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
- void enqueue_execs(int n);
void set_amount(double amount);
double get_amount() const { return amount_; }
+ int get_count() const { return count_; }
+
void set_token(std::shared_ptr<Token> token);
std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
void remove_all_successors();
const std::set<Task*>& get_successors() const { return successors_; }
- void on_this_start_cb(const std::function<void(Task*)>& func);
- void on_this_end_cb(const std::function<void(Task*)>& func);
- int get_count() const;
- /** Add a callback fired before a task activity start.
+ void enqueue_execs(int n);
+
+ /** Add a callback fired before this task activity starts */
+ void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
+ /** Add a callback fired before a task activity starts.
* Triggered after the on_this_start function**/
static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
- /** Add a callback fired after a task activity end.
- * Triggered after the on_this_end function, but before
- * sending tokens to successors.**/
- static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+ /** Add a callback fired after an activity of this task ends.
+  * Emitted from complete(), before the class-wide on_completion callbacks. */
+ void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); }
+ /** Add a callback fired after any task activity ends.
+  * Triggered after the per-task on_this_completion callbacks, but before sending tokens to successors. */
+ static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
#ifndef DOXYGEN
friend void intrusive_ptr_release(Task* o)
#endif
};
-class ExecTask : public Task {
- s4u::Host* host_;
+class CommTask : public Task {
+ Host* source_;
+ Host* destination_;
- explicit ExecTask(const std::string& name);
+ explicit CommTask(const std::string& name);
void fire() override;
public:
- static ExecTaskPtr init(const std::string& name);
- static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host);
- ExecTaskPtr set_host(s4u::Host* host);
- s4u::Host* get_host() const { return host_; }
- ExecTaskPtr set_flops(double flops);
- double get_flops() const { return get_amount(); }
+ static CommTaskPtr init(const std::string& name);
+ static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination);
+
+ CommTaskPtr set_source(Host* source);
+ Host* get_source() const { return source_; }
+ CommTaskPtr set_destination(Host* destination);
+ Host* get_destination() const { return destination_; }
+ CommTaskPtr set_bytes(double bytes);
+ double get_bytes() const { return get_amount(); }
};
-class CommTask : public Task {
- s4u::Host* source_;
- s4u::Host* destination_;
+class ExecTask : public Task {
+ Host* host_;
- explicit CommTask(const std::string& name);
+ explicit ExecTask(const std::string& name);
void fire() override;
public:
- static CommTaskPtr init(const std::string& name);
- static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination);
- CommTaskPtr set_source(s4u::Host* source);
- s4u::Host* get_source() const { return source_; }
- CommTaskPtr set_destination(s4u::Host* destination);
- s4u::Host* get_destination() const { return destination_; }
- CommTaskPtr set_bytes(double bytes);
- double get_bytes() const { return get_amount(); }
+ static ExecTaskPtr init(const std::string& name);
+ static ExecTaskPtr init(const std::string& name, double flops, Host* host);
+
+ ExecTaskPtr set_host(Host* host);
+ Host* get_host() const { return host_; }
+ ExecTaskPtr set_flops(double flops);
+ double get_flops() const { return get_amount(); }
};
class IoTask : public Task {
- s4u::Disk* disk_;
- s4u::Io::OpType type_;
+ Disk* disk_;
+ Io::OpType type_;
explicit IoTask(const std::string& name);
void fire() override;
public:
static IoTaskPtr init(const std::string& name);
- static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type);
- IoTaskPtr set_disk(s4u::Disk* disk);
- s4u::Disk* get_disk() const { return disk_; }
+ static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type);
+
+ IoTaskPtr set_disk(Disk* disk);
+ Disk* get_disk() const { return disk_; }
IoTaskPtr set_bytes(double bytes);
double get_bytes() const { return get_amount(); }
- IoTaskPtr set_op_type(s4u::Io::OpType type);
- s4u::Io::OpType get_op_type() const { return type_; }
+ IoTaskPtr set_op_type(Io::OpType type);
+ Io::OpType get_op_type() const { return type_; }
};
} // namespace simgrid::s4u
#endif
},
"Add a callback called when each task starts.")
.def_static(
- "on_end_cb",
+ "on_completion_cb",
[](py::object cb) {
cb.inc_ref(); // keep alive after return
const py::gil_scoped_release gil_release;
- Task::on_end_cb([cb_p = cb.ptr()](Task* op) {
+ Task::on_completion_cb([cb_p = cb.ptr()](Task* op) {
const py::gil_scoped_acquire py_context; // need a new context for callback
py::reinterpret_borrow<py::function>(cb_p)(op);
});
"Remove all successors of this task.")
.def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
py::arg("func"), "Add a callback called when this task starts.")
- .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+ .def("on_this_completion_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_completion_cb),
py::arg("func"), "Add a callback called when this task ends.")
.def(
"__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
namespace simgrid::s4u {
-xbt::signal<void(Task*)> Task::on_start;
-xbt::signal<void(Task*)> Task::on_end;
-
Task::Task(const std::string& name) : name_(name) {}
/**
*/
void Task::complete()
{
- xbt_assert(s4u::Actor::is_maestro());
+ xbt_assert(Actor::is_maestro());
working_ = false;
count_++;
- on_this_end_(this);
- Task::on_end(this);
+ on_this_completion(this);
+ on_completion(this);
if (current_activity_)
previous_activity_ = std::move(current_activity_);
for (auto const& t : successors_)
return tokens_received_.front()[t];
}
+ /** Start-of-execution bookkeeping, invoked first by the ExecTask, CommTask and IoTask overrides.
+  *
+  * Fires the per-task start callbacks, then the class-wide ones, and marks the task as working.
+  * It then drops the oldest batch of received tokens (if any) and consumes one queued execution,
+  * never letting the counter go below zero.
+  */
+ void Task::fire()
+ {
+ // Callbacks run before any state change, per-task ones first
+ on_this_start(this);
+ on_start(this);
+ working_ = true;
+ if (!tokens_received_.empty())
+ tokens_received_.pop_front();
+ queued_execs_ = std::max(queued_execs_ - 1, 0);
+ }
+
/** @param successor The Task to add.
* @brief Add a successor to this Task.
* @note It also adds this as a predecessor of successor.
});
}
-/** @ingroup plugin_task
- * @param func The function to set.
- * @brief Set a function to be called before each execution.
- * @note The function is called before the underlying Activity starts.
- */
-void Task::on_this_start_cb(const std::function<void(Task*)>& func)
-{
- simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
-}
-
-/** @ingroup plugin_task
- * @param func The function to set.
- * @brief Set a function to be called after each execution.
- * @note The function is called after the underlying Activity ends, but before sending tokens to successors.
- */
-void Task::on_this_end_cb(const std::function<void(Task*)>& func)
-{
- simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
-}
-
-/** @ingroup plugin_task
- * @brief Return the number of completed executions.
- */
-int Task::get_count() const
-{
- return count_;
-}
-
/**
* @brief Default constructor.
*/
/** @ingroup plugin_task
* @brief Smart Constructor.
*/
-ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
+ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
{
return init(name)->set_flops(flops)->set_host(host);
}
*/
void ExecTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- if (tokens_received_.size() > 0)
- tokens_received_.pop_front();
- s4u::ExecPtr exec = s4u::Exec::init();
- exec->set_name(name_);
- exec->set_flops_amount(amount_);
- exec->set_host(host_);
+ // Shared bookkeeping: start callbacks, working flag, queued-execution counter, token consumption
+ Task::fire();
+ // Configure an Exec carrying this task's name and amount, placed on host_
+ auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
exec->start();
- exec->on_this_completion_cb([this](Exec const& exec) { this->complete(); });
- current_activity_ = exec;
+ // When the Exec completes, complete() bumps the execution count and fires the completion callbacks.
+ // NOTE(review): the callback is connected after start(); presumably completion cannot be signalled
+ // before this function returns -- confirm.
+ exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
+ set_current_activity(exec);
}
/** @ingroup plugin_task
* @param host The host to set.
* @brief Set a new host.
*/
-ExecTaskPtr ExecTask::set_host(s4u::Host* host)
+ExecTaskPtr ExecTask::set_host(Host* host)
{
kernel::actor::simcall_answered([this, host] { host_ = host; });
return this;
*/
ExecTaskPtr ExecTask::set_flops(double flops)
{
- kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
+ // amount_ is now private to Task, hence the set_amount() accessor instead of a direct write
+ kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
return this;
}
/** @ingroup plugin_task
* @brief Smart constructor.
*/
-CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
+CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
{
return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
}
*/
void CommTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- if (tokens_received_.size() > 0)
- tokens_received_.pop_front();
- s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
- comm->set_name(name_);
- comm->set_payload_size(amount_);
+ // Shared bookkeeping: start callbacks, working flag, queued-execution counter, token consumption
+ Task::fire();
+ // Configure a Comm from source_ to destination_ carrying this task's name and amount as payload size
+ auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
comm->start();
- comm->on_this_completion_cb([this](Comm const& comm) { this->complete(); });
- current_activity_ = comm;
+ // When the Comm completes, complete() bumps the execution count and fires the completion callbacks
+ comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
+ set_current_activity(comm);
}
/** @ingroup plugin_task
* @param source The host to set.
* @brief Set a new source host.
*/
-CommTaskPtr CommTask::set_source(s4u::Host* source)
+CommTaskPtr CommTask::set_source(Host* source)
{
kernel::actor::simcall_answered([this, source] { source_ = source; });
return this;
* @param destination The host to set.
* @brief Set a new destination host.
*/
-CommTaskPtr CommTask::set_destination(s4u::Host* destination)
+CommTaskPtr CommTask::set_destination(Host* destination)
{
kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
return this;
*/
CommTaskPtr CommTask::set_bytes(double bytes)
{
- kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+ // amount_ is now private to Task, hence the set_amount() accessor instead of a direct write
+ kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
return this;
}
/** @ingroup plugin_task
* @brief Smart Constructor.
*/
-IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
+IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
{
return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
}
* @param disk The disk to set.
* @brief Set a new disk.
*/
-IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
+IoTaskPtr IoTask::set_disk(Disk* disk)
{
kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
return this;
*/
IoTaskPtr IoTask::set_bytes(double bytes)
{
- kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+ // amount_ is now private to Task, hence the set_amount() accessor instead of a direct write
+ kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
return this;
}
/** @ingroup plugin_task */
-IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
+IoTaskPtr IoTask::set_op_type(Io::OpType type)
{
kernel::actor::simcall_answered([this, type] { type_ = type; });
return this;
void IoTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- if (tokens_received_.size() > 0)
- tokens_received_.pop_front();
- s4u::IoPtr io = s4u::Io::init();
- io->set_name(name_);
- io->set_size(amount_);
- io->set_disk(disk_);
- io->set_op_type(type_);
+ // Shared bookkeeping: start callbacks, working flag, queued-execution counter, token consumption
+ Task::fire();
+ // Configure an Io carrying this task's name and amount as size, on disk_ with operation type_
+ auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
io->start();
- io->on_this_completion_cb([this](Io const& io) { this->complete(); });
- current_activity_ = io;
+ // When the Io completes, complete() bumps the execution count and fires the completion callbacks
+ io->on_this_completion_cb([this](Io const&) { this->complete(); });
+ set_current_activity(io);
}
} // namespace simgrid::s4u