1 /* Copyright (c) 2023-. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "src/kernel/activity/ActivityImpl.hpp"
7 #include "src/kernel/actor/ActorImpl.hpp"
8 #include "src/kernel/actor/CommObserver.hpp"
9 #include <simgrid/Exception.hpp>
10 #include <simgrid/activity_set.h>
11 #include <simgrid/s4u/ActivitySet.hpp>
12 #include <simgrid/s4u/Engine.hpp>
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities");
18 template class xbt::Extendable<s4u::ActivitySet>;
22 void ActivitySet::erase(ActivityPtr a)
24 for (auto it = activities_.begin(); it != activities_.end(); it++)
26 activities_.erase(it);
31 void ActivitySet::wait_all_for(double timeout)
34 for (const auto& act : activities_)
39 double deadline = Engine::get_clock() + timeout;
40 for (const auto& act : activities_)
41 act->wait_until(deadline);
45 ActivityPtr ActivitySet::test_any()
47 std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
48 std::transform(begin(activities_), end(activities_), begin(act_impls),
49 [](const ActivityPtr& act) { return act->pimpl_.get(); });
51 kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
52 kernel::actor::ActivityTestanySimcall observer{issuer, act_impls, "test_any"};
53 ssize_t changed_pos = kernel::actor::simcall_answered(
55 return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
58 if (changed_pos == -1)
59 return ActivityPtr(nullptr);
61 auto ret = activities_.at(changed_pos);
63 ret->complete(Activity::State::FINISHED);
67 void ActivitySet::handle_failed_activities()
69 for (size_t i = 0; i < activities_.size(); i++) {
70 auto act = activities_[i];
71 if (act->pimpl_->get_state() == kernel::activity::State::FAILED) {
72 act->complete(Activity::State::FAILED);
74 failed_activities_.push_back(act);
75 activities_[i] = activities_[activities_.size() - 1];
76 activities_.resize(activities_.size() - 1);
77 i--; // compensate the i++ occuring at the end of the loop
82 ActivityPtr ActivitySet::wait_any_for(double timeout)
84 std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
85 std::transform(begin(activities_), end(activities_), begin(act_impls),
86 [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
88 kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
89 kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
91 ssize_t changed_pos = kernel::actor::simcall_blocking(
93 kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
94 observer.get_timeout());
97 if (changed_pos == -1)
98 throw TimeoutException(XBT_THROW_POINT, "Timeouted");
100 auto ret = activities_.at(changed_pos);
102 ret->complete(Activity::State::FINISHED);
104 } catch (const HostFailureException& e) {
105 handle_failed_activities();
107 } catch (const NetworkFailureException& e) {
108 handle_failed_activities();
110 } catch (const StorageFailureException& e) {
111 handle_failed_activities();
116 ActivityPtr ActivitySet::get_failed_activity()
118 if (failed_activities_.empty())
119 return ActivityPtr(nullptr);
120 auto ret = failed_activities_.back();
121 failed_activities_.pop_back();
126 } // namespace simgrid
130 sg_activity_set_t sg_activity_set_init()
132 return new simgrid::s4u::ActivitySet();
134 void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti)
138 void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti)
142 size_t sg_activity_set_size(sg_activity_set_t as)
146 int sg_activity_set_empty(sg_activity_set_t as)
151 sg_activity_t sg_activity_set_test_any(sg_activity_set_t as)
153 return as->test_any().get();
155 void sg_activity_set_wait_all(sg_activity_set_t as)
159 int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout)
162 as->wait_all_for(timeout);
164 } catch (const simgrid::TimeoutException& e) {
168 sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as)
170 return as->wait_any().get();
172 sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout)
175 return as->wait_any_for(timeout).get();
176 } catch (const simgrid::TimeoutException& e) {
181 void sg_activity_set_delete(sg_activity_set_t as)
185 void sg_activity_unref(sg_activity_t acti)