Skip to content

Роутер

Router for task execution.

AsyncRouter

Bases: AsyncPluginMixin

A router that stores tasks that the main QueueTasks connects to itself.

Example

from qtasks import QueueTasks, AsyncRouter

app = QueueTasks()

router = AsyncRouter()

@router.task()
async def test():
    pass

app.include_router(router)
Source code in src/qtasks/routers/async_router.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class AsyncRouter(AsyncPluginMixin):
    """
    A router that stores tasks that the main `QueueTasks` connects to itself.

    ## Example

    ```python
    from qtasks import QueueTasks, AsyncRouter

    app = QueueTasks()

    router = AsyncRouter()

    @router.task()
    async def test():
        pass

    app.include_router(router)
    ```
    """

    def __init__(self) -> None:
        """Initializing the router."""
        self.tasks: Annotated[
            dict[str, TaskExecSchema],
            Doc("""
                Tasks, type `{task_name:qtasks.schemas.TaskExecSchema}`.

                Default: `Empty dictionary`.
                """),
        ] = {}

        self.plugins: dict[str, list[BasePlugin]] = {}

    def task(
        self,
        name: Annotated[
            str | Callable | None,
            Doc("""
                    Task name.

                    Default: `func.__name__`.
                    """),
        ] = None,
        *,
        priority: Annotated[
            int | None,
            Doc("""
                    The task has priority by default.

                    Default: `config.task_default_priority`.
                    """),
        ] = None,
        echo: Annotated[
            bool,
            Doc("""
                    Add (A)syncTask as the first parameter.

                    Default: `False`.
                    """),
        ] = False,
        max_time: Annotated[
            float | None,
            Doc("""
                    The maximum time it takes to complete a task in seconds.

                    Default: `None`.
                    """),
        ] = None,
        retry: Annotated[
            int | None,
            Doc("""
                    The number of attempts to retry the task.

                    Default: `None`.
                    """),
        ] = None,
        retry_on_exc: Annotated[
            list[type[Exception]] | None,
            Doc("""
                    Exceptions under which the task will be re-executed.

                    Default: `None`.
                    """),
        ] = None,
        decode: Annotated[
            Callable | None,
            Doc("""
                    Task result decoder.

                    Default: `None`.
                """),
        ] = None,
        tags: Annotated[
            list[str] | None,
            Doc("""
                    Task tags.

                    Default: `None`.
                """),
        ] = None,
        description: Annotated[
            str | None,
            Doc("""
                    Description of the task.

                    Default: `None`.
                """),
        ] = None,
        generate_handler: Annotated[
            Callable | None,
            Doc("""
                    Handler generator.

                    Default: `None`.
                    """),
        ] = None,
        executor: Annotated[
            type[BaseTaskExecutor] | None,
            Doc("""
                    Class `BaseTaskExecutor`.

                    Default: `SyncTaskExecutor`.
                    """),
        ] = None,
        middlewares_before: Annotated[
            list[type[TaskMiddleware]] | None,
            Doc("""
                    Middleware that will be executed before the task.

                    Default: `Empty array`.
                    """),
        ] = None,
        middlewares_after: Annotated[
            list[type[TaskMiddleware]] | None,
            Doc("""
                    Middleware that will be executed after the task.

                    Default: `Empty array`.
                    """),
        ] = None,
        **kwargs,
    ) -> AsyncTask[P, R] | Callable[[Callable[P, R]], AsyncTask[P, R]]:
        """
        Decorator for registering tasks.

        Args:
            name (str, optional): Name of the task. Default: `func.__name__`.
            priority (int, optional): The task's default priority. Default: `config.task_default_priority`.
            echo (bool, optional): Add (A)syncTask as the first parameter. Default: `False`.
            max_time (float, optional): The maximum time the task will take to complete in seconds. Default: `None`.
            retry (int, optional): Number of attempts to retry the task. Default: `None`.
            retry_on_exc (List[Type[Exception]], optional): Exceptions under which the task will be re-executed. Default: `None`.
            decode (Callable, optional): Decoder of the task result. Default: `None`.
            tags (List[str], optional): Task tags. Default: `None`.
            description (str, optional): Description of the task. Default: `None`.
            generate_handler (Callable, optional): Handler generator. Default: `None`.
            executor (Type["BaseTaskExecutor"], optional): Class `BaseTaskExecutor`. Default: `SyncTaskExecutor`.
            middlewares_before (List[Type["TaskMiddleware"]], optional): Middleware that will be executed before the task. Default: `Empty array`.
            middlewares_after (List[Type["TaskMiddleware"]], optional): Middleware that will be executed after the task. Default: `Empty array`.

        Raises:
            ValueError: If a task with the same name is already registered.

        Returns:
            SyncTask: Decorator for registering a task.
        """

        def wrapper(func: Callable[P, R]):
            nonlocal priority, middlewares_before, middlewares_after

            task_name = name or func.__name__ if not callable(name) else name.__name__
            if task_name in self.tasks:
                raise ValueError(f"Task with name {task_name} is already registered!")

            if priority is None:
                priority = 0

            generating = False
            if inspect.isgeneratorfunction(func):
                generating = "sync"
            if inspect.isasyncgenfunction(func):
                generating = "async"

            middlewares_before = middlewares_before or []
            middlewares_after = middlewares_after or []

            model = TaskExecSchema(
                name=task_name,
                priority=priority,
                func=func,
                awaiting=inspect.iscoroutinefunction(func),
                generating=generating,
                echo=echo,
                max_time=max_time,
                retry=retry,
                retry_on_exc=retry_on_exc,
                decode=decode,
                tags=tags,
                description=description,
                generate_handler=generate_handler,
                executor=executor,
                middlewares_before=middlewares_before,
                middlewares_after=middlewares_after,
                extra=kwargs,
            )

            self.tasks[task_name] = model

            return AsyncTask(
                task_name=model.name,
                priority=model.priority,
                echo=model.echo,
                max_time=model.max_time,
                retry=model.retry,
                retry_on_exc=model.retry_on_exc,
                decode=model.decode,
                tags=model.tags,
                description=model.description,
                generate_handler=model.generate_handler,
                executor=model.executor,
                middlewares_before=model.middlewares_before,
                middlewares_after=model.middlewares_after,
            )

        return wrapper

