Coverage for src/debputy/commands/debputy_cmd/output.py: 22%
214 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import argparse
2import contextlib
3import os
4import re
5import shutil
6import subprocess
7import sys
8from typing import (
9 Union,
10 Sequence,
11 Iterator,
12 IO,
13 Mapping,
14 Tuple,
15 Optional,
16 Any,
17)
19from debputy.util import assume_not_none
# Optional dependency: ``colored`` is only used when it exposes the modern API.
try:
    import colored

    # Seen with python3-colored v1 (bookworm): the module imports fine but lacks
    # the Style/Fore/Back namespaces, so treat it exactly like a missing module.
    if not all(hasattr(colored, attr) for attr in ("Style", "Fore", "Back")):
        raise ImportError
except ImportError:
    colored = None
def _pager() -> Optional[str]:
    """Pick the pager command to use.

    The ``DEBPUTY_PAGER`` environment variable wins over ``PAGER``.  When
    neither is set, ``less`` is used if it can be found on ``PATH``.

    :return: The pager command, or ``None`` when no pager could be determined.
    """
    for variable in ("DEBPUTY_PAGER", "PAGER"):
        candidate = os.environ.get(variable)
        if candidate is not None:
            return candidate
    if shutil.which("less") is not None:
        return "less"
    return None
# Escape sequence that opens a terminal hyperlink; the link target follows it.
URL_START = "\033]8;;"
# Escape sequence that closes a terminal hyperlink.
URL_END = "\033]8;;\a"
# Matches "man:<page>(<section>)" and captures the page name and the section.
MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")

# Color names accepted for the ``fg`` / ``bg`` arguments of ``colored(...)``.
_SUPPORTED_COLORS = {
    "black", "red", "green", "yellow",
    "blue", "magenta", "cyan", "white",
}
# Text styles accepted for the ``style`` argument of ``colored(...)``.
_SUPPORTED_STYLES = {"none", "bold"}
class OutputStyle:
    """Baseline output style: validates styling arguments but emits plain text."""

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Validate the requested styling and return ``text`` unchanged.

        :param text: The text to style.
        :param fg: Foreground color name, or ``None`` for no foreground color.
        :param bg: Background color name, or ``None`` for no background color.
        :param style: Text style name, or ``None`` for no style.
        :return: ``text`` as-is (this style never adds markup).
        :raises ValueError: If a color or style name is not supported.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        return text

    @property
    def supports_colors(self) -> bool:
        """Whether this style emits colors; always ``False`` for the baseline."""
        return False

    def _check_color(self, color: Optional[str]) -> None:
        """Raise ``ValueError`` unless ``color`` is ``None`` or a supported color."""
        if color is not None and color not in _SUPPORTED_COLORS:
            # Sort the names: iterating a set of strings has no stable order
            # between runs, which made the message non-reproducible.
            raise ValueError(
                f"Unsupported color: {color}. Only the following are supported {','.join(sorted(_SUPPORTED_COLORS))}"
            )

    def _check_text_style(self, style: Optional[str]) -> None:
        """Raise ``ValueError`` unless ``style`` is ``None`` or a supported style."""
        if style is not None and style not in _SUPPORTED_STYLES:
            # Sorted for the same reason as in _check_color.
            raise ValueError(
                f"Unsupported style: {style}. Only the following are supported {','.join(sorted(_SUPPORTED_STYLES))}"
            )

    def heading(self, heading: str, level: int) -> str:
        """Return ``heading`` unchanged; ``level`` is ignored by the baseline."""
        return heading

    def render_url(self, link_url: str) -> str:
        """Return ``link_url`` unchanged."""
        return link_url

    def bts(self, bugno) -> str:
        """Return the rendered ``https://bugs.debian.org/<bugno>`` URL for ``bugno``."""
        return self.render_url(f"https://bugs.debian.org/{bugno}")
