Coverage for src/debputy/commands/debputy_cmd/output.py: 22%

214 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import argparse 

2import contextlib 

3import os 

4import re 

5import shutil 

6import subprocess 

7import sys 

8from typing import ( 

9 Union, 

10 Sequence, 

11 Iterator, 

12 IO, 

13 Mapping, 

14 Tuple, 

15 Optional, 

16 Any, 

17) 

18 

19from debputy.util import assume_not_none 

20 

# Optional dependency: ANSI colors are only available when a sufficiently new
# "colored" module can be imported. Otherwise `colored` is None and every
# consumer must fall back to plain text.
try:
    import colored

    # Seen with python3-colored v1 (bookworm): the module imports fine but lacks
    # the Style/Fore/Back API used below, so treat it as if it was not installed.
    if not all(hasattr(colored, attr) for attr in ("Style", "Fore", "Back")):
        raise ImportError("colored module lacks the Style/Fore/Back API")
except ImportError:
    colored = None

33 

34 

def _pager() -> Optional[str]:
    """Return the pager command to use, or None when no pager is available.

    Lookup order: the ``DEBPUTY_PAGER`` environment variable, then ``PAGER``,
    then ``less`` if it can be found on ``PATH``.

    A variable that is set but empty (or only whitespace) yields None rather
    than an empty command that could never be executed.
    """
    pager = os.environ.get("DEBPUTY_PAGER")
    if pager is None:
        pager = os.environ.get("PAGER")
    if pager is None and shutil.which("less") is not None:
        pager = "less"
    if pager is not None and not pager.strip():
        # Set-but-empty: there is nothing to run.
        return None
    return pager

42 

43 

# Escape sequences wrapped around a clickable terminal hyperlink; see
# ANSIOutputStylingBase.render_url for how they are combined.
URL_START = "\033]8;;"
URL_END = "\033]8;;\a"
# Matches "man:<page>(<section>)" and captures the page name and the section.
MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")

# Color names accepted for the `fg` and `bg` arguments of `colored(...)`.
_SUPPORTED_COLORS = {
    "black",
    "red",
    "green",
    "yellow",
    "blue",
    "magenta",
    "cyan",
    "white",
}
# Text style names accepted for the `style` argument of `colored(...)`.
_SUPPORTED_STYLES = {"none", "bold"}

59 

60 

class OutputStyle:
    """Plain output style: validates styling requests but applies no markup.

    Subclasses override the rendering methods to emit Markdown or ANSI
    sequences while keeping the same validation rules.
    """

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Return *text* unchanged after validating the styling arguments.

        :param text: The text to render.
        :param fg: Foreground color name, or None for no foreground color.
        :param bg: Background color name, or None for no background color.
        :param style: Text style name, or None for no text style.
        :raises ValueError: If a color or style name is not supported.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        return text

    @property
    def supports_colors(self) -> bool:
        """Whether this style emits colors; always False for the plain style."""
        return False

    def _check_color(self, color: Optional[str]) -> None:
        """Raise ValueError unless *color* is None or a supported color name."""
        if color is not None and color not in _SUPPORTED_COLORS:
            # Sorted so the message is stable between runs (set order is not).
            raise ValueError(
                f"Unsupported color: {color}. Only the following are supported {','.join(sorted(_SUPPORTED_COLORS))}"
            )

    def _check_text_style(self, style: Optional[str]) -> None:
        """Raise ValueError unless *style* is None or a supported style name."""
        if style is not None and style not in _SUPPORTED_STYLES:
            # Sorted so the message is stable between runs (set order is not).
            raise ValueError(
                f"Unsupported style: {style}. Only the following are supported {','.join(sorted(_SUPPORTED_STYLES))}"
            )

    def heading(self, heading: str, level: int) -> str:
        """Return *heading* as-is; *level* is ignored by the plain style."""
        return heading

    def render_url(self, link_url: str) -> str:
        """Return *link_url* as-is (no link markup in the plain style)."""
        return link_url

    def bts(self, bugno: Union[int, str]) -> str:
        """Return the rendered Debian bug tracker URL for bug *bugno*."""
        return self.render_url(f"https://bugs.debian.org/{bugno}")

100 

101 

