Giter Club home page Giter Club logo

queues's Introduction

queues's People

Contributors

0xtim avatar andrewangeta avatar bennydebock avatar dylanshine avatar finestructure avatar gwynne avatar heldersrvio avatar ikenndac avatar ileitch avatar jaapwijnen avatar jdmcd avatar kacperk avatar madsodgaard avatar raulriera avatar rnantes avatar siemensikkema avatar tanner0101 avatar vkill avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

queues's Issues

Allow getting connection pools for an event loop in clients

Is your feature request related to a problem? Please describe.
It would be useful to be able to get the RedisConnectionPool for an EventLoop. For example, one may want to extend QueueContext with a Redis property, to allow using Redis from Vapor Jobs.

Describe the solution you'd like
The method pool(eventLoop:) in Application.Redis could be made public.

Describe alternatives you've considered
This library could extend each type that could make use of Redis.

In process queues worker crash on Ctrl+c to exit server

I have found a small issue when making use of startInProcessJobs function to run in process worker.

In configure we call:

app.queues.startInProcessJobs(on .default)

However when server is exited using ctrl+c, there will be an error printed from QueuesCommand deinit

JobsCommand did not shutdown before deinit

I think we need to QueuesCommand part of lifecycle so its shutdown function can be called properly?

THanks for your help!

Support delay

The library should allow users to specify a delayed job:

queue.dispatch(job, delay: someDateInTheFuture)

Allow single process workers

Users should be able to specify that the jobs worker should be run in the same process as the web application instead of running via a separate CLI command

Support for setting a JobIdentifier for a job

Currently, there is no way to inject a JobIdentifier for a certain job using queue.dispatch(...) so that we can use that JobIdentifier to clear a job from the queue, by calling queue.clear(id).

Event delegate for scheduled jobs

At the moment the event delegate only works for regular jobs. You can work around it, but I think it would be nice to have it "in-build".

We could possibly add the notification hook to the method, where the job is about to run. I think its in ScheduledJob on line 49.

But also I think we need to define a different event delegate for a schedule job as well.

More informative error reporting about what job is failing

QueuesCommand.swift line 101: worker.queue.logger.error("Job run failed: (error)")
yields uninformative errors like "Connection refused" with no context as to what actually failed.
Logging the worker, I see it has some information about the job that would be useful.

Might relate to #79 possibly as without a delay, the logs fill with infinite retries of non-obvious origins.
Possibly just logging the job might suffice.

ScheduledJob run twice quickly because actual sleep duration is shorter than expected.

Describe the bug

I used ScheduledJob which is scheduled at every minutes.
In some environment, job runs twice quickly at every minutes.

I patched this library to investigate by printing more information to know what happens.
This is a patch.
https://github.com/omochi/queues/pull/1/files

It prints scheduled time previously started job and current time.

This is log.

