Giter Club home page Giter Club logo

rocketry's Introduction

The engine to power your Python apps

Test Test coverage Package version Supported Python versions


What is it?

Rocketry is a modern statement-based scheduling framework for Python. It is simple, clean and extensive. It is suitable for small and big projects.

This is how it looks like:

from rocketry import Rocketry
from rocketry.conds import daily

app = Rocketry()

@app.task(daily)
def do_daily():
    ...

if __name__ == '__main__':
    app.run()

Core functionalities:

  • Powerful scheduling
  • Concurrency (async, threading, multiprocess)
  • Parametrization
  • Task pipelining
  • Modifiable session also in runtime
  • Async support

Links:

Why Rocketry?

Unlike the alternatives, Rocketry's scheduler is statement-based. Rocketry natively supports the same scheduling strategies as the other options, including cron and task pipelining, but it can also be arbitrarily extended using custom scheduling statements.

Here is an example of custom conditions:

from rocketry.conds import daily, time_of_week
from pathlib import Path

@app.cond()
def file_exists(file):
    return Path(file).exists()

@app.task(daily.after("08:00") & file_exists("myfile.csv"))
def do_work():
    ...

Rocketry is suitable for quick automation projects and for larger scale applications. It does not make assumptions of your project structure.

Installation

Install Rocketry from PyPI:

pip install rocketry

More Examples

Here are some more examples of what it can do.

Scheduling:

from rocketry.conds import every
from rocketry.conds import hourly, daily, weekly, 
from rocketry.conds import time_of_day
from rocketry.conds import cron

@app.task(every("10 seconds"))
def do_continuously():
    ...

@app.task(daily.after("07:00"))
def do_daily_after_seven():
    ...

@app.task(hourly & time_of_day.between("22:00", "06:00"))
def do_hourly_at_night():
    ...

@app.task((weekly.on("Mon") | weekly.on("Sat")) & time_of_day.after("10:00"))
def do_twice_a_week_after_ten():
    ...

@app.task(cron("* 2 * * *"))
def do_based_on_cron():
    ...

Pipelining tasks:

from rocketry.conds import daily, after_success
from rocketry.args import Return

@app.task(daily.after("07:00"))
def do_first():
    ...
    return 'Hello World'

@app.task(after_success(do_first))
def do_second(arg=Return('do_first')):
    # arg contains the value of the task do_first's return
    ...
    return 'Hello Python'

Parallelizing tasks:

from rocketry.conds import daily

@app.task(daily, execution="main")
def do_unparallel():
    ...

@app.task(daily, execution="async")
async def do_async():
    ...

@app.task(daily, execution="thread")
def do_on_separate_thread():
    ...

@app.task(daily, execution="process")
def do_on_separate_process():
    ...

Interested?

Read more from the documentation.

About Library

rocketry's People

Contributors

addisonhardy avatar ayr-ton avatar carlosm27 avatar djnnvx avatar egisxxegis avatar marksmayo avatar miksus avatar naxrevlis avatar nsht avatar rohansh-tty avatar rohanshetty-dev avatar rpadaki avatar tekumara avatar theblackmallard avatar tobi-de avatar zakkg3 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rocketry's Issues

ENH: Better argument support for custom condition

Describe the solution you'd like
At the moment this is not possible:

from redengine.args import Task

@app.cond("is things")
def is_things(task=Task()):
    return True or False

If this was a starting condition of a task, that task should be put as the argument.

Additional context
Maybe the task injection to the conditions could be a bit reworked to work uniformly with user-defined conditions.
Maybe we could have a check method in the conditions for such purposes instead of relying to __bool__.

ENH: Pass parameters for manual run

Is your feature request related to a problem? Please describe.
When running a task manually (using force_run), sometimes it's useful to pass parameters for this particular manual run.

Describe the solution you'd like
Possibly a core change. An attribute manual_parameters or so that are used only once and only when the task is run with force_run.

Describe alternatives you've considered
Cannot think of such.

BUG Using CSVFileRepo raise NotImplementedError

Install

pip install rocketry==2.4.1

Code

import datetime

from rocketry import Rocketry
from redbird.repos import CSVFileRepo


app = Rocketry(logger_repo=CSVFileRepo(filename='logs.csv'))


@app.task('secondly')
def do_things():
    print(datetime.datetime.now())


if __name__ == '__main__':
    app.run()

It raise NotImplementedError.

Traceback (most recent call last):
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\templates.py", line 68, in last
    return self.repo.query_read_last(self.query_)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\templates.py", line 309, in query_read_last
    raise NotImplementedError("Read using first not implemented.")
NotImplementedError: Read using first not implemented.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "D:/mycode/Rocketry_examples/04 日志.py", line 11, in <module>
    def do_things():
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\rocketry\tasks\func.py", line 193, in __call__
    super().__init__(func=func, **self._delayed_kwargs)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\rocketry\core\task.py", line 275, in __init__
    self.set_cached()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\rocketry\core\task.py", line 825, in set_cached
    self.last_run = self._get_last_action("run", from_logs=True, logger=logger)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\rocketry\core\task.py", line 1064, in _get_last_action
    value = self._get_last_action_from_log(action, logger)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\rocketry\core\task.py", line 1074, in _get_last_action_from_log
    record = logger.get_latest(action=action)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\rocketry\core\log\adapter.py", line 91, in get_latest
    return self.filter_by(**kwargs).last()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\templates.py", line 70, in last
    return super().last()
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\base.py", line 57, in last
    for item in self.query():
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\templates.py", line 23, in query
    yield from items
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\repos\csv.py", line 82, in query_items
    yield from read_items(self, self.read_file(), query)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\utils\query.py", line 39, in read_items
    for data in reader:
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\repos\csv.py", line 114, in read_file
    reader = self.get_reader(file)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\repos\csv.py", line 143, in get_reader
    return csv.DictReader(buff, fieldnames=self.get_headers(), **self.kwds_csv)
  File "C:\Users\Administrator\AppData\Local\Programs\Python\Python38\lib\site-packages\redbird\repos\csv.py", line 105, in get_headers
    raise TypeError("Cannot determine CSV headers")
