Giter Club home page Giter Club logo

celery-amqp-events's Introduction

Celery-AMQP-Events is a library that implements voluntary events handling on top of Celery.

  • AMQP-based robustness of event handling
  • Celery tasks interface
  • Anti-flood tactics

Installation

pip install celery-amqp-events

Configuration

  1. Pass a unique "service name" to Celery() instance for each service that has event handlers (see amqp_events.celery:events_app).
  2. Tell celery with imports setting where to find event handlers.
  3. Configure broker connection and other celery settings.
  4. Leave result backend empty - each event may have multiple consumers, event result is meaningless in this case.

You absolutely need to set separate name for each service that consumes events, because without that each fired event will be handled only by single randomly chosen service, because your services will share same queue for this event.

from amqp_events.celery import events_app

app = events_app(
    "service_name",  # important in multi-service environment
    imports=['demo.tasks']  # modules where to find event handlers
    broker_url='amqp://guest:guest@rabbitmq:5672/',
)

Adding events and handlers

from demo.celery import app

@app.event('service_name.object_name.event_name')
def even_number_generated(number: int):
    # You may add some validation logic in event function body;
    if number % 2 != 0:
        raise ValueError("number is not even")

@app.handler('service_name.object_name.event_name')
def on_even_number_generated(number: int):
    # Handle event somehow
    print(f"even number {number} generated")

Running

  • Start ordinary celery worker for your consumer service

Note that mingle, gossip and heartbeat should be disabled if not used. These algorithms use broadcast events, which means that you'll have N^2 messages in RabbitMQ for N workers without any purpose.

celery worker -A your_service.celery \
  --without-mingle --without-heartbeat --without-gossip

Sending events

import random
from demo.events import number_is_even

try:
    number_is_even(random.randint(0, 100))
except ValueError:
    print("oops, number was odd")

Robustness

  • If event fails with unhandled error, it is retried to separate queue with exponential backoff.
  • Backoff is used to prevent resources exhausting (like free http connections)
  • If no retry attempts left, unhandled event is moved to "archive" queue
  • Archive is used to catch messages which always produce an error in consumer; these messages can be manually retried when fix is released.
  • Archive is limited both by message TTL and message count limit, so alerts should exist.
  • Retry is done via separate queue because of multiple reasons:
    • using countdown forces consumer to keep "unacknowledged" events in memory, which is bad for load balancing and resource usage.
    • retrying to same queue will slow down event processing if retry probability is high enougth
    • two faulty consumers retrying same event with same routing key will cause exponential growth of message count in RabbitMQ because message is split to multiple messages when published from same exchange to multiple queues.
  • By default, some fault-tolerance celery settings are enabled:
    • task_acks_late will delay task acknowledgement till end of processing
    • task_reject_on_worker_lost will prevent ack if worker was killed
    • confirm_publish in broker_transport_options will block producer till broker will confirm that it received incoming message.

Related projects

Celery-message-consumer

Robustness tactics is inspired by celery-message-consumer project which aims to handle events published to AMQP broker from non-celery-based projects (maybe from other languages). The main difference is that Celery-AMQP-Events uses Celery tasks instead of including additional consumer step nearby the celery worker.

celery-amqp-events's People

Contributors

tumb1er avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.