* comm is a communication task.
*/
- #include "simgrid/plugins/task.hpp"
-#include "simgrid/s4u/Task.hpp"
#include "simgrid/s4u.hpp"
++#include "simgrid/s4u/Task.hpp"
#include <simgrid/plugins/file_system.h>
XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
read->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
- // Enqueue two executions for task exec1
- exec1->enqueue_execs(2);
+ // Enqueue two firings for task exec1
+ exec1->enqueue_firings(2);
// Start the simulation
e.run();
comm->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
- // Enqueue two executions for task exec1
- exec1->enqueue_execs(2);
+ // Enqueue two firings for task exec1
+ exec1->enqueue_firings(2);
// Start the simulation
e.run();
--- /dev/null
-/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
- and use them to build a simulation of a stream processing application
+ /* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+ /* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
- Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
- Bolt B3 processes data from Spout SB.
- Bolt B4 processes data from Bolt B3.
++/* This example takes the main concepts of Apache Storm presented here
++ https://storm.apache.org/releases/2.4.0/Concepts.html and uses them to build a simulation of a stream processing
++ application
+
+ Spout SA produces data every 100ms. The volume produced cycles through 1e3, 1e6 and 1e9 bytes.
+ Spout SB produces 1e6 bytes every 200ms.
+
- auto fafard = e.host_by_name("Fafard");
++ Bolts B1 and B2 process data from Spout SA alternately. The quantity of work to process this data is 10 flops per
++ byte. Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3.
+
+ Fafard
+ ┌────┐
+ ┌──►│ B1 │
+ Tremblay │ └────┘
+ ┌────┐ │
+ │ SA ├────┤ Ginette
+ └────┘ │ ┌────┐
+ └──►│ B2 │
+ └────┘
+
+
+ Bourassa
+ Jupiter ┌──────────┐
+ ┌────┐ │ │
+ │ SB ├─────┤ B3 ──► B4│
+ └────┘ │ │
+ └──────────┘
+ */
+
+ #include "simgrid/s4u.hpp"
+
+ XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+ namespace sg4 = simgrid::s4u;
+
+ int main(int argc, char* argv[])
+ {
+ sg4::Engine e(&argc, argv);
+ e.load_platform(argv[1]);
+
+ // Retrieve hosts
+ auto tremblay = e.host_by_name("Tremblay");
+ auto jupiter = e.host_by_name("Jupiter");
- SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) {
++ auto fafard = e.host_by_name("Fafard");
+ auto ginette = e.host_by_name("Ginette");
+ auto bourassa = e.host_by_name("Bourassa");
+
+ // Create execution tasks
+ auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+ auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+ auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+ auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+ auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+ auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
+
+ // Create communication tasks
+ auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+ auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+ auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+
+ // Create the graph by defining dependencies between tasks
+ // Some dependencies are defined dynamically
+ SA_to_B1->add_successor(B1);
+ SA_to_B2->add_successor(B2);
+ SB->add_successor(SB_to_B3);
+ SB_to_B3->add_successor(B3);
+ B3->add_successor(B4);
+
+ /* Dynamic modification of the graph and bytes sent
+ Alternately, we: remove/add the link between SA and SA_to_B2
+ add/remove the link between SA and SA_to_B1
+ */
- }
- else {
++ SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+ int count = t->get_count();
+ sg4::CommTaskPtr comm;
+ if (count % 2 == 0) {
+ t->remove_successor(SA_to_B2);
+ t->add_successor(SA_to_B1);
+ comm = SA_to_B1;
- std::vector<double> amount = {1e3,1e6,1e9};
++ } else {
+ t->remove_successor(SA_to_B1);
+ t->add_successor(SA_to_B2);
+ comm = SA_to_B2;
+ }
- SA_to_B1->on_this_start_cb([SA](sg4::Task* t) {
- t->set_token(t->get_next_token_from(SA));
- });
- SA_to_B2->on_this_start_cb([SA](sg4::Task* t) {
- t->set_token(t->get_next_token_from(SA));
- });
++ std::vector<double> amount = {1e3, 1e6, 1e9};
+ comm->set_amount(amount[count % 3]);
+ auto token = std::make_shared<sg4::Token>();
+ token->set_data(new double(amount[count % 3]));
+ t->set_token(token);
+ });
+
+ // The token sent by SA is forwarded by both communication tasks
- sg4::Task::on_completion_cb([]
- (const sg4::Task* t) {
- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
- });
++ SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
++ SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+
+ /* B1 and B2 read the value of the token received by their predecessors
+ and use it to adapt their amount of work to do.
+ */
+ B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
+ auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+ t->set_amount(*data * 10);
+ });
+ B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
+ auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+ t->set_amount(*data * 10);
+ });
+
+ // Enqueue firings for tasks without predecessors
+ SA->enqueue_firings(5);
+ SB->enqueue_firings(5);
+
+ // Add a function to be called when tasks end for log purpose
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
+
+ // Start the simulation
+ e.run();
+ return 0;
+ }
exec2->add_successor(comm2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
- // Add a function to be called before each executions of comm0
+ // Add a function to be called before each firing of comm0
// This function modifies the graph of tasks by adding or removing
// successors to comm0
- comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
- auto* comm0 = dynamic_cast<simgrid::plugins::CommTask*>(t);
+ comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) {
static int count = 0;
if (count % 2 == 0) {
comm0->set_destination(jupiter);
comm->add_successor(exec);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
// Create the actor that will inject load during the simulation
- simgrid::s4u::Actor::create("input", tremblay, variable_load, comm);
+ sg4::Actor::create("input", tremblay, variable_load, comm);
// Start the simulation
e.run();
* dependency or no resource assigned) */
void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
- XBT_ATTRIB_DEPRECATED_v337("Please use on_suspend_cb() instead") static void on_suspended_cb(
- const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
- XBT_ATTRIB_DEPRECATED_v337("Please use on_resume_cb() instead") static void on_resumed_cb(
- const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
- const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
++ const std::function<void(Activity const&)>& cb)
++ {
++ on_suspend.connect(cb);
++ }
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
- const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
++ const std::function<void(Activity const&)>& cb)
++ {
++ on_resume.connect(cb);
++ }
AnyActivity* add_successor(ActivityPtr a)
{
double get_load() const;
#ifndef DOXYGEN
- XBT_ATTRIB_DEPRECATED_v337("Please use get_load() instead") double get_usage() const
- XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const
-- {
-- return get_load();
-- }
++ XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const { return get_load(); }
#endif
/** @brief Check if the Link is used (at least one flow uses the link) */
class IoTask;
using IoTaskPtr = boost::intrusive_ptr<IoTask>;
- struct ExtendedAttributeActivity {
- static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
- Task* task_;
- };
+ class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
class Task {
- int count_ = 0;
- bool working_ = false;
+ std::string name_;
+ double amount_;
+ int queued_firings_ = 0;
++ int count_ = 0;
++ bool working_ = false;
+
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
+ std::atomic_int_fast32_t refcount_{0};
bool ready_to_run() const;
void receive(Task* source);
- void complete();
+
+ std::shared_ptr<Token> token_ = nullptr;
+ std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+ ActivityPtr previous_activity_;
+ ActivityPtr current_activity_;
protected:
- std::string name_;
- double amount_;
- int queued_execs_ = 0;
- int count_ = 0;
- bool working_ = false;
- s4u::ActivityPtr previous_activity_;
- s4u::ActivityPtr current_activity_;
- xbt::signal<void(Task*)> on_this_start_;
- xbt::signal<void(Task*)> on_this_end_;
explicit Task(const std::string& name);
-- virtual ~Task() = default;
- virtual void fire() = 0;
++ virtual ~Task() = default;
- static xbt::signal<void(Task*)> on_start;
- static xbt::signal<void(Task*)> on_end;
- std::atomic_int_fast32_t refcount_{0};
+ virtual void fire();
+ void complete();
+
- void set_current_activity (ActivityPtr a) { current_activity_ = a; }
++ void set_current_activity(ActivityPtr a) { current_activity_ = a; }
+
+ inline static xbt::signal<void(Task*)> on_start;
+ xbt::signal<void(Task*)> on_this_start;
+ inline static xbt::signal<void(Task*)> on_completion;
+ xbt::signal<void(Task*)> on_this_completion;
public:
- static void init();
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
- void enqueue_execs(int n);
void set_amount(double amount);
double get_amount() const { return amount_; }
+ int get_count() const { return count_; }
+
+ void set_token(std::shared_ptr<Token> token);
+ std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
void remove_all_successors();
const std::set<Task*>& get_successors() const { return successors_; }
- void on_this_start_cb(const std::function<void(Task*)>& func);
- void on_this_end_cb(const std::function<void(Task*)>& func);
- int get_count() const;
- /** Add a callback fired before a task activity start.
+ void enqueue_firings(int n);
+
+ /** Add a callback fired before this task activity starts */
- void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
++ void on_this_start_cb(const std::function<void(Task*)>& func) { on_this_start.connect(func); }
+ /** Add a callback fired before a task activity starts.
* Triggered after the on_this_start function**/
static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
- /** Add a callback fired after a task activity end.
- * Triggered after the on_this_end function, but before
- * sending tokens to successors.**/
- static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+ /** Add a callback fired before this task activity ends */
+ void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
+ /** Add a callback fired after a task activity ends.
+ * Triggered after the on_this_completion function, but before sending tokens to successors.**/
+ static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
#ifndef DOXYGEN
friend void intrusive_ptr_release(Task* o)
#include <vector>
namespace py = pybind11;
- using simgrid::plugins::CommTask;
- using simgrid::plugins::CommTaskPtr;
- using simgrid::plugins::ExecTask;
- using simgrid::plugins::ExecTaskPtr;
- using simgrid::plugins::IoTask;
- using simgrid::plugins::IoTaskPtr;
- using simgrid::plugins::Task;
- using simgrid::plugins::TaskPtr;
-using simgrid::s4u::CommTask;
-using simgrid::s4u::CommTaskPtr;
-using simgrid::s4u::ExecTask;
-using simgrid::s4u::ExecTaskPtr;
-using simgrid::s4u::IoTask;
-using simgrid::s4u::IoTaskPtr;
-using simgrid::s4u::Task;
-using simgrid::s4u::TaskPtr;
using simgrid::s4u::Actor;
using simgrid::s4u::ActorPtr;
using simgrid::s4u::Barrier;
using simgrid::s4u::BarrierPtr;
using simgrid::s4u::Comm;
using simgrid::s4u::CommPtr;
++using simgrid::s4u::CommTask;
++using simgrid::s4u::CommTaskPtr;
using simgrid::s4u::Disk;
using simgrid::s4u::Engine;
++using simgrid::s4u::ExecTask;
++using simgrid::s4u::ExecTaskPtr;
using simgrid::s4u::Host;
using simgrid::s4u::Io;
++using simgrid::s4u::IoTask;
++using simgrid::s4u::IoTaskPtr;
using simgrid::s4u::Link;
using simgrid::s4u::Mailbox;
using simgrid::s4u::Mutex;
using simgrid::s4u::MutexPtr;
using simgrid::s4u::Semaphore;
using simgrid::s4u::SemaphorePtr;
++using simgrid::s4u::Task;
++using simgrid::s4u::TaskPtr;
XBT_LOG_NEW_DEFAULT_CATEGORY(python, "python");
if (mailbox_.count(cast_recv->get_mailbox()) > 0 and
mailbox_.at(cast_recv->get_mailbox()) > 0) {
aid_value--; // This means we have waiting recv corresponding to this recv
-- } else {
-- aid_value++;
-
-- }
++ } else {
++ aid_value++;
++ }
}
const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
+ #include <memory>
#include <simgrid/Exception.hpp>
- #include <simgrid/plugins/task.hpp>
-#include <simgrid/s4u/Task.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Exec.hpp>
#include <simgrid/s4u/Io.hpp>
++#include <simgrid/s4u/Task.hpp>
#include <simgrid/simix.hpp>
#include "src/simgrid/module.hpp"
void Task::receive(Task* source)
{
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
- predecessors_[source]++;
- bool enough_tokens = true;
+ auto source_count = predecessors_[source]++;
+ if (tokens_received_.size() <= queued_firings_ + source_count)
+ tokens_received_.push_back({});
+ tokens_received_[queued_firings_ + source_count][source] = source->token_;
- bool enough_tokens = true;
++ bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
enough_tokens = false;
simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
}
- /** @ingroup plugin_task
- * @param successor The Task to add.
+ /** @param token The token to set.
+ * @brief Set the token to send to successors.
+ * @note The token is passed to each successor after the task ends, i.e., after the on_completion callback.
+ */
+ void Task::set_token(std::shared_ptr<Token> token)
+ {
+ simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+ }
+
+ /** @return The token received from Task t for the next execution.
+ * @note If there is no queued execution for this task the map of received tokens might not exist or be partially empty.
+ */
+ std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+ {
+ return tokens_received_.front()[t];
+ }
+
-void Task::fire() {
++void Task::fire()
++{
+ on_this_start(this);
+ on_start(this);
- working_ = true;
++ working_ = true;
+ queued_firings_ = std::max(queued_firings_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
+ }
+
+ /** @param successor The Task to add.
* @brief Add a successor to this Task.
* @note It also adds this as a predecessor of successor.
*/