Coverage for src/debputy/highlevel_manifest_parser.py: 72%
307 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import collections
2import contextlib
3from typing import (
4 Optional,
5 Dict,
6 Callable,
7 List,
8 Any,
9 Union,
10 Mapping,
11 IO,
12 Iterator,
13 cast,
14 Tuple,
15)
17from debian.debian_support import DpkgArchTable
19from debputy.highlevel_manifest import (
20 HighLevelManifest,
21 PackageTransformationDefinition,
22 MutableYAMLManifest,
23)
24from debputy.maintscript_snippet import (
25 MaintscriptSnippet,
26 STD_CONTROL_SCRIPTS,
27 MaintscriptSnippetContainer,
28)
29from debputy.packages import BinaryPackage, SourcePackage
30from debputy.path_matcher import (
31 MatchRuleType,
32 ExactFileSystemPath,
33 MatchRule,
34)
35from debputy.substitution import Substitution
36from debputy.util import (
37 _normalize_path,
38 escape_shell,
39 assume_not_none,
40)
41from debputy.util import _warn, _info
42from ._deb_options_profiles import DebBuildOptionsAndProfiles
43from .architecture_support import DpkgArchitectureBuildProcessValuesTable
44from .filesystem_scan import FSROOverlay
45from .installations import InstallRule, PPFInstallRule
46from .manifest_parser.base_types import BuildEnvironments, BuildEnvironmentDefinition
47from .manifest_parser.exceptions import ManifestParseException
48from .manifest_parser.parser_data import ParserContextData
49from .manifest_parser.util import AttributePath
50from .packager_provided_files import detect_all_packager_provided_files
51from .plugin.api import VirtualPath
52from .plugin.api.feature_set import PluginProvidedFeatureSet
53from .plugin.api.impl_types import (
54 TP,
55 TTP,
56 DispatchingTableParser,
57 PackageContextData,
58)
59from debputy.plugin.api.parser_tables import OPARSER_MANIFEST_ROOT
60from .plugin.api.spec import DebputyIntegrationMode
61from .plugin.debputy.to_be_api_types import BuildRule
62from .yaml import YAMLError, MANIFEST_YAML
64try:
65 from Levenshtein import distance
66except ImportError:
68 def _detect_possible_typo(
69 _d,
70 _key,
71 _attribute_parent_path: AttributePath,
72 required: bool,
73 ) -> None:
74 if required:
75 _info(
76 "Install python3-levenshtein to have debputy try to detect typos in the manifest."
77 )
79else:
81 def _detect_possible_typo(
82 d,
83 key,
84 _attribute_parent_path: AttributePath,
85 _required: bool,
86 ) -> None:
87 k_len = len(key)
88 for actual_key in d:
89 if abs(k_len - len(actual_key)) > 2:
90 continue
91 d = distance(key, actual_key)
92 if d > 2:
93 continue
94 path = _attribute_parent_path.path
95 ref = f'at "{path}"' if path else "at the manifest root level"
96 _warn(
97 f'Possible typo: The key "{actual_key}" should probably have been "{key}" {ref}'
98 )
101def _per_package_subst_variables(
102 p: BinaryPackage,
103 *,
104 name: Optional[str] = None,
105) -> Dict[str, str]:
106 return {
107 "PACKAGE": name if name is not None else p.name,
108 }
111class HighLevelManifestParser(ParserContextData):
112 def __init__(
113 self,
114 manifest_path: str,
115 source_package: SourcePackage,
116 binary_packages: Mapping[str, BinaryPackage],
117 substitution: Substitution,
118 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
119 dpkg_arch_query_table: DpkgArchTable,
120 build_env: DebBuildOptionsAndProfiles,
121 plugin_provided_feature_set: PluginProvidedFeatureSet,
122 debputy_integration_mode: DebputyIntegrationMode,
123 *,
124 # Available for testing purposes only
125 debian_dir: Union[str, VirtualPath] = "./debian",
126 ):
127 self.manifest_path = manifest_path
128 self._source_package = source_package
129 self._binary_packages = binary_packages
130 self._mutable_yaml_manifest: Optional[MutableYAMLManifest] = None
131 # In source context, some variables are known to be unresolvable. Record this, so
132 # we can give better error messages.
133 self._substitution = substitution
134 self._dpkg_architecture_variables = dpkg_architecture_variables
135 self._dpkg_arch_query_table = dpkg_arch_query_table
136 self._deb_options_and_profiles = build_env
137 self._package_state_stack: List[PackageTransformationDefinition] = []
138 self._plugin_provided_feature_set = plugin_provided_feature_set
139 self._debputy_integration_mode = debputy_integration_mode
140 self._declared_variables = {}
141 self._used_named_envs = set()
142 self._build_environments: Optional[BuildEnvironments] = BuildEnvironments(
143 {},
144 None,
145 )
146 self._has_set_default_build_environment = False
147 self._read_build_environment = False
148 self._build_rules: Optional[List[BuildRule]] = None
150 if isinstance(debian_dir, str): 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true
151 debian_dir = FSROOverlay.create_root_dir("debian", debian_dir)
153 self._debian_dir = debian_dir
155 # Delayed initialized; we rely on this delay to parse the variables.
156 self._all_package_states = None
158 self._install_rules: Optional[List[InstallRule]] = None
159 self._ownership_caches_loaded = False
160 self._used = False
162 def _ensure_package_states_is_initialized(self) -> None:
163 if self._all_package_states is not None:
164 return
165 substitution = self._substitution
166 binary_packages = self._binary_packages
167 assert self._all_package_states is None
169 self._all_package_states = {
170 n: PackageTransformationDefinition(
171 binary_package=p,
172 substitution=substitution.with_extra_substitutions(
173 **_per_package_subst_variables(p)
174 ),
175 is_auto_generated_package=False,
176 maintscript_snippets=collections.defaultdict(
177 MaintscriptSnippetContainer
178 ),
179 )
180 for n, p in binary_packages.items()
181 }
182 for n, p in binary_packages.items():
183 dbgsym_name = f"{n}-dbgsym"
184 if dbgsym_name in self._all_package_states: 184 ↛ 185line 184 didn't jump to line 185 because the condition on line 184 was never true
185 continue
186 self._all_package_states[dbgsym_name] = PackageTransformationDefinition(
187 binary_package=p,
188 substitution=substitution.with_extra_substitutions(
189 **_per_package_subst_variables(p, name=dbgsym_name)
190 ),
191 is_auto_generated_package=True,
192 maintscript_snippets=collections.defaultdict(
193 MaintscriptSnippetContainer
194 ),
195 )
197 @property
198 def binary_packages(self) -> Mapping[str, BinaryPackage]:
199 return self._binary_packages
201 @property
202 def _package_states(self) -> Mapping[str, PackageTransformationDefinition]:
203 assert self._all_package_states is not None
204 return self._all_package_states
206 @property
207 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
208 return self._dpkg_architecture_variables
210 @property
211 def dpkg_arch_query_table(self) -> DpkgArchTable:
212 return self._dpkg_arch_query_table
214 @property
215 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
216 return self._deb_options_and_profiles
218 def _self_check(self) -> None:
219 unused_envs = (
220 self._build_environments.environments.keys() - self._used_named_envs
221 )
222 if unused_envs:
223 unused_env_names = ", ".join(unused_envs)
224 raise ManifestParseException(
225 f"The following named environments were never referenced: {unused_env_names}"
226 )
228 def build_manifest(self) -> HighLevelManifest:
229 self._self_check()
230 if self._used: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 raise TypeError("build_manifest can only be called once!")
232 self._used = True
233 self._ensure_package_states_is_initialized()
234 for var, attribute_path in self._declared_variables.items():
235 if not self.substitution.is_used(var):
236 raise ManifestParseException(
237 f'The variable "{var}" is unused. Either use it or remove it.'
238 f" The variable was declared at {attribute_path.path_key_lc}."
239 )
240 if isinstance(self, YAMLManifestParser) and self._mutable_yaml_manifest is None:
241 self._mutable_yaml_manifest = MutableYAMLManifest.empty_manifest()
242 all_packager_provided_files = detect_all_packager_provided_files(
243 self._plugin_provided_feature_set.packager_provided_files,
244 self._debian_dir,
245 self.binary_packages,
246 )
248 for package in self._package_states:
249 with self.binary_package_context(package) as context:
250 if not context.is_auto_generated_package:
251 ppf_result = all_packager_provided_files[package]
252 if ppf_result.auto_installable: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true
253 context.install_rules.append(
254 PPFInstallRule(
255 context.binary_package,
256 context.substitution,
257 ppf_result.auto_installable,
258 )
259 )
260 context.reserved_packager_provided_files.update(
261 ppf_result.reserved_only
262 )
263 self._transform_dpkg_maintscript_helpers_to_snippets()
264 build_environments = self.build_environments()
265 assert build_environments is not None
267 return HighLevelManifest(
268 self.manifest_path,
269 self._mutable_yaml_manifest,
270 self._install_rules,
271 self._source_package,
272 self.binary_packages,
273 self.substitution,
274 self._package_states,
275 self._dpkg_architecture_variables,
276 self._dpkg_arch_query_table,
277 self._deb_options_and_profiles,
278 build_environments,
279 self._build_rules,
280 self._plugin_provided_feature_set,
281 self._debian_dir,
282 )
284 @contextlib.contextmanager
285 def binary_package_context(
286 self, package_name: str
287 ) -> Iterator[PackageTransformationDefinition]:
288 if package_name not in self._package_states:
289 self._error(
290 f'The package "{package_name}" is not present in the debian/control file (could not find'
291 f' "Package: {package_name}" in a binary stanza) nor is it a -dbgsym package for one'
292 " for a package in debian/control."
293 )
294 package_state = self._package_states[package_name]
295 self._package_state_stack.append(package_state)
296 ps_len = len(self._package_state_stack)
297 yield package_state
298 if ps_len != len(self._package_state_stack): 298 ↛ 299line 298 didn't jump to line 299 because the condition on line 298 was never true
299 raise RuntimeError("Internal error: Unbalanced stack manipulation detected")
300 self._package_state_stack.pop()
302 def dispatch_parser_table_for(self, rule_type: TTP) -> DispatchingTableParser[TP]:
303 t = self._plugin_provided_feature_set.manifest_parser_generator.dispatch_parser_table_for(
304 rule_type
305 )
306 if t is None:
307 raise AssertionError(
308 f"Internal error: No dispatching parser for {rule_type.__name__}"
309 )
310 return t
312 @property
313 def substitution(self) -> Substitution:
314 if self._package_state_stack:
315 return self._package_state_stack[-1].substitution
316 return self._substitution
318 def add_extra_substitution_variables(
319 self,
320 **extra_substitutions: Tuple[str, AttributePath],
321 ) -> Substitution:
322 if self._package_state_stack or self._all_package_states is not None: 322 ↛ 327line 322 didn't jump to line 327 because the condition on line 322 was never true
323 # For one, it would not "bubble up" correctly when added to the lowest stack.
324 # And if it is not added to the lowest stack, then you get errors about it being
325 # unknown as soon as you leave the stack (which is weird for the user when
326 # the variable is something known, sometimes not)
327 raise RuntimeError("Cannot use add_extra_substitution from this state")
328 for key, (_, path) in extra_substitutions.items():
329 self._declared_variables[key] = path
330 self._substitution = self._substitution.with_extra_substitutions(
331 **{k: v[0] for k, v in extra_substitutions.items()}
332 )
333 return self._substitution
335 @property
336 def current_binary_package_state(self) -> PackageTransformationDefinition:
337 if not self._package_state_stack: 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true
338 raise RuntimeError("Invalid state: Not in a binary package context")
339 return self._package_state_stack[-1]
341 @property
342 def is_in_binary_package_state(self) -> bool:
343 return bool(self._package_state_stack)
345 @property
346 def debputy_integration_mode(self) -> DebputyIntegrationMode:
347 return self._debputy_integration_mode
349 @debputy_integration_mode.setter
350 def debputy_integration_mode(self, new_value: DebputyIntegrationMode) -> None:
351 self._debputy_integration_mode = new_value
353 def _register_build_environment(
354 self,
355 name: Optional[str],
356 build_environment: BuildEnvironmentDefinition,
357 attribute_path: AttributePath,
358 is_default: bool = False,
359 ) -> None:
360 assert not self._read_build_environment
362 # TODO: Reference the paths of the original environments for the error messages where that is relevant.
363 if is_default:
364 if self._has_set_default_build_environment: 364 ↛ 365line 364 didn't jump to line 365 because the condition on line 364 was never true
365 raise ManifestParseException(
366 f"There cannot be multiple default environments and"
367 f" therefore {attribute_path.path} cannot be a default environment"
368 )
369 self._has_set_default_build_environment = True
370 self._build_environments.default_environment = build_environment
371 if name is None: 371 ↛ 380line 371 didn't jump to line 380 because the condition on line 371 was always true
372 return
373 elif name is None: 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true
374 raise ManifestParseException(
375 f"Useless environment defined at {attribute_path.path}. It is neither the"
376 " default environment nor does it have a name (so no rules can reference it"
377 " explicitly)"
378 )
380 if name in self._build_environments.environments: 380 ↛ 381line 380 didn't jump to line 381 because the condition on line 380 was never true
381 raise ManifestParseException(
382 f'The environment defined at {attribute_path.path} reuse the name "{name}".'
383 " The environment name must be unique."
384 )
385 self._build_environments.environments[name] = build_environment
387 def resolve_build_environment(
388 self,
389 name: Optional[str],
390 attribute_path: AttributePath,
391 ) -> BuildEnvironmentDefinition:
392 if name is None:
393 return self.build_environments().default_environment
394 try:
395 env = self.build_environments().environments[name]
396 except KeyError:
397 raise ManifestParseException(
398 f'The environment "{name}" requested at {attribute_path.path} was not'
399 f" defined in the `build-environments`"
400 )
401 else:
402 self._used_named_envs.add(name)
403 return env
405 def build_environments(self) -> BuildEnvironments:
406 v = self._build_environments
407 if (
408 not self._read_build_environment
409 and not self._build_environments.environments
410 and self._build_environments.default_environment is None
411 ):
412 self._build_environments.default_environment = BuildEnvironmentDefinition()
413 self._read_build_environment = True
414 return v
416 def _transform_dpkg_maintscript_helpers_to_snippets(self) -> None:
417 package_state = self.current_binary_package_state
418 for dmh in package_state.dpkg_maintscript_helper_snippets: 418 ↛ 419line 418 didn't jump to line 419 because the loop on line 418 never started
419 snippet = MaintscriptSnippet(
420 definition_source=dmh.definition_source,
421 snippet=f'dpkg-maintscript-helper {escape_shell(*dmh.cmdline)} -- "$@"\n',
422 )
423 for script in STD_CONTROL_SCRIPTS:
424 package_state.maintscript_snippets[script].append(snippet)
426 def normalize_path(
427 self,
428 path: str,
429 definition_source: AttributePath,
430 *,
431 allow_root_dir_match: bool = False,
432 ) -> ExactFileSystemPath:
433 try:
434 normalized = _normalize_path(path)
435 except ValueError:
436 self._error(
437 f'The path "{path}" provided in {definition_source.path} should be relative to the root of the'
438 ' package and not use any ".." or "." segments.'
439 )
440 if normalized == "." and not allow_root_dir_match:
441 self._error(
442 "Manifests must not change the root directory of the deb file. Please correct"
443 f' "{definition_source.path}" (path: "{path}) in {self.manifest_path}'
444 )
445 return ExactFileSystemPath(
446 self.substitution.substitute(normalized, definition_source.path)
447 )
449 def parse_path_or_glob(
450 self,
451 path_or_glob: str,
452 definition_source: AttributePath,
453 ) -> MatchRule:
454 match_rule = MatchRule.from_path_or_glob(
455 path_or_glob, definition_source.path, substitution=self.substitution
456 )
457 # NB: "." and "/" will be translated to MATCH_ANYTHING by MatchRule.from_path_or_glob,
458 # so there is no need to check for an exact match on "." like in normalize_path.
459 if match_rule.rule_type == MatchRuleType.MATCH_ANYTHING:
460 self._error(
461 f'The chosen match rule "{path_or_glob}" matches everything (including the deb root directory).'
462 f' Please correct "{definition_source.path}" (path: "{path_or_glob}) in {self.manifest_path} to'
463 f' something that matches "less" than everything.'
464 )
465 return match_rule
467 def parse_manifest(self) -> HighLevelManifest:
468 raise NotImplementedError
471class YAMLManifestParser(HighLevelManifestParser):
472 def _optional_key(
473 self,
474 d: Mapping[str, Any],
475 key: str,
476 attribute_parent_path: AttributePath,
477 expected_type=None,
478 default_value=None,
479 ):
480 v = d.get(key)
481 if v is None:
482 _detect_possible_typo(d, key, attribute_parent_path, False)
483 return default_value
484 if expected_type is not None:
485 return self._ensure_value_is_type(
486 v, expected_type, key, attribute_parent_path
487 )
488 return v
490 def _required_key(
491 self,
492 d: Mapping[str, Any],
493 key: str,
494 attribute_parent_path: AttributePath,
495 expected_type=None,
496 extra: Optional[Union[str, Callable[[], str]]] = None,
497 ):
498 v = d.get(key)
499 if v is None:
500 _detect_possible_typo(d, key, attribute_parent_path, True)
501 if extra is not None:
502 msg = extra if isinstance(extra, str) else extra()
503 extra_info = " " + msg
504 else:
505 extra_info = ""
506 self._error(
507 f'Missing required key {key} at {attribute_parent_path.path} in manifest "{self.manifest_path}.'
508 f"{extra_info}"
509 )
511 if expected_type is not None:
512 return self._ensure_value_is_type(
513 v, expected_type, key, attribute_parent_path
514 )
515 return v
517 def _ensure_value_is_type(
518 self,
519 v,
520 t,
521 key: Union[str, int, AttributePath],
522 attribute_parent_path: Optional[AttributePath],
523 ):
524 if v is None:
525 return None
526 if not isinstance(v, t):
527 if isinstance(t, tuple):
528 t_msg = "one of: " + ", ".join(x.__name__ for x in t)
529 else:
530 t_msg = f"a {t.__name__}"
531 key_path = (
532 key.path
533 if isinstance(key, AttributePath)
534 else assume_not_none(attribute_parent_path)[key].path
535 )
536 self._error(
537 f'The key {key_path} must be {t_msg} in manifest "{self.manifest_path}"'
538 )
539 return v
541 def from_yaml_dict(self, yaml_data: object) -> "HighLevelManifest":
542 attribute_path = AttributePath.root_path(yaml_data)
543 parser_generator = self._plugin_provided_feature_set.manifest_parser_generator
544 dispatchable_object_parsers = parser_generator.dispatchable_object_parsers
545 manifest_root_parser = dispatchable_object_parsers[OPARSER_MANIFEST_ROOT]
546 parsed_data = manifest_root_parser.parse_input(
547 yaml_data,
548 attribute_path,
549 parser_context=self,
550 )
552 packages_dict: Mapping[str, PackageContextData[Mapping[str, Any]]] = cast(
553 "Mapping[str, PackageContextData[Mapping[str, Any]]]",
554 parsed_data.get("packages", {}),
555 )
556 install_rules = parsed_data.get("installations")
557 if install_rules:
558 self._install_rules = install_rules
559 packages_parent_path = attribute_path["packages"]
560 for package_name_raw, pcd in packages_dict.items():
561 definition_source = packages_parent_path[package_name_raw]
562 package_name = pcd.resolved_package_name
563 parsed = pcd.value
565 package_state: PackageTransformationDefinition
566 with self.binary_package_context(package_name) as package_state:
567 if package_state.is_auto_generated_package: 567 ↛ 569line 567 didn't jump to line 569 because the condition on line 567 was never true
568 # Maybe lift (part) of this restriction.
569 self._error(
570 f'Cannot define rules for package "{package_name}" (at {definition_source.path}). It is an'
571 " auto-generated package."
572 )
573 binary_version = parsed.get("binary-version")
574 if binary_version is not None:
575 package_state.binary_version = (
576 package_state.substitution.substitute(
577 binary_version,
578 definition_source["binary-version"].path,
579 )
580 )
581 search_dirs = parsed.get("installation_search_dirs")
582 if search_dirs is not None: 582 ↛ 583line 582 didn't jump to line 583 because the condition on line 582 was never true
583 package_state.search_dirs = search_dirs
584 transformations = parsed.get("transformations")
585 conffile_management = parsed.get("conffile_management")
586 service_rules = parsed.get("services")
587 if transformations:
588 package_state.transformations.extend(transformations)
589 if conffile_management: 589 ↛ 590line 589 didn't jump to line 590 because the condition on line 589 was never true
590 package_state.dpkg_maintscript_helper_snippets.extend(
591 conffile_management
592 )
593 if service_rules: 593 ↛ 594line 593 didn't jump to line 594 because the condition on line 593 was never true
594 package_state.requested_service_rules.extend(service_rules)
595 self._build_rules = parsed_data.get("builds")
597 return self.build_manifest()
599 def _parse_manifest(self, fd: Union[IO[bytes], str]) -> HighLevelManifest:
600 try:
601 data = MANIFEST_YAML.load(fd)
602 except YAMLError as e:
603 msg = str(e)
604 lines = msg.splitlines(keepends=True)
605 i = -1
606 for i, line in enumerate(lines):
607 # Avoid an irrelevant "how do configure the YAML parser" message, which the
608 # user cannot use.
609 if line.startswith("To suppress this check"):
610 break
611 if i > -1 and len(lines) > i + 1:
612 lines = lines[:i]
613 msg = "".join(lines)
614 msg = msg.rstrip()
615 msg += (
616 f"\n\nYou can use `yamllint -d relaxed {escape_shell(self.manifest_path)}` to validate"
617 " the YAML syntax. The yamllint tool also supports style rules for YAML documents"
618 " (such as indentation rules) in case that is of interest."
619 )
620 raise ManifestParseException(
621 f"Could not parse {self.manifest_path} as a YAML document: {msg}"
622 ) from e
623 self._mutable_yaml_manifest = MutableYAMLManifest(data)
624 return self.from_yaml_dict(data)
626 def parse_manifest(
627 self,
628 *,
629 fd: Optional[Union[IO[bytes], str]] = None,
630 ) -> HighLevelManifest:
631 if fd is None: 631 ↛ 632line 631 didn't jump to line 632 because the condition on line 631 was never true
632 with open(self.manifest_path, "rb") as fd:
633 return self._parse_manifest(fd)
634 else:
635 return self._parse_manifest(fd)