Перейти к содержанию

Особенности задач QTasks

Задачи в QTasks — это не просто функции. Это расширяемые объекты с контекстом, собственным поведением, поддержкой middleware, retry-механизмов, generator-based пайплайнов и системой зависимостей. Эта страница даёт обзор возможностей, доступных через декораторы @app.task и @shared_task.


📦 shared_task — задачи вне приложения

shared_task позволяет регистрировать задачи вне экземпляра приложения QueueTasks — удобно для библиотек, reusable-модулей и кода, который должен работать в разных проектах.

from qtasks import shared_task

@shared_task()
def shared_func():
    print("Общая задача")

Превращение shared_task в асинхронную задачу

Если указать awaiting=True, задача возвращает AsyncTask, даже если среда «синхронная»:

@shared_task(awaiting=True)
async def async_shared():
    print("async shared task")

shared_task поддерживает все параметры @app.task:

  • executor
  • middlewares_before / middlewares_after
  • generate_handler
  • echo
  • retry, retry_on_exc
  • priority, tags, description
  • decode

🔀 Маршрутизация задач: (A)syncRouter

Router позволяет подключать наборы задач к приложению, разделять логику по модулям и переиспользовать код.

from qtasks import AsyncRouter

router = AsyncRouter()

@router.task()
async def example():
    print("Router task")

В файле приложения:

app.include_router(router)

Router регистрирует только задачи и передает их в реестр приложения.


📣 echo=True и доступ к self (A)SyncTask

Если указать echo=True, задача получает первым аргументом объект задачи:

  • AsyncTask для async
  • SyncTask для sync
@app.task(echo=True)
async def echo_task(self):
    print(self.task_name)

Возможности self

Каждая задача имеет следующие поля:

Атрибут Значение
self.task_name Имя задачи
self.priority Приоритет (по умолчанию 0)
self.echo Признак echo-режима
self.max_time Максимальное время выполнения
self.retry Кол-во повторных попыток
self.retry_on_exc Список исключений для retry
self.decode Декодер результата
self.middlewares_before Миддлвари до выполнения
self.middlewares_after Миддлвари после выполнения
self.extra Любые дополнительные параметры

Это позволяет реализовать вложенные задачи:

@app.task(echo=True)
async def main(self):
    self.add_task("subtask", 123)

🧠 Контекст задачи: self.ctx

Контекст — это API взаимодействия задачи с инфраструктурой QTasks.

@app.task(echo=True)
def show(self):
    print(self.ctx.task_uuid)

Возможности контекста

Метод Описание
self.ctx.task_uuid UUID задачи
self.ctx.get_logger(name) Логгер для задачи
self.ctx.get_config() Конфиг приложения
self.ctx.get_metadata(cache=True) Метаданные задачи (результат app.get())
self.ctx.get_task(uuid) Получение другой задачи
self.ctx.sleep(sec) Задержка (async/ sync sleep)
self.ctx.cancel(reason) Отмена задачи (TaskCancelError)
self.ctx.plugin_error(**kwargs) Искусственный вызов ошибки плагина
self.ctx.get_component(name) Получение компонента (broker, storage, ...)

Контекст позволяет реализовывать сложную логику без глобальных переменных и singleton-структур.


🔁 retry и retry_on_exc

Повторное выполнение задачи при ошибке:

@app.task(retry=5, retry_on_exc=[ZeroDivisionError])
def risky():
    return 1 / 0
  • retry=5 — количество повторений
  • retry_on_exc=[...] — список исключений, при которых происходит retry

⚙️ executor — собственный обработчик задач

Позволяет заменить внутренний механизм выполнения:

class MyExec:
    ...

@app.task(executor=MyExec)
def task():
    ...

Executor управляет:

  • выполнением задачи,
  • middleware,
  • декодированием результата,
  • retry-процессом,
  • обработкой ошибок.

🧩 Middleware

Middleware вызываются ДО и ПОСЛЕ выполнения задачи.

@app.task(
    middlewares_before=[MyBefore],
    middlewares_after=[MyAfter]
)
def example():
    pass

Если middleware не указаны — используются пустые списки.


🔁 Поддержка yield и generate_handler

QTasks поддерживает generator-based задачи.

@app.task(generate_handler=handler)
def mytask():
    step = yield "INIT"
    return step

handler(result) вызывается на каждом значении, переданном через yield.

Это позволяет реализовать потоковые пайплайны:

  • пошаговая обработка,
  • стриминг,
  • логирование прогресса,
  • общение между воркером и исполнителем.

🪢 awaiting=True (только для shared_task)

Опция awaiting=True заставляет shared-задачу всегда работать как асинхронную.

@shared_task(awaiting=True)
async def async_shared():
    ...

Эти особенности делают задачи QTasks мощным инструментом, который можно адаптировать под любую инфраструктуру и архитектурный стиль.