zio-kafka's Introduction

ZIO Kafka

ZIO Kafka is a Kafka client for ZIO. It provides a purely functional, streams-based interface to the Kafka client and integrates effortlessly with ZIO and ZIO Streams.


Introduction

Apache Kafka is a distributed event streaming platform that acts as a distributed publish-subscribe messaging system. It enables us to build distributed streaming data pipelines and event-driven applications.

Kafka has a mature Java client for producing and consuming events, but its API is fairly low-level. ZIO Kafka is a ZIO-native client for Apache Kafka that provides a high-level streaming API on top of the Java client, so we can produce and consume events using the declarative concurrency model of ZIO Streams.

Installation

In order to use this library, we need to add the following lines to our build.sbt file:

libraryDependencies += "dev.zio" %% "zio-kafka"         % "2.8.0"
libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.8.0" % Test

Snapshots are available on Sonatype's snapshot repository https://oss.sonatype.org/content/repositories/snapshots. Browse here to find available versions.
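
To use a snapshot, sbt also needs the snapshot repository configured as a resolver; a minimal build.sbt sketch:

// Add the Sonatype snapshots repository so sbt can resolve zio-kafka snapshot versions.
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"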

Example

Let's write a simple Kafka producer and consumer using ZIO Kafka with ZIO Streams. Before anything else, we need a running instance of Kafka. We can start one by saving the following docker-compose configuration as docker-compose.yml and running docker-compose up:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Now, we can run our ZIO Kafka Streaming application:

import zio._
import zio.kafka.consumer._
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde._
import zio.stream.ZStream

object MainApp extends ZIOAppDefault {
  val producer: ZStream[Producer, Throwable, Nothing] =
    ZStream
      .repeatZIO(Random.nextIntBetween(0, Int.MaxValue))
      .schedule(Schedule.fixed(2.seconds))
      .mapZIO { random =>
        Producer.produce[Any, Long, String](
          topic = "random",
          key = random % 4,
          value = random.toString,
          keySerializer = Serde.long,
          valueSerializer = Serde.string
        )
      }
      .drain

  val consumer: ZStream[Consumer, Throwable, Nothing] =
    Consumer
      .plainStream(Subscription.topics("random"), Serde.long, Serde.string)
      .tap(r => Console.printLine(r.value))
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .drain

  def producerLayer =
    ZLayer.scoped(
      Producer.make(
        settings = ProducerSettings(List("localhost:29092"))
      )
    )

  def consumerLayer =
    ZLayer.scoped(
      Consumer.make(
        ConsumerSettings(List("localhost:29092")).withGroupId("group")
      )
    )

  override def run =
    producer.merge(consumer)
      .runDrain
      .provide(producerLayer, consumerLayer)
}

Resources

Adopters

Here is a partial list of companies using zio-kafka in production.

Want to see your company here? Submit a PR!

Documentation

Learn more on the ZIO Kafka homepage!

Contributing

For the general guidelines, see ZIO contributor's guide.

Code of Conduct

See the Code of Conduct

Support

Come chat with us on Discord.

Credits

This library is heavily inspired and made possible by the research and implementation done in Alpakka Kafka, a library maintained by the Akka team and originally written as Reactive Kafka by SoftwareMill.

License

Copyright 2021-2024 Itamar Ravid and the zio-kafka contributors.

zio-kafka's People

Contributors

aartigao, adamgfraser, andreamarcolin, azhur, dependabot[bot], egast, erikvanoosten, flavienbert, ghostdogpr, github-actions[bot], guizmaii, guymers, iravid, khajavi, koshelev, kyri-petrou, lukestephenson, mdulac, mijicd, regispl, rituraj2342, ruurtjan, scala-steward, strokyl, svroonland, trobert, vigoo, vladimirkl, zio-assistant[bot], zio-scala-steward[bot]


zio-kafka's Issues

probable concurrency bug using multiple consumers

This can be demonstrated on PR labelled Client Admin

To reproduce, edit zio.kafka.client.EmbeddedMultiConsumerTest in the tests and change the test in the list from testMultipleConsumers to testParallelConsumers.

On my system it hangs without returning most of the time, though not every time.
Code for testMultipleConsumers is:

  val testMultipleConsumers = testM("test multiple consumers") {
    for {
      topic           <- randomTopic
      consumerGroupId <- randomGroup
      _               <- makeTopic(topic, 5)
      _               <- makeMany(topic, 1000)
      _               <- Live.live(ZIO.sleep(2.seconds))
      consumed        = 0.to(4).map(i => MultiConsumerTestHelper.consumeN(topic, consumerGroupId, i, 3))
      _               <- ZIO.collectAll(consumed)
    } yield {
      assertCompletes
    }
  }

For testParallelConsumers I simply change collectAll to collectAllPar

Setup CI and publishing

@NeQuissimus do we have a project structure I could use for this? I know my way around CircleCI so you could throw the general instructions at me :-)

Add a diagnostics interface for the consumer

We don't want to depend on any logging libraries, but we want to output diagnostics if the user chooses to.

The idea is to create a DiagnosticEvent ADT, and a Diagnostics service which has an emit: DiagnosticEvent => UIO[Unit] function.

We will depend on this service in the run loop and emit diagnostics through it. Two implementations will be provided: a no-op implementation and a sliding queue implementation.
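
A minimal sketch of the pieces proposed here, using the names from this issue (the event members are illustrative and the sliding-queue implementation is omitted):

import zio._

// Sketch only: a DiagnosticEvent ADT and a Diagnostics service with an emit function.
sealed trait DiagnosticEvent
object DiagnosticEvent {
  final case class Poll(recordCount: Int)             extends DiagnosticEvent // illustrative member
  final case class Commit(offsets: Map[String, Long]) extends DiagnosticEvent // illustrative member
}

trait Diagnostics {
  def emit(event: DiagnosticEvent): UIO[Unit]
}

object Diagnostics {
  // The no-op implementation mentioned above: events are simply discarded.
  val NoOp: Diagnostics = new Diagnostics {
    def emit(event: DiagnosticEvent): UIO[Unit] = ZIO.unit
  }
}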

Support external offset retrieval

Currently partitions will be consumed from the latest offset stored in Kafka. We should also support a way to get offsets from an external source.
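
A hedged sketch of what such a hook could look like (hypothetical names, not a confirmed API): a function from the assigned partitions to externally stored offsets that the consumer would seek to on assignment.

import org.apache.kafka.common.TopicPartition
import zio._

// Hypothetical sketch: look up starting offsets for newly assigned partitions in an
// external store (for example a database) instead of Kafka's committed offsets.
def offsetsFromExternalStore(partitions: Set[TopicPartition]): Task[Map[TopicPartition, Long]] =
  ZIO.succeed(partitions.map(tp => tp -> 0L).toMap) // placeholder lookup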

Add module pattern and service for AdminClient

I want to pass the AdminClient around as an environment service, and since it's a fairly heavyweight object that seems the sensible way to do it.
I think I can make the additions in the AdminClient.scala file without causing any problems.

update tests to zio-test

I'm volunteering to do this. I'll build a couple of doc pages around them at the same time - could be useful. @iravid any reason not to do this?

Handle java.lang.IllegalStateException: No current assignment for partition

Sometimes I get an IllegalStateException and the consumer stops consuming from Kafka.
Stacktrace:

Fiber failed.
A checked error was not handled.
java.lang.IllegalStateException: No current assignment for partition test-topic-13
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:356)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:662)
	at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:2012)
	at zio.kafka.client.internal.Runloop$.$anonfun$apply$29(Runloop.scala:331)
	at zio.Task$.$anonfun$effectSuspend$1(Task.scala:195)
	at zio.internal.FiberContext.liftedTree1$1(FiberContext.scala:546)
	at zio.internal.FiberContext.evaluateNow(FiberContext.scala:546)
	at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:661)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Fiber:Id(1582406036469,10490784) was supposed to continue to:
  a future continuation at zio.ZIO.run(ZIO.scala:1166)
  a future continuation at zio.ZIO.bracket_(ZIO.scala:147)
  a future continuation at zio.kafka.client.internal.ConsumerAccess$$anonfun$withConsumerNoPermit$2.apply(ConsumerAccess.scala:23)

Fiber:Id(1582406036469,10490784) execution trace:
  at zio.Task$.effectSuspend(Task.scala:195)
  at zio.ZIOFunctions.effectSuspend(ZIO.scala:2003)
  at zio.ZIO.bracket_(ZIO.scala:147)
  at zio.internal.FiberContext.lock(FiberContext.scala:606)
  at zio.internal.FiberContext.lock(FiberContext.scala:606)
  at zio.blocking.Blocking$Service.blocking(Blocking.scala:69)
  at zio.blocking.package$.blockingExecutor(blocking.scala:23)

Fiber:Id(1582406036469,10490784) was spawned by:

Fiber:Id(1582405673386,9919483) was supposed to continue to:
  a future continuation at zio.kafka.client.internal.ConsumerAccess.withConsumerNoPermit(ConsumerAccess.scala:26)
  a future continuation at zio.ZIO.run(ZIO.scala:1166)
  a future continuation at zio.Semaphore.withPermits(Semaphore.scala:101)
  a future continuation at zio.kafka.client.internal.Runloop$.apply(Runloop.scala:232)
  a future continuation at zio.kafka.client.internal.Runloop$.apply(Runloop.scala:232)
  a future continuation at zio.stream.ZStream.foldWhileManagedM(ZStream.scala:1460)
  a future continuation at zio.stream.ZStream.foldM(ZStream.scala:1384)
  a future continuation at zio.ZIO.run(ZIO.scala:1166)
  a future continuation at zio.ZManaged.use(ZManaged.scala:748)
  a future continuation at zio.ZIO.run(ZIO.scala:1166)

Fiber:Id(1582405673386,9919483) execution trace:
  at zio.Semaphore.withPermits(Semaphore.scala:102)
  at zio.Semaphore.withPermits(Semaphore.scala:102)
  at zio.Ref$.modify$extension(Ref.scala:53)
  at zio.Semaphore.prepare(Semaphore.scala:131)
  at zio.Promise$.makeAs(Promise.scala:248)
  at zio.Promise$.make(Promise.scala:241)
  at zio.ZIOFunctions.fiberId(ZIO.scala:2054)
  at zio.ZIOFunctions.descriptor(ZIO.scala:1874)
  at zio.kafka.client.internal.Runloop$.apply(Runloop.scala:441)
  at zio.Ref$.get$extension(Ref.scala:41)

Code:

Consumer.make(consumerSettings)
  .use { consumer =>
    consumer
      .subscribeAnd(Subscription.topics(config.kafka.topic))
      .plainStream(Serde.string, Serde.string)
      .flattenChunks
      .mapM(decoding)
      .mapMPar(config.parallelism) {
        case (offset, Some(event)) =>
          processing(offset, event)
        case (offset, _) =>
          ZIO.succeed(offset)
      }
      .aggregateAsyncWithin(Consumer.offsetBatches, scheduler)
      .mapM(_.commit)
      .runDrain
      .forever
  }

Enable user registration of a rebalance listener

KafkaConsumer takes a rebalance listener on topic subscription. I think it's wise to follow those usage semantics in this case. I'm thinking of something along the lines of:

def myRebalanceListener(rebalance: Rebalance): ZIO[Any, Nothing, Unit]
consumer.subscribe(Subscription.topics("my-input-topic"), myRebalanceListener)

The subscribe method could be overloaded, or given ZIO.succeed as a default for the no-listener case.
This will however mean that the listener creation in Runloop can no longer happen in Deps.make, unless we want to pass the user's listener via Consumer.make, which I don't find that appealing. In either case, the goal as I see it is to get the following code in Runloop under "new ConsumerRebalanceListener" (lines 80 and 88 at the time of writing) to resemble e.g.:

for {
  _              <- rebalancingRef.set(true)
  rebalanceEvent  = Rebalance.Revoked(partitions.asScala.toSet)
  _              <- rebalances.offer(rebalanceEvent).fork // or diagnostics emit when that's merged
  _              <- userRebalanceListener(rebalanceEvent)
} yield ()

On a rebalance, your poll call is blocked until all rebalance listeners have completed. This could take longer than pollFrequency; I'm not sure whether this will be an issue, so some insight is required here.

zio-kafka-testkit

Hi @iravid, maybe a small module with test utilities would be interesting, what do you think? Especially one for embedded Kafka, or another variant using a Kafka test container.

a home for test utils?

Adapting the tests for zio-test showed a need for non-trivial support methods to provide a proper providedManagedShared environment for running the tests. It's hard to see how anyone using the library with zio-test will not have the same issues. For example, practically everything needs the live clock.
Is there a place for some sort of kafka-test-util?

Evaluate Consumer

I'm trying to follow your examples, but I cannot figure out how to evaluate the consumerProgram.

Here is the code, where the consumerSettings is provided by the Consumer factory:

val subscription: Subscription = Subscription.topics("zio-topic")

/**
 * Committing the offset is done automatically after running the effect
 */
val consumerProgram: ZIO[Console with Nothing with Blocking with Clock, Throwable, Unit] =
  Consumer.consumeWith(consumerSettings, subscription, Serde.int, Serde.string) { case (key, value) =>
    putStrLn(s"Received message ${key}: ${value}")
    // Perform an effect with the received message
  }

runtime.unsafeRun(consumerProgram)

I don't know what Has layer I need to provide to the program.

Add a failure handler for serde

In @svroonland's words:

I can see three desirable ways of handling deserialization failures:

  1. Fail the stream
  2. Skip the message
  3. Allow the client to handle it explicitly

1 should be easy (already the case?)
3 can be done by sending Eithers like you mentioned.
Maybe 2 can be implemented with some sort of onDeserializationError: DeserializationFailure => UIO[Unit] parameter on the plainStream and partitionedStream methods?
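
A hedged sketch of option 3, assuming the Deserializer constructor and deserialize signature used elsewhere in zio-kafka: failures surface as Left values, so the client can decide whether to skip the record or fail the stream.

import org.apache.kafka.common.header.Headers
import zio.kafka.serde.Deserializer

// Sketch only: wrap an existing deserializer so that deserialization failures are
// captured as Either values instead of failing the stream.
def asEither[R, A](inner: Deserializer[R, A]): Deserializer[R, Either[Throwable, A]] =
  Deserializer((topic: String, headers: Headers, data: Array[Byte]) =>
    inner.deserialize(topic, headers, data).either
  )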

Consumer doesn't work in version 0.8.0

After upgrading zio-kafka to version 0.8.0, the consumer stopped working.
The following example should print the records but does nothing.

object KafkaTestApp extends zio.App {
  val consumerSettings: ConsumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("group")
  val producerSettings: ProducerSettings = ProducerSettings(List("localhost:9092"))

  val consumerAndProducer =
    Consumer.make(consumerSettings) ++
      Producer.make(producerSettings, Serde.int, Serde.string)

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] =
    Consumer.subscribeAnd(Subscription.topics("topic1"))
      .plainStream(Serde.string, Serde.string)
      .flattenChunks
      .tap(cr => console.putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .runDrain
      .provideCustomLayer(consumerAndProducer)
      .fold(_ => 1, _ => 0)

}

Consider using `interruptible`

Instead of:

blocking(ZIO(...))

Consider:

interruptible(...)

Both will be placed on the blocking thread pool, but the latter can be interrupted.
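
For reference, a small sketch using the current (ZIO 2) operator names, with the consumer and timeout taken as parameters. Both variants run on the blocking pool, but only the second can be interrupted while the call is blocked.

import zio._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Sketch: both variants run consumer.poll on the blocking thread pool.
def pollNotInterruptible(consumer: KafkaConsumer[String, String], timeout: java.time.Duration) =
  ZIO.attemptBlocking(consumer.poll(timeout)) // cannot be interrupted mid-call

def pollInterruptible(consumer: KafkaConsumer[String, String], timeout: java.time.Duration) =
  ZIO.attemptBlockingInterrupt(consumer.poll(timeout)) // can be interrupted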

SOE after 20 minutes of running the app

I have the following simple zio.App program

val topic = kafkaConfig.userAccount.topic
val registry =
  new CachedSchemaRegistryClient(kafkaConfig.schemaRegistry, 1)
val params = Map(
  KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> "true",
  "schema.registry.url" -> kafkaConfig.schemaRegistry
)
val serde = new KafkaAvroDeserializer(registry, params.asJava)

val kafkaSettings =
  ConsumerSettings(kafkaConfig.brokers.split(",").toList)
    .withGroupId(kafkaConfig.userAccount.consumerName)

Consumer
  .consumeWith(
    kafkaSettings,
    Subscription.topics(topic),
    Serde.byteArray,
    Deserializer(serde).map(_.asInstanceOf[UserAccountEvent])
  ) {
    case (_, v) =>
      console.putStrLn(s"message received $v")
  }
  .exitCode

It connects, consumes and parses Avro messages just fine, but after 15 minutes or so it produces the following error:

[error] java.lang.StackOverflowError
[error]   at zio.Schedule$.$anonfun$unfold$3(Schedule.scala:1041)
[error]   at zio.Schedule$$$Lambda$756/0000000051A60020.apply(Unknown Source)
[error]   at zio.Schedule$.$anonfun$unfold$3(Schedule.scala:1041)
[error]   at zio.Schedule$$$Lambda$756/0000000051A60020.apply(Unknown Source)
[error]   ... (the same two frames repeat until the stack overflows)

Configure serializers/deserializers to be able to initialize Schema Registry

This is a follow-up to a short Discord discussion with @iravid.

Configure serializers/deserializers to be able to initialize the Schema Registry when using KafkaAvroSerializer/KafkaAvroDeserializer.

While testing my producer against Kafka with Schema Registry, it breaks down with the following error:

[info]     org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
[info]     Caused by: java.lang.NullPointerException
[info]          at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:77)
[info]          at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:58)
[info]          at com.valamis.avro.serialization.AvroSerializerWithRegistry.$anonfun$serialize$1(AvroSerializerWithRegistry.scala:33)
[info]          at scala.Option.map(Option.scala:230)
[info]          at com.valamis.avro.serialization.AvroSerializerWithRegistry.serialize(AvroSerializerWithRegistry.scala:32)
[info]          at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
[info]          at zio.kafka.serde.Serializer$$anon$2.$anonfun$serialize$1(Serializer.scala:45)
[info]          at zio.internal.FiberContext.evaluateNow(FiberContext.scala:345)
[info]          at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:687)
[info]          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]          at java.lang.Thread.run(Thread.java:748)

Basically, the Schema Registry client hasn't gotten a chance to be initialized. Initialization happens via the 'configure' method call on Kafka Avro's serializers/deserializers. Have a look here for further details.

However, if you look at the sources of zio-kafka's Producer and Consumer, you will see that neither of them calls configure on the underlying Kafka Serializer/Deserializer. Hence, the Schema Registry doesn't get a chance to be initialized when using Kafka Avro serdes.

If you go through the Scala Kafka client from Cake Solutions and have a look at, say, its Consumer, you will see that they call configure before calling serialize.
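
As a user-side workaround until zio-kafka calls configure itself, it should be possible to configure the Avro deserializer manually before wrapping it; a hedged sketch (the registry URL is a placeholder):

import scala.jdk.CollectionConverters._
import io.confluent.kafka.serializers.KafkaAvroDeserializer

// Sketch: configure the Kafka Avro deserializer manually so its Schema Registry
// client is initialized, then wrap it for use with zio-kafka as usual.
val avroConfig = Map("schema.registry.url" -> "http://localhost:8081") // placeholder URL
val avroDeserializer = new KafkaAvroDeserializer()
avroDeserializer.configure(avroConfig.asJava, false) // false = value (not key) deserializer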

Consider adding one stage produce methods

Producer#produce and Producer#produceChunk are well documented as returning a two-stage effect (a Task of Task), yet even the README.md example falls into this pit.

I understand the reasons for having both the lower-level two-stage API and a one-stage API.
See this discussion: https://discordapp.com/channels/629491597070827530/629497941719121960/704617273557647370

Can we think about def produceWithAck(record: ProducerRecord[K, V]): RIO[R with Blocking, RecordMetadata] and def produceChunkWithAck(records: Chunk[ProducerRecord[K, V]]): RIO[R with Blocking, Chunk[RecordMetadata]]?

I'm wondering what would be better than WithAck, any ideas?
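
For context, a hedged sketch of how the two shapes relate (simplified to Task; the one-stage variant is only a proposal here):

import zio._
import org.apache.kafka.clients.producer.RecordMetadata

// The current two-stage shape: the outer effect enqueues the record,
// the inner effect completes once the broker acknowledges it.
def twoStage: Task[Task[RecordMetadata]] = ??? // e.g. what produce returns today

// A one-stage "produce with ack" is essentially the two stages flattened:
// the caller only gets a result once the broker has acknowledged the record.
def oneStage: Task[RecordMetadata] = twoStage.flatten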

Build for 2.13

Kafka and Embedded Kafka aren't built for 2.13, so we'll need to think about what to do there. Maybe not run tests on 2.13.

This could be a good reason to run Kafka in a separate JVM.

Investigate support for stateful stream processing

This has several parts:

  • abstractions for describing a table created from a stream
  • mechanisms for persisting state (in-memory / on disk)
  • a RocksDB wrapper and implementation
  • combinators for joining and aggregating streams and tables

This is a pretty big chunk of work and could possibly be out of scope for this project. It should, at the very least, end up in a separate module.

Cover all features of Kafka Streams

This should be the first goal here, I believe.

Since I work with Kafka Streams quite frequently, let me brain-dump what we will need to achieve parity:

  • Stream source that wraps Kafka consumers (needs to support consumer groups and hence partition assignments but that should be dealt with in the consumer code itself already)
  • Stream sink that wraps Kafka producers
  • Stream sources need to properly send heartbeats to not be kicked out of consumer groups (while very busy and while not receiving messages for > heartbeat timeout)
  • Ability to fork and merge streams
  • Kafka "exactly-once" support
  • Ability to provide custom Serde instances
  • An abstraction over Kafka topics similar to a KTable (which essentially turns a topic into a "table" by treating the topic as a changelog)
  • Exceptions, errors etc. may never destroy the stream source, something equivalent to DeserializationExceptionHandler should instead handle these messages

A lot of this can be/is handled by ZIO(-Streams) automatically.
Some of it is already implemented here, I am just trying to piece together what is still missing.

Stop using Kafka's Serde and add a real, composable Serializer/Deserializer

We should hardcode the consumer to K = Array[Byte] and V = Array[Byte], and provide data types that describe serialization/deserialization to and from byte arrays.

These data types should also be derivable from Kafka's existing Serde to ease migration.

As part of moving to these codecs, we should consider what interface to provide on the consumer for handling de/serialization failures. It's easy to just send a stream of eithers, but that is not very performant or ergonomic.
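
A minimal, illustrative sketch of what such byte-array codecs could look like (these names and signatures are assumptions, not the final design):

import zio._

// Sketch only: a deserializer fixed to Array[Byte] input, composable via map,
// and derivable from an existing Kafka Deserializer to ease migration.
trait ByteDeserializer[+A] { self =>
  def deserialize(data: Array[Byte]): Task[A]

  def map[B](f: A => B): ByteDeserializer[B] =
    (data: Array[Byte]) => self.deserialize(data).map(f)
}

object ByteDeserializer {
  // Derive from a Kafka Deserializer; the topic argument is ignored in this sketch.
  def fromKafka[A](d: org.apache.kafka.common.serialization.Deserializer[A]): ByteDeserializer[A] =
    (data: Array[Byte]) => ZIO.attempt(d.deserialize("ignored-topic", data))
}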

Setup benchmarks

I've recently run into some performance issues with the library, so this would be a good time to set up benchmarking infrastructure and benchmarks. Comparisons should be made with the plain Kafka consumer/producer and with Alpakka Kafka.

Handle RetriableCommitFailedException

In my experience commits sometimes fail because of some temporary situation, like a network timeout. Kafka will then throw a RetriableCommitFailedException. This should not fail the consumer.

We should probably handle this in the callback passed to commitAsync in the Runloop.
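
Until that's handled inside the run loop, a hedged sketch of a call-site workaround: retry the commit only while the failure is retriable.

import org.apache.kafka.clients.consumer.RetriableCommitFailedException
import zio._

// Sketch: retry a commit effect with capped exponential backoff, but only while the
// error is a RetriableCommitFailedException; any other failure still fails the effect.
def commitWithRetry(commit: Task[Unit]): Task[Unit] =
  commit.retry(
    Schedule.recurWhile[Throwable](_.isInstanceOf[RetriableCommitFailedException]) &&
      Schedule.exponential(100.millis) &&
      Schedule.recurs(5)
  )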

Injection of key and value serializers can be ambiguous

When I have a Producer[R, A, A] or Consumer[R, A, A] but want to use different serializers for keys and values, ZLayer will register only the value serializer and use it to serialize keys as well.

I propose to introduce a class that encapsulates the key and value serializers and to use it with ZLayer.
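
A minimal sketch of such a wrapper (the name is hypothetical): keeping both serializers in one value makes the injected dependency unambiguous even when the key and value types coincide.

import zio.kafka.serde.Serializer

// Hypothetical wrapper: one value carrying both serializers, so dependency injection
// can never confuse the key serializer with the value serializer.
final case class ProducerSerdes[K, V](
  keySerializer: Serializer[Any, K],
  valueSerializer: Serializer[Any, V]
)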

Support multiple streams with a shared consumer

Sharing a single KafkaConsumer instance from the Apache Kafka library would be more efficient when consuming from multiple Kafka topics, since each instance allocates a thread pool and maintains connections to the Kafka brokers.

Polling new data for partitions when no demand

Calls to poll return up to max.poll.records records from the latest consumed offset. If we have a topic with a large number of unconsumed messages and a slow consumer that cannot keep up, the Runloop will keep fetching new records and buffering them. This results not only in unnecessary network traffic and load on the Kafka broker, but possibly also in an OOM.

See the following PR for a test case that shows this issue:
https://github.com/zio/zio-kafka/pull/82/files
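
A hedged sketch of the usual remedy, expressed directly against the Java consumer API (the buffer check and names are illustrative): pause the assigned partitions while the downstream buffer is full, keep calling poll so the consumer stays in its group, and resume once there is demand again.

import java.time.Duration
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}

// Sketch only: one iteration of a poll loop. `bufferIsFull` stands in for the
// Runloop's knowledge of downstream demand.
def pollStep(
  consumer: KafkaConsumer[Array[Byte], Array[Byte]],
  bufferIsFull: Boolean
): List[ConsumerRecord[Array[Byte], Array[Byte]]] = {
  val assigned = consumer.assignment()
  if (bufferIsFull) consumer.pause(assigned) else consumer.resume(assigned)
  // poll must keep being called even while paused, otherwise the consumer is
  // removed from its group; paused partitions simply return no records.
  consumer.poll(Duration.ofMillis(50)).asScala.toList
}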

Runloop ends prematurely

As reported by egast on Discord, this application hangs and does nothing:

import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console._
import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
import zio.kafka.consumer.{Consumer, ConsumerSettings, Subscription}
import zio.kafka.serde.Deserializer

object ZioKafkaTestApp extends App {
  val consumerSettings: ConsumerSettings = ConsumerSettings(List("localhost:9092"))
    .withGroupId("zio-kafka-test")
    .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
  val consumer: ZLayer[Clock with Blocking, Throwable, Consumer[Any, String, String]] =
    Consumer.make(consumerSettings, Deserializer.string, Deserializer.string)

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = {
    Consumer.subscribeAnd[Any, String, String](Subscription.topics("topic1"))
      .partitionedStream
      .tap(tpAndStr => putStrLn(s"topic: ${tpAndStr._1.topic}, partition: ${tpAndStr._1.partition}"))
      .flatMap(_._2.flattenChunks)
      .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
      .runDrain
      .provideCustomLayer(consumer)
      .fold(_ => 1, _ => 0)
  }
}

The problem appears to be that the runloop ends prematurely. Further investigation points at ZManaged#fork. Changing the .unit.toManaged_.fork in Runloop.apply to use the following combinator, a modification of ZManaged#fork that uses forkDaemon, solves the issue:

  def forkManagedDaemon[R, E, A](self: ZManaged[R, E, A]): ZManaged[R, Nothing, Fiber.Runtime[E, A]] =
    ZManaged {
      for {
        finalizer <- Ref.make[Exit[Any, Any] => ZIO[R, Nothing, Any]](_ => UIO.unit)
        // The reservation phase of the new `ZManaged` runs uninterruptibly;
        // so to make sure the acquire phase of the original `ZManaged` runs
        // interruptibly, we need to create an interruptible hole in the region.
        fiber <- ZIO.interruptibleMask { restore =>
                  restore(self.reserve.tap(r => finalizer.set(r.release))) >>= (_.acquire)
                }.forkDaemon
      } yield Reservation(
        acquire = UIO.succeedNow(fiber),
        release = e => fiber.interrupt *> finalizer.get.flatMap(f => f(e))
      )
    }

I will make a patch release soon and investigate further if this is a problem with ZManaged#fork and ZLayer.

cc @adamgfraser

Remove the semaphore guarding AdminClient

As helpfully reported by @sderosiaux on Discord:

KIP-117 says "The AdminClient will provide CompletableFuture-based APIs that closely reflect the requests which the brokers can handle.  The client will be multi-threaded; multiple threads will be able to safely make calls using the same AdminClient object. "

This means we can remove the Semaphore guarding the AdminClient against concurrent operations, simplifying the code.

Handle messages that the Serde cannot deserialize

Kafka Streams has a global "Exception handler" for messages that could not be deserialized or otherwise triggered exceptions.
We should have something like that too; otherwise a single malformed message will bring down the app far too easily (or one would always have to be aware of this happening).

In order to reduce boilerplate on the implementer's side, we should find some nice way to handle "bad messages".
I would limit handling of issues to that. All other exceptions should indeed be handled by the stream's implementer.

Add a ZIO Streams-based interface for interacting with the consumer

For starters, we need an interface that will allow consuming the consumer's data as a StreamChunk[Throwable, Record[K, V]], where Record is something like:

case class Record[K, V](key: K, value: V, timestamp: Long, metadata: Metadata)
case class Metadata(topicPartition: TopicPartition, offset: Long)

An important part of achieving good throughput is decoupling consumption from processing (a minimal sketch follows below):

  • The consumer must be polled in a separate fiber from the rest of the stream with the data transferred to the stream via a bounded queue
  • When the queue is full, the partitions on the consumer must be paused and polling must continue (or the consumer will be removed from the consumer group)
  • Commits should be fed back to the consumer fiber via another queue and be processed by the polling fiber

See https://github.com/iravid/fs2-kafka-streams/blob/master/src/main/scala/com/iravid/fs2/kafka/client/RecordStream.scala#L262 for a previous implementation.
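
A minimal ZIO sketch of that decoupling (helper names are assumed, and pausing plus commit routing are omitted): one fiber polls and feeds a bounded queue, and the stream reads from that queue.

import zio._
import zio.stream.ZStream
import org.apache.kafka.clients.consumer.ConsumerRecord

// Sketch only: `pollOnce` is a hypothetical effect that wraps consumer.poll on the
// blocking pool. The bounded queue provides the hand-off between the polling fiber
// and the stream; offerAll suspends while the queue is full.
def recordStream(
  pollOnce: Task[Chunk[ConsumerRecord[Array[Byte], Array[Byte]]]]
): ZIO[Scope, Nothing, ZStream[Any, Nothing, ConsumerRecord[Array[Byte], Array[Byte]]]] =
  for {
    queue <- Queue.bounded[ConsumerRecord[Array[Byte], Array[Byte]]](1024)
    _     <- pollOnce.flatMap(records => queue.offerAll(records)).forever.forkScoped
  } yield ZStream.fromQueue(queue)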