__init__()

Initializing the router.

Source code in src/qtasks/routers/async_router.py
45
46
47
48
49
50
51
52
53
54
55
56
def __init__(self) -> None:
    """Initializing the router."""
    self.tasks: Annotated[
        dict[str, TaskExecSchema],
        Doc("""
            Tasks, type `{task_name:qtasks.schemas.TaskExecSchema}`.

            Default: `Empty dictionary`.
            """),
    ] = {}

    self.plugins: dict[str, list[BasePlugin]] = {}

task(name=None, *, priority=None, echo=False, max_time=None, retry=None, retry_on_exc=None, decode=None, tags=None, description=None, generate_handler=None, executor=None, middlewares_before=None, middlewares_after=None, **kwargs)

Decorator for registering tasks.

Parameters:

Name Type Description Default
name str

Name of the task. Default: func.__name__.

None
priority int

The task's default priority. Default: config.task_default_priority.

None
echo bool

Add (A)syncTask as the first parameter. Default: False.

False
max_time float

The maximum time the task will take to complete in seconds. Default: None.

None
retry int

Number of attempts to retry the task. Default: None.

None
retry_on_exc List[Type[Exception]]

Exceptions under which the task will be re-executed. Default: None.

None
decode Callable

Decoder of the task result. Default: None.

None
tags List[str]

Task tags. Default: None.

None
description str

Description of the task. Default: None.

None
generate_handler Callable

Handler generator. Default: None.

None
executor Type['BaseTaskExecutor']

Class BaseTaskExecutor. Default: SyncTaskExecutor.

None
middlewares_before List[Type['TaskMiddleware']]

Middleware that will be executed before the task. Default: Empty array.

None
middlewares_after List[Type['TaskMiddleware']]

Middleware that will be executed after the task. Default: Empty array.

None

Raises:

Type Description
ValueError

If a task with the same name is already registered.

Returns:

Name Type Description
SyncTask AsyncTask[P, R] | Callable[[Callable[P, R]], AsyncTask[P, R]]

Decorator for registering a task.

