Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of https://framagit.org/simgrid/simgrid
authormlaurent <mathieu.laurent@ens-rennes.fr>
Tue, 27 Jun 2023 09:20:07 +0000 (11:20 +0200)
committermlaurent <mathieu.laurent@ens-rennes.fr>
Tue, 27 Jun 2023 09:20:07 +0000 (11:20 +0200)
12 files changed:
1  2 
examples/cpp/task-io/s4u-task-io.cpp
examples/cpp/task-simple/s4u-task-simple.cpp
examples/cpp/task-storm/s4u-task-storm.cpp
examples/cpp/task-switch-host/s4u-task-switch-host.cpp
examples/cpp/task-variable-load/s4u-task-variable-load.cpp
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Link.hpp
include/simgrid/s4u/Task.hpp
src/bindings/python/simgrid_python.cpp
src/mc/api/strategy/MaxMatchComm.hpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Task.cpp

@@@ -3,7 -3,7 +3,7 @@@
  /* This program is free software; you can redistribute it and/or modify it
   * under the terms of the license (GNU LGPL) which comes with this package. */
  
- /* This example demonstrate basic use of the task plugin.
+ /* This example demonstrate basic use of tasks.
   *
   * We model the following graph:
   *
   * comm is a communication task.
   */
  
- #include "simgrid/plugins/task.hpp"
 -#include "simgrid/s4u/Task.hpp"
  #include "simgrid/s4u.hpp"
++#include "simgrid/s4u/Task.hpp"
  #include <simgrid/plugins/file_system.h>
  
  XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
+ namespace sg4 = simgrid::s4u;
  
  int main(int argc, char* argv[])
  {
-   simgrid::s4u::Engine e(&argc, argv);
+   sg4::Engine e(&argc, argv);
    e.load_platform(argv[1]);
-   simgrid::plugins::Task::init();
  
    // Retrieve hosts
    auto* bob  = e.host_by_name("bob");
    auto* carl = e.host_by_name("carl");
  
    // Create tasks
-   auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, bob);
-   auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, carl);
-   auto write = simgrid::plugins::IoTask::init("write", 1e7, bob->get_disks().front(), simgrid::s4u::Io::OpType::WRITE);
-   auto read  = simgrid::plugins::IoTask::init("read", 1e7, carl->get_disks().front(), simgrid::s4u::Io::OpType::READ);
+   auto exec1 = sg4::ExecTask::init("exec1", 1e9, bob);
+   auto exec2 = sg4::ExecTask::init("exec2", 1e9, carl);
+   auto write = sg4::IoTask::init("write", 1e7, bob->get_disks().front(), sg4::Io::OpType::WRITE);
+   auto read  = sg4::IoTask::init("read", 1e7, carl->get_disks().front(), sg4::Io::OpType::READ);
  
    // Create the graph by defining dependencies between tasks
    exec1->add_successor(write);
    read->add_successor(exec2);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
-   // Enqueue two executions for task exec1
-   exec1->enqueue_execs(2);
+   // Enqueue two firings for task exec1
+   exec1->enqueue_firings(2);
  
    // Start the simulation
    e.run();
@@@ -3,7 -3,7 +3,7 @@@
  /* This program is free software; you can redistribute it and/or modify it
   * under the terms of the license (GNU LGPL) which comes with this package. */
  
- /* This example demonstrate basic use of the task plugin.
+ /* This example demonstrate basic use of tasks.
   *
   * We model the following graph:
   *
   * comm is a communication task.
   */
  
- #include "simgrid/plugins/task.hpp"
  #include "simgrid/s4u.hpp"
  
  XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
  
+ namespace sg4 = simgrid::s4u;
  int main(int argc, char* argv[])
  {
-   simgrid::s4u::Engine e(&argc, argv);
+   sg4::Engine e(&argc, argv);
    e.load_platform(argv[1]);
-   simgrid::plugins::Task::init();
  
    // Retrieve hosts
    auto* tremblay = e.host_by_name("Tremblay");
    auto* jupiter  = e.host_by_name("Jupiter");
  
    // Create tasks
-   auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, tremblay);
-   auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, jupiter);
-   auto comm  = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
+   auto exec1 = sg4::ExecTask::init("exec1", 1e9, tremblay);
+   auto exec2 = sg4::ExecTask::init("exec2", 1e9, jupiter);
+   auto comm  = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
  
    // Create the graph by defining dependencies between tasks
    exec1->add_successor(comm);
    comm->add_successor(exec2);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
-   // Enqueue two executions for task exec1
-   exec1->enqueue_execs(2);
+   // Enqueue two firings for task exec1
+   exec1->enqueue_firings(2);
  
    // Start the simulation
    e.run();
index 0000000,2c4edb1..d290ca0
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,130 +1,123 @@@
 -/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
 -   and use them to build a simulation of a stream processing application
+ /* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+ /* This program is free software; you can redistribute it and/or modify it
+  * under the terms of the license (GNU LGPL) which comes with this package. */
 -   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
 -   Bolt B3 processes data from Spout SB.
 -   Bolt B4 processes data from Bolt B3.
