Coverage for src/debputy/plugins/debputy/binary_package_rules.py: 83%

174 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-09-07 09:27 +0000

1import dataclasses 

2import os 

3from typing import ( 

4 Any, 

5 List, 

6 NotRequired, 

7 Union, 

8 Literal, 

9 TypedDict, 

10 Annotated, 

11 Optional, 

12 FrozenSet, 

13 Self, 

14 cast, 

15) 

16 

17from debputy._manifest_constants import ( 

18 MK_INSTALLATION_SEARCH_DIRS, 

19 MK_BINARY_VERSION, 

20 MK_SERVICES, 

21) 

22from debputy.maintscript_snippet import DpkgMaintscriptHelperCommand, MaintscriptSnippet 

23from debputy.manifest_parser.base_types import FileSystemExactMatchRule 

24from debputy.manifest_parser.declarative_parser import ParserGenerator 

25from debputy.manifest_parser.exceptions import ManifestParseException 

26from debputy.manifest_parser.parse_hints import DebputyParseHint 

27from debputy.manifest_parser.parser_data import ParserContextData 

28from debputy.manifest_parser.tagging_types import DebputyParsedContent 

29from debputy.manifest_parser.util import AttributePath 

30from debputy.path_matcher import MatchRule, MATCH_ANYTHING, ExactFileSystemPath 

31from debputy.plugin.api import reference_documentation 

32from debputy.plugin.api.impl import ( 

33 DebputyPluginInitializerProvider, 

34 ServiceDefinitionImpl, 

35) 

36from debputy.plugin.api.parser_tables import OPARSER_PACKAGES 

37from debputy.plugin.api.spec import ( 

38 ServiceUpgradeRule, 

39 ServiceDefinition, 

40 DSD, 

41 INTEGRATION_MODE_DH_DEBPUTY_RRR, 

42 not_integrations, 

43) 

44from debputy.transformation_rules import TransformationRule 

45from debputy.util import _error, manifest_format_doc 

46 

# Directories in which clean-after-removal rules may use globs directly
# (in addition to exact paths).
ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES = frozenset(
    ("./var/log",),
)

52 

53 

# Directories in which clean-after-removal rules are accepted when the rule is
# either an exact path directly inside the directory or any pattern anchored in
# a subdirectory of it.
ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF = frozenset(
    (
        "./etc",
        "./run",
        "./var/lib",
        "./var/cache",
        "./var/backups",
        "./var/spool",
        # linux-image uses these paths with some `rm -f`
        "./usr/lib/modules",
        "./lib/modules",
        # udev special case
        "./lib/udev",
        "./usr/lib/udev",
        # pciutils deletes /usr/share/misc/pci.ids.<ext>
        "./usr/share/misc",
    ),
)

72 

73 

def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None:
    """Register this module's manifest rules for binary packages.

    Every rule is registered against ``OPARSER_PACKAGES`` via
    ``api.pluggable_manifest_rule``. All rules except the binary version and
    the transformations are registered with an expected integration mode of
    "anything but ``INTEGRATION_MODE_DH_DEBPUTY_RRR``".

    :param api: The plugin initializer the rules are registered with.
    """
    add_rule = api.pluggable_manifest_rule

    # Binary version override; the manifest value is a plain string.
    add_rule(
        OPARSER_PACKAGES,
        MK_BINARY_VERSION,
        BinaryVersionParsedFormat,
        _parse_binary_version,
        source_format=str,
    )

    # Transformation rules are passed through as the parsed list.
    add_rule(
        OPARSER_PACKAGES,
        "transformations",
        List[TransformationRule],
        _unpack_list,
    )

    # Conffile management commands are likewise passed through unchanged.
    add_rule(
        OPARSER_PACKAGES,
        "conffile-management",
        List[DpkgMaintscriptHelperCommand],
        _unpack_list,
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )

    # Service rules are converted into `ServiceRule` instances.
    add_rule(
        OPARSER_PACKAGES,
        MK_SERVICES,
        List[ServiceRuleParsedFormat],
        _process_service_rules,
        source_format=List[ServiceRuleSourceFormat],
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )

    add_rule(
        OPARSER_PACKAGES,
        "clean-after-removal",
        ListParsedFormat,
        _parse_clean_after_removal,
        # FIXME: debputy won't see the attributes for this one :'(
        #  (update `debputy_docs.yaml` when fixed)
        source_format=List[Any],
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )

    add_rule(
        OPARSER_PACKAGES,
        MK_INSTALLATION_SEARCH_DIRS,
        InstallationSearchDirsParsedFormat,
        _parse_installation_search_dirs,
        source_format=List[FileSystemExactMatchRule],
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )

134 

135 

# Shape of one service rule as written in the manifest (the source format).
# Only `service` is mandatory; every other key is optional.
class ServiceRuleSourceFormat(TypedDict):
    service: str
    type_of_service: NotRequired[str]
    service_scope: NotRequired[Literal["system", "user"]]
    enable_on_install: NotRequired[bool]
    start_on_install: NotRequired[bool]
    on_upgrade: NotRequired[ServiceUpgradeRule]
    # Singular form; the parse hint targets the `service_managers` attribute
    # (presumably so a single name ends up in the list-valued attribute).
    service_manager: NotRequired[
        Annotated[str, DebputyParseHint.target_attribute("service_managers")]
    ]
    service_managers: NotRequired[List[str]]

147 

148 

# Shape of one service rule after parsing (the target format). It mirrors the
# source format except that there is no singular `service_manager` key.
class ServiceRuleParsedFormat(DebputyParsedContent):
    service: str
    type_of_service: NotRequired[str]
    service_scope: NotRequired[Literal["system", "user"]]
    enable_on_install: NotRequired[bool]
    start_on_install: NotRequired[bool]
    on_upgrade: NotRequired[ServiceUpgradeRule]
    service_managers: NotRequired[List[str]]

157 

158 

159@dataclasses.dataclass(slots=True, frozen=True) 

160class ServiceRule: 

161 definition_source: str 

162 service: str 

163 type_of_service: str 

164 service_scope: Literal["system", "user"] 

165 enable_on_install: Optional[bool] 

166 start_on_install: Optional[bool] 

167 on_upgrade: Optional[ServiceUpgradeRule] 

168 service_managers: Optional[FrozenSet[str]] 

169 

170 @classmethod 

171 def from_service_rule_parsed_format( 

172 cls, 

173 data: ServiceRuleParsedFormat, 

174 attribute_path: AttributePath, 

175 ) -> "Self": 

176 service_managers = data.get("service_managers") 

177 return cls( 

178 attribute_path.path, 

179 data["service"], 

180 data.get("type_of_service", "service"), 

181 cast("Literal['system', 'user']", data.get("service_scope", "system")), 

182 data.get("enable_on_install"), 

183 data.get("start_on_install"), 

184 data.get("on_upgrade"), 

185 frozenset(service_managers) if service_managers else service_managers, 

186 ) 

187 

188 def applies_to_service_manager(self, service_manager: str) -> bool: 

189 return self.service_managers is None or service_manager in self.service_managers 

190 

191 def apply_to_service_definition( 

192 self, 

193 service_definition: ServiceDefinition[DSD], 

194 ) -> ServiceDefinition[DSD]: 

195 assert isinstance(service_definition, ServiceDefinitionImpl) 

196 if not service_definition.is_plugin_provided_definition: 

197 _error( 

198 f"Conflicting definitions related to {self.service} (type: {self.type_of_service}," 

199 f" scope: {self.service_scope}). First definition at {service_definition.definition_source}," 

200 f" the second at {self.definition_source}). If they are for different service managers," 

201 " you can often avoid this problem by explicitly defining which service managers are applicable" 

202 ' to each rule via the "service-managers" keyword.' 

203 ) 

204 changes = { 

205 "definition_source": self.definition_source, 

206 "is_plugin_provided_definition": False, 

207 } 

208 if ( 

209 self.service != service_definition.name 

210 and self.service in service_definition.names 

211 ): 

212 changes["name"] = self.service 

213 if self.enable_on_install is not None: 

214 changes["auto_start_on_install"] = self.enable_on_install 

215 if self.start_on_install is not None: 

216 changes["auto_start_on_install"] = self.start_on_install 

217 if self.on_upgrade is not None: 

218 changes["on_upgrade"] = self.on_upgrade 

219 

220 return service_definition.replace(**changes) 

221 

222 

# Parsed form of the binary version rule: the version string itself.
class BinaryVersionParsedFormat(DebputyParsedContent):
    binary_version: str

