/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
- /* This example demonstrate basic use of the task plugin.
+ /* This example demonstrates basic use of tasks.
*
* We model the following graph:
*
* comm is a communication task.
*/
- #include "simgrid/plugins/task.hpp"
-#include "simgrid/s4u/Task.hpp"
#include "simgrid/s4u.hpp"
++#include "simgrid/s4u/Task.hpp"
#include <simgrid/plugins/file_system.h>
XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
+ namespace sg4 = simgrid::s4u;
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retrieve hosts
auto* bob = e.host_by_name("bob");
auto* carl = e.host_by_name("carl");
// Create tasks
- auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, bob);
- auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, carl);
- auto write = simgrid::plugins::IoTask::init("write", 1e7, bob->get_disks().front(), simgrid::s4u::Io::OpType::WRITE);
- auto read = simgrid::plugins::IoTask::init("read", 1e7, carl->get_disks().front(), simgrid::s4u::Io::OpType::READ);
+ auto exec1 = sg4::ExecTask::init("exec1", 1e9, bob);
+ auto exec2 = sg4::ExecTask::init("exec2", 1e9, carl);
+ auto write = sg4::IoTask::init("write", 1e7, bob->get_disks().front(), sg4::Io::OpType::WRITE);
+ auto read = sg4::IoTask::init("read", 1e7, carl->get_disks().front(), sg4::Io::OpType::READ);
// Create the graph by defining dependencies between tasks
exec1->add_successor(write);
read->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
- // Enqueue two executions for task exec1
- exec1->enqueue_execs(2);
+ // Enqueue two firings for task exec1
+ exec1->enqueue_firings(2);
// Start the simulation
e.run();
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
- /* This example demonstrate basic use of the task plugin.
+ /* This example demonstrates basic use of tasks.
*
* We model the following graph:
*
* comm is a communication task.
*/
- #include "simgrid/plugins/task.hpp"
#include "simgrid/s4u.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
+ namespace sg4 = simgrid::s4u;
+
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retrieve hosts
auto* tremblay = e.host_by_name("Tremblay");
auto* jupiter = e.host_by_name("Jupiter");
// Create tasks
- auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, tremblay);
- auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, jupiter);
- auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
+ auto exec1 = sg4::ExecTask::init("exec1", 1e9, tremblay);
+ auto exec2 = sg4::ExecTask::init("exec2", 1e9, jupiter);
+ auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
// Create the graph by defining dependencies between tasks
exec1->add_successor(comm);
comm->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
- // Enqueue two executions for task exec1
- exec1->enqueue_execs(2);
+ // Enqueue two firings for task exec1
+ exec1->enqueue_firings(2);
// Start the simulation
e.run();
--- /dev/null
-/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
- and use them to build a simulation of a stream processing application
+ /* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+ /* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
- Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
- Bolt B3 processes data from Spout SB.
- Bolt B4 processes data from Bolt B3.
++/* This example takes the main concepts of Apache Storm presented here
++ https://storm.apache.org/releases/2.4.0/Concepts.html and uses them to build a simulation of a stream processing
++ application.
+
+ Spout SA produces data every 100ms. The volume produced cycles through 1e3, 1e6 and 1e9 bytes.
+ Spout SB produces 1e6 bytes every 200ms.
+
- auto fafard = e.host_by_name("Fafard");
++ Bolt B1 and B2 process data from Spout SA alternately. The quantity of work to process this data is 10 flops per
++ byte. Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3.
+
+ Fafard
+ ┌────┐
+ ┌──►│ B1 │
+ Tremblay │ └────┘
+ ┌────┐ │
+ │ SA ├────┤ Ginette
+ └────┘ │ ┌────┐
+ └──►│ B2 │
+ └────┘
+
+
+ Bourassa
+ Jupiter ┌──────────┐
+ ┌────┐ │ │
+ │ SB ├─────┤ B3 ──► B4│
+ └────┘ │ │
+ └──────────┘
+ */
+
+ #include "simgrid/s4u.hpp"
+
+ XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+ namespace sg4 = simgrid::s4u;
+
/* Entry point of the task_storm example: loads the platform file passed as argv[1], creates the execution and
 * communication tasks on the hosts looked up by name, wires them into a graph, registers the callbacks below,
 * enqueues 5 firings on SA and on SB, then runs the simulation. */
