Перейти к содержанию

TaskExec

TaskPriority and TaskExec Schema.

TaskExecSchema dataclass

TaskExecSchema схема.

Parameters:

Name Type Description Default
priority int

Приоритет.

required
name str

Название.

required
func FunctionType

Функция задачи.

required
awaiting bool

Асинхронность задачи. По умолчанию: False

False
generating str | Literal[False]

Генерация задачи. По умолчанию: False

False
echo bool

Включить параметр self в задачу. По умолчанию: False

False
max_time float

Максимальное время выполнения задачи в секундах. По умолчанию: None

None
retry int

Количество попыток повторного выполнения задачи. По умолчанию: None

None
retry_on_exc List[Type[Exception]]

Исключения, при которых задача будет повторно выполнена. По умолчанию: None

None
decode Callable

Декодер результата задачи. По умолчанию: None

None
tags List[str]

Теги задачи. По умолчанию: None

None
description str

Описание задачи. По умолчанию: None.

None
generate_handler Callable

Генератор обработчика. По умолчанию: None

None
executor Type[BaseTaskExecutor]

Класс BaseTaskExecutor. По умолчанию: SyncTaskExecutor|AsyncTaskExecutor.

None
middlewares_before List[Type[TaskMiddleware]]

Мидлвари до выполнения задачи. По умолчанию: Пустой массив.

list()
middlewares_after List[Type[TaskMiddleware]]

Мидлвари после выполнения задачи. По умолчанию: Пустой массив.

list()
extra Dict[str, Any]

Дополнительные параметры задачи. По умолчанию: Пустой словарь.

dict()
Source code in src/qtasks/schemas/task_exec.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@dataclass
class TaskExecSchema:
    """`TaskExecSchema` схема.

    Args:
        priority (int): Приоритет.
        name (str): Название.

        func (FunctionType): Функция задачи.
        awaiting (bool): Асинхронность задачи. По умолчанию: `False`
        generating (str|Literal[False]): Генерация задачи. По умолчанию: `False`

        echo (bool): Включить параметр self в задачу. По умолчанию: `False`

        max_time (float, optional): Максимальное время выполнения задачи в секундах. По умолчанию: `None`

        retry (int, optional): Количество попыток повторного выполнения задачи. По умолчанию: `None`
        retry_on_exc (List[Type[Exception]], optional): Исключения, при которых задача будет повторно выполнена. По умолчанию: `None`

        decode (Callable, optional): Декодер результата задачи. По умолчанию: `None`
        tags (List[str], optional): Теги задачи. По умолчанию: `None`
        description (str, optional): Описание задачи. По умолчанию: `None`.

        generate_handler (Callable, optional): Генератор обработчика. По умолчанию: `None`

        executor (Type[BaseTaskExecutor], optional): Класс `BaseTaskExecutor`. По умолчанию: `SyncTaskExecutor`|`AsyncTaskExecutor`.
        middlewares_before (List[Type[TaskMiddleware]]): Мидлвари до выполнения задачи. По умолчанию: `Пустой массив`.
        middlewares_after (List[Type[TaskMiddleware]]): Мидлвари после выполнения задачи. По умолчанию: `Пустой массив`.

        extra (Dict[str, Any]): Дополнительные параметры задачи. По умолчанию: `Пустой словарь`.

    """

    priority: int
    name: str

    func: FunctionType
    awaiting: bool = False
    generating: Union[str, Literal[False]] = False

    echo: bool = False

    max_time: Union[float, None] = None

    retry: Union[int, None] = None
    retry_on_exc: Union[List[Type[Exception]], None] = None

    decode: Union[Callable, None] = None
    tags: Union[List[str], None] = None
    description: Union[str, None] = None

    generate_handler: Union[Callable, None] = None

    executor: Union[Type["BaseTaskExecutor"], None] = None
    middlewares_before: List[Type["TaskMiddleware"]] = field(default_factory=list)
    middlewares_after: List[Type["TaskMiddleware"]] = field(default_factory=list)

    extra: dict = field(default_factory=dict)

    def add_middlewares_before(self, middlewares: List[Type["TaskMiddleware"]]) -> None:
        """Добавляет мидлвари к задаче.

        Args:
            middlewares (List[Type[TaskMiddleware]]): Список мидлварей.
        """
        self.middlewares_before.extend(middlewares)

    def add_middlewares_after(self, middlewares: List[Type["TaskMiddleware"]]) -> None:
        """Добавляет мидлвари к задаче.

        Args:
            middlewares (List[Type[TaskMiddleware]]): Список мидлварей.
        """
        self.middlewares_after.extend(middlewares)

add_middlewares_after(middlewares)

Добавляет мидлвари к задаче.

Parameters:

Name Type Description Default
middlewares List[Type[TaskMiddleware]]

Список мидлварей.

required
Source code in src/qtasks/schemas/task_exec.py
108
109
110
111
112
113
114
def add_middlewares_after(self, middlewares: List[Type["TaskMiddleware"]]) -> None:
    """Добавляет мидлвари к задаче.

    Args:
        middlewares (List[Type[TaskMiddleware]]): Список мидлварей.
    """
    self.middlewares_after.extend(middlewares)

add_middlewares_before(middlewares)

Добавляет мидлвари к задаче.

Parameters:

Name Type Description Default
middlewares List[Type[TaskMiddleware]]

Список мидлварей.

required
Source code in src/qtasks/schemas/task_exec.py
100
101
102
103
104
105
106
def add_middlewares_before(self, middlewares: List[Type["TaskMiddleware"]]) -> None:
    """Добавляет мидлвари к задаче.

    Args:
        middlewares (List[Type[TaskMiddleware]]): Список мидлварей.
    """
    self.middlewares_before.extend(middlewares)

TaskPrioritySchema dataclass

TaskPrioritySchema схема.

Parameters:

Name Type Description Default
priority int

Приоритет.

required
uuid UUID

UUID.

required
name str

Название.

required
args Tuple[str]

Аргументы типа args.

list()
kwargs Dict[str, str]

Аргументы типа kwargs.

dict()
created_at float

Дата создания в формате timestamp.

0.0
updated_at float

Дата обновления в формате timestamp.

0.0
Source code in src/qtasks/schemas/task_exec.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@dataclass(order=True)
class TaskPrioritySchema:
    """`TaskPrioritySchema` схема.

    Args:
        priority (int): Приоритет.
        uuid (UUID): UUID.
        name (str): Название.

        args (Tuple[str]): Аргументы типа args.
        kwargs (Dict[str, str]): Аргументы типа kwargs.

        created_at (float): Дата создания в формате `timestamp`.
        updated_at (float): Дата обновления в формате `timestamp`.
    """

    priority: int
    uuid: UUID = field(compare=False)
    name: str = field(compare=False)

    args: list = field(default_factory=list, compare=False)
    kwargs: dict = field(default_factory=dict, compare=False)

    created_at: float = 0.0
    updated_at: float = 0.0