In this post we will look at how to add snapshots to our event source engine from previous posts.
This post is part of a series:
Functional Adventures in F# - A simple planner
Functional Adventures in F# - Calling F# from C#
Functional Adventures in F# - Using Map Collections
Functional Adventures in F# - Application State
Functional Adventures in F# - Types with member functions
Functional Adventures in F# - Getting rid of loops
Functional Adventures in F# - Getting rid of temp variables
Functional Adventures in F# - The MailboxProcessor
Functional Adventures in F# - Persisting Application State
Functional Adventures in F# - Adding Snapshot Persisting
Functional Adventures in F# - Type-safe identifiers
Functional Adventures in F# - Event sourcing lessons learned
Quickest way to get started for me was to just persist a snapshot every X actions that were processed. Also for my needs, it is OK to persist the snapshot on a separate thread, as long as the actions are persisted by the AppHolder MailboxProcessor, then the snapshot can be written by another MailboxProcessor. This way, as the state grows it will not affect the main processing...
Changes needed to the AppPersister
We will be adding a new function called Persist that will be called from the AppHolder instead of the current PersistAction.
let Persist (nextAction:obj) (state:obj) (actionCounter:int) =
PersistAction nextAction nextAction?ActionId
PersistSnapshot actionCounter state nextAction?ActionId
()
This in turn will call 2 functions, the old PersistAction from previous part and the new PersistSnapshot.
Also, note the nextAction?ActionId, this is part of the FSharp.Interop.Dynamic NuGet package that adds the support for Dynamics, i.e. runtime checked fields and functions on objects instead of compile time checked types. We will be assuming that all actions (that are of type object, or obj in F#) implement the field ActionId... We could solve this by forcing an interface implementation but that just gives us ugly code here, but I guess there is a better solution but for me this solved the problem and I am only to blame myself if things break.
Use them with care.
let private Snapshotter = MailboxProcessor.Start(fun inbox ->
let rec loop (n:int) =
async {
let! (actionCounter:int,state:obj, nextActionId:Guid) = inbox.Receive()
if actionCounter % 300 = 0 then
writeSnapshotToDisk state nextActionId
return! loop n
}
loop 0)
let PersistSnapshot (actionCounter:int) (state:obj) (nextActionId:Guid) =
Snapshotter.Post (actionCounter, state, nextActionId)
The PersistSnapshot function just posts the state to the MailboxProcessor called Snapshotter that in turn calls writeSnahotToDisk every 300 action. Much hardcoded here, I guess you will have to experiment a little to find the magic number that works for you (300 is just an example and I will probably end changing it in my code as well)
let private createSnapshotFilename (nextActionId:Guid) =
let now = DateTime.UtcNow
let dayPath = now.ToString("yyyy-MM-dd")
let fullPath = Path.Combine(store, "snapshots", dayPath)
let di = new DirectoryInfo(fullPath)
di.Create()
let filename = now.ToString("yyyy-MM-dd hh_mm_ss_fffffff.") + nextActionId.ToString() + ".snapshot"
let fullFilename = Path.Combine(fullPath, filename)
fullFilename
let private writeSnapshotToDisk (state:obj) (nextActionId:Guid) =
let fullFilename = createSnapshotFilename nextActionId
let json = JsonConvert.SerializeObject(state)
File.WriteAllText(fullFilename, json)
()
So basically as in the PersistAction part, we add some meta-data to the filename that we can use when reading from disk later. We JSON serialize the state object just as it is. The meta-data that we are interested in is the
ID of the next action to be executed.
We will also be changing the createActionFilename function to add the ID of the action to the filename as follows:
let private createActionFilename (action:obj) (actionId:Guid) =
let now = DateTime.UtcNow
let hourPath = now.ToString("yyyy-MM-dd HH")
let fullPath = Path.Combine(store, hourPath)
let di = new DirectoryInfo(fullPath)
di.Create()
let t = action.GetType()
let filename = now.ToString("yyyy-MM-dd hh_mm_ss_fffffff+") + now.Ticks.ToString() + "." + t.Name + "." + actionId.ToString() + ".action"
let fullFilename = Path.Combine(fullPath, filename)
fullFilename
Next we will be adding functionality to load snapshots and actions from disk. I.e. latest snapshot and all actions persisted after that snapshot. A totality that will result in a total state.
First, a minor change to getAction
let private getAction (json:string, filename:string) =
let split = filename.Split('.')
let actionName = split.[1]
let actionNameWithNamespace = "dreamstatecoding.core.Actions+" + actionName
let t = Assembly.GetExecutingAssembly().GetType(actionNameWithNamespace)
(JsonConvert.DeserializeObject(json, t), Guid.Parse(split.[2]))
We will be returning a tuple, with the deserialized object as the first element and the ID as the second element
Next, we want to get all actions from a specific ID, namely the ID that we wrote into the filename of the snapshot. To do this we do the following:
let private fileContainAction (filename:string) (actionIdString:string) =
let split = filename.Split('.')
split.[2] = actionIdString
let GetAllActionsFromId (nextActionId:Guid) =
let di = new DirectoryInfo(store)
di.Create()
let nextActionIdString = nextActionId.ToString()
let actions =
di.GetFiles("*.action", SearchOption.AllDirectories)
|> Seq.skipWhile (fun (fi:FileInfo) -> not (fileContainAction fi.Name nextActionIdString))
|> Seq.map (fun (fi:FileInfo) -> File.ReadAllText(fi.FullName), fi.Name)
|> Seq.map (fun x -> (getAction x))
|> Seq.toArray
actions
We use the Seq.skipWhile that skips elements in the input sequence until the function returns false, after that it will return the rest of the sequence. In our case, we check if the filename contains the next action id with the help of the fileContainsAction function and negating the result by using the
not operator.
Handy when the framework contians these nice functions that do exactly what you need at the moment!
let private getSnapshot (fi:FileInfo) =
let split = fi.Name.Split('.')
let json = File.ReadAllText(fi.FullName)
(JsonConvert.DeserializeObject<ApplicationState.AppliationState>(json), Guid.Parse(split.[1]))
The getSnaphot is pretty much the same as the GetAction, we parse out the
ID of the next action to be executed from the filename and return a tuple of the deserialized state and the ID.
And lastly the new function to be called from outside to get the latest state from disk storage:
let GetLatestSnapshotAndActions () =
let di = new DirectoryInfo(Path.Combine(store, "snapshots"))
di.Create()
let fileInfos = di.GetFiles("*.snapshot", SearchOption.AllDirectories)
match fileInfos with
| a when a.Length = 0 -> (None, GetAllActions())
| _ ->
let (snapshot, nextActionId) =
fileInfos
|> Seq.last
|> getSnapshot
let actions = GetAllActionsFromId nextActionId
(Some snapshot, actions)
Key things to note here:
- We create the snapshot directory if it does not exist, this so that we don't get 'directory does not exist' exceptions from the GetFiles method.
- If there are no files returned by the GetFiles method, we return a tuple with None as the first element and the original GetAllActions as the actions list to be executed.
- otherwie we get the last snapshot (Seq.last) and return Some snapshot, and reminder of actions as a tuple
The None and Some way of doing things seems nicer then using null as we do in other languages, especially from the caller side as we will see in our rewritten AppHolder module:
type Message =
| Snapshot of AppliationState
| Replay of obj
| Action of obj
First we introduce a new message type to our MailboxProcessor, the Snapshot of ApplicationState.
let s' =
match message with
| Snapshot snapshot -> snapshot
| Replay a -> s.HandleAction a
| Action a ->
AppPersister.Persist a s c'
s.HandleAction a
If it is a snapshot, then the
s' state is the snapshot, i.e. no other processing is done
Then, lastly we add a new init function to be called from our application startup code (Program.cs or whatever you are using)
let InitiateFromLastSnapshot () =
let (snapshot, actions) = AppPersister.GetLatestSnapshotAndActions()
match snapshot with
| Some x -> Processor.Post (Snapshot x)
| _ -> ()
Array.map (fun x -> (HandleReplayAction x)) actions
Here we get the snapshot and actions from the AppPersister, then we use pattern matching on the snapshot to determine if it is Some, in which case we send it into the Processor as a Snapshot message. Otherwise we do nothing ()
And then just run in all the Replay actions as before.
That is it!
All code is available at
github.
Known issues
Newtonsoft JSON.NET typeconverter issue
Newtonsoft JSON.NET seems to have some trouble with F# Map collections, especially when using union types as keys
type UserId = UserId of Guid.
Map<UserId, User>
Gives following nasty exception when trying to deserialize it as a key in a Map
Newtonsoft.Json.JsonSerializationException
HResult=0x80131500
Message=Could not convert string 'UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46' to dictionary key type 'dreamstatecoding.core.Model+UserId'. Create a TypeConverter to convert from the string to the key type object. Path 'UserStore.Users['UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46']', line 1, position 69.
Source=Newtonsoft.Json
StackTrace:
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.PopulateDictionary(IDictionary dictionary, JsonReader reader, JsonDictionaryContract contract, JsonProperty containerProperty, String id)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.ResolvePropertyAndCreatorValues(JsonObjectContract contract, JsonProperty containerProperty, JsonReader reader, Type objectType)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObjectUsingCreatorWithParameters(JsonReader reader, JsonObjectContract contract, JsonProperty containerProperty, ObjectConstructor`1 creator, String id)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.ResolvePropertyAndCreatorValues(JsonObjectContract contract, JsonProperty containerProperty, JsonReader reader, Type objectType)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObjectUsingCreatorWithParameters(JsonReader reader, JsonObjectContract contract, JsonProperty containerProperty, ObjectConstructor`1 creator, String id)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.CreateObject(JsonReader reader, Type objectType, JsonContract contract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerMember, Object existingValue)
at Newtonsoft.Json.Serialization.JsonSerializerInternalReader.Deserialize(JsonReader reader, Type objectType, Boolean checkAdditionalContent)
at Newtonsoft.Json.JsonSerializer.DeserializeInternal(JsonReader reader, Type objectType)
at Newtonsoft.Json.JsonConvert.DeserializeObject(String value, Type type, JsonSerializerSettings settings)
at Newtonsoft.Json.JsonConvert.DeserializeObject[T](String value, JsonSerializerSettings settings)
at dreamstatecoding.core.AppPersister.getSnapshot(FileInfo fi) in C:\tfs\apps\dreamstatecoding.core\AppPersister.fs:line 104
at dreamstatecoding.core.AppPersister.GetLatestSnapshotAndActions() in C:\tfs\apps\dreamstatecoding.core\AppPersister.fs:line 113
at dreamstatecoding.core.AppHolder.InitiateFromLastSnapshot() in C:\tfs\apps\dreamstatecoding.core\AppHolder.fs:line 47
at test.web.Program.Main(String[] args) in C:\tfs\apps\test\test.web\Program.cs:line 14
Inner Exception 1:
JsonSerializationException: Error converting value "UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46" to type 'dreamstatecoding.core.Model+UserId'. Path 'UserStore.Users['UserId 8fa6d5a6-9500-4aac-9f3e-a5b918b81c46']', line 1, position 69.
Inner Exception 2:
ArgumentException: Could not cast or convert from System.String to dreamstatecoding.core.Model+UserId.
This is fixed and a new version is on
github.. Thanks to the really nice #fsharp community on twitter for the help!
Code changes to include the Fable.JsonConverter
let converters =
[ Fable.JsonConverter () :> JsonConverter ] |> List.toArray :> IList<JsonConverter>
let settings =
JsonSerializerSettings (
Converters = converters,
Formatting = Formatting.Indented,
NullValueHandling = NullValueHandling.Ignore)
And then just using it in serializing and deserializing as follows:
let json = JsonConvert.SerializeObject(state, settings)
let newState = JsonConvert.DeserializeObject<ApplicationState.ApplicationState>(json, settings)
All code provided as-is. This is copied from my own code-base, May need some additional programming to work. Use for whatever you want, how you want! If you find this helpful, please leave a comment, not required but appreciated! :)
Hope this helps someone out there!