Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add the affinity database of each task
authorTakahiro Hirofuchi <t.hirofuchi+sg@aist.go.jp>
Mon, 28 Oct 2013 16:39:22 +0000 (17:39 +0100)
committerTakahiro Hirofuchi <t.hirofuchi+sg@aist.go.jp>
Mon, 28 Oct 2013 16:39:22 +0000 (17:39 +0100)
The SURF layer has affinity information only related to the current host
of a task. Affinity information on other hosts (i.e., the locations
where the task may move to) is maintained in the MSG layer.

src/msg/msg_gos.c
src/msg/msg_private.h
src/msg/msg_task.c
src/simix/smx_host.c
src/surf/cpu_cas01.c

index 8042c21..9c10e7c 100644 (file)
@@ -82,12 +82,15 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task)
                                                        1.0, -1.0);
       XBT_DEBUG("Parallel execution action created: %p", simdata->compute);
     } else {
+      unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(simdata->affinity_mask_db, (char *) p_simdata->m_host, sizeof(msg_host_t));
+      XBT_INFO("execute %s@%s with affinity(0x%04lx)", MSG_task_get_name(task), MSG_host_get_name(p_simdata->m_host), affinity_mask);
+
       simdata->compute = simcall_host_execute(task->name,
                                               p_simdata->m_host,
                                               simdata->computation_amount,
                                               simdata->priority,
                                               simdata->bound,
-                                              simdata->affinity_mask
+                                              affinity_mask
                                               );
 
     }
index 611a026..c2c2e07 100644 (file)
@@ -33,7 +33,10 @@ typedef struct simdata_task {
   double priority;
   double bound; /* Capping for CPU resource */
   double rate;  /* Capping for network resource */
-  unsigned long affinity_mask; /* CPU affinity */
+
+  /* CPU affinity database of this task */
+  xbt_dict_t affinity_mask_db; /* smx_host_t host => unsigned long mask */
+
   int isused;  /* Indicates whether the task is used in SIMIX currently */
   int host_nb;                  /* ==0 if sequential task; parallel task if not */
   /*******  Parallel Tasks Only !!!! *******/
index f41ad31..d751912 100644 (file)
@@ -5,6 +5,7 @@
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include "msg_private.h"
+#include "simix/smx_private.h"
 #include "xbt/sysdep.h"
 #include "xbt/log.h"
 
@@ -63,7 +64,7 @@ msg_task_t MSG_task_create(const char *name, double compute_duration,
   simdata->source = NULL;
   simdata->priority = 1.0;
   simdata->bound = 0;
-  simdata->affinity_mask = 0;
+  simdata->affinity_mask_db = xbt_dict_new_homogeneous(NULL);
   simdata->rate = -1.0;
   simdata->isused = 0;
 
@@ -285,6 +286,8 @@ msg_error_t MSG_task_destroy(msg_task_t task)
   /* parallel tasks only */
   xbt_free(task->simdata->host_list);
 
+  xbt_dict_free(&task->simdata->affinity_mask_db);
+
   /* free main structures */
   xbt_free(task->simdata);
   xbt_free(task);
@@ -480,7 +483,39 @@ void MSG_task_set_affinity(msg_task_t task, msg_host_t host, unsigned long mask)
   xbt_assert(task, "Invalid parameter");
   xbt_assert(task->simdata, "Invalid parameter");
 
-  task->simdata->affinity_mask = mask;
-  if (task->simdata->compute)
+  if (mask == 0) {
+    /* 0 means clear */
+    xbt_dict_remove_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(host));
+  } else
+    xbt_dict_set_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(host), (void *) mask, NULL);
+
+  /* We set affinity data of this task. If the task is being executed, we
+   * actually change the affinity setting of the task. Otherwise, this change
+   * will be applied when the task is executed. */
+
+  if (!task->simdata->compute) {
+    /* task is not yet executed */
+    XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(host), MSG_task_get_name(task));
+    return;
+  }
+
+  {
+    smx_action_t compute = task->simdata->compute;
+    msg_host_t host_now = compute->execution.host;  // simix_private.h is necessary
+    if (host_now != host) {
+      /* task is not yet executed on this host */
+      XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(host), MSG_task_get_name(task));
+      return;
+    }
+
+    /* task is being executed on this host. so change the affinity now */
+    {
+      /* check it works. remove me if it works. */
+      unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(msg_host_t));
+      xbt_assert(affinity_mask == mask);
+    }
+
+    XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(host), MSG_task_get_name(task));
     simcall_host_execution_set_affinity(task->simdata->compute, host, mask);
+  }
 }
index ddbe1b4..3c32722 100644 (file)
@@ -432,8 +432,11 @@ smx_action_t SIMIX_host_execute(const char *name,
     else
       ws_model->set_bound(action->execution.surf_exec, bound);
 
-    if (affinity_mask != 0)
+    if (affinity_mask != 0) {
+      /* just a double check to confirm that this host is the host where this task is running. */
+      xbt_assert(action->execution.host == host);
       ws_model->set_affinity(action->execution.surf_exec, host, affinity_mask);
+    }
   }
 
   XBT_DEBUG("Create execute action %p", action);
@@ -587,8 +590,13 @@ void SIMIX_pre_host_execution_set_affinity(smx_simcall_t simcall,
 void SIMIX_host_execution_set_affinity(smx_action_t action, smx_host_t host, unsigned long mask){
   surf_model_t ws_model = get_ws_model_from_action(action);
 
-  if(action->execution.surf_exec)
+  xbt_assert(action->type == SIMIX_ACTION_EXECUTE);
+
+  if (action->execution.surf_exec) {
+    /* just a double check to confirm that this host is the host where this task is running. */
+    xbt_assert(action->execution.host == host);
     ws_model->set_affinity(action->execution.surf_exec, host, mask);
+  }
 }
 
 void SIMIX_pre_host_execution_wait(smx_simcall_t simcall, smx_action_t action){
index 4b9d9cc..340fae2 100644 (file)
@@ -272,6 +272,9 @@ static void cpu_update_resource_state(void *id,
  * How should the system formulate constraint problems for an affinity to
  * multiple cores?
  *
+ * The cpu argument must be the host where the task is being executed. The
+ * action object does not have the information about the location where the
+ * action is being executed.
  */
 static void cpu_action_set_affinity(surf_action_t action, void *cpu, unsigned long mask)
 {
@@ -310,6 +313,15 @@ static void cpu_action_set_affinity(surf_action_t action, void *cpu, unsigned lo
 
     unsigned long has_affinity = (1UL << i) & mask;
     if (has_affinity) {
+      /* This function only accepts an affinity setting on the host where the
+       * task is now running. In future, a task might move to another host.
+       * But, at this moment, this function cannot take an affinity setting on
+       * that future host.
+       *
+       * It might be possible to extend the code to allow this function to
+       * accept affinity settings on a future host. We might be able to assign
+       * zero to elem->value to maintain such inactive affinity settings in the
+       * system. But, this will make the system complex. */
       XBT_INFO("set affinity %p to cpu-%lu@%s", action, i, CPU->generic_resource.name);
       lmm_expand(cpu_model->model_private->maxmin_system, CPU->constraint_core[i], var_obj, 1.0);
     }