Перейти к содержанию

SyncRedisGlobalConfig

Sync Redis Global Config.

SyncRedisGlobalConfig

Bases: BaseGlobalConfig, SyncPluginMixin

Глобальный Конфиг, работающий через Redis и работает с глобальными значениями.

Пример

from qtasks import QueueTasks
from qtasks.configs import SyncRedisGlobalConfig
from qtasks.storage import SyncRedisStorage
from qtasks.brokers import SyncRedisBroker

global_config = SyncRedisGlobalConfig(name="QueueTasks", url="redis://localhost:6379/2")

storage = SyncRedisStorage(name="QueueTasks", global_config=global_config, url="redis://localhost:6379/2")

broker = SyncRedisBroker(name="QueueTasks", storage=storage, url="redis://localhost:6379/2")

app = QueueTasks(broker=broker)
Source code in src/qtasks/configs/sync_redisglobalconfig.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
class SyncRedisGlobalConfig(BaseGlobalConfig, SyncPluginMixin):
    """
    Глобальный Конфиг, работающий через Redis и работает с глобальными значениями.

    ## Пример

    ```python
    from qtasks import QueueTasks
    from qtasks.configs import SyncRedisGlobalConfig
    from qtasks.storage import SyncRedisStorage
    from qtasks.brokers import SyncRedisBroker

    global_config = SyncRedisGlobalConfig(name="QueueTasks", url="redis://localhost:6379/2")

    storage = SyncRedisStorage(name="QueueTasks", global_config=global_config, url="redis://localhost:6379/2")

    broker = SyncRedisBroker(name="QueueTasks", storage=storage, url="redis://localhost:6379/2")

    app = QueueTasks(broker=broker)
    ```
    """

    def __init__(
        self,
        name: Annotated[
            str,
            Doc(
                """
                    Имя проекта. Это имя также используется брокером.

                    По умолчанию: `QueueTasks`.
                    """
            ),
        ] = "QueueTasks",
        url: Annotated[
            str,
            Doc(
                """
                    URL для подключения к Redis.

                    По умолчанию: `redis://localhost:6379/0`.
                    """
            ),
        ] = "redis://localhost:6379/0",
        redis_connect: Annotated[
            Optional[redis.Redis],
            Doc(
                """
                    Внешний класс подключения к Redis.

                    По умолчанию: `None`.
                    """
            ),
        ] = None,
        config_name: Annotated[
            Optional[str],
            Doc(
                """
                    Имя Папки с Hash. Название обновляется на: `name:queue_name`.

                    По умолчанию: `name:GlobalConfig`.
                    """
            ),
        ] = None,
        log: Annotated[
            Optional[Logger],
            Doc(
                """
                    Логгер.

                    По умолчанию: `qtasks.logs.Logger`.
                    """
            ),
        ] = None,
        config: Annotated[
            Optional[QueueConfig],
            Doc(
                """
                    Конфиг.

                    По умолчанию: `qtasks.configs.config.QueueConfig`.
                    """
            ),
        ] = None,
        events: Annotated[
            Optional["BaseEvents"],
            Doc(
                """
                    События.

                    По умолчанию: `qtasks.events.SyncEvents`.
                    """
            ),
        ] = None,
    ):
        """Инициализация асинхронного Redis глобального конфига.

        Args:
            name (str, optional): Имя проекта. По умолчанию: "QueueTasks".
            url (str, optional): URL для подключения к Redis. По умолчанию: "redis://localhost:6379/0".
            redis_connect (redis.Redis, optional): Внешний класс подключения к Redis. По умолчанию: None.
            config_name (str, optional): Имя Папки с Hash. По умолчанию: None.
            log (Logger, optional): Логгер. По умолчанию: None.
            config (QueueConfig, optional): Конфигурация. По умолчанию: None.
            events (BaseEvents, optional): События. По умолчанию: `qtasks.events.SyncEvents`.
        """
        super().__init__(name=name, log=log, config=config, events=events)
        self.name = name
        self.url = url
        self.config_name = f"{self.name}:{config_name or 'GlobalConfig'}"
        self.events = self.events or SyncEvents()

        self.client = redis_connect or redis.from_url(
            self.url, decode_responses=True, encoding="utf-8"
        )
        self.running = False

    def set(self, name: str, key: str, value: str) -> None:
        """Добавить новое значение.

        Args:
            name (str): Имя.
            key (str): Ключ.
            value (str): Значение.
        """
        new_data = self._plugin_trigger(
            "global_config_set",
            global_config=self,
            name=name,
            key=key,
            value=value,
            return_last=True
        )
        if new_data:
            name = new_data.get("name", name)
            key = new_data.get("key", key)
            value = new_data.get("value", value)

        self.client.hset(name=f"{self.config_name}:{name}", key=key, value=value)
        return

    def get(self, key: str, name: str) -> Any:
        """Получить значение.

        Args:
            key (str): Ключ.
            name (str): Имя.

        Returns:
            Any: Значение.
        """
        result = self.client.hget(name=f"{self.config_name}:{key}", key=name)
        new_result = self._plugin_trigger(
            "global_config_get",
            global_config=self,
            get=result,
            return_last=True
        )
        if new_result:
            result = new_result.get("get", result)
        return result

    def get_all(self, key: str) -> Dict[str, Any]:
        """Получить все значения.

        Args:
            key (str): Ключ.

        Returns:
            Dict[str, Any]: Значения.
        """
        result = self.client.hgetall(name=f"{self.config_name}:{key}")
        new_result = self._plugin_trigger(
            "global_config_get_all",
            global_config=self,
            get=result,
            return_last=True
        )
        if new_result:
            result = new_result.get("get", result)
        return result

    def get_match(self, match: str) -> Union[Any, dict]:
        """Получить значения по паттерну.

        Args:
            match (str): Паттерн.

        Returns:
            Any | Dict[str, Any]: Значение или Значения.
        """
        result = self.client.hscan(key=self.config_name, match=match)
        new_result = self._plugin_trigger(
            "global_config_get_match",
            global_config=self,
            get=result,
            return_last=True
        )
        if new_result:
            result = new_result.get("get", result)
        return result

    def start(self) -> None:
        """Запуск Брокера. Эта функция задействуется основным экземпляром `QueueTasks` через `run_forever."""
        self._plugin_trigger("global_config_start", global_config=self)
        self.running = True
        global_config = GlobalConfigSchema(name=self.name, status="running")
        self.client.hset(
            name=f"{self.config_name}:main", mapping=global_config.__dict__
        )
        Thread(target=self._set_status, daemon=True).start()

    def stop(self) -> None:
        """Останавливает Глобальный Конфиг. Эта функция задействуется основным экземпляром `QueueTasks` после завершения функции `run_forever`."""
        self._plugin_trigger("global_config_stop", global_config=self)
        self.running = False
        self.client.close()
        return

    def _set_status(self):
        """Обновляет статус запуска глобального конфига."""
        self._plugin_trigger("global_config_set_status", global_config=self)
        ttl = self.config.global_config_status_ttl
        interval = self.config.global_config_status_set_periodic
        while self.running:
            self.client.expire(f"{self.config_name}:main", ttl)
            time.sleep(interval)

