Coverage for src/debputy/commands/debputy_cmd/output.py: 22%
189 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import argparse
2import contextlib
3import os
4import re
5import shutil
6import subprocess
7import sys
8from typing import (
9 Union,
10 Sequence,
11 Iterator,
12 IO,
13 Mapping,
14 Tuple,
15 Optional,
16 Any,
17)
19from debputy.util import assume_not_none
# Optional dependency: ``colored`` is bound to the module when a usable
# version can be imported, and to ``None`` otherwise.
try:
    import colored
except ImportError:
    colored = None
else:
    # Seen with python3-colored v1 (bookworm): the module imports fine but
    # lacks the Style/Fore/Back namespaces, so treat it as unavailable.
    if not all(hasattr(colored, attr) for attr in ("Style", "Fore", "Back")):
        colored = None
def _pager() -> Optional[str]:
    """Pick the pager command to use, if any.

    ``DEBPUTY_PAGER`` is consulted first, then ``PAGER``. When neither
    variable is set, ``less`` is used provided it can be found on ``PATH``.

    :return: The pager command, or ``None`` when no pager is available.
    """
    for variable in ("DEBPUTY_PAGER", "PAGER"):
        candidate = os.environ.get(variable)
        if candidate is not None:
            return candidate
    return "less" if shutil.which("less") is not None else None
# Escape sequences wrapping a clickable terminal hyperlink (OSC 8): the link
# target follows URL_START, and URL_END closes the link after its text.
URL_START = "\033]8;;"
URL_END = "\033]8;;\a"
# Matches "man:PAGE(SECTION)" references; group 1 is the page name and
# group 2 the (numeric) section.
MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")

