1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
6 from argparse import ArgumentParser
7 from typing import List
10 from simgrid import Actor, Comm, Engine, Mailbox, this_actor
13 FINALIZE_MESSAGE = "finalize"
def create_parser() -> ArgumentParser:
    """Build the command-line parser for this example.

    The parser declares a single ``--platform`` option whose value ends up in
    the ``platform`` attribute of the parsed namespace; the caller hands that
    value to ``Engine.load_platform``.

    :return: the configured parser.
    """
    parser = ArgumentParser()
    parser.add_argument(
        '--platform',
        type=str,
        # The platform path is passed straight to the engine, so reject a
        # missing value at parse time instead of failing later with None.
        required=True,
        help='path to the platform description'
    )
    return parser
def get_peer_mailbox(peer_id: int) -> Mailbox:
    """Return the mailbox named ``peer-<peer_id>``, looked up through ``Mailbox.by_name``."""
    mailbox_name = f"peer-{peer_id}"
    return Mailbox.by_name(mailbox_name)
def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
    """Exchange messages with every other peer, then drain this peer's own mailbox.

    The actor first posts ``message_count`` asynchronous messages, followed by
    one ``FINALIZE_MESSAGE``, to each of the other peers. It then polls its own
    mailbox, consuming whatever is ready, until it has received one finalize
    message per other peer. Finally it waits for all of its own sends to
    complete.

    :param my_id: index of this peer; its mailbox is ``get_peer_mailbox(my_id)``.
    :param message_count: number of regular messages sent to each other peer.
    :param payload_size: size handed to ``put_async`` for every message sent.
    :param peers_count: total number of peers; peer ids are ``range(peers_count)``.
    :raises AssertionError: if receiving a message reported as ready takes
        simulated time.
    """
    my_mailbox: Mailbox = get_peer_mailbox(my_id)
    # Register as the permanent receiver of my mailbox. Per the SimGrid
    # documentation this lets incoming communications start before get() is
    # called, which the readiness polling below relies on.
    my_mailbox.set_receiver(Actor.self())
    pending_comms: List[Comm] = []

    # Start dispatching all messages to every peer other than myself
    for i in range(message_count):
        for peer_id in range(peers_count):
            if peer_id != my_id:
                peer_mailbox = get_peer_mailbox(peer_id)
                message = f"Message {i} from peer {my_id}"
                this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
                pending_comms.append(peer_mailbox.put_async(message, payload_size))

    # Start sending messages to let peers know that they should stop.
    # Skipping myself matters: I only wait for `peers_count - 1` finalize
    # messages below, so a message sent to my own mailbox would never be
    # consumed and the final wait_all() could not complete.
    for peer_id in range(peers_count):
        if peer_id != my_id:
            peer_mailbox = get_peer_mailbox(peer_id)
            payload = str(FINALIZE_MESSAGE)
            pending_comms.append(peer_mailbox.put_async(payload, payload_size))
            this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
    this_actor.info("Done dispatching all messages")

    # Retrieve all the messages other peers have been sending to me until I
    # receive all the corresponding "finalize" messages
    pending_finalize_messages = peers_count - 1
    while pending_finalize_messages > 0:
        if my_mailbox.ready:
            # Timestamp the get() so we can check that a ready communication
            # is delivered without any simulated delay.
            start = Engine.clock
            received: str = my_mailbox.get()
            waiting_time = Engine.clock - start
            if waiting_time != 0.0:
                raise AssertionError(f"Expecting the waiting time to be 0.0 because the communication was supposedly "
                                     f"ready, but got {waiting_time} instead")
            this_actor.info(f"I got a '{received}'.")
            if received == FINALIZE_MESSAGE:
                pending_finalize_messages -= 1
        else:
            this_actor.info("Nothing ready to consume yet, I better sleep for a while")
            this_actor.sleep_for(0.01)

    this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
    Comm.wait_all(pending_comms)
    this_actor.info("Goodbye now!")
def main():
    """Set up and run the simulation: load the platform, deploy three peers, run the engine."""
    # Local import so this block is self-sufficient; the engine is built from
    # the process command line.
    import sys

    # parse_known_args() ignores options this script does not declare, so
    # extra command-line flags do not make argument parsing fail.
    settings = create_parser().parse_known_args()[0]
    e = Engine(sys.argv)
    e.load_platform(settings.platform)
    # Trailing arguments are forwarded to peer():
    # (my_id, message_count, payload_size, peers_count)
    Actor.create("peer", e.host_by_name("Tremblay"), peer, 0, 2, int(5e7), 3)
    Actor.create("peer", e.host_by_name("Ruby"), peer, 1, 6, int(2.5e5), 3)
    Actor.create("peer", e.host_by_name("Perl"), peer, 2, 0, int(5e7), 3)
    # Nothing is simulated until the engine is run.
    e.run()


86 if __name__ == "__main__":