TypeError: Cannot determine CSV headers

ENH: Allow running same task parallel

Is your feature request related to a problem? Please describe.
Currently, a task can run only once at a time. It does not support launching the same task simultaneously.

Describe the solution you'd like
Something like this:

@app.task(multilaunch=True)
def do_things(arg):
    print("Doing", arg)
    time.sleep(2)
    print("Did", arg)

If it was called like this:

task.run(arg="1")
task.run(arg="2")

This should print:

Doing 1
Doing 2
Did 1
Did 2

Currently, Rocketry checks whether the task is already running before running it. And it will block second run if it was running.

Describe alternatives you've considered
Create duplicate tasks with different names.

Additional context
By default this should be not permitted as the logs may become a mess.

ENH: Task groups

Is your feature request related to a problem? Please describe.
Allow creating groups of tasks so they can be modified at once, like:

  • Setting a prefix to the names of the tasks (ie, "scrape-")
  • Setting common start condition (ie. must run in certain time, like on week days)
  • Setting common parameters

Can also have view functionality for easier handling of the group. Also adds options for new conditions like GroupSucceeded (True if all tasks in the group has succeeded).

Describe the solution you'd like
Possibly new parameter to Task initiation (group) and a new class (maybe to extensions).

ENH Avoid mem build-up in default log storage

Description
The use of RedBird to store logs can easily lead to unexpected memleaks, and it is both a difficult problem to debug, and a little awkward to handle that on the side of the client.

Consider the following script:

import tracemalloc
from rocketry import Rocketry
app = Rocketry(execution="main")
tracemalloc.start()
last_count, _ = tracemalloc.get_traced_memory()

@app.task("every 1 second")
def do_things() -> None:
    global last_count
    new_count, _ = tracemalloc.get_traced_memory()
    print(new_count - last_count)
    last_count = new_count

if __name__ == "__main__":
    app.run()

Which results in the following input:

145402
14344
6636
6604
6668
6604
6604
6604
6677
7290
6604

So every run causes a memory build-up of around 6.5KB even if no logging is made, other than the default Rocketry logging. In 24h that would be over 500MB.

Using tracemalloc.take_snapshot() it is easy to check that this is caused by the RedBird in-memory log store.

This is solvable, but in an arguably awkward way:

import tracemalloc

from redbird import BaseRepo
from redbird.repos import MemoryRepo
from rocketry import Rocketry
from rocketry.log.log_record import LogRecord
logger_repo = MemoryRepo(model=LogRecord)
app = Rocketry(execution="main", logger_repo=logger_repo)
tracemalloc.start()
last_count, _ = tracemalloc.get_traced_memory()

@app.task("every 1 second")
def do_things() -> None:
    global last_count
    logger_repo.filter_by().delete()
    new_count, _ = tracemalloc.get_traced_memory()
    print(new_count - last_count)
    last_count = new_count

if __name__ == "__main__":
    app.run()

The reason why I say this is awkward is that it requires:

  • Knowing that a memory build-up is a potential problem in the basic use-case of Rocketry. I think most developers won't think about that.
  • Debugging after coming across the problem, potentially after it having undesired consequences on a real-world application.
  • Coming up with this (or some other) solution, which is not very trivial.

Potential solution
Ideally, Rocketry's default log repo has some mechanism to evict old records automatically, which would be configurable. So for example, if we simply did:

app = Rocketry()

We would get a logger repo that would keep, for example, the last 100 log messages only. If we wanted to keep all records, we'd just pass the basic MemoryRepo as in my example above. If we wanted to control the amount of records kept, we could do something like:

from redbird.repos import FIFOMemoryRepo
from rocketry.log.log_record import LogRecord

app = Rocketry(logger_repo=FIFOMemoryRepo(model=LogRecord, size=50))

The FIFOMemoryRepo class would have to be implemented in RedBird, of course.

BUG - second manual process run freezes the async app.serve()

Describe the bug
When Rocketry works in async fashion through app.serve(), it becomes frozen when specific manual-run task sequence is executed.
Task sequence:
run_me_madam starts on async
run_me_madam finishes
run_me_sir starts on process (not async)
app.serve() becomes frozen

To Reproduce
For reproducing the bug I will paste code at the bottom of issue and will pinpoint commit, after which the aforementioned bug arose.

Expected behavior
starting manually a task after one has finished, should not freeze the async Rocketry.serve(). In this case it is expected to see run_me_sir finish.

Screenshots
When trying to KeyboardInterrupt the frozen async Rocketry.serve(), such error always appears:
image

Desktop (please complete the following information):

  • OS: Windows 10 Home
  • Python version 3.10.4
  • On Rocketry v2.4.1 bug does not exist.
  • On Rocketry v2.5.0, v2.5.1 and latest codebase bug does exist
  • Specific commit, that introduces the bug: pip install git+https://github.com/Miksus/rocketry.git@c9480a9c847704a0a57b4c6770eeb1028bf029a0

Additional context
Here I paste the code.

from time import sleep
import asyncio

from rocketry import Rocketry

# #
# Rocketry app
app = Rocketry(
    config={
        "execution": "async",
        "max_process_count": 100,
    }
)

# #
# tasks
@app.task("every 10 seconds")
async def mark_10_seconds():
    print("10 seconds have passed")


@app.task(execution="async")
async def run_me_madam():
    print("Madam, I am going to sleep for 2 seconds")
    sleep(2)
    print("Madam, I am done with sleeping")


@app.task(execution="process")
async def run_me_sir():
    print(f"Sir, I am going to sleep for 2 seconds.")
    sleep(2)
    print("Sir, I am done with sleeping")


if __name__ == "__main__":

    async def task_manual_run_flow():
        await asyncio.sleep(3)
        print(
            "will run run_me_madam task - on async (or even if on process, result would be the same)"
        )
        app.session["run_me_madam"].run()
        await asyncio.sleep(10)
        print("madam task should be done by now")
        print("will run run_me_sir - on process")
        app.session["run_me_sir"].run()
        print("doing await asyncio.sleep(5). If it takes forever, you are stuck here.")
        await asyncio.sleep(5)
        print(
            "launched run me sir through session. If you do not see this message, it means that app.serve() never returned control via await."
        )

    async def main_main():
        return await asyncio.gather(app.serve(), task_manual_run_flow())

    asyncio.run(main_main())

ENH: Add async support

Hi there ! Thanks for your work !

Is your feature request related to a problem? Please describe.
It'll be great to be able to run tasks that use async code.

Describe the solution you'd like
A transparent asyncio support to be able to run async code.

ENH: Custom code after task failure/success

Is your feature request related to a problem? Please describe.
Allow running custom code after task failure/success.

Describe the solution you'd like
Possibly:

  • hooks hook_success, hook_failure etc.
  • Init params for the call backs, name suggestions
    • callback_fail, callback_success
    • fail_callback, success_callback
    • fail_cb, success_cb
    • cb_fail, cb_success
    • on_fail, on_success

Describe alternatives you've considered
Subclass and override process_success and process_failure.

BUG - setting `task_execution` to main does not seem to work.

Describe the bug
I want to have periodic tasks running in my main thread, but it does not seem to work. if initialized with nothing, rocketry will:

  1. attempt to run the tasks as async
  2. fail on the tasks because they cannot be async

this is ok because the log messages are helpful in suggesting that I should initialize the Rocketry app with `Rocketry(task_execution='async') or something.

Problem is that when I initialize with (task_execution='main'), the startup fails. It seems that the log message is not up to date because docs suggest using ```python
config={
'task_execution': 'main'
}


but this has apparently no affect on the configuration (see logs below)

