Concurrency in F# – Part III – Erlang Style Message Passing

Why is the introduction of Erlang style message passing into F# interesting? You may never have heard of Erlang, but if you’ve ever used a cell phone you’ve probably used an Erlang system. Erlang was originally built by Ericsson and released as open source in 1998. It was built to support the highly distributed and fault-tolerant systems required by mobile phone networks. Many people consider Erlang to be the language that gets concurrency right.

So if you want to do concurrency, why not just use Erlang? The fact that Erlang is built on its own custom runtime means there are few libraries or tools available for it, or at least few compared to the number available for .NET. Tim Bray has a series of posts on Erlang where he criticizes it for slow file I/O and regular expressions; again, I would put these problems down to the custom runtime. So taking what’s good from Erlang and putting it into a language that’s built on a platform with lots of libraries and tools, good file I/O and fast regular expressions would seem like a very good idea.

Erlang programs are typically composed of agents that pass messages to each other, with the messages passed between agents via a message queue. In F# we create an agent using the MailboxProcessor.Start function. This function takes a function as a parameter; that function is passed an instance of the mailbox and must return an asynchronous workflow. In this workflow you will typically read messages from the mailbox and process them. Below we see an example of a word counting agent, that is, an agent that counts the number of times it is passed each word. It is the sort of thing we might use to perform statistical analysis on a text:

/// The internal type of messages for the agent
type Message = Word of string | Fetch of IChannel<Map<string,int>> | Stop

type WordCountingAgent() =
    let counter = MailboxProcessor.Start(fun inbox ->
             // The states of the message processing state machine...
             let rec loop(words : Map<string,int>) =
                async { let! msg = inbox.Receive()
                        match msg with
                        | Word word ->
                            if words.ContainsKey word then
                                let count = words.[word]
                                let words = words.Remove word
                                return! loop(words.Add (word, (count + 1)) )
                            else
                                // do printfn "New word: %s" word
                                return! loop(words.Add (word, 1) )

                        | Stop ->
                            // exit
                            return ()
                        | Fetch  replyChannel  ->
                            // post response to reply channel and continue
                            do replyChannel.Post(words)
                            return! loop(words) }

             // The initial state of the message processing state machine...
             loop(Map.empty))

    member a.AddWord(n) = counter.Post(Word(n))
    member a.Stop() = counter.Post(Stop)
    member a.Fetch() = counter.PostSync(fun replyChannel -> Fetch(replyChannel))

 

There are two things worth noting about the overall design. First, we use a sum type (a discriminated union) to represent all the possible types of messages; this is a very common pattern for this style of programming. Second, we wrap our agent in a class to provide a friendlier interface to the outside world. A happy side effect of this is that other .NET languages would find this class really easy to use too.

Now if we look more closely at the implementation details, we see all the work is done in the function we pass to MailboxProcessor.Start. Here we read the messages we are posted and perform the relevant actions. The actual work of counting words is done in the “Word” action. Here we use a Map, a functional data structure similar to a dictionary but immutable, to store each word along with a count of the number of times it has been found. We use the Post function of the MailboxProcessor to post the Word message to the message queue. In the “Fetch” action we return the current Map containing all the words found to date; notice how this is implemented using the special PostSync function provided by the MailboxProcessor, which posts a message carrying a reply channel and blocks until the agent answers on it. And the “Stop” action stops the agent :)
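
For readers on current F# releases, the same agent can be sketched with the MailboxProcessor API as it exists today. This is a minimal sketch and not the code the post was written against: it assumes the later names AsyncReplyChannel (in place of IChannel), Reply (in place of Post on the channel) and PostAndReply (in place of PostSync), and it drops the Remove call because Add already replaces an existing binding.

```fsharp
// Sketch against the current FSharp.Core MailboxProcessor API (an assumption;
// the original post uses the 2007 names IChannel and PostSync).
type Message =
    | Word of string
    | Fetch of AsyncReplyChannel<Map<string, int>>
    | Stop

type WordCountingAgent() =
    let counter =
        MailboxProcessor.Start(fun inbox ->
            let rec loop (words: Map<string, int>) =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Word word ->
                        // Look up the current count (0 if unseen) and store count + 1.
                        let count = defaultArg (Map.tryFind word words) 0
                        return! loop (words.Add(word, count + 1))
                    | Fetch replyChannel ->
                        // Send the current immutable map back to the caller and carry on.
                        replyChannel.Reply(words)
                        return! loop words
                    | Stop ->
                        // Leaving the loop ends the agent.
                        return ()
                }
            loop Map.empty)

    member this.AddWord(word) = counter.Post(Word word)
    member this.Stop() = counter.Post(Stop)
    member this.Fetch() = counter.PostAndReply(fun replyChannel -> Fetch replyChannel)

// Usage: messages are handled in the order they were posted,
// so the Fetch below sees all three words.
let agent = WordCountingAgent()
for w in [ "tom"; "jones"; "tom" ] do
    agent.AddWord w
let counts = agent.Fetch()
printfn "tom=%d jones=%d" counts.["tom"] counts.["jones"]   // tom=2 jones=1
agent.Stop()
```

Because the mailbox is a first-in, first-out queue, the Fetch request is answered only after the three Word messages posted before it.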

The advantage of implementing the word counting agent in this way is that the agent is thread safe and can be shared freely between threads working on related texts. Also, because we use an immutable map to store the state, we can pass it to the outside world and carry on processing without having to worry about it becoming inconsistent or corrupted.
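
The point about immutability can be seen in isolation. The following small sketch (the names snapshot and later are purely illustrative) shows that "updating" a Map yields a new map and leaves any previously handed-out value untouched, which is why a fetched result stays consistent while the agent keeps counting.

```fsharp
// A snapshot of an immutable Map is unaffected by later "updates":
// Map.add returns a new map and leaves the original as it was.
let snapshot = Map.empty |> Map.add "tom" 1
let later = snapshot |> Map.add "tom" 2 |> Map.add "jones" 1

printfn "snapshot: tom=%d size=%d" snapshot.["tom"] snapshot.Count   // snapshot: tom=1 size=1
printfn "later: tom=%d size=%d" later.["tom"] later.Count            // later: tom=2 size=2
```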

To demonstrate this I wrote some code to read from text files and analyze the number of occurrences of each word:

open System
open System.ComponentModel
open System.IO
open System.Threading

let counter = new WordCountingAgent()

/// Lazily reads a file line by line
let readLines file =
  seq { use r = new StreamReader( File.OpenRead file )
        while not r.EndOfStream do yield r.ReadLine() }

/// Splits each line into words and posts every word to the agent
let processFile file =
    let punctuation = [| ' '; '.'; '"'; '\''; ','; ';'; ':'; '!'; '?'; '-'; '('; ')' |]
    let lines = readLines file
    for line in lines do
        let words = line.Split(punctuation)
        for word in words do
            if word.Length > 0 then
                counter.AddWord word

let printWords = false

let main() =
    let autoResetEvent = new AutoResetEvent(false)
    let files = Directory.GetFiles(@"C:\Users\robert\Documents\Fielding")
    let i = ref 0
    for file in files do
        use readfile = new BackgroundWorker()
        readfile.DoWork.Add(fun _ ->
            printfn "Starting '%s'" (Path.GetFileNameWithoutExtension file)
            processFile file |> ignore )
        readfile.RunWorkerCompleted.Add(fun _ ->
            printfn "Finished '%s'" (Path.GetFileNameWithoutExtension file)
            // completion handlers may run concurrently, so count atomically
            if Interlocked.Increment(i) = files.Length then
                autoResetEvent.Set() |> ignore)
        readfile.RunWorkerAsync()
    // poll the agent for intermediate counts until every file has finished
    while not (autoResetEvent.WaitOne(100, false)) do
        let words = counter.Fetch()
        printfn "Words: %i" words.Count
    let res = counter.Fetch()

    printfn "Finished Words: %i" res.Count
    if printWords then
        res |> Map.iter (fun k v -> printfn "%s : %i" k v)
    counter.Stop()
    Console.ReadLine() |> ignore

main()

 

You’ll see in the implementation that we use a BackgroundWorker thread to process each text. This is because we need notification of when the processing has finished and asynchronous workflows do not yet offer this. We could very well have used the “Task Parallel Library”, but I didn’t, simply because I thought avoiding it would make it easier for people to test the sample for themselves. More on the Task Parallel Library later.
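
The sample reflects the F# release available when this post was written. On current releases, one possible way to get a completion signal without BackgroundWorker is to combine Async.Parallel with Async.StartAsTask and poll the resulting task. The sketch below is only an illustration of that idea: processJob is a hypothetical stand-in for processFile that sleeps instead of reading a file, and the job names are made up.

```fsharp
// Hypothetical stand-in for processFile: pretends to work for 50 ms
// and returns the length of the name it was given.
let processJob (name: string) =
    async {
        do! Async.Sleep 50
        return name.Length
    }

// Start all jobs at once; the task completes when every job has finished.
let all =
    [ "Amelia"; "Tom Jones" ]
    |> List.map processJob
    |> Async.Parallel
    |> Async.StartAsTask

// Wait(100) returns false if the task is still running after 100 ms,
// which leaves room to report progress between checks.
while not (all.Wait(100)) do
    printfn "Still working..."

printfn "Results: %A" all.Result   // Results: [|6; 9|]
```

Async.Parallel returns results in the same order as the input jobs, so no counter or reset event is needed to know when the last one has finished.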

I’m not sure this is a very realistic approach to the problem: if we were doing it properly we’d probably analyze each text and then merge the results (because knowing the results for each individual text could also be interesting). We’d also probably pay much more attention to how we split up the words. But at the end of the day it is a good way to provide work to test our agent.

I chose to analyze the works of Henry Fielding, which I downloaded from http://www.gutenberg.org. I chose Fielding because I have a soft spot for Tom Jones, and also because the number of his works available on Project Gutenberg was small enough that I could download them all, yet large enough to provide a decent amount of work.

When run on my dual-core machine we get the following output (it varies slightly from run to run):

Starting 'From This World to the Next'
Starting 'Amelia'
Starting 'History of Tom Jones, a Foundling'
Starting 'Joseph Andrews Volume 1'
Starting 'Joseph Andrews Volume 2'
Starting 'Journal of a Voyage to Lisbon - Volume 1'
Starting 'The History of the Life of the Late Mr Jonathan Wild the Great'
Starting 'The Works of Henry Fielding'
Words: 1236
Finished 'From This World to the Next'
Finished 'Joseph Andrews Volume 1'
Finished 'Journal of a Voyage to Lisbon - Volume 1'
Finished 'The Works of Henry Fielding'
Finished 'Joseph Andrews Volume 2'
Finished 'The History of the Life of the Late Mr Jonathan Wild the Great'
Finished 'Amelia'
Finished 'History of Tom Jones, a Foundling'
Words: 16610
Finished Words: 24469

 

There are a couple of things worth noting about the output. First, for quite a while it makes both processors work at full speed. This would seem to suggest that we’re getting the number of threads right: there are threads to carry on working while other threads are blocked doing I/O. Here the Task Parallel Library’s TaskManager class might have helped us limit the number of threads in action at one time and reduce context switching overhead. But after the last book is processed and we see Finished 'History of Tom Jones, a Foundling', one processor carries on working on its own while the other idles, and it takes a while before we see Finished Words: 24469. This effectively means we overloaded our agent: it was not able to process words as fast as we posted them, but thanks to the message queue it eventually caught up once no more work was being added. This is why this style of programming is a good choice when the workload is varied: even if an agent cannot process all its work at peak times, it can store work up until the system has some idle time and finish processing then.
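
The backlog itself can be observed. As a rough sketch, assuming the CurrentQueueLength property of MailboxProcessor in current F# releases, the hypothetical slowAgent below is deliberately slower than its producer, so messages pile up in the queue and are worked off once posting stops. The exact backlog printed will vary from run to run.

```fsharp
open System.Threading

// Hypothetical slow agent: each message takes roughly a millisecond or more to handle.
let slowAgent =
    MailboxProcessor<int>.Start(fun inbox ->
        let rec loop total =
            async {
                let! n = inbox.Receive()
                do! Async.Sleep 1
                return! loop (total + n)
            }
        loop 0)

// The producer posts far faster than the agent can consume.
for i in 1 .. 100 do
    slowAgent.Post i
printfn "Backlog right after posting: %d" slowAgent.CurrentQueueLength

// Once posting stops, the agent works through the queued messages.
while slowAgent.CurrentQueueLength > 0 do
    Thread.Sleep 50
printfn "Backlog drained"
```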

It’s also worth noting we only ever see two intermediate word counts (Words: 1236 and Words: 16610). The code that produces these word counts is as follows:

    while not (autoResetEvent.WaitOne(100, false)) do
        let words = counter.Fetch()
        printfn "Words: %i" words.Count

 

Given that the sample runs for considerably longer than 200 ms, we might expect to see a lot more than two intermediate word counts. We don’t, because fetching the dictionary has to be done synchronously, so our “Fetch” message has to wait in the queue behind every word already posted before it is processed. This means that the thread is blocked at the counter.Fetch() call for considerably longer than it is blocked at autoResetEvent.WaitOne(100, false), and thus we only see two intermediate counts.
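
The queueing behaviour of a synchronous request can be reproduced on a small scale. The sketch below is written against the current MailboxProcessor API and is an illustration, not the post's code: the Msg type and the agent are hypothetical, and it uses TryPostAndReply with a timeout so the caller can give up waiting instead of blocking behind the backlog.

```fsharp
// Hypothetical message type: Work simulates a slow job, Count asks how many were handled.
type Msg =
    | Work of int
    | Count of AsyncReplyChannel<int>

let agent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop handled =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Work _ ->
                    do! Async.Sleep 2
                    return! loop (handled + 1)
                | Count reply ->
                    reply.Reply(handled)
                    return! loop handled
            }
        loop 0)

for i in 1 .. 100 do
    agent.Post(Work i)

// This request queues up behind the Work messages, so a short timeout
// will typically expire before the agent gets round to answering it.
match agent.TryPostAndReply((fun ch -> Count ch), timeout = 50) with
| Some n -> printfn "Handled so far: %d" n
| None -> printfn "Agent busy; no reply within 50 ms"

// A blocking request waits its turn, so every earlier message has been handled.
let handledCount = agent.PostAndReply(fun ch -> Count ch)
printfn "Handled: %d" handledCount   // Handled: 100
```

Note that a timed-out request is not withdrawn; it stays in the queue and the agent still processes it later, with the reply simply going unread.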

 

Wrapping it up, I think we’ve seen that Erlang style message passing provides an interesting model for creating concurrent applications. Even though a more realistic application would be composed of many agents working together and passing messages to each other, we have seen the core of an agent’s task: its ability to keep a data structure consistent while being called from many different threads. Writing this post made me reflect on the difference between this style of programming and the style offered by the TaskManager in the Task Parallel Library. It seems to me that in the Task Parallel Library you only ever control inputting messages/actions to the queue, whereas using a MailboxProcessor in F# allows you to control both ends of the queue. The TaskManager has the advantage that it can use all sorts of clever heuristics to decide when to start new threads/actions, as it controls the execution of new tasks. However, using a MailboxProcessor in F# would seem to offer some interesting possibilities not yet possible with the Task Parallel Library.

Anyway, this series will continue!

Posted @ Wednesday, October 24, 2007 10:55 PM

Comments on this entry:

# re: Concurrency in F# – Part III – Erlang Style Message Passing
by Tony Nassar at 12/11/2007 3:49 PM

Robert, I realize you have other work to do, but could you take a second to explain return!? This keyword has only just appeared, and I can't find any documentation of it.

# re: Concurrency in F# – Part III – Erlang Style Message Passing
by Ulf Wiger at 1/30/2008 11:30 PM

Short note on the Tim Bray experiments: the slow file I/O was primarily due to the use of a function intended for human interaction - not batch-mode file processing. That doesn't invalidate your comparison, of course. It's good to have a runtime with lots of library support.

What you're showing in your post isn't quite Erlang-style concurrency, but rather perhaps Haskell-style concurrency ;-) If I understand your mailboxes, a thread can handle several mailboxes at any given time. Erlang processes have one mailbox, and allows for selective pattern matching on its contents. The big issue is that you need scoped message reception, and multiple mailboxes can give you that. You might want to take a look at http://www.erlang.se/euc/05/1500Wiger.ppt and sketch a solution in F#.

The next steps in erlang-style concurrency are distribution transparency, and process linking and supervision. Then you just need to verify that your implementation scales to at least a few tens of thousand concurrent threads, and you're home free. (:

# re: Concurrency in F# – Part III – Erlang Style Message Passing
by Robert Pickering at 1/31/2008 9:55 AM

Thanks for your feedback. Lately I have been reading Joe Armstrong’s “Programming Erlang: Software for a Concurrent World” and have come to realise some of the limitations of F# mailboxes with respect to Erlang. Clearly the ability to distribute processes so easily is a massive advantage, and Erlang’s lightweight threading is very nice too.

I believe it may be possible to simulate some kind of process linking/supervision tree behaviour with the current mailbox implementation, and I’ll definitely take a look at your suggestions of how to use F# mailboxes in a more Erlang-like way. However, transparent distribution is pretty much out of the question for the moment.

In short, I’m not trying to pretend that F#’s concurrency story is anywhere near as good as Erlang’s, but I feel its asynchronous workflows are a small step in the right direction.

# F# Asynchronous Workflows with the Coordination and Concurrency Runtime
by The monad nomad at 3/22/2010 1:24 PM
