#include "amok/Bandwidth/bandwidth_private.h"
#include "gras/messages.h"
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(bw,amok,"Bandwidth testing");
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_bw,amok,"Bandwidth testing");
/******************************
gras_datadesc_by_name("unsigned long int"));
gras_datadesc_struct_append(bw_request_desc,"msg_size",
gras_datadesc_by_name("unsigned long int"));
+ gras_datadesc_struct_append(bw_request_desc,"min_duration",
+ gras_datadesc_by_name("double"));
gras_datadesc_struct_close(bw_request_desc);
bw_request_desc = gras_datadesc_ref("bw_request_t",bw_request_desc);
bw_res_desc = gras_datadesc_ref("bw_res_t",bw_res_desc);
gras_msgtype_declare_rpc("BW handshake",bw_request_desc,bw_request_desc);
+
+ gras_msgtype_declare_rpc("BW reask",bw_request_desc,NULL);
+ gras_msgtype_declare("BW stop", NULL);
+
gras_msgtype_declare_rpc("BW request", bw_request_desc,bw_res_desc);
}
void amok_bw_bw_join() {
* \brief bandwidth measurement between localhost and \e peer
*
* \arg peer: A (regular) socket at which the the host with which we should conduct the experiment can be contacted
- * \arg buf_size: Size of the socket buffer
+ * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
* \arg exp_size: Total size of data sent across the network
* \arg msg_size: Size of each message sent. Ie, (\e expSize % \e msgSize) messages will be sent.
+ * \arg min_duration: The minimum wanted duration. When the test message is too little, you tend to measure the latency. This argument allows you to force the test to take at least, say one second.
* \arg sec: where the result (in seconds) should be stored.
* \arg bw: observed Bandwidth (in byte/s)
*
unsigned long int buf_size,
unsigned long int exp_size,
unsigned long int msg_size,
+ double min_duration,
/*OUT*/ double *sec, double *bw) {
/* Measurement sockets for the experiments */
if (port == 10000 -1) {
RETHROW0("Error caught while opening a measurement socket: %s");
} else {
- xbt_ex_free(&e);
+ xbt_ex_free(e);
}
}
}
request->msg_size=msg_size;
request->host.name = NULL;
request->host.port = gras_socket_my_port(measMasterIn);
- VERB5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)",
+ DEBUG5("Handshaking with %s:%d to connect it back on my %d (expsize=%ld byte= %ld b)",
gras_socket_peer_name(peer),gras_socket_peer_port(peer), request->host.port,
buf_size,request->buf_size);
}
DEBUG1("Got ACK; conduct the experiment (msg_size=%ld)",request->msg_size);
- *sec=gras_os_time();
- TRY {
- gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
- gras_socket_meas_recv(measIn,120,1,1);
- } CATCH(e) {
- gras_socket_close(measOut);
- gras_socket_close(measMasterIn);
- gras_socket_close(measIn);
- RETHROW0("Unable to conduct the experiment: %s");
- }
+ *sec = 0;
+ do {
+ if (*sec>0) {
+ double meas_duration=*sec;
+ request->exp_size = request->exp_size * (min_duration / meas_duration) * 1.1;
+
+ DEBUG4("The experiment was too short (%f sec<%f sec). Redo it with exp_size=%ld (got %fkb/s)",
+ meas_duration,min_duration,request->exp_size,((double)exp_size) / *sec/1024);
+ gras_msg_rpccall(peer, 60, gras_msgtype_by_name("BW reask"),&request, NULL);
+ }
+
+ *sec=gras_os_time();
+ TRY {
+ gras_socket_meas_send(measOut,120,request->exp_size,request->msg_size);
+ DEBUG0("Data sent. Wait ACK");
+ gras_socket_meas_recv(measIn,120,1,1);
+ } CATCH(e) {
+ gras_socket_close(measOut);
+ gras_socket_close(measMasterIn);
+ gras_socket_close(measIn);
+ RETHROW0("Unable to conduct the experiment: %s");
+ }
+ DEBUG0("Experiment done");
+
+ *sec = gras_os_time() - *sec;
+ *bw = ((double)exp_size) / *sec;
+ } while (*sec < min_duration);
- *sec = gras_os_time() - *sec;
- *bw = ((double)exp_size) / *sec;
+ DEBUG0("This measurement was long enough. Stop peer");
+ gras_msg_send(peer, gras_msgtype_by_name("BW stop"), NULL);
free(request_ack);
free(request);
bw_request_t answer;
xbt_ex_t e;
int port;
+ int tooshort = 1;
+ gras_msg_cb_ctx_t ctx_reask;
+ static xbt_dynar_t msgtwaited=NULL;
- VERB5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
+ DEBUG5("Handshaked to connect to %s:%d (sizes: buf=%lu exp=%lu msg=%lu)",
gras_socket_peer_name(expeditor),request->host.port,
request->buf_size,request->exp_size,request->msg_size);
} CATCH(e) {
measMasterIn = NULL;
if (port < 10000)
- xbt_ex_free(&e);
+ xbt_ex_free(e);
else
/* FIXME: tell error to remote */
RETHROW0("Error encountered while opening a measurement server socket: %s");
measIn = gras_socket_meas_accept(measMasterIn);
DEBUG4("BW handshake answered. buf_size=%lu exp_size=%lu msg_size=%lu port=%d",
answer->buf_size,answer->exp_size,answer->msg_size,answer->host.port);
-
- gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
- gras_socket_meas_send(measOut,120,1,1);
} CATCH(e) {
gras_socket_close(measMasterIn);
gras_socket_close(measIn);
gras_socket_close(measOut);
/* FIXME: tell error to remote ? */
- RETHROW0("Error encountered while receiving the experiment: %s");
+ RETHROW0("Error encountered while opening the meas socket: %s");
+ }
+
+ if (!msgtwaited) {
+ msgtwaited = xbt_dynar_new(sizeof(gras_msgtype_t),NULL);
+ xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW stop"));
+ xbt_dynar_push(msgtwaited,gras_msgtype_by_name("BW reask"));
+ }
+
+ while (tooshort) {
+ void *payload;
+ int msggot;
+ TRY {
+ gras_socket_meas_recv(measIn, 120,request->exp_size,request->msg_size);
+ gras_socket_meas_send(measOut,120,1,1);
+ DEBUG0("ACK sent");
+ } CATCH(e) {
+ gras_socket_close(measMasterIn);
+ gras_socket_close(measIn);
+ gras_socket_close(measOut);
+ /* FIXME: tell error to remote ? */
+ RETHROW0("Error encountered while receiving the experiment: %s");
+ }
+ gras_msg_wait_or(60,msgtwaited,&ctx_reask,&msggot,&payload);
+ switch(msggot) {
+ case 0: /* BW stop */
+ tooshort = 0;
+ break;
+ case 1: /* BW reask */
+ tooshort = 1;
+ free(request);
+ request = (bw_request_t)payload;
+ gras_msg_rpcreturn(60,ctx_reask,NULL);
+ }
+ gras_msg_cb_ctx_free(ctx_reask);
}
if (measIn != measMasterIn)
gras_socket_close(measOut);
free(answer);
free(request);
- DEBUG0("BW experiment done.");
+ VERB0("BW experiment done.");
return 1;
}
* \arg to_port: port on which the second process is listening (for messages, do not
* give a measurement socket here. The needed measurement sockets will be created
* automatically and negociated between the peers)
- * \arg buf_size: Size of the socket buffer
+ * \arg buf_size: Size of the socket buffer. If 0, a sain default is used (32k, but may change)
* \arg exp_size: Total size of data sent across the network
* \arg msg_size: Size of each message sent. (\e expSize % \e msgSize) messages will be sent.
* \arg sec: where the result (in seconds) should be stored.
unsigned long int buf_size,
unsigned long int exp_size,
unsigned long int msg_size,
+ double min_duration,
/*OUT*/ double *sec, double*bw) {
gras_socket_t sock;
request->buf_size=buf_size;
request->exp_size=exp_size;
request->msg_size=msg_size;
+ request->min_duration = min_duration;
request->host.name = (char*)to_name;
request->host.port = to_port;
if (bw)
*bw =result->bw;
- VERB6("BW test between %s:%d and %s:%d took %f sec, achieving %f kb/s",
+ VERB6("BW test (%s:%d -> %s:%d) took %f sec (%f kb/s)",
from_name,from_port, to_name,to_port,
- *sec,((double)*bw)/1024.0);
+ result->sec,((double)result->bw)/1024.0);
gras_socket_close(sock);
free(result);
/* specification of the test to run, and our answer */
bw_request_t request = *(bw_request_t*)payload;
bw_res_t result = xbt_new0(s_bw_res_t,1);
- gras_socket_t peer;
+ gras_socket_t peer,asker;
+ asker=gras_msg_cb_ctx_from(ctx);
+ VERB4("Asked by %s:%d to conduct a bw XP with %s:%d",
+ gras_socket_peer_name(asker),gras_socket_peer_port(asker),
+ request->host.name,request->host.port);
peer = gras_socket_client(request->host.name,request->host.port);
amok_bw_test(peer,
request->buf_size,request->exp_size,request->msg_size,
+ request->min_duration,
&(result->sec),&(result->bw));
gras_msg_rpcreturn(240,ctx,&result);
gras_os_sleep(1);
- gras_socket_close(peer);
+ gras_socket_close(peer); /* FIXME: it should be blocking in RL until everything is sent */
+ free(request->host.name);
free(request);
free(result);
}
double * amok_bw_matrix(xbt_dynar_t hosts,
- int buf_size_bw, int exp_size_bw, int msg_size_bw) {
+ int buf_size_bw, int exp_size_bw, int msg_size_bw,
+ double min_duration) {
double sec;
/* construct of matrixs for bandwith and Latency */
if (i!=j) {
/* Mesurements of Bandwidth */
amok_bw_request(h1->name,h1->port,h2->name,h2->port,
- buf_size_bw,exp_size_bw,msg_size_bw,&sec,&matrix_res[i*len + j]);
+ buf_size_bw,exp_size_bw,msg_size_bw,min_duration,
+ &sec,&matrix_res[i*len + j]);
}
}
}