Giter Club home page Giter Club logo

ml-perf's Introduction

Laurae2 R-package

The sequel to Laurae R-package.

Each function has at least one corresponding vignette to look up for an example using help_me("my_function_name").

Installation

It can be computationally expensive to build vignettes. Build without vignettes using the following:

devtools::install_github("Laurae2/Laurae2")

If you want to build vignettes to get a significantly better help:

devtools::install_github("Laurae2/Laurae2", build_vignettes = TRUE)

Pre-requirement installation:

install.packages("devtools")
install.packages(c("knitr", "rmarkdown", "mlrMBO", "lhs", "smoof", "ParamHelpers", "animation"))

xgboost installation, commit dmlc/xgboost@017acf5 seems best currently as it has gblinear improvements. Make sure to use the right compiler below:

devtools::install_github("Laurae2/xgbdl")

# gcc
xgbdl::xgb.dl(compiler = "gcc", commit = "017acf5", use_avx = FALSE, use_gpu = FALSE)

# Visual Studio 2015, use AVX if you wish to
xgbdl::xgb.dl(compiler = "Visual Studio 14 2015 Win64", commit = "017acf5", use_avx = FALSE, use_gpu = FALSE)

# Visual Studio 2017, use AVX if you wish to
xgbdl::xgb.dl(compiler = "Visual Studio 15 2017 Win64", commit = "017acf5", use_avx = FALSE, use_gpu = FALSE)

What can it do?

What can it do:

  • Bayesian Optimization (time-limited, iteration-limited, initialization-limited)
  • Create data.frame from [R,C] matrix-like format
  • Create data.table from [R,C] matrix-like format

Package requirements:

  • knitr
  • rmarkdown
  • mlrMBO
  • lhs
  • smoof
  • ParamHelpers
  • animation
  • xgboost

ml-perf's People

Contributors

laurae2 avatar

Stargazers

 avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

ml-perf's Issues

xgboost and LightGBM GPU requirements

GPU RAM behavior information:

  • xgboost GPU RAM requirement sampling is unreliable, as there is an initialization peak which is difficult to catch. What you see in nvidia-smi is not what you need in practice for xgboost.
  • LightGBM GPU RAM requirement never grows up, what you see in nvidia-smi is really what you need

Therefore, xgboost requires trial and error sampling (using different number of parallel workers) until crash, while LightGBM is easily predictable with only 1 sample.

Notes:

  • The GPU requirements also depend on the hyperparameters you are using for your model
  • For reference, the hyperparameters are the following: depth = 6, leaves = 63, bins = 255

Table for GPU requirements:

Dataset Observations xgboost LightGBM
Airline 100,000 "nearly" 1GB (fits 4 on 4GB) about 67 MB
Airline 1,000,000 "nearly" 1GB (fits only 3 on 4GB) about 93 MB
Airline 10,000,000 "nearly" 1.1GB (fits only 2 on 4GB) about 333 MB

To be updated with more datasets later.

Edit 05/31/2019:

  • Airline 1M 4x sometimes crashes on xgboost with 4GB GPU. Reduced to 3x.
  • Airline 10M 3x sometimes crashes on xgboost with 4GB GPU. Reduced to 2x.

xgboost in parallel: demo results

Following #3 with results.

Baseline: 1 CPU thread model throughput = (11.391 x 25 + 11.330 x 50) / 75 = 11.350

Baseline: 1 GPU thread model throughput = 20.436

For reference:

  • Parallel threads = processes/threads used in parallel to run R (multiprocessing through sockets)
  • Model threads = threads used to run xgboost (multithreading)
  • Parallel GPUs = number of GPUs used in parallel processes/threads in R
  • Parallel GPU threads = number of processes running in a single GPU
  • Models = number of models to train in total
  • Seconds / Model = average throughput for 1 model, in seconds
  • Boost vs Baseline = your performance gain if you were to do the mentioned row vs doing only 1 CPU (or 1 GPU if GPU) process/thread for your model

CPU:

Run Parallel Threads Model Threads Parallel GPUs GPU Threads Models Seconds / Model Boost vs Baseline
9 1 1 0 0 25 11.391 ~1x
10 9 1 0 0 50 1.458 7.78x
11 18 1 0 0 100 0.797 14.24x
12 35 1 0 0 250 0.474 23.95x
13 70 1 0 0 500 0.440 25.79x
14 1 1 0 0 50 11.330 ~1x
15 1 9 0 0 50 6.287 1.81x
16 1 18 0 0 50 6.283 1.81x
17 1 35 0 0 50 24.907 0.46x
18 1 70 0 0 50 165.522 0.07x

GPU:

Run Parallel Threads Model Threads Parallel GPUs GPU Threads Models Seconds / Model Boost vs Baseline
1 1 1 1 1 25 20.436 ~1x
2 2 1 2 1 50 10.666 1.91x
3 3 1 3 1 100 6.999 2.92x
4 4 1 4 1 250 5.182 3.94x
5 4 1 1 4 50 20.602 0.99x
6 8 1 2 4 100 10.495 1.95x
7 12 1 3 4 250 6.909 2.96x
8 16 1 4 4 500 5.222 3.91x

Conclusions:

  • More and too many CPU cores poorly used (multithreading) actually are playing AGAINST you by make the learning slower
  • More and too many CPU cores appropriately used (parallel) are providing huge performance boost and benefit significantly from hyperthreading (+200% from 35 threads to 70 threads)
  • More GPU in parallel provides moderate performance increase, given we are using NVIDIA Quadro P1000 GPUs, while 1x Tesla V100 is not cheap (more expensive than the CPUs on my server!)
  • Overallocating GPU threads did not help, because GPUs were already at 100% GPU usage for 0.1m data due to our sparse data
  • 18 parallel threads to 35 parallel threads is not scaling linearly, I expect it is because I have only 4 DIMM on my server (80 GBps) instead of the full 12 possible DIMMs (240 GBps) => could check with Intel VTune, too lazy because it's a long process (but it's doable)

TODO: try with V100 later

LightGBM pre-performance

I am currently testing a LightGBM script which can run 40 LightGBM on one 4GB GPU (only 2690 MB used).

Throughput: 1.330s / model, insane speed! Imagine on multiple GPUs...

image

You need 67.25 MB per thread for LightGBM on airline 0.1m data. It's about 10x lower than xgboost. Maybe @RAMitchell can get some ideas to optimize xgboost for very sparse data on GPU, as it could be a common case (xgboost does not support categoricals). LightGBM handles 16 bins, 64 bins, and 256 bins as shown here: https://github.com/microsoft/LightGBM/tree/master/src/treelearner/ocl

For the script, you may have to toy with gpu_platform_id and gpu_device_id. Unfortunately, it's due to OpenCL way of working with devices... (it will crash if it is wrong platform/device combo).

Test script:

# Sets OpenMP to 1 thread by default
Sys.setenv(OMP_NUM_THREADS = 1)

suppressMessages({
  library(optparse)
  library(data.table)
  library(parallel)
  library(lightgbm)
  library(Matrix)
})

args_list <- list( 
  optparse::make_option("--parallel_threads", type = "integer", default = 1, metavar = "Parallel CPU Threads",
                        help="Number of threads for parallel training for CPU (automatically changed if using GPU), should be greater than or equal to parallel_gpus * gpus_threads [default: %default]"),
  optparse::make_option("--model_threads", type = "integer", default = 1, metavar = "Model CPU Threads",
                        help = "Number of threads for training a single model, total number of threads is parallel_threads * model_threads [default: %default]"),
  optparse::make_option("--parallel_gpus", type = "integer", default = 0, metavar = "Parallel GPU Threads",
                        help = "Number of GPUs to use for parallel training, use 0 for no GPU [default: %default]"),
  optparse::make_option("--gpus_threads", type = "integer", default = 0, metavar = "Model GPU Threads",
                        help = "Number of parallel models to train per GPU (uses linearly more RAM), use 0 for no GPU [default: %default]"),
  optparse::make_option("--number_of_models", type = "integer", default = 1, metavar = "Number of Models",
                        help = "Number of models to train in total [default: %default]"),
  optparse::make_option("--wkdir", type = "character", default = "", metavar = "Working Directory",
                        help = "The working directory, do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--train_file", type = "character", default = "", metavar = "Training File",
                        help = "The training file to use relative to the working directory (or an absolute path), do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--test_file", type = "character", default = "", metavar = "Testing file",
                        help = "The testing file to use relative to the working directory (or an absolute path), do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--output_dir", type = "character", default = "", metavar = "Output Directory",
                        help = "The output directory for files (or an absolute path), do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--output_csv", type = "logical", default = TRUE, metavar = "Output CSV File",
                        help = "Outputs results as a CSV file [default: %default]"),
  optparse::make_option("--output_chart", type = "character", default = "jpeg", metavar = "Plot File Format",
                        help = "Outputs results as a chart using the desired format, can be any of: \"none\" (for no chart), \"eps\", \"ps\", \"tex\" (pictex), \"pdf\", \"jpeg\", \"tiff\", \"png\", \"bmp\", \"svg\", \"wmf\" (Windows only) [default: \"%default\"]"),
  optparse::make_option("--args", type = "logical", default = FALSE, metavar = "Argument Check",
                        help = "Prints the arguments passed to the R script and exits immediately [default: %default]")
)

