Coverage for src/debputy/highlevel_manifest.py: 65%
900 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-06 19:25 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-04-06 19:25 +0000
1import dataclasses
2import functools
3import os
4import textwrap
5import typing
6from contextlib import suppress
7from dataclasses import dataclass, field
8from typing import (
9 Any,
10 TypeVar,
11 Generic,
12 cast,
13)
14from collections.abc import Iterable, Mapping, Sequence, Callable
16from debian.debian_support import DpkgArchTable
18from debputy.dh.debhelper_emulation import (
19 dhe_dbgsym_root_dir,
20 assert_no_dbgsym_migration,
21 read_dbgsym_file,
22)
23from ._deb_options_profiles import DebBuildOptionsAndProfiles
24from ._manifest_constants import *
25from .architecture_support import DpkgArchitectureBuildProcessValuesTable
26from .builtin_manifest_rules import builtin_mode_normalization_rules
27from .exceptions import (
28 DebputySubstitutionError,
29 DebputyRuntimeErrorWithPreamble,
30)
31from .filesystem_scan import (
32 InMemoryVirtualRootDir,
33 OSFSROOverlay,
34 FSControlRootDir,
35 InMemoryVirtualPathBase,
36)
37from .installations import (
38 InstallRule,
39 SourcePathMatcher,
40 PathAlreadyInstalledOrDiscardedError,
41 NoMatchForInstallPatternError,
42 InstallRuleContext,
43 BinaryPackageInstallRuleContext,
44 InstallSearchDirContext,
45 SearchDir,
46)
47from .intermediate_manifest import TarMember, PathType, IntermediateManifest
48from .maintscript_snippet import (
49 DpkgMaintscriptHelperCommand,
50 MaintscriptSnippetContainer,
51)
52from .manifest_conditions import ConditionContext
53from .manifest_parser.base_types import (
54 FileSystemMatchRule,
55 FileSystemExactMatchRule,
56 BuildEnvironments,
57)
58from .manifest_parser.util import AttributePath
59from .packager_provided_files import PackagerProvidedFile
60from .packages import BinaryPackage, SourcePackage
61from .plugin.api.feature_set import PluginProvidedFeatureSet
62from .plugin.api.impl import BinaryCtrlAccessorProviderCreator
63from .plugin.api.impl_types import (
64 PackageProcessingContextProvider,
65 PackageDataTable,
66)
67from .plugin.api.spec import (
68 FlushableSubstvars,
69 VirtualPath,
70 DebputyIntegrationMode,
71 INTEGRATION_MODE_DH_DEBPUTY_RRR,
72 INTEGRATION_MODE_FULL,
73)
74from debputy.plugins.debputy.binary_package_rules import ServiceRule
75from debputy.plugins.debputy.build_system_rules import BuildRule
76from .plugin.plugin_state import run_in_context_of_plugin
77from .substitution import Substitution
78from .transformation_rules import (
79 TransformationRule,
80 ModeNormalizationTransformationRule,
81 NormalizeShebangLineTransformation,
82)
83from .util import (
84 _error,
85 _warn,
86 debian_policy_normalize_symlink_target,
87 generated_content_dir,
88 _info,
89)
90from .yaml import MANIFEST_YAML
91from .yaml.compat import CommentedMap, CommentedSeq
def tar_path(p: VirtualPath) -> str:
    """Return the path of ``p`` in the form used for tar member names.

    Directories get a trailing ``/``; any other path is returned unchanged.
    """
    name = p.path
    return f"{name}/" if p.is_dir else name
def tar_owner_info(path: InMemoryVirtualPathBase) -> tuple[str, int, str, int]:
    """Return ``(owner name, owner id, group name, group id)`` for ``path``.

    The values are read from the private ``_owner`` and ``_group`` attributes
    of the path.
    """
    user = path._owner  # noqa
    grp = path._group  # noqa
    return user.entity_name, user.entity_id, grp.entity_name, grp.entity_id
class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
    """Error for paths in a search directory that no install rule covered.

    The exception arguments are positional: index 1 holds the unmatched paths
    and index 2 holds the search directory they were found in.
    """

    @property
    def unmatched_paths(self) -> Sequence[VirtualPath]:
        """The paths that were neither installed nor discarded."""
        _, paths, *_ = self.args
        return paths

    @property
    def search_dir(self) -> VirtualPath:
        """The search directory containing the unmatched paths."""
        _, _, directory, *_ = self.args
        return directory

    def render_preamble(self) -> None:
        """Emit (as warnings) the list of unmatched paths, one per line."""
        location = self.search_dir.fs_path
        _warn(
            f"The following paths were present in {location}, but not installed (nor explicitly discarded)."
        )
        _warn("")
        for description in map(_describe_missing_path, self.unmatched_paths):
            _warn(f" * {description}")
        _warn("")
@dataclass(slots=True)
class DbgsymInfo:
    """Debug-symbol related state tracked for one binary package."""

    binary_package: BinaryPackage
    dbgsym_fs_root: InMemoryVirtualPathBase
    # Lazily computed by `dbgsym_root_dir` when initialised to None.
    _dbgsym_root_fs: str | None
    dbgsym_ids: list[str]
    run_dwz: bool

    @property
    def dbgsym_root_dir(self) -> str:
        """Directory for the dbgsym file system root (computed once, then cached)."""
        cached = self._dbgsym_root_fs
        if cached is not None:
            return cached
        computed = generated_content_dir(
            package=self.binary_package,
            subdir_key="dbgsym-fs-root",
        )
        self._dbgsym_root_fs = computed
        return computed

    @property
    def dbgsym_ctrl_dir(self) -> FSControlRootDir:
        """Control root directory located at ``DEBIAN`` below `dbgsym_root_dir`."""
        ctrl_path = os.path.join(self.dbgsym_root_dir, "DEBIAN")
        return FSControlRootDir.create_root_dir(ctrl_path)
@dataclass(slots=True, frozen=True)
class BinaryPackageData:
    """Immutable bundle of the objects associated with one binary package."""

    source_package: SourcePackage
    binary_package: BinaryPackage
    binary_staging_root_dir: str
    fs_root: InMemoryVirtualPathBase
    substvars: FlushableSubstvars
    package_metadata_context: PackageProcessingContextProvider
    ctrl_creator: BinaryCtrlAccessorProviderCreator
    dbgsym_info: DbgsymInfo

    @property
    def control_output_dir(self) -> FSControlRootDir:
        """Control root directory for the generated ``DEBIAN`` content of the package."""
        output_path = generated_content_dir(
            package=self.binary_package,
            subdir_key="DEBIAN",
        )
        return FSControlRootDir.create_root_dir(output_path)
@dataclass(slots=True)
class PackageTransformationDefinition:
    """Per-binary-package container for rules and snippets.

    Only the first three fields are mandatory; every collection field starts
    out empty (each instance gets its own list/dict).
    """

    # Mandatory fields.
    binary_package: BinaryPackage
    substitution: Substitution
    is_auto_generated_package: bool

    # Optional scalar-ish fields.
    binary_version: str | None = None
    search_dirs: list[FileSystemExactMatchRule] | None = None

    # Collections; populated after construction.
    dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = (
        dataclasses.field(default_factory=list)
    )
    maintscript_snippets: dict[str, MaintscriptSnippetContainer] = dataclasses.field(
        default_factory=dict
    )
    transformations: list[TransformationRule] = dataclasses.field(
        default_factory=list
    )
    reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = (
        dataclasses.field(default_factory=dict)
    )
    install_rules: list[InstallRule] = dataclasses.field(default_factory=list)
    requested_service_rules: list[ServiceRule] = dataclasses.field(
        default_factory=list
    )
def _path_to_tar_member(
    path: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> TarMember:
    """Convert one in-memory path into the matching `TarMember`.

    :param path: The path to convert; must be a directory, a file or a symlink.
    :param clamp_mtime_to: Upper bound for the recorded mtime.
    :return: A virtual tar member for symlinks and for paths without a file
      system path; otherwise a member created from the backing file.
    :raises AssertionError: If the path is not a directory, file or symlink.
    """
    mtime = float(clamp_mtime_to)
    owner, uid, group, gid = tar_owner_info(path)
    mode = path.mode
    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}

    # Never record an mtime later than the clamp value.
    if path.has_fs_path:
        mtime = min(mtime, path.mtime)

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: someday we will need to deal with hardlinks and it might appear here.
        path_type = PathType.FILE
    elif not path.is_symlink:
        assert not path.is_symlink
        raise AssertionError(
            f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
        )
    else:
        # Symlinks are resolved right away because the target has to be
        # normalized anyway.
        normalized_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        # The mode is forced to 0777 as that is the mode seen in the data.tar.
        # In theory, tar accepts any value here. However, for reproducibility,
        # we have to be well-behaved - and that is 0777.
        return TarMember.virtual_path(
            tar_path(path),
            PathType.SYMLINK,
            mtime,
            link_target=normalized_target,
            mode=0o0777,
            **ownership,
        )

    if not path.has_fs_path:
        assert not path.is_file
        return TarMember.virtual_path(
            tar_path(path),
            path_type,
            mtime,
            mode=mode,
            **ownership,
        )

    may_steal_fs_path = getattr(path, "_can_replace_inline", False)
    return TarMember.from_file(
        tar_path(path),
        path.fs_path,
        mode=mode,
        path_type=path_type,
        path_mtime=mtime,
        clamp_mtime_to=clamp_mtime_to,
        may_steal_fs_path=may_steal_fs_path,
        **ownership,
    )
def _generate_intermediate_manifest(
    fs_root: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a tar member for every path below ``fs_root``.

    Non-symlink members are yielded in the order ``fs_root.all_paths()``
    produces them; all symlink members are held back and yielded at the end.
    """
    deferred_symlinks = []
    for fs_path in fs_root.all_paths():
        member = _path_to_tar_member(fs_path, clamp_mtime_to)
        if member.path_type == PathType.SYMLINK:
            deferred_symlinks.append(member)
        else:
            yield member
    yield from deferred_symlinks
284ST = TypeVar("ST")
285T = TypeVar("T")
288class AbstractYAMLSubStore(Generic[ST]):
289 def __init__(
290 self,
291 parent_store: Any,
292 parent_key: int | str | None,
293 store: ST | None = None,
294 ) -> None:
295 if parent_store is not None and parent_key is not None:
296 try:
297 from_parent_store = parent_store[parent_key]
298 except (KeyError, IndexError):
299 from_parent_store = None
300 if ( 300 ↛ 305line 300 didn't jump to line 305 because the condition on line 300 was never true
301 store is not None
302 and from_parent_store is not None
303 and store is not parent_store
304 ):
305 raise ValueError(
306 "Store is provided but is not the one already in the parent store"
307 )
308 if store is None: 308 ↛ 310line 308 didn't jump to line 310 because the condition on line 308 was always true
309 store = from_parent_store
310 self._parent_store = parent_store
311 self._parent_key = parent_key
312 self._is_detached = (
313 parent_key is None or parent_store is None or parent_key not in parent_store
314 )
315 assert self._is_detached or store is not None
316 if store is None:
317 store = self._create_new_instance()
318 self._store: ST = store
320 def _create_new_instance(self) -> ST:
321 raise NotImplementedError
323 def create_definition_if_missing(self) -> None:
324 if self._is_detached:
325 self.create_definition()
327 def create_definition(self) -> None:
328 if not self._is_detached: 328 ↛ 329line 328 didn't jump to line 329 because the condition on line 328 was never true
329 raise RuntimeError("Definition is already present")
330 parent_store = self._parent_store
331 if parent_store is None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true
332 raise RuntimeError(
333 f"Definition is not attached to any parent!? ({self.__class__.__name__})"
334 )
335 if isinstance(parent_store, list):
336 assert self._parent_key is None
337 self._parent_key = len(parent_store)
338 self._parent_store.append(self._store)
339 else:
340 parent_store[self._parent_key] = self._store
341 self._is_detached = False
343 def remove_definition(self) -> None:
344 self._ensure_attached()
345 del self._parent_store[self._parent_key]
346 if isinstance(self._parent_store, list):
347 self._parent_key = None
348 self._is_detached = True
350 def _ensure_attached(self) -> None:
351 if self._is_detached:
352 raise RuntimeError("The definition has been removed!")
class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]):
    """Sub-store whose backing structure is a sequence."""

    def _create_new_instance(self) -> list[T]:
        """Return an empty `CommentedSeq` to act as the new store."""
        empty_sequence = CommentedSeq()
        return empty_sequence
class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]):
    """Sub-store whose backing structure is a mapping."""

    def _create_new_instance(self) -> dict[str, T]:
        """Return an empty `CommentedMap` to act as the new store."""
        empty_mapping = CommentedMap()
        return empty_mapping
class MutableCondition:
    """Factories for single-key condition mappings."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return a mapping with ``arch_filter`` under the arch-matches key."""
        condition = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(condition)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return a mapping with the value under the build-profiles-matches key."""
        condition = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(condition)
class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Mutable view of a create-symlink transformation.

    The store is a mapping with the create-symlink key, whose value holds the
    link path and link target.
    """

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Any | None
    ) -> "MutableYAMLSymlink":
        """Build a detached symlink definition.

        :param link_path: Value for the link path attribute.
        :param link_target: Value for the link target attribute.
        :param condition: Optional condition; stored under ``when`` next to
          the path and target when it is not None.
        """
        attributes = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            attributes["when"] = condition
        definition = CommentedMap({MK_TRANSFORMATIONS_CREATE_SYMLINK: attributes})
        return cls(None, None, store=definition)

    @property
    def _symlink_attributes(self) -> Any:
        """The mapping stored under the create-symlink key."""
        return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]

    @property
    def symlink_path(self) -> str:
        """The link path attribute of the symlink."""
        return self._symlink_attributes[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        self._symlink_attributes[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> str | None:
        """The link target attribute of the symlink."""
        return self._symlink_attributes[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        self._symlink_attributes[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = (
            target
        )
class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one conffile management entry (a remove or a rename).

    The store is a one-entry mapping: its key is the command and its value
    (the "container") holds the attributes of that command.
    """

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "remove" item for ``conffile``.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
                        {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "rename" item from ``old_conffile`` to ``new_conffile``.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
                        {
                            MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                            MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
                        }
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @property
    def _container(self) -> dict[str, Any]:
        """The attribute mapping of the (single) command in the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    def _set_optional(self, key: str, value: str | None) -> None:
        """Store ``value`` under ``key``; a None value removes the key if present."""
        if value is None:
            with suppress(KeyError):
                del self._container[key]
        else:
            self._container[key] = value

    def _ensure_rename_command(self) -> None:
        """Raise `TypeError` unless this item is a rename command."""
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )

    @property
    def command(self) -> str:
        """The command of this item, i.e. the single key of the store."""
        assert len(self._store) == 1
        return next(iter(self._store))

    @property
    def obsolete_conffile(self) -> str:
        """The removed path (remove command) or the rename source (rename command)."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    @property
    def new_conffile(self) -> str:
        """The rename target.

        :raises TypeError: If this item is not a rename command.
        """
        self._ensure_rename_command()
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        self._ensure_rename_command()
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> str | None:
        """The prior-to-version attribute, or None when it is not set."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: str | None) -> None:
        self._set_optional(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION, value)

    @property
    def owning_package(self) -> str | None:
        """The owning-package attribute, or None when it is not set."""
        # Reads the owning-package key (the same one the setter writes) and,
        # like `prior_to_version`, reports an absent key as None.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: str | None) -> None:
        self._set_optional(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE, value)
class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Mutable view of the definition of a single package.

    Gives access to the list-valued sections stored under the transformations
    and conffile-management keys.
    """

    def _list_store(
        self, key, *, create_if_absent: bool = False
    ) -> list[dict[str, Any]] | None:
        """Return the list stored under ``key``.

        :param key: The section key to look up in the package definition.
        :param create_if_absent: When True, a missing list (and, if needed,
          the package definition itself) is created. When False, a missing
          list yields None and nothing is modified.
        """
        if self._is_detached or key not in self._store:
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Append the detached ``item`` to the list under ``key`` (creating the list).

        :raises RuntimeError: If ``item`` is already attached or is associated
          with another container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Append ``symlink`` to the transformations of this package."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a view for every create-symlink entry among the transformations."""
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for index, entry in enumerate(store):
            # Match on the key that MutableYAMLSymlink reads from its store;
            # any other entry could not be accessed through that view.
            if (
                entry
                and isinstance(entry, dict)
                and len(entry) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in entry
            ):
                yield MutableYAMLSymlink(store, index)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield a view for every entry in the conffile-management list."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        yield from (
            MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
        )

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Append ``conffile_management_item`` to the conffile-management list."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)
class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base class for mutable views of a single installation rule.

    The store is a one-entry mapping: its key names the rule type and its
    value (the "container") holds the attributes of the rule. The class
    methods are factories that return detached rules of the concrete
    subclasses.
    """

    @property
    def _container(self) -> dict[str, Any]:
        """The attribute mapping of the (single) rule in the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @staticmethod
    def _sources_key(sources: str | list[str]) -> str:
        """Return the singular source key for a str and the plural key otherwise."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @staticmethod
    def _new_rule_store(rule_key: str, attributes: dict[str, Any]) -> CommentedMap:
        """Return the one-entry store ``{rule_key: attributes}`` for a new rule."""
        return CommentedMap({rule_key: CommentedMap(attributes)})

    @property
    def into(self) -> list[str] | None:
        """The "into" attribute as a list, or None when it is not set."""
        # The setter removes the key for None, so absence must read as None
        # rather than raise KeyError.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: str | list[str] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list

    @property
    def when(self) -> str | Mapping[str, Any] | None:
        """The condition of the rule, or None when it is not set."""
        # Same reasoning as for `into`: the setter deletes the key for None.
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: str | Mapping[str, Any] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        new_map = CommentedMap(new_value)
        self._container[MK_CONDITION_WHEN] = new_map

    @classmethod
    def install_dest(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule for ``sources`` with an optional ``dest_dir``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_rule_store(
                MK_INSTALLATIONS_INSTALL,
                {cls._sources_key(sources): sources},
            ),
        )
        # Attribute order below determines the key order in the mapping.
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: str | list[str],
        dest_dirs: Sequence[str],
        into: str | list[str] | None,
        *,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached multi-dest install rule for ``sources`` into ``dest_dirs``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_rule_store(
                MK_INSTALLATIONS_MULTI_DEST_INSTALL,
                {
                    cls._sources_key(sources): sources,
                    "dest-dirs": dest_dirs,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_rule_store(
                MK_INSTALLATIONS_INSTALL,
                {
                    MK_INSTALLATIONS_INSTALL_SOURCE: source,
                    MK_INSTALLATIONS_INSTALL_AS: install_as,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule that installs ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_rule_store(
                MK_INSTALLATIONS_INSTALL_DOCS,
                {
                    MK_INSTALLATIONS_INSTALL_SOURCE: source,
                    MK_INSTALLATIONS_INSTALL_AS: install_as,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule for ``sources`` with an optional ``dest_dir``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_rule_store(
                MK_INSTALLATIONS_INSTALL_DOCS,
                {cls._sources_key(sources): sources},
            ),
        )
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Build a detached install-examples rule for ``sources``."""
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=cls._new_rule_store(
                MK_INSTALLATIONS_INSTALL_EXAMPLES,
                {cls._sources_key(sources): sources},
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        language: str | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Build a detached install-man rule for ``sources`` with an optional ``language``."""
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=cls._new_rule_store(
                MK_INSTALLATIONS_INSTALL_MAN,
                {cls._sources_key(sources): sources},
            ),
        )
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: str | list[str],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Build a detached discard rule; ``sources`` is stored directly as the rule value."""
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )
class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    """Install rule view for install-examples rules; adds nothing to the base class."""
class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Install rule view for install-man rules, adding the ``language`` attribute."""

    @property
    def language(self) -> str | None:
        """The language attribute of the rule, or None when it is not set."""
        # The setter removes the key for None, so an absent key must read
        # back as None instead of raising KeyError.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: str | None) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    """Install rule view for discard rules; adds nothing to the base class."""
class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Install rule view exposing the sources and destination attributes.

    The sources live under one of two keys: the singular source key (a single
    string) or the plural sources key (a list). The factories on the base
    class choose the key from the type of the value; this class follows the
    same convention.
    """

    @property
    def sources(self) -> list[str]:
        """The sources of the rule as a list.

        :raises KeyError: If the rule has neither a sources nor a source entry.
        """
        container = self._container
        if MK_INSTALLATIONS_INSTALL_SOURCES in container:
            v = container[MK_INSTALLATIONS_INSTALL_SOURCES]
        else:
            # Rules created from a single string keep it under the singular key.
            v = container[MK_INSTALLATIONS_INSTALL_SOURCE]
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: str | list[str]) -> None:
        container = self._container
        if isinstance(new_value, str):
            key = MK_INSTALLATIONS_INSTALL_SOURCE
            stale_key = MK_INSTALLATIONS_INSTALL_SOURCES
            value = new_value
        else:
            key = MK_INSTALLATIONS_INSTALL_SOURCES
            stale_key = MK_INSTALLATIONS_INSTALL_SOURCE
            value = CommentedSeq(new_value)
        container[key] = value
        # Never leave both the singular and the plural key in the rule.
        with suppress(KeyError):
            del container[stale_key]

    @property
    def dest_dir(self) -> str | None:
        """The destination directory of the rule, or None when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: str | None) -> None:
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> str | None:
        """The "install as" name of the rule, or None when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: str | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]
            return
        if self.dest_dir is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        current_sources = self.sources
        if len(current_sources) != 1:
            raise ValueError(
                f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
            )
        # Collapse a one-element list into the single-source form.
        self.sources = current_sources[0]
        self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Mutable view of the list of installation rules."""

    def _attach(
        self, target: Any, install_rule: AbstractMutableYAMLInstallRule
    ) -> None:
        """Validate ``install_rule`` and add it to ``target`` (this view's store)."""
        if not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        current_owner = install_rule._parent_store
        if current_owner is not None and current_owner is not target:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        # Make sure the list itself exists in the manifest before adding to it.
        self.create_definition_if_missing()
        install_rule._parent_store = target
        install_rule.create_definition()

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Add one detached rule to the end of the list.

        :raises RuntimeError: If the rule is attached or belongs to another container.
        """
        self._attach(self._store, install_rule)

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Add every rule from ``install_rules``, in order, to the end of the list.

        :raises RuntimeError: If a rule is attached or belongs to another
          container; rules before it have already been added at that point.
        """
        target = self._store
        for rule in install_rules:
            self._attach(target, rule)
class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Mutable view of the manifest variables mapping."""

    @property
    def variables(self) -> dict[str, Any]:
        """The backing mapping of variable names to values."""
        backing = self._store
        return backing

    def __setitem__(self, key: str, value: Any) -> None:
        """Set a variable, then attach the mapping to its parent if needed."""
        self.variables[key] = value
        self.create_definition_if_missing()
class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Mutable view of the manifest definitions mapping."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return a view of the variables stored in these definitions.

        :param create_if_absent: When True, the variables mapping is attached
          to the definitions if it is missing.
        """
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables
class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
    """Mutable view of the remove-during-clean list."""

    def append(self, rule: str) -> None:
        """Add ``rule`` to the list, attaching the list to its parent if needed."""
        self.create_definition_if_missing()
        self._store.append(rule)

    def __len__(self) -> int:
        """Number of rules currently in the list."""
        return len(self._store)

    def extend(self, rules: Iterable[str]) -> None:
        """Add all ``rules`` to the list.

        The list is only attached to its parent when ``rules`` yields at least
        one item; an empty iterable leaves everything untouched.
        """
        pending = iter(rules)
        nothing = object()
        head = next(pending, nothing)
        if head is nothing:
            return
        self.create_definition_if_missing()
        self._store.append(head)
        self._store.extend(pending)
996class MutableYAMLManifest:
997 def __init__(self, store: Any) -> None:
998 self._store = store
1000 @classmethod
1001 def empty_manifest(cls) -> "MutableYAMLManifest":
1002 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))
1004 @property
1005 def manifest_version(self) -> str:
1006 return self._store[MK_MANIFEST_VERSION]
1008 @manifest_version.setter
1009 def manifest_version(self, version: str) -> None:
1010 if version not in SUPPORTED_MANIFEST_VERSIONS:
1011 raise ValueError("Unsupported version")
1012 self._store[MK_MANIFEST_VERSION] = version
1014 def remove_during_clean(
1015 self,
1016 *,
1017 create_if_absent: bool = True,
1018 ) -> MutableYAMLRemoveDuringCleanDefinitions:
1019 d = MutableYAMLRemoveDuringCleanDefinitions(
1020 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN
1021 )
1022 if create_if_absent: 1022 ↛ 1023line 1022 didn't jump to line 1023 because the condition on line 1022 was never true
1023 d.create_definition_if_missing()
1024 return d
1026 def installations(
1027 self,
1028 *,
1029 create_if_absent: bool = True,
1030 ) -> MutableYAMLInstallationsDefinition:
1031 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
1032 if create_if_absent: 1032 ↛ 1033line 1032 didn't jump to line 1033 because the condition on line 1032 was never true
1033 d.create_definition_if_missing()
1034 return d
1036 def manifest_definitions(
1037 self,
1038 *,
1039 create_if_absent: bool = True,
1040 ) -> MutableYAMLManifestDefinitions:
1041 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
1042 if create_if_absent: 1042 ↛ 1043line 1042 didn't jump to line 1043 because the condition on line 1042 was never true
1043 d.create_definition_if_missing()
1044 return d
def package(
    self, name: str, *, create_if_absent: bool = True
) -> MutableYAMLPackageDefinition:
    """Provide a mutable view of the definition for the package called `name`.

    :param name: Name of the package to look up in the packages mapping.
    :param create_if_absent: When true, the packages mapping and the package
      definition are created on demand. When false, this is a pure lookup that
      leaves the store untouched.
    :return: The wrapper around the package definition.
    :raises KeyError: If the package has no definition and `create_if_absent` is false.
    """
    if MK_PACKAGES in self._store:
        packages_store = self._store[MK_PACKAGES]
    elif create_if_absent:
        packages_store = CommentedMap()
        self._store[MK_PACKAGES] = packages_store
    else:
        # A failed lookup must not leave an empty packages mapping behind in the
        # manifest (the other accessors do not mutate in this mode either).
        raise KeyError(name)

    if packages_store.get(name) is None:
        if not create_if_absent:
            raise KeyError(name)
        definition = MutableYAMLPackageDefinition(packages_store, name)
        definition.create_definition()
        return definition
    return MutableYAMLPackageDefinition(packages_store, name)
