
clustermq's Introduction

Hi there 👋

I'm Michael, a computational biologist. This means I use computers and code to analyze biological data, often in collaboration with experimental scientists. I work on how cancer cells overcome their growth limitations and how we can use that to help patients. Here you can find some of my work, as well as general-purpose tools I wrote for anyone to use. Please reach out if you want to collaborate!

This is what I do for research 👨‍🔬 (full list 🔗)

  • Developed a reliable method for estimating cell signaling pathways from gene expression (articles: Nat Comm, Cell; code: github :octocat:, bioc 📦)
  • Showed how gene coexpression networks often reflect cell mixtures instead of regulation (article: BBA-GRM; code: github :octocat:)
  • Found out how cancer cells can tolerate abnormal DNA content (aneuploidy, chromosomal instability) and identified a potential treatment, in collaboration with experimental scientists (articles: bioRxiv, Nature; code: transposon :octocat:, cgas_ko :octocat:)
  • Working on estimating DNA copy number from single-cell RNA sequencing (coming soon)
Here are some of my open source contributions 🔠 (full list 🔗)

  • clustermq: R package for efficient high performance computing (article: Bioinformatics; code: github :octocat:, cran 📦, testing ⚙️)
  • narray: R package for simplifying array operations (code: github :octocat:, cran 📦)
  • ebits: R bioinformatics toolkit incubator and data API (code: ebits :octocat:, data :octocat:)
  • Software build scripts for the ArchLinux User Repository 🔗 and a Gentoo overlay 🔗 (code: pkgbuilds :octocat:, overlay :octocat:)

clustermq's People

Contributors

ashiklom, barzine, brendanf, gabora, jeroen, kaneplusplus, klmr, marcosci, mattwarkentin, mfansler, michaelmayer2, mschubert, nickholway, phdyellow, strazto

clustermq's Issues

Add schedulers

Add more schedulers so the package is more generally usable.

This will likely be done upon request.

  • local processing (#13)
  • multi-core
  • SGE
  • PBS
  • Torque
  • SLURM

SSH proxy does not distribute data yet

For now everything is transferred via the master.

This is bad for low-bandwidth SSH connections: the data should instead be transferred to the proxy only once and then requested from there.

Allow integration with dplyr

It should be straightforward to have an already constructed data_frame as an input to Q analogous to the map_* functions:

models = processed %>%
    group_by(PROTEIN) %>%
    tidyr::nest() %>%
    mutate(mod = data %>% map(do_model)) # Q(do_model)

What this needs is

  • a version of Q() that takes a data.frame
  • optionally, some parameter mapping analogous to ggplot2's aes()
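One way this could already be approximated with the current API, assuming do_model takes an argument named data and that Q() accepts a list column as an iterated argument (a sketch, not confirmed package behavior):

nested = processed %>%
    group_by(PROTEIN) %>%
    tidyr::nest()
# iterate over the list column directly; do_model is the function from the example above
nested$mod = clustermq::Q(do_model, data = nested$data, n_jobs = 10)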

run asynchronously

Thanks for this package! It's very easy to use.

I'd like to ask if it's possible to run a job asynchronously, without waiting for the results.

For example, when I run:

job <- Q(fx, x=1:3, n_jobs=1)

I get:

Submitting 1 worker jobs for 3 function calls (ID: 6642) ...
  |======================================================================| 100%
Running calculations (1 calls/chunk) ...
  |                                                                      |   0%
  |======================================================================| 100%
Master: [19.0s 0.0% CPU]; Worker average: [11.7% CPU]

While this message is being printed, I'm unable to continue executing commands. I have to wait for the submitted job to complete before I can continue working.

Is it possible to let R wait in the background, so I can continue working?
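A possible workaround until such a feature exists (not clustermq functionality, just a sketch using the callr package): run the blocking Q() call in a background R session and collect the result later.

bg = callr::r_bg(function() clustermq::Q(function(x) x * 2, x = 1:3, n_jobs = 1))
# ... keep working in the interactive session ...
bg$wait()                # block only once the result is actually needed
result = bg$get_result()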

Find a better solution for common_data distribution

common_data is often orders of magnitude bigger than the iterated arguments.

Now, the master distributes common_data to all worker nodes, which blocks the distribution of iterated arguments and can cause timeouts on the worker (set from 10 min to 1 h in 3c4acc1).

A separate socket on the master for data distribution would need another forward on SSH, so that's probably not the best option.

Instead, what about the following:

  • each worker initially keeps a copy of the serialized data
  • it creates a socket to serve this data (either before accepting jobs or in a separate thread) and sends the master the information on how to connect to it
  • the master sends the next worker this address as a redirect
  • the worker then discards the serialized data

Report max memory of workers

Would be nice to be able to report the maximum amount of memory that workers used while performing their tasks.

But, as it turns out, this is almost impossible.

Best bet would be to rely on the monitoring the scheduler does and then query that using e.g. bhist.
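A rough sketch of that scheduler-side approach for LSF (the exact field names and output format vary between installations, so treat this as illustrative only):

lsf_max_mem = function(job_id) {
    out = system2("bhist", c("-l", job_id), stdout = TRUE)
    grep("MAX MEM", out, value = TRUE)   # assumes bhist -l reports a MAX MEM line
}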

Closures may cause serialization memory leaks

This bug supersedes #19 and #44.

Consider the following function with an attached environment (like in ebits/data_frame/wrap#L41):

create_fun = function(...) {
    new_fun = function(x) x
    environment(new_fun) = parent.frame()  # attach the caller's frame as the closure's environment
    new_fun
}
fx = create_fun(x)

This attaches the parent.frame() as the function's environment, which, if we perform other operations (like a Q call), will keep growing with each iteration. This is not a problem for the local R session, but serialization copies the elements of environments.

This seems to be fixed by evaluating all function arguments of the closure (and maybe running pryr::unenclose on it) before serialization (as in ebits@9dae849).

Probably the same should be added to the serialization method in QSys, or at least a warning displayed if the supplied function carries an environment.

work_chunk's unlist() causes problems with matrix arguments

For the do.call() in work_chunk, we want a list with a matrix in there, not a list with a list with a matrix.

However, unlisting the one element returns a vector instead of the actual matrix.

unlist(list(mat=matrix(1:4,ncol=2)), recursive=FALSE)
# mat1 mat2 mat3 mat4
#    1    2    3    4

Need to check manually for is.list(x) and length(x) == 1.
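A minimal sketch of that manual check (the helper name is illustrative, not package code): only drop the outer list level when the single element is itself a list, otherwise keep it intact.

drop_outer = function(x) {
    if (is.list(x) && length(x) == 1 && is.list(x[[1]]))
        x[[1]]
    else
        x
}
drop_outer(list(mat = matrix(1:4, ncol = 2)))  # keeps the matrix, unlike unlist()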

Adaptive chunk size

The chunk_size heuristic seems to work reasonably well for most of the time a job runs, but is a bit inefficient just before the jobs finish.

We can optimize this by decreasing the chunk_size when few jobs are left, so all workers finish more or less at the same time.

A possible version would be:

ceiling((n_calls - submit_index) / n_workers)
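As a sketch with a worked example (the function name is illustrative): with 1000 calls, 800 already submitted and 10 workers, this yields chunks of 20 calls, shrinking as the queue drains.

adaptive_chunk = function(n_calls, submit_index, n_workers)
    max(1, ceiling((n_calls - submit_index) / n_workers))
adaptive_chunk(1000, 800, 10)  # 20
adaptive_chunk(1000, 990, 10)  # 1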

Starting job with same group on different host might result in cross-killing

Starting a job with the same id/port on two different hosts but the same queuing system (here: LSF) may result in the cleanup() call killing the whole job group and hence jobs that it did not start.

Simple workaround: increase number of ports we sample from

Potential fix: add a second job group including the host name in the hierarchy

Caveat: this is hard (impossible?) to test.

Include functionality of `ulimit`

  • package includes R source code, so must be GPL
  • will require switching license to GPL
  • only compile it in on Linux (we only need it on the cluster anyway)
  • supersedes #14

Consider reducing dependencies

As per @eddelbuettel's comment, consider reducing the number of packages clustermq depends on.

  • pryr is for checking memory of workers, may not need this if we're sure it's stable
  • tibble is used to create the iterated index df, could do without if I check factors and column/row names manually
  • purrr could be replaced by manually iterating through the index df (but this is in an inner loop, so check performance)
  • infuser's template filling could be replicated quite easily (but is it worth it? see the sketch below)
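A minimal sketch of what replacing infuser might look like, assuming the templates keep using {{ key }} placeholders (this is not the package's actual code and ignores infuser's default-value syntax):

fill_template = function(template, values) {
    for (key in names(values)) {
        pattern = sprintf("\\{\\{\\s*%s\\s*\\}\\}", key)
        template = gsub(pattern, values[[key]], template, perl = TRUE)
    }
    template
}
fill_template("#BSUB -M {{ memory }}", list(memory = 500))  # "#BSUB -M 500"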

Main loop shuts down while jobs still running

For setting a timeout on the receive, I currently check

submit_index[1] <= n_calls

However, this will set a timeout if all jobs are submitted, but the last batch has not yet returned.

This should rather be:

(!shutdown && submit_index[1] <= n_calls) || length(jobs_running) > 0

  • Roll back master to before this check
  • Fix develop
  • Add test in develop
  • Merge to master

Pipe-friendly version of `Q`

It should be easy to use a data.frame as index for function calls processed by Q().

Hence, it should support Q(df, fun, ...) and

  • figure out which arguments are the function and which the data.frame
  • match arguments for exact matches and renaming in ... (tidy eval or pryr)
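A hypothetical sketch of how calls to the proposed interface might look (none of this is implemented at the time of the issue):

df = data.frame(a = 1:3, b = 4:6)
df %>% clustermq::Q(function(a, b) a + b, n_jobs = 1)   # columns matched to arguments by name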

Extend tests

(add to the list below)

  • check if seed is numeric and has length 1 (see the test sketch after this list)
  • check that objects exported to .GlobalEnv on workers do not interfere with the worker function
  • ssh_proxy should report error when forwarding fails
  • have timeout on workers (30 min?) to have them terminate if they don't hear from master
  • conversely, terminate if worker crashes (v0.8.0)
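A sketch of a test for the first item above (testthat style; rejecting bad seeds is the proposed behavior, not something the package guarantees yet):

testthat::test_that("seed must be numeric and of length 1", {
    fx = function(x) x
    testthat::expect_error(clustermq::Q(fx, x = 1:3, n_jobs = 1, seed = "abc"))
    testthat::expect_error(clustermq::Q(fx, x = 1:3, n_jobs = 1, seed = 1:2))
})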

Provide convenient way to kill job group

For instance if the master crashes and does not tidy up.

This used to be

bkill -g /rzmq/<port>

but is now

bkill -g /rzmq/<host>/<port>

which is hard to find.

If I switch <host> and <port>, does bkill -g /rzmq/<port> 0 work?

Perform some kind of heartbeating for workers

If workers crash because they run out of memory (without ulimit protection) or call C code that is not caught by try(...), they just disappear and the master never completes.

Find an appropriate way to check if workers are up. Maybe combine this with PUSH/PULL sockets for work and REQ/REP for control (#30)

clustermq hangs after job failed

I am running the following job. It fails, but the Q call is hanging, and the interactive progress bar shows it as running:

rm_annotation_file = 'data/annotation/mm10-rmsk.txt'

gz_extract = function (file) {
    warning_to_error = function (expr)
        withCallingHandlers(expr, warning = function (w) stop(w))

    if (! grepl('\\.gz$', file))
        file = paste0(file, '.gz')

    warning_to_error(system(sprintf('gunzip %s', shQuote(file)), intern = TRUE))
}

cmq = modules::import_package('clustermq')
cmq$Q(gz_extract, file = rm_annotation_file, memory = 500, n_jobs = 1, log_worker = TRUE)

Here’s the relevant output in the log file:

rzmq7123-1.log
> clustermq:::worker("rzmq7123-1", "tcp://bc-32-1-04:7123", 500)
[1] "tcp://bc-32-1-04:7123"
[1] 500
WORKER_UP to: tcp://bc-32-1-04:7123
function (file) {
    warning_to_error = function (expr)
        withCallingHandlers(expr, warning = function (w) stop(w))

    if (! grepl('\\.gz$', file))
        file = paste0(file, '.gz')

    warning_to_error(system(sprintf('gunzip %s', shQuote(file)), intern = TRUE))
}
NULL
received: DO_CHUNK
gzip: data/annotation/mm10-rmsk.txt.gz: No such file or directory
Error: running command 'gunzip 'data/annotation/mm10-rmsk.txt.gz'' had status 1
Execution halted

------------------------------------------------------------
Sender: LSF System <lsfadmin@bc-31-2-08>
Subject: Job 3300290: <rzmq7123-1> in cluster <farm3> Exited

Job <rzmq7123-1> was submitted from host <bc-32-1-04> by user <kr15> in cluster <farm3>.
Job was executed on host(s) <bc-31-2-08>, in queue <normal>, as user <kr15> in cluster <farm3>.
</nfs/users/nfs_k/kr15> was used as the home directory.
</lustre/scratch115/realdata/mdt2/teams/miska/users/kr15/projects/time-series> was used as the working directory.
Started at Mon Feb 27 16:59:38 2017
Results reported on Mon Feb 27 17:00:02 2017

Your job looked like:

------------------------------------------------------------
# LSBATCH: User input
#BSUB-J rzmq7123-1                  # name of the job / array jobs
#BSUB-g /rzmq/7123       # group the job belongs to
#BSUB-o rzmq7123-1.log      # stdout + stderr
#BSUB-M 500             # Memory requirements in Mbytes
#BSUB-R rusage[mem=500] # Memory requirements in Mbytes
#BSUB-R select[mem>500]
#BSUB-R span[hosts=1]
#BSUB-q normal                          # name of the queue

R --no-save --no-restore -e \
    'clustermq:::worker("rzmq7123-1", "tcp://bc-32-1-04:7123", 500)'


------------------------------------------------------------

Exited with exit code 1.

Resource usage summary:

    CPU time :                                   0.62 sec.
    Total Requested Memory :                     500.00 MB
    Delta Memory :                               -

The output (if any) is above this job summary.

Support reusing of workers

Instead of starting workers with each call to Q, we should be able to set up a pool of workers and then perform multiple calls on it.

Syntax could be something like the following:

pool = create_workers(hpc_args)
Q(..., hpc_args=pool)
stop_workers(pool)

where pool could be an instance of QSys

Iterated argument not found error

6ed048b introduced a bug where iterated arguments are not found when the iterated argument is a list.

As a workaround, use f9ae691:

# devtools::install_github("mschubert/clustermq", ref="f9ae6913c4dd62716110e3337c622b42dc8c2a1d")

make sure function call is separated from worker env

  • just run the function evaluation in a new env and copy vars with each iteration? (they should be by reference anyway, and would only be copied if changed; see the sketch after this list)
    • test this with a function that tries to change a variable in const args
  • make sure argument names like "seed" work
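A minimal sketch of the first idea (illustrative names, not the package's internals): evaluate each call in a fresh environment whose parent holds the exported/constant objects, so the user function cannot clobber them between iterations.

run_call = function(fun, args, const_env) {
    eval_env = new.env(parent = const_env)   # fresh per call, discarded afterwards
    do.call(fun, args, envir = eval_env)
}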

Warnings aren’t propagated to caller

I’m using clustermq to download large files via the R function utils::download.file. Unfortunately, that function reports presumably failed downloads as a warning. As this warning isn’t propagated to the caller, there’s no indication that downloads failed.

In addition, this happens extremely commonly — more commonly than I’d usually expect. I am not sure whether this is due to a quota limitation (though I cannot find any such quota on the website in question — ArrayExpress) or something to do with clustermq.

Here’s some code to reproduce the problem:

library(dplyr)

uri = 'https://www.ebi.ac.uk/arrayexpress/files/E-MTAB-2328/E-MTAB-2328.sdrf.txt'
samples = readr::read_tsv(uri) %>%
    select(Uri = `Comment[FASTQ_URI]`) %>%
    mutate(File = basename(Uri))

clustermq::Q(download.file, url = samples$Uri, destfile = samples$File,
             memory = 500, job_size = 1, log_worker = TRUE) %>% invisible()

This will download the (48) files without indicating an error. However, some of these files will be truncated. The rzmq*.log will contain something like the following lines:

⟩⟩⟩ grep -A2 'Warning message:' rzmq*.log
rzmq7111-46.log:Warning message:
rzmq7111-46.log-In (function (url, destfile, method, quiet = FALSE, mode = "w",  :
rzmq7111-46.log-  downloaded length 0 != reported length 2034745308
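A possible user-side workaround until warnings are forwarded (the wrapper name is illustrative): have the worker function capture its own warnings and return them alongside the result.

download_logged = function(url, destfile) {
    warns = character()
    status = withCallingHandlers(
        download.file(url, destfile),
        warning = function(w) {
            warns <<- c(warns, conditionMessage(w))
            invokeRestart("muffleWarning")
        }
    )
    list(status = status, warnings = warns)
}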

Worker timeout causes full valid list of results to be discarded

If a worker doesn't get new work from the master for 600 seconds, it shuts itself down.

The master, however, has a list of running workers and will loop forever until they all report back.

Add a timeout here on the master that just returns if we have all results anyway.

Documentation is unclear about remote SSH keys

The SSH "scheduler" is set up in a way that it relies on key authentication if possible, but can take passwords as well to log into the remote server.

The problem is that the remote server sets up a local forward to grant the network access to the remote forward. This is done using

ssh -g -L <local forward> localhost

This, however, requires key-based authentication that is not mentioned anywhere.

Options to fix this are:

  • write in the documentation that remote localhost ssh has to be passwordless (simple, but adds complexity to the setup)
  • use the remote forward to set up the second forward as well, reusing the agent and potentially catching the second password; I've got no idea if this works, but should try

Allow specifying arbitrary scheduler command line arguments in Q

Currently the scheduler is set up via a configuration file that’s loaded when the clustermq package is attached.

However, there are situations where additional options need to be specified in individual calls. For instance, I need to specify the -n option of the LSF scheduler, and I need to do so on a case-per-case basis.

Of course it would be possible to add this as another configurable option to the scheduler configuration, analogous to memory. But maybe a more flexible way would be to allow passing arbitrary arguments to the scheduler; for instance:

clustermq::Q(fun, args, scheduler_args = list('-n4', '"-Rspan[hosts=1]"'))

Improve errors when template filling fails

  • this should raise an explicit error, not "job submission failed for unknown reason"
  • should also check if the worker is called with the right API (as this changes from 0.7)
  • submit_job in base qsys is really just template filling now, provide testable function

Guess cluster setup when no options set

Should be relatively simple and not really prone to errors

  • Check which binaries are available (bsub, qsub, sbatch); see the sketch below
  • Set up the scheduler accordingly
  • Use .onAttach instead of .onLoad to comply with CRAN
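A rough sketch of the binary-detection idea (the mapping from binary to scheduler is an assumption; qsub in particular is used by both SGE and PBS/Torque):

guess_scheduler = function() {
    if (nzchar(Sys.which("bsub")))   return("lsf")
    if (nzchar(Sys.which("sbatch"))) return("slurm")
    if (nzchar(Sys.which("qsub")))   return("sge")   # or PBS/Torque
    "local"
}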

passing run time to bsub (-W)

Hi,

could you please include an interface for the run time (#BSUB -W)? Either through options

options(clustermq.runtime = ?)

or maybe through the Q function?

This way, we could override the default 15 minutes on our cluster.
Thanks!

Solve SSH forwarding with ZeroMQ sockets instead of `ssh -g -L`

Currently, for the scheduler via SSH option to work, ssh_proxy needs to set up a local forward on the remote machine from that network to the local listening port.

This has multiple issues:

  • We do not monitor SSH forwards but rely on the system (and can't debug it effectively)
  • It needs ssh localhost to be available without password
  • It needs ssh -g to be available on the remote network

It would be more desirable to have the proxy provide ZeroMQ sockets to forward traffic so the only SSH forward required is the reverse tunneling.
