Перейти к содержанию

BaseStarter - Пишем свой Стартер

Base starter.

BaseStarter

Bases: ABC

BaseStarter - Абстрактный класс, который является фундаментом для Стартеров.

Пример

from qtasks import QueueTasks
from qtasks.starters.base import BaseStarter

class MyStarter(BaseStarter):
    def __init__(self, name: str = None, broker = None, worker = None):
        super().__init__(name=name, broker = None, worker = None)
        pass
Source code in src/qtasks/starters/base.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class BaseStarter(ABC):
    """
    `BaseStarter` - Абстрактный класс, который является фундаментом для Стартеров.

    ## Пример

    ```python
    from qtasks import QueueTasks
    from qtasks.starters.base import BaseStarter

    class MyStarter(BaseStarter):
        def __init__(self, name: str = None, broker = None, worker = None):
            super().__init__(name=name, broker = None, worker = None)
            pass
    ```
    """

    def __init__(
        self,
        name: Annotated[
            Optional[str],
            Doc(
                """
                    Имя проекта. Это имя можно использовать для тегов для Стартеров.

                    По умолчанию: `None`.
                    """
            ),
        ] = None,
        broker: Annotated[
            Optional["BaseBroker"],
            Doc(
                """
                    Брокер.

                    По умолчанию: `None`.
                    """
            ),
        ] = None,
        worker: Annotated[
            Optional["BaseWorker"],
            Doc(
                """
                    Воркер.

                    По умолчанию: `None`.
                    """
            ),
        ] = None,
        log: Annotated[
            Optional[Logger],
            Doc(
                """
                    Логгер.

                    По умолчанию: `qtasks.logs.Logger`.
                    """
            ),
        ] = None,
        config: Annotated[
            Optional[QueueConfig],
            Doc(
                """
                    Конфиг.

                    По умолчанию: `qtasks.configs.config.QueueConfig`.
                    """
            ),
        ] = None,
        events: Annotated[
            Optional["BaseEvents"],
            Doc(
                """
                    События.

                    По умолчанию: `None`.
                    """
            ),
        ] = None,
    ):
        """Инициализация базового стартера.

        Args:
            name (str, optional): Имя проекта. По умолчанию: None.
            broker (BaseBroker, optional): Брокер. По умолчанию: None.
            worker (BaseWorker, optional): Воркер. По умолчанию: None.
            log (Logger, optional): Логгер. По умолчанию: `qtasks.logs.Logger`.
            config (QueueConfig, optional): Конфиг. По умолчанию: `qtasks.configs.config.QueueConfig`.
            events (BaseEvents, optional): События. По умолчанию: `None`.
        """
        self.name = name
        self.config = config or QueueConfig()
        self.log = (
            log.with_subname("Starter")
            if log
            else Logger(
                name=self.name,
                subname="Starter",
                default_level=self.config.logs_default_level_server,
                format=self.config.logs_format,
            )
        )
        self.events = events

        self.broker = broker
        self.worker = worker

        self.plugins: Dict[str, List["BasePlugin"]] = {}

        self.init_plugins()

    @abstractmethod
    def start(self) -> None:
        """Запуск Стартера. Эта функция задействуется основным экземпляром `QueueTasks` через `run_forever`."""

    @abstractmethod
    def stop(self):
        """Останавливает Стартер. Эта функция задействуется основным экземпляром `QueueTasks` после завершения функции `run_forever`."""
        pass

    def add_plugin(
        self,
        plugin: Annotated[
            "BasePlugin",
            Doc(
                """
                    Плагин.
                    """
            ),
        ],
        trigger_names: Annotated[
            Optional[List[str]],
            Doc(
                """
                    Имя триггеров для плагина.

                    По умолчанию: По умолчанию: будет добавлен в `Globals`.
                    """
            ),
        ] = None,
    ) -> None:
        """Добавить плагин в класс.

        Args:
            plugin (BasePlugin): Плагин
            trigger_names (List[str], optional): Имя триггеров для плагина. По умолчанию: будет добавлен в `Globals`.
        """
        trigger_names = trigger_names or ["Globals"]

        for name in trigger_names:
            if name not in self.plugins:
                self.plugins.update({name: [plugin]})
            else:
                self.plugins[name].append(plugin)
        return

    def update_configs(
        self,
        config: Annotated[
            QueueConfig,
            Doc(
                """
                    Конфиг.
                    """
            ),
        ],
    ):
        """Обновить конфиги всем компонентам.

        Args:
            config (QueueConfig): Конфиг.
        """
        self.log.debug("Конфиг обновлен")
        if self.worker:
            self.worker.update_config(config)
        if self.broker:
            self.broker.update_config(config)
            if self.broker.storage:
                self.broker.storage.update_config(config)
                if self.broker.storage.global_config:
                    self.broker.storage.global_config.update_config(config)

    def init_plugins(self):
        """Инициализация плагинов."""
        pass

__init__(name=None, broker=None, worker=None, log=None, config=None, events=None)

Инициализация базового стартера.

Parameters:

Name Type Description Default
name str

Имя проекта. По умолчанию: None.

None
broker BaseBroker

Брокер. По умолчанию: None.

None
worker BaseWorker

