Giter Club home page Giter Club logo

vmware-go-kcl's Introduction

VMware-Go-KCL

technology Go Go Report Card License: MIT

Overview

Amazon Kinesis enables real-time processing of streaming data at massive scale. Kinesis Streams is useful for rapidly moving data off data producers and then continuously processing the data, be it to transform the data before emitting to a data store, run real-time metrics and analytics, or derive more complex data streams for further processing.

The VMware Kinesis Client Library for GO (VMware-Go-KCL) enables Go developers to easily consume and process data from [Amazon Kinesis][kinesis].

VMware-Go-KCL brings Go/Kubernetes community with Go language native implementation of KCL matching exactly the same API and functional spec of original Java KCL v2.0 without the resource overhead of installing Java based MultiLangDaemon.

Besides, vmware-go-kcl-v2 is the v2 version of VMWare KCL for the Go programming language by utilizing AWS Go SDK V2.

Try it out

Prerequisites

Make sure hmake is version 1.3.1 or above and go is version 1.11 or above

hmake --version
1.3.1

Make sure to launch Docker daemon with specified DNS server --dns DNS-SERVER-IP

On Ubuntu, update the file /etc/default/docker to put --dns DNS-SERVER-IP in DOCKER_OPTS.

On Mac, set DNS in Docker PreferencesDaemonInsecure registries

Build & Run

hmake

# security scan
hmake scanast

# run test
hmake check

# run integration test
# update the worker_test.go to let it point to your Kinesis stream
hmake test

Documentation

VMware-Go-KCL matches exactly the same interface and programming model from original Amazon KCL, the best place for getting reference, tutorial is from Amazon itself:

Contributing

The vmware-go-kcl project team welcomes contributions from the community. Before you start working with vmware-go-kcl, please read our Developer Certificate of Origin. All contributions to this repository must be signed as described on that page. Your signature certifies that you wrote the patch or have the right to pass it on as an open-source patch. For more detailed information, refer to CONTRIBUTING.md.

License

MIT License

vmware-go-kcl's People

Contributors

arl avatar cmckelvey-vmware avatar connormckelvey avatar dependabot[bot] avatar dferstay avatar fafg avatar iliacimpoes avatar kevburnsjr avatar longzhou avatar lucarin91 avatar mdpye avatar taoj-action avatar timstudd avatar vmwsrpbot avatar wgerges-discovery avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

vmware-go-kcl's Issues

Properly handle leases for slow record processing

Since leases are renewed in the getRecords loop, and the records are processed in a blocking way in that same loop, leases aren't properly renewed on time if record processing takes a long time.

For example, if we allow for the maximum number of records in a batch, and processing a record takes a second, the call to ProcessRecords takes 10k seconds, while a lease might expire in 30 seconds. Some other client might start processing the same set of records, while the original client is still alive and well and busy processing. Even if (in this example), you'd lower the batch size to 100 records, it would still be an issue.

A solution could be to run ProcessRecords in a goroutine with a callback (through channel, function or otherwise) when this batch is done processing, such the the main getRecords loop is still alive and just won't get any more records until processing is done. Alternatively, the lease renewal can be moved to a separate goroutine (this would have my preference I think).

If you think this would be a good enhancement to this library, I'm happy to implement this.

Error in publishing cloudwatch metrics

It seems the input params are not used to set monitor service in cloudwatch Init function.
The empty appName, streamName and workerID cause PutMetricData failed with error string "Error in publishing cloudwatch metrics. Error: InvalidParameter...".
Could you help to check this issue? Thanks.

Create close channel first inside initialize method

Background

When we start the worker by calling the Start method, the application may fail due to AWS access rights or any AWS error, in that case we retry calling start couple of times. If retrying a certain amount of times is unsuccessful we handle the error and exit our program.

However we also have a signal listener which shuts down the application gracefully, and also calls Shutdown method. Considering our workers always failed to start due to failed initialize
When the Shutdown method inside the library is called, it attempts to close stop channel which was not yet created:

worker.go#L134

close(*w.stop)

Solution

I suggest to create the stop channel & and waitGroup in the first place when calling initialize method to avoid panics when calling the Shutdown method

Here's a small example of how our Start looks like:

