sg4::Mailbox* prev = nullptr;
sg4::Mailbox* next = nullptr;
sg4::Mailbox* me = nullptr;
- std::vector<sg4::CommPtr> pending_recvs;
- std::vector<sg4::CommPtr> pending_sends;
+ sg4::ActivitySet pending_recvs;
+ sg4::ActivitySet pending_sends;
unsigned long long received_bytes = 0;
unsigned int received_pieces = 0;
while (not done) {
sg4::CommPtr comm = me->get_async<FilePiece>(&received);
- pending_recvs.push_back(comm);
+ pending_recvs.push(comm);
- ssize_t idx = sg4::Comm::wait_any(pending_recvs);
- if (idx != -1) {
- comm = pending_recvs.at(idx);
+ auto completed_one = pending_recvs.wait_any();
+ if (completed_one != nullptr) {
+ comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
XBT_DEBUG("Peer %s got a 'SEND_DATA' message", me->get_cname());
- pending_recvs.erase(pending_recvs.begin() + idx);
if (next != nullptr) {
XBT_DEBUG("Sending (asynchronously) from %s to %s", me->get_cname(), next->get_cname());
sg4::CommPtr send = next->put_async(received, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- pending_sends.push_back(send);
+ pending_sends.push(send);
} else
delete received;
+ /* Initiate the asynchronous emission of piece_count file pieces into the 'first' mailbox,
+  * then wait until all of these sends are over. */
void sendFile()
{
- std::vector<sg4::CommPtr> pending_sends;
+ /* Set gathering every send initiated below, so that they can all be awaited in one call */
+ sg4::ActivitySet pending_sends;
for (unsigned int current_piece = 0; current_piece < piece_count; current_piece++) {
XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg4::Host::current()->get_cname(),
first->get_cname());
sg4::CommPtr comm = first->put_async(new FilePiece(), MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
- pending_sends.push_back(comm);
+ pending_sends.push(comm);
}
- sg4::Comm::wait_all(pending_sends);
+ /* Wait for the completion of all the sends stored in the set */
+ pending_sends.wait_all();
}
Broadcaster(int hostcount, unsigned int piece_count) : piece_count(piece_count)
p.joinChain();
p.forwardFile();
- sg4::Comm::wait_all(p.pending_sends);
+ p.pending_sends.wait_all();
double end_time = sg4::Engine::get_clock();
XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", end_time - start_time, p.received_bytes,
sg4::Actor::create("Sender", host1, Sender("mailbox2", "mailbox3"));
sg4::Actor::create("Receiver", host2, Receiver("mailbox2"));
sg4::Actor::create("Receiver", host3, Receiver("mailbox3"));
-
+
sg4::Actor::create("LinkKiller", host1, [](){
sg4::this_actor::sleep_for(10.0);
XBT_INFO("Turning off link 'linkto2'");
// Define an amount of work that should take 1 second to execute.
double computation_amount = sg4::this_actor::get_host()->get_speed();
- std::vector<sg4::ExecPtr> pending_execs;
// Create a small DAG
// + Two parents and a child
// + First parent ends after 1 second and the Second parent after 2 seconds.
sg4::ExecPtr first_parent = sg4::this_actor::exec_init(computation_amount);
- pending_execs.push_back(first_parent);
sg4::ExecPtr second_parent = sg4::this_actor::exec_init(2 * computation_amount);
- pending_execs.push_back(second_parent);
sg4::ExecPtr child = sg4::Exec::init()->set_flops_amount(computation_amount);
- pending_execs.push_back(child);
+
+ sg4::ActivitySet pending_execs ({first_parent, second_parent, child});
// Name the activities (for logging purposes only)
first_parent->set_name("parent 1");
// wait for the completion of all activities
while (not pending_execs.empty()) {
- ssize_t changed_pos = sg4::Exec::wait_any_for(pending_execs, -1);
- XBT_INFO("Exec '%s' is complete", pending_execs[changed_pos]->get_cname());
- pending_execs.erase(pending_execs.begin() + changed_pos);
+ auto completed_one = pending_execs.wait_any();
+ if (completed_one != nullptr)
+ XBT_INFO("Exec '%s' is complete", completed_one->get_cname());
}
}
+ /* Create two executions and two disk I/Os (on the current host and on host "carl"), gather them
+  * in a single ActivitySet, and log each of them as it is reported complete by wait_any(). */
static void test()
{
- std::vector<sg4::ActivityPtr> pending_activities;
-
sg4::ExecPtr bob_compute = sg4::this_actor::exec_init(1e9);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_compute));
sg4::IoPtr bob_write = sg4::Host::current()->get_disks().front()->io_init(4000000, sg4::Io::OpType::WRITE);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(bob_write));
sg4::IoPtr carl_read = sg4::Host::by_name("carl")->get_disks().front()->io_init(4000000, sg4::Io::OpType::READ);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_read));
sg4::ExecPtr carl_compute = sg4::Host::by_name("carl")->exec_init(1e9);
- pending_activities.push_back(boost::dynamic_pointer_cast<sg4::Activity>(carl_compute));
+
+ /* The set mixes Exec and Io activities, so each one is stored through a pointer to the Activity base type */
+ sg4::ActivitySet pending_activities ({boost::dynamic_pointer_cast<sg4::Activity>(bob_compute),
+ boost::dynamic_pointer_cast<sg4::Activity>(bob_write),
+ boost::dynamic_pointer_cast<sg4::Activity>(carl_read),
+ boost::dynamic_pointer_cast<sg4::Activity>(carl_compute)});
// Name the activities (for logging purposes only)
bob_compute->set_name("bob compute");
// wait for the completion of all activities
while (not pending_activities.empty()) {
- ssize_t changed_pos = sg4::Activity::wait_any(pending_activities);
- XBT_INFO("Activity '%s' is complete", pending_activities[changed_pos]->get_cname());
- pending_activities.erase(pending_activities.begin() + changed_pos);
+ /* No manual erase anymore: the loop relies on wait_any() shrinking the set (presumably it removes
+  * the activity it returns -- confirm against the ActivitySet documentation) */
+ auto completed_one = pending_activities.wait_any();
+ if (completed_one != nullptr)
+ XBT_INFO("Activity '%s' is complete", completed_one->get_cname());
}
}
void operator()() const
{
// sphinx-doc: init-begin (this line helps the doc to build; ignore it)
- /* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ /* ActivitySet in which we store all ongoing communications */
+ sg4::ActivitySet pending_comms;
- /* Make a vector of the mailboxes to use */
+ /* Mailbox to use */
sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
// sphinx-doc: init-end
/* Create a communication representing the ongoing communication, and store it in pending_comms */
sg4::CommPtr comm = mbox->put_async(payload, size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
// sphinx-doc: put-end
XBT_INFO("Goodbye now!");
+ /* Record how many messages are expected and retrieve the mailbox named "receiver" */
explicit Receiver(int count) : messages_count(count) { mbox = sg4::Mailbox::by_name("receiver"); }
+ /* Post messages_count asynchronous receptions on mbox, then log and free each message as soon as
+  * its reception is reported complete. */
void operator()()
{
- /* Vector in which we store all incoming msgs */
- std::vector<std::unique_ptr<std::string*>> pending_msgs;
- std::vector<sg4::CommPtr> pending_comms;
+ /* Where we store all incoming msgs: each reception gets its own heap-allocated payload slot, owned by
+  * this map and indexed by the corresponding communication */
+ std::unordered_map<sg4::CommPtr, std::unique_ptr<std::string*>> pending_msgs;
+ sg4::ActivitySet pending_comms;
XBT_INFO("Wait for %d messages asynchronously", messages_count);
for (int i = 0; i < messages_count; i++) {
- pending_msgs.push_back(std::make_unique<std::string*>());
- pending_comms.emplace_back(mbox->get_async<std::string>(pending_msgs[i].get()));
+ /* The slot is written only when the communication completes, i.e. long after this iteration is over:
+  * it must not be a loop-local variable, or every reception would target the same dead stack location */
+ auto slot = std::make_unique<std::string*>();
+ sg4::CommPtr comm = mbox->get_async<std::string>(slot.get());
+ pending_comms.push(comm);
+ /* Moving the unique_ptr does not move the pointed slot, so the address given to get_async remains valid */
+ pending_msgs.emplace(comm, std::move(slot));
}
+
while (not pending_comms.empty()) {
- ssize_t index = sg4::Comm::wait_any(pending_comms);
- std::string* msg = *pending_msgs[index];
- XBT_INFO("I got '%s'.", msg->c_str());
- /* cleanup memory and remove from vectors */
- delete msg;
- pending_comms.erase(pending_comms.begin() + index);
- pending_msgs.erase(pending_msgs.begin() + index);
+ auto completed_one = pending_comms.wait_any();
+ if (completed_one != nullptr){
+ auto comm = boost::dynamic_pointer_cast<sg4::Comm>(completed_one);
+ /* at() instead of operator[]: an unknown comm must fail loudly rather than insert an empty slot */
+ std::string* msg = *pending_msgs.at(comm);
+ XBT_INFO("I got '%s'.", msg->c_str());
+ /* cleanup memory and remove from map (erasing the entry also releases the slot) */
+ delete msg;
+ pending_msgs.erase(comm);
+ }
}
}
};
*
* This example is very similar to the other asynchronous communication examples, but messages get serialized by the platform.
* Without this call to Link::set_concurrency_limit(2) in main, all messages would be received at the exact same timestamp since
- * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2
+ * they are initiated at the same instant and are of the same size. But with this extra configuration to the link, at most 2
* messages can travel through the link at the same time.
*/
void operator()() const
{
// sphinx-doc: init-begin (this line helps the doc to build; ignore it)
- /* Vector in which we store all ongoing communications */
- std::vector<sg4::CommPtr> pending_comms;
+ /* ActivitySet in which we store all ongoing communications */
+ sg4::ActivitySet pending_comms;
- /* Make a vector of the mailboxes to use */
+ /* Mailbox to use */
sg4::Mailbox* mbox = sg4::Mailbox::by_name("receiver");
// sphinx-doc: init-end
/* Create a communication representing the ongoing communication, and store it in pending_comms */
sg4::CommPtr comm = mbox->put_async(payload, msg_size);
- pending_comms.push_back(comm);
+ pending_comms.push(comm);
}
XBT_INFO("Done dispatching all messages");
/* Now that all message exchanges were initiated, wait for their completion in one single call */
- sg4::Comm::wait_all(pending_comms);
+ pending_comms.wait_all();
// sphinx-doc: put-end
XBT_INFO("Goodbye now!");