Перейти к содержанию

Тестирование

QTasks поддерживает тестирование задач как в синхронном, так и в асинхронном режиме. Вы можете использовать стандартные инструменты Python, такие как unittest, а также асинхронные библиотеки — например, aiounittest и pytest-asyncio.

Быстрый запуск тестов из консоли

py tests/main.py

Поддерживаемые фреймворки

  • unittest — для синхронных задач и базовой структуры.
  • aiounittest — для асинхронных задач и компонентов.
  • pytest + pytest-asyncio — для гибкого и мощного асинхронного тестирования.
  • SyncTestCase/AsyncTestCase — внутренние кейсы QTasks для точного управления задачами.

Примеры

unittest (синхронно)

import unittest
from app import app

class TestTasks(unittest.TestCase):
    def setUp(self):
        self._result = app.add_task(task_name="test", 5)

    def test_task_get_result(self):
        uuid = self._result.uuid
        result = app.get(uuid=uuid)
        self.assertIsNotNone(result)

unittest (асинхронно)

import unittest

from qtasks.schemas.task import Task
from app import app

class TestAsyncTasks(unittest.IsolatedAsyncioTestCase):
    async def _add_task(self) -> Task | None:
        return await app.add_task(task_name="test", 5)

    async def test_task_get_result(self):
        uuid = (await self._add_task()).uuid
        result = await app.get(uuid=uuid)
        self.assertIsNotNone(result)

SyncTestCase / AsyncTestCase

import unittest

from qtasks.tests import SyncTestCase
from qtasks.schemas.test import TestConfig
from app import app

class TestTasks(unittest.TestCase):
    def setUp(self):
        self.test_case = SyncTestCase(app=app)

    def test_task_add(self):
        self.test_case.settings(TestConfig.full())
        result = self.test_case.add_task(task_name="test", 5)
        self.assertIsNotNone(result)
import unittest

from qtasks.tests import AsyncTestCase
from qtasks.schemas.test import TestConfig
from qtasks.schemas.task import Task
from app import app

class TestAsyncQTasks(unittest.IsolatedAsyncioTestCase):
    def setUp(self):
        self.test_case = AsyncTestCase(app=app)

    async def _add_task(self) -> Task | None:
        return await self.test_case.add_task("test", args=(5,), timeout=10)

    async def test_add_task(self):
        self.test_case.settings(TestConfig.full())
        result = await self._add_task()
        self.assertIsNotNone(result)

pytest

Установка

pip install pytest pytest-asyncio

tests/test_async_task.py

import pytest
from uuid import uuid4

from qtasks.tests import AsyncTestCase
from qtasks.schemas.test import TestConfig
from qtasks.enums.task_status import TaskStatusEnum

from tests.apps.app_async import app

@pytest.fixture()
def test_case():
    case = AsyncTestCase(app=app)
    case.settings(TestConfig.full())
    return case

@pytest.mark.asyncio
async def test_task_get_result(test_case):
    task = await test_case.add_task("test", args=(5,))
    result = await app.get(uuid=task.uuid)
    assert result is not None

@pytest.mark.asyncio
async def test_task_returns_expected_result(test_case):
    task = await test_case.add_task("test", args=(5,), timeout=10)
    assert task.returning == "Пользователь 5 записан"

@pytest.mark.asyncio
async def test_task_error_handling(test_case):
    task = await test_case.add_task("error_task", timeout=10)
    assert task.status == TaskStatusEnum.ERROR.value

@pytest.mark.asyncio
async def test_task_not_found():
    result = await app.get(uuid=str(uuid4()))
    assert result is None

Запуск

pytest tests/test_async_task.py -v