Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
First working version with 8 peers I have not evaluated how it works yet, only that...
[simgrid.git] / examples / msg / kadeploy / kadeploy.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * Copyright (c) 2012. Maximiliano Geier.
3  * All rights reserved.                                                     */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #include "msg/msg.h"            /* Yeah! If you want to use msg, you need to include msg/msg.h */
12 #include "xbt/sysdep.h"         /* calloc */
13
14 /* Create a log channel to have nice outputs. */
15 #include "xbt/log.h"
16 #include "xbt/asserts.h"
17
18 /** @addtogroup MSG_examples
19  * 
20  *  - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
21  */
22
23
24 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
25                              "Messages specific for kadeploy");
26
27 #define MESSAGE_SIZE 1
28 #define PIECE_COUNT 100
29 #define HOSTNAME_LENGTH 20
30
31 #define PEER_SHUTDOWN_DEADLINE 600
32
33 /*
34  Data structures
35  */
36
37 /* Random iterator for xbt_dynar */
38 typedef struct xbt_dynar_iterator_struct {
39   xbt_dynar_t list;
40   xbt_dynar_t indices_list;
41   int current;
42   unsigned long length;
43   int (*criteria_fn)(void* it);
44 } *xbt_dynar_iterator_t;
45 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
46
47 /* Messages enum */
48 typedef enum {
49   MESSAGE_BUILD_CHAIN = 0,
50   MESSAGE_SEND_DATA,
51   MESSAGE_END_DATA
52 } e_message_type;
53
54 /* Message struct */
55 typedef struct s_message {
56   e_message_type type;
57   const char *issuer_hostname;
58   const char *mailbox;
59   const char *prev_hostname;
60   const char *next_hostname;
61   const char *data_block;
62   unsigned int data_length;
63 } s_message_t, *message_t;
64
65 /* Peer struct */
66 typedef struct s_peer {
67   int init;
68   const char *prev;
69   const char *next;
70   const char *me;
71   int pieces;
72   xbt_dynar_t pending_sends;
73   int close_asap; /* TODO: unused */
74 } s_peer_t, *peer_t;
75
76 /* Iterator methods */
77 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
78 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
79 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
80 int xbt_dynar_iterator_forward_criteria(void *p);
81
82 /* Message methods */
83 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
84 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
85 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
86 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
87 void task_message_delete(void *);
88
89 /* Tasks */
90 int broadcaster(int argc, char *argv[]);
91 int peer(int argc, char *argv[]);
92
93 xbt_dynar_t build_hostlist_from_hostcount(int hostcount); 
94 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
95
96 /* Broadcaster: helper functions */
97 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
98 int broadcaster_send_file(const char *first);
99 int broadcaster_finish(xbt_dynar_t host_list);
100
101 /* Peer: helper functions */
102 msg_error_t peer_wait_for_message(peer_t peer);
103 int peer_execute_task(peer_t peer, msg_task_t task);
104 void peer_init_chain(peer_t peer, message_t msg);
105 void peer_shutdown(peer_t p);
106 void peer_init(peer_t p);
107
108 /* Initialization stuff */
109 msg_error_t test_all(const char *platform_file,
110                      const char *application_file);
111
112 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
113    criteria_fn: given an iterator, it must update the iterator and give the next element's index, 
114    less than 0 otherwise*/
115 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*))
116 {
117   xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
118   
119   it->list = list;
120   it->length = xbt_dynar_length(list);
121   it->indices_list = xbt_dynar_new(sizeof(int), NULL);
122   it->criteria_fn = criteria_fn;
123   it->current = -1;
124 }
125
126 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
127 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
128 {
129   int next = it->criteria_fn((xbt_dynar_iterator_t)it);
130   //XBT_INFO("%d current\n", next);
131   if (next < 0) {
132     //XBT_INFO("Nothing to return!\n");
133     return NULL;
134   } else {
135     xbt_dynar_push(it->indices_list, &next);
136     return xbt_dynar_get_ptr(it->list, next);
137   }
138 }
139
140 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
141 {
142   xbt_dynar_free_container(&(it->indices_list));
143   xbt_free_ref(&it);
144 }
145
146 int xbt_dynar_iterator_forward_criteria(void *p)
147 {
148   xbt_dynar_iterator_t it = (xbt_dynar_iterator_t)p;
149   int r = -1;
150   if (it->current == -1) {
151     /* iterator initialization */
152     it->current = 0;
153   }
154   if (it->current < it->length) {
155     r = it->current;
156     it->current++;
157   }
158
159   return r;
160 }
161
162 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
163 {
164   message_t msg = xbt_new(s_message_t, 1);
165   msg->type = type;
166   msg->issuer_hostname = issuer_hostname;
167   msg->mailbox = mailbox;
168   msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg); 
169
170   return task;
171 }
172
173 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
174 {
175   msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
176   message_t msg = MSG_task_get_data(task);
177   msg->prev_hostname = prev;
178   msg->next_hostname = next;
179
180   return task;
181 }
182
183 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
184 {
185   msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
186   if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
187   message_t msg = MSG_task_get_data(task);
188   msg->data_block = block;
189   msg->data_length = len;
190
191   return task;
192 }
193
194 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
195 {
196   return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
197 }
198
199 void task_message_delete(void *task)
200 {
201   message_t msg = MSG_task_get_data(task);
202   xbt_free(msg);
203   MSG_task_destroy(task);
204 }
205
206 void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
207 {
208   xbt_dynar_push(q, &comm);
209 }
210
211 int process_pending_connections(xbt_dynar_t q)
212 {
213   unsigned int iter;
214   int status;
215   int empty = 0;
216   msg_comm_t comm;
217
218   xbt_dynar_foreach(q, iter, comm) {
219     empty = 1;
220     if (MSG_comm_test(comm)) {
221       MSG_comm_destroy(comm);
222       status = MSG_comm_get_status(comm);
223       xbt_assert(status == MSG_OK, __FILE__ ": process_pending_connections() failed");
224       xbt_dynar_cursor_rm(q, &iter);
225       empty = 0;
226     }
227   }
228   return empty;
229 }
230
231 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
232 {
233   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
234   char *hostname = NULL;
235   msg_host_t h = NULL;
236   int i = 1;
237   
238   for (; i < hostcount+1; i++) {
239     hostname = xbt_new(char, HOSTNAME_LENGTH);
240     snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
241     //XBT_INFO("%s", hostname);
242     h = MSG_get_host_by_name(hostname);
243     if (h == NULL) {
244       XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
245       abort();
246     } else {
247       xbt_dynar_push(host_list, &hostname);
248     }
249   }
250   return host_list;
251 }
252
253 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
254 {
255   xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
256   msg_host_t h = NULL;
257   int i = 1;
258   
259   for (; i < argc; i++) {
260     XBT_INFO("host%d = %s", i, argv[i]);
261     h = MSG_get_host_by_name(argv[i]);
262     if (h == NULL) {
263       XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
264       abort();
265     } else {
266       xbt_dynar_push(host_list, &(argv[i]));
267     }
268   }
269   return host_list;
270 }*/
271
272 void delete_hostlist(xbt_dynar_t h)
273 {
274   xbt_dynar_free(&h);
275 }
276
277 int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
278 {
279   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
280   msg_task_t task = NULL;
281   char **cur = (char**)xbt_dynar_iterator_next(it);
282   const char *me = MSG_host_get_name(MSG_host_self());
283   const char *current_host = NULL;
284   const char *prev = NULL;
285   const char *next = NULL;
286   const char *last = NULL;
287
288   /* Build the chain if there's at least one peer */
289   if (cur != NULL) {
290     /* init: prev=NULL, host=current cur, next=next cur */
291     next = *cur;
292     *first = next;
293
294     /* This iterator iterates one step ahead: cur is current iterated element, 
295        but it's actually the next one in the chain */
296     do {
297       /* following steps: prev=last, host=next, next=cur */
298       cur = (char**)xbt_dynar_iterator_next(it);
299       prev = last;
300       current_host = next;
301       if (cur != NULL)
302         next = *cur;
303       else
304         next = NULL;
305       //XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
306     
307       /* Send message to current peer */
308       task = task_message_chain_new(me, current_host, prev, next);
309       //MSG_task_set_category(task, current_host);
310       MSG_task_send(task, current_host);
311
312       last = current_host;
313     } while (cur != NULL);
314   }
315   xbt_dynar_iterator_delete(it);
316
317   return MSG_OK;
318 }
319
320 int broadcaster_send_file(const char *first)
321 {
322   const char *me = MSG_host_get_name(MSG_host_self());
323   msg_task_t task = NULL;
324   msg_comm_t comm = NULL;
325   int status;
326
327   int piece_count = PIECE_COUNT;
328   int cur = 0;
329
330   for (; cur < piece_count; cur++) {
331     /* TODO: stub */
332     task = task_message_data_new(me, first, NULL, 0);
333     //XBT_INFO("Sending (isend) from %s into mailbox %s", me, first);
334     //comm = MSG_task_isend(task, first);
335     status = MSG_task_send(task, first);
336     //MSG_task_dsend(task, first, task_message_delete);
337    
338     //status = MSG_comm_wait(comm, -1);
339     xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
340     //MSG_comm_destroy(comm);
341   }
342
343   return MSG_OK;
344 }
345
346 int broadcaster_finish(xbt_dynar_t host_list)
347 {
348   xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
349   msg_task_t task = NULL;
350   const char *me = MSG_host_get_name(MSG_host_self());
351   const char *current_host = NULL;
352   char **cur = NULL;
353
354   /* Send goodbye message to every peer */
355   for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
356     /* Send message to current peer */
357     current_host = *cur;
358     task = task_message_end_data_new(me, current_host);
359     //MSG_task_set_category(task, current_host);
360     MSG_task_send(task, current_host);
361   }
362
363   return MSG_OK;
364 }
365
366
367 /** Emitter function  */
368 int broadcaster(int argc, char *argv[])
369 {
370   xbt_dynar_t host_list = NULL;
371   const char *first = NULL;
372   int status = !MSG_OK;
373
374   XBT_INFO("broadcaster");
375
376   /* Check that every host given by the hostcount in argv[1] exists and add it
377      to a dynamic array */
378   host_list = build_hostlist_from_hostcount(atoi(argv[1]));
379   /*host_list = build_hostlist_from_argv(argc, argv);*/
380   
381   /* TODO: Error checking */
382   status = broadcaster_build_chain(&first, host_list);
383   status = broadcaster_send_file(first);
384   status = broadcaster_finish(host_list);
385
386   delete_hostlist(host_list);
387
388   return status;
389 }
390
391 /*******************************************************
392  *                     Peer                            *
393  *******************************************************/
394
395 void peer_init_chain(peer_t peer, message_t msg)
396 {
397   peer->prev = msg->prev_hostname;
398   peer->next = msg->next_hostname;
399   peer->init = 1;
400 }
401
402 void peer_forward_msg(peer_t peer, message_t msg)
403 {
404   int status;
405   msg_task_t task = task_message_data_new(peer->me, peer->next, NULL, 0);
406   msg_comm_t comm = NULL;
407   //XBT_INFO("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
408   comm = MSG_task_isend(task, peer->next);
409   queue_pending_connection(comm, peer->pending_sends);
410 }
411
412 int peer_execute_task(peer_t peer, msg_task_t task)
413 {
414   int done = 0;
415   message_t msg = MSG_task_get_data(task);
416   
417   //XBT_INFO("Peer %s got message of type %d\n", peer->me, msg->type);
418   switch (msg->type) {
419     case MESSAGE_BUILD_CHAIN:
420       peer_init_chain(peer, msg);
421       break;
422     case MESSAGE_SEND_DATA:
423       xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
424       if (peer->next != NULL)
425         peer_forward_msg(peer, msg);
426       peer->pieces++;
427       break;
428     case MESSAGE_END_DATA:
429       xbt_assert(peer->init, __FILE__ ": peer_execute_task() failed: got msg_type %d before initialization", msg->type);
430       done = 1;
431       XBT_INFO("%d pieces receieved", peer->pieces);
432       break;
433   }
434
435   MSG_task_execute(task);
436
437   return done;
438 }
439
440 msg_error_t peer_wait_for_message(peer_t peer)
441 {
442   msg_error_t status;
443   msg_comm_t comm = NULL;
444   msg_task_t task = NULL;
445   int done = 0;
446
447   while (!done) {
448     if (comm == NULL)
449       comm = MSG_task_irecv(&task, peer->me);
450
451     if (MSG_comm_test(comm)) {
452       status = MSG_comm_get_status(comm);
453       //XBT_INFO("peer_wait_for_message: error code = %d", status);
454       xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
455       MSG_comm_destroy(comm);
456       comm = NULL;
457       done = peer_execute_task(peer, task);
458       task_message_delete(task);
459       task = NULL;
460     } else {
461       process_pending_connections(peer->pending_sends);
462       MSG_process_sleep(0.01);
463     }
464   }
465
466   return status;
467 }
468
469 void peer_init(peer_t p)
470 {
471   p->init = 0;
472   p->prev = NULL;
473   p->next = NULL;
474   p->pieces = 0;
475   p->close_asap = 0;
476   p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
477   p->me = MSG_host_get_name(MSG_host_self());
478 }
479
480 void peer_shutdown(peer_t p)
481 {
482   float start_time = MSG_get_clock();
483   float end_time = start_time + PEER_SHUTDOWN_DEADLINE;
484
485   XBT_INFO("Waiting for sends to finish before shutdown...");
486   while (xbt_dynar_length(p->pending_sends) && MSG_get_clock() < end_time) {
487     process_pending_connections(p->pending_sends);
488     MSG_process_sleep(0.1);
489   }
490
491   xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
492   xbt_dynar_free(&p->pending_sends);
493
494   xbt_free(p);
495 }
496
497 /** Peer function  */
498 int peer(int argc, char *argv[])
499 {
500   peer_t p = xbt_new(s_peer_t, 1);
501   msg_error_t status;
502
503   XBT_INFO("peer");
504
505   peer_init(p);
506   status = peer_wait_for_message(p);
507   peer_shutdown(p);
508
509   return MSG_OK;
510 }                               /* end_of_receiver */
511
512
513 /** Test function */
514 msg_error_t test_all(const char *platform_file,
515                      const char *application_file)
516 {
517
518   msg_error_t res = MSG_OK;
519
520
521
522   XBT_INFO("test_all");
523
524   /*  Simulation setting */
525   MSG_create_environment(platform_file);
526
527   /* Trace categories */
528   TRACE_category_with_color("host0", "0 0 1");
529   TRACE_category_with_color("host1", "0 1 0");
530   TRACE_category_with_color("host2", "0 1 1");
531   TRACE_category_with_color("host3", "1 0 0");
532   TRACE_category_with_color("host4", "1 0 1");
533   TRACE_category_with_color("host5", "1 1 0");
534
535   /*   Application deployment */
536   MSG_function_register("broadcaster", broadcaster);
537   MSG_function_register("peer", peer);
538
539   MSG_launch_application(application_file);
540
541   res = MSG_main();
542
543   return res;
544 }                               /* end_of_test_all */
545
546
547 /** Main function */
548 int main(int argc, char *argv[])
549 {
550   msg_error_t res = MSG_OK;
551
552 #ifdef _MSC_VER
553   unsigned int prev_exponent_format =
554       _set_output_format(_TWO_DIGIT_EXPONENT);
555 #endif
556
557   MSG_init(&argc, argv);
558
559   /*if (argc <= 3) {
560     XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
561               argv[0]);
562     XBT_CRITICAL
563         ("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
564          argv[0]);
565     exit(1);
566   }*/
567
568   /* Options for the workstation/model:
569
570      KCCFLN05              => for maxmin
571      KCCFLN05_proportional => for proportional (Vegas)
572      KCCFLN05_Vegas        => for TCP Vegas
573      KCCFLN05_Reno         => for TCP Reno
574    */
575   //MSG_config("workstation/model", argv[3]);
576
577   res = test_all(argv[1], argv[2]);
578
579   XBT_INFO("Total simulation time: %le", MSG_get_clock());
580
581   MSG_clean();
582
583 #ifdef _MSC_VER
584   _set_output_format(prev_exponent_format);
585 #endif
586
587   if (res == MSG_OK)
588     return 0;
589   else
590     return 1;
591 }                               /* end_of_main */