Coverage for src/debputy/plugin/debputy/binary_package_rules.py: 83%

173 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import dataclasses 

2import os 

3from typing import ( 

4 Any, 

5 List, 

6 NotRequired, 

7 Union, 

8 Literal, 

9 TypedDict, 

10 Annotated, 

11 Optional, 

12 FrozenSet, 

13 Self, 

14 cast, 

15) 

16 

17from debputy.maintscript_snippet import DpkgMaintscriptHelperCommand, MaintscriptSnippet 

18from debputy.manifest_parser.base_types import FileSystemExactMatchRule 

19from debputy.manifest_parser.declarative_parser import ParserGenerator 

20from debputy.manifest_parser.exceptions import ManifestParseException 

21from debputy.manifest_parser.parse_hints import DebputyParseHint 

22from debputy.manifest_parser.parser_data import ParserContextData 

23from debputy.manifest_parser.tagging_types import DebputyParsedContent 

24from debputy.manifest_parser.util import AttributePath 

25from debputy.path_matcher import MatchRule, MATCH_ANYTHING, ExactFileSystemPath 

26from debputy.plugin.api import reference_documentation 

27from debputy.plugin.api.impl import ( 

28 DebputyPluginInitializerProvider, 

29 ServiceDefinitionImpl, 

30) 

31from debputy.plugin.api.parser_tables import OPARSER_PACKAGES 

32from debputy.plugin.api.spec import ( 

33 ServiceUpgradeRule, 

34 ServiceDefinition, 

35 DSD, 

36 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

37 not_integrations, 

38) 

39from debputy.transformation_rules import TransformationRule 

40from debputy.util import _error, manifest_format_doc 

41 

42ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES = frozenset( 

43 [ 

44 "./var/log", 

45 ] 

46) 

47 

48 

49ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF = frozenset( 

50 [ 

51 "./etc", 

52 "./run", 

53 "./var/lib", 

54 "./var/cache", 

55 "./var/backups", 

56 "./var/spool", 

57 # linux-image uses these paths with some `rm -f` 

58 "./usr/lib/modules", 

59 "./lib/modules", 

60 # udev special case 

61 "./lib/udev", 

62 "./usr/lib/udev", 

63 # pciutils deletes /usr/share/misc/pci.ids.<ext> 

64 "./usr/share/misc", 

65 ] 

66) 

67 

68 

69def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None: 

70 api.pluggable_manifest_rule( 

71 OPARSER_PACKAGES, 

72 "binary-version", 

73 BinaryVersionParsedFormat, 

74 _parse_binary_version, 

75 source_format=str, 

76 ) 

77 

78 api.pluggable_manifest_rule( 

79 OPARSER_PACKAGES, 

80 "transformations", 

81 List[TransformationRule], 

82 _unpack_list, 

83 ) 

84 

85 api.pluggable_manifest_rule( 

86 OPARSER_PACKAGES, 

87 "conffile-management", 

88 List[DpkgMaintscriptHelperCommand], 

89 _unpack_list, 

90 expected_debputy_integration_mode=not_integrations( 

91 INTEGRATION_MODE_DH_DEBPUTY_RRR 

92 ), 

93 ) 

94 

95 api.pluggable_manifest_rule( 

96 OPARSER_PACKAGES, 

97 "services", 

98 List[ServiceRuleParsedFormat], 

99 _process_service_rules, 

100 source_format=List[ServiceRuleSourceFormat], 

101 expected_debputy_integration_mode=not_integrations( 

102 INTEGRATION_MODE_DH_DEBPUTY_RRR 

103 ), 

104 ) 

105 

106 api.pluggable_manifest_rule( 

107 OPARSER_PACKAGES, 

108 "clean-after-removal", 

109 ListParsedFormat, 

110 _parse_clean_after_removal, 

111 # FIXME: debputy won't see the attributes for this one :'( 

112 # (update `debputy_docs.yaml` when fixed) 

113 source_format=List[Any], 

114 expected_debputy_integration_mode=not_integrations( 

115 INTEGRATION_MODE_DH_DEBPUTY_RRR 

116 ), 

117 ) 

118 

119 api.pluggable_manifest_rule( 

120 OPARSER_PACKAGES, 

121 "installation-search-dirs", 

122 InstallationSearchDirsParsedFormat, 

123 _parse_installation_search_dirs, 

124 source_format=List[FileSystemExactMatchRule], 

125 expected_debputy_integration_mode=not_integrations( 

126 INTEGRATION_MODE_DH_DEBPUTY_RRR 

127 ), 

128 ) 

129 

130 

131class ServiceRuleSourceFormat(TypedDict): 

132 service: str 

133 type_of_service: NotRequired[str] 

134 service_scope: NotRequired[Literal["system", "user"]] 

135 enable_on_install: NotRequired[bool] 

