
akka-kafka

Actor-based kafka consumer built on top of the high-level kafka consumer.

Manages backpressure so the consumer doesn't overwhelm other parts of the system. The consumer allows asynchronous/concurrent processing of a configurable, bounded number of in-flight messages.

Commits offsets at a configurable interval, after a configurable number of messages are processed, and also programmatically. It waits until all in-flight messages are processed at commit time, so you know that everything that's committed has been processed.

use

Add the dependencies to your project. akka-kafka excludes kafka's logging dependencies by default, in favor of slf4j, but does so without forcing you to do the same, so you need to add a dependency that brings in either log4j itself or slf4j's log4j bridge (log4j-over-slf4j).

If you don't want to use slf4j, you can substitute log4j; a sketch of that variant follows the build.sbt example below.

/* build.sbt */

libraryDependencies += "com.sclasen" %% "akka-kafka" % "0.0.3" % "compile"

libraryDependencies += "com.typesafe.akka" %% "akka-slf4j" % "2.3.2" % "compile"

libraryDependencies += "org.slf4j" % "log4j-over-slf4j" % "1.6.6" % "compile"
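
If you go the log4j route instead, the dependencies look roughly like this (the log4j artifact and version shown are assumptions, adjust them to your setup):

/* build.sbt -- log4j instead of the slf4j bridge (version is illustrative) */

libraryDependencies += "com.sclasen" %% "akka-kafka" % "0.0.3" % "compile"

libraryDependencies += "log4j" % "log4j" % "1.2.17" % "compile"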

To use this library you must provide it with an ActorRef that will receive messages from kafka and reply to the sender with StreamFSM.Processed after each message has been successfully processed. If you do not reply in this way to every single message received, the connector will not be able to drain all in-flight messages at commit time, and it will hang.

Here is an example of such an actor that just prints the messages.

import akka.actor.Actor
import com.sclasen.akka.kafka.StreamFSM

class Printer extends Actor {

  def receive = {
    case x: Any =>
      println(x)
      sender ! StreamFSM.Processed // tell the stream this message has been processed
  }

}
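
Processing does not have to be synchronous. Below is a sketch of an actor that does its work in a Future; processMessage is a hypothetical stand-in for your own logic. The important points are capturing the sender before the Future completes and still replying with StreamFSM.Processed for every message.

import akka.actor.Actor
import akka.pattern.pipe
import com.sclasen.akka.kafka.StreamFSM
import scala.concurrent.Future

class AsyncProcessor extends Actor {
  import context.dispatcher

  // hypothetical asynchronous work -- replace with your own processing
  def processMessage(msg: Any): Future[Unit] = Future(println(msg))

  def receive = {
    case msg: Any =>
      val origin = sender() // capture the sender before the Future callback runs
      processMessage(msg).map(_ => StreamFSM.Processed).pipeTo(origin)
  }
}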

The consumer is configured with an instance of AkkaConsumerProps, which looks like this.

case class AkkaConsumerProps[Key,Msg](system:ActorSystem,
                                      zkConnect:String,
                                      topic:String,
                                      group:String,
                                      streams:Int,
                                      keyDecoder:Decoder[Key],
                                      msgDecoder:Decoder[Msg],
                                      receiver: ActorRef,
                                      maxInFlightPerStream:Int = 64,
                                      commitInterval:FiniteDuration = 10 seconds,
                                      commitAfterMsgCount:Int = 10000,
                                      startTimeout:Timeout = Timeout(5 seconds),
                                      commitTimeout:Timeout = Timeout(5 seconds)
                                       )

So a full example of getting a consumer up and running looks like this.

import akka.actor.{Props, ActorSystem, Actor}
import com.sclasen.akka.kafka.{AkkaConsumer, AkkaConsumerProps, StreamFSM}
import kafka.serializer.DefaultDecoder

object Example {
  class Printer extends Actor {
    def receive = {
      case x: Any =>
        println(x)
        sender ! StreamFSM.Processed
    }
  }

  val system = ActorSystem("test")
  val printer = system.actorOf(Props[Printer])


  /*
  the consumer will have 4 streams and max 64 messages per stream in flight, for a total of 256
  concurrently processed messages.
  */
  val consumerProps = AkkaConsumerProps(
    system = system,
    zkConnect = "localhost:2181",
    topic = "your-kafka-topic",
    group = "your-consumer-group",
    streams = 4, //one per partition
    keyDecoder = new DefaultDecoder(),
    msgDecoder = new DefaultDecoder(),
    receiver = printer,
    maxInFlightPerStream = 64
  )

  val consumer = new AkkaConsumer(consumerProps)

  consumer.start()  //returns a Future[Unit] that completes when the connector is started

  consumer.commit() //returns a Future[Unit] that completes when all in-flight messages are processed and offsets are committed.

}
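
Both start() and commit() return a Future[Unit], so you can block on or react to their completion. A minimal blocking sketch (the timeouts are illustrative):

import scala.concurrent.Await
import scala.concurrent.duration._

// block until the connector is started (timeout is illustrative)
Await.result(consumer.start(), 30.seconds)

// later: force a commit and block until all in-flight messages are processed
// and their offsets are committed
Await.result(consumer.commit(), 30.seconds)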

configure

The reference.conf file is used to provide defaults for the properties used to configure the underlying kafka consumer.

To override any of these properties, use the standard akka application.conf mechanism.

Note that the property values must be strings, as that is how kafka expects them.

Do NOT set consumer.timeout.ms to -1. This will break everything. Do not override auto.commit.enable either; the connector manages offset commits itself.

kafka.consumer {
    zookeeper.connection.timeout.ms = "10000"
    auto.commit.enable = "false"
    zookeeper.session.timeout.ms = "1000"
    zookeeper.sync.time.ms =  "1000"
    consumer.timeout.ms =  "400"
}
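
To override, for example, the zookeeper connection timeout, a sketch of application.conf could look like this (the value shown is illustrative; note it is quoted as a string):

# application.conf
kafka.consumer {
    zookeeper.connection.timeout.ms = "30000"
}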

develop

This assumes you have forego installed. If not, get it from github.com/ddollar/forego.

./bin/setup         #installs kafka to ./kafka-install
cp .env.sample .env #used by forego to set your env vars
forego start        #starts zookeeper and kafka

In another shell

forego run sbt
