3 /* amok_bandwidth - Bandwidth tests facilities */
5 /* Copyright (c) 2003-6 Martin Quinson. */
6 /* Copyright (c) 2006 Ahmed Harbaoui. */
7 /* All rights reserved. */
9 /* This program is free software; you can redistribute it and/or modify it
10 * under the terms of the license (GNU LGPL) which comes with this package. */
13 #include "amok/Bandwidth/bandwidth_private.h"
14 #include "gras/messages.h"
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
19 /******************************
20 * Stuff global to the module *
21 ******************************/
/* Module initialization counter: amok_bw_init() increments it and
 * amok_bw_exit() decrements it; both test it for zero ("not initialized").
 * NOTE(review): identifiers starting with '_' are reserved at file scope by
 * the C standard; consider renaming when the whole file can be touched. */
23 static short _amok_bw_initialized = 0;
25 /** @brief module initialization; all participating nodes must run this
 *
 * The setup work is guarded by _amok_bw_initialized being zero, and the
 * counter is incremented so that amok_bw_exit() can decrement it again.
 * NOTE(review): the setup statements themselves (and whether the increment
 * sits inside or after the guard) are not visible in this listing.
 */
26 void amok_bw_init(void) {
30 if (! _amok_bw_initialized) {
38 _amok_bw_initialized++;
41 /** @brief module finalization
 *
 * Counterpart of amok_bw_init(): tests whether the module is initialized
 * and decrements _amok_bw_initialized.
 * NOTE(review): the statement guarded by the "not initialized" test and any
 * teardown steps are not visible in this listing; presumably it returns
 * early when the module was never initialized — confirm.
 */
42 void amok_bw_exit(void) {
43 if (! _amok_bw_initialized)
49 _amok_bw_initialized--;
52 /* ***************************************************************************
54 * ***************************************************************************/
/* Message callbacks, registered by amok_bw_bw_join(): "BW handshake" is
 * served by amok_bw_cb_bw_handshake and "BW request" by amok_bw_cb_bw_request.
 * Declared static here, so the later definitions (written without 'static')
 * also get internal linkage. */
55 static int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx, void *payload);
56 static int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx, void *payload);
/* Declares the data descriptions and message types of the bandwidth protocol:
 *  - bw_request_t: reference to struct s_bw_request_t with fields
 *    peer (s_xbt_peer_t), buf_size, exp_size, msg_size (unsigned long int)
 *    and min_duration (double);
 *  - bw_res_t: reference to struct s_bw_res_t with fields
 *    timestamp (unsigned int), seconds and bw (double);
 *  - "BW handshake": RPC, request bw_request_t, answer bw_request_t;
 *  - "BW reask":     RPC, request bw_request_t, no answer payload;
 *  - "BW stop":      one-way message, no payload;
 *  - "BW request":   RPC, request bw_request_t, answer bw_res_t.
 * NOTE(review): the field order/types presumably have to mirror the C
 * structs in bandwidth_private.h — keep them in sync. The empty '()'
 * parameter list is an old-style declaration; '(void)' would give a prototype.
 */
58 void amok_bw_bw_init() {
59 gras_datadesc_type_t bw_request_desc, bw_res_desc;
61 /* Build the Bandwidth datatype descriptions */
62 bw_request_desc = gras_datadesc_struct("s_bw_request_t");
63 gras_datadesc_struct_append(bw_request_desc,"peer",
64 gras_datadesc_by_name("s_xbt_peer_t"));
65 gras_datadesc_struct_append(bw_request_desc,"buf_size",
66 gras_datadesc_by_name("unsigned long int"));
67 gras_datadesc_struct_append(bw_request_desc,"exp_size",
68 gras_datadesc_by_name("unsigned long int"));
69 gras_datadesc_struct_append(bw_request_desc,"msg_size",
70 gras_datadesc_by_name("unsigned long int"));
71 gras_datadesc_struct_append(bw_request_desc,"min_duration",
72 gras_datadesc_by_name("double"));
73 gras_datadesc_struct_close(bw_request_desc);
/* Messages carry a reference (bw_request_t) to the struct, not the struct. */
74 bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
76 bw_res_desc = gras_datadesc_struct("s_bw_res_t");
77 gras_datadesc_struct_append(bw_res_desc,"timestamp",gras_datadesc_by_name("unsigned int"));
78 gras_datadesc_struct_append(bw_res_desc,"seconds",gras_datadesc_by_name("double"));
79 gras_datadesc_struct_append(bw_res_desc,"bw",gras_datadesc_by_name("double"));
80 gras_datadesc_struct_close(bw_res_desc);
81 bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
/* Handshake: the answer echoes the request with our measurement port. */
83 gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
/* Reask/stop: sent by amok_bw_test() after a run too short / long enough. */
85 gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
86 gras_msgtype_declare("BW stop", NULL);
/* Request: ask a remote process to run a test towards a third party. */
88 gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
/* Makes this process answer bandwidth tests: registers the callbacks for
 * incoming "BW request" and "BW handshake" messages.
 * amok_bw_bw_leave() unregisters them. */
90 void amok_bw_bw_join() {
91 gras_cb_register(gras_msgtype_by_name("BW request"),
92 &amok_bw_cb_bw_request);
93 gras_cb_register(gras_msgtype_by_name("BW handshake"),
94 &amok_bw_cb_bw_handshake);
/* Undoes amok_bw_bw_join(): unregisters the "BW request" and
 * "BW handshake" callbacks from this process. */
96 void amok_bw_bw_leave() {
97 gras_cb_unregister(gras_msgtype_by_name("BW request"),
98 &amok_bw_cb_bw_request);
99 gras_cb_unregister(gras_msgtype_by_name("BW handshake"),
100 &amok_bw_cb_bw_handshake);
104 * \brief bandwidth measurement between localhost and \e peer
106 * \arg peer: A (regular) socket at which the host with which we should conduct the experiment can be contacted
107 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
108 * \arg exp_size: Total size of data sent across the network
109 * \arg msg_size: Size of each message sent. I.e., (\e exp_size / \e msg_size) messages will be sent.
110 * \arg min_duration: The minimum wanted duration. When the test message is too small, you tend to measure the latency. This argument allows you to force the test to take at least, say, one second.
111 * \arg sec: where the result (in seconds) should be stored. If the experiment was done several times because the first one was too short, this is the timing of the last run only.
112 * \arg bw: observed Bandwidth (in byte/s)
114 * Conduct a bandwidth test from the local process to the given peer.
115 * This call is blocking until the end of the experiment.
117 * If the requested experiment lasts less than \a min_duration, another one will be
118 * launched. Sizes (both \a exp_size and \a msg_size) will be multiplied by
119 * (\a min_duration / measured_duration) (plus 10% to be sure to eventually
120 * reach the \a min_duration). In that case, the reported bandwidth and
121 * duration are the ones of the last run. \a msg_size cannot go over 64MiB
122 * because we need to malloc a block of this size in RL to conduct the
123 * experiment, and we don't want to hit the swap.
125 * Results are reported in the last args, and sizes are in bytes.
/* Client side of a bandwidth test towards `peer`.
 *
 * Opens a local measurement server socket, handshakes with the peer to learn
 * its measurement port, connects both measurement channels, then sends
 * request->exp_size bytes and waits for a small acknowledgement. While a run
 * is shorter than min_duration the sizes are scaled up, the peer is told via
 * "BW reask", and the run is repeated. Finally "BW stop" is sent and every
 * measurement socket is closed. *sec and *bw describe the last run only.
 */
127 void amok_bw_test(gras_socket_t peer,
128 unsigned long int buf_size,
129 unsigned long int exp_size,
130 unsigned long int msg_size,
132 /*OUT*/ double *sec, double *bw) {
134 /* Measurement sockets for the experiments */
135 gras_socket_t measMasterIn=NULL,measIn,measOut=NULL;
137 bw_request_t request,request_ack;
/* Probe ports 5000..9999 for a free measurement server socket. The loop
 * header already advances `port`, so the call uses `port` unchanged;
 * pre-incrementing it here (as before) skipped every even port. The last
 * port tried is still 10000-1, which the failure check below relies on. */
140 for (port = 5000; port < 10000 && measMasterIn == NULL; port++) {
142 measMasterIn = gras_socket_server_ext(port,buf_size,1);
145 if (port == 10000 -1) {
146 RETHROW0("Error caught while opening a measurement socket: %s");
153 request=xbt_new0(s_bw_request_t,1);
154 request->buf_size=buf_size;
155 request->exp_size=exp_size;
156 request->msg_size=msg_size;
/* No name needed: the handshake callback connects back to the host the
 * handshake came from; only our measurement port is transmitted. */
157 request->peer.name = NULL;
158 request->peer.port = gras_socket_my_port(measMasterIn);
159 DEBUG5("Handshaking with %s:%d to connect it back on my %d (bufsize=%lu byte= %lu b)",
160 gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->peer.port,
161 buf_size,request->buf_size);
/* The answer (request_ack) carries the peer's measurement port. */
164 gras_msg_rpccall(peer,15,
165 gras_msgtype_by_name("BW handshake"),&request, &request_ack);
167 RETHROW0("Error encountered while sending the BW request: %s");
169 measIn = gras_socket_meas_accept(measMasterIn);
172 measOut=gras_socket_client_ext(gras_socket_peer_name(peer),
173 request_ack->peer.port,
174 request->buf_size,1);
176 RETHROW2("Error encountered while opening the measurement socket to %s:%d for BW test: %s",
177 gras_socket_peer_name(peer),request_ack->peer.port);
179 DEBUG1("Got ACK; conduct the experiment (msg_size=%lu)",request->msg_size);
/* Previous run was shorter than min_duration: scale both sizes by the
 * missing ratio plus 10%, and cap msg_size at 64MiB.
 * NOTE(review): a measured duration of 0 would make the ratio infinite, and
 * converting that back to unsigned long is undefined behaviour; confirm the
 * timer resolution rules this out. */
184 double meas_duration=*sec;
185 request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
186 request->msg_size = request->msg_size * (min_duration / meas_duration) * 1.1;
187 if (request->msg_size > 64*1024*1024)
188 request->msg_size = 64*1024*1024;
/* *bw still holds the bandwidth of the run just judged too short (it is set
 * at the end of every pass). Dividing the initial exp_size by the last
 * duration, as before, was wrong from the second re-run on. */
190 VERB5("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%lu msg_size=%lu (got %fkb/s)",
191 meas_duration,min_duration,request->exp_size,request->msg_size,*bw/1024);
192 gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
197 gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
198 DEBUG0("Data sent. Wait ACK");
199 gras_socket_meas_recv(measIn,120,1,1);
/* Same cleanup as the normal exit: measIn may be the very same socket as
 * measMasterIn, in which case it must be closed only once. */
201 gras_socket_close(measOut);
202 if (measIn != measMasterIn) gras_socket_close(measIn);
203 gras_socket_close(measMasterIn);
204 RETHROW0("Unable to conduct the experiment: %s");
206 DEBUG0("Experiment done");
208 *sec = gras_os_time() - *sec;
209 *bw = ((double)request->exp_size) / *sec;
210 } while (*sec < min_duration);
212 DEBUG2("This measurement was long enough (%f sec; found %f b/s). Stop peer",
214 gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
/* measIn may alias measMasterIn: close each distinct socket exactly once. */
218 if (measIn != measMasterIn)
219 gras_socket_close(measIn);
220 gras_socket_close(measMasterIn);
221 gras_socket_close(measOut);
225 /* Callback to the "BW handshake" message:
226 opens a server measurement socket,
227 indicates its port in the answer to the "BW handshake" RPC,
228 receives the corresponding data on the measurement socket,
229 closes the measurement socket
/* "BW handshake" RPC callback: serving side of amok_bw_test().
 *
 * Visible steps:
 *  - opens a measurement server socket on the first port from 6000 on that
 *    can be bound, and answers the handshake RPC with that port;
 *  - opens a measurement client socket back to the caller's host;
 *  - accepts the caller's measurement connection;
 *  - calls gras_socket_meas_recv with request->exp_size/request->msg_size,
 *    then gras_socket_meas_send(measOut,120,1,1) as the acknowledgement;
 *  - waits up to 60 for "BW stop" or "BW reask";
 *  - finally closes the measurement sockets.
 *
 * NOTE(review): the TRY/CATCH keywords, the loop presumably repeating the
 * experiment after "BW reask", the connect-back port argument, the return
 * statement and the closing brace are not visible in this listing.
 */
233 int amok_bw_cb_bw_handshake(gras_msg_cb_ctx_t ctx,
235 gras_socket_t expeditor = gras_msg_cb_ctx_from(ctx);
236 gras_socket_t measMasterIn=NULL,measIn=NULL,measOut=NULL;
/* payload points to the received bw_request_t (itself a pointer type). */
237 bw_request_t request=*(bw_request_t*)payload;
242 gras_msg_cb_ctx_t ctx_reask;
/* Function-static list of awaited message types, shared by all calls.
 * NOTE(review): the guard ensuring it is created only once is not visible. */
243 static xbt_dynar_t msgtwaited=NULL;
245 DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
246 gras_socket_peer_name(expeditor),request->peer.port,
247 request->buf_size,request->exp_size,request->msg_size);
249 /* Build our answer */
250 answer = xbt_new0(s_bw_request_t,1);
/* Bind the measurement server on the first usable port.
 * NOTE(review): the bound is inclusive here (port <= 10000) but exclusive
 * in amok_bw_test (port < 10000). */
252 for (port = 6000; port <= 10000 && measMasterIn == NULL; port++) {
254 measMasterIn = gras_socket_server_ext(port,request->buf_size,1);
260 /* FIXME: tell error to remote */
261 RETHROW0("Error encountered while opening a measurement server socket: %s");
/* The answer echoes the requested sizes plus our measurement port. */
265 answer->buf_size=request->buf_size;
266 answer->exp_size=request->exp_size;
267 answer->msg_size=request->msg_size;
268 answer->peer.port=gras_socket_my_port(measMasterIn);
271 gras_msg_rpcreturn(60,ctx,&answer);
273 gras_socket_close(measMasterIn);
274 /* FIXME: tell error to remote */
275 RETHROW0("Error encountered while sending the answer: %s");
279 /* Don't connect asap to leave time to other side to enter the accept() */
281 measOut = gras_socket_client_ext(gras_socket_peer_name(expeditor),
283 request->buf_size,1);
285 RETHROW2("Error encountered while opening a measurement socket back to %s:%d : %s",
286 gras_socket_peer_name(expeditor),request->peer.port);
287 /* FIXME: tell error to remote */
291 measIn = gras_socket_meas_accept(measMasterIn);
292 DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
293 answer->buf_size,answer->exp_size,answer->msg_size,answer->peer.port);
/* NOTE(review): this error path closes measMasterIn and measIn without the
 * aliasing check used in the final cleanup, and measIn may still be NULL if
 * the accept failed — confirm gras_socket_close tolerates both. */
295 gras_socket_close(measMasterIn);
296 gras_socket_close(measIn);
297 gras_socket_close(measOut);
298 /* FIXME: tell error to remote ? */
299 RETHROW0("Error encountered while opening the meas socket: %s");
/* NOTE(review): xbt_dynar_push() presumably copies elem_size bytes from the
 * address it is given; here the msgtype value itself is passed rather than
 * its address — confirm this matches xbt_dynar_push's contract. */
303 msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
304 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
305 xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
312 gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
313 gras_socket_meas_send(measOut,120,1,1);
/* NOTE(review): same unguarded close sequence as the error path above. */
315 gras_socket_close(measMasterIn);
316 gras_socket_close(measIn);
317 gras_socket_close(measOut);
318 /* FIXME: tell error to remote ? */
319 RETHROW0("Error encountered while receiving the experiment: %s");
/* Wait up to 60 for the next command. msggot is the index in msgtwaited
 * (0: "BW stop", 1: "BW reask", matching the push order above); the
 * callback's payload variable is reused to receive the new request. */
321 gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
323 case 0: /* BW stop */
326 case 1: /* BW reask */
/* payload presumably now holds the received bw_request_t value itself
 * (it was filled through &payload), hence no extra dereference here. */
329 request = (bw_request_t)payload;
330 VERB0("Return the reasking RPC");
331 gras_msg_rpcreturn(60,ctx_reask,NULL);
333 gras_msg_cb_ctx_free(ctx_reask);
/* measIn may be the same socket as measMasterIn: close it only once. */
336 if (measIn != measMasterIn)
337 gras_socket_close(measMasterIn);
338 gras_socket_close(measIn);
339 gras_socket_close(measOut);
342 VERB0("BW experiment done.");
347 * \brief request a bandwidth measurement between two remote peers
349 * \arg from_name: Name of the first peer
350 * \arg from_port: port on which the first process is listening for messages
351 * \arg to_name: Name of the second peer
352 * \arg to_port: port on which the second process is listening (for messages, do not
353 * give a measurement socket here. The needed measurement sockets will be created
354 * automatically and negotiated between the peers)
355 * \arg buf_size: Size of the socket buffer. If 0, a sane default is used (32k, but may change)
356 * \arg exp_size: Total size of data sent across the network
357 * \arg msg_size: Size of each message sent. (\e exp_size / \e msg_size) messages will be sent.
358 * \arg sec: where the result (in seconds) should be stored.
359 * \arg bw: observed Bandwidth (in byte/s)
361 * Conduct a bandwidth test from the process from_name:from_port to to_name:to_port.
362 * This call is blocking until the end of the experiment.
364 * Results are reported in last args, and sizes are in bytes.
/* Asks the process at from_name:from_port to run a bandwidth test towards
 * to_name:to_port (handled remotely by amok_bw_cb_bw_request) and logs the
 * returned duration and bandwidth.
 *
 * A failure of the RPC is re-thrown with value 1 and to_name when the caught
 * exception's value is 1 (the value amok_bw_cb_bw_request throws when its
 * test towards the destination fails), and with value 0 and from_name
 * otherwise.
 * NOTE(review): the min_duration parameter line, the copy of the result into
 * *sec / *bw and the freeing of request/result are not visible in this
 * listing. Ports are unsigned int but logged with %d; %u would match.
 */
366 void amok_bw_request(const char* from_name,unsigned int from_port,
367 const char* to_name,unsigned int to_port,
368 unsigned long int buf_size,
369 unsigned long int exp_size,
370 unsigned long int msg_size,
372 /*OUT*/ double *sec, double*bw) {
376 bw_request_t request;
379 request=xbt_new0(s_bw_request_t,1);
380 request->buf_size=buf_size;
381 request->exp_size=exp_size;
382 request->msg_size=msg_size;
383 request->min_duration = min_duration;
/* The cast only drops const for the struct field; the name is sent, not
 * modified, here. */
386 request->peer.name = (char*)to_name;
387 request->peer.port = to_port;
390 sock = gras_socket_client(from_name,from_port);
394 DEBUG4("Ask for a BW test between %s:%d and %s:%d", from_name,from_port, to_name,to_port);
/* Long timeout (20*60): the remote side runs a full amok_bw_test, which may
 * be repeated until min_duration is reached. */
396 gras_msg_rpccall(sock,20*60,gras_msgtype_by_name("BW request"), &request, &result);
/* NOTE(review): on this path sock is not closed on any visible line. */
398 if (e.value==1) THROW1(0,1,"%s",to_name);
399 THROW1(0,0,"%s",from_name);
406 VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
407 from_name,from_port, to_name,to_port,
408 result->sec,((double)result->bw)/1024.0);
410 gras_socket_close(sock);
/* "BW request" RPC callback: runs a bandwidth test from this process towards
 * request->peer and returns the measured duration and bandwidth (bw_res_t)
 * to the asker.
 * On failure it throws with value 1 and the peer's name; amok_bw_request()
 * maps that value to the destination host.
 * NOTE(review): the line calling the test function (presumably
 * amok_bw_test(peer, ...)), the return statement and the freeing of
 * request/result are not visible in this listing.
 */
415 int amok_bw_cb_bw_request(gras_msg_cb_ctx_t ctx,
418 /* specification of the test to run, and our answer */
419 bw_request_t request = *(bw_request_t*)payload;
420 bw_res_t result = xbt_new0(s_bw_res_t,1);
421 gras_socket_t peer,asker;
424 asker=gras_msg_cb_ctx_from(ctx);
425 VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
426 gras_socket_peer_name(asker),gras_socket_peer_port(asker),
428 request->peer.name,request->peer.port);
429 peer = gras_socket_client(request->peer.name,request->peer.port);
432 request->buf_size,request->exp_size,request->msg_size,
433 request->min_duration,
434 &(result->sec),&(result->bw));
436 gras_msg_rpcreturn(240,ctx,&result);
/* NOTE(review): the caught exception is not freed before THROW1, and peer is
 * not closed on this path as far as visible. */
437 }CATCH(e){THROW1(0,1,"%s",request->peer.name);
440 gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
/* The peer name of the received request is released here (presumably
 * allocated when the message was unpacked — confirm ownership). */
441 free(request->peer.name);
448 /** \brief builds a matrix of results of bandwidth measurement */
449 double * amok_bw_matrix(xbt_dynar_t peers,
450 int buf_size_bw, int exp_size_bw, int msg_size_bw,
451 double min_duration) {
453 /* construction of matrices for bandwith and latency */
456 int i,j,len=xbt_dynar_length(peers);
458 double *matrix_res = xbt_new0(double, len*len);
461 xbt_dynar_foreach (peers,i,p1) {
462 xbt_dynar_foreach (peers,j,p2) {
464 /* Mesurements of Bandwidth */
465 amok_bw_request(p1->name,p1->port,p2->name,p2->port,
466 buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
467 &sec,&matrix_res[i*len + j]);