Перейти к содержанию

AsyncRedisCommandQueue

Async Redis command queue.

AsyncRedisCommandQueue

AsyncRedisCommandQueue - Асинхронный класс для работы с Redis.

Пример

import asyncio
from qtasks import QueueTasks
from qtasks.contrib.redis import AsyncRedisCommandQueue

redis_contrib = AsyncRedisCommandQueue(redis)
asyncio.run(redis_contrib.execute("hset", kwargs["name"], mapping=kwargs["mapping"]))
Source code in src/qtasks/contrib/redis/async_queue_client.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class AsyncRedisCommandQueue:
    """
    `AsyncRedisCommandQueue` - Асинхронный класс для работы с `Redis`.

    ## Пример

    ```python
    import asyncio
    from qtasks import QueueTasks
    from qtasks.contrib.redis import AsyncRedisCommandQueue

    redis_contrib = AsyncRedisCommandQueue(redis)
    asyncio.run(redis_contrib.execute("hset", kwargs["name"], mapping=kwargs["mapping"]))
    ```
    """

    def __init__(self, redis: aioredis.Redis, log: Logger = None):
        """Экземпляр класса.

        Args:
            redis (redis.asyncio.Redis): класс `Redis`.
            log (Logger, optional): класс `qtasks.logs.Logger`. По умолчанию: `qtasks._state.log_main`.
        """
        self.log = self._get_log(log)
        self.redis = redis
        self.queue = asyncio.Queue()
        self.worker_task = None
        self.lock = asyncio.Lock()

    async def _worker(self):
        while not self.queue.empty():
            cmd, args, kwargs = await self.queue.get()
            self.log.debug(f"Задача {cmd} с параметрами {args} и {kwargs} вызвана")
            try:
                await getattr(self.redis, cmd)(*args, **kwargs)
            except Exception as e:
                self.log.error(f"Ошибка Redis команды {cmd}: {e}. Args: {args}, Kwargs: {kwargs}")
            self.queue.task_done()

        async with self.lock:
            self.worker_task = None

    async def execute(self, cmd: str, *args, **kwargs):
        """Запрос в `Redis`.

        Args:
            cmd (str): Команда.
            args(tuple, optional): Параметры к команде через *args.
            kwargs(dict, optional): Параметры к команде через *args.
        """
        await self.queue.put((cmd, args, kwargs))
        async with self.lock:
            if self.worker_task is None or self.worker_task.done():
                self.worker_task = asyncio.create_task(self._worker())

    def _get_log(self, log: Union[Logger, None]):
        if log is None:
            import qtasks._state

            log = qtasks._state.log_main
        return log.with_subname("AsyncRedisCommandQueue")

__init__(redis, log=None)

Экземпляр класса.

Parameters:

Name Type Description Default
redis Redis

класс Redis.

required
log Logger

класс qtasks.logs.Logger. По умолчанию: qtasks._state.log_main.

None
Source code in src/qtasks/contrib/redis/async_queue_client.py
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, redis: aioredis.Redis, log: Logger = None):
    """Экземпляр класса.

    Args:
        redis (redis.asyncio.Redis): класс `Redis`.
        log (Logger, optional): класс `qtasks.logs.Logger`. По умолчанию: `qtasks._state.log_main`.
    """
    self.log = self._get_log(log)
    self.redis = redis
    self.queue = asyncio.Queue()
    self.worker_task = None
    self.lock = asyncio.Lock()

execute(cmd, *args, **kwargs) async

Запрос в Redis.

Parameters:

Name Type Description Default
cmd str

Команда.

required
args tuple

Параметры к команде через *args.

()
kwargs dict

Параметры к команде через *args.

{}
Source code in src/qtasks/contrib/redis/async_queue_client.py
52
53
54
55
56
57
58
59
60
61
62
63
async def execute(self, cmd: str, *args, **kwargs):
    """Запрос в `Redis`.

    Args:
        cmd (str): Команда.
        args(tuple, optional): Параметры к команде через *args.
        kwargs(dict, optional): Параметры к команде через *args.
    """
    await self.queue.put((cmd, args, kwargs))
    async with self.lock:
        if self.worker_task is None or self.worker_task.done():
            self.worker_task = asyncio.create_task(self._worker())