Перейти к содержанию

AsyncTaskExecutor

Async Task Executor.

AsyncTaskExecutor

Bases: BaseTaskExecutor, AsyncPluginMixin

AsyncTaskExecutor - Синхронный класс исполнителя задач. Используется по умолчанию в AsyncWorker.

Пример

import asyncio
from qtasks.executors import AsyncTaskExecutor

task_func = TaskExecSchema(...)
task_broker = TaskPrioritySchema(...)
executor = AsyncTaskExecutor(task_func, task_broker)
result = asyncio.run(executor.execute())
Source code in src/qtasks/executors/async_task_executor.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
class AsyncTaskExecutor(BaseTaskExecutor, AsyncPluginMixin):
    """
    `AsyncTaskExecutor` - Синхронный класс исполнителя задач. Используется по умолчанию в `AsyncWorker`.

    ## Пример

    ```python
    import asyncio
    from qtasks.executors import AsyncTaskExecutor

    task_func = TaskExecSchema(...)
    task_broker = TaskPrioritySchema(...)
    executor = AsyncTaskExecutor(task_func, task_broker)
    result = asyncio.run(executor.execute())
    ```
    """

    def __init__(
        self,
        task_func: Annotated[
            TaskExecSchema,
            Doc(
                """
                    `TaskExecSchema` схема.
                    """
            ),
        ],
        task_broker: Annotated[
            TaskPrioritySchema,
            Doc(
                """
                    `TaskPrioritySchema` схема.
                    """
            ),
        ],
        log: Annotated[
            Optional[Logger],
            Doc(
                """
                    Логгер.

                    По умолчанию: `qtasks.logs.Logger`.
                    """
            ),
        ] = None,
        plugins: Annotated[
            Optional[Dict[str, List[Type["BasePlugin"]]]],
            Doc(
                """
                    Массив Плагинов.

                    По умолчанию: `Пустой массив`.
                    """
            ),
        ] = None,
    ):
        """Инициализация класса. Происходит внутри `Worker` перед обработкой задачи.

        Args:
            task_func (TaskExecSchema): Схема `TaskExecSchema`.
            task_broker (TaskPrioritySchema): Схема `TaskPrioritySchema`.
            log (Logger, optional): класс `qtasks.logs.Logger`. По умолчанию: `qtasks._state.log_main`.
            plugins (Dict[str, List[Type[BasePlugin]]], optional): Массив плагинов. По умолчанию: `Пустой массив`.
        """
        super().__init__(
            task_func=task_func,
            task_broker=task_broker,
            log=log,
            plugins=plugins,
        )

    async def before_execute(self):
        """Вызывается перед выполнением задачи."""
        if self.task_func.echo:
            task_cls = SyncTask if not self.task_func.awaiting or self.task_func.generating == "sync" else AsyncTask
            self.echo = task_cls(
                task_name=self.task_broker.name,
                priority=self.task_broker.priority,
                echo=self.task_func.echo,
                retry=self.task_func.retry,
                retry_on_exc=self.task_func.retry_on_exc,
                decode=self.task_func.decode,
                tags=self.task_func.tags,
                description=self.task_func.description,
                executor=self.task_func.executor,
                middlewares_before=self.task_func.middlewares_before,
                middlewares_after=self.task_func.middlewares_after,
                extra=self.task_func.extra
            )
            self.echo.ctx._update(task_uuid=self.task_broker.uuid)
            self._args.insert(0, self.echo)

        args_from_func = self._extract_args_kwargs_from_func(self.task_func.func)
        args_info = self._build_args_info(args_from_func[0], args_from_func[1])
        new_args = await self._plugin_trigger(
            "task_executor_args_replace",
            task_executor=self,
            **{
                "args": self._args,
                "kw": self._kwargs,
                "args_info": args_info,
            },
            return_last=True
        )
        if new_args:
            self._args, self._kwargs = new_args.get("args", self._args), new_args.get("kw", self._kwargs)

        await self._plugin_trigger("task_executor_before_execute", task_executor=self)

    async def after_execute(self):
        """Вызов после запуска задач."""
        await self._plugin_trigger("task_executor_after_execute", task_executor=self)
        result: Any = await self._plugin_trigger(
            "task_executor_after_execute_result_replace", task_executor=self, result=self._result, return_last=True
        )
        if result:
            self._result = result.get("result", self._result)
        return

    async def execute_middlewares_before(self):
        """Вызов мидлварей до выполнения задачи."""
        await self._plugin_trigger(
            "task_executor_middlewares_execute",
            task_executor=self,
            middlewares_before=self.task_func.middlewares_before
        )
        for m in self.task_func.middlewares_before:
            m: "TaskMiddleware" = m(self)
            new_task_executor: BaseTaskExecutor = await m()
            if new_task_executor:
                self = new_task_executor
            self.log.debug(f"Middleware {m.name} для {self.task_func.name} был вызван.")

    async def execute_middlewares_after(self):
        """Вызов мидлварей после выполнения задачи."""
        await self._plugin_trigger(
            "task_executor_middlewares_execute",
            task_executor=self,
            middlewares_after=self.task_func.middlewares_after
        )
        for m in self.task_func.middlewares_after:
            m: "TaskMiddleware" = m(self)
            new_task_executor: BaseTaskExecutor = await m()
            if new_task_executor:
                self = new_task_executor
            self.log.debug(f"Middleware {m.name} для {self.task_func.name} был вызван.")

    async def run_task(self) -> Any:
        """Вызов задачи.

        Returns:
            Any: Результат задачи.
        """
        if self._args and self._kwargs:
            result = (
                await self.task_func.func(*self._args, **self._kwargs)
                if self.task_func.awaiting
                else self.task_func.func(*self._args, **self._kwargs)
            )
        elif self._args:
            result = (
                await self.task_func.func(*self._args)
                if self.task_func.awaiting
                else self.task_func.func(*self._args)
            )
        elif self._kwargs:
            result = (
                await self.task_func.func(**self._kwargs)
                if self.task_func.awaiting
                else self.task_func.func(**self._kwargs)
            )
        else:
            result = (
                await self.task_func.func()
                if self.task_func.awaiting
                else self.task_func.func()
            )

        if self.task_func.generating:
            return await self.run_task_gen(result)

        new_result = await self._plugin_trigger("task_executor_run_task", task_executor=self, result=result)
        if new_result:
            result = new_result.get("result", result)

        return result

    async def run_task_gen(self, func: AsyncGenerator) -> List[Any]:
        """Вызов генератора задачи.

        Args:
            func (FunctionType): Функция.

        Returns:
            Any: Результат задачи.
        """
        if self.echo:
            self.echo.ctx._update(generate_handler=self.task_func.generate_handler)

        results = []
        if self.task_func.generating == "async":
            async for result in func:
                if self.task_func.generate_handler:
                    result = await self._maybe_await(
                        self.task_func.generate_handler(result)
                    )
                results.append(result)

        elif self.task_func.generating == "sync":
            try:
                while True:
                    result = next(func)
                    if self.task_func.generate_handler:
                        result = await self._maybe_await(
                            self.task_func.generate_handler(result)
                        )
                    results.append(result)
            except StopIteration:
                pass
        new_results = await self._plugin_trigger("task_executor_run_task_gen", task_executor=self, results=results)
        if new_results:
            results = new_results.get("results", results)
        return results

    async def _maybe_await(self, value):
        if asyncio.iscoroutine(value):
            return await value
        return value

    async def execute(self, decode: bool = True) -> Union[Any, str]:
        """Вызов задачи.

        Args:
            decode (bool, optional): Декодирование результата задачи. По умолчанию: True.

        Returns:
            Any|str: Результат задачи.
        """
        self.log.debug(f"Вызван execute для {self.task_func.name}")
        await self.execute_middlewares_before()
        await self.before_execute()
        try:
            if self.task_func.max_time:
                self._result = await asyncio.wait_for(self.run_task(), timeout=self.task_func.max_time)
            else:
                self._result = await self.run_task()
        except TaskPluginTriggerError as e:
            new_result = await self._plugin_trigger(
                "task_executor_run_task_trigger_error",
                task_executor=self,
                task_func=self.task_func,
                task_broker=self.task_broker,
                e=e,
                return_last=True
            )
            if new_result:
                self._result = new_result.get("result", self._result)
            else:
                raise e
        except asyncio.TimeoutError:
            msg = f"Время выполнения задачи {self.task_func.name} превысило лимит {self.task_func.max_time} секунд"
            self.log.error(msg)
            raise asyncio.TimeoutError(msg)

        await self.after_execute()
        await self.execute_middlewares_after()
        if decode:
            self._result = await self.decode()
        return self._result

    async def decode(self) -> Any:
        """Декодирование задачи.

        Returns:
            Any: Результат задачи.
        """
        if self.task_func.decode is not None:
            result = await self._maybe_await(self.task_func.decode(self._result))
        else:
            result = json.dumps(self._result, ensure_ascii=False)
        new_result = await self._plugin_trigger("task_executor_decode", task_executor=self, result=result)
        if new_result:
            result = new_result.get("result", result)
        return result

