Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2023.
[simgrid.git] / examples / python / comm-waitallfor / comm-waitallfor.py
1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 """
7 This example implements the following scenario:
8 - Multiple workers consume jobs (Job) from a shared mailbox (worker)
9 - A client first dispatches several jobs (with a simulated 'cost' - i.e. time the worker will 'process' the job)
10 - The client then waits for all job results for a maximum time set by the 'wait timeout' (Comm.wait_all_for)
11 - The client then displays the status of individual jobs.
12 """
13
14
15 from argparse import ArgumentParser
16 from dataclasses import dataclass
17 from typing import List
18 from uuid import uuid4
19 import sys
20
21 from simgrid import Actor, Comm, Engine, Host, Mailbox, PyGetAsync, this_actor
22
23
# Simulated payload sizes, passed as the byte count of each mailbox put():
# job requests sent to the workers are 1 KiB, result messages are 1 MiB.
SIMULATED_JOB_SIZE_BYTES = 1 << 10
SIMULATED_RESULT_SIZE_BYTES = 1 << 20
26
27
def parse_requests(requests_str: str) -> List[float]:
    """Convert a comma-separated string of job durations into floats.

    Surrounding whitespace on each entry is ignored, so ``" 0.5 , 2 "`` gives
    ``[0.5, 2.0]``. An empty or non-numeric entry (for example from a
    trailing comma) raises ``ValueError``. When this function is used as an
    argparse ``type=`` converter, argparse turns that into a usage error.
    """
    stripped_tokens = map(str.strip, requests_str.split(","))
    return list(map(float, stripped_tokens))
30
31
def create_parser() -> ArgumentParser:
    """Return the command-line parser used by :func:`main`.

    Recognised options:

    * ``--platform`` (required): platform description file to load.
    * ``--workers`` (int, default 1): number of worker actors to start.
    * ``--jobs`` (default ``"1,2,3,4,5"``): comma-separated job durations,
      converted by :func:`parse_requests`. argparse also runs a string
      default through the ``type`` converter, so the parsed value is always
      a list of floats.
    * ``--wait-timeout`` (float, default 5.0): how long the client waits
      for job results.
    """
    cli = ArgumentParser()
    cli.add_argument("--platform", type=str, required=True,
                     help="path to the platform description")
    cli.add_argument("--workers", type=int, default=1,
                     help="number of worker actors to start")
    cli.add_argument("--jobs", type=parse_requests, default="1,2,3,4,5",
                     help="duration of individual jobs sent to the workers by the client")
    cli.add_argument("--wait-timeout", type=float, default=5.0,
                     help="number of seconds before the client gives up waiting for results"
                          " and aborts the simulation")
    return cli
59
60
@dataclass()
class Job:
    """A unit of work the client sends to the workers via the "worker" mailbox."""

    job_id: str  # label used in log messages, e.g. "job-0"
    duration: float  # simulated seconds the worker sleeps while "processing" the job
    result_mailbox: Mailbox  # mailbox on which the worker puts the job's result
66
67
def worker(worker_id: str):
    """Serve jobs from the shared "worker" mailbox in an endless loop.

    Each received Job is "processed" by sleeping for ``job.duration``
    seconds. The worker's id is then sent back as the result payload, with a
    simulated size of SIMULATED_RESULT_SIZE_BYTES, on the job's own result
    mailbox.

    The loop has no exit condition. main() daemonizes every worker actor;
    presumably that is what ends it once the client is done (confirm against
    the SimGrid daemon semantics).
    """
    this_actor.info(f"{worker_id} started")
    jobs_box: Mailbox = Mailbox.by_name("worker")
    while True:
        job: Job = jobs_box.get()
        this_actor.info(f"{worker_id} working on {job.job_id} (will take {job.duration}s to complete)")
        this_actor.sleep_for(job.duration)
        reply_box = job.result_mailbox
        reply_box.put(f"{worker_id}", SIMULATED_RESULT_SIZE_BYTES)
