Friday, May 18, 2018

Functional Adventures in F# - Adding Snapshot Persisting



In this post we will look at how to add snapshots to our event source engine from previous posts.

This post is part of a series:
Functional Adventures in F# - A simple planner
Functional Adventures in F# - Calling F# from C#
Functional Adventures in F# - Using Map Collections
Functional Adventures in F# - Application State
Functional Adventures in F# - Types with member functions
Functional Adventures in F# - Getting rid of loops
Functional Adventures in F# - Getting rid of temp variables
Functional Adventures in F# - The MailboxProcessor
Functional Adventures in F# - Persisting Application State
Functional Adventures in F# - Adding Snapshot Persisting
Functional Adventures in F# - Type-safe identifiers
Functional Adventures in F# - Event sourcing lessons learned


Quickest way to get started for me was to just persist a snapshot every X actions that were processed. Also for my needs, it is OK to persist the snapshot on a separate thread, as long as the actions are persisted by the AppHolder MailboxProcessor, then the snapshot can be written by another MailboxProcessor. This way, as the state grows it will not affect the main processing...

Changes needed to the AppPersister

We will be adding a new function called Persist that will be called from the AppHolder instead of the current PersistAction.
    let Persist (nextAction:obj) (state:obj) (actionCounter:int)  =
        PersistAction nextAction nextAction?ActionId
        PersistSnapshot actionCounter state nextAction?ActionId
        ()