class MarkdownOutputStyle(OutputStyle):
    """Output style that renders emphasis, headings and links as markdown."""

    def colored(
        self,
        text: str,
        *,
        fg: Optional[Union[str]] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Validate the styling arguments and wrap bold text in ``**``.

        Colors are validated (by the parent) but produce no markup.
        """
        plain = super().colored(text, fg=fg, bg=bg, style=style)
        return f"**{plain}**" if style == "bold" else plain

    def heading(self, heading: str, level: int) -> str:
        """Return ``heading`` prefixed with ``level`` ``#`` characters."""
        return f"{'#' * level} {heading}"

    def render_url(self, link_url: str) -> str:
        """Render ``link_url`` as a markdown link.

        ``man:page(section)`` URLs become a link to manpages.debian.org with
        the original text as label; everything else becomes ``<url>``.
        """
        man_match = (
            MAN_URL_REWRITE.match(link_url) if link_url.startswith("man:") else None
        )
        if not man_match:
            return f"<{link_url}>"
        # Rewrite man page to a clickable link for markdown
        page, section = man_match.groups()
        man_page_url = f"https://manpages.debian.org/{page}.{section}"
        return f"[{link_url}]({man_page_url})"
class IOBasedOutputStyling(OutputStyle):
    """Output style bound to a text stream, with helpers for printing to it."""

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        optimize_for_screen_reader: bool = False,
    ) -> None:
        """
        :param stream: The stream that the ``print*`` helpers write to.
        :param output_format: Name of the output format; ``"csv"`` makes
          :meth:`print_list_table` emit CSV, any other value a text table.
        :param optimize_for_screen_reader: When true, purely visual
          formatting is skipped and screen-reader-only text is printed.
        """
        self.stream = stream
        self.output_format = output_format
        self.optimize_for_screen_reader = optimize_for_screen_reader
        # Set by subclasses that support colors; None means "no color module".
        self._color_support: Optional[Any] = None

    def print_list_table(
        self,
        headers: Sequence[Union[str, Tuple[str, str]]],
        rows: Sequence[Sequence[str]],
    ) -> None:
        """Print ``rows`` as a table (or as CSV when the format is ``"csv"``).

        :param headers: Column headers. Each entry is either the column name
          or a ``(name, adjust)`` pair where ``adjust`` is the format-spec
          alignment used for that column (the default is ``"<"``).
        :param rows: The table rows; every row must have one cell per header.
        :raises ValueError: If the rows are unbalanced, disagree with the
          header count, or no headers were provided.
        """
        if rows:
            if any(len(r) != len(rows[0]) for r in rows):
                raise ValueError(
                    "Unbalanced table: All rows must have the same column count"
                )
            if len(rows[0]) != len(headers):
                raise ValueError(
                    "Unbalanced table: header list does not agree with row list on number of columns"
                )

        if not headers:
            raise ValueError("No headers provided!?")

        cadjust = {}
        header_names = []
        for c in headers:
            if isinstance(c, str):
                header_names.append(c)
            else:
                cname, adjust = c
                header_names.append(cname)
                cadjust[cname] = adjust

        if self.output_format == "csv":
            from csv import writer

            w = writer(self.stream)
            w.writerow(header_names)
            w.writerows(rows)
            return

        # ``default=0`` keeps an empty ``rows`` from raising "max() arg is an
        # empty sequence"; the column is then as wide as its header.
        column_lengths = [
            max(len(h), max((len(r[i]) for r in rows), default=0))
            for i, h in enumerate(header_names)
        ]
        # divider => "+---+---+-...-+"
        divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
        # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths
        # The doubled braces survive the f-string as literal braces, so the
        # result is itself a template that is filled in by str.format below.
        row_format_inner = " | ".join(
            f"{{CELL_COLOR}}{{:{cadjust.get(cn, '<')}{x}}}{{CELL_COLOR_RESET}}"
            for cn, x in zip(header_names, column_lengths)
        )

        row_format = f"| {row_format_inner} |"

        if self.supports_colors:
            cs = self._color_support
            assert cs is not None
            header_color = cs.Style.bold
            header_color_reset = cs.Style.reset
        else:
            header_color = ""
            header_color_reset = ""

        self.print_visual_formatting(divider)
        self.print(
            row_format.format(
                *header_names,
                CELL_COLOR=header_color,
                CELL_COLOR_RESET=header_color_reset,
            )
        )
        self.print_visual_formatting(divider)
        for row in rows:
            self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
        self.print_visual_formatting(divider)

    def print(self, /, string: str = "", **kwargs) -> None:
        """Print ``string`` to this style's stream.

        Extra keyword arguments are forwarded to the builtin ``print``.

        :raises ValueError: If ``file`` is passed; the stream is fixed.
        """
        if "file" in kwargs:
            raise ValueError("Unsupported kwarg file")
        print(string, file=self.stream, **kwargs)

    def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None:
        """Print ``format_sequence`` unless optimizing for screen readers."""
        if self.optimize_for_screen_reader:
            return
        self.print(format_sequence, **kwargs)

    def print_for_screen_reader(self, /, text: str, **kwargs) -> None:
        """Print ``text`` only when optimizing for screen readers."""
        if not self.optimize_for_screen_reader:
            return
        self.print(text, **kwargs)

    def heading(self, heading: str, level: int) -> str:
        """Return ``heading`` prefixed with ``level`` ``#`` characters."""
        # Use markdown notation
        heading_prefix = "#" * level
        return f"{heading_prefix} {heading}"