Source code in src/qtasks/routers/async_router.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def task(
    self,
    name: Annotated[
        str | Callable | None,
        Doc("""
                Task name.

                Default: `func.__name__`.
                """),
    ] = None,
    *,
    priority: Annotated[
        int | None,
        Doc("""
                The task has priority by default.

                Default: `config.task_default_priority`.
                """),
    ] = None,
    echo: Annotated[
        bool,
        Doc("""
                Add (A)syncTask as the first parameter.

                Default: `False`.
                """),
    ] = False,
    max_time: Annotated[
        float | None,
        Doc("""
                The maximum time it takes to complete a task in seconds.

                Default: `None`.
                """),
    ] = None,
    retry: Annotated[
        int | None,
        Doc("""
                The number of attempts to retry the task.

                Default: `None`.
                """),
    ] = None,
    retry_on_exc: Annotated[
        list[type[Exception]] | None,
        Doc("""
                Exceptions under which the task will be re-executed.

                Default: `None`.
                """),
    ] = None,
    decode: Annotated[
        Callable | None,
        Doc("""
                Task result decoder.

                Default: `None`.
            """),
    ] = None,
    tags: Annotated[
        list[str] | None,
        Doc("""
                Task tags.

                Default: `None`.
            """),
    ] = None,
    description: Annotated[
        str | None,
        Doc("""
                Description of the task.

                Default: `None`.
            """),
    ] = None,
    generate_handler: Annotated[
        Callable | None,
        Doc("""
                Handler generator.

                Default: `None`.
                """),
    ] = None,
    executor: Annotated[
        type[BaseTaskExecutor] | None,
        Doc("""
                Class `BaseTaskExecutor`.

                Default: `SyncTaskExecutor`.
                """),
    ] = None,
    middlewares_before: Annotated[
        list[type[TaskMiddleware]] | None,
        Doc("""
                Middleware that will be executed before the task.

                Default: `Empty array`.
                """),
    ] = None,
    middlewares_after: Annotated[
        list[type[TaskMiddleware]] | None,
        Doc("""
                Middleware that will be executed after the task.

                Default: `Empty array`.
                """),
    ] = None,
    **kwargs,
) -> AsyncTask[P, R] | Callable[[Callable[P, R]], AsyncTask[P, R]]:
    """
    Decorator for registering tasks.

    Args:
        name (str, optional): Name of the task. Default: `func.__name__`.
        priority (int, optional): The task's default priority. Default: `config.task_default_priority`.
        echo (bool, optional): Add (A)syncTask as the first parameter. Default: `False`.
        max_time (float, optional): The maximum time the task will take to complete in seconds. Default: `None`.
        retry (int, optional): Number of attempts to retry the task. Default: `None`.
        retry_on_exc (List[Type[Exception]], optional): Exceptions under which the task will be re-executed. Default: `None`.
        decode (Callable, optional): Decoder of the task result. Default: `None`.
        tags (List[str], optional): Task tags. Default: `None`.
        description (str, optional): Description of the task. Default: `None`.
        generate_handler (Callable, optional): Handler generator. Default: `None`.
        executor (Type["BaseTaskExecutor"], optional): Class `BaseTaskExecutor`. Default: `SyncTaskExecutor`.
        middlewares_before (List[Type["TaskMiddleware"]], optional): Middleware that will be executed before the task. Default: `Empty array`.
        middlewares_after (List[Type["TaskMiddleware"]], optional): Middleware that will be executed after the task. Default: `Empty array`.

    Raises:
        ValueError: If a task with the same name is already registered.

    Returns:
        SyncTask: Decorator for registering a task.
    """

    def wrapper(func: Callable[P, R]):
        nonlocal priority, middlewares_before, middlewares_after

        task_name = name or func.__name__ if not callable(name) else name.__name__
        if task_name in self.tasks:
            raise ValueError(f"Task with name {task_name} is already registered!")

        if priority is None:
            priority = 0

        generating = False
        if inspect.isgeneratorfunction(func):
            generating = "sync"
        if inspect.isasyncgenfunction(func):
            generating = "async"

        middlewares_before = middlewares_before or []
        middlewares_after = middlewares_after or []

        model = TaskExecSchema(
            name=task_name,
            priority=priority,
            func=func,
            awaiting=inspect.iscoroutinefunction(func),
            generating=generating,
            echo=echo,
            max_time=max_time,
            retry=retry,
            retry_on_exc=retry_on_exc,
            decode=decode,
            tags=tags,
            description=description,
            generate_handler=generate_handler,
            executor=executor,
            middlewares_before=middlewares_before,
            middlewares_after=middlewares_after,
            extra=kwargs,
        )

        self.tasks[task_name] = model

        return AsyncTask(
            task_name=model.name,
            priority=model.priority,
            echo=model.echo,
            max_time=model.max_time,
            retry=model.retry,
            retry_on_exc=model.retry_on_exc,
            decode=model.decode,
            tags=model.tags,
            description=model.description,
            generate_handler=model.generate_handler,
            executor=model.executor,
            middlewares_before=model.middlewares_before,
            middlewares_after=model.middlewares_after,
        )

    return wrapper

