sg4::Mailbox* prev = nullptr;
sg4::Mailbox* next = nullptr;
sg4::Mailbox* me = nullptr;
- std::vector<sg4::CommPtr> pending_recvs;
- std::vector<sg4::CommPtr> pending_sends;
+ sg4::ActivitySet pending_recvs;
+ sg4::ActivitySet pending_sends;
unsigned long long received_bytes = 0;
unsigned int received_pieces = 0;
while (not done) {
sg4::CommPtr comm = me->get_async<FilePiece>(&received);
- pending_recvs.push_back(comm);
+ pending_recvs.push(comm);
- ssize_t idx = sg4::Comm::wait_any(pending_recvs);
- if (idx != -1) {
- comm = pending_recvs.at(idx);
+ auto completed_one = pending_recvs.wait_any();
+ if (completed_one != nullptr) {
+ comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
- pending_recvs.erase(pending_recvs.begin() + idx);
if (next != nullptr) {
XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
sg4::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- pending_sends.push_back(send);
+ pending_sends.push(send);
} else
delete received;
// Sends piece_count pieces to the mailbox `first`, one put_async() per piece,
// collects every resulting communication in pending_sends, and finally waits
// on the whole collection with wait_all().
//
// NOTE(review): this excerpt is in patch form. Lines marked "-" are the former
// std::vector<sg4::CommPtr> / sg4::Comm::wait_all() code; lines marked "+" are
// the sg4::ActivitySet replacement.
void sendFile()
{
- std::vector<sg4::CommPtr> pending_sends;
+ sg4::ActivitySet pending_sends;
for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg4::Host::current()->get_cname(),
first->get_cname());
// Each piece is heap-allocated and is not freed in this function.
// NOTE(review): presumably ownership passes to the receiving side -- TODO confirm.
sg4::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- pending_sends.push_back(comm);
+ pending_sends.push(comm);
}
// The wait happens only after the loop, i.e. after every piece has been posted.
- sg4::Comm::wait_all(pending_sends);
+ pending_sends.wait_all();
}
Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
p.joinChain();
p.forwardFile();
- sg4::Comm::wait_all(p.pending_sends);
+ p.pending_sends.wait_all();
double end_time = sg4::Engine::get_clock();
XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,