1 /* transport - low level communication */
3 /* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
/* Option read by gras_socket_close(): when non-zero, closing the lastly
 * selected socket while it still has more data in its buffer trips an
 * assertion. Defaults to 0 (disabled). */
12 int gras_opt_trp_nomoredata_on_close = 0;
17 #include "gras/Transport/transport_private.h"
18 #include "gras/Msg/msg_interface.h"
/* Log categories of this file: gras_trp is the default one, and gras_trp_meas
 * is the one explicitly passed to XBT_CDEBUG by the measurement functions. */
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gras_trp, gras,
21 "Conveying bytes over the network");
22 XBT_LOG_NEW_SUBCATEGORY(gras_trp_meas, gras_trp,
23 "Conveying bytes over the network without formating for perf measurements");
/* Module state: tested by gras_trp_init() and decremented by gras_trp_exit() */
24 static short int _gras_trp_started = 0;
26 static xbt_dict_t _gras_trp_plugins; /* All registered plugins */
27 static void gras_trp_plugin_free(void *p); /* free one of the plugins */
/* Create the transport plugin called `name` and register it in the
 * _gras_trp_plugins dictionary, with gras_trp_plugin_free passed to
 * xbt_dict_set() alongside it.
 *
 * The plugin structure is allocated with xbt_new0() and receives its own
 * copy of `name` (xbt_strdup).
 *
 * NOTE(review): the invocation of `setup` and the surrounding exception
 * handling are not visible in this listing; only the test on
 * mismatch_error is shown, so what happens in that case cannot be
 * documented from here. */
