Skip to content

AsyncResult

Async Result.

AsyncResult

AsyncResult - Asynchronous class for waiting for the result of a task.

Example

import asyncio

from qtasks import QueueTasks
from qtasks.results import AsyncResult

app = QueueTasks()

async def main():
    task = await app.add_task(task_name="test")
    result = await AsyncResult(uuid=task.uuid).result(timeout=50)

asyncio.run(main())
Source code in src/qtasks/results/async_result.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class AsyncResult:
    """
    `AsyncResult` - Asynchronous class for waiting for the result of a task.

    ## Example

    ```python
    import asyncio

    from qtasks import QueueTasks
    from qtasks.results import AsyncResult

    app = QueueTasks()

    async def main():
        task = await app.add_task(task_name="test")
        result = await AsyncResult(uuid=task.uuid).result(timeout=50)

    asyncio.run(main())
    ```
    """

    def __init__(
        self,
        uuid: Annotated[
            UUID | str | None,
            Doc("""
                    UUID of the task.
                    """),
        ] = None,
        app: Annotated[
            Optional[QueueTasks],
            Doc("""
                    `QueueTasks` instance.

                    Default: `qtasks._state.app_main`.
                    """),
        ] = None,
        log: Annotated[
            Logger | None,
            Doc("""
                    Logger.

                    Default: `qtasks.logs.Logger`.
                    """),
        ] = None,
    ):
        """
        Initializing an asynchronous result.

        Args:
            uuid (UUID | str, optional): UUID of the task. Default: None.
            app (QueueTasks, optional): `QueueTasks` instance. Default: None.
            log (Logger, optional): Logger. Default: None.
        """
        self._app = app or self._update_state()

        self.log = (
            log.with_subname(
                "AsyncResult", default_level=self._app.config.logs_default_level_client
            )
            if log
            else Logger(
                name=self._app.name,
                subname="AsyncResult",
                default_level=self._app.config.logs_default_level_client,
                format=self._app.config.logs_format,
            )
        )
        self._stop_event = asyncio.Event()

        self.uuid = uuid

    async def result(
        self,
        timeout: Annotated[
            float,
            Doc("""
                    Task timeout

                    Default: `100`.
                    """),
        ] = 100,
    ) -> Union[Task, None]:
        """
        Waiting for the task result.

        Args:
            timeout (float, optional): Task timeout. Default: `100`.

        Returns:
            Task | None: Task or None.
        """
        self._stop_event.clear()
        try:
            result = await asyncio.wait_for(self._execute_task(), timeout)
            self.log.debug(f"Task {result.uuid if result else None} is completed!")
            return result
        except asyncio.TimeoutError:
            self.log.warning(f"Task timed out after {timeout} seconds!")
            self._stop_event.set()
            return None

    async def _execute_task(self) -> Union[Task, None]:
        if not self.uuid:
            raise ValueError("Task UUID is not set.")

        uuid = self.uuid
        while True:
            if self._stop_event.is_set():
                break

            task = await self._app.get(uuid=uuid)

            if not task:
                self.log.warning(f"Task {uuid} not found!")
                return None

            if task.retry and task.retry > 0 and task.retry_child_uuid:
                uuid = task.retry_child_uuid
                continue

            if not task or task.status not in self._app.config.result_statuses_end:
                await asyncio.sleep(self._app.config.result_time_interval)
                continue

            return task

    def _update_state(self) -> QueueTasks:
        import qtasks._state

        if qtasks._state.app_main is None:
            raise ImportError("Unable to get app!")
        app = qtasks._state.app_main
        return app  # type: ignore

__init__(uuid=None, app=None, log=None)

Initializing an asynchronous result.

Parameters:

Name Type Description Default
uuid UUID | str

UUID of the task. Default: None.

None
app QueueTasks

QueueTasks instance. Default: None.

None
log Logger

Logger. Default: None.

None
Source code in src/qtasks/results/async_result.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(
    self,
    uuid: Annotated[
        UUID | str | None,
        Doc("""
                UUID of the task.
                """),
    ] = None,
    app: Annotated[
        Optional[QueueTasks],
        Doc("""
                `QueueTasks` instance.

                Default: `qtasks._state.app_main`.
                """),
    ] = None,
    log: Annotated[
        Logger | None,
        Doc("""
                Logger.

                Default: `qtasks.logs.Logger`.
                """),
    ] = None,
):
    """
    Initializing an asynchronous result.

    Args:
        uuid (UUID | str, optional): UUID of the task. Default: None.
        app (QueueTasks, optional): `QueueTasks` instance. Default: None.
        log (Logger, optional): Logger. Default: None.
    """
    self._app = app or self._update_state()

    self.log = (
        log.with_subname(
            "AsyncResult", default_level=self._app.config.logs_default_level_client
        )
        if log
        else Logger(
            name=self._app.name,
            subname="AsyncResult",
            default_level=self._app.config.logs_default_level_client,
            format=self._app.config.logs_format,
        )
    )
    self._stop_event = asyncio.Event()

    self.uuid = uuid

result(timeout=100) async

Waiting for the task result.

Parameters:

Name Type Description Default
timeout float

Task timeout. Default: 100.

100

Returns:

Type Description
Union[Task, None]

Task | None: Task or None.

Source code in src/qtasks/results/async_result.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
async def result(
    self,
    timeout: Annotated[
        float,
        Doc("""
                Task timeout

                Default: `100`.
                """),
    ] = 100,
) -> Union[Task, None]:
    """
    Waiting for the task result.

    Args:
        timeout (float, optional): Task timeout. Default: `100`.

    Returns:
        Task | None: Task or None.
    """
    self._stop_event.clear()
    try:
        result = await asyncio.wait_for(self._execute_task(), timeout)
        self.log.debug(f"Task {result.uuid if result else None} is completed!")
        return result
    except asyncio.TimeoutError:
        self.log.warning(f"Task timed out after {timeout} seconds!")
        self._stop_event.set()
        return None