Coverage for src/debputy/highlevel_manifest.py: 64%
891 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-12-28 16:12 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-12-28 16:12 +0000
1import dataclasses
2import functools
3import os
4import textwrap
5import typing
6from contextlib import suppress
7from dataclasses import dataclass, field
8from typing import (
9 Any,
10 TypeVar,
11 Generic,
12 cast,
13)
14from collections.abc import Iterable, Mapping, Sequence, Callable
16from debian.debian_support import DpkgArchTable
18from debputy.dh.debhelper_emulation import (
19 dhe_dbgsym_root_dir,
20 assert_no_dbgsym_migration,
21 read_dbgsym_file,
22)
23from ._deb_options_profiles import DebBuildOptionsAndProfiles
24from ._manifest_constants import *
25from .architecture_support import DpkgArchitectureBuildProcessValuesTable
26from .builtin_manifest_rules import builtin_mode_normalization_rules
27from .exceptions import (
28 DebputySubstitutionError,
29 DebputyRuntimeErrorWithPreamble,
30)
31from .filesystem_scan import FSPath, FSRootDir, OSFSROOverlay, FSControlRootDir
32from .installations import (
33 InstallRule,
34 SourcePathMatcher,
35 PathAlreadyInstalledOrDiscardedError,
36 NoMatchForInstallPatternError,
37 InstallRuleContext,
38 BinaryPackageInstallRuleContext,
39 InstallSearchDirContext,
40 SearchDir,
41)
42from .intermediate_manifest import TarMember, PathType, IntermediateManifest
43from .maintscript_snippet import (
44 DpkgMaintscriptHelperCommand,
45 MaintscriptSnippetContainer,
46)
47from .manifest_conditions import ConditionContext
48from .manifest_parser.base_types import (
49 FileSystemMatchRule,
50 FileSystemExactMatchRule,
51 BuildEnvironments,
52)
53from .manifest_parser.util import AttributePath
54from .packager_provided_files import PackagerProvidedFile
55from .packages import BinaryPackage, SourcePackage
56from .plugin.api.feature_set import PluginProvidedFeatureSet
57from .plugin.api.impl import BinaryCtrlAccessorProviderCreator
58from .plugin.api.impl_types import (
59 PackageProcessingContextProvider,
60 PackageDataTable,
61)
62from .plugin.api.spec import (
63 FlushableSubstvars,
64 VirtualPath,
65 DebputyIntegrationMode,
66 INTEGRATION_MODE_DH_DEBPUTY_RRR,
67 INTEGRATION_MODE_FULL,
68)
69from debputy.plugins.debputy.binary_package_rules import ServiceRule
70from debputy.plugins.debputy.build_system_rules import BuildRule
71from .plugin.plugin_state import run_in_context_of_plugin
72from .substitution import Substitution
73from .transformation_rules import (
74 TransformationRule,
75 ModeNormalizationTransformationRule,
76 NormalizeShebangLineTransformation,
77)
78from .util import (
79 _error,
80 _warn,
81 debian_policy_normalize_symlink_target,
82 generated_content_dir,
83 _info,
84)
85from .yaml import MANIFEST_YAML
86from .yaml.compat import CommentedMap, CommentedSeq
class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
    """Error carrying paths that no install rule matched.

    The extra data travels in the exception arguments: ``args[1]`` holds the
    unmatched paths and ``args[2]`` the search directory they were found in.
    """

    @property
    def unmatched_paths(self) -> Sequence[VirtualPath]:
        """The unmatched paths (second exception argument)."""
        return self.args[1]

    @property
    def search_dir(self) -> VirtualPath:
        """The search directory (third exception argument)."""
        return self.args[2]

    def render_preamble(self) -> None:
        """Emit a warning listing every unmatched path, framed by blank lines."""
        search_root = self.search_dir.fs_path
        _warn(
            f"The following paths were present in {search_root}, but not installed (nor explicitly discarded)."
        )
        _warn("")
        for unmatched in self.unmatched_paths:
            _warn(f" * {_describe_missing_path(unmatched)}")
        _warn("")
110@dataclass(slots=True)
111class DbgsymInfo:
112 binary_package: BinaryPackage
113 dbgsym_fs_root: FSPath
114 _dbgsym_root_fs: str | None
115 dbgsym_ids: list[str]
116 run_dwz: bool
118 @property
119 def dbgsym_root_dir(self) -> str:
120 root_dir = self._dbgsym_root_fs
121 if root_dir is None:
122 root_dir = generated_content_dir(
123 package=self.binary_package,
124 subdir_key="dbgsym-fs-root",
125 )
126 self._dbgsym_root_fs = root_dir
127 return root_dir
129 @property
130 def dbgsym_ctrl_dir(self) -> FSControlRootDir:
131 return FSControlRootDir.create_root_dir(
132 os.path.join(self.dbgsym_root_dir, "DEBIAN")
133 )
136@dataclass(slots=True, frozen=True)
137class BinaryPackageData:
138 source_package: SourcePackage
139 binary_package: BinaryPackage
140 binary_staging_root_dir: str
141 fs_root: FSPath
142 substvars: FlushableSubstvars
143 package_metadata_context: PackageProcessingContextProvider
144 ctrl_creator: BinaryCtrlAccessorProviderCreator
145 dbgsym_info: DbgsymInfo
147 @property
148 def control_output_dir(self) -> FSControlRootDir:
149 return FSControlRootDir.create_root_dir(
150 generated_content_dir(
151 package=self.binary_package,
152 subdir_key="DEBIAN",
153 )
154 )
@dataclass(slots=True)
class PackageTransformationDefinition:
    """Mutable per-binary-package record of manifest-derived rules and snippets.

    Only the first three fields are required; every collection field starts
    out empty (a fresh list/dict per instance via ``default_factory``).
    """

    binary_package: BinaryPackage
    substitution: Substitution
    is_auto_generated_package: bool
    # Optional; defaults to None when not provided.
    binary_version: str | None = None
    # Optional; defaults to None when not provided.
    search_dirs: list[FileSystemExactMatchRule] | None = None
    dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = field(
        default_factory=list
    )
    # Keyed by string - presumably the maintscript name; verify against users.
    maintscript_snippets: dict[str, MaintscriptSnippetContainer] = field(
        default_factory=dict
    )
    transformations: list[TransformationRule] = field(default_factory=list)
    reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = field(
        default_factory=dict
    )
    install_rules: list[InstallRule] = field(default_factory=list)
    requested_service_rules: list[ServiceRule] = field(default_factory=list)
