Особенности задач QTasks¶
Задачи в QTasks — это не просто функции. Это расширяемые объекты с контекстом,
собственным поведением, поддержкой middleware, retry-механизмов, generator-based
пайплайнов и системой зависимостей.
Эта страница даёт обзор возможностей, доступных через декораторы @app.task и @shared_task.
📦 shared_task — задачи вне приложения¶
shared_task позволяет регистрировать задачи вне экземпляра приложения QueueTasks
— удобно для библиотек, reusable-модулей и кода, который должен работать в разных
проектах.
from qtasks import shared_task
@shared_task()
def shared_func():
print("Общая задача")
Превращение shared_task в асинхронную задачу¶
Если указать awaiting=True, задача возвращает AsyncTask, даже если среда «синхронная»:
@shared_task(awaiting=True)
async def async_shared():
print("async shared task")
shared_task поддерживает все параметры @app.task:
executormiddlewares_before/middlewares_aftergenerate_handlerechoretry,retry_on_excpriority,tags,descriptiondecode
🔀 Маршрутизация задач: (A)syncRouter¶
Router позволяет подключать наборы задач к приложению, разделять логику по модулям и переиспользовать код.
from qtasks import AsyncRouter
router = AsyncRouter()
@router.task()
async def example():
print("Router task")
В файле приложения:
app.include_router(router)
Router регистрирует только задачи и передает их в реестр приложения.
📣 echo=True и доступ к self (A)SyncTask¶
Если указать echo=True, задача получает первым аргументом объект задачи:
AsyncTaskдля asyncSyncTaskдля sync
@app.task(echo=True)
async def echo_task(self):
print(self.task_name)
Возможности self¶
Каждая задача имеет следующие поля:
| Атрибут | Значение |
|---|---|
self.task_name |
Имя задачи |
self.priority |
Приоритет (по умолчанию 0) |
self.echo |
Признак echo-режима |
self.max_time |
Максимальное время выполнения |
self.retry |
Кол-во повторных попыток |
self.retry_on_exc |
Список исключений для retry |
self.decode |
Декодер результата |
self.middlewares_before |
Миддлвари до выполнения |
self.middlewares_after |
Миддлвари после выполнения |
self.extra |
Любые дополнительные параметры |
Это позволяет реализовать вложенные задачи:
@app.task(echo=True)
async def main(self):
self.add_task("subtask", 123)
🧠 Контекст задачи: self.ctx¶
Контекст — это API взаимодействия задачи с инфраструктурой QTasks.
@app.task(echo=True)
def show(self):
print(self.ctx.task_uuid)
Возможности контекста¶
| Метод | Описание |
|---|---|
self.ctx.task_uuid |
UUID задачи |
self.ctx.get_logger(name) |
Логгер для задачи |
self.ctx.get_config() |
Конфиг приложения |
self.ctx.get_metadata(cache=True) |
Метаданные задачи (результат app.get()) |
self.ctx.get_task(uuid) |
Получение другой задачи |
self.ctx.sleep(sec) |
Задержка (async/ sync sleep) |
self.ctx.cancel(reason) |
Отмена задачи (TaskCancelError) |
self.ctx.plugin_error(**kwargs) |
Искусственный вызов ошибки плагина |
self.ctx.get_component(name) |
Получение компонента (broker, storage, ...) |
Контекст позволяет реализовывать сложную логику без глобальных переменных и singleton-структур.
🔁 retry и retry_on_exc¶
Повторное выполнение задачи при ошибке:
@app.task(retry=5, retry_on_exc=[ZeroDivisionError])
def risky():
return 1 / 0
retry=5— количество повторенийretry_on_exc=[...]— список исключений, при которых происходит retry
⚙️ executor — собственный обработчик задач¶
Позволяет заменить внутренний механизм выполнения:
class MyExec:
...
@app.task(executor=MyExec)
def task():
...
Executor управляет:
- выполнением задачи,
- middleware,
- декодированием результата,
- retry-процессом,
- обработкой ошибок.
🧩 Middleware¶
Middleware вызываются ДО и ПОСЛЕ выполнения задачи.
@app.task(
middlewares_before=[MyBefore],
middlewares_after=[MyAfter]
)
def example():
pass
Если middleware не указаны — используются пустые списки.
🔁 Поддержка yield и generate_handler¶
QTasks поддерживает generator-based задачи.
@app.task(generate_handler=handler)
def mytask():
step = yield "INIT"
return step
handler(result) вызывается на каждом значении, переданном через yield.
Это позволяет реализовать потоковые пайплайны:
- пошаговая обработка,
- стриминг,
- логирование прогресса,
- общение между воркером и исполнителем.
🪢 awaiting=True (только для shared_task)¶
Опция awaiting=True заставляет shared-задачу всегда работать как асинхронную.
@shared_task(awaiting=True)
async def async_shared():
...
Эти особенности делают задачи QTasks мощным инструментом, который можно адаптировать под любую инфраструктуру и архитектурный стиль.