Skip to content

AsyncContext

Async context for tasks.

AsyncContext

Context associated with asynchronous tasks.

Example

from qtasks import QueueTasks
from qtasks.registries import AsyncTask

app = QueueTasks()

@app.task(echo=True)
async def my_task(self: AsyncTask):
    self.ctx #AsyncContext
Source code in src/qtasks/contexts/async_context.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class AsyncContext:
    """
    Context associated with asynchronous tasks.

    ## Example

    ```python
    from qtasks import QueueTasks
    from qtasks.registries import AsyncTask

    app = QueueTasks()

    @app.task(echo=True)
    async def my_task(self: AsyncTask):
        self.ctx #AsyncContext
    ```
    """

    def __init__(self, **kwargs):
        """
        Initializing the context.

        Args:
            **kwargs: Optional `task_name`, `task_uuid`, `generate_handler`,
                `app` and `log` values. When `app` or `log` is omitted, the
                value is resolved through `_update_app()` / `_update_logger()`.
        """
        self.task_name = kwargs.get("task_name")
        """Task name."""

        self.task_uuid: UUID | str | None = kwargs.get("task_uuid")
        """Task UUID."""

        self.generate_handler = kwargs.get("generate_handler")
        """Generator function for creating tasks."""

        # The fallbacks are resolved lazily: `kwargs.get(key, fallback())`
        # would run the fallback (module import, logger derivation) even when
        # the caller supplied the value explicitly.
        self._app: QueueTasks = (
            kwargs["app"] if "app" in kwargs else self._update_app()
        )
        """The application the task belongs to."""

        # `_update_logger()` reads `self._app`, so it must run after the
        # application has been assigned above.
        self._log: Logger = (
            kwargs["log"] if "log" in kwargs else self._update_logger()
        )
        """Logger."""

        self._metadata: Task | None = None
        """Task metadata."""

    def get_logger(self, name: str | None = None) -> Logger:
        """
        Returns a logger for the current context.

        The derived logger is also stored on the context, so a later call
        derives its logger from the one returned here.

        Args:
            name (str|None): Logger name. If not specified, the task name or `AsyncContext` is used.

        Returns:
            Logger: Logger for the current context.
        """
        self._log = self._log.with_subname(name or self.task_name or "AsyncContext")
        return self._log

    def get_config(self) -> QueueConfig:
        """
        Returns the application configuration.

        Returns:
            QueueConfig: Application configuration.
        """
        return self._app.config

    async def get_metadata(self, cache=True) -> Union[Task, None]:
        """
        Returns task metadata.

        Args:
            cache (bool): Use cached metadata. When False, the metadata is
                fetched again and the cached value is left untouched.

        Returns:
            Task|None: Task metadata or None if not found.

        Raises:
            ValueError: Task UUID is not set.
        """
        if not self.task_uuid:
            raise ValueError("Task UUID is not set.")

        if not cache:
            return await self._app.get(self.task_uuid)

        # Only a missing value (None) triggers a fetch; a "not found" result
        # is None as well, so it is looked up again on the next call.
        if self._metadata is None:
            self._metadata = await self._app.get(self.task_uuid)
        return self._metadata

    async def get_task(self, uuid: UUID | str) -> Union[Task, None]:
        """
        Returns the task by UUID.

        Args:
            uuid (UUID|str): UUID of the task.

        Returns:
            Task|None: Task or None if not found.
        """
        return await self._app.get(uuid)

    async def sleep(self, seconds: float) -> None:
        """
        Pauses execution for the specified number of seconds.

        Args:
            seconds (float): Number of seconds to pause.
        """
        await asyncio.sleep(seconds)

    def cancel(self, reason: str = "") -> NoReturn:
        """
        Cancels the task.

        Args:
            reason (str): Reason for canceling the task. If empty,
                `"<task_name>.cancel"` is used.

        Raises:
            TaskCancelError: The exception thrown when a task is canceled.
        """
        raise TaskCancelError(reason or f"{self.task_name}.cancel")

    def plugin_error(self, **kwargs) -> NoReturn:
        """
        Causes a plugin error.

        Args:
            **kwargs: Arguments to pass to the plugin error handler.

        Raises:
            TaskPluginTriggerError: Always raised, built from `kwargs`.
        """
        raise TaskPluginTriggerError(**kwargs)

    def get_component(self, name: str):
        """
        Returns the application component by name.

        Args:
            name (str): Component name.

        Returns:
            Any: Application component or None if not found.
        """
        return getattr(self._app, name, None)

    def _update_app(self):
        """
        Updates the application for the current context.

        Returns:
            The `app_main` object stored in `qtasks._state`.
        """
        # Imported here rather than at module level, as in the original code.
        import qtasks._state

        return qtasks._state.app_main  # type: ignore

    def _update_logger(self) -> Logger:
        """
        Builds the default logger for the current context.

        Uses the application logger when the application and its logger are
        set, otherwise `qtasks._state.log_main`.

        Returns:
            Logger: Logger sub-named after the task name or `AsyncContext`.
        """
        subname = self.task_name or "AsyncContext"
        if self._app and self._app.log:
            return self._app.log.with_subname(subname)

        import qtasks._state

        return qtasks._state.log_main.with_subname(subname)

    def _update(self, **kwargs):
        """
        Updates context attributes.

        Args:
            kwargs (dict, optional): New context attribute values.
        """
        for name, value in kwargs.items():
            setattr(self, name, value)

