Coverage for src/debputy/commands/debputy_cmd/output.py: 22%

215 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import argparse 

2import contextlib 

3import os 

4import re 

5import shutil 

6import subprocess 

7import sys 

8from typing import ( 

9 Union, 

10 IO, 

11 Tuple, 

12 Optional, 

13 Any, 

14) 

15from collections.abc import Sequence, Iterator, Mapping 

16 

17from debputy.util import assume_not_none 

18 

19try: 

20 import colored 

21 

22 if ( 22 ↛ 28line 22 didn't jump to line 28 because the condition on line 22 was never true

23 not hasattr(colored, "Style") 

24 or not hasattr(colored, "Fore") 

25 or not hasattr(colored, "Back") 

26 ): 

27 # Seen with python3-colored v1 (bookworm) 

28 raise ImportError 

29except ImportError: 

30 colored = None 

31 

32 

33def _pager() -> str | None: 

34 pager = os.environ.get("DEBPUTY_PAGER") 

35 if pager is None: 

36 pager = os.environ.get("PAGER") 

37 if pager is None and shutil.which("less") is not None: 

38 pager = "less" 

39 return pager 

40 

41 

42URL_START = "\033]8;;" 

43URL_END = "\033]8;;\a" 

44MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]") 

45 

46_SUPPORTED_COLORS = { 

47 "black", 

48 "red", 

49 "green", 

50 "yellow", 

51 "blue", 

52 "magenta", 

53 "cyan", 

54 "white", 

55} 

56_SUPPORTED_STYLES = {"none", "bold"} 

57 

58 

59class OutputStyle: 

60 

61 def colored( 

62 self, 

63 text: str, 

64 *, 

65 fg: str | None = None, 

66 bg: str | None = None, 

67 style: str | None = None, 

68 ) -> str: 

69 self._check_color(fg) 

70 self._check_color(bg) 

71 self._check_text_style(style) 

72 return text 

73 

74 @property 

75 def supports_colors(self) -> bool: 

76 return False 

77 

78 def _check_color(self, color: str | None) -> None: 

79 if color is not None and color not in _SUPPORTED_COLORS: 

80 raise ValueError( 

81 f"Unsupported color: {color}. Only the following are supported {','.join(_SUPPORTED_COLORS)}" 

82 ) 

83 

84 def _check_text_style(self, style: str | None) -> None: 

85 if style is not None and style not in _SUPPORTED_STYLES: 

86 raise ValueError( 

87 f"Unsupported style: {style}. Only the following are supported {','.join(_SUPPORTED_STYLES)}" 

88 ) 

89 

90 def heading(self, heading: str, level: int) -> str: 

91 return heading 

92 

93 def render_url(self, link_url: str) -> str: 

94 return link_url 

95 

96 def bts(self, bugno) -> str: 

97 return self.render_url(f"https://bugs.debian.org/{bugno}") 

98 

99 

100class MarkdownOutputStyle(OutputStyle): 

101 def colored( 

102 self, 

103 text: str, 

104 *, 

105 fg: str | None = None, 

106 bg: str | None = None, 

107 style: str | None = None, 

108 ) -> str: 

109 result = super().colored(text, fg=fg, bg=bg, style=style) 

110 if style == "bold": 

111 return f"**{result}**" 

112 return result 

113 

114 def heading(self, heading: str, level: int) -> str: 

115 prefix = "#" * level 

116 return f"{prefix} {heading}" 

117 

118 def render_url(self, link_url: str) -> str: 

119 if link_url.startswith("man:"): 

120 # Rewrite man page to a clickable link for markdown 

121 m = MAN_URL_REWRITE.match(link_url) 

122 if m: 

123 page, section = m.groups() 

124 man_page_url = f"https://manpages.debian.org/{page}.{section}" 

125 return f"[{link_url}]({man_page_url})" 

126 return f"<{link_url}>" 

127 

128 

129class IOBasedOutputStyling(OutputStyle): 

130 def __init__( 

131 self, 

132 stream: IO[str], 

133 output_format: str, 

134 *, 

135 optimize_for_screen_reader: bool = False, 

136 ) -> None: 

137 self.stream = stream 

138 self.output_format = output_format 

139 self.optimize_for_screen_reader = optimize_for_screen_reader 

140 self._color_support = None 

141 

142 def print_list_table( 

143 self, 

144 headers: Sequence[str | tuple[str, str]], 

145 rows: Sequence[Sequence[str]], 

146 ) -> None: 

147 if rows: 

148 if any(len(r) != len(rows[0]) for r in rows): 

149 raise ValueError( 

150 "Unbalanced table: All rows must have the same column count" 

151 ) 

152 if len(rows[0]) != len(headers): 

153 raise ValueError( 

154 "Unbalanced table: header list does not agree with row list on number of columns" 

155 ) 

156 

157 if not headers: 

158 raise ValueError("No headers provided!?") 

159 

160 cadjust = {} 

161 header_names = [] 

162 for c in headers: 

163 if isinstance(c, str): 

164 header_names.append(c) 

165 else: 

166 cname, adjust = c 

167 header_names.append(cname) 

168 cadjust[cname] = adjust 

169 

170 if self.output_format == "csv": 

171 from csv import writer 

172 

173 w = writer(self.stream) 

174 w.writerow(header_names) 

175 w.writerows(rows) 

176 return 

177 

178 column_lengths = [ 

179 max((len(h), max(len(r[i]) for r in rows))) 

180 for i, h in enumerate(header_names) 

181 ] 

182 # divider => "+---+---+-...-+" 

183 divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+" 

184 # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the column lengths 

185 row_format_inner = " | ".join( 

186 f"{ CELL_COLOR} { :{cadjust.get(cn, '<')}{x}} { CELL_COLOR_RESET} " 

187 for cn, x in zip(header_names, column_lengths) 

188 ) 

189 

190 row_format = f"| {row_format_inner} |" 

191 

192 if self.supports_colors: 

193 cs = self._color_support 

194 assert cs is not None 

195 header_color = cs.Style.bold 

196 header_color_reset = cs.Style.reset 

197 else: 

198 header_color = "" 

199 header_color_reset = "" 

200 

201 self.print_visual_formatting(divider) 

202 self.print( 

203 row_format.format( 

204 *header_names, 

205 CELL_COLOR=header_color, 

206 CELL_COLOR_RESET=header_color_reset, 

207 ) 

208 ) 

209 self.print_visual_formatting(divider) 

210 for row in rows: 

211 self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET="")) 

212 self.print_visual_formatting(divider) 

213 

214 def print(self, /, string: str = "", **kwargs) -> None: 

215 if "file" in kwargs: 

216 raise ValueError("Unsupported kwarg file") 

217 print(string, file=self.stream, **kwargs) 

218 

219 def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None: 

220 if self.optimize_for_screen_reader: 

221 return 

222 self.print(format_sequence, **kwargs) 

223 

224 def print_for_screen_reader(self, /, text: str, **kwargs) -> None: 

225 if not self.optimize_for_screen_reader: 

226 return 

227 self.print(text, **kwargs) 

228 

229 def heading(self, heading: str, level: int) -> str: 

230 # Use markdown notation 

231 heading_prefix = "#" * level 

232 return f"{heading_prefix} {heading}" 

233 

234 

235class ANSIOutputStylingBase(IOBasedOutputStyling): 

236 def __init__( 

237 self, 

238 stream: IO[str], 

239 output_format: str, 

240 *, 

241 support_colors: bool = True, 

242 support_clickable_urls: bool = True, 

243 **kwargs: Any, 

244 ) -> None: 

245 super().__init__(stream, output_format, **kwargs) 

246 self._stream = stream 

247 self._color_support = colored 

248 self._support_colors = ( 

249 support_colors if self._color_support is not None else False 

250 ) 

251 self._support_clickable_urls = support_clickable_urls 

252 

253 @property 

254 def supports_colors(self) -> bool: 

255 return self._support_colors 

256 

257 def colored( 

258 self, 

259 text: str, 

260 *, 

261 fg: str | None = None, 

262 bg: str | None = None, 

263 style: str | None = None, 

264 ) -> str: 

265 self._check_color(fg) 

266 self._check_color(bg) 

267 self._check_text_style(style) 

268 _colored = self._color_support 

269 if not self.supports_colors or _colored is None: 

270 return text 

271 codes = [] 

