- # List in which we store all ongoing communications
- pending_comms = []
-
- # Vector of the used mailboxes
- mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
- for i in range(0, receivers_count)]
-
- # Start dispatching all messages to receivers, in a round robin fashion
- for i in range(0, messages_count):
- content = "Message {:d}".format(i)
- mbox = mboxes[i % receivers_count]
-
- this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
-
- # Create a communication representing the ongoing communication, and store it in pending_comms
- comm = mbox.put_async(content, msg_size)
- pending_comms.append(comm)
-
- # Start sending messages to let the workers know that they should stop
- for i in range(0, receivers_count):
- mbox = mboxes[i]
- this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
- comm = mbox.put_async("finalize", 0)
- pending_comms.append(comm)
-
- this_actor.info("Done dispatching all messages")
-
- # Now that all message exchanges were initiated, wait for their completion, in order of completion.
- #
- # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
- # terminated.
- # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
- while pending_comms:
- changed_pos = Comm.wait_any(pending_comms)
- del pending_comms[changed_pos]
- if (changed_pos != 0):
- this_actor.info(
- "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first.".format(changed_pos))
-
- this_actor.info("Goodbye now!")
-
-
-def receiver(id):
- mbox = Mailbox.by_name("receiver-{:d}".format(id))
+ # List in which we store all ongoing communications
+ pending_comms = []
+
+ # Vector of the used mailboxes
+ mboxes = [Mailbox.by_name("receiver-{:d}".format(i))
+ for i in range(0, receivers_count)]
+
+ # Start dispatching all messages to receivers, in a round robin fashion
+ for i in range(0, messages_count):
+ content = "Message {:d}".format(i)
+ mbox = mboxes[i % receivers_count]
+
+ this_actor.info("Send '{:s}' to '{:s}'".format(content, str(mbox)))
+
+ # Create a communication representing the ongoing communication, and store it in pending_comms
+ comm = mbox.put_async(content, msg_size)
+ pending_comms.append(comm)
+
+ # Start sending messages to let the workers know that they should stop
+ for i in range(0, receivers_count):
+ mbox = mboxes[i]
+ this_actor.info("Send 'finalize' to '{:s}'".format(str(mbox)))
+ comm = mbox.put_async("finalize", 0)
+ pending_comms.append(comm)
+
+ this_actor.info("Done dispatching all messages")
+
+ # Now that all message exchanges were initiated, wait for their completion, in order of completion.
+ #
+ # This loop waits for first terminating message with wait_any() and remove it with del, until all comms are
+ # terminated.
+ # Even in this simple example, the pending comms do not terminate in the exact same order of creation.
+ while pending_comms:
+ changed_pos = Comm.wait_any(pending_comms)
+ del pending_comms[changed_pos]
+ if changed_pos != 0:
+ this_actor.info(
+ "Remove the {:d}th pending comm: it terminated earlier than another comm that was initiated first."
+ .format(changed_pos))
+
+ this_actor.info("Goodbye now!")
+
+
+def receiver(my_id):
+ mbox = Mailbox.by_name("receiver-{:d}".format(my_id))