Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
use ActivitySet when we call wait_any in examples
author Fred Suter <suterf@ornl.gov>
Mon, 17 Jul 2023 19:47:48 +0000 (15:47 -0400)
committer Fred Suter <suterf@ornl.gov>
Mon, 17 Jul 2023 19:47:48 +0000 (15:47 -0400)
examples/cpp/app-chainsend/s4u-app-chainsend.cpp
examples/cpp/comm-failure/s4u-comm-failure.cpp
examples/cpp/exec-dependent/s4u-exec-dependent.cpp
examples/cpp/io-dependent/s4u-io-dependent.cpp
examples/cpp/network-nonlinear/s4u-network-nonlinear.cpp
examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp

index 2ba057c..bd7e25d 100644 (file)
@@ -34,8 +34,8 @@ public:
   sg4::Mailbox* prev = nullptr;
   sg4::Mailbox* next = nullptr;
   sg4::Mailbox* me   = nullptr;
-  std::vector<sg4::CommPtr> pending_recvs;
-  std::vector<sg4::CommPtr> pending_sends;
+  sg4::ActivitySet pending_recvs;
+  sg4::ActivitySet pending_sends;
 
   unsigned long long received_bytes = 0;
   unsigned int received_pieces      = 0;
@@ -60,17 +60,16 @@ public:
 
     while (not done) {
       sg4::CommPtr comm = me->get_async<FilePiece>(&received);
-      pending_recvs.push_back(comm);
+      pending_recvs.push(comm);
 
-      ssize_t idx = sg4::Comm::wait_any(pending_recvs);
-      if (idx != -1) {
-        comm = pending_recvs.at(idx);
+      auto completed_one = pending_recvs.wait_any();
+      if (completed_one != nullptr) {
+        comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
         XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
-        pending_recvs.erase(pending_recvs.begin() + idx);
         if (next != nullptr) {
           XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
           sg4::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-          pending_sends.push_back(send);
+          pending_sends.push(send);
         } else
           delete received;
 
@@ -110,14 +109,14 @@ public:
 
   void sendFile()
   {
-    std::vector<sg4::CommPtr> pending_sends;
+    sg4::ActivitySet pending_sends;
     for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
       XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg4::Host::current()->get_cname(),
                 first->get_cname());
       sg4::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-      pending_sends.push_back(comm);
+      pending_sends.push(comm);
     }
-    sg4::Comm::wait_all(pending_sends);
+    pending_sends.wait_all();
   }
 
   Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
@@ -140,7 +139,7 @@ static void peer()
   p.joinChain();
   p.forwardFile();
 
-  sg4::Comm::wait_all(p.pending_sends);
+  p.pending_sends.wait_all();
   double end_time = sg4::Engine::get_clock();
 
   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,
index 90b3778..9f36c75 100644 (file)
@@ -93,7 +93,7 @@ int main(int argc, char** argv)
   sg4::Actor::create("Sender", host1, Sender("mailbox2", "mailbox3"));
   sg4::Actor::create("Receiver", host2, Receiver("mailbox2"));
   sg4::Actor::create("Receiver", host3, Receiver("mailbox3"));
-  
+
   sg4::Actor::create("LinkKiller", host1, [](){
     sg4::this_actor::sleep_for(10.0);
     XBT_INFO("Turning off link 'linkto2'");
index f473664..08129e4 100644 (file)
@@ -14,16 +14,14 @@ static void worker()
   // Define an amount of work that should take 1 second to execute.
   double computation_amount = sg4::this_actor::get_host()->get_speed();
 
-  std::vector<sg4::ExecPtr> pending_execs;
   // Create a small DAG
   // + Two parents and a child
   // + First parent ends after 1 second and the Second parent after 2 seconds.
   sg4::ExecPtr first_parent = sg4::this_actor::exec_init(computation_amount);
-  pending_execs.push_back(first_parent);
   sg4::ExecPtr second_parent = sg4::this_actor::exec_init(2 * computation_amount);
-  pending_execs.push_back(second_parent);
   sg4::ExecPtr child = sg4::Exec::init()->set_flops_amount(computation_amount);
-  pending_execs.push_back(child);
+
+  sg4::ActivitySet pending_execs ({first_parent, second_parent, child});
 
   // Name the activities (for logging purposes only)
   first_parent->set_name("parent 1");
@@ -41,9 +39,9 @@ static void worker()
 
   // wait for the completion of all activities
   while (not pending_execs.empty()) {
-    ssize_t changed_pos = sg4::Exec::wait_any_for(pending_execs, -1);
-    XBT_INFO("Exec '%s' is complete", pending_execs[changed_pos]->get_cname());
-    pending_execs.erase(pending_execs.begin() + changed_pos);
+    auto completed_one = pending_execs.wait_any();
+    if (completed_one != nullptr)
+      XBT_INFO("Exec '%s' is complete", completed_one->get_cname());
   }
 }
 
index c86a78c..4772376 100644 (file)
@@ -12,16 +12,15 @@ namespace sg4 = simgrid::s4u;
 
 static void test()
 {
-  std::vector<sg4::ActivityPtr> pending_activities;
-
   sg4::ExecPtr bob_compute = sg4::this_actor::exec_init(1e9);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_compute));
   sg4::IoPtr bob_write = sg4::Host::current()->get_disks().front()->io_init(4000000, sg4::Io::OpType::WRITE);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_write));
   sg4::IoPtr carl_read = sg4::Host::by_name("carl")->get_disks().front()->io_init(4000000, sg4::Io::OpType::READ);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_read));
   sg4::ExecPtr carl_compute = sg4::Host::by_name("carl")->exec_init(1e9);
