Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
cc8b0a3ffa5acfc12c7197045e2b13fe7d5b2586
[simgrid.git] / examples / msg / kadeploy / kadeploy.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * Copyright (c) 2012. Maximiliano Geier.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
12 #include "xbt/sysdep.h"         /* calloc */
13
14 /* Create a log channel to have nice outputs. */
15 #include "xbt/log.h"
16 #include "xbt/asserts.h"
17
18 /** @addtogroup MSG_examples
19  * 
20  *  - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
21  */
22
23
24 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
25                              "Messages specific for kadeploy");
26
27 #define MESSAGE_SIZE 1
28 #define HOSTNAME_LENGTH 20
29
30 /*
31  Data structures
32  */
33
34 /* Random iterator for xbt_dynar */
35 typedef struct xbt_dynar_iterator_struct {
36   xbt_dynar_t list;
37   xbt_dynar_t indices_list;
38   int current;
39   unsigned long length;
40   int (*criteria_fn)(void* it);
41 } *xbt_dynar_iterator_t;
42 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
43
44 /* Messages enum */
45 typedef enum {
46   MESSAGE_BUILD_CHAIN = 0,
47   MESSAGE_SEND_DATA,
48   MESSAGE_END_DATA
49 } e_message_type;
50
51 /* Message struct */
52 typedef struct s_message {
53   e_message_type type;
54   const char *issuer_hostname;
55   const char *mailbox;
56   const char *prev_hostname;
57   const char *next_hostname;
58   const char *data_block;
59   unsigned int data_length;
60 } s_message_t, *message_t;
61
62 /* Iterator methods */
63 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
64 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
65 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
66 int xbt_dynar_iterator_forward_criteria(void *p);
67
68 /* Message methods */
69 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
70 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
71 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
72 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
73 void task_message_delete(void *);
74
75 /* Tasks */
76 int broadcaster(int argc, char *argv[]);
77 int peer(int argc, char *argv[]);
78
79 xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
80 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
81
82 /* Broadcaster: helper functions */
83 int broadcaster_build_chain(xbt_dynar_t host_list);
84 int broadcaster_send_file(xbt_dynar_t host_list);
85 int broadcaster_finish(xbt_dynar_t host_list);
86
87 /* Peer: helper functions */
88 int peer_wait_for_init();
89
90 /* Initialization stuff */
91 msg_error_t test_all(const char *platform_file,
92                      const char *application_file);
93
94 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
95    criteria_fn: given an iterator, it must update the iterator and give the next element's index, 
96    less than 0 otherwise*/
97 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*))
98 {
99   xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
100   
101   it->list = list;
102   it->length = xbt_dynar_length(list);
103   it->indices_list = xbt_dynar_new(sizeof(int), NULL);
104   it->criteria_fn = criteria_fn;
105   it->current = -1;
106 }
107
108 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
109 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
110 {
111   int next = it->criteria_fn((xbt_dynar_iterator_t)it);
112   XBT_INFO("%d current\n", next);
113   if (next < 0) {
114     XBT_INFO("Nothing to return!\n");
115     return NULL;
116   } else {
117     xbt_dynar_push(it->indices_list, &next);
118     return xbt_dynar_get_ptr(it->list, next);
119   }
120 }
121
122 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
123 {
124   xbt_dynar_free_container(&(it->indices_list));
125   xbt_free_ref(&it);
126 }
127
128 int xbt_dynar_iterator_forward_criteria(void *p)
129 {
130   xbt_dynar_iterator_t it = (xbt_dynar_iterator_t)p;
131   int r = -1;
132   if (it->current == -1) {
133     /* iterator initialization */
134     it->current = 0;
135   }
136   if (it->current < it->length) {
137     r = it->current;
138     it->current++;
139   }
140
141   return r;
142 }
143
144 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
145 {
146   message_t msg = xbt_new(s_message_t, 1);
147   msg->type = type;
148   msg->issuer_hostname = issuer_hostname;
149   msg->mailbox = mailbox;
150   msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
151
152   return task;
153 }
154
155 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
156 {
157   msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
158   message_t msg = MSG_task_get_data(task);
159   msg->prev_hostname = prev;
160   msg->next_hostname = next;
161
162   return task;
163 }
164
165 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
166 {
167   msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
168   message_t msg = MSG_task_get_data(task);
169   msg->data_block = block;
170   msg->data_length = len;
171
172   return task;
173 }
174
175 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
176 {
177   return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
178 }
179
180
181 void task_message_delete(void *task)
182 {
183   message_t msg = MSG_task_get_data(task);
184   xbt_free(msg);
185   MSG_task_destroy(task);
186 }
187
188 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
189 {
190   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
191   char *hostname = NULL;
192   msg_host_t h = NULL;
193   int i = 1;
194   
195   for (; i < hostcount+1; i++) {
196     hostname = xbt_new(char, HOSTNAME_LENGTH);
197     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
198     XBT_INFO("%s", hostname);
199     h = MSG_get_host_by_name(hostname);
200     if (h == NULL) {
201       XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
202       abort();
203     } else {
204       xbt_dynar_push(host_list, &hostname);
205     }
206   }
207   return host_list;
208 }
209
210 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
211 {
212   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
213   msg_host_t h = NULL;
214   int i = 1;
215   
216   for (; i < argc; i++) {
217     XBT_INFO("host%d = %s", i, argv[i]);
218     h = MSG_get_host_by_name(argv[i]);
219     if (h == NULL) {
220       XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
221       abort();
222     } else {
223       xbt_dynar_push(host_list, &(argv[i]));
224     }
225   }
226   return host_list;
227 }*/
228
229 void delete_hostlist(xbt_dynar_t h)
230 {
231   xbt_dynar_free(&h);
232 }
233
234 int broadcaster_build_chain(xbt_dynar_t host_list)
235 {
236   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
237   msg_task_t task = NULL;
238   char **cur = (char**)xbt_dynar_iterator_next(it);
239   const char *current_host = NULL;
240   const char *prev = NULL;
241   const char *next = NULL;
242   const char *me = MSG_host_get_name(MSG_host_self());
243   const char *last = NULL;
244
245   /* Build the chain if there's at least one peer */
246   if (cur != NULL) {
247     /* init: prev=NULL, host=current cur, next=next cur */
248     next = *cur;
249
250     /* This iterator iterates one step ahead: cur is current iterated element, 
251        but it's actually the next one in the chain */
252     do {
253       /* following steps: prev=last, host=next, next=cur */
254       cur = (char**)xbt_dynar_iterator_next(it);
255       prev = last;
256       current_host = next;
257       if (cur != NULL)
258         next = *cur;
259       else
260         next = NULL;
261       XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
262     
263       /* Send message to current peer */
264       task = task_message_chain_new(me, current_host, prev, next);
265       MSG_task_send(task, current_host);
266
267       last = current_host;
268     } while (cur != NULL);
269   }
270   xbt_dynar_iterator_delete(it);
271
272   return MSG_OK;
273 }
274
275 int broadcaster_send_file(xbt_dynar_t host_list)
276 {
277   /* ... */
278
279   return MSG_OK;
280 }
281
282 int broadcaster_finish(xbt_dynar_t host_list)
283 {
284   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
285   msg_task_t task = NULL;
286   const char *me = MSG_host_get_name(MSG_host_self());
287   const char *current_host = NULL;
288   char **cur = NULL;
289
290   /* Send goodbye message to every peer */
291   for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
292       /* Send message to current peer */
293       current_host = *cur;
294       task = task_message_end_data_new(me, current_host);
295       MSG_task_send(task, current_host);
296   }
297
298   return MSG_OK;
299 }
300
301
302 /** Emitter function  */
303 int broadcaster(int argc, char *argv[])
304 {
305   xbt_dynar_t host_list = NULL;
306   const char *first = NULL;
307   int status = !MSG_OK;
308
309   XBT_INFO("broadcaster");
310
311   /* Check that every host given by the hostcount in argv[1] exists and add it
312      to a dynamic array */
313   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
314   /*host_list = build_hostlist_from_argv(argc, argv);*/
315   
316   /* TODO: Error checking */
317   status = broadcaster_build_chain(host_list);
318   status = broadcaster_send_file(host_list);
319   status = broadcaster_finish(host_list);
320
321   delete_hostlist(host_list);
322
323   /* Latency */
324   /*time = MSG_get_clock();
325   sprintf(sprintf_buffer_la, "latency task");
326   task_la =
327       MSG_task_create(sprintf_buffer_la, 0.0, task_comm_size_lat, NULL);
328   task_la->data = xbt_new(double, 1);
329   *(double *) task_la->data = time;
330   XBT_INFO("task_la->data = %le", *((double *) task_la->data));
331   MSG_task_send(task_la, argv[1]);*/
332
333   /* Bandwidth */
334   /*time = MSG_get_clock();
335   sprintf(sprintf_buffer_bw, "bandwidth task");
336   task_bw =
337       MSG_task_create(sprintf_buffer_bw, 0.0, task_comm_size_bw, NULL);
338   task_bw->data = xbt_new(double, 1);
339   *(double *) task_bw->data = time;
340   XBT_INFO("task_bw->data = %le", *((double *) task_bw->data));
341   MSG_task_send(task_bw, argv[1]);
342   */
343   return status;
344 }
345
346 int peer_wait_for_init()
347 {
348   msg_task_t task = NULL;
349   const char *me = MSG_host_get_name(MSG_host_self());
350
351   int a = MSG_task_receive(&task, me);
352
353   if (a == MSG_OK) {
354     XBT_INFO("Peer %s got message\n", me);
355   }
356
357   task_message_delete(task);
358
359   return MSG_OK;
360 }
361
362 /** Peer function  */
363 int peer(int argc, char *argv[])
364 {
365   double time, time1, sender_time;
366   msg_task_t task_la = NULL;
367   msg_task_t task_bw = NULL;
368   int a;
369   double communication_time = 0;
370
371   XBT_INFO("peer");
372
373   time = MSG_get_clock();
374
375   a = peer_wait_for_init();
376   /* Get Latency */
377   /*a = MSG_task_receive(&task_la,MSG_host_get_name(MSG_host_self()));
378   if (a == MSG_OK) {
379     time1 = MSG_get_clock();
380     sender_time = *((double *) (task_la->data));
381     time = sender_time;
382     communication_time = time1 - time;
383     XBT_INFO("Task received : %s", task_la->name);
384     xbt_free(task_la->data);
385     MSG_task_destroy(task_la);
386     XBT_INFO("Communic. time %le", communication_time);
387     XBT_INFO("--- la %f ----", communication_time);
388   } else {
389     xbt_die("Unexpected behavior");
390   }*/
391
392
393   /* Get Bandwidth */
394   /*a = MSG_task_receive(&task_bw,MSG_host_get_name(MSG_host_self()));
395   if (a == MSG_OK) {
396     time1 = MSG_get_clock();
397     sender_time = *((double *) (task_bw->data));
398     time = sender_time;
399     communication_time = time1 - time;
400     XBT_INFO("Task received : %s", task_bw->name);
401     xbt_free(task_bw->data);
402     MSG_task_destroy(task_bw);
403     XBT_INFO("Communic. time %le", communication_time);
404     XBT_INFO("--- bw %f ----", task_comm_size_bw / communication_time);
405   } else {
406     xbt_die("Unexpected behavior");
407   }*/
408
409
410   return 0;
411 }                               /* end_of_receiver */
412
413
414 /** Test function */
415 msg_error_t test_all(const char *platform_file,
416                      const char *application_file)
417 {
418
419   msg_error_t res = MSG_OK;
420
421
422
423   XBT_INFO("test_all");
424
425   /*  Simulation setting */
426   MSG_create_environment(platform_file);
427
428   /*   Application deployment */
429   MSG_function_register("broadcaster", broadcaster);
430   MSG_function_register("peer", peer);
431
432   MSG_launch_application(application_file);
433
434   res = MSG_main();
435
436   return res;
437 }                               /* end_of_test_all */
438
439
440 /** Main function */
441 int main(int argc, char *argv[])
442 {
443   msg_error_t res = MSG_OK;
444
445 #ifdef _MSC_VER
446   unsigned int prev_exponent_format =
447       _set_output_format(_TWO_DIGIT_EXPONENT);
448 #endif
449
450   MSG_init(&argc, argv);
451
452
453   /*if (argc <= 3) {
454     XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
455               argv[0]);
456     XBT_CRITICAL
457         ("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
458          argv[0]);
459     exit(1);
460   }*/
461
462   /* Options for the workstation/model:
463
464      KCCFLN05              => for maxmin
465      KCCFLN05_proportional => for proportional (Vegas)
466      KCCFLN05_Vegas        => for TCP Vegas
467      KCCFLN05_Reno         => for TCP Reno
468    */
469   //MSG_config("workstation/model", argv[3]);
470
471   res = test_all(argv[1], argv[2]);
472
473   XBT_INFO("Total simulation time: %le", MSG_get_clock());
474
475   MSG_clean();
476
477 #ifdef _MSC_VER
478   _set_output_format(prev_exponent_format);
479 #endif
480
481   if (res == MSG_OK)
482     return 0;
483   else
484     return 1;
485 }                               /* end_of_main */