Giter Club home page Giter Club logo

php-task-scheduler's Introduction

Clustered process management for PHP

Build Status Scrutinizer Code Quality Code Coverage Latest Stable Version GitHub release GitHub license

Parallel task scheduler for PHP using MongoDB as distribution queue. Execute parallel tasks the easy way. This library has built-in support for clustered systems and multi core cpu. You can start up multiple worker nodes and they will load balance the available jobs with the principal first comes first serves. Each node will also spawn a (dynamically) configurable number of child processes to use all available resources. Moreover it is possible to schedule jobs at certain times, endless intervals as well as rescheduling if jobs fail. This brings a real world implementation for parallel process management to PHP. You are also able to sync child tasks and much more nice stuff.

Features

  • Parallel tasks
  • Cluster support
  • Multi core support
  • Load balancing
  • Failover
  • Scalable
  • Sync tasks between each other
  • Abort running tasks
  • Timeout jobs
  • Retry and intervals
  • Schedule tasks at specific times
  • Signal management
  • Intercept events
  • Progress support
  • Auto detection of orphaned jobs

v4

This is the documentation for the current major version v4. You may check the upgrade guide if you want to upgrade from v3 or even an older version. The documentation for v3 is available here.

Table of Contents

Why?

PHP isn't a multithreaded language and neither can it handle (most) tasks asynchronous. Sure there is pthreads and pcntl but those are only usable in cli mode (or should only be used there). Using this library you are able to write taks which can be executed in parallel by the same or any other system.

How does it work (The short way please)?

A job is scheduled via a task scheduler and gets written into a central message queue (MongoDB). All Queue nodes will get notified in (soft) realtime that a new job is available. The queue node will forward the job via a internal systemv message queue to the worker manager. The worker manager decides if a new worker needs to be spawned. At last, one worker will execute the task according the principle first come first serves. If no free slots are available the job will wait in the queue and get executed as soon as there is a free slot. A job may be rescheduled if it failed. There are lots of more features available, continue reading.

Requirements

  • Posix system (Basically every linux)
  • MongoDB server >= 3.6
  • MongoDB replication set (May also be just a single MongoDB node)
  • PHP >= 7.1
  • PHP pcntl extension
  • PHP posix extension
  • PHP mongodb extension
  • PHP sysvmsg extension

Note: This library will only work on *nix systems. There is no windows support and there will most likely never be.

Download

The package is available at packagist

To install the package via composer execute:

composer require gyselroth/php-task-scheduler

Changelog

A changelog is available here.

Contribute

We are glad that you would like to contribute to this project. Please follow the given terms.

Terms

You may encounter the follwing terms in this readme or elsewhere:

Term Class Description
Scheduler TaskScheduler\Scheduler The Scheduler is used to add jobs, query jobs, delete jobs and listen for events, it is the only component besides (different) jobs which is actually used in your main application.
Job TaskScheduler\JobInterface A job implementation is the actual task you want to execute.
Process TaskScheduler\Process You will receive a process after adding jobs, query jobs and so on, a process is basically an upperset of your job implementation.
Queue Node TaskScheduler\Queue Queue nodes handle the available jobs and forward them to the worker manager.
Worker Manager TaskScheduler\WorkerManager The worker managers job is to spawn workers which actually handle a job. Note: A worker manager itself is a fork from the queue node process.
Worker TaskScheduler\Worker Workers are the ones which process a job from the queue and actually do your submitted work.
Worker Factory TaskScheduler\WorkerFactoryInterface A worker factory needs to be implemented by you, it will spawn the worker manager and new workers.
Cluster - A cluster is a set of multiple queue nodes. A cluster does not need to be configured in any way, you may start as many queue nodes as you want.

Install

If your app gets built using a docker container you must use at least the following build options:

FROM php:7.4
RUN docker-php-ext-install pcntl sysvmsg
RUN pecl install mongodb && docker-php-ext-enable mongodb pcntl sysvmsg

Documentation

For a better understanding how this library works, we're going to implement a mail job. Of course you can implement any kind of job.

Create job

It is quite easy to create a task, you just need to implement TaskScheduler\JobInterface. In this example we're going to implement a job called MailJob which sends mail using zend-mail.

Note: You can use TaskScheduler\AbstractJob to implement the required default methods by TaskScheduler\JobInterface. The only thing then you need to implement is start() which does the actual job (sending mail).

class MailJob extends TaskScheduler\AbstractJob
{
    /**
     * {@inheritdoc}
     */
    public function start(): bool
    {
        $transport = new Zend\Mail\Transport\Sendmail();
        $mail = Message::fromString($this->data);
        $this->transport->send($mail);

        return true;
    }
}

Initialize scheduler

You need an instance of a MongoDB\Database and a Psr\Log\LoggerInterface compatible logger to initialize the scheduler.

$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

Spool job

Now let us create a mail and deploy it to the task scheduler which we have initialized right before:

$mail = new Message();
$mail->setSubject('Hello...');
$mail->setBody('World');
$mail->setFrom('root@localhost', 'root');

$scheduler->addJob(MailJob::class, $mail->toString());

This is the whole magic, our scheduler now got its first job, awesome!

Execute jobs

But now we need to execute those queued jobs.
That's where the queue nodes come into play. Those nodes listen in (soft) realtime for new jobs and will load balance those jobs.

Create worker factory

You will need to create your own worker node factory in your app namespace which gets called to spawn new child processes. This factory gets called during a new fork is spawned. This means if it gets called, you are in a new process and you will need to bootstrap your application from scratch (Or just the things you need for a worker).

Note: Both a worker manager and a worker itself are spawned in own forks from the queue node process.

Queue node (TaskScheduler\Queue)
|
|-- Worker Manager (TaskScheduler\WorkerManager)
    |
    |-- Worker (TaskScheduler\Worker)
    |-- Worker (TaskScheduler\Worker)
    |-- Worker (TaskScheduler\Worker)
    |-- ...

