
Django-RQ

Django integration with RQ, a Redis-based Python queuing library. Django-RQ is a simple app that allows you to configure your queues in Django's settings.py and easily use them in your project.

Support Django-RQ

If you find django-rq useful, please consider supporting its development via Tidelift.

Requirements

Installation

  • Install django-rq:
pip install django-rq
  • Add django_rq to INSTALLED_APPS in settings.py:
INSTALLED_APPS = (
    # other apps
    "django_rq",
)
  • Configure your queues in django's settings.py:
RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'USERNAME': 'some-user',
        'PASSWORD': 'some-password',
        'DEFAULT_TIMEOUT': 360,
        'REDIS_CLIENT_KWARGS': {    # Any additional Redis connection arguments
            'ssl_cert_reqs': None,
        },
    },
    'with-sentinel': {
        'SENTINELS': [('localhost', 26736), ('localhost', 26737)],
        'MASTER_NAME': 'redismaster',
        'DB': 0,
        # Redis username/password
        'USERNAME': 'redis-user',
        'PASSWORD': 'secret',
        'SOCKET_TIMEOUT': 0.3,
        'CONNECTION_KWARGS': {  # Any additional Redis connection arguments
            'ssl': True
        },
        'SENTINEL_KWARGS': {    # Any additional Sentinel connection arguments
            # If Sentinel also has auth, username/password can be passed here
            'username': 'sentinel-user',
            'password': 'secret',
        },
    },
    'high': {
        'URL': os.getenv('REDISTOGO_URL', 'redis://localhost:6379/0'), # If you're on Heroku
        'DEFAULT_TIMEOUT': 500,
    },
    'low': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    }
}

RQ_EXCEPTION_HANDLERS = ['path.to.my.handler'] # If you need custom exception handlers (a handler sketch follows these installation steps)
  • Include django_rq.urls in your urls.py:
urlpatterns += [
    path('django-rq/', include('django_rq.urls'))
]
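A custom exception handler referenced by RQ_EXCEPTION_HANDLERS is a callable with RQ's exception handler signature. A minimal sketch (the module path above and the logging below are purely illustrative):

# path/to/my.py (hypothetical module referenced by RQ_EXCEPTION_HANDLERS)
import logging

logger = logging.getLogger(__name__)

def handler(job, exc_type, exc_value, traceback):
    # Log the failed job; return True to fall through to the next handler
    # in the chain, or False to stop exception handling here.
    logger.error('Job %s failed: %s', job.id, exc_value)
    return True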

Usage

Putting jobs in the queue

Django-RQ allows you to easily put jobs into any of the queues defined in settings.py. It comes with a few utility functions:

  • enqueue - push a job to the default queue:
import django_rq
django_rq.enqueue(func, foo, bar=baz)
  • get_queue - returns a Queue instance.
import django_rq
queue = django_rq.get_queue('high')
queue.enqueue(func, foo, bar=baz)

In addition to the name argument, get_queue also accepts default_timeout, is_async, autocommit, connection and queue_class arguments. For example:

queue = django_rq.get_queue('default', autocommit=True, is_async=True, default_timeout=360)
queue.enqueue(func, foo, bar=baz)

You can provide your own singleton Redis connection object to this function so that it will not create a new connection object for each queue definition. This will help you limit the number of connections to the Redis server. For example:

import django_rq
import redis
redis_cursor = redis.StrictRedis(host='', port='', db='', password='')
high_queue = django_rq.get_queue('high', connection=redis_cursor)
low_queue = django_rq.get_queue('low', connection=redis_cursor)
  • get_connection - accepts a single queue name argument (defaults to "default") and returns a connection to the queue's Redis server:
import django_rq
redis_conn = django_rq.get_connection('high')
  • get_worker - accepts optional queue names and returns a new RQ Worker instance for specified queues (or default queue):
import django_rq
worker = django_rq.get_worker() # Returns a worker for "default" queue
worker.work()
worker = django_rq.get_worker('low', 'high') # Returns a worker for "low" and "high"

@job decorator

To easily turn a callable into an RQ task, you can also use the @job decorator that comes with django_rq:

from django_rq import job

@job
def long_running_func():
    pass
long_running_func.delay() # Enqueue function in "default" queue

@job('high')
def long_running_func():
    pass
long_running_func.delay() # Enqueue function in "high" queue

You can pass in any arguments that RQ's job decorator accepts:

@job('default', timeout=3600)
def long_running_func():
    pass
long_running_func.delay() # Enqueue function with a timeout of 3600 seconds.

It's possible to specify a default for the result_ttl decorator keyword argument via the DEFAULT_RESULT_TTL setting:

RQ = {
    'DEFAULT_RESULT_TTL': 5000,
}

With this setting, the job decorator will set result_ttl to 5000 unless it's specified explicitly.
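For example, an individual job can still override this default explicitly (a minimal sketch):

@job('default', result_ttl=600)  # overrides the DEFAULT_RESULT_TTL of 5000 for this job only
def long_running_func():
    pass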

Running workers

django_rq provides a management command that starts a worker listening on the queues specified as arguments:

python manage.py rqworker high default low

If you want to run rqworker in burst mode, you can pass in the --burst flag:

python manage.py rqworker high default low --burst

If you need to use custom worker, job or queue classes, it is best to use global settings (see Custom queue classes and Custom job and worker classes). However, it is also possible to override such settings with command line options as follows.

To use a custom worker class, you can pass in the --worker-class flag with the path to your worker:

python manage.py rqworker high default low --worker-class 'path.to.GeventWorker'

To use a custom queue class, you can pass in the --queue-class flag with the path to your queue class:

python manage.py rqworker high default low --queue-class 'path.to.CustomQueue'

To use a custom job class, provide the --job-class flag.
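For example (the class path below is only a placeholder):

python manage.py rqworker high default low --job-class 'path.to.CustomJob'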

Starting from version 2.10, running RQ's worker-pool is also supported:

python manage.py rqworker-pool default low medium --num-workers 4

Support for Scheduled Jobs

With RQ 1.2.0 and later, you can use the built-in scheduler for your jobs. For example:

from datetime import datetime
from django_rq.queues import get_queue

queue = get_queue('default')
job = queue.enqueue_at(datetime(2020, 10, 10), func)
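The built-in scheduler can also enqueue jobs relative to the current time with enqueue_in; a minimal sketch (func is your own callable):

from datetime import timedelta

queue.enqueue_in(timedelta(minutes=30), func)  # run func roughly 30 minutes from now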

If you are using the built-in scheduler, you have to start workers with scheduler support:

python manage.py rqworker --with-scheduler

Alternatively, you can use RQ Scheduler. After installing it, you can use the get_scheduler function to return a Scheduler instance for queues defined in settings.py's RQ_QUEUES. For example:

from datetime import datetime
import django_rq

scheduler = django_rq.get_scheduler('default')
job = scheduler.enqueue_at(datetime(2020, 10, 10), func)

You can also use the management command rqscheduler to start the scheduler:

python manage.py rqscheduler
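RQ Scheduler also supports recurring jobs via its schedule method. A rough sketch, assuming rq-scheduler is installed and func is your own callable:

from datetime import datetime
import django_rq

scheduler = django_rq.get_scheduler('default')
scheduler.schedule(
    scheduled_time=datetime.utcnow(),  # first run: now
    func=func,
    interval=60,                       # then repeat every 60 seconds
    repeat=10,                         # stop after 10 runs; omit to repeat forever
)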

Support for django-redis and django-redis-cache

If you have django-redis or django-redis-cache installed, you can instruct django_rq to use the same connection information as your Redis cache. This has two advantages: it's DRY, and it takes advantage of any optimization that may be going on in your cache setup (like connection pooling or Hiredis).

To configure it, use a dict with the key USE_REDIS_CACHE pointing to the name of the desired cache in your RQ_QUEUES dict. It goes without saying that the chosen cache must exist and use the Redis backend. See your respective Redis cache package docs for configuration instructions. It's also important to point out that since the django-redis-cache ShardedClient splits the cache over multiple Redis connections, it does not work.

Here is an example settings fragment for `django-redis`:

CACHES = {
    'redis-cache': {
        'BACKEND': 'redis_cache.cache.RedisCache',
        'LOCATION': 'localhost:6379:1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'MAX_ENTRIES': 5000,
        },
    },
}

RQ_QUEUES = {
    'high': {
        'USE_REDIS_CACHE': 'redis-cache',
    },
    'low': {
        'USE_REDIS_CACHE': 'redis-cache',
    },
}

Suspending and Resuming Workers

Sometimes you may want to suspend RQ to prevent it from processing new jobs. A classic example is during the initial phase of a deployment script or in advance of putting your site into maintenance mode. This is particularly helpful when you have jobs that are relatively long-running and might otherwise be forcibly killed during the deploy.

The suspend command stops workers on all queues (in a single Redis database) from picking up new jobs. However, currently running jobs will continue until completion.

# Suspend indefinitely
python manage.py rqsuspend

# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600

# Resume work again.
python manage.py rqresume

Queue Statistics

django_rq also provides a dashboard to monitor the status of your queues at /django-rq/ (or whatever URL you set in your urls.py during installation).

You can also add a link to this dashboard in /admin by adding RQ_SHOW_ADMIN_LINK = True in settings.py. Be careful though: this will override the default admin template, so it may interfere with other apps that modify the default admin template.

These statistics are also available in JSON format via /django-rq/stats.json, which is accessible to staff members. If you need to access this view via other HTTP clients (for monitoring purposes), you can define RQ_API_TOKEN and access it via /django-rq/stats.json/<API_TOKEN>.
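For example, a monitoring check could fetch the stats with a plain HTTP client (the host below is a placeholder, and <API_TOKEN> stands for the value of RQ_API_TOKEN):

curl https://example.com/django-rq/stats.json/<API_TOKEN>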

(Screenshot: the queue statistics dashboard.)

Note: Statistics of scheduled jobs display jobs from RQ's built-in scheduler, not the optional RQ Scheduler.

Additionally, these statistics are also accessible from the command line.

python manage.py rqstats
python manage.py rqstats --interval=1  # Refreshes every second
python manage.py rqstats --json  # Output as JSON
python manage.py rqstats --yaml  # Output as YAML

(Screenshot: rqstats output in the terminal.)

Configuring Sentry

Sentry should be configured within the Django settings.py as described in the Sentry docs.

You can override the default Django Sentry configuration when running the rqworker command by passing the --sentry-dsn option:

./manage.py rqworker --sentry-dsn=https://*****@sentry.io/222222

This will override any existing Django configuration and reinitialise Sentry, setting the following Sentry options:

{
    'debug': options.get('sentry_debug'),
    'ca_certs': options.get('sentry_ca_certs'),
    'integrations': [RedisIntegration(), RqIntegration(), DjangoIntegration()]
}

Configuring Logging

RQ uses Python's logging module, which means you can easily configure rqworker's logging mechanism in Django's settings.py. For example:

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "rq_console": {
            "format": "%(asctime)s %(message)s",
            "datefmt": "%H:%M:%S",
        },
    },
    "handlers": {
        "rq_console": {
            "level": "DEBUG",
            "class": "rq.logutils.ColorizingStreamHandler",
            "formatter": "rq_console",
            "exclude": ["%(asctime)s"],
        },
    },
    'loggers': {
        "rq.worker": {
            "handlers": ["rq_console", "sentry"],
            "level": "DEBUG"
        },
    }
}

Custom Queue Classes

By default, every queue will use the DjangoRQ class. If you want to use a custom queue class, you can do so by adding a QUEUE_CLASS option on a per-queue basis in RQ_QUEUES:

RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
        'QUEUE_CLASS': 'module.path.CustomClass',
    }
}

Alternatively, you can specify a custom queue class for all your queues in the RQ setting:

RQ = {
    'QUEUE_CLASS': 'module.path.CustomClass',
}

Custom queue classes should inherit from django_rq.queues.DjangoRQ.

If you are using more than one queue class (not recommended), be sure to only run workers on queues with the same queue class. For example, if you have two queues defined in RQ_QUEUES and one has a custom class specified, you would have to run at least two separate workers, one for each queue.
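As an illustration, a custom queue class can be as small as this (module path, class name, and the added behaviour are arbitrary):

# myapp/queues.py
from django_rq.queues import DjangoRQ

class CustomQueue(DjangoRQ):
    # Add per-project defaults, logging, or other behaviour here.
    pass

You would then reference it as 'QUEUE_CLASS': 'myapp.queues.CustomQueue'.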

Custom Job and Worker Classes

Similarly to custom queue classes, global custom job and worker classes can be configured using JOB_CLASS and WORKER_CLASS settings:

RQ = {
    'JOB_CLASS': 'module.path.CustomJobClass',
    'WORKER_CLASS': 'module.path.CustomWorkerClass',
}

A custom job class should inherit from rq.job.Job. It will be used for all jobs if configured.

A custom worker class should inherit from rq.worker.Worker. It will be used for running all workers unless overridden by the rqworker management command's --worker-class option.
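As a rough sketch of what such subclasses look like (module path and class names are illustrative):

# myapp/rq_classes.py
from rq.job import Job
from rq.worker import Worker

class CustomJobClass(Job):
    # Override serialization or add per-job bookkeeping here.
    pass

class CustomWorkerClass(Worker):
    # Hook into worker setup and teardown around jobs here.
    pass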

Testing Tip

For an easier testing process, you can run a worker synchronously this way:

from django.test import TestCase
from django_rq import get_worker

class MyTest(TestCase):
    def test_something_that_creates_jobs(self):
        ...                      # Code that enqueues jobs.
        get_worker().work(burst=True)  # Process all queued jobs, then stop.
        ...                      # Assert that the jobs have done their work.

Synchronous Mode

You can set the option ASYNC to False to make synchronous operation the default for a given queue. This will cause jobs to execute immediately and on the same thread as they are dispatched, which is useful for testing and debugging. For example, you might add the following after your queue configuration in your settings file:

# ... Logic to set DEBUG and TESTING settings to True or False ...

# ... Regular RQ_QUEUES setup code ...

if DEBUG or TESTING:
    for queue_config in RQ_QUEUES.values():
        queue_config['ASYNC'] = False

Note that setting the is_async parameter explicitly when calling get_queue will override this setting.
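For example, even with ASYNC set to False in settings, a single call can force a queue back to asynchronous behaviour (a minimal sketch; func, foo and baz are your own objects):

import django_rq

queue = django_rq.get_queue('default', is_async=True)  # overrides ASYNC = False for this queue instance
queue.enqueue(func, foo, bar=baz)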

Running Tests

To run django_rq's test suite:

`which django-admin` test django_rq --settings=django_rq.tests.settings --pythonpath=.

Deploying on Ubuntu

Create an rqworker service that runs the high, default, and low queues.

sudo vi /etc/systemd/system/rqworker.service

[Unit]
Description=Django-RQ Worker
After=network.target

[Service]
WorkingDirectory=<<path_to_your_project_folder>>
ExecStart=/home/ubuntu/.virtualenv/<<your_virtualenv>>/bin/python \
    <<path_to_your_project_folder>>/manage.py \
    rqworker high default low

[Install]
WantedBy=multi-user.target

Enable and start the service

sudo systemctl enable rqworker
sudo systemctl start rqworker

Deploying on Heroku

Add django-rq to your requirements.txt file with:

pip freeze > requirements.txt

Update your Procfile to:

web: gunicorn --pythonpath="$PWD/your_app_name" config.wsgi:application

worker: python your_app_name/manage.py rqworker high default low

Commit and re-deploy. Then add your new worker with:

heroku scale worker=1

Changelog

See CHANGELOG.md.

django-rq's Issues

No management command to run rq-scheduler

There is a management command to run rqworker with the Django environment set (python manage.py rqworker ...), but no analogue for running rq-scheduler.
It would be great to have one instead of running DJANGO_SETTINGS_MODULE=... bin/rqscheduler.

Question: Where should periodic tasks be put?

Using rq-scheduler I can define a periodic task but I am not sure where I should put this code. I have used celery before and it puts such tasks in the settings module, but this is not possible with django-rq because of how get_scheduler is defined. Is there a recommended location for infinite periodic tasks that need to be launched with the application?

How to gracefully reload django rqworker

Sorry for asking a question in the issue tracker, but I am wondering how to gracefully reload the Django rqworker without losing any running jobs.

Thanks in advance!

Error when trying to pass django request object as function arg to enqueue

Hello!
I have some management commands in my Django project, and I need to add them to a queue. This works fine:

django_rq.enqueue(call_command, 'load_new_price', file, queue=True)

But I also need to pass a Django request object to this function, like this:

django_rq.enqueue(call_command, 'load_new_price', file, queue=True,  request=requst)

And I get this error: "expected string or Unicode object, NoneType found".
In file rq/job.py, in save():
line 303: obj['data'] = dumps(self.job_tuple)

Reusing Redis connections

I'm using the "high", "medium", "low" queue pattern and my instrumentation is indicating that my request loops are spending a lot of time just connecting to Redis. Since I'm using Redis for caching as well via django-redis, it seems like I should be able to use cache.client to retrieve the cache connection that get created in the middleware. In RQ, it looks like I could just use the Connection context manager. It would be nice to be able to do this within the standard django-rq paradigm, but I think this might require some modification to do cleanly.

But I thought I'd propose the idea of making an alternative configuration that cleanly ties into django-redis system, for people who want to do things that way. In my mind, you would simply provide the key for the cache entry to use and the names of the queues to create, and the system would take it from there. If I have the time, I might fork, but I thought I'd ask your thoughts first.

sentry and RQ?

Hi,

Very nice app. Good job. Thanks

I have a question regarding sentry.
Is it mandatory to use the following logging configuration to use Sentry?
https://github.com/ui/django-rq#configuring-logging

Especially about defining a sentry handler?
I never had to define any sentry handlers for any django apps and I received the errors via sentry.
What is different with django-rq?

Thanks

Support for 'async=False'

The current enqueue helper doesn't support the 'async=False' option as described in the RQ docs under the Bypassing Workers section. Is there any intention to support this in the short term? I'm happy to do this and submit a pull request if not.

Incorrect rq version dependency causes logging to fail.

The version dependency on rq in setup.py is >=0.3.0; however, the latest version of django-rq uses rq.utils.ColorizingStreamHandler in its logging configuration, and this was introduced in rq 0.3.2, so those running 0.3.0 or 0.3.1 will fail:

ValueError: Unable to configure handler 'rq_console': Cannot resolve 'rq.utils.ColorizingStreamHandler': No module named ColorizingStreamHandler

I propose upping the dependency to 0.3.2.

How to use testing DB with django tests?

Hi,

I'm running some classic tests with manage.py test myapp. RQ is running with python manage.py rqworker default.

I have some jobs that write some stuff to the DB. While my tests write to the test database Django creates, the jobs are working fine but are writing to the "normal" DB, not the test one... How can I make RQ use the test settings while in test mode?

Thanks in advance, great product BTW.

ResponseError at /django-rq/queues/0/empty/: unknown command 'EVALSHA'

Hi,

I noticed that the Empty Queue button on the admin interface wasn't working and tried to figure out what happened. The problem is that EVAL/EVALSHA commands were implemented in Redis 2.6.0, and I'm using the versions 2.4 in development (which comes with Debian Wheezy) and 2.2 in production (which comes with Ubuntu Precise). Running the django_rq tests on my machine, four of them (test_commit, test_error, test_success and test_clear_queue) fails with the same message: ResponseError: unknown command 'EVALSHA'.

Digging a little more, I've found out that the problem is actually not caused by django-rq, but by rq itself, which uses a Lua script to clear the queue in its empty() method. Then, when redis-py tries to execute it, you can guess what happens. Also, it's worth mentioning that this does not affect the bulk delete action that was added on 3dd89ad/0.7.0.

So, how can we fix this? I guess that there are other people using wheezy/precise on its systems as well. Some options are:

  • Detect which version of Redis is being used by the queue, then hide/show the button if it's < 2.6.0 or >= 2.6.0.
  • Change the default action of the Empty Queue button, so it would bulk-delete the entire queue instead - something which would be like selecting all jobs then use the new bulk delete action mentioned, although this is probably less efficient.
  • Implement a mix of the previous two options, letting the clear_queue view decide on which one is best, based on the detected Redis version.

Regards,
Tiago.

AttributeError: 'DefaultClient' object has no attribute 'client' (<redis_cache.client.default.DefaultClient>)

In a fresh pip install of the latest version:

$ pip freeze | egrep '(redis)|(rq)'
django-redis==3.4.0
django-rq==0.6.0
redis==2.8.0
rq==0.3.12

The following error is inevitable:

File ".../site-packages/django_rq/queues.py", line 63, in get_redis_connection
    return cache.client.client
AttributeError: 'DefaultClient' object has no attribute 'client'

The error is caused by https://github.com/ui/django-rq/blob/58b309458a434f075546e6b05e3911ee5178e1c2/django_rq/queues.py#L63

It is similar to #36 but more important, because in this case it is not a third-party monkey patch breaking things, but simply that the private field of django-redis that django-rq relies upon is not where it is expected.

A couple of typos in the README

RQ_QUEUES = {
    'default': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    },
    'high': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    },
    'low': {
        'HOST': 'localhost',
        'PORT': 6379,
        'DB': 0,
    }
}

Forgot a comma before 'low'.

And it's rqworker, not rqworkers:
python manage.py rqworker high default low

Check whether a worker is running

Hello!

Is it possible to check whether a worker is running?

I try to add a job to the queue when no worker is running, and I get no errors or messages. I understand that the jobs are added to Redis, and when a worker runs, these jobs will be executed. But what happens if a worker has crashed or has never started?

I'm looking for something better (more high-level and more Pythonic) than:

ps ax | grep rqworker

Django 1.5.5 produces error

Django 1.5.5 produces this error with django-rq:

AttributeError: 'CacheStatTracker' object has no attribute '_client'

When I downgrade to 1.5.4 , all is well. Been having problems with 1.5.5 and many other packages. I think some 1.6 code snuck into the 1.5.5 security update.

Better error message for "must have the same redis connection"

ValueError: Queues must have the same redis connection."default" and "high" have different connections.

{'DEFAULT_TIMEOUT': 300, 'HOST': 'localhost', 'DB': 0, 'PORT': 6379}
!=
{'DEFAULT_TIMEOUT': 60, 'HOST': 'localhost', 'DB': 0, 'PORT': 6379}

It's just the timeout which is different. The error message should say: "A worker cannot handle queues with different timeouts" (if that's true).

job decorator behaves differently than enqueue

Sorry if I missed something, but this difference in behavior is not explained in the documentation as far as I can see.

In this example, test is run one time, which is expected.

def test():
    print 'asdf'
    time.sleep(2)

django_rq.enqueue(test)

But in this example, test is being run over and over (seemingly endlessly), each time getting a new job id.

@job
def test():
    print 'asdf'
    time.sleep(2)
test.delay()

test()

Is there an explanation for this? Am I using it wrong? Or is this a real problem?

Admin template override

It is terrible to override the default admin template.

At least make it optional, depending on a settings variable, and mention it in the README.

DEFAULT_TIMEOUT and same redis connection error

Hi,

I get this error:

ValueError: Queues must have the same redis connection."default" and "high" have different connections

when DEFAULT_TIMEOUT is different for the two queues I use on the same worker.
I did not think DEFAULT_TIMEOUT was part of the "connection" since it is the job timeout and is not related to the connection, correct?

By just changing the DEFAULT_TIMEOUT of the high queue to "120" instead of "300", I got the error. If I set it back to "300", it works fine.

CACHES = {
    'queues': {
        'BACKEND': 'redis_cache.cache.RedisCache',
        'LOCATION': os.environ['DJANGO_CACHE_QUEUES_LOCATION'],
        'TIMEOUT': None, # Never expires by default
        'OPTIONS': {
            'CLIENT_CLASS': 'redis_cache.client.DefaultClient',
            'PARSER_CLASS': 'redis.connection.HiredisParser',
            'CONNECTION_POOL_KWARGS': {'max_connections': 100},
            'IGNORE_EXCEPTIONS': not DEBUG, # Get the errors only in DEBUG
            'PICKLE_VERSION': -1  # Use the latest protocol version
        }
    }
}

RQ_QUEUES = {
    'high': {
        'USE_REDIS_CACHE': 'queues',
        'DEFAULT_TIMEOUT': 120, # 2 mins  # =>  MUST BE 300 to avoid the error
    },
    'default': {
        'USE_REDIS_CACHE': 'queues',
        'DEFAULT_TIMEOUT': 300, # 5 mins
    },
}

Thanks

python 2.6 support

Hello,

there is an issue with Python 2.6: the dictConfig used in management/commands/rqworker.py is only available in Python 2.7.

There was a similar issue with rq: see rq/rq#172

So using the django version

from django.utils.log import dictConfig

should fix it.

Example/Demo/Screenshot

Hi, I am interested to see what it looks like, but don't have enough time right now to install the whole stack. Is there any example, demo, or screenshot that I've missed?

Closing the database connection

I'm using django-rq on Heroku with the Heroku Postgres DB setup. When my RQ worker jobs finish, I always see this in the logs:

LOG:  could not receive data from client: Connection reset by peer
LOG:  unexpected EOF on client connection

Any idea if there's a way to close the database connection when a job ends?
For example, I probably need to put this somewhere:

from django.db import connection
connection.close()

Perhaps it should just be in the worker function itself... but that seems like a hack... thoughts?

get_current_job from django_rq

Hi,

I'm trying to use the get_current_job() function from rq, but I can't figure out how to do this.

import rq
rq.get_current_job()  # return None

I'm using django_rq, and I think it's a connection problem?

When looking at the code in rq/jobs.py, we see it uses LocalStack, and it seems that LocalStack is not 'aware' of our Redis connection, as mentioned in django_rq's rqworker command.

Cheers,

Overriding exception handlers

This doesn't seem to be supported?
The workers are instantiated by the management command without any possibility to call push_exc_handler(my_handler).

Redis still fills up when result_ttl=0. Why?

Issue: Why is redis filling up if the results of jobs are discarded immediately?

I'm using django-rq to queue up jobs to create PDFs asynchronously and then save the result to my database. Since it's saved, I don't need to access the object at a later date, so I don't need to keep the result in Redis after it's been processed.

To keep the result from staying in Redis, I've set the result_ttl to 0:

parameter_dict = {          
    "order": serializer.object,
    "photo": base64_image,
    "result_ttl": 0
}
django_rq.enqueue(procces_template, **parameter_dict)

The problem is although the rq worker says the job expires immediately:

15:33:35 Job OK, result = John Doe's nail order to 568 Broadway
15:33:35 Result discarded immediately.
15:33:35
15:33:35 *** Listening on high, default, low...

Redis still fills up and throws:

ResponseError: command not allowed when used memory > 'maxmemory'

Is there another parameter that I need to set in django-rq to keep redis from filling up if the job result is already not stored?

Connection limit exceeded

I'm getting an error running django-rq on Heroku:

redis.exceptions.ResponseError: max number of clients reached

The error is perfectly reasonable: I'm using the free Redis Cloud service, which only allows 10 connections, and running a script to generate content that calls django-rq lots of times, thereby exceeding the connection limit. But what's the best fix?

I could probably force the queue to be synchronous for this task. But that only fixes this script; what happens in a proper deployment if the system exceeds the max number of clients limit?

Basically is there an elegant way of using rq's connection pool in django-rq? Would it be easiest to use django-redis then specify max_connections?

Mistake in documentation

Hello

In the documentation I found a mistake. You say to use:
worker = django_rq.get_worker() # Returns a worker for "default" queue
worker.run()

But the worker object doesn't have a run() method.

How can I start a new worker in the background from Django code?

Thank You

Mixing queues list and connection configurations

Hello, I think that mixing connection data and queue names in the configuration is not a good idea. At the very least, it won't let you create queues dynamically (a CELERY_CREATE_MISSING_QUEUES analogue).

More than that, there is some duplication, because the queue list is stored in the hash of each worker.

As far as I can see, this was done for only one reason - to show the queue list in the interface.

It would be more consistent to pass the combination of connection and queue names as arguments to the worker management command, and to extract data about workers and queues directly from Redis.

What do you think about such a pattern?

Trying to connect to redis with ASYNC False

Using the job decorator, django-rq tries to connect to Redis even though ASYNC is set to False in RQ_QUEUES. When Redis is available, it executes synchronously, but only after connecting to Redis first.

Some periodic jobs with repeat counts never go away

If I start a periodic job with a repetition count of 5, and from within each invocation of that job I submit a single non-periodic job to another worker, the parent job's repetition count never makes it all the way to zero - the repetition count is effectively ignored, making the job last forever. Printing the repeat count within enqueue_job, I can see that it decrements a few times, and then stops going down before reaching zero.

However, if I start a periodic job with a repetition count of 5, and I don't submit another job from within it, the repetition count of the periodic job decrements as one would expect, and the job's periodic evaluation is eventually finished.

I have a small test program that shows the problem, but I'll need to get permission to share it - so let me know if you think it might be helpful to see.

Allow any redis connection for use in settings.py

With rq, I am able to pass in a connection to create a queue. I am using Heroku, and the way I create a queue is by using redis.from_url. The way I use this without django-rq is from the Heroku tutorial on rq (https://devcenter.heroku.com/articles/python-rq):

import os

from rq import Queue
import redis

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
redis_connection = redis.from_url(redis_url)
q = Queue('email', connection=redis_connection)

Currently I am planning on parsing the url like this:

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
url = urlparse.urlparse(redis_url)
RQ_QUEUES = {
    'default': {
        'HOST': url.hostname,
        'PORT': url.port,
        'DB': 0,
        'PASSWORD': url.password,
    },
}

Source: redis/redis-py#246

Passing timeout to queued job

Is it possible to pass a timeout value to an enqueued job? I believe the default is 180 seconds -- which is short for some long-running jobs.

Thanks for the great tool!

Job scheduled for 3 minutes in the future using absolute time, runs immediately

When I run the following code:

    now = datetime.datetime.now()
    future_3_min_delta = datetime.timedelta(minutes=3)
    print('now is {}'.format(str(now)))
    future_3_min_absolute = now + future_3_min_delta
    print('future_3_min_absolute is {}'.format(str(future_3_min_absolute)))
    result3 = scheduler.enqueue_at(future_3_min_absolute, test_mod.func, 3)

The job is not delayed for 3 minutes - instead, it runs right away. In fact, even if I change it to 1 hour in the future, it still runs right away.

django worker slow?

I have a very simple task:

@job
def log_info():
    logger.info('Job running')

Queuing this task 500 times takes nearly 2 minutes to process with a single worker.

for i in range(500): tasks.log_info.delay()

I guess it's because of the issue described in the Performance notes here.
Is django-rq setting up the Django environment for every single job?

Different interpretations of 'queue' between rq and django-rq.

In rq a 'queue' is an arbitrary named pipe and doesn't imply anything about the underlying connection. In django-rq it implies a connection. So the best practice of 3 queues suggested by rq ('low', 'medium', 'high') doesn't really work with django-rq unless you define 3 connections.

I would propose that you either change the syntax of RQ_QUEUES to include some form of glob matching, or change it to be RQ_CONNECTIONS, which is what it really contains, and separate out the connection from the queue name completely.

Option 1: Automatic queue->connection routing by glob.

RQ_QUEUES = {
    '*': {
        'HOST': REDIS_HOST,
        'PORT': REDIS_PORT,
        'DB': REDIS_DB['jobs'],
    },
    'test-*': {
        ...
    }
}
...
queue = django_rq.get_queue('test-low')
queue.enqueue(func, foo, bar=baz)

Option 2: Differentiation completely.

import django_rq
queue = django_rq.get_queue('low', connection='test')
queue.enqueue(func, foo, bar=baz)

Custom worker classes

RQ supports creating custom Worker classes (here, bottom). These are launched like this:

$ rqworker -w 'path.to.GeventWorker'

Any chance we could add this flag to django-rq?

job decorator for signals

Hello,

I would like to use the job decorator 'around' a post_save (or any other) signal handler; have you got an example of this working?

Here is some code I "imagine", but I don't know if I can achieve it with django-rq:

from django.db.models.signals import post_save
from django_rq import job

@job
def after_save_instance_handler(sender, **kwargs):
    print sender
after_save_instance_handler.delay(*?!*)

post_save.connect(after_save_instance_handler, sender=MyModel)

There is an interesting article here, trying the same thing with celery http://dougalmatthews.com/2011/10/10/making-django%27s-signals-asynchronous-with-celery/

Regards,

Test django-rq

Hello,

I'm trying to test an app which has some asynchronous tasks using django-rq, and it's not always as easy as it could/should be.

For the moment I have two use cases - mail tests and signal tests - that work with synchronous jobs, but don't work with asynchronous tasks.

Mail test:

# in jobs.py for example

@job
def send_async_mail():
    print get_connection()  # return an instance of locmem.EmailBackend
    send_mail('foo', 'bar', '[email protected]', ['[email protected]'], fail_silently=False)


def send_custom_mail():
     send_async_mail.delay()

# in tests.py

from django_rq import get_worker
from jobs import send_custom_mail

class AsyncSendMailTestCase(unittest.TestCase):
    def test_async_mail(self):
        send_custom_mail()
        get_worker().work(burst=True)
        print get_connection() # return an OTHER instance of locmem.EmailBackend
        print len(mail.outbox)  # return 0, should return 1

As you see above, the "problem" is that the instance of get_connection() (https://docs.djangoproject.com/en/dev/topics/email/?from=olddocs#django.core.mail.get_connection) is not the same.

Also, I'm facing (I think) the same problem using mock_signal_receiver https://github.com/dcramer/mock-django/blob/master/mock_django/signals.py in asynchronous jobs. A kind of reference to the receiver is lost with asynchronous jobs.
I'm quite sure of it because, when I remove the asynchronous part of my code, it works.

Any ideas on how to deal with that kind of problem?

Regards,
