Coverage for src/debputy/highlevel_manifest.py: 65%
900 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-28 21:56 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-02-28 21:56 +0000
1import dataclasses
2import functools
3import os
4import textwrap
5import typing
6from contextlib import suppress
7from dataclasses import dataclass, field
8from typing import (
9 Any,
10 TypeVar,
11 Generic,
12 cast,
13)
14from collections.abc import Iterable, Mapping, Sequence, Callable
16from debian.debian_support import DpkgArchTable
18from debputy.dh.debhelper_emulation import (
19 dhe_dbgsym_root_dir,
20 assert_no_dbgsym_migration,
21 read_dbgsym_file,
22)
23from ._deb_options_profiles import DebBuildOptionsAndProfiles
24from ._manifest_constants import *
25from .architecture_support import DpkgArchitectureBuildProcessValuesTable
26from .builtin_manifest_rules import builtin_mode_normalization_rules
27from .exceptions import (
28 DebputySubstitutionError,
29 DebputyRuntimeErrorWithPreamble,
30)
31from .filesystem_scan import (
32 InMemoryVirtualRootDir,
33 OSFSROOverlay,
34 FSControlRootDir,
35 InMemoryVirtualPathBase,
36)
37from .installations import (
38 InstallRule,
39 SourcePathMatcher,
40 PathAlreadyInstalledOrDiscardedError,
41 NoMatchForInstallPatternError,
42 InstallRuleContext,
43 BinaryPackageInstallRuleContext,
44 InstallSearchDirContext,
45 SearchDir,
46)
47from .intermediate_manifest import TarMember, PathType, IntermediateManifest
48from .maintscript_snippet import (
49 DpkgMaintscriptHelperCommand,
50 MaintscriptSnippetContainer,
51)
52from .manifest_conditions import ConditionContext
53from .manifest_parser.base_types import (
54 FileSystemMatchRule,
55 FileSystemExactMatchRule,
56 BuildEnvironments,
57)
58from .manifest_parser.util import AttributePath
59from .packager_provided_files import PackagerProvidedFile
60from .packages import BinaryPackage, SourcePackage
61from .plugin.api.feature_set import PluginProvidedFeatureSet
62from .plugin.api.impl import BinaryCtrlAccessorProviderCreator
63from .plugin.api.impl_types import (
64 PackageProcessingContextProvider,
65 PackageDataTable,
66)
67from .plugin.api.spec import (
68 FlushableSubstvars,
69 VirtualPath,
70 DebputyIntegrationMode,
71 INTEGRATION_MODE_DH_DEBPUTY_RRR,
72 INTEGRATION_MODE_FULL,
73)
74from debputy.plugins.debputy.binary_package_rules import ServiceRule
75from debputy.plugins.debputy.build_system_rules import BuildRule
76from .plugin.plugin_state import run_in_context_of_plugin
77from .substitution import Substitution
78from .transformation_rules import (
79 TransformationRule,
80 ModeNormalizationTransformationRule,
81 NormalizeShebangLineTransformation,
82)
83from .util import (
84 _error,
85 _warn,
86 debian_policy_normalize_symlink_target,
87 generated_content_dir,
88 _info,
89)
90from .yaml import MANIFEST_YAML
91from .yaml.compat import CommentedMap, CommentedSeq
def tar_path(p: VirtualPath) -> str:
    """Return the name of ``p`` as it is written into a tar archive.

    Directories are rendered with a trailing ``/``; any other path is
    returned as-is.
    """
    name = p.path
    return name + "/" if p.is_dir else name
def tar_owner_info(path: InMemoryVirtualPathBase) -> tuple[str, int, str, int]:
    """Return ``(owner name, owner id, group name, group id)`` for ``path``.

    The values are read from the private ``_owner`` / ``_group`` attributes
    of the path.
    """
    user, grp = path._owner, path._group  # noqa
    return user.entity_name, user.entity_id, grp.entity_name, grp.entity_id
class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
    """Error for paths in a search dir that no install rule handled.

    The exception arguments are positional: ``args[1]`` holds the unmatched
    paths and ``args[2]`` the search directory they were found in.
    """

    @property
    def unmatched_paths(self) -> Sequence[VirtualPath]:
        """The paths that were neither installed nor discarded."""
        return self.args[1]

    @property
    def search_dir(self) -> VirtualPath:
        """The search directory in which the unmatched paths were found."""
        return self.args[2]

    def render_preamble(self) -> None:
        """Emit (via ``_warn``) a bullet list describing every unmatched path."""
        location = self.search_dir.fs_path
        _warn(
            f"The following paths were present in {location}, but not installed (nor explicitly discarded)."
        )
        _warn("")
        # ``map`` is lazy, so each description is still computed right before
        # it is printed.
        for description in map(_describe_missing_path, self.unmatched_paths):
            _warn(f" * {description}")
        _warn("")
@dataclass(slots=True)
class DbgsymInfo:
    """Debug-symbol related state for one binary package."""

    binary_package: BinaryPackage
    dbgsym_fs_root: InMemoryVirtualPathBase
    # Cached on-disk root for the dbgsym content; filled lazily by
    # ``dbgsym_root_dir`` when it is None.
    _dbgsym_root_fs: str | None
    dbgsym_ids: list[str]
    run_dwz: bool

    @property
    def dbgsym_root_dir(self) -> str:
        """Directory holding the dbgsym file system root.

        When no directory was given up front, one is requested from
        ``generated_content_dir`` on first access and remembered.
        """
        if self._dbgsym_root_fs is None:
            self._dbgsym_root_fs = generated_content_dir(
                package=self.binary_package,
                subdir_key="dbgsym-fs-root",
            )
        return self._dbgsym_root_fs

    @property
    def dbgsym_ctrl_dir(self) -> FSControlRootDir:
        """A control root dir for the ``DEBIAN`` directory below the dbgsym root."""
        ctrl_path = os.path.join(self.dbgsym_root_dir, "DEBIAN")
        return FSControlRootDir.create_root_dir(ctrl_path)
@dataclass(slots=True, frozen=True)
class BinaryPackageData:
    """Immutable bundle of the per-binary-package objects used while building."""

    source_package: SourcePackage
    binary_package: BinaryPackage
    binary_staging_root_dir: str
    fs_root: InMemoryVirtualPathBase
    substvars: FlushableSubstvars
    package_metadata_context: PackageProcessingContextProvider
    ctrl_creator: BinaryCtrlAccessorProviderCreator
    dbgsym_info: DbgsymInfo

    @property
    def control_output_dir(self) -> FSControlRootDir:
        """A control root dir for this package's generated ``DEBIAN`` directory."""
        debian_dir = generated_content_dir(
            package=self.binary_package,
            subdir_key="DEBIAN",
        )
        return FSControlRootDir.create_root_dir(debian_dir)
