Coverage for src/debputy/commands/debputy_cmd/output.py: 22%
215 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import argparse
2import contextlib
3import os
4import re
5import shutil
6import subprocess
7import sys
8from typing import (
9 Union,
10 IO,
11 Tuple,
12 Optional,
13 Any,
14)
15from collections.abc import Sequence, Iterator, Mapping
17from debputy.util import assume_not_none
19try:
20 import colored
22 if ( 22 ↛ 28line 22 didn't jump to line 28 because the condition on line 22 was never true
23 not hasattr(colored, "Style")
24 or not hasattr(colored, "Fore")
25 or not hasattr(colored, "Back")
26 ):
27 # Seen with python3-colored v1 (bookworm)
28 raise ImportError
29except ImportError:
30 colored = None
33def _pager() -> str | None:
34 pager = os.environ.get("DEBPUTY_PAGER")
35 if pager is None:
36 pager = os.environ.get("PAGER")
37 if pager is None and shutil.which("less") is not None:
38 pager = "less"
39 return pager
42URL_START = "\033]8;;"
43URL_END = "\033]8;;\a"
44MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")
46_SUPPORTED_COLORS = {
47 "black",
48 "red",
49 "green",
50 "yellow",
51 "blue",
52 "magenta",
53 "cyan",
54 "white",
55}
56_SUPPORTED_STYLES = {"none", "bold"}
59class OutputStyle:
61 def colored(
62 self,
63 text: str,
64 *,
65 fg: str | None = None,
66 bg: str | None = None,
67 style: str | None = None,
68 ) -> str:
69 self._check_color(fg)
70 self._check_color(bg)
71 self._check_text_style(style)
72 return text
74 @property
75 def supports_colors(self) -> bool:
76 return False
78 def _check_color(self, color: str | None) -> None:
79 if color is not None and color not in _SUPPORTED_COLORS:
80 raise ValueError(
81 f"Unsupported color: {color}. Only the following are supported {','.join(_SUPPORTED_COLORS)}"
82 )
84 def _check_text_style(self, style: str | None) -> None:
85 if style is not None and style not in _SUPPORTED_STYLES:
86 raise ValueError(
87 f"Unsupported style: {style}. Only the following are supported {','.join(_SUPPORTED_STYLES)}"
88 )
90 def heading(self, heading: str, level: int) -> str:
91 return heading
93 def render_url(self, link_url: str) -> str:
94 return link_url
96 def bts(self, bugno) -> str:
97 return self.render_url(f"https://bugs.debian.org/{bugno}")
100class MarkdownOutputStyle(OutputStyle):
101 def colored(
102 self,
103 text: str,
104 *,
105 fg: str | None = None,
106 bg: str | None = None,
107 style: str | None = None,
108 ) -> str:
109 result = super().colored(text, fg=fg, bg=bg, style=style)
110 if style == "bold":
111 return f"**{result}**"
112 return result
114 def heading(self, heading: str, level: int) -> str:
115 prefix = "#" * level
116 return f"{prefix} {heading}"
118 def render_url(self, link_url: str) -> str:
119 if link_url.startswith("man:"):
120 # Rewrite man page to a clickable link for markdown
121 m = MAN_URL_REWRITE.match(link_url)
122 if m:
123 page, section = m.groups()
124 man_page_url = f"https://manpages.debian.org/{page}.{section}"
125 return f"[{link_url}]({man_page_url})"
126 return f"<{link_url}>"
129class IOBasedOutputStyling(OutputStyle):
130 def __init__(
131 self,
132 stream: IO[str],
133 output_format: str,
134 *,
135 optimize_for_screen_reader: bool = False,
136 ) -> None:
137 self.stream = stream
138 self.output_format = output_format
139 self.optimize_for_screen_reader = optimize_for_screen_reader
140 self._color_support = None
142 def print_list_table(
143 self,
144 headers: Sequence[str | tuple[str, str]],
145 rows: Sequence[Sequence[str]],
146 ) -> None:
147 if rows:
148 if any(len(r) != len(rows[0]) for r in rows):
149 raise ValueError(
150 "Unbalanced table: All rows must have the same column count"
151 )
152 if len(rows[0]) != len(headers):
153 raise ValueError(
154 "Unbalanced table: header list does not agree with row list on number of columns"
155 )
157 if not headers:
158 raise ValueError("No headers provided!?")
160 cadjust = {}
161 header_names = []
162 for c in headers:
163 if isinstance(c, str):
164 header_names.append(c)
165 else:
166 cname, adjust = c
167 header_names.append(cname)
168 cadjust[cname] = adjust
170 if self.output_format == "csv":
171 from csv import writer
173 w = writer(self.stream)
174 w.writerow(header_names)
175 w.writerows(rows)
176 return
178 column_lengths = [
179 max((len(h), max(len(r[i]) for r in rows)))
180 for i, h in enumerate(header_names)
181 ]
182 # divider => "+---+---+-...-+"
183 divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
184 # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths
185 row_format_inner = " | ".join(
186 f"{ CELL_COLOR} { :{cadjust.get(cn, '<')}{x}} { CELL_COLOR_RESET} "
187 for cn, x in zip(header_names, column_lengths)
188 )
190 row_format = f"| {row_format_inner} |"
192 if self.supports_colors:
193 cs = self._color_support
194 assert cs is not None
195 header_color = cs.Style.bold
196 header_color_reset = cs.Style.reset
197 else:
198 header_color = ""
199 header_color_reset = ""
201 self.print_visual_formatting(divider)
202 self.print(
203 row_format.format(
204 *header_names,
205 CELL_COLOR=header_color,
206 CELL_COLOR_RESET=header_color_reset,
207 )
208 )
209 self.print_visual_formatting(divider)
210 for row in rows:
211 self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
212 self.print_visual_formatting(divider)
214 def print(self, /, string: str = "", **kwargs) -> None:
215 if "file" in kwargs:
216 raise ValueError("Unsupported kwarg file")
217 print(string, file=self.stream, **kwargs)
219 def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None:
220 if self.optimize_for_screen_reader:
221 return
222 self.print(format_sequence, **kwargs)
224 def print_for_screen_reader(self, /, text: str, **kwargs) -> None:
225 if not self.optimize_for_screen_reader:
226 return
227 self.print(text, **kwargs)
229 def heading(self, heading: str, level: int) -> str:
230 # Use markdown notation
231 heading_prefix = "#" * level
232 return f"{heading_prefix} {heading}"
235class ANSIOutputStylingBase(IOBasedOutputStyling):
236 def __init__(
237 self,
238 stream: IO[str],
239 output_format: str,
240 *,
241 support_colors: bool = True,
242 support_clickable_urls: bool = True,
243 **kwargs: Any,
244 ) -> None:
245 super().__init__(stream, output_format, **kwargs)
246 self._stream = stream
247 self._color_support = colored
248 self._support_colors = (
249 support_colors if self._color_support is not None else False
250 )
251 self._support_clickable_urls = support_clickable_urls
253 @property
254 def supports_colors(self) -> bool:
255 return self._support_colors
257 def colored(
258 self,
259 text: str,
260 *,
261 fg: str | None = None,
262 bg: str | None = None,
263 style: str | None = None,
264 ) -> str:
265 self._check_color(fg)
266 self._check_color(bg)
267 self._check_text_style(style)
268 _colored = self._color_support
269 if not self.supports_colors or _colored is None:
270 return text
271 codes = []
272 if style is not None:
273 code = getattr(_colored.Style, style)
274 assert code is not None
275 codes.append(code)
276 if fg is not None:
277 code = getattr(_colored.Fore, fg)
278 assert code is not None
279 codes.append(code)
280 if bg is not None:
281 code = getattr(_colored.Back, bg)
282 assert code is not None
283 codes.append(code)
284 if not codes:
285 return text
286 return "".join(codes) + text + _colored.Style.reset
288 def render_url(self, link_url: str) -> str:
289 if not self._support_clickable_urls:
290 return super().render_url(link_url)
291 link_text = link_url
292 if not self.optimize_for_screen_reader and link_url.startswith("man:"):
293 # Rewrite man page to a clickable link by default. I am not sure how the hyperlink
294 # ANSI code works with screen readers, so lets not rewrite the man page link by
295 # default. My fear is that both the link url and the link text gets read out.
296 m = MAN_URL_REWRITE.match(link_url)
297 if m:
298 page, section = m.groups()
299 link_url = f"https://manpages.debian.org/{page}.{section}"
300 return URL_START + f"{link_url}\a{link_text}" + URL_END
302 def heading(self, heading: str, level: int) -> str:
303 return self.colored(super().heading(heading, level), style="bold")
305 def bts(self, bugno) -> str:
306 if not self._support_clickable_urls:
307 return super().bts(bugno)
308 return self.render_url(f"https://bugs.debian.org/{bugno}")
311def no_fancy_output(
312 stream: IO[str] = None,
313 output_format: str = str,
314 optimize_for_screen_reader: bool = False,
315) -> IOBasedOutputStyling:
316 if stream is None: 316 ↛ 318line 316 didn't jump to line 318 because the condition on line 316 was always true
317 stream = sys.stdout
318 return IOBasedOutputStyling(
319 stream,
320 output_format,
321 optimize_for_screen_reader=optimize_for_screen_reader,
322 )
325def _output_styling(
326 parsed_args: argparse.Namespace,
327 stream: IO[str],
328) -> IOBasedOutputStyling:
329 output_format = getattr(parsed_args, "output_format", None)
330 if output_format is None:
331 output_format = "text"
332 optimize_for_screen_reader = os.environ.get("OPTIMIZE_FOR_SCREEN_READER", "") != ""
333 if not stream.isatty():
334 return no_fancy_output(
335 stream,
336 output_format,
337 optimize_for_screen_reader=optimize_for_screen_reader,
338 )
340 return ANSIOutputStylingBase(
341 stream, output_format, optimize_for_screen_reader=optimize_for_screen_reader
342 )
345@contextlib.contextmanager
346def _stream_to_pager(
347 parsed_args: argparse.Namespace,
348) -> Iterator[tuple[IO[str], IOBasedOutputStyling]]:
349 fancy_output = _output_styling(parsed_args, sys.stdout)
350 if (
351 not parsed_args.pager
352 or not sys.stdout.isatty()
353 or fancy_output.output_format != "text"
354 ):
355 yield sys.stdout, fancy_output
356 return
358 pager = _pager()
359 if pager is None:
360 yield sys.stdout, fancy_output
361 return
363 env: Mapping[str, str] = os.environ
364 if "LESS" not in env:
365 env_copy = dict(os.environ)
366 env_copy["LESS"] = "-FRSXMQ"
367 env = env_copy
369 cmd = subprocess.Popen(
370 pager,
371 stdin=subprocess.PIPE,
372 encoding="utf-8",
373 env=env,
374 )
375 stdin = assume_not_none(cmd.stdin)
376 try:
377 fancy_output.stream = stdin
378 yield stdin, fancy_output
379 except Exception:
380 stdin.close()
381 cmd.kill()
382 cmd.wait()
383 raise
384 finally:
385 fancy_output.stream = sys.stdin
386 stdin.close()
387 cmd.wait()