Skip to content

SyncContext

Sync context for tasks.

SyncContext

Context associated with synchronous tasks.

Example

from qtasks import QueueTasks
from qtasks.registries import SyncTask

app = QueueTasks()

@app.task(echo=True)
def my_task(self: SyncTask):
    self.ctx # SyncContext
Source code in src/qtasks/contexts/sync_context.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class SyncContext:
    """
    Context associated with synchronous tasks.

    ## Example

    ```python
    from qtasks import QueueTasks
    from qtasks.registries import SyncTask

    app = QueueTasks()

    @app.task(echo=True)
    def my_task(self: SyncTask):
        self.ctx # SyncContext
    ```
    """

    def __init__(self, **kwargs):
        """
        Initializes the context.

        Args:
            **kwargs: Optional `task_name`, `task_uuid`, `generate_handler`,
                `app` and `log` values. When `app` or `log` is omitted, the
                value comes from `_update_app()` / `_update_logger()`.
        """
        self.task_name = kwargs.get("task_name")
        """Task name."""

        self.task_uuid: UUID | str | None = kwargs.get("task_uuid")
        """Task UUID."""

        self.generate_handler = kwargs.get("generate_handler")
        """Generator function for creating tasks."""

        # Resolve the fallbacks lazily: a `dict.get` default is evaluated even
        # when the key is present, which would import `qtasks._state` and build
        # a logger that is thrown away whenever the caller supplies its own.
        # An explicitly passed value (including None) is still used as-is.
        self._app: QueueTasks = (
            kwargs["app"] if "app" in kwargs else self._update_app()
        )
        """The application the task belongs to."""

        # Must run after `self._app` is set: `_update_logger` reads it.
        self._log: Logger = (
            kwargs["log"] if "log" in kwargs else self._update_logger()
        )
        """Logger."""

        self._metadata: Task | None = None
        """Task metadata."""

    def get_logger(self, name: str | None = None) -> Logger:
        """
        Returns a logger for the current context.

        Args:
            name (str|None): Logger name. If not specified, the task name is used.

        Returns:
            Logger: Logger for the current context.
        """
        # NOTE(review): the derived logger replaces `self._log`, so the next
        # call derives from it rather than from the initial logger. Whether
        # sub-names accumulate depends on `with_subname` — confirm.
        self._log = self._log.with_subname(name or self.task_name or "SyncContext")
        return self._log

    def get_config(self) -> QueueConfig:
        """
        Returns the application configuration.

        Returns:
            QueueConfig: Application configuration.
        """
        return self._app.config

    def get_metadata(self, cache=True) -> Union[Task, None]:
        """
        Returns task metadata.

        Args:
            cache (bool): Use cached metadata. When False, the metadata is
                fetched from the application and the cache is left untouched.

        Returns:
            Task|None: Task metadata or None if not found.

        Raises:
            ValueError: Task UUID is not set.
        """
        if not self.task_uuid:
            raise ValueError("Task UUID is not set.")

        if not cache:
            return self._app.get(self.task_uuid)

        # Compare against None explicitly so that a falsy-but-present result
        # is cached instead of being fetched again on every call.
        if self._metadata is None:
            self._metadata = self._app.get(self.task_uuid)
        return self._metadata

    def get_task(self, uuid: UUID | str) -> Union[Task, None]:
        """
        Returns the task by UUID.

        Args:
            uuid (UUID|str): UUID of the task.

        Returns:
            Task|None: Task or None if not found.
        """
        return self._app.get(uuid)

    def sleep(self, seconds: float) -> None:
        """
        Pauses execution for the specified number of seconds.

        Args:
            seconds (float): Number of seconds to pause.
        """
        time.sleep(seconds)

    def cancel(self, reason: str = "") -> NoReturn:
        """
        Cancels the task.

        Args:
            reason (str): Reason for canceling the task. When empty,
                "<task_name>.cancel" is used as the message.

        Raises:
            TaskCancelError: The exception thrown when a task is canceled.
        """
        raise TaskCancelError(reason or f"{self.task_name}.cancel")

    def plugin_error(self, **kwargs) -> NoReturn:
        """
        Raises a plugin trigger error.

        Args:
            **kwargs: Arguments to pass to the plugin error handler.

        Raises:
            TaskPluginTriggerError: Always raised, built from `kwargs`.
        """
        raise TaskPluginTriggerError(**kwargs)

    def get_component(self, name: str):
        """
        Returns the application component by name.

        Args:
            name (str): Component name.

        Returns:
            Any: Application component or None if not found.
        """
        return getattr(self._app, name, None)

    def _update_app(self):
        """
        Returns the main application stored in `qtasks._state`.

        Returns:
            The value of `qtasks._state.app_main`.
        """
        # Imported here rather than at module level, as in the original code.
        import qtasks._state

        return qtasks._state.app_main  # type: ignore

    def _update_logger(self) -> Logger:
        """
        Builds the logger for the current context.

        Uses the application's logger when the application and its `log` are
        set; otherwise falls back to `qtasks._state.log_main`. The sub-name is
        the task name, or "SyncContext" when no task name is set.

        Returns:
            Logger: Logger derived through `with_subname`.
        """
        subname = self.task_name or "SyncContext"
        if self._app and self._app.log:
            return self._app.log.with_subname(subname)

        import qtasks._state

        return qtasks._state.log_main.with_subname(subname)

    def _update(self, **kwargs) -> None:
        """
        Updates context attributes.

        Args:
            kwargs (dict, optional): New context attribute values.
        """
        for name, value in kwargs.items():
            setattr(self, name, value)

generate_handler = kwargs.get('generate_handler') instance-attribute

Generator function for creating tasks.

task_name = kwargs.get('task_name') instance-attribute

Task name.

task_uuid = kwargs.get('task_uuid') instance-attribute

Task UUID.

__init__(**kwargs)

Initializes the context.

Source code in src/qtasks/contexts/sync_context.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, **kwargs):
    """
    Initializes the context.

    Args:
        **kwargs: Optional `task_name`, `task_uuid`, `generate_handler`,
            `app` and `log` values. When `app` or `log` is omitted, the
            value comes from `_update_app()` / `_update_logger()`.
    """
    self.task_name = kwargs.get("task_name")
    """Task name."""

    self.task_uuid: UUID | str | None = kwargs.get("task_uuid")
    """Task UUID."""

    self.generate_handler = kwargs.get("generate_handler")
    """Generator function for creating tasks."""

    # Resolve the fallbacks lazily: a `dict.get` default is evaluated even
    # when the key is present, so the fallback would run on every
    # construction. An explicitly passed value (including None) is kept.
    self._app: QueueTasks = (
        kwargs["app"] if "app" in kwargs else self._update_app()
    )
    """The application the task belongs to."""

    # Runs after `self._app` is assigned, preserving the original order.
    self._log: Logger = (
        kwargs["log"] if "log" in kwargs else self._update_logger()
    )
    """Logger."""

    self._metadata: Task | None = None
    """Task metadata."""

cancel(reason='')

Cancels the task.

Parameters:

Name Type Description Default
reason str

Reason for canceling the task.

''

Raises:

Type Description
TaskCancelError

The exception thrown when a task is canceled.

Source code in src/qtasks/contexts/sync_context.py
122
123
124
125
126
127
128
129
130
131
132
def cancel(self, reason: str = "") -> NoReturn:
    """
    Aborts the task by raising a cancellation error.

    Args:
        reason (str): Why the task is being canceled. When empty, the
            message falls back to "<task_name>.cancel".

    Raises:
        TaskCancelError: Always raised, carrying the chosen message.
    """
    if reason:
        message = reason
    else:
        message = f"{self.task_name}.cancel"
    raise TaskCancelError(message)

get_component(name)

Returns the application component by name.

Parameters:

Name Type Description Default
name str

Component name.

required

Returns:

Name Type Description
Any

Application component or None if not found.

Source code in src/qtasks/contexts/sync_context.py
143
144
145
146
147
148
149
150
151
152
153
def get_component(self, name: str):
    """
    Looks up an attribute of the application by name.

    Args:
        name (str): Name of the component to fetch.

    Returns:
        Any: The matching application attribute, or None when the
        application has no attribute with that name.
    """
    application = self._app
    component = getattr(application, name, None)
    return component