-  pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_compute));
+
+  sg4::ActivitySet pending_activities ({boost::dynamic_pointer_cast<sg4::Activity>(bob_compute),
+                                        boost::dynamic_pointer_cast<sg4::Activity>(bob_write),
+                                        boost::dynamic_pointer_cast<sg4::Activity>(carl_read),
+                                        boost::dynamic_pointer_cast<sg4::Activity>(carl_compute)});
 
   // Name the activities (for logging purposes only)
   bob_compute->set_name("bob compute");
@@ -45,9 +44,9 @@ static void test()
 
   // wait for the completion of all activities
   while (not pending_activities.empty()) {
-    ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
-    XBT_INFO("Activity '%s' is complete", pending_activities[changed_pos]->get_cname());
-    pending_activities.erase(pending_activities.begin() + changed_pos);
+    auto completed_one = pending_activities.wait_any();
+    if (completed_one != nullptr)
+      XBT_INFO("Activity '%s' is complete", completed_one->get_cname());
   }
 }
 
index 6384e76..9bfce5b 100644 (file)
@@ -23,10 +23,10 @@ public:
   void operator()() const
   {
     // sphinx-doc: init-begin (this line helps the doc to build; ignore it)
-    /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    /* ActivitySet in which we store all ongoing communications */
+    sg4::ActivitySet pending_comms;
 
-    /* Make a vector of the mailboxes to use */
+    /* Mailbox to use */
     sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
     // sphinx-doc: init-end
 
@@ -41,13 +41,13 @@ public:
 
       /* Create a communication representing the ongoing communication, and store it in pending_comms */
       sg4::CommPtr comm = mbox->put_async(payload, size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
     // sphinx-doc: put-end
 
     XBT_INFO("Goodbye now!");
@@ -63,23 +63,28 @@ public:
   explicit Receiver(int count) : messages_count(count) { mbox = sg4::Mailbox::by_name("receiver"); }
   void operator()()
   {
-    /* Vector in which we store all incoming msgs */
-    std::vector<std::unique_ptr<std::string*>> pending_msgs;
-    std::vector<sg4::CommPtr> pending_comms;
+    /* Where we store all incoming msgs */
+    std::unordered_map<sg4::CommPtr, std::string**> pending_msgs;
+    sg4::ActivitySet pending_comms;
 
     XBT_INFO("Wait for %d messages asynchronously", messages_count);
     for (int i = 0; i < messages_count; i++) {
-      pending_msgs.push_back(std::make_unique<std::string*>());
-      pending_comms.emplace_back(mbox->get_async<std::string>(pending_msgs[i].get()));
+      std::string* msg;
+      auto comm = mbox->get_async<std::string>(&msg);
+      pending_comms.push(comm);
+      pending_msgs.insert({comm, &msg});
     }
+
     while (not pending_comms.empty()) {
-      ssize_t index    = sg4::Comm::wait_any(pending_comms);
-      std::string* msg = *pending_msgs[index];
-      XBT_INFO("I got '%s'.", msg->c_str());
-      /* cleanup memory and remove from vectors */
-      delete msg;
-      pending_comms.erase(pending_comms.begin() + index);
-      pending_msgs.erase(pending_msgs.begin() + index);
+      auto completed_one = pending_comms.wait_any();
+      if (completed_one != nullptr){
+        auto comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
+        std::string *msg = std::move(*pending_msgs[comm]);
+        XBT_INFO("I got '%s'.", msg->c_str());
+        /* cleanup memory and remove from map */
+        delete msg;
+        pending_msgs.erase(comm);
+      }
     }
   }
 };
index 3fe272d..cbff56d 100644 (file)
@@ -7,7 +7,7 @@
  *
  * This example is very similar to the other asynchronous communication examples, but messages get serialized by the platform.
  * Without this call to Link::set_concurrency_limit(2) in main, all messages would be received at the exact same timestamp since
- * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2 
+ * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2
  * messages can travel through the link at the same time.
  */
 
@@ -26,10 +26,10 @@ public:
   void operator()() const
   {
     // sphinx-doc: init-begin (this line helps the doc to build; ignore it)
-    /* Vector in which we store all ongoing communications */
-    std::vector<sg4::CommPtr> pending_comms;
+    /* ActivitySet in which we store all ongoing communications */
+    sg4::ActivitySet pending_comms;
 
-    /* Make a vector of the mailboxes to use */
+    /* Mailbox to use */
     sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
     // sphinx-doc: init-end
 
@@ -44,13 +44,13 @@ public:
 
       /* Create a communication representing the ongoing communication, and store it in pending_comms */
       sg4::CommPtr comm = mbox->put_async(payload, msg_size);
-      pending_comms.push_back(comm);
+      pending_comms.push(comm);
     }
 
     XBT_INFO("Done dispatching all messages");
 
     /* Now that all message exchanges were initiated, wait for their completion in one single call */
-    sg4::Comm::wait_all(pending_comms);
+    pending_comms.wait_all();
     // sphinx-doc: put-end
 
     XBT_INFO("Goodbye now!");