Skip to content

SyncResult

Sync Result.

SyncResult

SyncResult - Synchronous class for waiting for the result of a task.

Example

from qtasks import QueueTasks
from qtasks.results import SyncResult

app = QueueTasks()

task = app.add_task(task_name="test")
result = SyncResult(uuid=task.uuid).result(timeout=50)
Source code in src/qtasks/results/sync_result.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class SyncResult:
    """
    `SyncResult` - Synchronous class for waiting for the result of a task.

    The instance polls the application for the task in a worker thread until
    the task reaches one of the configured final statuses, the task cannot be
    found, or the caller's timeout elapses.

    ## Example

    ```python

    from qtasks import QueueTasks
    from qtasks.results import SyncResult

    app = QueueTasks()

    task = app.add_task(task_name="test")
    result = SyncResult(uuid=task.uuid).result(timeout=50)
    ```
    """

    def __init__(
        self,
        uuid: Annotated[
            UUID | str | None,
            Doc("""
                    UUID of the task.
                    """),
        ] = None,
        app: Annotated[
            Optional[QueueTasks],
            Doc("""
                    `QueueTasks` instance.

                    Default: `qtasks._state.app_main`.
                    """),
        ] = None,
        log: Annotated[
            Logger | None,
            Doc("""
                    Logger.

                    Default: `qtasks.logs.Logger`.
                    """),
        ] = None,
    ):
        """
        Initialize a synchronous result.

        Args:
            uuid (UUID | str, optional): UUID of the task. Default: None.
            app (QueueTasks, optional): `QueueTasks` instance. Default: None.
            log (Logger, optional): Logger. Default: None.

        Raises:
            ImportError: If `app` is not given and `qtasks._state.app_main` is not set.
        """
        # Fall back to the globally registered application when none is passed.
        self._app = app or self._update_state()

        # Reuse the caller's logger under a "SyncResult" subname, or build a
        # fresh one from the application's logging configuration.
        self.log = (
            log.with_subname(
                "SyncResult", default_level=self._app.config.logs_default_level_client
            )
            if log
            else Logger(
                name=self._app.name,
                subname="SyncResult",
                default_level=self._app.config.logs_default_level_client,
                format=self._app.config.logs_format,
            )
        )
        # Set by `result()` on timeout to tell the polling thread to stop.
        self._stop_event = threading.Event()

        self.uuid = uuid

    def result(
        self,
        timeout: Annotated[
            float,
            Doc("""
                    Task timeout

                    Default: `100`.
                    """),
        ] = 100,
    ) -> Union[Task, None]:
        """
        Wait for the task result.

        Args:
            timeout (float, optional): Maximum time to wait, in seconds. Default: `100`.

        Returns:
            Task | None: The finished task, or None if the task was not found
                or the wait timed out.

        Raises:
            ValueError: If no task UUID is set on this instance.
        """
        # Clear a cancellation left over from a previous timed-out call.
        self._stop_event.clear()
        with ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(self._execute_task)
            try:
                result = future.result(timeout=timeout)
                self.log.debug(f"Task {result.uuid if result else None} is completed!")
                return result
            except TimeoutError:
                self.log.warning(f"Task timed out after {timeout} seconds!")
                # Leaving the `with` block joins the worker thread, so the
                # poller must be told to stop before we return.
                self._stop_event.set()
                return None

    def _execute_task(self) -> Union[Task, None]:
        """
        Poll the application until the task reaches a final status.

        Returns:
            Task | None: The finished task, or None if the task was not found
                or polling was cancelled through the stop event.

        Raises:
            ValueError: If no task UUID is set on this instance.
        """
        if not self.uuid:
            raise ValueError("Task UUID is not set.")

        uuid = self.uuid
        while not self._stop_event.is_set():
            task = self._app.get(uuid=uuid)

            if not task:
                self.log.warning(f"Task {uuid} not found!")
                return None

            # The task was retried: follow the chain and wait on the child.
            if task.retry and task.retry > 0 and task.retry_child_uuid:
                uuid = task.retry_child_uuid
                continue

            if task.status in self._app.config.result_statuses_end:
                return task

            # Wait on the event rather than sleeping so that a timeout in
            # `result()` wakes this thread at once instead of leaving the
            # caller blocked for the remainder of the polling interval.
            self._stop_event.wait(self._app.config.result_time_interval)

        return None

    def _update_state(self) -> QueueTasks:
        """
        Return the globally registered application.

        Returns:
            QueueTasks: The value of `qtasks._state.app_main`.

        Raises:
            ImportError: If `qtasks._state.app_main` is None.
        """
        import qtasks._state

        if qtasks._state.app_main is None:
            raise ImportError("Unable to get app!")
        app = qtasks._state.app_main
        return app  # type: ignore

