
event-source-backbone's People

Contributors

aviavni, bnayae

Forkers

b-so-t bnayae

event-source-backbone's Issues

Separate Reconstruct vs Temporal notification

Event metadata should specify whether the event takes part in a reconstruction process.
For example:

The 'Saved' notifications are relevant for reconstruction, while 'Rejected' & 'Issued' did not end up being persisted and are therefore not relevant for reconstruction.
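
A minimal sketch of how such metadata could be expressed on the contract, assuming a hypothetical [Reconstruction] attribute and a placeholder Order record (neither is part of the current API):

    using System;
    using System.Threading.Tasks;

    // Hypothetical marker attribute - NOT part of the current API.
    [AttributeUsage(AttributeTargets.Method)]
    public sealed class ReconstructionAttribute : Attribute { }

    // Placeholder entity for the example.
    public record Order(string Id);

    public interface IOrderNotifications
    {
        [Reconstruction]                        // persisted, replayed during reconstruction
        ValueTask SavedAsync(Order order);

        ValueTask RejectedAsync(Order order);   // temporal only, skipped on reconstruction
        ValueTask IssuedAsync(Order order);     // temporal only, skipped on reconstruction
    }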

Partial subscription

Being able to register for a specific stream's events without waiting for the processing of other events.

  • will require defining a subscription group (isolated from the subscriptions of other events).

Requirements:

  • basic functionality
  • support for non-auto-ack (avoid blocking on unrecognized events)
  • the bridge should reflect whether it handled the event
  • behavior options to define the desired behavior (e.g. strict, allow-when-auto-ack, ack-if-unhandled); a pseudocode sketch follows
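
A pseudocode sketch of how a partial subscription could look on the existing builder flow; the Group / PartialBehavior options and the SubscribeOrderSavedConsumer method are assumed names, not the current API:

    // pseudo code - 'Group', 'PartialBehavior' and 'SubscribeOrderSavedConsumer' are assumptions
    await using IConsumerLifetime subscription = _consumerBuilder
                         .WithOptions(o => consumerOptions with
                         {
                             Group = "order-saved-only",                 // isolated subscription group
                             PartialBehavior = PartialBehavior.AckIfUnhandled
                         })
                         .Partition(PARTITION)
                         .Shard(SHARD)
                         .SubscribeOrderSavedConsumer();                 // register only the 'Saved' event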

Check Orleans Event Sourcing

Dynamic / Semi Dynamic Producer Channeling (TBD)

Motivation

Change the Partition or Shard according to runtime state.
For example: service/application environment or feature flags.

Possible Solutions

Semi Dynamic:

Use the Merge functionality + Filter (optional filtering at the shard or partition level).

  • Merge should have options of Broadcast / First
  • Risk: it is up to the developer to make sure a fallback exists

Dynamic filtering

Have a Partition / Shard formatter which formats the target before forwarding to the channel (pseudocode sketch below).

  • Risk: might format a target that no consumer listens to (less declarative).
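
A pseudocode sketch of the dynamic variant, assuming a hypothetical WithShardStrategy hook on the producer builder (not the current API):

    // pseudo code - 'WithShardStrategy' is an assumed hook, not the current API
    IEventFlowProducer producer = _producerBuilder
                         .Partition(PARTITION)
                         .WithShardStrategy(meta =>
                              // derive the shard from runtime state (environment, feature flag, ...)
                              $"{meta.Environment}:{SHARD}")
                         .BuildEventFlowProducer();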

Consumer: Error Handling

Make sure that non-fatal errors don't complete the Source-Block, yet remain visible to the consumer (illustrative sketch below).

Dead letter stream?
Indication about:

  • retries (+ custom logic)
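
An illustrative, library-agnostic sketch of the intended behavior: retry a handler a few times with custom back-off, then hand the message to a dead-letter callback instead of faulting the pipeline:

    using System;
    using System.Threading.Tasks;

    // Illustrative only: retry with custom logic, then dead-letter instead of faulting the pipeline.
    public static class SafeConsume
    {
        public static async ValueTask HandleAsync<T>(
            T message,
            Func<T, ValueTask> handler,
            Func<T, Exception, ValueTask> deadLetter,
            int maxRetries = 3)
        {
            for (int attempt = 1; ; attempt++)
            {
                try
                {
                    await handler(message);
                    return;                                 // success: acknowledge as usual
                }
                catch (Exception) when (attempt < maxRetries)
                {
                    await Task.Delay(100 * attempt);        // custom back-off / retry logic hook
                }
                catch (Exception ex)
                {
                    await deadLetter(message, ex);          // surface the failure without completing the block
                    return;
                }
            }
        }
    }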

Id lookup layer

Useful for scenarios like migrating a stream into a new one.
Will assist with GetByIdAsync.
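
A minimal sketch of such a lookup layer; the interface name and shape are assumptions:

    using System.Threading.Tasks;

    // Hypothetical lookup layer: resolves an id that may have migrated to a new stream.
    public interface IIdLookup
    {
        // Returns the current stream key for a (possibly old) id, or null when no redirect exists.
        // GetByIdAsync would consult this before falling back to the original stream.
        ValueTask<string?> ResolveAsync(string id);
    }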

Open Closed key & metadata convention

The motivation is to enable organizations to define their own path & metadata semantics while still having a typed representation of those semantics, in order to keep a strict convention across multiple teams within the organization (see the sketch after the list).

This spec deals with two kinds of semantics:

  • Stream name: dictates the stream name (for example environment, partition, shard)
  • Deployment context: contextual information such as which cluster/namespace the event was created on and which one is handling the current execution (relevant when two different contexts synchronize their events)
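
A sketch of the idea: the library stays open to any semantics, while an organization closes the convention behind typed records of its own (the names below are illustrative):

    // Illustrative: an organization-specific, strongly typed stream key.
    public sealed record StreamKey(string Environment, string Partition, string Shard)
    {
        // Single place that dictates the organization's naming convention.
        public override string ToString() => $"{Environment}:{Partition}:{Shard}";
    }

    // Illustrative deployment context carried in the metadata.
    public sealed record DeploymentContext(string Cluster, string Namespace);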

What will be affected by this feature

  • Builders
  • Metadata
  • Channels

Circuit-breaker + Dead stream

  • Have a Circuit-breaker policy for technical problems like data / channel corruption.
  • Have a Dead-stream policy for when a message exceeds its max processing retries (per consumer channel); see the sketch below.
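
A sketch of how the two policies could be modeled as options; the names and shapes are assumptions:

    using System;

    // Assumed option shapes, not the current API.
    public sealed record CircuitBreakerPolicy(
        int FailureThreshold,            // consecutive technical failures before opening
        TimeSpan OpenDuration);          // how long the consumer stays paused

    public sealed record DeadStreamPolicy(
        int MaxRetries,                  // per message, per consumer channel
        string DeadStreamKey);           // where exhausted messages are moved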

Source Sharding Insight

Depending on the business, a stream can be split into shards as long as the split keeps business-domain separation.
When the same data is relevant to different data-slicing portions, the message may be sent to each aspect separately.
For example, order processing can use one source, while suppliers can use a different source (which shares some of the ordering messages) in order to build the supplier-interaction state.

In order to enable a micro-service to listen to sources which may be created in the future (for example, future orders),
we need a source for source-channel creation (and to store it in query storage).

Review Shard / Tagging concept.

Discuss edge cases, best practice & API / builder flow.

  • Affinity between tag and shard.
    Do messages of a specific shard have to use the same tags?
  • Consumer discovery of shards & related tags.
    How to discover that an existing shard has a new tag.

Config: strict/non-strict consumption

When inheriting an interface, producing may happen at the level of the derived interface, while consumers can be at the base-interface level.
Configuration should determine whether to hang on a non-existent method.
Hanging may be useful when another consumer consumes the rest of the messages.

  • Strict
    • Should indicate whether it is an error or a wait
    • Waiting: should release the pending message if the method does not exist
      • When releasing a pending message, it should wait and start consuming again with an incremental chunk size (see the sketch below)
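
A sketch of how this configuration could be modeled; the enum and option names are assumptions:

    // Assumed configuration shape, not the current API.
    public enum UnknownOperationBehavior
    {
        Error,   // strict: fail fast on a method the consumer does not implement
        Wait     // non-strict: release the pending message and retry later with an incremental chunk size
    }

    public sealed record ConsumptionOptions(
        UnknownOperationBehavior OnUnknownOperation = UnknownOperationBehavior.Error);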

Forwarder

Add built-in forwarder operations (a pseudocode sketch follows the list):

  • read messages from one stream + storage strategy
  • forward them to other streams + storage strategy (can use the Merge operation to broadcast)
  • it should work at the message-segmentation level (interface-contract agnostic)
  • in case of a cross-cluster (external) call, it can make an API call (which will handle it at the segmentation level)
    • have HttpClient/Factory as a parameter
  • the Consumer builder should allow filtering by origin (ignore copies = default?)
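
A pseudocode sketch of the intended forwarder flow; every builder member shown here is an assumed name, not the current API:

    // pseudo code - all builder members here are assumed
    IForwarder forwarder = _forwarderBuilder
                         .From("source-stream", sourceStorageStrategy)      // read messages + storage strategy
                         .To("target-stream", targetStorageStrategy)        // forward + storage strategy (Merge can broadcast)
                         .AtSegmentationLevel()                             // interface-contract agnostic
                         .WithHttpBridge(httpClientFactory)                 // cross-cluster: forward via API call
                         .Build();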

Required Functionality

  • #34
  • #24
  • Introduce parallelism
    • Aggregation of consumed messages (ability to pipe them through BatchBlock and the like while still being able to acknowledge at the end)
  • #33
  • #42
  • Dynamic Partitioning & Sharding: with a strategy lambda for the source name based on the metadata. For example: Tenant, Geo
  • #30
  • #29
  • #35
  • Source Generation
    • Producer
    • Consumer
    • Remove the reflection-based implementation
  • Allow plan mutation on builder's Build()
  • Metadata should be available via context (stateless style).
  • Client Recovery
  • Acknowledge
    • Acknowledge on success
    • Acknowledge for the message: can be available via context
    • Pending & Claim
      • Self Pending: should fetch on the first run, after an error, and the first time when there is no element.
      • Work stealing
        • When pending messages of other consumers linger for too long.
      • Consumers should monitor for pending and claim long-pending messages (XCLAIM).
  • Builder should enable a different channel for messages & segments (it might actually be a channel per segment key), for example S3
    • Enable a base channel for REDIS Stream + injection for handling different storage for segments & interceptors
  • Integration Tests
    • Should delete REDIS key on test disposal
    • Should verify no pending messages
    • Should ensure message sequence
    • Test the crash scenario where the same consumer name continues after the crash
  • Consumer should define Consumer Group [optional]
  • REDIS Channel: Provider (stream + hash) (options: batch)
  • Default Segmentation
  • ProducerBase should be implemented as a unit of work in order to support multiple parameters
    • Get logger
  • use a class instead of using Bucket = System.Collections.Immutable.ImmutableDictionary<string, System.ReadOnlyMemory<byte>>;
  • execution pipeline (coordinates the different stages, interceptors, segmentation, channel)
    build -> create metadata -> create channel (meta, segmentation) -> decorate {for interception}
  • Test Channel
  • Merging implementation
  • Abstract Factory concept for producer / consumer clients
  • Custom Segmentation (for GDPR and alike)
  • Custom Serialization
  • Custom Channel
  • Consume raw messages: try to make it friendly for pattern matching: operation/action name and case it to a type (see the sketch after this list)
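
A sketch of the raw-message idea using C# pattern matching; the RawAnnouncement shape is simplified and assumed:

    using System.Text.Json;

    // Simplified/assumed raw-message shape for the sake of the example.
    public sealed record RawAnnouncement(string Operation, JsonElement Payload);

    public sealed record Order(string Id);

    public static class RawConsumer
    {
        public static void Handle(RawAnnouncement announcement)
        {
            switch (announcement.Operation)
            {
                case "OrderSaved":                                      // operation name ...
                    var order = JsonSerializer.Deserialize<Order>(      // ... cased to its typed DTO
                        announcement.Payload.GetRawText());
                    // ... handle the typed order
                    break;
                default:
                    // unknown operation: ack, ignore, or dead-letter according to policy
                    break;
            }
        }
    }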

Versioning & Source Generator

Source Generator can solve the versioning issue.

  • learn more about source generators here, here and here
  • POC source generator technique
  • Source generator attribute (both for producer & consumer) will define a version
  • Producer will inject this version to the message metadata
  • Consumer will ignore messages which do not match its version
    • Ignore pattern
      • Skip [BREAK ORDERING]: simply ignore with ACK (do not process it). Good when other versions have a different subscription group
      • Wait [SLOW]: returns the message to the stream as unhandled (hoping someone else will handle it) and delays the next consumption.
  • Multiple consumers can join a single subscription in a broadcast format:
    await using IConsumerLifetime subscription = _consumerBuilder
                         .WithOptions(o => consumerOptions)
                         .WithCancellation(cancellation)
                         .Partition(PARTITION)
                         .Shard(SHARD)
                         .WithLogger(_fakeLogger)
                         .SubscribeEventFlowV2Consumer()
                         .SubscribeEventFlowV1();

    [GenerateEventSource(EventSourceGenType.Consumer, multi)]
    public interface IEventFlowV2
    {
        /// <summary>
        /// Stage 1.
        /// </summary>
        /// <param name="PII">The PII.</param>
        /// <param name="payload">The payload.</param>
        /// <returns></returns>
        [Versions(from=1,to=3)]
        ValueTask Stage1Async(Person PII, string payload);

        [Versions(from=4)]
        ValueTask Stage1Async(Person PII, JsonElement payload);
        /// <summary>
        /// Stage 2.
        /// </summary>
        /// <param name="PII">The PII.</param>
        /// <param name="data">The data.</param>
        /// <returns></returns>
        ValueTask Stage2Async(JsonElement PII, JsonElement data);
        ValueTask Stage2Async(JsonElement PII, int data);
    }

Brainstorm versioning logic

Why versioning

Different parts of the event flow may be changed over time

  • Communication Interfaces & DTOs
  • Communication channels
    • Metadata (properties)
    • Storage strategy

Requirements

  • Processing sequence should be maintained (at least for some scenarios)

Problem

  • A newer message version may not be compatible with old versions.

Solution: Naming convention

DTO / operation names must end with a version indication.
Any change should result in a different suffix.

Challenge

A consumer should be capable of handling multiple versions with some first-win filtering logic.

Mitigation

// pseudo code
.Subscribe<IHandlerV9>(v => v >= 1);
.Subscribe<IHandlerV13>(v => v >= 10 && v < 14);
.Subscribe<IHandlerV17>(14, 17); // handle versions 14-17
.Subscribe<IHandlerV18>(18); // handle version 18
.Fallback(meta =>
{
    logger.LogWarning($"version {meta.Version} is missing");
    return Instruction.Hang; // release ownership & hang consumption for a while in the hope that a newer consumer will take the message
});

All subscriptions should register to the channel sequentially in order to keep sequential processing.


Problem: enforce naming convention

  • Suggestion: Source Generator for the interfaces & DTOs (maybe over proto files)

Problem: Channel behavior compatibility

  • Channel properties or storage-strategy may change.

Solution: Channel storage compatibility

  • The consumer channel can register multiple message formatters & storage strategies

Keeping code relatively clean

All version-related code should be consumed via DI

Code Gen: take the using directives from the origin file (when a custom namespace is used)

[GenerateEventSource(EventSourceGenType.Consumer)]
[GenerateEventSource(EventSourceGenType.Producer)]
//[GenerateEventSource(EventSourceGenType.Consumer, Namespace = "Weknow.EventSource.Backbone.WebEventTest")]
//[GenerateEventSource(EventSourceGenType.Producer, Namespace = "Weknow.EventSource.Backbone.WebEventTest")]
[EditorBrowsable(EditorBrowsableState.Never)]
[Obsolete("Used for code generation, use the producer / consumer version of it", true)]
public interface IEventFlow

Version changing scenarios:

The goal of this issue is to surface samples of entity changes which break compatibility.
They will be used to make the versioning tests realistic.

1. Split entity (motivation: database limitations, for example Neo4j doesn't support nested entities)

From:

class User
{
    public int Id { get; set; }
    public string Name { get; set; }
    public string[] Tags { get; set; }
}

To

class Tag
{
    public string Name { get; set; }
    public byte Strength { get; set; }
}

class UserInfo
{
    public User User { get; set; }
    public Tag[] Tags { get; set; }
}

class User
{
    public int Id { get; set; }
    public string Name { get; set; }
}

Async Enumerable

  • Generate DTOs from the method parameters
  • Use pattern matching in order to fill the DTOs (by operation name)
  • TBD: what is the type of the enumerator (it is a discriminated union by nature)
    • suggestion: having T GetByOperation<T>(string operation), but it is not perfect (see the sketch below)
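
A sketch of the suggested accessor, which also shows why it is not perfect: the caller still has to know the operation-to-type mapping. All names here are assumptions:

    using System;

    // Assumed shape of an enumerated event: the operation name plus its (generated) DTO.
    public sealed record ConsumedEvent(string Operation, object Dto)
    {
        // Returns the DTO typed as T when the operation matches;
        // the caller still has to know the operation-to-type mapping, hence "not perfect".
        public T GetByOperation<T>(string operation) =>
            Operation == operation && Dto is T typed
                ? typed
                : throw new InvalidOperationException($"'{operation}' does not carry a {typeof(T).Name}");
    }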

Return from SendAsync

Motivation

Depending on the storage strategy, the data of the event may be useful for classic request / response operations.
For example, using the stored data for some presentation at the application side.

The idea is to have some kind of bag which the storage strategy can fill.
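
A sketch of such a result; the SendResult name and shape are assumptions:

    using System.Collections.Generic;

    // Assumed result shape: the event id plus a bag the storage strategy may fill
    // (e.g. a blob URL or the stored JSON) for request/response style usage.
    public sealed record SendResult(string EventId, IReadOnlyDictionary<string, string> Bag);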
