
Willa

Willa provides a data-driven DSL on top of the Kafka Streams DSL, inspired by Onyx. It uses Jackdaw under the hood.

Willa is named after Willa Muir, who translated Kafka's "The Metamorphosis". Her husband Edwin was also involved, but apparently he "only helped".

Rationale

The Kafka Streams DSL is very "Javaish". It uses a StreamsBuilder object to build topologies, which operates by in-place mutation. This has all the usual disadvantages of mutability, including making topologies difficult to compose, test, and visualise. The built topology is represented as a ProcessorTopology object, which can theoretically be used to manipulate the topology. However, it is extremely difficult to work with in practice, not least because the ProcessorTopology class isn't documented. The StreamsBuilder API also re-implements many of the stateless transformation functions from the core Clojure library (map, filter, mapcat, etc.), encouraging needless code duplication.

Willa aims to provide an immutable, data-driven DSL (inspired by Onyx) on top of the Kafka Streams DSL. It represents all aspects of your topology as Clojure data structures and functions. This makes topologies far easier to manipulate and compose. For example, if you want to log every message that is published to output topics, you can write a generic pure function to transform a Willa topology to achieve this. It also enables you to visualise your topology using GraphViz, which is very useful for reasoning about how a topology works, and also for documentation purposes.
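As a rough illustration of that idea, here is a minimal sketch of such a pure function. It assumes only the topology map shape described under Getting Started (:workflow, :entities, :willa.core/entity-type, :willa.core/xform). The names log-step, feeds-topic? and with-output-logging are hypothetical helpers, not part of Willa.

```clojure
;; Hypothetical helpers: none of these functions ship with Willa.

(defn log-step
  "Transducer that prints each [k v] tuple and passes it through unchanged."
  [label]
  (map (fn [kv] (println label kv) kv)))

(defn feeds-topic?
  "True if the workflow has an edge from entity-id to a :topic entity."
  [{:keys [workflow entities]} entity-id]
  (boolean
    (some (fn [[from to]]
            (and (= from entity-id)
                 (= :topic (get-in entities [to :willa.core/entity-type]))))
          workflow)))

(defn with-output-logging
  "Returns a new topology map in which every :kstream entity that feeds a
  topic has a logging step appended to its xform. The input is not modified."
  [topology]
  (update topology :entities
          (fn [entities]
            (into {}
                  (map (fn [[id entity]]
                         [id (if (and (= :kstream (:willa.core/entity-type entity))
                                      (feeds-topic? topology id))
                               ;; transducers composed with comp run left to right,
                               ;; so logging happens after the original xform
                               (update entity :willa.core/xform
                                       (fn [xf] (comp (or xf identity) (log-step id))))
                               entity)]))
                  entities))))
```

Because the function only takes and returns plain data, it can be applied to any topology map before the map is handed to Willa, and it can be tested without a broker.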

Willa uses transducers for stateless transformations, rather than a separate API as the StreamsBuilder does. Transducers are far more composable, and allow you to re-use code far more effectively. They also enable you to test your transformation logic completely independently of Kafka (and Willa).
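For instance, a transducer over key-value tuples can be exercised with nothing but clojure.core. The sketch below uses the same tuple shape as the xform in the Getting Started example. The names increment-xform and increment-then-keep-even are illustrative only.

```clojure
;; A transducer over [key value] tuples, as Willa's xforms expect.
(def increment-xform
  (map (fn [[k v]] [k (inc v)])))

;; Apply it to plain vectors: no Kafka, no Willa.
(into [] increment-xform [["a" 1] ["b" 41]])
;; => [["a" 2] ["b" 42]]

;; Transducers compose with comp; data flows left to right.
(def increment-then-keep-even
  (comp increment-xform
        (filter (fn [[_ v]] (even? v)))))

(into [] increment-then-keep-even [["a" 1] ["b" 2]])
;; => [["a" 2]]
```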

Willa also provides a mechanism for experimenting with a topology from the REPL, and seeing how data flows through it. It can also be used for unit testing. This mechanism is similar in scope to Kafka's TopologyTestDriver, but has a few advantages:

  1. It gives you the output data of each individual KStream/KTable/topic within your topology, instead of just the data on the output topics.
  2. It enables you to visualise the data flow using GraphViz.
  3. It is faster, and doesn't persist anything on disk.

Getting Started

Willa represents your topology as a map, containing 3 keys:

  • :entities - an entity is a map containing information about a topic, KStream, or KTable. The :entities map is a map of identifier to entity.
  • :workflow - a vector of tuples of [input-entity-id output-entity-id], similar to a workflow in Onyx.
  • :joins - this is a map representing all the joins/merges in your topology as data. It is a map of a vector of entity names involved in the join, to a join config.

This may sound confusing, but let's try to clear things up with a simple example. Before we start, make sure you have a Kafka broker running locally, either using the Confluent distribution or Landoop's fast-data-dev docker image. Also, if you don't have an existing application, create one by running lein new my-cool-app.

Say we want a topology that simply reads messages from an input topic, increments the value, then writes to an output topic. The topology would look like this:

[Figure: Simple Topology]

Start by adding willa to your project.clj or deps.edn. Check the latest version on Clojars.

Next, we'll require some necessary namespaces:

(ns my-cool-app.core
  (:require [jackdaw.streams :as streams]
    [jackdaw.serdes.edn :as serdes.edn]
    [willa.core :as w]))

We then create the workflow like so:

(def workflow
  [[:input-topic :increment-stream]
   [:increment-stream :output-topic]])

You can see that data will flow from the :input-topic to the :increment-stream, then from :increment-stream to the :output-topic. Now we need to tell Willa what exactly the :input-topic, :increment-stream and :output-topic entities are. To do this, we'll create the entity config map. It looks like this:

(def entities
  {:input-topic      {::w/entity-type     :topic
                      :topic-name         "input-topic"
                      :replication-factor 1
                      :partition-count    1
                      :key-serde          (serdes.edn/serde)
                      :value-serde        (serdes.edn/serde)}
   :increment-stream {::w/entity-type   :kstream
                      :willa.core/xform (map (fn [[k v]] [k (inc v)])) ;; Note that the mapping function expects a key-value tuple
                      }
   :output-topic     {::w/entity-type     :topic
                      :topic-name         "output-topic"
                      :replication-factor 1
                      :partition-count    1
                      :key-serde          (serdes.edn/serde)
                      :value-serde        (serdes.edn/serde)}})

That's all the data Willa needs to build your topology! To get our topology up and running, we'll follow these steps:

  1. Create a StreamsBuilder object
  2. Call the willa.core/build-topology! function, passing it the builder and the topology map (which contains the workflow and entities)
  3. Create a KafkaStreams object from the builder
  4. Call start on it

The code looks like this:

(def app-config
  {"application.id"            "my-cool-app"
   "bootstrap.servers"         "localhost:9092"
   "cache.max.bytes.buffering" "0"})

(def topology
  {:workflow workflow
   :entities entities})

(defn start! []
  (let [builder (doto (streams/streams-builder) ;; step 1
                  (w/build-topology! topology)) ;; step 2
        kstreams-app (streams/kafka-streams builder app-config) ;; step 3
        ]
    (streams/start kstreams-app) ;; step 4
    kstreams-app))

You can verify that it works by running the following commands in your REPL:

(require 'jackdaw.client
         'jackdaw.data ;; provides ->ProducerRecord, used below
         'jackdaw.admin
         'willa.streams)

(def admin-client (jackdaw.admin/->AdminClient app-config))
;; create the input and output topics
(jackdaw.admin/create-topics! admin-client [(:input-topic entities) (:output-topic entities)])

;; start the topology
(def kstreams-app (start!))

;; create a Kafka Producer, and produce a message with value 1 to the input topic
(def producer (jackdaw.client/producer app-config
                                       willa.streams/default-serdes))
@(jackdaw.client/send! producer (jackdaw.data/->ProducerRecord (:input-topic entities) "key" 1))

;; create a Kafka Consumer, and consume everything from the output topic                                     
(def consumer (jackdaw.client/consumer (assoc app-config "group.id" "consumer")
                                       willa.streams/default-serdes))
(jackdaw.client/subscribe consumer [(:output-topic entities)])
(jackdaw.client/seek-to-beginning-eager consumer)

;; should return something like: [{:key "key" :value 2}] 
(->> (jackdaw.client/poll consumer 200)
     (map #(select-keys % [:key :value])))                                           

Going Further

One of the cool features of Willa is that you can visualise your topology. To do this, first make sure you have GraphViz installed, then run these commands in your REPL:

(require 'willa.viz)

(willa.viz/view-topology topology)

A diagram of your topology should pop up in a separate window.

You can also use Willa to experiment with your topology. For instance, you might want to know what would happen if you receive a message with value 1. To do this, we'll use the run-experiment function in the willa.experiment namespace. This function takes a topology and a map of entity id to records. Each record must contain the :key, :value, and :timestamp keys. The code looks like this:

(require 'willa.experiment)

(def experiment-results
  ;; should return the topology map, but with each entity updated with a :willa.experiment/output key
  (willa.experiment/run-experiment topology
                                   {:input-topic [{:key       "some-key"
                                                   :value     1
                                                   :timestamp 0}]}))

;; you can now visualise how data flows through the topology in a diagram
(willa.viz/view-topology experiment-results)                                              
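If you would rather inspect the results as data than as a diagram, a small helper can pull the output out of each entity. This is a sketch that assumes the result has the shape described in the comment above, with a :willa.experiment/output key on each entity under :entities. entity-outputs is a hypothetical helper, and the sample map below is hand-written stand-in data, not real output from run-experiment.

```clojure
(defn entity-outputs
  "Returns a map of entity id to whatever is stored under
  :willa.experiment/output for that entity."
  [results]
  (into {}
        (map (fn [[id entity]] [id (:willa.experiment/output entity)]))
        (:entities results)))

;; Hand-written stand-in for an experiment result (illustrative only).
(def sample-results
  {:workflow [[:input-topic :output-topic]]
   :entities {:input-topic  {:willa.experiment/output [{:key "some-key" :value 1 :timestamp 0}]}
              :output-topic {:willa.experiment/output [{:key "some-key" :value 2 :timestamp 0}]}}})

(map :value (:output-topic (entity-outputs sample-results)))
;; => (2)
```

With a real result you would call (entity-outputs experiment-results) in the same way.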

Reference

Entity Config

Key | Valid Entity Types | Description
:willa.core/entity-type | All | The type of the entity. Can be one of: :topic, :kstream, :ktable, or :global-ktable.
:topic-name | :topic | The name of the topic.
:key-serde | :topic | The serde to use to serialize/deserialize the keys of records on the topic.
:value-serde | :topic | The serde to use to serialize/deserialize the values of records on the topic.
:willa.core/xform | :kstream | A transducer to apply to the KStream.
:willa.core/group-by-fn | :ktable | A function which takes a key-value pair, and returns the key of the group. If this key is present, :willa.core/aggregate-adder-fn and :willa.core/aggregate-initial-value must also be provided.
:willa.core/window | :ktable | The windowing to apply after grouping the input records. Should be either a Windows or a SessionWindows object. If this key is present, :willa.core/group-by-fn must also be provided. Will cause the input to be coerced to a KStream.
:willa.core/aggregate-initial-value | :ktable | The initial value to use in an aggregation. Must be provided if :willa.core/aggregate-adder-fn is present.
:willa.core/aggregate-adder-fn | :ktable | The aggregator function if the input is a KStream, or the "adder" function if it is a KTable. If this key is present, :willa.core/group-by-fn must also be provided.
:willa.core/aggregate-subtractor-fn | :ktable | The aggregate "subtractor" function, only valid if the input is a KTable. If this key is present, :willa.core/group-by-fn must also be provided.
:willa.core/suppression | :ktable | A Suppressed object that determines how updates to the KTable are emitted. See the Kafka Streams docs for more info.
:willa.core/store-name | :ktable | The local state store name to use for the KTable.
:willa.overrides/prevent-repartition | :kstream | Set to true to prevent a repartition of the input topic. This prevents the provided xform from changing the message key. Setting this override will cause problems if the xform does change the message key, so use with caution!
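To make the aggregation keys more concrete, here is a sketch of a :ktable entity that sums values per key, followed by a plain-Clojure simulation of what those three functions compute together. It assumes the adder is called with the accumulated value and a [k v] tuple, which is how Jackdaw's aggregate invokes its aggregator. The entity name sum-by-key and the simulation are illustrative only, and the simulation is not how Kafka Streams executes the aggregation.

```clojure
;; A :ktable entity config that sums record values per key (sketch).
(def sum-by-key
  {:willa.core/entity-type             :ktable
   :willa.core/group-by-fn             (fn [[k _v]] k)
   :willa.core/aggregate-initial-value 0
   ;; assumed signature: (fn [accumulated-value [k v]] new-value)
   :willa.core/aggregate-adder-fn      (fn [acc [_k v]] (+ acc v))})

;; Simulate the aggregation over an in-memory batch of [k v] records.
(defn simulate-aggregation
  [{:willa.core/keys [group-by-fn aggregate-initial-value aggregate-adder-fn]} records]
  (into {}
        (map (fn [[group recs]]
               [group (reduce aggregate-adder-fn aggregate-initial-value recs)]))
        (group-by group-by-fn records)))

(simulate-aggregation sum-by-key [["a" 1] ["b" 10] ["a" 2]])
;; => {"a" 3, "b" 10}
```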

Join Config

Key | Description
:willa.core/join-type | The type of the join. Can be one of :merge, :left, :inner or :outer.
:willa.core/window | A JoinWindows object that specifies the windowing to be used in the join. Not used when the join type is :merge or when joining a :kstream and a :global-ktable.
:willa.core/kv-mapper | The kv-mapper function to use when joining a :kstream with a :global-ktable. Extracts a key of the GlobalKTable from each [k v] of the stream. If not specified, the key of the stream is used. Not used when joining other kinds of objects. See the Kafka Streams docs for more info.
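As a sketch of the data shape only, a :joins map keyed by a vector of entity ids might look like the following. The entity ids :clicks-stream and :purchases-stream are made up, and invalid-joins is a hypothetical helper that checks each join type against the values listed above. A :merge join is shown because it needs no :willa.core/window.

```clojure
;; Map of [entity ids involved in the join] -> join config (sketch).
(def joins
  {[:clicks-stream :purchases-stream] {:willa.core/join-type :merge}})

;; The join types listed in the Join Config reference.
(def valid-join-types #{:merge :left :inner :outer})

(defn invalid-joins
  "Returns the entries of a :joins map whose join type is not one of the
  documented values. Hypothetical helper, not part of Willa."
  [joins]
  (into {}
        (remove (fn [[_ids config]]
                  (contains? valid-join-types (:willa.core/join-type config))))
        joins))

(invalid-joins joins)
;; => {}
```

The map would then sit alongside :workflow and :entities under the :joins key of the topology map.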

License

This program and the accompanying materials are made available under the terms of the GPL V3 license, which is available at https://www.gnu.org/licenses/gpl-3.0.en.html.

willa's People

Contributors

bandoos, bpringe, cjbarre, davewm, kschltz, kylecbrodie


willa's Issues

Undefined var in README.md

(def topology 
  {:workflow workflow
   :entities entities
   :joins joins})

joins is not defined anywhere above this.
I guess it will be {} in this case?

Typo in readme?

Hi, thanks for this library and its nice readme.
I think there is an error where you define the entities:

(def entities
  {:input-topic {::w/entity-type :topic
                 :topic-name "output-topic" ;; <======== HERE =======
                 :replication-factor 1
                 :partition-count 1
                 :key-serde (serdes.edn/serde)
                 :value-serde (serdes.edn/serde)}
   :increment-stream {::w/entity-type :kstream
                      :willa.core/xform (map (fn [[k v]] [k (inc v)])) ;; Note that the mapping function expects a key-value tuple
                      }
   :output-topic {::w/entity-type :topic
                  :topic-name "output-topic"
                  :replication-factor 1
                  :partition-count 1
                  :key-serde (serdes.edn/serde)
                  :value-serde (serdes.edn/serde)}})

I think you meant "input-topic" there!

Thanks again.

Hooking into a Kstream directly from code, rather than from a topic

Hi, our org just switched from Apache Camel to your lib, and we are liking it so far.

Question: is there any way to call a KStream directly with a function? Maybe I'm asking too high up the stack here but perhaps you know. With Camel, you could call! any route directly from your code synchronously or asynchronously, and it would flow through all the downstream routes as you've defined them. We'd like to do the same thing with Kafka Streams, given the key describing the stream in our Willa topology.

Avoiding repartitions when not transforming keys

Problem

When using ::willa/xform, Kafka Streams will always repartition the topic, since the key might have changed. There should be some way of transforming only the values, to make it possible to avoid repartitioning.

Suggested solution

Add ::willa/xform-values which takes a transducer of the shape (map (fn [[k v]] v)).

Custom store name not used for aggregations

Hello! Thanks for the library.

I noticed that the recently added custom store-name key is not used if an :aggregate-adder-fn is also defined. Is this working as intended?

Also, the issue for the original PR mentions the key name :willa.core/store-name, but the code seems to use :willa.streams/store.name, is that right?

README.md with error

where it reads:
(w/build-workflow! topology)) should be
(w/build-topology! topology))

Tried to update all dependencies to latest, and lein test failed...

Not sure if this is supposed to work, but FYI:

Syntax error (ClassNotFoundException) compiling new at (jackdaw/test.clj:250:5). org.apache.kafka.streams.processor.internals.ChangelogRegister

and the second time I run lein test I get:

Syntax error (ClassNotFoundException) compiling new at (jackdaw/test.clj:250:5). org.apache.kafka.streams.processor.internals.ClientUtils$QuietStreamsConfig

Here is the project.clj I tried to use:

(defproject willa "0.2.0"
  :description "A Clojure DSL for Kafka Streams"
  :url "https://github.com/DaveWM/willa"
  :license {:name "GPL V3"
            :url "https://www.gnu.org/licenses/gpl-3.0.en.html"}
  :dependencies [[org.clojure/clojure "1.10.3"]
                 [fundingcircle/jackdaw "0.7.8"]
                 [aysylu/loom "1.0.2"]
                 [rhizome "0.2.9"]
                 [org.clojure/math.combinatorics "0.1.6"]]
  :repl-options {:init-ns willa.core}
  :repositories [["confluent" "https://packages.confluent.io/maven/"]
                 ["clojars" "https://clojars.org/repo/"]]
  :profiles {:test {:dependencies [[org.apache.kafka/kafka-streams-test-utils "2.7.0"]
                                   [log4j/log4j "1.2.17"]]}
             :dev {:dependencies [[org.clojure/test.check "1.1.0"]]}
             :ci {:jvm-opts ["-Djava.awt.headless=true"]}})

I rarely use lein test, so maybe I don't know what I am doing....

Then I went back to your master branch, and that branch tests fine (as one would expect):

lein test willa.test-utils
lein test willa.unit.core-test
lein test willa.unit.experiment-and-ttd-congruity-test
lein test willa.unit.streams-test
lein test willa.unit.utils-test

Ran 19 tests containing 43 assertions.
0 failures, 0 errors.

No method in multimethod 'coerce-to-ktable' when building topology

When I try to build this topology I get a No method in multimethod 'coerce-to-ktable' for dispatch value: class jackdaw.streams.interop.CljTimeWindowedKStream exception.

Changing the :ktable to :kstream will allow the topology to build but I think it produces different semantics of the stream.

(def workflow
  [[:input-topic :aggregate-stream]
   [:aggregate-stream :output-topic]])

(def entities
 {:input-topic {::w/entity-type :topic
                :topic-name "timed-readings-topic"
                :replication-factor 1
                :partition-count 1
                :key-serde (serdes.edn/serde)
                :value-serde (serdes.edn/serde)}
  :aggregate-stream {::w/entity-type :ktable
                     :willa.core/group-by-fn (fn [[k v]] (:device-number v))
                     :willa.core/window (TimeWindows/of (* 2 60 1000))} 
  :output-topic {::w/entity-type :topic
                 :topic-name "timed-readings-output-topic"
                 :replication-factor 1
                 :partition-count 1
                 :key-serde (serdes.edn/serde)
                 :value-serde (serdes.edn/serde)}})

Documentation & error message about graphviz dependency

Hey there! My co-worker recommended your library to me and I was reading through your blog post.

It looks great!

When I ran (wv/view-topology topology), the following exception was thrown:

Execution error (IOException) at java.lang.UNIXProcess/forkAndExec (UNIXProcess.java:-2). error=2, No such file or directory

After some digging, it looks like rhizome has a dependency on graphviz, which has to be installed by an os package manager. Installing graphviz solves the issue.

I think it would be helpful to have some documentation noting the need to install the dependency to use this method & maybe a try around the usage of rhizome to help suggest this as a possible solution.

aggregate-subtractor-fn not in willa.experiment

When running an experiment in Willa where I have a KTable that has a key added to it and then removed from it, the aggregate-subtractor-fn I supply does not seem to get called. Looking in the willa.experiment namespace, I see references to aggregate-adder-fn but not to aggregate-subtractor-fn. Does the experiment module currently not support adding a key to a KTable and then removing it?

Avoid viz dependency in core (HeadlessException in docker)

If I load willa.core in a repl running in docker, I see this error:

Syntax error (HeadlessException) compiling at (viz.clj:27:1).

No X11 DISPLAY variable was set, but this program performed an operation which requires it.
class clojure.lang.Compiler$CompilerException

Could willa.core be restructured to not load viz or rhizome stuff?

Is it possible to name my state store when ::w/entity-type is :ktable?

I couldn't figure this one out and it's not really referenced in the documentation that I could see. My use-case is establishing a ktable with the intention to query it outside of the topology as a local state store. I need a consistent name instead of a gensym for obvious reasons. I looked in the code and believe I determined that this is not supported, so I have pushed a PR for you to check out, to see if it's something you'd like to support.

#14

thoughts on kstream branch?

I wanted to check in to see if you have put any thought into how support for KStream branching via jackdaw.streams/branch might work. It seems like it might be representable in the willa build-topology! map, but there are probably options about the exact structure of the data form and how the code to build it would look.

Upgrade Willa's Jackdaw Dep to v0.8.0

Something may have changed within the Kafka Streams API. Trying to bump the jackdaw version for Willa causes tests to fail because:

java.lang.NoSuchMethodError: 'org.apache.kafka.streams.processor.internals.ProcessorTopology org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(java.lang.Integer)

I'm just starting to keep up with this stuff, it looks like InternalTopologyBuilder has been deprecated.

Any idea what needs to be done @DaveWM?