++/* This example takes the main concepts of Apache Storm presented here
++   https://storm.apache.org/releases/2.4.0/Concepts.html and use them to build a simulation of a stream processing
++   application
+    Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
+    Spout SB produces 1e6 bytes every 200ms.
 -  auto fafard  = e.host_by_name("Fafard");
++   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per
++   bytes Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3.
+                         Fafard
+                         ┌────┐
+                     ┌──►│ B1 │
+          Tremblay   │   └────┘
+           ┌────┐    │
+           │ SA ├────┤  Ginette
+           └────┘    │   ┌────┐
+                     └──►│ B2 │
+                         └────┘
+                        Bourassa
+          Jupiter     ┌──────────┐
+           ┌────┐     │          │
+           │ SB ├─────┤ B3 ──► B4│
+           └────┘     │          │
+                      └──────────┘
+  */
+ #include "simgrid/s4u.hpp"
+ XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+ namespace sg4 = simgrid::s4u;
+ int main(int argc, char* argv[])
+ {
+   sg4::Engine e(&argc, argv);
+   e.load_platform(argv[1]);
+   // Retrieve hosts
+   auto tremblay = e.host_by_name("Tremblay");
+   auto jupiter  = e.host_by_name("Jupiter");
 -  SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) {
++  auto fafard   = e.host_by_name("Fafard");
+   auto ginette  = e.host_by_name("Ginette");
+   auto bourassa = e.host_by_name("Bourassa");
+   // Create execution tasks
+   auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+   auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+   auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+   auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+   auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+   auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
+   // Create communication tasks
+   auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+   auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+   auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+   // Create the graph by defining dependencies between tasks
+   // Some dependencies are defined dynamically
+   SA_to_B1->add_successor(B1);
+   SA_to_B2->add_successor(B2);
+   SB->add_successor(SB_to_B3);
+   SB_to_B3->add_successor(B3);
+   B3->add_successor(B4);
+   /* Dynamic modification of the graph and bytes sent
+      Alternatively we: remove/add the link between SA and SA_to_B2
+                        add/remove the link between SA and SA_to_B1
+   */
 -    }
 -    else {
++  SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+     int count = t->get_count();
+     sg4::CommTaskPtr comm;
+     if (count % 2 == 0) {
+       t->remove_successor(SA_to_B2);
+       t->add_successor(SA_to_B1);
+       comm = SA_to_B1;
 -    std::vector<double> amount = {1e3,1e6,1e9};
++    } else {
+       t->remove_successor(SA_to_B1);
+       t->add_successor(SA_to_B2);
+       comm = SA_to_B2;
+     }
 -  SA_to_B1->on_this_start_cb([SA](sg4::Task* t) {
 -    t->set_token(t->get_next_token_from(SA));
 -  });
 -  SA_to_B2->on_this_start_cb([SA](sg4::Task* t) {
 -    t->set_token(t->get_next_token_from(SA));
 -  });
++    std::vector<double> amount = {1e3, 1e6, 1e9};
+     comm->set_amount(amount[count % 3]);
+     auto token = std::make_shared<sg4::Token>();
+     token->set_data(new double(amount[count % 3]));
+     t->set_token(token);
+   });
+   // The token sent by SA is forwarded by both communication tasks
 -  sg4::Task::on_completion_cb([]
 -  (const sg4::Task* t) {
 -    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
 -  });
++  SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
++  SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+   /* B1 and B2 read the value of the token received by their predecessors
+      and use it to adapt their amount of work to do.
+   */
+   B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
+     auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+     t->set_amount(*data * 10);
+   });
+   B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
+     auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+     t->set_amount(*data * 10);
+   });
+   // Enqueue firings for tasks without predecessors
+   SA->enqueue_firings(5);
+   SB->enqueue_firings(5);
+   // Add a function to be called when tasks end for log purpose
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
+   // Start the simulation
+   e.run();
+   return 0;
+ }
   * With exec1 and exec2 on different hosts.
   */
  
- #include "simgrid/plugins/task.hpp"
  #include "simgrid/s4u.hpp"
  
  XBT_LOG_NEW_DEFAULT_CATEGORY(task_switch_host, "Messages specific for this task example");
+ namespace sg4 = simgrid::s4u;
  
  int main(int argc, char* argv[])
  {
-   simgrid::s4u::Engine e(&argc, argv);
+   sg4::Engine e(&argc, argv);
    e.load_platform(argv[1]);
-   simgrid::plugins::Task::init();
  
    // Retrieve hosts
    auto* tremblay = e.host_by_name("Tremblay");
    auto* fafard   = e.host_by_name("Fafard");
  
    // Create tasks
-   auto comm0 = simgrid::plugins::CommTask::init("comm0");
+   auto comm0 = sg4::CommTask::init("comm0");
    comm0->set_bytes(1e7);
    comm0->set_source(tremblay);
-   auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, jupiter);
-   auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, fafard);
-   auto comm1 = simgrid::plugins::CommTask::init("comm1", 1e7, jupiter, tremblay);
-   auto comm2 = simgrid::plugins::CommTask::init("comm2", 1e7, fafard, tremblay);
+   auto exec1 = sg4::ExecTask::init("exec1", 1e9, jupiter);
+   auto exec2 = sg4::ExecTask::init("exec2", 1e9, fafard);
+   auto comm1 = sg4::CommTask::init("comm1", 1e7, jupiter, tremblay);
+   auto comm2 = sg4::CommTask::init("comm2", 1e7, fafard, tremblay);
  
    // Create the initial graph by defining dependencies between tasks
    comm0->add_successor(exec2);
    exec2->add_successor(comm2);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
-   // Add a function to be called before each executions of comm0
+   // Add a function to be called before each firing of comm0
    // This function modifies the graph of tasks by adding or removing
    // successors to comm0
-   comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
-     auto* comm0      = dynamic_cast<simgrid::plugins::CommTask*>(t);
+   comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) {
      static int count = 0;
      if (count % 2 == 0) {
        comm0->set_destination(jupiter);
@@@ -69,8 -67,8 +66,8 @@@
      count++;
    });
  
-   // Enqueue four executions for task comm0
-   comm0->enqueue_execs(4);
+   // Enqueue four firings for task comm0
+   comm0->enqueue_firings(4);
  
    // Start the simulation
    e.run();
   * With a heavy load there is a burst of comm before the exec task can even finish once.
   */
  
- #include "simgrid/plugins/task.hpp"
  #include "simgrid/s4u.hpp"
  
  XBT_LOG_NEW_DEFAULT_CATEGORY(task_variable_load, "Messages specific for this s4u example");
+ namespace sg4 = simgrid::s4u;
  