**To Reproduce**
Here is the code I've used to initialize the tasks:
```python
from api.core.management.jobs import something_else
from api.core.management.jobs import something

from django.core.management.base import BaseCommand
import logging

from rocketry import Rocketry
from rocketry.conds import daily

from rocketry import Rocketry, Session
from rocketry.conds import daily, every
from rocketry.log import MinimalRecord
from redbird.repos import CSVFileRepo

class JobLogger(MinimalRecord):
    exc_text: Optional[str] = Field(description="Exception text")


repo = CSVFileRepo(filename="/tmp/api-logs/jobs.csv", model=JobLogger)

scheduler : Rocketry = Rocketry(
        logger_repo=repo,
        config={
            'task_execution': 'main'
        }
    )

# i've attempted to set scheduler.session.config.task_execution to `main` here, but to no avail

@scheduler.task(daily.at("00:10"), execution='main')
def do_something() -> None:
    logging.getLogger(__name__).info("Running do_something()")
    something.run()


@scheduler.task(daily.at("23:55"), execution='main')
def do_something_else() -> None:
    logging.getLogger(__name__).info("Running do_something_else()")
    something_else.run()


class Command(BaseCommand):
    help = "Setup the periodic jobs runner"

    def handle(self, *args, **options):
          scheduler.run()

Expected behavior
Run the tasks in my main thread and not have the app crash on startup.

Screenshots

/usr/local/lib/python3.10/site-packages/rocketry/session.py:73: FutureWarning: Default execution will be changed to 'async'. To suppress this warning, specify task_execution, ie. Rocketry(task_execution='async') 

and then later on, when the job starts:

Traceback (most recent call last):
  File ""/usr/local/lib/python3.10/site-packages/rocketry/core/task.py"", line 536, in _run_as_async
    output = await self.execute(**params)
  File ""/usr/local/lib/python3.10/site-packages/rocketry/tasks/func.py"", line 234, in execute
    output = func(**params)
  File ""/server/api/core/management/commands/initjobs.py"", line 40, in do_something
    do_something.run()
  File ""/server/api/core/management/jobs/do_something.py"", line 46, in run
    for value in fetched_from_database:
  File ""/usr/local/lib/python3.10/site-packages/django/db/models/query.py"", line 394, in __iter__
    self._fetch_all()
  File ""/usr/local/lib/python3.10/site-packages/django/db/models/query.py"", line 1866, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
  File ""/usr/local/lib/python3.10/site-packages/django/db/models/query.py"", line 87, in __iter__
    results = compiler.execute_sql(
  File ""/usr/local/lib/python3.10/site-packages/django/db/models/sql/compiler.py"", line 1393, in execute_sql
    cursor = self.connection.cursor()
  File ""/usr/local/lib/python3.10/site-packages/django/utils/asyncio.py"", line 24, in inner
    raise SynchronousOnlyOperation(message)
django.core.exceptions.SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async."

Desktop (please complete the following information):
Python version: 3.10.7
Rocketry version: 2.4.1
Django version: 4.1

Additional context
N/A

ENH: New condition, TaskMisfire

Is your feature request related to a problem? Please describe.
Often it is useful to report tasks that did not run during their intended period due to dependencies did not run or so.

Describe the solution you'd like
A new condition TaskMisfire which can be used as:

@FuncTask(start_cond="daily between 08:00 and 17:00 & after task 'another'")
def run_something():
    ...

@FuncTask(start_cond="after task 'run_something' misfired")
def report_misfire():
    ... # Runs if run_something did not run during the time specified in TaskExecution cond

Describe alternatives you've considered
Alternatively one could just create tasks that do something only if the intended task
did not run on its period.

BUG multilaunch process spams Terminate

Describe the bug
Log gets spammed with status:terminate messages. This bug exists for tasks, that are running on process and using multilaunch=True.

To Reproduce

  1. Copy paste code (I will add it to the bottom of issue)
  2. Use newest version of Rocketry (multilaunch=True is needed)
  3. Run code: python my_file_name.py
  4. Wait 30 seconds
  5. Check logs file

Expected behavior

  1. Terminate should not be called if task has done job with status:success
  2. If terminate gets called, I think it should terminate task only once (not every 0.1s)

Screenshots
Uploading screenshot of logs
image

Desktop (please complete the following information):

  • OS: Windows 10
  • Python version 3.10.4

Additional context
Here I paste the code


from datetime import timedelta
from time import sleep

from rocketry import Rocketry
from rocketry.log import TaskLogRecord
from redbird.repos import CSVFileRepo

# #
# Rocketry app
repo = CSVFileRepo(filename="check_me_after_30_seconds.csv", model=TaskLogRecord)
app = Rocketry(
    config={
        "task_execution": "async",
        "max_process_count": 4,
        "timeout": timedelta(seconds=30),
    },
    logger_repo=repo,
)


# #
# tasks
@app.task("every 10 seconds")
async def mark_10_seconds():
    print("10 seconds have passed")


@app.task(execution="process")
async def run_me_sir():
    print("Sir, I am going to sleep for 20 seconds")
    sleep(20)
    print("Sir, I am done with sleeping")


@app.task(execution="process", multilaunch=True)
async def run_multilaunch_me_madam():
    print("Madam, I as multilaunch am going to sleep for 20 seconds")
    sleep(20)
    print("Madam, I as multilaunch am done with sleeping")


if __name__ == "__main__":
    app.session["run_me_sir"].run()
    app.session["run_multilaunch_me_madam"].run()
    app.run()


ENH: Auto reload

Is your feature request related to a problem? Please describe.
Update automatically the scheduler according to file changes.

Describe the solution you'd like
At least the tasks should be auto reloaded: changes in the tasks should be reflected automatically
and removed tasks should be automatically removed in run time.

Possibly solved by good and more structured support for tasks read from YAML files. Could work like this:

  1. User specifies YAML files that contain the tasks
  2. User creates Python scripts where the task code lies
  3. User uses TaskLoader to load the YAML files
    • If a YAML file is missing or (some of) the tasks are missing, delete those tasks from the session
    • Only process execution is allowed so that the task functions never get loaded to memory

Also, this is possibly supported but requires documentation and possibly more testing.

Describe alternatives you've considered
Alternatively a robust restart mechanism so the scheduler is easy to restart when needed. This is probably already supported.

Or the user can make a Reload task that reloads all the relevant Python modules and deletes tasks that are no longer specified in these.

It has an app.run() but no app.stop()

Is your feature request related to a problem? Please describe.

Sometimes, you would want the scheduler to stop running once a certain condition occurs. For instance, I want to check periodically a certain status returned by some REST API. If that status has met my condition, I would want to stop the scheduler.

Describe the solution you'd like

An app.stop() which is the logical opposite of app.run()

Describe alternatives you've considered

I could just exit() the code once the condition is met but that won't work for all cases.

BUG TaskRunnable is missing __str__

Describe the bug
When using Rocketry with FastAPI, I return all tasks using such code: return rocketry_app.session.tasks .
But if I dynamically create task (rocketry_app.session.create_task(...)) and set its start_cond to conds.cron(...), then endpoint raises exception on returning tasks:
AttributeError: Condition <class 'rocketry.conditions.task.task.TaskRunnable'> is missing __str__.

To Reproduce
Create task in session, set its start cond to future date using conds.cron(...), return session.tasks via FastAPI endpoint (found reference example in Rocketry docs)

Expected behavior
No 500 internal error (no AttributeError)
I expect it to print something, to stringify the condition

Screenshots
I am sorry, no screenshots.

Desktop (please complete the following information):

  • OS: Windows10
  • Python version 3.10.4

Additional context
I am creating a pull request to fix this issue. Link:
pull request

ENH: multiple tasks in "after task '...'" syntax

Is your feature request related to a problem? Please describe.
This is too laboursome:

@FuncTask(start_cond="after task 'task-1' & after task 'task-2' & after task 'task-3'")
def do_things():
    ...

Describe the solution you'd like
Instead it could work like:

@FuncTask(start_cond="after tasks 'task-1', 'task-2', 'task-3'")
def do_things():
    ...

or:

@FuncTask(start_cond="after tasks 'task-1', 'task-2', 'task-3' succeeded")
def do_things():
    ...

ENH: Create task from class directly

Is your feature request related to a problem? Please describe.
Proposal for new task class: create a task directly from a class.
Requires possibly a new metaclass. The task will be created when the class itself is initiated.

from redengine.tasks import ClassTask

class MyTask(ClassTask):
    name = 'my-task'
    start_cond = 'daily'
    execution = 'process'

    def __init__(self, session):
        ... # Do whatever

    def execute(self):
        ... # Do whatever

Why? You can bundle all the functions related to the task nicely to one place.

Example, now one can do:

from redengine.tasks import ClassTask

def extract():
    ...

def transform(data):
    ...
    return data

def load(data):
    ...

@FuncTask(name="etl", start_cond="daily")
def process_etl():
    data = extract()
    data = transform(data)
    load(data)

Proposed:

from redengine.tasks import ClassTask

class ProcessETL(ClassTask):
    name = 'etl'
    start_cond = 'daily'

    def execute(self):
        data = self.extract()
        data = self.transform(data)
        self.load(data)

    def extract(self):
        ....

    def transform(self, data):
        ....
        return data

    def load(self, data):
        ...

ENH Support of background tasks

Is your feature request related to a problem? Please describe.
It seems not possible to run tasks in the background.

Describe the solution you'd like
I would like to run the tasks in the background so it can do other stuff in the meantime.

Describe alternatives you've considered
I use APScheduler right now but besides it supports background tasks I feel like Rocketry would be way more convenient if it supports this feature.

BUG - since I last pulled, base repo has test failures

Describe the bug
Latest codebase, pulle down.
Run pytest
test_build.py is failing, and /schedule/test_core.py runs for 30 min and is still going?

I also tried running them all individually from vscode, got some more intermittent failures.

To Reproduce
Latest codebase, pulle down.
Run pytest

Expected behavior
Tests to execute in full and pass

Screenshots
image

image

Desktop (please complete the following information):

  • OS: Win 11
  • Python 3.11
  • pytest 2.15.6

Additional context
Test failure for build:

image

BUG: Unavoidable (non-serious) warnings

Describe the bug
It seems the system warns about the task_execution to be changed in the future version.

To Reproduce
Import the library.

Expected behavior
No warnings raised when importing.

BUG Error while deleting a task

I am trying to delete a task, but Rocketry returns a KeyError.

import asyncio
from rocketry import Rocketry
from rocketry.conds import secondly

app = Rocketry()

@app.task(secondly, name='foo')
async def foo():
    ...


async def main():
    asyncio.create_task(app.serve())
    app.session.remove_task('foo')
    # app.session.tasks.remove('foo')


if __name__ == '__main__':
    asyncio.run(main())

ENH: Task return values to parameters

Describe the solution you'd like
The return values are passable via queue (if multiprocessing) or via direct modification (if thread or main). These values need to be processed in Scheduler. Probably requires a new Argument type.

The argument type can be named as ReturnArg and can be put to Parameters using the name of the task as the key or so.

ENH Distributed execution

Hello,

Do you have any plans to add distributed execution?

Something like:

@app.task(daily, queue: RedbirdQueue | RabbitMQQueue | RedisQueue | InMemoryQueue | SQLALchemyQueue)
def do_daily():
    ...


worker = RocketryWorker(queues=[my_queues])
worker.run()

ENH: TaskCond (Condition that is also a task)

Is your feature request related to a problem? Please describe.
There may be a situation in which checking the state of a condition consumes too much from the processor, is too slow or prone for freezing thus one may want a condition which check is parallelized.

Describe the solution you'd like
A TaskCond that could be used as:

from redengine.conditions import TaskCond

@TaskCond(start_cond="every 10 minutes", cooldown="10 minutes", syntax="site is on")
def site_is_on():
    ... # Expensive code
    return True or False

This condition is False when the state is not checked and then when it is, the condition is True or False depending on the state. Then there is a cooldown meaning that the condition is True for the specified time without actually rechecking the task.

If start_cond not specified, the task runs every time the cooldown is over. If cooldown is not specified, cooldown is specified as ethernal: the state changes only when rechecked. If cooldown ends before recheck (start_cond fulfilled and rerun), the state should be false.

Describe alternatives you've considered
A condition that is False if parameter value does not exist and a task that populates this parameter.

ENH: Proper way to communicate with the runtime

Is your feature request related to a problem? Please describe.
There is (possibly not completely working) HTTP API for communicating with the session but the communication is not really standardized.

Describe the solution you'd like
Make a base class for communication tasks. (ie. BaseCommunicator)

A communication task could:

  • Run control
    • Put a specific task on hold (disable)
    • Force a specific task to run
    • Shutdown or restart the scheduler or put it on hold
  • Attribute control
    • Set the start_cond, timeout etc. of a task
    • Set
  • Session control
    • Create or delete tasks
    • Create or delete parameters

Possible example runtime communicators:

  • HTTPCommunicator (already somewhat implemented)
  • MongoCommunicator: Read instructions from a MongoDB collection
  • IOCommunicator: Read instructions from a file (JSON, CSV etc.)

These should be permanently running tasks running on startup.

Problems:

  • If tasks are deleted or modified, how to save the changes (when the scheduler is restarted)?

ENH: Expose the condition classes as substitute for the string based conditions

Is your feature request related to a problem? Please describe.
The scheduling syntax is easy but it is error-prone and does not work with static code analysis.

For example, this has no highlighting and it is hard to validate before runtime:

@app.task("daily between 12:00 and 15:00 & time of week on Monday ")
def do_daily():
    ...

The syntax parser turns the strings to the Python instances that represent the conditions but the naming of these conditions is not always the best.

Describe the solution you'd like

Candidate A): use the existing (but maybe think better names):

@app.task(TaskExecutable(period=TimeOfDay("12:00", "15:00") & IsPeriod(period=TimeOfWeek("Mon", "Mon"))))
def do_daily():
    ...

Candidate B): have similarly named instances as in the string expressions:

@app.task(daily.between("12:00", "15:00") & time_of_week.on("Monday"))
def do_daily():
    ...

These also could be used in generating the condition syntax items instead of the ugly class methods that exist for them.

Additional context
This is related to the part of the library that is quite old and could have some cleaning. The logic of these conditions is based on ideas that evolved into better ones which means the code in this area is not as clear as it could be.

Question regarding flask

I have previously used APScheduler in Flask for some things I need to do in background.

The problem with flask is if you load it in gunicorn, or debug mode and have multiple threads, the tasks will get scheduled multiple times as its run multiple times.

Is there a workaround in red engine for this?

memory leak bug

Steps to reproduce the behavior.

# coding=utf-8

from memory_profiler import profile
from rocketry import Rocketry
from rocketry.conds import every

app = Rocketry()


@profile
@app.task(every("1 seconds"),  execution="async")
async def job():
    print(1)


if __name__ == '__main__':
    app.run()

View the dat file in the same directory,memory leaks will be found
image

  • OS: [win11]
  • python 3.9
  • rocketry 2.4.0

ENH: log misfire

Is your feature request related to a problem? Please describe.
Sometimes it may be beneficial to know when a task has not been launched in the given execution period due to other constraints in other conditions.

Describe the solution you'd like
In addition to last_success, last_fail etc. log similarly the last_misfire when a task did not execute in its period. This could be logged in __bool__ of a task.

Then one could build conditions like:

@FuncTask(start_cond="after task 'mytask' misfired")
def handle_misfire():
    ...

There could be an option to store the misfires in logs in session.config, for example "log_misfire": True.

Describe alternatives you've considered
Alternatively one could make a task that inspects the same thing and executes constantly.

Time drifting on task execution

For example, running a task in 10 sec interval produces the following output:

time to run:  2022-07-05 10:03:14.170614
time to run:  2022-07-05 10:03:24.659828
time to run:  2022-07-05 10:03:35.116288
time to run:  2022-07-05 10:03:45.570992
time to run:  2022-07-05 10:03:56.041514
time to run:  2022-07-05 10:04:06.542453
time to run:  2022-07-05 10:04:17.028015
time to run:  2022-07-05 10:04:27.530127
time to run:  2022-07-05 10:04:38.023248
time to run:  2022-07-05 10:04:48.461072
time to run:  2022-07-05 10:04:58.944541
time to run:  2022-07-05 10:05:09.412248
time to run:  2022-07-05 10:05:19.871020
time to run:  2022-07-05 10:05:30.386063
time to run:  2022-07-05 10:05:40.858525
time to run:  2022-07-05 10:05:51.296365
time to run:  2022-07-05 10:06:01.766899
time to run:  2022-07-05 10:06:12.253105
time to run:  2022-07-05 10:06:22.742304
time to run:  2022-07-05 10:06:33.174691
time to run:  2022-07-05 10:06:43.657969
time to run:  2022-07-05 10:06:54.157131

I'm wondering this will cause problem with task in long run.

The code:

from redengine import RedEngine
import datetime

app = RedEngine()

@app.task('every 10 sec')
def do_things():
    print("time to run: ", datetime.datetime.now())

if __name__ == "__main__":
    app.run()

ENH Parameterise scheduling interval and easily add/remove tasks

I really like the potential of this library. I might be misinterpreting the documentation, but I believe this feature is not (easily) available.

Is your feature request related to a problem? Please describe.
The tasks are not known beforehand/statics. I would like to be able to dynamically create tasks and add them to the scheduler (both before running and while running).

Describe the solution you'd like
Something along the lines of:

app.add_task(name="my_task",interval="every 10 minutes",callback=my_fun)
app.remove_task(name="my_task")

Describe alternatives you've considered
I have tried messing with the Session() and Task() things, as from the documentation it hints that you can create tasks this way.
However, I did not understand from the documentation how this would work other than manipulating existing tasks.

Additional context
I am writing a CLI app where I would like the load the schedule from a configuration file and also allow the user to dynamically add tasks to the schedule.

ENH: Add github actions for pylint

Is your feature request related to a problem? Please describe.
It'd be nice to gradually fix and turn on pylint errors (and maybe latest flake8/black checks). Having it as part of build CI/CD with github actions would help prevent increase of these.

Describe the solution you'd like
Add github actions for pylint eg https://github.com/marketplace/actions/github-action-for-pylint
Turn on initially with -ignore for

R0401,R0801,C0112,C0113,C0114,C0115,C0116,C0301,C0103,W0511,E0602,E1101,E1102,E0001,W2301,W0613,E0102,R0914,R0912,R1732,E0213,W0212,W0108,E1125,E0704,R0903,W0622,R0913,C0415,E0611,W0707,W0642,R1710,W0703,R0913,W0106,W0223,W0221,C0412,W0201,,R0904,R1704,E0601,R0902,W0236,W0612,W0611,W0143,W0109,W0231,W0621,W0107,W0104,W0621,E1121,W0706,W0122,R0915,W0631,C0302,E0401

and then can slowly remove those as the project quality improves.

Describe alternatives you've considered
Also might be worth considering black and / or flake8

Additional context
A measure of code quality

ENH: Drop Pandas as dependency

Is your feature request related to a problem? Please describe.
Pandas, and Numpy, are quite large and often problematic and slow to build.

Describe the solution you'd like
Drop Pandas as a hard dependency from setup.py. This also requires refactoring some of the date-related functionalities such as timeout parsing, time periods etc.

It should also be heavily tested the substitutes work exactly the same.

Additional context
Pandas' date functionalities are quite extensive, unique and unfortunately well embedded in the library. I think they are partly written in Cython so the performance is probably better as well than writing substitutes in pure Python.

BUG set_repo doesn't work, though it is mentioned in the documentation

Describe the bug
I was going through the documentation here: https://rocketry.readthedocs.io/en/stable/cookbook/robust_applications.html?highlight=email#to-csv , and following the steps to add a repo to an app session, I get the error:

AttributeError: 'Session' object has no attribute 'set_repo'

To Reproduce
This simple scripts will produce the error:

from rocketry import Rocketry
from rocketry.log import LogRecord
from redbird.repos import CSVFileRepo

app = Rocketry(config={
    'task_execution': 'async',
    'silence_task_prerun': True,
    'silence_task_logging': True,
    'silence_cond_check': True
})
repo = CSVFileRepo(model=LogRecord, filename="tasks.csv")
repo.create()
app.session.set_repo(repo, delete_existing=True)

Expected behavior
The repo is set without any errors

Screenshots
If applicable, add screenshots to help explain your problem.

Desktop (please complete the following information):

  • OS: MacOs
  • Python version: Python 3.9.13

Additional context
What else can I do to set the repo in the meantime?

ENH Optionally specify timezone

Is your feature request related to a problem? Please describe.
Some people would like to use Rocketry with other timezones. For example, if the code is developed on a timezone +2 and the machine where it is run is at timezone +0, there could be problems. Running the application in the developer's machine vs in the production environment would result tasks to be run at different times between these environments.

For example, there is a task scheduled to run at 10:00. If the developer runs the application, the task will be run 10:00 in his/her local time but it would be 08:00 in the production server's time. The development environment probably should run tasks at the same universal times as the production environment.

Describe the solution you'd like
Optionally specify a timezone in the session object. Probably also requires the time measurement will be done in the same function throughout the library.

Single function for measurement has the added benefit that one could override it in order to build unit tests: check a task is indeed runned when the current time is 10:00, for example.

Describe alternatives you've considered
One could override the time.time but that has added problems. Possibly no feasible alternatives without core changes.

Additional context
The measurement of time is done using time.time (instead of datetime.datetime.now) in order to work together with logging (as it uses time.time). There is a bug in Python that datetime.datetime.now and time.time do rounding differently causing problems in quickly executed tasks if datetime.datetime.now was used.

The logs store the timestamps in timezone agnostic way (using Unix epoch).

ENH Support setting time variability (randomized offsets)

Is your feature request related to a problem? Please describe.
Hi! I'm thinking of using this library as part of a scheduler / manager to automate various tasks including web-scraping and running parts of a multi-agent system. I have some tasks that either shouldn't or don't need to be run at exactly defined moments (e.g. web-scrapers and randomized load balancing) and it would be very helpful to be able to set a spread or error bar to a time / trigger condition. I.e. set some acceptable leeway that's non-fixed.

Describe the solution you'd like
Specifically I want to introduce non-fixed acceptable leeway -- randomly sampled error from a range -- to defined start- and end-times in a way that has the errors change each time a condition is evaluated (not set the error once and re-use it). This should be handled under the hood by Rocketry to minimize boilerplate code / code complexity.

In practice what I envision is one or more of the following extensions:

  1. To use an optional language-based syntax that immediately follows a time. I'm not yet sure what it should look like but I'm thinking along the lines of 
@app.task("daily after 07:00 +/- 10 minutes") and "daily after 07:00 +/- 2 hours. Would apply to the Condition API as well: every("10 seconds +/- 2 seconds").
  2. To pass the spread as an optional argument. This avoids adding complexity to the language parser but we lose granularity on which times to affect -- it becomes more natural for the spread arg to affect all times defined in the string.
  3. Specifically for the Condition API it might be an option to add a spread or error attribute to the members of rocketry.cond (after, between, on, ...).

Choice of sampling method used to set the final offset can in theory also be a user choice but I'm not sure where that parameter is best defined -- it adds more complexity to the language syntax. Passed as an argument or maybe set on the Session beforehand might be better options. I see two viable sampling methods: Uniform random and Gaussian.

Then there's a question of the spread size. It makes no sense to say "+/- 1 day" or "+/- 7 hours" in the above examples, so there should perhaps be some constraint handling in place. Exactly which ones is unclear.

Describe alternatives you've considered
I have considered dynamically setting or adjusting times on a per run basis using my own code external to Rocketry. Functionally this will likely be fine but it adds complexity to my code and I believe others might enjoy this feature too. It's unclear to me how I can implement variation on the timing in a way that's handled under the hood by Rocketry. I know the request might seem odd in the face of Rocketry trying to be as precise and timely as possible but I like this library and I see my suggestions here as an extension to fill a niche.

Additional context
I had a look at #89 (comment) and like the idea presented there but as far as I see that fulfills a different need (i.e. setting a probability of running a task at all, time still being exact). I would love to provide a similarly small example but haven't yet figured out the internals of Rocketry to do so.

ENH: Multiple values to at/on (conditions)

Describe the solution you'd like
It would be nice to do this:

from rocketry.conds import daily, weekly

@app.task(daily.at("12:00", "18:00"))
def do_daily_twice():
    ... # Condition translates to: daily.at("12:00") | daily.at("18:00")

@app.task(weekly.on("Mon", "Fri"))
def do_weekly_twice():
    ... # Condition translates to: weekly.on("Mon") | weekly.on("Fri")

Describe alternatives you've considered
Use OR (|) but this gets tediuous.

ENH: Cron-style time periods (and conditions)

Is your feature request related to a problem? Please describe.
Some people might prefer Crontab-like scheduling options over the built-in time periods even though one can compose pretty much the same scheduling with it as well.

Describe the solution you'd like
The solution will consist of two parts: time periods and then a condition wrapper for it. The solution should extend the existing time system to support intervals mimicking the Cron time intervals and then it should be trivial to add that to the conditions. Then there could be a condition added in the condition API and to the condition syntax to compose such a combination.

Describe alternatives you've considered
Using existing time periods and conditions.

ENH: Missfire/task missed condition

Is your feature request related to a problem? Please describe.
If a task was missed (due to system failure, performance reasons or missing conditions), in some cases it is desired to run the missed task(s).

Describe the solution you'd like
Run stacks or a condition that allows running tasks that were missed.

Maybe something like this:

@app.task("daily | missed")
def do_daily():
    ...

If this was missed (did not run yesterday), the missed should return True and the task should run again.

The solution needs to check the last time the task ran and compare whether that was inside the previous timespan of the condition.

Describe alternatives you've considered
A metatask or a custom condition with a stack.

BUG: 100% CPU utilization - busy waiting?

Thanks for sharing this amazing software.

Describe the bug
Running Red Engine causes 100% CPU utilization. Maybe it's busy waiting?

To Reproduce
This simple app prints "Hello World" every 1 minute.

from redengine import RedEngine
app = RedEngine()

@app.task("every 1 minute")
def main():
    print("Hello World")

if __name__ == "__main__":
    app.run()

Expected behavior
CPU not utilized at all while waiting between running tasks.

Screenshots
screenshot

Desktop (please complete the following information):

  • OS: macOS 12.3, Linux (5.4.0-100-generic, Ubuntu, x86_64)
  • Python version 3.10.4

Additional context
If there's some misconfiguration on my side I'd be grateful for tips.

ENH: Convenient way to create new conditions

Is your feature request related to a problem? Please describe.
Creating simple conditions like a function that returns True-False and having that to the string parser engine could be easier.

Describe the solution you'd like

Proposed syntax:

from redengine.conditions import FuncCond

@FuncCond(syntax="is foo time"):
def is_foo():
    return True

# Then use
@FuncTask(start_cond="is foo time & daily")
def do_stuff():
    ...

The FuncCond could have following parameters:

  • syntax: (List[re.compile, str], str, re.compile): Syntax(es) that are parsed. Named groups are passed to the function like with BaseCondition.
  • name: Name of the resulted class, optional. By default the same name as the function.

Describe alternatives you've considered

Alternatively using the standard method:

import re
from pathlib import Path
from redengime.core import BaseCondition

class IsFoo(BaseCondition):

    __parsers__ = {
        "is foo time": "__init__"
    }

    def __init__(self):
        pass

    def __bool__(self):
        return True

# Then use
@FuncTask(start_cond="is foo time & daily")
def do_stuff():
    ...

DOCS: Example Rocketry with FastAPI where uvicorn is started by Docker container

I'm working on a FastAPI project. I want to add scheduling to, and Rocketry looks like a good way to do this. I understand the documentation about how to start Rocketry and FastAPI as separate asyncio tasks. But I also see where the examples modify uvicorn so Rocketry can enter the exit code.

Right now, I'm using docker_compose.yml to start the uvicorn rather than starting uvicorn in my code. Is there a way to get Rocketry connected to the asyncio loop as a task after the asyncio loop is running (by FastAPI)? And is there a way to connect the Rocketry exit work to uvicorn after it's running? Or can I connect the Rocketry exit stuff to the FastAPI "shutdown" event?

Thanks!
Doug

DOCS - Example for creating a task & modifying task-types

Is it corrected in the latest version (master branch)?
no

Latest version of the docs: https://rocketry.readthedocs.io/en/latest/

Location in the documentation you wish to improve

Describe the problem

In the first page, there seems a function(task name) typo.
To create a task:

from rocketry.conds import daily
from rocketry.args import Session

def do_things():
    ...

@app.task()
def remove_task(session=Session()):
    session.create_task(func=do_things, start_cond=daily)

For the second page, following from here, this line

app.task('daily', code='print("Hello world")')

Suggestion for the fix

  • For the first one, the task name here can be create_task, instead of remove_task.
  • For the second, if it's not implemented, we can remove the line from the docs.

Sub-second Timedelta?

Great project. I just switched an app over from Schedule.

One question, and I may be asking Rocketry to do something not in its nature. Is there an option to use Timedeltas lower than one second? ms and milliseconds are in the source and parsing them doesn't seem to explode Task assignment. However, they don't fire as often as expected.

In the example below I've fiddled with cycle_sleep down to 0.005 and None, but still only get 2-3 per second.

Is this supported? No problem if not, I'll have my tasks do it internally.

app = Rocketry(config={"cycle_sleep": None})

@app.task("every 100 ms")
def do_things():
    print(str(datetime.now()) + " hello")

if __name__ == "__main__":
    app.run()
2022-08-31 15:16:12.324561 hello
2022-08-31 15:16:12.713914 hello
2022-08-31 15:16:13.102267 hello
2022-08-31 15:16:13.488618 hello
2022-08-31 15:16:13.874968 hello
2022-08-31 15:16:14.261318 hello
2022-08-31 15:16:14.647670 hello
2022-08-31 15:16:15.035021 hello
2022-08-31 15:16:15.422372 hello
2022-08-31 15:16:15.812726 hello
2022-08-31 15:16:16.207084 hello
2022-08-31 15:16:16.599441 hello
2022-08-31 15:16:16.993798 hello

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.