__init__(uuid=None, app=None, log=None)

Initializing a synchronous result.

Parameters:

Name Type Description Default
uuid UUID | str

UUID of the task. Default: None.

None
app QueueTasks

QueueTasks instance. Default: None.

None
log Logger

Logger. Default: None.

None
Source code in src/qtasks/results/sync_result.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    uuid: Annotated[
        UUID | str | None,
        Doc("""
                UUID of the task.
                """),
    ] = None,
    app: Annotated[
        Optional[QueueTasks],
        Doc("""
                `QueueTasks` instance.

                Default: `qtasks._state.app_main`.
                """),
    ] = None,
    log: Annotated[
        Logger | None,
        Doc("""
                Logger.

                Default: `qtasks.logs.Logger`.
                """),
    ] = None,
):
    """
    Set up a synchronous result bound to one task.

    Args:
        uuid (UUID | str, optional): UUID of the task to wait for. Default: None.
        app (QueueTasks, optional): Application to query; when omitted the
            value returned by `_update_state()` is used. Default: None.
        log (Logger, optional): Logger to derive the "SyncResult" logger
            from; when omitted a new `Logger` is created. Default: None.
    """
    # Use the explicit application if one was given, otherwise look it up.
    app_instance = app or self._update_state()
    self._app = app_instance

    if log:
        # Derive a child logger from the one supplied by the caller.
        self.log = log.with_subname(
            "SyncResult",
            default_level=app_instance.config.logs_default_level_client,
        )
    else:
        # No logger supplied: build one from the application's settings.
        self.log = Logger(
            name=app_instance.name,
            subname="SyncResult",
            default_level=app_instance.config.logs_default_level_client,
            format=app_instance.config.logs_format,
        )

    # Flag used to ask the polling loop to stop.
    self._stop_event = threading.Event()
    self.uuid = uuid

result(timeout=100)

Waiting for the task result.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for the task result, in seconds. Default: 100.

100

Returns:

Type Description
Union[Task, None]

Task | None: The finished task, or None if the task was not found or the wait timed out.

Source code in src/qtasks/results/sync_result.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def result(
    self,
    timeout: Annotated[
        float,
        Doc("""
                Task timeout

                Default: `100`.
                """),
    ] = 100,
) -> Union[Task, None]:
    """
    Block until the task finishes or the timeout elapses.

    Args:
        timeout (float, optional): Maximum time to wait, in seconds. Default: `100`.

    Returns:
        Task | None: The value produced by `_execute_task`, or None when the
            wait timed out.
    """
    # Reset the flag so an earlier cancellation does not affect this call.
    self._stop_event.clear()
    with ThreadPoolExecutor(max_workers=1) as pool:
        pending = pool.submit(self._execute_task)
        try:
            outcome = pending.result(timeout=timeout)
            finished_uuid = outcome.uuid if outcome else None
            self.log.debug(f"Task {finished_uuid} is completed!")
            return outcome
        except TimeoutError:
            self.log.warning(f"Task timed out after {timeout} seconds!")
            # Signal the worker to stop before the pool is shut down.
            self._stop_event.set()
            return None