Coverage for src/debputy/plugins/debputy/binary_package_rules.py: 83%
174 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import dataclasses
2import os
3from typing import (
4 Any,
5 List,
6 NotRequired,
7 Union,
8 Literal,
9 TypedDict,
10 Annotated,
11 Optional,
12 FrozenSet,
13 Self,
14 cast,
15)
17from debputy._manifest_constants import (
18 MK_INSTALLATION_SEARCH_DIRS,
19 MK_BINARY_VERSION,
20 MK_SERVICES,
21)
22from debputy.maintscript_snippet import DpkgMaintscriptHelperCommand, MaintscriptSnippet
23from debputy.manifest_parser.base_types import FileSystemExactMatchRule
24from debputy.manifest_parser.declarative_parser import ParserGenerator
25from debputy.manifest_parser.exceptions import ManifestParseException
26from debputy.manifest_parser.parse_hints import DebputyParseHint
27from debputy.manifest_parser.parser_data import ParserContextData
28from debputy.manifest_parser.tagging_types import DebputyParsedContent
29from debputy.manifest_parser.util import AttributePath
30from debputy.path_matcher import MatchRule, MATCH_ANYTHING, ExactFileSystemPath
31from debputy.plugin.api import reference_documentation
32from debputy.plugin.api.impl import (
33 DebputyPluginInitializerProvider,
34 ServiceDefinitionImpl,
35)
36from debputy.plugin.api.parser_tables import OPARSER_PACKAGES
37from debputy.plugin.api.spec import (
38 ServiceUpgradeRule,
39 ServiceDefinition,
40 DSD,
41 INTEGRATION_MODE_DH_DEBPUTY_RRR,
42 not_integrations,
43)
44from debputy.transformation_rules import TransformationRule
45from debputy.util import _error, manifest_format_doc
47ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES = frozenset(
48 [
49 "./var/log",
50 ]
51)
54ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF = frozenset(
55 [
56 "./etc",
57 "./run",
58 "./var/lib",
59 "./var/cache",
60 "./var/backups",
61 "./var/spool",
62 # linux-image uses these paths with some `rm -f`
63 "./usr/lib/modules",
64 "./lib/modules",
65 # udev special case
66 "./lib/udev",
67 "./usr/lib/udev",
68 # pciutils deletes /usr/share/misc/pci.ids.<ext>
69 "./usr/share/misc",
70 ]
71)
74def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None:
75 api.pluggable_manifest_rule(
76 OPARSER_PACKAGES,
77 MK_BINARY_VERSION,
78 BinaryVersionParsedFormat,
79 _parse_binary_version,
80 source_format=str,
81 )
83 api.pluggable_manifest_rule(
84 OPARSER_PACKAGES,
85 "transformations",
86 List[TransformationRule],
87 _unpack_list,
88 )
90 api.pluggable_manifest_rule(
91 OPARSER_PACKAGES,
92 "conffile-management",
93 List[DpkgMaintscriptHelperCommand],
94 _unpack_list,
95 expected_debputy_integration_mode=not_integrations(
96 INTEGRATION_MODE_DH_DEBPUTY_RRR
97 ),
98 )
100 api.pluggable_manifest_rule(
101 OPARSER_PACKAGES,
102 MK_SERVICES,
103 List[ServiceRuleParsedFormat],
104 _process_service_rules,
105 source_format=List[ServiceRuleSourceFormat],
106 expected_debputy_integration_mode=not_integrations(
107 INTEGRATION_MODE_DH_DEBPUTY_RRR
108 ),
109 )
111 api.pluggable_manifest_rule(
112 OPARSER_PACKAGES,
113 "clean-after-removal",
114 ListParsedFormat,
115 _parse_clean_after_removal,
116 # FIXME: debputy won't see the attributes for this one :'(
117 # (update `debputy_docs.yaml` when fixed)
118 source_format=List[Any],
119 expected_debputy_integration_mode=not_integrations(
120 INTEGRATION_MODE_DH_DEBPUTY_RRR
121 ),
122 )
124 api.pluggable_manifest_rule(
125 OPARSER_PACKAGES,
126 MK_INSTALLATION_SEARCH_DIRS,
127 InstallationSearchDirsParsedFormat,
128 _parse_installation_search_dirs,
129 source_format=List[FileSystemExactMatchRule],
130 expected_debputy_integration_mode=not_integrations(
131 INTEGRATION_MODE_DH_DEBPUTY_RRR
132 ),
133 )
136class ServiceRuleSourceFormat(TypedDict):
137 service: str
138 type_of_service: NotRequired[str]
139 service_scope: NotRequired[Literal["system", "user"]]
140 enable_on_install: NotRequired[bool]
141 start_on_install: NotRequired[bool]
142 on_upgrade: NotRequired[ServiceUpgradeRule]
143 service_manager: NotRequired[
144 Annotated[str, DebputyParseHint.target_attribute("service_managers")]
145 ]
146 service_managers: NotRequired[List[str]]
149class ServiceRuleParsedFormat(DebputyParsedContent):
150 service: str
151 type_of_service: NotRequired[str]
152 service_scope: NotRequired[Literal["system", "user"]]
153 enable_on_install: NotRequired[bool]
154 start_on_install: NotRequired[bool]
155 on_upgrade: NotRequired[ServiceUpgradeRule]
156 service_managers: NotRequired[List[str]]
159@dataclasses.dataclass(slots=True, frozen=True)
160class ServiceRule:
161 definition_source: str
162 service: str
163 type_of_service: str
164 service_scope: Literal["system", "user"]
165 enable_on_install: Optional[bool]
166 start_on_install: Optional[bool]
167 on_upgrade: Optional[ServiceUpgradeRule]
168 service_managers: Optional[FrozenSet[str]]
170 @classmethod
171 def from_service_rule_parsed_format(
172 cls,
173 data: ServiceRuleParsedFormat,
174 attribute_path: AttributePath,
175 ) -> "Self":
176 service_managers = data.get("service_managers")
177 return cls(
178 attribute_path.path,
179 data["service"],
180 data.get("type_of_service", "service"),
181 cast("Literal['system', 'user']", data.get("service_scope", "system")),
182 data.get("enable_on_install"),
183 data.get("start_on_install"),
184 data.get("on_upgrade"),
185 frozenset(service_managers) if service_managers else service_managers,
186 )
188 def applies_to_service_manager(self, service_manager: str) -> bool:
189 return self.service_managers is None or service_manager in self.service_managers
191 def apply_to_service_definition(
192 self,
193 service_definition: ServiceDefinition[DSD],
194 ) -> ServiceDefinition[DSD]:
195 assert isinstance(service_definition, ServiceDefinitionImpl)
196 if not service_definition.is_plugin_provided_definition:
197 _error(
198 f"Conflicting definitions related to {self.service} (type: {self.type_of_service},"
199 f" scope: {self.service_scope}). First definition at {service_definition.definition_source},"
200 f" the second at {self.definition_source}). If they are for different service managers,"
201 " you can often avoid this problem by explicitly defining which service managers are applicable"
202 ' to each rule via the "service-managers" keyword.'
203 )
204 changes = {
205 "definition_source": self.definition_source,
206 "is_plugin_provided_definition": False,
207 }
208 if (
209 self.service != service_definition.name
210 and self.service in service_definition.names
211 ):
212 changes["name"] = self.service
213 if self.enable_on_install is not None:
214 changes["auto_start_on_install"] = self.enable_on_install
215 if self.start_on_install is not None:
216 changes["auto_start_on_install"] = self.start_on_install
217 if self.on_upgrade is not None:
218 changes["on_upgrade"] = self.on_upgrade
220 return service_definition.replace(**changes)
223class BinaryVersionParsedFormat(DebputyParsedContent):
224 binary_version: str
227class ListParsedFormat(DebputyParsedContent):
228 elements: List[Any]
231class ListOfTransformationRulesFormat(DebputyParsedContent):
232 elements: List[TransformationRule]
235class ListOfDpkgMaintscriptHelperCommandFormat(DebputyParsedContent):
236 elements: List[DpkgMaintscriptHelperCommand]
239class InstallationSearchDirsParsedFormat(DebputyParsedContent):
240 installation_search_dirs: List[FileSystemExactMatchRule]
243def _parse_binary_version(
244 _name: str,
245 parsed_data: BinaryVersionParsedFormat,
246 _attribute_path: AttributePath,
247 _parser_context: ParserContextData,
248) -> str:
249 return parsed_data["binary_version"]
252def _parse_installation_search_dirs(
253 _name: str,
254 parsed_data: InstallationSearchDirsParsedFormat,
255 _attribute_path: AttributePath,
256 _parser_context: ParserContextData,
257) -> List[FileSystemExactMatchRule]:
258 return parsed_data["installation_search_dirs"]
261def _process_service_rules(
262 _name: str,
263 parsed_data: List[ServiceRuleParsedFormat],
264 attribute_path: AttributePath,
265 _parser_context: ParserContextData,
266) -> List[ServiceRule]:
267 return [
268 ServiceRule.from_service_rule_parsed_format(x, attribute_path[i])
269 for i, x in enumerate(parsed_data)
270 ]
273def _unpack_list(
274 _name: str,
275 parsed_data: List[Any],
276 _attribute_path: AttributePath,
277 _parser_context: ParserContextData,
278) -> List[Any]:
279 return parsed_data
282class CleanAfterRemovalRuleSourceFormat(TypedDict):
283 path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
284 paths: NotRequired[List[str]]
285 delete_on: NotRequired[Literal["purge", "removal"]]
286 recursive: NotRequired[bool]
287 ignore_non_empty_dir: NotRequired[bool]
290class CleanAfterRemovalRule(DebputyParsedContent):
291 paths: List[str]
292 delete_on: NotRequired[Literal["purge", "removal"]]
293 recursive: NotRequired[bool]
294 ignore_non_empty_dir: NotRequired[bool]
297# FIXME: Not optimal that we are doing an initialization of ParserGenerator here. But the rule is not depending on any
298# complex types that is registered by plugins, so it will work for now.
299_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().generate_parser(
300 CleanAfterRemovalRule,
301 source_content=Union[CleanAfterRemovalRuleSourceFormat, str, List[str]],
302 inline_reference_documentation=reference_documentation(
303 reference_documentation_url=manifest_format_doc(
304 "remove-runtime-created-paths-on-purge-or-post-removal-clean-after-removal"
305 ),
306 ),
307)
310# Order between clean_on_removal and conffile_management is
311# important. We want the dpkg conffile management rules to happen before the
312# clean clean_on_removal rules. Since the latter only affects `postrm`
313# and the order is reversed for `postrm` scripts (among other), we need do
314# clean_on_removal first to account for the reversing of order.
315#
316# FIXME: All of this is currently not really possible todo, but it should be.
317# (I think it is the correct order by "mistake" rather than by "design", which is
318# what this note is about)
319def _parse_clean_after_removal(
320 _name: str,
321 parsed_data: ListParsedFormat,
322 attribute_path: AttributePath,
323 parser_context: ParserContextData,
324) -> None: # TODO: Return and pass to a maintscript helper
325 raw_clean_after_removal = parsed_data["elements"]
326 package_state = parser_context.current_binary_package_state
328 for no, raw_transformation in enumerate(raw_clean_after_removal):
329 definition_source = attribute_path[no]
330 clean_after_removal_rules = _CLEAN_AFTER_REMOVAL_RULE_PARSER.parse_input(
331 raw_transformation,
332 definition_source,
333 parser_context=parser_context,
334 )
335 patterns = clean_after_removal_rules["paths"]
336 if patterns: 336 ↛ 338line 336 didn't jump to line 338 because the condition on line 336 was always true
337 definition_source.path_hint = patterns[0]
338 delete_on = clean_after_removal_rules.get("delete_on") or "purge"
339 recurse = clean_after_removal_rules.get("recursive") or False
340 ignore_non_empty_dir = (
341 clean_after_removal_rules.get("ignore_non_empty_dir") or False
342 )
343 if delete_on == "purge": 343 ↛ 346line 343 didn't jump to line 346 because the condition on line 343 was always true
344 condition = '[ "$1" = "purge" ]'
345 else:
346 condition = '[ "$1" = "remove" ]'
348 if ignore_non_empty_dir:
349 if recurse: 349 ↛ 350line 349 didn't jump to line 350 because the condition on line 349 was never true
350 raise ManifestParseException(
351 'The "recursive" and "ignore-non-empty-dir" options are mutually exclusive.'
352 f" Both were enabled at the same time in at {definition_source.path}"
353 )
354 for pattern in patterns:
355 if not pattern.endswith("/"): 355 ↛ 356line 355 didn't jump to line 356 because the condition on line 355 was never true
356 raise ManifestParseException(
357 'When ignore-non-empty-dir is True, then all patterns must end with a literal "/"'
358 f' to ensure they only apply to directories. The pattern "{pattern}" at'
359 f" {definition_source.path} did not."
360 )
362 substitution = parser_context.substitution
363 match_rules = [
364 MatchRule.from_path_or_glob(
365 p, definition_source.path, substitution=substitution
366 )
367 for p in patterns
368 ]
369 content_lines = [
370 f"if {condition}; then\n",
371 ]
372 for idx, match_rule in enumerate(match_rules):
373 original_pattern = patterns[idx]
374 if match_rule is MATCH_ANYTHING: 374 ↛ 375line 374 didn't jump to line 375 because the condition on line 374 was never true
375 raise ManifestParseException(
376 f'Using "{original_pattern}" in a clean rule would trash the system.'
377 f" Please restrict this pattern at {definition_source.path} considerably."
378 )
379 is_subdir_match = False
380 matched_directory: Optional[str]
381 if isinstance(match_rule, ExactFileSystemPath):
382 matched_directory = (
383 os.path.dirname(match_rule.path)
384 if match_rule.path not in ("/", ".", "./")
385 else match_rule.path
386 )
387 is_subdir_match = True
388 else:
389 matched_directory = getattr(match_rule, "directory", None)
391 if matched_directory is None: 391 ↛ 392line 391 didn't jump to line 392 because the condition on line 391 was never true
392 raise ManifestParseException(
393 f'The pattern "{original_pattern}" defined at {definition_source.path} is not'
394 f" trivially anchored in a specific directory. Cowardly refusing to use it"
395 f" in a clean rule as it may trash the system if the pattern is overreaching."
396 f" Please avoid glob characters in the top level directories."
397 )
398 assert matched_directory.startswith("./") or matched_directory in (
399 ".",
400 "./",
401 "",
402 )
403 acceptable_directory = False
404 would_have_allowed_direct_match = False
405 while matched_directory not in (".", "./", ""):
406 # Our acceptable paths set includes "/var/lib" or "/etc". We require that the
407 # pattern is either an exact match, in which case it may match directly inside
408 # the acceptable directory OR it is a pattern against a subdirectory of the
409 # acceptable path. As an example:
410 #
411 # /etc/inputrc <-- OK, exact match
412 # /etc/foo/* <-- OK, subdir match
413 # /etc/* <-- ERROR, glob directly in the accepted directory.
414 if is_subdir_match and (
415 matched_directory
416 in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF
417 ):
418 acceptable_directory = True
419 break
420 if (
421 matched_directory
422 in ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES
423 ):
424 # Special-case: In some directories (such as /var/log), we allow globs directly.
425 # Notably, X11's log files are /var/log/Xorg.*.log
426 acceptable_directory = True
427 break
428 if (
429 matched_directory
430 in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF
431 ):
432 would_have_allowed_direct_match = True
433 break
434 matched_directory = os.path.dirname(matched_directory)
435 is_subdir_match = True
437 if would_have_allowed_direct_match and not acceptable_directory:
438 raise ManifestParseException(
439 f'The pattern "{original_pattern}" defined at {definition_source.path} seems to'
440 " be overreaching. If it had been a path (and not use a glob), the rule would"
441 " have been permitted."
442 )
443 elif not acceptable_directory:
444 raise ManifestParseException(
445 f'The pattern or path "{original_pattern}" defined at {definition_source.path} seems to'
446 f' be overreaching or not limited to the set of "known acceptable" directories.'
447 )
449 try:
450 shell_escaped_pattern = match_rule.shell_escape_pattern()
451 except TypeError:
452 raise ManifestParseException(
453 f'Sorry, the pattern "{original_pattern}" defined at {definition_source.path}'
454 f" is unfortunately not supported by `debputy` for clean-after-removal rules."
455 f" If you can rewrite the rule to something like `/var/log/foo/*.log` or"
456 f' similar "trivial" patterns. You may have to rewrite the pattern the rule '
457 f" into multiple patterns to achieve this. This restriction is to enable "
458 f' `debputy` to ensure the pattern is correctly executed plus catch "obvious'
459 f' system trashing" patterns. Apologies for the inconvenience.'
460 )
462 if ignore_non_empty_dir:
463 cmd = f' rmdir --ignore-fail-on-non-empty "${ DPKG_ROOT} "{shell_escaped_pattern}\n'
464 elif recurse:
465 cmd = f' rm -fr "${ DPKG_ROOT} "{shell_escaped_pattern}\n'
466 elif original_pattern.endswith("/"):
467 cmd = f' rmdir "${ DPKG_ROOT} "{shell_escaped_pattern}\n'
468 else:
469 cmd = f' rm -f "${ DPKG_ROOT} "{shell_escaped_pattern}\n'
470 content_lines.append(cmd)
471 content_lines.append("fi\n")
473 snippet = MaintscriptSnippet(definition_source.path, "".join(content_lines))
474 package_state.maintscript_snippets["postrm"].append(snippet)