Start
var retries int
err := w.Start()
for err != nil && retries < app.config.Workers.MaxRetries {
	app.logger.Error(err)
	time.Sleep(5 * time.Second)
	err = w.Start()
	retries++
}
errChan <- err

Use time.Duration instead of milliseconds count

Many, if not all, time duration in whole the code base are expressed in milliseconds. In Go time.Duration expressly exists for this and makes it easier to read, manipulate and use in coordination with other libraries.

I propose to use time.Duration everywhere, even in the metrics.MonitoringService interface. That would be a breaking change though so it would require a new major tag.

Support lease stealing

There is 'MaxLeasesToStealAtOneTime' option in the config but lease stealing logic is not implemented as far as I can tell.

Support enhanced fan-out

As of March 8th, 2019, the aws-sdk-go package has had (what appears to be) full support for SubscribeToStream, which allows for subscribers to have shard records pushed to them instead of having to deal with shard iterators and polling. This is also necessary for enhanced fan-out, which gives dedicated throughput to consumers and consumer-level metrics inside cloudwatch.

What do you think the level of effort would be to add this functionality to this project? It would help us out immensely.

Supporting links:
aws/aws-sdk-go#2402
https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html
https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html

Semantic versioning friendly version tags

Don't know if this made on purpose or not, but I noted that the tag for version 1.0 version does not respect the format required for semantic versioning.
For instance v-1.0 should be v1.0.0 if the goal is to indicates the first major version (no dash and 3 numbers).
If you look at the official Go package documentation website for vmware-go-kcl, indeed it shows no version for this project.
Now it is still possible to use this project with Go modules since go mod will use commits numbers, but IMHO you lose a lot of useful feature provided when respecting semantic versioning.

Prometheus metrics should add appname as label and not in the metric name

The prometheus metrics package creates metrics names by prepending the appname to the metric name.

This cause the metrics service to fail if the app name has characters that are invalid for prometheus. Additionally, embedding the app name into the metric name makes queries difficult in cases where multiple, related consumer applications are running.

It would make more sens to add the app name as a label and avoid the potential errors and better organize the data.

This:
p.processedBytes = prom.NewCounterVec(prom.CounterOpts{ Name: p.namespace +_processed_bytes, Help: "Number of bytes processed", }, []string{"kinesisStream", "shard"})

May be better off as:
p.processedBytes = prom.NewCounterVec(prom.CounterOpts{ Name: kcl_processed_bytes, Help: "Number of bytes processed", }, []string{"kinesisStream", "shard","app"})