Router for task execution.

SyncRouter

Bases: SyncPluginMixin

A router that stores tasks that the main QueueTasks connects to itself.

Example

```python from qtasks import QueueTasks, SyncRouter

app = QueueTasks()

router = SyncRouter()

@router.task() def test(): pass

app.include_router(router) ```

Source code in src/qtasks/routers/sync_router.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class SyncRouter(SyncPluginMixin):
    """
    A router that stores tasks that the main `QueueTasks` connects to itself.

    ## Example

    ```python
    from qtasks import QueueTasks, SyncRouter

    app = QueueTasks()

    router = SyncRouter()

    @router.task()
    def test():
        pass

    app.include_router(router)
        ```
    """

    def __init__(self) -> None:
        """Initializing the router."""
        self.tasks: Annotated[
            dict[str, TaskExecSchema],
            Doc("""
                Tasks, type `{task_name:qtasks.schemas.TaskExecSchema}`.

                Default: `Empty dictionary`.
                """),
        ] = {}

        self.plugins: dict[str, list[BasePlugin]] = {}

    def task(
        self,
        name: Annotated[
            str | Callable | None,
            Doc("""
                    Task name.

                    Default: `func.__name__`.
                    """),
        ] = None,
        *,
        priority: Annotated[
            int | None,
            Doc("""
                    The task has priority by default.

                    Default: `config.task_default_priority`.
                    """),
        ] = None,
        echo: Annotated[
            bool,
            Doc("""
                    Add (A)syncTask as the first parameter.

                    Default: `False`.
                    """),
        ] = False,
        max_time: Annotated[
            float | None,
            Doc("""
                    The maximum time it takes to complete a task in seconds.

                    Default: `None`.
                    """),
        ] = None,
        retry: Annotated[
            int | None,
            Doc("""
                    The number of attempts to retry the task.

                    Default: `None`.
                    """),
        ] = None,
        retry_on_exc: Annotated[
            list[type[Exception]] | None,
            Doc("""
                    Exceptions under which the task will be re-executed.

                    Default: `None`.
                    """),
        ] = None,
        decode: Annotated[
            Callable | None,
            Doc("""
                    Task result decoder.

                    Default: `None`.
                """),
        ] = None,
        tags: Annotated[
            list[str] | None,
            Doc("""
                    Task tags.

                    Default: `None`.
                """),
        ] = None,
        description: Annotated[
            str | None,
            Doc("""
                    Description of the task.

                    Default: `None`.
                """),
        ] = None,
        generate_handler: Annotated[
            Callable | None,
            Doc("""
                    Handler generator.

                    Default: `None`.
                    """),
        ] = None,
        executor: Annotated[
            type[BaseTaskExecutor] | None,
            Doc("""
                    Class `BaseTaskExecutor`.

                    Default: `SyncTaskExecutor`.
                    """),
        ] = None,
        middlewares_before: Annotated[
            list[type[TaskMiddleware]] | None,
            Doc("""
                    Middleware that will be executed before the task.

                    Default: `Empty array`.
                    """),
        ] = None,
        middlewares_after: Annotated[
            list[type[TaskMiddleware]] | None,
            Doc("""
                    Middleware that will be executed after the task.

                    Default: `Empty array`.
                    """),
        ] = None,
        **kwargs,
    ) -> SyncTask[P, R] | Callable[[Callable[P, R]], SyncTask[P, R]]:
        """
        Decorator for registering tasks.

        Args:
            name (str, optional): Name of the task. Default: `func.__name__`.
            priority (int, optional): The task's default priority. Default: `config.task_default_priority`.
            echo (bool, optional): Add (A)syncTask as the first parameter. Default: `False`.
            max_time (float, optional): The maximum time the task will take to complete in seconds. Default: `None`.
            retry (int, optional): Number of attempts to retry the task. Default: `None`.
            retry_on_exc (List[Type[Exception]], optional): Exceptions under which the task will be re-executed. Default: `None`.
            decode (Callable, optional): Decoder of the task result. Default: `None`.
            tags (List[str], optional): Task tags. Default: `None`.
            description (str, optional): Description of the task. Default: `None`.
            generate_handler (Callable, optional): Handler generator. Default: `None`.
            executor (Type["BaseTaskExecutor"], optional): Class `BaseTaskExecutor`. Default: `SyncTaskExecutor`.
            middlewares_before (List[Type["TaskMiddleware"]], optional): Middleware that will be executed before the task. Default: `Empty array`.
            middlewares_after (List[Type["TaskMiddleware"]], optional): Middleware that will be executed after the task. Default: `Empty array`.

        Raises:
            ValueError: If a task with the same name is already registered.

        Returns:
            SyncTask: Decorator for registering a task.
        """

        def wrapper(func: Callable[P, R]):
            nonlocal priority, middlewares_before, middlewares_after

            task_name = name or func.__name__ if not callable(name) else name.__name__
            if task_name in self.tasks:
                raise ValueError(f"Task with name {task_name} is already registered!")

            if priority is None:
                priority = 0

            generating = False
            if inspect.isgeneratorfunction(func):
                generating = "sync"
            if inspect.isasyncgenfunction(func):
                generating = "async"

            middlewares_before = middlewares_before or []
            middlewares_after = middlewares_after or []

            model = TaskExecSchema(
                name=task_name,
                priority=priority,
                func=func,
                awaiting=inspect.iscoroutinefunction(func),
                generating=generating,
                echo=echo,
                max_time=max_time,
                retry=retry,
                retry_on_exc=retry_on_exc,
                decode=decode,
                tags=tags,
                description=description,
                generate_handler=generate_handler,
                executor=executor,
                middlewares_before=middlewares_before,
                middlewares_after=middlewares_after,
                extra=kwargs,
            )

            self.tasks[task_name] = model

            return SyncTask(
                task_name=model.name,
                priority=model.priority,
                echo=model.echo,
                max_time=model.max_time,
                retry=model.retry,
                retry_on_exc=model.retry_on_exc,
                decode=model.decode,
                tags=model.tags,
                description=model.description,
                generate_handler=model.generate_handler,
                executor=model.executor,
                middlewares_before=model.middlewares_before,
                middlewares_after=model.middlewares_after,
            )

        return wrapper

