cartridge-spark's Introduction

Tarantool

Tarantool is an in-memory computing platform consisting of a database and an application server.

It is distributed under BSD 2-Clause terms.

Key features of the application server:

Key features of the database:

  • MessagePack data format and MessagePack based client-server protocol.
  • Two data engines: a 100% in-memory engine with complete WAL-based persistence, and a custom LSM-tree implementation for use with large data sets.
  • Multiple index types: HASH, TREE, RTREE, BITSET.
  • Document oriented JSON path indexes.
  • Asynchronous master-master replication.
  • Synchronous quorum-based replication.
  • RAFT-based automatic leader election for the single-leader configuration.
  • Authentication and access control.
  • ANSI SQL, including views, joins, referential and check constraints.
  • Connectors for many programming languages.
  • The database is a C extension of the application server and can be turned off.

Supported platforms are Linux (x86_64, aarch64), Mac OS X (x86_64, M1), FreeBSD (x86_64).

Tarantool is ideal for data-enriched components of scalable Web architecture: queue servers, caches, stateful Web applications.

To download and install Tarantool as a binary package for your OS or using Docker, please see the download instructions.

To build Tarantool from source, see detailed instructions in the Tarantool documentation.

To find modules, connectors and tools for Tarantool, check out our Awesome Tarantool list.

Please report bugs to our issue tracker. We also warmly welcome your feedback on the discussions page and questions on Stack Overflow.

We accept contributions via pull requests. Check out our contributing guide.

Thank you for your interest in Tarantool!

cartridge-spark's People

Contributors

akudiyar, savolgin

cartridge-spark's Issues

Release 0.1.1 is not usable because of scoverage error

When the cartridge-spark module is added as a dependency:

        <dependency>
            <groupId>io.tarantool</groupId>
            <artifactId>spark-tarantool-connector_2.11</artifactId>
            <version>0.1.1</version>
        </dependency>

and the application is started, the following error appears:

java.io.FileNotFoundException: \Users\a.kuzin\sources\tarantool\spark\target\scala-2.11\scoverage-data\scoverage.measurements.7002b66b-f285-4bda-9c66-8fb4a71d1d73.58 (The system cannot find the path specified)

This error is related to scoverage/sbt-scoverage#306 (sadly, still isn't fixed after 3 years).

Possible solutions: disable scoverage entirely, or make sure it is disabled before publishing.

Add an ability to retry specific dataset write errors

When certain kinds of errors occur, failing the whole dataset can be too inefficient (see #35).

Proposal:

  • Implement limited retries for specific kinds of errors with an ability for the customer to specify what error types should be retried;
  • Retry writing of one batch in the executor, don't propagate the error until the retry limit is exhausted;
  • Log unsuccessful retries;
  • This algorithm can be combined with both error handling types ("fail fast" and "collect errors").

Reference implementation in cartridge-java: https://github.com/tarantool/cartridge-java/blob/master/src/main/java/io/tarantool/driver/core/RetryingTarantoolClient.java
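
The proposal above could be sketched roughly as follows. This is a minimal illustration, assuming that write errors surface as unchecked exceptions; `BatchRetry`, `withRetries` and the predicate parameter are hypothetical names, not the API of cartridge-spark or of `RetryingTarantoolClient`.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper for illustration only.
final class BatchRetry {
    private BatchRetry() {}

    /**
     * Runs the write and retries it up to maxRetries times when the error
     * matches the user-supplied predicate. Any other error, and the last
     * error once the retry limit is exhausted, is propagated to the caller.
     */
    static <T> T withRetries(Supplier<T> write, int maxRetries,
                             Predicate<RuntimeException> isRetryable) {
        int attempt = 0;
        while (true) {
            try {
                return write.get();
            } catch (RuntimeException e) {
                if (attempt >= maxRetries || !isRetryable.test(e)) {
                    throw e; // not retryable, or the retry limit is exhausted
                }
                attempt++;
                // Log each unsuccessful attempt before retrying.
                System.err.printf("write attempt %d failed, retrying: %s%n",
                        attempt, e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The simulated write fails twice with a retryable error, then succeeds.
        String result = withRetries(() -> {
            if (calls[0]++ < 2) {
                throw new IllegalStateException("timeout");
            }
            return "written";
        }, 3, e -> e instanceof IllegalStateException);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Because the retry loop wraps a single batch write, it can sit underneath either error handling mode: the caller only sees an exception after the limit is reached.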

Allow user to specify whether the field names in a Dataset query should be lowercased or not

Currently, when a schema is generated from the Dataset, the field names are implicitly lowercased: https://github.com/tarantool/cartridge-spark/blob/master/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala#L32. This was done because queries usually have uppercase field names, which makes them incompatible with the schema in Tarantool. But if the schema in Tarantool itself has uppercase letters in the field names, the same problem occurs.

Acceptance criteria:

  • Users are able to specify the field transformation
  • The field names are not transformed by default
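
One possible shape for such an option, as a sketch only: `FieldNameTransform` and its values are hypothetical and do not exist in the connector. The point is that `NONE` is the default and the other modes are opt-in.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical option values; the names are illustrative, not an existing connector API.
enum FieldNameTransform {
    NONE, LOWER_CASE, UPPER_CASE;

    String apply(String name) {
        switch (this) {
            case LOWER_CASE: return name.toLowerCase(Locale.ROOT);
            case UPPER_CASE: return name.toUpperCase(Locale.ROOT);
            default: return name; // NONE: keep the name as written (the proposed default)
        }
    }

    List<String> applyAll(List<String> names) {
        return names.stream().map(this::apply).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("ID", "Book_Name");
        System.out.println(NONE.applyAll(fields));       // prints [ID, Book_Name]
        System.out.println(LOWER_CASE.applyAll(fields)); // prints [id, book_name]
    }
}
```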

OptionalDataException when deserializing the task closure

Deserialization of the task closure sometimes causes an error as follows:

023-01-24 17:58:48,625 WARN  [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(66)) - Lost task 8.0 in stage 0.0 (TID 0, ..., executor 1): java.io.OptionalDataException
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at java.util.LinkedList.readObject(LinkedList.java:1149)
  at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at java.util.HashMap.readObject(HashMap.java:1412)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)

This error appears in a highly loaded, concurrent environment and does not reproduce reliably. Apparently, its source is a race condition when writing the task closure data, which leaves the data corrupted.

Implement more efficient stable partitioning

Motivation

Currently, no partitioning exists. All requests are sent to the vshard/Cartridge routers specified in the configuration and balanced using the driver's internal mechanism; the routers then forward them to the vshard storage nodes. This is reliable enough, but when Spark jobs are processed in parallel, the routers will quickly become a bottleneck, especially if there are fewer routers than storage nodes.

Goal

Find and implement a way of distributing the requests across the vshard/Cartridge cluster without bottlenecks on the router nodes.

Research

TODO

TODO

  • Research the possibility of reducing the overhead on vshard routers when using bulk writing and push-down filters
  • Research the possibility of re-implementing (a part of) the router logic in the cartridge-java driver

Implement efficient bulk writing of datasets

Motivation

The current implementation of Dataset writing sends records (tuples) to the Tarantool nodes one by one, which is not very efficient (although it is atomic and reliable enough). With this approach, the load time for large datasets may exceed acceptable limits.

Research

TODO

TODO

  • Write some load tests for measuring the load time and researching the ways of improving it
  • Research the ways of utilizing the node-local computation for efficient bulk writing
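
As a starting point for the load tests, the grouping step of bulk writing could look like the sketch below. It only shows how an executor might split its record iterator into fixed-size batches; how a batch is actually sent to Tarantool is left out, and `RecordBatcher` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper for illustration only.
final class RecordBatcher {
    private RecordBatcher() {}

    /** Splits the records into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> batches(Iterator<T> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        while (records.hasNext()) {
            current.add(records.next());
            if (current.size() == batchSize) {
                result.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            result.add(current); // the last, possibly shorter batch
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(batches(records.iterator(), 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```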

Fail fast if dataset write failed

Currently, the dataset write algorithm on each executor collects the record write errors and fails with an exception only after finishing the task if any errors are present.

This mode is not suitable for batch writing, and may not be suitable when the writing of the whole dataset or of a batch is repeated without replacing the records (in the "insert" mode). In the latter case, "duplicate key" errors may occur when records that were already written successfully are sent to the cluster again.

Proposal:

  • implement a new error handling algorithm that fails the executor task once the first error occurs;
  • make this algorithm the default one, with an option to enable the alternative one via config

Further, implementation of the fail-fast algorithm can include checking the error type and retrying the writes in certain cases, as it is done with RetryingTarantoolClient in cartridge-java.
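
The difference between the two error handling modes can be sketched as below. This is an illustration under the assumption that a record write either returns normally or throws an unchecked exception; `WriteErrorHandling`, `Mode` and `writeAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration only.
final class WriteErrorHandling {
    enum Mode { FAIL_FAST, COLLECT_ERRORS }

    private WriteErrorHandling() {}

    /** Writes every record and returns how many were written successfully. */
    static <T> int writeAll(List<T> records, Consumer<T> writer, Mode mode) {
        List<RuntimeException> errors = new ArrayList<>();
        int written = 0;
        for (T record : records) {
            try {
                writer.accept(record);
                written++;
            } catch (RuntimeException e) {
                if (mode == Mode.FAIL_FAST) {
                    throw e; // stop the task at the first error
                }
                errors.add(e); // keep going and report at the end
            }
        }
        if (!errors.isEmpty()) {
            RuntimeException failure =
                    new IllegalStateException(errors.size() + " record(s) failed to write");
            errors.forEach(failure::addSuppressed);
            throw failure;
        }
        return written;
    }

    public static void main(String[] args) {
        // No record fails here, so both modes behave the same.
        System.out.println(writeAll(Arrays.asList("a", "b"), r -> { }, Mode.FAIL_FAST)); // prints 2
    }
}
```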

Support push-down filters (with pre-filtering on the Tarantool nodes)

Motivation

Currently, the push-down filters aren't implemented.

Implementing this mechanism will significantly improve the operations in the cluster (see this article, for example).

Research

The implementation is possible using filtering on the Tarantool nodes via the box or tarantool/crud API. However, using the crud API requires some guarantees that the filtering is performed efficiently (on one node, without extra network hops). This will require consistent partitioning based on the virtual buckets and relying on the stability of the bucket position (either via parallel jobs on different nodes containing the target bucket or via bucket pinning, which is considered bad practice).

TODO

  • Describe a task about stable partitioning in the vshard/Cartridge cluster

Support Timestamp field types for Tarantool 2.11+

The Spark Timestamp schema type should be converted to the Tarantool datetime type, which is supported only for Tarantool 2.11+.

Support for an Instant converter was added in Tarantool driver versions 0.11.0+. For driver versions below it, there are no built-in converters for date-like Java types, so it is recommended to use either numeric or string time representation.

Timestamp type in Spark: https://spark.apache.org/docs/3.1.3/api/java/org/apache/spark/sql/types/TimestampType.html
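
For reference, the JDK side of such a conversion is small. The sketch below uses only standard `java.sql.Timestamp` and `java.time.Instant` calls; the class name `TimestampConversion` is hypothetical, and the microsecond representation is shown as one possible numeric fallback for older driver versions, not as what the connector does.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical helper for illustration only.
final class TimestampConversion {
    private TimestampConversion() {}

    /** Timestamp to Instant, the type handled by the driver's Instant converter. */
    static Instant toInstant(Timestamp ts) {
        return ts.toInstant();
    }

    /** Instant back to Timestamp, for values read from Tarantool. */
    static Timestamp toTimestamp(Instant instant) {
        return Timestamp.from(instant);
    }

    /** Numeric fallback: microseconds since the Unix epoch. */
    static long toEpochMicros(Timestamp ts) {
        Instant i = ts.toInstant();
        return Math.addExact(Math.multiplyExact(i.getEpochSecond(), 1_000_000L),
                i.getNano() / 1_000);
    }

    public static void main(String[] args) {
        Timestamp ts = Timestamp.from(Instant.ofEpochSecond(1, 500_000_000));
        System.out.println(toEpochMicros(ts));                     // prints 1500000
        System.out.println(toTimestamp(toInstant(ts)).equals(ts)); // prints true
    }
}
```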

Investigate possibilities of running coverage in CI

sbt-scoverage is currently not compatible with either Scala 2.11 or Scala 2.13, so its configuration was removed in e62f3f1.
However, proper integration with coverage reports compatible with GitHub Actions is still needed as a way of monitoring the project health.

We need to investigate possible alternatives or create a configuration that launches sbt-scoverage in CI for Scala 2.12 only.

Insert fields into a Dataset by name

When fields are inserted into a Dataset, the values are taken by position rather than by name. For example:

The fields in the space schema are, in order: id, bucket_id, unique_key, book_name, author, ...
When a row is inserted with the fields (in order) id, bucketId, bookName, author, year, the connector returns the following Tarantool error: Failed to insert: Tuple field 5 (author) type does not match one required by operation: expected string, got unsigned.

This way a request works:

select 1 as id, null as bucketId, 'dq' as uniqueKey, 'Don Quixote' as bookName, 'Miguel de Cervantes' as author, 1605 as year

and this way it doesn't:

select 1 as id, null as bucketId, 'Don Quixote' as bookName, 'Miguel de Cervantes' as author, 1605 as year, 'dq' as uniqueKey

Expected behavior

The connector should report an error about a required field having an empty value. In addition, if the fields are specified by name, the order of the fields in the insert request should not affect the actual order of the fields in the row inserted into Tarantool.
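
A sketch of name-based ordering is shown below. It assumes the row's field names have already been mapped to the names used in the space format (the camelCase to snake_case mapping is out of scope here), and `RowByName` is a hypothetical name. Fields missing from the row become null, so a missing required field is reported as an empty value rather than as a type mismatch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
final class RowByName {
    private RowByName() {}

    /** Orders the row values by the space format instead of by their position in the query. */
    static List<Object> orderBySpaceFormat(List<String> spaceFields, Map<String, Object> row) {
        for (String name : row.keySet()) {
            if (!spaceFields.contains(name)) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
        }
        List<Object> tuple = new ArrayList<>(spaceFields.size());
        for (String field : spaceFields) {
            tuple.add(row.get(field)); // null when the row does not provide the field
        }
        return tuple;
    }

    public static void main(String[] args) {
        List<String> spaceFields =
                Arrays.asList("id", "bucket_id", "unique_key", "book_name", "author");
        // The row lists unique_key last, as in the failing query.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1);
        row.put("bucket_id", null);
        row.put("book_name", "Don Quixote");
        row.put("author", "Miguel de Cervantes");
        row.put("unique_key", "dq");
        System.out.println(orderBySpaceFormat(spaceFields, row));
        // prints [1, null, dq, Don Quixote, Miguel de Cervantes]
    }
}
```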

Truncate the space in Overwrite mode and replace records in Append

Currently, when the Append mode is selected, inserting a record with an existing primary key will lead to an exception; when the Overwrite mode is selected, the records will be appended or overwritten if a record with the same primary key exists.

Acceptance criteria:

  • When the Append mode is selected, the records are inserted or replaced if the primary key is not unique
  • When the Overwrite mode is selected, the space is truncated and the records are inserted afterwards
  • Ignore mode works as is (inserts records only if the corresponding space is empty)
  • ErrorIfExists mode works as is (produces an error if the corresponding space is not empty)
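
The acceptance criteria can be read as a small decision table. The sketch below encodes that reading; `WritePlan`, `Mode` and `Operation` are hypothetical names that only mirror the write mode names used above, and the real connector may structure this differently.

```java
// Hypothetical helper for illustration only.
final class WritePlan {
    enum Mode { APPEND, OVERWRITE, IGNORE, ERROR_IF_EXISTS }
    enum Operation { INSERT, REPLACE, SKIP }

    final boolean truncateFirst;
    final Operation operation;

    private WritePlan(boolean truncateFirst, Operation operation) {
        this.truncateFirst = truncateFirst;
        this.operation = operation;
    }

    static WritePlan forMode(Mode mode, boolean spaceIsEmpty) {
        switch (mode) {
            case APPEND:
                // Insert new records, replace those whose primary key already exists.
                return new WritePlan(false, Operation.REPLACE);
            case OVERWRITE:
                // Truncate the space, then insert the records.
                return new WritePlan(true, Operation.INSERT);
            case IGNORE:
                // Write only if the space is empty, otherwise do nothing.
                return new WritePlan(false, spaceIsEmpty ? Operation.INSERT : Operation.SKIP);
            case ERROR_IF_EXISTS:
                if (!spaceIsEmpty) {
                    throw new IllegalStateException("The space is not empty");
                }
                return new WritePlan(false, Operation.INSERT);
            default:
                throw new AssertionError(mode);
        }
    }

    public static void main(String[] args) {
        WritePlan plan = forMode(Mode.OVERWRITE, false);
        System.out.println(plan.truncateFirst + " " + plan.operation); // prints "true INSERT"
    }
}
```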

Use "replace" operation instead of "insert" for all write modes

Highly loaded Tarantool clusters may produce spurious timeout errors, which cause Spark tasks to fail. In that case, a common Spark setup automatically restarts the task so that the write operations eventually finish. But these retries are not compatible with "insert" operations, since they cause "duplicate key" errors for the tuples that were already saved successfully (bearing in mind that two-phase commits and other consistency protocols are still not available in a sharded Tarantool cluster).

So, all write modes should use only replace operations.

Add support for BigInteger and BigDecimal types

The Tarantool Java driver (cartridge-java) already supports these types, but when a space in Tarantool has a decimal field, inserting data through the connector results in null values in that field.

Schema in Tarantool:

test_space: 
     engine: memtx 
     is_local: false 
     temporary: false 
     sharding_key: [idreg] 
     format: 
       - {name: bucket_id          , type: unsigned    , is_nullable: false} 
       - {name: idreg              , type: decimal    , is_nullable: false} 
       - {name: regnum             , type: string      , is_nullable: true} 
     indexes: 
       - name: index_id 
         unique: true 
         type: TREE 
         parts: 
           - {path: idreg, type: decimal, is_nullable: false} 
       - name: bucket_id 
         unique: false 
         type: TREE 
         parts: 
           - {path: bucket_id, type: unsigned, is_nullable: false}

Spark SQL query:

  spark.sql("create database if not exists dl_raw") 
  spark.sql("drop table if exists DL_RAW.test_space") 
 
  spark.sql( 
    """ 
      |create table if not exists DL_RAW.test_space ( 
      |     bucket_id             integer 
      |    ,idreg                 decimal(38) 
      |    ,regnum                string 
      |  ) stored as orc""".stripMargin) 
  spark.sql( 
    """ 
      |insert into dl_raw.test_space values 
      |(1, 1085529600000, '404503014700028'), 
      |(1, 1085529600000, '404503014700028'), 
      |(1, 1087430400000, '304503016900085') 
      |""".stripMargin) 
 
 
  val ds = spark.table("dl_raw.test_space") 
 
 
  ds.show(false) 
  ds.printSchema()

Fix parallel running of test suites for Scala 2.12

The second Cartridge cluster conflicts with the first one:

00:02:06.636 [nioEventLoopGroup-2-2] INFO  i.t.d.c.c.AbstractTarantoolConnectionManager - Connected to Tarantool server at localhost/127.0.0.1:49177
00:02:06.639 [docker-java-stream-590997831] INFO  i.t.s.c.i.SharedJavaSparkContext - STDOUT: testapp.router | 2022-08-06 00:02:06.638 [1835] main/154/main twophase.lua:497 W> Updating config clusterwide...
00:02:06.640 [docker-java-stream-590997831] INFO  i.t.s.c.i.SharedJavaSparkContext - STDOUT: testapp.router | 2022-08-06 00:02:06.639 [1835] main/154/main twophase.lua:538 E> Invalid cluster topology config: servers[72d3a725-b73d-4f15-adb3-0fc698b612f7].uri "localhost:3304" collision with another server
00:02:06.640 [docker-java-stream-590997831] INFO  i.t.s.c.i.SharedJavaSparkContext - STDOUT: testapp.router | stack traceback:
00:02:06.640 [docker-java-stream-590997831] INFO  i.t.s.c.i.SharedJavaSparkContext - STDOUT: testapp.router | 	/app/.rocks/share/tarantool/cartridge/topology.lua:613: in function 'validate_consistency'
00:02:06.640 [docker-java-stream-590997831] INFO  i.t.s.c.i.SharedJavaSparkContext - STDOUT: testapp.router | 	/app/.rocks/share/tarantool/cartridge/topology.lua:742: in function </app/.rocks/share/tarantool/cartridge/topology.lua:728>
00:02:06.640 [docker-java-stream-590997831] INFO  i.t.s.c.i.SharedJavaSparkContext - STDOUT: testapp.router | 	[C]: in function 'xpcall'

Different ports and a different cluster cookie need to be set for each pipeline.

Launch Cartridge cluster as a standalone service for parallel tests

Launching the Cartridge cluster in Docker requires many kludges, wastes time, and makes the tests unstable overall. The Docker-based approach can be kept as an auxiliary option or used only for local tests. Using a singleton cluster (or as many clusters as there are tested Tarantool versions) for all test runs will be more effective.

Reference in the GitHub Actions documentation: https://docs.github.com/en/actions/using-containerized-services/about-service-containers#creating-service-containers

Spark connector doesn't close driver connection when performing a Dataset read

This happens when the read is performed with Dataset schema auto-determination:

val df = spark.read
  .format("org.apache.spark.sql.tarantool")
  .option("tarantool.space", "test_space")
  .load()

// Space schema from Tarantool will be used for mapping the tuple fields
val tupleIDs: Array[Any] = df.select("id").rdd.map(row => row.get(0)).collect()

The unclosed connection leads to abnormal termination of the client applications.

Spark 3.x and PySpark

Hi. Do you have plans to support stable spark 3.x and pyspark modules in your connector?

Support Dataset extraction

Problem statement

Dataset is the modern and must-be-supported way for loading data to Spark. RDD is the old way.

Example code to be executed:

Dataset<TarantoolTuple> ds1 = spark.getSQLContext().createDataset(rdd1.rdd(), Encoders.bean(TarantoolTuple.class));

Requirements

  • Implement DataReader
  • Implement Dataset class for Tarantool
  • Support data loading via ProxyClient
  • Partitions must be separated by bucket ID

Related tasks

  • Cartridge administration (at least retrieving the vshard information)
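
The requirement that partitions be separated by bucket ID could start from something like the sketch below, which splits the bucket ID range into contiguous, nearly equal ranges, one per Spark partition. It assumes bucket IDs run from 1 to the configured bucket count; `BucketRanges` is a hypothetical name, and mapping a range to the storage node that actually holds those buckets still needs the vshard information mentioned under related tasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only.
final class BucketRanges {
    private BucketRanges() {}

    /** Returns inclusive {first, last} bucket ID ranges, at most one per partition. */
    static List<int[]> split(int bucketCount, int numPartitions) {
        if (bucketCount <= 0 || numPartitions <= 0) {
            throw new IllegalArgumentException("bucketCount and numPartitions must be positive");
        }
        int parts = Math.min(bucketCount, numPartitions);
        int base = bucketCount / parts;
        int remainder = bucketCount % parts;
        List<int[]> ranges = new ArrayList<>(parts);
        int first = 1; // assumption: bucket IDs start at 1
        for (int i = 0; i < parts; i++) {
            // The first `remainder` ranges get one extra bucket.
            int size = base + (i < remainder ? 1 : 0);
            ranges.add(new int[] {first, first + size - 1});
            first += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] range : split(10, 3)) {
            System.out.println(Arrays.toString(range));
        }
        // prints [1, 4], [5, 7] and [8, 10] on separate lines
    }
}
```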