class ANSIOutputStylingBase(IOBasedOutputStyling):
    """Stream-based style that can emit colors and clickable hyperlinks."""

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        support_colors: bool = True,
        support_clickable_urls: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        :param stream: The stream to write to.
        :param output_format: Name of the output format.
        :param support_colors: Whether colors may be emitted; ignored (treated
          as false) when the ``colored`` module is unavailable.
        :param support_clickable_urls: Whether URLs are wrapped in hyperlink
          escape sequences.
        :param kwargs: Forwarded to the parent constructor.
        """
        super().__init__(stream, output_format, **kwargs)
        self._stream = stream
        self._color_support = colored
        # Colors require the optional module regardless of what was requested.
        self._support_colors = self._color_support is not None and support_colors
        self._support_clickable_urls = support_clickable_urls

    @property
    def supports_colors(self) -> bool:
        """Whether this instance emits colors."""
        return self._support_colors

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Return ``text`` wrapped in the requested style and color codes.

        The arguments are always validated; ``text`` is returned unchanged
        when colors are unsupported or nothing was requested.

        :raises ValueError: If a color or style name is not supported.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        palette = self._color_support
        if not (self.supports_colors and palette is not None):
            return text
        # Same emission order as always: style first, then foreground, then background.
        codes = []
        for group, attribute in (("Style", style), ("Fore", fg), ("Back", bg)):
            if attribute is None:
                continue
            code = getattr(getattr(palette, group), attribute)
            assert code is not None
            codes.append(code)
        if not codes:
            return text
        return "".join(codes) + text + palette.Style.reset

    def render_url(self, link_url: str) -> str:
        """Render ``link_url`` as a terminal hyperlink when that is enabled.

        ``man:page(section)`` links point at manpages.debian.org while still
        displaying the original text, unless optimizing for screen readers.
        """
        if not self._support_clickable_urls:
            return super().render_url(link_url)
        link_text = link_url
        if link_url.startswith("man:") and not self.optimize_for_screen_reader:
            # Rewrite man page to a clickable link by default. I am not sure how the hyperlink
            # ANSI code works with screen readers, so lets not rewrite the man page link by
            # default. My fear is that both the link url and the link text gets read out.
            man_match = MAN_URL_REWRITE.match(link_url)
            if man_match:
                page, section = man_match.groups()
                link_url = f"https://manpages.debian.org/{page}.{section}"
        return URL_START + f"{link_url}\a{link_text}" + URL_END

    def heading(self, heading: str, level: int) -> str:
        """Return the parent's heading rendered in bold."""
        plain_heading = super().heading(heading, level)
        return self.colored(plain_heading, style="bold")

    def bts(self, bugno) -> str:
        """Return the rendered ``https://bugs.debian.org/<bugno>`` URL for ``bugno``."""
        if self._support_clickable_urls:
            return self.render_url(f"https://bugs.debian.org/{bugno}")
        return super().bts(bugno)
def no_fancy_output(
    stream: Optional[IO[str]] = None,
    output_format: str = "text",
    optimize_for_screen_reader: bool = False,
) -> IOBasedOutputStyling:
    """Create a plain (no colors, no hyperlinks) output styling.

    :param stream: The stream to write to; defaults to ``sys.stdout``.
    :param output_format: Name of the output format. Defaults to ``"text"``
      (it previously defaulted to the builtin ``str`` type by mistake).
    :param optimize_for_screen_reader: Forwarded to the styling object.
    :return: An :class:`IOBasedOutputStyling` bound to ``stream``.
    """
    if stream is None:
        stream = sys.stdout
    return IOBasedOutputStyling(
        stream,
        output_format,
        optimize_for_screen_reader=optimize_for_screen_reader,
    )
def _output_styling(
    parsed_args: argparse.Namespace,
    stream: IO[str],
) -> IOBasedOutputStyling:
    """Build the output styling matching ``stream`` and the parsed arguments.

    The format comes from ``parsed_args.output_format`` (``"text"`` when it is
    missing or ``None``).  A non-empty ``OPTIMIZE_FOR_SCREEN_READER``
    environment variable enables screen reader mode.  ANSI styling is only
    used when ``stream`` is a TTY.
    """
    requested_format = getattr(parsed_args, "output_format", None)
    output_format = "text" if requested_format is None else requested_format
    screen_reader_mode = bool(os.environ.get("OPTIMIZE_FOR_SCREEN_READER", ""))

    if stream.isatty():
        return ANSIOutputStylingBase(
            stream, output_format, optimize_for_screen_reader=screen_reader_mode
        )
    return no_fancy_output(
        stream,
        output_format,
        optimize_for_screen_reader=screen_reader_mode,
    )
@contextlib.contextmanager
def _stream_to_pager(
    parsed_args: argparse.Namespace,
) -> Iterator[Tuple[IO[str], IOBasedOutputStyling]]:
    """Context manager yielding an output stream, piped into a pager if possible.

    The pager is only used when ``parsed_args.pager`` is true, stdout is a
    TTY, the output format is ``"text"`` and a pager command is available.
    Otherwise ``sys.stdout`` is yielded directly.

    :param parsed_args: Parsed command line arguments; ``pager`` is read here.
    :return: A ``(stream, styling)`` pair; ``styling`` writes to ``stream``.
    """
    fancy_output = _output_styling(parsed_args, sys.stdout)
    if (
        not parsed_args.pager
        or not sys.stdout.isatty()
        or fancy_output.output_format != "text"
    ):
        yield sys.stdout, fancy_output
        return

    pager = _pager()
    # An empty pager (e.g. PAGER="") cannot be executed; treat it as "no pager".
    if not pager:
        yield sys.stdout, fancy_output
        return

    env: Mapping[str, str] = os.environ
    if "LESS" not in env:
        # Only provide default options for less when the user has none set.
        env_copy = dict(os.environ)
        env_copy["LESS"] = "-FRSXMQ"
        env = env_copy

    cmd = subprocess.Popen(
        pager,
        stdin=subprocess.PIPE,
        encoding="utf-8",
        env=env,
    )
    stdin = assume_not_none(cmd.stdin)
    # Remember where the styling wrote before so it can be restored afterwards.
    original_stream = fancy_output.stream
    try:
        fancy_output.stream = stdin
        yield stdin, fancy_output
    except Exception:
        stdin.close()
        cmd.kill()
        cmd.wait()
        raise
    finally:
        # Restore the output stream (this used to be reset to sys.stdin,
        # which is an input stream and unusable for printing).
        fancy_output.stream = original_stream
        stdin.close()
        cmd.wait()