Skip to content

BaseTestCase - Writing your own test case

Base test case.

BaseTestCase

Bases: Generic[TAsyncFlag], ABC

BaseTestCase - An abstract class that is the foundation for TestCase.

Example

from qtasks import QueueTasks
from qtasks.tests.base import BaseTestCase

class MyTestCase(BaseTestCase):
    def __init__(self, app: QueueTasks, name: str|None = None):
        super().__init__(app=app, name=name)
        pass
Source code in src/qtasks/tests/base.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
class BaseTestCase(Generic[TAsyncFlag], ABC):
    """
    `BaseTestCase` - Abstract foundation that concrete TestCase classes build on.

    Subclasses must implement `start` and `stop`. The generic flag selects
    whether those two methods are typed as plain or `async` methods.

    ## Example

    ```python
    from qtasks import QueueTasks
    from qtasks.tests.base import BaseTestCase

    class MyTestCase(BaseTestCase):
        def __init__(self, app: QueueTasks, name: str|None = None):
            super().__init__(app=app, name=name)
            pass
    ```
    """

    def __init__(
        self,
        app: Annotated[
            Union["QueueTasks", "aioQueueTasks"],
            Doc("""
                    Main copy.
                    """),
        ],
        name: Annotated[
            str | None,
            Doc("""
                    Project name. This name can be used for test components.

                    Default: `None`.
                    """),
        ] = None,
    ):
        """
        Remember the application under test and create default configs.

        Args:
            app (QueueTasks | aioQueueTasks): Application instance under test.
            name (str, optional): Project name, forwarded to the test
                components built by `settings`. Default: `None`.
        """
        self.app = app

        self.name = name
        # Both configs start from their defaults; `update_config` and
        # `settings` can replace them later.
        self.config = QueueConfig()
        self.test_config = TestConfig()

    @overload
    def start(self: "BaseTestCase[Literal[False]]", **kwargs) -> None:
        """Start the test case (synchronous variant)."""
        ...

    @overload
    async def start(self: "BaseTestCase[Literal[True]]", **kwargs) -> None:
        """Start the test case (asynchronous variant)."""
        ...

    @abstractmethod
    def start(self, **kwargs) -> None | Awaitable[None]:
        """Start the test case; concrete subclasses supply the behaviour."""
        ...

    @overload
    def stop(self: "BaseTestCase[Literal[False]]", **kwargs) -> None:
        """Stop the test case (synchronous variant)."""
        ...

    @overload
    async def stop(self: "BaseTestCase[Literal[True]]", **kwargs) -> None:
        """Stop the test case (asynchronous variant)."""
        ...

    @abstractmethod
    def stop(self, **kwargs) -> None | Awaitable[None]:
        """Stop the test case; concrete subclasses supply the behaviour."""
        ...

    def update_config(
        self,
        config: Annotated[
            QueueConfig,
            Doc("""
                    Config.
                    """),
        ],
    ) -> None:
        """
        Replace the config stored on this test case.

        Args:
            config (QueueConfig): Config.
        """
        self.config = config

    def settings(
        self, test_config: TestConfig | None = None, awaiting: bool | None = False
    ) -> None:
        """
        Apply test settings to the application under test.

        For every component whose flag on the test config is falsy, the
        matching test double is built and, for broker and worker, assigned
        onto the application. When the `plugins` flag is falsy, the plugin
        collections of the application and its components are cleared.

        Args:
            test_config (TestConfig, optional): Test config. When falsy, the
                config already stored on the test case is used.
            awaiting (bool, optional): Use Async components. Default: `False`.
        """
        # An explicitly supplied config is remembered; otherwise the stored one
        # stays in effect.
        if test_config:
            self.test_config = test_config
        active = self.test_config

        # NOTE(review): the fresh global config and storage are only handed to
        # the next freshly built component below; when that component is kept,
        # they are not attached anywhere in this method - confirm intended.
        fresh_global_config = None
        if not active.global_config:
            if awaiting:
                fresh_global_config = AsyncTestGlobalConfig(name=self.name)
            else:
                fresh_global_config = SyncTestGlobalConfig(name=self.name)

        fresh_storage = None
        if not active.storage:
            if awaiting:
                fresh_storage = AsyncTestStorage(
                    name=self.name, global_config=fresh_global_config
                )
            else:
                fresh_storage = SyncTestStorage(
                    name=self.name, global_config=fresh_global_config
                )

        if not active.broker:
            if awaiting:
                self.app.broker = AsyncTestBroker(name=self.name, storage=fresh_storage)
            else:
                self.app.broker = SyncTestBroker(name=self.name, storage=fresh_storage)

        if not active.worker:
            # The worker is always wired to whichever broker the app holds now.
            if awaiting:
                self.app.worker = AsyncTestWorker(  # type: ignore
                    name=self.name, broker=self.app.broker
                )
            else:
                self.app.worker = SyncTestWorker(  # type: ignore
                    name=self.name, broker=self.app.broker
                )

        # Plugins are left alone when the test config asks for them.
        if active.plugins:
            return

        self.app.plugins.clear()
        self.app.broker.plugins.clear()
        broker_storage = self.app.broker.storage
        broker_storage.plugins.clear()
        if broker_storage.global_config:
            broker_storage.global_config.plugins.clear()
        self.app.worker.plugins.clear()
        if self.app.starter:
            self.app.starter.plugins.clear()

