Comments (9)
For completeness, logs from the process termination:
12:07:05.381 time="2020-06-02T11:07:05Z" level=info msg="Shutting down..." component=kcl
12:07:05.381 time="2020-06-02T11:07:05Z" level=info msg="Worker shutdown in requested." component=kcl
12:07:05.425 time="2020-06-02T11:07:05Z" level=info msg="Release lease for shard shardId-000000000003" component=kcl
12:07:07.819 time="2020-06-02T11:07:07Z" level=info msg="Release lease for shard shardId-000000000004" component=kcl
12:07:08.180 time="2020-06-02T11:07:08Z" level=info msg="Release lease for shard shardId-000000000005" component=kcl
12:07:08.186 time="2020-06-02T11:07:08Z" level=info msg="Worker loop is complete. Exiting from worker." component=kcl
No errors or indication that anything was trying to handle shard 6 at this point.
from vmware-go-kcl.
If I understand correctly, there is a race during the release of a shard.
During shard release:
1a. The local shard record is updated to remove the leaseholder
1b. The remote shard record is updated to remove the leaseholder
Concurrently, the worker event-loop may run:
2a. The local shard record's leaseholder is examined, and found not to belong to this worker:
vmware-go-kcl/clientlibrary/worker/worker.go
Lines 272 to 274 in 7d5bfbb
2b. The local shard record's checkpoint and leaseholder are updated from the remote checkpoint store
vmware-go-kcl/clientlibrary/worker/worker.go
Line 276 in 7d5bfbb
vmware-go-kcl/clientlibrary/checkpoint/dynamodb-checkpointer.go
Lines 236 to 240 in 7d5bfbb
2c. The remote shard's checkpoint and leaseholder are fetched into local variables as part of the process of attempting to take the lease:
vmware-go-kcl/clientlibrary/worker/worker.go
Line 291 in 7d5bfbb
2d. A conditional update is built and submitted based on this state, the success of which determines the outcome of taking the lease
2e. Conditional on the success of 2d, a processor is started:
vmware-go-kcl/clientlibrary/worker/worker.go
Lines 292 to 298 in 7d5bfbb
vmware-go-kcl/clientlibrary/worker/worker.go
Lines 306 to 311 in 7d5bfbb
The following sequence results in a divergence between the local and remote views of the shard leaseholder (local believes it is owned, remote states that it is not) and a divergence between the local view of the shard and the actual processing state (local indicates that the shard is owned and therefore being processed, but no local processor is running):
1a, 2a, 2b, 2c, 1b, 2d, 2e
The local state is updated by the shard release, causing the worker to attempt to re-take the shard.
The worker examines the remote state during the preparation to take the shard.
The remote state is updated, invalidating the worker's attempt to take the shard.
The worker fails to take the shard, but critically, in 2b the worker has overwritten the local state update made in 1a with stale data from the remote state.
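As a rough illustration of that interleaving, here is a simplified model (not the library's code); the variables just stand in for the local shard cache and the DynamoDB record:

```go
// Simplified model of the race: "local" stands in for the in-memory shard
// status cache, "remote" for the DynamoDB lease record. The statements are
// ordered as 1a, 2a/2b, 2c, 1b, 2d to show the resulting divergence.
package main

import "fmt"

func main() {
	const me = "worker-A"

	local := me                                   // local cache: we hold the lease
	remote := map[string]string{"AssignedTo": me} // remote record: we hold the lease

	// 1a: releaseShard clears the local owner first.
	local = ""

	// 2a: the event loop sees an unowned shard and decides to (re)take it.
	// 2b: it refreshes the local record from the remote store, which still
	//     names this worker, so stale data re-instates local ownership.
	local = remote["AssignedTo"]

	// 2c: the remote owner is read into a variable for the conditional update.
	expected := remote["AssignedTo"]

	// 1b: releaseShard now clears the remote owner.
	remote["AssignedTo"] = ""

	// 2d: the conditional lease-take fails because the remote owner no longer
	//     matches what was read in 2c, so 2e never starts a processor.
	if remote["AssignedTo"] != expected {
		fmt.Printf("lease-take failed; local=%q remote=%q, no consumer running\n",
			local, remote["AssignedTo"])
	}
}
```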
from vmware-go-kcl.
Inspection reveals other causes for concern. For example, https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/checkpoint/dynamodb-checkpointer.go#L198-L220 overwrites the remote record in its entirety, filling in the leaseholder from the local shard cache, with no conditional clause to assert that it has not changed in the remote store.
This suggests that if the shard consumer loop responsible for both updating the lease and reading / processing the records were to stall and the lease time out underneath it, the checkpointer may overwrite a new claimant's lease in the remote table.
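For comparison, the following is a sketch (not the library's code) of the same kind of write guarded by a condition that the lease owner has not changed remotely; it uses aws-sdk-go, and the AssignedTo attribute name is an assumption about the checkpointer's table schema:

```go
// Sketch only: an overwrite of the lease record guarded by a condition that
// the lease owner has not changed in the remote table. The AssignedTo
// attribute name is assumed to match the checkpointer's schema.
package main

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
)

func conditionalPut(svc dynamodbiface.DynamoDBAPI, table string, item map[string]*dynamodb.AttributeValue, owner string) error {
	_, err := svc.PutItem(&dynamodb.PutItemInput{
		TableName: aws.String(table),
		Item:      item,
		// Refuse the write if another worker now holds the lease.
		ConditionExpression: aws.String("attribute_not_exists(AssignedTo) OR AssignedTo = :owner"),
		ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
			":owner": {S: aws.String(owner)},
		},
	})
	if aerr, ok := err.(awserr.Error); ok && aerr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
		// The remote lease changed underneath us; surface that instead of clobbering it.
		return aerr
	}
	return err
}
```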
from vmware-go-kcl.
Most likely the same issue as: #72
Make sure to do checkpointing during shutdown. See the example:
https://github.com/vmware/vmware-go-kcl/blob/master/test/worker_test.go#L300
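Roughly the pattern from that test, as a minimal sketch (the processor type and the stored sequence number field are illustrative):

```go
// Minimal sketch of checkpointing from the record processor's Shutdown hook,
// following the linked worker_test.go example. Only the Shutdown method is
// shown; the processor type and lastSeq field are illustrative.
package main

import (
	kcl "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
)

type processor struct {
	lastSeq string // sequence number of the last record we processed
}

func (p *processor) Shutdown(input *kcl.ShutdownInput) {
	if input.ShutdownReason == kcl.TERMINATE {
		// Shard is finished: checkpoint with nil to mark it fully consumed.
		input.Checkpointer.Checkpoint(nil)
	} else if p.lastSeq != "" {
		// Otherwise record our position so the next lease holder resumes
		// from the right place.
		input.Checkpointer.Checkpoint(&p.lastSeq)
	}
}
```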
from vmware-go-kcl.
Also, for "Error in getRecords: ProvisionedThroughputExceededException: ", maybe use larger number of WithMaxRecords.
from vmware-go-kcl.
Hi @taojwmware, thanks for taking a look.
Unfortunately I don't believe it's the same as #72, for a number of reasons:
- We already incorporated a nil checkpoint on shutdown, as suggested.
- It does not occur at shutdown, but "randomly" during the life of the application.
- It is not triggered by any re-sharding or processing of shards which end during processing. The shards are completely stable when it happens.
What happens to trigger the race is that a call to dynamodb to refresh an existing lease fails. In our case it fails with a ProvisionedThroughputExceededException, but it's a network call, so there are a wide variety of other reasons this could fail. Increasing the throughput on our dynamodb table would be sensible, but it doesn't eliminate the chance of this happening.
Failing to refresh the lease in the shard consumer loop causes the shard consumer to exit and call the deferred releaseShard. releaseShard then races with the worker event loop as described in detail in my third comment.
I've added logging and observed re-occurrences. I can confirm that after the race, the following are true:
- the local shard cache has the current worker as the owner of the shard
- the dynamodb table has no owner for the shard
- there is no shard consumer running for the shard
This state is stable from that point forwards.
This is a big problem for us, because we're currently running only a single consumer process, so there is no other consumer which can pick up the shard processing based on the (correct) dynamodb view of leases. However, even with multiple consumer processes, we would only increase the time to failure, not eliminate it (eventually all processes would experience the race and end up with shards they are stuck on). Each occurrence of a "stuck" shard also incorrectly consumes one of the max shards per process.
I'm starting to look at ways of closing this race, but I suspect that a fix might be moderately invasive. I'd be keen to get your acknowledgement that you recognise what's happening here and would be interested in reviewing a fix.
from vmware-go-kcl.
I think we're seeing a similar if not the same issue. What happens is that at some point during the life of the program, we see "Failed in acquiring lease on shard" in the logs, and at that point the shard gets "stuck", meaning the process stops reading from it.
It's happening both for streams with a single shard and a single consumer as well as with streams with multiple shards and single or multiple consumers.
We're now dealing with this error with a monitor on the dynamodb table: the LeaseTimeout value is periodically checked and, if it is too old, the process is restarted. I haven't performed a full debug yet (will do it soon), but I'm pretty sure we'll see the same race condition pointed out by @mdpye (thanks for the deep debugging, btw!).
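In case it's useful to others, a rough sketch of that watchdog; the table/attribute names and the RFC3339 timestamp format are assumptions about the checkpoint table's schema:

```go
// Rough sketch of the lease watchdog described above: scan the checkpoint
// table and report shards whose LeaseTimeout is older than a threshold, so
// the consumer process can be restarted. Attribute names and the RFC3339
// timestamp format are assumptions about the checkpointer's schema.
package main

import (
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

func staleShards(svc *dynamodb.DynamoDB, table string, maxAge time.Duration) ([]string, error) {
	out, err := svc.Scan(&dynamodb.ScanInput{TableName: aws.String(table)})
	if err != nil {
		return nil, err
	}
	var stale []string
	for _, item := range out.Items {
		lt, ok := item["LeaseTimeout"]
		if !ok || lt.S == nil {
			continue
		}
		t, err := time.Parse(time.RFC3339, *lt.S)
		if err != nil {
			continue // unexpected format; skip rather than guess
		}
		if time.Since(t) > maxAge {
			if id, ok := item["ShardID"]; ok && id.S != nil {
				stale = append(stale, *id.S)
			}
		}
	}
	return stale, nil
}

func main() {
	svc := dynamodb.New(session.Must(session.NewSession()))
	shards, err := staleShards(svc, "my-app-checkpoints", 10*time.Minute)
	if err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	// If any lease is stale, the mitigation described above is to restart the consumer.
	fmt.Println("stale shard leases:", shards)
}
```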
from vmware-go-kcl.
That certainly sounds very similar to what we experienced. Though it's been a while now, here's what I remember of my conclusions:
- The dynamodb checkpointer is not safe for concurrent use, see e.g. #77 (comment), though I believe there are more places where changes to the shared state are not respected. The issue described here shows that it's not safe even in the way it is used concurrently inside a single process. Some of the cases where corruption would be expected:
- Failure to communicate with dynamo (i.e. this issue)
- Unexpected delays in processing (the lock held by the delayed process expires during processing, the process doesn't respect that, two processes end up processing the same shard)
- The checkpointer is supposed to be responsible for co-ordinating processing between multiple concurrent processes, so it absolutely needs to be safe for concurrent use.
- I looked at improving the checkpointer to make it safe by adding appropriate conditions on updates and so on, but the existing interface with the rest of the library makes this very difficult or impossible to achieve - I believe the core of the library would have to be re-architected to give reliable behaviour.
Given the effort involved to make this library reliable, we chose to move our project to a much simpler library (https://github.com/twitchscience/kinsumer, I believe).
Having audited the code base, I can't recommend this library for production use. The required fixes are far from trivial. Kinsumer is much more limited, but I believe it's been working well for the use case we put it in, and was simple enough to follow if bugs were found.
from vmware-go-kcl.
The log shows the worker trying to acquire a lease right after giving up the lease. The default dynamodb provisioned read capacity is 10/s. One alternative is to configure a larger shard sync sleep value.
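Something along these lines, I believe; WithShardSyncIntervalMillis is assumed to be the relevant builder and the value is illustrative, so please verify against the config package:

```go
// Sketch only: widening the shard-sync interval so the worker does not hammer
// the lease table immediately after giving up a lease. The builder name
// WithShardSyncIntervalMillis and the value are assumptions to verify.
package main

import (
	cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"
)

func newKCLConfig() *cfg.KinesisClientLibConfiguration {
	return cfg.NewKinesisClientLibConfig("my-app", "my-stream", "us-east-1", "worker-1").
		WithShardSyncIntervalMillis(60 * 1000) // attempt shard sync / lease takes once a minute
}
```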
from vmware-go-kcl.