def write_to(self, fd) -> None:
    """Serialize the manifest content to `fd` via `MANIFEST_YAML`.

    :param fd: The output target handed to the YAML dumper.
    """
    MANIFEST_YAML.dump(self._store, fd)
def _describe_missing_path(entry: VirtualPath) -> str:
    """Render a one-line description of `entry` consisting of its path and its kind.

    :param entry: The path to describe. For symlinks, the link target is read from
      the file system.
    :return: The `fs_path` of the entry followed by a remark about its type.
    """
    if entry.is_dir:
        remark = "/ (empty directory; possible integration point)"
    elif entry.is_symlink:
        remark = f" (symlink; links to {os.readlink(entry.fs_path)})"
    elif entry.is_file:
        remark = " (file)"
    else:
        remark = " (other!? Probably not supported by debputy and may need a `remove`)"
    return f"{entry.fs_path}{remark}"
def _detect_missing_installations(
    path_matcher: SourcePathMatcher,
    search_dir: VirtualPath,
) -> None:
    """Fail if `search_dir` contains paths that `path_matcher` reports as missing.

    :param path_matcher: The matcher that is asked (via `detect_missing`) which paths
      are missing.
    :param search_dir: The directory to check. Nothing is checked when it is not a
      directory.
    :raises PathNotCoveredByInstallRulesError: If at least one missing path is reported.
    """
    if not search_dir.is_dir:
        return
    missing = list(path_matcher.detect_missing(search_dir))
    if not missing:
        return

    # The snippet the user can paste into the manifest to discard everything else.
    excl = textwrap.dedent("""\
        - discard: "*"
    """)

    raise PathNotCoveredByInstallRulesError(
        "Please review the list and add either install rules or exclusions to `installations` in"
        " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
        f" end of your `installations`:\n\n{excl}\n",
        missing,
        search_dir,
    )
