]> AND Private Git Repository - loba.git/blob - communicator.cpp
Logo AND Algorithmique Numérique Distribuée

Private GIT Repository
da5e2b5f8062755727ed07dcd3617e8a07f0a0fb
[loba.git] / communicator.cpp
1 #include <algorithm>
2 #include <cstring>
3 #include <msg/msg.h>
4 #include <xbt/log.h>
5 #include "communicator.h"
6 #include "simgrid_features.h"
7
8 // XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simu);
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(comm, simu,
10                                 "Messages from asynchronous pipes");
11
12 namespace {
13
14     bool comm_test_n_destroy(msg_comm_t& comm)
15     {
16         if (MSG_comm_test(comm)) {
17             MSG_comm_destroy(comm);
18             return true;
19         } else
20             return false;
21     }
22
23 }
24
25 communicator::communicator()
26 {
27     const char* hostname = MSG_host_get_name(MSG_host_self());
28     ctrl_mbox = hostname;
29     ctrl_mbox += "_ctrl";
30     ctrl_task = NULL;
31     ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
32
33     data_mbox = hostname;
34     data_mbox += "_data";
35     data_task = NULL;
36     data_comm = MSG_task_irecv(&data_task, get_data_mbox());
37 }
38
39 communicator::~communicator()
40 {
41     // fixme: don't know how to free pending communications
42     // (data_comm, ctrl_comm and sent_comm)
43
44     flush_sent();
45     if (!sent_comm.empty())
46         WARN1("Lost %ld send communications!", (long )sent_comm.size());
47 }
48
49 void communicator::send(const char* dest, message* msg)
50 {
51     double msg_size = sizeof *msg;
52     if (msg->get_type() == message::LOAD)
53         msg_size += msg->get_amount();
54     m_task_t task = MSG_task_create("message", 0.0, msg_size, msg);    
55     sent_comm.push_back(MSG_task_isend(task, dest));
56     flush_sent();
57 }
58
59 bool communicator::recv(message*& msg, m_host_t& from)
60 {
61     msg = NULL;
62
63     if (comm_test_n_destroy(ctrl_comm)) {
64         msg = (message* )MSG_task_get_data(ctrl_task);
65         from = MSG_task_get_source(ctrl_task);
66         MSG_task_destroy(ctrl_task);
67         ctrl_task = NULL;
68         ctrl_comm = MSG_task_irecv(&ctrl_task, get_ctrl_mbox());
69
70     } else if (comm_test_n_destroy(data_comm)) {
71         msg = (message* )MSG_task_get_data(data_task);
72         from = MSG_task_get_source(data_task);
73         MSG_task_destroy(data_task);
74         data_task = NULL;
75         data_comm = MSG_task_irecv(&data_task, get_data_mbox());
76     }
77
78     return msg != NULL;
79 }
80
81 int communicator::send_backlog()
82 {
83     flush_sent();
84     return sent_comm.size();
85 }
86
87 void communicator::flush_sent()
88 {
89     std::remove_if(sent_comm.begin(), sent_comm.end(), comm_test_n_destroy);
90 }
91
92 // Local variables:
93 // mode: c++
94 // End: