Skip to content

TaskMiddleware

Base Task Middleware.

TaskMiddleware

Bases: BaseMiddleware

TaskMiddleware - An abstract class that is the foundation for task Middleware classes for TaskExecutor.

Example

from qtasks.middlewares import TaskMiddleware
from qtasks.executors.base import BaseTaskExecutor

class MyTaskMiddleware(TaskMiddleware):
    def __init__(self, task_executor: BaseTaskExecutor):
        super().__init__(name="MyTaskMiddleware")
        self.task_executor = task_executor
Source code in src/qtasks/middlewares/task.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class TaskMiddleware(BaseMiddleware):
    """
    `TaskMiddleware` - An abstract class that is the foundation for task Middleware classes for `TaskExecutor`.

    ## Example

    ```python
    from qtasks.middlewares import TaskMiddleware
    from qtasks.executors.base import BaseTaskExecutor

    class MyTaskMiddleware(TaskMiddleware):
        def __init__(self, task_executor: BaseTaskExecutor):
            super().__init__(name="MyTaskMiddleware")
            self.task_executor = task_executor
    ```
    """

    def __init__(self, task_executor: BaseTaskExecutor):
        """
        Initializing the task middleware.

        Args:
            task_executor (BaseTaskExecutor): A task executor instance.
        """
        super().__init__(name="TaskMiddleware")
        self.task_executor = task_executor

    def __call__(self, *args, **kwargs) -> Any:
        """
        Processing the task middleware call.

        Returns:
            Any: Processing result.
        """
        pass

__call__(*args, **kwargs)

Processing the task middleware call.

Returns:

Name Type Description
Any Any

Processing result.

Source code in src/qtasks/middlewares/task.py
39
40
41
42
43
44
45
46
def __call__(self, *args, **kwargs) -> Any:
    """
    Processing the task middleware call.

    Returns:
        Any: Processing result.
    """
    pass

__init__(task_executor)

Initializing the task middleware.

Parameters:

Name Type Description Default
task_executor BaseTaskExecutor

A task executor instance.

required
Source code in src/qtasks/middlewares/task.py
29
30
31
32
33
34
35
36
37
def __init__(self, task_executor: BaseTaskExecutor):
    """
    Initializing the task middleware.

    Args:
        task_executor (BaseTaskExecutor): A task executor instance.
    """
    super().__init__(name="TaskMiddleware")
    self.task_executor = task_executor