$ .build/release/Run serve --hostname 0.0.0.0 --port 80
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:16:00 +0000, now=2021-03-03 12:15:26 +0000, prev=nil
[ NOTICE ] Server starting on http://0.0.0.0:80
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:16:00 +0000, now=2021-03-03 12:15:59 +0000, prev=Optional(2021-03-03 12:16:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:17:00 +0000, now=2021-03-03 12:16:00 +0000, prev=Optional(2021-03-03 12:16:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:17:00 +0000, now=2021-03-03 12:16:59 +0000, prev=Optional(2021-03-03 12:17:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:18:00 +0000, now=2021-03-03 12:17:00 +0000, prev=Optional(2021-03-03 12:17:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:18:00 +0000, now=2021-03-03 12:17:59 +0000, prev=Optional(2021-03-03 12:18:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:19:00 +0000, now=2021-03-03 12:18:00 +0000, prev=Optional(2021-03-03 12:18:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:19:00 +0000, now=2021-03-03 12:18:59 +0000, prev=Optional(2021-03-03 12:19:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:20:00 +0000, now=2021-03-03 12:19:00 +0000, prev=Optional(2021-03-03 12:19:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:20:00 +0000, now=2021-03-03 12:19:59 +0000, prev=Optional(2021-03-03 12:20:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:21:00 +0000, now=2021-03-03 12:20:00 +0000, prev=Optional(2021-03-03 12:20:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:21:00 +0000, now=2021-03-03 12:20:59 +0000, prev=Optional(2021-03-03 12:21:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:22:00 +0000, now=2021-03-03 12:21:00 +0000, prev=Optional(2021-03-03 12:21:00 +0000)
[ INFO ] Scheduling JobKickerJob to run at 2021-03-03 12:22:00 +0000, now=2021-03-03 12:21:59 +0000, prev=Optional(2021-03-03 12:22:00 +0000)

First, job is scheduled at 12:16:00 at (now=)12:15:26.

Next, job is started at (now=)12:15:59.
This is problem.
This job is actually scheduled at 12:16:00 but started earlier it.
So next schedule is also 12:16:00.
Immediately, 12:16:00 comes actually.
This is twice execution process.

After that, same phenomenon happens repeatedly.

To Reproduce

Configure job.

    app.queues.schedule(JobKickerJob()).minutely().at(0)
    try app.queues.startScheduledJobs()

Start app.

Expected behavior

Job should run only once at every scheduled interval.

Environment

I tried this at many environment.
I got only once environment as below.

  • Linux on Docker on Mac. Docker image is swift:5.3.
  • Release build. ($ swift build -c release)
  • My mac is iMac Pro 2017, 2.3 GHz 18 core Intel Xeon W
  • Docker engine uses 18 core.
  • vapor 4.41.2
  • queus-redis-driver 1.0.0
  • queues 1.5.1

Additional context

This bug causes concurrent execution and race condition in our project.
We didn't expect this at all, so debugging was very hard and consume many working time.
I finally reached to this library and very surprised.

It may be duplicate with #85

access the container

I want to send a request to another server once the job is triggered
but it's seems impossible to access the container

swift run Run jobs --scheduled only works with using .at()

If you schedule a job using at with a specific date then the scheduled works.. however if you try and setup a job using something like .everySecond() it never process the jobs

works

public func configure(_ app: Application) throws {
    app.jobs.schedule(TestJob()).at(Date(rfc1123: "Sat, 04 Jan 2020 16:47:40 GMT")!)
    try app.jobs.use(.redis(url: "redis://127.0.0.1:6379"))
}

Doesn't work

public func configure(_ app: Application) throws {
    app.jobs.schedule(TestJob()).everySecond()
    try app.jobs.use(.redis(url: "redis://127.0.0.1:6379"))
}

job is just a simple print

struct TestJob: ScheduledJob {    
    func run(context: JobContext) -> EventLoopFuture<Void> {
        print("Job has run \(context)")       
        return context.eventLoop.makeSucceededFuture(Void())
    }
}

if I make the change to ScheduleBuilder:

    public func everySecond() {
        self.second = 1
    }

and put in a breakpoint in the nextDate(current function I can get it to run once.. but not repeated

improving configuration api

This is what a min-viable setup for vapor + jobs looks like currently:

let app = Application(environment: env)
app.provider(JobsProvider())
app.register(JobsDriver.self) { app in
    return TestDriver(on: app.make())
}
app.register(extension: JobsConfiguration.self) { jobs, app in
    jobs.add(FooJob())
}
app.get("foo") { req in
    return req.jobs.dispatch(FooJob.Data(foo: "bar"))
        .map { "done" }
}
return app

Not bad at all, but I have some ideas. What about something like this:

let app = Application(environment: env)
app.provider(JobsProvider())

app.jobs.driver(TestDriver())
app.jobs.add(FooJob())
app.jobs.add(BarJob())
app.jobs.add(QuxJob())

app.get("foo") { req in
    return req.jobs.dispatch(FooJob.self, .init(foo: "bar"))
        .map { "done" }
}
return app

Multiple workers can execute the same scheduled job

Currently we create one ScheduledJobWorker per event loop. As far as I can tell, there is nothing in place to make sure that the job is only performed once and on one thread so that multiple workers do not execute the same job at the same time.

Support re-queueing stuck jobs

In order to fully complete conformance with the reliable queue pattern, the library should expose a hook that allows drivers to move idle jobs in processing back to the upcoming queue stack. This would be useful for jobs that get stuck if the process crashes.

Let the driver determine the appropriate payload storage format

I'm frustrated that when I use the excellent Fluent Driver for Queues, the payload/JobData enforces storage as raw Data. This ends up meaning that for Postgres and MySQL, it's a column containing [UInt8], which is far from ideal.

I assume that this was done for the early drivers, which require storage this way, but it would be good to unpick this.

Offering the payload as Codable and expecting the driver to handle persistence would be a more flexible approach, and perhaps allow the use of things like Postgres' jsonb type for far easier inspection and debugging of in-flight jobs.

Add option to configure number of workers

It would be nice to give the user the ability to control the number of workers they wish to have, instead of being forced to use the default of 1 per core.

Possible API:
app.queues.configuration.numWorkers = 2

Provide a Fluent-backed driver?

I've been working on porting the work I did for the earlier Jobs PostgreSQL driver over to the new Queues library, and I keep coming up against wanting to use Fluent to provide a JobData wrapper that could be constructed using Fluent.

It got me to thinking: why not provide a single Queues driver that backs onto whatever Fluent-backed database the user is already using?

Does this sound feasible?

Run in Application's EventLoopGroup for Vapor 3

It is cool if there is way to customize event loop group for version 0.2.7. Like in new version to reuse application's event loops group

Under hobby account in heroku there is a limit 20 postgres connections for all workers. And JobCommand creates MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount) and every thread creates own connections as I understand correctly. So this limit is easy to reach currently

Output to Swift Metrics

We should add a dependency to Swift Metrics and output based metrics from the package like jobs queued, time taken to process a job and success/failure status

make testScheduledJob faster

Right now the smallest unit of time scheduled jobs support is a minute. This makes waiting for the scheduled jobs test to run pretty annoying.

Maybe we should add seconds or a schedule(at:Date) method? This would allow us to run the tests a lot faster and could be useful to the end user.

Queue only runs 0 or 1 task per refresh interval per worker

Describe the bug

The queues package is only able to run up to 1 task per refresh interval, per worker. For example, a system configured with 1 worker and a 10 second refresh interval, running tasks that take 1 second, cannot be utilised beyond 10%, and cannot run more than 1 task every 10 seconds.

To Reproduce

In a project with queues and a task configured...

  1. Set the number of workers to 1
  2. Set the refresh interval to 10 seconds
  3. Enqueue 10 tasks.

Expected behaviour
All 10 tasks complete one after the other with effectively zero gap between them.

Actual behaviour
The first task completes, and the queue waits until 10 seconds have elapsed before running the next.

Environment

queues 1.12.1
vapor-queues-fluent-driver 3.0.0-beta1

Additional context

I'm not sure whether this is worth filing on the fluent driver repo as well. I think there are two possible scenarios here:

  1. queues intends that the pop method in the driver API blocks until a job is available, likely intended for use with BRPOPLPUSH/etc on Redis. Therefore making this a "bug" in vapor-queues-fluent-driver or the queues documentation.
  2. (and/or) This is unintentional behaviour in queues, and it should instead re-pop if any job is run until there is nothing.

While the intention of refreshInterval isn't documented (that I can tell), I think it's most reasonable and would be most expected by those who have used other queueing systems, that it be a period to wait when there is no available work so the tuning of this value only impacts the polling of storage and does not impact the utilisation of queue workers.

JobData naming collision warning

We should provide a console warning when multiple jobs are registered with the same JobData name to help prevent collisions during decoding.

Document database/fluent access from inside dequeue

Could we add an example to the readme on how to access the database/fluent from the job please?

I haven't figured out what the best way would be... can we pass in a DatabaseConnectable as service? or is it possible to extend the JobContext to be DatabaseConnectable?

(If you have some examples then I would be willing to document it in the readme.)

Retry delay

Would be nice to be able to be able to specify a delay between retries.

Maybe something like

protocol Job {
    func nextRetry(attempt: Int) -> Date
}
extension Job {
    func nextRetry(attempt: Int) -> Date { return Date() }
}

That would allow you to implement exponential backoff etc

Scheduling a ScheduledJob hourly runs a few seconds early and reschedules thus running twice

Scheduling a synchronous job to run hourly runs a few seconds early then reschedules to run at the correct time a few seconds later - thus running twice. Occasionally it will run early more than once (see 08:01:57).

I have worked around this by not running the job if it is running or if it has run within the last minute.

Steps to reproduce

    // queue job to fetch currency rates at two minutes past each hour
    try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))
    app.queues.schedule(fetcher)
        .hourly()
        .at(2)
    try app.queues.startScheduledJobs()

Expected behavior

ScheduledJob should run exactly once on the specified schedule.

Actual behavior

ScheduledJob runs early. For example, if the job is scheduled to run at two minutes past each hour (11:02:00), then it will run at (say) 11.01.57 then reschedule to run at the correct time of 11:02:00 thus running twice.

[ INFO ] 2020-07-31 01:57:58 +0000 Initializing rates (App/CurrencyRatesFetcher.swift:22)
[ INFO ] 2020-07-31 01:57:58 +0000 Try to load rates from file (App/CurrencyRatesFetcher.swift:26)
[ INFO ] 2020-07-31 01:57:58 +0000 Attempting to load cached rates from file file:///app/data/rates... (App/CurrencyRatesStorage.swift:48)
[ INFO ] 2020-07-31 01:57:58 +0000 Loading cached rates from file file:///app/data/rates... (App/CurrencyRatesStorage.swift:55)
[ INFO ] 2020-07-31 01:57:58 +0000 Cached rates: [{
"rates" : {
"BTC" : 8.9993039e-05,
"EGP" : 15.9703,
"MDL" : 16.576971,
"BRL" : 5.1554,
"QAR" : 3.641,
"MWK" : 740.341469,
...
"SGD" : 1.371368,
"YER" : 250.349961,
"FKP" : 0.762474
},
"base" : "USD",
"fetchedAt" : 1596153718.0983748,
"timestamp" : 1596153600
}] (App/CurrencyRatesStorage.swift:57)
[ DEBUG ] Factory created. [RedisConnectionFactory: 8387CBC7-9040-4427-8890-6D06D8C46934] (RedisKit/RedisConnectionSource.swift:20)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 02:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ NOTICE ] Server starting on http://0.0.0.0:8080 (Vapor/HTTP/Server/HTTPServer.swift:183)
[ INFO ] 2020-07-31 02:01:59 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 02:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ ERROR ] CurrencyRatesFetcher failed: inProgress (Queues/QueuesCommand.swift:139)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 03:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 02:02:02 +0000 Fetched 171 currencies at July 31, 2020 at 2:02:02 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 02:02:02 +0000 Currency rates up to date at July 31, 2020 at 2:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 02:02:02 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ INFO ] 2020-07-31 03:01:59 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 03:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ ERROR ] CurrencyRatesFetcher failed: inProgress (Queues/QueuesCommand.swift:139)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 04:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 03:02:00 +0000 Fetched 171 currencies at July 31, 2020 at 3:02:00 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 03:02:00 +0000 Currency rates up to date at July 31, 2020 at 3:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 03:02:00 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ INFO ] 2020-07-31 04:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 04:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 04:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 4:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 04:01:58 +0000 Currency rates up to date at July 31, 2020 at 4:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 04:01:58 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 04:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 04:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 05:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 05:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 05:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 05:01:57 +0000 Fetched 171 currencies at July 31, 2020 at 5:01:57 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 05:01:57 +0000 Currency rates up to date at July 31, 2020 at 5:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 05:01:57 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 05:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 05:01:57 +0000(App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 06:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 06:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 06:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 06:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 6:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 06:01:58 +0000 Currency rates up to date at July 31, 2020 at 6:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 06:01:58 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 06:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 06:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 07:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 07:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 07:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 07:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 7:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 07:01:58 +0000 Currency rates up to date at July 31, 2020 at 7:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ INFO ] 2020-07-31 07:01:58 +0000 Saved rates to file rates (App/CurrencyRatesFetcher.swift:57)
[ DEBUG ] 2020-07-31 07:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 07:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 08:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 08:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 08:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 08:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 8:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 08:01:58 +0000 Currency rates up to date at July 31, 2020 at 8:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 08:01:59 +0000 Skipping fetch. Last fetched at 2020-07-31 08:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 08:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ DEBUG ] 2020-07-31 08:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 08:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 09:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 09:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 09:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 09:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 9:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 09:01:58 +0000 Currency rates up to date at July 31, 2020 at 9:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 09:01:59 +0000 Skipping fetch. Last fetched at 2020-07-31 09:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 09:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ DEBUG ] 2020-07-31 09:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 09:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 10:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 10:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 10:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 10:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 10:01:58 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 10:01:58 +0000 Currency rates up to date at July 31, 2020 at 10:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 10:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 10:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 11:02:00 +0000 (Queues/ScheduledJob.swift:36)[ INFO ] 2020-07-31 11:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 11:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 11:01:59 +0000 Fetched 171 currencies at July 31, 2020 at 11:01:59 AM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 11:01:59 +0000 Currency rates up to date at July 31, 2020 at 11:00:00 AM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 11:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 11:01:59 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 12:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 12:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 12:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 12:01:58 +0000 Fetched 171 currencies at July 31, 2020 at 12:01:58 PM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 12:01:58 +0000 Currency rates up to date at July 31, 2020 at 12:00:00 PM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 12:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 12:01:58 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 13:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 13:01:57 +0000 Fetching currencies on queue scheduled... (App/CurrencyRatesFetcher.swift:117)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 13:02:00 +0000 (Queues/ScheduledJob.swift:36)
[ INFO ] 2020-07-31 13:01:59 +0000 Fetched 171 currencies at July 31, 2020 at 1:01:59 PM UTC (App/CurrencyRatesFetcher.swift:48)
[ INFO ] 2020-07-31 13:01:59 +0000 Currency rates up to date at July 31, 2020 at 1:00:00 PM UTC) (App/CurrencyRatesFetcher.swift:50)
[ DEBUG ] 2020-07-31 13:02:00 +0000 Skipping fetch. Last fetched at 2020-07-31 13:01:59 +0000 (App/CurrencyRatesFetcher.swift:114)
[ DEBUG ] Scheduling CurrencyRatesFetcher to run at 2020-07-31 14:02:00 +0000 (Queues/ScheduledJob.swift:36)

Environment

  • Vapor Framework version: 4.27.1
  • Vapor Toolbox version: 18.0.0
  • OS version: MacOS 10.15.5; Docker: build image: FROM swift:5.2-focal, run image: FROM swift:5.2-focal-slim

"JobsCommand did not shutdown before deinit" when run unit tests

Steps to reproduce

I use the QUEUES package and try to understand how to use it. I have created a simple Job which fetches data from a remote server. I config vapor application with the following function

// configures your application
public func configure(_ app: Application) throws {
    // uncomment to serve files from /Public folder
    // app.middleware.use(FileMiddleware(publicDirectory: app.directory.publicDirectory))

    app.databases.use(.sqlite(.file("db.sqlite")), as: .sqlite)
    
    try app.queues.use(.redis(url: "redis://127.0.0.1:6379"))

    //Register jobs
    let categoriesFetcherJob = FetcherJob()
    app.queues.add(categoriesFetcherJob)
    applyWPV2Migrations(app)
    
    try app.queues.startInProcessJobs(on: .default)
    // register routes
    try routes(app)
}

I dispatch Job from Controller like this:

    func index(req: Request) throws -> EventLoopFuture<[String: String]> {
        let jobPayload = CategoriesFetcherJobPayload(startPage: 1, categoriesPerPage: 10)
        return req.queue.dispatch(FetcherJob.self, jobPayload)
            .flatMap { (_) -> EventLoopFuture<[String: String]> in
                let res = ["status": "Fetching of remote categories was scheduled successfully"]
                return req.eventLoop.makeSucceededFuture(res)
        }
    }

The issue occurs when I run unit tests not necessarily related to JOB functionality:

    func testAssert() throws {
        let app = Application(.testing)
        defer {  app.shutdown() }
        try configure(app)
        XCTAssertTrue(true)
} 

After execution, I always hit assertion assert(self.didShutdown, "JobsCommand did not shutdown before deinit")

#6 0x0000000105535f34 in QueuesCommand.deinit at /queues/Sources/Queues/QueuesCommand.swift:164

OR

        `assertionFailure("Command handler deinit when queue is not empty! Queue size: \(self.commandResponseQueue.count)")`

#7 0x00000001053f0a07 in RedisCommandHandler.deinit at /RediStack/Sources/RediStack/ChannelHandlers/RedisCommandHandler.swift:43

Expected behavior

Tests do not crash

Actual behavior

Hit assertion related to JOB but there is no way to shutdown it properly.

Environment

 dependencies: [
        // ๐Ÿ’ง A server-side Swift web framework.
        .package(url: "https://github.com/vapor/vapor.git", from: "4.14.0"),
        .package(url: "https://github.com/vapor/fluent.git", from: "4.0.0"),
        .package(url: "https://github.com/vapor/fluent-sqlite-driver.git", from: "4.0.0-rc.2"),
        .package(url: "https://github.com/vapor/queues-redis-driver.git", from: "1.0.0-rc.3"),
        // .package(url: "https://github.com/apple/swift-protobuf.git", from: "1.8.0")
    ],

macOS 10.15.4

extend command configuration in provider

We should automatically add the jobs command in the JobsProvider now that this is possible in Vapor 4.

services.extend(CommandConfiguration.self) { configuration, container in
    try configuration.use(c.make(JobsCommand.self), as: "jobs")
}

API for configuring drivers

There's a bit of manual work currently required to get a valid JobsPersistanceLayer. We should provide some convenience APIs for this like FluentKit does.

Each driver can extend those convenience APIs with functions for supplying the required credentials.

Job not retrying again after an error

I created a job as defined below it's working fine but once there is an error it doesn't retry again
I added the
context.eventLoop.future(error: Abort(.badRequest, reason: "My error here."))
but the compiler says argument passed to call that takes no arguments

import Foundation
import Jobs
import Vapor

struct SyncCaisseOperationsJob: Job {

    let container: Container

    init(container: Container){
        self.container = container
    }

    func dequeue(_ context: JobContext, _ data: PushCaisseOperationDataJob) -> EventLoopFuture<Void> {

        do {
            let service = try container.make(CaisseServices.self)
            return try service.pushCaisseOperations(
                container,
                caisseId: data.caisse_id,
                cmdClients: data.data,
                urlQuery: data.query
            )
        } catch {
            return Future.map(on: container, { () })
        }
        
    }

}

better type-safety on job dispatch

Currently job handlers are looked up by their Data associated type. This could lead to a potential ambiguity. For example, what would happen if two job handlers used [String: String] as their data type?

Maybe we could do something like:

func dispatch<Job>(_ job: Job.Type, _ data: Job.Data) { ... }

That could look like:

req.jobs.dispatch(EmailJob.self, .init(message: "hi"))

Support in-process jobs

This should be as simple as making startJobs and startScheduledJobs public in JobsCommand

Schedule job every x minutes

Looks like there is no way to make ScheduledJobs run every X hours or X Mins.

The current helpers if I am not mistaken are every 1 min, 1 hour..

I don't see a way to create a ScheduledBuilder and use that to schedule a job.

looks like the jobs.schedule() calls the mutating function self.storage.configuration.schedule(job, builder: builder) which is internal.

Queues Package for official MongoDB Swift Driver

I've been using the official MongoDB Swift Driver which has been awesome so far.

For Queues, I believe the current community MongoDB Queues package only supports using the third party MongoKitten and not the official MongoDB Swift Driver.

I wanted to raise this in case anyone in the community has created a queues package or is working on one for the official MongoDB Swift Driver.

I think this would be be a great package to have for the community going forward.

Make a Vapor Queue Sequential

**Is your feature request related to a problem?
Currently, when running a Vapor Queue on a Mac in development, when adding jobs to a queue the jobs are pulled off by multiple eventloops, (assumption) and thus the jobs don't complete in added order. Thus the queue is not FIFO.

Describe the solution you'd like
I would like a setting to make a queue sequential or FIFO.

Describe alternatives you've considered
I can't find any alternatives. Thus I'm blocked with how I can use a Vapor Queue in this critical instance.

Additional context
I understand the value of a queue being processed by multiple event loops. I have other use cases that use multi event-loops successfully. But the need for FIFO is critical, in a different use case.

Specifically what I need: I have a series of jobs that save large amounts of data to a postgres database using Fluent. It is fine that multiple event loops process those jobs because all the save are independent. But here is the kicker. I need to add a completion job to that queue, that executes after all the other jobs complete. As it now stands, that final job, added to the queues last, executes before some of the dependent jobs complete.

A sequential/FIFO is an easy way to ensure the final job is executed after all other jobs complete.

how to use?

Hi, Can add an executable target to show sending emails every hour?

Supplying an Application instance when dequeuing a job

It would be advantageous to have access to an instance of Application when dequeueing a job.
Maybe having it added as a property on the JobContext

func dequeue(_ context: JobContext, _ payload: SomeJob) -> EventLoopFuture<Void> {
    context.application.someService.....
}

How to test jobs using XCTest?

How would you go about testing a job since it is run on a thread that does not notify when it is complete? Right now I am just calling sleep(10) before running my asserts, but I would like to be able to know when an AsyncJob is done.

In Process Jobs... start Scheduled with no ScheduledJobs

If I use the example app.. or my own and try the command:
try JobsCommand(application: app, scheduled: true).startScheduledJobs()

It produces the error Assertion failed: JobsCommand did not shutdown before deinit: file
If there is no call to: app.jobs.schedule prior to calling the . startScheduledJobs()

I get why that is.. but I wonder if there is a better way to handle this.. such as a precondition to give a warning that no jobs of type ScheduledJob have been put onto the schedule

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.