hostname = xbt_new(char, HOSTNAME_LENGTH);
snprintf(hostname, HOSTNAME_LENGTH, "host%d", i);
//XBT_INFO("%s", hostname);
- h = MSG_get_host_by_name(hostname);
+ /*h = MSG_get_host_by_name(hostname);
if (h == NULL) {
XBT_INFO("Unknown host %s. Stopping Now! ", hostname);
abort();
- } else {
+ } else {*/
xbt_dynar_push(host_list, &hostname);
- }
+ /*}*/
}
return host_list;
}
xbt_dynar_free(&h);
}
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list)
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it)
{
- xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
msg_task_t task = NULL;
char **cur = (char**)xbt_dynar_iterator_next(it);
- const char *me = MSG_host_get_name(MSG_host_self());
+ const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
const char *current_host = NULL;
const char *prev = NULL;
const char *next = NULL;
last = current_host;
} while (cur != NULL);
}
- xbt_dynar_iterator_delete(it);
return MSG_OK;
}
XBT_INFO("Sending (send) from %s into mailbox %s", me, first);
status = MSG_task_send(task, first);
- xbt_assert(status == MSG_OK, __FILE__ ": broadcaster_send_file() failed");
+ xbt_assert(status == MSG_OK, "broadcaster_send_file() failed");
}
return MSG_OK;
}
-/* FIXME: I should iterate nodes in the same order as the one used to build the chain */
-int broadcaster_finish(xbt_dynar_t host_list)
+int broadcaster_finish(xbt_dynar_iterator_t it)
{
- xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
msg_task_t task = NULL;
- const char *me = MSG_host_get_name(MSG_host_self());
+ const char *me = "host0"; /* FIXME: hardcoded*/ /*MSG_host_get_name(MSG_host_self());*/
const char *current_host = NULL;
char **cur = NULL;
- /* Send goodbye message to every peer */
+ xbt_dynar_iterator_seek(it, 0);
+
+ /* Send goodbye message to every peer in the order generated by iterator it */
for (cur = (char**)xbt_dynar_iterator_next(it); cur != NULL; cur = (char**)xbt_dynar_iterator_next(it)) {
/* Send message to current peer */
current_host = *cur;
XBT_INFO("broadcaster");
- /* Check that every host given by the hostcount in argv[1] exists and add it
- to a dynamic array */
+ /* Add every mailbox given by the hostcount in argv[1] to a dynamic array */
host_list = build_hostlist_from_hostcount(atoi(argv[1]));
/*host_list = build_hostlist_from_argv(argc, argv);*/
+ /* Initialize iterator */
+ xbt_dynar_iterator_t it = xbt_dynar_iterator_new(host_list, forward_indices_list);
+
/* TODO: Error checking */
- status = broadcaster_build_chain(&first, host_list);
+ status = broadcaster_build_chain(&first, host_list, it);
status = broadcaster_send_file(first);
- status = broadcaster_finish(host_list);
+ status = broadcaster_finish(it);
+ /* Destroy iterator and hostlist */
+ xbt_dynar_iterator_delete(it);
delete_hostlist(host_list);
return status;
#include "messages.h"
#include "iterator.h"
+#include "common.h"
-#define HOSTNAME_LENGTH 20
-#define PIECE_COUNT 1000
+#define PIECE_COUNT 50
xbt_dynar_t build_hostlist_from_hostcount(int hostcount);
/*xbt_dynar_t build_hostlist_from_argv(int argc, char *argv[]);*/
/* Broadcaster: helper functions */
-int broadcaster_build_chain(const char **first, xbt_dynar_t host_list);
+int broadcaster_build_chain(const char **first, xbt_dynar_t host_list, xbt_dynar_iterator_t it);
int broadcaster_send_file(const char *first);
-int broadcaster_finish(xbt_dynar_t host_list);
+int broadcaster_finish(xbt_dynar_iterator_t it);
/* Tasks */
int broadcaster(int argc, char *argv[]);
#include "xbt/sysdep.h"
#define MESSAGE_SIZE 1
-
-
+#define HOSTNAME_LENGTH 20
#endif /* KADEPLOY_COMMON_H */
--- /dev/null
+#!/usr/bin/env ruby
+
+require 'rexml/document'
+
+class HostsExtractor
+ @@doc = nil
+ @@hosts = []
+
+ def initialize(xml)
+ @@doc = REXML::Document.new(xml)
+ @@doc.elements.each('platform') do |platform|
+ extract_hosts(platform)
+ end
+ end
+
+ def extract_hosts(doc)
+ doc.elements.each('AS') do |as|
+ extract_hosts_from_AS(as)
+ extract_hosts(as)
+ end
+ end
+
+ def extract_hosts_from_AS(doc)
+ doc.elements.each('host') do |h|
+ @@hosts << h.attributes['id']
+ puts "hosts %s" % h.attributes['id']
+ end
+
+ doc.elements.each('cluster') do |c|
+ prefix = c.attributes['prefix']
+ suffix = c.attributes['suffix']
+ puts "%s %s %s" % [prefix, c.attributes['radical'], suffix]
+ expand_radical(c.attributes['radical']).each do |num|
+ @@hosts << "%s%s%s" % [prefix, num, suffix]
+ end
+ end
+ end
+
+ def hosts
+ return @@hosts
+ end
+
+ def expand_radical(radical)
+ l = []
+ puts radical
+ radical.split(',').each do |range|
+ range.scan(/^\d+$/) { |x| l << x }
+ range.scan(/^(\d+)-(\d+)$/) { |x, y| (x..y).each do |i| l << i end }
+ end
+ return l
+ end
+end
+
+class DeploymentGenerator
+ @@outfile = nil
+
+ def initialize(fname)
+ @@outfile = File.new(fname, "w")
+ end
+
+ def write_header
+ @@outfile.puts "<?xml version='1.0'?>"
+ @@outfile.puts "<!DOCTYPE platform SYSTEM \"http://simgrid.gforge.inria.fr/simgrid.dtd\">"
+ @@outfile.puts "<platform version=\"3\">"
+ end
+
+ def write_process(name, function, hosts, args)
+ @@outfile.puts " <!-- %s -->" % name
+ hosts.zip(args).each do |h, a|
+ @@outfile.puts " <process host=\"%s\" function=\"%s\">" % [h, function]
+ @@outfile.puts " <argument value=\"%s\"/>" % [a]
+ @@outfile.puts " </process>"
+ end
+ end
+
+ def write_footer
+ @@outfile.puts "</platform>"
+ end
+end
+
+xml = File.read(ARGV.shift)
+he = HostsExtractor.new(xml)
+
+raise "Cannot run with less than 2 hosts" unless he.hosts.size > 1
+
+output = ARGV.shift
+dg = DeploymentGenerator.new(output)
+dg.write_header
+
+puts he.hosts
+broadcaster = he.hosts.shift
+peers = he.hosts
+
+dg.write_process("Broadcaster", "broadcaster", [broadcaster], [he.hosts.size])
+dg.write_process("Peers", "peer", peers, (1..he.hosts.size))
+
+dg.write_footer
TRACE_category_with_color("host2", "0 1 1");
TRACE_category_with_color("host3", "1 0 0");
TRACE_category_with_color("host4", "1 0 1");
- TRACE_category_with_color("host5", "1 1 0");
+ TRACE_category_with_color("host5", "0 0 0");
+ TRACE_category_with_color("host6", "1 1 0");
+ TRACE_category_with_color("host7", "1 1 1");
+ TRACE_category_with_color("host8", "0 1 0");
/* Application deployment */
MSG_function_register("broadcaster", broadcaster);
msg_task_t task_message_data_new(const char *issuer_hostname, const char *mailbox, const char *block, unsigned int len)
{
msg_task_t task = task_message_new(MESSAGE_SEND_DATA, issuer_hostname, mailbox);
- if (strcmp(mailbox, "host4") == 0) MSG_task_set_category(task, mailbox);
+ //if (strcmp(mailbox, "host4") == 0)
+ //MSG_task_set_category(task, mailbox);
message_t msg = MSG_task_get_data(task);
msg->data_block = block;
msg->data_length = len;
int done = 0;
while (!done) {
- if (comm == NULL)
+ if (comm == NULL) // FIXME I should have a recv queue
comm = MSG_task_irecv(&task, peer->me);
if (MSG_comm_test(comm)) {
status = MSG_comm_get_status(comm);
//XBT_INFO("peer_wait_for_message: error code = %d", status);
- xbt_assert(status == MSG_OK, __FILE__ ": peer_wait_for_message() failed");
+ xbt_assert(status == MSG_OK, "peer_wait_for_message() failed");
MSG_comm_destroy(comm);
comm = NULL;
done = peer_execute_task(peer, task);
return status;
}
-void peer_init(peer_t p)
+void peer_init(peer_t p, int argc, char *argv[])
{
p->init = 0;
p->prev = NULL;
p->pieces = 0;
p->close_asap = 0;
p->pending_sends = xbt_dynar_new(sizeof(msg_comm_t), NULL);
- p->me = MSG_host_get_name(MSG_host_self());
+ p->me = xbt_new(char, HOSTNAME_LENGTH);
+ /* Set mailbox name: use host number from argv or hostname if no argument given */
+ if (argc > 1) {
+ snprintf(p->me, HOSTNAME_LENGTH, "host%s", argv[1]);
+ } else {
+ strncpy(p->me, MSG_host_get_name(MSG_host_self()), HOSTNAME_LENGTH);
+ }
}
void peer_shutdown(peer_t p)
xbt_assert(xbt_dynar_length(p->pending_sends) == 0, "Shutdown failed, sends still pending after deadline");
xbt_dynar_free(&p->pending_sends);
+ xbt_free(p->me);
xbt_free(p);
}
XBT_INFO("peer");
- peer_init(p);
+ peer_init(p, argc, argv);
status = peer_wait_for_message(p);
peer_shutdown(p);
#include "xbt/sysdep.h"
#include "messages.h"
+#include "common.h"
#define PEER_SHUTDOWN_DEADLINE 6000
int init;
const char *prev;
const char *next;
- const char *me;
+ char *me;
int pieces;
xbt_dynar_t pending_sends;
int close_asap; /* TODO: unused */
int peer_execute_task(peer_t peer, msg_task_t task);
void peer_init_chain(peer_t peer, message_t msg);
void peer_shutdown(peer_t p);
-void peer_init(peer_t p);
+void peer_init(peer_t p, int argc, char *argv[]);
int peer(int argc, char *argv[]);