From 409485f54582fda143207c2775a7468dc4243eb3 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 5 May 2026 23:22:43 +0200 Subject: [PATCH] Made tasks independent from brokers. --- pyproject.toml | 3 +- taskiq/__init__.py | 3 ++ taskiq/abc/broker.py | 69 ++++++--------------------- taskiq/decor.py | 6 ++- taskiq/kicker.py | 4 +- taskiq/task_gen.py | 110 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 138 insertions(+), 57 deletions(-) create mode 100644 taskiq/task_gen.py diff --git a/pyproject.toml b/pyproject.toml index 8acb611c..986b32e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -172,7 +172,8 @@ lint.ignore = [ "D100", # Missing docstring in public module "ANN401", # typing.Any are disallowed in `**kwargs "PLR0913", # Too many arguments for function call - "D106" # Missing docstring in public nested class + "D106", # Missing docstring in public nested class + "UP037" # Remove quotes from a type def ] lint.mccabe = { max-complexity = 10 } diff --git a/taskiq/__init__.py b/taskiq/__init__.py index 2414754f..3c5d3279 100644 --- a/taskiq/__init__.py +++ b/taskiq/__init__.py @@ -36,6 +36,7 @@ from taskiq.scheduler.scheduler import TaskiqScheduler from taskiq.state import TaskiqState from taskiq.task import AsyncTaskiqTask +from taskiq.task_gen import TaskiqTaskGenerator, task_gen __version__ = version("taskiq") @@ -68,8 +69,10 @@ "TaskiqResultTimeoutError", "TaskiqScheduler", "TaskiqState", + "TaskiqTaskGenerator", "ZeroMQBroker", "__version__", "async_shared_broker", "gather", + "task_gen", ] diff --git a/taskiq/abc/broker.py b/taskiq/abc/broker.py index ea2e86c0..c3b90187 100644 --- a/taskiq/abc/broker.py +++ b/taskiq/abc/broker.py @@ -20,6 +20,7 @@ from taskiq.abc.middleware import TaskiqMiddleware from taskiq.abc.serializer import TaskiqSerializer +from taskiq.task_gen import TaskiqTaskGenerator from taskiq.acks import AckableMessage from taskiq.decor import AsyncTaskiqDecoratedTask from taskiq.events import TaskiqEvents @@ -301,64 +302,23 @@ def task( # type: ignore[misc] :returns: decorator function or AsyncTaskiqDecoratedTask. """ - - def make_decorated_task( - inner_labels: dict[str, str | int], - inner_task_name: str | None = None, - ) -> Callable[ - [Callable[_FuncParams, _ReturnType]], - AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType], - ]: - def inner( - func: Callable[_FuncParams, _ReturnType], - ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: - nonlocal inner_task_name - if inner_task_name is None: - fmodule = func.__module__ - if fmodule == "__main__": # pragma: no cover - fmodule = ".".join( - os.path.normpath(sys.argv[0]) - .removesuffix(".py") - .split(os.path.sep), - ) - fname = func.__name__ - if fname == "": - fname = f"lambda_{uuid4().hex}" - inner_task_name = f"{fmodule}:{fname}" - wrapper = wraps(func) - - sign = get_type_hints(func) - return_type = None - if "return" in sign: - return_type = sign["return"] - - decorated_task = wrapper( - self.decorator_class( - broker=self, - original_func=func, - labels=inner_labels, - task_name=inner_task_name, - return_type=return_type, # type: ignore - ), - ) - - self._register_task(decorated_task.task_name, decorated_task) # type: ignore - - return decorated_task # type: ignore - - return inner + warnings.warn( + "Tasks are not independent from brokers. " + "Use `taskiq.task` as a decorator instead.", + TaskiqDeprecationWarning, + stacklevel=2, + ) + generator = TaskiqTaskGenerator().labels(**labels).broker(self) if callable(task_name): # This is an edge case, # when decorator called without parameters. - return make_decorated_task( - inner_labels=labels or {}, - )(task_name) + return generator(task_name) - return make_decorated_task( - inner_task_name=task_name, - inner_labels=labels or {}, - ) + if task_name: + generator = generator.name(task_name) + + return generator def register_task( self, @@ -534,9 +494,10 @@ def _register_task( raise TaskBrokerMismatchError(broker=task.broker) self.local_task_registry[task_name] = task - async def __aenter__(self) -> None: + async def __aenter__(self) -> "Self": """Starts the broker as ctx manager.""" await self.startup() + return self async def __aexit__(self, *args: object, **kwargs: Any) -> None: """Shuts down the broker as ctx manager.""" diff --git a/taskiq/decor.py b/taskiq/decor.py index 8f90541c..c1bd3965 100644 --- a/taskiq/decor.py +++ b/taskiq/decor.py @@ -45,10 +45,10 @@ class AsyncTaskiqDecoratedTask(Generic[_FuncParams, _ReturnType]): def __init__( self, - broker: "AsyncBroker", task_name: str, original_func: Callable[_FuncParams, _ReturnType], labels: dict[str, Any], + broker: "AsyncBroker | None" = None, return_type: type[_ReturnType] | None = None, ) -> None: self.broker = broker @@ -230,5 +230,9 @@ def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]: return_type=self.return_type, ) + def set_broker(self, broker: "AsyncBroker") -> None: + """Set broker for the task.""" + self.broker = broker + def __repr__(self) -> str: return f"AsyncTaskiqDecoratedTask({self.task_name})" diff --git a/taskiq/kicker.py b/taskiq/kicker.py index dc113a7e..9a18950c 100644 --- a/taskiq/kicker.py +++ b/taskiq/kicker.py @@ -42,10 +42,12 @@ class AsyncKicker(Generic[_FuncParams, _ReturnType]): def __init__( self, task_name: str, - broker: "AsyncBroker", + broker: "AsyncBroker | None", labels: dict[str, Any], return_type: type[_ReturnType] | None = None, ) -> None: + if broker is None: + raise RuntimeError("Broker is not set for this task!") self.task_name = task_name self.broker = broker self.labels = labels diff --git a/taskiq/task_gen.py b/taskiq/task_gen.py new file mode 100644 index 00000000..5cfec6b4 --- /dev/null +++ b/taskiq/task_gen.py @@ -0,0 +1,110 @@ +import os +import sys +from collections.abc import Callable +from copy import copy +from functools import wraps +from typing import TYPE_CHECKING, Any, get_type_hints +from uuid import uuid4 + +from typing_extensions import ParamSpec, Self, TypeVar + +from taskiq.decor import AsyncTaskiqDecoratedTask + +if TYPE_CHECKING: + from taskiq.abc.broker import AsyncBroker + +_FuncParams = ParamSpec("_FuncParams") +_ReturnType = TypeVar("_ReturnType") + + +class TaskiqTaskGenerator: + """Class used for task generation.""" + + def __init__(self) -> None: + self._labels: dict[str, Any] = {} + self._name: str | None = None + self._broker: "AsyncBroker | None" = None + + def name(self, name: str) -> Self: + """Set task name.""" + inst = copy(self) + inst._name = name # noqa: SLF001 + return inst + + def labels(self, **labels: Any) -> Self: + """Set task's static labels.""" + inst = copy(self) + inst._labels = labels # noqa: SLF001 + return inst + + def broker(self, broker: "AsyncBroker") -> Self: + """Set a broker.""" + inst = copy(self) + inst._broker = broker # noqa: SLF001 + return inst + + @classmethod + def make_task( + cls, + task_name: str, + broker: "AsyncBroker | None", + original_func: Callable[_FuncParams, _ReturnType], + labels: dict[str, Any], + return_type: type[_ReturnType] | None = None, + ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: + """ + Create a task out of given inputs. + + This method can be overridden to create custom task classes + with custom arguments and logic. + """ + return AsyncTaskiqDecoratedTask( + broker=broker, + task_name=task_name, + original_func=original_func, + labels=labels, + return_type=return_type, + ) + + def __call__( + self, + func: Callable[_FuncParams, _ReturnType], + ) -> AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]: + """ + Make a decorated task. + + This function is the main point for creating a task + from a raw function. + """ + task_name = self._name + if task_name is None: + fmodule = func.__module__ + if fmodule == "__main__": # pragma: no cover + fmodule = ".".join( + os.path.normpath(sys.argv[0]) + .removesuffix(".py") + .split(os.path.sep), + ) + fname = func.__name__ + if fname == "": + fname = f"lambda_{uuid4().hex}" + task_name = f"{fmodule}:{fname}" + wrapper = wraps(func) + + sign = get_type_hints(func) + return_type = None + if "return" in sign: + return_type = sign["return"] + + return wrapper( + self.make_task( + original_func=func, + labels=self._labels, + task_name=task_name, + broker=self._broker, + return_type=return_type, # type: ignore + ), + ) + + +task_gen = TaskiqTaskGenerator()