This post is my contribution towards the 2021 F# Advent Calendar.

Introduction

I’ve been a fan of F# for many years, dabbling with it on and off during that time. However, I recently had the opportunity to start using it every day professionally (finally). No longer am I trying to convince my colleagues to check out this awesome F# language, but rather, I’m surrounded by coworkers who know so much more about it than I do. I’ve learned as much or more in the past four months as I did in the previous five years. A real project with code reviews, knowledgeable colleagues, QA etc. really accelerates the learning process. In short, I’m loving it!

One of the things that really surprised me is how often we use the MailboxProcessor class. I’d seen it in passing, but in the past 5 years, I never really found a use for it. It seemed like one of those esoteric features of the language that only a handful of people would need. Ironically, as someone who also dabbled in Elixir over the years, I’m surprised it took me this long to realize how useful the MailboxProcessor can be.

One of the best ways for me to learn something is to start playing with it and then blog about it to gather my thoughts (whether I’m doing it right or not). So this article is an exploration of the MailboxProcessor with some silly examples. I hope you find it useful or at the very least, entertaining.

What’s a MailboxProcessor?

So what is a MailboxProcessor? If you’ve ever heard of the actor model, which is an architecture designed for concurrency using message passing, the MailboxProcessor can be thought of as F#’s lightweight and simpler subset of that solution using agents (which cannot cross process boundaries). I’ve noticed many people create a type alias around the MailboxProcessor to actually call it Agent. However, the name MailboxProcessor does make some sense, as messages are Posted and Received.

Officially, the documentation says:

A message-processing agent which executes an asynchronous computation.

What’s a MailboxProcessor Good For?

I’m glad you asked. Surprisingly, you get a lot of nice things for free when you create a MailboxProcessor, such as thread-safety without all the locking nonsense and a built-in queue that acts like a non-blocking buffer to allow messages to be passed in concurrently, but processed serially and in order (with an option to give some messages higher priority). And with F#, you have the power of discriminated unions to provide strongly-typed messages that can be pattern matched, rather than relying on strings.

So with these things in mind, the MailboxProcessor is great for managing shared state (database access, in-memory cache operations etc.), IO operations, scaling and coordination (instances are lightweight so thousands can be spawned and can easily pass messages to each other), creating state machines, etc. The list goes on!

Time To Play

For this blog post, we’re going to try to do a few of these things. I’ll break it down into smaller pieces as we build up a rather silly example.

Output Module

First, we’re going to build a simple agent that will allow a message to be printed. This can be to the console, to a file, whatever works for you. We’ll build the agent first, then we’ll see how to encapsulate it into an object and expose a nicer API for it.

First, we create a simple single-case discriminated union to represent our message type. We can request to Print a string.

Next, we create a function that takes a parameter named output (of type (string -> unit)), which then returns a MailboxProcessor<OutputMessage>. Inside the function, we follow the typical pattern for simple agents (I will use this term interchangeably with MailboxProcessor):

  • Start the agent
  • Pass in a lambda function that receives the agent itself as its parameter (inbox is a fairly common name for it)
  • Create a recursive loop function with an asynchronous computation
  • Wait to receive a message (let! awaits it asynchronously, so no thread is blocked while waiting)
  • When a message arrives, match against our message type (we only have Print)
  • Output our message with the passed in output function
  • Recursively call our loop function

We also need to kick off our initial call to the loop function outside of the async block.

Then we test it.

First, we create an agent that uses the Console.WriteLine method as its output function parameter. Next, we create a couple of Print messages, passing them to the agent’s Post function. I’ve included (most of) the F# Interactive (FSI) output.
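The snippets for these steps aren't reproduced here, so what follows is a minimal sketch reconstructed from the description rather than the original code. OutputMessage, Print, output, inbox and loop come from the text; createOutputAgent and consoleAgent are placeholder names of mine.

```fsharp
open System

// Single-case discriminated union: the only request is to Print a string.
type OutputMessage =
    | Print of string

// createOutputAgent is a placeholder name; output is any string -> unit sink.
let createOutputAgent (output: string -> unit) =
    MailboxProcessor<OutputMessage>.Start(fun inbox ->
        let rec loop () = async {
            // Wait asynchronously for the next message.
            let! msg = inbox.Receive()
            match msg with
            | Print text -> output text
            // Go back and wait for the next message.
            return! loop () }
        // Kick off the first iteration of the loop.
        loop ())

// Usage: an agent that writes to the console.
let consoleAgent = createOutputAgent (fun s -> Console.WriteLine s)
consoleAgent.Post(Print "Hello")
consoleAgent.Post(Print "World")
```

Post returns immediately; the agent picks the messages up in the background and handles them one at a time, in the order they were posted.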

As I mentioned earlier, we’d like to encapsulate this agent into an object to expose a nicer API (this will make more sense when we have more complicated agents later).

I’ve polished this up a bit and moved it into a module called Output. We still have our OutputMessage type (though it is now internal to the module) with a single Print case. However, you’ll notice that there is now a Printer type that takes the output function parameter from our previous code as a constructor parameter instead. The agent is private to the Printer type and exposes a new member called Print that hides away some implementation details. No longer does the consumer need to know to use the Print message type, nor do they need to know about the agent and its Post method.

A similar test is shown, creating a consolePrinter and calling the Print member a couple times.
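Again, this is a sketch of how the module might look based on the description above, not the original listing. The names Output, OutputMessage, Printer, Print and consolePrinter come from the text; the exact layout is my assumption.

```fsharp
open System

[<AutoOpen>]
module Output =

    // Internal: consumers never build messages themselves.
    type internal OutputMessage =
        | Print of string

    type Printer(output: string -> unit) =
        // The agent is private to the Printer type.
        let agent = MailboxProcessor<OutputMessage>.Start(fun inbox ->
            let rec loop () = async {
                let! msg = inbox.Receive()
                match msg with
                | Print text -> output text
                return! loop () }
            loop ())

        // Public API: hides both the message type and the Post call.
        member _.Print text = agent.Post(Print text)

// Usage: the caller only sees Printer and its Print member.
let consolePrinter = Printer(fun (s: string) -> Console.WriteLine s)
consolePrinter.Print "Hello"
consolePrinter.Print "World"
```

Because the output function is a constructor parameter, the same type can write somewhere else (a file, a list in a test) without any change to the agent.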

Vending Module

Next up, we’re going to create a vending machine agent, which sells a single type of item. You can interact with this vending machine by sending it messages to do one of the following:

  • Buy an item
  • Get the current stock
  • Restock the vending machine

The vending machine also keeps track of orders when someone tries to buy an item when it’s out of stock. Once it is restocked, it will immediately fulfill those orders. In this way, it’s like an all you can eat sushi restaurant. You can keep ordering more food while you’re still waiting for your previous orders, but make sure you’re hungry. When it does come eventually, you’ll be paying for that extra food if you don’t eat it!

We’ll start by creating a new module (it will eventually require access to the previous Output module, which is why I had put [<AutoOpen>] above its declaration, but if you don’t want them in the same file, you can manually add an open declaration). In the new module, we’ll start with our messages.

Again, we’re making the VendingMachineMessage type internal, because we’ll be hiding the message types away in a nicer API via members on the type we will create shortly.

A few things to note:

  • Buying an item requires passing in a customer ID
  • Getting stock will return the quantity, so requires an AsyncReplyChannel in which to send the response
  • Restocking requires supplying the quantity we have added

Except for the middle one there, it should be pretty straightforward.
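If the reply channel is new to you, here is a tiny stand-alone sketch of the request/reply idea. The counter agent and its CounterMessage type are made up for illustration and are not part of the vending machine.

```fsharp
// Hypothetical messages for a tiny counter agent.
type CounterMessage =
    | Add of int
    | GetTotal of AsyncReplyChannel<int>

let counter = MailboxProcessor<CounterMessage>.Start(fun inbox ->
    let rec loop total = async {
        let! msg = inbox.Receive()
        match msg with
        | Add n -> return! loop (total + n)
        | GetTotal channel ->
            // Send the answer back to whoever asked.
            channel.Reply(total)
            return! loop total }
    loop 0)

counter.Post(Add 2)
counter.Post(Add 3)
// PostAndReply creates the channel, posts the message and waits for the reply.
let total = counter.PostAndReply(fun channel -> GetTotal channel)
printfn "Total: %d" total // prints "Total: 5"
```

The message carries the channel to the agent, and the agent uses it to answer the caller. Since messages are handled in order, both Add messages are processed before the total is read.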

Now let’s add the VendingMachine type, which will encapsulate the agent. This one will be a little more complicated, as we are going to introduce a state machine.

[<AutoOpen>]
module Vending =

    type internal VendingMachineMessage =
        | BuyItem of custId: int
        | GetStock of AsyncReplyChannel<int>
        | Restock of qty: int

    type VendingMachine(initialStock, output : Printer) =
        let agent = MailboxProcessor<VendingMachineMessage>.Start(fun inbox ->
            // State 1: there is stock, so every message is handled as it arrives.
            let rec stocked stock = async {
                let! msg = inbox.Receive()
                match msg with
                | BuyItem id ->
                    let newStock = stock - 1
                    output.Print (sprintf $"Vend item to customer {id}\nNew stock: {newStock}")
                    if newStock = 0 then
                        return! empty()
                    else
                        return! stocked newStock
                | GetStock channel ->
                    channel.Reply(stock)
                    return! stocked stock
                | Restock qty ->
                    let newStock = stock + qty
                    output.Print (sprintf $"Restocking with {qty}\nCurrent stock: {newStock}")
                    return! stocked newStock }
            // State 2: no stock, so BuyItem messages are left in the queue.
            and empty () = inbox.Scan(fun msg ->
                match msg with
                | BuyItem id ->
                    output.Print (sprintf $"Request for customer {id} will be fulfilled after machine is restocked")
                    None
                | GetStock channel ->
                    channel.Reply(0)
                    Some(empty())
                | Restock qty ->
                    output.Print (sprintf $"Restocking with {qty}\nCurrent stock: {qty}")
                    Some(stocked qty))
            // Start in the stocked state.
            stocked initialStock)

        // Public API: hides the agent, the message types and the Post calls.
        member _.BuyItem custId = agent.Post(BuyItem custId)
        member _.GetStock() = agent.PostAndReply(GetStock)
        member _.Restock qty = agent.Post(Restock qty)

This might look scary at first, but it follows a similar structure to our previous agent.

First, our constructor parameters for the VendingMachine are an initial stock qty and a Printer (the type we defined in our Output module).

Next, we start our MailboxProcessor as we’ve done before, but this time our recursive loop function is called stocked and it has a parameter called stock. This is because one of the states in our state machine is stocked and it is passed some state called stock (the current stock in the vending machine). As before, we start an async computation expression and wait to receive a message. Once we receive a message, we match against our three message types:

  • BuyItem
    • Extracts the customer ID
    • Creates a new stock state with one less than before
    • Uses our printer to output some information
    • Checks if we have any stock left; if no, go to empty state; otherwise, recursively loop with new stock state
  • GetStock
    • Extracts the async reply channel
    • Replies with the current stock state value, using the channel
    • Recursively loops with the current stock state
  • Restock
    • Extracts the quantity to add
    • Calculates new stock
    • Uses our printer to output some information
    • Recursively loops with new stock state

This is all quite similar to our first agent, except this time we are carrying some state between each iteration. We are doing it in a functional way that avoids mutation, which is always nice (you can safely use mutable state if you wish in a MailboxProcessor, however, and you can also use a while true loop in place of recursion… but to each his own).
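For comparison, here is a rough sketch of the mutable, while true style mentioned above. The tally agent and its TallyMessage type are hypothetical. I've used a ref cell for the state, because a let mutable local cannot be captured by the closures that an async block generates.

```fsharp
// Hypothetical messages for a small tally agent.
type TallyMessage =
    | Increment of int
    | Read of AsyncReplyChannel<int>

let tally = MailboxProcessor<TallyMessage>.Start(fun inbox ->
    // Only the agent's own loop touches this cell, one message at a time,
    // so no locking is needed.
    let total = ref 0
    async {
        while true do
            let! msg = inbox.Receive()
            match msg with
            | Increment n -> total.Value <- total.Value + n
            | Read channel -> channel.Reply(total.Value) })

tally.Post(Increment 2)
tally.Post(Increment 3)
let current = tally.PostAndReply(Read)
printfn "Current total: %d" current // prints "Current total: 5"
```

The behaviour is the same as the recursive version; the state simply lives in a cell instead of being passed from one iteration to the next.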

The part of the listing to look at now is the one that starts with and empty ().

Now to our second state in our state machine, empty. First off, it is defined using the and keyword, because the two functions are mutually recursive. That is, they can call each other. In order to get around the F# compiler rule that something must be defined before it can be called, this is how you do it.

For our empty state, we require no state parameter, because… well, it’s empty. Since there’s no stock, we just receive unit. We’ve also introduced a new way to grab a message. Instead of Receive() we are calling Scan. In this case, Scan accepts a function that takes a VendingMachineMessage and returns an Option<Async<'T>>. We can just pass in a lambda expression that takes the message and pattern matches on it.

What Scan allows us to do is prioritize certain types of messages. Returning Some(Async<'T>) handles a message right away, while returning None leaves it in the queue to be processed later, in the order it first arrived.
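To see Scan's deferral on its own, here is a small sketch using a made-up "gate" agent (GateMessage, Job, Open and Snapshot are hypothetical names). While the gate is closed, Job messages are skipped with None and stay queued; once it opens, they are handled in arrival order.

```fsharp
// Hypothetical messages for a gate that holds jobs back until it is opened.
type GateMessage =
    | Job of int
    | Open
    | Snapshot of AsyncReplyChannel<int list>

let gate = MailboxProcessor<GateMessage>.Start(fun inbox ->
    // Open state: handle everything as it comes.
    let rec opened processed = async {
        let! msg = inbox.Receive()
        match msg with
        | Job n -> return! opened (n :: processed)
        | Open -> return! opened processed
        | Snapshot channel ->
            channel.Reply(List.rev processed)
            return! opened processed }
    // Closed state: jobs are left in the queue, everything else is handled.
    and closed () = inbox.Scan(fun msg ->
        match msg with
        | Job _ -> None
        | Open -> Some(opened [])
        | Snapshot channel ->
            Some(async {
                channel.Reply([])
                return! closed () }))
    closed ())

gate.Post(Job 1)
gate.Post(Job 2)
let before = gate.PostAndReply(Snapshot) // []: both jobs are still queued
gate.Post(Open)
let after = gate.PostAndReply(Snapshot)  // [1; 2]: handled in arrival order
```

Note that the first Snapshot is answered even though two older Job messages sit ahead of it in the queue. That is the prioritization at work.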

So when the vending machine is empty, we do the following:

  • BuyItem
    • Uses our printer to output some information
    • Leaves the message unprocessed until further stock is available
  • GetStock
    • Extracts the async reply channel
    • Replies that there is 0 stock
    • Recursively loops in the empty state, wrapping it in a Some (message is now processed)
  • Restock
    • Extracts the quantity to add (this is also the new stock quantity)
    • Uses our printer to output some information
    • Recursively loops to the stocked state, passing in the new stock quantity

See, that wasn’t too scary.

We also need to start our loop, which we do by putting the agent in the stocked state with the initialStock that was passed in to the constructor. We should be checking this value to make sure it is > 0, but I’ve left out error checking for the examples.

We’ve also got three members, which are used to hide away the agent, the message types and the Post calls. For one of the members we actually call PostAndReply, which is kind enough to create the required AsyncReplyChannel for us and hand it to the function we pass in (here the GetStock case itself, which is shorthand for fun channel -> GetStock channel).

I should also note that there are many other MailboxProcessor methods available with Async and Try variations. I encourage you to play around with them.
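As a small taste of those variations, here is a sketch with a deliberately slow, made-up agent (SlowMessage and Ask are hypothetical). It uses PostAndAsyncReply, which returns an Async instead of blocking, and TryPostAndReply, which gives up after a timeout in milliseconds.

```fsharp
// Hypothetical message for an agent that takes a while to answer.
type SlowMessage =
    | Ask of AsyncReplyChannel<string>

let slow = MailboxProcessor<SlowMessage>.Start(fun inbox ->
    let rec loop () = async {
        let! (Ask channel) = inbox.Receive()
        // Pretend the work takes about half a second.
        do! Async.Sleep 500
        channel.Reply("finished")
        return! loop () }
    loop ())

// Async variation: we get an Async<string> back and decide how to run it.
let answer = slow.PostAndAsyncReply(Ask) |> Async.RunSynchronously

// Try variation: returns None if no reply arrives within the timeout.
let timedOut = slow.TryPostAndReply(Ask, timeout = 100)

printfn "answer = %s, timedOut = %A" answer timedOut
```

Here answer is "finished", while timedOut is None because the agent needs about 500 ms and we only waited 100 ms. Other members in the same family include TryReceive, TryScan and PostAndTryAsyncReply.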

Ok, let’s try it out. I’ll just show the test code here, rather than repeat all the previous code.

First we create a Printer that uses the console, then a VendingMachine that has an initial stock quantity of 3 and our console printer.

Next, we use the GetStock member of our vending machine and print the returned value.

For buying items, we need customer IDs, so we create a couple of those. We alternate buying items for each customer, noting that the vending machine prints the output (vend item to customer x and tells us the new stock).

Once the vending machine is empty, we see the messages change. Customers will have their orders fulfilled after being restocked. We add 3 more stock via the Restock method and see that immediately, the previous customer orders that are pending are filled (and in the same order the BuyItem messages were originally received).

Then, we add 7 more stock, just because we can.
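The test script itself isn't reproduced here, so this is my best reconstruction of the steps just described. To keep the script runnable on its own, the Printer and VendingMachine definitions are repeated in condensed form. The customer IDs and the exact sequence of purchases are assumptions.

```fsharp
open System

[<AutoOpen>]
module Output =
    type internal OutputMessage = Print of string
    type Printer(output: string -> unit) =
        let agent = MailboxProcessor<OutputMessage>.Start(fun inbox ->
            let rec loop () = async {
                let! (Print text) = inbox.Receive()
                output text
                return! loop () }
            loop ())
        member _.Print text = agent.Post(Print text)

[<AutoOpen>]
module Vending =
    type internal VendingMachineMessage =
        | BuyItem of custId: int
        | GetStock of AsyncReplyChannel<int>
        | Restock of qty: int
    type VendingMachine(initialStock, output : Printer) =
        let agent = MailboxProcessor<VendingMachineMessage>.Start(fun inbox ->
            let rec stocked stock = async {
                let! msg = inbox.Receive()
                match msg with
                | BuyItem id ->
                    let newStock = stock - 1
                    output.Print (sprintf $"Vend item to customer {id}\nNew stock: {newStock}")
                    if newStock = 0 then
                        return! empty()
                    else
                        return! stocked newStock
                | GetStock channel ->
                    channel.Reply(stock)
                    return! stocked stock
                | Restock qty ->
                    let newStock = stock + qty
                    output.Print (sprintf $"Restocking with {qty}\nCurrent stock: {newStock}")
                    return! stocked newStock }
            and empty () = inbox.Scan(fun msg ->
                match msg with
                | BuyItem id ->
                    output.Print (sprintf $"Request for customer {id} will be fulfilled after machine is restocked")
                    None
                | GetStock channel ->
                    channel.Reply(0)
                    Some(empty())
                | Restock qty ->
                    output.Print (sprintf $"Restocking with {qty}\nCurrent stock: {qty}")
                    Some(stocked qty))
            stocked initialStock)
        member _.BuyItem custId = agent.Post(BuyItem custId)
        member _.GetStock() = agent.PostAndReply(GetStock)
        member _.Restock qty = agent.Post(Restock qty)

// The test: a console printer and a machine holding 3 items.
let printer = Printer(fun (s: string) -> Console.WriteLine s)
let vendingMachine = VendingMachine(3, printer)
let initialStock = vendingMachine.GetStock()   // 3

let customer1, customer2 = 1, 2                // assumed customer IDs
vendingMachine.BuyItem customer1
vendingMachine.BuyItem customer2
vendingMachine.BuyItem customer1               // stock is now 0

vendingMachine.BuyItem customer2               // queued until restock
vendingMachine.BuyItem customer1               // queued until restock

vendingMachine.Restock 3                       // fills the 2 pending orders
vendingMachine.Restock 7                       // just because we can
let finalStock = vendingMachine.GetStock()     // 3 - 2 + 7 = 8
printfn "Initial stock: %d, final stock: %d" initialStock finalStock
```

GetStock waits for a reply, so by the time finalStock is returned every earlier message has been handled by the vending machine.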

Quick Recap

So far, we have seen how to create agents/MailboxProcessors that use strongly-typed messages and pattern matching to control behaviour and allow a safe way to manage state. We encapsulated our agents inside of reusable types to allow hiding away implementation details and exposing nicer APIs (and reusing the agents if we have such a need). Additionally, we’ve created a simple state machine by using mutually recursive functions that allows transition between the various states and associated behaviours. Lastly, by using our Printer agent from within our VendingMachine agent, we saw that agents can also intercommunicate via message passing.

Not too bad!

We’re now going to introduce a third agent type, which will eventually let us throw some concurrency in the mix.

Customer Module

Our final agent will represent customers of the vending machine. This will be a very simple agent, much like our Printer.

As usual, we’ll start with a module and our internal message type. For this scenario, a customer can either be hungry and Order from the vending machine or can say they are Done because they are full.

For this HungryCustomer agent, we pass via the constructor a VendingMachine, customer ID (we could have just as easily generated one automatically I suppose) and a Printer agent.

We’re not going to go over this agent in detail, as there is nothing you haven’t already seen. Well, except for one piece. Inside the recursive loop, when we receive a Done message, the loop exits with a return (), showing that the agent has now finished its job. Ideally all agents should have some sort of message that allows them to be stopped or perhaps paused/restarted where applicable, but that’s something you can explore later. I should also mention that cancellation tokens can be used with mailbox processors too (passed in as an optional parameter via either the static Start constructor function or the normal constructor). However, that’s out of scope for this blog post.

In the final two lines of code we have exposed the two message operations via members, as we have done for the other agents.
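Since the listing isn't shown here, this is only a sketch of the shape such an agent could take. To keep it self-contained, the VendingMachine and Printer are replaced by plain functions (buyItem and print), which is a simplification of mine; with the real types you would pass the instances themselves. The module name, the member names and the printed messages are guesses too.

```fsharp
[<AutoOpen>]
module Customer =

    type internal CustomerMessage =
        | Order
        | Done

    // buyItem stands in for VendingMachine.BuyItem, print for Printer.Print.
    type HungryCustomer(buyItem: int -> unit, custId: int, print: string -> unit) =
        let agent = MailboxProcessor<CustomerMessage>.Start(fun inbox ->
            let rec loop () = async {
                let! msg = inbox.Receive()
                match msg with
                | Order ->
                    print $"Customer {custId} is ordering"
                    buyItem custId
                    return! loop ()
                | Done ->
                    print $"Customer {custId} is full"
                    // No recursive call: the loop ends and the agent stops.
                    return () }
            loop ())

        // Expose the two message operations as members.
        member _.Order() = agent.Post(Order)
        member _.Done() = agent.Post(Done)

// Usage with stand-in functions.
let customer = HungryCustomer((fun id -> printfn "Vend to %d" id), 1, printfn "%s")
customer.Order()
customer.Done()
customer.Order() // posted, but never handled: the agent has stopped
```

Anything posted after Done just sits in the mailbox, because nothing is calling Receive any more.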

So let’s test it out!

First we create our three agents and wire them up as necessary. Next we use our customer agent to place 3 orders until the stock reaches 0. Our customer is eating so fast that they decide to order 2 more times while waiting for the vending machine to be restocked. Oh no, now they’re full! The customer stops ordering. However, those orders are still queued up and will be immediately fulfilled as soon as the vending machine is restocked. We restock and you see the items are promptly sent.

This is not much different than when we tested the vending machine directly. The only difference is that the customer handles passing its ID along and models the domain via a slightly altered API.

Now let’s show where the agents shine. We’re going to add some concurrency, where multiple customers will be ordering in parallel from different threads. This will be the first time we actually reuse an agent by creating multiple instances of the HungryCustomer agent. In addition, we’ll be sending messages to the vending machine manually from FSI, such as restocking and asking for current stock levels. You’ll see that the agents handle all of the ugly concurrency stuff for us without the need for locks and mutexes etc.

Getting Jiggy Concurrent Wit’ It

First we set up a helper function called startOrdering used to simulate a customer ordering 10 times, with a given delay between orders. Then the customer will say they are done ordering.

Next, we create multiple outputs to use in our Printer agent. We’ll write to the console as we have been doing (so we can see stuff happening while the simulation runs), but we’re also going to write to a file. The output to the file will make it easier to look at what happened afterwards, without all the extra junk that FSI puts in there. Thanks to our mailbox processor and its mechanics we have discussed, writing to a file is not a problem. Even when multiple messages want to write to the same file, they go through one agent, which will process them one at a time.

Then we create our vending machine and a little helper function we can easily send to FSI in order to ask and display the current stock at any point during the simulation. This will be to demonstrate that we still have unblocked control from the main thread while the customer agents are running and sending a barrage of messages.

Finally, we have 3 HungryCustomer agents.

Now we’ll set up our customers to start ordering, each with different delay times and use Async.Start to get them to each fire off in the threadpool. For convenience, we’ll also pass in the same cancellation token for the Async computations in case we want to cancel them all at once without having to kill FSI. Once we start to see orders firing off and stock levels changing, we’ll randomly send messages to restock and show current stock levels.
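The setup code isn't reproduced here, so the following is a cut-down sketch of the idea rather than the original simulation. A single logger agent stands in for the Printer (writing to both the console and a file), and an order function stands in for the customer and vending machine agents. The file location, delays and messages are all assumptions; only the name startOrdering, the 10 orders per customer, Async.Start and the shared cancellation token come from the description.

```fsharp
open System
open System.IO
open System.Threading

// Assumed file location. One agent owns the file, so writes never overlap.
let logPath = Path.Combine(Path.GetTempPath(), "vending-simulation.log")
File.WriteAllText(logPath, "")

let logger = MailboxProcessor<string>.Start(fun inbox ->
    let rec loop () = async {
        let! line = inbox.Receive()
        Console.WriteLine(line)
        File.AppendAllText(logPath, line + Environment.NewLine)
        return! loop () }
    loop ())

// Simulates one customer: order 10 times with a pause in between, then stop.
// order stands in for the customer agent's ordering member.
let startOrdering (custId: int) (delayMs: int) (order: int -> unit) = async {
    for _ in 1 .. 10 do
        order custId
        do! Async.Sleep delayMs
    logger.Post $"Customer {custId} is done ordering" }

// One token for all three workflows, so cts.Cancel() stops them together.
let cts = new CancellationTokenSource()

for (custId, delayMs) in [ (1, 30); (2, 20); (3, 10) ] do
    let order (c: int) = logger.Post $"Customer {c} orders an item"
    // Fire each customer off on the thread pool.
    Async.Start(startOrdering custId delayMs order, cts.Token)
```

The three workflows post from different thread pool threads, yet the lines in the file never interleave mid-line, because the logger handles one message at a time.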

I’ll paste the output from my sample run in a couple chunks so we can just point out a few things.

At the start of the simulation, we can see that all of our customer agents begin ordering as soon as the threads kick off. Since customer 3 has the shortest delay between orders, this customer grabs the most items until the stock runs out.

In the next block of the simulation, we can see the customers still ordering even though the vending machine is out of stock. What’s important is to note the sequence in which those orders arrived (lines 1-10):

  • Customer 1
  • Customer 3
  • Customer 3
  • Customer 2
  • Customer 3

I then tossed a restock command into FSI, adding 10 items to the stock (lines 11-12). Those items were then immediately processed and were fulfilled in the same order they were requested (lines 13-22).

In the next block, ordering and fulfillment continues until we run out of stock again (lines 1-16). You’ll also notice that I sent in a request for the current stock level via FSI (line 10).

Once the stock has run out, customer 3 and customer 2 each order an item to be fulfilled after restocking (lines 17-20). At this point, customer 3 is full, so stops ordering (line 21). However, once restocking has been done, they will both still receive that last requested order. Customer 3 is full, but better eat it or they’ll have to pay extra! Or hide the sushi in a teapot <shifty_eyes>…

I put in a restock request of 10 items via FSI (lines 22-23) and the two awaiting orders were immediately fulfilled (lines 24-27). At this point, you see nothing more related to customer 3, while customers 1 and 2 continue along. I’ll spare you the last 45 lines of output, as you get the point.

Summary

This was a contrived example and a little on the silly side, but hopefully it was just complicated enough to show you that you can easily achieve safe concurrency while focusing on the domain and less on the concurrency details. Using strongly typed messages, pattern matching and following a fairly repeatable agent pattern should have you looking like a concurrency wizard in no time.

Further Resources

Here are some useful resources I found that can help you to understand agents and the MailboxProcessor much more in depth than I have covered here:

  • Concurrency in .NET by Riccardo Terrell – He shows some interesting examples with agents acting as a load balancer to other agents and (if I remember correctly), talks about using agents as supervisors to start and stop other agents similar to Elixir/Erlang and the OTP model.
  • Messages and Agents blog post by Scott Wlaschin – As usual, anything on Scott’s F# for Fun and Profit site is gold. Do yourself a favour and give it a read (while you’re there read every other article and watch every talk he has done).
  • Real-World Functional Programming: With Examples in F# and C# by Tomas Petricek with Jon Skeet – An old book from 2009, but it was what first planted the F# seed in my mind. There is quite a lot of code in there about agents.
  • Companion site to the book above – It has some more complete examples such as building blocking queue agents, batch processing agents and a web chat agent.
  • Writing Concurrent Programs Using F# Mailbox Processors Code Magazine article by Rachel Reese – This is a good introduction to agents and goes into some more complicated examples like Riccardo Terrell does in his book.
  • F# MailboxProcessor and the actor model YouTube video by Jonas Juselius – This is a great introduction to the basics if you’ve never used the MailboxProcessor. I should also mention that this is part of a great lecture series by Jonas that you can find here.

2 thoughts on “All You Can Eat Agents”
  1. Great intro to MailboxProcessor. Having used it quite a bit myself, one thing I’ve found is that one should always hook up a handler to the mailbox’s Error event. At least have the handler log the error. It helps a lot when things stop working because your agents have crashed and your messages silently go into the void.

    1. Thanks for reading Brett. That is a great point! I think I’ll do a follow up post to this one and cover that and a few other useful things I didn’t cover in this post.
