Skip to content

UtilsInspectStats

BaseInspectStats.

UtilsInspectStats

Utilities for inspection of statistics.

Source code in src/qtasks/stats/inspect/base.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
class UtilsInspectStats:
    """Utilities for inspection of statistics.

    Collects helpers that turn application and task metadata into either an
    aligned ``label: value`` text report or a JSON string.
    """

    # Column width that labels are left-padded to in text reports.
    label_width = 26
    # Length of the dashed line that closes every report block.
    separator_width = 50

    def _format_block(self, info: dict[str, Any]) -> str:
        """
        Render a mapping as aligned ``label: value`` lines.

        Args:
            info (dict): Labels mapped to the values to display.

        Returns:
            str: One line per entry, labels left-padded to ``label_width``.
        """
        return "\n".join(
            f"{label:<{self.label_width}}: {value}" for label, value in info.items()
        )

    def _app_parser(
        self, app: Union[QueueTasks, aioQueueTasks], json: bool = False
    ) -> str:
        """
        Parser for application information.

        Args:
            app (QueueTasks): Application instance.
            json (bool): When True, return the collected information as a
                JSON string instead of a text block. Default: False.

        Returns:
            str: Application information.
        """
        # Resolve the optional components once. The storage may be missing,
        # in which case its global config must not be dereferenced at all.
        storage = app.broker.storage
        global_config = storage.global_config if storage is not None else None

        plugins_sum = (
            len(app.plugins)
            + len(app.broker.plugins)
            + len(app.worker.plugins)
            + (len(app.starter.plugins) if app.starter else 0)
            + (len(storage.plugins) if storage else 0)
            + (len(global_config.plugins) if global_config else 0)
        )
        task_info = {
            "Name": app.name,
            "Method": app._method,
            "Version": app.version,
            "Config": str(app.config),
            "Tasks Count": len(app.tasks),
            "Routers Count": len(app.routers),
            "Plugins Count": plugins_sum,
            "Broker": app.broker.__class__.__name__,
            "Worker": app.worker.__class__.__name__,
            "Starter": app.starter.__class__.__name__ if app.starter else "—",
            "Storage": storage.__class__.__name__ if storage is not None else "—",
            "GlobalConfig": (
                global_config.__class__.__name__ if global_config else "—"
            ),
            "Log": app.log.__class__.__name__,
        }
        if app.events:
            # Total number of handlers stored across all entries of
            # ``app.events.on._events``.
            task_info["Init Count"] = sum(
                len(inits) for inits in app.events.on._events.values()
            )

        # NOTE: the ``json`` parameter shadows the ``json`` module inside this
        # method only; serialization is delegated to ``_parser_json``.
        if json:
            return self._parser_json(task_info)

        return "\n".join(
            [self._format_block(task_info), "-" * self.separator_width]
        )

    def _parser_json(self, data: Any | tuple[Any]) -> str:
        """
        Serialize data to an indented JSON string.

        Dataclass instances are converted to dictionaries first. A tuple,
        list or ``ValuesView`` is converted element by element and emitted as
        a JSON array. Values the encoder cannot serialize are written as
        ``str(value)``.

        Args:
            data (Any): A single object or a collection of objects.

        Returns:
            str: JSON document with two-space indentation; non-ASCII
                characters are kept as-is.
        """

        def formatter(d):
            # Dataclass *types* are left alone: asdict() accepts instances only.
            if is_dataclass(d) and not isinstance(d, type):
                return asdict(d)
            return d

        data = (
            [formatter(d) for d in data]
            if isinstance(data, (tuple, list, ValuesView))
            else formatter(data)
        )
        return json.dumps(data, ensure_ascii=False, indent=2, default=str)

    def _tasks_parser(
        self,
        tasks: tuple[TaskExecSchema] | list[TaskExecSchema] | ValuesView[TaskExecSchema],
    ) -> str:
        """
        Formatted output of all registered tasks.

        Args:
            tasks: Task schemas to describe.

        Returns:
            str: One aligned block per task, each followed by a dashed
                separator, or ``"No registered tasks."`` when ``tasks`` is
                empty.
        """
        lines = []

        for task in tasks:
            args, kwargs = self._task_get_args_kwargs(task.func)

            task_info = {
                "Task name": task.name,
                "Priority": task.priority,
                "Description": task.description or "—",
                "Tags": ", ".join(task.tags) if task.tags else "—",
                "Awaiting": task.awaiting,
                "Generating": task.generating,
                "Task Self": task.echo,
                "Args": ", ".join(args) if args else "—",
                "Kwargs": (
                    ", ".join(f"{k}={v}" for k, v in kwargs.items()) if kwargs else "—"
                ),
            }

            # Optional settings are listed only when they are configured.
            # ``retry`` is compared with None so that a value of 0 is shown.
            if task.retry is not None:
                task_info["Retry Count"] = task.retry
            if task.retry_on_exc:
                task_info["Retry on Exception"] = pformat(task.retry_on_exc)
            if task.decode:
                task_info["Decode"] = str(task.decode)
            if task.generate_handler:
                task_info["Generator"] = str(task.generate_handler)
            if task.executor:
                task_info["Executor"] = str(task.executor)
            if task.middlewares_before:
                task_info["Middlewares Before"] = pformat(task.middlewares_before)
            if task.middlewares_after:
                task_info["Middlewares After"] = pformat(task.middlewares_after)
            if task.extra:
                # Extra entries go on their own bulleted lines under the label.
                task_info["Extra"] = "\n" + "\n".join(
                    f" * {k}: {v}" for k, v in task.extra.items()
                )

            lines.append(self._format_block(task_info))
            lines.append("-" * self.separator_width)

        return "\n".join(lines) or "No registered tasks."

    def _task_get_args_kwargs(self, func) -> tuple[list[str], dict[str, Any]]:
        """
        Retrieving positional and key arguments of a task function.

        Args:
            func (Callable): Task function.

        Returns:
            tuple: Positional and key arguments. The first item is a list of
                ``name[: type]`` strings for parameters without a default
                (plus ``*name`` for var-positional). The second item maps
                ``name[: type]`` to the default value, to ``"required"`` for
                keyword-only parameters without a default, and ``**name`` to
                ``"..."`` for var-keyword.
        """
        positional_args = []
        keyword_args = {}

        for name, param in signature(func).parameters.items():
            annotation = param.annotation

            if annotation is param.empty:
                type_str = ""
            elif issubclass(type(annotation), type):
                # Check the real type instead of isinstance(): on Python
                # 3.9/3.10 ``isinstance(list[int], type)`` is True, which
                # would print a parameterized generic as just ``list``.
                type_str = f": {annotation.__name__}"
            elif annotation:
                type_str = f": {annotation}"
            else:
                type_str = ""

            label = f"{name}{type_str}"

            if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD):
                if param.default is param.empty:
                    positional_args.append(label)
                else:
                    keyword_args[label] = param.default
            elif param.kind == param.KEYWORD_ONLY:
                keyword_args[label] = (
                    param.default if param.default is not param.empty else "required"
                )
            elif param.kind == param.VAR_POSITIONAL:
                positional_args.append(f"*{name}")
            elif param.kind == param.VAR_KEYWORD:
                keyword_args[f"**{name}"] = "..."

        return positional_args, keyword_args