29 static void gras_trp_plugin_new(const char *name, gras_trp_setup_t setup)
33 gras_trp_plugin_t plug = xbt_new0(s_gras_trp_plugin_t, 1);
35 XBT_DEBUG("Create plugin %s", name);
37 plug->name = xbt_strdup(name);
43 if (e.category == mismatch_error) {
44 /* SG plugin raise mismatch when in RL mode (and vice versa) */
55 xbt_dict_set(_gras_trp_plugins, name, plug, gras_trp_plugin_free);
/* Initialize the transport module.
 *
 * When _gras_trp_started is zero, the plugin dictionary is created and the
 * "file", "sg" and "tcp" plugins are registered through
 * gras_trp_plugin_new().
 *
 * On builds defining HAVE_WINSOCK2_H, WinSock is started first: version 2.0
 * is requested with WSAStartup() and the version actually reported in
 * wsaData is asserted to be exactly 2.0. A second WSAStartup(0x0101) call
 * is also visible below.
 * NOTE(review): the preprocessor branch guarding that second call is not
 * visible in this listing (presumably a plain winsock fallback); neither is
 * the statement updating _gras_trp_started. */
58 void gras_trp_init(void)
60 if (!_gras_trp_started) {
61 /* make room for all plugins */
62 _gras_trp_plugins = xbt_dict_new();
64 #ifdef HAVE_WINSOCK2_H
65 /* initialize the windows mechanism */
67 WORD wVersionRequested;
70 wVersionRequested = MAKEWORD(2, 0);
72 res = WSAStartup(wVersionRequested, &wsaData);
73 xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
75 /* Confirm that the WinSock DLL supports 2.0. */
76 /* Note that if the DLL supports versions greater */
77 /* than 2.0 in addition to 2.0, it will still return */
78 /* 2.0 in wVersion since that is the version we */
81 xbt_assert(LOBYTE(wsaData.wVersion) == 2 &&
82 HIBYTE(wsaData.wVersion) == 0,
83 "Cannot find a usable WinSock DLL");
84 XBT_INFO("Found and initialized winsock2");
85 } /* The WinSock DLL is acceptable. Proceed. */
90 res = WSAStartup(0x0101, &wsaData);
91 xbt_assert(res == 0, "Cannot find a usable WinSock DLL");
92 XBT_INFO("Found and initialized winsock");
97 gras_trp_plugin_new("file", gras_trp_file_setup);
98 gras_trp_plugin_new("sg", gras_trp_sg_setup);
99 gras_trp_plugin_new("tcp", gras_trp_tcp_setup);
/* Shut the transport module down.
 *
 * The current value of _gras_trp_started is logged, then the counter is
 * decremented; when it reaches zero the plugin dictionary is freed. On
 * builds defining HAVE_WINSOCK_H, WSACleanup() is called first, and a
 * pending blocking call is cancelled when it fails with WSAEINPROGRESS.
 *
 * NOTE(review): the body of the `_gras_trp_started == 0` branch is not
 * visible in this listing.
 * NOTE(review): gras_trp_init() guards WSAStartup() with HAVE_WINSOCK2_H
 * whereas the cleanup here is guarded by HAVE_WINSOCK_H -- confirm that
 * WSACleanup() is also reached on winsock2 builds. */
105 void gras_trp_exit(void)
107 XBT_DEBUG("gras_trp value %d", _gras_trp_started);
108 if (_gras_trp_started == 0) {
112 if (--_gras_trp_started == 0) {
113 #ifdef HAVE_WINSOCK_H
114 if (WSACleanup() == SOCKET_ERROR) {
115 if (WSAGetLastError() == WSAEINPROGRESS) {
116 WSACancelBlockingCall();
122 /* Delete the plugins */
123 xbt_dict_free(&_gras_trp_plugins);
/* Free one plugin; this is the function registered together with each
 * entry of _gras_trp_plugins. `p` is the plugin, passed as a void pointer.
 *
 * The visible branch handles a plugin that still owns `data`: a debug
 * message announces that the data are freed although the plugin lacks an
 * exit() function.
 * NOTE(review): the first branch of the conditional and the actual
 * deallocations are not visible in this listing. */
128 void gras_trp_plugin_free(void *p)
130 gras_trp_plugin_t plug = p;
135 } else if (plug->data) {
136 XBT_DEBUG("Plugin %s lacks exit(). Free data anyway.", plug->name);
147 * gras_trp_socket_new:
149 * Malloc a new socket, and initialize it with defaults
/* `incoming` selects the direction of the new socket: a non-zero value sets
 * the incoming and accepting flags and clears outgoing; zero does the
 * opposite. The structure is allocated with xbt_new0() and bufdata is set
 * to NULL.
 * NOTE(review): the other field initializations and the store into *dst
 * are not visible in this listing. */
151 void gras_trp_socket_new(int incoming, gras_socket_t * dst)
154 gras_socket_t sock = xbt_new0(s_gras_socket_t, 1);
156 XBT_VERB("Create a new socket (%p)", (void *) sock);
160 sock->incoming = incoming ? 1 : 0;
161 sock->outgoing = incoming ? 0 : 1;
162 sock->accepting = incoming ? 1 : 0;
172 sock->bufdata = NULL;
180 * @brief Opens a server socket and makes it ready to be listened to.
181 * @param port: port on which you want to listen
182 * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sane default is used (32k, but may change)
183 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
185 * In real life, you'll get a TCP socket.
/* Implementation outline:
 *  - the transport plugin is looked up by name: "sg" when gras_if_SG()
 *    holds, "tcp" otherwise;
 *  - a socket is created as an incoming one, given `buf_size` (32 * 1024
 *    when 0 is passed) and the `measurement` flag;
 *  - the plugin's socket_server callback is called with the plugin, the
 *    port and the socket;
 *  - the socket is pushed onto the `sockets` dynar of the process data and
 *    the message listener is awakened.
 * NOTE(review): the statement involving `myport`, the error handling around
 * the plugin call and the return statement are only partially visible in
 * this listing. */
188 gras_socket_server_ext(unsigned short port,
189 unsigned long int buf_size, int measurement)
191 gras_trp_plugin_t trp;
194 XBT_DEBUG("Create a server socket from plugin %s on port %d",
195 gras_if_RL()? "tcp" : "sg", port);
196 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
198 /* defaults settings */
199 gras_trp_socket_new(1, &sock);
201 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
202 sock->meas = measurement;
204 /* Call plugin socket creation function */
205 XBT_DEBUG("Prepare socket with plugin (fct=%p)", trp->socket_server);
207 trp->socket_server(trp, port, sock);
208 XBT_DEBUG("in=%c out=%c accept=%c",
209 sock->incoming ? 'y' : 'n',
210 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
218 ((gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id))->myport
220 xbt_dynar_push(((gras_trp_procdata_t)
221 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
224 gras_msg_listener_awake();
229 * @brief Opens a server socket on any port in the given range
231 * @param minport: first port we will try
232 * @param maxport: last port we will try
233 * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sane default is used (32k, but may change)
234 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
236 * If none of the provided ports works, raises the exception got when trying the last possibility
/* Tries gras_socket_server_ext() on successive ports, starting at minport,
 * forwarding `buf_size` and `measurement` unchanged.
 * NOTE(review): the loop condition is `port < maxport`, so maxport itself
 * is never tried, although the documentation calls it the "last port we
 * will try". The declaration of `port` and the exception handling are not
 * visible in this listing, so confirm before changing the bound. */
239 gras_socket_server_range(unsigned short minport, unsigned short maxport,
240 unsigned long int buf_size, int measurement)
244 gras_socket_t res = NULL;
247 for (port = minport; port < maxport; port++) {
249 res = gras_socket_server_ext(port, buf_size, measurement);
263 * @brief Opens a client socket to a remote host.
264 * @param host: who you want to connect to
265 * @param port: where you want to connect to on this host
266 * @param buf_size: size of the buffer (in bytes) on the socket (for TCP sockets only). If 0, a sane default is used (32k, but may change)
267 * @param measurement: whether this socket is meant to convey measurement (if you don't know, use 0 to exchange regular messages)
269 * In real life, you'll get a TCP socket.
/* Implementation outline:
 *  - the transport plugin is looked up by name: "sg" when gras_if_SG()
 *    holds, "tcp" otherwise;
 *  - a socket is created as an outgoing one, given `buf_size` (32 * 1024
 *    when 0 is passed) and the `measurement` flag;
 *  - the plugin's socket_client callback is called with the plugin, the
 *    host, the port and the socket;
 *  - the socket is pushed onto the `sockets` dynar of the process data and
 *    the message listener is awakened.
 * NOTE(review): the `port` parameter line, the error handling around the
 * plugin call and the return statement are not visible in this listing. */
272 gras_socket_client_ext(const char *host,
274 unsigned long int buf_size, int measurement)
276 gras_trp_plugin_t trp;
279 trp = gras_trp_plugin_get_by_name(gras_if_SG()? "sg" : "tcp");
281 XBT_DEBUG("Create a client socket from plugin %s",
282 gras_if_RL()? "tcp" : "sg");
283 /* defaults settings */
284 gras_trp_socket_new(0, &sock);
286 sock->buf_size = buf_size > 0 ? buf_size : 32 * 1024;
287 sock->meas = measurement;
289 /* plugin-specific */
291 (*trp->socket_client) (trp,host,port,sock);
292 XBT_DEBUG("in=%c out=%c accept=%c",
293 sock->incoming ? 'y' : 'n',
294 sock->outgoing ? 'y' : 'n', sock->accepting ? 'y' : 'n');
300 xbt_dynar_push(((gras_trp_procdata_t)
301 gras_libdata_by_id(gras_trp_libdata_id))->sockets,
303 gras_msg_listener_awake();
308 * @brief Opens a server socket and makes it ready to be listened to.
310 * In real life, you'll get a TCP socket.
/* Convenience wrapper: regular (non-measurement) server socket on `port`
 * with a 32 * 1024 byte buffer, created by gras_socket_server_ext(). */
312 gras_socket_t gras_socket_server(unsigned short port)
314 return gras_socket_server_ext(port, 32 * 1024, 0);
317 /** @brief Opens a client socket to a remote host
 *
 * Convenience wrapper around gras_socket_client_ext(), called with a
 * buf_size of 0 and a measurement flag of 0 (regular socket).
 *
 * @param host: who you want to connect to
 * @param port: where you want to connect to on this host
 */
318 gras_socket_t gras_socket_client(const char *host, unsigned short port)
320 return gras_socket_client_ext(host, port, 0, 0);
323 /** @brief Opens a client socket to a remote host specified as '\a host:\a port'
 *
 * The string is parsed with xbt_peer_from_string(); the resulting name and
 * port are passed to gras_socket_client_ext() with a buf_size of 0 and a
 * measurement flag of 0.
 *
 * NOTE(review): the release of the parsed peer and the return statement are
 * not visible in this listing.
 */
324 gras_socket_t gras_socket_client_from_string(const char *host)
326 xbt_peer_t p = xbt_peer_from_string(host);
327 gras_socket_t res = gras_socket_client_ext(p->name, p->port, 0, 0);
/* Same as gras_socket_close(), but taking the socket as a void pointer. */
332 void gras_socket_close_voidp(void *sock)
334 gras_socket_close((gras_socket_t) sock);
337 /** \brief Close socket
 *
 * The reference counter of the socket is decremented first.
 * NOTE(review): the statement controlled by `if (--sock->refcount)` is not
 * visible in this listing (presumably an early return while references
 * remain).
 *
 * If the socket is the lastly selected one, it is asserted that either the
 * nomoredata_on_close option is off or the socket has no more data pending,
 * and _gras_lastly_selected_socket is reset to NULL.
 *
 * The socket is then searched for in the `sockets` dynar of the process
 * data. When found, it is removed from the dynar and the plugin's
 * socket_close callback is called if the plugin provides one. The
 * "Ignoring request to free an unknown socket" message and the backtrace
 * display are visible at the end of the function.
 *
 * NOTE(review): the debug string below contains the typo "dize" (for
 * "size") and a trailing newline; it is a runtime string and is left as is.
 */
338 void gras_socket_close(gras_socket_t sock)
340 if (--sock->refcount)
343 xbt_dynar_t sockets =
344 ((gras_trp_procdata_t)
345 gras_libdata_by_id(gras_trp_libdata_id))->sockets;
346 gras_socket_t sock_iter = NULL;
350 XBT_VERB("Close %p", sock);
351 if (sock == _gras_lastly_selected_socket) {
352 xbt_assert(!gras_opt_trp_nomoredata_on_close || !sock->moredata,
353 "Closing a socket having more data in buffer while the nomoredata_on_close option is activated");
357 ("Closing a socket having more data in buffer. Option nomoredata_on_close disabled, so continuing.");
358 _gras_lastly_selected_socket = NULL;
361 /* FIXME: Issue an event when the socket is closed */
362 XBT_DEBUG("sockets pointer before %p", sockets);
364 /* FIXME: Cannot get the dynar mutex, because it can be already locked */
365 // _xbt_dynar_foreach(sockets,cursor,sock_iter) {
/* Manual iteration replacing the foreach macro commented out above */
366 for (cursor = 0; cursor < xbt_dynar_length(sockets); cursor++) {
367 _xbt_dynar_cursor_get(sockets, cursor, &sock_iter);
368 if (sock == sock_iter) {
369 XBT_DEBUG("remove sock cursor %d dize %lu\n", cursor,
370 xbt_dynar_length(sockets));
371 xbt_dynar_cursor_rm(sockets, &cursor);
372 if (sock->plugin->socket_close)
373 (*sock->plugin->socket_close) (sock);
375 /* free the memory */
382 ("Ignoring request to free an unknown socket (%p). Execution stack:",
384 xbt_backtrace_display_current();
392 * Send a bunch of bytes over a socket
393 * (stable if we know the storage will keep as is until the next trp_flush)
/* Asserts that `sd` is an outgoing socket, then delegates to the `send`
 * callback of the socket's plugin, forwarding `data`, `size` and `stable`
 * unchanged. */
395 void gras_trp_send(gras_socket_t sd, char *data, long int size, int stable)
397 xbt_assert(sd->outgoing, "Socket not suited for data send");
398 (*sd->plugin->send) (sd, data, size, stable);
402 * gras_trp_recv:
404 * Receive a bunch of bytes from a socket
/* Asserts that `sd` is an incoming socket, then delegates to the `recv`
 * callback of the socket's plugin, forwarding `data` and `size`
 * unchanged. */
406 void gras_trp_recv(gras_socket_t sd, char *data, long int size)
408 xbt_assert(sd->incoming, "Socket not suited for data receive");
409 (sd->plugin->recv) (sd, data, size);
415 * Make sure all pending communications are done
/* Calls the `flush` callback of the socket's plugin; nothing is done when
 * the plugin does not provide one. */
417 void gras_trp_flush(gras_socket_t sd)
419 if (sd->plugin->flush)
420 (sd->plugin->flush) (sd);
/* Look a plugin up by `name` in the _gras_trp_plugins dictionary.
 * NOTE(review): the behavior for an unknown name is whatever
 * xbt_dict_get() does in that case; it is not handled here. */
423 gras_trp_plugin_t gras_trp_plugin_get_by_name(const char *name)
425 return xbt_dict_get(_gras_trp_plugins, name);
/* Return the value of the plugin's my_port callback for `sock`.
 * Throws unknown_error when the plugin does not implement my_port. */
428 int gras_socket_my_port(gras_socket_t sock)
430 if (!sock->plugin->my_port)
431 THROWF(unknown_error,0,"Function my_port unimplemented in plugin %s",sock->plugin->name);
432 return (*sock->plugin->my_port)(sock);
/* Return the value of the plugin's peer_port callback for `sock`.
 * Throws unknown_error when the plugin does not implement peer_port. */
436 int gras_socket_peer_port(gras_socket_t sock)
438 if (!sock->plugin->peer_port)
439 THROWF(unknown_error,0,"Function peer_port unimplemented in plugin %s",sock->plugin->name);
440 return (*sock->plugin->peer_port)(sock);
/* Return the value of the plugin's peer_name callback for `sock`.
 * Only the presence of the plugin is asserted; unlike
 * gras_socket_my_port() and gras_socket_peer_port(), the callback itself
 * is not checked before being called. */
443 const char *gras_socket_peer_name(gras_socket_t sock)
445 xbt_assert(sock->plugin);
446 return (*sock->plugin->peer_name)(sock);
/* Return the value of the plugin's peer_proc callback for `sock`.
 * Neither the plugin nor the callback is checked before the call. */
449 const char *gras_socket_peer_proc(gras_socket_t sock)
451 return (*sock->plugin->peer_proc)(sock);
/* Pass `peer_proc` to the plugin's peer_proc_set callback for `sock`.
 * Neither the plugin nor the callback is checked before the call.
 *
 * The callback is invoked as a plain statement: this function returns void,
 * and ISO C forbids `return <expression>;` in a void function. */
454 void gras_socket_peer_proc_set(gras_socket_t sock, char *peer_proc)
456 (*sock->plugin->peer_proc_set)(sock,peer_proc);
459 /** \brief Check if the provided socket is a measurement one (or a regular one)
 *
 * @param sock socket to inspect
 *
 * NOTE(review): the body is not visible in this listing; presumably it
 * reports the `meas` flag set when the socket was created -- confirm.
 */
460 int gras_socket_is_meas(gras_socket_t sock)
465 /** \brief Send a chunk of (random) data over a measurement socket
467 * @param peer measurement socket to use for the experiment
468 * @param timeout timeout (in seconds)
469 * @param msg_size size of each chunk sent over the socket (in bytes).
470 * @param msg_amount how many of these packets you want to send.
472 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
473 * each side of the socket should be paired.
475 * The exchanged data is zeroed to make sure it's initialized, but
476 * there is no way to control what is sent (ie, you cannot use these
477 * functions to exchange data out of band).
479 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
480 * were the total amount of data to send and the msg_size. This
481 * was changed for the fool wanting to send more than MAXINT
482 * bytes in a fat pipe.
/* Implementation notes:
 *  - a THROWF(unknown_error) announcing that measurement sockets are broken
 *    in this release is visible at the top of the body.
 *    NOTE(review): whether it is unconditional cannot be told from this
 *    listing; its two adjacent string literals are concatenated without a
 *    separating space;
 *  - the chunk to send is allocated with xbt_malloc0(msg_size);
 *  - the socket must be a measurement one and an outgoing one (asserted);
 *  - the chunk is sent msg_amount times through the plugin's raw_send
 *    callback, with a debug trace in category gras_trp_meas.
 * NOTE(review): `timeout` is not used in the visible lines, and the release
 * of `chunk` is not visible either.
 * NOTE(review): msg_size is an unsigned long printed with "%ld" in the
 * debug format strings; "%lu" would match its type. */
484 void gras_socket_meas_send(gras_socket_t peer,
485 unsigned int timeout,
486 unsigned long int msg_size,
487 unsigned long int msg_amount)
490 unsigned long int sent_sofar;
493 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
494 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
496 chunk = xbt_malloc0(msg_size);
498 xbt_assert(peer->meas,
499 "Asked to send measurement data on a regular socket");
500 xbt_assert(peer->outgoing,
501 "Socket not suited for data send (was created with gras_socket_server(), not gras_socket_client())");
503 for (sent_sofar = 0; sent_sofar < msg_amount; sent_sofar++) {
504 XBT_CDEBUG(gras_trp_meas,
505 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d",
506 sent_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
507 gras_socket_peer_port(peer));
508 (*peer->plugin->raw_send) (peer, chunk, msg_size);
510 XBT_CDEBUG(gras_trp_meas,
511 "Sent %lu msgs of %lu (size of each: %ld) to %s:%d", sent_sofar,
512 msg_amount, msg_size, gras_socket_peer_name(peer),
513 gras_socket_peer_port(peer));
521 /** \brief Receive a chunk of data over a measurement socket
523 * Calls to gras_socket_meas_send() and gras_socket_meas_recv() on
524 * each side of the socket should be paired.
526 * @warning: in SimGrid version 3.1 and previous, the numerical arguments
527 * were the total amount of data to send and the msg_size. This
528 * was changed for the fool wanting to send more than MAXINT
529 * bytes in a fat pipe.
/* Implementation notes:
 *  - a THROWF(unknown_error) announcing that measurement sockets are broken
 *    in this release is visible at the top of the body.
 *    NOTE(review): whether it is unconditional cannot be told from this
 *    listing;
 *  - the receive buffer is allocated with xbt_malloc(msg_size);
 *  - the socket must be a measurement one and an incoming one (asserted);
 *  - msg_amount chunks are received through the plugin's raw_recv callback,
 *    with a debug trace in category gras_trp_meas.
 * NOTE(review): `timeout` is not used in the visible lines, and the release
 * of `chunk` is not visible either.
 * NOTE(review): got_sofar and msg_size are unsigned long values printed
 * with "%ld" in the debug format strings; "%lu" would match their type. */
531 void gras_socket_meas_recv(gras_socket_t peer,
532 unsigned int timeout,
533 unsigned long int msg_size,
534 unsigned long int msg_amount)
538 unsigned long int got_sofar;
541 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
542 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
545 chunk = xbt_malloc(msg_size);
547 xbt_assert(peer->meas,
548 "Asked to receive measurement data on a regular socket");
549 xbt_assert(peer->incoming, "Socket not suited for data receive");
551 for (got_sofar = 0; got_sofar < msg_amount; got_sofar++) {
552 XBT_CDEBUG(gras_trp_meas,
553 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
554 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
555 gras_socket_peer_port(peer));
556 (peer->plugin->raw_recv) (peer, chunk, msg_size);
558 XBT_CDEBUG(gras_trp_meas,
559 "Recvd %ld msgs of %lu (size of each: %ld) from %s:%d",
560 got_sofar, msg_amount, msg_size, gras_socket_peer_name(peer),
561 gras_socket_peer_port(peer));
569 * \brief Something similar to the good old accept system call.
571 * Make sure that there is someone speaking to the provided server socket.
572 * In RL, it does an accept(2) and returns the resulting socket.
573 * In SG, as accepts are useless, it returns the provided argument as result.
574 * You should thus test whether (peer != accepted) before closing both of them.
576 * You should only call this on measurement sockets. It is automatically
577 * done for regular sockets, but you usually want more control about
578 * what's going on with measurement sockets.
/* Implementation notes:
 *  - a THROWF(unknown_error) announcing that measurement sockets are broken
 *    in this release is visible at the top of the body.
 *    NOTE(review): whether it is unconditional cannot be told from this
 *    listing;
 *  - `peer` must be a measurement socket (asserted);
 *  - when `peer` is not an accepting socket there is nothing to accept
 *    (the handling of that case is not visible here; see the FIXME below);
 *  - otherwise the plugin's socket_accept callback provides the new socket,
 *    which inherits the `meas` flag of `peer`. */
580 gras_socket_t gras_socket_meas_accept(gras_socket_t peer)
583 THROWF(unknown_error,0,"measurement sockets were broken in this release of SimGrid and should be ported back in the future."
584 "If you depend on it, sorry, you have to use an older version, or wait for the future version using it...");
586 xbt_assert(peer->meas,
587 "No need to accept on non-measurement sockets (it's automatic)");
589 if (!peer->accepting) {
590 /* nothing to accept here (must be in SG) */
591 /* BUG: FIXME: this is BAD! it makes tricky to free the accepted socket */
595 res = (peer->plugin->socket_accept) (peer);
596 res->meas = peer->meas;
597 XBT_CDEBUG(gras_trp_meas, "meas_accepted onto %d", res->sd);
604 * Creating procdata for this module
/* Constructor of the per-process data of this module; it is the function
 * passed to gras_procdata_add() by gras_trp_register().
 *
 * The structure is allocated with xbt_new(), named "gras_trp", and gets a
 * `sockets` dynar created with xbt_dynar_new_sync() and no free function.
 * NOTE(review): the element size is written sizeof(gras_socket_t *) while
 * the other functions of this file handle gras_socket_t values from this
 * dynar; confirm which type is intended.
 * NOTE(review): the initialization of the remaining fields and the return
 * statement are not visible in this listing. */
606 static void *gras_trp_procdata_new(void)
608 gras_trp_procdata_t res = xbt_new(s_gras_trp_procdata_t, 1);
610 res->name = xbt_strdup("gras_trp");
612 res->sockets = xbt_dynar_new_sync(sizeof(gras_socket_t *), NULL);
619 * Freeing procdata for this module
/* Destructor of the per-process data of this module; it is the function
 * passed to gras_procdata_add() by gras_trp_register().
 * The `sockets` dynar is freed.
 * NOTE(review): the release of the other members is not visible in this
 * listing. */
621 static void gras_trp_procdata_free(void *data)
623 gras_trp_procdata_t res = (gras_trp_procdata_t) data;
625 xbt_dynar_free(&(res->sockets));
/* Log, at INFO level, every socket of the process data's `sockets` dynar:
 * its address, peer name, peer port and whether it is flagged valid.
 * `name` is only used to label the header and footer lines of the dump. */
630 void gras_trp_socketset_dump(const char *name)
632 gras_trp_procdata_t procdata =
633 (gras_trp_procdata_t) gras_libdata_by_id(gras_trp_libdata_id);
638 XBT_INFO("** Dump the socket set %s", name);
639 xbt_dynar_foreach(procdata->sockets, it, s) {
640 XBT_INFO(" %p -> %s:%d %s",
641 s, gras_socket_peer_name(s), gras_socket_peer_port(s),
642 s->valid ? "(valid)" : "(peer dead)");
644 XBT_INFO("** End of socket set %s", name);
648 * Module registration
/* Identifier under which the process data of this module are retrieved
 * with gras_libdata_by_id(); assigned by gras_trp_register(). */
650 int gras_trp_libdata_id;
/* Register the "gras_trp" process data with its constructor and destructor
 * and keep the identifier returned by gras_procdata_add(). */
651 void gras_trp_register()
653 gras_trp_libdata_id =
654 gras_procdata_add("gras_trp", gras_trp_procdata_new,
655 gras_trp_procdata_free);
/* Return the `myport` field of this module's process data. */
658 int gras_os_myport(void)
660 return ((gras_trp_procdata_t)
661 gras_libdata_by_id(gras_trp_libdata_id))->myport;