Status update on GHC maintenance

Sunday, 04 August 2013, by Duncan Coutts.
Filed under ghc, community, well-typed.

As many in the community are aware, our long-time colleague and co-founder Ian Lynagh is leaving for new challenges. We are of course sad to see him go and wish him all the best with his new endeavours. Since he and I founded the company together five years ago he has worked tirelessly to make it a success. It has been a privilege to work with him. It has also been great fun.

Ian has of course played an important role in the Haskell community as one of the pillars of GHC development. It is understandable that people in the community have concerns about what his departure means, so I want to give everyone an update on our plans which I hope will allay concerns.

As a company, Well-Typed is fully committed to GHC and helping Simon PJ with its development and maintenance. We are currently in the process of recruiting. In addition, our colleague Edsko has already started working on GHC and he will continue during the transition period and perhaps also in the longer term.

Our plan is actually to have two people work on GHC, each about half-time. This is partly because GHC work requires a wide range of specialised skills and partly to provide a bit of variety for the people working on GHC. Ian worked primarily on GHC for seven years, but he is a bit superhuman in that respect.

Once we have determined who will be working on GHC we will make further announcements. There will inevitably be a lull in activity as Edsko gets up to speed and the new people come on board, but I am reasonably confident that this will not be too significant.

The first big challenge will be to manage the process of the GHC 7.8 release. The target is to have a release candidate available shortly before ICFP in late September, and then a final release a month or two later.

I also want to reiterate what Simon has been saying recently: that we have to see GHC as being owned by the community, and that community involvement in GHC development and maintenance is vital. Well-Typed's role in GHC maintenance is not to do it on behalf of the community. It is better to think of our role as that of a full-time maintainer. A maintainer can help volunteers get their patches reviewed and merged. They provide continuity and knowledge of the codebase, can help make decisions, and can organise volunteer efforts and processes like releases.

Ian has of course shaped and defined the role, but it is not fixed in stone. Our new people working on GHC maintenance will have some scope to define the role, in consultation with Simon and the community.

I should also say that the number of volunteers working on GHC has increased noticeably since Simon PJ called on the community to get more involved following Simon Marlow's departure. This is a very promising sign and we will play our part to assist those volunteers and to encourage more people.

So all in all, we agree with Simon that the future of the Glorious Haskell Compiler is indeed glorious!


Well-Typed are hiring: Haskell developer

Thursday, 11 July 2013, by Duncan Coutts.
Filed under well-typed.

We are looking to hire a Haskell expert to work with us at Well-Typed as a Haskell developer. This is an exciting opportunity for someone who is passionate about Haskell and who is keen to improve and promote Haskell in a professional context.

The role is quite general: the tasks may involve any of the projects and activities that we are involved in as a company.

At the moment, we are particularly hoping to find someone with an interest in supporting the development and maintenance of GHC. Therefore, some knowledge of or interest in compiler internals, operating systems, the foreign function interface (FFI), and/or deployment issues would be welcome.

Well-Typed has a variety of clients. For some we do proprietary Haskell development and consulting. For others, much of the work involves open-source development and cooperating with the rest of the Haskell community: the commercial, open-source and academic users.

Our ideal candidate has excellent knowledge of Haskell, whether from industry, academia, or personal interest. Familiarity with other languages, low-level programming, and good software engineering practices is also useful. Good organisation, the ability to manage your own time, and reliably meeting deadlines are important. You should also have good communication skills. An interest or experience in teaching Haskell (or other technical topics) is a bonus, as is experience of consulting or running a business. You are likely to have a bachelor's degree or higher in computer science or a related field, although this isn't a requirement.

The offer is initially for a one-year full-time contract. We are also happy to receive applications for part-time work. The annual salary is from GBP 34,800, or pro rata for part-time or flexible work. We also operate a bonus scheme. We offer flexible hours and working from home. Living in England is not required. We may be able to offer either employment or sub-contracting, depending on the jurisdiction in which you live.

If you are interested, please apply via info@well-typed.com. Tell us why you are interested and why you would be a good fit for the job, and attach your CV. Please also indicate how soon you might be able to start. We are more than happy to answer informal enquiries. Contact Duncan Coutts, Ian Lynagh or Andres Löh for further information, either by email or IRC.

To ensure we can properly consider your application, please get it to us by July 25th, 2013, though we may be able to consider applications received later.


Video and slides on (Alternatives to) Lazy I/O

Thursday, 11 July 2013, by Duncan Coutts.
Filed under coding.

Edsko is in London this week running our regular Haskell training courses. Last night he gave a talk, as part of the Skills Matter "In The Brain" series, on the subject of lazy I/O in Haskell and the various old and new alternatives.

(Alternatives to) Lazy I/O

by Edsko de Vries

If you are lucky enough to be in or near London, there is now quite a range of free evening Haskell events to keep an eye out for:

These are all fun, friendly events where you can learn a bit more, meet up with fellow Haskellers, and get involved with the community.


Haskell training in San Francisco and New York

Wednesday, 24 April 2013, by Duncan Coutts.
Filed under training.

We are offering Haskell courses in San Francisco and New York in early June.

They are for professional developers who want to learn Haskell or improve their skills. There is a 2-day introductory course and a 2-day advanced course.

Locations, dates and registration

Well-Typed are running these courses in partnership with FP Complete and Skills Matter.

San Francisco Bay Area

Register via FP Complete.

New York

Register via Skills Matter: introductory course, or advanced course.


InfoQ interview with Duncan on Parallelism, Concurrency and Distributed Programming

Thursday, 14 March 2013, by Duncan Coutts.
Filed under well-typed, parallel.

I was at the Tech Mesh London conference at the end of last year giving a talk about Haskell's approach to parallel programming. While I was there I did an interview with Werner Schuster from InfoQ which is now up on their site.

Duncan Coutts on Parallelism and Concurrency with Haskell, Distributed Programming with Cloud Haskell

Duncan Coutts explains the nature of Concurrency and Parallelism in Haskell, its threading and STM implementation, Erlang OTP's influence on Cloud Haskell for distributed programming, Monads, and more.

Of course, since I was at the conference to tell everyone about deterministic parallelism in Haskell, we talked about that a bit, and about how it differs from concurrency.

While we were talking about STM I went off on a bit of a rant about Node.js, async I/O and how we do sequential imperative programming so much better in Haskell. Hopefully people find that interesting. It's a topic that I think doesn't get enough attention.

We also touched on Cloud Haskell and Werner put me on the spot by asking what my favourite monad is.

My thanks to Werner for an interesting chat, and to the people at InfoQ for the A/V and other technical work.


Upcoming Haskell training courses

Wednesday, 23 January 2013, by Duncan Coutts.
Filed under well-typed.

Well-Typed is running Haskell courses in London, targeting professional developers who want to learn Haskell or improve their skills.

The next courses are in the first week of February, and then again in April and July.

Fast Track to Haskell

4-5 February 2013

A two-day introduction to Haskell assuming no previous Haskell or functional programming experience. Covers Haskell syntax, how to define functions and datatypes, dealing with IO, and monads.

Advanced Haskell

6-7 February 2013

A two-day course for people with basic Haskell experience who want to take their Haskell skills to the next level. Covers functional data structures, understanding laziness and performance, profiling Haskell programs, concurrency and parallelism, programming patterns and type-level programming.

The courses are designed so that they can be taken either individually or in sequence.

Registration for both courses is open and there are discounts for early bookings. We hope to see many of you there.


Deterministic Parallel Programming with Haskell

Saturday, 08 December 2012, by Duncan Coutts.
Filed under parallel.

This week I gave a talk at the Tech Mesh conference:

Multi-core with less Pain: Deterministic Parallel Programming with Haskell.

Abstract:

You want to make your code run faster, so you think about changing it to run in parallel to use multiple cores. But most approaches to parallelism also force you to rewrite your program using explicit concurrency which means you'll always worry about race conditions, deadlocks and other concurrency bugs. Deterministic parallelism frees you from concurrency bugs and gives a strong guarantee that your program remains deterministic, so it will give the same result no matter the number of cores or the scheduling being used.

Haskell offers a variety of libraries for deterministic parallelism that enable concise high-level parallel programs. In this talk we will look at the idea of parallelism without concurrency and give an overview of the paradigms that Haskell offers in this area.

The main point I was making is that we can achieve parallel speedups, without ourselves having to write explicitly concurrent programs. For a mainstream audience this is often a rather surprising and novel idea.
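
To make this concrete, here is a minimal illustration using evaluation strategies from the parallel package (a toy example of our own, not code from the talk or the article): the parallel map gives exactly the same result as the sequential one, whatever the number of cores or the scheduling.

import Control.Parallel.Strategies (parMap, rdeepseq)

-- Square a list of numbers and sum the results, evaluating the squares in
-- parallel. Because the parallelism is deterministic, this always computes
-- the same answer as 'sum (map (\x -> x * x) xs)'.
sumOfSquares :: [Int] -> Int
sumOfSquares xs = sum (parMap rdeepseq (\x -> x * x) xs)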

The talk was based on an article that Andres and I wrote for the Computing in Science & Engineering Magazine (CiSE). As you can guess from the name, CiSE is aimed at an audience of scientists and engineers. CiSE ran a special issue on parallelism and concurrency in modern programming languages. They got articles on approaches in different languages (Clojure, Erlang, and Haskell). The guest editors gave the authors the problem of parallelising a simple numerical solver. You can read the guest editors' introduction to the special issue.

For those of you not subscribed to CiSE magazine, we are making our article available.

Funnily enough, I'm still waiting for my copy of CiSE to arrive, so I've not yet read the articles on Clojure and Erlang. I'll be interested to see how the other authors solved the same problem. The tricky aspect of the problem is that it has very fine-grained parallelism.

When we looked at the problem, we decided it was a good fit for data parallelism, and we wrote a solution using Repa. Repa handles the granularity issue for us automatically by chunking up the array across cores. If we'd used a more manual technique we would have had to do quite a bit of chunking ourselves to get the granularity right, and that would have added more code that could have obscured the algorithm. As it was, the core of our optimised Repa version was only 5 lines long and pretty readable. (The code snippets are in the article and the full code is available on the CiSE website.)
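
For a flavour of the data-parallel style (a sketch of our own, not the solver from the article): Repa lets us write what looks like an ordinary whole-array operation, and computeP evaluates it in parallel, chunking the array across the available cores for us.

import Data.Array.Repa (Array, DIM2, U)
import qualified Data.Array.Repa as R

-- Double every element of a two-dimensional unboxed array in parallel.
-- computeP runs in a monad so that nested parallel computations are
-- properly sequenced.
doubleAll :: Monad m => Array U DIM2 Double -> m (Array U DIM2 Double)
doubleAll arr = R.computeP (R.map (* 2) arr)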

We got performance results that are fairly competitive with C (using gcc and OpenMP, see the article for the details). The competitive side of me is also interested to find out how we compare on performance with the other solutions. After all, parallelism is about performance.


Communication Patterns in Cloud Haskell (Part 4)

Monday, 15 October 2012, by Edsko de Vries.
Filed under parallel, cloud-haskell.

K-Means

In Part 3 of this series we showed how to write a simple distributed implementation of Map-Reduce using Cloud Haskell. In this final part of the series we will explain the K-Means algorithm and show how it can be implemented in terms of Map-Reduce.

K-Means is an algorithm to partition a set of points into n clusters. The algorithm iterates the following two steps for a fixed number of times (or until convergence):

1. Assign each point to the cluster whose centre is nearest to it.
2. Recompute the centre of each cluster as the average of the points assigned to it.

The initial cluster centres can be chosen randomly. Here is one example run of the first 5 iterations of the algorithm on randomly (evenly) distributed two-dimensional points:

(Figures: the first five iterations of this run.)

Of course, in such an evenly distributed data set the clusters that are "discovered" are more or less arbitrary and heavily influenced by the choice of initial centers. For example, here is another run of the algorithm with different initial centers:

(Figures: the first five iterations of the second run.)

K-Means as a Map-Reduce skeleton

We will use Map-Reduce to implement a single iteration of the K-Means algorithm. Each mapper node will execute step (1) of the algorithm for a subset of the points, and in the reduction step we will compute the new cluster centres.

type Point    = (Double, Double)
type Cluster  = (Double, Double)

average :: Fractional a => [a] -> a
average xs = sum xs / fromIntegral (length xs)

distanceSq :: Point -> Point -> Double 
distanceSq (x1, y1) (x2, y2) = a * a + b * b
  where
    a = x2 - x1
    b = y2 - y1

nearest :: Point -> [Cluster] -> Cluster
nearest p = minimumBy (compare `on` distanceSq p) 

center :: [Point] -> Point
center ps = let (xs, ys) = unzip ps in (average xs, average ys) 

kmeans :: Array Int Point -> MapReduce (Int, Int) [Cluster] Cluster Point ([Point], Point)
kmeans points = MapReduce {
    mrMap    = \(lo, hi) cs -> [ let p = points ! i in (nearest p cs, p)
                               | i <- [lo .. hi]
                               ] 
  , mrReduce = \_ ps -> (ps, center ps)
  }

We start with a Map (Int, Int) [Cluster]; the keys in this map correspond to the segmentation of the input set; for instance, a key (20, 39) indicates that a mapper node should compute clusters for points [20 .. 39]. The values in this map are the current cluster centres (that is, every key in the map has the same value).

The mappers then compute a list [(Cluster, Point)] which associates points with each cluster. Finally, in the reduction step we create a Map Cluster ([Point], Point) which tells us for each cluster its set of points and its centre.

Iterating locally

The Map-Reduce skeleton only computes a single iteration of the algorithm; we need to iterate it a number of times to implement the full algorithm. Using the localMapReduce function defined in Part 3 we can do this as follows:

localKMeans :: Array Int Point 
            -> [Cluster] 
            -> Int
            -> Map Cluster ([Point], Point)
localKMeans points cs iterations = go (iterations - 1)
  where
    mr :: [Cluster] -> Map Cluster ([Point], Point)
    mr = localMapReduce (kmeans points) . trivialSegmentation
  
    go :: Int -> Map Cluster ([Point], Point)
    go 0 = mr cs 
    go n = mr . map snd . Map.elems . go $ n - 1

    trivialSegmentation :: [Cluster] -> Map (Int, Int) [Cluster]
    trivialSegmentation cs' = Map.fromList [(bounds points, cs')]

For the local implementation we don't care about how we partition the input, so we just create a trivial segmentation and have one mapper process the entire input.
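
For concreteness, a hypothetical invocation reusing the definitions above (the points and initial centres below are made up for illustration):

import Data.Array (listArray)

-- Cluster six points around three initial centres, iterating five times.
exampleRun :: Map Cluster ([Point], Point)
exampleRun = localKMeans points initialCentres 5
  where
    points         = listArray (0, 5) [(0,0), (0,1), (1,0), (10,10), (10,11), (11,10)]
    initialCentres = [(0,0), (5,5), (10,10)]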

Generalizing the distributed Map-Reduce implementation

The set of points itself does not vary from one iteration of the algorithm to another, and only needs to be distributed to the mapper nodes once. The master process of our Map-Reduce implementation from Part 3, however, takes the complete input map as an argument and distributes it to the mappers each time it is invoked (see distrMapReduce in Part 3).

This means that if we use distrMapReduce as implemented we will re-distribute the full set of points to the mapper nodes on every iteration of the algorithm. To avoid this, we can generalize the Map-Reduce implementation so that the mappers are started once and can then serve any number of map-reduce queries.

We will change the type of distrMapReduce to

distrMapReduce :: Closure (MapReduce (Int, Int) [Cluster] Cluster
                                     Point ([Point], Point))
               -> [NodeId]
               -> ((Map (Int, Int) [Cluster] ->
                   Process (Map Cluster ([Point], Point))) -> Process a)
               -> Process a

In distrMapReduce mrClosure mappers p the process p is provided with a means to run map-reduce queries; compare the type of distrMapReduce to the types of functions such as

withFile :: FilePath -> IOMode -> (Handle -> IO r) -> IO r
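
To make the analogy concrete, here is a hypothetical usage sketch of our own (the real use is distrKMeans, below): the continuation receives an iteration function which it may call any number of times while the mapper processes stay alive.

-- Run two map-reduce passes using the same set of mappers.
twoPasses :: Closure (MapReduce (Int, Int) [Cluster] Cluster Point ([Point], Point))
          -> [NodeId]
          -> Map (Int, Int) [Cluster]
          -> Process (Map Cluster ([Point], Point))
twoPasses mrClosure mappers segments =
  distrMapReduce mrClosure mappers $ \iteration -> do
    _ <- iteration segments   -- first pass
    iteration segments        -- second pass, reusing the same mappers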

Exercise 8: Implement distrMapReduce with the type above. This does not require any new Cloud Haskell concepts, but does require a bit of engineering. (You can find the implementation also in the distributed-process-demos package).

Note that even with this generalization we will pass the entire set of points to all the nodes, even though each node will only operate on a subset of them; we leave optimizing this as an exercise to the reader (it will require a further generalization of the Map-Reduce driver).

Polymorphism

In the section above we changed the type of distrMapReduce to match the type of the K-Means MapReduce skeleton instead of the word-counting MapReduce skeleton; we can change that type without changing the implementation at all. What we really want, of course, is a polymorphic implementation of Map-Reduce:

distrMapReduce :: (Serializable k1, Serializable v1, Serializable k2,
                   Serializable v2, Serializable v3, Ord k2)
               => Closure (MapReduce k1 v1 k2 v2 v3)
               -> [NodeId]
               -> ((Map k1 v1 -> Process (Map k2 v3)) -> Process a) 
               -> Process a 

However, when we try to generalize our code above we run into a problem. Consider the code that we ship to the mapper nodes. What does this code need to do? First, it needs to wait for a message of a specific type. In order to do the type matching it needs some information about the type (k1, v1). Once it receives such a message, it needs to send the list of type [(k2,v2)] created by the Map function back to the master. In order to do that, it needs to know how to serialize values of type [(k2,v2)].

Where does distrMapReduce get this information? Well, it is provided by the Serializable type class constraints. Unfortunately, Haskell does not give us an explicit handle on the dictionaries behind these constraints, much less a way to serialize them so that we can ship them to the mapper nodes. We can, however, reify a type class constraint as an explicit dictionary:

data SerializableDict a where
    SerializableDict :: Serializable a => SerializableDict a

We cannot serialize objects of type SerializableDict directly, but we can serialize static SerializableDicts! Hence, the type of distrMapReduce becomes:

distrMapReduce :: forall k1 k2 v1 v2 v3 a. 
                  (Serializable k1, Serializable v1, Serializable k2,
                   Serializable v2, Serializable v3, Ord k2)
               => Static (SerializableDict (k1, v1))
               -> Static (SerializableDict [(k2, v2)])
               -> Closure (MapReduce k1 v1 k2 v2 v3)
               -> [NodeId]
               -> ((Map k1 v1 -> Process (Map k2 v3)) -> Process a) 
               -> Process a 

This type may look a bit intimidating, but really all that has changed is that we require static type information so that we can ship this type information to the mappers. We omit the implementation; you can find it in the distributed-process-demos package; the general principles are explained in the documentation of the distributed-static package.

Using this polymorphic version of distrMapReduce is no more difficult than using the monomorphic version; for example, we can implement "distributed word counting" as

dictIn :: SerializableDict (FilePath, Document)
dictIn = SerializableDict

dictOut :: SerializableDict [(Word, Frequency)]
dictOut = SerializableDict

countWords_ :: () -> MapReduce FilePath Document Word Frequency Frequency
countWords_ () = countWords

remotable ['dictIn, 'dictOut, 'countWords_]

distrCountWords :: [NodeId] -> Map FilePath Document -> Process (Map Word Frequency)
distrCountWords mappers input = 
  distrMapReduce $(mkStatic 'dictIn)
                 $(mkStatic 'dictOut) 
                 ($(mkClosure 'countWords_) ())
                 mappers
                 (\iteration -> iteration input)

Creating the necessary SerializableDicts is easy (there is only one constructor for SerializableDict, after all, and it doesn't take any arguments!). Note that the word counter only calls the iteration function once; this will not be true for the final algorithm we consider: distributed k-means.

Distributed K-Means

The distributed K-means is not much more complicated. Everything up to and including go pretty much follows the local implementation; the remainder (segments, dividePoints, pointsPerMapper and numPoints) just computes which segment of the input each mapper node is going to process.

dictIn :: SerializableDict ((Int, Int), [Cluster])
dictIn = SerializableDict

dictOut :: SerializableDict [(Cluster, Point)]
dictOut = SerializableDict

remotable ['kmeans, 'dictIn, 'dictOut]

distrKMeans :: Array Int Point 
            -> [Cluster]
            -> [NodeId] 
            -> Int
            -> Process (Map Cluster ([Point], Point))
distrKMeans points cs mappers iterations = 
    distrMapReduce $(mkStatic 'dictIn) 
                   $(mkStatic 'dictOut) 
                   ($(mkClosure 'kmeans) points) 
                   mappers
                   (go (iterations - 1))
  where
    go :: Int
       -> (Map (Int, Int) [Cluster] -> Process (Map Cluster ([Point], Point))) 
       -> Process (Map Cluster ([Point], Point))
    go 0 iteration = do
      iteration (Map.fromList $ map (, cs) segments) 
    go n iteration = do
      clusters <- go (n - 1) iteration
      let centers = map snd $ Map.elems clusters
      iteration (Map.fromList $ map (, centers) segments)
    
    segments :: [(Int, Int)]
    segments = let (lo, _) = bounds points in dividePoints numPoints lo

    dividePoints :: Int -> Int -> [(Int, Int)]
    dividePoints pointsLeft offset 
      | pointsLeft <= pointsPerMapper = [(offset, offset + pointsLeft - 1)] 
      | otherwise = let offset' = offset + pointsPerMapper in
                    (offset, offset' - 1)
                  : dividePoints (pointsLeft - pointsPerMapper) offset' 

    pointsPerMapper :: Int
    pointsPerMapper = 
      ceiling (toRational numPoints / toRational (length mappers))
       
    numPoints :: Int
    numPoints = let (lo, hi) = bounds points in hi - lo + 1

Exercise 9: In this example we create precisely enough segments that every mapper node gets a single segment. We could have created more or fewer segments. Why is creating one segment per mapper node the optimal choice for this algorithm? In which case might creating more segments be more efficient?

Exercise 10: The data sent back from the mapper nodes to the master node contains a lot of redundancy. How might you improve that?

Profiling

To conclude we briefly look at the heap profiles of the master and the slave nodes of our distributed k-means example.

Map-Reduce Master

Heap profile (by type) of the Map-Reduce master running K-Means (single slave, clustering 50,000 points, 5 iterations).

In this example the master process first creates a random set of 50,000 points before running the K-means algorithm; this accounts for the first peak. The other five peaks are the master node collecting data from the mapper nodes before reducing it locally.

Exercise 11: In the Work-Stealing example from Part 1 of this series the master reduces as it receives messages (sumIntegers). Could you do something similar in the Map-Reduce master?

Map-Reduce Slave

Heap profile (by type) of the Map-Reduce slave running K-Means (single slave, clustering 50,000 points, 5 iterations).

In the slave node too, we clearly see the 5 iterations of the algorithm. At the start of each iteration the mapper node creates a list that associates points to clusters. Once the list is created, the list is serialized as a bytestring (purple section in the profile) and sent to the master node before the cycle starts again.

Exercise 12: (More technical) Why is the entire list created before it is serialized to a bytestring? (Hint: look up the Binary instance for lists.) How might you change this so that the list is created lazily as it is serialized?
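
As background for the hint in Exercise 12: the list instance in the binary package behaves roughly like the sketch below (our own paraphrase, not the package's actual source). Because the length is written first, the whole list must be produced before serialization can begin.

import Control.Monad (forM_)
import Data.Binary (Binary, Put, put)

-- Roughly what 'put' does for a list: write the length, then the elements.
-- Computing the length forces the entire spine of the list up front.
putListSketch :: Binary a => [a] -> Put
putListSketch xs = do
  put (length xs)
  forM_ xs put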

More information

We hope you've enjoyed this series (Parts 1, 2, 3, 4). For links to more information see the Cloud Haskell home page.


Communication Patterns in Cloud Haskell (Part 3)

Friday, 12 October 2012, by Edsko de Vries.
Filed under parallel, cloud-haskell.

Map-Reduce

In Part 1 and Part 2 of this series we described a number of ways in which we might compute the number of prime factors of a set of numbers in a distributed manner. Abstractly, these examples can be described as

a problem (computing the number of prime factors of 100 natural numbers) is split into subproblems (factorizing each number), those subproblems are solved by slave nodes, and the partial results are then combined (summing the number of factors)

This is reminiscent of Google's Map-Reduce algorithm (MapReduce: Simplified Data Processing on Large Clusters, Dean and Ghemawat, OSDI'04), with the "Map" part corresponding to the computation of partial results and the "Reduce" part corresponding to combining these results. We will explain the basic ideas behind Map-Reduce before giving a distributed implementation using work-stealing.

Local map-reduce

The exposition we give in this section is based on Google's MapReduce Programming Model -- Revisited by Ralf Laemmel (SCP 2008). The Map-Reduce algorithm transforms key-value maps; in Haskell, its type is given by

-- The type of Map-Reduce skeletons (provided by the user)
data MapReduce k1 v1 k2 v2 v3 = MapReduce {
    mrMap    :: k1 -> v1 -> [(k2, v2)]
  , mrReduce :: k2 -> [v2] -> v3
  }

-- The driver (which "executes" a Map-Reduce skeleton)
localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3

(Diagram: the Map-Reduce transformation.)

We start with a key-value map (with keys of type k1 and values of type v1). With the help of the "Map" (mrMap) part of a Map-Reduce skeleton each of these key-value pairs is turned into a list of key-value pairs (with keys of type k2 and values of type v2); note that this list may (and typically does) contain multiple pairs with the same key. The Map-Reduce driver then collects all values for each key, and finally reduces this list of values (of type v2) to a single value (of type v3) using the "Reduce" (mrReduce) part of the skeleton.

(Colour changes in the diagram indicate type changes.)

Exercise 6: Implement localMapReduce. The types pretty much say what needs to be done.
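
If you would like to compare notes, here is one possible sketch (the reducePerKey and groupByKey helpers are our own definitions, although the distributed driver below refers to functions with the same names):

import           Data.Map (Map)
import qualified Data.Map as Map

-- Run the Map part on every input pair, group the intermediate values by
-- key, and reduce each group.
localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3
localMapReduce mr = reducePerKey mr . groupByKey . mapPerKey mr

mapPerKey :: MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> [(k2, v2)]
mapPerKey mr = concatMap (uncurry (mrMap mr)) . Map.toList

-- Collect all intermediate values per key (the order of the values is not
-- significant for the examples in this series).
groupByKey :: Ord k2 => [(k2, v2)] -> Map k2 [v2]
groupByKey = Map.fromListWith (++) . map (\(k, v) -> (k, [v]))

reducePerKey :: MapReduce k1 v1 k2 v2 v3 -> Map k2 [v2] -> Map k2 v3
reducePerKey mr = Map.mapWithKey (mrReduce mr)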

Consider counting the number of words in a set of documents; that is, we want to transform a Map FilePath Document to a Map Word Frequency. We can do this with the following Map-Reduce skeleton:

countWords :: MapReduce FilePath Document Word Frequency Frequency
countWords = MapReduce {
    mrMap    = const (map (, 1) . words)
  , mrReduce = const sum  
  }
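
The auxiliary types are not shown; judging by the use of words here and the monomorphic MapReduce String String String Int Int type used in the distributed version below, they are presumably simple synonyms along these lines:

import Prelude hiding (Word)  -- recent Preludes export a Word type of their own

type Document  = String
type Word      = String
type Frequency = Int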

Distributed map-reduce

We are going to have slave nodes execute the Map part of the algorithm; we will use work-stealing to distribute the work amongst the slaves. We are not going to distribute the Reduce part of the algorithm, but do that on a single machine. For now we will give a monomorphic implementation of the distributed Map-Reduce algorithm, tailored specifically for the countWords example from the previous section. In the next post we will see how to make the implementation polymorphic.

The implementation follows the work-stealing example from Part 1 very closely. The slave asks for work and executes it, using the mrMap part of the Map-Reduce skeleton:

mapperProcess :: (ProcessId, ProcessId, Closure (MapReduce String String String Int Int))
              -> Process ()
mapperProcess (master, workQueue, mrClosure) = do
    us <- getSelfPid
    mr <- unClosure mrClosure
    go us mr 
  where
    go us mr = do
      -- Ask the queue for work 
      send workQueue us
  
      -- Wait for a reply; if there is work, do it and repeat; otherwise, exit
      receiveWait 
        [ match $ \(key, val) -> send master (mrMap mr key val) >> go us mr
        , match $ \()         -> return () 
        ]

remotable ['mapperProcess]

Note that the slave wants a Closure of a Map-Reduce skeleton; since the Map-Reduce skeleton itself contains functions, it is not serializable.

The master process is

distrMapReduce :: Closure (MapReduce String String String Int Int)
               -> [NodeId]
               -> Map String String
               -> Process (Map String Int)
distrMapReduce mrClosure mappers input = do
  mr     <- unClosure mrClosure
  master <- getSelfPid

  workQueue <- spawnLocal $ do
    -- Return the next bit of work to be done
    forM_ (Map.toList input) $ \(key, val) -> do 
      them <- expect
      send them (key, val)

    -- Once all the work is done tell the mappers to terminate
    replicateM_ (length mappers) $ do
      them <- expect
      send them ()

  -- Start the mappers
  forM_ mappers $ \nid -> spawn nid ($(mkClosure 'mapperProcess) (master, workQueue, mrClosure)) 

  -- Wait for the partial results
  partials <- replicateM (Map.size input) expect 

  -- We reduce on this node
  return (reducePerKey mr . groupByKey . concat $ partials)

We can now implement "distributed word counting" as

countWords_ :: () -> MapReduce FilePath Document Word Frequency Frequency
countWords_ () = countWords

remotable ['countWords_]

distrCountWords :: [NodeId] -> Map FilePath Document -> Process (Map Word Frequency)
distrCountWords = distrMapReduce ($(mkClosure 'countWords_) ())
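
The post does not show a driver for distrCountWords; assuming the SimpleLocalnet backend (as used elsewhere in this series), a hypothetical main might look roughly like this (the input documents are made up):

import Control.Distributed.Process (liftIO)
import Control.Distributed.Process.Node (initRemoteTable)
import Control.Distributed.Process.Backend.SimpleLocalnet
  (initializeBackend, startMaster, terminateAllSlaves)
import qualified Data.Map as Map

main :: IO ()
main = do
  backend <- initializeBackend "127.0.0.1" "8080" (__remoteTable initRemoteTable)
  startMaster backend $ \slaves -> do
    -- a made-up input map from file names to document contents
    let documents = Map.fromList [("hello.txt", "hello hello world")]
    frequencies <- distrCountWords slaves documents
    liftIO (print frequencies)
    terminateAllSlaves backend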

Performance

If we use ThreadScope to look at how busy the nodes are, we find something like

Wasted Resources

with alternating activity between the master node (which does the reduction, top) and the mapper nodes (bottom). This is unsurprising, of course; we have a distributed implementation of the Map phase but reduce locally.

Exercise 7: Change the implementation so that the reduce step happens in parallel too and confirm that the work distribution is now better.

Note: as hinted at by the ThreadScope plot above, it is possible to use ThreadScope to look at distributed applications running in multiple OS processes, including on different physical hosts. Doing so is currently a somewhat manual process. In addition to the normal step of linking the application using the -eventlog flag, and running with the flag +RTS -l -RTS, there are a few steps after the program is complete: collect the .eventlog files from each node; use the ghc-events merge command to merge all the eventlog files into one (which currently has to be done in several steps because the merge command only takes pairs of files at once) and finally use ThreadScope to view the combined eventlog.

To be continued

In the next blog post we will look at how we can make the distributed Map-Reduce skeleton polymorphic. We'll also look at k-means as an example algorithm that can be implemented using Map-Reduce. In fact, for efficiency, k-means requires a "multi-shot" Map-Reduce, and we'll see how Haskell and Cloud Haskell give us the flexibility we need to make this kind of extension.


Communication Patterns in Cloud Haskell (Part 2)

Monday, 08 October 2012, by Edsko de Vries.
Filed under parallel, cloud-haskell.

Performance

In Part 1 of this series we introduced a number of basic Cloud Haskell communication patterns. All these patterns will work fine for modest numbers of work items; in this blog post we're going to look at the performance and memory use for much larger numbers of work items, so we can see how each pattern scales up. We will see that scalability improves as we move from master-slave to work-pushing to work-stealing, and that many further improvements can still be made. More importantly, we hope to help you to predict runtime behaviour from your code, so that you can write more scalable and high-performance code.

Predicting resource use and performance

When thinking about Cloud Haskell performance you should consider things such as how many messages are sent and how large they are, whether communication is synchronous or asynchronous, and how much state each process needs to maintain.

You should try to make a mental comparison between the Master-Slave, Work-Pushing and Work-Stealing examples before reading the analysis below.

The Master process in the Master-Slave setup

In the discussion of the master-slave setup we mentioned that spawning a separate process for each subcomputation up-front might be too resource intensive on the client side. What we didn't mention, however, is that the master process we defined there is also resource hungry.

Spawn Strategies

Heap profile of the master process in the Master-Slave example (single slave, factorizing the first n = 5000 numbers):

1. As defined above

2. Using reconnect

3. Using spawnAsync

It is obvious from the heap profile that the master process has a memory leak. But where is that coming from? The problem arises when we spawn the child processes:

spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> 
    spawn there ($(mkClosure 'slave) (us, m))

In order to deal with network failure at any point during the spawning process, spawning involves sending a message to the newly started remote process that says "it's okay to start running now". In order to provide reliable and ordered message passing, Cloud Haskell must maintain some state for every destination that a process sends messages to. This means that for the master process it must maintain state for n outgoing connections, since it starts a process for each subcomputation.

In a future version of Cloud Haskell this problem might be solved automatically, but for now we can solve it manually by using the reconnect primitive. The reconnect primitive is primarily meant to be used in situations where processes get disconnected from each other, but as a side effect it lets us release the connection state, and that's the effect we need for our workaround here:

spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    them <- spawn there ($(mkClosure 'slave) (us, m))
    reconnect them

This yields heap profile (2), above, and we see that the memory leak has disappeared.

However, there is another problem with this master. If we compare performance, we find that it is more than an order of magnitude slower than the master process in the work-pushing version. The reason is that spawn is synchronous: we wait for the remote node to tell us the process ID of the newly started process. That means that the execution of the master looks something like

spawn first process
wait for reply
spawn second process
wait for reply
...

Synchronous message passing is much slower than asynchronous message passing, which is why the work-pushing master is so much faster. However, spawn is actually implemented in terms of spawnAsync, which we can use directly:

spawnLocal $
  forM_ (zip [1 .. n] (cycle slaves)) $ \(m, there) -> do
    spawnAsync there ($(mkClosure 'slave) (us, m))
    _ <- expectTimeout 0 :: Process (Maybe DidSpawn)
    return ()

This yields a similar heap profile (profile 3, above), but is much faster. We use the expectTimeout function to check – without actually waiting – and discard DidSpawn notifications (which are notifications from the remote node that tell us the PID of new processes); this avoids a build-up in our message queue, which would show up in the heap profile as a new memory leak. For every spawnAsync we do we remove at most one DidSpawn message; any DidSpawn messages that are "left over" when we finish spawning will simply be discarded because the process that did the spawning will have terminated.

Comparing the Master processes across Master-Slave, Work-Pushing and Work-Stealing

The heap profile of the Master-Slave master process (with the memory leak fixed) is pretty similar to the heap profiles of the master processes in the work-pushing and work-stealing examples:

Master Heap Profiles

Heap profiles of the master processes (single slave, factorizing the first n = 50,000 numbers):

1. In the master-slave setup

2. In the work-pushing setup

3. In the work-stealing setup

Comparing the Slave processes across Master-Slave, Work-Pushing and Work-Stealing

The heap profiles of the slave processes are however more interesting:

Slave Heap Profiles

Heap profiles of the slave processes (single slave, factorizing the first n = 50,000 numbers):

1. In the master-slave setup

2. In the work-pushing setup

3. In the work-stealing setup

As expected, the slave in the Master-Slave setup is the most resource hungry. A large amount of memory is used up when the slave starts; most of this is taken up by messages to the slave's node controller telling the node which processes to spawn. The heap profile of the work-pushing slave has a similar shape, but with a much lower peak: in this case the memory is used to store the messages (containing just an integer) from the master to the slave process.

The heap profile of the work-stealing slave looks different. The increase in size you see comes from the algorithm used: it takes more memory to compute the number of prime factors of larger numbers. Note however that this only comes to 550 kB in total; the master-slave and work-pushing slaves had the same behaviour, but the memory used by the factorization algorithm was insignificant.

Execution time

Finally, let's plot execution time versus number of slaves:

Execution time vs Number of Slaves

Execution time (in seconds) versus number of slaves

Ignoring the absolute values, we can conclude that the Master-Slave example has a lot of overhead and is significantly slower than the other two setups. The work-pushing setup is slightly faster than the work-stealing setup, probably due to the higher latency we mentioned above. Finally, speedup is much less than linear in the number of slaves; this is due to the relatively high communication overhead compared to the cost of the actual computation.

Exercise 5: Extend the time/memory comparison to the work-stealing-with-low-latency setup from Exercise 4 (from Part 1). You should find that it is as fast as the work-pushing setup and that its heap profile is comparable to that of the work-stealing setup.

To be continued

In the next blog post we will see how these communication patterns can be generalized as a Map-Reduce skeleton.

