flink-connectors's Introduction

Pravega is an open source distributed storage service implementing Streams. It offers Stream as the main primitive for the foundation of reliable storage systems: a high-performance, durable, elastic, and unlimited append-only byte stream with strict ordering and consistency.

To learn more about Pravega, visit https://pravega.io

Prerequisites

  • Java 11+

Although JDK 11+ is required to build this project, the client artifacts (and their dependencies) must remain compatible with a Java 8 runtime. All other components are built and run using JDK 11+.

The clientJavaVersion project property determines the version used to build the client (defaults to 8).

Building Pravega

Check out the source code:

git clone https://github.com/pravega/pravega.git
cd pravega

Build the Pravega distribution:

./gradlew distribution

Install the Pravega jar files into the local Maven repository. This is handy for running pravega-samples locally against a custom version of Pravega.

./gradlew install

Running unit tests:

./gradlew test

Setting up your IDE

Pravega uses Project Lombok, so make sure your IDE is set up with the required plugins. IntelliJ is recommended.

To import the source into IntelliJ:

  1. Import the project directory into IntelliJ. It will automatically detect the Gradle project and import it correctly.
  2. Enable annotation processing by going to Build, Execution, Deployment -> Compiler -> Annotation Processors and checking 'Enable annotation processing'.
  3. Install the Lombok Plugin. This can be found in Preferences -> Plugins. Restart your IDE.
  4. Pravega should now compile properly.

For Eclipse, you can generate Eclipse project files by running ./gradlew eclipse.

Note: Some unit tests create (and delete) a significant number of files. For improved performance on Windows machines, be sure to add the appropriate 'Microsoft Defender' exclusion.

Releases

The latest Pravega releases can be found on the GitHub Release project page.

Snapshot artifacts

All snapshot artifacts from master and release branches are available in the GitHub Packages Registry.

Add the following to your repositories list and import dependencies as usual.

maven {
    url "https://maven.pkg.github.com/pravega/pravega"
    credentials {
        username = "pravega-public"
        password = "\u0067\u0068\u0070\u005F\u0048\u0034\u0046\u0079\u0047\u005A\u0031\u006B\u0056\u0030\u0051\u0070\u006B\u0079\u0058\u006D\u0035\u0063\u0034\u0055\u0033\u006E\u0032\u0065\u0078\u0039\u0032\u0046\u006E\u0071\u0033\u0053\u0046\u0076\u005A\u0049"
    }
}

Note: GitHub Packages requires authentication to download packages, so the credentials above are required. Use the provided password as is; please do not decode it.

If you need a dedicated token to use in your repository (and GitHub Actions), please reach out to us.

As an alternative, you can use JitPack (https://jitpack.io/#pravega/pravega) to get pre-release artifacts.

Quick Start

Read the Getting Started page for more information, and visit the sample-apps repo for more applications.

Running Pravega

Pravega can be installed locally or in a distributed environment. The installation and deployment of Pravega are covered in the Running Pravega guide.

Support

Don’t hesitate to ask! Contact the developers and community on Slack (signup) if you need any help. If you find a bug, open an issue on GitHub Issues.

Documentation

The Pravega documentation is hosted on the website: https://pravega.io/docs/latest or in the documentation directory of the source code.

Contributing

Become one of the contributors! We strive to build a welcoming and open community for anyone who wants to use the system or contribute to it. Here we describe how to contribute to Pravega! You can see the roadmap document here.

About

Pravega is 100% open source and community-driven. All components are available under the Apache 2 License on GitHub.

flink-connectors's Issues

Exception thrown while trying to connect to controller.

Problem description

        at io.pravega.client.stream.impl.ControllerResolverFactory.lambda$newNameResolver$103(ControllerResolverFactory.java:68)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at io.pravega.client.stream.impl.ControllerResolverFactory.newNameResolver(ControllerResolverFactory.java:69)
        at io.grpc.internal.ManagedChannelImpl.getNameResolver(ManagedChannelImpl.java:440)
        at io.grpc.internal.ManagedChannelImpl.<init>(ManagedChannelImpl.java:394)
        at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:350)
        at io.pravega.client.stream.impl.ControllerImpl.<init>(ControllerImpl.java:140)
        at io.pravega.client.stream.impl.ControllerImpl.<init>(ControllerImpl.java:114)
        at io.pravega.client.admin.impl.StreamManagerImpl.<init>(StreamManagerImpl.java:35)
        at io.pravega.client.admin.StreamManager.create(StreamManager.java:28)
        at io.pravega.connectors.flink.utils.SetupUtils.createTestStream(SetupUtils.java:126)

Problem location
SetupUtils.java

In SetupUtils getControllerUri method:

        return URI.create("tcp://" + this.inProcPravegaCluster.getControllerURI());
    }

It expects inProcPravegaCluster.getControllerURI() to return only an IP:port value.
However, InProcPravegaCluster already adds the "tcp://" prefix,
so the resulting URI looks like "tcp://tcp:// ...".

There is also a bug in InProcPravegaCluster: it adds ports[0] twice, making the return value look like:
"tcp://localhost:ports[0], localhost:ports[0], localhost:ports[1] ..."
So even though the Flink connector tests start only one controller, they get the comma-separated value
"tcp://localhost:<ports[0]>, localhost:<ports[0]>". This is harmless, though.

Suggestions for an improvement
Fix the issue in SetupUtils and InProcPravegaCluster.
InProcPravegaCluster should not include ports[0] twice.
SetupUtils should not add tcp://, as it is already part of the returned value.
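
A minimal sketch of the SetupUtils side of the fix, assuming the cluster may return its controller address with or without the scheme. The class and helper names are hypothetical and are not part of the connector; the sketch only guards against the doubled "tcp://" prefix and does not address the duplicated port.

```java
import java.net.URI;

final class ControllerUriExample {
    // Hypothetical helper: prepend the scheme only when it is missing.
    static URI toControllerUri(String value) {
        String trimmed = value.trim();
        return URI.create(trimmed.startsWith("tcp://") ? trimmed : "tcp://" + trimmed);
    }

    public static void main(String[] args) {
        // Both inputs yield the same URI, so the prefix is never doubled.
        System.out.println(toControllerUri("localhost:9090"));
        System.out.println(toControllerUri("tcp://localhost:9090"));
    }
}
```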

Improve fairness of initial allocation of segments

Problem description
When the Flink job starts, the readers acquire segments as they start up. The first reader to start is being overly greedy by acquiring all segments and starving the other readers. Rebalancing eventually occurs as other readers come online.

This is an optimization issue, not a functional problem.

Problem location
FlinkPravegaReader

Suggestions for an improvement
The problem is basically a race between reader initialization and segment acquisition. Since the number of subtasks (reader instances) is known when the reader group is created, their names could be pre-registered with the reader group during its initialization. This implies that the names be made stable (issue #16).

Here's a log showing that reader 1 (of 4) acquires all segments before readers 2..4 come online. Also shown is subsequent rebalancing.

2017-05-08 16:58:46,501 28409 [Source: Custom Source (2/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (2/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,506 28414 [flink-akka.actor.default-dispatcher-7] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (1/4) (4d19a3c2c16c6f6086f6b235f4335dbc) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,509 28417 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (2/4) (86099972408624a6918951a22530243d) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,511 28419 [Source: Custom Source (2/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (2/4)) acquiring segments {Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=2)=10236, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=3)=9996, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=0)=10308, Segment(scope=scope, streamName=ShlxqXWdXvXzjGdpvZKJ, segmentNumber=1)=10308}
2017-05-08 16:58:46,518 28426 [Source: Custom Source (3/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (3/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,547 28455 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (3/4) (30fd38ce163e51e765d3b236b522c811) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,556 28464 [Source: Custom Source (1/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (1/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,558 28466 [Source: Custom Source (4/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (4/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,608 28516 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Source: Custom Source (4/4) (03d62d7c81dae731e685adeca1175177) switched from DEPLOYING to RUNNING.
2017-05-08 16:58:46,610 28518 [flink-akka.actor.default-dispatcher-6] INFO  o.a.f.r.e.ExecutionGraph - Map -> Sink: Unnamed (1/1) (9aff8650dc230d80764275ab6a5e2dc2) switched from DEPLOYING to RUNNING.
...
2017-05-08 16:58:53,142 25485 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 3858
2017-05-08 16:58:53,144 25487 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 3859
2017-05-08 16:58:54,723 28631 [Map -> Sink: Unnamed (1/1)] DEBUG i.p.c.f.u.IntSequenceExactlyOnceValidator - IntSequenceExactlyOnceValidator - received element: 3858
2017-05-08 16:58:54,723 28631 [Map -> Sink: Unnamed (1/1)] DEBUG i.p.c.f.u.IntSequenceExactlyOnceValidator - IntSequenceExactlyOnceValidator - received element: 3859
...
2017-05-08 16:59:23,314 25657 [Source: Custom Source (4/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (4/4)) releasing segment Segment(scope=scope, streamName=OwKBLcwfDwfhNkhIqXuK, segmentNumber=2)
2017-05-08 16:59:23,332 25675 [Source: Custom Source (1/4)] INFO  i.p.c.s.impl.EventStreamReaderImpl - EventStreamReaderImpl( id=Source: Custom Source (1/4)) acquiring segments {Segment(scope=scope, streamName=OwKBLcwfDwfhNkhIqXuK, segmentNumber=2)=4572}
...
2017-05-08 16:59:56,142 25485 [Source: Custom Source (1/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 4002
2017-05-08 16:59:56,144 25487 [Source: Custom Source (4/4)] DEBUG i.p.c.flink.FlinkPravegaReader - Emitting: 4021
...

Disable Pravega automatic checkpointing

Problem description
Since the connector explicitly invokes checkpoints, the automatic checkpoint feature is unnecessary and causes an issue due to an assumption within the connector about the checkpoint name.

See related: Pravega/issues/Automatic checkpoints

Problem location
FlinkPravegaReader

Suggestions for an improvement

  • disable automatic checkpointing using the reader group config.

Build against stable Flink version 1.3.1

Problem description

The current build targets the 1.3-SNAPSHOT version, which is not a stable release and is only published to the nightly snapshot repositories. It should not be relied upon in a stable release of this connector.

Problem location

The Flink version in gradle.properties.

Suggestions for an improvement

Change the Flink version to 1.3.1.

Integrate flink end-to-end tests with Travis

This suggestion came from a Slack discussion about testing the Flink connectors with the Flink end-to-end tests.

From Stephan:

Or we change the build setup of the flink-connectors repository a bit:

  • Travis would not call gradle directly, but instead run a test driver script
  • That script would trigger initially gradle, but also run some end-to-end tests that download flink, take the compiled connector, start flink, submit a job, parse results, etc.

Implement 'fixed' event router

Problem description
To use the Pravega sink, an instance of PravegaEventRouter must be provided. Develop a predefined implementation, similar to FlinkFixedPartitioner, that maps each Flink partition to a routing key.

Consider whether it would be possible to support parallelism changes to the sink.

This approach is one of many that could be imagined to make sense of the ordering of events produced to a stream by a Flink program. A program may ensure that all events for a given key arrive at the same sink instance by leveraging a keyed stream, e.g. stream.keyBy(...).addSink(..). Some event-time reordering logic would also be needed to establish a per-key ordering.

See org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner for background.

Suggestions

  • Evolve the PravegaEventRouter interface with an open(int parallelInstanceId, int parallelInstances) method.
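
The proposal above can be sketched as follows. This is only an illustration under stated assumptions: OpenableEventRouter is a hypothetical stand-in for an evolved PravegaEventRouter, and FixedEventRouter is a hypothetical name; neither is taken from the connector's code.

```java
import java.io.Serializable;

final class FixedEventRouterExample {
    public static void main(String[] args) {
        FixedEventRouter<String> router = new FixedEventRouter<>();
        router.open(2, 4); // subtask 2 of 4
        // Every event from this subtask shares one routing key.
        System.out.println(router.getRoutingKey("event-a"));
        System.out.println(router.getRoutingKey("event-b"));
    }
}

// Hypothetical interface: a router with the proposed open() hook.
interface OpenableEventRouter<T> extends Serializable {
    void open(int parallelInstanceId, int parallelInstances);

    String getRoutingKey(T event);
}

// Maps all events written by one sink subtask to a single routing key.
final class FixedEventRouter<T> implements OpenableEventRouter<T> {
    private String routingKey;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        if (parallelInstanceId < 0 || parallelInstanceId >= parallelInstances) {
            throw new IllegalArgumentException("subtask index out of range");
        }
        this.routingKey = Integer.toString(parallelInstanceId);
    }

    @Override
    public String getRoutingKey(T event) {
        if (routingKey == null) {
            throw new IllegalStateException("open() must be called first");
        }
        return routingKey;
    }
}
```

Because the key is derived from the subtask index, a change in sink parallelism changes which events share a key, which is the open question raised above.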

Pravega txn API changes

Problem description
In this commit:

pravega/pravega@fc27917

we changed the txn API to remove the various timeouts in the signature. For now, we have moved them to configuration, but we should expect some further changes.

Problem location
Sink connector.

Suggestions for an improvement
Change the use of txns according to the API changes.

Publish the effective POM

Problem description
The wrong POM is being published; it should be the shaded/effective POM. The unshaded POM shows dependencies (e.g. on the Pravega client) that shouldn't be there after shading.

Problem location
Gradle build

Suggestions for an improvement

Remove "TCCL" workaround code

Problem description
For issue #24 we added some code to work around a bug in Flink builds prior to RC2. Once the connector is updated to depend on RC2+, we can remove the workaround code.

Update connector build to support Flink 1.4.0

Problem description

There are a couple of breaking changes in the Flink 1.4 code base (FLINK-7323, FLINK-7548) that will impact running the Flink/Pravega connector on the earlier version (1.3.1). This requires an update to the connector project to support the Flink 1.4.0 binary.

Use a builder for creating Pravega Reader/Writer

Problem description

During the discussion for #30 we noticed how many options the Flink connector constructors require. A builder for creating readers and writers may be a nicer experience than using constructors, and may also lead to better code reuse.

Suggestions for an improvement

Look into using a builder for construction of the flink connectors. Potential items to be handled by the builder:

  • controller info
  • stream id (as StreamId or as separate scope/stream name). Note that the reader accepts multiple stream names. Should it be possible to read from numerous scopes?
  • writer modes (best-effort, at-least-once, exactly-once)
  • start time
  • deserialization schema
  • 'reader name' (better understood as operator name)
  • event read timeout
  • checkpoint initiate timeout
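
A minimal sketch of what such a builder could look like, covering only a few of the items above. All names (ReaderSettings, its builder methods, the one-second default) are hypothetical and chosen for illustration; this is not the connector's API.

```java
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

final class ReaderBuilderExample {
    public static void main(String[] args) {
        ReaderSettings settings = ReaderSettings.builder()
                .controllerUri(URI.create("tcp://localhost:9090"))
                .scope("examples")
                .stream("stream-a")
                .stream("stream-b")
                .eventReadTimeout(Duration.ofSeconds(2))
                .build();
        System.out.println(settings.streams());
    }
}

// Hypothetical immutable settings object produced by the builder.
final class ReaderSettings {
    private final URI controllerUri;
    private final String scope;
    private final List<String> streams;
    private final Duration eventReadTimeout;

    private ReaderSettings(Builder b) {
        // Required options are validated once, in one place.
        this.controllerUri = Objects.requireNonNull(b.controllerUri, "controllerUri");
        this.scope = Objects.requireNonNull(b.scope, "scope");
        if (b.streams.isEmpty()) {
            throw new IllegalStateException("at least one stream is required");
        }
        this.streams = Collections.unmodifiableList(new ArrayList<>(b.streams));
        this.eventReadTimeout = b.eventReadTimeout;
    }

    static Builder builder() { return new Builder(); }

    URI controllerUri() { return controllerUri; }
    String scope() { return scope; }
    List<String> streams() { return streams; }
    Duration eventReadTimeout() { return eventReadTimeout; }

    static final class Builder {
        private URI controllerUri;
        private String scope;
        private final List<String> streams = new ArrayList<>();
        private Duration eventReadTimeout = Duration.ofSeconds(1); // arbitrary default

        Builder controllerUri(URI uri) { this.controllerUri = uri; return this; }
        Builder scope(String scope) { this.scope = scope; return this; }
        Builder stream(String name) { this.streams.add(name); return this; }
        Builder eventReadTimeout(Duration timeout) { this.eventReadTimeout = timeout; return this; }
        ReaderSettings build() { return new ReaderSettings(this); }
    }
}
```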

Add logging support to tests

Problem description
All tests are currently timing out in my setup. I went to build looking for logs and didn't find any. It would be useful to have slf4j logging dumped by default to files so that we can inspect in the case of test failures.

Problem location
Tests, logging.

Suggestions for an improvement
Add slf4j logging and configuration.

Validate correctness of ByteBuffer / byte[] interchange

Problem description

The bridge between Pravega and Flink serializers switches between ByteBuffer and byte[]. It makes the assumption that the provided byte buffers are always heap-backed and that the payload of the byte buffer is the complete backing array (no position / mark / limit / slice).

Problem location

In the FlinkPravegaUtils$FlinkDeserializer.deserialize() method.

Suggestions for an improvement

We should have sanity checks that validate that the byte buffer spans the full backing array (position is zero, limit is capacity).
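
The check described above could look like the following sketch. The class and method names are hypothetical; instead of failing, this variant falls back to copying the readable bytes when the buffer does not span its full backing array, which is one possible design choice rather than the connector's behavior.

```java
import java.nio.ByteBuffer;

final class ByteBufferBridgeExample {
    // True only for a heap buffer whose readable bytes are exactly the backing array.
    static boolean spansFullBackingArray(ByteBuffer buffer) {
        return buffer.hasArray()
                && buffer.arrayOffset() == 0
                && buffer.position() == 0
                && buffer.remaining() == buffer.array().length;
    }

    // Returns the backing array when that is safe, otherwise a copy of the readable bytes.
    static byte[] toByteArray(ByteBuffer buffer) {
        if (spansFullBackingArray(buffer)) {
            return buffer.array();
        }
        byte[] copy = new byte[buffer.remaining()];
        buffer.duplicate().get(copy); // duplicate() leaves the caller's position untouched
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer full = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer window = ByteBuffer.wrap(new byte[] {1, 2, 3, 4}, 1, 2);
        System.out.println(spansFullBackingArray(full));
        System.out.println(spansFullBackingArray(window));
        System.out.println(toByteArray(window).length);
    }
}
```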

Develop a unit test for FlinkPravegaReader

Problem description
The FlinkPravegaReader has a test FlinkPravegaReaderTest that is best understood as an integration test. Coverage is too low.

Suggestions for an improvement

  • Rename FlinkPravegaReaderTest to FlinkPravegaReaderITCase.
  • Develop a unit test that uses a mock Pravega EventStreamReader.

Implement watermark idleness

Problem description
Subtasks that are without assigned segments/shards/partitions for an indefinite period should enter an idle state. Otherwise downstream operators may stall waiting for watermark progression. See FLINK-5017.

From SourceFunction.SourceContext:

/**
 * Marks the source to be temporarily idle. This tells the system that this source will
 * temporarily stop emitting records and watermarks for an indefinite amount of time. This
 * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
 * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
 * watermarks without the need to wait for watermarks from this source while it is idle.
 *
 * <p>Source functions should make a best effort to call this method as soon as they
 * acknowledge themselves to be idle. The system will consider the source to resume activity
 * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
 * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
 */
 @PublicEvolving
 void markAsTemporarilyIdle();

Problem location
FlinkPravegaReader

Suggestions for an improvement
Call markAsTemporarilyIdle whenever the Pravega reader has no segments assigned.
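
A minimal sketch of the suggested behavior, kept independent of Flink so it can run on its own. IdleSignal is a hypothetical stand-in for the single SourceContext method used here, and IdleTracker is a hypothetical helper; how the reader learns its segment count is assumed, not taken from the Pravega client.

```java
final class IdlenessExample {
    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(() -> System.out.println("marked idle"));
        tracker.onSegmentCountChanged(0); // signals idleness
        tracker.onSegmentCountChanged(0); // already idle, no second signal
        tracker.onSegmentCountChanged(2); // active again
        tracker.onSegmentCountChanged(0); // signals idleness once more
    }
}

// Hypothetical stand-in for SourceContext#markAsTemporarilyIdle().
interface IdleSignal {
    void markAsTemporarilyIdle();
}

final class IdleTracker {
    private final IdleSignal signal;
    private boolean idle;

    IdleTracker(IdleSignal signal) {
        this.signal = signal;
    }

    // Call whenever the number of segments assigned to this reader changes.
    void onSegmentCountChanged(int assignedSegments) {
        if (assignedSegments == 0 && !idle) {
            idle = true;
            signal.markAsTemporarilyIdle();
        } else if (assignedSegments > 0) {
            // Per the Javadoc above, the source resumes once it emits an element or watermark.
            idle = false;
        }
    }
}
```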

Improve watermark handling (per-segment watermark)

Problem description
Time is a first-class aspect of the Flink Streaming programming model. The progress of time is based on watermarks that typically track the timestamps seen in records as they're emitted by the source. The system works best if sources emit records in roughly the order they were received in; in other words, Flink's ability to reorder out-of-order elements is limited. Two problems occur when a source emits records in a highly unordered fashion.

  1. Any record with a timestamp older than the current watermark is considered a 'late' record which requires special handling by the application.
  2. Any record with a timestamp newer than the current watermark is buffered in operator state. Depending on the state backend, this can increase memory and/or disk usage.

The Pravega source already processes segments in order; a successor won't be processed until its predecessors have been. However, segments that meet that criterion are simply processed in the order that the reader group encounters them. The ordering isn't strong enough to avoid emission of late records.

Another cause of out-of-orderness is the fact that some segments are much longer than others, combined with the fact that a segment is processed completely once it is assigned to a reader. For example, imagine that segment A contains a full day's worth of records, while segment B was split into segments C and D at mid-day due to a scale event. A given reader may well emit all of A before tackling C/D.

This problem asserts itself mainly in historical processing scenarios.

Suggestions for an improvement

  1. Consider prioritizing unassigned segments by their start time.
  2. Consider introducing a per-segment watermark assigner, with automatic min-watermark calculation across segments. This may help with problem (1). See also Kafka Per-Partition Watermarks feature docs.
  3. Consider employing a segment selection policy that would discourage a given reader from assigning itself a segment that contains elements older than the reader's current watermark. For example, once a segment has been processed, store the per-segment watermark into reader group state as the effective 'start time' of the successors.
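
Suggestion (2) can be illustrated with a small sketch of min-watermark calculation across segments. It assumes, for simplicity, that each segment's watermark is the highest timestamp seen in that segment so far; the class name, the string segment identifiers, and the use of Long.MIN_VALUE for "no watermark yet" are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class SegmentWatermarkExample {
    public static void main(String[] args) {
        PerSegmentWatermarks watermarks = new PerSegmentWatermarks();
        watermarks.onEvent("A", 100L);
        watermarks.onEvent("B", 50L);
        watermarks.onEvent("B", 70L);
        // The slowest segment holds the overall watermark back.
        System.out.println(watermarks.currentWatermark());
        watermarks.onSegmentCompleted("B");
        System.out.println(watermarks.currentWatermark());
    }
}

final class PerSegmentWatermarks {
    private final Map<String, Long> maxTimestampBySegment = new HashMap<>();

    // Track the highest timestamp seen per segment.
    void onEvent(String segment, long timestamp) {
        maxTimestampBySegment.merge(segment, timestamp, Long::max);
    }

    // A finished segment no longer constrains the watermark.
    void onSegmentCompleted(String segment) {
        maxTimestampBySegment.remove(segment);
    }

    // Overall watermark is the minimum across all tracked segments.
    long currentWatermark() {
        if (maxTimestampBySegment.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long timestamp : maxTimestampBySegment.values()) {
            min = Math.min(min, timestamp);
        }
        return min;
    }
}
```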

Unit testing for reader and writer

Problem description
The existing tests do not provide adequate coverage because they're unable to simulate a wide variety of edge cases. For example, the writer has logic for write failures and async writes that isn't exercised.

Suggestions for an improvement
Develop a new set of tests that use a mock Pravega client and the Flink-provided operator test harness. See io.pravega.connectors.flink.EventTimeOrderingOperatorTest for an example.

Consider moving those tests that rely on an embedded Pravega server to intTest.

Reader ID stability

Problem description
Each subtask instance of FlinkPravegaReader creates a separate reader as part of the overall reader group. The reader ID is presently derived from UUID.randomUUID(), but would benefit from simply being derived from the subtask ID. This would improve readability of logs and nicely relate Flink concepts to Pravega concepts. No naming conflict is expected because a separate reader group is created for each Flink job.

Problem location
FlinkPravegaReader

Suggestions for an improvement
In FlinkPravegaReader::run, simply use the following to initialize the reader ID:

final String readerId = getRuntimeContext().getTaskNameWithSubtasks();

This would produce logs like the following:

2017-05-08 16:58:46,501 28409 [Source: Custom Source (2/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (2/4)' for controller URI tcp://localhost:32770
2017-05-08 16:58:46,556 28464 [Source: Custom Source (1/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'Source: Custom Source (1/4)' for controller URI tcp://localhost:32770

Instead of:

2017-05-08 17:54:31,892 19956 [Source: Custom Source (4/4)] INFO  i.p.c.flink.FlinkPravegaReader - Starting Pravega reader 'flink-reader-9ac3a8bd-711c-448d-9e0a-2e3f59b7967e' for controller URI tcp://localhost:32770

Note that the term Custom Source stems from the application-specific source name provided via the DSL, e.g.:

env.addSource(new FlinkPravegaReader(...)).name("Custom Source")

Implement table source (batch)

Problem description
As a follow-up to #32, extend FlinkPravegaTableSource to support the batch API, based on the underlying FlinkPravegaInputFormat.

Please note that Flink's built-in CsvTableSource implements both StreamTableSource and BatchTableSource. We want the same here.

Treat Pravega as a source dependency

Problem description
Fix the travis build of the connector to build Pravega automatically, so that the connector may be unit tested by travis.

Suggestions for an improvement
Consider using a submodule plus a composite build to incorporate the Pravega build into the connector build.

Implement table connector (streaming only)

Problem description
A Flink table connector (TableSource/TableSink) is another type of connector, needed to support Flink's Table/SQL library. Two variants exist, one for integration with the streaming API (e.g. StreamTableSource), the other for the batch API (e.g. BatchTableSource). The implementation typically builds on the classic streaming or batch connector.

Implementation Details
The duty of a table source is to return a DataStream<Row> where Row is a core data type of Flink Table. A typical table source takes a DeserializationSchema<Row> as a parameter, to support conversion from an arbitrary byte message to a Row. Flink provides JsonRowDeserializationSchema and AvroRowDeserializationSchema for convenience.

A table sink is similar to a source but has some interesting implementation details, building on the concept of Streams & Tables. A basic implementation may extend AppendStreamTableSink to indicate that it accepts inserts only. A fancy sink may extend other variants to support deletes, upserts, etc.

More information here: Flink documentation

Use single threaded executor in FlinkPravegaWriter

Problem description
The non-transactional writer uses a larger thread pool than necessary for handling write completion callbacks.

Suggestions for an improvement
Use a single threaded executor to reduce the number of created threads and to reduce contention. The body of the callback synchronizes on the internal writer anyway, so having numerous threads simply increases contention.
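
A minimal sketch of the idea using only the JDK. The already-completed CompletableFuture is a stand-in for the future returned by an asynchronous write, and the class and method names are hypothetical; the point is only that all completion callbacks are handed to one dedicated thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleThreadCallbackExample {
    // Simulates `writes` asynchronous writes and returns how many callbacks ran.
    static int runCallbacks(int writes) {
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < writes; i++) {
            // Stand-in for a write acknowledgement future.
            CompletableFuture<Void> ack = CompletableFuture.completedFuture(null);
            // All callbacks run on the same thread, so they never contend with each other.
            ack.whenCompleteAsync((result, error) -> completed.incrementAndGet(), callbackExecutor);
        }
        callbackExecutor.shutdown(); // previously submitted callbacks still run
        try {
            callbackExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(runCallbacks(10));
    }
}
```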

Handle expiration of transactions between operator checkpoint and global checkpoint commit

Problem description

The FlinkExactlyOncePravegaWriter works similarly to a 2PC (two-phase commit) protocol:

  1. A transaction is created per checkpoint
  2. When Flink does a checkpoint on the writer, it flushes the transaction, and stores its UUID in the checkpoint state (vote-to-commit)
  3. When the checkpoint is complete and the sink receives a notification, that checkpoint's transaction is committed (full commit)

That model assumes that transactions can be committed once the checkpoint's completeness notification is received (step 3). If a transaction times out between step (2) and step (3), there will be data loss.

This is an inherent fragility that seems hard to circumvent with the current primitive, and has to do with transaction timeouts. Given sufficiently long timeouts, this may never be a problem in most setups, but it is not nice to have this weak point in the long run.
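
The three steps and the failure window can be modeled with a small in-memory sketch. CheckpointedTxnSink is a hypothetical toy model, not the writer's implementation: lists of strings stand in for Pravega transactions, and expire() simulates a transaction timing out between steps (2) and (3).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TwoPhaseCommitExample {
    public static void main(String[] args) {
        CheckpointedTxnSink sink = new CheckpointedTxnSink();
        sink.write("a");
        sink.snapshot(1L);                 // step 2 for checkpoint 1
        sink.write("b");
        sink.snapshot(2L);                 // step 2 for checkpoint 2
        sink.expire(2L);                   // transaction times out before step 3
        sink.notifyCheckpointComplete(1L); // step 3: "a" is committed
        sink.notifyCheckpointComplete(2L); // nothing left to commit: "b" is lost
        System.out.println(sink.committed());
    }
}

final class CheckpointedTxnSink {
    private final List<String> committed = new ArrayList<>();
    private final Map<Long, List<String>> flushedByCheckpoint = new HashMap<>();
    private List<String> open = new ArrayList<>(); // step 1: one open transaction per checkpoint

    void write(String event) {
        open.add(event);
    }

    // Step 2: flush the open transaction and remember it under the checkpoint ID.
    void snapshot(long checkpointId) {
        flushedByCheckpoint.put(checkpointId, open);
        open = new ArrayList<>();
    }

    // Step 3: commit the transaction that belongs to the completed checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        List<String> txn = flushedByCheckpoint.remove(checkpointId);
        if (txn != null) {
            committed.addAll(txn);
        }
    }

    // Simulates a transaction timeout between steps 2 and 3.
    void expire(long checkpointId) {
        flushedByCheckpoint.remove(checkpointId);
    }

    List<String> committed() {
        return committed;
    }
}
```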

Problem location

The problem is in the use of Pravega Transactions in the io.pravega.connectors.flink.FlinkExactlyOncePravegaWriter.

Suggestions for an improvement

Off the top of my head, there are three types of solutions to that issue:

  1. Pravega offers a pre-commit / full-commit distinction in transactions, where a pre-commit means the transaction becomes immutable and the usual timeouts do not apply (except possibly a garbage prevention-timeout which could be very long). The full commit publishes the temporary segment.

  2. Flink makes sure it can recover transactions, for example by persisting the data in the checkpoints as well. If a transaction timed out, it can open a new transaction and re-write the data. The disadvantage is that, effectively, everything is persisted in two distinct systems (each with its own durability/replication).

  3. Flink adds a way to let tasks complete a checkpoint completely independently of other tasks. The transaction could then be committed immediately when the checkpoint is triggered. That would require a guarantee that the sinks are never affected by a recovery of other tasks. To keep the current consistency guarantees, this would require persisting the result from the input to the sink operation (similar to how, for example, Samza persists every shuffle), which is in some sense not too different from approach (2).

ReaderCheckpointHook failure (FLINK-6606)

Problem description
The reader fails at job initialization, due to an issue with the classloader as described in FLINK-6606.

The exception is:

2017-05-12 15:07:15,227 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Failed to submit job a7535f3f9bdfc06ec89f57e4683aed68 (anomaly-detection)
io.pravega.shaded.io.grpc.ManagedChannelProvider$ProviderNotFoundException: No functional channel service provider found. Try adding a dependency on the grpc-okhttp or grpc-netty artifact
    at io.pravega.shaded.io.grpc.ManagedChannelProvider.provider(ManagedChannelProvider.java:126)
    at io.pravega.shaded.io.grpc.ManagedChannelBuilder.forTarget(ManagedChannelBuilder.java:77)
    at io.pravega.client.stream.impl.ControllerImpl.<init>(ControllerImpl.java:91)
    at io.pravega.client.admin.stream.impl.ReaderGroupManagerImpl.<init>(ReaderGroupManagerImpl.java:49)
    at io.pravega.client.admin.ReaderGroupManager.withScope(ReaderGroupManager.java:39)
    at io.pravega.connectors.flink.ReaderCheckpointHook.<init>(ReaderCheckpointHook.java:62)
    at io.pravega.connectors.flink.FlinkPravegaReader.createMasterTriggerRestoreHook(FlinkPravegaReader.java:276)
    at org.apache.flink.streaming.api.graph.FunctionMasterCheckpointHookFactory.create(FunctionMasterCheckpointHookFactory.java:43)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:262)

Problem location
The root cause is that grpc uses the thread context classloader (TCCL) to locate a transport plugin (i.e. grpc-netty), but Flink isn't setting the TCCL to facilitate that.

Suggestions for an improvement
A patch has been submitted to Flink, ideally for inclusion in 1.3. As a workaround, the FlinkPravegaReader could set the TCCL by hand, e.g.:

class FlinkPravegaReader {
    public FlinkPravegaReader() {
        // Temporarily point the TCCL at the connector's classloader so grpc can locate its transport.
        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        try {
            ReaderGroupManager.withScope(scope, controllerURI)
                    .createReaderGroup(this.readerGroupName,
                            ReaderGroupConfig.builder().startingTime(startTime).build(), streamNames);
        } finally {
            // Always restore the original classloader, even if reader group creation fails.
            Thread.currentThread().setContextClassLoader(originalClassLoader);
        }
    }
}

Use Pravega batch read API with batch connector

Problem description
We have merged an experimental API for batch reads of a stream:

pravega/pravega@493a9f4

The general idea is that a job has access to all segments in parallel, assuming that for batch jobs, we don't care about the order of events in a stream.

The idea is to incorporate this API in the preliminary batch connector developed in PR #54.

Problem location
Batch connectors.

Suggestions for an improvement
Use the experimental batch read API

Recovered commit should apply to original stream

Problem description
The connector is designed to handle an edge case where the transaction associated with a given savepoint remains uncommitted due to a crash before notifyCheckpointComplete. In that situation, a subsequent job based on that savepoint recovers the transaction during restoreState. The subject of this bug is that the new job might point to a different stream from that of the original job. The code in recoverState should use the old stream name when looking for transactions to recover.

Problem location
FlinkPravegaWriter

Suggestions for an improvement
Record the old name as part of the tuple that is returned in savepoint.

Implement connector metrics

Problem description
The connector should emit relevant metrics using Flink's metrics API. Note that Flink already provides some basic metrics like records in/out, which needn't be duplicated.

Suggestions for an improvement
Suggested metrics:

  • configuration data (stream names as gauges?)
  • segment assignment
  • event I/O rates
  • position information
  • lag (ReaderGroupMetrics::unreadBytes)

Develop a unit test for FlinkPravegaInputFormat

Problem description

As with #67 and #68, the current tests for the FlinkPravegaInputFormat are, strictly speaking, integration tests, and they do not provide sufficient coverage. We should develop proper unit tests for the connector.

Suggestions for an improvement

  • Rename FlinkPravegaInputFormatTest to FlinkPravegaInputFormatITCase.
  • Develop a unit test that uses a mock Pravega EventStreamReader.

Build step throws checkstyle violation error

Problem description
Building the latest connector code throws checkstyle violation errors.

Problem location
Multiple java files

[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/CheckpointSerializer.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/FlinkExactlyOncePravegaWriter.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/PravegaEventRouter.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/PravegaWriterMode.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]
[ant:checkstyle] [ERROR] /flink-connectors/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java:1: Line does not match expected header line of '/\*\*'. [RegexpHeader]

Suggestions for an improvement

Prefer Flink annotations and preconditions

Problem description
With the rationale that the connector code should sit nicely with that of other Flink connectors, prefer Flink annotations over Guava annotations where possible. Concretely: Preconditions, VisibleForTesting.

Note that the Pravega client itself uses Guava, so it will remain a dependency.

FlinkPravegaReader fails to achieve exactly-once semantics in some cases

Problem description
In the edge case where a task fails before the first checkpoint is successful, the behavior should be that the group is rewound to the initial state. For example, if the group was configured to start from the beginning of the stream, it should restart from the beginning.

Because the source creates the reader group in its constructor (which is not re-executed in this case), and because the hook isn't invoked when there's no state to restore, the actual behavior is that the group simply continues from where it left off. In fact, when the replacement tasks start up, an error occurs due to dirty state:

java.lang.IllegalStateException: The requested reader: Source: Custom Source -> Sink: Unnamed (1/4) cannot be added to the group because it is already in the group. Perhaps close() was not called?
	at io.pravega.client.stream.impl.ReaderGroupStateManager.initializeReader(ReaderGroupStateManager.java:118)
	at io.pravega.client.stream.impl.ClientFactoryImpl.createReader(ClientFactoryImpl.java:138)
	at io.pravega.client.stream.impl.ClientFactoryImpl.createReader(ClientFactoryImpl.java:124)
	at io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader(FlinkPravegaUtils.java:110)
	at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:227)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
	at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:39)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)

Simply removing oneself from the online readers would fix the above symptom but would produce undesirable at-most-once behavior.

Suggestions for an improvement
The obvious fix is to change Flink's hook functionality to invoke the hook in the non-restore case too. In that case, the hook would reinitialize the group. Alternatively, the tasks could catch the above exception and reset the reader group state, with some additional coordination.

As a workaround, the reader could wait for the first Flink checkpoint to arrive before processing any elements. There's a catch-22: the Flink checkpoints are communicated to the task via the reader group state!

Remove "pravega-standalone" from shaded JAR

Problem description
The pravega-standalone dependency is erroneously being included in the shaded JAR.

Since the server code isn't in the shaded jar, and the POM doesn't list the server libs as dependencies (nor should it), the emulator classes make little sense to have here.

Problem location
build.gradle

Suggestions for an improvement
Remove it from the 'included dependencies' section of 'shadowJar'. It is fine to continue to have this dependency in the testCompile configuration.

FlinkPravegaWriter cannot be flushed following a write error

Problem description
When close or flush is called after a write that has failed internally, the method will stall forever.

Problem location
The problem is due to bad accounting of pending writes based on pendingWritesCount. The flush method waits for the count to reach zero, but the count is not properly decremented on error. (ref)

Suggestions for an improvement
Decrement the write count irrespective of success or failure.
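
A minimal sketch of the suggested accounting, written in plain Java with no Pravega or Flink dependency. The class PendingWriteTracker and its methods are hypothetical stand-ins for the writer's bookkeeping, not the connector's actual code; only the name pendingWritesCount comes from the description above. The point is that the decrement sits in a completion callback that runs on both success and failure, so flush cannot stall on a failed write.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the writer's bookkeeping (illustration only).
class PendingWriteTracker {
    private final AtomicInteger pendingWritesCount = new AtomicInteger();
    private final AtomicReference<Throwable> writeError = new AtomicReference<>();

    // Registers an asynchronous write. The count is decremented when the
    // write completes, irrespective of success or failure.
    void track(CompletableFuture<Void> ack) {
        pendingWritesCount.incrementAndGet();
        ack.whenComplete((result, error) -> {
            if (error != null) {
                writeError.compareAndSet(null, error); // remember the first failure
            }
            synchronized (this) {
                pendingWritesCount.decrementAndGet();
                notifyAll(); // wake up a waiting flush()
            }
        });
    }

    // Blocks until no writes are pending, then surfaces any recorded failure.
    synchronized void flush() throws Exception {
        while (pendingWritesCount.get() > 0) {
            wait();
        }
        Throwable error = writeError.get();
        if (error != null) {
            throw new Exception("a pending write failed", error);
        }
    }

    int pending() {
        return pendingWritesCount.get();
    }
}
```

With this shape, a flush that follows a failed write returns promptly and reports the failure instead of waiting for a count that never reaches zero.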

Develop a unit test for FlinkPravegaWriter

Problem description
The FlinkPravegaWriter has a pair of existing tests, FlinkPravegaWriterTest and FlinkExactlyOncePravegaWriterTest, that are best understood as integration tests. Overall coverage is low.

Suggestions for an improvement

  • Develop a unit test that uses a mock Pravega event writer to fully exercise the FlinkPravegaWriter.
  • Consolidate the existing tests into a test called FlinkPravegaWriterITCase.

Use a stable default readerName

Problem description
The readerName is used only as a key into Flink's checkpoint state, to disambiguate the state written by our checkpoint hook from that of other hooks in the same job. It is important for the value to be stable; otherwise the savepoint is unusable. Let's use a more stable identifier by default than a randomly-generated value.

Problem location
FlinkPravegaReader

Suggestions for an improvement
Ordinary state stored by an operator is similarly disambiguated using the operator ID (ref). Some enhancement to Flink's hook facility would be needed to use the operator ID here.

In the meantime, I suggest hashing the inputs to the FlinkPravegaReader (stream name, etc.) to generate a default value for the readerName.
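
One possible shape for such a default, as a hedged sketch: the helper DefaultReaderName, the choice of inputs (scope, controller URI, stream names), the SHA-256 digest and the "flink-reader-" prefix are all assumptions made for illustration, not the connector's API. Stream names are sorted first so that the same set of streams yields the same name regardless of the order in which they were passed.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.TreeSet;

// Hypothetical helper (illustration only): derives a stable reader name from the reader's inputs.
class DefaultReaderName {
    static String generate(String scope, String controllerUri, List<String> streamNames) {
        StringBuilder key = new StringBuilder(scope).append('\n').append(controllerUri);
        // Sort the stream names so that argument order does not change the result.
        for (String stream : new TreeSet<>(streamNames)) {
            key.append('\n').append(stream);
        }
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(key.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder name = new StringBuilder("flink-reader-");
            // The first 8 bytes (16 hex characters) are enough for a readable key.
            for (int i = 0; i < 8; i++) {
                name.append(String.format("%02x", digest[i] & 0xff));
            }
            return name.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be present on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

A name derived this way stays the same across job submissions as long as the inputs do, which is what makes the savepoint usable. It changes if any input changes, so a job that is deliberately re-pointed would still need an explicit readerName.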

Reevaluate error semantics of FlinkPravegaWriter

Problem description
In the non-transactional write mode, the sink function processes writes asynchronously. Any error is buffered internally to be re-thrown later when a subsequent write, close or snapshot is called. One issue is that the error state is cleared after the error is thrown. It seems odd to clear the error since it is a fatal error.

Consider the following sequence:

write("A"); // processed asynchronously, resulting in a buffered error.
try {
    write("B"); // buffered error is thrown
} catch (Exception e) {
    write("B"); // should this succeed?
}

Problem location
FlinkPravegaWriter::checkWriteError

Suggestions for an improvement
Don't clear the error; block any further writes. Maybe move to a closed state upon error.
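
A minimal sketch of the "sticky error" behavior suggested above, in plain Java. StickyErrorWriter, onWriteFailed and acceptedCount are hypothetical names used only for illustration; checkWriteError mirrors the method named in the problem location but is not the connector's implementation. The error is read without being cleared, so every write after the first failure is rejected.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical writer skeleton (illustration only): once an asynchronous
// write fails, the writer stays in a failed state.
class StickyErrorWriter {
    private final AtomicReference<Throwable> writeError = new AtomicReference<>();
    private int accepted;

    // Would be invoked from the asynchronous completion callback of a failed write.
    void onWriteFailed(Throwable cause) {
        writeError.compareAndSet(null, cause); // keep the first failure
    }

    void write(String event) throws IOException {
        checkWriteError();
        accepted++; // stands in for handing the event to the underlying writer
    }

    // Reads the buffered error without clearing it.
    private void checkWriteError() throws IOException {
        Throwable cause = writeError.get();
        if (cause != null) {
            throw new IOException("writer is in a failed state", cause);
        }
    }

    int acceptedCount() {
        return accepted;
    }
}
```

Under this design the second write("B") in the sequence above fails as well, which answers the "should this succeed?" question with a no.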

Batch connector doesn't rewind on task failover

Problem description
Flink supports a number of failover strategies when a task fails, by default a global restart of the job. None of the restart strategies work correctly with FlinkPravegaInputFormat because it doesn't rewind the reader group to the start position.

Suggestions for an improvement

  • Discard the reader group state upon a global restart.
  • Validate that the job is configured to use the global failover strategy.

Additional Background
The restart procedure for an input format normally works as follows:

  1. InputFormat::new is called by the client, then serialized into a job graph.
  2. InputFormat::configure is called once by the JM when the execution graph is built.
  3. InputFormat::createInputSplits is called once by the JM when the execution graph is built.
  4. InputFormat::getInputSplitAssigner(splits) is called for each execution (incl. resetForNewExecution)
  5. InputSplitAssigner::getNextInputSplit is called by the JM to produce a split for processing by an idle task.

In other words, the 'assigner' is the iterator and is reset by the 'global' failover strategy using resetForNewExecution.
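
The iterator analogy can be sketched with a toy model in plain Java. ResettableSplitAssigner is a hypothetical class that does not implement Flink's InputSplitAssigner interface, and splits are represented as plain strings. It only illustrates that a reset hands out the same splits again from the start, which is the rewind that state held outside the assigner (such as a reader group) does not get for free.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model (illustration only): an assigner that iterates over a fixed
// list of splits and can be rewound for a new execution.
class ResettableSplitAssigner {
    private final List<String> splits;
    private Queue<String> remaining;

    ResettableSplitAssigner(List<String> splits) {
        this.splits = new ArrayList<>(splits);
        this.remaining = new ArrayDeque<>(this.splits);
    }

    // Returns the next unassigned split, or null when none are left.
    synchronized String getNextInputSplit() {
        return remaining.poll();
    }

    // Models the effect of a global restart: every split becomes assignable again.
    synchronized void reset() {
        remaining = new ArrayDeque<>(splits);
    }
}
```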

Merge FlinkExactlyOncePravegaWriter and FlinkPravegaWriter

Problem description
The FlinkExactlyOncePravegaWriter currently always uses transactions to write to Pravega. Hence, if it is used as a sink in a Flink job with checkpointing disabled, the events will never be committed to Pravega.
We could conditionally use transactions in the writer depending on whether checkpointing is enabled or not.

Problem location
FlinkExactlyOncePravegaWriter

Suggestions for an improvement
Merge the 2 writer implementations.

Remove unneeded dependencies

Problem description
Some of the 'shared-*' dependencies might not be needed, e.g. shared-metrics.

Problem location
build.gradle

Suggestions for an improvement

Move the exactly once connectors into flink-connectors

Problem description
Currently the exactly once connectors are being developed out of the pravega/pravega project.
We need to move those into the pravega/flink-connectors project.

Problem location
Flink connectors in pravega

Suggestions for an improvement
Move the Flink connector files out of the pravega project.

Use secure token for Travis integration

Problem description
The token for Travis integration has been replaced by secure credentials (see pravega/pravega#1712 for reference). The same credentials should be used for this repo too.

Problem location
.travis.yml

Suggestions for an improvement
Replace token with secure token.

Implement batch connector (source only)

Problem description
Develop a batch connector to process historical data in a given stream using the Flink Batch API. A good reason is to be able to use Flink Gelly (graph API) and Flink ML which are based on the batch API.

It is probably sufficient to consider a Pravega stream as a source, not as a sink, for batch purposes.

Suggestions for an improvement
There are at least two approaches being considered. One is to treat segments as splits, which seems more likely to leverage data locality and is more compatible with fine-grained recovery, but requires client changes. Another is to synthesize a split per subtask and to use the existing reader group API, which would require fewer changes to the Pravega client but would underperform at scale.

Implement an InputFormat, which splits an input file or other resource (e.g. Pravega stream) into processable units for Flink to process across a number of subtask instances. For example, a file-based input format splits the input file(s) into splits representing non-overlapping ranges that correspond to HDFS blocks. For Pravega, a close analogue would be to treat each segment as an input split.

If location hints are available (on a per-split basis), implement LocatableInputSplit and return an instance of LocatableInputSplitAssigner from InputFormat.

A connector is free to read each split in whatever way it chooses. It needn't be a file-based access. For example, if each segment were a split, the connector could use a Pravega Reader API to query a specific segment, e.g. io.pravega.client.segment.impl.SegmentInputStream (which the reader group API uses under the hood).

Work on README

Problem description
The project README file needs more detail. Let's write a good landing README.

Problem location
README.

Suggestions for an improvement
Add more information to the main README file.
