Coverage for src/debputy/highlevel_manifest.py: 64%
878 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-09-07 09:27 +0000
1import dataclasses
2import functools
3import os
4import textwrap
5from contextlib import suppress
6from dataclasses import dataclass, field
7from typing import (
8 List,
9 Dict,
10 Iterable,
11 Mapping,
12 Any,
13 Union,
14 Optional,
15 TypeVar,
16 Generic,
17 cast,
18 Set,
19 Tuple,
20 Sequence,
21 FrozenSet,
22 Callable,
23)
25from debian.debian_support import DpkgArchTable
27from debputy.dh.debhelper_emulation import (
28 dhe_dbgsym_root_dir,
29 assert_no_dbgsym_migration,
30 read_dbgsym_file,
31)
32from ._deb_options_profiles import DebBuildOptionsAndProfiles
33from ._manifest_constants import *
34from .architecture_support import DpkgArchitectureBuildProcessValuesTable
35from .builtin_manifest_rules import builtin_mode_normalization_rules
36from .exceptions import (
37 DebputySubstitutionError,
38 DebputyRuntimeErrorWithPreamble,
39)
40from .filesystem_scan import FSPath, FSRootDir, FSROOverlay, FSControlRootDir
41from .installations import (
42 InstallRule,
43 SourcePathMatcher,
44 PathAlreadyInstalledOrDiscardedError,
45 NoMatchForInstallPatternError,
46 InstallRuleContext,
47 BinaryPackageInstallRuleContext,
48 InstallSearchDirContext,
49 SearchDir,
50)
51from .intermediate_manifest import TarMember, PathType, IntermediateManifest
52from .maintscript_snippet import (
53 DpkgMaintscriptHelperCommand,
54 MaintscriptSnippetContainer,
55)
56from .manifest_conditions import ConditionContext
57from .manifest_parser.base_types import (
58 FileSystemMatchRule,
59 FileSystemExactMatchRule,
60 BuildEnvironments,
61)
62from .manifest_parser.util import AttributePath
63from .packager_provided_files import PackagerProvidedFile
64from .packages import BinaryPackage, SourcePackage
65from .plugin.api.feature_set import PluginProvidedFeatureSet
66from .plugin.api.impl import BinaryCtrlAccessorProviderCreator
67from .plugin.api.impl_types import (
68 PackageProcessingContextProvider,
69 PackageDataTable,
70)
71from .plugin.api.spec import (
72 FlushableSubstvars,
73 VirtualPath,
74 DebputyIntegrationMode,
75 INTEGRATION_MODE_DH_DEBPUTY_RRR,
76 INTEGRATION_MODE_FULL,
77)
78from debputy.plugins.debputy.binary_package_rules import ServiceRule
79from debputy.plugins.debputy.build_system_rules import BuildRule
80from .plugin.plugin_state import run_in_context_of_plugin
81from .substitution import Substitution
82from .transformation_rules import (
83 TransformationRule,
84 ModeNormalizationTransformationRule,
85 NormalizeShebangLineTransformation,
86)
87from .util import (
88 _error,
89 _warn,
90 debian_policy_normalize_symlink_target,
91 generated_content_dir,
92 _info,
93)
94from .yaml import MANIFEST_YAML
95from .yaml.compat import CommentedMap, CommentedSeq
98class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
100 @property
101 def unmatched_paths(self) -> Sequence[VirtualPath]:
102 return self.args[1]
104 @property
105 def search_dir(self) -> VirtualPath:
106 return self.args[2]
108 def render_preamble(self) -> None:
109 _warn(
110 f"The following paths were present in {self.search_dir.fs_path}, but not installed (nor explicitly discarded)."
111 )
112 _warn("")
113 for entry in self.unmatched_paths:
114 desc = _describe_missing_path(entry)
115 _warn(f" * {desc}")
116 _warn("")
119@dataclass(slots=True)
120class DbgsymInfo:
121 binary_package: BinaryPackage
122 dbgsym_fs_root: FSPath
123 _dbgsym_root_fs: Optional[str]
124 dbgsym_ids: List[str]
125 run_dwz: bool
127 @property
128 def dbgsym_root_dir(self) -> str:
129 root_dir = self._dbgsym_root_fs
130 if root_dir is None:
131 root_dir = generated_content_dir(
132 package=self.binary_package,
133 subdir_key="dbgsym-fs-root",
134 )
135 self._dbgsym_root_fs = root_dir
136 return root_dir
138 @property
139 def dbgsym_ctrl_dir(self) -> FSControlRootDir:
140 return FSControlRootDir.create_root_dir(
141 os.path.join(self.dbgsym_root_dir, "DEBIAN")
142 )
145@dataclass(slots=True, frozen=True)
146class BinaryPackageData:
147 source_package: SourcePackage
148 binary_package: BinaryPackage
149 binary_staging_root_dir: str
150 fs_root: FSPath
151 substvars: FlushableSubstvars
152 package_metadata_context: PackageProcessingContextProvider
153 ctrl_creator: BinaryCtrlAccessorProviderCreator
154 dbgsym_info: DbgsymInfo
156 @property
157 def control_output_dir(self) -> FSControlRootDir:
158 return FSControlRootDir.create_root_dir(
159 generated_content_dir(
160 package=self.binary_package,
161 subdir_key="DEBIAN",
162 )
163 )
166@dataclass(slots=True)
167class PackageTransformationDefinition:
168 binary_package: BinaryPackage
169 substitution: Substitution
170 is_auto_generated_package: bool
171 binary_version: Optional[str] = None
172 search_dirs: Optional[List[FileSystemExactMatchRule]] = None
173 dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field(
174 default_factory=list
175 )
176 maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field(
177 default_factory=dict
178 )
179 transformations: List[TransformationRule] = field(default_factory=list)
180 reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field(
181 default_factory=dict
182 )
183 install_rules: List[InstallRule] = field(default_factory=list)
184 requested_service_rules: List[ServiceRule] = field(default_factory=list)
187def _path_to_tar_member(
188 path: FSPath,
189 clamp_mtime_to: int,
190) -> TarMember:
191 mtime = float(clamp_mtime_to)
192 owner, uid, group, gid = path.tar_owner_info
193 mode = path.mode
195 if path.has_fs_path:
196 mtime = min(mtime, path.mtime)
198 if path.is_dir:
199 path_type = PathType.DIRECTORY
200 elif path.is_file:
201 # TODO: someday we will need to deal with hardlinks and it might appear here.
202 path_type = PathType.FILE
203 elif path.is_symlink: 203 ↛ 223line 203 didn't jump to line 223 because the condition on line 203 was always true
204 # Special-case that we resolve immediately (since we need to normalize the target anyway)
205 link_target = debian_policy_normalize_symlink_target(
206 path.path,
207 path.readlink(),
208 )
209 return TarMember.virtual_path(
210 path.tar_path,
211 PathType.SYMLINK,
212 mtime,
213 link_target=link_target,
214 # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
215 # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
216 mode=0o0777,
217 owner=owner,
218 uid=uid,
219 group=group,
220 gid=gid,
221 )
222 else:
223 assert not path.is_symlink
224 raise AssertionError(
225 f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
226 )
228 if not path.has_fs_path:
229 assert not path.is_file
230 return TarMember.virtual_path(
231 path.tar_path,
232 path_type,
233 mtime,
234 mode=mode,
235 owner=owner,
236 uid=uid,
237 group=group,
238 gid=gid,
239 )
240 may_steal_fs_path = path._can_replace_inline
241 return TarMember.from_file(
242 path.tar_path,
243 path.fs_path,
244 mode=mode,
245 uid=uid,
246 owner=owner,
247 gid=gid,
248 group=group,
249 path_type=path_type,
250 path_mtime=mtime,
251 clamp_mtime_to=clamp_mtime_to,
252 may_steal_fs_path=may_steal_fs_path,
253 )
256def _generate_intermediate_manifest(
257 fs_root: FSPath,
258 clamp_mtime_to: int,
259) -> Iterable[TarMember]:
260 symlinks = []
261 for path in fs_root.all_paths():
262 tar_member = _path_to_tar_member(path, clamp_mtime_to)
263 if tar_member.path_type == PathType.SYMLINK:
264 symlinks.append(tar_member)
265 continue
266 yield tar_member
267 yield from symlinks
270ST = TypeVar("ST")
271T = TypeVar("T")
274class AbstractYAMLSubStore(Generic[ST]):
275 def __init__(
276 self,
277 parent_store: Any,
278 parent_key: Optional[Union[int, str]],
279 store: Optional[ST] = None,
280 ) -> None:
281 if parent_store is not None and parent_key is not None:
282 try:
283 from_parent_store = parent_store[parent_key]
284 except (KeyError, IndexError):
285 from_parent_store = None
286 if ( 286 ↛ 291line 286 didn't jump to line 291 because the condition on line 286 was never true
287 store is not None
288 and from_parent_store is not None
289 and store is not parent_store
290 ):
291 raise ValueError(
292 "Store is provided but is not the one already in the parent store"
293 )
294 if store is None: 294 ↛ 296line 294 didn't jump to line 296 because the condition on line 294 was always true
295 store = from_parent_store
296 self._parent_store = parent_store
297 self._parent_key = parent_key
298 self._is_detached = (
299 parent_key is None or parent_store is None or parent_key not in parent_store
300 )
301 assert self._is_detached or store is not None
302 if store is None:
303 store = self._create_new_instance()
304 self._store: ST = store
306 def _create_new_instance(self) -> ST:
307 raise NotImplementedError
309 def create_definition_if_missing(self) -> None:
310 if self._is_detached:
311 self.create_definition()
313 def create_definition(self) -> None:
314 if not self._is_detached: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true
315 raise RuntimeError("Definition is already present")
316 parent_store = self._parent_store
317 if parent_store is None: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true
318 raise RuntimeError(
319 f"Definition is not attached to any parent!? ({self.__class__.__name__})"
320 )
321 if isinstance(parent_store, list):
322 assert self._parent_key is None
323 self._parent_key = len(parent_store)
324 self._parent_store.append(self._store)
325 else:
326 parent_store[self._parent_key] = self._store
327 self._is_detached = False
329 def remove_definition(self) -> None:
330 self._ensure_attached()
331 del self._parent_store[self._parent_key]
332 if isinstance(self._parent_store, list):
333 self._parent_key = None
334 self._is_detached = True
336 def _ensure_attached(self) -> None:
337 if self._is_detached:
338 raise RuntimeError("The definition has been removed!")
341class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]):
342 def _create_new_instance(self) -> List[T]:
343 return CommentedSeq()
346class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]):
347 def _create_new_instance(self) -> Dict[str, T]:
348 return CommentedMap()
351class MutableCondition:
352 @classmethod
353 def arch_matches(cls, arch_filter: str) -> CommentedMap:
354 return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter})
356 @classmethod
357 def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
358 return CommentedMap(
359 {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
360 )
363class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
364 @classmethod
365 def new_symlink(
366 cls, link_path: str, link_target: str, condition: Optional[Any]
367 ) -> "MutableYAMLSymlink":
368 inner = {
369 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
370 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
371 }
372 content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner}
373 if condition is not None: 373 ↛ 374line 373 didn't jump to line 374 because the condition on line 373 was never true
374 inner["when"] = condition
375 return cls(None, None, store=CommentedMap(content))
377 @property
378 def symlink_path(self) -> str:
379 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
380 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
381 ]
383 @symlink_path.setter
384 def symlink_path(self, path: str) -> None:
385 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
386 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
387 ] = path
389 @property
390 def symlink_target(self) -> Optional[str]:
391 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
392 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
393 ]
395 @symlink_target.setter
396 def symlink_target(self, target: str) -> None:
397 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
398 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
399 ] = target
402class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
403 @classmethod
404 def rm_conffile(
405 cls,
406 conffile: str,
407 prior_to_version: Optional[str],
408 owning_package: Optional[str],
409 ) -> "MutableYAMLConffileManagementItem":
410 r = cls(
411 None,
412 None,
413 store=CommentedMap(
414 {
415 MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
416 {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
417 )
418 }
419 ),
420 )
421 r.prior_to_version = prior_to_version
422 r.owning_package = owning_package
423 return r
425 @classmethod
426 def mv_conffile(
427 cls,
428 old_conffile: str,
429 new_conffile: str,
430 prior_to_version: Optional[str],
431 owning_package: Optional[str],
432 ) -> "MutableYAMLConffileManagementItem":
433 r = cls(
434 None,
435 None,
436 store=CommentedMap(
437 {
438 MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
439 {
440 MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
441 MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
442 }
443 )
444 }
445 ),
446 )
447 r.prior_to_version = prior_to_version
448 r.owning_package = owning_package
449 return r
451 @property
452 def _container(self) -> Dict[str, Any]:
453 assert len(self._store) == 1
454 return next(iter(self._store.values()))
456 @property
457 def command(self) -> str:
458 assert len(self._store) == 1
459 return next(iter(self._store))
461 @property
462 def obsolete_conffile(self) -> str:
463 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
464 return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
465 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
466 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]
468 @obsolete_conffile.setter
469 def obsolete_conffile(self, value: str) -> None:
470 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
471 self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
472 else:
473 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
474 self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value
476 @property
477 def new_conffile(self) -> str:
478 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
479 raise TypeError(
480 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
481 f" This is a {self.command}"
482 )
483 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]
485 @new_conffile.setter
486 def new_conffile(self, value: str) -> None:
487 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
488 raise TypeError(
489 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
490 f" This is a {self.command}"
491 )
492 self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value
494 @property
495 def prior_to_version(self) -> Optional[str]:
496 return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)
498 @prior_to_version.setter
499 def prior_to_version(self, value: Optional[str]) -> None:
500 if value is None:
501 try:
502 del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
503 except KeyError:
504 pass
505 else:
506 self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value
508 @property
509 def owning_package(self) -> Optional[str]:
510 return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
512 @owning_package.setter
513 def owning_package(self, value: Optional[str]) -> None:
514 if value is None:
515 try:
516 del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE]
517 except KeyError:
518 pass
519 else:
520 self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value
523class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
524 def _list_store(
525 self, key, *, create_if_absent: bool = False
526 ) -> Optional[List[Dict[str, Any]]]:
527 if self._is_detached or key not in self._store:
528 if create_if_absent: 528 ↛ 529line 528 didn't jump to line 529 because the condition on line 528 was never true
529 return None
530 self.create_definition_if_missing()
531 self._store[key] = []
532 return self._store[key]
534 def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
535 parent_store = self._list_store(key, create_if_absent=True)
536 assert parent_store is not None
537 if not item._is_detached or ( 537 ↛ 540line 537 didn't jump to line 540 because the condition on line 537 was never true
538 item._parent_store is not None and item._parent_store is not parent_store
539 ):
540 raise RuntimeError(
541 "Item is already attached or associated with a different container"
542 )
543 item._parent_store = parent_store
544 item.create_definition()
546 def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
547 self._insert_item(MK_TRANSFORMATIONS, symlink)
549 def symlinks(self) -> Iterable[MutableYAMLSymlink]:
550 store = self._list_store(MK_TRANSFORMATIONS)
551 if store is None: 551 ↛ 552line 551 didn't jump to line 552 because the condition on line 551 was never true
552 return
553 for i in range(len(store)): 553 ↛ 554line 553 didn't jump to line 554 because the loop on line 553 never started
554 d = store[i]
555 if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d:
556 yield MutableYAMLSymlink(store, i)
558 def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
559 store = self._list_store(MK_CONFFILE_MANAGEMENT)
560 if store is None: 560 ↛ 561line 560 didn't jump to line 561 because the condition on line 560 was never true
561 return
562 yield from (
563 MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
564 )
566 def add_conffile_management(
567 self, conffile_management_item: MutableYAMLConffileManagementItem
568 ) -> None:
569 self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)
572class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
573 @property
574 def _container(self) -> Dict[str, Any]:
575 assert len(self._store) == 1
576 return next(iter(self._store.values()))
578 @property
579 def into(self) -> Optional[List[str]]:
580 v = self._container[MK_INSTALLATIONS_INSTALL_INTO]
581 if v is None:
582 return None
583 if isinstance(v, str):
584 return [v]
585 return v
587 @into.setter
588 def into(self, new_value: Optional[Union[str, List[str]]]) -> None:
589 if new_value is None: 589 ↛ 593line 589 didn't jump to line 593 because the condition on line 589 was always true
590 with suppress(KeyError):
591 del self._container[MK_INSTALLATIONS_INSTALL_INTO]
592 return
593 if isinstance(new_value, str):
594 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
595 return
596 new_list = CommentedSeq(new_value)
597 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list
599 @property
600 def when(self) -> Optional[Union[str, Mapping[str, Any]]]:
601 return self._container[MK_CONDITION_WHEN]
603 @when.setter
604 def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None:
605 if new_value is None: 605 ↛ 606line 605 didn't jump to line 606 because the condition on line 605 was never true
606 with suppress(KeyError):
607 del self._container[MK_CONDITION_WHEN]
608 return
609 if isinstance(new_value, str): 609 ↛ 610line 609 didn't jump to line 610 because the condition on line 609 was never true
610 self._container[MK_CONDITION_WHEN] = new_value
611 return
612 new_map = CommentedMap(new_value)
613 self._container[MK_CONDITION_WHEN] = new_map
615 @classmethod
616 def install_dest(
617 cls,
618 sources: Union[str, List[str]],
619 into: Optional[Union[str, List[str]]],
620 *,
621 dest_dir: Optional[str] = None,
622 when: Optional[Union[str, Mapping[str, Any]]] = None,
623 ) -> "MutableYAMLInstallRuleInstall":
624 k = MK_INSTALLATIONS_INSTALL_SOURCES
625 if isinstance(sources, str):
626 k = MK_INSTALLATIONS_INSTALL_SOURCE
627 r = MutableYAMLInstallRuleInstall(
628 None,
629 None,
630 store=CommentedMap(
631 {
632 MK_INSTALLATIONS_INSTALL: CommentedMap(
633 {
634 k: sources,
635 }
636 )
637 }
638 ),
639 )
640 r.dest_dir = dest_dir
641 r.into = into
642 if when is not None:
643 r.when = when
644 return r
646 @classmethod
647 def multi_dest_install(
648 cls,
649 sources: Union[str, List[str]],
650 dest_dirs: Sequence[str],
651 into: Optional[Union[str, List[str]]],
652 *,
653 when: Optional[Union[str, Mapping[str, Any]]] = None,
654 ) -> "MutableYAMLInstallRuleInstall":
655 k = MK_INSTALLATIONS_INSTALL_SOURCES
656 if isinstance(sources, str): 656 ↛ 658line 656 didn't jump to line 658 because the condition on line 656 was always true
657 k = MK_INSTALLATIONS_INSTALL_SOURCE
658 r = MutableYAMLInstallRuleInstall(
659 None,
660 None,
661 store=CommentedMap(
662 {
663 MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
664 {
665 k: sources,
666 "dest-dirs": dest_dirs,
667 }
668 )
669 }
670 ),
671 )
672 r.into = into
673 if when is not None: 673 ↛ 674line 673 didn't jump to line 674 because the condition on line 673 was never true
674 r.when = when
675 return r
677 @classmethod
678 def install_as(
679 cls,
680 source: str,
681 install_as: str,
682 into: Optional[Union[str, List[str]]],
683 when: Optional[Union[str, Mapping[str, Any]]] = None,
684 ) -> "MutableYAMLInstallRuleInstall":
685 r = MutableYAMLInstallRuleInstall(
686 None,
687 None,
688 store=CommentedMap(
689 {
690 MK_INSTALLATIONS_INSTALL: CommentedMap(
691 {
692 MK_INSTALLATIONS_INSTALL_SOURCE: source,
693 MK_INSTALLATIONS_INSTALL_AS: install_as,
694 }
695 )
696 }
697 ),
698 )
699 r.into = into
700 if when is not None: 700 ↛ 701line 700 didn't jump to line 701 because the condition on line 700 was never true
701 r.when = when
702 return r
704 @classmethod
705 def install_doc_as(
706 cls,
707 source: str,
708 install_as: str,
709 into: Optional[Union[str, List[str]]],
710 when: Optional[Union[str, Mapping[str, Any]]] = None,
711 ) -> "MutableYAMLInstallRuleInstall":
712 r = MutableYAMLInstallRuleInstall(
713 None,
714 None,
715 store=CommentedMap(
716 {
717 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
718 {
719 MK_INSTALLATIONS_INSTALL_SOURCE: source,
720 MK_INSTALLATIONS_INSTALL_AS: install_as,
721 }
722 )
723 }
724 ),
725 )
726 r.into = into
727 if when is not None:
728 r.when = when
729 return r
731 @classmethod
732 def install_docs(
733 cls,
734 sources: Union[str, List[str]],
735 into: Optional[Union[str, List[str]]],
736 *,
737 dest_dir: Optional[str] = None,
738 when: Optional[Union[str, Mapping[str, Any]]] = None,
739 ) -> "MutableYAMLInstallRuleInstall":
740 k = MK_INSTALLATIONS_INSTALL_SOURCES
741 if isinstance(sources, str):
742 k = MK_INSTALLATIONS_INSTALL_SOURCE
743 r = MutableYAMLInstallRuleInstall(
744 None,
745 None,
746 store=CommentedMap(
747 {
748 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
749 {
750 k: sources,
751 }
752 )
753 }
754 ),
755 )
756 r.into = into
757 r.dest_dir = dest_dir
758 if when is not None:
759 r.when = when
760 return r
762 @classmethod
763 def install_examples(
764 cls,
765 sources: Union[str, List[str]],
766 into: Optional[Union[str, List[str]]],
767 when: Optional[Union[str, Mapping[str, Any]]] = None,
768 ) -> "MutableYAMLInstallRuleInstallExamples":
769 k = MK_INSTALLATIONS_INSTALL_SOURCES
770 if isinstance(sources, str):
771 k = MK_INSTALLATIONS_INSTALL_SOURCE
772 r = MutableYAMLInstallRuleInstallExamples(
773 None,
774 None,
775 store=CommentedMap(
776 {
777 MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap(
778 {
779 k: sources,
780 }
781 )
782 }
783 ),
784 )
785 r.into = into
786 if when is not None: 786 ↛ 787line 786 didn't jump to line 787 because the condition on line 786 was never true
787 r.when = when
788 return r
790 @classmethod
791 def install_man(
792 cls,
793 sources: Union[str, List[str]],
794 into: Optional[Union[str, List[str]]],
795 language: Optional[str],
796 when: Optional[Union[str, Mapping[str, Any]]] = None,
797 ) -> "MutableYAMLInstallRuleMan":
798 k = MK_INSTALLATIONS_INSTALL_SOURCES
799 if isinstance(sources, str): 799 ↛ 800line 799 didn't jump to line 800 because the condition on line 799 was never true
800 k = MK_INSTALLATIONS_INSTALL_SOURCE
801 r = MutableYAMLInstallRuleMan(
802 None,
803 None,
804 store=CommentedMap(
805 {
806 MK_INSTALLATIONS_INSTALL_MAN: CommentedMap(
807 {
808 k: sources,
809 }
810 )
811 }
812 ),
813 )
814 r.language = language
815 r.into = into
816 if when is not None: 816 ↛ 817line 816 didn't jump to line 817 because the condition on line 816 was never true
817 r.when = when
818 return r
820 @classmethod
821 def discard(
822 cls,
823 sources: Union[str, List[str]],
824 ) -> "MutableYAMLInstallRuleDiscard":
825 return MutableYAMLInstallRuleDiscard(
826 None,
827 None,
828 store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
829 )
832class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
833 pass
836class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
837 @property
838 def language(self) -> Optional[str]:
839 return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
841 @language.setter
842 def language(self, new_value: Optional[str]) -> None:
843 if new_value is not None:
844 self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
845 return
846 with suppress(KeyError):
847 del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
850class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
851 pass
854class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
855 @property
856 def sources(self) -> List[str]:
857 v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
858 if isinstance(v, str):
859 return [v]
860 return v
862 @sources.setter
863 def sources(self, new_value: Union[str, List[str]]) -> None:
864 if isinstance(new_value, str):
865 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
866 return
867 new_list = CommentedSeq(new_value)
868 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list
870 @property
871 def dest_dir(self) -> Optional[str]:
872 return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)
874 @dest_dir.setter
875 def dest_dir(self, new_value: Optional[str]) -> None:
876 if new_value is not None and self.dest_as is not None: 876 ↛ 877line 876 didn't jump to line 877 because the condition on line 876 was never true
877 raise ValueError(
878 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
879 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
880 )
881 if new_value is not None:
882 self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
883 else:
884 with suppress(KeyError):
885 del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]
887 @property
888 def dest_as(self) -> Optional[str]:
889 return self._container.get(MK_INSTALLATIONS_INSTALL_AS)
891 @dest_as.setter
892 def dest_as(self, new_value: Optional[str]) -> None:
893 if new_value is not None:
894 if self.dest_dir is not None:
895 raise ValueError(
896 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
897 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
898 )
900 sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
901 if isinstance(sources, list):
902 if len(sources) != 1:
903 raise ValueError(
904 f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
905 f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
906 )
907 self.sources = sources[0]
908 self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
909 else:
910 with suppress(KeyError):
911 del self._container[MK_INSTALLATIONS_INSTALL_AS]
914class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
915 def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
916 parent_store = self._store
917 if not install_rule._is_detached or ( 917 ↛ 921line 917 didn't jump to line 921 because the condition on line 917 was never true
918 install_rule._parent_store is not None
919 and install_rule._parent_store is not parent_store
920 ):
921 raise RuntimeError(
922 "Item is already attached or associated with a different container"
923 )
924 self.create_definition_if_missing()
925 install_rule._parent_store = parent_store
926 install_rule.create_definition()
928 def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
929 parent_store = self._store
930 for install_rule in install_rules:
931 if not install_rule._is_detached or ( 931 ↛ 935line 931 didn't jump to line 935 because the condition on line 931 was never true
932 install_rule._parent_store is not None
933 and install_rule._parent_store is not parent_store
934 ):
935 raise RuntimeError(
936 "Item is already attached or associated with a different container"
937 )
938 self.create_definition_if_missing()
939 install_rule._parent_store = parent_store
940 install_rule.create_definition()
943class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
944 @property
945 def variables(self) -> Dict[str, Any]:
946 return self._store
948 def __setitem__(self, key: str, value: Any) -> None:
949 self._store[key] = value
950 self.create_definition_if_missing()
953class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
954 def manifest_variables(
955 self, *, create_if_absent: bool = True
956 ) -> MutableYAMLManifestVariables:
957 d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
958 if create_if_absent: 958 ↛ 959line 958 didn't jump to line 959 because the condition on line 958 was never true
959 d.create_definition_if_missing()
960 return d
963class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
964 def append(self, rule: str) -> None:
965 self.create_definition_if_missing()
966 self._store.append(rule)
968 def __len__(self) -> int:
969 return len(self._store)
971 def extend(self, rules: Iterable[str]) -> None:
972 it = iter(rules)
973 try:
974 first_rule = next(it)
975 except StopIteration:
976 return
977 self.create_definition_if_missing()
978 self._store.append(first_rule)
979 self._store.extend(it)
982class MutableYAMLManifest:
983 def __init__(self, store: Any) -> None:
984 self._store = store
986 @classmethod
987 def empty_manifest(cls) -> "MutableYAMLManifest":
988 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))
990 @property
991 def manifest_version(self) -> str:
992 return self._store[MK_MANIFEST_VERSION]
994 @manifest_version.setter
995 def manifest_version(self, version: str) -> None:
996 if version not in SUPPORTED_MANIFEST_VERSIONS:
997 raise ValueError("Unsupported version")
998 self._store[MK_MANIFEST_VERSION] = version
1000 def remove_during_clean(
1001 self,
1002 *,
1003 create_if_absent: bool = True,
1004 ) -> MutableYAMLRemoveDuringCleanDefinitions:
1005 d = MutableYAMLRemoveDuringCleanDefinitions(
1006 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN
1007 )
1008 if create_if_absent: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true
1009 d.create_definition_if_missing()
1010 return d
1012 def installations(
1013 self,
1014 *,
1015 create_if_absent: bool = True,
1016 ) -> MutableYAMLInstallationsDefinition:
1017 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
1018 if create_if_absent: 1018 ↛ 1019line 1018 didn't jump to line 1019 because the condition on line 1018 was never true
1019 d.create_definition_if_missing()
1020 return d
1022 def manifest_definitions(
1023 self,
1024 *,
1025 create_if_absent: bool = True,
1026 ) -> MutableYAMLManifestDefinitions:
1027 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
1028 if create_if_absent: 1028 ↛ 1029line 1028 didn't jump to line 1029 because the condition on line 1028 was never true
1029 d.create_definition_if_missing()
1030 return d
1032 def package(
1033 self, name: str, *, create_if_absent: bool = True
1034 ) -> MutableYAMLPackageDefinition:
1035 if MK_PACKAGES not in self._store: 1035 ↛ 1037line 1035 didn't jump to line 1037 because the condition on line 1035 was always true
1036 self._store[MK_PACKAGES] = CommentedMap()
1037 packages_store = self._store[MK_PACKAGES]
1038 package = packages_store.get(name)
1039 if package is None: 1039 ↛ 1046line 1039 didn't jump to line 1046 because the condition on line 1039 was always true
1040 if not create_if_absent: 1040 ↛ 1041line 1040 didn't jump to line 1041 because the condition on line 1040 was never true
1041 raise KeyError(name)
1042 assert packages_store is not None
1043 d = MutableYAMLPackageDefinition(packages_store, name)
1044 d.create_definition()
1045 else:
1046 d = MutableYAMLPackageDefinition(packages_store, name)
1047 return d
1049 def write_to(self, fd) -> None:
1050 MANIFEST_YAML.dump(self._store, fd)
1053def _describe_missing_path(entry: VirtualPath) -> str:
1054 if entry.is_dir:
1055 return f"{entry.fs_path}/ (empty directory; possible integration point)"
1056 if entry.is_symlink:
1057 target = os.readlink(entry.fs_path)
1058 return f"{entry.fs_path} (symlink; links to {target})"
1059 if entry.is_file:
1060 return f"{entry.fs_path} (file)"
1061 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)"
1064def _detect_missing_installations(
1065 path_matcher: SourcePathMatcher,
1066 search_dir: VirtualPath,
1067) -> None:
1068 if not search_dir.is_dir: 1068 ↛ 1069line 1068 didn't jump to line 1069 because the condition on line 1068 was never true
1069 return
1070 missing = list(path_matcher.detect_missing(search_dir))
1071 if not missing:
1072 return
1074 excl = textwrap.dedent(
1075 """\
1076 - discard: "*"
1077 """
1078 )
1080 raise PathNotCoveredByInstallRulesError(
1081 "Please review the list and add either install rules or exclusions to `installations` in"
1082 " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
1083 f" end of your 'installations`:\n\n{excl}\n",
1084 missing,
1085 search_dir,
1086 )
1089def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
1090 used_discard_rules = path_matcher.used_auto_discard_rules
1091 # Discard rules can match and then be overridden. In that case, they appear
1092 # but have 0 matches.
1093 if not sum((len(v) for v in used_discard_rules.values()), 0):
1094 return
1095 _info("The following automatic discard rules were triggered:")
1096 example_path: Optional[str] = None
1097 for rule in sorted(used_discard_rules):
1098 for fs_path in sorted(used_discard_rules[rule]):
1099 if example_path is None: 1099 ↛ 1101line 1099 didn't jump to line 1101 because the condition on line 1099 was always true
1100 example_path = fs_path
1101 _info(f" * {rule} -> {fs_path}")
1102 assert example_path is not None
1103 _info("")
1104 _info(
1105 "Note that some of these may have been overruled. The overrule detection logic is not"
1106 )
1107 _info("100% reliable.")
1108 _info("")
1109 _info(
1110 "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
1111 )
1112 _info(" installations:")
1113 _info(" - install:")
1114 _info(f" source: {example_path}")
1117def _install_everything_from_source_dir_if_present(
1118 dctrl_bin: BinaryPackage,
1119 substitution: Substitution,
1120 path_matcher: SourcePathMatcher,
1121 install_rule_context: InstallRuleContext,
1122 source_condition_context: ConditionContext,
1123 source_dir: VirtualPath,
1124 *,
1125 into_dir: Optional[VirtualPath] = None,
1126) -> None:
1127 attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"]
1128 pkg_set = frozenset([dctrl_bin])
1129 install_rule = run_in_context_of_plugin(
1130 "debputy",
1131 InstallRule.install_dest,
1132 [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)],
1133 None,
1134 pkg_set,
1135 f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
1136 None,
1137 )
1138 pkg_search_dir: Tuple[SearchDir] = (
1139 SearchDir(
1140 source_dir,
1141 pkg_set,
1142 ),
1143 )
1144 replacements = {
1145 "search_dirs": pkg_search_dir,
1146 }
1147 if into_dir is not None: 1147 ↛ 1148line 1147 didn't jump to line 1148 because the condition on line 1147 was never true
1148 binary_package_contexts = dict(install_rule_context.binary_package_contexts)
1149 updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir)
1150 binary_package_contexts[dctrl_bin.name] = updated
1151 replacements["binary_package_contexts"] = binary_package_contexts
1153 fake_install_rule_context = install_rule_context.replace(**replacements)
1154 try:
1155 install_rule.perform_install(
1156 path_matcher,
1157 fake_install_rule_context,
1158 source_condition_context,
1159 )
1160 except (
1161 NoMatchForInstallPatternError,
1162 PathAlreadyInstalledOrDiscardedError,
1163 ):
1164 # Empty directory or everything excluded by default; ignore the error
1165 pass
1168def _add_build_install_dirs_to_per_package_search_dirs(
1169 build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]],
1170 per_package_search_dirs: Dict[BinaryPackage, List[VirtualPath]],
1171 as_path: Callable[[str], VirtualPath],
1172) -> None:
1173 seen_pp_search_dirs: Set[Tuple[BinaryPackage, str]] = set()
1174 for dest_dir, for_packages in build_system_install_dirs:
1175 dest_path = as_path(dest_dir)
1176 for pkg in for_packages:
1177 seen_key = (pkg, dest_dir)
1178 if seen_key in seen_pp_search_dirs:
1179 continue
1180 seen_pp_search_dirs.add(seen_key)
1181 if pkg not in per_package_search_dirs:
1182 per_package_search_dirs[pkg] = [dest_path]
1183 else:
1184 per_package_search_dirs[pkg].append(dest_path)
1187class HighLevelManifest:
1188 def __init__(
1189 self,
1190 manifest_path: str,
1191 mutable_manifest: Optional[MutableYAMLManifest],
1192 remove_during_clean_rules: List[FileSystemMatchRule],
1193 install_rules: Optional[List[InstallRule]],
1194 source_package: SourcePackage,
1195 binary_packages: Mapping[str, BinaryPackage],
1196 substitution: Substitution,
1197 package_transformations: Mapping[str, PackageTransformationDefinition],
1198 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
1199 dpkg_arch_query_table: DpkgArchTable,
1200 build_env: DebBuildOptionsAndProfiles,
1201 build_environments: BuildEnvironments,
1202 build_rules: Optional[List[BuildRule]],
1203 plugin_provided_feature_set: PluginProvidedFeatureSet,
1204 debian_dir: VirtualPath,
1205 ) -> None:
1206 self.manifest_path = manifest_path
1207 self.mutable_manifest = mutable_manifest
1208 self._remove_during_clean_rules: List[FileSystemMatchRule] = (
1209 remove_during_clean_rules
1210 )
1211 self._install_rules = install_rules
1212 self._source_package = source_package
1213 self._binary_packages = binary_packages
1214 self.substitution = substitution
1215 self.package_transformations = package_transformations
1216 self._dpkg_architecture_variables = dpkg_architecture_variables
1217 self._dpkg_arch_query_table = dpkg_arch_query_table
1218 self._build_env = build_env
1219 self._used_for: Set[str] = set()
1220 self.build_environments = build_environments
1221 self.build_rules = build_rules
1222 self._plugin_provided_feature_set = plugin_provided_feature_set
1223 self._debian_dir = debian_dir
1224 self._source_condition_context = ConditionContext(
1225 binary_package=None,
1226 substitution=self.substitution,
1227 deb_options_and_profiles=self._build_env,
1228 dpkg_architecture_variables=self._dpkg_architecture_variables,
1229 dpkg_arch_query_table=self._dpkg_arch_query_table,
1230 )
1232 def source_version(self, include_binnmu_version: bool = True) -> str:
1233 # TODO: There should an easier way to determine the source version; really.
1234 version_var = "{{DEB_VERSION}}"
1235 if not include_binnmu_version:
1236 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
1237 try:
1238 return self.substitution.substitute(
1239 version_var, "internal (resolve version)"
1240 )
1241 except DebputySubstitutionError as e:
1242 raise AssertionError(f"Could not resolve {version_var}") from e
1244 @property
1245 def source_condition_context(self) -> ConditionContext:
1246 return self._source_condition_context
1248 @property
1249 def debian_dir(self) -> VirtualPath:
1250 return self._debian_dir
1252 @property
1253 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
1254 return self._dpkg_architecture_variables
1256 @property
1257 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
1258 return self._build_env
1260 @property
1261 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
1262 return self._plugin_provided_feature_set
1264 @property
1265 def remove_during_clean_rules(self) -> List[FileSystemMatchRule]:
1266 return self._remove_during_clean_rules
1268 @property
1269 def active_packages(self) -> Iterable[BinaryPackage]:
1270 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)
1272 @property
1273 def all_packages(self) -> Iterable[BinaryPackage]:
1274 yield from self._binary_packages.values()
1276 def package_state_for(self, package: str) -> PackageTransformationDefinition:
1277 return self.package_transformations[package]
1279 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
1280 name = package.name
1281 # If it is not a -doc package, then docs should be installed
1282 # under its own package name.
1283 if not name.endswith("-doc"): 1283 ↛ 1285line 1283 didn't jump to line 1285 because the condition on line 1283 was always true
1284 return package
1285 name = name[:-4]
1286 main_package = self._binary_packages.get(name)
1287 if main_package:
1288 return main_package
1289 if name.startswith("lib"):
1290 dev_pkg = self._binary_packages.get(f"{name}-dev")
1291 if dev_pkg:
1292 return dev_pkg
1294 # If we found no better match; default to the doc package itself.
1295 return package
1297 def perform_installations(
1298 self,
1299 integration_mode: DebputyIntegrationMode,
1300 build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]],
1301 *,
1302 install_request_context: Optional[InstallSearchDirContext] = None,
1303 ) -> PackageDataTable:
1304 package_data_dict = {}
1305 package_data_table = PackageDataTable(package_data_dict)
1306 enable_manifest_installation_feature = (
1307 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR
1308 )
1310 if build_system_install_dirs: 1310 ↛ 1311line 1310 didn't jump to line 1311 because the condition on line 1310 was never true
1311 if integration_mode != INTEGRATION_MODE_FULL:
1312 raise ValueError(
1313 "The build_system_install_dirs parameter can only be used in full integration mode"
1314 )
1315 if install_request_context:
1316 raise ValueError(
1317 "The build_system_install_dirs parameter cannot be used with install_request_context"
1318 " (not implemented)"
1319 )
1321 if install_request_context is None: 1321 ↛ 1323line 1321 didn't jump to line 1323 because the condition on line 1321 was never true
1323 @functools.lru_cache(None)
1324 def _as_path(fs_path: str) -> VirtualPath:
1325 return FSROOverlay.create_root_dir(".", fs_path)
1327 dtmp_dir = _as_path("debian/tmp")
1328 source_root_dir = _as_path(".")
1329 into = frozenset(self._binary_packages.values())
1330 per_package_search_dirs = {
1331 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
1332 for t in self.package_transformations.values()
1333 if t.search_dirs is not None
1334 }
1336 if integration_mode == INTEGRATION_MODE_FULL:
1337 # In this mode, we have no default search dir. Everything ends up being
1338 # per-package instead (since that is easier logic-wise).
1339 #
1340 # Even dtmp_dir is omitted here (since it is not universally applicable).
1341 # Note we still initialize dtmp_dir, since it affects a later guard.
1342 default_search_dirs = []
1343 _add_build_install_dirs_to_per_package_search_dirs(
1344 build_system_install_dirs,
1345 per_package_search_dirs,
1346 _as_path,
1347 )
1349 # We can end here with per_package_search_dirs having no search dirs for any package
1350 # (single binary, where everything is installed into d/<pkg> is the most common case).
1351 #
1352 # This is not a problem in itself as the installation rules can still apply to the
1353 # source root and there should be no reason to install something from d/<pkg> into
1354 # d/<another-pkg>
1355 else:
1356 default_search_dirs = [dtmp_dir]
1358 search_dirs = _determine_search_dir_order(
1359 per_package_search_dirs,
1360 into,
1361 default_search_dirs,
1362 source_root_dir,
1363 )
1364 check_for_uninstalled_dirs = tuple(
1365 s.search_dir
1366 for s in search_dirs
1367 if s.search_dir.fs_path != source_root_dir.fs_path
1368 )
1369 if enable_manifest_installation_feature:
1370 _present_installation_dirs(
1371 search_dirs, check_for_uninstalled_dirs, into
1372 )
1373 else:
1374 dtmp_dir = None
1375 search_dirs = install_request_context.search_dirs
1376 into = frozenset(self._binary_packages.values())
1377 seen: Set[BinaryPackage] = set()
1378 for search_dir in search_dirs:
1379 seen.update(search_dir.applies_to)
1381 missing = into - seen
1382 if missing: 1382 ↛ 1383line 1382 didn't jump to line 1383 because the condition on line 1382 was never true
1383 names = ", ".join(p.name for p in missing)
1384 raise ValueError(
1385 f"The following package(s) had no search dirs: {names}."
1386 " (Generally, the source root would be applicable to all packages)"
1387 )
1388 extra_names = seen - into
1389 if extra_names: 1389 ↛ 1390line 1389 didn't jump to line 1390 because the condition on line 1389 was never true
1390 names = ", ".join(p.name for p in extra_names)
1391 raise ValueError(
1392 f"The install_request_context referenced the following unknown package(s): {names}"
1393 )
1395 check_for_uninstalled_dirs = (
1396 install_request_context.check_for_uninstalled_dirs
1397 )
1399 install_rule_context = InstallRuleContext(search_dirs)
1401 if ( 1401 ↛ 1408line 1401 didn't jump to line 1408 because the condition on line 1401 was never true
1402 enable_manifest_installation_feature
1403 and self._install_rules is None
1404 # TODO: Should we also do this for full mode when build systems provided search dirs?
1405 and dtmp_dir is not None
1406 and os.path.isdir(dtmp_dir.fs_path)
1407 ):
1408 msg = (
1409 "The build system appears to have provided the output of upstream build system's"
1410 " install in debian/tmp. However, these are no provisions for debputy to install"
1411 " any of that into any of the debian packages listed in debian/control."
1412 " To avoid accidentally creating empty packages, debputy will insist that you "
1413 " explicitly define an empty installation definition if you did not want to "
1414 " install any of those files even though they have been provided."
1415 ' Example: "installations: []"'
1416 )
1417 _error(msg)
1418 elif ( 1418 ↛ 1421line 1418 didn't jump to line 1421 because the condition on line 1418 was never true
1419 not enable_manifest_installation_feature and self._install_rules is not None
1420 ):
1421 _error(
1422 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
1423 f" Please remove or comment out the `installations` keyword."
1424 )
1426 for dctrl_bin in self.all_packages:
1427 package = dctrl_bin.name
1428 doc_main_package = self._detect_doc_main_package_for(dctrl_bin)
1430 install_rule_context[package] = BinaryPackageInstallRuleContext(
1431 dctrl_bin,
1432 FSRootDir(),
1433 doc_main_package,
1434 )
1436 if enable_manifest_installation_feature: 1436 ↛ 1441line 1436 didn't jump to line 1441 because the condition on line 1436 was always true
1437 discard_rules = list(
1438 self.plugin_provided_feature_set.auto_discard_rules.values()
1439 )
1440 else:
1441 discard_rules = [
1442 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
1443 ]
1444 path_matcher = SourcePathMatcher(discard_rules)
1446 source_condition_context = self._source_condition_context
1448 for dctrl_bin in self.active_packages:
1449 package = dctrl_bin.name
1450 if install_request_context: 1450 ↛ 1455line 1450 didn't jump to line 1455 because the condition on line 1450 was always true
1451 build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
1452 package
1453 )
1454 else:
1455 build_system_staging_dir_fs_path = os.path.join("debian", package)
1456 if os.path.isdir(build_system_staging_dir_fs_path):
1457 build_system_staging_dir = FSROOverlay.create_root_dir(
1458 ".",
1459 build_system_staging_dir_fs_path,
1460 )
1461 else:
1462 build_system_staging_dir = None
1464 if build_system_staging_dir is not None:
1465 _install_everything_from_source_dir_if_present(
1466 dctrl_bin,
1467 self.substitution,
1468 path_matcher,
1469 install_rule_context,
1470 source_condition_context,
1471 build_system_staging_dir,
1472 )
1474 if self._install_rules:
1475 # FIXME: Check that every install rule remains used after transformations have run.
1476 # What we want to check is transformations do not exclude everything from an install
1477 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
1478 # match.
1479 for install_rule in self._install_rules:
1480 install_rule.perform_install(
1481 path_matcher,
1482 install_rule_context,
1483 source_condition_context,
1484 )
1486 if enable_manifest_installation_feature: 1486 ↛ 1490line 1486 didn't jump to line 1490 because the condition on line 1486 was always true
1487 for search_dir in check_for_uninstalled_dirs:
1488 _detect_missing_installations(path_matcher, search_dir)
1490 for dctrl_bin in self.all_packages:
1491 package = dctrl_bin.name
1492 binary_install_rule_context = install_rule_context[package]
1493 build_system_pkg_staging_dir = os.path.join("debian", package)
1494 fs_root = binary_install_rule_context.fs_root
1496 context = self.package_transformations[package]
1497 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
1498 for special_install_rule in context.install_rules: 1498 ↛ 1499line 1498 didn't jump to line 1499 because the loop on line 1498 never started
1499 special_install_rule.perform_install(
1500 path_matcher,
1501 install_rule_context,
1502 source_condition_context,
1503 )
1505 if dctrl_bin.should_be_acted_on:
1506 self.apply_fs_transformations(package, fs_root)
1507 substvars_file = f"debian/{package}.substvars"
1508 substvars = FlushableSubstvars.load_from_path(
1509 substvars_file, missing_ok=True
1510 )
1511 # We do not want to touch the substvars file (non-clean rebuild contamination)
1512 substvars.substvars_path = None
1513 else:
1514 substvars = FlushableSubstvars()
1516 udeb_package = self._binary_packages.get(f"{package}-udeb")
1517 if udeb_package and not udeb_package.is_udeb: 1517 ↛ 1518line 1517 didn't jump to line 1518 because the condition on line 1517 was never true
1518 udeb_package = None
1520 package_metadata_context = PackageProcessingContextProvider(
1521 self,
1522 dctrl_bin,
1523 udeb_package,
1524 package_data_table,
1525 # FIXME: source_package
1526 )
1528 ctrl_creator = BinaryCtrlAccessorProviderCreator(
1529 package_metadata_context,
1530 substvars,
1531 context.maintscript_snippets,
1532 context.substitution,
1533 )
1535 if not enable_manifest_installation_feature: 1535 ↛ 1536line 1535 didn't jump to line 1536 because the condition on line 1535 was never true
1536 assert_no_dbgsym_migration(dctrl_bin)
1537 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin)
1538 dh_dbgsym_root_path = FSROOverlay.create_root_dir(
1539 "",
1540 dh_dbgsym_root_fs,
1541 )
1542 dbgsym_root_fs = FSRootDir()
1543 _install_everything_from_source_dir_if_present(
1544 dctrl_bin,
1545 self.substitution,
1546 path_matcher,
1547 install_rule_context,
1548 source_condition_context,
1549 dh_dbgsym_root_path,
1550 into_dir=dbgsym_root_fs,
1551 )
1552 dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
1553 dbgsym_info = DbgsymInfo(
1554 dctrl_bin,
1555 dbgsym_root_fs,
1556 os.path.join(dh_dbgsym_root_fs, "DEBIAN"),
1557 dbgsym_build_ids,
1558 # TODO: Provide manifest feature to support this.
1559 False,
1560 )
1561 else:
1562 dbgsym_info = DbgsymInfo(
1563 dctrl_bin,
1564 FSRootDir(),
1565 None,
1566 [],
1567 False,
1568 )
1570 package_data_dict[package] = BinaryPackageData(
1571 self._source_package,
1572 dctrl_bin,
1573 build_system_pkg_staging_dir,
1574 fs_root,
1575 substvars,
1576 package_metadata_context,
1577 ctrl_creator,
1578 dbgsym_info,
1579 )
1581 if enable_manifest_installation_feature: 1581 ↛ 1584line 1581 didn't jump to line 1584 because the condition on line 1581 was always true
1582 _list_automatic_discard_rules(path_matcher)
1584 return package_data_table
1586 def condition_context(
1587 self, binary_package: Optional[Union[BinaryPackage, str]]
1588 ) -> ConditionContext:
1589 if binary_package is None: 1589 ↛ 1590line 1589 didn't jump to line 1590 because the condition on line 1589 was never true
1590 return self._source_condition_context
1591 if not isinstance(binary_package, str): 1591 ↛ 1592line 1591 didn't jump to line 1592 because the condition on line 1591 was never true
1592 binary_package = binary_package.name
1594 package_transformation = self.package_transformations[binary_package]
1595 return self._source_condition_context.replace(
1596 binary_package=package_transformation.binary_package,
1597 substitution=package_transformation.substitution,
1598 )
1600 def apply_fs_transformations(
1601 self,
1602 package: str,
1603 fs_root: FSPath,
1604 ) -> None:
1605 if package in self._used_for: 1605 ↛ 1606line 1605 didn't jump to line 1606 because the condition on line 1605 was never true
1606 raise ValueError(
1607 f"data.tar contents for {package} has already been finalized!?"
1608 )
1609 if package not in self.package_transformations: 1609 ↛ 1610line 1609 didn't jump to line 1610 because the condition on line 1609 was never true
1610 raise ValueError(
1611 f'The package "{package}" was not relevant for the manifest!?'
1612 )
1613 package_transformation = self.package_transformations[package]
1614 condition_context = ConditionContext(
1615 binary_package=package_transformation.binary_package,
1616 substitution=package_transformation.substitution,
1617 deb_options_and_profiles=self._build_env,
1618 dpkg_architecture_variables=self._dpkg_architecture_variables,
1619 dpkg_arch_query_table=self._dpkg_arch_query_table,
1620 )
1621 norm_rules = list(
1622 builtin_mode_normalization_rules(
1623 self._dpkg_architecture_variables,
1624 package_transformation.binary_package,
1625 package_transformation.substitution,
1626 )
1627 )
1628 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
1629 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
1630 for transformation in package_transformation.transformations:
1631 transformation.run_transform_file_system(fs_root, condition_context)
1632 interpreter_normalization = NormalizeShebangLineTransformation()
1633 interpreter_normalization.transform_file_system(fs_root, condition_context)
1635 def finalize_data_tar_contents(
1636 self,
1637 package: str,
1638 fs_root: FSPath,
1639 clamp_mtime_to: int,
1640 ) -> IntermediateManifest:
1641 if package in self._used_for: 1641 ↛ 1642line 1641 didn't jump to line 1642 because the condition on line 1641 was never true
1642 raise ValueError(
1643 f"data.tar contents for {package} has already been finalized!?"
1644 )
1645 if package not in self.package_transformations: 1645 ↛ 1646line 1645 didn't jump to line 1646 because the condition on line 1645 was never true
1646 raise ValueError(
1647 f'The package "{package}" was not relevant for the manifest!?'
1648 )
1649 self._used_for.add(package)
1651 # At this point, there so be no further mutations to the file system (because the will not
1652 # be present in the intermediate manifest)
1653 cast("FSRootDir", fs_root).is_read_write = False
1655 intermediate_manifest = list(
1656 _generate_intermediate_manifest(
1657 fs_root,
1658 clamp_mtime_to,
1659 )
1660 )
1661 return intermediate_manifest
1663 def apply_to_binary_staging_directory(
1664 self,
1665 package: str,
1666 fs_root: FSPath,
1667 clamp_mtime_to: int,
1668 ) -> IntermediateManifest:
1669 self.apply_fs_transformations(package, fs_root)
1670 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)
1673@dataclasses.dataclass(slots=True)
1674class SearchDirOrderState:
1675 search_dir: VirtualPath
1676 applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field(
1677 default_factory=set
1678 )
1679 after: Set[str] = dataclasses.field(default_factory=set)
1682def _present_installation_dirs(
1683 search_dirs: Sequence[SearchDir],
1684 checked_missing_dirs: Sequence[VirtualPath],
1685 all_pkgs: FrozenSet[BinaryPackage],
1686) -> None:
1687 _info("The following directories are considered search dirs (in order):")
1688 max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
1689 for search_dir in search_dirs:
1690 applies_to = ""
1691 if search_dir.applies_to < all_pkgs:
1692 names = ", ".join(p.name for p in search_dir.applies_to)
1693 applies_to = f" [only applicable to: {names}]"
1694 remark = ""
1695 if not os.path.isdir(search_dir.search_dir.fs_path):
1696 remark = " (skipped; absent)"
1697 _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}")
1699 if checked_missing_dirs:
1700 _info('The following directories are considered for "not-installed" paths;')
1701 for d in checked_missing_dirs:
1702 remark = ""
1703 if not os.path.isdir(d.fs_path):
1704 remark = " (skipped; absent)"
1705 _info(f" * {d.fs_path:{max_len}}{remark}")
1708def _determine_search_dir_order(
1709 requested: Mapping[BinaryPackage, List[VirtualPath]],
1710 all_pkgs: FrozenSet[BinaryPackage],
1711 default_search_dirs: List[VirtualPath],
1712 source_root: VirtualPath,
1713) -> Sequence[SearchDir]:
1714 search_dir_table = {}
1715 assert requested.keys() <= all_pkgs
1716 for pkg in all_pkgs:
1717 paths = requested.get(pkg, default_search_dirs)
1718 previous_search_dir: Optional[SearchDirOrderState] = None
1719 for path in paths:
1720 try:
1721 search_dir_state = search_dir_table[path.fs_path]
1722 except KeyError:
1723 search_dir_state = SearchDirOrderState(path)
1724 search_dir_table[path.fs_path] = search_dir_state
1725 search_dir_state.applies_to.add(pkg)
1726 if previous_search_dir is not None:
1727 search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
1728 previous_search_dir = search_dir_state
1730 search_dirs_in_order = []
1731 released = set()
1732 remaining = set()
1733 for search_dir_state in search_dir_table.values():
1734 if not (search_dir_state.after <= released):
1735 remaining.add(search_dir_state.search_dir.fs_path)
1736 continue
1737 search_dirs_in_order.append(search_dir_state)
1738 released.add(search_dir_state.search_dir.fs_path)
1740 while remaining:
1741 current_released = len(released)
1742 for fs_path in remaining:
1743 search_dir_state = search_dir_table[fs_path]
1744 if not search_dir_state.after.issubset(released):
1745 remaining.add(search_dir_state.search_dir.fs_path)
1746 continue
1747 search_dirs_in_order.append(search_dir_state)
1748 released.add(search_dir_state.search_dir.fs_path)
1750 if current_released == len(released):
1751 names = ", ".join(remaining)
1752 _error(
1753 f"There is a circular dependency (somewhere) between the search dirs: {names}."
1754 " Note that the search directories across all packages have to be ordered (and the"
1755 " source root should generally be last)"
1756 )
1757 remaining -= released
1759 search_dirs_in_order.append(
1760 SearchDirOrderState(
1761 source_root,
1762 all_pkgs,
1763 )
1764 )
1766 return tuple(
1767 # Avoid duplicating all_pkgs
1768 SearchDir(
1769 s.search_dir,
1770 frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
1771 )
1772 for s in search_dirs_in_order
1773 )