__init__(task_func, task_broker, log=None, plugins=None)

Инициализация класса. Происходит внутри Worker перед обработкой задачи.

Parameters:

Name Type Description Default
task_func TaskExecSchema

Схема TaskExecSchema.

required
task_broker TaskPrioritySchema

Схема TaskPrioritySchema.

required
log Logger

класс qtasks.logs.Logger. По умолчанию: qtasks._state.log_main.

None
plugins Dict[str, List[Type[BasePlugin]]]

Массив плагинов. По умолчанию: Пустой массив.

None
Source code in src/qtasks/executors/async_task_executor.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def __init__(
    self,
    task_func: Annotated[
        TaskExecSchema,
        Doc(
            """
                `TaskExecSchema` схема.
                """
        ),
    ],
    task_broker: Annotated[
        TaskPrioritySchema,
        Doc(
            """
                `TaskPrioritySchema` схема.
                """
        ),
    ],
    log: Annotated[
        Optional[Logger],
        Doc(
            """
                Логгер.

                По умолчанию: `qtasks.logs.Logger`.
                """
        ),
    ] = None,
    plugins: Annotated[
        Optional[Dict[str, List[Type["BasePlugin"]]]],
        Doc(
            """
                Массив Плагинов.

                По умолчанию: `Пустой массив`.
                """
        ),
    ] = None,
):
    """Инициализация класса. Происходит внутри `Worker` перед обработкой задачи.

    Args:
        task_func (TaskExecSchema): Схема `TaskExecSchema`.
        task_broker (TaskPrioritySchema): Схема `TaskPrioritySchema`.
        log (Logger, optional): класс `qtasks.logs.Logger`. По умолчанию: `qtasks._state.log_main`.
        plugins (Dict[str, List[Type[BasePlugin]]], optional): Массив плагинов. По умолчанию: `Пустой массив`.
    """
    super().__init__(
        task_func=task_func,
        task_broker=task_broker,
        log=log,
        plugins=plugins,
    )

