kafka-common

Common Java utilities for Apache Kafka. Currently the library provides support for Event Sourcing [1], [2], [3] and a JSON Serde.



Overview

This library supports treating Kafka as a persistent database, which we casually refer to as Event Sourcing. We use the term Event Sourcing to mean a real-time feed of changes that can also be replayed to build the full state for anyone (re)joining mid-stream. This is accomplished in Kafka with log compacted topics. We use an Event Sourcing scheme with a simple entire-record (database row / entity) per message approach: all fields for a given record are always present (or implied null). The entire state of a given entity is always stored in a single message, the message key is required to be non-null and acts as the "primary key" in relational database terms, and the message value is the rest of the entity state (some fields may still act as a 'foreign key' reference). To update a record, simply produce a new message with the same key containing the full state of the record. To delete a record, produce a tombstone message (null value).
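
For example, using the standard Kafka producer API, an update is just a full-record produce and a delete is a tombstone with the same key. This is a minimal sketch (topic name, key, and JSON value are made up for illustration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventSourceProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Update: the value carries the ENTIRE record state, keyed by the "primary key"
            producer.send(new ProducerRecord<>("my-entities", "entity-1",
                    "{\"name\":\"Entity One\",\"enabled\":true}"));

            // Delete: a tombstone (null value) with the same key
            producer.send(new ProducerRecord<>("my-entities", "entity-1", null));
        }
    }
}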

We could have instead allowed a more granular scheme where a message only contains the single field that is changing (the key would then need to include the field name in order for compaction to play nicely), and that would have at least two benefits:

  1. Clients would not need to know the entire state of a record when making single-field changes (currently clients must re-set even the fields they don't intend to change in the record)
  2. Smaller individual messages for single-field changes (useful for large records with one field that changes frequently)

We went with the whole-record strategy since it is easier to deal with: clients never have to merge records, they always replace them, and our use-cases so far haven't required stateless producers or small individual messages. The aggregate record size is also smaller since there is no need for a composite key (field + record ID).

The Kafka Consumer API uses polling (pull); this library adds a thread that polls for you and pushes events to registered listeners. The behavior of the polling thread is configurable (see Configure below).

Why not use Consumer API or Kafka Streams KTable API?

This library is somewhere in-between the Kafka Consumer API and the Kafka Streams KTable API. You could use the regular Consumer API just fine, but you'd likely end up duplicating some code in each of your clients that can be refactored into a shared library - that's where this lib came from. The Kafka Streams KTable API is overkill for many simple apps, such as a command line utility that just reads all records, dumps them to the console, and quits. Other use-cases where Kafka Streams KTable is overkill include the epics2kafka app, which consumes from a "command" topic: a relatively small, single partition, unreplicated topic that instructs the app what to do; the app is already a Kafka Connect app, so embedding a Kafka Streams app inside would add awkward complexity. Finally, the jaws-admin-gui proxies Kafka topics to web browser clients via Server Sent Events, and in this case it's a one-to-one connection that wouldn't benefit from the Kafka Streams infrastructure, but would certainly be complicated by it.

Install

This library requires a Java 11+ JVM and standard library at run time.

You can obtain the library jar file from the Maven Central repository directly, or from a Maven-friendly build tool with the following coordinates (Gradle example shown):

implementation 'org.jlab:kafka-common:<version>'

Check the Release Notes to see what has changed in each version.

API

Javadocs

Configure

The EventSourceTable class (a simpler version of KTable, with some similarities to a standard Kafka Consumer) is configured with the EventSourceConfig class, which extends the common Kafka AbstractConfig.
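
A minimal usage sketch is shown below. The EventSourceConfig key constants and the EventSourceTable(Properties) constructor appear in this library, but the broker address, group name, topic name, package path, and the commented-out listener registration call are illustrative assumptions - see the Javadocs for the actual listener API:

import java.util.Properties;
import org.jlab.kafka.eventsource.EventSourceConfig;   // package path assumed
import org.jlab.kafka.eventsource.EventSourceTable;    // package path assumed

Properties props = new Properties();
props.setProperty(EventSourceConfig.EVENT_SOURCE_BOOTSTRAP_SERVERS, "localhost:9094");
props.setProperty(EventSourceConfig.EVENT_SOURCE_GROUP, "my-group");
props.setProperty("event.source.topic", "my-entities");
props.setProperty(EventSourceConfig.EVENT_SOURCE_KEY_DESERIALIZER,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(EventSourceConfig.EVENT_SOURCE_VALUE_DESERIALIZER,
        "org.apache.kafka.common.serialization.StringDeserializer");

EventSourceTable<String, String> table = new EventSourceTable<>(props);
// Register a listener to receive pushed events, then start the polling thread
// (method names here are hypothetical placeholders - check the Javadocs):
// table.addListener(records -> records.forEach(System.out::println));
// table.start();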

Build

This project is built with Java 17 (compiled to Java 11 bytecode), and uses the Gradle 7 build tool to automatically download dependencies and build the project from source:

git clone https://github.com/JeffersonLab/kafka-common
cd kafka-common
gradlew build

Note: If you do not already have Gradle installed, it will be installed automatically by the wrapper script included in the source

Note for JLab On-Site Users: Jefferson Lab has an intercepting proxy

Test

Continuous Integration (CI) is set up using GitHub Actions, so tests run automatically on push unless [no ci] is included in the commit message. Tests can also be run manually on a local workstation using:

docker compose up

Wait for containers to start then:

gradlew integrationTest

Note: By default integration tests require localhost port 9094 be available for Kafka.

Release

  1. Bump the version number in build.gradle and commit and push to GitHub (using Semantic Versioning).
  2. Create a new release on the GitHub Releases page corresponding to the same version in build.gradle (enumerate changes and link issues).
  3. The Publish to Maven Central GitHub Action should run automatically.
  4. The Publish to gh-pages GitHub Action should run automatically.

See Also

Issues

EventSourceConfig internal use clumsy

The Kafka AbstractConfig class is confusing and is likely not being used optimally. See:

public EventSourceTable(Properties props) {
    config = new EventSourceConfig(props);

    // Not sure if there is a better way to get configs (with defaults) from EventSourceConfig into a
    // Properties object (or Map) for KafkaConsumer - we manually copy values over into a new clean Properties.
    // Tried the following without success:
    // - if you use config.valuesWithPrefixOverride() to obtain consumer props it will compile, but serialization
    //   may fail at runtime w/ ClassCastException! (I guess stuff is mangled from String to Objects or something)
    // - if you simply pass the constructor argument above "props" along to KafkaConsumer, the defaults for missing
    //   values won't be set.
    Properties consumerProps = new Properties();

    // Pass values in as-is from user (without defaults); then next we'll ensure defaults are used if needed.
    // Note: using new Properties(props) does NOT work as that sets the defaults field inside the Properties object,
    // which is not carried over later inside the KafkaConsumer constructor when it also creates a new Properties
    // and uses putAll().
    consumerProps.putAll(props);

    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString(EventSourceConfig.EVENT_SOURCE_BOOTSTRAP_SERVERS));
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getString(EventSourceConfig.EVENT_SOURCE_GROUP));
    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getString(EventSourceConfig.EVENT_SOURCE_KEY_DESERIALIZER));
    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getString(EventSourceConfig.EVENT_SOURCE_VALUE_DESERIALIZER));

    // Deserializer-specific configs are passed in via putAll(props) and don't have defaults in EventSourceConfig.
    // Examples:
    // - KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG
    // - KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
    consumer = new KafkaConsumer<K, V>(consumerProps);
}

One specific problem is that properties ConsumerConfig is unaware of, such as event.source.topic and compacted.cache, are passed through to the internal Consumer, causing warnings in the logs. To be fair, this happens internally in Kafka in a bunch of places as well, so in practice you really just have to adjust the logger settings to ignore these warnings anyway.
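
One possible mitigation (a sketch only, not what the library currently does) is to strip the library-specific keys before handing the Properties to the internal KafkaConsumer:

// Sketch: remove keys the internal KafkaConsumer doesn't recognize to avoid
// "supplied but isn't a known config" warnings. The key names are illustrative.
Properties consumerProps = new Properties();
consumerProps.putAll(props);
consumerProps.keySet().removeIf(key -> key.toString().startsWith("event.source.")
        || key.toString().equals("compacted.cache"));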

JsonDeserializer may need to be abstract

Currently, in order to use the JsonDeserializer via the ConsumerConfig API, in which you simply pass a String containing the full class name of the deserializer, you must extend JsonDeserializer to provide the Java entity class (example). Alternatively, the example given by the Apache Kafka docs is to use the alternative Consumer API in which you pass an already-configured Deserializer object, and therefore support a configure method as shown in the developer guide. We're currently just relying on the constructor, but that suffers a similar issue of not supporting simple class-name String configuration. There may be other strategies as well. Needs investigation.
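
A sketch of the subclass workaround described above, assuming JsonDeserializer exposes a constructor that accepts the target entity class (MyEntity and the constructor signature are illustrative assumptions):

// Hypothetical entity-specific subclass so the deserializer can be configured by
// class name alone (e.g. via ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).
public class MyEntityDeserializer extends JsonDeserializer<MyEntity> {
    public MyEntityDeserializer() {
        super(MyEntity.class); // assumed constructor that binds the entity type
    }
}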

Add test case for batch updates during init

Comparing the code to the Python version, I noticed it looks like the unused scenario where a consumer is interested in batch updates during init (before high water) may not be clearing the batches (they appear to accumulate). We should have a test case for this scenario anyway that will prove/disprove the code is working. This use-case is only used by the Python library in JAWS (CLI monitor).

List<EventSourceRecord<K, V>> eventRecords = new ArrayList<>();

Move this line into the while loop below it?
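
A sketch of the suggested change (the surrounding loop is paraphrased, not the actual source):

while (!done) {
    // Re-declaring the list inside the loop resets the batch on every poll
    // instead of letting records accumulate across iterations.
    List<EventSourceRecord<K, V>> eventRecords = new ArrayList<>();
    // ... poll, convert ConsumerRecords into eventRecords, notify listeners ...
}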

Update testing strategy

Currently we run integration tests during the build and there are no "unit" tests. We should create some unit tests and move the current integration tests to an integrationTest SourceSet in Gradle. Further, we may consider switching from testcontainers to docker compose for consistency with other projects. See: Integration Strategy. Example project that has integration SourceSet: https://github.com/JeffersonLab/jaws-libj

awaitHighWaterOffset does not return status

Currently the awaitHighWaterOffset method does not return a boolean status indicating whether a timeout occurred or the signal was received.

This is already fixed by the following commit:

ff1117b

We need a release still.

Consider Explicit vs Indirect Seek to beginning

Currently we explicitly seek to the beginning of an event sourced topic. We could possibly rely on Kafka to do it for us by using a random group.id and auto.offset.reset = 'earliest'. Consider two use-cases:

Clients that cache and resume
We are currently managing resume offsets manually (client side). The __consumer_offsets topic server-side is already doing this, and it is unclear how many resources are wasted by not using it (we likely need to at least disable auto-commit of offsets with enable.auto.commit = false to minimize waste). If we try to rely on Kafka for consumer offsets, then in the Server Sent Events web proxy use-case we'd need the web browser client to ack / commit offsets back to the proxy, and that's just a bunch of probably unnecessary overhead, as that app currently just stores the offsets with its materialized view in IndexedDB.

Clients that always get entire topic from scratch
With clients that never cache a materialized view and thus never try to resume it later - clients that always rewind and replay all messages - using a random group.id with 'earliest' auto.offset.reset has potential implications on __consumer_offsets as well. Even if we've disabled enable.auto.commit and we never manually commit offsets, it's still possible Kafka is wasting some resources (memory, CPU, network) storing / processing / transferring the random group.id info (more investigation needed). On the other hand, attempting to minimize the number of unique clients stored in __consumer_offsets by using fixed names is dangerous if also relying on enable.auto.commit = false, as an accidentally (or maliciously) configured client could advance the commit offset on the fixed, known group IDs - it's probably best to explicitly rewind, or at least explicitly check that the topic was played back from the beginning.
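
For reference, the two approaches look roughly like this with the standard consumer API (the group name and the partitions collection are placeholders):

// Approach 1: explicit rewind (what the library currently does, in spirit) -
// assign partitions and seek to the beginning regardless of any committed offsets.
consumer.assign(partitions);            // partitions: Collection<TopicPartition>
consumer.seekToBeginning(partitions);

// Approach 2: indirect rewind - a random, never-committed group has no stored
// position, so auto.offset.reset = earliest starts playback from the beginning.
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "replay-" + UUID.randomUUID());
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");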