@dataclass(slots=True)
class PackageTransformationDefinition:
    """Per-binary-package collection of manifest-related definitions.

    Only ``binary_package``, ``substitution`` and ``is_auto_generated_package``
    are required. Every list/dict field defaults to a fresh, empty container
    for each instance (via ``default_factory``).
    """

    binary_package: BinaryPackage
    substitution: Substitution
    is_auto_generated_package: bool
    # Optional; None when no value has been provided.
    binary_version: str | None = None
    # Optional; None when no value has been provided.
    search_dirs: list[FileSystemExactMatchRule] | None = None
    dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = field(
        default_factory=list
    )
    # Keyed by a string; presumably the maintscript name - TODO confirm.
    maintscript_snippets: dict[str, MaintscriptSnippetContainer] = field(
        default_factory=dict
    )
    transformations: list[TransformationRule] = field(default_factory=list)
    reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = field(
        default_factory=dict
    )
    install_rules: list[InstallRule] = field(default_factory=list)
    requested_service_rules: list[ServiceRule] = field(default_factory=list)
def _path_to_tar_member(
    path: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> TarMember:
    """Convert a virtual path into the ``TarMember`` describing it.

    :param path: The path to convert. It must be a directory, a file or a
      symlink.
    :param clamp_mtime_to: Upper bound for the mtime recorded for the member.
    :return: The tar member for the path.
    :raises AssertionError: If the path is neither a file, directory nor
      symlink.
    """
    effective_mtime = float(clamp_mtime_to)
    owner, uid, group, gid = tar_owner_info(path)
    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}
    mode = path.mode

    # Paths backed by a real file contribute their own mtime, but never one
    # later than the clamp.
    if path.has_fs_path:
        effective_mtime = min(effective_mtime, path.mtime)

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: someday we will need to deal with hardlinks and it might appear here.
        path_type = PathType.FILE
    elif path.is_symlink:
        # Symlinks are resolved right away since the target has to be
        # normalized anyway.
        normalized_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        return TarMember.virtual_path(
            tar_path(path),
            PathType.SYMLINK,
            effective_mtime,
            link_target=normalized_target,
            # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
            # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
            mode=0o0777,
            **ownership,
        )
    else:
        assert not path.is_symlink
        raise AssertionError(
            f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
        )

    if path.has_fs_path:
        return TarMember.from_file(
            tar_path(path),
            path.fs_path,
            mode=mode,
            path_type=path_type,
            path_mtime=effective_mtime,
            clamp_mtime_to=clamp_mtime_to,
            may_steal_fs_path=getattr(path, "_can_replace_inline", False),
            **ownership,
        )

    # Without a backing file only non-file members can be emitted virtually.
    assert not path.is_file
    return TarMember.virtual_path(
        tar_path(path),
        path_type,
        effective_mtime,
        mode=mode,
        **ownership,
    )
def _generate_intermediate_manifest(
    fs_root: InMemoryVirtualPathBase,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a tar member for every path below ``fs_root``.

    Non-symlink members are yielded in the order ``fs_root.all_paths()``
    produces them; all symlink members are held back and yielded at the end.
    """
    deferred_symlinks = []
    for fs_path in fs_root.all_paths():
        member = _path_to_tar_member(fs_path, clamp_mtime_to)
        if member.path_type == PathType.SYMLINK:
            deferred_symlinks.append(member)
        else:
            yield member
    yield from deferred_symlinks
284ST = TypeVar("ST")
285T = TypeVar("T")
288class AbstractYAMLSubStore(Generic[ST]):
289 def __init__(
290 self,
291 parent_store: Any,
292 parent_key: int | str | None,
293 store: ST | None = None,
294 ) -> None:
295 if parent_store is not None and parent_key is not None:
296 try:
297 from_parent_store = parent_store[parent_key]
298 except (KeyError, IndexError):
299 from_parent_store = None
300 if ( 300 ↛ 305line 300 didn't jump to line 305 because the condition on line 300 was never true
301 store is not None
302 and from_parent_store is not None
303 and store is not parent_store
304 ):
305 raise ValueError(
306 "Store is provided but is not the one already in the parent store"
307 )
308 if store is None: 308 ↛ 310line 308 didn't jump to line 310 because the condition on line 308 was always true
309 store = from_parent_store
310 self._parent_store = parent_store
311 self._parent_key = parent_key
312 self._is_detached = (
313 parent_key is None or parent_store is None or parent_key not in parent_store
314 )
315 assert self._is_detached or store is not None
316 if store is None:
317 store = self._create_new_instance()
318 self._store: ST = store
320 def _create_new_instance(self) -> ST:
321 raise NotImplementedError
323 def create_definition_if_missing(self) -> None:
324 if self._is_detached:
325 self.create_definition()
327 def create_definition(self) -> None:
328 if not self._is_detached: 328 ↛ 329line 328 didn't jump to line 329 because the condition on line 328 was never true
329 raise RuntimeError("Definition is already present")
330 parent_store = self._parent_store
331 if parent_store is None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true
332 raise RuntimeError(
333 f"Definition is not attached to any parent!? ({self.__class__.__name__})"
334 )
335 if isinstance(parent_store, list):
336 assert self._parent_key is None
337 self._parent_key = len(parent_store)
338 self._parent_store.append(self._store)
339 else:
340 parent_store[self._parent_key] = self._store
341 self._is_detached = False
343 def remove_definition(self) -> None:
344 self._ensure_attached()
345 del self._parent_store[self._parent_key]
346 if isinstance(self._parent_store, list):
347 self._parent_key = None
348 self._is_detached = True
350 def _ensure_attached(self) -> None:
351 if self._is_detached:
352 raise RuntimeError("The definition has been removed!")
class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]):
    """Sub-store whose container is a list."""

    def _create_new_instance(self) -> list[T]:
        # New stores are created as an empty ``CommentedSeq``.
        return CommentedSeq()
class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]):
    """Sub-store whose container is a string-keyed mapping."""

    def _create_new_instance(self) -> dict[str, T]:
        # New stores are created as an empty ``CommentedMap``.
        return CommentedMap()
class MutableCondition:
    """Factories for single-key condition mappings."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return a mapping with ``arch_filter`` under the arch-matches key."""
        condition = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(condition)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return a mapping with the value under the build-profiles-matches key."""
        condition = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(condition)
class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Mutable view of a create-symlink entry."""

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Any | None
    ) -> "MutableYAMLSymlink":
        """Create a detached symlink entry.

        :param link_path: Value stored as the link path.
        :param link_target: Value stored as the link target.
        :param condition: Optional condition; when given it is stored under
          the ``when`` key next to the path and target.
        """
        link_definition = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            link_definition["when"] = condition
        wrapped = CommentedMap({MK_TRANSFORMATIONS_CREATE_SYMLINK: link_definition})
        return cls(None, None, store=wrapped)

    @property
    def _link_definition(self) -> Any:
        """The inner mapping holding the link path and link target."""
        return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]

    @property
    def symlink_path(self) -> str:
        """The path of the symlink itself."""
        return self._link_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        self._link_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> str | None:
        """The target the symlink points to."""
        return self._link_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        self._link_definition[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = target
class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Mutable view of one conffile management entry (remove or rename).

    The store is a single-key mapping: the key is the command and the value
    is the mapping holding the command's attributes.
    """

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "remove" entry for ``conffile``.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
                        {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Create a detached "rename" entry from ``old_conffile`` to ``new_conffile``.

        ``prior_to_version`` and ``owning_package`` are only stored when they
        are not None.
        """
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
                        {
                            MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                            MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
                        }
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @property
    def _container(self) -> dict[str, Any]:
        """The attribute mapping of the (single) command in the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    def _set_or_drop(self, key: str, value: str | None) -> None:
        """Store ``value`` under ``key``; a None value removes the key instead."""
        if value is None:
            with suppress(KeyError):
                del self._container[key]
        else:
            self._container[key] = value

    @property
    def command(self) -> str:
        """The command name, i.e. the single key of the store."""
        assert len(self._store) == 1
        return next(iter(self._store))

    @property
    def obsolete_conffile(self) -> str:
        """The conffile being removed, or the source path of a rename."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    @property
    def new_conffile(self) -> str:
        """The target path of a rename.

        :raises TypeError: If the entry is not a rename command.
        """
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> str | None:
        """The prior-to-version attribute, or None when it is not set."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: str | None) -> None:
        self._set_or_drop(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION, value)

    @property
    def owning_package(self) -> str | None:
        """The owning-package attribute, or None when it is not set."""
        # Read the key that the setter writes; the attribute is optional, so
        # a missing key means "not set" rather than an error.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: str | None) -> None:
        self._set_or_drop(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE, value)
class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Mutable view of the definition of a single package."""

    def _list_store(
        self, key, *, create_if_absent: bool = False
    ) -> list[dict[str, Any]] | None:
        """Return the list stored under ``key``.

        :param key: The key of the list inside this package definition.
        :param create_if_absent: When the list does not exist (or this
          definition is detached), create the definition and an empty list
          if True; otherwise return None without modifying anything.
        """
        if self._is_detached or key not in self._store:
            # Only create when the caller asked for it; read-only callers get
            # None and nothing is modified.
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Attach the detached ``item`` to the list stored under ``key``.

        :raises RuntimeError: If the item is already attached or belongs to a
          different container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Add ``symlink`` to the transformations of this package."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a view for every create-symlink entry in the transformations."""
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for index, entry in enumerate(store):
            # Match on the same key MutableYAMLSymlink reads and writes, so
            # every yielded view has working accessors.
            if (
                entry
                and isinstance(entry, dict)
                and len(entry) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in entry
            ):
                yield MutableYAMLSymlink(store, index)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield a view for every conffile management entry of this package."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        for index, _ in enumerate(store):
            yield MutableYAMLConffileManagementItem(store, index)

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Add ``conffile_management_item`` to this package."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)
class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base class for mutable views of a single installation rule.

    The store is a single-key mapping from the rule name to the rule's
    attributes. The class methods are factories producing detached rules.
    """

    @property
    def _container(self) -> dict[str, Any]:
        """The value stored under the (single) rule name."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @staticmethod
    def _sources_key(sources: str | list[str]) -> str:
        """Return the singular key for one string, the plural key otherwise."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @staticmethod
    def _new_store(rule_name: str, attributes: dict[str, Any]) -> CommentedMap:
        """Build the ``{rule_name: attributes}`` store of a new rule."""
        return CommentedMap({rule_name: CommentedMap(attributes)})

    @property
    def into(self) -> list[str] | None:
        """The "into" attribute as a list, or None when it is not set."""
        # The setter removes the key for None, so a missing key is the normal
        # "not set" state and must not raise.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: str | list[str] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list

    @property
    def when(self) -> str | Mapping[str, Any] | None:
        """The condition of the rule, or None when it has none."""
        # As with "into": the setter removes the key for None.
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: str | Mapping[str, Any] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        new_map = CommentedMap(new_value)
        self._container[MK_CONDITION_WHEN] = new_map

    @classmethod
    def install_dest(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install rule for ``sources`` with an optional ``dest_dir``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_store(
                MK_INSTALLATIONS_INSTALL,
                {cls._sources_key(sources): sources},
            ),
        )
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: str | list[str],
        dest_dirs: Sequence[str],
        into: str | list[str] | None,
        *,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create a multi-dest-install rule installing ``sources`` into ``dest_dirs``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_store(
                MK_INSTALLATIONS_MULTI_DEST_INSTALL,
                {
                    cls._sources_key(sources): sources,
                    "dest-dirs": dest_dirs,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install rule installing ``source`` under the name ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_store(
                MK_INSTALLATIONS_INSTALL,
                {
                    MK_INSTALLATIONS_INSTALL_SOURCE: source,
                    MK_INSTALLATIONS_INSTALL_AS: install_as,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install-docs rule installing ``source`` under the name ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_store(
                MK_INSTALLATIONS_INSTALL_DOCS,
                {
                    MK_INSTALLATIONS_INSTALL_SOURCE: source,
                    MK_INSTALLATIONS_INSTALL_AS: install_as,
                },
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Create an install-docs rule for ``sources`` with an optional ``dest_dir``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=cls._new_store(
                MK_INSTALLATIONS_INSTALL_DOCS,
                {cls._sources_key(sources): sources},
            ),
        )
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Create an install-examples rule for ``sources``."""
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=cls._new_store(
                MK_INSTALLATIONS_INSTALL_EXAMPLES,
                {cls._sources_key(sources): sources},
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        language: str | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Create an install-man rule for ``sources`` with an optional ``language``."""
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=cls._new_store(
                MK_INSTALLATIONS_INSTALL_MAN,
                {cls._sources_key(sources): sources},
            ),
        )
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: str | list[str],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Create a discard rule; ``sources`` is stored directly under the rule name."""
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )
class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    """Install rule type returned by ``AbstractMutableYAMLInstallRule.install_examples``.

    Adds nothing on top of the base class.
    """

    pass
class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Install rule for manpages, with an optional language attribute."""

    @property
    def language(self) -> str | None:
        """The language attribute, or None when it is not set."""
        # The setter removes the key for None, so a missing key means
        # "not set" and must not raise.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: str | None) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    """Install rule type returned by ``AbstractMutableYAMLInstallRule.discard``.

    Adds nothing on top of the base class.
    """

    pass
class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Install rule with sources and either a destination dir or an "as" name."""

    @property
    def sources(self) -> list[str]:
        """The sources of the rule as a list.

        :raises KeyError: If the rule has neither the plural nor the singular
          sources key.
        """
        container = self._container
        try:
            v = container[MK_INSTALLATIONS_INSTALL_SOURCES]
        except KeyError:
            # The factories store a single string under the singular key.
            v = container[MK_INSTALLATIONS_INSTALL_SOURCE]
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: str | list[str]) -> None:
        container = self._container
        # Never leave a stale singular entry next to the value written below.
        with suppress(KeyError):
            del container[MK_INSTALLATIONS_INSTALL_SOURCE]
        if isinstance(new_value, str):
            container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
            return
        new_list = CommentedSeq(new_value)
        container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list

    @property
    def dest_dir(self) -> str | None:
        """The destination directory, or None when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: str | None) -> None:
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> str | None:
        """The "as" name, or None when it is not set."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: str | None) -> None:
        """Set or clear the "as" name.

        :raises ValueError: If a destination dir is set, or if the rule has a
          list of sources that is not exactly one item long.
        """
        if new_value is not None:
            if self.dest_dir is not None:
                raise ValueError(
                    f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                    f' "{MK_INSTALLATIONS_INSTALL_AS}"'
                )

            # Rules created with a single string have no plural key; there is
            # then nothing to collapse.
            sources = self._container.get(MK_INSTALLATIONS_INSTALL_SOURCES)
            if isinstance(sources, list):
                if len(sources) != 1:
                    raise ValueError(
                        f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                        f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
                    )
                self.sources = sources[0]
            self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]