get_config()

Returns the application configuration.

Returns:

Name Type Description
QueueConfig QueueConfig

Application configuration.

Source code in src/qtasks/contexts/sync_context.py
69
70
71
72
73
74
75
76
def get_config(self) -> QueueConfig:
    """
    Gives access to the configuration held by the application.

    Returns:
        QueueConfig: The `config` attribute of the application.
    """
    application = self._app
    return application.config

get_logger(name=None)

Returns a logger for the current context.

Parameters:

Name Type Description Default
name str | None

Logger name. If not specified, the task name is used.

None

Returns:

Name Type Description
Logger Logger

Logger for the current context.

Source code in src/qtasks/contexts/sync_context.py
56
57
58
59
60
61
62
63
64
65
66
67
def get_logger(self, name: str | None = None) -> Logger:
    """
    Derives a named logger for this context and stores it on the context.

    Args:
        name (str|None): Sub-name for the logger. Falls back to the task
            name, then to "SyncContext".

    Returns:
        Logger: The logger produced by `with_subname`, which also becomes
        the context's current logger.
    """
    subname = name or self.task_name or "SyncContext"
    self._log = self._log.with_subname(subname)
    return self._log

get_metadata(cache=True)

Returns task metadata.

Parameters:

Name Type Description Default
cache bool

Use cached metadata.

True

Returns:

Type Description
Union[Task, None]

Task|None: Task metadata or None if not found.

Raises:

Type Description
ValueError

Task UUID is not set.

Source code in src/qtasks/contexts/sync_context.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_metadata(self, cache=True) -> Union[Task, None]:
    """
    Returns task metadata.

    Args:
        cache (bool): Use cached metadata. When False, the metadata is
            fetched from the application and the cache is left untouched.

    Returns:
        Task|None: Task metadata or None if not found.

    Raises:
        ValueError: Task UUID is not set.
    """
    if not self.task_uuid:
        raise ValueError("Task UUID is not set.")

    if not cache:
        return self._app.get(self.task_uuid)

    # Compare against None explicitly so that a falsy-but-present result is
    # cached instead of being fetched again on every call.
    if self._metadata is None:
        self._metadata = self._app.get(self.task_uuid)
    return self._metadata

get_task(uuid)

Returns the task by UUID.

Parameters:

Name Type Description Default
uuid UUID | str

UUID of the task.

required

Returns:

Type Description
Union[Task, None]

Task|None: Task or None if not found.

Source code in src/qtasks/contexts/sync_context.py
100
101
102
103
104
105
106
107
108
109
110
def get_task(self, uuid: UUID | str) -> Union[Task, None]:
    """
    Fetches a task from the application by its UUID.

    Args:
        uuid (UUID|str): Identifier of the task to look up.

    Returns:
        Task|None: Whatever the application's `get` returns for this UUID.
    """
    application = self._app
    found = application.get(uuid)
    return found

plugin_error(**kwargs)

Raises a plugin error.

Parameters:

Name Type Description Default
**kwargs

Arguments to pass to the plugin error handler.

{}
Source code in src/qtasks/contexts/sync_context.py
134
135
136
137
138
139
140
141
def plugin_error(self, **kwargs):
    """
    Raises a plugin trigger error built from the given arguments.

    Args:
        **kwargs: Keyword arguments forwarded to `TaskPluginTriggerError`.

    Raises:
        TaskPluginTriggerError: Always raised.
    """
    error = TaskPluginTriggerError(**kwargs)
    raise error

sleep(seconds)

Pauses execution for the specified number of seconds.

Parameters:

Name Type Description Default
seconds float

Number of seconds to pause.

required
Source code in src/qtasks/contexts/sync_context.py
112
113
114
115
116
117
118
119
120
def sleep(self, seconds: float) -> None:
    """
    Blocks the current thread for the given duration.

    Args:
        seconds (float): How long to pause, in seconds.
    """
    duration = seconds
    time.sleep(duration)