aiven-open / guardian-for-apache-kafka
Set of tools for creating backups, compaction and restoration of Apache Kafka® Clusters
License: Apache License 2.0
When gzip compression was added to Guardian (see #196), the implementation was not ideal due to how gzip compression works. Specifically, we compress each Kafka record individually, whereas ideally we would compress larger chunks of data, since that gives much better space savings.
This feature could be improved by buffering the data in memory up to the S3 minimum chunk size (5 MB) and then compressing that entire chunk at once, rather than compressing each message separately.
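A minimal sketch of the difference, using only the JDK's GZIPOutputStream rather than Guardian's actual Akka Streams pipeline (ChunkedGzip, compressPerRecord and compressBuffered are hypothetical names). One thing to keep in mind is that the compressed chunk will usually be smaller than the uncompressed buffer, so the buffer threshold alone does not guarantee that each uploaded part still meets S3's minimum part size.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

object ChunkedGzip {
  // S3's minimum multipart chunk size (5 MiB)
  val S3MinChunkSize: Int = 5 * 1024 * 1024

  // Gzip a whole byte array as a single gzip member
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out)
    try gz.write(bytes)
    finally gz.close()
    out.toByteArray
  }

  // Current behaviour: every record is compressed on its own (header/trailer overhead per record)
  def compressPerRecord(records: Seq[Array[Byte]]): Seq[Array[Byte]] =
    records.map(gzip)

  // Proposed behaviour: buffer records until at least `chunkSize` bytes, then compress the buffer at once
  def compressBuffered(records: Seq[Array[Byte]], chunkSize: Int = S3MinChunkSize): Seq[Array[Byte]] = {
    val chunks = Seq.newBuilder[Array[Byte]]
    val buffer = new ByteArrayOutputStream()
    records.foreach { record =>
      buffer.write(record)
      if (buffer.size() >= chunkSize) {
        chunks += gzip(buffer.toByteArray)
        buffer.reset()
      }
    }
    // Flush whatever is left once the stream of records ends
    if (buffer.size() > 0) chunks += gzip(buffer.toByteArray)
    chunks.result()
  }
}
```

Each buffered chunk is a complete gzip member, and a sequence of gzip members is still a valid gzip stream, so the chunks could be uploaded back to back as one object.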
The MockedKafkaClientBackupClientSpec
occasionally fails; we need to investigate why.
Here are some seeds with which the test fails
-81029110963502054
7832168009826873070
-5937289859930351334
7832168009826873070 has occurred a couple of times, but even when setting the seed the test still doesn't fail deterministically.
After speaking with @mkoskinen, he made a valid point that there may be a use case where some kind of object storage (or, more specifically, an implementation of BackupClientInterface) is so basic that it doesn't support functionality such as resume. A way around this would be to use a BackupClientInterface that supports resuming (even flat file storage) and then, after each substream for that object/key/file is complete, upload it to S3/GCS/whatever as a whole.
This can also solve other problems. For example, we currently don't compress the .json file (using something like gz) while streaming, because we can't find a resume point in a compressed object/key/file. This approach would allow you to back up the stream to an initial storage and then, after it's finished, compress it and send it to S3/GCS/whatever.
On first impression, the implementation could be a single method added to the BackupClientInterface that returns an Option[Sink]; if it is defined, this sink gets executed after a backup is complete. One consideration is whether the sink should be run asynchronously or synchronously (ideally as a parameter of the method itself), i.e.
def afterBackupSink(async: Boolean): Option[Sink]
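As a dependency-free sketch of that idea, assuming a plain callback stands in for the Akka Sink (AfterBackupHook, BackupStorage and onKeyComplete are hypothetical names, not Guardian's API), the async flag could decide whether the hook runs on the ExecutionContext or on the calling thread:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object AfterBackupSketch {
  // Hypothetical stand-in for the Option[Sink] idea: receives each finished object/key as a whole
  trait AfterBackupHook {
    def run(key: String, contents: Array[Byte]): Unit
  }

  trait BackupStorage {
    // Storage backends that need no post-processing simply keep the default
    def afterBackupHook: Option[AfterBackupHook] = None

    // Invoked once the substream for `key` has been completely written to the initial storage
    def onKeyComplete(key: String, contents: Array[Byte], async: Boolean)(implicit
        ec: ExecutionContext
    ): Future[Unit] =
      afterBackupHook match {
        case None => Future.unit
        // async = true: schedule the hook on the ExecutionContext and return immediately
        case Some(hook) if async => Future(hook.run(key, contents))
        // async = false: run the hook on the calling thread, then return an already completed Future
        case Some(hook) => Future.fromTry(Try(hook.run(key, contents)))
      }
  }
}
```

Keeping the hook optional means simple backends such as the flat file storage pay nothing for it, while an S3/GCS upload step can be plugged in only where it is needed.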
The current tests, particularly RealS3BackupClientSpec, have quite a bit of boilerplate code which can be reduced with basic refactoring.
Refactoring common code in the tests out into functions should alleviate most of the boilerplate.
Unfortunately, due to S3 having a large minimum chunk size (5 megabytes), we often have to compare large amounts of data when we run our tests against S3. What invariably ends up happening is that we compare very large data structures (typically 10 MB or more) with each other to see if they are equal, which uses enough CPU that it can actually create bottlenecks.
There are 2 cases we have to deal with
In regards to 1, there isn't much we can do apart from potentially dealing with raw numbers rather than Strings, on the suspicion that scalacheck's generated Strings are causing too many hash collisions, which slows down the equals method. The easiest solution here may be to use a custom hashcode that works better with the generated data. Alternatively, one can make sure that the data generated in io.aiven.guardian.kafka.Generators consists of incrementing numbers, which will generate fewer collisions; however, you then have to deal with serializing the strings into numbers when implementing a custom hashcode.
In regards to 2, we are currently using https://github.com/softwaremill/diffx to create nice diffs if the 2 data structures are not equal; however, diffx isn't designed to handle large data structures nicely. More specifically, it can handle cases where a single value in some data structure is wrong or missing, but what typically happens in our S3 tests is not that a single value is missing but that entire chunks of data are missing (i.e. the backup files for a single chunk). Using algorithms which are very fast at detecting these large missing chunks of data (and at handling overlapping data), and then falling back to slower/more general methods, can theoretically greatly reduce the amount of CPU time being used. In other words, we want a fast path for the most common cause of test failures and then a fallback to the slower/current algorithm (if possible).
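A sketch of such a fast path, assuming the backed-up data can be grouped by object key (e.g. one entry per backup file); ChunkDiff and fastDiff are hypothetical names. Only the keys reported as differing would then need to be handed to the slower diffx comparison.

```scala
object ChunkDiff {
  // Outcome of the fast path, expressed in terms of whole chunks (e.g. backup files keyed by object key)
  final case class ChunkReport(missing: Set[String], unexpected: Set[String], differing: Set[String])

  def fastDiff[A](expected: Map[String, Seq[A]], actual: Map[String, Seq[A]]): ChunkReport = {
    // Whole chunks that are absent or extra are detected from the key sets alone, without touching the data
    val missing = expected.keySet -- actual.keySet
    val unexpected = actual.keySet -- expected.keySet
    // For shared keys: size check first, then hashCode, and full equality only when the hashes match
    val differing = expected.keySet.intersect(actual.keySet).filter { key =>
      val e = expected(key)
      val a = actual(key)
      e.size != a.size || e.hashCode != a.hashCode || e != a
    }
    ChunkReport(missing, unexpected, differing)
  }
}
```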
The proper way for a client to terminate an akka-stream (i.e. when a running backup instance is terminated) is by using a kill switch https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html#controlling-stream-completion-with-killswitch.
Specifically for our case, we should add a shutdown hook that triggers the kill switch. The kill switch is also handy for people using Guardian as a library, as it gives them a way to terminate the otherwise indefinite stream.
When using the S3 storage engine, we should document which permissions Guardian needs for S3 accounts.
Since Guardian uses non-typical parts of the S3 API which need extra permissions (i.e. ListParts), this needs to be documented, since it can come as a surprise.
Retention levels for uploaded parts of multipart uploads also need to be documented, i.e. if the retention level of your multipart upload is lower than the configured time slice, you will get problems, since the parts will disappear before the upload gets completed.
Whenever an exception is thrown in a SubFlow (i.e. as a result of using splitAfter for creating sub flows based on time slice), it does not respect the configured actor supervision strategy, which means that instead of the stream being terminated (which is what you would expect), it just repeats itself indefinitely.
If an exception is thrown inside a SubFlow, it should cancel the parent stream, which should propagate and cancel all substreams.
The upstream issue/comment is here: akka/akka#23066 (comment)
Currently the restore portion of Guardian only uses a standard Kafka producer via Alpakka's Producer. While this is fine for standard scenarios, most people doing a restore would ideally want exactly-once semantics. This seems possible to do using the standard Producer with
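import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig

// `container` is the Kafka test container from the surrounding test (not defined here)
def baseProducerConfig
    : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
  Some(
    _.withBootstrapServers(
      container.bootstrapServers
    ).withProperties(
      Map(
        // Idempotent producer: retried sends do not create duplicate records
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> true.toString,
        // Only one unacknowledged request per connection
        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
        // A batch size of 0 disables batching entirely
        ProducerConfig.BATCH_SIZE_CONFIG -> 0.toString
      )
    ).withParallelism(1) // one message at a time through Alpakka's producer stage
  )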
However, such a configuration has terrible throughput, since you are essentially forcing the Kafka producer to send one message at a time with no batching.
Support needs to be added to Alpakka Kafka so that it's possible to create a Transactional.source from a non-Kafka-cluster source. The current Transactional.sink only works with a PartitionOffsetCommittedMarker, which is currently only created by a Transactional.source. Trying to manually create a PartitionOffset from a different Source just results in an exception being thrown at https://github.com/akka/alpakka-kafka/blob/1a1dab1e5168a829b7e76053375a364739043850/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala#L220 and it's not possible to manually create a PartitionOffsetCommittedMarker since it's a private/internal API.
There is currently an ongoing issue at Alpakka Kafka for this: akka/alpakka-kafka#1075
Tests for the GCS BackupClient are missing because the hostname is currently hardcoded in Alpakka (see https://github.com/akka/alpakka/blob/f2971ca8a4a71b541cddbd5bf35af3a2a56efe71/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala#L33-L34).
An upstream PR for Alpakka needs to be created in order to make this configurable.
Once we work on the relevant section in CONTRIBUTING.md, we should add a note reminding people to run githubWorkflowGenerate when modifying build.sbt, so that the generated pipeline stays in sync with the changes.
Currently environment variable overrides are defined in reference.conf in the core modules, i.e. https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-backup/src/main/resources/reference.conf#L1-L30 . Ideally they should be moved to the equivalent cli project as application.conf files, since the environment variables are only relevant when you actually run the application.
Furthermore, if someone uses the Guardian core modules as a library, they may not want to expose those environment variables under these names (or even at all). This can also help alleviate #91, since we can encapsulate any such changes in the cli modules and leave the core Guardian modules as clean Scala libraries.
At some point we would like to add file-based backups to Guardian.
Currently this is not possible because Akka's FileIO doesn't allow you to add a "callback" whenever a chunk of bytes has been successfully written to disk. The callback is required to commit the current Kafka cursor, so the set of changes required is similar in concept to what was done with Alpakka's S3 client (see akka/alpakka#2770). Here is the location where the current FileIO actually writes the data to disk: https://github.com/akka/akka/blob/7abc41cf4e7e8827393b181cd06c5f8ea684e696/akka-stream/src/main/scala/akka/stream/impl/io/FileOutputStage.scala#L63
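A minimal JDK-only sketch of the kind of callback that is needed, assuming each chunk arrives paired with its Kafka cursor as context (FileBackupSketch and writeWithCallback are hypothetical names, and this is plain blocking NIO rather than an Akka FileIO stage). The callback only fires after FileChannel.force has returned for that chunk.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}

object FileBackupSketch {
  // Appends each chunk to `path` and calls `onDurable` with that chunk's context (e.g. the Kafka
  // cursor to commit) only after the bytes have been forced to the storage device.
  def writeWithCallback[Ctx](path: Path, chunks: Iterator[(Array[Byte], Ctx)])(onDurable: Ctx => Unit): Unit = {
    val channel =
      FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)
    try {
      chunks.foreach { case (bytes, context) =>
        val buffer = ByteBuffer.wrap(bytes)
        // A single write call may not consume the whole buffer
        while (buffer.hasRemaining) channel.write(buffer)
        channel.force(true) // true: also flush metadata such as the file length
        onDurable(context)
      }
    } finally channel.close()
  }
}
```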
Use https://doc.akka.io/docs/alpakka-kafka/current/home.html#matching-kafka-versions in order to connect to a Kafka cluster in the core project.
Currently we are not shutting down our Akka systems correctly in our tests.
Akka provides akka-testkit, which comes with utilities such as TestKit that can be used to both initialize and shut down the Akka system.
Automatic detection of newer dependency versions, as well as automatic creation of pull requests for them.
There is already a bot that handles this at https://github.com/scala-steward-org/scala-steward
There is no linter at the moment, which means we might be letting some subtle bugs through in each PR.
Add and configure Scalafix in the project.
The current BackupClient spec for S3 seems to have failed randomly.
The test should always run correctly.
The test failure is at https://github.com/aiven/guardian-for-apache-kafka/runs/3379943584?check_suite_focus=true . Logs have also been attached in case they expire:
logs_376.zip
Currently, when we calculate time slices we just use the timestamp of the first event as the reference point, i.e. in the following situation
event 1: 13:00 Day 1 -> event 2: 15:00 Day 1 -> event 3: 21:00 Day 1 -> event 4: 10:00 Day 2 -> event 5: 14:00 Day 2
we use the timestamp of event 1 as the initial time, so if we set the time period to 1 day (let's say), the time slices will be from 13:00 of the current day to 12:59... of the next day.
Practically, I suspect most admins would rather align the time slices to an easily distinguishable point of reference (i.e. 00:00 each day) than to the timestamp of the first event.
With the previous example, if we allow the user to configure the initial time to be "00:00 of the day when we receive the first event", then the time slices would be as follows
slice 1: event 1: 13:00 Day 1 -> event 2: 15:00 Day 1 -> event 3: 21:00 Day 1
slice 2: event 4: 10:00 Day 2 -> event 5: 14:00 Day 2
whereas currently it would be
slice 1: event 1: 13:00 Day 1 -> event 2: 15:00 Day 1 -> event 3: 21:00 Day 1 -> event 4: 10:00 Day 2
slice 2: event 5: 14:00 Day 2
(since we calculate the day from 13:00 rather than 00:00)
Further points that arise from this:
- If we align slices to an easily groupable unit of time (i.e. 1 day or 1 hour), how does it work when the time period has no such easily groupable time abstraction? Someone can just set the time period to be 153482 millis (if they want), in which case this configuration doesn't make much sense.
- Should the configuration be restricted to java.time.temporal.ChronoUnit (which means the configuration is only going to be Days/Weeks etc.), or is another configuration more appropriate? Having a restricted config that points to a type such as ChronoUnit makes it easier to solve the problem from the previous point.
- Furthermore, it can be argued that if we generalize the problem mathematically, we already have an initial point in time; it's just ChronoUnit.MICROS of the first event?

@jlprat What are your thoughts?
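The two ways of numbering time slices discussed above can be sketched with java.time (TimeSlices and both method names are hypothetical). The aligned variant truncates the first event's timestamp with OffsetDateTime.truncatedTo, which only accepts units up to ChronoUnit.DAYS, so a ChronoUnit-based config that includes weeks would need separate handling.

```scala
import java.time.{Duration, OffsetDateTime}
import java.time.temporal.ChronoUnit

object TimeSlices {
  // Current behaviour: slice boundaries are measured from the first event's own timestamp
  def sliceFromFirstEvent(first: OffsetDateTime, event: OffsetDateTime, period: Duration): Long =
    Duration.between(first, event).toMillis / period.toMillis

  // Proposed behaviour: boundaries are measured from the first event truncated to `unit`
  // (e.g. 00:00 of that day for ChronoUnit.DAYS). truncatedTo rejects units longer than a day.
  def sliceFromAlignedStart(first: OffsetDateTime, event: OffsetDateTime, period: Duration, unit: ChronoUnit): Long =
    Duration.between(first.truncatedTo(unit), event).toMillis / period.toMillis
}
```

With the five events from the example and a period of one day, the first function yields slices 0, 0, 0, 0, 1 and the second 0, 0, 0, 1, 1, matching the two groupings listed above.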
As @jankatins has stated, we should be using io.aiven instead of aiven.io for our package names, since this is what our other software also uses.
When running Guardian against an actual live Aiven cluster that has messages continuously being sent to a topic, this error occurs:
[ERROR] Entry$$anonfun$$lessinit$greater$1$$anon$1 - Unhandled exception in stream
io.aiven.guardian.kafka.Errors$ExpectedStartOfSource$: Always expect a single element at the start of a stream
at io.aiven.guardian.kafka.Errors$ExpectedStartOfSource$.<clinit>(Errors.scala:6)
at io.aiven.guardian.kafka.backup.BackupClientInterface.$anonfun$sourceWithFirstRecord$1(BackupClientInterface.scala:200)
at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:52)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:528)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
That it works
This is likely a corner case in Guardian where the first instantiation of the stream is empty. Note that this does not mean the stream in general has no data in it, just that the current iteration of the stream doesn't, i.e. there may be no messages in a given time slice.
Docker packaging via https://github.com/sbt/sbt-native-packager
sbt-native-packager has already been set up in #83, so this should be fairly straightforward. https://sbt-native-packager.readthedocs.io/en/stable/formats/docker.html has documentation for it.
Currently, when you start the main backup stream using backupClient.backup.run(), you get back a Consumer.Control. The Consumer.Control has a drainAndShutdown(streamCompletion: Future[_]) method which is intended to be executed when the application is shutting down in order to do a graceful shutdown.
The problem with drainAndShutdown(streamCompletion: Future[_]) is the streamCompletion future it requires. The supplied argument is meant to be a Future representing the stream's completion; however, no such future exists when calling drainAndShutdown(streamCompletion: Future[S]): Future[S], because backupClient.backup.run() is a never-ending stream, so it doesn't even return a Future as a materialized value.
The above problem occurs when you don't use a DrainingControl as the combiner when materializing the stream, as documented here https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#committer-sink, i.e.
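import akka.Done
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.kafka.scaladsl.{Committer, Consumer}

val committerSettings = CommitterSettings(system)
val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      // `business` is the application's processing step, returning a Future
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()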
If you do use a DrainingControl as described above, then drainAndShutdown doesn't require the streamCompletion argument, i.e. it's just drainAndShutdown(): Future[Done].
Unfortunately, doing this in Guardian is more complicated because of our usage of Sink.lazyInit, which, when combined with to, means we end up losing the reference to the materialized value, i.e.
@nowarn("msg=method lazyInit in object Sink is deprecated")
val subFlowSink = substreams.to(
  // At this point we lose the materialized value so we can't use DrainingControl.apply
  Sink.lazyInit(
    {
      case start: Start =>
        implicit val ec: ExecutionContext = system.getDispatcher
        for {
          state <- getCurrentUploadState(start.key)
        } yield backupToStorageSink(start.key, state)
          .contramap[ByteStringElement] { byteStringElement =>
            (byteStringElement.data, byteStringElement.context)
          }
      case _ => throw Errors.ExpectedStartOfSource
    },
    empty
  )
)
Relevant thread in the Lightbend forums https://discuss.lightbend.com/t/losing-reference-to-materialized-value-with-sink-lazyinit/9396
Documentation is sparse/missing when it comes to configuration for the storage backends
Document it!
Likely as a result of #165 and #166, PRs from external forks (i.e. from scala-steward, see #171) are not terminating anymore.
The CliSpec should pass successfully even if you don't have S3 credentials.
Since the current backup CliSpec actually starts the backup, it ends up initializing a connection to S3. There must be some exceptional case where this prevents the CliSpec from terminating if the S3 environment variables are not all provided, which is the case if the PR is created from an external fork, since secrets are not exposed to it.
Guardian can be configured using environment variables, which are documented in the various reference.conf files found in the modules (note that actual documentation for this still needs to be written and will be added later).
Currently the strategy for naming these environment variables was to reflect the reference.conf setting, which is a feature of how Java/Scala libraries are configured. However, due to how Apache Kafka Client/Alpakka/S3 are designed, the naming and structure of these reference.conf settings are all over the place. As a quick example, to configure the Kafka bootstrap servers you would do
KAFKA_CLIENT_BOOTSTRAP_SERVERS="localhost:9092"
but for the consumer poll interval you would do
AKKA_KAFKA_CONSUMER_POLL_INTERVAL=1 minute
This is because the former is a setting for Apache's official Java Kafka client, whereas the latter is a setting for Alpakka's Kafka client, which happens to wrap Apache's Java Kafka client.
Furthermore, some settings which are passed directly into Apache's Kafka client, such as bootstrap servers, are configured using a single comma-delimited value, i.e.
KAFKA_CLIENT_BOOTSTRAP_SERVERS="localhost:9092,localhost:9093"
but the Kafka topics config allows you to specify each element as its own environment variable using a dot-separated index, i.e.
KAFKA_CLUSTER_TOPICS.0=first_topic
KAFKA_CLUSTER_TOPICS.1=second_topic
This is because Typesafe Config allows you to configure list-based configurations directly by index using . (which is valid under POSIX for environment variables), but Apache's Kafka client decided to configure this differently.
Hence the conclusion can be made that, for largely technical/language reasons, the environment configuration is all over the place. To put it differently, the configuration settings reflect the idiosyncrasies of the Java/Scala language and of how all of the libraries/ecosystems work, rather than treating the implementation/language of Guardian as a black box and providing a consistent/clear way to configure it.
There is a strong argument to be made that, rather than using environment variables that are consistent with how Java/Scala reference.conf/Java properties libraries are configured (which ultimately leads to confusion), we should instead strive for completely consistent environment variables for the whole app, at the cost of possibly confusing a Java/Scala developer who may want to use Guardian directly as an app in a non-typical way (rather than as a CLI or a Docker tool).
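To make the proposal concrete, here is a sketch of what a single convention could look like, assuming every environment variable name is derived mechanically from its config path and every list is one comma-delimited value (EnvConfigSketch, envName and readList are hypothetical, not existing Guardian code):

```scala
import java.util.Locale

object EnvConfigSketch {
  // Hypothetical rule: upper-case the config path and turn '.' and '-' into '_'
  def envName(configPath: String): String =
    configPath.toUpperCase(Locale.ROOT).replaceAll("[.-]", "_")

  // Hypothetical rule: every list setting is one comma-delimited variable, never NAME.0, NAME.1, ...
  def readList(env: Map[String, String], name: String): List[String] =
    env.get(name).toList.flatMap(_.split(',').map(_.trim).filter(_.nonEmpty).toList)
}
```

Under this rule the topics example would become KAFKA_CLUSTER_TOPICS="first_topic,second_topic", in the same shape as KAFKA_CLIENT_BOOTSTRAP_SERVERS already uses.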
There isn't a way to search through documentation
Similar to Akka and Alpakka, which have search functionality, the Guardian docs should have the same. From looking at the source code of the akka paradox theme project, it appears they are using Algolia DocSearch.
We need to investigate whether we want to use Algolia DocSearch or some better alternative, and then implement it.
Currently the doc generation (using paradox) still uses the basic theme.
We should create a theme that uses Aiven branding/colouring to make the documentation more appealing. We can use https://github.com/akka/akka-paradox as a reference.
An open point is whether or not we should put the paradox theme into a separate git repository. Doing so would allow us to share the theme amongst multiple sbt projects (if we happen to have them in the future).
Currently we are using Java's basic java.util.Base64 encoder/decoder to transform the byte arrays into Base64; we should explore faster/more efficient options.
Streaming is also relevant here, since the payloads of Kafka messages can potentially be quite large.
There already appears to be a highly performant solution used by akka-http at https://github.com/akka/akka-http/blob/main/akka-parsing/src/main/java/akka/parboiled2/util/Base64.java
Note that akka-http is already available in Guardian as a transitive dependency, so no dependencies need to be added.
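The streaming aspect can be sketched with the JDK alone: Base64.Encoder.wrap returns an OutputStream that encodes as bytes are written, so a large payload never has to be held as one encoded array. This sketch keeps java.util.Base64 (StreamingBase64 is a hypothetical name); swapping in the akka-http implementation would be a separate change.

```scala
import java.io.{InputStream, OutputStream}
import java.util.Base64

object StreamingBase64 {
  // Copies `in` to `out`, Base64 encoding on the fly in fixed-size pieces instead of one big array
  def encode(in: InputStream, out: OutputStream, bufferSize: Int = 8192): Unit = {
    val encoding = Base64.getEncoder.wrap(out)
    val buffer = new Array[Byte](bufferSize)
    try {
      var read = in.read(buffer)
      while (read != -1) {
        encoding.write(buffer, 0, read)
        read = in.read(buffer)
      }
    } finally encoding.close() // flushes the last partial group plus padding; also closes `out`
  }
}
```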
For compliance reasons, we should document all of the licenses (both direct and transitive) of the dependencies this project uses.
There is already an sbt plugin, https://github.com/sbt/sbt-license-report, which can handle this.
Currently the documentation links directly to GitHub when referencing source files; however, paradox has a directive specifically for this use case.
Use the github directive here https://developer.lightbend.com/docs/paradox/current/directives/linking.html#github-directive. Note that due to the bug lightbend/paradox#512 we have to wait for a new release of paradox in order to use this directive.
The following also needs to be added to paradoxProperties in order to correctly set the base GitHub URL
"github.base_url" -> s"https://github.com/aiven/guardian-for-apache-kafka/tree/${if (isSnapshot.value) "main" else "v" + version.value}"
As discussed by @rdunklau and @jlprat, we have a couple of different options for SQL queries when working with compaction.
If we have a small number of keys but a large history, the following SQL query is more performant
SELECT key, ts, value FROM objects, lateral (SELECT ts, value FROM object_versions WHERE objects.key = object_versions.key ORDER BY ts DESC LIMIT 1) t
whereas this version should theoretically be faster if you have a lot of keys
SELECT DISTINCT on (key) key, ts, value FROM object_versions ORDER BY key DESC, ts DESC;
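For reference (e.g. as a test oracle when comparing the two queries), both should return, per key, the row with the greatest ts. An in-memory sketch with a hypothetical ObjectVersion row type; note that on ties in ts, maxBy keeps the first row it encounters, whereas SQL makes no such guarantee.

```scala
object LatestPerKey {
  // Hypothetical row shape matching the columns used in the queries above
  final case class ObjectVersion(key: String, ts: Long, value: String)

  // For each key keep the row with the greatest ts
  def latest(versions: Seq[ObjectVersion]): Map[String, ObjectVersion] =
    versions.groupBy(_.key).map { case (key, rows) => key -> rows.maxBy(_.ts) }
}
```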
Provide a proper README explaining the project
Currently the Scala compiler is generating "Block result was adapted via implicit conversion (method apply) taking a by-name parameter" warnings, but it turns out that these are false positives, because they are triggered by library code that uses macros and not by actual user-written code, see scala/bug#12072.
Rather than polluting the codebase with @nowarn("cat=lint-byname-implicit") everywhere, we should just disable the -Xlint:byname-implicit check until it gets resolved in upstream Scala.
Currently, if you look at https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala, the public interface of BackupClientInterface (i.e. all methods that are public) is mixed with all of the implementation details/core backup logic, which are private.
All of the private implementation details (and the implementation of the core logic behind BackupClientInterface itself) should be moved to another trait, likely called BackupClientInterfaceImpl, which is package private. BackupClientInterface can then extend BackupClientInterfaceImpl.
This will make it ultra clear what the public interface is, i.e. BackupClientInterface. It can also help with Java interop, since BackupClientInterface will essentially be the same as a Java interface.
io.aiven.guardian.kafka.backup.KafkaClient and io.aiven.guardian.kafka.backup.KafkaClientInterface aren't really using the correct terminology; they should rather be KafkaConsumer and KafkaConsumerInterface, since both consumers and producers are technically clients.
Rename KafkaClient to KafkaConsumer and KafkaClientInterface to KafkaConsumerInterface.
Since it's possible for users of Guardian to change certain configuration settings (i.e. changing the TimeConfiguration from DAYS to MINUTES), we should add logic to detect these changes and act appropriately.
Additionally, we should introduce a check for detecting any currently existing uploads to make sure we don't accidentally lose any backups. Furthermore, we should also prevent concurrent backups to the same storage location (for obvious reasons).
Some brainstorming still needs to be done on what is considered appropriate. For example, if someone changes the time configuration from MINUTES to DAYS, what should we do? Should we immediately close the currently open MINUTE time slot and then create a new object/file for the DAY slot?
When we receive time information from Kafka it is stored in two fields, timestamp and timestampType. timestamp is a long value in millis since the epoch, whereas timestampType is an enum that looks like this
package org.apache.kafka.common.record;

import java.util.NoSuchElementException;

public enum TimestampType {
    NO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime");

    public final int id;
    public final String name;

    TimestampType(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public static TimestampType forName(String name) {
        for (TimestampType t : values())
            if (t.name.equals(name))
                return t;
        throw new NoSuchElementException("Invalid timestamp type " + name);
    }

    @Override
    public String toString() {
        return name;
    }
}
Guardian needs a timestamp value to use as a source of truth when replaying messages, as well as for splitting the storage into time slices. Currently we ignore the timestampType, so we have a trivial implementation of getting an OffsetDateTime, i.e.
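import java.time.{Instant, OffsetDateTime, ZoneId}
import org.apache.kafka.common.record.TimestampType

final case class ReducedConsumerRecord(topic: String,
                                       offset: Long,
                                       key: String,
                                       value: String,
                                       timestamp: Long,
                                       timestampType: TimestampType
) {
  // Ignores timestampType entirely and interprets timestamp as epoch millis in UTC
  def toOffsetDateTime: OffsetDateTime =
    Instant.ofEpochMilli(this.timestamp).atZone(ZoneId.of("UTC")).toOffsetDateTime
}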
The timestampType is a configurable setting of the Kafka cluster, so we should investigate whether we need to calculate the timestamps differently depending on the timestampType.
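One possible direction, sketched with a hypothetical Scala mirror of the enum above to stay dependency free: only produce a point in time when Kafka actually supplied one. Kafka uses -1 as the timestamp value when none is present; CreateTime is set on the producer side (so it can be arbitrary or out of order), whereas LogAppendTime is assigned by the broker. All names below are hypothetical.

```scala
import java.time.{Instant, OffsetDateTime, ZoneOffset}

object TimestampSketch {
  // Hypothetical Scala mirror of Kafka's TimestampType enum
  sealed trait TimestampType
  case object NoTimestampType extends TimestampType
  case object CreateTime extends TimestampType    // set on the producer side
  case object LogAppendTime extends TimestampType // assigned by the broker

  // Kafka's marker value for "no timestamp"
  val NoTimestamp: Long = -1L

  // Produce a point in time only when the record actually carries one
  def toOffsetDateTime(timestamp: Long, timestampType: TimestampType): Option[OffsetDateTime] =
    timestampType match {
      case (CreateTime | LogAppendTime) if timestamp != NoTimestamp =>
        Some(Instant.ofEpochMilli(timestamp).atOffset(ZoneOffset.UTC))
      case _ => None
    }
}
```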
Currently, when you use Guardian with the S3 backend, it doesn't check whether the configured S3 account has the permissions necessary for Guardian to work correctly.
If possible, we can also check that the retention level for multipart upload chunks is greater than the configured time slice.
This is basically codifying what would be documented by #199.
The current thinking is to use the official AWS Java S3 client to check ACL permissions (adding these features to Alpakka's S3 client would likely be considered out of scope and hence excessive). We just need to make sure that we use the same version of the Java S3 client that Alpakka brings in.
At some point we would like to support GCS in the same way we currently support S3 as backup storage.
Similar to what was done with Alpakka's S3 client, functionality needs to be upstreamed to Alpakka's GCS client that allows us to both get the uploaded parts (aka chunks) of a currently incomplete multipart upload (see akka/alpakka#2730) and resume the upload of a file while providing context (which in our case is Kafka cursors) (see akka/alpakka#2770).
One thing to consider is that GCS appears to have 2 APIs: their original JSON-based API, which is what the current Alpakka GCS client implements, and an S3-compatible API which they added later on. It needs to be investigated whether GCS's JSON-based API even supports the functionality we need. If not, the changes required would be more extensive, as we would need to either re-implement the current GCS client using the S3-compatible API or make a completely new GCS client while keeping the current one intact (we also need to consider whether we should share code between Alpakka's S3 client and such a GCS client).
The "Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost and prefix" test is flaky.
The test should pass without problems.
Recent scala-steward PRs seem to be causing the problem. We need to investigate.
When running Guardian against a real-life cluster, this warning message appeared:
[WARN ] 15:54:57.878 BackupClient - Failed to upload a chunk into S3 with bucket: guardian-presentation, key: 2022-02-02T20:55:00Z.json, uploadId: 2QL6vb8QzqmelRBByFJUA8vyu.0ZI03luwyWp4_mLIEqvNp4gf2lCVoaptHPU6c6G2wPK8mjn4h6e4RNlLFAfxqCr.Lez_zISoIBCeV8m1HjplKFf9FBesIeCSqjPwl9 and partNumber: 1
java.lang.RuntimeException: Upload part 1 request failed. Response header: (HttpResponse(404 Not Found,List(x-amz-request-id: SRXG6WA7MJ406J0N, x-amz-id-2: Ran4EsJJ23uuehA8GS9j5KNDx8ap7NBjoW1W02wTPjTeWv8TmkcgwLXTcpEipTsxD14Ra9jI580=, Date: Thu, 31 Mar 2022 13:54:57 GMT, Server: AmazonS3, Connection: close),HttpEntity.Chunked(application/xml),HttpProtocol(HTTP/1.1))), response body: (<?xml version="1.0" encoding="UTF-8"?>
<Error><Code>NoSuchUpload</Code><Message>The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.</Message><UploadId>2QL6vb8QzqmelRBByFJUA8vyu.0ZI03luwyWp4_mLIEqvNp4gf2lCVoaptHPU6c6G2wPK8mjn4h6e4RNlLFAfxqCr.Lez_zISoIBCeV8m1HjplKFf9FBesIeCSqjPwl9</UploadId><RequestId>SRXG6WA7MJ406J0N</RequestId><HostId>Ran4EsJJ23uuehA8GS9j5KNDx8ap7NBjoW1W02wTPjTeWv8TmkcgwLXTcpEipTsxD14Ra9jI580=</HostId></Error>).
at akka.stream.alpakka.s3.impl.S3Stream$.$anonfun$handleChunkResponse$1(S3Stream.scala:1306)
at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
This request should automatically be retried, preferably with exponential backoff.
It's somewhat likely that S3 is returning the wrong kind of error and that this is in fact due to too many requests and/or some weird case of eventual consistency, since the tool was being run against a Kafka cluster that had millions of unconsumed records, so Guardian was pulling as many Kafka records as it could handle.
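A JDK-only sketch of retrying with exponential backoff (Backoff, delayFor and retry are hypothetical names; in Guardian this would more likely be expressed with Akka's own retry/restart facilities). The sleep function is a parameter so the delays can be observed in tests, and shouldRetry is where one would decide, for example, whether a 404 NoSuchUpload counts as retryable.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object Backoff {
  // Delay before retry number `attempt` (0-based): minBackoff * 2^attempt, capped at maxBackoff
  def delayFor(attempt: Int, minBackoff: FiniteDuration, maxBackoff: FiniteDuration): FiniteDuration = {
    val exponential = minBackoff.toMillis.toDouble * math.pow(2, attempt.toDouble)
    math.min(exponential, maxBackoff.toMillis.toDouble).toLong.millis
  }

  // Runs `op`, retrying up to `maxRetries` times for failures that `shouldRetry` accepts.
  // `sleep` is a parameter so tests can record delays instead of actually waiting.
  def retry[T](
      maxRetries: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      sleep: FiniteDuration => Unit = d => Thread.sleep(d.toMillis)
  )(shouldRetry: Throwable => Boolean)(op: => T): T = {
    @tailrec
    def loop(attempt: Int): T =
      Try(op) match {
        case Success(value) => value
        case Failure(e) if attempt < maxRetries && shouldRetry(e) =>
          sleep(delayFor(attempt, minBackoff, maxBackoff))
          loop(attempt + 1)
        case Failure(e) => throw e
      }
    loop(0)
  }
}
```

With a minimum of 100 ms and a cap of 250 ms, three consecutive failures are followed by waits of 100 ms, 200 ms and 250 ms.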
Currently the project uses sbt-release for publishing (see https://github.com/aiven/guardian-for-apache-kafka/blob/main/build.sbt#L223-L239). This is done as a manual step; ideally it should be automated.
sbt-github-actions can automatically publish a library from CI (see https://github.com/djspiewak/sbt-github-actions#integration-with-sbt-ci-release). This uses another plugin (sbt-ci-release) instead of sbt-release, so we should figure out the best way to do this. I believe this works by creating a pull request that changes the version from a SNAPSHOT to a full release, which then triggers a build.
The credentials for our Sonatype account should also be stored as GitHub Actions secrets.
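A build.sbt fragment adapted from the sbt-github-actions README section linked above, assuming both sbt-github-actions and sbt-ci-release are added as plugins. In this configuration publishing is triggered by pushing a git tag matching v*, and the Sonatype/PGP credentials are read from GitHub Actions secrets; the secret names shown are the ones sbt-ci-release conventionally expects and should be checked against the plugin versions in use.

```scala
// build.sbt: build on tags starting with "v" as well as on branches
ThisBuild / githubWorkflowTargetTags ++= Seq("v*")

// Only run the publish job for those tags
ThisBuild / githubWorkflowPublishTargetBranches :=
  Seq(RefPredicate.StartsWith(Ref.Tag("v")))

// Publish via sbt-ci-release, passing the credentials in from GitHub Actions secrets
ThisBuild / githubWorkflowPublish := Seq(
  WorkflowStep.Sbt(
    List("ci-release"),
    env = Map(
      "PGP_PASSPHRASE"    -> "${{ secrets.PGP_PASSPHRASE }}",
      "PGP_SECRET"        -> "${{ secrets.PGP_SECRET }}",
      "SONATYPE_PASSWORD" -> "${{ secrets.SONATYPE_PASSWORD }}",
      "SONATYPE_USERNAME" -> "${{ secrets.SONATYPE_USERNAME }}"
    )
  )
)
```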
There is an index range issue when generating S3 bucket names for tests, e.g.
[info] - Bucket name generators generates valid bucket names according to S3Settings with virtualDotHost and prefix *** FAILED ***
[info] IllegalArgumentException was thrown during property evaluation.
[info] Message: requirement failed: invalid size given: -1
[info] Occurred when passed generated values (
[info]
[info] )
For it to not fail
There is a fencepost (off-by-one) error that needs to be looked into at https://github.com/aiven/guardian-for-apache-kafka/blob/main/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala
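The exact cause in Generators.scala hasn't been confirmed here, but "invalid size given: -1" suggests a remaining-length calculation going negative. A minimal sketch of a length calculation that cannot do so, assuming a hypothetical helper that builds a name of a fixed total length from a prefix. It only checks length and character set (3 to 63 characters, lowercase letters, digits and hyphens, alphanumeric first and last character), not the full S3 naming rules, and it leaves dots out entirely.

```scala
import scala.util.Random

object BucketNameSketch {
  // Length limits from the S3 bucket naming rules
  val MinLength = 3
  val MaxLength = 63

  // Allowed at the start and end of a name: lowercase letters and digits
  private val edgeChars: IndexedSeq[Char] = ('a' to 'z') ++ ('0' to '9')
  // Allowed in the middle: additionally hyphens (dots are omitted in this sketch)
  private val innerChars: IndexedSeq[Char] = edgeChars :+ '-'

  /** Hypothetical helper: a name of exactly `totalLength` characters that starts with `prefix`. */
  def bucketName(prefix: String, totalLength: Int, rnd: Random): String = {
    require(
      totalLength >= MinLength && totalLength <= MaxLength,
      s"totalLength $totalLength outside [$MinLength, $MaxLength]"
    )
    require(
      prefix.nonEmpty && edgeChars.contains(prefix.head) && prefix.forall(c => innerChars.contains(c)),
      s"invalid prefix: $prefix"
    )
    // Characters still to generate. At least one is needed so the last character is alphanumeric;
    // checking this up front avoids passing a negative size (such as -1) further down.
    val remaining = totalLength - prefix.length
    require(remaining >= 1, s"prefix '$prefix' leaves no room in a $totalLength-character name")
    val middle = Seq.fill(remaining - 1)(innerChars(rnd.nextInt(innerChars.length))).mkString
    val last   = edgeChars(rnd.nextInt(edgeChars.length))
    prefix + middle + last
  }
}
```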
Currently, constant values (such as dummy keys/secrets) are hardcoded; ideally these would be specified as configuration. See #43 (comment)
Make a separate test folder in resources which contains configuration that is ONLY specific to tests.
For compliance reasons, as well as to make sure our project is secure, we need to scan for any potential security vulnerabilities.
Here are some suggestions we can look at
Guardian is currently missing compression. To save storage space (especially with large backups), compression is usually a necessary feature.
Add a gzip compression setting to the Backup configuration which will create compressed backup files/objects. A few questions/topics arise from this:
.json filename, whereas compressed keys/files have a .json.gz filename
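A minimal sketch of the two pieces involved, using only the JDK's java.util.zip: gzip-compressing a serialized JSON payload (and decompressing it again) and choosing the .json or .json.gz key name. The Compression type and keyName helper are hypothetical illustrations, not Guardian's actual configuration model.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipBackupSketch {
  // Hypothetical setting: whether a backup is written compressed or not
  sealed trait Compression
  case object NoCompression extends Compression
  case object Gzip          extends Compression

  /** Hypothetical key naming: `.json` for plain backups, `.json.gz` for gzipped ones. */
  def keyName(base: String, compression: Compression): String = compression match {
    case NoCompression => s"$base.json"
    case Gzip          => s"$base.json.gz"
  }

  /** Compresses a whole chunk at once; close() flushes and writes the gzip trailer. */
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz  = new GZIPOutputStream(bos)
    try gz.write(bytes)
    finally gz.close()
    bos.toByteArray
  }

  /** Decompresses a gzip payload by copying the stream into a byte array. */
  def gunzip(bytes: Array[Byte]): Array[Byte] = {
    val in     = new GZIPInputStream(new ByteArrayInputStream(bytes))
    val out    = new ByteArrayOutputStream()
    val buffer = new Array[Byte](8192)
    try {
      var n = in.read(buffer)
      while (n != -1) {
        out.write(buffer, 0, n)
        n = in.read(buffer)
      }
    } finally in.close()
    out.toByteArray
  }
}
```

Because gzip works on a byte stream, compressing a larger chunk of records in one go generally gives a better ratio than compressing each record on its own.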
Due to a limitation of GitHub Actions where PRs created from forks cannot access GitHub Actions secrets (see https://github.community/t/make-secrets-available-to-builds-of-forks/16166/33), and since there is no way to override this limitation in a granular way, tests that rely on secrets (such as end-to-end S3 tests) should check whether the secrets are available and skip if they aren't.
ScalaTest seems to have a feature called tags which appears to be the correct abstraction for solving this problem: https://www.scalatest.org/user_guide/tagging_your_tests
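A minimal, stdlib-only sketch of the "are the secrets available?" check. The environment variable names are hypothetical placeholders for whatever the end-to-end S3 tests actually read. Empty values are treated as missing, since a secret referenced in a workflow that GitHub doesn't pass to the runner typically expands to an empty string.

```scala
object SecretsGuard {
  // Hypothetical names of the environment variables holding the S3 credentials in CI
  val RequiredSecrets: Seq[String] = Seq("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")

  /** Returns the required secrets that are absent or blank in `env`. */
  def missingSecrets(env: Map[String, String], required: Seq[String] = RequiredSecrets): Seq[String] =
    required.filter(name => env.get(name).forall(_.trim.isEmpty))

  /** True only if every required secret has a non-blank value. */
  def secretsAvailable(env: Map[String, String] = sys.env): Boolean =
    missingSecrets(env).isEmpty
}
```

In a ScalaTest suite, calling assume(SecretsGuard.secretsAvailable()) at the start of each end-to-end test would cancel (rather than fail) the test when secrets are missing. Alternatively, tagging those tests lets CI exclude them with the runner's -l option.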
Provide issue templates for different types: bugs, features, questions
Provide a PR template with the information we'd like to have from our contributors
Add the Contributor Covenant Code of Conduct