Concurrency in F# – Part I – The Asynchronous Workflow

I was going to start this series with an overview of the concurrency options in the .NET framework. After playing with F# asynchronous workflows, though, I decided to blog about them first, because they're just too exciting to keep quiet about. Maybe I'll circle back at the end of the series and do a summary of the other options.

This post will look at how to use asynchronous workflows. It will not look into the mechanics of workflows themselves, which are explained in a nice blog post from Don Syme.

First we'll walk through what you need to do to use asynchronous web services, then we'll dig into what's going on. Imagine we're building a chemistry application and want to use the periodic table web service from webservicex.net to look up information about the elements. To do this, we build a proxy in the usual way:

wsdl /namespace:PeriodicTableWS http://www.webservicex.net/periodictable.asmx?wsdl

csc /target:library /out:PeriodicTable.dll PeriodicTable.cs

This creates a proxy to the web service in PeriodicTable.dll, which we can then use to call the web service. To call the service asynchronously, we need to add a couple of methods to the proxy that wrap the existing BeginXXX and EndXXX methods.

type PeriodicTableWS.periodictable with
    member ws.GetAtomsAsyncr() =
        AsyncExtras.BuildPrimitive(ws.BeginGetAtoms,
                                   ws.EndGetAtoms)

type PeriodicTableWS.periodictable with
    member ws.GetAtomicWeighAsyncr(s) =
        AsyncExtras.BuildPrimitive(s,
                                   ws.BeginGetAtomicWeight,
                                   ws.EndGetAtomicWeight)
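The F# release used in this post needs its own helper for this wrapping. Current FSharp.Core ships Async.FromBeginEnd, which accepts the same kind of Begin/End pair, optionally preceded by up to three arguments for the Begin method. The sketch below assumes a current FSharp.Core and, since the periodic table service may no longer be reachable, wraps a MemoryStream's real BeginRead/EndRead pair instead; ReadChunkAsyncr is a hypothetical member name chosen to mirror the style of the extensions above.

```fsharp
open System.IO

// Hypothetical extension member in the same style as GetAtomsAsyncr, but
// wrapping Stream's real BeginRead/EndRead pair with Async.FromBeginEnd.
type System.IO.Stream with
    member s.ReadChunkAsyncr(buffer: byte[]) =
        // buffer, 0 and buffer.Length are passed through to BeginRead, followed
        // by the AsyncCallback and state object that FromBeginEnd supplies.
        Async.FromBeginEnd(buffer, 0, buffer.Length, s.BeginRead, s.EndRead)

let source = new MemoryStream([| 1uy; 2uy; 3uy |])
let buffer = Array.zeroCreate<byte> 3
// Nothing is read until the workflow is run.
let bytesRead = source.ReadChunkAsyncr(buffer) |> Async.RunSynchronously
printfn "Read %d bytes: %A" bytesRead buffer
```

As with GetAtomsAsyncr, the member only builds an Async value; the read happens when something runs it.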

Unfortunately the AsyncExtras type didn't make it into the 1.9.2 release, but we can add our own definition of this very useful type. Defining it locally won't be necessary in future releases; you'll be able to remove it and use the definition from the libraries.

// Wraps a BeginXXX/EndXXX pair as an Async value. Async.Primitive hands us the
// success continuation (cont) and the exception continuation (econt); protect
// (defined in the full listing below) calls EndXXX and routes its result to cont
// or its exception to econt.
type AsyncExtras =
        [<OverloadID("BuildPrimitve_zero_arg")>]
        static member BuildPrimitive(beginFunc,endFunc) =
            Async.Primitive(fun (cont,econt) ->
                (beginFunc(System.AsyncCallback(fun iar -> protect econt endFunc iar cont),(null:obj)) : System.IAsyncResult) |> ignore)

        [<OverloadID("BuildPrimitve_one_arg")>]
        static member BuildPrimitive(arg1,beginFunc,endFunc) =
            Async.Primitive(fun (cont,econt) ->
                (beginFunc(arg1,System.AsyncCallback(fun iar -> protect econt endFunc iar cont),(null:obj)) : System.IAsyncResult) |> ignore)

        [<OverloadID("BuildPrimitve_two_arg")>]
        static member BuildPrimitive(arg1,arg2,beginFunc,endFunc) =
            Async.Primitive(fun (cont,econt) ->
                (beginFunc(arg1,arg2,System.AsyncCallback(fun iar -> protect econt endFunc iar cont),(null:obj)) : System.IAsyncResult) |> ignore)
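AsyncExtras builds each primitive from two continuations. In current FSharp.Core, Async.FromContinuations plays a similar role, additionally passing a cancellation continuation. The following is a sketch, not the library's implementation: buildPrimitive is a hypothetical helper mirroring the zero-argument BuildPrimitive, with a try/with standing in for trylet and protect, and it is exercised against a MemoryStream rather than the web service.

```fsharp
open System
open System.IO

// Hypothetical helper mirroring the zero-argument BuildPrimitive.
// beginFunc receives the callback and state object, just as BeginXXX does.
let buildPrimitive (beginFunc: AsyncCallback * obj -> IAsyncResult)
                   (endFunc: IAsyncResult -> 'T) : Async<'T> =
    Async.FromContinuations(fun (cont, econt, _ccont) ->
        let callback =
            AsyncCallback(fun iar ->
                // Same job as the post's protect: EndXXX either returns a value,
                // which goes to cont, or throws, and the exception goes to econt.
                match (try Choice1Of2(endFunc iar) with e -> Choice2Of2 e) with
                | Choice1Of2 v -> cont v
                | Choice2Of2 e -> econt e)
        beginFunc (callback, null) |> ignore)

// Usage: the extra BeginRead arguments are captured by the lambda.
let stream = new MemoryStream([| 10uy; 20uy |])
let buf = Array.zeroCreate<byte> 2
let n =
    buildPrimitive (fun (cb, state) -> stream.BeginRead(buf, 0, buf.Length, cb, state))
                   (fun iar -> stream.EndRead iar)
    |> Async.RunSynchronously
printfn "Read %d bytes" n
```

Because extra arguments can be captured in the lambda passed as beginFunc, a single helper like this covers the cases that the one- and two-argument overloads handle above.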

So now we get to the good bit, the workflow itself, shown below:

let atomsWorkFlow =
    async { let pt = new PeriodicTableWS.periodictable()
            do wl ( threadid() + "Get Element Data List")
            let! atoms = pt.GetAtomsAsyncr()
            let atoms = getNodeContentsList atoms "/NewDataSet/Table/ElementName"
            do wl ( threadid() + "Got " + atoms.Length.ToString() + " Elements")
            return atoms }

let atoms = Async.Run (atomsWorkFlow)

The workflow looks pretty much like an ordinary piece of F#. As we know from Don's post, atomsWorkFlow is just a data structure: the instructions it contains will not run until we interpret it, which happens at the bottom of the listing with the call to Async.Run. The "magic" bit of this workflow is the line "let! atoms = pt.GetAtomsAsyncr()". It is interpreted as an asynchronous binding: the I/O happens, but no thread is blocked while it does. Once the OS signals that the I/O has completed, the workflow resumes. Making just one call asynchronously isn't especially interesting, so below I show how we then make further calls to the service, this time to get the atomic weight of each atom.
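To see that atomsWorkFlow really is just a description until it's run, here is a small sketch against current FSharp.Core, where the function that runs a workflow and waits for its result is Async.RunSynchronously. fakeGetAtoms is a hypothetical stand-in for pt.GetAtomsAsyncr() that uses Async.Sleep to wait without blocking a thread, and the started flag exists only to show when the body executes.

```fsharp
open System.Threading

let threadid () = sprintf "[.NET Thread %d]" Thread.CurrentThread.ManagedThreadId

// Hypothetical stand-in for pt.GetAtomsAsyncr(): Async.Sleep waits without
// holding a thread, much as the pending web request does.
let fakeGetAtoms () : Async<string list> =
    async {
        do! Async.Sleep 100
        return [ "Actinium"; "Aluminium"; "Americium" ] }

let started = ref false

let atomsWorkFlow =
    async {
        started.Value <- true
        printfn "%sGet Element Data List" (threadid ())
        let! atoms = fakeGetAtoms ()
        printfn "%sGot %d Elements" (threadid ()) atoms.Length
        return atoms }

// Defining atomsWorkFlow ran none of its body, so this is still false.
let startedBeforeRun = started.Value
// Only now is the workflow interpreted; the calling thread waits for the result.
let atoms = Async.RunSynchronously atomsWorkFlow
```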

let printAtomDetailsWorkFlow atom =
    async { let pt = new PeriodicTableWS.periodictable()
            do wl ( threadid() + "Get Data For: " + atom)
            let! weight = pt.GetAtomicWeighAsyncr(atom)
            let weight = selectSingleNode weight "/NewDataSet/Table/AtomicWeight"
            do wl ( threadid() + atom + ": " + weight) }

for atom in atoms do
    Async.Spawn (printAtomDetailsWorkFlow atom)

This workflow is very similar to the last one; there are only a couple of small differences. First, we use a function, printAtomDetailsWorkFlow, to create the workflow, so that we can parameterize it with the name of the atom; this is probably the more usual way to work with workflows. Second, we use Async.Spawn to execute the workflow; this is the "fire and forget" way of executing workflows that don't return a value. When run on my dual-processor machine, we see output something like:

[.NET Thread 1]Get Element Data List
[.NET Thread 7]Got 112 Elements
[.NET Thread 8]Get Data For: Actinium
[.NET Thread 9]Get Data For: Aluminium
[.NET Thread 10]Get Data For: Americium
[.NET Thread 8]Get Data For: Antimony
[.NET Thread 10]Get Data For: Argon
[.NET Thread 8]Get Data For: Arsenic
[.NET Thread 10]Get Data For: Astatine
[.NET Thread 8]Get Data For: Barium
[.NET Thread 10]Get Data For: Berkelium
...
[.NET Thread 5]Americium: 243
[.NET Thread 7]Actinium: 227
[.NET Thread 7]Argon: 39.948
[.NET Thread 7]Antimony: 121.75
[.NET Thread 7]Astatine: 210
[.NET Thread 7]Arsenic: 74.9216
[.NET Thread 7]Berkelium: 249
[.NET Thread 11]Barium: 137.34
[.NET Thread 11]Bismuth: 208.98000000000002

Because this web service seems to have fairly high latency, all the requests are fired off before we get any responses. We can see that mostly two threads (8 and 10 above), probably one per processor, work their way through the list of requests, firing them all off. When the results come in, it's important to notice that a result can be processed by a different thread from the one that sent the request: Americium, for example, was requested on thread 10, but its result was printed on thread 5. This, for me, is the amazing thing. A workflow is one logical unit of execution, a logical thread if you will, yet with hardly any work from us it has been executed on two physically different threads, and this is threading without the usual explicit marshalling and locking of data. It's also important to note that although the responses come back in approximately the order we sent the requests, the order can change slightly depending on the order in which the server answered us and the random nature of the network.
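Both effects described here, a workflow resuming on a different thread and responses completing out of order, can be reproduced without the web service. This sketch assumes current FSharp.Core; fakeRequest is hypothetical and gives each request a fixed delay so that they complete out of order. It uses Async.Parallel, which, unlike the fire-and-forget loop above, waits for every workflow and returns their results in input order.

```fsharp
open System.Collections.Concurrent
open System.Threading

// Records the order in which the hypothetical requests actually complete.
let completed = ConcurrentQueue<string>()

// Hypothetical request with a chosen latency, so responses come back out of order.
let fakeRequest (atom: string) (delayMs: int) : Async<string * int> =
    async {
        let before = Thread.CurrentThread.ManagedThreadId
        do! Async.Sleep delayMs
        completed.Enqueue atom
        // The thread that resumed the workflow may differ from `before`.
        let after = Thread.CurrentThread.ManagedThreadId
        printfn "%s: started on thread %d, resumed on thread %d" atom before after
        return atom, after }

let results =
    [ "Actinium", 120; "Aluminium", 10; "Americium", 60 ]
    |> List.map (fun (atom, delay) -> fakeRequest atom delay)
    |> Async.Parallel
    |> Async.RunSynchronously

// Async.Parallel returns results in input order, whatever the completion order.
printfn "Completion order: %A" (List.ofSeq completed)
printfn "Result order:     %A" (Array.map fst results)
```

Which thread resumes each workflow is up to the thread pool, so the thread numbers will vary from run to run.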

In the next post we'll dig a bit deeper into how the workflow is interpreted, and then in part III we'll take a look at "mailboxes": how you implement Erlang-style message passing in F#.

As you may have guessed, a couple of helper functions were left out above. The full listing is given below:

#light
#nowarn "57"

open System.Xml
open Microsoft.FSharp.Control.Async

// Runs f x, capturing either its result or the exception it raised.
let trylet f x = (try Choice2_1 (f x) with exn -> Choice2_2(exn))

// Passes the result of f x to cont, or the exception it raised to econt.
let protect econt f x cont =
    match trylet f x with
    | Choice2_1 v -> cont v
    | Choice2_2 exn -> econt exn

// Wraps BeginXXX/EndXXX pairs taking zero to three extra arguments as Async values.
type AsyncExtras =
        [<OverloadID("BuildPrimitve_zero_arg")>]
        static member BuildPrimitive(beginFunc,endFunc) =
            Async.Primitive(fun (cont,econt) ->
                (beginFunc(System.AsyncCallback(fun iar -> protect econt endFunc iar cont),(null:obj)) : System.IAsyncResult) |> ignore)

        [<OverloadID("BuildPrimitve_one_arg")>]
        static member BuildPrimitive(arg1,beginFunc,endFunc) =
            Async.Primitive(fun (cont,econt) ->
                (beginFunc(arg1,System.AsyncCallback(fun iar -> protect econt endFunc iar cont),(null:obj)) : System.IAsyncResult) |> ignore)

        [<OverloadID("BuildPrimitve_two_arg")>]
        static member BuildPrimitive(arg1,arg2,beginFunc,endFunc) =
            Async.Primitive(fun (cont,econt) ->
                (beginFunc(arg1,arg2,System.AsyncCallback(fun iar -> protect econt endFunc iar cont),(null:obj)) : System.IAsyncResult) |> ignore)

        [<OverloadID("BuildPrimitve_three_arg")>]
        static member BuildPrimitive(arg1,arg2,arg3,beginFunc,endFunc) =
            Async.Primitive(fun (cont,econt) ->
                (beginFunc(arg1,arg2,arg3,System.AsyncCallback(fun iar -> protect econt endFunc iar cont),(null:obj)) : System.IAsyncResult) |> ignore)

type PeriodicTableWS.periodictable with
    member ws.GetAtomsAsyncr() =
        AsyncExtras.BuildPrimitive(ws.BeginGetAtoms,
                                   ws.EndGetAtoms)

type PeriodicTableWS.periodictable with
    member ws.GetAtomicNumberAsyncr(s) =
        AsyncExtras.BuildPrimitive(s,
                                   ws.BeginGetAtomicNumber,
                                   ws.EndGetAtomicNumber)

type PeriodicTableWS.periodictable with
    member ws.GetAtomicWeighAsyncr(s) =
        AsyncExtras.BuildPrimitive(s,
                                   ws.BeginGetAtomicWeight,
                                   ws.EndGetAtomicWeight)

// Prefix for log lines, showing which .NET thread is running.
let threadid() =
    Printf.sprintf "[.NET Thread %d]" System.Threading.Thread.CurrentThread.ManagedThreadId

let wl (x : string) = System.Console.WriteLine(x)

// Returns the inner text of every node matching the XPath expression.
let getNodeContentsList s node =
        let doc = new XmlDocument()
        do doc.LoadXml(s)
        let nodes = doc.SelectNodes(node)
        [ for node in nodes -> node.InnerText ]

let atomsWorkFlow =
    async { let pt = new PeriodicTableWS.periodictable()
            do wl ( threadid() + "Get Element Data List")
            let! atoms = pt.GetAtomsAsyncr()
            let atoms = getNodeContentsList atoms "/NewDataSet/Table/ElementName"
            do wl ( threadid() + "Got " + atoms.Length.ToString() + " Elements")
            return atoms }

let atoms = Async.Run (atomsWorkFlow)

// Returns the inner text of the first node matching the XPath expression.
let selectSingleNode s node =
    let doc = new XmlDocument()
    do doc.LoadXml(s)
    let node = doc.SelectSingleNode(node)
    node.InnerText

let printAtomDetailsWorkFlow atom =
    async { let pt = new PeriodicTableWS.periodictable()
            do wl ( threadid() + "Get Data For: " + atom)
            let! weight = pt.GetAtomicWeighAsyncr(atom)
            let weight = selectSingleNode weight "/NewDataSet/Table/AtomicWeight"
            do wl ( threadid() + atom + ": " + weight) }

for atom in atoms do
    Async.Spawn (printAtomDetailsWorkFlow atom)

// Wait for Enter, so the spawned workflows can finish before the process exits.
read_line()
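The listing ends with read_line() so that the process stays alive while the spawned workflows finish. If you'd rather wait explicitly, one option is to count completions. This sketch assumes current FSharp.Core, where Async.Start is the fire-and-forget function; fakeGetAtomicWeight and its weights table are hypothetical stand-ins for the web service, using values from the sample output above, and the CountdownEvent is an addition for this sketch, not part of the original.

```fsharp
open System
open System.Threading

// Hypothetical stand-in for the web service, using weights from the sample output.
let weights = dict [ "Actinium", "227"; "Argon", "39.948"; "Barium", "137.34" ]

let fakeGetAtomicWeight (atom: string) : Async<string> =
    async {
        do! Async.Sleep 50  // waits without blocking a thread, like the pending request
        return weights.[atom] }

let atoms = [ "Actinium"; "Argon"; "Barium" ]
// Counts down once per finished workflow.
let finished = new CountdownEvent(atoms.Length)

let printAtomDetailsWorkFlow atom =
    async {
        let! weight = fakeGetAtomicWeight atom
        printfn "[.NET Thread %d]%s: %s" Thread.CurrentThread.ManagedThreadId atom weight
        finished.Signal() |> ignore }

// Fire and forget, one workflow per atom.
for atom in atoms do
    Async.Start(printAtomDetailsWorkFlow atom)

// Block until every workflow has signalled (or ten seconds pass), instead of read_line().
let allDone = finished.Wait(TimeSpan.FromSeconds 10.0)
printfn "All lookups finished: %b" allDone
```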


posted @ Saturday, September 29, 2007 12:20 PM

Comments on this entry:

Concurrency in F# - Understanding how Asynchronous Workflows Work
by Robert Pickering's Strange Blog at 10/15/2007 9:46 PM

re: Concurrency in F# – Part I – The Asynchronous Workflow
by Sean at 1/29/2009 7:27 AM

Would it be possible to call this Async workflow from C# code? I really want to have a function that will return on a different thread.

F# and Message-Passing
by The monad nomad at 3/22/2010 1:21 PM
