Deterministic parallelism with logging

Anton Tayanovskyy

Feb 06, 2014

Many applications want to use multiple cores to run faster while retaining the same observable behavior as the sequential version. Async in F# goes a long way toward supporting this, but its parallel sections do not order effects at all. Sometimes you want a mix of ordered and unordered effects. For example, you might want a logging "effect" that displays log messages strictly in program order, while other effects are free to be scheduled however the machine sees fit. You also want each log message to be processed as soon as it is available: delaying all logs until processing finishes is not user-friendly.

Let us do exactly that. The F# code presented here is not the only way to solve this problem, and perhaps not the best, but I hope it is simple enough to convince you that the problem is not a hard one.

First, what are we after? As a model for parallelism, let us take something like the Haskell Par monad with IVar for synchronization. It is an old idea, and I find it particularly well presented in this paper[1]. It is simply fork/join, where a join is accomplished by reading a "write-once" variable, which amounts to waiting for the forked thread to write to it. This is in fact quite similar to the BCL Task model, or to F# Async.StartChild. To that we add a logging primitive. We might use it like this to implement parallel Fibonacci:
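To make the write-once variable concrete, here is a minimal sketch. The IVar type, its Put/Get members and the fork helper are hypothetical names for illustration only. This IVar is lock-based, unlike the optimistic lock-free Future in the author's gist, and for brevity it ignores failures in forked work. A join is simply a Get on the variable that the forked computation writes.

```fsharp
open System

/// Hypothetical lock-based write-once variable (IVar). Readers that arrive
/// before the write are queued and resumed when the value is put.
type IVar<'T>() =
    let sync = obj ()
    let mutable value : 'T option = None
    let mutable waiters : ('T -> unit) list = []

    /// Writes the value; a second write raises InvalidOperationException.
    member _.Put(v: 'T) =
        let toWake =
            lock sync (fun () ->
                if value.IsSome then invalidOp "IVar already written"
                value <- Some v
                let ws = waiters
                waiters <- []
                ws)
        // Resume waiting readers outside the lock.
        for k in toWake do
            k v

    /// Reads the value, waiting asynchronously until it has been written.
    member _.Get() : Async<'T> =
        Async.FromContinuations(fun (ok, _, _) ->
            let ready =
                lock sync (fun () ->
                    match value with
                    | Some v -> Some v
                    | None ->
                        waiters <- ok :: waiters
                        None)
            match ready with
            | Some v -> ok v
            | None -> ())

/// Fork: start `work` in the background and return the IVar it will fill.
let fork (work: Async<'T>) : IVar<'T> =
    let v = IVar<'T>()
    Async.Start(async {
        let! r = work
        v.Put r })
    v

/// Fork/join Fibonacci: the join is just a read of the IVar.
let rec fib (n: int) : Async<int> =
    async {
        if n < 2 then
            return 1
        else
            let a = fork (fib (n - 1))
            let! b = fib (n - 2)
            let! a = a.Get()
            return a + b
    }
```

With fib 0 = fib 1 = 1 as in the article's examples, `fib 10 |> Async.RunSynchronously` evaluates to 89.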

let rec ParFib (n: int) : Par<int> =
    par {
        do! Par.Log (Message (sprintf "%i" n))
        do! Par.DoAsync (Async.Sleep 250)
        match n with
        | 0 | 1 ->
            return 1
        | n ->
            let! a = Par.Spawn (ParFib (n - 1))
            let! b = ParFib (n - 2)
            let! a = Par.Await a
            return a + b
    }

Note Par.Spawn, which forks its argument and returns a Future<'T> (this is our IVar); we later Par.Await that future to join the result back into the main thread.

When we run this, we expect the messages to appear in the same order as in the sequential version, but the program to run faster. We also expect logs to appear right away, before execution finishes. Here is the sequential version:

let rec SFib (n: int) : Par<int> =
    par {
        do! Par.Log (Message (sprintf "%i" n))
        do! Par.DoAsync (Async.Sleep 250)
        match n with
        | 0 | 1 ->
            return 1
        | n ->
            let! a = SFib (n - 1)
            let! b = SFib (n - 2)
            return a + b
    }

How do we implement this? The only interesting part is combining two Par computations in a way that preserves log order. If we had some notion of a Stream, this would be easy: to combine A and B, we would append the stream produced by B to the stream produced by A. The tricky part is that we must return the combined stream before the computation is done.
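As a warm-up, here is what appending looks like for an ordinary lazy list. This is a sketch with hypothetical names (LazyStream, append, ofList, toList). A consumer can read A's items before B has been computed at all, but with Lazy suspensions every item is computed by the reader on demand, rather than by a producer running in parallel.

```fsharp
/// A plain lazy list: every tail is a Lazy<'T> suspension.
type LazyStream<'T> =
    | Empty
    | Item of 'T * Lazy<LazyStream<'T>>

/// Append b after a. Cells of a are copied one at a time as they are
/// demanded, and b is forced only once a is exhausted.
let rec append (a: LazyStream<'T>) (b: Lazy<LazyStream<'T>>) : LazyStream<'T> =
    match a with
    | Empty -> b.Force()
    | Item (x, rest) -> Item (x, lazy (append (rest.Force()) b))

let rec ofList (xs: 'T list) : LazyStream<'T> =
    match xs with
    | [] -> Empty
    | x :: rest -> Item (x, lazy (ofList rest))

/// Forces the whole stream, in order.
let rec toList (s: LazyStream<'T>) : 'T list =
    match s with
    | Empty -> []
    | Item (x, rest) -> x :: toList (rest.Force())
```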

It is, in fact, possible to do so. We can implement Stream as you would a lazy list, but with Future<'T> suspensions in place of Lazy<'T> suspensions. The only really tricky function is monadic bind for Par. Here is how it might read:

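type Par<'T> =
    | Par of (unit -> Stream<Message> * Future<'T>)

module Par =
    let Bind (Par x) (f: 'T1 -> Par<'T2>) : Par<'T2> =
        Par <| fun () ->
            // Start the first computation; its log stream is usable right away.
            let (streamHead, x) = x ()
            // Write-once slots for the final result and the continuation's logs.
            let result = Future.Create()
            let streamVar = Future.Create()
            async {
                try
                    // Wait for the first value, then start the continuation.
                    let! x = Future.Await x
                    let (Par yF) = f x
                    let (yS, yV) = yF ()
                    // Publish the continuation's logs as soon as they exist...
                    do Future.Set streamVar yS
                    // ...and its result once it has been computed.
                    let! y = Future.Await yV
                    return Future.Set result y
                with e ->
                    // Note: if the failure happens before streamVar is set,
                    // readers of the log stream will wait on it forever.
                    return Future.Fail result e
            }
            |> Async.Start
            // Return immediately: the first computation's logs, then the future tail.
            let stream = Stream.Append streamHead (Stream.FromFuture streamVar)
            (stream, result)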

That is, we construct the result stream by appending a future part to the ready part, and we "set" the future part from a background computation (started with Async.Start) once it becomes available.
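To see the stream half of this in isolation, here is a sketch of a stream whose suspensions are write-once futures. It assumes TaskCompletionSource/Task as the Future (not the gist's own type), and Stream.ofList, Stream.fromFuture, Stream.append and Stream.toList are hypothetical names loosely mirroring the Stream.Append and Stream.FromFuture calls above. The key property is that append returns at once: the ready cells can be read while the future tail is still unwritten.

```fsharp
open System.Threading.Tasks

/// A stream whose suspensions are write-once futures (Tasks here) instead
/// of Lazy values: each cell becomes readable as soon as it is written.
type Stream<'T> = Stream of Task<Cell<'T>>
and Cell<'T> =
    | Nil
    | Cons of 'T * Stream<'T>

module Stream =
    /// A stream whose cells are all available immediately.
    let rec ofList (xs: 'T list) : Stream<'T> =
        match xs with
        | [] -> Stream (Task.FromResult Nil)
        | x :: rest -> Stream (Task.FromResult (Cons (x, ofList rest)))

    /// View a future stream as a stream, without waiting for the future.
    let fromFuture (f: Task<Stream<'T>>) : Stream<'T> =
        let unwrap (t: Task<Stream<'T>>) : Task<Cell<'T>> =
            let (Stream cell) = t.Result
            cell
        Stream (f.ContinueWith(fun (t: Task<Stream<'T>>) -> unwrap t).Unwrap())

    /// Append b after a; each cell of a is passed on as soon as it exists.
    let rec append (Stream a) (b: Stream<'T>) : Stream<'T> =
        let next (t: Task<Cell<'T>>) : Task<Cell<'T>> =
            match t.Result with
            | Nil -> let (Stream rest) = b in rest
            | Cons (x, rest) -> Task.FromResult (Cons (x, append rest b))
        Stream (a.ContinueWith(fun (t: Task<Cell<'T>>) -> next t).Unwrap())

    /// Blocking read of the whole stream (for demonstration only).
    let rec toList (Stream t) : 'T list =
        match t.Result with
        | Nil -> []
        | Cons (x, rest) -> x :: toList rest
```

For example, appending `Stream.ofList [ "a"; "b" ]` to `Stream.fromFuture pending.Task` (where `pending` is a `TaskCompletionSource<Stream<string>>`) yields "a" and "b" immediately; the third cell becomes readable only after `pending.SetResult` is called.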

Here is the complete code you can load in FSI to play with: https://gist.github.com/t0yv0/8846137. It comes with an optimistic lock-free Future and a very simple Stream, and it works well for the SFib/ParFib examples. Before copying it into production code, note that there may be bugs lurking in the exception semantics of Future; I have not taken the time to verify them yet. Also, the optimistic implementation is fun, but a locking one might perform better. Your mileage may vary, and I have not run benchmarks.
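For experimenting without the gist, here is a compact, self-contained sketch of the whole idea. It is not the gist's code: it substitutes TaskCompletionSource/Task for Future, uses plain strings instead of Message, and defines only Return, Log, Spawn, Await, Bind, Run and a minimal par builder (no DoAsync). Bind follows the structure of the excerpt above, except that on failure it closes the log stream instead of leaving it unset. Spawn places the child's logs at the spawn point, which is what keeps the log order identical to the sequential version.

```fsharp
open System.Threading.Tasks

// Log stream with write-once (Task) suspensions.
type Stream<'T> = Stream of Task<Cell<'T>>
and Cell<'T> =
    | Nil
    | Cons of 'T * Stream<'T>

let ready (c: Cell<'T>) : Stream<'T> = Stream (Task.FromResult c)

let rec append (Stream a) (b: Stream<'T>) : Stream<'T> =
    let next (t: Task<Cell<'T>>) : Task<Cell<'T>> =
        match t.Result with
        | Nil -> let (Stream rest) = b in rest
        | Cons (x, rest) -> Task.FromResult (Cons (x, append rest b))
    Stream (a.ContinueWith(fun (t: Task<Cell<'T>>) -> next t).Unwrap())

let fromFuture (f: Task<Stream<'T>>) : Stream<'T> =
    let unwrap (t: Task<Stream<'T>>) : Task<Cell<'T>> =
        let (Stream cell) = t.Result
        cell
    Stream (f.ContinueWith(fun (t: Task<Stream<'T>>) -> unwrap t).Unwrap())

let rec toList (Stream t) : 'T list =
    match t.Result with
    | Nil -> []
    | Cons (x, rest) -> x :: toList rest

// Running a Par yields its log stream immediately, plus a future result.
type Par<'T> = Par of (unit -> Stream<string> * Task<'T>)

module Par =
    let Return (x: 'T) : Par<'T> = Par (fun () -> ready Nil, Task.FromResult x)
    let Log (msg: string) : Par<unit> =
        Par (fun () -> ready (Cons (msg, ready Nil)), Task.FromResult ())
    /// Fork: the child's logs go here, in program order; its result is a future.
    let Spawn (Par child) : Par<Task<'T>> =
        Par (fun () ->
            let logs, result = child ()
            logs, Task.FromResult result)
    /// Join: wait for a spawned result; contributes no logs.
    let Await (t: Task<'T>) : Par<'T> = Par (fun () -> ready Nil, t)
    let Bind (Par x) (f: 'T1 -> Par<'T2>) : Par<'T2> =
        Par (fun () ->
            let head, xv = x ()
            let streamVar = TaskCompletionSource<Stream<string>>()
            let result = TaskCompletionSource<'T2>()
            async {
                try
                    let! v = Async.AwaitTask xv
                    let (Par y) = f v
                    let ys, yv = y ()
                    streamVar.SetResult ys
                    let! r = Async.AwaitTask yv
                    result.SetResult r
                with e ->
                    // Close the log so readers do not wait forever, then fail.
                    streamVar.TrySetResult(ready Nil) |> ignore
                    result.TrySetException(e) |> ignore
            }
            |> Async.Start
            append head (fromFuture streamVar.Task), result.Task)
    /// Collects all logs (blocking, in order) and the final result.
    let Run (Par p) : string list * 'T =
        let logs, v = p ()
        toList logs, v.Result

type ParBuilder() =
    member _.Return(x: 'T) = Par.Return x
    member _.Bind(m: Par<'T1>, f: 'T1 -> Par<'T2>) = Par.Bind m f

let par = ParBuilder()

let rec ParFib (n: int) : Par<int> =
    par {
        do! Par.Log (string n)
        match n with
        | 0 | 1 -> return 1
        | n ->
            let! a = Par.Spawn (ParFib (n - 1))
            let! b = ParFib (n - 2)
            let! a = Par.Await a
            return a + b
    }
```

Here `Par.Run (ParFib 4)` returns `([ "4"; "3"; "2"; "1"; "0"; "1"; "2"; "1"; "0" ], 5)`, the same log order a sequential SFib written with this builder produces. Run collects everything into a list only for demonstration; a real consumer would print each cell as soon as its task completes.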

In conclusion, I hope you are now convinced that:

  • Deterministic logging in the presence of parallelism is not hard
  • Write-once variables (here: Futures) are a simple and useful synchronization primitive

  1. This link has been updated to point to its new location.
