Skip to content

BaseTaskExecutor - пишем свой исполнитель задачи

Base Task Executor.

BaseTaskExecutor

Bases: Generic[TAsyncFlag], ABC

BaseTaskExecutor - An abstract class that is the foundation for task executor classes.

Example

from qtasks.executors.base import BaseTaskExecutor

class MyTaskExecutor(BaseTaskExecutor):
    def __init__(self, name: str):
        super().__init__(name=name)
        pass
Source code in src/qtasks/executors/base.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
class BaseTaskExecutor(Generic[TAsyncFlag], ABC):
    """
    `BaseTaskExecutor` - An abstract class that is the foundation for task executor classes.

    ## Example

    ```python
    from qtasks.executors.base import BaseTaskExecutor

    class MyTaskExecutor(BaseTaskExecutor):
        def __init__(self, name: str):
            super().__init__(name=name)
            pass
    ```
    """

    def __init__(
        self,
        task_func: Annotated[
            TaskExecSchema,
            Doc("""
                    `TaskExecSchema` schema.
                    """),
        ],
        task_broker: Annotated[
            TaskPrioritySchema,
            Doc("""
                    `TaskPrioritySchema` schema.
                    """),
        ],
        log: Annotated[
            Logger | None,
            Doc("""
                    Logger.

                    Default: `qtasks.logs.Logger`.
                    """),
        ] = None,
        plugins: Annotated[
            dict[str, list[BasePlugin]] | None,
            Doc("""
                    Array of Plugins.

                    Default: `Empty array`.
                    """),
        ] = None,
    ):
        """
        Initializing the class. Occurs inside a `Worker` before a task is processed.

        Args:
            task_func (TaskExecSchema): Schema `TaskExecSchema`.
            task_broker(TaskPrioritySchema): Schema `TaskPrioritySchema`.
            log (Logger, optional): class `qtasks.logs.Logger`. Default: `qtasks._state.log_main`.
            plugins (Dict[str, List[BasePlugin]], optional): Plugin dictionary. Default: `Empty dictionary`.
        """
        self.task_func = task_func
        self.task_broker = task_broker
        self._args = self.task_broker.args.copy()
        self._kwargs = self.task_broker.kwargs.copy()
        self._result: Any = None
        self.echo = None

        self.decode_cls = lambda res: json.dumps(res, ensure_ascii=False)

        self.log = log
        if self.log is None:
            import qtasks._state

            self.log = qtasks._state.log_main
        self.log = self.log.with_subname(self.__class__.__name__)

        self.plugins = plugins or {}

    @overload
    def before_execute(self: BaseTaskExecutor[Literal[False]]) -> None: ...

    @overload
    async def before_execute(self: BaseTaskExecutor[Literal[True]]) -> None: ...

    def before_execute(self) -> None | Awaitable[None]:
        """Called before a task is executed."""
        pass

    @overload
    def after_execute(self: BaseTaskExecutor[Literal[False]]) -> None: ...

    @overload
    async def after_execute(self: BaseTaskExecutor[Literal[True]]) -> None: ...

    def after_execute(self) -> None | Awaitable[None]:
        """Called after a task has completed."""
        pass

    @overload
    def execute_middlewares_before(
        self: BaseTaskExecutor[Literal[False]],
    ) -> None: ...

    @overload
    async def execute_middlewares_before(
        self: BaseTaskExecutor[Literal[True]],
    ) -> None: ...

    def execute_middlewares_before(self) -> None | Awaitable[None]:
        """Calling middleware before the task is completed."""
        pass

    @overload
    def execute_middlewares_after(self: BaseTaskExecutor[Literal[False]]) -> None: ...

    @overload
    async def execute_middlewares_after(
        self: BaseTaskExecutor[Literal[True]],
    ) -> None: ...

    def execute_middlewares_after(self) -> None | Awaitable[None]:
        """Calling middleware after completing a task."""
        pass

    @overload
    def run_task(self: BaseTaskExecutor[Literal[False]]) -> Any: ...

    @overload
    async def run_task(self: BaseTaskExecutor[Literal[True]]) -> Any: ...

    def run_task(self) -> Any | Awaitable[Any]:
        """
        Calling a task.

        Returns:
            Any: Result of the task.
        """
        pass

    @overload
    def execute(self: BaseTaskExecutor[Literal[False]], decode: bool = True) -> str:
        """
        Task processing.

        Args:
            decode (bool, optional): Decoding. Default: `True`.

        Returns:
            str: Result of the task.
        """
        ...

    @overload
    def execute(self: BaseTaskExecutor[Literal[False]], decode: bool = False) -> Any:
        """
        Task processing.

        Args:
            decode (bool, optional): Decoding. Default: `True`.

        Returns:
            Any: Result of the task.
        """
        ...

    @overload
    async def execute(
        self: BaseTaskExecutor[Literal[True]], decode: bool = True
    ) -> str:
        """
        Task processing.

        Args:
            decode (bool, optional): Decoding. Default: `True`.

        Returns:
            str: Result of the task.
        """
        ...

    @overload
    async def execute(
        self: BaseTaskExecutor[Literal[True]], decode: bool = False
    ) -> Any:
        """
        Task processing.

        Args:
            decode (bool, optional): Decoding. Default: `True`.

        Returns:
            Any: Result of the task.
        """
        ...

    @abstractmethod
    def execute(
        self, decode=None
    ) -> Any | str | Awaitable[Any | str]:
        """
        Task processing.

        Args:
            decode (bool, optional): Decoding. Default: `True`.

        Returns:
            Any|str: Result of the task.
        """
        pass

    @overload
    def decode(self: BaseTaskExecutor[Literal[False]]) -> str: ...

    @overload
    async def decode(self: BaseTaskExecutor[Literal[True]]) -> str: ...

    def decode(self) -> str | Awaitable[str]:
        """
        Decoding the task.

        Returns:
            str: Result of the task.
        """
        return ""

    def add_plugin(
        self,
        plugin: Annotated[
            BasePlugin,
            Doc("""
                    Plugin.
                    """),
        ],
        trigger_names: Annotated[
            list[str] | None,
            Doc("""
                    The name of the triggers for the plugin.

                    Default: Default: will be added to `Globals`.
                    """),
        ] = None,
    ) -> None:
        """
        Add a plugin to the class.

        Args:
            plugin (BasePlugin): Plugin
            trigger_names (List[str], optional): The name of the triggers for the plugin. Default: will be added to `Globals`.
        """
        trigger_names = trigger_names or ["Globals"]

        for name in trigger_names:
            if name not in self.plugins:
                self.plugins.update({name: [plugin]})
            else:
                self.plugins[name].append(plugin)
        return

    def _extract_args_kwargs_from_func(self, func: Any) -> tuple[list, dict]:
        """
        Retrieves the values of the arguments from a function if they are given as default values.

        Args:
            func (Callable): The function from which args and kwargs are retrieved.

        Returns:
            Tuple[list, dict]: args and kwargs ready to be passed to `_build_args_info`.
        """
        sig = inspect.signature(func)
        args = []
        kwargs = {}

        for name, param in sig.parameters.items():
            if param.default is not inspect.Parameter.empty:
                # Named argument (has a default value)
                kwargs[name] = param.default
            elif param.kind in (
                inspect.Parameter.POSITIONAL_ONLY,
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
            ):
                # Positional argument without a default value (just None)
                args.append(None)

        return args, kwargs

    def _build_args_info(self, args: list, kwargs: dict) -> list[ArgMeta]:
        """
        Constructs an ArgMeta list of args and kwargs based on function annotations.

        Args:
            args (list): Positional arguments.
            kwargs (dict): Named arguments.

        Returns:
            List[ArgMeta]: List of argument metadata.
        """
        args_info: list[ArgMeta] = []
        func = self.task_func.func

        try:
            sig = inspect.signature(func)
            parameters = list(sig.parameters.items())
        except (ValueError, TypeError):
            parameters = []

        annotations = getattr(func, "__annotations__", {})

        # Handling positional arguments
        for idx, value in enumerate(args):
            param_name = parameters[idx][0] if idx < len(parameters) else f"arg{idx}"
            annotation = annotations.get(param_name)
            origin = get_origin(annotation)
            raw_type = get_args(annotation)[0] if get_args(annotation) else annotation
            args_info.append(
                ArgMeta(
                    name=param_name,
                    value=value,
                    origin=origin,
                    raw_type=raw_type,
                    annotation=annotation,
                    is_kwarg=False,
                    index=idx,
                )
            )

        # Handling named arguments
        for key, value in kwargs.items():
            annotation = annotations.get(key)
            origin = get_origin(annotation)
            raw_type = get_args(annotation)[0] if get_args(annotation) else annotation
            args_info.append(
                ArgMeta(
                    name=key,
                    value=value,
                    origin=origin,
                    raw_type=raw_type,
                    annotation=annotation,
                    is_kwarg=True,
                    key=key,
                )
            )

        return args_info

__init__(task_func, task_broker, log=None, plugins=None)

Initializing the class. Occurs inside a Worker before a task is processed.

Parameters:

Name Type Description Default
task_func TaskExecSchema

Schema TaskExecSchema.

required
task_broker TaskPrioritySchema

Schema TaskPrioritySchema.

required
log Logger

class qtasks.logs.Logger. Default: qtasks._state.log_main.

None
plugins Dict[str, List[BasePlugin]]

Plugin dictionary. Default: Empty dictionary.

None
Source code in src/qtasks/executors/base.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def __init__(
    self,
    task_func: Annotated[
        TaskExecSchema,
        Doc("""
                `TaskExecSchema` schema.
                """),
    ],
    task_broker: Annotated[
        TaskPrioritySchema,
        Doc("""
                `TaskPrioritySchema` schema.
                """),
    ],
    log: Annotated[
        Logger | None,
        Doc("""
                Logger.

                Default: `qtasks.logs.Logger`.
                """),
    ] = None,
    plugins: Annotated[
        dict[str, list[BasePlugin]] | None,
        Doc("""
                Array of Plugins.

                Default: `Empty array`.
                """),
    ] = None,
):
    """
    Initializing the class. Occurs inside a `Worker` before a task is processed.

    Args:
        task_func (TaskExecSchema): Schema `TaskExecSchema`.
        task_broker(TaskPrioritySchema): Schema `TaskPrioritySchema`.
        log (Logger, optional): class `qtasks.logs.Logger`. Default: `qtasks._state.log_main`.
        plugins (Dict[str, List[BasePlugin]], optional): Plugin dictionary. Default: `Empty dictionary`.
    """
    self.task_func = task_func
    self.task_broker = task_broker
    self._args = self.task_broker.args.copy()
    self._kwargs = self.task_broker.kwargs.copy()
    self._result: Any = None
    self.echo = None

    self.decode_cls = lambda res: json.dumps(res, ensure_ascii=False)

    self.log = log
    if self.log is None:
        import qtasks._state

        self.log = qtasks._state.log_main
    self.log = self.log.with_subname(self.__class__.__name__)

    self.plugins = plugins or {}