class MarkdownOutputStyle(OutputStyle):
    """Output style that renders headings, bold text and links as Markdown."""

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Return *text*, wrapped in ``**`` when *style* is ``"bold"``.

        The arguments are validated by the base class; colors are accepted but
        produce no markup.

        :raises ValueError: If a color or style name is not supported.
        """
        result = super().colored(text, fg=fg, bg=bg, style=style)
        if style == "bold":
            return f"**{result}**"
        return result

    def heading(self, heading: str, level: int) -> str:
        """Return *heading* prefixed by *level* ``#`` characters."""
        prefix = "#" * level
        return f"{prefix} {heading}"

    def render_url(self, link_url: str) -> str:
        """Return *link_url* as a Markdown link.

        ``man:<page>(<section>)`` URLs become a named link to
        manpages.debian.org; everything else becomes an ``<...>`` autolink.
        """
        if link_url.startswith("man:"):
            # Rewrite man page to a clickable link for markdown
            m = MAN_URL_REWRITE.match(link_url)
            if m:
                page, section = m.groups()
                man_page_url = f"https://manpages.debian.org/{page}.{section}"
                return f"[{link_url}]({man_page_url})"
        return f"<{link_url}>"

129 

130 

class IOBasedOutputStyling(OutputStyle):
    """Output style bound to a text stream, with helpers for printing to it."""

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        optimize_for_screen_reader: bool = False,
    ) -> None:
        """Bind the style to *stream*.

        :param stream: Text stream that the print helpers write to.
        :param output_format: Name of the output format; ``"csv"`` makes
          `print_list_table` emit CSV instead of an ASCII table.
        :param optimize_for_screen_reader: When True, purely visual output is
          suppressed and screen-reader-only output is emitted.
        """
        self.stream = stream
        self.output_format = output_format
        self.optimize_for_screen_reader = optimize_for_screen_reader
        # Color library handle; None here, and only read when supports_colors
        # is True.
        self._color_support: Optional[Any] = None

    def print_list_table(
        self,
        headers: Sequence[Union[str, Tuple[str, str]]],
        rows: Sequence[Sequence[str]],
    ) -> None:
        """Print *rows* as a table (or as CSV when the output format is "csv").

        :param headers: Column headers. Each entry is either the column name
          or a ``(name, alignment)`` pair, where the alignment is a format
          spec alignment character such as ``"<"`` or ``">"``.
        :param rows: Table rows; each must have one cell per header. May be
          empty, in which case only the header is printed.
        :raises ValueError: If the rows differ in length, do not match the
          number of headers, or no headers were given.
        """
        if rows:
            if any(len(r) != len(rows[0]) for r in rows):
                raise ValueError(
                    "Unbalanced table: All rows must have the same column count"
                )
            if len(rows[0]) != len(headers):
                raise ValueError(
                    "Unbalanced table: header list does not agree with row list on number of columns"
                )

        if not headers:
            raise ValueError("No headers provided!?")

        cadjust = {}
        header_names = []
        for c in headers:
            if isinstance(c, str):
                header_names.append(c)
            else:
                cname, adjust = c
                header_names.append(cname)
                cadjust[cname] = adjust

        if self.output_format == "csv":
            from csv import writer

            w = writer(self.stream)
            w.writerow(header_names)
            w.writerows(rows)
            return

        # Each column is as wide as its widest cell or its header. default=0
        # keeps a table without rows from failing on max() of nothing.
        column_lengths = [
            max(len(h), max((len(r[i]) for r in rows), default=0))
            for i, h in enumerate(header_names)
        ]
        # divider => "+---+---+-...-+"
        divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
        # row_format => '| {CELL_COLOR}{:<10}{CELL_COLOR_RESET} | ... |' where
        # the numbers are the column lengths. The doubled braces survive the
        # f-string as literal braces and are filled in later by str.format.
        row_format_inner = " | ".join(
            f"{{CELL_COLOR}}{{:{cadjust.get(cn, '<')}{x}}}{{CELL_COLOR_RESET}}"
            for cn, x in zip(header_names, column_lengths)
        )

        row_format = f"| {row_format_inner} |"

        if self.supports_colors:
            cs = self._color_support
            assert cs is not None
            header_color = cs.Style.bold
            header_color_reset = cs.Style.reset
        else:
            header_color = ""
            header_color_reset = ""

        self.print_visual_formatting(divider)
        self.print(
            row_format.format(
                *header_names,
                CELL_COLOR=header_color,
                CELL_COLOR_RESET=header_color_reset,
            )
        )
        self.print_visual_formatting(divider)
        for row in rows:
            self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
        if rows:
            # Without rows the divider under the header already closes the table.
            self.print_visual_formatting(divider)

    def print(self, /, string: str = "", **kwargs) -> None:
        """Print *string* to the bound stream.

        Extra keyword arguments are passed to the builtin ``print``.

        :raises ValueError: If ``file`` is passed; the target is always the
          bound stream.
        """
        if "file" in kwargs:
            raise ValueError("Unsupported kwarg file")
        print(string, file=self.stream, **kwargs)

    def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None:
        """Print purely visual output; skipped in screen-reader mode."""
        if self.optimize_for_screen_reader:
            return
        self.print(format_sequence, **kwargs)

    def print_for_screen_reader(self, /, text: str, **kwargs) -> None:
        """Print *text* only when optimizing for screen readers."""
        if not self.optimize_for_screen_reader:
            return
        self.print(text, **kwargs)

    def heading(self, heading: str, level: int) -> str:
        """Return *heading* prefixed by *level* ``#`` characters."""
        # Use markdown notation
        heading_prefix = "#" * level
        return f"{heading_prefix} {heading}"

235 

236 

class ANSIOutputStylingBase(IOBasedOutputStyling):
    """Stream-bound output style that can emit ANSI colors and hyperlinks."""

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        support_colors: bool = True,
        support_clickable_urls: bool = True,
        **kwargs: Any,
    ) -> None:
        """Bind the style to *stream*.

        :param stream: Text stream that the print helpers write to.
        :param output_format: Name of the output format.
        :param support_colors: Whether colors may be emitted. Forced to False
          when the ``colored`` module could not be loaded.
        :param support_clickable_urls: Whether URLs are rendered as terminal
          hyperlinks.
        :param kwargs: Passed on to `IOBasedOutputStyling.__init__`.
        """
        super().__init__(stream, output_format, **kwargs)
        # Same object as self.stream (set by the base class); kept because the
        # attribute already existed.
        self._stream = stream
        self._color_support = colored
        self._support_colors = (
            support_colors if self._color_support is not None else False
        )
        self._support_clickable_urls = support_clickable_urls

    @property
    def supports_colors(self) -> bool:
        """Whether colors are enabled and the color library is available."""
        return self._support_colors

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Return *text* wrapped in the requested color/style codes.

        The text is returned unchanged when colors are unsupported or nothing
        was requested.

        :raises ValueError: If a color or style name is not supported.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        _colored = self._color_support
        if not self.supports_colors or _colored is None:
            return text
        codes = []
        # "none" is an accepted style name; treat it as "no text style" and
        # emit no code for it instead of looking it up on the color library.
        # NOTE(review): assumes colored.Style has no attribute named "none"
        # that callers rely on - confirm against the colored module.
        if style is not None and style != "none":
            code = getattr(_colored.Style, style)
            assert code is not None
            codes.append(code)
        if fg is not None:
            code = getattr(_colored.Fore, fg)
            assert code is not None
            codes.append(code)
        if bg is not None:
            code = getattr(_colored.Back, bg)
            assert code is not None
            codes.append(code)
        if not codes:
            return text
        return "".join(codes) + text + _colored.Style.reset

    def render_url(self, link_url: str) -> str:
        """Return *link_url* as a terminal hyperlink when those are enabled.

        The visible link text is always the original URL. Outside
        screen-reader mode, ``man:<page>(<section>)`` targets are rewritten to
        point at manpages.debian.org.
        """
        if not self._support_clickable_urls:
            return super().render_url(link_url)
        link_text = link_url
        if not self.optimize_for_screen_reader and link_url.startswith("man:"):
            # Rewrite the man page to a clickable link, except in screen-reader
            # mode. I am not sure how the hyperlink ANSI code works with screen
            # readers; my fear is that both the link url and the link text get
            # read out, so the link is left alone there.
            m = MAN_URL_REWRITE.match(link_url)
            if m:
                page, section = m.groups()
                link_url = f"https://manpages.debian.org/{page}.{section}"
        return URL_START + f"{link_url}\a{link_text}" + URL_END

    def heading(self, heading: str, level: int) -> str:
        """Return the Markdown-style heading rendered in bold."""
        return self.colored(super().heading(heading, level), style="bold")

    def bts(self, bugno) -> str:
        """Return the rendered Debian bug tracker URL for bug *bugno*."""
        if not self._support_clickable_urls:
            return super().bts(bugno)
        return self.render_url(f"https://bugs.debian.org/{bugno}")