- static void variable_load(simgrid::plugins::TaskPtr t)
+ static void variable_load(sg4::TaskPtr t)
  {
    XBT_INFO("--- Small load ---");
    for (int i = 0; i < 3; i++) {
-     t->enqueue_execs(1);
-     simgrid::s4u::this_actor::sleep_for(100);
+     t->enqueue_firings(1);
+     sg4::this_actor::sleep_for(100);
    }
-   simgrid::s4u::this_actor::sleep_until(1000);
+   sg4::this_actor::sleep_until(1000);
    XBT_INFO("--- Heavy load ---");
    for (int i = 0; i < 3; i++) {
-     t->enqueue_execs(1);
-     simgrid::s4u::this_actor::sleep_for(1);
+     t->enqueue_firings(1);
+     sg4::this_actor::sleep_for(1);
    }
  }
  
  int main(int argc, char* argv[])
  {
-   simgrid::s4u::Engine e(&argc, argv);
+   sg4::Engine e(&argc, argv);
    e.load_platform(argv[1]);
-   simgrid::plugins::Task::init();
  
    // Retreive hosts
    auto* tremblay = e.host_by_name("Tremblay");
    auto* jupiter  = e.host_by_name("Jupiter");
  
    // Create tasks
-   auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
-   auto exec = simgrid::plugins::ExecTask::init("exec", 1e9, jupiter);
+   auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
+   auto exec = sg4::ExecTask::init("exec", 1e9, jupiter);
  
    // Create the graph by defining dependencies between tasks
    comm->add_successor(exec);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
    // Create the actor that will inject load during the simulation
-   simgrid::s4u::Actor::create("input", tremblay, variable_load, comm);
+   sg4::Actor::create("input", tremblay, variable_load, comm);
  
    // Start the simulation
    e.run();
@@@ -114,10 -114,6 +114,6 @@@ protected
    virtual void fire_on_this_veto() const = 0;
  
  public:
-   XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") void vetoable_start()
-   {
-     start();
-   }
    void start()
    {
      state_ = State::STARTING;
@@@ -267,10 -263,10 +263,16 @@@ public
     *  dependency or no resource assigned) */
    void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
  
-   XBT_ATTRIB_DEPRECATED_v337("Please use on_suspend_cb() instead") static void on_suspended_cb(
-       const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
-   XBT_ATTRIB_DEPRECATED_v337("Please use on_resume_cb() instead") static void on_resumed_cb(
-       const std::function<void(Activity const&)>& cb) { on_resume.connect(cb);  }
+   XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
 -      const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
++      const std::function<void(Activity const&)>& cb)
++  {
++    on_suspend.connect(cb);
++  }
+   XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
 -      const std::function<void(Activity const&)>& cb) { on_resume.connect(cb);  }
++      const std::function<void(Activity const&)>& cb)
++  {
++    on_resume.connect(cb);
++  }
  
    AnyActivity* add_successor(ActivityPtr a)
    {
    }
    const std::string& get_tracing_category() const { return tracing_category_; }
  
-   XBT_ATTRIB_DEPRECATED_v334("Please use Activity::set_data()") AnyActivity* set_user_data(void* data)
-   {
-     set_data(data);
-     return static_cast<AnyActivity*>(this);
-   }
-   XBT_ATTRIB_DEPRECATED_v334("Please use Activity::get_data<>()") void* get_user_data() const
-   {
-     return get_data<void>();
-   }
-   XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") AnyActivity* vetoable_start()
-   {
-     return start();
-   }
    AnyActivity* start()
    {
      Activity::start();
@@@ -133,10 -133,10 +133,7 @@@ public
    double get_load() const;
  
  #ifndef DOXYGEN
-   XBT_ATTRIB_DEPRECATED_v337("Please use get_load() instead") double get_usage() const
 -  XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const
--  {
--    return get_load();
--  }
++  XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const { return get_load(); }
  #endif
  
    /** @brief Check if the Link is used (at least one flow uses the link) */
@@@ -202,7 -202,7 +199,7 @@@ public
      on_this_destruction.connect(cb);
    }
  
