Akka Analytics

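Large-scale event processing with Akka Persistence and Apache Spark. At the moment you can batch-process events from the Cassandra journal as a Spark RDD and stream-process events from Kafka topics as a Spark DStream.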

Dependencies

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

libraryDependencies ++= Seq(
  "com.github.krasserm" %% "akka-analytics-cassandra" % "0.3",
  "com.github.krasserm" %% "akka-analytics-kafka" % "0.3"
)

Event batch processing

With akka-analytics-cassandra you can expose and process events written by all persistent actors as a resilient distributed dataset (RDD). It uses the Spark Cassandra Connector to fetch data from the Cassandra journal. Here's a primitive example (details here):

import akka.actor.ActorSystem

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import akka.analytics.cassandra._

val conf = new SparkConf()
  .setAppName("CassandraExample")
  .setMaster("local[4]")
  .set("spark.cassandra.connection.host", "127.0.0.1")

val sc = new SparkContext(conf)

// expose journaled Akka Persistence events as RDD
val rdd: RDD[(JournalKey, Any)] = sc.eventTable().cache()

// and do some processing ... 
rdd.sortByKey().map(...).filter(...).collect().foreach(println)

The dataset generated by eventTable() is of type RDD[(JournalKey, Any)] where Any represents the persisted event (see also Akka Persistence API) and JournalKey is defined as

package akka.analytics.cassandra

case class JournalKey(persistenceId: String, partition: Long, sequenceNr: Long)

Events for a given persistenceId are partitioned across nodes in the Cassandra cluster, where the partition is represented by the partition field in the key. The eventTable() method returns an RDD in which events with the same persistenceId/partition combination (= cluster partition) are ordered by increasing sequenceNr, but the ordering across cluster partitions is not defined. If needed, the RDD can be sorted with sortByKey() by persistenceId, partition and sequenceNr, in that order of significance. The default size of a cluster partition in the Cassandra journal is 5000000 events (see akka-persistence-cassandra).
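The sort order described above can be illustrated without Spark. The following is a minimal sketch, assuming a local copy of JournalKey and a hand-written Ordering (both are stand-ins for illustration, not the library's own definitions), applied to an in-memory sequence instead of an RDD:

```scala
// Local stand-in for akka.analytics.cassandra.JournalKey (same fields as above).
final case class JournalKey(persistenceId: String, partition: Long, sequenceNr: Long)

object JournalKeyOrderingSketch {
  // Hypothetical ordering: persistenceId first, then partition, then sequenceNr.
  implicit val journalKeyOrdering: Ordering[JournalKey] =
    Ordering.by((k: JournalKey) => (k.persistenceId, k.partition, k.sequenceNr))

  // Sorts (key, event) pairs by key, analogous to sortByKey() on the RDD.
  def sortEvents(events: Seq[(JournalKey, Any)]): Seq[(JournalKey, Any)] =
    events.sortBy(_._1)

  // Sample data in arbitrary order, as it might arrive across cluster partitions.
  val unsorted: Seq[(JournalKey, Any)] = Seq(
    JournalKey("b", 0L, 1L) -> "b-1",
    JournalKey("a", 1L, 5000001L) -> "a-5000001",
    JournalKey("a", 0L, 2L) -> "a-2",
    JournalKey("a", 0L, 1L) -> "a-1"
  )

  val sorted: Seq[(JournalKey, Any)] = sortEvents(unsorted)
}
```

After sorting, all events of persistenceId "a" come before those of "b", and within "a" the events of partition 0 come before those of partition 1.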

Event stream processing

With akka-analytics-kafka you can expose and process events written by all persistent actors (more specifically, from any user-defined topic) as a discretized stream (DStream). Here's a primitive example (details here):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

import akka.analytics.kafka._
import akka.persistence.kafka.Event

val sparkConf = new SparkConf()
  .setAppName("events-consumer")
  .setMaster("local[4]")

// read from user-defined events topic 
// with 2 threads (see also Kafka API) 
val topics = Map("events" -> 2)
val params = Map[String, String](
  "group.id" -> "events-consumer",
  "auto.commit.enable" -> "false",  
  "auto.offset.reset" -> "smallest",
  "zookeeper.connect" -> "localhost:2181",
  "zookeeper.connection.timeout.ms" -> "10000")

val ssc = new StreamingContext(sparkConf, Seconds(1))
val es: DStream[Event] = ssc.eventStream(params, topics)

es.foreachRDD(rdd => rdd.map(...).filter(...).collect().foreach(println))

ssc.start()
ssc.awaitTermination()

The stream generated by eventStream(...) is of type DStream[Event] where Event is defined in akka-persistence-kafka as

package akka.persistence.kafka

/**
 * Event published to user-defined topics.
 *
 * @param persistenceId Id of the persistent actor that generates event `data`.
 * @param sequenceNr Sequence number of the event.
 * @param data Event data generated by a persistent actor.
 */
case class Event(persistenceId: String, sequenceNr: Long, data: Any)

The stream of events (written by all persistent actors) is partially ordered, i.e. events with the same persistenceId are ordered by sequenceNr, whereas the ordering of events with different persistenceId values is not defined. Details about Kafka consumer params are described here.
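To make the partial ordering concrete, here is a minimal sketch that checks it on an in-memory batch of events. It assumes a local copy of the Event case class and a hypothetical helper isOrderedPerPersistenceId; neither is part of akka-analytics-kafka, and no Spark or Kafka is involved:

```scala
// Local stand-in for akka.persistence.kafka.Event (same fields as above).
final case class Event(persistenceId: String, sequenceNr: Long, data: Any)

object PartialOrderSketch {
  // Hypothetical helper: true if, for every persistenceId, sequence numbers
  // strictly increase in the order the events appear in the batch.
  // Events of different persistenceIds may interleave freely.
  def isOrderedPerPersistenceId(events: Seq[Event]): Boolean =
    events.groupBy(_.persistenceId).values.forall { perActor =>
      val seqNrs = perActor.map(_.sequenceNr)
      seqNrs.zip(seqNrs.drop(1)).forall { case (prev, next) => prev < next }
    }

  // Interleaved across actors, but ordered per actor.
  val interleaved: Seq[Event] = Seq(
    Event("a", 1L, "x"),
    Event("b", 1L, "y"),
    Event("a", 2L, "z"),
    Event("b", 2L, "w")
  )

  // Violates the per-actor ordering for "a".
  val outOfOrder: Seq[Event] = Seq(
    Event("a", 2L, "z"),
    Event("a", 1L, "x")
  )
}
```

The first batch satisfies the check even though events from "a" and "b" interleave; the second does not, because sequence number 2 of "a" precedes sequence number 1.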

Custom serialization

If events have been persisted with a custom serializer, the corresponding Akka serializer configuration must be specified for event processing. For event batch processing this is done as follows:

val system: ActorSystem = ...
val jsc: JournalSparkContext = 
  new SparkContext(sparkConfig).withSerializerConfig(system.settings.config)

val rdd: RDD[(JournalKey, Any)] = jsc.eventTable()
// ...

jsc.context.stop()

For event stream processing this is done in a similar way:

val system: ActorSystem = ...
val jsc: JournalStreamingContext = 
  new StreamingContext(sparkConfig, Seconds(1)).withSerializerConfig(system.settings.config)

val es: DStream[Event] = jsc.eventStream(kafkaParams, kafkaTopics)
// ...

jsc.context.start()
// ...

jsc.context.stop()

Running examples are akka.analytics.cassandra.CustomSerializationSpec and akka.analytics.kafka.CustomSerializationSpec.
