
Comments (41)

prolic avatar prolic commented on May 24, 2024 1

The $all stream contains a lot of metadata as well as information about projections, stats of the event store, ACL configs, etc. In 99% of cases you should neither subscribe to $all nor read from it.

What you should do instead is create a projection for all the streams you are interested in and subscribe to that. Remember to enable the category projection first. It could look like this:

fromStreams('$ce-user', '$ce-blog', '$ce-billing', '$ce-shipping')
.when({
    $any: function (s, e) { 
        linkTo('everything', e);
    }
})

Then you can subscribe to the everything stream.

from event-store-client.

prolic avatar prolic commented on May 24, 2024 1

createPersistentSubscriptionAsync -> connectToPersistentSubscriptionAsync

Persistent, see? The other ones are called volatile subscriptions (they are not persisted). The benefit of persistent subscriptions is that they store the current position, so when your subscription stops, the server goes down, or anything else happens, you can continue from the same position. The downside is that they are slightly slower, because the EventStore has to track the current position and wait for ack / nack messages.
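
The idea of a stored position can be illustrated without any EventStore at all. The sketch below is a toy in-memory model, not the event-store-client API: ToyPersistentSubscription and its deliver() method are hypothetical names made up for this illustration. It only shows why a position that is kept outside the consumer lets a restarted consumer continue where it stopped.

```php
<?php

declare(strict_types=1);

// Toy in-memory model; none of these names exist in event-store-client.
final class ToyPersistentSubscription
{
    /** @var int position of the next event to deliver, kept on the "server" side */
    private $checkpoint = 0;

    /**
     * Delivers events from the stored checkpoint onwards. The checkpoint
     * only advances when the handler acks (returns true).
     *
     * @param string[] $stream
     */
    public function deliver(array $stream, callable $handler): void
    {
        while (isset($stream[$this->checkpoint])) {
            if (! $handler($stream[$this->checkpoint])) {
                return; // nack or crashed consumer: the position is NOT advanced
            }
            ++$this->checkpoint;
        }
    }
}

$stream = ['e0', 'e1', 'e2', 'e3'];
$subscription = new ToyPersistentSubscription();
$seen = [];

// First consumer run: "crashes" on e2 (returns false, i.e. no ack).
$subscription->deliver($stream, function (string $event) use (&$seen): bool {
    if ($event === 'e2') {
        return false;
    }
    $seen[] = $event;

    return true;
});

// Second run after a restart: continues at e2 instead of starting over.
$subscription->deliver($stream, function (string $event) use (&$seen): bool {
    $seen[] = $event;

    return true;
});

echo \implode(',', $seen), PHP_EOL; // e0,e1,e2,e3
```

A volatile subscription has no such server-side position, so after a restart the consumer itself has to decide where to begin again.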


prolic avatar prolic commented on May 24, 2024

I was planning to work on docs pretty soon. I need to check my workstation, maybe I have some (non-working, haha) blueprints somewhere. I will update you.


enumag avatar enumag commented on May 24, 2024

I'm fine with non-working. 😄 Thanks!


prolic avatar prolic commented on May 24, 2024

Quick and dirty, no promises made, 4 classes!

Maybe it's working, maybe it's not, maybe it's a good implementation, maybe it's bad!

Just to get an idea!

  1. MessageTransformer
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

use Prooph\EventStoreClient\EventData;
use Prooph\EventStoreClient\EventId;
use Prooph\EventStoreClient\ResolvedEvent;

class MessageTransformer
{
    /**
     * key = event type
     * value = event class name
     * @var array
     */
    protected $map;

    // key = event-type, value = event class name
    public function __construct(array $map)
    {
        $this->map = $map;
    }

    public function toMessage(ResolvedEvent $event): DomainEvent
    {
        $event = $event->originalEvent();
        $eventType = $event->eventType();

        $payload = \json_decode($event->data(), true);

        if (! isset($this->map[$eventType])) {
            throw new \RuntimeException('No event class for type ' . $eventType . ' given');
        }

        $class = $this->map[$eventType];

        return $class::fromArray($payload);
    }

    public function toEventData(DomainEvent $event): EventData
    {
        return new EventData(
            EventId::generate(),
            $event->eventType(),
            true,
            \json_encode($event->toArray())
        );
    }
}
  2. DomainEvent
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

interface DomainEvent
{
    public function eventType(): string;

    public function toArray(): array;

    // Must be static: MessageTransformer calls it as $class::fromArray($payload)
    public static function fromArray(array $data): self;
}
  3. AggregateRoot
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

use Prooph\EventStoreClient\ExpectedVersion;

abstract class AggregateRoot
{
    /** @var int */
    protected $expectedVersion = ExpectedVersion::EMPTY_STREAM;

    /**
     * List of events that are not committed to the EventStore
     *
     * @var DomainEvent[]
     */
    protected $recordedEvents = [];

    /**
     * We do not allow public access to __construct, this way we make sure that an aggregate root can only
     * be constructed by static factories
     */
    protected function __construct()
    {
    }

    public function expectedVersion(): int
    {
        return $this->expectedVersion;
    }

    public function setExpectedVersion(int $version): void
    {
        $this->expectedVersion = $version;
    }

    /**
     * Get pending events and reset stack
     *
     * @return DomainEvent[]
     */
    public function popRecordedEvents(): array
    {
        $pendingEvents = $this->recordedEvents;

        $this->recordedEvents = [];

        return $pendingEvents;
    }

    /**
     * Record an aggregate changed event
     */
    protected function recordThat(DomainEvent $event): void
    {
        $this->recordedEvents[] = $event;

        $this->apply($event);
    }

    public static function reconstituteFromHistory(array $historyEvents): self
    {
        $instance = new static();
        $instance->replay($historyEvents);

        return $instance;
    }

    /**
     * Replay past events
     *
     * @param DomainEvent[] $historyEvents
     */
    public function replay(array $historyEvents): void
    {
        foreach ($historyEvents as $pastEvent) {
            /** @var DomainEvent $pastEvent */
            $this->apply($pastEvent);
        }
    }

    abstract public function aggregateId(): string;

    /**
     * Apply given event
     */
    abstract protected function apply(DomainEvent $event): void;
}
  4. AggregateRepository
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

// MessageTransformer lives in this namespace (Prooph\EventSourcing\Demo), so no import is needed
use Prooph\EventStoreClient\EventStoreSyncConnection;
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\Consts;
use Prooph\EventStoreClient\SliceReadStatus;
use Prooph\EventStoreClient\UserCredentials;

class AggregateRepository
{
    /** @var EventStoreSyncConnection */
    protected $eventStoreConnection;
    /** @var MessageTransformer */
    protected $transformer;
    /** @var string */
    protected $streamCategory;
    /** @var string */
    protected $aggregateRootClassName;
    /** @var bool */
    protected $optimisticConcurrency;

    public function __construct(
        EventStoreSyncConnection $eventStoreConnection,
        MessageTransformer $transformer,
        string $streamCategory,
        string $aggregateRootClassName,
        bool $useOptimisticConcurrencyByDefault = true
    ) {
        $this->eventStoreConnection = $eventStoreConnection;
        $this->transformer = $transformer;
        $this->streamCategory = $streamCategory;
        $this->aggregateRootClassName = $aggregateRootClassName;
        $this->optimisticConcurrency = $useOptimisticConcurrencyByDefault;
    }

    public function saveAggregateRoot(
        AggregateRoot $aggregateRoot,
        ?int $expectedVersion = null,
        ?UserCredentials $credentials = null
    ): void {
        $domainEvents = $aggregateRoot->popRecordedEvents();

        if (empty($domainEvents)) {
            return;
        }

        $aggregateId = $aggregateRoot->aggregateId();
        $stream = $this->streamCategory . '-' . $aggregateId;

        $eventData = [];

        foreach ($domainEvents as $event) {
            $eventData[] = $this->transformer->toEventData($event);
        }

        if (null === $expectedVersion) {
            $expectedVersion = $this->optimisticConcurrency
                ? $aggregateRoot->expectedVersion()
                : ExpectedVersion::ANY;
        }

        $writeResult = $this->eventStoreConnection->appendToStream(
            $stream,
            $expectedVersion,
            $eventData,
            $credentials
        );

        $aggregateRoot->setExpectedVersion($writeResult->nextExpectedVersion());
    }

    /**
     * Returns null if no stream events can be found for aggregate root otherwise the reconstituted aggregate root
     */
    public function getAggregateRoot(string $aggregateId, ?UserCredentials $credentials = null): ?object
    {
        $stream = $this->streamCategory . '-' . $aggregateId;

        $start = 0;
        $count = Consts::MAX_READ_SIZE;

        do {
            $events = [];

            $streamEventsSlice = $this->eventStoreConnection->readStreamEventsForward(
                $stream,
                $start,
                $count,
                true,
                $credentials
            );

            if (! $streamEventsSlice->status()->equals(SliceReadStatus::success())) {
                return null;
            }

            $start = $streamEventsSlice->nextEventNumber();

            foreach ($streamEventsSlice->events() as $event) {
                $events[] = $this->transformer->toMessage($event);
            }

            if (isset($aggregateRoot)) {
                /** @var AggregateRoot $aggregateRoot */
                $aggregateRoot->replay($events);
            } else {
                $className = $this->aggregateRootClassName;
                $aggregateRoot = $className::reconstituteFromHistory($events);
            }
        } while (! $streamEventsSlice->isEndOfStream());

        $aggregateRoot->setExpectedVersion($streamEventsSlice->lastEventNumber());

        return $aggregateRoot;
    }
}
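
To make the type map in MessageTransformer a bit more tangible, here is a small standalone sketch of one event going through the same array/JSON round trip. Everything in it is an assumption for illustration: the namespace Demo\Example and the UserRegistered event are invented, and the DomainEvent contract is re-declared locally (with a static fromArray(), since the transformer calls it as $class::fromArray()) so the snippet runs on its own without the EventStore client.

```php
<?php

declare(strict_types=1);

namespace Demo\Example; // hypothetical namespace for this sketch

// Local copy of the DomainEvent contract so the snippet runs standalone.
interface DomainEvent
{
    public function eventType(): string;

    public function toArray(): array;

    public static function fromArray(array $data): self;
}

// Hypothetical event, invented for this example.
final class UserRegistered implements DomainEvent
{
    /** @var string */
    public $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }

    public function eventType(): string
    {
        return 'user-registered';
    }

    public function toArray(): array
    {
        return ['name' => $this->name];
    }

    public static function fromArray(array $data): DomainEvent
    {
        return new self($data['name']);
    }
}

// Same kind of lookup MessageTransformer performs: event type => event class name.
$map = ['user-registered' => UserRegistered::class];

$original = new UserRegistered('Alice');
$json = \json_encode($original->toArray()); // the payload toEventData() would hand over

$class = $map[$original->eventType()];
$restored = $class::fromArray(\json_decode($json, true));

echo \get_class($restored), ' ', $restored->name, PHP_EOL; // Demo\Example\UserRegistered Alice
```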


callistino avatar callistino commented on May 24, 2024

@prolic What about the projection side? Do you use the built-in GY ES projections as read models? Is that performant enough compared to other data stores?

We currently still use prooph components and just replaced the PostgresEventStore with our own class that uses a GY ES client, but that involved writing our own projectors, read model projectors and managers, and we still have some lingering issues.


prolic avatar prolic commented on May 24, 2024


enumag avatar enumag commented on May 24, 2024

Thank you @prolic! I should be able to adapt this to my EventMachine-inspired aggregates.

Some example for projections would be useful as well though I don't need it right away.


enumag avatar enumag commented on May 24, 2024

One more thing. When and how should I dispatch my events to EventBus? In Prooph v7 this was done by EventPublisher which is an event store plugin. But I don't see any similar concept in the event-store-client.


prolic avatar prolic commented on May 24, 2024

ServiceBus and EventPublisher will no longer be maintained (apart from bugfix support until the end of next year), so you have to find your own solution for this. Some ideas:

  1. Have your own service-bus + event-publisher-plugin (or modify the one from prooph to your needs)
  2. Do it in your aggregate repository (either by adding a publish-events feature or by having a plugin system in place there)
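
Option 2 could be sketched roughly as follows. This is only an illustration under assumptions: PublishingRepository and its two callables ($append and $publish) are hypothetical stand-ins for whatever repository and bus you actually use, and nothing here is part of event-store-client or prooph/service-bus.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: publish events only after they were appended successfully.
final class PublishingRepository
{
    /** @var callable function (string $stream, array $events): void */
    private $append;
    /** @var callable function (object $event): void */
    private $publish;

    public function __construct(callable $append, callable $publish)
    {
        $this->append = $append;
        $this->publish = $publish;
    }

    /** @param object[] $recordedEvents */
    public function save(string $stream, array $recordedEvents): void
    {
        if ([] === $recordedEvents) {
            return;
        }

        // Write first; if this throws, nothing gets published.
        ($this->append)($stream, $recordedEvents);

        foreach ($recordedEvents as $event) {
            ($this->publish)($event);
        }
    }
}

// Usage with in-memory stand-ins for the event store and the event bus.
$stored = [];
$published = [];

$repository = new PublishingRepository(
    function (string $stream, array $events) use (&$stored): void {
        $stored[$stream] = \array_merge($stored[$stream] ?? [], $events);
    },
    function (object $event) use (&$published): void {
        $published[] = $event;
    }
);

$event = new \stdClass();
$event->type = 'user-registered';

$streamName = 'user-afbaab1d-121e-4471-9a61-cffcfd1af4c6';
$repository->save($streamName, [$event]);

echo \count($stored[$streamName]), ' stored, ', \count($published), ' published', PHP_EOL; // 1 stored, 1 published
```

Note the trade-off of this approach: if the process dies between the append and the publish step, the events are stored but never dispatched.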


enumag avatar enumag commented on May 24, 2024

ServiceBus and EventPublisher will no longer be maintained (apart from bugfix support until the end of next year), so you have to find your own solution for this.

Yeah I know. My intention is to use service-bus for now and later try to migrate to something else (symfony/messenger possibly). Thanks for your ideas.


callistino avatar callistino commented on May 24, 2024

@enumag Keeping the same existing tools was another reason why we wrote our TransactionalEventstore client that calls the GYES client itself. We still use the prooph service-bus + event-publisher-plugin and inject our EventStore in the prooph_event_store config. That part was relatively easy. The part we continue to struggle with is the projection side, where we still want to keep using Postgres for our read models. That was, and still is, a source of bugs given how many classes we had to copy or override.

We also have another microservice where we started using the symfony messenger component as service-bus. But that one is not event sourced, just CQRS, and it worked right off the bat with minimal changes to our Prooph way of building services. We still need to figure out the projection/read model side there too for a full CQRS/ES integration, but writing plugins for it is relatively easy.


enumag avatar enumag commented on May 24, 2024

The part we continue to struggle with is the projection side, where we still want to keep using Postgres for our read models. That was, and still is, a source of bugs given how many classes we had to copy or override.

You used the Prooph v7 event-store with GYES, right? Why are projections in postgres a problem? I mean the fact that it's GYES should not matter since it's behind an interface and postgres projections are not a problem with Prooph v7.


callistino avatar callistino commented on May 24, 2024

@enumag Because the streams are not in Postgres, the projection classes needed to be updated to read from the GYES stream. The built-in projection classes only read from the data store you have configured, so they are tightly coupled to it.

We had to create a PdoEventStoreReadModelProjector class that reads from GYES store and projects to postgres.


prolic avatar prolic commented on May 24, 2024


enumag avatar enumag commented on May 24, 2024

@prolic Okay, I successfully implemented a layer to save aggregates using event-store-client and load them. I think I can implement an equivalent to EventPublisher too without a problem so I can start refactoring my domain now.

Next I'd like to know how to implement projections. Prooph v7 had a special table in the event-store database to save the state of each projection. How is projection state handled with GYES? Does GYES implement that on its own or do we need to do it ourselves? What do you mean by persistent and volatile subscriptions? Can't find that in the docs. Does it have something to do with this or not?

EDIT: Never mind, I need to read these first:

https://eventstore.org/docs/getting-started/reading-subscribing-events/index.html
https://eventstore.org/blog/20130306/getting-started-part-3-subscriptions/


prolic avatar prolic commented on May 24, 2024


enumag avatar enumag commented on May 24, 2024

So how do I subscribe to all events? (Or at least all events that my application generated, I don't care about eventstore's internal events.) In your test case you expect an exception in that case. Or is there something else than $all that I should use?


enumag avatar enumag commented on May 24, 2024

If I understand the code correctly I can create the projection using ProjectionManager::createContinuousAsync(). Alright, I'll try it later. Thanks for the advice!


prolic avatar prolic commented on May 24, 2024

Maybe it's easier to create the projection through the UI. You only need the API to create stuff like this fully automated. But even then I would create it originally in the UI the first time.


callistino avatar callistino commented on May 24, 2024

The problem with creating it in the UI is that you have no version control over it and having to manually create it for local dev, CI/CD and Prod becomes a pain.


enumag avatar enumag commented on May 24, 2024

Yeah, it needs to be a part of some migration script...


prolic avatar prolic commented on May 24, 2024

@callistino actually the projections are all versioned within the event store, no information is ever lost.
It's of course a little different if you have multiple event stores (local dev / staging / production). But I didn't tell you not to create it by code, only that you should create it FIRST in the UI. You can then go ahead and copy / paste it into your source code. The UI is really great for this, because you can debug and test it there directly.


enumag avatar enumag commented on May 24, 2024

Ok I'm slightly confused...

My application is saving data into streams with a naming convention like <boundedContext>.<aggregateType>.<guid>. Is that ok in your opinion or would you recommend something else?

Now I need to create a projection from all of these streams. In your code above you have this:

fromStreams('$ce-user', '$ce-blog', '$ce-billing', '$ce-shipping')

How do I create these $ce-* streams? According to the documentation these streams are categories. But how do I create a "category" and specify which streams belong to it? Are there any "best practices" for which categories I should create?


prolic avatar prolic commented on May 24, 2024

@enumag See here: https://eventstore.org/docs/projections/system-projections/index.html

You can go to localhost:2113, click projections and then enable "$by_category", it's shipped by default. That's it.


enumag avatar enumag commented on May 24, 2024

You can go to localhost:2113, click projections and then enable "$by_category", it's shipped by default. That's it.

Can I enable it with API somehow?


prolic avatar prolic commented on May 24, 2024

You can tell EventStore to start them on server start, see https://eventstore.org/docs/server/command-line-arguments/index.html


enumag avatar enumag commented on May 24, 2024

Wait what? There is nothing about categories on that page... Which option am I looking for? START_STANDARD_PROJECTIONS? Or something else?


prolic avatar prolic commented on May 24, 2024

yes


prolic avatar prolic commented on May 24, 2024

or like this: ./run-node.sh --run-projections=all


enumag avatar enumag commented on May 24, 2024

or like this: ./run-node.sh --run-projections=all

No, that's definitely not it. I'm starting the docker container with

        environment:
            - EVENTSTORE_RUN_PROJECTIONS=all

and I don't have any $ce streams there.


prolic avatar prolic commented on May 24, 2024

then put this: EVENTSTORE_START_STANDARD_PROJECTIONS=true


enumag avatar enumag commented on May 24, 2024

Will do. Thanks!


enumag avatar enumag commented on May 24, 2024

Ok. After fixing some problems (see the issues and PRs I created) the standard projections are up and running.

Do you think the category streams should be per-bounded-context or per-aggregate-type? Just looking for best-practices. Also what format would you recommend for stream IDs and eventIDs?


enumag avatar enumag commented on May 24, 2024

Ok I have the everything projection up.

Next, how do I use the subscriptions? First I most likely need to use \Prooph\EventStoreClient\Internal\EventStoreAsyncNodeConnection::createPersistentSubscriptionAsync(). But what then? There is:

  • subscribeToStreamAsync
  • subscribeToStreamFromAsync
  • subscribeToAllAsync
  • subscribeToAllFromAsync
  • connectToPersistentSubscriptionAsync

How should I choose between these?


prolic avatar prolic commented on May 24, 2024

EventId => only uuid is allowed
StreamId => user-afbaab1d-121e-4471-9a61-cffcfd1af4c6, blog-733425da-079c-46fc-a422-a997a679dee1, etc. One stream for each aggregate.


enumag avatar enumag commented on May 24, 2024

StreamId => user-afbaab1d-121e-4471-9a61-cffcfd1af4c6

@prolic Yeah, that's what I was using myself at first, but I wanted to add the bounded context name as a prefix and separate the type name (user in this case) from the uuid with something other than a dash, since the dash is already used in the uuid. That's why I went and changed the $by_category projection. So in the end I'll use something like this as my stream IDs:

Security/User.afbaab1d-121e-4471-9a61-cffcfd1af4c6


prolic avatar prolic commented on May 24, 2024

Don't change $by_category! If you don't like it, don't use it and write your own instead.

For your use-case I would suggest this instead: Security.User-afbaab1d-121e-4471-9a61-cffcfd1af4c6

It doesn't matter that the uuid contains dashes as well. The category is the part of the string up to the first dash.
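
To make the category rule concrete, here is a minimal sketch in plain PHP. The helper names streamName() and categoryOf() are made up for this illustration and are not part of event-store-client; the sketch only mirrors the "everything before the first dash" rule described above.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: builds "<category>-<uuid>" stream names.
function streamName(string $category, string $uuid): string
{
    if (\strpos($category, '-') !== false) {
        throw new \InvalidArgumentException('Category must not contain a dash');
    }

    return $category . '-' . $uuid;
}

// Hypothetical helper: the category is everything before the FIRST dash.
// Returns null when the stream name contains no dash at all.
function categoryOf(string $stream): ?string
{
    $pos = \strpos($stream, '-');

    return $pos === false ? null : \substr($stream, 0, $pos);
}

$stream = streamName('Security.User', 'afbaab1d-121e-4471-9a61-cffcfd1af4c6');

echo $stream, PHP_EOL;             // Security.User-afbaab1d-121e-4471-9a61-cffcfd1af4c6
echo categoryOf($stream), PHP_EOL; // Security.User
```

With $by_category enabled, events from such a stream would end up linked in the $ce-Security.User category stream.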


enumag avatar enumag commented on May 24, 2024

Ok I'll consider sticking to the dash. Thanks for your opinion!


prolic avatar prolic commented on May 24, 2024

Documentation is now available: https://github.com/prooph/documentation

I'm closing this issue


enumag avatar enumag commented on May 24, 2024

The documentation seems to be mostly a copy of the eventstore.org documentation, which I found quite lacking. What is needed here is a guide on how to actually use it in a CQRS environment with the message buses, aggregates etc.

I had that mostly done on my end but now with this async-only thing I'm lost again.
