target-athena


Note: This target is derived from https://github.com/transferwise/pipelinewise-target-s3-csv. Some of the documentation below has not been completely updated yet.

Singer target that loads data into AWS Athena in CSV format, following the Singer spec.

How to use it

The recommended method of running this target is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this target with JSON files, and most things are automated. Please check the related documentation at Target S3 CSV.

If you want to run this Singer Target independently please read further.

Install

First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

  python3 -m venv venv
  . venv/bin/activate
  pip install git+https://github.com/MeltanoLabs/target-athena.git

or

  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .

To run

Like any other target that follows the Singer specification:

some-singer-tap | target-athena --config [config.json]

The target reads incoming messages from STDIN and uses the properties in config.json to upload the data to S3 and make it queryable in AWS Athena.

Note: To avoid version conflicts run tap and targets in separate virtual environments.

Configuration settings

Running the target connector requires a config.json file. An example with the minimal settings:

{
  "s3_bucket": "my_bucket",
  "athena_database": "my_database"
}

Profile based authentication

Profile-based authentication is used by default, with the default profile. To use another profile, set the aws_profile parameter in config.json or set the AWS_PROFILE environment variable.

Non-Profile based authentication

For non-profile based authentication, set aws_access_key_id, aws_secret_access_key and optionally the aws_session_token parameter in config.json. Alternatively, you can define them outside config.json by setting the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN environment variables.
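
For reference, here is a minimal sketch of how these settings typically map onto an AWS session. It assumes the target uses boto3 under the hood (an assumption, not a documented detail of this target) and that explicit keys take precedence over a named profile:

  import os
  import boto3

  def build_session(config: dict) -> boto3.session.Session:
      # Illustrative only: explicit keys first, otherwise fall back to a named profile
      # (or boto3's default resolution when neither is configured).
      access_key = config.get("aws_access_key_id") or os.environ.get("AWS_ACCESS_KEY_ID")
      if access_key:
          return boto3.session.Session(
              aws_access_key_id=access_key,
              aws_secret_access_key=config.get("aws_secret_access_key") or os.environ.get("AWS_SECRET_ACCESS_KEY"),
              aws_session_token=config.get("aws_session_token") or os.environ.get("AWS_SESSION_TOKEN"),
          )
      return boto3.session.Session(profile_name=config.get("aws_profile") or os.environ.get("AWS_PROFILE"))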

Full list of options in config.json:

| Property | Type | Required? | Description |
| --- | --- | --- | --- |
| aws_access_key_id | String | No | S3 Access Key Id. If not provided, the AWS_ACCESS_KEY_ID environment variable will be used. |
| aws_secret_access_key | String | No | S3 Secret Access Key. If not provided, the AWS_SECRET_ACCESS_KEY environment variable will be used. |
| aws_session_token | String | No | AWS Session token. If not provided, the AWS_SESSION_TOKEN environment variable will be used. |
| aws_profile | String | No | AWS profile name for profile-based authentication. If not provided, the AWS_PROFILE environment variable will be used. |
| s3_bucket | String | Yes | S3 Bucket name. |
| s3_key_prefix | String | No | (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket. |
| s3_staging_dir | String | Yes | S3 location to stage files. Example: s3://YOUR_S3_BUCKET/path/to/ |
| delimiter | String | No | (Default: ',') A one-character string used to separate fields. |
| quotechar | String | No | (Default: '"') A one-character string used to quote fields containing special characters, such as the delimiter or quotechar, or which contain new-line characters. |
| add_record_metadata | Boolean | No | (Default: False) Metadata columns add extra row-level information about data ingestion (e.g. when the row was read from the source, when it was inserted or deleted, etc.). Metadata columns are created automatically by adding extra columns to the tables, prefixed with _sdc_. The column names follow the Stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag deleted rows by setting the _sdc_deleted_at metadata column; without add_record_metadata, rows deleted by singer taps will not be recognisable in the destination. |
| encryption_type | String | No | (Default: 'none') The type of encryption to use. Currently supported options are 'none' and 'KMS'. |
| encryption_key | String | No | A reference to the encryption key to use for data encryption. For KMS encryption, this should be the KMS encryption key ID (e.g. '1234abcd-1234-1234-1234-1234abcd1234'). This field is ignored if encryption_type is none or blank. |
| compression | String | No | The type of compression to apply before uploading. Supported options are none (default) and gzip. For gzipped files, the file extension will automatically be changed to .csv.gz for all files. |
| naming_convention | String | No | (Default: None) Custom naming convention for the S3 key. Replaces the tokens {date}, {stream}, and {timestamp} with the appropriate values. Supports "folders" in S3 keys, e.g. folder/folder2/{stream}/export_date={date}/{timestamp}.csv. Honors s3_key_prefix, if set, by prepending it to the filename: naming_convention = folder1/my_file.csv with s3_key_prefix = prefix_ results in folder1/prefix_my_file.csv (see the sketch after this table). |
| temp_dir | String | No | (Default: platform-dependent) Directory for temporary CSV files holding RECORD messages. |
| athena_workgroup | String | No | (Default: primary) The name of the workgroup in which the query is started. |
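
To make the naming_convention tokens and the s3_key_prefix interaction concrete, here is a minimal sketch of how the final S3 key could be assembled. The resolve_s3_key helper and its exact token formats are hypothetical, not the target's actual implementation:

  import os.path
  from datetime import datetime, timezone

  def resolve_s3_key(naming_convention: str, stream: str, s3_key_prefix: str = "") -> str:
      # Hypothetical helper: substitute {stream}, {date}, {timestamp}, then prepend the prefix to the filename.
      now = datetime.now(timezone.utc)
      key = naming_convention.format(
          stream=stream,
          date=now.strftime("%Y-%m-%d"),
          timestamp=now.strftime("%Y%m%dT%H%M%S"),
      )
      head, filename = os.path.split(key)
      filename = s3_key_prefix + filename
      return os.path.join(head, filename) if head else filename

  # resolve_s3_key("folder1/my_file.csv", "users", "prefix_") -> "folder1/prefix_my_file.csv"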

To run tests:

  1. Define the environment variables required by the tests
  export TARGET_ATHENA_ACCESS_KEY_ID=<s3-access-key-id>
  export TARGET_ATHENA_SECRET_ACCESS_KEY=<s3-secret-access-key>
  export TARGET_ATHENA_BUCKET=<s3-bucket>
  export TARGET_ATHENA_KEY_PREFIX=<s3-key-prefix>
  2. Install python test dependencies in a virtual env
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .[test]
  3. Run unit tests:
  nosetests --where=tests/unit
  4. Run integration tests:
  nosetests --where=tests/integration

To run pylint:

  1. Install python dependencies and run python linter
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .
  pip install pylint
  pylint target_athena -d C,W,unexpected-keyword-arg,duplicate-code

License

Apache License Version 2.0

See LICENSE to see the full text.

target-athena's People

Contributors

aaronsteers, andrewcstewart, aroder, dependabot[bot], freimer, koszti, ndrluis, pnadolny13, samira-el, tw-sec, visch


target-athena's Issues

Config `s3_staging_dir` not advertised

Explained in #39 (comment)

The config option is accessed by the target but isn't advertised in the target.py config options. I was also getting an error saying that it was required before the workgroup PR was implemented, so it's relatively critical.

Consolidate s3 key config options

There are currently three different config fields related to s3 data location keys:

  • s3_key_prefix - Used in defining target_key for the S3 upload, and used in data_location for the Athena table definition.
  • naming_convention - Used only for defining target_key for the S3 upload.
  • s3_staging_dir - Not being used in the sink function, but apparently still set as 'required' somewhere (will fail the CLI if not set).

We can probably consolidate at least some of these into a single config option.

Incorrect metadata setting

The target currently uses add_metadata_columns to add metadata columns to records. This is incorrect: the SDK uses add_record_metadata. Setting add_metadata_columns to true therefore does not add the metadata columns.

WIP: Ambidextrous target

One thing we've been thinking about with the SDK is how much effort it would be to have 'ambidextrous' targets, meaning targets that could also be taps - aka, on the 'left' or 'right' side of the Singer pipeline. Since I haven't seen a tap-athena, I decided to start exploring this in a new branch: #1

Suggestion: use AWS Data Wrangler instead of pyathena

Hello people, I'm starting to use this target and I'm missing some features. I'm already working on some contributions here, but I think we could make this codebase simpler by using AWS Data Wrangler instead of pyathena.

I don't know if anyone here has worked with this library before, but AWS Data Wrangler abstracts away the AWS calls, catalog/database manipulation, and data upload to S3, making it easier to implement the parquet writer (#9), for example.

Can we discuss this?

References:
https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/006%20-%20Amazon%20Athena.html
https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/005%20-%20Glue%20Catalog.html
https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/003%20-%20Amazon%20S3.html
https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/012%20-%20CSV%20Crawler.html
https://aws-data-wrangler.readthedocs.io/en/stable/tutorials/017%20-%20Partition%20Projection.html

target not generating correct column definitions for tap-mongodb

Testing #12 with the transferwise variant of tap-mongodb, it looks like every schema generated by the target only contains _id, document, and _sdc_deleted_at. I'm not exactly sure what's going on here, but I'm guessing we'll have to flatten self.schema by default, or something similar (see the sketch after the DDL below)?

CREATE EXTERNAL TABLE `patterns`(
  `_id` string COMMENT 'from deserializer', 
  `document` string COMMENT 'from deserializer', 
  `_sdc_deleted_at` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  '-'
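
For illustration, here is a hedged sketch of the kind of flattening being suggested; the helper is hypothetical and not existing target code. It promotes nested object properties to top-level, underscore-joined column names, so the DDL above would get one column per nested field instead of a single document column:

  def flatten_schema_properties(properties: dict, parent: str = "", sep: str = "_") -> dict:
      # Hypothetical sketch: recursively promote nested object properties to flat column names.
      flat = {}
      for name, prop in properties.items():
          key = f"{parent}{sep}{name}" if parent else name
          if isinstance(prop, dict) and "properties" in prop:
              flat.update(flatten_schema_properties(prop["properties"], parent=key, sep=sep))
          else:
              flat[key] = prop
      return flat

  # {"document": {"type": "object", "properties": {"name": {"type": "string"}}}}
  # -> {"document_name": {"type": "string"}}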

Add/confirm support for Redshift Spectrum

In theory, because we register data in the Glue catalog, those data sources might already be available to Redshift Spectrum tables. We would need to test and confirm this to see whether this connector can be used more broadly for Redshift Spectrum as well as natively in Athena.

Verify compression is being handled correctly

From @aaronsteers

One other bug I think I found, which I didn’t document, was that I wasn’t sure if the compression was getting handed off to Athena properly. I think my first attempt with compression made the target table unreadable. I just turned it off without much further testing, but if we want to do jsonl/json, the compression would probably come up there also.

camelCase Records are null

I'm using tap-google-analytics, which returns record keys in camelCase. During table creation the fields are automatically converted to lowercase by Athena, so when I run a query it looks for keys using the lowercase variant and ultimately returns null for the entire column.

I was able to get it to work by rerunning the CREATE EXTERNAL TABLE statement from the logs with case.insensitive=true. The DDL I pulled from the logs included:

WITH SERDEPROPERTIES (
  'ignore.malformed.json'='true',
  'case.insensitive'='false'
  ) 

We set case.insensitive=false in the target when using jsonl records.

Proposed Solution:

  1. Set case.insensitive=true by default for jsonl. I don't see any harm in doing this, but there might be an edge case I'm missing. If there are record keys that differ only by casing, they'll get clobbered either during table creation or on read, so either way there will be a problem.
  2. As part of writing to jsonl files, we could lowercase all field names in the record and potentially do any additional transformations needed for Athena to accept them. Other transformations could include replacing special characters or aliasing reserved column names (see the sketch below).

I think option 1 is best for solving the short-term problem. Although I'm sensitive to transforming records in the target, I think option 2 might be the long-term solution, since it would also capture other edge cases, or at least send them through a filter and emit a warning log or something.
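
As a reference for option 2, here is a minimal sketch of a lowercase-and-sanitize pass applied to each record before it is written to JSONL. The helper is hypothetical, and the collision check reflects the clobbering concern raised under option 1:

  import re

  def athena_safe_record(record: dict) -> dict:
      # Hypothetical sketch: lowercase keys and replace characters Athena won't accept in column names.
      safe = {}
      for key, value in record.items():
          new_key = re.sub(r"[^a-z0-9_]", "_", key.lower())
          if new_key in safe:
              # Keys that differ only by case or special characters would clobber each other.
              raise ValueError(f"Column name collision after sanitizing: {new_key}")
          safe[new_key] = value
      return safe

  # athena_safe_record({"userId": 1, "ga:sessions": 3}) -> {"userid": 1, "ga_sessions": 3}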

Hyphen is not allowed in Athena table name

When the input stream is a database, Meltano's naming convention for entities and attributes commonly has the form <DB name>-<table name>.<column name>, which includes a hyphen. The current implementation tries to create an Athena table using this naming convention, but hyphens are not allowed in Athena table names.

https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html#schema-names

The only acceptable characters for database names, table names, and column names are lowercase letters, numbers, and the underscore character.

A quick fix may be converting hyphens to underscores when creating the DDL. What do you think?
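
A sketch of that quick fix, assuming the Glue/Athena rule quoted above (lowercase letters, numbers, and underscores only); the helper name is hypothetical:

  import re

  def athena_table_name(stream_name: str) -> str:
      # Hypothetical sketch: map a Singer stream name to a Glue/Athena-legal table name,
      # e.g. "mydb-public-users" -> "mydb_public_users".
      return re.sub(r"[^a-z0-9_]", "_", stream_name.lower())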

Cell / Column misalignment issue when tables created in Athena

I have been trying to run a tap-shopify to target-athena pipeline. However, when checking created tables in Athena, some cells are misaligned.

The same issue happens in target-csv. The data is heavily nested and includes arrays of JSON. For example, if a customer purchases one item vs. five items, line_items (and other columns) grow to fit information on each.

Preserving column order from schema

Currently a stream's schema is transmitted through a Sink object as a regular Python dict, which may or may not be ordered depending on the Python version (apparently?).

Anyway, we should determine whether and how column order should be preserved, both in the Hive table definition and in the written file objects (for JSON and CSV at least).
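
As a starting point, here is a hedged sketch of deriving an explicit column order from the SCHEMA message instead of relying on dict ordering; the helper and the metadata-column handling are hypothetical:

  def ordered_columns(schema: dict, metadata_columns: tuple = ("_sdc_extracted_at", "_sdc_deleted_at")) -> list:
      # Hypothetical sketch: keep properties in the order they appear in the SCHEMA message,
      # and append any metadata columns at the end.
      columns = list(schema.get("properties", {}).keys())
      columns.extend(col for col in metadata_columns if col not in columns)
      return columns

Both the Hive table definition and the written CSV/JSON files could then be generated from this single list.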

Not all schemas have a type: 'if "string" in property_schema["type"] and property_schema.get("format", None) in {' raises KeyError: 'type'

Traceback (most recent call last):
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/bin/target-athena", line 8, in <module>
    sys.exit(cli())
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 382, in cli
    target.listen()
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 145, in listen
    self._process_lines(sys.stdin)
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 205, in _process_lines
    self._process_record_message(line_dict)
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/singer_sdk/target_base.py", line 248, in _process_record_message
    sink._validate_and_parse(transformed_record)
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/singer_sdk/sinks.py", line 224, in _validate_and_parse
    self._parse_timestamps_in_record(
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/singer_sdk/sinks.py", line 239, in _parse_timestamps_in_record
    datelike_type = get_datelike_property_type(key, schema["properties"][key])
  File "/home/visch/git/shopify/.meltano/loaders/target-athena/venv/lib/python3.8/site-packages/singer_sdk/helpers/_typing.py", line 105, in get_datelike_property_type
    if "string" in property_schema["type"] and property_schema.get("format", None) in {
KeyError: 'type'
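
The failing line assumes every property schema carries a "type" key, which is not the case for schemas that use anyOf or omit the key entirely. Below is a hedged sketch of the kind of guard that would avoid the KeyError; it is illustrative only and not the SDK's actual fix:

  def get_type_list(property_schema: dict) -> list:
      # Hypothetical sketch: normalize a JSON Schema "type" that may be missing, a string, or a list.
      type_value = property_schema.get("type")
      if type_value is None:
          # e.g. {"anyOf": [...]} or an empty schema: treat as untyped rather than raising KeyError.
          return []
      return [type_value] if isinstance(type_value, str) else list(type_value)

  # "string" in get_type_list({"anyOf": [{"type": "string"}]})  -> False (no crash)
  # "string" in get_type_list({"type": ["string", "null"]})     -> True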

Add support for json/jsonl format

Athena also supports JSON/JSONL as a file format. This option may address issue #6 (CSV column shift issue).

See Athena documentation: https://docs.aws.amazon.com/athena/latest/ug/parsing-JSON.html

Adding support for JSON format would involve the following:

  • adding support for specifying different formats
  • adding JSON writer logic
  • adding JSON format specific Athena table definition

Example of JSON table definition:

CREATE EXTERNAL TABLE json_table (
  column_a string,
  column_b int
 )
 ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
 WITH SERDEPROPERTIES ('ignore.malformed.json' = 'true')
 LOCATION 's3://bucket/path/';
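
The JSONL writer logic itself can be small; here is a minimal sketch (a hypothetical function, not existing target code), including the optional gzip handling mentioned in the compression setting:

  import gzip
  import json

  def write_jsonl(records: list, path: str, compress: bool = False) -> str:
      # Hypothetical sketch: one JSON object per line, optionally gzipped, ready for upload to S3.
      if compress:
          path = path + ".gz"
          handle = gzip.open(path, "wt", encoding="utf-8")
      else:
          handle = open(path, "w", encoding="utf-8")
      with handle:
          for record in records:
              handle.write(json.dumps(record, default=str) + "\n")
      return path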
