Comments (10)
The library is working as expected. I was starting up the consumers from different docker-compose commands and there were issues due to that. I will close this issue for now.
from vmware-go-kcl.
Log from 1st container:
lct-consumer | 3:41PM WRN go/src/last-contact-updater/consumer_main.go:19 > Configuration = &{last-contact-time-consumer us-east-1 device-last-contact-time 1 1000 5000 300000 1 lct-checkpoint http://localstack:4566 minikube local@elasticsearch:9200 1000 300 1 true http} app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/consumer/lct_updater_app.go:39 > Starting lct consumer.... app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/es-logger.go:22 > elastic: http://172.29.0.2:9200 joined the cluster app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Worker initialization in progress... app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Creating Kinesis session app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Creating DynamoDB based checkpointer app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Initializing Checkpointer app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Creating DynamoDB session app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Initialization complete. app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Starting monitoring service. app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Starting worker event loop. app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/consumer/lct_updater_app.go:58 > Lct consumer started, waiting indefinitely.... app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000000 app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found 1 shards app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Start polling shard consumer for shard: shardId-000000000000 app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/consumer/lct_processor.go:25 > Processing SharId: shardId-000000000000 at checkpoint: app=last-contact-updater
from vmware-go-kcl.
Logs from 2nd consumer container
lct-consumer | 3:41PM WRN go/src/last-contact-updater/consumer_main.go:19 > Configuration = &{last-contact-time-consumer us-east-1 device-last-contact-time 1 1000 5000 300000 1 lct-checkpoint http://localstack:4566 minikube local@elasticsearch:9200 1000 300 1 true http} app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/consumer/lct_updater_app.go:39 > Starting lct consumer.... app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/es-logger.go:22 > elastic: http://172.29.0.2:9200 joined the cluster app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Worker initialization in progress... app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Creating Kinesis session app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Creating DynamoDB based checkpointer app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Initializing Checkpointer app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Creating DynamoDB session app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Initialization complete. app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Starting monitoring service. app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Starting worker event loop. app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/consumer/lct_updater_app.go:58 > Lct consumer started, waiting indefinitely.... app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000000 app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found 1 shards app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Start polling shard consumer for shard: shardId-000000000000 app=last-contact-updater
lct-consumer | 3:41PM INF go/src/last-contact-updater/internal/consumer/lct_processor.go:25 > Processing SharId: shardId-000000000000 at checkpoint: app=last-contact-updater
from vmware-go-kcl.
@rrejavnera, correct me if I'm wrong but does the stream only have 1 shard? If so would you mind seeing if you experience the same issue on a stream with more than 1 shard? Thanks
from vmware-go-kcl.
@connormckelvey Yes. The stream has only 1 shard. I will test with 2 shards and update.
from vmware-go-kcl.
@connormckelvey I tested with 2 shards and all the consumers are polling shard-0 only.
Log1
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000000 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000001 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found 2 shards app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Start polling shard consumer for shard: shardId-000000000000 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/consumer/lct_processor.go:25 > Processing SharId: shardId-000000000000 at checkpoint: app=last-contact-updater
Log2
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000000 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000001 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found 2 shards app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Start polling shard consumer for shard: shardId-000000000000 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/consumer/lct_processor.go:25 > Processing SharId: shardId-000000000000 at checkpoint: app=last-contact-updater
Log3
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000000 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found new shard with id shardId-000000000001 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Found 2 shards app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/logger/kcl-logger.go:25 > Start polling shard consumer for shard: shardId-000000000000 app=last-contact-updater
lct-consumer | 2:53AM INF go/src/last-contact-updater/internal/consumer/lct_processor.go:25 > Processing SharId: shardId-000000000000 at checkpoint: app=last-contact-updater
from vmware-go-kcl.
Ok, thank you for trying with 2 shards. I do not see anything obvious in the logs. I will need some time to attempt to reproduce on my end.
from vmware-go-kcl.
Ok. I will also debug and let you know my findings.
from vmware-go-kcl.
@rrejavnera So far I am not able to replicate this on my end using a known working configuration. Lease stealing does have the potential for a slight overlap in processing when shards are stolen, but they issue you posted does not look like that (since it appears that the second shard is never even picked up).
So a couple questions in regard to your configuration:
-
The config you posted, are you creating the
KinesisClientLibConfiguration
using the struct literal syntax (as posted above), or are you using theNewKinesisClientLibConfig
constructor function? The constructor does some validation and defaulting that could account for the strange behavior you are seeing. So if you are using the literal syntax, I would give the constructor a shot. -
In the original slack message you mentioned that lease stealing was not working, however I do not see anywhere in the config where the LeaseStealing functionality is enabled. You can do this with the
WithLeaseStealing
method on the KCLClientLibConfiguration.
To be clear, I wouldn't expect to see this behavior with lease stealing enabled or disabled, so if my point from the first bullet does not clear things up, we'll need to dive deeper.
from vmware-go-kcl.
Sure. I will give it a try and let you know.
from vmware-go-kcl.
Related Issues (20)
- Lease stealing code exception HOT 3
- Option to assume role in AWS Kinesis client HOT 1
- Checkpointer interface breaking change in v1.3.0 HOT 1
- worker.go:339 Error in getRecords: unexpected EOF
- How to get the StreamName/ConsumerName inside ProcessRecords method HOT 1
- Unsupported protocol scheme
- Workload skewed across workers HOT 3
- (Still) Too many calls to DescribeStream HOT 3
- Error in getRecords leaves a dangling record processor. HOT 1
- Stuck in waitOnParentShard after resharding HOT 4
- Possible Data Race for input Checkpointer HOT 1
- Semantic versioning friendly version tags HOT 1
- Shard consumer sometimes fails to recover from error refreshing lease HOT 9
- Prometheus metrics should add appname as label and not in the metric name HOT 1
- Possibility to Work w/ DynamoDB Streams HOT 3
- Error in publishing cloudwatch metrics. Error: NoCredentialProviders: no valid providers in chain. HOT 1
- How to use record deaggregator? HOT 2
- Tagged toolchain docker container vmware/go-kcl-toolchain:0.1.2 uses golang version 1.12.4
- AWS Go SDK V2 HOT 15
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from vmware-go-kcl.