Coverage for src/debputy/plugin/debputy/binary_package_rules.py: 83%
173 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import dataclasses
2import os
3from typing import (
4 Any,
5 List,
6 NotRequired,
7 Union,
8 Literal,
9 TypedDict,
10 Annotated,
11 Optional,
12 FrozenSet,
13 Self,
14 cast,
15)
17from debputy.maintscript_snippet import DpkgMaintscriptHelperCommand, MaintscriptSnippet
18from debputy.manifest_parser.base_types import FileSystemExactMatchRule
19from debputy.manifest_parser.declarative_parser import ParserGenerator
20from debputy.manifest_parser.exceptions import ManifestParseException
21from debputy.manifest_parser.parse_hints import DebputyParseHint
22from debputy.manifest_parser.parser_data import ParserContextData
23from debputy.manifest_parser.tagging_types import DebputyParsedContent
24from debputy.manifest_parser.util import AttributePath
25from debputy.path_matcher import MatchRule, MATCH_ANYTHING, ExactFileSystemPath
26from debputy.plugin.api import reference_documentation
27from debputy.plugin.api.impl import (
28 DebputyPluginInitializerProvider,
29 ServiceDefinitionImpl,
30)
31from debputy.plugin.api.parser_tables import OPARSER_PACKAGES
32from debputy.plugin.api.spec import (
33 ServiceUpgradeRule,
34 ServiceDefinition,
35 DSD,
36 INTEGRATION_MODE_DH_DEBPUTY_RRR,
37 not_integrations,
38)
39from debputy.transformation_rules import TransformationRule
40from debputy.util import _error, manifest_format_doc
# Directories in which a clean-after-removal rule may place a glob directly
# (exact paths are accepted as well), e.g. X11's `/var/log/Xorg.*.log`.
ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES = frozenset(
    ("./var/log",),
)
# Directories in which a clean-after-removal rule is accepted when it is either
# an exact path located directly inside the directory or a pattern anchored in
# a subdirectory of it. A glob placed directly inside one of these directories
# is rejected by `_parse_clean_after_removal`.
ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF = frozenset(
    (
        "./etc",
        "./run",
        "./var/lib",
        "./var/cache",
        "./var/backups",
        "./var/spool",
        # linux-image uses these paths with some `rm -f`
        "./usr/lib/modules",
        "./lib/modules",
        # udev special case
        "./lib/udev",
        "./usr/lib/udev",
        # pciutils deletes /usr/share/misc/pci.ids.<ext>
        "./usr/share/misc",
    ),
)
def register_binary_package_rules(api: DebputyPluginInitializerProvider) -> None:
    """Register the per-binary-package manifest rules on ``api``.

    Every rule is registered under ``OPARSER_PACKAGES``. All rules except
    ``binary-version`` and ``transformations`` are additionally restricted with
    ``not_integrations(INTEGRATION_MODE_DH_DEBPUTY_RRR)``.

    :param api: The plugin initializer the rules are registered with.
    """
    register = api.pluggable_manifest_rule

    # binary-version: written as a plain string in the manifest.
    register(
        OPARSER_PACKAGES,
        "binary-version",
        BinaryVersionParsedFormat,
        _parse_binary_version,
        source_format=str,
    )

    # transformations: the parsed list is handed through unchanged.
    register(
        OPARSER_PACKAGES,
        "transformations",
        List[TransformationRule],
        _unpack_list,
    )

    # conffile-management: the parsed list is handed through unchanged.
    register(
        OPARSER_PACKAGES,
        "conffile-management",
        List[DpkgMaintscriptHelperCommand],
        _unpack_list,
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )

    # services: each parsed entry is converted into a `ServiceRule`.
    register(
        OPARSER_PACKAGES,
        "services",
        List[ServiceRuleParsedFormat],
        _process_service_rules,
        source_format=List[ServiceRuleSourceFormat],
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )

    # clean-after-removal: elements are parsed individually by the handler.
    register(
        OPARSER_PACKAGES,
        "clean-after-removal",
        ListParsedFormat,
        _parse_clean_after_removal,
        # FIXME: debputy won't see the attributes for this one :'(
        #  (update `debputy_docs.yaml` when fixed)
        source_format=List[Any],
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )

    # installation-search-dirs: a list of exact file system paths.
    register(
        OPARSER_PACKAGES,
        "installation-search-dirs",
        InstallationSearchDirsParsedFormat,
        _parse_installation_search_dirs,
        source_format=List[FileSystemExactMatchRule],
        expected_debputy_integration_mode=not_integrations(
            INTEGRATION_MODE_DH_DEBPUTY_RRR
        ),
    )
