1 /* amok peer management - servers main loop and remote peer stopping */
3 /* Copyright (c) 2006, 2007, 2008, 2009, 2010. The SimGrid Team.
4 * All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
9 #include "xbt/sysdep.h"
11 #include "amok/peermanagement.h"
13 #include "amok/amok_modinter.h" /* prototype of my module declaration */
14 #include "gras/module.h" /* module mecanism */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(amok_pm, amok, "peer management");
20 int amok_pm_moddata_id = -1;
24 } s_amok_pm_moddata_t, *amok_pm_moddata_t;
26 /* Message callbacks */
27 static int amok_pm_cb_kill(gras_msg_cb_ctx_t ctx, void *payload_data)
30 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
35 static int amok_pm_cb_killrpc(gras_msg_cb_ctx_t ctx, void *payload_data)
38 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
40 gras_msg_rpcreturn(30, ctx, NULL);
44 static int amok_pm_cb_get(gras_msg_cb_ctx_t ctx, void *payload)
46 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
47 char *name = *(void **) payload;
48 xbt_dynar_t res = xbt_dict_get(g->groups, name);
50 gras_msg_rpcreturn(30, ctx, &res);
54 static int amok_pm_cb_join(gras_msg_cb_ctx_t ctx, void *payload)
56 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
57 char *group_name = *(char **) payload;
58 xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
61 xbt_socket_t exp = gras_msg_cb_ctx_from(ctx);
62 xbt_peer_t dude = xbt_peer_new(xbt_socket_peer_name(exp),
63 xbt_socket_peer_port(exp));
65 rank = xbt_dynar_length(group);
66 xbt_dynar_push(group, &dude);
67 XBT_VERB("Contacted by %s:%d. Give it rank #%d", dude->name, dude->port,
70 gras_msg_rpcreturn(10, ctx, &rank);
75 static int amok_pm_cb_leave(gras_msg_cb_ctx_t ctx, void *payload)
77 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
78 char *name = *(void **) payload;
79 xbt_dynar_t group = xbt_dict_get(g->groups, name);
81 xbt_socket_t exp = gras_msg_cb_ctx_from(ctx);
82 xbt_peer_t dude = xbt_peer_new(xbt_socket_peer_name(exp),
83 xbt_socket_peer_port(exp));
88 xbt_dynar_foreach(group, cpt, peer_it) {
89 if (!strcmp(peer_it->name, dude->name) && peer_it->port == dude->port) {
90 xbt_dynar_cursor_rm(group, &cpt);
94 XBT_WARN("Asked to remove %s:%d from group '%s', but not found. Ignoring",
95 dude->name, dude->port, name);
98 gras_msg_rpcreturn(30, ctx, NULL);
102 static int amok_pm_cb_shutdown(gras_msg_cb_ctx_t ctx, void *payload)
104 char *name = *(void **) payload;
105 amok_pm_group_shutdown(name);
107 gras_msg_rpcreturn(30, ctx, NULL);
111 /** \brief Enter the main loop of the program. It won't return until we get a kill message. */
112 void amok_pm_mainloop(double timeOut)
114 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
117 gras_msg_handle(timeOut);
121 /** \brief kill a buddy identified by its peername and port. Note that it is not removed from any group it may belong to. */
122 void amok_pm_kill_hp(char *name, int port)
124 xbt_socket_t sock = gras_socket_client(name, port);
126 gras_socket_close(sock);
129 /** \brief kill a buddy to which we have a socket already. Note that it is not removed from any group it may belong to. */
130 void amok_pm_kill(xbt_socket_t buddy)
132 gras_msg_send(buddy, "amok_pm_kill", NULL);
135 /** \brief kill syncronously a buddy (do not return before its death). Note that it is not removed from any group it may belong to. */
136 void amok_pm_kill_sync(xbt_socket_t buddy)
138 gras_msg_rpccall(buddy, 30, "amok_pm_killrpc", NULL, NULL);
142 /** \brief create a new peermanagement group located on local peer
144 * The dynar elements are of type xbt_peer_t
146 xbt_dynar_t amok_pm_group_new(const char *group_name)
149 xbt_dynar_t res = xbt_dynar_new(sizeof(xbt_peer_t),
150 xbt_peer_free_voidp);
152 xbt_assert(amok_pm_moddata_id != -1, "Run amok_pm_init first!");
153 g = gras_moddata_by_id(amok_pm_moddata_id);
155 XBT_DEBUG("retrieved groups=%p", g->groups);
157 xbt_dict_set(g->groups, group_name, res, NULL); /*FIXME: leaking xbt_dynar_free_voidp); */
158 XBT_VERB("Group %s created", group_name);
163 /** \brief retrieve all members of the given remote group */
164 xbt_dynar_t amok_pm_group_get(xbt_socket_t master, const char *group_name)
168 gras_msg_rpccall(master, 30, "amok_pm_get", &group_name, &res);
172 /** \brief add current peer to the given remote group
174 * Returns the rank of the process in the group.
176 int amok_pm_group_join(xbt_socket_t master, const char *group_name)
179 XBT_VERB("Join group '%s' on %s:%d",
180 group_name, xbt_socket_peer_name(master),
181 xbt_socket_peer_port(master));
182 gras_msg_rpccall(master, 30, "amok_pm_join", &group_name, &rank);
183 XBT_VERB("Joined group '%s' on %s:%d. Got rank %d",
184 group_name, xbt_socket_peer_name(master),
185 xbt_socket_peer_port(master), rank);
189 /** \brief remove current peer from the given remote group if found
191 * If not found, call is ignored
193 void amok_pm_group_leave(xbt_socket_t master, const char *group_name)
195 gras_msg_rpccall(master, 30, "amok_pm_leave", &group_name, NULL);
196 XBT_VERB("Leaved group '%s' on %s:%d",
197 group_name, xbt_socket_peer_name(master),
198 xbt_socket_peer_port(master));
201 /** \brief stops all members of the given local group */
202 void amok_pm_group_shutdown(const char *group_name)
204 amok_pm_moddata_t g = gras_moddata_by_id(amok_pm_moddata_id);
205 xbt_dynar_t group = xbt_dict_get(g->groups, group_name);
210 xbt_dynar_foreach(group, cpt, peer_it) {
211 amok_pm_kill_hp(peer_it->name, peer_it->port);
214 xbt_dynar_free(&group);
215 xbt_dict_remove(g->groups, group_name);
218 /** \brief stops all members of the given remote group */
219 void amok_pm_group_shutdown_remote(xbt_socket_t master,
220 const char *group_name)
222 gras_msg_rpccall(master, 30, "amok_pm_shutdown", &group_name, NULL);
228 * * Module management functions
234 static void _amok_pm_init(void)
236 /* no world-wide globals */
237 /* Datatype and message declarations */
238 xbt_datadesc_type_t pm_group_type =
239 xbt_datadesc_dynar(xbt_datadesc_by_name("xbt_peer_t"),
240 xbt_peer_free_voidp);
242 gras_msgtype_declare("amok_pm_kill", NULL);
243 gras_msgtype_declare_rpc("amok_pm_killrpc", NULL, NULL);
245 gras_msgtype_declare_rpc("amok_pm_get",
246 xbt_datadesc_by_name("string"), pm_group_type);
247 gras_msgtype_declare_rpc("amok_pm_join", xbt_datadesc_by_name("string"),
248 xbt_datadesc_by_name("int"));
249 gras_msgtype_declare_rpc("amok_pm_leave",
250 xbt_datadesc_by_name("string"), NULL);
252 gras_msgtype_declare_rpc("amok_pm_shutdown",
253 xbt_datadesc_by_name("string"), NULL);
256 static void _amok_pm_join(void *p)
258 /* moddata management */
259 amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
264 mod->groups = xbt_dict_new_homogeneous(NULL);
267 gras_cb_register("amok_pm_kill", &amok_pm_cb_kill);
268 gras_cb_register("amok_pm_killrpc", &amok_pm_cb_killrpc);
270 gras_cb_register("amok_pm_get", &amok_pm_cb_get);
271 gras_cb_register("amok_pm_join", &amok_pm_cb_join);
272 gras_cb_register("amok_pm_leave", &amok_pm_cb_leave);
273 gras_cb_register("amok_pm_shutdown", &amok_pm_cb_shutdown);
276 static void _amok_pm_exit(void)
278 /* no world-wide globals */
281 static void _amok_pm_leave(void *p)
284 amok_pm_moddata_t mod = (amok_pm_moddata_t) p;
286 xbt_dict_free(&mod->groups);
289 gras_cb_unregister("amok_pm_kill", &amok_pm_cb_kill);
290 gras_cb_unregister("amok_pm_killrpc", &amok_pm_cb_killrpc);
292 gras_cb_unregister("amok_pm_get", &amok_pm_cb_get);
293 gras_cb_unregister("amok_pm_join", &amok_pm_cb_join);
294 gras_cb_unregister("amok_pm_leave", &amok_pm_cb_leave);
295 gras_cb_unregister("amok_pm_shutdown", &amok_pm_cb_shutdown);
298 void amok_pm_modulecreate()
300 gras_module_add("amok_pm", sizeof(s_amok_pm_moddata_t),
301 &amok_pm_moddata_id, _amok_pm_init, _amok_pm_exit,
302 _amok_pm_join, _amok_pm_leave);
309 * * Old module functions (kept for compatibility)
312 /** \brief Initialize the peer management module. Every process must run it before use */
315 gras_module_join("amok_pm");
318 /** \brief Finalize the peer management module. Every process should run it after use */
321 gras_module_leave("amok_pm");