after_execute() async

Вызов после запуска задач.

Source code in src/qtasks/executors/async_task_executor.py
131
132
133
134
135
136
137
138
139
async def after_execute(self):
    """Вызов после запуска задач."""
    await self._plugin_trigger("task_executor_after_execute", task_executor=self)
    result: Any = await self._plugin_trigger(
        "task_executor_after_execute_result_replace", task_executor=self, result=self._result, return_last=True
    )
    if result:
        self._result = result.get("result", self._result)
    return

before_execute() async

Вызывается перед выполнением задачи.

Source code in src/qtasks/executors/async_task_executor.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def before_execute(self):
    """Вызывается перед выполнением задачи."""
    if self.task_func.echo:
        task_cls = SyncTask if not self.task_func.awaiting or self.task_func.generating == "sync" else AsyncTask
        self.echo = task_cls(
            task_name=self.task_broker.name,
            priority=self.task_broker.priority,
            echo=self.task_func.echo,
            retry=self.task_func.retry,
            retry_on_exc=self.task_func.retry_on_exc,
            decode=self.task_func.decode,
            tags=self.task_func.tags,
            description=self.task_func.description,
            executor=self.task_func.executor,
            middlewares_before=self.task_func.middlewares_before,
            middlewares_after=self.task_func.middlewares_after,
            extra=self.task_func.extra
        )
        self.echo.ctx._update(task_uuid=self.task_broker.uuid)
        self._args.insert(0, self.echo)

    args_from_func = self._extract_args_kwargs_from_func(self.task_func.func)
    args_info = self._build_args_info(args_from_func[0], args_from_func[1])
    new_args = await self._plugin_trigger(
        "task_executor_args_replace",
        task_executor=self,
        **{
            "args": self._args,
            "kw": self._kwargs,
            "args_info": args_info,
        },
        return_last=True
    )
    if new_args:
        self._args, self._kwargs = new_args.get("args", self._args), new_args.get("kw", self._kwargs)

    await self._plugin_trigger("task_executor_before_execute", task_executor=self)

decode() async

Декодирование задачи.

Returns:

Name Type Description
Any Any

Результат задачи.

Source code in src/qtasks/executors/async_task_executor.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
async def decode(self) -> Any:
    """Декодирование задачи.

    Returns:
        Any: Результат задачи.
    """
    if self.task_func.decode is not None:
        result = await self._maybe_await(self.task_func.decode(self._result))
    else:
        result = json.dumps(self._result, ensure_ascii=False)
    new_result = await self._plugin_trigger("task_executor_decode", task_executor=self, result=result)
    if new_result:
        result = new_result.get("result", result)
    return result

execute(decode=True) async

Вызов задачи.

Parameters:

Name Type Description Default
decode bool

Декодирование результата задачи. По умолчанию: True.

True

Returns:

Type Description
Union[Any, str]

Any|str: Результат задачи.

