Перейти к содержанию

TaskStatus

Task Status Schema.

BaseTaskStatusSchema dataclass

BaseTaskStatusSchema схема.

Parameters:

Name Type Description Default
status str

Статус.

required
task_name str

Название.

''
priority int

Приоритет.

0
args Tuple[str]

Аргументы типа args.

'[]'
kwargs Dict[str, str]

Аргументы типа kwargs.

'{}'
created_at float

Дата создания в формате timestamp.

0.0
updated_at float

Дата обновления в формате timestamp.

time()
returning str | None

Результат. По умолчанию: None.

required
traceback str | None

Трассировка ошибок. По умолчанию: None.

required
Source code in src/qtasks/schemas/task_status.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@dataclass
class BaseTaskStatusSchema:
    """`BaseTaskStatusSchema` схема.

    Args:
        status (str): Статус.
        task_name (str): Название.
        priority (int): Приоритет.
        args (Tuple[str]): Аргументы типа args.
        kwargs (Dict[str, str]): Аргументы типа kwargs.

        created_at (float): Дата создания в формате `timestamp`.
        updated_at (float): Дата обновления в формате `timestamp`.
        returning (str | None): Результат. По умолчанию: `None`.
        traceback (str | None): Трассировка ошибок. По умолчанию: `None`.
    """

    task_name: str = ""
    priority: int = 0

    args: Tuple[str] = field(default="[]")
    kwargs: Dict[str, str] = field(default="{}")

    created_at: float = 0.0
    updated_at: float = field(default_factory=time)

    def __post_init__(self):
        """Преобразовать аргументы в формат JSON."""
        if not isinstance(self.args, str):
            self.args = json.dumps(self.args)
        if not isinstance(self.kwargs, str):
            self.kwargs = json.dumps(self.kwargs)

__post_init__()

Преобразовать аргументы в формат JSON.

Source code in src/qtasks/schemas/task_status.py
37
38
39
40
41
42
def __post_init__(self):
    """Преобразовать аргументы в формат JSON."""
    if not isinstance(self.args, str):
        self.args = json.dumps(self.args)
    if not isinstance(self.kwargs, str):
        self.kwargs = json.dumps(self.kwargs)

TaskStatusCancelSchema dataclass

Bases: BaseTaskStatusSchema

TaskStatusCancelSchema схема.

Parameters:

Name Type Description Default
status str

Статус.

CANCEL.value
cancel_reason str

Причина отмены задачи.

''
Source code in src/qtasks/schemas/task_status.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
@dataclass
class TaskStatusCancelSchema(BaseTaskStatusSchema):
    """`TaskStatusCancelSchema` схема.

    Args:
        status (str): Статус.
        cancel_reason (str): Причина отмены задачи.
    """

    status: str = TaskStatusEnum.CANCEL.value
    cancel_reason: str = ""

TaskStatusErrorSchema dataclass

Bases: BaseTaskStatusSchema

TaskStatusErrorSchema схема.

Parameters:

Name Type Description Default
status str

Статус.

ERROR.value
traceback str

Трассировка ошибок.

''
Source code in src/qtasks/schemas/task_status.py
80
81
82
83
84
85
86
87
88
89
90
@dataclass
class TaskStatusErrorSchema(BaseTaskStatusSchema):
    """`TaskStatusErrorSchema` схема.

    Args:
        status (str): Статус.
        traceback (str): Трассировка ошибок.
    """

    status: str = TaskStatusEnum.ERROR.value
    traceback: str = ""

TaskStatusNewSchema dataclass

Bases: BaseTaskStatusSchema

TaskStatusNewSchema схема.

Parameters:

Name Type Description Default
status str

Статус.

NEW.value
Source code in src/qtasks/schemas/task_status.py
45
46
47
48
49
50
51
52
53
@dataclass
class TaskStatusNewSchema(BaseTaskStatusSchema):
    """`TaskStatusNewSchema` схема.

    Args:
        status (str): Статус.
    """

    status: str = TaskStatusEnum.NEW.value

TaskStatusProcessSchema dataclass

Bases: BaseTaskStatusSchema

TaskStatusProcessSchema схема.

Parameters:

Name Type Description Default
status str

Статус.

PROCESS.value
Source code in src/qtasks/schemas/task_status.py
56
57
58
59
60
61
62
63
64
@dataclass
class TaskStatusProcessSchema(BaseTaskStatusSchema):
    """`TaskStatusProcessSchema` схема.

    Args:
        status (str): Статус.
    """

    status: str = TaskStatusEnum.PROCESS.value

TaskStatusSuccessSchema dataclass

Bases: BaseTaskStatusSchema

TaskStatusSuccessSchema схема.

Parameters:

Name Type Description Default
status str

Статус.

SUCCESS.value
returning str

Результат.

''
Source code in src/qtasks/schemas/task_status.py
67
68
69
70
71
72
73
74
75
76
77
@dataclass
class TaskStatusSuccessSchema(BaseTaskStatusSchema):
    """`TaskStatusSuccessSchema` схема.

    Args:
        status (str): Статус.
        returning (str): Результат.
    """

    status: str = TaskStatusEnum.SUCCESS.value
    returning: str = ""