
snowplow-s3-loader's Introduction

Snowplow S3 Loader


Overview

The Snowplow S3 Loader consumes records from an Amazon Kinesis stream and writes them to S3.

Two file formats are supported:

  • LZO
  • GZip

LZO

The records are treated as raw byte arrays. Elephant Bird's BinaryBlockWriter class is used to serialize them as a Protocol Buffers array (so it is clear where one record ends and the next begins) before compressing them.

The compression process generates both compressed .lzo files and small .lzo.index files (splittable LZO). Each index file contains the byte offsets of the LZO blocks in the corresponding compressed file, meaning that the blocks can be processed in parallel.
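
As a rough illustration of why framing matters, here is a sketch in Scala that uses plain length-prefixed framing; this is for clarity only and is not Elephant Bird's actual Protocol Buffers block format:

object FramingSketch {
  import java.io.{ByteArrayOutputStream, DataOutputStream}

  // Each record is written as a 4-byte length prefix followed by its raw bytes,
  // so a reader can always tell where one record ends and the next begins,
  // even though the payloads themselves are opaque byte arrays.
  def frameRecords(records: Seq[Array[Byte]]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out   = new DataOutputStream(bytes)
    records.foreach { record =>
      out.writeInt(record.length) // record boundary marker
      out.write(record)
    }
    out.flush()
    bytes.toByteArray
  }
}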

GZip

The records are treated as byte arrays containing UTF-8 encoded strings (whether CSV, JSON or TSV). Newlines are used to separate the records written to a file. This format can be used with the Snowplow Kinesis Enriched stream, among other streams.
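
A minimal sketch of this format using only the Java standard library; the file path and the source of records are placeholders:

object GzipSketch {
  import java.io.FileOutputStream
  import java.util.zip.GZIPOutputStream

  // Records are UTF-8 byte arrays; a newline after each one separates records,
  // and the whole file is gzip-compressed on the way out.
  def writeGzipFile(path: String, records: Seq[Array[Byte]]): Unit = {
    val out = new GZIPOutputStream(new FileOutputStream(path))
    try records.foreach { record =>
      out.write(record)
      out.write('\n'.toInt) // record separator
    }
    finally out.close()
  }
}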

Find out more

Technical Docs | Setup Guide | Roadmap | Contributing

Copyright and license

Snowplow S3 Loader is copyright 2014-2023 Snowplow Analytics Ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

snowplow-s3-loader's People

Contributors

aldemirenes, alexanderdean, arihantsurana, benfradet, benjben, bobshaw1912, chuwy, davidstoker, fblundun, hanfi, istreeter, jackric, jbeemster, kazjote, oguzhanunlu, pondzix, spenes


snowplow-s3-loader's Issues

Prevent buffer from becoming arbitrarily large

Suppose that an exception (like OutOfMemoryError) is thrown in the emit method. Then the KCL will catch it, declare that that batch of records has been dropped, and move on to the next batch. But the failed records will still be stored in the BasicMemoryBuffer's List.

If the exception keeps occurring, the buffer will keep growing until memory is exhausted. The application can then never recover.

One possible solution: System.exit(1) when we detect that this is happening.
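
A hedged sketch of that idea; the names and the threshold below are hypothetical, not part of the loader's actual code:

object EmitGuardSketch {
  // Illustrative threshold only; this is not an existing config setting.
  val MaxBufferedRecords = 100000

  // Wrap the emit call: if it keeps failing while the buffer keeps growing,
  // exit so the process can be restarted and resume from the last Kinesis
  // checkpoint, instead of holding failed records in memory forever.
  def emitOrDie(bufferedCount: Int, emit: () => Unit): Unit =
    try emit()
    catch {
      case t: Throwable =>
        if (bufferedCount > MaxBufferedRecords) {
          System.err.println(s"Emit failed with $bufferedCount records buffered, exiting: $t")
          System.exit(1)
        } else throw t
    }
}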

Unify logger configuration

See #1367

The point is that this would make it possible to configure the log level for both kinesis-s3 and the KCL by starting the application like this:

java -jar -Dorg.slf4j.simpleLogger.defaultLogLevel=debug kinesis-lzo-sink-0.x.0 --config myhocon

Reduce INFO logging

I'm running snowplow-kinesis-s3-0.4.0 with a config:

logging {
  level: "WARN"
}

My log file output is full of lines like this. Can these be reclassified as DEBUG or reduced in some way?

Sep 23, 2015 5:13:32 PM com.snowplowanalytics.snowplow.storage.kinesis.s3.RawEventTransformer toClass
INFO: Converting one record to EmitterInput before adding it to the buffer
Sep 23, 2015 5:13:32 PM com.snowplowanalytics.snowplow.storage.kinesis.s3.RawEventTransformer toClass
INFO: Converting one record to EmitterInput before adding it to the buffer
[... the same two lines repeat for every record processed ...]
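
Assuming the logging is eventually unified behind slf4j-simple (see "Unify logger configuration" above), a per-package override along these lines might quieten the transformer; the property name follows the standard slf4j-simple convention, and the jar and config file names are placeholders:

java -Dorg.slf4j.simpleLogger.log.com.snowplowanalytics=warn -jar snowplow-kinesis-s3-0.4.0 --config my.conf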

Explore filenames which are ordered

The basic idea is that each running instance of the kinesis-s3 sink has a unique identifier ("instance_id"), and all files written by that instance also carry a strictly monotonically increasing number. So:

de305d54-75b4-431b-adb2-eb6b9e546014-00000001
de305d54-75b4-431b-adb2-eb6b9e546014-00000002
de305d54-75b4-431b-adb2-eb6b9e546014-00000003

Why do this? It means that you don't have to maintain a manifest of which files you have processed - you can just store a "cursor position" on a per-instance basis.

If the instance restarts, you create a new instance_id. After the 99999999th file, create a new instance_id.

The instance_id not having to survive between restarts means more cursor positions to maintain, but less complexity in the kinesis-s3 sink code.
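
A minimal sketch of the naming scheme described above, assuming an 8-digit counter as in the example file names; this is hypothetical code, not the sink's current behaviour:

import java.util.UUID

// One UUID per running instance plus a strictly increasing, zero-padded counter,
// so names sort in write order and no external manifest of processed files is needed.
class OrderedFileNamer {
  private val instanceId = UUID.randomUUID().toString
  private var counter    = 0L

  def nextName(): String = {
    counter += 1
    f"$instanceId-$counter%08d" // e.g. de305d54-75b4-431b-adb2-eb6b9e546014-00000001
  }
}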

Add unit tests

Initially just unit tests (need to figure out how to do integration tests for a KCL app)

Prevent empty files from being written to S3

If there are no events and the buffer timeout is hit, an empty file is written to S3. In fact there are two files:

  • an 8-byte .lzo.index file
  • a 555-byte empty .lzo file

Usually this does not happen in production, but it does in the staging environment.

[Screenshot from 2015-09-14 attached to the original issue.]

Stop passing String and Array[Byte] versions of records around

We pass around both the String and Array[Byte] versions of each record to make failure logging easier, but this doubles the amount of memory we need in the heap.

Instead, we can just pass the Array[Byte] around and reverse out the String for any failure rows. Failures will be rare anyway, given we are not doing any record-level processing.
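
A sketch of the failure path under that change: the record stays an Array[Byte] everywhere, and the String form is only reconstructed when a failure actually needs to be logged. The names here are illustrative, not the loader's actual API:

object FailureLoggingSketch {
  import java.nio.charset.StandardCharsets.UTF_8

  // Only the Array[Byte] is carried around; the String form is rebuilt on the
  // rare failure path, so the heap no longer holds both copies of every record.
  def describeFailure(record: Array[Byte], error: Throwable): String = {
    val asString = new String(record, UTF_8) // materialised only on failure
    s"Failed to emit record '$asString': ${error.getMessage}"
  }
}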

Clarify comment

This comment is rather vague. Please specify the mechanism used to determine "first". For example, to have a notion of "first" there must be some atomic operation going on somewhere: presumably a file is written somewhere, or some Amazon API is being used for this.

I have no doubt that I can read the source code to figure it out, but this is about usability, and I suspect to some extent about documenting the desired behaviour.

# Note: This only affects the first run of this application
# on a stream.

S3 outage causes infinite retry loop

There was a us-east-1 outage for 4 hours this morning.

The Kinesis S3 app got "stuck" during the outage, and continues to be stuck in an infinite retry loop:

@4000000055c8d09012bd3bcc Aug 10, 2015 4:25:42 PM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
@4000000055c8d09012bd4784 INFO: Successfully published 6 datums.
@4000000055c8d09527e22ea4 Aug 10, 2015 4:25:47 PM com.snowplowanalytics.snowplow.storage.kinesis.s3.S3Emitter attemptEmit$1
@4000000055c8d09527e23674 SEVERE: S3Emitter threw an unexpected exception
@4000000055c8d09527e23a5c com.amazonaws.AmazonClientException: Data read (0) has a different length than the expected (51706225)
@4000000055c8d09527e23a5c   at com.amazonaws.util.LengthCheckInputStream.checkLength(LengthCheckInputStream.java:135)
@4000000055c8d09527e23e44   at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:103)
@4000000055c8d09527e29434   at com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream.read(MD5DigestCalculatingInputStream.java:84)
@4000000055c8d09527e2981c   at org.apache.http.entity.InputStreamEntity.writeTo(InputStreamEntity.java:98)
@4000000055c8d09527e2981c   at com.amazonaws.http.RepeatableInputStreamRequestEntity.writeTo(RepeatableInputStreamRequestEntity.java:153)
@4000000055c8d09527e29c04   at org.apache.http.entity.HttpEntityWrapper.writeTo(HttpEntityWrapper.java:98)
@4000000055c8d09527e2d2b4   at org.apache.http.impl.client.EntityEnclosingRequestWrapper$EntityWrapper.writeTo(EntityEnclosingRequestWrapper.java:108)
@4000000055c8d09527e2d69c   at org.apache.http.impl.entity.EntitySerializer.serialize(EntitySerializer.java:122)
@4000000055c8d09527e2d69c   at org.apache.http.impl.AbstractHttpClientConnection.sendRequestEntity(AbstractHttpClientConnection.java:271)
@4000000055c8d09527e2e63c   at org.apache.http.impl.conn.ManagedClientConnectionImpl.sendRequestEntity(ManagedClientConnectionImpl.java:197)
@4000000055c8d09527e2ea24   at org.apache.http.protocol.HttpRequestExecutor.doSendRequest(HttpRequestExecutor.java:257)
@4000000055c8d09527e2ea24   at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doSendRequest(SdkHttpRequestExecutor.java:47)
@4000000055c8d09527e2f1f4   at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
@4000000055c8d09527e2f1f4   at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:713)
@4000000055c8d09527e2f5dc   at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:518)
@4000000055c8d09527e2f5dc   at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
@4000000055c8d09527e2fdac   at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
@4000000055c8d09527e2fdac   at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:402)
@4000000055c8d09527e30194   at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:245)
@4000000055c8d09527e30964   at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3722)
@4000000055c8d09527e30d4c   at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1474)
@4000000055c8d09527e30d4c   at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1311)
@4000000055c8d09527e31134   at com.snowplowanalytics.snowplow.storage.kinesis.s3.S3Emitter.attemptEmit$1(S3Emitter.scala:144)
@4000000055c8d09527e3151c   at com.snowplowanalytics.snowplow.storage.kinesis.s3.S3Emitter.emit(S3Emitter.scala:166)
@4000000055c8d09527e31904   at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.emit(KinesisConnectorRecordProcessor.java:159)
@4000000055c8d09527e31904   at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.processRecords(KinesisConnectorRecordProcessor.java:132)
@4000000055c8d09527e320d4   at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:125)
@4000000055c8d09527e324bc   at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
@4000000055c8d09527e324bc   at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
@4000000055c8d09527e33074   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
@4000000055c8d09527e3345c   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
@4000000055c8d09527e3345c   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
@4000000055c8d09527e33844   at java.lang.Thread.run(Thread.java:745)
@4000000055c8d09527e33c2c
@4000000055c8d09715ea67ac Aug 10, 2015 4:25:49 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
@4000000055c8d09715ea7364 INFO: Current stream shard assignments: shardId-000000000207
@4000000055c8d09715ec53dc Aug 10, 2015 4:25:49 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
@4000000055c8d09715ec5bac INFO: Sleeping ...
@4000000055c8d09a13abf514 Aug 10, 2015 4:25:52 PM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
@4000000055c8d09a13abfce4 INFO: Successfully published 6 datums.
@4000000055c8d09f2c20de0c Aug 10, 2015 4:25:57 PM com.snowplowanalytics.snowplow.storage.kinesis.s3.S3Emitter attemptEmit$1
@4000000055c8d09f2c20e5dc SEVERE: S3Emitter threw an unexpected exception
@4000000055c8d09f2c20e5dc com.amazonaws.AmazonClientException: Data read (0) has a different length than the expected (51706225)
[... identical stack trace to the one above ...]
@4000000055c8d0a414e795dc Aug 10, 2015 4:26:02 PM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
@4000000055c8d0a414e7a194 INFO: Successfully published 6 datums.

Grepping the logs for 51706225 shows that the sink has been stuck on the same batch of data for many hours (the S3 outage was resolved hours ago).

I am going to bounce the box now and fully expect the issue to resolve itself with the bounce.
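
One hedged way out of the loop would be to bound the retries around the S3 put and let the caller decide what to do when they are exhausted. The sketch below is illustrative, not the emitter's actual code:

object RetrySketch {
  import scala.util.{Failure, Success, Try}

  // Try the S3 put a bounded number of times, backing off between attempts;
  // returns false once the attempts are exhausted so the caller can fail the
  // batch (or exit) instead of retrying the same payload forever.
  def putWithRetry(maxAttempts: Int, backoffMillis: Long)(put: () => Unit): Boolean = {
    var attempt   = 0
    var succeeded = false
    while (!succeeded && attempt < maxAttempts) {
      attempt += 1
      Try(put()) match {
        case Success(_) => succeeded = true
        case Failure(e) =>
          System.err.println(s"Attempt $attempt of $maxAttempts failed: ${e.getMessage}")
          if (attempt < maxAttempts) Thread.sleep(backoffMillis)
      }
    }
    succeeded
  }
}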

Improve README

  • Usage and Running are synonyms
  • Surely more config changes are needed?

Add CHANGELOG

Make sure to backfill the entries for 0.2.0 and 0.1.0

Add file name strategies

We should support:

  1. The current file name strategy (we need to come up with a sexy name for this)
  2. A new strategy, see below (we need to come up with a sexy name for this)

New strategy

  • On initialization of each sink instance, generate a UUID and create a bigint counter set to 0
  • First file is <UUID>-000000000000000...0 (where 0s are padded out to max length of a bigint)
  • Second file is <UUID>-0000000000...1, i.e. counter is incremented
  • Third file is <UUID>-0000000000...2, i.e. counter is incremented
  • On server restart, generate a new UUID and create a bigint counter set to 0

This is a strategy designed to:

  • Have minimal moving parts
  • Be extremely idiotproof
  • Minimize the size of any manifest which needs to keep track of which files have been processed

Explore exactly-once writing

At the moment, records are written at least once, due to pessimistic checkpointing.

Perhaps we could copy Secor and have filenames which were very closely tied to the events being read, so that any duplicated reading simply overwrote the same files?
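
A sketch of what a Secor-style deterministic key might look like, assuming the shard id and the first sequence number of the batch are available; all names here are hypothetical:

object DeterministicKeySketch {
  // Derive the S3 key from where the batch came from rather than from when it
  // was written, so re-processing the same records after a failed checkpoint
  // overwrites the same object instead of creating a duplicate file.
  def key(prefix: String, shardId: String, firstSequenceNumber: String): String =
    s"$prefix/$shardId/$firstSequenceNumber.lzo"
}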
