Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add remaining Comm synchronisation Python bindings
[simgrid.git] / examples / python / comm-waitallfor / comm-waitallfor.py
1 # Copyright (c) 2010-2022. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 """
7 This example implements the following scenario:
8 - Multiple workers consume jobs (Job) from a shared mailbox (worker)
9 - A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
10 - The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
11 - The client then displays the status of individual jobs.
12 """
13
14
15 from argparse import ArgumentParser
16 from dataclasses import dataclass
17 from uuid import uuid4
18 import sys
19
20 from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
21
22
# Simulated size, in bytes, of a job message put on the worker mailbox.
SIMULATED_JOB_SIZE_BYTES = 1024
# Simulated size, in bytes, of a result message put on a job's result mailbox.
SIMULATED_RESULT_SIZE_BYTES = 1024 ** 2
25
26
def parse_requests(requests_str: str) -> list[float]:
    """Convert a comma-separated string of numbers into a list of floats.

    Whitespace around each item is ignored.

    Args:
        requests_str: text such as ``"1,2,3"``.

    Returns:
        The parsed values, in the order they appear in the string.

    Raises:
        ValueError: if any item is not a valid float literal (including an
            empty item, e.g. from a trailing comma).
    """
    durations: list[float] = []
    for token in requests_str.split(","):
        durations.append(float(token.strip()))
    return durations
29
30
def create_parser() -> ArgumentParser:
    """Build the command-line parser for this example.

    Returns:
        An ``ArgumentParser`` accepting ``--platform``, ``--workers``,
        ``--jobs`` and ``--wait-timeout``.
    """
    # (flag, add_argument keyword arguments), registered in this order.
    option_specs = (
        ("--platform", {
            "type": str,
            "help": "path to the platform description",
        }),
        ("--workers", {
            "type": int,
            "default": 1,
            "help": "number of worker actors to start",
        }),
        ("--jobs", {
            "type": parse_requests,
            # A string default is run through ``type`` by argparse.
            "default": "1,2,3,4,5",
            "help": "duration of individual jobs sent to the workers by the client",
        }),
        ("--wait-timeout", {
            "type": float,
            "default": 5.0,
            "help": "number of seconds before the client gives up waiting for results and aborts the simulation",
        }),
    )
    cli = ArgumentParser()
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli
57
58
@dataclass
class Job:
    """A unit of work sent by the client to a worker."""

    # Label used in log messages (the client builds it as "job-<index>").
    job_id: str
    # Amount of time the worker sleeps to simulate processing the job.
    duration: float
    # Mailbox on which the worker puts its reply once the job is done.
    result_mailbox: Mailbox
64
65
def worker(worker_id: str):
    """Serve jobs from the shared "worker" mailbox, forever.

    For each job received, the worker sleeps for the job's duration and then
    puts its own identifier on the job's result mailbox. The loop never
    returns on its own.

    Args:
        worker_id: label used in log messages and sent back as the result.
    """
    this_actor.info(f"{worker_id} started")
    inbox: Mailbox = Mailbox.by_name("worker")
    while True:
        job: Job = inbox.get()
        this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)")
        # Simulate the processing time requested by the client.
        this_actor.sleep_for(job.duration)
        reply_box = job.result_mailbox
        reply_box.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
74
75
@dataclass
class AsyncJobResult:
    """Pair a dispatched job with the asynchronous reception of its result."""

    # The job that was sent to the workers.
    job: Job
    # Communication returned by ``get_async`` on the job's result mailbox.
    result_comm: Comm
    # Handle returned alongside ``result_comm``; its ``get()`` yields the payload.
    async_data: PyGetAsync

    @property
    def complete(self) -> bool:
        """Return the outcome of ``result_comm.test()`` for this job."""
        return self.result_comm.test()

    @property
    def status(self) -> str:
        """Return ``"complete"`` when :attr:`complete` is true, else ``"pending"``."""
        if self.complete:
            return "complete"
        return "pending"
89
90
def client(client_id: str, jobs: list[float], wait_timeout: float):
    """Dispatch one job per duration, wait for the results, then report.

    Every job gets its own uniquely named result mailbox. The job is sent to
    the shared "worker" mailbox with a detached communication, and an
    asynchronous reception is posted on the result mailbox. After all jobs
    are dispatched, the client waits on all receptions at once with
    ``Comm.wait_all_for`` and logs the status of each job.

    Args:
        client_id: label used in the start-up log message.
        jobs: one duration per job to dispatch.
        wait_timeout: timeout passed to ``Comm.wait_all_for``.
    """
    worker_mailbox: Mailbox = Mailbox.by_name("worker")
    this_actor.info(f"{client_id} started")
    async_job_results: list[AsyncJobResult] = []
    for job_idx, job_duration in enumerate(jobs):
        # A random mailbox name keeps each job's reply separate from the others.
        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
        # Send the same Job instance that is tracked locally instead of
        # building an identical second one, so the two can never diverge.
        out_comm = worker_mailbox.put_init(job, SIMULATED_JOB_SIZE_BYTES)
        # Detached: the client does not wait for the job to be picked up.
        out_comm.detach()
        result_comm, async_data = result_mailbox.get_async()
        async_job_results.append(AsyncJobResult(
            job=job,
            result_comm=result_comm,
            async_data=async_data
        ))
    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
    completed_comms = Comm.wait_all_for([entry.result_comm for entry in async_job_results], wait_timeout)
    # Warn, rather than merely inform, when some results did not arrive in time.
    logger = this_actor.warning if completed_comms < len(async_job_results) else this_actor.info
    logger(f"received {completed_comms}/{len(async_job_results)} results")
    for result in async_job_results:
        # The payload is only read for jobs whose reception has completed.
        this_actor.info(f"{result.job.job_id}"
                        f" status={result.status}"
                        f" result_payload={result.async_data.get() if result.complete else ''}")
118
119
def main():
    """Parse the command line, set up the actors and run the simulation.

    One client actor is created on host "Tremblay" and ``--workers`` worker
    actors on host "Ruby". Workers are daemonized because their loop never
    returns.

    Exits with an argparse usage error when ``--platform`` is not given.
    """
    parser = create_parser()
    # Unknown options are tolerated here; the full argv is also handed to
    # Engine below (presumably so that engine options can be mixed in).
    settings, _ = parser.parse_known_args()
    if settings.platform is None:
        # --platform has no default; fail with a clear usage message instead
        # of passing None to load_platform.
        parser.error("the following arguments are required: --platform")
    e = Engine(sys.argv)
    e.load_platform(settings.platform)
    Actor.create("client", Host.by_name("Tremblay"), client, "client", settings.jobs, settings.wait_timeout)
    for worker_idx in range(settings.workers):
        Actor.create("worker", Host.by_name("Ruby"), worker, f"worker-{worker_idx}").daemonize()
    e.run()
128
129
# Run the simulation only when this file is executed as a script.
if __name__ == "__main__":
    main()