
datafuselabs / databend


Data, Analytics & AI. Modern alternative to Snowflake. Cost-effective and simple for massive-scale analytics.


License: Other

Makefile 0.02% Rust 97.64% Dockerfile 0.03% Shell 1.60% Python 0.57% Jinja 0.14%
ai bigdata database rust serverless snowflake

databend's Issues

Build with a user-defined Docker Hub repository and tag

Description for this feature.
Currently fusequery only supports the datafusedev Docker Hub repository. For local test environments, we could let users define the hub and tag when building the image.

The result of "select * from system.numbers_mt limit 10000000000" is wrong

Only 10000 rows are returned.

:~/fuse$ nohup ./fusequery-v0.1.1-alpha-linux-aarch64 &
[2] 6366
:~/fuse$ nohup: ignoring input and appending output to 'nohup.out'

:~/fuse$ mysql -h127.0.0.1 -P3307
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 5.1.10-alpha-msql-proxy

Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> SELECT avg(number) FROM (select * from system.numbers_mt limit 10000000000);
| avg(number) |
|      4999.5 |
1 row in set (0.01 sec)

mysql> SELECT sum(number) FROM (select * from system.numbers_mt limit 10000000000);
| sum(number) |
|    49995000 |
1 row in set (0.01 sec)

mysql> SELECT count(number) FROM (select * from system.numbers_mt limit 10000000000);
| count(number) |
|         10000 |
1 row in set (0.00 sec)
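This sketch checks whether the three results agree with each other. It assumes system.numbers_mt yields 0, 1, 2, ... with no gaps. Under that assumption, n rows give sum = n(n-1)/2 and avg = (n-1)/2. The helper name is ours and is not part of FuseQuery.

```rust
/// Expected (count, sum, avg) for
/// `SELECT ... FROM (SELECT * FROM system.numbers_mt LIMIT n)`,
/// assuming the table yields 0, 1, 2, ... with no gaps or duplicates.
fn expected_aggregates(n: u64) -> (u64, u128, f64) {
    if n == 0 {
        // This sketch reports a sum of 0 and an undefined (NaN) average for zero rows.
        return (0, 0, f64::NAN);
    }
    let n_wide = n as u128;
    // Sum of 0..n-1 is n(n-1)/2; u128 keeps LIMIT 10000000000 from overflowing u64.
    let sum = n_wide * (n_wide - 1) / 2;
    // Mean of 0..n-1 is (n-1)/2.
    let avg = (n - 1) as f64 / 2.0;
    (n, sum, avg)
}
```

For n = 10000 the function returns (10000, 49995000, 4999.5), which are exactly the values returned above. All three queries therefore see 10000 rows and not the requested 10000000000. The u128 sum matters for the intended limit, because 10000000000 × 9999999999 / 2 does not fit in a u64.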

[build] add ARM build test


Add a test for the ARM platform.
Since arrow 3 does not compile with "cross build --target aarch64-unknown-linux-gnu", this issue has to wait for an upstream fix.

Cannot create table?

Hi, I'm new to both this database and ClickHouse. How can I insert data into the database?

ClickHouse protocol

ClickHouse SQL is increasingly becoming a de facto standard for OLAP databases.
ClickHouse is very powerful: it supports flexible storage engines and a rich set of features.

Supporting the ClickHouse protocol would therefore be very valuable.
It would let us work with clickhouse-client, both for testing and for loading data in a wide variety of formats into datafuse.

Support create table statement

Use case

Create tables the way MySQL does. For now, tables are kept only in memory.
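This is a minimal sketch of what keeping tables in memory could look like for CREATE TABLE. It uses a catalog keyed by (database, table) that exists only in process memory. MemoryCatalog, TableMeta and DataType are hypothetical names chosen for illustration. They are not FuseQuery's actual types.

```rust
use std::collections::HashMap;

/// Hypothetical column types, a tiny subset for illustration.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    UInt64,
    Utf8,
}

/// Hypothetical table metadata: column names and types.
#[derive(Debug, Clone, PartialEq)]
struct TableMeta {
    columns: Vec<(String, DataType)>,
}

/// Hypothetical in-memory catalog; nothing is persisted.
#[derive(Default)]
struct MemoryCatalog {
    // Keyed by (database, table).
    tables: HashMap<(String, String), TableMeta>,
}

impl MemoryCatalog {
    /// Mirrors MySQL's CREATE TABLE [IF NOT EXISTS]: creating an existing
    /// table is an error unless `if_not_exists` is set.
    fn create_table(&mut self, db: &str, name: &str, meta: TableMeta, if_not_exists: bool) -> Result<(), String> {
        let key = (db.to_string(), name.to_string());
        if self.tables.contains_key(&key) {
            return if if_not_exists {
                Ok(())
            } else {
                Err(format!("table {}.{} already exists", db, name))
            };
        }
        self.tables.insert(key, meta);
        Ok(())
    }

    /// Looks up a table's metadata, if it was created in this process.
    fn get_table(&self, db: &str, name: &str) -> Option<&TableMeta> {
        self.tables.get(&(db.to_string(), name.to_string()))
    }
}
```

The map is the only storage, so every table definition is lost on restart. That matches the current in-memory limitation.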


Support Local file reader


  • support system.tables [#141]
  • support the CREATE TABLE statement [#148]
  • support SHOW queries [#156]
  • create table stored as [parquet, csv]; the table is added to the datasource and currently kept in memory [#183]
  • Parquet and CSV readers [#183]
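On the CSV side, a local file reader can be quite small. The sketch below is deliberately naive: it handles no quoting or escaping and does no type inference, and it uses only std. read_simple_csv is a hypothetical helper. A real reader would more likely build on the csv crate or Arrow's CSV reader. The function returns the data column by column, which is closer to how a columnar engine consumes it.

```rust
use std::io::{self, BufRead};

/// Reads simple comma-separated text (no quoting or escaping) from any
/// buffered reader, e.g. `BufReader::new(File::open(path)?)` for a local file.
/// Returns the header and the data as one Vec per column.
fn read_simple_csv<R: BufRead>(reader: R) -> io::Result<(Vec<String>, Vec<Vec<String>>)> {
    let mut lines = reader.lines();
    let header: Vec<String> = match lines.next() {
        Some(line) => line?.split(',').map(|s| s.trim().to_string()).collect(),
        None => return Ok((Vec::new(), Vec::new())),
    };
    // Column-major storage, like an Arrow batch.
    let mut columns: Vec<Vec<String>> = vec![Vec::new(); header.len()];
    for line in lines {
        let line = line?;
        if line.trim().is_empty() {
            continue; // skip blank lines
        }
        let fields: Vec<&str> = line.split(',').collect();
        if fields.len() != header.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("expected {} fields, got {}", header.len(), fields.len()),
            ));
        }
        for (column, field) in columns.iter_mut().zip(fields) {
            column.push(field.trim().to_string());
        }
    }
    Ok((header, columns))
}
```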

question about Arrow plus MergeTree

Hi, I am very interested in this project's idea, but I have some questions. Do you use a columnar format from Arrow to persist all the data? How does that work together with MergeTree? I know that columnar formats such as Parquet or ORC are not suited to frequent rewrites, but the MergeTree engine needs to write data partitioned by key together, which causes heavy write amplification and performance degradation. How do you overcome this problem?

[planner] support the distributed plan fragment


Currently, every pipeline is built from a single fragment. For example, given the query:

SELECT SUM(number) from system.numbers_mt

The pipeline is built like this:

  └─ AggregateFinalTransform × 1 processor
    └─ Merge (AggregatePartialTransform × 8 processors) to (MergeProcessor × 1)
      └─ AggregatePartialTransform × 8 processors
        └─ SourceTransform × 8 processors

All 8-way parallelism runs locally. We need to split the plan into multiple fragments so that some of them can run on remote nodes:

  └─ AggregateFinalTransform × 1 processor
    └─ Merge (AggregatePartialTransform × 8 processors) to (MergeProcessor × 1)
      └─ AggregatePartialTransform@remote1 × 2 processors
        └─ SourceTransform@remote1 × 2 processors
      └─ AggregatePartialTransform@remote2 × 2 processors
        └─ SourceTransform@remote2 × 2 processors
      └─ AggregatePartialTransform@local × 4 processors
        └─ SourceTransform@local × 4 processors

Local and remote streams are connected through the ExchangeStream port.
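In the 8-way example the processors split 2 + 2 + 4 across remote1, remote2 and local. The sketch below shows one way a planner might hand source partitions to fragments in proportion to each node's processor count. Node and split_partitions are hypothetical names. Real fragment scheduling, including the ExchangeStream wiring, is out of scope.

```rust
/// Hypothetical node description: a name plus the number of processors
/// (parallel pipelines) it runs for this fragment.
struct Node {
    name: &'static str,
    processors: usize,
}

/// Splits `num_parts` source partitions across nodes in proportion to their
/// processor counts. Partitions lost to rounding down go one at a time to the
/// earliest nodes. Returns (node name, partition indices) per node.
fn split_partitions(nodes: &[Node], num_parts: usize) -> Vec<(&'static str, Vec<usize>)> {
    let total: usize = nodes.iter().map(|n| n.processors).sum();
    assert!(total > 0, "at least one processor is required");
    // Base share for each node, rounded down.
    let mut shares: Vec<usize> = nodes.iter().map(|n| num_parts * n.processors / total).collect();
    // Distribute the remainder round-robin.
    let mut remaining = num_parts - shares.iter().sum::<usize>();
    let mut i = 0;
    while remaining > 0 {
        shares[i % nodes.len()] += 1;
        remaining -= 1;
        i += 1;
    }
    // Give each node a contiguous range of partition indices.
    let mut next = 0;
    nodes
        .iter()
        .zip(shares)
        .map(|(node, share)| {
            let parts: Vec<usize> = (next..next + share).collect();
            next += share;
            (node.name, parts)
        })
        .collect()
}
```

With remote1 × 2, remote2 × 2 and local × 4 and 8 partitions, this assigns [0, 1], [2, 3] and [4, 5, 6, 7], one partition per processor.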

Distributed query engine

To build a distributed query engine, the tasks are:

  • Module for cluster metadata maintenance [#69]
  • RPC API for the planner executor [#112 #114 #116]
  • Remote transform to adapt local and remote pipeline streams [#119]
  • Distributed plan scheduler [#132]
  • Distributed pipeline build [#136 #143]
  • REST API for cluster node add/remove [#150]
  • Add system.clusters table [#155]
  • Steal work from remote executors [#167]

planner/processor digraph


thinkpad :) explain pipeline graph=1 select sum(number) from system.numbers_mt;

SELECT sum(number)
FROM system.numbers_mt

Query id: 00993332-0c4f-4039-a24c-8c65d0f6902f

digraph
{
  rankdir="LR";
  { node [shape = box]
        n1 [label="Numbers × 8"];
    subgraph cluster_0 {
      label ="Aggregating";
      style=filled;
      color=lightgrey;
      node [style=filled,color=white];
      { rank = same;
        n3 [label="AggregatingTransform × 8"];
        n4 [label="Resize"];
      }
    }
    subgraph cluster_1 {
      label ="Expression";
      style=filled;
      color=lightgrey;
      node [style=filled,color=white];
      { rank = same;
        n2 [label="ExpressionTransform × 8"];
      }
    }
    subgraph cluster_2 {
      label ="Expression";
      style=filled;
      color=lightgrey;
      node [style=filled,color=white];
      { rank = same;
        n5 [label="ExpressionTransform"];
      }
    }
  }
  n1 -> n2 [label="× 8"];
  n3 -> n4 [label="× 8"];
  n4 -> n5 [label=""];
  n2 -> n3 [label="× 8"];
}
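display_graphviz is still a TODO in FuseQuery. Here is a hedged sketch of the core of such a printer. It renders a linear pipeline as a Graphviz digraph with "× N" edge labels, in the same shape as the ClickHouse output above but without the subgraph clusters. Stage and pipeline_to_dot are hypothetical names. Each edge is labelled with the parallelism of its upstream stage, which reproduces the labels shown above.

```rust
use std::fmt::Write;

/// Hypothetical pipeline stage: a label and its parallelism.
struct Stage {
    label: &'static str,
    parallelism: usize,
}

/// Renders a linear pipeline (source first) as a Graphviz digraph.
fn pipeline_to_dot(stages: &[Stage]) -> String {
    let mut out = String::from("digraph\n{\n  rankdir=\"LR\";\n  { node [shape = box]\n");
    for (i, stage) in stages.iter().enumerate() {
        let label = if stage.parallelism > 1 {
            format!("{} × {}", stage.label, stage.parallelism)
        } else {
            stage.label.to_string()
        };
        // Writing into a String cannot fail, so unwrap is safe here.
        writeln!(out, "    n{} [label=\"{}\"];", i + 1, label).unwrap();
    }
    out.push_str("  }\n");
    // One edge per adjacent pair, labelled with the upstream parallelism.
    for (i, pair) in stages.windows(2).enumerate() {
        let edge = if pair[0].parallelism > 1 {
            format!("× {}", pair[0].parallelism)
        } else {
            String::new()
        };
        writeln!(out, "  n{} -> n{} [label=\"{}\"];", i + 1, i + 2, edge).unwrap();
    }
    out.push('}');
    out
}
```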

Code in printPipeline:

In FuseQuery, display_graphviz TODO:

Documentation for SHOW statements


Add documentation for SHOW:

mysql> show tables;
| name       |
| clusters   |
| one        |
| functions  |
| numbers    |
| settings   |
| numbers_mt |
| tables     |
7 rows in set (0.01 sec)

mysql> show settings;
| name           |
| max_block_size |
| max_threads    |
| default_db     |
3 rows in set (0.01 sec)

Group By


Build the planner for GROUP BY SQL.
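Whatever plan the GROUP BY planner produces has to drive a hash aggregation in the end. The sketch below assumes SUM over u64 keys and values. group_by_sum and merge_partial are hypothetical helpers. They show a partial/final split: each parallel stream builds its own partial hash table, and the final step merges those tables. This is the same split as the AggregatePartialTransform and AggregateFinalTransform stages in the pipelines shown in these issues.

```rust
use std::collections::HashMap;

/// Partial aggregation for `SELECT key, SUM(value) FROM t GROUP BY key`
/// over one stream of (key, value) rows.
fn group_by_sum(rows: &[(u64, u64)]) -> HashMap<u64, u64> {
    let mut groups = HashMap::new();
    for &(key, value) in rows {
        *groups.entry(key).or_insert(0) += value;
    }
    groups
}

/// Final aggregation: merges one partial state into the accumulator.
fn merge_partial(mut acc: HashMap<u64, u64>, partial: HashMap<u64, u64>) -> HashMap<u64, u64> {
    for (key, sum) in partial {
        *acc.entry(key).or_insert(0) += sum;
    }
    acc
}
```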


Support system.clusters table

Show information about the nodes in the cluster:

SELECT * FROM system.clusters;

The table schema (a Memory table) looks like:

| name         | type   |
| cluster      | String |
| node_weight  | UInt32 |
| host_name    | String |
| host_address | String |
| port         | UInt16 |
| cpus         | UInt16 |
| is_local     | UInt8  |
| user         | String |
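The schema maps directly onto a Rust row type if UInt32, UInt16 and UInt8 become u32, u16 and u8. ClusterRow is a hypothetical name for a sketch like the one below. The meaning given for is_local in the comment is an assumption based only on the column name.

```rust
/// Hypothetical row type mirroring the system.clusters schema.
#[derive(Debug, Clone, PartialEq)]
struct ClusterRow {
    cluster: String,
    node_weight: u32,
    host_name: String,
    host_address: String,
    port: u16,
    cpus: u16,
    is_local: u8, // assumed: 1 if the row describes the node serving the query
    user: String,
}

impl ClusterRow {
    /// Column names in schema order.
    const COLUMNS: [&'static str; 8] = [
        "cluster", "node_weight", "host_name", "host_address",
        "port", "cpus", "is_local", "user",
    ];

    /// Renders the row as strings in the same column order as `COLUMNS`.
    fn to_values(&self) -> Vec<String> {
        vec![
            self.cluster.clone(),
            self.node_weight.to_string(),
            self.host_name.clone(),
            self.host_address.clone(),
            self.port.to_string(),
            self.cpus.to_string(),
            self.is_local.to_string(),
            self.user.clone(),
        ]
    }
}
```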


[planner] add a plan walker to visit internal nodes more easily

Currently we convert all plan nodes into an array. With the walker pattern, this becomes simple and clear:

plan_node.walk(|node| {
    match node { ... }
});

This can be used in the optimizer, the pipeline builder and the rewriter:

  • Change Pipeline builder to walk function #80
  • Change Optimizer to walk function #80
  • Change the plan display (indent and Graphviz) to walk function #82
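Here is a self-contained sketch of the walker pattern over a simplified plan tree. PlanNode is a hypothetical enum with only a few variants and is not the real plan node type. walk performs a pre-order traversal, and the callback can return false to stop early. That helps, for example, when an optimizer only needs to find the first matching node.

```rust
/// Hypothetical, simplified plan tree.
#[allow(dead_code)]
#[derive(Debug)]
enum PlanNode {
    ReadDataSource { table: String },
    Filter { predicate: String, input: Box<PlanNode> },
    AggregatorPartial { input: Box<PlanNode> },
    AggregatorFinal { input: Box<PlanNode> },
}

impl PlanNode {
    /// Direct children of this node (every variant here has at most one).
    fn children(&self) -> Vec<&PlanNode> {
        match self {
            PlanNode::ReadDataSource { .. } => Vec::new(),
            PlanNode::Filter { input, .. }
            | PlanNode::AggregatorPartial { input }
            | PlanNode::AggregatorFinal { input } => vec![&**input],
        }
    }

    /// Pre-order walk: calls `f` on this node, then on each descendant.
    /// Returning `false` from `f` stops the whole walk.
    fn walk<F: FnMut(&PlanNode) -> bool>(&self, mut f: F) {
        fn visit(node: &PlanNode, f: &mut dyn FnMut(&PlanNode) -> bool) -> bool {
            if !f(node) {
                return false;
            }
            node.children().into_iter().all(|child| visit(child, f))
        }
        visit(self, &mut f);
    }
}
```

Because `children` returns a list, the same walker keeps working once multi-input nodes such as joins are added.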

how to run fusequery-v0.1.1-alpha-linux-armv7

:~/fuse$ ./fusequery-v0.1.1-alpha-linux-armv7 
=== Exagear internal error ===
pid = 10299
tid = 10299
cpu = 4
argv[0] = /opt/exagear/ubt_a32a64
prctl(PR_SET_MM_MAP) failed: Invalid argument
Aborted (core dumped)

Replace DataBlock with Arrow RecordBatch


Currently, DataBlock must be converted to an Arrow RecordBatch in some cases, for example (among other places):

DataBlock and RecordBatch largely overlap in functionality, so using RecordBatch directly would be better.
[1] DataBlock
[2] RecordBatch

Too many threads created in

The start() function creates a thread pool:

    pub fn start(&self) -> FuseQueryResult<()> {
        let listener =
            net::TcpListener::bind(format!("{}", self.opts.mysql_handler_port)).unwrap();

        let pool = ThreadPool::new(4096);

When I run fuse-query on my MacBook Pro, it fails: thread 'tokio-runtime-worker' panicked at 'called Result::unwrap() on an Err value: Os { code: 35, kind: WouldBlock, message: "Resource temporarily unavailable" }', /Users/caoxiaoyong/.cargo/registry/src/
stack backtrace:
0: 0x1082a43ac - std::backtrace_rs::backtrace::libunwind::trace::hd4e4f36eb9f0f866
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/../../backtrace/src/backtrace/
1: 0x1082a43ac - std::backtrace_rs::backtrace::trace_unsynchronized::h713e8299cd20066f
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/../../backtrace/src/backtrace/
2: 0x1082a43ac - std::sys_common::backtrace::_print_fmt::h10db128357fb183a
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/sys_common/
3: 0x1082a43ac - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::ha6f5ddd5259d3e16
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/sys_common/
4: 0x1082c001d - core::fmt::write::hefe95a44532fe6ed
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/core/src/fmt/
5: 0x10829fac6 - std::io::Write::write_fmt::hf513c99961415bff
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/io/
6: 0x1082a62b9 - std::sys_common::backtrace::_print::he8e57200c2a0a691
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/sys_common/
7: 0x1082a62b9 - std::sys_common::backtrace::print::hab21ea437dd3480e
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/sys_common/
8: 0x1082a62b9 - std::panicking::default_hook::{{closure}}::h70a3a940826edea5
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/
9: 0x1082a5e40 - std::panicking::default_hook::h9520f36dd50be056
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/
10: 0x1082a693b - std::panicking::rust_panic_with_hook::h102f8bee4e0ef4c7
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/
11: 0x1082a6465 - std::panicking::begin_panic_handler::{{closure}}::hb72eee9aad2e147c
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/
12: 0x1082a4868 - std::sys_common::backtrace::__rust_end_short_backtrace::h372ff87ecb2667f3
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/sys_common/
13: 0x1082a63ca - rust_begin_unwind
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/
14: 0x1082ce8ef - core::panicking::panic_fmt::h261fd45d36f74dfa
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/core/src/
15: 0x1082ce7f5 - core::option::expect_none_failed::hf2a36714418ff407
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/core/src/
16: 0x10810a2e2 - threadpool::spawn_in_pool::hf5127a9147a81622
17: 0x108109e67 - threadpool::Builder::build::hbfe641a452a12f1e
18: 0x10810a0b6 - threadpool::ThreadPool::new::h36ebd4c5ae8c87c6
19: 0x108081646 - fuse_query::servers::mysql::mysql_handler::MySQLHandler::start::h6b0310d2b8b54387
20: 0x10802d226 - <core::future::from_generator::GenFuture as core::future::future::Future>::poll::h2f71a80a373508ff
21: 0x10803079e - tokio::runtime::task::core::Core<T,S>::poll::hb3c0728aebbf4b3f
22: 0x108028448 - <std::panic::AssertUnwindSafe as core::ops::function::FnOnce<()>>::call_once::h36c2507424199f31
23: 0x1080251cb - tokio::runtime::task::harness::Harness<T,S>::poll::h45dd6c7e49284806
24: 0x108267cd8 - std::thread::local::LocalKey::with::h162dda8b5d89bc0e
25: 0x1082710b6 - tokio::runtime::thread_pool::worker::Context::run_task::he46df3ea7ce91956
26: 0x10827035b - tokio::runtime::thread_pool::worker::Context::run::h63705a7d38108a39
27: 0x108265e23 - tokio::macros::scoped_tls::ScopedKey::set::h336c692f64a9e74e
28: 0x1082701cd - tokio::runtime::thread_pool::worker::run::h03f0c263fd4dd28d
29: 0x108257640 - tokio::loom::std::unsafe_cell::UnsafeCell::with_mut::he0230cd105d9acc3
30: 0x108254443 - <std::panic::AssertUnwindSafe as core::ops::function::FnOnce<()>>::call_once::hcdd61b3f981481cb
31: 0x108257e38 - tokio::runtime::task::raw::poll::h7ecfdb64b9ae978c
32: 0x10826d7e3 - tokio::runtime::blocking::pool::Inner::run::h8c4016b5ae5b81af
33: 0x10826051a - std::sys_common::backtrace::__rust_begin_short_backtrace::h46a045006015dc24
34: 0x108256896 - core::ops::function::FnOnce::call_once{{vtable.shim}}::h8662e0c5f894df07
35: 0x1082a9a3d - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce>::call_once::h1658d06ba2d41cb5
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/alloc/src/
36: 0x1082a9a3d - <alloc::boxed::Box<F,A> as core::ops::function::FnOnce>::call_once::h747cc083e7349205
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/alloc/src/
37: 0x1082a9a3d - std::sys::unix::thread::Thread::new::thread_start::h93dd3097fa4fa219
at /rustc/11c94a197726b6a981828cb1837d7c3eed1b841d/library/std/src/sys/unix/
38: 0x7fff7057e109 - __pthread_start
^Cmake: *** [run] Interrupt: 2
I noticed that the start() function creates 4096 threads. When I changed the number of threads to 128, everything worked.
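One way to avoid hard-coding 4096 is to size the pool from the machine and cap it with a setting. The sketch below uses only std and is not the threadpool crate that appears in the backtrace. WorkerPool and the max_threads cap are hypothetical names.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Minimal fixed-size worker pool sized from the machine, not a constant.
struct WorkerPool {
    sender: Option<mpsc::Sender<Job>>,
    workers: Vec<thread::JoinHandle<()>>,
}

impl WorkerPool {
    /// Uses one thread per available CPU, capped by `max_threads` (at least 1).
    fn new(max_threads: usize) -> WorkerPool {
        let cpus = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
        let size = cpus.min(max_threads).max(1);
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers: Vec<thread::JoinHandle<()>> = (0..size)
            .map(|_| {
                let rx = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock guard is dropped at the end of this statement,
                    // so it is not held while the job runs.
                    let job = rx.lock().unwrap().recv();
                    match job {
                        Ok(job) => job(),
                        Err(_) => break, // channel closed: pool is shutting down
                    }
                })
            })
            .collect();
        WorkerPool { sender: Some(sender), workers }
    }

    fn size(&self) -> usize {
        self.workers.len()
    }

    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.as_ref().expect("pool is running").send(Box::new(f)).expect("workers alive");
    }
}

impl Drop for WorkerPool {
    fn drop(&mut self) {
        // Dropping the sender lets workers finish queued jobs, then exit.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}
```

Sizing by available_parallelism suits CPU-bound work. A handler that blocks one thread per MySQL connection may need a larger cap, but the cap should still be configurable rather than a fixed 4096.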

[admin] /v1/cluster/addnode REST API

curl --location --request POST 'localhost:3030/v1/cluster/addnode' \
--header 'Content-Type: application/json' \
--data-raw '{
    "address": "",
    "cpus": 8
}'

Add cluster mode for fuse-test


To make fuse-test support both cluster and standalone modes (only standalone is supported now), the initial plan is:
[1] Start two servers in

 # node-9090
 sudo nohup /fuse-query &

 # node-9091
 sudo nohup /fuse-query --http-api-address= --metric-api-address= --mysql-handler-port=3308 --rpc-api-address=

[2] Add nodes to

#Add node-9090 
curl -X POST -H "Content-Type: application/json" -d '{"name":"9090","address":"","cpus":8}'

#Add node-9091
curl -X POST -H "Content-Type: application/json" -d '{"name":"9091","address":"","cpus":8}'

[3] node-9090 runs in a cluster of 2 nodes, and node-9091 runs standalone
[4] Add 1_cluster tests for cluster mode?

work stealing scheduler for partitions aware parallelism

The purpose of this scheduler is to maximize both local parallelism (SMP) and distributed parallelism (MPP), so that resources are fully utilized across workers.
The initial design:

  1. a global worker pool: the key is the session context UUID, the value is the partitions
  2. each datasource pipeline executor fetches N partitions from the global worker pool by UUID
  3. the pipeline executor executes them and fetches again, until the pool is empty
  4. if the global worker pool is empty, the executor tries to steal from other nodes
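This sketch follows the four steps under simplifying assumptions. The UUID is a plain String, a partition is a u64 id, and a "remote" pool is just another local struct, where a cluster would put an RPC call. PartitionPool and run_executor are hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

/// Step 1: global pool, key = session context UUID, value = pending partitions.
#[derive(Default)]
struct PartitionPool {
    queues: Mutex<HashMap<String, VecDeque<u64>>>,
}

impl PartitionPool {
    fn add(&self, uuid: &str, parts: impl IntoIterator<Item = u64>) {
        self.queues.lock().unwrap().entry(uuid.to_string()).or_default().extend(parts);
    }

    /// Step 2: fetch up to `n` partitions for `uuid`; empty when exhausted.
    fn fetch(&self, uuid: &str, n: usize) -> Vec<u64> {
        let mut queues = self.queues.lock().unwrap();
        match queues.get_mut(uuid) {
            Some(queue) => {
                let k = n.min(queue.len());
                queue.drain(..k).collect()
            }
            None => Vec::new(),
        }
    }
}

/// Steps 3 and 4: keep fetching batches of `n` from the local pool until it is
/// empty, then steal from each remote pool in turn. `process` stands in for
/// running the pipeline on one partition.
fn run_executor(uuid: &str, n: usize, local: &PartitionPool, remotes: &[&PartitionPool], mut process: impl FnMut(u64)) {
    for pool in std::iter::once(local).chain(remotes.iter().copied()) {
        loop {
            let batch = pool.fetch(uuid, n);
            if batch.is_empty() {
                break;
            }
            batch.into_iter().for_each(&mut process);
        }
    }
}
```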

Pretty display for the pipeline with remote transform

Given a cluster with 3 nodes and the query:

select sum(number+1)+2 as sumx from system.numbers_mt(100000)

First, the distributed query plan:

AggregatorFinal: groupBy=[[]], aggr=[[(sum([(number + 1)]) + 2) as sumx]]
  RedistributeStage[state: AggregatorMerge, id: 0]
    AggregatorPartial: groupBy=[[]], aggr=[[(sum([(number + 1)]) + 2) as sumx]]
      ReadDataSource: scan parts [8](Read from system.numbers_mt table, Read Rows:100000, Read Bytes:800000)

Then, the pipeline:

AggregatorFinalTransform × 1 processor
  Merge (RemoteTransform × 3 processors) to (AggregatorFinalTransform × 1)
    RemoteTransform × 3 processors

The pipeline display should include more RemoteTransform information. The expected output is:

AggregatorFinalTransform × 1 processor
  Merge (RemoteTransform × 3 processors) to (AggregatorFinalTransform × 1)
    RemoteTransform × 3 processor(s): AggregatorPartialTransform × 8 processors -> SourceTransform × 8 processors
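To print the expected line, a remote transform needs to carry a summary of the pipeline that runs on the remote node. The sketch below is a hypothetical model in which Processor and display_processor are illustrative names. It produces exactly the expected RemoteTransform line above.

```rust
/// Hypothetical processor description; `Remote` keeps the remote pipeline's
/// stages as (name, parallelism) pairs, top-down, for display.
enum Processor {
    Local { name: &'static str, parallelism: usize },
    Remote { parallelism: usize, remote: Vec<(&'static str, usize)> },
}

fn plural(n: usize) -> &'static str {
    if n == 1 { "processor" } else { "processors" }
}

/// Renders one pipeline line in the style of the expected output.
fn display_processor(p: &Processor) -> String {
    match p {
        Processor::Local { name, parallelism } => {
            format!("{} × {} {}", name, parallelism, plural(*parallelism))
        }
        Processor::Remote { parallelism, remote } => {
            let inner: Vec<String> = remote
                .iter()
                .map(|(name, n)| format!("{} × {} {}", name, n, plural(*n)))
                .collect();
            format!("RemoteTransform × {} processor(s): {}", parallelism, inner.join(" -> "))
        }
    }
}
```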

The partition_queue in the session context is corrupted by an EXPLAIN PIPELINE query.

Currently, all queries in a session share one FuseQueryContext, so partition_queue is shared as well. As a result, there is a bug when we execute EXPLAIN and then execute a SELECT query:

mysql> explain pipeline select sum(number) from system.numbers(10);
| explain |
  └─ AggregateFinalTransform × 1 processor
    └─ Merge (AggregatePartialTransform × 8 processors) to (MergeProcessor × 1)
      └─ AggregatePartialTransform × 8 processors
        └─ SourceTransform × 8 processors
1 row in set (0.00 sec)

mysql> select sum(number) from system.numbers(10);
| Sum(number) |
|          90 |
1 row in set (0.01 sec)

mysql> select sum(number) from system.numbers(10);
| Sum(number) |
|          45 |
1 row in set (0.01 sec)

The simplest fix is to keep partition_queue empty during an EXPLAIN query. The longer-term fix is to split the context into a global context (global_context) and a session context.
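The bug can be reproduced with a toy model. Context, plan_numbers_10, explain, select_sum and select_sum_isolated are hypothetical stand-ins, not FuseQuery code. In the model, planning numbers(10) pushes two partitions covering 0..10 into the queue, EXPLAIN plans without executing, and SELECT drains whatever is queued. The isolated variant models the longer-term fix of giving each query its own context.

```rust
use std::collections::VecDeque;

/// Hypothetical context whose partition queue is shared by every query in a session.
#[derive(Default)]
struct Context {
    partition_queue: VecDeque<Vec<u64>>, // each partition holds its row values
}

/// Planning a read of numbers(10) queues two partitions covering 0..10.
fn plan_numbers_10(ctx: &mut Context) {
    ctx.partition_queue.push_back((0..5).collect());
    ctx.partition_queue.push_back((5..10).collect());
}

/// EXPLAIN plans the query but never executes it, so its partitions stay queued.
fn explain(ctx: &mut Context) {
    plan_numbers_10(ctx);
}

/// SELECT sum(number): plans, then drains everything in the shared queue,
/// including partitions left behind by an earlier EXPLAIN.
fn select_sum(ctx: &mut Context) -> u64 {
    plan_numbers_10(ctx);
    ctx.partition_queue.drain(..).flatten().sum()
}

/// Longer-term fix: each query gets its own context, so nothing leaks between queries.
fn select_sum_isolated() -> u64 {
    let mut query_ctx = Context::default();
    plan_numbers_10(&mut query_ctx);
    query_ctx.partition_queue.drain(..).flatten().sum()
}
```

Calling explain and then select_sum twice on the same context gives 90 and then 45, matching the session above. select_sum_isolated always gives 45.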

Try to use cargo workspace


Combine FuseQuery and FuseStore into one project (datafuse), with directories and files laid out like this:

├── Cargo.lock
├── Cargo.toml
├── docker
├── scripts
├── proto
├── query
│   ├── Cargo.toml
│   └── src
│       └── ...
├── storage
│   ├── Cargo.toml
│   └── src
│       └── ..
└── target
