Haskell training in San Francisco and New York
Wed, 24 Apr 2013 22:28:11 GMT, by duncan.
Filed under training.
We are offering Haskell courses in San Francisco and New York in early June.
They are for professional developers who want to learn Haskell or improve their skills. There is a 2-day introductory course and a 2-day advanced course.
Locations, dates and registration
Well-Typed are running these courses in partnership with FP Complete and Skills Matter.
San Francisco Bay Area
- Introductory Course: June 4-5th, 2013
- Advanced Course: June 6-7th, 2013
- $1300 for each course, with 15% discount if you book both
Register via FP Complete.
New York
- Introductory Course: June 10-11th, 2013
- Advanced Course: June 12-13th, 2013
- $1495 for each course, with 15% discount before April 29th
Register via Skills Matter: introductory course, or advanced course.
InfoQ interview with Duncan on Parallelism, Concurrency and Distributed Programming
Thu, 14 Mar 2013 22:52:20 GMT, by duncan.
Filed under well-typed, parallel.
I was at the Tech Mesh London conference at the end of last year giving a talk about Haskell's approach to parallel programming. While I was there I did an interview with Werner Schuster from InfoQ which is now up on their site.
Duncan Coutts on Parallelism and Concurrency with Haskell, Distributed Programming with Cloud Haskell
Duncan Coutts explains the nature of Concurrency and Parallelism in Haskell, its threading and STM implementation, Erlang OTP's influence on Cloud Haskell for distributed programming, Monads, and more.
Since I was at the conference to tell everyone about deterministic parallelism in Haskell, we naturally talked about that a bit, and about how it differs from concurrency.
While we were talking about STM I went off on a bit of a rant about Node.js, async I/O and how we do sequential imperative programming so much better in Haskell. Hopefully people find that interesting. It's a topic that I think doesn't get enough attention.
We also touched on Cloud Haskell and Werner put me on the spot by asking what my favourite monad is.
My thanks to Werner for an interesting chat, and to the people at InfoQ for the A/V and other technical work.
Upcoming Haskell training courses
Wed, 23 Jan 2013 22:07:02 GMT, by duncan.
Filed under well-typed.
Well-Typed is running Haskell courses in London, targeting professional developers who want to learn Haskell or improve their skills.
The next courses are in the first week of February, and then again in April and July.
Fast Track to Haskell
4-5 February 2013
A two-day introduction to Haskell assuming no previous Haskell or functional programming experience. Covers Haskell syntax, how to define functions and datatypes, dealing with IO, and monads.
Advanced Haskell
6-7 February 2013
A two-day course for people with basic Haskell experience who want to take their Haskell skills to the next level. Covers functional data structures, understanding laziness and performance, profiling Haskell programs, concurrency and parallelism, programming patterns and type-level programming.
The courses are designed so that they can be taken either individually or in sequence.
Registration for both courses is open and there are discounts for early bookings. We hope to see many of you there.
Deterministic Parallel Programming with Haskell
Sat, 08 Dec 2012 17:01:16 GMT, by duncan.
Filed under parallel.
This week I gave a talk at the Tech Mesh conference:
Multi-core with less Pain: Deterministic Parallel Programming with Haskell.
- Slides (pdf)
- (video will be available later)
Abstract:
You want to make your code run faster, so you think about changing it to run in parallel to use multiple cores. But most approaches to parallelism also force you to rewrite your program using explicit concurrency which means you'll always worry about race conditions, deadlocks and other concurrency bugs. Deterministic parallelism frees you from concurrency bugs and gives a strong guarantee that your program remains deterministic, so it will give the same result no matter the number of cores or the scheduling being used.
Haskell offers a variety of libraries for deterministic parallelism that enable concise high-level parallel programs. In this talk we will look at the idea of parallelism without concurrency and give an overview of the paradigms that Haskell offers in this area.
The main point I was making is that we can achieve parallel speedups, without ourselves having to write explicitly concurrent programs. For a mainstream audience this is often a rather surprising and novel idea.
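To make "parallelism without concurrency" concrete, here is a minimal sketch. It is not taken from the talk or the article (which use higher-level libraries such as Repa); it only uses the `par` and `pseq` combinators exported by `GHC.Conc` in base, and the function name `parSum` and the chunk-size parameter are assumptions made for illustration. The code contains no threads, locks or messages, and its result does not depend on how many cores evaluate it.

```haskell
import GHC.Conc (par, pseq)

-- | Sum a list by splitting it in half and sparking the left half for
-- parallel evaluation while the current thread evaluates the right half.
-- Assumes chunk >= 1; lists of at most `chunk` elements are summed
-- sequentially so that the parallel work is not too fine-grained.
parSum :: Int -> [Int] -> Int
parSum chunk xs
  | n <= chunk = sum xs
  | otherwise  = left `par` (right `pseq` (left + right))
  where
    n        = length xs
    (ls, rs) = splitAt (n `div` 2) xs
    left     = parSum chunk ls
    right    = parSum chunk rs
```

When compiled with `-threaded` and run with `+RTS -N`, the sparks may be picked up by other cores; run on a single core, the program computes exactly the same value.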
The talk was based on an article that Andres and I wrote for the Computing in Science & Engineering Magazine (CiSE). As you can guess from the name, CiSE is aimed at an audience of scientists and engineers. CiSE ran a special issue on parallelism and concurrency in modern programming languages. They got articles on approaches in different languages (Clojure, Erlang, and Haskell). The guest editors gave the authors the problem of parallelising a simple numerical solver. You can read the guest editors' introduction to the special issue.
For those of you not subscribed to CiSE magazine, we are making our article available.
Funnily enough, I'm still waiting for my copy of CiSE to arrive, so I've not yet read the articles on Clojure and Erlang. I'll be interested to see how the other authors solved the same problem. The tricky aspect of the problem is that it has very fine grained parallelism.
When we looked at the problem, we decided it was a good fit for data parallelism, and we wrote a solution using Repa. Repa handles the granularity issue for us automatically by chunking up the array across cores. If we'd used a more manual technique we would have had to do quite a bit of chunking ourselves to get the granularity right, and that would have added more code that could have obscured the algorithm. As it was, the core of our optimised Repa version was only 5 lines long and pretty readable. (The code snippets are in the article and the full code is available on the CiSE website.)
We got performance results that are fairly competitive with C (using gcc and OpenMP, see the article for the details). The competitive side of me is also interested to find out how we compare on performance with the other solutions. After all, parallelism is about performance.
Communication Patterns in Cloud Haskell (Part 4)
Mon, 15 Oct 2012 14:52:25 GMT, by edsko.
Filed under parallel, cloud-haskell.
K-Means
In Part 3 of this series we showed how to write a simple distributed implementation of Map-Reduce using Cloud Haskell. In this final part of the series we will explain the K-Means algorithm and show how it can be implemented in terms of Map-Reduce.
K-Means is an algorithm to partition a set of points into n clusters. The algorithm iterates the following two steps for a fixed number of times (or until convergence):
- Given a set of points and n cluster centres, associate each point with the cluster centre it is nearest to.
- Compute the centre of each new cluster.
The initial cluster centres can be chosen randomly. Here is one example run of the first 5 iterations of the algorithm on randomly (evenly) distributed two-dimensional points:

Of course, in such an evenly distributed data set the clusters that are "discovered" are more or less arbitrary and heavily influenced by the choice of initial centers. For example, here is another run of the algorithm with different initial centers:

K-Means as a Map-Reduce skeleton
We will use Map-Reduce to implement a single iteration of the K-Means algorithm. Each mapper node will execute step (1) of the algorithm for a subset of the points, and in the reduction step we will compute the new cluster centres.
type Point = (Double, Double)
type Cluster = (Double, Double)
average :: Fractional a => [a] -> a
average xs = sum xs / fromIntegral (length xs)
distanceSq :: Point -> Point -> Double
distanceSq (x1, y1) (x2, y2) = a * a + b * b
  where
    a = x2 - x1
    b = y2 - y1
nearest :: Point -> [Cluster] -> Cluster
nearest p = minimumBy (compare `on` distanceSq p)
center :: [Point] -> Point
center ps = let (xs, ys) = unzip ps in (average xs, average ys)
kmeans :: Array Int Point -> MapReduce (Int, Int) [Cluster] Cluster Point ([Point], Point)
kmeans points = MapReduce {
    mrMap    = \(lo, hi) cs -> [ let p = points ! i in (nearest p cs, p)
                               | i <- [lo .. hi]
                               ]
  , mrReduce = \_ ps -> (ps, center ps)
  }
We start with a Map (Int, Int) [Cluster]; the keys in this map correspond to
the segmentation of the input set; for instance, a key (20, 39) indicates
that a mapper node should compute clusters for points [20 .. 39]. The values
in this map are the current cluster centres (that is, every key in the map has
the same value).
The mappers then compute a list [(Cluster, Point)] which associates points
with each cluster. Finally, in the reduction step we create a Map Cluster
([Point], Point) which tells us for each cluster its set of points and its
centre.
Iterating locally
The Map-Reduce skeleton only computes a single iteration of the algorithm; we
need to iterate this a number of steps to implement the full algorithm. Using
the localMapReduce we defined above we can do this as
localKMeans :: Array Int Point
            -> [Cluster]
            -> Int
            -> Map Cluster ([Point], Point)
localKMeans points cs iterations = go (iterations - 1)
  where
    mr :: [Cluster] -> Map Cluster ([Point], Point)
    mr = localMapReduce (kmeans points) . trivialSegmentation

    go :: Int -> Map Cluster ([Point], Point)
    go 0 = mr cs
    go n = mr . map snd . Map.elems . go $ n - 1

    trivialSegmentation :: [Cluster] -> Map (Int, Int) [Cluster]
    trivialSegmentation cs' = Map.fromList [(bounds points, cs')]
For the local implementation we don't care about how we partition the input, so we just create a trivial segmentation and have one mapper process the entire input.
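For readers who want to experiment without the Map-Reduce machinery, the same iteration can be sketched directly on lists. This is a simplified stand-in, not the implementation from the post: the helper names `step` and `kmeansN` are made up here, points are kept in a plain list instead of an `Array`, and the list of centres is assumed to be non-empty.

```haskell
import           Data.Function (on)
import           Data.List     (minimumBy)
import qualified Data.Map      as Map

type Point   = (Double, Double)
type Cluster = (Double, Double)

average :: [Double] -> Double
average xs = sum xs / fromIntegral (length xs)

distanceSq :: Point -> Point -> Double
distanceSq (x1, y1) (x2, y2) = a * a + b * b
  where
    a = x2 - x1
    b = y2 - y1

-- | The centre nearest to a point (the list of centres must be non-empty).
nearest :: Point -> [Cluster] -> Cluster
nearest p = minimumBy (compare `on` distanceSq p)

center :: [Point] -> Point
center ps = let (xs, ys) = unzip ps in (average xs, average ys)

-- | One iteration: assign every point to its nearest centre, then recompute
-- the centre of each group. Centres that attract no points are dropped.
step :: [Point] -> [Cluster] -> [Cluster]
step ps cs = map center (Map.elems assignment)
  where
    assignment = Map.fromListWith (++) [ (nearest p cs, [p]) | p <- ps ]

-- | Run a fixed number of iterations.
kmeansN :: Int -> [Point] -> [Cluster] -> [Cluster]
kmeansN n ps cs = iterate (step ps) cs !! n
```

The grouping by nearest centre plays the role of the "Map" phase, and `center` applied to each group plays the role of the "Reduce" phase.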
Generalizing the distributed Map-Reduce implementation
The set of points itself does not vary from one iteration of the algorithm to another, and only needs to be distributed to the mapper nodes once. The master process of our Map-Reduce implementation from Part 3 however looks like
- Initialize the mappers
- Run the Map-Reduce query
- Terminate the mappers
This means that if we use distrMapReduce as implemented we will re-distribute the
full set of points to the mapper nodes on every iteration of the algorithm. To
avoid this, we can generalize the Map-Reduce implementation to be
- Initialize the mappers
- Run as many Map-Reduce queries as necessary
- Terminate the mappers
We will change the type of distrMapReduce to
distrMapReduce :: Closure (MapReduce (Int, Int) [Cluster] Cluster
                                     Point ([Point], Point))
               -> [NodeId]
               -> ((Map (Int, Int) [Cluster] ->
                      Process (Map Cluster ([Point], Point))) -> Process a)
               -> Process a
In distrMapReduce mrClosure mappers p the process p is provided with a
means to run map-reduce queries; compare the type of distrMapReduce to the
types of functions such as
withFile :: FilePath -> IOMode -> (Handle -> IO r) -> IO r
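The shape shared by `withFile` and the new `distrMapReduce` is "set up, hand the caller a way to use the resource as often as needed, tear down". The following sketch shows that shape in plain `IO`, with no Cloud Haskell involved. The name `withQueryRunner`, the logging `IORef` and the doubling "query" are all hypothetical and exist only to make the control flow visible.

```haskell
import Control.Exception (bracket)
import Data.IORef        (IORef, modifyIORef', newIORef, readIORef)

-- | Initialise, run the caller's body with a function for issuing queries,
-- then terminate. Each phase is recorded in the log (most recent first).
withQueryRunner :: IORef [String] -> ((Int -> IO Int) -> IO a) -> IO a
withQueryRunner logRef body =
    bracket (modifyIORef' logRef ("init" :))
            (\_ -> modifyIORef' logRef ("terminate" :))
            (\_ -> body runQuery)
  where
    -- A stand-in for "run one Map-Reduce query".
    runQuery :: Int -> IO Int
    runQuery x = do
      modifyIORef' logRef (("query " ++ show x) :)
      return (x * 2)

-- | Issue two queries inside a single init/terminate pair.
demo :: IO (Int, [String])
demo = do
  logRef <- newIORef []
  r <- withQueryRunner logRef $ \query -> do
         a <- query 1
         query (a + 1)
  events <- readIORef logRef
  return (r, reverse events)
```

In `demo` the initialisation and termination each happen once, however many queries the body issues, which is exactly the property we want for the mappers.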
Exercise 8: Implement distrMapReduce with the type above. This does not require any new Cloud Haskell concepts, but does require a bit of engineering. (You can find the implementation also in the distributed-process-demos package).
Note that even with this generalization we will pass the entire set of points to all the nodes, even though each node will only operate on a subset of them; we leave optimizing this as an exercise to the reader (it will require a further generalization of the Map-Reduce driver).
Polymorphism
In the section above we changed the type of distrMapReduce to match the type
of the K-Means MapReduce skeleton instead of the word-counting MapReduce
skeleton; we can change that type without changing the implementation at all.
What we really want, of course, is a polymorphic implementation of Map-Reduce:
distrMapReduce :: (Serializable k1, Serializable v1, Serializable k2,
                   Serializable v2, Serializable v3, Ord k2)
               => Closure (MapReduce k1 v1 k2 v2 v3)
               -> [NodeId]
               -> ((Map k1 v1 -> Process (Map k2 v3)) -> Process a)
               -> Process a
However, when we try to generalize our code above we run into a problem.
Consider the code that we ship to the mapper nodes. What does this code need to
do? First, it needs to wait for a message of a specific type. In order to do
the type matching it needs some information about the type (k1, v1). Once it
receives such a message, it needs to send the list of type [(k2,v2)] created
by the Map function back to the master. In order to do that, it needs to know
how to serialize values of type [(k2,v2)].
Where does distrMapReduce get this information? Well, it is provided by the
Serializable type class constraints. Unfortunately, however, Haskell does not
give an explicit handle on these arguments, much less provide us with a way to
serialize these arguments so that we can ship them to the mapper nodes. We can,
however, reify a type class constraint as an explicit dictionary:
data SerializableDict a where
  SerializableDict :: Serializable a => SerializableDict a
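If reifying a constraint as a value is unfamiliar, the idea can be tried out with a class from base. The sketch below uses `Show` as a stand-in for `Serializable` (which lives in the distributed-process package); `ShowDict`, `showWith` and `intDict` are names invented for this illustration.

```haskell
{-# LANGUAGE GADTs #-}

-- | An explicit, first-class witness that `a` has a Show instance.
data ShowDict a where
  ShowDict :: Show a => ShowDict a

-- | No `Show a` constraint in the signature: pattern matching on the
-- constructor brings the instance into scope.
showWith :: ShowDict a -> a -> String
showWith ShowDict x = show x

-- | Building a dictionary is trivial when the instance is known.
intDict :: ShowDict Int
intDict = ShowDict
```

Because the dictionary is an ordinary value, it can be named, passed around and, in the Cloud Haskell setting, referred to statically.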
We cannot serialize objects of type SerializableDict directly, but we can
serialise static SerializableDicts! Hence, the type of distrMapReduce
becomes:
distrMapReduce :: forall k1 k2 v1 v2 v3 a.
                  (Serializable k1, Serializable v1, Serializable k2,
                   Serializable v2, Serializable v3, Ord k2)
               => Static (SerializableDict (k1, v1))
               -> Static (SerializableDict [(k2, v2)])
               -> Closure (MapReduce k1 v1 k2 v2 v3)
               -> [NodeId]
               -> ((Map k1 v1 -> Process (Map k2 v3)) -> Process a)
               -> Process a
This type may look a bit intimidating, but really all that has changed is that
we require static type information so that we can ship this type information
to the mappers. We omit the implementation; you can find it in the
distributed-process-demos package; the general principles are explained in
the documentation of the
distributed-static
package.
Using this polymorphic version of distrMapReduce is no more difficult than
using the monomorphic version; for example, we can implement "distributed word
counting" as
dictIn :: SerializableDict (FilePath, Document)
dictIn = SerializableDict

dictOut :: SerializableDict [(Word, Frequency)]
dictOut = SerializableDict

countWords_ :: () -> MapReduce FilePath Document Word Frequency Frequency
countWords_ () = countWords

remotable ['dictIn, 'dictOut, 'countWords_]

distrCountWords :: [NodeId] -> Map FilePath Document -> Process (Map Word Frequency)
distrCountWords mappers input =
  distrMapReduce $(mkStatic 'dictIn)
                 $(mkStatic 'dictOut)
                 ($(mkClosure 'countWords_) ())
                 mappers
                 (\iteration -> iteration input)
Creating the necessary SerializableDicts is easy (there is only one
constructor for SerializableDict, after all, and it doesn't take any
arguments!). Note that the word counter only calls the iteration function
once; this will not be true for the final algorithm we consider: distributed
k-means.
Distributed K-Means
The distributed K-means is not much more complicated. Everything up to and
including go pretty much follows the local implementation; the remainder
(segments, dividePoints, pointsPerMapper and numPoints) just compute
which segment of the input each mapper node is going to do.
dictIn :: SerializableDict ((Int, Int), [Cluster])
dictIn = SerializableDict

dictOut :: SerializableDict [(Cluster, Point)]
dictOut = SerializableDict

remotable ['kmeans, 'dictIn, 'dictOut]

distrKMeans :: Array Int Point
            -> [Cluster]
            -> [NodeId]
            -> Int
            -> Process (Map Cluster ([Point], Point))
distrKMeans points cs mappers iterations =
    distrMapReduce $(mkStatic 'dictIn)
                   $(mkStatic 'dictOut)
                   ($(mkClosure 'kmeans) points)
                   mappers
                   (go (iterations - 1))
  where
    go :: Int
       -> (Map (Int, Int) [Cluster] -> Process (Map Cluster ([Point], Point)))
       -> Process (Map Cluster ([Point], Point))
    go 0 iteration = do
      iteration (Map.fromList $ map (, cs) segments)
    go n iteration = do
      clusters <- go (n - 1) iteration
      let centers = map snd $ Map.elems clusters
      iteration (Map.fromList $ map (, centers) segments)

    segments :: [(Int, Int)]
    segments = let (lo, _) = bounds points in dividePoints numPoints lo

    dividePoints :: Int -> Int -> [(Int, Int)]
    dividePoints pointsLeft offset
      | pointsLeft <= pointsPerMapper = [(offset, offset + pointsLeft - 1)]
      | otherwise = let offset' = offset + pointsPerMapper in
                      (offset, offset' - 1)
                    : dividePoints (pointsLeft - pointsPerMapper) offset'

    pointsPerMapper :: Int
    pointsPerMapper =
      ceiling (toRational numPoints / toRational (length mappers))

    numPoints :: Int
    numPoints = let (lo, hi) = bounds points in hi - lo + 1
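To see what the segmentation produces, the helper functions can be lifted out of the `where` clause into a standalone function. This is a sketch for experimentation only: `segmentsFor` is a made-up name, it takes the number of mappers and points as plain arguments instead of reading them from `mappers` and `points`, and it assumes both numbers are at least 1.

```haskell
-- | Split `numPoints` consecutive indices, starting at `lo`, into contiguous
-- inclusive segments of (at most) ceiling (numPoints / numMappers) points.
segmentsFor :: Int -> Int -> Int -> [(Int, Int)]
segmentsFor numMappers numPoints lo = dividePoints numPoints lo
  where
    pointsPerMapper :: Int
    pointsPerMapper =
      ceiling (toRational numPoints / toRational numMappers)

    dividePoints :: Int -> Int -> [(Int, Int)]
    dividePoints pointsLeft offset
      | pointsLeft <= pointsPerMapper = [(offset, offset + pointsLeft - 1)]
      | otherwise =
          let offset' = offset + pointsPerMapper
          in  (offset, offset' - 1)
                : dividePoints (pointsLeft - pointsPerMapper) offset'
```

For example, `segmentsFor 3 10 0` yields `[(0,3),(4,7),(8,9)]`: the last segment is shorter when the points do not divide evenly among the mappers.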
Exercise 9: In this example we create precisely enough segments that every mapper node gets a single segment. We could have created more or fewer segments. Why is creating one segment per mapper node the optimal choice for this algorithm? In which case might creating more segments be more efficient?
Exercise 10: The data sent back from the mapper nodes to the master node contains a lot of redundancy. How might you improve that?
Profiling
To conclude we briefly look at the heap profiles of the master and the slave nodes of our distributed k-means example.

Heap profile (by type) of the Map-Reduce master running K-Means. Single slave, clustering 50,000 points, 5 iterations.
In this example the master process first creates a random set of 50,000 points, before running the K-means algorithm. This is the first peak. Then the other 5 peaks are the master node collecting data from the mapper nodes before reducing them locally.
Exercise 11: In the Work-Stealing example from Part 1 of this series the master reduces as it receives messages (sumIntegers). Might you do something similar in the Map-Reduce master?

Heap profile (by type) of the Map-Reduce slave running K-Means. Single slave, clustering 50,000 points, 5 iterations.
In the slave node too, we clearly see the 5 iterations of the algorithm. At the start of each iteration the mapper node creates a list that associates points to clusters. Once the list is created, the list is serialized as a bytestring (purple section in the profile) and sent to the master node before the cycle starts again.
Exercise 12: (More technical) Why is the entire list created before it is serialized to a bytestring? (Hint: look up the Binary instance for lists.) How might you change this so that the list is created lazily as it is serialized?
More information
We hope you've enjoyed this series (Parts 1, 2, 3, 4). For links to more information see the Cloud Haskell home page.
Communication Patterns in Cloud Haskell (Part 3)
Fri, 12 Oct 2012 10:59:51 GMT, by edsko.
Filed under parallel, cloud-haskell.
Map-Reduce
In Part 1 and Part 2 of this series we described a number of ways in which we might compute the number of prime factors of a set of numbers in a distributed manner. Abstractly, these examples can be described as
a problem (computing the number of prime factors of 100 natural numbers) is split into subproblems (factorizing each number), those subproblems are solved by slave nodes, and the partial results are then combined (summing the number of factors)
This is reminiscent of Google's Map-Reduce algorithm (MapReduce: Simplified Data Processing on Large Clusters, Dean and Ghemawat, OSDI'04), with the "Map" part corresponding to the computation of partial results and the "Reduce" part corresponding to combining these results. We will explain the basic ideas behind Map-Reduce before giving a distributed implementation using work-stealing.
Local map-reduce
The exposition we give in this section is based on Google's MapReduce Programming Model -- Revisited by Ralf Laemmel (SCP 2008). The Map-Reduce algorithm transforms key-value maps; in Haskell, its type is given by
-- The type of Map-Reduce skeletons (provided by the user)
data MapReduce k1 v1 k2 v2 v3 = MapReduce {
    mrMap    :: k1 -> v1 -> [(k2, v2)]
  , mrReduce :: k2 -> [v2] -> v3
  }
-- The driver (which "executes" a Map-Reduce skeleton)
localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3

We start with a key-value map (with keys of type k1 and values of type v1). With the help of the "Map" (mrMap) part of a Map-Reduce skeleton each of these key-value pairs is turned into a list of key-value pairs (with keys of type k2 and values of type v2); note that this list may (and typically does) contain multiple pairs with the same key. The Map-Reduce driver then collects all values for each key, and finally reduces this list of values (of type v2) to a single value (of type v3) using the "Reduce" (mrReduce) part of the skeleton. (Colour changes in the diagram indicate type changes.)
Exercise 6: Implement localMapReduce. The types pretty much say what needs to be done.
Consider counting the number of words in a set of documents; that is, we want
to transform a Map FilePath Document to a Map Word Frequency. We can do
this with the following Map-Reduce skeleton:
countWords :: MapReduce FilePath Document Word Frequency Frequency
countWords = MapReduce {
    mrMap    = const (map (, 1) . words)
  , mrReduce = const sum
  }
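To try the skeleton out, here is one possible local driver together with the word-counting example. Treat it as a sketch of a solution to Exercise 6 and not as the reference implementation: the helper names `groupByKey` and `reducePerKey` mirror the names used by the distributed master later in this post, but their definitions here are assumptions, and plain `String` and `Int` stand in for `Document`, `Word` and `Frequency`.

```haskell
{-# LANGUAGE TupleSections #-}

import           Data.Map (Map)
import qualified Data.Map as Map

data MapReduce k1 v1 k2 v2 v3 = MapReduce
  { mrMap    :: k1 -> v1 -> [(k2, v2)]
  , mrReduce :: k2 -> [v2] -> v3
  }

-- | Collect all values that share a key, preserving their original order.
groupByKey :: Ord k2 => [(k2, v2)] -> Map k2 [v2]
groupByKey = Map.fromListWith (flip (++)) . map (\(k, v) -> (k, [v]))

-- | Apply the "Reduce" part of the skeleton to every group.
reducePerKey :: MapReduce k1 v1 k2 v2 v3 -> Map k2 [v2] -> Map k2 v3
reducePerKey mr = Map.mapWithKey (mrReduce mr)

-- | Map every input pair, group the results by key, reduce each group.
localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3
localMapReduce mr =
  reducePerKey mr . groupByKey . concatMap (uncurry (mrMap mr)) . Map.toList

countWords :: MapReduce FilePath String String Int Int
countWords = MapReduce
  { mrMap    = const (map (, 1) . words)
  , mrReduce = const sum
  }
```

Running `localMapReduce countWords` on two documents containing "to be or" and "not to be" gives a map in which "to" and "be" have frequency 2 and "or" and "not" have frequency 1.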
Distributed map-reduce
We are going to have slave nodes execute the Map part of the algorithm; we will
use work-stealing to distribute the work amongst the slaves. We are not going
to distribute the Reduce part of the algorithm, but do that on a single
machine. For now we will give a monomorphic implementation of the distributed
Map-Reduce algorithm, tailored specifically for the countWords example from the
previous section. In the next post we will see how to make the
implementation polymorphic.
The implementation follows the Work-Stealing example from Part 1 very closely. The slave asks for work and executes it, using the mrMap part of the Map-Reduce skeleton:
mapperProcess :: (ProcessId, ProcessId, Closure (MapReduce String String String Int Int))
              -> Process ()
mapperProcess (master, workQueue, mrClosure) = do
    us <- getSelfPid
    mr <- unClosure mrClosure
    go us mr
  where
    go us mr = do
      -- Ask the queue for work
      send workQueue us

      -- Wait for a reply; if there is work, do it and repeat; otherwise, exit
      receiveWait
        [ match $ \(key, val) -> send master (mrMap mr key val) >> go us mr
        , match $ \() -> return ()
        ]

remotable ['mapperProcess]
Note that the slave wants a Closure of a Map-Reduce skeleton; since the Map-Reduce skeleton itself contains functions, it is not serializable.
The master process is
distrMapReduce :: Closure (MapReduce String String String Int Int)
               -> [NodeId]
               -> Map String String
               -> Process (Map String Int)
distrMapReduce mrClosure mappers input = do
  mr     <- unClosure mrClosure
  master <- getSelfPid

  workQueue <- spawnLocal $ do
    -- Return the next bit of work to be done
    forM_ (Map.toList input) $ \(key, val) -> do
      them <- expect
      send them (key, val)

    -- Once all the work is done tell the mappers to terminate
    replicateM_ (length mappers) $ do
      them <- expect
      send them ()

  -- Start the mappers
  forM_ mappers $ \nid -> spawn nid ($(mkClosure 'mapperProcess) (master, workQueue, mrClosure))

  -- Wait for the partial results
  partials <- replicateM (Map.size input) expect

  -- We reduce on this node
  return (reducePerKey mr . groupByKey . concat $ partials)
We can now implement "distributed word counting" as
countWords_ :: () -> MapReduce FilePath Document Word Frequency Frequency
countWords_ () = countWords

remotable ['countWords_]

distrCountWords :: [NodeId] -> Map FilePath Document -> Process (Map Word Frequency)
distrCountWords = distrMapReduce ($(mkClosure 'countWords_) ())
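The work-queue protocol used by the master and the mappers can be tried out on a single machine without Cloud Haskell. The sketch below is only a local analogue built from base: threads stand in for mapper nodes, an `MVar` holding the remaining items stands in for the work-queue process, and a `Chan` stands in for the master's mailbox. The name `workStealing` is made up, and the function assumes at least one worker and a work function that does not throw.

```haskell
import Control.Concurrent      (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar, newMVar)
import Control.Monad           (replicateM, replicateM_)

-- | Workers repeatedly take the next item from a shared queue, process it and
-- send the result back; a worker stops when the queue is empty. Results
-- arrive in whatever order the workers finish.
workStealing :: Int -> (a -> b) -> [a] -> IO [b]
workStealing nWorkers f items = do
  queue   <- newMVar items
  results <- newChan
  let worker = do
        -- Ask the queue for work
        next <- modifyMVar queue $ \q -> return $ case q of
                  []       -> ([], Nothing)
                  (x : xs) -> (xs, Just x)
        case next of
          Nothing -> return ()
          Just x  -> do
            -- Evaluate the result (to weak head normal form) in the worker
            writeChan results $! f x
            worker
  -- Start the workers
  replicateM_ nWorkers (forkIO worker)
  -- Wait for one result per work item
  replicateM (length items) (readChan results)
```

As in the distributed version, the collector waits for exactly as many results as there are work items, so it does not need to know which worker handled which item.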
Performance
If we use ThreadScope to look at how busy the nodes are, we find something like

with alternating activity between the master node (which does the reduction, top) and the mapper nodes (bottom). This is unsurprising, of course; we have a distributed implementation of the Map phase but reduce locally.
Exercise 7: Change the implementation so that the reduce step happens in parallel too and confirm that the work distribution is now better.
Note: as hinted at by the ThreadScope plot above, it is possible to use ThreadScope to look at distributed applications running in multiple OS processes, including on different physical hosts. Doing so is currently a somewhat manual process. In addition to the normal step of linking the application using the -eventlog flag, and running with the flag +RTS -l -RTS, there are a few steps after the program is complete: collect the .eventlog files from each node; use the ghc-events merge command to merge all the eventlog files into one (which currently has to be done in several steps because the merge command only takes pairs of files at once); and finally use ThreadScope to view the combined eventlog.
To be continued
In the next blog post we will look at how we can make the distributed Map-Reduce skeleton polymorphic. We'll also look at k-means as an example algorithm that can be implemented using Map-Reduce. In fact for efficiency, k-means requires a "multi-shot" Map-Reduce and we'll see how Haskell and Cloud Haskell give us the flexibility we need to make this kind of extension.
Communication Patterns in Cloud Haskell (Part 2)
Mon, 08 Oct 2012 21:30:26 GMT, by edsko.
Filed under parallel, cloud-haskell.
Performance
In Part 1 of this series we introduced a number of basic Cloud Haskell communication patterns. All these patterns will work fine for modest numbers of work items; in this blog post we're going to look at the performance and memory use for much larger numbers of work items, so we can see how each pattern scales up. We will see that scalability improves as we move from master-slave to work-pushing to work-stealing, and that many further improvements can still be made. More importantly, we hope to help you to predict runtime behaviour from your code, so that you can write more scalable and high-performance code.
Predicting resource use and performance
When thinking about Cloud Haskell performance you should consider things such as
- Bandwidth. How many messages are you sending, and how large are those messages? Network bandwidth is typically several orders of magnitude lower than direct memory access, so it is important to be aware of the cost of messaging. In fact, an important design criterion in the development of Cloud Haskell was that this cost should be visible to the programmer.
- Latency. It takes time for even the smallest message to arrive at a remote
node. Synchronous operations wait for an acknowledgement from the remote
endpoint and hence imply a full network roundtrip. Hence synchronous
operations are often much slower than asynchronous operations, which fire
off a message to the remote endpoint but don't wait to see when it arrives.
For example, on many networks the bandwidth and latency are such that
sending a hundred small messages in quick succession and waiting for one reply
will take about the same time as sending just one message and waiting for a
reply.
The Cloud Haskell documentation will tell you which primitives are
synchronous and which are not, but often you should be able to guess based on the result type. For
example, send is asynchronous (result type ()), but spawn is synchronous: it sends a Closure to the remote endpoint, and then waits for the remote endpoint to reply with the process ID of the newly created process. (There is also an asynchronous version of spawn, as we will see below.)
- Message Queue Size. When messages are sent to a process they will sit in that process's message queue (or "mailbox") until they are processed. These messages are kept in memory. How much memory this uses depends on the number of messages and their size.
- Number of Processes. Cloud Haskell processes are implemented as Haskell threads plus some additional data. They are therefore relatively lightweight, but not free.
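The latency point can be made concrete with a back-of-the-envelope cost model. This is a deliberately crude sketch: the function names are invented, the model ignores bandwidth limits, queueing and processing time, and any numbers you plug in are your own assumptions about your network.

```haskell
-- | Time to perform n synchronous operations: every operation pays the
-- per-message cost plus a full network round trip before the next one starts.
syncCost :: Double -> Double -> Int -> Double
syncCost roundTrip perMessage n = fromIntegral n * (perMessage + roundTrip)

-- | Time to fire off n asynchronous messages and then wait for a single
-- reply: the round trip is paid only once.
asyncCost :: Double -> Double -> Int -> Double
asyncCost roundTrip perMessage n = fromIntegral n * perMessage + roundTrip
```

Whenever the per-message cost is small compared to the round trip, `asyncCost` for a hundred messages stays close to the cost of a single message and reply, while `syncCost` grows linearly with the number of operations.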
You should try and make a mental comparison between the Master-Slave, Work-Pushing and Work-Stealing examples before reading the analysis, below.
The Master process in the Master-Slave setup
In the discussion of the master-slave setup we mentioned that spawning a separate process for each subcomputation up-front might be too resource intensive on the client side. What we didn't mention, however, is that the master process we defined there is also resource hungry.

Heap profile of the master process in the Master-Slave example. Single slave, factorizing first n = 5000 numbers.
1. As defined above
2. Using reconnect
3. Using spawnAsync
It is obvious from the heap profile that the master process has a memory leak. But where is that coming from? The problem arises when we spawn the child processes:
spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) ->
    spawn there ($(mkClosure 'slave) (us, m))
In order to deal with network failure at any point during the spawning process,
spawning involves sending a message to the newly started remote process that
says "it's okay to start running now". In order to provide reliable and ordered
message passing, Cloud Haskell must maintain some state for every destination
that a process sends messages to. This means that for the master process it
must maintain state for n outgoing connections, since it starts a process for
each subcomputation.
In a future version of Cloud Haskell this problem might be solved
automatically, but for now we can solve it manually by using the reconnect
primitive. The reconnect primitive is primarily meant to be used in
situations where processes get disconnected from each other, but as a side
effect it lets us release the connection state, and that's the effect we need
for our workaround here:
spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    them <- spawn there ($(mkClosure 'slave) (us, m))
    reconnect them
This yields heap profile (2), above, and we see that the memory leak has disappeared.
However, there is another problem with this master. If we compare performance,
we find that it is more than an order of magnitude slower than the master
process in the work-pushing version. The reason is that spawn is synchronous:
we wait for the remote node to tell us the process ID of the newly started
process. That means that the execution of the master looks something like
spawn first process
wait for reply
spawn second process
wait for reply
...
Synchronous message passing is much slower than asynchronous message passing,
which is why the work-pushing master is so much faster. However, spawn is
actually implemented in terms of spawnAsync, which we can use directly:
spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    spawnAsync there ($(mkClosure 'slave) (us, m))
    _ <- expectTimeout 0 :: Process (Maybe DidSpawn)
    return ()
This yields a similar heap profile (profile 3, above), but is much faster. We
use the expectTimeout function to check – without actually waiting – and
discard DidSpawn notifications (which are notifications from the remote node
that tell us the PID of new processes); this avoids a build-up in our message
queue, which would show up in the heap profile as a new memory leak. For every
spawnAsync we do we remove at most one DidSpawn message; any DidSpawn
messages that are "left over" when we finish spawning will simply be discarded
because the process that did the spawning will have terminated.
Comparing the Master processes across Master-Slave, Work-Pushing and Work-Stealing
The heap profile of the Master-Slave (with the memory leak fixed) is pretty similar to the heap profile of the master process of the work-pushing and the work-stealing examples:

Heap profiles of the master processes. Single slave, factorizing first n = 50,000 numbers.
1. In the master-slave setup
2. In the work-pushing setup
3. In the work-stealing setup
Comparing the Slave processes across Master-Slave, Work-Pushing and Work-Stealing
The heap profiles of the slave processes are however more interesting:

Heap profiles of the slave processes. Single slave, factorizing first n = 50,000 numbers.
1. In the master-slave setup
2. In the work-pushing setup
3. In the work-stealing setup
As expected, the slave in the Master-Slave setup is the most resource hungry. A large amount of memory is used up when the slave starts; these are mostly messages to the slave's node controller that tell the node which processes to spawn. The heap-profile of the work-pushing slave has a similar shape, but with a much lower peak: in this case the memory is used to store the messages (containing just an integer) from the master to the slave process.
The heap profile of the work-stealing slave looks different. The increase in size you see comes from the algorithm used: it takes more memory to compute the number of prime factors of larger numbers. Note however that this only comes to 550 kB in total; the master-slave and work-pushing slaves had the same behaviour, but the memory used by the factorization algorithm was insignificant.
Execution time
Finally, let's plot execution time versus number of slaves:

Execution time (in seconds) versus number of slaves
Ignoring the absolute values, we can conclude that the Master-Slave example has a lot of overhead and is significantly slower than the other two setups. The work-pushing setup is slightly faster than the work-stealing setup, probably due to the higher latency we mentioned above. Finally, speedup is much less than linear in the number of slaves; this is due to the relatively high communication overhead compared to the cost of the actual computation.
Exercise 5: Extend the time/memory comparison to the work-stealing-with-low-latency setup from Exercise 4 (from Part 1). You should find that it is as fast as the work-pushing setup and that its heap profile is comparable to the work-stealing setup.
To be continued
In the next blog post we will see how these communication patterns can be generalized as a Map-Reduce skeleton.
Communication Patterns in Cloud Haskell (Part 1)
Fri, 05 Oct 2012 13:02:56 GMT, by edsko.
Filed under parallel, cloud-haskell.
Master-Slave, Work-Stealing and Work-Pushing
In this series of blog posts (see also parts 2, 3 and 4) we will describe a number of basic communication patterns in Cloud Haskell. We don't assume much familiarity with Cloud Haskell, but it will probably be useful to be familiar with the basics; Towards Haskell in the Cloud (Epstein, Black and Peyton Jones, Haskell Symposium 2011) is a good starting point. We will start simple but we will finish with some more advanced techniques.
Source: The source code for these examples is available:
cabal unpack http://well-typed.com/blog/aux/files/cloud-demos.tar.gz
(The code is also available on github)
Disclaimer: The examples in this blog post and in the distributed-process-demos package are written for educational purposes; they are not engineered for optimal performance.
Master-Slave
Master-Slave is one of the simplest communication patterns possible. A single master process spawns a bunch of slave processes to do computations on other nodes, and then combines the results.

A single master node (red) and a bunch of slave nodes (blue). A single master process spawns a bunch of slave processes, one for each subcomputation. The slave processes post the partial results on the message queue of the master node.
For example, consider summing the number of prime factors of the natural numbers 1 to 100 (why you would want to do that is anyone's guess :) We're just keeping CPUs busy). A master process can spawn a child process on remote nodes for each of the numbers in the sequence, then collect the results and return their sum. The implementation of the slave is very simple:
slave :: (ProcessId, Integer) -> Process ()
slave (pid, n) = send pid (numPrimeFactors n)

remotable ['slave]
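The slave relies on numPrimeFactors, which is not shown here. As a minimal sketch, assuming prime factors are counted with multiplicity (so 12 = 2 * 2 * 3 has three), a trial-division version could look like the following. This is a hypothetical stand-in for illustration, not necessarily the definition used in the demos package.

```haskell
-- Hypothetical stand-in for the numPrimeFactors used by the slave.
-- Assumption: factors are counted with multiplicity (12 = 2*2*3 gives 3).
numPrimeFactors :: Integer -> Integer
numPrimeFactors = go 2 0
  where
    -- go d acc n: d is the current trial divisor, acc the factors found
    -- so far, n the part of the number that is still unfactored.
    go :: Integer -> Integer -> Integer -> Integer
    go d acc n
      | n < 2          = acc                      -- nothing left to factor
      | d * d > n      = acc + 1                  -- the remainder is prime
      | n `mod` d == 0 = go d (acc + 1) (n `div` d)
      | otherwise      = go (d + 1) acc n
```

The result type is Integer so that the partial results can be summed directly by the master.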
Recall from Towards Haskell in the Cloud that in order to spawn a process on
a node we need something of type Closure (Process ()). In
distributed-process if f :: T1 -> T2 then
$(mkClosure 'f) :: T1 -> Closure T2
That is, the first argument of the function we pass to mkClosure will act as the
closure environment for that process; if you want multiple values in the
closure environment, you must tuple them up. In this case, the closure
environment will contain the process ID of the master and a natural number that
the slave process must factorize.
The master process is a bit longer but not much more complicated:
master :: Integer -> [NodeId] -> Process Integer
master n slaves = do
  us <- getSelfPid

  -- Spawn slave processes to compute numPrimeFactors 1 .. numPrimeFactors n
  spawnLocal $
    forM_ (zip [1 .. n] (cycle slaves)) $ \(m, them) ->
      spawn them ($(mkClosure 'slave) (us, m))

  -- Wait for the result
  sumIntegers (fromIntegral n)

sumIntegers :: Int -> Process Integer
sumIntegers = go 0
  where
    go :: Integer -> Int -> Process Integer
    go !acc 0 = return acc
    go !acc n = do
      m <- expect
      go (acc + m) (n - 1)
We have n bits of work to distribute amongst the slaves, which we do by
zipping [1 .. n] and cycle slaves to get something like
[(1, slave1), (2, slave2), (3, slave3), (4, slave1), (5, slave2), ...
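The round-robin pairing can be tried out in isolation. The sketch below uses plain strings as placeholder node names instead of real NodeId values, and the helper name assign is made up for this illustration.

```haskell
-- Pair each work item with a node, cycling through the nodes round-robin.
-- The list of nodes must be non-empty, because cycle fails on an empty list.
assign :: [a] -> [node] -> [(a, node)]
assign items nodes = zip items (cycle nodes)

-- Placeholder node names; in Cloud Haskell these would be NodeIds.
example :: [(Integer, String)]
example = assign [1 .. 7] ["slave1", "slave2", "slave3"]
```

Because zip stops at the end of the shorter list, the infinite list produced by cycle is only consumed as far as there is work to hand out.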
For each of these bits of work we spawn a separate process on the slave node, all of which will run concurrently. This may be too resource intensive (for instance, if each computation would be memory hungry). We will consider a solution to that problem in the next section.
The partial results arrive back at the master node in arbitrary order; this
does not matter because the result of addition does not depend on the order of
the arguments. We spawn the slaves in a separate process (using spawnLocal)
so that the master process can start collecting partial results while it is
still spawning more slaves.
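The same structure can be mimicked on a single machine with ordinary threads. This is only a local analogue under stated assumptions, not Cloud Haskell code: threads from Control.Concurrent stand in for slave processes, a Chan stands in for the master's message queue, and the names masterLocal and work are invented for the sketch.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Stand-in for the real computation: the number of decimal digits.
work :: Integer -> Integer
work = fromIntegral . length . show

-- Local analogue of the master process.
masterLocal :: Integer -> IO Integer
masterLocal n = do
  results <- newChan
  -- Spawn the workers from a separate thread, so that collecting results
  -- can start while workers are still being created.
  _ <- forkIO $
    forM_ [1 .. n] $ \m ->
      forkIO (writeChan results $! work m)
  -- Collect exactly n partial results, in whatever order they arrive.
  sum <$> replicateM (fromIntegral n) (readChan results)
```

As in the distributed version, the arrival order of the partial results is unspecified, and the sum is the same regardless.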
Work-Pushing
If we spawn a separate process for each computation then all these computations run concurrently, which may be too resource intensive. We can instead spawn a single child process on each of the slave nodes, and ask each of those slave processes to factorize a bunch of numbers:

As before, we have a number of slave nodes (blue) and a single master node (red), but now we only have a single slave process on each slave node. The master process pushes the computations to be done to the message queues of the slave processes, which will process them one by one and push the partial results back on the message queue of the master process.
The slave processes wait repeatedly for an integer n and compute
numPrimeFactors n. The closure environment for the slave (the first
argument) now only contains the process ID of the master, because the slave
receives the natural number to factorize by message:
slave :: ProcessId -> Process ()
slave them = forever $ do
  n <- expect
  send them (numPrimeFactors n)

remotable ['slave]
The master process starts one of these slave processes on each of the slave
nodes, distributes the integers [1 .. 100] among them and waits for the
results.
master :: Integer -> [NodeId] -> Process Integer
master n slaves = do
  us <- getSelfPid

  -- Start slave processes
  slaveProcesses <- forM slaves $ \nid -> spawn nid ($(mkClosure 'slave) us)

  -- Distribute 1 .. n amongst the slave processes
  forM_ (zip [1 .. n] (cycle slaveProcesses)) $ \(m, them) -> send them m

  -- Wait for the result
  sumIntegers (fromIntegral n)
Exercise 1: The slave processes keep running even when the master process finishes. How would you modify this example so that they are terminated when they are no longer necessary?
The master pushes all bits of work to be done to the slaves up front. These messages will sit in the slaves' message queues until they are processed. If the messages are big then it might be more efficient to incrementally send messages to slaves.
Exercise 2: (More difficult) Modify the master so that the slaves will only have a limited number of messages waiting in their queue (this means that the master will need to know the sender slave of each reply). (An alternative solution is to switch from work-pushing to work-stealing, which we discuss in the next section).
Note on reliability: In the Master-Slave example, if one slave process dies we can restart it to redo that single computation. Restarting is more tricky in the Work-Pushing setup, because a single process is responsible for a large amount of work.
Work-Stealing
A disadvantage of both the master-slave setup and the work-pushing setup is that the master node must decide, a priori, what each slave is going to do. Unless the master node can predict accurately how long each computation will take, it is likely that this leaves some slaves twiddling their thumbs while others are buried in work.
One way to avoid this problem is to have the slaves ask the master for work whenever they're ready. This is a simple but effective way of achieving load balancing.
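To make the load-balancing argument concrete, here is a small pure simulation. It is a sketch under simplifying assumptions: each task has a known integer cost, communication is free, and the function names are invented for the illustration. It compares the finishing time of a fixed round-robin assignment with that of handing each task to whichever worker becomes free first.

```haskell
import Data.List (foldl', sort)

-- Fixed round-robin assignment: task i goes to worker (i mod k).
-- The result is the total cost carried by the busiest worker.
-- Assumes k >= 1.
staticMakespan :: Int -> [Int] -> Int
staticMakespan k costs = maximum (map sum buckets)
  where
    buckets =
      [ [c | (i, c) <- zip [0 :: Int ..] costs, i `mod` k == w]
      | w <- [0 .. k - 1]
      ]

-- On-demand assignment: tasks are handed out in order, each one to the
-- worker that becomes free first. Assumes k >= 1.
onDemandMakespan :: Int -> [Int] -> Int
onDemandMakespan k costs = maximum (foldl' step (replicate k 0) costs)
  where
    -- loads holds the time at which each worker becomes free.
    step loads c = case sort loads of
      (l : ls) -> (l + c) : ls
      []       -> []
```

With two workers and costs [10, 1, 10, 1, 10, 1], staticMakespan gives 30 because one worker receives every expensive task, while onDemandMakespan gives 21.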

A single master node (red) and a bunch of slave nodes (blue). Each of the slave nodes runs a single slave process. The master node does not push work to the slaves, but rather the slaves query the master for work. To simplify the design, the master process spawns an auxiliary "work queue" process that the slave processes query for the next bit of work to do. This auxiliary process replies to the slave process which then does the work, posts the partial result to the master process message queue, and queries the "work queue" process for more work.
slave :: (ProcessId, ProcessId) -> Process ()
slave (master, workQueue) = do
    us <- getSelfPid
    go us
  where
    go us = do
      -- Ask the queue for work
      send workQueue us

      -- If there is work, do it, otherwise terminate
      receiveWait
        [ match $ \n  -> send master (numPrimeFactors n) >> go us
        , match $ \() -> return ()
        ]

remotable ['slave]
The slave is passed the process ID of the process that it can query for more
work, as well as the process ID of the master. When it receives an integer it
factorizes it, sends the number of prime factors to the master process, and
then asks the work queue process for the next bit of work; when it receives a
unit value () it terminates.
master :: Integer -> [NodeId] -> Process Integer
master n slaves = do
  us <- getSelfPid

  workQueue <- spawnLocal $ do
    -- Return the next bit of work to be done
    forM_ [1 .. n] $ \m -> do
      them <- expect
      send them m

    -- Once all the work is done tell the slaves to terminate
    forever $ do
      pid <- expect
      send pid ()

  -- Start slave processes
  forM_ slaves $ \nid -> spawn nid ($(mkClosure 'slave) (us, workQueue))

  -- Wait for the result
  sumIntegers (fromIntegral n)
The master process needs to do two things concurrently: it needs to make sure that slave nodes can ask for more work to do, and it needs to collect the partial results from the slaves. We could do this in a single process, but the design above is much simpler: the master spawns an auxiliary process whose sole purpose is to provide the slaves with more work when they request it; the master process itself meanwhile waits for the partial results from the slaves, as before.
The master spawns a local process which the slaves can query for work; it just
sends out the integers [1 .. 100] in order to whoever asks next. It then
starts the slaves, waits for results, and returns the sum.
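The pull-based protocol can also be mimicked locally with threads. Again this is a rough analogue rather than Cloud Haskell code, and it makes some substitutions that should be read as assumptions: the work queue is a shared MVar holding the remaining items instead of a separate process, Nothing plays the role of the () "terminate" message, and the names nextItem, masterLocal and work are invented for the sketch.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, modifyMVar, newMVar)
import Control.Monad (replicateM, replicateM_)

-- Stand-in for the real computation: square the input.
work :: Integer -> Integer
work m = m * m

-- Hand out the next item, or Nothing once the queue is empty.
nextItem :: MVar [Integer] -> IO (Maybe Integer)
nextItem queue = modifyMVar queue $ \items ->
  case items of
    []       -> return ([], Nothing)
    (m : ms) -> return (ms, Just m)

-- Local analogue of the master; assumes at least one worker when n > 0.
masterLocal :: Int -> Integer -> IO Integer
masterLocal workers n = do
  queue   <- newMVar [1 .. n]
  results <- newChan
  let worker = do
        item <- nextItem queue
        case item of
          Nothing -> return ()          -- no work left: terminate
          Just m  -> do
            writeChan results $! work m -- post the partial result
            worker                      -- then ask for more work
  replicateM_ workers (forkIO worker)
  -- Collect exactly n partial results, as in the other setups.
  sum <$> replicateM (fromIntegral n) (readChan results)
```

A worker that finishes quickly simply comes back to the queue sooner, so no up-front decision about who does what is needed.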
Exercise 3: Does the above implementation of the master process guarantee that all slave nodes will be properly terminated?
Exercise 4: A downside of this approach compared to the work-pushing approach above is that the latency between computations by each slave is higher: when one computation completes, the slave must wait for a reply from the work queue process before the next can start. How might you improve this?
To be continued
In the next blog post we will analyze the performance and memory usage of these communication patterns in more detail.
The New Cloud Haskell
Thu, 04 Oct 2012 16:23:55 GMT, by duncan.
Filed under parallel, cloud-haskell.
The new implementation
For about the last year we have been working on a new implementation of Cloud Haskell. This is the same idea for concurrent distributed programming in Haskell that Simon Peyton Jones has been telling everyone about, but it's a new implementation designed to be robust and flexible.
The summary about the new implementation is that it exists, it works, it's on hackage, and we think it is now ready for serious experiments.
Compared to the previous prototype:
- it is much faster;
- it can run on multiple kinds of network;
- it has backends to support different environments (like cluster or cloud);
- it has a new system for dealing with node disconnect and reconnect;
- it has a more precisely defined semantics;
- it supports composable, polymorphic serialisable closures;
- and internally the code is better structured and easier to work with.
The key packages on hackage are:
We will also release a backend for the Windows Azure cloud platform later this month.
I gave a talk at the Haskell Implementors Workshop last month with lots more details about the new implementation. The slides and video are available:
A tutorial on Communication Patterns in Cloud Haskell
Starting tomorrow we're going to do a series of blog posts about using Cloud Haskell. It's a mini-tutorial, with examples that you can try and exercises to extend the code.
We'll be focusing on patterns for distributing work amongst a number of machines in a network. We'll start with very simple distributed patterns and work up to map-reduce and a slight generalisation of map-reduce. We'll also look closely at performance and resources like memory and network latency.
A gentle introduction
If you want a little background reading to help you follow what we're going to be talking about, here are some recommendations:
- A Cloud Haskell Appetiser, part of the Parallel Haskell Digest series
- Slides from a talk I gave earlier this year which has more motivation for why this approach to distributed programming is interesting in the first place
- A video presentation by Simon Peyton Jones explaining the Cloud Haskell idea
- Towards Haskell in the Cloud, the original paper from last year by Jeff Epstein, Andrew Black, and Simon Peyton Jones
More technical details
For even more technical details, see the developer documentation in the source repo and wiki
- source repo and README
- distributed process overview
- Network.Transport: lots of details on the network transport layer
- Draft of the formal semantics
There is also the parallel-haskell google group for discussion and release announcements.
Haskell Courses
Wed, 19 Sep 2012 13:34:43 GMT, by andres.
Filed under well-typed.
Well-Typed is partnering with Skills Matter to offer two Haskell courses in London, targeting professional developers who want to learn Haskell.
Fast Track to Haskell
8-9 October 2012
A two-day introduction to Haskell assuming no previous Haskell or functional programming experience. Covers Haskell syntax, how to define functions and datatypes, dealing with IO, and monads.
Advanced Haskell
11-12 October 2012
A two-day course for people with basic Haskell experience who want to take their Haskell skills to the next level. Covers functional data structures, profiling Haskell programs, concurrency and parallelism, programming patterns and type-level programming.
The courses are designed such that they can be taken both individually and in sequence.
On the day in between, October 10, Skills Matter is organizing the Haskell eXchange, a one-day conference featuring talks by Simon Peyton Jones, Simon Marlow, Lennart Augustsson, Blake Rain, Duncan Coutts and Rob Harrop.
Registration for all these events is open. I hope to see many of you there.
