Comments (13)
```php
$subscription = yield $connection->subscribeToStreamAsync(....);

$shutdown = static function () use ($subscription, $connection, $logger) {
    $logger->info('Received SIGINT or SIGTERM - shutting down');
    $subscription->close();
    $connection->close();
};

// Stop the server when SIGINT or SIGTERM is received
Loop::unreference(Loop::onSignal(SIGINT, $shutdown));
Loop::unreference(Loop::onSignal(SIGTERM, $shutdown));
```
That should do the trick.
from event-store-client.
@prolic that solution doesn't appear to work; I was already experimenting with that.
The problem is that `$subscription->stop()` immediately returns an `\Amp\Success`, even if the subscription is still in the midst of processing events. If you then also immediately close the connection, you'll get a connection failure in `EventStoreConnectionLogicHandler` on line 683, since it will try to restart the subscription.
The steps to reproduce:
- Have at least two events in your stream.
- Call `subscribeToStreamAsync()` with `lastCheckpoint` as `null`.
- In the `EventAppearedOnCatchupSubscription` return a `\Amp\Delayed` set to something like 5 seconds.
- Run the process.
- Once it has read the first event, do a CTRL+C or send a SIGTERM to the process before the `Delayed` promise is resolved.
- The `onSignal()` will call `$subscription->stop()` and `$connection->close()`.
- The process keeps running until the `Delayed` promise is resolved.
- Once the `Delayed` promise is resolved, the subscription will actually continue processing the next event even though the subscription was stopped.
- Once the last event is processed, the subscription does a new `$this->subscribeToStreamAsync()` on its own in `loadHistoricalEventsAsync()`, which fails because the connection was closed and returns a failure.
from event-store-client.
I see. Sorry, in this case I need to investigate a little more. I hope to get to it soon.
from event-store-client.
So from what I can see, there is no way to do it perfectly.
I'd suggest going with `$subscription->close();`, then delaying the close of the connection like this: `Loop::delay(5000, function () use ($connection) { $connection->close(); });`. This will wait 5 seconds before closing the event store connection, so you give the subscription a little time to get to the subscription drop notification. If your event processing takes a little longer, decrease `maxLiveQueueSize`.
from event-store-client.
Though is it really necessary to actually close the connection? The main reason for me is because as long as the connection is open it also keeps the loop going. If I'm not mistaken it's because of the timer tick watcher that's a referenced watcher. But what's the reason for it actually being a referenced watcher?
If you have running operations (what are those exactly? I saw them in the code but I'm unsure what they are) or subscriptions, a referenced watcher makes sense so that the subscription can keep listening for incoming events. But if nothing is running, maybe it can be the responsibility of the implementation to keep the loop going rather than the package forcing the loop to keep running?
If some coding/refactoring needs to be done I'm fine with giving it a go myself, but I'm trying to figure out what the correct solution is first :).
from event-store-client.
The event loop is running until there are no more watchers. The connection has a permanent watcher, the heartbeat. So as long as you don't close the connection, heartbeats will be handled.
from event-store-client.
The loop only runs until there are no more referenced watchers. Why does the heartbeat need to be a referenced watcher?
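The rule stated above can be illustrated with a toy model. This is only a sketch of the referenced/unreferenced idea, not Amp's implementation: `ToyLoop` and all of its methods are hypothetical names made up for this example, and time is replaced by simple ticks.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of watcher referencing in an event loop.
 * NOT Amp's implementation; ToyLoop and its methods are hypothetical.
 */
final class ToyLoop
{
    /** @var array<int, array{callback: callable, referenced: bool}> */
    private array $watchers = [];
    private int $nextId = 0;

    /** Register a repeating watcher; watchers start out referenced. */
    public function repeat(callable $callback): int
    {
        $id = $this->nextId++;
        $this->watchers[$id] = ['callback' => $callback, 'referenced' => true];
        return $id;
    }

    /** Keep the watcher active, but stop it from keeping the loop alive. */
    public function unreference(int $id): void
    {
        if (isset($this->watchers[$id])) {
            $this->watchers[$id]['referenced'] = false;
        }
    }

    public function cancel(int $id): void
    {
        unset($this->watchers[$id]);
    }

    /** Run ticks until no referenced watcher is left; returns the tick count. */
    public function run(): int
    {
        $ticks = 0;
        while ($this->hasReferencedWatchers()) {
            $ticks++;
            foreach ($this->watchers as $id => $watcher) {
                ($watcher['callback'])($id);
            }
        }
        return $ticks;
    }

    private function hasReferencedWatchers(): bool
    {
        foreach ($this->watchers as $watcher) {
            if ($watcher['referenced']) {
                return true;
            }
        }
        return false;
    }
}

$loop = new ToyLoop();

// An unreferenced "heartbeat" on its own does not keep the loop alive.
$heartbeats = 0;
$heartbeat = $loop->repeat(function () use (&$heartbeats): void {
    $heartbeats++;
});
$loop->unreference($heartbeat);
$ticksIdle = $loop->run();

// A referenced "subscription" keeps the loop, and with it the heartbeat,
// running until the subscription watcher is cancelled.
$events = 0;
$loop->repeat(function (int $id) use ($loop, &$events): void {
    if (++$events === 3) {
        $loop->cancel($id);
    }
});
$ticksBusy = $loop->run();
```

In this model the heartbeat still fires while other work is referenced, but it never prevents the loop from ending by itself, which is the behaviour the question is asking about.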
from event-store-client.
@SunMar I tried changing that today, but some tests are failing then.
from event-store-client.
I just checked why tests are failing: When the timer ticks are handled by unreferenced watchers, it happens that the connection gets closed and then you have a useless connection lying around. So I don't know how to do this, but if you can provide a PR that does it without breaking tests, I'm all for it.
Closing this ticket now.
from event-store-client.
@prolic I don't have time right now but I'll take a look at it sometime this week. Should putting a `Loop::unreference()` around the watcher be enough, and then figure out why the tests are failing? Or is there more I should take into account?
from event-store-client.
@SunMar I pushed my `unreferenced_heartbeat` branch, you should be able to go from there. I hope it helps.
from event-store-client.
@prolic I've been digging into this and have also been talking with trowski of amphp, who helped me a lot in getting more familiar with how Amphp works. The basic gist of it all is that simply unreferencing the watcher in `startReceiving()` isn't going to work, because the socket connection itself is also referenced. And you can't unreference both, because then you don't have any referenced watchers and the loop will end prematurely. A more sophisticated solution is needed.
If the goal is to have the connection unreferenced whenever it's not needed, then any Promise that waits for data to be read from the connection should make sure the connection becomes referenced until the Promise is resolved, or the loop will end before the Promise is resolved.
A different approach is to keep the connection referenced as long as it's open, but to track all the Promises that depend on the connection. Then when you call `$this->connection->close()` you can wait for the Promises to be resolved before actually closing the connection, instead of doing an immediate force close as is done now. That will make sure `$this->connection->close()` does a graceful close of the connection, which you can safely use in something like a signal handler.
If you want to keep the possibility of forcibly killing the connection regardless of any unresolved Promises, you can always have something like a `$this->connection->forceClose()` function that does that.
Does that sound like a good approach to you, if I try to do a refactor where the watchers remain referenced as they are now, but `$this->connection->close()` does a graceful close of the connection, waiting for unresolved Promises before actually closing the connection?
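To make the proposal concrete, here is a minimal, library-agnostic sketch of "track pending work, close once it has drained". It deliberately uses plain callbacks instead of Amp Promises so that it runs without dependencies. `GracefulCloser`, `track()` and the injected close callback are hypothetical names; nothing like this exists in event-store-client as far as this thread shows.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical wrapper illustrating a graceful close: the real close is
 * deferred until every tracked operation has reported completion.
 */
final class GracefulCloser
{
    private int $pending = 0;
    private bool $closing = false;
    private bool $closed = false;
    /** @var callable performs the actual, immediate close */
    private $doClose;

    public function __construct(callable $doClose)
    {
        $this->doClose = $doClose;
    }

    /** Start tracking one operation; call the returned function when it settles. */
    public function track(): callable
    {
        $this->pending++;
        $done = false;

        return function () use (&$done): void {
            if ($done) {
                return; // ignore duplicate completion signals
            }
            $done = true;
            $this->pending--;
            $this->closeIfDrained();
        };
    }

    /** Graceful close: waits for tracked operations before closing. */
    public function close(): void
    {
        $this->closing = true;
        $this->closeIfDrained();
    }

    /** Immediate close, regardless of pending operations. */
    public function forceClose(): void
    {
        if (!$this->closed) {
            $this->closed = true;
            ($this->doClose)();
        }
    }

    private function closeIfDrained(): void
    {
        if ($this->closing && $this->pending === 0) {
            $this->forceClose();
        }
    }
}

$log = [];
$closer = new GracefulCloser(function () use (&$log): void {
    $log[] = 'connection closed';
});

$eventHandled = $closer->track(); // e.g. an event handler that is still running
$closer->close();                 // the signal handler asks for shutdown
$log[] = 'close requested';
$eventHandled();                  // the handler finishes; only now the close happens
```

In an Amp-based client, the function returned by `track()` could presumably be passed to a Promise's `onResolve()`, so that each Promise depending on the connection reports its own completion. That wiring is an assumption and is not shown here.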
from event-store-client.
As this is a port of the original .NET client, I tend to keep it as close to the original as possible. Therefore `connection->close()` should close immediately. You closed the connection despite the fact that you were still waiting for responses, so it's your problem that they will all fail.
from event-store-client.