With the app name added as a label:
func (p *MonitoringService) IncrBytesProcessed(shard string, count int64) { p.processedBytes.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName","app":p.appname}).Add(float64(count)) }

Stuck in waitOnParentShard after resharding

Our app reads data from kinesis and we would like to make it quit after a resharding. It runs on ECS and it will be automatically restarted (we use this strategy to mitigate the missing shard stealing feature).

We use dynamodb checkpointing and we noticed that the app using this library can't quit. After some debugging I found out a possible bug, but it feels "strange" so I'm not 100% sure I'm not missing something here 😅

So, what happens is that, given 1 open shard, when we make a resharding we'll have 2 shards: the original one which is closed and its child, the new shard. The library stops reading the child shard because the parent shard, which is closed, could still contain data that must be read.
The child shard consumer enters the waitOnParentShard function and waits pshard.Checkpoint == chk.SHARD_END to be true, but nothing sets chk.SHARD_END in the dynamodb checkpoint.

We implemented the ProcessRecords(input *interfaces.ProcessRecordsInput) function so I guess we could check there if the last read record has the same checkpoint of the shard's ShardEndingSequenceNumber. Based on AWS docs ShardEndingSequenceNumber is populated only for closed shards and contains the last record's checkpoint in the shard.
ShardEndingSequenceNumber is available in worker.shardStatus, but it's not public. It could, in case, be added to the input passed to ProcessRecords, but it isn't called if there're no records to process, so it's probably not the right path.
So here I'm wondering if there's a missing part in the library or I'm not doing the correct thing.

Looking around in the code I noticed
https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/worker/shard-consumer.go#L256 where getResp.NextShardIterator == nil is checked to identify a closed shard. Maybe that's the right place to update the chekpoint and set chk.SHARD_END.

AT_TIMESTAMP InitialPositionInStream does not work

// If there isn't any checkpoint for the shard, use the configuration value.
if shard.Checkpoint == "" {
  initPos := sc.kclConfig.InitialPositionInStream
  log.Debugf("No checkpoint recorded for shard: %v, starting with: %v", shard.ID,
    aws.StringValue(config.InitalPositionInStreamToShardIteratorType(initPos)))
  shardIterArgs := &kinesis.GetShardIteratorInput{
    ShardId:           &shard.ID,
    ShardIteratorType: config.InitalPositionInStreamToShardIteratorType(initPos),
    StreamName:        &sc.streamName,
  }
  iterResp, err := sc.kc.GetShardIterator(shardIterArgs)
  if err != nil {
    return nil, err
  }
  return iterResp.ShardIterator, nil
}

If no checkpoint is found and the InitialPositionInStream is AT_TIMESTAMP, GetShardIteratorInput should take the Timestamp argument, which in the above code snippet (from _ /clientlibrary/worker/shard-consumer.go_) is missing.

As a result when I use the library with the AT_TIMESTAMP config, I get the following error:

InvalidArgumentException: Must specify timestampInMillis parameter for iterator of type AT_TIMESTAMP. Current request has no timestamp parameter.\n\tstatus code: 400,

Error in getRecords leaves a dangling record processor.

If the AWS API call GetRecords fails in https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/worker/shard-consumer.go#L207 (e.g., with a 500 which happens once in a while), the lease is released (by https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/worker/shard-consumer.go#L142) and getRecords returns. Besides printing an error, nothing else happens in the caller (https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/worker/worker.go#L309).

Now, after the wait period, the main loop finds a shard with no lease and gets the lease, starting a new ShardConsumer, which creates a new RecordProcessor and calls Initialize on it. However, nobody has called Shutdown on the old one.

The RecordProcessorFactory can't return the existing processor as it doesn't know the ShardID yet, so we're left with a processor that isn't properly terminated.

metrics: GetRecords time includes ProcessRecords time

Context:
In my use of KCL, I'm controlling the read throuput at shard level by performing a time.Sleep at the top of myrecordProcessor.ProcessRecords. This allows to not exceed the aws data limits described here.

By looking at the GetRecords and ProcessRecords on my metrics dashboard I was expecting to see the process_records_time metrics more or less equal to the sleep duration. That is the case.

What I didn't expect though was to see the get_records_time match the same value.

Looking at the code, I was excepting to have get_records_time be the time taken to run that code block:

getResp, err := sc.kc.GetRecords(getRecordsArgs)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException || awsErr.Code() == ErrCodeKMSThrottlingException {
log.Errorf("Error getting records from shard %v: %+v", shard.ID, err)
retriedErrors++
// exponential backoff
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
continue
}
}
log.Errorf("Error getting records from Kinesis that cannot be retried: %+v Request: %s", err, getRecordsArgs)
return err
}

Instead getRecordsTime := time.Since(getRecordsStartTime) / 1000000 is computed after both calls to GetRecords and ProcessRecords.
Am I missing something here? Is this the expected behavior?

If not I can propose a PR for that.

Too many calls to DescribeStream

We recently faced a problem on our production cluster consuming a high-volume kinesis stream.
2 different services are consuming that stream. Each service is made of multiple workers.
One service is based off java KCL while the other one, ours, uses this library.

Both services saw an increase in LimitExceededException errors on the DescribeStream api call.

Error in DescribeStream: streamAAA Error: LimitExceededException: Rate exceeded for stream streamAAA under account BBBB.
	status code: 400, request id: xxxxx Request: {
  StreamName: "streamAAA"
}

The thing is there's a 10 api call per second hard limit imposed by AWS.

My understanding of the problem is the following:

Both services had been running for 2 weeks without problems but for some reason. At one moment though, planets were aligned so that nearly all calls to DescribeStream were performed around the same time, actually exceeding the 10 calls per second hard-limit.

We got a response directly from AWS support about that:

Unfortunately, the limit of 10 trasactions per second for 'Describestream' API call is a hard limit and cannot be increased. However, there are workarounds for the 'Describestream' API call.
1. You can use the APIs ListShards and DescribeStreamSummary together as the combination of these two APIs will give all the information given by the DescribeStream API.
Reference: ListShards: ://docs.aws.amazon.com/kinehttpssis/latest/APIReference/API_ListShards.html
2. If the above workaround does not match your use case, please implement an exponential backoff mechanism where you catch the LimitExceededException and retry the request or use a cache to decrease the number of calls to DescribeStream API.
Reference: 
A. Reference: Error Retries and Exponential Backoff in AWS: https://docs.aws.amazon.com/general/latest/gr/api-retries.html
B. DescribeStreamSummary: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html
3. If you are making Describe Stream API call quite frequently, try modifying the application by making a call to describe stream, save it in some cache to reduce the frequency of API calls.

I think the 2 solution (exponential back-off on DescribeStream failure) as already implemented in

if awsErr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException || awsErr.Code() == ErrCodeKMSThrottlingException {
log.Errorf("Error getting records from shard %v: %+v", shard.ID, err)
retriedErrors++
// exponential backoff
// https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff
time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond)
could be a good solution. Eventually the sleep duration could also be randomized a bit in order to spread calls across seconds even more.

Would an addition of such feature to vmware-go-kcl be acceptable?

Why a fork?

Hi I'm the original creator of https://github.com/patrobinson/gokini from which this project was forked. I'm wondering why it was forked and the changes not instead integrated upstream? I'm happy to collaborate on this together, I'm glad to see you've implemented dealing with shard splits and merges.

NewWorker Accept Checkpoint interface

The current NewWorker func automatically init the default Dynamo checkpoint. It would be more convenient if it accepts checkpoint interface as a input so that folks can inject customized checkpoint implementation. On the other hand, the checkpoint interface is defined to be private now. Any plan to add these supports so that people can implement their own checkpoint and inject to the worker?

AWS Go SDK V2

Now that the AWS Go SDK V2 is GA is there any plans to support V2 (via a new major version or the like)?

Tagged toolchain docker container vmware/go-kcl-toolchain:0.1.2 uses golang version 1.12.4

Cloning this repository and running hmake fails, because the errors.As method is undefined. This feature is only available in golang v1.13, but yet the docker container in dockerhub https://hub.docker.com/layers/vmware/go-kcl-toolchain/0.1.2/images/sha256-fe1cc8776970fdb644b376c5e31659537bc324f0c7fd81a9f899609fb0ef1a63?context=explorehttps://hub.docker.com/layers/vmware/go-kcl-toolchain/0.1.2/images/sha256-fe1cc8776970fdb644b376c5e31659537bc324f0c7fd81a9f899609fb0ef1a63?context=explore

shows ENV GOLANG_VERSION=1.12.4

running hmake rebuild-toolchain and then hmake does work, so either the image needs updating in dockerhub, or the README needs updating.

Option to assume role in AWS Kinesis client

Is there a way to assume a role (role_arn) and pass it to a new worker? In the best case, I would like to read the role_arn from a config file and then assume the role in Go. Without the assumed role, my current program cannot find the stream under the account because it is not pointing to the account of a third-party.

Shard consumer sometimes fails to recover from error refreshing lease

Running head of pusher/master (which is head of this repo master + both open PRs, which don't touch lease behaviour).

We see occasional throughput exceeded errors on our dynamodb lease table. Sometimes the shard consumer is restarted, other times the shard is simply left unconsumed.

Obviously, I can increase the provisioned throughput on the table to resolve the issue in the short term, however, I am uncomfortable leaving the issue in the code, as these are hardly the only way in which communication with dynamodb could fail and leave us with an unconsumed stream.

Logs (level: info) from a recent occurrence. First timestamp is in UTC+1 and is from elastic search. Included because it has more granularity. Extra line-breaks were added by me, just to group related timestamps for clarity. This is everything logged by the library during the life of the process. The process continued to run without logging anything from the library after this point.

14:58:31.288 time="2020-06-01T13:58:31Z" level=info msg="Creating DynamoDB session" component=kcl
14:58:31.288 time="2020-06-01T13:58:31Z" level=info msg="Worker initialization in progress..." component=kcl
14:58:31.288 time="2020-06-01T13:58:31Z" level=info msg="Worker initialization in progress..." component=kcl
14:58:31.288 time="2020-06-01T13:58:31Z" level=info msg="Initializing Checkpointer" component=kcl
14:58:31.288 time="2020-06-01T13:58:31Z" level=info msg="Creating DynamoDB based checkpointer" component=kcl
14:58:31.288 time="2020-06-01T13:58:31Z" level=info msg="Creating Kinesis session" component=kcl
14:58:33.124 time="2020-06-01T13:58:33Z" level=info msg="Initialization complete." component=kcl
14:58:33.124 time="2020-06-01T13:58:33Z" level=info msg="Starting worker event loop." component=kcl
14:58:33.124 time="2020-06-01T13:58:33Z" level=info msg="Starting monitoring service." component=kcl
14:58:33.171 time="2020-06-01T13:58:33Z" level=info msg="Found 4 shards" component=kcl
14:58:33.171 time="2020-06-01T13:58:33Z" level=info msg="Found new shard with id shardId-000000000006" component=kcl
14:58:33.171 time="2020-06-01T13:58:33Z" level=info msg="Found new shard with id shardId-000000000004" component=kcl
14:58:33.171 time="2020-06-01T13:58:33Z" level=info msg="Found new shard with id shardId-000000000003" component=kcl
14:58:33.171 time="2020-06-01T13:58:33Z" level=info msg="Found new shard with id shardId-000000000005" component=kcl
14:58:33.207 time="2020-06-01T13:58:33Z" level=info msg="Start Shard Consumer for shard: shardId-000000000004" component=kcl
14:58:39.011 time="2020-06-01T13:58:39Z" level=info msg="Start Shard Consumer for shard: shardId-000000000003" component=kcl
14:58:45.033 time="2020-06-01T13:58:45Z" level=info msg="Start Shard Consumer for shard: shardId-000000000005" component=kcl
14:58:54.004 time="2020-06-01T13:58:54Z" level=info msg="Start Shard Consumer for shard: shardId-000000000006" component=kcl

18:18:25.160 time="2020-06-01T17:18:25Z" level=error msg="Error in refreshing lease on shard: shardId-000000000006 for worker: f9ef13cc-a40f-11ea-9602-0a580a020807. Error: ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.\n\tstatus code: 400, request id: L2C5UCK9A01ULRHMEDEVJ3N7P7VV4KQNSO5AEMVJF66Q9ASUAAJG" component=kcl
18:18:25.160 time="2020-06-01T17:18:25Z" level=info msg="Release lease for shard shardId-000000000006" component=kcl
18:18:26.040 time="2020-06-01T17:18:26Z" level=info msg="Start Shard Consumer for shard: shardId-000000000006" component=kcl
18:18:27.620 time="2020-06-01T17:18:27Z" level=error msg="Error in getRecords: ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.\n\tstatus code: 400, request id: L2C5UCK9A01ULRHMEDEVJ3N7P7VV4KQNSO5AEMVJF66Q9ASUAAJG" component=kcl

19:07:24.839 time="2020-06-01T18:07:24Z" level=info msg="Release lease for shard shardId-000000000005" component=kcl
19:07:24.839 time="2020-06-01T18:07:24Z" level=error msg="Error in refreshing lease on shard: shardId-000000000005 for worker: f9ef13cc-a40f-11ea-9602-0a580a020807. Error: ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.\n\tstatus code: 400, request id: O67L5BM9AFTCCGB0S8TCAQU58FVV4KQNSO5AEMVJF66Q9ASUAAJG" component=kcl
19:07:29.787 time="2020-06-01T18:07:29Z" level=info msg="Start Shard Consumer for shard: shardId-000000000005" component=kcl
19:07:30.495 time="2020-06-01T18:07:30Z" level=error msg="Error in getRecords: ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.\n\tstatus code: 400, request id: O67L5BM9AFTCCGB0S8TCAQU58FVV4KQNSO5AEMVJF66Q9ASUAAJG" component=kcl

20:26:55.975 time="2020-06-01T19:26:55Z" level=error msg="Error in refreshing lease on shard: shardId-000000000006 for worker: f9ef13cc-a40f-11ea-9602-0a580a020807. Error: ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.\n\tstatus code: 400, request id: JOCJLCFB9O0HQFKL0Q3ITAC2NBVV4KQNSO5AEMVJF66Q9ASUAAJG" component=kcl
20:26:55.975 time="2020-06-01T19:26:55Z" level=info msg="Release lease for shard shardId-000000000006" component=kcl
20:26:58.186 time="2020-06-01T19:26:58Z" level=error msg="Error in getRecords: ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.\n\tstatus code: 400, request id: JOCJLCFB9O0HQFKL0Q3ITAC2NBVV4KQNSO5AEMVJF66Q9ASUAAJG" component=kcl

I am continuing to debug the issue, but wanted to open this here to track the investigation.

Possibility to Work w/ DynamoDB Streams

Hi,

I have a question regarding the usage, do we have a way to process data from DynamoDB Streams, like what is mentioned in https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html, with the help of this library? From what I have researched, it looks like we would need an equivalent Go library for Kinesis Adapter so that then we can work with DynamoDB Streams. Anyway I still would like to post this question to get more ideas.

Thanks,
Zhen

DynamoCheckpoint does not have a way to set "skipTableCheck"

Source: https://github.com/vmware/vmware-go-kcl/blob/master/clientlibrary/checkpoint/dynamodb-checkpointer.go

There is a parameter (skipTableCheck) that allows the DynamoCheckpoint's Init() method to skip the table check. This parameter cannot be set.

It would be great if we can add a setter to skip the table check so there can be the option for callers to provide their own provisioned DynamoDB table instead of going through the create table workflow and making an extra call to the DynamoDB API.

Need clarification

Hi,
Is there support for re-sharding. i,e when merge and split shards happens is processing from those shards handled seamlessly.

Checkpointer interface breaking change in v1.3.0

Describe the bug

The Checkpointer interface was updated in v1.3.0 to include the following two methods:

ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error)
ClaimShard(*par.ShardStatus, string) error