This in turn will call 2 functions, the old PersistAction from previous part and the new PersistSnapshot.
Also, note the nextAction?ActionId, this is part of the FSharp.Interop.Dynamic NuGet package that adds the support for Dynamics, i.e. runtime checked fields and functions on objects instead of compile time checked types. We will be assuming that all actions (that are of type object, or obj in F#) implement the field ActionId... We could solve this by forcing an interface implementation but that just gives us ugly code here, but I guess there is a better solution but for me this solved the problem and I am only to blame myself if things break.
Use them with care.

    let private Snapshotter = MailboxProcessor.Start(fun inbox ->
        let rec loop (n:int) =
            async {
                let! (actionCounter:int,state:obj, nextActionId:Guid) = inbox.Receive()
                if actionCounter % 300 = 0 then
                    writeSnapshotToDisk state nextActionId
                return! loop n
                }
        loop 0)

    let PersistSnapshot (actionCounter:int) (state:obj) (nextActionId:Guid) =
        Snapshotter.Post (actionCounter, state, nextActionId)

The PersistSnapshot function just posts the state to the MailboxProcessor called Snapshotter that in turn calls writeSnahotToDisk every 300 action. Much hardcoded here, I guess you will have to experiment a little to find the magic number that works for you (300 is just an example and I will probably end changing it in my code as well)
    let private createSnapshotFilename (nextActionId:Guid) =
        let now = DateTime.UtcNow
        let dayPath = now.ToString("yyyy-MM-dd")
        let fullPath = Path.Combine(store, "snapshots", dayPath)
        let di = new DirectoryInfo(fullPath)
        di.Create()
        let filename = now.ToString("yyyy-MM-dd hh_mm_ss_fffffff.") + nextActionId.ToString() + ".snapshot"
        let fullFilename = Path.Combine(fullPath, filename)
        fullFilename
    
    let private writeSnapshotToDisk  (state:obj) (nextActionId:Guid) =
        let fullFilename = createSnapshotFilename nextActionId
        let json = JsonConvert.SerializeObject(state)
        File.WriteAllText(fullFilename,  json)
        ()

So basically as in the PersistAction part, we add some meta-data to the filename that we can use when reading from disk later. We JSON serialize the state object just as it is. The meta-data that we are interested in is the ID of the next action to be executed.


We will also be changing the createActionFilename function to add the ID of the action to the filename as follows:
    let private createActionFilename (action:obj) (actionId:Guid) =
        let now = DateTime.UtcNow
        let hourPath = now.ToString("yyyy-MM-dd HH")
        let fullPath = Path.Combine(store, hourPath)
        let di = new DirectoryInfo(fullPath)
        di.Create()
        let t = action.GetType()
        let filename = now.ToString("yyyy-MM-dd hh_mm_ss_fffffff+") + now.Ticks.ToString() + "." + t.Name + "." + actionId.ToString() +  ".action"
        let fullFilename = Path.Combine(fullPath, filename)
        fullFilename


Next we will be adding functionality to load snapshots and actions from disk. I.e. latest snapshot and all actions persisted after that snapshot. A totality that will result in a total state.

First, a minor change to getAction
    let private getAction (json:string, filename:string) =
        let split = filename.Split('.')
        let actionName = split.[1]
        let actionNameWithNamespace = "dreamstatecoding.core.Actions+" + actionName
        let t = Assembly.GetExecutingAssembly().GetType(actionNameWithNamespace)
        (JsonConvert.DeserializeObject(json, t), Guid.Parse(split.[2]))
We will be returning a tuple, with the deserialized object as the first element and the ID as the second element

Next, we want to get all actions from a specific ID, namely the ID that we wrote into the filename of the snapshot. To do this we do the following:
    let private fileContainAction (filename:string) (actionIdString:string) =
        let split = filename.Split('.')
        split.[2] = actionIdString

    let GetAllActionsFromId (nextActionId:Guid) =
        let di = new DirectoryInfo(store)
        di.Create()
        let nextActionIdString = nextActionId.ToString()
        let actions =
            di.GetFiles("*.action", SearchOption.AllDirectories)
            |> Seq.skipWhile (fun (fi:FileInfo) -> not (fileContainAction fi.Name nextActionIdString))
            |> Seq.map (fun (fi:FileInfo) -> File.ReadAllText(fi.FullName), fi.Name)
            |> Seq.map (fun x -> (getAction x))
            |> Seq.toArray
        actions
We use the Seq.skipWhile that skips elements in the input sequence until the function returns false, after that it will return the rest of the sequence. In our case, we check if the filename contains the next action id with the help of the fileContainsAction function and negating the result by using the not operator.
Handy when the framework contians these nice functions that do exactly what you need at the moment!
    let private getSnapshot (fi:FileInfo) =
        let split = fi.Name.Split('.')
        let json = File.ReadAllText(fi.FullName)
        (JsonConvert.DeserializeObject<ApplicationState.AppliationState>(json), Guid.Parse(split.[1]))

The getSnaphot is pretty much the same as the GetAction, we parse out the ID of the next action to be executed from the filename and return a tuple of the deserialized state and the ID.

And lastly the new function to be called from outside to get the latest state from disk storage:
    let GetLatestSnapshotAndActions () =
        let di = new DirectoryInfo(Path.Combine(store, "snapshots"))
        di.Create()
        let fileInfos = di.GetFiles("*.snapshot", SearchOption.AllDirectories)
        match fileInfos with
        | a when a.Length = 0 -> (None, GetAllActions())
        | _ ->
            let (snapshot, nextActionId) =
                fileInfos
                |> Seq.last
                |> getSnapshot
            let actions = GetAllActionsFromId nextActionId
            (Some snapshot, actions)
Key things to note here:

  • We create the snapshot directory if it does not exist, this so that we don't get 'directory does not exist' exceptions from the GetFiles method.
  • If there are no files returned by the GetFiles method, we return a tuple with None as the first element and the original GetAllActions as the actions list to be executed.
  • otherwie we get the last snapshot (Seq.last) and return Some snapshot, and reminder of actions as a tuple
The None and Some way of doing things seems nicer then using null as we do in other languages, especially from the caller side as we will see in our rewritten AppHolder module:

    type Message =
        | Snapshot of AppliationState
        | Replay of obj
        | Action of obj
First we introduce a new message type to our MailboxProcessor, the Snapshot of ApplicationState.
let s' =
 match message with
 | Snapshot snapshot -> snapshot
 | Replay a -> s.HandleAction a
 | Action a -> 
  AppPersister.Persist a s c'
  s.HandleAction a
If it is a snapshot, then the s' state is the snapshot, i.e. no other processing is done
Then, lastly we add a new init function to be called from our application startup code (Program.cs or whatever you are using)

    let InitiateFromLastSnapshot () =
        let (snapshot, actions) = AppPersister.GetLatestSnapshotAndActions()
        match snapshot with
        | Some x -> Processor.Post (Snapshot x)
        | _ -> ()
        Array.map (fun x -> (HandleReplayAction x)) actions
Here we get the snapshot and actions from the AppPersister, then we use pattern matching on the snapshot to determine if it is Some, in which case we send it into the Processor as a Snapshot message. Otherwise we do nothing ()
And then just run in all the Replay actions as before.



That is it!
All code is available at github.

Known issues

Newtonsoft JSON.NET typeconverter issue

Newtonsoft JSON.NET seems to have some trouble with F# Map collections, especially when using union types as keys
type UserId = UserId of Guid.
Map<UserId, User>
Gives following nasty exception when trying to deserialize it as a key in a Map
Newtonsoft.Json.JsonSerializationException
  HResult=0x80131500
  Message=Could not convert string 'UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46' to dictionary key type 'dreamstatecoding.core.Model+UserId'. Create a TypeConverter to convert from the string to the key type object. Path 'UserStore.Users['UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46']', line 1, position 69.
  Source=Newtonsoft.Json
  StackTrace:
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.PopulateDictionary(IDictionary dictionary, JsonReader reader, JsonDictionaryContract contract, JsonProperty containerProperty, String id)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.ResolvePropertyAndCreatorValues(JsonObjectContract contract, JsonProperty containerProperty, JsonReader reader, Type objectType)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObjectUsingCreatorWithParameters(JsonReader reader, JsonObjectContract contract, JsonProperty containerProperty, ObjectConstructor`1 creator, String id)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.ResolvePropertyAndCreatorValues(JsonObjectContract contract, JsonProperty containerProperty, JsonReader reader, Type objectType)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObjectUsingCreatorWithParameters(JsonReader reader, JsonObjectContract contract, JsonProperty containerProperty, ObjectConstructor`1 creator, String id)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
   at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
   at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
   at Newtonsoft.Json.JsonConvert.DeserializeObject(String value, Type type, JsonSerializerSettings settings)
   at Newtonsoft.Json.JsonConvert.DeserializeObject[T](String value, JsonSerializerSettings settings)
   at dreamstatecoding.core.AppPersister.getSnapshot(FileInfo fi) in C:\tfs\apps\dreamstatecoding.core\AppPersister.fs:line 104
   at dreamstatecoding.core.AppPersister.GetLatestSnapshotAndActions() in C:\tfs\apps\dreamstatecoding.core\AppPersister.fs:line 113
   at dreamstatecoding.core.AppHolder.InitiateFromLastSnapshot() in C:\tfs\apps\dreamstatecoding.core\AppHolder.fs:line 47
   at test.web.Program.Main(String[] args) in C:\tfs\apps\test\test.web\Program.cs:line 14

Inner Exception 1:
JsonSerializationException: Error converting value "UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46" to type 'dreamstatecoding.core.Model+UserId'. Path 'UserStore.Users['UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46']', line 1, position 69.

Inner Exception 2:
ArgumentException: Could not cast or convert from System.String to dreamstatecoding.core.Model+UserId.
This is fixed and a new version is on github.. Thanks to the really nice #fsharp community on twitter for the help!
Code changes to include the Fable.JsonConverter
let converters =
 [ Fable.JsonConverter () :> JsonConverter ] |> List.toArray :> IList<JsonConverter>

let settings =
 JsonSerializerSettings (
  Converters = converters,
  Formatting = Formatting.Indented,
  NullValueHandling = NullValueHandling.Ignore)
And then just using it in serializing and deserializing as follows:
let json = JsonConvert.SerializeObject(state, settings)
let newState = JsonConvert.DeserializeObject<ApplicationState.ApplicationState>(json, settings)

All code provided as-is. This is copied from my own code-base, May need some additional programming to work. Use for whatever you want, how you want! If you find this helpful, please leave a comment, not required but appreciated! :)

Hope this helps someone out there!

4 comments:

  1. Nice article. I really enjoyed reading this and the back-series.

    Did you possibly consider duck typing the "nextAction" object to help enforce the actionId member?


    Side note - a couple of typos:
    - PersitAction just before the first code block
    - "not" instead of "note"? on the second line after this code block.

    ReplyDelete
    Replies
    1. Thanks for the comment!
      I did consider duck-typing, did some googling and found that it was not as simple as for say in TypeScript.... So I decided to go with the dynamics instead due to time constraints..
      Not closing the door completely, might just be something I misunderstood, so there is a chance that I'll revisit the subject in future posts!
      Types fixed.. Thanks for pointing them out! :)

      Delete
  2. A small code thingy: multiple map operations after each other can be combined e.g. "a |> map f |> map g" is the same as "a |> map (f >> g)". Also, "map (fun x -> f x)" can be written as "map f".

    Also regarding the logic: I would split the createFilename functions into 2 functions. One to get the directory name and one to get the filename. And also move the directoryInfo.create into the write function. That way the creation of the directory name and filename have no side-effects. I would also add a di.exists check to the getLatestSnapshot function and remove the di.create. it can return the same value as when you can't find any files. These changes will make sure that directories are only created when needed.

    ReplyDelete
    Replies
    1. Hi there!
      All good pointers are welcome!
      I figure that combining multiple map operations gives a performance boost as well as there would be less iterations of the collection.
      Also about the creation of the directory until the write operation makes sense to limit the side effects. I will not change the code though as I will probably move away from files and the current solution does the thing for me :)
      Thanks for the comment!

      Delete