1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
7 This example implements the following scenario:
8 - Multiple workers consume jobs (Job) from a shared mailbox (worker)
9 - A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
10 - The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
11 - The client then displays the status of individual jobs.
15 from argparse import ArgumentParser
16 from dataclasses import dataclass
17 from typing import List
18 from uuid import uuid4
21 from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
# Simulated payload sizes, in bytes. They are passed as the size argument of
# the mailbox sends: the job size when the client posts a job to the "worker"
# mailbox, the result size when a worker posts its answer to the job's result
# mailbox.
SIMULATED_JOB_SIZE_BYTES = 1024
SIMULATED_RESULT_SIZE_BYTES = 1024 * 1024
def parse_requests(requests_str: str) -> List[float]:
    """Convert a comma-separated string of numbers into a list of floats.

    Every comma-delimited token is stripped of surrounding whitespace before
    being handed to ``float``; a token that ``float`` rejects (an empty one
    included) makes the call raise ``ValueError``.
    """
    tokens = (token.strip() for token in requests_str.split(","))
    return list(map(float, tokens))
32 def create_parser() -> ArgumentParser:
33 parser = ArgumentParser()
38 help='path to the platform description'
44 help="number of worker actors to start"
50 help="duration of individual jobs sent to the workers by the client"
56 help="number of seconds before the client gives up waiting for results and aborts the simulation"
65 result_mailbox: Mailbox
def worker(worker_id: str):
    """Serve jobs taken from the shared "worker" mailbox, one after another.

    Each iteration blocks on the mailbox for the next ``Job``, sleeps for
    ``job.duration`` to simulate the processing time, then posts this worker's
    id to the job's own result mailbox.

    Args:
        worker_id: Label used in the log messages and sent back as the result
            payload.
    """
    this_actor.info(f"{worker_id} started")
    mailbox: Mailbox = Mailbox.by_name("worker")
    # The client may dispatch more jobs than there are workers, so a worker must
    # keep consuming. No exit condition is needed: workers are launched with
    # .daemonize(), and (per SimGrid's daemon semantics) a daemon actor does not
    # keep the simulation alive once the regular actors are done.
    while True:
        job: Job = mailbox.get()
        this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)")
        this_actor.sleep_for(job.duration)
        job.result_mailbox.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
82 async_data: PyGetAsync
def complete(self) -> bool:
    """Return the outcome of ``self.result_comm.test()``.

    NOTE(review): presumably truthy once the result communication has
    finished -- confirm against the SimGrid ``Comm.test`` documentation.
    """
    finished = self.result_comm.test()
    return finished
def status(self) -> str:
    """Return ``"complete"`` when ``self.complete`` is truthy, else ``"pending"``."""
    if self.complete:
        return "complete"
    return "pending"
def client(client_id: str, jobs: List[float], wait_timeout: float):
    """Dispatch one job per entry of *jobs*, then wait (bounded) for the results.

    For every job a dedicated result mailbox with a random (uuid4) name is
    looked up, the job is posted to the shared "worker" mailbox, and an
    asynchronous receive is opened on the result mailbox. After all jobs are
    dispatched the client waits on all result communications through
    ``Comm.wait_all_for`` and logs one status line per job.

    Args:
        client_id: Label used in the start-up log message.
        jobs: Simulated duration of each job; one job is sent per entry.
        wait_timeout: Timeout handed to ``Comm.wait_all_for``.
    """
    worker_mailbox: Mailbox = Mailbox.by_name("worker")
    this_actor.info(f"{client_id} started")

    async_job_results: List[AsyncJobResult] = []
    for job_idx, job_duration in enumerate(jobs):
        # One uniquely named mailbox per job ties each incoming result to its job.
        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
        # put_init() only prepares the send; detach() starts it and lets the
        # client forget about it (fire-and-forget). The same Job instance is
        # sent and recorded, rather than building an identical second one.
        worker_mailbox.put_init(job, SIMULATED_JOB_SIZE_BYTES).detach()
        result_comm, async_data = result_mailbox.get_async()
        async_job_results.append(AsyncJobResult(
            job=job,
            result_comm=result_comm,
            async_data=async_data,
        ))

    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
    # Escalate to a warning when fewer results than jobs came back.
    logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
    logger(f"received {completed_comms}/{len(async_job_results)} results")
    for result in async_job_results:
        # The payload is only read for completed jobs; pending ones log an
        # empty payload.
        this_actor.info(f"{result.job.job_id}"
                        f" status={result.status}"
                        f" result_payload={result.async_data.get() if result.complete else ''}")
123 settings = create_parser().parse_known_args()[0]
125 e.load_platform(settings.platform)
126 Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
127 for worker_idx in range(settings.workers):
128 Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize()
132 if __name__ == "__main__":