We have a custom Postgres Checkpointer implementation and the minor version bump broke our build. We believe this change should either have been an optional interface, i.e. ShardStealingCheckpointer with only the two new methods, or a major version bump. If the optional interface would be an acceptable solution, we are happy to contribute a PR.

As far as I can tell, it's sufficient for now for us to simply return a "not implemented" error as stealShard will never be true unless EnableLeaseStealing is also true, and we are not currently using this feature, but I just wanted to confirm that this was something we could expect.

As an aside, I notice there is already a DynamoDB implementation in the repo. We would be happy to flesh out the missing methods and contribute our Postgres Checkpointer in a PR at some point if that would be something you'd see value in.

Reproduction steps

Implement the Checkpointer interface prior to v1.3.0, update to v1.3.0.

Expected behavior

Minor version bumps should not break APIs.

Additional context

No response

worker.go:339 Error in getRecords: unexpected EOF

Describe the bug

In the event loop in worker.go, the lease is grabbed and a consumer is made, however getRecords returns an unexpected EOF.

This has only happened once in the last couple of days and is non fatal but am curious.

I've took a look through the aws sdk, and i've ##only found unexpected EOF when dealing with s3.

Any reasons or leads on why this occurs.

Reproduction steps

  1. Consume from a stream using vmware-go-kcl
  2. Restart your worker suddenly

...

Expected behavior

Since this was so rare, i dont expect this error to reproduce, but perhaps it will, so you should see the above error.

Additional context

No response

Record duplication?

I'm trying to debug a duplicated records problem. Amount is from 0% (for low traffic streams) to 5% on some high throughput streams. I'm sure that it's not coming from the producer side because other consumers of the same streams (using Java KCL) do not experience that problem.
Any thoughts on this?

Workload skewed across workers

Describe the bug

I have 20vmware-go-kcl based workers, with a sane amount of memory per each (12Gi). Processing data from the stream was able to catch up decently quickly, but the workload is extremely skewed. Some workers are working way overtime, and some are doing basically nothing. See this as an example of memory usage between them. That also “makes sense” because I see in the dynamo checkpoints table some pods are managing 14-16 shards, and some only 1-3…

How can I get this to run more equally between workers?

Reproduction steps

  1. Start with multiple kinesis shards, I had 160
  2. Start multiple workers, I had 20
  3. Use the monitoring tool of your choice to check metrics and see some workers processing much more records than others
    a. image
  4. Compare AssignedTo entries in the dynamo checkpoint table, and see that there are some workers that have many shards assigned to them, and some workers that have only a few.

Expected behavior

Workers start and get assigned shards to work on.
When new assignments are made, the amount of shards each worker is working on stays at a comparable level, or even better as even as possible.
In my situation with 160 shards and 20 workers, each worker should manage exactly 8 shards.

Additional context

No response

