1 \section{General scheme of asynchronous parallel code with
2 computation/communication overlapping}
5 In the previous part, we have seen how to efficiently implement overlap of
6 computations (CPU and GPU) with communications (GPU transfers and inter-node
7 communications). However, we have previously shown that for some parallel
8 iterative algorithms, it is sometimes even more efficient to use an asynchronous
9 scheme of iterations\index{iterations!asynchronous} \cite{HPCS2002,ParCo05,Para10}. In that case, the nodes do
10 not wait for each others but they perform their iterations using the last
11 external data they have received from the other nodes, even if this
12 data was produced \emph{before} the previous iteration on the other nodes.
14 Formally, if we denote by $f=(f_1,...,f_n)$ the function representing the
15 iterative process and by $x^t=(x_1^t,...,x_n^t)$ the values of the $n$ elements of
16 the system at iteration $t$, we pass from a synchronous iterative scheme of the
19 \caption{Synchronous iterative scheme}\label{algo:ch6p2sync}
21 $x^{0}=(x_{1}^{0},...,x_{n}^{0})$\\
22 \textbf{for} $t=0,1,...$\\
23 \>\textbf{for} $i=1,...,n$\\
24 \>\>$x_{i}^{t+1}=f_{i}(x_{1}^t,...,x_i^t,...,x_{n}^t)$\\
30 to an asynchronous iterative scheme of the form:
32 \caption{Asynchronous iterative scheme}\label{algo:ch6p2async}
34 $x^{0}=(x_{1}^{0},...,x_{n}^{0})$\\
35 \textbf{for} $t=0,1,...$\\
36 \>\textbf{for} $i=1,...,n$\\
37 \>\>$x_{i}^{t+1}=\left\{
39 x_i^t & \text{if } i \text{ is \emph{not} updated at iteration } i\\
40 f_i(x_1^{s_1^i(t)},...,x_n^{s_n^i(t)}) & \text{if } i \text{ is updated at iteration } i
47 where $s_j^i(t)$ is the iteration number of the production of the value $x_j$ of
48 element $j$ that is used on element $i$ at iteration $t$ (see for example~\cite{BT89,
49 FS2000} for further details).
50 Such schemes are called AIAC\index{AIAC} for \emph{Asynchronous Iterations and
51 Asynchronous Communications}. They combine two aspects that are respectively
52 different computation speeds of the computing elements and communication delays
55 The key feature of such algorithmic schemes
56 is that they may be faster than their synchronous counterparts due to the
57 implied total overlap of computations with communications: in fact, this scheme
58 suppresses all the idle times induced by nodes synchronizations between each iteration.
60 However, the efficiency of such a scheme is directly linked to the frequency at
61 which new data arrives on each node. Typically,
62 if a node receives newer data only every four or five local iterations, it is
63 strongly probable that the evolution of its local iterative process will be
64 slower than if it receives data at every iteration. The key point here is that
65 this frequency does not only depend on the hardware configuration of the
66 parallel system but it also depends on the software that is used to
67 implement the algorithmic scheme.
69 The impact of the programming environments used to implement asynchronous
70 algorithms has already been investigated in~\cite{SuperCo05}. Although the
71 features required to efficiently implement asynchronous schemes have not
72 changed, the available programming environments and computing hardware have
73 evolved, in particular now GPUs are available. So, there is a need to reconsider the
74 implementation schemes of AIAC according to the new de facto standards for parallel
75 programming (communications and threads) as well as the integration of the GPUs.
76 One of the main objective here is to obtain a maximal overlap between the
77 activities of the three types of devices that are the CPU, the GPU and the
78 network. Moreover, another objective is to present what we think
79 is the best compromise between the simplicity of the implementation and its
80 maintainability on one side and its
81 performance on the other side. This is especially important for industries where
82 implementation and maintenance costs are strong constraints.
84 For the sake of clarity, we present the different algorithmic schemes in a progressive
85 order of complexity, from the basic asynchronous scheme to the complete scheme
86 with full overlap. Between these two extremes, we propose a synchronization
87 mechanism on top of our asynchronous scheme that can be used either statically or
88 dynamically during the application execution.
90 Although there exist several programming environments for inter-node
91 communications, multi-threading and GPU programming, a few of them have
92 become \emph{de facto standards}, either due to their good stability, their ease
93 of use and/or their wide adoption by the scientific community.
94 Therefore, as in the previous section all the schemes presented in the following use MPI~\cite{MPI},
95 OpenMP~\cite{openMP} and CUDA~\cite{CUDA}. However, there is no loss of
96 generality as those schemes may easily be implemented with other libraries.
98 Finally, in order to stay as clear as possible, only the parts of code and
99 variables related to the control of parallelism (communications, threads,...)
100 are presented in our schemes. The inner organization of data is not detailed as
101 it depends on the application. We only consider that we have two data arrays
102 (previous version and current version) and communication buffers. However, in
103 most of the cases, those buffers can correspond to the data arrays themselves to
106 \subsection{A basic asynchronous scheme}
107 \label{ch6:p2BasicAsync}
109 The first step toward our complete scheme is to implement a basic asynchronous
110 scheme that includes an actual overlap of the communications with the
111 computations\index{overlap!computation and communication}. In order to ensure that the communications are actually performed
112 in parallel of the computations, it is necessary to use different threads. It
113 is important to remember that asynchronous communications provided in
114 communication libraries like MPI are not systematically performed in parallel of
115 the computations~\cite{ChVCV13,Hoefler08a}. So, the logical and classical way
116 to implement such an overlap is to use three threads: one for
117 computing, one for sending and one for receiving. Moreover, since
118 the communication is performed by threads, blocking synchronous communications\index{MPI!communication!blocking}\index{MPI!communication!synchronous}
119 can be used without deteriorating the overall performance.
121 In this basic version, the termination\index{termination} of the global process is performed
122 individually on each node according to their own termination. This can be guided by either a
123 number of iterations or a local convergence detection\index{convergence detection}. The important step at
124 the end of the process is to perform the receptions of all pending
125 communications in order to ensure the termination of the two communication
128 So, the global organization of this scheme is set up in \Lst{algo:ch6p2BasicAsync}.
130 % \begin{algorithm}[H]
131 % \caption{Initialization of the basic asynchronous scheme.}
132 % \label{algo:ch6p2BasicAsync}
133 \begin{Listing}{algo:ch6p2BasicAsync}{Initialization of the basic asynchronous scheme}
134 // Variables declaration and initialization
135 omp_lock_t lockSend; // Controls the sendings from the computing thread
136 omp_lock_t lockRec; // Ensures the initial reception of external data
137 char Finished = 0; // Boolean indicating the end of the process
138 char SendsInProgress = 0; // Boolean indicating if previous data sendings are still in progress
139 double Threshold; // Threshold of the residual for convergence detection
141 // Parameters reading
144 // MPI initialization
145 MPI_Init_thread(argc, argv, MPI_THREAD_MULTIPLE, &provided);
146 MPI_Comm_size(MPI_COMM_WORLD, &nbP);
147 MPI_Comm_rank(MPI_COMM_WORLD, &numP);
149 // Data initialization and distribution among the nodes
152 // OpenMP initialization (mainly declarations and setting up of locks)
153 omp_set_num_threads(3);
154 omp_init_lock(&lockSend);
155 omp_set_lock(&lockSend); // Initially locked, unlocked to start sendings
156 omp_init_lock(&lockRec);
157 omp_set_lock(&lockRec); // Initially locked, unlocked when initial data are received
161 switch(omp_get_thread_num()){
163 computations(... @\emph{relevant parameters}@ ...);
176 // Cleaning of OpenMP locks
177 omp_test_lock(&lockSend);
178 omp_unset_lock(&lockSend);
179 omp_destroy_lock(&lockSend);
180 omp_test_lock(&lockRec);
181 omp_unset_lock(&lockRec);
182 omp_destroy_lock(&lockRec);
189 % \ANNOT{Est-ce qu'on laisse le 1er échange de données ou pas dans
190 % l'algo~\ref{algo:ch6p2BasicAsyncComp} ? (lignes 16-19)\\
191 % (ça n'est pas nécessaire si les données de
192 % départ sont distribuées avec les dépendances qui correspondent, sinon il faut
195 In this scheme, the \texttt{lockRec} mutex\index{OpenMP!mutex} is not mandatory.
196 It is only used to ensure that data dependencies are actually exchanged at the
197 first iteration of the process. Data initialization and distribution
198 (lines~16-17) are not detailed here because they are directly related to the
199 application. The important point is that, in most cases, they should be done
200 before the iterative process. The computing function is given in
201 \Lst{algo:ch6p2BasicAsyncComp}.
203 %\begin{algorithm}[H]
204 % \caption{Computing function in the basic asynchronous scheme.}
205 % \label{algo:ch6p2BasicAsyncComp}
206 \begin{Listing}{algo:ch6p2BasicAsyncComp}{Computing function in the basic asynchronous scheme}
207 // Variables declaration and initialization
208 int iter = 1; // Number of the current iteration
209 double difference; // Variation of one element between two iterations
210 double residual; // Residual of the current iteration
214 // Sendings of data dependencies if there is no previous sending in progress
215 if(!SendsInProgress){
216 // Potential copy of data to be sent in additional buffers
218 // Change of sending state
220 omp_unset_lock(&lockSend);
223 // Blocking receptions at the first iteration
225 omp_set_lock(&lockRec);
228 // Initialization of the residual
230 // Swapping of data arrays (current and previous)
231 tmp = current; // Pointers swapping to avoid
232 current = previous; // actual data copies between
233 previous = tmp; // the two data versions
234 // Computation of current iteration over local data
235 for(ind=0; ind<localSize; ++ind){
236 // Updating of current array using previous array
238 // Updating of the residual
239 // (max difference between two successive iterations)
240 difference = fabs(current[ind] - previous[ind]);
241 if(difference > residual){
242 residual = difference;
246 // Checking of the end of the process (residual under threshold)
247 // Other conditions can be added to the termination detection
248 if(residual <= Threshold){
250 omp_unset_lock(&lockSend); // Activation of end messages sendings
251 MPI_Ssend(&Finished, 1, MPI_CHAR, numP, tagEnd, MPI_COMM_WORLD);
254 // Updating of the iteration number
260 As mentioned above, it can be seen in line~18 of \Lst{algo:ch6p2BasicAsyncComp}
261 that the \texttt{lockRec} mutex is used only at the first iteration to wait for
262 the initial data dependencies before the computations. The residual\index{residual}, initialized
263 in line~23 and computed in lines~34-37, is defined by the maximal difference
264 between the elements from two consecutive iterations. It is classically used to
265 detect the local convergence of the process on each node. In the more
266 complete schemes presented in the sequel, a global termination detection that
267 takes the states of all the nodes into account will be exhibited.
269 Finally, the local convergence is tested and updated when necessary. In line~44,
270 the \texttt{lockSend} mutex is unlocked to allow the sending function to send
271 final messages to the dependency nodes. Those messages are required to keep the
272 reception function alive until all the final messages have been received.
273 Otherwise, a node could stop its reception function whereas other nodes are
274 still trying to communicate with it. Moreover, a local sending of a final
275 message to the node itself is required (line~45) to ensure that the reception
276 function will not stay blocked in a message probing
277 (see~\Lst{algo:ch6p2BasicAsyncReceptions}, line~11). This may happen if the node
278 receives the final messages from its dependencies \emph{before} being itself in
281 All the messages but this final local one are performed in the sending function
282 described in \Lst{algo:ch6p2BasicAsyncSendings}.
284 The main loop is only conditioned by the end of the computing process (line~4).
285 At each iteration, the thread waits for the permission from the computing thread
286 (according to the \texttt{lockSend} mutex). Then, data are sent with
287 blocking synchronous communications. The \texttt{SendsInProgress} boolean
288 allows the computing thread to skip data sendings as long as a previous sending
289 is in progress. This skip is possible due to the nature of asynchronous
290 algorithms that allows such \emph{message loss}\index{message!loss/miss} or \emph{message miss}. After
291 the main loop, the final messages are sent to the dependencies of the node.
293 %\begin{algorithm}[H]
294 % \caption{Sending function in the basic asynchronous scheme.}
295 % \label{algo:ch6p2BasicAsyncSendings}
296 \begin{Listing}{algo:ch6p2BasicAsyncSendings}{Sending function in the basic asynchronous scheme}
297 // Variables declaration and initialization
301 omp_set_lock(&lockSend); // Waiting for signal from the comp. thread
303 // Blocking synchronous sends to all dependencies
304 for(i=0; i<nbDeps; ++i){
305 MPI_Ssend(&dataToSend[deps[i]], nb_data, type_of_data, deps[i], tagCom, MPI_COMM_WORLD);
307 SendsInProgress = 0; // Indicates that the sendings are done
310 // At the end of the process, sendings of final messages
311 for(i=0; i<nbDeps; ++i){
312 MPI_Ssend(&Finished, 1, MPI_CHAR, deps[i], tagEnd, MPI_COMM_WORLD);
317 The last function, detailed in \Lst{algo:ch6p2BasicAsyncReceptions}, does all the messages receptions.
318 %\begin{algorithm}[H]
319 % \caption{Reception function in the basic asynchronous scheme.}
320 % \label{algo:ch6p2BasicAsyncReceptions}
321 \begin{Listing}{algo:ch6p2BasicAsyncReceptions}{Reception function in the basic asynchronous scheme}
322 // Variables declaration and initialization
323 char countReceipts = 0; // Boolean indicating whether receptions are counted or not
324 int nbEndMsg = 0; // Number of end messages received
325 int arrived = 0; // Boolean indicating if a message is arrived
326 int srcNd; // Source node of the message
327 int size; // Message size
329 // Main loop of receptions
331 // Waiting for an incoming message
332 MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
334 // Management of data messages
335 switch(status.MPI_TAG){
336 case tagCom: // Management of data messages
337 srcNd = status.MPI_SOURCE; // Get the source node of the message
338 // Actual data reception in the corresponding buffer
339 MPI_Recv(dataBufferOf(srcNd), nbDataOf(srcNd), dataTypeOf(srcNd), srcNd, tagCom, MPI_COMM_WORLD, &status);
340 // Unlocking of the computing thread when data are received from all dependencies
341 if(countReceipts == 1 && ... @\emph{receptions from ALL dependencies}@ ...){
342 omp_unset_lock(&lockRec);
343 countReceipts = 0; // No more counting after first iteration
346 case tagEnd: // Management of end messages
347 // Actual end message reception in dummy buffer
348 MPI_Recv(dummyBuffer, 1, MPI_CHAR, status.MPI_SOURCE, tagEnd, MPI_COMM_WORLD, &status);
354 // Reception of pending messages and counting of end messages
355 do{ // Loop over the remaining incoming/waited messages
356 MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
357 MPI_Get_count(&status, MPI_CHAR, &size);
358 // Actual reception in dummy buffer
359 MPI_Recv(dummyBuffer, size, MPI_CHAR, status.MPI_SOURCE, status.MPI_TAG, MPI_COMM_WORLD, &status);
360 if(status.MPI_TAG == tagEnd){ // Counting of end messages
363 MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &arrived, &status);
364 }while(arrived == 1 || nbEndMsg < nbDeps + 1);
368 As in the sending function, the main loop of receptions is done while the
369 iterative process is not \texttt{Finished}. In line~11, the thread waits until
370 a message arrives on the node. Then, it performs the actual reception and the
371 corresponding subsequent actions (potential data copies for data messages and
372 counting for end messages). Lines 20-23 check that all data dependencies have
373 been received before unlocking the \texttt{lockRec} mutex. As mentioned before,
374 they are not mandatory and are included only to ensure that all data
375 dependencies are received at the first iteration. Lines 25-28 are required to
376 manage end messages that arrive on the node \emph{before} it reaches its own
377 termination process. As the nodes are \emph{not} synchronized, this may happen.
378 Finally, lines 34-43 perform the receptions of all pending communications,
379 including the remaining end messages (at least the one from the node itself).
382 So, with those algorithms, we obtain a quite simple and efficient asynchronous
383 iterative scheme. It is interesting to notice that GPU computing can be easily
384 included in the computing thread. This will be fully addressed in
385 paragraph~\ref{ch6:p2GPUAsync}. However, before presenting the complete
386 asynchronous scheme with GPU computing, we have to detail how our initial scheme
387 can be made synchronous.
389 \subsection{Synchronization of the asynchronous scheme}
390 \label{ch6:p2SsyncOverAsync}
392 The presence of synchronization in the previous scheme may seem contradictory to our goal,
393 and obviously, it is neither the simplest way to obtain a synchronous scheme nor
394 the most efficient (as presented in~\Sec{ch6:part1}). However, it is necessary
395 for our global convergence detection strategy\index{convergence detection!global}. Recall that the
396 global convergence\index{convergence!global} is the extension of the local convergence\index{convergence!local} concept to all the
397 nodes. This implies that all the nodes have to be in local convergence at the
398 same time to achieve global convergence. Typically, if we use the residual and a
399 threshold to stop the iterative process, all the nodes have to continue their
400 local iterative process until \emph{all} of them obtain a residual under the
403 % Moreover, it allows the gathering of both operating modes in a single source
404 % code, which tends to improve the implementation and maintenance costs when both
405 % versions are required.
406 % The second one is that
407 In our context, the interest of being able to dynamically change the operating
408 mode (sync/async) during the process execution, is that this strongly simplifies
409 the global convergence detection. In fact, our past experience in the design
410 and implementation of global convergence detection in asynchronous
411 algorithms~\cite{SuperCo05, BCC07, Vecpar08a}, have led us to the conclusion
412 that although a decentralized detection scheme is possible and may be more
413 efficient in some situations, its much higher complexity is an
414 obstacle to actual use in practice, especially in industrial contexts where
415 implementation/maintenance costs are strong constraints. Moreover, although the
416 decentralized scheme does not slow down the computations, it requires more iterations than a synchronous version and
417 thus may induce longer detection times in some cases. So, the solution we
418 present below is a good compromise between simplicity and efficiency. It
419 consists in dynamically changing the operating mode between asynchronous and synchronous
420 during the execution of the process in order to check the global convergence.
421 This is why we need to synchronize our asynchronous scheme.
423 In each algorithm of the initial scheme, we only exhibit the additional code
424 required to change the operating mode.
426 %\begin{algorithm}[H]
427 % \caption{Initialization of the synchronized scheme.}
428 % \label{algo:ch6p2Sync}
429 \begin{Listing}{algo:ch6p2Sync}{Initialization of the synchronized scheme}
430 // Variables declarations and initialization
432 omp_lock_t lockStates; // Controls the synchronous exchange of local states
433 omp_lock_t lockIter; // Controls the synchronization at the end of each iteration
434 char localCV = 0; // Boolean indicating whether the local stabilization is reached or not
435 int nbOtherCVs = 0; // Number of other nodes being in local stabilization
437 // Parameters reading
439 // MPI initialization
441 // Data initialization and distribution among the nodes
443 // OpenMP initialization (mainly declarations and setting up of locks)
445 omp_init_lock(&lockStates);
446 omp_set_lock(&lockStates); // Initially locked, unlocked when all state messages are received
447 omp_init_lock(&lockIter);
448 omp_set_lock(&lockIter); // Initially locked, unlocked when all "end of iteration" messages are received
453 switch(omp_get_thread_num()){
458 // Cleaning of OpenMP locks
460 omp_test_lock(&lockStates);
461 omp_unset_lock(&lockStates);
462 omp_destroy_lock(&lockStates);
463 omp_test_lock(&lockIter);
464 omp_unset_lock(&lockIter);
465 omp_destroy_lock(&lockIter);
472 As can be seen in \Lst{algo:ch6p2Sync}, the synchronization implies two
473 additional mutex. The \texttt{lockStates} mutex is used to wait for the
474 receptions of all state messages coming from the other nodes. As shown
475 in \Lst{algo:ch6p2SyncComp}, those messages contain only a boolean indicating
476 for each node if it is in local convergence\index{convergence!local}. So, once all the states are
477 received on a node, it is possible to determine if all the nodes are in local
478 convergence, and thus to detect the global convergence. The \texttt{lockIter}
479 mutex is used to synchronize all the nodes at the end of each iteration. There
480 are also two new variables that respectively represent the local state of the
481 node (\texttt{localCV}) according to the iterative process (convergence) and the
482 number of other nodes that are in local convergence (\texttt{nbOtherCVs}).
484 The computation thread is where most of the modifications take place, as shown
485 in \Lst{algo:ch6p2SyncComp}.
487 %\begin{algorithm}[H]
488 % \caption{Computing function in the synchronized scheme.}
489 % \label{algo:ch6p2SyncComp}
490 \begin{Listing}{algo:ch6p2SyncComp}{Computing function in the synchronized scheme}
491 // Variables declarations and initialization
496 // Sendings of data dependencies at @\emph{each}@ iteration
497 // Potential copy of data to be sent in additional buffers
499 omp_unset_lock(&lockSend);
501 // Blocking receptions at @\emph{each}@ iteration
502 omp_set_lock(&lockRec);
505 // (init of residual, arrays swapping and iteration computation)
508 // Checking of the stabilization of the local process
509 // Other conditions than the residual can be added
510 if(residual <= Threshold){
516 // Global exchange of local states of the nodes
517 for(ind=0; ind<nbP; ++ind){
519 MPI_Ssend(&localCV, 1, MPI_CHAR, ind, tagState, MPI_COMM_WORLD);
523 // Waiting for the state messages receptions from the other nodes
524 omp_set_lock(&lockStates);
526 // Determination of global convergence (if all nodes are in local CV)
527 if(localCV + nbOtherCVs == nbP){
528 // Entering global CV state
530 // Unlocking of sending thread to start sendings of end messages
531 omp_unset_lock(&lockSend);
532 MPI_Ssend(&Finished, 1, MPI_CHAR, numP, tagEnd, MPI_COMM_WORLD);
534 // Resetting of information about the states of the other nodes
536 // Global barrier at the end of each iteration during the process
537 for(ind=0; ind<nbP; ++ind){
539 MPI_Ssend(&Finished, 1, MPI_CHAR, ind, tagIter, MPI_COMM_WORLD);
542 omp_set_lock(&lockIter);
546 // Updating of the iteration number
552 Most of the added code is related to the waiting for specific communications.
553 Between lines~6 and~7, the use of the flag \texttt{SendsInProgress} is no longer
554 needed since the sends are performed at each iteration. In line~12, the
555 thread waits for the data receptions from its dependencies. In
556 lines~26-34, the local states are determined and exchanged among all nodes. A
557 new message tag (\texttt{tagState}) is required for identifying those messages.
558 In line~37, the global termination state is determined. When it is reached,
559 lines~38-42 change the \texttt{Finished} boolean to stop the iterative process,
560 and send the end messages. Otherwise each node resets its local state
561 information about the other nodes and a global barrier is done between all the
562 nodes at the end of each iteration with another new tag (\texttt{tagIter}).
563 That barrier is needed to ensure that data messages from successive iterations
564 are actually received during the \emph{same} iteration on the destination nodes.
565 Nevertheless, it is not useful at the termination of the global process as it is
566 replaced by the global exchange of end messages.
568 There is no big modification induced by the synchronization in the sending
569 function. The only change could be the suppression of line~11 that is not useful
570 in this case. Apart from that, the function stays the same as
571 in \Lst{algo:ch6p2BasicAsyncSendings}.
573 In the reception function, given in \Lst{algo:ch6p2SyncReceptions}, there are
574 mainly two insertions (in lines~19-30 and 31-40), corresponding to the
575 additional types of messages to receive. There is also the insertion of three
576 variables that are used for the receptions of the new message types. In
577 lines~24-29 and 34-39 are located messages counting and mutex unlocking
578 mechanisms that are used to block the computing thread at the corresponding steps of its
579 execution. They are similar to the mechanism used for managing the end messages
580 at the end of the entire process. Line~23 directly updates the
581 number of other nodes that are in local convergence by adding the
582 received state of the source node. This is possible due to the encoding that is used to
583 represent the local convergence (1) and the non-convergence (0).
585 %\begin{algorithm}[H]
586 % \caption{Reception function in the synchronized scheme.}
587 % \label{algo:ch6p2SyncReceptions}
588 \begin{Listing}{algo:ch6p2SyncReceptions}{Reception function in the synchronized scheme}
589 // Variables declarations and initialization
591 int nbStateMsg = 0; // Number of local state messages received
592 int nbIterMsg = 0; // Number of "end of iteration" messages received
593 char recvdState; // Received state from another node (0 or 1)
595 // Main loop of receptions
597 // Waiting for an incoming message
598 MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
600 switch(status.MPI_TAG){ // Actions related to message type
601 case tagCom: // Management of data messages
604 case tagEnd: // Management of termination messages
607 case tagState: // Management of local state messages
608 // Actual reception of the message
609 MPI_Recv(&recvdState, 1, MPI_CHAR, status.MPI_SOURCE, tagState, MPI_COMM_WORLD, &status);
610 // Updates of numbers of stabilized nodes and received state msgs
611 nbOtherCVs += recvdState;
613 // Unlocking of the computing thread when states of all other nodes are received
614 if(nbStateMsg == nbP-1){
616 omp_unset_lock(&lockStates);
619 case tagIter: // Management of "end of iteration" messages
620 // Actual reception of the message in dummy buffer
621 MPI_Recv(dummyBuffer, 1, MPI_CHAR, status.MPI_SOURCE, tagIter, MPI_COMM_WORLD, &status);
622 nbIterMsg++; // Update of the nb of iteration messages
623 // Unlocking of the computing thread when iteration messages are received from all other nodes
624 if(nbIterMsg == nbP - 1){
626 omp_unset_lock(&lockIter);
633 // Reception of pending messages and counting of end messages
634 do{ // Loop over the remaining incoming/waited messages
636 }while(arrived == 1 || nbEndMsg < nbDeps + 1);
640 Now that we can synchronize our asynchronous scheme, the final step is to
641 dynamically alternate the two operating modes in order to regularly check the
642 global convergence of the iterative process. This is detailed in the following
643 paragraph together with the inclusion of GPU computing in the final asynchronous
646 \subsection{Asynchronous scheme using MPI, OpenMP and CUDA}
647 \label{ch6:p2GPUAsync}
649 As mentioned above, the strategy proposed to obtain a good compromise between
650 simplicity and efficiency in the asynchronous scheme is to dynamically change
651 the operating mode of the process. A good way to obtain a maximal
652 simplification of the final scheme while preserving good performance is to
653 perform local and global convergence detections only in synchronous
654 mode. Moreover, as two successive iterations are sufficient in synchronous mode
655 to detect local and global convergences, the key is to alternate some
656 asynchronous iterations with two synchronous iterations until
659 The last problem is to decide \emph{when} to switch from the
660 asynchronous to the synchronous mode. Here again, for the sake of simplicity, any
661 asynchronous mechanism for \emph{detecting} such instant is avoided and we
662 prefer to use a mechanism that is local to each node. Obviously, that local system must
663 rely neither on the number of local iterations done nor on the local
664 convergence. The former would slow down the fastest nodes according to the
665 slowest ones. The latter would provoke too much synchronization because the
666 residuals on all nodes commonly do not evolve in the same way and, in most
667 cases, there is a convergence wave phenomenon throughout the elements. So, a
668 good solution is to insert a local timer mechanism on each node with a given
669 initial duration. Then, that duration may be modified during the execution
670 according to the successive results of the synchronous sections.
672 Another problem induced by entering synchronous mode from the asynchronous one
673 is the possibility to receive some data messages
674 from previous asynchronous iterations during synchronous iterations. This could lead to deadlocks. In order
675 to avoid this, a wait of the end of previous send is added to the
676 transition between the two modes. This is implemented by replacing the variable
677 \texttt{SendsInProgress} by a mutex \texttt{lockSendsDone} which is unlocked
678 once all the messages have been sent in the sending function. Moreover, it is
679 also necessary to stamp data messages\index{message!stamping} (by the function \texttt{stampData}) with
680 a Boolean indicating whether they have been sent during a synchronous or
681 asynchronous iteration. Then, the \texttt{lockRec} mutex is unlocked only
682 after to the complete reception of data messages from synchronous
683 iterations. The message ordering of point-to-point communications in MPI
684 together with the barrier at the end of each iteration ensure two important
685 properties of this mechanism. First, data messages from previous
686 asynchronous iterations will be received but not taken into account during
687 synchronous sections. Then, a data message from a synchronous
688 iteration cannot be received in another synchronous iteration. In the
689 asynchronous sections, no additional mechanism is needed as there are no such
690 constraints concerning the data receptions.
692 Finally, the required modifications of the previous scheme
693 % to make it dynamically change its operating mode (asynchronous or synchronous)
694 are mainly related to the computing thread. Small additions or modifications
695 are also required in the main process and the other threads.
697 In the main process, two new variables are added to store respectively the main
698 operating mode of the iterative process (\texttt{mainMode}) and the duration of
699 asynchronous sections (\texttt{asyncDuration}). Those variables are
700 initialized by the programmer. The \texttt{lockSendsDone} mutex is also declared,
701 initialized (locked) and destroyed with the other mutex in this process.
703 In the computing function, shown in \Lst{algo:ch6p2AsyncSyncComp}, the
704 modifications consist of the insertion of the timer mechanism and of the conditions
705 to differentiate the actions to be done in each mode. Some additional variables
706 are also required to store the current operating mode in action during the
707 execution (\texttt{curMode}), the starting time of the current asynchronous
708 section (\texttt{asyncStart}) and the number of successive synchronous
709 iterations done (\texttt{nbSyncIter}).
711 %\begin{algorithm}[H]
712 % \caption{Computing function in the final asynchronous scheme.}% without GPU computing.}
713 % \label{algo:ch6p2AsyncSyncComp}
715 \begin{Listing}{algo:ch6p2AsyncSyncComp}{Computing function in the final asynchronous scheme}% without GPU computing.}
716 // Variables declarations and initialization
718 OpMode curMode = SYNC; // Current operating mode (always begin in sync)
719 double asyncStart; // Starting time of the current async section
720 int nbSyncIter = 0; // Number of sync iterations done in async mode
724 // Determination of the dynamic operating mode
725 if(curMode == ASYNC){
726 // Entering synchronous mode when asyncDuration is reached
727 @% // (additional conditions can be specified if needed)
728 @ if(MPI_Wtime() - asyncStart >= asyncDuration){
729 // Waiting for the end of previous sends before starting sync mode
730 omp_set_lock(&lockSendsDone);
731 curMode = SYNC; // Entering synchronous mode
732 stampData(dataToSend, SYNC); // Mark data to send with sync flag
736 // In main async mode, going back to async mode when the max number of sync iterations are done
737 if(mainMode == ASYNC){
738 nbSyncIter++; // Update of the number of sync iterations done
740 curMode = ASYNC; // Going back to async mode
741 stampData(dataToSend, ASYNC); // Mark data to send
742 asyncStart = MPI_Wtime(); // Get the async starting time
747 // Sendings of data dependencies
748 if(curMode == SYNC || !SendsInProgress){
752 // Blocking data receptions in sync mode
754 omp_set_lock(&lockRec);
758 // (init of residual, arrays swapping and iteration computation)
761 // Checking of convergences (local & global) only in sync mode
763 // Local convergence checking (residual under threshold)
765 // Blocking global exchange of local states of the nodes
767 // Determination of global convergence (all nodes in local CV)
768 // Stop of the iterative process and sending of end messages
769 // or Re-initialization of state information and iteration barrier
774 // Updating of the iteration number
780 In the sending function, the only modification is the replacement in line~11 of
781 the assignment of variable \texttt{SendsInProgress} by the unlocking of
782 \texttt{lockSendsDone}. Finally, in the reception function, the only
783 modification is the insertion before line~19
784 of \Lst{algo:ch6p2BasicAsyncReceptions} of the extraction of the stamp from the
785 message and its counting among the receipts only if the stamp is \texttt{SYNC}.
787 The final step to get our complete scheme using GPU is to insert the GPU
788 management in the computing thread. The first possibility, detailed
789 in \Lst{algo:ch6p2syncGPU}, is to simply replace the
790 CPU kernel (lines~41-43 in \Lst{algo:ch6p2AsyncSyncComp}) by a blocking GPU kernel call. This includes data
791 transfers from the node RAM to the GPU RAM, the launching of the GPU kernel, the
792 waiting for kernel completion and the results transfers from GPU RAM to
795 %\begin{algorithm}[H]
796 % \caption{Computing function in the final asynchronous scheme.}
797 % \label{algo:ch6p2syncGPU}
798 \begin{Listing}{algo:ch6p2syncGPU}{Computing function in the final asynchronous scheme}
799 // Variables declarations and initialization
801 dim3 Dg, Db; // CUDA kernel grids
805 // Determination of the dynamic operating mode, sendings of data dependencies and blocking data receptions in sync mode
807 // Local GPU computation
808 // Data transfers from node RAM to GPU
809 CHECK_CUDA_SUCCESS(cudaMemcpyToSymbol(dataOnGPU, dataInRAM, inputsSize, 0, cudaMemcpyHostToDevice), "Data transfer");
810 ... // There may be several data transfers: typically A and b in linear problems
811 // GPU grid definition
812 Db.x = BLOCK_SIZE_X; // BLOCK_SIZE_# are kernel design dependent
815 Dg.x = localSize/BLOCK_SIZE_X + (localSize%BLOCK_SIZE_X ? 1 : 0);
816 Dg.y = localSize/BLOCK_SIZE_Y + (localSize%BLOCK_SIZE_Y ? 1 : 0);
817 Dg.z = localSize/BLOCK_SIZE_Z + (localSize%BLOCK_SIZE_Z ? 1 : 0);
818 // Use of shared memory (when possible)
819 cudaFuncSetCacheConfig(gpuKernelName, cudaFuncCachePreferShared);
821 gpuKernelName<<<Dg,Db>>>(... @\emph{kernel parameters}@ ...);
822 // Waiting for kernel completion
823 cudaDeviceSynchronize();
824 // Results transfer from GPU to node RAM
825 CHECK_CUDA_SUCCESS(cudaMemcpyFromSymbol(resultsInRam, resultsOnGPU, resultsSize, 0, cudaMemcpyDeviceToHost), "Results transfer");
826 // Potential post-treatment of results on the CPU
829 // Convergences checking
835 This scheme provides asynchronism through a cluster of GPUs as well as a
836 complete overlap of communications with GPU computations (similarly
837 to~\Sec{ch6:part1}). However, the autonomy of GPU devices according to their
838 host can be further exploited in order to perform some computations on the CPU
839 while the GPU kernel is running. The nature of computations that can be done by
840 the CPU may vary depending on the application. For example, when processing data
841 streams (pipelines), pre-processing of next data item and/or post-processing of
842 previous result can be done on the CPU while the GPU is processing the current
843 data item. In other cases, the CPU can perform \emph{auxiliary}
844 computations\index{computation!auxiliary}
845 that are not absolutely required to obtain the result but that may accelerate
846 the entire iterative process. Another possibility would be to distribute the
847 main computations between the GPU and CPU. However, this
848 usually leads to poor performance increases. This is mainly due to data
849 dependencies that often require additional transfers between CPU and GPU.
851 So, if we consider that the application enables such overlap of
852 computations, its implementation is straightforward as it consists in inserting
853 the additional CPU computations between lines~23 and~24
854 in \Lst{algo:ch6p2syncGPU}. Nevertheless, such scheme is fully efficient only
855 if the computation times on both sides are similar.
857 In some cases, especially with auxiliary computations, another interesting
858 solution is to add a fourth CPU thread to perform them. This suppresses the
859 duration constraint over those optional computations as they are performed in
860 parallel of the main iterative process, without blocking it. Moreover, this
861 scheme stays coherent with current architectures as most nodes include four CPU
862 cores. The algorithmic scheme of such context of complete overlap of
863 CPU/GPU computations and communications is described in
864 Listings~\ref{algo:ch6p2FullOverAsyncMain},~\ref{algo:ch6p2FullOverAsyncComp1}
865 and~\ref{algo:ch6p2FullOverAsyncComp2}, where we suppose that auxiliary
866 computations use intermediate results of the main computation process from any previous iteration. This may be
867 different according to the application.
869 %\begin{algorithm}[H]
870 % \caption{Initialization of the main process of complete overlap with asynchronism.}
871 % \label{algo:ch6p2FullOverAsyncMain}
873 \begin{Listing}{algo:ch6p2FullOverAsyncMain}{Initialization of the main process of complete overlap with asynchronism}
874 // Variables declarations and initialization
876 omp_lock_t lockAux; // Informs main thread about new aux results
877 omp_lock_t lockRes; // Informs aux thread about new results
878 omp_lock_t lockWrite; // Controls exclusion of results access
879 ... auxRes ... ; // Results of auxiliary computations
881 // Parameters reading, MPI initialization, data initialization and distribution
883 // OpenMP initialization
885 omp_init_lock(&lockAux);
886 omp_set_lock(&lockAux); // Unlocked when new aux results are available
887 omp_init_lock(&lockRes);
888 omp_set_lock(&lockRes); // Unlocked when new results are available
889 omp_init_lock(&lockWrite);
890 omp_unset_lock(&lockWrite); // Controls access to results from threads
894 switch(omp_get_thread_num()){
896 computations(... @\emph{relevant parameters}@ ...);
900 auxComps(... @\emph{relevant parameters}@ ...);
913 // Cleaning of OpenMP locks
915 omp_test_lock(&lockAux);
916 omp_unset_lock(&lockAux);
917 omp_destroy_lock(&lockAux);
918 omp_test_lock(&lockRes);
919 omp_unset_lock(&lockRes);
920 omp_destroy_lock(&lockRes);
921 omp_test_lock(&lockWrite);
922 omp_unset_lock(&lockWrite);
923 omp_destroy_lock(&lockWrite);
930 %\begin{algorithm}[H]
931 % \caption{Computing function in the final asynchronous scheme with CPU/GPU overlap.}
932 % \label{algo:ch6p2FullOverAsyncComp1}
934 \begin{Listing}{algo:ch6p2FullOverAsyncComp1}{Computing function in the final asynchronous scheme with CPU/GPU overlap}
935 // Variables declarations and initialization
937 dim3 Dg, Db; // CUDA kernel grids
941 // Determination of the dynamic operating mode, sendings of data dependencies and blocking data receptions in sync mode
943 // Local GPU computation
944 // Data transfers from node RAM to GPU, GPU grid definition and init of shared mem
945 CHECK_CUDA_SUCCESS(cudaMemcpyToSymbol(dataOnGPU, dataInRAM, inputsSize, 0, cudaMemcpyHostToDevice), "Data transfer");
948 gpuKernelName<<<Dg,Db>>>(... @\emph{kernel parameters}@ ...);
949 // Potential pre/post-treatments in pipeline like computations
951 // Waiting for kernel completion
952 cudaDeviceSynchronize();
953 // Results transfer from GPU to node RAM
954 omp_set_lock(&lockWrite); // Wait for write access to resultsInRam
955 CHECK_CUDA_SUCCESS(cudaMemcpyFromSymbol(resultsInRam, resultsOnGPU, resultsSize, 0, cudaMemcpyDeviceToHost), "Results transfer");
956 // Potential post-treatments in non-pipeline computations
958 omp_unset_lock(&lockWrite); // Give back read access to aux thread
959 omp_test_lock(&lockRes);
960 omp_unset_lock(&lockRes); // Informs aux thread of new results
962 // Auxiliary computations availability checking
963 if(omp_test_lock(&lockAux)){
964 // Use auxRes to update the iterative process
965 ... // May induce additional GPU transfers
968 // Convergences checking
970 // Local convergence checking and global exchange of local states
972 // Determination of global convergence (all nodes in local CV)
973 if(cvLocale == 1 && nbCVLocales == nbP-1){
974 // Stop of the iterative process and sending of end messages
976 // Unlocking of aux thread for termination
977 omp_test_lock(&lockRes);
978 omp_unset_lock(&lockRes);
980 // Re-initialization of state information and iteration barrier
988 %\begin{algorithm}[H]
989 % \caption{Auxiliary computing function in the final asynchronous scheme with CPU/GPU overlap.}
990 % \label{algo:ch6p2FullOverAsyncComp2}
992 \begin{Listing}{algo:ch6p2FullOverAsyncComp2}{Auxiliary computing function in the final asynchronous scheme with CPU/GPU overlap}
993 // Variables declarations and initialization
994 ... auxInput ... // Local array for input data
998 // Data copy from resultsInRam into auxInput
999 omp_set_lock(&lockRes); // Waiting for new results from main comps
1001 omp_set_lock(&lockWrite); // Waiting for access to results
1002 for(ind=0; ind<resultsSize; ++ind){
1003 auxInput[ind] = resultsInRam[ind];
1005 omp_unset_lock(&lockWrite); // Give back write access to main thread
1006 // Auxiliary computations with possible interruption at the end
1007 for(ind=0; ind<auxSize && !Finished; ++ind){
1008 // Computation of auxRes array according to auxInput
1011 // Informs main thread that new aux results are available in auxData
1012 omp_test_lock(&lockAux); // Ensures mutex is locked when unlocking
1013 omp_unset_lock(&lockAux);
1019 As can be seen in \Lst{algo:ch6p2FullOverAsyncMain}, there are three additional
1020 mutex (\texttt{lockAux}, \texttt{lockRes} and \texttt{lockWrite}) that are used
1021 respectively to inform the main computation thread that new auxiliary results
1022 are available (lines~20-21 in \Lst{algo:ch6p2FullOverAsyncComp2} and line~29 in
1023 \Lst{algo:ch6p2FullOverAsyncComp1}), to inform the auxiliary thread that new
1024 results from the main thread are available (lines~25-26
1025 in \Lst{algo:ch6p2FullOverAsyncComp1} and line~7
1026 in \Lst{algo:ch6p2FullOverAsyncComp2}), and to perform exclusive accesses to the
1027 results from those two threads (lines~20,~24
1028 in \Lst{algo:ch6p2FullOverAsyncComp1} and 9,~13
1029 in \Lst{algo:ch6p2FullOverAsyncComp2}). Also, an additional array
1030 (\texttt{auxRes}) is required to store the results of the auxiliary computations
1031 as well as a local array for the input of the auxiliary function
1032 (\texttt{auxInput}). That last function has the same general organization as the
1033 send/receive ones, that is a global loop conditioned by the end of the global
1034 process. At each iteration in this function, the thread waits for the
1035 availability of new results produced by the main computation thread. This avoids
1036 to perform the same computations several times with the same input data.
1037 Then, input data of auxiliary computations
1038 % (as supposed here, they often
1039 % correspond to the results of the main computations, but may sometimes be
1041 is copied with a mutual exclusion mechanism. Finally, auxiliary
1042 computations are performed. When they are completed, the associated mutex is
1043 unlocked to signal the availability of those auxiliary results to the main
1044 computing thread. The main thread regularly checks this availability at the end
1045 of its iterations and takes them into account whenever this is possible.
1047 Finally, we obtain an algorithmic scheme allowing maximal overlap between
1048 CPU and GPU computations as well as communications. It is worth noticing that
1049 such scheme is also usable for systems without GPUs but 4-cores nodes.
1051 \subsection{Experimental validation}
1052 \label{sec:ch6p2expes}
1054 As in~\Sec{ch6:part1}, we validate the feasibility of our asynchronous scheme
1055 with some experiments performed with a representative example of scientific
1056 application. It is a three-dimensional version of the
1057 advection-diffusion-reaction process\index{PDE example} that models the evolution of the
1058 concentrations of two chemical species in shallow waters. As this process is
1059 dynamic in time, the simulation is performed for a given number of consecutive
1060 time steps. This implies two nested loops in the iterative process, the outer
1061 one for the time steps and the inner one for solving the problem at each time.
1062 Full details about this PDE problem can be found in~\cite{ChapNRJ2011}. That
1063 two-stage iterative process implies a few adaptations of the general scheme
1064 presented above in order to include the outer iterations over the time steps, but the
1065 inner iterative process closely follows the same scheme.
1067 We show two series of experiments performed with 16 nodes of the first cluster
1068 described in~\Sec{ch6:p1expes}. The first one deals with the comparison of
1069 synchronous and asynchronous computations. The second one is related to the use
1070 of auxiliary computations. In the context of our PDE application, they consist in
1071 the update of the Jacobian of the system.
1073 \subsubsection*{Synchronous and asynchronous computations}
1075 The first experiment allows us to check that the asynchronous behavior obtained
1076 with our scheme corresponds to the expected one according to its synchronous
1077 counterpart. So, we show in~\Fig{fig:ch6p2syncasync} the computation times of
1078 our test application in both modes for different problem sizes. The size shown
1079 is the number of discrete spatial elements on each side of the cube representing
1080 the 3D volume. Moreover, for each of these elements, there are the
1081 concentrations of the two chemical species considered. So, for example, size 30
1082 corresponds in fact to $30\times30\times30\times2$ values.
1086 \includegraphics[width=.75\columnwidth]{Chapters/chapter6/curves/syncasync.pdf}
1087 \caption{Computation times of the test application in synchronous and
1088 asynchronous modes.}
1089 \label{fig:ch6p2syncasync}
1092 The results obtained show that the asynchronous version is sensibly faster than
1093 the synchronous one for smaller problem sizes, then it becomes similar or even
1094 a bit slower for larger problem sizes. A closer comparison of computation and
1095 communication times in each execution confirms that this behavior is consistent.
1096 The asynchronous version is interesting if communication time
1097 is similar or larger than computation time. In our example, this is the
1098 case up to a problem size between 50 and 60. Then, computations become longer
1099 than communications. Since asynchronous computations often require more
1100 iterations to converge, the gain obtained on the communication side becomes smaller
1101 than the overhead generated on the computation side, and the asynchronous version
1103 % So, the observed behavior is fully coherent with the expected behavior.
1105 \subsubsection*{Overlap of auxiliary computations}
1107 In this experiment, we use only the asynchronous version of the application. In
1108 the context of our test application, we have an iterative PDE solver based on
1109 Netwon resolution. Such solvers are written under the form
1110 $x=T(x),~x\in\Reals^n$ where $T(x)=x-F'(x)^{-1}F(x)$ and $F'$ is the Jacobian of
1111 the system. In such cases, it is necessary to compute the vector $\Delta x$ in
1112 $F'\times \Delta x=-F$ to update $x$ with $\Delta x$. There are two levels of
1113 iterations, the inner level to get a stabilized version of $x$, and the outer
1114 level to compute $x$ at the successive time steps in the simulation process. In
1115 this context, classical algorithms either compute $F'$ only at the first iteration
1116 of each time step or at some iterations but not all because the computation of $F'$ is done
1117 in the main iterative process and it has a relatively high computing cost.
1119 However, with the scheme presented above, it is possible to continuously compute
1120 new versions of $F'$ in parallel to the main iterative process without
1121 penalizing it. Hence, $F'$ is updated as often as possible and taken into
1122 account in the main computations when it is relevant. So, the Newton process
1123 should be accelerated a little bit.
1125 We compare the performance obtained with overlapped Jacobian updatings and
1126 non-overlapped ones for several problem sizes, see~\Fig{fig:ch6p2aux}.
1129 \includegraphics[width=.75\columnwidth]{Chapters/chapter6/curves/recouvs.pdf}
1130 \caption{Computation times with or without overlap of Jacobian updatings
1131 in asynchronous mode.}
1132 \label{fig:ch6p2aux}
1135 The overlap is clearly efficient as the computation times with overlapping
1136 Jacobian updatings are much better than without overlap. Moreover, the ratio
1137 between the two versions tend to increase with the problem size, which is as
1138 expected. Also, we have tested the application without auxiliary computations at
1139 all, that is, the Jacobian is computed only once at the beginning of each time
1140 step of the simulation. The results for this last version are quite similar to
1141 the overlapped auxiliary computations, and even better for small problem sizes.
1142 The fact that no sensible gain can be seen on this range of problem sizes is due
1143 to the limited number of Jacobian updates taken into account in the main
1144 computation. This happens when the Jacobian update is as long as
1145 several iterations of the main process. So, the benefit is reduced in this
1148 Those results show two things; first, auxiliary computations do not induce great
1149 overhead in the whole process. Second, for this particular application the
1150 choice of updating the Jacobian matrix as auxiliary computations does not speed
1151 up the iterative process. This does not question the parallel scheme in itself
1152 but merely points out the difficulty to identify relevant auxiliary
1153 computations. Indeed, this identification depends on the considered application
1154 and requires a profound specialized analysis.
1156 Another interesting choice could be the computation of load estimation for
1157 dynamic load balancing, especially in decentralized diffusion strategies where
1158 loads are transferred between neighboring nodes~\cite{BCVG11}. In such case,
1159 the load evaluation and the comparison with other nodes can be done in parallel
1160 of the main computations without perturbing them.
1162 %%% Local Variables:
1165 %%% ispell-dictionary: "american"
1167 %%% TeX-master: "../../BookGPU.tex"