def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
    """Report (via `_info`) which automatic discard rules matched and which paths they hit.

    Nothing is printed when no rule has any recorded path. The report ends with an
    example manifest snippet showing how to overrule a discard rule, using the first
    listed path as the example.

    :param path_matcher: The matcher whose `used_auto_discard_rules` are reported.
    """
    used_discard_rules = path_matcher.used_auto_discard_rules
    # Discard rules can match and then be overridden. In that case, they appear
    # but have 0 matches.
    if not any(len(paths) for paths in used_discard_rules.values()):
        return
    _info("The following automatic discard rules were triggered:")
    example_path: str | None = None
    for rule in sorted(used_discard_rules):
        for fs_path in sorted(used_discard_rules[rule]):
            if example_path is None:
                example_path = fs_path
            _info(f" * {rule} -> {fs_path}")
    assert example_path is not None
    _info("")
    _info(
        "Note that some of these may have been overruled. The overrule detection logic is not"
    )
    _info("100% reliable.")
    _info("")
    _info(
        "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
    )
    # Keep the nesting indentation: `source` must be a child of `install` for the
    # example to be valid YAML.
    _info("    installations:")
    _info("    - install:")
    _info(f"        source: {example_path}")
def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: VirtualPath | None = None,
) -> None:
    """Run a built-in "install `*`" rule for `dctrl_bin` with `source_dir` as sole search dir.

    :param dctrl_bin: The package the content is installed into.
    :param substitution: Substitution handed to the `*` match rule.
    :param path_matcher: Matcher passed on to the install rule.
    :param install_rule_context: Base context; a modified copy of it is used for the
      installation.
    :param source_condition_context: Condition context passed on to the install rule.
    :param source_dir: The directory whose content should be installed.
    :param into_dir: If given, replaces the `fs_root` of the context for `dctrl_bin`
      in the copied install rule context.
    """
    description = (
        f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}"
    )
    attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"]
    pkg_set = frozenset([dctrl_bin])
    match_everything = FileSystemMatchRule.from_path_match(
        "*",
        attribute_path,
        substitution,
    )
    install_rule = run_in_context_of_plugin(
        "debputy",
        InstallRule.install_dest,
        [match_everything],
        None,
        pkg_set,
        description,
        None,
    )
    pkg_search_dir: tuple[SearchDir] = (SearchDir(source_dir, pkg_set),)
    replacements = {"search_dirs": pkg_search_dir}
    if into_dir is not None:
        per_package_contexts = dict(install_rule_context.binary_package_contexts)
        per_package_contexts[dctrl_bin.name] = per_package_contexts[
            dctrl_bin.name
        ].replace(fs_root=into_dir)
        replacements["binary_package_contexts"] = per_package_contexts

    fake_install_rule_context = install_rule_context.replace(**replacements)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(NoMatchForInstallPatternError, PathAlreadyInstalledOrDiscardedError):
        install_rule.perform_install(
            path_matcher,
            fake_install_rule_context,
            source_condition_context,
        )
def _add_build_install_dirs_to_per_package_search_dirs(
    build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
    per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]],
    as_path: Callable[[str], VirtualPath],
) -> None:
    """Append the build system install dirs to the search dirs of the packages they apply to.

    :param build_system_install_dirs: Pairs of a directory and the packages it is for.
    :param per_package_search_dirs: Updated in place; packages without an entry get
      a new list.
    :param as_path: Converts a directory name into the path object that is recorded.
    """
    # A (package, directory) pair is only recorded the first time it is seen here.
    already_added: set[tuple[BinaryPackage, str]] = set()
    for dest_dir, for_packages in build_system_install_dirs:
        dest_path = as_path(dest_dir)
        for pkg in for_packages:
            pair = (pkg, dest_dir)
            if pair not in already_added:
                already_added.add(pair)
                per_package_search_dirs.setdefault(pkg, []).append(dest_path)
1199class HighLevelManifest:
def __init__(
    self,
    manifest_path: str,
    mutable_manifest: MutableYAMLManifest | None,
    remove_during_clean_rules: list[FileSystemMatchRule],
    install_rules: list[InstallRule] | None,
    source_package: SourcePackage,
    binary_packages: Mapping[str, BinaryPackage],
    substitution: Substitution,
    package_transformations: Mapping[str, PackageTransformationDefinition],
    dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
    dpkg_arch_query_table: DpkgArchTable,
    build_env: DebBuildOptionsAndProfiles,
    build_environments: BuildEnvironments,
    build_rules: list[BuildRule] | None,
    value_table: Mapping[
        tuple[SourcePackage | BinaryPackage, type[Any]],
        Any,
    ],
    plugin_provided_feature_set: PluginProvidedFeatureSet,
    debian_dir: VirtualPath,
) -> None:
    """Store the given manifest state and derive the source-level condition context."""
    # Attributes that are exposed directly.
    self.manifest_path = manifest_path
    self.mutable_manifest = mutable_manifest
    self.source_package = source_package
    self.substitution = substitution
    self.package_transformations = package_transformations
    self.dpkg_arch_query_table = dpkg_arch_query_table
    self.build_environments = build_environments
    self.build_rules = build_rules

    # Internal state; several of these are exposed via read-only properties.
    self._remove_during_clean_rules: list[FileSystemMatchRule] = remove_during_clean_rules
    self._install_rules = install_rules
    self._binary_packages = binary_packages
    self._dpkg_architecture_variables = dpkg_architecture_variables
    self._build_env = build_env
    # Names of the packages for which the data.tar contents have been finalized.
    self._used_for: set[str] = set()
    self._value_table = value_table
    self._plugin_provided_feature_set = plugin_provided_feature_set
    self._debian_dir = debian_dir

    # Condition context without a binary package.
    self._source_condition_context = ConditionContext(
        binary_package=None,
        substitution=substitution,
        deb_options_and_profiles=build_env,
        dpkg_architecture_variables=dpkg_architecture_variables,
        dpkg_arch_query_table=dpkg_arch_query_table,
    )