Add go mod support and release.

Hi,
I am thinking of trying this package. Can the team let me know if this package is production ready?
Also if possible add go mod support.

Thanks,

(Still) Too many calls to DescribeStream

I don't believe you guys should consider #61 closed. DescribeStream is simply the wrong thing to use here. This is directly against AWS recommendations. That call has a hard limit for a reason.

The response @arl received from AWS mirrors the one my team received.

You can use the APIs ListShards and DescribeStreamSummary together as the combination of these two APIs will give all the information given by the DescribeStream API.
Reference: ListShards: ://docs.aws.amazon.com/kinehttpssis/latest/APIReference/API_ListShards.html

Our particular setup is dealing with billions of stream events daily and our individual consumer process count is high. No amount of jitter is going to keep my team from throttling hard on this call and I suspect we're not in a unique position.

Our dev team is looking at patching this internally and we'll attempt to PR when we do, but any insight from the maintainers may be helpful. Having looked at the code, I can see what information is required but I can't get a sense for the cadence/frequency that each piece (stream status, shard list, etc.) is required. The ListShards api has a high rate limit, but even the DescribeStreamSummary api is still capped at 20 per second. So, it may be that this library would be better served requesting each separately at different frequencies if possible.

Recording a lot of ProvisionedThroughputExceededException

Been recording a lot of Error getting records from shard ProvisionedThroughputExceededException. Wondering if this is something to do with describing the stream for shard discovery in the event loop? We are using many streams associated with the single AWS account. Wouldnt make more sense to set a single shard id for each consumer?

hmake test gives Credential Chain error

[test] worker_test.go:77: Errorin Publish. NoCredentialProviders: no valid providers in chain. Deprecated.
[test] For verbose messaging see aws.Config.CredentialsChainVerboseErrors

aws ec2 describe-instances works without issue however from the same system

New dependencies due to the addition of logger/zap.go

Since #27 vmware-go-kcl has support the any logger interface, which is very nice.

However due to the fact that logger/zap.go directly imports zap. zap is now a dependency of whoever uses vmware-go-kcl.

Just moving logger/zap.go into another subpackage (i.e logger/zap/zap.go) would do the trick as anybody that is not explicitely importing the logger/zap package would not have to have it as a dependency of its project. To make things worse, uber zap already has quite a lot of dependencies

How to get the StreamName/ConsumerName inside ProcessRecords method

Is your feature request related to a problem? Please describe.

This is a simple question: How to retrieve the StreamName/ConsumerName inside ProcessRecords method , I am unable to find a way. any help would be highly appreciated.

Describe the solution you'd like

This is a simple question: How to retrieve the StreamName/ConsumerName inside ProcessRecords method , I am unable to find a way. any help would be highly appreciated.

Describe alternatives you've considered

No response

Additional context

No response

Possible Data Race for input Checkpointer

I'm occasionally getting a data race on the checkpoint.

It is read in the worker.eventloop() here:

if shard.GetLeaseOwner() == w.workerID && shard.Checkpoint != chk.SHARD_END {
and it's written in our implementation of ProcessRecords(input *interfaces.ProcessRecordsInput) which is almost the same as
input.Checkpointer.Checkpoint(lastRecordSequenceNubmer)

The stack is:

Write at 0x00c0000fbd20 by goroutine 85:
  github.com/vmware/vmware-go-kcl/clientlibrary/worker.(*RecordProcessorCheckpointer).Checkpoint()
      myProject/vendor/github.com/vmware/vmware-go-kcl/clientlibrary/worker/record-processor-checkpointer.go:73 +0xc8
  myProject/input.(*recordProcessor).ProcessRecords()
      myProject/input/kcl.go:370 +0x6b0
  github.com/vmware/vmware-go-kcl/clientlibrary/worker.(*ShardConsumer).getRecords()
      myProject/vendor/github.com/vmware/vmware-go-kcl/clientlibrary/worker/shard-consumer.go:237 +0x1a7f
  github.com/vmware/vmware-go-kcl/clientlibrary/worker.(*Worker).eventLoop.func1()
      myProject/vendor/github.com/vmware/vmware-go-kcl/clientlibrary/worker/worker.go:308 +0xcc

Previous read at 0x00c0000fbd20 by goroutine 59:
  github.com/vmware/vmware-go-kcl/clientlibrary/worker.(*Worker).eventLoop()
      myProject/vendor/github.com/vmware/vmware-go-kcl/clientlibrary/worker/worker.go:263 +0x4eb
  github.com/vmware/vmware-go-kcl/clientlibrary/worker.(*Worker).Start.func1()
      myProject/vendor/github.com/vmware/vmware-go-kcl/clientlibrary/worker/worker.go:128 +0x8a

Line 370 of kcl.go (the writer) is:

func (p *recordProcessor) ProcessRecords(input *interfaces.ProcessRecordsInput) {
	// Skip if no records
	if len(input.Records) == 0 {
		log.Debug("No records to process")
		return
	}

	for _, v := range input.Records {
		// Process records
	}

	// Checkpoint it after processing this batch
	lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
	input.Checkpointer.Checkpoint(lastRecordSequenceNumber) // <---- HERE
}

I think we're correctly checkpointing at the end of ProcessRecords, I don't think we're introducing the data race with a misuse of that call. Am I wrong?

Expose a logger interface

Logging is not standard in golang, but forcing any logger on lib users is usually not considered good practice. It would be awesome if we could have an interface instead!

Compilation Error

I'm surprised this isn't building, but I'm getting

github.com/vmware/vmware-go-kcl/clientlibrary/worker

..\github.com\vmware\vmware-go-kcl\clientlibrary\worker\shard-consumer.go:186:40: cannot use retriedErrors (type int) as type float64 in argument to math.Exp2

go version
go version go1.11.1 windows/amd64

zb.

Multiple consumers processing records from the same shard.

We are using v1.3 with following kcl config
KclConf := &kclConfig.KinesisClientLibConfiguration{
ApplicationName: conf.Name,
RegionName: conf.RegionName,
TableName: conf.CheckpointTableName,
StreamName: conf.StreamName,
FailoverTimeMillis: conf.FailoverTimeMillis,
MaxRecords: conf.MaxRecords,
ShardSyncIntervalMillis: conf.ShardsSyncIntervalMillis,
MaxLeasesForWorker: conf.ShardsPerContainer,
InitialPositionInStream: kclConfig.InitialPositionInStream(conf.ShardInitialPosition),
InitialLeaseTableReadCapacity: 10,
InitialLeaseTableWriteCapacity: 10,
Logger: logger.NewKclLogger(log),
WorkerID: uuid.New().String(),
MonitoringService: &KclMonitoringService{statsReporter: app.MetricsRegistry},
}

Config Properties

name: "last-contact-time-consumer"
cluster-name: "minikube"
shards-per-container: 1
max-records: 1000
shard-sync-interval-millis: 5000
fail-over-time-millis: 300000
shard-initial-position: 1
checkpoint-table-name: "lct-checkpoint"
aws-region: "us-east-1"
lct-stream-name: "device-last-contact-time"

Unsupported protocol scheme

Describe the bug

Getting an error after switching to the v2 version if the library.

time="2023-03-29T19:45:33Z" level=error msg="Could not describe stream: operation error Kinesis: DescribeStream, exceeded maximum number of attempts, 3, https response error StatusCode: 0, RequestID: , request send failed, Post \"/\": unsupported protocol scheme \"\""

Reproduction steps

  1. Switched to the v2 version of the library.
  2. Deployed to consumers
  3. Getting an error message from consumers.

Expected behavior

No errors in the logs. KCL successfully subscribes to streams.

Additional context

No response

Lease release on shutdown

When the worker terminates properly (via the Shutdown interface), it should release the lease it has in DynamoDB so other workers can pick it up. This is even more an issue since there is no lease stealing.

Lease stealing code exception

1639131792488_5DE9572C-4145-4d34-9284-D13083D144FD

During the lease theft test, I found that the theft was abnormal. Originally, a work could only steal one lease at a time, but there was a problem of stealing multiple leases here. By checking the code, I found some problems in the following codes

demo

In the code, in order to ensure that only one lease is stolen at a time, if there is a claimrequest, it will not be stolen again. The problem is that there is no assignment to claimrequest, and the value of claimrequest is not stored in the local shard. The judgment is always false.

Allow passing of AWS session

I think it's generally a good idea to allow people to provide a full AWS session. I have already forked the project to work on that, but I just want to make sure this is ok with your vision of the project.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.