136 start_on_install: NotRequired[bool] 

137 on_upgrade: NotRequired[ServiceUpgradeRule] 

138 service_manager: NotRequired[ 

139 Annotated[str, DebputyParseHint.target_attribute("service_managers")] 

140 ] 

141 service_managers: NotRequired[List[str]] 

142 

143 

144class ServiceRuleParsedFormat(DebputyParsedContent): 

145 service: str 

146 type_of_service: NotRequired[str] 

147 service_scope: NotRequired[Literal["system", "user"]] 

148 enable_on_install: NotRequired[bool] 

149 start_on_install: NotRequired[bool] 

150 on_upgrade: NotRequired[ServiceUpgradeRule] 

151 service_managers: NotRequired[List[str]] 

152 

153 

154@dataclasses.dataclass(slots=True, frozen=True) 

155class ServiceRule: 

156 definition_source: str 

157 service: str 

158 type_of_service: str 

159 service_scope: Literal["system", "user"] 

160 enable_on_install: Optional[bool] 

161 start_on_install: Optional[bool] 

162 on_upgrade: Optional[ServiceUpgradeRule] 

163 service_managers: Optional[FrozenSet[str]] 

164 

165 @classmethod 

166 def from_service_rule_parsed_format( 

167 cls, 

168 data: ServiceRuleParsedFormat, 

169 attribute_path: AttributePath, 

170 ) -> "Self": 

171 service_managers = data.get("service_managers") 

172 return cls( 

173 attribute_path.path, 

174 data["service"], 

175 data.get("type_of_service", "service"), 

176 cast("Literal['system', 'user']", data.get("service_scope", "system")), 

177 data.get("enable_on_install"), 

178 data.get("start_on_install"), 

179 data.get("on_upgrade"), 

180 frozenset(service_managers) if service_managers else service_managers, 

181 ) 

182 

183 def applies_to_service_manager(self, service_manager: str) -> bool: 

184 return self.service_managers is None or service_manager in self.service_managers 

185 

186 def apply_to_service_definition( 

187 self, 

188 service_definition: ServiceDefinition[DSD], 

189 ) -> ServiceDefinition[DSD]: 

190 assert isinstance(service_definition, ServiceDefinitionImpl) 

191 if not service_definition.is_plugin_provided_definition: 

192 _error( 

193 f"Conflicting definitions related to {self.service} (type: {self.type_of_service}," 

194 f" scope: {self.service_scope}). First definition at {service_definition.definition_source}," 

195 f" the second at {self.definition_source}). If they are for different service managers," 

196 " you can often avoid this problem by explicitly defining which service managers are applicable" 

197 ' to each rule via the "service-managers" keyword.' 

198 ) 

199 changes = { 

200 "definition_source": self.definition_source, 

201 "is_plugin_provided_definition": False, 

202 } 

203 if ( 

204 self.service != service_definition.name 

205 and self.service in service_definition.names 

206 ): 

207 changes["name"] = self.service 

208 if self.enable_on_install is not None: 

209 changes["auto_start_on_install"] = self.enable_on_install 

210 if self.start_on_install is not None: 

211 changes["auto_start_on_install"] = self.start_on_install 

212 if self.on_upgrade is not None: 

213 changes["on_upgrade"] = self.on_upgrade 

214 

215 return service_definition.replace(**changes) 

216 

217 

218class BinaryVersionParsedFormat(DebputyParsedContent): 

219 binary_version: str 

220 

221 

222class ListParsedFormat(DebputyParsedContent): 

223 elements: List[Any] 

224 

225 

226class ListOfTransformationRulesFormat(DebputyParsedContent): 

227 elements: List[TransformationRule] 

228 

229 

230class ListOfDpkgMaintscriptHelperCommandFormat(DebputyParsedContent): 

231 elements: List[DpkgMaintscriptHelperCommand] 

232 

233 

234class InstallationSearchDirsParsedFormat(DebputyParsedContent): 

235 installation_search_dirs: List[FileSystemExactMatchRule] 

236 

237 

238def _parse_binary_version( 

239 _name: str, 

240 parsed_data: BinaryVersionParsedFormat, 

241 _attribute_path: AttributePath, 

242 _parser_context: ParserContextData, 

243) -> str: 

244 return parsed_data["binary_version"] 

245 

246 

247def _parse_installation_search_dirs( 

248 _name: str, 

249 parsed_data: InstallationSearchDirsParsedFormat, 

250 _attribute_path: AttributePath, 

251 _parser_context: ParserContextData, 

252) -> List[FileSystemExactMatchRule]: 

253 return parsed_data["installation_search_dirs"] 

254 

255 

256def _process_service_rules( 

257 _name: str, 

258 parsed_data: List[ServiceRuleParsedFormat], 

259 attribute_path: AttributePath, 

260 _parser_context: ParserContextData, 

261) -> List[ServiceRule]: 

