Comments (7)
I think adding an extra goroutine complicates the process model. An alternative is a model similar to the one SWF uses: report progress with RecordActivityTaskHeartbeat, which works like periodic checkpointing except that it only renews the lease.
https://docs.aws.amazon.com/amazonswf/latest/apireference/API_RecordActivityTaskHeartbeat.html
from vmware-go-kcl.
The other option is periodic checkpointing, which needs no additional API; a checkpoint does not have to wait until all records in a batch have been processed.
Thanks for the quick replies.
For my particular issue, doing periodic checkpoints would actually work (I'm doing that anyway) - but reading through the code, a checkpoint doesn't renew the lease. If we could change the behavior for a checkpoint to renew the lease, this issue would be solved for me.
Happy to implement that. Or happy to hear that I read the code wrong :).
You are correct and it is a bug:
https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/checkpoint/dynamodb-checkpointer.go#L198
The leaseTimeout should be updated during checkpointing, the same way GetLease() does it.
You are more than welcome to implement it. Thx!
Looking at GetLease, and specifically https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/checkpoint/dynamodb-checkpointer.go#L173, it looks like the checkpoint will already be set in that function. Since CheckpointSequence already assumes the checkpoint is set in the shard, I think this will work as intended with:
```go
func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error {
	// We assume shard.Checkpoint has been set correctly here. Since we also want
	// to renew the lease while checkpointing in case the processing of a set of records
	// takes longer than the lease time, we can simply call GetLease here.
	return checkpointer.GetLease(shard, shard.AssignedTo)
}
```
If you agree, I'll put up a quick PR for this.
It should be much simpler, since the caller of CheckpointSequence already holds the lease. It just needs to update the lease timeout:

```go
newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC()
// String form to write to the lease record alongside the checkpoint.
newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339)
// Update the in-memory lease timeout.
shard.LeaseTimeout = newLeaseTimeout
```
This is actually not a bug. The Kinesis documentation says: "Gets an Amazon Kinesis shard iterator. A shard iterator expires 5 minutes after it is returned to the requester."
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
This means the shard iterator can expire even though the consumer still holds the lease. The workaround is to reduce the maximum number of records per batch using WithMaxRecords(smaller-value).