Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill old $Id$ command dating from CVS
[simgrid.git] / examples / gras / pmm / pmm.c
1 /* pmm - parallel matrix multiplication "double diffusion"                  */
2
3 /* Copyright (c) 2006-2008 The SimGrid team. All rights reserved.           */
4
5 /* This program is free software; you can redistribute it and/or modify it
6  * under the terms of the license (GNU LGPL) which comes with this package. */
7
8 #include "gras.h"
9 #include "xbt/matrix.h"
10 #include "amok/peermanagement.h"
11
12 #define PROC_MATRIX_SIZE 3
13 #define NEIGHBOR_COUNT PROC_MATRIX_SIZE - 1
14 #define SLAVE_COUNT (PROC_MATRIX_SIZE*PROC_MATRIX_SIZE)
15
16 #define DATA_MATRIX_SIZE 18
17 const int submatrix_size = DATA_MATRIX_SIZE / PROC_MATRIX_SIZE;
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(pmm, "Parallel Matrix Multiplication");
20
21 /* struct for recovering results */
22 GRAS_DEFINE_TYPE(s_result, struct s_result {
23                  int linepos;
24                  int rowpos; xbt_matrix_t C GRAS_ANNOTE(subtype, double);});
25
26 typedef struct s_result result_t;
27
28 /* struct to send initial data to slave */
29 GRAS_DEFINE_TYPE(s_pmm_assignment, struct s_pmm_assignment {
30                  int linepos;
31                  int rowpos;
32                  xbt_peer_t line[NEIGHBOR_COUNT];
33                  xbt_peer_t row[NEIGHBOR_COUNT];
34                  xbt_matrix_t A GRAS_ANNOTE(subtype, double);
35                  xbt_matrix_t B GRAS_ANNOTE(subtype, double);});
36
37 typedef struct s_pmm_assignment s_pmm_assignment_t;
38
39 /* register messages which may be sent (common to client and server) */
40 static void register_messages(void)
41 {
42   gras_datadesc_type_t result_type;
43   gras_datadesc_type_t pmm_assignment_type;
44
45   gras_datadesc_set_const("NEIGHBOR_COUNT", NEIGHBOR_COUNT);
46   result_type = gras_datadesc_by_symbol(s_result);
47   pmm_assignment_type = gras_datadesc_by_symbol(s_pmm_assignment);
48
49   /* receive a final result from slave */
50   gras_msgtype_declare("result", result_type);
51
52   /* send from master to slave to assign a position and some data */
53   gras_msgtype_declare("pmm_slave", pmm_assignment_type);
54
55   /* send data between slaves */
56   gras_msgtype_declare("dataA",
57                        gras_datadesc_matrix(gras_datadesc_by_name("double"),
58                                             NULL));
59   gras_msgtype_declare("dataB",
60                        gras_datadesc_matrix(gras_datadesc_by_name("double"),
61                                             NULL));
62 }
63
64 /* Function prototypes */
65 int slave(int argc, char *argv[]);
66 int master(int argc, char *argv[]);
67
68
69 /* **********************************************************************
70  * master code
71  * **********************************************************************/
72
73 /* Global private data */
74 typedef struct {
75   int nbr_row, nbr_line;
76   int remaining_step;
77   int remaining_ack;
78 } master_data_t;
79
80 int master(int argc, char *argv[])
81 {
82
83   int i;
84
85   xbt_matrix_t A, B, C;
86   result_t result;
87
88   gras_socket_t from;
89
90   xbt_dynar_t peers;            /* group of slaves */
91   xbt_peer_t grid[SLAVE_COUNT]; /* The slaves as an array */
92   gras_socket_t socket[SLAVE_COUNT];    /* sockets for brodcast to slaves */
93
94   /* Init the GRAS's infrastructure */
95   gras_init(&argc, argv);
96   amok_pm_init();
97   register_messages();
98
99   /* Initialize data matrices */
100   A = xbt_matrix_double_new_id(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
101   B = xbt_matrix_double_new_seq(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
102   C = xbt_matrix_double_new_zeros(DATA_MATRIX_SIZE, DATA_MATRIX_SIZE);
103
104   /* Create the connexions */
105   xbt_assert0(argc > 1, "Usage: master <port>");
106   gras_socket_server(atoi(argv[1]));
107   peers = amok_pm_group_new("pmm");
108
109   /* friends, we're ready. Come and play */
110   INFO0("Wait for peers for 2 sec");
111   gras_msg_handleall(2);
112   while (xbt_dynar_length(peers)<9) {
113     INFO1("Got only %ld pals. Wait 2 more seconds", xbt_dynar_length(peers));
114     gras_msg_handleall(2);
115   }
116   INFO1("Good. Got %ld pals", xbt_dynar_length(peers));
117
118   for (i = 0; i < xbt_dynar_length(peers) && i < SLAVE_COUNT; i++) {
119     xbt_dynar_get_cpy(peers, i, &grid[i]);
120     socket[i] = gras_socket_client(grid[i]->name, grid[i]->port);
121   }
122   xbt_assert2(i == SLAVE_COUNT,
123               "Not enough slaves for this setting (got %d of %d). Change the deployment file",
124               i, SLAVE_COUNT);
125
126   /* Kill surnumerous slaves */
127   for (i = SLAVE_COUNT; i < xbt_dynar_length(peers);) {
128     xbt_peer_t h;
129
130     xbt_dynar_remove_at(peers, i, &h);
131     INFO2("Too much slaves. Killing %s:%d", h->name, h->port);
132     amok_pm_kill_hp(h->name, h->port);
133     free(h);
134   }
135
136
137   /* Assign job to slaves */
138   int row = 0, line = 0;
139   INFO0("XXXXXXXXXXXXXXXXXXXXXX begin Multiplication");
140   for (i = 0; i < SLAVE_COUNT; i++) {
141     s_pmm_assignment_t assignment;
142     int j, k;
143
144     assignment.linepos = line;  // assigned line
145     assignment.rowpos = row;    // assigned row
146
147     /* Neiborhood */
148     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
149       if (i != j * PROC_MATRIX_SIZE + (row)) {
150         assignment.row[k] = grid[j * PROC_MATRIX_SIZE + (row)];
151         k++;
152       }
153     }
154     for (j = 0, k = 0; j < PROC_MATRIX_SIZE; j++) {
155       if (i != (line) * PROC_MATRIX_SIZE + j) {
156         assignment.line[k] = grid[(line) * PROC_MATRIX_SIZE + j];
157         k++;
158       }
159     }
160
161     assignment.A = xbt_matrix_new_sub(A,
162                                       submatrix_size, submatrix_size,
163                                       submatrix_size * line,
164                                       submatrix_size * row, NULL);
165     assignment.B =
166       xbt_matrix_new_sub(B, submatrix_size, submatrix_size,
167                          submatrix_size * line, submatrix_size * row, NULL);
168     row++;
169     if (row >= PROC_MATRIX_SIZE) {
170       row = 0;
171       line++;
172     }
173
174     gras_msg_send(socket[i], "pmm_slave", &assignment);
175     xbt_matrix_free(assignment.A);
176     xbt_matrix_free(assignment.B);
177   }
178
179   /* (have a rest while the slave perform the multiplication) */
180
181   /* Retrieve the results */
182   for (i = 0; i < SLAVE_COUNT; i++) {
183     gras_msg_wait(6000, "result", &from, &result);
184     VERB2("%d slaves are done already. Waiting for %d", i + 1, SLAVE_COUNT);
185     xbt_matrix_copy_values(C, result.C, submatrix_size, submatrix_size,
186                            submatrix_size * result.linepos,
187                            submatrix_size * result.rowpos, 0, 0, NULL);
188     xbt_matrix_free(result.C);
189   }
190   /*    end of gather   */
191
192   if (xbt_matrix_double_is_seq(C))
193     INFO0("XXXXXXXXXXXXXXXXXXXXXX Ok, the result matches expectations");
194   else {
195     WARN0("the result seems wrong");
196   if (DATA_MATRIX_SIZE < 30) {
197     INFO0("The Result of Multiplication is :");
198     xbt_matrix_dump(C, "C:res", 0, xbt_matrix_dump_display_double);
199   } else {
200     INFO1("Matrix size too big (%d>30) to be displayed here",
201           DATA_MATRIX_SIZE);
202   }
203   }
204
205   amok_pm_group_shutdown("pmm");        /* Ok, we're out of here */
206
207   for (i = 0; i < SLAVE_COUNT; i++)
208     gras_socket_close(socket[i]);
209
210   xbt_matrix_free(A);
211   xbt_matrix_free(B);
212   xbt_matrix_free(C);
213   gras_exit();
214   return 0;
215 }                               /* end_of_master */
216
217 /* **********************************************************************
218  * slave code
219  * **********************************************************************/
220
221 static int pmm_worker_cb(gras_msg_cb_ctx_t ctx, void *payload)
222 {
223   /* Recover my initialized Data and My Position */
224   s_pmm_assignment_t assignment = *(s_pmm_assignment_t *) payload;
225   gras_socket_t master = gras_msg_cb_ctx_from(ctx);
226
227   xbt_ex_t e;
228
229   int step, l;
230   xbt_matrix_t bA = xbt_matrix_new(submatrix_size, submatrix_size,
231                                    sizeof(double), NULL);
232   xbt_matrix_t bB = xbt_matrix_new(submatrix_size, submatrix_size,
233                                    sizeof(double), NULL);
234
235   int myline, myrow;
236   xbt_matrix_t mydataA, mydataB;
237   xbt_matrix_t bC =
238     xbt_matrix_double_new_zeros(submatrix_size, submatrix_size);
239
240   result_t result;
241
242   gras_socket_t from;           /* to exchange data with my neighbor */
243
244   /* sockets for brodcast to other slave */
245   gras_socket_t socket_line[PROC_MATRIX_SIZE - 1];
246   gras_socket_t socket_row[PROC_MATRIX_SIZE - 1];
247   memset(socket_line, 0, sizeof(socket_line));
248   memset(socket_row, 0, sizeof(socket_row));
249
250   int i;
251
252   gras_os_sleep(1);             /* wait for my pals */
253
254   myline = assignment.linepos;
255   myrow = assignment.rowpos;
256   mydataA = assignment.A;
257   mydataB = assignment.B;
258
259   if (gras_if_RL())
260     INFO0("Receive my pos and assignment");
261   else
262     INFO2("Receive my pos (%d,%d) and assignment", myline, myrow);
263
264   /* Get my neighborhood from the assignment message (skipping myself) */
265   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
266     socket_line[i] = gras_socket_client(assignment.line[i]->name,
267                                         assignment.line[i]->port);
268     xbt_peer_free(assignment.line[i]);
269   }
270   for (i = 0; i < PROC_MATRIX_SIZE - 1; i++) {
271     socket_row[i] = gras_socket_client(assignment.row[i]->name,
272                                        assignment.row[i]->port);
273     xbt_peer_free(assignment.row[i]);
274   }
275
276   for (step = 0; step < PROC_MATRIX_SIZE; step++) {
277
278     /* a line brodcast */
279     if (myline == step) {
280       VERB2("LINE: step(%d) = Myline(%d). Broadcast my data.", step, myline);
281       for (l = 0; l < PROC_MATRIX_SIZE - 1; l++) {
282         VERB1("LINE:   Send to %s", gras_socket_peer_name(socket_row[l]));
283         gras_msg_send(socket_row[l], "dataB", &mydataB);
284       }
285
286
287       xbt_matrix_free(bB);
288       bB = xbt_matrix_new_sub(mydataB,
289                               submatrix_size, submatrix_size, 0, 0, NULL);
290     } else {
291       TRY {
292         xbt_matrix_free(bB);
293         gras_msg_wait(600, "dataB", &from, &bB);
294       }
295       CATCH(e) {
296         RETHROW0("Can't get a data message from line : %s");
297       }
298       VERB3("LINE: step(%d) <> Myline(%d). Receive data from %s", step,
299             myline, gras_socket_peer_name(from));
300     }
301
302     /* a row brodcast */
303     if (myrow == step) {
304       VERB2("ROW: step(%d)=myrow(%d). Broadcast my data.", step, myrow);
305       for (l = 1; l < PROC_MATRIX_SIZE; l++) {
306         VERB1("ROW:   Send to %s", gras_socket_peer_name(socket_line[l - 1]));
307         gras_msg_send(socket_line[l - 1], "dataA", &mydataA);
308       }
309       xbt_matrix_free(bA);
310       bA = xbt_matrix_new_sub(mydataA,
311                               submatrix_size, submatrix_size, 0, 0, NULL);
312     } else {
313       TRY {
314         xbt_matrix_free(bA);
315         gras_msg_wait(1200, "dataA", &from, &bA);
316       }
317       CATCH(e) {
318         RETHROW0("Can't get a data message from row : %s");
319       }
320       VERB3("ROW: step(%d)<>myrow(%d). Receive data from %s", step, myrow,
321             gras_socket_peer_name(from));
322     }
323     xbt_matrix_double_addmult(bA, bB, bC);
324
325   };
326
327   /* send Result to master */
328   result.C = bC;
329   result.linepos = myline;
330   result.rowpos = myrow;
331
332   TRY {
333     gras_msg_send(master, "result", &result);
334   }
335   CATCH(e) {
336     RETHROW0("Failed to send answer to server: %s");
337   }
338   VERB2(">>>>>>>> Result sent to %s:%d <<<<<<<<",
339         gras_socket_peer_name(master), gras_socket_peer_port(master));
340   /*  Free the allocated resources, and shut GRAS down */
341
342   xbt_matrix_free(bA);
343   xbt_matrix_free(bB);
344   xbt_matrix_free(bC);
345
346   xbt_matrix_free(mydataA);
347   xbt_matrix_free(mydataB);
348   /* FIXME: some are said to be unknown 
349      gras_socket_close(master);
350      gras_socket_close(from);
351      for (l=0; l < PROC_MATRIX_SIZE-1; l++) {
352      if (socket_line[l])
353      gras_socket_close(socket_line[l]);
354      if (socket_row[l])
355      gras_socket_close(socket_row[l]); 
356      } */
357
358   return 0;
359 }
360
361 int slave(int argc, char *argv[])
362 {
363   gras_socket_t mysock;
364   gras_socket_t master = NULL;
365   int connected = 0;
366   int rank;
367
368   /* Init the GRAS's infrastructure */
369   gras_init(&argc, argv);
370   amok_pm_init();
371   if (argc != 3 && argc != 2)
372     xbt_die("Usage: slave masterhost:masterport [rank]");
373   if (argc == 2)
374     rank = -1;
375   else
376     rank = atoi(argv[2]);
377
378   /*  Register the known messages and my callback */
379   register_messages();
380   gras_cb_register("pmm_slave", pmm_worker_cb);
381
382   /* Create the connexions */
383   mysock = gras_socket_server_range(3000, 9999, 0, 0);
384   INFO1("Sensor %d starting", rank);
385   while (!connected) {
386     xbt_ex_t e;
387     TRY {
388       master = gras_socket_client_from_string(argv[1]);
389       connected = 1;
390     }
391     CATCH(e) {
392       if (e.category != system_error)
393         RETHROW;
394       xbt_ex_free(e);
395       gras_os_sleep(0.5);
396     }
397   }
398
399   /* Join and run the group */
400   rank = amok_pm_group_join(master, "pmm");
401   amok_pm_mainloop(600);
402
403   /* housekeeping */
404   gras_socket_close(mysock);
405   //  gras_socket_close(master); Unknown
406   gras_exit();
407   return 0;
408 }                               /* end_of_slave */