def source_version(self, include_binnmu_version: bool = True) -> str:
    """Resolve the source version via the manifest substitution.

    :param include_binnmu_version: When true, `{{DEB_VERSION}}` is resolved. Otherwise,
      `{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}` is resolved instead.
    :return: The substituted value.
    :raises AssertionError: If the substitution fails with `DebputySubstitutionError`.
    """
    # TODO: There should be an easier way to determine the source version; really.
    version_var = (
        "{{DEB_VERSION}}"
        if include_binnmu_version
        else "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
    )
    try:
        resolved = self.substitution.substitute(
            version_var,
            "internal (resolve version)",
        )
    except DebputySubstitutionError as e:
        raise AssertionError(f"Could not resolve {version_var}") from e
    return resolved
@property
def source_condition_context(self) -> ConditionContext:
    """The condition context created in `__init__` with `binary_package=None`."""
    return self._source_condition_context
@property
def debian_dir(self) -> VirtualPath:
    """The `debian_dir` path that was passed to the constructor."""
    return self._debian_dir
@property
def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
    """The dpkg architecture variables table that was passed to the constructor."""
    return self._dpkg_architecture_variables
@property
def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
    """The `build_env` value that was passed to the constructor."""
    return self._build_env
@property
def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
    """The plugin provided feature set that was passed to the constructor."""
    return self._plugin_provided_feature_set
@property
def remove_during_clean_rules(self) -> list[FileSystemMatchRule]:
    """The remove-during-clean rules passed to the constructor (same list object, no copy)."""
    return self._remove_during_clean_rules
@property
def active_packages(self) -> Iterable[BinaryPackage]:
    """Lazily iterate over the binary packages whose `should_be_acted_on` is true."""
    for binary_package in self._binary_packages.values():
        if binary_package.should_be_acted_on:
            yield binary_package
@property
def all_packages(self) -> Iterable[BinaryPackage]:
    """Lazily iterate over every known binary package (acted on or not)."""
    yield from self._binary_packages.values()
1293 def manifest_configuration[T](
1294 self,
1295 context_package: SourcePackage | BinaryPackage,
1296 value_type: type[T],
1297 ) -> T | None:
1298 res = self._value_table.get((context_package, value_type))
1299 return typing.cast("T | None", res)
def package_state_for(self, package: str) -> PackageTransformationDefinition:
    """Return the transformation definition registered under the name `package`.

    The lookup is a plain subscript on `package_transformations`; unknown names are
    not handled here.
    """
    return self.package_transformations[package]
def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
    """Determine the package under whose name the documentation of `package` belongs.

    An explicit `Doc-Main-Package` (or `X-Doc-Main-Package`) field wins. Otherwise,
    for a `-doc` package, the package without that suffix is used if it exists; for
    `lib*` names, the matching `-dev` package is tried next. In all other cases, the
    package itself is returned.

    :param package: The package to resolve the documentation main package for.
    :return: The resolved main package.
    """
    pkg_name = package.name
    for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"):
        requested_name = package.fields.get(doc_main_field)
        if not requested_name:
            continue
        requested_package = self._binary_packages.get(requested_name)
        if requested_package is None:
            _error(
                f"Invalid Doc-Main-Package for {pkg_name}: The package {requested_name!r} is not listed in d/control"
            )
        return requested_package

    # If it is not a -doc package, then docs should be installed
    # under its own package name.
    if not pkg_name.endswith("-doc"):
        return package

    base_name = pkg_name[:-4]
    base_package = self._binary_packages.get(base_name)
    if base_package:
        return base_package
    if base_name.startswith("lib"):
        dev_package = self._binary_packages.get(f"{base_name}-dev")
        if dev_package:
            return dev_package

    # If we found no better match; default to the doc package itself.
    return package
