1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2 * Copyright (c) 2012. Maximiliano Geier.
3 * All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "msg/msg.h" /* Yeah! If you want to use msg, you need to include msg/msg.h */
12 #include "xbt/sysdep.h" /* calloc */
14 /* Create a log channel to have nice outputs. */
16 #include "xbt/asserts.h"
18 /** @addtogroup MSG_examples
20 * - <b>kadeploy/kadeploy.c: Kadeploy implementation</b>.
// Declares msg_kadeploy as the default log category of this file; the string is its description.
24 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_kadeploy,
25 "Messages specific for kadeploy");
// Passed as the third argument of MSG_task_create() for every task built by task_message_new().
27 #define MESSAGE_SIZE 1
// Number of chars allocated for each generated "host%d" name in build_hostlist_from_hostcount().
28 #define HOSTNAME_LENGTH 20
34 /* Random iterator for xbt_dynar */
// Iterator state for walking an xbt_dynar in an order chosen by a callback.
// The functions below also use it->list, it->length and it->current.
35 typedef struct xbt_dynar_iterator_struct {
// Dynar of int (created with sizeof(int)); xbt_dynar_iterator_next() pushes every index it hands out.
37 xbt_dynar_t indices_list;
// Callback invoked with the iterator itself; its int result is used as the next index.
40 int (*criteria_fn)(void* it);
// xbt_dynar_iterator_t is a POINTER to the struct; xbt_dynar_iterator_s names the struct type.
41 } *xbt_dynar_iterator_t;
42 typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
// Enumerator with explicit value 0; task_message_chain_new() passes it as the
// e_message_type argument of task_message_new().
46 MESSAGE_BUILD_CHAIN = 0,
// Payload attached to each message task. task_message_new() allocates it; the
// specialised constructors then fill in the members for their message kind.
52 typedef struct s_message {
// Sender host name; task_message_new() stores the pointer it is given (no copy).
54 const char *issuer_hostname;
// Chain neighbours, set by task_message_chain_new().
56 const char *prev_hostname;
57 const char *next_hostname;
// Data pointer and its length, set by task_message_data_new().
58 const char *data_block;
59 unsigned int data_length;
// s_message_t is the struct type, message_t a pointer to it.
60 } s_message_t, *message_t;
// Forward declarations for functions defined further down in this file.
62 /* Iterator methods */
63 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*));
64 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
65 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
66 int xbt_dynar_iterator_forward_criteria(void *p);
// Message-task constructors (one generic, three specialised) and the matching destructor.
69 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox);
70 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next);
71 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len);
72 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox);
73 void task_message_delete(void *);
// Process entry points; test_all() registers both with MSG_function_register().
76 int broadcaster(int argc, char *argv[]);
77 int peer(int argc, char *argv[]);
// Host-list builder used by broadcaster(); the argv-based variant is commented out.
79 xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
80 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
// The three phases broadcaster() runs in this order: build chain, send file, finish.
82 /* Broadcaster: helper functions */
83 int broadcaster_build_chain(xbt_dynar_t host_list);
84 int broadcaster_send_file(xbt_dynar_t host_list);
85 int broadcaster_finish(xbt_dynar_t host_list);
// NOTE(review): empty parentheses leave the parameter list unspecified before C23;
// (void) would turn this into a real prototype.
87 /* Peer: helper functions */
88 int peer_wait_for_init();
90 /* Initialization stuff */
91 msg_error_t test_all(const char *platform_file,
92 const char *application_file);
// Creates an iterator over list driven by criteria_fn.
// Contract of criteria_fn (from the original note below): it advances the iterator
// and returns the index of the next element, or a negative value when there is none.
94 /* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
95 criteria_fn: given an iterator, it must update the iterator and give the next element's index,
96 less than 0 otherwise*/
97 xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, int (*criteria_fn)(void*))
// Allocate one iterator struct.
99 xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
// The list length is captured once, at creation time.
102 it->length = xbt_dynar_length(list);
// Empty dynar of int that will record the indices handed out by xbt_dynar_iterator_next().
103 it->indices_list = xbt_dynar_new(sizeof(int), NULL);
104 it->criteria_fn = criteria_fn;
108 /* Returns the next element iterated by iterator it, NULL if there are no more elements */
109 void *xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
// Ask the criteria callback for the next index, passing the iterator itself.
111 int next = it->criteria_fn((xbt_dynar_iterator_t)it);
112 XBT_INFO("%d current\n", next);
// Presumably logged on the no-more-elements path described in the header comment.
114 XBT_INFO("Nothing to return!\n");
// Record the index, then return a pointer INTO the dynar (the address of slot next),
// not the stored value; callers in this file cast the result to char**.
117 xbt_dynar_push(it->indices_list, &next);
118 return xbt_dynar_get_ptr(it->list, next);
// Releases the iterator's resources. The step shown here frees the container of the
// visited-index dynar created in xbt_dynar_iterator_new().
122 void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
124 xbt_dynar_free_container(&(it->indices_list));
// Criteria callback handed to xbt_dynar_iterator_new() by broadcaster_build_chain()
// and broadcaster_finish().
128 int xbt_dynar_iterator_forward_criteria(void *p)
// The untyped argument is the iterator itself.
130 xbt_dynar_iterator_t it = (xbt_dynar_iterator_t)p;
// current == -1 is the not-yet-started state of the iterator.
132 if (it->current == -1) {
133 /* iterator initialization */
// Bounds check against the length cached when the iterator was created.
136 if (it->current < it->length) {
// Generic message constructor: allocates an s_message_t, records the issuer and the
// mailbox, and wraps the message in a new MSG task. Both strings are stored by pointer,
// so they are not copied here.
144 msg_task_t task_message_new(e_message_type type, const char *issuer_hostname, const char *mailbox)
146 message_t msg = xbt_new(s_message_t, 1);
148 msg->issuer_hostname = issuer_hostname;
149 msg->mailbox = mailbox;
// Task created with a NULL first argument, 0, MESSAGE_SIZE, and msg as the attached data;
// the specialised constructors read msg back through MSG_task_get_data().
150 msg_task_t task = MSG_task_create(NULL, 0, MESSAGE_SIZE, msg);
// Builds a MESSAGE_BUILD_CHAIN task telling a host which hosts come before and after it.
// prev and next are stored by pointer; broadcaster_build_chain() starts with both NULL.
155 msg_task_t task_message_chain_new(const char *issuer_hostname, const char *mailbox, const char* prev, const char *next)
157 msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, issuer_hostname, mailbox);
// Retrieve the payload allocated by task_message_new() to fill in the chain fields.
158 message_t msg = MSG_task_get_data(task);
159 msg->prev_hostname = prev;
160 msg->next_hostname = next;
// Builds a MESSAGE_SEND_DATA task carrying a data block.
// block is stored by pointer (not copied) together with its length len.
165 msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
167 msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
// Retrieve the payload allocated by task_message_new() to fill in the data fields.
168 message_t msg = MSG_task_get_data(task);
169 msg->data_block = block;
170 msg->data_length = len;
// Builds a MESSAGE_END_DATA task; it sets nothing beyond what task_message_new() sets.
// broadcaster_finish() sends one of these to every host.
175 msg_task_t task_message_end_data_new(const char *issuer_hostname, const char *mailbox)
177 return task_message_new(MESSAGE_END_DATA, issuer_hostname, mailbox);
// Tears down a message task. The parameter is declared void* but is used as an
// msg_task_t: it is passed directly to MSG_task_get_data() and MSG_task_destroy().
181 void task_message_delete(void *task)
// Fetch the s_message_t payload that task_message_new() attached to the task.
183 message_t msg = MSG_task_get_data(task);
185 MSG_task_destroy(task);
// Builds a dynar of generated host names of the form "host<i>", looping while
// i < hostcount + 1, and looks each name up with MSG_get_host_by_name().
188 xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
// Elements are char* values; no per-element free function is registered (NULL).
190 xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
191 char *hostname = NULL;
195 for (; i < hostcount+1; i++) {
// A fresh buffer per host: the dynar keeps the pointer, so the buffer is not reused.
196 hostname = xbt_new(char, HOSTNAME_LENGTH);
// snprintf limits the write to HOSTNAME_LENGTH chars including the terminator.
197 snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
198 XBT_INFO("%s", hostname);
199 h = MSG_get_host_by_name(hostname);
// Presumably reported when the lookup above finds no such host.
201 XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
// The address of hostname is passed, so the element stored is the pointer itself.
204 xbt_dynar_push(host_list, &hostname);
210 /*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[])
212 xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), NULL);
216 for (; i < argc; i++) {
217 XBT_INFO("host%d = %s", i, argv[i]);
218 h = MSG_get_host_by_name(argv[i]);
220 XBT_INFO("Unknown host %s. Stopping Now! ", argv[i]);
223 xbt_dynar_push(host_list, &(argv[i]));
// broadcaster() calls this on its host list after the three broadcast phases have run.
229 void delete_hostlist(xbt_dynar_t h)
// First broadcast phase: walks host_list and sends each host a chain message
// (task_message_chain_new) naming its previous and next host in the chain.
234 int broadcaster_build_chain(xbt_dynar_t host_list)
236 xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
237 msg_task_t task = NULL;
// Prime the walk: cur is the address of the first list slot, or NULL when the
// iterator has nothing to return.
238 char **cur = (char**)xbt_dynar_iterator_next(it);
239 const char *current_host = NULL;
240 const char *prev = NULL;
241 const char *next = NULL;
// Name of the host running this process; used as the issuer of every message.
242 const char *me = MSG_host_get_name(MSG_host_self());
243 const char *last = NULL;
245 /* Build the chain if there's at least one peer */
247 /* init: prev=NULL, host=current cur, next=next cur */
250 /* This iterator iterates one step ahead: cur is current iterated element,
251 but it's actually the next one in the chain */
253 /* following steps: prev=last, host=next, next=cur */
254 cur = (char**)xbt_dynar_iterator_next(it);
261 XBT_INFO("Building chain -- broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
263 /* Send message to current peer */
// The destination host name is used both as the message's mailbox field and as the
// mailbox the task is sent to.
264 task = task_message_chain_new(me, current_host, prev, next);
265 MSG_task_send(task, current_host);
// Loop ends once the look-ahead iterator has returned NULL.
268 } while (cur != NULL);
270 xbt_dynar_iterator_delete(it);
// Second broadcast phase: broadcaster() calls it between broadcaster_build_chain()
// and broadcaster_finish().
275 int broadcaster_send_file(xbt_dynar_t host_list)
// Last broadcast phase: sends a MESSAGE_END_DATA task (task_message_end_data_new)
// to every host in host_list.
282 int broadcaster_finish(xbt_dynar_t host_list)
284 xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, xbt_dynar_iterator_forward_criteria);
285 msg_task_t task = NULL;
// Name of the host running this process; used as the issuer of every message.
286 const char *me = MSG_host_get_name(MSG_host_self());
287 const char *current_host = NULL;
290 /* Send goodbye message to every peer */
// The iterator yields the address of each list slot and NULL at the end.
291 for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
292 /* Send message to current peer */
// The destination host name is also the mailbox the task is sent to.
294 task = task_message_end_data_new(me, current_host);
295 MSG_task_send(task, current_host);
// Entry point of the broadcaster process (registered as "broadcaster" in test_all()).
// Builds the host list from the count in argv[1], then runs the three phases in order:
// build chain, send file, finish.
302 /** Emitter function */
303 int broadcaster(int argc, char *argv[])
305 xbt_dynar_t host_list = NULL;
306 const char *first = NULL;
307 int status = !MSG_OK;
309 XBT_INFO("broadcaster");
311 /* Check that every host given by the hostcount in argv[1] exists and add it
312 to a dynamic array */
// NOTE(review): no argc check is visible before argv[1] is read, and atoi() cannot
// report a malformed number; confirm the deployment always supplies it.
313 host_list = build_hostlist_from_hostcount(atoi(argv[1]));
314 /*host_list = build_hostlist_from_argv(argc, argv);*/
// NOTE(review): each assignment overwrites the previous status, so only the result of
// broadcaster_finish() survives these three lines (see the TODO below).
316 /* TODO: Error checking */
317 status = broadcaster_build_chain(host_list);
318 status = broadcaster_send_file(host_list);
319 status = broadcaster_finish(host_list);
321 delete_hostlist(host_list);
324 /*time = MSG_get_clock();
325 sprintf(sprintf_buffer_la, "latency task");
327 MSG_task_create(sprintf_buffer_la, 0.0, task_comm_size_lat, NULL);
328 task_la->data = xbt_new(double, 1);
329 *(double *) task_la->data = time;
330 XBT_INFO("task_la->data = %le", *((double *) task_la->data));
331 MSG_task_send(task_la, argv[1]);*/
334 /*time = MSG_get_clock();
335 sprintf(sprintf_buffer_bw, "bandwidth task");
337 MSG_task_create(sprintf_buffer_bw, 0.0, task_comm_size_bw, NULL);
338 task_bw->data = xbt_new(double, 1);
339 *(double *) task_bw->data = time;
340 XBT_INFO("task_bw->data = %le", *((double *) task_bw->data));
341 MSG_task_send(task_bw, argv[1]);
// Receives one task on the mailbox named after the current host, logs the event and
// disposes of the task with task_message_delete().
346 int peer_wait_for_init()
348 msg_task_t task = NULL;
// The mailbox name is this host's own name, matching what the broadcaster sends to.
349 const char *me = MSG_host_get_name(MSG_host_self());
// NOTE(review): a holds the receive result; whether it is checked before task is
// used is not visible here and should be confirmed.
351 int a = MSG_task_receive(&task, me);
354 XBT_INFO("Peer %s got message\n", me);
357 task_message_delete(task);
// Entry point of a peer process (registered as "peer" in test_all()).
// The live code shown records the simulated clock and waits for one message through
// peer_wait_for_init(); the latency/bandwidth measurement code is commented out.
363 int peer(int argc, char *argv[])
// NOTE(review): time1, sender_time, task_la, task_bw and communication_time appear
// only in the commented-out measurement code below, as far as this function shows.
365 double time, time1, sender_time;
366 msg_task_t task_la = NULL;
367 msg_task_t task_bw = NULL;
369 double communication_time = 0;
373 time = MSG_get_clock();
// Wait for a message on this host's mailbox.
375 a = peer_wait_for_init();
377 /*a = MSG_task_receive(&task_la,MSG_host_get_name(MSG_host_self()));
379 time1 = MSG_get_clock();
380 sender_time = *((double *) (task_la->data));
382 communication_time = time1 - time;
383 XBT_INFO("Task received : %s", task_la->name);
384 xbt_free(task_la->data);
385 MSG_task_destroy(task_la);
386 XBT_INFO("Communic. time %le", communication_time);
387 XBT_INFO("--- la %f ----", communication_time);
389 xbt_die("Unexpected behavior");
394 /*a = MSG_task_receive(&task_bw,MSG_host_get_name(MSG_host_self()));
396 time1 = MSG_get_clock();
397 sender_time = *((double *) (task_bw->data));
399 communication_time = time1 - time;
400 XBT_INFO("Task received : %s", task_bw->name);
401 xbt_free(task_bw->data);
402 MSG_task_destroy(task_bw);
403 XBT_INFO("Communic. time %le", communication_time);
404 XBT_INFO("--- bw %f ----", task_comm_size_bw / communication_time);
406 xbt_die("Unexpected behavior");
411 } /* end_of_receiver */
// Sets up the simulation: loads the platform description, registers the two process
// functions under the names "broadcaster" and "peer", then deploys the application
// described by application_file.
415 msg_error_t test_all(const char *platform_file,
416 const char *application_file)
419 msg_error_t res = MSG_OK;
423 XBT_INFO("test_all");
425 /* Simulation setting */
426 MSG_create_environment(platform_file);
// Registration comes before MSG_launch_application() so both names are known at launch.
428 /* Application deployment */
429 MSG_function_register("broadcaster", broadcaster);
430 MSG_function_register("peer", peer);
432 MSG_launch_application(application_file);
437 } /* end_of_test_all */
// Program entry point: initialises MSG, can emit a usage message through XBT_CRITICAL,
// runs the simulation with test_all(argv[1], argv[2]) and logs the simulated clock.
441 int main(int argc, char *argv[])
443 msg_error_t res = MSG_OK;
// _set_output_format and _TWO_DIGIT_EXPONENT belong to the Microsoft C runtime.
// NOTE(review): presumably wrapped in a platform #ifdef; confirm.
446 unsigned int prev_exponent_format =
447 _set_output_format(_TWO_DIGIT_EXPONENT);
// MSG_init receives the address of argc, so it may adjust the argument count.
450 MSG_init(&argc, argv);
454 XBT_CRITICAL("Usage: %s platform_file deployment_file <model>\n",
457 ("example: %s msg_platform.xml msg_deployment.xml KCCFLN05_Vegas\n",
462 /* Options for the workstation/model:
464 KCCFLN05 => for maxmin
465 KCCFLN05_proportional => for proportional (Vegas)
466 KCCFLN05_Vegas => for TCP Vegas
467 KCCFLN05_Reno => for TCP Reno
469 //MSG_config("workstation/model", argv[3]);
// argv[1] is the platform file and argv[2] the deployment file, per the usage text.
471 res = test_all(argv[1], argv[2]);
473 XBT_INFO("Total simulation time: %le", MSG_get_clock());
// Restores the exponent format saved at the top of main().
478 _set_output_format(prev_exponent_format);