76
77
@dataclass
class AsyncJobResult:
    """A dispatched Job paired with the pending asynchronous receive of its result."""

    job: Job  # the job that was sent to the workers
    result_comm: Comm  # communication returned by result_mailbox.get_async()
    async_data: PyGetAsync  # handle whose get() yields the payload once result_comm is done

    @property
    def complete(self) -> bool:
        """Whether the result has arrived. Calls ``result_comm.test()`` on every access."""
        return self.result_comm.test()

    @property
    def status(self) -> str:
        """Return ``"complete"`` if the result has arrived, ``"pending"`` otherwise."""
        if self.complete:
            return "complete"
        return "pending"
91
92
def client(client_id: str, jobs: List[float], wait_timeout: float):
    """Dispatch one Job per entry of *jobs*, wait for the results, then report.

    For every duration, the client:

    * creates a uniquely named result mailbox,
    * sends the Job (detached) on the shared "worker" mailbox, and
    * posts an asynchronous receive on the result mailbox.

    It then blocks in ``Comm.wait_all_for()`` for at most *wait_timeout*. A
    warning is logged when fewer results than jobs were reported. Finally,
    the status of each job is logged, along with its payload when it arrived.

    :param client_id: name used in log messages.
    :param jobs: simulated duration of each job, in seconds.
    :param wait_timeout: maximum time to wait for all results.
    """
    worker_mailbox: Mailbox = Mailbox.by_name("worker")
    this_actor.info(f"{client_id} started")
    async_job_results: List[AsyncJobResult] = []
    for job_idx, job_duration in enumerate(jobs):
        result_mailbox: Mailbox = Mailbox.by_name(str(uuid4()))
        job = Job(job_id=f"job-{job_idx}", duration=job_duration, result_mailbox=result_mailbox)
        # Send the same Job instance that is tracked below, so the worker and
        # the client refer to a single object. detach() starts the send without
        # the client keeping a handle on it.
        out_comm = worker_mailbox.put_init(job, SIMULATED_JOB_SIZE_BYTES)
        out_comm.detach()
        # Post the receive for this job's result immediately; it is awaited below.
        result_comm, async_data = result_mailbox.get_async()
        async_job_results.append(AsyncJobResult(job=job, result_comm=result_comm, async_data=async_data))

    this_actor.info(f"awaiting results for all jobs (timeout={wait_timeout}s)")
    result_comms = [entry.result_comm for entry in async_job_results]
    completed_comms = Comm.wait_all_for(result_comms, wait_timeout)
    job_count = len(async_job_results)
    logger = this_actor.warning if completed_comms < job_count else this_actor.info
    logger(f"received {completed_comms}/{job_count} results")

    for result in async_job_results:
        # Poll each comm once. Every access to .status/.complete calls
        # Comm.test() again, and reading it twice could let the printed status
        # and the decision to fetch the payload disagree.
        status = result.status
        payload = result.async_data.get() if status == "complete" else ""
        this_actor.info(f"{result.job.job_id} status={status} result_payload={payload}")
115     logger(f"received {completed_comms}/{len(async_job_results)} results")
116     for result in async_job_results:
117         this_actor.info(f"{result.job.job_id}"
118                         f" status={result.status}"
119                         f" result_payload={result.async_data.get() if result.complete else ''}")
120
121
def main():
    """Entry point: parse options, set up the SimGrid engine and run the scenario.

    Options unknown to :func:`create_parser` are tolerated
    (``parse_known_args``), and the full ``sys.argv`` is handed to ``Engine``.
    Presumably this lets SimGrid consume its own flags such as ``--cfg``
    (TODO confirm). One client actor is started on host "Tremblay", and
    ``settings.workers`` daemonized worker actors on host "Ruby".
    """
    settings, _unparsed = create_parser().parse_known_args()
    engine = Engine(sys.argv)
    engine.load_platform(settings.platform)

    Actor.create("client", Host.by_name("Tremblay"), client,
                 "client", settings.jobs, settings.wait_timeout)

    worker_names = (f"worker-{n}" for n in range(settings.workers))
    for name in worker_names:
        spawned = Actor.create("worker", Host.by_name("Ruby"), worker, name)
        spawned.daemonize()

    engine.run()


if __name__ == "__main__":
    main()