__init__()

Initializing the router.

Source code in src/qtasks/routers/sync_router.py
45
46
47
48
49
50
51
52
53
54
55
56
def __init__(self) -> None:
    """Initializing the router."""
    self.tasks: Annotated[
        dict[str, TaskExecSchema],
        Doc("""
            Tasks, type `{task_name:qtasks.schemas.TaskExecSchema}`.

            Default: `Empty dictionary`.
            """),
    ] = {}

    self.plugins: dict[str, list[BasePlugin]] = {}

task(name=None, *, priority=None, echo=False, max_time=None, retry=None, retry_on_exc=None, decode=None, tags=None, description=None, generate_handler=None, executor=None, middlewares_before=None, middlewares_after=None, **kwargs)

Decorator for registering tasks.

Parameters:

Name Type Description Default
name str

Name of the task. Default: func.__name__.

None
priority int

The task's default priority. Default: config.task_default_priority.

None
echo bool

Add (A)syncTask as the first parameter. Default: False.

False
max_time float

The maximum time the task will take to complete in seconds. Default: None.

None
retry int

Number of attempts to retry the task. Default: None.

None
retry_on_exc List[Type[Exception]]

Exceptions under which the task will be re-executed. Default: None.

None
decode Callable

Decoder of the task result. Default: None.

None
tags List[str]

Task tags. Default: None.

None
description str

Description of the task. Default: None.

None
generate_handler Callable

Handler generator. Default: None.

None
executor Type['BaseTaskExecutor']

Class BaseTaskExecutor. Default: SyncTaskExecutor.

None
middlewares_before List[Type['TaskMiddleware']]

Middleware that will be executed before the task. Default: Empty array.

None
middlewares_after List[Type['TaskMiddleware']]

Middleware that will be executed after the task. Default: Empty array.

None

Raises:

Type Description
ValueError

If a task with the same name is already registered.

Returns:

Name Type Description
SyncTask SyncTask[P, R] | Callable[[Callable[P, R]], SyncTask[P, R]]

Decorator for registering a task.

