1 /* Copyright (c) 2012-2023. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
9 XBT_LOG_NEW_DEFAULT_CATEGORY(chainsend_peer, "Messages specific for the peer");
11 static void peer_join_chain(peer_t p)
13 chain_message_t msg = (chain_message_t)sg_mailbox_get(p->me);
16 p->total_pieces = msg->num_pieces;
17 XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", sg_mailbox_get_name(p->me),
18 p->prev ? sg_mailbox_get_name(p->prev) : NULL, p->next ? sg_mailbox_get_name(p->next) : NULL);
22 static void peer_forward_file(peer_t p)
26 size_t nb_pending_sends = 0;
27 size_t nb_pending_recvs = 0;
30 p->pending_recvs[nb_pending_recvs] = sg_mailbox_get_async(p->me, &received);
33 ssize_t idx = sg_comm_wait_any(p->pending_recvs, nb_pending_recvs);
35 XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));
36 /* move the last pending comm where the finished one was, and decrement */
37 p->pending_recvs[idx] = p->pending_recvs[--nb_pending_recvs];
39 if (p->next != NULL) {
40 XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)received, sg_mailbox_get_name(p->me),
41 sg_mailbox_get_name(p->next));
42 sg_comm_t send = sg_mailbox_put_async(p->next, received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
43 p->pending_sends[nb_pending_sends] = send;
49 p->received_bytes += PIECE_SIZE;
50 XBT_DEBUG("%u pieces received, %llu bytes received", p->received_pieces, p->received_bytes);
51 if (p->received_pieces >= p->total_pieces) {
56 sg_comm_wait_all(p->pending_sends, nb_pending_sends);
59 static peer_t peer_init(int argc, char* argv[])
61 peer_t p = xbt_malloc(sizeof(s_peer_t));
64 p->received_pieces = 0;
65 p->received_bytes = 0;
66 p->pending_recvs = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
67 p->pending_sends = xbt_malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
69 p->me = sg_mailbox_by_name(sg_host_self_get_name());
74 static void peer_delete(peer_t p)
76 xbt_free(p->pending_recvs);
77 xbt_free(p->pending_sends);
82 void peer(int argc, char* argv[])
86 peer_t p = peer_init(argc, argv);
87 double start_time = simgrid_get_clock();
90 double end_time = simgrid_get_clock();
92 XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
93 p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));