#include <xbt/Extendable.hpp>
#include <atomic>
+#include <deque>
#include <map>
#include <memory>
#include <set>
XBT_PUBLIC void intrusive_ptr_add_ref(Task* o);
class ExecTask;
using ExecTaskPtr = boost::intrusive_ptr<ExecTask>;
-XBT_PUBLIC void intrusive_ptr_release(ExecTask* e);
-XBT_PUBLIC void intrusive_ptr_add_ref(ExecTask* e);
class CommTask;
using CommTaskPtr = boost::intrusive_ptr<CommTask>;
-XBT_PUBLIC void intrusive_ptr_release(CommTask* c);
-XBT_PUBLIC void intrusive_ptr_add_ref(CommTask* c);
class IoTask;
using IoTaskPtr = boost::intrusive_ptr<IoTask>;
-XBT_PUBLIC void intrusive_ptr_release(IoTask* i);
-XBT_PUBLIC void intrusive_ptr_add_ref(IoTask* i);
struct ExtendedAttributeActivity {
static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
Task* task_;
};
+class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
+
class Task {
- static bool inited_;
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
- void add_predecessor(Task* predecessor);
- void remove_predecessor(Task* predecessor);
bool ready_to_run() const;
void receive(Task* source);
void complete();
protected:
std::string name_;
double amount_;
+ std::shared_ptr<Token> token_ = nullptr;
+ std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
int queued_execs_ = 0;
int count_ = 0;
bool working_ = false;
+ s4u::ActivityPtr previous_activity_;
s4u::ActivityPtr current_activity_;
- std::vector<std::function<void(Task*)>> end_func_handlers_;
- std::vector<std::function<void(Task*)>> start_func_handlers_;
+ xbt::signal<void(Task*)> on_this_start_;
+ xbt::signal<void(Task*)> on_this_end_;
explicit Task(const std::string& name);
virtual ~Task() = default;
virtual void fire() = 0;
void enqueue_execs(int n);
void set_amount(double amount);
double get_amount() const { return amount_; }
+ void set_token(std::shared_ptr<Token> token);
+ std::shared_ptr<Token> get_next_token_from(TaskPtr t);
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
void remove_all_successors();
const std::set<Task*>& get_successors() const { return successors_; }
- void on_this_start(const std::function<void(Task*)>& func);
- void on_this_end(const std::function<void(Task*)>& func);
+ void on_this_start_cb(const std::function<void(Task*)>& func);
+ void on_this_end_cb(const std::function<void(Task*)>& func);
int get_count() const;
/** Add a callback fired before a task activity start.
s4u::Host* get_host() const { return host_; }
ExecTaskPtr set_flops(double flops);
double get_flops() const { return get_amount(); }
- friend void inline intrusive_ptr_release(ExecTask* e) { intrusive_ptr_release(static_cast<Task*>(e)); }
- friend void inline intrusive_ptr_add_ref(ExecTask* e) { intrusive_ptr_add_ref(static_cast<Task*>(e)); }
};
class CommTask : public Task {
s4u::Host* get_destination() const { return destination_; }
CommTaskPtr set_bytes(double bytes);
double get_bytes() const { return get_amount(); }
- friend void inline intrusive_ptr_release(CommTask* c) { intrusive_ptr_release(static_cast<Task*>(c)); }
- friend void inline intrusive_ptr_add_ref(CommTask* c) { intrusive_ptr_add_ref(static_cast<Task*>(c)); }
};
class IoTask : public Task {
IoTaskPtr set_disk(s4u::Disk* disk);
s4u::Disk* get_disk() const { return disk_; }
IoTaskPtr set_bytes(double bytes);
- double get_bytes() { return get_amount(); }
+ double get_bytes() const { return get_amount(); }
IoTaskPtr set_op_type(s4u::Io::OpType type);
- s4u::Io::OpType get_op_type() { return type_; }
-
- friend void inline intrusive_ptr_release(IoTask* i) { intrusive_ptr_release(static_cast<Task*>(i)); }
- friend void inline intrusive_ptr_add_ref(IoTask* i) { intrusive_ptr_add_ref(static_cast<Task*>(i)); }
+ s4u::Io::OpType get_op_type() const { return type_; }
};
} // namespace simgrid::plugins
#endif