Перейти к содержанию

Возможности QTasks

QTasks — это современный фреймворк для очередей задач, созданный вокруг идеи полной расширяемости, минималистичной архитектуры и удобства использования. Он одинаково хорошо подходит для небольших проектов и крупной распределённой инфраструктуры.

Эта страница описывает ключевые возможности фреймворка, включая его уникальные архитектурные особенности.


Простая и быстрая настройка

  • Регистрация задач через @app.task — синхронных и асинхронных.
  • Нативная поддержка asyncio без ограничений GIL.
  • Лёгкое подключение приложения через CLI: qtasks -A app:app run.
  • Гибкая конфигурация: параметры по умолчанию или кастомные компоненты.

Уникальные архитектурные возможности

🔧 Компонентная архитектура

Каждый элемент системы можно заменить:

  • Broker
  • Worker
  • Storage
  • GlobalConfig
  • Starter
  • TaskExecutor

Это позволяет адаптировать QTasks под любые сценарии: in-memory тестирование, распределённые системы, собственные брокеры, альтернативные executors.

⚙️ Полностью заменяемый TaskExecutor

Логика выполнения задач изолирована в отдельном классе. Его можно заменить через:

@app.task(executor=MyTaskExecutor)

Это даёт контроль над:

  • обработкой ошибок,
  • retry-стратегиями,
  • middleware,
  • кастомными пайплайнами выполнения.

Работа с задачами

✔️ Поддержка yield в задачах

QTasks позволяет использовать генераторы в задачах.

@app.task(generate_handler=gen_handler)
def mytask():
    value = yield "step1"
    return value

gen_handler(result) принимает значение, переданное через yield, а результат return возвращается в задачу.

Это позволяет строить поэтапные сценарии выполнения задач.

✔️ Поддержка Depends

Плагин Depends позволяет внедрять зависимости в задачи.

@app.task
def process(data: int, db=Depends(get_db, scope="worker")):
    return db.save(data)

Scopes:

  • task
  • worker
  • broker
  • storage
  • global_config

Фреймворк автоматически создаёт и корректно закрывает контекст зависимости.

✔️ Поддержка State

Потокобезопасный способ передачи данных между задачами.

from qtasks.plugins import AsyncState

class MyState(AsyncState):
    pass

@app.task
def sample(state: MyState):
    ...

State позволяет задаче безопасно передавать промежуточные данные поэтапно.


Интеграция с брокерами сообщений

  • Redis — основной поддерживаемый брокер.
  • RabbitMQ — доступен через доп. установку.
  • Kafka — также поддерживается.

Брокеры легко подключаются:

app = QueueTasks(broker_url="redis://localhost:6379/0")

При необходимости можно передать собственную реализацию брокера.


Расширяемость и плагины

QTasks предоставляет многоуровневую систему триггеров:

  • на выполнение задач,
  • на изменение аргументов,
  • на изменение результата,
  • на события внутри компонентов.

Плагин легко добавить:

app.add_plugin(MyPlugin(), trigger_names=["task_executor_args_replace"], component="worker")

Фреймворк поддерживает создание дополнительных инструментов: gRPC-передача задач, унифицированные middleware, Depends, State.


Масштабируемость

QTasks масштабируется горизонтально благодаря простой, надёжной модели:

«Кто первый взял задачу, тот и обрабатывает».

Это обеспечивает:

  • работу нескольких воркеров одновременно;
  • естественную балансировку нагрузки;
  • распределённое выполнение задач без сложных алгоритмов.

Асинхронная обработка задач

Асинхронный режим имеет преимущества:

  • высокая производительность;
  • потокобезопасность (используются Semaphore и PriorityQueue);
  • инкапсуляция контекста задач;
  • легко внедряется внутрь других фреймворков.

Асинхронные задачи объявляются как:

@app.task
async def handler(x: int):
    return x * 2

Мониторинг и статистика

(Описание аналитики — на отдельной странице.)

Возможности включают:

  • (A)syncStats для получения статистики приложений и задач;
  • вывод статистики через CLI: qtasks stats inspect ....

Эти возможности делают QTasks не просто очередью задач, а гибким и расширяемым фреймворком, который легко адаптируется под реальные бизнес-сценарии.