Воркер. По умолчанию: None.

None
log Logger

Логгер. По умолчанию: qtasks.logs.Logger.

None
config QueueConfig

Конфиг. По умолчанию: qtasks.configs.config.QueueConfig.

None
events BaseEvents

События. По умолчанию: None.

None
Source code in src/qtasks/starters/base.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def __init__(
    self,
    name: Annotated[
        Optional[str],
        Doc(
            """
                Имя проекта. Это имя можно использовать для тегов для Стартеров.

                По умолчанию: `None`.
                """
        ),
    ] = None,
    broker: Annotated[
        Optional["BaseBroker"],
        Doc(
            """
                Брокер.

                По умолчанию: `None`.
                """
        ),
    ] = None,
    worker: Annotated[
        Optional["BaseWorker"],
        Doc(
            """
                Воркер.

                По умолчанию: `None`.
                """
        ),
    ] = None,
    log: Annotated[
        Optional[Logger],
        Doc(
            """
                Логгер.

                По умолчанию: `qtasks.logs.Logger`.
                """
        ),
    ] = None,
    config: Annotated[
        Optional[QueueConfig],
        Doc(
            """
                Конфиг.

                По умолчанию: `qtasks.configs.config.QueueConfig`.
                """
        ),
    ] = None,
    events: Annotated[
        Optional["BaseEvents"],
        Doc(
            """
                События.

                По умолчанию: `None`.
                """
        ),
    ] = None,
):
    """Инициализация базового стартера.

    Args:
        name (str, optional): Имя проекта. По умолчанию: None.
        broker (BaseBroker, optional): Брокер. По умолчанию: None.
        worker (BaseWorker, optional): Воркер. По умолчанию: None.
        log (Logger, optional): Логгер. По умолчанию: `qtasks.logs.Logger`.
        config (QueueConfig, optional): Конфиг. По умолчанию: `qtasks.configs.config.QueueConfig`.
        events (BaseEvents, optional): События. По умолчанию: `None`.
    """
    self.name = name
    self.config = config or QueueConfig()
    self.log = (
        log.with_subname("Starter")
        if log
        else Logger(
            name=self.name,
            subname="Starter",
            default_level=self.config.logs_default_level_server,
            format=self.config.logs_format,
        )
    )
    self.events = events

    self.broker = broker
    self.worker = worker

    self.plugins: Dict[str, List["BasePlugin"]] = {}

    self.init_plugins()

add_plugin(plugin, trigger_names=None)

Добавить плагин в класс.

Parameters:

Name Type Description Default
plugin BasePlugin

Плагин

required
trigger_names List[str]

Имя триггеров для плагина. По умолчанию: будет добавлен в Globals.

None
Source code in src/qtasks/starters/base.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def add_plugin(
    self,
    plugin: Annotated[
        "BasePlugin",
        Doc(
            """
                Плагин.
                """
        ),
    ],
    trigger_names: Annotated[
        Optional[List[str]],
        Doc(
            """
                Имя триггеров для плагина.

                По умолчанию: По умолчанию: будет добавлен в `Globals`.
                """
        ),
    ] = None,
) -> None:
    """Добавить плагин в класс.

    Args:
        plugin (BasePlugin): Плагин
        trigger_names (List[str], optional): Имя триггеров для плагина. По умолчанию: будет добавлен в `Globals`.
    """
    trigger_names = trigger_names or ["Globals"]

    for name in trigger_names:
        if name not in self.plugins:
            self.plugins.update({name: [plugin]})
        else:
            self.plugins[name].append(plugin)
    return

init_plugins()

Инициализация плагинов.

Source code in src/qtasks/starters/base.py
199
200
201
def init_plugins(self):
    """Инициализация плагинов."""
    pass

start() abstractmethod

Запуск Стартера. Эта функция задействуется основным экземпляром QueueTasks через run_forever.

Source code in src/qtasks/starters/base.py
128
129
130
@abstractmethod
def start(self) -> None:
    """Запуск Стартера. Эта функция задействуется основным экземпляром `QueueTasks` через `run_forever`."""

stop() abstractmethod

Останавливает Стартер. Эта функция задействуется основным экземпляром QueueTasks после завершения функции run_forever.

Source code in src/qtasks/starters/base.py
132
133
134
135
@abstractmethod
def stop(self):
    """Останавливает Стартер. Эта функция задействуется основным экземпляром `QueueTasks` после завершения функции `run_forever`."""
    pass

update_configs(config)

Обновить конфиги всем компонентам.

Parameters:

Name Type Description Default
config QueueConfig

Конфиг.

required
Source code in src/qtasks/starters/base.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def update_configs(
    self,
    config: Annotated[
        QueueConfig,
        Doc(
            """
                Конфиг.
                """
        ),
    ],
):
    """Обновить конфиги всем компонентам.

    Args:
        config (QueueConfig): Конфиг.
    """
    self.log.debug("Конфиг обновлен")
    if self.worker:
        self.worker.update_config(config)
    if self.broker:
        self.broker.update_config(config)
        if self.broker.storage:
            self.broker.storage.update_config(config)
            if self.broker.storage.global_config:
                self.broker.storage.global_config.update_config(config)