QueueTasks Components¶
The framework is built on a component-based architecture: each element of the system is responsible for its own part of the work and can be replaced, extended, or redefined. This page provides an overview of all the main and additional components, without going into depth on the internal logic.
Main components¶
QueueTasks¶
The central object of the framework. It manages task registration, configures the execution environment, and connects the other components.
When a QueueTasks() instance is created, the following components are created automatically:
- Broker (Redis by default)
- Storage (Redis)
- GlobalConfig (Redis)
- Worker (SyncThreadWorker or AsyncWorker)
- Starter (SyncStarter or AsyncStarter)
from qtasks import QueueTasks

app = QueueTasks()


@app.task(name="test")
def sample_task(id: int):
    return f"User {id} recorded"
QueueTasks accepts connection URLs (broker_url, storage_url) as well as fully custom components.
Broker — incoming task handler¶
Responsible for receiving tasks and passing them to the worker. By default, a Redis broker is used.
from qtasks.asyncio import QueueTasks
from qtasks.brokers import AsyncRedisBroker
broker = AsyncRedisBroker(url="redis://localhost:6379/2")
app = QueueTasks(broker=broker)
The broker uses Storage and, if necessary, GlobalConfig.
Worker — task executor¶
Executes tasks received from the broker. Supports synchronous (SyncThreadWorker)
and asynchronous (AsyncWorker) modes.
from qtasks.asyncio import QueueTasks
from qtasks.workers import AsyncWorker
from qtasks.brokers import AsyncRedisBroker
broker = AsyncRedisBroker(url="redis://localhost:6379/2")
worker = AsyncWorker(broker=broker)
app = QueueTasks(broker=broker, worker=worker)
The worker interacts with the task context, retry logic, middleware, and plugins.
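The hand-off between broker and worker can be pictured with a small standard-library model. This is a conceptual sketch only, not QTasks code: `ToyBroker`, `ToyWorker`, and their methods are hypothetical names, and an in-memory `queue.Queue` stands in for Redis.

```python
import queue
import threading


class ToyBroker:
    """Hypothetical broker: receives tasks and queues them for a worker."""

    def __init__(self):
        self.tasks = queue.Queue()

    def add(self, func, *args):
        self.tasks.put((func, args))


class ToyWorker:
    """Hypothetical worker: executes whatever the broker hands over."""

    def __init__(self, broker):
        self.broker = broker
        self.results = []

    def run(self):
        # Process queued tasks until the stop marker (None) arrives.
        while True:
            item = self.broker.tasks.get()
            if item is None:
                break
            func, args = item
            self.results.append(func(*args))


def sample_task(id: int) -> str:
    return f"User {id} recorded"


broker = ToyBroker()
worker = ToyWorker(broker)
thread = threading.Thread(target=worker.run)
thread.start()

broker.add(sample_task, 1)
broker.add(sample_task, 2)
broker.tasks.put(None)  # stop marker
thread.join()

print(worker.results)
```

The real worker adds task context, retries, middleware, and plugins around the call that this sketch reduces to `func(*args)`.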
Storage — task data storage¶
Stores information about tasks:
- status,
- result,
- errors,
- execution time.
Storage is a mandatory component, but it can be replaced. The broker holds a reference to the storage.
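The fields listed above can be pictured as one record per task. The following is a minimal sketch under stated assumptions: `TaskRecord`, `run_and_record`, and the status strings are hypothetical, and a plain dict takes the place of Redis. The schema QTasks actually uses may differ.

```python
import time
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskRecord:
    """Hypothetical per-task record: status, result, error, execution time."""
    status: str = "new"
    result: Any = None
    error: Optional[str] = None
    execution_time: float = 0.0


def run_and_record(store, task_id, func, *args):
    """Run func and save the outcome in store under task_id."""
    record = TaskRecord(status="process")
    store[task_id] = record
    started = time.perf_counter()
    try:
        record.result = func(*args)
        record.status = "success"
    except Exception as exc:
        record.error = repr(exc)
        record.status = "error"
    record.execution_time = time.perf_counter() - started
    return record


store = {}
ok = run_and_record(store, "t1", lambda x: x * 2, 21)
failed = run_and_record(store, "t2", lambda: 1 / 0)
print(ok.status, ok.result)
print(failed.status, failed.error)
```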
```py
from qtasks.asyncio import QueueTasks
from qtasks.storages import AsyncRedisStorage
from qtasks.brokers import AsyncRedisBroker

storage = AsyncRedisStorage(url="redis://localhost:6379/2")
broker = AsyncRedisBroker(url="redis://localhost:6379/2", storage=storage)
app = QueueTasks(broker=broker)
```
The storage also holds a reference to GlobalConfig.
Starter — controls the launch of components¶
Starter is responsible for launching and stopping all components. QueueTasks uses Starter by default.
Starter can also control the execution flow of the components.
from qtasks.asyncio import QueueTasks
from qtasks.starters import AsyncStarter
starter = AsyncStarter(name="QueueTasks")
app = QueueTasks()
if __name__ == "__main__":
    app.run_forever(starter=starter)
Additional components¶
GlobalConfig — global configuration¶
Stores global variables and settings that are available to all components. It is used, for example, by WebView: the interface can connect to Redis without launching the application.
from qtasks.asyncio import QueueTasks
from qtasks.configs import AsyncRedisGlobalConfig
config = AsyncRedisGlobalConfig(url="redis://localhost:6379/2")
app = QueueTasks(global_config=config)
GlobalConfig is available as:
app.broker.storage.global_config
and can be None.
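The access path above implies a chain of references: the application holds the broker, the broker holds the storage, and the storage holds the optional config. The sketch below uses hypothetical stand-in classes (`StubApp`, `StubBroker`, `StubStorage`, `StubGlobalConfig`), not the QTasks classes, to show one way of reading a setting through that chain when the config may be None.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubGlobalConfig:
    values: dict


@dataclass
class StubStorage:
    global_config: Optional[StubGlobalConfig] = None


@dataclass
class StubBroker:
    storage: StubStorage


@dataclass
class StubApp:
    broker: StubBroker


def get_setting(app, key, default=None):
    """Follow app -> broker -> storage -> global_config, tolerating None."""
    config = app.broker.storage.global_config
    if config is None:
        return default
    return config.values.get(key, default)


with_config = StubApp(StubBroker(StubStorage(StubGlobalConfig({"status": "running"}))))
without_config = StubApp(StubBroker(StubStorage()))

print(get_setting(with_config, "status"))
print(get_setting(without_config, "status", "unknown"))
```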
Plugins — extending functionality¶
Plugins allow you to connect any additional logic: logging, modification of task arguments, integrations.
All available triggers are described on the Triggers page.
Example:
from qtasks import QueueTasks
from qtasks.plugins import BasePlugin

app = QueueTasks()


class TestPlugin(BasePlugin):
    def __init__(self):
        super().__init__()
        self.handlers = {
            "task_executor_args_replace": self.replace
        }

    def replace(self, **kwargs):
        print("ARGS:", kwargs)
        return None


app.add_plugin(
    TestPlugin(),
    trigger_names=["task_executor_args_replace"],
    component="worker")
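To make the trigger mechanism concrete, here is a framework-free sketch of how a component might dispatch a trigger to its registered handlers. Everything in it is an assumption made for illustration: `ToyComponent` and its `add_plugin` and `fire` methods are hypothetical, and the convention that a handler returning `None` leaves the arguments unchanged is inferred from the `return None` in the example above, not taken from the QTasks source.

```python
from collections import defaultdict


class ToyComponent:
    """Hypothetical component that keeps handlers grouped by trigger name."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def add_plugin(self, handler, trigger_names):
        for name in trigger_names:
            self._handlers[name].append(handler)

    def fire(self, trigger_name, **kwargs):
        # Call each handler in registration order; a non-None return
        # value replaces the keyword arguments seen by later handlers.
        for handler in self._handlers[trigger_name]:
            replaced = handler(**kwargs)
            if replaced is not None:
                kwargs = replaced
        return kwargs


def observe(**kwargs):
    print("ARGS:", kwargs)
    return None  # leave the arguments untouched


def double_id(**kwargs):
    return {**kwargs, "id": kwargs["id"] * 2}


worker = ToyComponent()
worker.add_plugin(observe, ["task_executor_args_replace"])
worker.add_plugin(double_id, ["task_executor_args_replace"])

final_kwargs = worker.fire("task_executor_args_replace", id=21)
unchanged = worker.fire("unknown_trigger", id=1)
```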
Timer — running tasks on a schedule¶
A separate component that allows you to use cron-like schedules.
Uses APScheduler (CronTrigger).
from qtasks import QueueTasks
from qtasks.timers import AsyncTimer
from apscheduler.triggers.cron import CronTrigger

app = QueueTasks()


@app.task
def test():
    print("Running test task")


trigger = CronTrigger(second="*/10")  # every 10 seconds
timer = AsyncTimer(app=app)
timer.add_task("test", trigger=trigger)
timer.run_forever()
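The expression `second="*/10"` is a step expression: it matches every tenth value of the field, counting from the field's minimum, so the task fires at seconds 0, 10, 20, 30, 40, and 50 of each minute. The helper below is a standalone illustration of that rule; `expand_step` is a hypothetical function, not part of APScheduler or QTasks.

```python
from typing import List


def expand_step(expr: str, minimum: int = 0, maximum: int = 59) -> List[int]:
    """Expand a '*/N' step expression into the field values it matches."""
    if not expr.startswith("*/"):
        raise ValueError(f"not a step expression: {expr!r}")
    step = int(expr[2:])
    if step <= 0:
        raise ValueError("step must be a positive integer")
    return list(range(minimum, maximum + 1, step))


print(expand_step("*/10"))
```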
WebView — visual interface¶
A separate library for viewing the list of tasks, results, and statistics. Installation:
pip install qtasks_webview
WebView works directly with Redis and does not require the application to be running.
Complete example of manual assembly of all components¶
import asyncio
from qtasks.asyncio import QueueTasks
from qtasks.configs import AsyncRedisGlobalConfig
from qtasks.storages import AsyncRedisStorage
from qtasks.brokers import AsyncRedisBroker
from qtasks.workers import AsyncWorker
from qtasks.starters import AsyncStarter

# GlobalConfig: global variables and settings
global_config = AsyncRedisGlobalConfig(
    name="QueueTasks",
    url="redis://localhost:6379/2"
)

# Storage: task storage
storage = AsyncRedisStorage(
    name="QueueTasks",
    global_config=global_config,
    url="redis://localhost:6379/2"
)

# Broker: incoming task handler
broker = AsyncRedisBroker(
    name="QueueTasks",
    storage=storage,
    url="redis://localhost:6379/2"
)

# Worker: task executor
worker = AsyncWorker(
    name="QueueTasks",
    broker=broker
)

# QueueTasks: main object
app = QueueTasks(
    name="QueueTasks",
    broker=broker,
    worker=worker
)

# Application settings
app.config.max_tasks_process = 10
app.config.running_older_tasks = True
app.config.delete_finished_tasks = True


@app.task(name="test")
async def sample_task(id: int):
    result = f"User {id} recorded"
    await asyncio.sleep(id)
    return result


if __name__ == "__main__":
    starter = AsyncStarter(
        name="QueueTasks",
        worker=worker,
        broker=broker
    )
    app.run_forever(starter=starter)