272 if style is not None: 

273 code = getattr(_colored.Style, style) 

274 assert code is not None 

275 codes.append(code) 

276 if fg is not None: 

277 code = getattr(_colored.Fore, fg) 

278 assert code is not None 

279 codes.append(code) 

280 if bg is not None: 

281 code = getattr(_colored.Back, bg) 

282 assert code is not None 

283 codes.append(code) 

284 if not codes: 

285 return text 

286 return "".join(codes) + text + _colored.Style.reset 

287 

288 def render_url(self, link_url: str) -> str: 

289 if not self._support_clickable_urls: 

290 return super().render_url(link_url) 

291 link_text = link_url 

292 if not self.optimize_for_screen_reader and link_url.startswith("man:"): 

293 # Rewrite man page to a clickable link by default. I am not sure how the hyperlink 

294 # ANSI code works with screen readers, so lets not rewrite the man page link by 

295 # default. My fear is that both the link url and the link text gets read out. 

296 m = MAN_URL_REWRITE.match(link_url) 

297 if m: 

298 page, section = m.groups() 

299 link_url = f"https://manpages.debian.org/{page}.{section}" 

300 return URL_START + f"{link_url}\a{link_text}" + URL_END 

301 

302 def heading(self, heading: str, level: int) -> str: 

303 return self.colored(super().heading(heading, level), style="bold") 

304 

305 def bts(self, bugno) -> str: 

306 if not self._support_clickable_urls: 

307 return super().bts(bugno) 

308 return self.render_url(f"https://bugs.debian.org/{bugno}") 

309 

310 

311def no_fancy_output( 

312 stream: IO[str] = None, 

313 output_format: str = str, 

314 optimize_for_screen_reader: bool = False, 

315) -> IOBasedOutputStyling: 

316 if stream is None: 316 ↛ 318line 316 didn't jump to line 318 because the condition on line 316 was always true

317 stream = sys.stdout 

318 return IOBasedOutputStyling( 

319 stream, 

320 output_format, 

321 optimize_for_screen_reader=optimize_for_screen_reader, 

322 ) 

323 

324 

325def _output_styling( 

326 parsed_args: argparse.Namespace, 

327 stream: IO[str], 

328) -> IOBasedOutputStyling: 

329 output_format = getattr(parsed_args, "output_format", None) 

330 if output_format is None: 

331 output_format = "text" 

332 optimize_for_screen_reader = os.environ.get("OPTIMIZE_FOR_SCREEN_READER", "") != "" 

333 if not stream.isatty(): 

334 return no_fancy_output( 

335 stream, 

336 output_format, 

337 optimize_for_screen_reader=optimize_for_screen_reader, 

338 ) 

339 

340 return ANSIOutputStylingBase( 

341 stream, output_format, optimize_for_screen_reader=optimize_for_screen_reader 

342 ) 

343 

344 

345@contextlib.contextmanager 

346def _stream_to_pager( 

347 parsed_args: argparse.Namespace, 

348) -> Iterator[tuple[IO[str], IOBasedOutputStyling]]: 

349 fancy_output = _output_styling(parsed_args, sys.stdout) 

350 if ( 

351 not parsed_args.pager 

352 or not sys.stdout.isatty() 

353 or fancy_output.output_format != "text" 

354 ): 

355 yield sys.stdout, fancy_output 

356 return 

357 

358 pager = _pager() 

359 if pager is None: 

360 yield sys.stdout, fancy_output 

361 return 

362 

363 env: Mapping[str, str] = os.environ 

364 if "LESS" not in env: 

365 env_copy = dict(os.environ) 

366 env_copy["LESS"] = "-FRSXMQ" 

367 env = env_copy 

368 

369 cmd = subprocess.Popen( 

370 pager, 

371 stdin=subprocess.PIPE, 

372 encoding="utf-8", 

373 env=env, 

374 ) 

375 stdin = assume_not_none(cmd.stdin) 

376 try: 

377 fancy_output.stream = stdin 

378 yield stdin, fancy_output 

379 except Exception: 

380 stdin.close() 

381 cmd.kill() 

382 cmd.wait() 

383 raise 

384 finally: 

385 fancy_output.stream = sys.stdin 

386 stdin.close() 

387 cmd.wait()