]> AND Private Git Repository - loba.git/blob - simple_async.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Use MSG_comm_waitall for communicator::flush(true).
[loba.git] / simple_async.cpp
1 #include <cstring>              // strlen
2 #include <cstdio>               // sprintf
3 #include <time.h>               // clock()
4 #include <msg/msg.h>
5 #include <xbt/log.h>
6 #include "simgrid_features.h"
7
8 // Creates a new log category and makes it the default
9 XBT_LOG_NEW_DEFAULT_CATEGORY(simu, "Simulation messages");
10
11 #define N_MBOX 5
12 #define N_MESG 16
13
14 // Failure exit status
15 enum {
16     EXIT_NO_FAILURE    = 0x00,  // no error
17     EXIT_FAILURE_ARGS  = 0x01,  // bad arguments
18     EXIT_FAILURE_INIT  = 0x02,  // failed to initialize simulator
19     EXIT_FAILURE_SIMU  = 0x04,  // simulation failed
20     EXIT_FAILURE_CLEAN = 0x08,  // error at cleanup
21 };
22
23 int sender(int, char* [])
24 {
25     char mbox_stack[N_MBOX][100];
26     msg_comm_t comm_stack[N_MBOX * N_MESG];
27     msg_comm_t* pcomm = comm_stack;
28     for (int i = 0 ; i < N_MBOX ; i++)
29         sprintf(mbox_stack[i], "MBox_%02d", i);
30
31     INFO0("Starting...");
32     int n = 0;
33     for (int i = 0 ; i < N_MBOX ; i++)
34         for (int j = 0 ; j < N_MESG ; j++) {
35             char task_name[100];
36             const char* mailbox = mbox_stack[i];
37             unsigned shift = j;
38             unsigned comm_size = 1 << shift;
39             m_task_t task;
40
41             sprintf(task_name, "Task_%02d", n);
42             task = MSG_task_create(task_name, 0, 1024.0 * comm_size, NULL);
43             INFO4("At %02d, send %s, size %.0f to \"%s\"", n,
44                   MSG_task_get_name(task),
45                   MSG_task_get_data_size(task), mailbox);
46             *pcomm++ = MSG_task_isend(task, mailbox);
47             ++n;
48         }
49
50     INFO0("Wait for communications to terminate...");
51     MSG_comm_waitall(comm_stack, pcomm - comm_stack, -1.0);
52     if (!MSG_WAIT_DESTROYS_COMMS) {
53         while (pcomm > comm_stack)
54             MSG_comm_destroy(*--pcomm);
55     }
56
57     INFO0("Finished.");
58     return 0;
59 }
60
61 int receiver(int, char* [])
62 {
63     char mbox[N_MBOX][100];
64     int comm_count[N_MBOX];
65     m_task_t tasks[N_MBOX];
66     msg_comm_t comms[N_MBOX];
67
68     for (int i = 0 ; i < N_MBOX ; i++) {
69         sprintf(mbox[i], "MBox_%02d", i);
70         comm_count[i] = N_MESG;
71         tasks[i] = NULL;
72         comms[i] = NULL;
73     }
74
75     INFO0("Starting...");
76     xbt_dynar_t dcomms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
77     for (int i = 0 ; i < N_MBOX ; i++) {
78         if (comm_count[i] > 0) {
79             comms[i] = MSG_task_irecv(&tasks[i], mbox[i]);
80             xbt_dynar_push(dcomms, &comms[i]);
81             --comm_count[i];
82         }
83     }
84     int n = 0;
85     while (!xbt_dynar_is_empty(dcomms)) {
86         MSG_comm_waitany(dcomms);
87         xbt_dynar_reset(dcomms);
88         for (int i = 0 ; i < N_MBOX ; i++) {
89             if (!comms[i])
90                 continue;
91             if (!MSG_comm_test(comms[i])) {
92                 xbt_dynar_push(dcomms, &comms[i]);
93                 continue;
94             }
95             MSG_comm_destroy(comms[i]);
96             comms[i] = NULL;
97
98             INFO4("At %02d, received %s, size %.0f from \"%s\"", n++,
99                   MSG_task_get_name(tasks[i]),
100                   MSG_task_get_data_size(tasks[i]),
101                   mbox[i]);
102
103             MSG_task_destroy(tasks[i]);
104             tasks[i] = NULL;
105
106             if (comm_count[i] > 0) {
107                 comms[i] = MSG_task_irecv(&tasks[i], mbox[i]);
108                 xbt_dynar_push(dcomms, &comms[i]);
109                 --comm_count[i];
110             }
111         }
112     }
113     xbt_dynar_free(&dcomms);
114
115     INFO0("Finished.");
116     return 0;
117 }
118
119 int main(int argc, char* argv[])
120 {
121     const char* platform_file;
122     const char* application_file;
123     // Note: variables used after THROW must be declared as volatile.
124     volatile int exit_status;   // global exit status
125     volatile double simulated_time = -1.0;
126     volatile clock_t start_time = clock();
127     xbt_ex_t ex;
128     MSG_error_t res;
129
130     // Initialize some MSG internal data.
131     // Note: MSG_global_init() may throw an exception, but it seems
132     // impossible to catch it correctly :-(
133     MSG_global_init(&argc, argv);
134
135     exit_status = EXIT_FAILURE_ARGS; // =====
136     TRY {
137
138         // Parse global parameters
139         if (argc != 3) {
140             INFO1("Usage: %s platform_file application_file", argv[0]);
141             THROW0(0, 0, "Failed to parse command line\n");
142         }
143         platform_file = argv[1];
144         application_file = argv[2];
145
146         INFO0(",----[ Simulation parameters ]");
147         INFO1("| platform_file.....: \"%s\"", platform_file);
148         INFO1("| application_file..: \"%s\"", application_file);
149         INFO0("`----");
150
151         exit_status = EXIT_FAILURE_INIT; // =====
152
153         // Register the main functions of an agent in a global table.
154         MSG_function_register("sender", sender);
155         MSG_function_register("receiver", receiver);
156
157         // Create the platform and the application.
158         MSG_create_environment(platform_file);
159         MSG_launch_application(application_file);
160
161         exit_status = EXIT_FAILURE_SIMU; // =====
162
163         // Launch the MSG simulation.
164         INFO0("Starting simulation...");
165         res = MSG_main();
166         INFO0("Simulation ended.");
167         simulated_time = MSG_get_clock();
168         if (res != MSG_OK)
169             THROW1(0, 0, "MSG_main() failed with status %#x", res);
170
171         exit_status = EXIT_NO_FAILURE; // =====
172     }
173     CATCH (ex) {
174         int len = strlen(ex.msg);
175         if (len > 0 && ex.msg[len - 1] == '\n')
176             len--;              // strip the ending '\n'
177         ERROR2("%.*s", len, ex.msg);
178         DEBUG3("Error from %s() in %s:%d", ex.func, ex.file, ex.line);
179         xbt_ex_free(ex);
180     }
181
182     // Clean the MSG simulation.
183     res = MSG_clean();
184     if (res != MSG_OK) {
185         ERROR1("MSG_clean() failed with status %#x", res);
186         exit_status |= EXIT_FAILURE_CLEAN;
187     }
188
189     // Report final simulation status.
190     if (simulated_time >= 0.0) {
191         clock_t end_time = clock();
192         double simulation_time =
193             (double )(end_time - start_time) / CLOCKS_PER_SEC;
194         INFO0(",----[ Results ]");
195         INFO1("| Total simulated time...: %g", simulated_time);
196         INFO1("| Total simulation time..: %g", simulation_time);
197         INFO0("`----");
198     }
199     if (exit_status == 0)
200         INFO0("Simulation succeeded.");
201     else
202         ERROR1("Simulation failed (%#x).", exit_status);
203
204     return exit_status;
205 }
206
207 // Local variables:
208 // mode: c++
209 // End: