X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/149c63f36e15b8500b1e826bda5138318ff7ba2b..365a83d3a1407923acccab758f9225e11408b5c6:/src/mc/remote/Channel.cpp diff --git a/src/mc/remote/Channel.cpp b/src/mc/remote/Channel.cpp index 782a563184..7e3bfb9902 100644 --- a/src/mc/remote/Channel.cpp +++ b/src/mc/remote/Channel.cpp @@ -1,5 +1,4 @@ -/* Copyright (c) 2015-2020. The SimGrid Team. - * All rights reserved. */ +/* Copyright (c) 2015-2023. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -7,15 +6,20 @@ #include "src/mc/remote/Channel.hpp" #include +#include #include -#include +#include #include #include +#include -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_Channel, mc, "MC interprocess communication"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_channel, mc, "MC interprocess communication"); -namespace simgrid { -namespace mc { +namespace simgrid::mc { +Channel::Channel(int sock, Channel const& other) : socket_(sock), buffer_(other.buffer_) +{ + XBT_DEBUG("Adopt %zu bytes buffered by father channel.", buffer_.size()); +} Channel::~Channel() { @@ -26,20 +30,61 @@ Channel::~Channel() /** @brief Send a message; returns 0 on success or errno on failure */ int Channel::send(const void* message, size_t size) const { - XBT_DEBUG("Send %s", MC_message_type_name(*(e_mc_message_type*)message)); + if (size >= sizeof(int) && is_valid_MessageType(*static_cast(message))) { + XBT_DEBUG("Sending %s (%zu bytes sent)", to_c_str(*static_cast(message)), size); + } else { + XBT_DEBUG("Sending bytes directly (from address %p) (%zu bytes sent)", message, size); + if (size == 0) + XBT_WARN("Request to send a 0-sized message! Proceeding anyway."); + } + while (::send(this->socket_, message, size, 0) == -1) { - if (errno != EINTR) + if (errno != EINTR) { + XBT_ERROR("Channel::send failure: %s", strerror(errno)); return errno; + } } return 0; } -ssize_t Channel::receive(void* message, size_t size, bool block) const +ssize_t Channel::receive(void* message, size_t size, int flags) { - ssize_t res = recv(this->socket_, message, size, block ? 0 : MSG_DONTWAIT); - if (res != -1) - XBT_DEBUG("Receive %s", MC_message_type_name(*(e_mc_message_type*)message)); + size_t bufsize = buffer_.size(); + ssize_t copied = 0; + auto* whereto = static_cast(message); + size_t todo = size; + if (bufsize > 0) { + XBT_DEBUG("%d %zu bytes (of %zu expected) are already in buffer", getpid(), bufsize, size); + copied = std::min(size, bufsize); + std::copy_n(begin(buffer_), copied, whereto); + buffer_.erase(begin(buffer_), begin(buffer_) + copied); + todo -= copied; + whereto += copied; + } + ssize_t res = 0; + if (todo > 0) { + errno = 0; + res = recv(this->socket_, whereto, todo, flags); + xbt_assert(res != -1 || errno == EAGAIN, "Channel::receive failure: %s", strerror(errno)); + if (res == -1) { + res = 0; + } + } + XBT_DEBUG("%d Wanted %zu; Got %zd from buffer and %zd from network", getpid(), size, copied, res); + res += copied; + if (static_cast(res) >= sizeof(int) && is_valid_MessageType(*static_cast(message))) { + XBT_DEBUG("%d Receive %s (requested %zu; received %zd at %p)", getpid(), + to_c_str(*static_cast(message)), size, res, message); + } else { + XBT_DEBUG("Receive %zd bytes", res); + } return res; } + +void Channel::reinject(const char* data, size_t size) +{ + xbt_assert(size > 0, "Cannot reinject less than one char (size: %lu)", size); + XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, buffer_.size()); + buffer_.insert(end(buffer_), data, data + size); } -} +} // namespace simgrid::mc