
dsbulk's Introduction

DataStax Bulk Loader Overview

The DataStax Bulk Loader tool (DSBulk) is a unified tool for loading into and unloading from Cassandra-compatible storage engines, such as OSS Apache Cassandra®, DataStax Astra and DataStax Enterprise (DSE).

Out of the box, DSBulk provides the ability to:

  1. Load (import) large amounts of data into the database efficiently and reliably;
  2. Unload (export) large amounts of data from the database efficiently and reliably;
  3. Count elements in a database table: how many rows in total, how many rows per replica and per token range, and how many rows in the top N largest partitions.

Currently, CSV and JSON formats are supported for both loading and unloading data.

Installation

DSBulk can be downloaded from several locations:

  • From DataStax Downloads.
    • Available formats: zip, tar.gz.
  • From GitHub.
    • Available formats: zip, tar.gz and executable jar.
  • From Maven Central: download the artifact dsbulk-distribution, for example from here.
    • Available formats: zip, tar.gz and executable jar.

Please note: only the zip and tar.gz formats are considered production-ready. The executable jar is provided as a convenience for users that want to try DSBulk, but it should not be deployed in production environments.

To install DSBulk, simply unpack the zip or tar.gz archives.

The executable jar can be executed with a command like java -jar dsbulk-distribution.jar [subcommand] [options]. See below for command line options.
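
For example, a load run through the executable jar could look like the following, using shortcut options described later in this document (the jar file name and paths are illustrative; use the actual name of the jar you downloaded):

java -jar dsbulk-distribution.jar load -k ks1 -t table1 -url export.csv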

Documentation

The most up-to-date documentation is available online.

We also recommend reading the series of blog posts made by Brian Hess; they target a somewhat older version of DSBulk, but most of the contents are still valid and very useful:

  1. DataStax Bulk Loader Pt. 1 — Introduction and Loading
  2. DataStax Bulk Loader Pt. 2 — More Loading
  3. DataStax Bulk Loader Pt. 3 — Common Settings
  4. DataStax Bulk Loader Pt. 4 — Unloading
  5. DataStax Bulk Loader Pt. 5 — Counting
  6. DataStax Bulk Loader: Examples for Loading From Other Locations

Developers and contributors: please read our Contribution Guidelines.

Basic Usage

Launch the tool with the appropriate script in the bin directory of your distribution. The help text of the tool provides summaries of all supported settings.

The dsbulk command takes a subcommand argument followed by options:

# Load data
dsbulk load <options>

# Unload data
dsbulk unload <options>

# Count rows
dsbulk count <options>

Long options

Any DSBulk or Java Driver setting can be entered on the command line as a long-option argument of the following general form:

--full.path.of.setting "some-value"

DSBulk settings always start with dsbulk; for convenience, this prefix can be omitted in a long option argument, so the following two options are equivalent and both map to DSBulk's dsbulk.batch.mode setting:

--dsbulk.batch.mode PARTITION_KEY
--batch.mode PARTITION_KEY

Java Driver settings always start with datastax-java-driver; for convenience, this prefix can be shortened to driver in a long option argument, so the following two options are equivalent and both map to the driver's datastax-java-driver.basic.cloud.secure-connect-bundle setting:

--datastax-java-driver.basic.cloud.secure-connect-bundle /path/to/bundle
--driver.basic.cloud.secure-connect-bundle /path/to/bundle

Most settings have default values, or values that can be inferred from the input data. However, sometimes the default value is not suitable for you, in which case you will have to specify the desired value either in the application configuration file (see below), or on the command line.

For example, the default value for connector.csv.url is to read from standard input or write to standard output; if that does not work for you, you need to override this value and specify the source path/URL of the CSV data to load (or the path/URL where unloaded data should be written).
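
For instance, to read from a local file instead of standard input, the long option can be passed on the command line (the path is illustrative):

dsbulk load -k ks1 -t table1 --connector.csv.url /path/to/export.csv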

See the Settings page or DSBulk's template configuration file for details.

Short options (Shortcuts)

For convenience, many options (prefaced with --) have shortcut variants (prefaced with -). For example, --dsbulk.schema.keyspace has an equivalent short option -k.

Connector-specific options also have shortcut variants, but they are only available when the corresponding connector is chosen. This allows different connectors to reuse the same shortcut names. For example, the JSON connector has a --connector.json.url setting with a -url shortcut; this overlaps with the -url shortcut for the CSV connector, which maps to --connector.csv.url. In a given invocation of dsbulk, only the shortcut of the selected connector is active.

Run the tool with --help and specify the connector to see its short options:

dsbulk -c csv --help

Configuration Files vs Command Line Options

All DSBulk options can be passed as command line arguments, or in a configuration file.

Using one or more configuration files is sometimes easier than passing all configuration options via the command line.

By default, the configuration files are located under DSBulk's conf directory; the main configuration file is named application.conf. This location can be modified via the -f switch. See examples below.

DSBulk ships with a default, empty application.conf file that users can customize to their needs; it also has a template configuration file that can serve as a starting point for further customization.

Configuration files must be compliant with the HOCON syntax. This syntax is very flexible and allows settings to be grouped together in blocks, e.g.:

dsbulk {
  connector {
    name = "csv"
    csv {
      url = "C:\\Users\\My Folder"
      delimiter = "\t"
    }
  }
}

The above is equivalent to the following snippet using dotted notation instead of blocks:

dsbulk.connector.name = "csv"
dsbulk.connector.csv.url = "C:\\Users\\My Folder"
dsbulk.connector.csv.delimiter = "\t"

You can split your configuration in more than one file using file inclusions; see the HOCON documentation for details. The default configuration file includes another file called driver.conf, also located in the conf directory. This file should be used to configure the Java Driver for DSBulk. This file is empty as well; users can customize it to their needs. A driver template configuration file can serve as a starting point for further customization.

Important caveats:

  1. In configuration files, it is not possible to omit the dsbulk prefix. For example, to select the connector to use in a configuration file, use dsbulk.connector.name = csv, as in the example above; on the command line, however, both --dsbulk.connector.name csv and --connector.name csv achieve the same effect.
  2. In configuration files, it is not possible to abbreviate the prefix datastax-java-driver to driver. For example, to select the consistency level, use datastax-java-driver.basic.request.consistency = QUORUM in a configuration file; on the command line, however, you can use either --datastax-java-driver.basic.request.consistency QUORUM or --driver.basic.request.consistency QUORUM to achieve the same effect.
  3. Options specified through the command line override options specified in configuration files. See examples for details.
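
For instance, a minimal sketch of the override behavior described in point 3 (the file contents and values are illustrative):

# conf/application.conf contains: dsbulk.connector.csv.delimiter = ","
# The command line value below takes precedence, so the tab character is used as the delimiter:
dsbulk load -k ks1 -t table1 -url export.csv -delim '\t'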

Escaping and Quoting Command Line Arguments

Regardless of whether they are supplied via the command line or in a configuration file, all option values are expected to be in valid HOCON syntax: control characters, the backslash character, and the double-quote character all need to be properly escaped.

For example, \t is the escape sequence that corresponds to the tab character:

dsbulk load -delim '\t'

In general, string values containing special characters (such as a colon or a whitespace) also need to be properly quoted with double-quotes, as required by the HOCON syntax:

dsbulk load -h '"host.com:9042"'

File paths on Windows systems usually contain backslashes; \\ is the escape sequence for the backslash character, and since Windows paths also contain special characters, the whole path needs to be double-quoted:

dsbulk load -url '"C:\\Users\\My Folder"'

However, when the expected type of an option is a string, it is possible to omit the surrounding double-quotes, for convenience:

dsbulk load -url 'C:\\Users\\My Folder'

Similarly, when an argument is a list, it is possible to omit the surrounding square brackets, making the following two lines equivalent:

dsbulk load --codec.nullStrings 'NIL, NULL'
dsbulk load --codec.nullStrings '[NIL, NULL]'

The same applies for arguments of type map: it is possible to omit the surrounding curly braces, making the following two lines equivalent:

dsbulk load --connector.json.deserializationFeatures '{ USE_BIG_DECIMAL_FOR_FLOATS : true }'
dsbulk load --connector.json.deserializationFeatures 'USE_BIG_DECIMAL_FOR_FLOATS : true'

This syntactic sugar is only available for command line arguments of type string, list or map; all other option types, as well as all options specified in a configuration file, must be fully compliant with HOCON syntax, and it is the user's responsibility to ensure that such options are properly escaped and quoted.

Also, note that this syntactic sugar cannot quote individual elements inside a list or a map; all elements in a list or a map must be individually quoted if they contain special characters. For example, to specify a list with 2 contact points containing port numbers, each contact point must be quoted individually; the surrounding brackets, however, can be omitted for brevity, as explained above:

dsbulk load -h '"host1.com:9042","host2.com:9042"'

Load Examples

  • Load table table1 in keyspace ks1 from CSV data read from stdin. Use a cluster with a localhost contact point. Field names in the data match column names in the table. Field names are obtained from a header row in the data; by default the tool presumes a header exists in each file being loaded:

    dsbulk load -k ks1 -t table1

  • Load table table1 in keyspace ks1 from a gzipped CSV file by unzipping it to stdout and piping to stdin of the tool:

    gzcat table1.csv.gz | dsbulk load -k ks1 -t table1

  • Load the file export.csv to table table1 in keyspace ks1 using the short form option for url and the tab character as a field delimiter:

    dsbulk load -k ks1 -t table1 -url export.csv -delim '\t'

  • Specify a few hosts (initial contact points) that belong to the desired cluster and load from a local file, without headers. Map field indices of the input to table columns:

    dsbulk load -url ~/export.csv -k ks1 -t table1 -h '10.200.1.3,10.200.1.4' -header false -m '0=col1,1=col3'

  • Specify port 9876 for the cluster hosts and load from an external source url:

    dsbulk load -url https://192.168.1.100/data/export.csv -k ks1 -t table1 -h '10.200.1.3,10.200.1.4' -port 9876

  • Load all csv files from a directory. The files do not have header rows. Map field indices of the input to table columns:

    dsbulk load -url ~/export-dir -k ks1 -t table1 -m '0=col1,1=col3' -header false

  • Load a file containing three fields per row. The file has no header row. Map all fields to table columns in field order. Note that field indices need not be provided.

    dsbulk load -url ~/export-dir -k ks1 -t table1 -m 'col1, col2, col3' -header false

  • With default port for cluster hosts, keyspace, table, and mapping set in conf/application.conf:

    dsbulk load -url https://192.168.1.100/data/export.csv -h '10.200.1.3,10.200.1.4'

  • With default port for cluster hosts, keyspace, table, and mapping set in dsbulk_load.conf:

    dsbulk load -f dsbulk_load.conf -url https://192.168.1.100/data/export.csv -h '10.200.1.3,10.200.1.4'

  • Load table table1 in keyspace ks1 from a CSV file, where double-quote characters in fields are escaped with a double-quote; for example, "f1","value with ""quotes"" and more" is a line in the CSV file:

    dsbulk load -url ~/export.csv -k ks1 -t table1 -escape '\"'

  • Load table table1 in keyspace ks1 from an AWS S3 URL, passing the region and profile as query parameters in the URL:

    dsbulk load -k ks1 -t table1 -url 's3://bucket-name/key?region=us-west-1&profile=bucket-profile'

Unload Examples

Unloading is simply the inverse of loading; due to this symmetry, many settings apply to both load and unload.
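
For example, the -delim shortcut shown in the load examples above works for unload as well (a sketch; the path and delimiter are illustrative):

dsbulk unload -k ks1 -t table1 -url ~/data-export -delim '\t'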

  • Unload data to stdout from the ks1.table1 table in a cluster with a localhost contact point. Column names in the table map to field names in the data. Field names must be emitted in a header row in the output:

    dsbulk unload -k ks1 -t table1

  • Unload data to stdout from the ks1.table1 table and gzip the result:

    dsbulk unload -k ks1 -t table1 | gzip > table1.gz

  • Unload data to a local directory (which may not yet exist):

    dsbulk unload -url ~/data-export -k ks1 -t table1

Count Examples

When counting rows in a table, no connector is required, and schema.mapping should not be present.

  • Count the total rows in the ks1.table1 table in a cluster with a localhost contact point.

    dsbulk count -k ks1 -t table1

  • Count the total number of rows per token range in the ks1.table1 table in a cluster with a localhost contact point.

    dsbulk count -k ks1 -t table1 -stats ranges

  • Count the total number of rows per host in the ks1.table1 table in a cluster with a localhost contact point.

    dsbulk count -k ks1 -t table1 -stats hosts

  • Count the total number of rows for the biggest 100 partitions in the ks1.table1 table in a cluster with a localhost contact point (by default, DSBulk returns the number of rows for the 10 biggest partitions in the table).

    dsbulk count -k ks1 -t table1 -stats partitions -partitions 100

  • Count the total number of rows, the number of rows per token range, the number of rows per host, and the number of rows in the 10 biggest partitions of the ks1.table1 table, in a cluster with a localhost contact point.

    dsbulk count -k ks1 -t table1 -stats global,ranges,hosts,partitions

Command-line Help

Available settings, along with their defaults, are documented here; they are also documented in DSBulk's template configuration file and in the driver template configuration file. This information is also available on the command line via the help subcommand.

  • Get help for common options and a list of sections from which more help is available:

    dsbulk help

  • Get help for all connector.csv options:

    dsbulk help connector.csv

License

Copyright DataStax, Inc.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

dsbulk's People

Contributors

absurdfarce, adutra, alexott, davidtaylorcengage, dependabot[bot], gregbestland, msmygit, pedjak, smatvienko-tb, stamhankar999, tolbertam, tomekl007, weideng1


dsbulk's Issues

Split timestamp into date and hour

Hello,
I need to load into cassandra a csv file with a date string yyyyMMddHHmmss and I need to split this string into two separate columns:
1)datetest : yyyyMMddHHmmss
2)partition_id : yyyyMMddHH
Example:
(input csv)
aaaa|bbbb|20210531041034

(table cassandra)
table test(A text, B text, datetest text, partition_id text)
--->('aaaa','bbbb,20210531041034,2021053104)

Is it possible with dsbulk?
Thanks
Graziella


Add support for Prometheus

DSBulk is already well instrumented, but it only exports its metrics to JMX and CSV files.

It would be good if it could export metrics to Prometheus as well.

I am not thinking about replacing Dropwizard with another metrics framework; I am only thinking about exposing the existing Dropwizard metrics to Prometheus. It turns out there is a Prometheus exporter for Dropwizard that will take care of all the heavy lifting.

Also we would need to think about how to expose the metrics. The usual pull model (scraping) is not always suitable for batch jobs like DSBulk, especially short-lived operations. Prometheus also offers a push-based model based on a PushGateway.

According to this doc, batch jobs should have a few important metrics pushed to a PushGateway (time elapsed, success/failure, etc.), but it is useful to also scrape them using pull-based monitoring. I propose that we offer both approaches and let the user decide which ones to enable.


Implement application-level retries

The transition from driver 3.x to driver 4.x a while ago in DSBulk 1.5.0 brought one unexpected consequence: client-side timeouts are now global to the whole statement execution. The driver docs say:

Unlike 3.x, the request timeout now spans the entire request. In other words, it's the maximum amount of time that session.execute will take, including any retry, speculative execution, etc.

What the docs don't say is that because the timeout is global to the session.execute call, timeouts are not retried anymore. The solution to this problem is now to use speculative executions.

Users of DSBulk can use speculative executions if they wish (they are disabled by default). But because speculative executions are hard to tune, and also because they don't work with rate limiters (see #447), I think it would be nice to implement a form of application-level retry when the statement execution fails.

Thanks to the Reactor framework, such a feature could certainly be implemented very easily, using the retryWhen operator, e.g. in LoadWorkflow:

private Flux<WriteResult> executeStatements(Flux<? extends Statement<?>> stmts) {
  Retry spec = new Retry() {

    private Duration delay;
    private long maxRetries = 3;

    @Override
    public Publisher<Boolean> generateCompanion(Flux<RetrySignal> retrySignals) {
      return retrySignals.flatMap(
          signal -> {
            Mono<Boolean> retryDecision;
            if (signal.totalRetries() < maxRetries && signal.failure() instanceof DriverTimeoutException) {
              retryDecision = Mono.just(true);
              if (delay != null) {
                retryDecision = retryDecision.delayElement(delay);
              }
            } else {
              retryDecision = Mono.error(signal.failure());
            }
            return retryDecision;
          });
    }
  };
  return dryRun
      ? stmts.map(EmptyWriteResult::new)
      : stmts.flatMap(statement -> Flux.from(executor.writeReactive(statement)).retryWhen(spec), writeConcurrency);
}


DSBulk's rate limiter is not compatible with speculative executions

By default, both rate limiting and speculative executions are disabled in DSBulk.
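
For context, a hedged sketch of an invocation that enables both features at once (the rate, policy class and delay are illustrative; the driver option paths follow the datastax-java-driver prefix convention described in the Long options section above):

dsbulk load -k ks1 -t table1 -url export.csv \
  --executor.maxPerSecond 10000 \
  --driver.advanced.speculative-execution-policy.class ConstantSpeculativeExecutionPolicy \
  --driver.advanced.speculative-execution-policy.max-executions 2 \
  --driver.advanced.speculative-execution-policy.delay 500ms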

If both are enabled, we observed that, when writing and from the server's perspective, the rate limit is not honored.

This is because rate limit permits are acquired per row written. More specifically, each call to session.executeAsync() will need to acquire permits.

If the request is retried internally by the driver, that's fine, because a retry request is only sent when the initial request has finished, so the invariant acquired <= available is respected and the server never sees more than available concurrent requests.

However, if we enable speculative executions, the driver may trigger a speculative request while the initial request is still in-flight. This will be done without acquiring more permits, since only one call to executeAsync was done. From the client's perspective, the invariant acquired <= available looks respected (we are writing one row only, but with 2 requests), but from the server's perspective, 2 requests were received and the server may find itself processing more than available concurrent requests.

The immediate consequence of such a setup is that Astra starts returning OverloadedExceptions even after #435 was implemented.

I don't have any immediate solution for that. I think we would need to change how permits are acquired for writes: each internal request that the driver sends needs to acquire permits, not only the initial one.

We can achieve this in a few ways – but all of them involve extending driver classes:

  • Move Guava's RateLimiter to CqlRequestHandler, and acquire the permits each time a message is written to the Netty channel, see here.
  • Use the driver's built-in RateLimitingRequestThrottler. But we'll need to improve this mechanism:
    • The RequestThrottler interface will need to access the statement being executed, in order to compute the number of permits;
    • The throttler is currently not invoked for speculative executions anyways. This is probably a bug btw.

I will open driver Jiras for improving the throttling mechanism. But even so I'm reluctant to try the above changes:

  • Moving Guava's RateLimiter inside CqlRequestHandler means that we are calling a blocking operation in a driver IO thread. This is considered bad practice and could have undesired consequences.
  • Using the driver's built-in RateLimitingRequestThrottler would avoid blocking operations, but it uses instead an internal queue to park requests. When the queue is full, it throws an error. This might also be undesirable for DSBulk.

Note: I think that reads are not a problem. For reads, permits are acquired per row emitted, after the results page has been received. So speculative executions won't pose any problem here.

Note2: it might be simpler to just give up on rate limit + speculative execs and document this limitation.

Note3: to mitigate this, we could look into something simpler: implementing application-level retries when a write request ends with a DriverTimeoutException. I will create a separate issue for that.


DSBulk load fails when the primary key column is empty

Hello Team,

I am trying to load a data file into the Cassandra cluster and am running into an error if one of the primary key columns is empty. Is there any option/way to safely insert these entries? I read in the documentation that it's not possible, but can we somehow fill these values with a null value during the unload operation itself, so the load can happen?

--schema.allowMissingFields
--dsbulk.schema.allowMissingFields <boolean>
Specify whether or not to accept records that are missing fields declared in the mapping. For example, if the mapping declares three fields A, B, and C, but a record contains only fields A and B, then if this option is true, C will be silently assigned null and the record will be considered valid, and if false, the record will be rejected. If the missing field is mapped to a primary key column, the record will always be rejected, since the database will reject the record. This setting also applies to user-defined types and tuples. Only applicable for loading, ignored otherwise.

This setting is ignored when counting.

Default: false.


I can easily achieve this with the CQLSH utility

COPY keyspace.table to '/tmp/table.txt' WITH NULL = 'null'; --> export
COPY newkeyspace.table FROM '/tmp/table.txt' WITH NULL = 'null'; --> import

I am using DSBulk version 1.7.0, and below is the sample data format:

id,scope,name,tags,created_by,ver
abc123de,"",key,"{\"key\":\"value\"}",,b7258e80-5518-11eb-b06e-7d24a9bd009c

Primary Key: (id, scope)

Please let me know if you need more information.


Properly close files when unload is interrupted

This bug was discovered while working on #432: when an unload operation is interrupted, connector files might not be properly flushed and closed.

This is due to a flaw in AbstractFileBasedConnector#close: the code iterates over the pool of writers and closes all writers in it.

However since this is a pool, it could be empty, or only contain a subset of all writers, because the operation may be interrupted while some writers are being borrowed and therefore are not in the pool.


load csv with date field

I need to upload a CSV file which contains a date field in yyyyMMddHHmmss format, for example 20210515031117.
The field will be mapped to a Cassandra table column of type timestamp.
I ran the dsbulk command adding the option --codec.date 'yyyyMMddHHmmss', but it doesn't work.

This is the error:
com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException: Could not map field call_date to variable call_date; conversion from Java type java.lang.String to CQL type TIMESTAMP failed for raw value: 20210515031117.
at com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException.encodeFailed(InvalidMappingException.java:96)
at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.bindColumn(DefaultRecordMapper.java:164)
at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.map(DefaultRecordMapper.java:133)
at java.lang.Thread.run(Thread.java:748) [19 skipped]
Caused by: java.time.format.DateTimeParseException: Text '20210515031117' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1819)
at com.datastax.oss.dsbulk.codecs.api.format.temporal.SimpleTemporalFormat.parse(SimpleTemporalFormat.java:48)
at com.datastax.oss.dsbulk.codecs.api.format.temporal.ZonedTemporalFormat.parse(ZonedTemporalFormat.java:52)
at com.datastax.oss.dsbulk.codecs.text.string.StringToTemporalCodec.parseTemporalAccessor(StringToTemporalCodec.java:46)
at com.datastax.oss.dsbulk.codecs.text.string.StringToInstantCodec.externalToInternal(StringToInstantCodec.java:44)
at com.datastax.oss.dsbulk.codecs.text.string.StringToInstantCodec.externalToInternal(StringToInstantCodec.java:27)
at com.datastax.oss.dsbulk.codecs.api.ConvertingCodec.encode(ConvertingCodec.java:70)
at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.bindColumn(DefaultRecordMapper.java:162)
at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.map(DefaultRecordMapper.java:133)

What did I do wrong?
Thanks


Add support for literals in mappings

DSBulk currently doesn't allow literals in mappings outside of function args. We could extend support for literals to allow things like:

fieldA=col1,fieldB=col2,'dsbulk'=created_by


Add ability to unwrap BATCH queries

A BATCH query is usually required in order to load a record containing TTL and TIMESTAMP information for individual columns. This is exactly what happens when the load operation is configured with -ttl and/or -timestamp.
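
For illustration, such a load might be invoked as follows, assuming the -ttl and -timestamp shortcuts shown in the unload command quoted in another issue in this document also apply to load (the keyspace, table and path are illustrative):

dsbulk load -k ks1 -t table1 -url export.csv -timestamp true -ttl true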

When the query used for loading is a BATCH query, currently DSBulk 1.8 disables its own batching mechanism, to avoid nesting BATCH statements inside protocol-level BATCH messages, which is forbidden.

But then we could face a performance hit, especially if the batches contain just a few children. It would be much better if DSBulk could "unwrap" the original batch queries, prepare the child statements individually, and then use its batching mechanism to group together all resulting child statements in a (potentially much bigger) BATCH message.

I still think that, for simplicity's sake, and from the user's perspective, DSBulk should continue to behave as if 1 record = 1 statement. I am only suggesting that, internally, for performance's sake, DSBulk could unwrap the provided BATCH query and treat its children individually, thus effectively behaving as if 1 record = N statements.

We might want to introduce a few safeguards:

  • Only UNLOGGED batch queries should be unwrapped and rebatched.
  • Batch-level USING clauses should prevent rebatching (as the clause applies to all children).

If rebatching is undesirable for any reason, the user can simply set batch.mode to DISABLED.

I could come up with an implementation fairly quickly as I have a good design in mind. However this would require a round of performance benchmark to make sure we aren't introducing a performance regression.


Unload failed after 10 hours normal flight, 4.8B from 6.3B uploaded: Unable to perform authorization of permissions: Unable to perform authorization of super-user permission: Operation timed out - received only 0 responses

Hi guys!

dsbulk is awesome. Thank you for your effort.
My unload failed after 10 hours of normal flight:

Unable to perform authorization of permissions: Unable to perform authorization of super-user permission: Operation timed out - received only 0 responses


I found the suggestion to fix the replication factor for system_auth. But it does not help; the issue is still in place.

ALTER KEYSPACE system_auth WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
for i in {0..9}; do oc rsh cassandra-helm-${i} nodetool repair -full -- system_auth ; done

My question is: can I force dsbulk to retry this kind of failure? Can I resume a failed upload from the latest token? Any other suggestions?

dsbulk 1.8.0
The upload command was:

dsbulk unload -timestamp true -ttl true -cl LOCAL_QUORUM -maxConcurrentQueries 5 -maxRecords 200000 -c json -url /dsbulk/data/json -logDir /dsbulk/data/logs --connector.json.compression gzip -k thingsboard -t ts_kv_cf -u ${CASSANDRA_USER} -p ${CASSANDRA_PASSWORD} -h cassandra-helm-headless -maxErrors -1 -verbosity 2 --driver.basic.request.page-size 1000 --driver.advanced.continuous-paging.page-size 1000

unload-errors.log

Statement: com.datastax.oss.driver.internal.core.cql.DefaultBoundStatement@350e8fa3 [2 values, idempotence: , CL: , serial CL: , timestamp: , timeout: ]
SELECT entity_type, entity_id, key, partition, ts, bool_v, writetime(bool_v) AS "writetime(bool_v)", ttl(bool_v) AS "ttl(bool_v)", dbl_v, writetime(dbl_v) AS "writetime(dbl_v)", ttl(dbl_v) AS "ttl(dbl_v)", json_v, writetime(json_v) AS "writetime(json_v)", ttl(json_v) AS "ttl(json_v)", long_v, writetime(long_v) AS "writetime(long_v)", ttl(long_v) AS "ttl(long_v)", str_v, writetime(str_v) AS "writetime(str_v)", ttl(str_v) AS "ttl(str_v)" FROM thingsboard.ts_kv_cf WHERE token(entity_type, entity_id...
start: 2725990092663290223
end: 2748111336664437991
com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException: Statement execution failed: SELECT entity_type, entity_id, key, partition, ts, bool_v, writetime(bool_v) AS "writetime(bool_v)", ttl(bool_v) AS "ttl(bool_v)", dbl_v, writetime(dbl_v) AS "writetime(dbl_v)", ttl(dbl_v) AS "ttl(dbl_v)", json_v, writetime(json_v) AS "writetime(json_v)", ttl(json_v) AS "ttl(json_v)", long_v, writetime(long_v) AS "writetime(long_v)", ttl(long_v) AS "ttl(long_v)", str_v, writetime(str_v) AS "writetime(str_v)", ttl(str_v) AS "ttl(str_v)" FROM thingsboard.ts_kv_cf WHERE token(entity_type, entity_id, key, partition) > :start AND token(entity_type, entity_id, key, partition) <= :end (Unable to perform authorization of permissions: Unable to perform authorization of super-user permission: Operation timed out - received only 0 responses.)
at com.datastax.oss.dsbulk.executor.api.subscription.ResultSubscription.toErrorPage(ResultSubscription.java:534)
at com.datastax.oss.dsbulk.executor.api.subscription.ResultSubscription.lambda$fetchNextPage$1(ResultSubscription.java:372)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.setFinalError(CqlRequestHandler.java:447) [4 skipped]
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.access$700(CqlRequestHandler.java:94)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.processErrorResponse(CqlRequestHandler.java:763)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.onResponse(CqlRequestHandler.java:655)
at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:257)
at java.lang.Thread.run(Thread.java:748) [24 skipped]
Caused by: com.datastax.oss.driver.api.core.servererrors.UnauthorizedException: Unable to perform authorization of permissions: Unable to perform authorization of super-user permission: Operation timed out - received only 0 responses.


a bug when unload to csv

dsbulk may have a bug: when unloading a big table to CSV, some records are broken into two lines, which causes errors when importing into another database.
Please take a look and fix this.
Many thanks,
Ha Nguyen


DSBulk's option for "ansiMode" doesn't work

Tried to utilize the ansiMode options as documented here: https://github.com/datastax/dsbulk/blob/d9107caf9a4a286c06d8e8f089e18d4e798993a2/manual/settings.md#log
--dsbulk.log.ansiMode
--log.ansiMode

However, neither option appears to have any effect on the color scheme that is given as output.

Test #1 running dsbulk in a windows command:
(screenshot: dsbulk output showing ANSI colors despite ansiMode set to disable)

In the image above, we can see that the ansiMode is set to disable, but still we are getting colors in the output.

We have a second test where we are using dsbulk through our application written in C#; when we apply the "force" option we still do not receive any ANSI colors in our logged output. The problem of ANSI colors appearing in some environments but not others is hard to debug, as we cannot control the color scheme when invoking dsbulk commands. Ultimately we would like to use the 'disable' option to turn off ANSI colors completely, so that we don't have to strip the colors in our logic when parsing the numbers from the log.


dsbulk load requires MODIFY permission to load the data

./dsbulk --version
DataStax Bulk Loader v1.9.0

./dsbulk load -url /tmp/unload/requests -k KS -t TABLE -h IP1 -u USERNAME -p PASSWORD --connector.csv.maxCharsPerColumn -1
Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
Operation directory: /tmp/dsbulk-1.9.0/bin/logs/LOAD_20220605-183958-188792
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP3:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP3:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP3:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP3:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP3:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP3:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[driver] /IP3:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
Operation LOAD_20220605-183958-188792 failed: User USERNAME has no MODIFY permission on table KS.TABLE or any of its parents.
com.datastax.oss.driver.api.core.servererrors.UnauthorizedException: User USERNAME has no MODIFY permission on table KS.TABLE or any of its parents
Suppressed: java.lang.Exception: #block terminated with an error
at com.datastax.oss.dsbulk.workflow.load.LoadWorkflow.execute(LoadWorkflow.java:230) [2 skipped]
at com.datastax.oss.dsbulk.runner.WorkflowThread.run(WorkflowThread.java:53)
total | failed | rows/s | p50ms | p99ms | p999ms | batches
2,986 | 0 | 86 | 35.61 | 100.14 | 100.14 | 1.00
Rejected records can be found in the following file(s): load.bad
Errors are detailed in the following file(s): load-errors.log
Last processed positions can be found in positions.txt

load-errors.log
Statement: com.datastax.oss.dsbulk.workflow.commons.statement.MappedBoundStatement@1769d2e1 [33 values, idempotence: , CL: , serial CL: , timestamp: , timeout: ]
Resource: file:/tmp/unload/requests/output-000006.csv
Position: 109
Source: DATA
INSERT INTO KS.TABLE (attributes...) VALUES (...)
com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException: Statement execution failed: INSERT INTO KS.TABLE (attributes...) VALUES (...) (User USERNAME has no MODIFY permission on table KS.TABLE or any of its parents)
at com.datastax.oss.dsbulk.executor.api.subscription.ResultSubscription.toErrorPage(ResultSubscription.java:534)
at com.datastax.oss.dsbulk.executor.api.subscription.ResultSubscription.lambda$fetchNextPage$1(ResultSubscription.java:372)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.setFinalError(CqlRequestHandler.java:450) [4 skipped]
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.access$700(CqlRequestHandler.java:95)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.processErrorResponse(CqlRequestHandler.java:766)
at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler$NodeResponseCallback.onResponse(CqlRequestHandler.java:658)
at com.datastax.oss.driver.internal.core.channel.InFlightHandler.channelRead(InFlightHandler.java:257)
at java.lang.Thread.run(Thread.java:748) [24 skipped]
Caused by: com.datastax.oss.driver.api.core.servererrors.UnauthorizedException: User USERNAME has no MODIFY permission on table KS.TABLE or any of its parents

Not sure why dsbulk load needs the MODIFY permission to load the data.
Note: target Cassandra cluster IP1 has the auth enabled but IP2 and IP3 don't have the auth enabled, so we see the warning messages.


How to prevent "Adjusted frame length exceeds XXX" errors

Hi,

I get some errors from dsbulk unload like this:

com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException: Statement execution failed: SELECT [...statement here...] (Adjusted frame length exceeds 268435456: 333267766 - discarded)

It looks like the query result has exceeded the native_transport_max_frame_size_in_mb limit (256MB)

Is there a dsbulk config setting I can change that will reduce the size of the results from that query (and hopefully prevent that error)?
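
For reference, the driver page-size settings that appear in the unload command quoted in another issue in this document bound how many rows are fetched per response page; a hedged sketch with illustrative values (not a confirmed fix for this error):

dsbulk unload -k ks1 -t table1 -url ~/data-export --driver.basic.request.page-size 1000 --driver.advanced.continuous-paging.page-size 1000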

Thanks


Getting java.lang.OutOfMemoryError: Java heap space exception

I'm trying to load data to Astra and I'm getting an error. I've got around 5000 objects that I unloaded using dsbulk 1.9.0 into a CSV file. Now, when I try to load them into Astra, I'm getting a java.lang.OutOfMemoryError. Some data gets successfully uploaded before the exception happens and I can see it in the database - around 120 items.

What could be the cause?

Here's how I invoke the dsbulk:

dsbulk load -url file.csv -k mykeyspacename -t mytablename -b "./secure-connect-xxx-production.zip" -u xxx -p xxx -header true --connector.csv.maxCharsPerColumn -1

Error:
(screenshot omitted)

Contents of the log:

2022-05-25 16:01:45 INFO  Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
2022-05-25 16:01:45 INFO  A cloud secure connect bundle was provided: ignoring all explicit contact points.
2022-05-25 16:01:45 INFO  A cloud secure connect bundle was provided and selected operation performs writes: changing default consistency level to LOCAL_QUORUM.
2022-05-25 16:01:45 INFO  Operation directory: C:\Users\Administrator\Desktop\AstraMigration\dsbulk-1.9.0\bin\logs\LOAD_20220525-160145-169000
2022-05-25 16:01:58 ERROR Operation LOAD_20220525-160145-169000 failed unexpectedly: Java heap space.
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOfRange(Unknown Source)
	at java.lang.String.<init>(Unknown Source)
	at com.univocity.parsers.common.input.DefaultCharAppender.getAndReset(DefaultCharAppender.java:162)
	at com.univocity.parsers.common.ParserOutput.valueParsed(ParserOutput.java:363)
	at com.univocity.parsers.csv.CsvParser.parseSingleDelimiterRecord(CsvParser.java:180)
	at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:109)
	at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:581)
	at com.univocity.parsers.common.AbstractParser.parseNextRecord(AbstractParser.java:1219)
	at com.datastax.oss.dsbulk.connectors.csv.CSVConnector$CSVRecordReader.readNext(CSVConnector.java:289)
	at com.datastax.oss.dsbulk.connectors.commons.AbstractFileBasedConnector$$Lambda$372/27353340.apply(Unknown Source)
2022-05-25 16:02:00 INFO  Final stats:
2022-05-25 16:02:00 INFO  Last processed positions can be found in positions.txt
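
One generic way to give the JVM a larger heap is to pass -Xmx when using the executable-jar form described in the Installation section; a hedged sketch reusing the command above (the 4g heap size is illustrative, and this is not a confirmed fix):

java -Xmx4g -jar dsbulk-distribution.jar load -url file.csv -k mykeyspacename -t mytablename -b "./secure-connect-xxx-production.zip" -u xxx -p xxx -header true --connector.csv.maxCharsPerColumn -1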


DSBulk count ignores the --stats.modes partitions in the output

It appears that this bug has been present since version 1.5.0.

DSBulk 1.3.4 output is as follows,

$ ./dsbulk count -k test1 -t table1 -h 12.121.22.96 -u cassandra -p REDACTED --stats.mode partitions --stats.numPartitions 15
Operation directory: /path/to/dsbulk-1.3.4/bin/logs/COUNT_20200721-150925-672136
Username and password provided but auth provider not specified, inferring DsePlainTextAuthProvider
total | failed | rows/s | p50ms | p99ms | p999ms
  100 |      0 |     87 | 76.41 | 90.70 |  90.70
Operation COUNT_20200721-150925-672136 completed successfully in 0 seconds.
'thirty_two' 2 2.00
'forty_seven' 2 2.00
'nineteen' 2 2.00
'eleven' 2 2.00
'seven' 2 2.00
'ten' 2 2.00
'forty_nine' 2 2.00
'thirty_seven' 2 2.00
'fifty_two' 2 2.00
'fifty_eight' 2 2.00
'fifty_nine' 2 2.00
'seventeen' 2 2.00
'one' 2 2.00
'twenty_three' 2 2.00
'fourteen' 2 2.00

whereas DSBulk 1.6.0 (and 1.5.0) doesn't output the last part, where the per-partition counts are printed.

1.5.0 output,

$ ./dsbulk count -k test1 -t table1 -h 12.121.22.96 -u cassandra -p REDACTED --stats.mode partitions --stats.numPartitions 15
Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
Operation directory: /path/to/dsbulk-1.5.0/bin/logs/COUNT_20200721-151058-922994
total | failed | rows/s | p50ms | p99ms | p999ms
  100 |      0 |     86 | 76.77 | 88.08 |  88.08
Operation COUNT_20200721-151058-922994 completed successfully in 0 seconds.
100

and 1.6.0 output,

$ ./dsbulk count -k test1 -t table1 -h 12.121.22.96 -u cassandra -p REDACTED --stats.mode partitions --stats.numPartitions 15
Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
Operation directory: /path/to/dsbulk-1.6.0/bin/logs/COUNT_20200721-152032-786471
total | failed | rows/s | p50ms | p99ms | p999ms
  100 |      0 |     82 | 77.10 | 94.90 |  94.90
Operation COUNT_20200721-152032-786471 completed successfully in .
100

Basically, the --stats.modes partitions output is missing:

partitions: Count the total number of rows in the N biggest partitions in the table. Choose how many partitions to track with stats.numPartitions option. For partitions, the results are organized as follows:

  • Left column: partition key value
  • Middle column: number of rows using that partition key value
  • Right column: the partition's percentage of rows compared to the total number of rows in the table


Count/unload a range of tokens: start token, end token

Hi guys!
For big workloads that take hours (days), it is good to be able to count/unload a range of tokens.
The aim is to split a huge task into smaller pieces, fit into a maintenance time window, etc.
Another benefit is the ability to resume an upload that failed (for any reason) from some point (a token found in upload-errors.log).

SELECT * FROM thingsboard.ts_kv_cf WHERE token(entity_type, entity_id, key, partition) > :start AND token(entity_type, entity_id, key, partition) <= :end
start: 2725990092663290223
end: 2748111336664437991

The parameters may look similar to the nodetool repair syntax:
[(-st start_token | --start-token start_token)]
[(-et end_token | --end-token end_token)]


Don't check for emptiness of primary key columns

DSBulk currently checks if a primary key column in a record is null or empty. If it is, the record is not even sent to Cassandra.

Unfortunately the check is not 100% accurate for the emptiness part:

  1. The BLOB type accepts empty buffers in any primary key column, contrary to what DSBulk currently assumes.
  2. A composite partition key also accepts empty blobs or strings (whereas a single-column partition key doesn't).

It's probably too involved to work around all such corner cases. I suggest that we don't check for emptiness and let the server deal with invalid records.

To be clear, I still think that we should check for nullness in primary key columns. This issue is only about the emptiness checks.


Upload dsbulk to Maven Central

Some customers have security rules in place that will not allow them to download packages without prior approval. There are loopholes, and in this case the security rule loophole allows Maven repository downloads without prior approval. Can a workflow be created to have dsbulk added to Maven Central?


honor metadata while performing unload and load

Hello,

I am trying to import the data from one Cassandra cluster to another; however, I see that the TTL and writetime metadata is not honored. Can we enhance the utility to honor the TTL and writetime values?


Disable log directory?

Is there a way to disable logging to a directory? I have a "load" use case where I would like to be able to run dsbulk programmatically from a python process, and as it stands I need to specify an execution ID and then remove the log directory after it runs.

It would be helpful to be able to disable logging to a directory so there is nothing left behind to clean up.

Thanks


Unloading/Reloading list STATIC column is not idempotent.

When there is a list STATIC column and X different unique records, the elements of the STATIC column are multiplied by X.

This seems to be because the generated INSERT statements do not overwrite the field value but append to it.
For a STATIC column that is re-inserted for each record, this is wrong behavior, which incidentally does not occur with the cqlsh tool (which has other issues).


Ability to limit throughput in bytes per second

This came as a follow-up to #435: we currently have the ability to limit throughput in ops/second, but we don't have the ability to limit it in bytes/sec.

And yet all the required bits are present: we already have a DataSizes class that is used to track statement and row sizes in two situations: when batching by data size, and when reporting rates in bytes/sec.


Dsbulk load fails with larger dataset

Hello Team,

I am trying to migrate Cassandra data from one cluster to another using DSBulk. This works fine when the dataset is smaller, but when I try to load ~900G of data into the target cluster it just fails with the below exception.

com.datastax.oss.dsbulk.executor.api.exception.BulkExecutionException: Statement execution failed: INSERT INTO keyspace.table (xx, xy, xz) VALUES (:xx, :xy, :xz) (Query timed out after PT5M)
	at com.datastax.oss.dsbulk.executor.api.subscription.ResultSubscription.toErrorPage(ResultSubscription.java:534)
	at com.datastax.oss.dsbulk.executor.api.subscription.ResultSubscription.lambda$fetchNextPage$1(ResultSubscription.java:372)
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.setFinalError(CqlRequestHandler.java:443) [4 skipped]
	at com.datastax.oss.driver.internal.core.cql.CqlRequestHandler.lambda$scheduleTimeout$1(CqlRequestHandler.java:224)
	at java.lang.Thread.run(Thread.java:748) [4 skipped]
Caused by: com.datastax.oss.driver.api.core.DriverTimeoutException: Query timed out after PT5M
	... 6 common frames omitted

I tried to tune the target cluster with several resources, but no luck. Is there anything I am missing, or is there any way to tune the write throughput?

C* Version: 3.11.6
DSBulk Version: 1.17.0
Target cluster: 15 nodes and each node has 32 CPU cores.
Storage: 500G SSD disk attached to each node
Replication Factor: 3

Note:

  • Dsbulk is running from a machine outside the Cassandra cluster with 16 core CPUs and 30G Memory
  • The actual insert schema is different; in the above output I just removed the column names and added xx, xy, xz
  • I am trying to speed up the process and hence I would like to have higher throughput.

Please let me know if you need any other details.


Can't find auth-provider class

Hi,
I'm trying to use advanced.auth-provider.class=software.aws.mcs.auth.SigV4AuthProvider but dsbulk can't seem to find the class. It fails with the error:

Can't find class software.aws.mcs.auth.SigV4AuthProvider (specified by advanced.auth-provider.class).

I've tried putting the jar file in a variety of locations (lib dir, conf dir, etc.), but it didn't help.
Any idea how to fix this?
Thanks


Feature request to allow DSBulk to utilize sstabledump output for INSERT/DELETE operations

I've had quite a few people now that end up in a place where they've either accidentally deleted data that they needed to quickly get back, or they've resurrected data for a particular key that they need to quickly delete.
If DSBulk had the ability to process sstabledump data (would have to be live or tombstoned) then we would have a tool available to quickly rectify potentially costly mistakes. This could work well for inserting or deleting data.


DSBulk interprets newline characters `\n` in text columns and writes multiple lines per record to output file

When unloading text columns from Cassandra that contain the newline character \n, the newline will be interpreted and DSBulk will write the row to the output file (CSV) over multiple lines. This is not the desired behavior; it is expected that these newline characters are written out as part of the text column data and that the row is contained on a single line in the output file.

For example:

Text Column = First line.\nSecond line.

Current DSBulk Unload to CSV behavior:

First line.
Second line.

Desired behavior:

First line.\nSecond line.

NOTE: In order to reproduce this behavior, insert test data using the DataStax Java Driver for Apache Cassandra; this was also tested with the Python driver. CQLSH will escape the newline character \\n, so rows inserted with CQLSH do not produce this issue.


DSBulk doesn't start if first line of the `java -version` doesn't contain version

If I have the JAVA_TOOL_OPTIONS variable set (I need it for some programs), then java -version shows the following:

>java -version
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)

and as a result, dsbulk fails with the following error:

bin/dsbulk: line 48: [: Picked up JAVA_TOOL_OPTIONS: -Dfile: integer expression expected
bin/dsbulk: line 48: [: Picked up JAVA_TOOL_OPTIONS: -Dfile: integer expression expected
bin/dsbulk: line 48: [: Picked up JAVA_TOOL_OPTIONS: -Dfile: integer expression expected
Unable to find java 8 (or later) executable. Check JAVA_HOME and PATH environment variables.


Support reading records from AWS S3.

Feature Request

Scenario: We have a large number of records stored in an AWS S3 bucket, and we would like to get them into Cassandra.

Problem: dsbulk does not support reading from s3:// URLs, so we tried using aws s3 cp <url> | dsbulk load <...> , but this approach was cumbersome and only allowed loading a single file at a time. (Due to the properties of the system writing these files, there is one record per file; combining them is not feasible.) Since we intend to load a very large number of files, we need a more efficient solution.

Proposed Solution: Upgrade dsbulk to be able to read from s3:// URLs, allowing us to dump a large number of filenames into the urlfile, thereby restoring the "bulk" to dsbulk.

Out-of-Scope: Since I do not have a write-to-S3 scenario, only reading from S3 need be considered for this feature.


DSBulk 1.6.0 count output misses duration information

When performing a count operation, the duration information is missing from the output. See the example below; the duration info is missing at this line: Operation COUNT_20200721-152032-786471 completed successfully in .

$ ./dsbulk count -k test1 -t table1 -h 12.121.22.96 -u cassandra -p REDACTED --stats.mode partitions --stats.numPartitions 15
Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
Operation directory: /path/to/dsbulk-1.6.0/bin/logs/COUNT_20200721-152032-786471
total | failed | rows/s | p50ms | p99ms | p999ms
  100 |      0 |     82 | 77.10 | 94.90 |  94.90
Operation COUNT_20200721-152032-786471 completed successfully in .
100


Can't import empty string if it is part of partition key

I have some records that contain an empty value for certain columns on the partition key. This is perfectly valid as far as I am aware. Unfortunately, they cannot be loaded by dsbulk (from csv).

They raise the following error:

com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException: Primary key column X cannot be set to null. Check that your settings (schema.mapping or schema.query) match your dataset contents.
        at com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException.nullPrimaryKey(InvalidMappingException.java:64)
        at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.bindColumn(DefaultRecordMapper.java:170)
        at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.map(DefaultRecordMapper.java:133)
        at com.datastax.oss.dsbulk.connectors.csv.CSVConnector$CSVRecordReader.readNext(CSVConnector.java:293) [7 skipped]
        at java.lang.Thread.run(Thread.java:748) [20 skipped]

It seems that the CSV connector treats empty non-quoted fields as nulls (even though the export itself was performed by dsbulk), so I forced the CSV connector to treat nulls as empty strings with --connector.csv.nullValue "", and then I get the error:

com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException: Primary key column X cannot be set to empty. Check that your settings (schema.mapping or schema.query) match your dataset contents.
        at com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException.emptyPrimaryKey(InvalidMappingException.java:73)
        at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.bindColumn(DefaultRecordMapper.java:171)
        at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultRecordMapper.map(DefaultRecordMapper.java:133)
        at com.datastax.oss.dsbulk.connectors.csv.CSVConnector$CSVRecordReader.readNext(CSVConnector.java:293) [7 skipped]
        at java.lang.Thread.run(Thread.java:748) [20 skipped]

Looking at the code, it appears that if a field is part of the partition key, then it can be neither empty nor null:
https://github.com/datastax/dsbulk/blob/1.x/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapper.java#L167

As far as I am aware, empty is valid, whereas null is not?
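
For context (an illustration added here, with placeholder keyspace/table names, not part of the original report): the server's behaviour can be checked directly from cqlsh. An empty text value is typically accepted as one component of a composite partition key, whereas a null component is rejected:

# Assumes keyspace ks1 already exists.
cqlsh -e "CREATE TABLE IF NOT EXISTS ks1.t1 (pk1 text, pk2 text, v int, PRIMARY KEY ((pk1, pk2)))"
cqlsh -e "INSERT INTO ks1.t1 (pk1, pk2, v) VALUES ('a', '', 1)"    # empty component: accepted
cqlsh -e "INSERT INTO ks1.t1 (pk1, pk2, v) VALUES ('a', null, 1)"  # null component: rejected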

I am able to work around this, but thought it worth raising as an issue for clarification. If you can confirm this is unintentional behaviour I am happy to raise a patch accordingly.


Handle write timeouts more gracefully with retries

Currently dsbulk sets a very generous timeout allowance (5 minutes) for write requests, but if an insert statement still times out after 5 minutes, that insert is simply marked as a failed operation; there is no attempt to retry the write in order to minimize the number of failed operations.

All data from failed writes is recorded in the load.bad file, whether the failure was caused by a write timeout or by invalid data. While the user does have the option of reloading from load.bad after the bulk load job finishes, it is hard to tell good records (that simply failed to load due to a timeout) apart from genuinely bad or malformed records. At the same time, dsbulk by default terminates once the total number of failed operations reaches 100, so when bulk loading into a data store at a throughput close to its capacity, the load job is more likely to fail as failed writes accumulate towards 100; longer-running jobs and larger data loads are even more likely to hit that threshold.

Adding some logic to allow at least one retry on a write timeout before declaring the write failed would lower the number of records the user has to reload, and keep load.bad focused mostly on malformed records.
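
Until a retry is implemented, one possible mitigation (an assumption, with placeholder names and purely illustrative values) is to throttle writes below the cluster's capacity and raise the 100-failure abort threshold:

dsbulk load -k my_keyspace -t my_table -url data.csv \
  --dsbulk.executor.maxPerSecond 5000 \
  --dsbulk.log.maxErrors 1000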


DSBulk fails to load blob values

Hello Team,

I am trying to unload and load blob values from one cluster to another using dsbulk 1.7.0, but I see an issue during the import.

After the import, when I check the data from cqlsh, I see the errors below.

 tid      | val
---------------
 abc12347 |       '\xac\xed\x00\x05sr\x00\x10java.lang.Double\x80\xb3\xc2J)k\xfb\x04\x02\x00\x01D\x00\x05valuexr\x00\x10java.lang.Number\x86\xac\x95\x1d\x0b\x94\xe0\x8b\x02\x00\x00xpBwtj\xf0#`\x00'
 abc12346 | '\xac\xed\x00\x05sr\x00\x10java.lang.Double\x80\xb3\xc2J)k\xfb\x04\x02\x00\x01D\x00\x05valuexr\x00\x10java.lang.Number\x86\xac\x95\x1d\x0b\x94\xe0\x8b\x02\x00\x00xpBwtj\xed\x90\x80\x00'
 abc12345 | '\xac\xed\x00\x05sr\x00\x10java.lang.Double\x80\xb3\xc2J)k\xfb\x04\x02\x00\x01D\x00\x05valuexr\x00\x10java.lang.Number\x86\xac\x95\x1d\x0b\x94\xe0\x8b\x02\x00\x00xpBwtj\xef\x8a\x80\x00'

(3 rows)
Failed to format value '\xac\xed\x00\x05sr\x00\x10java.lang.Double\x80\xb3\xc2J)k\xfb\x04\x02\x00\x01D\x00\x05valuexr\x00\x10java.lang.Number\x86\xac\x95\x1d\x0b\x94\xe0\x8b\x02\x00\x00xpBwtj\xf0#`\x00' : 'ascii' codec can't decode byte 0xac in position 0: ordinal not in range(128)
Failed to format value '\xac\xed\x00\x05sr\x00\x10java.lang.Double\x80\xb3\xc2J)k\xfb\x04\x02\x00\x01D\x00\x05valuexr\x00\x10java.lang.Number\x86\xac\x95\x1d\x0b\x94\xe0\x8b\x02\x00\x00xpBwtj\xed\x90\x80\x00' : 'ascii' codec can't decode byte 0xac in position 0: ordinal not in range(128)
1 more decoding errors suppressed.

I also tried to set `codec.binary`, but it didn't help: https://docs.datastax.com/en/dsbulk/doc/dsbulk/reference/codecOptions.html#codecOptions__dsbulkCodecOptionsBinary
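
For reference (placeholder keyspace, table and path names; this sketch is an assumption added here, not a confirmed fix): codec.binary controls how blob values are converted to and from text, so the same setting needs to be used on both sides of the transfer:

dsbulk unload -k ks1 -t blob_table -url /tmp/blob_export --codec.binary BASE64
dsbulk load   -k ks1 -t blob_table -url /tmp/blob_export --codec.binary BASE64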


Ability to resume a failed operation

This has been asked a few times:

This is finally going to be possible for all workloads thanks to the work under review for #429 (see #430).

With the new summary.csv file being produced for all operations, we now need to add a new option to resume a failed operation by parsing that file and only re-attempting the resources that weren't fully read or written.


Can't unload keyspace dse_audit

dsbulk may have a bug: I received the error "Operation UNLOAD_20220419-073654-099023 failed: Keyspace dse_audit does not exist." when I tried to unload the table dse_audit.audit_log. Using cqlsh or another Cassandra client tool, I can query this table normally.
Please take a look and fix this.
Many thanks,
Ha Nguyen


Add timestamps to debug files

The "debug" files (all files named xyz-errors.log) are generated by DSBulk directly (see LogManager). Contrary to the main log file operation.log, these files are not hndled by logback, and as a consequence, do not print a timestamp for each error. It would be useful to have a timestamp printed for each error.


Display bug in console metrics reporter for the number of failed operations

After finishing a long-running (16 hours+) job, we got the following output:

Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
A cloud secure connect bundle was provided: ignoring all explicit contact points.
A cloud secure connect bundle was provided and selected operation performs writes: changing default consistency level to LOCAL_QUORUM.
Operation directory: /mnt/data/logs/LOAD_20220808-171139-742199
      total | failed | rows/s | p50ms | p99ms | p999ms | batches
600,000,000 |      0 | 10,005 |  2.11 | 10.81 |  20.32 |    1.00
Operation LOAD_20220808-171139-742199 completed with 76 errors in 16 hours, 39 minutes and 29 seconds.
Rejected records can be found in the following file(s): load.bad
Errors are detailed in the following file(s): load-errors.log
A summary of the operation in CSV format can be found in summary.csv.

There is a discrepancy between the reported number of failed records in the table (0) and the 76 errors in the final summary.

The discrepancy can also be seen in the following operation.log excerpt:

2022-08-08 17:11:39 INFO  Username and password provided but auth provider not specified, inferring PlainTextAuthProvider
2022-08-08 17:11:39 INFO  A cloud secure connect bundle was provided: ignoring all explicit contact points.
2022-08-08 17:11:39 INFO  A cloud secure connect bundle was provided and selected operation performs writes: changing default consistency level to LOCAL_QUORUM.
2022-08-08 17:11:39 INFO  Operation directory: /mnt/data/logs/LOAD_20220808-171139-742199
2022-08-09 09:51:11 WARN  Operation LOAD_20220808-171139-742199 completed with 76 errors in 16 hours, 39 minutes and 29 seconds.
2022-08-09 09:51:11 INFO  Records: total: 600,000,000, successful: 600,000,000, failed: 0
2022-08-09 09:51:11 INFO  Batches: total: 600,000,000, size: 1.00 mean, 1 min, 1 max
2022-08-09 09:51:11 INFO  Memory usage: used: 3,424 MB, free: 1,469 MB, allocated: 4,894 MB, available: 7,828 MB, total gc count: 23,256, total gc time: 379,839 ms
2022-08-09 09:51:11 INFO  Writes: total: 600,000,000, successful: 599,999,924, failed: 76, in-flight: 0
2022-08-09 09:51:11 INFO  Throughput: 10,005 writes/second
2022-08-09 09:51:11 INFO  Latencies: mean 2.11, 75p 2.29, 99p 10.81, 999p 20.32 milliseconds
2022-08-09 09:51:13 INFO  Final stats:
2022-08-09 09:51:13 INFO  Records: total: 600,000,000, successful: 600,000,000, failed: 0
2022-08-09 09:51:13 INFO  Batches: total: 600,000,000, size: 1.00 mean, 1 min, 1 max
2022-08-09 09:51:13 INFO  Memory usage: used: 3,430 MB, free: 1,463 MB, allocated: 4,894 MB, available: 7,828 MB, total gc count: 23,256, total gc time: 379,839 ms
2022-08-09 09:51:13 INFO  Writes: total: 600,000,000, successful: 599,999,924, failed: 76, in-flight: 0
2022-08-09 09:51:13 INFO  Throughput: 10,005 writes/second
2022-08-09 09:51:13 INFO  Latencies: mean 2.11, 75p 2.29, 99p 10.81, 999p 20.32 milliseconds
2022-08-09 09:51:13 INFO  Rejected records can be found in the following file(s): load.bad
2022-08-09 09:51:13 INFO  Errors are detailed in the following file(s): load-errors.log
2022-08-09 09:51:13 INFO  A summary of the operation in CSV format can be found in summary.csv.


Exclude unsupported types from automatic timestamp and TTL preservation

Currently, when unloading with the options -timestamp or -ttl (to automatically preserve cell timestamps and TTLs), the operation fails if the table being unloaded contains collections.

With OSS Cassandra, the operation fails right away because the generated query is invalid. The selector functions writetime() and ttl() cannot be applied to collections (even frozen ones, see CASSANDRA-14533):

2022-02-16 14:27:36 ERROR Operation UNLOAD_TIMESTAMP_TTL failed: Cannot use selection function writeTime on collections.
com.datastax.oss.driver.api.core.servererrors.InvalidQueryException: Cannot use selection function writeTime on collections
	at com.datastax.oss.driver.api.core.servererrors.InvalidQueryException.copy(InvalidQueryException.java:48)
	at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:149)
	at com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor.process(CqlPrepareSyncProcessor.java:59)
	at com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor.process(CqlPrepareSyncProcessor.java:31)
	at com.datastax.oss.driver.internal.core.session.DefaultSession.execute(DefaultSession.java:230)
	at com.datastax.oss.driver.api.core.cql.SyncCqlSession.prepare(SyncCqlSession.java:224)
	at com.datastax.oss.dsbulk.workflow.commons.settings.SchemaSettings.prepareStatementAndCreateMapping(SchemaSettings.java:720)
	at com.datastax.oss.dsbulk.workflow.commons.settings.SchemaSettings.createReadResultMapper(SchemaSettings.java:510)
	at com.datastax.oss.dsbulk.workflow.unload.UnloadWorkflow.init(UnloadWorkflow.java:150)
	at com.datastax.oss.dsbulk.runner.WorkflowThread.run(WorkflowThread.java:52)

With DSE, these functions can be applied to collections, but they return a list of BIGINTs (one element per collection item). The operation then fails during the row mapping phase with:

java.lang.IllegalArgumentException: Could not deserialize column "writetime(contact)" of type List(BIGINT, not frozen) as java.lang.String
	at com.datastax.oss.dsbulk.workflow.commons.schema.DefaultReadResultMapper.map(DefaultReadResultMapper.java:77)
	at java.lang.Thread.run(Thread.java:748) [17 skipped]
Caused by: java.lang.IllegalArgumentException: Cannot create a WriteTimeCodec for List(BIGINT, not frozen)
	at com.datastax.oss.dsbulk.mapping.DefaultMapping.createWritetimeCodec(DefaultMapping.java:101)
	at com.datastax.oss.dsbulk.mapping.DefaultMapping.lambda$codec$0(DefaultMapping.java:90)

We could improve this:

  • If the server does not support unloading timestamp and TTL of collection columns (OSS C*):
    • Simply do not export it
    • Log a warning that timestamp and TTL data will not be preserved for these columns, and keep going.
  • If the server supports it (DSE), then we should be able to export at least one value of the list. It's not possible to specify individual element timestamps and TTLs when loading anyway, so exporting one single value is the best we can do.
    • Modify WriteTimeCodec to also accept list<bigint>, and only retain the first element.
    • For TTL variables we'll need to intercept them the same way we intercept writetime variables. Then, only retain the first element.
    • Log a warning explaining that some timestamps and TTLs may be lost.
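
In the meantime, a possible workaround (an assumption, with placeholder column names based on the example above) is to bypass the automatic query generation and select writetime()/ttl() only for the non-collection columns via an explicit schema.query:

dsbulk unload -k ks1 -t t1 \
  --schema.query "SELECT pk, c1, writetime(c1) AS c1_writetime, ttl(c1) AS c1_ttl, contact FROM ks1.t1"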

