Friday, May 18, 2018

Functional Adventures in F# - Adding Snapshot Persisting



In this post we will look at how to add snapshots to the event sourcing engine from the previous posts.

This post is part of a series:
Functional Adventures in F# - A simple planner
Functional Adventures in F# - Calling F# from C#
Functional Adventures in F# - Using Map Collections
Functional Adventures in F# - Application State
Functional Adventures in F# - Types with member functions
Functional Adventures in F# - Getting rid of loops
Functional Adventures in F# - Getting rid of temp variables
Functional Adventures in F# - The MailboxProcessor
Functional Adventures in F# - Persisting Application State
Functional Adventures in F# - Adding Snapshot Persisting


For me, the quickest way to get started was to persist a snapshot every X processed actions. For my needs it is also OK to persist the snapshot on a separate thread: as long as the actions themselves are persisted by the AppHolder MailboxProcessor, the snapshot can be written by another MailboxProcessor. This way the main processing is not slowed down as the state grows.

Changes needed to the AppPersister

We will be adding a new function called Persist that will be called from the AppHolder instead of the current PersistAction.
    let Persist (nextAction:obj) (state:obj) (actionCounter:int)  =
        PersistAction nextAction nextAction?ActionId
        PersistSnapshot actionCounter state nextAction?ActionId
        ()
This in turn calls two functions: the old PersistAction from the previous part and the new PersistSnapshot.
Also note the nextAction?ActionId. The ? operator comes from the FSharp.Interop.Dynamic NuGet package, which adds support for dynamics, i.e. fields and functions that are looked up on the object at runtime instead of being checked at compile time. We assume that all actions (which are of type object, or obj in F#) have an ActionId field. We could enforce this with an interface, but that just gives us ugly code here. There is probably a better solution, but this solved the problem for me, and I only have myself to blame if things break.
Use dynamics with care.
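To make the runtime lookup a bit less magic, here is a minimal sketch of what a ? operator can look like when written by hand with reflection. This is not the FSharp.Interop.Dynamic implementation, just a stand-in under the assumption that the action exposes ActionId as a public property (F# record fields do). The AddPlayer type and the values are made up for the example.

```fsharp
open System

// Hypothetical stand-in for the (?) operator: looks up a public property by
// name at runtime and casts the value to the type the caller expects.
let (?) (target: obj) (name: string) : 'T =
    let prop = target.GetType().GetProperty(name)
    if isNull prop then
        failwithf "%s has no property named %s" (target.GetType().Name) name
    prop.GetValue(target) :?> 'T

// Example action (made up); F# record fields are exposed as properties.
type AddPlayer = { ActionId: Guid; Name: string }

let expectedId = Guid.NewGuid()
let action = box { ActionId = expectedId; Name = "Ada" }

// The compiler turns action?ActionId into (?) action "ActionId".
let actionId : Guid = action?ActionId
```

A misspelled field name compiles fine and only fails when the line runs, which is exactly the trade-off described above.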

    let private Snapshotter = MailboxProcessor.Start(fun inbox ->
        let rec loop (n:int) =
            async {
                let! (actionCounter:int,state:obj, nextActionId:Guid) = inbox.Receive()
                if actionCounter % 300 = 0 then
                    writeSnapshotToDisk state nextActionId
                return! loop n
                }
        loop 0)

    let PersistSnapshot (actionCounter:int) (state:obj) (nextActionId:Guid) =
        Snapshotter.Post (actionCounter, state, nextActionId)

The PersistSnapshot function just posts the state to the MailboxProcessor called Snapshotter, which in turn calls writeSnapshotToDisk every 300 actions. A lot is hardcoded here; you will have to experiment a little to find the magic number that works for you (300 is just an example and I will probably end up changing it in my code as well).
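If you want to experiment with the magic number, one option is to pass the interval in instead of hardcoding it. The following is only a sketch: shouldSnapshot and createSnapshotter are names made up for this example, and the write parameter stands in for writeSnapshotToDisk.

```fsharp
open System

// Hypothetical helper: is a snapshot due for this action counter?
let shouldSnapshot (interval: int) (actionCounter: int) =
    interval > 0 && actionCounter % interval = 0

// Hypothetical factory: builds a snapshot agent with a configurable interval.
// `write` stands in for writeSnapshotToDisk.
let createSnapshotter (interval: int) (write: obj -> Guid -> unit) =
    MailboxProcessor.Start(fun (inbox: MailboxProcessor<int * obj * Guid>) ->
        let rec loop () =
            async {
                let! (actionCounter, state, nextActionId) = inbox.Receive()
                if shouldSnapshot interval actionCounter then
                    write state nextActionId
                return! loop ()
            }
        loop ())
```

Keeping the decision in a small pure function makes it easy to check the interval logic without touching the disk.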
    let private createSnapshotFilename (nextActionId:Guid) =
        let now = DateTime.UtcNow
        let dayPath = now.ToString("yyyy-MM-dd")
        let fullPath = Path.Combine(store, "snapshots", dayPath)
        let di = new DirectoryInfo(fullPath)
        di.Create()
        // HH (24-hour clock) keeps the names in chronological order; hh would start over at 01 after noon
        let filename = now.ToString("yyyy-MM-dd HH_mm_ss_fffffff.") + nextActionId.ToString() + ".snapshot"
        let fullFilename = Path.Combine(fullPath, filename)
        fullFilename
    
    let private writeSnapshotToDisk  (state:obj) (nextActionId:Guid) =
        let fullFilename = createSnapshotFilename nextActionId
        let json = JsonConvert.SerializeObject(state)
        File.WriteAllText(fullFilename,  json)
        ()

So basically as in the PersistAction part, we add some meta-data to the filename that we can use when reading from disk later. We JSON serialize the state object just as it is. The meta-data that we are interested in is the ID of the next action to be executed.
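The filename round trip can be sketched like this. The function names are made up for the example, and I am assuming a 24-hour timestamp (HH) so that the names sort chronologically. The parser returns None instead of throwing when the name does not have the expected shape.

```fsharp
open System
open System.Globalization

// Builds "<timestamp>.<nextActionId>.snapshot" (hypothetical helper).
let snapshotFileName (now: DateTime) (nextActionId: Guid) =
    let stamp = now.ToString("yyyy-MM-dd HH_mm_ss_fffffff", CultureInfo.InvariantCulture)
    stamp + "." + nextActionId.ToString() + ".snapshot"

// Reads the ID back out of the filename (hypothetical helper).
let tryParseNextActionId (fileName: string) =
    match fileName.Split('.') with
    | [| _; idPart; "snapshot" |] ->
        match Guid.TryParse idPart with
        | true, parsed -> Some parsed
        | _ -> None
    | _ -> None

let sampleId = Guid.NewGuid()
let sampleName = snapshotFileName (DateTime(2018, 5, 18, 13, 30, 0)) sampleId
let parsedId = tryParseNextActionId sampleName
```

This works because neither the timestamp nor the default Guid format contains a dot, so splitting on '.' gives exactly three parts.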


We will also be changing the createActionFilename function to add the ID of the action to the filename as follows:
    let private createActionFilename (action:obj) (actionId:Guid) =
        let now = DateTime.UtcNow
        let hourPath = now.ToString("yyyy-MM-dd HH")
        let fullPath = Path.Combine(store, hourPath)
        let di = new DirectoryInfo(fullPath)
        di.Create()
        let t = action.GetType()
        let filename = now.ToString("yyyy-MM-dd hh_mm_ss_fffffff+") + now.Ticks.ToString() + "." + t.Name + "." + actionId.ToString() +  ".action"
        let fullFilename = Path.Combine(fullPath, filename)
        fullFilename


Next we will add functionality to load snapshots and actions from disk, i.e. the latest snapshot and all actions persisted after it. Together they give us the complete, up to date state.

First, a minor change to getAction:
    let private getAction (json:string, filename:string) =
        let split = filename.Split('.')
        let actionName = split.[1]
        let actionNameWithNamespace = "dreamstatecoding.core.Actions+" + actionName
        let t = Assembly.GetExecutingAssembly().GetType(actionNameWithNamespace)
        (JsonConvert.DeserializeObject(json, t), Guid.Parse(split.[2]))
We now return a tuple, with the deserialized object as the first element and the ID as the second element.

Next, we want to get all actions from a specific ID, namely the ID that we wrote into the filename of the snapshot. To do this we do the following:
    let private fileContainAction (filename:string) (actionIdString:string) =
        let split = filename.Split('.')
        split.[2] = actionIdString

    let GetAllActionsFromId (nextActionId:Guid) =
        let di = new DirectoryInfo(store)
        di.Create()
        let nextActionIdString = nextActionId.ToString()
        let actions =
            di.GetFiles("*.action", SearchOption.AllDirectories)
            |> Seq.skipWhile (fun (fi:FileInfo) -> not (fileContainAction fi.Name nextActionIdString))
            |> Seq.map (fun (fi:FileInfo) -> File.ReadAllText(fi.FullName), fi.Name)
            |> Seq.map (fun x -> (getAction x))
            |> Seq.toArray
        actions
We use Seq.skipWhile, which skips elements in the input sequence until the function returns false and then returns the rest of the sequence. In our case we check whether the filename contains the next action ID with the help of the fileContainAction function and negate the result with the not operator.
Handy when the framework contains these nice functions that do exactly what you need at the moment!
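A small stand-alone illustration of Seq.skipWhile, using made-up filenames instead of real files. The element that stops the skipping is included in the result, and if nothing matches we get an empty sequence back.

```fsharp
// Made-up filenames in the "<timestamp>.<type>.<id>.action" layout.
let fileNames =
    [ "t1.AddPlayer.id-1.action"
      "t2.AddPlayer.id-2.action"
      "t3.RemovePlayer.id-3.action" ]

// The ID is the third dot-separated part, as in fileContainAction.
let idOf (fileName: string) = fileName.Split('.').[2]

// Skip everything before the given ID and keep the rest.
let fromId (actionId: string) =
    fileNames
    |> Seq.skipWhile (fun name -> idOf name <> actionId)
    |> Seq.toList

let fromSecond = fromId "id-2"   // the last two filenames
let fromMissing = fromId "nope"  // empty list
```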
    let private getSnapshot (fi:FileInfo) =
        let split = fi.Name.Split('.')
        let json = File.ReadAllText(fi.FullName)
        (JsonConvert.DeserializeObject<ApplicationState.AppliationState>(json), Guid.Parse(split.[1]))

The getSnapshot function is pretty much the same as getAction: we parse the ID of the next action to be executed out of the filename and return a tuple of the deserialized state and the ID.

And lastly the new function to be called from outside to get the latest state from disk storage:
    let GetLatestSnapshotAndActions () =
        let di = new DirectoryInfo(Path.Combine(store, "snapshots"))
        di.Create()
        let fileInfos = di.GetFiles("*.snapshot", SearchOption.AllDirectories)
        match fileInfos with
        | a when a.Length = 0 -> (None, GetAllActions())
        | _ ->
            let (snapshot, nextActionId) =
                fileInfos
                |> Seq.last
                |> getSnapshot
            let actions = GetAllActionsFromId nextActionId
            (Some snapshot, actions)
Key things to note here:

  • We create the snapshot directory if it does not exist, so that we don't get 'directory does not exist' exceptions from the GetFiles method.
  • If the GetFiles method returns no files, we return a tuple with None as the first element and the result of the original GetAllActions as the list of actions to be executed.
  • Otherwise we get the last snapshot (Seq.last) and return Some snapshot and the remainder of the actions as a tuple. Note that this relies on GetFiles returning the files in name order, which .NET does not guarantee, so sorting by name first is the safer option.
The None and Some way of doing things seems nicer than using null as we do in other languages, especially from the caller side, as we will see in our rewritten AppHolder module:

    type Message =
        | Snapshot of AppliationState
        | Replay of obj
        | Action of obj
First we introduce a new message type to our MailboxProcessor, the Snapshot of AppliationState.
    let s' =
        match message with
        | Snapshot snapshot -> snapshot
        | Replay a -> s.HandleAction a
        | Action a ->
            AppPersister.Persist a s c'
            s.HandleAction a
If it is a snapshot, then the new state s' is simply the snapshot, i.e. no other processing is done. Replay and Action are handled as before, except that an Action is now persisted through the new Persist function, where c' is the action counter.
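The same three-way match can be tried out in isolation. In this sketch the state is just an int and handling an action means adding it to the state; both are stand-ins for AppliationState and HandleAction, and the persist parameter stands in for AppPersister.Persist.

```fsharp
// Toy message type: int stands in for both the state and the actions.
type Message =
    | Snapshot of int
    | Replay of int
    | Action of int

// Computes the next state; only real Actions are persisted.
let applyMessage (persist: int -> unit) (state: int) (message: Message) =
    match message with
    | Snapshot snapshot -> snapshot   // replace the state wholesale
    | Replay a -> state + a           // apply, but do not persist again
    | Action a ->
        persist a
        state + a

// Record what would have been written to disk.
let persisted = ResizeArray<int>()

let finalState =
    [ Snapshot 10; Replay 1; Action 2 ]
    |> List.fold (applyMessage (fun a -> persisted.Add a)) 0
```

Starting from 0, the snapshot sets the state to 10, the replay makes it 11 and the action makes it 13, with only the 2 ending up in the persisted list.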
Lastly, we add a new init function to be called from our application startup code (Program.cs or whatever you are using):

    let InitiateFromLastSnapshot () =
        let (snapshot, actions) = AppPersister.GetLatestSnapshotAndActions()
        match snapshot with
        | Some x -> Processor.Post (Snapshot x)
        | _ -> ()
        Array.map (fun x -> (HandleReplayAction x)) actions
Here we get the snapshot and actions from the AppPersister, then we use pattern matching on the snapshot to determine whether it is Some, in which case we send it to the Processor as a Snapshot message. Otherwise we do nothing, i.e. return unit ().
After that we just run all the Replay actions through as before.
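The Some/None handling on the caller side can be boiled down to a few lines. This is a toy sketch where the state is an int and applying an action means adding it; restore is a made-up name, not something from my code-base.

```fsharp
// Start from the snapshot if there is one, otherwise from the initial state,
// then apply the remaining actions on top.
let restore (initial: int) (snapshot: int option) (actions: int list) =
    let start =
        match snapshot with
        | Some s -> s
        | None -> initial
    List.fold (+) start actions

let withoutSnapshot = restore 0 None [ 1; 2; 3 ]
let withSnapshot = restore 0 (Some 100) [ 4; 5 ]
```

The compiler forces the caller to deal with both cases, which is the part that feels nicer than a null check that is easy to forget.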



That is it!
All code is available on GitHub.



All code provided as-is. This is copied from my own code-base, May need some additional programming to work. Use for whatever you want, how you want! If you find this helpful, please leave a comment, not required but appreciated! :)

Hope this helps someone out there!
Until next time: Work to Live, Don’t Live to Work

Wednesday, May 9, 2018

Functional Adventures in F# - Persisting Application State



In this part we will look at how to persist the application state to disk. It should not be too hard to modify for usage with other persistence solutions.


Storing all actions

The idea is pretty simple: we store all actions sent to the AppHolder MailboxProcessor. Whenever we want to re-initiate the application state, we load all the actions from storage and run them through the processor again, and we should end up with the same application state.

This has been described by other people, for example Martin Fowler. If you have the time, read his article about event sourcing.
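Stripped of all the persistence details, the idea is a fold over the stored actions. The sketch below uses a made-up bank balance example rather than anything from my application.

```fsharp
// Made-up events for illustration.
type Event =
    | Deposited of int
    | Withdrawn of int

// Applies one event to the current state.
let apply (balance: int) (event: Event) =
    match event with
    | Deposited amount -> balance + amount
    | Withdrawn amount -> balance - amount

// Replaying is a fold from the initial state over all stored events.
let replay (events: Event list) = List.fold apply 0 events

let balance = replay [ Deposited 10; Withdrawn 3; Deposited 5 ]
```

As long as apply is deterministic, replaying the same events in the same order always gives the same state.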

For this we will create a module called AppPersister. It handles the actual storing and retrieving of the events from disk, so it has side effects and is thus not purely functional. So let's just throw all those concepts out the door. But this is a great opportunity to show some nice features of F# that help us write this kind of code a little more ... functional?
For serialization I chose the JSON format, and the serializer is the JSON.NET library by Newtonsoft, which can be added to your project with NuGet.

Some code for starters:
module AppPersister =
    open System
    open System.Reflection
    open System.IO
    open Newtonsoft.Json
    
    let private store = @".\store\"
Basically we just create a new module that will handle the persisting of actions. Here we also define the store as a subdirectory relative to the application's working directory.
    let private createActionFilename (action:obj) =
        let now = DateTime.UtcNow
        let hourPath = now.ToString("yyyy-MM-dd HH")
        let fullPath = Path.Combine(store, hourPath)
        let di = new DirectoryInfo(fullPath)
        di.Create()
        let t = action.GetType()
        let filename = now.ToString("yyyy-MM-dd hh_mm_ss_fffffff+") + now.Ticks.ToString() + "." + t.Name + ".action"
        let fullFilename = Path.Combine(fullPath, filename)
        fullFilename
The createActionFilename function does just that: it generates a new unique filename for the action by combining today's date and time, throwing in the total number of ticks to ensure that we always have a fresh value, and lastly adding the type of the action and the extension '.action'. Here we also create the directory for the current date and hour (UTC) if it does not already exist. The DirectoryInfo.Create method is safe to run on an already existing directory, so we do no other checking ourselves.
    let PersistAction (action:obj) =
        let fullFilename = createActionFilename action
        let json = JsonConvert.SerializeObject(action)
        File.WriteAllText(fullFilename,  json)
        ()
The PersistAction function handles the actual writing of the action to disk. We call createActionFilename to get the full filename and then use JsonConvert to serialize the action to a JSON string. Lastly we write the file with the nice File.WriteAllText method.

Now that we can write the actions, let's write some code to read them from disk.
    let GetAllActions () =
        let di = new DirectoryInfo(store)
        let actions =
            di.GetFiles("*.action", SearchOption.AllDirectories)
            |> Seq.map (fun (fi:FileInfo) -> File.ReadAllText(fi.FullName), fi.Name)
            |> Seq.map (fun x -> (getAction x))
            |> Seq.toArray
        actions

  • Here we use the built-in DirectoryInfo.GetFiles method to find all files with the .action extension in any subdirectory of the store path. The result is an array of FileInfo objects that we pipe to
  • a Seq.map, where we return a tuple containing the file contents and the filename, and pipe that tuple into
  • another Seq.map, where we call getAction for all elements. This function is tasked with deserializing the JSON to the correct type.
  • Lastly we pipe the contents to Seq.toArray to make the result concrete. As far as I understand, the Seq constructs work a little like IEnumerable and are evaluated lazily until you actually enumerate them.
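The lazy evaluation is easy to see with a counter. In this small sketch the mapping function counts how many times it has been called; nothing happens until Seq.toArray walks the sequence.

```fsharp
// Counts how many times the mapping function has run.
let evaluated = ref 0

// Building the pipeline does not run the mapping function yet.
let doubled =
    [ 1; 2; 3 ]
    |> Seq.map (fun x ->
        evaluated.Value <- evaluated.Value + 1
        x * 2)

let countBefore = evaluated.Value        // still 0 here

// Seq.toArray enumerates the sequence and makes the result concrete.
let result = doubled |> Seq.toArray

let countAfter = evaluated.Value         // now 3
```

For GetAllActions this means the files are only read when Seq.toArray runs, so without it every new enumeration of the result would read the files from disk again.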
    let private getAction (json:string, filename:string) =
        let split = filename.Split('.')
        let actionName = split.[1]
        let actionNameWithNamespace = "dreamstatecoding.core.Actions+" + actionName
        let t = Assembly.GetExecutingAssembly().GetType(actionNameWithNamespace)
        JsonConvert.DeserializeObject(json, t)
Lastly, the getAction function that is called from GetAllActions. Here we split the filename, pick out the part containing the name of the action and then ask the currently executing assembly for the Type with that name. Notice that we need to prefix the action name with the namespace and module name to make it work. In my solution I have all my actions in one module, so this works. Note that in the actual source file getAction has to be defined above GetAllActions, since F# requires a function to be defined before it is used.
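If the string-based assembly lookup feels too fragile, one alternative is to keep an explicit table of known action types. This is only a sketch of that idea and not what my code does; the AddPlayer and RemovePlayer types and the function names are made up for the example.

```fsharp
open System

// Made-up action types for the example.
type AddPlayer = { Name: string }
type RemovePlayer = { Name: string }

// Explicit table from type name to Type, built once.
let knownActions : Map<string, Type> =
    [ typeof<AddPlayer>; typeof<RemovePlayer> ]
    |> List.map (fun t -> t.Name, t)
    |> Map.ofList

// Picks the type name out of "<timestamp>.<type>.action" and looks it up.
let tryResolveActionType (fileName: string) =
    let parts = fileName.Split('.')
    if parts.Length > 1 then Map.tryFind parts.[1] knownActions else None

let resolved =
    tryResolveActionType "2018-05-09 10_00_00_0000000+636614640000000000.AddPlayer.action"
```

The resolved Type can then be passed to JsonConvert.DeserializeObject just like before, and an unknown name gives None instead of a null Type.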


Rewriting AppHolder to work with persisted actions

The next step is to rewrite the AppHolder module from the previous part to work with the AppPersister module.

What we want to do here is to separate actual real-time Actions from Replay actions when it comes to persisting, mainly so that replayed actions do not get persisted again (duplicated).

So, let's put up a new discriminated union that defines what we want to do:
    type Message =
        | Replay of obj
        | Action of obj

So we either want to execute a Replay object or an Action object.
    let private Processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! message = inbox.Receive()
                let (action:obj) = 
                    match message with
                    | Action a -> 
                        AppPersister.PersistAction a
                        a
                    | Replay a -> a

                let s' = s.HandleAction action
                state <- s'
                counter <- counter + 1
                return! loop s'
                }
        loop AppliationState.Default)
Now we rewrite our MailboxProcessor (Agent) to take a Message instead of an object directly. The first step is to pattern match on the received message and get the payload object out.
If we are handling an Action we also want to persist it, so let's put that function call here. Otherwise nothing changes.
    let HandleAction (action:obj) =
        Processor.Post (Action action)
The old HandleAction code called Processor.Post with the action directly. Now we must state that it is an Action, and not a Replay.
    let private HandleReplayAction (action:obj) =
        Processor.Post (Replay action)
The same goes for the Replay execution: we just add a new function that executes actions as Replays.
    let InitiateFromStore () =
        AppPersister.GetAllActions()
        |> Array.map (fun x -> (HandleReplayAction x))
        |> ignore
        ()

Lastly, a function to initialize the state from the store: we pipe all the actions loaded from persistence to HandleReplayAction and then pipe the results to ignore, since we are not interested in them. In the end we just return unit from the InitiateFromStore function.
I put the call to this in my Program.cs file. It loads all the actions already executed from the store and executes them all as Replay actions.

Conclusions

In this part we have looked at how to persist the application state as a series of actions and then re-create the application state when the application is restarted.
For applications handling lots and lots of actions, we may want to limit the number of actions needed to recreate the up to date state. So in the next part we will look at snapshots. My plan is to upload the code to git after the snapshot part has been added.


Saturday, April 28, 2018

Functional Adventures in F# - The MailboxProcessor


In this post we will look at MailboxProcessor in F#.


MailboxProcessor

This continues with the code from the Application State post, mainly the GameState type:
and GameState =
    {
        PlayerStore:PlayerStore
        UnitStore:UnitStore
    }
    member this.HandleAction (action:obj) (state:GameState) =
        {
            // each store handles the action against its own current value
            PlayerStore = state.PlayerStore.HandleAction action state.PlayerStore
            UnitStore = state.UnitStore.HandleAction action state.UnitStore
        }


My goal is to incorporate F# with C# in a nice way. As described in the Application State post, the core application model and all actions that can be made in the application will be modeled in F#. But to prevent a multithreaded C# application from corrupting the state, we need to serialize the commands that are processed.
In C# I would have used a background Thread and a Queue<T> and written locks to synchronize the reads and writes. Luckily, in F# we have all that nicely packaged in the built-in MailboxProcessor.

The first line of the code below redefines MailboxProcessor as the term Agent. This seems to be something that a lot of people are doing, so let's stick to that for the time being.
After that we define an F# static class, i.e. an abstract and sealed class with only static members (this is what C# does under the hood).

type Agent<'T> = MailboxProcessor<'T>     
[<AbstractClass; Sealed>]
type GameHolder private () =
 static let mutable state : GameState = 
  {
   PlayerStore = { Players = Map.empty }
   UnitStore = { Units = Map.empty }
  }
 static member State : GameState = state
 static member private Processor = Agent.Start(fun inbox ->
  let rec loop =
   async {
    let! action = inbox.Receive()
    state <- state.HandleAction action state
    return! loop
    }
  loop)
 static member HandleAction (action:obj) =
  GameHolder.Processor.Post action

In addition we allow mutation here, something that the business logic in the F# module should be unaware of. Here we expose the current state to the rest of the application so that it is easily accessible.
I.e. from C# we can call GameHolder.State and always get the latest state. It is exposed as an immutable value, so there is no risk that other threads will write to it.

After that we define the MailboxProcessor<'T>, or in this case the redefined Agent<'T>, as a private static member of the class.
What happens in the recursive part is still a little iffy for me. To my understanding, let! (with the exclamation mark) waits for the asynchronous inbox.Receive call to complete, without blocking a thread while it waits, and stores its result in the action value. The loop then calls itself to wait for the next message.
If anyone has a detailed explanation of what is going on here, please feel free to comment. I was unable to find details on this; it just seems to be the way things are done.
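A tiny example, separate from the game code, of what let! does inside an async block. The names are made up, and fetchNumber stands in for any asynchronous operation such as inbox.Receive().

```fsharp
// An async computation that produces a value (stand-in for inbox.Receive()).
let fetchNumber = async { return 21 }

let workflow =
    async {
        // let! waits for fetchNumber to finish and binds its result to n.
        // With a plain let, n would be the Async<int> itself, not the int.
        let! n = fetchNumber
        return n * 2
    }

// Nothing runs until the workflow is started.
let answer = Async.RunSynchronously workflow
```

In the MailboxProcessor the loop is never run synchronously like this; Agent.Start starts it in the background and it sits waiting in let! until a message is posted.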

The HandleAction method calls the Post method on the MailboxProcessor and thus posts the action to the MailboxProcessor that in turn calls the HandleAction method of the State and mutates the exposed State value.

Update 2018-05-02, forcing serial access

It turns out the MailboxProcessor above does not work the way I thought. In my mind it serialized the processing, but it did not: when multiple actions were sent to it in parallel, the state variable got corrupted.
After looking at this for some time, I finally had to post the question on stackoverflow.
https://stackoverflow.com/questions/50135825/f-mailboxprocessor-limit-parallelism

It turns out the bad wolf in the drama was the static member. A static member declared without parameters is a property, and its body is evaluated again on every access, so every call to HandleAction started a new MailboxProcessor instead of reusing a single one. Changing the code to a module, where a let-bound value is evaluated only once, got the solution to work as expected.
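The behaviour of such a static member can be shown with a counter instead of a MailboxProcessor. Holder and its members are made up for this sketch.

```fsharp
type Holder private () =
    static let mutable evaluations = 0
    // Reads the counter without changing it.
    static member Evaluations = evaluations
    // A property: this body runs every time Holder.Recomputed is read.
    static member Recomputed =
        evaluations <- evaluations + 1
        evaluations

let first = Holder.Recomputed    // body runs, counter becomes 1
let second = Holder.Recomputed   // body runs again, counter becomes 2
```

Replace the counter with Agent.Start and you get one new agent per access, each with its own queue, which is why the processing was no longer serialized.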

The end result is:
module GameHolder =
    type Agent<'T> = MailboxProcessor<'T>     

    let mutable private state = AppliationState.Default
    let mutable private counter : int = 0
    let GetCurrentState() =  state
    let GetProcessedActionsCounter() = counter
    let private Processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                state <- s'
                counter <- counter + 1
                return! loop s'
                }
        loop AppliationState.Default)

    let HandleAction (action:obj) =
        Processor.Post action

I can still get the current application state by calling a 'static' method from C#, in this case GameHolder.GetCurrentState().
So with a small change thanks to stackoverflow, we still have a small but neat interface between our C# game/application and its model/business logic that is written in F#.
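As a side note, the state can also be read through the agent itself instead of through a mutable value, using PostAndReply. This is a sketch of that alternative with a simple int counter, not how the GameHolder above works; CounterMessage and counterAgent are made-up names.

```fsharp
type CounterMessage =
    | Add of int
    | Get of AsyncReplyChannel<int>

// A single agent created once by a let binding; the state lives in the loop.
let counterAgent =
    MailboxProcessor.Start(fun (inbox: MailboxProcessor<CounterMessage>) ->
        let rec loop (total: int) =
            async {
                let! message = inbox.Receive()
                match message with
                | Add n -> return! loop (total + n)
                | Get reply ->
                    reply.Reply total
                    return! loop total
            }
        loop 0)

// Post from several threads at once; the agent handles one message at a time.
[| 1 .. 100 |] |> Array.Parallel.iter (fun n -> counterAgent.Post (Add n))

// The Get message is queued after all the Adds, so it sees the full sum.
let total = counterAgent.PostAndReply(fun reply -> Get reply)
```

The trade-off is that every read now goes through the queue, so a reader has to wait for the messages in front of it, whereas GetCurrentState() returns immediately.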

Finally

In the end, we have a GameHolder that exposes the state and HandleAction to the outside world. Anyone can query the state at any time, but they are not allowed to change it other than through the HandleAction method.