For both a worker manager and a worker a new fork means you will need to bootstrap the class from scratch.

Note: Theoretically you can reuse existing connections, objects and so on by setting those via the constructor of your worker factory since the factory gets initialized in main(). But this will likely lead to errors and strange app behaviours and is not supported.

For better understanding: if there is a configuration file where you have stored your configs like a MongoDB uri, in the factory you will need to parse this configuration again and create a new mongodb instance. Or you may be using a PSR-11 container, the container needs to be created from scratch in the factory (A new dependency tree). You may pass an instance of a dic (compatible to Psr\Container\ContainerInterface) as fifth argument to TaskScheduler\Worker (or advanced worker manager options as third argument for a TaskScheduler\WorkerManager (Advanced worker manager options). See more at Using a DIC).

class WorkerFactory extends TaskScheduler\WorkerFactoryInterface
{
    /**
     * {@inheritdoc}
     */
    public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker
    {
        $mongodb = new MongoDB\Client('mongodb://localhost:27017');
        $logger = new \A\Psr4\Compatible\Logger();
        $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

        return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger);
    }
    
    /**
     * {@inheritdoc}
     */
    public function buildManager(): TaskScheduler\WorkerManager
    {
        $logger = new \A\Psr4\Compatible\Logger();
        return new TaskScheduler\WorkerManager($this, $logger);
    }
}

Create queue node

Let us write a new queue node. The queue node must be started as a separate process! You should provide an easy way to start such queue nodes, there are multiple ways to achieve this. The easiest way is to just create a single php script which can be started via cli.

$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$worker_factory = My\App\WorkerFactory(); #An instance of our previously created worker factory
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger);

And then start the magic:

$queue->process();

Note: TaskScheduler\Queue::process() is a blocking call.

Our mail gets sent as soon as a queue node is running and started some workers.

Usually you want those nodes running at all times! They act like invisible execution nodes behind your app.

Manage jobs

Get jobs

You may want to retrieve all scheduled jobs:

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->getJobs();

By default you will receive all jobs with the status:

  • WAITING
  • PROCESSING
  • POSTPONED

You may pass an optional query to query specific jobs.

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$jobs = $scheduler->getJobs([
    'status' => TaskScheduler\JobInterface::STATUS_DONE,
    '$or' => [
        ['class' => 'MyApp\\MyTask1'],
        ['class' => 'MyApp\\MyTask2'],
    ]
]);

foreach($jobs as $job) {
    echo $job->getId()." done\n";
}

Cancel job

It is possible to cancel jobs waiting in the queue as well as kill jobs which are actually running.

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->cancelJob(MongoDB\BSON\ObjectId $job_id);

If you cancel a job with the status PROCESSING, the job gets killed by force and my corrupt data. You have been warned. (This is similar as the job ends with status TIMEOUT). The only difference is that a timeout job gets rescheduled if it has retry > 0 or has a configured interval. A canceled job will not get rescheduled. You will need to create a new job manually for that.

Modify jobs

It is not possible to modify a scheduled job by design. You need to cancel the job and append a new one.

Note: This is likely to be changed with v4 which will feature persistence for jobs.

Flush queue

While it is not possible to modify/remove jobs it is possible to flush the entire queue.

Note: This is not meant to be called regularly. There may be a case where you need to flush all jobs because of an upgrade. Running queue nodes will detect this and will listen for newly spooled jobs.

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->flush();

Handling of failed jobs

A job is acknowledged as failed if the job throws an exception of any kind. If we have a look at our mail job again, but this time it will throw an exception:

class MailJob extends TaskScheduler\AbstractJob
{
    /**
     * {@inheritdoc}
     */
    public function start(): bool
    {
        $transport = new Zend\Mail\Transport\Sendmail();
        $mail = Message::fromString($this->data);
        $this->transport->send($mail);
        throw new \Exception('i am an exception');
        return true;
    }
}

This will lead to a FAILED job as soon as this job gets executed.

Note: It does not matter if you return true or false, only an uncaught exception will result to a FAILED job, however you should always return true.

The scheduler has an integrated handling of failed jobs. You may specify to automatically reschedule a job if it failed. The following will reschedule the job up to 5 times (If it ended with status FAILED) with an interval of 30s.

$scheduler->addJob(MailJob::class, $mail->toString(), [
    TaskScheduler\Scheduler::OPTION_RETRY => 5,
    TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 30,
]);

This will queue our mail to be executed in one hour from now and it will re-schedule the job up to three times if it fails with an interval of one minute.

Alive ping and Job progress

TaskScheduler has built-in support to update the progress of a job from your job implementation. By default a job starts at 0 (%) and ends with progress 100 (%). Note that the progress is floating number. You may increase the progress made within your job.

Important: Note that by default the scheduler takes a job after 30s as orphaned and reschedules it. You may change the 30s globally during the Scheduler initialization or keep calling ->updateProgress() within your task implementation. Calling updateProgress with or without a progress acts like a keep alive ping for the scheduler and should be called in your task if it a long running task which contains a loop. If there is no loop you should still call this method in some form of intervals to keep your task alive. Set a progress as percentage value is not required, if not set the task keeps beeing at 0% and set to 100% if finished.

Let us have a look how this works with a job which copies a file from a to b.

class CopyFileJob extends TaskScheduler\AbstractJob
{
    /**
     * {@inheritdoc}
     */
    public function start(): bool
    {
        $source = $this->data['source'];
        $dest = $this->data['destination'];

        $size = filesize($source);
        $f = fopen($source, 'r');
        $t = fopen($dest, 'w');
        $read = 0;

        while($chunk = fread($f, 4096)) {
            $read += fwrite($t, $chunk);
            $this->updateProgress($read/$size*100);
        }
    }
}

The current progress may be available using the process interface:

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$p = $scheduler->getJob(MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69'));
$p->getProgress();

Note There is a rate limit for the progress updates which is by default 500ms. You may change the rate limit by configuring the TaskScheduler::OPTION_PROGRESS_RATE_LIMIT to something else and to 0 if you do not want a rate limit at all.

Asynchronous programming

Have a look at this example:

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$scheduler->addJob(MyTask::class, 'foobar')
  ->wait();

This will force main() (Your process) to wait until the task MyTask::class was executed. (Either with status DONE, FAILED, CANCELED, TIMEOUT).

Here is more complex example:

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = [];
$stack[] = $scheduler->addJob(MyTask::class, 'foobar');
$stack[] = $scheduler->addJob(MyTask::class, 'barfoo');
$stack[] = $scheduler->addJob(OtherTask::class, 'barefoot');

$scheduler->waitFor($stack);

//some other important stuff here

This will wait for all three jobs to be finished before continuing.

Important note:
If you are programming in http mode (incoming http requests) and your app needs to deploy tasks it is good practice not to wait!. Best practice is to return a HTTP 202 code instead. If the client needs to know the result of those jobs you may return the process id's and send a 2nd request which then waits and returns the status of those jobs or the client may get its results via a persistent connection or websockets.

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = $scheduler->getJobs([
    '_id' => ['$in' => $job_ids_from_http_request]
]);

$scheduler->waitFor(iterator_to_array($stack));

//do stuff

You may also intercept the wait if any process results in an exception:

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = [];
$stack[] = $scheduler->addJob(MyTask::class, 'foobar');
$stack[] = $scheduler->addJob(MyTask::class, 'barfoo');
$stack[] = $scheduler->addJob(OtherTask::class, 'barefoot');

try {
    $scheduler->waitFor($stack, Scheduler::OPTION_THROW_EXCEPTION);
} catch(\Exception $e) {
    //error handling
}

Listen for events

You may bind to the scheduler and listen for any changes and do stuff :)

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$jobs = $scheduler->listen(function(TaskScheduler\Process $process) {
    echo "status of ".$process->getId().' change to '.$process->getStatus();
});

It is also possible to filter such events, this example will only get notified for events occured in a specific job.

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$jobs = $scheduler->listen(function(TaskScheduler\Process $process) {
    echo "status of ".$process->getId().' change to '.$process->getStatus();
}, [
    '_id' => new MongoDB\BSON\ObjectId('5b3cbc39e26cf26c6d0ede69')
]);

Note: listen() is a blocking call, you may exit the listener and continue with main() if you return a boolean true in the listener callback.

Bind events

Besides the simple listener method for the Scheduler you may bind event listeneres to your TaskScheduler\Queue and/or TaskScheduler\Scheduler.

For example:

$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
$stack = [];
$stack[] = $scheduler->addJob(MyTask::class, 'foobar');
$stack[] = $scheduler->addJob(MyTask::class, 'barfoo');
$stack[] = $scheduler->addJob(OtherTask::class, 'barefoot');

$scheduler->on('waiting', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is waiting';
})->on('done', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is finished';
})->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is '.$p->getStats();
});

$scheduler->waitFor($stack);

Note: You need to to bind your listeneres before calling Scheduler::waitFor() since that is a synchronous blocking call.

You may bind listeneres to the same events in your queue nodes:

$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger);

$queue->on('timeout', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is timed out';
})->on('*', function(League\Event\Event $e, TaskScheduler\Process $p) {
    echo 'job '.$p->getId().' is '.$p->getStats();
});

$queue->process();

Note: You need to to bind your listeneres before calling Queue::process() since that is a synchronous blocking call.

Events

You may bind for the following events:

Short Full Scope Description
waiting taskscheduler.onWaiting global Triggers after a new job got added
postponed taskscheduler.onPostponed global Triggers after a job has been postponed
processing taskscheduler.onProcessing global Triggers after a job started to execute
done taskscheduler.onDone global Triggers after a job finished successfully
failed taskscheduler.onFailed global Triggers after a job failed
timeout taskscheduler.onTimeout global Triggers after a job timed out
cancel taskscheduler.onCancel global Triggers after a job has been canceled
workerSpawn taskscheduler.onWorkerSpawn queue node only Triggers after a queue node spawned a new worker
workerKill taskscheduler.onWorkerKill queue node only Triggers after a worker stopped on a queue node

Custom event emitter

Under the hood both TaskScheduler\Queue and TaskScheduler\Scheduler use League\Event\Emitter as event emitter. You may create both instances with your own Leage Event emitter instance:

$emitter = new League\Event\Emitter();

//Queue
$queue = new TaskScheduler\Queue($scheduler, $mongodb, $worker_factory, $logger, $emitter);

//Scheduler
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, [], $emitter);

Advanced job options

TaskScheduler\Scheduler::addJob()/TaskScheduler\Scheduler::addJobOnce() also accept a third option (options) which let you set more advanced options for the job:

Option Default Type Description
at 0 int Accepts a specific unix time which let you specify the time at which the job should be executed. The default (0) is immediately or better saying as soon as there is a free slot.
interval 0 int You may specify a job interval (in seconds) which is usefully for jobs which need to be executed in a specific interval, for example cleaning a temporary directory. The default is 0 which means no interval at all, -1 means execute the job immediately again (But be careful with -1, this could lead to huge cpu usage depending what job you're executing). Configuring 3600 means the job will get executed hourly.
interval_reference end string You may specify if the interval refers to the start or the end of the previous job. The default is end which means the interval refers to the end time of the previous job. When you define start the interval refers to the start time of the previous job. This may be useful when a job has to run at specific times.
retry 0 int Specifies a retry interval if the job fails to execute. The default is 0 which means do not retry. 2 for example means 2 retries. You may set -1 for endless retries.
retry_interval 300 int This options specifies the time (in seconds) between job retries. The default is 300 which is 5 minutes. Be careful with this option while retry_interval is -1, you may ending up with a failure loop.
force_spawn false bool You may specify true for this option to spawn a new worker only for this task. Note: This option ignores the max_children value of TaskScheduler\WorkerManager, which means this worker always gets spawned. It makes perfectly sense for jobs which make blocking calls, for example a listener which listens for local filesystem changes (inotify). A job with this enabled option should only consume as little cpu/memory as possible!
timeout 0 int Specify a timeout in seconds which will terminate the job after the given time has passed by force. The default 0 means no timeout at all. A timeout job will get rescheduled if retry is not 0 and will marked as timed out.
id null MongoDB\BSON\ObjectId Specify a job id manually.
ignore_data false bool Only useful if set in a addJobOnce() call. If true the scheduler does not compare the jobs data to decide if a job needs to get rescheduled.

Note: Be careful with timeouts since it will kill your running job by force. You have been warned. You shall always use a native timeout in a function if supported.

Let us add our mail job example again with some custom options:

Note: We are using the OPTION_ constansts here, you may also just use the names documented above.

$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

$mail = new Message();
$mail->setSubject('Hello...');
$mail->setBody('World');
$mail->setFrom('root@localhost', 'root');

$scheduler->addJob(MailJob::class, $mail->toString(), [
    TaskScheduler\Scheduler::OPTION_AT => time()+3600,
    TaskScheduler\Scheduler::OPTION_RETRY => 3,
    TaskScheduler\Scheduler::OPTION_RETRY_INTERVAL => 60,
]);

This will queue our mail to be executed in one hour from now and it will re-schedule the job up to three times if it fails with an interval of one minute.

Add job if not exists

What you also can do is adding the job only if it has not been queued yet. Instead using addJob() you can use addJobOnce(), the scheduler then verifies if it got the same job already queued. If not, the job gets added. The scheduler compares the type of job (MailJob in this case) and the data submitted ($mail->toString() in this case).

Note: The job gets rescheduled if options get changed.

$scheduler->addJobOnce(MailJob::class, $mail->toString(), [
    TaskScheduler\Scheduler::OPTION_AT => time()+3600,
    TaskScheduler\Scheduler::OPTION_RETRY => 3,
]);

By default TaskScheduler\Scheduler::addJobOnce() does compare the job class, the submitted data and the process status (either PROCESSING, WAITING or POSTPONED). If you do not want to check the data, you may set TaskScheduler\Scheduler::OPTION_IGNORE_DATA to true. This will tell the scheduler to only reschedule the job of the given class if the data changed. This is quite useful if a job of the given class must only be queued once.

Note: This option does not make sense in the mail example we're using here. A mail can have different content. But it may happen that you have job which clears a temporary storage every 24h:

$scheduler->addJobOnce(MyApp\CleanTemp::class, ['max_age' => 3600], [
    TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true,
    TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24,
]);

If max_age changes, the old job gets canceled and a new one gets queued. If TaskScheduler\Scheduler::OPTION_IGNORE_DATA is not set here we will end up with two jobs of the type MyApp\CleanTemp::class.

$scheduler->addJobOnce(MyApp\CleanTemp::class, ['max_age' => 1800], [
    TaskScheduler\Scheduler::OPTION_IGNORE_DATA => true,
    TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24,
]);

Of course it is also possible to query such job manually, cancel them and reschedule. This will achieve the same as above:

$jobs = $scheduler->getJobs([
    'class' => MyApp\CleanTemp::class,
    'status' => ['$lte' => TaskScheduler\JobInterface::STATUS_PROCESSING]
]);

foreach($jobs as $job) {
    $scheduler->cancelJob($job->getId());
}

$scheduler->addJob(MyApp\CleanTemp::class, ['max_age' => 1800], [
    TaskScheduler\Scheduler::OPTION_INTERVAL => 60*60*24,
]);

Advanced scheduler options

You may set those job options as global defaults for the whole scheduler. Custom options and defaults can be set for jobs during initialization or by calling Scheduler::setOptions().

$mongodb = new MongoDB\Client('mongodb://localhost:27017');
$logger = new \A\Psr4\Compatible\Logger();
$scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger, null, [
    TaskScheduler\Scheduler::OPTION_JOB_QUEUE_SIZE => 1000000000,
    TaskScheduler\Scheduler::OPTION_EVENT_QUEUE_SIZE => 5000000000,
    TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 3
]);

You may also change those options afterwards:

$scheduler->setOptions([
    TaskScheduler\Scheduler::OPTION_DEFAULT_RETRY => 2
]);

Note: Changing default job options will not affect any existing jobs.

Name Default Type Description
job_queue taskscheduler.jobs string The MongoDB collection which acts as job message queue.
job_queue_size 1000000 int The maximum size in bytes of the job collection, if reached the first jobs get overwritten by new ones.
event_queue taskscheduler.events string The MongoDB collection which acts as event message queue.
event_queue_size 5000000 int The maximum size in bytes of the event collection, if reached the first events get overwritten by new ones. This value should usually be 5 times bigger than the value of job_queue_size since a job can have more events.
default_at null ?int Define a default execution time for all jobs. This relates only for newly added jobs. The default is immediately or better saying as soon as there is a free slot.
default_interval 0 int Define a default interval for all jobs. This relates only for newly added jobs. The default is 0 which means no interval at all.
default_interval_reference end string Define if the interval refers to the start or the end of the previous job.
default_retry 0 int Define a default retry interval for all jobs. This relates only for newly added jobs. There are no retries by default for failed jobs.
default_retry_interval 300 int This options specifies the time (in seconds) between job retries. This relates only for newly added jobs. The default is 300 which is 5 minutes.
default_timeout 0 int Specify a default timeout for all jobs. This relates only for newly added jobs. Per default there is no timeout at all.

Note: It is important to choose a queue size (job_queue_size and event_queue_size) which fits into your setup.

Advanced worker manager options

While you already know, that you need a worker factory to spawn the worker manager, you may specify advanced options for it! Here is our worker factory again, but this time we specify some more options:

class WorkerFactory extends TaskScheduler\WorkerFactoryInterface
{
    /**
     * {@inheritdoc}
     */
    public function buildWorker(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker
    {
        $mongodb = new MongoDB\Client('mongodb://localhost:27017');
        $logger = new \A\Psr4\Compatible\Logger();
        $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);

        return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger);
    }
    
    /**
     * {@inheritdoc}
     */
    public function buildManager(): TaskScheduler\WorkerManager
    {
        $logger = new \A\Psr4\Compatible\Logger();
        return new TaskScheduler\WorkerManager($this, $logger, [
            TaskScheduler\WorkerManager::OPTION_MIN_CHILDREN => 10,
            TaskScheduler\WorkerManager::OPTION_PM => 'static' 
        ]);
    }
}

Worker handling is done by specifying the option pm while dynamically spawning workers is the default mode.

Name Default Type Description
pm dynamic string You may change the way how fork handling is done. There are three modes: dynamic, static, ondemand.
min_children 1 int The minimum number of child processes.
max_children 2 int The maximum number of child processes.

Process management modes (pm):

  • dynamic (start min_children forks at startup and dynamically create new children if required until max_children is reached)
  • static (start min_children nodes, (max_children is ignored))
  • ondemand (Do not start any children at startup (min_children is ignored), bootstrap a worker for each job but no more than max_children. After a job is done (Or failed, canceled, timeout), the worker dies.

The default is dynamic. Usually dynamic makes sense. You may need static in a container provisioned world whereas the number of queue nodes is determined from the number of outstanding jobs. For example you may be using Kubernetes autoscaling.

Note: The number of actual child processes can be higher if jobs are scheduled with the option Scheduler::OPTION_FORCE_SPAWN.

Using a PSR-11 DIC

Optionally one can pass a Psr\Container\ContainerInterface to the worker nodes which then get called to create job instances. You probably already get it, but here is the worker factory again. This time it passes an instance of a PSR-11 container to worker nodes. And if you already using a container it makes perfectly sense to request the manager from it. (Of course you may also request a worker instances from it if your container implementation supports parameters at runtime (The worker id). Note: This will be an incompatible container implementation from the PSR-11 specification.)

class WorkerFactory extends TaskScheduler\WorkerFactoryInterface
{
    /**
     * {@inheritdoc}
     */
    public function build(MongoDB\BSON\ObjectId $id): TaskScheduler\Worker
    {
        $mongodb = new MongoDB\Client('mongodb://localhost:27017');
        $logger = new \A\Psr4\Compatible\Logger();
        $scheduler = new TaskScheduler\Scheduler($mongodb->mydb, $logger);
        $dic = new \A\Psr11\Compatible\Dic();

        return new TaskScheduler\Worker($id, $scheduler, $mongodb->mydb, $logger, $dic);
    }
    
    /**
     * {@inheritdoc}
     */
    public function buildManager(): TaskScheduler\WorkerManager
    {
        $dic = new \A\Psr11\Compatible\Dic();
        return $dic->get(TaskScheduler\WorkerManager::class);
    }
}

Signal handling

Terminating queue nodes is possible of course. They even manage to reschedule running jobs. You just need to send a SIGTERM to the process. The queue node then will transmit this the worker manager while the worker manager will send it to all running workers and they will save their state and nicely exit. A worker also saves its state if the worker process directly receives a SIGTERM. If a SIGKILL was used to terminate the queue node (or worker) the state can not be saved and you might get zombie jobs (Jobs with the state PROCESSING but no worker will actually process those jobs). No good sysadmin will terminate running jobs by using SIGKILL, it is not acceptable and may only be used if you know what you are doing.

You should as well avoid using never ending blocking functions in your job, php can't handle signals if you do that.

Real world examples

Project Description
balloon balloon is a high performance cloud server. It makes use of this library to deploy all kind of jobs (create previews, scan files, upload to elasticsearch, sending mails, converting documents, clean temporary storage, clean trash, ...).
tubee tubee is data management engine and makes use of this library to execute its sync jobs.

Add your project here, a PR will be most welcome.

php-task-scheduler's People

Contributors

raffis avatar rootinier avatar s-aebischer avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

php-task-scheduler's Issues

FORCE_SPAWN will not be handled by the queue node

Describe the bug

Systemv message type [3] will not be handled by queue node and the job with OPTION_FORCE_SPAWN true will not be started.

To Reproduce

  1. Add job once with OPTION_INTERVAL
  2. Add job once with OPTION_FORCE_SPAWN true and OPTION_RETRY -1
  3. Start normal job

Expected behavior

Systemv message type [3] should be handled by queue node.

Environment

  • (\TaskScheduler) Version: 3.1.0
  • MongoDB Version: 3.6.14
  • libmongoc Version: 1.15.2
  • PHP Version 7.3.12

Additional context

[2020-06-16T11:52:59.590262+00:00] default.INFO: start job listener {"category":"TaskScheduler\\Queue"} []
2020-06-16 11:52:59 [TaskScheduler\WorkerManager,DEBUG]: spawn initial [1] workers [] []
2020-06-16 11:52:59 [TaskScheduler\WorkerManager,DEBUG]: spawn new worker [] []
2020-06-16 11:52:59 [TaskScheduler\WorkerManager,DEBUG]: spawned worker [5ee8b29bf3e7824c713aba3a] with pid [96] [] []
2020-06-16 11:52:59 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [3] [] []

Interval refers always to end of previous job

Is your feature request related to a problem? Please describe

The job interval refers to the end of the previous job. Sometimes it is necessary that a job starts after a certain time after the previous job has started.

Describe the solution you'd like

Add an option to define if the interval refers to the start or the end of the previous job.

Describe alternatives you've considered

Additional context

why do you have this " 'worker' => new ObjectId()" when a new job is added

Could you explain, please what is the meaning to have this:
mongodb-php-task-scheduler/src/Scheduler.php/prepareInsert method
$document = [
..
'worker' => new ObjectId(),
...
];
I added a job and the job has this in the DB:
{ "_id" : ObjectId("5c7d99dd3b775021615e86e3"), "class" : "Mail\MailJob", "status" : 0, "created" : ISODate("2019-03-04T21:34:17.684Z"), "started" : ISODate("2019-03-04T21:34:17.684Z"), "ended" : ISODate("2019-03-04T21:34:17.684Z"), "worker" : ObjectId("5c7d99d93b775021615e86e2"), "data" : "Date: Mon, 04 Mar 2019 21:34:17 +0000\r\nSubject: Hello...\r\nFrom: root root@localhost\r\n\r\nWorld", "options" : { "at" : 0, "interval" : 0, "retry" : 0, "retry_interval" : 300, "force_spawn" : false, "timeout" : 0, "priority" : 60, "ignore_data" : false } }
There are no running workers yet. What is ObjectId("5c7d99d93b775021615e86e2")???

writer worker caught exception: 10003 Cannot change the size of a document in a capped collection

MongoDB replset slave nodes may crash due a MongoDB bug in capped collections replsets:

There are some workarounds/maybe issue solver:

  • It's happening while changing ended timestamp => try to init ended with new UTCDateTime() (not 0)
  • Place taskscheduler in the local database (no replset sync),
 2018-12-21T18:54:41.777+0100 I - [repl writer worker 14] Fatal assertion 16359 CannotGrowDocumentInCappedNamespace: Cannot change the size of a document in a capped collection: 71 != 86 at src/mongo/db/repl/sync_tail.cpp 1087
2019-01-21T08:43:21.477+0100 I STORAGE [initandlisten] exception in initAndListen: 10003 Cannot change the size of a document in a capped collection: 71 != 86, terminating
2019-01-21T18:37:32.433+0100 F REPL [repl writer worker 11] writer worker caught exception: 10003 Cannot change the size of a document in a capped collection: 71 != 86 on: { ts: Timestamp 1548092252000|2, h: 8891639960730045349, v: 2, op: "u", ns: "cloudfs.taskscheduler.jobs", o2:
{ _id: ObjectId('5c4592d8a7621c000a270723') }

, o: { $set:
{ status: 3, ended: new Date(1548092252319) }

} }
2019-01-21T18:37:32.433+0100 I - [repl writer worker 11] Fatal assertion 16359 CannotGrowDocumentInCappedNamespace: Cannot change the size of a document in a capped collection: 71 != 86 at src/mongo/db/repl/sync_tail.cpp 1087
2019-01-17T07:23:58.732+0100 F REPL [repl writer worker 6] writer worker caught exception: 10003 Cannot change the size of a document in a capped collection: 71 != 86 on: { ts: Timestamp 1547706238000|2, h: -1287491416624820144, v: 2, op: "u", ns: "cloudfs.taskscheduler.jobs", o2:
{ _id: ObjectId('5c3ecdfd81fa83000704e0f6') }

, o: { $set:
{ status: 3, ended: new Date(1547706238359) }

} }
2019-01-17T07:23:58.770+0100 I - [repl writer worker 6] Fatal assertion 16359 CannotGrowDocumentInCappedNamespace: Cannot change the size of a document in a capped collection: 71 != 86 at src/mongo/db/repl/sync_tail.cpp 1087
2019-01-17T07:52:46.610+0100 I STORAGE [initandlisten] exception in initAndListen: 10003 Cannot change the size of a document in a capped collection: 71 != 86, terminating
2019-01-18T10:36:02.219+0100 F REPL [repl writer worker 0] writer worker caught exception: 10003 Cannot change the size of a document in a capped collection: 71 != 86 on: { ts: Timestamp 1547804162000|2, h: -9155582286729571842, v: 2, op: "u", ns: "cloudfs.taskscheduler.jobs", o2:
{ _id: ObjectId('5c404c817047201a535de75f') }

, o: { $set:
{ status: 3, ended: new Date(1547804162166) }

} }
2019-01-18T10:36:02.219+0100 I - [repl writer worker 0] Fatal assertion 16359 CannotGrowDocumentInCappedNamespace: Cannot change the size of a document in a capped collection: 71 != 86 at src/mongo/db/repl/sync_tail.cpp 1087
2019-01-19T10:36:03.302+0100 F REPL [repl writer worker 13] writer worker caught exception: 10003 Cannot change the size of a document in a capped collection: 71 != 86 on: { ts: Timestamp 1547890562000|3, h: -9125484584779178179, v: 2, op: "u", ns: "cloudfs.taskscheduler.jobs", o2:
{ _id: ObjectId('5c419e02863a75000b7a54b9') }

, o: { $set:
{ status: 3, ended: new Date(1547890562963) }

} }
2019-01-19T10:36:03.315+0100 I - [repl writer worker 13] Fatal assertion 16359 CannotGrowDocumentInCappedNamespace: Cannot change the size of a document in a capped collection: 71 != 86 at src/mongo/db/repl/sync_tail.cpp 1087

$scheduler->addJob creates a usual collection (not capped)

public function addJob(string $class, $data, array $options = []): Process
{
$document = $this->prepareInsert($class, $data, $options);

	$result = $this->db->{$this->job_queue}->insertOne($document);

...
This code ^ doesn't work properly, i think.

Configurable max_message_size for msg_receive()

Describe the change

In the WorkerManager class and Queue class the maximum size of messages for the php function msg_receive() is a fixed value. This value should be configurable with a default.

Current situation

The value is fixed to 16384

Should

The default value should be 16384 but it should be possible to configure this value.

Add event callback bindings to wait(), waitFor()

Is your feature request related to a problem? Please describe

No

Describe the solution you'd like

It should be possible to attach callbacks to wait() and waitFor() to execute different callbacks on different job results.

  • Both should accept an array argument with optional callbacks:
[
'onSuccess' = function(Process $process){},
'onError' = function(Process $process){},
'onComplete' = function(Process $process){},
'onTimeout' = function(Process $process){},
'onAbort' = function(Process $process){},
]

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

Additional context

Add any other context or screenshots about the feature request here.

Implement job priority

  • Add new option TaskScheduler\Scheduler::OPTION_PRIORITY integer

Job with a higher priority should be processed before jobs with a lower (or 0).
Sorting by prio may be a problem since we can not sort a tailable cursor nor a changestream.
One possibility is to listen for new jobs and after processing a job do get first ordered by priority. If this results in 0 jobs start a tailable cursor (or this may be changestream in v4).

uncaught exception when adding a new job

Describe the bug

Sometimes when adding a Job which already exists a uncaught exception is thrown:
uncaught exception: array_intersect_key(): Expected parameter 1 to be an array, null given at .../mongodb-php-task-scheduler/src/Scheduler.php:375)

FORCE_SPAWN shall not count to max children

Describe the bug

If jobs are bootstraped with FORCE_SPAWN: true they will force start which is fine.
but their start will count to MAX_CHILDREN and hold all other jobs in the queue.

To Reproduce

  1. Start queue with max_children: 1
  2. Add FORCE_SPAWN job
  3. Start normal job
  4. If the force jobs is a loop job, the 2nd job will never start.

Expected behavior

A job started with FORCE_SPAWN should not count to running max children.

Environment

  • (\TaskScheduler) version: v3.1.0
  • MongoDB Version: [e.g. v3.4.18]
  • PHP Version [e.g. v7.2.1]

Additional context

[2019-03-28 14:00:27] default.DEBUG: received job [5c9cd3119883f200f02fb2a6], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:27 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:27 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3119883f200f02fb2a6], do not spawn new worker  
[2019-03-28 14:00:29] default.DEBUG: received job [5c9cd3119883f200f02fb2aa], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:29 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:29 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3119883f200f02fb2aa], do not spawn new worker  
[2019-03-28 14:00:31] default.DEBUG: received job [5c9cd3119883f200f02fb2ae], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:31 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:31 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3119883f200f02fb2ae], do not spawn new worker  
[2019-03-28 14:00:33] default.DEBUG: received job [5c9cd3129883f200f02fb2b2], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:33 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:33 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2b2], do not spawn new worker  
[2019-03-28 14:00:35] default.DEBUG: received job [5c9cd3129883f200f02fb2b6], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:35 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:35 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2b6], do not spawn new worker  
[2019-03-28 14:00:37] default.DEBUG: received job [5c9cd3129883f200f02fb2ba], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:37 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:37 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2ba], do not spawn new worker  
[2019-03-28 14:00:39] default.DEBUG: received job [5c9cd3129883f200f02fb2be], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:39 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:39 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2be], do not spawn new worker  
[2019-03-28 14:00:41] default.DEBUG: received job [5c9cd3129883f200f02fb2c2], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:41 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:41 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2c2], do not spawn new worker  
[2019-03-28 14:00:43] default.DEBUG: received job [5c9cd3129883f200f02fb2c6], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:43 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:43 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2c6], do not spawn new worker  
[2019-03-28 14:00:45] default.DEBUG: received job [5c9cd3129883f200f02fb2c9], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:45 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:45 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2c9], do not spawn new worker  
[2019-03-28 14:00:47] default.DEBUG: received job [5c9cd3129883f200f02fb2cc], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:47 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:47 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2cc], do not spawn new worker  
[2019-03-28 14:00:49] default.DEBUG: received job [5c9cd3129883f200f02fb2cf], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:49 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:49 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2cf], do not spawn new worker  
[2019-03-28 14:00:51] default.DEBUG: received job [5c9cd3129883f200f02fb2d3], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:51 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:51 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2d3], do not spawn new worker  
[2019-03-28 14:00:53] default.DEBUG: received job [5c9cd3129883f200f02fb2d7], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:53 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:53 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3129883f200f02fb2d7], do not spawn new worker  
[2019-03-28 14:00:55] default.DEBUG: received job [5c9cd313016e3c000d4ff3ab], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:55 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:55 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd313016e3c000d4ff3ab], do not spawn new worker  
[2019-03-28 14:00:57] default.DEBUG: received job [5c9cd3149883f201012fb016], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:57 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:57 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3149883f201012fb016], do not spawn new worker  
[2019-03-28 14:00:59] default.DEBUG: received job [5c9cd314016e3c000d4ff3ae], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:00:59 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:00:59 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd314016e3c000d4ff3ae], do not spawn new worker  
[2019-03-28 14:01:01] default.DEBUG: received job [5c9cd3149883f201012fb019], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:01 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:01 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3149883f201012fb019], do not spawn new worker  
[2019-03-28 14:01:03] default.DEBUG: received job [5c9cd314016e3c000d4ff3b1], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:03 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:03 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd314016e3c000d4ff3b1], do not spawn new worker  
[2019-03-28 14:01:05] default.DEBUG: received job [5c9cd314016e3c000d4ff3b4], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:05 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:05 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd314016e3c000d4ff3b4], do not spawn new worker  
[2019-03-28 14:01:07] default.DEBUG: received job [5c9cd314016e3c000d4ff3b7], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:07 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:07 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd314016e3c000d4ff3b7], do not spawn new worker  
[2019-03-28 14:01:09] default.DEBUG: received job [5c9cd314016e3c000d4ff3bb], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:09 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:09 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd314016e3c000d4ff3bb], do not spawn new worker  
[2019-03-28 14:01:11] default.DEBUG: received job [5c9cd3159883f200f02fb2db], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:11 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:11 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3159883f200f02fb2db], do not spawn new worker  
[2019-03-28 14:01:13] default.DEBUG: received job [5c9cd3159883f200f02fb2de], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:13 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:13 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3159883f200f02fb2de], do not spawn new worker  
[2019-03-28 14:01:15] default.DEBUG: received job [5c9cd3159883f200f02fb2e1], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:15 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:15 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3159883f200f02fb2e1], do not spawn new worker  
[2019-03-28 14:01:17] default.DEBUG: received job [5c9cd3159883f200f02fb2e4], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:17 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:17 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3159883f200f02fb2e4], do not spawn new worker  
[2019-03-28 14:01:19] default.DEBUG: received job [5c9cd3159883f201012fb01c], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:19 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:19 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3159883f201012fb01c], do not spawn new worker  
[2019-03-28 14:01:21] default.DEBUG: received job [5c9cd3169883f200f02fb2e7], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:21 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:21 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3169883f200f02fb2e7], do not spawn new worker  
[2019-03-28 14:01:23] default.DEBUG: received job [5c9cd3169883f200f02fb2eb], write in systemv message queue {"category":"TaskScheduler\\Queue"} []
2019-28-03 14:01:23 [TaskScheduler\WorkerManager,DEBUG]: received systemv message type [1]  
2019-28-03 14:01:23 [TaskScheduler\WorkerManager,DEBUG]: max children [2] reached for job [5c9cd3169883f200f02fb2eb], do not spawn new worker  