class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Mutable view of the list of installation rules."""

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Attach a detached install rule at the end of the list.

        :raises RuntimeError: If the rule is already attached or belongs to a
          different container.
        """
        rules = self._store
        current_parent = install_rule._parent_store
        belongs_elsewhere = current_parent is not None and current_parent is not rules
        if belongs_elsewhere or not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        self.create_definition_if_missing()
        install_rule._parent_store = rules
        install_rule.create_definition()

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Attach every rule of ``install_rules`` in order (see ``append``)."""
        for rule in install_rules:
            self.append(rule)
class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Mutable view of the manifest variables mapping."""

    @property
    def variables(self) -> dict[str, Any]:
        """The underlying mapping of variables."""
        return self._store

    def __setitem__(self, key: str, value: Any) -> None:
        """Set a variable, attaching the mapping to its parent if needed."""
        variables = self._store
        variables[key] = value
        self.create_definition_if_missing()
class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Mutable view of the manifest definitions mapping."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return the view of the manifest variables.

        :param create_if_absent: Attach the variables mapping right away when
          it does not exist yet.
        """
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables
class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
    """Mutable view of the list of remove-during-clean rules."""

    def append(self, rule: str) -> None:
        """Add ``rule``, attaching the list to its parent if needed."""
        self.create_definition_if_missing()
        self._store.append(rule)

    def __len__(self) -> int:
        """Number of rules in the list."""
        return len(self._store)

    def extend(self, rules: Iterable[str]) -> None:
        """Add all ``rules``; an empty iterable leaves everything untouched."""
        pending = iter(rules)
        nothing = object()
        # Peek at the first rule so the list is only attached to the parent
        # when there is something to add.
        head = next(pending, nothing)
        if head is nothing:
            return
        self.create_definition_if_missing()
        self._store.append(head)
        self._store.extend(pending)
996class MutableYAMLManifest:
997 def __init__(self, store: Any) -> None:
998 self._store = store
1000 @classmethod
1001 def empty_manifest(cls) -> "MutableYAMLManifest":
1002 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))
1004 @property
1005 def manifest_version(self) -> str:
1006 return self._store[MK_MANIFEST_VERSION]
1008 @manifest_version.setter
1009 def manifest_version(self, version: str) -> None:
1010 if version not in SUPPORTED_MANIFEST_VERSIONS:
1011 raise ValueError("Unsupported version")
1012 self._store[MK_MANIFEST_VERSION] = version
1014 def remove_during_clean(
1015 self,
1016 *,
1017 create_if_absent: bool = True,
1018 ) -> MutableYAMLRemoveDuringCleanDefinitions:
1019 d = MutableYAMLRemoveDuringCleanDefinitions(
1020 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN
1021 )
1022 if create_if_absent: 1022 ↛ 1023line 1022 didn't jump to line 1023 because the condition on line 1022 was never true
1023 d.create_definition_if_missing()
1024 return d
1026 def installations(
1027 self,
1028 *,
1029 create_if_absent: bool = True,
1030 ) -> MutableYAMLInstallationsDefinition:
1031 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
1032 if create_if_absent: 1032 ↛ 1033line 1032 didn't jump to line 1033 because the condition on line 1032 was never true
1033 d.create_definition_if_missing()
1034 return d
1036 def manifest_definitions(
1037 self,
1038 *,
1039 create_if_absent: bool = True,
1040 ) -> MutableYAMLManifestDefinitions:
1041 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
1042 if create_if_absent: 1042 ↛ 1043line 1042 didn't jump to line 1043 because the condition on line 1042 was never true
1043 d.create_definition_if_missing()
1044 return d
def package(
    self, name: str, *, create_if_absent: bool = True
) -> MutableYAMLPackageDefinition:
    """Provide a mutable view of the definition for the package ``name``.

    The definition lives under ``MK_PACKAGES`` -> ``name`` in the backing store.

    :param name: Key of the package inside the ``MK_PACKAGES`` mapping.
    :param create_if_absent: When true, a missing ``MK_PACKAGES`` mapping and a
      missing package entry are created. When false, the backing store is left
      untouched and a ``KeyError`` is raised if the entry does not exist.
    :return: The view for the requested package.
    :raises KeyError: If the package has no definition and ``create_if_absent``
      is false.
    """
    packages_store = self._store[MK_PACKAGES] if MK_PACKAGES in self._store else None
    if packages_store is None:
        # Either the key is absent or it is present without a value. A pure
        # lookup (create_if_absent=False) must not add an empty mapping to the
        # store as a side effect.
        if not create_if_absent:
            raise KeyError(name)
        packages_store = CommentedMap()
        self._store[MK_PACKAGES] = packages_store

    is_missing = packages_store.get(name) is None
    if is_missing and not create_if_absent:
        raise KeyError(name)

    definition = MutableYAMLPackageDefinition(packages_store, name)
    if is_missing:
        definition.create_definition()
    return definition
def write_to(self, fd) -> None:
    """Dump the backing store to ``fd`` using ``MANIFEST_YAML.dump``.

    :param fd: Passed through to ``MANIFEST_YAML.dump`` as the output target.
    """
    content = self._store
    MANIFEST_YAML.dump(content, fd)
def _describe_missing_path(entry: VirtualPath) -> str:
    """Render a one-line, human-readable description of ``entry``.

    The description is the entry's ``fs_path`` followed by a remark about its
    kind. The checks are made in this order: directory, symlink, file, and
    finally a catch-all for anything else.

    :param entry: The path to describe.
    :return: The description.
    """
    if entry.is_dir:
        detail = "/ (empty directory; possible integration point)"
    elif entry.is_symlink:
        # The link target is read from the file system via the entry's fs_path.
        detail = f" (symlink; links to {os.readlink(entry.fs_path)})"
    elif entry.is_file:
        detail = " (file)"
    else:
        detail = " (other!? Probably not supported by debputy and may need a `remove`)"
    return f"{entry.fs_path}{detail}"
def _detect_missing_installations(
    path_matcher: SourcePathMatcher,
    search_dir: VirtualPath,
) -> None:
    """Fail if ``search_dir`` contains paths that ``path_matcher`` reports as missing.

    Nothing happens when ``search_dir`` is not a directory or when
    ``path_matcher.detect_missing(search_dir)`` yields no paths.

    :param path_matcher: The matcher that is asked for the missing paths.
    :param search_dir: The directory to check.
    :raises PathNotCoveredByInstallRulesError: If at least one path is reported
      as missing. The error carries the message, the list of missing paths and
      ``search_dir``.
    """
    if not search_dir.is_dir:
        return
    missing = list(path_matcher.detect_missing(search_dir))
    if not missing:
        return

    # Manifest snippet suggested to the user for discarding everything that is left.
    excl = textwrap.dedent(
        """\
        - discard: "*"
        """
    )

    raise PathNotCoveredByInstallRulesError(
        "Please review the list and add either install rules or exclusions to `installations` in"
        " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
        f" end of your `installations`:\n\n{excl}\n",
        missing,
        search_dir,
    )
def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
    """Report (via ``_info``) which automatic discard rules matched which paths.

    Nothing is reported when no rule has any matched path. Otherwise every
    rule/path pair is listed in sorted order, followed by a hint on how to
    overrule an automatic discard rule, using the first listed path as example.

    :param path_matcher: Provides ``used_auto_discard_rules``, a mapping from
      rule to the paths it matched.
    """
    triggered = path_matcher.used_auto_discard_rules
    # A rule that matched but was later overridden still shows up here, just
    # without any paths. Only report if at least one rule kept a match.
    if not any(len(paths) for paths in triggered.values()):
        return

    _info("The following automatic discard rules were triggered:")
    first_path: str | None = None
    for rule_name in sorted(triggered):
        for matched_path in sorted(triggered[rule_name]):
            if first_path is None:
                first_path = matched_path
            _info(f" * {rule_name} -> {matched_path}")
    assert first_path is not None

    trailer = (
        "",
        "Note that some of these may have been overruled. The overrule detection logic is not",
        "100% reliable.",
        "",
        "You can overrule an automatic discard rule by explicitly listing the path. As an example:",
        " installations:",
        " - install:",
        f" source: {first_path}",
    )
    for line in trailer:
        _info(line)
def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: VirtualPath | None = None,
) -> None:
    """Install everything matching ``*`` in ``source_dir`` into ``dctrl_bin``.

    A built-in install rule is created (in the context of the ``debputy``
    plugin) and performed against a copy of ``install_rule_context`` whose only
    search dir is ``source_dir``, applicable to ``dctrl_bin`` alone.

    :param dctrl_bin: The package receiving the content.
    :param substitution: Passed to the match rule for the ``*`` pattern.
    :param path_matcher: Passed to ``perform_install``.
    :param install_rule_context: Base context; it is not modified, a replaced
      copy is used instead.
    :param source_condition_context: Passed to ``perform_install``.
    :param source_dir: The directory to install from.
    :param into_dir: If given, the copied context uses it as ``fs_root`` for
      ``dctrl_bin`` instead of the root in ``install_rule_context``.
    """
    only_this_package = frozenset((dctrl_bin,))
    match_everything = FileSystemMatchRule.from_path_match(
        "*",
        AttributePath.builtin_path()[f"installing {source_dir.fs_path}"],
        substitution,
    )
    catch_all_rule = run_in_context_of_plugin(
        "debputy",
        InstallRule.install_dest,
        [match_everything],
        None,
        only_this_package,
        f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
        None,
    )

    single_search_dir: tuple[SearchDir] = (SearchDir(source_dir, only_this_package),)
    context_overrides = {"search_dirs": single_search_dir}
    if into_dir is not None:
        # Copy the per-package table so the caller's context stays untouched.
        per_package_contexts = dict(install_rule_context.binary_package_contexts)
        per_package_contexts[dctrl_bin.name] = per_package_contexts[
            dctrl_bin.name
        ].replace(fs_root=into_dir)
        context_overrides["binary_package_contexts"] = per_package_contexts

    scoped_context = install_rule_context.replace(**context_overrides)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(NoMatchForInstallPatternError, PathAlreadyInstalledOrDiscardedError):
        catch_all_rule.perform_install(
            path_matcher,
            scoped_context,
            source_condition_context,
        )
def _add_build_install_dirs_to_per_package_search_dirs(
    build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
    per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]],
    as_path: Callable[[str], VirtualPath],
) -> None:
    """Append the build systems' install dirs to the per-package search dirs.

    :param build_system_install_dirs: Pairs of (directory, packages it is for).
    :param per_package_search_dirs: Updated in place. Each package gets the
      converted directory appended to its list (the list is created if the
      package has none yet). A given (package, directory) pair is only added
      once per call.
    :param as_path: Converts a directory string into the path object that is
      stored. It is called once per entry of ``build_system_install_dirs``.
    """
    already_added: set[tuple[BinaryPackage, str]] = set()
    for install_dir, target_packages in build_system_install_dirs:
        install_path = as_path(install_dir)
        for target in target_packages:
            pair = (target, install_dir)
            if pair not in already_added:
                already_added.add(pair)
                per_package_search_dirs.setdefault(target, []).append(install_path)
1201class HighLevelManifest:
1202 def __init__(
1203 self,
1204 manifest_path: str,
1205 mutable_manifest: MutableYAMLManifest | None,
1206 remove_during_clean_rules: list[FileSystemMatchRule],
1207 install_rules: list[InstallRule] | None,
1208 source_package: SourcePackage,
1209 binary_packages: Mapping[str, BinaryPackage],
1210 substitution: Substitution,
1211 package_transformations: Mapping[str, PackageTransformationDefinition],
1212 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
1213 dpkg_arch_query_table: DpkgArchTable,
1214 build_env: DebBuildOptionsAndProfiles,
1215 build_environments: BuildEnvironments,
1216 build_rules: list[BuildRule] | None,
1217 value_table: Mapping[
1218 tuple[SourcePackage | BinaryPackage, type[Any]],
1219 Any,
1220 ],
1221 plugin_provided_feature_set: PluginProvidedFeatureSet,
1222 debian_dir: VirtualPath,
1223 ) -> None:
1224 self.manifest_path = manifest_path
1225 self.mutable_manifest = mutable_manifest
1226 self._remove_during_clean_rules: list[FileSystemMatchRule] = (
1227 remove_during_clean_rules
1228 )
1229 self._install_rules = install_rules
1230 self.source_package = source_package
1231 self._binary_packages = binary_packages
1232 self.substitution = substitution
1233 self.package_transformations = package_transformations
1234 self._dpkg_architecture_variables = dpkg_architecture_variables
1235 self.dpkg_arch_query_table = dpkg_arch_query_table
1236 self._build_env = build_env
1237 self._used_for: set[str] = set()
1238 self.build_environments = build_environments
1239 self.build_rules = build_rules
1240 self._value_table = value_table
1241 self._plugin_provided_feature_set = plugin_provided_feature_set
1242 self._debian_dir = debian_dir
1243 self._source_condition_context = ConditionContext(
1244 binary_package=None,
1245 substitution=self.substitution,
1246 deb_options_and_profiles=self._build_env,
1247 dpkg_architecture_variables=self._dpkg_architecture_variables,
1248 dpkg_arch_query_table=self.dpkg_arch_query_table,
1249 )
1251 def source_version(self, include_binnmu_version: bool = True) -> str:
1252 # TODO: There should an easier way to determine the source version; really.
1253 version_var = "{{DEB_VERSION}}"
1254 if not include_binnmu_version:
1255 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
1256 try:
1257 return self.substitution.substitute(
1258 version_var, "internal (resolve version)"
1259 )
1260 except DebputySubstitutionError as e:
1261 raise AssertionError(f"Could not resolve {version_var}") from e
1263 @property
1264 def source_condition_context(self) -> ConditionContext:
1265 return self._source_condition_context
1267 @property
1268 def debian_dir(self) -> VirtualPath:
1269 return self._debian_dir
1271 @property
1272 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
1273 return self._dpkg_architecture_variables
1275 @property
1276 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
1277 return self._build_env
1279 @property
1280 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
1281 return self._plugin_provided_feature_set
1283 @property
1284 def remove_during_clean_rules(self) -> list[FileSystemMatchRule]:
1285 return self._remove_during_clean_rules
1287 @property
1288 def active_packages(self) -> Iterable[BinaryPackage]:
1289 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)
1291 @property
1292 def all_packages(self) -> Iterable[BinaryPackage]:
1293 yield from self._binary_packages.values()
1295 def manifest_configuration[T](
1296 self,
1297 context_package: SourcePackage | BinaryPackage,
1298 value_type: type[T],
1299 ) -> T | None:
1300 res = self._value_table.get((context_package, value_type))
1301 return typing.cast("T | None", res)
1303 def package_state_for(self, package: str) -> PackageTransformationDefinition:
1304 return self.package_transformations[package]
1306 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
1307 name = package.name
1308 for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"):
1309 doc_main_package_name = package.fields.get(doc_main_field)
1310 if doc_main_package_name: 1310 ↛ 1311line 1310 didn't jump to line 1311 because the condition on line 1310 was never true
1311 main_package = self._binary_packages.get(doc_main_package_name)
1312 if main_package is None:
1313 _error(
1314 f"Invalid Doc-Main-Package for {name}: The package {doc_main_package_name!r} is not listed in d/control"
1315 )
1316 return main_package
1317 # If it is not a -doc package, then docs should be installed
1318 # under its own package name.
1319 if not name.endswith("-doc"): 1319 ↛ 1321line 1319 didn't jump to line 1321 because the condition on line 1319 was always true
1320 return package
1321 name = name[:-4]
1322 main_package = self._binary_packages.get(name)
1323 if main_package:
1324 return main_package
1325 if name.startswith("lib"):
1326 dev_pkg = self._binary_packages.get(f"{name}-dev")
1327 if dev_pkg:
1328 return dev_pkg
1330 # If we found no better match; default to the doc package itself.
1331 return package
1333 def perform_installations(
1334 self,
1335 integration_mode: DebputyIntegrationMode,
1336 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
1337 *,
1338 install_request_context: InstallSearchDirContext | None = None,
1339 ) -> PackageDataTable:
1340 package_data_dict = {}
1341 package_data_table = PackageDataTable(package_data_dict)
1342 enable_manifest_installation_feature = (
1343 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR
1344 )
1346 if build_system_install_dirs: 1346 ↛ 1347line 1346 didn't jump to line 1347 because the condition on line 1346 was never true
1347 if integration_mode != INTEGRATION_MODE_FULL:
1348 raise ValueError(
1349 "The build_system_install_dirs parameter can only be used in full integration mode"
1350 )
1351 if install_request_context:
1352 raise ValueError(
1353 "The build_system_install_dirs parameter cannot be used with install_request_context"
1354 " (not implemented)"
1355 )
1357 if install_request_context is None: 1357 ↛ 1359line 1357 didn't jump to line 1359 because the condition on line 1357 was never true
1359 @functools.lru_cache(None)
1360 def _as_path(fs_path: str) -> VirtualPath:
1361 return OSFSROOverlay.create_root_dir(".", fs_path)
1363 dtmp_dir = _as_path("debian/tmp")
1364 source_root_dir = _as_path(".")
1365 into = frozenset(self._binary_packages.values())
1366 per_package_search_dirs = {
1367 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
1368 for t in self.package_transformations.values()
1369 if t.search_dirs is not None
1370 }
1372 if integration_mode == INTEGRATION_MODE_FULL:
1373 # In this mode, we have no default search dir. Everything ends up being
1374 # per-package instead (since that is easier logic-wise).
1375 #
1376 # Even dtmp_dir is omitted here (since it is not universally applicable).
1377 # Note we still initialize dtmp_dir, since it affects a later guard.
1378 default_search_dirs = []
1379 _add_build_install_dirs_to_per_package_search_dirs(
1380 build_system_install_dirs,
1381 per_package_search_dirs,
1382 _as_path,
1383 )
1385 # We can end here with per_package_search_dirs having no search dirs for any package
1386 # (single binary, where everything is installed into d/<pkg> is the most common case).
1387 #
1388 # This is not a problem in itself as the installation rules can still apply to the
1389 # source root and there should be no reason to install something from d/<pkg> into
1390 # d/<another-pkg>
1391 else:
1392 default_search_dirs = [dtmp_dir]
1394 search_dirs = _determine_search_dir_order(
1395 per_package_search_dirs,
1396 into,
1397 default_search_dirs,
1398 source_root_dir,
1399 )
1400 check_for_uninstalled_dirs = tuple(
1401 s.search_dir
1402 for s in search_dirs
1403 if s.search_dir.fs_path != source_root_dir.fs_path
1404 )
1405 if enable_manifest_installation_feature:
1406 _present_installation_dirs(
1407 search_dirs, check_for_uninstalled_dirs, into
1408 )
1409 else:
1410 dtmp_dir = None
1411 search_dirs = install_request_context.search_dirs
1412 into = frozenset(self._binary_packages.values())
1413 seen: set[BinaryPackage] = set()
1414 for search_dir in search_dirs:
1415 seen.update(search_dir.applies_to)
1417 missing = into - seen
1418 if missing: 1418 ↛ 1419line 1418 didn't jump to line 1419 because the condition on line 1418 was never true
1419 names = ", ".join(p.name for p in missing)
1420 raise ValueError(
1421 f"The following package(s) had no search dirs: {names}."
1422 " (Generally, the source root would be applicable to all packages)"
1423 )
1424 extra_names = seen - into
1425 if extra_names: 1425 ↛ 1426line 1425 didn't jump to line 1426 because the condition on line 1425 was never true
1426 names = ", ".join(p.name for p in extra_names)
1427 raise ValueError(
1428 f"The install_request_context referenced the following unknown package(s): {names}"
1429 )
1431 check_for_uninstalled_dirs = (
1432 install_request_context.check_for_uninstalled_dirs
1433 )
1435 install_rule_context = InstallRuleContext(search_dirs)
1437 if ( 1437 ↛ 1444line 1437 didn't jump to line 1444 because the condition on line 1437 was never true
1438 enable_manifest_installation_feature
1439 and self._install_rules is None
1440 # TODO: Should we also do this for full mode when build systems provided search dirs?
1441 and dtmp_dir is not None
1442 and os.path.isdir(dtmp_dir.fs_path)
1443 ):
1444 msg = (
1445 "The build system appears to have provided the output of upstream build system's"
1446 " install in debian/tmp. However, these are no provisions for debputy to install"
1447 " any of that into any of the debian packages listed in debian/control."
1448 " To avoid accidentally creating empty packages, debputy will insist that you "
1449 " explicitly define an empty installation definition if you did not want to "
1450 " install any of those files even though they have been provided."
1451 ' Example: "installations: []"'
1452 )
1453 _error(msg)
1454 elif ( 1454 ↛ 1457line 1454 didn't jump to line 1457 because the condition on line 1454 was never true
1455 not enable_manifest_installation_feature and self._install_rules is not None
1456 ):
1457 _error(
1458 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
1459 f" Please remove or comment out the `installations` keyword."
1460 )
1462 for dctrl_bin in self.all_packages:
1463 package = dctrl_bin.name
1464 doc_main_package = self._detect_doc_main_package_for(dctrl_bin)
1466 install_rule_context[package] = BinaryPackageInstallRuleContext(
1467 dctrl_bin,
1468 InMemoryVirtualRootDir(),
1469 doc_main_package,
1470 )
1472 if enable_manifest_installation_feature: 1472 ↛ 1477line 1472 didn't jump to line 1477 because the condition on line 1472 was always true
1473 discard_rules = list(
1474 self.plugin_provided_feature_set.auto_discard_rules.values()
1475 )
1476 else:
1477 discard_rules = [
1478 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
1479 ]
1480 path_matcher = SourcePathMatcher(discard_rules)
1482 source_condition_context = self._source_condition_context
1484 for dctrl_bin in self.active_packages:
1485 package = dctrl_bin.name
1486 if install_request_context: 1486 ↛ 1491line 1486 didn't jump to line 1491 because the condition on line 1486 was always true
1487 build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
1488 package
1489 )
1490 else:
1491 build_system_staging_dir_fs_path = os.path.join("debian", package)
1492 if os.path.isdir(build_system_staging_dir_fs_path):
1493 build_system_staging_dir = OSFSROOverlay.create_root_dir(
1494 ".",
1495 build_system_staging_dir_fs_path,
1496 )
1497 else:
1498 build_system_staging_dir = None
1500 if build_system_staging_dir is not None:
1501 _install_everything_from_source_dir_if_present(
1502 dctrl_bin,
1503 self.substitution,
1504 path_matcher,
1505 install_rule_context,
1506 source_condition_context,
1507 build_system_staging_dir,
1508 )
1510 if self._install_rules:
1511 # FIXME: Check that every install rule remains used after transformations have run.
1512 # What we want to check is transformations do not exclude everything from an install
1513 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
1514 # match.
1515 for install_rule in self._install_rules:
1516 install_rule.perform_install(
1517 path_matcher,
1518 install_rule_context,
1519 source_condition_context,
1520 )
1522 if enable_manifest_installation_feature: 1522 ↛ 1526line 1522 didn't jump to line 1526 because the condition on line 1522 was always true
1523 for search_dir in check_for_uninstalled_dirs:
1524 _detect_missing_installations(path_matcher, search_dir)
1526 for dctrl_bin in self.all_packages:
1527 package = dctrl_bin.name
1528 binary_install_rule_context = install_rule_context[package]
1529 build_system_pkg_staging_dir = os.path.join("debian", package)
1530 fs_root = binary_install_rule_context.fs_root
1532 context = self.package_transformations[package]
1533 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
1534 for special_install_rule in context.install_rules: 1534 ↛ 1535line 1534 didn't jump to line 1535 because the loop on line 1534 never started
1535 special_install_rule.perform_install(
1536 path_matcher,
1537 install_rule_context,
1538 source_condition_context,
1539 )
1541 if dctrl_bin.should_be_acted_on:
1542 self.apply_fs_transformations(package, fs_root)
1543 substvars_file = f"debian/{package}.substvars"
1544 substvars = FlushableSubstvars.load_from_path(
1545 substvars_file, missing_ok=True
1546 )
1547 # We do not want to touch the substvars file (non-clean rebuild contamination)
1548 substvars.substvars_path = None
1549 else:
1550 substvars = FlushableSubstvars()
1552 udeb_package = self._binary_packages.get(f"{package}-udeb")
1553 if udeb_package and not udeb_package.is_udeb: 1553 ↛ 1554line 1553 didn't jump to line 1554 because the condition on line 1553 was never true
1554 udeb_package = None
1556 package_metadata_context = PackageProcessingContextProvider(
1557 self,
1558 dctrl_bin,
1559 udeb_package,
1560 package_data_table,
1561 )
1563 ctrl_creator = BinaryCtrlAccessorProviderCreator(
1564 package_metadata_context,
1565 substvars,
1566 context.maintscript_snippets,
1567 context.substitution,
1568 )
1570 if not enable_manifest_installation_feature: 1570 ↛ 1571line 1570 didn't jump to line 1571 because the condition on line 1570 was never true
1571 assert_no_dbgsym_migration(dctrl_bin)
1572 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin)
1573 dh_dbgsym_root_path = OSFSROOverlay.create_root_dir(
1574 "",
1575 dh_dbgsym_root_fs,
1576 )
1577 dbgsym_root_fs = InMemoryVirtualRootDir()
1578 _install_everything_from_source_dir_if_present(
1579 dctrl_bin,
1580 self.substitution,
1581 path_matcher,
1582 install_rule_context,
1583 source_condition_context,
1584 dh_dbgsym_root_path,
1585 into_dir=dbgsym_root_fs,
1586 )
1587 dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
1588 dbgsym_info = DbgsymInfo(
1589 dctrl_bin,
1590 dbgsym_root_fs,
1591 os.path.join(dh_dbgsym_root_fs, "DEBIAN"),
1592 dbgsym_build_ids,
1593 # TODO: Provide manifest feature to support this.
1594 False,
1595 )
1596 else:
1597 dbgsym_info = DbgsymInfo(
1598 dctrl_bin,
1599 InMemoryVirtualRootDir(),
1600 None,
1601 [],
1602 False,
1603 )
1605 package_data_dict[package] = BinaryPackageData(
1606 self.source_package,
1607 dctrl_bin,
1608 build_system_pkg_staging_dir,
1609 fs_root,
1610 substvars,
1611 package_metadata_context,
1612 ctrl_creator,
1613 dbgsym_info,
1614 )
1616 if enable_manifest_installation_feature: 1616 ↛ 1619line 1616 didn't jump to line 1619 because the condition on line 1616 was always true
1617 _list_automatic_discard_rules(path_matcher)
1619 return package_data_table
1621 def condition_context(
1622 self, binary_package: BinaryPackage | str | None
1623 ) -> ConditionContext:
1624 if binary_package is None: 1624 ↛ 1625line 1624 didn't jump to line 1625 because the condition on line 1624 was never true
1625 return self._source_condition_context
1626 if not isinstance(binary_package, str):
1627 binary_package = binary_package.name
1629 package_transformation = self.package_transformations[binary_package]
1630 return self._source_condition_context.replace(
1631 binary_package=package_transformation.binary_package,
1632 substitution=package_transformation.substitution,
1633 )
1635 def apply_fs_transformations(
1636 self,
1637 package: str,
1638 fs_root: InMemoryVirtualPathBase,
1639 ) -> None:
1640 if package in self._used_for: 1640 ↛ 1641line 1640 didn't jump to line 1641 because the condition on line 1640 was never true
1641 raise ValueError(
1642 f"data.tar contents for {package} has already been finalized!?"
1643 )
1644 if package not in self.package_transformations: 1644 ↛ 1645line 1644 didn't jump to line 1645 because the condition on line 1644 was never true
1645 raise ValueError(
1646 f'The package "{package}" was not relevant for the manifest!?'
1647 )
1648 package_transformation = self.package_transformations[package]
1649 condition_context = ConditionContext(
1650 binary_package=package_transformation.binary_package,
1651 substitution=package_transformation.substitution,
1652 deb_options_and_profiles=self._build_env,
1653 dpkg_architecture_variables=self._dpkg_architecture_variables,
1654 dpkg_arch_query_table=self.dpkg_arch_query_table,
1655 )
1656 norm_rules = list(
1657 builtin_mode_normalization_rules(
1658 self._dpkg_architecture_variables,
1659 package_transformation.binary_package,
1660 package_transformation.substitution,
1661 )
1662 )
1663 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
1664 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
1665 for transformation in package_transformation.transformations:
1666 transformation.run_transform_file_system(fs_root, condition_context)
1667 interpreter_normalization = NormalizeShebangLineTransformation()
1668 interpreter_normalization.transform_file_system(fs_root, condition_context)
1670 def finalize_data_tar_contents(
1671 self,
1672 package: str,
1673 fs_root: InMemoryVirtualPathBase,
1674 clamp_mtime_to: int,
1675 ) -> IntermediateManifest:
1676 if package in self._used_for: 1676 ↛ 1677line 1676 didn't jump to line 1677 because the condition on line 1676 was never true
1677 raise ValueError(
1678 f"data.tar contents for {package} has already been finalized!?"
1679 )
1680 if package not in self.package_transformations: 1680 ↛ 1681line 1680 didn't jump to line 1681 because the condition on line 1680 was never true
1681 raise ValueError(
1682 f'The package "{package}" was not relevant for the manifest!?'
1683 )
1684 self._used_for.add(package)
1686 # At this point, there so be no further mutations to the file system (because the will not
1687 # be present in the intermediate manifest)
1688 cast("FSRootDir", fs_root).is_read_write = False
1690 intermediate_manifest = list(
1691 _generate_intermediate_manifest(
1692 fs_root,
1693 clamp_mtime_to,
1694 )
1695 )
1696 return intermediate_manifest
1698 def apply_to_binary_staging_directory(
1699 self,
1700 package: str,
1701 fs_root: InMemoryVirtualPathBase,
1702 clamp_mtime_to: int,
1703 ) -> IntermediateManifest:
1704 self.apply_fs_transformations(package, fs_root)
1705 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)
1708@dataclasses.dataclass(slots=True)
1709class SearchDirOrderState:
1710 search_dir: VirtualPath
1711 applies_to: set[BinaryPackage] = dataclasses.field(default_factory=set)
1712 after: set[str] = dataclasses.field(default_factory=set)
def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: frozenset[BinaryPackage],
) -> None:
    """Report (via ``_info``) the search dirs and the dirs checked for leftovers.

    :param search_dirs: The search dirs, listed in the given order. A dir that
      applies to a strict subset of ``all_pkgs`` is annotated with the names of
      those packages.
    :param checked_missing_dirs: The dirs considered for "not-installed" paths.
      The second listing is omitted when this is empty.
    :param all_pkgs: All packages; used to decide if a search dir is restricted.
    """

    def _absent_remark(fs_path: str) -> str:
        # Directories that do not exist on the file system are flagged as skipped.
        return "" if os.path.isdir(fs_path) else " (skipped; absent)"

    _info("The following directories are considered search dirs (in order):")
    # Both listings are padded to the longest search dir path.
    width = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
    for candidate in search_dirs:
        restriction = ""
        if candidate.applies_to < all_pkgs:
            names = ", ".join(p.name for p in candidate.applies_to)
            restriction = f" [only applicable to: {names}]"
        absent = _absent_remark(candidate.search_dir.fs_path)
        _info(f" * {candidate.search_dir.fs_path:{width}}{restriction}{absent}")

    if not checked_missing_dirs:
        return

    _info('The following directories are considered for "not-installed" paths;')
    for checked_dir in checked_missing_dirs:
        absent = _absent_remark(checked_dir.fs_path)
        _info(f" * {checked_dir.fs_path:{width}}{absent}")
def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, list[VirtualPath]],
    all_pkgs: frozenset[BinaryPackage],
    default_search_dirs: list[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one global, ordered sequence.

    Within each package's list, a search dir must come after the one listed
    before it. The result respects that constraint for all packages at once and
    always ends with ``source_root`` (applicable to all packages).

    The result is deterministic for a given input: packages are processed in
    name order and unconstrained search dirs keep the order in which they were
    first seen.

    :param requested: The explicitly requested search dirs per package. Its keys
      must be a subset of ``all_pkgs``.
    :param all_pkgs: All packages.
    :param default_search_dirs: The search dirs for packages not in ``requested``.
    :param source_root: The source root, appended as the last search dir.
    :return: The search dirs in order, each with the packages it applies to.
    """
    search_dir_table: dict[str, SearchDirOrderState] = {}
    assert requested.keys() <= all_pkgs
    # Iteration order of a (frozen)set is arbitrary and the order here decides
    # the relative order of otherwise unconstrained search dirs. Sort by name so
    # the outcome does not vary between runs.
    for pkg in sorted(all_pkgs, key=lambda p: p.name):
        previous_search_dir: SearchDirOrderState | None = None
        for path in requested.get(pkg, default_search_dirs):
            search_dir_state = search_dir_table.get(path.fs_path)
            if search_dir_state is None:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_search_dir is not None:
                search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
            previous_search_dir = search_dir_state

    search_dirs_in_order: list[SearchDirOrderState] = []
    released: set[str] = set()
    # Repeatedly release every search dir whose predecessors have all been
    # released. `pending` is a list (in first-seen order), so each pass is
    # deterministic as well.
    pending = list(search_dir_table.values())
    while pending:
        still_pending: list[SearchDirOrderState] = []
        for search_dir_state in pending:
            if search_dir_state.after <= released:
                search_dirs_in_order.append(search_dir_state)
                released.add(search_dir_state.search_dir.fs_path)
            else:
                still_pending.append(search_dir_state)

        if len(still_pending) == len(pending):
            # A full pass without progress means the constraints are cyclic.
            names = ", ".join(s.search_dir.fs_path for s in still_pending)
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        pending = still_pending

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            set(all_pkgs),
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )