Выполнение задач¶
Эта страница описывает полный жизненный цикл задачи в QTasks: от момента отправки клиентом до завершения выполнения на стороне сервера и получения результата. Описание сфокусировано на потоках данных, контрактах компонентов и порядке взаимодействия, а не на пользовательском API.
В качестве опорного примера используется конфигурация с Redis в роли брокера и хранилища.
Общая схема взаимодействия компонентов¶
sequenceDiagram
participant Client
participant Broker
participant Storage
participant Worker
participant TaskExecutor
Client->>Broker: add_task()
Broker->>Storage: add()
Broker-->>Client: Task(status="new")
Broker->>Broker: lpop(queue)
Broker->>Storage: add_process()
Broker->>Storage: get()
Broker->>Worker: add()
Worker->>TaskExecutor: execute()
TaskExecutor-->>Worker: result
Worker->>Broker: remove_finished_task()
Broker->>Storage: remove_finished_task()
Диаграмма отражает только верхнеуровневый порядок взаимодействий. Детали каждого этапа раскрываются ниже.
Отправка задачи клиентом¶
На стороне клиента задача существует исключительно в виде схемы данных и не связана с конкретным способом выполнения.
- Клиент отправляет задачу в Брокер по контракту
add_task, который внутри транслируется в контрактadd. - Брокер передаёт данные задачи в Хранилище по контракту
add. - Хранилище сериализует статус задачи и сохраняет его, например для Redis:
hset("{storage_name}:{uuid}", mapping=task_status.__dict__)
- Брокер добавляет ссылку на задачу в собственную очередь:
rpush(queue_name, "{task_name}:{uuid}:{priority}")
- Брокер немедленно возвращает клиенту схему
Taskсо статусомnew.
На этом этапе задача считается зарегистрированной, но ещё не взятой в обработку.
Получение задачи сервером¶
Брокер, работающий в серверном режиме, постоянно опрашивает очередь задач.
- Брокер выполняет
lpop(queue_name). 1.1. Если очередь пуста, Брокер приостанавливает выполнение наdefault_sleepсекунд (по умолчанию0.01). - При получении задачи Брокер уведомляет Хранилище о начале обработки по контракту
add_process. - Хранилище добавляет задачу в очередь выполняемых задач, например:
zadd(queue_process, {task_data: priority})
- Брокер запрашивает полные данные задачи у Хранилища по контракту
get. - Полученные данные передаются в Воркер по контракту
add.
Очередь выполнения в Worker¶
- Воркер преобразует полученные данные в
TaskPrioritySchema. - Схема добавляется во внутреннюю очередь выполнения
self.queue. - Немедленно формируется и возвращается статус задачи
new. - Сабворкеры, запущенные внутри Worker, слушают одну общую очередь.
- Захват задачи происходит по принципу «кто первый взял» с использованием
ConditionиSemaphore(max_tasks_process).
Таким образом, параллелизм и асинхронность ограничиваются и контролируются на уровне Worker.
Переход задачи в состояние выполнения¶
- Перед фактическим запуском Worker формирует
TaskStatusProcessSchema. - Статус передаётся в Брокер по контракту
update. - Брокер транслирует обновление в Хранилище:
hset("{storage_name}:{uuid}", mapping=asdict(model))
На этом этапе задача официально считается выполняемой.
Выполнение функции задачи¶
- Worker извлекает описание задачи из
app._tasksв видеTaskExecSchema. - Эта схема присоединяется к данным задачи.
- Worker передаёт задачу в
TaskExecutorчерез контрактexecute().
Последовательность внутри TaskExecutor:
- вызов
execute_middlewares_before(); - вызов
before_execute(); - выполнение
run_task(); - при генераторной задаче — делегирование в
run_task_gen(); - вызов
after_execute(); - вызов
execute_middlewares_after(); - применение
decode()(если включено, по умолчанию — да).
TaskExecutor возвращает в Worker либо результат, либо исключение.
Завершение задачи¶
В зависимости от результата Worker формирует одну из финальных схем:
TaskStatusSuccessSchema;TaskStatusErrorSchema;-
TaskStatusCancelSchema. -
Результат передаётся в
remove_finished_task(). - Worker отправляет данные в Брокер по контракту
remove_finished_task(). - Брокер передаёт их в Хранилище.
Хранилище выполняет два действия:
hset("{storage_name}:{uuid}", mapping=model.__dict__)
zrem(queue_process, "{task_name}:{uuid}:{priority}")
Очередь команд Redis¶
При использовании Redis операции записи выполняются через (A)syncRedisCommandQueue.
- Команды добавляются во внутреннюю очередь выполнения.
- Если очередь отсутствует — она создаётся автоматически.
- Очередь выполняется последовательно.
- После выполнения последней команды очередь автоматически завершается.
Этот механизм снижает накладные расходы и упорядочивает операции записи.
Получение результата задачи¶
Получение состояния задачи происходит через Хранилище.
- По контракту
get()данные извлекаются и преобразуются вTask. - Если при
add_task()указанtimeout, используется(A)syncResult. (A)syncResultпериодически запрашивает Хранилище с интерваломresult_time_interval.-
Ожидание реализуется через:
-
ThreadPoolExecutor(max_workers=1)для sync-режима; asyncio.wait_for()для async-режима.- Если статус задачи не входит в
result_statuses_end(по умолчанию:SUCCESS,ERROR,CANCEL), ожидание продолжается.
После достижения конечного статуса результат возвращается клиенту.
Архитектурные инварианты¶
- Задача на всех этапах представлена схемами данных, а не исполняемыми объектами.
- Компоненты взаимодействуют исключительно через контракты.
- Брокер не выполняет задачи и не знает их внутренней логики.
- Worker не управляет хранением данных.
- Хранилище не содержит логики выполнения.
Эти инварианты обеспечивают предсказуемость поведения и возможность замены любого компонента без изменения остальных.