Module kotlin-kafka

Rationale

When I started this repository, I could not find any bindings between the Kafka SDK and Kotlin suspension or KotlinX Coroutines Flow. These operators should be implemented at a low level, so that they can guarantee correct cancellation support and highly optimised runtimes.

Some important aspects of Kafka are tricky to implement with the "low-level" Kafka API, especially properly streaming records from Kafka and correctly committing them. This process involves additional complexity; more details here.
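To make the committing problem concrete, here is a minimal sketch of a hand-written poll loop using the plain `kafka-clients` consumer. It is not part of this library. The broker address `localhost:9092`, the group id, the topic name, and the helper `offsetsToCommit` are assumptions made for illustration.

```kotlin
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: the offset to commit for a partition is the offset of
// the NEXT record to read, i.e. the highest processed offset + 1.
fun <K, V> offsetsToCommit(
  processed: Iterable<ConsumerRecord<K, V>>
): Map<TopicPartition, OffsetAndMetadata> =
  processed
    .groupBy { TopicPartition(it.topic(), it.partition()) }
    .mapValues { (_, records) -> OffsetAndMetadata(records.maxOf { it.offset() } + 1) }

fun main() {
  val props = Properties().apply {
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    put(ConsumerConfig.GROUP_ID_CONFIG, "example-group")           // assumed group id
    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")         // we commit manually
    put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
  }
  KafkaConsumer<String, String>(props).use { consumer ->
    consumer.subscribe(listOf("test-topic")) // assumed topic name
    repeat(10) {
      val batch = consumer.poll(Duration.ofMillis(500))
      // Process first, commit afterwards: this gives at-least-once delivery.
      for (record in batch) println("${record.key()} -> ${record.value()}")
      if (batch.count() > 0) consumer.commitSync(offsetsToCommit(batch))
    }
  }
}
```

This loop blocks its thread, ignores cancellation, and does nothing special when partitions are re-balanced, which is where most of the additional complexity tends to come from.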

To solve these problems, a couple of projects already exist on the JVM.

There was no implementation for KotlinX Coroutines Flow. You can, however, quite easily use reactor-kafka with the KotlinX Coroutines Reactor bindings.
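As a rough sketch of that alternative, the following bridges a reactor-kafka receiver into a `Flow` with `asFlow()` from `kotlinx-coroutines-reactive`. It assumes `reactor-kafka` and the coroutines Reactor bindings are on the classpath. The broker address, group id, topic name, and the helper functions `receiverOptions` and `records` are hypothetical and chosen only for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.receiver.ReceiverRecord

// Hypothetical helper that builds reactor-kafka options for a single topic.
fun receiverOptions(
  bootstrapServers: String,
  groupId: String,
  topic: String
): ReceiverOptions<Int, String> {
  val props: Map<String, Any> = mapOf(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
    ConsumerConfig.GROUP_ID_CONFIG to groupId,
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
  )
  return ReceiverOptions.create<Int, String>(props).subscription(listOf(topic))
}

// Flux is a Reactive Streams Publisher, so asFlow() turns it into a Flow.
fun records(options: ReceiverOptions<Int, String>): Flow<ReceiverRecord<Int, String>> =
  KafkaReceiver.create(options).receive().asFlow()

fun main(): Unit = runBlocking {
  // Assumed broker address, group id and topic name.
  records(receiverOptions("localhost:9092", "example-group", "test-topic"))
    .take(10)
    .collect { record ->
      println("${record.key()} -> ${record.value()}")
      // Mark the record as processed so reactor-kafka can commit its offset.
      record.receiverOffset().acknowledge()
    }
}
```

This works, but every record passes through the Reactor runtime before it reaches the coroutine, which is the extra layer a direct implementation avoids.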

This project implements the same strategies as reactor-kafka directly on top of KotlinX Coroutines to take full advantage of them, and to open the door to potentially becoming a Kotlin MPP library in the future.

Goals

  • Lean core library built on top of Kotlin Std & KotlinX Coroutines.
  • Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and suspend.
  • Flow-based operators, so you can easily compose KotlinX Flow-based Kafka programs.
  • Strong guarantees about committing record offsets, and performance optimisations with regard to re-balancing/partitioning.
  • An example for testing Kafka with Test Containers in Kotlin.

Adding Dependency

Simply add the following dependency as implementation in the dependencies block of your build.gradle.

dependencies {
  implementation("io.github.nomisrev:kotlin-kafka:0.3.0")
}

Example

@JvmInline
value class Key(val index: Int)

@JvmInline
value class Message(val content: String)

fun main(): Unit = runBlocking(Dispatchers.Default) {
  val topicName = "test-topic"
  val msgCount = 10
  val kafka = Kafka.container

  Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
    client.createTopic(NewTopic(topicName, 1, 1))
  }

  coroutineScope { // Run producer and consumer in a single scope
    launch(Dispatchers.IO) { // Send msgCount (10) messages, and then close the producer
      val settings: ProducerSettings<Key, Message> = ProducerSettings(
        kafka.bootstrapServers,
        IntegerSerializer().imap { key: Key -> key.index },
        StringSerializer().imap { msg: Message -> msg.content },
        Acks.All
      )
      (1..msgCount)
        .asFlow()
        .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
        .produce(settings)
        .collect(::println)
    }

    launch(Dispatchers.IO) { // Consume msgCount (10) messages as a stream, and then close the consumer
      val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
        kafka.bootstrapServers,
        IntegerDeserializer().map(::Key),
        StringDeserializer().map(::Message),
        groupId = UUID.randomUUID().toString(),
        autoOffsetReset = AutoOffsetReset.Earliest
      )
      KafkaReceiver(settings)
        .receive(topicName)
        .take(msgCount)
        .map { "${it.key()} -> ${it.value()}" }
        .collect(::println)
    }
  }
}

You can get the full code here. Running it prints output similar to the following:

test-topic-0@0
test-topic-0@1
test-topic-0@2
test-topic-0@3
test-topic-0@4
test-topic-0@5
test-topic-0@6
test-topic-0@7
test-topic-0@8
test-topic-0@9
Key(index=1) -> Message(content=msg: 1)
Key(index=2) -> Message(content=msg: 2)
Key(index=3) -> Message(content=msg: 3)
Key(index=4) -> Message(content=msg: 4)
Key(index=5) -> Message(content=msg: 5)
Key(index=6) -> Message(content=msg: 6)
Key(index=7) -> Message(content=msg: 7)
Key(index=8) -> Message(content=msg: 8)
Key(index=9) -> Message(content=msg: 9)
Key(index=10) -> Message(content=msg: 10)
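The example adapts the standard Kafka serializers to the `Key` and `Message` wrapper types with `imap` and `map`. The sketch below shows one way such adapters could be written against the `kafka-clients` interfaces. The names `imapSketch` and `mapSketch` are hypothetical and are not claimed to match the library's actual implementation. It assumes kafka-clients 2.1 or later, where `Serializer` and `Deserializer` each have a single abstract method.

```kotlin
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.Serializer

@JvmInline
value class Key(val index: Int)

// Hypothetical adapter: turn a Serializer<A> into a Serializer<B>
// by first converting each B into an A.
fun <A, B> Serializer<A>.imapSketch(f: (B) -> A): Serializer<B> =
  Serializer<B> { topic, data -> serialize(topic, f(data)) }

// Hypothetical adapter: turn a Deserializer<A> into a Deserializer<B>
// by converting each decoded A into a B.
fun <A, B> Deserializer<A>.mapSketch(f: (A) -> B): Deserializer<B> =
  Deserializer<B> { topic, bytes -> f(deserialize(topic, bytes)) }

fun main() {
  val keySerializer: Serializer<Key> = IntegerSerializer().imapSketch { key: Key -> key.index }
  val keyDeserializer: Deserializer<Key> = IntegerDeserializer().mapSketch { index: Int -> Key(index) }

  // Round trip: Key -> bytes -> Key
  val bytes = keySerializer.serialize("test-topic", Key(1))
  println(keyDeserializer.deserialize("test-topic", bytes))
}
```

The serializer direction maps the input before encoding, while the deserializer direction maps the output after decoding, which is why the two adapters take functions pointing in opposite directions.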
