My take on a simple proof of concept .NET Standard 2.0 CQRS library leveraging Reactive Extensions (Rx)
The following codebase is based on the previous work provided by:
- Meanwhile... on the command side of my architecture
- CQRS : A Cross Examination Of How It Works
- Rx works nicely with DDD and Event Sourcing
- Simple CQRS example
- CQRSlite
STATUS: "Finished", as in I have reached my conclusions, which can be found in the following section.
DISCLAIMER: This project has no intention of becoming a CQRS framework; its only purpose is to be a learning tool.
Since this project is, as stated previously, a learning tool, here are my overall thoughts and opinions from its development.
Basically, I should not have done it. Sadly, I wasn't aware of MediatR, because I got lost in the nomenclature for the so-called mediator pattern and had been searching for keywords such as `router` and `proxy` instead.
The more I try to force Rx semantics into CQRS, the more I "regret it" overall.
For example, `IObservable<Change>` vs. `LoadFromHistory(...)`. I do prefer the latter approach since it expresses the intent clearly.
Regarding the previous point, using `OnNext(...)` to load the aggregate state looks broken by design, since it enables unexpected behavior if a real stream on a different scheduler is used. This could possibly be avoided by locking the aggregate until `OnCompleted()` is called, but that seems like a clunky hack.
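The contrast between the two loading styles can be sketched as follows. This is only an illustration: `Deposited`, `ExplicitAccount`, and `ObserverAccount` are hypothetical names that do not come from this codebase, and only the BCL `IObserver<T>` interface is used.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event type, for illustration only.
public sealed class Deposited
{
    public Deposited(decimal amount) { Amount = amount; }
    public decimal Amount { get; }
}

// Explicit style: history is replayed synchronously inside one call,
// so the aggregate is fully loaded when LoadFromHistory returns.
public sealed class ExplicitAccount
{
    public decimal Balance { get; private set; }

    public void LoadFromHistory(IEnumerable<Deposited> history)
    {
        foreach (var e in history) Balance += e.Amount;
    }
}

// Observer style: state arrives through OnNext. The caller only knows
// loading has finished once OnCompleted has run, and nothing here stops
// a source from calling OnNext on another thread.
public sealed class ObserverAccount : IObserver<Deposited>
{
    public decimal Balance { get; private set; }
    public bool IsLoaded { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(Deposited e) { Balance += e.Amount; }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { IsLoaded = true; }
}
```

With the observer style, any code that reads `Balance` has to check `IsLoaded` first, which is the extra ceremony that `LoadFromHistory(...)` avoids.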
From my point of view, the only suitable candidate for Reactive Extensions so far seems to be the `IStorage`, which can publish the events once they are stored. On that point, I don't see any value in publishing the `Change` on an aggregate, even though it's there as a proof of concept.
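A storage that publishes events only after they are stored could look roughly like the sketch below. The generic `IStorage<TEvent>` shape, `InMemoryStorage<TEvent>`, and `CollectingObserver<T>` are assumptions made for illustration; the project's actual `IStorage` may differ, and this version is not thread safe.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape: a storage that is also an observable of stored events.
public interface IStorage<TEvent> : IObservable<TEvent>
{
    void Save(TEvent @event);
}

public sealed class InMemoryStorage<TEvent> : IStorage<TEvent>
{
    private readonly List<TEvent> _log = new List<TEvent>();
    private readonly List<IObserver<TEvent>> _observers = new List<IObserver<TEvent>>();

    public IReadOnlyList<TEvent> Log => _log;

    public void Save(TEvent @event)
    {
        _log.Add(@event);                        // persist first...
        foreach (var o in _observers.ToArray())  // ...then publish to subscribers
            o.OnNext(@event);
    }

    public IDisposable Subscribe(IObserver<TEvent> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose() { _dispose(); }
    }
}

// Hypothetical helper observer that simply records what it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public void OnNext(T value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Because publishing happens inside `Save` after the append, a subscriber never sees an event that is missing from the log.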
On the CQRSlite implementation of an AggregateRoot being thread safe
I don't see a use case where a domain object should be accessible/used by multiple threads. The operations should be as atomic as possible.
Concurrency issues should be handled when saving the aggregate to the storage (the log as the source of truth).
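One common way to handle concurrency at save time is an optimistic version check on the event log. The sketch below is an assumption about how that could look, not code from this project: `VersionedEventLog`, `Append`, and `ConcurrencyConflictException` are hypothetical names, and the version is simply the number of events already in the stream.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical exception raised when another writer got there first.
public sealed class ConcurrencyConflictException : Exception
{
    public ConcurrencyConflictException(Guid id, int expected, int actual)
        : base($"Stream {id}: expected version {expected}, but was {actual}.") { }
}

public sealed class VersionedEventLog
{
    private readonly Dictionary<Guid, List<object>> _streams = new Dictionary<Guid, List<object>>();
    private readonly object _gate = new object();

    // Appends events only if the stream is still at the version the caller
    // loaded the aggregate from. Returns the new version.
    public int Append(Guid aggregateId, int expectedVersion, IReadOnlyCollection<object> events)
    {
        lock (_gate)
        {
            if (!_streams.TryGetValue(aggregateId, out var stream))
            {
                stream = new List<object>();
                _streams[aggregateId] = stream;
            }

            if (stream.Count != expectedVersion)
                throw new ConcurrencyConflictException(aggregateId, expectedVersion, stream.Count);

            stream.AddRange(events);
            return stream.Count;
        }
    }
}
```

The aggregate itself needs no locking here; the only synchronized step is the append, and a losing writer can reload and retry.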
On Greg Young's example and the CQRSlite example `Apply(...)` method.
Both approaches use some nifty tricks in order to `Apply` events on the aggregate while keeping track of the changes without exposing the methods on the domain object.
I understand that both approaches force the code, to some extent, to adhere to the Event Sourcing concepts.
In a real scenario I think it's probably too much, though. People who use or want to use CQRS with ES should first really understand how it works (not that I do) before using it.
Hence my approach of using `IEventHandler<TEvent>` and explicitly implementing it on each domain object. While this approach relaxes the constraints, it helps to express the intent better in my opinion.
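A rough sketch of that idea, reading "explicitly implement" as C# explicit interface implementation (an assumption). The `Handle` method name, the `Cart` aggregate, and the `ItemAdded` / `ItemRemoved` events are hypothetical and only illustrate the shape; the project's real `IEventHandler<TEvent>` signature may differ.

```csharp
using System;
using System.Collections.Generic;

// Assumed signature for the handler interface.
public interface IEventHandler<in TEvent>
{
    void Handle(TEvent @event);
}

// Hypothetical events.
public sealed class ItemAdded
{
    public ItemAdded(string name) { Name = name; }
    public string Name { get; }
}

public sealed class ItemRemoved
{
    public ItemRemoved(string name) { Name = name; }
    public string Name { get; }
}

// The aggregate lists every event it reacts to in its type declaration.
// Explicit implementation keeps the handlers off the public surface:
// they are reachable only through the IEventHandler<T> interface.
public sealed class Cart : IEventHandler<ItemAdded>, IEventHandler<ItemRemoved>
{
    private readonly List<string> _items = new List<string>();

    public IReadOnlyList<string> Items => _items;

    void IEventHandler<ItemAdded>.Handle(ItemAdded e) { _items.Add(e.Name); }

    void IEventHandler<ItemRemoved>.Handle(ItemRemoved e) { _items.Remove(e.Name); }
}
```

Nothing forces a `Cart` to handle every event it emits, which is the relaxed constraint mentioned above; the trade-off is that the handled events are visible at a glance.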
As of now, if an event published by the storage to an aggregate is out of order, a `StreamEventOutOfOrderException` is thrown. It could be a nice approach to try to mitigate that by caching the out-of-order events, and the ones that follow, until the order can be restored.
If the order could not be restored by the time `OnCompleted()` is called, then an exception would be thrown.
The storage itself seems a better candidate to have this responsibility though.
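The caching idea could be sketched as a small reorder buffer. Everything below is hypothetical: `ReorderBuffer<T>` is not part of this codebase, it assumes each event carries a gap-free sequence number, it silently drops duplicates (a design choice, not a requirement), and `InvalidOperationException` stands in for `StreamEventOutOfOrderException`, whose constructor is not shown here.

```csharp
using System;
using System.Collections.Generic;

public sealed class ReorderBuffer<T>
{
    private readonly SortedDictionary<long, T> _pending = new SortedDictionary<long, T>();
    private readonly Action<T> _deliver;
    private long _next;

    public ReorderBuffer(long firstSequence, Action<T> deliver)
    {
        _next = firstSequence;
        _deliver = deliver ?? throw new ArgumentNullException(nameof(deliver));
    }

    public void OnNext(long sequence, T item)
    {
        // Already delivered or already buffered: ignore the duplicate.
        if (sequence < _next || _pending.ContainsKey(sequence)) return;

        _pending[sequence] = item;

        // Flush every event that is now contiguous with what was delivered.
        while (_pending.TryGetValue(_next, out var ready))
        {
            _pending.Remove(_next);
            _deliver(ready);
            _next++;
        }
    }

    public void OnCompleted()
    {
        // A gap that never closed means the order could not be restored.
        if (_pending.Count > 0)
            throw new InvalidOperationException(
                $"Stream completed while still waiting for sequence {_next}.");
    }
}
```

Events arriving as 1, 0, 2 are delivered as 0, 1, 2; if 0 never arrives, `OnCompleted()` throws. The same buffer could sit inside the storage rather than the aggregate.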