def _path_to_tar_member(
    path: FSPath,
    clamp_mtime_to: int,
) -> TarMember:
    """Convert a single path into the matching ``TarMember``.

    The mtime is clamped to ``clamp_mtime_to`` (paths backed by a file system
    path use the lower of that and their own mtime).  Symlinks always become
    virtual members with a normalized target and mode 0777.

    :raises AssertionError: If the path is neither a directory, a file nor a symlink.
    """
    effective_mtime = float(clamp_mtime_to)
    owner, uid, group, gid = path.tar_owner_info
    mode = path.mode

    if path.has_fs_path:
        effective_mtime = min(effective_mtime, path.mtime)

    ownership = {"owner": owner, "uid": uid, "group": group, "gid": gid}

    if path.is_dir:
        path_type = PathType.DIRECTORY
    elif path.is_file:
        # TODO: someday we will need to deal with hardlinks and it might appear here.
        path_type = PathType.FILE
    elif path.is_symlink:
        # Symlinks are resolved right away since the target must be normalized anyway.
        normalized_target = debian_policy_normalize_symlink_target(
            path.path,
            path.readlink(),
        )
        # The mode is forced to 0777 because that is what shows up in the data.tar.
        # tar would accept other values, but reproducibility requires the well-behaved one.
        return TarMember.virtual_path(
            path.tar_path,
            PathType.SYMLINK,
            effective_mtime,
            link_target=normalized_target,
            mode=0o0777,
            **ownership,
        )
    else:
        assert not path.is_symlink
        raise AssertionError(
            f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
        )

    if path.has_fs_path:
        may_steal_fs_path = path._can_replace_inline
        return TarMember.from_file(
            path.tar_path,
            path.fs_path,
            mode=mode,
            path_type=path_type,
            path_mtime=effective_mtime,
            clamp_mtime_to=clamp_mtime_to,
            may_steal_fs_path=may_steal_fs_path,
            **ownership,
        )

    # Without a backing file system path, only non-file members can be virtual.
    assert not path.is_file
    return TarMember.virtual_path(
        path.tar_path,
        path_type,
        effective_mtime,
        mode=mode,
        **ownership,
    )
def _generate_intermediate_manifest(
    fs_root: FSPath,
    clamp_mtime_to: int,
) -> Iterable[TarMember]:
    """Yield a tar member for every path below ``fs_root``, symlinks last.

    Non-symlink members are yielded in traversal order; symlink members are
    held back and yielded (in their original relative order) at the end.
    """
    deferred_symlinks = []
    for fs_entry in fs_root.all_paths():
        member = _path_to_tar_member(fs_entry, clamp_mtime_to)
        if member.path_type != PathType.SYMLINK:
            yield member
        else:
            deferred_symlinks.append(member)
    yield from deferred_symlinks
261ST = TypeVar("ST")
262T = TypeVar("T")
265class AbstractYAMLSubStore(Generic[ST]):
266 def __init__(
267 self,
268 parent_store: Any,
269 parent_key: int | str | None,
270 store: ST | None = None,
271 ) -> None:
272 if parent_store is not None and parent_key is not None:
273 try:
274 from_parent_store = parent_store[parent_key]
275 except (KeyError, IndexError):
276 from_parent_store = None
277 if ( 277 ↛ 282line 277 didn't jump to line 282 because the condition on line 277 was never true
278 store is not None
279 and from_parent_store is not None
280 and store is not parent_store
281 ):
282 raise ValueError(
283 "Store is provided but is not the one already in the parent store"
284 )
285 if store is None: 285 ↛ 287line 285 didn't jump to line 287 because the condition on line 285 was always true
286 store = from_parent_store
287 self._parent_store = parent_store
288 self._parent_key = parent_key
289 self._is_detached = (
290 parent_key is None or parent_store is None or parent_key not in parent_store
291 )
292 assert self._is_detached or store is not None
293 if store is None:
294 store = self._create_new_instance()
295 self._store: ST = store
297 def _create_new_instance(self) -> ST:
298 raise NotImplementedError
300 def create_definition_if_missing(self) -> None:
301 if self._is_detached:
302 self.create_definition()
304 def create_definition(self) -> None:
305 if not self._is_detached: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true
306 raise RuntimeError("Definition is already present")
307 parent_store = self._parent_store
308 if parent_store is None: 308 ↛ 309line 308 didn't jump to line 309 because the condition on line 308 was never true
309 raise RuntimeError(
310 f"Definition is not attached to any parent!? ({self.__class__.__name__})"
311 )
312 if isinstance(parent_store, list):
313 assert self._parent_key is None
314 self._parent_key = len(parent_store)
315 self._parent_store.append(self._store)
316 else:
317 parent_store[self._parent_key] = self._store
318 self._is_detached = False
320 def remove_definition(self) -> None:
321 self._ensure_attached()
322 del self._parent_store[self._parent_key]
323 if isinstance(self._parent_store, list):
324 self._parent_key = None
325 self._is_detached = True
327 def _ensure_attached(self) -> None:
328 if self._is_detached:
329 raise RuntimeError("The definition has been removed!")
class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]):
    """Sub-store whose backing store is a list; new stores are ``CommentedSeq``."""

    def _create_new_instance(self) -> list[T]:
        # Fresh, empty sequence used when no store was supplied or found.
        return CommentedSeq()
class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]):
    """Sub-store whose backing store is a mapping; new stores are ``CommentedMap``."""

    def _create_new_instance(self) -> dict[str, T]:
        # Fresh, empty mapping used when no store was supplied or found.
        return CommentedMap()
class MutableCondition:
    """Factories for single-key condition mappings."""

    @classmethod
    def arch_matches(cls, arch_filter: str) -> CommentedMap:
        """Return a mapping with the arch-matches key set to ``arch_filter``."""
        payload = {MK_CONDITION_ARCH_MATCHES: arch_filter}
        return CommentedMap(payload)

    @classmethod
    def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
        """Return a mapping with the build-profiles-matches key set to the given value."""
        payload = {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
        return CommentedMap(payload)
class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
    """Handle for a create-symlink entry: a one-key mapping wrapping the link details."""

    @classmethod
    def new_symlink(
        cls, link_path: str, link_target: str, condition: Any | None
    ) -> "MutableYAMLSymlink":
        """Build a detached symlink entry; ``condition`` is stored under "when" if given."""
        details = {
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
            MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
        }
        if condition is not None:
            details["when"] = condition
        wrapper = CommentedMap({MK_TRANSFORMATIONS_CREATE_SYMLINK: details})
        return cls(None, None, store=wrapper)

    @property
    def symlink_path(self) -> str:
        """The link path stored in the entry."""
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        return details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH]

    @symlink_path.setter
    def symlink_path(self, path: str) -> None:
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH] = path

    @property
    def symlink_target(self) -> str | None:
        """The link target stored in the entry."""
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        return details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET]

    @symlink_target.setter
    def symlink_target(self, target: str) -> None:
        details = self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK]
        details[MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET] = target
