eventuous / eventuous
Event Sourcing library for .NET
Home Page: https://eventuous.dev
License: Apache License 2.0
What is your opinion on properly implementing business rules using Eventuous? I have been reading about CSLA's approach and like how the rules are packaged so that UI code can leverage them. I am willing to work on an open-source solution for Eventuous with your guidance. Incredible work on Eventuous.
Thanks in advance.
The AllStreamSubscriptionService accepts the filter, so it's possible to limit the number of events delivered. It works fine, but it must be explicitly configured.
As we don't really know what events the subscription wants to process, we can't pre-configure the filter automatically.
It should be possible to make a new version of subscriptions, which would only use TypedEventHandler. This way, the subscription knows what events it needs and can build the filter automatically.
It can also be done by checking the list of event handlers provided to the subscription, and if all of them are typed handlers, we can build the filter automatically.
ESDB subscription to $all could use a server-side filter. RMQ subscription could use the routing key, as the event type is already used as the routing key in the RMQ producer.
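A rough sketch of the "check the handlers and build the filter" idea, under stated assumptions: `ITypedEventHandler`, `HandledEventTypes`, `TypedHandler`, `UntypedHandler` and `FilterBuilder` are hypothetical names made up for illustration, not Eventuous types. The result is just a regular expression string; how it gets passed to a server-side filter or mapped to routing keys is left open.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical shapes for illustration only; not the Eventuous API.
public interface IEventHandler { }

public interface ITypedEventHandler : IEventHandler {
    // Event type names this handler has explicit registrations for.
    IReadOnlyCollection<string> HandledEventTypes { get; }
}

public sealed record TypedHandler(IReadOnlyCollection<string> HandledEventTypes) : ITypedEventHandler;

public sealed class UntypedHandler : IEventHandler { }

public static class FilterBuilder {
    // Returns a regex that matches exactly the handled event type names,
    // or null when a filter cannot be inferred (an untyped handler is present
    // or nothing is registered), in which case the caller keeps the explicit config.
    public static string? TryBuildEventTypeRegex(IEnumerable<IEventHandler> handlers) {
        var types = new SortedSet<string>(StringComparer.Ordinal);

        foreach (var handler in handlers) {
            if (handler is not ITypedEventHandler typed) return null;
            foreach (var eventType in typed.HandledEventTypes) types.Add(eventType);
        }

        if (types.Count == 0) return null;
        return "^(" + string.Join("|", types.Select(Regex.Escape)) + ")$";
    }
}
```

Returning null instead of an empty filter keeps the current behaviour (deliver everything) whenever the handler list is not fully typed.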
Add explicit registered handlers to Mongo projection, similar to TypedEventHandler
Unlike RMQ producers and subs, Google PubSub components don't create the necessary resources.
Partially because those ops are async and we do inits in the constructor, which is DI-friendly.
Not sure how to solve it though. Subscriptions are OK, producers aren't. Create on first use? Would work but doesn't sound right.
In the best of worlds, I'd like to obsolete the Proto.Persistence libs and instead have a proper adapter for a library like this.
This would both benefit the ecosystem, by having a more generic lib like this where everyone focuses the persistence efforts, and be in the spirit of ProtoActor to leverage already existing tools.
I was having a play around with this framework and struggled to get the Subscription service working.
I pulled down your repository so I could debug it, and it turned out I hadn't registered my Type.
I had opted for the DefaultEventSerializer, which silently handles unknown events.
I wonder if an option to throw an error on this would be useful?
It might be better for that to happen than to skip past an event because someone forgot to register it.
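To make the suggestion concrete, here is a minimal sketch of an opt-in "throw on unknown event type" switch. `StrictTypeMap` and `UnknownEventTypeException` are hypothetical names for illustration and do not describe how DefaultEventSerializer actually resolves types.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical exception type for illustration.
public sealed class UnknownEventTypeException : Exception {
    public UnknownEventTypeException(string eventType)
        : base($"Event type '{eventType}' is not registered") { }
}

// Hypothetical type map with an opt-in strict mode.
public sealed class StrictTypeMap {
    readonly Dictionary<string, Type> _types = new();
    readonly bool _throwOnUnknown;

    public StrictTypeMap(bool throwOnUnknown) => _throwOnUnknown = throwOnUnknown;

    public void Register<T>(string eventType) => _types[eventType] = typeof(T);

    // Lenient mode returns null so the caller can skip the event;
    // strict mode fails loudly so a missing registration is noticed immediately.
    public Type? Resolve(string eventType) {
        if (_types.TryGetValue(eventType, out var type)) return type;
        if (_throwOnUnknown) throw new UnknownEventTypeException(eventType);
        return null;
    }
}
```

Keeping the lenient mode as the default would preserve the current behaviour for subscriptions that intentionally ignore some events.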
Not sure it's even possible. Maybe there's some other API, but messages don't contain any information about their current position. Queue size is irrelevant here too. There are some metrics in GCP though, need to spend time investigating.
Proposal is to merge packages per infra.
Three EventStore packages will go to one.
PubSub and RMQ have two each, will be one.
Kafka starts with one (producers), then subs will be added. If not, it will be two packages.
Thoughts?
Vote with thumbs up and down. Will keep it open for a couple of weeks.
Based on #52 it is now possible to add a partitioned consumer.
It's similar to the concurrent consumer, which is a round robin.
The new CheckpointCommitHandler, which is currently not in use, can wrap the checkpoint store, as commit is just a function. It only commits if the sequence has no gaps.
The event sequence increase needs tests for catch-up subscriptions, as other subscription types don't really use the sequence number.
Sequence:
Sub -> Partitioned consumer -> (partitioner) -> (channel) -> DefaultConsumer -> context.Ack
DelayedAckContext.Ack = CheckpointCommitHandler.Commit (CheckpointStore.StoreCheckpoint)
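The "only commit if the sequence has no gaps" rule could look roughly like the sketch below. It is an illustration under assumptions, not the actual handler: `GaplessCommitTracker` is a hypothetical name, sequence numbers are assumed to increase by one per event, and the commit function is assumed to be synchronous.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: buffers acknowledged events that arrive out of order
// and commits a position only when every earlier sequence number has been acknowledged.
public sealed class GaplessCommitTracker {
    readonly SortedDictionary<ulong, ulong> _pending = new(); // sequence -> stream position
    readonly Action<ulong> _commit;
    readonly object _lock = new();
    ulong _nextSequence;

    public GaplessCommitTracker(ulong firstSequence, Action<ulong> commit) {
        _nextSequence = firstSequence;
        _commit = commit;
    }

    public void Ack(ulong sequence, ulong position) {
        lock (_lock) {
            _pending[sequence] = position;

            // Advance over the contiguous run starting at the next expected sequence.
            ulong? toCommit = null;
            while (_pending.TryGetValue(_nextSequence, out var contiguousPosition)) {
                _pending.Remove(_nextSequence);
                _nextSequence++;
                toCommit = contiguousPosition;
            }

            // Commit once per Ack, using the last position of the contiguous run.
            if (toCommit.HasValue) _commit(toCommit.Value);
        }
    }
}
```

With partitioned consumers, acknowledgements from different partitions can arrive in any order; the tracker holds back the checkpoint until the slowest partition catches up.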
I think it would make sense to call EnsureDoesntExist() and EnsureExists() not from the Aggregate but from the ApplicationService, so the Aggregate itself is not splattered with those calls in (almost) every API point of the Aggregate. For that, the methods OnNew() and OnExisting() in the ApplicationService could be changed like so:
protected void OnNew<TCommand>(Action<T, TCommand> action) where TCommand : class
    => _handlers.Add(
        typeof(TCommand),
        new RegisteredHandler<T>(ExpectedState.New, (aggregate, cmd) => {
            // The service enforces the precondition before the domain action runs
            aggregate.EnsureDoesntExist();
            action(aggregate, (TCommand)cmd);
        })
    );

protected void OnExisting<TCommand>(Func<TCommand, TId> getId, Action<T, TCommand> action) where TCommand : class {
    _handlers.Add(
        typeof(TCommand),
        new RegisteredHandler<T>(ExpectedState.Existing, (aggregate, cmd) => {
            // The service enforces the precondition before the domain action runs
            aggregate.EnsureExists();
            action(aggregate, (TCommand)cmd);
        })
    );

    _getId.TryAdd(typeof(TCommand), cmd => getId((TCommand) cmd));
}
So for example in the Booking Aggregate, we can write:
public void BookRoom(BookingId id, string roomId, StayPeriod period, decimal price) {
    Apply(new RoomBooked(id, roomId, period.CheckIn, period.CheckOut, price));
}

public void RecordPayment(string paymentId, decimal amount) {
    if (State.HasPaymentRecord(paymentId)) return;
    ...
}
instead of
public void BookRoom(BookingId id, string roomId, StayPeriod period, decimal price) {
    EnsureDoesntExist();
    Apply(new RoomBooked(id, roomId, period.CheckIn, period.CheckOut, price));
}

public void RecordPayment(string paymentId, decimal amount) {
    EnsureExists();
    if (State.HasPaymentRecord(paymentId)) return;
    ...
}
The transform method I have requires access to injected service instances, for instance "IAggregateStore", to do the enrichment.
There's no gap for RMQ, but lead-time is relevant. Also, a subscription queue size basically tells how many events aren't processed. RMQ management API could help. The limitation is that the management API plugin must be added to the RMQ deployment.
Similar to RMQ
Thinking about what to add to the default meta.
created (system meta of ESDB gets lost when migrating to another store)
When subscribing to $all in ESDB, the subscription will receive deleted events from deleted or truncated streams. That's a known issue and it won't be fixed until log v3 is fully released.
Event Store Replicator has the scavenge filter, which maintains the stream metadata cache. It would be a good addition here as well, should be opt-in (disabled by default) as it creates some burden for the application (memory for the cache, additional subscription to receive metadata updates, metadata reads, etc).
Hi Alexey,
Nice library.
Is there any mechanism in this framework to create an aggregate from a pre-existing state object and version (i.e. snapshots)?
Even if not, would you be open to any pull requests to allow the possibility as a basic implementation?
EventStoreDB allows adding metadata to events in any format, as it's just a byte array. Also, an event has a content type, which spans both payload and metadata.
However, subscriptions and producers can support event streaming using brokers. Most of the brokers don't support metadata as such and have message headers instead. Headers are key-value maps, not complex objects. RabbitMQ represents them as text, similar to HTTP headers. Google PubSub uses attributes, which are also key-value pairs, represented as a string-string dictionary.
My thought is to limit metadata to such a dictionary format. It would help to handle serialisation concerns and allow adding default headers like trace and span id, correlation id and so on.
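As an illustration of what such a dictionary-shaped metadata could look like, here is a small sketch. The `Metadata` class, its key names and the helper members are assumptions made for the example, not an existing Eventuous contract.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical metadata type: a flat string-to-string map, so it can be copied
// as-is into broker headers (RabbitMQ) or attributes (Google PubSub).
public sealed class Metadata : Dictionary<string, string> {
    // Illustrative key names; they are not defined by Eventuous.
    public const string CorrelationIdKey = "correlation-id";
    public const string TraceIdKey = "trace-id";

    public Metadata WithCorrelationId(string id) {
        this[CorrelationIdKey] = id;
        return this;
    }

    public Metadata WithTraceId(string id) {
        this[TraceIdKey] = id;
        return this;
    }

    // Returns null when the header is absent instead of throwing.
    public string? CorrelationId
        => TryGetValue(CorrelationIdKey, out var value) ? value : null;
}
```

Because every value is a string, the same object can be serialised for an event store and mapped one-to-one onto message headers without a separate conversion step.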
Add Kafka producer, which should be easy
I think the ctor of the MongoCheckpointStore should have something like this in it:
_counters.TryAdd(checkPoint, 0);
Otherwise the dictionary has not been initialised (key not found)
I didn't test this from MongoCheckpointStore, but I built my own one, which was very similar.
The Result is nice as it returns the list of changes, which are events. These events can be delivered to the API consumer, like the UI, and applied there, eliminating the need to replicate the backend logic in the UI for the domain logic when it comes to state updates.
However, right now it is impossible to understand what actually happened as events returned by the Result object do not indicate the event type, so one would need to figure it out by looking at the object structure, and that's not easy.
We need to enrich the list of returned changes by adding the event type string either to the contract itself (which would require generating a new contract or manipulating the JSON object), or in a tuple.
It would be nice to be able to save and access metadata easily in Eventuous.
Currently, a 'null' is saved for metadata when appending events to the event store.
It would be nice if there was a method like 'AddMetadata' that you could override when inheriting from AggregateStore to set the metadata.
Currently, there is no access to metadata in event handlers.
It would be nice to be able to use the built-in $correlationId of EventStore.
Currently, I had to reimplement some of the already implemented classes, like AggregateStore, to make this possible.
Thanks.
I just wanted to say I'm excited to see where this project goes and it's refreshing to see a new approach on event sourcing. I have an upcoming project with a valid use-case / requirement for an Event Sourcing database and I'm looking for frameworks to help bootstrap that initiative.
There are very few frameworks for .NET that support event sourcing, and the few that do, all have the problem(s) you've outlined with their event bus approach to handling the write side and the query/read handlers. I think you're the only project I've come across that is building with Event Source database first (instead of a relational or document database overlay) that lets you approach the problem differently.
I'm going to give you guys a try. Hopefully I'll be able to contribute back, and at the very least, if I'm a bit premature in adoption, I'm excited to follow your progress.
TypedEventHandler
AggregateState with explicit handlers
MongoProjection when it supports registering handlers
These subscriptions support native retries:
Need to find a way to figure out separation between native retries and client-side retries.
When applying an event for the loaded aggregate, the state seems to get dropped.
This is likely not an issue with the Eventuous framework, but I can't figure out why an aggregate's state doesn't get saved after executing an async method on the aggregate. I made sure that async/await is used where needed.
Here is a link to a sample project (it has only the bare minimum to reproduce this issue):
https://github.com/claudiuchis/Eventuous.Sample
There are 2 events:
The 2nd event (WidgetReacted) is not appended (i.e. saved) to the stream, so it looks like the reaction doesn't get a chance to complete, and thus the event doesn't get saved. No clue why, as the service does await the command issued by the reactor.
Any help is much appreciated.
We pass only the Ack timeout to the PubSub sub when creating it, but it has tons of other options. In particular, we want to enable message ordering.
The subscription request object has all these things and I don't want to bring all those options to our options as it must stay detached from transport details. So, we need to have a functional way to allow extended subs configuration before creating it.
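One possible shape for that functional hook is a delegate on the options that receives the request object just before the subscription is created. In the sketch below, `SubscriptionRequest` is a stand-in for the transport's own request type, and `PubSubSubscriptionOptions`, `ConfigureSubscription` and `SubscriptionRequestFactory` are hypothetical names used for illustration.

```csharp
#nullable enable
using System;

// Stand-in for the transport's subscription request object (hypothetical).
public sealed class SubscriptionRequest {
    public string Name { get; set; } = "";
    public int AckDeadlineSeconds { get; set; }
    public bool EnableMessageOrdering { get; set; }
}

// Hypothetical options: only transport-agnostic settings plus one escape hatch.
public sealed class PubSubSubscriptionOptions {
    public int AckDeadlineSeconds { get; set; } = 60;

    // Lets the application tweak the request without the options
    // having to mirror every transport-specific property.
    public Action<SubscriptionRequest>? ConfigureSubscription { get; set; }
}

public static class SubscriptionRequestFactory {
    public static SubscriptionRequest Create(string name, PubSubSubscriptionOptions options) {
        var request = new SubscriptionRequest {
            Name = name,
            AckDeadlineSeconds = options.AckDeadlineSeconds
        };

        // Applied last, so the caller's settings win over the defaults.
        options.ConfigureSubscription?.Invoke(request);
        return request;
    }
}
```

Enabling ordering would then be a one-liner in application code: `ConfigureSubscription = r => r.EnableMessageOrdering = true`.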
As far as I understand, Eventuous writes/reads events to/from the EventStoreDB using the binary format.
What if a stream also contains JSON events?
Transport drops, what else?
There's a package to add retries using Polly to subscriptions (for retrying event handlers). However, we need more places with retries, and there's no need for complex policies that Polly supports.
The idea is to add some basic retry mechanism to the core project, so it can be used in, for example, AggregateStore that will retry reads and writes for transient store errors.
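A basic mechanism of that kind could be as small as the sketch below: a fixed number of attempts, a fixed delay, and a caller-supplied predicate that decides what counts as transient. `SimpleRetry` and its parameters are hypothetical; nothing here reflects an existing Eventuous API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: retries an operation a limited number of times
// when the failure is classified as transient by the caller.
public static class SimpleRetry {
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        Func<Exception, bool> isTransient,
        int maxAttempts,
        TimeSpan delay,
        CancellationToken cancellationToken = default
    ) {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (var attempt = 1; ; attempt++) {
            try {
                return await operation(cancellationToken);
            }
            // The filter lets non-transient errors and the final attempt's error propagate unchanged.
            catch (Exception e) when (attempt < maxAttempts && isTransient(e)) {
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

An aggregate store could wrap its read and write calls in `ExecuteAsync`, with `isTransient` matching only the store's connectivity or timeout exceptions.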
Why is StreamName.For<T>() prepending the type name? Why is it depending on T at all?
I have at least two problems with this:
Right now, the event store is agnostic to what events are being persisted. It is quite a limitation for doing things like #64, domain-driven snapshots, closing periods, etc, as some "fold and archive" or "fold and snapshot" actions need to be controlled by the domain model.
For example, a reservation might get archived after the guest checks out, as the lifecycle of the reservation is done, and it becomes fully immutable (or frozen). Similarly, closing a period would collect the period data in a "new period started" event, and then signal the persistence to truncate all the previous events.
So, the idea is to provide a way for the domain model to tell those things to the aggregate store and further to the event store.
I have two ideas atm:
Apply can get an overload or optional parameter to signal the archiving or truncation
Both can be implemented at the same time, but the implementation would be in different layers.
Apply: the change would be to extend the Changes to contain extra information
StreamEvent record would need to get new properties.
To keep the database size of an event store bounded, it would be useful to bake in the archiving function.
The archive storage could be anything, so it needs an abstraction. The functionality would be limited to a stream archive function in a separate class that needs a dependency on IEventStore and the archive interface.
Proposed functions:
object and push everything as a single object to the archive
Consider:
Hi Alex,
I'm trying to depend on Eventuous inside my netstandard2.1 project. It requires dotnet5.0. I think this project only consists of the core classes which should be compatible inside a netstandard2.0 project.
Could you configure it to be explicit about it being a netstandard2.0 project? Perhaps other projects are compatible with netstandard2.0 as well.
Kind regards,
Thijs
This is related to #4
Eventuous silently stops working when an event type is not registered, which makes it very hard to figure out why things aren't working.
Today I discovered we have obsolete events in our database, and we have managed to do the incredibly stupid thing, which is to delete or rename those events in our code base. This took us a few hours of debugging and added logging, and even then the discovery was by chance: we happened to log the result of ApplicationService.Handle(), which we then could see contained an error message.
It would have been so much easier and faster if an exception was thrown instead.
Faster feedback for new developers would guide us in the right direction faster. Swallowed exceptions carry too high a risk of leaving the system in an invalid state.
I think there is an issue where exceptions thrown in subscription handlers are NOT causing the checkpoint to be replayed from the last position.
I think this is related to the SubscriptionService.cs: 108
Task Store() => StoreCheckpoint(GetPosition(re), cancellationToken);
I believe the GetPosition(re) should be _lastProcessed from line 90.
Distributed tracing is essential for any system with asynchronous message flow. Even if the system isn't distributed physically, producing an event to be processed later already makes the system distributed in time.
Adding traces to Eventuous was planned from the start, so the work will be tracked here.
Here are some initial assumptions:
ActivitySource would allow us to be vendor-agnostic and play nicely with OpenTelemetry.NET
Resources:
We handle subscription drops, but it would be nice to have a story around failures in event handlers.
When handling an event in a subscription fails, the following possible scenarios apply:
| Reason | Remedy |
|---|---|
| Transient failure | Retry individual handlers |
| Permanent failure (poison) | Log and continue; or full stop, make unhealthy |
| Checkpoint storage failure | Endless retry, make unhealthy |
We could support all the scenarios. The only question is how to configure it, and what a good default is.
Right now, in the app service, we explain how to get the aggregate id from a command:
OnNewAsync<V1.CreateTask>(
    cmd => new ProjectTaskId(cmd.TaskId),
    async (task, cmd, token) => { ... }
)
It seems a bit too verbose. We know the drill - the identity must be a non-empty string value. So, in theory, it can be simplified to use an attribute:
public record CreateTask([AggregateId] string TaskId, string Description);
The app service overloads without the first function would be able to check if the property is of type string on startup, ensure that it's not empty when it gets the command, and construct the identity instance of a given type as it has a default constructor. So, the call would look like:
OnNewAsync<V1.CreateTask>(
    async (task, cmd, token) => { ... }
)
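The startup check and the id extraction could be done with reflection, roughly as sketched below. This is an assumption-laden illustration: `AggregateIdAttribute` and `AggregateIdReader` are hypothetical, and the sketch returns the raw string rather than constructing the identity type. One detail worth noting: on a positional record, an attribute written on a constructor parameter without a `property:` target is attached to the parameter, not the generated property, so the lookup has to check both.

```csharp
#nullable enable
using System;
using System.Linq;
using System.Reflection;

// Hypothetical marker attribute; the name follows the proposal above.
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Parameter)]
public sealed class AggregateIdAttribute : Attribute { }

public static class AggregateIdReader {
    // Builds an id extractor once (e.g. at startup) and validates the command shape.
    public static Func<object, string> For(Type commandType) {
        var property = FindIdProperty(commandType)
            ?? throw new InvalidOperationException($"{commandType.Name} has no [AggregateId] member");

        if (property.PropertyType != typeof(string))
            throw new InvalidOperationException($"{commandType.Name}.{property.Name} must be a string");

        return command => {
            var value = (string?)property.GetValue(command);
            // Rejects null, empty and whitespace-only ids when the command arrives.
            if (string.IsNullOrWhiteSpace(value))
                throw new ArgumentException($"{property.Name} must not be empty");
            return value;
        };
    }

    static PropertyInfo? FindIdProperty(Type type) {
        // Case 1: the attribute sits directly on a property.
        var direct = type.GetProperties()
            .FirstOrDefault(p => p.IsDefined(typeof(AggregateIdAttribute), false));
        if (direct != null) return direct;

        // Case 2: positional record, where the attribute lands on the constructor parameter.
        foreach (var ctor in type.GetConstructors())
        foreach (var parameter in ctor.GetParameters())
            if (parameter.Name != null && parameter.IsDefined(typeof(AggregateIdAttribute), false))
                return type.GetProperty(parameter.Name);

        return null;
    }
}

// Command shape taken from the proposal above.
public record CreateTask([AggregateId] string TaskId, string Description);
```

Resolving the property once per command type keeps the per-command cost to a single reflective property read.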
Passing a transform method lacks dependency injection; a shovel class may have dependencies that need to be injected to enrich the transformed events.
This has tripped me up several times, and it smells like duplication:
When method in aggregate state
Also, if I don't set the Id explicitly on my state object, at least on the very first event, then my aggregate has no id.
I can see why the service needs me to compute the id from the incoming command, but the id is known at the time When runs, which means it's known to the aggregate. Is it really necessary to keep the id in the aggregate state?
Hi @alexeyzimarev,
I really appreciate this project! I have been thinking of giving event sourcing a try for a long time and Eventuous seems like the perfect occasion.
With that said, I was wondering about validation and how it works with event sourcing.
For example, in your demo project (https://github.com/Eventuous/dotnet-sample), there is a delegate function called IsRoomAvailable that you simplified to
=> new ValueTask<bool>(true)
How would you implement the real thing and is it possible to make it transactional, ie, ensuring there is never a room booked twice at the same time?
Thank you!
Eventuous is DI-friendly by design. That's why it has quite a long list of constructor parameters for some complex elements, like subscriptions.
We already have many parameters encapsulated into options to mitigate this.
One annoying bit is the requirement to use the subscription id in event handlers, so the subscription could find its own handlers. When a subscription is instantiated manually, this requirement becomes redundant as the list of passed handlers is controlled by the constructor invocation. The subscription id match is done solely to work nicely with DI.
Learning from HttpClientBuilder and using named options, it is possible to avoid this hassle.
So, the idea is to remove SubscriptionId from IEventHandler. It is still required for the subscription itself as it's a useful identifier for other purposes.
Provide a better registration path, which allows chaining handlers. The current AddEventHandler doesn't chain anything.
SubscriptionContext
Thoughts, feedback - please reply here.
Example:
var psm = new PubsubMessage {
    Data = ByteString.CopyFrom(payload),
    OrderingKey = options?.OrderingKey ?? "",
    Attributes = {
        { "contentType", _serializer.ContentType },
        { "eventType", eventType }
    }
};
These keys are hard-coded, but there are scenarios where we can enable compatibility with other existing libs if we make those keys configurable.
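Making the keys configurable could be as simple as a small options record with the current names as defaults. The sketch below builds a plain dictionary rather than a real PubsubMessage; `PubSubAttributeNames` and `PubSubAttributes` are hypothetical names for illustration.

```csharp
using System.Collections.Generic;

// Hypothetical options record; the defaults match the keys hard-coded in the example above.
public sealed record PubSubAttributeNames(
    string ContentType = "contentType",
    string EventType = "eventType"
);

public static class PubSubAttributes {
    // Produces the attribute map using the configured key names.
    public static Dictionary<string, string> Build(
        PubSubAttributeNames names,
        string contentType,
        string eventType
    ) => new() {
        [names.ContentType] = contentType,
        [names.EventType] = eventType
    };
}
```

A consumer built with another library could then be matched by overriding a single key, for example `new PubSubAttributeNames(EventType: "type")`, while everything else keeps the defaults.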
Alex,
Congrats on this new repo, a nice continuation of your book. Thanks a lot.
Not sure if it's appropriate to ask a question here. If not, simply disregard my question.
I think that in https://eventuous.dev/docs/prologue/the-right-way/, you make a very good point about the orthogonality of message brokers and event sourcing.
My question is about projections and the most elegant way to "share" a projection with other bounded contexts.
So, let's stick to the canonical case of a customer service which manages (single source of truth, write access) a customer status.
The customer service is event-sourced and also needs a customer-status projection for its own purposes.
Other services also need the customer-status projection, even when the customer service is offline.
How would you tackle this?
Cheers
paul.
The subscription to a category stream seems to receive deleted events, also with ResolveLinkTos set to true. It seems like the Event property of the ResolvedEvent instance is null in this case, although the property is not nullable.
To me, it looks like a client bug. We need a test to see where it breaks.
If indeed the Event property is null, we can check it and not proceed. Also, I need to report it to Event Store, to check if it's the expected behaviour.
If it's something else, then, well, do something else...
Hi,
I recently stumbled upon Eventuous and tried it out a bit. So far I really like it! :)
While hitting my WebAPI backend with some ApacheBench requests to see how it performs, I noticed that the AggregateStore started to bubble up some gRPC exceptions. It recovers after some time, but the exceptions keep coming at a stable-ish frequency (like every 20 requests or something).
I created a console-based reproducer to show the problem. For this I forked your repository and created a new branch reproducer/grpc-errors-when-reading-aggregates-multiple-times.
The reproducer is using the Booking domain from Eventuous.Sut.Booking to make sure that I don't have any bug in my Aggregate implementation.
Related to #77
@thomaseyde describes that the application service fails when handling a command when an event stream contains an unknown event. Currently, the app service returns an ErrorResult where the exception is "unknown event type".
The reason for the app service to return an ErrorResult in the first place was to convey domain-level errors to the caller. However, things like "event store is down" or "unknown event type" are different types of errors as they indicate technical failures rather than model issues.
I am now inclined to make the application service throw on technical errors during Load and Store, but keep returning the ErrorResult when an exception is thrown when handling the command itself.
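The split described here can be sketched as a pipeline where only the command-handling step is wrapped in a try/catch. The types below (`Result`, `OkResult`, `ErrorResult`, `CommandPipeline`) are simplified stand-ins written for this illustration and do not mirror the actual application service.

```csharp
using System;
using System.Threading.Tasks;

// Simplified stand-ins for the result types (hypothetical shapes).
public abstract record Result;
public sealed record OkResult : Result;
public sealed record ErrorResult(Exception Exception) : Result;

public static class CommandPipeline {
    public static async Task<Result> Handle<TState>(
        Func<Task<TState>> load,
        Func<TState, TState> handle,
        Func<TState, Task> store
    ) {
        // Technical failure while loading: not caught, so it propagates to the caller.
        var state = await load();

        TState newState;
        try {
            // Domain-level failure: reported as a result, not as an exception.
            newState = handle(state);
        }
        catch (Exception e) {
            return new ErrorResult(e);
        }

        // Technical failure while storing: not caught either.
        await store(newState);
        return new OkResult();
    }
}
```

With this shape, an API layer can map ErrorResult to a client error response, while exceptions from Load and Store surface as server errors through the usual exception handling.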
Similar to RMQ