generate_handler = kwargs.get('generate_handler') instance-attribute

Generator function for creating tasks.

task_name = kwargs.get('task_name') instance-attribute

Task name.

task_uuid = kwargs.get('task_uuid') instance-attribute

Task UUID.

__init__(**kwargs)

Initializing the context.

Source code in src/qtasks/contexts/async_context.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, **kwargs):
    """
    Initializing the context.

    Args:
        **kwargs: Optional `task_name`, `task_uuid`, `generate_handler`,
            `app` and `log` values. When `app` or `log` is omitted, the
            value is resolved through `_update_app()` / `_update_logger()`.
    """
    self.task_name = kwargs.get("task_name")
    """Task name."""

    self.task_uuid: UUID | str | None = kwargs.get("task_uuid")
    """Task UUID."""

    self.generate_handler = kwargs.get("generate_handler")
    """Generator function for creating tasks."""

    # The fallbacks are resolved lazily: `kwargs.get(key, fallback())` would
    # run the fallback even when the caller supplied the value explicitly.
    self._app: QueueTasks = (
        kwargs["app"] if "app" in kwargs else self._update_app()
    )
    """The application the task belongs to."""

    # `_update_logger()` reads `self._app`, so it must run after the
    # application has been assigned above.
    self._log: Logger = (
        kwargs["log"] if "log" in kwargs else self._update_logger()
    )
    """Logger."""

    self._metadata: Task | None = None
    """Task metadata."""

cancel(reason='')

Cancels the task.

Parameters:

Name Type Description Default
reason str

Reason for canceling the task.

''

Raises:

Type Description
TaskCancelError

The exception thrown when a task is canceled.

Source code in src/qtasks/contexts/async_context.py
121
122
123
124
125
126
127
128
129
130
131
def cancel(self, reason: str = "") -> NoReturn:
    """
    Cancels the task by raising `TaskCancelError`.

    Args:
        reason (str): Reason for canceling the task. If empty,
            `"<task_name>.cancel"` is used as the message.

    Raises:
        TaskCancelError: The exception thrown when a task is canceled.
    """
    message = reason if reason else f"{self.task_name}.cancel"
    raise TaskCancelError(message)

get_component(name)

Returns the application component by name.

Parameters:

Name Type Description Default
name str

Component name.

required

Returns:

Name Type Description
Any

Application component or None if not found.

Source code in src/qtasks/contexts/async_context.py
142
143
144
145
146
147
148
149
150
151
152
def get_component(self, name: str):
    """
    Looks up an attribute of the application by name.

    Args:
        name (str): Component name.

    Returns:
        Any: Application component or None if not found.
    """
    application = self._app
    component = getattr(application, name, None)
    return component

