This repository offers a message bus system tailored for integration with python-dependency-injector. It manages command and query dispatching and supports dependency injection, which helps keep applications decoupled, organized, and maintainable.
The source code is currently hosted on GitHub at: https://github.com/Retr0327/py-knot
Binary installers for the latest released version are available at the Python Package Index (PyPI).
- pip

      pip install pyknot

- poetry

      poetry add pyknot
- Define messages:

  Create specific command or query messages by extending the `Command` or `Query` classes.

      from dataclasses import dataclass

      from knot import Command

      ReturnType = dict[str, int]


      @dataclass(slots=True)
      class TestCommand(Command[ReturnType]):
          a: int
          b: int
- Define handlers:

  Implement `CommandHandler` or `QueryHandler` for handling the defined messages.

      from dataclasses import dataclass

      from knot import Command, CommandHandler

      ReturnType = dict[str, int]


      @dataclass(slots=True)
      class TestCommand(Command[ReturnType]):
          a: int
          b: int


      class TestCommandHandler(CommandHandler[TestCommand]):
          def handle(self, message: TestCommand) -> ReturnType:
              ...  # do something
- Register handlers:

  Collect the handlers with the `register_handlers` function and pass the result to `MessageBus`.

      from dataclasses import dataclass

      from knot import (
          Command,
          CommandHandler,
          MessageBus,
          register_handlers,
      )

      ReturnType = dict[str, int]


      @dataclass(slots=True)
      class TestCommand(Command[ReturnType]):
          a: int
          b: int


      class TestCommandHandler(CommandHandler[TestCommand]):
          def handle(self, message: TestCommand) -> ReturnType:
              ...  # do something


      messages = register_handlers((TestCommandHandler,))
      message_bus = MessageBus(messages=messages)
- Dispatch messages:

  Use `MessageBus` to dispatch messages within your application.

      from dataclasses import dataclass

      from knot import (
          Command,
          CommandHandler,
          MessageBus,
          register_handlers,
      )

      ReturnType = dict[str, int]


      @dataclass(slots=True)
      class TestCommand(Command[ReturnType]):
          a: int
          b: int


      class TestCommandHandler(CommandHandler[TestCommand]):
          def handle(self, message: TestCommand) -> ReturnType:
              ...  # do something


      messages = register_handlers((TestCommandHandler,))
      message_bus = MessageBus(messages=messages)
      message_bus.dispatch(TestCommand(a=1, b=2))
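The steps above follow a common pattern: the bus picks a handler based on the type of the message it receives. The sketch below illustrates that pattern in plain Python. It is not py-knot's implementation; `AddNumbers`, `AddNumbersHandler`, and `SimpleBus` are hypothetical names made up for this example.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class AddNumbers:
    """Hypothetical command carrying two integers."""

    a: int
    b: int


class AddNumbersHandler:
    """Hypothetical handler that processes AddNumbers messages."""

    def handle(self, message: AddNumbers) -> Dict[str, int]:
        return {"sum": message.a + message.b}


class SimpleBus:
    """Toy bus (an assumption, not py-knot's class): one handler per message type."""

    def __init__(self, handlers: Dict[type, Any]) -> None:
        self._handlers = handlers

    def dispatch(self, message: Any) -> Any:
        # Look up the handler registered for this exact message type.
        handler = self._handlers.get(type(message))
        if handler is None:
            raise LookupError(f"no handler registered for {type(message).__name__}")
        return handler.handle(message)


bus = SimpleBus({AddNumbers: AddNumbersHandler()})
result = bus.dispatch(AddNumbers(a=1, b=2))
print(result)  # {'sum': 3}
```

Keying the lookup on the message type means the caller only needs to know the message, never the handler, which is where the decoupling comes from.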
Please make sure you have installed python-dependency-injector.
- Set Up Dependency Injection Container

      from dataclasses import dataclass

      from dependency_injector import (
          containers,
          providers,
      )
      from knot import (
          Command,
          CommandHandler,
          MessageBus,
          register_handlers,
      )
      from knot.plugins.di import handlers_to_factories

      ReturnType = dict[str, int]


      @dataclass(slots=True)
      class TestCommand(Command[ReturnType]):
          a: int
          b: int


      class TestCommandHandler(CommandHandler[TestCommand]):
          def handle(self, message: TestCommand) -> ReturnType:
              return message.as_dict()


      messages = register_handlers((TestCommandHandler,))


      class MessageBusContainer(containers.DeclarativeContainer):
          message_bus = providers.Singleton(
              MessageBus,
              messages=providers.Dict(handlers_to_factories(messages)),
          )
  In this container, you manage your dependencies, including the message bus. The key function here is `handlers_to_factories`, which transforms the handlers into factory providers that are compatible with the dependency injection framework.

- Wire Dependencies and Dispatch Command

  Wire the dependencies, inject the message bus wherever you need it, and dispatch commands through it.

      from dependency_injector.wiring import inject, Provide


      @inject
      def test(
          message_bus: MessageBus = Provide["message_bus"],
      ):
          return message_bus.dispatch(TestCommand(a=1, b=2))


      if __name__ == "__main__":
          container = MessageBusContainer()
          container.wire(modules=[__name__])
          test()
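To illustrate the idea of turning handlers into factories, here is a minimal sketch in plain Python. It only mimics the concept with `functools.partial`; the real `handlers_to_factories` is described above as producing dependency-injector factory providers, and its internals are not shown in this document. `Ping`, `PingHandler`, and `to_factories` are hypothetical names.

```python
from functools import partial
from typing import Any, Callable, Dict


class Ping:
    """Hypothetical message with no payload."""


class PingHandler:
    """Hypothetical handler that needs one dependency (the reply text)."""

    def __init__(self, reply: str) -> None:
        self.reply = reply

    def handle(self, message: Ping) -> str:
        return self.reply


def to_factories(
    registry: Dict[type, type], **dependencies: Any
) -> Dict[type, Callable[[], Any]]:
    """Map each message type to a zero-argument callable that builds its handler.

    Assumption: this stands in for the concept only; it is not py-knot's helper.
    """
    return {
        message_type: partial(handler_cls, **dependencies)
        for message_type, handler_cls in registry.items()
    }


factories = to_factories({Ping: PingHandler}, reply="pong")

# Each call to a factory builds a fresh handler with its dependencies bound.
first = factories[Ping]()
second = factories[Ping]()
print(first.handle(Ping()))  # pong
print(first is second)  # False
```

Storing factories rather than handler instances lets the container decide when a handler is created and which dependencies it receives.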
Please ensure that your application is already configured to work with Celery.

`py-knot` can be extended to work with Celery, enabling messages to be dispatched to a task queue. This extension is particularly useful for applications that require asynchronous processing or distributed task execution.
- Extend `MessageBus` for Celery Integration

  Change:

      from knot import MessageBus

  to

      from knot.plugins.celery import MessageBus
- Wire `queue_dispatcher_module` to the DI Container

  Wiring `queue_dispatcher_module` to the DI container makes the `dispatch_queue` method accessible as a dependency.

      from dataclasses import dataclass

      from dependency_injector import (
          containers,
          providers,
      )
      from dependency_injector.wiring import (
          Provide,
          inject,
      )
      from knot import (
          Command,
          CommandHandler,
          register_handlers,
      )
      from knot.plugins.celery import (
          MessageBus,
          queue_dispatcher_module,
      )
      from knot.plugins.di import handlers_to_factories

      ReturnType = dict[str, int]


      @dataclass(slots=True)
      class TestCommand(Command[ReturnType]):
          a: int
          b: int


      class TestCommandHandler(CommandHandler[TestCommand]):
          def handle(self, message: TestCommand) -> ReturnType:
              return message.as_dict()


      messages = register_handlers((TestCommandHandler,))


      class MessageBusContainer(containers.DeclarativeContainer):
          message_bus = providers.Singleton(
              MessageBus,
              messages=providers.Dict(handlers_to_factories(messages)),
          )


      @inject
      def test(
          message_bus: MessageBus = Provide["message_bus"],
      ):
          # Integer values match the declared field types of TestCommand.
          return message_bus.dispatch_queue(TestCommand(a=1, b=2))


      if __name__ == "__main__":
          container = MessageBusContainer()
          container.wire(modules=[__name__, queue_dispatcher_module])
          test()
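For readers new to queue-based dispatch, the sketch below shows the general idea using only the standard library: the caller enqueues a message and returns immediately, and a separate worker handles it later. This is an assumption-laden illustration that swaps Celery for an in-process `queue.Queue` and a thread; it does not show how py-knot or Celery work internally. `Echo`, `handle_echo`, and `dispatch_to_queue` are hypothetical names.

```python
import queue
import threading
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Echo:
    """Hypothetical message carrying some text."""

    text: str


def handle_echo(message: Echo) -> str:
    """Hypothetical handler: upper-cases the text."""
    return message.text.upper()


task_queue: "queue.Queue[Optional[Echo]]" = queue.Queue()
results: List[str] = []


def worker() -> None:
    """Consume messages in FIFO order until a None sentinel arrives."""
    while True:
        message = task_queue.get()
        if message is None:
            break
        results.append(handle_echo(message))


def dispatch_to_queue(message: Echo) -> None:
    """Enqueue the message and return without waiting for the handler."""
    task_queue.put(message)


thread = threading.Thread(target=worker)
thread.start()

dispatch_to_queue(Echo(text="hello"))
dispatch_to_queue(Echo(text="world"))

task_queue.put(None)  # sentinel: tell the worker to stop
thread.join()
print(results)  # ['HELLO', 'WORLD']
```

With a real task queue such as Celery, the worker typically runs in a separate process or on another machine, so messages must be serializable.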
If you have any suggestions or questions, please do not hesitate to email me at [email protected].