
Distributed Join Project

Overview

This proof-of-concept repo implements the distributed repartitioned join algorithm. The algorithm consists of three steps:

  1. Hash partition: reorder input tables into partitions based on the hash values of the key columns.
  2. All-to-all communication: send each partition to the corresponding MPI rank so that rows with the same hash values end up on the same rank.
  3. Local join: each MPI rank performs a local join independently.
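
To make the three steps concrete, here is a minimal single-process sketch in C++, where toy std::vector tables stand in for cuDF tables and the partition index stands in for the MPI rank (an illustration of the algorithm, not the repo's implementation):

#include <cstdint>
#include <functional>
#include <iostream>
#include <unordered_map>
#include <utility>
#include <vector>

using Key = std::int64_t;         // matches the benchmark's key type
using Row = std::pair<Key, Key>;  // (key, payload)

int main() {
  constexpr int num_ranks = 4;    // stand-in for the number of MPI ranks
  std::vector<Row> build = {{1, 10}, {2, 20}, {3, 30}, {4, 40}};
  std::vector<Row> probe = {{2, 200}, {4, 400}, {5, 500}};

  // Step 1: hash partition. Bucket rows by the hash of the key column.
  auto partition = [&](const std::vector<Row>& table) {
    std::vector<std::vector<Row>> parts(num_ranks);
    for (const auto& row : table)
      parts[std::hash<Key>{}(row.first) % num_ranks].push_back(row);
    return parts;
  };
  auto build_parts = partition(build);
  auto probe_parts = partition(probe);

  // Step 2: all-to-all. In the real implementation, partition p of every
  // rank is sent to rank p; in this single-process sketch, partition p
  // already plays the role of rank p's local table.

  // Step 3: local join. Each "rank" joins its own partitions independently.
  for (int p = 0; p < num_ranks; ++p) {
    std::unordered_multimap<Key, Key> hash_table;
    for (const auto& row : build_parts[p]) hash_table.emplace(row.first, row.second);
    for (const auto& row : probe_parts[p]) {
      auto range = hash_table.equal_range(row.first);
      for (auto it = range.first; it != range.second; ++it)
        std::cout << "rank " << p << ": key " << row.first
                  << " payloads (" << it->second << ", " << row.second << ")\n";
    }
  }
  return 0;
}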

For more information about the algorithm and its optimizations, please refer to the ADMS'21 paper and the presentation.

For a production-quality distributed join implementation, check out cuDF's Dask integration.

The following plot shows the weak-scaling performance when joining the l_orderkey column of the lineitem table with the o_orderkey and o_orderpriority columns of the orders table, using the TPC-H dataset at scale factor 100k (SF100k).

[Figure: weak-scaling performance]

Compilation

This project depends on CUDA, UCX, NCCL, MPI, cuDF 0.19 and nvcomp 2.0.

Before compiling, make sure the environment variables CUDA_ROOT, CUDF_ROOT, MPI_ROOT, UCX_ROOT, NCCL_ROOT and NVCOMP_ROOT point to the installation paths of CUDA, cuDF, MPI, UCX, NCCL and nvcomp, respectively.
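
For example, with dependencies installed under /opt (the paths below are placeholders for your own installation prefixes):

export CUDA_ROOT=/opt/cuda
export CUDF_ROOT=/opt/cudf
export MPI_ROOT=/opt/openmpi
export UCX_ROOT=/opt/ucx
export NCCL_ROOT=/opt/nccl
export NVCOMP_ROOT=/opt/nvcomp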

The wiki page contains step-by-step instructions for setting up the environment.

To compile, run

mkdir build && cd build
cmake ..
make -j

Running

To run on systems that do not need InfiniBand (e.g. a single-node DGX-2):

UCX_MEMTYPE_CACHE=n UCX_TLS=sm,cuda_copy,cuda_ipc mpirun -n 16 --cpus-per-rank 3 bin/benchmark/distributed_join

On systems that need InfiniBand communication (e.g. single- or multi-node DGX-1V):

  • GPU-NIC affinity is critical on systems with multiple GPUs and NICs; refer to this page from QUDA for more details. You can also modify the run script included in the benchmark folder.
  • Depending on whether you run with srun or mpirun, update run_sample.sh to set lrank to $SLURM_LOCALID or $OMPI_COMM_WORLD_LOCAL_RANK, respectively (see the sketch below).
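
For example, the local rank selection in run_sample.sh could look like the following sketch (the actual script may differ):

if [ -n "$SLURM_LOCALID" ]; then
    lrank=$SLURM_LOCALID                 # launched with srun
else
    lrank=$OMPI_COMM_WORLD_LOCAL_RANK    # launched with Open MPI's mpirun
fi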

Example run on a single DGX-1V (all 8 GPUs):

$ mpirun -n 8 --bind-to none --mca btl ^openib,smcuda benchmark/run_sample.sh
rank 0 gpu list 0,1,2,3,4,5,6,7 cpu bind 1-4 ndev mlx5_0:1
rank 1 gpu list 0,1,2,3,4,5,6,7 cpu bind 5-8 ndev mlx5_0:1
rank 2 gpu list 0,1,2,3,4,5,6,7 cpu bind 10-13 ndev mlx5_1:1
rank 3 gpu list 0,1,2,3,4,5,6,7 cpu bind 15-18 ndev mlx5_1:1
rank 4 gpu list 0,1,2,3,4,5,6,7 cpu bind 21-24 ndev mlx5_2:1
rank 6 gpu list 0,1,2,3,4,5,6,7 cpu bind 30-33 ndev mlx5_3:1
rank 7 gpu list 0,1,2,3,4,5,6,7 cpu bind 35-38 ndev mlx5_3:1
rank 5 gpu list 0,1,2,3,4,5,6,7 cpu bind 25-28 ndev mlx5_2:1
Device count: 8
Rank 4 select 4/8 GPU
Device count: 8
Rank 5 select 5/8 GPU
Device count: 8
Rank 3 select 3/8 GPU
Device count: 8
Rank 7 select 7/8 GPU
Device count: 8
Rank 0 select 0/8 GPU
Device count: 8
Rank 1 select 1/8 GPU
Device count: 8
Rank 2 select 2/8 GPU
Device count: 8
Rank 6 select 6/8 GPU
========== Parameters ==========
Key type: int64_t
Payload type: int64_t
Number of rows in the build table: 800 million
Number of rows in the probe table: 800 million
Selectivity: 0.3
Keys in build table are unique: true
Over-decomposition factor: 1
Communicator: UCX
Registration method: preregistered
Compression: false
================================
Elasped time (s) 0.392133

For the arguments accepted by each benchmark, please refer to the source files in the benchmark folder.

Code formatting

This repo uses clang-format for code formatting. To format the code, make sure clang-format is installed and run

./run-clang-format.py -p <path to clang-format>

Contributors

ajschmidt8, esoha-nvidia, gaohao95, mike-wendt, nsakharnykh, pentschev, raydouglass

Issues

Investigate device memory usage outside memory pool

By default, the memory pool size is the total GPU memory minus 500 MB. During some runs that hit OOM, we observed that using a smaller memory pool resolved the issue. This indicates that the program uses a significant amount of device memory outside the memory pool. Tracking which allocations happen outside the pool and making sure they go through the memory pool should fix such issues.

Docker container

Setting up all the dependencies of this project (e.g. UCX, cuDF) can be complicated. It would be nice if this repo were containerized for easier setup.

Move to cuDF/rmm 0.14

Starting with 0.14, RMM removes rmmInitialize and replaces it with a default memory resource (i.e. the cnmem pool memory resource), so we should update the code accordingly.
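
A minimal sketch of the migration, assuming a recent RMM (the equivalent call in the 0.14 era was rmm::mr::set_default_resource, and constructor signatures vary by version):

#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>
#include <cstddef>

int main() {
  // Instead of calling rmmInitialize(), construct a pool resource explicitly
  // and install it as the default for subsequent device allocations.
  rmm::mr::cuda_memory_resource cuda_mr;
  rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource> pool_mr{
      &cuda_mr, std::size_t{1} << 30};  // initial pool size: 1 GiB, tune as needed
  rmm::mr::set_current_device_resource(&pool_mr);
  // ... the join now allocates through the default resource ...
  return 0;
}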

Algorithm description

Hi there, I was wondering if there's a high-level description of the distributed join algorithm available. Does it follow a textbook version, or is it something custom?

Investigate memory usage

We should check the device memory usage and compare it with what we projected in the modeling, with and without pipelining. We should also consider whether the extra device memory usage is due to our implementation or to fragmentation of the memory pool.

Memory leak in UCXBufferCommunicator

UCXBufferCommunicator allocates buffers for count and recv_buffer used in callbacks but never frees them. Although these buffers are small (8 bytes each), we should clean this up.

Improve overlapping efficiency

We should profile and improve the computation-communication overlap efficiency on

  • a single-node DGX with NVLink
  • multiple DGX nodes connected with IB

Remove batch ID from the tag

This should be possible, since UCX guarantees ordering of messages to the same node with the same tag. This change likely won't affect performance, but it would simplify the tag design.

Register the whole RMM memory pool upfront

Memory registration when using IB is costly, so we implemented UCXBufferCommunicator to avoid it. However, UCXBufferCommunicator introduces additional overhead. A better way to handle this is to preregister the whole RMM memory pool and just use the regular UCX communicator.

One potential implementation is a new memory resource class, just like the default CUDA memory resource currently in RMM but with registration, used as the upstream allocator for pool_memory_resource, as sketched below.
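
A minimal sketch of such a resource, assuming a recent RMM device_memory_resource interface and UCX's ucp_mem_map/ucp_mem_unmap (older RMM versions require a few more overrides; error handling omitted):

#include <rmm/cuda_stream_view.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/device_memory_resource.hpp>
#include <ucp/api/ucp.h>
#include <cstddef>
#include <map>

class registered_memory_resource : public rmm::mr::device_memory_resource {
 public:
  explicit registered_memory_resource(ucp_context_h context) : context_(context) {}

 private:
  void* do_allocate(std::size_t bytes, rmm::cuda_stream_view stream) override {
    void* ptr = upstream_.allocate(bytes, stream);
    // Register the allocation with UCX so that IB transfers do not pay a
    // per-message registration cost. Used as the upstream of
    // pool_memory_resource, this registers the whole pool once.
    ucp_mem_map_params_t params{};
    params.field_mask = UCP_MEM_MAP_PARAM_FIELD_ADDRESS | UCP_MEM_MAP_PARAM_FIELD_LENGTH;
    params.address    = ptr;
    params.length     = bytes;
    ucp_mem_h handle;
    ucp_mem_map(context_, &params, &handle);
    handles_[ptr] = handle;
    return ptr;
  }

  void do_deallocate(void* ptr, std::size_t bytes, rmm::cuda_stream_view stream) override {
    ucp_mem_unmap(context_, handles_[ptr]);
    handles_.erase(ptr);
    upstream_.deallocate(ptr, bytes, stream);
  }

  ucp_context_h context_;
  rmm::mr::cuda_memory_resource upstream_;
  std::map<void*, ucp_mem_h> handles_;
};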

Use RAII to manage communicator

Communicator objects manage resources such as UCX endpoints and communication buffers. Using RAII to manage Communicator avoids leaking these resources.
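
A minimal sketch of the idea, with a stub standing in for the repo's Communicator (names are hypothetical):

#include <iostream>

// Stand-in for the repo's Communicator; only setup/teardown matter here.
struct Communicator {
  void initialize() { std::cout << "endpoints/buffers created\n"; }
  void finalize()   { std::cout << "endpoints/buffers released\n"; }
};

// RAII wrapper: construction acquires, destruction releases, so every exit
// path (early return, exception) tears the communicator down exactly once.
class scoped_communicator {
 public:
  scoped_communicator()  { comm_.initialize(); }
  ~scoped_communicator() { comm_.finalize(); }
  scoped_communicator(const scoped_communicator&) = delete;
  scoped_communicator& operator=(const scoped_communicator&) = delete;
  Communicator& get() { return comm_; }

 private:
  Communicator comm_;
};

int main() {
  scoped_communicator comm;  // finalize() runs automatically at scope exit
  return 0;
}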

Query parameters from command line

Right now, parameters like build table size, probe table size, key type, payload type and selectivity are hard-coded in each test case and benchmark. Ideally, we should read these parameters from the command line.

Use cuDF's error checking utilities

The error checking utilities in this repo (currently located at src/error.cuh) should be aligned with cuDF's error checking utilities (cudf/utilities/error.hpp). I believe this will allow more code reuse. For example, generate_dataset/generate_dataset.cuh in this repo provides the same functionality as cuDF's join benchmark dataset generation.
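
For reference, usage of cuDF's utilities looks roughly like this (a usage sketch, not code from this repo):

#include <cudf/utilities/error.hpp>

void check_partitions(int num_partitions) {
  // Throws cudf::logic_error with the given message when the check fails.
  CUDF_EXPECTS(num_partitions > 0, "number of partitions must be positive");
}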

Replace cudaStreamDefault

cudaStreamDefault is a flag meant to be passed to cudaStreamCreateWithFlags, not a valid stream. We should replace such uses with either rmm::cuda_stream_default or 0, as illustrated below.
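
To illustrate the distinction (a standalone sketch):

#include <cuda_runtime.h>

int main() {
  // Correct: cudaStreamDefault is a flag for stream creation.
  cudaStream_t stream;
  cudaStreamCreateWithFlags(&stream, cudaStreamDefault);

  // When an API expects a stream handle, pass a real stream or 0 (the legacy
  // default stream), not the cudaStreamDefault flag; passing the flag only
  // "works" because its value happens to be 0.
  cudaStreamSynchronize(stream);
  cudaStreamDestroy(stream);
  return 0;
}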

Switch to CMake instead of using Makefile directly

Motivations:

  • Our Makefile is not robust. For example, @abc99lr has observed that if CUB_HOME is not supplied, the binary links against a different MPI library without any warning. Using a Makefile generator like CMake should avoid this type of issue in the future.
  • CMake can manage the THIRD_PARTY_HOME dependencies directly.

Update the code to work with cuDF 0.11+ (libcudf++)

  1. To query the size of a cuDF dtype, this project uses gdf_dtype_size, which was recently removed from cuDF upstream. For now, you can use my custom branch, which brings gdf_dtype_size back. We should check whether a more recent cuDF version provides a similar function.
  2. cuDF 0.11 introduces a new API for joins, and we should migrate to it: https://github.com/rapidsai/cudf/blob/e3632d6fd60de2ae2f2e0640957dfdf4ef985bb3/cpp/include/cudf/legacy/join.hpp#L67
  3. hash_partition has also changed; see the updated API here: https://github.com/rapidsai/cudf/blob/e3632d6fd60de2ae2f2e0640957dfdf4ef985bb3/cpp/include/cudf/hashing.hpp#L41

Coordinate buffer communicator and default UCX communicator

  • Currently, UCXCommunicator uses a different communication tag design than UCXBufferCommunicator. We should bring the two in line with each other.
  • Currently, when no communication buffer is available in the buffer queue, UCXBufferCommunicator fails. A better design would emit a warning and fall back to UCXCommunicator.
