
cloud-storage-extension's Issues

Regex path matching

Question: is it possible to support regex path matching (not a simple prefix)?

/<bucket>/<folder…>/<batch…>/<table>/<file>

For small batch processing, it is useful to provide this prefix to get all data for a table in a batch.

/<bucket>/<folder…>/<batch…>/<table>

Another folder structure may be:

/<bucket>/<folder…>/<table>/<batch…>/<file>

For large batch processing, it is useful to organize folders this way to allow getting all data for a table across batches

/<bucket>/<folder…>/<table>/<batch…>

Assuming that AWS does not allow prefix wildcards, perhaps this is something that you could implement as a work-around.

I would like to store parquet files with paths like this:

/<bucket>/<folder…>/<batch…>/<table>/<file>

But if I need to do a full sweep of all batches, I would supply a path like this:

/<bucket>/<folder…>/*/<table>

In this case “%” or “*” matches any path segment until <table> is reached. Maybe a regular expression would work well.
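
To make the request concrete, here is a sketch of what such a call might look like; the wildcard segment and the surrounding parameters are hypothetical, since the extension does not support this today:

-- Hypothetical call: the '*' between the folder and table segments is the
-- requested wildcard/regex match over the batch folders.
IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT ETL.IMPORT_PATH WITH
  BUCKET_PATH   = 's3a://my-bucket/my-folder/*/sales_positions/*'
  DATA_FORMAT   = 'PARQUET'
  S3_ENDPOINT   = 's3.MY_REGION.amazonaws.com'
  PARALLELISM   = 'nproc()';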

Import S3 parquet files - Partition values excluded

Hi,
I'm trying to import data from AWS S3. This data is in parquet format and it is part of a partitioned table.
The problem I have is that I can't import partition columns. For example, if I have my table partitioned with "date" as partition column:

  • s3://..../date=20200501/*.parquet
  • s3://..../date=20200502/*.parquet

Is there any way to include column "date" in the import load?

Thanks in advance,

Sergio

Add Avro format import support

As a user of cloud-storage-etl-udfs,
in addition to the Parquet format, I want to import data in Avro format into my Exasol tables.

Ignore Hadoop hidden files

As a user of cloud-storage-etl-udfs,
I want to ignore the Hadoop hidden files, which usually start with an underscore, e.g., _SUCCESS.

Support Exasol connection to access credentials

The cloud-storage-etl-udfs should use the user-provided CONNECTION object.

In Exasol you can create a connection object, for example:

CREATE CONNECTION my_s3_connection
TO '' 
USER 'my_s3_access_key'
IDENTIFIED BY 'my_s3_secret_key'

Then this connection name can be used in the IMPORT / EXPORT user-defined functions.
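
For example, the connection could then be referenced from an import call via the CONNECTION_NAME parameter (as used in the S3 import example further down this list); bucket and table names are placeholders:

IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT ETL.IMPORT_PATH WITH
  BUCKET_PATH     = 's3a://my-bucket/parquet/sales_positions/*'
  DATA_FORMAT     = 'PARQUET'
  -- Credentials are resolved from the named connection instead of
  -- passing S3_ACCESS_KEY / S3_SECRET_KEY parameters directly.
  CONNECTION_NAME = 'MY_S3_CONNECTION'
  S3_ENDPOINT     = 's3.MY_REGION.amazonaws.com'
  PARALLELISM     = 'nproc()';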

Setup the project for using it with the Intellij IDEA

Situation

The project has some problems when running inside IntelliJ IDEA. The problems are caused by dependencies.

Solution

Check and update the dependencies.
Check that the solution works both in the IDE and on the command line.

java.lang.OutOfMemoryError: Java heap space

Hello,
I get an OutOfMemoryError when exporting a huge table using version 0.7.0.

EXPORT RAW.EXA_DBA_AUDIT_SQL INTO SCRIPT ETL.EXPORT_PATH WITH BUCKET_PATH = 'gs://helios-manual-backup/export/parquetexa_dba_audit_sql/' DATA_FORMAT = 'PARQUET' GCS_PROJECT_ID = '<exasol>' GCS_KEYFILE_PATH = '/buckets/bfsdefault/exasol_backup/key.json' PARALLELISM = 'iproc(), floor(random()*2)';

[2020-04-21 19:35:11] [22002] VM error:
[2020-04-21 19:35:11] java.lang.OutOfMemoryError: Java heap space
[2020-04-21 19:35:11] Stack trace:
[2020-04-21 19:35:11] java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:79)
[2020-04-21 19:35:11] org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:234)
[2020-04-21 19:35:11] org.apache.parquet.bytes.BytesInput$BAOS.<init>(BytesInput.java:232)
[2020-04-21 19:35:11] org.apache.parquet.bytes.BytesInput.toByteArray(BytesInput.java:202)
[2020-04-21 19:35:11] org.apache.parquet.bytes.ConcatenatingByteArrayCollector.collect(ConcatenatingByteArrayCollector.java:33)
[2020-04-21 19:35:11] org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:126)
[2020-04-21 19:35:11] org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
[2020-04-21 19:35:11] org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:235)
[2020-04-21 19:35:11] org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:122)
[2020-04-21 19:35:11] org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
[2020-04-21 19:35:11] org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:148)
[2020-04-21 19:35:11] org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:130)
[2020-04-21 19:35:11] org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
[2020-04-21 19:35:11] com.exasol.cloudetl.sink.BatchSizedSink$$anon$1.write(BatchSizedSink.scala:55)
[2020-04-21 19:35:11] com.exasol.cloudetl.sink.BatchSizedSink$$anon$1.write(BatchSizedSink.scala:48)
[2020-04-21 19:35:11] com.exasol.cloudetl.sink.BatchSizedSink.write(BatchSizedSink.scala:73)
[2020-04-21 19:35:11] com.exasol.cloudetl.scriptclasses.ExportTable$.run(ExportTable.scala:34)
[2020-04-21 19:35:11] com.exasol.cloudetl.scriptclasses.ExportTable.run(ExportTable.scala)
[2020-04-21 19:35:11] java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[2020-04-21 19:35:11] java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[2020-04-21 19:35:11] java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2020-04-21 19:35:11] java.base/java.lang.reflect.Method.invoke(Method.java:566)
[2020-04-21 19:35:11] com.exasol.ExaWrapper.run(ExaWrapper.java:168) (Session: 1664607525621005846)

I am not sure whether this is a bug or whether I am doing something wrong.

failure to login: NullPointerException: invalid null input: name

Hi,
I am trying to use this UDF to ingest Parquet files and keep receiving the following error:

[2020-03-03 14:03:39] [22002] VM error:
[2020-03-03 14:03:39] org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
[2020-03-03 14:03:39] at com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
[2020-03-03 14:03:39] at com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:133)
[2020-03-03 14:03:39] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[2020-03-03 14:03:39] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[2020-03-03 14:03:39] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2020-03-03 14:03:39] at java.lang.reflect.Method.invoke(Method.java:498)
[2020-03-03 14:03:39] at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
[2020-03-03 14:03:39] at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
[2020-03-03 14:03:39] at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
[2020-03-03 14:03:39] at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
[2020-03-03 14:03:39] at java.security.AccessController.doPrivileged(Native Method)
[2020-03-03 14:03:39] at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
[2020-03-03 14:03:39] at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
[2020-03-03 14:03:39] at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:1926)
[2020-03-03 14:03:39] at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1837)
[2020-03-03 14:03:39] at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:710)
[2020-03-03 14:03:39] at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:660)
[2020-03-03 14:03:39] at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
[2020-03-03 14:03:39] at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3487)
[2020-03-03 14:03:39] at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3477)
[2020-03-03 14:03:39] at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3319)
[2020-03-03 14:03:39] at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
[2020-03-03 14:03:39] at com.exasol.cloudetl.bucket.Bucket.fileSystem$lzycompute(Bucket.scala:60)
[2020-03-03 14:03:39] at com.exasol.cloudetl.bucket.Bucket.fileSystem(Bucket.scala:59)
[2020-03-03 14:03:39] at com.exasol.cloudetl.bucket.Bucket.getPaths(Bucket.scala:68)
[2020-03-03 14:03:39] at com.exasol.cloudetl.scriptclasses.ImportMetadata$.run(ImportMetadata.scala:22)
[2020-03-03 14:03:39] at com.exasol.cloudetl.scriptclasses.ImportMetadata.run(ImportMetadata.scala)
[2020-03-03 14:03:39] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[2020-03-03 14:03:39] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[2020-03-03 14:03:39] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[2020-03-03 14:03:39] at java.lang.reflect.Method.invoke(Method.java:498)
[2020-03-03 14:03:39] at com.exasol.ExaWrapper.run(ExaWrapper.java:186)
[2020-03-03 14:03:39] Stack trace:
[2020-03-03 14:03:39] org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1847)
[2020-03-03 14:03:39] org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:710)
[2020-03-03 14:03:39] org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:660)
[2020-03-03 14:03:39] org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
[2020-03-03 14:03:39] org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3487)
[2020-03-03 14:03:39] org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3477)
[2020-03-03 14:03:39] org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3319)
[2020-03-03 14:03:39] org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
[2020-03-03 14:03:39] com.exasol.cloudetl.bucket.Bucket.fileSystem$lzycompute(Bucket.scala:60)
[2020-03-03 14:03:39] com.exasol.cloudetl.bucket.Bucket.fileSystem(Bucket.scala:59)
[2020-03-03 14:03:39] com.exasol.cloudetl.bucket.Bucket.getPaths(Bucket.scala:68)
[2020-03-03 14:03:39] com.exasol.cloudetl.scriptclasses.ImportMetadata$.run(ImportMetadata.scala:22)
[2020-03-03 14:03:39] com.exasol.cloudetl.scriptclasses.ImportMetadata.run(ImportMetadata.scala)
[2020-03-03 14:03:39] (Session: 1660147398588182131)

The error is the same whether I load from GCS or S3.

Exasol node runs on kubernetes:
image: exasol/docker-db:6.1.6-d1
udf function: cloud-storage-etl-udfs-0.6.1.jar

Thanks,
Alex

Correctly import parquet date and timestamp types

Currently we are not able to store Parquet date and timestamp types in Exasol tables when importing via a UDF.

  • Able to store parquet Date into table column with Exasol Date type
  • Able to store parquet Timestamp into table column with Exasol Timestamp type

Parquet logical types: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md

In summary, we should use the originalType information in the Parquet PrimitiveType to determine the time-related types.

  • typeName = INT32, originalType = DATE: should be mapped to Exasol Date
  • typeName = INT64, originalType in (TIMESTAMP_MICROS, TIMESTAMP_MILLIS): should be mapped to Exasol Timestamp
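
As a sketch of the intended end state (the table and bucket names below are hypothetical), such an import should then populate DATE and TIMESTAMP columns directly:

-- Hypothetical target table with Exasol DATE and TIMESTAMP columns.
CREATE TABLE RETAIL.ORDERS (
  ORDER_DATE DATE,      -- from Parquet INT32 with originalType DATE
  ORDERED_AT TIMESTAMP  -- from Parquet INT64 with originalType TIMESTAMP_MILLIS / TIMESTAMP_MICROS
);

IMPORT INTO RETAIL.ORDERS
FROM SCRIPT ETL.IMPORT_PATH WITH
  BUCKET_PATH   = 's3a://my-bucket/parquet/orders/*'
  DATA_FORMAT   = 'PARQUET'
  S3_ENDPOINT   = 's3.MY_REGION.amazonaws.com'
  PARALLELISM   = 'nproc()';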

Support writing parquet files to cloud storage

Support writing queries / tables directly to parquet files in cloud storage accounts

Apologies, this is more a feature request than an issue:

  • Is direct support for writing parquet to cloud storage a planned feature of this repository?

Rationale

  • Increases interoperability with data lake architectures
  • In addition, directly creating Parquet files from scripting would enable easier integration into existing ETL jobs and ETL/ELT infrastructure
  • Simplifies the architecture, as there is no need for an external process doing the work
  • Simplifies deployment, as it avoids the hassle of extensive Python scripting and the burden of uploading Python native libraries (pyarrow, pyarrow.parquet, etc.) to ExaBucket

Add support for importing Parquet timestamp milliseconds

Some data systems (for example, Snowflake) export timestamp values as milliseconds since the epoch using the INT64 Parquet type. Currently, we are not able to import these timestamp values; only INT96-formatted timestamp types are supported.

As a user of cloud-storage-etl-udfs, I want to import Parquet INT64 (TIMESTAMP_MILLIS) types as Exasol timestamp values.

Add compression codec extension to export path.

Situation

At the moment, if a compression codec is set when exporting a table, the name of the output Parquet file does not indicate it.

Acceptance Criteria

The exported file name clearly indicates which compression codec (snappy, gzip, lzo) was used when writing the exported data into a Parquet file.

For example, with compression codec /path/exa_export_nodeId_vmId_uuid.snappy.parquet and without compression /path/exa_export_nodeId_vmId_uuid.parquet.

Unable to import into a table that resides in a schema that has a limited identifier name

When trying to import into a schema that has a case-sensitive name (limited identifier), the UDF seems not to consider this and doesn't find the schema (the error message shows it tries to import into the case-insensitive, uppercased schema name).
[Code: 0, SQL State: 42000] object AAAA_BBBB_CCCC.DDDDD_EEEEEEEEE not found [line 1, column 13] (Session: 1649468587879417167) [Script position: 2133 - 2134] IMPORT INTO "aaaa_bbbb_cccc".DDDDD_EEEEEEEEE FROM SCRIPT ETL.IMPORT_PATH WITH BUCKET_PATH = 'wasbs://[email protected]/y/z/*' DATA_FORMAT = 'PARQUET' AZURE_ACCOUNT_NAME = 'x' AZURE_CONTAINER_NAME = 'ff-ggg' AZURE_SAS_TOKEN = '?sv=...' PARALLELISM = 'nproc()'

Update documentation

Update the docs so that they include the latest changes related to the Kafka import UDF.

Support ORC format import

Users of cloud-storage-etl-udfs should be able to import ORC-formatted files into Exasol tables.

Add an integration test setup

We should be able to test the UDFs using an integration test setup.

Possible requirements:

  • Set up an automated Exasol Docker container that uploads the jar to BucketFS
  • Set up S3 (GCS, Azure) Docker instances
  • Perform write <-> read operations

Missing documentation on good default values for parallelism, number of files, file size when importing

@morazow Parquet import support is outstanding, as we can avoid the brittle CSV processes of the regular bulk importer.

However we have run into multiple problems during bulk loading that can be avoided by using appropriate parameters and chunking of imports.

For example, timeouts occur if we load too much with one import, memory exhaustion occurs when loading too many large Parquet files in parallel, and too high a parallelism leads to failed imports.

Thus there are at least three tuning knobs we need to set appropriately:

  1. Size of the parquet input files
  2. Number of parquet input files in blob store
  3. Level of parallelism

Missing Documentation Hints

  1. How many blobs should we push into blob storage to help Exasol process them in parallel? Large Parquet files (1 million records, 80 columns) fail to load in parallel (memory exhaustion).
  2. What is the optimum level of parallelism (how far can we push parallelism in relation to the number of cores and available memory)?
  3. How many records / chunks, or how much data, should we package per individual import statement?

Exported Parquet Files are incompatible with Hive due to capital letters in column names

When using the script EXPORT_PATH to export an Exasol table, the generated Parquet files have a schema with column names in capital letters. The reason is probably that Exasol stores metadata in upper case.

This is not a problem by itself, but when it comes to storing these files as a Hive table, where Hive and Spark share a common metastore, new issues appear.

As explained here, Hive is case insensitive, while Parquet is not.

Hive stores the table, field names in lowercase in Hive Metastore.
Spark preserves the case of the field name in Dataframe, Parquet Files.
When a table is created/accessed using Spark SQL, Case Sensitivity is preserved by Spark storing the details in Table Properties (in hive metastore). This results in a weird behavior when parquet records are accessed thru Spark SQL using Hive Metastore.

Therefore, as a user of cloud-storage-etl-udfs,
I want to be able to export parquet files with column names in lower case to maximize compatibility with Hive and Spark.

EXPORT SALES_POSITIONS
INTO SCRIPT ETL.EXPORT_PATH WITH
  BUCKET_PATH    = 's3a://bucket-path/parquet/retail/sales_positions/'
  S3_ACCESS_KEY  = 'MY_AWS_ACCESS_KEY'
  S3_SECRET_KEY  = 'MY_AWS_SECRET_KEY'
  S3_ENDPOINT    = 's3.MY_REGION.amazonaws.com'
  PARALLELISM    = 'iproc(), floor(random()*4)'
  LOWERCASE_SCHEMA = true;

Import from S3 without Access key but with IAM role

For Exasol instances hosted in AWS, you can specify an IAM role to grant them read/write access to S3. Therefore, it would be good to have an option to import without an access key and secret, making use of that role instead.
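
A sketch of what such a call could look like, simply omitting the key parameters; whether the UDF then falls back to the instance's IAM role credentials is exactly what this issue requests, so this is an assumption rather than current behaviour:

IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT ETL.IMPORT_PATH WITH
  BUCKET_PATH   = 's3a://my-bucket/parquet/sales_positions/*'
  DATA_FORMAT   = 'PARQUET'
  -- No S3_ACCESS_KEY / S3_SECRET_KEY: credentials would come from the
  -- IAM role attached to the Exasol instance.
  S3_ENDPOINT   = 's3.MY_REGION.amazonaws.com'
  PARALLELISM   = 'nproc()';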

Parameter to overwrite s3 path when exporting to s3

As a user of cloud-storage-etl-udfs,
I want to be able to customise the export so that a second run on the same BUCKET_PATH overwrites the contents of that path instead of adding new exa_export_N files.

This is useful when your BUCKET_PATH is partitioned as s3a://my-bucket/parquet/export/sales_positions/YYYYMMDD/ and you want to reprocess one day.

Example call:

EXPORT SALES_POSITIONS
INTO SCRIPT ETL.EXPORT_PATH WITH
  BUCKET_PATH    = 's3a://my-bucket/parquet/export/sales_positions/'
  OVERWRITE_PATH  = true
  S3_ACCESS_KEY  = 'MY_AWS_ACCESS_KEY'
  S3_SECRET_KEY  = 'MY_AWS_SECRET_KEY'
  S3_ENDPOINT    = 's3.MY_REGION.amazonaws.com'
  PARALLELISM    = 'iproc(), floor(random()*4)';

Ignore Error/Log Error

I would like to use the DECIMAL(10,3) data type, but the load fails because of a single malformed input record that I can't identify. Changing the data type to DOUBLE forces rounding and uses extra space. Even better would be a validation/casting function to coerce a failed numeric cast or truncate long text values on failure. This would probably be expensive...

Would it make sense to have an ignore-error flag with logging options to identify failing rows? Logging import errors seems to be part of the CSV loader; is it available here as well?

Change to multi module setup

Situation

The cloud-storage-etl-udfs contains different features such as importing/exporting
from cloud storage systems and importing from streaming services such as Apache
Kafka and AWS Kinesis. However, at the moment we are building a single artifact
for all of them.

Split the project into modules so that each of them can be released and used
independently.

Acceptance Criteria

  • It is split into different modules: common, storage, streaming-kafka, and
    streaming-kinesis.
  • All unit tests and integration tests pass in the new setup.

Add Kafka import additional consumer properties

Additional useful properties for the Kafka consumer application (see the example call after the list):

  • MAX_POLL_RECORDS: The maximum number of records returned in a single call to poll(). The default value is 500. Since we perform a single poll() each time the UDF is run, users can increase this value to import more records from partitions on each UDF run.

  • FETCH_MIN_BYTES: The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.

  • SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: The endpoint identification algorithm used to validate the server hostname against the server certificate. The default value is https. Users should be able to set this to an empty string ("") if they want to disable hostname identification while using a secure connection to Kafka clusters.
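
A sketch of passing these properties in a Kafka import call; the script name and the other Kafka parameters shown here (bootstrap servers, topic, target table) are assumptions for illustration only:

IMPORT INTO RETAIL.SALES_POSITIONS
FROM SCRIPT ETL.KAFKA_CONSUMER WITH        -- script name is an assumption
  BOOTSTRAP_SERVERS = 'kafka01.example.com:9093'   -- assumed parameter
  TOPICS            = 'sales-positions'            -- assumed parameter
  TABLE_NAME        = 'RETAIL.SALES_POSITIONS'     -- assumed parameter
  -- Proposed additional consumer properties from this issue:
  MAX_POLL_RECORDS  = '2000'
  FETCH_MIN_BYTES   = '1048576'
  SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = ''
  PARALLELISM       = 'nproc()';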

Azure Data Lake Gen2 Storage Import/Export

Hi, I am trying to set up a connection between Exasol and Azure Data Lake Gen2 Storage following the setup suggested here, but it doesn't seem to work.
The main problem seems to be the bucket path.

Using the
BUCKET_PATH = 'abfs://<AZURE_CONTAINER_NAME>/BlobPath@<AZURE_ACCOUNT_NAME>.dfs.core.windows.net/import/orc/*'

I got the error:

SQL Error [22002]: VM error:
abfs://<AZURE_CONTAINER_NAME>@<AZURE_ACCOUNT_NAME>.dfs.core.windows.net/export/parquet has invalid authority.
Stack trace:
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.authorityParts(AzureBlobFileSystemStore.java:187)
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:134)
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:104)
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
com.exasol.cloudetl.bucket.Bucket.fileSystem$lzycompute(Bucket.scala:66)
com.exasol.cloudetl.bucket.Bucket.fileSystem(Bucket.scala:65)
com.exasol.cloudetl.bucket.Bucket.getPaths(Bucket.scala:75)
com.exasol.cloudetl.scriptclasses.ImportMetadata$.run(ImportMetadata.scala:22)
com.exasol.cloudetl.scriptclasses.ImportMetadata.run(ImportMetadata.scala)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:566)
com.exasol.ExaWrapper.run(ExaWrapper.java:186) (Session: xx)

Using:

BUCKET_PATH = 'abfs://<AZURE_CONTAINER_NAME>@<AZURE_ACCOUNT_NAME>.dfs.core.windows.net/BlobPath/import/orc/*'

The import/export runs with no error but doesn't load any data.

For security reasons I am not publishing my <AZURE_CONTAINER_NAME> and <AZURE_ACCOUNT_NAME>.

Any suggestions?

Many thanks,

Diana Gomes

Introduce `batchSize` parameter on export

Situation

Recently we found out that when exporting a large Exasol table with many (billions of) rows to Parquet format, the memory usage rises quickly. Additionally, the Parquet writer writes to disk after some threshold, which is a problem since each VM has limited disk space.

These issues are the result of not closing the Parquet writer file quickly enough.

Solution

  • Introduce a batchSize parameter provided by users (assuming they know their Exasol table sizes better).
  • Once the current record count reaches that batch size, close the writer and start a new file (see the sketch below).
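
A sketch of the proposed usage; the parameter name BATCH_SIZE is an assumption and may differ in the final implementation:

EXPORT RETAIL.SALES_POSITIONS
INTO SCRIPT ETL.EXPORT_PATH WITH
  BUCKET_PATH   = 's3a://my-bucket/parquet/export/sales_positions/'
  DATA_FORMAT   = 'PARQUET'
  -- Proposed parameter: close the Parquet writer and start a new file
  -- after this many records; the name BATCH_SIZE is an assumption.
  BATCH_SIZE    = '100000'
  S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
  S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
  S3_ENDPOINT   = 's3.MY_REGION.amazonaws.com'
  PARALLELISM   = 'iproc(), floor(random()*4)';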

Downside

  • This can introduce smaller files, because the final file will contain fewer than batchSize records.

Add Avro format export support

As a user of cloud-storage-etl-udfs,
in addition to the Parquet format, I want to export my Exasol tables into Avro format.

java.lang.IllegalStateException when trying to download Parquet file

When trying to pull parquet data from S3, I get the following error:

[22002] VM error:
java.lang.IllegalStateException: Socket not created by this factory
Stack trace:
org.apache.http.util.Asserts.check(Asserts.java:34)
org.apache.http.conn.ssl.SSLSocketFactory.isSecure(SSLSocketFactory.java:435)
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:186)
org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:326)
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:835)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:728)
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:276)
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:236)
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
com.exasol.cloudetl.bucket.Bucket.fs$lzycompute(Bucket.scala:21)
com.exasol.cloudetl.bucket.Bucket.fs(Bucket.scala:20)
com.exasol.cloudetl.bucket.Bucket.getPaths(Bucket.scala:24)
com.exasol.cloudetl.scriptclasses.ImportMetadata$.run(ImportMetadata.scala:24)
com.exasol.cloudetl.scriptclasses.ImportMetadata.run(ImportMetadata.scala)
(Session: 1619744297144393908)

Improve orc data import

Problem

The import process occasionally throws java.lang.OutOfMemoryError: Java heap space exceptions if a single VM has to process many large files.

This is related to issue #27.

Example error:

Import of all ORC files (47 GB, Snappy compressed):

IMPORT INTO STAGING_AREA.EMPLOYEE
FROM SCRIPT ETL.IMPORT_PATH WITH
BUCKET_PATH = 'adl://xyz.azuredatalakestore.net/projects/exasol_benchmark/employee_orc/*'
DATA_FORMAT = 'ORC'
AZURE_CLIENT_ID = ''
AZURE_CLIENT_SECRET = ''
AZURE_DIRECTORY_ID = ''
PARALLELISM = 'nproc()';

[22002] VM error: 
java.lang.OutOfMemoryError: Java heap space
Stack trace:
org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:558)
org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:278)
org.apache.orc.impl.RecordReaderImpl.readAllDataStreams(RecordReaderImpl.java:1107)
org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1063)
org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1216)
org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1251)
org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:276)
org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:639)
com.exasol.cloudetl.orc.OrcRowIterator$$anon$1.<init>(OrcRowIterator.scala:18)
com.exasol.cloudetl.orc.OrcRowIterator$.apply(OrcRowIterator.scala:16)
com.exasol.cloudetl.source.OrcSource.$anonfun$stream$1(OrcSource.scala:35)
com.exasol.cloudetl.source.OrcSource$$Lambda$24/73181251.apply(Unknown Source)
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
scala.collection.TraversableLike$$Lambda$7/942518407.apply(Unknown Source)
scala.collection.immutable.List.foreach(List.scala:392)
scala.collection.TraversableLike.map(TraversableLike.scala:237)
scala.collection.TraversableLike.map$(TraversableLike.scala:230)
scala.collection.immutable.List.map(List.scala:298)
com.exasol.cloudetl.source.OrcSource.stream(OrcSource.scala:27)
com.exasol.cloudetl.scriptclasses.ImportFiles$.readAndEmit(ImportFiles.scala:52)
com.exasol.cloudetl.scriptclasses.ImportFiles$.run(ImportFiles.scala:26)
com.exasol.cloudetl.scriptclasses.ImportFiles.run(ImportFiles.scala)
(Session: 1635041137845565220)

With %jvmoption -Xmx1024m;

[22002] VM error: 
java.lang.OutOfMemoryError: Java heap space
Stack trace:
org.apache.orc.impl.RecordReaderUtils.readDiskRanges(RecordReaderUtils.java:558)
org.apache.orc.impl.RecordReaderUtils$DefaultDataReader.readFileData(RecordReaderUtils.java:278)
org.apache.orc.impl.RecordReaderImpl.readAllDataStreams(RecordReaderImpl.java:1107)
org.apache.orc.impl.RecordReaderImpl.readStripe(RecordReaderImpl.java:1063)
org.apache.orc.impl.RecordReaderImpl.advanceStripe(RecordReaderImpl.java:1216)
org.apache.orc.impl.RecordReaderImpl.advanceToNextRow(RecordReaderImpl.java:1251)
org.apache.orc.impl.RecordReaderImpl.<init>(RecordReaderImpl.java:276)
org.apache.orc.impl.ReaderImpl.rows(ReaderImpl.java:639)
com.exasol.cloudetl.orc.OrcRowIterator$$anon$1.<init>(OrcRowIterator.scala:18)
com.exasol.cloudetl.orc.OrcRowIterator$.apply(OrcRowIterator.scala:16)
com.exasol.cloudetl.source.OrcSource.$anonfun$stream$1(OrcSource.scala:35)
com.exasol.cloudetl.source.OrcSource$$Lambda$24/73181251.apply(Unknown Source)
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
scala.collection.TraversableLike$$Lambda$7/942518407.apply(Unknown Source)
scala.collection.immutable.List.foreach(List.scala:392)
scala.collection.TraversableLike.map(TraversableLike.scala:237)
scala.collection.TraversableLike.map$(TraversableLike.scala:230)
scala.collection.immutable.List.map(List.scala:298)
com.exasol.cloudetl.source.OrcSource.stream(OrcSource.scala:27)
com.exasol.cloudetl.scriptclasses.ImportFiles$.readAndEmit(ImportFiles.scala:52)
com.exasol.cloudetl.scriptclasses.ImportFiles$.run(ImportFiles.scala:26)
com.exasol.cloudetl.scriptclasses.ImportFiles.run(ImportFiles.scala)

Solution

Improve the import process: check that all readers are closed and released after a file's data is emitted to Exasol.

Refactor style and linting checks

Currently, several Scalastyle checking and linting plugins are used with strict configurations. For example, some of the error checks are:

  • using var
  • using null
  • using mutable data structures
  • and others

Even though these are good code conventions, we usually use the Java APIs and are required to break these rules. As a result, the use of @SuppressWarnings clutters the codebase.

Therefore, disable the most commonly suppressed checks, such as var, null or mutable structures.

Complex types in Parquet storage format

Exasol does not support complex types such as Map, Struct or Array.

Therefore, it is important to decide how to support complex types in the Parquet or Avro formats.

One possible option is to convert them into a JSON string and store them as a VARCHAR in the Exasol table.
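
A sketch of that option with a hypothetical table: the nested Parquet column would arrive as a JSON string and land in a VARCHAR column.

-- Hypothetical target table: ATTRIBUTES holds the JSON-encoded
-- Map/Struct/Array value from the Parquet file.
CREATE TABLE RETAIL.PRODUCTS (
  PRODUCT_ID DECIMAL(18,0),
  ATTRIBUTES VARCHAR(2000000)
);

IMPORT INTO RETAIL.PRODUCTS
FROM SCRIPT ETL.IMPORT_PATH WITH
  BUCKET_PATH   = 's3a://my-bucket/parquet/products/*'
  DATA_FORMAT   = 'PARQUET'
  S3_ENDPOINT   = 's3.MY_REGION.amazonaws.com'
  PARALLELISM   = 'nproc()';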

Import s3 paths longer than 200 characters

At the moment, it is not possible to import (and potentially not to export either, but this was not tested) S3 paths longer than 200 characters.

The process returns the error below:

java.lang.Exception: Error: [22002] VM error: data exception - string data, right truncation: max length: 200, emitted: 222 

Example:

IMPORT INTO myschema.mytable
FROM SCRIPT ETL.IMPORT_PATH WITH
BUCKET_PATH    = 's3a://my-very-long-bucket/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa-my-very-long-path-bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb/cccccccccccccccccccccccccccccc/with-subprojects/mytable/year=2020/month=04/day=02'
DATA_FORMAT    = 'PARQUET'
CONNECTION_NAME = 'MYCONNECTION'
S3_ENDPOINT    = 's3-eu-west-1.amazonaws.com'
PARALLELISM    = 'nproc()';

We have tried to reduce the length of the path, but sometimes due to project hierarchy or dependencies on other teams this is not possible.

Kindly lift this limitation if possible.

Thanks and regards,
Jorge

ORC import: unsupported type scala.math.BigDecimal

When importing ORC decimal types, it throws an exception like the one below:
[Code: 0, SQL State: 22002] VM error:
com.exasol.ExaDataTypeException: emit column MY_COLUMN_NAME' is of unsupported type scala.math.BigDecimal
Stack trace:
com.exasol.ExaIteratorImpl.emit(ExaIteratorImpl.java:187)
com.exasol.cloudetl.scriptclasses.ImportFiles$.$anonfun$readAndEmit$2(ImportFiles.scala:56)
com.exasol.cloudetl.scriptclasses.ImportFiles$.$anonfun$readAndEmit$2$adapted(ImportFiles.scala:54)
scala.collection.Iterator.foreach(Iterator.scala:941)
scala.collection.Iterator.foreach$(Iterator.scala:941)
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
com.exasol.cloudetl.scriptclasses.ImportFiles$.$anonfun$readAndEmit$1(ImportFiles.scala:54)
com.exasol.cloudetl.scriptclasses.ImportFiles$.$anonfun$readAndEmit$1$adapted(ImportFiles.scala:53)
scala.collection.immutable.List.foreach(List.scala:392)
com.exasol.cloudetl.scriptclasses.ImportFiles$.readAndEmit(ImportFiles.scala:53)
com.exasol.cloudetl.scriptclasses.ImportFiles$.run(ImportFiles.scala:27)
com.exasol.cloudetl.scriptclasses.ImportFiles.run(ImportFiles.scala)
(Session: 1652408401446920240)

Expose progress information of IMPORT queries

Currently, IMPORT queries are atomic; we don't get any feedback until they are done.
Please provide and document ways to programmatically monitor IMPORT queries.

Related to #61.

Rationale

IMPORT queries can run for quite a while. We're building a data pipeline with an Exasol instance as one target, and we've seen >1 TB of data (in ~25k Parquet files on S3) that took ~1.5 days to import.

Unfortunately, we don't seem to get any feedback on how far along the query is. In terms of observability and monitoring, that's a big issue. We're currently in the process of migrating to importing small batches of files one after another, which I understand is not best practice with Exasol (?).

I understand there are logs on the Exasol cluster, but we cannot access them (in production); we are purely users of Exasol, not admins. Therefore, we cannot access those logs. (Or can we?)