class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
    """Handle for one conffile-management entry.

    The store is a one-key mapping: the key is the command (remove or rename)
    and the value is the mapping holding that command's attributes.
    """

    @classmethod
    def rm_conffile(
        cls,
        conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "remove" entry for ``conffile``."""
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
                        {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @classmethod
    def mv_conffile(
        cls,
        old_conffile: str,
        new_conffile: str,
        prior_to_version: str | None,
        owning_package: str | None,
    ) -> "MutableYAMLConffileManagementItem":
        """Build a detached "rename" entry moving ``old_conffile`` to ``new_conffile``."""
        r = cls(
            None,
            None,
            store=CommentedMap(
                {
                    MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
                        {
                            MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
                            MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
                        }
                    )
                }
            ),
        )
        r.prior_to_version = prior_to_version
        r.owning_package = owning_package
        return r

    @property
    def _container(self) -> dict[str, Any]:
        """The attribute mapping of the single command in the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @property
    def command(self) -> str:
        """The single key of the store, i.e. the command name."""
        assert len(self._store) == 1
        return next(iter(self._store))

    def _require_rename(self) -> None:
        """Raise ``TypeError`` unless this entry is a rename command."""
        if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
            raise TypeError(
                f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
                f" This is a {self.command}"
            )

    def _set_or_drop(self, key: str, value: str | None) -> None:
        """Store ``value`` under ``key``; a None value removes the key if present."""
        if value is None:
            with suppress(KeyError):
                del self._container[key]
        else:
            self._container[key] = value

    @property
    def obsolete_conffile(self) -> str:
        """The path being removed, or the source path of a rename."""
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
        assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]

    @obsolete_conffile.setter
    def obsolete_conffile(self, value: str) -> None:
        if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
            self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
        else:
            assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
            self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value

    @property
    def new_conffile(self) -> str:
        """The rename target.

        :raises TypeError: If this entry is not a rename command.
        """
        self._require_rename()
        return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]

    @new_conffile.setter
    def new_conffile(self, value: str) -> None:
        self._require_rename()
        self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value

    @property
    def prior_to_version(self) -> str | None:
        """The prior-to-version attribute, or None when unset."""
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)

    @prior_to_version.setter
    def prior_to_version(self, value: str | None) -> None:
        self._set_or_drop(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION, value)

    @property
    def owning_package(self) -> str | None:
        """The owning-package attribute, or None when unset."""
        # Must read the owning-package key (the one the setter writes), and must
        # tolerate its absence since the setter removes the key for None.
        return self._container.get(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE)

    @owning_package.setter
    def owning_package(self, value: str | None) -> None:
        self._set_or_drop(MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE, value)
class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
    """Handle for one package's definition mapping and the rule lists inside it."""

    def _list_store(
        self, key, *, create_if_absent: bool = False
    ) -> list[dict[str, Any]] | None:
        """Return the list stored under ``key``.

        :param key: Key of the list inside this package definition.
        :param create_if_absent: When the list is missing, create it (attaching this
          definition to its parent if needed) instead of returning None.
        :return: The list, or None if it is missing and ``create_if_absent`` is false.
        """
        if self._is_detached or key not in self._store:
            if not create_if_absent:
                return None
            self.create_definition_if_missing()
            self._store[key] = []
        return self._store[key]

    def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
        """Attach the detached ``item`` to the list under ``key`` (created on demand).

        :raises RuntimeError: If ``item`` is already attached or belongs to another container.
        """
        parent_store = self._list_store(key, create_if_absent=True)
        assert parent_store is not None
        if not item._is_detached or (
            item._parent_store is not None and item._parent_store is not parent_store
        ):
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        item._parent_store = parent_store
        item.create_definition()

    def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
        """Append ``symlink`` to this package's transformations."""
        self._insert_item(MK_TRANSFORMATIONS, symlink)

    def symlinks(self) -> Iterable[MutableYAMLSymlink]:
        """Yield a handle for every single-key create-symlink transformation."""
        store = self._list_store(MK_TRANSFORMATIONS)
        if store is None:
            return
        for index, entry in enumerate(store):
            # Match on the same key MutableYAMLSymlink reads its details from.
            if (
                isinstance(entry, dict)
                and len(entry) == 1
                and MK_TRANSFORMATIONS_CREATE_SYMLINK in entry
            ):
                yield MutableYAMLSymlink(store, index)

    def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
        """Yield a handle for every conffile-management entry of this package."""
        store = self._list_store(MK_CONFFILE_MANAGEMENT)
        if store is None:
            return
        yield from (
            MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
        )

    def add_conffile_management(
        self, conffile_management_item: MutableYAMLConffileManagementItem
    ) -> None:
        """Append ``conffile_management_item`` to this package's conffile management list."""
        self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)
