Writing Simpler and Faster MPI Applications
The goal of this article is to show the similarities between user-interface (UI) events and the receipt of MPI messages, and to develop this idea into a framework or layer that abstracts the complexities of Message Passing Interface (MPI) applications. Such a layer simplifies their development and maintenance and, in doing so, improves the throughput and decreases the runtime of the application. This method circumvents the usual problems associated with running MPI applications on heterogeneous architectures or on networked computer systems where the applications are competing for resources.
MPI applications are computer programs that are written to run on multiple CPUs or even multiple cores of a
single CPU. These programs use the Message Passing Interface (MPI) communications protocol
and are started by an MPI-specific command (usually mpiexec).
The MPI startup command loads the requested number of copies of the program on however many named hosts are known to the command.
For instance, this command will start 24 copies of the hostname application:
mpiexec -n 24 hostname
All 24 copies of the
hostname application may be started on the current machine, or on 24 connected CPUs,
depending upon how many individual CPUs are connected together in the system and available to the MPI startup command.
MPI supplies every running instance with its identifying number and with the total number of program copies
running. These instance numbers are referred to as the rank in MPI terminology and run from 0 up to one less than the
number requested (the
-n parameter in the above command). It is up to the program to decide what, if anything,
to do with these numbers. The application may be written to take the problem at hand and divide the work among
as many nodes as are available, so that each node simply does its portion of the work (based on its rank and the total number of nodes)
and then exits. While this is undoubtedly the simplest way to make use of the MPI system, it is not necessarily
the most efficient, and certainly not the only one.
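The static division described above can be sketched in a few lines. This is only an illustration: the function name my_share is hypothetical, and the rank and size values are looped over here so that the sketch runs without MPI (under MPI they would be supplied by the library, for example by comm.Get_rank() and comm.Get_size() in mpi4py).

```python
def my_share(rank, size, n_items):
    """Return the half-open range of item indices owned by `rank`.

    Items are split as evenly as possible; the first `n_items % size`
    ranks each take one extra item.
    """
    base, extra = divmod(n_items, size)
    start = rank * base + min(rank, extra)
    stop = start + base + (1 if rank < extra else 0)
    return range(start, stop)


# Stand-in for four MPI instances dividing ten work items among themselves.
size = 4
for rank in range(size):
    print(rank, list(my_share(rank, size, 10)))
```

Each instance computes its own share from nothing but its rank and the total count, so no messages are needed; the cost is that the split cannot adapt once the program is running.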
In addition to the rank and total number described above, MPI allows one instance to communicate with another
instance using the MPI_Send and MPI_Recv methods (I'll stick to these in this paper and only recommend the use of
MPI_Probe as a method to achieve even better throughput).
These communications are point-to-point rendezvous where
one instance sends a message to another instance by number. The sending node sends a message of a specific
type to a receiving node which must have the matching receive function call. The receiver may be set to
receive the message from any node in the system (a special MPI flag,
MPI_ANY_SOURCE) or from one
specific node, but the message types must match
[Kendall] (MPI_Probe can be used to relax this requirement).
MPI also allows one node to broadcast a message to all the other nodes with the MPI_Bcast [LLNL] method. All the other instances rendezvous with the sender and receive a copy of the message.
There are additional MPI methods for sending messages, some of which are also blocking, and some of which are
not. The non-blocking methods include MPI_Isend and MPI_Irecv.
The difficulty with using these is the
requirement for the application to periodically check to see if the message is properly sent before destroying
the outgoing message or reclaiming that memory for other purposes.
If the sender is rank n of Figure 1, and the receiver rank m, and the sender sends before the receiver has reached that particular rendezvous in the code for blocking or synchronous communications, the sending application waits until the message is delivered. If the receiver is rank n, and it reaches that part of the code before rank m sends the message, it waits until the message is delivered. In either case, this can lead to inefficiencies while nodes wait for the rendezvous to complete. The MPI alternative to synchronized communications is asynchronous, but this increases the complexity of the application, which must periodically check the status of the transmission.
Communications between instances running on different cores of one CPU are much faster than those going across the network to other instances of the application [KNEM]. This communications heterogeneity between separate CPUs or between cores on a single CPU is one reason why simply dividing the work across all instances and forgoing communications altogether is often preferred over synchronized communications. It also restricts the type of problem that is generally solved by using MPI in the first place to problems that lend themselves to a static divide-and-conquer approach.
UI and MPI Commonality
User Interface (UI) frameworks are not unlike MPI communications in that a message may arrive at any time. In UI frameworks like Qt [Qt], the application registers delegates (methods or functions) that the framework invokes when it receives events from the user, internal timers, or file system operations. When the event happens, the framework invokes the previously registered method and passes it the event information and waits for the thread of control to be returned. This is a standard architectural design principle or software pattern commonly used in Object Oriented Programming [Fowler]. This keeps the user application from having to check and wait for events - the framework does this. (It also means that the framework may seem to "lock-up" while the user application is doing something that takes longer than anticipated.) This approach effectively gives overall control of the execution to the framework until some previously specified event causes the framework to return control to the application; usually for termination. This is called Inversion of Control (IoC) [Busoli] because the framework is seen to be in control instead of the user application. This same thing is also done for the UI and networking parts of mobile phone applications [Haller].
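As a rough illustration of Inversion of Control, the sketch below shows a toy event loop that owns the flow of execution and calls back into the application. It is not Qt or any real UI toolkit; the class EventLoop, its methods, and the event names "click" and "quit" are all invented for this example.

```python
from collections import deque


class EventLoop:
    """Toy framework: owns the loop and calls back into the application."""

    def __init__(self):
        self._handlers = {}
        self._pending = deque()
        self._running = False

    def register(self, event_name, handler):
        self._handlers[event_name] = handler

    def post(self, event_name, payload=None):
        self._pending.append((event_name, payload))

    def stop(self):
        self._running = False

    def run(self):
        # The framework, not the application, decides what runs next.
        self._running = True
        while self._running and self._pending:
            event_name, payload = self._pending.popleft()
            handler = self._handlers.get(event_name)
            if handler is not None:
                handler(payload)


log = []
loop = EventLoop()
loop.register("click", lambda pos: log.append(("click", pos)))
loop.register("quit", lambda _: loop.stop())
loop.post("click", (10, 20))
loop.post("quit")
loop.post("click", (0, 0))   # never delivered: the loop has stopped by then
loop.run()
print(log)
```

The application code consists only of the registered handlers; it never polls for events itself, which is the property the rest of this article carries over to MPI messages.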
MPI applications are like UI applications in that they can receive asynchronous messages, but without a framework to simplify the processing of events, the programmer must code the program to check for the receipt of incoming messages. Messages should be processed in the order in which they are received (there is limited buffering of incoming messages), forcing the application to be written with the added complexity of knowing what messages will arrive and in which order [MPI]. Unfortunately, this means the MPI application that wants to communicate with another instance of itself running on another CPU or core must already be programmed to send and receive that message at the correct time between these two communicating instances. This makes traditional MPI programming more complex than writing UI applications and effectively limits how complex MPI applications can become.
The Proposed Framework
The framework suggested here is one that removes the complexity of the MPI communications from the user and places this in a separate layer to which the user application is linked or bound. The user application simply registers methods or functions to be invoked when messages of specific types arrive (just as is done with UI frameworks -- see Figure 2) and then the application calls the start method of the framework and waits on the framework to notify it of incoming messages or events. The framework also provides the application with a method for sending a message to another instance, and one for stopping the framework. This is all the application needs to know: it registers methods for when certain message types arrive and then starts the framework and waits to process the received messages.
When the framework starts on any node it gathers the capabilities of the current CPU (network name, amount of memory available, number of cores, &c.) and sends this information to the control node (rank 0). This is depicted graphically in Figure 3. The application on the control node may use this information for determining how much memory is being shared between the instances on named CPUs, or for other purposes. When started on the control node, the framework also invokes any registered startup method of the user application.
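A minimal sketch of the capability gathering follows, using only the Python standard library. The function names and the dictionary layout are assumptions made for illustration, not the format used by the example framework; available memory is left out because the standard library has no portable call for it. The reports dictionary stands in for what rank 0 might receive from four instances.

```python
import os
import socket
from collections import defaultdict


def local_capabilities():
    """Collect a few facts about the machine this instance runs on."""
    return {
        "host": socket.gethostname(),
        "cores": os.cpu_count(),
    }


def ranks_by_host(capabilities_by_rank):
    """Group ranks that reported the same host name (control-node side)."""
    groups = defaultdict(list)
    for rank, caps in sorted(capabilities_by_rank.items()):
        groups[caps["host"]].append(rank)
    return dict(groups)


# Stand-in for the reports that rank 0 would receive from four instances.
reports = {
    0: {"host": "machine1", "cores": 8},
    1: {"host": "machine1", "cores": 8},
    2: {"host": "machine2", "cores": 8},
    3: {"host": "machine2", "cores": 8},
}
print(local_capabilities())
print(ranks_by_host(reports))
```

Grouping by host name is one way the control node could work out which instances share a physical machine.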
The startup method and the method to receive the
CAPABILITIES message are both optional parameters supplied to
the framework on startup. Neither is required, but without one of them, the framework will sit idle until
some other event starts the communications (from a separate thread, for instance). When the
CAPABILITIES message is received (see Figure 4), the registered method will be invoked and given the data and the
number of the node which sent it.
The framework allows the user application to send any type of registered message to any instance at any time without any prior coordination. The only requirement is that both the sender and the receiver have registered a callback (delegate) for this type of message. This removes much of the complexity of writing MPI applications, and it simplifies maintenance, but it also makes the application run faster as I will show.
Applications written this way are basically in a
WAIT state until a message arrives. When the message
arrives, the framework checks the message against its list of registered callback methods, selects the
appropriate one, and invokes it with the message and the rank of the sending node as arguments. When
that method is done, the framework waits for the receipt of the next message. When executing one of
these registered methods, the application may also send messages to other nodes.
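The dispatch cycle just described can be simulated in a single process. In this sketch the Node class, its in-memory inbox, and the PING and PONG message types are all hypothetical stand-ins; a real framework would block on an MPI receive instead of polling a deque.

```python
from collections import deque


class Node:
    """One simulated instance: an inbox plus a table of registered callbacks."""

    def __init__(self, rank, network):
        self.rank = rank
        self.network = network          # shared dict: rank -> Node
        self.inbox = deque()
        self.callbacks = {}
        network[rank] = self

    def register(self, msg_type, callback):
        self.callbacks[msg_type] = callback

    def send(self, dest, msg_type, data=None):
        self.network[dest].inbox.append((msg_type, data, self.rank))

    def dispatch_one(self):
        """Leave the WAIT state for one message; return False if none waits."""
        if not self.inbox:
            return False
        msg_type, data, source = self.inbox.popleft()
        self.callbacks[msg_type](data, source)
        return True


network = {}
control, worker = Node(0, network), Node(1, network)
seen = []

# The worker answers every PING; the control node records each PONG.
worker.register("PING", lambda data, src: worker.send(src, "PONG", data * 2))
control.register("PONG", lambda data, src: seen.append((src, data)))

control.send(1, "PING", 21)
while any(node.dispatch_one() for node in (control, worker)):
    pass
print(seen)
```

Note that the worker's callback sends a message of its own while it is executing, which is the behaviour described in the last sentence above.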
When the framework sends a message to another node, it first sends a prefix message, which may indicate that a packet of data follows, or may be a simple notice that requires no additional data (this requirement can be relaxed with judicious use of MPI_Probe, noted earlier but not used in the accompanying demonstration code). Notices are messages that contain no data and are used for things like requests for more work. When the framework receives the prefix message it checks this against its list of registered notices and either invokes the registered method or waits for a following data packet from that node (see Figure 5).
The application sends a message to another node by calling the framework message sending method. That method checks to see if the message to send is a string (a simple notice) or an object, gets the appropriate number, and sends the information. This is shown in Figure 6.
A node may receive a message that contains work to be done, and when done it may send back a request for more work before returning control to the framework (as shown in Figure 7). With this scenario and carefully allotted work details, the node doing the work may be kept busy and only given more work when it has completed its earlier quota - no other scheduling or work leveling may be required.
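The request-driven hand-out of work can be sketched as a small single-process simulation. The function run_pull_schedule and the tuple layout of the messages are assumptions made for this example; only the WORK and WORK_REQUEST names come from the article. All simulated workers run at the same speed here, so the sketch shows the mechanism rather than any performance effect.

```python
from collections import deque


def run_pull_schedule(worklist, n_workers):
    """Simulate WORK / WORK_REQUEST traffic; return items done by each worker."""
    pending = deque(worklist)
    messages = deque()                      # entries: (dest, msg_type, data, source)
    done = {rank: [] for rank in range(1, n_workers + 1)}

    def on_work(rank, item, source):
        done[rank].append(item)             # "process" the item
        messages.append((source, "WORK_REQUEST", None, rank))

    def on_work_request(rank, _data, source):
        if pending:                         # hand out work only on request
            messages.append((source, "WORK", pending.popleft(), rank))

    handlers = {"WORK": on_work, "WORK_REQUEST": on_work_request}

    # The control node (rank 0) primes every worker with one item.
    for rank in done:
        if pending:
            messages.append((rank, "WORK", pending.popleft(), 0))

    while messages:
        dest, msg_type, data, source = messages.popleft()
        handlers[msg_type](dest, data, source)
    return done


result = run_pull_schedule(worklist=list(range(7)), n_workers=3)
print(result)
```

No scheduler appears anywhere in the code: the control node only ever reacts to a request, so a worker cannot be handed more than it has asked for.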
The complexity of the MPI communications is pushed to the framework and largely removed from the user
application, but the complexity still exists. For the framework to allow any node to send a message of
any registered type to any other node without prior coordination requires the framework to send a small
control or prefix message to the recipient node telling the framework on that node to switch over to only
receiving a message of a specific type from this sending node, if data follows. The framework always
waits for this small control message from any node in the system. This message contains the type of
message that may follow from this sending node, or may be a notice (like a
WORK_REQUEST) that has no
data, in which case the registered method for this simple message is invoked.
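The prefix-then-data exchange can be illustrated with an in-memory stand-in for the network. The Wire class, the dictionary used as the prefix, and the has_data flag are assumptions made for this sketch and are not the wire format of the example framework. The point is the receive pattern: accept a prefix from any source, then accept the data packet from that source only.

```python
ANY_SOURCE = None


class Wire:
    """In-memory stand-in for MPI point-to-point delivery to one node."""

    def __init__(self):
        self.queue = []

    def put(self, source, body):
        self.queue.append((source, body))

    def get(self, source=ANY_SOURCE):
        # Return the oldest message, optionally restricted to one sender.
        for i, (src, _body) in enumerate(self.queue):
            if source is ANY_SOURCE or src == source:
                return self.queue.pop(i)
        raise LookupError("no matching message")


def send(wire, source, msg_type, data=None):
    """Send a prefix naming the type; follow with data unless it is a notice."""
    wire.put(source, {"type": msg_type, "has_data": data is not None})
    if data is not None:
        wire.put(source, data)


def receive(wire, handlers):
    """Wait for any prefix, then fetch the data (if any) from that sender only."""
    source, prefix = wire.get(ANY_SOURCE)
    data = wire.get(source)[1] if prefix["has_data"] else None
    handlers[prefix["type"]](data, source)


calls = []
handlers = {
    "WORK": lambda data, src: calls.append(("WORK", src, data)),
    "WORK_REQUEST": lambda data, src: calls.append(("WORK_REQUEST", src, data)),
}

wire = Wire()
# Interleave traffic from two senders: rank 2's notice arrives between
# rank 1's prefix and rank 1's data packet.
wire.put(1, {"type": "WORK", "has_data": True})
send(wire, 2, "WORK_REQUEST")
wire.put(1, [1.0, 2.0, 3.0])

receive(wire, handlers)
receive(wire, handlers)
print(calls)
```

Because the second receive is restricted to the sender named by the prefix, the notice from rank 2 cannot be mistaken for rank 1's data even though it arrived first.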
Using this framework means that the user application may be based around a very simple state machine as
in Figures 7, 8, and 9.
When WORK arrives, the registered method is invoked, and when done, the node sends
WORK_REQUEST back to the sender. When a node receives the
WORK_REQUEST it may send the next bit of
WORK to the source (see Figure 6).
The very tight coupling of the application between nodes and messages
is all but eliminated, as each thread of execution based on the registered methods can be largely
implemented in isolation from the other threads of execution (see Figure 8).
This allows the application to be split up into individual threads of execution based solely on the receipt of and required processing of messages. The user application no longer needs to be overly concerned about traditional communication rendezvous (as shown in Figure 1). This also allows an instance to perform any specific function, or even change overall function during the program execution. For instance, nodes may be selected for handling I/O operations and buffering based on their proximity to other nodes that will be using those services. An instance may designate an arbitrary number of nodes to a particular function simply by sending them the appropriate message - usually from the control node, which was initially supplied with the capabilities of all the nodes and can determine things like which nodes are on the same physical CPU.
The framework only needs to supply the following methods:
- init (required for C, but implicit in Python)
- register_method_for_message_type (may or may not include data)
- stop (only actioned from rank 0)
- finalize (required for C, but implicit in Python)
The example framework
also implements a broadcast_message, but unlike the standard MPI_Bcast
method, which requires a rendezvous of all nodes, this simply sends the message to all the other nodes,
which will process it when they next enter the WAIT state.
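A broadcast built from point-to-point sends might look like the sketch below. The signature shown is an assumption (the article names broadcast_message but does not give its parameters), the send argument is a hypothetical stand-in for the framework's sending method, and "STOP" is an invented message type.

```python
def broadcast_message(send, my_rank, size, msg_type, data=None):
    """Send one point-to-point message to every rank except the caller."""
    for dest in range(size):
        if dest != my_rank:
            send(dest, msg_type, data)


sent = []
broadcast_message(
    send=lambda dest, msg_type, data: sent.append((dest, msg_type, data)),
    my_rank=0,
    size=4,
    msg_type="STOP",
)
print(sent)
```

Since each recipient handles the message through its ordinary dispatch loop, no node has to be waiting at a matching collective call when the message is sent.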
Two applications were created to show the cost of sending the additional control messages between an arbitrary number of nodes, and to demonstrate the impact these have on throughput. The applications share most of their code, and only differ in how they handle the MPI communications: the first uses a traditional method of dividing the work into the number of available worker nodes, whereas the second sends work to the worker nodes on request. These two applications together with the framework and related files are all available on the GitHub site previously referenced.
These two test programs implement a very simple version of Kirchhoff 3D Time Migration [DMT]; spreading randomly generated columns of data through an image volume. The columns of data are generated initially by the control node and distributed to the worker nodes. The algorithm spreads the data through the cube in circular fashion like expanding spheres from an (x,y) point on the surface. This migration algorithm was chosen because of the large amount of computation required at each worker node relative to the amount of data sent between the control node and the worker node. Less computationally intensive algorithms would tend to be more limited by I/O bounds rather than by CPU performance.
The TestDirect application creates the worklist on the control node (rank 0) and then sends the data to
the other nodes, resulting in an even distribution of work. When all the work is distributed, the control
node sends an empty packet of data to each worker indicating all the data is distributed and the worker
may begin processing the received work. This could have been done with an MPI broadcast message as well.
Except for the extra packet (one per worker node) to indicate the end of the data, there are no extra data
or message packets. And after distributing all the work, the control node switches to receiving the
resultant images from the worker nodes. The received images are summed together to produce the final
image volume before the application terminates.
The TestFramework application creates the same worklist and sends one piece of work at a time to the
worker nodes. When a worker node is ready for more work it sends a
WORK_REQUEST message to the control
node and waits for the next column of data. When it receives this work it processes it and asks for more
work by sending another request back to the control node. This continues until no more work is available,
at which point the control node sends a request for the worker to send the result with the IMAGE_REQUEST
message. It sums the individual image volumes the same as for the
TestDirect application (see the accompanying figure).
The TestFramework operations are all centered on a WAIT state. The control node sends one piece of work
to each of the worker nodes to start the process going, and then it too enters the
WAIT state. If the node
receives a column of data (the
WORK message) it processes it, requests more work (the WORK_REQUEST notice), and
returns to the
WAIT state. If the node receives a
WORK_REQUEST it checks the list and either sends a
column of data to the requesting node or sends a request for its image to be returned to the control
node (the IMAGE_REQUEST notice) if no more work is available, and then it enters the
WAIT state. If the
node receives a request for the image, it sends all pieces of the image back to the requesting node and
then enters the
WAIT state. If the node receives the pieces of the image, it sums them into the final
product, and if more work is still being done, it enters the
WAIT state; otherwise it sends the shut-down
message to all the nodes.
Unlike the TestDirect application, the framework used in the
TestFramework application sends a control or
prefix message prior to the data message, and another control message as the
WORK_REQUEST, as well as the
control messages before each of the columns of data in the resultant image. This is two extra messages for
each data column for the worker to process and one extra for each of the resultant columns of the image
going back to the control node - almost twice as many messages as are used in the TestDirect application.
For instance, when the
TestDirect application processes 1000 columns of data on 16 nodes (15 worker nodes), there are
1015 total messages sent from the control node to the workers, together with 800*12 messages (the example
size used for these tests - 9600 columns in the resultant image) from each of the 15 worker nodes (144,000
image columns) for a total of 145,015 individual MPI messages. The
TestFramework application sends the
1000 data columns together with 1000 control messages in response to the receipt of 985 WORK_REQUEST
messages. It also sends the 144,000 messages back to control but with an additional 144,000 control
messages for a total of 289,985 individual MPI messages. This is 145,015 messages versus 289,985. This
is a non-trivial number of additional MPI messages for the
TestFramework application (nearly twice the
number of messages as used in the TestDirect application).
If we wished to change the algorithm, the
TestDirect application would be difficult to modify, probably
requiring a complete rewrite of the main routine that sends and receives the messages to add additional
message types or other coordination messages. The
TestFramework application could easily be modified to
add additional message types together with whatever processing would be required for these new messages
by simply registering more methods to be invoked when the new messages are received, and to add whatever
code is required to generate the new messages. Since the application is being run from the WAIT state,
the messages are not restricted to being generated or processed on any specific node (control or worker).
The two applications were each run on a desktop computer with a shared "home directory" on a business network. The applications made use of two machines referred to as "machine1" and "machine2". Each physical machine has 8 (SMP [Anderson]) cores for a total of 16. Both machines were running the Ubuntu 14.04 LTS Operating System. For these tests, the applications were started on "machine1" with this command:
time mpiexec -n 16 -host machine1,machine2 TestXXX.py
This mpiexec command spreads the application across the two machines with 8 instances on each machine for one instance per core. These applications were run in isolation, in competition with each other, and in competition with another MPI application that was only running on one of the machines. The resultant execution times were recorded for each of these scenarios to make the following graphs.
The first figure (Figure 10) shows the execution times for the applications running independently.
In the absence of any appreciable competition for resources, the
TestFramework application is significantly
faster than the
TestDirect application for worklist lengths in excess of 1000. This is because the TestDirect
application, despite using fewer MPI messages, doesn't allow for differences in communication speed
between nodes on the same CPU and nodes across the network. MPI is extremely fast in sending messages
between nodes on the same machine, but significantly slower when sending messages between different
networked computers. The
TestFramework application naturally adjusts itself on each node because each
node requests work when it's ready for more. It also caters for different processing speeds and network
capabilities, which is completely absent from the
TestDirect worklist division.
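The self-adjusting behaviour can be illustrated with a toy timing model. Everything in it is hypothetical: the per-item costs are invented, communication time is ignored entirely, and the numbers are not the measurements reported in this article. It only shows why an even split is bounded by the slowest worker while request-driven hand-out is not.

```python
import heapq


def static_makespan(n_items, costs):
    """Even split up front: finish time is set by the slowest worker."""
    base, extra = divmod(n_items, len(costs))
    return max(
        (base + (1 if i < extra else 0)) * cost for i, cost in enumerate(costs)
    )


def pull_makespan(n_items, costs):
    """Each worker takes the next item as soon as it is free."""
    free_at = [(0.0, i) for i in range(len(costs))]   # (time free, worker)
    heapq.heapify(free_at)
    finish = 0.0
    for _ in range(n_items):
        t, i = heapq.heappop(free_at)
        t += costs[i]
        finish = max(finish, t)
        heapq.heappush(free_at, (t, i))
    return finish


# Hypothetical per-item costs: two fast workers and two that are 3x slower.
costs = [1.0, 1.0, 3.0, 3.0]
print(static_makespan(120, costs), pull_makespan(120, costs))
```

In this model the faster workers end up processing more items under the request-driven scheme, which is the effect attributed to the TestFramework application above.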
These tests were all run on the same two machines, and the variability is likely due to other network traffic or occasional use by others. The tests for each worklist length were run consecutively to avoid differences associated with network use at different times of the day. Many of the tests were run on weekends, evenings, and very early mornings to avoid interference from others using the network.
The second figure (Figure 11) shows the execution times for the applications running at the same time
and competing for resources. In this case, the
TestDirect application has a slight advantage because once the work
is distributed, it does fewer operations that might cause the operating system to perform a context switch
and switch to running the other application. This advantage is very
slight but noticeable for short worklists, where the execution time for the framework is greater than
that for the direct method. For larger worklists the
TestFramework application is surprisingly and consistently faster.
The next two figures (Figure 12 and Figure 13)
show the execution times for the applications running at the
same time and competing with a third application (another copy of the
TestDirect program) which is running
on only one of the two machines. The top one has the extra application on "machine2" and the lower has it
on "machine1". The
TestFramework application successfully caters for resource contention on the available network and
has a much better throughput than the
TestDirect application, which has no mechanism for redistributing the workload.
This difference becomes more noticeable as the amount of work increases.
Discussion and Conclusion
This work demonstrates the similarities between UI events and the receipt of MPI messages. It shows how this similarity naturally leads to the construction of a framework or layer to abstract the complexity of the MPI primitives and communications (much as is done for UI frameworks monitoring signals and events), and how an MPI application can be designed simply and executed efficiently in a heterogeneous environment that may be shared with other processes that may degrade the performance of MPI applications designed using traditional methods. This framework-based method allows the application to grow in complexity with additional message types and distributed functionality, opening up the use of MPI on networked desktop machines or clusters for solving a large variety of problems. The framework approach simplifies the construction of MPI applications in the same manner as UI frameworks simplify the construction of desktop applications.
The example framework on the GitHub site is a rudimentary one intended to serve as a basis and for training, and is available for expanding into a production-ready library. Some of the code is also available here:
- MPI_IoC.py - the framework itself
- TestFramework.py - the test application that uses the framework
- TestDirect.py - the test application that doesn't use the framework
- TestMessage.py - the test message used by both applications
These are just a few references for learning more about the topics. This paper is an original work and not an extension of other models or papers.