311 

312 

def no_fancy_output(
    stream: Optional[IO[str]] = None,
    output_format: str = "text",
    optimize_for_screen_reader: bool = False,
) -> IOBasedOutputStyling:
    """Create a plain (no colors, no hyperlinks) output style.

    :param stream: Stream to write to; defaults to ``sys.stdout``.
    :param output_format: Name of the output format. The default is
      ``"text"``, the same fallback `_output_styling` uses.
    :param optimize_for_screen_reader: Passed on to the created style.
    :return: An `IOBasedOutputStyling` bound to the stream.
    """
    if stream is None:
        stream = sys.stdout
    return IOBasedOutputStyling(
        stream,
        output_format,
        optimize_for_screen_reader=optimize_for_screen_reader,
    )

325 

326 

def _output_styling(
    parsed_args: argparse.Namespace,
    stream: IO[str],
) -> IOBasedOutputStyling:
    """Choose the output style for *stream*.

    :param parsed_args: Parsed command line arguments. Its ``output_format``
      attribute is used when present and not None; otherwise ``"text"``.
    :param stream: The stream the output will be written to.
    :return: A plain style when *stream* is not a terminal, otherwise an
      ANSI-capable style.
    """
    output_format = getattr(parsed_args, "output_format", None)
    if output_format is None:
        output_format = "text"
    # Any non-empty value of the variable enables screen-reader mode.
    optimize_for_screen_reader = os.environ.get("OPTIMIZE_FOR_SCREEN_READER", "") != ""
    if not stream.isatty():
        return no_fancy_output(
            stream,
            output_format,
            optimize_for_screen_reader=optimize_for_screen_reader,
        )

    return ANSIOutputStylingBase(
        stream, output_format, optimize_for_screen_reader=optimize_for_screen_reader
    )

345 

346 

@contextlib.contextmanager
def _stream_to_pager(
    parsed_args: argparse.Namespace,
) -> Iterator[Tuple[IO[str], IOBasedOutputStyling]]:
    """Yield an output stream and style, piping through a pager when possible.

    The pager is used only when ``parsed_args.pager`` is truthy, stdout is a
    terminal, the output format is ``"text"`` and a pager command is known.
    Otherwise ``sys.stdout`` is yielded directly.

    While the pager is active, the yielded style writes to the pager's stdin;
    on exit the style is pointed back at ``sys.stdout`` and the pager is
    waited for.

    :param parsed_args: Parsed command line arguments; must have a ``pager``
      attribute.
    """
    # Local import: only needed to split the pager command line.
    import shlex

    fancy_output = _output_styling(parsed_args, sys.stdout)
    if (
        not parsed_args.pager
        or not sys.stdout.isatty()
        or fancy_output.output_format != "text"
    ):
        yield sys.stdout, fancy_output
        return

    pager = _pager()
    pager_cmd = []
    if pager:
        # The pager setting may carry options (e.g. "less -R"); split it into
        # an argument list, since Popen without a shell treats a plain string
        # as the program name.
        try:
            pager_cmd = shlex.split(pager)
        except ValueError:
            # Unbalanced quoting: fall back to using the value verbatim.
            pager_cmd = [pager]
    if not pager_cmd:
        yield sys.stdout, fancy_output
        return

    env = dict(os.environ)
    # Only provide options for less when the user has not configured their own.
    env.setdefault("LESS", "-FRSXMQ")

    cmd = subprocess.Popen(
        pager_cmd,
        stdin=subprocess.PIPE,
        encoding="utf-8",
        env=env,
    )
    stdin = assume_not_none(cmd.stdin)
    try:
        fancy_output.stream = stdin
        yield stdin, fancy_output
    except Exception:
        # Do not leave the pager waiting for input that will never arrive.
        cmd.kill()
        raise
    finally:
        # Point the style back at stdout (not stdin), which is where it wrote
        # before the pager took over.
        fancy_output.stream = sys.stdout
        try:
            stdin.close()
        except BrokenPipeError:
            # The pager is already gone (killed above, or it exited early), so
            # the final flush has nowhere to go.
            pass
        finally:
            # Always reap the pager, even if closing its stdin failed.
            cmd.wait()