__init__(app, name=None)

Test case initialization.

Source code in src/qtasks/tests/base.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self,
    app: Annotated[
        Union["QueueTasks", "aioQueueTasks"],
        Doc("""
                Main copy.
                """),
    ],
    name: Annotated[
        str | None,
        Doc("""
                Project name. This name can be used for test components.

                Default: `None`.
                """),
    ] = None,
):
    """
    Remember the application under test and create default configs.

    Args:
        app (QueueTasks | aioQueueTasks): Application instance under test.
        name (str, optional): Project name. Default: `None`.
    """
    self.app = app

    self.name = name
    # Both configs start from their defaults and can be replaced later.
    self.config = QueueConfig()
    self.test_config = TestConfig()

settings(test_config=None, awaiting=False)

Test settings.

Parameters:

Name Type Description Default
test_config TestConfig

Test config. Default: TestConfig().

None
awaiting bool

Use Async components. Default: False.

False
Source code in src/qtasks/tests/base.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def settings(
    self, test_config: TestConfig | None = None, awaiting: bool | None = False
) -> None:
    """
    Apply test settings to the application under test.

    For every component whose flag on the test config is falsy, the matching
    test double is built and, for broker and worker, assigned onto the
    application. When the `plugins` flag is falsy, the plugin collections of
    the application and its components are cleared.

    Args:
        test_config (TestConfig, optional): Test config. When falsy, the
            config already stored on the test case is used.
        awaiting (bool, optional): Use Async components. Default: `False`.
    """
    # An explicitly supplied config is remembered; otherwise the stored one
    # stays in effect.
    if test_config:
        self.test_config = test_config
    active = self.test_config

    # NOTE(review): the fresh global config and storage are only handed to the
    # next freshly built component below; when that component is kept, they
    # are not attached anywhere in this method - confirm intended.
    fresh_global_config = None
    if not active.global_config:
        if awaiting:
            fresh_global_config = AsyncTestGlobalConfig(name=self.name)
        else:
            fresh_global_config = SyncTestGlobalConfig(name=self.name)

    fresh_storage = None
    if not active.storage:
        if awaiting:
            fresh_storage = AsyncTestStorage(
                name=self.name, global_config=fresh_global_config
            )
        else:
            fresh_storage = SyncTestStorage(
                name=self.name, global_config=fresh_global_config
            )

    if not active.broker:
        if awaiting:
            self.app.broker = AsyncTestBroker(name=self.name, storage=fresh_storage)
        else:
            self.app.broker = SyncTestBroker(name=self.name, storage=fresh_storage)

    if not active.worker:
        # The worker is always wired to whichever broker the app holds now.
        if awaiting:
            self.app.worker = AsyncTestWorker(  # type: ignore
                name=self.name, broker=self.app.broker
            )
        else:
            self.app.worker = SyncTestWorker(  # type: ignore
                name=self.name, broker=self.app.broker
            )

    # Plugins are left alone when the test config asks for them.
    if active.plugins:
        return

    self.app.plugins.clear()
    self.app.broker.plugins.clear()
    broker_storage = self.app.broker.storage
    broker_storage.plugins.clear()
    if broker_storage.global_config:
        broker_storage.global_config.plugins.clear()
    self.app.worker.plugins.clear()
    if self.app.starter:
        self.app.starter.plugins.clear()

start(**kwargs) abstractmethod

start(**kwargs) -> None
start(**kwargs) -> None

Launches a test case.

Source code in src/qtasks/tests/base.py
81
82
83
84
@abstractmethod
def start(self, **kwargs) -> None | Awaitable[None]:
    """Launches a test case."""
    pass

stop(**kwargs) abstractmethod

stop(**kwargs) -> None
stop(**kwargs) -> None

Stops the test case.

Source code in src/qtasks/tests/base.py
96
97
98
99
@abstractmethod
def stop(self, **kwargs) -> None | Awaitable[None]:
    """Stops the test case."""
    pass

update_config(config)

Updates the test case config.

Parameters:

Name Type Description Default
config QueueConfig

Config.

required
Source code in src/qtasks/tests/base.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def update_config(
    self,
    config: Annotated[
        QueueConfig,
        Doc("""
                Config.
                """),
    ],
) -> None:
    """
    Replace the config stored on this test case.

    Args:
        config (QueueConfig): Config.
    """
    self.config = config