class ServiceRuleSourceFormat(TypedDict):
    """Shape of one ``services`` entry as written in the manifest.

    Only ``service`` is mandatory. ``service_manager`` is a single-string
    spelling whose parse hint targets the ``service_managers`` attribute.
    """

    # The only required key.
    service: str
    type_of_service: NotRequired[str]
    service_scope: NotRequired[Literal["system", "user"]]
    enable_on_install: NotRequired[bool]
    start_on_install: NotRequired[bool]
    on_upgrade: NotRequired[ServiceUpgradeRule]
    # Single-value form; stored under `service_managers` after parsing.
    service_manager: NotRequired[
        Annotated[str, DebputyParseHint.target_attribute("service_managers")]
    ]
    service_managers: NotRequired[List[str]]
class ServiceRuleParsedFormat(DebputyParsedContent):
    """Shape of one ``services`` entry after parsing.

    Mirrors ``ServiceRuleSourceFormat`` without the ``service_manager``
    shorthand; only ``service`` is mandatory.
    """

    # The only required key.
    service: str
    type_of_service: NotRequired[str]
    service_scope: NotRequired[Literal["system", "user"]]
    enable_on_install: NotRequired[bool]
    start_on_install: NotRequired[bool]
    on_upgrade: NotRequired[ServiceUpgradeRule]
    service_managers: NotRequired[List[str]]
@dataclasses.dataclass(slots=True, frozen=True)
class ServiceRule:
    """One ``services`` manifest entry in immutable form.

    ``None`` in ``enable_on_install``, ``start_on_install`` or ``on_upgrade``
    means "not set in the manifest"; ``apply_to_service_definition`` leaves the
    corresponding field of the service definition untouched in that case.
    ``service_managers`` being ``None`` means the rule applies to every
    service manager.
    """

    definition_source: str
    service: str
    type_of_service: str
    service_scope: Literal["system", "user"]
    enable_on_install: Optional[bool]
    start_on_install: Optional[bool]
    on_upgrade: Optional[ServiceUpgradeRule]
    service_managers: Optional[FrozenSet[str]]

    @classmethod
    def from_service_rule_parsed_format(
        cls,
        data: ServiceRuleParsedFormat,
        attribute_path: AttributePath,
    ) -> "Self":
        """Build a rule from a parsed ``services`` entry.

        ``type_of_service`` defaults to ``"service"`` and ``service_scope`` to
        ``"system"`` when absent from ``data``.

        :param data: The parsed manifest entry.
        :param attribute_path: Location of the entry; its ``path`` becomes the
          rule's ``definition_source``.
        :return: The new rule.
        """
        service_managers = data.get("service_managers")
        return cls(
            attribute_path.path,
            data["service"],
            data.get("type_of_service", "service"),
            cast("Literal['system', 'user']", data.get("service_scope", "system")),
            data.get("enable_on_install"),
            data.get("start_on_install"),
            data.get("on_upgrade"),
            # Always store a frozenset (never the raw list), so the value agrees
            # with the declared field type even when the list is empty.
            frozenset(service_managers) if service_managers is not None else None,
        )

    def applies_to_service_manager(self, service_manager: str) -> bool:
        """Return True if this rule is relevant for ``service_manager``.

        A rule without an explicit ``service_managers`` set applies to all.
        """
        return self.service_managers is None or service_manager in self.service_managers

    def apply_to_service_definition(
        self,
        service_definition: ServiceDefinition[DSD],
    ) -> ServiceDefinition[DSD]:
        """Return a copy of ``service_definition`` with this rule applied.

        Reports a conflict via ``_error`` when the definition is no longer the
        plugin provided one (i.e., another manifest rule already claimed it).

        :param service_definition: The definition to update; must be a
          ``ServiceDefinitionImpl``.
        :return: The result of ``service_definition.replace(...)`` with the
          fields this rule sets.
        """
        assert isinstance(service_definition, ServiceDefinitionImpl)
        if not service_definition.is_plugin_provided_definition:
            _error(
                f"Conflicting definitions related to {self.service} (type: {self.type_of_service},"
                f" scope: {self.service_scope}). First definition at {service_definition.definition_source},"
                f" the second at {self.definition_source}). If they are for different service managers,"
                " you can often avoid this problem by explicitly defining which service managers are applicable"
                ' to each rule via the "service-managers" keyword.'
            )
        changes = {
            "definition_source": self.definition_source,
            "is_plugin_provided_definition": False,
        }
        # Promote the alias used in the manifest to be the primary name.
        if (
            self.service != service_definition.name
            and self.service in service_definition.names
        ):
            changes["name"] = self.service
        if self.enable_on_install is not None:
            # Previously written to "auto_start_on_install", which dropped the
            # enable setting and let `start_on_install` overwrite it.
            # NOTE(review): assumes the definition's field is named
            # `auto_enable_on_install` - confirm against ServiceDefinitionImpl.
            changes["auto_enable_on_install"] = self.enable_on_install
        if self.start_on_install is not None:
            changes["auto_start_on_install"] = self.start_on_install
        if self.on_upgrade is not None:
            changes["on_upgrade"] = self.on_upgrade

        return service_definition.replace(**changes)