+ int main(int argc, char* argv[])
+ {
+ sg4::Engine e(&argc, argv);
+ e.load_platform(argv[1]);
+
+ // Retrieve hosts
+ auto tremblay = e.host_by_name("Tremblay");
+ auto jupiter = e.host_by_name("Jupiter");
- SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) {
++ auto fafard = e.host_by_name("Fafard");
+ auto ginette = e.host_by_name("Ginette");
+ auto bourassa = e.host_by_name("Bourassa");
+
+ /* Create execution tasks. The amounts of SA and SB are 0.1 and 0.2 times the speed of their host
+ (presumably so that one firing lasts 100ms and 200ms respectively -- to be confirmed) */
+ auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+ auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+ auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+ auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+ auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+ auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
+
+ /* Create communication tasks. SA_to_B1 and SA_to_B2 are created with an amount of 0: their actual amount is
+ set by the start callback of SA below */
+ auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+ auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+ auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+
+ // Create the graph by defining dependencies between tasks
+ // Some dependencies (those between SA and its two communication tasks) are defined dynamically
+ SA_to_B1->add_successor(B1);
+ SA_to_B2->add_successor(B2);
+ SB->add_successor(SB_to_B3);
+ SB_to_B3->add_successor(B3);
+ B3->add_successor(B4);
+
+ /* Dynamic modification of the graph and bytes sent.
+ Each time SA starts, depending on the parity of its count, we alternately:
+ - even count: remove the link between SA and SA_to_B2, add the link between SA and SA_to_B1
+ - odd count: remove the link between SA and SA_to_B1, add the link between SA and SA_to_B2
+ The selected communication task gets amount[count % 3] as its amount, and the same value is stored
+ in the token set on SA.
+ */
- }
- else {
++ SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+ int count = t->get_count();
+ sg4::CommTaskPtr comm;
+ if (count % 2 == 0) {
+ t->remove_successor(SA_to_B2);
+ t->add_successor(SA_to_B1);
+ comm = SA_to_B1;
- std::vector<double> amount = {1e3,1e6,1e9};
++ } else {
+ t->remove_successor(SA_to_B1);
+ t->add_successor(SA_to_B2);
+ comm = SA_to_B2;
+ }
- SA_to_B1->on_this_start_cb([SA](sg4::Task* t) {
- t->set_token(t->get_next_token_from(SA));
- });
- SA_to_B2->on_this_start_cb([SA](sg4::Task* t) {
- t->set_token(t->get_next_token_from(SA));
- });
++ std::vector<double> amount = {1e3, 1e6, 1e9};
+ comm->set_amount(amount[count % 3]);
+ auto token = std::make_shared<sg4::Token>();
+ token->set_data(new double(amount[count % 3]));
+ t->set_token(token);
+ });
+
+ // The token sent by SA is forwarded by both communication tasks
++ SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
++ SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+
+ /* B1 and B2 read the value of the token received by their predecessors
+ and use it to adapt their amount of work to do: the amount is set to 10 times the value
+ carried by the token, replacing the amount given at creation.
+ */
+ B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
+ auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+ t->set_amount(*data * 10);
+ });
+ B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
+ auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+ t->set_amount(*data * 10);
+ });
+
+ // Enqueue firings for tasks without predecessors
+ SA->enqueue_firings(5);
+ SB->enqueue_firings(5);
+
+ // Add a function to be called when tasks end for log purpose
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
+
+ // Start the simulation
+ e.run();
+ return 0;
+ }
* With exec1 and exec2 on different hosts.
*/
- #include "simgrid/plugins/task.hpp"
#include "simgrid/s4u.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(task_switch_host, "Messages specific for this task example");
+ namespace sg4 = simgrid::s4u;
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retrieve hosts
auto* tremblay = e.host_by_name("Tremblay");
auto* fafard = e.host_by_name("Fafard");
// Create tasks
- auto comm0 = simgrid::plugins::CommTask::init("comm0");
+ auto comm0 = sg4::CommTask::init("comm0");
comm0->set_bytes(1e7);
comm0->set_source(tremblay);
- auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, jupiter);
- auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, fafard);
- auto comm1 = simgrid::plugins::CommTask::init("comm1", 1e7, jupiter, tremblay);
- auto comm2 = simgrid::plugins::CommTask::init("comm2", 1e7, fafard, tremblay);
+ auto exec1 = sg4::ExecTask::init("exec1", 1e9, jupiter);
+ auto exec2 = sg4::ExecTask::init("exec2", 1e9, fafard);
+ auto comm1 = sg4::CommTask::init("comm1", 1e7, jupiter, tremblay);
+ auto comm2 = sg4::CommTask::init("comm2", 1e7, fafard, tremblay);
// Create the initial graph by defining dependencies between tasks
comm0->add_successor(exec2);
exec2->add_successor(comm2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
- // Add a function to be called before each executions of comm0
+ // Add a function to be called before each firing of comm0
// This function modifies the graph of tasks by adding or removing
// successors to comm0
- comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
- auto* comm0 = dynamic_cast<simgrid::plugins::CommTask*>(t);
+ comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) {
static int count = 0;
if (count % 2 == 0) {
comm0->set_destination(jupiter);
count++;
});
- // Enqueue four executions for task comm0
- comm0->enqueue_execs(4);
+ // Enqueue four firings for task comm0
+ comm0->enqueue_firings(4);
// Start the simulation
e.run();
* With a heavy load there is a burst of comm before the exec task can even finish once.
*/
- #include "simgrid/plugins/task.hpp"
#include "simgrid/s4u.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(task_variable_load, "Messages specific for this s4u example");
+ namespace sg4 = simgrid::s4u;
/* Function run by the "input" actor to inject load into task t.
 * Small load: three firings enqueued one at a time, each followed by sleep_for(100).
 * Then, after sleep_until(1000), heavy load: three firings enqueued one at a time, each followed by sleep_for(1). */