__init__(name='QueueTasks', url='redis://localhost:6379/0', redis_connect=None, config_name=None, log=None, config=None, events=None)

Инициализация асинхронного Redis глобального конфига.

Parameters:

Name Type Description Default
name str

Имя проекта. По умолчанию: "QueueTasks".

'QueueTasks'
url str

URL для подключения к Redis. По умолчанию: "redis://localhost:6379/0".

'redis://localhost:6379/0'
redis_connect Redis

Внешний класс подключения к Redis. По умолчанию: None.

None
config_name str

Имя Папки с Hash. По умолчанию: None.

None
log Logger

Логгер. По умолчанию: None.

None
config QueueConfig

Конфигурация. По умолчанию: None.

None
events BaseEvents

События. По умолчанию: qtasks.events.SyncEvents.

None
Source code in src/qtasks/configs/sync_redisglobalconfig.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def __init__(
    self,
    name: Annotated[
        str,
        Doc(
            """
                Имя проекта. Это имя также используется брокером.

                По умолчанию: `QueueTasks`.
                """
        ),
    ] = "QueueTasks",
    url: Annotated[
        str,
        Doc(
            """
                URL для подключения к Redis.

                По умолчанию: `redis://localhost:6379/0`.
                """
        ),
    ] = "redis://localhost:6379/0",
    redis_connect: Annotated[
        Optional[redis.Redis],
        Doc(
            """
                Внешний класс подключения к Redis.

                По умолчанию: `None`.
                """
        ),
    ] = None,
    config_name: Annotated[
        Optional[str],
        Doc(
            """
                Имя Папки с Hash. Название обновляется на: `name:queue_name`.

                По умолчанию: `name:GlobalConfig`.
                """
        ),
    ] = None,
    log: Annotated[
        Optional[Logger],
        Doc(
            """
                Логгер.

                По умолчанию: `qtasks.logs.Logger`.
                """
        ),
    ] = None,
    config: Annotated[
        Optional[QueueConfig],
        Doc(
            """
                Конфиг.

                По умолчанию: `qtasks.configs.config.QueueConfig`.
                """
        ),
    ] = None,
    events: Annotated[
        Optional["BaseEvents"],
        Doc(
            """
                События.

                По умолчанию: `qtasks.events.SyncEvents`.
                """
        ),
    ] = None,
):
    """Инициализация асинхронного Redis глобального конфига.

    Args:
        name (str, optional): Имя проекта. По умолчанию: "QueueTasks".
        url (str, optional): URL для подключения к Redis. По умолчанию: "redis://localhost:6379/0".
        redis_connect (redis.Redis, optional): Внешний класс подключения к Redis. По умолчанию: None.
        config_name (str, optional): Имя Папки с Hash. По умолчанию: None.
        log (Logger, optional): Логгер. По умолчанию: None.
        config (QueueConfig, optional): Конфигурация. По умолчанию: None.
        events (BaseEvents, optional): События. По умолчанию: `qtasks.events.SyncEvents`.
    """
    super().__init__(name=name, log=log, config=config, events=events)
    self.name = name
    self.url = url
    self.config_name = f"{self.name}:{config_name or 'GlobalConfig'}"
    self.events = self.events or SyncEvents()

    self.client = redis_connect or redis.from_url(
        self.url, decode_responses=True, encoding="utf-8"
    )
    self.running = False

