From: Fred Suter Date: Wed, 21 Jun 2023 16:57:36 +0000 (-0400) Subject: cleanups and refactoring in s4u::Task X-Git-Tag: v3.34~14^2~5 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/d636f4a9dfd4005a73f8d9e0c393c8b3d849a2ad cleanups and refactoring in s4u::Task --- diff --git a/examples/cpp/task-io/s4u-task-io.cpp b/examples/cpp/task-io/s4u-task-io.cpp index 19102f25a5..52dcca1202 100644 --- a/examples/cpp/task-io/s4u-task-io.cpp +++ b/examples/cpp/task-io/s4u-task-io.cpp @@ -41,7 +41,7 @@ int main(int argc, char* argv[]) read->add_successor(exec2); // Add a function to be called when tasks end for log purpose - sg4::Task::on_end_cb([](const sg4::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); diff --git a/examples/cpp/task-simple/s4u-task-simple.cpp b/examples/cpp/task-simple/s4u-task-simple.cpp index dc3df1d04d..0ec6aaa4c2 100644 --- a/examples/cpp/task-simple/s4u-task-simple.cpp +++ b/examples/cpp/task-simple/s4u-task-simple.cpp @@ -38,7 +38,7 @@ int main(int argc, char* argv[]) comm->add_successor(exec2); // Add a function to be called when tasks end for log purpose - sg4::Task::on_end_cb([](const sg4::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp index bfd52b740e..ca75e9a3ae 100644 --- a/examples/cpp/task-storm/s4u-task-storm.cpp +++ b/examples/cpp/task-storm/s4u-task-storm.cpp @@ -119,7 +119,7 @@ int main(int argc, char* argv[]) SB->enqueue_execs(5); // Add a function to be called when tasks end for log purpose - sg4::Task::on_end_cb([] + sg4::Task::on_completion_cb([] (const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); diff --git a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp index ae19ebc93b..3694a7d4e6 100644 --- a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp +++ b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp @@ -46,7 +46,7 @@ int main(int argc, char* argv[]) exec2->add_successor(comm2); // Add a function to be called when tasks end for log purpose - sg4::Task::on_end_cb([](const sg4::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); diff --git a/examples/cpp/task-variable-load/s4u-task-variable-load.cpp b/examples/cpp/task-variable-load/s4u-task-variable-load.cpp index b0d3efd955..df41c95a82 100644 --- a/examples/cpp/task-variable-load/s4u-task-variable-load.cpp +++ b/examples/cpp/task-variable-load/s4u-task-variable-load.cpp @@ -50,7 +50,7 @@ int main(int argc, char* argv[]) comm->add_successor(exec); // Add a function to be called when tasks end for log purpose - sg4::Task::on_end_cb([](const sg4::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); diff --git a/examples/python/task-io/task-io.py b/examples/python/task-io/task-io.py index 431a78990c..4db1456ded 100644 --- a/examples/python/task-io/task-io.py +++ b/examples/python/task-io/task-io.py @@ -41,7 +41,7 @@ if __name__ == '__main__': read.add_successor(exec2) # Add a function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) # Enqueue two executions for task exec1 exec1.enqueue_execs(2) diff --git a/examples/python/task-simple/task-simple.py b/examples/python/task-simple/task-simple.py index 4ac876e8f8..ce57577dea 100644 --- a/examples/python/task-simple/task-simple.py +++ b/examples/python/task-simple/task-simple.py @@ -49,7 +49,7 @@ if __name__ == '__main__': comm.add_successor(exec2) # Add a function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) # Enqueue two executions for task exec1 exec1.enqueue_execs(2) diff --git a/examples/python/task-switch-host/task-switch-host.py b/examples/python/task-switch-host/task-switch-host.py index f56bc40883..53ba3f2374 100644 --- a/examples/python/task-switch-host/task-switch-host.py +++ b/examples/python/task-switch-host/task-switch-host.py @@ -75,7 +75,7 @@ if __name__ == '__main__': exec2.add_successor(comm2) # Add a function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) # Add a function to be called before each executions of comm0 # This function modifies the graph of tasks by adding or removing diff --git a/examples/python/task-variable-load/task-variable-load.py b/examples/python/task-variable-load/task-variable-load.py index 63925d9b63..719bf56da5 100644 --- a/examples/python/task-variable-load/task-variable-load.py +++ b/examples/python/task-variable-load/task-variable-load.py @@ -58,7 +58,7 @@ if __name__ == '__main__': comm.add_successor(exec) # Add a function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) # Create the actor that will inject load during the simulation Actor.create("input", tremblay, variable_load, comm) diff --git a/include/simgrid/s4u/Task.hpp b/include/simgrid/s4u/Task.hpp index ef787347a5..d6f41f5074 100644 --- a/include/simgrid/s4u/Task.hpp +++ b/include/simgrid/s4u/Task.hpp @@ -27,56 +27,65 @@ using IoTaskPtr = boost::intrusive_ptr; class XBT_PUBLIC Token : public xbt::Extendable {}; class Task { + std::string name_; + double amount_; + int queued_execs_ = 0; + int count_ = 0; + bool working_ = false; + std::set successors_ = {}; std::map predecessors_ = {}; + std::atomic_int_fast32_t refcount_{0}; bool ready_to_run() const; void receive(Task* source); -protected: - std::string name_; - double amount_; std::shared_ptr token_ = nullptr; std::deque>> tokens_received_; - int queued_execs_ = 0; - int count_ = 0; - bool working_ = false; - s4u::ActivityPtr previous_activity_; - s4u::ActivityPtr current_activity_; - xbt::signal on_this_start_; - xbt::signal on_this_end_; + ActivityPtr previous_activity_; + ActivityPtr current_activity_; + +protected: explicit Task(const std::string& name); virtual ~Task() = default; - virtual void fire() = 0; + + virtual void fire(); void complete(); - static xbt::signal on_start; - static xbt::signal on_end; - std::atomic_int_fast32_t refcount_{0}; + void set_current_activity (ActivityPtr a) { current_activity_ = a; } + + inline static xbt::signal on_start; + xbt::signal on_this_start; + inline static xbt::signal on_completion; + xbt::signal on_this_completion; public: const std::string& get_name() const { return name_; } const char* get_cname() const { return name_.c_str(); } - void enqueue_execs(int n); void set_amount(double amount); double get_amount() const { return amount_; } + int get_count() const { return count_; } + void set_token(std::shared_ptr token); std::shared_ptr get_next_token_from(TaskPtr t); + void add_successor(TaskPtr t); void remove_successor(TaskPtr t); void remove_all_successors(); const std::set& get_successors() const { return successors_; } - void on_this_start_cb(const std::function& func); - void on_this_end_cb(const std::function& func); - int get_count() const; - /** Add a callback fired before a task activity start. + void enqueue_execs(int n); + + /** Add a callback fired before this task activity starts */ + void on_this_start_cb(const std::function& func){ on_this_start.connect(func); } + /** Add a callback fired before a task activity starts. * Triggered after the on_this_start function**/ static void on_start_cb(const std::function& cb) { on_start.connect(cb); } - /** Add a callback fired after a task activity end. - * Triggered after the on_this_end function, but before - * sending tokens to successors.**/ - static void on_end_cb(const std::function& cb) { on_end.connect(cb); } + /** Add a callback fired before this task activity ends */ + void on_this_completion_cb(const std::function& func) { on_this_completion.connect(func); }; + /** Add a callback fired after a task activity ends. + * Triggered after the on_this_end function, but before sending tokens to successors.**/ + static void on_completion_cb(const std::function& cb) { on_completion.connect(cb); } #ifndef DOXYGEN friend void intrusive_ptr_release(Task* o) @@ -90,54 +99,57 @@ public: #endif }; -class ExecTask : public Task { - s4u::Host* host_; +class CommTask : public Task { + Host* source_; + Host* destination_; - explicit ExecTask(const std::string& name); + explicit CommTask(const std::string& name); void fire() override; public: - static ExecTaskPtr init(const std::string& name); - static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host); - ExecTaskPtr set_host(s4u::Host* host); - s4u::Host* get_host() const { return host_; } - ExecTaskPtr set_flops(double flops); - double get_flops() const { return get_amount(); } + static CommTaskPtr init(const std::string& name); + static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination); + + CommTaskPtr set_source(Host* source); + Host* get_source() const { return source_; } + CommTaskPtr set_destination(Host* destination); + Host* get_destination() const { return destination_; } + CommTaskPtr set_bytes(double bytes); + double get_bytes() const { return get_amount(); } }; -class CommTask : public Task { - s4u::Host* source_; - s4u::Host* destination_; +class ExecTask : public Task { + Host* host_; - explicit CommTask(const std::string& name); + explicit ExecTask(const std::string& name); void fire() override; public: - static CommTaskPtr init(const std::string& name); - static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination); - CommTaskPtr set_source(s4u::Host* source); - s4u::Host* get_source() const { return source_; } - CommTaskPtr set_destination(s4u::Host* destination); - s4u::Host* get_destination() const { return destination_; } - CommTaskPtr set_bytes(double bytes); - double get_bytes() const { return get_amount(); } + static ExecTaskPtr init(const std::string& name); + static ExecTaskPtr init(const std::string& name, double flops, Host* host); + + ExecTaskPtr set_host(Host* host); + Host* get_host() const { return host_; } + ExecTaskPtr set_flops(double flops); + double get_flops() const { return get_amount(); } }; class IoTask : public Task { - s4u::Disk* disk_; - s4u::Io::OpType type_; + Disk* disk_; + Io::OpType type_; explicit IoTask(const std::string& name); void fire() override; public: static IoTaskPtr init(const std::string& name); - static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type); - IoTaskPtr set_disk(s4u::Disk* disk); - s4u::Disk* get_disk() const { return disk_; } + static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type); + + IoTaskPtr set_disk(Disk* disk); + Disk* get_disk() const { return disk_; } IoTaskPtr set_bytes(double bytes); double get_bytes() const { return get_amount(); } - IoTaskPtr set_op_type(s4u::Io::OpType type); - s4u::Io::OpType get_op_type() const { return type_; } + IoTaskPtr set_op_type(Io::OpType type); + Io::OpType get_op_type() const { return type_; } }; } // namespace simgrid::s4u #endif diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 52f5ca6adb..f7f06b73cf 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -933,11 +933,11 @@ PYBIND11_MODULE(simgrid, m) }, "Add a callback called when each task starts.") .def_static( - "on_end_cb", + "on_completion_cb", [](py::object cb) { cb.inc_ref(); // keep alive after return const py::gil_scoped_release gil_release; - Task::on_end_cb([cb_p = cb.ptr()](Task* op) { + Task::on_completion_cb([cb_p = cb.ptr()](Task* op) { const py::gil_scoped_acquire py_context; // need a new context for callback py::reinterpret_borrow(cb_p)(op); }); @@ -957,7 +957,7 @@ PYBIND11_MODULE(simgrid, m) "Remove all successors of this task.") .def("on_this_start_cb", py::overload_cast&>(&Task::on_this_start_cb), py::arg("func"), "Add a callback called when this task starts.") - .def("on_this_end_cb", py::overload_cast&>(&Task::on_this_end_cb), + .def("on_this_completion_cb", py::overload_cast&>(&Task::on_this_completion_cb), py::arg("func"), "Add a callback called when this task ends.") .def( "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; }, diff --git a/src/s4u/s4u_Task.cpp b/src/s4u/s4u_Task.cpp index c0a6e38e90..9c9fbe9487 100644 --- a/src/s4u/s4u_Task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -25,9 +25,6 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plug namespace simgrid::s4u { -xbt::signal Task::on_start; -xbt::signal Task::on_end; - Task::Task(const std::string& name) : name_(name) {} /** @@ -76,11 +73,11 @@ void Task::receive(Task* source) */ void Task::complete() { - xbt_assert(s4u::Actor::is_maestro()); + xbt_assert(Actor::is_maestro()); working_ = false; count_++; - on_this_end_(this); - Task::on_end(this); + on_this_completion(this); + on_completion(this); if (current_activity_) previous_activity_ = std::move(current_activity_); for (auto const& t : successors_) @@ -128,6 +125,15 @@ std::shared_ptr Task::get_next_token_from(TaskPtr t) return tokens_received_.front()[t]; } +void Task::fire() { + on_this_start(this); + on_start(this); + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); +} + /** @param successor The Task to add. * @brief Add a successor to this Task. * @note It also adds this as a predecessor of successor. @@ -163,34 +169,6 @@ void Task::remove_all_successors() }); } -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called before each execution. - * @note The function is called before the underlying Activity starts. - */ -void Task::on_this_start_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); }); -} - -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called after each execution. - * @note The function is called after the underlying Activity ends, but before sending tokens to successors. - */ -void Task::on_this_end_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); }); -} - -/** @ingroup plugin_task - * @brief Return the number of completed executions. - */ -int Task::get_count() const -{ - return count_; -} - /** * @brief Default constructor. */ @@ -207,7 +185,7 @@ ExecTaskPtr ExecTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. */ -ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host) +ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) { return init(name)->set_flops(flops)->set_host(host); } @@ -219,26 +197,18 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos */ void ExecTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - if (tokens_received_.size() > 0) - tokens_received_.pop_front(); - s4u::ExecPtr exec = s4u::Exec::init(); - exec->set_name(name_); - exec->set_flops_amount(amount_); - exec->set_host(host_); + Task::fire(); + auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_); exec->start(); - exec->on_this_completion_cb([this](Exec const& exec) { this->complete(); }); - current_activity_ = exec; + exec->on_this_completion_cb([this](Exec const&) { this->complete(); }); + set_current_activity(exec); } /** @ingroup plugin_task * @param host The host to set. * @brief Set a new host. */ -ExecTaskPtr ExecTask::set_host(s4u::Host* host) +ExecTaskPtr ExecTask::set_host(Host* host) { kernel::actor::simcall_answered([this, host] { host_ = host; }); return this; @@ -249,7 +219,7 @@ ExecTaskPtr ExecTask::set_host(s4u::Host* host) */ ExecTaskPtr ExecTask::set_flops(double flops) { - kernel::actor::simcall_answered([this, flops] { amount_ = flops; }); + kernel::actor::simcall_answered([this, flops] { set_amount(flops); }); return this; } @@ -269,7 +239,7 @@ CommTaskPtr CommTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart constructor. */ -CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination) +CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination) { return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } @@ -281,25 +251,18 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou */ void CommTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - if (tokens_received_.size() > 0) - tokens_received_.pop_front(); - s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); - comm->set_name(name_); - comm->set_payload_size(amount_); + Task::fire(); + auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); comm->start(); - comm->on_this_completion_cb([this](Comm const& comm) { this->complete(); }); - current_activity_ = comm; + comm->on_this_completion_cb([this](Comm const&) { this->complete(); }); + set_current_activity(comm); } /** @ingroup plugin_task * @param source The host to set. * @brief Set a new source host. */ -CommTaskPtr CommTask::set_source(s4u::Host* source) +CommTaskPtr CommTask::set_source(Host* source) { kernel::actor::simcall_answered([this, source] { source_ = source; }); return this; @@ -309,7 +272,7 @@ CommTaskPtr CommTask::set_source(s4u::Host* source) * @param destination The host to set. * @brief Set a new destination host. */ -CommTaskPtr CommTask::set_destination(s4u::Host* destination) +CommTaskPtr CommTask::set_destination(Host* destination) { kernel::actor::simcall_answered([this, destination] { destination_ = destination; }); return this; @@ -320,7 +283,7 @@ CommTaskPtr CommTask::set_destination(s4u::Host* destination) */ CommTaskPtr CommTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } @@ -340,7 +303,7 @@ IoTaskPtr IoTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. */ -IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type) +IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type) { return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type); } @@ -349,7 +312,7 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s * @param disk The disk to set. * @brief Set a new disk. */ -IoTaskPtr IoTask::set_disk(s4u::Disk* disk) +IoTaskPtr IoTask::set_disk(Disk* disk) { kernel::actor::simcall_answered([this, disk] { disk_ = disk; }); return this; @@ -360,12 +323,12 @@ IoTaskPtr IoTask::set_disk(s4u::Disk* disk) */ IoTaskPtr IoTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } /** @ingroup plugin_task */ -IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) +IoTaskPtr IoTask::set_op_type(Io::OpType type) { kernel::actor::simcall_answered([this, type] { type_ = type; }); return this; @@ -373,20 +336,11 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) void IoTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - if (tokens_received_.size() > 0) - tokens_received_.pop_front(); - s4u::IoPtr io = s4u::Io::init(); - io->set_name(name_); - io->set_size(amount_); - io->set_disk(disk_); - io->set_op_type(type_); + Task::fire(); + auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); io->start(); - io->on_this_completion_cb([this](Io const& io) { this->complete(); }); - current_activity_ = io; + io->on_this_completion_cb([this](Io const&) { this->complete(); }); + set_current_activity(io); } } // namespace simgrid::s4u