def perform_installations(
    self,
    integration_mode: DebputyIntegrationMode,
    build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
    *,
    install_request_context: InstallSearchDirContext | None = None,
) -> PackageDataTable:
    """Run the installation rules and assemble the per-package data table.

    The method determines the search dirs, installs the content of each package's
    staging directory (if present), applies the manifest install rules, checks for
    paths that no rule covered and finally builds one `BinaryPackageData` per package
    in `all_packages`.

    :param integration_mode: The integration mode. The manifest installation feature
      is enabled unless it equals `INTEGRATION_MODE_DH_DEBPUTY_RRR`.
    :param build_system_install_dirs: Directories provided by the build systems along
      with the packages they apply to. Only accepted in `INTEGRATION_MODE_FULL` and
      never together with `install_request_context`.
    :param install_request_context: If given, its search dirs, uninstalled-dir checks
      and per-package staging dirs are used instead of the ones derived from the
      file system.
    :return: The table with the data for every package.
    :raises ValueError: If `build_system_install_dirs` is used in an unsupported
      combination, or if the search dirs of `install_request_context` miss a known
      package or reference an unknown one.
    """
    package_data_dict: dict[str, BinaryPackageData] = {}
    package_data_table = PackageDataTable(package_data_dict)
    enable_manifest_installation_feature = (
        integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR
    )

    if build_system_install_dirs:
        if integration_mode != INTEGRATION_MODE_FULL:
            raise ValueError(
                "The build_system_install_dirs parameter can only be used in full integration mode"
            )
        if install_request_context:
            raise ValueError(
                "The build_system_install_dirs parameter cannot be used with install_request_context"
                " (not implemented)"
            )

    if install_request_context is None:

        # Cached so that the same directory name always maps to the same path object.
        @functools.lru_cache(None)
        def _as_path(fs_path: str) -> VirtualPath:
            return OSFSROOverlay.create_root_dir(".", fs_path)

        dtmp_dir = _as_path("debian/tmp")
        source_root_dir = _as_path(".")
        into = frozenset(self._binary_packages.values())
        per_package_search_dirs = {
            t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
            for t in self.package_transformations.values()
            if t.search_dirs is not None
        }

        if integration_mode == INTEGRATION_MODE_FULL:
            # In this mode, we have no default search dir. Everything ends up being
            # per-package instead (since that is easier logic-wise).
            #
            # Even dtmp_dir is omitted here (since it is not universally applicable).
            # Note we still initialize dtmp_dir, since it affects a later guard.
            default_search_dirs = []
            _add_build_install_dirs_to_per_package_search_dirs(
                build_system_install_dirs,
                per_package_search_dirs,
                _as_path,
            )

            # We can end here with per_package_search_dirs having no search dirs for any package
            # (single binary, where everything is installed into d/<pkg> is the most common case).
            #
            # This is not a problem in itself as the installation rules can still apply to the
            # source root and there should be no reason to install something from d/<pkg> into
            # d/<another-pkg>
        else:
            default_search_dirs = [dtmp_dir]

        search_dirs = _determine_search_dir_order(
            per_package_search_dirs,
            into,
            default_search_dirs,
            source_root_dir,
        )
        # The source root is never checked for "not-installed" paths.
        check_for_uninstalled_dirs = tuple(
            s.search_dir
            for s in search_dirs
            if s.search_dir.fs_path != source_root_dir.fs_path
        )
        if enable_manifest_installation_feature:
            _present_installation_dirs(
                search_dirs, check_for_uninstalled_dirs, into
            )
    else:
        dtmp_dir = None
        search_dirs = install_request_context.search_dirs
        into = frozenset(self._binary_packages.values())
        seen: set[BinaryPackage] = set()
        for search_dir in search_dirs:
            seen.update(search_dir.applies_to)

        missing = into - seen
        if missing:
            names = ", ".join(p.name for p in missing)
            raise ValueError(
                f"The following package(s) had no search dirs: {names}."
                " (Generally, the source root would be applicable to all packages)"
            )
        extra_names = seen - into
        if extra_names:
            names = ", ".join(p.name for p in extra_names)
            raise ValueError(
                f"The install_request_context referenced the following unknown package(s): {names}"
            )

        check_for_uninstalled_dirs = (
            install_request_context.check_for_uninstalled_dirs
        )

    install_rule_context = InstallRuleContext(search_dirs)

    if (
        enable_manifest_installation_feature
        and self._install_rules is None
        # TODO: Should we also do this for full mode when build systems provided search dirs?
        and dtmp_dir is not None
        and os.path.isdir(dtmp_dir.fs_path)
    ):
        msg = (
            "The build system appears to have provided the output of upstream build system's"
            " install in debian/tmp. However, there are no provisions for debputy to install"
            " any of that into any of the debian packages listed in debian/control."
            " To avoid accidentally creating empty packages, debputy will insist that you"
            " explicitly define an empty installation definition if you did not want to"
            " install any of those files even though they have been provided."
            ' Example: "installations: []"'
        )
        _error(msg)
    elif (
        not enable_manifest_installation_feature and self._install_rules is not None
    ):
        _error(
            f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
            " Please remove or comment out the `installations` keyword."
        )

    # Every package gets its own (initially empty) in-memory file system root.
    for dctrl_bin in self.all_packages:
        package = dctrl_bin.name
        doc_main_package = self._detect_doc_main_package_for(dctrl_bin)

        install_rule_context[package] = BinaryPackageInstallRuleContext(
            dctrl_bin,
            InMemoryVirtualRootDir(),
            doc_main_package,
        )

    if enable_manifest_installation_feature:
        discard_rules = list(
            self.plugin_provided_feature_set.auto_discard_rules.values()
        )
    else:
        discard_rules = [
            self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
        ]
    path_matcher = SourcePathMatcher(discard_rules)

    source_condition_context = self._source_condition_context

    # Install whatever is present in the per-package staging directory first.
    for dctrl_bin in self.active_packages:
        package = dctrl_bin.name
        if install_request_context:
            build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
                package
            )
        else:
            build_system_staging_dir_fs_path = os.path.join("debian", package)
            if os.path.isdir(build_system_staging_dir_fs_path):
                build_system_staging_dir = OSFSROOverlay.create_root_dir(
                    ".",
                    build_system_staging_dir_fs_path,
                )
            else:
                build_system_staging_dir = None

        if build_system_staging_dir is not None:
            _install_everything_from_source_dir_if_present(
                dctrl_bin,
                self.substitution,
                path_matcher,
                install_rule_context,
                source_condition_context,
                build_system_staging_dir,
            )

    if self._install_rules:
        # FIXME: Check that every install rule remains used after transformations have run.
        #  What we want to check is transformations do not exclude everything from an install
        #  rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
        #  match.
        for install_rule in self._install_rules:
            install_rule.perform_install(
                path_matcher,
                install_rule_context,
                source_condition_context,
            )

    if enable_manifest_installation_feature:
        for search_dir in check_for_uninstalled_dirs:
            _detect_missing_installations(path_matcher, search_dir)

    for dctrl_bin in self.all_packages:
        package = dctrl_bin.name
        binary_install_rule_context = install_rule_context[package]
        build_system_pkg_staging_dir = os.path.join("debian", package)
        fs_root = binary_install_rule_context.fs_root

        context = self.package_transformations[package]
        if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
            for special_install_rule in context.install_rules:
                special_install_rule.perform_install(
                    path_matcher,
                    install_rule_context,
                    source_condition_context,
                )

        if dctrl_bin.should_be_acted_on:
            self.apply_fs_transformations(package, fs_root)
            substvars_file = f"debian/{package}.substvars"
            substvars = FlushableSubstvars.load_from_path(
                substvars_file, missing_ok=True
            )
            # We do not want to touch the substvars file (non-clean rebuild contamination)
            substvars.substvars_path = None
        else:
            substvars = FlushableSubstvars()

        # A "<package>-udeb" entry only counts when it actually is a udeb.
        udeb_package = self._binary_packages.get(f"{package}-udeb")
        if udeb_package and not udeb_package.is_udeb:
            udeb_package = None

        package_metadata_context = PackageProcessingContextProvider(
            self,
            dctrl_bin,
            udeb_package,
            package_data_table,
        )

        ctrl_creator = BinaryCtrlAccessorProviderCreator(
            package_metadata_context,
            substvars,
            context.maintscript_snippets,
            context.substitution,
        )

        if not enable_manifest_installation_feature:
            assert_no_dbgsym_migration(dctrl_bin)
            dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin)
            dh_dbgsym_root_path = OSFSROOverlay.create_root_dir(
                "",
                dh_dbgsym_root_fs,
            )
            dbgsym_root_fs = InMemoryVirtualRootDir()
            _install_everything_from_source_dir_if_present(
                dctrl_bin,
                self.substitution,
                path_matcher,
                install_rule_context,
                source_condition_context,
                dh_dbgsym_root_path,
                into_dir=dbgsym_root_fs,
            )
            dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
            dbgsym_info = DbgsymInfo(
                dctrl_bin,
                dbgsym_root_fs,
                os.path.join(dh_dbgsym_root_fs, "DEBIAN"),
                dbgsym_build_ids,
                # TODO: Provide manifest feature to support this.
                False,
            )
        else:
            dbgsym_info = DbgsymInfo(
                dctrl_bin,
                InMemoryVirtualRootDir(),
                None,
                [],
                False,
            )

        package_data_dict[package] = BinaryPackageData(
            self.source_package,
            dctrl_bin,
            build_system_pkg_staging_dir,
            fs_root,
            substvars,
            package_metadata_context,
            ctrl_creator,
            dbgsym_info,
        )

    if enable_manifest_installation_feature:
        _list_automatic_discard_rules(path_matcher)

    return package_data_table
def condition_context(
    self, binary_package: BinaryPackage | str | None
) -> ConditionContext:
    """Provide the condition context for a binary package (or for the source package).

    :param binary_package: A binary package, the name of one, or `None` for the
      source-level context.
    :return: The source-level context when `binary_package` is `None`. Otherwise, a
      copy of it with the binary package and substitution taken from the package's
      transformation definition.
    """
    if binary_package is None:
        return self._source_condition_context
    package_name = (
        binary_package if isinstance(binary_package, str) else binary_package.name
    )
    definition = self.package_transformations[package_name]
    return self._source_condition_context.replace(
        binary_package=definition.binary_package,
        substitution=definition.substitution,
    )
