include examples/c/actor-yield/actor-yield.c
include examples/c/actor-yield/actor-yield.tesh
include examples/c/actor-yield/actor-yield_d.xml
+include examples/c/app-chainsend/app-chainsend.tesh
+include examples/c/app-chainsend/app-chainsend_d.xml
+include examples/c/app-chainsend/broadcaster.c
+include examples/c/app-chainsend/chainsend.c
+include examples/c/app-chainsend/chainsend.h
+include examples/c/app-chainsend/peer.c
include examples/c/app-pingpong/app-pingpong.c
include examples/c/app-pingpong/app-pingpong.tesh
include examples/c/app-pingpong/app-pingpong_d.xml
include teshsuite/msg/app-bittorrent/generate.py
include teshsuite/msg/app-bittorrent/tracker.c
include teshsuite/msg/app-bittorrent/tracker.h
-include teshsuite/msg/app-chainsend/app-chainsend.tesh
-include teshsuite/msg/app-chainsend/app-chainsend_d.xml
-include teshsuite/msg/app-chainsend/broadcaster.c
-include teshsuite/msg/app-chainsend/broadcaster.h
-include teshsuite/msg/app-chainsend/chainsend.c
-include teshsuite/msg/app-chainsend/common.c
-include teshsuite/msg/app-chainsend/common.h
-include teshsuite/msg/app-chainsend/iterator.c
-include teshsuite/msg/app-chainsend/iterator.h
-include teshsuite/msg/app-chainsend/messages.c
-include teshsuite/msg/app-chainsend/messages.h
-include teshsuite/msg/app-chainsend/peer.c
-include teshsuite/msg/app-chainsend/peer.h
include teshsuite/msg/async-wait/async-wait.c
include teshsuite/msg/async-wait/async-wait.tesh
include teshsuite/msg/async-wait/async-wait2_d.xml
+# Regular examples: with only one source and tested with all factories
+######################################################################
+
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-migrate actor-suspend actor-yield
app-pingpong app-token-ring
set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.c)
endforeach()
+# Multi-files examples
+######################
+# Chainsend example
+
+add_executable (app-chainsend-c EXCLUDE_FROM_ALL app-chainsend/chainsend.c app-chainsend/broadcaster.c
+ app-chainsend/peer.c)
+target_link_libraries(app-chainsend-c simgrid)
+set_target_properties(app-chainsend-c PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/app-chainsend)
+add_dependencies(tests app-chainsend-c)
+
+foreach (file chainsend broadcaster peer)
+ set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/${file}.c)
+endforeach()
+set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/chainsend.h)
+
+# Add all extra files to the archive
+####################################
+
set(teshsuite_src ${teshsuite_src} PARENT_SCOPE)
-set(tesh_files ${tesh_files} PARENT_SCOPE)
+set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/app-chainsend.tesh
+ PARENT_SCOPE)
set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/actor-create/actor-create_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/actor-yield/actor-yield_d.xml
+ ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/app-chainsend_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/app-pingpong/app-pingpong_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-waitall/async-waitall_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-waitany/async-waitany_d.xml
foreach(x
actor-create actor-daemon actor-exiting actor-join actor-kill actor-migrate actor-suspend actor-yield
- app-pingpong app-token-ring
+ app-chainsend app-pingpong app-token-ring
async-waitall async-waitany
cloud-capping cloud-simple
energy-exec
#!/usr/bin/env tesh
-p Testing the chainsend MSG implementation
-
! timeout 60
! output sort 19
-$ ${bindir:=.}/chainsend ${platfdir}/cluster_backbone.xml app-chainsend_d.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n"
-> [ 2.214423] (2:peer@node-1.simgrid.org) ### 2.214423 16777216 bytes (Avg 7.225359 MB/s); copy finished (simulated).
+$ ${bindir:=.}/app-chainsend-c ${platfdir}/cluster_backbone.xml app-chainsend_d.xml "--log=root.fmt:[%12.6r]%e(%i:%P@%h)%e%m%n"
+> [ 2.214423] (2:peer@node-1.simgrid.org) ### 2.214423 16777216 bytes (Avg 7.225360 MB/s); copy finished (simulated).
> [ 2.222796] (3:peer@node-2.simgrid.org) ### 2.222796 16777216 bytes (Avg 7.198141 MB/s); copy finished (simulated).
-> [ 2.231170] (4:peer@node-3.simgrid.org) ### 2.231170 16777216 bytes (Avg 7.171126 MB/s); copy finished (simulated).
+> [ 2.231170] (4:peer@node-3.simgrid.org) ### 2.231170 16777216 bytes (Avg 7.171127 MB/s); copy finished (simulated).
> [ 2.239543] (5:peer@node-4.simgrid.org) ### 2.239543 16777216 bytes (Avg 7.144314 MB/s); copy finished (simulated).
> [ 2.247917] (6:peer@node-5.simgrid.org) ### 2.247917 16777216 bytes (Avg 7.117701 MB/s); copy finished (simulated).
> [ 2.256290] (7:peer@node-6.simgrid.org) ### 2.256290 16777216 bytes (Avg 7.091286 MB/s); copy finished (simulated).
--- /dev/null
+/* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "chainsend.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(broadcaster, "Messages specific for the broadcaster");
+
+static chain_message_t chain_message_new(sg_mailbox_t prev, sg_mailbox_t next, const unsigned int num_pieces)
+{
+ chain_message_t msg = (chain_message_t)malloc(sizeof(s_chain_message_t));
+ msg->prev_ = prev;
+ msg->next_ = next;
+ msg->num_pieces = num_pieces;
+
+ return msg;
+}
+
+static void broadcaster_build_chain(broadcaster_t bc)
+{
+ /* Build the chain if there's at least one peer */
+ if (bc->host_count > 0)
+ bc->first = bc->mailboxes[0];
+
+ for (unsigned i = 0; i < bc->host_count; i++) {
+ sg_mailbox_t prev = i > 0 ? bc->mailboxes[i - 1] : NULL;
+ sg_mailbox_t next = i < bc->host_count - 1 ? bc->mailboxes[i + 1] : NULL;
+ XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", sg_host_self_get_name(),
+ sg_mailbox_get_name(bc->mailboxes[i]), prev ? sg_mailbox_get_name(prev) : NULL,
+ next ? sg_mailbox_get_name(next) : NULL);
+ /* Send message to current peer */
+ sg_mailbox_put(bc->mailboxes[i], chain_message_new(prev, next, bc->piece_count), MESSAGE_BUILD_CHAIN_SIZE);
+ }
+}
+
+static void broadcaster_send_file(broadcaster_t bc)
+{
+ int nb_pending_sends = 0;
+
+ for (unsigned int current_piece = 0; current_piece < bc->piece_count; current_piece++) {
+ XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg_host_self_get_name(),
+ sg_mailbox_get_name(bc->first));
+ char* file_piece = bprintf("piece-%u", current_piece);
+ sg_comm_t comm = sg_mailbox_put_async(bc->first, file_piece, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
+ bc->pending_sends[nb_pending_sends++] = comm;
+ }
+ sg_comm_wait_all(bc->pending_sends, nb_pending_sends);
+}
+
+static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host_count, unsigned int piece_count)
+{
+ broadcaster_t bc = (broadcaster_t)malloc(sizeof(s_broadcaster_t));
+
+ bc->first = NULL;
+ bc->host_count = host_count;
+ bc->piece_count = piece_count;
+ bc->mailboxes = mailboxes;
+ bc->pending_sends = (sg_comm_t*)malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
+
+ broadcaster_build_chain(bc);
+
+ return bc;
+}
+
+static void broadcaster_destroy(broadcaster_t bc)
+{
+ free(bc->pending_sends);
+ free(bc->mailboxes);
+ free(bc);
+}
+
+/** Emitter function */
+void broadcaster(int argc, char* argv[])
+{
+ XBT_DEBUG("broadcaster");
+
+ unsigned int host_count = xbt_str_parse_int(argv[1], "Invalid number of peers: %s");
+
+ sg_mailbox_t* mailboxes = (sg_mailbox_t*)malloc(sizeof(sg_mailbox_t) * host_count);
+
+ for (unsigned int i = 1; i <= host_count; i++) {
+ char* name = bprintf("node-%u.simgrid.org", i);
+ XBT_DEBUG("%s", name);
+ mailboxes[i - 1] = sg_mailbox_by_name(name);
+ free(name);
+ }
+
+ unsigned int piece_count = xbt_str_parse_int(argv[2], "Invalid number of pieces: %s");
+
+ broadcaster_t bc = broadcaster_init(mailboxes, host_count, piece_count);
+
+ broadcaster_send_file(bc);
+
+ broadcaster_destroy(bc);
+}
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-#include "broadcaster.h"
-#include "iterator.h"
-#include "messages.h"
-#include "peer.h"
-#include "simgrid/msg.h"
+#include "chainsend.h"
-/** @addtogroup MSG_examples
- *
- * - <b>chainsend: MSG implementation of a file broadcasting system, similar to Kastafior (from Kadeploy).</b>.
- */
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(msg_chainsend, "Messages specific for chainsend");
+XBT_LOG_NEW_DEFAULT_CATEGORY(chainsend, "Messages specific for chainsend");
int main(int argc, char* argv[])
{
- MSG_init(&argc, argv);
+ simgrid_init(&argc, argv);
- MSG_create_environment(argv[1]);
+ simgrid_load_platform(argv[1]);
/* Trace categories */
TRACE_category_with_color("host0", "0 0 1");
TRACE_category_with_color("host8", "0 1 0");
/* Application deployment */
- MSG_function_register("broadcaster", broadcaster);
- MSG_function_register("peer", peer);
-
- MSG_launch_application(argv[2]);
+ simgrid_register_function("broadcaster", broadcaster);
+ simgrid_register_function("peer", peer);
- msg_error_t res = MSG_main();
+ simgrid_load_deployment(argv[2]);
- XBT_INFO("Total simulation time: %e", MSG_get_clock());
+ simgrid_run();
+ XBT_INFO("Total simulation time: %e", simgrid_get_clock());
- return res != MSG_OK;
+ return 0;
}
--- /dev/null
+/* Copyright (c) 2012-2020. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef CHAINSEND_H
+#define CHAINSEND_H
+
+#include "simgrid/actor.h"
+#include "simgrid/comm.h"
+#include "simgrid/engine.h"
+#include "simgrid/host.h"
+#include "simgrid/instr.h"
+#include "simgrid/mailbox.h"
+
+#include "xbt/log.h"
+#include "xbt/str.h"
+
+#include <stdlib.h>
+
+/* Connection parameters */
+#define MAX_PENDING_COMMS 256
+#define PIECE_SIZE 65536
+#define MESSAGE_BUILD_CHAIN_SIZE 40
+#define MESSAGE_SEND_DATA_HEADER_SIZE 1
+
+/* Broadcaster struct */
+typedef struct s_broadcaster {
+ unsigned int host_count;
+ int piece_count;
+ sg_mailbox_t first;
+ sg_mailbox_t* mailboxes;
+ sg_comm_t* pending_sends;
+} s_broadcaster_t;
+
+typedef s_broadcaster_t* broadcaster_t;
+void broadcaster(int argc, char* argv[]);
+
+/* Message struct */
+typedef struct s_chain_message {
+ sg_mailbox_t prev_;
+ sg_mailbox_t next_;
+ unsigned int num_pieces;
+} s_chain_message_t;
+
+typedef s_chain_message_t* chain_message_t;
+
+/* Peer struct */
+typedef struct s_peer {
+ sg_mailbox_t prev;
+ sg_mailbox_t next;
+ sg_mailbox_t me;
+ unsigned long long received_bytes;
+ unsigned int received_pieces;
+ unsigned int total_pieces;
+ sg_comm_t* pending_recvs;
+ sg_comm_t* pending_sends;
+} s_peer_t;
+
+typedef s_peer_t* peer_t;
+void peer(int argc, char* argv[]);
+
+#endif /* CHAINSEND_H */
--- /dev/null
+/* Copyright (c) 2012-2020. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "chainsend.h"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(chainsend_peer, "Messages specific for the peer");
+
+static void peer_join_chain(peer_t p)
+{
+ const chain_message_t msg = (chain_message_t)sg_mailbox_get(p->me);
+ p->prev = msg->prev_;
+ p->next = msg->next_;
+ p->total_pieces = msg->num_pieces;
+ XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", sg_mailbox_get_name(p->me),
+ p->prev ? sg_mailbox_get_name(p->prev) : NULL, p->next ? sg_mailbox_get_name(p->next) : NULL);
+ free(msg);
+}
+
+static void peer_forward_file(const peer_t p)
+{
+ void* received;
+ int done = 0;
+ size_t nb_pending_sends = 0;
+ size_t nb_pending_recvs = 0;
+
+ while (!done) {
+ sg_comm_t comm = sg_mailbox_get_async(p->me, &received);
+ p->pending_recvs[nb_pending_recvs] = comm;
+ nb_pending_recvs++;
+
+ int idx = sg_comm_wait_any(p->pending_recvs, nb_pending_recvs);
+ if (idx != -1) {
+ comm = p->pending_recvs[idx];
+ XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));
+ /* move the last pending comm where the finished one was, and decrement */
+ p->pending_recvs[idx] = p->pending_recvs[--nb_pending_recvs];
+
+ if (p->next != NULL) {
+ XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)received, sg_mailbox_get_name(p->me),
+ sg_mailbox_get_name(p->next));
+ sg_comm_t send = sg_mailbox_put_async(p->next, received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
+ p->pending_sends[nb_pending_sends] = send;
+ nb_pending_sends++;
+ } else
+ free(received);
+
+ p->received_pieces++;
+ p->received_bytes += PIECE_SIZE;
+ XBT_DEBUG("%u pieces received, %llu bytes received", p->received_pieces, p->received_bytes);
+ if (p->received_pieces >= p->total_pieces) {
+ done = 1;
+ }
+ }
+ }
+ sg_comm_wait_all(p->pending_sends, nb_pending_sends);
+}
+
+static peer_t peer_init(int argc, char* argv[])
+{
+ peer_t p = (peer_t)malloc(sizeof(s_peer_t));
+ p->prev = NULL;
+ p->next = NULL;
+ p->received_pieces = 0;
+ p->received_bytes = 0;
+ p->pending_recvs = (sg_comm_t*)malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
+ p->pending_sends = (sg_comm_t*)malloc(sizeof(sg_comm_t) * MAX_PENDING_COMMS);
+
+ p->me = sg_mailbox_by_name(sg_host_self_get_name());
+
+ return p;
+}
+
+static void peer_delete(peer_t p)
+{
+ free(p->pending_recvs);
+ free(p->pending_sends);
+
+ free(p);
+}
+
+void peer(int argc, char* argv[])
+{
+ XBT_DEBUG("peer");
+
+ peer_t p = peer_init(argc, argv);
+ double start_time = simgrid_get_clock();
+ peer_join_chain(p);
+ peer_forward_file(p);
+ double end_time = simgrid_get_clock();
+
+ XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p->received_bytes,
+ p->received_bytes / 1024.0 / 1024.0 / (end_time - start_time));
+
+ peer_delete(p);
+}
set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.c ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/${file}.h)
endforeach()
-if(enable_msg)
- add_executable (chainsend EXCLUDE_FROM_ALL app-chainsend/chainsend.c app-chainsend/iterator.c app-chainsend/common.c app-chainsend/messages.c app-chainsend/broadcaster.c app-chainsend/peer.c)
- target_link_libraries(chainsend simgrid)
- set_target_properties(chainsend PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/app-chainsend)
- add_dependencies(tests chainsend)
-endif()
-foreach (file common iterator messages broadcaster peer)
- set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/${file}.c ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/${file}.h)
-endforeach()
-set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/chainsend.c)
-
set(teshsuite_src ${teshsuite_src} PARENT_SCOPE)
set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/app-bittorrent.tesh
- ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/app-chainsend.tesh
PARENT_SCOPE)
set(bin_files ${bin_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/generate.py PARENT_SCOPE)
set(xml_files ${xml_files} ${CMAKE_CURRENT_SOURCE_DIR}/app-bittorrent/app-bittorrent_d.xml
- ${CMAKE_CURRENT_SOURCE_DIR}/app-chainsend/app-chainsend_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait2_d.xml
${CMAKE_CURRENT_SOURCE_DIR}/async-wait/async-wait3_d.xml
if(enable_msg)
foreach(x
async-wait
- app-bittorrent app-chainsend
+ app-bittorrent
cloud-migration cloud-two-tasks
energy-pstate
host_on_off host_on_off_processes host_on_off_recv
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "broadcaster.h"
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(msg_broadcaster, "Messages specific for the broadcaster");
-
-xbt_dynar_t build_hostlist_from_hostcount(int hostcount)
-{
- xbt_dynar_t host_list = xbt_dynar_new(sizeof(char*), xbt_free_ref);
- for (int i = 1; i <= hostcount; i++) {
- char* hostname = bprintf("host%d", i);
- XBT_DEBUG("%s", hostname);
- xbt_dynar_push(host_list, &hostname);
- }
- return host_list;
-}
-
-int broadcaster_build_chain(broadcaster_t bc)
-{
- msg_task_t task = NULL;
- char** cur = (char**)xbt_dynar_iterator_next(bc->it);
- const char* me = MSG_host_get_name(MSG_host_self());
- const char* current_host = NULL;
- const char* prev = NULL;
- const char* next = NULL;
- const char* last = NULL;
-
- /* Build the chain if there's at least one peer */
- if (cur != NULL) {
- /* init: prev=NULL, host=current cur, next=next cur */
- next = *cur;
- bc->first = next;
-
- /* This iterator iterates one step ahead: cur is current iterated element, but is actually next in the chain */
- do {
- /* following steps: prev=last, host=next, next=cur */
- cur = (char**)xbt_dynar_iterator_next(bc->it);
- prev = last;
- current_host = next;
- if (cur != NULL)
- next = *cur;
- else
- next = NULL;
- XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", me, current_host, prev, next);
-
- /* Send message to current peer */
- task = task_message_chain_new(prev, next, bc->piece_count);
- MSG_task_send(task, current_host);
-
- last = current_host;
- } while (cur != NULL);
- }
-
- return MSG_OK;
-}
-
-int broadcaster_send_file(broadcaster_t bc)
-{
- const char* me = MSG_host_get_name(MSG_host_self());
- msg_task_t task = NULL;
-
- bc->current_piece = 0;
-
- while (bc->current_piece < bc->piece_count) {
- task = task_message_data_new(NULL, PIECE_SIZE);
- XBT_DEBUG("Sending (send) piece %d from %s into mailbox %s", bc->current_piece, me, bc->first);
- MSG_task_send(task, bc->first);
- bc->current_piece++;
- }
-
- return MSG_OK;
-}
-
-broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count)
-{
- int status;
- broadcaster_t bc = xbt_new(s_broadcaster_t, 1);
-
- bc->first = NULL;
- bc->piece_count = piece_count;
- bc->current_piece = 0;
- bc->host_list = host_list;
- bc->it = xbt_dynar_iterator_new(bc->host_list, forward_indices_list);
- bc->max_pending_sends = MAX_PENDING_SENDS;
- bc->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
-
- status = broadcaster_build_chain(bc);
- xbt_assert(status == MSG_OK, "Chain initialization failed");
-
- return bc;
-}
-
-static void broadcaster_destroy(broadcaster_t bc)
-{
- /* Destroy iterator and hostlist */
- xbt_dynar_iterator_delete(bc->it);
- xbt_dynar_free(&bc->pending_sends);
- xbt_dynar_free(&bc->host_list);
- xbt_free(bc);
-}
-
-/** Emitter function */
-int broadcaster(int argc, char* argv[])
-{
- unsigned int piece_count = PIECE_COUNT;
-
- XBT_DEBUG("broadcaster");
-
- /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
- xbt_dynar_t host_list = build_hostlist_from_hostcount(xbt_str_parse_int(argv[1], "Invalid number of peers: %s"));
-
- /* argv[2] is the number of pieces */
- if (argc > 2) {
- piece_count = xbt_str_parse_int(argv[2], "Invalid number of pieces: %s");
- XBT_DEBUG("piece_count set to %u", piece_count);
- } else {
- XBT_DEBUG("No piece_count specified, defaulting to %u", piece_count);
- }
- broadcaster_t bc = broadcaster_init(host_list, piece_count);
-
- /* TODO: Error checking */
- int status = broadcaster_send_file(bc);
-
- broadcaster_destroy(bc);
-
- return status;
-}
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#ifndef BROADCASTER_H
-#define BROADCASTER_H
-
-#include "common.h"
-#include "iterator.h"
-#include "messages.h"
-#include "xbt/dynar.h"
-
-/* Connection parameters */
-#define MAX_PENDING_SENDS 10
-
-/* Default values for the ``file'' details */
-#define PIECE_SIZE 65536
-#define PIECE_COUNT 16384
-
-/* Broadcaster struct */
-typedef struct s_broadcaster {
- const char* first;
- int piece_count;
- int current_piece;
- xbt_dynar_t host_list;
- xbt_dynar_iterator_t it;
- int max_pending_sends;
- xbt_dynar_t pending_sends;
-} s_broadcaster_t;
-
-typedef s_broadcaster_t* broadcaster_t;
-
-xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
-
-/* Broadcaster: helper functions */
-broadcaster_t broadcaster_init(xbt_dynar_t host_list, unsigned int piece_count);
-int broadcaster_build_chain(broadcaster_t bc);
-int broadcaster_send_file(broadcaster_t bc);
-
-/* Tasks */
-int broadcaster(int argc, char* argv[]);
-
-#endif /* BROADCASTER_H */
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "common.h"
-
-int process_pending_connections(xbt_dynar_t q)
-{
- unsigned int iter;
- int status;
- int empty = 0;
- msg_comm_t comm;
-
- xbt_dynar_foreach (q, iter, comm) {
- empty = 1;
- if (MSG_comm_test(comm)) {
- status = MSG_comm_get_status(comm);
- MSG_comm_destroy(comm);
- xbt_assert(status == MSG_OK, "process_pending_connections() failed");
- xbt_dynar_cursor_rm(q, &iter);
- empty = 0;
- }
- }
- return empty;
-}
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#ifndef COMMON_H
-#define COMMON_H
-
-#include "simgrid/msg.h"
-#include "xbt/sysdep.h"
-
-static inline void queue_pending_connection(msg_comm_t comm, xbt_dynar_t q)
-{
- xbt_dynar_push(q, &comm);
-}
-
-int process_pending_connections(xbt_dynar_t q);
-
-#endif /* COMMON_H */
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team. All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "iterator.h"
-
-/* http://stackoverflow.com/a/3348142 */
-/**************************************/
-
-/* Allocates and initializes a new xbt_dynar iterator for list, using criteria_fn as iteration criteria
- criteria_fn: given an array size, it must generate a list containing the indices of every item in some order */
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int))
-{
- xbt_dynar_iterator_t it = xbt_new(xbt_dynar_iterator_s, 1);
-
- it->list = list;
- it->length = xbt_dynar_length(list);
- it->indices_list = criteria_fn(it->length); // Creates and fills a dynar of int
- it->criteria_fn = criteria_fn;
- it->current = 0;
-
- return it;
-}
-
-/* Returns the next element iterated by iterator it, NULL if there are no more elements */
-void* xbt_dynar_iterator_next(xbt_dynar_iterator_t it)
-{
- if (it->current >= it->length) {
- return NULL;
- } else {
- const int* next = xbt_dynar_get_ptr(it->indices_list, it->current);
- it->current++;
- return xbt_dynar_get_ptr(it->list, *next);
- }
-}
-
-void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it)
-{
- xbt_dynar_free_container(&(it->indices_list));
- xbt_free_ref(&it);
-}
-
-xbt_dynar_t forward_indices_list(int size)
-{
- xbt_dynar_t indices_list = xbt_dynar_new(sizeof(int), NULL);
- for (int i = 0; i < size; i++)
- xbt_dynar_push_as(indices_list, int, i);
- return indices_list;
-}
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#ifndef ITERATOR_H
-#define ITERATOR_H
-
-#include "xbt/dynar.h"
-#include "xbt/sysdep.h"
-
-/* Random iterator for xbt_dynar */
-typedef struct xbt_dynar_iterator_struct {
- xbt_dynar_t list;
- xbt_dynar_t indices_list;
- int current;
- unsigned long length;
- xbt_dynar_t (*criteria_fn)(int size);
-} * xbt_dynar_iterator_t;
-typedef struct xbt_dynar_iterator_struct xbt_dynar_iterator_s;
-
-/* Iterator methods */
-xbt_dynar_iterator_t xbt_dynar_iterator_new(xbt_dynar_t list, xbt_dynar_t (*criteria_fn)(int));
-void* xbt_dynar_iterator_next(xbt_dynar_iterator_t it);
-void xbt_dynar_iterator_delete(xbt_dynar_iterator_t it);
-
-/* Iterator generators */
-xbt_dynar_t forward_indices_list(int size);
-
-#endif /* ITERATOR_H */
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "messages.h"
-
-msg_task_t task_message_new(e_message_type type, unsigned int len)
-{
- message_t msg = xbt_new(s_message_t, 1);
- msg->type = type;
- msg->prev_hostname = NULL;
- msg->next_hostname = NULL;
- msg_task_t task = MSG_task_create(NULL, 0, len, msg);
-
- return task;
-}
-
-msg_task_t task_message_chain_new(const char* prev, const char* next, const unsigned int num_pieces)
-{
- msg_task_t task = task_message_new(MESSAGE_BUILD_CHAIN, MESSAGE_BUILD_CHAIN_SIZE);
- message_t msg = MSG_task_get_data(task);
- msg->prev_hostname = xbt_strdup(prev);
- msg->next_hostname = xbt_strdup(next);
- msg->num_pieces = num_pieces;
-
- return task;
-}
-
-msg_task_t task_message_data_new(const char* block, unsigned int len)
-{
- msg_task_t task = task_message_new(MESSAGE_SEND_DATA, MESSAGE_SEND_DATA_HEADER_SIZE + len);
- message_t msg = MSG_task_get_data(task);
- msg->data_block = block;
- msg->data_length = len;
-
- return task;
-}
-
-void task_message_delete(void* task)
-{
- message_t msg = MSG_task_get_data(task);
- xbt_free(msg);
- MSG_task_destroy(task);
-}
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#ifndef MESSAGES_H
-#define MESSAGES_H
-
-#include "simgrid/msg.h"
-
-#define MESSAGE_BUILD_CHAIN_SIZE 40
-#define MESSAGE_SEND_DATA_HEADER_SIZE 1
-#define MESSAGE_END_DATA_SIZE 1
-
-/* Messages enum */
-typedef enum { MESSAGE_BUILD_CHAIN = 0, MESSAGE_SEND_DATA } e_message_type;
-
-/* Message struct */
-typedef struct s_message {
- e_message_type type;
- char* prev_hostname;
- char* next_hostname;
- const char* data_block;
- unsigned int data_length;
- unsigned int num_pieces;
-} s_message_t;
-
-typedef s_message_t* message_t;
-
-/* Message methods */
-msg_task_t task_message_new(e_message_type type, unsigned int len);
-msg_task_t task_message_chain_new(const char* prev, const char* next, const unsigned int num_pieces);
-msg_task_t task_message_data_new(const char* block, unsigned int len);
-void task_message_delete(void*);
-
-#endif /* MESSAGES_H */
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#include "peer.h"
-
-XBT_LOG_NEW_DEFAULT_CATEGORY(msg_peer, "Messages specific for the peer");
-
-void peer_init_chain(peer_t peer, const s_message_t* msg)
-{
- peer->prev = msg->prev_hostname;
- peer->next = msg->next_hostname;
- peer->total_pieces = msg->num_pieces;
- peer->init = 1;
-}
-
-static void peer_forward_msg(const s_peer_t* peer, const s_message_t* msg)
-{
- msg_task_t task = task_message_data_new(NULL, msg->data_length);
- XBT_DEBUG("Sending (isend) from %s into mailbox %s", peer->me, peer->next);
- msg_comm_t comm = MSG_task_isend(task, peer->next);
- queue_pending_connection(comm, peer->pending_sends);
-}
-
-int peer_execute_task(peer_t peer, msg_task_t task)
-{
- int done = 0;
- const s_message_t* msg = MSG_task_get_data(task);
-
- XBT_DEBUG("Peer %s got message of type %u\n", peer->me, msg->type);
- if (msg->type == MESSAGE_BUILD_CHAIN)
- peer_init_chain(peer, msg);
- else if (msg->type == MESSAGE_SEND_DATA) {
- xbt_assert(peer->init, "peer_execute_task() failed: got msg_type %u before initialization", msg->type);
- if (peer->next != NULL)
- peer_forward_msg(peer, msg);
- peer->pieces++;
- peer->bytes += msg->data_length;
- if (peer->pieces >= peer->total_pieces) {
- XBT_DEBUG("%d pieces received", peer->pieces);
- done = 1;
- }
- }
-
- MSG_task_execute(task);
-
- return done;
-}
-
-msg_error_t peer_wait_for_message(peer_t peer)
-{
- msg_error_t status;
- msg_comm_t comm = NULL;
- msg_task_t task = NULL;
- int done = 0;
-
- while (done == 0) {
- comm = MSG_task_irecv(&task, peer->me);
- queue_pending_connection(comm, peer->pending_recvs);
- int idx = MSG_comm_waitany(peer->pending_recvs);
- if (idx != -1) {
- comm = xbt_dynar_get_as(peer->pending_recvs, idx, msg_comm_t);
- status = MSG_comm_get_status(comm);
- XBT_DEBUG("peer_wait_for_message: error code = %u", status);
- xbt_assert(status == MSG_OK, "peer_wait_for_message() failed");
-
- task = MSG_comm_get_task(comm);
- MSG_comm_destroy(comm);
- xbt_dynar_cursor_rm(peer->pending_recvs, (unsigned int*)&idx);
- done = peer_execute_task(peer, task);
-
- task_message_delete(task);
- task = NULL;
- }
- process_pending_connections(peer->pending_sends);
- }
-
- return status;
-}
-
-void peer_init(peer_t p, int argc, char* argv[])
-{
- p->init = 0;
- p->prev = NULL;
- p->next = NULL;
- p->pieces = 0;
- p->bytes = 0;
- p->pending_recvs = xbt_dynar_new(sizeof(msg_comm_t), NULL);
- p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
- /* Set mailbox name: use host number from argv or hostname if no argument given */
- if (argc > 1) {
- p->me = bprintf("host%s", argv[1]);
- } else {
- p->me = xbt_strdup(MSG_host_get_name(MSG_host_self()));
- }
-}
-
-void peer_shutdown(const s_peer_t* p)
-{
- unsigned int size = xbt_dynar_length(p->pending_sends);
- unsigned int idx;
- msg_comm_t* comms = xbt_new(msg_comm_t, size);
-
- for (idx = 0; idx < size; idx++) {
- comms[idx] = xbt_dynar_get_as(p->pending_sends, idx, msg_comm_t);
- }
-
- XBT_DEBUG("Waiting for sends to finish before shutdown...");
- MSG_comm_waitall(comms, size, PEER_SHUTDOWN_DEADLINE);
-
- for (idx = 0; idx < size; idx++) {
- MSG_comm_destroy(comms[idx]);
- }
-
- xbt_free(comms);
-}
-
-void peer_delete(peer_t p)
-{
- xbt_dynar_free(&p->pending_recvs);
- xbt_dynar_free(&p->pending_sends);
- xbt_free(p->me);
- xbt_free(p->prev);
- xbt_free(p->next);
-
- xbt_free(p);
-}
-
-void peer_print_stats(const s_peer_t* p, float elapsed_time)
-{
- XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", elapsed_time, p->bytes,
- p->bytes / 1024.0 / 1024.0 / elapsed_time);
-}
-
-/** Peer function */
-int peer(int argc, char* argv[])
-{
- peer_t p = xbt_new(s_peer_t, 1);
- msg_error_t status;
-
- XBT_DEBUG("peer");
-
- peer_init(p, argc, argv);
- float start_time = MSG_get_clock();
- status = peer_wait_for_message(p);
- peer_shutdown(p);
- float end_time = MSG_get_clock();
- peer_print_stats(p, end_time - start_time);
- peer_delete(p);
-
- return status;
-}
+++ /dev/null
-/* Copyright (c) 2012-2020. The SimGrid Team.
- * All rights reserved. */
-
-/* This program is free software; you can redistribute it and/or modify it
- * under the terms of the license (GNU LGPL) which comes with this package. */
-
-#ifndef PEER_H
-#define PEER_H
-
-#include "simgrid/msg.h"
-#include "xbt/dynar.h"
-
-#include "common.h"
-#include "messages.h"
-
-#define PEER_SHUTDOWN_DEADLINE 60000
-
-/* Peer struct */
-typedef struct s_peer {
- int init;
- char* prev;
- char* next;
- char* me;
- int pieces;
- unsigned long long bytes;
- xbt_dynar_t pending_recvs;
- xbt_dynar_t pending_sends;
- unsigned int total_pieces;
-} s_peer_t;
-typedef s_peer_t* peer_t;
-
-/* Peer: helper functions */
-msg_error_t peer_wait_for_message(peer_t peer);
-int peer_execute_task(peer_t peer, msg_task_t task);
-void peer_init_chain(peer_t peer, const s_message_t* msg);
-void peer_delete(peer_t p);
-void peer_shutdown(const s_peer_t* p);
-void peer_init(peer_t p, int argc, char* argv[]);
-void peer_print_stats(const s_peer_t* p, float elapsed_time);
-
-int peer(int argc, char* argv[]);
-
-#endif /* PEER_H */