Progress support

Is your feature request related to a problem? Please describe

Currently there is no direct support for progress information.

Describe the solution you'd like

Add a new field progress int 0 - 100 which indicates the progress of the job.
The progress may be increased by calling ProcessInterface::increaseProgress() from the job runner.

Add `end` timestamp to end a job interval

Is your feature request related to a problem? Please describe

No

Describe the solution you'd like

We have support for at and interval.
It would be nice to have possibility to end an interval.

Describe alternatives you've considered

Additional context

Wait timeout

Is your feature request related to a problem? Please describe

If a process waits for child processes using Process::wait() and no workers are available this might result in an infinite wait time. Meaning if there are only processes which are waiting for other processes there could never be a free slot to execute those waiting processes.

Describe the solution you'd like

Introduce a wait timeout?

Killed workers do not get restarted if no new jobs are added

Describe the bug

Killed worker do not get restarted. If there are already new jobs in the waiting queue and all current worker have been killed no new worker gets started as long as there are no new jobs in the queue.

To Reproduce

  1. Create job (long running job)
  2. execute job (configure max worker 1)
  3. Add new job to queue
  4. Kill first job
  5. The seccond job does not get started since there are no new worker.
  6. Add new job
  7. Both outstanding jobs get now executed.

Expected behavior

After killing a worker, the worker manager must check if the min. required number of workers is still valid. If not, new workers need to get started.

