X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/3d84113e2eb8d578a6838d21f84f55d9e77c65d4..59823017c3f9f3dba0ef09292bb55ed9ab906254:/examples/msg/bittorrent/peer.c diff --git a/examples/msg/bittorrent/peer.c b/examples/msg/bittorrent/peer.c index acafbc7e7f..2a9db33a5c 100644 --- a/examples/msg/bittorrent/peer.c +++ b/examples/msg/bittorrent/peer.c @@ -15,11 +15,14 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peers, "Messages specific for the peers"); //TODO: Let users change this /* * File transfered data + * + * File size: 10 pieces * 5 blocks/piece * 16384 bytes/block = 819200 bytes */ -static int FILE_SIZE = 5120; +static int FILE_SIZE = 10 * 5 * 16384; static int FILE_PIECES = 10; static int PIECES_BLOCKS = 5; +static int BLOCK_SIZE = 16384; static int BLOCKS_REQUESTED = 2; /** @@ -96,6 +99,7 @@ void leech_loop(peer_t peer, double deadline) handle_message(peer, peer->task_received); } } else { + handle_pending_sends(peer); if (peer->current_piece != -1) { send_interested_to_peers(peer); } else { @@ -235,10 +239,12 @@ void peer_init(peer_t peer, int id, int seed) peer->current_pieces = xbt_dynar_new(sizeof(int), NULL); peer->current_piece = -1; - peer->stream = RngStream_CreateStream(""); + peer->stream = MSG_host_get_data(MSG_host_self()); peer->comm_received = NULL; peer->round = 0; + + peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL); } /** @@ -255,11 +261,10 @@ void peer_free(peer_t peer) xbt_dict_free(&peer->peers); xbt_dict_free(&peer->active_peers); xbt_dynar_free(&peer->current_pieces); + xbt_dynar_free(&peer->pending_sends); xbt_free(peer->pieces_count); xbt_free(peer->bitfield); xbt_free(peer->bitfield_blocks); - - RngStream_DeleteStream(&peer->stream); } /** @@ -271,6 +276,29 @@ int has_finished(char *bitfield) return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0); } +/** + * Handle pending sends and remove those which are done + * @param peer Peer data + */ +void handle_pending_sends(peer_t peer) +{ + int index; + + while ((index = MSG_comm_testany(peer->pending_sends)) != -1) { + msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t); + int status = MSG_comm_get_status(comm_send); + xbt_dynar_remove_at(peer->pending_sends, index, &comm_send); + XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends)); + + msg_task_t task = MSG_comm_get_task(comm_send); + MSG_comm_destroy(comm_send); + + if (status != MSG_OK) { + task_message_free(task); + } + } +} + /** * Handle a received message sent by another peer * @param peer Peer data @@ -425,6 +453,8 @@ void handle_message(peer_t peer, msg_task_t task) } } break; + case MESSAGE_CANCEL: + break; } //Update the peer speed. if (remote_peer) { @@ -726,7 +756,7 @@ void send_interested_to_peers(peer_t peer) connection->am_interested = 1; msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, - peer->id); + peer->id, task_message_size(MESSAGE_INTERESTED)); MSG_task_dsend(task, connection->mailbox, task_message_free); XBT_DEBUG("Send INTERESTED to %s", connection->mailbox); } @@ -744,7 +774,7 @@ void send_interested(peer_t peer, const char *mailbox) { msg_task_t task = task_message_new(MESSAGE_INTERESTED, peer->hostname, peer->mailbox, - peer->id); + peer->id, task_message_size(MESSAGE_INTERESTED)); MSG_task_dsend(task, mailbox, task_message_free); XBT_DEBUG("Sending INTERESTED to %s", mailbox); @@ -759,7 +789,7 @@ void send_notinterested(peer_t peer, const char *mailbox) { msg_task_t task = task_message_new(MESSAGE_NOTINTERESTED, peer->hostname, peer->mailbox, - peer->id); + peer->id, task_message_size(MESSAGE_NOTINTERESTED)); MSG_task_dsend(task, mailbox, task_message_free); XBT_DEBUG("Sending NOTINTERESTED to %s", mailbox); @@ -777,7 +807,7 @@ void send_handshake_all(peer_t peer) xbt_dict_foreach(peer->peers, cursor, key, remote_peer) { msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, - peer->id); + peer->id, task_message_size(MESSAGE_HANDSHAKE)); MSG_task_dsend(task, remote_peer->mailbox, task_message_free); XBT_DEBUG("Sending a HANDSHAKE to %s", remote_peer->mailbox); } @@ -792,7 +822,7 @@ void send_handshake(peer_t peer, const char *mailbox) { msg_task_t task = task_message_new(MESSAGE_HANDSHAKE, peer->hostname, peer->mailbox, - peer->id); + peer->id, task_message_size(MESSAGE_HANDSHAKE)); MSG_task_dsend(task, mailbox, task_message_free); XBT_DEBUG("Sending a HANDSHAKE to %s", mailbox); } @@ -804,7 +834,8 @@ void send_choked(peer_t peer, const char *mailbox) { XBT_DEBUG("Sending a CHOKE to %s", mailbox); msg_task_t task = - task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, peer->id); + task_message_new(MESSAGE_CHOKE, peer->hostname, peer->mailbox, + peer->id, task_message_size(MESSAGE_CHOKE)); MSG_task_dsend(task, mailbox, task_message_free); } @@ -816,7 +847,7 @@ void send_unchoked(peer_t peer, const char *mailbox) XBT_DEBUG("Sending a UNCHOKE to %s", mailbox); msg_task_t task = task_message_new(MESSAGE_UNCHOKE, peer->hostname, peer->mailbox, - peer->id); + peer->id, task_message_size(MESSAGE_UNCHOKE)); MSG_task_dsend(task, mailbox, task_message_free); } @@ -832,7 +863,7 @@ void send_have(peer_t peer, int piece) xbt_dict_foreach(peer->peers, cursor, key, remote_peer) { msg_task_t task = task_message_index_new(MESSAGE_HAVE, peer->hostname, peer->mailbox, - peer->id, piece); + peer->id, piece, task_message_size(MESSAGE_HAVE)); MSG_task_dsend(task, remote_peer->mailbox, task_message_free); } } @@ -846,8 +877,10 @@ void send_bitfield(peer_t peer, const char *mailbox) XBT_DEBUG("Sending a BITFIELD to %s", mailbox); msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, - peer->bitfield); - MSG_task_dsend(task, mailbox, task_message_free); + peer->bitfield, FILE_PIECES); + //Async send and append to pending sends + msg_comm_t comm = MSG_task_isend(task, mailbox); + xbt_dynar_push(peer->pending_sends, &comm); } /** @@ -877,7 +910,7 @@ void send_piece(peer_t peer, const char *mailbox, int piece, int stalled, "Tried to send a piece that we doesn't have."); msg_task_t task = task_message_piece_new(peer->hostname, peer->mailbox, peer->id, piece, - stalled, block_index, block_length); + stalled, block_index, block_length, BLOCK_SIZE); MSG_task_dsend(task, mailbox, task_message_free); }