
goes's Introduction

goes - Event-Sourcing Framework


goes is a collection of interfaces, tools, and backend implementations that allow you to write event-sourced applications in Go.

If you have any questions or feedback, feel free to open an issue or start a discussion.

Getting Started

Read the documentation to get started. It's still a work in progress, but it's the best place to get started with development. You can also take a look at the "To-Do" example, which implements a simple event-sourced app that runs as a distributed application.

Contributing

TBD

License

Apache License, Version 2.0

goes's People

Contributors

bounoable · dependabot[bot] · groggy7 · olpie101


goes's Issues

Built-in commands

Add built-in commands for common actions:

Delete aggregate

command.DeleteAggregate() should delete an aggregate by deleting its events:

package example

func deleteAggregate(aggregateName string, aggregateID uuid.UUID, bus command.Bus) error {
    cmd := command.DeleteAggregate(aggregateName, aggregateID)
    return bus.Dispatch(context.TODO(), cmd)
}

Handle built-in commands

package main

import "github.com/modernice/goes/command"

func main() {
    var ctx context.Context
    var bus command.Bus

    errs, err := command.HandleBuiltin(ctx, bus)
    if err != nil {
        log.Fatalf("handle built-in goes commands: %v", err)
    }

    for err := range errs {
        log.Println(fmt.Errorf("handle command: %w", err))
    }
}

Development Tools

Event Store

  • delete events and fix aggregate versions of events
  • event migration

Cached Repository

Provide an Aggregate Repository with cache features.

  • Cache fetches of aggregates
  • Cache based on queries (#59)
  • Clear cache

package example

func example(repo aggregate.Repository) { // or aggregate.TypedRepository[...]
  cached := repository.NewCache(repo)

  // Only runs once
  q := query.New(...)
  cached.Query(context.TODO(), q)
  cached.Query(context.TODO(), q)

  // Only runs once
  id := uuid.New()
  cached.Fetch(context.TODO(), id)
  cached.Fetch(context.TODO(), id)

  // Clear cache
  cached.Reset()
}

Allowing Jetstream driver to use mirrors/sources

Not sure if this is within the scope of the project; however, we have a scenario where we would like to allow developers to mirror or source the goes stream from another account, letting them freely break their environment while still being able to source "real" data. The current implementation of ensureStream uses the subject to verify that the stream is configured correctly. Could stream verification also take mirrors and sources into consideration?

If there is another approach to handling this use case please let me know.
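
For illustration, a mirrored stream could be configured roughly like this with the nats.go JetStream API (how goes' stream verification would account for such a configuration is the open question):

package example

import "github.com/nats-io/nats.go"

func example(js nats.JetStreamContext) error {
  // Create a stream that mirrors the "real" goes stream from another
  // account instead of defining its own subjects.
  _, err := js.AddStream(&nats.StreamConfig{
    Name: "goes_mirror",
    Mirror: &nats.StreamSource{
      Name: "goes",
      // External: &nats.ExternalStream{...}, // cross-account source
    },
  })
  return err
}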

[MongoDB] Change field type of event data to `Object`

The data field of events is currently Binary, which is annoying when viewing the data within MongoDB Compass: BinData(0, "ey...."). The field type needs to be changed to make the data viewable.

I want to change the field type to Object, so the event data can be viewed and navigated in Compass.

It should be possible to implement this in a backwards-compatible way, so no migration of the event store is needed.

Initial projections in continuous schedule

A complex continuous projection job may create a projection target that needs all of the past events in order to fully build the projection state. Currently, when a continuous projection schedule triggers, only the events that triggered the schedule in that moment are applied "continuously" to the projections. A projection must be able to hint to the projection job that it needs the full event history for the initial run.

package example

type Foo struct {
  valid bool
}

func (f *Foo) ApplyEvent(e event.Event) {
  switch e.Name() {
  case "initialized":
    f.valid = true
  case "changed":
    if !f.valid {
      panic("not allowed")
    }
  }
}

func (f *Foo) RequiresFullHistory() bool {
  return !f.valid
}

Google PubSub backend implementation for the event.Bus

Summary

Provide an Event Bus backend implementation backed by Google PubSub event broker.

Motivation

We are deploying our application on GCE and we need an event bus to publish events so the application can use an event-driven architecture. One of the uses is Event Sourcing, allowing projectors to see events from aggregates.

We evaluated Google PubSub and saw that it supports Event Ordering. An event can provide an Ordering Key, and the infrastructure guarantees that events with the same Ordering Key will be delivered in order. Our intent is to use the Aggregate ID as the ordering key.

Proposal

The idea is to implement the event.Bus interface using the cloud.google.com/go/pubsub package. This implementation will take the nats.bus implementation as a base.

Using topicFunc (like subjectFunc in nats implementation), all events of a given aggregate type can be published on the same PubSub topic.

Also the Aggregate ID in the event can be used as an Ordering Key.
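
As a rough sketch of the publishing side (using cloud.google.com/go/pubsub; the topic naming and the event encoding are assumptions, not goes APIs):

package example

import (
  "context"

  "cloud.google.com/go/pubsub"
)

// publish shows the core idea: one topic per aggregate type, the aggregate id
// as the ordering key. Encoding of the event data is left out.
func publish(ctx context.Context, client *pubsub.Client, topicName, aggregateID string, data []byte) error {
  topic := client.Topic(topicName)
  topic.EnableMessageOrdering = true // required when using ordering keys

  res := topic.Publish(ctx, &pubsub.Message{
    Data:        data,
    OrderingKey: aggregateID, // events of the same aggregate are delivered in order
  })
  _, err := res.Get(ctx)
  return err
}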

Note

I'm using this implementation to learn about goes and its capabilities. Any suggestion or guidance is welcome.

How to use `SoftRestorer`?

Problem

When the event stream of an aggregate contains a SoftDeleter event, the aggregate can neither be queried nor fetched from the aggregate repository. How can an aggregate be restored if it cannot be fetched to raise the SoftRestorer event?

Example

package example

type RestoredEvent struct {}

func (RestoredEvent) SoftRestore() bool { return true }

func example(repo aggregate.Repository) {
  var foo aggregate.Aggregate // soft-deleted aggregate

  if err := repo.Fetch(context.TODO(), foo); err != nil {
    // fails with repository.ErrDeleted
  }

  // we want to do this
  aggregate.Next(foo, "restored", RestoredEvent{})
  repo.Save(context.TODO(), foo)
}

Proposal – context.Context API

The repository package could provide a "hidden" API using context.WithValue() to disable soft-deletion checks:

package example

func example(repo aggregate.Repository) {
  var foo aggregate.Aggregate

  ctx := repository.WithSoftDeleted(context.TODO())

  repo.Fetch(ctx, foo)
  aggregate.Next(foo, "restored", RestoredEvent{})
  repo.Save(context.TODO(), foo)
}

Drawbacks

  • hiding options behind a Context is considered bad design

Aggregate metadata/tagging

Allow aggregates to provide tags so that they can be queried more efficiently.

package foo

type Foo struct {
    *aggregate.Base
    *aggregate.Tags
}

func NewFoo(id uuid.UUID) *Foo {
    return &Foo{
        Base: aggregate.New("foo", id),
        Tags: &aggregate.Tags{},
    }
}

func example() {
    foo := NewFoo(uuid.New())

    foo.Tag("some-tag", "another-tag")
    foo.HasTag("some-tag")
    foo.Untag("some-tag", "another-tag")

    var repo aggregate.Repository

    str, errs, err := repo.Query(context.TODO(), query.New(
        query.Tag("some-tag", "another-tag"),
    ))
}

Don't decode events and commands before necessary

Example: Command bus receives a "command dispatched" event, constructs the command for internal processing. This requires the command registry to have the command payload registered even if the user did not subscribe to the command.

Solution: Don't construct the command before knowing that the user subscribed to the command.
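
A minimal sketch of the idea (names are illustrative, not goes APIs):

package example

// handleDispatchedCommand sketches lazy decoding: the raw payload of a
// dispatched command is only decoded once we know that a local handler
// subscribed to that command name.
func handleDispatchedCommand(name string, rawPayload []byte, subscribed map[string]bool, decode func(string, []byte) (interface{}, error)) error {
  if !subscribed[name] {
    return nil // not subscribed: skip decoding, no registry entry required
  }

  payload, err := decode(name, rawPayload)
  if err != nil {
    return err
  }

  _ = payload // construct the command and pass it to the local handler
  return nil
}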

Query Optimizer

An idea that came to my mind:

package example

func example(q1, q2 event.Query) {
  if query.IsSupersetOf(q2, q1) {
    log.Println("first query would return all events that the second query would return, and possibly more")
  }
}

If this could be implemented, projection jobs could further optimize queries for each individual projection. When a job applies itself onto multiple projections, it can check if a query that ran previously is a superset of the current query, and if so, just return the cached result from the previous query.

A query q1 is a superset of another query q2 if each of q1's filters/constraints is less restrictive than, or equally restrictive as, the corresponding filter of q2.

MongoDB Event Store Indices

Let users configure the builtin indices that are created by the MongoDB event store.

Motivation

Currently, the MongoDB event store creates 8 indices that are more or less useful/applicable for different kinds of queries. The goal is to reduce the default number of indices to a minimum without creating performance issues. If needed, users should be able to enable additional, "named" indices that boost the performance of specific queries.

Proposal

The proposal is to separate indices into two kinds: core and edge. Core indices are always created because goes' components require them to be performant. Edge-case indices can be enabled by applications that run less common queries which cannot use the core indices.

Core Indices

  • aggregate name + id + version: query aggregates by name (+ id (+ version))
  • id: fetch a specific event
  • name: query events by name
    • used in many projection startup queries
      • e.g. query.New(query.Name(FooEvent, BarEvent))
  • name + time: fetch all events of a given set of names, sorted by time
    • used in projection jobs when using the default query
  • aggregate name + version: query events of a specific aggregate type, up to a specific version
    • used in projection startup queries
      • e.g. query.New(query.AggregateName("foo"), query.AggregateVersion(1)) to fetch the first event of each "foo" aggregate to extract the aggregates from a projection job in the most performant way

Edge Case Indices

  • aggregate id: fetch a specific aggregate only by its id
  • aggregate version: useful when querying only by version
  • name + version: query a given set of events, up to a given version
    • can be used in projection startup queries
      • e.g. query.New(query.Name(FooEvent, BarEvent), query.AggregateVersion(1)) to extract aggregates from projection jobs

Enabling Indices

Edge-case indices can be enabled individually using feature flags:

package example

func example(enc event.Encoding) {
  store := mongo.NewEventStore(enc, mongo.WithIndices(
    indices.EventStore.AggregateID,
    indices.EventStore.AggregateVersion,
    indices.EventStore.NameAndVersion,
  ))
}

Concurrent Event Handling

Add a new handler.Workers() option to support handling multiple events in parallel without additional code.
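
Usage could look roughly like this (handler.Workers is the proposed option and does not exist yet):

package example

func example(bus event.Bus) {
  // The proposed handler.Workers(8) option would spawn 8 goroutines that
  // handle incoming events concurrently.
  h := handler.New(bus, handler.Workers(8))

  event.HandleWith(h, func(evt event.Event) {
    // handle "foo" events (possibly concurrently)
  }, "foo")

  errs, err := h.Run(context.TODO())
  if err != nil {
    panic(err)
  }

  for err := range errs {
    log.Println(err)
  }
}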

Testing setup

Command Bus testing

This is tedious:

package foo_test

func TestSomethingWithCommandBus(t *testing.T) {
    ereg := event.NewRegistry()
    ebus := chanbus.New()
    estore := eventstore.WithBus(memstore.New(), ebus)
    creg := command.NewRegistry()
    cbus := cmdbus.New(creg, ereg, ebus, estore)

    err := cbus.Dispatch(...)
}

Something like this should be possible:

package foo_test

func TestSomethingWithCommandBus(t *testing.T)  {
    setup := test.Setup(t)

    cbus := setup.CommandBus()

    err := cbus.Dispatch(...)
}

Command dispatches should be synchronous by default when using test.Setup:

package foo_test

func TestAsyncDispatch(t *testing.T) {
    setup := test.Setup(t)
    
    cbus := setup.CommandBus(dispatch.Asynchronous) // pass default dispatch options

    err := cbus.Dispatch(...)
}

Review projection APIs

This issue will determine if the different projection APIs are even necessary to solve common problems or if there are better solutions to these problems that don't require these APIs.

HistoryDependent

The HistoryDependent API is used by projection jobs to determine which events a projection needs to properly update itself. If a projection implements HistoryDependent, it can hint to a projection job that it requires the full history of the events that are configured in the job, instead of just the (published) events that triggered the job.

ProgressAware

The ProgressAware API is usually used by persistent projections that are stored in a database. When a projection job is triggered, the application fetches the current projection state from the database, applies the events on it and saves it back to the database. A ProgressAware projection keeps track of the last applied event in terms of the event time, to ensure that no event is applied twice to the projection.
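
For illustration, a projection that uses both APIs might look roughly like this (a sketch; the embedded *projection.Progressor provides the progress tracking described above):

package example

type FooView struct {
  *projection.Progressor // tracks the time of the last applied event (ProgressAware)

  built bool
  Total int
}

// RequiresFullHistory implements HistoryDependent: request the full event
// history from the projection job until the view has been built once.
func (v *FooView) RequiresFullHistory() bool { return !v.built }

func (v *FooView) ApplyEvent(evt event.Event) {
  v.built = true
  v.Total++
}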

contrib/auth: Permission Granter

Implement a PermissionGranter within the authorization module that grants permissions automatically.

Background

Currently, there is no automated way to grant permissions to actors on specific events. In order to grant permissions when an event is published, the developer must manually implement an event handler that dispatches the GrantToXXX() commands. Additionally, if a developer adds new permissions/actions to the handler, past events must trigger the handler so that actors and roles are granted the new permissions on already existing aggregates.

Proposal

Proposal is to implement a PermissionGranter that can be added to applications as a background task. Users define the events to subscribe to and the events provide the permissions to grant or revoke.

package example

// FooEvent is user-defined event data
type FooEvent struct{ ... }

func (evt FooEvent) GrantPermissions(g auth.EventGranter) error {
  // grant actions on the aggregate of the event (evt.Aggregate()) to the given actor
  if err := g.GrantToActor(<actor-id>, "action-1", "action-2"); err != nil {
    return err
  }

  // revoke actions on the aggregate of the event (evt.Aggregate()) from the given actor
  if err := g.RevokeFromActor(<actor-id>, "action-1", "action-2"); err != nil {
    return err
  }

  // grant actions on the aggregate of the event (evt.Aggregate()) to the given role
  if err := g.GrantToRole(<role-id>, "action-1", "action-2"); err != nil {
    return err
  }

  // revoke actions on the aggregate of the event (evt.Aggregate()) from the given role
  if err := g.RevokeFromRole(<role-id>, "action-1", "action-2"); err != nil {
    return err
  }

  return nil
}

func example(lookup *auth.Lookup,  bus event.Bus, store event.Store) {
  granter := auth.NewPermissionGranter(
    lookup, // lookup is used to lookup actor and role ids
    bus,
    auth.StartupGrant(store), // optionally handle events from the event store when the task is started

    // initial permissions for roles and actors
    auth.InitialRoleGrant(<role-name>, aggregate.Ref{...}, "action-1", "action-2", "..."),
    auth.InitialActorGrant(<actor-string-id>, aggregate.Ref{...}, "action-1", "action-2", "..."),
  )

  errs, err := granter.Run(context.TODO())
}

A Role() method needs to be added to the auth.Lookup for looking up role ids from role names.

View event data in MongoDB Compass as JSON

Summary

Provide a JSON view of event data in a MongoDB event store.

Motivation

During development or debugging, developers often need to view the event data of stored events. This is currently not possible through MongoDB Compass because the event data is stored in binary:

(Screenshot: event data displayed as BinData in MongoDB Compass)

In order to view the event data, the developer would have to write a program that fetches the event from the store and decodes it using the registered decoder.

Storing the event data directly as JSON into the store is not an option because it is not guaranteed that events are encoded/decoded as JSON.

Proposal

Proposal is to implement a service that builds an eventually consistent JSON view of all events in the event store within a separate collection/database. If a developer requires a JSON view, the service can be easily added to an application and MongoDB Compass can still be used as the UI to search through the events.

package example

func example(store event.Store, bus event.Bus, col *mongo.Collection) {
  // use the events in `store` to fill the JSON view collection `col` on startup.
  // continuously build the view of published events by subscribing to `bus`.
  svc := jsonview.New(store, bus, col)

  errs, err := svc.Run(context.TODO())
}

Tasks

To be able to use the projection scheduling system within the JSON view builder, the scheduling system needs to be extended to allow for wildcard event subscriptions (because the view builder must subscribe to all events).

Automated lookups

package user

const Aggregate = "user"
const NameLookup = "name"
const EmailLookup = "email"

const CreatedEvent = "user.created"

type CreatedData struct {
  Name string
  Email string
}

func (evt CreatedData) ProvideLookup(p lookup.Provider) {
  p.Provide(Aggregate, NameLookup, evt.Name)
  p.Provide(Aggregate, EmailLookup, evt.Email)
}

// somewhere else

func example(ctx context.Context, store event.Store) {
  l := lookup.New(store, CreatedEvent) // create lookup from events in event store

  errs, err := l.Run(ctx) // run the lookup projection
  _, _ = errs, err

  userID, err := l.Lookup(ctx, Aggregate, EmailLookup, "[email protected]")
  _, _ = userID, err
}

  • only allows string values or use interface{}?
  • reverse lookups?

Publish events using MongoDB Change Streams

By default, events are not published over a bus when inserted into a store. To automatically publish inserted events, a decorator can be used:

package example

func example(store event.Store, bus event.Bus) {
  // store publishes events over the given bus after insertion into the store
  store = eventstore.WithBus(store, bus)
}

This has a flaw: when the process crashes between the insert and the publish, the event will never be published, causing a "data loss" (not really, since the event was inserted into the store). If such an event triggered a projection, the projection would likely be in an invalid state until it is manually reset by a user/developer, because there is no way for a projection to know that it is missing events (not even through a projection.Progressor).

Proposal

Proposal is to create a service that uses the MongoDB Change Streams feature to tail the oplog of the event store. When an event is inserted into the database, the oplog notifies the service, which then publishes the event over the event bus.

Change Streams provide a "resume token" which can be used to resume tailing the oplog after a service crash/restart. This should ensure that no events are "dropped" and that all inserted events are eventually published.
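
A rough sketch of such a service using the official Go driver (decoding and publishing of the event are omitted; the collection layout is an assumption):

package example

import (
  "context"

  "go.mongodb.org/mongo-driver/bson"
  "go.mongodb.org/mongo-driver/mongo"
  "go.mongodb.org/mongo-driver/mongo/options"
)

// tail watches the event collection and hands every inserted document to
// publish. resumeToken allows resuming after a crash/restart.
func tail(ctx context.Context, col *mongo.Collection, resumeToken bson.Raw, publish func(bson.Raw) error) error {
  opts := options.ChangeStream()
  if resumeToken != nil {
    opts.SetResumeAfter(resumeToken)
  }

  cs, err := col.Watch(ctx, mongo.Pipeline{}, opts)
  if err != nil {
    return err
  }
  defer cs.Close(ctx)

  for cs.Next(ctx) {
    if err := publish(cs.Current); err != nil {
      return err
    }
    resumeToken = cs.ResumeToken() // persist this to resume after restarts
  }
  return cs.Err()
}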

Projection finalizers

Currently, projection finalization is implemented like this:

package example

type Foo struct { ... }

func (*Foo) ApplyEvent(event.Event) {}

func (f *Foo) finalize(ctx context.Context, dep SomeDependency) error {
  // do stuff
  if err := dep.Do(ctx, "..."); err != nil {
    return err
  }
  // do more stuff
  return nil
}

func example(s projection.Schedule) {
  var dep SomeDependency

  s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    refs, errs, err := ctx.Aggregates(ctx)
    if err != nil {
      return fmt.Errorf("extract aggregates: %w", err)
    }

    return streams.Walk(ctx, func(ref aggregate.Ref) error {
      foo := NewFoo(ref.ID)

      if err := ctx.Apply(ctx, foo); err != nil {
        return err
      }

      return foo.finalize(ctx, dep)
    }, refs, errs)
  })
}

Finalization is done for each individual projection after the job applies its events. A nice addition would be if finalization could be batched and deferred to the end of the projection job, like this:

package example

func example(s projection.Schedule) {
  var dep SomeDependency

  s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    refs, errs, err := ctx.Aggregates(ctx)
    if err != nil {
      return fmt.Errorf("extract aggregates: %w", err)
    }

    return streams.Walk(ctx, func(ref aggregate.Ref) error {
      foo := NewFoo(ref.ID)

      if err := ctx.Apply(ctx, foo); err != nil {
        return err
      }

      return ctx.Defer(func() error {
        // this call will be deferred to after this projection update
        return foo.finalize(ctx, dep)
      })
    }, refs, errs)
  })
}

Human-readable command errors

Provide a method to return a human-readable error from a command handler back to the command dispatcher when using the dispatch.Synchronous option.

Command handler in some service:

package example

func handleCommands(ctx context.Context, bus command.Bus) <-chan error {
    h := command.NewHandler()

    return h.MustHandle(ctx, "foo-cmd", func(ctx command.Context) error {
        cmd := ctx.Command()

        err := errors.New("mock error") // some error returned by domain layer

        return command.Fail(err, "Something went wrong while handling %q command.", cmd.Name())
    })
}

func main() {
    var bus command.Bus

    errs := handleCommands(context.TODO(), bus)

    for err := range errs {
        var cmdErr *command.Error
        errors.As(err, &cmdErr) // == true

        // err.Error() == `Something went wrong while handling "foo-cmd" command.`
        // cmdErr.Unwrap().Error() == "mock error"
    }
}

Command dispatcher in another service:

package example

func dispatchCommand(ctx context.Context, bus command.Bus) {
    cmd := command.New("foo-cmd", ...)

    err := bus.Dispatch(ctx, cmd, dispatch.Synchronous())

    // err.Error() == `Something went wrong while handling "foo-cmd" command.`

    var cmdErr *command.Error
    errors.As(err, &cmdErr) // == true

    // cmdErr.Unwrap().Error() == "mock error"
}

Projection debounce wait cap

Problem

Currently, the Debounce(time.Duration) option of the continuous schedule has no cap on the total wait time before a projection job is triggered. Applications that continuously publish a lot of events could cause infinite buffering of projection events.

Proposal

Add a DebounceCap(time.Duration) option that specifies the wait cap. Use a sensible duration as the default (5s maybe?).
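
Usage might look roughly like this (Debounce exists today; DebounceCap is the proposed option, and the schedule constructor name is assumed):

package example

func example(bus event.Bus, store event.Store) {
  s := schedule.Continuously(bus, store, []string{"foo", "bar"},
    schedule.Debounce(100*time.Millisecond), // wait for bursts of events to settle
    schedule.DebounceCap(5*time.Second),     // ...but never delay the job longer than 5s in total (proposed)
  )
  _ = s
}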

Introduce Query Parameter in `handler.WithStore()` for Selective Startup Queries

This proposal is to enhance the handler.WithStore() option of the event handler by introducing a query option. This would allow constraining the events that are fetched during startup to, for example, a specific timeframe:

package example

import (
  "log"
  "github.com/modernice/goes/event/handler"
  "github.com/modernice/goes/event/query"
  "github.com/modernice/goes/event/query/time"
)

func example() {
  h := handler.New(bus, handler.WithStore(store, query.Time(
    time.After(time.Now().AddDate(0, 0, -7)), // Fetches only the "foo" events from the past 7 days
  ))

  event.HandleWith(h, handleEvent, "foo")

  errs, err := h.Run(context.TODO())
  // handle err

  for err := range errs {
    log.Println(err)
  }
}

func handleEvent(evt event.Event) {
  // event handling logic
}

Background & Motivation

Consider a Mailer that subscribes to events to dispatch emails. The Mailer must be resilient to crashes, so it has to check for missed events on service startup.

We can construct this Mailer as an event handler using the handler.Handler type and handler.WithStore() option provided by goes. The WithStore() option will fetch the events that are registered in the handler from the event store. This could potentially take a lot of time, depending on the size of the event store.

If this proposal were implemented, startup performance could be improved by narrowing down the event query's temporal scope, limiting the timeframe for which the Mailer checks for missed events. This could greatly reduce startup time on server restarts.

Additionally, the handler.Handler can be upgraded to support periodic handler restarts, which guarantees that the handler captures any previously overlooked events, all while not excessively consuming computational resources when the event store is very large.

Considerations

The handler.WithStore() option could become difficult to modify in the future without breaking compatibility. We could adjust the design to handler.WithStore(store, handler.StartupQuery(query.New(...))). This may slightly compromise DX, but it would ensure the API is future-proof.

We could also deprecate handler.WithStore() and create a new handler.Startup() option, which would better describe the feature.

To-Do

  • Create another issue for the mentioned "periodic restarts" feature

Nil-UUID safe-guards

goes currently doesn't distinguish between Nil-UUIDs and valid UUIDs. This might lead to problems, notably within aggregates and projections, where Nil-UUIDs are treated as legitimate.

To tackle this, we could introduce protective measures against Nil-UUIDs in both the aggregate and event packages. This change could lead to backwards incompatibility, but the potential for reducing errors and improving overall system robustness makes it seem worthwhile.
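
A minimal sketch of such a safe-guard (the function and error value are hypothetical, for illustration only):

package example

// guardID is a hypothetical check that constructors such as aggregate.New
// could perform to reject Nil-UUIDs instead of treating them as legitimate ids.
func guardID(id uuid.UUID) error {
  if id == uuid.Nil {
    return errors.New("aggregate: id must not be the nil UUID")
  }
  return nil
}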

Wildcard Subscriptions

Summary

Implement wildcard event subscriptions.

Motivation

While the event store has the capability of querying all events, the event bus requires explicit event names to be passed in order to subscribe to the events. Because of this, a projection cannot depend on all events of an application (it is possible but requires each event name to be passed when subscribing). Users should be able to subscribe to all events using a wildcard.

Proposal

Proposal is to define * as a wildcard for event subscriptions but as an optional feature of an event bus. Event bus implementations may choose to implement wildcard subscriptions but don't have to.
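
Usage could then look like this (assuming event.Bus's Subscribe signature; the "*" wildcard is the proposed, optional feature):

package example

func example(ctx context.Context, bus event.Bus) {
  // "*" subscribes to all events, if the bus implementation supports wildcards.
  events, errs, err := bus.Subscribe(ctx, "*")
  if err != nil {
    panic(err)
  }
  _, _ = events, errs
}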

Notes

  • NATS supports wildcard subscriptions out of the box, so only the in-memory event bus must be changed to support wildcards

WIP

Change how registries work

Currently, events & commands must be registered using a factory function:

package example

type fooData struct{}

func registerEvents(r event.Registry) {
    r.Register("foo", func() event.Data { return fooData{} })
}

type fooPayload struct{}

func registerCommands(r command.Registry) {
    r.Register("foo", func() command.Payload { return fooPayload{} })
}

event.Data & command.Payload are both empty interfaces. Both registries can be combined into a single implementation that uses interface{}es instead (single implementation can also make use of generics when they land).

New design:

package codec

type Encoding interface {
    Encode(w io.Writer, name string, val interface{}) error
    Decode(name string, r io.Reader) (interface{}, error)
}

// Encoder is the encoder for a specific event data or command payload.
type Encoder interface {
    Encode(w io.Writer, val interface{}) error
}

// Decoder is the decoder for a specific event data or command payload.
type Decoder interface {
    Decode(io.Reader) (interface{}, error)
}

type Registry struct { ... } // implements Encoding

// Register the encoder & decoder for specific event data or command payload.
func (r *Registry) Register(name string, enc Encoder, dec Decoder)

// Gob registers the encoding for the event data or command payload with the given name.
// It uses the provided factory function to make the event data and encodes & decodes it using encoding/gob.
//
// This is basically the previous implementation.
func (r *Registry) Gob(name string, make func() interface{})

Event subscription helper/tools

Wait for a single event to be published, then cancel the subscription

  • Done

package example

func example(ctx context.Context, bus event.Bus) {
    evt, errs, err := eventbus.Await(ctx, bus, "foo-evt")
    if err != nil {
        panic(err)
    }

    select {
    case <-ctx.Done():
        panic(ctx.Err())
    case err := <-errs:
        panic(err)
    case e := <-evt:
        log.Println(e)
    }
}

Add `aggregate.IsConsistencyError`

The MongoDB event store implements its own VersionError and an IsConsistencyError() function. Aggregates that implement repository.Retryer must use the MongoDB-specific function if the MongoDB event store is used. There should be a single aggregate.IsConsistencyError function to check for consistency errors. Implementation-specific errors can then provide a method that tells aggregate.IsConsistencyError() whether the error is a consistency error.

package mongo

type VersionError struct { ... }

func (err VersionError) IsConsistencyError() bool {
  return true
}

package example

func example() {
  var verr mongo.VersionError
  aggregate.IsConsistencyError(verr) // == true

  var cerr aggregate.ConsistencyError
  aggregate.IsConsistencyError(cerr) // == true
}

aggregate.Event helper type

package auth

import "github.com/modernice/goes/aggregate"

const UserRegistered = aggregate.Event("auth.user.registered")

type User struct {
  *aggregate.Base
}

func (u *User) Register(email string) {
  UserRegistered.New(u, email)
}

Add a way to force-refresh projections

When a projection type embeds a *project.Progressor and the projection logic is changed, it may be necessary to invalidate the projections in the database.

Ideas

Trigger option

A Trigger option that forces a complete re-build could be provided:

package example

func example(schedule project.Schedule) {
    schedule.Trigger(context.TODO(), project.Fully())
}

The above solution is quite low-level and would require developers to implement such re-builds themselves for every possible projection type they create.

Trigger option in combination with some kind of Schedule registry/list

package example

func example(schedule project.Schedule, list project.ScheduleList) {
    list.Trigger(context.TODO()) // trigger all Schedules in the list
    list.Trigger(context.TODO(), project.Fully())
}

The above solution would only work within a single service.

Dedicated projection service in combination with CLI

A dedicated projection service could be provided that can be accessed through a CLI.

Projection service:

package main

func main() {
    svc := project.NewService()
    http.ListenAndServe(":8000", svc)
}

Some service with projections:

package main

func main() {
    var bus event.Bus
    var store event.Store

    client := project.NewClient(":8000")

    schedule := project.Continuously(bus, store, []string{"foo", "bar", "baz"})

    err := client.RegisterSchedule("example", schedule)
}

Rebuild using CLI:

goes projection rebuild -name example

Rebuild using API:

package main

func main() {
    var client *project.Client

    err := client.Trigger(context.TODO(), project.Fully())
}

Event-driven projection service in combination with CLI

Register Schedule in implementing service:

package main

func main() {
    var ebus event.Bus
    var estore event.Store
    var cbus command.Bus

    schedule := project.Continuously(ebus, estore, []string{"foo", "bar", "baz"})

    svc := project.NewService(cbus, project.RegisterSchedule("example", schedule))

    errs, err := svc.Run(context.TODO())
}

Rebuild using CLI:

goes projection rebuild -name example

Rebuild using API:

package main

func main() {
    var cbus command.Bus

    svc := project.NewService(cbus)

    err := svc.Trigger(context.TODO(), "example", project.Fully())
}

Soft Deletes

The builtin command to delete an aggregate deletes all events of the aggregate from the event store (useful to comply with GDPR). Normally, we don't want to actually delete events from the event store; instead, we want to publish an event that "marks" the aggregate as deleted and prevents it from being queryable, without losing the aggregate's events.

Proposal

User-defined events may provide a SoftDelete() bool method that the aggregate repository uses during queries and fetches to determine if an aggregate has been soft-deleted. Within a query, a soft-deleted aggregate is simply excluded from the query result. Within a fetch, a new repository.ErrDeleted error is returned. To revert the deletion (or "restore" the aggregate), events may provide a SoftRestore() bool method that restores the aggregate, including it in query results again and allowing it to be fetched.
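
For illustration, a soft-delete event could look like this (mirroring the RestoredEvent example from the SoftRestorer issue above):

package example

type DeletedEvent struct{}

// SoftDelete marks the aggregate as soft-deleted once this event is part of
// its event stream.
func (DeletedEvent) SoftDelete() bool { return true }

func example(foo aggregate.Aggregate, repo aggregate.Repository) {
  aggregate.Next(foo, "deleted", DeletedEvent{})
  repo.Save(context.TODO(), foo)

  // Subsequent fetches fail with repository.ErrDeleted, and queries exclude
  // the aggregate from their results.
}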

Aggregate Command Handler

Goal

package example

type FooEvent struct { ... }

type BarEvent struct { ... }

type Foo struct {
  *aggregate.Base
  *command.AggregateHandler
}

type FooRepository = aggregate.TypedRepository[*Foo]

func NewFoo(id uuid.UUID) *Foo {
  f := &Foo{
    Base: aggregate.New(...),
    AggregateHandler: command.NewAggregateHandler(),
  }

  command.HandleWith(f, func(ctx command.Ctx[FooData]) error {
    return f.Foo(...)
  }, "foo")

  command.HandleWith(f, func(ctx command.Ctx[BarData]) error {
    return f.Bar(...)
  }, "bar")

  return f
}

func (f *Foo) Foo() error {
  aggregate.Next(f, ...)
  return nil
}

func (f *Foo) Bar() error {
  aggregate.Next(f, ...)
  return nil
}

func example(bus command.Bus, repo FooRepository) {
  h := command.NewHandlerOf(repo, bus)

  // command names are extracted from a method on the aggregate
  errs, err := h.Handle(context.TODO())
  // or: errs := h.MustHandle(context.TODO())
  _, _ = errs, err
}

Find testing functions

Aggregate testing

package goestest

// Change tests that the provided Aggregate has an uncommitted change with the given eventName.
func Change(_ *testing.T, _ aggregate.Aggregate, eventName string, _ ...ChangeOption)

// NoChange tests that the provided Aggregate has no uncommitted change with the given eventName.
func NoChange(_ *testing.T, _ aggregate.Aggregate, eventName string, _ ...ChangeOption)

// WithEventData returns a ChangeOption that also compares the data of an uncommitted change with the provided data.
func WithEventData(data event.Data) ChangeOption {
}

Std IN / OUT

Is there a std IN / OUT interface?

So that "workers" can just use it to interact with the broker.

Std in / out is universal across any language, but also simple.

The idea is for the bus to be the data plane that picks up the std in / out.

Query Cache

Implement a flexible cache for event queries.

Features

  • Automatic cache key based on the query filters
  • Time-based and manual expiry
  • Optional cache storage

Examples

Basic usage

package example

import (
  "context"
  "log"

  "github.com/modernice/goes/event"
  "github.com/modernice/goes/event/query"
  "github.com/modernice/goes/helper/streams"
)

func example(store event.Store, q event.Query) {
  cache := query.NewCache(store)

  result, err := cache.Result(context.TODO(), q)
  // handle err

  if result.Cached {
    log.Println("cache returned cached result")
  } else {
    log.Println("cache executed query")
  }

  events, err := streams.Drain(context.TODO(), result.Events, result.Errs)
  // handle err

  // alternatively

  str, errs, err := cache.Run(context.TODO(), q)
  // handle err

  events, err = streams.Drain(context.TODO(), str, errs)
  _, _ = events, err
}

contrib/auth: make `Granter` usable from non-auth services

Problem

Currently, the Granter has dependencies on ActorRepositories and RoleRepository, which, in a well-designed system, are not accessible from services other than an application's auth service. This means the permission granter, in its current design, can only be used within the auth service. Implementing permissions for each service of an app within the auth service makes the auth service dependent on all services that require permissions (because the auth service must handle the permission events of the other services).

Each service should be able to implement its own permission granter that grants and revokes permissions via a command bus.

Proposal

Proposal is to remove the repository dependencies from the granter and replace them with a single CommandClient dependency. The CommandClient exposes the different grant and revoke commands as an interface so that it can be implemented using an underlying command bus but also using actor and role repositories.

package auth

type CommandClient interface {
  GrantToActor(...) error
  GrantToRole(...) error
  RevokeFromActor(...) error
  RevokeFromRole(...) error
}

Using a Command Bus

package example

func example(events []string, bus command.Bus) {
  granter := auth.NewGranter([]string{"foo", "bar", "baz"}, auth.CommandBusClient(bus))
}

Using Repositories

It is still possible to implement the granter as described above (with dependencies to other services) using repositories.

package example

func example(events []string, actors ActorRepositories, roles RoleRepository) {
  client := auth.RepositoryCommandClient(actors, roles)
  g := auth.NewGranter(events, client)
}

Authorization System

Provide a generic permission system that can be used by applications to grant and revoke aggregate-specific permissions from users.

Background

Many applications need some kind of authorization system that grants specific users permission to perform certain kinds of actions. Many different authorization patterns and libraries exist in the wild that can and should be used if the authorization requirements are more complex than what this RFC tries to solve.

The goal of this RFC is to implement a permission system for goes aggregates, where users are granted permission to perform specific actions on specific aggregate instances. This should cover the authorization needs of most simple to medium-complexity apps.

Example

Consider an ecommerce app where a customer places an order for some products. Backend users of the ecommerce app need several different permissions to act on the order (view, delete, update etc.). The customer also needs permissions to view and update the order but has no user account. Backend users should get access to the order solely through their role, while the customer needs access through some kind of secret that is provided to them in the order mails. Based on this, the app requires

  • role-based authorization for backend users, and
  • action-based authorization for "API key / secret key" users.

Proposal

Proposal is to implement an authorization system that consists of three concepts:

  • Actions
  • Actors
  • Roles

An action represents any kind of action that can be performed against an aggregate. Permissions to perform an action are granted to actors and roles.

An actor represents a user of the application, which can be of any type (real-world human, system user, API key etc.).

A role represents a group of actions that are granted to actors. An actor that has a role may perform any action that was granted to that role.

Actors

An Actor represents a user within the system. An actor can be anything, from a real-world human to a system user to an API key.

Example: Account User

package example

func example(userID, orderID uuid.UUID) {
  actor := auth.NewActor(userID)
  actor.Grant("order", orderID, "view", "update", "...")
}

Example: System User

package example

func example(orderID uuid.UUID) {
  actor := auth.NewActor(uuid.New())
  actor.Grant("order", orderID, "*") // grant all actions
  actor.Grant("order", uuid.Nil, "*") // grant all actions on all orders
  actor.Grant("*", uuid.Nil, "*") // grant everything
}

Example: Secret Key User

package example

func example(secret string, orderID uuid.UUID) {
  actor := auth.NewStringActor(uuid.New())

  // string-actors must be identified before they can be granted permissions
  err := actor.Grant("order", orderID, "view")
  errors.Is(err, auth.ErrMissingID) // == true

  actor.Identify(secret)
  actor.Grant("order", orderID, "view")
}

Roles

A Role grants a group of actors the permission to perform an arbitrary amount of actions.

package example

const AdminRole = "admin"

func example(orderID uuid.UUID) {
  role := auth.NewRole(uuid.New())

  // first, a role must be given a name
  role.Identify("admin")

  // then it can be granted permissions
  role.Grant("order", orderID, "view", "update", "...")
}

Actors are added to and removed from roles:

package example

func example(r *auth.Role, actors []uuid.UUID) {
  r.Add(actors...)
  r.Remove(actors...)
}

Permissions

Permissions is a read-model that projects the permissions of a specific actor from actor events and role events.

package example

func example(actorID, orderID uuid.UUID) {
  perms := auth.NewPermissions(actorID)

  // apply projection ...

  canView := perms.Allows("view", "order", orderID)
  cannotView := perms.Disallows("view", "order", orderID)
}

HTTP Middleware

HTTP middleware that can be used to add authorization to HTTP APIs is provided.

package example

func example(perms auth.PermissionRepository) {
  // create middleware that attaches the id of the current actor to the request context
  mw := middleware.Authorize(func(a middleware.Authorizer, r *http.Request) {
    // a.Authorize() may be called multiple times to authorize multiple actors for the current request
    a.Authorize(auth.NewUUIDActor(<extract-user-id-from-request>))

    // middleware.Authorizer provides lookup for ids of actors other than uuid-Actors
    sid := "<extract-string-id-from-request>"
    id, err := a.Lookup(sid)
    _ = err

    a.Authorize(auth.NewStringActor(id))
  })

  // create middleware that extracts the actor id from the "fooId" JSON field of the request body
  // as a UUID and attaches it to the request context
  mw = middleware.AuthorizeField("fooId", uuid.Parse, auth.NewUUIDActor)

  // create middleware for the "view" action of the aggregate returned by the passed function.
  // if any of the authorized actors is allowed to do the action, the middleware calls the handler
  mw = middleware.Permission(perms, "view", func(r *http.Request) aggregate.Ref {
    return aggregate.Ref{Name: "foo", ID: "<extract-from-request>"}
  })

  // create middleware for the "view" action of the "foo" aggregate. the id of the aggregate is
  // extracted from the "fooId" JSON field of the request body
  mw = middleware.PermissionField(perms, "view", "foo", "fooId")

  // create middleware factory to avoid having to pass the PermissionRepository for each middleware
  f := middleware.NewFactory(perms)
  mw = f.Permission("view", func(r *http.Request) aggregate.Ref { ... })
  _ = mw
}

Enriched command errors

Problem

Currently, the command bus has limited capabilities for handling errors returned by the command handler. Specifically, it can only retain the error message, which makes it hard to handle the error on the calling side. There's also no support for error codes, which are typically the main value to check for when handling errors.

Requirements

The command handler must be able to add additional data to an error:

  • Error code
  • Human-readable, localized error messages
  • Debug information for development (?)

Proposal (gRPC error details)

gRPC error details

Proposal is to add support for Protocol Buffers, similar to how Connect does. An error that provides a Details() []proto.Message method can then enrich the error with arbitrary data.

For convenience, we can provide an *Error type that provides the most essential features out of the box.

Command handler

package example

func example() {
  var underlyingError error // the actual error that made the command fail
  var code int // the custom error code
  var details []proto.Message

  err := command.NewError(code, underlyingError, command.WithErrorDetail(details...)) // *command.Err

  // err.Code() == code
  // err.Error() == underlyingError.Error()

  for i, d := range err.Details() {
    // d == *command.ErrorDetail{...}
    v, err := d.Value()
    if err != nil {
      panic(fmt.Errorf("failed to unmarshal error detail: %w", err))
    }

    // v == details[i] // not actually the same instance but a copy
  }
}

Command dispatcher

package example

func example(bus command.Bus, cmd command.Command) {
  err := bus.Dispatch(context.TODO(), cmd, dispatch.Sync())

  cerr := command.Error(err) // parse the error

  // cerr == *command.Err{...}
  // cerr.Code() == int(...)
  // cerr.Message() == "..."

  // Either manually iterate the details
  for i, d := range cerr.Details() {
    v, err := d.Value()
    if err != nil {
      panic(fmt.Errorf("failed to unmarshal error detail: %w", err))
    }

    switch v := v.(type) {
    case *errdetails.LocalizedMessage:
      log.Println(v.GetLocale(), v.GetMessage())
    }
  }

  // Or use provided methods on the *Error type
  msg, ok := cerr.LocalizedMessage("en-US")
  // msg == "..."
}

Allow use of custom event for builtin.AggregateDeleted

The builtin.AggregateDeleted event is awkward to use when trying to subscribe to the deletion of a specific aggregate type. Users should be able to provide a custom event for each aggregate type that is published instead of, or along with, the builtin event.

Concurrent command handling

Add options to enable concurrent command handling.

Standalone command handler

package example

func example(bus command.Bus) {
  h := command.NewHandler(bus)

  h.Handle(context.TODO(), "foo", func(ctx command.Context) error { ... }, command.Workers(4))
}

Aggregate-based command handler

package example

type Foo struct {
  *aggregate.Base
  *handler.BaseHandler
}

func NewFoo(id uuid.UUID) *Foo { ... }

func example(bus command.Bus, repo aggregate.Repository) {
  h := handler.New(NewFoo, repo, bus)

  h.Handle(context.TODO(), command.Workers(4))
}

Code generation

Provide code generation for:

  • Aggregate repositories
  • Commands
