From: SUTER Frederic Date: Wed, 19 May 2021 16:27:18 +0000 (+0200) Subject: introduce a new plugin: stock implementation of a Producer-Consumer with a monitor X-Git-Tag: v3.28~272 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/a02aa042d09050fb82401d2e6902e70cb8147956 introduce a new plugin: stock implementation of a Producer-Consumer with a monitor --- diff --git a/.gitignore b/.gitignore index ac8438af3b..d57fe08b2d 100644 --- a/.gitignore +++ b/.gitignore @@ -219,6 +219,7 @@ examples/cpp/platform-profile/s4u-platform-profile examples/cpp/platform-properties/s4u-platform-properties examples/cpp/plugin-host-load/s4u-plugin-host-load examples/cpp/plugin-link-load/s4u-plugin-link-load +examples/cpp/plugin-prodcons/s4u-plugin-prodcons examples/cpp/replay-comm/s4u-replay-comm examples/cpp/replay-io/s4u-replay-io examples/cpp/routing-get-clusters/s4u-routing-get-clusters diff --git a/ChangeLog b/ChangeLog index 194b7e2c28..41eea7478e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -2,6 +2,11 @@ SimGrid (3.27.1) NOT RELEASED YET (v3.28 expected June 21. 2021, 03:32 UTC) +New features: + - New plugin: Producer-Consumer with monitor. To use it, just include the + include/simgrid/plugins/ProducerConsumer.hpp header. See the + associated example (examples/cpp/plugin-prodcons). + S4U: - Fixed a bug where Activity::wait_for() killed the activity on timeout. Explicitly cancel the activity to get back to previous behavior.
diff --git a/MANIFEST.in b/MANIFEST.in index 5225d57808..8416dc1266 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -299,6 +299,8 @@ include examples/cpp/plugin-host-load/s4u-plugin-host-load.cpp include examples/cpp/plugin-host-load/s4u-plugin-host-load.tesh include examples/cpp/plugin-link-load/s4u-plugin-link-load.cpp include examples/cpp/plugin-link-load/s4u-plugin-link-load.tesh +include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp +include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh include examples/cpp/replay-comm/s4u-replay-comm-split-p0.txt include examples/cpp/replay-comm/s4u-replay-comm-split-p1.txt include examples/cpp/replay-comm/s4u-replay-comm-split_d.xml @@ -1994,6 +1996,7 @@ include include/simgrid/mailbox.h include include/simgrid/modelchecker.h include include/simgrid/msg.h include include/simgrid/mutex.h +include include/simgrid/plugins/ProducerConsumer.hpp include include/simgrid/plugins/dvfs.h include include/simgrid/plugins/energy.h include include/simgrid/plugins/file_system.h diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index ad691e103f..1c58818a7a 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -76,7 +76,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill network-wifi io-async io-file-system io-file-remote io-disk-raw io-dependent platform-failures platform-profile platform-properties - plugin-host-load plugin-link-load + plugin-host-load plugin-link-load plugin-prodcons replay-comm replay-io routing-get-clusters synchro-barrier synchro-condition-variable synchro-condition-variable-waituntil synchro-mutex synchro-semaphore diff --git a/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp b/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp new file mode 100644 index 0000000000..0faa27d064 --- /dev/null +++ b/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp @@ -0,0 +1,92 @@ +/* Copyright (c) 2007-2021. The SimGrid Team. 
All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include +#include +#include +#include + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example"); + +namespace sg4 = simgrid::s4u; + +static void ingester(int id, simgrid::plugin::ProducerConsumerPtr pc) +{ + sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1)); + for (int i = 0; i < 3; i++) { + int* data = new int(10 * id + i); + pc->put(data, 1.2125e6); // last for 0.01s + XBT_INFO("data sucessfully put: %d", *data); + sg4::this_actor::sleep_for((3 - i) * simgrid::xbt::random::uniform_real(0, 1)); + } + + for (int i = 0; i < 3; i++) { + int* data = new int(10 * id + i); + pc->put_async(data, 1.2125e6); // last for 0.01s + XBT_INFO("data sucessfully put: %d", *data); + sg4::this_actor::sleep_for((i + 3) * simgrid::xbt::random::uniform_real(0, 1)); + } +} + +static void retriever(int id, simgrid::plugin::ProducerConsumerPtr pc) +{ + sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1)); + for (int i = 0; i < 3; i++) { + int* data; + sg4::CommPtr comm = pc->get_async(&data); + comm->wait(); + XBT_INFO("data sucessfully get: %d", *data); + delete data; + sg4::this_actor::sleep_for((i + 3) * simgrid::xbt::random::uniform_real(0, 1)); + } + + for (int i = 0; i < 3; i++) { + int* data = pc->get(); + XBT_INFO("data sucessfully get: %d", *data); + delete data; + sg4::this_actor::sleep_for((3 - i) * simgrid::xbt::random::uniform_real(0, 1)); + } +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + + // Platform creation + auto* cluster = sg4::create_star_zone("cluster"); + for (int i = 0; i < 8; i++) { + std::string hostname = std::string("node-") + std::to_string(i) + ".simgrid.org"; + + const auto* host = cluster->create_host(hostname, "1Gf"); + + std::string linkname = std::string("cluster") + "_link_" 
+ std::to_string(i); + auto* link_up = cluster->create_link(linkname + "_UP", "1Gbps"); + auto* link_down = cluster->create_link(linkname + "_DOWN", "1Gbps"); + + cluster->add_route(host->get_netpoint(), nullptr, nullptr, nullptr, std::vector{link_up}, false); + cluster->add_route(nullptr, host->get_netpoint(), nullptr, nullptr, std::vector{link_down}, false); + } + + auto* router = cluster->create_router("cluster_router"); + cluster->add_route(router, nullptr, nullptr, nullptr, {}); + + simgrid::plugin::ProducerConsumerPtr pc = simgrid::plugin::ProducerConsumer::create(2); + + XBT_INFO("Maximum number of queued data is %u", pc->get_max_queue_size()); + XBT_INFO("Transfers are done in %s mode", pc->get_transfer_mode().c_str()); + + for (int i = 0; i < 3; i++) { + std::string hostname = std::string("node-") + std::to_string(i) + ".simgrid.org"; + sg4::Actor::create("ingester-" + std::to_string(i), sg4::Host::by_name(hostname), &ingester, i, pc); + + hostname = std::string("node-") + std::to_string(i + 3) + ".simgrid.org"; + sg4::Actor::create("retriever-" + std::to_string(i), sg4::Host::by_name(hostname), &retriever, i, pc); + } + + e.run(); + + return 0; +} diff --git a/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh b/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh new file mode 100644 index 0000000000..aea58a2b90 --- /dev/null +++ b/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh @@ -0,0 +1,43 @@ +#!/usr/bin/env tesh + +p This tests the ProducerConsumer plugin + +$ ${bindir:=.}/s4u-plugin-prodcons "--log=root.fmt:[%5.3r]%e[%11a]%e%m%n" +> [0.000] [ maestro] Maximum number of queued data is 2 +> [0.000] [ maestro] Transfers are done in mailbox mode +> [0.145] [ ingester-2] data sucessfully put: 20 +> [0.145] [retriever-0] data sucessfully get: 20 +> [0.825] [ ingester-0] data sucessfully put: 0 +> [0.825] [retriever-0] data sucessfully get: 0 +> [0.916] [ ingester-1] data sucessfully put: 10 +> [0.916] [retriever-1] data sucessfully get: 10 +> 
[1.218] [ ingester-1] data sucessfully put: 11 +> [1.218] [retriever-2] data sucessfully get: 11 +> [1.794] [ ingester-1] data sucessfully put: 12 +> [1.794] [retriever-2] data sucessfully get: 12 +> [2.340] [ ingester-1] data sucessfully put: 10 +> [2.350] [retriever-0] data sucessfully get: 10 +> [2.732] [ ingester-0] data sucessfully put: 1 +> [2.732] [retriever-1] data sucessfully get: 1 +> [5.765] [ ingester-1] data sucessfully put: 11 +> [5.775] [ ingester-2] data sucessfully put: 21 +> [5.775] [retriever-2] data sucessfully get: 21 +> [6.603] [ ingester-1] data sucessfully put: 12 +> [6.613] [ ingester-0] data sucessfully put: 2 +> [6.613] [retriever-1] data sucessfully get: 2 +> [7.172] [retriever-1] data sucessfully get: 11 +> [7.343] [retriever-0] data sucessfully get: 12 +> [7.570] [ ingester-0] data sucessfully put: 0 +> [8.638] [ ingester-2] data sucessfully put: 22 +> [8.638] [retriever-1] data sucessfully get: 22 +> [8.932] [retriever-1] data sucessfully get: 0 +> [8.935] [ ingester-2] data sucessfully put: 20 +> [9.747] [retriever-0] data sucessfully get: 20 +> [9.971] [ ingester-0] data sucessfully put: 1 +> [9.982] [retriever-0] data sucessfully get: 1 +> [10.200] [ ingester-2] data sucessfully put: 21 +> [10.638] [retriever-2] data sucessfully get: 21 +> [13.369] [ ingester-2] data sucessfully put: 22 +> [13.379] [retriever-2] data sucessfully get: 22 +> [13.634] [ ingester-0] data sucessfully put: 2 +> [14.396] [retriever-2] data sucessfully get: 2 diff --git a/include/simgrid/plugins/ProducerConsumer.hpp b/include/simgrid/plugins/ProducerConsumer.hpp new file mode 100644 index 0000000000..4297a0e8af --- /dev/null +++ b/include/simgrid/plugins/ProducerConsumer.hpp @@ -0,0 +1,215 @@ +/* Copyright (c) 2021. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +#ifndef SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP +#define SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +XBT_LOG_NEW_CATEGORY(producer_consumer, "Producer-Consumer plugin logging category"); + +/** Stock implementation of a generic monitored queue to solve the producer-consumer problem */ + +namespace simgrid { +namespace plugin { + +template class ProducerConsumer; +template using ProducerConsumerPtr = boost::intrusive_ptr>; + +static unsigned long pc_id = 0; + +template class ProducerConsumer { +public: + /** This ProducerConsumer plugin can use two different transfer modes: + * - TransferMode::MAILBOX: this mode induces a s4u::Comm between the actors doing the calls to put() and get(). + * If these actors are on the same host, this communication goes through the host's loopback and can thus be + * seen as a memory copy. Otherwise, data goes over the network. + * - TransferMode::QUEUE: data is internally stored in a std::queue. Putting and getting data to and from this + * data structure has zero cost in terms of simulated time. + * Both modes guarantee that the data is consumed in the order it has been produced. However, when data goes + * through the network, s4u::Comm are started in the right order, but may complete in a different order depending + * on the characteristics of the different interconnections between host pairs.
+ */ + enum class TransferMode { MAILBOX = 0, QUEUE }; + +private: + std::string id; + + /* Implementation of a Monitor to handle the data exchanges */ + s4u::MutexPtr mutex_; + s4u::ConditionVariablePtr can_put_; + s4u::ConditionVariablePtr can_get_; + + /* data containers for each of the transfer modes */ + s4u::Mailbox* mbox_ = nullptr; + std::queue queue_; + + unsigned int max_queue_size_ = 1; + TransferMode tmode_ = TransferMode::MAILBOX; + + /* Refcounting management */ + std::atomic_int_fast32_t refcount_{0}; + friend void intrusive_ptr_add_ref(ProducerConsumer* pc) { pc->refcount_.fetch_add(1, std::memory_order_acq_rel); } + + friend void intrusive_ptr_release(ProducerConsumer* pc) + { + if (pc->refcount_.fetch_sub(1, std::memory_order_release) == 1) { + std::atomic_thread_fence(std::memory_order_acquire); + delete pc; + } + } + + ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size) + { + xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed"); + + id = std::string("ProducerConsumer") + std::to_string(pc_id); + pc_id++; + + mutex_ = s4u::Mutex::create(); + can_put_ = s4u::ConditionVariable::create(); + can_get_ = s4u::ConditionVariable::create(); + + if (tmode_ == TransferMode::MAILBOX) + mbox_ = s4u::Mailbox::by_name(id); + } + ~ProducerConsumer() = default; + +public: + /** Creation of the monitored queue. Its size can be bounded by passing a strictly positive value to 'max_queue_size' + * as parameter. Calling 'create()' means that the queue size is (virtually) infinite. + */ + static ProducerConsumerPtr create(unsigned int max_queue_size = UINT_MAX) + { + return ProducerConsumerPtr(new ProducerConsumer(max_queue_size)); + } + + /** This method is intended more to set the maximum queue size in a fluent way than changing the size during the + * utilization of the ProducerConsumer. Hence, the modification occurs in a critical section to prevent + * inconsistencies. 
+ */ + ProducerConsumer* set_max_queue_size(unsigned int max_queue_size) + { + std::unique_lock lock(*mutex_); + max_queue_size_ = max_queue_size; + return this; + } + + unsigned int get_max_queue_size() { return max_queue_size_; } + + /** The underlying data container (and transfer mode) can only be modified when the queue is empty.*/ + ProducerConsumer* set_transfer_mode(TransferMode new_mode) + { + if (tmode_ == new_mode) /* No change, do nothing */ + return this; + + xbt_assert(empty(), "cannot change transfer mode when some data is in queue"); + if (new_mode == TransferMode::MAILBOX) { + mbox_ = s4u::Mailbox::by_name(id); + } else { + mbox_ = nullptr; + } + tmode_ = new_mode; + return this; + } + std::string get_transfer_mode() { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; } + + /** Container-agnostic size() method */ + unsigned int size() { return tmode_ == TransferMode::MAILBOX ? mbox_->size() : queue_.size(); } + + /** Container-agnostic empty() method */ + bool empty() { return tmode_ == TransferMode::MAILBOX ? mbox_->empty() : queue_.empty(); } + + /** Asynchronous put() of a data item of a given size + * - TransferMode::MAILBOX: if put_async is called directly from user code, it can be considered to be done in a + * fire-and-forget mode. No need to save the s4u::CommPtr. + * - TransferMode::QUEUE: the data is simply pushed into the queue. + */ + s4u::CommPtr put_async(T* data, size_t simulated_size_in_bytes) + { + std::unique_lock lock(*mutex_); + s4u::CommPtr comm = nullptr; + XBT_CVERB(producer_consumer, (size() < max_queue_size_) ? 
"can put" : "must wait"); + + while (size() >= max_queue_size_) + can_put_->wait(lock); + if (tmode_ == TransferMode::MAILBOX) { + comm = mbox_->put_async(data, simulated_size_in_bytes); + } else + queue_.push(data); + can_get_->notify_all(); + return comm; + } + + /** Synchronous put() of a data item of a given size + * - TransferMode::MAILBOX: the caller must wait for the induced communication with the getter of the data to be + * complete to continue with its execution. This wait is done outside of the monitor to prevent serialization. + * - TransferMode::QUEUE: the behavior is exactly the same as put_async: data is simply pushed into the queue. + */ + void put(T* data, size_t simulated_size_in_bytes) + { + s4u::CommPtr comm = put_async(data, simulated_size_in_bytes); + if (comm) { + XBT_CDEBUG(producer_consumer, "Waiting for the data to be consumed"); + comm->wait(); + } + } + + /** Asynchronous get() of a 'data' + * - TransferMode::MAILBOX: the caller is returned a s4u::CommPtr onto which it can wait when the data is really + * needed. + * - TransferMode::QUEUE: the data is simply popped from the queue and directly available. Better to call get() in + * this transfer mode. + */ + s4u::CommPtr get_async(T** data) + { + std::unique_lock lock(*mutex_); + s4u::CommPtr comm = nullptr; + XBT_CVERB(producer_consumer, empty() ? "must wait" : "can get"); + while (empty()) + can_get_->wait(lock); + if (tmode_ == TransferMode::MAILBOX) + comm = mbox_->get_async(data); + else { + *data = queue_.front(); + queue_.pop(); + } + can_put_->notify_all(); + + return comm; + } + + /** Synchronous get() of a 'data' + * - TransferMode::MAILBOX: the caller waits (outside the monitor to prevent serialization) for the induced + * communication to be complete to continue with its execution. + * - TransferMode::QUEUE: the behavior is exactly the same as get_async: data is simply popped from the queue and + * directly available to the caller. 
+ */ + T* get() + { + T* data; + s4u::CommPtr comm = get_async(&data); + if (comm) { + XBT_CDEBUG(producer_consumer, "Waiting for the data to arrive"); + comm->wait(); + } + XBT_CDEBUG(producer_consumer, "data is available"); + return data; + } +}; + +} // namespace plugin +} // namespace simgrid + +#endif // SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index f36808b192..d756d5887a 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -680,6 +680,7 @@ set(headers_to_install include/simgrid/plugins/file_system.h include/simgrid/plugins/live_migration.h include/simgrid/plugins/load.h + include/simgrid/plugins/ProducerConsumer.hpp include/simgrid/smpi/smpi_replay.hpp include/simgrid/instr.h include/simgrid/mailbox.h