Source code in src/qtasks/executors/async_task_executor.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
async def execute(self, decode: bool = True) -> Union[Any, str]:
    """Вызов задачи.

    Args:
        decode (bool, optional): Декодирование результата задачи. По умолчанию: True.

    Returns:
        Any|str: Результат задачи.
    """
    self.log.debug(f"Вызван execute для {self.task_func.name}")
    await self.execute_middlewares_before()
    await self.before_execute()
    try:
        if self.task_func.max_time:
            self._result = await asyncio.wait_for(self.run_task(), timeout=self.task_func.max_time)
        else:
            self._result = await self.run_task()
    except TaskPluginTriggerError as e:
        new_result = await self._plugin_trigger(
            "task_executor_run_task_trigger_error",
            task_executor=self,
            task_func=self.task_func,
            task_broker=self.task_broker,
            e=e,
            return_last=True
        )
        if new_result:
            self._result = new_result.get("result", self._result)
        else:
            raise e
    except asyncio.TimeoutError:
        msg = f"Время выполнения задачи {self.task_func.name} превысило лимит {self.task_func.max_time} секунд"
        self.log.error(msg)
        raise asyncio.TimeoutError(msg)

    await self.after_execute()
    await self.execute_middlewares_after()
    if decode:
        self._result = await self.decode()
    return self._result

execute_middlewares_after() async

Вызов мидлварей после выполнения задачи.

Source code in src/qtasks/executors/async_task_executor.py
155
156
157
158
159
160
161
162
163
164
165
166
167
async def execute_middlewares_after(self):
    """Вызов мидлварей после выполнения задачи."""
    await self._plugin_trigger(
        "task_executor_middlewares_execute",
        task_executor=self,
        middlewares_after=self.task_func.middlewares_after
    )
    for m in self.task_func.middlewares_after:
        m: "TaskMiddleware" = m(self)
        new_task_executor: BaseTaskExecutor = await m()
        if new_task_executor:
            self = new_task_executor
        self.log.debug(f"Middleware {m.name} для {self.task_func.name} был вызван.")

execute_middlewares_before() async

Вызов мидлварей до выполнения задачи.

Source code in src/qtasks/executors/async_task_executor.py
141
142
143
144
145
146
147
148
149
150
151
152
153
async def execute_middlewares_before(self):
    """Вызов мидлварей до выполнения задачи."""
    await self._plugin_trigger(
        "task_executor_middlewares_execute",
        task_executor=self,
        middlewares_before=self.task_func.middlewares_before
    )
    for m in self.task_func.middlewares_before:
        m: "TaskMiddleware" = m(self)
        new_task_executor: BaseTaskExecutor = await m()
        if new_task_executor:
            self = new_task_executor
        self.log.debug(f"Middleware {m.name} для {self.task_func.name} был вызван.")

run_task() async

Вызов задачи.

Returns:

Name Type Description
Any Any

Результат задачи.

Source code in src/qtasks/executors/async_task_executor.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def run_task(self) -> Any:
    """Вызов задачи.

    Returns:
        Any: Результат задачи.
    """
    if self._args and self._kwargs:
        result = (
            await self.task_func.func(*self._args, **self._kwargs)
            if self.task_func.awaiting
            else self.task_func.func(*self._args, **self._kwargs)
        )
    elif self._args:
        result = (
            await self.task_func.func(*self._args)
            if self.task_func.awaiting
            else self.task_func.func(*self._args)
        )
    elif self._kwargs:
        result = (
            await self.task_func.func(**self._kwargs)
            if self.task_func.awaiting
            else self.task_func.func(**self._kwargs)
        )
    else:
        result = (
            await self.task_func.func()
            if self.task_func.awaiting
            else self.task_func.func()
        )

    if self.task_func.generating:
        return await self.run_task_gen(result)

    new_result = await self._plugin_trigger("task_executor_run_task", task_executor=self, result=result)
    if new_result:
        result = new_result.get("result", result)

    return result

run_task_gen(func) async

Вызов генератора задачи.

Parameters:

Name Type Description Default
func FunctionType

Функция.

required

Returns:

Name Type Description
Any List[Any]

Результат задачи.

Source code in src/qtasks/executors/async_task_executor.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
async def run_task_gen(self, func: AsyncGenerator) -> List[Any]:
    """Вызов генератора задачи.

    Args:
        func (FunctionType): Функция.

    Returns:
        Any: Результат задачи.
    """
    if self.echo:
        self.echo.ctx._update(generate_handler=self.task_func.generate_handler)

    results = []
    if self.task_func.generating == "async":
        async for result in func:
            if self.task_func.generate_handler:
                result = await self._maybe_await(
                    self.task_func.generate_handler(result)
                )
            results.append(result)

    elif self.task_func.generating == "sync":
        try:
            while True:
                result = next(func)
                if self.task_func.generate_handler:
                    result = await self._maybe_await(
                        self.task_func.generate_handler(result)
                    )
                results.append(result)
        except StopIteration:
            pass
    new_results = await self._plugin_trigger("task_executor_run_task_gen", task_executor=self, results=results)
    if new_results:
        results = new_results.get("results", results)
    return results