Перейти к содержанию

SyncContext

Sync context for tasks.

SyncContext

Контекст, связанный с синхронными задачами.

Пример

from qtasks import QueueTasks
from qtasks.registries import SyncTask

app = QueueTasks()

@app.task(echo=True)
async def my_task(self: SyncTask):
    self.ctx # SyncContext
Source code in src/qtasks/contexts/sync_context.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class SyncContext:
    """
    Контекст, связанный с синхронными задачами.

    ## Пример

    ```python
    from qtasks import QueueTasks
    from qtasks.registries import SyncTask

    app = QueueTasks()

    @app.task(echo=True)
    async def my_task(self: SyncTask):
        self.ctx # SyncContext
    ```
    """

    def __init__(self, **kwargs):
        """Инициализация контекста."""
        self.task_name = kwargs.get("task_name")
        """Имя задачи."""

        self.task_uuid = kwargs.get("task_uuid")
        """UUID задачи."""

        self.generate_handler = kwargs.get("generate_handler")
        """Функция-генератор для создания задач."""

        self._app: "QueueTasks" = kwargs.get("app")
        """Приложение, к которому принадлежит задача."""
        self._update_app()

        self._log: "Logger" = kwargs.get("log")
        """Логгер."""

        self._metadata: Union["Task", None] = None
        """Метаданные задачи."""

    def get_logger(self, name: Union[str, None] = None) -> "Logger":
        """Возвращает логгер для текущего контекста.

        Args:
            name (str|None): Имя логгера. Если не указано, используется имя задачи.

        Returns:
            Logger: Логгер для текущего контекста.
        """
        self._log = self._app.log.with_subname(name or self.task_name)
        return self._log

    def get_config(self) -> QueueConfig:
        """Возвращает конфигурацию приложения.

        Returns:
            QueueConfig: Конфигурация приложения.
        """
        return self._app.config

    def get_metadata(self, cache=True) -> Union["Task", None]:
        """Возвращает метаданные задачи.

        Args:
            cache (bool): Использовать кэшированные метаданные.

        Returns:
            Task|None: Метаданные задачи или None, если не найдены.
        """
        if cache:
            if not self._metadata:
                self._metadata = self._app.get(self.task_uuid)
            return self._metadata
        return self._app.get(self.task_uuid)

    def get_task(self, uuid: Union[UUID, str]) -> Union["Task", None]:
        """Возвращает задачу по UUID.

        Args:
            uuid (UUID|str): UUID задачи.

        Returns:
            Task|None: Задача или None, если не найдена.
        """
        return self._app.get(uuid)

    def sleep(self, seconds: float) -> None:
        """Приостанавливает выполнение на заданное количество секунд.

        Args:
            seconds (float): Количество секунд для приостановки.
        """
        time.sleep(seconds)
        return

    def cancel(self, reason: str = "") -> NoReturn:
        """Отменяет задачу.

        Args:
            reason (str): Причина отмены задачи.

        Raises:
            TaskCancelError: Исключение, вызываемое при отмене задачи.
        """
        raise TaskCancelError(reason or f"{self.task_name}.cancel")

    def plugin_error(self, **kwargs):
        """Вызывает ошибку плагина.

        Args:
            **kwargs: Аргументы для передачи в обработчик ошибки плагина.
        """
        raise TaskPluginTriggerError(**kwargs)

    def get_component(self, name: str):
        """Возвращает компонент приложения по имени.

        Args:
            name (str): Имя компонента.

        Returns:
            Any: Компонент приложения или None, если не найден.
        """
        return getattr(self._app, name, None)

    def _update_app(self):
        """Обновляет приложение для текущего контекста."""
        if not self._app:
            import qtasks._state

            self._app = qtasks._state.app_main
        return

    def _update(self, **kwargs):
        """Обновляет атрибуты контекста.

        Args:
            kwargs (dict, optional): Новые значения атрибутов контекста.
        """
        for name, value in kwargs.items():
            setattr(self, name, value)
        return

generate_handler = kwargs.get('generate_handler') instance-attribute

Функция-генератор для создания задач.

task_name = kwargs.get('task_name') instance-attribute

Имя задачи.

task_uuid = kwargs.get('task_uuid') instance-attribute

UUID задачи.

__init__(**kwargs)

Инициализация контекста.

Source code in src/qtasks/contexts/sync_context.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, **kwargs):
    """Инициализация контекста."""
    self.task_name = kwargs.get("task_name")
    """Имя задачи."""

    self.task_uuid = kwargs.get("task_uuid")
    """UUID задачи."""

    self.generate_handler = kwargs.get("generate_handler")
    """Функция-генератор для создания задач."""

    self._app: "QueueTasks" = kwargs.get("app")
    """Приложение, к которому принадлежит задача."""
    self._update_app()

    self._log: "Logger" = kwargs.get("log")
    """Логгер."""

    self._metadata: Union["Task", None] = None
    """Метаданные задачи."""