class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
    """Base handle for an installation rule plus factories for the concrete rule kinds.

    The store is a one-key mapping: the key names the rule kind and the value
    holds the rule's attributes.
    """

    @property
    def _container(self) -> dict[str, Any]:
        """The value of the single entry in the store."""
        assert len(self._store) == 1
        return next(iter(self._store.values()))

    @staticmethod
    def _sources_key(sources: str | list[str]) -> str:
        """Pick the singular source key for a string and the plural key otherwise."""
        if isinstance(sources, str):
            return MK_INSTALLATIONS_INSTALL_SOURCE
        return MK_INSTALLATIONS_INSTALL_SOURCES

    @property
    def into(self) -> list[str] | None:
        """The "into" attribute as a list, or None when unset."""
        # `.get`: the setter removes the key for None, so absence is a normal state.
        v = self._container.get(MK_INSTALLATIONS_INSTALL_INTO)
        if v is None:
            return None
        if isinstance(v, str):
            return [v]
        return v

    @into.setter
    def into(self, new_value: str | list[str] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_INTO]
            return
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list

    @property
    def when(self) -> str | Mapping[str, Any] | None:
        """The condition attribute, or None when unset."""
        # `.get`: the setter removes the key for None, so absence is a normal state.
        return self._container.get(MK_CONDITION_WHEN)

    @when.setter
    def when(self, new_value: str | Mapping[str, Any] | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_CONDITION_WHEN]
            return
        if isinstance(new_value, str):
            self._container[MK_CONDITION_WHEN] = new_value
            return
        new_map = CommentedMap(new_value)
        self._container[MK_CONDITION_WHEN] = new_map

    @classmethod
    def install_dest(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule for ``sources`` with an optional ``dest_dir``."""
        k = cls._sources_key(sources)
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_INSTALL: CommentedMap({k: sources})}),
        )
        # Attribute order determines key order in the mapping; keep it stable.
        r.dest_dir = dest_dir
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def multi_dest_install(
        cls,
        sources: str | list[str],
        dest_dirs: Sequence[str],
        into: str | list[str] | None,
        *,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached multi-dest install rule for ``sources`` into ``dest_dirs``."""
        k = cls._sources_key(sources)
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
                        {
                            k: sources,
                            "dest-dirs": dest_dirs,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install rule installing ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_doc_as(
        cls,
        source: str,
        install_as: str,
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule installing ``source`` as ``install_as``."""
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {
                    MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
                        {
                            MK_INSTALLATIONS_INSTALL_SOURCE: source,
                            MK_INSTALLATIONS_INSTALL_AS: install_as,
                        }
                    )
                }
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_docs(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        *,
        dest_dir: str | None = None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstall":
        """Build a detached install-docs rule for ``sources`` with an optional ``dest_dir``."""
        k = cls._sources_key(sources)
        r = MutableYAMLInstallRuleInstall(
            None,
            None,
            store=CommentedMap(
                {MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap({k: sources})}
            ),
        )
        # Attribute order determines key order in the mapping; keep it stable.
        r.into = into
        r.dest_dir = dest_dir
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_examples(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleInstallExamples":
        """Build a detached install-examples rule for ``sources``."""
        k = cls._sources_key(sources)
        r = MutableYAMLInstallRuleInstallExamples(
            None,
            None,
            store=CommentedMap(
                {MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap({k: sources})}
            ),
        )
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def install_man(
        cls,
        sources: str | list[str],
        into: str | list[str] | None,
        language: str | None,
        when: str | Mapping[str, Any] | None = None,
    ) -> "MutableYAMLInstallRuleMan":
        """Build a detached install-man rule for ``sources`` with an optional ``language``."""
        k = cls._sources_key(sources)
        r = MutableYAMLInstallRuleMan(
            None,
            None,
            store=CommentedMap(
                {MK_INSTALLATIONS_INSTALL_MAN: CommentedMap({k: sources})}
            ),
        )
        # Attribute order determines key order in the mapping; keep it stable.
        r.language = language
        r.into = into
        if when is not None:
            r.when = when
        return r

    @classmethod
    def discard(
        cls,
        sources: str | list[str],
    ) -> "MutableYAMLInstallRuleDiscard":
        """Build a detached discard rule; ``sources`` is stored directly as the rule value."""
        return MutableYAMLInstallRuleDiscard(
            None,
            None,
            store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
        )
class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
    """Install rule handle returned by ``AbstractMutableYAMLInstallRule.install_examples``.

    Adds nothing beyond the base class.
    """

    pass
class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
    """Install rule handle for manpages; adds the optional ``language`` attribute."""

    @property
    def language(self) -> str | None:
        """The language attribute, or None when unset."""
        # `.get`: the setter removes the key for None, so absence is a normal state.
        return self._container.get(MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE)

    @language.setter
    def language(self, new_value: str | None) -> None:
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
            return
        with suppress(KeyError):
            del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
    """Install rule handle returned by ``AbstractMutableYAMLInstallRule.discard``.

    Adds nothing beyond the base class.
    NOTE(review): ``discard`` stores the sources directly as the rule value, so
    the inherited mapping-based attributes presumably do not apply here - confirm.
    """

    pass
class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
    """Install rule handle with sources and either a destination dir or an "as" name."""

    @property
    def sources(self) -> list[str]:
        """The rule's sources as a list.

        Reads the plural sources key and falls back to the singular source key,
        which is what the factories use when given a single string.

        :raises KeyError: If neither key is present.
        """
        container = self._container
        try:
            v = container[MK_INSTALLATIONS_INSTALL_SOURCES]
        except KeyError:
            if MK_INSTALLATIONS_INSTALL_SOURCE not in container:
                raise
            v = container[MK_INSTALLATIONS_INSTALL_SOURCE]
        if isinstance(v, str):
            return [v]
        return v

    @sources.setter
    def sources(self, new_value: str | list[str]) -> None:
        # NOTE(review): this always writes the plural key and leaves any existing
        # singular source key in place - confirm whether it should be removed.
        if isinstance(new_value, str):
            self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
            return
        new_list = CommentedSeq(new_value)
        self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list

    @property
    def dest_dir(self) -> str | None:
        """The destination directory, or None when unset."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)

    @dest_dir.setter
    def dest_dir(self, new_value: str | None) -> None:
        if new_value is not None and self.dest_as is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        if new_value is not None:
            self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
        else:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]

    @property
    def dest_as(self) -> str | None:
        """The "install as" name, or None when unset."""
        return self._container.get(MK_INSTALLATIONS_INSTALL_AS)

    @dest_as.setter
    def dest_as(self, new_value: str | None) -> None:
        if new_value is None:
            with suppress(KeyError):
                del self._container[MK_INSTALLATIONS_INSTALL_AS]
            return
        if self.dest_dir is not None:
            raise ValueError(
                f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
                f' "{MK_INSTALLATIONS_INSTALL_AS}"'
            )
        # `.get`: rules built from a single string only carry the singular
        # source key, in which case there is no list to validate.
        sources = self._container.get(MK_INSTALLATIONS_INSTALL_SOURCES)
        if isinstance(sources, list):
            if len(sources) != 1:
                raise ValueError(
                    f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
                    f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
                )
            # Collapse the one-element list into a plain string.
            self.sources = sources[0]
        self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
    """Handle for the list of installation rules."""

    def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
        """Attach a detached ``install_rule`` to the end of this list.

        :raises RuntimeError: If the rule is already attached or belongs to another container.
        """
        target = self._store
        rule_parent = install_rule._parent_store
        belongs_elsewhere = rule_parent is not None and rule_parent is not target
        if belongs_elsewhere or not install_rule._is_detached:
            raise RuntimeError(
                "Item is already attached or associated with a different container"
            )
        self.create_definition_if_missing()
        install_rule._parent_store = target
        install_rule.create_definition()

    def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
        """Attach every rule from ``install_rules`` in order, one ``append`` at a time."""
        for rule in install_rules:
            self.append(rule)
class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
    """Handle for the manifest variables mapping."""

    @property
    def variables(self) -> dict[str, Any]:
        """The underlying mapping itself (not a copy)."""
        return self._store

    def __setitem__(self, key: str, value: Any) -> None:
        """Set a variable and attach the mapping to its parent if it was detached."""
        self._store[key] = value
        self.create_definition_if_missing()
class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
    """Handle for the manifest definitions mapping."""

    def manifest_variables(
        self, *, create_if_absent: bool = True
    ) -> MutableYAMLManifestVariables:
        """Return the variables handle, attaching it first when ``create_if_absent``."""
        variables = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
        if not create_if_absent:
            return variables
        variables.create_definition_if_missing()
        return variables
class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
    """Handle for the list of remove-during-clean rules."""

    def append(self, rule: str) -> None:
        """Add ``rule``, attaching the list to its parent first if needed."""
        self.create_definition_if_missing()
        self._store.append(rule)

    def __len__(self) -> int:
        """Number of rules currently in the list."""
        return len(self._store)

    def extend(self, rules: Iterable[str]) -> None:
        """Add all ``rules``; an empty iterable leaves the list (and parent) untouched."""
        remaining = iter(rules)
        exhausted = object()
        # Peek at the first rule so the definition is only created when needed.
        head = next(remaining, exhausted)
        if head is exhausted:
            return
        self.create_definition_if_missing()
        self._store.append(head)
        self._store.extend(remaining)
973class MutableYAMLManifest:
974 def __init__(self, store: Any) -> None:
975 self._store = store
977 @classmethod
978 def empty_manifest(cls) -> "MutableYAMLManifest":
979 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))
981 @property
982 def manifest_version(self) -> str:
983 return self._store[MK_MANIFEST_VERSION]
985 @manifest_version.setter
986 def manifest_version(self, version: str) -> None:
987 if version not in SUPPORTED_MANIFEST_VERSIONS:
988 raise ValueError("Unsupported version")
989 self._store[MK_MANIFEST_VERSION] = version
991 def remove_during_clean(
992 self,
993 *,
994 create_if_absent: bool = True,
995 ) -> MutableYAMLRemoveDuringCleanDefinitions:
996 d = MutableYAMLRemoveDuringCleanDefinitions(
997 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN
998 )
999 if create_if_absent: 999 ↛ 1000line 999 didn't jump to line 1000 because the condition on line 999 was never true
1000 d.create_definition_if_missing()
1001 return d
1003 def installations(
1004 self,
1005 *,
1006 create_if_absent: bool = True,
1007 ) -> MutableYAMLInstallationsDefinition:
1008 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
1009 if create_if_absent: 1009 ↛ 1010line 1009 didn't jump to line 1010 because the condition on line 1009 was never true
1010 d.create_definition_if_missing()
1011 return d
1013 def manifest_definitions(
1014 self,
1015 *,
1016 create_if_absent: bool = True,
1017 ) -> MutableYAMLManifestDefinitions:
1018 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
1019 if create_if_absent: 1019 ↛ 1020line 1019 didn't jump to line 1020 because the condition on line 1019 was never true
1020 d.create_definition_if_missing()
1021 return d
def package(
    self, name: str, *, create_if_absent: bool = True
) -> MutableYAMLPackageDefinition:
    """Return the mutable definition for the binary package ``name``.

    The ``MK_PACKAGES`` container is added to the store when it is missing,
    regardless of ``create_if_absent``.

    :param name: Name of the package to look up.
    :param create_if_absent: When true, a missing package entry is created via
      ``create_definition()``; otherwise a missing entry is an error.
    :return: A definition view bound to the package entry.
    :raises KeyError: If the package has no entry and ``create_if_absent`` is false.
    """
    if MK_PACKAGES not in self._store:
        self._store[MK_PACKAGES] = CommentedMap()
    packages_store = self._store[MK_PACKAGES]

    # Existing entry: just wrap it.
    if packages_store.get(name) is not None:
        return MutableYAMLPackageDefinition(packages_store, name)

    if not create_if_absent:
        raise KeyError(name)
    assert packages_store is not None
    new_definition = MutableYAMLPackageDefinition(packages_store, name)
    new_definition.create_definition()
    return new_definition