-   XBT_ATTRIB_DEPRECATED_v337("Please use on_onoff_cb() instead") static void on_state_change_cb(
+   XBT_ATTRIB_DEPRECATED_v338("Please use on_onoff_cb() instead") static void on_state_change_cb(
        const std::function<void(Link const&)>& cb)
    {
      on_onoff.connect(cb);
@@@ -1,16 -1,17 +1,17 @@@
- #ifndef SIMGRID_PLUGINS_TASK_H_
- #define SIMGRID_PLUGINS_TASK_H_
+ #ifndef SIMGRID_S4U_TASK_H_
+ #define SIMGRID_S4U_TASK_H_
  
  #include <simgrid/s4u/Activity.hpp>
  #include <simgrid/s4u/Io.hpp>
  #include <xbt/Extendable.hpp>
  
  #include <atomic>
+ #include <deque>
  #include <map>
  #include <memory>
  #include <set>
  
- namespace simgrid::plugins {
+ namespace simgrid::s4u {
  
  class Task;
  using TaskPtr = boost::intrusive_ptr<Task>;
@@@ -23,59 -24,68 +24,68 @@@ using CommTaskPtr = boost::intrusive_pt
  class IoTask;
  using IoTaskPtr = boost::intrusive_ptr<IoTask>;
  
- struct ExtendedAttributeActivity {
-   static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
-   Task* task_;
- };
+ class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
  
  class Task {
 -  int count_        = 0;
 -  bool working_     = false;
+   std::string name_;
+   double amount_;
+   int queued_firings_ = 0;
++  int count_          = 0;
++  bool working_       = false;
    std::set<Task*> successors_                 = {};
    std::map<Task*, unsigned int> predecessors_ = {};
+   std::atomic_int_fast32_t refcount_{0};
  
    bool ready_to_run() const;
    void receive(Task* source);
-   void complete();
+   std::shared_ptr<Token> token_ = nullptr;
+   std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+   ActivityPtr previous_activity_;
+   ActivityPtr current_activity_;
  
  protected:
-   std::string name_;
-   double amount_;
-   int queued_execs_ = 0;
-   int count_        = 0;
-   bool working_     = false;
-   s4u::ActivityPtr previous_activity_;
-   s4u::ActivityPtr current_activity_;
-   xbt::signal<void(Task*)> on_this_start_;
-   xbt::signal<void(Task*)> on_this_end_;
    explicit Task(const std::string& name);
--  virtual ~Task()     = default;
-   virtual void fire() = 0;
++  virtual ~Task() = default;
  
-   static xbt::signal<void(Task*)> on_start;
-   static xbt::signal<void(Task*)> on_end;
-   std::atomic_int_fast32_t refcount_{0};
+   virtual void fire();
+   void complete();
 -  void set_current_activity (ActivityPtr a) { current_activity_ = a; }
++  void set_current_activity(ActivityPtr a) { current_activity_ = a; }
+   inline static xbt::signal<void(Task*)> on_start;
+   xbt::signal<void(Task*)> on_this_start;
+   inline static xbt::signal<void(Task*)> on_completion;
+   xbt::signal<void(Task*)> on_this_completion;
  
  public:
-   static void init();
    const std::string& get_name() const { return name_; }
    const char* get_cname() const { return name_.c_str(); }
-   void enqueue_execs(int n);
    void set_amount(double amount);
    double get_amount() const { return amount_; }
+   int get_count() const { return count_; }
+   void set_token(std::shared_ptr<Token> token);
+   std::shared_ptr<Token> get_next_token_from(TaskPtr t);
    void add_successor(TaskPtr t);
    void remove_successor(TaskPtr t);
    void remove_all_successors();
    const std::set<Task*>& get_successors() const { return successors_; }
-   void on_this_start_cb(const std::function<void(Task*)>& func);
-   void on_this_end_cb(const std::function<void(Task*)>& func);
-   int get_count() const;
  
-   /** Add a callback fired before a task activity start.
+   void enqueue_firings(int n);
+   /** Add a callback fired before this task activity starts */
 -  void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
++  void on_this_start_cb(const std::function<void(Task*)>& func) { on_this_start.connect(func); }
+   /** Add a callback fired before a task activity starts.
     * Triggered after the on_this_start function**/
    static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
-   /** Add a callback fired after a task activity end.
-    * Triggered after the on_this_end function, but before
-    * sending tokens to successors.**/
-   static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+   /** Add a callback fired before this task activity ends */
+   void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
+   /** Add a callback fired after a task activity ends.
+    * Triggered after the on_this_end function, but before sending tokens to successors.**/
+   static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
  
  #ifndef DOXYGEN
    friend void intrusive_ptr_release(Task* o)
  #endif
  };
  
- class ExecTask : public Task {
-   s4u::Host* host_;
+ class CommTask : public Task {
+   Host* source_;
+   Host* destination_;
  
-   explicit ExecTask(const std::string& name);
+   explicit CommTask(const std::string& name);
    void fire() override;
  
  public:
-   static ExecTaskPtr init(const std::string& name);
-   static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host);
-   ExecTaskPtr set_host(s4u::Host* host);
-   s4u::Host* get_host() const { return host_; }
-   ExecTaskPtr set_flops(double flops);
-   double get_flops() const { return get_amount(); }
+   static CommTaskPtr init(const std::string& name);
+   static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination);
+   CommTaskPtr set_source(Host* source);
+   Host* get_source() const { return source_; }
+   CommTaskPtr set_destination(Host* destination);
+   Host* get_destination() const { return destination_; }
+   CommTaskPtr set_bytes(double bytes);
+   double get_bytes() const { return get_amount(); }
  };
  
- class CommTask : public Task {
-   s4u::Host* source_;
-   s4u::Host* destination_;
+ class ExecTask : public Task {
+   Host* host_;
  
-   explicit CommTask(const std::string& name);
+   explicit ExecTask(const std::string& name);
    void fire() override;
  
  public:
-   static CommTaskPtr init(const std::string& name);
-   static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination);
-   CommTaskPtr set_source(s4u::Host* source);
-   s4u::Host* get_source() const { return source_; }
-   CommTaskPtr set_destination(s4u::Host* destination);
-   s4u::Host* get_destination() const { return destination_; }
-   CommTaskPtr set_bytes(double bytes);
-   double get_bytes() const { return get_amount(); }
+   static ExecTaskPtr init(const std::string& name);
+   static ExecTaskPtr init(const std::string& name, double flops, Host* host);
+   ExecTaskPtr set_host(Host* host);
+   Host* get_host() const { return host_; }
+   ExecTaskPtr set_flops(double flops);
+   double get_flops() const { return get_amount(); }
  };
  
  class IoTask : public Task {
-   s4u::Disk* disk_;
-   s4u::Io::OpType type_;
+   Disk* disk_;
+   Io::OpType type_;
    explicit IoTask(const std::string& name);
    void fire() override;
  
  public:
    static IoTaskPtr init(const std::string& name);
-   static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type);
-   IoTaskPtr set_disk(s4u::Disk* disk);
-   s4u::Disk* get_disk() const { return disk_; }
+   static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type);
+   IoTaskPtr set_disk(Disk* disk);
+   Disk* get_disk() const { return disk_; }
    IoTaskPtr set_bytes(double bytes);
    double get_bytes() const { return get_amount(); }
-   IoTaskPtr set_op_type(s4u::Io::OpType type);
-   s4u::Io::OpType get_op_type() const { return type_; }
+   IoTaskPtr set_op_type(Io::OpType type);
+   Io::OpType get_op_type() const { return type_; }
  };
- } // namespace simgrid::plugins
+ } // namespace simgrid::s4u
  #endif
@@@ -11,7 -11,6 +11,6 @@@
  #include "simgrid/kernel/ProfileBuilder.hpp"
  #include "simgrid/kernel/routing/NetPoint.hpp"
  #include <simgrid/Exception.hpp>
- #include <simgrid/plugins/task.hpp>
  #include <simgrid/s4u/Actor.hpp>
  #include <simgrid/s4u/Barrier.hpp>
  #include <simgrid/s4u/Comm.hpp>
@@@ -25,6 -24,7 +24,7 @@@
  #include <simgrid/s4u/Mutex.hpp>
  #include <simgrid/s4u/NetZone.hpp>
  #include <simgrid/s4u/Semaphore.hpp>
+ #include <simgrid/s4u/Task.hpp>
  #include <simgrid/version.h>
  
  #include <algorithm>
  #include <vector>
  
  namespace py = pybind11;
