return ret;
}
+/**
+ * Move every activity whose kernel-side state is FAILED out of the set.
+ *
+ * Each such activity is completed with Activity::State::FAILED, appended to
+ * failed_activities_ and removed from activities_. The relative order of the
+ * activities that stay in the set is not preserved.
+ */
+void ActivitySet::handle_failed_activities()
+{
+  size_t pos = 0;
+  while (pos < activities_.size()) {
+    auto candidate = activities_[pos];
+    if (candidate->pimpl_->get_state() != kernel::activity::State::FAILED) {
+      ++pos; // nothing removed: move on to the next slot
+      continue;
+    }
+
+    candidate->complete(Activity::State::FAILED);
+    failed_activities_.push_back(candidate);
+
+    // Overwrite the slot with the last element and shrink by one. `pos` is left
+    // untouched so that the element just moved into this slot gets examined too.
+    activities_[pos] = activities_.back();
+    activities_.pop_back();
+  }
+}
+
ActivityPtr ActivitySet::wait_any_for(double timeout) // Blocks in a simcall until it reports an activity of the set (returned after being erased from the set and marked FINISHED) or reports -1, in which case TimeoutException is thrown.
{
std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size()); // one slot per activity; NOTE(review): nothing visible in this excerpt fills the slots - presumably done on a line the diff omits, confirm
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"}; // bundles issuer, activities and timeout for the simcall below
- ssize_t changed_pos = kernel::actor::simcall_blocking(
- [&observer] {
- kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
- observer.get_timeout());
- },
- &observer);
- if (changed_pos == -1)
- throw TimeoutException(XBT_THROW_POINT, "Timeouted");
-
- auto ret = activities_.at(changed_pos);
- erase(ret);
- ret->complete(Activity::State::FINISHED);
- return ret;
+ try { // same statements as the removed lines, now guarded so failed activities can be recorded before a failure propagates
+ ssize_t changed_pos = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+ observer.get_timeout());
+ },
+ &observer);
+ if (changed_pos == -1) // -1 is handled as a timeout
+ throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+
+ auto ret = activities_.at(changed_pos); // changed_pos is used as an index into activities_
+ erase(ret); // take the activity out of the set before handing it back
+ ret->complete(Activity::State::FINISHED);
+ return ret;
+ } catch (const HostFailureException& e) { // e is not inspected; all three handlers do the same cleanup and rethrow
+ handle_failed_activities(); // moves activities whose kernel state is FAILED into failed_activities_
+ throw; // propagate the original exception unchanged
+ } catch (const NetworkFailureException& e) {
+ handle_failed_activities();
+ throw;
+ } catch (const StorageFailureException& e) {
+ handle_failed_activities();
+ throw;
+ }
}
ActivityPtr ActivitySet::get_failed_activity()
{
delete as;
}
+void sg_activity_unref(sg_activity_t acti) // Forwards to acti->unref().
+{
+ acti->unref(); // acti is dereferenced without a null check
+}
SG_END_DECL