def write_to(self, fd) -> None:
    """Serialize the underlying store to ``fd`` using ``MANIFEST_YAML.dump``."""
    MANIFEST_YAML.dump(self._store, fd)
1044def _describe_missing_path(entry: VirtualPath) -> str:
1045 if entry.is_dir:
1046 return f"{entry.fs_path}/ (empty directory; possible integration point)"
1047 if entry.is_symlink:
1048 target = os.readlink(entry.fs_path)
1049 return f"{entry.fs_path} (symlink; links to {target})"
1050 if entry.is_file:
1051 return f"{entry.fs_path} (file)"
1052 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)"
1055def _detect_missing_installations(
1056 path_matcher: SourcePathMatcher,
1057 search_dir: VirtualPath,
1058) -> None:
1059 if not search_dir.is_dir: 1059 ↛ 1060line 1059 didn't jump to line 1060 because the condition on line 1059 was never true
1060 return
1061 missing = list(path_matcher.detect_missing(search_dir))
1062 if not missing:
1063 return
1065 excl = textwrap.dedent(
1066 """\
1067 - discard: "*"
1068 """
1069 )
1071 raise PathNotCoveredByInstallRulesError(
1072 "Please review the list and add either install rules or exclusions to `installations` in"
1073 " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
1074 f" end of your 'installations`:\n\n{excl}\n",
1075 missing,
1076 search_dir,
1077 )
1080def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
1081 used_discard_rules = path_matcher.used_auto_discard_rules
1082 # Discard rules can match and then be overridden. In that case, they appear
1083 # but have 0 matches.
1084 if not sum((len(v) for v in used_discard_rules.values()), 0):
1085 return
1086 _info("The following automatic discard rules were triggered:")
1087 example_path: str | None = None
1088 for rule in sorted(used_discard_rules):
1089 for fs_path in sorted(used_discard_rules[rule]):
1090 if example_path is None: 1090 ↛ 1092line 1090 didn't jump to line 1092 because the condition on line 1090 was always true
1091 example_path = fs_path
1092 _info(f" * {rule} -> {fs_path}")
1093 assert example_path is not None
1094 _info("")
1095 _info(
1096 "Note that some of these may have been overruled. The overrule detection logic is not"
1097 )
1098 _info("100% reliable.")
1099 _info("")
1100 _info(
1101 "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
1102 )
1103 _info(" installations:")
1104 _info(" - install:")
1105 _info(f" source: {example_path}")
def _install_everything_from_source_dir_if_present(
    dctrl_bin: BinaryPackage,
    substitution: Substitution,
    path_matcher: SourcePathMatcher,
    install_rule_context: InstallRuleContext,
    source_condition_context: ConditionContext,
    source_dir: VirtualPath,
    *,
    into_dir: VirtualPath | None = None,
) -> None:
    """Install everything matching ``*`` in ``source_dir`` into ``dctrl_bin``.

    A built-in install rule is created and run against a copy of
    ``install_rule_context`` whose only search dir is ``source_dir``.

    :param into_dir: When given, the copy of the context also has the ``fs_root``
      of ``dctrl_bin`` replaced by this directory.
    """
    source_path = source_dir.fs_path
    only_this_package = frozenset([dctrl_bin])
    match_everything = FileSystemMatchRule.from_path_match(
        "*",
        AttributePath.builtin_path()[f"installing {source_path}"],
        substitution,
    )
    install_rule = run_in_context_of_plugin(
        "debputy",
        InstallRule.install_dest,
        [match_everything],
        None,
        only_this_package,
        f"Built-in; install everything from {source_path} into {dctrl_bin.name}",
        None,
    )

    overrides = {
        "search_dirs": (SearchDir(source_dir, only_this_package),),
    }
    if into_dir is not None:
        # Copy the mapping so the caller's context is left untouched.
        per_package_contexts = dict(install_rule_context.binary_package_contexts)
        per_package_contexts[dctrl_bin.name] = per_package_contexts[
            dctrl_bin.name
        ].replace(fs_root=into_dir)
        overrides["binary_package_contexts"] = per_package_contexts

    scoped_install_rule_context = install_rule_context.replace(**overrides)
    # Empty directory or everything excluded by default; ignore the error
    with suppress(
        NoMatchForInstallPatternError,
        PathAlreadyInstalledOrDiscardedError,
    ):
        install_rule.perform_install(
            path_matcher,
            scoped_install_rule_context,
            source_condition_context,
        )
