
Bree is a Node.js and JavaScript job task scheduler with worker threads, cron, Date, and human syntax. Built for @ladjs, @forwardemail, @spamscanner, @cabinjs.

Home Page: https://jobscheduler.net

License: MIT License


bree


Bree is the best job scheduler for Node.js and JavaScript with cron, dates, ms, later, and human-friendly support.

Works in Node v12.17.0+, uses worker threads (Node.js) to spawn sandboxed processes, and supports async/await, retries, throttling, concurrency, and cancelable jobs with graceful shutdown. Simple, fast, and lightweight. Made for Forward Email and Lad.


Foreword

Bree was created to give you fine-grained control with simplicity, and has built-in support for workers, sandboxed processes, graceful reloading, cron jobs, dates, human-friendly time representations, and much more.

We recommend that you query a persistent database in your jobs to prevent specific operations from running more than once.

Bree does not force you to use an additional database layer of Redis or MongoDB to manage job state.

In doing so, you should manage boolean job states yourself using queries. For instance, if you have to send a welcome email to users, only send a welcome email to users that do not have a Date value set yet for welcome_email_sent_at.
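The welcome-email pattern above can be sketched as follows. This is a minimal illustration only: the in-memory `users` array stands in for a real persistent database, and `findUsersNeedingWelcomeEmail`, `sendWelcomeEmails`, and the `sendEmail` callback are hypothetical names that are not part of Bree.

```javascript
// Hypothetical in-memory stand-in for a persistent "users" table.
const users = [
  { email: 'a@example.com', welcome_email_sent_at: null },
  { email: 'b@example.com', welcome_email_sent_at: new Date('2024-01-01') }
];

// Select only users that do not have a Date set for welcome_email_sent_at.
function findUsersNeedingWelcomeEmail(rows) {
  return rows.filter((user) => !(user.welcome_email_sent_at instanceof Date));
}

// Send the email, then record the timestamp so a later run skips this user.
function sendWelcomeEmails(rows, sendEmail, now = new Date()) {
  const sent = [];
  for (const user of findUsersNeedingWelcomeEmail(rows)) {
    sendEmail(user.email); // hypothetical mailer callback
    user.welcome_email_sent_at = now;
    sent.push(user.email);
  }
  return sent;
}

const firstRun = sendWelcomeEmails(users, () => {});
const secondRun = sendWelcomeEmails(users, () => {});
console.log(firstRun, secondRun);
```

Because the second run finds no rows without a timestamp, running the job twice does not send the email twice. With a real database, the filter would become a query and the timestamp write would be an update.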

Install

npm:

npm install bree

yarn:

yarn add bree

Upgrading

To see details about upgrading from the last major version, please see UPGRADING.md.

IMPORTANT: Bree v9.0.0 has several breaking changes, please see UPGRADING.md for more insight.

NOTE: Bree v6.5.0 is the last version to support Node v10 and browsers.

Usage and Examples

The example below assumes that you have a directory jobs in the root of the directory from which you run this example. For example, if the example below is at /path/to/script.js, then /path/to/jobs/ must also exist as a directory. If you wish to disable this feature, then pass root: false as an option.

Inside this jobs directory are individual scripts which are run using Workers per optional timeouts, and additionally, an optional interval or cron expression. The example below contains comments, which help to clarify how this works.

The option jobs passed to a new instance of Bree (as shown below) is an Array. It contains values which can either be a String (name of a job in the jobs directory, which is run on boot) OR it can be an Object with name, path, timeout, and interval properties. If you do not supply a path, then the path is created using the root directory (defaults to jobs) in combination with the name. If you do not supply values for timeout and/or interval, then these values default to 0 (see index.js for more insight into configurable default options).
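To make the defaults described above concrete, here is a rough sketch of how a job entry could be expanded. `describeJob` is a hypothetical helper written for this illustration. It is not Bree's internal code, and Bree's own normalization does more (validation, parsing of human-friendly strings, and so on).

```javascript
const path = require('path');

// Hypothetical helper (not part of Bree) mirroring the documented defaults:
// a String entry is treated as a job name, a missing path is built from the
// root directory plus the name, and timeout/interval fall back to 0.
function describeJob(job, { root = path.resolve('jobs'), extension = 'js' } = {}) {
  const definition = typeof job === 'string' ? { name: job } : job;
  return {
    name: definition.name,
    path: definition.path || path.join(root, `${definition.name}.${extension}`),
    timeout: definition.timeout === undefined ? 0 : definition.timeout,
    interval: definition.interval === undefined ? 0 : definition.interval
  };
}

console.log(describeJob('foo'));
console.log(describeJob({ name: 'worker-6', timeout: '1m', interval: '5m' }));
```

The first call shows a String entry resolving to a file inside the root directory with both values at 0. The second keeps the values supplied on the job Object.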

We have also documented all Instance Options and Job Options in this README below. Be sure to read those sections so you have a complete understanding of how Bree works.

ECMAScript modules (ESM)

// app.mjs

import Bree from 'bree';

const bree = new Bree({
  // ... (see below) ...
});

// top-level await supported in Node v14.8+
await bree.start();

// ... (see below) ...

Please see the CommonJS (CJS) example below for more insight and options.

CommonJS (CJS)

// app.js

const path = require('path');

// optional
const ms = require('ms');
const dayjs = require('dayjs');
const Graceful = require('@ladjs/graceful');
const Cabin = require('cabin');

// required
const Bree = require('bree');

//
// NOTE: see the "Instance Options" section below in this README
// for the complete list of options and their defaults
//
const bree = new Bree({
  //
  // NOTE: by default the `logger` is set to `console`
  // however we recommend you to use CabinJS as it
  // will automatically add application and worker metadata
  // to your log output, and also masks sensitive data for you
  // <https://cabinjs.com>
  //
  // NOTE: You can also pass `false` as `logger: false` to disable logging
  //
  logger: new Cabin(),

  //
  // NOTE: instead of passing this Array as an option
  // you can create a `./jobs/index.js` file, exporting
  // this exact same array as `module.exports = [ ... ]`
  // doing so will allow you to keep your job configuration and the jobs
  // themselves all in the same folder and very organized
  //
  // See the "Job Options" section below in this README
  // for the complete list of job options and configurations
  //
  jobs: [
    // runs `./jobs/foo.js` on start
    'foo',

    // runs `./jobs/foo-bar.js` on start
    {
      name: 'foo-bar'
    },

    // runs `./jobs/some-other-path.js` on start
    {
      name: 'beep',
      path: path.join(__dirname, 'jobs', 'some-other-path')
    },

    // runs `./jobs/worker-1.js` on the last day of the month
    {
      name: 'worker-1',
      interval: 'on the last day of the month'
    },

    // runs `./jobs/worker-2.js` every other day
    {
      name: 'worker-2',
      interval: 'every 2 days'
    },

    // runs `./jobs/worker-3.js` at 10:15am and 5:15pm every day except on Tuesday
    {
      name: 'worker-3',
      interval: 'at 10:15 am also at 5:15pm except on Tuesday'
    },

    // runs `./jobs/worker-4.js` at 10:15am every day
    {
      name: 'worker-4',
      cron: '15 10 ? * *',
      cronValidate: {
        override: {
          useBlankDay: true
        }
      }
    },

    // runs `./jobs/worker-5.js` after 10 minutes have elapsed
    {
      name: 'worker-5',
      timeout: '10m'
    },

    // runs `./jobs/worker-6.js` after 1 minute and every 5 minutes thereafter
    {
      name: 'worker-6',
      timeout: '1m',
      interval: '5m'
      // this is unnecessary but shows you can pass a Number (ms)
      // interval: ms('5m')
    },

    // runs `./jobs/worker-7.js` after 3 days and 4 hours
    {
      name: 'worker-7',
      // this example uses `human-interval` parsing
      timeout: '3 days and 4 hours'
    },

    // runs `./jobs/worker-8.js` at midnight (once)
    {
      name: 'worker-8',
      timeout: 'at 12:00 am'
    },

    // runs `./jobs/worker-9.js` every day at midnight
    {
      name: 'worker-9',
      interval: 'at 12:00 am'
    },

    // runs `./jobs/worker-10.js` at midnight on the 1st of every month
    {
      name: 'worker-10',
      cron: '0 0 1 * *'
    },

    // runs `./jobs/worker-11.js` at midnight on the last day of month
    {
      name: 'worker-11',
      cron: '0 0 L * *',
      cronValidate: {
        useLastDayOfMonth: true
      }
    },

    // runs `./jobs/worker-12.js` at a specific Date (e.g. in 3 days)
    {
      name: 'worker-12',
      // <https://github.com/iamkun/dayjs>
      date: dayjs().add(3, 'days').toDate()
      // you can also use momentjs
      // <https://momentjs.com/>
      // date: moment('1/1/20', 'M/D/YY').toDate()
      // you can pass Date instances (if it's in the past it will not get run)
      // date: new Date()
    },

    // runs `./jobs/worker-13.js` on start and every 2 minutes
    {
      name: 'worker-13',
      interval: '2m'
    },

    // runs `./jobs/worker-14.js` on start with custom `new Worker` options (see below)
    {
      name: 'worker-14',
      // <https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options>
      worker: {
        workerData: {
          foo: 'bar',
          beep: 'boop'
        }
      }
    },

    // runs `./jobs/worker-15.js` **NOT** on start, but every 2 minutes
    {
      name: 'worker-15',
      timeout: false, // <-- specify `false` here to prevent default timeout (e.g. on start)
      interval: '2m'
    },

    // runs `./jobs/worker-16.js` on January 1st, 2022
    // and at midnight on the 1st of every month thereafter
    {
      name: 'worker-16',
      date: dayjs('1-1-2022', 'M-D-YYYY').toDate(),
      cron: '0 0 1 * *'
    }
  ]
});

// handle graceful reloads, pm2 support, and events like SIGHUP, SIGINT, etc.
const graceful = new Graceful({ brees: [bree] });
graceful.listen();

// start all jobs (this is the equivalent of reloading a crontab):
(async () => {
  await bree.start();
})();

/*
// start only a specific job:
(async () => {
  await bree.start('foo');
})();

// stop all jobs
bree.stop();

// stop only a specific job:
bree.stop('beep');

// run all jobs (this does not abide by timeout/interval/cron and spawns workers immediately)
bree.run();

// run a specific job (...)
bree.run('beep');

(async () => {
  // add a job array after initialization:
  const added = await bree.add(['boop']); // will return array of added jobs
  // this must then be started using one of the above methods

  // add a job after initialization:
  await bree.add('boop');
  // this must then be started using one of the above methods
})();


// remove a job after initialization:
bree.remove('boop');
*/

For more examples - including setting up bree with TypeScript, ESModules, and implementing an Email Queue, see the examples folder.

For a more complete demo using express see: Bree Express Demo

Instance Options

Here is the full list of options and their defaults. See src/index.js for more insight if necessary.

Property Type Default Value Description
logger Object console This is the default logger. We recommend using Cabin instead of using console as your default logger. Set this value to false to disable logging entirely (uses noop function)
root String path.resolve('jobs') Resolves a jobs folder relative to where the project is run (the directory you call node in). Set this value to false to prevent requiring a root directory of jobs (e.g. if your jobs are not all in one directory). Set this to path.join(__dirname, 'jobs') to keep your jobs directory relative to the file where Bree is set up.
silenceRootCheckError Boolean false Silences errors from requiring the root folder. Set this to true if you do not want to see errors from this operation.
doRootCheck Boolean true Attempts to require the root directory, when jobs is empty or null. Set this to false to prevent requiring the root directory
removeCompleted Boolean false Removes job upon completion. Set this to true in order to remove jobs from the array upon completion.
timeout Number 0 Default timeout for jobs (e.g. a value of 0 means that jobs will start on boot by default unless a job has a property of timeout or interval defined). Set this to false if you do not wish for a default value to be set for jobs. This value does not apply to jobs with a property of date.
interval Number 0 Default interval for jobs (e.g. a value of 0 means that there is no interval, and a value greater than zero indicates a default interval will be set with this value). This value does not apply to jobs with a property of cron.
jobs Array [] Defaults to an empty Array, but if the root directory has an index.js file, then it will be used. This allows you to keep your jobs and job definition index in the same place. See Job Options below, and Usage and Examples above for more insight.
hasSeconds Boolean false This value is passed to later for parsing jobs, and can be overridden on a per job basis. See later cron parsing documentation for more insight. Note that setting this to true will automatically set cronValidate defaults to have { preset: 'default', override: { useSeconds: true } }
cronValidate Object {} This value is passed to cron-validate for validation of cron expressions. See the cron-validate documentation for more insight.
closeWorkerAfterMs Number 0 If you set a value greater than 0 here, then it will terminate workers after this specified time (in milliseconds). As of v6.0.0, workers now terminate after they have been signaled as "online" (as opposed to previous versions which did not take this into account and started the timer when jobs were initially "run"). By default there is no termination done, and jobs can run for infinite periods of time.
defaultRootIndex String index.js This value should be the file name inside of the root directory option (if you pass a root directory or use the default root String value, and your index file name is different than index.js).
defaultExtension String js This value can either be js or mjs. The default is js, and is the default extension added to jobs that are simply defined with a name and without a path. For example, if you define a job test, then it will look for /path/to/root/test.js as the file used for workers.
acceptedExtensions Array ['.js', '.mjs'] This defines all of the accepted extensions for file validation and job creation. Please note if you add to this list you must override the createWorker function to properly handle the new file types.
worker Object {} These are default options to pass when creating a new Worker instance. See the Worker class documentation for more insight.
outputWorkerMetadata Boolean false By default worker metadata is not passed to the second Object argument of logger. However if you set this to true, then logger will be invoked internally with two arguments (e.g. logger.info('...', { worker: ... })). This worker property contains isMainThread (Boolean), resourceLimits (Object), and threadId (String) properties; all of which correspond to Workers metadata. This can be overridden on a per job basis.
errorHandler Function null Set this function to receive a callback when an error is encountered during worker execution (e.g. throws an exception) or when it exits with non-zero code (e.g. process.exit(1)). The callback receives two parameters error and workerMetadata. Important note, when this callback is present default error logging will not be executed.
workerMessageHandler Function null Set this function to receive a callback when a worker sends a message through parentPort.postMessage. The callback receives at least two parameters name (of the worker) and message (coming from postMessage), if outputWorkerMetadata is enabled additional metadata will be sent to this handler.

Job Options

See Interval, Timeout, Date, and Cron Validate below for more insight besides this table:

Property Type Description
name String The name of the job. This should match the base file path (e.g. foo if foo.js is located at /path/to/jobs/foo.js) unless path option is specified. A value of index, index.js, and index.mjs are reserved values and cannot be used here.
path String The path of the job or function used for spawning a new Worker with. If not specified, then it defaults to the value for name plus the default file extension specified under Instance Options.
timeout Number, Object, String, or Boolean Sets the duration in milliseconds before the job starts (it overrides the default inherited timeout as set in Instance Options). A value of 0 indicates it will start immediately. This value can be a Number, String, or a Boolean of false (which indicates it will NOT inherit the default timeout from Instance Options). See Job Interval and Timeout Values below for more insight into how this value is parsed.
interval Number, Object, or String Sets the duration in milliseconds for the job to repeat itself, otherwise known as its interval (it overrides the default inherited interval as set in Instance Options). A value of 0 indicates it will not repeat and there will be no interval. If the value is greater than 0 then this value will be used as the interval. See Job Interval and Timeout Values below for more insight into how this value is parsed.
date Date This must be a valid JavaScript Date (we use instanceof Date for comparison). If this value is in the past, then it is not run when jobs are started (or run manually). We recommend using dayjs for creating this date, and then converting it using the toDate() method (e.g. dayjs().add(3, 'days').toDate()). You could also use moment or any other JavaScript date library, as long as you convert the value to a Date instance here.
cron String A cron expression to use as the job's interval, which is validated against cron-validate and parsed by later.
hasSeconds Boolean Overrides the Instance Options hasSeconds property if set. Note that setting this to true will automatically set cronValidate defaults to have { preset: 'default', override: { useSeconds: true } }
cronValidate Object Overrides the Instance Options cronValidate property if set.
closeWorkerAfterMs Number Overrides the Instance Options closeWorkerAfterMs property if set.
worker Object Overrides the Instance Options worker property if set.
outputWorkerMetadata Boolean Overrides the Instance Options outputWorkerMetadata property if set.

Job Interval and Timeout Values

These values can include Number, Object, and String variable types:

  • Number values indicate the number of milliseconds for the timeout or interval
  • Object values must be a later schedule object value (e.g. later.parse.cron('15 10 * * ? *'))
  • String values can be later, human-interval, or ms String values (e.g. later supports Strings such as every 5 mins, human-interval supports Strings such as 3 days and 4 hours, and ms supports Strings such as 4h for four hours)

Listening for events

Bree extends from EventEmitter and emits two events:

  • worker created with an argument of name
  • worker deleted with an argument of name

If you'd like to know when your workers are created (or deleted), you can do so through this example:

bree.on('worker created', (name) => {
  console.log('worker created', name);
  console.log(bree.workers.get(name));
});

bree.on('worker deleted', (name) => {
  console.log('worker deleted', name);
  console.log(!bree.workers.has(name));
});

Custom error/message handling

If you'd like to override default behavior for worker error/message handling, provide a callback function as errorHandler or workerMessageHandler parameter when creating a Bree instance.

NOTE: Any console.log calls, from within the worker, will not be sent to stdout/stderr until the main thread is available. Furthermore, any console.log calls, from within the worker, will not be sent if the process is terminated before the message is printed. You should use parentPort.postMessage() alongside errorHandler or workerMessageHandler to print to stdout/stderr during worker execution. This is a known bug for workers.
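On the worker side, the advice in the note above could look roughly like the sketch below. `report` is a hypothetical helper name. The snippet only assumes Node's built-in `worker_threads` module and falls back to `console.log` when the file is run directly rather than as a worker.

```javascript
const { parentPort } = require('worker_threads');

// Hypothetical helper for use inside a job file: forward output to the
// parent thread when running as a worker, otherwise print it directly.
function report(message) {
  if (parentPort) parentPort.postMessage(message);
  else console.log(message);
}

report('processed 10 records');
```

When the job runs under Bree, messages posted this way reach the main thread, where the default logger or your workerMessageHandler can print them.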

As an example use case, if you want to call an external service to record an error (like Honeybadger, Sentry, etc.) along with logging the error internally, you can do so with:

const logger = require('../path/to/logger');
const errorService = require('../path/to/error-service');

new Bree({
  jobs: [
    {
      name: 'job that sometimes throws errors',
      path: jobFunction
    }
  ],
  errorHandler: (error, workerMetadata) => {
    // workerMetadata will be populated with extended worker information only if
    // Bree instance is initialized with parameter `outputWorkerMetadata: true`
    if (workerMetadata.threadId) {
      logger.info(`There was an error while running a worker ${workerMetadata.name} with thread ID: ${workerMetadata.threadId}`)
    } else {
      logger.info(`There was an error while running a worker ${workerMetadata.name}`)
    }

    logger.error(error);
    errorService.captureException(error);
  }
});

Cancellation, Retries, Stalled Jobs, and Graceful Reloading

We recommend that you listen for the "cancel" event in your worker paths. Doing so will allow you to handle graceful cancellation of jobs. For example, you could use p-cancelable.

Here's a quick example of how to do that (e.g. ./jobs/some-worker.js):

// <https://nodejs.org/api/worker_threads.html>
const { parentPort } = require('worker_threads');

// ...

function cancel() {
  // do cleanup here
  // (if you're using @ladjs/graceful, the max time this can run by default is 5s)

  // send a message to the parent that we're ready to terminate
  // (you could do `process.exit(0)` or `process.exit(1)` instead if desired,
  // but this is a bit of a cleaner approach for worker termination)
  if (parentPort) parentPort.postMessage('cancelled');
  else process.exit(0);
}

if (parentPort)
  parentPort.once('message', message => {
    if (message === 'cancel') return cancel();
  });

If you'd like jobs to retry, simply wrap your usage of promises with p-retry.
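As a rough illustration of the retry idea, the sketch below uses a small dependency-free `retry` helper as a stand-in for p-retry. It is a simplified, hypothetical substitute with no backoff or abort support, not p-retry's API. `flakyTask` is likewise made up for the demonstration.

```javascript
// Dependency-free stand-in for p-retry (hypothetical, simplified):
// run the task once, then retry up to `retries` more times on failure.
async function retry(task, { retries = 3 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= retries + 1; attempt++) {
    try {
      return await task(attempt);
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}

// Hypothetical flaky task: fails on the first two calls, then succeeds.
let calls = 0;
async function flakyTask() {
  calls++;
  if (calls < 3) throw new Error(`attempt ${calls} failed`);
  return 'ok';
}

const result = retry(flakyTask, { retries: 5 });
result.then((value) => console.log(value, calls));
```

Inside a job file, you would wrap the promise-returning part of the job this way and post the "done" message only after the wrapped call resolves.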

We leave it up to you to have as much fine-grained control as you wish.

See @ladjs/graceful for more insight into how this package works.

Interval, Timeout, Date, and Cron Validation

If you need help writing cron expressions, you can reference crontab.guru.

We support later, human-interval, or ms String values for both timeout and interval.

If you pass a cron property, then it is validated against cron-validate.

You can pass a Date as the date property, but you cannot combine both date and timeout.

If you do pass a Date, then it is only run if it is in the future.

See Job Interval and Timeout Values above for more insight.
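The date rules above can be checked before a job definition is handed to Bree. The sketch below is a hypothetical pre-flight helper (`checkJobTiming` is not part of Bree, and Bree performs its own validation). It only encodes the two rules stated in this section.

```javascript
// Hypothetical pre-flight check (not Bree's validator) for the rules above:
// `date` must be a Date, cannot be combined with `timeout`, and only runs
// if it lies in the future.
function checkJobTiming(job, now = new Date()) {
  const problems = [];
  if (job.date !== undefined && !(job.date instanceof Date)) {
    problems.push('date must be a Date instance');
  }
  if (job.date !== undefined && job.timeout !== undefined) {
    problems.push('date and timeout cannot be combined');
  }
  if (job.date instanceof Date && job.date.getTime() <= now.getTime()) {
    problems.push('date is not in the future, so the job will not run');
  }
  return problems;
}

const now = new Date('2030-01-01T00:00:00Z');
console.log(checkJobTiming({ name: 'a', date: new Date('2030-01-04T00:00:00Z') }, now));
console.log(checkJobTiming({ name: 'b', date: new Date('2029-01-01T00:00:00Z'), timeout: '1m' }, now));
```

The first job passes with no problems reported. The second is flagged both for combining date with timeout and for using a date that has already passed.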

Writing jobs with Promises and async-await

If your jobs run on a Node version older than v14.8.0 (the release that enabled top-level await support), here is a working alternative:

const { parentPort } = require('worker_threads');

const delay = require('delay');
const ms = require('ms');

(async () => {
  // wait for a promise to finish
  await delay(ms('10s'));

  // signal to parent that the job is done
  if (parentPort) parentPort.postMessage('done');
  else process.exit(0);
})();

Callbacks, Done, and Completion States

To close out the worker and signal that it is done, you can simply call parentPort.postMessage('done') and/or process.exit(0).

While writing your jobs (which will run in worker threads), you should do one of the following:

  • Signal to the main thread that the process has completed by sending a "done" message (per the example above in Writing jobs with Promises and async-await)
  • Exit the process with code 0 if there is NOT an error (e.g. process.exit(0))
  • Throw an error if an error occurs (this will bubble up to the worker event error listener and terminate it)
  • Exit the process with code 1 if there IS an error (e.g. process.exit(1))

Long-running jobs

If a job is already running, a new worker thread will not be spawned, instead logger.error will be invoked with an error message (no error will be thrown, don't worry). This is to prevent bad practices from being used. If you need something to be run more than one time, then make the job itself run the task multiple times. This approach gives you more fine-grained control.

By default, workers run indefinitely and are not closed until they exit (e.g. via process.exit(0) or process.exit(1)), OR send to the parent port a "close" message, which will subsequently call worker.close() to close the worker thread.

If you wish to specify a maximum time (in milliseconds) that a worker can run, then pass closeWorkerAfterMs (Number) either as a default option when creating a new Bree() instance (e.g. new Bree({ closeWorkerAfterMs: ms('10s') })) or on a per-job configuration, e.g. { name: 'beep', closeWorkerAfterMs: ms('5m') }.

As of v6.0.0 when you pass closeWorkerAfterMs, the timer will start once the worker is signaled as "online" (as opposed to previous versions which did not take this into account).

Complex timeouts and intervals

Since we use later, you can pass an instance of later.parse.recur, later.parse.cron, or later.parse.text as the timeout or interval property values (e.g. if you need to construct something manually).

You can also use dayjs to construct dates (e.g. from now or a certain date) to millisecond differences using dayjs().diff(new Date(), 'milliseconds'). You would then pass that returned Number value as timeout or interval as needed.
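As a sketch of turning a target date into a millisecond timeout, the snippet below uses plain `Date` arithmetic instead of dayjs so that it runs without dependencies. `msUntil` is a hypothetical helper name, and the dates are fixed only to keep the example deterministic.

```javascript
// Plain Date arithmetic as a dependency-free stand-in for dayjs
// (hypothetical helper): milliseconds from `now` until `target`,
// clamped at 0 so a past date never yields a negative timeout.
function msUntil(target, now = new Date()) {
  return Math.max(0, target.getTime() - now.getTime());
}

const now = new Date('2030-01-01T00:00:00Z');
const target = new Date('2030-01-01T00:10:00Z');

// The resulting Number can be used as a job's timeout (or interval).
const job = { name: 'worker-5', timeout: msUntil(target, now) };
console.log(job.timeout); // 600000 (ten minutes)
```

With dayjs, the equivalent difference would be taken between the target date and the current time, not between two values that both represent "now".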

Custom Worker Options

You can pass a default worker configuration object as new Bree({ worker: { ... } });.

These options are passed to the options argument when we internally invoke new Worker(path, options).

Additionally, you can pass custom worker options on a per-job basis through a worker property Object on the job definition.

See complete documentation for options (but you usually don't have to modify these).

Using functions for jobs

It is highly recommended to use files instead of functions. However, sometimes it is necessary to use functions.

You can pass a function to be run as a job:

new Bree({ jobs: [someFunction] });

(or)

new Bree({
  jobs: [
    {
      name: 'job with function',
      path: someFunction
    }
  ]
});

The function will be run as if it's in its own file, therefore no variables or dependencies will be shared from the local context by default.

You should be able to pass data via worker.workerData (see Custom Worker Options).
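A function job that receives its input through workerData might be laid out as in the sketch below. The job name and the `foo: 'bar'` payload are reused from the examples above. The assumption that the values arrive as top-level keys on `workerData` is exactly that, an assumption, so verify it against the Bree version you use.

```javascript
// The function runs as if it were in its own file, so it must require
// everything it needs inside its own body and cannot rely on outer variables.
function someFunction() {
  const { parentPort, workerData } = require('worker_threads');
  // Assumption: values passed via `worker.workerData` are readable here.
  const greeting = `hello ${workerData.foo}`;
  if (parentPort) parentPort.postMessage(greeting);
}

// Job definition to pass as `new Bree({ jobs })`.
const jobs = [
  {
    name: 'job with function',
    path: someFunction,
    worker: { workerData: { foo: 'bar' } }
  }
];

console.log(typeof jobs[0].path, jobs[0].worker.workerData);
```

Keeping all inputs in workerData avoids accidental reliance on variables from the surrounding file, which would be undefined once the function runs in its worker.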

Note that you cannot pass a built-in or bound function.

TypeScript and Usage with Bundlers

When working with a bundler or a tool that transpiles your code in some form or another, we recommend that your bundler is set up in a way that transforms both your application code and your jobs. Because your jobs are in their own files and are run in their own separate threads, they will not be part of your application's dependency graph and need to be set up as their own entry points. You need to ensure you have configured your tool to bundle your jobs into a jobs folder and keep them properly relative to your entry point folder.

We recommend setting the root instance option to path.join(__dirname, 'jobs') so that Bree searches for your jobs folder relative to the file being run (by default it searches for jobs relative to where node is invoked). We recommend treating each job as an entry point and running all jobs through the same transformations as your app code.

After an example transformation - you should expect the output in your dist folder to look like:

- dist
  |-jobs
    |-job.js
  |-index.js

For some example TypeScript setups, see the examples folder.

For another alternative also see the @breejs/ts-worker plugin.

Concurrency

We recommend using the following packages in your workers for handling concurrency:

Plugins

Plugins can be added to Bree using a method similar to the one used by Day.js.

To add a plugin use the following method:

Bree.extend(plugin, options);

Available Plugins

Creating plugins for Bree

Plugins should be a function that receives an options object and the Bree class:

  const plugin = (options, Bree) => {
    /* plugin logic */
  };
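To show the plugin shape end to end without installing Bree, the sketch below uses a stand-in class. `FakeBree`, `describePlugin`, and the `prefix` option are all hypothetical. The only behavior borrowed from the text above is that `extend(plugin, options)` results in the plugin being called with the options object and the class.

```javascript
// Stand-in class so this sketch runs without Bree installed. Assumption:
// the real Bree.extend calls the plugin with (options, Bree) as described.
class FakeBree {
  static extend(plugin, options) {
    plugin(options, this);
    return this;
  }
}

// Hypothetical plugin that adds a `describe` method to every instance.
const describePlugin = (options, Bree) => {
  Bree.prototype.describe = function () {
    return `${options.prefix}: scheduler instance`;
  };
};

FakeBree.extend(describePlugin, { prefix: 'demo' });
console.log(new FakeBree().describe());
```

With the real library, the same plugin function would be registered through Bree.extend(plugin, options) before creating instances.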

Real-world usage

More detailed examples can be found in Forward Email, Lad, and Ghost.

Contributors

Name Website
Nick Baugh http://niftylettuce.com/
shadowgate15 https://github.com/shadowgate15

License

MIT © Nick Baugh


bree's Issues

Interval parsing issue

Specifying a job interval of every 10 seconds as opposed to 10s results in the following very confusing error:

    this[kPort].postMessage({
                ^

DOMException [DataCloneError]: function(d) {
        return getInstances("next", 1, d, d) !== later.NEVER;
      } could not be cloned.
    at new Worker (internal/worker.js:200:17)
    at Bree.run (/Users/cedgington/repos/personal/temp/express-example/node_modules/bree/index.js:573:28)

You can see this by simply using the example project (as I did) and modifying only the interval.

How to use with multiple instances?

Hi, Bree looks very promising. Thanks for the hard work!

I have hacked together a mail queue which checks for unsent mails every x seconds and sends them out. My problem is: I'm running more than one instance of the application and use Postgres locks to make sure no mail is sent twice (only one instance has access to the queue at the same time).

Is there any way to solve this with Bree? Or would I still need to use locking?

[feat] Allow forced thread termination after timeout period

Problem

Graceful thread termination is currently relying on correct handling of 'cancel' event inside of the job, which allows for hanging thread termination if job was incorrectly implemented.

Solution

To make thread termination bullet-proof, bree could introduce a grace period after which the thread would be terminated no matter what. The termination period could be parameterized through a property like terminateWorkerAfterMs (similar to the existing closeWorkerAfterMs). It would also make sense to have a smart default like 60s (value taken from thin air and needs discussion/research) to make sure threads are always terminated after a certain time.

[feat] worker thread pool and other performance optimizations

Was looking into how resource intensive worker creation is through bree. I did not come across concrete numbers of the overhead associated with worker_thread instantiation. Based on the reference below it should not be considered as a "cheap" operation to run.

From Node.js worker threads documentation:

use a pool of Workers instead for these kinds of tasks. Otherwise, the overhead of creating Workers would likely exceed their benefit.
When implementing a worker pool, use the AsyncResource API to inform diagnostic tools (e.g. in order to provide asynchronous stack traces) about the correlation between tasks and their outcomes. See "Using AsyncResource for a Worker thread pool" in the async_hooks documentation for an example implementation.

(linked Worker thread pool implementation)

Was wondering if there are concrete performance metrics available illustrating an overhead associated with worker thread creation? Is it planned to utilize worker thread pool pattern (there's a Pool implementation available in bthreads)

Another idea I was contemplating was around optimizing non-CPU intensive jobs. Would it make sense to bypass worker creation and run such jobs in the main thread without breaking out of event loop? What's the threshold which would determine if it's more resource efficient to run the task on the main thread or offload it to a worker thread?

[test] cover node's internal 'messageerror' event with tests

Problem

Test coverage is turned off for messageerror events with a comment to fix it:

// NOTE: you cannot catch messageerror since it is a Node internal
// (if anyone has any idea how to catch this in tests let us know)

Research

Have started a discussion in node repository around best ways to handle such cases - nodejs/node#36333 (comment). The best advice to tackle it so far has been:

I would currently recommend just to emit the events directly

Next

Simulating these events directly as suggested would be a next thing to try here.

Note, opening up an issue here so it's easier to track progress and any information that comes up on this topic in the future.

[discussion] merges vs rebases

Is there a preference on how PRs/changes from branches are merged into master?

I have a slight preference towards keeping master as clean as possible, so I would prefer a clean rebase or a squash+rebase combination. For an example of project history using this pattern, see Ghost - https://github.com/TryGhost/Ghost/commits/master.

Don't mind keeping the current pattern as well. Just wanted to have this written down somewhere for clarity :)

[feat] Option to execute jobs on the same event loop "inline"

Problem

Executing jobs on a separate thread through worker_thread or in a separate process comes at the price of additional memory allocation and the time taken to spawn a thread/process. Because of these constraints, it's sometimes more efficient to run jobs in the same event loop. For example, a job that has a few non-blocking (async) operations and needs to be executed on a specific schedule.

Solution

Introduce an option in the job configuration to execute it as an "inline" function. Proposed option name: offloaded, which would have to be set to false to run in the same event loop.

The solution would need to look into restrictions that would come with running such functions.

@niftylettuce @shadowgate15 would love to hear your opinions on this topic and hear your thoughts about the new option naming.

Better directory resolution for workers

Hi there. This is a great little scheduling library that is much easier to use out of the box compared to some of the alternatives.

One thing that has been a bit of an itch is directory resolution. For apps like express, I don't want to have all my jobs sitting at <repo>/jobs by default; I'd like them to be somewhere in <repo>/src/jobs or <repo>/src/path/to/services/jobs or something. I can avoid <repo>/jobs by setting root: false on the instance, but then I need to individually set the full path of each worker in the jobs array. I also can't use a relative path like ./jobs/worker that is require-esque, I need to do something like '${__dirname}/jobs/test.js', which is just a bit ugly in my opinion and produces repeated code unless I pull out '${__dirname}/jobs/' out into a variable somewhere.

Feature (PR welcome): Use functions for a job

As I understood the docs the only way to define a job is to have a .js file.
Is it possible or planned in the future, that we can simply pass a function for each job?

Our use-case: we use ncc to compile our node app to a single javascript file, so we cannot have separate .js files for jobs

[perf] get rid of synchronous fs.statSync calls during job validation

Problem

Synchronous calls block the event loop, leading to performance degradation:

The synchronous form blocks the Node.js event loop and further JavaScript execution until the operation is complete.
(reference)

Current state

The codebase uses fs.statSync calls in Bree constructor and during job validation.

Possible solutions

There are two different parts to the problem: (1) the constructor call and (2) validateJob with all its callers (constructor and add method).

(1) Solving constructor sync calls could be approached in couple ways:

  1. Refactor constructor into 2 separate phases - general initialization and job-specific initialization. Splitting out a job initialization phase would allow async job initialization logic to live in a separate method that can be called after construction. Usage could look something like this:

const bree = new Bree({...nonJobOptions});
bree.addJobs(jobOptionsArray, optionalRootPath);

The main downside of this approach is not being able to construct a Bree instance in one call, and it makes use of the root parameter somewhat ambiguous.

  2. Refactor constructor into an async function. Clients could use it like this:

const bree = await new Bree({...options});

I'm not sure this even works, only checked briefly and that's what came up on SO. The downside of this approach is tricky use of super() when using async self invoking functions inside the constructor:

super note: If you need to use super, you cannot call it within the async callback.
(reference)

(2) As for validateJob method, I don't see a good reason not to convert it into an async function with async fs.stat call inside. Needs solving the constructor problem first as it's using validateJob internally.

Note: synchronous calls are problematic but not critical. They only affect initialization code, which is rarely called. Nevertheless, I think it's useful to track and solve this.
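A third shape that avoids both an awaited constructor and a two-step setup for callers is a static async factory. This is only a sketch: Scheduler is a hypothetical stand-in class rather than bree's implementation, and process.execPath is used as the job path purely so the example has a file that exists.

```javascript
const fs = require('fs');

// Hypothetical stand-in class, not bree's actual implementation.
class Scheduler {
  constructor(options) {
    this.options = options; // synchronous, no filesystem access
    this.jobs = [];
  }

  // Async validation using fs.promises.stat instead of fs.statSync.
  async addJob(job) {
    const stats = await fs.promises.stat(job.path);
    if (!stats.isFile()) {
      throw new Error(`Job "${job.name}" path is not a file`);
    }
    this.jobs.push(job);
  }

  // Static async factory: construct first, then validate every job.
  static async create({ jobs = [], ...options } = {}) {
    const instance = new Scheduler(options);
    for (const job of jobs) await instance.addJob(job);
    return instance;
  }
}

// process.execPath is just a file that is guaranteed to exist.
Scheduler.create({ jobs: [{ name: 'demo', path: process.execPath }] })
  .then((scheduler) => console.log(scheduler.jobs.length)); // 1
```

Callers still get a one-call setup (await Scheduler.create(options)), while the constructor itself stays synchronous and super() can be used normally.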

@niftylettuce @shadowgate15 would love to hear your opinion on this.

[feat] custom error handler

Problem

Allow custom error handling when an error is produced by the worker.

Current state

At the moment, when a worker throws an unhandled exception, it should bubble up to bree to handle it. When bree picks up an 'error' event, it logs the error.

Proposed improvement

Allow providing a custom error handler that is called when errors happen in bree, making it possible to modify the default behavior.

An alternative or additional solution would be to extend the events bree emits with a new 'error' event.

An example use-case: giving the library client a possibility to send error reports to an external error tracking service like Honeybadger, Airbrake, Rollbar, BugSnag, Sentry, Exceptiontrap, Raygun, etc.
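A rough sketch of how such a handler could be wired up. The watchWorker helper and the shape of its options are assumptions made for illustration; a plain EventEmitter stands in for a real Worker, and the reports array stands in for an external error tracking service.

```javascript
const { EventEmitter } = require('events');

// Hypothetical wiring: forward worker errors to a user-supplied handler,
// falling back to logging when none is given.
function watchWorker(worker, name, { errorHandler, logger = console } = {}) {
  worker.on('error', (err) => {
    if (typeof errorHandler === 'function') {
      errorHandler(err, { name });
    } else {
      logger.error(err);
    }
  });
}

const reports = []; // stands in for an external error tracking service
const worker = new EventEmitter(); // stands in for a real Worker

watchWorker(worker, 'email', {
  errorHandler: (err, meta) => reports.push(`${meta.name}: ${err.message}`)
});

worker.emit('error', new Error('SMTP timeout'));
console.log(reports); // [ 'email: SMTP timeout' ]
```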

References

  • sidekiq - allows configuring a handler
  • resque - same as sidekiq
  • agenda - no specific pattern; the library only partially logs some of the errors and seems to leave handling to the job
  • bull - not documented specifically, but allows registering handlers for failed jobs

Using typescript files

Just a tip for anyone wanting to use TypeScript files instead of JavaScript. Make sure your project is already set up for ts-node.

Setup the job like this:

{ 
    name: 'Typescript Worker', 
    path: typescript_worker, 
    interval: 'every 10 seconds', 
    worker: { workerData: { __filename: './src/job_specific_filename_worker.ts' } } 
}

With the typescript_worker function like this:

function typescript_worker() {
    const path = require('path')
    require('ts-node').register()
    const workerData = require('worker_threads').workerData
    require(path.resolve(__dirname, workerData.__filename))
}

[feat] browser support

Right now browser support is unstable due to these issues:

I previously had modified index.html with a basic example such as this:

  <script src="https://unpkg.com/bree"></script>
  <script>
    (function() {
      function hello() {
        console.log('hello');
        postMessage('done');
      }

      var bree = new Bree({
        jobs: [
          {
            name: 'hello',
            path: hello,
            interval: '5s',
          }
        ]
      });

      bree.start();
    })();
  </script>

However since postMessage is not working, I have it commented out right now.

Note that I do expose threads from bthreads as Bree.threads, so one could write a browser test (e.g. with puppeteer) that uses Bree.threads.backend === 'web_worker' and Bree.threads.browser === true to check that the proper backends load.

Rare one off jobs & dynamic scheduled jobs

I was really excited to see this library pop up - it's awesome to see something using native worker threads and not requiring redis/mongo/some other store. But was then a bit confused by the configuration method when it came to trying it out.

I have use cases for different types of jobs:

  1. recurring jobs that operate like a cron
  2. one off, super rare, but long-running tasks that might never be run in the lifetime of the process
  3. jobs that are generated dynamically and need to be run at a specific time

  • 1 seems to be the main use case for Bree, but in my case it is the smallest one
  • 2 is my main use case - I can sort of see how I might manage it by not calling bree.start() and only calling bree.run('task') if the long-running task is needed, but that feels like I'm not using the tool properly
  • 3 is a nice-to-have - I already have code doing this, but unless I'm missing something there's no way to achieve it with Bree, except by approximating it with a cron running every minute to check

Given the rareness of the one-off jobs, it's a shame to have to declare them upfront, rather than being able to add them if and when they show up - otherwise there's overhead for no good reason.

With type 3 specifically, I find it odd that Bree has support for setting an exact date when a job should run, but that can only be set on instantiation?

I guess I'm looking for bree.add({jobConfig}) or bree.run({jobConfig}) - or am I massively missing something?

Cron interval doesn't execute functional job

When using a cron string, the job does not execute at the specified time, 6:20pm every day. It does, however, execute on start. Not sure exactly why but... there's that lol.

function test () { console.log('yay') }

const bree = new Bree({
  root: false,
  jobs: [
    {
      name: 'snapshot',
      path: test,
      cron: '20 18 */1 * *'
    }
  ]
});

bree.start()

[docs] Example integration with external persistence layer

Problem

There's a need to track, discuss, and document the best approaches to adding a job persistence layer on top of bree. This issue is a focused continuation of this comment:

An example in the README for using sockets, or redis pubsub to communicate with Bree to add new jobs (?)

and another comment :

I would recommend keeping dynamic jobs actually stored in a queue, with a persistent database, and then have a job that runs every so often to flush the queue (with limited concurrency).

References

How job persistence is approached in other job processing/queuing libraries:

Current use of bree along with MongoDB:

  • forwardemail.net - here jobs are generated based on state of records in the database
  • TBD: expand this list with more examples for future reference

Expected outcome

What I think would be the ideal outcome for this issue is documenting an example implementation of a persisted jobs queue on top of bree. Ideally the example would be backend-agnostic so that multiple storage approaches - NoSQL, SQL, or key/value stores like Redis - could use it.

Maybe bree could even provide an adapter/plugin system to optionally add persistence for (1) scheduled one-off jobs and (2) failed jobs 🤔

process crashes when worker with timeout exits before timeout

Setting closeWorkerAfterMs causes the worker to be terminated after the specified time. However, if the worker completes and exits normally before closeWorkerAfterMs milliseconds have passed, the timeout created upon worker init is not cleared. This causes a TypeError because this.workers[name] is undefined and thus this.workers[name].terminate cannot be read or called.

See #14
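The general shape of a fix is to clear the pending timeout when the worker exits on its own, and to guard the terminate call. The sketch below is not bree's code: track, workers, and closeTimeouts are illustrative names, and an EventEmitter with a fake terminate method stands in for a worker.

```javascript
const { EventEmitter } = require('events');

const workers = {};
const closeTimeouts = {};

// Register a worker and schedule its forced termination.
function track(name, worker, closeWorkerAfterMs) {
  workers[name] = worker;
  closeTimeouts[name] = setTimeout(() => {
    // Guard: the worker may already be gone by the time this fires.
    if (workers[name]) workers[name].terminate();
  }, closeWorkerAfterMs);

  worker.once('exit', () => {
    clearTimeout(closeTimeouts[name]); // cancel the pending terminate
    delete closeTimeouts[name];
    delete workers[name];
  });
}

// An EventEmitter with a fake terminate() stands in for a worker.
const fake = new EventEmitter();
fake.terminate = () => fake.emit('exit', 1);

track('demo', fake, 60000);
fake.emit('exit', 0); // worker finishes early, so the timer is cleared

console.log(Object.keys(workers).length); // 0
```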

[bug] bthreads require is needed for proper error handling in worker scripts

Problem

When the worker script is defined without any dependencies it does not emit an 'error' event described in node documentation.

Reproduction steps

Register a job with bree that points to the following script:

// contents of uncaught-exception.js file
(async () => {
    throw new Error('catch me if you can');
})();

when the job executes observe the log:

  1. There is an uncaught error in the log (node:37046) UnhandledPromiseRejectionWarning: Error: catch me if you can
  2. Bree logs the worker exit as if it was successful: Worker for job "uncaught-exception" exited with code 0

The expected behavior would be no unhandled exception in the log and exit code 1.

Quick fix

To fix the situation load bthreads module with the job script, e.g.:

// contents of uncaught-exception-corrected.js file
require('bthreads');

(async () => {
    throw new Error('catch me if you can');
})();

Solution

I think it's worth documenting the need to include bthreads in each job script and advising against using the native worker_threads module, as that causes unexpected behavior with unhandled exceptions within job scripts.

[feat] Dynamic jobs

Hi,
is there a way to create a dynamic set of jobs? How can I pass custom parameters to a job?
I was wondering if a similar approach to the following one can be used...

const currencies = ['USD', 'EUR','GBP'];
const currencyJob = currency => () => {
// job
console.log(currency)
};

const jobs = currencies.map(currency => ({task: currencyJob(currency), interval: '2m'}));
const bree = new Bree({jobs});

// start...

Thanks

Using functions for jobs

Hello,

I have read a part of your documentation that states:

It is highly recommended to use files instead of functions. However, sometimes it is necessary to use functions.

And we really need to use functions now.
Once we initiate a function to be executed along with cron:

[Screenshot 2020-11-27 at 10:46:32 PM]

We get this error:
[Screenshot 2020-11-27 at 10:45:38 PM]

Would you please assist us with this?

Human-readable "every *" style intervals are still broken

This should produce two jobs that run every 2 seconds. The job with the interval "every 2 seconds" only runs once. I was using bree 3.0.1 for testing.

const Bree = require('bree')

const taskman = new Bree({
    jobs: [
        { name: 'Worker 1', path: worker_1, interval: '2s' },
        { name: 'Worker 2', path: worker_2, interval: 'every 2 seconds' },
    ]
})

function worker_1() {
    console.log('worker_1 working ...')
}

function worker_2() {
    console.log('worker_2 working ...')
}

taskman.start()

Using cron

Can somebody give me a simple example? I'm trying to use bree with a cron, but it just exits and nothing happens.

const Bree = require('bree');

const bree = new Bree({
    jobs: [{
        name: 'test1',
        cron : '* * * * *'
    }]

});

bree.start();

job/test1

const { parentPort } = require("worker_threads");

console.log("working")
if (parentPort) parentPort.postMessage("done");
else process.exit(0);

Question about queues

We want to switch from our own solution to a better queue/scheduler (because ours doesn't scale great). We looked at Bull but the unanswered/unfixed issues aren't a good sign. So, we're considering Bree.

Besides simple queues, cron-like and repeated jobs, we also have queues that start processing either after a certain time has passed or after a certain number of jobs have queued up. One use case is bulk inserts into the database. Is this easily achievable with Bree?
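The "flush after a certain time or a certain number of queued items" behaviour described above can be sketched independently of any scheduler. The Batcher class below is purely illustrative and not a Bree feature; the flush callback is where a bulk insert would go.

```javascript
// Collects items and flushes them in bulk when either `maxSize` items
// have queued up or `maxWaitMs` has passed since the first queued item.
class Batcher {
  constructor({ maxSize, maxWaitMs, flush }) {
    this.maxSize = maxSize;
    this.maxWaitMs = maxWaitMs;
    this.flush = flush;
    this.items = [];
    this.timer = null;
  }

  push(item) {
    this.items.push(item);
    if (this.items.length >= this.maxSize) {
      this.drain();
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.drain(), this.maxWaitMs);
    }
  }

  drain() {
    clearTimeout(this.timer);
    this.timer = null;
    const batch = this.items.splice(0);
    if (batch.length > 0) this.flush(batch);
  }
}

const flushed = [];
const batcher = new Batcher({
  maxSize: 3,
  maxWaitMs: 1000,
  flush: (batch) => flushed.push(batch) // e.g. a bulk database insert
});

[1, 2, 3, 4].forEach((n) => batcher.push(n));
batcher.drain(); // flush the remainder instead of waiting for the timer

console.log(flushed); // [ [ 1, 2, 3 ], [ 4 ] ]
```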

Typescript missing

Hi, I appreciate that you made this awesome library! But what I am missing a little bit is TypeScript support. Is it possible to add this in the future?

Failed to stop bree

{"level":30,"time":1604236413590,"pid":7466,"hostname":"ip-10-0-0-21","scope":["bree"],"msg":"Worker for job \"get_lastest_from_rss\" exited with code 0"}
Stopping bree...
bree.service: State 'stop-final-sigterm' timed out. Killing.
Stopped bree.
bree.service: Unit entered failed state.
bree.service: Failed with result 'timeout'.
Stopped bree.
Started bree.

I saw these lines in the log.
This was caused by my script keeping a connection to PostgreSQL open forever.

[bug] interval jobs overlapping with initial job.run() execution

Problem

When an interval job is started, an unnecessary Job "${name}" is already running error is logged.

Context

When an interval job is added to bree, it runs immediately upon start. This does not take into account that job execution could take some time (for the sake of example, let's say 5 seconds). Schedules produced by later fire on "whole" time intervals rather than at the exact time of the previous execution plus the interval, so the initial run can still be in progress when the first scheduled run fires.

Example

Schedule a job that executes every 10 seconds with a job that takes roughly 5 seconds to execute:

const bree = new Bree({...config});
const job = {
    interval: 'every 10 seconds',
    name: 'run-five-seconds',
    path: '/path/to/jobs/run-five-seconds.js',
    outputWorkerMetadata: true
};
bree.add(job);
bree.start('run-five-seconds');

The schedule produced with the every 10 seconds interval is:

> job.interval.schedules[0].s
(6) [0, 10, 20, 30, 40, 50]

Meaning the job will run at 0 seconds, 10 seconds, 20 seconds ... etc

If bree.start('run-five-seconds'); runs at second 6, the job will still be executing when the first interval hits, because 6s + 5s (execution time) = 11s, which runs past the first interval at second 10.

Possible approaches

Options:

  1. Disallow immediate run() for interval jobs and let them run when their interval comes
  2. Skip first interval run
  3. Other ideas?

Imo, option 1 is the way to go as I don't think it's ever an expectation for a job to run right away when it's given a specific schedule (for reference, that's how crontab executes cron jobs). @niftylettuce wdyt?
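The overlap arithmetic from the example can be written down as a small check. This sketch assumes an "every N seconds" schedule anchored at second 0 of the minute, as in the schedule shown above; the function names are made up for illustration.

```javascript
// Next schedule slot (seconds within the minute) strictly after `second`,
// for an "every N seconds" schedule anchored at second 0.
function nextSlot(second, everySeconds) {
  return (Math.floor(second / everySeconds) + 1) * everySeconds;
}

// True when a run started at `startSecond` is still executing
// at the moment the next slot fires.
function overlapsNextSlot(startSecond, durationSeconds, everySeconds) {
  return startSecond + durationSeconds > nextSlot(startSecond, everySeconds);
}

console.log(nextSlot(6, 10));            // 10
console.log(overlapsNextSlot(6, 5, 10)); // true  (6 + 5 = 11 > 10)
console.log(overlapsNextSlot(2, 5, 10)); // false (2 + 5 = 7 <= 10)
```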

[feat] user interface

Hello, this library looks great :). I'm currently using agenda, which provides a UI that helps to monitor jobs, retry them, etc. Does Bree provide a user interface like that? What is the best practice or recommendation you have for connecting a UI with this job scheduler?

Functional job always running on start, although interval is given

Regardless of the interval, the task is run on start. I don't think I missed an option to skip running on start.

function test () { console.log('yay') }

const bree = new Bree({
  root: false,
  jobs: [
    {
      name: 'snapshot',
      path: test,
      interval: '10s'
    }
  ]
});

bree.start()

Using the same file for different jobs

I need to create a number of jobs dynamically from the same file using different entry parameters, however the following won't work since the name of the job is the same as the name of the worker (which needs to be unique).

Since I want to execute the same job for a number of times (a hundred or so), creating a file for each one doesn't seem like a valid solution.

const Bree = require('bree');

const params = [1, 2, 3, 4];
var myJobs = [];

params.forEach((param) => {
  myJobs.push({
    name: 'test-worker', // execute the file "test-worker.js" with different params
    interval: param,
    worker: {
      workerData: param,
    },
  });
});

const bree = new Bree({
  jobs: myJobs,
  errorHandler: (error, workerMetaData) => {
    console.log(error);
  },
});

bree.start();

However, this doesn't work which is to be expected since the name is the same for every job.

OUTPUT:

Error: Job #1 has a duplicate job name of test-worker
Error: Job #2 has a duplicate job name of test-worker
...

Is there a way to use the same file for different jobs? It feels like there should be, but I'm missing something.

[question] workerData parameter shape

The problem I'm facing is a confusing (from job developer perspective) use of the data (workerData) passed into the worker. After looking closer into how workerData parameter is constructed, I have a proposal on how to structure it so it's more intuitive for the job developer.

Let's consider an example bree job initialization with a value passed in:

// bree client configuring a job with passed in data

const job = {
    cron:'0/5 * * * * *',
    name:'job-with-data',
    path:'/path/to/job-using-data.js',
    worker:{
      workerData: 42
    }
};

bree.add(job);
bree.start('job-with-data');

// job-using-data.js file content

const {workerData} = require('worker_threads');

(async () => {
    const data = workerData.job.worker.workerData;
    console.log(data); // prints '42'
})();

To get access to the data inside the job file, the job developer has to know about bree's quite complicated internal object structure, workerData.job.worker.workerData. This structure doesn't match 1:1 the parameter passed during job initialization. As a developer writing a job, I would expect to have access to the same data structure as if I had initialized the job through the native Worker constructor:

new Worker('/path/to/job-using-data.js', {workerData: 42});

// job-using-data.js file content

const {workerData} = require('worker_threads');

(async () => {
    const data = workerData;
    console.log(data); // prints '42'
})();

I don't see a good reason why a bree job would need to have all the meta information about itself accessible from within the script. I think it's unnecessary for the job to know about its name, interval, or any other internal bree configuration. It should be up to bree's client to use these configurations, and the job itself should execute in a self-contained manner, with as little knowledge as possible. @niftylettuce what are your thoughts? Could you please clarify the use case for passing job metadata into the workerData parameter?

Proposed change

I think initializing the worker in the following way would make reading workerData inside jobs more intuitive:

const workerData = job.worker && job.worker.workerData ? job.worker.workerData : undefined;
const object = {
  ...(this.config.worker ? this.config.worker : {}),
  ...(job.worker ? job.worker : {}),
  workerData
};

this.workers[name] = new threads.Worker(job.path, object);
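Until the shape changes, a job script could hide the difference behind a small accessor. This is a sketch only: readJobData is a hypothetical helper, and the nested workerData.job.worker.workerData shape is taken from the example above.

```javascript
// Returns the user-supplied data regardless of whether workerData arrives
// in the nested shape (workerData.job.worker.workerData) or directly,
// as with the native Worker constructor.
function readJobData(workerData) {
  const isNestedShape =
    workerData !== null &&
    typeof workerData === 'object' &&
    'job' in workerData;
  if (!isNestedShape) return workerData;

  const worker = workerData.job && workerData.job.worker;
  return worker ? worker.workerData : undefined;
}

// Nested shape, as described in this issue:
console.log(readJobData({ job: { name: 'job-with-data', worker: { workerData: 42 } } })); // 42

// Flat shape, as with the native Worker constructor:
console.log(readJobData(42)); // 42
```

Inside a job file it would be called as readJobData(require('worker_threads').workerData).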

Interval at specific time and cron not working

Environment:

  • Windows Server 2012
  • Node.js 12.18.4
  • Task is managed by PM2

I'm using bree now, however, only interval: '30s' works. Neither interval: 'at 09:28 am' nor cron: '18 9 * * *' works. I'm using the following code:

const Bree = require('bree');

const bree = new Bree({
  jobs: [
    {
      name: 'daily-report',
      interval: '30s', // works
      // interval: 'at 09:28 am', // not working
      // cron: '18 9 * * *', // not working
    },
  ]
});

bree.start();

EMFILE: too many open files

I'm running a job every second that instantiates a class, like so

const RedisService = require('../../services/redis-service');
...
const redis = new RedisService()

This job runs for awhile and then eventually goes into a fail loop with the error

EMFILE: too many open files

and points to ...services/redis-service.js as the open that exceeded whatever ulimit -n must be on my machine. I know that there are packages like graceful-fs that might be able to solve this problem, but is this just because I'm using Bree incorrectly? Is there a proper way to close the files that a job opens when it is run by Bree?

Error when used with pino logging

in jobs/test.js:

export {}
require('dotenv').config()
const pino = require('pino')
let logger = pino({
  customLevels: {
    critical: 70
  }
})
process.on('uncaughtException', pino.final(logger, (err, finalLogger) => {
  finalLogger.error(err, 'uncaughtException')
  process.exit(1)
}))
  

err: Error: final requires a stream that has a flushSync method, such as pino.destination and pino.extreme

I think the issue was related to this:
pinojs/pino#761

Same job file, different parameters

Hello, is it possible to have

jobs: [
  { name: "jobA", interval: 15, workerData: { test: "taska" } },
  { name: "jobA", interval: 15, workerData: { test: "taskb" } }
]

Some question about using this library

Hi, this library seems quite attractive to me. I would like to ask some questions.
Assume I have a Node.js microservice that queues jobs from HTTP requests. The input looks like:

{
     "id": "1234",
     "fireAt" : "15xxxxxxxxxx",
     "name" : "job_name", // refer to job_name.js
     "data" : {"someJSONdata": "data" }
}

And I have 3 instances (A,B,C) of this service in a cluster which also contains Redis and MongoDB.

  1. If instance A queued a job but crashed before the job fired, how can the job be recovered?
    I read that you suggest querying databases in worker.js to prevent duplicated jobs. But what about recovery?
  2. I saw you have an example of a worker that connects to MongoDB. Is it good practice to connect to the database every time the job fires? Is there a worker thread pool to reuse threads (and maybe database connections)?
  3. In my use case, jobs could be canceled frequently; maybe only 20% of jobs fire. How can this be implemented? The id in the input fields is used to overwrite the currently queued job. For example:
    a) Got id:"1234" job with 2s delay to be fired. Enqueue it.
    b) 1s later, got id:"1234" job with 10s delay to be fired. Cancel the job in a), enqueue the new job.
    c) 10s later, fire id:"1234" job queued in b)

Thanks for your patience.

Error: Job "email" is already running

Hey,
After some days of using and running the job, when I try to run node bree.js in my dev environment
I get Error: Job "email" is already running. The strange thing is that even after I kill all node processes and restart my machine, it looks like the worker starts running when my PC starts. Any idea how to fix that?

Error: Job "email" is already running at Bree.run (C:\...\node_modules\bree\index.js:571:11)

Passing data

Hello, is there a way currently to pass data on to worker threads?

Dynamically set `workerData`

This is more of a question and not an issue--not sure where might be the best place to post. I can configure a worker with a workerData hash so that the scheduled job can have access to some seed data when it runs. However, is there a way to dynamically set this workerData hash on each run? Say I have a job that runs every 5 minutes, and every time it runs, I want to repopulate workerData with some data from another class in my project fetched via a getter, for example. Is that possible?

BTHREADS_BACKEND=child_process SIGINT handling

For my specific use case in a node app, I'd like jobs created as processes (lots of IO and long running). Experimenting with the examples, I've come up with the parent / child code as per below. The problem is I cannot get the exception handling to work properly without a "sleep" after it calls bree.stop(). With it, it works as expected. My naive understanding is that signal handlers should not be async in nature. Is there a better way? BTW, same problem exists if I use Graceful.

Main "dispatch" app 

const path = require('path');
 
// optional
const ms = require('ms');
const dayjs = require('dayjs');
const Graceful = require('@ladjs/graceful');
const Cabin = require('cabin');
const later = require('@breejs/later');
 
// required
const Bree = require('bree');
const sleep = require('util').promisify(setTimeout);

process.on('SIGINT',  () => {
    myExit('SIGINT');
});
process.on('SIGQUIT',  () => {
    myExit('SIGQUIT');
});
process.on('SIGTERM',  () => {
    myExit('SIGTERM');
});
async function myExit(name) {
    console.log(`\nBREE Parent Ending ${name}`);
    bree.stop();
    //removing sleep here now does not allow child to close properly (at least no logging is seen but process is closed)
    await sleep(500);
    process.exit(0);
}

console.log(`BREE starting with PID:${process.pid}`);

//
// NOTE: see the "Instance Options" section below in this README
// for the complete list of options and their defaults
//
const bree = new Bree({
  //
  // NOTE: by default the `logger` is set to `console`
  // however we recommend you to use CabinJS as it
  // will automatically add application and worker metadata
  // to your log output, and also masks sensitive data for you
  // <https://cabinjs.com>
  //
  // logger: new Cabin(),
 
  //
  // NOTE: instead of passing this Array as an option
  // you can create a `./jobs/index.js` file, exporting
  // this exact same array as `module.exports = [ ... ]`
  // doing so will allow you to keep your job configuration and the jobs
  // themselves all in the same folder and very organized
  //
  // See the "Job Options" section below in this README
  // for the complete list of job options and configurations
  //
  jobs: [
    // runs `./jobs/foo.js` on start
    // 'job-async',
//    'job-async',

    //'job-compute'
    
    // runs `./jobs/worker-5.js` on after 10 minutes have elapsed
    {   
      name: 'job-async-copy',
      interval: 5000,//later.parse.text('every 5 seconds'),
      worker: {
          workerData: { name: 'joe' }
      }

    },
    
  ]
});
 
// handle graceful reloads, pm2 support, and events like SIGHUP, SIGINT, etc.
// const graceful = new Graceful({ brees: [bree] });
// graceful.listen();
 
// start all jobs (this is the equivalent of reloading a crontab):
bree.start();

//bree.stop('job-async');

/*
// start only a specific job:
bree.start('foo');
 
// stop all jobs
bree.stop();
 
// stop only a specific job:
bree.stop('beep');
 
// run all jobs (this does not abide by timeout/interval/cron and spawns workers immediately)
bree.run();
 
// run a specific job (...)
bree.run('beep');
 
// add a job array after initialization:
bree.add(['boop']);
// this must then be started using one of the above methods
 
// add a job after initialization:
bree.add('boop');
// this must then be started using one of the above methods
 
// remove a job after initialization:
bree.remove('boop');
*/

bree.on('worker deleted', (name) => {
    console.log('worker deleted', name);
    // if (name === 'job-async' && !signal) 
    //     bree.run(name);
  });

bree.on('worker created', (name) => {
    console.log(`new worker ${name}`);
});

Job

const threads = require('bthreads');
const sleep = require('util').promisify(setTimeout);

let signal = false;

process.on('SIGINT',  () => {
    myExit('SIGINT');
});
process.on('SIGQUIT',  () => {
    myExit('SIGQUIT');
});
process.on('SIGTERM',  () => {
    myExit('SIGTERM');
});

function myExit(name) {
    console.log(`ASYNC received ${name}`);
    signal = true;

    // if (threads.parentPort) {
    //     threads.parentPort.postMessage('cancel');
    //     console.log(`Cancel sent`);
    // } else 
    //     console.log(`Exiting...`);
}
if (!threads.isMainThread) {
    threads.parentPort.on('message', async (message) => {
        console.log(`Received message from parent [${message}]`);
    //    await sleep(300);
        if (message === 'cancel') {
            signal = true;
        }   
    });
}

if (threads.isMainThread) {
    console.log(`ASYNC on main threads`);
    
} else {
    console.log(`ASYNC on worker thread`);

}

// if (threads.parentPort) {
//     console.log(`Adding .... event receiver`);
//     threads.parentPort.once('message', async (message) => {
//     });
// }


let a = 1;
let name = 'll'; //threads.hasOwnProperty('workerData.name')  ? "UNKNOWN" : threads.workerData.name
console.log(`async-COPY starting with PID:${process.pid} and name ${name}`);
async function x() {
    for (;;) {
        a++;
        await sleep(300);
        console.log(`COPY loop ${a}`);
        // if (a==5) 
        //     threads.parentPort.postMessage('error');
        if (a>10 || signal) {
            // if (threads.parentPort) {
            //     threads.parentPort.postMessage('COPY exit');
            // }
            if (signal) {
                console.log(`COPY Exiting due to signal...`);
                if (!threads.isMainThread) {
                    console.log(`Sending to Parent...`);
                    threads.parentPort.postMessage('cancelled');
                }
            }
            process.exit(0);
        }
    }
}

x();
