]> AND Private Git Repository - loba.git/blob - communicator.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
Add a VERSION file, for the distrib.
[loba.git] / communicator.cpp
1 #include <algorithm>
2 #include <functional>
3 #include <msg/msg.h>
4 #include <xbt/log.h>
5
6 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(comm);
7
8 #include "misc.h"
9 #include "tracing.h"
10
11 #include "communicator.h"
12
13 namespace {
14
15     void check_for_lost_messages(size_t size, const char* descr)
16     {
17         if (size)
18             XBT_WARN("lost %zu %s message%s!", size, descr, ESSE(size));
19     }
20
21 }
22
23 communicator::communicator()
24     : host(static_cast<hostdata*>(MSG_host_get_data(MSG_host_self())))
25 {
26     receiver_thread = new_msg_thread("receiver",
27                                      std::bind(&communicator::receiver, this));
28     receiver_thread->start();
29 }
30
31 communicator::~communicator()
32 {
33     msg_task_t task;
34
35     XBT_DEBUG("send finalize to receiver/ctrl");
36     task = MSG_task_create("finalize", 0.0, 0, NULL);
37     MSG_task_send(task, host->get_ctrl_mbox());
38
39     XBT_DEBUG("send finalize to receiver/data");
40     task = MSG_task_create("finalize", 0.0, 0, NULL);
41     MSG_task_send(task, host->get_data_mbox());
42
43     receiver_thread->wait();
44     delete receiver_thread;
45
46     check_for_lost_messages(ctrl_received.size(), "received ctrl");
47     check_for_lost_messages(data_received.size(), "received data");
48     check_for_lost_messages(ctrl_sent.size(), "sent ctrl");
49     check_for_lost_messages(data_sent.size(), "sent data");
50 }
51
52 msg_comm_t communicator::real_send(const char* dest, message* msg)
53 {
54     XBT_DEBUG("send %s to %s", msg->to_string().c_str(), dest);
55     msg_task_t task = MSG_task_create("message", 0.0, msg->get_size(), msg);
56     // MSG_task_set_category(task,
57     //                       msg->get_type() == message::DATA ?
58     //                       TRACE_CAT_DATA : TRACE_CAT_CTRL);
59     return MSG_task_isend(task, dest);
60 }
61
62 void communicator::real_flush(sent_comm_type& sent_comm, bool wait)
63 {
64     sent_comm_type::iterator bound =
65         std::remove_if(sent_comm.begin(), sent_comm.end(),
66                        comm_test_n_destroy);
67     sent_comm.erase(bound, sent_comm.end());
68     if (wait && !sent_comm.empty()) {
69         size_t size = sent_comm.size();
70         msg_comm_t* comms = new msg_comm_t[size];
71         std::copy(sent_comm.begin(), sent_comm.end(), comms);
72         sent_comm.clear();
73         MSG_comm_waitall(comms, size, -1.0);
74         std::for_each(comms, comms + size, comm_check_n_destroy);
75         delete[] comms;
76     }
77 }
78
79 void communicator::receiver()
80 {
81     xbt_dynar_t comms = xbt_dynar_new(sizeof(msg_comm_t), NULL);
82     struct channel {
83         msg_comm_t comm;
84         msg_task_t task;
85         const char* mbox;
86         message_queue& received;
87     };
88     channel chan[] = { { NULL, NULL, host->get_ctrl_mbox(), ctrl_received },
89                        { NULL, NULL, host->get_data_mbox(), data_received } };
90     const int chan_size = (sizeof chan) / (sizeof chan[0]);
91
92     for (int i = 0 ; i < chan_size ; ++i) {
93         chan[i].comm = MSG_task_irecv(&chan[i].task, chan[i].mbox);
94         xbt_dynar_push(comms, &chan[i].comm);
95     }
96
97     while (!xbt_dynar_is_empty(comms)) {
98
99         int index = MSG_comm_waitany(comms);
100         msg_comm_t finished_comm = xbt_dynar_get_as(comms, index, msg_comm_t);
101         channel* ch;
102
103         for (ch = chan ; ch->comm != finished_comm ; ++ch)
104             /* nop */;
105
106         comm_check_n_destroy(ch->comm);
107         if (strcmp(MSG_task_get_name(ch->task), "finalize")) {
108             XBT_DEBUG("received message on %s", ch->mbox);
109             ch->received.push(ch->task);
110             ch->task = NULL;
111             ch->comm = MSG_task_irecv(&ch->task, ch->mbox);
112             xbt_dynar_set_as(comms, index, msg_comm_t, ch->comm);
113         } else {
114             XBT_DEBUG("received finalize on %s", ch->mbox);
115             MSG_task_destroy(ch->task);
116             ch->task = NULL;
117             ch->comm = NULL;
118             xbt_dynar_remove_at(comms, index, NULL);
119         }
120
121     }
122     xbt_dynar_free(&comms);
123 }
124
125 void communicator::comm_check_n_destroy(msg_comm_t comm)
126 {
127     xbt_assert(MSG_comm_get_status(comm) == MSG_OK);
128     MSG_comm_destroy(comm);
129 }
130
131 bool communicator::comm_test_n_destroy(msg_comm_t comm)
132 {
133     if (MSG_comm_test(comm)) {
134         comm_check_n_destroy(comm);
135         return true;
136     } else
137         return false;
138 }
139
140 // Local variables:
141 // mode: c++
142 // End: