(because of a bug in Sonar coverage computation)
C++
- fields, methods and variables are in snake_case()
- - Classes and Enum are in UpperCamelCase
+ - Classes and Enum names are in UpperCamelCase
+ - Enum values are in UPPER_SNAKE_CASE (as constants)
- public filenames: api_Class.cpp and api/Class.hpp.
- Example: src/s4u/s4u_ConditionVariable.cpp and
include/simgrid/s4u/ConditionVariable.hpp
class trace;
class future_evt_set;
}
+namespace vm {
+class VMModel;
+class VirtualMachineImpl;
+} // namespace vm
} // namespace simgrid
typedef simgrid::s4u::Actor s4u_Actor;
bool empty() const { return heap_type::empty(); }
};
-/** @details An action is a consumption on a resource (e.g.: a communication for the network) */
+/** @details An action is a consumption on a resource (e.g.: a communication for the network).
+ *
+ * It is related (but still different) from activities, that are the stuff on which an actor can be blocked.
+ * See simgrid::s4u::Activity for more details.
+ */
class XBT_PUBLIC Action {
friend ActionHeap;
StateSet;
enum class State {
- ready = 0, /**< Ready */
- running, /**< Running */
- failed, /**< Task Failure */
- done, /**< Completed */
- to_free, /**< Action to free in next cleanup */
- not_in_the_system /**< Not in the system anymore. Why did you ask ? */
+ INITED, /**< Created, but not started yet */
+ STARTED, /**< Currently running */
+ FAILED, /**< either the resource failed, or the action was canceled */
+ FINISHED, /**< Successfully completed */
+ IGNORED /**< e.g. failure detectors: infinite sleep actions that are put on resources which failure should get
+ noticed */
};
enum class SuspendStates {
virtual ~Model();
- /** @brief Get the set of [actions](@ref Action) in *ready* state */
- Action::StateSet* get_ready_action_set() const { return ready_action_set_; }
+ /** @brief Get the set of [actions](@ref Action) in *inited* state */
+ Action::StateSet* get_inited_action_set() const { return inited_action_set_; }
- /** @brief Get the set of [actions](@ref Action) in *running* state */
- Action::StateSet* get_running_action_set() const { return running_action_set_; }
+ /** @brief Get the set of [actions](@ref Action) in *started* state */
+ Action::StateSet* get_started_action_set() const { return started_action_set_; }
/** @brief Get the set of [actions](@ref Action) in *failed* state */
Action::StateSet* get_failed_action_set() const { return failed_action_set_; }
- /** @brief Get the set of [actions](@ref Action) in *done* state */
- Action::StateSet* get_done_action_set() const { return done_action_set_; }
+ /** @brief Get the set of [actions](@ref Action) in *finished* state */
+ Action::StateSet* get_finished_action_set() const { return finished_action_set_; }
+
+ /** @brief Get the set of [actions](@ref Action) in *ignored* state */
+ Action::StateSet* get_ignored_action_set() const { return ignored_action_set_; }
/** @brief Get the set of modified [actions](@ref Action) */
Action::ModifiedSet* get_modified_set() const;
private:
lmm::System* maxmin_system_ = nullptr;
const UpdateAlgo update_algorithm_;
- Action::StateSet* ready_action_set_ = new Action::StateSet(); /**< Actions in state SURF_ACTION_READY */
- Action::StateSet* running_action_set_ = new Action::StateSet(); /**< Actions in state SURF_ACTION_RUNNING */
- Action::StateSet* failed_action_set_ = new Action::StateSet(); /**< Actions in state SURF_ACTION_FAILED */
- Action::StateSet* done_action_set_ = new Action::StateSet(); /**< Actions in state SURF_ACTION_DONE */
+ Action::StateSet* inited_action_set_ = new Action::StateSet(); /**< Created not started */
+ Action::StateSet* started_action_set_ = new Action::StateSet(); /**< Started not done */
+ Action::StateSet* failed_action_set_ = new Action::StateSet(); /**< Done with failure */
+ Action::StateSet* finished_action_set_ = new Action::StateSet(); /**< Done successful */
+ Action::StateSet* ignored_action_set_ = new Action::StateSet(); /**< not considered (failure detectors?) */
+
ActionHeap action_heap_;
};
/** @brief Activities
*
* This class is the ancestor of every activities that an actor can undertake.
- * That is, of the actions that do take time in the simulated world.
+ * That is, activities are all the things that do take time to the actor in the simulated world.
+ *
+ * They are somewhat linked but not identical to simgrid::kernel::resource::Action,
+ * that are stuff occurring on a resource:
+ *
+ * - A sequential execution activity encompasses 2 actions: one for the exec itself,
+ * and a time-limited sleep used as timeout detector.
+ * - A point-to-point communication activity encompasses 3 actions: one for the comm itself
+ * (which spans on all links of the path), and one infinite sleep used as failure detector
+ * on both sender and receiver hosts.
+ * - Synchronization activities may possibly be connected to no action.
*/
class XBT_PUBLIC Activity {
friend Comm;
Activity(Activity const&) = delete;
Activity& operator=(Activity const&) = delete;
- enum class State { inited = 0, started, canceled, errored, finished };
+ enum class State { INITED = 0, STARTED, CANCELED, ERRORED, FINISHED };
/** Starts a previously created activity.
*
private:
simgrid::kernel::activity::ActivityImplPtr pimpl_ = nullptr;
- Activity::State state_ = Activity::State::inited;
+ Activity::State state_ = Activity::State::INITED;
double remains_ = 0;
void* user_data_ = nullptr;
}; // class
* and actors can retrieve the host on which they run using simgrid::s4u::Host::current().
*/
class XBT_PUBLIC Host : public simgrid::xbt::Extendable<Host> {
+ friend simgrid::vm::VMModel; // Use the pimpl_cpu to compute the VM sharing
+ friend simgrid::vm::VirtualMachineImpl; // creates the the pimpl_cpu
public:
explicit Host(const char* name);
std::unordered_map<std::string, Storage*>* mounts_ = nullptr; // caching
public:
- // TODO, this could be a unique_ptr
- surf::HostImpl* pimpl_ = nullptr;
/** DO NOT USE DIRECTLY (@todo: these should be protected, once our code is clean) */
surf::Cpu* pimpl_cpu = nullptr;
+ // TODO, this could be a unique_ptr
+ surf::HostImpl* pimpl_ = nullptr;
/** DO NOT USE DIRECTLY (@todo: these should be protected, once our code is clean) */
kernel::routing::NetPoint* pimpl_netpoint = nullptr;
};
#endif
XBT_PUBLIC void SIMIX_process_detach();
-/*********************************** Host *************************************/
-XBT_PUBLIC void SIMIX_host_off(sg_host_t host, smx_actor_t issuer);
-
/********************************* Process ************************************/
XBT_PUBLIC int SIMIX_process_count();
XBT_PUBLIC smx_actor_t SIMIX_process_self();
void simgrid::kernel::activity::CommImpl::post()
{
/* Update synchro state */
- if (src_timeout && src_timeout->get_state() == simgrid::kernel::resource::Action::State::done)
+ if (src_timeout && src_timeout->get_state() == simgrid::kernel::resource::Action::State::FINISHED)
state_ = SIMIX_SRC_TIMEOUT;
- else if (dst_timeout && dst_timeout->get_state() == simgrid::kernel::resource::Action::State::done)
+ else if (dst_timeout && dst_timeout->get_state() == simgrid::kernel::resource::Action::State::FINISHED)
state_ = SIMIX_DST_TIMEOUT;
- else if (src_timeout && src_timeout->get_state() == simgrid::kernel::resource::Action::State::failed)
+ else if (src_timeout && src_timeout->get_state() == simgrid::kernel::resource::Action::State::FAILED)
state_ = SIMIX_SRC_HOST_FAILURE;
- else if (dst_timeout && dst_timeout->get_state() == simgrid::kernel::resource::Action::State::failed)
+ else if (dst_timeout && dst_timeout->get_state() == simgrid::kernel::resource::Action::State::FAILED)
state_ = SIMIX_DST_HOST_FAILURE;
- else if (surfAction_ && surfAction_->get_state() == simgrid::kernel::resource::Action::State::failed) {
+ else if (surfAction_ && surfAction_->get_state() == simgrid::kernel::resource::Action::State::FAILED) {
state_ = SIMIX_LINK_FAILURE;
} else
state_ = SIMIX_DONE;
/* If the host running the synchro failed, notice it. This way, the asking
* process can be killed if it runs on that host itself */
state_ = SIMIX_FAILED;
- } else if (surf_action_ && surf_action_->get_state() == simgrid::kernel::resource::Action::State::failed) {
+ } else if (surf_action_ && surf_action_->get_state() == simgrid::kernel::resource::Action::State::FAILED) {
/* If the host running the synchro didn't fail, then the synchro was canceled */
state_ = SIMIX_CANCELED;
- } else if (timeout_detector_ && timeout_detector_->get_state() == simgrid::kernel::resource::Action::State::done) {
+ } else if (timeout_detector_ &&
+ timeout_detector_->get_state() == simgrid::kernel::resource::Action::State::FINISHED) {
state_ = SIMIX_TIMEOUT;
} else {
state_ = SIMIX_DONE;
e_smx_state_t result;
switch (surf_sleep->get_state()) {
- case simgrid::kernel::resource::Action::State::failed:
+ case simgrid::kernel::resource::Action::State::FAILED:
simcall->issuer->context->iwannadie = 1;
result = SIMIX_SRC_HOST_FAILURE;
break;
- case simgrid::kernel::resource::Action::State::done:
+ case simgrid::kernel::resource::Action::State::FINISHED:
result = SIMIX_DONE;
break;
}
switch (surf_io->get_state()) {
- case simgrid::kernel::resource::Action::State::failed:
+ case simgrid::kernel::resource::Action::State::FAILED:
state_ = SIMIX_FAILED;
break;
- case simgrid::kernel::resource::Action::State::done:
+ case simgrid::kernel::resource::Action::State::FINISHED:
state_ = SIMIX_DONE;
break;
default:
void simgrid::kernel::activity::RawImpl::post()
{
XBT_IN("(%p)",this);
- if (sleep->get_state() == simgrid::kernel::resource::Action::State::failed)
+ if (sleep->get_state() == simgrid::kernel::resource::Action::State::FAILED)
state_ = SIMIX_FAILED;
- else if (sleep->get_state() == simgrid::kernel::resource::Action::State::done)
+ else if (sleep->get_state() == simgrid::kernel::resource::Action::State::FINISHED)
state_ = SIMIX_SRC_TIMEOUT;
SIMIX_synchro_finish(this);
if (failed)
state_set_ = get_model()->get_failed_action_set();
else
- state_set_ = get_model()->get_running_action_set();
+ state_set_ = get_model()->get_started_action_set();
state_set_->push_back(*this);
}
Action::State Action::get_state() const
{
- if (state_set_ == model_->get_ready_action_set())
- return Action::State::ready;
- if (state_set_ == model_->get_running_action_set())
- return Action::State::running;
+ if (state_set_ == model_->get_inited_action_set())
+ return Action::State::INITED;
+ if (state_set_ == model_->get_started_action_set())
+ return Action::State::STARTED;
if (state_set_ == model_->get_failed_action_set())
- return Action::State::failed;
- if (state_set_ == model_->get_done_action_set())
- return Action::State::done;
- return Action::State::not_in_the_system;
+ return Action::State::FAILED;
+ if (state_set_ == model_->get_finished_action_set())
+ return Action::State::FINISHED;
+ if (state_set_ == model_->get_ignored_action_set())
+ return Action::State::IGNORED;
+ THROW_IMPOSSIBLE;
}
void Action::set_state(Action::State state)
{
simgrid::xbt::intrusive_erase(*state_set_, *this);
switch (state) {
- case Action::State::ready:
- state_set_ = model_->get_ready_action_set();
+ case Action::State::INITED:
+ state_set_ = model_->get_inited_action_set();
break;
- case Action::State::running:
- state_set_ = model_->get_running_action_set();
+ case Action::State::STARTED:
+ state_set_ = model_->get_started_action_set();
break;
- case Action::State::failed:
+ case Action::State::FAILED:
state_set_ = model_->get_failed_action_set();
break;
- case Action::State::done:
- state_set_ = model_->get_done_action_set();
+ case Action::State::FINISHED:
+ state_set_ = model_->get_finished_action_set();
+ break;
+ case Action::State::IGNORED:
+ state_set_ = model_->get_ignored_action_set();
break;
default:
state_set_ = nullptr;
void Action::cancel()
{
- set_state(Action::State::failed);
+ set_state(Action::State::FAILED);
if (get_model()->get_update_algorithm() == Model::UpdateAlgo::Lazy) {
if (modified_set_hook_.is_linked())
simgrid::xbt::intrusive_erase(*get_model()->get_modified_set(), *this);
get_model()->get_maxmin_system()->update_variable_weight(get_variable(), 0.0);
if (get_model()->get_update_algorithm() == Model::UpdateAlgo::Lazy) {
get_model()->get_action_heap().remove(this);
- if (state_set_ == get_model()->get_running_action_set() && sharing_priority_ > 0) {
+ if (state_set_ == get_model()->get_started_action_set() && sharing_priority_ > 0) {
// If we have a lazy model, we need to update the remaining value accordingly
update_remains_lazy(surf_get_clock());
}
Model::~Model()
{
- delete ready_action_set_;
- delete running_action_set_;
+ delete inited_action_set_;
+ delete started_action_set_;
delete failed_action_set_;
- delete done_action_set_;
+ delete finished_action_set_;
delete maxmin_system_;
}
maxmin_system_->modified_set_->pop_front();
bool max_duration_flag = false;
- if (action->get_state_set() != running_action_set_)
+ if (action->get_state_set() != started_action_set_)
continue;
/* bogus priority, skip it */
double min = -1;
- for (Action& action : *get_running_action_set()) {
+ for (Action& action : *get_started_action_set()) {
double value = action.get_variable()->get_value();
if (value > 0) {
if (action.get_remains() > 0)
/* iterate for all virtual machines */
for (s4u::VirtualMachine* const& ws_vm : VirtualMachineImpl::allVms_) {
surf::Cpu* cpu = ws_vm->pimpl_cpu;
- xbt_assert(cpu, "cpu-less host");
double solved_value =
ws_vm->getImpl()->action_->get_variable()->get_value(); // this is X1 in comment above, what
/* Currently, a VM uses the network resource of its physical host */
pimpl_netpoint = pm->pimpl_netpoint;
+
// Create a VCPU for this VM
- surf::CpuCas01* sub_cpu = dynamic_cast<surf::CpuCas01*>(pm->pimpl_cpu);
+ std::vector<double>* speeds = new std::vector<double>();
+ for (int i = 0; i < pm->getPstatesCount(); i++)
+ speeds->push_back(pm->getPstateSpeed(i));
- pimpl_cpu = surf_cpu_model_vm->createCpu(this, sub_cpu->getSpeedPeakList(), coreAmount);
- if (sub_cpu->getPState() != 0)
- pimpl_cpu->setPState(sub_cpu->getPState());
+ surf_cpu_model_vm->createCpu(this, speeds, pm->getCoreCount());
+ if (pm->getPstate() != 0)
+ setPstate(pm->getPstate());
/* Make a process container */
extension_set<simgrid::simix::Host>(new simgrid::simix::Host());
Activity* Activity::set_remaining(double remains)
{
- xbt_assert(state_ == State::inited, "Cannot change the remaining amount of work once the Activity is started");
+ xbt_assert(state_ == State::INITED, "Cannot change the remaining amount of work once the Activity is started");
remains_ = remains;
return this;
}
namespace s4u {
Comm::~Comm()
{
- if (state_ == State::started && not detached_ && (pimpl_ == nullptr || pimpl_->state_ == SIMIX_RUNNING)) {
+ if (state_ == State::STARTED && not detached_ && (pimpl_ == nullptr || pimpl_->state_ == SIMIX_RUNNING)) {
XBT_INFO("Comm %p freed before its completion. Detached: %d, State: %d", this, detached_, (int)state_);
if (pimpl_ != nullptr)
XBT_INFO("pimpl_->state: %d", pimpl_->state_);
intrusive_ptr_release(*(simgrid::kernel::activity::ActivityImpl**)ptr);
});
for (auto const& comm : *comms_in) {
- if (comm->state_ == Activity::State::inited)
+ if (comm->state_ == Activity::State::INITED)
comm->start();
- xbt_assert(comm->state_ == Activity::State::started);
+ xbt_assert(comm->state_ == Activity::State::STARTED);
simgrid::kernel::activity::ActivityImpl* ptr = comm->pimpl_.get();
intrusive_ptr_add_ref(ptr);
xbt_dynar_push_as(comms, simgrid::kernel::activity::ActivityImpl*, ptr);
Activity* Comm::set_rate(double rate)
{
- xbt_assert(state_ == State::inited);
+ xbt_assert(state_ == State::INITED);
rate_ = rate;
return this;
}
Activity* Comm::set_src_data(void* buff)
{
- xbt_assert(state_ == State::inited);
+ xbt_assert(state_ == State::INITED);
xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
src_buff_ = buff;
return this;
}
Activity* Comm::set_src_data_size(size_t size)
{
- xbt_assert(state_ == State::inited);
+ xbt_assert(state_ == State::INITED);
src_buff_size_ = size;
return this;
}
Activity* Comm::set_src_data(void* buff, size_t size)
{
- xbt_assert(state_ == State::inited);
+ xbt_assert(state_ == State::INITED);
xbt_assert(dst_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
src_buff_ = buff;
}
Activity* Comm::set_dst_data(void** buff)
{
- xbt_assert(state_ == State::inited);
+ xbt_assert(state_ == State::INITED);
xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
dst_buff_ = buff;
return this;
}
size_t Comm::get_dst_data_size()
{
- xbt_assert(state_ == State::finished);
+ xbt_assert(state_ == State::FINISHED);
return dst_buff_size_;
}
Activity* Comm::set_dst_data(void** buff, size_t size)
{
- xbt_assert(state_ == State::inited);
+ xbt_assert(state_ == State::INITED);
xbt_assert(src_buff_ == nullptr, "Cannot set the src and dst buffers at the same time");
dst_buff_ = buff;
Activity* Comm::start()
{
- xbt_assert(state_ == State::inited);
+ xbt_assert(state_ == State::INITED);
if (src_buff_ != nullptr) { // Sender side
pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
}
- state_ = State::started;
+ state_ = State::STARTED;
return this;
}
Activity* Comm::wait(double timeout)
{
switch (state_) {
- case State::finished:
+ case State::FINISHED:
return this;
- case State::inited: // It's not started yet. Do it in one simcall
+ case State::INITED: // It's not started yet. Do it in one simcall
if (src_buff_ != nullptr) {
simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
copy_data_function_, user_data_, timeout);
simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
user_data_, timeout, rate_);
}
- state_ = State::finished;
+ state_ = State::FINISHED;
return this;
- case State::started:
+ case State::STARTED:
simcall_comm_wait(pimpl_, timeout);
- state_ = State::finished;
+ state_ = State::FINISHED;
return this;
default:
Activity* Comm::detach()
{
- xbt_assert(state_ == State::inited, "You cannot detach communications once they are started (not implemented).");
+ xbt_assert(state_ == State::INITED, "You cannot detach communications once they are started (not implemented).");
xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs");
detached_ = true;
return start();
bool Comm::test()
{
- xbt_assert(state_ == State::inited || state_ == State::started || state_ == State::finished);
+ xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
- if (state_ == State::finished)
+ if (state_ == State::FINISHED)
return true;
- if (state_ == State::inited)
+ if (state_ == State::INITED)
this->start();
if (simcall_comm_test(pimpl_)) {
- state_ = State::finished;
+ state_ = State::FINISHED;
return true;
}
return false;
{
pimpl_ = simcall_execution_start(nullptr, flops_amount_, 1. / priority_, 0., host_);
boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->set_bound(bound_);
- state_ = State::started;
+ state_ = State::STARTED;
return this;
}
Activity* Exec::wait()
{
simcall_execution_wait(pimpl_);
- state_ = State::finished;
+ state_ = State::FINISHED;
return this;
}
/** @brief Returns whether the state of the exec is finished */
bool Exec::test()
{
- xbt_assert(state_ == State::inited || state_ == State::started || state_ == State::finished);
+ xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
- if (state_ == State::finished)
+ if (state_ == State::FINISHED)
return true;
- if (state_ == State::inited)
+ if (state_ == State::INITED)
this->start();
if (simcall_execution_test(pimpl_)) {
- state_ = State::finished;
+ state_ = State::FINISHED;
return true;
}
* Currently, this cannot be changed once the exec started. */
ExecPtr Exec::set_priority(double priority)
{
- xbt_assert(state_ == State::inited, "Cannot change the priority of an exec after its start");
+ xbt_assert(state_ == State::INITED, "Cannot change the priority of an exec after its start");
priority_ = priority;
return this;
}
* Currently, this cannot be changed once the exec started. */
ExecPtr Exec::set_bound(double bound)
{
- xbt_assert(state_ == State::inited, "Cannot change the bound of an exec after its start");
+ xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start");
bound_ = bound;
return this;
}
* The activity cannot be terminated already (but it may be started). */
ExecPtr Exec::set_host(Host* host)
{
- xbt_assert(state_ == State::inited || state_ == State::started,
+ xbt_assert(state_ == State::INITED || state_ == State::STARTED,
"Cannot change the host of an exec once it's done (state: %d)", (int)state_);
- if (state_ == State::started)
+ if (state_ == State::STARTED)
boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->migrate(host);
host_ = host;
return this;
}
}
+/** @brief Stop the host if it is on */
void Host::turnOff()
{
if (isOn()) {
smx_actor_t self = SIMIX_process_self();
simgrid::simix::simcall([this, self] {
- SIMIX_host_off(this, self);
+ simgrid::simix::Host* host = this->extension<simgrid::simix::Host>();
+
+ xbt_assert((host != nullptr), "Invalid parameters");
+
+ this->pimpl_cpu->turn_off();
+
+ /* Clean Simulator data */
+ if (not host->process_list.empty()) {
+ for (auto& process : host->process_list) {
+ SIMIX_process_kill(&process, self);
+ XBT_DEBUG("Killing %s@%s on behalf of %s which turned off that host.", process.get_cname(),
+ process.host->get_cname(), self->get_cname());
+ }
+ }
+
on_state_change(*this);
});
}
[](void*, void* arg) {
auto sleep = static_cast<simgrid::kernel::activity::SleepImpl*>(arg);
if (sleep->surf_sleep)
- sleep->surf_sleep->finish(simgrid::kernel::resource::Action::State::done);
+ sleep->surf_sleep->finish(simgrid::kernel::resource::Action::State::FINISHED);
intrusive_ptr_release(sleep);
return 0;
},
}} // namespaces
-/** @brief Stop the host if it is on */
-void SIMIX_host_off(sg_host_t h, smx_actor_t issuer)
-{
- simgrid::simix::Host* host = h->extension<simgrid::simix::Host>();
-
- xbt_assert((host != nullptr), "Invalid parameters");
-
- if (h->isOn()) {
- h->pimpl_cpu->turn_off();
-
- /* Clean Simulator data */
- if (not host->process_list.empty()) {
- for (auto& process : host->process_list) {
- SIMIX_process_kill(&process, issuer);
- XBT_DEBUG("Killing %s@%s on behalf of %s which turned off that host.", process.get_cname(),
- process.host->get_cname(), issuer->get_cname());
- }
- }
- } else {
- XBT_INFO("Host %s is already off", h->get_cname());
- }
-}
-
/* needs to be public and without simcall for exceptions and logging events */
const char* sg_host_self_get_name()
{
receiver->get_cname(), comm->surfAction_);
/* If a link is failed, detect it immediately */
- if (comm->surfAction_->get_state() == simgrid::kernel::resource::Action::State::failed) {
+ if (comm->surfAction_->get_state() == simgrid::kernel::resource::Action::State::FAILED) {
XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", sender->get_cname(),
receiver->get_cname());
comm->state_ = SIMIX_LINK_FAILURE;
req->size(),
req->tag(),
simgrid::smpi::Datatype::encode(req->type())));
- if (not TRACE_smpi_view_internals() && req->flags() & SEND)
+ if (not TRACE_smpi_view_internals() && req->flags() & MPI_REQ_SEND)
TRACE_smpi_send(my_proc_id, my_proc_id, getPid(req->comm(), req->dst()), req->tag(), req->size());
req->start();
- if (not TRACE_smpi_view_internals() && req->flags() & RECV)
+ if (not TRACE_smpi_view_internals() && req->flags() & MPI_REQ_RECV)
TRACE_smpi_recv(getPid(req->comm(), req->src()), my_proc_id, req->tag());
retval = MPI_SUCCESS;
TRACE_smpi_comm_out(my_proc_id);
if (not TRACE_smpi_view_internals())
for (int i = 0; i < count; i++) {
req = requests[i];
- if (req->flags() & SEND)
+ if (req->flags() & MPI_REQ_SEND)
TRACE_smpi_send(my_proc_id, my_proc_id, getPid(req->comm(), req->dst()), req->tag(), req->size());
}
if (not TRACE_smpi_view_internals())
for (int i = 0; i < count; i++) {
req = requests[i];
- if (req->flags() & RECV)
+ if (req->flags() & MPI_REQ_RECV)
TRACE_smpi_recv(getPid(req->comm(), req->src()), my_proc_id, req->tag());
}
TRACE_smpi_comm_out(my_proc_id);
int src_traced = req->src();
// the src may not have been known at the beginning of the recv (MPI_ANY_SOURCE)
int dst_traced = req->dst();
- if (req->flags() & RECV) { // Is this request a wait for RECV?
+ if (req->flags() & MPI_REQ_RECV) { // Is this request a wait for RECV?
if (src_traced == MPI_ANY_SOURCE)
src_traced = (status != MPI_STATUSES_IGNORE) ? req->comm()->group()->rank(status->MPI_SOURCE) : req->src();
TRACE_smpi_recv(src_traced, dst_traced, req->tag());
#include <time.h>
#endif
-#define PERSISTENT 0x1
-#define NON_PERSISTENT 0x2
-#define SEND 0x4
-#define RECV 0x8
-#define RECV_DELETE 0x10
-#define ISEND 0x20
-#define SSEND 0x40
-#define PREPARED 0x80
-#define FINISHED 0x100
-#define RMA 0x200
-#define ACCUMULATE 0x400
+#define MPI_REQ_PERSISTENT 0x1
+#define MPI_REQ_NON_PERSISTENT 0x2
+#define MPI_REQ_SEND 0x4
+#define MPI_REQ_RECV 0x8
+//#define MPI_REQ_RECV_DELETE 0x10
+#define MPI_REQ_ISEND 0x20
+#define MPI_REQ_SSEND 0x40
+#define MPI_REQ_PREPARED 0x80
+#define MPI_REQ_FINISHED 0x100
+#define MPI_REQ_RMA 0x200
+#define MPI_REQ_ACCUMULATE 0x400
enum smpi_process_state { SMPI_UNINITIALIZED, SMPI_INITIALIZED, SMPI_FINALIZED };
// Must be taken before Request::wait() since the request may be set to
// MPI_REQUEST_NULL by Request::wait!
- bool is_wait_for_receive = (request->flags() & RECV);
+ bool is_wait_for_receive = (request->flags() & MPI_REQ_RECV);
// TODO: Here we take the rank while we normally take the process id (look for my_proc_id)
TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("wait"));
std::vector<MPI_Request> reqs;
req_storage.get_requests(reqs);
for (const auto& req : reqs) {
- if (req && (req->flags() & RECV)) {
+ if (req && (req->flags() & MPI_REQ_RECV)) {
sender_receiver.push_back({req->src(), req->dst()});
}
}
{
void *old_buf = nullptr;
// FIXME Handle the case of a partial shared malloc.
- if ((((flags & RECV) != 0) && ((flags & ACCUMULATE) != 0)) || (datatype->flags() & DT_FLAG_DERIVED)) {
+ if ((((flags & MPI_REQ_RECV) != 0) && ((flags & MPI_REQ_ACCUMULATE) != 0)) || (datatype->flags() & DT_FLAG_DERIVED)) {
// This part handles the problem of non-contiguous memory
old_buf = buf;
if (count==0){
buf_ = nullptr;
}else {
buf_ = xbt_malloc(count*datatype->size());
- if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & SEND) != 0)) {
+ if ((datatype->flags() & DT_FLAG_DERIVED) && ((flags & MPI_REQ_SEND) != 0)) {
datatype->serialize(old_buf, buf_, count);
}
}
truncated_ = 0;
real_size_ = 0;
real_tag_ = 0;
- if (flags & PERSISTENT)
+ if (flags & MPI_REQ_PERSISTENT)
refcount_ = 1;
else
refcount_ = 0;
{
return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, PERSISTENT | SEND | PREPARED);
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_PERSISTENT | MPI_REQ_SEND | MPI_REQ_PREPARED);
}
MPI_Request Request::ssend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, PERSISTENT | SSEND | SEND | PREPARED);
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
}
MPI_Request Request::isend_init(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, PERSISTENT | ISEND | SEND | PREPARED);
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
if(op==MPI_OP_NULL){
- request =
- new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED);
+ request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED);
}else{
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
comm->group()->actor(dst)->get_pid(), tag, comm,
- RMA | NON_PERSISTENT | ISEND | SEND | PREPARED | ACCUMULATE);
+ MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_PREPARED |
+ MPI_REQ_ACCUMULATE);
request->op_ = op;
}
return request;
{
return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(src)->get_pid(),
- simgrid::s4u::this_actor::get_pid(), tag, comm, PERSISTENT | RECV | PREPARED);
+ simgrid::s4u::this_actor::get_pid(), tag, comm,
+ MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
}
MPI_Request Request::rma_recv_init(void *buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm,
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
if(op==MPI_OP_NULL){
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, RMA | NON_PERSISTENT | RECV | PREPARED);
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
}else{
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, comm->group()->actor(src)->get_pid(),
comm->group()->actor(dst)->get_pid(), tag, comm,
- RMA | NON_PERSISTENT | RECV | PREPARED | ACCUMULATE);
+ MPI_REQ_RMA | MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED | MPI_REQ_ACCUMULATE);
request->op_ = op;
}
return request;
{
return new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(src)->get_pid(),
- simgrid::s4u::this_actor::get_pid(), tag, comm, PERSISTENT | RECV | PREPARED);
+ simgrid::s4u::this_actor::get_pid(), tag, comm,
+ MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PREPARED);
}
MPI_Request Request::isend(void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, NON_PERSISTENT | ISEND | SEND);
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
request->start();
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, NON_PERSISTENT | ISEND | SSEND | SEND);
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
request->start();
return request;
}
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype,
src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(src)->get_pid(),
- simgrid::s4u::this_actor::get_pid(), tag, comm, NON_PERSISTENT | RECV);
+ simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
request->start();
return request;
}
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, NON_PERSISTENT | SEND);
+ comm->group()->actor(dst)->get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
request->start();
wait(&request, MPI_STATUS_IGNORE);
{
MPI_Request request = nullptr; /* MC needs the comm to be set to nullptr during the call */
request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- comm->group()->actor(dst)->get_pid(), tag, comm, NON_PERSISTENT | SSEND | SEND);
+ comm->group()->actor(dst)->get_pid(), tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
request->start();
wait(&request,MPI_STATUS_IGNORE);
smx_mailbox_t mailbox;
xbt_assert(action_ == nullptr, "Cannot (re-)start unfinished communication");
- flags_ &= ~PREPARED;
- flags_ &= ~FINISHED;
+ flags_ &= ~MPI_REQ_PREPARED;
+ flags_ &= ~MPI_REQ_FINISHED;
refcount_++;
- if ((flags_ & RECV) != 0) {
+ if ((flags_ & MPI_REQ_RECV) != 0) {
this->print_request("New recv");
simgrid::smpi::Process* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
int async_small_thresh = simgrid::config::get_value<int>("smpi/async-small-thresh");
xbt_mutex_t mut = process->mailboxes_mutex();
- if (async_small_thresh != 0 || (flags_ & RMA) != 0)
+ if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
xbt_mutex_acquire(mut);
- if (async_small_thresh == 0 && (flags_ & RMA) == 0 ) {
+ if (async_small_thresh == 0 && (flags_ & MPI_REQ_RMA) == 0) {
mailbox = process->mailbox();
- }
- else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) {
+ } else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < async_small_thresh) {
//We have to check both mailboxes (because SSEND messages are sent to the large mbox).
//begin with the more appropriate one : the small one.
mailbox = process->mailbox_small();
process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0);
XBT_DEBUG("recv simcall posted");
- if (async_small_thresh != 0 || (flags_ & RMA) != 0 )
+ if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
xbt_mutex_release(mut);
} else { /* the RECV flag was not set, so this is a send */
simgrid::smpi::Process* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
this->print_request("New send");
void* buf = buf_;
- if ((flags_ & SSEND) == 0 &&
- ((flags_ & RMA) != 0 ||
+ if ((flags_ & MPI_REQ_SSEND) == 0 &&
+ ((flags_ & MPI_REQ_RMA) != 0 ||
static_cast<int>(size_) < simgrid::config::get_value<int>("smpi/send-is-detached-thresh"))) {
void *oldbuf = nullptr;
detached_ = 1;
//if we are giving back the control to the user without waiting for completion, we have to inject timings
double sleeptime = 0.0;
- if (detached_ != 0 || ((flags_ & (ISEND | SSEND)) != 0)) { // issend should be treated as isend
+ if (detached_ != 0 || ((flags_ & (MPI_REQ_ISEND | MPI_REQ_SSEND)) != 0)) { // issend should be treated as isend
// isend and send timings may be different
- sleeptime = ((flags_ & ISEND) != 0)
+ sleeptime = ((flags_ & MPI_REQ_ISEND) != 0)
? simgrid::s4u::Actor::self()->get_host()->extension<simgrid::smpi::Host>()->oisend(size_)
: simgrid::s4u::Actor::self()->get_host()->extension<simgrid::smpi::Host>()->osend(size_);
}
xbt_mutex_t mut=process->mailboxes_mutex();
- if (async_small_thresh != 0 || (flags_ & RMA) != 0)
+ if (async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)
xbt_mutex_acquire(mut);
- if (not(async_small_thresh != 0 || (flags_ & RMA) != 0)) {
+ if (not(async_small_thresh != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
mailbox = process->mailbox();
- } else if (((flags_ & RMA) != 0) || static_cast<int>(size_) < async_small_thresh) { // eager mode
+ } else if (((flags_ & MPI_REQ_RMA) != 0) || static_cast<int>(size_) < async_small_thresh) { // eager mode
mailbox = process->mailbox();
XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
smx_activity_t action = simcall_comm_iprobe(mailbox, 1, &match_send, static_cast<void*>(this));
if (action == nullptr) {
- if ((flags_ & SSEND) == 0){
+ if ((flags_ & MPI_REQ_SSEND) == 0) {
mailbox = process->mailbox_small();
XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox);
} else {
/* FIXME: detached sends are not traceable (action_ == nullptr) */
if (action_ != nullptr)
simcall_set_category(action_, TRACE_internal_smpi_get_category());
- if (async_small_thresh != 0 || ((flags_ & RMA)!=0))
+ if (async_small_thresh != 0 || ((flags_ & MPI_REQ_RMA) != 0))
xbt_mutex_release(mut);
}
}
Status::empty(status);
int flag = 1;
- if (((*request)->flags_ & PREPARED) == 0) {
+ if (((*request)->flags_ & MPI_REQ_PREPARED) == 0) {
if ((*request)->action_ != nullptr){
try{
flag = simcall_comm_test((*request)->action_);
if (flag) {
finish_wait(request,status);
nsleeps=1;//reset the number of sleeps we will do next time
- if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & PERSISTENT) == 0)
+ if (*request != MPI_REQUEST_NULL && ((*request)->flags_ & MPI_REQ_PERSISTENT) == 0)
*request = MPI_REQUEST_NULL;
} else if (simgrid::config::get_value<bool>("smpi/grow-injected-times")) {
nsleeps++;
count++;
if (status != MPI_STATUSES_IGNORE)
status[i] = *pstat;
- if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & NON_PERSISTENT))
+ if ((requests[i] != MPI_REQUEST_NULL) && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
requests[i] = MPI_REQUEST_NULL;
}
} else {
std::vector<int> map; /** Maps all matching comms back to their location in requests **/
for(i = 0; i < count; i++) {
- if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & PREPARED)) {
+ if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action_ && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
comms.push_back(requests[i]->action_);
map.push_back(i);
}
finish_wait(&requests[*index],status);
flag = 1;
nsleeps = 1;
- if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & NON_PERSISTENT)) {
+ if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags_ & MPI_REQ_NON_PERSISTENT)) {
requests[*index] = MPI_REQUEST_NULL;
}
} else {
MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
int flag=1;
for(int i=0; i<count; i++){
- if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED)) {
+ if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED)) {
if (test(&requests[i], pstat)!=1){
flag=0;
}else{
double maxrate = simgrid::config::get_value<double>("smpi/iprobe-cpu-usage");
MPI_Request request = new Request(nullptr, 0, MPI_CHAR,
source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source)->get_pid(),
- simgrid::s4u::this_actor::get_pid(), tag, comm, PERSISTENT | RECV);
+ simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV);
if (smpi_iprobe_sleep > 0) {
smx_activity_t iprobe_sleep = simcall_execution_start(
"iprobe", /* flops to executek*/ nsleeps * smpi_iprobe_sleep * speed * maxrate, /* priority */ 1.0,
boost::static_pointer_cast<simgrid::kernel::activity::CommImpl>(request->action_);
MPI_Request req = static_cast<MPI_Request>(sync_comm->src_data);
*flag = 1;
- if(status != MPI_STATUS_IGNORE && (req->flags_ & PREPARED) == 0) {
+ if (status != MPI_STATUS_IGNORE && (req->flags_ & MPI_REQ_PREPARED) == 0) {
status->MPI_SOURCE = comm->group()->rank(req->src_);
status->MPI_TAG = req->tag_;
status->MPI_ERROR = MPI_SUCCESS;
return;
}
- if (not((req->detached_ != 0) && ((req->flags_ & SEND) != 0)) && ((req->flags_ & PREPARED) == 0)) {
+ if (not((req->detached_ != 0) && ((req->flags_ & MPI_REQ_SEND) != 0)) && ((req->flags_ & MPI_REQ_PREPARED) == 0)) {
if(status != MPI_STATUS_IGNORE) {
int src = req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_;
status->MPI_SOURCE = req->comm_->group()->rank(src);
MPI_Datatype datatype = req->old_type_;
// FIXME Handle the case of a partial shared malloc.
- if (((req->flags_ & ACCUMULATE) != 0) ||
+ if (((req->flags_ & MPI_REQ_ACCUMULATE) != 0) ||
(datatype->flags() & DT_FLAG_DERIVED)) { // && (not smpi_is_shared(req->old_buf_))){
if (not smpi_process()->replaying() && smpi_privatize_global_variables != SmpiPrivStrategies::None &&
if(datatype->flags() & DT_FLAG_DERIVED){
// This part handles the problem of non-contignous memory the unserialization at the reception
- if((req->flags_ & RECV) && datatype->size()!=0)
+ if ((req->flags_ & MPI_REQ_RECV) && datatype->size() != 0)
datatype->unserialize(req->buf_, req->old_buf_, req->real_size_/datatype->size() , req->op_);
xbt_free(req->buf_);
- }else if(req->flags_ & RECV){//apply op on contiguous buffer for accumulate
- if(datatype->size()!=0){
- int n =req->real_size_/datatype->size();
- req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
- }
- xbt_free(req->buf_);
+ } else if (req->flags_ & MPI_REQ_RECV) { // apply op on contiguous buffer for accumulate
+ if (datatype->size() != 0) {
+ int n = req->real_size_ / datatype->size();
+ req->op_->apply(req->buf_, req->old_buf_, &n, datatype);
+ }
+ xbt_free(req->buf_);
}
}
}
- if (TRACE_smpi_view_internals() && ((req->flags_ & RECV) != 0)){
+ if (TRACE_smpi_view_internals() && ((req->flags_ & MPI_REQ_RECV) != 0)) {
int rank = simgrid::s4u::this_actor::get_pid();
int src_traced = (req->src_ == MPI_ANY_SOURCE ? req->real_src_ : req->src_);
TRACE_smpi_recv(src_traced, rank,req->tag_);
}
unref(&(req->detached_sender_));
}
- if(req->flags_ & PERSISTENT)
+ if (req->flags_ & MPI_REQ_PERSISTENT)
req->action_ = nullptr;
- req->flags_ |= FINISHED;
+ req->flags_ |= MPI_REQ_FINISHED;
unref(request);
}
void Request::wait(MPI_Request * request, MPI_Status * status)
{
(*request)->print_request("Waiting");
- if ((*request)->flags_ & PREPARED) {
+ if ((*request)->flags_ & MPI_REQ_PREPARED) {
Status::empty(status);
return;
}
finish_wait(request,status);
- if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & NON_PERSISTENT)!=0))
+ if (*request != MPI_REQUEST_NULL && (((*request)->flags_ & MPI_REQ_NON_PERSISTENT) != 0))
*request = MPI_REQUEST_NULL;
}
int *map = xbt_new(int, count);
XBT_DEBUG("Wait for one of %d", count);
for(int i = 0; i < count; i++) {
- if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & PREPARED) &&
- not(requests[i]->flags_ & FINISHED)) {
+ if (requests[i] != MPI_REQUEST_NULL && not(requests[i]->flags_ & MPI_REQ_PREPARED) &&
+ not(requests[i]->flags_ & MPI_REQ_FINISHED)) {
if (requests[i]->action_ != nullptr) {
XBT_DEBUG("Waiting any %p ", requests[i]);
intrusive_ptr_add_ref(requests[i]->action_.get());
size = 0; // so we free the dynar but don't do the waitany call
index = i;
finish_wait(&requests[i], status); // cleanup if refcount = 0
- if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT))
+ if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
requests[i] = MPI_REQUEST_NULL; // set to null
break;
}
index = map[i];
//in case of an accumulate, we have to wait the end of all requests to apply the operation, ordered correctly.
if ((requests[index] == MPI_REQUEST_NULL) ||
- (not((requests[index]->flags_ & ACCUMULATE) && (requests[index]->flags_ & RECV)))) {
+ (not((requests[index]->flags_ & MPI_REQ_ACCUMULATE) && (requests[index]->flags_ & MPI_REQ_RECV)))) {
finish_wait(&requests[index],status);
- if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & NON_PERSISTENT))
+ if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags_ & MPI_REQ_NON_PERSISTENT))
requests[index] = MPI_REQUEST_NULL;
}
}
//tag invalid requests in the set
if (status != MPI_STATUSES_IGNORE) {
for (int c = 0; c < count; c++) {
- if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst_ == MPI_PROC_NULL || (requests[c]->flags_ & PREPARED)) {
+ if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst_ == MPI_PROC_NULL ||
+ (requests[c]->flags_ & MPI_REQ_PREPARED)) {
Status::empty(&status[c]);
} else if (requests[c]->src_ == MPI_PROC_NULL) {
Status::empty(&status[c]);
if (index == MPI_UNDEFINED)
break;
- if (requests[index] != MPI_REQUEST_NULL
- && (requests[index]->flags_ & RECV)
- && (requests[index]->flags_ & ACCUMULATE))
+ if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_RECV) &&
+ (requests[index]->flags_ & MPI_REQ_ACCUMULATE))
accumulates.push_back(requests[index]);
- if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & NON_PERSISTENT))
+ if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_NON_PERSISTENT))
requests[index] = MPI_REQUEST_NULL;
}
if (status != MPI_STATUSES_IGNORE) {
if(status != MPI_STATUSES_IGNORE) {
status[index] = *pstat;
}
- if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & NON_PERSISTENT))
- requests[index] = MPI_REQUEST_NULL;
+ if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags_ & MPI_REQ_NON_PERSISTENT))
+ requests[index] = MPI_REQUEST_NULL;
}else{
return MPI_UNDEFINED;
}
while ((var = cnst->get_variable(&elem))) {
kernel::resource::Action* action = static_cast<kernel::resource::Action*>(var->get_id());
- if (action->get_state() == kernel::resource::Action::State::running ||
- action->get_state() == kernel::resource::Action::State::ready ||
- action->get_state() == kernel::resource::Action::State::not_in_the_system) {
+ if (action->get_state() == kernel::resource::Action::State::INITED ||
+ action->get_state() == kernel::resource::Action::State::STARTED ||
+ action->get_state() == kernel::resource::Action::State::IGNORED) {
action->set_finish_time(date);
- action->set_state(kernel::resource::Action::State::failed);
+ action->set_state(kernel::resource::Action::State::FAILED);
}
}
}
// FIXME: sleep variables should not consume 1.0 in System::expand()
action->set_max_duration(duration);
action->suspended_ = kernel::resource::Action::SuspendStates::sleeping;
- if (duration < 0) { // NO_MAX_DURATION
- /* Move to the *end* of the corresponding action set. This convention is used to speed up update_resource_state */
- simgrid::xbt::intrusive_erase(*action->get_state_set(), *action);
- action->state_set_ = &static_cast<CpuCas01Model*>(get_model())->cpuRunningActionSetThatDoesNotNeedBeingChecked_;
- action->get_state_set()->push_back(*action);
- }
+ if (duration < 0) // NO_MAX_DURATION
+ action->set_state(simgrid::kernel::resource::Action::State::IGNORED);
get_model()->get_maxmin_system()->update_variable_weight(action->get_variable(), 0.0);
if (get_model()->get_update_algorithm() == kernel::resource::Model::UpdateAlgo::Lazy) { // remove action from the heap
~CpuCas01Model() override;
Cpu *createCpu(simgrid::s4u::Host *host, std::vector<double> *speedPerPstate, int core) override;
- kernel::resource::Action::StateSet cpuRunningActionSetThatDoesNotNeedBeingChecked_;
};
/************
CpuAction* action = static_cast<CpuAction*>(get_action_heap().pop());
XBT_CDEBUG(surf_kernel, "Something happened to action %p", action);
- action->finish(kernel::resource::Action::State::done);
+ action->finish(kernel::resource::Action::State::FINISHED);
XBT_CDEBUG(surf_kernel, "Action %p finished", action);
}
if (TRACE_is_enabled()) {
//defining the last timestamp that we can safely dump to trace file
//without losing the event ascending order (considering all CPU's)
double smaller = -1;
- for (kernel::resource::Action const& action : *get_running_action_set()) {
+ for (kernel::resource::Action const& action : *get_started_action_set()) {
if (smaller < 0 || action.get_last_update() < smaller)
smaller = action.get_last_update();
}
void CpuModel::update_actions_state_full(double now, double delta)
{
- for (auto it = std::begin(*get_running_action_set()); it != std::end(*get_running_action_set());) {
+ for (auto it = std::begin(*get_started_action_set()); it != std::end(*get_started_action_set());) {
CpuAction& action = static_cast<CpuAction&>(*it);
++it; // increment iterator here since the following calls to action.finish() may invalidate it
if (((action.get_remains_no_update() <= 0) && (action.get_variable()->get_weight() > 0)) ||
((action.get_max_duration() != NO_MAX_DURATION) && (action.get_max_duration() <= 0))) {
- action.finish(kernel::resource::Action::State::done);
+ action.finish(kernel::resource::Action::State::FINISHED);
}
}
}
void CpuAction::update_remains_lazy(double now)
{
- xbt_assert(get_state_set() == get_model()->get_running_action_set(),
+ xbt_assert(get_state_set() == get_model()->get_started_action_set(),
"You're updating an action that is not running.");
xbt_assert(get_priority() > 0, "You're updating an action that seems suspended.");
while (not get_action_heap().empty() && double_equals(get_action_heap().top_date(), now, sg_surf_precision)) {
CpuTiAction* action = static_cast<CpuTiAction*>(get_action_heap().pop());
XBT_DEBUG("Action %p: finish", action);
- action->finish(kernel::resource::Action::State::done);
+ action->finish(kernel::resource::Action::State::FINISHED);
/* update remaining amount of all actions */
action->cpu_->update_remaining_amount(surf_get_clock());
}
/* put all action running on cpu to failed */
for (CpuTiAction& action : action_set_) {
- if (action.get_state() == kernel::resource::Action::State::running ||
- action.get_state() == kernel::resource::Action::State::ready ||
- action.get_state() == kernel::resource::Action::State::not_in_the_system) {
+ if (action.get_state() == kernel::resource::Action::State::INITED ||
+ action.get_state() == kernel::resource::Action::State::STARTED ||
+ action.get_state() == kernel::resource::Action::State::IGNORED) {
action.set_finish_time(date);
- action.set_state(kernel::resource::Action::State::failed);
+ action.set_state(kernel::resource::Action::State::FAILED);
get_model()->get_action_heap().remove(&action);
}
}
sum_priority_ = 0.0;
for (CpuTiAction const& action : action_set_) {
/* action not running, skip it */
- if (action.get_state_set() != surf_cpu_model_pm->get_running_action_set())
+ if (action.get_state_set() != surf_cpu_model_pm->get_started_action_set())
continue;
/* bogus priority, skip it */
for (CpuTiAction& action : action_set_) {
double min_finish = -1;
/* action not running, skip it */
- if (action.get_state_set() != surf_cpu_model_pm->get_running_action_set())
+ if (action.get_state_set() != surf_cpu_model_pm->get_started_action_set())
continue;
/* verify if the action is really running on cpu */
XBT_DEBUG("Flops total: %f, Last update %f", area_total, last_update_);
for (CpuTiAction& action : action_set_) {
/* action not running, skip it */
- if (action.get_state_set() != get_model()->get_running_action_set())
+ if (action.get_state_set() != get_model()->get_started_action_set())
continue;
/* bogus priority, skip it */
action->set_max_duration(duration);
action->suspended_ = kernel::resource::Action::SuspendStates::sleeping;
- if (duration == NO_MAX_DURATION) {
- /* Move to the *end* of the corresponding action set. This convention is used to speed up update_resource_state */
- simgrid::xbt::intrusive_erase(*action->get_state_set(), *action);
- action->state_set_ = &static_cast<CpuTiModel*>(get_model())->runningActionSetThatDoesNotNeedBeingChecked_;
- action->get_state_set()->push_back(*action);
- }
+ if (duration < 0) // NO_MAX_DURATION
+ action->set_state(simgrid::kernel::resource::Action::State::IGNORED);
action_set_.push_back(*action);
void CpuTiAction::cancel()
{
- this->set_state(Action::State::failed);
+ this->set_state(Action::State::FAILED);
get_model()->get_action_heap().remove(this);
cpu_->set_modified(true);
}
double next_occuring_event(double now) override;
void update_actions_state(double now, double delta) override;
- kernel::resource::Action::StateSet runningActionSetThatDoesNotNeedBeingChecked_;
CpuTiList modified_cpus_;
};
// no need to communicate anymore
// assume that flows that reached max_duration have remaining of 0
XBT_DEBUG("Action %p finished", action);
- action->finish(Action::State::done);
+ action->finish(Action::State::FINISHED);
get_action_heap().remove(action);
}
}
void NetworkCm02Model::update_actions_state_full(double now, double delta)
{
- for (auto it = std::begin(*get_running_action_set()); it != std::end(*get_running_action_set());) {
+ for (auto it = std::begin(*get_started_action_set()); it != std::end(*get_started_action_set());) {
NetworkCm02Action& action = static_cast<NetworkCm02Action&>(*it);
++it; // increment iterator here since the following calls to action.finish() may invalidate it
XBT_DEBUG("Something happened to action %p", &action);
if (((action.get_remains() <= 0) && (action.get_variable()->get_weight() > 0)) ||
((action.get_max_duration() > NO_MAX_DURATION) && (action.get_max_duration() <= 0))) {
- action.finish(Action::State::done);
+ action.finish(Action::State::FINISHED);
}
}
}
while ((var = get_constraint()->get_variable(&elem))) {
Action* action = static_cast<Action*>(var->get_id());
- if (action->get_state() == Action::State::running || action->get_state() == Action::State::ready) {
+ if (action->get_state() == Action::State::INITED || action->get_state() == Action::State::STARTED) {
action->set_finish_time(now);
- action->set_state(Action::State::failed);
+ action->set_state(Action::State::FAILED);
}
}
}
if ((get_remains_no_update() <= 0 && (get_variable()->get_weight() > 0)) ||
((max_duration > NO_MAX_DURATION) && (max_duration <= 0))) {
- finish(Action::State::done);
+ finish(Action::State::FINISHED);
get_model()->get_action_heap().remove(this);
}
double NetworkConstantModel::next_occuring_event(double /*now*/)
{
double min = -1.0;
- for (kernel::resource::Action const& action : *get_running_action_set()) {
+ for (kernel::resource::Action const& action : *get_started_action_set()) {
const NetworkConstantAction& net_action = static_cast<const NetworkConstantAction&>(action);
if (net_action.latency_ > 0 && (min < 0 || net_action.latency_ < min))
min = net_action.latency_;
void NetworkConstantModel::update_actions_state(double /*now*/, double delta)
{
- for (auto it = std::begin(*get_running_action_set()); it != std::end(*get_running_action_set());) {
+ for (auto it = std::begin(*get_started_action_set()); it != std::end(*get_started_action_set());) {
NetworkConstantAction& action = static_cast<NetworkConstantAction&>(*it);
++it; // increment iterator here since the following calls to action.finish() may invalidate it
if (action.latency_ > 0) {
if ((action.get_remains_no_update() <= 0) ||
((action.get_max_duration() != NO_MAX_DURATION) && (action.get_max_duration() <= 0))) {
- action.finish(kernel::resource::Action::State::done);
+ action.finish(kernel::resource::Action::State::FINISHED);
}
}
}
{
latency_ = latency;
if (latency_ <= 0.0)
- NetworkConstantAction::set_state(Action::State::done);
+ NetworkConstantAction::set_state(Action::State::FINISHED);
};
NetworkConstantAction::~NetworkConstantAction() = default;
using simgrid::kernel::resource::IBNode;
using simgrid::kernel::resource::NetworkIBModel;
- if (action->get_state() != simgrid::kernel::resource::Action::State::done)
+ if (action->get_state() != simgrid::kernel::resource::Action::State::FINISHED)
return;
std::pair<IBNode*,IBNode*> pair = ((NetworkIBModel*)surf_network_model)->active_comms[action];
XBT_DEBUG("IB callback - action %p finished", action);
{
double minRes = Model::next_occuring_event_full(now);
- for (Action const& action : *get_running_action_set()) {
+ for (Action const& action : *get_started_action_set()) {
const NetworkAction& net_action = static_cast<const NetworkAction&>(action);
if (net_action.latency_ > 0)
minRes = (minRes < 0) ? net_action.latency_ : std::min(minRes, net_action.latency_);
XBT_DEBUG("ns3_next_occuring_event");
//get the first relevant value from the running_actions list
- if (not get_running_action_set()->size() || now == 0.0)
+ if (not get_started_action_set()->size() || now == 0.0)
return -1.0;
else
do {
static std::vector<std::string> socket_to_destroy;
/* If there are no running flows, advance the NS3 simulator and return */
- if (get_running_action_set()->empty()) {
+ if (get_started_action_set()->empty()) {
while(double_positive(now - ns3::Simulator::Now().GetSeconds(), sg_surf_precision))
ns3_simulator(now-ns3::Simulator::Now().GetSeconds());
XBT_DEBUG("Processing socket %p (action %p)",sgFlow,action);
action->set_remains(action->get_cost() - sgFlow->sentBytes_);
- if (TRACE_is_enabled() && action->get_state() == kernel::resource::Action::State::running) {
+ if (TRACE_is_enabled() && action->get_state() == kernel::resource::Action::State::STARTED) {
double data_delta_sent = sgFlow->sentBytes_ - action->lastSent_;
std::vector<LinkImpl*> route = std::vector<LinkImpl*>();
if(sgFlow->finished_){
socket_to_destroy.push_back(ns3Socket);
XBT_DEBUG("Destroy socket %p of action %p", ns3Socket.c_str(), action);
- action->finish(kernel::resource::Action::State::done);
+ action->finish(kernel::resource::Action::State::FINISHED);
} else {
XBT_DEBUG("Socket %p sent %u bytes out of %u (%u remaining)", ns3Socket.c_str(), sgFlow->sentBytes_,
sgFlow->totalBytes_, sgFlow->remaining_);
double HostL07Model::next_occuring_event(double now)
{
double min = HostModel::next_occuring_event_full(now);
- for (kernel::resource::Action const& action : *get_running_action_set()) {
+ for (kernel::resource::Action const& action : *get_started_action_set()) {
const L07Action& net_action = static_cast<const L07Action&>(action);
if (net_action.latency_ > 0 && (min < 0 || net_action.latency_ < min)) {
min = net_action.latency_;
void HostL07Model::update_actions_state(double /*now*/, double delta)
{
- for (auto it = std::begin(*get_running_action_set()); it != std::end(*get_running_action_set());) {
+ for (auto it = std::begin(*get_started_action_set()); it != std::end(*get_started_action_set());) {
L07Action& action = static_cast<L07Action&>(*it);
++it; // increment iterator here since the following calls to action.finish() may invalidate it
if (action.latency_ > 0) {
if (((action.get_remains() <= 0) && (action.get_variable()->get_weight() > 0)) ||
((action.get_max_duration() > NO_MAX_DURATION) && (action.get_max_duration() <= 0))) {
- action.finish(kernel::resource::Action::State::done);
+ action.finish(kernel::resource::Action::State::FINISHED);
} else {
/* Need to check that none of the model has failed */
int i = 0;
void* constraint_id = cnst->get_id();
if (static_cast<simgrid::kernel::resource::Resource*>(constraint_id)->is_off()) {
XBT_DEBUG("Action (%p) Failed!!", &action);
- action.finish(kernel::resource::Action::State::failed);
+ action.finish(kernel::resource::Action::State::FAILED);
break;
}
cnst = action.get_variable()->get_constraint(i);
void StorageN11Model::update_actions_state(double /*now*/, double delta)
{
- for (auto it = std::begin(*get_running_action_set()); it != std::end(*get_running_action_set());) {
+ for (auto it = std::begin(*get_started_action_set()); it != std::end(*get_started_action_set());) {
StorageAction& action = static_cast<StorageAction&>(*it);
++it; // increment iterator here since the following calls to action.finish() may invalidate it
action.update_remains(lrint(action.get_variable()->get_value() * delta));
if (((action.get_remains_no_update() <= 0) && (action.get_variable()->get_weight() > 0)) ||
((action.get_max_duration() > NO_MAX_DURATION) && (action.get_max_duration() <= 0))) {
- action.finish(kernel::resource::Action::State::done);
+ action.finish(kernel::resource::Action::State::FINISHED);
}
}
}
void StorageN11Action::cancel()
{
- set_state(Action::State::failed);
+ set_state(Action::State::FAILED);
}
void StorageN11Action::suspend()
simgrid::kernel::resource::Action* surf_model_extract_done_action_set(simgrid::kernel::resource::Model* model)
{
- return ActionListExtract(model->get_done_action_set());
+ return ActionListExtract(model->get_finished_action_set());
}
simgrid::kernel::resource::Action* surf_model_extract_failed_action_set(simgrid::kernel::resource::Model* model)
int surf_model_running_action_set_size(simgrid::kernel::resource::Model* model)
{
- return model->get_running_action_set()->size();
+ return model->get_started_action_set()->size();
}
void surf_cpu_action_set_bound(simgrid::kernel::resource::Action* action, double bound)
static const char* string_action(simgrid::kernel::resource::Action::State state)
{
switch (state) {
- case simgrid::kernel::resource::Action::State::ready:
- return "SURF_ACTION_READY";
- case simgrid::kernel::resource::Action::State::running:
+ case simgrid::kernel::resource::Action::State::INITED:
+ return "SURF_ACTION_INITED";
+ case simgrid::kernel::resource::Action::State::STARTED:
return "SURF_ACTION_RUNNING";
- case simgrid::kernel::resource::Action::State::failed:
+ case simgrid::kernel::resource::Action::State::FAILED:
return "SURF_ACTION_FAILED";
- case simgrid::kernel::resource::Action::State::done:
+ case simgrid::kernel::resource::Action::State::FINISHED:
return "SURF_ACTION_DONE";
- case simgrid::kernel::resource::Action::State::not_in_the_system:
- return "SURF_ACTION_NOT_IN_THE_SYSTEM";
+ case simgrid::kernel::resource::Action::State::IGNORED:
+ return "SURF_ACTION_IGNORED";
default:
return "INVALID STATE";
}
action.unref();
}
- action_list = surf_cpu_model_pm->get_done_action_set();
+ action_list = surf_cpu_model_pm->get_finished_action_set();
while (not action_list->empty()) {
simgrid::kernel::resource::Action& action = action_list->front();
XBT_INFO(" CPU Done action");
action.unref();
}
- action_list = surf_network_model->get_done_action_set();
+ action_list = surf_network_model->get_finished_action_set();
while (not action_list->empty()) {
simgrid::kernel::resource::Action& action = action_list->front();
XBT_INFO(" Network Done action");
}
} while (
- (surf_network_model->get_running_action_set()->size() || surf_cpu_model_pm->get_running_action_set()->size()) &&
+ (surf_network_model->get_started_action_set()->size() || surf_cpu_model_pm->get_started_action_set()->size()) &&
surf_solve(-1.0) >= 0.0);
XBT_DEBUG("Simulation Terminated");