# Force data.table as 1 thread in case you are using Fork instead of Sockets (gcc: fork X in process Y when process Y used OpenMP once, fork X cannot use OpenMP otherwise it hangs forever)
data.table::setDTthreads(1)

if (interactive()) {
  
  # Put some parameters if you wish to test once...
  my_gpus <- 1L
  my_gpus_threads <- 40L
  my_threads <- 1L # parallel::detectCores() - 1L
  my_threads_in_threads <- 1L
  my_runs <- 40L
  my_train <- "train-0.1m.csv"
  my_test <- "test.csv"
  my_output <- "./output"
  my_csv <- TRUE
  my_chart <- "jpeg"
  # my_cpu <- system("lscpu | sed -nr '/Model name/ s/.*:\\s*(.*) @ .*/\\1/p' | sed ':a;s/  / /;ta'")
  
  # CHANGE: 0.1M = GPU about 958 MB at peak... choose wisely (here, we are putting 4 models per GPU)
  if (my_gpus > 0L) {
    # my_threads <- min(my_gpus * my_gpus_threads, my_threads)
    my_threads <- my_gpus * my_gpus_threads
  }
  
} else {
  
  # Old school method... obsolete
  # DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
  # Rscript bench_file.R 1 1 0 0 25 ${DIR} ../train-0.1m.csv ../test.csv
  # args <- commandArgs(trailingOnly = TRUE)
  # 
  # setwd(args[6])
  # my_gpus <- args[3]
  # my_gpus_threads <- args[4]
  # my_threads <- args[1]
  # my_threads_in_threads <- args[2]
  # my_runs <- args[5]
  # my_train <- args[7]
  # my_test <- args[8]
  
  # DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
  # Rscript bench_lgb_test.R --parallel_threads=1 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=25 --wkdir=${DIR} --train_file=../train-0.1m.csv --test_file=../test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg --args=TRUE
  # Rscript bench_lgb_test.R --parallel_threads=1 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=25 --wkdir=${DIR} --train_file=../train-0.1m.csv --test_file=../test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
  args <- optparse::parse_args(optparse::OptionParser(option_list = args_list))
  setwd(args$wkdir)
  my_gpus <- args$parallel_gpus
  my_gpus_threads <- args$gpus_threads
  my_threads <- args$parallel_threads
  my_threads_in_threads <- args$model_threads
  my_runs <- args$number_of_models
  my_train <- args$train_file
  my_test <- args$test_file
  my_output <- args$output_dir
  my_csv <- args$output_csv
  my_chart <- args$output_chart
  
  if (my_gpus > 0L) {
    # my_threads <- min(my_gpus * my_gpus_threads, my_threads)
    my_threads <- my_gpus * my_gpus_threads
    args$parallel_threads <- my_threads
  }
  
  if (args$args) {
    print(args)
    stop("\rArgument check done.")
  }
  
}


# Load data and do preprocessing

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "] [Data] Loading data.\n", sep = "")
d_train <- fread(my_train, showProgress = FALSE)
d_test <- fread(my_test, showProgress = FALSE)
invisible(gc(verbose = FALSE))

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "] [Data] Transforming data.\n", sep = "")
X_train_test <- sparse.model.matrix(dep_delayed_15min ~ . -1, data = rbindlist(list(d_train, d_test))); invisible(gc(verbose = FALSE))
n1 <- nrow(d_train)
n2 <- nrow(d_test)
X_train <- X_train_test[1L:n1,]; invisible(gc(verbose = FALSE))
X_test <- X_train_test[(n1 + 1L):(n1 + n2),]; invisible(gc(verbose = FALSE))

labels_train <- as.numeric(d_train$dep_delayed_15min == "Y")
labels_test <- as.numeric(d_test$dep_delayed_15min == "Y")

rm(d_train, d_test, X_train_test, n1, n2); invisible(gc(verbose = FALSE))

# CHANGE: metric function
metric <- function(preds, labels) {
  x1 <- as.numeric(preds[labels == 1])
  n1 <- as.numeric(length(x1))
  x2 <- as.numeric(preds[labels == 0])
  n2 <- as.numeric(length(x2))
  r <- rank(c(x1,x2))
  return((sum(r[1:n1]) - n1 * (n1 + 1) / 2) / (n1 * n2))
}

# CHANGE: trainer function
trainer <- function(x, row_sampling, col_sampling, max_depth, n_iter, learning_rate, nbins, nthread, n_gpus, gpu_choice, objective) {
  
  if (n_gpus > 0) {
    
    matrix_train_time <- system.time({
      dlgb_train <- lgb.Dataset(data = X_train, label = labels_train, nthread = nthread, device = "gpu")
      lgb.Dataset.construct(dlgb_train)
    })[[3]]
    matrix_test_time <- system.time({
      dlgb_test <- lgb.Dataset(data = X_test, label = labels_test, nthread = nthread, device = "gpu") # Completely useless in practice
      lgb.Dataset.construct(dlgb_test) # Completely useless in practice
    })[[3]]
    
  } else {
    
    matrix_train_time <- system.time({
      dlgb_train <- lgb.Dataset(data = X_train, label = labels_train, nthread = nthread)
      lgb.Dataset.construct(dlgb_train)
    })[[3]]
    matrix_test_time <- system.time({
      dlgb_test <- lgb.Dataset(data = X_test, label = labels_test, nthread = nthread) # Completely useless in practice
      lgb.Dataset.construct(dlgb_test) # Completely useless in practice
    })[[3]]
    
  }
  
  if (n_gpus > 0) {
    
    model_time <- system.time({
      set.seed(x) # Useless
      model_train <- lightgbm::lgb.train(data = dlgb_train, 
                                         objective = objective,
                                         nrounds = n_iter,
                                         num_leaves = (2 ^ max_depth) - 1,
                                         max_depth = max_depth,
                                         learning_rate = learning_rate,
                                         bagging_fraction = row_sampling,
                                         bagging_seed = x,
                                         bagging_freq = 1,
                                         feature_fraction = col_sampling,
                                         feature_fraction_seed = x,
                                         num_threads = nthread,
                                         device = "gpu",
                                         gpu_platform_id = 0,
                                         gpu_device_id = gpu_choice,
                                         max_bin = nbins,
                                         verbose = -1)
    })[[3]]
    
  } else {
    
    model_time <- system.time({
      set.seed(x)
      model_train <- lightgbm::lgb.train(data = dlgb_train, 
                                         objective = objective,
                                         nrounds = n_iter,
                                         num_leaves = (2 ^ max_depth) - 1,
                                         max_depth = max_depth,
                                         learning_rate = learning_rate,
                                         bagging_fraction = row_sampling,
                                         bagging_seed = x,
                                         bagging_freq = 1,
                                         feature_fraction = col_sampling,
                                         feature_fraction_seed = x,
                                         num_threads = nthread,
                                         device = "cpu",
                                         max_bin = nbins,
                                         verbose = -1)
    })[[3]]
    
  }
  
  pred_time <- system.time({
    model_predictions <- predict(model_train, data = X_test)
  })[[3]]
  
  perf <- metric(preds = model_predictions, labels = labels_test)
  
  rm(model_train, model_predictions, dlgb_train, dlgb_test)
  
  gc_time <- system.time({
    invisible(gc(verbose = FALSE))
  })[[3]]
  
  return(list(matrix_train_time = matrix_train_time, matrix_test_time = matrix_test_time, model_time = model_time, pred_time = pred_time, gc_time = gc_time, perf = perf))
  
}


