Giter Club home page Giter Club logo

flusswerk's People

Contributors

bitzl avatar clorenz avatar datazuul avatar dependabot-preview[bot] avatar dependabot-support avatar dependabot[bot] avatar jbaiter avatar morpheus-87 avatar schmika avatar stefan-it avatar tomzirn avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

Forkers

lulzzz datazuul

flusswerk's Issues

Support one-step-processing

For most applications, the usual read/transform/write cycle is suitable (aka "ETL"), with sometimes leaving out a transforming or writing step. However, there are applications that merely convert messages or act as gateways and therefore harldy fit in that framework.

Introduce a Flow API that allows for a single processing step:

<M extends Message> Collection<Message> process(M m)

or

<M extends Message> Collection<Message> translate(M m)

Invalid JSON payload crashes worker thread

Passing an invalid JSON string as payload crashes the worker thread by terminating its endless loop.

Possible cure: In RabbitClient.receive, if the deserializer catches an exception, use response.getEnvelope().getDeliveryTag() to shift the message in to the failed queue and ACK it.

Replace FlowMessage by TraceableMessage

Flusswerk highly encourages the use of custom message implementations. As these were error prone, an abstract base class FlusswerkMessage was introduced, abstracting technical requirements. The idea was to make your implementation as safe and simple as possible without getting in your way.

To support tracing a processing flow there is a Message implementation FlowMessage. This class breaks the spirit in that it is a subclass of DefaultMessage and not an abstract technical base class anymore.

Solution

