From: Fred Suter Date: Mon, 7 Nov 2022 21:25:24 +0000 (-0500) Subject: simplify the I/O stream thing A LOT. Might be plunged in CLM03 afterall X-Git-Tag: v3.34~694 X-Git-Url: http://bilbo.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/6ca5c122412a958bea1a85b6a67248003d0e6564 simplify the I/O stream thing A LOT. Might be plunged in CLM03 afterall --- diff --git a/src/surf/sio_S22.cpp b/src/surf/sio_S22.cpp index aae6f3060a..3d5aa3b95d 100644 --- a/src/surf/sio_S22.cpp +++ b/src/surf/sio_S22.cpp @@ -25,7 +25,7 @@ XBT_LOG_EXTERNAL_CATEGORY(xbt_cfg); ***********/ static simgrid::config::Flag cfg_sio_solver("host/sio_solver", "Set linear equations solver used by sio model", - "fairbottleneck", + "maxmin", &simgrid::kernel::lmm::System::validate_solver); /**************************************/ @@ -34,8 +34,7 @@ XBT_LOG_EXTERNAL_CATEGORY(xbt_cfg); void surf_host_model_init_sio_S22() { XBT_CINFO(xbt_cfg, "Switching to the S22 model to handle streaming I/Os."); - xbt_assert(cfg_sio_solver != "maxmin", "Invalid configuration. Cannot use maxmin solver with streaming I/Os."); - + simgrid::config::set_default("network/crosstraffic", true); auto* system = simgrid::kernel::lmm::System::build(cfg_sio_solver.get(), true /* selective update */); auto host_model = std::make_shared("Host_Sio", system); auto* engine = simgrid::kernel::EngineImpl::get_instance(); @@ -48,43 +47,9 @@ namespace simgrid::kernel::resource { HostS22Model::HostS22Model(const std::string& name, lmm::System* sys) : HostModel(name) { set_maxmin_system(sys); - - auto net_model = std::make_shared("Network_Sio", this, sys); - auto engine = EngineImpl::get_instance(); - engine->add_model(net_model); - engine->get_netzone_root()->set_network_model(net_model); - - auto disk_model = std::make_shared("Disk_Sio", this, sys); - engine->add_model(disk_model); - engine->get_netzone_root()->set_disk_model(disk_model); - + surf_network_model_init_LegrandVelho(); surf_cpu_model_init_Cas01(); -} - -DiskS22Model::DiskS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys) - : DiskModel(name), hostModel_(hmodel) -{ - set_maxmin_system(sys); -} - -DiskS22Model::~DiskS22Model() -{ - set_maxmin_system(nullptr); -} - -NetworkS22Model::NetworkS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys) - : NetworkModel(name), hostModel_(hmodel) -{ - set_maxmin_system(sys); - loopback_.reset(create_link("__loopback__", {simgrid::config::get_value("network/loopback-bw")})); - loopback_->set_sharing_policy(s4u::Link::SharingPolicy::FATPIPE, {}); - loopback_->set_latency(simgrid::config::get_value("network/loopback-lat")); - loopback_->get_iface()->seal(); -} - -NetworkS22Model::~NetworkS22Model() -{ - set_maxmin_system(nullptr); + surf_disk_model_init_S19(); } double HostS22Model::next_occurring_event(double now) @@ -152,54 +117,87 @@ void HostS22Model::update_actions_state(double /*now*/, double delta) } } + +DiskAction* HostS22Model::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, + double size) +{ + return new S22Action(this, src_host, src_disk, dst_host, dst_disk, size); +} + +Action* HostS22Model::execute_thread(const s4u::Host* host, double flops_amount, int thread_count) +{ + /* Create a single action whose cost is thread_count * flops_amount and that requests thread_count cores. */ + return host->get_cpu()->execution_start(thread_count * flops_amount, thread_count, -1); +} + +/********** + * Action * + **********/ +void S22Action::update_bound() const +{ + double bound = std::numeric_limits::max(); + if (src_disk_) + bound = std::min(bound, src_disk_->get_read_bandwidth()); + if (dst_disk_) + bound = std::min(bound, dst_disk_->get_write_bandwidth()); + if (src_host_ != dst_host_) { + double lat = 0.0; + std::vector route; + src_host_->route_to(dst_host_, route, &lat); + if (lat > 0) + bound = std::min(bound,NetworkModel::cfg_tcp_gamma / (2.0 * lat)); + } + + XBT_DEBUG("action (%p) : bound = %g", this, bound); + + /* latency has been paid (or no latency), we can set the appropriate bound for network limit */ + if ((bound < std::numeric_limits::max()) && (latency_ <= 0.0)) + get_model()->get_maxmin_system()->update_variable_bound(get_variable(), bound); + } + S22Action::S22Action(Model* model, s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) - : DiskAction(model, 1, false) + : DiskAction(model, size, false) , src_host_(src_host) , src_disk_(src_disk) , dst_host_(dst_host) , dst_disk_(dst_disk) , size_(size) { - size_t disk_nb = 0; - size_t link_nb = 0; - double latency = 0.0; this->set_last_update(); + size_t disk_nb = 0; if (src_disk_ != nullptr) disk_nb++; if (dst_disk_ != nullptr) disk_nb++; /* there should always be a route between src_host and dst_host (loopback_ for self communication at least) */ - double lat = 0.0; std::vector route; - src_host_->route_to(dst_host_, route, &lat); - latency = std::max(latency, lat); - link_nb = route.size(); + src_host_->route_to(dst_host_, route, &latency_); + size_t link_nb = route.size(); XBT_DEBUG("Creating a stream io (%p) with %zu disk(s) and %zu unique link(s).", this, disk_nb, link_nb); - latency_ = latency; - set_variable(model->get_maxmin_system()->variable_new(this, 1.0, -1.0, disk_nb + link_nb)); + set_variable(model->get_maxmin_system()->variable_new(this, 1.0, -1.0, 3 * disk_nb + link_nb)); if (latency_ > 0) model->get_maxmin_system()->update_variable_penalty(get_variable(), 0.0); /* Expand it for the disks even if there is nothing to read/write, to make sure that it gets expended even if there is no * communication either */ - if (src_disk_ != nullptr) - model->get_maxmin_system()->expand(src_disk_->get_constraint(), get_variable(), size, true); - if (dst_disk_ != nullptr) - model->get_maxmin_system()->expand(dst_disk_->get_constraint(), get_variable(), size, true); - - if (link_nb > 0) { - std::vector route; - src_host_->route_to(dst_host_, route, nullptr); - for (auto const& link : route) - model->get_maxmin_system()->expand(link->get_constraint(), this->get_variable(), size_); + if (src_disk_ != nullptr){ + model->get_maxmin_system()->expand(src_disk_->get_constraint(), get_variable(), 1); + model->get_maxmin_system()->expand(src_disk->get_read_constraint(), get_variable(), 1); } + if (dst_disk_ != nullptr){ + model->get_maxmin_system()->expand(dst_disk_->get_constraint(), get_variable(), 1); + model->get_maxmin_system()->expand(dst_disk_->get_write_constraint(), get_variable(), 1); + } + + for (auto const& link : route) + model->get_maxmin_system()->expand(link->get_constraint(), get_variable(), 1); - if (link_nb + disk_nb == 0) { + if (size <= 0) { this->set_cost(1.0); this->set_remains(0.0); } @@ -207,183 +205,4 @@ S22Action::S22Action(Model* model, s4u::Host* src_host, DiskImpl* src_disk, s4u: /* finally calculate the initial bound value */ update_bound(); } - -DiskAction* HostS22Model::io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, - double size) -{ - return new S22Action(this, src_host, src_disk, dst_host, dst_disk, size); -} - -Action* NetworkS22Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double /*rate*/) -{ - return hostModel_->io_stream(src, nullptr, dst, nullptr, size); -} - -DiskImpl* DiskS22Model::create_disk(const std::string& name, double read_bandwidth, double write_bandwidth) -{ - return (new DiskS22(name, read_bandwidth, write_bandwidth))->set_model(this); -} - -StandardLinkImpl* NetworkS22Model::create_link(const std::string& name, const std::vector& bandwidths) -{ - xbt_assert(bandwidths.size() == 1, "Non WIFI link must have only 1 bandwidth."); - auto link = new LinkS22(name, bandwidths[0], get_maxmin_system()); - link->set_model(this); - return link; -} - -StandardLinkImpl* NetworkS22Model::create_wifi_link(const std::string& name, const std::vector& bandwidths) -{ - THROW_UNIMPLEMENTED; -} - -/************ - * Resource * - ************/ -DiskAction* DiskS22::io_start(sg_size_t size, s4u::Io::OpType type) -{ - DiskAction* res; - switch (type) { - case s4u::Io::OpType::READ: - res = static_cast(get_model())->hostModel_->io_stream(get_host(), this, get_host(), nullptr, size); - break; - case s4u::Io::OpType::WRITE: - res = static_cast(get_model())->hostModel_->io_stream(get_host(), nullptr, get_host(), this, size); - break; - default: - THROW_UNIMPLEMENTED; - } - - return res; -} - -void DiskS22::apply_event(kernel::profile::Event* triggered, double value) -{ - /* Find out which of my iterators was triggered, and react accordingly */ - if (triggered == get_read_event()) { - set_read_bandwidth(value); - unref_read_event(); - } else if (triggered == get_write_event()) { - set_write_bandwidth(value); - unref_write_event(); - } else if (triggered == get_state_event()) { - if (value > 0) - turn_on(); - else - turn_off(); - unref_state_event(); - } else { - xbt_die("Unknown event!\n"); - } -} - -LinkS22::LinkS22(const std::string& name, double bandwidth, lmm::System* system) : StandardLinkImpl(name) -{ - this->set_constraint(system->constraint_new(this, bandwidth)); - bandwidth_.peak = bandwidth; -} - -void LinkS22::apply_event(profile::Event* triggered, double value) -{ - XBT_DEBUG("Updating link %s (%p) with value=%f", get_cname(), this, value); - if (triggered == bandwidth_.event) { - set_bandwidth(value); - tmgr_trace_event_unref(&bandwidth_.event); - - } else if (triggered == latency_.event) { - set_latency(value); - tmgr_trace_event_unref(&latency_.event); - - } else if (triggered == get_state_event()) { - if (value > 0) - turn_on(); - else - turn_off(); - unref_state_event(); - } else { - xbt_die("Unknown event ! \n"); - } -} - -void LinkS22::set_bandwidth(double value) -{ - bandwidth_.peak = value; - StandardLinkImpl::on_bandwidth_change(); - - get_model()->get_maxmin_system()->update_constraint_bound(get_constraint(), bandwidth_.peak * bandwidth_.scale); -} - -void LinkS22::set_latency(double value) -{ - latency_check(value); - const lmm::Element* elem = nullptr; - - latency_.peak = value; - while (const auto* var = get_constraint()->get_variable(&elem)) { - const auto* action = static_cast(var->get_id()); - action->update_bound(); - } -} - -/********** - * Action * - **********/ - -double S22Action::calculate_io_read_bound() const -{ - double io_read_bound = std::numeric_limits::max(); - - if (src_disk_ == nullptr) - return io_read_bound; - - if (size_ > 0) - io_read_bound = std::min(io_read_bound, src_disk_->get_read_bandwidth() / size_); - - return io_read_bound; -} - -double S22Action::calculate_io_write_bound() const -{ - double io_write_bound = std::numeric_limits::max(); - - if (dst_disk_ == nullptr) - return io_write_bound; - - if (size_ > 0) - io_write_bound = std::min(io_write_bound, dst_disk_->get_write_bandwidth() / size_); - - return io_write_bound; -} - -double S22Action::calculate_network_bound() const -{ - double lat = 0.0; - double lat_bound = std::numeric_limits::max(); - - if (src_host_ == dst_host_) - return lat_bound; - - if (size_ <= 0){ - std::vector route; - src_host_->route_to(dst_host_, route, &lat); - } - - if (lat > 0) - lat_bound = NetworkModel::cfg_tcp_gamma / (2.0 * lat); - - return lat_bound; -} - -void S22Action::update_bound() const -{ - double bound = std::min(std::min(calculate_io_read_bound(), calculate_io_write_bound()), - calculate_network_bound()); - - XBT_DEBUG("action (%p) : bound = %g", this, bound); - - /* latency has been paid (or no latency), we can set the appropriate bound for network limit */ - if ((bound < std::numeric_limits::max()) && (latency_ <= 0.0)) - get_model()->get_maxmin_system()->update_variable_bound(get_variable(), bound); - } - } // namespace simgrid::kernel::resource diff --git a/src/surf/sio_S22.hpp b/src/surf/sio_S22.hpp index a8d7cb7311..ef89c17297 100644 --- a/src/surf/sio_S22.hpp +++ b/src/surf/sio_S22.hpp @@ -18,8 +18,6 @@ namespace simgrid::kernel::resource { ***********/ class XBT_PRIVATE HostS22Model; -class XBT_PRIVATE DiskS22Model; -class XBT_PRIVATE NetworkS22Model; class XBT_PRIVATE DiskS22; class XBT_PRIVATE LinkS22; @@ -37,74 +35,13 @@ public: double next_occurring_event(double now) override; void update_actions_state(double now, double delta) override; - Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override { return nullptr; } + Action* execute_thread(const s4u::Host* host, double flops_amount, int thread_count) override; CpuAction* execute_parallel(const std::vector& host_list, const double* flops_amount, const double* bytes_amount, double rate) override { return nullptr; } DiskAction* io_stream(s4u::Host* src_host, DiskImpl* src_disk, s4u::Host* dst_host, DiskImpl* dst_disk, double size) override; }; -class DiskS22Model : public DiskModel { -public: - DiskS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys); - DiskS22Model(const DiskS22Model&) = delete; - DiskS22Model& operator=(const DiskS22Model&) = delete; - ~DiskS22Model() override; - void update_actions_state(double /*now*/, double /*delta*/) override{ - /* this action is done by HostS22Model which shares the LMM system with the Disk model - * Overriding to an empty function here allows us to handle the DiskS22Model as a regular - * method in EngineImpl::presolve */ - }; - - DiskImpl* create_disk(const std::string& name, double read_bandwidth, double write_bandwidth) override; - HostS22Model* hostModel_; -}; - -class NetworkS22Model : public NetworkModel { -public: - NetworkS22Model(const std::string& name, HostS22Model* hmodel, lmm::System* sys); - NetworkS22Model(const NetworkS22Model&) = delete; - NetworkS22Model& operator=(const NetworkS22Model&) = delete; - ~NetworkS22Model() override; - StandardLinkImpl* create_link(const std::string& name, const std::vector& bandwidths) final; - StandardLinkImpl* create_wifi_link(const std::string& name, const std::vector& bandwidths) override; - - Action* communicate(s4u::Host* src, s4u::Host* dst, double size, double rate) override; - void update_actions_state(double /*now*/, double /*delta*/) override{ - /* this action is done by HostS22Model which shares the LMM system with the Network model - * Overriding to an empty function here allows us to handle the NetworkS22Model as a regular - * method in EngineImpl::presolve */ - }; - - HostS22Model* hostModel_; -}; - -/************ - * Resource * - ************/ - -class DiskS22 : public DiskImpl { -public: - using DiskImpl::DiskImpl; - DiskS22(const DiskS22&) = delete; - DiskS22& operator=(const DiskS22&) = delete; - - void apply_event(profile::Event* event, double value) override; - DiskAction* io_start(sg_size_t size, s4u::Io::OpType type) override; -}; - -class LinkS22 : public StandardLinkImpl { -public: - LinkS22(const std::string& name, double bandwidth, lmm::System* system); - LinkS22(const LinkS22&) = delete; - LinkS22& operator=(const LinkS22&) = delete; - ~LinkS22() = default; - - void apply_event(profile::Event* event, double value) override; - void set_bandwidth(double value) override; - void set_latency(double value) override; -}; - /********** * Action * **********/ @@ -117,13 +54,7 @@ class S22Action : public DiskAction { const double size_; - double latency_; - - friend DiskAction* DiskS22::io_start(sg_size_t size, s4u::Io::OpType type); - friend Action* NetworkS22Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate); - - double calculate_io_read_bound() const; - double calculate_io_write_bound() const; + double latency_ = 0; /** * @brief Calculate the network bound for the parallel task diff --git a/teshsuite/s4u/io-stream/io-stream.cpp b/teshsuite/s4u/io-stream/io-stream.cpp index 9207fb94db..98bc2b598e 100644 --- a/teshsuite/s4u/io-stream/io-stream.cpp +++ b/teshsuite/s4u/io-stream/io-stream.cpp @@ -109,7 +109,7 @@ int main(int argc, char** argv) sg4::LinkInRoute link(zone->create_link("link", "2MBps")->set_latency("50us")->seal()); zone->add_route(bob->get_netpoint(), alice->get_netpoint(), nullptr, nullptr, {link}, true); - bob->create_disk("bob_disk", 1e6, 5e5); + bob->create_disk("bob_disk", "1MBps", "500kBps"); alice->create_disk("alice_disk", 4e6, 4e6); zone->seal(); diff --git a/teshsuite/s4u/io-stream/io-stream.tesh b/teshsuite/s4u/io-stream/io-stream.tesh index f2dbda3579..d6b1b6efda 100644 --- a/teshsuite/s4u/io-stream/io-stream.tesh +++ b/teshsuite/s4u/io-stream/io-stream.tesh @@ -5,21 +5,21 @@ $ ${bindir:=.}/io-stream --cfg=host/model:sio_S22 "--log=root.fmt:[%10.6r]%e[%i > [ 0.000000] [0:maestro@] Switching to the S22 model to handle streaming I/Os. > [ 0.000000] [1:streamer@bob] [Bob -> Alice] Store and Forward (1 block) > [ 4.000000] [1:streamer@bob] Read : 4.000000 seconds -> [ 6.000050] [1:streamer@bob] Send : 2.000050 seconds -> [ 7.000050] [1:streamer@bob] Write : 1.000000 seconds -> [ 7.000050] [1:streamer@bob] Total : 7.000050 seconds -> [ 7.000050] [1:streamer@bob] [Bob -> Alice] Store and Forward (100 blocks) -> [ 11.000050] [1:streamer@bob] Total : 4.000000 seconds -> [ 11.000050] [1:streamer@bob] [Bob -> Alice] Streaming (Read bottleneck) -> [ 15.000100] [1:streamer@bob] Total : 4.000050 seconds -> [ 15.000100] [1:streamer@bob] [Bob -> Alice] Streaming (Write bottleneck) -> [ 23.000150] [1:streamer@bob] Total : 8.000050 seconds -> [ 23.000150] [1:streamer@bob] Start two 10-second background traffic between Bob and Alice -> [ 23.000150] [1:streamer@bob] [Bob -> Alice] Streaming (Transfer bottleneck) -> [ 29.000200] [1:streamer@bob] Total : 6.000050 seconds -> [ 45.000200] [1:streamer@bob] [Bob -> Alice] Streaming "from disk to memory" (no write) -> [ 49.000250] [1:streamer@bob] Total : 4.000050 seconds -> [ 49.000250] [1:streamer@bob] [Bob -> Alice] Streaming "from memory to disk" (no read) -> [ 51.000300] [1:streamer@bob] Total : 2.000050 seconds -> [ 51.000300] [1:streamer@bob] [Bob -> Bob] Disk to disk (no transfer) -> [ 59.000300] [1:streamer@bob] Total : 8.000000 seconds \ No newline at end of file +> [ 6.165599] [1:streamer@bob] Send : 2.165599 seconds +> [ 7.165599] [1:streamer@bob] Write : 1.000000 seconds +> [ 7.165599] [1:streamer@bob] Total : 7.165599 seconds +> [ 7.165599] [1:streamer@bob] [Bob -> Alice] Store and Forward (100 blocks) +> [ 11.165518] [1:streamer@bob] Total : 3.999919 seconds +> [ 11.165518] [1:streamer@bob] [Bob -> Alice] Streaming (Read bottleneck) +> [ 15.165568] [1:streamer@bob] Total : 4.000050 seconds +> [ 15.165568] [1:streamer@bob] [Bob -> Alice] Streaming (Write bottleneck) +> [ 27.234930] [1:streamer@bob] Total : 12.069362 seconds +> [ 27.234930] [1:streamer@bob] Start two 10-second background traffic between Bob and Alice +> [ 27.234930] [1:streamer@bob] [Bob -> Alice] Streaming (Transfer bottleneck) +> [ 45.285733] [1:streamer@bob] Total : 18.050803 seconds +> [ 45.285733] [1:streamer@bob] [Bob -> Alice] Streaming "from disk to memory" (no write) +> [ 49.285783] [1:streamer@bob] Total : 4.000050 seconds +> [ 49.285783] [1:streamer@bob] [Bob -> Alice] Streaming "from memory to disk" (no read) +> [ 51.285833] [1:streamer@bob] Total : 2.000050 seconds +> [ 51.285833] [1:streamer@bob] [Bob -> Bob] Disk to disk (no transfer) +> [ 59.285833] [1:streamer@bob] Total : 8.000000 seconds \ No newline at end of file