# Parallel Section

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] ", my_threads, " Process(es) Creation Time: ", sprintf("%04.03f", system.time({cl <- makeCluster(my_threads)})[[3]]), "s\n", sep = "")
cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] Sending Hardware Specifications Time: ", sprintf("%04.03f", system.time({clusterExport(cl = cl, c("my_threads", "my_gpus", "my_threads_in_threads"))})[[3]]), "s\n", sep = "")
invisible(parallel::parLapply(cl = cl, X = seq_len(my_threads), function(x) {
  Sys.sleep(time = my_threads / 20) # Prevent file clash on many core systems (typically 50+ threads might attempt to read exactly at the same time the same file, especially if the disk is slow)
  suppressPackageStartupMessages(library(lightgbm))
  suppressPackageStartupMessages(library(Matrix))
  suppressPackageStartupMessages(library(data.table))
  id <<- x
}))
cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] Sending Data Time: ", sprintf("%04.03f", system.time({clusterExport(cl = cl, c("trainer", "metric", "X_train", "X_test", "labels_train", "labels_test", "my_threads"))})[[3]]), "s\n", sep = "")

# Having issues? In a CLI: sudo pkill R
time_finish <- system.time({
  time_all <- parallel::parLapplyLB(cl = cl, X = seq_len(my_runs), function(x) {
    
    if (my_gpus == 0L) {
      gpus_to_use <- 0
      gpus_allowed <- 0
    } else {
      gpus_to_use <- (id - 1) %% my_gpus
      gpus_allowed <- 1
    }
    
    speed_out <- system.time({
      speed_in <- trainer(x = x,
                          row_sampling = 0.9,
                          col_sampling = 0.9,
                          max_depth = 6,
                          n_iter = 500,
                          learning_rate = 0.05,
                          nbins = 255,
                          nthread = my_threads_in_threads,
                          n_gpus = gpus_allowed,
                          gpu_choice = gpus_to_use,
                          objective = "binary")
    })[[3]]
    
    rm(gpus_to_use)
    
    return(list(total = speed_out, matrix_train_time = speed_in$matrix_train_time, matrix_test_time = speed_in$matrix_test_time, model_time = speed_in$model_time, pred_time = speed_in$pred_time, gc_time = speed_in$gc_time, perf = speed_in$perf))
    
  })
})[[3]]

# Clearup all R sessions from this process, except the master
stopCluster(cl)
closeAllConnections()

rm(cl, metric, trainer, X_train, X_test, labels_train, labels_test); invisible(gc(verbose = FALSE))

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] Total Time: ", sprintf("%04.03f", time_finish), "s\n", sep = "")


# Gather Data

# Get data
time_total <- unlist(lapply(time_all, function(x) {round(x[[1]], digits = 3)}))
matrix_train_time <- unlist(lapply(time_all, function(x) {round(x[[2]], digits = 3)}))
matrix_test_time <- unlist(lapply(time_all, function(x) {round(x[[3]], digits = 3)}))
model_time <- unlist(lapply(time_all, function(x) {round(x[[4]], digits = 3)}))
pred_time <- unlist(lapply(time_all, function(x) {round(x[[5]], digits = 3)}))
gc_time <- unlist(lapply(time_all, function(x) {round(x[[6]], digits = 3)}))
perf <- unlist(lapply(time_all, function(x) {round(x[[7]], digits = 6)}))

# Put all data together
time_table <- data.table(Run = seq_len(my_runs),
                         time_total = time_total,
                         matrix_train_time = matrix_train_time,
                         matrix_test_time = matrix_test_time,
                         model_time = model_time,
                         pred_time = pred_time,
                         gc_time = gc_time,
                         perf = perf)

if (my_csv) {
  
  fwrite(time_table, paste0(my_output, "/ml-perf_lgb_gbdt_", substr(my_train, 1, nchar(my_train) - 4), "_", my_threads, "Tx", my_threads_in_threads, "T_", my_gpus, "GPU_", my_runs, "m_", sprintf("%04.03f", time_finish), "s.csv"))
  
}

# Analyze Data

