Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2023.
[simgrid.git] / examples / python / comm-waitany / comm-waitany.py
1 # Copyright (c) 2010-2023. The SimGrid Team. All rights reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the license (GNU LGPL) which comes with this package.
5
6 """
7 This example shows how to block on the completion of a set of communications.
8
9 As for the other asynchronous examples, the sender initiate all the messages it wants to send and
10 pack the resulting simgrid.Comm objects in a list. All messages thus occur concurrently.
11
12 The sender then loops until there is no ongoing communication. Using wait_any() ensures that the sender
13 will notice events as soon as they occur even if it does not follow the order of the container.
14
15 Here, finalize messages will terminate earlier because their size is 0, so they travel faster than the
16 other messages of this application.  As expected, the trace shows that the finalize of worker 1 is
17 processed before 'Message 5' that is sent to worker 0.
18 """
19
20 import sys
21 from simgrid import Actor, Comm, Engine, Host, Mailbox, this_actor
22
23
24 def sender(messages_count, msg_size, receivers_count):
25     # List in which we store all ongoing communications
26     pending_comms = []
27
28     # Vector of the used mailboxes
29     mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
30               for i in range(0, receivers_count)]
31
32     # Start dispatching all messages to receivers, in a round robin fashion
33     for i in range(0, messages_count):
34         content = "Message {:d}".format(i)
35         mbox = mboxes[i % receivers_count]
36
37         this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
38
39         # Create a communication representing the ongoing communication, and store it in pending_comms
40         comm = mbox.put_async(content, msg_size)
41         pending_comms.append(comm)
42
43     # Start sending messages to let the workers know that they should stop
44     for i in range(0, receivers_count):
45         mbox = mboxes[i]
46         this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
47         comm = mbox.put_async("finalize", 0)
48         pending_comms.append(comm)
49
50     this_actor.info("Done dispatching all messages")
51
52     # Now that all message exchanges were initiated, wait for their completion, in order of completion.
53     #
54     # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
55     # terminated.
56     # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
57     while pending_comms:
58         changed_pos = Comm.wait_any(pending_comms)
59         del pending_comms[changed_pos]
60         if changed_pos != 0:
61             this_actor.info(
62                 "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first."
63                 .format(changed_pos))
64
65     this_actor.info("Goodbye now!")
66
67
68 def receiver(my_id):
69     mbox = Mailbox.by_name("receiver-{:d}".format(my_id))
70     this_actor.info("Wait for my first message")
71     while True:
72         received = mbox.get()
73         this_actor.info("I got a '{:s}'.".format(received))
74         if received == "finalize":
75             break  # If it's a finalize message, we're done.
76
77 if __name__ == '__main__':
78     e = Engine(sys.argv)
79
80     # Load the platform description
81     e.load_platform(sys.argv[1])
82
83     Actor.create("sender", Host.by_name("Tremblay"), sender, 6, 1000000, 2)
84     Actor.create("receiver", Host.by_name("Fafard"), receiver, 0)
85     Actor.create("receiver", Host.by_name("Jupiter"), receiver, 1)
86
87     e.run()