- using simgrid::plugins::CommTask;
- using simgrid::plugins::CommTaskPtr;
- using simgrid::plugins::ExecTask;
- using simgrid::plugins::ExecTaskPtr;
- using simgrid::plugins::IoTask;
- using simgrid::plugins::IoTaskPtr;
- using simgrid::plugins::Task;
- using simgrid::plugins::TaskPtr;
 -using simgrid::s4u::CommTask;
 -using simgrid::s4u::CommTaskPtr;
 -using simgrid::s4u::ExecTask;
 -using simgrid::s4u::ExecTaskPtr;
 -using simgrid::s4u::IoTask;
 -using simgrid::s4u::IoTaskPtr;
 -using simgrid::s4u::Task;
 -using simgrid::s4u::TaskPtr;
  using simgrid::s4u::Actor;
  using simgrid::s4u::ActorPtr;
  using simgrid::s4u::Barrier;
  using simgrid::s4u::BarrierPtr;
  using simgrid::s4u::Comm;
  using simgrid::s4u::CommPtr;
++using simgrid::s4u::CommTask;
++using simgrid::s4u::CommTaskPtr;
  using simgrid::s4u::Disk;
  using simgrid::s4u::Engine;
++using simgrid::s4u::ExecTask;
++using simgrid::s4u::ExecTaskPtr;
  using simgrid::s4u::Host;
  using simgrid::s4u::Io;
++using simgrid::s4u::IoTask;
++using simgrid::s4u::IoTaskPtr;
  using simgrid::s4u::Link;
  using simgrid::s4u::Mailbox;
  using simgrid::s4u::Mutex;
  using simgrid::s4u::MutexPtr;
  using simgrid::s4u::Semaphore;
  using simgrid::s4u::SemaphorePtr;
++using simgrid::s4u::Task;
++using simgrid::s4u::TaskPtr;
  
  XBT_LOG_NEW_DEFAULT_CATEGORY(python, "python");
  
@@@ -171,54 -171,16 +171,16 @@@ PYBIND11_MODULE(simgrid, m
               return new simgrid::s4u::Engine(&argc, argv.data());
             }),
             "The constructor should take the parameters from the command line, as is ")
-       .def_static("get_clock",
-                   []() // XBT_ATTRIB_DEPRECATED_v334
-                   {
-                     PyErr_WarnEx(
-                         PyExc_DeprecationWarning,
-                         "get_clock() is deprecated and  will be dropped after v3.33, use `Engine.clock` instead.", 1);
-                     return Engine::get_clock();
-                   })
        .def_property_readonly_static(
            "clock", [](py::object /* self */) { return Engine::get_clock(); },
            "The simulation time, ie the amount of simulated seconds since the simulation start.")
        .def_property_readonly_static(
            "instance", [](py::object /* self */) { return Engine::get_instance(); }, "Retrieve the simulation engine")
-       .def("get_all_hosts",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(PyExc_DeprecationWarning,
-                           "get_all_hosts() is deprecated and  will be dropped after v3.33, use all_hosts instead.", 1);
-              return self.attr("all_hosts");
-            })
        .def("host_by_name", &Engine::host_by_name_or_null,
             "Retrieve a host by its name, or None if it does not exist in the platform.")
        .def_property_readonly("all_hosts", &Engine::get_all_hosts, "Returns the list of all hosts found in the platform")
-       .def("get_all_links",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(PyExc_DeprecationWarning,
-                           "get_all_links() is deprecated and  will be dropped after v3.33, use all_links instead.", 1);
-              return self.attr("all_links");
-            })
        .def_property_readonly("all_links", &Engine::get_all_links, "Returns the list of all links found in the platform")
-       .def("get_all_netpoints",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(
-                  PyExc_DeprecationWarning,
-                  "get_all_netpoints() is deprecated and  will be dropped after v3.33, use all_netpoints instead.", 1);
-              return self.attr("all_netpoints");
-            })
        .def_property_readonly("all_netpoints", &Engine::get_all_netpoints)
-       .def("get_netzone_root",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(
-                  PyExc_DeprecationWarning,
-                  "get_netzone_root() is deprecated and  will be dropped after v3.33, use netzone_root instead.", 1);
-              return self.attr("netzone_root");
-            })
        .def_property_readonly("netzone_root", &Engine::get_netzone_root,
                               "Retrieve the root netzone, containing all others.")
        .def("netpoint_by_name", &Engine::netpoint_by_name_or_null)
        .def("create_router", &simgrid::s4u::NetZone::create_router, "Create a router")
        .def("set_parent", &simgrid::s4u::NetZone::set_parent, "Set the parent of this zone")
        .def("set_property", &simgrid::s4u::NetZone::set_property, "Add a property to this zone")
-       .def("get_netpoint",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(PyExc_DeprecationWarning,
-                           "get_netpoint() is deprecated and  will be dropped after v3.33, use netpoint instead.", 1);
-              return self.attr("netpoint");
-            })
        .def_property_readonly("netpoint", &simgrid::s4u::NetZone::get_netpoint,
                               "Retrieve the netpoint associated to this zone")
        .def("seal", &simgrid::s4u::NetZone::seal, "Seal this NetZone")
            "   \"\"\"\n\n"
            "The second function parameter is the periodicity: the time to wait after the last event to start again over "
            "the list. Set it to -1 to not loop over.")
-       .def("get_pstate_count",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(
-                  PyExc_DeprecationWarning,
-                  "get_pstate_count() is deprecated and  will be dropped after v3.33, use pstate_count instead.", 1);
-              return self.attr("pstate_count");
-            })
        .def_property_readonly("pstate_count", &Host::get_pstate_count, "Retrieve the count of defined pstate levels")
-       .def("get_pstate_speed",
-            [](py::object self, int state) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(
-                  PyExc_DeprecationWarning,
-                  "get_pstate_speed() is deprecated and  will be dropped after v3.33, use pstate_speed instead.", 1);
-              return self.attr("pstate_speed")(state);
-            })
        .def("pstate_speed", &Host::get_pstate_speed, "Retrieve the maximal speed at the given pstate")
-       .def("get_netpoint",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(PyExc_DeprecationWarning,
-                           "get_netpoint() is deprecated and  will be dropped after v3.33, use netpoint instead.", 1);
-              return self.attr("netpoint");
-            })
        .def_property_readonly("netpoint", &Host::get_netpoint, "Retrieve the netpoint associated to this zone")
        .def_property_readonly("disks", &Host::get_disks, "The list of disks on this host (read-only).")
        .def("get_disks", &Host::get_disks, "Retrieve the list of disks in this host")
