aws / aws-kinesisanalytics-flink-connectors
This library contains various Apache Flink connectors to connect to AWS data sources and sinks.
License: Apache License 2.0
Hello,
I have just migrated to Kinesis connector v2. My Flink application works in my local Flink cluster, but when I deploy it to Kinesis Data Analytics the application experiences downtime and no error is reported.
I switched the KDA log level to DEBUG and the only possibly relevant log is this one:
Here is my pom.xml:
<groupId>fr.cbm</groupId>
<artifactId>lc-logs-bus</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<mainClass>fr.cbm.EventDispatcherStreamingJob</mainClass>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.8.3</flink.version>
<kda.version>1.1.0</kda.version>
<kda-connector.version>2.0.0</kda-connector.version>
<jackson.version>2.7.9</jackson.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
<version>1.11.870</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesisanalytics-runtime</artifactId>
<version>${kda.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-kinesisanalytics-flink</artifactId>
<version>${kda-connector.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-logs</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.16</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${mainClass}</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Currently, when running under 1 CPU (such as using 1 KPU in Kinesis Data Analytics for Apache Flink), FlinkKinesisFirehoseProducer#invoke
is extremely inefficient. This is because of the call to handleAsync:
@Override
public void invoke(final OUT value, final Context context) throws Exception {
// ...
firehoseProducer
.addUserRecord(new Record().withData(serializedValue))
.handleAsync((record, throwable) -> {
// ...
});
}
When one looks at the implementation of handleAsync
and its call chain:
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn) {
return uniHandleStage(defaultExecutor(), fn);
}
public Executor defaultExecutor() {
return ASYNC_POOL;
}
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
Thus, ThreadPerTaskExecutor
is used when only 1 CPU is available on the host machine. In my tests, I can see it creating 20k short-lived threads in one minute, which is very expensive in a tight loop.
The proposal is to not rely on the default behavior of the ForkJoinPool
and to instead pass an Executor
to limit the impact of this type of issue for resource limited hosts. Ideally this can be exposed to sink consumers so that it is configurable per the open-closed principle.
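As a rough sketch of the proposal, the overload `handleAsync(BiFunction, Executor)` lets the caller supply a bounded, reusable pool, so the stage never falls back to ThreadPerTaskExecutor on a 1-CPU host. The `process` helper and the pool size here are illustrative, not the connector's API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandleAsyncExample {

    // Run the handleAsync continuation on a caller-supplied bounded pool instead
    // of CompletableFuture's default ASYNC_POOL (hypothetical helper for illustration).
    static String process(ExecutorService pool) throws Exception {
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "record", pool)
                // Explicit Executor overload: no per-task thread is spawned
                .handleAsync((record, throwable) -> throwable == null ? record : "failed", pool);
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        System.out.println(process(pool)); // prints "record"
        pool.shutdown();
    }
}
```

Exposing such an `Executor` to sink consumers, as proposed above, would make the threading model configurable rather than dependent on `ForkJoinPool.getCommonPoolParallelism()`.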
We had problems writing a busy stream to Firehose, since it seems that the limit of 500 records per PutRecordBatch is also the maximum number of records per second you can send.
With default settings, Firehose was getting throttled and the actual throughput was very low. This was solved by adding a delay in Flink before sending to the sink, and now our throughput almost reaches the 1 MB/s limit for our region.
I guess something similar could be achieved by increasing bufferTimeoutBetweenFlushes. Shouldn't this parameter be increased by default then, or am I missing something else?
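As a worked example of why the flush timeout matters (the 50 ms and 1000 ms intervals here are hypothetical, not the connector's defaults): if every flush can carry up to 500 records, the flush frequency directly caps the submission rate, and a short timeout can push the producer far past a throttled stream's budget:

```java
public class FlushRateDemo {

    // Upper bound on records submitted per second for a given flush interval,
    // assuming each flush sends a full PutRecordBatch (500-record limit).
    static double recordsPerSecond(int maxRecordsPerBatch, long bufferTimeoutMillis) {
        double flushesPerSecond = 1000.0 / bufferTimeoutMillis;
        return flushesPerSecond * maxRecordsPerBatch;
    }

    public static void main(String[] args) {
        // Hypothetical short timeout: 20 flushes/s * 500 records = 10000 records/s
        System.out.println(recordsPerSecond(500, 50));   // 10000.0
        // Raising the timeout to 1000 ms caps submission at 500 records/s
        System.out.println(recordsPerSecond(500, 1000)); // 500.0
    }
}
```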
Hey folks, I'm baffled by the nonexistence of a popular SQS sink for Flink... It's such a strange situation that I feel I must be missing something obvious. Is it considered trivial to implement, or impossible to achieve commonality across diverse use cases, or what? :)
Currently, FirehoseProducer
performs linear backoff via:
//Full Jitter: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
long timeToSleep = RandomUtils.nextLong(0, Math.min(configuration.getMaxBackOffInMillis(), (configuration.getBaseBackOffInMillis() * 2 * attempts)));
While the code comment points to https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/, it fails to implement exponentiation:
Compare this with FullJitterBackoff
's calculateFullJitterBackoff:
public long calculateFullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
return (long) (seed.nextDouble() * exponentialBackoff);
}
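To make the difference concrete, here is a small standalone sketch of the full-jitter formula from calculateFullJitterBackoff above (the base of 100 ms, the 10 s cap, and the fixed seed are illustrative, not the library's values). With `power = 2`, the backoff ceiling doubles per attempt instead of growing linearly as in the current `baseBackOff * 2 * attempts`:

```java
import java.util.Random;

public class FullJitterDemo {
    // Fixed seed so the demo is repeatable; real code would use an unseeded Random
    static final Random seed = new Random(42);

    // Full jitter: sleep a uniform random time in [0, min(max, base * power^attempt))
    static long calculateFullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (seed.nextDouble() * exponentialBackoff);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            long sleep = calculateFullJitterBackoff(100, 10_000, 2.0, attempt);
            long ceiling = (long) Math.min(10_000, 100 * Math.pow(2.0, attempt));
            System.out.println("attempt " + attempt + ": sleep " + sleep + " ms (ceiling " + ceiling + " ms)");
        }
    }
}
```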
Currently, the FirehoseProducer
has a very simple linear backoff model. When rate limiting on the Firehose stream is encountered, the recovery time could be greatly improved by implementing a strategy similar to AdaptivePollingRecordPublisher
's adaptRecordsToRead method:
/**
* Calculates how many records to read each time through the loop based on a target throughput
* and the measured frequency of the loop.
* @param runLoopTimeNanos The total time of one pass through the loop
* @param numRecords The number of records of the last read operation
* @param recordBatchSizeBytes The total batch size of the last read operation
* @param maxNumberOfRecordsPerFetch The current maxNumberOfRecordsPerFetch
*/
private int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes,
int maxNumberOfRecordsPerFetch) {
if (numRecords != 0 && runLoopTimeNanos != 0) {
long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
// Adjust number of records to fetch from the shard depending on current average record size
// to optimize 2 Mb / sec read limits
double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos;
double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes);
// Ensure the value is greater than 0 and not more than 10000L
maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
// Set metrics
metricsReporter.setLoopFrequencyHz(loopFrequencyHz);
metricsReporter.setBytesPerRead(bytesPerRead);
}
return maxNumberOfRecordsPerFetch;
}
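One possible shape for an adaptive strategy on the producer side, sketched as a standalone class (the halving/step constants and the `onResult` hook are illustrative, not the connector's API): shrink the batch size multiplicatively when Firehose throttles, and recover additively on success, in the spirit of adaptRecordsToRead above:

```java
public class AdaptiveBatchSize {
    static final int MIN_BATCH = 1;
    static final int MAX_BATCH = 500; // PutRecordBatch hard limit

    private int batchSize = MAX_BATCH;

    // Called after each submit attempt; returns the batch size for the next attempt.
    int onResult(boolean throttled) {
        if (throttled) {
            // Multiplicative decrease: back off quickly under rate limiting
            batchSize = Math.max(MIN_BATCH, batchSize / 2);
        } else {
            // Additive increase: probe back toward the limit gradually
            batchSize = Math.min(MAX_BATCH, batchSize + 25);
        }
        return batchSize;
    }

    public static void main(String[] args) {
        AdaptiveBatchSize adaptive = new AdaptiveBatchSize();
        System.out.println(adaptive.onResult(true));  // 250
        System.out.println(adaptive.onResult(true));  // 125
        System.out.println(adaptive.onResult(false)); // 150
    }
}
```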
Currently FlinkKinesisFirehoseProducer#createKinesisFirehoseClient
is package-private. This means there is no clean way to create a refinement that instruments metrics, etc. against client operations. It would be great if the following were possible:
/**
* {@inheritDoc}
* <p>See <a href="https://github.com/aws/aws-kinesisanalytics-flink-connectors/issues/15">aws-kinesisanalytics-flink-connector #15</a>
* for description of why this is needed.
*/
@Nonnull
@Override
protected AmazonKinesisFirehose createKinesisFirehoseClient() {
return new MeteredAmazonKinesisFirehose(getRuntimeContext().getMetricGroup(), super.createKinesisFirehoseClient());
}
Currently, when FirehoseProducer#submitBatchWithRetry
invokes submitBatch
and there is at least one failure, the whole batch is retried, including the successful records.
This will generally lead to a large number of unnecessary duplicate records ending up in the downstream Firehose. Instead, successful records should be removed and only the failures retried, minimizing duplication downstream.
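A minimal sketch of the idea, using plain lists in place of the SDK's per-record PutRecordBatch response entries (the `failedRecords` helper and string types are illustrative): keep only the records whose response entry carries an error code, and resubmit just those:

```java
import java.util.ArrayList;
import java.util.List;

public class RetryOnlyFailures {

    // errorCodes.get(i) is null when record i succeeded, non-null (e.g.
    // "ServiceUnavailableException") when it failed -- mirroring how
    // PutRecordBatch reports a per-record result entry for each input record.
    static List<String> failedRecords(List<String> batch, List<String> errorCodes) {
        List<String> retry = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (errorCodes.get(i) != null) {
                retry.add(batch.get(i)); // only failures are carried into the next attempt
            }
        }
        return retry;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("a", "b", "c");
        List<String> errors = new ArrayList<>();
        errors.add(null);                           // "a" succeeded
        errors.add("ServiceUnavailableException");  // "b" was throttled
        errors.add(null);                           // "c" succeeded
        System.out.println(failedRecords(batch, errors)); // [b]
    }
}
```

Retrying only this filtered list keeps "a" and "c" from being written to the delivery stream twice.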
Currently, the implementation of FlinkKinesisFirehoseProducer
does not call SerializationSchema#open.
This causes problems when a client SerializationSchema
needs to perform some initialization in open
(e.g. to initialize metrics).