Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
task can now pass tokens (void*) to other tasks. add example using tokens and apache...
[simgrid.git] / examples / cpp / task-storm / s4u-task-storm.cpp
1 /* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 /* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
7    and use them to build a simulation of a stream processing application
8
9    Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
10    Spout SB produces 1e6 bytes every 200ms.  
11    
12    Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
13    Bolt B3 processes data from Spout SB.
14    Bolt B4 processes data from Bolt B3.
15    
16                         Fafard
17                         ┌────┐
18                     ┌──►│ B1 │
19          Tremblay   │   └────┘
20           ┌────┐    │
21           │ SA ├────┤  Ginette
22           └────┘    │   ┌────┐
23                     └──►│ B2 │
24                         └────┘
25
26
27                        Bourassa
28          Jupiter     ┌──────────┐
29           ┌────┐     │          │
30           │ SB ├─────┤ B3 ──► B4│
31           └────┘     │          │
32                      └──────────┘     
33  */
34
35 #include "simgrid/plugins/task.hpp"
36 #include "simgrid/s4u.hpp"
37
38 XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
39
40 struct Token {
41   double data_ = 0;
42   Token(double data) : data_(data) {}
43 };
44
45 int main(int argc, char* argv[])
46 {
47   simgrid::s4u::Engine e(&argc, argv);
48   e.load_platform(argv[1]);
49   simgrid::plugins::Task::init();
50
51   // Retrieve hosts
52   auto tremblay = e.host_by_name("Tremblay");
53   auto jupiter  = e.host_by_name("Jupiter");
54   auto fafard  = e.host_by_name("Fafard");
55   auto ginette  = e.host_by_name("Ginette");
56   auto bourassa = e.host_by_name("Bourassa");
57
58   // Create execution tasks
59   auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
60   auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
61   auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard);
62   auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette);
63   auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa);
64   auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa);
65
66   // Create communication tasks
67   auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard); 
68   auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette);
69   auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
70
71   // Create the graph by defining dependencies between tasks
72   // Some dependencies are defined dynamically
73   SA_to_B1->add_successor(B1);
74   SA_to_B2->add_successor(B2);
75   SB->add_successor(SB_to_B3);
76   SB_to_B3->add_successor(B3);
77   B3->add_successor(B4);
78
79   /* Dynamic modification of the graph and bytes sent
80      Alternatively we: remove/add the link between SA and SA_to_B2
81                        add/remove the link between SA and SA_to_B1
82   */
83   SA->on_this_start_cb([&](simgrid::plugins::Task* t) {
84     int count = t->get_count();
85     simgrid::plugins::CommTaskPtr comm;
86     if (count % 2 == 0) {
87       t->remove_successor(SA_to_B2);
88       t->add_successor(SA_to_B1);
89       comm = SA_to_B1;
90     }
91     else {
92       t->remove_successor(SA_to_B1);
93       t->add_successor(SA_to_B2);
94       comm = SA_to_B2;
95     }
96     std::vector<double> amount = {1e3,1e6,1e9};
97     comm->set_amount(amount[count % 3]);
98     auto token = std::make_shared<Token>(amount[count % 3]);
99     t->set_token(token);
100   });
101
102   // The token sent by SA is forwarded by both communication tasks
103   SA_to_B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
104     t->set_token(t->get_tokens()[SA]);
105   });
106   SA_to_B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
107     t->set_token(t->get_tokens()[SA]);
108   });
109
110   /* B1 and B2 read the value of the token received by their predecessors
111      and use it to adapt their amount of work to do.
112   */ 
113   B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
114     auto tokens_map = t->get_tokens();
115     Token* tok = (Token*)(tokens_map[SA_to_B1].get());
116     t->set_amount(tok->data_ * 10);
117   });
118   B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
119     auto tokens_map = t->get_tokens();
120     Token* tok = (Token*)(tokens_map[SA_to_B2].get());
121     t->set_amount(tok->data_ * 10);
122   });
123
124   // Enqueue executions for tasks without predecessors
125   SA->enqueue_execs(5);
126   SB->enqueue_execs(5);
127
128   // Add a function to be called when tasks end for log purpose
129   simgrid::plugins::Task::on_end_cb([]
130   (const simgrid::plugins::Task* t) {
131     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
132   });
133
134   // Start the simulation
135   e.run();
136   return 0;
137 }