Bases: ABC
BaseTestCase - Абстрактный класс, который является фундаментом для TestCase.
Пример
from qtasks import QueueTasks
from qtasks.tests.base import BaseTestCase
class MyTestCase(BaseTestCase):
def __init__(self, app: QueueTasks, name: str|None = None):
super().__init__(app=app, name=name)
pass
Source code in src/qtasks/tests/base.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 | class BaseTestCase(ABC):
"""
`BaseTestCase` - Абстрактный класс, который является фундаментом для TestCase.
## Пример
```python
from qtasks import QueueTasks
from qtasks.tests.base import BaseTestCase
class MyTestCase(BaseTestCase):
def __init__(self, app: QueueTasks, name: str|None = None):
super().__init__(app=app, name=name)
pass
```
"""
def __init__(
self,
app: Annotated[
"QueueTasks",
Doc(
"""
Основной экземпляр.
"""
),
],
name: Annotated[
Optional[str],
Doc(
"""
Имя проекта. Это имя может быть использовано для тестовых компонентов.
По умолчанию: `None`.
"""
),
] = None,
):
"""Инициализация тестового кейса."""
self.app = app
self.name = name
self.config = QueueConfig()
self.test_config = TestConfig()
@abstractmethod
def start(self, **kwargs):
"""Запускает кейс тестирования."""
pass
@abstractmethod
def stop(self, **kwargs):
"""Останавливает кейс тестирования."""
pass
def update_config(
self,
config: Annotated[
QueueConfig,
Doc(
"""
Конфиг.
"""
),
],
) -> None:
"""Обновляет конфиг брокера.
Args:
config (QueueConfig): Конфиг.
"""
self.config = config
return
def settings(self, test_config: TestConfig = None) -> None:
"""Настройки тестирования.
Args:
test_config (TestConfig, optional): Конфиг тестирования. По умолчанию: `TestConfig()`.
"""
if test_config:
self.test_config = test_config
else:
test_config = self.test_config
global_config = None
storage = None
if not test_config.global_config:
global_config = AsyncTestGlobalConfig(name=self.name)
if not test_config.storage:
storage = AsyncTestStorage(name=self.name, global_config=global_config)
if not test_config.broker:
self.app.broker = AsyncTestBroker(name=self.name, storage=storage)
if not test_config.worker:
self.app.worker = AsyncTestWorker(name=self.name, broker=self.app.broker)
if not test_config.plugins:
self.app.plugins.clear()
self.app.broker.plugins.clear()
self.app.broker.storage.plugins.clear()
self.app.broker.storage.global_config.plugins.clear()
self.app.worker.plugins.clear()
if self.app.starter:
self.app.starter.plugins.clear()
return
|
__init__(app, name=None)
Инициализация тестового кейса.
Source code in src/qtasks/tests/base.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 | def __init__(
self,
app: Annotated[
"QueueTasks",
Doc(
"""
Основной экземпляр.
"""
),
],
name: Annotated[
Optional[str],
Doc(
"""
Имя проекта. Это имя может быть использовано для тестовых компонентов.
По умолчанию: `None`.
"""
),
] = None,
):
"""Инициализация тестового кейса."""
self.app = app
self.name = name
self.config = QueueConfig()
self.test_config = TestConfig()
|
settings(test_config=None)
Настройки тестирования.
Parameters:
| Name |
Type |
Description |
Default |
test_config
|
TestConfig
|
Конфиг тестирования. По умолчанию: TestConfig().
|
None
|
Source code in src/qtasks/tests/base.py
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129 | def settings(self, test_config: TestConfig = None) -> None:
"""Настройки тестирования.
Args:
test_config (TestConfig, optional): Конфиг тестирования. По умолчанию: `TestConfig()`.
"""
if test_config:
self.test_config = test_config
else:
test_config = self.test_config
global_config = None
storage = None
if not test_config.global_config:
global_config = AsyncTestGlobalConfig(name=self.name)
if not test_config.storage:
storage = AsyncTestStorage(name=self.name, global_config=global_config)
if not test_config.broker:
self.app.broker = AsyncTestBroker(name=self.name, storage=storage)
if not test_config.worker:
self.app.worker = AsyncTestWorker(name=self.name, broker=self.app.broker)
if not test_config.plugins:
self.app.plugins.clear()
self.app.broker.plugins.clear()
self.app.broker.storage.plugins.clear()
self.app.broker.storage.global_config.plugins.clear()
self.app.worker.plugins.clear()
if self.app.starter:
self.app.starter.plugins.clear()
return
|
start(**kwargs)
abstractmethod
Запускает кейс тестирования.
Source code in src/qtasks/tests/base.py
| @abstractmethod
def start(self, **kwargs):
"""Запускает кейс тестирования."""
pass
|
stop(**kwargs)
abstractmethod
Останавливает кейс тестирования.
Source code in src/qtasks/tests/base.py
| @abstractmethod
def stop(self, **kwargs):
"""Останавливает кейс тестирования."""
pass
|
update_config(config)
Обновляет конфиг брокера.
Parameters:
| Name |
Type |
Description |
Default |
config
|
QueueConfig
|
|
required
|
Source code in src/qtasks/tests/base.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 | def update_config(
self,
config: Annotated[
QueueConfig,
Doc(
"""
Конфиг.
"""
),
],
) -> None:
"""Обновляет конфиг брокера.
Args:
config (QueueConfig): Конфиг.
"""
self.config = config
return
|