Coverage for src/debputy/commands/debputy_cmd/output.py: 22%

189 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import argparse 

2import contextlib 

3import os 

4import re 

5import shutil 

6import subprocess 

7import sys 

8from typing import ( 

9 Union, 

10 Sequence, 

11 Iterator, 

12 IO, 

13 Mapping, 

14 Tuple, 

15 Optional, 

16 Any, 

17) 

18 

19from debputy.util import assume_not_none 

20 

# Optional dependency: `colored` stays None when the module is missing or
# when it is too old to provide the attributes this module relies on.
try:
    import colored
except ImportError:
    colored = None
else:
    # Seen with python3-colored v1 (bookworm): the module imports fine but
    # lacks the Style/Fore/Back namespaces, so treat it as unavailable.
    if not all(hasattr(colored, attr) for attr in ("Style", "Fore", "Back")):
        colored = None

33 

34 

35def _pager() -> Optional[str]: 

36 pager = os.environ.get("DEBPUTY_PAGER") 

37 if pager is None: 

38 pager = os.environ.get("PAGER") 

39 if pager is None and shutil.which("less") is not None: 

40 pager = "less" 

41 return pager 

42 

43 

# Escape sequences wrapped around a clickable terminal hyperlink; render_url
# emits URL_START + <url> + BEL + <link text> + URL_END.
URL_START = "\033]8;;"
URL_END = URL_START + "\a"
# Matches "man:<page>(<section>)" references, capturing page and section.
MAN_URL_REWRITE = re.compile(r"man:(\S+)[(](\d+)[)]")

# Color names accepted for the `fg` / `bg` arguments of `colored`.
_SUPPORTED_COLORS = set(
    (
        "black",
        "red",
        "green",
        "yellow",
        "blue",
        "magenta",
        "cyan",
        "white",
    )
)
# Text style names accepted for the `style` argument of `colored`.
_SUPPORTED_STYLES = {"bold", "none"}

59 

60 

class OutputStylingBase:
    """Output helper that writes plain, unstyled text to a stream.

    This base implementation validates styling arguments but never emits
    colors or clickable links; subclasses override `colored`,
    `supports_colors`, `render_url` and `bts` to add them.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        optimize_for_screen_reader: bool = False,
    ) -> None:
        """Store the target stream and the formatting preferences.

        :param stream: Text stream that all `print*` methods write to.
        :param output_format: Output format name; "csv" switches
          `print_list_table` to CSV output, any other value gives a text table.
        :param optimize_for_screen_reader: When true, purely visual
          formatting is skipped and screen-reader-only text is printed.
        """
        self.stream = stream
        self.output_format = output_format
        self.optimize_for_screen_reader = optimize_for_screen_reader
        # Color library handle; stays None here, subclasses may assign one.
        self._color_support: Optional[Any] = None

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Validate the styling arguments and return `text` unchanged.

        :raises ValueError: If `fg`, `bg` or `style` is not a supported name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        return text

    @property
    def supports_colors(self) -> bool:
        """Whether `colored` can emit color codes (never for this class)."""
        return False

    def print_list_table(
        self,
        headers: Sequence[Union[str, Tuple[str, str]]],
        rows: Sequence[Sequence[str]],
    ) -> None:
        """Print `rows` as a table (or as CSV when the format is "csv").

        :param headers: One entry per column; either the column name or a
          `(name, alignment)` pair where alignment is a format-spec alignment
          character such as "<" or ">". Columns default to "<".
        :param rows: The table cells; every row must have one cell per header.
          An empty sequence prints just the header.
        :raises ValueError: If there are no headers or the column counts of
          the rows and headers disagree.
        """
        if rows:
            if any(len(r) != len(rows[0]) for r in rows):
                raise ValueError(
                    "Unbalanced table: All rows must have the same column count"
                )
            if len(rows[0]) != len(headers):
                raise ValueError(
                    "Unbalanced table: header list does not agree with row list on number of columns"
                )

        if not headers:
            raise ValueError("No headers provided!?")

        # Alignment is tracked per column (not per name) so that two columns
        # sharing a header name keep their own alignment.
        header_names = []
        alignments = []
        for header in headers:
            if isinstance(header, str):
                header_names.append(header)
                alignments.append("<")
            else:
                cname, adjust = header
                header_names.append(cname)
                alignments.append(adjust)

        if self.output_format == "csv":
            from csv import writer

            w = writer(self.stream)
            w.writerow(header_names)
            w.writerows(rows)
            return

        # `default=0` keeps an empty table from crashing: the column is then
        # simply as wide as its header.
        column_lengths = [
            max(len(h), max((len(r[i]) for r in rows), default=0))
            for i, h in enumerate(header_names)
        ]
        # divider => "+---+---+-...-+"
        divider = "+-" + "-+-".join("-" * x for x in column_lengths) + "-+"
        # row_format => '| {:<10} | {:<8} | ... |' where the numbers are the
        # column lengths; every cell is wrapped in the (initially unresolved)
        # {CELL_COLOR} / {CELL_COLOR_RESET} placeholders.
        row_format_inner = " | ".join(
            f"{{CELL_COLOR}}{{:{adjust}{x}}}{{CELL_COLOR_RESET}}"
            for adjust, x in zip(alignments, column_lengths)
        )

        row_format = f"| {row_format_inner} |"

        if self.supports_colors:
            cs = self._color_support
            assert cs is not None
            header_color = cs.Style.bold
            header_color_reset = cs.Style.reset
        else:
            header_color = ""
            header_color_reset = ""

        self.print_visual_formatting(divider)
        self.print(
            row_format.format(
                *header_names,
                CELL_COLOR=header_color,
                CELL_COLOR_RESET=header_color_reset,
            )
        )
        self.print_visual_formatting(divider)
        for row in rows:
            self.print(row_format.format(*row, CELL_COLOR="", CELL_COLOR_RESET=""))
        self.print_visual_formatting(divider)

    def print(self, /, string: str = "", **kwargs) -> None:
        """Print `string` to the configured stream.

        Extra keyword arguments are forwarded to the builtin `print`.

        :raises ValueError: If `file` is passed; the stream is fixed.
        """
        if "file" in kwargs:
            raise ValueError("Unsupported kwarg file")
        print(string, file=self.stream, **kwargs)

    def print_visual_formatting(self, /, format_sequence: str, **kwargs) -> None:
        """Print purely visual output; skipped when optimizing for screen readers."""
        if self.optimize_for_screen_reader:
            return
        self.print(format_sequence, **kwargs)

    def print_for_screen_reader(self, /, text: str, **kwargs) -> None:
        """Print text only when optimizing for screen readers."""
        if not self.optimize_for_screen_reader:
            return
        self.print(text, **kwargs)

    def _check_color(self, color: Optional[str]) -> None:
        """Raise ValueError unless `color` is None or a supported color name."""
        if color is not None and color not in _SUPPORTED_COLORS:
            # Sorted so the message is stable between runs (set order is not).
            raise ValueError(
                f"Unsupported color: {color}. Only the following are supported {','.join(sorted(_SUPPORTED_COLORS))}"
            )

    def _check_text_style(self, style: Optional[str]) -> None:
        """Raise ValueError unless `style` is None or a supported style name."""
        if style is not None and style not in _SUPPORTED_STYLES:
            # Sorted so the message is stable between runs (set order is not).
            raise ValueError(
                f"Unsupported style: {style}. Only the following are supported {','.join(sorted(_SUPPORTED_STYLES))}"
            )

    def render_url(self, link_url: str) -> str:
        """Return the text to show for `link_url` (here: the URL itself)."""
        return link_url

    def bts(self, bugno) -> str:
        """Return the bugs.debian.org URL for bug number `bugno`."""
        return f"https://bugs.debian.org/{bugno}"

195 

196 

class ANSIOutputStylingBase(OutputStylingBase):
    """Output helper that can emit colors and clickable terminal hyperlinks.

    Colors are produced through the optional `colored` module; when that
    module is unavailable, color support is disabled regardless of the
    `support_colors` argument.
    """

    def __init__(
        self,
        stream: IO[str],
        output_format: str,
        *,
        support_colors: bool = True,
        support_clickable_urls: bool = True,
        **kwargs: Any,
    ) -> None:
        """Set up the stream and the styling capabilities.

        :param stream: Text stream to write to.
        :param output_format: Output format name, passed to the base class.
        :param support_colors: Whether colors may be emitted (only honored
          when the `colored` module is available).
        :param support_clickable_urls: Whether URLs are wrapped in hyperlink
          escape sequences.
        :param kwargs: Forwarded to the base class constructor.
        """
        super().__init__(stream, output_format, **kwargs)
        self._stream = stream
        self._color_support = colored
        self._support_colors = (
            support_colors if self._color_support is not None else False
        )
        self._support_clickable_urls = support_clickable_urls

    @property
    def supports_colors(self) -> bool:
        """Whether `colored` emits color codes for this instance."""
        return self._support_colors

    def colored(
        self,
        text: str,
        *,
        fg: Optional[str] = None,
        bg: Optional[str] = None,
        style: Optional[str] = None,
    ) -> str:
        """Return `text` wrapped in the requested style/color codes.

        The text is returned unchanged when colors are unsupported or when
        no styling was requested.

        :raises ValueError: If `fg`, `bg` or `style` is not a supported name.
        """
        self._check_color(fg)
        self._check_color(bg)
        self._check_text_style(style)
        _colored = self._color_support
        if not self.supports_colors or _colored is None:
            return text
        # "none" is an accepted style name meaning "no text style", so it must
        # not be looked up as an attribute of the style namespace.
        # NOTE(review): assumes `colored.Style` has no `none` attribute - confirm.
        style_name = None if style == "none" else style
        codes = []
        for namespace, name in (
            (_colored.Style, style_name),
            (_colored.Fore, fg),
            (_colored.Back, bg),
        ):
            if name is None:
                continue
            code = getattr(namespace, name)
            assert code is not None
            codes.append(code)
        if not codes:
            return text
        return "".join(codes) + text + _colored.Style.reset

    def render_url(self, link_url: str) -> str:
        """Return `link_url` wrapped in a clickable-hyperlink escape sequence.

        The visible link text is always the original `link_url`. When
        clickable URLs are disabled, the plain URL is returned instead.
        """
        if not self._support_clickable_urls:
            return super().render_url(link_url)
        link_text = link_url
        if not self.optimize_for_screen_reader and link_url.startswith("man:"):
            # Point "man:page(section)" links at manpages.debian.org so they are
            # clickable. This is skipped when optimizing for screen readers: it is
            # unclear how the hyperlink escape code interacts with them, and the
            # fear is that both the link URL and the link text would be read out.
            m = MAN_URL_REWRITE.match(link_url)
            if m:
                page, section = m.groups()
                link_url = f"https://manpages.debian.org/{page}.{section}"
        return URL_START + f"{link_url}\a{link_text}" + URL_END

    def bts(self, bugno) -> str:
        """Return a (clickable, when supported) link to Debian bug `bugno`."""
        # render_url already falls back to the plain URL when clickable URLs
        # are disabled, so no separate capability check is needed here.
        return self.render_url(super().bts(bugno))

268 

269 

def no_fancy_output(
    stream: Optional[IO[str]] = None,
    output_format: str = "text",
    optimize_for_screen_reader: bool = False,
) -> OutputStylingBase:
    """Create an output helper that never emits colors or hyperlinks.

    :param stream: Stream to write to; `sys.stdout` (looked up at call time)
      when omitted.
    :param output_format: Output format name. Defaults to "text"; the previous
      default was the builtin type `str`, which was not a format name at all.
    :param optimize_for_screen_reader: Passed through to the helper.
    :return: A plain `OutputStylingBase` instance.
    """
    if stream is None:
        stream = sys.stdout
    return OutputStylingBase(
        stream,
        output_format,
        optimize_for_screen_reader=optimize_for_screen_reader,
    )

282 

283 

def _output_styling(
    parsed_args: argparse.Namespace,
    stream: IO[str],
) -> OutputStylingBase:
    """Pick the output helper matching `stream` and the parsed arguments.

    The output format is taken from `parsed_args.output_format` ("text" when
    the attribute is missing or None). A non-empty OPTIMIZE_FOR_SCREEN_READER
    environment variable enables screen-reader mode. Streams that are not a
    TTY get the plain helper; TTYs get the ANSI-capable one.
    """
    requested_format = getattr(parsed_args, "output_format", None)
    output_format = "text" if requested_format is None else requested_format
    optimize_for_screen_reader = bool(
        os.environ.get("OPTIMIZE_FOR_SCREEN_READER", "")
    )
    if stream.isatty():
        return ANSIOutputStylingBase(
            stream, output_format, optimize_for_screen_reader=optimize_for_screen_reader
        )
    return no_fancy_output(
        stream,
        output_format,
        optimize_for_screen_reader=optimize_for_screen_reader,
    )

302 

303 

@contextlib.contextmanager
def _stream_to_pager(
    parsed_args: argparse.Namespace,
) -> Iterator[Tuple[IO[str], OutputStylingBase]]:
    """Context manager yielding a stream (possibly a pager) and its styling helper.

    Output goes straight to `sys.stdout` when paging is disabled via
    `parsed_args.pager`, when stdout is not a TTY, when the output format is
    not "text", or when no pager command is available. Otherwise the pager is
    started and its stdin is yielded; on exit the pager's stdin is closed and
    the pager is waited for.

    :param parsed_args: Parsed command line; must provide a `pager` attribute.
    """
    fancy_output = _output_styling(parsed_args, sys.stdout)
    if (
        not parsed_args.pager
        or not sys.stdout.isatty()
        or fancy_output.output_format != "text"
    ):
        yield sys.stdout, fancy_output
        return

    pager = _pager()
    if pager is None:
        yield sys.stdout, fancy_output
        return

    env: Mapping[str, str] = os.environ
    if "LESS" not in env:
        # Supply default `less` options without overriding the user's own.
        env_copy = dict(os.environ)
        env_copy["LESS"] = "-FRSXMQ"
        env = env_copy

    cmd = subprocess.Popen(
        pager,
        stdin=subprocess.PIPE,
        encoding="utf-8",
        env=env,
    )
    stdin = assume_not_none(cmd.stdin)
    # Remember where the helper was writing so it can be restored afterwards.
    original_stream = fancy_output.stream
    try:
        fancy_output.stream = stdin
        yield stdin, fancy_output
    except Exception:
        stdin.close()
        cmd.kill()
        cmd.wait()
        raise
    finally:
        # Restore the output stream (this used to be reset to sys.stdin, which
        # left the helper pointing at an input stream).
        fancy_output.stream = original_stream
        stdin.close()
        cmd.wait()

346 cmd.wait()