Skip to content

QueueConfig

QueueConfig Schema.

QueueConfig dataclass

Configuration of task queues.

Attributes:

Name Type Description
max_tasks_process int

Maximum tasks in process. Default: 10

running_older_tasks bool

Run past tasks. Default: False

delete_finished_tasks bool

Delete completed tasks. Default: False

task_default_priority int

Default task priority. Default: 0

task_default_decode Callable | None

Default task result decoder. Default: None

logs_default_level_server int

Logging level for the server. Default: logging.INFO(20)

logs_default_level_client int | None

Logging level for the client. Default: logging.INFO(20)

logs_format str

Logging format. Default: %(asctime)s [%(name)s: %(levelname)s] (%(subname)s) %(message)s

result_time_interval float

Time interval for results. Default: 1.0

result_statuses_end list[str]

Results end statuses. Default: ["success", "error", "cancel"]

environ_prefix str

Prefix for environment variables. Default: QTASKS_

Source code in src/qtasks/configs/config.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@dataclass
class QueueConfig:
    """
    Configuration of task queues.

    Attributes:
        max_tasks_process (int): Maximum tasks in process. Default: `10`
        running_older_tasks (bool): Run past tasks. Default: `False`
        delete_finished_tasks (bool): Delete completed tasks. Default: `False`

        task_default_priority (int): Default task priority. Default: `0`
        task_default_decode (Callable | None): Default task result decoder. Default: `None`

        logs_default_level_server (int): Logging level for the server. Default: `logging.INFO(20)`
        logs_default_level_client (int | None): Logging level for the client. Default: `logging.INFO(20)`
        logs_format (str): Logging format. Default: `%(asctime)s [%(name)s: %(levelname)s] (%(subname)s) %(message)s`

        result_time_interval (float): Time interval for results. Default: `1.0`
        result_statuses_end (list[str]): Results end statuses. Default: `["success", "error", "cancel"]`

        environ_prefix (str): Prefix for environment variables. Default: `QTASKS_`

    Class-level settings (un-annotated, so they are not `__init__` parameters,
    but they can be overridden per instance by assignment or environment variable):
        global_config_status_ttl: Default: `20`
        global_config_status_set_periodic: Default: `17`

    Notes:
        * After construction, every environment variable named
          `<environ_prefix><ATTRIBUTE>` overrides the matching public attribute
          (see `__post_init__`).
        * Assigning an unknown public name stores it as a dynamic field, which
          is then readable as a normal attribute.
        * Callbacks registered with `subscribe` are called as
          `callback(config, key, value)` on field and dynamic-field assignment.
    """

    max_tasks_process: int = 10
    running_older_tasks: bool = False
    delete_finished_tasks: bool = False

    task_default_priority: int = 0
    task_default_decode: Optional[Callable] = None

    # Deliberately left un-annotated: annotating them would add new __init__
    # parameters in the middle of the positional signature.
    global_config_status_ttl = 20
    global_config_status_set_periodic = 17

    logs_default_level_server: int = logging.INFO
    logs_default_level_client: Optional[int] = logging.INFO
    logs_format: str = "%(asctime)s [%(name)s: %(levelname)s] (%(subname)s) %(message)s"

    result_time_interval: float = 1.0
    result_statuses_end: list[str] = field(
        default_factory=lambda: [
            TaskStatusEnum.SUCCESS.value,
            TaskStatusEnum.ERROR.value,
            TaskStatusEnum.CANCEL.value,
        ]
    )

    environ_prefix: str = "QTASKS_"

    _callbacks: list[Callable[["QueueConfig", str, Any], None]] = field(
        default_factory=list, init=False, repr=False
    )
    _dynamic_fields: dict[str, Any] = field(
        default_factory=dict, init=False, repr=False
    )

    def subscribe(self, callback: Callable[["QueueConfig", str, Any], None]):
        """Subscribe to change."""
        self._callbacks.append(callback)

    def _notify(self, key: str, value: Any):
        """Call every subscribed callback with ``(self, key, value)``."""
        for callback in self._callbacks:
            callback(self, key, value)

    def __getattr__(self, item):
        """
        Return the dynamic field named ``item``.

        Python calls this only after normal attribute lookup has failed.

        Raises:
            AttributeError: If ``item`` is not a dynamic field.
        """
        # Read the storage straight from the instance dict. `self._dynamic_fields`
        # would re-enter __getattr__ and recurse forever on an instance whose
        # fields are not set yet (e.g. created via __new__ by copy/pickle).
        dynamic_fields = self.__dict__.get("_dynamic_fields")
        if dynamic_fields is not None and item in dynamic_fields:
            return dynamic_fields[item]
        raise AttributeError(f"{type(self).__name__} has no attribute '{item}'")

    def __setattr__(self, key, value):
        """
        Set an attribute and notify subscribers.

        * Annotated fields and public class-level settings are stored on the
          instance; subscribers are notified once `_callbacks` exists.
        * Other names starting with ``_`` are stored on the instance silently.
        * Any other name is stored as a dynamic field and subscribers are
          notified.
        """
        is_field = key in self.__annotations__
        # Public names defined on the class without an annotation (e.g.
        # `global_config_status_ttl`) must be stored on the instance as well:
        # a dynamic field of the same name would be shadowed by the class
        # attribute and the assigned value could never be read back.
        is_class_setting = (
            not is_field and not key.startswith("_") and hasattr(type(self), key)
        )
        if is_field or is_class_setting:
            object.__setattr__(self, key, value)
            if (
                "_callbacks" in self.__dict__
            ):  # Not present yet while the generated __init__ is still running
                self._notify(key, value)
        # For internal attributes
        elif key.startswith("_"):
            object.__setattr__(self, key, value)
        # For dynamic fields
        else:
            dynamic_fields = object.__getattribute__(self, "_dynamic_fields")
            dynamic_fields[key] = value
            self._notify(key, value)

    def __post_init__(self):
        """
        Override attributes from environment variables after creation.

        A variable named `<environ_prefix><NAME>` overrides the public attribute
        `<name>` (lower-cased). The string is converted to the type of the
        attribute's current value:

        * bool: true for `1`, `true`, `yes`, `on` (case-insensitive), else false;
        * list: comma-separated items, surrounding whitespace stripped;
        * anything else: `type(current_value)(string)`.

        Private attributes, methods, and attributes whose current value is
        `None` or callable are never overridden. Values that cannot be
        converted are skipped with a warning.
        """
        # Snapshot the length: `environ_prefix` itself may be overridden below.
        prefix_len = len(self.environ_prefix)
        for env_name, raw_value in self._get_env_all().items():
            attr_key = env_name[prefix_len:].lower()
            # Only existing public attributes are eligible; this keeps the
            # environment from replacing internal state such as `_callbacks`.
            if (
                not attr_key
                or attr_key.startswith("_")
                or not hasattr(self, attr_key)
            ):
                continue
            current = getattr(self, attr_key)
            # No target type can be derived from None, and methods/callables
            # cannot be built from a string.
            if current is None or callable(current):
                continue
            attr_type = type(current)
            try:
                if attr_type is bool:
                    casted_value = raw_value.strip().lower() in (
                        "1",
                        "true",
                        "yes",
                        "on",
                    )
                elif attr_type is list:
                    # list("a,b") would split the string into characters.
                    casted_value = [
                        part.strip() for part in raw_value.split(",") if part.strip()
                    ]
                else:
                    casted_value = attr_type(raw_value)
            except (ValueError, TypeError):
                logging.getLogger(__name__).warning(
                    "Ignoring environment variable %s: value cannot be converted to %s",
                    env_name,
                    attr_type.__name__,
                )
                continue
            setattr(self, attr_key, casted_value)

    def _get_env_all(self) -> dict[str, Any]:
        """Getting environment variables with prefix."""
        return {k: v for k, v in os.environ.items() if k.startswith(self.environ_prefix)}