def apply_fs_transformations(
    self,
    package: str,
    fs_root: InMemoryVirtualPathBase,
) -> None:
    """Apply the built-in and manifest-defined transformations for `package` to `fs_root`.

    The order is: built-in mode normalization, then the transformations of the
    package's definition, then the shebang line normalization.

    :param package: Name of the package the file system belongs to.
    :param fs_root: The file system root that is transformed.
    :raises ValueError: If the data.tar contents of the package have already been
      finalized or the package has no transformation definition.
    """
    if package in self._used_for:
        raise ValueError(
            f"data.tar contents for {package} has already been finalized!?"
        )
    if package not in self.package_transformations:
        raise ValueError(
            f'The package "{package}" was not relevant for the manifest!?'
        )

    definition = self.package_transformations[package]
    condition_context = ConditionContext(
        binary_package=definition.binary_package,
        substitution=definition.substitution,
        deb_options_and_profiles=self._build_env,
        dpkg_architecture_variables=self._dpkg_architecture_variables,
        dpkg_arch_query_table=self.dpkg_arch_query_table,
    )

    mode_rules = list(
        builtin_mode_normalization_rules(
            self._dpkg_architecture_variables,
            definition.binary_package,
            definition.substitution,
        )
    )
    ModeNormalizationTransformationRule(mode_rules).transform_file_system(
        fs_root,
        condition_context,
    )

    for transformation in definition.transformations:
        transformation.run_transform_file_system(fs_root, condition_context)

    NormalizeShebangLineTransformation().transform_file_system(
        fs_root,
        condition_context,
    )
def finalize_data_tar_contents(
    self,
    package: str,
    fs_root: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> IntermediateManifest:
    """Mark `fs_root` as read-only and generate the intermediate manifest for `package`.

    :param package: Name of the package; it is recorded as finalized.
    :param fs_root: The file system root of the package.
    :param clamp_mtime_to: Passed on to the intermediate manifest generation.
    :return: The generated intermediate manifest (as a list).
    :raises ValueError: If the package was already finalized or has no transformation
      definition.
    """
    if package in self._used_for:
        raise ValueError(
            f"data.tar contents for {package} has already been finalized!?"
        )
    if package not in self.package_transformations:
        raise ValueError(
            f'The package "{package}" was not relevant for the manifest!?'
        )
    self._used_for.add(package)

    # At this point, there should be no further mutations to the file system (because
    # they will not be present in the intermediate manifest)
    root_dir = cast("FSRootDir", fs_root)
    root_dir.is_read_write = False

    return list(_generate_intermediate_manifest(fs_root, clamp_mtime_to))
def apply_to_binary_staging_directory(
    self,
    package: str,
    fs_root: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> IntermediateManifest:
    """Apply the file system transformations and then finalize the data.tar contents.

    This is `apply_fs_transformations` followed by `finalize_data_tar_contents` with
    the same arguments.
    """
    self.apply_fs_transformations(package, fs_root)
    intermediate_manifest = self.finalize_data_tar_contents(
        package,
        fs_root,
        clamp_mtime_to,
    )
    return intermediate_manifest
@dataclass(slots=True)
class SearchDirOrderState:
    """Bookkeeping for one search directory while the search dir order is computed."""

    # The search directory itself.
    search_dir: VirtualPath
    # The packages that this search directory applies to.
    applies_to: set[BinaryPackage] = field(default_factory=set)
    # The `fs_path` of the search directories that must come before this one.
    after: set[str] = field(default_factory=set)
def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: frozenset[BinaryPackage],
) -> None:
    """Print (via `_info`) the search dirs in order and the dirs checked for uninstalled paths.

    :param search_dirs: The search dirs in the order they are considered.
    :param checked_missing_dirs: The directories that are checked for "not-installed"
      paths. Nothing is printed for this part when it is empty.
    :param all_pkgs: All packages; a search dir that applies to a proper subset of
      these is listed with the names of its packages.
    """
    _info("The following directories are considered search dirs (in order):")
    # Pad all paths to the width of the longest search dir path.
    width = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
    for entry in search_dirs:
        if entry.applies_to < all_pkgs:
            names = ", ".join(p.name for p in entry.applies_to)
            applies_to = f" [only applicable to: {names}]"
        else:
            applies_to = ""
        remark = (
            "" if os.path.isdir(entry.search_dir.fs_path) else " (skipped; absent)"
        )
        _info(f" * {entry.search_dir.fs_path:{width}}{applies_to}{remark}")

    if not checked_missing_dirs:
        return

    _info('The following directories are considered for "not-installed" paths;')
    for checked_dir in checked_missing_dirs:
        remark = "" if os.path.isdir(checked_dir.fs_path) else " (skipped; absent)"
        _info(f" * {checked_dir.fs_path:{width}}{remark}")
def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, list[VirtualPath]],
    all_pkgs: frozenset[BinaryPackage],
    default_search_dirs: list[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one ordered sequence of search dirs.

    Within each package's list, a directory must come after the directory listed just
    before it. The directories are released in passes in the order they were first
    seen, so the result is stable for a given set of constraints. The source root is
    always appended last and applies to all packages.

    :param requested: The explicitly requested search dirs per package.
    :param all_pkgs: All packages. Packages without an entry in `requested` use
      `default_search_dirs`.
    :param default_search_dirs: The search dirs for packages without explicit ones.
    :param source_root: The source root, which becomes the final search dir.
    :return: The search dirs in order, each with the packages it applies to.
    """
    search_dir_table = dict[str, SearchDirOrderState]()
    assert requested.keys() <= all_pkgs
    for pkg in all_pkgs:
        previous_search_dir: SearchDirOrderState | None = None
        for path in requested.get(pkg, default_search_dirs):
            search_dir_state = search_dir_table.get(path.fs_path)
            if search_dir_state is None:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_search_dir is not None:
                search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
            previous_search_dir = search_dir_state

    search_dirs_in_order: list[SearchDirOrderState] = []
    released = set[str]()
    # An ordered list (rather than a set of paths) keeps the release order and the
    # error message independent of string hashing, and avoids mutating a set while
    # iterating over it.
    pending = list(search_dir_table.values())
    while pending:
        still_blocked: list[SearchDirOrderState] = []
        for search_dir_state in pending:
            if search_dir_state.after <= released:
                search_dirs_in_order.append(search_dir_state)
                released.add(search_dir_state.search_dir.fs_path)
            else:
                still_blocked.append(search_dir_state)

        if len(still_blocked) == len(pending):
            # A full pass without releasing anything means the constraints are cyclic.
            names = ", ".join(s.search_dir.fs_path for s in still_blocked)
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        pending = still_blocked

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            set(all_pkgs),
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )