Many applications want to use multiple cores to execute faster, while retaining the same observable behavior as the sequential version. Async in F# goes a long way to support this, but its parallel sections do not order effects at all. Sometimes you want a combination of ordered and unordered effects. For example, you might want a logging "effect" that displays log messages strictly in the order the program is written, while other effects are free to be scheduled as the machine sees fit. And you want the logs to be processed as soon as available: delaying them until all processing is done is not user-friendly.
Let us do exactly that. The F# presented here is definitely not the only, and perhaps not the best, way to solve this problem. But I hope the solution is simple enough to convince you that the problem is not a hard one.
First, what are we after? As a model for parallelism, let us take something like the Haskell Par monad with IVar for synchronization, an old idea that I find particularly well presented in this paper [1]. It is simply fork/join, with joins accomplished by reading "write-once" variables, which amounts to waiting for the forked thread to write to the variable. This is quite similar to the BCL Task model or F# Async.StartChild, in fact. To that we add a logging primitive. We might use it like this, to implement parallel Fibonacci:
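let rec ParFib (n: int) : Par<int> =
    par {
        do! Par.Log (Message (sprintf "%i" n))
        do! Par.DoAsync (Async.Sleep 250)
        match n with
        | 0 | 1 ->
            return 1
        | n ->
            // Fork the first recursive call; `a` is a Future<int>.
            let! a = Par.Spawn (ParFib (n - 1))
            // Meanwhile, run the second call in the current computation.
            let! b = ParFib (n - 2)
            // Join: wait for the forked computation to write its result.
            let! a = Par.Await a
            return a + b
    }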
Note Par.Spawn, which forks its argument and returns a Future<'T> (this is our IVar), which we then Par.Await to join to the main thread.
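To make the "write-once variable" idea concrete, here is a minimal lock-based sketch of such a Future. It is not the implementation from the gist linked later in this post (that one is lock-free). The FutureState type, the Complete and OnComplete members, and the choice to silently ignore a second write are all assumptions made for illustration.

```fsharp
/// Either still empty (with callbacks waiting for a value) or completed.
/// Hypothetical helper type, not taken from the gist.
type FutureState<'T> =
    | Pending of (Result<'T, exn> -> unit) list
    | Completed of Result<'T, exn>

type Future<'T>() =
    let gate = obj ()
    let mutable state : FutureState<'T> = Pending []

    /// Completes the future once; later attempts are ignored (an assumption).
    member this.Complete(result: Result<'T, exn>) =
        let waiters =
            lock gate (fun () ->
                match state with
                | Pending ws ->
                    state <- Completed result
                    ws
                | Completed _ -> [])
        // Run the callbacks outside the lock, in registration order.
        for w in List.rev waiters do
            w result

    /// Calls back right away if completed, otherwise registers the callback.
    member this.OnComplete(callback: Result<'T, exn> -> unit) =
        let ready =
            lock gate (fun () ->
                match state with
                | Pending ws ->
                    state <- Pending (callback :: ws)
                    None
                | Completed r -> Some r)
        match ready with
        | Some r -> callback r
        | None -> ()

module Future =
    let Create () : Future<'T> = Future<'T>()
    let Set (f: Future<'T>) (value: 'T) = f.Complete(Ok value)
    let Fail (f: Future<'T>) (e: exn) = f.Complete(Error e)

    /// Reading is an Async that resumes once the value has been written.
    let Await (f: Future<'T>) : Async<'T> =
        Async.FromContinuations(fun (ok, err, _) ->
            f.OnComplete(function
                | Ok v -> ok v
                | Error e -> err e))

// Usage: one computation writes, the main thread waits for the value.
let answer : Future<int> = Future.Create()
async {
    do! Async.Sleep 100
    Future.Set answer 42
} |> Async.Start
printfn "%d" (Async.RunSynchronously (Future.Await answer))
```

Awaiting never blocks a thread here: the continuation is simply parked in the Pending list until a write arrives.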
When we run this, we expect the messages to appear in the same order as the sequential version, but the program to run faster. We also expect to see logs appearing right away, before the execution finishes. Here is the sequential version:
let rec SFib (n: int) : Par<int> =
    par {
        do! Par.Log (Message (sprintf "%i" n))
        do! Par.DoAsync (Async.Sleep 250)
        match n with
        | 0 | 1 ->
            return 1
        | n ->
            let! a = SFib (n - 1)
            let! b = SFib (n - 2)
            return a + b
    }
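As a rough, idealized estimate of the speedup to expect, one can count the calls made by the sequential version and the longest chain of dependent calls in the parallel one. This assumes that each call costs only its 250 ms sleep, that spawned work always runs concurrently, and that scheduling overhead is zero. The helper names below are made up for this sketch.

```fsharp
/// Number of calls SFib makes; each sleeps 250 ms, one after another.
let rec calls (n: int) : int =
    if n <= 1 then 1 else 1 + calls (n - 1) + calls (n - 2)

/// Longest chain of dependent calls in ParFib, where the two branches overlap.
let rec depth (n: int) : int =
    if n <= 1 then 1 else 1 + max (depth (n - 1)) (depth (n - 2))

let sequentialMs (n: int) = 250 * calls n
let parallelMs (n: int) = 250 * depth n

printfn "n=5: sequential ~%d ms, parallel ~%d ms" (sequentialMs 5) (parallelMs 5)
```

For n = 5 this gives 15 calls (about 3750 ms) sequentially against a chain of 5 calls (about 1250 ms) in parallel. Real timings will be higher in both cases.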
How to implement this? The only interesting part is combining two Par computations in a way that preserves the log order. If we could have some notion of Stream this would be easy: to combine A and B we would append the streams produced by both. The tricky part is that we should return the combined stream before the computation is done.
It is, in fact, possible to do so. We can implement Stream as you would a lazy list, but replacing Lazy<'T> suspensions with Future<'T> suspensions. The only really tricky function is monadic bind for Par. Here is how it might read:
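type Par<'T> =
    | Par of (unit -> Stream<Message> * Future<'T>)

module Par =

    let Bind (Par x) (f: 'T1 -> Par<'T2>) : Par<'T2> =
        Par <| fun () ->
            // Start the first computation; its log stream is available at once.
            let (streamHead, x) = x ()
            let result = Future.Create()
            let streamVar = Future.Create()
            async {
                try
                    let! x = Future.Await x
                    let (Par yF) = f x
                    let (yS, yV) = yF ()
                    // Publish the second stream as soon as it exists,
                    // before its result is ready.
                    do Future.Set streamVar yS
                    let! y = Future.Await yV
                    return Future.Set result y
                with e ->
                    // Note: streamVar stays unset if the failure happens
                    // before the Future.Set above.
                    return Future.Fail result e
            }
            |> Async.Start
            // The ready part, followed by the part that is still a future.
            let stream = Stream.Append streamHead (Stream.FromFuture streamVar)
            (stream, result)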
That is, we construct a stream by appending a ready part with a future part, and we "set" the future part from a new thread when it is available.
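The stream itself could look roughly like the sketch below. It is not the Stream from the gist: to stay self-contained it uses the BCL TaskCompletionSource<'T> as the write-once variable in place of Future<'T>. The case names Empty, Cons, and Later, as well as the Iterate function, are assumptions made for illustration.

```fsharp
open System
open System.Threading.Tasks

/// A list in which any tail may still be pending.
type Stream<'T> =
    | Empty
    | Cons of 'T * Stream<'T>
    | Later of Task<Stream<'T>>

module Stream =

    /// Appends b after a without waiting for the pending parts of a.
    let rec Append (a: Stream<'T>) (b: Stream<'T>) : Stream<'T> =
        match a with
        | Empty -> b
        | Cons (x, rest) -> Cons (x, Append rest b)
        | Later t ->
            // Defer the append until the pending part has been written.
            Later (t.ContinueWith(
                    Func<Task<Stream<'T>>, Stream<'T>>(fun finished ->
                        Append finished.Result b)))

    /// Visits items in order, blocking only when the next part is not ready.
    let rec Iterate (action: 'T -> unit) (s: Stream<'T>) : unit =
        match s with
        | Empty -> ()
        | Cons (x, rest) ->
            action x
            Iterate action rest
        | Later t -> Iterate action t.Result

// Usage: the combined stream exists before its tail has been produced.
let tail = TaskCompletionSource<Stream<string>>()
let combined = Stream.Append (Cons ("first", Empty)) (Later tail.Task)
async {
    do! Async.Sleep 100
    tail.SetResult(Cons ("second", Empty))
} |> Async.Start
Stream.Iterate (printfn "%s") combined
```

The consumer can print the ready prefix at once and only waits when it reaches a Later node, which is what lets log messages appear before the whole computation has finished.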
Here is the complete code you can load in FSI to play with: https://gist.github.com/t0yv0/8846137 - it comes with an optimistic lock-free Future and a very simple Stream. It works great for the SFib/ParFib examples. Before copying this to use in production, please note that there might be some bugs lurking in the exception semantics for Future, which I have not taken the time to verify yet. Also, the optimistic implementation is fun, but locking might perform better; your mileage will vary, and I did not do benchmarks.
In conclusion, I hope you are now convinced that:
[1] This link has been updated to point to a new location.