
pyramid_celery's Introduction

Getting Started

https://travis-ci.org/sontek/pyramid_celery.png?branch=master https://coveralls.io/repos/sontek/pyramid_celery/badge.png?branch=master

Include pyramid_celery either by setting your includes in your .ini, or by calling config.include('pyramid_celery'):

pyramid.includes = pyramid_celery

Then you just need to tell pyramid_celery what ini file your [celery] section is in:

config.configure_celery('development.ini')
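
Putting the two steps together, a minimal application factory might look like this (a sketch assuming a standard PasteDeploy entry point):

from pyramid.config import Configurator

def main(global_config, **settings):
    config = Configurator(settings=settings)
    config.include('pyramid_celery')
    # point pyramid_celery at the same ini file the app was started with
    config.configure_celery(global_config['__file__'])
    return config.make_wsgi_app()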

Then you are free to use celery, for example class-based:

from pyramid_celery import celery_app as app

class AddTask(app.Task):
    def run(self, x, y):
        print(x + y)

or decorator-based:

from pyramid_celery import celery_app as app

@app.task
def add(x, y):
    print(x + y)

To get the pyramid settings, you may access the registry via app.conf['PYRAMID_REGISTRY'].
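
For example, a task can reach your application settings through the registry (a sketch; the 'sqlalchemy.url' key is only an illustration):

from pyramid_celery import celery_app as app

@app.task
def show_db_url():
    # registry.settings is the usual Pyramid settings dictionary
    registry = app.conf['PYRAMID_REGISTRY']
    print(registry.settings.get('sqlalchemy.url'))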

Configuration

By default pyramid_celery assumes you want to configure celery via .ini settings. You can do this by calling config.configure_celery('development.ini'), but if you are already in the main() of your application and want to use the same ini file the app was configured with, you can do the following:

config.configure_celery(global_config['__file__'])

If you want to use the standard celeryconfig python file you can set use_celeryconfig = True like this:

[celery]
use_celeryconfig = True

You can find more information on celeryconfig.py here:

http://celery.readthedocs.io/en/latest/userguide/configuration.html
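
For illustration, a minimal celeryconfig.py might look like this (a sketch; the lowercase setting names assume Celery 4+):

# celeryconfig.py, picked up when use_celeryconfig = True
broker_url = 'redis://localhost:1337/0'
imports = ('app1.tasks', 'app2.tasks')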

An example ini configuration looks like this:

[celery]
broker_url = redis://localhost:1337/0
imports = app1.tasks
          app2.tasks

[celery:broker_transport_options]
visibility_timeout = 18000
max_retries = 5

[celerybeat:task1]
task = app1.tasks.Task1
type = crontab
schedule = {"minute": 0}

You'll notice that configuration options which are dictionaries or have multiple values get split out into their own sections.
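
As a rough sketch, the example above ends up being handed to celery as a dictionary along these lines (note that visibility_timeout and max_retries are coerced to int):

celery_config = {
    'broker_url': 'redis://localhost:1337/0',
    'imports': ('app1.tasks', 'app2.tasks'),
    'broker_transport_options': {
        'visibility_timeout': 18000,
        'max_retries': 5,
    },
}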

Scheduled/Periodic Tasks

To use celerybeat (periodic tasks) you need to declare one celerybeat config section per task. The options are:

  • task - The python task you need executed.
  • type - The type of scheduling your configuration uses, options are crontab, timedelta, and integer.
  • schedule - The actual schedule for your type of configuration.
  • args - Additional positional arguments.
  • kwargs - Additional keyword arguments.

Example configuration for this:

[celerybeat:task1]
task = app1.tasks.Task1
type = crontab
schedule = {"minute": 0}

[celerybeat:task2]
task = app1.tasks.Task2
type = timedelta
schedule = {"seconds": 30}
args = [16, 16]

[celerybeat:task3]
task = app2.tasks.Task1
type = crontab
schedule = {"hour": 0, "minute": 0}
kwargs = {"boom": "shaka"}

[celerybeat:task4]
task = myapp.tasks.Task4
type = integer
schedule = 30
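
For reference, these sections correspond to the schedule objects you would otherwise construct in Python; a rough sketch of the equivalent beat_schedule (assuming Celery's standard crontab and timedelta schedules):

from datetime import timedelta

from celery.schedules import crontab

beat_schedule = {
    'task1': {'task': 'app1.tasks.Task1', 'schedule': crontab(minute=0)},
    'task2': {'task': 'app1.tasks.Task2', 'schedule': timedelta(seconds=30),
              'args': [16, 16]},
    'task3': {'task': 'app2.tasks.Task1', 'schedule': crontab(hour=0, minute=0),
              'kwargs': {'boom': 'shaka'}},
    # a plain integer schedule means "run every N seconds"
    'task4': {'task': 'myapp.tasks.Task4', 'schedule': 30},
}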

A gotcha to watch out for is that the date/time in scheduled tasks is UTC by default. If you want to schedule for an exact date/time in your local timezone you need to set timezone. Documentation for that can be found here:

http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html#time-zones

If you need to find out what timezones are available you can do the following:

from pprint import pprint
from pytz import all_timezones
pprint(all_timezones)
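
With the .ini-based configuration that is just another [celery] key; for example (assuming Celery's lowercase setting names):

[celery]
timezone = Europe/London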

Worker Execution

The celerybeat worker will read your configuration and schedule tasks in the queue to be executed at the defined time. This means that if you are using celerybeat you will end up running two processes:

$ celery -A pyramid_celery.celery_app worker --ini development.ini
$ celery -A pyramid_celery.celery_app beat --ini development.ini

The first command is the standard worker command that will read messages off of the queue and run the task. The second command will read the celerybeat configuration and periodically schedule tasks on the queue.

Routing

If you would like to route a task to a specific queue you can define a route per task by declaring its queue and/or routing_key in a celeryroute section.

An example configuration for this:

[celeryroute:otherapp.tasks.Task3]
queue = slow_tasks
routing_key = turtle

[celeryroute:myapp.tasks.Task1]
queue = fast_tasks
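
Workers do not consume from custom queues automatically, so you will typically point a worker at them with celery's -Q option (the queue names here match the example above):

$ celery worker -A pyramid_celery.celery_app --ini development.ini -Q slow_tasks,fast_tasks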

Running the worker

To run the worker we just use the standard celery command with an additional argument:

celery worker -A pyramid_celery.celery_app --ini development.ini

If you've defined variables in your .ini like %(database_username)s you can use the --ini-var argument, which is a comma-separated list of key=value pairs:

celery worker -A pyramid_celery.celery_app --ini development.ini --ini-var=database_username=sontek,database_password=OhYeah!

The values passed via --ini-var cannot contain spaces; spaces break celery's parser.

The reason this is a CSV rather than passing --ini-var multiple times is a bug in celery itself. When they fix the bug we will rework the API. The ticket is here:

celery/celery#2435

If you use the celerybeat scheduler you need to run with the --beat flag to run beat and the worker at the same time:

celery worker --beat -A pyramid_celery.celery_app --ini development.ini

Or you can launch it separately like this:

celery beat -A pyramid_celery.celery_app --ini development.ini

Logging

If you use the .ini configuration (i.e. you don't use celeryconfig.py) then the logging configuration will be loaded from the .ini and will not use the default celery loggers.

You most likely want to add a logging section to your ini for celery as well:

[logger_celery]
level = INFO
handlers =
qualname = celery

and then update your [loggers] section to include it.
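
For example, if your ini currently defines only a root logger, the updated section would look like this:

[loggers]
keys = root, celery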

If you want to use the default celery loggers then you can set CELERYD_HIJACK_ROOT_LOGGER=True in the [celery] section of your .ini.

Celery worker processes do not propagate exceptions inside tasks by default, but swallow them silently. This is related to the behavior of reading asynchronous task results back. To see whether your tasks fail you might need to configure the celery.worker.job logger to propagate exceptions:

# Make sure Celery worker doesn't silently swallow exceptions
# See http://stackoverflow.com/a/20719461/315168
# https://github.com/celery/celery/issues/2437
[logger_celery_worker_job]
level = ERROR
handlers =
qualname = celery.worker.job
propagate = 1


Demo

To see it all in action check out examples/long_running_with_tm, run redis-server and then do:

$ python setup.py develop
$ populate_long_running_with_tm development.ini
$ pserve ./development.ini
$ celery worker -A pyramid_celery.celery_app --ini development.ini

pyramid_celery's People

Contributors

belasin, caspercl, casperla, crooksey, damoxc, edelooff, groner, hynek, kenrobbins, lxneng, mardiros, miohtama, msabramo, ocampana, omarkohl, przemal, rguldener, sontek, timgates42, weyzu, wichert


pyramid_celery's Issues

Can't set CELERY_ACCEPT_CONTENT parameter through the ini file (will be blocking with celery 3.2)

In pyramid_celery 2.0.0-rc3, while configuring through the main ini file, it's not possible to configure CELERY_ACCEPT_CONTENT

The ini file content

CELERY_ACCEPT_CONTENT =
     json

The traceback

Traceback (most recent call last):
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/celery/app/trace.py", line 161, in build_tracer
    backend = task.backend
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/celery/five.py", line 284, in __get__
    return self.__get.__get__(obj, type)()
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/celery/task/base.py", line 75, in backend
    return cls.app.backend
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/kombu/utils/__init__.py", line 322, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/celery/app/base.py", line 625, in backend
    return self._get_backend()
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/celery/app/base.py", line 444, in _get_backend
    return backend(app=self, url=url)
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/celery/backends/base.py", line 107, in __init__
    conf.CELERY_ACCEPT_CONTENT if accept is None else accept,
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/kombu/serialization.py", line 454, in prepare_accept_content
    return set(n if '/' in n else name_to_type[n] for n in l)
  File "/home/gas/.virtualenvs/autonomie/lib/python2.7/site-packages/kombu/serialization.py", line 454, in <genexpr>
    return set(n if '/' in n else name_to_type[n] for n in l)
KeyError: '\n'

Except for CELERY_IMPORTS (which gets specific treatment in the pyramid_celery/__init__.py file), we can't customize parameters that expect a list as their value.

Trouble with timezones/dst

So, I have been using celery/beat for a number of years, and have been manually offsetting the schedule of my tasks due to DST issues, etc. As my codebase has become larger, the script that I run to change the times is getting bigger and bigger, and I have decided to sort the problem out.

So in short, my system clock updates automatically, from my shell I can run:

┌─[luke@freebsd] - [~/py3-apps/intranet] - [Thu Mar 29, 12:24]
└─[$]> date
Thu Mar 29 12:37:22 BST 2018

So presently, a task set to run at 10:30am will run at 11:30am. I thought this would be easy to fix, so I added the following to my configuration:

CELERY_TIMEZONE = Europe/London
CELERY_ENABLE_UTC = False

When I run my celery beat schedule, via:

celery worker --beat -A pyramid_celery.celery_app --ini development.ini -n celeryIntranetAPI

I thought this would solve my problems; however, my cron tasks are still an hour behind. How can I make celery keep up with the system clock?

Note I have tried:

CELERY_TIMEZONE = UTC          
CELERY_ENABLE_UTC = True

As per a few suggestions, but this did not work either.

Can anyone shed some light on how I can link my celery cron timings to the system clock?

pyramid_celery is not compatible with pyramid 2

In pyramid 2 there is no pyramid.compat module anymore.
The ConfigParser should be loaded directly from the python3 configparser module.

from pyramid.compat import configparser

Should be

import configparser

'BaseBeatCommand' is not defined error

pcelerybeat yields the following error when starting.

  File "./env/lib/python2.7/site-packages/pyramid_celery/commands/celerybeat.py", line 12, in <module>
    class BeatCommand(CommandMixin, BaseBeatCommand):
NameError: name 'BaseBeatCommand' is not defined

It seems the BeatCommand class defined in celerybeat.py expects the BaseBeatCommand class to always be present, but it fails when that class cannot be imported from celery.bin.celerybeat.

I changed the definition of the class as below and it works okay.

class BeatCommand(CommandMixin, BaseCeleryCtl):
    preload_options = tuple(
        BaseCeleryCtl.preload_options[len(Command.preload_options):])

Can pyramid_celery setup celery logging configuration as well?

One of the goals of this package is to configure celery from the pyramid configuration file.

But usually a pyramid PasteDeploy configuration file contains logging configuration as well. Can pyramid_celery make use of the pyramid configuration file here too and set up logging using the celery logging API?

Can this package take responsibility for this task as well? Is it actually possible, and does it make sense at all?

If the docs could make a statement about this, I think it would be helpful for people adopting pyramid & celery framework. Many of them would appreciate some more developer guidance.

pypi version outdated

Hi John,
The current version of pyramid_celery installed via pip or easy_install doesn't work with celery >= 3.1.
Can you please update PyPI with the latest changes from GitHub?

Thanks!

result_backend config option doesn't work in .ini file

Pyramid 1.10.4
Celery 4.4.2
pyramid_celery 3.0.0
Python 3.6.9

I was trying to specify my celery result_backend using the .ini file. But it just wouldn't work. Kept getting results: disabled://.
Moving the config to celeryconfig.py with USE_CELERYCONFIG = True fixed the issue for me, but thought this might still be something to look into, as expected behaviour might be for it to work either way.

Thank you,

pyramid_celery not installable from pypi

It looks like README.md is not included in the sdist, probably due to the lack of an appropriate MANIFEST.in:

Getting distribution for 'pyramid-celery'.
error: /var/folders/+P/+PcLj-6aHLaQAin55j1OYE+++TI/-Tmp-/easy_install-2iPIa1/pyramid_celery-0.1/README.md: No such file or directory

Current implementation does not support SQS configuration broker transport options

Currently, configuration for broker_transport_options is limited by the following map:

BROKER_TRANSPORT_OPTIONS_MAP = {
    'visibility_timeout': int,
    'max_retries': int,
}

However, for SQS, there are other options required such as region. Because these are not mapped, they are ignored and we get an error further down the line as this information is missing.

It would not be very sustainable to support every option required by every (supported) provider, but what is the potential risk of removing this mapping in its entirety? Or is there another way to configure this that I missed?

Worker Concurrency configuration on .ini does not work.

Versions used:
* Python: 3.9.4
* Celery: 5.0.5
* Pyramid: 2.0
* Pyramid-Celery: 4.0.0

Before opening this issue I dug around a lot, and I think I understand why and how it happens, but I am not sure how to fix it, or even whether it is possible to fix without some adjustments in the celery project itself.

I will put my findings here and hopefully we may find a way to address it.

How to reproduce:

Files

The files can also be found here: https://github.com/debonzi/pyramid-celery-issue-sample

setup.py

from setuptools import setup, find_packages

requires = [
    "celery==5.0.5",
    "pyramid==2.0",
    "pyramid-celery==4.0.0",
]
setup(
    name="demo",
    version="0.0.1",
    install_requires=requires,
    packages=find_packages(exclude=['tests']),
    entry_points={
        "paste.app_factory": [
            "main = demo:main",
        ],
    },
)

demo.py

from pyramid.config import Configurator


def main(global_config, **settings):
    """This function returns a Pyramid WSGI application."""
    with Configurator(settings=settings) as config:
        config.include("pyramid_celery")
        config.configure_celery(global_config["__file__"])

    return config.make_wsgi_app()

conf.ini

[app:main]
use = egg:demo

[server:main]
use = egg:waitress#main
listen = localhost:6543

[celery]
broker_url = redis://localhost:6379/1
worker_concurrency = 4

Install and Run

$ pip install -e .
$ celery -A pyramid_celery.celery_app worker --ini config.ini

Output:

 -------------- celery@Doomhammer v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.72-microsoft-standard-WSL2-x86_64-with-glibc2.31 2021-05-02 16:15:11
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f3e25b9e670
- ** ---------- .> transport:   redis://localhost:6379/1
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

Concurrency is 8 (the number of CPU cores on my machine) and not 4 as expected.

As far as I could understand, celery 5 uses click, and pyramid-celery uses the user_preload_options signal to configure celery via the config_from_object method.

At this point it seems that celery has already loaded the click parameters into internal variables, and since we didn't use --concurrency it behaves as if we had used --concurrency=0 (which later translates to the number of CPU cores). Since command-line parameters take precedence over config file parameters (config_from_object), celery ignores the value from the .ini file and uses the default value.

If you have an idea of how to fix it (or work around it), or need any more information, I will be glad to provide it.

Cheers

Environment variable parsing not supported

I want to use an environment variable as my BROKER_URL in a .ini file, but it isn't parsed correctly. When I try to do

[celery]
BROKER_URL = ${REDIS_URL}

I get the following error

pyramid_celery fails silently when the ini file is not well configured

In the pyramid_celery/__init__.py file, inside the on_preload_parsed function, the pyramid app is bootstrapped using

env = bootstrap(ini_location, options=options)

or

env = bootstrap(ini_location)

When that call fails, the celery_app starts with the default configuration, without having been customized.

For example running the long_running_task_with_tm example results by default in the following error.

[2017-09-28 17:36:35,569: WARNING/MainProcess] consumer: Cannot connect to %s: %s.
%s
[2017-09-28 17:36:35,569: WARNING/MainProcess] amqp://guest:**@127.0.0.1:5672//
[2017-09-28 17:36:35,569: WARNING/MainProcess] [Errno 111] Connection refused
[2017-09-28 17:36:35,570: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
Trying again in 4.00 seconds...

Adding a traceback print shows:

InterpolationMissingOptionError: Error in file /home/mysuser/pool/pyramid_celery/examples/long_running_with_tm/development.ini: Bad value substitution:
	section: [app:main]
	option : sqlalchemy.url
	key    : database
	rawval : postgresql://localhost/%(database)s

Cron job timing

I have some cron tasks; the ones that run every 2 hours etc. work fine, but the ones where I specify an exact time to run, or a day of the week, simply never execute. Is there anything I am doing wrong?

# Check every 2 hours for orders with nil_value
# THIS WORKS FINE
[celerybeat:task3]
task = intranet.celery_tasks.nilvalue_celery
type = crontab
schedule = {"minute": 0, "hour": "*/2"}

#THESE DO NOT WORK
[celerybeat:task4]
task = intranet.celery_tasks.emailfigures_celery
type = crontab
schedule = {"hour": "16", "minute": "30" }
#schedule = {}

[celerybeat:task5]
task = intranet.celery_tasks.reps_pastweek_celery
type = crontab
schedule = {"day_of_week": 5, "hour": "16", "minute": "30"}
# schedule = {"day_of_week":"Friday", "hour": "16", "minute": "30"}
# schedule = {"day_of_week":"Fri", "hour": "16", "minute": "30"}

Planning a new release

The current release on PyPI is not compatible with current versions of Celery, but the code in Git works. Would it be possible to tag a new version and push it to PyPI?

transaction support

it would be great if this supported joining the pyramid_tm transaction

for example, if I call an async task, it could defer sending the request to the broker until the transaction commits.

TypeError: flower() got an unexpected keyword argument 'ini_var'

With pyramid-celery==4.0.0, celery==5.2.3 and flower==1.0.0 I'm getting the error below after invoking flower via

celery -A pyramid_celery.celery_app flower

It seems like pyramid-celery is trying to pass an ini_var parameter which the newer version of flower no longer accepts.

Stack trace is

flower_1      | Traceback (most recent call last):
flower_1      |   File "/usr/local/bin/celery", line 8, in <module>
flower_1      |     sys.exit(main())
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/celery/__main__.py", line 15, in main
flower_1      |     sys.exit(_main())
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/celery/bin/celery.py", line 213, in main
flower_1      |     return celery(auto_envvar_prefix="CELERY")
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1128, in __call__
flower_1      |     return self.main(*args, **kwargs)
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1053, in main
flower_1      |     rv = self.invoke(ctx)
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1659, in invoke
flower_1      |     return _process_result(sub_ctx.command.invoke(sub_ctx))
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1395, in invoke
flower_1      |     return ctx.invoke(self.callback, **ctx.params)
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 754, in invoke
flower_1      |     return __callback(*args, **kwargs)
flower_1      |   File "/usr/local/lib/python3.8/dist-packages/click/decorators.py", line 26, in new_func
flower_1      |     return f(get_current_context(), *args, **kwargs)
flower_1      | TypeError: flower() got an unexpected keyword argument 'ini_var'

Duplicated Tasks

Hi John,
Thanks for your hard work on this project, it makes things lot easier.
I have realized that all my tasks are running twice; I was wondering if you have faced an issue like this in the past:

2015-05-28 08:43:04,586 INFO  [celery.beat][MainThread] beat: Starting...
2015-05-28 08:43:04,586 INFO  [celery.beat][MainThread] beat: Starting...
2015-05-28 08:43:04,600 INFO  [celery.worker.consumer][MainThread] Connected to redis://127.0.0.1:6379/0
2015-05-28 08:43:04,600 INFO  [celery.worker.consumer][MainThread] Connected to redis://127.0.0.1:6379/0
2015-05-28 08:43:04,614 INFO  [celery.worker.consumer][MainThread] mingle: searching for neighbors
2015-05-28 08:43:04,614 INFO  [celery.worker.consumer][MainThread] mingle: searching for neighbors
2015-05-28 08:43:04,639 INFO  [celery.beat][MainThread] Scheduler: Sending due task task1 (app.task.user.add)
2015-05-28 08:43:04,639 INFO  [celery.beat][MainThread] Scheduler: Sending due task task1 (app.task.user.add)
2015-05-28 08:43:05,623 INFO  [celery.worker.consumer][MainThread] mingle: all alone
2015-05-28 08:43:05,623 INFO  [celery.worker.consumer][MainThread] mingle: all alone
2015-05-28 08:43:06,145 INFO  [celery.worker.strategy][MainThread] Received task: app.task.user.add[85240b95-a9f4-4a40-b215-36c5379c871a]
2015-05-28 08:43:06,145 INFO  [celery.worker.strategy][MainThread] Received task: app.task.user.add[85240b95-a9f4-4a40-b215-36c537

This is the configuration I'm using:

# development.ini
[celery]
BROKER_URL = redis://127.0.0.1:6379/0
CELERY_IMPORTS = app.task.user

[celerybeat:task1]
task = app.task.user.add
type = timedelta
schedule = {"seconds": 15}

Task file:

#app.task.user
from pyramid_celery import celery_app as app
import logging

@app.task
def add(*args, **kwargs):
    log = logging.getLogger('celery')
    log.error(app.control.inspect())
    return None

I have tried to fix it following the celery documentation, but I don't really know how to add BROKER_TRANSPORT_OPTIONS to the development.ini file. How do you parse the JSON options into the .ini file? Thanks John.

How to run periodic tasks?

pyramid_celery doesn't seem to import celery.task.periodic_task or PeriodicTask, so I can't figure out how to run periodic tasks.

Any help will be much appreciated.

Jerry

ini-var interface question

The pyramid_celery documentation currently says:

The reason it is a csv instead of using --ini-var multiple times is because of a bug in celery itself. When they fix the bug we will re-work the API.

The relevant bug is celery/celery#2435, the fix for which was merged over two years ago. Are you still planning on updating the --ini-var interface, or should that remark be removed from the documentation?

setting worker config results in worker error

Setting integer properties results in the worker failing to start. We could default to use_celeryconfig, but would prefer to keep the config in the ini file.

[celery]
...
worker_max_tasks_per_child=1024

On worker start:

"levelname": "CRITICAL", "name": "celery.worker", "message": "Unrecoverable error: AssertionError()", "exc_info": "Traceback (most recent call last):
  File \"/opt/viqi/lib/python3.10/site-packages/celery/worker/worker.py\", line 202, in start
    self.blueprint.start(self)
  File \"/opt/viqi/lib/python3.10/site-packages/celery/bootsteps.py\", line 116, in start
    step.start(parent)
  File \"/opt/viqi/lib/python3.10/site-packages/celery/bootsteps.py\", line 365, in start
    return self.obj.start()
  File \"/opt/viqi/lib/python3.10/site-packages/celery/concurrency/base.py\", line 130, in start
    self.on_start()
  File \"/opt/viqi/lib/python3.10/site-packages/celery/concurrency/prefork.py\", line 109, in on_start
    P = self._pool = Pool(processes=self.limit,
  File \"/opt/viqi/lib/python3.10/site-packages/celery/concurrency/asynpool.py\", line 464, in __init__
    super().__init__(processes, *args, **kwargs)
  File \"/opt/viqi/lib/python3.10/site-packages/billiard/pool.py\", line 1045, in __init__
    self._create_worker_process(i)
  File \"/opt/viqi/lib/python3.10/site-packages/celery/concurrency/asynpool.py\", line 482, in _create_worker_process
    return super()._create_worker_process(i)
  File \"/opt/viqi/lib/python3.10/site-packages/billiard/pool.py\", line 1142, in _create_worker_process
    w = self.WorkerProcess(self.Worker(
  File \"/opt/viqi/lib/python3.10/site-packages/billiard/pool.py\", line 241, in __init__
    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
AssertionError",

It appears all fields are loaded as str from the config section.

does this module support python3.8.5 on windows64

$ ..\Scripts\celery -A pyramid_celery.celery_app worker --ini development.ini

 -------------- celery@PC v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Windows-7-6.1.7601-SP1 2021-04-25 09:05:13
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         main:0x40b0910
- ** ---------- .> transport:   redis://:**@localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[2021-04-25 09:05:13,577: CRITICAL/MainProcess] Unrecoverable error: AttributeError("Can't pickle local object '_compile_route.<locals>.matcher'")
Traceback (most recent call last):
  File "d:\pyramid_celery_test\lib\site-packages\celery\worker\worker.py", line 203, in start
    self.blueprint.start(self)
  File "d:\pyramid_celery_test\lib\site-packages\celery\bootsteps.py", line 116, in start
    step.start(parent)
  File "d:\pyramid_celery_test\lib\site-packages\celery\bootsteps.py", line 365, in start
    return self.obj.start()
  File "d:\pyramid_celery_test\lib\site-packages\celery\concurrency\base.py", line 129, in start
    self.on_start()
  File "d:\pyramid_celery_test\lib\site-packages\celery\concurrency\prefork.py", line 107, in on_start
    P = self._pool = Pool(processes=self.limit,
  File "d:\pyramid_celery_test\lib\site-packages\billiard\pool.py", line 1046, in __init__
    self._create_worker_process(i)
  File "d:\pyramid_celery_test\lib\site-packages\billiard\pool.py", line 1158, in _create_worker_process
    w.start()
  File "d:\pyramid_celery_test\lib\site-packages\billiard\process.py", line 124, in start
    self._popen = self._Popen(self)
  File "d:\pyramid_celery_test\lib\site-packages\billiard\context.py", line 383, in _Popen
    return Popen(process_obj)
  File "d:\pyramid_celery_test\lib\site-packages\billiard\popen_spawn_win32.py", line 83, in __init__
    reduction.dump(process_obj, to_child)
  File "d:\pyramid_celery_test\lib\site-packages\billiard\reduction.py", line 99, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object '_compile_route.<locals>.matcher'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "d:\pyramid_celery_test\lib\site-packages\billiard\spawn.py", line 165, in spawn_main
    exitcode = _main(fd)
  File "d:\pyramid_celery_test\lib\site-packages\billiard\spawn.py", line 207, in _main
    self = pickle.load(from_parent)
EOFError: Ran out of input

Project does not present a license

This code is under copyright with all rights reserved by the author, which technically doesn't make it available for inclusion in other projects without permission.

No app config

Hi there. As documented, I do next:
ini:

[celery]
BROKER_URL = redis://0.0.0.0:6379/0
CELERY_IMPORTS = my.module.tasks

[app:main]
egg = another.module
pyramid.includes =
    ...
    pyramid_celery
...

another.module:

<...>
def main(global_config, **settings):
    <...>
    config.configure_celery(global_config['__file__'])

my.module.tasks

from pyramid_celery import celery_app as app

print app.conf['PYRAMID_REGISTRY']

@app.task()
def my_task():
    <...>

Then I run the worker, or run the application with pserve on the .ini file, and the application fails with
KeyError: 'PYRAMID_REGISTRY'.

Question is: what am I doing wrong? How do I access pyramid config in my tasks?

Unable to run purge with Celery release candidate 5.3.0rc1

Lots of changes have been happening in Celery since the last release of pyramid-celery. Recently, building against Celery==5.3.0rc1
and pyramid-celery v4.0.0, I ran into the following problems with purge.

Without --ini arg specified I get the error

celery -A pyramid_celery.celery_app purge
You must provide the paste --ini argument

and when running with --ini specified

celery -A pyramid_celery.celery_app purge --ini ini/staging.ini
Traceback (most recent call last):
  File "/usr/local/bin/celery", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/celery/__main__.py", line 15, in main
    sys.exit(_main())
  File "/usr/local/lib/python3.8/dist-packages/celery/bin/celery.py", line 235, in main
    return celery(auto_envvar_prefix="CELERY")
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/dist-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/click/decorators.py", line 26, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/celery/bin/base.py", line 134, in caller
    return f(ctx, *args, **kwargs)
TypeError: purge() got an unexpected keyword argument 'ini'

It seems that pyramid-celery requires --ini, but it is no longer supported or required by the upstream Celery method.

AttributeError: 'NoneType' object has no attribute 'config_from_object'

Running the demo (redis etc installed and running):

pserve development.ini

Traceback (most recent call last):
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/bin/pserve", line 8, in <module>
    load_entry_point('pyramid==1.4a2', 'console_scripts', 'pserve')()
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/pyramid-1.4a2-py2.7.egg/pyramid/scripts/pserve.py", line 47, in main
    return command.run()
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/pyramid-1.4a2-py2.7.egg/pyramid/scripts/pserve.py", line 290, in run
    relative_to=base, global_conf=vars)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/pyramid-1.4a2-py2.7.egg/pyramid/scripts/pserve.py", line 318, in loadapp
    return loadapp(app_spec, name=name, relative_to=relative_to, **kw)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/PasteDeploy-1.5.0-py2.7.egg/paste/deploy/loadwsgi.py", line 247, in loadapp
    return loadobj(APP, uri, name=name, **kw)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/PasteDeploy-1.5.0-py2.7.egg/paste/deploy/loadwsgi.py", line 272, in loadobj
    return context.create()
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/PasteDeploy-1.5.0-py2.7.egg/paste/deploy/loadwsgi.py", line 710, in create
    return self.object_type.invoke(self)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/PasteDeploy-1.5.0-py2.7.egg/paste/deploy/loadwsgi.py", line 146, in invoke
    return fix_call(context.object, context.global_conf, **context.local_conf)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/PasteDeploy-1.5.0-py2.7.egg/paste/deploy/util.py", line 56, in fix_call
    val = callable(*args, **kw)
  File "/home/danjac/Projects/pyramid_celery/pyramid_celery_demo/pyramid_celery_demo/__init__.py", line 11, in main
    config = Configurator(settings=settings)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/pyramid-1.4a2-py2.7.egg/pyramid/config/__init__.py", line 307, in __init__
    exceptionresponse_view=exceptionresponse_view,
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/pyramid-1.4a2-py2.7.egg/pyramid/config/__init__.py", line 432, in setup_registry
    self.include(inc)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/pyramid-1.4a2-py2.7.egg/pyramid/config/__init__.py", line 772, in include
    c(configurator)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/pyramid_celery/__init__.py", line 57, in includeme
    default_app.config_from_object(config.registry.settings)
  File "/home/danjac/.virtualenvs/pyramid_celery_demo/local/lib/python2.7/site-packages/celery/local.py", line 169, in __getattr__
    return getattr(self._get_current_object(), name)
AttributeError: 'NoneType' object has no attribute 'config_from_object'

options for celerybeat?

Hi. I was able to launch periodic tasks using something like this:

[celery]
BROKER_URL = redis://localhost:6379/0
CELERY_RESULT_BACKEND = redis://
CELERY_INCLUDE = miniscada.taskos
CELERY_REDIRECT_STDOUTS_LEVEL = info
CELERYD_CONCURRENCY = 3

[celerybeat:task1]
task = miniscada.taskos.lee_medidor
type = timedelta
schedule = {"seconds": 15}

It works fine. I tried to use options by appending a line:
[celerybeat:task1]
task = miniscada.taskos.lee_medidor
type = timedelta
schedule = {"seconds": 15}
options = {"task_id" : "4445",}

But it doesn't work. Is there a way we can pass options to the beat task?

Delete operation in example fails

In the long_running_with_tm example, clicking delete fails as follows:

[2013-09-20 17:38:26,775: ERROR/MainProcess] Task long_running_with_tm.tasks.DeleteTask[cc9e58e8-f4c3-4efb-9cb9-14e6670b6345] raised exception: Task of kind 'long_running_with_tm.tasks.DeleteTask' is not registered, please make sure it's imported.
Traceback (most recent call last):
  File "D:\experiments\env\lib\site-packages\billiard-2.7.3.32-py2.7.egg\billiard\pool.py", line 293, in worker
    result = (True, func(*args, **kwds))
  File "D:\experiments\env\lib\site-packages\celery-3.0.23-py2.7.egg\celery\task\trace.py", line 325, in _fast_trace_task
    return _tasks[task].__trace__(uuid, args, kwargs, request)[0]
  File "D:\experiments\env\lib\site-packages\celery-3.0.23-py2.7.egg\celery\app\registry.py", line 20, in __missing__
    raise self.NotRegistered(key)
NotRegistered: 'long_running_with_tm.tasks.DeleteTask'

This is on Windows 7 with pyramid_celery 1.3 on Python 2.7, using virtualenv.

Passing variables to .ini config file

My config file makes use of variables like so:

sqlalchemy.url = postgresql+psycopg2://%(db_user)s:%(db_pass)[email protected]/%(db_name)s

And I pass them along like so:

pserve production.ini db_user=user db_pass=pass db_name=name

How can I pass these variables along using pceleryd?

Getting a myapp request

How can I get a request object, like the one in pshell development.ini, so I can use request.route_url when calling render in a task? (from pyramid.renderers import render)

Right now I get the following error by just using from pyramid.request import Request for the render:

  File "build/bdist.macosx-10.8-intel/egg/webob/request.py", line 443, in host_url
    url = scheme + '://'
TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'

I've tried and googled a lot but still have no idea of the best way to fix it. Any suggestions?

Error with celery/ mongo

Hi,
I'm facing an error I did not have some time ago.
The final error is: "InvalidName: database names cannot contain the character '/'"

My celeryd process starts correctly:

- ** ---------- [config]
- ** ---------- .> broker:      mongodb://localhost%2Fmobyle2_tests/mobyle2_tests

The error occurs when I call the delay method on my task

download.delay(url, options)

File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/celery/app/task.py", line 358, in delay
return self.apply_async(args, kwargs)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/celery/app/task.py", line 474, in apply_async
*_options)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/celery/app/amqp.py", line 249, in publish_task
*_kwargs
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/messaging.py", line 164, in publish
routing_key, mandatory, immediate, exchange, declare)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/connection.py", line 453, in _ensured
return fun(_args, *_kwargs)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/messaging.py", line 176, in _publish
[maybe_declare(entity) for entity in declare]
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/messaging.py", line 107, in maybe_declare
return maybe_declare(entity, self.channel, retry, **retry_policy)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/common.py", line 90, in maybe_declare
return _maybe_declare(entity)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/common.py", line 102, in _maybe_declare
entity.declare()
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/entity.py", line 494, in declare
self.queue_declare(nowait, passive=False)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/entity.py", line 520, in queue_declare
nowait=nowait)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/transport/virtual/init.py", line 404, in queue_declare
return queue, self._size(queue), 0
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/transport/mongodb.py", line 78, in _size
return self.client.messages.find({'queue': queue}).count()
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/transport/mongodb.py", line 193, in client
self._client = self._open()
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/kombu/transport/mongodb.py", line 125, in _open
database = getattr(mongoconn, dbname)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/pymongo/mongo_client.py", line 1143, in getattr
return database.Database(self, name)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/pymongo/database.py", line 74, in init
_check_name(name)
File "/home/osallou/Development/NOSAVE/mobyle2/feature_newobjectmanager/local/lib/python2.7/site-packages/pymongo/database.py", line 39, in _check_name
"character %r" % invalid_char)
InvalidName: database names cannot contain the character '/'

pcelerybeat yields a "BaseBeatCommand" is not defined error

I got an error when I executed pcelerybeat as follows:

$ pcelerybeat development.ini 
Traceback (most recent call last):
  File "env/bin/pcelerybeat", line 9, in <module>
    load_entry_point('pyramid-celery==1.3', 'console_scripts', 'pcelerybeat')()
  File "env/lib/python2.7/site-packages/pkg_resources.py", line 339, in load_entry_point
    return get_distribution(dist).load_entry_point(group, name)
  File "env/lib/python2.7/site-packages/pkg_resources.py", line 2457, in load_entry_point
    return ep.load()
  File "env/lib/python2.7/site-packages/pkg_resources.py", line 2171, in load
    ['__name__'])
  File "env/lib/python2.7/site-packages/pyramid_celery/commands/celerybeat.py", line 12, in <module>
    class BeatCommand(CommandMixin, BaseBeatCommand):
NameError: name 'BaseBeatCommand' is not defined

Not installable from pypi

The 0.1 release on pypi is not installable: there is no downloadable source available. You can upload that using this command:

python setup.py sdist upload

Can't define a result backend

Maybe this is just a PEBKAC, but I have found that pyramid_celery doesn't allow defining a result backend.

I'm not sure what makes this happen, as all other configuration options are handled correctly except the backend ones. I use exactly the same options as in celeryconfig.py (minus quotes) and they just don't reach celery. If I call get() on a task, I get an error that no backend is defined.

I had to uninstall pyramid_celery completely and remove all references from the code to make it work again.

Am I doing something wrong? It seems odd that I'm the first with that problem…unfortunately I can't make much sense out of the Pyramid-specific code so I can't even guess what's going on.

celerybeat configuration impossible using .ini

If pyramid_celery is included, celeryconfig.py is ignored. That’s bad, because in order to configure celerybeat one has to import names like timedelta or crontab, otherwise the eval() during parsing fails.

I haven’t found a way to achieve that so I claim that pcelerybeat is mostly useless – or did I miss something?

I don’t really have an off-hand solution to this problem – maybe add a configuration option with dependencies that are needed in the eval() context?

broker_transport_options seems to not work with celery 4.1

When I set BROKER_TRANSPORT_OPTIONS = {"confirm_publish": True} in my ini file, celery fails to load.

(pyramid_celery passes the dict to celery as a string, and celery seems to choke on that)

I added the following to pyramid_celery/loaders.py:

import ast

dict_settings = ['BROKER_TRANSPORT_OPTIONS']

for setting in dict_settings:
    if setting in config_dict:
        config_dict[setting] = ast.literal_eval(config_dict[setting])

This seems to fix the issue (celery starts perfectly). No idea if this is the proper way, but it works.

line 14 in tasks.py in the demo app makes assumptions about the db which may not be valid for an async task

With two instances of pceleryd running, wild clicking on "delete" links in the web interface made it possible to schedule multiple delete tasks for the same TaskItem, which resulted in an exception in the task (an IndexError when trying to get the first item of an empty query) and a BrokenPipe error in the web app server. While this is a minor bug, I think it would be a good thing to promote correct handling of asynchronous code in this demo app.

pyramid_celery_demo doesn't exist in repo

The "Demo" section of the documentation is confusing. It says "check out pyramid_celery_demo" which makes me assume it is either its own repo or maybe a branch, but that doesn't appear to be the case. It isn't in the examples directory either. How do I get this demo working? Maybe it was part of an older version?

pceleryctl has an import error

with celery==3.0.19

vagrant@orion:~$ pceleryctl 
Traceback (most recent call last):
  File "/usr/local/bin/pceleryctl", line 9, in <module>
    load_entry_point('pyramid-celery==1.3', 'console_scripts', 'pceleryctl')()
  File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 337, in load_entry_point
    return get_distribution(dist).load_entry_point(group, name)
  File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 2279, in load_entry_point
    return ep.load()
  File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 1989, in load
    entry = __import__(self.module_name, globals(),globals(), ['__name__'])
  File "/usr/local/lib/python2.7/dist-packages/pyramid_celery/commands/celeryctl.py", line 3, in <module>
    from celery.bin.celeryctl import help as BaseHelp
ImportError: cannot import name help

Admins list isn't read correctly

I've been working on hooking up Celery with Pyramid and ran into what I think is a limitation of the ini-based configuration.

To get error-mails sent when a task fails, I need to set the ADMINS list. However, adding an admins variable in the [celery] config section makes this a string rather than the list of 2-tuples that Celery wants. Adding it to the list_settings list also doesn't work, as that makes a flat list, which is not accepted.

Extending the read_configuration() method of the INILoader class with the following (after processing list_settings) does the trick, though I'm not sure it's the best possible solution.

# Following after list_settings parsing

tuple_list_settings = ['ADMINS']

for setting in tuple_list_settings:
    if setting in config_dict:
        items = config_dict[setting].split()
        tuple_settings = [tuple(item.split(',')) for item in items]
        config_dict[setting] = tuple_settings

The corresponding ini file would look like this:

[celery]
admins =
    development,[email protected]
    excecptions,[email protected]

Missing environment after dying worker

From time to time (unfortunately, several times a day), I get a WorkerLostError('Worker exited prematurely.',).

After that, my code that accesses pyramid's settings starts throwing TypeError("'NoneType' object is not subscriptable",).

I presume that when a replacement worker is started up, it doesn't have access to the pyramid environment?

Example is confusing

In reading the example, the term "task" is very overloaded. It refers to a model object, a property of that object, as well as celery's task concept. For example:

@task
def add_task(task):
    time.sleep(random.choice([2,4,6,8,10]))
    print 'creating task %s' % task
    task = TaskItem(task=task)
    DBSession.add(task)
    transaction.commit()

I would suggest adopting some other arbitrary model for the purposes of the example. It would make it easier for a newb such as myself to understand. :)