__getattr__(item)

Getting an attribute.

Source code in src/qtasks/configs/config.py
75
76
77
78
79
80
81
def __getattr__(self, item):
    """
    Return the dynamic field named ``item``.

    Python calls this only after normal attribute lookup has failed.

    Raises:
        AttributeError: If ``item`` is not a dynamic field.
    """
    # Read the storage straight from the instance dict. `self._dynamic_fields`
    # would re-enter __getattr__ and recurse forever on an instance whose
    # fields are not set yet (e.g. created via __new__ by copy/pickle).
    dynamic_fields = self.__dict__.get("_dynamic_fields")
    if dynamic_fields is not None and item in dynamic_fields:
        return dynamic_fields[item]
    raise AttributeError(f"{type(self).__name__} has no attribute '{item}'")

__post_init__()

Initialization after creation.

Source code in src/qtasks/configs/config.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def __post_init__(self):
    """
    Override attributes from environment variables after creation.

    A variable named `<environ_prefix><NAME>` overrides the public attribute
    `<name>` (lower-cased). The string is converted to the type of the
    attribute's current value:

    * bool: true for `1`, `true`, `yes`, `on` (case-insensitive), else false;
    * list: comma-separated items, surrounding whitespace stripped;
    * anything else: `type(current_value)(string)`.

    Private attributes, methods, and attributes whose current value is
    `None` or callable are never overridden. Values that cannot be
    converted are skipped with a warning.
    """
    # Snapshot the length: `environ_prefix` itself may be overridden below.
    prefix_len = len(self.environ_prefix)
    for env_name, raw_value in self._get_env_all().items():
        attr_key = env_name[prefix_len:].lower()
        # Only existing public attributes are eligible; this keeps the
        # environment from replacing internal state such as `_callbacks`.
        if (
            not attr_key
            or attr_key.startswith("_")
            or not hasattr(self, attr_key)
        ):
            continue
        current = getattr(self, attr_key)
        # No target type can be derived from None, and methods/callables
        # cannot be built from a string.
        if current is None or callable(current):
            continue
        attr_type = type(current)
        try:
            if attr_type is bool:
                casted_value = raw_value.strip().lower() in (
                    "1",
                    "true",
                    "yes",
                    "on",
                )
            elif attr_type is list:
                # list("a,b") would split the string into characters.
                casted_value = [
                    part.strip() for part in raw_value.split(",") if part.strip()
                ]
            else:
                casted_value = attr_type(raw_value)
        except (ValueError, TypeError):
            logging.getLogger(__name__).warning(
                "Ignoring environment variable %s: value cannot be converted to %s",
                env_name,
                attr_type.__name__,
            )
            continue
        setattr(self, attr_key, casted_value)

