Возможности QTasks¶
QTasks — это современный фреймворк для очередей задач, созданный вокруг идеи полной расширяемости, минималистичной архитектуры и удобства использования. Он одинаково хорошо подходит для небольших проектов и крупной распределённой инфраструктуры.
Эта страница описывает ключевые возможности фреймворка, включая его уникальные архитектурные особенности.
Простая и быстрая настройка¶
- Регистрация задач через
@app.task— синхронных и асинхронных. - Нативная поддержка
asyncioбез ограничений GIL. - Лёгкое подключение приложения через CLI:
qtasks -A app:app run. - Гибкая конфигурация: параметры по умолчанию или кастомные компоненты.
Уникальные архитектурные возможности¶
🔧 Компонентная архитектура¶
Каждый элемент системы можно заменить:
- Broker
- Worker
- Storage
- GlobalConfig
- Starter
- TaskExecutor
Это позволяет адаптировать QTasks под любые сценарии: in-memory тестирование, распределённые системы, собственные брокеры, альтернативные executors.
⚙️ Полностью заменяемый TaskExecutor¶
Логика выполнения задач изолирована в отдельном классе. Его можно заменить через:
@app.task(executor=MyTaskExecutor)
Это даёт контроль над:
- обработкой ошибок,
- retry-стратегиями,
- middleware,
- кастомными пайплайнами выполнения.
Работа с задачами¶
✔️ Поддержка yield в задачах¶
QTasks позволяет использовать генераторы в задачах.
@app.task(generate_handler=gen_handler)
def mytask():
value = yield "step1"
return value
gen_handler(result) принимает значение, переданное через yield, а результат
return возвращается в задачу.
Это позволяет строить поэтапные сценарии выполнения задач.
✔️ Поддержка Depends¶
Плагин Depends позволяет внедрять зависимости в задачи.
@app.task
def process(data: int, db=Depends(get_db, scope="worker")):
return db.save(data)
Scopes:
taskworkerbrokerstorageglobal_config
Фреймворк автоматически создаёт и корректно закрывает контекст зависимости.
✔️ Поддержка State¶
Потокобезопасный способ передачи данных между задачами.
from qtasks.plugins import AsyncState
class MyState(AsyncState):
pass
@app.task
def sample(state: MyState):
...
State позволяет задаче безопасно передавать промежуточные данные поэтапно.
Интеграция с брокерами сообщений¶
- Redis — основной поддерживаемый брокер.
- RabbitMQ — доступен через доп. установку.
- Kafka — также поддерживается.
Брокеры легко подключаются:
app = QueueTasks(broker_url="redis://localhost:6379/0")
При необходимости можно передать собственную реализацию брокера.
Расширяемость и плагины¶
QTasks предоставляет многоуровневую систему триггеров:
- на выполнение задач,
- на изменение аргументов,
- на изменение результата,
- на события внутри компонентов.
Плагин легко добавить:
app.add_plugin(MyPlugin(), trigger_names=["task_executor_args_replace"], component="worker")
Фреймворк поддерживает создание дополнительных инструментов: gRPC-передача задач, унифицированные middleware, Depends, State.
Масштабируемость¶
QTasks масштабируется горизонтально благодаря простой, надёжной модели:
«Кто первый взял задачу, тот и обрабатывает».
Это обеспечивает:
- работу нескольких воркеров одновременно;
- естественную балансировку нагрузки;
- распределённое выполнение задач без сложных алгоритмов.
Асинхронная обработка задач¶
Асинхронный режим имеет преимущества:
- высокая производительность;
- потокобезопасность (используются Semaphore и PriorityQueue);
- инкапсуляция контекста задач;
- легко внедряется внутрь других фреймворков.
Асинхронные задачи объявляются как:
@app.task
async def handler(x: int):
return x * 2
Мониторинг и статистика¶
(Описание аналитики — на отдельной странице.)
Возможности включают:
(A)syncStatsдля получения статистики приложений и задач;- вывод статистики через CLI:
qtasks stats inspect ....
Эти возможности делают QTasks не просто очередью задач, а гибким и расширяемым фреймворком, который легко адаптируется под реальные бизнес-сценарии.