- static void variable_load(simgrid::plugins::TaskPtr t)
+ static void variable_load(sg4::TaskPtr t)
{
XBT_INFO("--- Small load ---");
for (int i = 0; i < 3; i++) {
- t->enqueue_execs(1);
- simgrid::s4u::this_actor::sleep_for(100);
+ t->enqueue_firings(1);
+ sg4::this_actor::sleep_for(100);
}
- simgrid::s4u::this_actor::sleep_until(1000);
+ sg4::this_actor::sleep_until(1000);
XBT_INFO("--- Heavy load ---");
for (int i = 0; i < 3; i++) {
- t->enqueue_execs(1);
- simgrid::s4u::this_actor::sleep_for(1);
+ t->enqueue_firings(1);
+ sg4::this_actor::sleep_for(1);
}
}
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retrieve hosts
auto* tremblay = e.host_by_name("Tremblay");
auto* jupiter = e.host_by_name("Jupiter");
// Create tasks
- auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
- auto exec = simgrid::plugins::ExecTask::init("exec", 1e9, jupiter);
+ auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
+ auto exec = sg4::ExecTask::init("exec", 1e9, jupiter);
// Create the graph by defining dependencies between tasks
comm->add_successor(exec);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
- sg4::Task::on_completion_cb([](const sg4::Task* t) {
-- XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-- });
++ sg4::Task::on_completion_cb(
++ [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
// Create the actor that will inject load during the simulation
- simgrid::s4u::Actor::create("input", tremblay, variable_load, comm);
+ sg4::Actor::create("input", tremblay, variable_load, comm);
// Start the simulation
e.run();
virtual void fire_on_this_veto() const = 0;
public:
- XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") void vetoable_start()
- {
- start();
- }
void start()
{
state_ = State::STARTING;
* dependency or no resource assigned) */
void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
- XBT_ATTRIB_DEPRECATED_v337("Please use on_suspend_cb() instead") static void on_suspended_cb(
- const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
- XBT_ATTRIB_DEPRECATED_v337("Please use on_resume_cb() instead") static void on_resumed_cb(
- const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
/** Deprecated: connects cb to the on_suspend signal. The deprecation message names on_suspend_cb() as the
 * replacement. */
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
- const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
++ const std::function<void(Activity const&)>& cb)
++ {
++ on_suspend.connect(cb);
++ }
/** Deprecated: connects cb to the on_resume signal. The deprecation message names on_resume_cb() as the
 * replacement. */
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
- const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
++ const std::function<void(Activity const&)>& cb)
++ {
++ on_resume.connect(cb);
++ }
AnyActivity* add_successor(ActivityPtr a)
{
}
const std::string& get_tracing_category() const { return tracing_category_; }
- XBT_ATTRIB_DEPRECATED_v334("Please use Activity::set_data()") AnyActivity* set_user_data(void* data)
- {
- set_data(data);
- return static_cast<AnyActivity*>(this);
- }
-
- XBT_ATTRIB_DEPRECATED_v334("Please use Activity::get_data<>()") void* get_user_data() const
- {
- return get_data<void>();
- }
- XBT_ATTRIB_DEPRECATED_v334("All start() are vetoable now. Please use start() ") AnyActivity* vetoable_start()
- {
- return start();
- }
AnyActivity* start()
{
Activity::start();
double get_load() const;
#ifndef DOXYGEN
/** Deprecated: returns get_load() unchanged. The deprecation message names get_load() as the replacement. */
- XBT_ATTRIB_DEPRECATED_v337("Please use get_load() instead") double get_usage() const
- XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const
-- {
-- return get_load();
-- }
++ XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const { return get_load(); }
#endif
/** @brief Check if the Link is used (at least one flow uses the link) */
on_this_destruction.connect(cb);
}
- XBT_ATTRIB_DEPRECATED_v337("Please use on_onoff_cb() instead") static void on_state_change_cb(
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_onoff_cb() instead") static void on_state_change_cb(
const std::function<void(Link const&)>& cb)
{
on_onoff.connect(cb);
- #ifndef SIMGRID_PLUGINS_TASK_H_
- #define SIMGRID_PLUGINS_TASK_H_
+ #ifndef SIMGRID_S4U_TASK_H_
+ #define SIMGRID_S4U_TASK_H_
#include <simgrid/s4u/Activity.hpp>
#include <simgrid/s4u/Io.hpp>
#include <xbt/Extendable.hpp>
#include <atomic>
+ #include <deque>
#include <map>
#include <memory>
#include <set>
- namespace simgrid::plugins {
+ namespace simgrid::s4u {
class Task;
using TaskPtr = boost::intrusive_ptr<Task>;
class IoTask;
using IoTaskPtr = boost::intrusive_ptr<IoTask>;
- struct ExtendedAttributeActivity {
- static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
- Task* task_;
- };
/** Token type handled by Task::set_token() and Task::get_next_token_from(). It declares no member of its own:
 * everything it offers comes from xbt::Extendable<Token>. */
+ class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
/** A named node of a task graph. A Task stores an amount of work, a count (see get_count()), a number of queued
 * firings, its successors and predecessors, an optional Token plus the tokens received, and a reference counter
 * (TaskPtr is a boost::intrusive_ptr<Task>). Subclasses override fire(). */
class Task {
- int count_ = 0;
- bool working_ = false;
+ std::string name_;
+ double amount_;
+ int queued_firings_ = 0;
++ int count_ = 0;
++ bool working_ = false;
+
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
+ std::atomic_int_fast32_t refcount_{0};
bool ready_to_run() const;
void receive(Task* source);
- void complete();
+
+ std::shared_ptr<Token> token_ = nullptr;
+ std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+ ActivityPtr previous_activity_;
+ ActivityPtr current_activity_;
protected:
- std::string name_;
- double amount_;
- int queued_execs_ = 0;
- int count_ = 0;
- bool working_ = false;
- s4u::ActivityPtr previous_activity_;
- s4u::ActivityPtr current_activity_;
- xbt::signal<void(Task*)> on_this_start_;
- xbt::signal<void(Task*)> on_this_end_;
explicit Task(const std::string& name);
-- virtual ~Task() = default;
- virtual void fire() = 0;
++ virtual ~Task() = default;
- static xbt::signal<void(Task*)> on_start;
- static xbt::signal<void(Task*)> on_end;
- std::atomic_int_fast32_t refcount_{0};
+ virtual void fire();
+ void complete();
+
- void set_current_activity (ActivityPtr a) { current_activity_ = a; }
++ void set_current_activity(ActivityPtr a) { current_activity_ = a; }
+
+ inline static xbt::signal<void(Task*)> on_start;
+ xbt::signal<void(Task*)> on_this_start;
+ inline static xbt::signal<void(Task*)> on_completion;
+ xbt::signal<void(Task*)> on_this_completion;
public:
- static void init();
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
- void enqueue_execs(int n);
void set_amount(double amount);
double get_amount() const { return amount_; }
+ int get_count() const { return count_; }
+
+ void set_token(std::shared_ptr<Token> token);
+ std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
void remove_all_successors();
const std::set<Task*>& get_successors() const { return successors_; }
- void on_this_start_cb(const std::function<void(Task*)>& func);
- void on_this_end_cb(const std::function<void(Task*)>& func);
- int get_count() const;
- /** Add a callback fired before a task activity start.
+ void enqueue_firings(int n);
+
+ /** Add a callback fired before this task activity starts */
- void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
++ void on_this_start_cb(const std::function<void(Task*)>& func) { on_this_start.connect(func); }
+ /** Add a callback fired before a task activity starts.
* Triggered after the on_this_start function**/
static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
- /** Add a callback fired after a task activity end.
- * Triggered after the on_this_end function, but before
- * sending tokens to successors.**/
- static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+ /** Add a callback fired before this task activity ends (connected to the on_this_completion signal).
+ * NOTE(review): the signal name suggests it fires upon completion -- confirm the "before ... ends" wording */
+ void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
+ /** Add a callback fired after a task activity ends.
+ * Triggered after the on_this_completion function, but before sending tokens to successors.**/
+ static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
#ifndef DOXYGEN
friend void intrusive_ptr_release(Task* o)
#endif
};
/** Task subclass holding a source host and a destination host. Its amount is exposed as a number of bytes:
 * get_bytes() returns get_amount(). Instances are obtained through the static init() functions. */
- class ExecTask : public Task {
- s4u::Host* host_;
+ class CommTask : public Task {
+ Host* source_;
+ Host* destination_;
- explicit ExecTask(const std::string& name);
+ explicit CommTask(const std::string& name);
void fire() override;
public:
- static ExecTaskPtr init(const std::string& name);
- static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host);
- ExecTaskPtr set_host(s4u::Host* host);
- s4u::Host* get_host() const { return host_; }
- ExecTaskPtr set_flops(double flops);
- double get_flops() const { return get_amount(); }
+ static CommTaskPtr init(const std::string& name);
+ static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination);
+
+ CommTaskPtr set_source(Host* source);
+ Host* get_source() const { return source_; }
+ CommTaskPtr set_destination(Host* destination);
+ Host* get_destination() const { return destination_; }
+ CommTaskPtr set_bytes(double bytes);
+ double get_bytes() const { return get_amount(); }
};
/** Task subclass holding the host it is associated with. Its amount is exposed as a number of flops:
 * get_flops() returns get_amount(). Instances are obtained through the static init() functions. */
- class CommTask : public Task {
- s4u::Host* source_;
- s4u::Host* destination_;
+ class ExecTask : public Task {
+ Host* host_;
- explicit CommTask(const std::string& name);
+ explicit ExecTask(const std::string& name);
void fire() override;
public:
- static CommTaskPtr init(const std::string& name);
- static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination);
- CommTaskPtr set_source(s4u::Host* source);
- s4u::Host* get_source() const { return source_; }
- CommTaskPtr set_destination(s4u::Host* destination);
- s4u::Host* get_destination() const { return destination_; }
- CommTaskPtr set_bytes(double bytes);
- double get_bytes() const { return get_amount(); }
+ static ExecTaskPtr init(const std::string& name);
+ static ExecTaskPtr init(const std::string& name, double flops, Host* host);
+
+ ExecTaskPtr set_host(Host* host);
+ Host* get_host() const { return host_; }
+ ExecTaskPtr set_flops(double flops);
+ double get_flops() const { return get_amount(); }
};
/** Task subclass holding a disk and an I/O operation type (Io::OpType). Its amount is exposed as a number of
 * bytes: get_bytes() returns get_amount(). Instances are obtained through the static init() functions. */
class IoTask : public Task {
- s4u::Disk* disk_;
- s4u::Io::OpType type_;
+ Disk* disk_;
+ Io::OpType type_;
explicit IoTask(const std::string& name);
void fire() override;
public:
static IoTaskPtr init(const std::string& name);
- static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type);
- IoTaskPtr set_disk(s4u::Disk* disk);
- s4u::Disk* get_disk() const { return disk_; }
+ static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type);
+
+ IoTaskPtr set_disk(Disk* disk);
+ Disk* get_disk() const { return disk_; }
IoTaskPtr set_bytes(double bytes);
double get_bytes() const { return get_amount(); }
- IoTaskPtr set_op_type(s4u::Io::OpType type);
- s4u::Io::OpType get_op_type() const { return type_; }
+ IoTaskPtr set_op_type(Io::OpType type);
+ Io::OpType get_op_type() const { return type_; }
};
- } // namespace simgrid::plugins
+ } // namespace simgrid::s4u
#endif
#include "simgrid/kernel/ProfileBuilder.hpp"
#include "simgrid/kernel/routing/NetPoint.hpp"
#include <simgrid/Exception.hpp>
- #include <simgrid/plugins/task.hpp>
#include <simgrid/s4u/Actor.hpp>
#include <simgrid/s4u/Barrier.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Mutex.hpp>
#include <simgrid/s4u/NetZone.hpp>
#include <simgrid/s4u/Semaphore.hpp>
+ #include <simgrid/s4u/Task.hpp>
#include <simgrid/version.h>
#include <algorithm>
#include <vector>
namespace py = pybind11;
- using simgrid::plugins::CommTask;
- using simgrid::plugins::CommTaskPtr;
- using simgrid::plugins::ExecTask;
- using simgrid::plugins::ExecTaskPtr;
- using simgrid::plugins::IoTask;
- using simgrid::plugins::IoTaskPtr;
- using simgrid::plugins::Task;
- using simgrid::plugins::TaskPtr;
-using simgrid::s4u::CommTask;
-using simgrid::s4u::CommTaskPtr;
-using simgrid::s4u::ExecTask;
-using simgrid::s4u::ExecTaskPtr;
-using simgrid::s4u::IoTask;
-using simgrid::s4u::IoTaskPtr;
-using simgrid::s4u::Task;
-using simgrid::s4u::TaskPtr;
using simgrid::s4u::Actor;
using simgrid::s4u::ActorPtr;
using simgrid::s4u::Barrier;
using simgrid::s4u::BarrierPtr;
using simgrid::s4u::Comm;
using simgrid::s4u::CommPtr;
++using simgrid::s4u::CommTask;
++using simgrid::s4u::CommTaskPtr;
using simgrid::s4u::Disk;
using simgrid::s4u::Engine;
++using simgrid::s4u::ExecTask;
++using simgrid::s4u::ExecTaskPtr;
using simgrid::s4u::Host;
using simgrid::s4u::Io;
++using simgrid::s4u::IoTask;
++using simgrid::s4u::IoTaskPtr;
using simgrid::s4u::Link;
using simgrid::s4u::Mailbox;
using simgrid::s4u::Mutex;
using simgrid::s4u::MutexPtr;
using simgrid::s4u::Semaphore;
using simgrid::s4u::SemaphorePtr;
++using simgrid::s4u::Task;
++using simgrid::s4u::TaskPtr;
XBT_LOG_NEW_DEFAULT_CATEGORY(python, "python");
return new simgrid::s4u::Engine(&argc, argv.data());
}),
"The constructor should take the parameters from the command line, as is ")
- .def_static("get_clock",
- []() // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(
- PyExc_DeprecationWarning,
- "get_clock() is deprecated and will be dropped after v3.33, use `Engine.clock` instead.", 1);
- return Engine::get_clock();
- })
.def_property_readonly_static(
"clock", [](py::object /* self */) { return Engine::get_clock(); },
"The simulation time, ie the amount of simulated seconds since the simulation start.")
.def_property_readonly_static(
"instance", [](py::object /* self */) { return Engine::get_instance(); }, "Retrieve the simulation engine")
- .def("get_all_hosts",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(PyExc_DeprecationWarning,
- "get_all_hosts() is deprecated and will be dropped after v3.33, use all_hosts instead.", 1);
- return self.attr("all_hosts");
- })
.def("host_by_name", &Engine::host_by_name_or_null,
"Retrieve a host by its name, or None if it does not exist in the platform.")
.def_property_readonly("all_hosts", &Engine::get_all_hosts, "Returns the list of all hosts found in the platform")
- .def("get_all_links",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(PyExc_DeprecationWarning,
- "get_all_links() is deprecated and will be dropped after v3.33, use all_links instead.", 1);
- return self.attr("all_links");
- })
.def_property_readonly("all_links", &Engine::get_all_links, "Returns the list of all links found in the platform")
- .def("get_all_netpoints",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(
- PyExc_DeprecationWarning,
- "get_all_netpoints() is deprecated and will be dropped after v3.33, use all_netpoints instead.", 1);
- return self.attr("all_netpoints");
- })
.def_property_readonly("all_netpoints", &Engine::get_all_netpoints)
- .def("get_netzone_root",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(
- PyExc_DeprecationWarning,
- "get_netzone_root() is deprecated and will be dropped after v3.33, use netzone_root instead.", 1);
- return self.attr("netzone_root");
- })
.def_property_readonly("netzone_root", &Engine::get_netzone_root,
"Retrieve the root netzone, containing all others.")
.def("netpoint_by_name", &Engine::netpoint_by_name_or_null)
.def("create_router", &simgrid::s4u::NetZone::create_router, "Create a router")
.def("set_parent", &simgrid::s4u::NetZone::set_parent, "Set the parent of this zone")
.def("set_property", &simgrid::s4u::NetZone::set_property, "Add a property to this zone")
- .def("get_netpoint",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(PyExc_DeprecationWarning,
- "get_netpoint() is deprecated and will be dropped after v3.33, use netpoint instead.", 1);
- return self.attr("netpoint");
- })
.def_property_readonly("netpoint", &simgrid::s4u::NetZone::get_netpoint,
"Retrieve the netpoint associated to this zone")
.def("seal", &simgrid::s4u::NetZone::seal, "Seal this NetZone")
" \"\"\"\n\n"
"The second function parameter is the periodicity: the time to wait after the last event to start again over "
"the list. Set it to -1 to not loop over.")
- .def("get_pstate_count",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(
- PyExc_DeprecationWarning,
- "get_pstate_count() is deprecated and will be dropped after v3.33, use pstate_count instead.", 1);
- return self.attr("pstate_count");
- })
.def_property_readonly("pstate_count", &Host::get_pstate_count, "Retrieve the count of defined pstate levels")
- .def("get_pstate_speed",
- [](py::object self, int state) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(
- PyExc_DeprecationWarning,
- "get_pstate_speed() is deprecated and will be dropped after v3.33, use pstate_speed instead.", 1);
- return self.attr("pstate_speed")(state);
- })
.def("pstate_speed", &Host::get_pstate_speed, "Retrieve the maximal speed at the given pstate")
- .def("get_netpoint",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(PyExc_DeprecationWarning,
- "get_netpoint() is deprecated and will be dropped after v3.33, use netpoint instead.", 1);
- return self.attr("netpoint");
- })
.def_property_readonly("netpoint", &Host::get_netpoint, "Retrieve the netpoint associated to this zone")
.def_property_readonly("disks", &Host::get_disks, "The list of disks on this host (read-only).")
.def("get_disks", &Host::get_disks, "Retrieve the list of disks in this host")
- .def("set_core_count",
- [](py::object self, double count) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(PyExc_DeprecationWarning,
- "set_core_count() is deprecated and will be dropped after v3.33, use core_count instead.",
- 1);
- self.attr("core_count")(count);
- })
.def_property("core_count", &Host::get_core_count,
py::cpp_function(&Host::set_core_count, py::call_guard<py::gil_scoped_release>()),
"Manage the number of cores in the CPU")
/* Class Split-Duplex Link */
py::class_<simgrid::s4u::SplitDuplexLink, Link, std::unique_ptr<simgrid::s4u::SplitDuplexLink, py::nodelete>>(
m, "SplitDuplexLink", "Network split-duplex link")
- .def("get_link_up",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(PyExc_DeprecationWarning,
- "get_link_up() is deprecated and will be dropped after v3.33, use link_up instead.", 1);
- return self.attr("link_up");
- })
.def_property_readonly("link_up", &simgrid::s4u::SplitDuplexLink::get_link_up, "Get link direction up")
- .def("get_link_down",
- [](py::object self) // XBT_ATTRIB_DEPRECATED_v334
- {
- PyErr_WarnEx(PyExc_DeprecationWarning,
- "get_link_down() is deprecated and will be dropped after v3.33, use link_down instead.", 1);
- return self.attr("link_down");
- })
.def_property_readonly("link_down", &simgrid::s4u::SplitDuplexLink::get_link_down, "Get link direction down");
/* Class Mailbox */
/* Class Task */
py::class_<Task, TaskPtr>(m, "Task", "Task. See the C++ documentation for details.")
- .def_static("init", &Task::init)
.def_static(
"on_start_cb",
[](py::object cb) {
},
"Add a callback called when each task starts.")
.def_static(
- "on_end_cb",
+ "on_completion_cb",
[](py::object cb) {
cb.inc_ref(); // keep alive after return
const py::gil_scoped_release gil_release;
- Task::on_end_cb([cb_p = cb.ptr()](Task* op) {
+ Task::on_completion_cb([cb_p = cb.ptr()](Task* op) {
const py::gil_scoped_acquire py_context; // need a new context for callback
py::reinterpret_borrow<py::function>(cb_p)(op);
});
.def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
.def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
.def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
- .def("enqueue_execs", py::overload_cast<int>(&Task::enqueue_execs), py::call_guard<py::gil_scoped_release>(),
- py::arg("n"), "Enqueue executions for this task.")
+ .def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
+ py::arg("n"), "Enqueue firings for this task.")
.def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
py::arg("op"), "Add a successor to this task.")
.def("remove_successor", py::overload_cast<TaskPtr>(&Task::remove_successor),
"Remove all successors of this task.")
.def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
py::arg("func"), "Add a callback called when this task starts.")
- .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+ .def("on_this_completion_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_completion_cb),
py::arg("func"), "Add a callback called when this task ends.")
.def(
"__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
/** Wait MC guiding class that aims at minimizing the number of in-flight communications.
* When possible, it will try to match corresponding in-flight communications. */
class MaxMatchComm : public Strategy {
-
/** Stores for each mailbox what kind of transition is waiting on it.
* A negative number means that many recv are waiting on that mailbox, while
* a positive number means that many send are waiting there. */
if (mailbox_.count(cast_recv->get_mailbox()) > 0 and
mailbox_.at(cast_recv->get_mailbox()) > 0) {
aid_value--; // This means we have waiting recv corresponding to this recv
-- } else {
-- aid_value++;
-
-- }
++ } else {
++ aid_value++;
++ }
}
const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
if (cast_send != nullptr)
last_mailbox_ = cast_send->get_mailbox();
}
-
};
} // namespace simgrid::mc
}
// Copy callback that duplicates the payload bytes: buff_size bytes are copied from buff into comm->dst_buff_.
// NOTE(review): no size check is visible here; presumably the caller guarantees dst_buff_ is large enough — confirm.
void Comm::copy_buffer_callback(kernel::activity::CommImpl* comm, void* buff,
- size_t buff_size) // XBT_ATTRIB_DEPRECATED_v337
+ size_t buff_size) // XBT_ATTRIB_DEPRECATED_v338
{
XBT_DEBUG("Copy the data over");
memcpy(comm->dst_buff_, buff, buff_size); // raw byte copy into the destination buffer of the communication
}
void Comm::copy_pointer_callback(kernel::activity::CommImpl* comm, void* buff,
- size_t buff_size) // XBT_ATTRIB_DEPRECATED_v337
+ size_t buff_size) // XBT_ATTRIB_DEPRECATED_v338
{
xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size);
*(void**)(comm->dst_buff_) = buff;
simgrid::kernel::activity::ActivityImplPtr comm = nullptr;
simgrid::kernel::actor::CommIsendSimcall send_observer{
- sender, mbox->get_impl(), task_size, rate, static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
- nullptr, copy_data_fun, data, false};
+ sender, mbox->get_impl(), task_size, rate, static_cast<unsigned char*>(src_buff),
+ src_buff_size, match_fun, nullptr, copy_data_fun, data,
+ false, "Isend"};
comm = simgrid::kernel::actor::simcall_answered(
[&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer);
- if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout};
+ if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout, "Wait"};
simgrid::kernel::actor::simcall_blocking(
[&wait_observer] {
wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
} else {
simgrid::kernel::actor::CommIsendSimcall observer(sender, mbox->get_impl(), task_size, rate,
static_cast<unsigned char*>(src_buff), src_buff_size, match_fun,
- nullptr, copy_data_fun, data, false);
+ nullptr, copy_data_fun, data, false, "Isend");
simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer);
comm->wait_for(observer.get_issuer(), timeout);
match_fun,
copy_data_fun,
data,
- rate};
+ rate,
+ "Irecv"};
comm = simgrid::kernel::actor::simcall_answered(
[&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer);
- if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout};
+ if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout, "wait"};
simgrid::kernel::actor::simcall_blocking(
[&wait_observer] {
wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout());
comm = nullptr;
} else {
simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox->get_impl(), static_cast<unsigned char*>(dst_buff),
- dst_buff_size, match_fun, copy_data_fun, data, rate);
+ dst_buff_size, match_fun, copy_data_fun, data, rate, "Irecv");
simgrid::kernel::actor::simcall_blocking([&observer, timeout] {
simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer);
comm->wait_for(observer.get_issuer(), timeout);
clean_fun_,
copy_data_function_,
get_data<void>(),
- detached_};
+ detached_,
+ "Isend"};
pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
&observer);
} else if (dst_buff_ != nullptr) { // Receiver side
match_fun_,
copy_data_function_,
get_data<void>(),
- rate_};
+ rate_,
+ "Irecv"};
pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
&observer);
} else {
case State::STARTED:
try {
issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
+ kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"};
if (kernel::actor::simcall_blocking(
[&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
&observer)) {
+ #include <memory>
#include <simgrid/Exception.hpp>
- #include <simgrid/plugins/task.hpp>
-#include <simgrid/s4u/Task.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Exec.hpp>
#include <simgrid/s4u/Io.hpp>
++#include <simgrid/s4u/Task.hpp>
#include <simgrid/simix.hpp>
#include "src/simgrid/module.hpp"
// Registers the plugin identified as "task". Its description said "Battery management", which does not match
// the plugin id and looks like a copy-paste leftover, so it now describes tasks.
// NOTE(review): the last argument is nullptr — presumably the init hook; confirm none is needed here.
SIMGRID_REGISTER_PLUGIN(task, "Task management", nullptr)
- /** @defgroup plugin_task plugin_task Plugin Task
-
+ /**
@beginrst
- This is the task plugin, enabling management of Tasks.
- To activate this plugin, first call :cpp:func:`Task::init`.
Tasks are designed to represent dataflows, i.e., graphs of Tasks.
Tasks can only be instantiated using either
- :cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
+ :cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
*/
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
- namespace simgrid::plugins {
-
- xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
-
- xbt::signal<void(Task*)> Task::on_start;
- xbt::signal<void(Task*)> Task::on_end;
+ namespace simgrid::s4u {
Task::Task(const std::string& name) : name_(name) {}
*/
bool Task::ready_to_run() const
{
// A task may fire only while it is not already working and at least one firing is queued.
- return not working_ && queued_execs_ > 0;
+ return not working_ && queued_firings_ > 0;
}
/**
void Task::receive(Task* source)
{
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
- predecessors_[source]++;
- bool enough_tokens = true;
+ auto source_count = predecessors_[source]++;
+ if (tokens_received_.size() <= queued_firings_ + source_count)
+ tokens_received_.push_back({});
+ tokens_received_[queued_firings_ + source_count][source] = source->token_;
- bool enough_tokens = true;
++ bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
enough_tokens = false;
if (enough_tokens) {
for (auto& [key, val] : predecessors_)
val--;
- enqueue_execs(1);
+ enqueue_firings(1);
}
}
*/
void Task::complete()
{
- xbt_assert(s4u::Actor::is_maestro());
+ xbt_assert(Actor::is_maestro());
working_ = false;
count_++;
- on_this_end_(this);
- Task::on_end(this);
+ on_this_completion(this);
+ on_completion(this);
if (current_activity_)
previous_activity_ = std::move(current_activity_);
for (auto const& t : successors_)
fire();
}
- /** @ingroup plugin_task
- * @brief Init the Task plugin.
- * @note Add a completion callback to all Activities to call Task::complete().
- */
- void Task::init()
- {
- static bool inited = false;
- if (inited)
- return;
-
- inited = true;
- ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
- simgrid::s4u::Exec::on_completion_cb(
- [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
- simgrid::s4u::Comm::on_completion_cb(
- [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
- simgrid::s4u::Io::on_completion_cb(
- [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
- }
-
- /** @ingroup plugin_task
- * @param n The number of executions to enqueue.
- * @brief Enqueue executions.
- * @note Immediatly starts an execution if possible.
+ /** @param n The number of firings to enqueue.
+ * @brief Enqueue firings.
+ * @note Immediately fires an activity if possible.
*/
- void Task::enqueue_execs(int n)
+ void Task::enqueue_firings(int n)
{
simgrid::kernel::actor::simcall_answered([this, n] { // counter update and possible firing both happen inside the simcall
- queued_execs_ += n;
+ queued_firings_ += n;
if (ready_to_run())
fire(); // start right away when the task is idle and now has a queued firing
});
}
- /** @ingroup plugin_task
- * @param amount The amount to set.
+ /** @param amount The amount to set.
* @brief Set the amount of work to do.
* @note Amount in flop for ExecTask and in bytes for CommTask.
*/
simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
}
- /** @ingroup plugin_task
- * @param successor The Task to add.
+ /** @param token The token to set.
+ * @brief Set the token to send to successors.
+ * @note The token is passed to each successor after the task completes, i.e., after the on_completion callbacks.
+ */
+ void Task::set_token(std::shared_ptr<Token> token)
+ {
+ simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); // the assignment is performed inside a simcall
+ }
+
+ /** @param t The predecessor task whose token is requested.
+ * @return The token received from t for the next firing, or a null pointer when no token set has been received yet.
+ * @note If there is no queued firing for this task, the entry for t might not exist or be empty.
+ */
+ std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+ {
+ if (tokens_received_.empty()) // calling front() on an empty container is undefined behavior
+ return nullptr;
+ return tokens_received_.front()[t];
+ }
+
-void Task::fire() {
++void Task::fire()
++{
+ on_this_start(this); // start notification attached to this task
+ on_start(this); // start notification shared by all tasks
- working_ = true;
++ working_ = true;
+ queued_firings_ = std::max(queued_firings_ - 1, 0); // consume one queued firing, never going below zero
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front(); // discard the oldest set of received tokens
+ }
+
+ /** @param successor The Task to add.
* @brief Add a successor to this Task.
* @note It also adds this as a predecessor of successor.
*/
});
}
- /** @ingroup plugin_task
- * @param successor The Task to remove.
+ /** @param successor The Task to remove.
* @brief Remove a successor from this Task.
* @note It also remove this from the predecessors of successor.
*/
});
}
- /** @ingroup plugin_task
- * @param func The function to set.
- * @brief Set a function to be called before each execution.
- * @note The function is called before the underlying Activity starts.
- */
- void Task::on_this_start_cb(const std::function<void(Task*)>& func)
- {
- simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
- }
-
- /** @ingroup plugin_task
- * @param func The function to set.
- * @brief Set a function to be called after each execution.
- * @note The function is called after the underlying Activity ends, but before sending tokens to successors.
- */
- void Task::on_this_end_cb(const std::function<void(Task*)>& func)
- {
- simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
- }
-
- /** @ingroup plugin_task
- * @brief Return the number of completed executions.
- */
- int Task::get_count() const
- {
- return count_;
- }
-
/**
* @brief Default constructor.
*/
/** @ingroup plugin_task
* @brief Smart Constructor.
*/
- ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
+ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
{
return init(name)->set_flops(flops)->set_host(host); // create by name, then chain the setters (each one returns the task)
}
*/
void ExecTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- s4u::ExecPtr exec = s4u::Exec::init();
- exec->set_name(name_);
- exec->set_flops_amount(amount_);
- exec->set_host(host_);
+ Task::fire(); // start notifications and firing bookkeeping are done by the base class
+ auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
exec->start(); // NOTE(review): the completion callback below is connected after start() — confirm the Exec cannot complete in between
- exec->extension_set(new ExtendedAttributeActivity());
- exec->extension<ExtendedAttributeActivity>()->task_ = this;
- current_activity_ = exec;
+ exec->on_this_completion_cb([this](Exec const&) { this->complete(); }); // completing this Exec completes the task
+ set_current_activity(exec);
}
/** @ingroup plugin_task
* @param host The host to set.
* @brief Set a new host.
*/
- ExecTaskPtr ExecTask::set_host(s4u::Host* host)
+ ExecTaskPtr ExecTask::set_host(Host* host)
{
kernel::actor::simcall_answered([this, host] { host_ = host; });
return this;
*/
ExecTaskPtr ExecTask::set_flops(double flops)
{
- kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
+ kernel::actor::simcall_answered([this, flops] { set_amount(flops); }); // NOTE(review): set_amount() appears to issue its own simcall_answered, so this nests simcalls — confirm that is fine
return this; // returned so that setters can be chained
}
/** @ingroup plugin_task
* @brief Smart constructor.
*/
- CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
+ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
{
return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); // create by name, then chain the setters
}
*/
void CommTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
- comm->set_name(name_);
- comm->set_payload_size(amount_);
+ Task::fire(); // start notifications and firing bookkeeping are done by the base class
+ auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
comm->start(); // NOTE(review): the completion callback below is connected after start() — confirm the Comm cannot complete in between
- comm->extension_set(new ExtendedAttributeActivity());
- comm->extension<ExtendedAttributeActivity>()->task_ = this;
- current_activity_ = comm;
+ comm->on_this_completion_cb([this](Comm const&) { this->complete(); }); // completing this Comm completes the task
+ set_current_activity(comm);
}
/** @ingroup plugin_task
* @param source The host to set.
* @brief Set a new source host.
*/
- CommTaskPtr CommTask::set_source(s4u::Host* source)
+ CommTaskPtr CommTask::set_source(Host* source)
{
kernel::actor::simcall_answered([this, source] { source_ = source; });
return this;
* @param destination The host to set.
* @brief Set a new destination host.
*/
- CommTaskPtr CommTask::set_destination(s4u::Host* destination)
+ CommTaskPtr CommTask::set_destination(Host* destination)
{
kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
return this;
*/
CommTaskPtr CommTask::set_bytes(double bytes)
{
- kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+ kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); // stores the byte count as the task's amount through the Task setter
return this; // returned so that setters can be chained
}
/** @ingroup plugin_task
* @brief Smart Constructor.
*/
- IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
+ IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
{
return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type); // create by name, then chain the setters
}
* @param disk The disk to set.
* @brief Set a new disk.
*/
- IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
+ IoTaskPtr IoTask::set_disk(Disk* disk)
{
kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
return this;
*/
IoTaskPtr IoTask::set_bytes(double bytes)
{
- kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+ kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); // stores the byte count as the task's amount through the Task setter
return this; // returned so that setters can be chained
}
/** @ingroup plugin_task */
- IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
+ IoTaskPtr IoTask::set_op_type(Io::OpType type)
{
kernel::actor::simcall_answered([this, type] { type_ = type; });
return this;
void IoTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- s4u::IoPtr io = s4u::Io::init();
- io->set_name(name_);
- io->set_size(amount_);
- io->set_disk(disk_);
- io->set_op_type(type_);
+ Task::fire(); // start notifications and firing bookkeeping are done by the base class
+ auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
io->start(); // NOTE(review): the completion callback below is connected after start() — confirm the Io cannot complete in between
- io->extension_set(new ExtendedAttributeActivity());
- io->extension<ExtendedAttributeActivity>()->task_ = this;
- current_activity_ = io;
+ io->on_this_completion_cb([this](Io const&) { this->complete(); }); // completing this Io completes the task
+ set_current_activity(io);
}
- } // namespace simgrid::plugins
+ } // namespace simgrid::s4u