# Color names accepted for foreground/background arguments.
_SUPPORTED_COLORS = {
    "black",
    "red",
    "green",
    "yellow",
    "blue",
    "magenta",
    "cyan",
    "white",
}
# Text style names accepted for style arguments.
_SUPPORTED_STYLES = {"none", "bold"}
class OutputStylingBase:
    """Plain output helper: no colors and no clickable links.

    Text is written to ``stream``. Subclasses may override ``colored``,
    ``supports_colors``, ``render_url`` and ``bts`` to add terminal styling.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        optimize_for_screen_reader: bool = False,
    ) -> None:
        """
        :param stream: Text stream that all ``print*`` methods write to.
        :param output_format: Name of the output format; ``"csv"`` makes
          ``print_list_table`` emit CSV instead of an ASCII table.
        :param optimize_for_screen_reader: When true, purely visual output
          (see ``print_visual_formatting``) is suppressed and screen reader
          specific text (see ``print_for_screen_reader``) is emitted.
        """
        self.stream = stream
        self.output_format = output_format
        self.optimize_for_screen_reader = optimize_for_screen_reader
        # Backend providing the color codes; this class never sets it to
        # anything but None.
        self._color_support: Optional[Any] = None

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Validate the styling arguments and return ``text`` unchanged.

        :raises ValueError: If ``fg``, ``bg`` or ``style`` is not a supported
          name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        return text

    @property
    def supports_colors(self) -> bool:
        """Whether ``colored`` actually emits color codes (never, here)."""
        return False

    def print_list_table(
        self,
        headers: Sequence[Union[str, Tuple[str, str]]],
        rows: Sequence[Sequence[str]],
    ) -> None:
        """Print ``rows`` as a table (or as CSV when the format is ``"csv"``).

        :param headers: Column headers. Each entry is either the column name
          or a ``(name, adjust)`` pair where ``adjust`` is the format-spec
          alignment character used for that column (the default is ``"<"``).
        :param rows: Table rows; every row must have one cell per header.
          May be empty, in which case only the header is printed.
        :raises ValueError: If no headers are given or the column counts of
          the headers and rows disagree.
        """
        if rows:
            if any(len(r) != len(rows[0]) for r in rows):
                raise ValueError(
                    "Unbalanced table: All rows must have the same column count"
                )
            if len(rows[0]) != len(headers):
                raise ValueError(
                    "Unbalanced table: header list does not agree with row list on number of columns"
                )

        if not headers:
            raise ValueError("No headers provided!?")

        cadjust = {}
        header_names = []
        for c in headers:
            if isinstance(c, str):
                header_names.append(c)
            else:
                cname, adjust = c
                header_names.append(cname)
                cadjust[cname] = adjust

        if self.output_format == "csv":
            from csv import writer

            w = writer(self.stream)
            w.writerow(header_names)
            w.writerows(rows)
            return

        # ``default=0`` keeps an empty ``rows`` from blowing up in max(); the
        # column is then sized by its header alone.
        column_lengths = [
            max(len(h), max((len(r[i]) for r in rows), default=0))
            for i, h in enumerate(header_names)
        ]
        # divider => "+---+---+-...-+"
        divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
        # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths
        # (each cell is additionally wrapped in the CELL_COLOR placeholders).
        row_format_inner = " | ".join(
            f"{{CELL_COLOR}}{{:{cadjust.get(cn, '<')}{x}}}{{CELL_COLOR_RESET}}"
            for cn, x in zip(header_names, column_lengths)
        )

        row_format = f"| {row_format_inner} |"

        if self.supports_colors:
            cs = self._color_support
            assert cs is not None
            header_color = cs.Style.bold
            header_color_reset = cs.Style.reset
        else:
            header_color = ""
            header_color_reset = ""

        self.print_visual_formatting(divider)
        self.print(
            row_format.format(
                *header_names,
                CELL_COLOR=header_color,
                CELL_COLOR_RESET=header_color_reset,
            )
        )
        self.print_visual_formatting(divider)
        for row in rows:
            self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
        self.print_visual_formatting(divider)

    def print(self, /, string: str = "", **kwargs: Any) -> None:
        """Print ``string`` to the configured stream.

        ``kwargs`` are forwarded to the builtin ``print``.

        :raises ValueError: If ``file`` is passed; the destination is always
          ``self.stream``.
        """
        if "file" in kwargs:
            raise ValueError("Unsupported kwarg file")
        print(string, file=self.stream, **kwargs)

    def print_visual_formatting(
        self, /, format_sequence: str, **kwargs: Any
    ) -> None:
        """Print ``format_sequence`` unless optimizing for screen readers."""
        if self.optimize_for_screen_reader:
            return
        self.print(format_sequence, **kwargs)

    def print_for_screen_reader(self, /, text: str, **kwargs: Any) -> None:
        """Print ``text`` only when optimizing for screen readers."""
        if not self.optimize_for_screen_reader:
            return
        self.print(text, **kwargs)

    def _check_color(self, color: Optional[str]) -> None:
        """Raise ``ValueError`` unless ``color`` is ``None`` or supported."""
        if color is not None and color not in _SUPPORTED_COLORS:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported color: {color}. Only the following are supported {','.join(sorted(_SUPPORTED_COLORS))}"
            )

    def _check_text_style(self, style: Optional[str]) -> None:
        """Raise ``ValueError`` unless ``style`` is ``None`` or supported."""
        if style is not None and style not in _SUPPORTED_STYLES:
            # Sorted so the message does not depend on set iteration order.
            raise ValueError(
                f"Unsupported style: {style}. Only the following are supported {','.join(sorted(_SUPPORTED_STYLES))}"
            )

    def render_url(self, link_url: str) -> str:
        """Return the text to show for ``link_url`` (here: the URL itself)."""
        return link_url

    def bts(self, bugno) -> str:
        """Return the Debian BTS URL for bug number ``bugno``."""
        return f"https://bugs.debian.org/{bugno}"
class ANSIOutputStylingBase(OutputStylingBase):
    """Output helper that can emit color codes and clickable hyperlinks.

    Colors are produced via the optional ``colored`` module; when that module
    is unavailable, color support is disabled regardless of ``support_colors``.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        support_colors: bool = True,
        support_clickable_urls: bool = True,
        **kwargs: Any,
    ) -> None:
        """
        :param stream: Text stream to write to.
        :param output_format: Name of the output format.
        :param support_colors: Whether color codes may be emitted. Ignored
          (treated as false) when the ``colored`` module is not available.
        :param support_clickable_urls: Whether URLs are rendered as terminal
          hyperlinks.
        :param kwargs: Forwarded to ``OutputStylingBase.__init__``.
        """
        super().__init__(stream, output_format, **kwargs)
        self._stream = stream
        self._color_support = colored
        self._support_colors = (
            support_colors if self._color_support is not None else False
        )
        self._support_clickable_urls = support_clickable_urls

    @property
    def supports_colors(self) -> bool:
        """Whether ``colored`` emits color codes for this instance."""
        return self._support_colors

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Wrap ``text`` in the codes for the requested colors and style.

        :param text: The text to style.
        :param fg: Foreground color name, if any.
        :param bg: Background color name, if any.
        :param style: Text style name, if any. ``"none"`` adds no style code.
        :return: ``text`` surrounded by the style codes and a trailing reset,
          or ``text`` unchanged when colors are unsupported or nothing was
          requested.
        :raises ValueError: If ``fg``, ``bg`` or ``style`` is not a supported
          name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        _colored = self._color_support
        if not self.supports_colors or _colored is None:
            return text
        codes = []
        # "none" is an accepted style name meaning "no text style", so it is
        # not looked up on the backend.
        # NOTE(review): colored.Style does not appear to provide a `none`
        # attribute, which would make the lookup raise - confirm.
        if style is not None and style != "none":
            code = getattr(_colored.Style, style)
            assert code is not None
            codes.append(code)
        if fg is not None:
            code = getattr(_colored.Fore, fg)
            assert code is not None
            codes.append(code)
        if bg is not None:
            code = getattr(_colored.Back, bg)
            assert code is not None
            codes.append(code)
        if not codes:
            return text
        return "".join(codes) + text + _colored.Style.reset

    def render_url(self, link_url: str) -> str:
        """Render ``link_url`` as a clickable terminal hyperlink.

        The visible link text is always the original ``link_url``. Unless
        optimizing for screen readers, a ``man:PAGE(SECTION)`` URL gets
        ``https://manpages.debian.org/PAGE.SECTION`` as its link target.

        :return: The hyperlink escape sequence, or the plain URL when
          clickable URLs are disabled.
        """
        if not self._support_clickable_urls:
            return super().render_url(link_url)
        link_text = link_url
        if not self.optimize_for_screen_reader and link_url.startswith("man:"):
            # Rewrite man page to a clickable link by default. I am not sure how the hyperlink
            # ANSI code works with screen readers, so lets not rewrite the man page link by
            # default. My fear is that both the link url and the link text gets read out.
            m = MAN_URL_REWRITE.match(link_url)
            if m:
                page, section = m.groups()
                link_url = f"https://manpages.debian.org/{page}.{section}"
        return URL_START + f"{link_url}\a{link_text}" + URL_END

    def bts(self, bugno) -> str:
        """Return the Debian BTS URL for ``bugno``, clickable if supported."""
        # render_url() already falls back to the plain URL when clickable
        # URLs are disabled, so the URL template lives only in the base class.
        return self.render_url(super().bts(bugno))