Environment

  • (\TaskScheduler) version: v3.0.2
  • MongoDB Version: [e.g. v3.4.18]
  • PHP Version [e.g. v7.2.1]

Additional context

Add any other context about the problem here.

Get exception from process (not via wait())

Is your feature request related to a problem? Please describe

I'd like to request a done/failed job from the scheduler and get the exception if there is one.

Describe the solution you'd like

Add a getException() to TaskScheduler\Process. Will be null if no exception and an instance of \Exception if there was one.

Describe alternatives you've considered

A clear and concise description of any alternative solutions or features you've considered.

Additional context

Add any other context or screenshots about the feature request here.

Add notification function

Is your feature request related to a problem? Please describe

When a job fails or gets canceled the main application can't handle this change. It would be nice to simply add a handler function in which the main application can handle the error (e.g. add an other job).

Describe the solution you'd like

When a job status is changed a function should be called in the main application. In this method custom error handling can be added.

Slow "Process::wait()" when dealing with many jobs

If a lot of (several hundreds in my case) of jobs are submitted in little time, Process::wait() is really slow (sometimes about 30s between the actual end of the job and the end of the wait in my case).

If I change the cursorType in MessageQueue::getCursor to Find::Tailable (instead of Find::TailableAwait) the problem does not occur (but the cpu usage increases).

