Перейти к содержанию

SyncResult

Sync Result.

SyncResult

SyncResult - Синхронный класс для ожидания результата задачи.

Пример

from qtasks import QueueTasks
from qtasks.results import SyncResult

app = QueueTasks()

task = app.add_task(task_name="test")
result = SyncResult(uuid=task.uuid).result(timeout=50)
Source code in src/qtasks/results/sync_result.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class SyncResult:
    """`SyncResult` - Синхронный класс для ожидания результата задачи.

    ## Пример

    ```python

    from qtasks import QueueTasks
    from qtasks.results import SyncResult

    app = QueueTasks()

    task = app.add_task(task_name="test")
    result = SyncResult(uuid=task.uuid).result(timeout=50)
    ```
    """

    def __init__(
        self,
        uuid: Annotated[
            Union[UUID, str],
            Doc(
                """
                    UUID задачи.
                    """
            ),
        ] = None,
        app: Annotated[
            Optional["QueueTasks"],
            Doc(
                """
                    `QueueTasks` экземпляр.

                    По умолчанию: `qtasks._state.app_main`.
                    """
            ),
        ] = None,
        log: Annotated[
            Optional[Logger],
            Doc(
                """
                    Логгер.

                    По умолчанию: `qtasks.logs.Logger`.
                    """
            ),
        ] = None,
    ):
        """Инициализация синхронного результата.

        Args:
            uuid (UUID | str, optional): UUID задачи. По умолчанию: None.
            app (QueueTasks, optional): `QueueTasks` экземпляр. По умолчанию: None.
            log (Logger, optional): Логгер. По умолчанию: None.
        """
        self._app = app
        self._update_state()
        self.log = (
            log.with_subname("SyncResult", default_level=self._app.config.logs_default_level_client)
            if log
            else Logger(
                name=self._app.name,
                subname="SyncResult",
                default_level=self._app.config.logs_default_level_client,
                format=self._app.config.logs_format,
            )
        )
        self._stop_event = threading.Event()

        self.uuid = uuid

    def result(
        self,
        timeout: Annotated[
            float,
            Doc(
                """
                    Таймаут задачи

                    По умолчанию: `100`.
                    """
            ),
        ] = 100,
    ) -> Union["Task", None]:
        """Ожидание результата задачи.

        Args:
            timeout (float, optional): Таймаут задачи. По умолчанию: `100`.

        Returns:
            Task | None: Задача или None.
        """
        self._stop_event.clear()
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(self._execute_task)
            try:
                result = future.result(timeout=timeout)
                self.log.debug(f"Задача {result.uuid} выполнена!")
                return result
            except TimeoutError:
                self.log.warning(f"Функция выполнялась {timeout} секунд!")
                self._stop_event.set()
                return None

    def _execute_task(self) -> Union["Task", None]:
        uuid = self.uuid
        while True:
            if self._stop_event.is_set():
                break

            task = self._app.get(uuid=uuid)
            if hasattr(task, "retry") and hasattr(task, "retry_child_uuid"):
                uuid = task.retry_child_uuid
                continue
            if not task or task.status not in self._app.config.result_statuses_end:
                time.sleep(self._app.config.result_time_interval)
                continue

            return task

    def _update_state(self) -> "QueueTasks":
        import qtasks._state

        if not self._app:
            if qtasks._state.app_main is None:
                raise ImportError("Невозможно получить app!")
            self._app = qtasks._state.app_main

__init__(uuid=None, app=None, log=None)

Инициализация синхронного результата.

Parameters:

Name Type Description Default
uuid UUID | str

UUID задачи. По умолчанию: None.

None
app QueueTasks

QueueTasks экземпляр. По умолчанию: None.

None
log Logger

Логгер. По умолчанию: None.

None
Source code in src/qtasks/results/sync_result.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    uuid: Annotated[
        Union[UUID, str],
        Doc(
            """
                UUID задачи.
                """
        ),
    ] = None,
    app: Annotated[
        Optional["QueueTasks"],
        Doc(
            """
                `QueueTasks` экземпляр.

                По умолчанию: `qtasks._state.app_main`.
                """
        ),
    ] = None,
    log: Annotated[
        Optional[Logger],
        Doc(
            """
                Логгер.

                По умолчанию: `qtasks.logs.Logger`.
                """
        ),
    ] = None,
):
    """Инициализация синхронного результата.

    Args:
        uuid (UUID | str, optional): UUID задачи. По умолчанию: None.
        app (QueueTasks, optional): `QueueTasks` экземпляр. По умолчанию: None.
        log (Logger, optional): Логгер. По умолчанию: None.
    """
    self._app = app
    self._update_state()
    self.log = (
        log.with_subname("SyncResult", default_level=self._app.config.logs_default_level_client)
        if log
        else Logger(
            name=self._app.name,
            subname="SyncResult",
            default_level=self._app.config.logs_default_level_client,
            format=self._app.config.logs_format,
        )
    )
    self._stop_event = threading.Event()

    self.uuid = uuid

result(timeout=100)

Ожидание результата задачи.

Parameters:

Name Type Description Default
timeout float

Таймаут задачи. По умолчанию: 100.

100

Returns:

Type Description
Union[Task, None]

Task | None: Задача или None.

Source code in src/qtasks/results/sync_result.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def result(
    self,
    timeout: Annotated[
        float,
        Doc(
            """
                Таймаут задачи

                По умолчанию: `100`.
                """
        ),
    ] = 100,
) -> Union["Task", None]:
    """Ожидание результата задачи.

    Args:
        timeout (float, optional): Таймаут задачи. По умолчанию: `100`.

    Returns:
        Task | None: Задача или None.
    """
    self._stop_event.clear()
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(self._execute_task)
        try:
            result = future.result(timeout=timeout)
            self.log.debug(f"Задача {result.uuid} выполнена!")
            return result
        except TimeoutError:
            self.log.warning(f"Функция выполнялась {timeout} секунд!")
            self._stop_event.set()
            return None