event-source-backbone's People
event-source-backbone's Issues
Separate Reconstruct vs Temporal notification
Support for AsyncApi standard
Enable dynamic partitioning
Lambda for the source name based on the metadata
For example: Tenant, Geo
Partial subscription
Being able to register for specific stram's event without waiting for the processing of other events.
- will require to define a subscription group (isolate from the subscription of different events).
Requirements:
- basic functionality
- support for non-auto-ack (avoid blocking on the unrecognized event)
- bridge should reflect whether it had handle the event
- having behavior options to define the desired behavior (like strict, allow-when-auto-ack, ack-if-unhandled)
Check Orleans Event Sourcing
Doc: https://dotnet.github.io/orleans/Documentation/index.html
Event Sourcing: https://dotnet.github.io/orleans/Documentation/grains/event_sourcing/index.html
.NET 3: https://devblogs.microsoft.com/dotnet/orleans-3-0/
Srorage Provider: https://dotnet.github.io/orleans/Documentation/grains/grain_persistence/index.html#creating-a-storage-provider
https://www.microland.com/blogs/distributed-apps-with-google-cloud-1
Docker: https://dotnet.github.io/orleans/1.5/Documentation/Deployment-and-Operations/Docker-Deployment.html
K8s *: https://github.com/agentpowers/orleans-event-sourcing
K8s: https://github.com/OrleansContrib/Orleans.Clustering.Kubernetes
K8s-scafold: https://github.com/AviAvni/EventSourcingOrleans
Dashboard: https://github.com/OrleansContrib/OrleansDashboard
OpenTelemetry
Builder should use Factory for whatever needs ILogger (Channel, etc...) until the build phase
Plan should be a record
Both Producer & Consumer Plan
Replace XPending + XClaim with XAutoClaim
Dynamic / Semi Dynamic Producer Channeling (TBD)
Motivation
Change Partition or Shard according to runtime state.
For example: Service or Application Environment or Feature Flags.
Optional Solution
Semi Dynamic:
Use the Merge functionality + Filter optional filtering on the shard or partition level.
- Merge should have options of Broadcast / First
- Risk: it's up to the dev to make sure he have a fallback
Dynamic filtering
Having Partition / Shard formatter which will format it before forwarding to the channel.
- Risk: might format something that the consumer don't listen to (less declarative).
Consumer: Error Handling
Make sure that non-fatal error don't complete the Source-Block, yet visible to the consumer
Dead letter stream?
Indication about:
- retries (+ custom logic)
Id lookup layer
Useful for scenarios like a migration of a stream into a new one.
Will Assist at GetByIdAsync
Implement REDIS: Provider (stream + hash)
Open Closed key & metadata convention
The motivation is to enable organizations to define their path & metadata semantics, yet to have a typed representation of this semantic. In order to keeping a strict convention within multiple teams within the organization.
This spec deals with two kinds of semantics:
- Stream name: dictate the stream name (for example environment, partition, shard)
- Deployment context: contextual information like which cluster/namespace the event created on and which one is handling the current execution (this one is related to a scenario when two different contexts synchronize their events)
What will be affected by this feature
- Builders
- Metadata
- Channels
Circuit-breaker + Dead stream
- Have Circuit-breaker policy for technical problem like data / channel corruption.
- Have Dead stream policy for max retry of processing a message (for specific consumer channel)
Deploy automation
Build NuGet automation
https://dusted.codes/github-actions-for-dotnet-core-nuget-packages
Source Sharding Insight
According to the business stream can be splits into sharding as long as it keep business domain separation.
When same data may be related for different data slicing portion, the message may be send to each aspect separately,
For example each order processing can use different source, while suppliers can use different source (which share some of the ordering message) in order to build the supplier interaction state.
In-order to enable micro-service to listen on sources which may be created in the future (for example future order)
We need to have source for source channel creation (and to store it into a query storage)
Review Shard / Tagging concept.
Discuss edge cases, best practice & API / builder flow.
- Affinity between tag and shard.
does messages of specific shard had to use same tags? - Consumer discovery of shards & related tags.
How to discover that existing shard is having new tag.
Config: strict/non-strict consumption
When inheriting an interface.
Producing may happen on the level of the derived interface, while consumers can be at the base interface level.
configuration should determine whether to hang on a non-exist method.
Hanging may be useful when another consumer consumes the rest of the messages.
- Strict
- Should indicate whether it is an error or waiting
- Waiting: should release the pending if it does not exist
- When releasing pending should await and start consuming again with incremental chunk size
Forwarder
Add buit-in forwarder operations:
- read messages from one stream + storage strategy
- forward it to other streams + storage strategy (can use the Merge operation to broadcast)
- it should be at the message segmentation level (interface contract agnostic)
- in case of a cross cluster (external call) it can do API call (which will handle it at the segmentation level)
- have HttpClient/Factory as parameter
- Consumer builder should allow filter by origin (ignore copy = default?)
Required Functionality
- #34
- Add Prometheus Exporter for OpenTelemetry .NET
- Report events (reconnect, processed messages count, etc.)
- #24
- Introduce parallelism
- Aggregation of consume messages (ability to pipe it through BatchBlock and alike while still being able to acknowledge at the end)
- #33
- #42
- Dynamic Partitioning & Sharding: With Strategy Lambda for the source name based on the metadata. For example: Tenant, Geo
- #30
- #29
- #35
- Source Generation
- Producer
- Consumer
- Remove Reflection base
- Allow plan mutation on builder's Build()
- Metadata should be available via context (stateless style).
- Client Recovery
- Use Poly // https://github.com/App-vNext/Polly
- Producer - SendAsync()
- Consumer (Inject Poly's policy via the builder, into the plan)
- Use Poly // https://github.com/App-vNext/Polly
- Acknowledge
- Acknowledge on succeed
- Acknowledge for the message: can be available via context
- Pending & Claim
- Self Pending: should fetch - on first time, after error, first time when have no-element.
- Work stealing
- When pending of other consumers are lingered to long.
- Consumers should monitor for pending and claim long pending messages. (XCLAIM)
- Builder should enable different channel for messages & segments (actually it might be channel per segment key) for example S3
- Enable base channel for REDIS Stream + injection for handling different storage for segments & interceptors
- Integration Tests
- Should delete REDIS key on test disposal
- Should verify no pending messages
- Should ensure message sequence
- Test crash scenario when same consumer name continue after the crash
- Consumer should define Consumer Group [optional]
- REDIS Channel: Provider (stream + hash) (options: batch)
- Default Segmentation
- ProducerBase should be implemented as a unit of work in order to support multiple parameters
- Get logger
- use class instead of using Bucket = System.Collections.Immutable.ImmutableDictionary<string, System.ReadOnlyMemory>;
- execution pipeline (coordinates the different stages, interceptors, segmentation, channel)
build -> create metadata -> create channel (meta, segmentation) -> decorate {for interception} - Test Channel
- Merging implementation
- Abstract Factory concept for producer / consumer clients
- Custom Segmentation (for GDPR and alike)
- Custom Serialization
- Custom Channel
- Consume raw message: try to do it friendly for pattern matching: action name and case to type
Versioning & Source Generator
Source Generator can solve the versioning issue.
- learn more about source generator here , here and here
- POC source generator technique
- Source generator attribute (both for producer & consumer) will define a version
- Producer will inject this version to the message metadata
- Consumer will ignore messages which not match its version
- Ignore pattern
- Skip [BREAK ORDERING]: simply ignore with ACK (do not process it). Good when other versions have different subscription group
- Wait [SLOW]: returns the message to the stream as unhandled (hopes someone else will handle it) and delays next consumption.
- Ignore pattern
- Multiple consumer can join single subscription in a broadcast format
await using IConsumerLifetime subscription = _consumerBuilder
.WithOptions(o => consumerOptions)
.WithCancellation(cancellation)
.Partition(PARTITION)
.Shard(SHARD)
.WithLogger(_fakeLogger)
.SubscribeEventFlowV2Consumer()
.SubscribeEventFlowV1();
[GenerateEventSource(EventSourceGenType.Consumer, multi)]
public interface IEventFlowV2
{
/// <summary>
/// Stages 1.
/// </summary>
/// <param name="PII">The PII.</param>
/// <param name="payload">The payload.</param>
/// <returns></returns>
[Versions(from=1,to=3)]
ValueTask Stage1Async(Person PII, string payload);
[Versions(from=4)]
ValueTask Stage1Async(Person PII, JsonElement payload);
/// <summary>
/// Stages the 2.
/// </summary>
/// <param name="PII">The PII.</param>
/// <param name="data">The data.</param>
/// <returns></returns>
ValueTask Stage2Async(JsonElement PII, JsonElement data);
ValueTask Stage2Async(JsonElement PII, int data);
}
Brainstorm versioning logic
Why versioning
Different parts of the event flow may be changed over time
- Communication Interfaces & DTOs
- Communication channels
- Metadata (properties)
- Storage strategy
Requirements
- Processing sequence should be maintained (at least for some scenarios)
Problem
- Newer message version may not be compatible with old versions.
Solution: Naming convention
DTO / Operations name must end with Version indication.
any change should result in different suffixes.
Challenge
A consumer should be capable of handling multi-version in some filtering logic of first win.
Mitigation
// pseudo code
.Subscribe<IHandlerV9>(v => v >= 1);
.Subscribe<IHandlerV13>(v => v >= 10 && v < 14);
.Subscribe<IHandlerV17>(14, 17); // handle version 14 -17
.Subscribe<IHandlerV18>(18); // handle version 18
.Fallback(meta =>
{
logger.LogWarning($"version {meta.Version} is missing");
return Instruction.Hang; // release ownership & hand consumption for awhile in hope of newer consumer to take the message
});
All subscriptions should register (sequentially to the channel) in order to keep sequential processing
Problem: enforce naming convention
- Suggestion: Source Generator for the interfaces & DTOs (maybe over proto files)
Problem: Channel behavior compatibility
- Channel properties or storage-strategy may change.
Solution: Chanel storage compatibility
- Consumer channel can register multiple message formatter & storage strategies
Keeping code relative clean
All different version related code should be consumed via DI
Code Gen: Taking using from origin file (when having custom namespace)
[GenerateEventSource(EventSourceGenType.Consumer)]
[GenerateEventSource(EventSourceGenType.Producer)]
//[GenerateEventSource(EventSourceGenType.Consumer, Namespace = "Weknow.EventSource.Backbone.WebEventTest")]
//[GenerateEventSource(EventSourceGenType.Producer, Namespace = "Weknow.EventSource.Backbone.WebEventTest")]
[EditorBrowsable(EditorBrowsableState.Never)]
[Obsolete("Used for code generation, use the producer / consumer version of it", true)]
public interface IEventFlow
Check & Learn from alternative
XState Observability
Chart of each microservice i.e. which micro-service consume event of which producer
Version changing scenarios:
The goal of this issue is to surface sample for changes of entities which break compatibility.
It'll use to address realistic the versioning's tests.
1 Split entity (motivation: database limitation, for example Neo4j don't support nested entities)
From:
class User
{
int Id {get; set;}
string Name {get; set;}
string[] Tags {get; set;}
}
To
class Tag
{
string Name {get; set;}
byte Strength {get; set;}
}
class UserInfo
{
User User {get; set;}
Tag[] Tags {get; set;}
}
class User
{
int Id {get; set;}
string Name {get; set;}
}
Async Enumerable
- Generate DTOs from the methods parameters
- Use pattern matching in order of filling the DTOs (by the operation name)
- TBD: what is the type if the enumerator (it is disseminated union by nature)
- suggestion: having
T GetByOperation<T>(string operation)
but it not perfect
- suggestion: having
Test Scenario: Define challenging test scenarios.
- consuming different type from same source
Return form SendAsync
Motivation
Depending on the storage strategy the data of the event may be useful for classic request / Response operations.
For example using the stored data to some presentation at the application side.
The idea is to have some kind of bag which storage strategy can fill.
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. ๐๐๐
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google โค๏ธ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.