wise-workers

A worker thread pool for CPU-bound tasks. It requires no configuration and has many powerful features:

  • Generator and AsyncGenerator function support
    • Worker functions can be generator functions or async generator functions, making it easy to stream results back to the main thread. Iteration happens eagerly, to maximize parallelism (i.e., the main thread does not pause the generator function).
  • Functions (callbacks) as arguments
    • Functions can be passed to worker tasks. They become async functions in the worker thread, using MessagePort for communication under the hood.
  • Cancellation support
  • Automatic thread management
    • Crashed threads are automatically respawned, unless they're crashing during startup (to prevent an infinite spawn loop).
  • Zero-copy data transfer
    • Data can be moved between threads without being copied, which improves performance when passing large Buffers between threads.
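The respawn rule above can be illustrated with Node's built-in worker_threads module. This is a minimal sketch, not wise-workers' actual implementation. `superviseWorker()`, the `'ready'` message a worker posts once its startup is complete, and the `maxRespawns` cap (present only so the demo terminates) are all hypothetical. A worker that crashes after reporting ready is replaced. A worker that crashes before that is treated as a startup failure and is not respawned.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical supervisor: keeps one worker alive, respawning it after a
// crash, but only if the crashed worker had already reported 'ready'.
function superviseWorker(source, { maxRespawns = 3 } = {}) {
    return new Promise((resolve) => {
        let spawns = 0;
        const spawn = () => {
            spawns++;
            let ready = false;
            const worker = new Worker(source, { eval: true });
            worker.on('message', (msg) => {
                if (msg === 'ready') ready = true;
            });
            // Crashes are handled in the 'exit' listener; this listener only
            // prevents an unhandled 'error' event from throwing here.
            worker.on('error', () => {});
            worker.once('exit', (code) => {
                if (code === 0) return resolve({ spawns, reason: 'finished' });
                // Dying before 'ready' means startup is broken; respawning would loop forever.
                if (!ready) return resolve({ spawns, reason: 'startup crash' });
                if (spawns > maxRespawns) return resolve({ spawns, reason: 'too many crashes' });
                spawn();
            });
        };
        spawn();
    });
}
```

Treating startup crashes differently is what stops a worker script that throws on load from being respawned endlessly.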

Installation

npm install wise-workers

Requires Node.js v16.x.x or later.

Usage

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

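// Top-level await only works in ES modules; in a CommonJS file like this one, run it inside an async function.
const result = await pool.call('add', 2, 2); // => 4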

worker.js

exports.add = (a, b) => a + b;

Example: Zero-copy

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const data = Buffer.alloc(1024 * 1024);

// pool.invoke() allows you to provide more options than pool.call()
const compressedData = await pool.invoke('compress', {
    args: [data],
    transferList: [data.buffer], // Pass the ArrayBuffer in the transferList
});

worker.js

const zlib = require('zlib');
const { move } = require('wise-workers');

exports.compress = (data) => {
    const compressedData = zlib.gzipSync(data);

    // Use move() to include a transferList in the return value.
    return move(compressedData, [compressedData.buffer]);
};
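To see what "moved rather than copied" means in practice, this sketch transfers an ArrayBuffer through a plain MessageChannel from Node's worker_threads module, with no wise-workers involved. `transferDemo()` is a hypothetical helper. After the transfer, the sender's view is detached and reports a byteLength of 0, while the receiver gets the full 1 MiB.

```javascript
const { MessageChannel } = require('worker_threads');

// Send a 1 MiB view through a MessageChannel, moving its ArrayBuffer, and
// report the byte length seen on each side.
function transferDemo() {
    return new Promise((resolve) => {
        const { port1, port2 } = new MessageChannel();
        const data = new Uint8Array(1024 * 1024);
        port2.on('message', (received) => {
            port1.close(); // closes both ends so the process can exit
            resolve({
                senderBytes: data.byteLength, // 0: the sender's buffer is detached
                receiverBytes: received.byteLength,
            });
        });
        // Listing data.buffer in the transfer list moves it instead of copying it.
        port1.postMessage(data, [data.buffer]);
    });
}
```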

Example: Generator function

When calling a generator function, you will get an async iterable object.

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const asyncIterable = await pool.call('readFile', 'data.csv');

for await (const chunk of asyncIterable) {
    console.log(`got chunk of size ${chunk.byteLength} bytes`);
}

worker.js

const fs = require('fs');
const { move } = require('wise-workers');

exports.readFile = function* (filename, chunkSize = 1024 * 16) {
    const fd = fs.openSync(filename);
    try {
        while (true) {
            const buffer = Buffer.alloc(chunkSize);
            const bytesRead = fs.readSync(fd, buffer, 0, chunkSize);
            if (bytesRead > 0) {
                const chunk = buffer.subarray(0, bytesRead);
                // You can move() yielded values too
                yield move(chunk, [chunk.buffer]);
            }
            if (bytesRead < chunkSize) {
                break;
            }
        }
    } finally {
        fs.closeSync(fd);
    }
};
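The eager iteration mentioned in the feature list can be sketched with a plain Worker started from inline source (`eval: true`). The worker runs the generator to completion and posts every yielded value immediately, without waiting for the consumer; the main thread queues the messages and hands them out through an async generator. The message shapes (`{ value }`, `{ done: true }`) and the `streamFromWorker()` helper are assumptions for illustration, not the library's protocol.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical worker body: runs the generator to completion without waiting
// for the consumer, posting each yielded value as its own message.
const generatorWorkerSource = `
const { parentPort } = require('worker_threads');
function* squares(n) {
    for (let i = 1; i <= n; i++) yield i * i;
}
for (const value of squares(5)) parentPort.postMessage({ value });
parentPort.postMessage({ done: true });
`;

// Main thread: buffer incoming messages and expose them as an async iterable.
async function* streamFromWorker(source) {
    const worker = new Worker(source, { eval: true });
    const queue = [];
    let wake = null;
    const push = (msg) => {
        queue.push(msg);
        if (wake) {
            wake();
            wake = null;
        }
    };
    worker.on('message', push);
    worker.on('error', (error) => push({ error }));
    try {
        while (true) {
            if (queue.length === 0) {
                // Sleep until the next message arrives.
                await new Promise((resolve) => { wake = resolve; });
            }
            const msg = queue.shift();
            if (msg.error) throw msg.error;
            if (msg.done) return;
            yield msg.value;
        }
    } finally {
        await worker.terminate();
    }
}
```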

Example: Callback function

You can pass callback functions to the worker, but they must be top-level arguments (they can't be nested within another object). Callback functions can also be async functions.

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const allowedList = new Set(getHugeDataset());
const result = await pool.call('search', searchTerm, (value) => {
    return allowedList.has(value);
});

worker.js

exports.search = async (searchTerm, filter) => {
    const matches = [];
    for (const match of searchFor(searchTerm)) {
        if (await filter(match)) {
            matches.push(match);
        }
    }
    return matches;
};

Currently, callback functions do not support "zero-copy" data transfer in their arguments or return values. This restriction may be lifted in the future.
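The feature list says passed functions become async functions in the worker, backed by a MessagePort. Here is a minimal sketch of that idea using only worker_threads: the worker wraps its end of a MessageChannel in an async `filter()` function that posts `{ id, args }` and waits for a matching `{ id, result }` reply, while the main thread runs the real callback. `callWithCallback()` and the message format are hypothetical, not wise-workers' internals.

```javascript
const { Worker, MessageChannel } = require('worker_threads');

// Hypothetical worker body: wraps the received MessagePort in an async
// filter() function and uses it the way the search example above does.
const callbackWorkerSource = `
const { parentPort, workerData } = require('worker_threads');
const port = workerData.port;
const pending = new Map();
let nextId = 0;
port.on('message', ({ id, result }) => {
    pending.get(id)(result);
    pending.delete(id);
});
// What the worker sees: an async function whose calls travel over the port.
const filter = (...args) => new Promise((resolve) => {
    const id = nextId++;
    pending.set(id, resolve);
    port.postMessage({ id, args });
});
(async () => {
    const matches = [];
    for (const value of [1, 2, 3, 4, 5, 6]) {
        if (await filter(value)) matches.push(value);
    }
    port.close();
    parentPort.postMessage(matches);
})();
`;

// Main thread: answer each request by running the real callback.
function callWithCallback(callback) {
    const { port1, port2 } = new MessageChannel();
    port1.on('message', async ({ id, args }) => {
        const result = await callback(...args);
        port1.postMessage({ id, result });
    });
    const worker = new Worker(callbackWorkerSource, {
        eval: true,
        workerData: { port: port2 },
        transferList: [port2], // MessagePorts in workerData must be transferred
    });
    return new Promise((resolve, reject) => {
        worker.once('message', (matches) => {
            port1.close();
            resolve(matches);
        });
        worker.once('error', (err) => {
            port1.close();
            reject(err);
        });
    });
}
```

Because every call is a message round trip, arguments and results must be structured-clonable, and each call costs far more than a local function call.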

Example: AbortSignal (cancellation)

Calling controller.abort() will forcefully terminate the thread that's assigned to the associated task.

const ThreadPool = require('wise-workers');

const pool = new ThreadPool({ filename: require.resolve('./worker') });

const controller = new AbortController();
setTimeout(() => {
    controller.abort();
}, 1000);

await pool.invoke('infiniteLoop', {
    signal: controller.signal,
});

worker.js

exports.infiniteLoop = () => {
    while (true) {}
};
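A forced abort like this one can be sketched with `worker.terminate()`, which stops a worker's JavaScript even inside an infinite loop. `runAbortable()` is a hypothetical helper that runs inline worker source in a fresh thread and terminates that thread when the signal fires; it is not how wise-workers is implemented internally, and the `'Task aborted'` error is an assumption.

```javascript
const { Worker } = require('worker_threads');

// Run inline worker source in its own thread; terminate the thread if `signal` aborts.
function runAbortable(source, signal) {
    return new Promise((resolve, reject) => {
        const worker = new Worker(source, { eval: true });
        const onAbort = () => {
            // Forceful: the worker's JavaScript is stopped wherever it is.
            worker.terminate();
            reject(new Error('Task aborted'));
        };
        signal.addEventListener('abort', onAbort, { once: true });
        worker.once('message', (result) => {
            signal.removeEventListener('abort', onAbort);
            resolve(result);
            worker.terminate();
        });
        worker.once('error', (err) => {
            signal.removeEventListener('abort', onAbort);
            reject(err);
        });
    });
}
```

Each aborted task costs a whole thread, which a pool then has to replace.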

Forcefully aborting a thread is not a cheap operation, so reserve it for exceptional or rare situations. For more common situations where performance is critical, you can use util.transferableAbortSignal() to implement your own cooperative cancellation logic.
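The paragraph above recommends util.transferableAbortSignal(). The sketch below swaps in a different cooperative technique: a flag in a SharedArrayBuffer, read with Atomics, which a tight CPU-bound loop can poll synchronously on every iteration. `startCancellableTask()` is a hypothetical helper. Setting the flag only asks the worker to stop; the worker finishes on its own instead of being terminated from outside.

```javascript
const { Worker } = require('worker_threads');

// Worker body: a CPU-bound loop that polls a shared flag on every iteration.
const cooperativeWorkerSource = `
const { parentPort, workerData } = require('worker_threads');
const flag = new Int32Array(workerData.shared);
let iterations = 0;
while (Atomics.load(flag, 0) === 0) {
    iterations++;
}
parentPort.postMessage({ stopped: true, iterations });
`;

function startCancellableTask() {
    // A SharedArrayBuffer in workerData is shared between threads, not copied.
    const shared = new SharedArrayBuffer(4);
    const flag = new Int32Array(shared);
    const worker = new Worker(cooperativeWorkerSource, { eval: true, workerData: { shared } });
    const done = new Promise((resolve, reject) => {
        worker.once('message', resolve);
        worker.once('error', reject);
    });
    // Cooperative: setting the flag asks the loop to exit; nothing is killed.
    const cancel = () => Atomics.store(flag, 0, 1);
    return { done, cancel };
}
```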

API

new ThreadPool(options)

Creates a new thread pool. The following options are supported:

  • filename (string, required)
    • The absolute path to the worker script or module. Both CommonJS and ESM modules are supported.
  • minThreads (number, optional)
    • The minimum number of worker threads to keep in the pool. By default, this is half the number of physical CPU cores on the machine.
  • maxThreads (number, optional)
    • The maximum number of worker threads allowed in the pool. By default, this is the number of physical CPU cores on the machine.
  • The following options are passed directly to new Worker() under the hood:
    • execArgv
    • argv
    • env
    • workerData
    • resourceLimits
    • trackUnmanagedFds
    • name

ThreadPool is an EventEmitter. It emits the following events:

  • error: This occurs if a worker crashes unexpectedly. If this occurs while a worker is starting up, the ThreadPool will be destroyed.
  • online:n: Where n is an integer, these events occur when the number of online threads changes (i.e., when a new thread comes online or when an existing thread goes offline).

pool.call(methodName, [...args]) -> promise

Invokes a function exported by a worker thread. Even if the worker's function is synchronous, this method always returns a Promise.

The args can contain any value that is supported by the HTML structured clone algorithm. Additionally, functions may be passed within the top-level arguments (i.e., not nested within some other object), in which case they appear as async functions in the worker thread.

If the worker method is a generator or async generator function, the returned promise will be resolved with an async iterable object.
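To make the request/response flow behind call() concrete, here is a single-worker sketch using worker_threads. Each call gets an id; the worker looks up the method by name, awaits its result, and posts back `{ id, value }` or `{ id, error }`. `OneWorkerPool`, the inline `methods` table, and the message format are hypothetical; wise-workers loads the exported functions from `filename` and spreads tasks across many threads.

```javascript
const { Worker } = require('worker_threads');

// Hypothetical worker body: a table of functions plus a name-based dispatcher.
const dispatchWorkerSource = `
const { parentPort } = require('worker_threads');
const methods = { add: (a, b) => a + b };
parentPort.on('message', async ({ id, method, args }) => {
    try {
        const value = await methods[method](...args);
        parentPort.postMessage({ id, value });
    } catch (err) {
        parentPort.postMessage({ id, error: String(err) });
    }
});
`;

class OneWorkerPool {
    constructor() {
        this.worker = new Worker(dispatchWorkerSource, { eval: true });
        this.nextId = 0;
        this.pending = new Map();
        // Match each reply to the promise created for its request id.
        this.worker.on('message', ({ id, value, error }) => {
            const { resolve, reject } = this.pending.get(id);
            this.pending.delete(id);
            if (error !== undefined) reject(new Error(error));
            else resolve(value);
        });
    }

    call(method, ...args) {
        return new Promise((resolve, reject) => {
            const id = this.nextId++;
            this.pending.set(id, { resolve, reject });
            this.worker.postMessage({ id, method, args });
        });
    }

    destroy() {
        return this.worker.terminate();
    }
}
```

Because the result always travels back as a message, call() returns a Promise even when the worker function is synchronous.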

pool.invoke(methodName, [options]) -> promise

This is the same as pool.call(), except it supports more options:

  • args (Array, optional)
    • The arguments to pass to the worker function.
  • transferList (Array, optional)
    • A list of transferable objects within args that should be moved to the worker thread rather than copied ("zero-copy").
  • signal (AbortSignal, optional)
    • An AbortSignal that, when aborted, will forcefully stop this task. If the signal is aborted after the task completes, nothing happens.

pool.destroy([error]) -> promise

Destroys the thread pool, cancelling any pending tasks and permanently terminating all threads. After being destroyed, the thread pool is no longer usable.

If an error object is provided, all pending tasks will be rejected with it. Otherwise, a default error is used.

The returned promise resolves when all threads have finished shutting down.

ThreadPool properties

  • pool.filename: The filename of the worker script being used.
  • pool.threadCount: The number of threads currently spawned within the pool.
  • pool.onlineThreadCount: The number of threads which are fully online, which means they have completed their initialization/startup and are capable of handling tasks.
  • pool.activeThreadCount: The number of threads which are busy with a pending task.
  • pool.pendingTaskCount: The number of pending tasks yet to be resolved.
  • pool.destroyed: Whether or not the thread pool is destroyed (boolean).

Static properties

  • ThreadPool.PHYSICAL_CORES: The number of physical CPU cores detected on the machine. This number is used to calculate defaults for minThreads and maxThreads when constructing a ThreadPool.

License

MIT