def no_fancy_output(
    stream: Optional[IO[str]] = None,
    output_format: str = "text",
    optimize_for_screen_reader: bool = False,
) -> OutputStylingBase:
    """Create an output helper without colors or clickable links.

    :param stream: Stream to write to; defaults to ``sys.stdout`` (looked up
      at call time).
    :param output_format: Name of the output format. Defaults to ``"text"``.
    :param optimize_for_screen_reader: Passed on to ``OutputStylingBase``.
    :return: A plain ``OutputStylingBase`` instance.
    """
    if stream is None:
        stream = sys.stdout
    return OutputStylingBase(
        stream,
        output_format,
        optimize_for_screen_reader=optimize_for_screen_reader,
    )
def _output_styling(
    parsed_args: argparse.Namespace,
    stream: IO[str],
) -> OutputStylingBase:
    """Build the output helper matching ``stream`` and the parsed arguments.

    The output format is taken from ``parsed_args.output_format`` and falls
    back to ``"text"`` when that attribute is missing or ``None``. Screen
    reader optimization is enabled when the ``OPTIMIZE_FOR_SCREEN_READER``
    environment variable is set to a non-empty value.

    :return: An ``ANSIOutputStylingBase`` when ``stream`` is a TTY, otherwise
      the plain helper from ``no_fancy_output``.
    """
    requested_format = getattr(parsed_args, "output_format", None)
    output_format = "text" if requested_format is None else requested_format
    screen_reader = bool(os.environ.get("OPTIMIZE_FOR_SCREEN_READER"))
    if stream.isatty():
        return ANSIOutputStylingBase(
            stream,
            output_format,
            optimize_for_screen_reader=screen_reader,
        )
    return no_fancy_output(
        stream,
        output_format,
        optimize_for_screen_reader=screen_reader,
    )
@contextlib.contextmanager
def _stream_to_pager(
    parsed_args: argparse.Namespace,
) -> Iterator[Tuple[IO[str], OutputStylingBase]]:
    """Context manager yielding an output stream, routed via a pager if possible.

    ``sys.stdout`` is yielded directly when paging was not requested
    (``parsed_args.pager`` is false), stdout is not a TTY, the output format
    is not ``"text"``, or no pager could be determined. Otherwise the pager is
    started and its stdin is yielded; on exit the pager's stdin is closed and
    the pager is waited for.

    :param parsed_args: Parsed command line arguments; ``pager`` is read
      directly from it.
    :return: Yields ``(stream, styling)`` where ``styling`` writes to
      ``stream``.
    """
    fancy_output = _output_styling(parsed_args, sys.stdout)
    if (
        not parsed_args.pager
        or not sys.stdout.isatty()
        or fancy_output.output_format != "text"
    ):
        yield sys.stdout, fancy_output
        return

    pager = _pager()
    if pager is None:
        yield sys.stdout, fancy_output
        return

    env: Mapping[str, str] = os.environ
    if "LESS" not in env:
        # Only provide default options for less when the user has not
        # configured LESS themselves; work on a copy of the environment.
        env_copy = dict(os.environ)
        env_copy["LESS"] = "-FRSXMQ"
        env = env_copy

    # NOTE(review): the pager value is passed as a single program name (no
    # shell, no argument splitting), so a value such as "less -R" would
    # presumably fail to start - confirm whether arguments should be supported.
    cmd = subprocess.Popen(
        pager,
        stdin=subprocess.PIPE,
        encoding="utf-8",
        env=env,
    )
    stdin = assume_not_none(cmd.stdin)
    try:
        fancy_output.stream = stdin
        yield stdin, fancy_output
    except Exception:
        stdin.close()
        cmd.kill()
        cmd.wait()
        raise
    finally:
        # Point the styling object back at stdout (it was created for
        # sys.stdout above), not at the soon-to-be-closed pager pipe.
        fancy_output.stream = sys.stdout
        try:
            stdin.close()
        finally:
            # Always reap the pager, even if closing its stdin failed.
            cmd.wait()