Skip to content

SyncRedisCommandQueue

Sync Redis command queue.

SyncRedisCommandQueue

SyncRedisCommandQueue - Synchronous class for working with Redis.

Example

from qtasks import QueueTasks
from qtasks.contrib.redis import SyncRedisCommandQueue

redis_contrib = SyncRedisCommandQueue(redis)
redis_contrib.execute("hset", kwargs["name"], mapping=kwargs["mapping"])
Source code in src/qtasks/contrib/redis/sync_queue_client.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
class SyncRedisCommandQueue:
    """
    `SyncRedisCommandQueue` - Synchronous class for working with `Redis`.

    ## Example

    ```python
    from qtasks import QueueTasks
    from qtasks.contrib.redis import SyncRedisCommandQueue

    redis_contrib = SyncRedisCommandQueue(redis)
    redis_contrib.execute("hset", kwargs["name"], mapping=kwargs["mapping"])
    ```
    """

    def __init__(self, redis: redis.Redis, log: Logger | None = None):
        """
        An instance of a class.

        Args:
            redis (redis.Redis): class `Redis`.
            log (Logger, optional): class `qtasks.logs.Logger`. Default: `qtasks._state.log_main`.
        """
        self.log = self._get_log(log)
        self.redis = redis
        self.queue = Queue()
        self.worker_thread = None
        self.lock = threading.Lock()

    def _worker(self):
        while not self.queue.empty():
            try:
                cmd, args, kwargs = self.queue.get(timeout=2)
                self.log.debug(f"Task {cmd} with parameters {args} and {kwargs} executed")
                try:
                    getattr(self.redis, cmd)(*args, **kwargs)
                except Exception as e:
                    self.log.error(
                        f"Error executing Redis command {cmd}: {e}. Args: {args}, Kwargs: {kwargs}"
                    )
                self.queue.task_done()
            except Empty:
                break

        with self.lock:
            self.worker_thread = None

    def execute(self, cmd: str, *args, **kwargs):
        """
        Query in `Redis`.

        Args:
            cmd (str): Command.
            args(tuple, optional): Parameters to the command via *args.
            kwargs(dict, optional): Parameters to the command via *args.
        """
        self.queue.put((cmd, args, kwargs))
        with self.lock:
            if self.worker_thread is None or not self.worker_thread.is_alive():
                self.worker_thread = threading.Thread(target=self._worker, daemon=True)
                self.worker_thread.start()

    def _get_log(self, log: Logger | None):
        if log is None:
            import qtasks._state

            log = qtasks._state.log_main
        return log.with_subname("SyncRedisCommandQueue")

__init__(redis, log=None)

An instance of a class.

Parameters:

Name Type Description Default
redis Redis

class Redis.

required
log Logger

class qtasks.logs.Logger. Default: qtasks._state.log_main.

None
Source code in src/qtasks/contrib/redis/sync_queue_client.py
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(self, redis: redis.Redis, log: Logger | None = None):
    """
    An instance of a class.

    Args:
        redis (redis.Redis): class `Redis`.
        log (Logger, optional): class `qtasks.logs.Logger`. Default: `qtasks._state.log_main`.
    """
    self.log = self._get_log(log)
    self.redis = redis
    self.queue = Queue()
    self.worker_thread = None
    self.lock = threading.Lock()

execute(cmd, *args, **kwargs)

Query in Redis.

Parameters:

Name Type Description Default
cmd str

Command.

required
args tuple

Parameters to the command via *args.

()
kwargs dict

Parameters to the command via *args.

{}
Source code in src/qtasks/contrib/redis/sync_queue_client.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def execute(self, cmd: str, *args, **kwargs):
    """
    Query in `Redis`.

    Args:
        cmd (str): Command.
        args(tuple, optional): Parameters to the command via *args.
        kwargs(dict, optional): Parameters to the command via *args.
    """
    self.queue.put((cmd, args, kwargs))
    with self.lock:
        if self.worker_thread is None or not self.worker_thread.is_alive():
            self.worker_thread = threading.Thread(target=self._worker, daemon=True)
            self.worker_thread.start()