
reactive-kinesis

Kinesis client built upon Amazon's KCL (Kinesis Client Library) & KPL (Kinesis Producer Library).

It's worth familiarising yourself with Sequence numbers and Sub sequence numbers.


Dependency Resolution

SBT

"com.weightwatchers" %% "reactive-kinesis" % "0.5.5"

Maven 2.11

<dependency>
  <groupId>com.weightwatchers</groupId>
  <artifactId>reactive-kinesis_2.11</artifactId>
  <version>0.5.5</version>
  <type>pom</type>
</dependency>

Maven 2.12

<dependency>
  <groupId>com.weightwatchers</groupId>
  <artifactId>reactive-kinesis_2.12</artifactId>
  <version>0.5.5</version>
  <type>pom</type>
</dependency>

Snapshots will be published here

You will need the following resolver for snapshots: https://oss.jfrog.org/artifactory/oss-snapshot-local

Considerations When Using Kinesis in a Distributed Environment

Required Minimum Sharding for Read-Based Applications

From http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scaling.html:

Typically, when you use the KCL, you should ensure that the number of instances does not exceed the number of shards (except for failure standby purposes). Each shard is processed by exactly one KCL worker and has exactly one corresponding record processor, so you never need multiple instances to process one shard. However, one worker can process any number of shards, so it's fine if the number of shards exceeds the number of instances.

For our purposes, this means any service reading from Kinesis should expect to have one shard per instance, as a minimum. Note that this is specifically for consuming events. Producers don't have the same shard restrictions.

DynamoDB Checkpoint Storage

Amazon's KCL uses DynamoDB to checkpoint progress through reading the stream. The DynamoDB tables provisioned automatically for this purpose may have a relatively high write throughput, which can incur additional cost.

You should make sure that the DynamoDB table used for checkpointing your stream:

  1. Has a reasonable write throughput defined

  2. Is cleaned up when you're done with it -- KCL will not automatically delete it for you

The checkpointer will automatically throttle if the write throughput is insufficient; look out for the following info log:

Throttled by DynamoDB on checkpointing -- backing off...
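
If the table uses provisioned capacity, you can adjust its write throughput and remove it when you're finished via the AWS CLI. The table name below is illustrative (see the note on table naming in the configuration section):

aws dynamodb update-table --table-name SampleService-sample-consumer \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

aws dynamodb delete-table --table-name SampleService-sample-consumer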

Usage

Kinesis streams and auth

The stream you have configured must already exist in AWS, e.g.

aws kinesis create-stream --stream-name core-test-foo --shard-count 1

For local development, it's expected that you already have a file called ~/.aws/credentials containing an AWS access key and secret, e.g.

[default]
aws_access_key_id=AKIAXXXXXXXXX999999X
aws_secret_access_key=AAAAAAAAAAAA000000000+AAAAAAAAAAAAAAAAAA

Both the producer and consumer use the DefaultAWSCredentialsProviderChain.
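
Because the DefaultAWSCredentialsProviderChain is used, the same credentials can instead be supplied via environment variables (handy in CI environments), for example:

export AWS_ACCESS_KEY_ID=AKIAXXXXXXXXX999999X
export AWS_SECRET_ACCESS_KEY=AAAAAAAAAAAA000000000+AAAAAAAAAAAAAAAAAA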

Defining a config file in the client application

You'll need to provide some configuration values in the application that leverages this library. As a minimum you'll need:

kinesis {

   application-name = "SampleService"

   # The name of this producer; we can have many producers per application.
   # MUST contain the stream-name as a minimum. Any additional settings defined will override
   # defaults in the kinesis.producer reference.conf for this producer only.
   some-producer {
      # The name of the producer stream
      stream-name = "sample-producer"

      kpl {
         Region = us-east-1
      }
   }

   # The name of the consumer, we can have many consumers per application
   some-consumer {
      # The name of the consumer stream, MUST be specified per consumer and MUST exist
      stream-name = "sample-consumer"
   }

   some-other-consumer {
      stream-name = "another-sample-consumer"
   }
}

These values will override the default reference.conf. See src/main/resources/reference.conf for a complete reference configuration and src/it/resources/application.conf for a more detailed override example.

The name of the producer/consumer configuration section MUST match what is specified when instantiating the library. This will be merged with the default-consumer/default-producer configuration from reference.conf at runtime.

Once these are defined, you can pass them into the Kinesis producer and consumer using a config object (see code examples below).

Note that the application-name is combined with the stream-name for each consumer to define the DynamoDB table for checkpointing. For example: SampleService-sample-consumer.

Notable Consumer Configuration Values

  • kinesis.<consumer-name>.akka.dispatcher - Sets the dispatcher for the consumer, defaults to kinesis.akka.default-dispatcher
  • kinesis.<consumer-name>.worker.batchTimeoutSeconds - The timeout for processing a batch. Note that any messages not processed within this time will be retried (according to the configuration). After retrying, any unconfirmed messages will be considered failed.
  • kinesis.<consumer-name>.worker.failedMessageRetries - The number of times to retry failed messages within a batch, after the batch timeout.
  • kinesis.<consumer-name>.worker.failureTolerancePercentage - If, after retrying, messages are still unconfirmed, we will either continue processing the next batch or shut down processing completely, depending on this tolerance percentage.
  • kinesis.<consumer-name>.checkpointer.backoffMillis - When DynamoDB throttles us (due to hitting the write threshold) we wait for this amount of time.
  • kinesis.<consumer-name>.checkpointer.intervalMillis - The interval between checkpoints. Setting this too high will cause lots of messages to be duplicated in event of a failed node. Setting it too low will result in throttling from DynamoDB.
  • kinesis.<consumer-name>.kcl.initialPositionInStream - Controls our strategy for pulling from Kinesis (LATEST, TRIM_HORIZON, ..)
  • kinesis.<consumer-name>.kcl.maxRecords - The maximum batch size.
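
As a sketch, overriding a few of these values for the some-consumer defined above would look like the following (the values shown are purely illustrative, not recommended defaults):

kinesis {
   some-consumer {
      stream-name = "sample-consumer"

      worker {
         batchTimeoutSeconds = 300
         failedMessageRetries = 1
         failureTolerancePercentage = 25
      }

      checkpointer {
         backoffMillis = 3000
         intervalMillis = 20000
      }

      kcl {
         initialPositionInStream = "TRIM_HORIZON"
         maxRecords = 10000
      }
   }
}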

Notable Producer Configuration Values

  • kinesis.<producer-name>.akka.dispatcher - Sets the dispatcher for the producer, defaults to kinesis.akka.default-dispatcher
  • kinesis.<producer-name>.akka.max-outstanding-requests - Enables artificial throttling within the Producer. This limits the number of futures in play at any one time. Each message creates a new future (internally in the KPL), which allows us to track the progress of sent messages as they go out with the next batch.
  • kinesis.<producer-name>.akka.throttling-retry-millis - How soon to retry after hitting the above throttling cap.
  • kinesis.<producer-name>.kpl.AggregationEnabled - Enables aggregation of messages.
  • kinesis.<producer-name>.kpl.Region - The AWS Region to use.
  • kinesis.<producer-name>.kpl.RateLimit - Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.
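
Similarly, a sketch of overriding producer settings for the some-producer defined above (values are illustrative):

kinesis {
   some-producer {
      stream-name = "sample-producer"

      akka {
         max-outstanding-requests = 50000
         throttling-retry-millis = 100
      }

      kpl {
         Region = us-east-1
         AggregationEnabled = true
         RateLimit = 150
      }
   }
}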

Typed Configuration - Producer

If you don't want to depend on config files, there's a typed configuration class available: KinesisProducerConfig

You can construct it in a few ways:

// With default values
val defaultProducerConfig = KinesisProducerConfig()

// With a provided KinesisProducerConfiguration from the Java KPL library
val awsKinesisConfig: KinesisProducerConfiguration = ...
val producerConfig = KinesisProducerConfig(awsKinesisConfig)

// With a typesafe-config object
val typesafeConfig: Config = ...
val producerConfig = KinesisProducerConfig(typesafeConfig)

// With a typesafe-config object and an AWSCredentialsProvider
val typesafeConfig: Config = ...
val credentialsProvider: AWSCredentialsProvider = ...
val producerConfig = KinesisProducerConfig(typesafeConfig, credentialsProvider)

These can be used to create a ProducerConf and ultimately a KinesisProducer, like so:

val producerConfig: KinesisProducerConfig = ...
val producerConf: ProducerConf = ProducerConf(producerConfig, "my-stream-name", None, None)
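
From there the producer itself can be built from the ProducerConf, as shown in the producer examples later in this document (a minimal sketch):

val producer: KinesisProducer = KinesisProducer(producerConf)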

Usage: Consumer

reactive-kinesis provides two different ways to consume messages from Kinesis: Actor Based Consumer and Akka Stream Source.

Consumer Architecture

Actor Based Consumer

Implementing the consumer requires a simple actor which is responsible for processing messages sent to it by the library. We call this the Event Processor. Upon creating an instance of the KinesisConsumer, internally one ConsumerWorker (this is different from the KCL Worker) is created per shard (shards are distributed amongst consumers automatically). This consumer worker is what sends messages to the Event Processor. Note that the Event Processor is shared amongst ALL shards, so it is important not to cache the sender of previous messages. It is perfectly valid to use a router to spread the work amongst many Event Processor actors.

import akka.actor.{Actor, Props}
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ConsumerShutdown, ConsumerWorkerFailure, EventProcessed, ProcessEvent}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf

class TestEventProcessor() extends Actor with LazyLogging {

  import scala.concurrent.duration._

  implicit val timeout = akka.util.Timeout(5.minutes)


  // scalastyle:off method.length
  override def receive: Receive = {

    case ProcessEvent(event) => {
      //Do some processing here...
      sender ! EventProcessed(event.sequenceNumber)
    }
    case ConsumerWorkerFailure(failedEvents, shardId) =>
    // Consumer failure, no more messages will be consumed!! Depending on the purpose of this service this may be critical.

    case ConsumerShutdown(shardId) =>
    // The Consumer has shutdown all shards on this instance (gracefully, or as a result of a failure).
  }
}

object Consumer extends App {
  val system = akka.actor.ActorSystem.create("test-system")
  val config = ConfigFactory.load()
  val eventProcessor = system.actorOf(Props[TestEventProcessor], "test-processor")
  val consumer = KinesisConsumer(ConsumerConf(config.getConfig("kinesis"), "some-consumer"),
                                 eventProcessor, system)
  consumer.start()
}

The following types must be handled:

import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ConsumerShutdown, ConsumerWorkerFailure, EventProcessed, ProcessEvent}

/**
  * Sent to the eventProcessor for each message in the batch.
  */
case class ProcessEvent(consumerEvent: ConsumerEvent)

/**
  * Expected in response to a [[ProcessEvent]] message after processing is complete
  *
  * @param compoundSeqNo This is a combination of the sequence and subsequence numbers
  * @param successful    Set this to false to skip this message.
  */
case class EventProcessed(compoundSeqNo: CompoundSequenceNumber, successful: Boolean = true)

/**
  * Sent to the eventProcessor if batch processing fails (above the tolerance after retrying)
  * before shutting down processing on this shard.
  * @param failedEvents The events that failed processing within the time.
  * @param shardId      The shardId of the worker causing the failure.
  */
case class ConsumerWorkerFailure(failedEvents: Seq[ConsumerEvent], shardId: String)

/**
  * Sent to the eventProcessor upon shutdown of this worker.
  */
case class ConsumerShutdown(shardId: String)

Important considerations when implementing the Event Processor

  • The Event Processor MUST handle [[ProcessEvent]] messages (for each message)
  • The Event Processor MUST respond with [[EventProcessed]] after processing of the [[ProcessEvent]]
  • The Event Processor may set successful to false to indicate the message can be skipped
  • The Event Processor SHOULD handle [[ConsumerWorkerFailure]] messages which signal a critical failure in the Consumer.
  • The Event Processor SHOULD handle [[ConsumerShutdown]] messages which signal a graceful shutdown of the Consumer.
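
For example, a processor that decides to skip a poison message might respond like this inside its receive method (a sketch based on the message definitions above):

case ProcessEvent(event) =>
  // We can't process this message and don't want it retried, so mark it as skipped.
  sender() ! EventProcessed(event.sequenceNumber, successful = false)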

Checkpointing

The client will handle checkpointing asynchronously PER SHARD according to the configuration using a separate actor.

Akka Stream Source

An Akka Source is provided that can be used with streams. It is possible to create a source from a ConsumerConf or directly from the consumer name that is defined in the configuration. Every message that is emitted to the stream is of type CommittableEvent[ConsumerEvent] and has to be committed explicitly downstream with a call to event.commit(). It is possible to map to a different type of CommittableEvent via the map and mapAsync functionality. A KinesisConsumer is created internally for the Kinesis.source if no factory method is provided.

import com.weightwatchers.reactive.kinesis.stream._

Kinesis
  .source("consumer-name")
  .take(100)
  .map(event => event.map(_.payloadAsString())) // read and map payload as string
  .mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message
  .map(event => event.commit()) // mark the event as handled by calling commit
  .runWith(Sink.seq) 

Alternatively, you can explicitly pass a lambda to create the KinesisConsumer. You can save a reference to this KinesisConsumer and use it to manually shut down your consumer when needed.

  import akka.actor.ActorSystem
  import akka.stream.{ActorMaterializer, Materializer}
  import akka.stream.scaladsl.Sink
  import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
  import com.weightwatchers.reactive.kinesis.stream._


  implicit val sys = ActorSystem("kinesis-consumer-system")
  implicit val materializer: Materializer = ActorMaterializer()
  import sys.dispatcher

  var consumer = Option.empty[KinesisConsumer]

  Kinesis.source(
    consumerName = "some-consumer",
    createConsumer = (conf, ref) => {
      val c = KinesisConsumer(conf, ref, sys)
      consumer = Option(c)
      c
    })
    .take(100)
    .map(event => event.map(_.payloadAsString()))
    .mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload)))
    .map(event => event.commit())
    .runWith(Sink.seq)

  consumer.foreach { c =>
    c.stop()
  }

All rules described here for the KinesisConsumer also apply for the stream source.

Graceful Shutdown

Currently the KinesisConsumer Shutdown works as follows:

  • stop() is called on the KinesisConsumer (either explicitly or via the jvm shutdown hook)
  • This then calls requestShutdown on the KCL Worker, blocking until completion.
  • The KCL Worker propagates this down to the ConsumerProcessingManager (which is the IRecordProcessor) - calling shutdownRequested on each instance (one per shard).
  • When shutdownRequested is called, this sends a GracefulShutdown message to the ConsumerWorker Actor, blocking until a response is received (Ask + Await).
  • On receipt of this message, the ConsumerWorker switches context to ignore all future messages. If a batch is currently being processed, it responds to the sender of that batch (the manager), which will currently be blocking awaiting confirmation of the batch (this is by design, the KCL requires that we don't complete the processRecords function until we have finished the batch, otherwise the next batch is immediately sent)
  • The ConsumerWorker then forces a final checkpoint, responding to the manager once completed (or failed), which allows shutdown to continue and the KinesisConsumer to shutdown.
  • The shutdown timeout is configured by: kinesis.<consumer-name>.worker.shutdownTimeoutSeconds
  • The shutdown hooks can be disabled using: kinesis.<consumer-name>.worker.gracefulShutdownHook
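
A sketch of tuning the shutdown behaviour via the settings above (values are illustrative):

kinesis {
   some-consumer {
      worker {
         shutdownTimeoutSeconds = 30
         gracefulShutdownHook = true
      }
   }
}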

Usage: Producer

The KPL Client sends messages in batches; each message creates a Future which completes upon successful send or failure.

See Amazon's documentation for more information: https://github.com/awslabs/amazon-kinesis-producer

Actor Based Implementation

This implementation will optionally throttle the number of futures currently in play, according to the max-outstanding-requests property.

Each future is handled within the actor and a message will be returned to the sender upon completion.

The following messages are supported:

import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{SendFailed, SendSuccessful, SendWithCallback}

/**
  * Send a message to Kinesis, registering a callback response of [[SendSuccessful]] or [[SendFailed]] accordingly.
  */
case class SendWithCallback(producerEvent: ProducerEvent, messageId: String = UUID_GENERATOR.generate().toString)

/**
  * Send a message to Kinesis without any callbacks. Fire and forget.
  */
case class Send(producerEvent: ProducerEvent)

/**
  * Sent to the sender in event of a successful completion.
  *
  * @param messageId        The id of the event that was sent.
  * @param userRecordResult The Kinesis data regarding the send.
  */
case class SendSuccessful(messageId: String, userRecordResult: UserRecordResult)

/**
  * Sent to the sender in event of a failed completion.
  *
  * @param event     The current event that failed.
  * @param messageId The id of the event that failed.
  * @param reason    The exception causing the failure.
  */
case class SendFailed(event: ProducerEvent, messageId: String, reason: Throwable)

Within an Actor (Strongly recommended)

import java.util.UUID

import akka.actor.Actor
import com.typesafe.config.Config
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{SendFailed, SendSuccessful, SendWithCallback}
import samples.SomeActor.DoSomething

object SomeActor {
  case object DoSomething
}

class SomeActor(kinesisConfig: Config) extends Actor {

  val kpa = context.actorOf(
    KinesisProducerActor.props(kinesisConfig, "some-producer"))

  override def receive: Receive = {
    case DoSomething =>
      //Do something exciting!
      val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
      kpa ! SendWithCallback(producerEvent)

    //Callbacks from the KinesisProducerActor
    case SendSuccessful(messageId, _) =>
      println(s"Successfully sent $messageId")

    case SendFailed(event, messageId, reason) =>
      println(s"Failed to send event ${event.partitionKey} with $messageId, cause: ${reason.getMessage}")
  }
}

Scheme for Kinesis retries in services

Kinesis retry model with a service DB:

  • The service itself manages its Kinesis failures.
  • Practical: implemented entirely within the service.
  • Saves time on learning and integrating another technology.

Algorithm steps to publish an event to the stream:
if sendSuccessful
    continue
if sendFailed
    save event in DB
if backgroundJob.connect(stream).isSuccessful
    go to step 1
    delete event from DB
else
    do nothing

Kinesis retries diagram scheme
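
A minimal sketch of this pattern inside a publishing actor, assuming a hypothetical FailedEventStore for persistence (the store and its API are illustrative and not part of this library):

import akka.actor.{Actor, ActorRef}
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{SendFailed, SendSuccessful, SendWithCallback}

// Hypothetical persistence layer used to park events that could not be sent.
trait FailedEventStore {
  def save(event: ProducerEvent): Unit
  def delete(event: ProducerEvent): Unit
}

class PublishingActor(kpa: ActorRef, store: FailedEventStore) extends Actor {

  override def receive: Receive = {
    case event: ProducerEvent =>
      kpa ! SendWithCallback(event)

    case SendSuccessful(_, _) =>
      () // The event reached Kinesis, nothing further to do.

    case SendFailed(event, _, _) =>
      // Park the event; a background job re-publishes it and deletes it from the DB once accepted.
      store.save(event)
  }
}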

From outside of an Actor

import java.util.UUID
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.Send

implicit val system = akka.actor.ActorSystem.create()

val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")

// where testProducer is the name in the configuration
val kpa = system.actorOf(KinesisProducerActor.props(kinesisConfig, "some-producer"))

val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
kpa ! Send(producerEvent) //Send without a callback confirmation

Pure Scala based implementation (simple wrapper around KPL)

Note that throttling of outstanding futures (max-outstanding-requests) is not available when using this method.

import java.util.UUID
import com.amazonaws.services.kinesis.producer.{UserRecordFailedException, UserRecordResult}
import com.weightwatchers.reactive.kinesis.producer.KinesisProducer
import com.weightwatchers.reactive.kinesis.producer.ProducerConf
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global //Not for production

val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")

val kpl = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))

val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")

val callback: Future[UserRecordResult] = kpl.addUserRecord(producerEvent)

callback onSuccess {
  case result =>
    println("Success!!")
}

callback onFailure {
  case ex: UserRecordFailedException =>
    println(s"Failure! ${ex.getMessage}")
  case ex =>
    println(s"Critical Failure! ${ex.getMessage}")
}

Akka Stream Sink

An Akka Sink is provided which can be used to publish messages via streams. Every message is sent as a ProducerEvent to the Sink, which defines the PartitionKey as well as the payload. The Sink is created from a ProducerConf or directly with a KinesisProducerActor. See the Kinesis object for the various options.

The Sink expects an acknowledgement for every message sent to Kinesis. The number of unacknowledged messages can be configured; beyond that, back pressure is applied. This throttling is controlled by the kinesis.{producer}.akka.max-outstanding-requests configuration value. Please note: a default value (1000 messages) is applied if throttling is not configured.

The provided Sink produces a Future[Done] as its materialized value. This future succeeds if all messages from upstream are sent to Kinesis and acknowledged. It fails if a message could not be sent to Kinesis or if the upstream fails.

import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

import scala.util.{Failure, Success}

// An implicit ActorSystem, Materializer and ExecutionContext are assumed to be in scope.

Source(1.to(100).map(_.toString))
  .map(num => ProducerEvent(num, num))
  .runWith(Kinesis.sink("producer-name"))
  .onComplete {
    case Success(_) => println("All messages are published successfully.")
    case Failure(ex) => println(s"Failed to publish messages: ${ex.getMessage}")
  }

A long running flow can be easily achieved using a SourceQueue. In this case the flow stays open as long as needed. New elements can be published via the materialized queue:

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Source}
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

val sourceQueue = Source.queue[ProducerEvent](1000, OverflowStrategy.fail)
  .toMat(Kinesis.sink("producer-name"))(Keep.left)
  .run()

sourceQueue.offer(ProducerEvent("foo", "bar"))
sourceQueue.offer(ProducerEvent("foo", "baz"))

The Sink uses a KinesisProducerActor under the covers. All rules regarding this actor also apply to the Sink.

Running the reliability test

Delete & recreate kinesisstreams and dynamo table

Execute this command in a shell. If you don't have access to WW AWS resources, you'll need to create an equivalent stream and table in your own account:

aws kinesis delete-stream --stream-name test-kinesis-reliability && aws dynamodb delete-table --table-name KinesisReliabilitySpec && sleep 90 && aws kinesis create-stream --stream-name test-kinesis-reliability --shard-count 2

Running the producer-consumer test

Run the SimpleKinesisProducer using the App object.

Wait for the producer to publish all messages.

At the end the producer will print the number of unconfirmed and failed messages (both should be 0!).

Run the SimpleKinesisConsumer using the App object.

Now, wait for two messages that look like this to appear in the consumer window:

2016-06-14 20:09:24,938 c.w.c.e.KinesisRecordProcessingManager - Initializing record processor for shard: shardId-000000000001
2016-06-14 20:09:24,963 c.w.c.e.KinesisRecordProcessingManager - Initializing record processor for shard: shardId-000000000000

As the test progresses, watch the consumer window for a message of this format:

**** PIT STOP OK

Some stats regarding messages/sec processed are logged near that line.

FAQ

  • How is DynamoDB used in relation to our checkpointing?
    • DynamoDB tables will be automatically created, however the write throughput must be configured appropriately using the AWS console or CLI. There is a cost associated with this, but note that setting it too low will cause checkpoint throttling. Configure kinesis.<consumer-name>.checkpointer.intervalMillis accordingly.
  • How is data sharded?
    • Sharding relates to the distribution of messages across the shards of a given stream. Ideally you want an even distribution amongst your shards. However, ordering is only guaranteed within a given shard, so it is important to group related messages by shard. For example, if a specific user performs several operations in which the order of execution matters, ensuring they all land on the same shard will guarantee the order is maintained.
    • For this, the partition key is used. Messages with the same partition key will land on the same shard. In the example above a userId may be a good partition key (see the sketch after this list).
    • Note that if the number of partition keys exceeds the number of shards, some shards necessarily contain records with different partition keys. From a design standpoint, to ensure that all your shards are well utilized, the number of shards should be substantially less than the number of unique partition keys.
  • How long do we keep data?
    • By default Kinesis stores data for 24 hours, however this can be extended to 7 days for a fee.
  • What data is already in the Stream and from where is it consumed?
    • Kinesis persists data up until the retention period, consuming data does not remove it from the stream. Rather, it moves your checkpoint to the appropriate position.
    • There are a number of options for configuring at which position in the stream a consumer will start, configured by the initialPositionInStream setting.
    • This library defaults to TRIM_HORIZON; note that if a checkpoint exists in DynamoDB, the application will always continue from the last checkpoint.
      • AT_SEQUENCE_NUMBER - Start reading from the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
      • AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted by a specific sequence number, provided in the value StartingSequenceNumber.
      • AT_TIMESTAMP - Start reading from the position denoted by a specific timestamp, provided in the value Timestamp.
      • TRIM_HORIZON - Start reading at the last untrimmed record in the shard in the system, which is the oldest data record in the shard.
      • LATEST - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
  • How do sequence numbers work?
    • Sequence numbers for the same partition key generally increase over time, but NOT necessarily in a continuous sequence. The longer the time period between records, the bigger the gap between the sequence numbers.
    • To uniquely identify a record on a shard, you need to use BOTH the sequence number and the sub-sequence number. This is because messages that are aggregated together have the same sequence number (they are treated as one message by Kinesis); it is therefore important to also use the sub-sequence number to distinguish between them.
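
To illustrate the partition key point above: publishing all events for a given user with the userId as the partition key keeps them on the same shard (a sketch; the userId and payload are illustrative):

val userId = "user-42"
val payloadJson = """{"action": "logged-in"}"""
val event = ProducerEvent(userId, payloadJson) // same partition key => same shard => ordering preserved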

Contributor Guide

Code Formatting

This project uses scalafmt and will automatically fail the build if any files do not match the expected formatting.

Please run sbt scalafmt before committing and pushing changes.

Integration tests

As part of the travis build, integration tests will run against a Kinesis localstack instance. You can run these locally as follows:

  • docker-compose -f localstack/docker-compose.yml up
  • sbt it:test

Tag Requirements

This project uses tags and sbt-git to determine the current version.

  • Each merge into master will automatically build a snapshot and publish to bintray OSS artifactory.
  • Tagging the master branch will automatically build and publish both Scala 2.11 & Scala 2.12 artifacts (to bintray and maven central).
  • Tags are in the format vX.X.X

Version information

  • IF the current commit is tagged with "vX.Y.Z" (i.e. semantic versioning), the version is "X.Y.Z"
  • ELSE IF the current commit is tagged with "vX.Y.Z-Mx", the version is "X.Y.Z-Mx"
  • ELSE IF the latest found tag is "vX.Y.Z", the version is "X.Y.Z-commitsSinceVersion-gCommitHash-SNAPSHOT"
  • ELSE the version is "0.0.0-commitHash-SNAPSHOT"

Valid Release Tag Examples:

v1.2.3 (version=1.2.3) v1.2.3-M1 (version=1.2.3-M1)

Invalid Release Tag Examples:

v1.2.3-SNAPSHOT v1.2.3-M1-SNAPSHOT v1.2.3-X1 1.2.3

If the current version on master is a snapshot (release tag + x commits), then the artifact will be deployed to the JFrog OSS repository.

Contribution policy

Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.

Changelog

See the releases tab: https://github.com/WW-Digital/reactive-kinesis/releases

License

This code is open source software licensed under the Apache 2.0 license.

reactive-kinesis's People

Contributors

agaro1121, aquamatthias, easel, etspaceman, fernando-torterolo, htimur, jannikarndt, markglh, markus1189, nick-smith, tjc, ww-jesusaguilar


reactive-kinesis's Issues

KinesisProducerKPL should take typed config

This isn't exactly... typesafe...

  def apply(kplConfig: Config,
            streamName: String,
            credentialsProvider: Option[AWSCredentialsProvider] = None):

Config should be replaced with a case class that has everything that can be set. And if they are optional, make them Option.

Fully test the KPL properties

The KPL properties work as follows:

  • Define them in the KPL section of the app.conf
  • These get parsed and converted to a java properties file
  • This file gets imported into the KPL library.

Currently we test that we load these properties into our structure, but we don't assert that they get set correctly on the underlying library.

The tasks are as follows:

  • Move the ProducerConf definition down into the KinesisProducerKPL.
  • Create new constructors like the ones in the Actor which allow passing of a config only, see here in the Actor interface. Move this initial parsing logic down into the KinesisProducerKPL.
    • Be careful not to break the existing interfaces; we're just adding a new constructor to the KPL externally.
  • Move the Actor tests which prove out this config parsing logic into the KinesisProducerKPLSpec
  • Add more tests which prove that the created AWSKinesisProducer has everything set correctly.

Note: I suspect the threading model won't be set correctly, this needs to be fixed!

  • Amazon define the allowed properties here - it's not up to date
  • The ThreadingModel is typed as an enum in the KPL code, we'll be passing as a String in the properties file (how else can we?)
  • The properties file is parsed here - it simply iterates through setting the types, there's no special logic for enums.
  • The setter for the threadingModel is here
  • ThreadingModel enum
  • We define it here
  • Thread pool configuration was added to the KPL here

Investigate & implement 2 Phase checkpoints to reduce duplication on failover

Applications can now set a pending checkpoint, before completing the checkpoint operation. Once the application has completed its checkpoint steps, the final checkpoint will clear the pending checkpoint.
Should the checkpoint fail, the attempted sequence number is provided in the InitializationInput#getPendingCheckpointSequenceNumber; otherwise the value will be null.

See: awslabs/amazon-kinesis-client#188

This passes the pendingCheckpoint to the new RecordProcessor upon failover, allowing it to continue at that record if the actual checkpoint failed.

Feature Request - Akka Stream Support

Akka Actors are a great start for implementing the reactive API. I think the logical next step for an interface here would be to provide Akka Stream implementations. This would particularly benefit the Consumer as you could provide significant features such as back-pressure.

Calling KinesisProducerActor.props already starts a KinesisProducer

While playing with the KinesisSinkGraphStage I realized that a Kinesis producer is started even before the actor has been created, since a call to props starts the producer behind the scenes. It was surprising to me and I think it could be surprising for others as well.

I see 2 scenarios which probably should be supported:

  • an already created KinesisProducer should be used
  • a new KinesisProducer should be created on demand, if the actor starts

Shutdown process - unclear how to trigger - could improve docs

The README states:

Currently the KinesisConsumer Shutdown works as follows:

  • Shutdown is called on the KinesisConsumer (either explicitly or via the jvm shutdown hook)

How do I explicitly call shutdown upon the KinesisConsumer?

It looks like I call the stop() method upon it, which isn't quite the same as your note that says to call Shutdown upon it.

I know it's just a simple thing, but maybe you could tweak the documentation?

Cheers

non-actor interface

it would be good if you provided a HKT interface such as

trait Kinesis[F[_]] {
  def getFoo(t: String): F[Foo]
}

etc along with an impl for Future backed by your actors. This would make it easier to mock downstream (e.g. using synchronous Id) and also encapsulate your actor implementation.

Fix Intermittent failing test

Sometime (in Travis) the following test fails:

[info]   - When the response is a failed batch it should shutdown and stop processing *** FAILED ***
[info]     org.mockito.exceptions.verification.WantedButNotInvoked: Wanted but not invoked:
[info] worker.requestShutdown();
[info] -> at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2.$anonfun$new$7(ConsumerProcessingManagerSpec.scala:162)
[info] 
[info] However, there was exactly 1 interaction with this mock:
[info] worker.getApplicationName();
[info] -> at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager.<init>(ConsumerProcessingManager.scala:65)
[info]     at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2.$anonfun$new$7(ConsumerProcessingManagerSpec.scala:162)
[info]     at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2$$Lambda$2021/1019640917.apply(Unknown Source)
[info]     at org.scalatest.concurrent.Futures.whenReady(Futures.scala:677)
[info]     at org.scalatest.concurrent.Futures.whenReady$(Futures.scala:675)
[info]     at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec.whenReady(ConsumerProcessingManagerSpec.scala:52)
[info]     at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2.<init>(ConsumerProcessingManagerSpec.scala:156)
[info]     at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec.$anonfun$new$6(ConsumerProcessingManagerSpec.scala:152)
[info]     at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$Lambda$1999/696904565.apply(Unknown Source)
[info]     at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]     at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]     ...

https://travis-ci.org/WW-Digital/reactive-kinesis/jobs/237901638

KinesisProducerActor: failed sends should be retried

The current implementation of KinesisProducerActor sends every message to KinesisProducer. A successful produced message results in a SendSuccessful, otherwise a SendFailed actor message.

If a message can not be published, it would be great if sending the message can be retried a configurable number of times.

A related feature is available on the consumer side: failedMessageRetries

ConsumerProcessingManagerSpec random failed test

Fix random fail in test:
ConsumerProcessingManagerSpec

  • When the response is a failed batch it should shutdown and stop processing *** FAILED ***
    [info] org.mockito.exceptions.verification.WantedButNotInvoked: Wanted but not invoked:
    [info] worker.requestShutdown();
    [info] -> at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2.$anonfun$new$7(ConsumerProcessingManagerSpec.scala:162)
    [info]
    [info] However, there was exactly 1 interaction with this mock:
    [info] worker.getApplicationName();

[info] -> at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager.(ConsumerProcessingManager.scala:65)
[info] at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2.$anonfun$new$7(ConsumerProcessingManagerSpec.scala:162)
[info] at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2$$Lambda$327/1114273098.apply(Unknown Source)
[info] at org.scalatest.concurrent.Futures.whenReady(Futures.scala:677)
[info] at org.scalatest.concurrent.Futures.whenReady$(Futures.scala:675)
[info] at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec.whenReady(ConsumerProcessingManagerSpec.scala:52)
[info] at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$anon$2.(ConsumerProcessingManagerSpec.scala:156)
[info] at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec.$anonfun$new$6(ConsumerProcessingManagerSpec.scala:152)
[info] at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManagerSpec$$Lambda$240/1621586683.apply(Unknown Source)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)

Provide the partitionKey as part of the Event

Hey,

we are currently playing with reactive-kinesis. As first impression: It looks really good - so thanks for this cool library.
What I miss is the information about the partitionKey in each event which is available in Record.
If a shard is taken over by a different worker, we would like to drop buffered events from this shard.
Do you think it would make sense to add the key to the ConsumerEvent class?
Happy to create a PR, if you agree.

accessing KinesisConsumer directly when using Akka Source

I saw that issue #60 and the subsequent fix in #61 seemed to touch upon this problem, but I don't quite see how the proposed solution allows one to grab a reference to a KinesisConsumer or ConsumerService. The createConsumer parameter provided by these new factory methods ultimately rely on creation of the consumer within the preStart function of the GraphStageLogic via the underlying stage actor.

My end goal here is to have a reference to my consumer so I can manually start/stop consumption based on custom business logic. Maybe I'm missing something, but please let me know if there's a way to manage this at the moment.

Consumer Shutdown Testing: Exception handling

When processing a batch and an exception scenario occurs we need to gracefully handle this. Exceptions include:

  • The application not acknowledging the processing of messages above the configured threshold. (failed batches)
  • Exception within the Kinesis Client itself (definitely a bug, but again, causing a failed batch) - handled in the manager.
  • Losing connectivity to Kinesis - I'm almost certain the KCL will retry x times before shutting down the worker, this in turn will cause a "GracefulShutdown" which will notify the application's message processor actor. It's up to that what happens (Notification's calls System.exit - sugar perhaps shouldn't?). What's certain here though is that once the Worker shuts down - it won't ever restart. Can we increase the retries on the KCL/KPL via config? Or do we need a process which restarts the worker? Alternatively shutdown? ...

By gracefully do we mean:

  • Shutting down the whole application? Regardless of the number of individual stream consumers, producers & shards
  • Delegating this to the application?

Currently we shut down the processing and then the whole application, working on the assumption that without a consumer the application can't serve its purpose. Perhaps this should be configurable?

Batch Consumer (Feature Request)

Hi There -

Something that I'm struggling with when using this library is that I can only set up single-event processors for the Consumer via ConsumerWorker. This limits me from using this library for something like a Redshift endpoint, which encourages users to batch their commands through a series of staged tables. I cannot simply run INSERT/UPDATE/DELETE commands on Redshift when reacting to Kinesis events because the speed is incredibly slow.

If we were able to use an actor that received the sequence of events being processed here:

https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerWorker.scala#L327

We could batch the commands against all records received and efficiently update batch-biased endpoints like Redshift.

CheckpointWorker Actor is not killed when the lease is lost

In our setup we have a Kinesis stream with 2 shards and 2 consumer (C1 and C2 in my description).
Both consumer C1 and C2 will get a lease for one shard and read from the stream as expected.
If C2 fails (process dies), C1 will read from both shards. C2 restarts and will take over a lease for one shard.
The CheckPointWorkerActor is not killed when a lease is lost. We see the following debug output in C1:

Sep 28, 2017 10:32:13 AM com.amazonaws.services.kinesis.leases.impl.LeaseRenewer renewLease
INFORMATION: Worker b1dbae75-b26a-4c4a-a037-236d0546144a lost lease with key shardId-000000000001
Sep 28, 2017 10:32:13 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.SequenceNumberValidator validateSequenceNumber
INFORMATION: Validated sequence number 49577403242574069570051621516948183134887434789161271314 with shard id shardId-000000000001
Sep 28, 2017 10:32:13 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator setCheckpoint
INFORMATION: Worker b1dbae75-b26a-4c4a-a037-236d0546144a could not update checkpoint for shard shardId-000000000001 because it does not hold the lease
10:32:13.962 [kinesis-akka.actor.default-dispatcher-4] INFO com.weightwatchers.reactive.kinesis.consumer.CheckpointWorker - Caught shutdown exception, skipping checkpoint

From here on, we see this exception logged in the checkpoint interval (every 3 seconds):

10:32:59.418 [kinesis-akka.actor.default-dispatcher-3] INFO com.weightwatchers.reactive.kinesis.consumer.CheckpointWorker - Caught shutdown exception, skipping checkpoint.
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:174)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
	at com.weightwatchers.reactive.kinesis.consumer.CheckpointWorker.$anonfun$checkpointOrBackoff$1(CheckpointWorker.scala:231)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scala.util.Try$.apply(Try.scala:209)
	at com.weightwatchers.reactive.kinesis.consumer.CheckpointWorker.checkpointOrBackoff(CheckpointWorker.scala:230)
	at com.weightwatchers.reactive.kinesis.consumer.CheckpointWorker.com$weightwatchers$reactive$kinesis$consumer$CheckpointWorker$$checkpointAndRespond(CheckpointWorker.scala:204)
	at com.weightwatchers.reactive.kinesis.consumer.CheckpointWorker$$anonfun$readyState$1$1.applyOrElse(CheckpointWorker.scala:146)
	at akka.actor.Actor.aroundReceive(Actor.scala:514)
	at akka.actor.Actor.aroundReceive$(Actor.scala:512)
	at com.weightwatchers.reactive.kinesis.consumer.CheckpointWorker.aroundReceive(CheckpointWorker.scala:117)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:527)
	at akka.actor.ActorCell.invoke(ActorCell.scala:496)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Everything works as expected, since the lease is taken over by another consumer.
Nevertheless I see this as a problem, since the logs will contain warnings that aren't real warnings.
Not sure if this can lead to other unwanted behaviour?

Standardise CredentialsProvider config between KCL and KPL

Currently the KPL requires you to pass it as an argument, whereas the KCL requires it as config.

  • Ideally we should be able to specify both as config
  • Verify that custom providers work for both the KCL and KPL

The KCL uses a different (more sophisticated) method for parsing the properties compared to the KPL, see below for an example of how it parses the CredentialsProvider:
https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java#L112

The KPL only supports setting of Strings, Ints and Booleans:
https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.java#L197

License - AWS vs. Apache 2

See this thread:

akka/alpakka#434

Alpakka determined that they were unable to continue to distribute their software under the Apache 2 license when leveraging the KCL library. This is because the KCL is distributed with its own License.

This repository is facing the same conundrum. I think that the license needs to be changed to match the AWS Licnese for it to be legally compliant w/ their SDK distribution:

https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE.txt

CC @markglh

Handle Shutdown Whilst Processing Batches

Currently the Kinesis Shutdown works as follows:

  • Shutdown is called on the KinesisConsumer (either explicitly or via the jvm shutdown hook)
  • This then calls requestShutdown on the KCL Worker, blocking until completion.
  • The KCL Worker propagates this down to the ConsumerProcessingManager (Which is the IRecordProcessor) - calling shutdownRequested on each instance (one per shard).
  • When shutdownRequested is called, this sends a GracefulShutdown message to the ConsumerWorker Actor, blocking until a response is received (Ask + Await).
  • On receipt of this message, the ConsumerWorker switches context to ignore all future messages. If a batch is currently being processed, it responds to the sender of that batch (the manager), which will currently be blocking awaiting confirmation of the batch (this is by design, the KCL requires that we don't complete the processRecords function until we have finished the batch, otherwise the next batch is immediately sent)
  • The ConsumerWorker then forces a final checkpoint, responding to the manager once completed (or failed), which allows shutdown to continue and the KinesisConsumer to shutdown.

So this all sounds great, however if we're processing a batch (and therefore blocking processRecords), the KCL doesn't allocate a separate thread to call shutdownRequested. This means that even though in the ConsumerWorker we allow the batch processing to be aborted early, this never happens because until batch processing is complete the processRecords thread is blocked.

Possible solutions
What needs to happen is the KCL calls requestShutdown on a separate thread, we'll then unblock processRecords automatically and checkpoint accordingly. This will require a change to the KCL (assuming the issue is indeed with the KCL). We'd need to write a test which reproduces this (using Java). The raising an issue in the KCL github for them to fix it.

Alternatively, maybe the issue is with us? Potentially the GracefulShutdown message is stuck in the mailbox whilst we process the batch. If this is the case (a test could prove this where the message isn't acked before sending shutdown), then one option is to use the a priority mailbox to allow GracefulShutdown message a higher priority - skipping the queue.

include documentation for integration testing

in our project we have set up integration tests around kinesis, @markglh ping me if you want access to see what we've done. I think this amount to adding a section to the README about wiring up docker-compose and the relevant config.

Problem consuming from stream when moving from 1 shard to 2 shards.

We are using reactive-kinesis for the first time to consume from a stream. With 1 shard, everything works fine. When we bump it up to 2 shards, the consumer shuts down soon after it starts. The log error is:

2018-05-09 12:03:48,991 [warn] c.w.r.k.c.ConsumerWorker - Worker for shard Some(shardId-000000000002): Skipped checkpointing on shutdown 2018-05-09 12:03:48,993 [info] c.w.r.k.s.KinesisSourceGraph - Consumer shutdown for shard shardId-000000000002 2018-05-09 12:03:48,995 [error] c.a.s.k.c.l.w.ShutdownTask - Application exception. java.lang.IllegalArgumentException: Application didn't checkpoint at end of shard shardId-000000000002 at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.call(ShutdownTask.java:110)

Is there any configuration that must be done on the consumer? Is there something we have to do in code? Thanks for your time.

support for aws / local config initialisation

AWS seems to use magic to provide the host/port and creds, whereas using a local docker mock of kinesis requires other information. It would be good if all this setup was abstracted away such that evil

if (tests) { ... }
else { ... untested prod codebase ...}

branches are minimised / localised to this project and tested in the field with the long hard stick of production support.

That's a guaranteed bug, right there.

Kinesis stream how to call shutdown on source?

We are using the Kinesis object to create a kinesis stream source:

Kinesis
  .source(StreamConfigName)
  .via(flow)
  .runWith(Sink.foreach[CommittableEvent[ConsumerEvent]](_.commit()))

When we shutdown the application in which this stream is processed we produce this exception:

2018-04-17 09:59:28.934 ERROR c.w.r.k.c.ConsumerProcessingManager - Unexpected exception on shutdown, final checkpoint attempt may have failed
akka.pattern.AskTimeoutException: Recipient[Actor[akka://user/consumer-worker-c28d4f4c-4214-11e8-a2d7-f70fbc38ec42#-1898192529]] had already been terminated. Sender[null] sent the message of type "com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker$GracefulShutdown".
at akka.pattern.AskableActorRef$.internalAsk$extension(AskSupport.scala:290)
at akka.pattern.AskableActorRef$.$qmark$extension1(AskSupport.scala:282)
at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager$$anonfun$shutdown$2.apply(ConsumerProcessingManager.scala:143)
at scala.util.Try$.apply(Try.scala:192)
at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager.shutdown(ConsumerProcessingManager.scala:142)
at com.weightwatchers.reactive.kinesis.consumer.ConsumerProcessingManager.shutdownRequested(ConsumerProcessingManager.scala:128)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownNotificationTask.call(ShutdownNotificationTask.java:44)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

How can I call the stop method on the KinesisConsumer to shut down gracefully? If I understood the documentation correctly, the KinesisConsumer is used inside the Source. But the Source does not return any handle with which I could trigger a graceful shutdown, does it?

Scaling Kinesis Shards Yields Errors

See this thread on the AWS Developer forums:

https://forums.aws.amazon.com/thread.jspa?threadID=245127

Basically to recreate this, I bump up the shard count for Kinesis. With a single node, I see the above error occur, and the consumer dies. We bounce our application and scale to 2 nodes, but one of the nodes tries to connect to the initial shard (which is now in a "CLOSED" state), and throws the same error. We've also tried removing the checkpointing and then restarting our applications, with the same issue occurring. The only way we're able to consume all shards is to have N+1 instances of a consumer against the stream shards.

Create a consumer interface users can extend

Maybe something like this:
So users are forced to process events and not have to worry about a receive method

import com.weightwatchers.core.eventing.consumer.ConsumerWorker.{EventProcessed, ProcessEvent}
import akka.actor._

abstract class AbstractConsumerActor extends Actor {

  def processEvent: Function[ProcessEvent, EventProcessed]

  def send: EventProcessed => Unit

  val processAndSend = processEvent.andThen(send).asInstanceOf[PartialFunction[Any,Unit]]


  override def receive: Receive = processAndSend
}

extending it would look like:

class TestConsumer extends AbstractConsumerActor {
  override def processEvent: Function[ProcessEvent, EventProcessed] = ???

  override def send: (EventProcessed) => Unit = ???
}

Kinesis Client Issues?

I noticed that you guys updated the AWS Kinesis Client library today (very recently), and I was wondering if you had been seeing any strange behaviors with workers getting shut-down.

We have a consumer that we wrote using the amazon-kinesis-client library and it stopped processing records yesterday. My coworker swapped in your library to try and tackle the problem, but we saw the same issue (a graceful shutdown is initiated).

Sorry, this is pretty vague but we're at a loss and wanted to reach into the ether to see if others were having unexplained Kinesis issues today.

Consumer Shutdown Testing: Shard Reallocation

When processing from multiple shards (1 consumer), we need to assert the following.

  • Reducing the shard count from 2 to 1 (in Kinesis) should result in the consumer continuing to consume from the remaining shard. Kinesis will reallocate messages to the remaining shard automatically. We need to ensure the consumer continues seamlessly.
  • It's possible that the ConsumerManager within the client will shutdown the whole consumer when a shard is removed as it calls shutdown on the KCLWorker, which isn't shard specific.
  • Note also that there are two shutdown scenarios in the manager, one when shutting down a shard gracefully and one due to an exception - this is testing the former.

Update the kinesis library to accept and return a byte array (or bytebuffer)

Update the kinesis library to accept and return a byte array (or bytebuffer). Returning string and assuming UTF8 is very dangerous, because some byte arrays can not be represented as a UTF8 string, e.g.

"nope" in {

  val data = Array[Byte](
    10, 62, 10, 2, 8, 1, 18, 12, 10, 10, 8, -42, -81, -18, -115, -15, -13,
    -128, -56, 127, 26, 38, 10, 36, 52, 56, 101, 100, 52, 101, 52, 50, 45,
    57, 54, 48, 55, 45, 52, 97, 49, 49, 45, 56, 102, 97, 48, 45, 97, 49,
    100, 55, 52, 101, 55, 98, 48, 102, 98, 100, 34, 2, 8, 1, 18, 23, 10, 5,
    10, 3, 117, 105, 100, 10, 6, 10, 4, 117, 105, 100, 50, 10, 6, 10, 4,
    117, 105, 100, 51
  )

  val asString =
    new String(data, StandardCharsets.UTF_8)

  println("asString")
  println(asString.mkString(","))
  println("asString")

  val asBuffer =
    ByteBuffer.wrap(asString.getBytes(StandardCharsets.UTF_8))

  println(data.mkString(","))
  println(asBuffer.array().mkString(","))

  data shouldEqual asBuffer
}

This test will always fail. For that reason, when using the kinesis library we HAVE TO convert our messages to a String manually (NOT using UTF-8); the library then converts that String to a ByteBuffer using UTF-8, even though the content is not UTF-8. When consuming, the library turns the bytes back into a String using UTF-8, and we again HAVE TO take that String and turn it into a byte buffer using our own encoding. That's a lot of unnecessary operations, and it's also very confusing.

i.e. if we use an ISO-8859-1 string (because that encoding can represent any byte array), we are essentially doing this:

producer: protobuf -> ISO string -> UTF-8 buffer
consumer: UTF-8 buffer -> ISO string -> protobuf
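
If the library has to keep a String-based API for now, a workaround sketch is to use ISO-8859-1 at the application edges, since that charset maps every byte value to a distinct character and the String itself survives the library's internal UTF-8 round trip (encoding and then decoding a valid String with UTF-8 is lossless):

import java.nio.charset.StandardCharsets

// Raw, non-UTF-8 bytes, e.g. a serialised protobuf message.
val protobufBytes: Array[Byte] = Array[Byte](10, -42, -81, -18, -115, 127)

// Producer side: bytes -> ISO-8859-1 String handed to the library.
val wireString = new String(protobufBytes, StandardCharsets.ISO_8859_1)

// (Internally the library does String -> UTF-8 bytes -> String,
//  which preserves the String contents.)

// Consumer side: String received from the library -> the original bytes.
val recovered = wireString.getBytes(StandardCharsets.ISO_8859_1)

assert(protobufBytes.sameElements(recovered)) // lossless round trip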

creating a CWPublisherRunnable that tries to publish to cloudwatch

I've been debugging an issue where MetricsLevel = none is being ignored. No matter what is configured, the runtime logs are full of CWPublisherRunnable output like the following (keep reading beyond the stack trace, I have some analysis):

Could not publish 13 datums to CloudWatch

c.a.s.c.m.AmazonCloudWatchException: The security token included in the request is invalid. (Service: AmazonCloudWatch; Status Code: 403; Error Code: InvalidClientTokenId; Request ID: f4de4145-0db8-11e8-97f0-5b84c3ea3332)
    at c.a.h.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at c.a.h.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at c.a.h.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    at c.a.h.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at c.a.h.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at c.a.h.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at c.a.h.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at c.a.h.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at c.a.h.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at c.a.s.c.AmazonCloudWatchClient.doInvoke(AmazonCloudWatchClient.java:1325)
    at c.a.s.c.AmazonCloudWatchClient.invoke(AmazonCloudWatchClient.java:1301)
    at c.a.s.c.AmazonCloudWatchClient.executePutMetricData(AmazonCloudWatchClient.java:1209)
    at c.a.s.c.AmazonCloudWatchClient.putMetricData(AmazonCloudWatchClient.java:1186)
    at c.a.s.k.m.i.DefaultCWMetricsPublisher.publishMetrics(DefaultCWMetricsPublisher.java:63)
    at c.a.s.k.m.i.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:144)
    at c.a.s.k.m.i.CWPublisherRunnable.run(CWPublisherRunnable.java:90)
    at java.lang.Thread.run(Thread.java:748)

I put together a monkey-patched version of CWPublisherRunnable that dumps out a stack trace when the runnable is created, and it looks like the culprit is com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.$anonfun$start$2(KinesisConsumer.scala:259). A possible workaround sketch follows the trace below.

java.lang.Exception
	at com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.<init>(CWPublisherRunnable.java:71)
	at com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.<init>(CWPublisherRunnable.java:63)
	at com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory.<init>(CWMetricsFactory.java:131)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerCWMetricsFactory.<init>(Worker.java:1043)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.getMetricsFactory(Worker.java:1020)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.access$500(Worker.java:66)
	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$Builder.build(Worker.java:1254)
	at com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.kclWorker$lzycompute(KinesisConsumer.scala:230)
	at com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.kclWorker(KinesisConsumer.scala:228)
	at com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.$anonfun$start$2(KinesisConsumer.scala:259)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:655)
	at scala.util.Success.$anonfun$map$1(Try.scala:251)
	at scala.util.Success.map(Try.scala:209)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:289)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:43)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
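
If you build the KCL Worker yourself (outside of this library), passing a NullMetricsFactory stops the CWPublisherRunnable from being created at all. A minimal sketch, assuming Worker.Builder#metricsFactory is available in your KCL version:

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory

// Sketch: build the Worker with a no-op metrics factory so no CloudWatch
// publisher thread is started. `recordProcessorFactory` and `kclConfig`
// are placeholders for objects you already have.
def buildWorker(recordProcessorFactory: IRecordProcessorFactory,
                kclConfig: KinesisClientLibConfiguration): Worker =
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(kclConfig)
    .metricsFactory(new NullMetricsFactory()) // assumption: supported by your KCL version
    .build()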

JodaTime -> java.time

The library appears to make use of Joda-Time. Migrating to the java.time data types probably makes sense.
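
A minimal sketch of the kind of boundary conversion such a migration would need, wherever the public API currently exposes org.joda.time values:

import java.time.Instant
import org.joda.time.DateTime

// Convert a Joda DateTime to a java.time Instant via epoch millis.
def toInstant(joda: DateTime): Instant = Instant.ofEpochMilli(joda.getMillis)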

Producer Shutdown Testing: Graceful Shutdown

This should be tackled AFTER: #6 & #7

Applications using the Kinesis client should gracefully shutdown. By this we mean completing the current batch (both in the actor and in the KPL), to minimise duplication of messages.

The Producer doesn't currently do anything special here, but it should.
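
A sketch of what a graceful producer shutdown could look like, assuming direct access to the underlying KPL KinesisProducer (flushSync and destroy are the KPL's own methods):

import com.amazonaws.services.kinesis.producer.KinesisProducer

// Drain the KPL before tearing it down so in-flight records are not lost.
def shutdownGracefully(kpl: KinesisProducer): Unit = {
  kpl.flushSync() // block until all buffered records have been sent (or failed)
  kpl.destroy()   // release native resources once the buffer is drained
}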

This must be fully tested by asserting the duplication of messages published to a stream over a number of shutdowns.

The outcome here is to verify how much duplication of messages happens under normal and exception scenarios, where "exception" means adding/removing shards, shutting down a consumer/producer, etc. Document the outcome and raise any stories required to address unacceptable duplication.