class BinaryVersionParsedFormat(DebputyParsedContent):
    """Parsed content of the ``binary-version`` rule: a single string."""

    binary_version: str
class ListParsedFormat(DebputyParsedContent):
    """Parsed content wrapping an untyped list under ``elements``."""

    elements: List[Any]
class ListOfTransformationRulesFormat(DebputyParsedContent):
    """Parsed content wrapping a list of ``TransformationRule`` under ``elements``."""

    elements: List[TransformationRule]
class ListOfDpkgMaintscriptHelperCommandFormat(DebputyParsedContent):
    """Parsed content wrapping a list of ``DpkgMaintscriptHelperCommand`` under ``elements``."""

    elements: List[DpkgMaintscriptHelperCommand]
class InstallationSearchDirsParsedFormat(DebputyParsedContent):
    """Parsed content of the ``installation-search-dirs`` rule."""

    installation_search_dirs: List[FileSystemExactMatchRule]
def _parse_binary_version(
    _name: str,
    parsed_data: BinaryVersionParsedFormat,
    _attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> str:
    """Return the ``binary_version`` string from the parsed rule.

    The name, attribute path and parser context are accepted to satisfy the
    handler signature but are not used.
    """
    version = parsed_data["binary_version"]
    return version
def _parse_installation_search_dirs(
    _name: str,
    parsed_data: InstallationSearchDirsParsedFormat,
    _attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> List[FileSystemExactMatchRule]:
    """Return the ``installation_search_dirs`` list from the parsed rule.

    The name, attribute path and parser context are accepted to satisfy the
    handler signature but are not used.
    """
    search_dirs = parsed_data["installation_search_dirs"]
    return search_dirs
def _process_service_rules(
    _name: str,
    parsed_data: List[ServiceRuleParsedFormat],
    attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> List[ServiceRule]:
    """Convert every parsed ``services`` entry into a ``ServiceRule``.

    Each rule is given the attribute path of its own list index, so its
    ``definition_source`` points at the individual entry.
    """
    rules: List[ServiceRule] = []
    for index, entry in enumerate(parsed_data):
        entry_path = attribute_path[index]
        rules.append(ServiceRule.from_service_rule_parsed_format(entry, entry_path))
    return rules
def _unpack_list(
    _name: str,
    parsed_data: List[Any],
    _attribute_path: AttributePath,
    _parser_context: ParserContextData,
) -> List[Any]:
    """Hand the parsed list back unchanged (the same object, not a copy).

    Used by rules whose parsed list needs no further processing.
    """
    elements = parsed_data
    return elements
class CleanAfterRemovalRuleSourceFormat(TypedDict):
    """Mapping form of one ``clean-after-removal`` entry in the manifest.

    ``path`` is a single-string spelling whose parse hint targets the
    ``paths`` attribute. All keys are optional at this level.
    """

    # Single-value form; stored under `paths` after parsing.
    path: NotRequired[Annotated[str, DebputyParseHint.target_attribute("paths")]]
    paths: NotRequired[List[str]]
    delete_on: NotRequired[Literal["purge", "removal"]]
    recursive: NotRequired[bool]
    ignore_non_empty_dir: NotRequired[bool]
class CleanAfterRemovalRule(DebputyParsedContent):
    """One ``clean-after-removal`` entry after parsing.

    ``paths`` is always present; the remaining keys are optional.
    """

    paths: List[str]
    delete_on: NotRequired[Literal["purge", "removal"]]
    recursive: NotRequired[bool]
    ignore_non_empty_dir: NotRequired[bool]
# FIXME: It is not optimal that we initialize a ParserGenerator here. However,
#  the rule does not depend on any complex types that are registered by
#  plugins, so it will work for now.
#
# Parser for a single `clean-after-removal` element. An element may be given as
# a mapping, a single string or a list of strings.
_CLEAN_AFTER_REMOVAL_RULE_PARSER = ParserGenerator().generate_parser(
    CleanAfterRemovalRule,
    source_content=Union[CleanAfterRemovalRuleSourceFormat, str, List[str]],
    inline_reference_documentation=reference_documentation(
        reference_documentation_url=manifest_format_doc(
            "remove-runtime-created-paths-on-purge-or-post-removal-clean-after-removal"
        ),
    ),
)
# The order between clean_on_removal and conffile_management is
# important. We want the dpkg conffile management rules to happen before the
# clean_on_removal rules. Since the latter only affect `postrm`
# and the order is reversed for `postrm` scripts (among others), we need to do
# clean_on_removal first to account for the reversing of order.
#
# FIXME: All of this is currently not really possible to do, but it should be.
# (I think it is the correct order by "mistake" rather than by "design", which is
# what this note is about)
def _parse_clean_after_removal(
    _name: str,
    parsed_data: ListParsedFormat,
    attribute_path: AttributePath,
    parser_context: ParserContextData,
) -> None:  # TODO: Return and pass to a maintscript helper
    """Turn ``clean-after-removal`` rules into ``postrm`` maintscript snippets.

    Each element of ``parsed_data["elements"]`` is parsed with
    ``_CLEAN_AFTER_REMOVAL_RULE_PARSER``, validated, and rendered as one shell
    ``if <condition>; then ... fi`` block that is appended to the ``postrm``
    snippets of the current binary package.

    :param _name: Unused.
    :param parsed_data: The raw list of rules, under the ``elements`` key.
    :param attribute_path: Location of the list; indexed per element to give
      each rule its own definition source.
    :param parser_context: Supplies the current binary package state and the
      substitution used when building match rules.
    :raises ManifestParseException: If ``recursive`` and
      ``ignore-non-empty-dir`` are combined, if a pattern does not end with
      ``/`` while ``ignore-non-empty-dir`` is set, if a pattern is not anchored
      in an acceptable directory, or if it cannot be shell escaped.
    """
    raw_clean_after_removal = parsed_data["elements"]
    package_state = parser_context.current_binary_package_state

    for no, raw_transformation in enumerate(raw_clean_after_removal):
        definition_source = attribute_path[no]
        clean_after_removal_rules = _CLEAN_AFTER_REMOVAL_RULE_PARSER.parse_input(
            raw_transformation,
            definition_source,
            parser_context=parser_context,
        )
        patterns = clean_after_removal_rules["paths"]
        if patterns:
            definition_source.path_hint = patterns[0]
        delete_on = clean_after_removal_rules.get("delete_on") or "purge"
        recurse = clean_after_removal_rules.get("recursive") or False
        ignore_non_empty_dir = (
            clean_after_removal_rules.get("ignore_non_empty_dir") or False
        )
        # Anything other than "purge" is rendered as the `remove` action.
        if delete_on == "purge":
            condition = '[ "$1" = "purge" ]'
        else:
            condition = '[ "$1" = "remove" ]'

        if ignore_non_empty_dir:
            if recurse:
                raise ManifestParseException(
                    'The "recursive" and "ignore-non-empty-dir" options are mutually exclusive.'
                    f" Both were enabled at the same time in at {definition_source.path}"
                )
            for pattern in patterns:
                if not pattern.endswith("/"):
                    raise ManifestParseException(
                        'When ignore-non-empty-dir is True, then all patterns must end with a literal "/"'
                        f' to ensure they only apply to directories. The pattern "{pattern}" at'
                        f" {definition_source.path} did not."
                    )

        substitution = parser_context.substitution
        # All match rules are built before any of them is validated.
        match_rules = [
            MatchRule.from_path_or_glob(
                p, definition_source.path, substitution=substitution
            )
            for p in patterns
        ]
        content_lines = [
            f"if {condition}; then\n",
        ]
        for original_pattern, match_rule in zip(patterns, match_rules):
            _ensure_clean_pattern_is_safe(
                original_pattern, match_rule, definition_source
            )

            try:
                shell_escaped_pattern = match_rule.shell_escape_pattern()
            except TypeError as exc:
                raise ManifestParseException(
                    f'Sorry, the pattern "{original_pattern}" defined at {definition_source.path}'
                    f" is unfortunately not supported by `debputy` for clean-after-removal rules."
                    f" If you can rewrite the rule to something like `/var/log/foo/*.log` or"
                    f' similar "trivial" patterns. You may have to rewrite the pattern the rule '
                    f" into multiple patterns to achieve this. This restriction is to enable "
                    f' `debputy` to ensure the pattern is correctly executed plus catch "obvious'
                    f' system trashing" patterns. Apologies for the inconvenience.'
                ) from exc

            # `{{`/`}}` emit literal braces so the shell sees "${DPKG_ROOT}";
            # single braces would make Python interpolate a `DPKG_ROOT` name.
            if ignore_non_empty_dir:
                cmd = f' rmdir --ignore-fail-on-non-empty "${{DPKG_ROOT}}"{shell_escaped_pattern}\n'
            elif recurse:
                cmd = f' rm -fr "${{DPKG_ROOT}}"{shell_escaped_pattern}\n'
            elif original_pattern.endswith("/"):
                cmd = f' rmdir "${{DPKG_ROOT}}"{shell_escaped_pattern}\n'
            else:
                cmd = f' rm -f "${{DPKG_ROOT}}"{shell_escaped_pattern}\n'
            content_lines.append(cmd)
        content_lines.append("fi\n")

        snippet = MaintscriptSnippet(definition_source.path, "".join(content_lines))
        package_state.maintscript_snippets["postrm"].append(snippet)


def _ensure_clean_pattern_is_safe(
    original_pattern: str,
    match_rule: MatchRule,
    definition_source: AttributePath,
) -> None:
    """Reject a clean-after-removal pattern that is not safely anchored.

    :param original_pattern: The pattern as written; used in error messages.
    :param match_rule: The match rule built from ``original_pattern``.
    :param definition_source: Where the rule was defined; its ``path`` is only
      read when an error message is produced.
    :raises ManifestParseException: If the rule matches anything, is not
      anchored in a specific directory, or is not located in one of the known
      acceptable directories.
    """
    if match_rule is MATCH_ANYTHING:
        raise ManifestParseException(
            f'Using "{original_pattern}" in a clean rule would trash the system.'
            f" Please restrict this pattern at {definition_source.path} considerably."
        )
    is_subdir_match = False
    matched_directory: Optional[str]
    if isinstance(match_rule, ExactFileSystemPath):
        # An exact path counts as living inside its parent directory.
        matched_directory = (
            os.path.dirname(match_rule.path)
            if match_rule.path not in ("/", ".", "./")
            else match_rule.path
        )
        is_subdir_match = True
    else:
        matched_directory = getattr(match_rule, "directory", None)

    if matched_directory is None:
        raise ManifestParseException(
            f'The pattern "{original_pattern}" defined at {definition_source.path} is not'
            f" trivially anchored in a specific directory. Cowardly refusing to use it"
            f" in a clean rule as it may trash the system if the pattern is overreaching."
            f" Please avoid glob characters in the top level directories."
        )
    assert matched_directory.startswith("./") or matched_directory in (
        ".",
        "./",
        "",
    )
    acceptable_directory = False
    would_have_allowed_direct_match = False
    # Walk up towards the root until a known directory is reached.
    while matched_directory not in (".", "./", ""):
        # Our acceptable paths set includes "/var/lib" or "/etc".  We require that the
        # pattern is either an exact match, in which case it may match directly inside
        # the acceptable directory OR it is a pattern against a subdirectory of the
        # acceptable path. As an example:
        #
        # /etc/inputrc <-- OK, exact match
        # /etc/foo/*   <-- OK, subdir match
        # /etc/*       <-- ERROR, glob directly in the accepted directory.
        if is_subdir_match and (
            matched_directory
            in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF
        ):
            acceptable_directory = True
            break
        if (
            matched_directory
            in ACCEPTABLE_CLEAN_ON_REMOVAL_FOR_GLOBS_AND_EXACT_MATCHES
        ):
            # Special-case: In some directories (such as /var/log), we allow globs directly.
            # Notably, X11's log files are /var/log/Xorg.*.log
            acceptable_directory = True
            break
        if (
            matched_directory
            in ACCEPTABLE_CLEAN_ON_REMOVAL_IF_EXACT_MATCH_OR_SUBDIR_OF
        ):
            would_have_allowed_direct_match = True
            break
        matched_directory = os.path.dirname(matched_directory)
        # Every parent reached from here on contains the pattern in a subdirectory.
        is_subdir_match = True

    if acceptable_directory:
        return
    if would_have_allowed_direct_match:
        raise ManifestParseException(
            f'The pattern "{original_pattern}" defined at {definition_source.path} seems to'
            " be overreaching. If it has been a path (and not use a glob), the rule would"
            " have been permitted."
        )
    raise ManifestParseException(
        f'The pattern or path "{original_pattern}" defined at {definition_source.path} seems to'
        f' be overreaching or not limited to the set of "known acceptable" directories.'
    )