262 return [ 

263 ServiceRule.from_service_rule_parsed_format(x, attribute_path[i]) 

264 for i, x in enumerate(parsed_data) 

265 ] 

266 

267 

268def _unpack_list( 

269 _name: str, 

270 parsed_data: List[Any], 

271 _attribute_path: AttributePath, 

272 _parser_context: ParserContextData, 

273) -> List[Any]: 

274 return parsed_data 

275 

276 

277class CleanAfterRemovalRuleSourceFormat(TypedDict): 

278 path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]] 

279 paths: NotRequired[List[str]] 

280 delete_on: NotRequired[Literal["purge", "removal"]] 

281 recursive: NotRequired[bool] 

282 ignore_non_empty_dir: NotRequired[bool] 

283 

284 

285class CleanAfterRemovalRule(DebputyParsedContent): 

286 paths: List[str] 

287 delete_on: NotRequired[Literal["purge", "removal"]] 

288 recursive: NotRequired[bool] 

289 ignore_non_empty_dir: NotRequired[bool] 

290 

291 

292# FIXME: Not optimal that we are doing an initialization of ParserGenerator here. But the rule is not depending on any 

293# complex types that is registered by plugins, so it will work for now. 

294_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().generate_parser( 

295 CleanAfterRemovalRule, 

296 source_content=Union[CleanAfterRemovalRuleSourceFormat, str, List[str]], 

297 inline_reference_documentation=reference_documentation( 

298 reference_documentation_url=manifest_format_doc( 

299 "remove-runtime-created-paths-on-purge-or-post-removal-clean-after-removal" 

300 ), 

301 ), 

302) 

303 

304 

305# Order between clean_on_removal and conffile_management is 

306# important. We want the dpkg conffile management rules to happen before the 

307# clean clean_on_removal rules. Since the latter only affects `postrm` 

308# and the order is reversed for `postrm` scripts (among other), we need do 

309# clean_on_removal first to account for the reversing of order. 

310# 

311# FIXME: All of this is currently not really possible todo, but it should be. 

312# (I think it is the correct order by "mistake" rather than by "design", which is 

313# what this note is about) 

314def _parse_clean_after_removal( 

315 _name: str, 

316 parsed_data: ListParsedFormat, 

317 attribute_path: AttributePath, 

318 parser_context: ParserContextData, 

319) -> None: # TODO: Return and pass to a maintscript helper 

320 raw_clean_after_removal = parsed_data["elements"] 

321 package_state = parser_context.current_binary_package_state 

322 

323 for no, raw_transformation in enumerate(raw_clean_after_removal): 

324 definition_source = attribute_path[no] 

325 clean_after_removal_rules = _CLEAN_AFTER_REMOVAL_RULE_PARSER.parse_input( 

326 raw_transformation, 

327 definition_source, 

328 parser_context=parser_context, 

329 ) 

330 patterns = clean_after_removal_rules["paths"] 

331 if patterns: 331 ↛ 333line 331 didn't jump to line 333 because the condition on line 331 was always true

332 definition_source.path_hint = patterns[0] 

333 delete_on = clean_after_removal_rules.get("delete_on") or "purge" 

334 recurse = clean_after_removal_rules.get("recursive") or False 

335 ignore_non_empty_dir = ( 

336 clean_after_removal_rules.get("ignore_non_empty_dir") or False 

337 ) 

338 if delete_on == "purge": 338 ↛ 341line 338 didn't jump to line 341 because the condition on line 338 was always true

339 condition = '[ "$1" = "purge" ]' 

340 else: 

341 condition = '[ "$1" = "remove" ]' 

342 

343 if ignore_non_empty_dir: 

344 if recurse: 344 ↛ 345line 344 didn't jump to line 345 because the condition on line 344 was never true

345 raise ManifestParseException( 

346 'The "recursive" and "ignore-non-empty-dir" options are mutually exclusive.' 

347 f" Both were enabled at the same time in at {definition_source.path}" 

348 ) 

349 for pattern in patterns: 

350 if not pattern.endswith("/"): 350 ↛ 351line 350 didn't jump to line 351 because the condition on line 350 was never true

351 raise ManifestParseException( 

352 'When ignore-non-empty-dir is True, then all patterns must end with a literal "/"' 

353 f' to ensure they only apply to directories. The pattern "{pattern}" at' 

354 f" {definition_source.path} did not." 

355 ) 

356 

357 substitution = parser_context.substitution 

358 match_rules = [ 

359 MatchRule.from_path_or_glob( 

360 p, definition_source.path, substitution=substitution 

361 ) 

362 for p in patterns 

363 ] 

364 content_lines = [ 

365 f"if {condition}; then\n", 

366 ] 

