Comments (9)

mdpye commented on August 16, 2024

For completeness, logs from the process termination:

12:07:05.381 time="2020-06-02T11:07:05Z" level=info msg="Shutting down..." component=kcl
12:07:05.381 time="2020-06-02T11:07:05Z" level=info msg="Worker shutdown in requested." component=kcl
12:07:05.425 time="2020-06-02T11:07:05Z" level=info msg="Release lease for shard shardId-000000000003" component=kcl
12:07:07.819 time="2020-06-02T11:07:07Z" level=info msg="Release lease for shard shardId-000000000004" component=kcl
12:07:08.180 time="2020-06-02T11:07:08Z" level=info msg="Release lease for shard shardId-000000000005" component=kcl
12:07:08.186 time="2020-06-02T11:07:08Z" level=info msg="Worker loop is complete. Exiting from worker." component=kcl

No errors or indication that anything was trying to handle shard 6 at this point.

mdpye commented on August 16, 2024

If I understand correctly, there is a race during the release of a shard.

During shard release:

1a. The local shard record is updated to remove the leaseholder.

1b. The remote shard record is updated to remove the leaseholder:

    if err := sc.checkpointer.RemoveLeaseOwner(shard.ID); err != nil {

Concurrently, the worker event loop may run:

2a. The local shard record's leaseholder is examined and found not to belong to this worker:

    if shard.GetLeaseOwner() == w.workerID {
        continue
    }

2b. The local shard record's checkpoint and leaseholder are updated from the remote checkpoint store:

    err := w.checkpointer.FetchCheckpoint(shard)

leading to

    shard.Checkpoint = aws.StringValue(sequenceID.S)
    if assignedTo, ok := checkpoint[LEASE_OWNER_KEY]; ok {
        shard.AssignedTo = aws.StringValue(assignedTo.S)
    }

2c. The remote shard's checkpoint and leaseholder are fetched into local variables as part of attempting to take the lease:

    err = w.checkpointer.GetLease(shard, w.workerID)

leading to

    currentCheckpoint, err := checkpointer.getItem(shard.ID)

2d. A conditional update is built and submitted based on this state; its success determines whether the lease is taken:

    err = checkpointer.conditionalUpdate(conditionalExpression, expressionAttributeValues, marshalledCheckpoint)

2e. Conditional on the success of 2d, a processor is started:

    if err != nil {
        // cannot get lease on the shard
        if err.Error() != chk.ErrLeaseNotAquired {
            log.Errorf("Cannot get lease: %+v", err)
        }
        continue
    }

which, in the failure case, exits the loop before

    go func() {
        defer w.waitGroup.Done()
        if err := sc.getRecords(shard); err != nil {
            log.Errorf("Error in getRecords: %+v", err)
        }
    }()

can start a consumer.

The following sequence results in a divergence between the local and remote views of the shard leaseholder (local believes this worker owns it, remote says nobody does) and between the local view of the shard and the actual processing state (local indicates the shard is owned and therefore being processed, but no local processor is running):

1a, 2a, 2b, 2c, 1b, 2d, 2e

The local state is updated by the shard release (1a), causing the worker to attempt to re-take the shard (2a). The worker reads the remote state while preparing to take the shard (2b, 2c). The remote state is then updated (1b), invalidating the worker's attempt to take the shard, so the conditional update fails (2d) and no processor is started (2e).
The worker fails to take the shard, but critically, in 2b it has overwritten the local state update made in 1a with stale data from the remote store, so the local record once again names this worker as the owner. On every subsequent pass the event loop therefore sees the shard as already owned (2a) and skips it, and no processor is ever started for it again.
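
To make the interleaving easier to see, here is a minimal, self-contained sketch of the same shape. This is an abstraction of the sequence above, not the library's code, and whether the divergence shows up on a given run depends on scheduling, which is the nature of the race:

    package main

    import (
        "fmt"
        "sync"
    )

    // store stands in for both the worker's in-memory shard record (local) and
    // the DynamoDB row (remote); the field name is illustrative only.
    type store struct {
        mu    sync.Mutex
        owner string
    }

    func (s *store) get() string  { s.mu.Lock(); defer s.mu.Unlock(); return s.owner }
    func (s *store) set(o string) { s.mu.Lock(); defer s.mu.Unlock(); s.owner = o }

    func main() {
        const workerID = "worker-1"
        local := &store{owner: workerID}  // local shard record
        remote := &store{owner: workerID} // remote checkpoint row

        var wg sync.WaitGroup
        wg.Add(2)

        // Shard release path: 1a then 1b, with a possible scheduling gap between them.
        go func() {
            defer wg.Done()
            local.set("")  // 1a: clear the local leaseholder
            remote.set("") // 1b: clear the remote leaseholder
        }()

        // Worker event loop: 2a-2d, simplified.
        go func() {
            defer wg.Done()
            if local.get() == workerID { // 2a: skip shards we already own
                return
            }
            stale := remote.get() // 2b/2c: may still read workerID if 1b hasn't run yet
            local.set(stale)      // 2b: local record repopulated, possibly with stale data
            // 2d: the conditional take then fails once 1b clears the remote row,
            // so no consumer is started (2e) -- yet local may still claim ownership.
        }()

        wg.Wait()
        fmt.Printf("local=%q remote=%q (divergence when local is non-empty and remote is empty)\n",
            local.get(), remote.get())
    }

When the worker goroutine runs its steps in the gap between 1a and 1b, the program ends with a non-empty local owner and an empty remote owner, which is exactly the stuck state described above.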

mdpye commented on August 16, 2024

Inspection turns up other causes for concern. For example, https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/checkpoint/dynamodb-checkpointer.go#L198-L220 overwrites the remote record in its entirety, filling in the leaseholder from the local shard cache, with no conditional clause asserting that it has not changed in the remote store.

This suggests that if the shard consumer loop, which is responsible for both updating the lease and reading/processing the records, were to stall and the lease timed out underneath it, the checkpointer could overwrite a new claimant's lease in the remote table.
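
To make the concern concrete, the missing guard would be a condition on the DynamoDB write. A rough sketch using aws-sdk-go follows; the attribute name AssignedTo is an assumption about the checkpoint table layout, and this illustrates the idea rather than being a patch to the library:

    package consumer

    import (
        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/service/dynamodb"
    )

    // putCheckpointIfStillOwner writes the checkpoint item only if the remote
    // record still names this worker as the lease owner. "AssignedTo" is an
    // assumed attribute name for the lease owner column.
    func putCheckpointIfStillOwner(svc *dynamodb.DynamoDB, table string,
        item map[string]*dynamodb.AttributeValue, workerID string) error {

        _, err := svc.PutItem(&dynamodb.PutItemInput{
            TableName:           aws.String(table),
            Item:                item,
            ConditionExpression: aws.String("AssignedTo = :worker"),
            ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
                ":worker": {S: aws.String(workerID)},
            },
        })
        // A ConditionalCheckFailedException here means another worker has taken
        // the lease since we last saw it, and we must not overwrite its record.
        return err
    }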

taoj-action commented on August 16, 2024

Most likely the same issue as: #72

Make sure to do checkpointing during shutdown. See the example:
https://github.com/vmware/vmware-go-kcl/blob/master/test/worker_test.go#L300
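
The relevant part of that test looks roughly like the following. This is reproduced from memory, so the package path and interface names may differ between versions of the library:

    package consumer

    import (
        kc "github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
    )

    // myRecordProcessor is a hypothetical processor; only Shutdown matters here.
    type myRecordProcessor struct{}

    func (p *myRecordProcessor) Initialize(input *kc.InitializationInput) {}

    func (p *myRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
        // ... process input.Records, checkpointing periodically ...
    }

    // Shutdown follows the linked worker_test.go example: on a TERMINATE
    // shutdown, checkpoint with a nil sequence number so the shard's lease
    // bookkeeping is finalised rather than left dangling.
    func (p *myRecordProcessor) Shutdown(input *kc.ShutdownInput) {
        if input.ShutdownReason == kc.TERMINATE {
            if err := input.Checkpointer.Checkpoint(nil); err != nil {
                // Log and decide whether to retry; identifiers above are from
                // memory of the library's interfaces package and may differ.
            }
        }
    }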

taoj-action commented on August 16, 2024

Also, for "Error in getRecords: ProvisionedThroughputExceededException", maybe use a larger value for WithMaxRecords.
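
For example, something along these lines (the values are hypothetical and the builder names follow the config package as I recall it, so check them against the version you run):

    package consumer

    import cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"

    // newKCLConfig asks for larger GetRecords batches so the consumer makes
    // fewer calls overall; the numbers here are placeholders, not recommendations.
    func newKCLConfig() *cfg.KinesisClientLibConfiguration {
        return cfg.NewKinesisClientLibConfig("my-app", "my-stream", "us-east-1", "worker-1").
            WithMaxRecords(500).                   // more records per GetRecords call
            WithIdleTimeBetweenReadsInMillis(1000) // and a longer pause between reads
    }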

mdpye commented on August 16, 2024

Hi @taojwmware, thanks for taking a look.

Unfortunately I don't believe it's the same as #72, for a number of reasons:

  • We already incorporated a nil checkpoint on shutdown, as suggested.
  • It does not occur at shutdown, but "randomly" during the life of the application.
  • It is not triggered by re-sharding or by shards ending during processing; the shards are completely stable when it happens.

What happens to trigger the race is that a call to dynamodb to refresh an existing lease fails. In our case it fails with a ProvisionedThroughputExceededException, but it's a network call, so there are a wide variety of other reasons this could fail. Increasing the throughput on our dynamodb table would be sensible, but it doesn't eliminate the chance of this happening.

Failing to refresh the lease in the shard consumer loop causes the shard consumer to exit and call the deferred releaseShard. releaseShard then races with the worker event loop, as described in detail above.
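
For what it's worth, retrying a throttled refresh with backoff would shrink this window, though it cannot close the race itself. A rough sketch, with a hypothetical renew callback standing in for the library's lease-refresh call:

    package consumer

    import (
        "time"

        "github.com/aws/aws-sdk-go/aws/awserr"
        "github.com/aws/aws-sdk-go/service/dynamodb"
    )

    // renewWithBackoff retries a lease refresh when it fails with DynamoDB
    // throttling, backing off exponentially. "renew" is a hypothetical callback,
    // not part of the library's API; this narrows the window in which a transient
    // throttle forces a release, but the underlying race remains.
    func renewWithBackoff(renew func() error, attempts int) error {
        var err error
        for i := 0; i < attempts; i++ {
            if err = renew(); err == nil {
                return nil
            }
            aerr, ok := err.(awserr.Error)
            if !ok || aerr.Code() != dynamodb.ErrCodeProvisionedThroughputExceededException {
                return err // not throttling: give up straight away
            }
            time.Sleep(time.Duration(100*(1<<i)) * time.Millisecond) // 100ms, 200ms, 400ms, ...
        }
        return err
    }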

I've added logging and observed re-occurrences. I can confirm that after the race, the following are true:

  • the local shard cache has the current worker as the owner of the shard
  • the dynamodb table has no owner for the shard
  • there is no shard consumer running for the shard

This state is stable from that point forwards.

This is a big problem for us, because we're currently running only a single consumer process, so there is no other consumer which can pick up the shard processing based on the (correct) dynamodb view of leases. However, even with multiple consumer processes, we would only increase the time to failure, not eliminate it (eventually all processes would experience the race and end up with shards they are stuck on). Each occurrence of a "stuck" shard also incorrectly consumes one of the max shards per process.

I'm starting to look at ways of closing this race, but I suspect that a fix might be moderately invasive. I'd be keen to get your acknowledgement that you recognise what's happening here and would be interested in reviewing a fix.

tommyblue commented on August 16, 2024

I think we're seeing a similar, if not the same, issue. At some point during the life of the program we see "Failed in acquiring lease on shard" in the logs, and from that point the shard gets "stuck", meaning the process stops reading from it.
It's happening both for streams with a single shard and a single consumer and for streams with multiple shards and single or multiple consumers.

We're now dealing with this error with a monitor on the dynamodb table: the LeaseTimeout value is periodically checked and, if it is too old, the process is restarted. I haven't performed a full debug yet (will do it soon), but I'm pretty sure we'll see the same race condition pointed out by @mdpye (thanks for the deep debugging, btw!).
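
For anyone wanting to do something similar, the monitor amounts to roughly the following. The table and attribute names ("ShardID", "LeaseTimeout") and the RFC3339 timestamp format are assumptions about the checkpoint table and may differ between library versions:

    package main

    import (
        "log"
        "time"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/dynamodb"
    )

    // checkLeases scans the checkpoint table and reports shards whose
    // LeaseTimeout is older than maxAge.
    func checkLeases(svc *dynamodb.DynamoDB, table string, maxAge time.Duration) error {
        out, err := svc.Scan(&dynamodb.ScanInput{TableName: aws.String(table)})
        if err != nil {
            return err
        }
        for _, item := range out.Items {
            lt, ok := item["LeaseTimeout"] // assumed attribute name for the lease expiry
            if !ok || lt.S == nil {
                continue
            }
            expiry, err := time.Parse(time.RFC3339, aws.StringValue(lt.S))
            if err != nil {
                continue // timestamp format may differ; adjust the layout as needed
            }
            if time.Since(expiry) > maxAge {
                shardID := ""
                if sid, ok := item["ShardID"]; ok { // assumed partition key name
                    shardID = aws.StringValue(sid.S)
                }
                log.Printf("lease for shard %s is stale (timeout %s); restarting the consumer", shardID, expiry)
            }
        }
        return nil
    }

    func main() {
        svc := dynamodb.New(session.Must(session.NewSession()))
        for range time.Tick(time.Minute) {
            if err := checkLeases(svc, "my-app-checkpoints", 5*time.Minute); err != nil {
                log.Printf("lease check failed: %v", err)
            }
        }
    }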

mdpye commented on August 16, 2024

That certainly sounds very similar to what we experienced. Though it's been a while now, here's what I remember of my conclusions:

  • The dynamodb checkpointer is not safe for concurrent use, see e.g. #77 (comment), though I believe there are more places where changes to the shared state are not respected (a rough sketch of the kind of guard that is missing follows this list). The issue described here shows that it's not safe even in the way it is used concurrently inside a single process. Some of the cases where corruption would be expected:
    • Failure to communicate with dynamo (i.e. this issue)
    • Unexpected delays in processing (the lock held by the delayed process expires during processing, the process doesn't respect that, and two processes end up processing the same shard)
  • The checkpointer is supposed to be responsible for co-ordinating processing between multiple concurrent processes, so it absolutely needs to be safe for concurrent use.
  • I looked at improving the checkpointer to make it safe by adding appropriate conditions on updates and so on, but the existing interface with the rest of the library makes this very difficult or impossible to achieve; I believe the core of the library would have to be re-architected to give reliable behaviour.
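
To illustrate the kind of guard I mean, sketched on a hypothetical shard-status type (none of this is the library's code): every mutation of the shared local record would need to be conditional on the value the caller last observed, mirroring what a conditional write does for the DynamoDB row.

    package consumer

    import "sync"

    // shardStatus is a hypothetical stand-in for the library's shared shard record.
    type shardStatus struct {
        mu         sync.Mutex
        assignedTo string
        checkpoint string
    }

    // setOwnerIfUnchanged updates the local owner only if it still holds the value
    // the caller last observed, so a concurrent release (or another refresh) is
    // never silently clobbered with stale data.
    func (s *shardStatus) setOwnerIfUnchanged(observed, next string) bool {
        s.mu.Lock()
        defer s.mu.Unlock()
        if s.assignedTo != observed {
            return false // state moved underneath us; the caller must re-read and re-decide
        }
        s.assignedTo = next
        return true
    }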

Given the effort involved to make this library reliable, we chose to move our project to a much simpler library (https://github.com/twitchscience/kinsumer, I believe).

Having audited the code base, I can't recommend this library for production use; the required fixes are far from trivial. Kinsumer is much more limited, but I believe it has been working well for the use case we put it to, and it was simple enough to follow if bugs were found.

taoj-action commented on August 16, 2024

The log shows the worker trying to acquire a lease right after giving up the lease. The default DynamoDB provisioned read capacity is 10/s. One alternative solution is to configure a larger shard sync sleep value.
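
For example (the builder name and value here are my recollection/assumptions; check them against the config package in your version):

    package consumer

    import cfg "github.com/vmware/vmware-go-kcl/clientlibrary/config"

    // newLessChattyConfig syncs shards, and therefore hits the DynamoDB lease
    // table, less often. The interval here is arbitrary.
    func newLessChattyConfig() *cfg.KinesisClientLibConfiguration {
        return cfg.NewKinesisClientLibConfig("my-app", "my-stream", "us-east-1", "worker-1").
            WithShardSyncIntervalMillis(60000) // sync once a minute
    }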
