
kafka_topology_it

Message aggregation with the Kafka Streams Processor API

Problem description

We need to aggregate messages that arrive asynchronously, and in any order, on a Kafka topic, and output a single aggregated message after the transformation.

Possible Solution

Messages are <Key, Value> pairs of binary data. Our data model is as follows.

Messages that belong to the same logical entity share the same key.

Each message also carries information about the whole logical entity:

  • the message part, e.g. 1 of 3
  • total messages: the number of messages that make up the whole entity
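To make the data model concrete, here is a minimal Java sketch of a message value. The class name OcrReadyEvent and the getPart()/getTotal() accessors come from the transformer code in this README; the payload field and the range validation are assumptions added for illustration, and the project's real class may look different.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of one message value. Only getPart() and getTotal()
 * are used by the transformer; the payload field is an assumption.
 */
final class OcrReadyEvent {
    private final int part;       // 1-based position of this message, e.g. 1 of 3
    private final int total;      // number of messages that make up the whole entity
    private final String payload; // hypothetical content carried by this part

    OcrReadyEvent(int part, int total, String payload) {
        // Reject parts that cannot belong to an entity of 'total' messages
        if (total < 1 || part < 1 || part > total) {
            throw new IllegalArgumentException("part " + part + " outside 1.." + total);
        }
        this.part = part;
        this.total = total;
        this.payload = Objects.requireNonNull(payload, "payload");
    }

    int getPart() { return part; }

    int getTotal() { return total; }

    String getPayload() { return payload; }
}
```

All messages of one entity are produced with the same Kafka key, so they land on the same partition and reach the same transformer instance and state store.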

The low-level Kafka Streams Processor API is used to aggregate the messages in a state store whose contents are backed by a changelog topic, which makes it resilient to failures.

The transformer collects the parts and, once all of them are present, emits the aggregated message and removes the entry from the state store.
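The collect-and-complete step can be sketched with the JDK alone. The project's StoreItem class is not shown, so PartCollector and its methods are hypothetical stand-ins. This version records part numbers instead of incrementing a counter (the transformer calls incrementAndGetCount()), which keeps the completeness check correct if a part is delivered twice, as can happen under at-least-once processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Hypothetical per-key aggregation state. Parts may arrive in any order;
 * they are kept sorted by part number and duplicates are ignored.
 */
final class PartCollector {
    private final int total;                                          // expected number of parts
    private final SortedMap<Integer, String> parts = new TreeMap<>(); // part number -> payload

    PartCollector(int total) {
        if (total < 1) {
            throw new IllegalArgumentException("total must be >= 1");
        }
        this.total = total;
    }

    /** Records a part; returns false if that part number was already present. */
    boolean add(int part, String payload) {
        if (part < 1 || part > total) {
            throw new IllegalArgumentException("part " + part + " outside 1.." + total);
        }
        Objects.requireNonNull(payload, "payload");
        return parts.putIfAbsent(part, payload) == null;
    }

    /** Because part numbers are validated, size == total means every part 1..total was seen. */
    boolean isComplete() {
        return parts.size() == total;
    }

    /** The aggregated result: payloads in part order, regardless of arrival order. */
    List<String> finalise() {
        return new ArrayList<>(parts.values());
    }
}
```

For example, adding parts 3, 1, 1 (a duplicate) and then 2 to a PartCollector(3) leaves it incomplete until part 2 arrives, after which finalise() yields the payloads in the order 1, 2, 3.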

A punctuator is scheduled at wall-clock-time intervals to check whether incomplete messages remain in the state store. It expires them by forwarding an aggregated message flagged with an error downstream.

  • input m1...mn -> Ma (aggregated)
  • input m1...m(n-1) -> Me (aggregated erroneous/incomplete after timeout)
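The expiry rule behind the punctuator can likewise be sketched without Kafka. ExpirySweep, its timeout and the "first seen" timestamp are assumptions (the project's StoreItem.hasExpired() is not shown). Each call to sweep() plays the role of one wall-clock punctuation: it returns the keys that would be emitted as Me and drops their partial state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the expiry check performed by the punctuator. */
final class ExpirySweep {
    private final long timeoutMs;                                // assumed expiry timeout
    private final Map<String, Long> firstSeen = new HashMap<>(); // key -> wall-clock ms of its first part

    ExpirySweep(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called for every incoming part; only the first part of a key starts the clock. */
    void onPart(String key, long nowMs) {
        firstSeen.putIfAbsent(key, nowMs);
    }

    /** Called when an aggregation completes (Ma emitted), so it can no longer expire. */
    void onComplete(String key) {
        firstSeen.remove(key);
    }

    /** One punctuation tick: removes and returns every key older than the timeout. */
    List<String> sweep(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() >= timeoutMs) {
                expired.add(entry.getKey()); // would be forwarded as an error aggregate (Me)
                it.remove();                 // safe removal while iterating
            }
        }
        return expired;
    }
}
```

Measuring from the first part means an entity expires a fixed time after it started arriving; refreshing the timestamp on every part would instead give an inactivity timeout.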

The transformer

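import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OcrEventTransformer implements Transformer<String, OcrReadyEvent, KeyValue<String, OcrAggregatedEvent>> {

    private static final Logger log = LoggerFactory.getLogger(OcrEventTransformer.class);

    // How often the punctuator looks for incomplete (expired) aggregations
    private static final Duration PUNCTUATE_INTERVAL = Duration.ofSeconds(1);

    private final String stateStoreName;
    private ProcessorContext context;
    private KeyValueStore<String, StoreItem> kvStore;

    public OcrEventTransformer(String stateStoreName) {
        this.stateStoreName = stateStoreName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore<String, StoreItem>) context.getStateStore(stateStoreName);
        // Wall-clock punctuation fires based on system time, even when no new records arrive
        this.context.schedule(PUNCTUATE_INTERVAL, PunctuationType.WALL_CLOCK_TIME, timestamp -> invalidateExpiredMessages());
    }

    @Override
    public KeyValue<String, OcrAggregatedEvent> transform(String key, OcrReadyEvent event) {
        try {
            log.trace("transforming {} msg {} of {}", key, event.getPart(), event.getTotal());
            StoreItem storeItem = kvStore.get(key);
            if (storeItem == null) {
                // first part seen for this key
                storeItem = new StoreItem(event);
            } else {
                storeItem.incrementAndGetCount();
                storeItem.addEvent(event);
            }
            if (storeItem.isComplete()) {
                // all parts present: clean up the store and emit the aggregate (Ma)
                kvStore.delete(key);
                return KeyValue.pair(key, storeItem.finalise());
            }
            // still incomplete: save the updated state (also written to the changelog)
            kvStore.put(key, storeItem);
        } catch (Exception e) {
            log.error("error in transformer for key {}", key, e);
        }
        // returning null emits nothing downstream for this record
        return null;
    }

    private void invalidateExpiredMessages() {
        log.trace("invalidateExpiredMessages....");
        // try-with-resources closes the store iterator even if forwarding fails
        try (KeyValueIterator<String, StoreItem> it = kvStore.all()) {
            while (it.hasNext()) {
                KeyValue<String, StoreItem> entry = it.next();
                if (entry.value.hasExpired()) {
                    log.trace("Invalidating message key: {} val {}", entry.key, entry.value);
                    // emit an aggregate flagged as expired (Me) and drop the partial state
                    context.forward(entry.key, entry.value.finalise("expired"));
                    kvStore.delete(entry.key);
                }
            }
            context.commit(); // requests a commit; Kafka Streams performs it later
        } catch (Exception e) {
            log.error("error in punctuator", e);
        }
    }

    @Override
    public void close() {
        // nothing to release: the state store is managed by Kafka Streams
    }
}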
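
The Spring Cloud Stream integration with the DSL API

import java.util.function.Function;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CustomAggregator {

    private static final String STATE_STORE_NAME = "ocr-store";

    // Bound by the Kafka Streams binder: parts from the input topic, aggregates to the output topic
    @Bean
    public Function<KStream<String, OcrReadyEvent>, KStream<String, OcrAggregatedEvent>> customaggregator() {
        return input -> input.transform(
                () -> new OcrEventTransformer(STATE_STORE_NAME),
                Named.as("ocr-transformer"),
                STATE_STORE_NAME); // connects the state store to the transformer
    }

    // In-memory store; change logging is enabled by default, so the store is backed
    // by a changelog topic and restored after a failure or rebalance
    @Bean
    public StoreBuilder<KeyValueStore<String, StoreItem>> mystore() {
        return Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STATE_STORE_NAME),
                Serdes.String(),
                CustomSerdes.StoreItemSerde());
    }
}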

Technologies used:

  • Spring Boot
  • Spring Cloud Kafka Streams
  • Spring Cloud Stream Kafka Streams binder
  • Embedded Kafka integration tests
  • TopologyTestDriver unit tests, which run faster and cover every aspect of the custom stream processing

Embedded Kafka integration test and topology test for a custom Kafka Streams Transformer with a state store.


Contributors

aballaci
