
akka-kafka's People

Contributors

coreyauger, kelvl, sclasen, xuwei-k


akka-kafka's Issues

(low priority) Coda Hale Metrics for topic offset, stream metrics

Although we are currently using: http://quantifind.com/KafkaOffsetMonitor/

...it would be ideal if akka-kafka supported Coda Hale Metrics directly, so we could track:

  • offset status for each topic (e.g. ratio of current consumer offset to topic head)
  • consumption rate for each topic
  • latency of fetch vs commit for each batch for each topic
  • histogram of batch sizes per topic
  • etc.

http://metrics.codahale.com/

As long as the MetricRegistry name is a Typesafe Config variable used by both akka-kafka and any app that also uses Coda Hale Metrics, we can rendezvous via the:

com.codahale.metrics.SharedMetricRegistries.getOrCreate(String name) method.
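The rendezvous could be sketched like this (the registry name and config key are hypothetical, purely for illustration):

```scala
import com.codahale.metrics.{MetricRegistry, SharedMetricRegistries}
import com.typesafe.config.ConfigFactory

// Both akka-kafka and the host app would read the same (hypothetical) config key...
val registryName = ConfigFactory.load().getString("metrics.registry-name")

// ...and then fetch the same registry instance by name.
val registry: MetricRegistry = SharedMetricRegistries.getOrCreate(registryName)
registry.meter("akka-kafka.consumption-rate").mark()
```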

(Right now we use Coda Hale Metrics intensively in all our apps, feeding e.g. Graphite and Riemann. Although the KafkaOffsetMonitor listed earlier is a nice tool, we like to have metrics fully integrated into our standard monitoring and alerting pipeline, and Coda Hale Metrics gets us there.)

If all this is clear as mud, just ping me for more detail.

consumer read from specific partition

Hi,

Suppose I have N partitions. I would like to have X different consumer threads (X < N) read from a specified set of partitions. How can I achieve this?

Thanks

What exactly is outstanding?

Can you please tell me what exactly outstanding means? Because after some time of consuming I start receiving messages like: state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=8

Unable to cleanly stop AkkaConsumer

I have an actor in which I create an AkkaConsumer.

import akka.actor.{Actor, ActorLogging}
import com.sclasen.akka.kafka.{AkkaConsumer, AkkaConsumerProps, CommitConfig, StreamFSM}
import kafka.serializer.StringDecoder
import scala.util.{Failure, Success}

case object Start // application-defined trigger message

class ConsumerActor(topic: String) extends Actor with ActorLogging {
    import context.dispatcher // ExecutionContext for the stop() callback

    val akkaConsumer: AkkaConsumer[String, String] = createAkkaConsumer()

    private def createAkkaConsumer() = {
        val consumerProps = AkkaConsumerProps.forContext(
            context = context,
            zkConnect = "localhost:2181",
            topic = topic,
            group = s"Group-$topic",
            streams = 1,
            keyDecoder = new StringDecoder(),
            msgDecoder = new StringDecoder(),
            receiver = self,
            commitConfig = new CommitConfig()
        )

        new AkkaConsumer(consumerProps)
    }

    override def receive: Receive = {
        case Start => akkaConsumer.start()
        case x: Any =>
            log.info(s"Kafka message: $x")
            sender() ! StreamFSM.Processed
    }

    @throws[Exception](classOf[Exception])
    override def postStop(): Unit = {
        akkaConsumer.stop().onComplete {
            case Success(_) => log.info("Finished stopping akka consumer")
            case Failure(exception) => log.error(exception, "Error stopping akka consumer")
        }
    }
}

When I'm stopping the actor system (to which the ConsumerActor is a child) I always get the error "Error stopping akka consumer" in my logs.
The stacktrace looks like this:

akka.pattern.AskTimeoutException: Recipient[Actor[akka://mySystem/user/consumer-starter/consumer-482628338]] had already been terminated.
at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:132) ~[akka-actor_2.11-2.3.11.jar:na]
at akka.pattern.AskableActorRef$.$qmark$extension(AskSupport.scala:144) ~[akka-actor_2.11-2.3.11.jar:na]
at com.sclasen.akka.kafka.AkkaConsumer.stop(AkkaConsumer.scala:61) ~[akka-kafka_2.11-0.1.0.jar:0.1.0]

I guess this is because I'm stopping the akkaConsumer in postStop, by which point the consumer's internal actors have already been terminated along with the system. Is there another way of doing this?
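One workaround (a sketch, not an official recommendation) is to stop the consumer and wait for its future before shutting the actor system down, instead of doing it from postStop; `consumer` and `system` are assumed to be in scope:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical shutdown sequence: stop the consumer while its internal
// actors are still alive, then bring the actor system down.
Await.result(consumer.stop(), 30.seconds)
system.shutdown()           // Akka 2.3-era API, matching the stack trace above
system.awaitTermination()
```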

Question about state on the stream

Hi

Could you give me some help regarding the state that needs to be used to sync between my actor and yours?

StreamFSM.Processed means "I've finished processing this message" and must be sent back for each message.

But I can't use it directly, because I receive the message in actor A and process it in actor B. So could I use StreamFSM.StartProcessing in A and send StreamFSM.Processed from B?

Or are there other states that are relevant?
Thanks
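One pattern that should work (a sketch, under the assumption that Processed must reach the StreamFSM that delivered the message): capture sender() in actor A, carry it along to actor B, and have B reply to that captured ref. The Work envelope below is hypothetical:

```scala
import akka.actor.{Actor, ActorRef}
import com.sclasen.akka.kafka.StreamFSM

// Hypothetical envelope carrying the message plus the StreamFSM that sent it.
case class Work(msg: Any, stream: ActorRef)

class ActorA(b: ActorRef) extends Actor {
  def receive = {
    case msg => b ! Work(msg, sender()) // sender() here is the StreamFSM
  }
}

class ActorB extends Actor {
  def receive = {
    case Work(msg, stream) =>
      // ... process msg ...
      stream ! StreamFSM.Processed // ack to the stream that delivered it
  }
}
```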

Race / off-by-one issue in BatchFSM? (Stuck temporarily in WaitingToSendBatch)

Hi Scott,

We are running 0.1.0 on a large 48-core box, and the service keeps choking in BatchConnectorFSM in the WaitingToSendBatch state, no matter the value of BatchTimeout. Furthermore, when I increase the number of streams past a certain point (e.g. 16), the system spends all its time blocked on the LinkedBlockingQueue (even though the topic has 48 partitions):

"Archival-akka.actor.default-dispatcher-49" prio=10 tid=0x00007f0df4003800 nid=0x9e65 waiting on condition [0x00007f0aa25e4000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000005c925c1c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:65)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at com.sclasen.akka.kafka.BatchStreamFSM.hasNext(BatchActors.scala:182)
...

The ConsumerGroup is subscribing to topics with 48 partitions, each holding billions of messages. I have been forced to run only a couple of streams in order to make progress, because as the stream count increases, throughput decreases.

I haven't been able to figure out why progress is choking in WaitingToSendBatch. The server has 10GigE and isn't limited by CPU, disk, or network; it looks like a locking/concurrency issue.

More logs of the main topic here:

2015-07-25T00:54:45.920+0000 INFO Archival-akka.actor.default-dispatcher-55 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 39999 to LogWriters in 59 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:45.920+0000 INFO Archival-akka.actor.default-dispatcher-55 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 39999; status: 39999 / 39999}
2015-07-25T00:54:45.920+0000 INFO Archival-akka.actor.default-dispatcher-55 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:54:46.435+0000 INFO Archival-akka.actor.default-dispatcher-59 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 43 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:46.435+0000 INFO Archival-akka.actor.default-dispatcher-59 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:54:46.435+0000 INFO Archival-akka.actor.default-dispatcher-59 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
[WARN] [07/25/2015 00:54:47.892] [Archival-akka.actor.default-dispatcher-46] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:54:48.902] [Archival-akka.actor.default-dispatcher-64] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:54:49.922] [Archival-akka.actor.default-dispatcher-37] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
2015-07-25T00:54:50.782+0000 INFO Archival-akka.actor.default-dispatcher-21 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 39999 to LogWriters in 28 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:50.782+0000 INFO Archival-akka.actor.default-dispatcher-21 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 39999; status: 39999 / 39999}
2015-07-25T00:54:50.782+0000 INFO Archival-akka.actor.default-dispatcher-21 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:54:51.263+0000 INFO Archival-akka.actor.default-dispatcher-48 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 46 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:51.263+0000 INFO Archival-akka.actor.default-dispatcher-48 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:54:51.263+0000 INFO Archival-akka.actor.default-dispatcher-48 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:54:51.751+0000 INFO Archival-akka.actor.default-dispatcher-49 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 55 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:51.752+0000 INFO Archival-akka.actor.default-dispatcher-49 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:54:51.752+0000 INFO Archival-akka.actor.default-dispatcher-49 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:54:52.121+0000 INFO Archival-akka.actor.default-dispatcher-6 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 51 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:52.121+0000 INFO Archival-akka.actor.default-dispatcher-6 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:54:52.121+0000 INFO Archival-akka.actor.default-dispatcher-6 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
[WARN] [07/25/2015 00:54:53.522] [Archival-akka.actor.default-dispatcher-62] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:54:54.542] [Archival-akka.actor.default-dispatcher-4] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:54:55.563] [Archival-akka.actor.default-dispatcher-60] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
2015-07-25T00:54:56.558+0000 INFO Archival-akka.actor.default-dispatcher-9 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 39999 to LogWriters in 96 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:56.559+0000 INFO Archival-akka.actor.default-dispatcher-9 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 39999; status: 39999 / 39999}
2015-07-25T00:54:56.559+0000 INFO Archival-akka.actor.default-dispatcher-9 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:54:56.933+0000 INFO Archival-akka.actor.default-dispatcher-25 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 42 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:56.934+0000 INFO Archival-akka.actor.default-dispatcher-25 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:54:56.934+0000 INFO Archival-akka.actor.default-dispatcher-25 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:54:57.409+0000 INFO Archival-akka.actor.default-dispatcher-56 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 45 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:57.409+0000 INFO Archival-akka.actor.default-dispatcher-56 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:54:57.409+0000 INFO Archival-akka.actor.default-dispatcher-56 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:54:57.899+0000 INFO Archival-akka.actor.default-dispatcher-48 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 41 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:54:57.899+0000 INFO Archival-akka.actor.default-dispatcher-48 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:54:57.899+0000 INFO Archival-akka.actor.default-dispatcher-48 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
[WARN] [07/25/2015 00:54:59.343] [Archival-akka.actor.default-dispatcher-2] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:55:00.352] [Archival-akka.actor.default-dispatcher-26] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:55:01.362] [Archival-akka.actor.default-dispatcher-7] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
2015-07-25T00:55:02.220+0000 INFO Archival-akka.actor.default-dispatcher-35 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 39999 to LogWriters in 46 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:55:02.220+0000 INFO Archival-akka.actor.default-dispatcher-35 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 39999; status: 39999 / 39999}
2015-07-25T00:55:02.220+0000 INFO Archival-akka.actor.default-dispatcher-35 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
[WARN] [07/25/2015 00:55:03.552] [Archival-akka.actor.default-dispatcher-62] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:55:04.562] [Archival-akka.actor.default-dispatcher-30] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:55:05.582] [Archival-akka.actor.default-dispatcher-25] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
2015-07-25T00:55:06.496+0000 INFO Archival-akka.actor.default-dispatcher-7 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 39999 to LogWriters in 30 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:55:06.496+0000 INFO Archival-akka.actor.default-dispatcher-7 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 39999; status: 39999 / 39999}
2015-07-25T00:55:06.496+0000 INFO Archival-akka.actor.default-dispatcher-7 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
2015-07-25T00:55:06.957+0000 INFO Archival-akka.actor.default-dispatcher-49 com.whitepages.ait.archival.Archival Finished dispatch/ack batch of size: 40000 to LogWriters in 43 millis from actor: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306#-1505022696]
2015-07-25T00:55:06.957+0000 INFO Archival-akka.actor.default-dispatcher-49 com.whitepages.logging.LogProcessor In prod.log.general LogProcessor, got Ack from: Actor[akka://Archival/deadLetters] with count: 40000; status: 40000 / 40000}
2015-07-25T00:55:06.957+0000 INFO Archival-akka.actor.default-dispatcher-49 com.whitepages.logging.LogProcessor In processAck from Actor[akka://Archival/deadLetters], sending BatchProcessed to: Actor[akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630#-1171870480]
[WARN] [07/25/2015 00:55:08.383] [Archival-akka.actor.default-dispatcher-45] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2
[WARN] [07/25/2015 00:55:09.402] [Archival-akka.actor.default-dispatcher-17] [akka://Archival/user/KafkaLogProcessor-prod.log.general-128919660836139306/prod.log.general.connector8919660836712630] state=WaitingToSendBatch msg=StateTimeout outstanding=1 streams=2

I see the same pattern recurring: a full batch (e.g. exactly 40k) moves quite quickly; when it isn't moving quickly, there are three WaitingToSendBatch warnings followed by a batch of size (FullBatch-1)...

Once delivered, the time spent processing and acking batches is on the order of 50ms (just streaming to files), and GC isn't an issue either: the box has 80GB RAM and the JVM isn't busy collecting...

So there may be one issue here or two (the WaitingToSendBatch stalls, and the inability to run a reasonably high number of streams); both symptoms may share a single root cause, or not...

Help! (Thanks!)

actor name [connectorFSM] is not unique!

Hi there,

I'm getting this after hours-to-days of uptime. It looks like the connector is getting recreated without having been stopped first. But there's an inherent race condition between actor lifecycle and namespace maintenance, so even if the actual problem is fixed, you can still see this. What I've done in the past is "uniquify" actor names like this:

val rnd = new scala.util.Random()
actorOf(Props(...), "myChild-" + rnd.alphanumeric.take(8).mkString)

[ERROR] [06/26/2014 05:43:00.044] [countingService-akka.actor.default-dispatcher-2235] [akka://countingService/user/$a/$a/$d] actor name [connectorFSM] is not unique!
akka.actor.InvalidActorNameException: actor name [connectorFSM] is not unique!
    at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
    at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
    at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
    at akka.actor.dungeon.Children$class.makeChild(Children.scala:202)
    at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
    at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
    at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:552)
    at com.sclasen.akka.kafka.AkkaConsumer.createConnection(AkkaConsumer.scala:45)
    at com.sclasen.akka.kafka.AkkaConsumer.connector$lzycompute(AkkaConsumer.scala:27)
    at com.sclasen.akka.kafka.AkkaConsumer.connector(AkkaConsumer.scala:27)
    at com.sclasen.akka.kafka.AkkaConsumer.start(AkkaConsumer.scala:50)

Question : How can I consume my kafka topic as fast as possible ?

Hi,

I'm using Kafka between Ruby and Scala, and I noticed that my Ruby HermannConsumer consumes messages faster than my AkkaConsumer.

I can set commitConfig = CommitConfig(commitInterval = Some(10.millis)) in kafkaProps, but I'm not sure that's the right thing to do.

How can I consume my kafka topic as fast as possible ?

Feature Request: Topic Filter wildcard whitelist & blacklist

Expose the following Kafka API from ConsumerConnector into Akka-kafka:

  /**
   *  Create a list of message streams for all topics that match a given filter.
   *
   *  @param topicFilter Either a Whitelist or Blacklist TopicFilter object.
   *  @param numStreams Number of streams to return
   *  @param keyDecoder Decoder to decode the key portion of the message
   *  @param valueDecoder Decoder to decode the value portion of the message
   *  @return a list of KafkaStream each of which provides an
   *          iterator over message/metadata pairs over allowed topics.
   */
  def createMessageStreamsByFilter[K,V](topicFilter: TopicFilter,
                                        numStreams: Int = 1,
                                        keyDecoder: Decoder[K] = new DefaultDecoder(),
                                        valueDecoder: Decoder[V] = new DefaultDecoder())
    : Seq[KafkaStream[K,V]]
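A usage sketch of that underlying Kafka API (the regex and stream count are illustrative, and `config` is assumed to be a ConsumerConfig built elsewhere):

```scala
import kafka.consumer.{Consumer, Whitelist}
import kafka.serializer.DefaultDecoder

// Subscribe to every topic matching the (example) pattern with 4 streams.
val connector = Consumer.create(config)
val streams = connector.createMessageStreamsByFilter(
  new Whitelist("""prod\.log\..*"""), // or new Blacklist(...)
  numStreams = 4,
  keyDecoder = new DefaultDecoder(),
  valueDecoder = new DefaultDecoder()
)
```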

Multiple worker actors

There are many stream consumer actors created (StreamFSMs), but only one worker actor; this essentially turns any non-trivial worker into a bottleneck. Would it make sense to have a consumer actor per stream?
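If the single receiver is the bottleneck, one option (a sketch; not something akka-kafka provides itself) is to make the receiver an Akka pool router. Routers forward messages with the original sender preserved, so each routee can still ack the delivering StreamFSM:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.routing.RoundRobinPool
import com.sclasen.akka.kafka.StreamFSM

class Worker extends Actor {
  def receive = {
    case msg =>
      // ... do the real work ...
      sender() ! StreamFSM.Processed // sender() is still the StreamFSM
  }
}

// Hypothetical wiring: hand this router ref to AkkaConsumerProps as `receiver`.
val system = ActorSystem("consumers")
val receiver = system.actorOf(RoundRobinPool(8).props(Props[Worker]), "workers")
```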

ConsumerTimeoutException

I see millions of exceptions being caught here:

case cte: ConsumerTimeoutException => false

We're using the default consumer.timeout of 400ms at this point. Do I understand correctly that if there is no message within a period of 400ms, the iterator will throw this exception? What is the best practice to prevent these exceptions? Just increase the timeout?

Thanks!
Marc

Feature Request: option to return message metadata (e.g. msg key) to consumer

Right now lines 210-211 in Actors.scala look like:

/* ok to process, and msg available */
case Event(Continue, outstanding) if hasNext() =>
val msg = msgIterator.next().message()

Our producers are encoding keys with messages, and we'd like the consumers to be able to access them, but right now the akka-kafka framework doesn't pass the metadata up, AFAIK.

The Kafka MessageAndMetadata[K, V] class has a key() method that appears to be what we need...

Perhaps another optional property passed into the Connector constructor could make it return a tuple of (msg, key) instead of just the message?
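A fragment sketching the change being requested (hypothetical, not existing akka-kafka code): pull the whole MessageAndMetadata at the dispatch site quoted above so the key survives, then hand the receiver both parts:

```scala
/* ok to process, and msg available */
case Event(Continue, outstanding) if hasNext() =>
  val mam = msgIterator.next()          // kafka.message.MessageAndMetadata[K, V]
  receiver ! (mam.key(), mam.message()) // instead of just mam.message()
```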

Thanks!

-Jonathan

StateTimeout after 1 message is processed

I have a simple pipeline using the non-batch consumer. I extract the message in ExtractActor and pass it along to TransformActor, which generates N possible transformations; these are passed along to LoadActor, which performs the load (into an Elasticsearch cluster). Once all loads are confirmed, the StreamFSM.Processed message is sent to the original "sender" (the one who sent the message to ExtractActor).
I send one single message; it gets passed through the whole pipeline (all transformations applied, stored in the database, etc.) and sends its StreamFSM.Processed message. However, after this, I start receiving a StateTimeout warning once per second.

The problem is not the warning itself, but the fact that I don't get any more messages consumed after this!

I use 4 partitions of kafka.
This is the warning:
2015-03-30 12:15:10,568 WARN com.sclasen.akka.kafka.ConnectorFSM Risk-Events-ETL-akka.actor.default-dispatcher-140 - state=Committing msg=StateTimeout drained=3 streams=4

Version in sonatype.org

I'm using the 0.2.1 version from Sonatype (resolvers += "Sonatype OSS" at "https://oss.sonatype.org/content/repositories/releases"), and getting some weird errors.

However, when I build my own 0.2.2-SNAPSHOT version, everything works fine. Looking at the commit history, nothing has changed between the two versions.

Any idea what's wrong with the Sonatype build?

Question, how to set SupervisorStrategy for StreamFSM?

Pardon me, English is not my primary language.

We have a size limitation on messages, so StreamFSM.hasNext() receives a MessageSizeTooLargeException when it encounters a too-large message. The default SupervisorStrategy behavior is simply to restart the process. Since nothing changes, the process gets stuck on this ill message forever.

We'd like to recover from this error by moving the offset to the next message, so we can skip it and restart the process. However, we could not find a way to set the SupervisorStrategy for the framework and change the default behavior. Is there a way to do so?

StateTimeout with long processing

Hello,

I'm using Kafka as a queue for some IO operations in a worker, and I have to assume that the operations can sometimes last up to 30-60 seconds. I've noticed I am seeing a lot of StateTimeout messages from akka-kafka in the log. I tried fiddling with the commit interval/timeout, but with no real luck. The behaviour changes a bit if I send StreamFSM.Processed before or after the actual IO, but the "error" is the same, just at different moments (this is expected). How should I handle this situation?

Question: Consuming from start of a topic

Hi Scott,
Cheers on the implementation it works great. Had a simple question, is there any configuration setting so that the AkkaConsumer will start at the beginning of a topic every time? (Something like the --from-beginning in the Kafka console java consumer)
Thanks!
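For the 0.8-era high-level consumer that akka-kafka wraps, the usual approach (a sketch, not a documented akka-kafka feature) is auto.offset.reset=smallest together with a consumer group that has no committed offsets; with an existing group you would also have to clear its offsets in ZooKeeper first:

```scala
// Assumption: these properties reach the underlying kafka.consumer.ConsumerConfig;
// akka-kafka may or may not pass extra consumer properties through.
val props = new java.util.Properties()
props.put("auto.offset.reset", "smallest") // start from the earliest offset...
props.put("group.id", "my-replay-group")   // ...but only when this group has no
                                           // offsets committed yet
```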

Question : I try to use it but nothing ...

Hi

I've tried to use it but I can't get it working. I get messages from Kafka only at first; then I enter this mode:
[test-akka.actor.default-dispatcher-3] INFO com.sclasen.akka.kafka.ConnectorFSM - at=created-streams
[test-akka.actor.default-dispatcher-3] INFO akka.actor.ActorSystemImpl - at=consumer-started
[test-akka.actor.default-dispatcher-5] INFO com.sclasen.akka.kafka.ConnectorFSM - at=transition from=Receiving to=Committing uncommitted=0
[test-akka.actor.default-dispatcher-5] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1
[test-akka.actor.default-dispatcher-5] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1
[test-akka.actor.default-dispatcher-2] WARN com.sclasen.akka.kafka.ConnectorFSM - state=Committing msg=StateTimeout drained=1 streams=1

And then I don't get anything anymore.
I've tried to play with CommitConfig without success.
Could you help me?
My code is:
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.util.Timeout
import com.sclasen.akka.kafka.{AkkaConsumer, AkkaConsumerProps, CommitConfig, StreamFSM}
import kafka.serializer.DefaultDecoder
import scala.concurrent.duration._

object Main extends App {
  val system = ActorSystem("test")

  val senderRequest = system.actorOf(Props[SenderRequest])
  val consumerProps = AkkaConsumerProps.forSystem(
    system = system,
    zkConnect = "localhost:2181",
    topic = "HTTP",
    group = "1",
    streams = 1, // one per partition
    keyDecoder = new DefaultDecoder(),
    msgDecoder = new DefaultDecoder(),
    receiver = senderRequest,
    commitConfig = new CommitConfig(Some(10.seconds), Some(1), Timeout(10.seconds))
  )

  val consumer = new AkkaConsumer(consumerProps)
  consumer.start()
}

class SenderRequest extends Actor with ActorLogging {
  override def receive: Actor.Receive = {
    case data: Any =>
      log.info("RECEIVE HERE !!!!!!!!!!!!!!!")
      sender() ! StreamFSM.Processed
  }
}

Thanks

Implementing topic offset

Hi,

I would love to be able to set the initial offset in time, i.e. either reading from the start or only reading new messages from the current moment on. I'm implementing some Kafka readers that are not interested in any backed-up messages, just the messages arriving from the moment the readers attach. Any tips on how this could be done?

Thanks!
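With the Kafka 0.8 high-level consumer that akka-kafka wraps, where a group with no committed offset starts reading is controlled by the `auto.offset.reset` property: `"largest"` starts at the log head (only new messages), `"smallest"` replays from the beginning. Whether akka-kafka passes this through depends on its configuration, so treat this as a plain consumer-properties sketch; `"fresh-group"` is an illustrative group id, not anything akka-kafka defines:

```scala
import java.util.Properties

// Sketch of Kafka 0.8 high-level consumer properties for a reader that only
// wants messages arriving after it attaches.
val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "fresh-group")      // a group with no committed offsets
props.put("auto.offset.reset", "largest") // start at the head: skip the backlog
```

Using a brand-new `group.id` per attach plus `auto.offset.reset=largest` gives the "only new messages" behavior, since there is no stored offset to resume from.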

Question about zero tolerance for message loss

Thanks for building a nice akka-kafka tool. I have a few questions.
We are using your tool to read data from Kafka and pass the messages on to a cluster of nodes for processing.
If one of the nodes goes down during processing, we don't get an acknowledgement back from the node to the main actor, so the offsets don't get committed.
For our use case we don't want to lose any unprocessed messages, and with the high-level consumer I didn't find any easy way to re-read the same set of messages without shutting down the consumer.

What would be the best way to ensure every message is processed at least once? Any thoughts/suggestions?

Appreciate your help.
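akka-kafka gives at-least-once semantics as long as you only send `StreamFSM.Processed` after the work has actually succeeded: offsets are committed only once every in-flight message is acked, so an unacked message is re-read after the consumer restarts. A minimal in-memory model of that contract (the class and method names here are illustrative, not the akka-kafka API):

```scala
import scala.collection.mutable
import scala.util.{Try, Success, Failure}

// Hypothetical model of at-least-once delivery: a message only counts as
// committed after the handler succeeds; failed messages stay pending and
// are redelivered on the next drain.
class AtLeastOnceBuffer[A](messages: Seq[A]) {
  private val pending = mutable.Queue(messages: _*)
  var committed = 0 // number of successfully processed messages

  // Deliver each pending message; re-enqueue on failure so it is retried.
  def drain(handler: A => Try[Unit]): Unit = {
    val batch = pending.dequeueAll(_ => true)
    batch.foreach { msg =>
      handler(msg) match {
        case Success(_) => committed += 1
        case Failure(_) => pending.enqueue(msg) // will be redelivered
      }
    }
  }
}
```

Re-running `drain` redelivers whatever failed, which is exactly the duplicate-on-retry trade-off at-least-once implies: your node-side processing should be idempotent.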

Race issues with StreamFSM when sending a batch of Processed messages

Most of what we do with Akka-Kafka involves many thousands of messages a second, and these messages are sorted into categories, and processed in bulk into various backend sinks.

We started to work with the back-pressure support in akka-kafka, and rather than returning a StreamFSM.Processed acknowledgement immediately, we started to tie the acks to the success of the backend batch operations.

However, with batch sizes of over around 10 messages (although we typically deal with batches on the backend of size 20-50k), we start to see the behavior get "racy", e.g. occasionally seeing:

WARN ...connector] unhandled event Processed in state Committing

The higher the batch size, the more likely we see this error, which eventually leads to consumer failure due to unhandled acknowledgements.

(We have an Akka method that, on success, takes the batch size as an argument and then turns around and does this:)

   case AckMessage(count) =>
      val connector = consumer.connector
      logger.debug(s"In AckMessage, sending $count acks back to: $connector")
      for (i <- 1 to count) connector ! StreamFSM.Processed

I tried playing with different settings for maxInFlightPerStream, but that didn't seem to help...

Perhaps this could be solved by adding an Int or Long argument to the StreamFSM.Processed message like I did for the relay above?
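The counted ack suggested above could look roughly like this: carry the batch size in one message and decrement the outstanding count once, instead of looping `count` times over separate sends. `Processed(count)` and `Outstanding` are hypothetical; the real `StreamFSM.Processed` carries no count:

```scala
// Hypothetical counted acknowledgement: one message acks a whole batch.
case class Processed(count: Int)

// Minimal model of a stream's outstanding-message counter.
class Outstanding(private var n: Int) {
  def ack(p: Processed): Unit = { n = math.max(0, n - p.count) }
  def remaining: Int = n
}
```

A single `Processed(50000)` then replaces fifty thousand mailbox messages, which also removes the window in which a connector state change can interleave with a long ack loop.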

Example in the readme not working

I have downloaded the code and tried the example code mentioned in the readme file, but I have no luck running the sample program. Every time I start the program, it starts but doesn't consume any data from Kafka.
Here are the logs i get when debug is enabled
[INFO] [01/22/2015 17:59:36.473] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] at=start
[INFO] [01/22/2015 17:59:37.743] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] at=created-streams
[INFO] [01/22/2015 17:59:37.755] [test-akka.actor.default-dispatcher-4] [ActorSystem(test)] at=consumer-started
[DEBUG] [01/22/2015 17:59:37.758] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] state=Receiving msg=Commit uncommitted=0
[INFO] [01/22/2015 17:59:37.758] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] at=transition from=Receiving to=Committing uncommitted=0
[ERROR] [01/22/2015 17:59:37.761] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a] Unhandled event and stopping the consumer
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 state=Processing msg=Continue outstanding=0
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 at=transition from=Processing to=Unused
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 state=Unused msg=Drain outstanding=0
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 at=Drained
[DEBUG] [01/22/2015 17:59:37.771] [test-akka.actor.default-dispatcher-3] [akka://test/user/$a/stream0] stream=stream0 at=transition from=Unused to=Empty
[INFO] [01/22/2015 17:59:37.817] [test-akka.actor.default-dispatcher-4] [ActorSystem(test)] at=consumer-stopped
[INFO] [01/22/2015 17:59:37.824] [test-akka.actor.default-dispatcher-2] [akka://test/user/$a/stream0] Message [akka.dispatch.sysmsg.Terminate] from Actor[akka://test/user/$a/stream0#677859073] to Actor[akka://test/user/$a/stream0#677859073] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [01/22/2015 17:59:37.826] [test-akka.actor.default-dispatcher-4] [akka://test/user/$a] Message [com.sclasen.akka.kafka.ConnectorFSM$Drained] from Actor[akka://test/user/$a/stream0#677859073] to Actor[akka://test/user/$a#-1603528940] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Later I realized the issue is caused by calling
consumer.stop() // returns a Future[Unit] that completes when the connector is stopped.
It sends an event to the ConnectorFSM and ends up in the unhandled section of ConnectorFSM in Actors.scala:
whenUnhandled {
  case Event(ConnectorFSM.Stop, _) =>
    log.error("Unhandled event and stopping the consumer")
    connector.shutdown()
    sender() ! ConnectorFSM.Stop
    context.children.foreach(_ ! StreamFSM.Stop)
    stop()
}

I commented out consumer.stop() and it started reading data from Kafka; now I have to figure out how to stop the program once it has consumed all data from Kafka.
I have tested it from Eclipse and the command line; both had the same problem.
Am I doing something wrong?

Appreciate your help.

0.8.2-beta not working

commitOffsets needs () at the end now, since an optional parameter was added to it.

[ERROR] [10/31/2014 14:58:09.515] [default-akka.actor.default-dispatcher-5] [ActorSystem(default)] Uncaught error from thread [default-akka.actor.default-dispatcher-5] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.NoSuchMethodError: kafka.consumer.ConsumerConnector.commitOffsets()V
at com.sclasen.akka.kafka.BatchConnectorFSM$$anonfun$5.applyOrElse(BatchActors.scala:148)
at com.sclasen.akka.kafka.BatchConnectorFSM$$anonfun$5.applyOrElse(BatchActors.scala:129)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at akka.actor.FSM$class.processEvent(FSM.scala:604)
at com.sclasen.akka.kafka.BatchConnectorFSM.processEvent(BatchActors.scala:60)
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at akka.actor.FSM$class.processEvent(FSM.scala:604)
at com.sclasen.akka.kafka.BatchConnectorFSM.processEvent(BatchActors.scala:60)
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.sclasen.akka.kafka.BatchConnectorFSM.aroundReceive(BatchActors.scala:60)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

`unhandled event StartProcessing` Warning

We're seeing the following message in our logs:

[WARN] [2016-05-05 17:40:35,793] [EventService-akka.actor.default-dispatcher-12] c.s.a.k.StreamFSM akka://EventService/user/$b/stream0: unhandled event StartProcessing in state Draining

Is this an error we should be concerned about? How should we go about debugging?

Thanks in advance!

Empty Queue

Is there a way to continue the AkkaConsumer when the queue becomes empty?

DEBUG] [08/12/2015 15:57:06.364] [base-actor-system-akka.actor.default-dispatcher-3] [akka://base-actor-system/user/$a/stream0] stream=stream0 at=transition from=Processing to=Unused

In my case, it stops, and even when the queue gets new messages, it does not continue.
I tried stop/start; it stays idle.

Specifying multiple topic

Would it make sense to allow passing a Map of <Topic, Stream #> to create a consumer for multiple topics?
It's currently possible to do this by passing a TopicFilter, but it would be nice to have control over the number of streams for each topic.
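For illustration, a `Map[topic, streamCount]` could be flattened into the per-stream assignments a connector would create, one StreamFSM per pair. `expandStreams` is a hypothetical helper, not part of akka-kafka:

```scala
// Expand a topic -> stream-count map into (topic, streamIndex) pairs, one
// per stream the connector would create. Sorted for deterministic output.
def expandStreams(topics: Map[String, Int]): Seq[(String, Int)] =
  topics.toSeq.sortBy(_._1).flatMap { case (topic, n) =>
    (0 until n).map(i => (topic, i))
  }
```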

Can AkkaConsumber inherit from a common base trait?

I would like to have a common base trait for Akka consumers, to allow for a generic way to handle clean-up.

ex:

private[this] val consumerRegistry = new ConcurrentHashMap[String, AkkaConsumerBase[String, _]]

def shutdown() = {
  logger.info("Shutting down Event Bus")
  producer.close()
  Await.ready(Future.sequence(consumerRegistry.asScala.values.map(_.stop())), 5.minutes)
}

Thoughts ?

No license explicitly stated

We'd like to use this project but noticed no license was explicitly stated. I was hoping to convince you to declare a license. If you don't have a preference, ASF, BSD 2 or 3 clause, or MIT are preferable. ;)

Thanks!

Specify receiver as Props instead of ActorRef?

Hello!

Maybe there's something I don't understand, as I am fairly new to the world of Akka, but wouldn't it be wiser for the helper methods to take a way to instantiate the actor that processes the messages (i.e. Props), so that each StreamFSM actor can create its own, rather than all sending to the same ActorRef? A single ActorRef could become a bottleneck wherever message processing is long-running (intensive or not), such as submitting the message to a third-party service and waiting for the answer.

The way I understand it right now, the parallelism of having multiple StreamFSMs is wasted, since the messages are all processed by the same ActorRef in the end.

Thanks for your great work!
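You can keep the single-ActorRef signature and still get parallelism by making the receiver a router (e.g. an Akka RoundRobinPool), which fans messages out to a pool of workers; Akka routers preserve the original sender, so `sender() ! StreamFSM.Processed` in a routee should still reach the stream. The scheduling a round-robin router does can be modeled in plain Scala (illustrative only, not Akka code):

```scala
// Plain-Scala model of round-robin routing: each incoming message goes to
// the next worker in turn, so no single processor becomes the bottleneck.
class RoundRobin[A](workers: Vector[A => Unit]) {
  private var next = 0
  def dispatch(msg: A): Unit = {
    workers(next)(msg)
    next = (next + 1) % workers.length
  }
}
```

In Akka terms, the `receiver` passed to AkkaConsumerProps could itself be a router actor whose routees do the slow work; the exact router API depends on your Akka version.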

Guidance Requested: Error Handling

Hello,

Thanks for the great library. I'm using the basic AkkaConsumer actor and am hoping you can provide guidance on error handling within the receive method. Specifically, we're handling the messages and writing to a DB system. Occasionally we receive a timeout from the underlying DB that manifests as an exception. It appears that this then causes that message to not send a StreamFSM.Processed event (which seems correct, since the message was not processed). This then ultimately causes our consumer to hang in an infinite committing state.

What's the best way to handle messages such that a failed message will be retried, possibly by another actor? Do we need to handle all possible error cases in receive, or is there some event we can send to the sender to trigger a retry?

Thanks in advance.
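One approach (a sketch, not akka-kafka API): wrap the DB write in a bounded retry and only send `StreamFSM.Processed` once it succeeds; if it ultimately fails, escalate (dead-letter the message, or let the actor crash so supervision restarts the consumer) rather than silently dropping the ack. `retry` below is a hypothetical helper:

```scala
import scala.util.{Try, Success, Failure}
import scala.annotation.tailrec

// Bounded retry for an effectful operation such as a DB write. Returns the
// first Success, or the last Failure after `attempts` tries.
@tailrec
def retry[T](attempts: Int)(op: => T): Try[T] =
  Try(op) match {
    case s @ Success(_)             => s
    case Failure(_) if attempts > 1 => retry(attempts - 1)(op)
    case f                          => f
  }
```

In the receive block that would look like `retry(3)(db.write(msg))` followed by `sender() ! StreamFSM.Processed` on `Success` and your escalation of choice on `Failure` (`db.write` being whatever your DB call is).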

New Kafka 0.9 consumer

Is there any plan to make this work with the 0.9 consumer? I know the architecture of that consumer is much different.

NPE in BatchConnectorFSM

We get an occasional NullPointerException in the BatchConnectorFSM, and then all consuming halts. There is no obvious way to set up a supervisor to restart it.

[ERROR] [2017-01-11 04:51:32,383] [OneForOneStrategy] [loader-akka.actor.default-dispatcher-90] [] null
java.lang.NullPointerException: null
        at org.I0Itec.zkclient.ZkConnection.writeDataReturnStat(ZkConnection.java:138)
        at org.I0Itec.zkclient.ZkClient$13.call(ZkClient.java:1151)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
        at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:1147)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:1142)
        at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:1110)
        at kafka.utils.ZkUtils$.updatePersistentPath(ZkUtils.scala:326)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsetToZooKeeper(ZookeeperConsumerConnector.scala:283)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:304)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$5.apply(ZookeeperConsumerConnector.scala:303)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:303)
        at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:380)
        at com.sclasen.akka.kafka.BatchConnectorFSM$$anonfun$4.applyOrElse(BatchActors.scala:142)
        at com.sclasen.akka.kafka.BatchConnectorFSM$$anonfun$4.applyOrElse(BatchActors.scala:137)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at akka.actor.FSM$class.processEvent(FSM.scala:663)
        at com.sclasen.akka.kafka.BatchConnectorFSM.processEvent(BatchActors.scala:62)
        at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
        at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:651)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
        at com.sclasen.akka.kafka.BatchConnectorFSM.aroundReceive(BatchActors.scala:62)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Consumer with a Router

Does it make sense to extend the consumer with a router setting, e.g. round robin? The way I see it, we may need multiple instances of the same consumer if the processing logic allows it: for example, when processing is not quick and separate messages can be processed independently, we might want 2+ consumers in the same group.

So, is it sensible to implement this, knowing Kafka and Akka principles?

Unused state does not try again

For some reason, the first time the StreamFSM tries to get messages from Kafka it does not find any messages and goes to state Unused forever (if I use CommitConfig(None, None)).
My idea is to have a scheduler that sends a message (Continue) after some time, for example:

when(Unused)
system.scheduler.scheduleOnce(10 seconds, self, Continue) // just once

If you agree with this, I can make a pull request.

By the way, any idea why, if I set a high maxMessagesInFlight (5000 for example), the KafkaConsumer stops receiving messages and then goes to the Unused state? It seems like a problem with the KafkaConsumer when I try to read a lot of messages really fast.

Batch consumer priority

Hello,

Currently, I have several batch consumers running in my application, but messages in certain batch consumers must be processed before messages in others.
Is it possible to specify the priority of one batch consumer over another (e.g. process all messages in this topic before other batch consumers process theirs)?
