
eventsourcing's Introduction

Overview

This set of modules is an implementation of @jen20's approach to event sourcing. You can find the original blog post here and the GitHub repo here.

It's structured in two main parts:

Event Sourcing

Event Sourcing is a technique to make it possible to capture all changes to an application state as a sequence of events.
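To make the idea concrete, here is a minimal sketch that uses only the standard library and none of this module's types. The Deposited and Withdrawn events and the Account state are hypothetical names chosen for illustration. The state is never stored directly; it is rebuilt by applying the events in order.

```go
package main

import "fmt"

// Deposited and Withdrawn are hypothetical events for a bank account.
type Deposited struct{ Amount int }
type Withdrawn struct{ Amount int }

// Account is the state derived from the events.
type Account struct{ Balance int }

// Apply folds a single event into the current state.
func (a *Account) Apply(event interface{}) {
	switch e := event.(type) {
	case Deposited:
		a.Balance += e.Amount
	case Withdrawn:
		a.Balance -= e.Amount
	}
}

// Replay rebuilds the state from scratch by applying every event in order.
func Replay(events []interface{}) Account {
	var a Account
	for _, e := range events {
		a.Apply(e)
	}
	return a
}

func main() {
	history := []interface{}{Deposited{100}, Withdrawn{30}, Deposited{5}}
	fmt.Println(Replay(history).Balance) // prints 75
}
```

Because the event list is the source of truth, changing how Apply interprets an event changes the derived state without touching the stored data.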

Aggregate Root

The aggregate root is the central point where events are bound. The aggregate struct needs to embed eventsourcing.AggregateRoot to get the aggregate behaviors.

Below, a Person aggregate where the Aggregate Root is embedded next to the Name and Age properties.

type Person struct {
	eventsourcing.AggregateRoot
	Name string
	Age  int
}

The aggregate needs to implement the Transition(event eventsourcing.Event) and Register(r eventsourcing.RegisterFunc) methods to fulfill the aggregate interface. These methods define how events are transformed to build the aggregate state and which events to register in the repository.

Example of the Transition method from the Person aggregate.

// Transition the person state dependent on the events
func (person *Person) Transition(event eventsourcing.Event) {
	switch e := event.Data().(type) {
	case *Born:
		person.Age = 0
		person.Name = e.Name
	case *AgedOneYear:
		person.Age += 1
	}
}

The Born event sets the Age and Name properties of the Person, and the AgedOneYear event adds one year to the Age property. Because the state is derived from the events, the way the aggregate interprets them can easily change in the future if required.

Example of the Register method:

// Register is a callback method that registers the Person events in the repository
func (person *Person) Register(r eventsourcing.RegisterFunc) {
    r(&Born{}, &AgedOneYear{})
}

The Born and AgedOneYear events are now registered to the repository when the aggregate is registered.

Aggregate Event

An event is a clean struct with exported properties that contains the state of the event.

Example of two events from the Person aggregate.

// Initial event
type Born struct {
    Name string
}

// Event that happens once a year
type AgedOneYear struct {}

When an aggregate is first created, an event is needed to initialize the state of the aggregate. No event, no aggregate. Below is an example of a constructor that returns the Person aggregate and inside it binds an event via the TrackChange function. It's possible to define rules that the aggregate must uphold before an event is created, in this case the person's name must not be blank.

// CreatePerson constructor for Person
func CreatePerson(name string) (*Person, error) {
	if name == "" {
		return nil, errors.New("name can't be blank")
	}
	person := Person{}
	person.TrackChange(&person, &Born{Name: name})
	return &person, nil
}

Once a person is created, more events can be created via methods on the Person aggregate. Below is the GrowOlder method, which in turn triggers the AgedOneYear event. This event is tracked on the person aggregate.

// GrowOlder command
func (person *Person) GrowOlder() {
	person.TrackChange(person, &AgedOneYear{})
}

Internally the TrackChange method calls the Transition method on the aggregate to transform the aggregate based on the newly created event.
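The relationship between TrackChange and Transition can be sketched as follows. This is a simplified stand-in written for illustration, not the module's actual implementation: the root, trackedEvent and transitioner types are hypothetical, and the real Event carries more fields.

```go
package main

import "fmt"

// trackedEvent is a hypothetical stand-in for the library's Event type.
type trackedEvent struct {
	Version int
	Data    interface{}
}

// transitioner is the part of the aggregate that the root calls back into.
type transitioner interface {
	Transition(data interface{})
}

// root is a simplified stand-in for eventsourcing.AggregateRoot.
type root struct {
	version int
	pending []trackedEvent // events that are tracked but not yet saved
}

// TrackChange records the event and immediately applies it to the aggregate.
func (r *root) TrackChange(a transitioner, data interface{}) {
	r.version++
	r.pending = append(r.pending, trackedEvent{Version: r.version, Data: data})
	a.Transition(data)
}

type AgedOneYear struct{}

type Person struct {
	root
	Age int
}

// Transition updates the person state based on the event data.
func (p *Person) Transition(data interface{}) {
	if _, ok := data.(*AgedOneYear); ok {
		p.Age++
	}
}

func main() {
	p := &Person{}
	p.TrackChange(p, &AgedOneYear{})
	p.TrackChange(p, &AgedOneYear{})
	fmt.Println(p.Age, p.version, len(p.pending)) // prints 2 2 2
}
```

Passing the aggregate to TrackChange is what lets the embedded root call back into the outer struct's Transition method, since an embedded struct has no reference to the struct that embeds it.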

To bind metadata to events use the TrackChangeWithMetadata method.

The internal Event looks like this.

type Event struct {
    // aggregate identifier 
    aggregateID string
    // the aggregate version when this event was created
    version         Version
    // the global version is based on all events (this value is only set after the event is saved to the event store) 
    globalVersion   Version
    // aggregate type (Person in the example above)
    aggregateType   string
    // UTC time when the event was created  
    timestamp       time.Time
    // the specific event data specified in the application (Born{}, AgedOneYear{})
    data            interface{}
    // data that doesn't belong to the application state (could be correlation id or other request references)
    metadata        map[string]interface{}
}

To access properties on the event you can use the corresponding methods exposing them, e.g. AggregateID(). This prevents external parties from modifying the event from the outside.

Aggregate ID

By default, the identifier on the aggregate is a randomly generated string created via the crypto/rand package. It is possible to change the default behavior in two ways.

  • Set a specific id on the aggregate via the SetID func.
var id = "123"
person := Person{}
err := person.SetID(id)
  • Change the id generator via the global eventsourcing.SetIDFunc function.
var counter = 0
f := func() string {
	counter++
	return fmt.Sprint(counter)
}

eventsourcing.SetIDFunc(f)

Event Repository

The event repository is used to save and retrieve aggregate events. The main functions are:

// saves the events on the aggregate
Save(a aggregate) error

// retrieves and builds an aggregate from events based on its identifier
// possible to cancel from the outside
GetWithContext(ctx context.Context, id string, a aggregate) error

// retrieves and builds an aggregate from events based on its identifier
Get(id string, a aggregate) error

The event repository constructor takes an event store as input. The event store handles the reading and writing of events, and the repository builds the aggregate based on those events.

NewRepository(eventStore EventStore) *Repository

Here is an example of a person being saved and fetched from the repository.

// the person aggregate has to be registered in the repository
repo.Register(&Person{})

// CreatePerson returns an error when the name is blank
person, err := CreatePerson("Alice")
if err != nil {
	// handle the error
}
person.GrowOlder()
repo.Save(person)
twin := Person{}
repo.Get(person.ID(), &twin)

Event Store

The only thing an event store handles are events, and it must implement the following interface.

// saves events to the underlying data store.
Save(events []core.Event) error

// fetches events based on identifier and type, but only those after a specific version. The version is used to load events that happened after a snapshot was taken.
Get(id string, aggregateType string, afterVersion core.Version) (core.Iterator, error)

Currently, there are four internal implementations.

  • SQL - go get github.com/hallgren/eventsourcing/eventstore/sql
  • Bolt - go get github.com/hallgren/eventsourcing/eventstore/bbolt
  • Event Store DB - go get github.com/hallgren/eventsourcing/eventstore/esdb
  • RAM Memory - part of the main module

And one external.

Custom event store

If you want to store events in a database other than the already implemented event stores (sql, bbolt, esdb and memory) you can implement, or provide, another event store. It has to implement the EventStore interface to support the eventsourcing.Repository.

type EventStore interface {
    Save(events []core.Event) error
    Get(id string, aggregateType string, afterVersion core.Version) (core.Iterator, error)
}

The event store needs to import the github.com/hallgren/eventsourcing/core module, which exposes the core.Event, core.Version and core.Iterator types.
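As a rough illustration of what a custom store has to do, the sketch below keeps events in a map. It deliberately avoids the core module so that it runs on its own: storedEvent is a hypothetical stand-in for core.Event, and Get returns a slice instead of a core.Iterator. The version check in Save is one possible way to reject conflicting writes and is an assumption, not a documented requirement.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// storedEvent is a hypothetical stand-in for core.Event.
type storedEvent struct {
	AggregateID   string
	AggregateType string
	Version       uint64
	Data          []byte
}

// mapStore keeps each aggregate's event stream in a map, guarded by a mutex.
type mapStore struct {
	mu      sync.Mutex
	streams map[string][]storedEvent
}

func newMapStore() *mapStore {
	return &mapStore{streams: make(map[string][]storedEvent)}
}

func streamKey(id, aggregateType string) string { return aggregateType + "/" + id }

// Save appends a batch of events for one aggregate. The whole batch is
// rejected if it mixes aggregates or does not continue the version sequence.
func (s *mapStore) Save(events []storedEvent) error {
	if len(events) == 0 {
		return nil
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	key := streamKey(events[0].AggregateID, events[0].AggregateType)
	next := uint64(len(s.streams[key])) + 1
	for i, e := range events {
		if streamKey(e.AggregateID, e.AggregateType) != key || e.Version != next+uint64(i) {
			return errors.New("event does not continue the aggregate's stream")
		}
	}
	s.streams[key] = append(s.streams[key], events...)
	return nil
}

// Get returns a copy of the events with a version greater than afterVersion.
func (s *mapStore) Get(id, aggregateType string, afterVersion uint64) ([]storedEvent, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	stream := s.streams[streamKey(id, aggregateType)]
	if afterVersion >= uint64(len(stream)) {
		return nil, nil
	}
	return append([]storedEvent(nil), stream[afterVersion:]...), nil
}

func main() {
	store := newMapStore()
	err := store.Save([]storedEvent{
		{AggregateID: "1", AggregateType: "Person", Version: 1},
		{AggregateID: "1", AggregateType: "Person", Version: 2},
	})
	fmt.Println(err) // prints <nil>

	events, _ := store.Get("1", "Person", 1)
	fmt.Println(len(events), events[0].Version) // prints 1 2
}
```

A store backed by a real database would typically enforce the same ordering with a transaction or a unique constraint on aggregate and version.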

Encoder

Before an eventsourcing.Event is stored in an event store it has to be transformed into a core.Event. This is done with an encoder that serializes the Data and Metadata properties into []byte. When an event is fetched, the encoder deserializes the Data and Metadata []byte back into their actual types.

The event repository has a default encoder that uses the encoding/json package for serialization/deserialization. It can be replaced by using the Encoder(e encoder) method on the event repository and has to follow this interface:

type encoder interface {
	Serialize(v interface{}) ([]byte, error)
	Deserialize(data []byte, v interface{}) error
}
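A type that satisfies this interface can be very small. The sketch below wraps encoding/json; the jsonEncoder name is hypothetical and is not the module's built-in encoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonEncoder satisfies the Serialize/Deserialize pair using encoding/json.
type jsonEncoder struct{}

// Serialize turns the value into JSON bytes.
func (jsonEncoder) Serialize(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

// Deserialize fills v, which must be a pointer, from JSON bytes.
func (jsonEncoder) Deserialize(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

type Born struct{ Name string }

func main() {
	enc := jsonEncoder{}

	b, err := enc.Serialize(&Born{Name: "Alice"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // prints {"Name":"Alice"}

	var out Born
	if err := enc.Deserialize(b, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.Name) // prints Alice
}
```

Any format works as long as Deserialize can reverse what Serialize produced, so the same shape applies to other encodings.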

Event Subscription

The repository exposes five ways to subscribe to events in real time as they are saved to the repository.

All(func (e Event)) *subscription subscribes to all events.

AggregateID(func (e Event), events ...aggregate) *subscription subscribes to events bound to specific aggregates based on type and identity. This makes it possible to get events pinpointed to one specific aggregate instance.

Aggregate(func (e Event), aggregates ...aggregate) *subscription subscribes to events bound to specific aggregate type.

Event(func (e Event), events ...interface{}) *subscription subscribes to specific events. There are no restrictions that the events need to come from the same aggregate, you can mix and match as you please.

Name(f func(e Event), aggregate string, events ...string) *subscription subscribes to events based on aggregate type and event name.

The subscription is real time: events that were saved before the subscription was set up will not be passed to the func(e Event) function. If the application depends on this functionality, make sure to call the Subscribe() function on the subscriber before storing events in the repository.

The event subscription enables the application to make use of the reactive patterns and to make it more decoupled. Check out the Reactive Manifesto for more detailed information.

Example of how to set up the event subscription and consume the event FrequentFlierAccountCreated:

// Setup a memory based repository
repo := eventsourcing.NewRepository(memory.Create())
repo.Register(&FrequentFlierAccountAggregate{})

// subscriber that triggers on every saved event
s := repo.Subscribers().All(func(event eventsourcing.Event) {
	switch e := event.Data().(type) {
	case *FrequentFlierAccountCreated:
		// e now has type info
		fmt.Println(e)
	}
})

// stop subscription
s.Close()

Snapshot

If an aggregate has a lot of events it can take some time to fetch its events and build the aggregate. This can be optimized with the help of a snapshot. The snapshot is the state of the aggregate at a specific version. Instead of iterating over all aggregate events, only the events after that version are iterated and used to build the aggregate. The use of snapshots is optional and is exposed via the snapshot repository.
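The saving can be shown with a small standalone sketch. The versioned and personSnapshot types and the load function are hypothetical; the version filter is done in memory here only to keep the example short, whereas an event store would normally return just the events after the snapshot version.

```go
package main

import "fmt"

// versioned pairs an event's effect with its aggregate version (hypothetical type).
type versioned struct {
	Version int
	AgeDiff int // each event adds this many years
}

// personSnapshot is the stored state at a given version.
type personSnapshot struct {
	Version int
	Age     int
}

// load starts from the snapshot and applies only newer events.
// It returns the rebuilt age and how many events were applied.
func load(snap personSnapshot, events []versioned) (age, applied int) {
	age = snap.Age
	for _, e := range events {
		if e.Version <= snap.Version {
			continue // already reflected in the snapshot
		}
		age += e.AgeDiff
		applied++
	}
	return age, applied
}

func main() {
	events := []versioned{{1, 1}, {2, 1}, {3, 1}, {4, 1}}

	// Without a snapshot every event is applied.
	fmt.Println(load(personSnapshot{}, events)) // prints 4 4

	// With a snapshot at version 3 only one event is applied.
	fmt.Println(load(personSnapshot{Version: 3, Age: 3}, events)) // prints 4 1
}
```

Both calls arrive at the same state; the snapshot only changes how much work is needed to get there.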

Snapshot Repository

The snapshot repository is used to fetch and save aggregate-based snapshots and events (if there are events after the snapshot version).

The snapshot repository is a layer on top of the event repository.

NewSnapshotRepository(snapshotStore core.SnapshotStore, eventRepo *EventRepository) *SnapshotRepository
// fetch the aggregate based on its snapshot and the events after the version of the snapshot
GetWithContext(ctx context.Context, id string, a aggregate) error

// only fetch the aggregate snapshot and not any events
GetSnapshot(ctx context.Context, id string, a aggregate) error

// store the aggregate events and then the snapshot
Save(a aggregate) error

// Store only the aggregate snapshot. Will return an error if there are events that are not stored on the aggregate
SaveSnapshot(a aggregate) error

// expose the underlying event repository.
EventRepository() *EventRepository

// register the aggregate in the underlying event repository
Register(a aggregate)

Snapshot Store

The snapshot store is built on the same design as the event store. It has to implement the following methods.

type SnapshotStore interface {
	Save(snapshot Snapshot) error
	Get(ctx context.Context, id, aggregateType string) (Snapshot, error)
}

Unexported aggregate properties

Because unexported properties on a struct cannot be serialized, the same limitation applies to aggregates. To work around this, there are optional callback methods that can be added to the aggregate struct.

type SnapshotAggregate interface {
	SerializeSnapshot(SerializeFunc) ([]byte, error)
	DeserializeSnapshot(DeserializeFunc, []byte) error
}

Example:

// aggregate
type Person struct {
	eventsourcing.AggregateRoot
	unexported string
}

// snapshot struct
type PersonSnapshot struct {
	UnExported string
}

// callback that maps the aggregate to the snapshot struct with the exported property
func (p *Person) SerializeSnapshot(m eventsourcing.SerializeFunc) ([]byte, error) {
	snap := PersonSnapshot{
		UnExported: p.unexported,
	}
	return m(snap)
}

// callback to map the snapshot back to the aggregate
func (p *Person) DeserializeSnapshot(m eventsourcing.DeserializeFunc, b []byte) error {
	snap := PersonSnapshot{}
	err := m(b, &snap)
	if err != nil {
		return err
	}
	p.unexported = snap.UnExported
	return nil
}

Projections

Projections are a way to build read-models based on events. A read-model is a way to expose data from events in a different form, one that is optimized for read-only queries.

If you want more background on projections, check out Derek Comartin's article Projections in Event Sourcing: Build ANY model you want! or Martin Fowler's CQRS.

Projection Handler

The Projection handler is the central part where projections are created. It's available from the event repository by the eventrepo.Projections property but can also be created standalone.

// access via the event repository
eventRepo := eventsourcing.NewEventRepository(eventstore)
ph := eventRepo.Projections

// standalone without the event repository
ph := eventsourcing.NewProjectionHandler(register, encoder)

The projection handler includes the event register and an encoder to deserialize events from an event store into application events.

Projection

A projection is created from the projection handler via the Projection() method. The method takes a fetchFunc and a callbackFunc and returns a pointer to the projection.

p := ph.Projection(f fetchFunc, c callbackFunc)

The fetchFunc must return (core.Iterator, error), i.e the same signature that event stores return when they return events.

type fetchFunc func() (core.Iterator, error)

The callbackFunc is called for every iterated event inside the projection. The event is typed and can be handled in the same way as the aggregate Transition() method.

type callbackFunc func(e eventsourcing.Event) error

Example: Creates a projection that fetches all events from an event store and handles them in the callback function.

p := eventRepo.Projections.Projection(es.All(0, 1), func(event eventsourcing.Event) error {
	switch e := event.Data().(type) {
	case *Born:
		// handle the event
	}
	return nil
})

Projection execution

A projection can be started in three different ways.

RunOnce

RunOnce fetches events from the event store one time. It returns true if there were events to iterate, otherwise false.

RunOnce() (bool, ProjectionResult)

RunToEnd

RunToEnd fetches events from the event store until it reaches the end of the event stream. A context is passed in, making it possible to cancel the projection from the outside.

RunToEnd(ctx context.Context) ProjectionResult

RunOnce and RunToEnd both return a ProjectionResult

type ProjectionResult struct {
	Error          		error
	ProjectionName 		string
	LastHandledEvent	Event
}
  • Error - set if the projection returned an error
  • ProjectionName - the name of the projection
  • LastHandledEvent - the last successfully handled event (can be useful during debugging)

Run

Run will run forever until the event consumer returns an error or it is canceled from the outside. When it hits the end of the event stream it will start a timer and sleep for the time set in the projection property Pace.

Run(ctx context.Context, pace time.Duration) error

A running projection can be triggered manually via TriggerAsync() or TriggerSync().

Projection properties

A projection has a set of properties that can affect its behavior.

  • Strict - Default true. It will trigger an error if a fetched event is not registered in the event Register. This forces all events to be handled by the callbackFunc.
  • Name - The name of the projection. Can be useful when debugging multiple running projections. The default name is the index at which it was created from the projection handler.

Run multiple projections

Group

A set of projections can run concurrently in a group.

g := ph.Group(p1, p2, p3)

A group is started with g.Start(), where each projection runs in a separate goroutine. Errors from a projection can be retrieved from the error channel g.ErrChan.

The g.Stop() method is used to halt all projections in the group, and it returns when all projections have stopped.

// create three projections
p1 := ph.Projection(es.All(0, 1), callbackF)
p2 := ph.Projection(es.All(0, 1), callbackF)
p3 := ph.Projection(es.All(0, 1), callbackF)

// create a group containing the projections
g := ph.Group(p1, p2, p3)

// Start runs all projections concurrently
g.Start()

// Stop terminate all projections and wait for them to return
defer g.Stop()

// handling error in projection or termination from outside
select {
	case err := <-g.ErrChan:
		// handle the error
	case <-doneChan:
		// stop signal from the out side
		return
}

The pace of the projection can be changed with the Pace property. The default is every 10 seconds.

If the pace is not fast enough for some scenario, it's possible to trigger the projections manually.

TriggerAsync(): Triggers all projections in the group and returns.

TriggerSync(): Triggers all projections in the group and waits for them to run to the end of their event streams.

Race

Compared to a group, the race is a one-shot operation. Instead of fetching events continuously, it's used to iterate and process all existing events and then return.

The Race() method starts the projections and runs them to the end of their event streams. When all projections are finished the method returns.

Race(cancelOnError bool, projections ...*Projection) ([]ProjectionResult, error)

If cancelOnError is set to true, the method will halt all projections and return if any projection returns an error.

The returned []ProjectionResult is a collection of all projection results.

Race example:

// create two projections
ph := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{})
p1 := ph.Projection(es.All(0, 1), callbackF)
p2 := ph.Projection(es.All(0, 1), callbackF)

// true makes the race return on error in any projection
result, err := ph.Race(true, p1, p2)

eventsourcing's People

Contributors

dependabot[bot], hallgren, hallgrenx, jen20, ksaveras, lundholmx, lunjon, tonpc64


eventsourcing's Issues

sql eventstore missing global order id

Needed to preserve global order between aggregate types.
The GlobalGet will also need to select on this global id instead of the Version that is aggregate dependent.

Data race

% go test -race
==================
WARNING: DATA RACE
Write at 0x00c0001b2320 by goroutine 2063:
  reflect.Value.SetString()
      /usr/local/go/src/reflect/value.go:2463 +0x7c
  encoding/json.(*decodeState).literalStore()
      /usr/local/go/src/encoding/json/decode.go:947 +0xc08
  encoding/json.(*decodeState).value()
      /usr/local/go/src/encoding/json/decode.go:388 +0x1a0
  encoding/json.(*decodeState).object()
      /usr/local/go/src/encoding/json/decode.go:755 +0xf88
  encoding/json.(*decodeState).value()
      /usr/local/go/src/encoding/json/decode.go:374 +0x78
  encoding/json.(*decodeState).unmarshal()
      /usr/local/go/src/encoding/json/decode.go:181 +0x300
  encoding/json.Unmarshal()
      /usr/local/go/src/encoding/json/decode.go:108 +0x1d0
  github.com/hallgren/eventsourcing.EncoderJSON.Deserialize()
      /Users/morganh/code/eventsourcing/encoderjson.go:12 +0x5c
  github.com/hallgren/eventsourcing.(*EncoderJSON).Deserialize()
      <autogenerated>:1 +0x20
  github.com/hallgren/eventsourcing.(*Projection).RunOnce()
      /Users/morganh/code/eventsourcing/projections.go:188 +0x3c4
  github.com/hallgren/eventsourcing.(*Projection).RunToEnd()
      /Users/morganh/code/eventsourcing/projections.go:141 +0xd8
  github.com/hallgren/eventsourcing.(*ProjectionHandler).Race.func1()
      /Users/morganh/code/eventsourcing/projections.go:293 +0xc4
  github.com/hallgren/eventsourcing.(*ProjectionHandler).Race.func2()
      /Users/morganh/code/eventsourcing/projections.go:306 +0x54

Previous write at 0x00c0001b2320 by goroutine 2062:
  reflect.Value.SetString()
      /usr/local/go/src/reflect/value.go:2463 +0x7c
  encoding/json.(*decodeState).literalStore()
      /usr/local/go/src/encoding/json/decode.go:947 +0xc08
  encoding/json.(*decodeState).value()
      /usr/local/go/src/encoding/json/decode.go:388 +0x1a0
  encoding/json.(*decodeState).object()
      /usr/local/go/src/encoding/json/decode.go:755 +0xf88
  encoding/json.(*decodeState).value()
      /usr/local/go/src/encoding/json/decode.go:374 +0x78
  encoding/json.(*decodeState).unmarshal()
      /usr/local/go/src/encoding/json/decode.go:181 +0x300
  encoding/json.Unmarshal()
      /usr/local/go/src/encoding/json/decode.go:108 +0x1d0
  github.com/hallgren/eventsourcing.EncoderJSON.Deserialize()
      /Users/morganh/code/eventsourcing/encoderjson.go:12 +0x5c
  github.com/hallgren/eventsourcing.(*EncoderJSON).Deserialize()
      <autogenerated>:1 +0x20
  github.com/hallgren/eventsourcing.(*Projection).RunOnce()
      /Users/morganh/code/eventsourcing/projections.go:188 +0x3c4
  github.com/hallgren/eventsourcing.(*Projection).RunToEnd()
      /Users/morganh/code/eventsourcing/projections.go:141 +0xd8
  github.com/hallgren/eventsourcing.(*ProjectionHandler).Race.func1()
      /Users/morganh/code/eventsourcing/projections.go:293 +0xc4
  github.com/hallgren/eventsourcing.(*ProjectionHandler).Race.func2()
      /Users/morganh/code/eventsourcing/projections.go:306 +0x54

Goroutine 2063 (running) created at:
  github.com/hallgren/eventsourcing.(*ProjectionHandler).Race()
      /Users/morganh/code/eventsourcing/projections.go:291 +0x154
  github.com/hallgren/eventsourcing_test.TestRace()
      /Users/morganh/code/eventsourcing/projections_test.go:304 +0x5c4
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1595 +0x1b0
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1648 +0x40

Goroutine 2062 (running) created at:
  github.com/hallgren/eventsourcing.(*ProjectionHandler).Race()
      /Users/morganh/code/eventsourcing/projections.go:291 +0x154
  github.com/hallgren/eventsourcing_test.TestRace()
      /Users/morganh/code/eventsourcing/projections_test.go:304 +0x5c4
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1595 +0x1b0
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1648 +0x40
==================
--- FAIL: TestRace (0.04s)
    testing.go:1465: race detected during execution of test
FAIL
exit status 1
FAIL	github.com/hallgren/eventsourcing	2.108s

document external api

  • event source on the aggregate level
  • repository
  • event store
  • snapshot store
  • overview

Event Store DB supported eventstore

Support the Event Store DB as an eventstore in this pkg.

  • GlobalVersion is not written back on Save. (The last events global version is written to all events)
  • Timestamp in event is from when event is written in the db not when it was created. (This is good enough)
  • esdb.EventData / ContentType need to handle when JSON is not used as serializer.
  • Get currently only fetches 10 events. (It now uses the max value of uint64)
  • Write documentation

Snapshot store

As of version 0.1.0 the snapshot store was removed from the eventsourcing module. The snapshot concept was previously part of the main module. The new idea is to re-create it as a new module and use it from the outside.

  1. Fetch a snapshot from the snapshot store
  2. Use the snapshot when fetching an aggregate from the repository.

This would work as before but now the snapshot logic is not part of the main module.

Get events by global version

Expose the possibility to retrieve events based on their global version.

This makes it possible to build read models asynchronously from when the events are written.

// Where the start is the event at the global position and the count is how many events that should be fetched.
// This should include the event at the start position and events written afterwards.
GlobalEvents(start uint64, count int) ([]Events, error)

More about this in Jim Webber's talk link

Break out the event to its own submodule

When a small change is introduced in the main module, all eventstore modules have to be re-released as they point to the event struct from the main package. If the event struct is in its own module, other modules would only depend on that module, making it easier to introduce changes without having to release all modules.

rethink the eventstore serializer relation

The interface between the event store and serializer looks like this:

// EventSerializer is the common interface a event serializer must uphold
type EventSerializer interface {
	SerializeEvent(event eventsourcing.Event) ([]byte, error)
	DeserializeEvent(v []byte) (event eventsourcing.Event, err error)
}

The problem is that different event stores (sql / bbolt / memory / ...) store data in different manners. Sql has the possibility to use several columns to store the event properties, while bbolt is a key value store and has to serialize the whole event.

The current interface forces every event store to handle the event in the same way. This is a limitation, and it should be reviewed whether it could be done in a better way.

counter / projection

hey,
cant figure out, how to make new projection with other projection events.
lets say i have Account model+created/updated and i want to query Count +account model created events

Support for Pocketbase

Hello,

thanks for great design.

I am using pocketbase and looking for an eventsourcing framework to use with it.

What do you think about adding support for Pocketbase, to use it as base for event and projection stores and to have REST endpoints for events etc.

json.Unmarshal does not deserialize Data property to correct type

The data behind the interface{} on the Data property gets the type map[string]interface{} after the json.Unmarshal. This is due to how a JSON object is decoded into an interface value.

map[string]interface{}, for JSON objects

The original type information is lost in translation. One solution is to look at how the https://github.com/jetbasrawi/go.geteventstore is solving it via the json.RawMessage type.

This example registers the aggregate and event types
https://github.com/jetbasrawi/go.cqrs/blob/master/examples/simplecqrs/simplecqrs/inventoryitem_repo.go
https://github.com/jetbasrawi/go.cqrs/blob/master/eventfactory.go
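A minimal sketch of the json.RawMessage approach, combined with a registry of event types, is shown below. It is an illustration of the general technique and not taken from the linked projects or from this module: the envelope struct, its Reason field and the registry map are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type Born struct{ Name string }

// envelope stores the event's type name next to its undecoded payload.
type envelope struct {
	Reason string
	Data   json.RawMessage
}

// registry maps a type name to a constructor for the concrete event type.
var registry = map[string]func() interface{}{
	"Born": func() interface{} { return &Born{} },
}

// decode looks up the concrete type and unmarshals the raw payload into it.
func decode(b []byte) (interface{}, error) {
	var env envelope
	if err := json.Unmarshal(b, &env); err != nil {
		return nil, err
	}
	newEvent, ok := registry[env.Reason]
	if !ok {
		return nil, fmt.Errorf("event %q is not registered", env.Reason)
	}
	event := newEvent()
	if err := json.Unmarshal(env.Data, event); err != nil {
		return nil, err
	}
	return event, nil
}

func main() {
	raw := []byte(`{"Reason":"Born","Data":{"Name":"Alice"}}`)

	// Decoding into interface{} loses the type: the object becomes a map.
	var loose struct{ Data interface{} }
	if err := json.Unmarshal(raw, &loose); err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", loose.Data) // prints map[string]interface {}

	// Decoding via the registry restores the concrete type.
	event, err := decode(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", event) // prints *main.Born
}
```

Keeping the payload as json.RawMessage delays decoding until the type name has been read, which is what makes the lookup possible.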

Re-design snapshot to not force exported properties

Currently an aggregate's properties have to be exported to be serialised into a snapshot. It should be possible to have unexported properties that could be stored in a snapshot. The current implementation forces the aggregate to expose properties that should not be visible to the outside.

Get events to return iterator or channel instead of slice

https://www.reddit.com/r/golang/comments/mp8j2h/event_sourcing_a_year_later/gu8euky?utm_source=share&utm_medium=web2x&context=3

ar3s3ru:
I would try to leverage channels instead of returning slices from the Event Store (this might cause issues with apps with very large streams, although snapshotting is always an option.) I've implemented this in eventually-go, you could check it out to gain inspiration.

Kirides:
i would rather not use channels, but instead prefer some sort of a cursor to iterate.

Channels are something i do not want to see in function return values, but are more-or-less fine in a closure over them, eg Iter(fn func(events <-chan Event)).

In any case, a stream approach is way better than just returning 100-1000 events per aggregate (our maximum number of events per object until a snapshot is made, depending on prod-early/prod)

ar3s3ru:
Channels are something i do not want to see in function return values

Absolutely agree, that's why in eventually channels are injected into functions.

The idea is: callers know about the concrete concurrency requirements of the application, so they should be the one creating channels. However, callees (e.g. Event Store) are the producers. So it's important that each implementation correctly closes the channel once done (and absolutely not do that on the caller side)

Check out eventually to get a better idea: https://github.com/eventually-rs/eventually-go

Close eventstream subscription

Currently it's not possible to close a subscription on the event stream. The repository could potentially call code that is no longer active in the normal application flow.

GetGlobalEvents seems to panic when called on the repository

runtime: goroutine stack exceeds 1000000000-byte limit
runtime: sp=0xc020180380 stack=[0xc020180000, 0xc040180000]
fatal error: stack overflow

runtime stack:
runtime.throw(0x6866d1, 0xe)
/usr/local/go/src/runtime/panic.go:1117 +0x72
runtime.newstack()
/usr/local/go/src/runtime/stack.go:1069 +0x7ed
runtime.morestack()
/usr/local/go/src/runtime/asm_amd64.s:458 +0x8f

goroutine 1 [running]:
github.com/hallgren/eventsourcing.(*Repository).GlobalEvents(0xc04017ff00, 0x4, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0)
/home/morganh/code/go/pkg/mod/github.com/hallgren/[email protected]/repository.go:103 +0x86 fp=0xc020180390 sp=0xc020180388 pc=0x54f8a6
github.com/hallgren/eventsourcing.(*Repository).GlobalEvents(0xc04017ff00, 0x4, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0)
/home/morganh/code/go/pkg/mod/github.com/hallgren/[email protected]/repository.go:104 +0x3f fp=0xc0201803e0 sp=0xc020180390 pc=0x54f85f
github.com/hallgren/eventsourcing.(*Repository).GlobalEvents(0xc04017ff00, 0x4, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0)
/home/morganh/code/go/pkg/mod/github.com/hallgren/[email protected]/repository.go:104 +0x3f fp=0xc020180430 sp=0xc0201803e0 pc=0x54f85f

snapshot handling

Make it possible to take a snapshot of an aggregate and use that to speed up re-builds of aggregates.

eventstore tests

Make use of the same test suite for all eventstore implementations, instead of the current setup where the same test suite is duplicated in each eventstore pkg.