367 for idx, match_rule in enumerate(match_rules): 

368 original_pattern = patterns[idx] 

369 if match_rule is MATCH_ANYTHING: 369 ↛ 370line 369 didn't jump to line 370 because the condition on line 369 was never true

370 raise ManifestParseException( 

371 f'Using "{original_pattern}" in a clean rule would trash the system.' 

372 f" Please restrict this pattern at {definition_source.path} considerably." 

373 ) 

374 is_subdir_match = False 

375 matched_directory: Optional[str] 

376 if isinstance(match_rule, ExactFileSystemPath): 

377 matched_directory = ( 

378 os.path.dirname(match_rule.path) 

379 if match_rule.path not in ("/", ".", "./") 

380 else match_rule.path 

381 ) 

382 is_subdir_match = True 

383 else: 

384 matched_directory = getattr(match_rule, "directory", None) 

385 

386 if matched_directory is None: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true

387 raise ManifestParseException( 

388 f'The pattern "{original_pattern}" defined at {definition_source.path} is not' 

389 f" trivially anchored in a specific directory. Cowardly refusing to use it" 

390 f" in a clean rule as it may trash the system if the pattern is overreaching." 

391 f" Please avoid glob characters in the top level directories." 

392 ) 

393 assert matched_directory.startswith("./") or matched_directory in ( 

394 ".", 

395 "./", 

396 "", 

397 ) 

398 acceptable_directory = False 

399 would_have_allowed_direct_match = False 

400 while matched_directory not in (".", "./", ""): 

401 # Our acceptable paths set includes "/var/lib" or "/etc". We require that the 

402 # pattern is either an exact match, in which case it may match directly inside 

403 # the acceptable directory OR it is a pattern against a subdirectory of the 

404 # acceptable path. As an example: 

405 # 

406 # /etc/inputrc <-- OK, exact match 

407 # /etc/foo/* <-- OK, subdir match 

408 # /etc/* <-- ERROR, glob directly in the accepted directory. 

409 if is_subdir_match and ( 

410 matched_directory 

411 in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF 

412 ): 

413 acceptable_directory = True 

414 break 

415 if ( 

416 matched_directory 

417 in ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES 

418 ): 

419 # Special-case: In some directories (such as /var/log), we allow globs directly. 

420 # Notably, X11's log files are /var/log/Xorg.*.log 

421 acceptable_directory = True 

422 break 

423 if ( 

424 matched_directory 

425 in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF 

426 ): 

427 would_have_allowed_direct_match = True 

428 break 

429 matched_directory = os.path.dirname(matched_directory) 

430 is_subdir_match = True 

431 

432 if would_have_allowed_direct_match and not acceptable_directory: 

433 raise ManifestParseException( 

434 f'The pattern "{original_pattern}" defined at {definition_source.path} seems to' 

435 " be overreaching. If it has been a path (and not use a glob), the rule would" 

436 " have been permitted." 

437 ) 

438 elif not acceptable_directory: 

439 raise ManifestParseException( 

440 f'The pattern or path "{original_pattern}" defined at {definition_source.path} seems to' 

441 f' be overreaching or not limited to the set of "known acceptable" directories.' 

442 ) 

443 

444 try: 

445 shell_escaped_pattern = match_rule.shell_escape_pattern() 

446 except TypeError: 

447 raise ManifestParseException( 

448 f'Sorry, the pattern "{original_pattern}" defined at {definition_source.path}' 

449 f" is unfortunately not supported by `debputy` for clean-after-removal rules." 

450 f" If you can rewrite the rule to something like `/var/log/foo/*.log` or" 

451 f' similar "trivial" patterns. You may have to rewrite the pattern the rule ' 

452 f" into multiple patterns to achieve this. This restriction is to enable " 

453 f' `debputy` to ensure the pattern is correctly executed plus catch "obvious' 

454 f' system trashing" patterns. Apologies for the inconvenience.' 

455 ) 

456 

457 if ignore_non_empty_dir: 

458 cmd = f' rmdir --ignore-fail-on-non-empty "${ DPKG_ROOT} "{shell_escaped_pattern}\n' 

459 elif recurse: 

460 cmd = f' rm -fr "${ DPKG_ROOT} "{shell_escaped_pattern}\n' 

461 elif original_pattern.endswith("/"): 

462 cmd = f' rmdir "${ DPKG_ROOT} "{shell_escaped_pattern}\n' 

463 else: 

464 cmd = f' rm -f "${ DPKG_ROOT} "{shell_escaped_pattern}\n' 

465 content_lines.append(cmd) 

466 content_lines.append("fi\n") 

467 

468 snippet = MaintscriptSnippet(definition_source.path, "".join(content_lines)) 

469 package_state.maintscript_snippets["postrm"].append(snippet)