get(key, name)

Получить значение.

Parameters:

Name Type Description Default
key str

Ключ.

required
name str

Имя.

required

Returns:

Name Type Description
Any Any

Значение.

Source code in src/qtasks/configs/sync_redisglobalconfig.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def get(self, key: str, name: str) -> Any:
    """Получить значение.

    Args:
        key (str): Ключ.
        name (str): Имя.

    Returns:
        Any: Значение.
    """
    result = self.client.hget(name=f"{self.config_name}:{key}", key=name)
    new_result = self._plugin_trigger(
        "global_config_get",
        global_config=self,
        get=result,
        return_last=True
    )
    if new_result:
        result = new_result.get("get", result)
    return result

get_all(key)

Получить все значения.

Parameters:

Name Type Description Default
key str

Ключ.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Значения.

Source code in src/qtasks/configs/sync_redisglobalconfig.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def get_all(self, key: str) -> Dict[str, Any]:
    """Получить все значения.

    Args:
        key (str): Ключ.

    Returns:
        Dict[str, Any]: Значения.
    """
    result = self.client.hgetall(name=f"{self.config_name}:{key}")
    new_result = self._plugin_trigger(
        "global_config_get_all",
        global_config=self,
        get=result,
        return_last=True
    )
    if new_result:
        result = new_result.get("get", result)
    return result

get_match(match)

Получить значения по паттерну.

Parameters:

Name Type Description Default
match str

Паттерн.

required

Returns:

Type Description
Union[Any, dict]

Any | Dict[str, Any]: Значение или Значения.

Source code in src/qtasks/configs/sync_redisglobalconfig.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def get_match(self, match: str) -> Union[Any, dict]:
    """Получить значения по паттерну.

    Args:
        match (str): Паттерн.

    Returns:
        Any | Dict[str, Any]: Значение или Значения.
    """
    result = self.client.hscan(key=self.config_name, match=match)
    new_result = self._plugin_trigger(
        "global_config_get_match",
        global_config=self,
        get=result,
        return_last=True
    )
    if new_result:
        result = new_result.get("get", result)
    return result

set(name, key, value)

Добавить новое значение.

Parameters:

Name Type Description Default
name str

Имя.

required
key str

Ключ.

required
value str

Значение.

required
Source code in src/qtasks/configs/sync_redisglobalconfig.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def set(self, name: str, key: str, value: str) -> None:
    """Добавить новое значение.

    Args:
        name (str): Имя.
        key (str): Ключ.
        value (str): Значение.
    """
    new_data = self._plugin_trigger(
        "global_config_set",
        global_config=self,
        name=name,
        key=key,
        value=value,
        return_last=True
    )
    if new_data:
        name = new_data.get("name", name)
        key = new_data.get("key", key)
        value = new_data.get("value", value)

    self.client.hset(name=f"{self.config_name}:{name}", key=key, value=value)
    return

start()

Запуск Брокера. Эта функция задействуется основным экземпляром QueueTasks через `run_forever.

Source code in src/qtasks/configs/sync_redisglobalconfig.py
223
224
225
226
227
228
229
230
231
def start(self) -> None:
    """Запуск Брокера. Эта функция задействуется основным экземпляром `QueueTasks` через `run_forever."""
    self._plugin_trigger("global_config_start", global_config=self)
    self.running = True
    global_config = GlobalConfigSchema(name=self.name, status="running")
    self.client.hset(
        name=f"{self.config_name}:main", mapping=global_config.__dict__
    )
    Thread(target=self._set_status, daemon=True).start()

stop()

Останавливает Глобальный Конфиг. Эта функция задействуется основным экземпляром QueueTasks после завершения функции run_forever.

Source code in src/qtasks/configs/sync_redisglobalconfig.py
233
234
235
236
237
238
def stop(self) -> None:
    """Останавливает Глобальный Конфиг. Эта функция задействуется основным экземпляром `QueueTasks` после завершения функции `run_forever`."""
    self._plugin_trigger("global_config_stop", global_config=self)
    self.running = False
    self.client.close()
    return