-       .def("set_core_count",
-            [](py::object self, double count) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(PyExc_DeprecationWarning,
-                           "set_core_count() is deprecated and  will be dropped after v3.33, use core_count instead.",
-                           1);
-              self.attr("core_count")(count);
-            })
        .def_property("core_count", &Host::get_core_count,
                      py::cpp_function(&Host::set_core_count, py::call_guard<py::gil_scoped_release>()),
                      "Manage the number of cores in the CPU")
    /* Class Split-Duplex Link */
    py::class_<simgrid::s4u::SplitDuplexLink, Link, std::unique_ptr<simgrid::s4u::SplitDuplexLink, py::nodelete>>(
        m, "SplitDuplexLink", "Network split-duplex link")
-       .def("get_link_up",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(PyExc_DeprecationWarning,
-                           "get_link_up() is deprecated and  will be dropped after v3.33, use link_up instead.", 1);
-              return self.attr("link_up");
-            })
        .def_property_readonly("link_up", &simgrid::s4u::SplitDuplexLink::get_link_up, "Get link direction up")
-       .def("get_link_down",
-            [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
-            {
-              PyErr_WarnEx(PyExc_DeprecationWarning,
-                           "get_link_down() is deprecated and  will be dropped after v3.33, use link_down instead.", 1);
-              return self.attr("link_down");
-            })
        .def_property_readonly("link_down", &simgrid::s4u::SplitDuplexLink::get_link_down, "Get link direction down");
  
    /* Class Mailbox */
  
    /* Class Task */
    py::class_<Task, TaskPtr>(m, "Task", "Task. See the C++ documentation for details.")
-       .def_static("init", &Task::init)
        .def_static(
            "on_start_cb",
            [](py::object cb) {
            },
            "Add a callback called when each task starts.")
        .def_static(
-           "on_end_cb",
+           "on_completion_cb",
            [](py::object cb) {
              cb.inc_ref(); // keep alive after return
              const py::gil_scoped_release gil_release;
-             Task::on_end_cb([cb_p = cb.ptr()](Task* op) {
+             Task::on_completion_cb([cb_p = cb.ptr()](Task* op) {
                const py::gil_scoped_acquire py_context; // need a new context for callback
                py::reinterpret_borrow<py::function>(cb_p)(op);
              });
        .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
        .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
        .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
-       .def("enqueue_execs", py::overload_cast<int>(&Task::enqueue_execs), py::call_guard<py::gil_scoped_release>(),
-            py::arg("n"), "Enqueue executions for this task.")
+       .def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
+            py::arg("n"), "Enqueue firings for this task.")
        .def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
             py::arg("op"), "Add a successor to this task.")
        .def("remove_successor", py::overload_cast<TaskPtr>(&Task::remove_successor),
             "Remove all successors of this task.")
        .def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
             py::arg("func"), "Add a callback called when this task starts.")
-       .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+       .def("on_this_completion_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_completion_cb),
             py::arg("func"), "Add a callback called when this task ends.")
        .def(
            "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
@@@ -13,7 -13,6 +13,6 @@@ namespace simgrid::mc 
  /** Wait MC guiding class that aims at minimizing the number of in-fly communication.
   *  When possible, it will try to match corresponding in-fly communications. */
  class MaxMatchComm : public Strategy {
    /** Stores for each mailbox what kind of transition is waiting on it.
     *  Negative number means that much recv are waiting on that mailbox, while
     *  a positiv number means that much send are waiting there. */
@@@ -56,10 -55,9 +55,9 @@@ public
          if (mailbox_.count(cast_recv->get_mailbox()) > 0 and
              mailbox_.at(cast_recv->get_mailbox()) > 0) { 
              aid_value--; // This means we have waiting recv corresponding to this recv
--        } else { 
--            aid_value++; 
--        }
++        } else {
++              aid_value++;
++          }
        }
     
        const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
@@@ -92,7 -90,6 +90,6 @@@
      if (cast_send != nullptr)
        last_mailbox_ = cast_send->get_mailbox();
    }
  };
  
  } // namespace simgrid::mc
diff --combined src/s4u/s4u_Comm.cpp
@@@ -29,7 -29,7 +29,7 @@@ CommPtr Comm::set_copy_data_callback(co
  }
  
  void Comm::copy_buffer_callback(kernel::activity::CommImpl* comm, void* buff,
-                                 size_t buff_size) // XBT_ATTRIB_DEPRECATED_v337
+                                 size_t buff_size) // XBT_ATTRIB_DEPRECATED_v338
  {
    XBT_DEBUG("Copy the data over");
    memcpy(comm->dst_buff_, buff, buff_size);
@@@ -41,7 -41,7 +41,7 @@@
  }
  
  void Comm::copy_pointer_callback(kernel::activity::CommImpl* comm, void* buff,
-                                  size_t buff_size) // XBT_ATTRIB_DEPRECATED_v337
+                                  size_t buff_size) // XBT_ATTRIB_DEPRECATED_v338
  {
    xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
    *(void**)(comm->dst_buff_) = buff;
@@@ -78,13 -78,12 +78,13 @@@ void Comm::send(kernel::actor::ActorImp
      simgrid::kernel::activity::ActivityImplPtr comm = nullptr;
  
      simgrid::kernel::actor::CommIsendSimcall send_observer{
 -        sender,  mbox->get_impl(), task_size, rate, static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
 -        nullptr, copy_data_fun,    data,      false};
 +        sender,        mbox->get_impl(), task_size, rate,          static_cast<unsigned char*>(src_buff),
 +        src_buff_size, match_fun,        nullptr,   copy_data_fun, data,
 +        false,         "Isend"};
      comm = simgrid::kernel::actor::simcall_answered(
          [&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer);
  
 -    if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout};
 +    if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout, "Wait"};
          simgrid::kernel::actor::simcall_blocking(
              [&wait_observer] {
                wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
@@@ -96,7 -95,7 +96,7 @@@
    } else {
      simgrid::kernel::actor::CommIsendSimcall observer(sender, mbox->get_impl(), task_size, rate,
                                                        static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
 -                                                      nullptr, copy_data_fun, data, false);
 +                                                      nullptr, copy_data_fun, data, false, "Isend");
      simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
        simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer);
        comm->wait_for(observer.get_issuer(), timeout);
