Перейти к содержанию

States

Механизм State в QTasks позволяет безопасно хранить и передавать данные между задачами. Он реализован во встроенном плагине воркера (A)StatePlugin и предоставляет удобный интерфейс для работы с состоянием.


Пример использования State

from qtasks.plugins.states import AsyncState


class MyState(AsyncState):
    pass


@app.task
async def step1(state: MyState):
    await state.set("state", "await_phone")
    await state.update(step=1, prompt="Введите телефон")
    return "ok"


@app.task
async def step2(state: MyState):
    print(await state.get_all())

    cur = await state.get("state")
    if cur != "await_phone":
        return "error"

    await state.update(step=2)
    await state.delete("state")
    await state.clear()
    return "ok"

Вывод print(await state.get_all()):

{'state': 'await_phone', 'step': 1, 'prompt': 'Введите телефон'}

Сценарий:

  • step1 инициализирует состояние и записывает несколько значений.
  • step2 читает состояние, проверяет его, обновляет и очищает.

Таким образом, несколько задач могут работать с общим потокобезопасным состоянием.


Как это работает "под капотом"

Ниже описана упрощённая внутренняя логика (A)StatePlugin.

1. Поиск State в параметрах задачи

Плагин анализирует сигнатуру задачи и проверяет, является ли тип параметра наследником AsyncState или State:

issubclass(MyState, (AsyncState, State))

Если такой параметр найден, задача воспринимается как использующая состояние.

2. Инициализация State и StateRegistry

Если для MyState ещё не был вызван __init__, плагин:

  • создаёт экземпляр класса состояния (MyState()),
  • связывает его с внутренним реестром (A)syncStateRegistry.

Реестр отвечает за хранение значений "внутри" и использует блокировки:

  • asyncio.Lock() — для асинхронных задач,
  • threading.Lock() — для синхронных задач.

Это гарантирует, что параллельные задачи не смогут одновременно изменять одни и те же данные в неконсистентном виде.

3. Подмена параметра в задаче

После инициализации плагин заменяет параметр функции задачи на готовый объект MyState. Внутри тела задачи разработчик работает уже с полноценным API состояния.


API State / AsyncState

Ниже приведены основные методы состояния.

get

value = await state.get("key", default=None)
  • key — ключ состояния (если None, возвращаются все значения).
  • default — значение по умолчанию, если ключ не найден.

get_all

data = await state.get_all()

Возвращает словарь всех значений состояния.

set

await state.set("key", value)

Устанавливает значение состояния по ключу.

update

data = await state.update(step=1, prompt="Введите телефон")

Обновляет несколько значений сразу и возвращает обновлённый словарь.

delete

await state.delete("key")

Удаляет значение по ключу.

clear

await state.clear()

Очищает все значения состояния.


Задача механизма State

Механизм State решает задачу потокобезопасной передачи данных между задачами:

  • общая память для связанных задач (шаги сценария, цепочки, диалоги);
  • защита от гонок при параллельном доступе к состоянию;
  • единый интерфейс для чтения, записи и очистки данных.

Это удобный инструмент для построения сложных пошаговых процессов и сценариев, которые требуют сохранения контекста между задачами.