get_config()

Returns the application configuration.

Returns:

Type Description
QueueConfig

Application configuration.

Source code in src/qtasks/contexts/async_context.py
69
70
71
72
73
74
75
76
def get_config(self) -> QueueConfig:
    """
    Returns the application configuration.

    Returns:
        QueueConfig: The `config` attribute of the application.
    """
    application = self._app
    return application.config

get_logger(name=None)

Returns a logger for the current context.

Parameters:

Name Type Description Default
name str | None

Logger name. If not specified, the task name or AsyncContext is used.

None

Returns:

Name Type Description
Logger Logger

Logger for the current context.

Source code in src/qtasks/contexts/async_context.py
56
57
58
59
60
61
62
63
64
65
66
67
def get_logger(self, name: str | None = None) -> Logger:
    """
    Derives a logger for the current context and stores it on the context.

    Args:
        name (str|None): Logger name. If not specified, the task name or `AsyncContext` is used.

    Returns:
        Logger: Logger for the current context.
    """
    subname = name or self.task_name or "AsyncContext"
    derived = self._log.with_subname(subname)
    # Keep the derived logger as the context logger.
    self._log = derived
    return derived

get_metadata(cache=True) async

Returns task metadata.

Parameters:

Name Type Description Default
cache bool

Use cached metadata.

True

Returns:

Type Description
Union[Task, None]

Task|None: Task metadata or None if not found.

Raises:

Type Description
ValueError

Task UUID is not set.

Source code in src/qtasks/contexts/async_context.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
async def get_metadata(self, cache=True) -> Union[Task, None]:
    """
    Returns task metadata.

    Args:
        cache (bool): Use cached metadata. When False, the metadata is
            fetched again and the cached value is left untouched.

    Returns:
        Task|None: Task metadata or None if not found.

    Raises:
        ValueError: Task UUID is not set.
    """
    if not self.task_uuid:
        raise ValueError("Task UUID is not set.")

    if not cache:
        return await self._app.get(self.task_uuid)

    # Only a missing value (None) triggers a fetch; testing truthiness would
    # re-fetch whenever the cached object happened to evaluate as false.
    if self._metadata is None:
        self._metadata = await self._app.get(self.task_uuid)
    return self._metadata

get_task(uuid) async

Returns the task by UUID.

Parameters:

Name Type Description Default
uuid UUID | str

UUID of the task.

required

Returns:

Type Description
Union[Task, None]

Task|None: Task or None if not found.

Source code in src/qtasks/contexts/async_context.py
100
101
102
103
104
105
106
107
108
109
110
async def get_task(self, uuid: UUID | str) -> Union[Task, None]:
    """
    Fetches a task from the application by its UUID.

    Args:
        uuid (UUID|str): UUID of the task.

    Returns:
        Task|None: Task or None if not found.
    """
    application = self._app
    found = await application.get(uuid)
    return found

plugin_error(**kwargs)

Causes a plugin error.

Parameters:

Name Type Description Default
**kwargs

Arguments to pass to the plugin error handler.

{}
Source code in src/qtasks/contexts/async_context.py
133
134
135
136
137
138
139
140
def plugin_error(self, **kwargs):
    """
    Raises a plugin trigger error built from the given arguments.

    Args:
        **kwargs: Arguments to pass to the plugin error handler.

    Raises:
        TaskPluginTriggerError: Always raised.
    """
    error = TaskPluginTriggerError(**kwargs)
    raise error

sleep(seconds) async

Pauses execution for the specified number of seconds.

Parameters:

Name Type Description Default
seconds float

Number of seconds to pause.

required
Source code in src/qtasks/contexts/async_context.py
112
113
114
115
116
117
118
119
async def sleep(self, seconds: float) -> None:
    """
    Suspends the current task for the given number of seconds.

    Args:
        seconds (float): Number of seconds to pause.
    """
    pause = asyncio.sleep(seconds)
    await pause