X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/5fa8d6780a107eb5ae1ff8a233b0dd23cb065f8a..59823017c3f9f3dba0ef09292bb55ed9ab906254:/examples/msg/bittorrent/peer.c diff --git a/examples/msg/bittorrent/peer.c b/examples/msg/bittorrent/peer.c index baa8bc1672..2a9db33a5c 100644 --- a/examples/msg/bittorrent/peer.c +++ b/examples/msg/bittorrent/peer.c @@ -99,6 +99,7 @@ void leech_loop(peer_t peer, double deadline) handle_message(peer, peer->task_received); } } else { + handle_pending_sends(peer); if (peer->current_piece != -1) { send_interested_to_peers(peer); } else { @@ -238,10 +239,12 @@ void peer_init(peer_t peer, int id, int seed) peer->current_pieces = xbt_dynar_new(sizeof(int), NULL); peer->current_piece = -1; - peer->stream = RngStream_CreateStream(""); + peer->stream = MSG_host_get_data(MSG_host_self()); peer->comm_received = NULL; peer->round = 0; + + peer->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL); } /** @@ -258,11 +261,10 @@ void peer_free(peer_t peer) xbt_dict_free(&peer->peers); xbt_dict_free(&peer->active_peers); xbt_dynar_free(&peer->current_pieces); + xbt_dynar_free(&peer->pending_sends); xbt_free(peer->pieces_count); xbt_free(peer->bitfield); xbt_free(peer->bitfield_blocks); - - RngStream_DeleteStream(&peer->stream); } /** @@ -274,6 +276,29 @@ int has_finished(char *bitfield) return ((memchr(bitfield, '0', sizeof(char) * FILE_PIECES) == NULL) ? 1 : 0); } +/** + * Handle pending sends and remove those which are done + * @param peer Peer data + */ +void handle_pending_sends(peer_t peer) +{ + int index; + + while ((index = MSG_comm_testany(peer->pending_sends)) != -1) { + msg_comm_t comm_send = xbt_dynar_get_as(peer->pending_sends, index, msg_comm_t); + int status = MSG_comm_get_status(comm_send); + xbt_dynar_remove_at(peer->pending_sends, index, &comm_send); + XBT_DEBUG("Communication %p is finished with status %d, dynar size is now %lu", comm_send, status, xbt_dynar_length(peer->pending_sends)); + + msg_task_t task = MSG_comm_get_task(comm_send); + MSG_comm_destroy(comm_send); + + if (status != MSG_OK) { + task_message_free(task); + } + } +} + /** * Handle a received message sent by another peer * @param peer Peer data @@ -428,6 +453,8 @@ void handle_message(peer_t peer, msg_task_t task) } } break; + case MESSAGE_CANCEL: + break; } //Update the peer speed. if (remote_peer) { @@ -851,7 +878,9 @@ void send_bitfield(peer_t peer, const char *mailbox) msg_task_t task = task_message_bitfield_new(peer->hostname, peer->mailbox, peer->id, peer->bitfield, FILE_PIECES); - MSG_task_dsend(task, mailbox, task_message_free); + //Async send and append to pending sends + msg_comm_t comm = MSG_task_isend(task, mailbox); + xbt_dynar_push(peer->pending_sends, &comm); } /**