Task Execution¶
This page describes the complete lifecycle of a task in QTasks: from the moment it is sent by the client to the completion of execution on the server side and the receipt of the result. The description focuses on data flows, component contracts, and the order of interaction, rather than on the user API.
The reference example uses a configuration with Redis as the broker and storage.
General Component Interaction Diagram¶
```mermaid
sequenceDiagram
    participant Client
    participant Broker
    participant Storage
    participant Worker
    participant TaskExecutor
    Client->>Broker: add_task()
    Broker->>Storage: add()
    Broker-->>Client: Task(status="new")
    Broker->>Broker: lpop(queue)
    Broker->>Storage: add_process()
    Broker->>Storage: get()
    Broker->>Worker: add()
    Worker->>TaskExecutor: execute()
    TaskExecutor-->>Worker: result
    Worker->>Broker: remove_finished_task()
    Broker->>Storage: remove_finished_task()
```
The diagram reflects only the high-level order of interactions; the details of each stage are described below.
Task submission by the client¶
On the client side, the task exists exclusively as a data schema and is not linked to a specific method of execution.
- The client sends the task to the Broker via the `add_task` contract, which is internally translated into the `add` contract.
- The Broker passes the task data to the Storage via the `add` contract.
- The Storage serializes the task status and saves it, for example (Redis):
  `hset("{storage_name}:{uuid}", mapping=task_status.__dict__)`
- The Broker adds a reference to the task to its own queue:
  `rpush(queue_name, "{task_name}:{uuid}:{priority}")`
- The Broker immediately returns the `Task` schema with the status `new` to the client.
At this stage, the task is considered registered but not yet processed.
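The sketch below shows roughly what this registration stage amounts to for a Redis-backed setup, using the redis-py client directly; the key layout mirrors the commands above, while the schema class and function signature are illustrative rather than the actual QTasks API.

```python
import uuid
from dataclasses import dataclass, asdict

import redis  # assumed client; QTasks wraps these calls in its Broker/Storage components


@dataclass
class TaskStatusNewSchema:  # illustrative stand-in for the real status schema
    status: str
    task_name: str
    priority: int


def add_task(r: redis.Redis, storage_name: str, queue_name: str,
             task_name: str, priority: int = 0) -> str:
    """Register a task: persist its status, enqueue a reference, return its UUID."""
    task_uuid = str(uuid.uuid4())
    task_status = TaskStatusNewSchema(status="new", task_name=task_name, priority=priority)
    # Storage side: serialize the status into a hash keyed by storage name and UUID.
    r.hset(f"{storage_name}:{task_uuid}", mapping=asdict(task_status))
    # Broker side: push a reference to the task onto the broker's own queue.
    r.rpush(queue_name, f"{task_name}:{task_uuid}:{priority}")
    return task_uuid
```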
Receiving a task by the server¶
A broker operating in server mode constantly polls the task queue.
- The broker executes `lpop(queue_name)`.
  - If the queue is empty, the broker suspends execution for `default_sleep` seconds (default `0.01`).
- Upon receiving a task, the broker notifies the storage about the start of processing according to the `add_process` contract.
- The Storage adds the task to the queue of tasks to be executed, for example:
  `zadd(queue_process, {task_data: priority})`
- The Broker requests the complete task data from the Storage using the `get` contract.
- The received data is transferred to the Worker via the `add` contract.
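A rough sketch of this polling loop against Redis, again with redis-py; `default_sleep`, `queue_process`, and the contract names follow the steps above, while the `worker.add()` signature and the key parsing are assumptions.

```python
import time

import redis  # assumed client


def poll_queue(r: redis.Redis, queue_name: str, queue_process: str,
               storage_name: str, worker, default_sleep: float = 0.01) -> None:
    """Server-side loop: pop task references and hand complete task data to the Worker."""
    while True:
        ref = r.lpop(queue_name)
        if ref is None:
            time.sleep(default_sleep)  # queue is empty: back off briefly
            continue
        task_name, task_uuid, priority = ref.decode().rsplit(":", 2)
        # add_process contract: record the task in the queue of tasks being executed.
        r.zadd(queue_process, {ref: int(priority)})
        # get contract: fetch the complete task data from the Storage.
        task_data = r.hgetall(f"{storage_name}:{task_uuid}")
        # add contract: hand the task over to the Worker (signature is illustrative).
        worker.add(task_uuid, task_name, int(priority), task_data)
```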
Execution queue in Worker¶
- The Worker converts the received data into `TaskPrioritySchema`.
- The schema is added to the internal execution queue `self.queue`.
- The `new` task status is immediately generated and returned.
- Subworkers running inside the Worker listen to one common queue.
- Tasks are captured on a first-come, first-served basis using `Condition` and `Semaphore(max_tasks_process)`.
Thus, parallelism and asynchrony are limited and controlled at the Worker level.
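A minimal asyncio sketch of this capture scheme; `TaskPrioritySchema`, `self.queue`, `Condition`, `Semaphore`, and `max_tasks_process` come from the description above, while the field names and the subworker loop are illustrative.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass(order=True)
class TaskPrioritySchema:  # illustrative fields; ordering is by priority only
    priority: int
    uuid: str = field(compare=False)
    name: str = field(compare=False)


class Worker:
    def __init__(self, max_tasks_process: int = 10) -> None:
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self.condition = asyncio.Condition()                   # wakes idle subworkers
        self.semaphore = asyncio.Semaphore(max_tasks_process)  # caps concurrent executions

    async def add(self, task: TaskPrioritySchema) -> None:
        await self.queue.put(task)
        async with self.condition:
            self.condition.notify()                            # let one subworker take it

    async def subworker(self) -> None:
        while True:
            async with self.condition:
                await self.condition.wait_for(lambda: not self.queue.empty())
                task = self.queue.get_nowait()                 # pop while holding the lock
            async with self.semaphore:                         # first come, first served
                await self.execute(task)

    async def execute(self, task: TaskPrioritySchema) -> None:
        ...  # delegated to TaskExecutor in the real flow
```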
Task transition to execution state¶
- Before the actual launch, the Worker forms `TaskStatusProcessSchema`.
- The status is transferred to the Broker via the `update` contract.
- The Broker transmits the update to the Storage:
  `hset("{storage_name}:{uuid}", mapping=asdict(model))`
At this stage, the task is officially considered to be in progress.
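A short sketch of this update path, assuming a dataclass-based schema and redis-py; the schema fields shown are illustrative.

```python
import time
from dataclasses import dataclass, asdict

import redis  # assumed client


@dataclass
class TaskStatusProcessSchema:  # illustrative fields
    status: str
    worker_name: str
    started_at: float


def update(r: redis.Redis, storage_name: str, task_uuid: str, worker_name: str) -> None:
    """`update` contract: persist the in-progress status before the task is launched."""
    model = TaskStatusProcessSchema(status="process", worker_name=worker_name,
                                    started_at=time.time())
    r.hset(f"{storage_name}:{task_uuid}", mapping=asdict(model))
```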
Task function execution¶
- The Worker retrieves the task description from `app._tasks` in the form of `TaskExecSchema`.
- This schema is attached to the task data.
- The Worker passes the task to `TaskExecutor` via the `execute()` contract.
Sequence inside TaskExecutor:
- call `execute_middlewares_before()`;
- call `before_execute()`;
- execute `run_task()`;
- for a generator task, delegate to `run_task_gen()`;
- call `after_execute()`;
- call `execute_middlewares_after()`;
- apply `decode()` (enabled by default).
TaskExecutor returns either the result or an exception to the Worker.
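The skeleton below illustrates this call order; the hook and method names follow the list above, while the class layout, attributes, and generator handling are assumptions rather than the actual QTasks implementation.

```python
import inspect


class TaskExecutor:  # illustrative skeleton of the execution pipeline
    def __init__(self, task, middlewares=(), decode_enabled=True):
        self.task = task                      # task data with the attached TaskExecSchema
        self.middlewares = middlewares
        self.decode_enabled = decode_enabled

    def execute(self):
        self.execute_middlewares_before()
        self.before_execute()
        result = self.run_task()
        self.after_execute()
        self.execute_middlewares_after()
        # decode() is applied last, if enabled (it is by default).
        return self.decode(result) if self.decode_enabled else result

    def run_task(self):
        func, args, kwargs = self.task.func, self.task.args, self.task.kwargs
        if inspect.isgeneratorfunction(func):
            return self.run_task_gen(func, *args, **kwargs)  # generator tasks take a separate path
        return func(*args, **kwargs)

    def run_task_gen(self, func, *args, **kwargs):
        return list(func(*args, **kwargs))                   # illustrative: drain the generator

    def execute_middlewares_before(self):
        for mw in self.middlewares:
            mw.before(self.task)              # middleware interface is an assumption

    def execute_middlewares_after(self):
        for mw in self.middlewares:
            mw.after(self.task)

    def before_execute(self):
        pass                                  # pre-execution hook

    def after_execute(self):
        pass                                  # post-execution hook

    def decode(self, result):
        return result                         # placeholder for result decoding
```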
Task completion¶
Depending on the result, the Worker forms one of the final schemas:
- `TaskStatusSuccessSchema`;
- `TaskStatusErrorSchema`;
- `TaskStatusCancelSchema`.

The result is then passed to `remove_finished_task()`:

- The Worker sends the data to the Broker according to the `remove_finished_task()` contract.
- The Broker transfers it to the Storage.

The Storage performs two actions:

- `hset("{storage_name}:{uuid}", mapping=model.__dict__)`
- `zrem(queue_process, "{task_name}:{uuid}:{priority}")`
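A sketch of the completion path under the same redis-py assumptions; the schema fields and the use of a pipeline are illustrative.

```python
from dataclasses import dataclass

import redis  # assumed client


@dataclass
class TaskStatusSuccessSchema:  # illustrative fields
    status: str
    returning: str


def remove_finished_task(r: redis.Redis, storage_name: str, queue_process: str,
                         task_name: str, task_uuid: str, priority: int, result: str) -> None:
    """Persist the final status and drop the task from the in-progress queue."""
    model = TaskStatusSuccessSchema(status="success", returning=result)
    pipe = r.pipeline()
    # Action 1: overwrite the stored status with the final schema.
    pipe.hset(f"{storage_name}:{task_uuid}", mapping=model.__dict__)
    # Action 2: remove the task reference from the queue of tasks being executed.
    pipe.zrem(queue_process, f"{task_name}:{task_uuid}:{priority}")
    pipe.execute()
```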
Redis command queue¶
When using Redis, write operations are performed via (A)syncRedisCommandQueue.
- Commands are added to the internal execution queue.
- If the queue does not exist, it is created automatically.
- The queue is executed sequentially.
- After the last command is executed, the queue is automatically closed.
This mechanism reduces overhead and keeps write operations ordered.
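A minimal asyncio sketch of such a command queue; the class name mirrors `AsyncRedisCommandQueue` from the description above, but the interface and the auto-create/auto-close handling shown here are assumptions.

```python
import asyncio

from redis.asyncio import Redis  # assumed client


class AsyncRedisCommandQueue:  # illustrative sketch, not the actual QTasks class
    def __init__(self, redis: Redis) -> None:
        self.redis = redis
        self.queue = None                          # created lazily on the first command
        self._lock = asyncio.Lock()

    async def add(self, command: str, *args, **kwargs) -> None:
        async with self._lock:
            if self.queue is None:                 # the queue is created automatically
                self.queue = asyncio.Queue()
                asyncio.create_task(self._drain())
            self.queue.put_nowait((command, args, kwargs))

    async def _drain(self) -> None:
        while True:
            async with self._lock:
                if self.queue.empty():             # last command executed: close the queue
                    self.queue = None
                    return
                command, args, kwargs = self.queue.get_nowait()
            # Commands run strictly one after another.
            await getattr(self.redis, command)(*args, **kwargs)
```

A caller would then issue writes as, for instance, `await cmd_queue.add("hset", key, mapping=data)` (usage shown for illustration only).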
Getting the task result¶
The task status is obtained through the Storage.
- According to the `get()` contract, data is retrieved and converted to `Task`.
- If `timeout` is specified in `add_task()`, `(A)syncResult` is used.
- `(A)syncResult` periodically queries the Storage at intervals of `result_time_interval`.
- Waiting is implemented via:
  - `ThreadPoolExecutor(max_workers=1)` for sync mode;
  - `asyncio.wait_for()` for async mode.
- If the task status is not included in `result_statuses_end` (default: `SUCCESS`, `ERROR`, `CANCEL`), the wait continues.
After reaching the final status, the result is returned to the client.
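A sketch of the async waiting path; `result_time_interval`, `result_statuses_end`, and `asyncio.wait_for()` come from the description above, while the storage call and the status spellings are illustrative.

```python
import asyncio

RESULT_STATUSES_END = {"success", "error", "cancel"}   # illustrative spellings


async def wait_result(storage, task_uuid: str, timeout: float,
                      result_time_interval: float = 0.1) -> dict:
    """Poll the Storage until the task reaches a final status or the timeout expires."""

    async def poll() -> dict:
        while True:
            task = await storage.get(uuid=task_uuid)      # `get()` contract (assumed signature)
            if task and task["status"] in RESULT_STATUSES_END:
                return task
            await asyncio.sleep(result_time_interval)     # not final yet: keep waiting

    return await asyncio.wait_for(poll(), timeout=timeout)
```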
Architectural invariants¶
- The task is represented by data schemas at all stages, not by executable objects.
- Components interact exclusively through contracts.
- The broker does not execute tasks and does not know their internal logic.
- The worker does not manage data storage.
- The storage does not contain execution logic.
These invariants ensure predictable behavior and the ability to replace any component without changing the others.