225 

226 

# Parsed form of a rule whose value is a list of arbitrary (untyped) elements.
class ListParsedFormat(DebputyParsedContent):
    elements: List[Any]

229 

230 

# Parsed form of a rule whose value is a list of transformation rules.
# NOTE(review): not referenced by the registrations visible in this module.
class ListOfTransformationRulesFormat(DebputyParsedContent):
    elements: List[TransformationRule]

233 

234 

# Parsed form of a rule whose value is a list of dpkg maintscript helper commands.
# NOTE(review): not referenced by the registrations visible in this module.
class ListOfDpkgMaintscriptHelperCommandFormat(DebputyParsedContent):
    elements: List[DpkgMaintscriptHelperCommand]

237 

238 

# Parsed form of the installation search directories rule.
class InstallationSearchDirsParsedFormat(DebputyParsedContent):
    installation_search_dirs: List[FileSystemExactMatchRule]

241 

242 

def _parse_binary_version(
    _name: str,
    parsed_data: BinaryVersionParsedFormat,
    _attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> str:
    """Return the binary version string from the parsed rule.

    The name, attribute path and parser context are not used.
    """
    binary_version = parsed_data["binary_version"]
    return binary_version

250 

251 

def _parse_installation_search_dirs(
    _name: str,
    parsed_data: InstallationSearchDirsParsedFormat,
    _attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> List[FileSystemExactMatchRule]:
    """Return the list of installation search directories from the parsed rule.

    The name, attribute path and parser context are not used.
    """
    search_dirs = parsed_data["installation_search_dirs"]
    return search_dirs

259 

260 

def _process_service_rules(
    _name: str,
    parsed_data: List[ServiceRuleParsedFormat],
    attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> List[ServiceRule]:
    """Convert the parsed service rules into `ServiceRule` instances.

    Each rule is paired with the attribute path of its own list index, which
    becomes the definition source of the resulting rule.
    """
    service_rules: List[ServiceRule] = []
    for index, raw_rule in enumerate(parsed_data):
        service_rules.append(
            ServiceRule.from_service_rule_parsed_format(
                raw_rule,
                attribute_path[index],
            )
        )
    return service_rules

271 

272 

def _unpack_list(
    _name: str,
    parsed_data: List[Any],
    _attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> List[Any]:
    """Pass the already-parsed list through unchanged (the same object).

    The name, attribute path and parser context are not used.
    """
    elements = parsed_data
    return elements

280 

281 

# Mapping form of one clean-after-removal rule as written in the manifest.
# Every key is optional at the type level.
class CleanAfterRemovalRuleSourceFormat(TypedDict):
    # Singular form; the parse hint targets the `paths` attribute.
    path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
    paths: NotRequired[List[str]]
    delete_on: NotRequired[Literal["purge", "removal"]]
    recursive: NotRequired[bool]
    ignore_non_empty_dir: NotRequired[bool]

288 

289 

# Parsed form of one clean-after-removal rule: `paths` is always present,
# the remaining options are optional.
class CleanAfterRemovalRule(DebputyParsedContent):
    paths: List[str]
    delete_on: NotRequired[Literal["purge", "removal"]]
    recursive: NotRequired[bool]
    ignore_non_empty_dir: NotRequired[bool]

295 

296 

# FIXME: It is not optimal that a ParserGenerator is instantiated here at import
#  time. However, the rule does not depend on any complex types registered by
#  plugins, so it will work for now.
#
# Parser for a single clean-after-removal rule. A rule may be written as a
# mapping, a single string or a list of strings.
_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().generate_parser(
    CleanAfterRemovalRule,
    source_content=Union[CleanAfterRemovalRuleSourceFormat, str, List[str]],
    inline_reference_documentation=reference_documentation(
        reference_documentation_url=manifest_format_doc(
            "remove-runtime-created-paths-on-purge-or-post-removal-clean-after-removal"
        ),
    ),
)

308 

309 

def _ensure_clean_after_removal_target_is_acceptable(
    match_rule: MatchRule,
    original_pattern: str,
    definition_source: AttributePath,
) -> None:
    """Raise ManifestParseException unless the rule is confined to a known acceptable directory.

    :param match_rule: The match rule created from `original_pattern`.
    :param original_pattern: The pattern as written in the manifest (for messages).
    :param definition_source: Manifest location of the rule (for messages).
    :raises ManifestParseException: If the rule matches everything, is not
      anchored in a specific directory, or is not anchored in (or below) one
      of the acceptable directories.
    """
    if match_rule is MATCH_ANYTHING:
        raise ManifestParseException(
            f'Using "{original_pattern}" in a clean rule would trash the system.'
            f" Please restrict this pattern at {definition_source.path} considerably."
        )
    is_subdir_match = False
    matched_directory: Optional[str]
    if isinstance(match_rule, ExactFileSystemPath):
        # An exact path is judged by the directory that contains it.
        matched_directory = (
            os.path.dirname(match_rule.path)
            if match_rule.path not in ("/", ".", "./")
            else match_rule.path
        )
        is_subdir_match = True
    else:
        # Other rules only qualify when they expose the directory they are
        # anchored in.
        matched_directory = getattr(match_rule, "directory", None)

    if matched_directory is None:
        raise ManifestParseException(
            f'The pattern "{original_pattern}" defined at {definition_source.path} is not'
            f" trivially anchored in a specific directory. Cowardly refusing to use it"
            f" in a clean rule as it may trash the system if the pattern is overreaching."
            f" Please avoid glob characters in the top level directories."
        )
    assert matched_directory.startswith("./") or matched_directory in (
        ".",
        "./",
        "",
    )
    acceptable_directory = False
    would_have_allowed_direct_match = False
    # Walk upwards from the anchor directory until an acceptable directory is
    # found or the root is reached.
    while matched_directory not in (".", "./", ""):
        # Our acceptable paths set includes "/var/lib" or "/etc".  We require that the
        # pattern is either an exact match, in which case it may match directly inside
        # the acceptable directory OR it is a pattern against a subdirectory of the
        # acceptable path. As an example:
        #
        # /etc/inputrc <-- OK, exact match
        # /etc/foo/*   <-- OK, subdir match
        # /etc/*       <-- ERROR, glob directly in the accepted directory.
        if is_subdir_match and (
            matched_directory
            in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF
        ):
            acceptable_directory = True
            break
        if (
            matched_directory
            in ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES
        ):
            # Special-case: In some directories (such as /var/log), we allow globs directly.
            # Notably, X11's log files are /var/log/Xorg.*.log
            acceptable_directory = True
            break
        if (
            matched_directory
            in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF
        ):
            # Only reachable for a glob directly inside the directory; remember
            # it so the error message can be more specific.
            would_have_allowed_direct_match = True
            break
        matched_directory = os.path.dirname(matched_directory)
        is_subdir_match = True

    if would_have_allowed_direct_match and not acceptable_directory:
        raise ManifestParseException(
            f'The pattern "{original_pattern}" defined at {definition_source.path} seems to'
            " be overreaching. If it had been a path (and not use a glob), the rule would"
            " have been permitted."
        )
    elif not acceptable_directory:
        raise ManifestParseException(
            f'The pattern or path "{original_pattern}" defined at {definition_source.path} seems to'
            f' be overreaching or not limited to the set of "known acceptable" directories.'
        )


def _clean_after_removal_command(
    shell_escaped_pattern: str,
    original_pattern: str,
    *,
    recurse: bool,
    ignore_non_empty_dir: bool,
) -> str:
    """Return the shell line (newline terminated) that removes one pattern.

    :param shell_escaped_pattern: The shell-escaped pattern, appended directly
      after the quoted `${DPKG_ROOT}` expansion.
    :param original_pattern: The pattern as written in the manifest; a trailing
      "/" selects `rmdir` over `rm -f`.
    :param recurse: Use `rm -fr`.
    :param ignore_non_empty_dir: Use `rmdir --ignore-fail-on-non-empty`; takes
      precedence over `recurse`.
    """
    if ignore_non_empty_dir:
        tool = "rmdir --ignore-fail-on-non-empty"
    elif recurse:
        tool = "rm -fr"
    elif original_pattern.endswith("/"):
        tool = "rmdir"
    else:
        tool = "rm -f"
    # The doubled braces emit a literal `${DPKG_ROOT}` for the shell; single
    # braces would make Python evaluate `DPKG_ROOT` as an f-string expression.
    return f' {tool} "${{DPKG_ROOT}}"{shell_escaped_pattern}\n'


# The order between clean_on_removal and conffile_management is important.
# We want the dpkg conffile management rules to happen before the
# clean_on_removal rules. Since the latter only affects `postrm` and the
# order is reversed for `postrm` scripts (among others), we need to do
# clean_on_removal first to account for the reversing of order.
#
# FIXME: All of this is currently not really possible to do, but it should be.
# (I think it is the correct order by "mistake" rather than by "design", which is
# what this note is about)
def _parse_clean_after_removal(
    _name: str,
    parsed_data: ListParsedFormat,
    attribute_path: AttributePath,
    parser_context: ParserContextData,
) -> None:  # TODO: Return and pass to a maintscript helper
    """Turn the clean-after-removal rules into `postrm` maintscript snippets.

    Each element is parsed with `_CLEAN_AFTER_REMOVAL_RULE_PARSER`, every
    pattern is validated, and one snippet per rule is appended to the `postrm`
    snippets of the current binary package.

    :param _name: Unused.
    :param parsed_data: The raw rules (under "elements").
    :param attribute_path: Manifest location of the rule list.
    :param parser_context: Provides the substitution and the current binary
      package state.
    :raises ManifestParseException: If options conflict or a pattern is
      rejected or not supported.
    """
    raw_clean_after_removal = parsed_data["elements"]
    package_state = parser_context.current_binary_package_state

    for no, raw_rule in enumerate(raw_clean_after_removal):
        definition_source = attribute_path[no]
        clean_after_removal_rules = _CLEAN_AFTER_REMOVAL_RULE_PARSER.parse_input(
            raw_rule,
            definition_source,
            parser_context=parser_context,
        )
        patterns = clean_after_removal_rules["paths"]
        if patterns:
            definition_source.path_hint = patterns[0]
        delete_on = clean_after_removal_rules.get("delete_on") or "purge"
        recurse = clean_after_removal_rules.get("recursive") or False
        ignore_non_empty_dir = (
            clean_after_removal_rules.get("ignore_non_empty_dir") or False
        )
        # `$1` is the first argument of the generated `postrm` snippet.
        if delete_on == "purge":
            condition = '[ "$1" = "purge" ]'
        else:
            condition = '[ "$1" = "remove" ]'

        if ignore_non_empty_dir:
            if recurse:
                raise ManifestParseException(
                    'The "recursive" and "ignore-non-empty-dir" options are mutually exclusive.'
                    f" Both were enabled at the same time in at {definition_source.path}"
                )
            for pattern in patterns:
                if not pattern.endswith("/"):
                    raise ManifestParseException(
                        'When ignore-non-empty-dir is True, then all patterns must end with a literal "/"'
                        f' to ensure they only apply to directories. The pattern "{pattern}" at'
                        f" {definition_source.path} did not."
                    )

        substitution = parser_context.substitution
        # All match rules are created up front, before any of them is validated.
        match_rules = [
            MatchRule.from_path_or_glob(
                p, definition_source.path, substitution=substitution
            )
            for p in patterns
        ]
        content_lines = [
            f"if {condition}; then\n",
        ]
        for original_pattern, match_rule in zip(patterns, match_rules):
            _ensure_clean_after_removal_target_is_acceptable(
                match_rule,
                original_pattern,
                definition_source,
            )

            try:
                shell_escaped_pattern = match_rule.shell_escape_pattern()
            except TypeError as e:
                raise ManifestParseException(
                    f'Sorry, the pattern "{original_pattern}" defined at {definition_source.path}'
                    f" is unfortunately not supported by `debputy` for clean-after-removal rules."
                    f" If you can rewrite the rule to something like `/var/log/foo/*.log` or"
                    f' similar "trivial" patterns. You may have to rewrite the pattern the rule '
                    f" into multiple patterns to achieve this. This restriction is to enable "
                    f' `debputy` to ensure the pattern is correctly executed plus catch "obvious'
                    f' system trashing" patterns. Apologies for the inconvenience.'
                ) from e

            content_lines.append(
                _clean_after_removal_command(
                    shell_escaped_pattern,
                    original_pattern,
                    recurse=recurse,
                    ignore_non_empty_dir=ignore_non_empty_dir,
                )
            )
        content_lines.append("fi\n")

        snippet = MaintscriptSnippet(definition_source.path, "".join(content_lines))
        package_state.maintscript_snippets["postrm"].append(snippet)