Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Stop using comm-waitany in a test which contains NetworkFailures
author Martin Quinson <martin.quinson@ens-rennes.fr>
Tue, 25 Jul 2023 18:07:15 +0000 (20:07 +0200)
committer Martin Quinson <martin.quinson@ens-rennes.fr>
Tue, 25 Jul 2023 18:07:15 +0000 (20:07 +0200)
examples/cpp/comm-failure/s4u-comm-failure.cpp
examples/python/CMakeLists.txt
examples/python/comm-waitany/comm-waitany.py [deleted file]
examples/python/comm-waitany/comm-waitany.tesh [deleted file]
include/simgrid/s4u/ActivitySet.hpp
src/s4u/s4u_ActivitySet.cpp

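diff --git a/examples/cpp/comm-failure/s4u-comm-failure.cpp b/examples/cpp/comm-failure/s4u-comm-failure.cpp
index 0ce5522..33c7729 100644 (file)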
@@ -33,12 +33,12 @@ public:
     auto comm2 = mailbox2->put_async((void*)666, 2);
 
     XBT_INFO("Calling wait_any..");
-    std::vector<sg4::CommPtr> pending_comms;
-    pending_comms.push_back(comm1);
-    pending_comms.push_back(comm2);
+    sg4::ActivitySet pending_comms;
+    pending_comms.push(comm1);
+    pending_comms.push(comm2);
     try {
-      long index = sg4::Comm::wait_any(pending_comms);
-      XBT_INFO("Wait any returned index %ld (comm to %s)", index, pending_comms.at(index)->get_mailbox()->get_cname());
+      auto* acti = pending_comms.wait_any().get();
+      XBT_INFO("Wait any returned comm to %s", dynamic_cast<sg4::Comm*>(acti)->get_mailbox()->get_cname());
     } catch (const simgrid::NetworkFailureException&) {
       XBT_INFO("Sender has experienced a network failure exception, so it knows that something went wrong");
       XBT_INFO("Now it needs to figure out which of the two comms failed by looking at their state:");
@@ -52,8 +52,7 @@ public:
       XBT_INFO("Waiting on a FAILED comm raises an exception: '%s'", e.what());
     }
     XBT_INFO("Wait for remaining comm, just to be nice");
-    pending_comms.erase(pending_comms.begin());
-    sg4::Comm::wait_any(pending_comms);
+    pending_comms.wait_all();
   }
 };
 
diff --git a/examples/python/CMakeLists.txt b/examples/python/CMakeLists.txt
index be8db81..3705f47 100644 (file)
@@ -1,7 +1,7 @@
 foreach(example actor-create actor-daemon actor-join actor-kill actor-migrate actor-suspend actor-yield actor-lifetime
         activityset-testany activityset-waitall activityset-waitallfor activityset-waitany
         app-masterworkers
-        comm-wait comm-waitany comm-failure comm-host2host comm-pingpong
+        comm-wait comm-failure comm-host2host comm-pingpong
         comm-ready comm-suspend comm-throttling comm-waituntil
         exec-async exec-basic exec-dvfs exec-remote exec-ptask
         task-io task-simple task-switch-host task-variable-load
diff --git a/examples/python/comm-waitany/comm-waitany.py b/examples/python/comm-waitany/comm-waitany.py
deleted file mode 100644 (file)
index 76dd8ce..0000000
+++ /dev/null
@@ -1,87 +0,0 @@
-# Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the license (GNU LGPL) which comes with this package.
-
-"""
-This example shows how to block on the completion of a set of communications.
-
-As for the other asynchronous examples, the sender initiate all the messages it wants to send and
-pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
-
-The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
-will notice events as soon as they occur even if it does not follow the order of the container.
-
-Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
-other messages of this application.  As expected, the trace shows that the finalize of worker 1 is
-processed before 'Message 5' that is sent to worker 0.
-"""
-
-import sys
-from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
-
-
-def sender(messages_count, msg_size, receivers_count):
-    # List in which we store all ongoing communications
-    pending_comms = []
-
-    # Vector of the used mailboxes
-    mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
-              for i in range(0, receivers_count)]
-
-    # Start dispatching all messages to receivers, in a round robin fashion
-    for i in range(0, messages_count):
-        content = "Message {:d}".format(i)
-        mbox = mboxes[i % receivers_count]
-
-        this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
-        # Create a communication representing the ongoing communication, and store it in pending_comms
-        comm = mbox.put_async(content, msg_size)
-        pending_comms.append(comm)
-
-    # Start sending messages to let the workers know that they should stop
-    for i in range(0, receivers_count):
-        mbox = mboxes[i]
-        this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
-        comm = mbox.put_async("finalize", 0)
-        pending_comms.append(comm)
-
-    this_actor.info("Done dispatching all messages")
-
-    # Now that all message exchanges were initiated, wait for their completion, in order of completion.
-    #
-    # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
-    # terminated.
-    # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
-    while pending_comms:
-        changed_pos = Comm.wait_any(pending_comms)
-        del pending_comms[changed_pos]
-        if changed_pos != 0:
-            this_actor.info(
-                "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first."
-                .format(changed_pos))
-
-    this_actor.info("Goodbye now!")
-
-
-def receiver(my_id):
-    mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
-    this_actor.info("Wait for my first message")
-    while True:
-        received = mbox.get()
-        this_actor.info("I got a '{:s}'.".format(received))
-        if received == "finalize":
-            break  # If it's a finalize message, we're done.
-
-if __name__ == '__main__':
-    e = Engine(sys.argv)
-
-    # Load the platform description
-    e.load_platform(sys.argv[1])
-
-    Actor.create("sender", Host.by_name("Tremblay"), sender, 6, 1000000, 2)
-    Actor.create("receiver", Host.by_name("Fafard"), receiver, 0)
-    Actor.create("receiver", Host.by_name("Jupiter"), receiver, 1)
-
-    e.run()
diff --git a/examples/python/comm-waitany/comm-waitany.tesh b/examples/python/comm-waitany/comm-waitany.tesh
deleted file mode 100644 (file)
index d4593c1..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env tesh
-
-p Testing Comm.wait_any()
-
-! output sort 19
-$ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${bindir:=.}/comm-waitany.py ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
-> [  0.000000] (1:sender@Tremblay) Send 'Message 0' to 'Mailbox(receiver-0)'
-> [  0.000000] (2:receiver@Fafard) Wait for my first message
-> [  0.000000] (3:receiver@Jupiter) Wait for my first message
-> [  0.000000] (1:sender@Tremblay) Send 'Message 1' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 2' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 3' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 4' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'Message 5' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-0)'
-> [  0.000000] (1:sender@Tremblay) Send 'finalize' to 'Mailbox(receiver-1)'
-> [  0.000000] (1:sender@Tremblay) Done dispatching all messages
-> [  0.158397] (2:receiver@Fafard) I got a 'Message 0'.
-> [  0.169155] (3:receiver@Jupiter) I got a 'Message 1'.
-> [  0.316794] (2:receiver@Fafard) I got a 'Message 2'.
-> [  0.338309] (3:receiver@Jupiter) I got a 'Message 3'.
-> [  0.475190] (2:receiver@Fafard) I got a 'Message 4'.
-> [  0.500898] (2:receiver@Fafard) I got a 'finalize'.
-> [  0.500898] (1:sender@Tremblay) Remove the 1th pending comm: it terminated earlier than another comm that was initiated first.
-> [  0.507464] (3:receiver@Jupiter) I got a 'Message 5'.
-> [  0.526478] (3:receiver@Jupiter) I got a 'finalize'.
-> [  0.526478] (1:sender@Tremblay) Goodbye now!
diff --git a/include/simgrid/s4u/ActivitySet.hpp b/include/simgrid/s4u/ActivitySet.hpp
index b23ea6d..49c79d6 100644 (file)
@@ -27,6 +27,8 @@ class XBT_PUBLIC ActivitySet : public xbt::Extendable<ActivitySet> {
   std::vector<ActivityPtr> activities_; // Use vectors, not sets for better reproductibility accross architectures
   std::vector<ActivityPtr> failed_activities_;
 
+  void handle_failed_activities();
+
 public:
   ActivitySet()  = default;
   ActivitySet(const std::vector<ActivityPtr> init) : activities_(init) {}
diff --git a/src/s4u/s4u_ActivitySet.cpp b/src/s4u/s4u_ActivitySet.cpp
index 8ba545a..215f126 100644 (file)
@@ -64,6 +64,21 @@ ActivityPtr ActivitySet::test_any()
   return ret;
 }
 
+void ActivitySet::handle_failed_activities()
+{
+  // Move every FAILED activity from activities_ to failed_activities_, marking it as completed with state FAILED.
+  for (size_t i = 0; i < activities_.size();) {
+    auto act = activities_[i]; // own a reference: slot i may be overwritten below
+    if (act->pimpl_->get_state() == kernel::activity::State::FAILED) {
+      act->complete(Activity::State::FAILED);
+      failed_activities_.push_back(act);
+      activities_[i] = activities_.back(); // swap-remove: O(1) but reorders activities_; slot i is re-examined
+      activities_.pop_back();
+    } else
+      i++; // advance only when slot i was kept
+  }
+}
+
 ActivityPtr ActivitySet::wait_any_for(double timeout)
 {
   std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
@@ -72,19 +87,30 @@ ActivityPtr ActivitySet::wait_any_for(double timeout)
 
   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
   kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
-  ssize_t changed_pos = kernel::actor::simcall_blocking(
-      [&observer] {
-        kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
-                                                     observer.get_timeout());
-      },
-      &observer);
-  if (changed_pos == -1)
-    throw TimeoutException(XBT_THROW_POINT, "Timeouted");
-
-  auto ret = activities_.at(changed_pos);
-  erase(ret);
-  ret->complete(Activity::State::FINISHED);
-  return ret;
+  try {
+    ssize_t changed_pos = kernel::actor::simcall_blocking(
+        [&observer] {
+          kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+                                                       observer.get_timeout());
+        },
+        &observer);
+    if (changed_pos == -1)
+      throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+
+    auto ret = activities_.at(changed_pos);
+    erase(ret);
+    ret->complete(Activity::State::FINISHED);
+    return ret;
+  } catch (const HostFailureException& e) {
+    handle_failed_activities();
+    throw e;
+  } catch (const NetworkFailureException& e) {
+    handle_failed_activities();
+    throw e;
+  } catch (const StorageFailureException& e) {
+    handle_failed_activities();
+    throw e;
+  }
 }
 
 ActivityPtr ActivitySet::get_failed_activity()