guardian-for-apache-kafka's People

Contributors

aiven-scala-steward[bot], docemmetbrown, jlprat, lukas-vlcek, mdedetrich, omarayad-aiven, scala-steward, snuyanzin, st3fan, staaldraad

guardian-for-apache-kafka's Issues

Add an option to buffer based on S3 minimum chunk size

What is currently missing?

When gzip compression was added to Guardian (see #196), the implementation was not ideal because of how gzip compression works. Specifically, we compress each Kafka record individually, whereas ideally we would compress larger chunks of data, since that gives much better space savings.

How could this be improved?

This feature could be improved by buffering the data in memory up to the S3 minimum chunk size (5 MB) and then compressing that entire chunk at once rather than compressing each message separately.
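The difference between the two approaches can be illustrated with a minimal sketch that uses only the JDK's gzip classes. The object name `GzipBuffering` and the synthetic records are assumptions made for illustration; this is not Guardian's implementation, which would need to buffer inside the stream up to the 5 MB threshold.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPOutputStream

object GzipBuffering {

  /** Gzip a byte array in a single pass. */
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz  = new GZIPOutputStream(out)
    try gz.write(bytes)
    finally gz.close() // closing writes the gzip trailer
    out.toByteArray
  }

  // Synthetic stand-ins for serialized Kafka records (assumption, not real data)
  val records: Seq[Array[Byte]] =
    (1 to 1000).map { i =>
      s"""{"topic":"test","offset":$i,"value":"payload"}""".getBytes(StandardCharsets.UTF_8)
    }

  // Current behaviour: every record is compressed on its own,
  // so each one pays for a gzip header and trailer
  val perRecordSize: Int = records.map(r => gzip(r).length).sum

  // Proposed behaviour: concatenate the buffered records, then compress once
  val bufferedSize: Int = gzip(Array.concat(records: _*)).length
}
```

Comparing `GzipBuffering.perRecordSize` with `GzipBuffering.bufferedSize` shows the saving for this synthetic data; the ratio for real topics depends on how repetitive the records are.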

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Fix flaky tests with MockedKafkaClientBackupClientSpec

What happened?

The MockedKafkaClientBackupClientSpec occasionally fails; we need to investigate why.

What did you expect to happen?

Here are some seeds with which the test fails

-81029110963502054
7832168009826873070
-5937289859930351334

7832168009826873070 has occurred a couple of times, but even when setting this seed the test still doesn't fail deterministically.

Allow optional step in BackupClientInterface after a time period's substream is complete

After speaking with @mkoskinen, he made a valid point that there may be a use case where some kind of object storage (or more specifically an implementation of BackupClientInterface) is so basic/simple that it doesn't support functionality such as resume. A way around this would be to use a BackupClientInterface that supports resuming (even flat file storage) and then, after each substream for that object/key/file is complete, upload it to S3/GCS/whatever as a whole.

This can also solve other problems. For example, currently we don't compress the .json file (using something like gz) while streaming because we can't find a resume point in a compressed object/key/file. This approach would allow you to back up the stream to an initial storage and then, after it's finished, compress it and send it to S3/GCS/whatever.

On first impressions the implementation can be a single method added to BackupClientInterface that returns an Option[Sink]; if it is defined, this sink gets executed after a backup is complete. One consideration is whether the sink should run asynchronously or synchronously (ideally as a parameter in the method itself), i.e.

def afterBackupSink(async: Boolean): Option[Sink]

Reduce boilerplate in tests

What is currently missing?

The current tests, particularly RealS3BackupClientSpec, have quite a bit of boilerplate code which can be reduced with basic refactoring.

How could this be improved?

Refactoring the common code in the tests into functions should alleviate most of the boilerplate.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Come up with a better matcher that can handle large amounts of data

What is currently missing?

Unfortunately, due to S3 having a large minimum chunk size (5 MB), we often have to compare large amounts of data when we run our tests against S3. What invariably ends up happening is that we compare very large data structures (typically 10 MB or more) with each other to see if they are equal, which uses large amounts of CPU, enough that it can actually create bottlenecks.

How could this be improved?

There are 2 cases we have to deal with

  1. Doing a very fast comparison to see that large data structures are equal
  2. If two data structures are not equal, finding a very fast way of figuring out "what's wrong"

In regards to 1, there isn't too much we can do apart from potentially dealing with raw numbers rather than Strings, on the suspicion that scalacheck's generated Strings are causing too many hash collisions, which slows down the equals method. The easiest solution here may be to use a custom hashcode that works better with the generated data. Alternatively one can make sure that the data generated in io.aiven.guardian.kafka.Generators are incrementing numbers, which will generate fewer collisions; however, you then have to deal with converting the strings into numbers when implementing a custom hashcode.

In regards to 2, we are currently using https://github.com/softwaremill/diffx to create nice diffs if the 2 data structures are not equal, however diffx isn't designed to handle large data structures nicely. More specifically, it can handle cases where a single value in some data structure is wrong or missing, but what typically happens in our S3 tests is that entire chunks of data are missing (i.e. the backup file for a single chunk). Using algorithms which are very fast at detecting these missing large chunks of data and at handling overlapping data, and then falling back to slower/more general methods, can theoretically greatly reduce the amount of CPU time that is being used. In other words, we want a fast path for the most common cause of test failures and then a fallback to the slower/current algorithm (if possible).
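A rough sketch of such a fast path, assuming for illustration that the backed-up data can be modelled as a map from an object key to the record offsets stored under it. The names `ChunkCompare`, `MissingChunks` and so on are hypothetical and not part of Guardian or diffx. Key sets are compared first, which is cheap, and content is only inspected when no whole chunk is missing.

```scala
object ChunkCompare {
  sealed trait Result
  case object Equal                                       extends Result
  final case class MissingChunks(keys: Set[String])       extends Result
  final case class UnexpectedChunks(keys: Set[String])    extends Result
  final case class ContentMismatch(keys: Set[String])     extends Result

  /** Compare expected and actual backups, reporting whole missing chunks
    * before looking at the contents of any individual chunk.
    */
  def compare(expected: Map[String, Vector[Long]], actual: Map[String, Vector[Long]]): Result = {
    // Fast path: a set difference on keys finds missing or extra chunks
    val missing = expected.keySet -- actual.keySet
    val extra   = actual.keySet -- expected.keySet

    if (missing.nonEmpty) MissingChunks(missing)
    else if (extra.nonEmpty) UnexpectedChunks(extra)
    else {
      // Key sets are identical here, so actual(k) is always defined
      val differing = expected.collect { case (k, v) if actual(k) != v => k }.toSet
      if (differing.isEmpty) Equal else ContentMismatch(differing)
    }
  }
}
```

Only the keys reported by `ContentMismatch` would then need to be handed to a slower, more detailed diff.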

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Add a Killswitch to gracefully terminate the backup stream

The proper way of terminating an akka-stream by the client (i.e. a running backup instance is terminated) is by using a kill-switch https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html#controlling-stream-completion-with-killswitch.

Specifically for our case we should add a shutdown hook that triggers the killswitch. The killswitch is also handy for people using this as a library as it gives them a way to terminate the indefinite stream.

Document S3 retention levels/required permissions

What is currently missing?

When using the S3 storage engine we should document what permissions are needed for the S3 accounts that Guardian uses.

How could this be improved?

Since Guardian uses non-typical parts of the S3 API which need extra permissions (e.g. ListParts), this needs to be documented since it can come as a surprise.

Retention levels for the uploaded parts of multipart uploads also need to be documented, i.e. if the retention period of your multipart upload is shorter than the configured time slice you will get problems, since the parts will disappear before the upload is completed.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Exceptions thrown inside of `SubFlow` don't cancel entire stream when using `SupervisionStrategy.Stop`

What happened?

Whenever an exception is thrown in a SubFlow (i.e. as a result of using splitAfter for creating sub flows based on time slice) it does not respect the configured actor supervision strategy which means instead of the stream being terminated (which is what you would expect), it just repeats itself indefinitely.

What did you expect to happen?

If an exception is thrown inside a SubFlow it should cancel the parent stream which should propagate and cancel all substreams.

What else do we need to know?

Upstream issue/comment is here akka/akka#23066 (comment)

Support Transactional API for Restore

What is currently missing?

Currently the restore portion of Guardian only uses a standard Kafka producer via Alpakka's Producer. While this is fine for standard scenarios, most people doing a restore would ideally want exactly-once semantics. This seems possible to do using the standard Producer with

def baseProducerConfig
    : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
  Some(
    _.withBootstrapServers(
      container.bootstrapServers
    ).withProperties(
      Map(
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG             -> true.toString,
        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
        ProducerConfig.BATCH_SIZE_CONFIG                     -> 0.toString
      )
    ).withParallelism(1)
  )

However, such a configuration has terrible throughput since you are essentially forcing the Kafka producer to send one message at a time with no batching.

How could this be improved?

Support needs to be added in Alpakka Kafka so that it's possible to create a Transactional.source from a non-Kafka-cluster source. The current Transactional.sink only works with a PartitionOffsetCommittedMarker, which is currently only created by a Transactional.source. Trying to manually create a PartitionOffset from a different Source just results in an exception being thrown at https://github.com/akka/alpakka-kafka/blob/1a1dab1e5168a829b7e76053375a364739043850/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala#L220 and it's not possible to manually create a PartitionOffsetCommittedMarker since it's private/internal API.

There is currently an ongoing issue at Alpakka Kafka at akka/alpakka-kafka#1075 for this

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Add mock tests for GCS client

What is currently missing?

Tests for the GCS BackupClient are missing because the hostname is currently hardcoded in Alpakka (see https://github.com/akka/alpakka/blob/f2971ca8a4a71b541cddbd5bf35af3a2a56efe71/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala#L33-L34)

How could this be improved?

An upstream PR for Alpakka needs to be created in order to make this configurable.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Move `reference.conf` environment variables to cli modules

Currently environment variable overrides are defined in reference.conf in core modules, i.e. https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-backup/src/main/resources/reference.conf#L1-L30 . Ideally they should be moved to the equivalent cli project as application.conf files, since the environment variables are only relevant when you actually run your application.

Furthermore, if someone uses the Guardian core modules as a library, the user of that library may not want to expose those environment variables under these names (or even at all). This can also help alleviate #91 since we can encapsulate any such changes in the cli modules and leave the core Guardian modules as clean Scala libraries.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Implement callbacks to Akka's `FileIO` whenever writing successfully adds a chunk to a file

What is currently missing?

At some point we would like to add file based backups to Guardian.

How could this be improved?

Currently this is not possible because Akka's FileIO doesn't allow you to add a "callback" whenever a chunk of bytes has been successfully written to disk. The callback is required to commit the current Kafka cursor, and hence the set of changes required is similar in concept to what was done with Alpakka's S3 client (see akka/alpakka#2770). Here is the location where the current FileIO actually writes the data to disk: https://github.com/akka/akka/blob/7abc41cf4e7e8827393b181cd06c5f8ea684e696/akka-stream/src/main/scala/akka/stream/impl/io/FileOutputStage.scala#L63

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Use akka testkit for akka related tests

What is currently missing?

Currently we are not shutting down our akka systems correctly in our tests

How could this be improved?

Akka provides akka-testkit which comes with utility traits such as TestKit that can be used to both initialize and shutdown the akka system

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Add scalasteward

What is currently missing?

Automatic detection of newer updates for dependencies as well as creation of pull requests

How could this be improved?

There is already a plugin/bot that handles this at https://github.com/scala-steward-org/scala-steward

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Add Scalafix as a linter

What is currently missing?

There is no linter at the moment, which means that we might be letting some subtle bugs go by in each PR.

How could this be improved?

Add and configure Scalafix in the project

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Specify an initial point in time when calculating time slices

Currently when we calculate time slices we just use the timestamp of the first event as a reference point, i.e. in the following situation

event 1: 13:00 Day 1 -> event 2: 15:00 Day 1 -> event 3: 21:00 Day 1 -> event 4: 10:00 Day 2 -> event 5: 14:00 Day 2

we will use the timestamp of event 1 as the initial time, so if we set the time period to (let's say) 1 day, the time slices will run from 13:00 of one day to 12:59... of the next day.

Practically I suspect most admins would rather sort the time slices by an easily distinguishable point of reference (i.e. from 0:00 each day) rather than the timestamp of the first event.

i.e. with the previous example, if we allow the user to configure the initial time to be "00:00 of the day when we receive the first event" then the time slices would be as follows

slice 1: event 1: 13:00 Day 1 -> event 2: 15:00 Day 1 -> event 3: 21:00 Day 1
slice 2: event 4: 10:00 Day 2 -> event 5: 14:00 Day 2

whereas currently it would be

slice 1: event 1: 13:00 Day 1 -> event 2: 15:00 Day 1 -> event 3: 21:00 Day 1 -> event 4: 10:00 Day 2
slice 2: event 5: 14:00 Day 2

(since we calculate the day from 13:00 rather than 00:00)
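The two behaviours can be reproduced with a small java.time sketch. The object `TimeSlices`, the helper `sliceIndex` and the concrete calendar date are assumptions chosen for illustration; Guardian's actual slicing logic may differ. The slice index is the number of whole periods between the initial point and the event timestamp.

```scala
import java.time.{Duration, OffsetDateTime, ZoneOffset}
import java.time.temporal.ChronoUnit

object TimeSlices {

  /** Zero-based index of the slice containing `timestamp`, counting
    * periods of length `period` starting from `initial`.
    */
  def sliceIndex(initial: OffsetDateTime, period: Duration, timestamp: OffsetDateTime): Long =
    Math.floorDiv(Duration.between(initial, timestamp).toMillis, period.toMillis)

  // Arbitrary date: "Day 1" is 2022-01-01, "Day 2" is 2022-01-02 (assumption)
  private def at(day: Int, hour: Int): OffsetDateTime =
    OffsetDateTime.of(2022, 1, day, hour, 0, 0, 0, ZoneOffset.UTC)

  // The five events from the example
  val events: Vector[OffsetDateTime] =
    Vector(at(1, 13), at(1, 15), at(1, 21), at(2, 10), at(2, 14))

  val oneDay: Duration = Duration.ofDays(1)

  // Current behaviour: the first event's timestamp is the reference point
  val currentSlices: Vector[Long] =
    events.map(sliceIndex(events.head, oneDay, _))

  // Proposed behaviour: reference point is 00:00 of the first event's day
  val alignedSlices: Vector[Long] =
    events.map(sliceIndex(events.head.truncatedTo(ChronoUnit.DAYS), oneDay, _))
}
```

Here `currentSlices` evaluates to `Vector(0, 0, 0, 0, 1)` and `alignedSlices` to `Vector(0, 0, 0, 1, 1)`, matching the two groupings listed above.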

Further points that arise from this

  • While this makes sense when the time period is set to a "whole unit" (i.e. 1 day or 1 hour), how does it work when you set it to a time period which has no such easily groupable time abstraction? Someone can just set the time period to be 153482 millis (if they want), in which case this configuration doesn't make too much sense
  • What is the configuration going to look like, i.e. are we just going to make it a java.time.temporal.ChronoUnit (which means the configuration is only going to be Days/Weeks etc.) or is another configuration more appropriate? Having a restricted config that points to a type such as ChronoUnit makes it easier to solve the problem from the previous point. Furthermore it can be argued that if we generalize the problem mathematically, we already do have an initial point in time, it's just ChronoUnit.MICROS of the first event?

@jlprat What are your thoughts?

`ExpectedStartOfSource` being thrown on real life cluster

What happened?

When running Guardian against an actual live Aiven cluster that has messages being continuously sent into a topic this error occurs

[ERROR] Entry$$anonfun$$lessinit$greater$1$$anon$1 - Unhandled exception in stream
io.aiven.guardian.kafka.Errors$ExpectedStartOfSource$: Always expect a single element at the start of a stream
        at io.aiven.guardian.kafka.Errors$ExpectedStartOfSource$.<clinit>(Errors.scala:6)
        at io.aiven.guardian.kafka.backup.BackupClientInterface.$anonfun$sourceWithFirstRecord$1(BackupClientInterface.scala:200)
        at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:52)
        at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
        at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:528)
        at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
        at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
        at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
        at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

What did you expect to happen?

That it works

What else do we need to know?

This is likely a corner case in Guardian where the first instantiation of the stream is empty. Note that this does not mean the stream in general has no data in it, just that this current iteration of the stream doesn't, i.e. there may be no messages in any given timeslice.

Apply a DrainingControl to Kafka Committer sync

What is currently missing?

Currently when you start the main backup stream using backupClient.backup.run() you get back a Consumer.Control. The Consumer.Control has a drainAndShutdown(streamCompletion: Future[_]) method which is intended to be executed when the application is shutting down in order to do a graceful shutdown.

The problem with drainAndShutdown(streamCompletion: Future[_]) is the streamCompletion future it requires. The supplied argument is meant to be a Future representing the completion of the stream, however no such future exists when calling drainAndShutdown(streamCompletion: Future[S]): Future[S] because backupClient.backup.run() is a never-ending stream, so it doesn't even return a Future as a materialized value.

How could this be improved?

The above problem occurs when you don't use a DrainingControl as a combiner when you materialize a stream, as documented here https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#committer-sink, i.e.

val committerSettings = CommitterSettings(system)

val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()

If you do use a DrainingControl as described above then drainAndShutdown doesn't require the streamCompletion argument, i.e. it's just drainAndShutdown(): Future[Done].

Unfortunately doing this in Guardian is more complicated because of our usage of Sink.lazyInit: when it is combined with to, we end up losing the reference to the materialized value, i.e.

@nowarn("msg=method lazyInit in object Sink is deprecated")
val subFlowSink = substreams.to(
  // At this point we lose the materialized value so we can't use DrainingControl.apply
  Sink.lazyInit(
    {
      case start: Start =>
        implicit val ec: ExecutionContext = system.getDispatcher
        for {
          state <- getCurrentUploadState(start.key)
        } yield backupToStorageSink(start.key, state)
          .contramap[ByteStringElement] { byteStringElement =>
            (byteStringElement.data, byteStringElement.context)
          }
      case _ => throw Errors.ExpectedStartOfSource
    },
    empty
  )
)

Relevant thread in the Lightbend forums https://discuss.lightbend.com/t/losing-reference-to-materialized-value-with-sink-lazyinit/9396

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Document how configuration for storage backends work

What is currently missing?

Documentation is sparse/missing when it comes to configuration for the storage backends

How could this be improved?

Document it!

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

External Forks in github actions don't terminate.

What happened?

Likely as a result of #165 and #166, PRs from external forks, i.e. from scala-steward (see #171), are not terminating anymore.

What did you expect to happen?

That the CliSpec successfully passes even if you don't have S3 credentials

What else do we need to know?

Since the current backup CliSpec actually starts the backup, it ends up initializing a connection to S3. There must be some exceptional case where this prevents the CliSpec from terminating if the S3 environment variables are not all provided, which is the case when the PR is created from an external fork because secrets are not exposed.

Discuss how to treat environment variables wrt naming

What is currently missing?

Guardian can be configured using environment variables which is documented in the various reference.conf files that can be found in the modules (note that actual documentation for this needs to be done and will be added later).

Currently the strategy for naming these environment variables is to mirror the reference.conf settings, which is how Java/Scala libraries are typically configured. However, due to how the Apache Kafka client/Alpakka/S3 are designed, the naming and structure of these reference.conf settings are all over the place. As a quick example, to configure the Kafka bootstrap servers you would do

KAFKA_CLIENT_BOOTSTRAP_SERVERS="localhost:9092"

but for the consumer poll interval you would do

AKKA_KAFKA_CONSUMER_POLL_INTERVAL=1 minute

This is due to the fact that the former is a setting for Apache's official Java Kafka client whereas the latter is a setting for Alpakka's Kafka client, which happens to wrap Apache's Java Kafka client.

Furthermore, some settings which are passed directly into Apache's Kafka client, such as bootstrap servers, are configured using a single comma-delimited value, i.e.

KAFKA_CLIENT_BOOTSTRAP_SERVERS="localhost:9092,localhost:9093"

but the Kafka topics config allows you to specify each entry in its own environment variable using a dot followed by the index, i.e.

KAFKA_CLUSTER_TOPICS.0=first_topic
KAFKA_CLUSTER_TOPICS.1=second_topic

This is because typesafe config allows you to configure list-based settings directly by index using . (which is valid under POSIX for environment variables), but Apache's Kafka client decided to configure this differently.

Hence the conclusion can be made that, for largely technical/language reasons, the environment configuration is all over the place. Or to put it differently, the configuration settings reflect the idiosyncrasies of the Java/Scala language and of how all the libraries/ecosystems work, rather than treating the implementation/language of Guardian as a black box and having a consistent/clear way to configure it.

How could this be improved?

There is a strong argument to be made that, rather than using environment variables that are consistent with how Java/Scala reference.conf/Java properties libraries are configured (which ultimately leads to confusion), we should instead strive to have completely consistent environment variables for the whole app, at the cost of possibly confusing a Java/Scala developer who may want to use Guardian directly as an app in a non-typical way (rather than as a CLI or a docker tool).

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Figure out how to get search functionality for docs

What is currently missing?

There isn't a way to search through documentation

How could this be improved?

Guardian docs should have search functionality, similar to the akka and alpakka docs. From looking at the source code of the akka paradox theme project, it appears they are using algolia docsearch.

Need to investigate whether we want to use algolia-docsearch or some better alternative and also implement it.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Create a theme for paradox with Aiven branding

What is currently missing?

Currently the doc generation (using paradox) is still using the basic theme

How could this be improved?

We should create a theme that uses Aiven branding/colouring to make the documentation more appealing. We can use https://github.com/akka/akka-paradox as a reference.

An open point is whether or not we should put the paradox theme into a separate git repository. Doing so means we would be able to share the theme amongst multiple SBT projects (if we happen to have them in the future).

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Use a more efficient Base64 Encoder/Decoder

What is currently missing?

Currently we are using Java's basic java.util.Base64 encoder/decoder to transform the byte arrays into Base64. We should explore faster/more efficient options.

Streaming is also relevant here since the payloads of Kafka messages can potentially be quite large.
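As a point of comparison, the JDK encoder can already be used in a streaming fashion through `Base64.Encoder.wrap`, which avoids holding the whole encoded payload in a second array. The sketch below uses only the standard library; the object name `StreamingBase64` and the buffer size are assumptions, and it addresses memory use only, not the raw encoding speed that a faster encoder would improve.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.util.Base64

object StreamingBase64 {

  /** Copy `in` to `out` through a Base64 encoder using a fixed-size buffer. */
  def encode(in: InputStream, out: OutputStream, bufferSize: Int = 8192): Unit = {
    // wrap returns an OutputStream that encodes everything written to it
    val encoder = Base64.getEncoder.wrap(out)
    val buffer  = new Array[Byte](bufferSize)
    try {
      var read = in.read(buffer)
      while (read != -1) {
        encoder.write(buffer, 0, read)
        read = in.read(buffer)
      }
    } finally encoder.close() // closing emits the final (padded) group and closes `out`
  }

  /** Convenience wrapper for in-memory payloads. */
  def encodeToString(bytes: Array[Byte]): String = {
    val out = new ByteArrayOutputStream()
    encode(new ByteArrayInputStream(bytes), out)
    out.toString("US-ASCII")
  }
}
```

The output is byte-for-byte the same as `Base64.getEncoder.encodeToString`, so existing backups would remain readable if the encoding path were switched to a streaming one.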

How could this be improved?

There already appears to be a highly performant solution used by akka-http at https://github.com/akka/akka-http/blob/main/akka-parsing/src/main/java/akka/parboiled2/util/Base64.java

Note that akka-http is already available in guardian as a transitive dependency so no dependencies need to be added.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Investigate a transitive license reporter

What is currently missing?

Due to compliance reasons we should document all of the licenses (both direct and transitive) that this project uses.

How could this be improved?

There is already an sbt plugin, https://github.com/sbt/sbt-license-report, which can handle this

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Use paradox's github directive instead of manual links to project sources

What is currently missing?

Currently the documentation links directly to GitHub when referencing source files, however paradox has a directive specifically for this use case.

How could this be improved?

Use the github directive here https://developer.lightbend.com/docs/paradox/current/directives/linking.html#github-directive. Note that due to this bug lightbend/paradox#512 we have to wait for a new release of paradox in order to use this directive.

The following also needs to be added to paradoxProperties to correctly set the base github url

"github.base_url" -> s"https://github.com/aiven/guardian-for-apache-kafka/tree/${if (isSnapshot.value) "main"
        else "v" + version.value}"

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Implement SQL query for compaction on Postgres

As discussed by @rdunklau and @jlprat we have a couple of different options for SQL queries when working with compaction.

If we have a small number of keys but a large history, the following SQL query is more performant

SELECT key, ts, value FROM objects, lateral (SELECT ts, value FROM object_versions WHERE objects.key = object_versions.key ORDER BY ts DESC LIMIT 1) t

whereas this version should theoretically be faster if you have a lot of keys

SELECT DISTINCT on (key) key, ts, value FROM test ORDER BY key DESC, ts DESC;

Disable lint-byname-implicit because of Scala bug

What is currently missing?

Currently the Scala compiler is generating "Block result was adapted via implicit conversion (method apply) taking a by-name parameter" warnings, but these turn out to be false positives because they are raised in library code that uses macros and not in actual user-written code, see scala/bug#12072

How could this be improved?

Rather than polluting the codebase with @nowarn("cat=lint-byname-implicit") everywhere we should just disable the -Xlint:byname-implicit check until it gets resolved in upstream Scala.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Move implementation logic/details from `BackupClientInterface` to another private trait

What is currently missing?

Currently if you look at https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala the public interface of BackupClientInterface (i.e. all methods that are public) is mixed with all of the implementation details/core backup logic, which are private

How could this be improved?

All of the private implementation details (and implementation of the core logic behind BackupClientInterface itself) should be moved to another trait, likely called BackupClientInterfaceImpl which is package private. BackupClientInterface can then extend BackupClientInterfaceImpl.

This will make it ultra clear what the public interface is, i.e. BackupClientInterface. It can also help with Java interop since BackupClientInterface will essentially be the same as a Java interface.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Rename `KafkaClient` and `KafkaClientInterface` to `KafkaConsumer` and `KafkaConsumerInterface`

What is currently missing?

io.aiven.guardian.kafka.backup.KafkaClient and io.aiven.guardian.kafka.backup.KafkaClientInterface aren't really using the correct terminology. They should rather be KafkaConsumer and KafkaConsumerInterface, since both consumers and producers are technically clients

How could this be improved?

Rename KafkaClient to KafkaConsumer and KafkaClientInterface to KafkaConsumerInterface

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Add validation for time configuration changes in subsequent runs of Guardian

What is currently missing?

Since it's possible for users of Guardian to change certain configuration settings (i.e. changing the TimeConfiguration from DAYS to MINUTES) we should add logic to detect these changes and act appropriately.

Additionally we should introduce a check for detecting any currently existing uploads to make sure we don't accidentally lose any backups. Furthermore we should also prevent concurrent backups to the same storage location (for obvious reasons).

How could this be improved?

Some brainstorming still needs to be done on what is considered appropriate. For example if someone changes the time configuration from MINUTES to DAYS what should we do here? i.e. should we immediately close the currently open MINUTE timeslot and then create a new object/file for the DAY slot?

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Figure out how to treat timezone/timestamp

When we receive time information from Kafka it is stored in two fields, timestamp and timestampType. timestamp is a long value holding milliseconds since the epoch, whereas timestampType is an enum that looks like this:

public enum TimestampType {
    NO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime");

    public final int id;
    public final String name;

    TimestampType(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public static TimestampType forName(String name) {
        for (TimestampType t : values())
            if (t.name.equals(name))
                return t;
        throw new NoSuchElementException("Invalid timestamp type " + name);
    }

    @Override
    public String toString() {
        return name;
    }
}

Guardian needs a timestamp value to use as a source of truth when replaying messages as well as splitting the storage into various time slices. Currently we ignore the timestampType so we have a trivial implementation of getting an OffsetDateTime, i.e.

final case class ReducedConsumerRecord(topic: String,
                                       offset: Long,
                                       key: String,
                                       value: String,
                                       timestamp: Long,
                                       timestampType: TimestampType
) {
  def toOffsetDateTime: OffsetDateTime =
    Instant.ofEpochMilli(this.timestamp).atZone(ZoneId.of("UTC")).toOffsetDateTime
}

The timestampType is a configurable setting for the Kafka cluster, so we should investigate whether we need to calculate the timestamps in a different way depending on the timestampType.
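
As a starting point for that investigation, here is a minimal Java sketch of a conversion that at least looks at the timestampType. The class name RecordTimestamps and the helper are hypothetical, the enum is a local stand-in for Kafka's TimestampType so the example compiles on its own, and treating a negative timestamp as "missing" is an assumption about how records without a timestamp are reported.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Optional;

final class RecordTimestamps {
    // Local stand-in for Kafka's TimestampType, only so the sketch is self-contained.
    enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

    // Hypothetical helper: returns the record time in UTC, or empty when the
    // record carries no usable timestamp (assumption: negative means missing).
    static Optional<OffsetDateTime> toOffsetDateTime(long timestamp, TimestampType type) {
        if (type == TimestampType.NO_TIMESTAMP_TYPE || timestamp < 0) {
            return Optional.empty();
        }
        // CREATE_TIME is set by the producer and LOG_APPEND_TIME by the broker.
        // Both are epoch milliseconds, so the conversion itself is identical.
        return Optional.of(Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC));
    }
}
```

Under these assumptions the type only decides whether a timestamp exists at all. Whether producer-assigned CREATE_TIME values are trustworthy enough for time slicing is the open question this issue raises, and the sketch does not answer it.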

Programmatically check configured AWS account has correct permissions on startup

What is currently missing?

Currently when you use Guardian with the S3 backend it doesn't check whether the configured AWS account has the necessary permissions for correct usage of Guardian.

If possible we can also check that the retention level for multipart upload chunks is greater than the configured time slice.

This is basically codifying what would be documented by #199

How could this be improved?

Current thinking is to use the official AWS Java S3 client to check ACL permissions (adding these features to Alpakka's S3 client would likely be considered out of scope and hence excessive). Just need to make sure that we use the same version of the Java S3 client that Alpakka brings in.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Implement callbacks to Alpakka's GCS client whenever a chunk is uploaded using multipart upload

What is currently missing?

At some point we would like to support GCS in the same way we currently support S3 as a backup storage.

How could this be improved?

Similar to what was done with Alpakka's S3 client, functionality needs to be upstreamed to Alpakka's GCS client that allows us both to get the uploaded parts (aka chunks) of a currently incomplete multipart upload (see akka/alpakka#2730) and to resume the upload of a file while providing context, which in our case is Kafka cursors (see akka/alpakka#2770).

One thing to consider is that GCS appears to have two APIs: their original JSON-based API, which is what the current Alpakka GCS client implements, and an S3-compatible API which they added later on. It needs to be investigated whether GCS's JSON-based API even supports the functionality we need. If not, the changes required would be more extensive, as we would need to either re-implement the current GCS client using the S3-compatible API or make a completely new GCS client while leaving the current one intact (we also need to consider whether code should be shared between Alpakka's S3 client and the new S3-based GCS client).

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Fix flaky bucket name generators test

What happened?

The "Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost and prefix" test is flaky.

What did you expect to happen?

Test passes without problems

What else do we need to know?

Recent scala-steward PRs seem to be causing the problem. This needs to be investigated.

Amazon S3 listing parts of multipart upload failed with 404

What happened?

When running Guardian against a real-life cluster, this warning message appeared:

[WARN ] 15:54:57.878 BackupClient - Failed to upload a chunk into S3 with bucket: guardian-presentation, key: 2022-02-02T20:55:00Z.json, uploadId: 2QL6vb8QzqmelRBByFJUA8vyu.0ZI03luwyWp4_mLIEqvNp4gf2lCVoaptHPU6c6G2wPK8mjn4h6e4RNlLFAfxqCr.Lez_zISoIBCeV8m1HjplKFf9FBesIeCSqjPwl9 and partNumber: 1
java.lang.RuntimeException: Upload part 1 request failed. Response header: (HttpResponse(404 Not Found,List(x-amz-request-id: SRXG6WA7MJ406J0N, x-amz-id-2: Ran4EsJJ23uuehA8GS9j5KNDx8ap7NBjoW1W02wTPjTeWv8TmkcgwLXTcpEipTsxD14Ra9jI580=, Date: Thu, 31 Mar 2022 13:54:57 GMT, Server: AmazonS3, Connection: close),HttpEntity.Chunked(application/xml),HttpProtocol(HTTP/1.1))), response body: (<?xml version="1.0" encoding="UTF-8"?>
<Error><Code>NoSuchUpload</Code><Message>The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.</Message><UploadId>2QL6vb8QzqmelRBByFJUA8vyu.0ZI03luwyWp4_mLIEqvNp4gf2lCVoaptHPU6c6G2wPK8mjn4h6e4RNlLFAfxqCr.Lez_zISoIBCeV8m1HjplKFf9FBesIeCSqjPwl9</UploadId><RequestId>SRXG6WA7MJ406J0N</RequestId><HostId>Ran4EsJJ23uuehA8GS9j5KNDx8ap7NBjoW1W02wTPjTeWv8TmkcgwLXTcpEipTsxD14Ra9jI580=</HostId></Error>).
        at akka.stream.alpakka.s3.impl.S3Stream$.$anonfun$handleChunkResponse$1(S3Stream.scala:1306)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

What did you expect to happen?

This request should automatically be retried, preferably with an exponential backoff.

What else do we need to know?

It's somewhat likely that S3 is returning the wrong kind of error, and that this is in fact due to too many requests and/or some weird case of eventual consistency. The reason for suspecting this is that the tool was being run against a Kafka cluster that had millions of unconsumed records, so Guardian was pulling as many Kafka records as it could handle.
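
To make the expected behaviour concrete, here is a minimal sketch of a retry with exponential backoff in plain Java. It is not wired into Alpakka's S3 client. The class Backoff, its method names and the parameters are all hypothetical, and it assumes that the failed request is actually safe to retry.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

final class Backoff {
    // Delay before retry number `attempt` (0-based): base * 2^attempt, capped at max.
    static Duration delayFor(int attempt, Duration base, Duration max) {
        long factor = 1L << Math.min(attempt, 30); // bound the shift to avoid overflow
        long millis;
        try {
            millis = Math.multiplyExact(base.toMillis(), factor);
        } catch (ArithmeticException overflow) {
            return max;
        }
        return millis > max.toMillis() ? max : Duration.ofMillis(millis);
    }

    // Runs the task, retrying up to maxRetries times with a growing pause in between.
    // The last failure is rethrown once the retries are exhausted.
    static <T> T retry(Callable<T> task, int maxRetries, Duration base, Duration max)
            throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                if (attempt >= maxRetries) throw e;
                Thread.sleep(delayFor(attempt, base, max).toMillis());
            }
        }
    }
}
```

With a base of 100 ms and a cap of 1 s, the pauses would be 100, 200, 400, 800 and then 1000 ms. If the upload ID really is invalid, retrying will not help, so a real implementation would probably also need to decide which error codes are worth retrying.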

Figure out how to automatically publish to Sonatype

What is currently missing?

Currently the project is using sbt-release in order to do publishing (see https://github.com/aiven/guardian-for-apache-kafka/blob/main/build.sbt#L223-L239). This is currently a manual step; ideally it should be done automatically.

How could this be improved?

sbt-github-actions allows you to automatically publish a library within CI (see https://github.com/djspiewak/sbt-github-actions#integration-with-sbt-ci-release). This uses another plugin instead of sbt-release, so we should figure out the best way to do this. I believe the way this is done is by creating a pull request that changes the version from a SNAPSHOT to a full release, which triggers a build.

The credentials for our Sonatype account should also be stored as GitHub Actions secrets.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Fix Generator

What happened?

There is a range index issue when generating S3 bucket names for tests, i.e.

[info] - Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost and prefix *** FAILED ***
[info]   IllegalArgumentException was thrown during property evaluation.
[info]     Message: requirement failed: invalid size given: -1
[info]     Occurred when passed generated values (
[info]   
[info]     )

What did you expect to happen?

For it to not fail

What else do we need to know?

There is some fencepost error that needs to be looked into at https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala

Make hardcoded secret keys/test constants as configuration

What is currently missing?

Currently constant values (such as dummy keys/secrets) are hardcoded. It would be ideal to specify these as configuration. See #43 (comment).

How could this be improved?

Make a separate test folder in resources which contains configuration that is specific ONLY to tests.

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Add GZip compression

What is currently missing?

Guardian is currently missing compression. For saving storage space (especially with large backups), compression is usually a necessary feature.

How could this be improved?

Add a gzip compression setting to the Backup configuration which will create backup files/objects that are compressed. There are certain questions/topics that arise from this:

  • How should we store the fact that a key/object is compressed? The best solution to me is that compressed keys/files have a different filename, so standard backup keys/files have a .json filename whereas compressed keys/files have a .json.gz filename.
  • One corner case that needs to be thought of is if someone originally does a backup without compression but then restarts Guardian with the compression setting enabled: you then have existing state with keys/files that are uncompressed. Because of how cloud storage works you cannot overwrite existing uncompleted chunked uploads (even if you could, you wouldn't want to because it's quite taxing: you would have to redownload, compress and reupload the compressed version for an unbounded amount of data). The easiest way to solve this issue would be that if you change the compression setting and reload Guardian, the change won't actually apply until a completely new object/key is made (this should also be documented).
  • Similarly to the previous point, one would need to confirm if there are any issues in switching compression levels between restarts of Guardian.
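
The naming idea from the first point can be sketched in a few lines of plain Java. Everything here is illustrative: BackupCompression and its methods are hypothetical names, and the sketch compresses a whole byte array in memory, which is not how a streaming backup would do it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class BackupCompression {
    // Hypothetical naming rule: compressed objects get ".json.gz", others ".json".
    static String objectKey(String timeSlice, boolean gzip) {
        return timeSlice + (gzip ? ".json.gz" : ".json");
    }

    // Decide from the key alone whether an existing object holds gzip data.
    static boolean isCompressed(String key) {
        return key.endsWith(".gz");
    }

    static byte[] gzip(byte[] raw) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buffer)) {
            out.write(raw);
        } // closing the stream writes the gzip trailer
        return buffer.toByteArray();
    }

    static byte[] gunzip(byte[] compressed) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        }
    }
}
```

Because the suffix alone tells a reader how to decode an object, a restore could handle a mix of .json and .json.gz keys, which fits the suggestion that a changed compression setting only applies to newly created objects.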

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Don't run tests that require secrets if secrets are not available

What is currently missing?

Due to a limitation with GitHub Actions where PRs that are created from forks cannot access GitHub Actions secrets (see https://github.community/t/make-secrets-available-to-builds-of-forks/16166/33), and with no way to override this limitation in a granular way, tests that rely on secrets (such as end-to-end S3 tests) should check whether secrets are available and skip if they aren't.

How could this be improved?

ScalaTest has a feature called tags that appears to be the correct abstraction to solve this problem: https://www.scalatest.org/user_guide/tagging_your_tests
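
Independently of the tagging mechanism, the availability check itself is small. The following Java sketch shows one way it could look. The class TestSecrets and the two variable names are hypothetical, since the issue does not say which secrets the tests read, and it assumes the secrets reach the tests as environment variables.

```java
import java.util.List;
import java.util.Map;

final class TestSecrets {
    // Hypothetical variable names; the real secret names are not given in the issue.
    static final List<String> REQUIRED =
        List.of("EXAMPLE_S3_ACCESS_KEY", "EXAMPLE_S3_SECRET_KEY");

    // True only when every required variable is present and non-blank.
    static boolean available(Map<String, String> env) {
        return REQUIRED.stream().allMatch(name -> {
            String value = env.get(name);
            return value != null && !value.isBlank();
        });
    }
}
```

A test suite would call TestSecrets.available(System.getenv()) once and skip the tagged tests when it returns false. Blank values are treated as missing because an unavailable secret may be passed through as an empty string rather than being left unset.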

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature

Provide Issue and PR templates

Provide issue templates for different types: bugs, features, questions
Provide a PR template with the information we'd like to have from our contributors
