kafka-streams-scala

This is a thin Scala wrapper for the Kafka Streams API. It does not aim to provide a Scala-idiomatic API; rather, it makes the original API simpler to use from Scala. In particular, it provides the following adjustments:

  • Scala lambda expressions can be used directly
  • when aggregating and counting, counts are converted from Java Longs to Scala Longs
  • flatMap operations accept a Scala Iterable
  • Serdes (Serializers/Deserializers) can be implicitly found in the scope
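As a rough illustration of the count conversion mentioned in the list above: the Java API reports counts as boxed `java.lang.Long` values, which Scala code would otherwise unbox by hand. This sketch uses only the standard library and an assumed count of 3; it is not the wrapper's actual code.

```scala
object LongConversionSketch {
  // A count as the Java API would report it: a boxed java.lang.Long (value assumed).
  val javaCount: java.lang.Long = java.lang.Long.valueOf(3L)

  // The manual unboxing that the wrapper is described as doing for you.
  val scalaCount: Long = javaCount.longValue()

  // Once unboxed, the value works with ordinary Scala arithmetic.
  val doubled: Long = scalaCount * 2
}
```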

This API also contains a few Serdes (Serializers/Deserializers):

  • to convert Scala Int/Long/Double to/from their binary representation
  • to convert Scala Int/Long/Double to/from string representation
  • to convert case classes to/from JSON

Finally, the API provides the following extensions:

  • KStreamS.split() (see documentation below)

Usage of the Kafka Streams API in Scala

The main objects are:

  • KStreamBuilderS as the entry point to build streams or tables
  • KStreamS as a wrapper around KStream
  • KGroupedStreamS as a wrapper around KGroupedStream
  • KTableS as a wrapper around KTable
  • KGroupedTableS as a wrapper around KGroupedTable

Using the builder

With the original Java API, you would create an instance of KStreamBuilder, then use it to create streams or tables. Here, KStreamBuilderS is an object that can be used directly:

val stream: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("my-stream")

val table: KTableS[String, String] = KStreamBuilderS.table[String, String]("my-table")

When starting the application, you just need to unwrap the KStreamBuilder by calling KStreamBuilderS.inner:

val streams = new KafkaStreams(KStreamBuilderS.inner, config)
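The `config` value passed to `KafkaStreams` is not defined in this README. A minimal sketch of what it might contain is shown here, assuming a broker at `localhost:9092` and a hypothetical application id. `application.id` and `bootstrap.servers` are the standard Kafka Streams configuration keys; the values are placeholders.

```scala
import java.util.Properties

object StreamsConfigSketch {
  // Hypothetical values: adjust the application id and broker address to your setup.
  val config: Properties = {
    val p = new Properties()
    p.put("application.id", "my-streams-app")
    p.put("bootstrap.servers", "localhost:9092")
    p
  }
}
```

A real application will usually need further settings; this only shows the two identifying keys.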

Serdes (declare them as implicit)

A common mistake with the Java API is forgetting to specify Serdes, which results in class cast errors when objects are serialized or deserialized.

To work around this issue, this API requires Serdes to be specified. Most of the time, it is enough to declare your Serdes as implicit values, and they will be picked up automatically:

implicit val stringSerde: Serde[String] = Serdes.String()
implicit val userSerde: Serde[User] = new MyUserSerde

val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")

Resolution is based on the type of the object to serialize/deserialize, so make sure you have a Serde of the appropriate type. If not, you should see an error such as:

Error:(87, 80) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[String]

If, on the other hand, you have multiple Serdes for the same type, you might see the following error:

Error:(88, 80) ambiguous implicit values:
 both value stringSerde2 of type org.apache.kafka.common.serialization.Serde[String]
 and value stringSerde1 of type org.apache.kafka.common.serialization.Serde[String]
 match expected type org.apache.kafka.common.serialization.Serde[String]

In this case, just pass the Serde explicitly:

val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")(stringSerde, userSerde)
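The resolution rules described above can be tried without Kafka on the classpath. The sketch below uses a stand-in `Serde` trait and a hypothetical `stream` method with an implicit parameter list. Both are assumptions made for illustration and are not this library's code.

```scala
object ImplicitSerdeSketch {
  // Stand-in for org.apache.kafka.common.serialization.Serde, so this runs without Kafka.
  trait Serde[T] { def name: String }

  // Hypothetical builder method: the Serdes are taken from an implicit parameter list.
  def stream[K, V](topic: String)(implicit keySerde: Serde[K], valueSerde: Serde[V]): String =
    s"$topic: key=${keySerde.name}, value=${valueSerde.name}"

  // Two Serdes for the same type, deliberately NOT declared implicit here.
  val stringSerde1: Serde[String] = new Serde[String] { val name = "string-1" }
  val stringSerde2: Serde[String] = new Serde[String] { val name = "string-2" }

  // Exactly one implicit Serde[String] in scope: it is resolved automatically.
  val resolved: String = {
    implicit val only: Serde[String] = stringSerde1
    stream[String, String]("users-by-id")
  }

  // With several candidates, passing the Serdes explicitly sidesteps the ambiguity.
  val explicit: String = stream[String, String]("users-by-id")(stringSerde1, stringSerde2)
}
```

Declaring both `stringSerde1` and `stringSerde2` as implicit in the same scope would reproduce the "ambiguous implicit values" error at compile time.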

Usage of the Serdes in Scala

To convert Scala Int/Long/Double to/from their binary representation:

import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntSerde
implicit val longSerde = LongSerde
implicit val doubleSerde = DoubleSerde

To convert Scala Int/Long/Double to/from string representation:

import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntAsStringSerde
implicit val longSerde = LongAsStringSerde
implicit val doubleSerde = DoubleAsStringSerde
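The difference between the two representations shows up in the bytes written to the topic. The sketch below assumes the binary form is the fixed-width big-endian layout used by Kafka's built-in Long serializer, and that the string form is the decimal text encoded as UTF-8. This README does not confirm either detail for this library's Serdes, and the helper names are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object RepresentationSketch {
  // Binary form: a Long written as 8 big-endian bytes (ByteBuffer's default order).
  def longToBinary(n: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(n).array()

  // String form: the decimal digits of the number, encoded as UTF-8.
  def longToString(n: Long): Array[Byte] = n.toString.getBytes(StandardCharsets.UTF_8)

  val binary: Array[Byte] = longToBinary(42L) // 8 bytes: seven zero bytes, then 42
  val text: Array[Byte] = longToString(42L)   // 2 bytes: the characters '4' and '2'
}
```

The string form is readable with plain-text tools but varies in length; the binary form is always fixed-width.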

To convert case classes to/from JSON:

  • define a case class
  • create an instance of JsonSerde with the case class as the generic type

Example:

import com.github.aseigneurin.kafka.serialization.scala._

case class User(name: String)

implicit val stringSerde = Serdes.String
implicit val userSerde = new JsonSerde[User]

// read JSON -> case class
KStreamBuilderS.stream[String, User]("users")
  .mapValues { user => user.name }
  .to("names")

// write case class -> JSON
KStreamBuilderS.stream[String, String]("names")
  .mapValues { name => User(name) }
  .to("users")

Example

This repository contains a Scala version of the Java Word Count Demo.

Here is the code to implement a word count:

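import java.util.{Locale, Properties}

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import com.github.aseigneurin.kafka.serialization.scala._
// plus the import for this library's stream wrappers (KStreamBuilderS, KTableS)

val props = new Properties()
// ...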

implicit val stringSerde = Serdes.String
implicit val longSerde = LongAsStringSerde

val source = KStreamBuilderS.stream[String, String]("streams-file-input")

val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

counts.to("streams-wordcount-output")

val streams = new KafkaStreams(KStreamBuilderS.inner, props)
streams.start()

Extensions

KStreamS.split()

This method applies a predicate and returns two KStreamSs, one with the messages that match the predicate, and another one with the messages that don't match.

The two KStreamSs are returned in a tuple that can be easily deconstructed:

def isValidMessage(v: ...): Boolean = ???

val (goodMessages, badMessages) = deserializedMessages.split((k, v) => isValidMessage(v))
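The shape of the result is the same as that of `partition` on plain Scala collections, which may help in picturing it. The sketch below is an analogy only, using a hypothetical `Message` type and predicate. It is not how the library implements `split()` on a stream.

```scala
object SplitSketch {
  // Hypothetical message type and predicate, for illustration only.
  final case class Message(key: String, value: String)
  def isValidMessage(v: String): Boolean = v.nonEmpty

  val messages: List[Message] =
    List(Message("a", "hello"), Message("b", ""), Message("c", "world"))

  // partition returns (matching, non-matching), mirroring the tuple returned by split().
  val (goodMessages, badMessages) = messages.partition(m => isValidMessage(m.value))
}
```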

Contributors

aseigneurin, davidrupp