How to add priority to a job

Hello everyone,
Could you explain, please, if it's possible to add a job priority to your code. I see some options as "at", "timeout" and etc., but i don't see priority. I would like to know how difficult it's to add the one. Can i do it this way:

  1. Add 1 more option - Priority
  2. Change the Queue.php/main method to select according to priority, here:
    $cursor_jobs = $this->jobs->getCursor([
    '$or' => [
    ['status' => JobInterface::STATUS_WAITING],
    ['status' => JobInterface::STATUS_POSTPONED],
    ],
    ]);
  3. Do the same stuff as in item 2 for Worker/processAll method.

Thanks.

So, i think, i found out you use a capped collection and a tailable cursor, there is only natural sorting available for such things: Tailable cursors do not use indexes and return documents in natural order.
Could you confirm that there is no possibility to add priority different than FIFO?

Check interval_reference option when a job times out or fails

Describe the change

When a job times out or fails a retry_timeout can be specified. The retry_timeout is added to the current timestamp.
The scheduler should check the interval_reference option and schedule the new job by adding the retry_interval to either the start time of the job or the current timestamp.

Current situation

When a job failed or ran into a timeout the new execution time is always calculated by adding the retry_interval to the current timestamp.

Should

The scheduler should check the interval_reference and add the retry_interval to either the start time of the job or the current timestamp

Add event bindings in the Process handler

Is your feature request related to a problem? Please describe

Yes, I'm missing the possibility to listen for events while processing jobs.
Queue::process() is blocking and Scheduler::listen() as well.

Describe the solution you'd like

It should be possible to listen for events on both despite the blocking call.

Queue::bind('onSuccess', function(){})
Queue::process()

Separate \TaskScheduler from MongoDB

Is your feature request related to a problem? Please describe

No.

Describe the solution you'd like

I would like to see a storage interface to use \TaskScheduler with whatever database/message queue I'd like.

Describe alternatives you've considered

No.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.