Source code in src/qtasks/routers/sync_router.py
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def task(
    self,
    name: Annotated[
        str | Callable | None,
        Doc("""
                Task name.

                Default: `func.__name__`.
                """),
    ] = None,
    *,
    priority: Annotated[
        int | None,
        Doc("""
                The task has priority by default.

                Default: `config.task_default_priority`.
                """),
    ] = None,
    echo: Annotated[
        bool,
        Doc("""
                Add (A)syncTask as the first parameter.

                Default: `False`.
                """),
    ] = False,
    max_time: Annotated[
        float | None,
        Doc("""
                The maximum time it takes to complete a task in seconds.

                Default: `None`.
                """),
    ] = None,
    retry: Annotated[
        int | None,
        Doc("""
                The number of attempts to retry the task.

                Default: `None`.
                """),
    ] = None,
    retry_on_exc: Annotated[
        list[type[Exception]] | None,
        Doc("""
                Exceptions under which the task will be re-executed.

                Default: `None`.
                """),
    ] = None,
    decode: Annotated[
        Callable | None,
        Doc("""
                Task result decoder.

                Default: `None`.
            """),
    ] = None,
    tags: Annotated[
        list[str] | None,
        Doc("""
                Task tags.

                Default: `None`.
            """),
    ] = None,
    description: Annotated[
        str | None,
        Doc("""
                Description of the task.

                Default: `None`.
            """),
    ] = None,
    generate_handler: Annotated[
        Callable | None,
        Doc("""
                Handler generator.

                Default: `None`.
                """),
    ] = None,
    executor: Annotated[
        type[BaseTaskExecutor] | None,
        Doc("""
                Class `BaseTaskExecutor`.

                Default: `SyncTaskExecutor`.
                """),
    ] = None,
    middlewares_before: Annotated[
        list[type[TaskMiddleware]] | None,
        Doc("""
                Middleware that will be executed before the task.

                Default: `Empty array`.
                """),
    ] = None,
    middlewares_after: Annotated[
        list[type[TaskMiddleware]] | None,
        Doc("""
                Middleware that will be executed after the task.

                Default: `Empty array`.
                """),
    ] = None,
    **kwargs,
) -> SyncTask[P, R] | Callable[[Callable[P, R]], SyncTask[P, R]]:
    """
    Decorator for registering tasks.

    Args:
        name (str, optional): Name of the task. Default: `func.__name__`.
        priority (int, optional): The task's default priority. Default: `config.task_default_priority`.
        echo (bool, optional): Add (A)syncTask as the first parameter. Default: `False`.
        max_time (float, optional): The maximum time the task will take to complete in seconds. Default: `None`.
        retry (int, optional): Number of attempts to retry the task. Default: `None`.
        retry_on_exc (List[Type[Exception]], optional): Exceptions under which the task will be re-executed. Default: `None`.
        decode (Callable, optional): Decoder of the task result. Default: `None`.
        tags (List[str], optional): Task tags. Default: `None`.
        description (str, optional): Description of the task. Default: `None`.
        generate_handler (Callable, optional): Handler generator. Default: `None`.
        executor (Type["BaseTaskExecutor"], optional): Class `BaseTaskExecutor`. Default: `SyncTaskExecutor`.
        middlewares_before (List[Type["TaskMiddleware"]], optional): Middleware that will be executed before the task. Default: `Empty array`.
        middlewares_after (List[Type["TaskMiddleware"]], optional): Middleware that will be executed after the task. Default: `Empty array`.

    Raises:
        ValueError: If a task with the same name is already registered.

    Returns:
        SyncTask: Decorator for registering a task.
    """

    def wrapper(func: Callable[P, R]):
        nonlocal priority, middlewares_before, middlewares_after

        task_name = name or func.__name__ if not callable(name) else name.__name__
        if task_name in self.tasks:
            raise ValueError(f"Task with name {task_name} is already registered!")

        if priority is None:
            priority = 0

        generating = False
        if inspect.isgeneratorfunction(func):
            generating = "sync"
        if inspect.isasyncgenfunction(func):
            generating = "async"

        middlewares_before = middlewares_before or []
        middlewares_after = middlewares_after or []

        model = TaskExecSchema(
            name=task_name,
            priority=priority,
            func=func,
            awaiting=inspect.iscoroutinefunction(func),
            generating=generating,
            echo=echo,
            max_time=max_time,
            retry=retry,
            retry_on_exc=retry_on_exc,
            decode=decode,
            tags=tags,
            description=description,
            generate_handler=generate_handler,
            executor=executor,
            middlewares_before=middlewares_before,
            middlewares_after=middlewares_after,
            extra=kwargs,
        )

        self.tasks[task_name] = model

        return SyncTask(
            task_name=model.name,
            priority=model.priority,
            echo=model.echo,
            max_time=model.max_time,
            retry=model.retry,
            retry_on_exc=model.retry_on_exc,
            decode=model.decode,
            tags=model.tags,
            description=model.description,
            generate_handler=model.generate_handler,
            executor=model.executor,
            middlewares_before=model.middlewares_before,
            middlewares_after=model.middlewares_after,
        )

    return wrapper