Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Added debug information to SIMIX network.
[simgrid.git] / src / simix / smx_network.c
1 /*      $Id$     */
2
3 /* Copyright (c) 2009 Cristian Rosa.
4    All rights reserved.                                          */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "private.h"
10 #include "xbt/log.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix,
13                                 "Logging specific to SIMIX (network)");
14
15 /******************************************************************************/
16 /*                           Rendez-Vous Points                               */
17 /******************************************************************************/ 
18
19 /**
20  *  \brief Creates a new rendez-vous point
21  *  \param name The name of the rendez-vous point
22  *  \return The created rendez-vous point
23  */
24 smx_rdv_t SIMIX_rdv_create(const char *name)
25 {
26   smx_rdv_t rvp = xbt_new0(s_smx_rvpoint_t, 1);
27   rvp->name = name ? xbt_strdup(name) : NULL;
28   rvp->read = SIMIX_mutex_init();
29   rvp->write = SIMIX_mutex_init();
30   rvp->comm_fifo = xbt_fifo_new();
31
32   return rvp;
33 }
34
35 /**
36  *  \brief Destroy a rendez-vous point
37  *  \param name The rendez-vous point to destroy
38  */
39 void SIMIX_rdv_destroy(smx_rdv_t rvp)
40 {
41   if(rvp->name)
42     xbt_free(rvp->name);
43   SIMIX_mutex_destroy(rvp->read);
44   SIMIX_mutex_destroy(rvp->write);
45   xbt_fifo_free(rvp->comm_fifo);
46   xbt_free(rvp);
47 }
48
49 /**
50  *  \brief Push a communication request into a rendez-vous point
51  *  The communications request are dequeued by the two functions below
52  *  \param rvp The rendez-vous point
53  *  \param comm The communication request
54  */
55 static inline void SIMIX_rdv_push(smx_rdv_t rvp, smx_comm_t comm)
56 {
57   xbt_fifo_push(rvp->comm_fifo, comm);
58 }
59
60 /**
61  *  \brief Checks if there is a communication request queued in a rendez-vous matching our needs
62  *  \param rvp The rendez-vous with the queue
63  *  \param look_for_src boolean. True: we are receiver looking for sender; False: other way round
64  *  \return The communication request if found, or a newly created one otherwise.
65  */
66 smx_comm_t SIMIX_rdv_get_request(smx_rdv_t rvp, int (filter)(smx_comm_t, void*), void *arg) {
67   smx_comm_t comm;
68   xbt_fifo_item_t item;
69
70   /* Traverse the rendez-vous queue looking for a comm request matching the
71      filter conditions. If found return it and remove it from the list. */
72   xbt_fifo_foreach(rvp->comm_fifo, item, comm, smx_comm_t) {
73     if(filter(comm, arg)){
74       SIMIX_communication_use(comm);
75       xbt_fifo_remove_item(rvp->comm_fifo, item);
76       DEBUG1("Communication request found! %p", comm);
77       return comm;
78     }
79   }
80
81   /* no relevant request found. Return NULL */
82   DEBUG0("Communication request not found");
83   return NULL;
84 }
85
86 /******************************************************************************/
87 /*                           Communication Requests                           */
88 /******************************************************************************/ 
89
90 /**
91  *  \brief Creates a new communication request
92  *  \param sender The process starting the communication (by send)
93  *  \param receiver The process receiving the communication (by recv)
94  *  \return the communication request
95  */  
96 smx_comm_t SIMIX_communication_new(smx_comm_type_t type, smx_rdv_t rdv)
97 {
98   /* alloc structures */
99   smx_comm_t comm = xbt_new0(s_smx_comm_t, 1);
100   comm->type = type;
101   comm->cond = SIMIX_cond_init();
102   comm->rdv = rdv;
103   comm->refcount = 1;
104   
105   return comm;
106 }
107
108 /**
109  *  \brief Destroy a communication request
110  *  \param comm The request to be destroyed
111  */
112 void SIMIX_communication_destroy(smx_comm_t comm)
113 {
114   comm->refcount--;
115   if(comm->refcount == 0){
116     if(comm->act != NULL)
117       SIMIX_action_destroy(comm->act);
118
119     xbt_free(comm->cond);
120     xbt_free(comm);
121   }
122 }
123
124 /**
125  *  \brief Increase the number of users of the communication.
126  *  \param comm The communication request
127  *  Each communication request can be used by more than one process, so it is
128  *  necessary to know number of them at destroy time, to avoid freeing stuff that
129  *  maybe is in use by others.
130  *  \
131  */
132 static inline void SIMIX_communication_use(smx_comm_t comm)
133 {
134   comm->refcount++;
135 }
136
137 /**
138  *  \brief Start the simulation of a communication request
139  *  \param comm The communication request
140  */
141 static inline void SIMIX_communication_start(smx_comm_t comm)
142 {
143   /* If both the sender and the receiver are already there, start the communication */
144   if(comm->src_host != NULL && comm->dst_host != NULL){
145     DEBUG1("Starting communication %p", comm);
146     comm->act = SIMIX_action_communicate(comm->src_host, comm->dst_host, NULL, 
147                                          comm->task_size, comm->rate);
148     /* Add the communication as user data into the action, so it can be reached from it later */
149     comm->act->data = comm;
150     
151     SIMIX_register_action_to_condition(comm->act, comm->cond);
152   }else{
153     DEBUG1("Communication %p cannot be started, peer missing", comm);
154   }
155 }
156
157 /**
158  *  \brief Waits for communication completion and performs error checking
159  *  \param comm The communication
160  *  \param timeout The max amount of time to wait for the communication to finish
161  *
162  *  Throws:
163  *   - host_error if peer failed
164  *   - timeout_error if communication reached the timeout specified
165  *   - network_error if network failed or peer issued a timeout
166  */
167 static inline void SIMIX_communication_wait_for_completion(smx_comm_t comm, double timeout)
168 {
169   xbt_ex_t e;
170
171   DEBUG1("Waiting for the completion of communication %p", comm);
172   
173   if(timeout > 0){
174     TRY{
175       SIMIX_cond_wait_timeout(comm->cond, NULL, timeout);
176     }
177     CATCH(e){
178       /* If it's a timeout then cancel the communication and signal the other peer */
179       if(e.category == timeout_error){
180         DEBUG1("Communication timeout! %p", comm);
181         if(SIMIX_action_get_state(comm->act) == SURF_ACTION_RUNNING)
182           SIMIX_action_cancel(comm->act);
183         SIMIX_cond_signal(comm->cond);
184         /* FIXME: remove from the rvp if the communication didn't started */
185         SIMIX_communication_destroy(comm);
186       }
187       RETHROW;
188     }
189   }else{
190     SIMIX_cond_wait(comm->cond, NULL);
191   }
192
193   DEBUG1("Communication %p complete! Let's check for errors", comm);
194   
195   /* Check for errors */
196   if (SIMIX_host_get_state(comm->dst_host) == 0){
197     SIMIX_communication_destroy(comm);
198     THROW1(host_error, 0, "Destination host %s failed", comm->dst_host->name);
199   } else if (SIMIX_host_get_state(comm->src_host) == 0){
200     SIMIX_communication_destroy(comm);
201     THROW1(host_error, 0, "Source host %s failed", comm->src_host->name);
202   } else if (SIMIX_action_get_state(comm->act) == SURF_ACTION_FAILED){
203     SIMIX_communication_destroy(comm);
204     THROW0(network_error, 0, "Link failure");
205   }
206
207   SIMIX_unregister_action_to_condition(comm->act, comm->cond);
208 }
209
210 /**
211  *  \brief Copy the communication data from the sender's buffer to the receiver's one
212  *  \param comm The communication
213  */
214 void SIMIX_network_copy_data(smx_comm_t comm)
215 {
216   /* Copy the minimum between the size of the sender's message and the size of the
217      receiver's buffer */
218   *comm->dest_buff_size = *comm->dest_buff_size < comm->data_size ? 
219                             *comm->dest_buff_size : comm->data_size;
220
221   DEBUG4("Copying comm %p data from %s -> %s (%zu bytes)", comm, 
222          comm->src_host->name, comm->dst_host->name, *comm->dest_buff_size);
223
224   memcpy(comm->dest_buff, comm->data, *comm->dest_buff_size);
225 }
226
227 /**
228  *  \brief Checks if a communication is a send request
229  *  \param comm The communication
230  *  \return Boolean value
231  */
232 int SIMIX_communication_isSend(smx_comm_t comm)
233 {
234   return comm->type == comm_send ? TRUE : FALSE;
235 }
236
237 /**
238  *  \brief Checks if a communication is a recv request
239  *  \param comm The communication
240  *  \return Boolean value
241  */
242 int SIMIX_communication_isRecv(smx_comm_t comm)
243 {
244   return comm->type == comm_recv ? TRUE : FALSE;
245 }
246
247 /* FIXME: move to some other place */
248 int comm_filter_get(smx_comm_t comm, void *arg)
249 {
250   if(comm->type == comm_send){
251     if(arg && comm->src_host != (smx_host_t)arg)
252      return FALSE;
253     else
254      return TRUE;
255   }else{
256     return FALSE;
257   }
258 }
259
260 int comm_filter_put(smx_comm_t comm, void *arg)
261 {
262   return comm->type == comm_recv ? TRUE : FALSE;
263 }
264 /******************************************************************************/
265 /*                        Synchronous Communication                           */
266 /******************************************************************************/
267 /*  Throws:
268  *   - host_error if peer failed
269  *   - timeout_error if communication reached the timeout specified
270  *   - network_error if network failed or peer issued a timeout
271  */
272 void SIMIX_network_send(smx_rdv_t rdv, double task_size, double rate, 
273                         double timeout, void *data, size_t data_size,
274                         int (filter)(smx_comm_t, void *), void *arg)
275 {
276   smx_comm_t comm;
277   
278   /* Look for communication request matching our needs. 
279      If it is not found then create it and push it into the rendez-vous point */
280   comm = SIMIX_rdv_get_request(rdv, filter, arg);
281
282   if(comm == NULL){
283     comm = SIMIX_communication_new(comm_send, rdv);
284     SIMIX_rdv_push(rdv, comm);
285   }
286
287   /* Setup the communication request */
288   comm->src_host = SIMIX_host_self();
289   comm->task_size = task_size;
290   comm->rate = rate;
291   comm->data = data;
292   comm->data_size = data_size;
293
294   SIMIX_communication_start(comm);
295
296   /* Wait for communication completion */
297   /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
298   /* FIXME: add timeout checking stuff */
299   SIMIX_communication_wait_for_completion(comm, timeout);
300
301   SIMIX_communication_destroy(comm);
302 }
303
304 /*  Throws:
305  *   - host_error if peer failed
306  *   - timeout_error if communication reached the timeout specified
307  *   - network_error if network failed or peer issued a timeout
308  */
309 void SIMIX_network_recv(smx_rdv_t rdv, double timeout, void *data, 
310                         size_t *data_size, int (filter)(smx_comm_t, void *), void *arg)
311 {
312   smx_comm_t comm;
313
314   /* Look for communication request matching our needs. 
315      If it is not found then create it and push it into the rendez-vous point */
316   comm = SIMIX_rdv_get_request(rdv, filter, arg);
317
318   if(comm == NULL){
319     comm = SIMIX_communication_new(comm_recv, rdv);
320     SIMIX_rdv_push(rdv, comm);
321   }
322
323   /* Setup communication request */
324   comm->dst_host = SIMIX_host_self();
325   comm->dest_buff = data;
326   comm->dest_buff_size = data_size;
327
328   SIMIX_communication_start(comm);
329
330   /* Wait for communication completion */
331   /* FIXME: if the semantic is non blocking, it shouldn't wait on the condition here */
332   /* FIXME: add timeout checking stuff */
333   SIMIX_communication_wait_for_completion(comm, timeout);
334
335   SIMIX_communication_destroy(comm);
336 }
337
338 /******************************************************************************/
339 /*                        Asynchronous Communication                          */
340 /******************************************************************************/
341
342 /*
343 void SIMIX_network_wait(smx_action_t comm, double timeout)
344 {
345     if (timeout > 0)
346       SIMIX_cond_wait_timeout(rvp_cond, rvp_comm_mutex, timeout - start_time);
347     else
348       SIMIX_cond_wait(rvp_cond, rvp_comm_mutex);    
349
350 }
351
352
353 XBT_PUBLIC(int) SIMIX_network_test(smx_action_t comm)
354 {
355   if(SIMIX_action_get_state (comm) == SURF_ACTION_DONE){
356     memcpy(comm->data
357
358   return SIMIX_action_get_state (comm) == SURF_ACTION_DONE ? TRUE : FALSE;
359 }*/
360
361
362
363
364
365
366