__setattr__(key, value)

Setting the attribute.

Source code in src/qtasks/configs/config.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __setattr__(self, key, value):
    """
    Set an attribute and notify subscribers.

    * Annotated fields and public class-level settings are stored on the
      instance; subscribers are notified once `_callbacks` exists.
    * Other names starting with ``_`` are stored on the instance silently.
    * Any other name is stored as a dynamic field and subscribers are
      notified.
    """
    is_field = key in self.__annotations__
    # Public names defined on the class without an annotation must be stored
    # on the instance as well: a dynamic field of the same name would be
    # shadowed by the class attribute and the assigned value could never be
    # read back.
    is_class_setting = (
        not is_field and not key.startswith("_") and hasattr(type(self), key)
    )
    if is_field or is_class_setting:
        object.__setattr__(self, key, value)
        if (
            "_callbacks" in self.__dict__
        ):  # Not present yet while the generated __init__ is still running
            self._notify(key, value)
    # For internal attributes
    elif key.startswith("_"):
        object.__setattr__(self, key, value)
    # For dynamic fields
    else:
        dynamic_fields = object.__getattribute__(self, "_dynamic_fields")
        dynamic_fields[key] = value
        self._notify(key, value)

subscribe(callback)

Subscribe to change.

Source code in src/qtasks/configs/config.py
67
68
69
def subscribe(self, callback: Callable[["QueueConfig", str, Any], None]):
    """Add ``callback`` to the end of this config's list of change subscribers."""
    listeners = self._callbacks
    listeners.append(callback)