Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
use ActivitySet when we call wait_any in examples
[simgrid.git] / examples / cpp / app-chainsend / s4u-app-chainsend.cpp
index 2ba057c..bd7e25d 100644 (file)
@@ -34,8 +34,8 @@ public:
   sg4::Mailbox* prev = nullptr;
   sg4::Mailbox* next = nullptr;
   sg4::Mailbox* me   = nullptr;
-  std::vector<sg4::CommPtr> pending_recvs;
-  std::vector<sg4::CommPtr> pending_sends;
+  sg4::ActivitySet pending_recvs;
+  sg4::ActivitySet pending_sends;
 
   unsigned long long received_bytes = 0;
   unsigned int received_pieces      = 0;
@@ -60,17 +60,16 @@ public:
 
     while (not done) {
       sg4::CommPtr comm = me->get_async<FilePiece>(&received);
-      pending_recvs.push_back(comm);
+      pending_recvs.push(comm);
 
-      ssize_t idx = sg4::Comm::wait_any(pending_recvs);
-      if (idx != -1) {
-        comm = pending_recvs.at(idx);
+      auto completed_one = pending_recvs.wait_any();
+      if (completed_one != nullptr) {
+        comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
         XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
-        pending_recvs.erase(pending_recvs.begin() + idx);
         if (next != nullptr) {
           XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
           sg4::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-          pending_sends.push_back(send);
+          pending_sends.push(send);
         } else
           delete received;
 
@@ -110,14 +109,14 @@ public:
 
   void sendFile()
   {
-    std::vector<sg4::CommPtr> pending_sends;
+    sg4::ActivitySet pending_sends;
     for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
       XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg4::Host::current()->get_cname(),
                 first->get_cname());
       sg4::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
-      pending_sends.push_back(comm);
+      pending_sends.push(comm);
     }
-    sg4::Comm::wait_all(pending_sends);
+    pending_sends.wait_all();
   }
 
   Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
@@ -140,7 +139,7 @@ static void peer()
   p.joinChain();
   p.forwardFile();
 
-  sg4::Comm::wait_all(p.pending_sends);
+  p.pending_sends.wait_all();
   double end_time = sg4::Engine::get_clock();
 
   XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,