
aws-kinesisanalytics-flink-connectors' People

Contributors

abseth-amzn, awslankakamal, blake-wilson, dannycranmer, dependabot[bot], hyandell, vlanka


aws-kinesisanalytics-flink-connectors' Issues

Kinesis Analytics application having downtime after migrating to Kinesis connector v2 but no error is reported

Hello,
I have just migrated to Kinesis connector v2. My Flink application works on my local Flink cluster, but when I deploy it to Kinesis Data Analytics the application experiences downtime and no error is reported.

I switched the KDA log level to DEBUG, and the only possibly relevant log entry is this one:

Relevant error log ` {"locationInformation":"org.apache.flink.runtime.exceptionclassifier.ExceptionClassifier.classify(ExceptionClassifier.java:112)","logger":"org.apache.flink.runtime.exceptionclassifier.ExceptionClassifier","message":"Exception type is USER from filter results [UserClassLoaderExceptionFilter -> NONE, UserAPIExceptionFilter -> NONE, UserSerializationExceptionFilter -> USER, UserFunctionExceptionFilter -> SKIPPED, OutOfMemoryExceptionFilter -> NONE, TooManyOpenFilesExceptionFilter -> NONE, KinesisServiceExceptionFilter -> NONE].","throwableInformation":["org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.","\tat org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)","\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:369)","\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)","\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:360)","\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)","\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:360)","\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)","\tat org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:133)","\tat org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:275)","\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)","\tat java.lang.Thread.run(Thread.java:748)","Caused by: java.io.InvalidClassException: com.fasterxml.jackson.databind.AnnotationIntrospector; local class incompatible: stream classdesc serialVersionUID = -1733064445206768782, local class serialVersionUID = -8026823904936543468","\tat java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)","\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)","\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)","\tat java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1964)","\tat java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1830)","\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2121)","\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)","\tat java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)","\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)","\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)","\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)","\tat java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)","\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)","\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)","\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)","\tat java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)","\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)","\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)","\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)","\tat java.io.ObjectInputStream.readArray(ObjectInputStream.java:2054)","\tat 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1635)","\tat java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)","\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)","\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)","\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)","\tat java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2366)","\tat java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)","\tat java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2148)","\tat java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)","\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:483)","\tat java.io.ObjectInputStream.readObject(ObjectInputStream.java:441)","\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)","\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)","\tat org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)","\tat org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)","\tat org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)","\t... 10 more"],"threadName":"flink-akka.actor.default-dispatcher-167","applicationVersionId":"64","messageSchemaVersion":"1","messageType":"INFO"} `
pom.xml I used for the app module:

<modelVersion>4.0.0</modelVersion>
<groupId>fr.cbm</groupId>
<artifactId>lc-logs-bus</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
    <mainClass>fr.cbm.EventDispatcherStreamingJob</mainClass>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <flink.version>1.8.3</flink.version>
    <kda.version>1.1.0</kda.version>
    <kda-connector.version>2.0.0</kda-connector.version>
    <jackson.version>2.7.9</jackson.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-bom</artifactId>
            <version>1.11.870</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-dynamodb</artifactId>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-kinesisanalytics-runtime</artifactId>
        <version>${kda.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-kinesisanalytics-flink</artifactId>
        <version>${kda-connector.version}</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-logs</artifactId>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sts</artifactId>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${jackson.version}</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jdk8</artifactId>
        <version>${jackson.version}</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-parameter-names</artifactId>
        <version>${jackson.version}</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>${jackson.version}</version>
    </dependency>

    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.6.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-engine</artifactId>
        <version>5.6.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.16</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>

</dependencies>

<build>

    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${java.version}</source>
                <target>${java.version}</target>
            </configuration>
        </plugin>

        <plugin>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.22.2</version>
        </plugin>

        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>${mainClass}</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

`FlinkKinesisFirehoseProducer#invoke` creates a new thread for each added record on single-core architectures, leading to very high resource utilization

Currently, when running with 1 CPU (such as 1 KPU in Kinesis Data Analytics for Apache Flink), FlinkKinesisFirehoseProducer#invoke is extremely inefficient. This is because of the call to handleAsync:

    @Override
    public void invoke(final OUT value, final Context context) throws Exception {
        // ...
        firehoseProducer
                .addUserRecord(new Record().withData(serializedValue))
                .handleAsync((record, throwable) -> {
                    // ...
                });
    }

When one looks at the implementation of handleAsync and its call chain:

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(defaultExecutor(), fn);
    }

    public Executor defaultExecutor() {
        return ASYNC_POOL;
    }

    private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

    private static final boolean USE_COMMON_POOL =
        (ForkJoinPool.getCommonPoolParallelism() > 1);       

    static final class ThreadPerTaskExecutor implements Executor {
        public void execute(Runnable r) { new Thread(r).start(); }
    }

Thus, ThreadPerTaskExecutor is used when there is only 1 CPU available on the host machine. In my tests, I can see that it creates roughly 20k short-lived threads per minute, which is very expensive in a tight loop.

The proposal is to not rely on the default behavior of the ForkJoinPool and instead pass an Executor explicitly, limiting the impact of this type of issue on resource-limited hosts. Ideally this should be exposed to sink consumers so that it is configurable, per the open-closed principle.
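
One possible shape of the fix, sketched against the invoke snippet above (callbackExecutor is a hypothetical field introduced here, not part of the connector): pass an explicit, bounded Executor to the two-argument handleAsync overload so callbacks are not scheduled on ThreadPerTaskExecutor.

    // Hypothetical field on the sink: a small, reused executor for completion callbacks.
    private final java.util.concurrent.ExecutorService callbackExecutor =
            java.util.concurrent.Executors.newSingleThreadExecutor();

    @Override
    public void invoke(final OUT value, final Context context) throws Exception {
        // ...
        firehoseProducer
                .addUserRecord(new Record().withData(serializedValue))
                .handleAsync((record, throwable) -> {
                    // ... same handling as today
                    return record;
                }, callbackExecutor); // overload that accepts an explicit Executor
    }

The executor would also need to be shut down in close(), and could be made pluggable if it is exposed through the sink's configuration.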

Too low interval between flushes

We had problems writing a busy stream to Firehose, since it seems that the limit of 500 records per PutRecordBatch is also the max number of records per second you can send.

With default settings, Firehose was getting throttled and the actual throughput was very low. This was solved by adding a delay in Flink before sending to the sink, and now our throughput almost reaches the 1 MB/s limit for our region.

I guess something similar could be achieved by increasing bufferTimeoutBetweenFlushes. Shouldn't this parameter be increased by default, then, or am I missing something else?
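
For reference, a rough sketch of the delay workaround described above (ThrottleMap is a hypothetical class written for illustration, not the reporter's actual code): a map step that sleeps between elements, capping the per-subtask rate forwarded to the sink and back-pressuring the upstream operators.

    import org.apache.flink.api.common.functions.MapFunction;

    // Hypothetical throttling step: forwards at most ~maxRecordsPerSecond elements per second per subtask.
    public class ThrottleMap<T> implements MapFunction<T, T> {
        private final long pauseMillis;

        public ThrottleMap(long maxRecordsPerSecond) {
            this.pauseMillis = Math.max(1, 1000 / maxRecordsPerSecond);
        }

        @Override
        public T map(T value) throws Exception {
            Thread.sleep(pauseMillis); // crude rate limiting; back-pressures the upstream
            return value;
        }
    }

    // Usage: stream.map(new ThrottleMap<>(500)).addSink(firehoseSink);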

SQS?

Hey folks, I'm baffled by the nonexistence of a popular SQS sink for Flink.... It's such a strange situation that I feel like I must be missing something obvious. Is it considered trivial to implement, or impossible to achieve commonality across diverse use cases, or what? :)
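
For context, here is a minimal SQS sink sketch using the AWS SDK v1 (SqsSink is a hypothetical class written for illustration; it has no batching, retries, or checkpoint integration, which is where most of the real design work for a reusable sink would be):

    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    // Naive sketch: one SendMessage call per element, no delivery guarantees.
    public class SqsSink extends RichSinkFunction<String> {
        private final String queueUrl;
        private transient AmazonSQS sqs;

        public SqsSink(String queueUrl) {
            this.queueUrl = queueUrl;
        }

        @Override
        public void open(Configuration parameters) {
            sqs = AmazonSQSClientBuilder.defaultClient();
        }

        @Override
        public void invoke(String value, Context context) {
            sqs.sendMessage(queueUrl, value);
        }

        @Override
        public void close() {
            if (sqs != null) {
                sqs.shutdown();
            }
        }
    }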

Add support for exponential backoff in `FirehoseProducer`

Currently, FirehoseProducer performs linear backoff via:

//Full Jitter: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
long timeToSleep = RandomUtils.nextLong(0, Math.min(configuration.getMaxBackOffInMillis(), (configuration.getBaseBackOffInMillis() * 2 * attempts)));

While the code comment points to https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/, it fails to implement exponentiation:
(The blog post's Full Jitter formula: sleep = random_between(0, min(cap, base * 2^attempt)).)

Compare this with FullJitterBackoff#calculateFullJitterBackoff from the Flink Kinesis connector:

public long calculateFullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
   long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
   return (long) (seed.nextDouble() * exponentialBackoff);
}
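
For comparison, a full-jitter version of the connector's backoff could look roughly like this (fullJitterBackoffMillis is a hypothetical helper; baseMillis and maxMillis correspond to the existing getBaseBackOffInMillis() and getMaxBackOffInMillis() settings):

    import java.util.concurrent.ThreadLocalRandom;

    // Hypothetical helper implementing Full Jitter: sleep = random(0, min(max, base * 2^attempts)).
    static long fullJitterBackoffMillis(long baseMillis, long maxMillis, int attempts) {
        long cap = (long) Math.min(maxMillis, baseMillis * Math.pow(2, attempts));
        return ThreadLocalRandom.current().nextLong(0, Math.max(1, cap) + 1);
    }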

Add support for the Firehose equivalent of `AdaptivePollingRecordPublisher`

Currently, the FirehoseProducer has a very simple linear backoff model. When rate limiting on the Firehose stream is encountered, the recovery time could be greatly improved by implementing a strategy similar to AdaptivePollingRecordPublisher's adaptRecordsToRead method:

	/**
	 * Calculates how many records to read each time through the loop based on a target throughput
	 * and the measured frequency of the loop.
	 * @param runLoopTimeNanos The total time of one pass through the loop
	 * @param numRecords The number of records of the last read operation
	 * @param recordBatchSizeBytes The total batch size of the last read operation
	 * @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
	 */
	private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
			int maxNumberOfRecordsPerFetch) {
		if (numRecords != 0 && runLoopTimeNanos != 0) {
			long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
			// Adjust number of records to fetch from the shard depending on current average record size
			// to optimize 2 Mb / sec read limits
			double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
			double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
			maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
			// Ensure the value is greater than 0 and not more than 10000L
			maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));

			// Set metrics
			metricsReporter.setLoopFrequencyHz(loopFrequencyHz);
			metricsReporter.setBytesPerRead(bytesPerRead);
		}
		return maxNumberOfRecordsPerFetch;
	}

Make `FlinkKinesisFirehoseProducer#createKinesisFirehoseClient` protected to allow for instrumentation

Currently, FlinkKinesisFirehoseProducer#createKinesisFirehoseClient is package-private, which makes it awkward to subclass the producer and instrument metrics or other cross-cutting concerns around client operations. It would be great if the following were possible:

    /**
     * {@inheritDoc}
     * <p>See <a href="https://github.com/aws/aws-kinesisanalytics-flink-connectors/issues/15">aws-kinesisanalytics-flink-connector #15</a>
     * for description of why this is needed.
     */
    @Nonnull
    @Override
    protected AmazonKinesisFirehose createKinesisFirehoseClient() {
        return new MeteredAmazonKinesisFirehose(getRuntimeContext().getMetricGroup(), super.createKinesisFirehoseClient());
    }
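
MeteredAmazonKinesisFirehose above is not part of the connector; a hypothetical sketch of it, assuming AWS SDK v1's AbstractAmazonKinesisFirehose base class and Flink's MetricGroup API, could look like this (only putRecordBatch is shown; other calls would delegate the same way):

    import com.amazonaws.services.kinesisfirehose.AbstractAmazonKinesisFirehose;
    import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
    import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
    import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.MetricGroup;

    // Hypothetical decorator that counts putRecordBatch calls and delegates to the real client.
    public class MeteredAmazonKinesisFirehose extends AbstractAmazonKinesisFirehose {
        private final AmazonKinesisFirehose delegate;
        private final Counter putRecordBatchCalls;

        public MeteredAmazonKinesisFirehose(MetricGroup metrics, AmazonKinesisFirehose delegate) {
            this.delegate = delegate;
            this.putRecordBatchCalls = metrics.counter("putRecordBatchCalls");
        }

        @Override
        public PutRecordBatchResult putRecordBatch(PutRecordBatchRequest request) {
            putRecordBatchCalls.inc();
            return delegate.putRecordBatch(request);
        }
    }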

`FirehoseProducer#submitBatchWithRetry` resubmits successful records when a batch attempt has partial failures

Currently, when FirehoseProducer#submitBatchWithRetry invokes submitBatch and there is at least one failure, the whole batch is retried, including records that already succeeded.

This will generally lead to a large number of unnecessary duplicate records ending up in the downstream Firehose delivery stream. Instead, successful records should be removed and only the failures retried, which minimizes duplication downstream.
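
One possible shape of the fix, sketched as a hypothetical helper (failedRecords is not existing connector code): PutRecordBatchResult returns one response entry per submitted record, in order, so the retry set can be built from the entries that carry an error code.

    import java.util.ArrayList;
    import java.util.List;
    import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
    import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
    import com.amazonaws.services.kinesisfirehose.model.Record;

    // Hypothetical helper: keep only the records whose response entry reports an error.
    static List<Record> failedRecords(List<Record> submitted, PutRecordBatchResult result) {
        List<PutRecordBatchResponseEntry> responses = result.getRequestResponses();
        List<Record> toRetry = new ArrayList<>();
        for (int i = 0; i < responses.size(); i++) {
            if (responses.get(i).getErrorCode() != null) {
                toRetry.add(submitted.get(i));
            }
        }
        return toRetry;
    }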
