
eventuous's Introduction

Eventuous

Event Sourcing library for .NET, which is:

  • Somewhat volatile as breaking changes keep coming in
  • Somewhat opinionated
  • EventStoreDB-oriented
  • .NET-friendly (decent DI, logging, and diagnostics support)

News

Read the Eventuous blog on Medium: https://medium.com/eventuous. Some of the documentation articles are also published there as blog posts, since these articles have value for the community.

Community

Join our Slack channel.

Eventuous also has its own channel on DDD-CQRS-ES Discord server.

Video

You can watch a somewhat chaotic introduction to Eventuous on YouTube:

Link to YouTube

Subscribe to Eventuous YouTube channel.

Documentation

Documentation is available on eventuous.dev.

It might not be up to date, as we are currently solidifying the codebase. This work leads to many breaking changes, and the documentation will get updated when the codebase stabilises.

Packages

Stable versions and release candidates are available on NuGet.

In addition, the latest dev versions are available on MyGet under the public feed https://www.myget.org/F/eventuous/api/v3/index.json.

Support

If you like Eventuous and want to show your appreciation to its contributors, please ❤️ sponsor us.

For development and production support, get in touch with Ubiquitous.

Other things

Licence: Apache 2.0. Copyright © Ubiquitous AS. All rights reserved.

eventuous's People

Contributors

alejandro-sb, alexeyzimarev, claudiuchis, dependabot[bot], fbjerggaard, iappwebdev, k3daevin, lejdholt, locktar, makp0, matt-lethargic, mvastarelli, oakie, paulopez78, pehrgit, reinderreinders, riezebosch, steve-oh

eventuous's Issues

Identity attribute for commands

Right now, in the app service, we explain how to get the aggregate id from a command:

OnNewAsync<V1.CreateTask>(
    cmd => new ProjectTaskId(cmd.TaskId),
    async (task, cmd, token) => { ... }
)

It seems a bit too verbose. We know the drill - the identity must be a non-empty string value. So, in theory, it can be simplified to use an attribute:

public record CreateTask([AggregateId] string TaskId, string Description);

The app service overloads without the first function would be able to check if the property is of type string on startup, ensure that it's not empty when it gets the command, and construct the identity instance of a given type as it has a default constructor. So, the call would look like:

OnNewAsync<V1.CreateTask>(
    async (task, cmd, token) => { ... }
)
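
A minimal sketch of how the attribute-based lookup could work, assuming reflection at startup. `AggregateIdAttribute` and `AggregateIdResolver` are hypothetical names used for illustration and are not part of Eventuous. Note that an attribute placed on a positional record parameter targets the constructor parameter by default, so the sketch inspects constructor parameters and then reads the property with the same name.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical marker attribute, not an existing Eventuous type.
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class AggregateIdAttribute : Attribute { }

public record CreateTask([AggregateId] string TaskId, string Description);

public static class AggregateIdResolver {
    // Builds the id accessor once (for example on startup) and fails early
    // if the command has no string parameter marked with [AggregateId].
    public static Func<TCommand, string> For<TCommand>() where TCommand : class {
        var parameter = typeof(TCommand)
            .GetConstructors()
            .SelectMany(c => c.GetParameters())
            .FirstOrDefault(p => p.GetCustomAttribute<AggregateIdAttribute>() != null);

        if (parameter == null)
            throw new InvalidOperationException(
                $"{typeof(TCommand).Name} has no [AggregateId] parameter");

        if (parameter.ParameterType != typeof(string))
            throw new InvalidOperationException("[AggregateId] must mark a string");

        var property = typeof(TCommand).GetProperty(parameter.Name);

        if (property == null)
            throw new InvalidOperationException(
                $"No property named {parameter.Name} on {typeof(TCommand).Name}");

        // The returned function validates the value each time a command arrives.
        return cmd => {
            var value = (string)property.GetValue(cmd);

            if (string.IsNullOrWhiteSpace(value))
                throw new ArgumentException("Aggregate id must not be empty");

            return value;
        };
    }
}
```

An overload of OnNewAsync without the id function could call `AggregateIdResolver.For<TCommand>()` during registration and wrap the resulting string in the identity type.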

Improve subscription registration

Eventuous is DI-friendly by design. That's why it has quite a long list of constructor parameters for some complex elements, like subscriptions.

We already have many parameters encapsulated into options to mitigate this.

One annoying bit is the requirement to use the subscription id in event handlers, so the subscription can find its own handlers. When a subscription is instantiated manually, this requirement becomes redundant as the list of passed handlers is controlled by the constructor invocation. The subscription id match is done solely to work nicely with DI.

Learning from HttpClientBuilder and using named options, it is possible to avoid this hassle.

So, the idea is to remove SubscriptionId from IEventHandler. It is still required for the subscription itself as it's a useful identifier for other purposes.

Provide a better registration path, which allows chaining handlers. The current AddEventHandler doesn't chain anything.

_counters not set MongoCheckpointStore

I think the ctor of the MongoCheckpointStore should have something like this in it:

_counters.TryAdd(checkPoint, 0);

Otherwise the dictionary has not been initialised (key not found)

I didn't test this from MongoCheckpointStore, but I built my own one, which was very similar.

Aggregate state not being saved after executing an async method

This is likely not an issue with the Eventuous framework, but I can't figure out why an aggregate's state doesn't get saved after executing an async method on the aggregate. I made sure that async/await is used where needed.
Here is a link to a sample project (it has only the bare minimum to reproduce this issue):

https://github.com/claudiuchis/Eventuous.Sample

There are 2 events:

  • WidgetCreated - created by calling the post endpoint on the WidgetController
  • WidgetReacted - created as a reaction to the WidgetCreated event after calling an "external" service.

The 2nd event (WidgetReacted) is not appended (i.e. saved) to the stream, so it looks like the reaction doesn't get a chance to complete, and thus the event doesn't get saved. No clue why, as the service does await the command issued by the reactor.

Any help is much appreciated.

gRPC exceptions when reusing the AggregateStore

Hi,
I recently stumbled upon Eventuous and tried it out a bit. So far I really like it! :)

While hitting my WebAPI backend with some ApacheBench requests to see how it performs, I noticed that the AggregateStore started to bubble up some gRPC exceptions. It recovers after some time, but the exceptions keep coming at a stable-ish frequency (like every 20 requests or so).

I created a console-based reproducer to show the problem. For this I forked your repository and created a new branch, reproducer/grpc-errors-when-reading-aggregates-multiple-times.

The reproducer uses the Booking domain from Eventuous.Sut.Booking to make sure that I don't have any bug in my aggregate implementation.

Google PubSub: auto-create topics and subs

Unlike RMQ producers and subs, Google PubSub components don't create the necessary resources.

Partially because those ops are async and we do inits in the constructor, which is DI-friendly.

Not sure how to solve it though. Subscriptions are OK, producers aren't. Create on first use? Would work but doesn't sound right.

`StreamName.For<T>()` is prepending type name

Why is StreamName.For<T>() prepending the type name? Why is it depending on T at all?

I have at least two problems with this:

  1. My event store already has established stream ids. Eventuous is unable to read these without me implementing an adapter and hacking the incoming stream name.
  2. Renaming an aggregate will disconnect it from existing streams.

Binary vs JSON events

As far as I understand, Eventuous writes/reads events to/from the EventStoreDB using the binary format.
What if a stream also contains JSON events?

How would you "share" projections between bounded contexts?

Alex,
Congrats on this new repo, a nice continuation of your book. Thanks a lot.
Not sure if it's appropriate to ask a question here. If not, simply disregard my question.
I think that in https://eventuous.dev/docs/prologue/the-right-way/, you make a very good point about the orthogonality of message brokers and event sourcing.
My question is about projections and the most elegant way to "share" a projection with other bounded contexts.
So, let's stick to the canonical case of a customer service which manages (single source of truth, write access) a customer status.
The customer service is event-sourced and also needs a customer-status projection for its own purposes.
Other services also need the customer-status projection, even when the customer service is offline.
How would you tackle this?
Cheers
paul.

Handling deleted events in subscription to $all

When subscribing to $all in ESDB, the subscription will receive deleted events from deleted or truncated streams. That's a known issue and it won't be fixed until log v3 is fully released.

Event Store Replicator has the scavenge filter, which maintains the stream metadata cache. It would be a good addition here as well, should be opt-in (disabled by default) as it creates some burden for the application (memory for the cache, additional subscription to receive metadata updates, metadata reads, etc).

Make it possible to configure PubSub subs better

We pass only the Ack timeout to the PubSub sub when creating it, but it has tons of other options. In particular, we want to enable message ordering.

The subscription request object has all these things and I don't want to bring all those options to our options as it must stay detached from transport details. So, we need to have a functional way to allow extended subs configuration before creating it.

Prevent aggregate-wide calls to EnsureDoesntExist() and EnsureExists()

I think it would make sense to call EnsureDoesntExist() and EnsureExists() not from the Aggregate but from the ApplicationService, so the Aggregate itself is not splattered with those calls in (almost) every API point of the Aggregate. For that the methods OnNew() and OnExisting() in the ApplicationService could be changed like so:

protected void OnNew<TCommand>(Action<T, TCommand> action) where TCommand : class
    => _handlers.Add(
        typeof(TCommand), 
        new RegisteredHandler<T>(ExpectedState.New, (aggregate, cmd) =>
        {
            aggregate.EnsureDoesntExist();
            action(aggregate, (TCommand)cmd);
        })
    );

protected void OnExisting<TCommand>(Func<TCommand, TId> getId, Action<T, TCommand> action) where TCommand : class {
    _handlers.Add(
        typeof(TCommand), 
        new RegisteredHandler<T>(ExpectedState.Existing, (aggregate, cmd) =>
        {
            aggregate.EnsureExists();
            action(aggregate, (TCommand)cmd);
        })
    );

    _getId.TryAdd(typeof(TCommand), cmd => getId((TCommand) cmd));
}

So for example in the Booking Aggregate, we can write:

public void BookRoom(BookingId id, string roomId, StayPeriod period, decimal price) {
    Apply(new RoomBooked(id, roomId, period.CheckIn, period.CheckOut, price));
}

public void RecordPayment(string paymentId, decimal amount) {
    if (State.HasPaymentRecord(paymentId)) return;
    ...
}

instead of

public void BookRoom(BookingId id, string roomId, StayPeriod period, decimal price) {
    EnsureDoesntExist();

    Apply(new RoomBooked(id, roomId, period.CheckIn, period.CheckOut, price));
}

public void RecordPayment(string paymentId, decimal amount) {
    EnsureExists();
            
    if (State.HasPaymentRecord(paymentId)) return;
    ...
}

Exciting and Refreshing

I just wanted to say I'm excited to see where this project goes and it's refreshing to see a new approach on event sourcing. I have an upcoming project with a valid use-case / requirement for an Event Sourcing database and I'm looking for frameworks to help bootstrap that initiative.

There are very few frameworks for .NET that support event sourcing, and the few that do, all have the problem(s) you've outlined with their event bus approach to handling the write side and the query/read handlers. I think you're the only project I've come across that is building with Event Source database first (instead of a relational or document database overlay) that lets you approach the problem differently.

I'm going to give you guys a try. Hopefully I'll be able to contribute back, and at the very least, if I'm a bit premature in adoption, I'm excited to follow your progress.

Gap measure for Google PubSub

Not sure it's even possible. Maybe there's some other API, but messages don't contain any information about their current position. Queue size is irrelevant here too. There are some metrics in GCP though, need to spend time investigating.

Subscriptions v2

Current status

  • Subscriptions currently implement hosted services as they run in the background
  • Subscriptions are responsible for retrieving the initial checkpoint
  • Subscriptions also store checkpoints after processing an event, one by one

Issues

  • Not easy to use subscriptions outside of ASP.NET Core scope, forced Hosting and DI dependency
  • Overuse of a dummy checkpoint store for subscriptions that don't have the checkpoint maintenance (RMQ, PubSub)
  • Batched checkpointing has to be implemented by the checkpoint store
  • Impossible to partition event handlers as the sequential processing will break

Plan

  • Separate subscriptions from hosting
  • Make subscriptions independent of ASP.NET Core
  • Remove the checkpoint concern from the core subscription entirely
  • Introduce SubscriptionContext
  • Allow explicit ACK and NACK using the context, per event handler
  • Handle ACK and NACK only when all handlers respond (should there be a timeout too?)
  • Handle ACK and NACK in a way that is native to the transport
  • For subscriptions with a client-side checkpoint, only commit the checkpoint for the earliest ACKed event position

Thoughts, feedback - please reply here.

Fail fast on missing event type

This is related to #4

Eventuous silently stops working when an event type is not registered, which makes it very hard to figure out why things aren't working.

Today I discovered we have obsolete events in our database, and we have managed to do the incredibly stupid thing of deleting or renaming those events in our code base. This took us a few hours of debugging and added logging, and even then the discovery was by chance: we happened to log the result of ApplicationService.Handle(), which we could then see contained an error message.

It would have been so much easier and faster if an exception was thrown instead.

Faster feedback for new developers would guide us in the right direction sooner. Swallowed exceptions carry too high a risk of leaving the system in an invalid state.

Headers vs metadata

EventStoreDB allows adding metadata to events in any format, as it's just a byte array. Also, an event has a content type, which spans both payload and metadata.

However, subscriptions and producers can support event streaming using brokers. Most of the brokers don't support metadata as such and have message headers instead. Headers are key-value maps, not complex objects. RabbitMQ represents them as text, similar to HTTP headers. Google PubSub uses attributes, which are also key-value pairs, represented as a string-string dictionary.

My thought is to limit metadata to such a dictionary format. It would help to handle serialisation concerns and allows adding default headers like trace and span id, correlation id and so on.
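
As a rough illustration of the dictionary idea, here is a minimal sketch that assumes string values only, which maps directly to the broker headers and PubSub attributes described above. The `Metadata` class, the `With` and `GetOrDefault` helpers, and the key names in `MetaKeys` are assumptions made for this example, not existing Eventuous definitions.

```csharp
using System.Collections.Generic;

// Hypothetical default header names; the issue does not define them.
public static class MetaKeys {
    public const string CorrelationId = "correlation-id";
    public const string TraceId       = "trace-id";
    public const string SpanId        = "span-id";
}

// Metadata restricted to a flat key-value map, so it can be copied
// as-is into message headers or attributes.
public sealed class Metadata : Dictionary<string, string> {
    // Fluent helper that sets (or overwrites) a single header.
    public Metadata With(string key, string value) {
        this[key] = value;
        return this;
    }

    // Returns null when the header is absent instead of throwing.
    public string GetOrDefault(string key)
        => TryGetValue(key, out var value) ? value : null;
}
```

Because the type is a plain dictionary, serialising it for EventStoreDB is a matter of writing a JSON object of strings, and copying it to a broker message is a simple loop over the entries.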

Possibly generate server-side filter using handled event types

The AllStreamSubscriptionService accepts the filter, so it's possible to limit the number of events delivered. It works fine, but it must be explicitly configured.

As we don't really know what events the subscription wants to process, we can't pre-configure the filter automatically.

It should be possible to make a new version of subscriptions, which would only use TypedEventHandler. This way, the subscription knows what events it needs and can build the filter automatically.

It can also be done by checking the list of event handlers provided to the subscription, and if all of them are typed handlers, we can build the filter automatically.

ESDB subscription to $all could use a server-side filter. RMQ subscription could use the routing key, as the event type is already used as the routing key in the RMQ producer.
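
A minimal sketch of the filter-building step, assuming the subscription can already collect the event type names from its typed handlers. `EventTypeFilterBuilder` is a hypothetical helper; passing the resulting expression to the EventStoreDB client or turning the names into RMQ routing keys is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public static class EventTypeFilterBuilder {
    // Builds a regular expression that matches exactly the given event type names.
    // Returns null when there are no names, meaning "no filter can be derived".
    public static string BuildRegex(IEnumerable<string> eventTypeNames) {
        var escaped = eventTypeNames
            .Distinct()
            .OrderBy(x => x, StringComparer.Ordinal) // stable output for logging and tests
            .Select(x => Regex.Escape(x))            // dots in names must not act as wildcards
            .ToArray();

        return escaped.Length == 0
            ? null
            : "^(" + string.Join("|", escaped) + ")$";
    }
}
```

Escaping matters here because versioned names such as V1.RoomBooked contain a dot, which would otherwise match any character.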

Add basic retries

There's a package to add retries using Polly to subscriptions (for retrying event handlers). However, we need more places with retries, and there's no need for complex policies that Polly supports.

The idea is to add some basic retry mechanism to the core project, so it can be used in, for example, AggregateStore that will retry reads and writes for transient store errors.
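
A minimal sketch of what such a basic retry could look like, assuming a fixed number of attempts with a doubling delay. `BasicRetry`, its parameters, and the `isTransient` predicate are hypothetical and only illustrate the idea; deciding which store errors count as transient is left to the caller.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BasicRetry {
    // Runs the action, retrying only when the caller classifies the failure as transient.
    // After the last attempt, or for non-transient errors, the exception propagates.
    public static async Task<T> Execute<T>(
        Func<CancellationToken, Task<T>> action,
        Func<Exception, bool>            isTransient,
        int                              maxAttempts       = 3,
        int                              initialDelayMs    = 100,
        CancellationToken                cancellationToken = default
    ) {
        var delay = TimeSpan.FromMilliseconds(initialDelayMs);

        for (var attempt = 1; ; attempt++) {
            try {
                return await action(cancellationToken);
            }
            catch (Exception e) when (attempt < maxAttempts && isTransient(e)) {
                await Task.Delay(delay, cancellationToken);
                delay = TimeSpan.FromTicks(delay.Ticks * 2); // simple exponential backoff
            }
        }
    }
}
```

The exception filter keeps the original stack trace intact for errors that are not retried, which is useful when the failure finally surfaces to the caller.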

Question - Business Rules

What is your opinion on properly implementing business rules using Eventuous? I have been reading about CSLA's approach and like how the rules are packaged to be leveraged for UI code. I am willing to work on an opensource solution for Eventuous with your guidance. Incredible work on Eventuous.

Thanks in advance.

Archiving streams

In order to keep the database size in an event store bounded, it would be useful to bake in an archiving function.

The archive storage could be anything, so it needs an abstraction. The functionality would be limited to a stream archive function in a separate class that needs a dependency on IEventStore and the archive interface.

Proposed functions:

  • ArchiveStream would take a given stream, read all the events from there, serialize them to an array of objects, and push everything as a single object to the archive
  • RestoreStream would do the opposite, given that we know the types of all the archived events

Consider:

  • Store event type for each event in the archive envelope
  • Should the archived stream be deleted or truncated after writing an activation event? It can be left outside for the user to decide, or it can be a built-in feature.

Initializing Aggregate from Pre-existing state

Hi Alexey,

Nice library 👍

Is there any mechanism in this framework to create an aggregate from a pre-existing state object and version (i.e. snapshots)?

Even if not, would you be open to any pull requests to allow the possibility as a basic implementation?

DefaultEventSerializer - throw error when Type not found

I was having a play around with this framework and struggled to get the Subscription service working.
I pulled down your repository so I could debug it, and it turned out I hadn't registered my Type.
I had opted for the DefaultEventSerializer, which silently handles unknown events.

I wonder if an option to throw an error on this would be useful?
It might be better for that to happen than to skip past an event because someone forgot to register the Event.

Eventuous least package dependencies

Hi Alex,

I'm trying to depend on Eventuous inside my netstandard2.1 project. It requires dotnet5.0. I think this project only consists of the core classes which should be compatible inside a netstandard2.0 project.

Could you configure it to be explicit about it being a netstandard2.0 project? Perhaps other projects are compatible with netstandard2.0 as well.

Kind regards,

Thijs

Merge packages? Or not?

Proposal is to merge packages per infra.

Three EventStore packages will go to one.
PubSub and RMQ have two each, will be one.
Kafka starts with one (producers), then subs will be added. If not, it will be two packages.

Thoughts?

Vote with thumbs up and down. Will keep it open for a couple of weeks.

Consider adding the possibility to save and access metadata

It would be nice to be able to save and access metadata easily in eventuous.

  1. Currently, 'null' is saved for metadata when appending events to the event store. It would be nice if there was a method that you could override, like 'AddMetadata', when inheriting from AggregateStore to set the metadata.
  2. Currently, there is no access to metadata in event handlers.

This would make it possible to use the built-in $correlationId of EventStore.
Currently I have had to reimplement some of the already implemented classes, like AggregateStore, to make this possible.

Thanks.

Allow overriding header names for the required values

Example:

var psm = new PubsubMessage {
    Data        = ByteString.CopyFrom(payload),
    OrderingKey = options?.OrderingKey ?? "",
    Attributes = {
        { "contentType", _serializer.ContentType },
        { "eventType", eventType }
    }
};

These keys are hard-coded, but there are scenarios where we can enable compatibility with other existing libs if we make those keys configurable.

Add partitioned consumer

Based on #52 it is now possible to add a partitioned consumer.

It's similar to the concurrent consumer, which is a round robin.

The new CheckpointCommitHandler, which is currently not in use, can wrap the checkpoint store, as commit is just a function. It only commits if the sequence has no gaps.

Events sequence increase needs tests for catch-up subscriptions, as other subscription types don't really use the sequence number.

Sequence:

Sub -> Partitioned consumer -> (partitioner) -> (channel) -> DefaultConsumer -> context.Ack
DelayedAckContext.Ack = CommitCheckpointHandler.Commit (CheckpointStore.StoreCheckpoint)
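
The "only commits if the sequence has no gaps" rule can be sketched as follows. This is an illustration under stated assumptions, not the actual CheckpointCommitHandler: `GaplessCommitter` is a hypothetical class, sequence numbers are assumed to increase by one per event, and the `commit` delegate stands in for the checkpoint store call.

```csharp
using System;
using System.Collections.Generic;

public sealed class GaplessCommitter {
    readonly SortedSet<long> _acked = new SortedSet<long>();
    readonly Action<long>    _commit;
    long                     _next; // the lowest sequence that has not been acknowledged yet

    public GaplessCommitter(long firstSequence, Action<long> commit) {
        _next   = firstSequence;
        _commit = commit;
    }

    // Called by partitions in any order. Commits the highest sequence
    // for which every earlier sequence has also been acknowledged.
    public void Ack(long sequence) {
        lock (_acked) {
            _acked.Add(sequence);
            long? committable = null;

            // Drain the contiguous run starting at the expected sequence.
            while (_acked.Count > 0 && _acked.Min == _next) {
                _acked.Remove(_next);
                committable = _next;
                _next++;
            }

            if (committable.HasValue) _commit(committable.Value);
        }
    }
}
```

With this approach, a slow partition holds back the checkpoint but never allows it to move past an unprocessed event, so a restart replays at most the events after the last gap-free position.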

Proto Eventuous integration

In the best of worlds, I'd like to obsolete the Proto.Persistence libs and instead have a proper adapter for a library like this.
This would both benefit the ecosystem, by having a more generic lib like this where everyone focuses on the persistence efforts, and be in the spirit of ProtoActor to leverage already existing tools.

  • Pluggable serialization
  • Semaphore limits on reads / writes, e.g. prevent more than X concurrent reads/writes at any given point in time. e.g. big actor systems booting up can easily kill a database if no such feature is in place
  • Batch reads and writes across aggregates. Similar to the above, but when many actors ask to read or write state, all of these actions are collected in a single aggregator, leveraging DB-level features for batch operations

Subscription durability

We handle subscription drops, but it would be nice to have a story around failures in event handlers.

When handling an event in a subscription fails, the following possible scenarios apply:

Reason                       | Remedy
Transient failure            | Retry individual handlers
Permanent failure (poison)   | Log and continue
                             | Full stop, make unhealthy
Checkpoint storage failure   | Endless retry, make unhealthy

We could support all the scenarios. The question is only how to configure it, and what is the good default.

Add event type to the JSON object returned in the Result

The Result is nice as it returns the list of changes, which are events. These events can be delivered to the API consumer, like the UI, and applied there, eliminating the need to replicate the backend logic in the UI for the domain logic when it comes to state updates.

However, right now it is impossible to understand what actually happened as events returned by the Result object do not indicate the event type, so one would need to figure it out by looking at the object structure, and that's not easy.

We need to enrich the list of returned changes by adding the event type string either to the contract itself (which would require generating a new contract or manipulating the JSON object), or in a tuple.
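
The tuple-like option could look roughly like the sketch below. `Change` and `ChangeList` are hypothetical names for this example, and the `typeName` function is an assumption that stands in for whatever registry maps CLR types to event type strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Pairs an event with its type name so API consumers can tell events apart.
public record Change(string EventType, object Event);

public static class ChangeList {
    // typeName is supplied by the caller, for example from the type mapping
    // that is already used for serialisation.
    public static IReadOnlyList<Change> WithTypes(
        IEnumerable<object> events,
        Func<Type, string>  typeName
    ) => events.Select(e => new Change(typeName(e.GetType()), e)).ToList();
}
```

This keeps the event contracts untouched, at the cost of one extra level of nesting in the JSON returned to the caller.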

Bug: Subscription Exception and checkpoint

I think there is an issue where exceptions thrown in subscription handlers are NOT causing the checkpoint to be replayed from the last position.

I think this is related to SubscriptionService.cs, line 108:
Task Store() => StoreCheckpoint(GetPosition(re), cancellationToken);

I believe the GetPosition(re) should be _lastProcessed from line 90.

Diagnostics and traces

Distributed tracing is essential for any system with asynchronous message flow. Even if the system isn't distributed physically, producing an event for processing it later already makes the system distributed in time.

Adding traces to Eventuous was planned from the start, so the work will be tracked here.

Here are some initial assumptions:

  • Using ActivitySource would allow us to be vendor-agnostic and play nicely with OpenTelemetry.NET
  • #26 is a pre-requisite as we need to propagate the remote context across all known transports
  • According to #9 we'll stick to key-value (string-object where an object is a primitive value) headers

Resources:

Control events persistence from the domain model

Right now, the event store is agnostic to what events are being persisted. It is quite a limitation for doing things like #64, domain-driven snapshots, closing periods, etc, as some "fold and archive" or "fold and snapshot" actions need to be controlled by the domain model.

For example, a reservation might get archived after the guest checks out, as the lifecycle of the reservation is done, and it becomes fully immutable (or frozen). Similarly, closing a period would collect the period data in a "new period started" event, and then signal the persistence to truncate all the previous events.

So, the idea is to provide a way for the domain model to tell those things to the aggregate store and further to the event store.

I have two ideas atm:

  • Apply can get an overload or optional parameter to signal the archiving or truncation
  • A set of attributes for event classes to do the same

Both can be implemented at the same time, but the implementation would be in different layers.

  • For Apply the change would be to extend the Changes to contain extra information
  • For attributes, all the work will be done by the aggregate store. The StreamEvent record would need to get new properties.

Gap measure RMQ

There's no gap for RMQ, but lead-time is relevant. Also, a subscription queue size basically tells how many events aren't processed. RMQ management API could help. The limitation is that the management API plugin must be added to the RMQ deployment.

Additional metadata

Thinking about what to add to the default meta.

  • Event source:
    • Aggregate type (as it's not necessarily encoded in the stream name)
    • Application name
    • Application version
  • Command type that triggered the event?
  • The actual date when the event was produced? (The created system meta of ESDB gets lost when migrating to another store)
  • Anything else?

Differentiate application exceptions

Related to #77

@thomaseyde describes that the application service fails when handling a command when an event stream contains an unknown event. Currently, the app service returns an ErrorResult where the exception is "unknown event type".

The reason for the app service to return an ErrorResult in the first place was to convey domain-level errors to the caller. However, things like "event store is down" or "unknown event type" are different types of errors as they indicate technical failures rather than model issues.

I am now inclined to make the application service throw on technical errors during Load and Store, but keep returning the ErrorResult when an exception is thrown when handling the command itself.

AggregateId disconnection between aggregate and state

This has tripped me up several times, and smells of duplication:

  1. I have to compute the aggregate id in application service handlers,
  2. Then again in the When method in aggregate state

Also, if I don't set the Id explicitly on my state object, at least on the very first event, then my aggregate has no id.

I can see why the service needs me to compute the id from the incoming command, but the id is known at the time When runs. Which means it's known to the aggregate. Is it really necessary to keep the id in the aggregate state?

NullReferenceException in AsReceivedEvent

The subscription to a category stream seems to receive deleted events, also with ResolveLinkTos set to true. It seems like the Event property of the ResolvedEvent instance is null in this case, although the property is not nullable.

To me, it looks like a client bug. We need a test to see where it breaks.

If indeed the Event property is null, we can check it and not proceed. Also, I need to report it to Event Store, to check if it's the expected behaviour.

If it's something else, then, well, do something else...

Validation and event sourcing

Hi @alexeyzimarev,

I really appreciate this project! I have been thinking of giving event sourcing a try for a long time and eventuous seems like the perfect occasion.

With that said, I was wondering about validation and how it works with event sourcing.

For example, in your demo project (https://github.com/Eventuous/dotnet-sample), there is a delegate function called IsRoomAvailable that you simplified to
=> new ValueTask<bool>(true)

How would you implement the real thing and is it possible to make it transactional, ie, ensuring there is never a room booked twice at the same time?

Thank you!
