In Part 1 of this series we introduced a number of basic Cloud Haskell communication patterns. All these patterns will work fine for modest numbers of work items; in this blog post we're going to look at the performance and memory use for much larger numbers of work items, so we can see how each pattern scales up. We will see that scalability improves as we move from master-slave to work-pushing to work-stealing, and that many further improvements can still be made. More importantly, we hope to help you to predict runtime behaviour from your code, so that you can write more scalable and high-performance code.

Predicting resource use and performance

When thinking about Cloud Haskell performance you should consider things such as

  • Bandwidth. How many messages are you sending, and how large are those messages? Network bandwidth is typically several orders of magnitude lower than direct memory access, so it is important to be aware of the cost of messaging. In fact, an important design criterion in the development of Cloud Haskell was that this cost should be visible to the programmer.
  • Latency. It takes time for even the smallest message to arrive at a remote node. Synchronous operations wait for an acknowledgement from the remote endpoint and therefore imply a full network roundtrip, so they are often much slower than asynchronous operations, which fire off a message to the remote endpoint but don't wait to see when it arrives. For example, on many networks the bandwidth and latency are such that sending a hundred small messages in quick succession and waiting for one reply will take about the same time as sending just one message and waiting for a reply. The Cloud Haskell documentation will tell you which primitives are synchronous and which are not, but often you should be able to guess based on the result type. For example, send is asynchronous (result type ()), but spawn is synchronous: it sends a Closure to the remote endpoint, and then waits for the remote endpoint to reply with the process ID of the newly created process. (There is also an asynchronous version of spawn, as we will see below.)
  • Message Queue Size. When messages are sent to a process they will sit in that process's message queue (or "mailbox") until they are processed. These messages are kept in memory. How much memory this uses depends on the number of messages and their size.
  • Number of Processes. Cloud Haskell processes are implemented as Haskell threads plus some additional data. They are therefore relatively lightweight, but not free.
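To make the latency point concrete, here is a rough cost model. It is a sketch, not a measurement: the names (Link, syncTime, asyncTime) and the numbers in exampleLink are assumptions chosen for illustration, not figures taken from Cloud Haskell or from any real network.

```haskell
module Main where

-- | A toy network link. All fields are illustrative assumptions.
data Link = Link
  { oneWayLatency :: Double  -- ^ seconds for a message to reach the remote node
  , bandwidth     :: Double  -- ^ bytes per second
  }

-- | Time to push one message of the given size (in bytes) onto the wire.
transferTime :: Link -> Double -> Double
transferTime link size = size / bandwidth link

-- | n synchronous sends: each one waits for a full round trip before the next.
syncTime :: Link -> Int -> Double -> Double
syncTime link n size =
  fromIntegral n * (transferTime link size + 2 * oneWayLatency link)

-- | n asynchronous sends followed by waiting for a single reply:
-- the transfers go out back to back, and only one round trip is paid.
asyncTime :: Link -> Int -> Double -> Double
asyncTime link n size =
  fromIntegral n * transferTime link size + 2 * oneWayLatency link

-- | Hypothetical link: 0.5 ms one-way latency, 100 MB/s.
exampleLink :: Link
exampleLink = Link { oneWayLatency = 0.0005, bandwidth = 1.0e8 }

main :: IO ()
main = do
  let size = 100  -- bytes per message (assumed)
  putStrLn ("1 message, wait for reply:   " ++ show (asyncTime exampleLink 1 size))
  putStrLn ("100 messages, wait for one:  " ++ show (asyncTime exampleLink 100 size))
  putStrLn ("100 synchronous round trips: " ++ show (syncTime exampleLink 100 size))
```

Under these assumed numbers, a hundred asynchronous sends cost about the same as one (roughly 1.1 ms versus 1.0 ms), while a hundred synchronous round trips cost about a hundred times more.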

Before reading the analysis below, try to compare the Master-Slave, Work-Pushing and Work-Stealing examples in your head along these four dimensions.

The Master process in the Master-Slave setup

In the discussion of the master-slave setup we mentioned that spawning a separate process for each subcomputation up-front might be too resource intensive on the client side. What we didn't mention, however, is that the master process we defined there is also resource hungry.

[Figure: Spawn strategies. Heap profile of the master process in the Master-Slave example (single slave, factorizing the first n = 5000 numbers): (1) as defined above; (2) using reconnect; (3) using spawnAsync.]

Heap profile (1) shows clearly that the master process has a memory leak. But where does it come from? The problem arises when we spawn the child processes:

spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> 
    spawn there ($(mkClosure 'slave) (us, m))

In order to deal with network failure at any point during the spawning process, spawning involves sending a message to the newly started remote process that says "it's okay to start running now". In order to provide reliable and ordered message passing, Cloud Haskell must maintain some state for every destination that a process sends messages to. For the master process this means maintaining state for n outgoing connections, since the master starts a process for each subcomputation.

In a future version of Cloud Haskell this problem might be solved automatically, but for now we can solve it manually by using the reconnect primitive. The reconnect primitive is primarily meant to be used in situations where processes get disconnected from each other, but as a side effect it lets us release the connection state, and that's the effect we need for our workaround here:

spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    them <- spawn there ($(mkClosure 'slave) (us, m))
    reconnect them

This yields heap profile (2), above, and we see that the memory leak has disappeared.
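The effect of the workaround can be pictured with a toy model. This is only a sketch: Pid, ConnState, connect, release and liveAfter are hypothetical names introduced here, and the list of pids is not how Cloud Haskell actually stores its connection state. The model merely counts how many destinations still have state held for them after n spawns.

```haskell
module Main where

import Data.List (delete, foldl')

-- | Hypothetical stand-in for a process ID; real Cloud Haskell uses ProcessId.
type Pid = Int

-- | Toy model of the per-destination connection state held for the master.
-- This is NOT how Cloud Haskell stores it; it only tracks live entries.
type ConnState = [Pid]

-- | Talking to a destination creates state for it (if not already present).
connect :: Pid -> ConnState -> ConnState
connect pid st
  | pid `elem` st = st
  | otherwise     = pid : st

-- | Model of reconnect: drop the state kept for that destination.
release :: Pid -> ConnState -> ConnState
release = delete

-- | Apply a per-spawn step to n fresh pids and return the number of
-- destinations for which state is still held at the end.
liveAfter :: (Pid -> ConnState -> ConnState) -> Int -> Int
liveAfter step n = length (foldl' (flip step) [] [1 .. n])

main :: IO ()
main = do
  let n = 5000
  putStrLn ("spawn only:           " ++ show (liveAfter connect n))
  putStrLn ("spawn then reconnect: " ++ show (liveAfter (\p -> release p . connect p) n))
```

In this model the first line reports 5000 live entries and the second reports 0, which mirrors the difference between heap profiles (1) and (2).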

However, there is another problem with this master. If we compare performance, we find that it is more than an order of magnitude slower than the master process in the work-pushing version. The reason is that spawn is synchronous: we wait for the remote node to tell us the process ID of the newly started process. That means that the execution of the master looks something like

spawn first process
wait for reply
spawn second process
wait for reply

Synchronous message passing is much slower than asynchronous message passing, which is why the work-pushing master is so much faster. However, spawn is actually implemented in terms of spawnAsync, which we can use directly:

spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    spawnAsync there ($(mkClosure 'slave) (us, m))
    _ <- expectTimeout 0 :: Process (Maybe DidSpawn)
    return ()

This yields a similar heap profile (profile 3, above), but is much faster. We use the expectTimeout function to check, without actually waiting, for DidSpawn notifications (notifications from the remote node that tell us the PID of new processes) and discard them; this avoids a build-up in our message queue, which would show up in the heap profile as a new memory leak. For every spawnAsync we do, we remove at most one DidSpawn message; any DidSpawn messages that are "left over" when we finish spawning will simply be discarded, because the process that did the spawning will have terminated.
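Why removing at most one notification per spawn is enough can be seen in a small simulation. It is a deliberately simplified sketch: peakMailbox is a hypothetical function, time is counted in "ticks" (one spawn per tick), and every reply is assumed to arrive exactly delay ticks after its spawn, which a real network will not guarantee.

```haskell
module Main where

-- | Peak number of DidSpawn-style notifications sitting in the master's
-- mailbox while it issues n asynchronous spawns, one per tick.
-- Assumption: the reply to the spawn issued at tick t arrives at tick t + delay.
-- If drain is True, the master removes at most one queued reply per spawn,
-- mirroring the non-blocking check described above.
peakMailbox :: Bool -> Int -> Int -> Int
peakMailbox drain delay n = go 1 0 0
  where
    go :: Int -> Int -> Int -> Int
    go tick queued peak
      | tick > n  = peak
      | otherwise =
          let arrived  = if tick > delay then 1 else 0  -- reply to spawn (tick - delay)
              queued'  = queued + arrived
              peak'    = max peak queued'
              queued'' = if drain && queued' > 0 then queued' - 1 else queued'
          in queued'' `seq` peak' `seq` go (tick + 1) queued'' peak'

main :: IO ()
main = do
  putStrLn ("no draining:   peak " ++ show (peakMailbox False 10 5000))
  putStrLn ("with draining: peak " ++ show (peakMailbox True 10 5000))
```

With an assumed delay of 10 ticks and n = 5000, the undrained mailbox peaks at 4990 entries, while the drained one never holds more than 1. The replies still in flight when the loop ends correspond to the "left over" messages that vanish with the spawning process.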

Comparing the Master processes across Master-Slave, Work-Pushing and Work-Stealing

The heap profile of the master process in the Master-Slave setup (with the memory leak fixed) is pretty similar to the heap profiles of the master processes in the work-pushing and work-stealing examples:

[Figure: Master heap profiles. Heap profiles of the master processes (single slave, factorizing the first n = 50,000 numbers): (1) in the master-slave setup; (2) in the work-pushing setup; (3) in the work-stealing setup.]

Comparing the Slave processes across Master-Slave, Work-Pushing and Work-Stealing

The heap profiles of the slave processes are, however, more interesting:

[Figure: Slave heap profiles. Heap profiles of the slave processes (single slave, factorizing the first n = 50,000 numbers): (1) in the master-slave setup; (2) in the work-pushing setup; (3) in the work-stealing setup.]

As expected, the slave in the Master-Slave setup is the most resource hungry. A large amount of memory is used up when the slave starts; these are mostly messages to the slave's node controller that tell the node which processes to spawn. The heap profile of the work-pushing slave has a similar shape, but with a much lower peak: in this case the memory is used to store the messages (containing just an integer) from the master to the slave process.

The heap profile of the work-stealing slave looks different. The increase in size you see comes from the algorithm used: it takes more memory to compute the number of prime factors of larger numbers. Note, however, that this only comes to 550 kB in total; the master-slave and work-pushing slaves show the same behaviour, but there the memory used by the factorization algorithm is insignificant compared with the memory taken up by queued messages.

Execution time

Finally, let's plot execution time versus number of slaves:

[Figure: Execution time (in seconds) versus number of slaves.]

Ignoring the absolute values, we can conclude that the Master-Slave example has a lot of overhead and is significantly slower than the other two setups. The work-pushing setup is slightly faster than the work-stealing setup, probably due to the higher latency of the work-stealing setup. Finally, speedup is much less than linear in the number of slaves; this is due to the relatively high communication overhead compared to the cost of the actual computation.
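A simple model shows why communication overhead caps the speedup. It is a sketch under stated assumptions, not a fit to the measurements: runTime and speedup are hypothetical helpers, the master is assumed to pay a fixed communication cost per work item serially, and the compute cost per item is assumed to be shared evenly between the slaves.

```haskell
module Main where

-- | Estimated run time with k slaves for n work items.
-- Assumption: the master pays commCost per item serially, while the
-- compute cost per item (workCost) is divided evenly among the k slaves.
runTime :: Double -> Double -> Int -> Int -> Double
runTime commCost workCost n k =
  fromIntegral n * commCost + fromIntegral n * workCost / fromIntegral k

-- | Speedup of k slaves relative to a single slave under the same model.
speedup :: Double -> Double -> Int -> Int -> Double
speedup commCost workCost n k =
  runTime commCost workCost n 1 / runTime commCost workCost n k

main :: IO ()
main = mapM_ report [1, 2, 4, 8]
  where
    -- Costs of 1.0 each are arbitrary units, chosen only so that
    -- communication is as expensive as computation.
    report :: Int -> IO ()
    report k =
      putStrLn (show k ++ " slaves: speedup " ++ show (speedup 1.0 1.0 50000 k))
```

In this model the speedup is bounded by (commCost + workCost) / commCost. When communication is as expensive as computation, as assumed in main, the speedup can never reach 2 no matter how many slaves are added; only when commCost is negligible does it approach k.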

Exercise 5: Extend the time/memory comparison to the work-stealing-with-low-latency setup from Exercise 4 (from Part 1). You should find that it is as fast as the work-pushing setup and that its heap profile is comparable to that of the work-stealing setup.

To be continued

In the next blog post we will see how these communication patterns can be generalized as a Map-Reduce skeleton.