Deprecate/remove FlowMessage and Introduce TraceableMessage (here assuming #143 - removing id - is already implemented):

public abstract class TraceableMessage extends FlusswerkMessage {

  private final String tracingId;

  public TraceableMessage(String tracingId) {
    this.tracingId = tracingId;
  }

  public String getTracingId() {
    return tracingId;
  }

}

Make sure that Engine only starts once

Nothing prevents an application from calling Engine.start() multiple times with potentially undefined behavior. Engine should set an AtomicBoolean when start() or stop() are called and then either ignore calls or throw an exception (might be worth a flag for testing).

Remove generic id from Message

The Message interface has a generic type for an identifier attribute. Due to complex nested generics and different possible implementations, this generic type is mostly ommitted. At the same time, the interface's T getId() is never used outside of unit tests.

Applications build upon the framework are strongly encouraged to implement their own message classes and would not rely on the id property. There are applications that won't use the id property at all.

Suggested solution
Is it possible to remove the ID getId() property and the generic type parameter from the Message interface?

Now:

public interface Message<ID> {

  Envelope getEnvelope();

  ID getId();
}

New:

public interface Message {

  Envelope getEnvelope();

}

Simplify Engine: Remove appName/creation DefaultProcessReport

The Engine has a constructor parameterappName that is only used to create a new instance of DefaultProcessReport if no ProcessReport is given. Since the configuration is now supposed to be handled by ConfigurationProperties and Spring configurations, any creation of a ProcessReport should happen outside of Engine.

Todos:

  • Remove parameter appName
  • Remove construction of ProcessReport from Engine and make sure construction happens in FlusswerkConfiguration
  • Remove extra constructors

Errors sending messages should stop processing

Current
A Writer can send one or more messages by returning these. Currently any error sending these messages wir result in a retry of the whole Flow.

Expected
Errors sending messages should terminate the flow.

Details
Currently processing the flow, sending messages and ACKing the message all happens in the same line of error handling.

void process(Message receivedMessage) {
try {
Collection<? extends Message> messagesToSend = flow.process(receivedMessage);
if (flow.hasMessagesToSend()) {
for (Message messageToSend : messagesToSend) {
messageBroker.send(messageToSend);
}
}
messageBroker.ack(receivedMessage);
processReport.reportSuccess(receivedMessage);

Solutions

  1. Add explicit error handling for sending messages
  2. or move IOException to the catch block of StopProcessingException

Failed messages remain unacknowledged in origin queue

Observed with messages, which ran into a FinallyFailedProcessException: The number of messages, which were shifted into the failed queue is equal to the number of messages, which remain unacknowledged in the origin queue.

Rename Maven Artifacts

The Maven artifact names have to follow the recently introduced conventions and change the group-id to com.github.dbmdz.flusswerk.

As doing this is a breaking change, we then can also straighten the artifact ids and directory names:

module current artifact id new artifact id
parent/root dc-flusswerk-parent flusswerk
engine dc-flusswerk-engine framework
examples examples-plain examples-plain
examples (Spring Boot) examples-spring-boot examples-spring-boot
integration tests dc-flusswerk-integration-tests integration-tests
Spring Boot starter spring-boot-starter-flusswerk spring-boot-starter

Directories should be named same as the artifact id.

The Spring Boot examples would be better off in a top-level project as it is supposed to use the Spring Boot Maven parent if possible.

Improve reporting

The reporting is handled by implentations of ProcessReport. As the default implementation is quite verbose and includes the full message but no generic application information, upon common request an alternative CompactProcessReport was added.

CompactProcessReport should become the new default ProcessReporter. Engine might need an application name for construction then.

Remove HasFlowId and improve propagation logic

Problem
To trace a processing flow to a workflow graph one has to enable tracing via FlowBuilder.propagateFlowIds(true) and either use FlowMessage or implement the interface ?HasFlowId. This is complicated, as the information if tracing is desired is distributed (flag and interface). There are also multiple ways to use tracing in a message class (use FlowMessageorHasFlowId) with the less recommended approach beeing the most visible one (using FlowMessagemeans for most people just to useDefaultMessage`).

Solution
For tracing

  • check if incoming and outgoing messages implement TraceableMessage
  • check if incoming message has a value for flow id and set that for the outgoing message

Remove old tracing mechanisms:

  • remove FlowMessage as suggested in #147
  • remove HasFlowId
  • remove the tracing flag from FlowBuilder, Flow and Job

Questions
Can we have sanity checks if both incoming and outgoing message types are traceable messages on application start?

Improve FlowBuilderApi

The FlowBuilde api has quite a big surface in which developers can get a little lost. Therefore a simplified builder pattern is proposed:

Simple Construction
FlowBuilder should have no public constructor, only a minimal set of useful static construction methods (parameters are classes or types):

// FlowBuilder for Flow with Transformer Document → IndexRequest
FlowBuilder2.flow(IndexMessage.class, Document.class, IndexRequest.class);

// FlowBuilder for Flow with Transformer PublishingInfo → PublishingInfo (or no Transformer at all)
FlowBuilder2.flow(PublishingMessage.class, PublishingInfo.class);

// Using Type class to support generic types
FlowBuilder2.flow(
    new Type<SomeMessage>(){},
    new Type<Map<String, Item>>(){},
    new Type<List<Document>>(){});

Drop supplier support for Reader/Transformer/Writer
As Reader, Transformer and Writer are supposed to be stateless (and threadsafe), on early requests a supplier functionality was added. Instead of a Reader, Transformer or Writer instance, a developer could register a supplier providing new instances for each processed message.

This is rarely used now and often a sign of deeper problems in the involved components. At the same time, it doubles the amount of API methods and adds weird type errors. Therefore, suppliers should be dropped. In the rare cases such a behavior is needed, developers can implement a sufficient replacement in the respective Reader, Transformer or Writer.

Offer seperate APIs for ETL and Single Step Flows
If #151 (Support one-step-processing) is implemented, a one-step-flow should not offer to register a Reader,Transformer or Writer.

Stateful Builder API
In theory there can be a statemachine for creating

stateDiagram
    New: New Flow
    note right of New
      .read(Function<R, T> reader)
    end note

    Reader: Flow with Reader
    note right of Reader
      .transform(Function<T, W> transformer)
      .write(Consumer<W> writer)
      .write(Function<W, Message> writer)
      .write(Function<W, List<Message>> writer)
    end note

    Transformer: Flow with Reader and Transformer
    note left of Transformer
      .write(Consumer<W> writer)
      .write(Function<W, Message> writer)
      .write(Function<W, List<Message>> writer)
    end note


    Complete: Flow Complete
    note right of Complete
      .write(Consumer<W> writer)Complete
      .write(Function<W, Message> writer)
      .write(Function<W, List<Message>> writer)
    end note

    Monitoring: Has Monitoring
    note right of Monitoring
      .cleanup()
      .build()
    end note

    Cleanup: Has Cleanup Task
    note left of Cleanup
      .monitoring(Consumer<FlowStatus> monitoring)
      .build()
    end note

    [*] --> New: FlowBuilder.flow()
    New --> Reader: read()
    Reader --> Transformer: transform()
    Reader --> Complete: write()
    Transformer --> Complete: write()
    Complete --> Monitoring: monitoring()
    Complete --> Cleanup: cleanup()
    Monitoring --> Cleanup: cleanup()
    Cleanup --> Monitoring: monitoring()
    Cleanup --> [*]: build()
    Monitoring --> [*]: build()

Allow multiple input queues with precendence

Use Case

There are multiple projects with items to process. Because items from one project always have priority over others multiple queues are used. Processing these items takes a lot of resources, so running one worker for each queue is not feasible.

Suggested solution

Add the possibility to configure multiple input queues with precedence in the same order as the queues are added. Then check each queue in order for a new message until the first message is retrieved. This way, any subsequent queue will only be queried if the queue before was empty, ensuring expected precendence of processing.

Example:

MessageBroker messageBroker = new MessageBrokerBuilder()
        .readFrom("input.priority1", "input.priority2", "input.priority3")
        ...

would mean that first all messages from input.priority1 will be processed. When input.priority1 is empty, then messages from input.priority2 will be processed. And if input.priority1 and input.priority2 are both empty, then messages from input.priority3 will be processed.

Upgrade of Maven Checkstyle plugin to 3.1.0 breaks Maven build

The upgrade completely breaks the Maven build:

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-checkstyle-plugin:3.1.0:check (validate) on project dc-flusswerk-parent: Failed during checkstyle configuration: cannot initialize module TreeWalker - cannot initialize module FileContentsHolder - Unable to instantiate 'FileContentsHolder' class, it is also not possible to instantiate it as .FileContentsHolder, FileContentsHolderCheck, .FileContentsHolderCheck. Please recheck that class name is specified as canonical name or read how to configure short name usage https://checkstyle.org/config.html#Packages. Please also recheck that provided ClassLoader to Checker is configured correctly. -> [Help 1]

Originally posted by @bitzl in #66 (comment)

Developer Experience: Message Base Names

Discussion

Currently, the recommended base class ist named FlusswerkMessage and there is a interface for traceable messages HasFlowId. According to current planning, there will be the current base message and a new one for messages with tracing ids.

Idea 1

Old New
FlusswerkMessage BaseMessage
HasFlowId TraceableMessage
FlowMessage TraceableMessage

Default Metrics collection (Spring Boot)

To monitor a Flusswerk applications performance Flow has a metrics callback that calls a Consumer<FlowStatus>. Most Spring Boot based applications will have the same implementation of a Metrics class using the Micrometer library or extend a similar Metrics implementation...

The Spring Boot Starter should provide a default Metrics implementation that can be overridden by defining a custom Metrics bean.

Example

public class Metrics implements Consumer<FlowStatus> {

  private final Map<Status, Counter> executionTime;
  private final Map<Status, Counter> processedItems;

  public Metrics(MeterFactory meterFactory) {
    this.executionTime = new EnumMap<>(Status.class);
    this.processedItems = new EnumMap<>(Status.class);

    for (Status status : Status.values()) {
      processedItems.put(status, meterFactory.counter("processed.items", status));
      executionTime.put(status, meterFactory.counter("execution.time", status));
    }
  }

  public void accept(FlowStatus flowStatus) {
    var status = flowStatus.getStatus();
    processedItems.get(status).increment();
    executionTime.get(status).increment(flowStatus.duration());
  }
}

Remove DefaultMessage

Problem

Flusswerk encourages implementation of custom messages that fit the respective applications. For a quick start, a DefaultMessage class is provided that has an String id and da Map<String, String> data.

This approach has several drawbacks:

Overuse of DefaultMessage in applications

There have are many applications where Developers use the HashMap<String, String> of DefaultMessage instead of a custom implementation, not only losing type safety and type information, but also revert to frequent String parsing and String comparison.

A custom message implementation also has a simpler JSON structure which greatly improves debugging and testing .

Mistaken use inside the Framework

The existence of DefaultMessage can be misleading on the Framework development itself, as to the confusing error handling implementation when receiving a message fails where a DefaultMessage is constructed, mimicking a broken message of arbitrary type:

} catch (Exception e) {
Message invalidMessage = new DefaultMessage();
invalidMessage.getEnvelope().setBody(body);
invalidMessage.getEnvelope().setDeliveryTag(response.getEnvelope().getDeliveryTag());
invalidMessage.getEnvelope().setSource(queueName);
throw new InvalidMessageException(invalidMessage, e.getMessage());

Solution

Remove DefaultMessage and require a custom message implementation. Almost all usage is only in examples and tests where a suitable implementation can be provided. For easy implementation of custom messages, the abstract FlusswerkMessage can be used (also TraceableMessage, see #147).

Refactor InvalidMessageException

The InvalidMessageException currently relies on a Message as container for all information, although it occours when parsing the incoming JSON fails. InvalidMessageException should either only rely on a JSON String or be replaced by a IOException.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.