scala-kafka-client's Introduction

Scala support for Apache Kafka's Java client library 0.9.0.x - 2.3.1

Join the chat at https://gitter.im/cakesolutions/scala-kafka-client

This project comprises a few helper modules for operating the Kafka Java Client Driver in a Scala codebase.

Status

These modules are production-ready, actively maintained, and used in a large-scale production system.

Artifact Resolution

To resolve any of the modules, add the following resolver to your build.sbt:

resolvers += Resolver.bintrayRepo("cakesolutions", "maven")

Components

Scala Kafka Client

A thin Scala wrapper over the official Apache Kafka Java Driver. This module is useful for integrating with Kafka for message consumption/delivery and provides helpers for convenient configuration of the driver and idiomatic usage from Scala. It adds minimal third-party dependencies on top of the Kafka client.

For configuration and usage, see the Wiki: Scala Kafka Client Guide

SBT library dependency:

libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "2.3.1"

Akka Integration

This module provides configurable, asynchronous and non-blocking Kafka Consumer and Producer Actor implementations to support high-performance, parallel custom stream processing in an Akka application. These components are specifically intended for use cases that require high-performance, scalable message processing with specific concern for message delivery guarantees and resilience.

For configuration and usage, see the Wiki: Akka Integration

SBT library dependency:

libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-akka" % "2.3.1"

TestKit

The TestKit module provides tools to support integration testing of client service code that depends on a running Kafka server. It helps set up in-process Kafka and ZooKeeper servers.

For usage, see the Wiki: TestKit User Guide

SBT library dependency:

libraryDependencies += "net.cakesolutions" %% "scala-kafka-client-testkit" % "2.3.1" % "test"

Version Compatibility

Starting after version 0.8.0, the versioning for Scala Kafka client tracks Kafka's versioning scheme. Binary compatibility in the new versioning system works as follows:

  • The first and the second digit in the version indicate compatibility with the Kafka driver. For example, 0.9.0.0 is compatible with Kafka 0.9 and 0.10.0.0 is compatible with Kafka 0.10.
  • The third digit in the version indicates an incompatible change between Scala Kafka client versions. For example, 0.9.0.1 is not binary compatible with 0.9.1.0.
  • The fourth digit in the version indicates a compatible change between Scala Kafka client versions. For example, 0.9.0.0 is compatible with 0.9.0.1.

Both the 0.10.* and 1.0.* versions are maintained concurrently.

Here is the full table of binary compatibilities between Scala Kafka client and the Kafka Java driver:

Scala Kafka client   Kafka Java Driver
2.3.1                2.3.1
2.1.0                2.1.0
2.0.0                2.0.0
1.1.1                1.1.1
1.0.0                1.0.0
0.11.0.0             0.11.0.0
0.10.2.x             0.10.2.x
0.10.1.x             0.10.1.x
0.10.0.0             0.10.0.x
0.9.0.0              0.9.0.x
0.8.0                0.10.0.0
0.7.0                0.9.0.1

Change log

2.3.1 - 11/2019

  • Update to Kafka 2.3.1

2.1.0 - 12/2018

  • Update to Kafka 2.1.0

2.0.0 - 08/2018

  • Update to Kafka 2.0.0

1.1.1 - 08/2018

  • Update to Kafka 1.1.1

1.1.0 - 04/2018

  • Update to Kafka 1.1.0

1.0.0 - 12/2017

  • Update to Kafka 1.0.0

0.11.0.0 - 07/2017

  • Update to Kafka 0.11.0.0
  • Update Akka to 2.5.3
  • Added transaction-related properties to config options.

0.10.2.2 - 05/2017

  • Minor improvements to producer API

0.10.2.1 - 05/2017

  • Update Kafka to 0.10.2.1
  • Update Akka to 2.4.18

0.10.2.0 - 02/2017

  • Update Kafka to 0.10.2.0
  • Update dependencies
  • Minor tweaks to logging verbosity

0.10.1.2 - 01/2017

  • Crossbuild for Scala 2.11.8 and 2.12.1
  • Added maxMetaDataAge to KafkaConsumer (thanks @nitendragautam)
  • KafkaConsumerActor now terminates on downstream receiver actor termination (thanks @yoks)
  • Fixed bug related to Exception Handling in KafkaProducer (thanks @conniec)

0.10.1.1 - 11/2016

  • Added new subscription mode: AutoPartitionWithManualOffset.
  • Updated Akka to 2.4.14.
  • Added new Retry count config, to prevent overwhelming an unresponsive downstream client.

0.10.1.0 - 11/2016

  • Supports Kafka 0.10.1.0
  • Added support for new Consumer property: max.poll.interval
  • Removed log4j-over-slf4j dependency
  • Added examples to source

0.9.0.0, 0.10.0.0 - 08/2016

  • Subscribe model changed, now supports more explicit subscription types
  • Handling of partition rebalances improved
  • ConsumerActor failure and restart mechanics improved
  • Versioning scheme changed
  • Testkit improvements
  • ConsumerActor wrapper API provided
  • Tested against Kafka 0.10.0.1

0.8.0 - 06/2016

  • Supports Kafka Client 0.10.0.0
  • Add max.poll.records config option to consumer

0.7.0 - 05/2016

  • Supports Kafka Client 0.9.0.1

Acknowledgements

YourKit supports open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of YourKit Java Profiler and YourKit .NET Profiler, innovative and intelligent tools for profiling Java and .NET applications.

License

Copyright 2016-2018, Cake Solutions.

Licensed under the MIT License

scala-kafka-client's People

Contributors

antonio-ramadas, bigmoby, choedl, conniec, ctoomey, danbim, dhpiggott, gitter-badger, glorat, janm399, jeanbza, jiminhsieh, jkpl, keremk, malenczuk, ploddi, saheb, simonsouter, sohumb, tabdulradi, tonicebrian


scala-kafka-client's Issues

Convenient way to supply ValueDeserializer

It would be convenient to be able to supply the valueDeserializer as just an Array[Byte] => A; this would be especially helpful for ScalaPB-generated Protobuf code.

KafkaConsumer.Conf(
    ...,
    keyDeserializer = ...,
    valueDeserializer = Envelope.parseFrom
)

Where Envelope.parseFrom is def parseFrom(s: Array[Byte]): Envelope.
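
For illustration, a minimal sketch of what such a helper could look like, assuming it simply adapts a plain function to Kafka's Deserializer interface (FunctionDeserializer is a hypothetical name, not part of the library):

import java.util

import org.apache.kafka.common.serialization.Deserializer

// Hypothetical helper (not part of scala-kafka-client): adapts an Array[Byte] => A to Kafka's Deserializer.
object FunctionDeserializer {
  def apply[A](f: Array[Byte] => A): Deserializer[A] = new Deserializer[A] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): A = f(data)
    override def close(): Unit = ()
  }
}

// Usage with the generated parser mentioned above:
// valueDeserializer = FunctionDeserializer(Envelope.parseFrom)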

No documentation on KafkaProducer

Documentation exists for how to use the Scala KafkaConsumer, but no documentation exists for how and when you would expect users to utilise the KafkaProducer and its associated helper functions.

This can lead to difficult-to-debug issues - for example, using KafkaRecords.fromValues rather than KafkaRecords.fromValuesWithKey can lead to type-related issues that cause messages to be dropped. Fortunately, these dropped messages are logged (though at WARN level).
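
Until such documentation exists, a rough sketch of producing with the Scala wrapper might look like the following; the exact Conf parameters and the Future-returning send should be checked against the library sources:

import cakesolutions.kafka.{KafkaProducer, KafkaProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Sketch only: verify the Conf factory signature against the library sources.
val producer = KafkaProducer(
  KafkaProducer.Conf(new StringSerializer(), new StringSerializer(), bootstrapServers = "localhost:9092")
)

// Using the keyed variant (topic, key, value) avoids the dropped-message pitfall described above.
val record = KafkaProducerRecord("my-topic", "my-key", "my-value")

producer.send(record) // assumed to return a Future of the record metadata
producer.close()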

Consumer Retry Logic

Retrying delivery of unconfirmed messages from the KafkaConsumerActor is currently configurable via a delay. It would be good to be able to limit the number of retries, as once the session has timed out it probably does not make sense to continue redelivering indefinitely.

Unknown message: Confirm(Offsets(...))

Sometimes (especially on startup) we get the following message in our logs:

Unknown message: Confirm(Offsets(... = 550, ... = 8),true)

This leads to messages being processed multiple times in the system, sometimes up to 20-30 times, until the actor suddenly gets into a valid state and starts to process the Confirm messages correctly.

Is there a way to prevent this problem? After looking through the code, there seems to be a problem when a Confirm message is processed in the "ready" state, despite that state having a comment saying that no unconfirmed messages should be present.

There is no case class Records

There is no case class Records in KafkaConsumerActor.

import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Records} // Records not found

class ReceiverActor extends Actor {

  override def receive: Receive = {
    case r: Records[_, _] =>
      // Type safe cast of records to correct serialisation type
      r.cast[String, String] match {
        case Some(records) =>
          processRecords(records.records)
          sender() ! Confirm(r.offsets)
        case None => log.warning("Received wrong Kafka records type!")
      }
  }
}

I'm using "net.cakesolutions" %% "scala-kafka-client-akka" % "0.7.0",
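
In the examples elsewhere on this page (and in later releases), the batch message is matched with ConsumerRecords.extractor rather than a Records case class. A sketch of that pattern, with details that may differ per version:

import akka.actor.{Actor, ActorLogging}
import cakesolutions.kafka.akka.ConsumerRecords
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm

// Sketch of the extractor-based receive used with newer versions.
class ReceiverActor extends Actor with ActorLogging {

  // Type-safe extractor for the expected key/value serialisation types
  private val recordsExt = ConsumerRecords.extractor[String, String]

  override def receive: Receive = {
    case recordsExt(records) =>
      records.pairs.foreach { case (key, value) => log.info(s"$key -> $value") }
      sender() ! Confirm(records.offsets)
  }
}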

Consumer Actor could provide an Ack on Successful Subscription on Topics

Hello,

The issue I'm facing is that, upon instructing the Kafka Consumer to subscribe to given topics, there is no way to know whether it has finished the subscription or not.

By following the example provided for the KafkaConsumerActor, we subscribe by sending:

 override def preStart() = {
    super.preStart()
    kafkaConsumerActor ! Subscribe()
  }

and we immediately return, assuming it is already consuming. By relying on the default reset strategy, the consumer will start consuming from the latest offset.

The issue arises if we start publishing messages on a topic straight after this Actor has been created and (instructed to) subscribe. There is a small window where messages are going to be skipped while the subscription is happening.

Is there a way to acknowledge that the subscription has been completed?

E.g. after it has transitioned from state unsubscribed -> ready ?

java.net.ConnectException: Connection refused

Hi,
Not sure whether this is a scala-kafka-client issue, but I see a lot of Connection refused exception lines in the log, although everything seems to be working properly - messages are published and consumed as expected. Below is the full log (as an example, check out this project; make sure to use the branch named logback).

TKD [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Node -1 disconnected.
TKD [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - Give up sending metadata request since no node is available
TKD [kafka-producer-network-thread | producer-2] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
TKD [MyActorSystem-akka.actor.default-dispatcher-2] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
TKD [MyActorSystem-akka.actor.default-dispatcher-5] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
TKD [kafka-producer-network-thread | producer-2] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092.
TKD [MyActorSystem-akka.actor.default-dispatcher-2] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092.
TKD [MyActorSystem-akka.actor.default-dispatcher-5] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092.
TKD [kafka-producer-network-thread | producer-2] DEBUG o.a.kafka.common.network.Selector - Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
	at java.lang.Thread.run(Thread.java:745)

Support Kafka Headers [0.11.0]

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

With the latest release 0.11.0.0, Kafka's ProducerRecord allows adding headers to messages; it would be good to have KafkaProducerRecord expose the same functionality at this client's level.

    /**
     * Creates a record to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers The headers that will be included in the record
     */
    public ProducerRecord(String topic, Integer partition, K key, V value,  Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
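
Until the wrapper exposes headers, one possible workaround is to construct the Java ProducerRecord (0.11.0+) directly from Scala; a sketch:

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader

import scala.collection.JavaConverters._

// Sketch: calling the Java constructor with headers directly.
val headers: Seq[Header] = Seq(new RecordHeader("trace-id", "abc123".getBytes("UTF-8")))
val partition: java.lang.Integer = null // let the default partitioner choose

val record = new ProducerRecord[String, String]("my-topic", partition, "my-key", "my-value", headers.asJava)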

Consider handling issues related to hanging within Apache's Kafka consumer

The following is not an issue with Scala Kafka client, but it would be nice to have a solution/workaround for it.

We have noticed that the Kafka consumer just hangs when the port or address of a bootstrap server is wrong (i.e. a non-existent bootstrap server). This happens in kafka-console-consumer too, so it is likely an issue within the Java implementation.

A few bug reports/tickets that might be related:

The issue can easily be reproduced with the command:

kafka-console-consumer.sh --bootstrap-server google.com:9092 --from-beginning --topic anything

The process just hangs without producing any output.

The same happens in code, without emitting any error message about the non-existing broker.

Received confirmation for unexpected offsets

I see this sort of message a lot in the log after I send the confirm message.

[WARN] [03/03/2017 22:53:55.390] [MyActorSystem-akka.actor.default-dispatcher-5] [akka://MyActorSystem/user/pingActor/PingKafkaConsumerActor] Received confirmation for unexpected offsets: Offsets(pong-0 = 18000)
[INFO] [03/03/2017 22:53:55.390] [MyActorSystem-akka.actor.default-dispatcher-10] [akka://MyActorSystem/user/pingActor] In PongActor - id:e6a, msg: PingPongMessage(pong), offsets Offsets(pong-0 = 18000)
[INFO] [03/03/2017 22:53:55.390] [MyActorSystem-akka.actor.default-dispatcher-10] [akka://MyActorSystem/user/pingActor] In PongActor - id:K7V, msg: PingPongMessage(pong), offsets Offsets(pong-0 = 18000)
...
This is the configuration that I use:

val config: Config = ConfigFactory.parseString(
    s"""
       | bootstrap.servers = "localhost:9092",
       | group.id = "$randomString"
       | enable.auto.commit = false
       | auto.offset.reset = "earliest"
       | schedule.interval = 1 second
       | unconfirmed.timeout = 3 seconds
       | max.redeliveries = 3
        """.stripMargin
  )

How can I fix it?

Any plans to add Scala Kafka Admin Client?

From what I see looking at the repo, currently you have wrappers for Consumer and Producer, but I think it would be nice to have a wrapper for KafkaAdminClient as well
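
Until such a wrapper exists, the Java AdminClient (available since Kafka 0.11) can be used directly from Scala; a rough sketch:

import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

import scala.collection.JavaConverters._

// Sketch: creating a topic and listing topics with the Java admin client.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")

val admin = AdminClient.create(props)
try {
  // Create a topic with 3 partitions and replication factor 1
  admin.createTopics(Seq(new NewTopic("my-topic", 3, 1.toShort)).asJava).all().get()
  println(admin.listTopics().names().get())
} finally {
  admin.close()
}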

Testkit log dependency error

Hi,

I am using the latest testkit version (0.10.1.2), but I have the following error when launching tests:

java.lang.NoClassDefFoundError: org/apache/log4j/Logger
	at kafka.utils.Logging$class.logger(Logging.scala:24)
	at kafka.server.KafkaServerStartable.logger$lzycompute(KafkaServerStartable.scala:32)
	at kafka.server.KafkaServerStartable.logger(KafkaServerStartable.scala:32)
	at kafka.utils.Logging$class.fatal(Logging.scala:118)
	at kafka.server.KafkaServerStartable.fatal(KafkaServerStartable.scala:32)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:43)
	at cakesolutions.kafka.testkit.KafkaServer.startup(KafkaServer.scala:113)

My project is a Play application, so I am using its logger. Currently I have the following workaround in my build.sbt:

libraryDependencies += "org.slf4j" % "log4j-over-slf4j" % "1.7.22" % Test

but then the KafkaServer class cannot log anything.

I also noticed that this error could be similar to #69 but I am not sure since the issue seems resolved.

Any ideas on how to fix this in the testkit?

mapping over a list to create a Seq of ProducerRecord doesn't infer the correct types

 val topics = List("topic1", "topic2")
 val batch: Seq[ProducerRecord[String, String]] = topics.map( t =>
      KafkaProducerRecord(t, "bar"))

resulting in the wrong type:

Error:(38, 64) type mismatch;
found : List[org.apache.kafka.clients.producer.ProducerRecord[Nothing,String]]
required: Seq[org.apache.kafka.clients.producer.ProducerRecord[String,String]]
val batch: Seq[ProducerRecord[String, String]] = topics.map( t =>

but using the other factory (by adding an empty string key) seems to work:

val topics = List("topic1", "topic2")
 val batch: Seq[ProducerRecord[String, String]] = topics.map( t =>
      KafkaProducerRecord(t, "", "bar"))
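
Another workaround, assuming the factory's key type parameter can simply be supplied explicitly, is to pin the types at the call site so the key is not inferred as Nothing:

import cakesolutions.kafka.KafkaProducerRecord
import org.apache.kafka.clients.producer.ProducerRecord

// Sketch: explicit type parameters keep the key type from being inferred as Nothing.
val topics = List("topic1", "topic2")
val batch: Seq[ProducerRecord[String, String]] =
  topics.map(t => KafkaProducerRecord[String, String](t, "bar"))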

Health Check API

Add the ability to the KafkaConsumerActor and KafkaProducerActor to query the connection health of the underlying Consumer and Producer.

Create a source release

It would be great if you released a source artifact so a user's IDE can automatically download it :)

following processing steps after consuming event from kafka

Hi,

It is not an issue, more a question, but I didn't find any other place to ask it.

In my use case, I am fetching events from Kafka and would like to enrich each event using some kind of API (let's assume a REST API), then persist the event in Cassandra, and finally send the event to another Kafka topic.

What would be the best way to implement this, taking into account that the order of events must be preserved?

1 - Should each step (enrich, persist, Kafka producer) be done in a dedicated actor?

2 - Should everything be done in the same actor (Akka Streams, for example, tries to do as many steps as possible in the same actor)?

3 - Another idea I have is to use Akka Streams and hence to extend the receiving actor
(SampleAcceptorActor in the example from here:
http://www.cakesolutions.net/teamblogs/getting-started-with-kafka-using-scala-kafka-client-and-akka)
with ActorPublisher, so that the receiving actor becomes a kind of reactive source for the following processing steps, which are then executed using the Akka Streams API?

Small update:
I have decided to try option 3.
This is my code, based on the example from the Akka Streams ActorPublisher docs:

class ConsumerAtMostOnceReactiveActor(config: Config, clientID: String, maxBufferSize: Int, topics: List[String]) extends ActorPublisher[ConsumerRecord[String, String]] with ActorLogging {

  import akka.stream.actor.ActorPublisherMessage._

  private val extractor = ConsumerRecords.extractor[String, String]

  val MaxBufferSize = maxBufferSize
  var buf = Vector.empty[ConsumerRecord[String, String]]

  // Configuration specific to the Async Consumer Actor
  val actorConf = KafkaConsumerActor.Conf(topics, 1.seconds, 3.seconds)

  private val kafkaConsumerActor = context.actorOf(
    KafkaConsumerActor.props(
      consumerConf = KafkaConsumer.Conf(
        config.withValue("client.id", ConfigValueFactory.fromAnyRef(clientID)),
        keyDeserializer = new StringDeserializer,
        valueDeserializer = new StringDeserializer
      ),
      actorConf = actorConf,
      self
    ),
    "KafkaConsumer"
  )

  override def preStart() = {
    super.preStart()
    kafkaConsumerActor ! Subscribe()
  }

  override def postStop() = {
    kafkaConsumerActor ! Unsubscribe
    Logger.root.info("shutting down kafka consumer system")
    super.postStop()
  }

  override def receive = {
    case extractor(consumerRecords) if buf.size == MaxBufferSize =>
    case extractor(consumerRecords) =>
      kafkaConsumerActor ! Confirm(consumerRecords.offsets, commit = true)
      buf ++= consumerRecords.recordsList
      deliverBuf()
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      /*
       * totalDemand is a Long and could be larger than
       * what buf.splitAt can accept
       */
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

It would still be great to get feedback regarding my question (which option would be better), as well as on my way of implementing option 3.

Thanks,

Use of Kafka Server in the toolkit

Hi again,

Since there is very little documentation about the testkit, I'm really struggling with the use of KafkaServer.

First of all, I don't really know whether I'm supposed to create a new KafkaServer for every test or whether I can reuse the same server for all the tests in my suite.

Second, I would prefer to reuse the same instance in all my tests in the suite. The problem is that the consume function reads every message from the log: not just those I have produced in the current test, but also messages already consumed in previous tests. Is there a way, other than creating a new instance of KafkaServer for every test, to remove the existing messages at the end of each test, or to consume only the messages I'm interested in for the current test?
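
One common pattern is one server per suite with a fresh topic per test. A sketch, under the assumption that the testkit's KafkaServer exposes startup()/close() and a kafkaPort (startup appears in the stack trace of the testkit log dependency issue above):

import cakesolutions.kafka.testkit.KafkaServer
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

import scala.util.Random

// Sketch only: verify the KafkaServer API (startup/close/kafkaPort) against the testkit sources.
class MyIntegrationSpec extends FlatSpec with BeforeAndAfterAll {

  private val kafkaServer = new KafkaServer()

  override def beforeAll(): Unit = kafkaServer.startup()
  override def afterAll(): Unit = kafkaServer.close()

  // A fresh topic per test avoids re-reading messages produced by earlier tests.
  private def freshTopic(): String = s"test-${Random.alphanumeric.take(8).mkString}"

  "the service" should "consume what it produced" in {
    val topic = freshTopic()
    // produce to and consume from s"localhost:${kafkaServer.kafkaPort}" using `topic`
  }
}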

Dynamic subscription to new topic

I am working on a use case where topics are created dynamically and deleted after the message is sent.
I am developing a Kafka consumer listener which listens for newly created topics and subscribes to them automatically. The ConsumerSelfManaged.scala example shows a subscription to a single topic when the application starts. Is there any implementation in this library through which I can create a scheduler for subscribing to new topics as they are created?

Fail to cast while using ConsumerRecords.extractor[Long,String]

I used private val extractor = ConsumerRecords.extractor[Long,String] as an extractor, but I failed to get any message.

If changing to ConsumerRecords.extractor[String,String], it worked.

I found that the hasType[Other: TypeTag] method in the TypeTaggedTrait trait returned false when the key type was not String. It's very strange.

Has anyone else had this problem?
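
One likely explanation, offered tentatively: when the consumer is configured with Kafka's LongDeserializer, the records carry java.lang.Long keys, and the TypeTag comparison does not treat that as scala.Long. Matching on the boxed type is one thing to try:

// Sketch: match on the boxed Java type that the LongDeserializer actually produces.
private val extractor = ConsumerRecords.extractor[java.lang.Long, String]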

KafkaProducer is created for each KafkaProducerActor

As the Java KafkaProducer is thread-safe, it's recommended to share the same instance among threads (meaning among actors) for better performance. Solutions I see for changing this (correct me if I am wrong, because I am new to Scala):

  1. first create the KafkaProducer and then pass the instance to KafkaProducerActor. Pros: easy to implement. Cons: the user/developer has to deal with 2 objects
  2. provide a trait only for KafkaProducer; that means if you need more than one producer, you'll have to inherit it in several different singleton objects. Pros: I don't really see any. Cons: need to create a singleton and pass it to KafkaProducerActor (same as 1?)
  3. what would be ideal is that, when instantiating KafkaProducerActor, the conf is checked and, if no KafkaProducer with the same conf already exists, a new one is created; if one exists, it is reused

Combining messages with same Key using downstream actor

I have a use case where the Kafka consumer actor receives messages in batches with the same key for the same topic. As some of the messages are large, the Kafka producer slices the messages and sends them in small chunks, in batches, to the same topic.
These message keys are unique to a particular request and can change for the next request.
Now, using a downstream Akka actor, I need to combine the Kafka messages with the same key when they are received by the consumer.

Let's say I get messages in batches: (key1, message1), (key2, message3), (key1, message2), (key2, message4)
After the consumer gets the messages, I want to combine them as below:
key1 ===> message1 + message2
key2 ===> message3 + message4

Any idea about how to create a receiver actor or a service which combines the messages?
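
A rough sketch of a downstream receiver that accumulates chunks per key across batches, using the pairs and offsets accessors seen in other examples on this page; how to detect that all chunks for a key have arrived is left out, since that depends on how the producer marks the last chunk:

import akka.actor.{Actor, ActorLogging}
import cakesolutions.kafka.akka.ConsumerRecords
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm

// Sketch only: accumulates chunks per key; completion handling is application-specific.
class CombiningReceiver extends Actor with ActorLogging {

  private val recordsExt = ConsumerRecords.extractor[String, String]

  // key -> chunks concatenated so far
  private var assembled = Map.empty[String, String].withDefaultValue("")

  override def receive: Receive = {
    case recordsExt(records) =>
      records.pairs.foreach {
        case (Some(key), chunk) => assembled += key -> (assembled(key) + chunk)
        case (None, chunk)      => log.warning(s"Dropping keyless chunk of ${chunk.length} chars")
      }
      sender() ! Confirm(records.offsets, commit = true)
  }
}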

kafka auto.offset.reset earliest not working

Hi,

I was using the auto.offset.reset "earliest" strategy, and when I started a consumer with a new group.id, subscribed to already existing topics (with LOG-END-OFFSET > 0 for the topic partitions),
the consumer was not able to pull any data from Kafka.
Using kafka-consumer-groups.sh, the output was:

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
my-group test-topic 0 unknown 197 unknown client123_/192.168.99.1

Just to verify, I also tried ./kafka-console-consumer.sh with the --from-beginning flag, and that was fine.

Thanks,

Consumer Actor blocks on commit

After reading that the kafka consumer was non-blocking, I saw this (which is of course blocking):

https://github.com/cakesolutions/scala-kafka-client/blob/master/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala#L752

private def tryCommit(offsetsToCommit: Offsets, state: StateData): Try[Unit] = {
    try {
      consumer.commitSync(offsetsToCommit.toCommitMap)
      Success({})
    } catch {
...

Are there plans to use the commitAsync() method instead? It seems silly to me that you are committing synchronously within an actor. It should be fairly trivial to convert this to an async message-passing model.
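
For reference, a rough sketch of what an asynchronous commit looks like against the Java consumer API; this is not how the actor currently behaves, and inside an actor the callback result would normally be forwarded back to the actor as a message:

import java.util.{Map => JMap}

import org.apache.kafka.clients.consumer.{KafkaConsumer => JKafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Sketch: commitAsync with a callback instead of blocking on commitSync.
def commitAsync(
    consumer: JKafkaConsumer[String, String],
    offsets: JMap[TopicPartition, OffsetAndMetadata]
  )(onResult: Option[Exception] => Unit): Unit =
  consumer.commitAsync(offsets, new OffsetCommitCallback {
    override def onComplete(committed: JMap[TopicPartition, OffsetAndMetadata], error: Exception): Unit =
      onResult(Option(error)) // None means the commit succeeded
  })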

getting EOFException on tests

I have noticed this exception when running tests against KafkaServer:

 TKD [kafka-network-thread-1-PLAINTEXT-1] DEBUG o.a.kafka.common.network.Selector - Connection with /192.168.1.107 disconnected
java.io.EOFException: null
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
	at kafka.network.Processor.poll(SocketServer.scala:476)
	at kafka.network.Processor.run(SocketServer.scala:416)
	at java.lang.Thread.run(Thread.java:745)

In the logs I also noticed the following:

TKD [KafkaConsumerTest-akka.actor.default-dispatcher-2] DEBUG o.a.kafka.common.network.Selector - Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl$$anonfun$cakesolutions$kafka$akka$KafkaConsumerActorImpl$$pollKafka$1.apply(KafkaConsumerActor.scala:711)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl$$anonfun$cakesolutions$kafka$akka$KafkaConsumerActorImpl$$pollKafka$1.apply(KafkaConsumerActor.scala:709)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl.tryWithConsumer(KafkaConsumerActor.scala:721)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl.cakesolutions$kafka$akka$KafkaConsumerActorImpl$$pollKafka(KafkaConsumerActor.scala:709)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl$$anonfun$cakesolutions$kafka$akka$KafkaConsumerActorImpl$$ready$1.applyOrElse(KafkaConsumerActor.scala:507)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl.aroundReceive(KafkaConsumerActor.scala:356)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Not an issue, but a question about where to catch 'java.net.ConnectException'?

For example, I have this piece of code:

val kafkaConsumer = system.actorOf(
    KafkaConsumerActor.props(
        KafkaConsumer.Conf(config.getConfig("kafka"), new StringDeserializer, new StringDeserializer),
        KafkaConsumerActor.Conf(config.getConfig("kafka-actor")),
        kafkaService
    )
)
kafkaConsumer ! Subscribe()

If the Kafka server is not started yet, where should I catch java.net.ConnectException?
I surrounded the piece of code above with try/catch, but I did not catch the exception.
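
The failure happens asynchronously inside the KafkaConsumerActor (for example surfacing as the ConsumerException shown in another issue on this page), not at the line that sends Subscribe, so a try/catch around actorOf or the tell cannot see it. One possible approach is to have a parent actor supervise and watch the consumer actor; a generic Akka sketch:

import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, Terminated}

import scala.concurrent.duration._

// Sketch: the parent decides what to do when the consumer actor fails or terminates.
class ConsumerSupervisor(consumerProps: Props) extends Actor with ActorLogging {

  override val supervisorStrategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case e: Exception =>
        log.error(e, "Kafka consumer actor failed; restarting")
        Restart
    }

  private val consumer = context.actorOf(consumerProps, "kafka-consumer-actor")
  context.watch(consumer)

  override def receive: Receive = {
    case Terminated(`consumer`) =>
      log.error("Kafka consumer actor terminated")
      context.stop(self)
  }
}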

Documentation improvements

Add the following sections to the documentation:

  • Supervisor Strategy/ Error Handling
  • Chaining Consumer and producer
  • Examples

Kafka rebalance listener

Hi,

Is there an option to add a custom ConsumerRebalanceListener with the subscription to a topic?

Thanks

Idle downstream actors until confirmation is sent

Looking at the KafkaConsumerActor API, I understand that the only way to process N message batches from the same topic concurrently is to create N KafkaConsumerActors with the same groupId, each one with its own corresponding downstream actor.
From the KafkaConsumerActor docs:

Before the actor continues pulling more data from Kafka, the receiver of the data must confirm the batches by sending back a [[KafkaConsumerActor.Confirm]] message that contains the offsets from the received batch.

If we want to achieve at-least-once semantics, we must wait until each batch's processing succeeds in order to send the Confirm message with the offsets.
So, although the downstream actor is not blocked (assuming we've done things right and processed the batch asynchronously, e.g. in a Future), it won't receive more batches until the Confirm(offsets) is sent, and I see this as being equivalent to blocking the actor.
Suppose the first batch contains messages
first message
second message
third message
I write these messages to a Mongo DB in a Future and send a confirmation message to the KafkaConsumerActor when this Future succeeds.
If new messages come in
fourth message
fifth message
I would like to be able to process these new messages even if the first write to Mongo hasn't finished.
Until Mongo confirms the writes, the downstream actor will be in an idle state, so why couldn't the KafkaConsumerActor keep sending new batches?
I understand that this works as a mechanism to avoid overwhelming the downstream actor

This mechanism allows the receiver to control the maximum rate of messages it will receive.

But couldn't we configure a threshold of batches that can be sent without confirmation? Something like splitting the unconfirmed state into unconfirmedThresholdUnreached and unconfirmedThresholdReached states.

If creating N KafkaConsumerActor + downstream actor pairs (with the same groupId) is the way to go, must I choose a fixed number of actors?

I hope I've made myself clear.
Thanks!

Subscribe.AutoPartitionWithManualOffset closes over actor instance

When sending Subscribe.AutoPartitionWithManualOffset to the KafkaConsumerActor, the assignedListener and revokedListener functions may end up capturing the actor instance. The current API also makes it difficult to use non-blocking code to load the offsets.

It would be more convenient to have

final case class AutoPartitionWithManualOffset(
  topics: Iterable[String],
  assignedListener: List[TopicPartition] => Future[Offsets],
  revokedListener: List[TopicPartition] => Future[Unit]
) extends Subscribe

For safe use in actors, it would be useful to have a wrapper around assignedListener and revokedListener that works by asking a supplied ActorRef for a response (Offsets) to AssignedListeners(topics: List[TopicPartition]) and RevokedListeners(...).
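
Sketched against the proposed Future-based signature above, such a wrapper might look like the following; AssignedListeners and RevokedListeners are placeholder messages, and Offsets is assumed to be the library's offsets type:

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import cakesolutions.kafka.akka.Offsets
import org.apache.kafka.common.TopicPartition

import scala.concurrent.ExecutionContext

object ActorBackedSubscription {

  // Placeholder protocol for the offset-providing actor (not part of the library).
  final case class AssignedListeners(topics: List[TopicPartition])
  final case class RevokedListeners(topics: List[TopicPartition])

  // Sketch: build the proposed Future-based subscription by asking a supplied actor.
  def apply(topics: Iterable[String], offsetProvider: ActorRef)
           (implicit timeout: Timeout, ec: ExecutionContext): AutoPartitionWithManualOffset =
    AutoPartitionWithManualOffset(
      topics,
      assignedListener = tps => (offsetProvider ? AssignedListeners(tps)).mapTo[Offsets],
      revokedListener  = tps => (offsetProvider ? RevokedListeners(tps)).map(_ => ())
    )
}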

example of consumer to receiver messaging

Hi, I'm still pretty new to Scala and am trying to get the cakesolutions akka client up and running.
I think I have got my (very) rough implementation connected to kafka, but am puzzled as to how to pass messages between consumer <--> receiver. My code is below.

I would be very grateful if anyone could possibly give me any pointers or examples on how to get consumer and receiver actors talking?

object eatKafka extends App {

  println("\n\n\n")
  // Create an Akka system
  println("\n\ncreating system...")
  val system = ActorSystem("VySystem")

  // create receiver
  println("\n\ncreating receiver...")
  val receiver = system.actorOf(Props[ReceiverActor], name = "receiver")

  // create cake kafka consumer
  println("\n\ncreating consumer...")
  val conf = ConfigFactory.load()

  //Construct the KafkaConsumerActor with Typesafe config in resources
  val cakeKConsumer = system.actorOf(
    KafkaConsumerActor.props(conf, new ByteArrayDeserializer(), new ByteArrayDeserializer(), receiver)
  )

  println("pass subscription details to cakeKConsumer")
  cakeKConsumer ! Subscribe.AutoPartition(List("impression")) //Subscribe.ManualOffset(Offsets(Map((new TopicPartition("impression", 0), 1))))

}


/////////////////////////////////////////////////////////////////////
// kafka consumer actor
class kConsumeCakeActor extends Actor {
  // https://github.com/cakesolutions/scala-kafka-client/blob/master/akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala
  println("\n\nIn consumer!!")
  override def receive = {
    case default => println("I am kConsumeCakeActor")
  }
}


/////////////////////////////////////////////////////////////////////
// receiver actor
class ReceiverActor extends Actor {
  // Extractor for ensuring type safe cast of records
  val recordsExt = ConsumerRecords.extractor[Int, ByteString]
  println("In receiver")
  override def receive: Receive = {
    case recordsExt(records) =>
      println("do something with records")
      //for (record <- records) println(record)
      sender() ! Confirm(records.offsets)

    case default => println("I am receiver!!")
  }
}

Sample output:

[info] Running eatKafka 
[info] 
[info] 
[info] 
[info] 
[info] 
[info] 
[info] creating system...
[info] 
[info] 
[info] creating receiver...
[info] 
[info] 
[info] creating consumer...
[info] In receiver
[info] pass subscription details to cakeKConsumer

[info] [INFO] [08/24/2017 09:33:20.853] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] Subscribing in auto partition assignment mode to topics [impression].
[info] I am receiver!!
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: records timed out while waiting for a confirmation.
[info] [INFO] [08/24/2017 09:33:31.493] [VySystem-akka.actor.default-dispatcher-3] [akka://VySystem/user/$a] In bufferFull: redelivering.
[info] I am receiver!!
[info] I am receiver!!
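
One thing to check, tentatively, and consistent with the output above where only the default case fires: the consumer actor is created with ByteArrayDeserializer for both key and value, while the receiver's extractor expects [Int, ByteString], so the records message never matches. Aligning the two inside ReceiverActor might look like:

// Sketch: match the extractor to the deserializers the consumer actor was created with.
val recordsExt = ConsumerRecords.extractor[Array[Byte], Array[Byte]]

override def receive: Receive = {
  case recordsExt(records) =>
    records.pairs.foreach { case (_, value) => println(s"got a record of ${value.length} bytes") }
    sender() ! Confirm(records.offsets)
  case other => println(s"unexpected message: $other")
}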

actors fail to terminate

Hi,
I am working on an example project using scala-kafka-client (planning to submit it as an Activator template).
I am integrating with Akka actors, but it seems that the actors fail to terminate.
You can check out this project (would love to get your feedback). Running the PingPongActorSpec test, you can see that the actors fail to terminate:

[WARN] [03/02/2017 08:22:03.141] [ScalaTest-run] [akka.actor.ActorSystemImpl(MySpec)] Failed to stop [MySpec] within [10 seconds]
-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 5 children
| ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
| ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
| ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
| ⌊-> testActor1 RepointableActorRef class akka.testkit.TestActor status=0 no children
| ⌊-> testActor2 RepointableActorRef class akka.testkit.TestActor status=0 no children
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=4 Terminating(Termination)
| toDie: Actor[akka://MySpec/user/PongTest#1896611585]
⌊-> PongTest RepointableActorRef class com.example.PongActor status=4 Terminating(Termination)
| toDie: Actor[akka://MySpec/user/PongTest/PingKafkaConsumerActor#-1239449583]
⌊-> PingKafkaConsumerActor LocalActorRef class cakesolutions.kafka.akka.KafkaConsumerActorImpl status=2 no children

at most once

Hi,

Is there a way to use an at-most-once commit strategy, but commit each record separately instead of in batches?

Thanks,

KafkaConsumerActor should watch for downstream actor termination

KafkaConsumerActor is created with an ActorRef for the downstream actor, and if the downstream actor terminates, this ActorRef will no longer be valid, i.e. all messages will go to DeadLetters.

At the moment there is no DeathWatch set up to monitor the downstream actor's lifecycle, and because KafkaConsumerActor does not supervise the downstream actor, it will still try to deliver messages to the dead actor.

I suggest adding watch(downstreamActor) in KafkaConsumerActor, terminating KafkaConsumerActor if downstreamActor terminates, and closing the connection.
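
For reference, the changelog above notes this behaviour was added in 0.10.1.2 ("KafkaConsumerActor now terminates on downstream receiver actor termination"). A minimal, generic Akka sketch of the suggested DeathWatch behaviour:

import akka.actor.{Actor, ActorLogging, ActorRef, Terminated}

// Generic sketch: watch the downstream actor and shut down when it terminates.
class WatchingConsumer(downstream: ActorRef) extends Actor with ActorLogging {

  override def preStart(): Unit = context.watch(downstream)

  override def receive: Receive = {
    case Terminated(`downstream`) =>
      log.info("Downstream actor terminated; closing consumer and stopping")
      // close the underlying KafkaConsumer here before stopping
      context.stop(self)
  }
}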

Exception thrown when kafka consumer actor is polling

When I launch my Play application, it instantiates an actor handling a Kafka consumer actor this way:

class ConsumerActor @Inject()(configuration: Configuration) extends Actor {

  val logger = Logger(this.getClass)

  def topic: String

  def consumerActorName: String

  val consumerConfig: Config = configuration.underlying.as[Config]("kafka.consumer")

  val consumerActorConfig: Config = configuration.underlying.as[Config]("kafka.actor")

  val consumerConf: Conf[String, String] = KafkaConsumer.Conf(consumerConfig, new StringDeserializer, new StringDeserializer)

  val actorConf: KafkaConsumerActor.Conf = KafkaConsumerActor.Conf().withConf(consumerActorConfig)

  val recordsExt: Extractor[Any, ConsumerRecords[String, String]] = ConsumerRecords.extractor[String, String]

  val consumer: ActorRef = context.actorOf(KafkaConsumerActor.props(consumerConf, actorConf, self), consumerActorName)
  context.watch(consumer)

  override def preStart(): Unit = {
    super.preStart()
    consumer ! Subscribe.AutoPartition(List(topic))
  }

  override def postStop(): Unit = {
    consumer ! Unsubscribe
    super.postStop()
  }

  override def receive: Receive = {
    // Records from Kafka
    case recordsExt(records) =>
      processRecords(records.pairs)
      consumer ! Confirm(records.offsets, commit = true)
  }

  protected def processRecords(records: Seq[(Option[String], String)]): Unit = { ... }
}

My configuration is as follows:

## Kafka
#
kafka {
  bootstrap.servers = "localhost:9092"
  producer {
    bootstrap.servers = ${kafka.bootstrap.servers}
    acks = "all"
    compression.type = "gzip"
    retries = 3
  }
  consumer {
    bootstrap.servers = ${kafka.bootstrap.servers}
    group.id = "test-group"
    auto.offset.reset = "earliest"
    enable.auto.commit = "false"
  }
  actor {
    schedule.interval = 1 second
    unconfirmed.timeout = 3 seconds
    max.redeliveries = 3
  }
}

And my Guice module for actors looks like this:

class AkkaModule extends ScalaModule with AkkaGuiceSupport {

  /**
    * Configures the module.
    */
  def configure() {
    // bind consumer
    bindActor[ConsumerActor](ConsumerActor.name)
  }
}

I have the following exception when the kafka consumer actor tries to poll the broker for new messages:

cakesolutions.kafka.akka.KafkaConsumerActor$ConsumerException: Exception thrown from Kafka consumer!
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl.cakesolutions$kafka$akka$KafkaConsumerActorImpl$$consumerFailure(KafkaConsumerActor.scala:766)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl.tryWithConsumer(KafkaConsumerActor.scala:728)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl.cakesolutions$kafka$akka$KafkaConsumerActorImpl$$pollKafka(KafkaConsumerActor.scala:709)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl$$anonfun$cakesolutions$kafka$akka$KafkaConsumerActorImpl$$ready$1.applyOrElse(KafkaConsumerActor.scala:507)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:496)
	at cakesolutions.kafka.akka.KafkaConsumerActorImpl.aroundReceive(KafkaConsumerActor.scala:356)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'correlation_id': java.nio.BufferUnderflowException
	at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
	at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:53)
	at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:376)
	at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)

Do you have any idea where this comes from?

Getting exceptions on fast load

I am trying to publish 100K messages to Kafka; however, after a while I am getting these errors:

org.apache.kafka.common.errors.TimeoutException: Expiring 69 record(s) for mytopic-0 due to 30001 ms has passed since batch creation plus linger time
484305-19:11:04.002 [foo-akka.actor.default-dispatcher-2] ERROR c.kafka.akka.KafkaProducerActor - Failed to send message to Kafka!

And this one:

625596:org.apache.kafka.common.errors.TimeoutException: Expiring 69 record(s) for mytopic-0 due to 30026 ms has passed since last append
625597-19:11:12.350 [foo-akka.actor.default-dispatcher-6] ERROR c.kafka.akka.KafkaProducerActor - Failed to send message to Kafka!
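
These expirations typically mean batches are sitting in the producer's send buffer longer than the timeout in the message (30000 ms), i.e. the producer is outpacing what the broker can accept. The usual responses are to throttle the publish rate or to loosen the producer settings; the values below are illustrative only, and depending on the client version the relevant timeout is request.timeout.ms or delivery.timeout.ms:

import com.typesafe.config.ConfigFactory

// Illustrative overrides only; tune against your actual load and broker capacity.
val producerConfig = ConfigFactory.parseString(
  """
    | bootstrap.servers = "localhost:9092"
    | acks = "all"
    | retries = 5
    | linger.ms = 50            # give the producer a chance to batch
    | batch.size = 131072       # larger batches for bursty loads
    | buffer.memory = 67108864  # more room before send() blocks
    | request.timeout.ms = 120000
  """.stripMargin
)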

Versioning scheme

The versioning scheme described in the README didn't work out quite as well as I had hoped. The current version scheme assumes that the first two parts of the version are dedicated to Kafka compatibility while the rest of the parts describe SKC versioning. Since the Kafka project decided to make additional incompatible changes in 0.10.1, we'd have to track the extra third part of the version.

Figure out a versioning scheme for SKC that tracks both Kafka compatibility and the client compatibility.