cancel(reason='')

Отменяет задачу.

Parameters:

Name Type Description Default
reason str

Причина отмены задачи.

''

Raises:

Type Description
TaskCancelError

Исключение, вызываемое при отмене задачи.

Source code in src/qtasks/contexts/sync_context.py
111
112
113
114
115
116
117
118
119
120
def cancel(self, reason: str = "") -> NoReturn:
    """Отменяет задачу.

    Args:
        reason (str): Причина отмены задачи.

    Raises:
        TaskCancelError: Исключение, вызываемое при отмене задачи.
    """
    raise TaskCancelError(reason or f"{self.task_name}.cancel")

get_component(name)

Возвращает компонент приложения по имени.

Parameters:

Name Type Description Default
name str

Имя компонента.

required

Returns:

Name Type Description
Any

Компонент приложения или None, если не найден.

Source code in src/qtasks/contexts/sync_context.py
130
131
132
133
134
135
136
137
138
139
def get_component(self, name: str):
    """Возвращает компонент приложения по имени.

    Args:
        name (str): Имя компонента.

    Returns:
        Any: Компонент приложения или None, если не найден.
    """
    return getattr(self._app, name, None)

get_config()

Возвращает конфигурацию приложения.

Returns:

Name Type Description
QueueConfig QueueConfig

Конфигурация приложения.

Source code in src/qtasks/contexts/sync_context.py
68
69
70
71
72
73
74
def get_config(self) -> QueueConfig:
    """Возвращает конфигурацию приложения.

    Returns:
        QueueConfig: Конфигурация приложения.
    """
    return self._app.config

get_logger(name=None)

Возвращает логгер для текущего контекста.

Parameters:

Name Type Description Default
name str | None

Имя логгера. Если не указано, используется имя задачи.

None

Returns:

Name Type Description
Logger Logger

Логгер для текущего контекста.

Source code in src/qtasks/contexts/sync_context.py
56
57
58
59
60
61
62
63
64
65
66
def get_logger(self, name: Union[str, None] = None) -> "Logger":
    """Возвращает логгер для текущего контекста.

    Args:
        name (str|None): Имя логгера. Если не указано, используется имя задачи.

    Returns:
        Logger: Логгер для текущего контекста.
    """
    self._log = self._app.log.with_subname(name or self.task_name)
    return self._log

get_metadata(cache=True)

Возвращает метаданные задачи.

Parameters:

Name Type Description Default
cache bool

Использовать кэшированные метаданные.

True

Returns:

Type Description
Union[Task, None]

Task|None: Метаданные задачи или None, если не найдены.

Source code in src/qtasks/contexts/sync_context.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def get_metadata(self, cache=True) -> Union["Task", None]:
    """Возвращает метаданные задачи.

    Args:
        cache (bool): Использовать кэшированные метаданные.

    Returns:
        Task|None: Метаданные задачи или None, если не найдены.
    """
    if cache:
        if not self._metadata:
            self._metadata = self._app.get(self.task_uuid)
        return self._metadata
    return self._app.get(self.task_uuid)

get_task(uuid)

Возвращает задачу по UUID.

Parameters:

Name Type Description Default
uuid UUID | str

UUID задачи.

required

Returns:

Type Description
Union[Task, None]

Task|None: Задача или None, если не найдена.

Source code in src/qtasks/contexts/sync_context.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_task(self, uuid: Union[UUID, str]) -> Union["Task", None]:
    """Возвращает задачу по UUID.

    Args:
        uuid (UUID|str): UUID задачи.

    Returns:
        Task|None: Задача или None, если не найдена.
    """
    return self._app.get(uuid)

plugin_error(**kwargs)

Вызывает ошибку плагина.

Parameters:

Name Type Description Default
**kwargs

Аргументы для передачи в обработчик ошибки плагина.

{}
Source code in src/qtasks/contexts/sync_context.py
122
123
124
125
126
127
128
def plugin_error(self, **kwargs):
    """Вызывает ошибку плагина.

    Args:
        **kwargs: Аргументы для передачи в обработчик ошибки плагина.
    """
    raise TaskPluginTriggerError(**kwargs)

sleep(seconds)

Приостанавливает выполнение на заданное количество секунд.

Parameters:

Name Type Description Default
seconds float

Количество секунд для приостановки.

required
Source code in src/qtasks/contexts/sync_context.py
102
103
104
105
106
107
108
109
def sleep(self, seconds: float) -> None:
    """Приостанавливает выполнение на заданное количество секунд.

    Args:
        seconds (float): Количество секунд для приостановки.
    """
    time.sleep(seconds)
    return