Comments (41)
The $all stream contains a lot of metadata as well as information about projections, event store stats, ACL configs, etc. In 99% of cases you should neither subscribe to $all nor read from it.
Instead, create a projection over all the streams you are interested in and subscribe to its output stream. Remember to enable the category projection first. The projection could look like this:
fromStreams('$ce-user', '$ce-blog', '$ce-billing', '$ce-shipping')
    .when({
        $any: function (s, e) {
            linkTo('everything', e);
        }
    })
Then you can subscribe to the everything stream.
from event-store-client.
createPersistentSubscriptionAsync -> connectToPersistentSubscriptionAsync
Persistent, see? The other ones are called volatile subscriptions (they are not persisted). The benefit of persistent subscriptions is that they store the current position, so when your subscription stops, the server goes down, or anything else happens, you can continue at the same position. The downside is that they are slightly slower, because the EventStore has to track the current position and wait for ack / nack messages.
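To make the checkpoint idea concrete, here is a toy in-memory model. It is only a sketch: it does not use event-store-client at all, and the class name CheckpointedSubscription and its methods are made up for illustration. The handler returns true to ack and false to nack.

```php
<?php
declare(strict_types=1);

// Toy model of a persistent subscription, NOT the event-store-client API.
final class CheckpointedSubscription
{
    /** @var int position of the next event to deliver, kept between deliveries */
    private $checkpoint = 0;

    /**
     * Delivers events starting at the stored checkpoint.
     * Stops at the first nack and leaves the checkpoint on that event.
     */
    public function deliver(array $stream, callable $handler): void
    {
        while ($this->checkpoint < \count($stream)) {
            if (! $handler($stream[$this->checkpoint])) {
                return; // nack: the same event is retried on the next call
            }
            $this->checkpoint++; // ack: the position moves forward
        }
    }

    public function checkpoint(): int
    {
        return $this->checkpoint;
    }
}

$stream = ['UserRegistered', 'UserRenamed', 'UserDeleted'];
$subscription = new CheckpointedSubscription();
$seen = [];

// First run: the consumer fails (nacks) on the second event.
$subscription->deliver($stream, function (string $event) use (&$seen): bool {
    if ($event === 'UserRenamed') {
        return false;
    }
    $seen[] = $event;

    return true;
});

// Second run: delivery resumes at the stored position, not at the beginning.
$subscription->deliver($stream, function (string $event) use (&$seen): bool {
    $seen[] = $event;

    return true;
});
```

A volatile subscription would correspond to throwing the checkpoint away between the two runs, so the consumer would have to remember its own position.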
from event-store-client.
I was planning to work on docs pretty soon. I need to check my workstation, maybe I have some (non-working, haha) blueprints somewhere. I will update you.
from event-store-client.
I'm fine with non-working.
from event-store-client.
Quick and dirty, no promises made, 4 classes!
Maybe it's working, maybe it's not, maybe it's a good implementation, maybe it's bad!
Just to get an idea!
MessageTransformer
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

use Prooph\EventStoreClient\EventData;
use Prooph\EventStoreClient\EventId;
use Prooph\EventStoreClient\ResolvedEvent;

class MessageTransformer
{
    /**
     * key = event type
     * value = event class name
     *
     * @var array
     */
    protected $map;

    public function __construct(array $map)
    {
        $this->map = $map;
    }

    public function toMessage(ResolvedEvent $event): DomainEvent
    {
        $recordedEvent = $event->originalEvent();
        $eventType = $recordedEvent->eventType();

        if (! isset($this->map[$eventType])) {
            throw new \RuntimeException('No event class for type ' . $eventType . ' given');
        }

        $payload = \json_decode($recordedEvent->data(), true);
        $class = $this->map[$eventType];

        // fromArray() is called statically, so DomainEvent has to declare it static
        return $class::fromArray($payload);
    }

    public function toEventData(DomainEvent $event): EventData
    {
        return new EventData(
            EventId::generate(),
            $event->eventType(),
            true, // the payload is JSON
            \json_encode($event->toArray())
        );
    }
}
DomainEvent
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

interface DomainEvent
{
    public function eventType(): string;

    public function toArray(): array;

    // static, because MessageTransformer calls $class::fromArray($payload)
    public static function fromArray(array $data): self;
}
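For illustration, an event implementing this interface could look roughly like the sketch below. UserRegistered, its fields and the 'user-registered' type name are hypothetical, and the interface is repeated (without namespace, with fromArray declared static) only so the snippet runs on its own.

```php
<?php
declare(strict_types=1);

interface DomainEvent
{
    public function eventType(): string;

    public function toArray(): array;

    public static function fromArray(array $data): self;
}

// Hypothetical event, used only for illustration.
final class UserRegistered implements DomainEvent
{
    /** @var string */
    private $userId;

    /** @var string */
    private $email;

    public function __construct(string $userId, string $email)
    {
        $this->userId = $userId;
        $this->email = $email;
    }

    public function eventType(): string
    {
        return 'user-registered';
    }

    public function toArray(): array
    {
        return ['userId' => $this->userId, 'email' => $this->email];
    }

    public static function fromArray(array $data): DomainEvent
    {
        return new self($data['userId'], $data['email']);
    }
}

// Round trip through JSON, mirroring what MessageTransformer does with the map.
$map = ['user-registered' => UserRegistered::class];
$original = new UserRegistered('afbaab1d-121e-4471-9a61-cffcfd1af4c6', 'jane@example.com');
$json = \json_encode($original->toArray());
$class = $map[$original->eventType()];
$restored = $class::fromArray(\json_decode($json, true));
```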
AggregateRoot
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

use Prooph\EventStoreClient\ExpectedVersion;

abstract class AggregateRoot
{
    /** @var int */
    protected $expectedVersion = ExpectedVersion::EMPTY_STREAM;

    /**
     * List of events that are not committed to the EventStore
     *
     * @var DomainEvent[]
     */
    protected $recordedEvents = [];

    /**
     * We do not allow public access to __construct, this way we make sure that an aggregate root can only
     * be constructed by static factories
     */
    protected function __construct()
    {
    }

    public function expectedVersion(): int
    {
        return $this->expectedVersion;
    }

    public function setExpectedVersion(int $version): void
    {
        $this->expectedVersion = $version;
    }

    /**
     * Get pending events and reset stack
     *
     * @return DomainEvent[]
     */
    public function popRecordedEvents(): array
    {
        $pendingEvents = $this->recordedEvents;
        $this->recordedEvents = [];

        return $pendingEvents;
    }

    /**
     * Record an aggregate changed event
     */
    protected function recordThat(DomainEvent $event): void
    {
        $this->recordedEvents[] = $event;
        $this->apply($event);
    }

    public static function reconstituteFromHistory(array $historyEvents): self
    {
        $instance = new static();
        $instance->replay($historyEvents);

        return $instance;
    }

    /**
     * Replay past events
     *
     * @param DomainEvent[] $historyEvents
     */
    public function replay(array $historyEvents): void
    {
        foreach ($historyEvents as $pastEvent) {
            /** @var DomainEvent $pastEvent */
            $this->apply($pastEvent);
        }
    }

    abstract public function aggregateId(): string;

    /**
     * Apply given event
     */
    abstract protected function apply(DomainEvent $event): void;
}
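A concrete aggregate built on this pattern might look like the sketch below. The User aggregate and its two events are hypothetical. The base class here is a trimmed-down stand-in (no version tracking, no namespace, an empty marker interface for events) so that the snippet runs on its own. It is not a replacement for the class above.

```php
<?php
declare(strict_types=1);

// Minimal stand-ins so the sketch is self-contained.
interface DomainEvent
{
}

abstract class AggregateRoot
{
    /** @var DomainEvent[] */
    protected $recordedEvents = [];

    protected function __construct()
    {
    }

    public function popRecordedEvents(): array
    {
        $pendingEvents = $this->recordedEvents;
        $this->recordedEvents = [];

        return $pendingEvents;
    }

    protected function recordThat(DomainEvent $event): void
    {
        $this->recordedEvents[] = $event;
        $this->apply($event);
    }

    public static function reconstituteFromHistory(array $historyEvents): self
    {
        $instance = new static();
        foreach ($historyEvents as $pastEvent) {
            $instance->apply($pastEvent);
        }

        return $instance;
    }

    abstract protected function apply(DomainEvent $event): void;
}

// Hypothetical events and aggregate, for illustration only.
final class UserRegistered implements DomainEvent
{
    public $userId;
    public $name;

    public function __construct(string $userId, string $name)
    {
        $this->userId = $userId;
        $this->name = $name;
    }
}

final class UserRenamed implements DomainEvent
{
    public $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }
}

final class User extends AggregateRoot
{
    private $id;
    private $name;

    public static function register(string $id, string $name): self
    {
        $user = new self();
        $user->recordThat(new UserRegistered($id, $name));

        return $user;
    }

    public function rename(string $name): void
    {
        if ($name !== $this->name) {
            $this->recordThat(new UserRenamed($name));
        }
    }

    public function name(): string
    {
        return $this->name;
    }

    // State changes happen only here, so replaying history rebuilds the same state.
    protected function apply(DomainEvent $event): void
    {
        if ($event instanceof UserRegistered) {
            $this->id = $event->userId;
            $this->name = $event->name;
        } elseif ($event instanceof UserRenamed) {
            $this->name = $event->name;
        }
    }
}

$user = User::register('afbaab1d-121e-4471-9a61-cffcfd1af4c6', 'Jane');
$user->rename('Janet');
$history = $user->popRecordedEvents();
$reloaded = User::reconstituteFromHistory($history);
```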
AggregateRepository
<?php

declare(strict_types=1);

namespace Prooph\EventSourcing\Demo;

// MessageTransformer and AggregateRoot live in this namespace, so they need no import
use Prooph\EventStoreClient\EventStoreSyncConnection;
use Prooph\EventStoreClient\ExpectedVersion;
use Prooph\EventStoreClient\Internal\Consts;
use Prooph\EventStoreClient\SliceReadStatus;
use Prooph\EventStoreClient\UserCredentials;

class AggregateRepository
{
    /** @var EventStoreSyncConnection */
    protected $eventStoreConnection;

    /** @var MessageTransformer */
    protected $transformer;

    /** @var string */
    protected $streamCategory;

    /** @var string */
    protected $aggregateRootClassName;

    /** @var bool */
    protected $optimisticConcurrency;

    public function __construct(
        EventStoreSyncConnection $eventStoreConnection,
        MessageTransformer $transformer,
        string $streamCategory,
        string $aggregateRootClassName,
        bool $useOptimisticConcurrencyByDefault = true
    ) {
        $this->eventStoreConnection = $eventStoreConnection;
        $this->transformer = $transformer;
        $this->streamCategory = $streamCategory;
        $this->aggregateRootClassName = $aggregateRootClassName;
        $this->optimisticConcurrency = $useOptimisticConcurrencyByDefault;
    }

    public function saveAggregateRoot(
        AggregateRoot $aggregateRoot,
        ?int $expectedVersion = null,
        ?UserCredentials $credentials = null
    ): void {
        $domainEvents = $aggregateRoot->popRecordedEvents();

        if (empty($domainEvents)) {
            return;
        }

        $aggregateId = $aggregateRoot->aggregateId();
        $stream = $this->streamCategory . '-' . $aggregateId;

        $eventData = [];

        foreach ($domainEvents as $event) {
            $eventData[] = $this->transformer->toEventData($event);
        }

        if (null === $expectedVersion) {
            $expectedVersion = $this->optimisticConcurrency
                ? $aggregateRoot->expectedVersion()
                : ExpectedVersion::ANY;
        }

        $writeResult = $this->eventStoreConnection->appendToStream(
            $stream,
            $expectedVersion,
            $eventData,
            $credentials
        );

        $aggregateRoot->setExpectedVersion($writeResult->nextExpectedVersion());
    }

    /**
     * Returns null if no stream events can be found for aggregate root otherwise the reconstituted aggregate root
     */
    public function getAggregateRoot(string $aggregateId, ?UserCredentials $credentials = null): ?AggregateRoot
    {
        $stream = $this->streamCategory . '-' . $aggregateId;
        $start = 0;
        $count = Consts::MAX_READ_SIZE;
        $aggregateRoot = null;

        // Read the stream slice by slice and replay each slice onto the aggregate
        do {
            $events = [];

            $streamEventsSlice = $this->eventStoreConnection->readStreamEventsForward(
                $stream,
                $start,
                $count,
                true,
                $credentials
            );

            if (! $streamEventsSlice->status()->equals(SliceReadStatus::success())) {
                return null;
            }

            $start = $streamEventsSlice->nextEventNumber();

            foreach ($streamEventsSlice->events() as $event) {
                $events[] = $this->transformer->toMessage($event);
            }

            if (null === $aggregateRoot) {
                $className = $this->aggregateRootClassName;
                $aggregateRoot = $className::reconstituteFromHistory($events);
            } else {
                $aggregateRoot->replay($events);
            }
        } while (! $streamEventsSlice->isEndOfStream());

        $aggregateRoot->setExpectedVersion($streamEventsSlice->lastEventNumber());

        return $aggregateRoot;
    }
}
from event-store-client.
@prolic What about the projection side? Do you use the built-in GY ES projections as read models? Is that performant enough compared to other data stores?
We currently still use prooph components and just replaced the PostgresEventStore with our own class that uses a GY ES client, but that involved writing our own projectors, read model projectors and managers, and we still have some lingering issues.
from event-store-client.
Thank you @prolic! I should be able to adapt this to my EventMachine-inspired aggregates.
An example for projections would be useful as well, though I don't need it right away.
from event-store-client.
One more thing. When and how should I dispatch my events to EventBus? In Prooph v7 this was done by EventPublisher which is an event store plugin. But I don't see any similar concept in the event-store-client.
from event-store-client.
ServiceBus and EventPublisher will no longer be maintained (apart from bugfix support until the end of next year), so you have to find your own solution for this. Some ideas:
- Have your own service-bus + event-publisher-plugin (or modify the one from prooph to your needs)
- Do it in your aggregate repository (either by adding a publish events feature or by having a plugin system in place there)
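As a rough sketch of the second idea: publish the events only after the append succeeded. SimpleEventPublisher and saveAndPublish are hypothetical names, events are plain arrays here, and the append step is a callable standing in for the appendToStream call in a repository. None of this is prooph or event-store-client API.

```php
<?php
declare(strict_types=1);

// Hypothetical minimal publisher, for illustration only.
final class SimpleEventPublisher
{
    /** @var callable[][] listeners keyed by event type */
    private $listeners = [];

    public function on(string $eventType, callable $listener): void
    {
        $this->listeners[$eventType][] = $listener;
    }

    public function publish(string $eventType, array $payload): void
    {
        foreach ($this->listeners[$eventType] ?? [] as $listener) {
            $listener($payload);
        }
    }
}

/**
 * Stand-in for the save step of an aggregate repository: append first, publish afterwards,
 * so listeners never see an event that failed to be stored.
 *
 * @param array[]  $events each event is ['type' => string, 'payload' => array]
 * @param callable $append writes the events to the stream and may throw
 */
function saveAndPublish(array $events, callable $append, SimpleEventPublisher $publisher): void
{
    $append($events);

    foreach ($events as $event) {
        $publisher->publish($event['type'], $event['payload']);
    }
}

$stored = [];
$sentMails = [];

$publisher = new SimpleEventPublisher();
$publisher->on('user-registered', function (array $payload) use (&$sentMails): void {
    $sentMails[] = $payload['email'];
});

saveAndPublish(
    [['type' => 'user-registered', 'payload' => ['email' => 'jane@example.com']]],
    function (array $events) use (&$stored): void {
        $stored = \array_merge($stored, $events);
    },
    $publisher
);
```

Note that with this approach the append and the publish are not atomic: if the process dies in between, the events are stored but never dispatched.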
from event-store-client.
> ServiceBus and EventPublisher will no longer be maintained (apart from bugfix support until the end of next year), so you have to find your own solution for this.
Yeah I know. My intention is to use service-bus for now and later try to migrate to something else (symfony/messenger possibly). Thanks for your ideas.
from event-store-client.
@enumag Keeping the same existing tools was another reason why we wrote our TransactionalEventstore client that calls the GYES client itself. We still use the prooph service-bus + event-publisher-plugin and inject our EventStore in the prooph_event_store config. That part was relatively easy. The part we continue to struggle with is the projection side, where we still want to keep using Postgres for our read models. That was, and still is, a source of bugs given how many classes we had to copy or overwrite.
We also have another microservice where we started using the symfony messenger component as service-bus. That one is not event sourced, just CQRS, and it worked right off the bat with minimal changes to our Prooph way of building services. We still need to figure out the projection/read model side there too for a full CQRS/ES integration, but writing plugins for it is relatively easy.
from event-store-client.
> The part we continue to struggle with is the projection side, where we still want to keep using Postgres for our read models. That was, and still is, a source of bugs given how many classes we had to copy or overwrite.
You used the Prooph v7 event-store with GYES, right? Why are projections in postgres a problem? I mean the fact that it's GYES should not matter since it's behind an interface and postgres projections are not a problem with Prooph v7.
from event-store-client.
@enumag Because the streams are not in postgres, the projection classes needed to be updated to read from the GYES stream. The built-in projection classes only read from the data store you have configured, so they are tightly coupled to it.
We had to create a PdoEventStoreReadModelProjector class that reads from the GYES store and projects to postgres.
from event-store-client.
@prolic Okay, I successfully implemented a layer to save aggregates using event-store-client and load them. I think I can implement an equivalent to EventPublisher too without a problem so I can start refactoring my domain now.
Next I'd like to know how to implement projections. Prooph v7 had a special table in the event-store database to save the state of each projection. How is projection state handled with GYES? Does GYES implement that on its own or do we need to do it ourselves? What do you mean by persistent and volatile subscriptions? Can't find that in the docs. Does it have something to do with this or not?
EDIT: Never mind, I need to read these first:
https://eventstore.org/docs/getting-started/reading-subscribing-events/index.html
https://eventstore.org/blog/20130306/getting-started-part-3-subscriptions/
from event-store-client.
So how do I subscribe to all events? (Or at least all events that my application generated, I don't care about eventstore's internal events.) In your test case you expect an exception in that case. Or is there something other than $all that I should use?
from event-store-client.
If I understand the code correctly, I can create the projection using ProjectionManager::createContinuousAsync(). Alright, I'll try it later. Thanks for the advice!
from event-store-client.
Maybe it's easier to create the projection through the UI. You only need the API to create stuff like this fully automated. But even then I would create it originally in the UI the first time.
from event-store-client.
The problem with creating it in the UI is that you have no version control over it and having to manually create it for local dev, CI/CD and Prod becomes a pain.
from event-store-client.
Yeah, it needs to be a part of some migration script...
from event-store-client.
@callistino actually the projections are all versioned within the event store, no information is ever lost.
It's of course a little different if you have multiple event stores (local dev / staging / production). But I didn't tell you not to create it by code, only that you should create it FIRST in the UI. You can then go ahead and copy / paste it into your source code. The UI is really great for this, because you can debug and test the projection there directly.
from event-store-client.
Ok I'm slightly confused...
My application is saving data into streams with a naming convention like <boundedContext>.<aggregateType>.<guid>. Is that ok in your opinion or would you recommend something else?
Now I need to create a projection from all of these streams. In your code above you have this:
fromStreams('$ce-user', '$ce-blog', '$ce-billing', '$ce-shipping')
How do I create these $ce-* streams? According to the documentation these streams are categories. But how do I create a "category" and specify which streams belong to it? Are there some best practices for which categories I should create?
from event-store-client.
@enumag See here: https://eventstore.org/docs/projections/system-projections/index.html
You can go to localhost:2113, click projections and then enable "$by_category", it's shipped by default. That's it.
from event-store-client.
> You can go to localhost:2113, click projections and then enable "$by_category", it's shipped by default. That's it.
Can I enable it with API somehow?
from event-store-client.
You can tell EventStore to start them on server start, see https://eventstore.org/docs/server/command-line-arguments/index.html
from event-store-client.
Wait what? There is nothing about categories on that page... Which option am I looking for? START_STANDARD_PROJECTIONS? Or something else?
from event-store-client.
yes
from event-store-client.
or like this: ./run-node.sh --run-projections=all
from event-store-client.
> or like this: ./run-node.sh --run-projections=all
No, that's definitely not it. I'm starting the docker container with
environment:
  - EVENTSTORE_RUN_PROJECTIONS=all
and I don't have any $ce streams there.
from event-store-client.
then put this: EVENTSTORE_START_STANDARD_PROJECTIONS=true
from event-store-client.
Will do. Thanks!
from event-store-client.
Ok. After fixing some problems (see the issues and PRs I created) the standard projections are up and running.
Do you think the category streams should be per-bounded-context or per-aggregate-type? Just looking for best-practices. Also what format would you recommend for stream IDs and eventIDs?
from event-store-client.
Ok, I have the everything projection up.
Next, how do I use the subscriptions? First I most likely need to use \Prooph\EventStoreClient\Internal\EventStoreAsyncNodeConnection::createPersistentSubscriptionAsync(). But what then? There is:
subscribeToStreamAsync
subscribeToStreamFromAsync
subscribeToAllAsync
subscribeToAllFromAsync
connectToPersistentSubscriptionAsync
How should I choose between these?
from event-store-client.
EventId => only uuid is allowed
StreamId => user-afbaab1d-121e-4471-9a61-cffcfd1af4c6, blog-733425da-079c-46fc-a422-a997a679dee1, etc. One stream for each aggregate.
from event-store-client.
> StreamId => user-afbaab1d-121e-4471-9a61-cffcfd1af4c6
@prolic Yeah, that's what I was using myself at first, but I wanted to add the bounded context name as a prefix and separate the type name (user in this case) from the UUID by something other than a dash, since the dash is already used in the UUID. That's why I went and changed the $by_category projection. So in the end I'll use something like this as my stream IDs:
Security/User.afbaab1d-121e-4471-9a61-cffcfd1af4c6
from event-store-client.
Don't change the $by_category projection! If you don't like it, don't use it and write your own instead.
For your use case I would suggest this instead: Security.User-afbaab1d-121e-4471-9a61-cffcfd1af4c6
It doesn't matter that the UUID contains dashes as well. The category is the part of the string up to the first dash.
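The "first dash" rule can be sketched in a few lines. streamCategory is a hypothetical helper written for this illustration, not something event-store-client provides, and returning null for an ID without a dash is a choice made for this sketch only.

```php
<?php
declare(strict_types=1);

/**
 * Returns everything before the first dash of a stream ID,
 * or null when the ID contains no dash at all.
 */
function streamCategory(string $streamId): ?string
{
    $category = \strstr($streamId, '-', true);

    return $category === false ? null : $category;
}

$examples = [
    'user-afbaab1d-121e-4471-9a61-cffcfd1af4c6',
    'Security.User-afbaab1d-121e-4471-9a61-cffcfd1af4c6',
    'Security/User.afbaab1d-121e-4471-9a61-cffcfd1af4c6',
];

foreach ($examples as $streamId) {
    echo $streamId, ' => ', streamCategory($streamId) ?? '(none)', PHP_EOL;
}
```

The first two IDs yield user and Security.User. The third one yields Security/User.afbaab1d, because its first dash sits inside the UUID, which is why the dot separator does not work with this rule.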
from event-store-client.
Ok I'll consider sticking to the dash. Thanks for your opinion!
from event-store-client.
Documentation is now available: https://github.com/prooph/documentation
I'm closing this issue
from event-store-client.
The documentation seems to mostly be sort of a copy of the eventstore.org documentation which I found quite lacking. What is needed here is some guide how to actually use it in a CQRS environment with the message buses, aggregates etc.
I had that mostly done on my end but now with this async-only thing I'm lost again.
from event-store-client.