Saturday, April 28, 2018

Functional Adventures in F# - The MailboxProcessor


In this post we will look at MailboxProcessor in F#.

This post is part of a series:
Functional Adventures in F# - A simple planner
Functional Adventures in F# - Calling F# from C#
Functional Adventures in F# - Using Map Collections
Functional Adventures in F# - Application State
Functional Adventures in F# - Types with member functions
Functional Adventures in F# - Getting rid of loops
Functional Adventures in F# - Getting rid of temp variables
Functional Adventures in F# - The MailboxProcessor
Functional Adventures in F# - Persisting Application State
Functional Adventures in F# - Adding Snapshot Persisting
Functional Adventures in F# - Type-safe identifiers
Functional Adventures in F# - Event sourcing lessons learned

MailboxProcessor

This continues with the code from the Application State post.
Mainly the GameState type:
and GameState =
 {
  PlayerStore:PlayerStore
  UnitStore:UnitStore
 }
 // Passes the action to each store and returns a new GameState.
 member this.HandleAction (action:obj) (state:GameState) =
  {
   PlayerStore = state.PlayerStore.HandleAction action state.PlayerStore
   UnitStore = state.UnitStore.HandleAction action state.UnitStore
  }


My goal is to incorporate F# with C# in a nice way. As described in the Application State post, the core application model and all actions that can be made in the application will be modeled in F#. But to prevent a multithreaded C# application from corrupting the state, we need to serialize the commands that are processed.
In C# I would have used a background Thread and a Queue<T> and written locks to synchronize the reads and writes. Luckily, in F# we have all that nicely packaged in the built-in MailboxProcessor.

The first line of the code below aliases MailboxProcessor as Agent. This seems to be something that a lot of people are doing, so let's stick to that for the time being.
After that we define an F# static class, i.e. an abstract and sealed class with only static members (this is what C# does under the hood).

type Agent<'T> = MailboxProcessor<'T>     
[<AbstractClass; Sealed>]
type GameHolder private () =
 static let mutable state : GameState = 
  {
   PlayerStore = { Players = Map.empty }
   UnitStore = { Units = Map.empty }
  }
 static member State : GameState = state
 static member private Processor = Agent.Start(fun inbox ->
  let rec loop =
   async {
    let! action = inbox.Receive()
    state <- state.HandleAction action state
    return! loop
    }
  loop)
 static member HandleAction (action:obj) =
  GameHolder.Processor.Post action

In addition, we allow mutation here, something that the business logic in the F# module should be unaware of. We do it to expose the current state to the rest of the application so that it is easily accessible.
I.e. from C# we can call GameHolder.State and always get the latest state. The state itself is an immutable record, so there is no risk that other threads will write to it.

After that we define the MailboxProcessor<'T> or in this case the redefined Agent<'T> as a private static member of the class.
What happens in the recursive part is a little iffy for me still. To my understanding, let! (with the exclamation mark) awaits the result of inbox.Receive() (which is itself an async call): the workflow is suspended, without blocking a thread, until a message arrives, and the result is then stored in the action value. After that, return! loop starts the next iteration as a tail call, so the loop can keep running indefinitely.
If anyone has a more detailed explanation of what is going on here, please feel free to comment. I was unable to find details on this; it just seems to be the way things are done.
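
To make the receive loop a bit more concrete, here is a minimal sketch of the same pattern in isolation. The CounterMessage type and the counterAgent name are made up for this example and are not part of the game code; the agent simply sums the integers it receives.

```fsharp
// Hypothetical message type for this sketch (not part of the post's code).
type CounterMessage =
    | Increment of int
    | GetTotal of AsyncReplyChannel<int>

let counterAgent =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // The running total is passed along as an argument to each iteration.
        let rec loop total =
            async {
                // let! suspends the workflow until a message arrives.
                // No thread is blocked while waiting.
                let! message = inbox.Receive()
                match message with
                | Increment amount ->
                    return! loop (total + amount)
                | GetTotal replyChannel ->
                    replyChannel.Reply total
                    return! loop total
            }
        loop 0)

counterAgent.Post(Increment 2)
counterAgent.Post(Increment 3)

// PostAndReply blocks the caller until the agent has answered.
let total = counterAgent.PostAndReply(fun replyChannel -> GetTotal replyChannel)
printfn "Total: %d" total
```

Messages are handled one at a time in the order they were posted, so the GetTotal request is answered only after both Increment messages have been processed.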

The HandleAction method calls the Post method on the MailboxProcessor and thus posts the action to the MailboxProcessor that in turn calls the HandleAction method of the State and mutates the exposed State value.

Update 2018-05-02, forcing serial access

Turns out the MailboxProcessor above does not work the way I thought... In my mind it serialized the processing, but it did not. I.e. when multiple actions were sent to it in parallel, it corrupted the state variable.
After looking at this for some time, I finally had to post the question on Stack Overflow.
https://stackoverflow.com/questions/50135825/f-mailboxprocessor-limit-parallelism

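Turns out the villain of the piece was the static member. A static member declared this way is a property whose body is evaluated again on every access, so each call to HandleAction started a brand new MailboxProcessor instead of posting to a single shared one. Changing the code to a module instead got the solution to work as expected.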
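
The difference is easy to see in a small sketch. The Holder type and the startIdleAgent helper below are invented for this illustration. A plain static property is compared with a static member val auto-property, which is shown only for contrast and is not what I ended up using.

```fsharp
// Hypothetical helper for this sketch: starts an agent that discards messages.
let startIdleAgent () =
    MailboxProcessor<int>.Start(fun inbox ->
        let rec loop () =
            async {
                let! _ = inbox.Receive()
                return! loop ()
            }
        loop ())

type Holder private () =
    // Plain static property: startIdleAgent runs again on every access.
    static member Recreated = startIdleAgent ()
    // Auto-property: startIdleAgent runs once and the agent is stored.
    static member val Stored = startIdleAgent ()

let sameRecreated = obj.ReferenceEquals(Holder.Recreated, Holder.Recreated)
let sameStored = obj.ReferenceEquals(Holder.Stored, Holder.Stored)

printfn "Recreated returns the same agent each time: %b" sameRecreated
printfn "Stored returns the same agent each time: %b" sameStored
```

With a new agent per access, every posted action gets its own mailbox and its own loop, so nothing forces the actions to be processed one after another.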

The end result is:
module GameHolder =
    type Agent<'T> = MailboxProcessor<'T>     

    let mutable private state = AppliationState.Default
    let mutable private counter : int = 0
    let GetCurrentState() =  state
    let GetProcessedActionsCounter() = counter
    let private Processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                state <- s'
                counter <- counter + 1
                return! loop s'
                }
        loop AppliationState.Default)

    let HandleAction (action:obj) =
        Processor.Post action

I can still get the current application state by calling a 'static' method from C#, in this case GameHolder.GetCurrentState().
So a small change thanks to Stack Overflow, and we still have a small but neat interface between our C# game/application and its model/business logic that is written in F#.
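
One way to convince yourself that the module version really processes actions one at a time is to hammer it with parallel posts. The sketch below uses a simplified stand-in, not the real GameHolder: the state is a plain int, and the CounterHolder module, the Message type and the WaitUntilProcessed function are assumptions made for this example.

```fsharp
// Simplified stand-in for GameHolder: the state is just an int.
module CounterHolder =
    // Hypothetical message type for this sketch.
    type Message =
        | Add of int
        | Flush of AsyncReplyChannel<unit>

    let mutable private state = 0
    let GetCurrentState () = state

    let private processor =
        MailboxProcessor<Message>.Start(fun inbox ->
            let rec loop s =
                async {
                    let! message = inbox.Receive()
                    match message with
                    | Add n ->
                        let s' = s + n
                        state <- s'
                        return! loop s'
                    | Flush reply ->
                        // Everything posted before the Flush is done by now.
                        reply.Reply ()
                        return! loop s
                }
            loop 0)

    let HandleAction (n: int) = processor.Post(Add n)

    // Blocks until all previously posted messages have been processed.
    let WaitUntilProcessed () = processor.PostAndReply(fun reply -> Flush reply)

// Post 1000 actions in parallel.
[ for _ in 1 .. 1000 -> async { CounterHolder.HandleAction 1 } ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore

CounterHolder.WaitUntilProcessed ()
printfn "State after 1000 parallel posts: %d" (CounterHolder.GetCurrentState ())
```

Since Post only enqueues the message, the Flush round trip is there to make sure the agent has caught up before the state is read.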

Finally

In the end, we have a GameHolder module that exposes GetCurrentState and HandleAction to the outside world. Anyone can query the state at any time, but they are not allowed to change it other than through the HandleAction function.

All code provided as-is. This is copied from my own code base and may need some additional programming to work. Use for whatever you want, how you want! If you find this helpful, please leave a comment, not required but appreciated! :)

Hope this helps someone out there!
Until next time: Work to Live, Don’t Live to Work