add_plugin(plugin, trigger_names=None)

Add a plugin to the class.

Parameters:

Name Type Description Default
plugin BasePlugin

Plugin

required
trigger_names List[str]

The name of the triggers for the plugin. Default: will be added to Globals.

None
Source code in src/qtasks/executors/base.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def add_plugin(
    self,
    plugin: Annotated[
        BasePlugin,
        Doc("""
                Plugin.
                """),
    ],
    trigger_names: Annotated[
        list[str] | None,
        Doc("""
                The name of the triggers for the plugin.

                Default: Default: will be added to `Globals`.
                """),
    ] = None,
) -> None:
    """
    Add a plugin to the class.

    Args:
        plugin (BasePlugin): Plugin
        trigger_names (List[str], optional): The name of the triggers for the plugin. Default: will be added to `Globals`.
    """
    trigger_names = trigger_names or ["Globals"]

    for name in trigger_names:
        if name not in self.plugins:
            self.plugins.update({name: [plugin]})
        else:
            self.plugins[name].append(plugin)
    return

after_execute()

after_execute() -> None
after_execute() -> None

Called after a task has completed.

Source code in src/qtasks/executors/base.py
120
121
122
def after_execute(self) -> None | Awaitable[None]:
    """Called after a task has completed."""
    pass

before_execute()

before_execute() -> None
before_execute() -> None

Called before a task is executed.

Source code in src/qtasks/executors/base.py
110
111
112
def before_execute(self) -> None | Awaitable[None]:
    """Called before a task is executed."""
    pass

decode()

decode() -> str
decode() -> str

Decoding the task.

Returns:

Name Type Description
str str | Awaitable[str]

Result of the task.

Source code in src/qtasks/executors/base.py
242
243
244
245
246
247
248
249
def decode(self) -> str | Awaitable[str]:
    """
    Decoding the task.

    Returns:
        str: Result of the task.
    """
    return ""

execute(decode=None) abstractmethod

execute(decode: bool = True) -> str
execute(decode: bool = False) -> Any
execute(decode: bool = True) -> str
execute(decode: bool = False) -> Any

Task processing.

Parameters:

Name Type Description Default
decode bool

Decoding. Default: True.

None

Returns:

Type Description
Any | str | Awaitable[Any | str]

Any|str: Result of the task.

Source code in src/qtasks/executors/base.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@abstractmethod
def execute(
    self, decode=None
) -> Any | str | Awaitable[Any | str]:
    """
    Task processing.

    Args:
        decode (bool, optional): Decoding. Default: `True`.

    Returns:
        Any|str: Result of the task.
    """
    pass

execute_middlewares_after()

execute_middlewares_after() -> None
execute_middlewares_after() -> None

Calling middleware after completing a task.

Source code in src/qtasks/executors/base.py
146
147
148
def execute_middlewares_after(self) -> None | Awaitable[None]:
    """Calling middleware after completing a task."""
    pass

execute_middlewares_before()

execute_middlewares_before() -> None
execute_middlewares_before() -> None

Calling middleware before the task is completed.

Source code in src/qtasks/executors/base.py
134
135
136
def execute_middlewares_before(self) -> None | Awaitable[None]:
    """Calling middleware before the task is completed."""
    pass

run_task()

run_task() -> Any
run_task() -> Any

Calling a task.

Returns:

Name Type Description
Any Any | Awaitable[Any]

Result of the task.

Source code in src/qtasks/executors/base.py
156
157
158
159
160
161
162
163
def run_task(self) -> Any | Awaitable[Any]:
    """
    Calling a task.

    Returns:
        Any: Result of the task.
    """
    pass