Перейти к содержанию

AsyncResult

Async Result.

AsyncResult

AsyncResult - Асинхронный класс для ожидания результата задачи.

Пример

import asyncio

from qtasks import QueueTasks
from qtasks.results import AsyncResult

app = QueueTasks()

async def main():
    task = await app.add_task(task_name="test")
    result = await AsyncResult(uuid=task.uuid).result(timeout=50)

asyncio.run(main())
Source code in src/qtasks/results/async_result.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class AsyncResult:
    """`AsyncResult` - Асинхронный класс для ожидания результата задачи.

    ## Пример

    ```python
    import asyncio

    from qtasks import QueueTasks
    from qtasks.results import AsyncResult

    app = QueueTasks()

    async def main():
        task = await app.add_task(task_name="test")
        result = await AsyncResult(uuid=task.uuid).result(timeout=50)

    asyncio.run(main())
    ```
    """

    def __init__(
        self,
        uuid: Annotated[
            Union[UUID, str],
            Doc(
                """
                    UUID задачи.
                    """
            ),
        ] = None,
        app: Annotated[
            Optional["QueueTasks"],
            Doc(
                """
                    `QueueTasks` экземпляр.

                    По умолчанию: `qtasks._state.app_main`.
                    """
            ),
        ] = None,
        log: Annotated[
            Optional[Logger],
            Doc(
                """
                    Логгер.

                    По умолчанию: `qtasks.logs.Logger`.
                    """
            ),
        ] = None,
    ):
        """Инициализация асинхронного результата.

        Args:
            uuid (UUID | str, optional): UUID задачи. По умолчанию: None.
            app (QueueTasks, optional): `QueueTasks` экземпляр. По умолчанию: None.
            log (Logger, optional): Логгер. По умолчанию: None.
        """
        self._app = app
        self._update_state()
        self.log = (
            log.with_subname("AsyncResult", default_level=self._app.config.logs_default_level_client)
            if log
            else Logger(
                name=self._app.name,
                subname="AsyncResult",
                default_level=self._app.config.logs_default_level_client,
                format=self._app.config.logs_format,
            )
        )
        self._stop_event = asyncio.Event()

        self.uuid = uuid

    async def result(
        self,
        timeout: Annotated[
            float,
            Doc(
                """
                    Таймаут задачи

                    По умолчанию: `100`.
                    """
            ),
        ] = 100,
    ) -> Union["Task", None]:
        """Ожидание результата задачи.

        Args:
            timeout (float, optional): Таймаут задачи. По умолчанию: `100`.

        Returns:
            Task | None: Задача или None.
        """
        self._stop_event.clear()
        try:
            result = await asyncio.wait_for(self._execute_task(), timeout)
            self.log.debug(f"Задача {result.uuid} выполнена!")
            return result
        except asyncio.TimeoutError:
            self.log.warning(f"Функция выполнялась {timeout} секунд!")
            self._stop_event.set()
            return None

    async def _execute_task(self) -> Union["Task", None]:
        uuid = self.uuid
        while True:
            if self._stop_event.is_set():
                break

            task = await self._app.get(uuid=uuid)
            if hasattr(task, "retry") and task.retry > 0 and hasattr(task, "retry_child_uuid"):
                uuid = task.retry_child_uuid
                continue
            if not task or task.status not in self._app.config.result_statuses_end:
                await asyncio.sleep(self._app.config.result_time_interval)
                continue
            return task

    def _update_state(self) -> "QueueTasks":
        import qtasks._state

        if not self._app:
            if qtasks._state.app_main is None:
                raise ImportError("Невозможно получить app!")
            self._app = qtasks._state.app_main

__init__(uuid=None, app=None, log=None)

Инициализация асинхронного результата.

Parameters:

Name Type Description Default
uuid UUID | str

UUID задачи. По умолчанию: None.

None
app QueueTasks

QueueTasks экземпляр. По умолчанию: None.

None
log Logger

Логгер. По умолчанию: None.

None
Source code in src/qtasks/results/async_result.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(
    self,
    uuid: Annotated[
        Union[UUID, str],
        Doc(
            """
                UUID задачи.
                """
        ),
    ] = None,
    app: Annotated[
        Optional["QueueTasks"],
        Doc(
            """
                `QueueTasks` экземпляр.

                По умолчанию: `qtasks._state.app_main`.
                """
        ),
    ] = None,
    log: Annotated[
        Optional[Logger],
        Doc(
            """
                Логгер.

                По умолчанию: `qtasks.logs.Logger`.
                """
        ),
    ] = None,
):
    """Инициализация асинхронного результата.

    Args:
        uuid (UUID | str, optional): UUID задачи. По умолчанию: None.
        app (QueueTasks, optional): `QueueTasks` экземпляр. По умолчанию: None.
        log (Logger, optional): Логгер. По умолчанию: None.
    """
    self._app = app
    self._update_state()
    self.log = (
        log.with_subname("AsyncResult", default_level=self._app.config.logs_default_level_client)
        if log
        else Logger(
            name=self._app.name,
            subname="AsyncResult",
            default_level=self._app.config.logs_default_level_client,
            format=self._app.config.logs_format,
        )
    )
    self._stop_event = asyncio.Event()

    self.uuid = uuid

result(timeout=100) async

Ожидание результата задачи.

Parameters:

Name Type Description Default
timeout float

Таймаут задачи. По умолчанию: 100.

100

Returns:

Type Description
Union[Task, None]

Task | None: Задача или None.

Source code in src/qtasks/results/async_result.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
async def result(
    self,
    timeout: Annotated[
        float,
        Doc(
            """
                Таймаут задачи

                По умолчанию: `100`.
                """
        ),
    ] = 100,
) -> Union["Task", None]:
    """Ожидание результата задачи.

    Args:
        timeout (float, optional): Таймаут задачи. По умолчанию: `100`.

    Returns:
        Task | None: Задача или None.
    """
    self._stop_event.clear()
    try:
        result = await asyncio.wait_for(self._execute_task(), timeout)
        self.log.debug(f"Задача {result.uuid} выполнена!")
        return result
    except asyncio.TimeoutError:
        self.log.warning(f"Функция выполнялась {timeout} секунд!")
        self._stop_event.set()
        return None