@@@ -123,12 -122,11 +123,12 @@@ void Comm::recv(kernel::actor::ActorImp
                                                        match_fun,
                                                        copy_data_fun,
                                                        data,
 -                                                      rate};
 +                                                      rate,
 +                                                      "Irecv"};
      comm = simgrid::kernel::actor::simcall_answered(
          [&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer);
  
 -    if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout};
 +    if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout, "wait"};
          simgrid::kernel::actor::simcall_blocking(
              [&wait_observer] {
                wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
      comm = nullptr;
    } else {
      simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox->get_impl(), static_cast<unsigned char*>(dst_buff),
 -                                                      dst_buff_size, match_fun, copy_data_fun, data, rate);
 +                                                      dst_buff_size, match_fun, copy_data_fun, data, rate, "Irecv");
      simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
        simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer);
        comm->wait_for(observer.get_issuer(), timeout);
@@@ -333,8 -331,7 +333,8 @@@ Comm* Comm::do_start(
                                               clean_fun_,
                                               copy_data_function_,
                                               get_data<void>(),
 -                                             detached_};
 +                                             detached_,
 +                                             "Isend"};
      pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
                                               &observer);
    } else if (dst_buff_ != nullptr) { // Receiver side
                                               match_fun_,
                                               copy_data_function_,
                                               get_data<void>(),
 -                                             rate_};
 +                                             rate_,
 +                                             "Irecv"};
      pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
                                               &observer);
    } else {
@@@ -426,7 -422,7 +426,7 @@@ Comm* Comm::wait_for(double timeout
      case State::STARTED:
        try {
          issuer = kernel::actor::ActorImpl::self();
 -        kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
 +        kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"};
          if (kernel::actor::simcall_blocking(
                  [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
                  &observer)) {
diff --combined src/s4u/s4u_Task.cpp
@@@ -1,23 -1,21 +1,21 @@@
+ #include <memory>
  #include <simgrid/Exception.hpp>
- #include <simgrid/plugins/task.hpp>
 -#include <simgrid/s4u/Task.hpp>
  #include <simgrid/s4u/Comm.hpp>
  #include <simgrid/s4u/Exec.hpp>
  #include <simgrid/s4u/Io.hpp>
++#include <simgrid/s4u/Task.hpp>
  #include <simgrid/simix.hpp>
  
  #include "src/simgrid/module.hpp"
  
  SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
- /** @defgroup plugin_task plugin_task Plugin Task
+ /**
    @beginrst
  
- This is the task plugin, enabling management of Tasks.
- To activate this plugin, first call :cpp:func:`Task::init`.
  
  Tasks are designed to represent dataflows, i.e, graphs of Tasks.
  Tasks can only be instancied using either
- :cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
+ :cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
  An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
  A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
  
   */
  XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
  
- namespace simgrid::plugins {
- xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
- xbt::signal<void(Task*)> Task::on_start;
- xbt::signal<void(Task*)> Task::on_end;
+ namespace simgrid::s4u {
  
  Task::Task(const std::string& name) : name_(name) {}
  
@@@ -40,7 -33,7 +33,7 @@@
   */
  bool Task::ready_to_run() const
  {
-   return not working_ && queued_execs_ > 0;
+   return not working_ && queued_firings_ > 0;
  }
  
  /**
  void Task::receive(Task* source)
  {
    XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-   predecessors_[source]++;
-   bool enough_tokens = true;
+   auto source_count = predecessors_[source]++;
+   if (tokens_received_.size() <= queued_firings_ + source_count)
+     tokens_received_.push_back({});
+   tokens_received_[queued_firings_ + source_count][source] = source->token_;
 -  bool enough_tokens = true;
++  bool enough_tokens                                       = true;
    for (auto const& [key, val] : predecessors_)
      if (val < 1) {
        enough_tokens = false;
@@@ -62,7 -58,7 +58,7 @@@
    if (enough_tokens) {
      for (auto& [key, val] : predecessors_)
        val--;
-     enqueue_execs(1);
+     enqueue_firings(1);
    }
  }
  
   */
  void Task::complete()
  {
-   xbt_assert(s4u::Actor::is_maestro());
+   xbt_assert(Actor::is_maestro());
    working_ = false;
    count_++;
-   on_this_end_(this);
-   Task::on_end(this);
+   on_this_completion(this);
+   on_completion(this);
    if (current_activity_)
      previous_activity_ = std::move(current_activity_);
    for (auto const& t : successors_)
      fire();
  }
  
- /** @ingroup plugin_task
-  *  @brief Init the Task plugin.
-  *  @note Add a completion callback to all Activities to call Task::complete().
-  */
- void Task::init()
- {
-   static bool inited = false;
-   if (inited)
-     return;
-   inited                                  = true;
-   ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
-   simgrid::s4u::Exec::on_completion_cb(
-       [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
-   simgrid::s4u::Comm::on_completion_cb(
-       [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
-   simgrid::s4u::Io::on_completion_cb(
-       [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
- }
- /** @ingroup plugin_task
-  *  @param n The number of executions to enqueue.
-  *  @brief Enqueue executions.
-  *  @note Immediatly starts an execution if possible.
+ /** @param n The number of firings to enqueue.
+  *  @brief Enqueue firing.
+  *  @note Immediatly fire an activity if possible.
   */
- void Task::enqueue_execs(int n)
+ void Task::enqueue_firings(int n)
  {
    simgrid::kernel::actor::simcall_answered([this, n] {
-     queued_execs_ += n;
+     queued_firings_ += n;
      if (ready_to_run())
        fire();
    });
  }
  
- /** @ingroup plugin_task
-  *  @param amount The amount to set.
+ /** @param amount The amount to set.
   *  @brief Set the amout of work to do.
   *  @note Amount in flop for ExecTask and in bytes for CommTask.
   */
@@@ -134,8 -108,33 +108,34 @@@ void Task::set_amount(double amount
    simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
  }
  
- /** @ingroup plugin_task
-  *  @param successor The Task to add.
+ /** @param token The token to set.
+  *  @brief Set the token to send to successors.
+  *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+  */
+ void Task::set_token(std::shared_ptr<Token> token)
+ {
+   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+ }
+ /** @return Map of tokens received for the next execution.
+  *  @note If there is no queued execution for this task the map might not exist or be partially empty.
+  */
+ std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+ {
+   return tokens_received_.front()[t];
+ }
 -void Task::fire() {
++void Task::fire()
++{
+   on_this_start(this);
+   on_start(this);
 -  working_ = true;
++  working_        = true;
+   queued_firings_ = std::max(queued_firings_ - 1, 0);
+   if (tokens_received_.size() > 0)
+     tokens_received_.pop_front();
+ }
+ /** @param successor The Task to add.
   *  @brief Add a successor to this Task.
   *  @note It also adds this as a predecessor of successor.
   */
@@@ -147,8 -146,7 +147,7 @@@ void Task::add_successor(TaskPtr succes
    });
  }
  
- /** @ingroup plugin_task
-  *  @param successor The Task to remove.
+ /** @param successor The Task to remove.
   *  @brief Remove a successor from this Task.
   *  @note It also remove this from the predecessors of successor.
   */
@@@ -171,34 -169,6 +170,6 @@@ void Task::remove_all_successors(
    });
  }
  
- /** @ingroup plugin_task
-  *  @param func The function to set.
-  *  @brief Set a function to be called before each execution.
-  *  @note The function is called before the underlying Activity starts.
-  */
- void Task::on_this_start_cb(const std::function<void(Task*)>& func)
- {
-   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
- }
- /** @ingroup plugin_task
-  *  @param func The function to set.
-  *  @brief Set a function to be called after each execution.
-  *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
-  */
- void Task::on_this_end_cb(const std::function<void(Task*)>& func)
- {
-   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
- }
- /** @ingroup plugin_task
-  *  @brief Return the number of completed executions.
-  */
- int Task::get_count() const
- {
-   return count_;
- }
  /**
   *  @brief Default constructor.
   */
@@@ -215,7 -185,7 +186,7 @@@ ExecTaskPtr ExecTask::init(const std::s
  /** @ingroup plugin_task
   *  @brief Smart Constructor.
   */
- ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
+ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
  {
    return init(name)->set_flops(flops)->set_host(host);
  }
   */
  void ExecTask::fire()
  {
-   on_this_start_(this);
-   Task::on_start(this);
-   working_          = true;
-   queued_execs_     = std::max(queued_execs_ - 1, 0);
-   s4u::ExecPtr exec = s4u::Exec::init();
-   exec->set_name(name_);
-   exec->set_flops_amount(amount_);
-   exec->set_host(host_);
+   Task::fire();
+   auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
    exec->start();
-   exec->extension_set(new ExtendedAttributeActivity());
-   exec->extension<ExtendedAttributeActivity>()->task_ = this;
-   current_activity_                                   = exec;
+   exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
+   set_current_activity(exec);
  }
  
  /** @ingroup plugin_task
   *  @param host The host to set.
   *  @brief Set a new host.
   */
- ExecTaskPtr ExecTask::set_host(s4u::Host* host)
+ ExecTaskPtr ExecTask::set_host(Host* host)
  {
    kernel::actor::simcall_answered([this, host] { host_ = host; });
    return this;
   */
  ExecTaskPtr ExecTask::set_flops(double flops)
  {
-   kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
+   kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
    return this;
  }
  
@@@ -276,7 -239,7 +240,7 @@@ CommTaskPtr CommTask::init(const std::s
  /** @ingroup plugin_task
   *  @brief Smart constructor.
   */
- CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
+ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
  {
    return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
  }
   */
  void CommTask::fire()
  {
-   on_this_start_(this);
-   Task::on_start(this);
-   working_          = true;
-   queued_execs_     = std::max(queued_execs_ - 1, 0);
-   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
-   comm->set_name(name_);
-   comm->set_payload_size(amount_);
+   Task::fire();
+   auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
    comm->start();
-   comm->extension_set(new ExtendedAttributeActivity());
-   comm->extension<ExtendedAttributeActivity>()->task_ = this;
-   current_activity_                                   = comm;
+   comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
+   set_current_activity(comm);
  }
  
  /** @ingroup plugin_task
   *  @param source The host to set.
   *  @brief Set a new source host.
   */
- CommTaskPtr CommTask::set_source(s4u::Host* source)
+ CommTaskPtr CommTask::set_source(Host* source)
  {
    kernel::actor::simcall_answered([this, source] { source_ = source; });
    return this;
   *  @param destination The host to set.
   *  @brief Set a new destination host.
   */
- CommTaskPtr CommTask::set_destination(s4u::Host* destination)
+ CommTaskPtr CommTask::set_destination(Host* destination)
  {
    kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
    return this;
   */
  CommTaskPtr CommTask::set_bytes(double bytes)
  {
-   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
    return this;
  }
  
@@@ -346,7 -303,7 +304,7 @@@ IoTaskPtr IoTask::init(const std::strin
  /** @ingroup plugin_task
   *  @brief Smart Constructor.
   */
- IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
+ IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
  {
    return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
  }
   *  @param disk The disk to set.
   *  @brief Set a new disk.
   */
- IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
+ IoTaskPtr IoTask::set_disk(Disk* disk)
  {
    kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
    return this;
   */
  IoTaskPtr IoTask::set_bytes(double bytes)
  {
-   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
    return this;
  }
  
  /** @ingroup plugin_task */
- IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
+ IoTaskPtr IoTask::set_op_type(Io::OpType type)
  {
    kernel::actor::simcall_answered([this, type] { type_ = type; });
    return this;
  
  void IoTask::fire()
  {
-   on_this_start_(this);
-   Task::on_start(this);
-   working_      = true;
-   queued_execs_ = std::max(queued_execs_ - 1, 0);
-   s4u::IoPtr io = s4u::Io::init();
-   io->set_name(name_);
-   io->set_size(amount_);
-   io->set_disk(disk_);
-   io->set_op_type(type_);
+   Task::fire();
+   auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
    io->start();
-   io->extension_set(new ExtendedAttributeActivity());
-   io->extension<ExtendedAttributeActivity>()->task_ = this;
-   current_activity_                                 = io;
+   io->on_this_completion_cb([this](Io const&) { this->complete(); });
+   set_current_activity(io);
  }
  
- } // namespace simgrid::plugins
+ } // namespace simgrid::s4u