Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add ExtensionSetCalculator files
[simgrid.git] / src / mc / remote / Channel.cpp
1 /* Copyright (c) 2015-2023. The SimGrid Team.  All rights reserved.         */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/mc/remote/Channel.hpp"
7 #include <xbt/log.h>
8
9 #include <cerrno>
10 #include <cstring>
11 #include <sys/socket.h>
12 #include <sys/types.h>
13 #include <unistd.h>
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_channel, mc, "MC interprocess communication");
16
17 namespace simgrid::mc {
18 Channel::Channel(int sock, Channel const& other) : socket_(sock)
19 {
20   if (not other.buffer_.empty()) {
21     ssize_t size = other.buffer_.size();
22     XBT_DEBUG("Adopt %d bytes buffered by father channel.", (int)size);
23     buffer_.resize(size);
24     memcpy(buffer_.data(), other.buffer_.data(), size);
25   }
26 }
27
28 Channel::~Channel()
29 {
30   if (this->socket_ >= 0)
31     close(this->socket_);
32 }
33
34 /** @brief Send a message; returns 0 on success or errno on failure */
35 int Channel::send(const void* message, size_t size) const
36 {
37   if (size >= sizeof(int) && is_valid_MessageType(*static_cast<const int*>(message))) {
38     XBT_DEBUG("Sending %s (%zu bytes sent)", to_c_str(*static_cast<const MessageType*>(message)), size);
39   } else {
40     XBT_DEBUG("Sending bytes directly (from address %p) (%zu bytes sent)", message, size);
41     if (size == 0)
42       XBT_WARN("Request to send a 0-sized message! Proceeding anyway.");
43   }
44
45   while (::send(this->socket_, message, size, 0) == -1) {
46     if (errno != EINTR) {
47       XBT_ERROR("Channel::send failure: %s", strerror(errno));
48       return errno;
49     }
50   }
51   return 0;
52 }
53
54 ssize_t Channel::receive(const void* message, size_t size, int flags)
55 {
56   size_t bufsize = buffer_.size();
57   ssize_t copied = 0;
58   void* whereto  = const_cast<void*>(message);
59   size_t todo    = size;
60   if (bufsize > 0) {
61     XBT_DEBUG("%d %zu bytes (of %zu expected) are already in buffer", getpid(), bufsize, size);
62     if (bufsize >= size) {
63       copied = size;
64       memcpy(whereto, buffer_.data(), size);
65       memcpy(static_cast<void*>(buffer_.data()), buffer_.data() + size, bufsize - size);
66       buffer_.resize(bufsize - size);
67       todo = 0;
68     } else {
69       copied = bufsize;
70       memcpy(whereto, buffer_.data(), bufsize);
71       buffer_.clear();
72       todo -= bufsize;
73       whereto = static_cast<char*>(whereto) + bufsize;
74     }
75   }
76   ssize_t res = 0;
77   if (todo > 0) {
78     errno = 0;
79     res   = recv(this->socket_, whereto, todo, flags);
80     xbt_assert(res != -1 || errno == EAGAIN, "Channel::receive failure: %s", strerror(errno));
81     if (res == -1) {
82       res = 0;
83     }
84   }
85   XBT_DEBUG("%d Wanted %d; Got %d from buffer and %d from network", getpid(), (int)size, (int)copied, (int)res);
86   res += copied;
87   if (static_cast<size_t>(res) >= sizeof(int) && is_valid_MessageType(*static_cast<const int*>(message))) {
88     XBT_DEBUG("%d Receive %s (requested %zu; received %zd at %p)", getpid(),
89               to_c_str(*static_cast<const MessageType*>(message)), size, res, message);
90   } else {
91     XBT_DEBUG("Receive %zd bytes", res);
92   }
93   return res;
94 }
95
96 void Channel::reinject(const char* data, size_t size)
97 {
98   xbt_assert(size > 0, "Cannot reinject less than one char (size: %lu)", size);
99   auto prev_size = buffer_.size();
100   XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, prev_size);
101   buffer_.resize(prev_size + size);
102   memcpy(buffer_.data() + prev_size, data, size);
103 }
104 } // namespace simgrid::mc