Giter Club home page Giter Club logo

arrow-ballista's Introduction

Ballista: Distributed SQL Query Engine, built on Apache Arrow

Ballista is a distributed SQL query engine powered by the Rust implementation of Apache Arrow and Apache Arrow DataFusion.

If you are looking for documentation for a released version of Ballista, please refer to the Ballista User Guide.

Overview

Ballista implements a similar design to Apache Spark (particularly Spark SQL), but there are some key differences:

  • The choice of Rust as the main execution language avoids the overhead of GC pauses and results in deterministic processing times.
  • Ballista is designed from the ground up to use columnar data, enabling a number of efficiencies such as vectorized processing (SIMD) and efficient compression. Although Spark does have some columnar support, it is still largely row-based today.
  • The combination of Rust and Arrow provides excellent memory efficiency and memory usage can be 5x - 10x lower than Apache Spark in some cases, which means that more processing can fit on a single node, reducing the overhead of distributed compute.
  • The use of Apache Arrow as the memory model and network protocol means that data can be exchanged efficiently between executors using the Flight Protocol, and between clients and schedulers/executors using the Flight SQL Protocol

Architecture

A Ballista cluster consists of one or more scheduler processes and one or more executor processes. These processes can be run as native binaries and are also available as Docker Images, which can be easily deployed with Docker Compose or Kubernetes.

The following diagram shows the interaction between clients and the scheduler for submitting jobs, and the interaction between the executor(s) and the scheduler for fetching tasks and reporting task status.

Ballista Cluster Diagram

See the architecture guide for more details.

Features

  • Supports HDFS as well as cloud object stores. S3 is supported today and GCS and Azure support is planned.
  • DataFrame and SQL APIs available from Python and Rust.
  • Clients can connect to a Ballista cluster using Flight SQL.
  • JDBC support via Arrow Flight SQL JDBC Driver
  • Scheduler web interface and REST UI for monitoring query progress and viewing query plans and metrics.
  • Support for Docker, Docker Compose, and Kubernetes deployment, as well as manual deployment on bare metal.

Performance

We run some simple benchmarks comparing Ballista with Apache Spark to track progress with performance optimizations. These are benchmarks derived from TPC-H and not official TPC-H benchmarks. These results are from running individual queries at scale factor 10 (10 GB) on a single node with a single executor and 24 concurrent tasks.

The tracking issue for improving these results is #339.

benchmarks

Getting Started

The easiest way to get started is to run one of the standalone or distributed examples. After that, refer to the Getting Started Guide.

Project Status

Ballista supports a wide range of SQL, including CTEs, Joins, and Subqueries and can execute complex queries at scale.

Refer to the DataFusion SQL Reference for more information on supported SQL.

Ballista is maturing quickly and is now working towards being production ready. See the roadmap for more details.

Contribution Guide

Please see the Contribution Guide for information about contributing to Ballista.

arrow-ballista's People

Contributors

alamb avatar andygrove avatar bkietz avatar cpcloud avatar dandandan avatar dependabot[bot] avatar emkornfield avatar fsaintjacques avatar houqp avatar jimexist avatar jorgecarleitao avatar julienledem avatar kou avatar kszucs avatar liukun4515 avatar matthewmturner avatar nealrichardson avatar nevi-me avatar paddyhoran avatar pitrou avatar seddonm1 avatar sunchao avatar ted-jiang avatar thinkharderdev avatar tustvold avatar wesm avatar xhochy avatar xudong963 avatar yahonanjing avatar yjshen avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

arrow-ballista's Issues

Leverage Atomic for the in-memory states in Scheduler

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently we leverage read write lock for the in-memory states in scheduler in global level, which is too coarse. When tens of thousands of states need to be updated, it will downgrade the whole system performance because of the write lock.

Describe the solution you'd like

Therefore, we propose to introduce Atomic for the states to change the write lock to the read lock.

Describe alternatives you've considered

Additional context

Add config for concurrent_task in executor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

        // Use a dedicated executor for CPU bound tasks so that the main tokio
            // executor can still answer requests even when under load
            // TODO make it configurable
            let dedicated_executor = DedicatedExecutor::new("task_runner", 4);

Before in DedicatedExecutor pool is fixed size 4 thread,
When we facing large task set, it will cut down the system throughput.
Need align with scheduler_config_spec.toml

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Suspicious slow test in Ballista

Describe the bug

SLOW [> 60.005s]        ballista-executor cpu_bound_executor::tests::executor_shutdown_while_task_running
SLOW [>120.011s]         ballista-executor cpu_bound_executor::tests::executor_shutdown_while_task_running
SLOW [>180.015s]         ballista-executor cpu_bound_executor::tests::executor_shutdown_while_task_running
SLOW [>240.016s]         ballista-executor cpu_bound_executor::tests::executor_shutdown_while_task_running

To Reproduce

On the master branch, run cargo nextest run, not always present, on average occurs 2-3 times out of 5 runs.

Cannot build Ballista docker images on Apple silicon

Describe the bug
I can't run the integration tests on an M1 MacBook Air.

apache/arrow-datafusion#6 2.909 musl-gcc  -I. -Iinclude -fPIC -pthread -m64 -Wa,--noexecstack -Wall -O3 -fPIC -DOPENSSL_USE_NODELETE -DL_ENDIAN -DOPENSSL_PIC -DOPENSSL_CPUID_OBJ -DOPENSSL_IA32_SSE2 -DOPENSSL_BN_ASM_MONT -DOPENSSL_BN_ASM_MONT5 -DOPENSSL_BN_ASM_GF2m -DSHA1_ASM -DSHA256_ASM -DSHA512_ASM -DKECCAK1600_ASM -DRC4_ASM -DMD5_ASM -DAES_ASM -DVPAES_ASM -DBSAES_ASM -DGHASH_ASM -DECP_NISTZ256_ASM -DX25519_ASM -DPADLOCK_ASM -DPOLY1305_ASM -DOPENSSLDIR="\"/usr/local/musl/ssl\"" -DENGINESDIR="\"/usr/local/musl/lib/engines-1.1\"" -DNDEBUG -DOPENSSL_NO_SECURE_MEMORY -MMD -MF apps/app_rand.d.tmp -MT apps/app_rand.o -c -o apps/app_rand.o apps/app_rand.c
apache/arrow-datafusion#6 2.912 cc1: error: unrecognized command line option '-m64'

To Reproduce
./dev/integration-tests.sh

Expected behavior
Should work.

Additional context
None

Ballista example project does not build

Describe the bug
A clear and concise description of what the bug is.
The example at https://github.com/apache/arrow-datafusion/blob/master/ballista/rust/client/README.md#executing-a-query does not build

To Reproduce
Steps to reproduce the behavior:

  • Create a new cargo project with cargo new ballista-example
  • Add the three dependencies
  • Add the code
  • Run cargo build

Expected behavior
A clear and concise description of what you expected to happen.
A successful build

Additional context
Add any other context about the problem here.

The error is

error: failed to select a version for `zstd-safe`.
    ... required by package `zstd v0.9.0+zstd.1.5.0`
    ... which satisfies dependency `zstd = "^0.9"` of package `parquet v6.1.0`
    ... which satisfies dependency `parquet = "^6.1.0"` of package `datafusion v6.0.0`
    ... which satisfies dependency `datafusion = "^6.0.0"` of package `ballista v0.6.0`
    ... which satisfies dependency `ballista = "^0.6"` of package `ballista-example v0.1.0`
versions that meet the requirements `=4.1.1` are: 4.1.1+zstd.1.5.0

all possible versions conflict with previously selected packages.

  previously selected package `zstd-safe v4.1.4+zstd.1.5.2`
    ... which satisfies dependency `zstd-safe = "=4.1.4"` of package `zstd v0.10.0+zstd.1.5.2`
    ... which satisfies dependency `zstd = "^0.10"` of package `parquet v9.0.2`
    ... which satisfies dependency `parquet = "^9.0.0"` of package `datafusion v7.0.0`
    ... which satisfies dependency `datafusion = "^7.0"` of package `ballista-example v0.1.0`

failed to select a version for `zstd-safe` which could resolve this conflict

Need clean up intermediate data in Ballista

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We need to check whether the states saved in the sled is consumed by UI or not.
if not consumed by UI, we can clean the job/task data when the SQL is finished.

If they are consumed by UI, we can choose either LRU based policy like Spark or time based eviction policy.

Regarding shuffle files, we also need to implement a way to clean them. This is a little bit complex because we need to clean up the files on all the hosts. We might need to add new RPCs .

Make scheduler prefer assign the task with same jobId to same executor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
In current code, each poll work request will scan all waiting task in the same name space.https://github.com/apache/arrow-datafusion/blob/d7ae8c2631b5ea86ac2328530f0a4745daaf0bda/ballista/rust/scheduler/src/state/mod.rs#L283

plan1 -> shuffle wirte -> plan2

Seems p1 and p2 run on the same executor may avoid sending intermediate results.
Describe the solution you'd like

  1. executor request job with jobId
  2. scheduler send same job task back

Describe alternatives you've considered

Use another channel to update the status of a task set for executor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently for per task status, there will be a rpc for update its status, which is not sufficient, especially there're tens of thousands tasks.

Describe the solution you'd like

It's better to leverage another channel to update a bunch of finished tasks.

Describe alternatives you've considered

Additional context

Ballista: Fix hacks around concurrency=2 to force hash-partitioned joins

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

By default, DataFusion uses hash-partitioned joins if concurrency > 1 which led to me adding this hacky code in a couple of places in Ballista.

let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is hack to enable partitioned joins
let mut ctx = ExecutionContext::with_config(config);

Describe the solution you'd like
I'm actually not sure what the solution should be, but I would like to be able to tell the context to use hash-partitioned joins, separately from specifying concurrency.

Describe alternatives you've considered
None

Additional context
This code is running in the scheduler, not in the executor where the query actually executes. The scheduler concurrency should not impact how the query is planned.

Document how to run TPC-H benchmarks in Kubernetes

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I had to spend time figuring out how to deploy the benchmarks to Kubernetes, so I plan on documenting this.

Describe the solution you'd like

  • Dockerfile for packaging up benchmarks
  • Example YAML for running as a pod.

Here is the YAML I have been using:

apiVersion: v1
kind: Pod
metadata:
  name: tpch
  namespace: default
spec:
  containers:
    - image: andygrove/ballista-arm64
      command: [ "/tpch",
                 "benchmark",
                 "--query=1",
                 "--path=/mnt/tpch/parquet-sf100-partitioned/",
                 "--format=parquet",
                 "--concurrency=24",
                 "--iterations=1",
                 "--debug",
                 "--host=ballista-scheduler",
                 "--port=50050"]
      imagePullPolicy: Always
      name: tpch
      volumeMounts:
          - mountPath: /mnt/tpch/parquet-sf100-partitioned/
            name: data
  restartPolicy: Never
  volumes:
    - name: data
      persistentVolumeClaim:
        claimName: data-pv-claim

Describe alternatives you've considered
None

Additional context
None

Support trace id for each query

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
As a distributed query engine, many sql query will submit to the scheduler, and the scheduler or executor server will log some message for each query.

But from current status, we can't find out the logs corresponding to the specified query, we should search all the log and find the useful information for our query.

If we have the trace id or the query id, we can log the trace id or query in the log event format, just like
[time trace id code paht] : log info.

From above format, we can find out all of query log for the specified query.

@andygrove @alamb

Do you have other ideas?

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

ShuffleWriterExec::schema mismatch

Describe the bug

ShuffleWriterExec::schema() returns the schema of the underlying plan, however, ShuffleWriterExec::execute returns a stream of RecordBatch containing metadata and a consequently completely different schema.

To Reproduce

Use ShuffleWriterExec

Expected behavior

ExecutionPlan::schema should return the same schema as the SendableRecordBatchStream yielded by ExecutionPlan::execute.

Additional context

There is a potentially valid question as to why we have the schema stored in so many places...

Implement hash partitioned aggregation in Ballista

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
PR apache/datafusion#320 implemented hash partitioned aggregation in DataFusion. We should implement the same optimization in Ballista.

Describe the solution you'd like
DataFusion and Ballista should have consistent query plans.

Describe alternatives you've considered
None

Additional context
None

Upgrade dependency of arrow-datafusion to commit d0d5564b8f689a01e542b8c1df829d74d0fab2b0

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Recently there are many improvements and bug fixes in arrow-datafusion, especially the object store interface refactoring apache/datafusion#2677. It's better for us to upgrade this dependency. By the commit d0d5564b8f689a01e542b8c1df829d74d0fab2b0, the arrow will be upgraded to 18.0.0

Describe the solution you'd like

Describe alternatives you've considered

Additional context

Ballista does not support external file systems

I added the s3 (minio_store) module in datafusion/src/datasource/object_store, and registered the minio_store in benchmarks/tpch.rs through the register_object_store() method of ExecutionContext. But when I start the Scheduler and Executor, and then run "cargo run --bin tpch --release****", the data in minio cannot be read.
After checking the code, I found that LocalFileSystem is used directly at ballista/rust/core/src/serde/physical_plan/from_proto.rs(789) and ballista/rust/core/src/serde/logical_plan/from_proto.rs(201), so I modified these two codes to minio_store and it ran successfully.
How to make Ballista support external file system?

The project address after I added minio_store
https://github.com/ZhangqyTJ/arrow-datafusion.git
Modify the code before running
ballista/rust/core/src/serde/physical_plan/from_proto.rs(789)
ballista/rust/core/src/serde/logical_plan/from_proto.rs(201)
Run command
To run the scheduler from source:

cd $ARROW_HOME/ballista/rust/scheduler
RUST_LOG=info cargo run --release

By default the scheduler will bind to 0.0.0.0 and listen on port 50050.

To run the executor from source:

cd $ARROW_HOME/ballista/rust/executor
RUST_LOG=info cargo run --release

To run the benchmarks:

    cargo run --bin tpch --release benchmark ballista --host localhost --port 50050 --query 1 --partitions 1 --path s3://test1/tpch_tbl/cutdata --format tbl --storage-type minio --endpoint 192.168.75.81:9091 --username minioadmin --password minioadmin --bucket test1 

Unable to build master

Describe the bug

branch: master

device: macOs, latest rust tools & compiler version

after cloning , when running cargo buildat project root:

error: failed to select a version for the requirement parquet = "^16.0.0"
candidate versions found which didn't match: 15.0.0, 14.0.0, 13.0.0, ...
location searched: crates.io index
required by package datafusion v9.0.0 (https://github.com/apache/arrow-datafusion?rev=0289bfe6a98bdae371eee29d3f257b173ddb4437#0289bfe6)
... which satisfies git dependency datafusion of package ballista-benchmarks v5.0.0 (/Users/ybouraoui/arrow-ballista/benchmarks)

Ballista: Implement configuration mechanism

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
There is currently no way to specify configuration parameters such as default partition count in Ballista.

Describe the solution you'd like
As a user, I would like to be able to set configurations such as default partition count and have these settings propagated to the scheduler and executors via protobuf.

Describe alternatives you've considered
None

Additional context
None

Ballista context should get file metadata from scheduler, not from local disk

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
I have a Ballista cluster running, and each scheduler and executor has access to TPC-H data locally.
I am running the benchmark client on my desktop, and I do not have access to the data locally.
Query planning fails with "file not found" because BallistaContext::read_parquet is looking for the file on the local file system when it should be getting the file metadata from a scheduler in the cluster.

Describe the solution you'd like
The context should send a gRPC request to the scheduler to get the necessary metadata.

Describe alternatives you've considered
None

Additional context
None

Ballista: Executor must return statistics in CompletedTask / CompletedJob

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We cannot fix the shuffle mechanism until we have partition stats, or ShuffleReaderExec will attempt to read empty partitions, causing an error.

Describe the solution you'd like
Scheduler should receive partition stats and only try and read from non-empty shuffle partitions.

Describe alternatives you've considered
As a workaround we could write empty shuffle files for empty partitions.

Additional context
None

Support for multi-scheduler deployments

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
(This section helps Arrow developers understand the context and why for this feature, in addition to the what)

The current architecture only support single scheduler deployments. This has two problems I believe:

  1. It is not ideal for low-latency interactive queries as the scheduler can potentially take a significant time to restart in the case of failure (especially if the serialized state is large).
  2. Query planning itself is not always a trivial mount of work. For instance, when planning a ParquetExec, the scheduler may need to read metadata from a large number of parquet files to gather statistics.

Describe the solution you'd like
A clear and concise description of what you want to happen.

I would like to see Ballista support multiple schedulers. Essentially I would like to put N schedulers beings a load balancer and have both external clients and the executors send requests to the load balancer.

The current implementation is not designed for this as the scheduler state is essentially maintained in memory and only flushed to the state backend for the purposes of recovering from a scheduler restart.

I would propose the following high-level changes:

  1. Client-side caching, if required, should be encapsulated in the StateBackendClient so it can be a backend-specific concern.
  2. We should support redis as a state backend so we can hopefully avoid the need for client-side cacheing entirely.
  3. I think we can avoid the need for distributed locking by refactoring the backend state data structures to avoid the need for locking in the first place. This should mostly be a matter of tracking executor resources and stage task completions with atomic counters.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

We could support only single-scheduler deployments. Alternatively, we can try to support multi-scheduler deployments with etcd.

Additional context
Add any other context or screenshots about the feature request here.

Ectd/Redis and multi-scheduler deployments are, strictly speaking, completely separate issues but I did want to raise the question of whether etcd is the right default for non-standalone deployments. Likewise, if we do add support for Redis should we maintain support for both etcd and Redis or standardize on one or the other. I think these questions are highly related, but it makes sense to address them separately then I'm fine with that as well.

Ballista cluster sharing by multiple users (with fairness of resource allocation)

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Let's say we've an organization that deploys ballista cluster to which multiple users connect to. It'd be great if executor slots can be allocated to the users fairly. If a query has consumed all slots allotted to a user, it can't progress until more slots are generated (say by passage of time).

Describe the solution you'd like
Each query execution is associated with some identity, identities are rate limited for the slots and tasks for the identities are queued and consumed at allowed rate (or concurrency)

Describe alternatives you've considered

Additional context
Add any other context or screenshots about the feature request here.

Ballista Enhancement Overview

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Current Ballista implementation is more like a POC product for verification of whether it's able to run the Datafusion operators in a distributed way. It helps set up the whole framework and works well for just verification. However, it's a long way to introduce it to the production environment for real cases. This issue mainly raises several aspects we need to consider and to enhance for a more robust distributed execution framework.

In big data era, there're many scenarios. Two common ones are query for interactive analysis and batch processing for ETL purpose. There's no silver bullet. Each scenario has its own characteristics and has its own needs. In the following, I'll describe some enhancement we can do for each scenario.

For both interactive query and batch processing:

  • [Necessary] #6
  • [Necessary] apache/datafusion#1703
  • [Necessary] apache/datafusion#1704
  • [Necessary] Support to fast recovery of scheduler restarting
  • [Necessary] Support to better handle executor lost
  • [Necessary] Support to better manage configurations
  • [Nice to have] Support to schedule stages based on priority
  • [Nice to have] Support to cancel SQL or cancel Job
  • [Nice to have] Support executor blacklist

For interactive query:

  • [Necessary] Support push-based task assignment apache/datafusion#1221
  • [Necessary] Support better data exchange, don't spill to disk apache/datafusion#1805
  • [Necessary] Support better result fetching, don't spill to disk

For batch processing:

  • [Necessary] Support task speculative scheduling
  • [Necessary] Support shuffle fetch failure handling and retry
  • [Necessary] Support to reattempt some stages

Support multiple paths for ListingTableScanNode

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
After support apache/datafusion#2775
We could support multiple paths in ListingTableScanNode for ballista.

Describe the solution you'd like
update datafusion in ballista and change

message ListingTableScanNode {
  string table_name = 1;
  string path = 2;
  string file_extension = 3;

to

message ListingTableScanNode {
  string table_name = 1;
  repeated string path = 2;
  string file_extension = 3;
  ProjectionColumns projection = 4;

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Why not include the `ballista-cli` in the member of workspace

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
(This section helps Arrow developers understand the context and why for this feature, in addition to the what)

From https://github.com/apache/arrow-ballista/blob/a2c794e72f0b6e21b05bb8c05b38f68bba206a44/Cargo.toml#L27 toml file, the ballista-cli is exclude in the workspace.

Could you please explain this @andygrove ?

Describe the solution you'd like
A clear and concise description of what you want to happen.

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

Start ballista ui with docker, but it can not found ballista scheduler

Describe the bug
Start ballista ui with docker, but it can not found ballista scheduler

To Reproduce
Steps to reproduce the behavior:
I clone the core from master branch. And then use ./dev/build-ballista-docker.sh and ./dev/build-ui.sh finished the docker image build.
I start the image with a docker-compose.yaml , it`s the content:

version: "2.2"
services:
  ballista-scheduler:
    image: ballista:0.6.0
    command: "/scheduler  --bind-host 0.0.0.0 --bind-port 50050"
    ports:
      - "50050:50050"
    environment:
      - RUST_LOG=info
    volumes:
      - ./data1:/data1
  ballista-executor:
    image: ballista:0.6.0
    command: "/executor --bind-host 0.0.0.0 --bind-port 50051 --scheduler-host ballista-scheduler"
    ports:
      - "50051:50051"
    environment:
      - RUST_LOG=info
    volumes:
      - ./data1:/data1
    depends_on:
      - ballista-scheduler

  ballista-scheduler-ui:
    image: ballista-scheduler-ui:0.6.0
    ports:
      - "4000:80"
    environment:
      - RUST_LOG=debug
    volumes:
      - ./data1:/data1
    depends_on:
      - ballista-scheduler

The docker container is start success. But can not find scheduler in the ui.

image

Expected behavior
A clear and concise description of what you expected to happen.

Additional context
Add any other context about the problem here.

Ballista: UnresolvedShuffleExec and ShuffleReaderExec should show correct partitioning scheme

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Once apache/datafusion#750 is merged, UnresolvedShuffleExec and ShuffleReaderExec work correctly but they both report their output partitioning as unknown. This doesn't cause any functional issues because no further planning takes place that depends on this being correct, but this could be confusing to end users when viewing query plans. Also, in the future we may want to further optimize the plan during execution and this would require the output partitioning to be reported accurately.

Describe the solution you'd like
Populate the output partitioning in UnresolvedShuffleExec and ShuffleReaderExec and implement the associated serde code.

Describe alternatives you've considered
None

Additional context
None

Ballista should serialize Parquet statistics

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When the Ballista scheduler or executor deserializes a ParquetExec it collects the statistics again and this is redundant. We should serialize the statistics to avoid this extra work.

Describe the solution you'd like
Add Parquet statistics to serde module.

Describe alternatives you've considered
N/A

Additional context
N/A

Ballista 0.7.0 Release

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Once we have released DataFusion 8.0.0, I would like to release Ballista 0.7.0.

  • Draft release notes in Google doc here

Support sled path in config file.

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We have meet sled insert fail due to disk full. So we need to set the sled data save path in config file.

Scheduler will lost the registered executor when restart it in the `push` mode

Describe the bug
When i restart the schedule, the schedule lost all the information of registered executor

To Reproduce
Start a scheduler with below config:

scheduler_policy="PushStaged"

Start a executor with below config:

scheduler_port=50050
scheduler_host="localhost"
# PushStaged or PullStaged
task_scheduling_policy="PushStaged"

then kill the scheduler and restart the scheduler using the same config.

And the scheduler will lost all registered executor in the memory.

Expected behavior
We should recover this data in memory after the scheduler restart.

Solution:
heartbeat with the registered information for the executor

Additional context
Add any other context about the problem here.

Ballista standalone mode tests fail: `context::tests::test_task_stuck_when_referenced_task_failed`

Describe the bug
The following ballista test is failing (not sure when it started failing given the tests weren't run in CI until apache/datafusion#1839 )

---- context::tests::test_task_stuck_when_referenced_task_failed stdout ----
Found object store LocalFileSystem for path /Users/alamb/Software/arrow-datafusion/parquet-testing/data/single_nan.parquet
thread 'context::tests::test_task_stuck_when_referenced_task_failed' panicked at 'called `Result::unwrap()` on an `Err` value: Execution("Job RcB8xKy failed: Task failed due to Tokio error: DataFusion error: Execution(\"ArrowError(ParseError(\\\"Error parsing line 2: Error(UnequalLengths { pos: Some(Position { byte: 104, line: 3, record: 2 }), expected_len: 2, len: 1 })\\\"))\")")', ballista/rust/client/src/context.rs:541:42
stack backtrace:
   0: rust_begin_unwind
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/panicking.rs:498:5
   1: core::panicking::panic_fmt
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/panicking.rs:107:14
   2: core::result::unwrap_failed
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/result.rs:1613:5
   3: core::result::Result<T,E>::unwrap
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/result.rs:1295:23
   4: ballista::context::tests::test_task_stuck_when_referenced_task_failed::{{closure}}
             at ./src/context.rs:541:23
   5: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/future/mod.rs:80:19
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/future/future.rs:119:9
   7: tokio::runtime::basic_scheduler::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:516:48
   8: tokio::coop::with_budget::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/coop.rs:102:9
   9: std::thread::local::LocalKey<T>::try_with
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/thread/local.rs:399:16
  10: std::thread::local::LocalKey<T>::with
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/std/src/thread/local.rs:375:9
  11: tokio::coop::with_budget
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/coop.rs:95:5
  12: tokio::coop::budget
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/coop.rs:72:5
  13: tokio::runtime::basic_scheduler::CoreGuard::block_on::{{closure}}::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:516:25
  14: tokio::runtime::basic_scheduler::Context::enter
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:374:19
  15: tokio::runtime::basic_scheduler::CoreGuard::block_on::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:515:36
  16: tokio::runtime::basic_scheduler::CoreGuard::enter::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:582:57
  17: tokio::macros::scoped_tls::ScopedKey<T>::set
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/macros/scoped_tls.rs:61:9
  18: tokio::runtime::basic_scheduler::CoreGuard::enter
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:582:27
  19: tokio::runtime::basic_scheduler::CoreGuard::block_on
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:506:9
  20: tokio::runtime::basic_scheduler::BasicScheduler::block_on
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/basic_scheduler.rs:182:24
  21: tokio::runtime::Runtime::block_on
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.16.1/src/runtime/mod.rs:475:46
  22: ballista::context::tests::test_task_stuck_when_referenced_task_failed
             at ./src/context.rs:542:9
  23: ballista::context::tests::test_task_stuck_when_referenced_task_failed::{{closure}}
             at ./src/context.rs:473:11
  24: core::ops::function::FnOnce::call_once
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/ops/function.rs:227:5
  25: core::ops::function::FnOnce::call_once
             at /rustc/db9d1b20bba1968c1ec1fc49616d4742c1725b4b/library/core/src/ops/function.rs:227:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

To Reproduce
Get the code from apache/datafusion#1839 and run

cd arrow-datafusion/ballista
test --no-default-features --features standalone -- --ignored

Expected behavior
Test should pass

Additional context
Add any other context about the problem here.

Publish official Docker images as part of release process

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When we release DataFusion and Ballista we should publish Docker images to the Apache Docker Hub repo for the following:

  • DataFusion CLI
  • Ballista Scheduler + Executor

Describe the solution you'd like
We should see how Apache Airflow does this.

Describe alternatives you've considered
Alternative is relying on users to build and publish their own Docker images, as Apache Spark does.

Additional context
None

[EPIC] Add support for Substrait

[EDIT: Updated this on 2/25/23]

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

The substrait standard is gaining adoption and I would like to add support to Balllista. There are three different areas where we could potentially support Substrait:

  • ExecuteQueryParams currently accepts either LogicalPlan or a SQL string. We could add Substrait here as well, represented as a byte array. This would allow clients such as Ibis to submit queries directly to Ballista's gRPC service.
  • The executor currently receives tasks containing DataFusion physical plans. These plans could be serialized to Substrait and passed to other execution engines, such as DuckDB, Polars, and cuDF, making Ballista a general-purpose distributed query scheduler.
  • We currently use a proprietary protobuf format for representing plans in protobuf format. We could adopt Substrait here as well, or maybe just add a wrapper for Substrait plans.

Original description:

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Ballista (and DataFusion) has a proprietary protobuf-based format for serializing query plans. This really ties Ballista to DataFusion and does not allow other query engines and/or compute kernals to be used easily.

Describe the solution you'd like
There is now an emerging standard for query plan serialization at https://substrait.io/ and this is also protobuf-based. It would be good to move towards this over time.

Describe alternatives you've considered
None

Additional context
None

Ballista should support Arrow FlightSQL

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

In order to integrate with the wide ecosystem of database tools (tableau, etc) Ballista should support (J/O)DBC.

Describe the solution you'd like

Directly implementing xDBC drivers would be wasted effort, since the Arrow FlightSQL protocol already has xDBC drivers. Therefor Ballista should simply implement support for Arrow FlightSQL and call it a day.

Describe alternatives you've considered

Directly implementing a JDBC driver.

CI should run Ballista integration tests

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We are seeing PRs merged that cause regressions in Ballista because we currently rely on manual integration testing.

Describe the solution you'd like
CI should run the integration tests:

./dev/integration-tests.sh

Describe alternatives you've considered
None

Additional context
None

Add support for Jupyter notebooks in Ballista

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Jupyter notebooks are a popular tool for interacting with data science and "Big Data" systems and it would be nice if we could add support for this in Ballista.

Describe the solution you'd like

It probably makes sense to support writing Python code in the notebook, so this would depend on having Python bindings available for DataFusion & Ballista (see https://pypi.org/project/datafusion/).

Describe alternatives you've considered
None

Additional context
None

Enable benchmark data validation for distributed execution

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
The TPC-H benchmark suite already has a feature for verifying that results are correct when executing in-memory with DataFusion. It would be good to extend this support to distributed execution with Ballista.

Describe the solution you'd like
I would like an option to run the benchmark in data validation mode when executing against a Ballista cluster.

Describe alternatives you've considered
None

Additional context
None

[Ballista] Fix regression in `roundtrip_logical_plan_custom_ctx` test

Describe the bug
PR apache/datafusion#2537 fixed an issue where the csv scan methods were using the full URI instead of the path when serializing csv scans, which was not consistent with the way other scans worked (parquet, avro, json). Making this consistent led to a regression in roundtrip_logical_plan_custom_ctx so the test was ignored for now.

We should re-enable this test.

To Reproduce
Run roundtrip_logical_plan_custom_ctx.

Expected behavior
Functionality should be consistent between file types.

Additional context
None

Improve new top-level README for this project

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We need to improve the README

Describe the solution you'd like
The new README should:

  • Explain the Ballista vision
  • Show diagram, scheduler screenshot, and/or code example to make the README compelling
  • Document the current state of Ballista
  • Links to documentation and talks to learn more
  • Tell people how to get involved

Describe alternatives you've considered
None

Additional context
None

Implement Python bindings for BallistaContext

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We have Python bindings for DataFusion's ExecutionContext. It would be good to also support Ballista's BallistaContext so that we can use Python to run distributed queries.

Describe the solution you'd like
Probably something like this?

import ballista

ctx = ballista.BallistaContext
df = ctx.read_parquet(...)

Describe alternatives you've considered
Another approach might be to have ballista be an optional feature of DataFusion and then enable new methods on the DataFusion ExecutionContext instead but that would probably result in tons of additional dependencies and blur the lines between DataFusion and Ballista and I think there is a strong case for DataFusion=lib/embedded and Ballista=distributed.

Additional context
N/A

Ballista: Partition columns are duplicated in protobuf decoding.

Describe the bug
Trying to read a partitioned parquet dataset while still allowing predicate pushdown on partition columns, I am manually constructing a table scan Logical plan on a manually constructed ListingTable which specified the partition column(s). The ListingTable constructor will add the partition columns to the Schema. This is then serialized and sent to the ballista scheduler which will deserialize and construct a new ListingTable, which will again add the partition column to the schema and result in an error when constructing the DFSchema

To Reproduce
Steps to reproduce the behavior:

// Assume we have a paritioned parquet table
let table_path = "/path/to/table";

// Construct a ListingOptions
    let listing_options = ListingOptions {
        file_extension: String::new(),
        format: Arc::new(ParquetFormat::default()),
        table_partition_cols: vec!["my-partition-column".into()],
        collect_stat: true,
        target_partitions: 1,
    };

// Infer the schema
    let schema = listing_options.infer_schema(store.clone(), table_path).await?;

// Construct a ListingTable with our provider
    let provider = ListingTable::new(
        store,
        "/path/to/some/table".to_string(),
        schema,
        listing_options,
    );

// Create a table scan plan
let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, Arc::new(provider), None)?
        .limit(10)?
        .build()?;

// Create a DistributedExecQuery
let query = Arc::new(DistributedQueryExec::new(scheduler_url, config, plan));

// Execute the query
let stream = plan.execute(0).await?;

Expected behavior
A clear and concise description of what you expected to happen.
This should work and the planner should pushdown a filter on my-parition-column to the physical scan so we only read parquet files from the requested partitions.

Additional context
Add any other context about the problem here.
A simple way to fix this would be to check in the ListingTable constructor whether we already have the partition columns included in the schema:

    pub fn new(
        object_store: Arc<dyn ObjectStore>,
        table_path: String,
        file_schema: SchemaRef,
        options: ListingOptions,
    ) -> Self {
        // Add the partition columns to the file schema
        let mut table_fields = file_schema.fields().clone();
        for part in &options.table_partition_cols {
           // Only add the partition column if it doesn't already exist
            if table_fields.iter().find(|f| f.name() == part).is_none() {
                table_fields.push(Field::new(
                    part,
                    DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
                    false,
                ));
            }
        }

        Self {
            object_store,
            table_path,
            file_schema,
            table_schema: Arc::new(Schema::new(table_fields)),
            options,
        }
    }

When I try this locally it works in the sense that I don't get an error for duplicate fields, but I do get another error downstream. My guess is that this is because the partition column datatype is hard-coded but haven't debugged it fully.

Ballista assumes all aggregate expressions are not DISTINCT

Describe the bug
We have a hard-coded distinct = false parameter in ballista/rust/core/src/serde/physical_plan/mod.rs.

Ok(create_aggregate_expr(
    &aggr_function.into(),
    false, // <-- hard-coded "distinct"
    input_phy_expr.as_slice(),
    &physical_schema,
    name.to_string(),
)?)

To Reproduce
Try running a COUNT(DISTINCT expr) in Ballista

Expected behavior
We need to include the distinct flag in the protobuf for aggregate queries and implement the appropriate serde code.

Additional context
None

Introduce the object stores in datafusion-contrib as optional features

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

As a long running standalone system, it's better to provide a mechanism to register some object stores automatically rather than manually do the registration.

Describe the solution you'd like

Introduce a object store self detector for ObjectStoreRegistry. apache/datafusion#2906

Describe alternatives you've considered

Additional context

Improvements to Ballista extensibility

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
(This section helps Arrow developers understand the context and why for this feature, in addition to the what)

Currently, we are working with DataFusion/Ballista as a query execution engine. One of the primary selling points for DataFusion is extensibility but it is not currently possible to use the many extension points in DataFusion with Ballista.

This is primarily due to the constraints of serializing all logical and physical plans as Protobuf messages.

Ideally we would like to use Ballista to execute:

  • Scans using custom object stores
  • User Defined logical plan extensions
  • User defined physical plan extensions
  • User defined scalar and aggregation functions

Describe the solution you'd like
A clear and concise description of what you want to happen.

There are two things ideally:

  1. We would like to decouple the core Ballista functionality from the serializable representations of plans so that the serde layer can become pluggable/extensible.
  2. Serde should be aware of a user-defined ExecutionContext so we can leverage optimizers, extension planners, and udf/udaf

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

There currently is no workaround for this but we have been prototyping possible solutions which we'd be interested in upstreaming.

Additional context
Add any other context or screenshots about the feature request here.

[Discuss] Ballista Future Direction

No that Ballista is it’s own top-level project, I want to extend a previous discussion around what exactly Ballista is (previous discussion apache/datafusion#1916)

I believe the consensus from that discussion was that Ballista should be a standalone system but in practice I think we have adopted somewhat of a “both and” approach in the sense that we have added several extension point to Ballista (while also providing default implementations that allow you to run Ballista out-of-the-box). I think this is a reasonable direction and agree that it is important that Ballista be something you can use “as is”.

That raises the question of what are the use cases for Ballista that we would like to optimize for?

To take a concrete example, I think the current architecture is optimized for batch processing using a straightforward map-reduce implementation. This has many advantages:

  • Scheduling is relatively straightforward.
  • Cluster utilization is efficient since the unit of schedulable work is a single task, so whenever a task slot frees up you can just schedule a pending task on it.
  • It is resilient to task failures since all intermediate results are serialized to disk so recovering from a spurious task failure is just a matter of rescheduling the task.

However, for non-batch oriented work it has some serious drawbacks

  • Results can only be returned once the entire query finishes. We can’t start streaming results as soon as they are available.
  • Stages are scheduled sequentially so each stage is bound by its slowest partition.
  • Queries that return large resultsets can be quite resource intensive as each partition must serialize its entire shuffle result to disk

There has already been some excellent proof-of-concept work done on a different scheduling paradigm in apache/datafusion#1842 (something my team hopes to help push forward in the near future) but this raises some questions of it’s own, Namely, when do we use streaming vs map-reduce execution? In the PoC it is a very simple heuristic but in real uses cases I’m not sure there is a one-size-fits-all solution. In some cases you may want to use only streaming execution and use auto-scaling or dynamic partitioning to make sure each query can be scheduled promptly. Or you may only care about resource utilization and want to disable streaming execution entirely.

This is one particular example but you can imagine many others.

To bring this back around to some concrete options, I think there are a few different ways we can go:

  1. Ballista is a distributed computing framework which can be customized for many different use cases. There are default implementations available which allow you to use Ballista as a standalone system but the internal implementation is defined in terms of interfaces which allow for customized implementations as needed.
  2. Ballista is a standalone system with limited customization capabilities and is highly optimized for its intended use case. You can plugin certain things (ObjectStore implementations, UDFs, UDAFs, etc) but the core functionality (scheduler, state management, etc) is what it is and if it doesn’t work for your use case then this is not the right solution for you.
  3. Ballista is a standalone system which is highly configurable but not highly extensible. It ships with, for instance, schedulers optimized for different use cases which can be enabled through runtime configuration (or build-time features) but you can’t just plugin your own custom scheduler implementation (without upstreaming it :)).

[Ballista] Support to access remote object store, like HDFS, S3, etc

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

After introducing the object store API, to support to access remote object store for Ballista executors, there are still some gap. For example, as #22 and #10 mentioned, ballista is not able to support remote object store.

Describe the solution you'd like

Our workaround is to make the file path self described. For example, a local file path should be file://tmp/..., a hdfs file path should hdfs://localhost:xxx:/tmp/...

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.