Перейти к содержанию

SyncRedisCommandQueue

Sync Redis command queue.

SyncRedisCommandQueue

SyncRedisCommandQueue - Асинхронный класс для работы с Redis.

Пример

from qtasks import QueueTasks
from qtasks.contrib.redis import SyncRedisCommandQueue

redis_contrib = SyncRedisCommandQueue(redis)
redis_contrib.execute("hset", kwargs["name"], mapping=kwargs["mapping"])
Source code in src/qtasks/contrib/redis/sync_queue_client.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class SyncRedisCommandQueue:
    """
    `SyncRedisCommandQueue` - Асинхронный класс для работы с `Redis`.

    ## Пример

    ```python
    from qtasks import QueueTasks
    from qtasks.contrib.redis import SyncRedisCommandQueue

    redis_contrib = SyncRedisCommandQueue(redis)
    redis_contrib.execute("hset", kwargs["name"], mapping=kwargs["mapping"])
    ```
    """

    def __init__(self, redis: redis.Redis, log: Logger = None):
        """Экземпляр класса.

        Args:
            redis (redis.asyncio.Redis): класс `Redis`.
            log (Logger, optional): класс `qtasks.logs.Logger`. По умолчанию: `qtasks._state.log_main`.
        """
        self.log = self._get_log(log)
        self.redis = redis
        self.queue = Queue()
        self.worker_thread = None
        self.lock = threading.Lock()

    def _worker(self):
        while not self.queue.empty():
            try:
                cmd, args, kwargs = self.queue.get(timeout=2)
                self.log.debug(f"Задача {cmd} с параметрами {args} и {kwargs} вызвана")
                try:
                    getattr(self.redis, cmd)(*args, **kwargs)
                except Exception as e:
                    self.log.error(f"Ошибка Redis команды {cmd}: {e}. Args: {args}, Kwargs: {kwargs}")
                self.queue.task_done()
            except Empty:
                break

        with self.lock:
            self.worker_thread = None

    def execute(self, cmd: str, *args, **kwargs):
        """Запрос в `Redis`.

        Args:
            cmd (str): Команда.
            args(tuple, optional): Параметры к команде через *args.
            kwargs(dict, optional): Параметры к команде через *args.
        """
        self.queue.put((cmd, args, kwargs))
        with self.lock:
            if self.worker_thread is None or not self.worker_thread.is_alive():
                self.worker_thread = threading.Thread(target=self._worker, daemon=True)
                self.worker_thread.start()

    def _get_log(self, log: Union[Logger, None]):
        if log is None:
            import qtasks._state

            log = qtasks._state.log_main
        return log.with_subname("SyncRedisCommandQueue")

__init__(redis, log=None)

Экземпляр класса.

Parameters:

Name Type Description Default
redis Redis

класс Redis.

required
log Logger

класс qtasks.logs.Logger. По умолчанию: qtasks._state.log_main.

None
Source code in src/qtasks/contrib/redis/sync_queue_client.py
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, redis: redis.Redis, log: Logger = None):
    """Экземпляр класса.

    Args:
        redis (redis.asyncio.Redis): класс `Redis`.
        log (Logger, optional): класс `qtasks.logs.Logger`. По умолчанию: `qtasks._state.log_main`.
    """
    self.log = self._get_log(log)
    self.redis = redis
    self.queue = Queue()
    self.worker_thread = None
    self.lock = threading.Lock()

execute(cmd, *args, **kwargs)

Запрос в Redis.

Parameters:

Name Type Description Default
cmd str

Команда.

required
args tuple

Параметры к команде через *args.

()
kwargs dict

Параметры к команде через *args.

{}
Source code in src/qtasks/contrib/redis/sync_queue_client.py
55
56
57
58
59
60
61
62
63
64
65
66
67
def execute(self, cmd: str, *args, **kwargs):
    """Запрос в `Redis`.

    Args:
        cmd (str): Команда.
        args(tuple, optional): Параметры к команде через *args.
        kwargs(dict, optional): Параметры к команде через *args.
    """
    self.queue.put((cmd, args, kwargs))
    with self.lock:
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.worker_thread = threading.Thread(target=self._worker, daemon=True)
            self.worker_thread.start()