1159def _add_build_install_dirs_to_per_package_search_dirs(
1160 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
1161 per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]],
1162 as_path: Callable[[str], VirtualPath],
1163) -> None:
1164 seen_pp_search_dirs: set[tuple[BinaryPackage, str]] = set()
1165 for dest_dir, for_packages in build_system_install_dirs:
1166 dest_path = as_path(dest_dir)
1167 for pkg in for_packages:
1168 seen_key = (pkg, dest_dir)
1169 if seen_key in seen_pp_search_dirs:
1170 continue
1171 seen_pp_search_dirs.add(seen_key)
1172 if pkg not in per_package_search_dirs:
1173 per_package_search_dirs[pkg] = [dest_path]
1174 else:
1175 per_package_search_dirs[pkg].append(dest_path)
1178class HighLevelManifest:
1179 def __init__(
1180 self,
1181 manifest_path: str,
1182 mutable_manifest: MutableYAMLManifest | None,
1183 remove_during_clean_rules: list[FileSystemMatchRule],
1184 install_rules: list[InstallRule] | None,
1185 source_package: SourcePackage,
1186 binary_packages: Mapping[str, BinaryPackage],
1187 substitution: Substitution,
1188 package_transformations: Mapping[str, PackageTransformationDefinition],
1189 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
1190 dpkg_arch_query_table: DpkgArchTable,
1191 build_env: DebBuildOptionsAndProfiles,
1192 build_environments: BuildEnvironments,
1193 build_rules: list[BuildRule] | None,
1194 value_table: Mapping[
1195 tuple[SourcePackage | BinaryPackage, type[Any]],
1196 Any,
1197 ],
1198 plugin_provided_feature_set: PluginProvidedFeatureSet,
1199 debian_dir: VirtualPath,
1200 ) -> None:
1201 self.manifest_path = manifest_path
1202 self.mutable_manifest = mutable_manifest
1203 self._remove_during_clean_rules: list[FileSystemMatchRule] = (
1204 remove_during_clean_rules
1205 )
1206 self._install_rules = install_rules
1207 self.source_package = source_package
1208 self._binary_packages = binary_packages
1209 self.substitution = substitution
1210 self.package_transformations = package_transformations
1211 self._dpkg_architecture_variables = dpkg_architecture_variables
1212 self.dpkg_arch_query_table = dpkg_arch_query_table
1213 self._build_env = build_env
1214 self._used_for: set[str] = set()
1215 self.build_environments = build_environments
1216 self.build_rules = build_rules
1217 self._value_table = value_table
1218 self._plugin_provided_feature_set = plugin_provided_feature_set
1219 self._debian_dir = debian_dir
1220 self._source_condition_context = ConditionContext(
1221 binary_package=None,
1222 substitution=self.substitution,
1223 deb_options_and_profiles=self._build_env,
1224 dpkg_architecture_variables=self._dpkg_architecture_variables,
1225 dpkg_arch_query_table=self.dpkg_arch_query_table,
1226 )
1228 def source_version(self, include_binnmu_version: bool = True) -> str:
1229 # TODO: There should an easier way to determine the source version; really.
1230 version_var = "{{DEB_VERSION}}"
1231 if not include_binnmu_version:
1232 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
1233 try:
1234 return self.substitution.substitute(
1235 version_var, "internal (resolve version)"
1236 )
1237 except DebputySubstitutionError as e:
1238 raise AssertionError(f"Could not resolve {version_var}") from e
1240 @property
1241 def source_condition_context(self) -> ConditionContext:
1242 return self._source_condition_context
1244 @property
1245 def debian_dir(self) -> VirtualPath:
1246 return self._debian_dir
1248 @property
1249 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
1250 return self._dpkg_architecture_variables
1252 @property
1253 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
1254 return self._build_env
1256 @property
1257 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
1258 return self._plugin_provided_feature_set
1260 @property
1261 def remove_during_clean_rules(self) -> list[FileSystemMatchRule]:
1262 return self._remove_during_clean_rules
1264 @property
1265 def active_packages(self) -> Iterable[BinaryPackage]:
1266 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)
1268 @property
1269 def all_packages(self) -> Iterable[BinaryPackage]:
1270 yield from self._binary_packages.values()
1272 def manifest_configuration[T](
1273 self,
1274 context_package: SourcePackage | BinaryPackage,
1275 value_type: type[T],
1276 ) -> T | None:
1277 res = self._value_table.get((context_package, value_type))
1278 return typing.cast("T | None", res)
1280 def package_state_for(self, package: str) -> PackageTransformationDefinition:
1281 return self.package_transformations[package]
1283 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
1284 name = package.name
1285 for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"):
1286 doc_main_package_name = package.fields.get(doc_main_field)
1287 if doc_main_package_name: 1287 ↛ 1288line 1287 didn't jump to line 1288 because the condition on line 1287 was never true
1288 main_package = self._binary_packages.get(doc_main_package_name)
1289 if main_package is None:
1290 _error(
1291 f"Invalid Doc-Main-Package for {name}: The package {doc_main_package_name!r} is not listed in d/control"
1292 )
1293 return main_package
1294 # If it is not a -doc package, then docs should be installed
1295 # under its own package name.
1296 if not name.endswith("-doc"): 1296 ↛ 1298line 1296 didn't jump to line 1298 because the condition on line 1296 was always true
1297 return package
1298 name = name[:-4]
1299 main_package = self._binary_packages.get(name)
1300 if main_package:
1301 return main_package
1302 if name.startswith("lib"):
1303 dev_pkg = self._binary_packages.get(f"{name}-dev")
1304 if dev_pkg:
1305 return dev_pkg
1307 # If we found no better match; default to the doc package itself.
1308 return package
1310 def perform_installations(
1311 self,
1312 integration_mode: DebputyIntegrationMode,
1313 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
1314 *,
1315 install_request_context: InstallSearchDirContext | None = None,
1316 ) -> PackageDataTable:
1317 package_data_dict = {}
1318 package_data_table = PackageDataTable(package_data_dict)
1319 enable_manifest_installation_feature = (
1320 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR
1321 )
1323 if build_system_install_dirs: 1323 ↛ 1324line 1323 didn't jump to line 1324 because the condition on line 1323 was never true
1324 if integration_mode != INTEGRATION_MODE_FULL:
1325 raise ValueError(
1326 "The build_system_install_dirs parameter can only be used in full integration mode"
1327 )
1328 if install_request_context:
1329 raise ValueError(
1330 "The build_system_install_dirs parameter cannot be used with install_request_context"
1331 " (not implemented)"
1332 )
1334 if install_request_context is None: 1334 ↛ 1336line 1334 didn't jump to line 1336 because the condition on line 1334 was never true
1336 @functools.lru_cache(None)
1337 def _as_path(fs_path: str) -> VirtualPath:
1338 return OSFSROOverlay.create_root_dir(".", fs_path)
1340 dtmp_dir = _as_path("debian/tmp")
1341 source_root_dir = _as_path(".")
1342 into = frozenset(self._binary_packages.values())
1343 per_package_search_dirs = {
1344 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
1345 for t in self.package_transformations.values()
1346 if t.search_dirs is not None
1347 }
1349 if integration_mode == INTEGRATION_MODE_FULL:
1350 # In this mode, we have no default search dir. Everything ends up being
1351 # per-package instead (since that is easier logic-wise).
1352 #
1353 # Even dtmp_dir is omitted here (since it is not universally applicable).
1354 # Note we still initialize dtmp_dir, since it affects a later guard.
1355 default_search_dirs = []
1356 _add_build_install_dirs_to_per_package_search_dirs(
1357 build_system_install_dirs,
1358 per_package_search_dirs,
1359 _as_path,
1360 )
1362 # We can end here with per_package_search_dirs having no search dirs for any package
1363 # (single binary, where everything is installed into d/<pkg> is the most common case).
1364 #
1365 # This is not a problem in itself as the installation rules can still apply to the
1366 # source root and there should be no reason to install something from d/<pkg> into
1367 # d/<another-pkg>
1368 else:
1369 default_search_dirs = [dtmp_dir]
1371 search_dirs = _determine_search_dir_order(
1372 per_package_search_dirs,
1373 into,
1374 default_search_dirs,
1375 source_root_dir,
1376 )
1377 check_for_uninstalled_dirs = tuple(
1378 s.search_dir
1379 for s in search_dirs
1380 if s.search_dir.fs_path != source_root_dir.fs_path
1381 )
1382 if enable_manifest_installation_feature:
1383 _present_installation_dirs(
1384 search_dirs, check_for_uninstalled_dirs, into
1385 )
1386 else:
1387 dtmp_dir = None
1388 search_dirs = install_request_context.search_dirs
1389 into = frozenset(self._binary_packages.values())
1390 seen: set[BinaryPackage] = set()
1391 for search_dir in search_dirs:
1392 seen.update(search_dir.applies_to)
1394 missing = into - seen
1395 if missing: 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true
1396 names = ", ".join(p.name for p in missing)
1397 raise ValueError(
1398 f"The following package(s) had no search dirs: {names}."
1399 " (Generally, the source root would be applicable to all packages)"
1400 )
1401 extra_names = seen - into
1402 if extra_names: 1402 ↛ 1403line 1402 didn't jump to line 1403 because the condition on line 1402 was never true
1403 names = ", ".join(p.name for p in extra_names)
1404 raise ValueError(
1405 f"The install_request_context referenced the following unknown package(s): {names}"
1406 )
1408 check_for_uninstalled_dirs = (
1409 install_request_context.check_for_uninstalled_dirs
1410 )
1412 install_rule_context = InstallRuleContext(search_dirs)
1414 if ( 1414 ↛ 1421line 1414 didn't jump to line 1421 because the condition on line 1414 was never true
1415 enable_manifest_installation_feature
1416 and self._install_rules is None
1417 # TODO: Should we also do this for full mode when build systems provided search dirs?
1418 and dtmp_dir is not None
1419 and os.path.isdir(dtmp_dir.fs_path)
1420 ):
1421 msg = (
1422 "The build system appears to have provided the output of upstream build system's"
1423 " install in debian/tmp. However, these are no provisions for debputy to install"
1424 " any of that into any of the debian packages listed in debian/control."
1425 " To avoid accidentally creating empty packages, debputy will insist that you "
1426 " explicitly define an empty installation definition if you did not want to "
1427 " install any of those files even though they have been provided."
1428 ' Example: "installations: []"'
1429 )
1430 _error(msg)
1431 elif ( 1431 ↛ 1434line 1431 didn't jump to line 1434 because the condition on line 1431 was never true
1432 not enable_manifest_installation_feature and self._install_rules is not None
1433 ):
1434 _error(
1435 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
1436 f" Please remove or comment out the `installations` keyword."
1437 )
1439 for dctrl_bin in self.all_packages:
1440 package = dctrl_bin.name
1441 doc_main_package = self._detect_doc_main_package_for(dctrl_bin)
1443 install_rule_context[package] = BinaryPackageInstallRuleContext(
1444 dctrl_bin,
1445 FSRootDir(),
1446 doc_main_package,
1447 )
1449 if enable_manifest_installation_feature: 1449 ↛ 1454line 1449 didn't jump to line 1454 because the condition on line 1449 was always true
1450 discard_rules = list(
1451 self.plugin_provided_feature_set.auto_discard_rules.values()
1452 )
1453 else:
1454 discard_rules = [
1455 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
1456 ]
1457 path_matcher = SourcePathMatcher(discard_rules)
1459 source_condition_context = self._source_condition_context
1461 for dctrl_bin in self.active_packages:
1462 package = dctrl_bin.name
1463 if install_request_context: 1463 ↛ 1468line 1463 didn't jump to line 1468 because the condition on line 1463 was always true
1464 build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
1465 package
1466 )
1467 else:
1468 build_system_staging_dir_fs_path = os.path.join("debian", package)
1469 if os.path.isdir(build_system_staging_dir_fs_path):
1470 build_system_staging_dir = OSFSROOverlay.create_root_dir(
1471 ".",
1472 build_system_staging_dir_fs_path,
1473 )
1474 else:
1475 build_system_staging_dir = None
1477 if build_system_staging_dir is not None:
1478 _install_everything_from_source_dir_if_present(
1479 dctrl_bin,
1480 self.substitution,
1481 path_matcher,
1482 install_rule_context,
1483 source_condition_context,
1484 build_system_staging_dir,
1485 )
1487 if self._install_rules:
1488 # FIXME: Check that every install rule remains used after transformations have run.
1489 # What we want to check is transformations do not exclude everything from an install
1490 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
1491 # match.
1492 for install_rule in self._install_rules:
1493 install_rule.perform_install(
1494 path_matcher,
1495 install_rule_context,
1496 source_condition_context,
1497 )
1499 if enable_manifest_installation_feature: 1499 ↛ 1503line 1499 didn't jump to line 1503 because the condition on line 1499 was always true
1500 for search_dir in check_for_uninstalled_dirs:
1501 _detect_missing_installations(path_matcher, search_dir)
1503 for dctrl_bin in self.all_packages:
1504 package = dctrl_bin.name
1505 binary_install_rule_context = install_rule_context[package]
1506 build_system_pkg_staging_dir = os.path.join("debian", package)
1507 fs_root = binary_install_rule_context.fs_root
1509 context = self.package_transformations[package]
1510 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
1511 for special_install_rule in context.install_rules: 1511 ↛ 1512line 1511 didn't jump to line 1512 because the loop on line 1511 never started
1512 special_install_rule.perform_install(
1513 path_matcher,
1514 install_rule_context,
1515 source_condition_context,
1516 )
1518 if dctrl_bin.should_be_acted_on:
1519 self.apply_fs_transformations(package, fs_root)
1520 substvars_file = f"debian/{package}.substvars"
1521 substvars = FlushableSubstvars.load_from_path(
1522 substvars_file, missing_ok=True
1523 )
1524 # We do not want to touch the substvars file (non-clean rebuild contamination)
1525 substvars.substvars_path = None
1526 else:
1527 substvars = FlushableSubstvars()
1529 udeb_package = self._binary_packages.get(f"{package}-udeb")
1530 if udeb_package and not udeb_package.is_udeb: 1530 ↛ 1531line 1530 didn't jump to line 1531 because the condition on line 1530 was never true
1531 udeb_package = None
1533 package_metadata_context = PackageProcessingContextProvider(
1534 self,
1535 dctrl_bin,
1536 udeb_package,
1537 package_data_table,
1538 )
1540 ctrl_creator = BinaryCtrlAccessorProviderCreator(
1541 package_metadata_context,
1542 substvars,
1543 context.maintscript_snippets,
1544 context.substitution,
1545 )
1547 if not enable_manifest_installation_feature: 1547 ↛ 1548line 1547 didn't jump to line 1548 because the condition on line 1547 was never true
1548 assert_no_dbgsym_migration(dctrl_bin)
1549 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin)
1550 dh_dbgsym_root_path = OSFSROOverlay.create_root_dir(
1551 "",
1552 dh_dbgsym_root_fs,
1553 )
1554 dbgsym_root_fs = FSRootDir()
1555 _install_everything_from_source_dir_if_present(
1556 dctrl_bin,
1557 self.substitution,
1558 path_matcher,
1559 install_rule_context,
1560 source_condition_context,
1561 dh_dbgsym_root_path,
1562 into_dir=dbgsym_root_fs,
1563 )
1564 dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
1565 dbgsym_info = DbgsymInfo(
1566 dctrl_bin,
1567 dbgsym_root_fs,
1568 os.path.join(dh_dbgsym_root_fs, "DEBIAN"),
1569 dbgsym_build_ids,
1570 # TODO: Provide manifest feature to support this.
1571 False,
1572 )
1573 else:
1574 dbgsym_info = DbgsymInfo(
1575 dctrl_bin,
1576 FSRootDir(),
1577 None,
1578 [],
1579 False,
1580 )
1582 package_data_dict[package] = BinaryPackageData(
1583 self.source_package,
1584 dctrl_bin,
1585 build_system_pkg_staging_dir,
1586 fs_root,
1587 substvars,
1588 package_metadata_context,
1589 ctrl_creator,
1590 dbgsym_info,
1591 )
1593 if enable_manifest_installation_feature: 1593 ↛ 1596line 1593 didn't jump to line 1596 because the condition on line 1593 was always true
1594 _list_automatic_discard_rules(path_matcher)
1596 return package_data_table
1598 def condition_context(
1599 self, binary_package: BinaryPackage | str | None
1600 ) -> ConditionContext:
1601 if binary_package is None: 1601 ↛ 1602line 1601 didn't jump to line 1602 because the condition on line 1601 was never true
1602 return self._source_condition_context
1603 if not isinstance(binary_package, str):
1604 binary_package = binary_package.name
1606 package_transformation = self.package_transformations[binary_package]
1607 return self._source_condition_context.replace(
1608 binary_package=package_transformation.binary_package,
1609 substitution=package_transformation.substitution,
1610 )
1612 def apply_fs_transformations(
1613 self,
1614 package: str,
1615 fs_root: FSPath,
1616 ) -> None:
1617 if package in self._used_for: 1617 ↛ 1618line 1617 didn't jump to line 1618 because the condition on line 1617 was never true
1618 raise ValueError(
1619 f"data.tar contents for {package} has already been finalized!?"
1620 )
1621 if package not in self.package_transformations: 1621 ↛ 1622line 1621 didn't jump to line 1622 because the condition on line 1621 was never true
1622 raise ValueError(
1623 f'The package "{package}" was not relevant for the manifest!?'
1624 )
1625 package_transformation = self.package_transformations[package]
1626 condition_context = ConditionContext(
1627 binary_package=package_transformation.binary_package,
1628 substitution=package_transformation.substitution,
1629 deb_options_and_profiles=self._build_env,
1630 dpkg_architecture_variables=self._dpkg_architecture_variables,
1631 dpkg_arch_query_table=self.dpkg_arch_query_table,
1632 )
1633 norm_rules = list(
1634 builtin_mode_normalization_rules(
1635 self._dpkg_architecture_variables,
1636 package_transformation.binary_package,
1637 package_transformation.substitution,
1638 )
1639 )
1640 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
1641 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
1642 for transformation in package_transformation.transformations:
1643 transformation.run_transform_file_system(fs_root, condition_context)
1644 interpreter_normalization = NormalizeShebangLineTransformation()
1645 interpreter_normalization.transform_file_system(fs_root, condition_context)
1647 def finalize_data_tar_contents(
1648 self,
1649 package: str,
1650 fs_root: FSPath,
1651 clamp_mtime_to: int,
1652 ) -> IntermediateManifest:
1653 if package in self._used_for: 1653 ↛ 1654line 1653 didn't jump to line 1654 because the condition on line 1653 was never true
1654 raise ValueError(
1655 f"data.tar contents for {package} has already been finalized!?"
1656 )
1657 if package not in self.package_transformations: 1657 ↛ 1658line 1657 didn't jump to line 1658 because the condition on line 1657 was never true
1658 raise ValueError(
1659 f'The package "{package}" was not relevant for the manifest!?'
1660 )
1661 self._used_for.add(package)
1663 # At this point, there so be no further mutations to the file system (because the will not
1664 # be present in the intermediate manifest)
1665 cast("FSRootDir", fs_root).is_read_write = False
1667 intermediate_manifest = list(
1668 _generate_intermediate_manifest(
1669 fs_root,
1670 clamp_mtime_to,
1671 )
1672 )
1673 return intermediate_manifest
1675 def apply_to_binary_staging_directory(
1676 self,
1677 package: str,
1678 fs_root: FSPath,
1679 clamp_mtime_to: int,
1680 ) -> IntermediateManifest:
1681 self.apply_fs_transformations(package, fs_root)
1682 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)
1685@dataclasses.dataclass(slots=True)
1686class SearchDirOrderState:
1687 search_dir: VirtualPath
1688 applies_to: set[BinaryPackage] | frozenset[BinaryPackage] = dataclasses.field(
1689 default_factory=set
1690 )
1691 after: set[str] = dataclasses.field(default_factory=set)
def _present_installation_dirs(
    search_dirs: Sequence[SearchDir],
    checked_missing_dirs: Sequence[VirtualPath],
    all_pkgs: frozenset[BinaryPackage],
) -> None:
    """Log the search dirs (in order) and the dirs checked for uninstalled paths.

    Directories that do not exist on the file system are marked as skipped, and
    search dirs that only apply to a strict subset of ``all_pkgs`` list the
    packages they apply to.
    """

    def _absent_remark(fs_path: str) -> str:
        return "" if os.path.isdir(fs_path) else " (skipped; absent)"

    _info("The following directories are considered search dirs (in order):")
    # Both listings are padded to the width of the longest search dir path.
    width = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
    for entry in search_dirs:
        fs_path = entry.search_dir.fs_path
        if entry.applies_to < all_pkgs:
            names = ", ".join(p.name for p in entry.applies_to)
            scope = f" [only applicable to: {names}]"
        else:
            scope = ""
        _info(f" * {fs_path:{width}}{scope}{_absent_remark(fs_path)}")

    if not checked_missing_dirs:
        return
    _info('The following directories are considered for "not-installed" paths;')
    for checked_dir in checked_missing_dirs:
        fs_path = checked_dir.fs_path
        _info(f" * {fs_path:{width}}{_absent_remark(fs_path)}")
def _determine_search_dir_order(
    requested: Mapping[BinaryPackage, list[VirtualPath]],
    all_pkgs: frozenset[BinaryPackage],
    default_search_dirs: list[VirtualPath],
    source_root: VirtualPath,
) -> Sequence[SearchDir]:
    """Merge the per-package search dir lists into one global, ordered sequence.

    Each package's list (or ``default_search_dirs`` when the package requested
    none) defines an ordering constraint: a directory must come after the
    directory listed just before it. Directories are emitted once all their
    predecessors have been emitted. ``source_root`` is always appended last
    and applies to all packages.

    :param requested: Explicit search dirs per package; its keys must be a
      subset of ``all_pkgs``.
    :return: The search dirs in the order they should be searched.
    """
    search_dir_table: dict[str, SearchDirOrderState] = {}
    assert requested.keys() <= all_pkgs
    for pkg in all_pkgs:
        paths = requested.get(pkg, default_search_dirs)
        previous_search_dir: SearchDirOrderState | None = None
        for path in paths:
            try:
                search_dir_state = search_dir_table[path.fs_path]
            except KeyError:
                search_dir_state = SearchDirOrderState(path)
                search_dir_table[path.fs_path] = search_dir_state
            search_dir_state.applies_to.add(pkg)
            if previous_search_dir is not None:
                search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
            previous_search_dir = search_dir_state

    search_dirs_in_order = []
    released: set[str] = set()
    remaining: set[str] = set()
    for fs_path, search_dir_state in search_dir_table.items():
        if not (search_dir_state.after <= released):
            remaining.add(fs_path)
            continue
        search_dirs_in_order.append(search_dir_state)
        released.add(fs_path)

    while remaining:
        made_progress = False
        # Walk the table (insertion ordered) rather than the `remaining` set, so
        # the release order does not depend on set iteration order and the set
        # is never touched while it is being iterated.
        for fs_path, search_dir_state in search_dir_table.items():
            if fs_path not in remaining or fs_path in released:
                continue
            if not search_dir_state.after.issubset(released):
                continue
            search_dirs_in_order.append(search_dir_state)
            released.add(fs_path)
            made_progress = True

        if not made_progress:
            names = ", ".join(sorted(remaining))
            _error(
                f"There is a circular dependency (somewhere) between the search dirs: {names}."
                " Note that the search directories across all packages have to be ordered (and the"
                " source root should generally be last)"
            )
        remaining -= released

    search_dirs_in_order.append(
        SearchDirOrderState(
            source_root,
            all_pkgs,
        )
    )

    return tuple(
        # Avoid duplicating all_pkgs
        SearchDir(
            s.search_dir,
            frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
        )
        for s in search_dirs_in_order
    )