if (my_chart != "none") {
  
  suppressMessages({
    library(ggplot2)
    library(ClusterR)
  })
  
  # Create time series matrix
  time_table_matrix <- apply(as.matrix(time_table[, 2:8, with = FALSE]), MARGIN = 2, function(x) {
    y <- cumsum(x)
    y / max(y)
  })
  
  # Compute optimal number of non-parametric clusters
  clusters <- Optimal_Clusters_Medoids(data = time_table_matrix,
                                       max_clusters = 2:10,
                                       distance_metric = "manhattan",
                                       criterion = "silhouette",
                                       threads = 1,
                                       swap_phase = TRUE,
                                       verbose = FALSE,
                                       plot_clusters = FALSE,
                                       seed = 1)
  
  # Compute clusters
  clusters_selected <- Cluster_Medoids(data = time_table_matrix,
                                       clusters = 1 + which.max(unlist(lapply(clusters, function(x) {x[[3]]}))),
                                       distance_metric = "manhattan",
                                       threads = 1,
                                       swap_phase = TRUE,
                                       verbose = FALSE,
                                       seed = 1)
  time_table[, Cluster := as.character(clusters_selected$clusters)]
  
  # Melt data
  time_table_vertical <- melt(time_table, id.vars = c("Run", "Cluster"), measure.vars = c("time_total", "matrix_train_time", "matrix_test_time", "model_time", "pred_time", "gc_time", "perf"), variable.name = "Variable", value.name = "Value", variable.factor = FALSE, value.factor = FALSE)
  
  # Rename melted variables to have details in chart
  time_table_vertical[Variable == "time_total", Variable := paste0("1. Total Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "model_time", Variable := paste0("2. Model Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "matrix_train_time", Variable := paste0("3. Matrix Train Build Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "matrix_test_time", Variable := paste0("4. Matrix Test Build Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "pred_time", Variable := paste0("5. Predict Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "gc_time", Variable := paste0("6. Garbage Collector Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "perf", Variable := paste0("7. Metric (Σ=", sprintf("%07.06f", sum(Value)), ", μ=", sprintf("%07.06f", mean(Value)), ", σ=", sprintf("%07.06f", sd(Value)), ")")]
  cat(sort(unique(time_table_vertical$Variable)), sep = "\n")
  
  # Plot a nice chart
  my_plot <- ggplot(data = time_table_vertical, aes(x = Run, y = Value, group = Cluster, color = Cluster)) + geom_point() + facet_wrap(facets = Variable ~ ., nrow = 4, ncol = 2, scales = "free_y") + labs(title = "'Performance' over Models, LightGBM GBDT", subtitle = paste0(my_runs, " Models over ", sprintf("%04.03f", time_finish), " seconds using ", my_threads, " parallel threads, ", my_threads_in_threads, " model threads, and ", my_gpus, " GPUs (Throughput: ", sprintf("%04.03f", time_finish / my_runs), "s / Model", ")"), x = "Model", y = "Value or Time (s)") + theme_bw() + theme(legend.position = "none")
  ggsave(filename = paste0(my_output, "/ml-perf_lgb_gbdt_", substr(my_train, 1, nchar(my_train) - 4), "_", my_threads, "Tx", my_threads_in_threads, "T_", my_gpus, "GPU_", my_runs, "m_", sprintf("%04.03f", time_finish), "s.jpg"),
         plot = my_plot,
         device = my_chart,
         width = 24,
         height = 16,
         units = "cm",
         dpi = "print")
  
  if (interactive()) {
    print(my_plot)
  }
  
}

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "] Done computations. Quitting R.\n", sep = "")

demo benchmark for db parallelism in production using R

Script example for benchmark where we have:

  • we have 1000 models to train (100k observations)
  • we want to balance workload to make threads as busy as possible
  • we may (or may not have) GPUs available
  • we are using xgboost with a known hyperparameter combo (with RNG seed + a bit of sub sampling / column sampling)

Provides the following, by run:

  1. total time to train
  2. model training time
  3. time to build training matrix to feed to xgboost
  4. time to build testing matrix to feed to xgboost
  5. prediction time
  6. garbage collection time (R specific overhead)
  7. metric

Also in addition, auto clustering is performed to find if there are potential changes in the data series.

Expected behavior:

  • Lot of outliers during early/end of parallelization
  • Stable metric

Code demo:

# Sets OpenMP to 1 thread by default, bypasses xgboost forcing all thread on xgb.DMatrix
Sys.setenv(OMP_NUM_THREADS = 1)

suppressMessages({
  library(data.table)
  library(parallel)
  library(xgboost)
  library(Matrix)
})

# Force data.table as 1 thread in case you are using Fork instead of Sockets (gcc: fork X in process Y when process Y used OpenMP once, fork X cannot use OpenMP otherwise it hangs forever)
data.table::setDTthreads(1)

# CHANGE: Specify your number of GPUs / Threads
my_gpus <- 4L
my_threads <- parallel::detectCores() - 1L
my_runs <- 1000

# CHANGE: 0.1M = GPU about 958 MB at peak... choose wisely (here, we are putting 4 models per GPU)
if (my_gpus > 0L) {
  my_threads <- min(my_gpus * 4L, my_threads)
}


# Load data and do preprocessing

d_train <- fread("train-0.1m.csv", showProgress = FALSE)
d_test <- fread("test.csv", showProgress = FALSE)
invisible(gc(verbose = FALSE))

X_train_test <- sparse.model.matrix(dep_delayed_15min ~ . -1, data = rbindlist(list(d_train, d_test))); invisible(gc(verbose = FALSE))
n1 <- nrow(d_train)
n2 <- nrow(d_test)
X_train <- X_train_test[1L:n1,]; invisible(gc(verbose = FALSE))
X_test <- X_train_test[(n1 + 1L):(n1 + n2),]; invisible(gc(verbose = FALSE))

labels_train <- as.numeric(d_train$dep_delayed_15min == "Y")
labels_test <- as.numeric(d_test$dep_delayed_15min == "Y")

# dxgb_train <- xgb.DMatrix(data = X_train, label = labels_train); invisible(gc(verbose = FALSE))
# dxgb_test <- xgb.DMatrix(data = X_test); invisible(gc(verbose = FALSE))

rm(d_train, d_test, X_train_test, n1, n2); invisible(gc(verbose = FALSE))

# CHANGE: metric function
metric <- function(preds, labels) {
  x1 <- as.numeric(preds[labels == 1])
  n1 <- as.numeric(length(x1))
  x2 <- as.numeric(preds[labels == 0])
  n2 <- as.numeric(length(x2))
  r <- rank(c(x1,x2))
  return((sum(r[1:n1]) - n1 * (n1 + 1) / 2) / (n1 * n2))
}

# CHANGE: trainer function
trainer <- function(x, row_sampling, col_sampling, max_depth, n_iter, learning_rate, nbins, nthread, n_gpus, gpu_choice, objective) {
  
  matrix_train_time <- system.time({
    dxgb_train <- xgboost::xgb.DMatrix(data = X_train, label = labels_train)
  })[[3]]
  matrix_test_time <- system.time({
    dxgb_test <- xgboost::xgb.DMatrix(data = X_test, label = labels_test)
  })[[3]]
  
  if (n_gpus > 0) {
    
    model_time <- system.time({
      set.seed(x)
      model_train <- xgboost::xgb.train(data = dxgb_train,
                                        objective = objective,
                                        nrounds = n_iter,
                                        max_depth = max_depth,
                                        eta = learning_rate,
                                        subsample = row_sampling,
                                        colsample_bytree = col_sampling,
                                        nthread = nthread,
                                        n_gpus = n_gpus,
                                        gpu_id = gpu_choice,
                                        tree_method = "gpu_hist",
                                        max_bin = nbins,
                                        predictor = "gpu_predictor")
    })[[3]]
    
  } else {
    
    model_time <- system.time({
      set.seed(x)
      model_train <- xgboost::xgb.train(data = dxgb_train,
                                        objective = objective,
                                        nrounds = n_iter,
                                        max_depth = max_depth,
                                        eta = learning_rate,
                                        subsample = row_sampling,
                                        colsample_bytree = col_sampling,
                                        nthread = nthread,
                                        n_gpus = 0,
                                        tree_method = "hist",
                                        max_bin = nbins)
    })[[3]]
    
  }
  
  pred_time <- system.time({
    model_predictions <- predict(model_train, newdata = dxgb_test)
  })[[3]]
  
  perf <- metric(preds = model_predictions, labels = labels_test)
  
  rm(model_train, model_predictions, dxgb_train, dxgb_test)
  
  gc_time <- system.time({
    invisible(gc(verbose = FALSE))
  })[[3]]
  
  return(list(matrix_train_time = matrix_train_time, matrix_test_time = matrix_test_time, model_time = model_time, pred_time = pred_time, gc_time = gc_time, perf = perf))
  
}


# Parallel Section

cat("[Parallel] ", my_threads, " Process(es) Creation Time: ", sprintf("%04.03f", system.time({cl <- makeCluster(my_threads)})[[3]]), "s\n", sep = "")
cat("[Parallel] Sending Hardware Specifications Time: ", sprintf("%04.03f", system.time({clusterExport(cl = cl, c("my_threads", "my_gpus"))})[[3]]), "s\n", sep = "")
invisible(parallel::parLapply(cl = cl, X = seq_len(my_threads), function(x) {
  Sys.sleep(time = my_threads / 20) # Prevent file clash on many core systems (typically 50+ threads might attempt to read exactly at the same time the same file, especially if the disk is slow)
  suppressPackageStartupMessages(library(xgboost))
  suppressPackageStartupMessages(library(Matrix))
  suppressPackageStartupMessages(library(data.table))
  id <<- x
}))
cat("[Parallel] Sending Data Time: ", sprintf("%04.03f", system.time({clusterExport(cl = cl, c("trainer", "metric", "X_train", "X_test", "labels_train", "labels_test", "my_threads"))})[[3]]), "s\n", sep = "")

# Having issues? In a CLI: sudo pkill R
time_finish <- system.time({
  time_all <- parallel::parLapplyLB(cl = cl, X = seq_len(my_runs), function(x) {
    
    if (my_gpus == 0L) {
      gpus_to_use <- 0
      gpus_allowed <- 0
    } else {
      gpus_to_use <- (id - 1) %% my_gpus
      gpus_allowed <- 1
    }
    
    speed_out <- system.time({
      speed_in <- trainer(x = x,
                          row_sampling = 0.9,
                          col_sampling = 0.9,
                          max_depth = 6,
                          n_iter = 500,
                          learning_rate = 0.05,
                          nbins = 255,
                          nthread = 1,
                          n_gpus = gpus_allowed,
                          gpu_choice = gpus_to_use,
                          objective = "binary:logistic")
    })[[3]]
    
    rm(gpus_to_use)
    
    return(list(total = speed_out, matrix_train_time = speed_in$matrix_train_time, matrix_test_time = speed_in$matrix_test_time, model_time = speed_in$model_time, pred_time = speed_in$pred_time, gc_time = speed_in$gc_time, perf = speed_in$perf))
    
  })
})[[3]]

# Clearup all R sessions from this process, except the master
stopCluster(cl)
closeAllConnections()

rm(cl, metric, trainer, X_train, X_test, labels_train, labels_test); invisible(gc(verbose = FALSE))

cat("[Parallel] Total Time: ", sprintf("%04.03f", time_finish), "s\n", sep = "")


# Gather Data

# Get data
time_total <- unlist(lapply(time_all, function(x) {x[[1]]}))
matrix_train_time <- unlist(lapply(time_all, function(x) {x[[2]]}))
matrix_test_time <- unlist(lapply(time_all, function(x) {x[[3]]}))
model_time <- unlist(lapply(time_all, function(x) {x[[4]]}))
pred_time <- unlist(lapply(time_all, function(x) {x[[5]]}))
gc_time <- unlist(lapply(time_all, function(x) {x[[6]]}))
perf <- unlist(lapply(time_all, function(x) {x[[7]]}))

# Put all data together
time_table <- data.table(Run = seq_len(my_runs),
                         time_total = time_total,
                         matrix_train_time = matrix_train_time,
                         matrix_test_time = matrix_test_time,
                         model_time = model_time,
                         pred_time = pred_time,
                         gc_time = gc_time,
                         perf = perf)


# Analyze Data

library(ggplot2)
library(ClusterR)

# Create time series matrix
time_table_matrix <- apply(as.matrix(time_table[, 2:8, with = FALSE]), MARGIN = 2, function(x) {
  y <- cumsum(x)
  y / max(y)
})

# Compute optimal number of non-parametric clusters
clusters <- Optimal_Clusters_Medoids(data = time_table_matrix,
                                     max_clusters = 2:10,
                                     distance_metric = "manhattan",
                                     criterion = "silhouette",
                                     threads = 1,
                                     swap_phase = TRUE,
                                     verbose = FALSE,
                                     plot_clusters = FALSE,
                                     seed = 1)

# Compute clusters
clusters_selected <- Cluster_Medoids(data = time_table_matrix,
                                     clusters = 1 + which.max(unlist(lapply(clusters, function(x) {x[[3]]}))),
                                     distance_metric = "manhattan",
                                     threads = 1,
                                     swap_phase = TRUE,
                                     verbose = FALSE,
                                     seed = 1)
time_table[, Cluster := as.character(clusters_selected$clusters)]

# Melt data
time_table_vertical <- melt(time_table, id.vars = c("Run", "Cluster"), measure.vars = c("time_total", "matrix_train_time", "matrix_test_time", "model_time", "pred_time", "gc_time", "perf"), variable.name = "Variable", value.name = "Value", variable.factor = FALSE, value.factor = FALSE)

# Rename melted variables to have details in chart
time_table_vertical[Variable == "time_total", Variable := paste0("1. Total Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
time_table_vertical[Variable == "model_time", Variable := paste0("2. Model Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
time_table_vertical[Variable == "matrix_train_time", Variable := paste0("3. Matrix Train Build Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
time_table_vertical[Variable == "matrix_test_time", Variable := paste0("4. Matrix Test Build Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
time_table_vertical[Variable == "pred_time", Variable := paste0("5. Predict Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
time_table_vertical[Variable == "gc_time", Variable := paste0("6. Garbage Collector Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
time_table_vertical[Variable == "perf", Variable := paste0("7. Performance/Metric (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
cat(sort(unique(time_table_vertical$Variable)), sep = "\n")

# Plot a nice chart
ggplot(data = time_table_vertical, aes(x = Run, y = Value, group = Cluster, color = Cluster)) + geom_point() + facet_wrap(facets = Variable ~ ., nrow = 4, ncol = 2, scales = "free_y") + labs(title = "'Performance' over Models", subtitle = paste0(my_runs, " Models over ", sprintf("%04.03f", time_finish), " seconds using ", my_threads, " threads and ", my_gpus, " GPUs (", sprintf("%04.03f", time_finish / my_runs), "s / Model", ")"), x = "Model", y = "Value or Time (s)") + theme_bw() + theme(legend.position = "none")

demo benchmark for db parallelism in production using R

Script example for benchmark where we have:

  • we have X models to train (100k observations)
  • we want to balance workload to make threads as busy as possible
  • we may (or may not have) GPUs available
  • we are using xgboost hist (CPU or GPU) with a known hyperparameter combo (with RNG seed + a bit of sub sampling / column sampling)

Provides the following, by run:

  1. total time to train
  2. model training time
  3. time to build training matrix to feed to xgboost
  4. time to build testing matrix to feed to xgboost
  5. prediction time
  6. garbage collection time (R specific overhead)
  7. metric

Also in addition, auto clustering is performed to find if there are potential changes in the data series.

Expected behavior:

  • Script success (no OOM)
  • No increasing memory usage once stabilized (gc must work as it is)
  • Lot of outliers during early/end of parallelization (in timings) due to firing up/off threads simultaneously
  • Stable metric overall
  • Scaling up with CPU threads and GPUs (throughput metric: more models per second done)
  • Scaling up parallel should be better than scaling up multithreaded
  • Get some nice output CSV/chart files for analysis
  • Dominating Model Training time, followed by Model Predict time (ignoring Metric time)

Objective of the libraries tested:

  • Go through the matrix creation (for the model) to the model training (train a machine learning model) to the prediction (predict quickly) as fast as possible, while using the least memory possible (we are not using fork, to make sure NUMA is affecting the least possible the parallel processes)
  • Keep metric performance as high as possible against a blind spot (no early stopping) with usually good but fine out of the box hyperparameters (GBDT: 500 rounds, 0.05 learning rate, 6 depth, 0.9 row sampling, 0.9 column sampling by tree) which actually should favor speed for all libraries tested

Example charts with 1 to 4 GPUs:

1 GPU (20.436s / model):

image

2 GPUs (10.666s / model):
image

3 GPUs (6.999s / model):
image

4 GPUs (5.182s / model) :
image

I am currently using a shell script to run a big batch of different works on my Dual Xeon 6154 with 4 Quadro P1000 (4 Quadro P1000 is approximately equal to 1 GeForce 1080). It gets the current folder, and executes the R benchmark script with different parameters.

For the 0.1m dataset used and if you want to use GPU, you need about 1GB GPU RAM peak per process. Do not believe it just use the peak 659 MB reported by nvidia-smi, the peak is very short and happens at the beginning of training (running 4 models is the max for 4GB RAM GPU, not well optimized implementation for sparse data in xgboost GPU hist).

image

Run Parallel Threads Model Threads Parallel GPUs GPU Threads Models
1 1 1 1 1 25
2 2 1 2 1 50
3 3 1 3 1 100
4 4 1 4 1 250
5 4 1 1 4 50
6 8 1 2 4 100
7 12 1 3 4 250
8 16 1 4 4 500
9 1 1 0 0 25
10 9 1 0 0 50
11 18 1 0 0 100
12 35 1 0 0 250
13 70 1 0 0 500
14 1 1 0 0 50
15 1 9 0 0 50
16 1 18 0 0 50
17 1 35 0 0 50
18 1 70 0 0 50

(yeah, run 9 and 14 are identical...)

Shell script, you will need in the folder you are running the benchmark:

/ ml-perf
--- bench_xgb_test.R
--- train_0.1m.csv
--- test.csv
--/ output
----- nothing yet

Note my script clears up GOMP_CPU_AFFINITY variable if you set it. If you set it and you are not clearing it, you will end up with only 1 thread being used for all runs (and all processes will share the same thread).

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=1 --parallel_gpus=1 --gpus_threads=1 --number_of_models=25 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=2 --model_threads=1 --parallel_gpus=2 --gpus_threads=1 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=3 --model_threads=1 --parallel_gpus=3 --gpus_threads=1 --number_of_models=100 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=4 --model_threads=1 --parallel_gpus=4 --gpus_threads=1 --number_of_models=250 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=4 --model_threads=1 --parallel_gpus=1 --gpus_threads=4 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=8 --model_threads=1 --parallel_gpus=2 --gpus_threads=4 --number_of_models=100 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=12 --model_threads=1 --parallel_gpus=3 --gpus_threads=4 --number_of_models=250 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=16 --model_threads=1 --parallel_gpus=4 --gpus_threads=4 --number_of_models=500 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=25 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=9 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=18 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=100 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=35 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=250 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=70 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=500 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=9 --parallel_gpus=0 --gpus_threads=0 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=18 --parallel_gpus=0 --gpus_threads=0 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=35 --parallel_gpus=0 --gpus_threads=0 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
env -u GOMP_CPU_AFFINITY Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=70 --parallel_gpus=0 --gpus_threads=0 --number_of_models=50 --wkdir=${DIR} --train_file=train-0.1m.csv --test_file=test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg

You should see similar to this during training:

image

The full bench_xgb_test.R script. Use bench_xgb_test -h to get help for the commands. Passing --args=true as parameter allows you to check the parameters you passed to the script, without executing any training / data loading. You may run want it interactively if you wish...

Required R packages:

  • optparse (parses the CLI parameters)
  • data.table (load data, xgboost data holders)
  • parallel (parallel sockets for multiprocessing on available threads)
  • xgboost (with GPU support if you want to use GPU parameters)
  • Matrix (xgboost specific)
  • ggplot2 (for charting)
  • ClusterR (for the chart, autoclustering time series by time aka model number)

Script:

# Sets OpenMP to 1 thread by default, bypasses xgboost forcing all thread on xgb.DMatrix
Sys.setenv(OMP_NUM_THREADS = 1)

suppressMessages({
  library(optparse)
  library(data.table)
  library(parallel)
  library(xgboost)
  library(Matrix)
})

args_list <- list( 
  optparse::make_option("--parallel_threads", type = "integer", default = 1, metavar = "Parallel CPU Threads",
                        help="Number of threads for parallel training for CPU (automatically changed if using GPU), should be greater than or equal to parallel_gpus * gpus_threads [default: %default]"),
  optparse::make_option("--model_threads", type = "integer", default = 1, metavar = "Model CPU Threads",
                        help = "Number of threads for training a single model, total number of threads is parallel_threads * model_threads [default: %default]"),
  optparse::make_option("--parallel_gpus", type = "integer", default = 0, metavar = "Parallel GPU Threads",
                        help = "Number of GPUs to use for parallel training, use 0 for no GPU [default: %default]"),
  optparse::make_option("--gpus_threads", type = "integer", default = 0, metavar = "Model GPU Threads",
                        help = "Number of parallel models to train per GPU (uses linearly more RAM), use 0 for no GPU [default: %default]"),
  optparse::make_option("--number_of_models", type = "integer", default = 1, metavar = "Number of Models",
                        help = "Number of models to train in total [default: %default]"),
  optparse::make_option("--wkdir", type = "character", default = "", metavar = "Working Directory",
                        help = "The working directory, do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--train_file", type = "character", default = "", metavar = "Training File",
                        help = "The training file to use relative to the working directory (or an absolute path), do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--test_file", type = "character", default = "", metavar = "Testing file",
                        help = "The testing file to use relative to the working directory (or an absolute path), do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--output_dir", type = "character", default = "", metavar = "Output Directory",
                        help = "The output directory for files (or an absolute path), do NOT forget it! [default: \"%default\"]"),
  optparse::make_option("--output_csv", type = "logical", default = TRUE, metavar = "Output CSV File",
                        help = "Outputs results as a CSV file [default: %default]"),
  optparse::make_option("--output_chart", type = "character", default = "jpeg", metavar = "Plot File Format",
                        help = "Outputs results as a chart using the desired format, can be any of: \"none\" (for no chart), \"eps\", \"ps\", \"tex\" (pictex), \"pdf\", \"jpeg\", \"tiff\", \"png\", \"bmp\", \"svg\", \"wmf\" (Windows only) [default: \"%default\"]"),
  optparse::make_option("--args", type = "logical", default = FALSE, metavar = "Argument Check",
                        help = "Prints the arguments passed to the R script and exits immediately [default: %default]")
)

# Force data.table as 1 thread in case you are using Fork instead of Sockets (gcc: fork X in process Y when process Y used OpenMP once, fork X cannot use OpenMP otherwise it hangs forever)
data.table::setDTthreads(1)

if (interactive()) {
  
  # Put some parameters if you wish to test once...
  my_gpus <- 1L
  my_gpus_threads <- 1L
  my_threads <- parallel::detectCores() - 1L
  my_threads_in_threads <- 1L
  my_runs <- 100L
  my_train <- "train-0.1m.csv"
  my_test <- "test.csv"
  my_output <- "./output"
  my_csv <- TRUE
  my_chart <- "jpeg"
  # my_cpu <- system("lscpu | sed -nr '/Model name/ s/.*:\\s*(.*) @ .*/\\1/p' | sed ':a;s/  / /;ta'")
  
  # CHANGE: 0.1M = GPU about 958 MB at peak... choose wisely (here, we are putting 4 models per GPU)
  if (my_gpus > 0L) {
    # my_threads <- min(my_gpus * my_gpus_threads, my_threads)
    my_threads <- my_gpus * my_gpus_threads
  }
  
} else {
  
  # Old school method... obsolete
  # DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
  # Rscript bench_file.R 1 1 0 0 25 ${DIR} ../train-0.1m.csv ../test.csv
  # args <- commandArgs(trailingOnly = TRUE)
  # 
  # setwd(args[6])
  # my_gpus <- args[3]
  # my_gpus_threads <- args[4]
  # my_threads <- args[1]
  # my_threads_in_threads <- args[2]
  # my_runs <- args[5]
  # my_train <- args[7]
  # my_test <- args[8]
  
  # DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
  # Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=25 --wkdir=${DIR} --train_file=../train-0.1m.csv --test_file=../test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg --args=TRUE
  # Rscript bench_xgb_test.R --parallel_threads=1 --model_threads=1 --parallel_gpus=0 --gpus_threads=0 --number_of_models=25 --wkdir=${DIR} --train_file=../train-0.1m.csv --test_file=../test.csv --output_dir=./output --output_csv=TRUE --output_chart=jpeg
  args <- optparse::parse_args(optparse::OptionParser(option_list = args_list))
  setwd(args$wkdir)
  my_gpus <- args$parallel_gpus
  my_gpus_threads <- args$gpus_threads
  my_threads <- args$parallel_threads
  my_threads_in_threads <- args$model_threads
  my_runs <- args$number_of_models
  my_train <- args$train_file
  my_test <- args$test_file
  my_output <- args$output_dir
  my_csv <- args$output_csv
  my_chart <- args$output_chart
  
  if (my_gpus > 0L) {
    # my_threads <- min(my_gpus * my_gpus_threads, my_threads)
    my_threads <- my_gpus * my_gpus_threads
    args$parallel_threads <- my_threads
  }
  
  if (args$args) {
    print(args)
    stop("\rArgument check done.")
  }
  
}


# Load data and do preprocessing

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "] [Data] Loading data.\n", sep = "")
d_train <- fread(my_train, showProgress = FALSE)
d_test <- fread(my_test, showProgress = FALSE)
invisible(gc(verbose = FALSE))

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "] [Data] Transforming data.\n", sep = "")
X_train_test <- sparse.model.matrix(dep_delayed_15min ~ . -1, data = rbindlist(list(d_train, d_test))); invisible(gc(verbose = FALSE))
n1 <- nrow(d_train)
n2 <- nrow(d_test)
X_train <- X_train_test[1L:n1,]; invisible(gc(verbose = FALSE))
X_test <- X_train_test[(n1 + 1L):(n1 + n2),]; invisible(gc(verbose = FALSE))

labels_train <- as.numeric(d_train$dep_delayed_15min == "Y")
labels_test <- as.numeric(d_test$dep_delayed_15min == "Y")

# dxgb_train <- xgb.DMatrix(data = X_train, label = labels_train); invisible(gc(verbose = FALSE))
# dxgb_test <- xgb.DMatrix(data = X_test); invisible(gc(verbose = FALSE))

rm(d_train, d_test, X_train_test, n1, n2); invisible(gc(verbose = FALSE))

# CHANGE: metric function
metric <- function(preds, labels) {
  x1 <- as.numeric(preds[labels == 1])
  n1 <- as.numeric(length(x1))
  x2 <- as.numeric(preds[labels == 0])
  n2 <- as.numeric(length(x2))
  r <- rank(c(x1,x2))
  return((sum(r[1:n1]) - n1 * (n1 + 1) / 2) / (n1 * n2))
}

# CHANGE: trainer function
trainer <- function(x, row_sampling, col_sampling, max_depth, n_iter, learning_rate, nbins, nthread, n_gpus, gpu_choice, objective) {
  
  matrix_train_time <- system.time({
    dxgb_train <- xgboost::xgb.DMatrix(data = X_train, label = labels_train)
  })[[3]]
  matrix_test_time <- system.time({
    dxgb_test <- xgboost::xgb.DMatrix(data = X_test, label = labels_test)
  })[[3]]
  
  if (n_gpus > 0) {
    
    model_time <- system.time({
      set.seed(x)
      model_train <- xgboost::xgb.train(data = dxgb_train,
                                        objective = objective,
                                        nrounds = n_iter,
                                        max_depth = max_depth,
                                        eta = learning_rate,
                                        subsample = row_sampling,
                                        colsample_bytree = col_sampling,
                                        nthread = nthread,
                                        n_gpus = n_gpus,
                                        gpu_id = gpu_choice,
                                        tree_method = "gpu_hist",
                                        max_bin = nbins,
                                        predictor = "gpu_predictor")
    })[[3]]
    
  } else {
    
    model_time <- system.time({
      set.seed(x)
      model_train <- xgboost::xgb.train(data = dxgb_train,
                                        objective = objective,
                                        nrounds = n_iter,
                                        max_depth = max_depth,
                                        eta = learning_rate,
                                        subsample = row_sampling,
                                        colsample_bytree = col_sampling,
                                        nthread = nthread,
                                        n_gpus = 0,
                                        tree_method = "hist",
                                        max_bin = nbins)
    })[[3]]
    
  }
  
  pred_time <- system.time({
    model_predictions <- predict(model_train, newdata = dxgb_test)
  })[[3]]
  
  perf <- metric(preds = model_predictions, labels = labels_test)
  
  rm(model_train, model_predictions, dxgb_train, dxgb_test)
  
  gc_time <- system.time({
    invisible(gc(verbose = FALSE))
  })[[3]]
  
  return(list(matrix_train_time = matrix_train_time, matrix_test_time = matrix_test_time, model_time = model_time, pred_time = pred_time, gc_time = gc_time, perf = perf))
  
}


# Parallel Section

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] ", my_threads, " Process(es) Creation Time: ", sprintf("%04.03f", system.time({cl <- makeCluster(my_threads)})[[3]]), "s\n", sep = "")
cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] Sending Hardware Specifications Time: ", sprintf("%04.03f", system.time({clusterExport(cl = cl, c("my_threads", "my_gpus", "my_threads_in_threads"))})[[3]]), "s\n", sep = "")
invisible(parallel::parLapply(cl = cl, X = seq_len(my_threads), function(x) {
  Sys.sleep(time = my_threads / 20) # Prevent file clash on many core systems (typically 50+ threads might attempt to read exactly at the same time the same file, especially if the disk is slow)
  suppressPackageStartupMessages(library(xgboost))
  suppressPackageStartupMessages(library(Matrix))
  suppressPackageStartupMessages(library(data.table))
  id <<- x
}))
cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] Sending Data Time: ", sprintf("%04.03f", system.time({clusterExport(cl = cl, c("trainer", "metric", "X_train", "X_test", "labels_train", "labels_test", "my_threads"))})[[3]]), "s\n", sep = "")

# Having issues? In a CLI: sudo pkill R
time_finish <- system.time({
  time_all <- parallel::parLapplyLB(cl = cl, X = seq_len(my_runs), function(x) {
    
    if (my_gpus == 0L) {
      gpus_to_use <- 0
      gpus_allowed <- 0
    } else {
      gpus_to_use <- (id - 1) %% my_gpus
      gpus_allowed <- 1
    }
    
    speed_out <- system.time({
      speed_in <- trainer(x = x,
                          row_sampling = 0.9,
                          col_sampling = 0.9,
                          max_depth = 6,
                          n_iter = 500,
                          learning_rate = 0.05,
                          nbins = 255,
                          nthread = my_threads_in_threads,
                          n_gpus = gpus_allowed,
                          gpu_choice = gpus_to_use,
                          objective = "binary:logistic")
    })[[3]]
    
    rm(gpus_to_use)
    
    return(list(total = speed_out, matrix_train_time = speed_in$matrix_train_time, matrix_test_time = speed_in$matrix_test_time, model_time = speed_in$model_time, pred_time = speed_in$pred_time, gc_time = speed_in$gc_time, perf = speed_in$perf))
    
  })
})[[3]]

# Clearup all R sessions from this process, except the master
stopCluster(cl)
closeAllConnections()

rm(cl, metric, trainer, X_train, X_test, labels_train, labels_test); invisible(gc(verbose = FALSE))

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "]", " [Parallel] Total Time: ", sprintf("%04.03f", time_finish), "s\n", sep = "")


# Gather Data

# Get data
time_total <- unlist(lapply(time_all, function(x) {round(x[[1]], digits = 3)}))
matrix_train_time <- unlist(lapply(time_all, function(x) {round(x[[2]], digits = 3)}))
matrix_test_time <- unlist(lapply(time_all, function(x) {round(x[[3]], digits = 3)}))
model_time <- unlist(lapply(time_all, function(x) {round(x[[4]], digits = 3)}))
pred_time <- unlist(lapply(time_all, function(x) {round(x[[5]], digits = 3)}))
gc_time <- unlist(lapply(time_all, function(x) {round(x[[6]], digits = 3)}))
perf <- unlist(lapply(time_all, function(x) {round(x[[7]], digits = 6)}))

# Put all data together
time_table <- data.table(Run = seq_len(my_runs),
                         time_total = time_total,
                         matrix_train_time = matrix_train_time,
                         matrix_test_time = matrix_test_time,
                         model_time = model_time,
                         pred_time = pred_time,
                         gc_time = gc_time,
                         perf = perf)

if (my_csv) {
  
  fwrite(time_table, paste0(my_output, "/ml-perf_xgb_gbdt_", substr(my_train, 1, nchar(my_train) - 4), "_", my_threads, "Tx", my_threads_in_threads, "T_", my_gpus, "GPU_", my_runs, "m_", sprintf("%04.03f", time_finish), "s.csv"))
  
}

# Analyze Data

if (my_chart != "none") {
  
  suppressMessages({
    library(ggplot2)
    library(ClusterR)
  })
  
  # Create time series matrix
  time_table_matrix <- apply(as.matrix(time_table[, 2:8, with = FALSE]), MARGIN = 2, function(x) {
    y <- cumsum(x)
    y / max(y)
  })
  
  # Compute optimal number of non-parametric clusters
  clusters <- Optimal_Clusters_Medoids(data = time_table_matrix,
                                       max_clusters = 2:10,
                                       distance_metric = "manhattan",
                                       criterion = "silhouette",
                                       threads = 1,
                                       swap_phase = TRUE,
                                       verbose = FALSE,
                                       plot_clusters = FALSE,
                                       seed = 1)
  
  # Compute clusters
  clusters_selected <- Cluster_Medoids(data = time_table_matrix,
                                       clusters = 1 + which.max(unlist(lapply(clusters, function(x) {x[[3]]}))),
                                       distance_metric = "manhattan",
                                       threads = 1,
                                       swap_phase = TRUE,
                                       verbose = FALSE,
                                       seed = 1)
  time_table[, Cluster := as.character(clusters_selected$clusters)]
  
  # Melt data
  time_table_vertical <- melt(time_table, id.vars = c("Run", "Cluster"), measure.vars = c("time_total", "matrix_train_time", "matrix_test_time", "model_time", "pred_time", "gc_time", "perf"), variable.name = "Variable", value.name = "Value", variable.factor = FALSE, value.factor = FALSE)
  
  # Rename melted variables to have details in chart
  time_table_vertical[Variable == "time_total", Variable := paste0("1. Total Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "model_time", Variable := paste0("2. Model Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "matrix_train_time", Variable := paste0("3. Matrix Train Build Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "matrix_test_time", Variable := paste0("4. Matrix Test Build Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "pred_time", Variable := paste0("5. Predict Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "gc_time", Variable := paste0("6. Garbage Collector Time (Σ=", sprintf("%04.03f", sum(Value)), ", μ=", sprintf("%04.03f", mean(Value)), ", σ=", sprintf("%04.03f", sd(Value)), ")")]
  time_table_vertical[Variable == "perf", Variable := paste0("7. Metric (Σ=", sprintf("%07.06f", sum(Value)), ", μ=", sprintf("%07.06f", mean(Value)), ", σ=", sprintf("%07.06f", sd(Value)), ")")]
  cat(sort(unique(time_table_vertical$Variable)), sep = "\n")
  
  # Plot a nice chart
  my_plot <- ggplot(data = time_table_vertical, aes(x = Run, y = Value, group = Cluster, color = Cluster)) + geom_point() + facet_wrap(facets = Variable ~ ., nrow = 4, ncol = 2, scales = "free_y") + labs(title = "'Performance' over Models", subtitle = paste0(my_runs, " Models over ", sprintf("%04.03f", time_finish), " seconds using ", my_threads, " threads and ", my_gpus, " GPUs (Throughput: ", sprintf("%04.03f", time_finish / my_runs), "s / Model", ")"), x = "Model", y = "Value or Time (s)") + theme_bw() + theme(legend.position = "none")
  ggsave(filename = paste0(my_output, "/ml-perf_xgb_gbdt_", substr(my_train, 1, nchar(my_train) - 4), "_", my_threads, "Tx", my_threads_in_threads, "T_", my_gpus, "GPU_", my_runs, "m_", sprintf("%04.03f", time_finish), "s.jpg"),
         plot = my_plot,
         device = my_chart,
         width = 24,
         height = 16,
         units = "cm",
         dpi = "print")
  
  if (interactive()) {
    print(my_plot)
  }
  
}

cat("[", format(Sys.time(), "%a %b %d %Y %X"), "] Done computations. Quitting R.\n", sep = "")

LightGBM in parallel: demo results (and with xgboost)

I just ran this: #5 (comment)

If you want to see the numbers, skip the conclusions below.

Conclusions for our scenario, CPU:

  • LightGBM CPU is about 1/3 faster than xgboost CPU
  • xgboost might use less RAM bandwidth than LightGBM, because with LightGBM hyperthreaded cores are not providing additional performance
  • LightGBM maintains correct efficiency for "small" data with too many threads, while xgboost literally takes forever to train a single model with a bad huge number of threads (50x slower, still probably way faster than Spark)
  • Multiprocessing scales nearly linearly (embarassingly/perfect parallel jobs), multithreading scaling is significantly poorer (way harder to parallelize)

Conclusions for our scenario, GPU:

  • LightGBM GPU is way faster than xgboost GPU (3x speed)
  • LightGBM GPU uses significantly less RAM than xgboost GPU (due to high dimensional sparse data), allowing more workers per GPU on LightGBM (up to 68 in LightGBM, only 4 in xgboost)
  • LightGBM uses few % GPU while xgboost was using 100% GPU (due to high dimensional sparse data)
  • Subcribing multiple jobs on GPU, when not the GPU is not fully busy, boosts the performance nearly linearly (note that there are parts of the timings which fully use CPU only (and forced singlethreaded), those are not easily compressible by adding "more threads")
  • More GPUs means more linear scaling
  • With LightGBM, you are likely to run out of cores/threads and more likely to hit R limits on communication sockets before you fully saturate the GPUs unless you use a mid-upper tier server (30+ physical cores) where you pack up many Quadro or Volta GPUs (you could fit 544 LightGBM jobs in a 32GB V100...)

General conclusion:

  • Don't speed up a specific computation to make it run as fast as possible... others will take a toll unless you know your critical path in the graph of the work thrown at the compute server
  • Throwing stuff in parallel (multiprocessing) while maintaining multithreading as low as possible (per process) yields the highest overall performance while yielding the worst single process performance - this seems counter intuitive but remember: "many lanes going slow will finish faster than one lane going fast"
  • Make sure you got enough RAM, otherwise your jobs will interrupt - this requires intensive testing to ensure RAM constraint is controlled appropriately

For information, I use the following hardware:

  • 2x Xeon Gold 6154 (36 cores / 72 threads, 3.7 GHz all turbo)
  • 4x 64GB RAM 2666 MHz (80 GBps)
  • 4x NVIDIA Quadro P1000

Baselines:

  • xgboost CPU : 1 CPU thread model throughput = (11.389 x 25 + 11.383 x 50) / 75 = 11.385s
  • xgboost GPU : 1 GPU thread model throughput = 20.441s
  • LightGBM CPU : 1 CPU thread model throughput = (6.539 x 100 + 6.502 x 250) / 350 = 6.513s
  • LightGBM GPU : 1 GPU thread model throughput = 6.769s

For reference:

Parallel threads = processes/threads used in parallel to run R (multiprocessing through sockets)
Model threads = threads used to run xgboost (multithreading)
Parallel GPUs = number of GPUs used in parallel processes/threads in R
Parallel GPU threads = number of processes running in a single GPU
Models = number of models to train in total
Seconds / Model = average throughput for 1 model, in seconds
Boost vs Baseline = your performance gain if you were to do the mentioned row vs doing only 1 CPU (or 1 GPU if GPU) process/thread for your model

LightGBM CPU:

Run Parallel Threads Model Threads Parallel GPUs GPU Threads Models Seconds / Model Boost vs Baseline
20 1 1 0 0 100 6.539 ~1x
21 9 1 0 0 250 0.760 8.57x
22 18 1 0 0 500 0.400 16.28x
23 35 1 0 0 1000 0.252 25.85x
24 70 1 0 0 2500 0.295 22.08x
25 1 1 0 0 250 6.502 ~1x
26 1 9 0 0 250 2.315 2.81x
27 1 18 0 0 250 2.269 2.87x
28 1 35 0 0 250 2.485 2.62x
29 1 70 0 0 250 3.051 2.13x

LightGBM GPU:

Run Parallel Threads Model Threads Parallel GPUs GPU Threads Models Seconds / Model Boost vs Baseline
1 1 1 1 1 50 6.769 ~1x
2 2 1 2 1 100 3.481 1.94x
3 3 1 3 1 250 2.354 2.88x
4 4 1 4 1 500 1.790 3.78x
5 4 1 1 4 100 2.166 3.13x
6 8 1 2 4 250 1.121 6.04x
7 12 1 3 4 500 0.772 8.77x
8 16 1 4 4 1000 0.586 11.55x
9 9 1 1 9 250 1.298 5.21x
10 18 1 2 9 500 0.709 9.55x
11 27 1 3 9 1000 0.496 13.65x
12 36 1 4 9 2500 0.400 16.92x
13 18 1 1 18 500 1.200 5.64x
14 36 1 2 18 1000 0.633 10.69x
15 54 1 3 18 2500 0.464 14.59x
16 72 1 4 18 5000 0.431 15.71x
17 35 1 1 35 1000 1.194 5.67x
18 35 1 2 35 2500 0.632 10.71x
19 58 1 1 58 2500 1.185 5.71x

I also refreshed xgboost hist results (re-ran them).

xgboost CPU:

Run Parallel Threads Model Threads Parallel GPUs GPU Threads Models Seconds / Model Boost vs Baseline
9 1 1 0 0 25 11.389 ~1x
10 9 1 0 0 50 1.456 7.82x
11 18 1 0 0 100 0.782 14.56x
12 35 1 0 0 250 0.489 23.28x
13 70 1 0 0 500 0.428 26.60x
14 1 1 0 0 50 11.383 ~1x
15 1 9 0 0 50 6.565 1.73x
16 1 18 0 0 50 6.481 1.76x
17 1 35 0 0 50 24.601 0.46x
18 1 70 0 0 50 165.947 0.07x

xgboost GPU:

Run Parallel Threads Model Threads Parallel GPUs GPU Threads Models Seconds / Model Boost vs Baseline
1 1 1 1 1 25 20.441 ~1x
2 2 1 2 1 50 10.639 1.92x
3 3 1 3 1 100 6.978 2.93x
4 4 1 4 1 250 5.176 3.95x
5 4 1 1 4 50 20.556 0.99x
6 8 1 2 4 100 10.501 1.95x
7 12 1 3 4 250 6.914 2.96x
8 16 1 4 4 500 5.295 3.86x

MPI jobs in R: performance and behavior

For information, in R you can run MPI jobs if you hit the 125 communication socket limit.

However, the work is spread differently, it must be taken into account. The work could be more realistic using MPI, but its performance for sharing data is another story.

MPI chart behavior:

image

To use MPI, in R, replace the following, make sure to add the MPI type to the cluster spawn:

makeCluster(my_threads, type = "MPI")

BIG WARNING: You should close the MPI cluster with Rmpi::mpi.exit(), otherwise it crashes (it could still crash with Rmpi::mpi.exit()). I would recommend to use that command only at the very end (don't use stopCluster(cl)), and to check afterwards if there are any zombie R processes left in the wild.

Due to how MPI and R socket spreads the work at the beginning, the timings are entirely different from 1 to 7 (except the "[Parallel"] part which is creating processes and sending data to each process). MPI spawns processes faster than R sockets, however it is faster to communicate to R socket processes than MPI processes. The huge difference in sending data might be also because we are using serialization v3 in R, which is the default now in R 3.6.

MPI:

[Parallel] 58 Process(es) Creation Time: 1.102s
[Parallel] Sending Hardware Specifications Time: 24.290s
[Parallel] Sending Data Time: 59.740s
1. Total Time (Σ=6518.746, μ=61.498, σ=9.142)
2. Model Time (Σ=5917.713, μ=55.827, σ=8.112)
3. Matrix Train Build Time (Σ=58.603, μ=0.553, σ=0.113)
4. Matrix Test Build Time (Σ=57.090, μ=0.539, σ=0.115)
5. Predict Time (Σ=357.093, μ=3.369, σ=1.331)
6. Garbage Collector Time (Σ=15.051, μ=0.142, σ=0.040)
7. Metric (Σ=77.781147, μ=0.733784, σ=0.000353)

R socket:

[Parallel] 58 Process(es) Creation Time: 8.146s
[Parallel] Sending Hardware Specifications Time: 0.007s
[Parallel] Sending Data Time: 1.428s
1. Total Time (Σ=6774.940, μ=63.915, σ=20.778)
2. Model Time (Σ=6126.667, μ=57.799, σ=18.843)
3. Matrix Train Build Time (Σ=52.500, μ=0.495, σ=0.154)
4. Matrix Test Build Time (Σ=48.795, μ=0.460, σ=0.110)
5. Predict Time (Σ=320.306, μ=3.022, σ=1.262)
6. Garbage Collector Time (Σ=14.658, μ=0.138, σ=0.048)
7. Metric (Σ=77.781523, μ=0.733788, σ=0.000356)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.