
fs2-aws's People

Contributors

barryoneill, busybyte, chuwy, custommonkey, daddykotex, daenyth, damienoreilly, danicheg, dmateusp, fredshonorio, gvolpe, iivat, jarrodcodes, jatcwang, johng84, kiambogo, mattkohl, mmienko, scala-steward, semenodm, simy4, sullis, toddburnside


fs2-aws's Issues

SqsConfig class is misplaced, it seems

The source file is located under the fs2.aws.sqs folder structure, but the class declares only the sqs package.

I think the class was misplaced. Would you be open to moving it back to the right place?

Stream can hang if there is an error reading a source file from S3

Problem

We have an fs2 job that reads a JSON file from S3 and then processes this file with 30 parallel streams to extract different sets of data to write to 30 parquet files. We find that if there is an error reading the input file, such as the file key being incorrect or the S3 permissions being incorrect, then the job will hang, print no errors, and produce no output.

However, we also find that if we configure the job to produce fewer output files (which results in fewer parallel streams), then the job doesn't hang and an appropriate exception is produced and shows up in the logs.

Work Around

The problem appears to be that when the stream is read from S3 it does not use the Blocker that it should. To work around this, we no longer use readS3File, but instead use our own version, which is as follows:

import cats.effect.{Blocker, ContextShift, IO, Sync}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.GetObjectRequest

// ByteStream is our alias for fs2.Stream[IO, Byte].
def readFromS3(bucket: String, key: String, blocker: Blocker, chunkSize: Int)(implicit sync: Sync[IO], shift: ContextShift[IO]): ByteStream =
  fs2.io.readInputStream[IO](
    // Acquire the object content on the blocking pool so a failed GetObject surfaces
    // as an error instead of stalling the stream.
    blocker.blockOn(IO(AmazonS3ClientBuilder.defaultClient().getObject(new GetObjectRequest(bucket, key)).getObjectContent)),
    chunkSize = chunkSize,
    blocker = blocker,
    closeAfterUse = true
  )

The first difference from the fs2-aws API is that this readFromS3 method takes a Blocker instead of an ExecutionContext, which seems more correct as we already have a Blocker in context to use for this. The second difference from the fs2-aws API is that blocker.blockOn(...) is used to wrap the IO[InputStream], which is what ultimately appears to fix the hanging problem.

S3 upload should make it easy to set properties on upload requests

Problem

We have an fs2 job that runs in one AWS account and must upload files to a separate AWS account. To make sure the ownership/permissions of the files are correct we must set bucket owner full control on the upload request. However, the fs2-aws API makes this tricky to do.

Work Around

To set this property we extend the AbstractAmazonS3 class and implement just enough of the API to support uploading through fs2-aws. The code looks something like this:

import com.amazonaws.services.s3.{AbstractAmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.services.s3.model._

// Delegates only the multipart-upload calls that fs2-aws needs, forcing the canned
// ACL on every InitiateMultipartUploadRequest.
object S3ClientWithCannedAcl extends AbstractAmazonS3 {

  private val client = AmazonS3ClientBuilder.defaultClient()

  override def initiateMultipartUpload(request: InitiateMultipartUploadRequest): InitiateMultipartUploadResult = {
    request.setCannedACL(CannedAccessControlList.BucketOwnerFullControl)
    client.initiateMultipartUpload(request)
  }

  override def uploadPart(request: UploadPartRequest): UploadPartResult =
    client.uploadPart(request)

  override def completeMultipartUpload(request: CompleteMultipartUploadRequest): CompleteMultipartUploadResult =
    client.completeMultipartUpload(request)
}

fs2.aws.s3.uploadS3FileMultipart[IO](bucket, key, chunkSize, amazonS3 = S3ClientWithCannedAcl)

It would be great if the fs2-aws API allowed for setting properties like this in a more direct way.
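For illustration only, here is the kind of hook that would make this easier. The initiateRequestTransform parameter below is hypothetical and does not exist in the current fs2-aws API; only the AWS SDK v1 request and ACL types are real.

import com.amazonaws.services.s3.model.{CannedAccessControlList, InitiateMultipartUploadRequest}

fs2.aws.s3.uploadS3FileMultipart[IO](
  bucket,
  key,
  chunkSize,
  // hypothetical parameter: let the caller customise each InitiateMultipartUploadRequest
  initiateRequestTransform = (req: InitiateMultipartUploadRequest) => {
    req.setCannedACL(CannedAccessControlList.BucketOwnerFullControl)
    req
  }
)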

Add Scala 2.13 cross compile

Please add Scala 2.13 support, since all the libraries you are using are available for Scala 2.13.

The only issue is the scanamo-circe library, which is also under your maintenance.

Regards

S3 uploadFileMultipart fails with empty input stream

WHAT IS WRONG

I use uploadFileMultipart to stream serialized database transactions to an S3 file. But, in the edge case that my DB query produces no results, I get the following error:

The XML you provided was not well-formed or did not validate against our published schema (Service: S3, Status Code: 400, Request ID: AXVBZSEAM6S8WJR7, Extended Request ID: luAacBiEwX5Xt8rpN2BvoNdaub+Q3phQTKal5flI0X5GedYxgIRONNXXp77V+6Yd7EZw0PL20t4=)

WHAT IS EXPECTED

I think that the most sensible thing in this case is to just create an empty file.
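Until the library handles this, a possible caller-side guard is sketched below. It assumes the S3 algebra's uploadFile and uploadFileMultipart pipes as documented for recent fs2-aws-s3 versions, and fs2's uncons1/cons1; the Models import location may differ by release.

import fs2.{Pipe, Stream}
import fs2.aws.s3.S3
import fs2.aws.s3.models.Models._ // BucketName, FileKey, PartSizeMB, ETag; assumed location

// Use multipart upload only when the byte stream is non-empty; otherwise fall back
// to a plain (possibly zero-byte) uploadFile so an empty object is still created.
def uploadAllowingEmpty[F[_]](s3: S3[F], bucket: BucketName, key: FileKey, partSize: PartSizeMB): Pipe[F, Byte, ETag] =
  in =>
    in.pull.uncons1.flatMap {
      case Some((head, tail)) =>
        // Push the peeked byte back and run the normal multipart upload.
        tail.cons1(head).through(s3.uploadFileMultipart(bucket, key, partSize)).pull.echo
      case None =>
        // Empty input: create an empty object with a single PutObject instead.
        Stream.empty.through(s3.uploadFile(bucket, key)).pull.echo
    }.stream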

AWS authentication with web identity token

Hi,
Using this library for an app running on EKS with AWS credentials injected via IRSA, it doesn't seem to read the credentials provided as a web identity token. We're using the fs2.aws.kinesis.publisher module.
These are provided via the env vars AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE.
Can we have support for this authentication method as well, please?
Thanks

v4.0.0-RC1 Availability

Thanks for running this project. I'm interested in trying it out with CE3 and was wondering if v4.0.0-RC1 is available prebuilt anywhere? I didn't see it on Maven.

Kinesis consumer returns `StreamName should not be empty`

Problem

After upgrading to v3.0.9, using readFromKinesisStream with the default FanOut retrieval mode returns the following error, even though the streamName in KinesisConsumerSettings is set properly.

[info] ERROR 2021-03-01 15:09:49,630 s.a.kinesis.coordinator.Scheduler - Worker.run caught exception, sleeping for 1000 milli seconds!
[info] java.lang.NullPointerException: StreamName should not be empty
[info] 	at software.amazon.awssdk.utils.Validate.notEmpty(Validate.java:290)
[info] 	at software.amazon.kinesis.common.StreamIdentifier.singleStreamInstance(StreamIdentifier.java:85)
[info] 	at software.amazon.kinesis.retrieval.fanout.FanOutRetrievalFactory.createGetRecordsCache(FanOutRetrievalFactory.java:63)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.buildConsumer(Scheduler.java:915)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.createOrGetShardConsumer(Scheduler.java:890)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.runProcessLoop(Scheduler.java:416)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.run(Scheduler.java:325)
[info] 	at fs2.aws.kinesis.consumer$.$anonfun$readChunksFromKinesisStream$7(consumer.scala:239)

The application tries to read from one stream which has one shard.
Switching to Polling works fine.

Readme.MD examples do not compile

From README.md:

Stream.emits("test data".getBytes("UTF-8"))
  .through(s3.uploadFileMultipart(BucketName("foo"), FileKey("bar"), partSize = 5))
  .evalMap(t => IO(println(s"eTag: $t")))

This no longer compiles, since 5 is an Int and not a PartSizeMB.
Any pointers on how to create an instance of PartSizeMB?

Update:
Fixed by copying the definition of PartSizeMB into my codebase, but I'd rather not duplicate it if there's a better way.
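On constructing a PartSizeMB without copying its definition, a minimal sketch follows. It assumes PartSizeMB is a refined positive Int (as in recent fs2-aws-s3 releases) with the refined library on the classpath; the exact import location may differ by version.

import eu.timepit.refined.auto._
import fs2.aws.s3.models.Models.PartSizeMB // assumed location; may differ by release

// With refined's `auto` syntax in scope, a literal that satisfies the predicate
// is checked and converted at compile time.
val partSize: PartSizeMB = 5

// For values known only at runtime, refined's refineV returns an Either instead of
// failing compilation, e.g. refineV[Positive](n): Either[String, Int Refined Positive].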

Can you provide an example?

This looks like a very interesting library. Can you please provide an example? I have to stream the content of GZ files from S3, and I would be able to get started if there were an example of how to use this library.
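A rough sketch of one way this can be wired up, based on the pre-3.x API that appears elsewhere in these issues (readS3FileMultipart plus an AmazonS3 client) and fs2 2.x's fs2.compression.gunzip. The bucket and key are placeholders, and the exact parameter list and required implicits vary between fs2-aws releases.

import cats.effect.{ContextShift, IO}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import fs2.aws.s3._
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val client = AmazonS3ClientBuilder.defaultClient()

// Stream a gzipped object from S3, decompress it with fs2's gunzip, and decode lines.
val lines: fs2.Stream[IO, String] =
  readS3FileMultipart[IO]("my-bucket", "data/file.json.gz", 8192, client)
    .through(fs2.compression.gunzip[IO](8192)) // fs2 2.x
    .flatMap(_.content)                        // GunzipResult => decompressed bytes
    .through(fs2.text.utf8Decode)
    .through(fs2.text.lines)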

readS3FileMultipart should throw error on file missing

Originally discovered by @tpon. Currently if invoked with a path to a file that doesn't exist in S3, an empty stream will be returned. Up for discussion, but I feel like an exception should be thrown so the user is aware that the file doesn't exist.

DynamoDB Stream Consumer Race Condition with `shardEnded`

I'm using fs2.aws.dynamodb.package$#readFromDynamDBStream to read from a ddb stream, and fs2.aws.dynamodb#checkpointRecords to checkpoint the records after processing.

The following problem arises when there is a delay (in my case, intentional) in checkpointing the records, and the software.amazon.kinesis.processor.ShardRecordProcessor#shardEnded event fires. fs2-aws correctly checkpoints the end of the shard, but subsequent checkpoints of the previously emitted events then fail with:

java.lang.IllegalArgumentException: Could not checkpoint at extended sequence number {SequenceNumber: 554022700000000052561897332,SubsequenceNumber: 0} as it did not fall into acceptable range between the last checkpoint {SequenceNumber: SHARD_END,SubsequenceNumber: 0} and the greatest extended sequence number passed to this record processor {SequenceNumber: SHARD_END,SubsequenceNumber: 0}

We need to maintain a list of all in-flight committable records and evict them when fs2-aws checkpoints a shardEnded event.

s3.listFiles does not manage truncation

In response to a ListObjectsV2Request, if a bucket contains too many objects (more than 1000, in my experience), AWS automatically truncates the response to a reasonable size (1000 objects) and sends along with it a continuation token for the next request.

This behaviour is completely ignored by s3.listFiles, leading to an obvious bug while listing crowded buckets.
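For reference, continuation-token handling with the AWS SDK v2 async client looks roughly like the sketch below. This is not the fs2-aws internal implementation, just an illustration of the loop s3.listFiles would need: keep issuing ListObjectsV2 requests until isTruncated is false.

import cats.effect.Async
import cats.syntax.all._
import fs2.{Chunk, Stream}
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, S3Object}
import scala.jdk.CollectionConverters._

// Emit every object in the bucket, following nextContinuationToken until the
// response is no longer truncated.
def listAll[F[_]: Async](client: S3AsyncClient, bucket: String): Stream[F, S3Object] =
  Stream
    .unfoldLoopEval(Option.empty[String]) { token =>
      val builder = ListObjectsV2Request.builder().bucket(bucket)
      val request = token.fold(builder)(builder.continuationToken).build()
      Async[F]
        .fromCompletableFuture(Async[F].delay(client.listObjectsV2(request)))
        .map { resp =>
          val page = Chunk.iterable(resp.contents().asScala)
          val next = if (resp.isTruncated) Option(resp.nextContinuationToken()) else None
          (page, next)
        }
    }
    .flatMap(Stream.chunk)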

Too many dependencies

Right now, my build file looks like this:

"io.laserdisc"         %% "fs2-aws"       % "2.28.39" excludeAll(
        ExclusionRule("com.amazonaws", "aws-java-sdk-kinesis"),
        ExclusionRule("com.amazonaws", "aws-java-sdk-sqs"),
        ExclusionRule("com.amazonaws", "amazon-kinesis-producer"),
        ExclusionRule("software.amazon.kinesis", "amazon-kinesis-client"),
        ExclusionRule("software.amazon.awssdk", "sts"),
        ExclusionRule("com.amazonaws", "aws-java-sdk-sqs"),
        ExclusionRule("com.amazonaws", "amazon-sqs-java-messaging-lib"),
      ),

...because all I'm interested in is the S3 multipart upload functionality.

Is it possible to split s3, kinesis, sts, and sqs into distinct modules so that we're not otherwise forced to download this lot and a whole forest of transitive dependencies unless we actually use them?

Unable to connect to S3 and read a file

package fs2
package aws

//import java.util.concurrent.Executors

import cats.effect.{Blocker, ContextShift, IO}
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import fs2.aws.internal.S3Client
import fs2.aws.s3._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.ExecutionContext

class S3Spec extends AnyFlatSpec with Matchers {

  //private val blockingEC = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(6))
  implicit val ec: ExecutionContext = ExecutionContext.global
  implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)

  implicit val s3Client: S3Client[IO] = fs2.aws.utils.s3TestClient

  val provider = new AWSStaticCredentialsProvider(
    new BasicAWSCredentials("...", "...")
  )

  val client = AmazonS3ClientBuilder
    .standard
    .withCredentials(provider)
    .withRegion("eu-west-1") // or whatever your region is
    .build

  val reader: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    readS3FileMultipart[IO]("...", "text", 25, client)
      .through(fs2.text.utf8Decode)
      .through(fs2.text.lines)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(java.nio.file.Paths.get("output.txt"), blocker))
  }

  reader.compile.drain.unsafeRunSync
}

getting the following error:

An exception or error caused a run to abort.
java.lang.NullPointerException
at java.io.Reader.<init>(Reader.java:78)
at java.io.InputStreamReader.<init>(InputStreamReader.java:129)
at scala.io.BufferedSource.reader(BufferedSource.scala:26)
at scala.io.BufferedSource.bufferedReader(BufferedSource.scala:27)
at scala.io.BufferedSource.charReader$lzycompute(BufferedSource.scala:37)
at scala.io.BufferedSource.charReader(BufferedSource.scala:35)
at scala.io.BufferedSource.scala$io$BufferedSource$$decachedReader(BufferedSource.scala:64)
at scala.io.BufferedSource.mkString(BufferedSource.scala:93)
at fs2.aws.utils.package$$anon$1.$anonfun$getObjectContentOrError$1(package.scala:27)

4.0.0-RC1 and Amazon S3

Apologies if I am behind the times here. I recently upgraded a project of mine to CE3; it uses the excellent fs2-aws-s3 module to stream data from Amazon S3 buckets. In 4.0.0 I only see Kinesis-style streaming. Is there a plan to interface with S3 directly, or is that now gone in favour of Kinesis, and should I be looking to move my data from S3 to a Kinesis stream? I may well have missed something obvious; if so, I would be grateful if someone could demonstrate how to pull from S3 using fs2-aws-kinesis.

Need to support non-default CredentialProvider when building KinesisAsyncClient

For apps that need to use assume-role or session credentials (see https://docs.aws.amazon.com/AmazonS3/latest/dev/AuthUsingTempSessionTokenJava.html), the current approach does not work. Some possible solutions:

  1. Add an optional CredentialsProvider field to fs2.aws.kinesis.KinesisConsumerSettings, and use it in fs2.aws.kinesis.defaultScheduler() when building the KinesisAsyncClient;
  2. make fs2.aws.kinesis.defaultScheduler() public or protected; or
  3. make fs2.aws.kinesis.readFromKinesisStream() public or protected.
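For reference, a sketch of what option 1 could look like from the caller's perspective; the settings field itself is hypothetical, but the client construction below uses only standard AWS SDK v2 and KCL 2.x calls.

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
import software.amazon.kinesis.common.KinesisClientUtil

// Thread a caller-supplied provider (e.g. an STS assume-role or web-identity
// provider) into the KinesisAsyncClient builder instead of the default chain.
def kinesisClient(region: Region, credentials: AwsCredentialsProvider): KinesisAsyncClient =
  KinesisClientUtil.createKinesisAsyncClient(
    KinesisAsyncClient
      .builder()
      .region(region)
      .credentialsProvider(credentials)
  )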

Documentation / comments

Open to suggestions here!

We need to work on the README for sure, maybe some Scaladocs and a github page for the package.

Type mismatch when upgrading from 3.0.7 to 3.0.9

With an S3 client up and running, basically as described in the readme, upgrading from 3.0.7 to 3.0.9 leads to the following compile error:

[error]  found   : software.amazon.awssdk.services.s3.S3Client
[error]  required: io.laserdisc.pure.s3.tagless.S3AsyncClientOp[?]
[error]         .eval(S3.create(awsS3SyncClient, blocker))

Publish fs2-aws-sqs for 3.x

I might be missing something, but when I add fs2-aws:3.0.2 as a dependency I don't get the fs2.aws.sqs package.

I also don't see the AWS SQS runtime dependency on Maven, though 2.29.0 works as expected.

Pipe method signature prevents changing the Stream effect type

Hello! I have a requirement to use fs2-aws-s3 within a tracing context. We are using trace4cats, and thus our effect type is Kleisli[F, Span[F], *]. For most of our use cases, changing to this effect type has been either built in or easy to add. For fs2, we have the option of using the translate method. However, because the public API for fs2-aws-s3 exposes uploadFile and uploadFileMultipart as Pipe[F, Byte, ETag], we cannot apply translate and thus cannot easily change the effect type.

A few options would be:

  1. add similar methods that return a Stream;
  2. add a new s3 object called s3Streams (or similar) with these methods, using shared logic; or
  3. add an effect translation method (i.e. mapK).

With any of these options a new effect type can easily be applied for all methods.
Happy to make a PR with whichever option is decided by the maintainers. Thanks for your time.
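To illustrate why a Pipe resists translate (and what an effect-translation method would have to provide), here is a purely hypothetical helper; nothing named here exists in fs2-aws today. Translating a Pipe needs natural transformations in both directions, which Kleisli[F, Span[F], *] can only supply once a Span is available.

import cats.~>
import fs2.Pipe

// Hypothetical: adapt a Pipe[F, A, B] to a Pipe[G, A, B] given transformations both
// ways. The incoming Stream[G, A] must be translated back into F before the original
// pipe can run, and the result translated forward into G.
def translatePipe[F[_], G[_], A, B](p: Pipe[F, A, B])(fk: F ~> G, gk: G ~> F): Pipe[G, A, B] =
  in => p(in.translate(gk)).translate(fk)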

S3 consumer stream always throws an error at EOF

The requested range is not satisfiable (Service: Amazon S3; Status Code: 416; Error Code: InvalidRange

<Error><Code>InvalidRange</Code><Message>The requested range is not satisfiable</Message><RangeRequested>bytes=263-1263</RangeRequested><ActualObjectSize>263</ActualObjectSize>

Kinesis - Handle exceptions and keep consuming messages

We have a Kinesis consumer that reads messages from a Kinesis stream, decodes them into a custom case class, and then performs some actions based on the decoded information. The method in charge of consuming and decoding the stream is the following:

def consumeCommands: F[Unit] =
      kinesis
        .readFromKinesisStream(consumerSettings)
        .evalMap(cr => decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString))
        .compile
        .drain

And the methods which decode the message and execute the command are the following:

def decodeAndExecuteCommand(value: String): F[Unit] =
      decode[KinesisCommand](value) match {
        case Right(command) => logger.info(s"Processing $value") >> executeCommand(command)
        case Left(err)      => logger.error(s"Error decoding the command: $err")
      }

private def executeCommand(command: KinesisCommand): F[Unit] = ???

So, imagine that, for some unexpected reason executeCommand(command: KinesisCommand) raises an exception:

private def executeCommand(command: KinesisCommand): F[Unit] = {
      throw new IllegalStateException("An unexpected problem")
}

How can we handle this exception gracefully in our consumeCommands method so our Kinesis consumer continues consuming messages? I've tried to put some .recoverWith() in several parts of the code in order to recover from the unexpected exception:

def consumeCommands: F[Unit] =
      kinesis
        .readFromKinesisStream(consumerSettings)
        .evalMap(cr => decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString))
        .recoverWith { case e: Throwable => fs2.Stream.eval(logger.error(e.getMessage)) }
        .compile
        .drain

But the consumer still gets the exception, shuts itself down, and stops consuming new messages:

[io-compute-12] ERROR c.g.m.a.Server - An unexpected problem 
[prefetch-cache-shardId-000000000000-0000] ERROR s.a.k.r.p.PrefetchRecordsPublisher - meat-grinder-scala-commands:shardId-000000000000 :  Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client. 
java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:464)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[io-compute-3] INFO  s.a.k.c.Scheduler - All record processors have been shutdown successfully. 
[io-compute-3] INFO  s.a.k.c.Scheduler - Starting worker's final shutdown. 
[io-compute-3] INFO  s.a.k.m.CloudWatchPublisherRunnable - Shutting down CWPublication thread. 
[cw-metrics-publisher] INFO  s.a.k.m.CloudWatchPublisherRunnable - CWPublication thread finished. 
[io-compute-3] INFO  s.a.k.c.Scheduler - Worker loop is complete. Exiting from worker. 

So, what is the correct strategy to handle the exception and keep the consumer up and getting messages from the stream?

Kind regards!
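For what it's worth, one common pattern (a sketch only, reusing the definitions above and assuming F has a MonadError[F, Throwable]/Sync instance) is to recover around each record's effect inside evalMap, so a failing command never reaches the stream as a failure and the KCL Scheduler is never shut down. Note that executeCommand must suspend its exception in F (e.g. via Sync[F].delay or raiseError) rather than throwing while building the F value; otherwise no error handler on F can see it.

import cats.syntax.all._
import java.nio.charset.StandardCharsets

def consumeCommands: F[Unit] =
  kinesis
    .readFromKinesisStream(consumerSettings)
    .evalMap { cr =>
      decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString)
        // Recover per record: log and move on instead of failing the whole stream.
        .handleErrorWith(e => logger.error(s"Command failed, skipping record: ${e.getMessage}"))
    }
    .compile
    .drain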

Unused values in `KinesisSettings`

The maxConcurrency, stsAssumeRole, and endpoint values in KinesisSettings are not used anywhere in the code. It looks like when they were added they were each used for building a KinesisAsyncClient in consumer.scala.

However it looks like consumer.scala was removed as part of the cats-effect 3 upgrade. Would it make sense to remove these values as well? Or would it make more sense to restore some functionality to build a kinesis client given some KinesisSettings?

Remove logback dependency

My app is using slf4j-simplelogger as its logging backend and I'm getting the following warning when running it:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/antonparkhomenko/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.30/slf4j-simple-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/antonparkhomenko/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]

As I understand it, logback-classic is the same kind of backend as slf4j-simplelogger: both are supposed to sit behind the slf4j-api facade and should not be bundled by libraries; instead, the application developer should add the chosen backend as a dependency.
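Until the dependency is removed, a caller-side workaround (a sketch; the version string is just the one quoted earlier in these issues) is to exclude the transitive binding in sbt:

// Keep only the application's chosen SLF4J backend on the classpath.
"io.laserdisc" %% "fs2-aws" % "2.28.39" exclude("ch.qos.logback", "logback-classic")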

should S3 create method require Concurrent?

Hello,
I was playing with Doobie and streaming to S3 and had some problems making it work, because I would like to work within the Doobie effect ConnectionIO, which does not implement Concurrent, while the create method of fs2.aws.s3.S3 requires it.

I don't see why (yet?).

    def create[F[_]: Async: Concurrent](s3: S3AsyncClientOp[F]): F[S3[F]] 

would the following be sufficient (at least it compiles)?

    def create[F[_]: Async](s3: S3AsyncClientOp[F]): F[S3[F]] 
