Coverage for src/debputy/highlevel_manifest.py: 64%
886 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2import functools
3import os
4import textwrap
5from contextlib import suppress
6from dataclasses import dataclass, field
7from typing import (
8 List,
9 Dict,
10 Any,
11 Union,
12 Optional,
13 TypeVar,
14 Generic,
15 cast,
16 Set,
17 Tuple,
18 FrozenSet,
19)
20from collections.abc import Iterable, Mapping, Sequence, Callable
22from debian.debian_support import DpkgArchTable
24from debputy.dh.debhelper_emulation import (
25 dhe_dbgsym_root_dir,
26 assert_no_dbgsym_migration,
27 read_dbgsym_file,
28)
29from ._deb_options_profiles import DebBuildOptionsAndProfiles
30from ._manifest_constants import *
31from .architecture_support import DpkgArchitectureBuildProcessValuesTable
32from .builtin_manifest_rules import builtin_mode_normalization_rules
33from .exceptions import (
34 DebputySubstitutionError,
35 DebputyRuntimeErrorWithPreamble,
36)
37from .filesystem_scan import FSPath, FSRootDir, FSROOverlay, FSControlRootDir
38from .installations import (
39 InstallRule,
40 SourcePathMatcher,
41 PathAlreadyInstalledOrDiscardedError,
42 NoMatchForInstallPatternError,
43 InstallRuleContext,
44 BinaryPackageInstallRuleContext,
45 InstallSearchDirContext,
46 SearchDir,
47)
48from .intermediate_manifest import TarMember, PathType, IntermediateManifest
49from .maintscript_snippet import (
50 DpkgMaintscriptHelperCommand,
51 MaintscriptSnippetContainer,
52)
53from .manifest_conditions import ConditionContext
54from .manifest_parser.base_types import (
55 FileSystemMatchRule,
56 FileSystemExactMatchRule,
57 BuildEnvironments,
58)
59from .manifest_parser.util import AttributePath
60from .packager_provided_files import PackagerProvidedFile
61from .packages import BinaryPackage, SourcePackage
62from .plugin.api.feature_set import PluginProvidedFeatureSet
63from .plugin.api.impl import BinaryCtrlAccessorProviderCreator
64from .plugin.api.impl_types import (
65 PackageProcessingContextProvider,
66 PackageDataTable,
67)
68from .plugin.api.spec import (
69 FlushableSubstvars,
70 VirtualPath,
71 DebputyIntegrationMode,
72 INTEGRATION_MODE_DH_DEBPUTY_RRR,
73 INTEGRATION_MODE_FULL,
74)
75from debputy.plugins.debputy.binary_package_rules import ServiceRule
76from debputy.plugins.debputy.build_system_rules import BuildRule
77from .plugin.plugin_state import run_in_context_of_plugin
78from .substitution import Substitution
79from .transformation_rules import (
80 TransformationRule,
81 ModeNormalizationTransformationRule,
82 NormalizeShebangLineTransformation,
83)
84from .util import (
85 _error,
86 _warn,
87 debian_policy_normalize_symlink_target,
88 generated_content_dir,
89 _info,
90)
91from .yaml import MANIFEST_YAML
92from .yaml.compat import CommentedMap, CommentedSeq
95class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
97 @property
98 def unmatched_paths(self) -> Sequence[VirtualPath]:
99 return self.args[1]
101 @property
102 def search_dir(self) -> VirtualPath:
103 return self.args[2]
105 def render_preamble(self) -> None:
106 _warn(
107 f"The following paths were present in {self.search_dir.fs_path}, but not installed (nor explicitly discarded)."
108 )
109 _warn("")
110 for entry in self.unmatched_paths:
111 desc = _describe_missing_path(entry)
112 _warn(f" * {desc}")
113 _warn("")
116@dataclass(slots=True)
117class DbgsymInfo:
118 binary_package: BinaryPackage
119 dbgsym_fs_root: FSPath
120 _dbgsym_root_fs: str | None
121 dbgsym_ids: list[str]
122 run_dwz: bool
124 @property
125 def dbgsym_root_dir(self) -> str:
126 root_dir = self._dbgsym_root_fs
127 if root_dir is None:
128 root_dir = generated_content_dir(
129 package=self.binary_package,
130 subdir_key="dbgsym-fs-root",
131 )
132 self._dbgsym_root_fs = root_dir
133 return root_dir
135 @property
136 def dbgsym_ctrl_dir(self) -> FSControlRootDir:
137 return FSControlRootDir.create_root_dir(
138 os.path.join(self.dbgsym_root_dir, "DEBIAN")
139 )
142@dataclass(slots=True, frozen=True)
143class BinaryPackageData:
144 source_package: SourcePackage
145 binary_package: BinaryPackage
146 binary_staging_root_dir: str
147 fs_root: FSPath
148 substvars: FlushableSubstvars
149 package_metadata_context: PackageProcessingContextProvider
150 ctrl_creator: BinaryCtrlAccessorProviderCreator
151 dbgsym_info: DbgsymInfo
153 @property
154 def control_output_dir(self) -> FSControlRootDir:
155 return FSControlRootDir.create_root_dir(
156 generated_content_dir(
157 package=self.binary_package,
158 subdir_key="DEBIAN",
159 )
160 )
163@dataclass(slots=True)
164class PackageTransformationDefinition:
165 binary_package: BinaryPackage
166 substitution: Substitution
167 is_auto_generated_package: bool
168 binary_version: str | None = None
169 search_dirs: list[FileSystemExactMatchRule] | None = None
170 dpkg_maintscript_helper_snippets: list[DpkgMaintscriptHelperCommand] = field(
171 default_factory=list
172 )
173 maintscript_snippets: dict[str, MaintscriptSnippetContainer] = field(
174 default_factory=dict
175 )
176 transformations: list[TransformationRule] = field(default_factory=list)
177 reserved_packager_provided_files: dict[str, list[PackagerProvidedFile]] = field(
178 default_factory=dict
179 )
180 install_rules: list[InstallRule] = field(default_factory=list)
181 requested_service_rules: list[ServiceRule] = field(default_factory=list)
184def _path_to_tar_member(
185 path: FSPath,
186 clamp_mtime_to: int,
187) -> TarMember:
188 mtime = float(clamp_mtime_to)
189 owner, uid, group, gid = path.tar_owner_info
190 mode = path.mode
192 if path.has_fs_path:
193 mtime = min(mtime, path.mtime)
195 if path.is_dir:
196 path_type = PathType.DIRECTORY
197 elif path.is_file:
198 # TODO: someday we will need to deal with hardlinks and it might appear here.
199 path_type = PathType.FILE
200 elif path.is_symlink: 200 ↛ 220line 200 didn't jump to line 220 because the condition on line 200 was always true
201 # Special-case that we resolve immediately (since we need to normalize the target anyway)
202 link_target = debian_policy_normalize_symlink_target(
203 path.path,
204 path.readlink(),
205 )
206 return TarMember.virtual_path(
207 path.tar_path,
208 PathType.SYMLINK,
209 mtime,
210 link_target=link_target,
211 # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
212 # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
213 mode=0o0777,
214 owner=owner,
215 uid=uid,
216 group=group,
217 gid=gid,
218 )
219 else:
220 assert not path.is_symlink
221 raise AssertionError(
222 f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
223 )
225 if not path.has_fs_path:
226 assert not path.is_file
227 return TarMember.virtual_path(
228 path.tar_path,
229 path_type,
230 mtime,
231 mode=mode,
232 owner=owner,
233 uid=uid,
234 group=group,
235 gid=gid,
236 )
237 may_steal_fs_path = path._can_replace_inline
238 return TarMember.from_file(
239 path.tar_path,
240 path.fs_path,
241 mode=mode,
242 uid=uid,
243 owner=owner,
244 gid=gid,
245 group=group,
246 path_type=path_type,
247 path_mtime=mtime,
248 clamp_mtime_to=clamp_mtime_to,
249 may_steal_fs_path=may_steal_fs_path,
250 )
253def _generate_intermediate_manifest(
254 fs_root: FSPath,
255 clamp_mtime_to: int,
256) -> Iterable[TarMember]:
257 symlinks = []
258 for path in fs_root.all_paths():
259 tar_member = _path_to_tar_member(path, clamp_mtime_to)
260 if tar_member.path_type == PathType.SYMLINK:
261 symlinks.append(tar_member)
262 continue
263 yield tar_member
264 yield from symlinks
267ST = TypeVar("ST")
268T = TypeVar("T")
271class AbstractYAMLSubStore(Generic[ST]):
272 def __init__(
273 self,
274 parent_store: Any,
275 parent_key: int | str | None,
276 store: ST | None = None,
277 ) -> None:
278 if parent_store is not None and parent_key is not None:
279 try:
280 from_parent_store = parent_store[parent_key]
281 except (KeyError, IndexError):
282 from_parent_store = None
283 if ( 283 ↛ 288line 283 didn't jump to line 288 because the condition on line 283 was never true
284 store is not None
285 and from_parent_store is not None
286 and store is not parent_store
287 ):
288 raise ValueError(
289 "Store is provided but is not the one already in the parent store"
290 )
291 if store is None: 291 ↛ 293line 291 didn't jump to line 293 because the condition on line 291 was always true
292 store = from_parent_store
293 self._parent_store = parent_store
294 self._parent_key = parent_key
295 self._is_detached = (
296 parent_key is None or parent_store is None or parent_key not in parent_store
297 )
298 assert self._is_detached or store is not None
299 if store is None:
300 store = self._create_new_instance()
301 self._store: ST = store
303 def _create_new_instance(self) -> ST:
304 raise NotImplementedError
306 def create_definition_if_missing(self) -> None:
307 if self._is_detached:
308 self.create_definition()
310 def create_definition(self) -> None:
311 if not self._is_detached: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true
312 raise RuntimeError("Definition is already present")
313 parent_store = self._parent_store
314 if parent_store is None: 314 ↛ 315line 314 didn't jump to line 315 because the condition on line 314 was never true
315 raise RuntimeError(
316 f"Definition is not attached to any parent!? ({self.__class__.__name__})"
317 )
318 if isinstance(parent_store, list):
319 assert self._parent_key is None
320 self._parent_key = len(parent_store)
321 self._parent_store.append(self._store)
322 else:
323 parent_store[self._parent_key] = self._store
324 self._is_detached = False
326 def remove_definition(self) -> None:
327 self._ensure_attached()
328 del self._parent_store[self._parent_key]
329 if isinstance(self._parent_store, list):
330 self._parent_key = None
331 self._is_detached = True
333 def _ensure_attached(self) -> None:
334 if self._is_detached:
335 raise RuntimeError("The definition has been removed!")
338class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[list[T]]):
339 def _create_new_instance(self) -> list[T]:
340 return CommentedSeq()
343class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[dict[str, T]]):
344 def _create_new_instance(self) -> dict[str, T]:
345 return CommentedMap()
348class MutableCondition:
349 @classmethod
350 def arch_matches(cls, arch_filter: str) -> CommentedMap:
351 return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter})
353 @classmethod
354 def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
355 return CommentedMap(
356 {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
357 )
360class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
361 @classmethod
362 def new_symlink(
363 cls, link_path: str, link_target: str, condition: Any | None
364 ) -> "MutableYAMLSymlink":
365 inner = {
366 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
367 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
368 }
369 content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner}
370 if condition is not None: 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true
371 inner["when"] = condition
372 return cls(None, None, store=CommentedMap(content))
374 @property
375 def symlink_path(self) -> str:
376 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
377 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
378 ]
380 @symlink_path.setter
381 def symlink_path(self, path: str) -> None:
382 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
383 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
384 ] = path
386 @property
387 def symlink_target(self) -> str | None:
388 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
389 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
390 ]
392 @symlink_target.setter
393 def symlink_target(self, target: str) -> None:
394 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
395 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
396 ] = target
399class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
400 @classmethod
401 def rm_conffile(
402 cls,
403 conffile: str,
404 prior_to_version: str | None,
405 owning_package: str | None,
406 ) -> "MutableYAMLConffileManagementItem":
407 r = cls(
408 None,
409 None,
410 store=CommentedMap(
411 {
412 MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
413 {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
414 )
415 }
416 ),
417 )
418 r.prior_to_version = prior_to_version
419 r.owning_package = owning_package
420 return r
422 @classmethod
423 def mv_conffile(
424 cls,
425 old_conffile: str,
426 new_conffile: str,
427 prior_to_version: str | None,
428 owning_package: str | None,
429 ) -> "MutableYAMLConffileManagementItem":
430 r = cls(
431 None,
432 None,
433 store=CommentedMap(
434 {
435 MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
436 {
437 MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
438 MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
439 }
440 )
441 }
442 ),
443 )
444 r.prior_to_version = prior_to_version
445 r.owning_package = owning_package
446 return r
448 @property
449 def _container(self) -> dict[str, Any]:
450 assert len(self._store) == 1
451 return next(iter(self._store.values()))
453 @property
454 def command(self) -> str:
455 assert len(self._store) == 1
456 return next(iter(self._store))
458 @property
459 def obsolete_conffile(self) -> str:
460 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
461 return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
462 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
463 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]
465 @obsolete_conffile.setter
466 def obsolete_conffile(self, value: str) -> None:
467 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
468 self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
469 else:
470 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
471 self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value
473 @property
474 def new_conffile(self) -> str:
475 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
476 raise TypeError(
477 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
478 f" This is a {self.command}"
479 )
480 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]
482 @new_conffile.setter
483 def new_conffile(self, value: str) -> None:
484 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
485 raise TypeError(
486 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
487 f" This is a {self.command}"
488 )
489 self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value
491 @property
492 def prior_to_version(self) -> str | None:
493 return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)
495 @prior_to_version.setter
496 def prior_to_version(self, value: str | None) -> None:
497 if value is None:
498 try:
499 del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
500 except KeyError:
501 pass
502 else:
503 self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value
505 @property
506 def owning_package(self) -> str | None:
507 return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
509 @owning_package.setter
510 def owning_package(self, value: str | None) -> None:
511 if value is None:
512 try:
513 del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE]
514 except KeyError:
515 pass
516 else:
517 self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value
520class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
521 def _list_store(
522 self, key, *, create_if_absent: bool = False
523 ) -> list[dict[str, Any]] | None:
524 if self._is_detached or key not in self._store:
525 if create_if_absent: 525 ↛ 526line 525 didn't jump to line 526 because the condition on line 525 was never true
526 return None
527 self.create_definition_if_missing()
528 self._store[key] = []
529 return self._store[key]
531 def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
532 parent_store = self._list_store(key, create_if_absent=True)
533 assert parent_store is not None
534 if not item._is_detached or ( 534 ↛ 537line 534 didn't jump to line 537 because the condition on line 534 was never true
535 item._parent_store is not None and item._parent_store is not parent_store
536 ):
537 raise RuntimeError(
538 "Item is already attached or associated with a different container"
539 )
540 item._parent_store = parent_store
541 item.create_definition()
543 def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
544 self._insert_item(MK_TRANSFORMATIONS, symlink)
546 def symlinks(self) -> Iterable[MutableYAMLSymlink]:
547 store = self._list_store(MK_TRANSFORMATIONS)
548 if store is None: 548 ↛ 549line 548 didn't jump to line 549 because the condition on line 548 was never true
549 return
550 for i in range(len(store)): 550 ↛ 551line 550 didn't jump to line 551 because the loop on line 550 never started
551 d = store[i]
552 if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d:
553 yield MutableYAMLSymlink(store, i)
555 def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
556 store = self._list_store(MK_CONFFILE_MANAGEMENT)
557 if store is None: 557 ↛ 558line 557 didn't jump to line 558 because the condition on line 557 was never true
558 return
559 yield from (
560 MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
561 )
563 def add_conffile_management(
564 self, conffile_management_item: MutableYAMLConffileManagementItem
565 ) -> None:
566 self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)
569class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
570 @property
571 def _container(self) -> dict[str, Any]:
572 assert len(self._store) == 1
573 return next(iter(self._store.values()))
575 @property
576 def into(self) -> list[str] | None:
577 v = self._container[MK_INSTALLATIONS_INSTALL_INTO]
578 if v is None:
579 return None
580 if isinstance(v, str):
581 return [v]
582 return v
584 @into.setter
585 def into(self, new_value: str | list[str] | None) -> None:
586 if new_value is None: 586 ↛ 590line 586 didn't jump to line 590 because the condition on line 586 was always true
587 with suppress(KeyError):
588 del self._container[MK_INSTALLATIONS_INSTALL_INTO]
589 return
590 if isinstance(new_value, str):
591 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
592 return
593 new_list = CommentedSeq(new_value)
594 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list
596 @property
597 def when(self) -> str | Mapping[str, Any] | None:
598 return self._container[MK_CONDITION_WHEN]
600 @when.setter
601 def when(self, new_value: str | Mapping[str, Any] | None) -> None:
602 if new_value is None: 602 ↛ 603line 602 didn't jump to line 603 because the condition on line 602 was never true
603 with suppress(KeyError):
604 del self._container[MK_CONDITION_WHEN]
605 return
606 if isinstance(new_value, str): 606 ↛ 607line 606 didn't jump to line 607 because the condition on line 606 was never true
607 self._container[MK_CONDITION_WHEN] = new_value
608 return
609 new_map = CommentedMap(new_value)
610 self._container[MK_CONDITION_WHEN] = new_map
612 @classmethod
613 def install_dest(
614 cls,
615 sources: str | list[str],
616 into: str | list[str] | None,
617 *,
618 dest_dir: str | None = None,
619 when: str | Mapping[str, Any] | None = None,
620 ) -> "MutableYAMLInstallRuleInstall":
621 k = MK_INSTALLATIONS_INSTALL_SOURCES
622 if isinstance(sources, str):
623 k = MK_INSTALLATIONS_INSTALL_SOURCE
624 r = MutableYAMLInstallRuleInstall(
625 None,
626 None,
627 store=CommentedMap(
628 {
629 MK_INSTALLATIONS_INSTALL: CommentedMap(
630 {
631 k: sources,
632 }
633 )
634 }
635 ),
636 )
637 r.dest_dir = dest_dir
638 r.into = into
639 if when is not None:
640 r.when = when
641 return r
643 @classmethod
644 def multi_dest_install(
645 cls,
646 sources: str | list[str],
647 dest_dirs: Sequence[str],
648 into: str | list[str] | None,
649 *,
650 when: str | Mapping[str, Any] | None = None,
651 ) -> "MutableYAMLInstallRuleInstall":
652 k = MK_INSTALLATIONS_INSTALL_SOURCES
653 if isinstance(sources, str): 653 ↛ 655line 653 didn't jump to line 655 because the condition on line 653 was always true
654 k = MK_INSTALLATIONS_INSTALL_SOURCE
655 r = MutableYAMLInstallRuleInstall(
656 None,
657 None,
658 store=CommentedMap(
659 {
660 MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
661 {
662 k: sources,
663 "dest-dirs": dest_dirs,
664 }
665 )
666 }
667 ),
668 )
669 r.into = into
670 if when is not None: 670 ↛ 671line 670 didn't jump to line 671 because the condition on line 670 was never true
671 r.when = when
672 return r
674 @classmethod
675 def install_as(
676 cls,
677 source: str,
678 install_as: str,
679 into: str | list[str] | None,
680 when: str | Mapping[str, Any] | None = None,
681 ) -> "MutableYAMLInstallRuleInstall":
682 r = MutableYAMLInstallRuleInstall(
683 None,
684 None,
685 store=CommentedMap(
686 {
687 MK_INSTALLATIONS_INSTALL: CommentedMap(
688 {
689 MK_INSTALLATIONS_INSTALL_SOURCE: source,
690 MK_INSTALLATIONS_INSTALL_AS: install_as,
691 }
692 )
693 }
694 ),
695 )
696 r.into = into
697 if when is not None: 697 ↛ 698line 697 didn't jump to line 698 because the condition on line 697 was never true
698 r.when = when
699 return r
701 @classmethod
702 def install_doc_as(
703 cls,
704 source: str,
705 install_as: str,
706 into: str | list[str] | None,
707 when: str | Mapping[str, Any] | None = None,
708 ) -> "MutableYAMLInstallRuleInstall":
709 r = MutableYAMLInstallRuleInstall(
710 None,
711 None,
712 store=CommentedMap(
713 {
714 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
715 {
716 MK_INSTALLATIONS_INSTALL_SOURCE: source,
717 MK_INSTALLATIONS_INSTALL_AS: install_as,
718 }
719 )
720 }
721 ),
722 )
723 r.into = into
724 if when is not None:
725 r.when = when
726 return r
728 @classmethod
729 def install_docs(
730 cls,
731 sources: str | list[str],
732 into: str | list[str] | None,
733 *,
734 dest_dir: str | None = None,
735 when: str | Mapping[str, Any] | None = None,
736 ) -> "MutableYAMLInstallRuleInstall":
737 k = MK_INSTALLATIONS_INSTALL_SOURCES
738 if isinstance(sources, str):
739 k = MK_INSTALLATIONS_INSTALL_SOURCE
740 r = MutableYAMLInstallRuleInstall(
741 None,
742 None,
743 store=CommentedMap(
744 {
745 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
746 {
747 k: sources,
748 }
749 )
750 }
751 ),
752 )
753 r.into = into
754 r.dest_dir = dest_dir
755 if when is not None:
756 r.when = when
757 return r
759 @classmethod
760 def install_examples(
761 cls,
762 sources: str | list[str],
763 into: str | list[str] | None,
764 when: str | Mapping[str, Any] | None = None,
765 ) -> "MutableYAMLInstallRuleInstallExamples":
766 k = MK_INSTALLATIONS_INSTALL_SOURCES
767 if isinstance(sources, str):
768 k = MK_INSTALLATIONS_INSTALL_SOURCE
769 r = MutableYAMLInstallRuleInstallExamples(
770 None,
771 None,
772 store=CommentedMap(
773 {
774 MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap(
775 {
776 k: sources,
777 }
778 )
779 }
780 ),
781 )
782 r.into = into
783 if when is not None: 783 ↛ 784line 783 didn't jump to line 784 because the condition on line 783 was never true
784 r.when = when
785 return r
787 @classmethod
788 def install_man(
789 cls,
790 sources: str | list[str],
791 into: str | list[str] | None,
792 language: str | None,
793 when: str | Mapping[str, Any] | None = None,
794 ) -> "MutableYAMLInstallRuleMan":
795 k = MK_INSTALLATIONS_INSTALL_SOURCES
796 if isinstance(sources, str): 796 ↛ 797line 796 didn't jump to line 797 because the condition on line 796 was never true
797 k = MK_INSTALLATIONS_INSTALL_SOURCE
798 r = MutableYAMLInstallRuleMan(
799 None,
800 None,
801 store=CommentedMap(
802 {
803 MK_INSTALLATIONS_INSTALL_MAN: CommentedMap(
804 {
805 k: sources,
806 }
807 )
808 }
809 ),
810 )
811 r.language = language
812 r.into = into
813 if when is not None: 813 ↛ 814line 813 didn't jump to line 814 because the condition on line 813 was never true
814 r.when = when
815 return r
817 @classmethod
818 def discard(
819 cls,
820 sources: str | list[str],
821 ) -> "MutableYAMLInstallRuleDiscard":
822 return MutableYAMLInstallRuleDiscard(
823 None,
824 None,
825 store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
826 )
829class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
830 pass
833class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
834 @property
835 def language(self) -> str | None:
836 return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
838 @language.setter
839 def language(self, new_value: str | None) -> None:
840 if new_value is not None:
841 self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
842 return
843 with suppress(KeyError):
844 del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
847class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
848 pass
851class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
852 @property
853 def sources(self) -> list[str]:
854 v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
855 if isinstance(v, str):
856 return [v]
857 return v
859 @sources.setter
860 def sources(self, new_value: str | list[str]) -> None:
861 if isinstance(new_value, str):
862 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
863 return
864 new_list = CommentedSeq(new_value)
865 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list
867 @property
868 def dest_dir(self) -> str | None:
869 return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)
871 @dest_dir.setter
872 def dest_dir(self, new_value: str | None) -> None:
873 if new_value is not None and self.dest_as is not None: 873 ↛ 874line 873 didn't jump to line 874 because the condition on line 873 was never true
874 raise ValueError(
875 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
876 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
877 )
878 if new_value is not None:
879 self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
880 else:
881 with suppress(KeyError):
882 del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]
884 @property
885 def dest_as(self) -> str | None:
886 return self._container.get(MK_INSTALLATIONS_INSTALL_AS)
888 @dest_as.setter
889 def dest_as(self, new_value: str | None) -> None:
890 if new_value is not None:
891 if self.dest_dir is not None:
892 raise ValueError(
893 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
894 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
895 )
897 sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
898 if isinstance(sources, list):
899 if len(sources) != 1:
900 raise ValueError(
901 f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
902 f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
903 )
904 self.sources = sources[0]
905 self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
906 else:
907 with suppress(KeyError):
908 del self._container[MK_INSTALLATIONS_INSTALL_AS]
911class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
912 def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
913 parent_store = self._store
914 if not install_rule._is_detached or ( 914 ↛ 918line 914 didn't jump to line 918 because the condition on line 914 was never true
915 install_rule._parent_store is not None
916 and install_rule._parent_store is not parent_store
917 ):
918 raise RuntimeError(
919 "Item is already attached or associated with a different container"
920 )
921 self.create_definition_if_missing()
922 install_rule._parent_store = parent_store
923 install_rule.create_definition()
925 def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
926 parent_store = self._store
927 for install_rule in install_rules:
928 if not install_rule._is_detached or ( 928 ↛ 932line 928 didn't jump to line 932 because the condition on line 928 was never true
929 install_rule._parent_store is not None
930 and install_rule._parent_store is not parent_store
931 ):
932 raise RuntimeError(
933 "Item is already attached or associated with a different container"
934 )
935 self.create_definition_if_missing()
936 install_rule._parent_store = parent_store
937 install_rule.create_definition()
940class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
941 @property
942 def variables(self) -> dict[str, Any]:
943 return self._store
945 def __setitem__(self, key: str, value: Any) -> None:
946 self._store[key] = value
947 self.create_definition_if_missing()
950class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
951 def manifest_variables(
952 self, *, create_if_absent: bool = True
953 ) -> MutableYAMLManifestVariables:
954 d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
955 if create_if_absent: 955 ↛ 956line 955 didn't jump to line 956 because the condition on line 955 was never true
956 d.create_definition_if_missing()
957 return d
960class MutableYAMLRemoveDuringCleanDefinitions(AbstractYAMLListSubStore[str]):
961 def append(self, rule: str) -> None:
962 self.create_definition_if_missing()
963 self._store.append(rule)
965 def __len__(self) -> int:
966 return len(self._store)
968 def extend(self, rules: Iterable[str]) -> None:
969 it = iter(rules)
970 try:
971 first_rule = next(it)
972 except StopIteration:
973 return
974 self.create_definition_if_missing()
975 self._store.append(first_rule)
976 self._store.extend(it)
979class MutableYAMLManifest:
980 def __init__(self, store: Any) -> None:
981 self._store = store
983 @classmethod
984 def empty_manifest(cls) -> "MutableYAMLManifest":
985 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))
987 @property
988 def manifest_version(self) -> str:
989 return self._store[MK_MANIFEST_VERSION]
991 @manifest_version.setter
992 def manifest_version(self, version: str) -> None:
993 if version not in SUPPORTED_MANIFEST_VERSIONS:
994 raise ValueError("Unsupported version")
995 self._store[MK_MANIFEST_VERSION] = version
997 def remove_during_clean(
998 self,
999 *,
1000 create_if_absent: bool = True,
1001 ) -> MutableYAMLRemoveDuringCleanDefinitions:
1002 d = MutableYAMLRemoveDuringCleanDefinitions(
1003 self._store, MK_MANIFEST_REMOVE_DURING_CLEAN
1004 )
1005 if create_if_absent: 1005 ↛ 1006line 1005 didn't jump to line 1006 because the condition on line 1005 was never true
1006 d.create_definition_if_missing()
1007 return d
1009 def installations(
1010 self,
1011 *,
1012 create_if_absent: bool = True,
1013 ) -> MutableYAMLInstallationsDefinition:
1014 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
1015 if create_if_absent: 1015 ↛ 1016line 1015 didn't jump to line 1016 because the condition on line 1015 was never true
1016 d.create_definition_if_missing()
1017 return d
1019 def manifest_definitions(
1020 self,
1021 *,
1022 create_if_absent: bool = True,
1023 ) -> MutableYAMLManifestDefinitions:
1024 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
1025 if create_if_absent: 1025 ↛ 1026line 1025 didn't jump to line 1026 because the condition on line 1025 was never true
1026 d.create_definition_if_missing()
1027 return d
1029 def package(
1030 self, name: str, *, create_if_absent: bool = True
1031 ) -> MutableYAMLPackageDefinition:
1032 if MK_PACKAGES not in self._store: 1032 ↛ 1034line 1032 didn't jump to line 1034 because the condition on line 1032 was always true
1033 self._store[MK_PACKAGES] = CommentedMap()
1034 packages_store = self._store[MK_PACKAGES]
1035 package = packages_store.get(name)
1036 if package is None: 1036 ↛ 1043line 1036 didn't jump to line 1043 because the condition on line 1036 was always true
1037 if not create_if_absent: 1037 ↛ 1038line 1037 didn't jump to line 1038 because the condition on line 1037 was never true
1038 raise KeyError(name)
1039 assert packages_store is not None
1040 d = MutableYAMLPackageDefinition(packages_store, name)
1041 d.create_definition()
1042 else:
1043 d = MutableYAMLPackageDefinition(packages_store, name)
1044 return d
1046 def write_to(self, fd) -> None:
1047 MANIFEST_YAML.dump(self._store, fd)
1050def _describe_missing_path(entry: VirtualPath) -> str:
1051 if entry.is_dir:
1052 return f"{entry.fs_path}/ (empty directory; possible integration point)"
1053 if entry.is_symlink:
1054 target = os.readlink(entry.fs_path)
1055 return f"{entry.fs_path} (symlink; links to {target})"
1056 if entry.is_file:
1057 return f"{entry.fs_path} (file)"
1058 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)"
1061def _detect_missing_installations(
1062 path_matcher: SourcePathMatcher,
1063 search_dir: VirtualPath,
1064) -> None:
1065 if not search_dir.is_dir: 1065 ↛ 1066line 1065 didn't jump to line 1066 because the condition on line 1065 was never true
1066 return
1067 missing = list(path_matcher.detect_missing(search_dir))
1068 if not missing:
1069 return
1071 excl = textwrap.dedent(
1072 """\
1073 - discard: "*"
1074 """
1075 )
1077 raise PathNotCoveredByInstallRulesError(
1078 "Please review the list and add either install rules or exclusions to `installations` in"
1079 " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
1080 f" end of your 'installations`:\n\n{excl}\n",
1081 missing,
1082 search_dir,
1083 )
1086def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
1087 used_discard_rules = path_matcher.used_auto_discard_rules
1088 # Discard rules can match and then be overridden. In that case, they appear
1089 # but have 0 matches.
1090 if not sum((len(v) for v in used_discard_rules.values()), 0):
1091 return
1092 _info("The following automatic discard rules were triggered:")
1093 example_path: str | None = None
1094 for rule in sorted(used_discard_rules):
1095 for fs_path in sorted(used_discard_rules[rule]):
1096 if example_path is None: 1096 ↛ 1098line 1096 didn't jump to line 1098 because the condition on line 1096 was always true
1097 example_path = fs_path
1098 _info(f" * {rule} -> {fs_path}")
1099 assert example_path is not None
1100 _info("")
1101 _info(
1102 "Note that some of these may have been overruled. The overrule detection logic is not"
1103 )
1104 _info("100% reliable.")
1105 _info("")
1106 _info(
1107 "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
1108 )
1109 _info(" installations:")
1110 _info(" - install:")
1111 _info(f" source: {example_path}")
1114def _install_everything_from_source_dir_if_present(
1115 dctrl_bin: BinaryPackage,
1116 substitution: Substitution,
1117 path_matcher: SourcePathMatcher,
1118 install_rule_context: InstallRuleContext,
1119 source_condition_context: ConditionContext,
1120 source_dir: VirtualPath,
1121 *,
1122 into_dir: VirtualPath | None = None,
1123) -> None:
1124 attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"]
1125 pkg_set = frozenset([dctrl_bin])
1126 install_rule = run_in_context_of_plugin(
1127 "debputy",
1128 InstallRule.install_dest,
1129 [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)],
1130 None,
1131 pkg_set,
1132 f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
1133 None,
1134 )
1135 pkg_search_dir: tuple[SearchDir] = (
1136 SearchDir(
1137 source_dir,
1138 pkg_set,
1139 ),
1140 )
1141 replacements = {
1142 "search_dirs": pkg_search_dir,
1143 }
1144 if into_dir is not None: 1144 ↛ 1145line 1144 didn't jump to line 1145 because the condition on line 1144 was never true
1145 binary_package_contexts = dict(install_rule_context.binary_package_contexts)
1146 updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir)
1147 binary_package_contexts[dctrl_bin.name] = updated
1148 replacements["binary_package_contexts"] = binary_package_contexts
1150 fake_install_rule_context = install_rule_context.replace(**replacements)
1151 try:
1152 install_rule.perform_install(
1153 path_matcher,
1154 fake_install_rule_context,
1155 source_condition_context,
1156 )
1157 except (
1158 NoMatchForInstallPatternError,
1159 PathAlreadyInstalledOrDiscardedError,
1160 ):
1161 # Empty directory or everything excluded by default; ignore the error
1162 pass
1165def _add_build_install_dirs_to_per_package_search_dirs(
1166 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
1167 per_package_search_dirs: dict[BinaryPackage, list[VirtualPath]],
1168 as_path: Callable[[str], VirtualPath],
1169) -> None:
1170 seen_pp_search_dirs: set[tuple[BinaryPackage, str]] = set()
1171 for dest_dir, for_packages in build_system_install_dirs:
1172 dest_path = as_path(dest_dir)
1173 for pkg in for_packages:
1174 seen_key = (pkg, dest_dir)
1175 if seen_key in seen_pp_search_dirs:
1176 continue
1177 seen_pp_search_dirs.add(seen_key)
1178 if pkg not in per_package_search_dirs:
1179 per_package_search_dirs[pkg] = [dest_path]
1180 else:
1181 per_package_search_dirs[pkg].append(dest_path)
1184class HighLevelManifest:
1185 def __init__(
1186 self,
1187 manifest_path: str,
1188 mutable_manifest: MutableYAMLManifest | None,
1189 remove_during_clean_rules: list[FileSystemMatchRule],
1190 install_rules: list[InstallRule] | None,
1191 source_package: SourcePackage,
1192 binary_packages: Mapping[str, BinaryPackage],
1193 substitution: Substitution,
1194 package_transformations: Mapping[str, PackageTransformationDefinition],
1195 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
1196 dpkg_arch_query_table: DpkgArchTable,
1197 build_env: DebBuildOptionsAndProfiles,
1198 build_environments: BuildEnvironments,
1199 build_rules: list[BuildRule] | None,
1200 plugin_provided_feature_set: PluginProvidedFeatureSet,
1201 debian_dir: VirtualPath,
1202 ) -> None:
1203 self.manifest_path = manifest_path
1204 self.mutable_manifest = mutable_manifest
1205 self._remove_during_clean_rules: list[FileSystemMatchRule] = (
1206 remove_during_clean_rules
1207 )
1208 self._install_rules = install_rules
1209 self._source_package = source_package
1210 self._binary_packages = binary_packages
1211 self.substitution = substitution
1212 self.package_transformations = package_transformations
1213 self._dpkg_architecture_variables = dpkg_architecture_variables
1214 self._dpkg_arch_query_table = dpkg_arch_query_table
1215 self._build_env = build_env
1216 self._used_for: set[str] = set()
1217 self.build_environments = build_environments
1218 self.build_rules = build_rules
1219 self._plugin_provided_feature_set = plugin_provided_feature_set
1220 self._debian_dir = debian_dir
1221 self._source_condition_context = ConditionContext(
1222 binary_package=None,
1223 substitution=self.substitution,
1224 deb_options_and_profiles=self._build_env,
1225 dpkg_architecture_variables=self._dpkg_architecture_variables,
1226 dpkg_arch_query_table=self._dpkg_arch_query_table,
1227 )
1229 def source_version(self, include_binnmu_version: bool = True) -> str:
1230 # TODO: There should an easier way to determine the source version; really.
1231 version_var = "{{DEB_VERSION}}"
1232 if not include_binnmu_version:
1233 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
1234 try:
1235 return self.substitution.substitute(
1236 version_var, "internal (resolve version)"
1237 )
1238 except DebputySubstitutionError as e:
1239 raise AssertionError(f"Could not resolve {version_var}") from e
1241 @property
1242 def source_condition_context(self) -> ConditionContext:
1243 return self._source_condition_context
1245 @property
1246 def debian_dir(self) -> VirtualPath:
1247 return self._debian_dir
1249 @property
1250 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
1251 return self._dpkg_architecture_variables
1253 @property
1254 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
1255 return self._build_env
1257 @property
1258 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
1259 return self._plugin_provided_feature_set
1261 @property
1262 def remove_during_clean_rules(self) -> list[FileSystemMatchRule]:
1263 return self._remove_during_clean_rules
1265 @property
1266 def active_packages(self) -> Iterable[BinaryPackage]:
1267 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)
1269 @property
1270 def all_packages(self) -> Iterable[BinaryPackage]:
1271 yield from self._binary_packages.values()
1273 def package_state_for(self, package: str) -> PackageTransformationDefinition:
1274 return self.package_transformations[package]
1276 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
1277 name = package.name
1278 for doc_main_field in ("Doc-Main-Package", "X-Doc-Main-Package"):
1279 doc_main_package_name = package.fields.get(doc_main_field)
1280 if doc_main_package_name: 1280 ↛ 1281line 1280 didn't jump to line 1281 because the condition on line 1280 was never true
1281 main_package = self._binary_packages.get(doc_main_package_name)
1282 if main_package is None:
1283 _error(
1284 f"Invalid Doc-Main-Package for {name}: The package {doc_main_package_name!r} is not listed in d/control"
1285 )
1286 return main_package
1287 # If it is not a -doc package, then docs should be installed
1288 # under its own package name.
1289 if not name.endswith("-doc"): 1289 ↛ 1291line 1289 didn't jump to line 1291 because the condition on line 1289 was always true
1290 return package
1291 name = name[:-4]
1292 main_package = self._binary_packages.get(name)
1293 if main_package:
1294 return main_package
1295 if name.startswith("lib"):
1296 dev_pkg = self._binary_packages.get(f"{name}-dev")
1297 if dev_pkg:
1298 return dev_pkg
1300 # If we found no better match; default to the doc package itself.
1301 return package
1303 def perform_installations(
1304 self,
1305 integration_mode: DebputyIntegrationMode,
1306 build_system_install_dirs: Sequence[tuple[str, frozenset[BinaryPackage]]],
1307 *,
1308 install_request_context: InstallSearchDirContext | None = None,
1309 ) -> PackageDataTable:
1310 package_data_dict = {}
1311 package_data_table = PackageDataTable(package_data_dict)
1312 enable_manifest_installation_feature = (
1313 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR
1314 )
1316 if build_system_install_dirs: 1316 ↛ 1317line 1316 didn't jump to line 1317 because the condition on line 1316 was never true
1317 if integration_mode != INTEGRATION_MODE_FULL:
1318 raise ValueError(
1319 "The build_system_install_dirs parameter can only be used in full integration mode"
1320 )
1321 if install_request_context:
1322 raise ValueError(
1323 "The build_system_install_dirs parameter cannot be used with install_request_context"
1324 " (not implemented)"
1325 )
1327 if install_request_context is None: 1327 ↛ 1329line 1327 didn't jump to line 1329 because the condition on line 1327 was never true
1329 @functools.lru_cache(None)
1330 def _as_path(fs_path: str) -> VirtualPath:
1331 return FSROOverlay.create_root_dir(".", fs_path)
1333 dtmp_dir = _as_path("debian/tmp")
1334 source_root_dir = _as_path(".")
1335 into = frozenset(self._binary_packages.values())
1336 per_package_search_dirs = {
1337 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
1338 for t in self.package_transformations.values()
1339 if t.search_dirs is not None
1340 }
1342 if integration_mode == INTEGRATION_MODE_FULL:
1343 # In this mode, we have no default search dir. Everything ends up being
1344 # per-package instead (since that is easier logic-wise).
1345 #
1346 # Even dtmp_dir is omitted here (since it is not universally applicable).
1347 # Note we still initialize dtmp_dir, since it affects a later guard.
1348 default_search_dirs = []
1349 _add_build_install_dirs_to_per_package_search_dirs(
1350 build_system_install_dirs,
1351 per_package_search_dirs,
1352 _as_path,
1353 )
1355 # We can end here with per_package_search_dirs having no search dirs for any package
1356 # (single binary, where everything is installed into d/<pkg> is the most common case).
1357 #
1358 # This is not a problem in itself as the installation rules can still apply to the
1359 # source root and there should be no reason to install something from d/<pkg> into
1360 # d/<another-pkg>
1361 else:
1362 default_search_dirs = [dtmp_dir]
1364 search_dirs = _determine_search_dir_order(
1365 per_package_search_dirs,
1366 into,
1367 default_search_dirs,
1368 source_root_dir,
1369 )
1370 check_for_uninstalled_dirs = tuple(
1371 s.search_dir
1372 for s in search_dirs
1373 if s.search_dir.fs_path != source_root_dir.fs_path
1374 )
1375 if enable_manifest_installation_feature:
1376 _present_installation_dirs(
1377 search_dirs, check_for_uninstalled_dirs, into
1378 )
1379 else:
1380 dtmp_dir = None
1381 search_dirs = install_request_context.search_dirs
1382 into = frozenset(self._binary_packages.values())
1383 seen: set[BinaryPackage] = set()
1384 for search_dir in search_dirs:
1385 seen.update(search_dir.applies_to)
1387 missing = into - seen
1388 if missing: 1388 ↛ 1389line 1388 didn't jump to line 1389 because the condition on line 1388 was never true
1389 names = ", ".join(p.name for p in missing)
1390 raise ValueError(
1391 f"The following package(s) had no search dirs: {names}."
1392 " (Generally, the source root would be applicable to all packages)"
1393 )
1394 extra_names = seen - into
1395 if extra_names: 1395 ↛ 1396line 1395 didn't jump to line 1396 because the condition on line 1395 was never true
1396 names = ", ".join(p.name for p in extra_names)
1397 raise ValueError(
1398 f"The install_request_context referenced the following unknown package(s): {names}"
1399 )
1401 check_for_uninstalled_dirs = (
1402 install_request_context.check_for_uninstalled_dirs
1403 )
1405 install_rule_context = InstallRuleContext(search_dirs)
1407 if ( 1407 ↛ 1414line 1407 didn't jump to line 1414 because the condition on line 1407 was never true
1408 enable_manifest_installation_feature
1409 and self._install_rules is None
1410 # TODO: Should we also do this for full mode when build systems provided search dirs?
1411 and dtmp_dir is not None
1412 and os.path.isdir(dtmp_dir.fs_path)
1413 ):
1414 msg = (
1415 "The build system appears to have provided the output of upstream build system's"
1416 " install in debian/tmp. However, these are no provisions for debputy to install"
1417 " any of that into any of the debian packages listed in debian/control."
1418 " To avoid accidentally creating empty packages, debputy will insist that you "
1419 " explicitly define an empty installation definition if you did not want to "
1420 " install any of those files even though they have been provided."
1421 ' Example: "installations: []"'
1422 )
1423 _error(msg)
1424 elif ( 1424 ↛ 1427line 1424 didn't jump to line 1427 because the condition on line 1424 was never true
1425 not enable_manifest_installation_feature and self._install_rules is not None
1426 ):
1427 _error(
1428 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
1429 f" Please remove or comment out the `installations` keyword."
1430 )
1432 for dctrl_bin in self.all_packages:
1433 package = dctrl_bin.name
1434 doc_main_package = self._detect_doc_main_package_for(dctrl_bin)
1436 install_rule_context[package] = BinaryPackageInstallRuleContext(
1437 dctrl_bin,
1438 FSRootDir(),
1439 doc_main_package,
1440 )
1442 if enable_manifest_installation_feature: 1442 ↛ 1447line 1442 didn't jump to line 1447 because the condition on line 1442 was always true
1443 discard_rules = list(
1444 self.plugin_provided_feature_set.auto_discard_rules.values()
1445 )
1446 else:
1447 discard_rules = [
1448 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
1449 ]
1450 path_matcher = SourcePathMatcher(discard_rules)
1452 source_condition_context = self._source_condition_context
1454 for dctrl_bin in self.active_packages:
1455 package = dctrl_bin.name
1456 if install_request_context: 1456 ↛ 1461line 1456 didn't jump to line 1461 because the condition on line 1456 was always true
1457 build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
1458 package
1459 )
1460 else:
1461 build_system_staging_dir_fs_path = os.path.join("debian", package)
1462 if os.path.isdir(build_system_staging_dir_fs_path):
1463 build_system_staging_dir = FSROOverlay.create_root_dir(
1464 ".",
1465 build_system_staging_dir_fs_path,
1466 )
1467 else:
1468 build_system_staging_dir = None
1470 if build_system_staging_dir is not None:
1471 _install_everything_from_source_dir_if_present(
1472 dctrl_bin,
1473 self.substitution,
1474 path_matcher,
1475 install_rule_context,
1476 source_condition_context,
1477 build_system_staging_dir,
1478 )
1480 if self._install_rules:
1481 # FIXME: Check that every install rule remains used after transformations have run.
1482 # What we want to check is transformations do not exclude everything from an install
1483 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
1484 # match.
1485 for install_rule in self._install_rules:
1486 install_rule.perform_install(
1487 path_matcher,
1488 install_rule_context,
1489 source_condition_context,
1490 )
1492 if enable_manifest_installation_feature: 1492 ↛ 1496line 1492 didn't jump to line 1496 because the condition on line 1492 was always true
1493 for search_dir in check_for_uninstalled_dirs:
1494 _detect_missing_installations(path_matcher, search_dir)
1496 for dctrl_bin in self.all_packages:
1497 package = dctrl_bin.name
1498 binary_install_rule_context = install_rule_context[package]
1499 build_system_pkg_staging_dir = os.path.join("debian", package)
1500 fs_root = binary_install_rule_context.fs_root
1502 context = self.package_transformations[package]
1503 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
1504 for special_install_rule in context.install_rules: 1504 ↛ 1505line 1504 didn't jump to line 1505 because the loop on line 1504 never started
1505 special_install_rule.perform_install(
1506 path_matcher,
1507 install_rule_context,
1508 source_condition_context,
1509 )
1511 if dctrl_bin.should_be_acted_on:
1512 self.apply_fs_transformations(package, fs_root)
1513 substvars_file = f"debian/{package}.substvars"
1514 substvars = FlushableSubstvars.load_from_path(
1515 substvars_file, missing_ok=True
1516 )
1517 # We do not want to touch the substvars file (non-clean rebuild contamination)
1518 substvars.substvars_path = None
1519 else:
1520 substvars = FlushableSubstvars()
1522 udeb_package = self._binary_packages.get(f"{package}-udeb")
1523 if udeb_package and not udeb_package.is_udeb: 1523 ↛ 1524line 1523 didn't jump to line 1524 because the condition on line 1523 was never true
1524 udeb_package = None
1526 package_metadata_context = PackageProcessingContextProvider(
1527 self,
1528 dctrl_bin,
1529 udeb_package,
1530 package_data_table,
1531 # FIXME: source_package
1532 )
1534 ctrl_creator = BinaryCtrlAccessorProviderCreator(
1535 package_metadata_context,
1536 substvars,
1537 context.maintscript_snippets,
1538 context.substitution,
1539 )
1541 if not enable_manifest_installation_feature: 1541 ↛ 1542line 1541 didn't jump to line 1542 because the condition on line 1541 was never true
1542 assert_no_dbgsym_migration(dctrl_bin)
1543 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin)
1544 dh_dbgsym_root_path = FSROOverlay.create_root_dir(
1545 "",
1546 dh_dbgsym_root_fs,
1547 )
1548 dbgsym_root_fs = FSRootDir()
1549 _install_everything_from_source_dir_if_present(
1550 dctrl_bin,
1551 self.substitution,
1552 path_matcher,
1553 install_rule_context,
1554 source_condition_context,
1555 dh_dbgsym_root_path,
1556 into_dir=dbgsym_root_fs,
1557 )
1558 dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
1559 dbgsym_info = DbgsymInfo(
1560 dctrl_bin,
1561 dbgsym_root_fs,
1562 os.path.join(dh_dbgsym_root_fs, "DEBIAN"),
1563 dbgsym_build_ids,
1564 # TODO: Provide manifest feature to support this.
1565 False,
1566 )
1567 else:
1568 dbgsym_info = DbgsymInfo(
1569 dctrl_bin,
1570 FSRootDir(),
1571 None,
1572 [],
1573 False,
1574 )
1576 package_data_dict[package] = BinaryPackageData(
1577 self._source_package,
1578 dctrl_bin,
1579 build_system_pkg_staging_dir,
1580 fs_root,
1581 substvars,
1582 package_metadata_context,
1583 ctrl_creator,
1584 dbgsym_info,
1585 )
1587 if enable_manifest_installation_feature: 1587 ↛ 1590line 1587 didn't jump to line 1590 because the condition on line 1587 was always true
1588 _list_automatic_discard_rules(path_matcher)
1590 return package_data_table
1592 def condition_context(
1593 self, binary_package: BinaryPackage | str | None
1594 ) -> ConditionContext:
1595 if binary_package is None: 1595 ↛ 1596line 1595 didn't jump to line 1596 because the condition on line 1595 was never true
1596 return self._source_condition_context
1597 if not isinstance(binary_package, str): 1597 ↛ 1598line 1597 didn't jump to line 1598 because the condition on line 1597 was never true
1598 binary_package = binary_package.name
1600 package_transformation = self.package_transformations[binary_package]
1601 return self._source_condition_context.replace(
1602 binary_package=package_transformation.binary_package,
1603 substitution=package_transformation.substitution,
1604 )
1606 def apply_fs_transformations(
1607 self,
1608 package: str,
1609 fs_root: FSPath,
1610 ) -> None:
1611 if package in self._used_for: 1611 ↛ 1612line 1611 didn't jump to line 1612 because the condition on line 1611 was never true
1612 raise ValueError(
1613 f"data.tar contents for {package} has already been finalized!?"
1614 )
1615 if package not in self.package_transformations: 1615 ↛ 1616line 1615 didn't jump to line 1616 because the condition on line 1615 was never true
1616 raise ValueError(
1617 f'The package "{package}" was not relevant for the manifest!?'
1618 )
1619 package_transformation = self.package_transformations[package]
1620 condition_context = ConditionContext(
1621 binary_package=package_transformation.binary_package,
1622 substitution=package_transformation.substitution,
1623 deb_options_and_profiles=self._build_env,
1624 dpkg_architecture_variables=self._dpkg_architecture_variables,
1625 dpkg_arch_query_table=self._dpkg_arch_query_table,
1626 )
1627 norm_rules = list(
1628 builtin_mode_normalization_rules(
1629 self._dpkg_architecture_variables,
1630 package_transformation.binary_package,
1631 package_transformation.substitution,
1632 )
1633 )
1634 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
1635 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
1636 for transformation in package_transformation.transformations:
1637 transformation.run_transform_file_system(fs_root, condition_context)
1638 interpreter_normalization = NormalizeShebangLineTransformation()
1639 interpreter_normalization.transform_file_system(fs_root, condition_context)
1641 def finalize_data_tar_contents(
1642 self,
1643 package: str,
1644 fs_root: FSPath,
1645 clamp_mtime_to: int,
1646 ) -> IntermediateManifest:
1647 if package in self._used_for: 1647 ↛ 1648line 1647 didn't jump to line 1648 because the condition on line 1647 was never true
1648 raise ValueError(
1649 f"data.tar contents for {package} has already been finalized!?"
1650 )
1651 if package not in self.package_transformations: 1651 ↛ 1652line 1651 didn't jump to line 1652 because the condition on line 1651 was never true
1652 raise ValueError(
1653 f'The package "{package}" was not relevant for the manifest!?'
1654 )
1655 self._used_for.add(package)
1657 # At this point, there so be no further mutations to the file system (because the will not
1658 # be present in the intermediate manifest)
1659 cast("FSRootDir", fs_root).is_read_write = False
1661 intermediate_manifest = list(
1662 _generate_intermediate_manifest(
1663 fs_root,
1664 clamp_mtime_to,
1665 )
1666 )
1667 return intermediate_manifest
1669 def apply_to_binary_staging_directory(
1670 self,
1671 package: str,
1672 fs_root: FSPath,
1673 clamp_mtime_to: int,
1674 ) -> IntermediateManifest:
1675 self.apply_fs_transformations(package, fs_root)
1676 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)
1679@dataclasses.dataclass(slots=True)
1680class SearchDirOrderState:
1681 search_dir: VirtualPath
1682 applies_to: set[BinaryPackage] | frozenset[BinaryPackage] = dataclasses.field(
1683 default_factory=set
1684 )
1685 after: set[str] = dataclasses.field(default_factory=set)
1688def _present_installation_dirs(
1689 search_dirs: Sequence[SearchDir],
1690 checked_missing_dirs: Sequence[VirtualPath],
1691 all_pkgs: frozenset[BinaryPackage],
1692) -> None:
1693 _info("The following directories are considered search dirs (in order):")
1694 max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
1695 for search_dir in search_dirs:
1696 applies_to = ""
1697 if search_dir.applies_to < all_pkgs:
1698 names = ", ".join(p.name for p in search_dir.applies_to)
1699 applies_to = f" [only applicable to: {names}]"
1700 remark = ""
1701 if not os.path.isdir(search_dir.search_dir.fs_path):
1702 remark = " (skipped; absent)"
1703 _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}")
1705 if checked_missing_dirs:
1706 _info('The following directories are considered for "not-installed" paths;')
1707 for d in checked_missing_dirs:
1708 remark = ""
1709 if not os.path.isdir(d.fs_path):
1710 remark = " (skipped; absent)"
1711 _info(f" * {d.fs_path:{max_len}}{remark}")
1714def _determine_search_dir_order(
1715 requested: Mapping[BinaryPackage, list[VirtualPath]],
1716 all_pkgs: frozenset[BinaryPackage],
1717 default_search_dirs: list[VirtualPath],
1718 source_root: VirtualPath,
1719) -> Sequence[SearchDir]:
1720 search_dir_table = {}
1721 assert requested.keys() <= all_pkgs
1722 for pkg in all_pkgs:
1723 paths = requested.get(pkg, default_search_dirs)
1724 previous_search_dir: SearchDirOrderState | None = None
1725 for path in paths:
1726 try:
1727 search_dir_state = search_dir_table[path.fs_path]
1728 except KeyError:
1729 search_dir_state = SearchDirOrderState(path)
1730 search_dir_table[path.fs_path] = search_dir_state
1731 search_dir_state.applies_to.add(pkg)
1732 if previous_search_dir is not None:
1733 search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
1734 previous_search_dir = search_dir_state
1736 search_dirs_in_order = []
1737 released = set()
1738 remaining = set()
1739 for search_dir_state in search_dir_table.values():
1740 if not (search_dir_state.after <= released):
1741 remaining.add(search_dir_state.search_dir.fs_path)
1742 continue
1743 search_dirs_in_order.append(search_dir_state)
1744 released.add(search_dir_state.search_dir.fs_path)
1746 while remaining:
1747 current_released = len(released)
1748 for fs_path in remaining:
1749 search_dir_state = search_dir_table[fs_path]
1750 if not search_dir_state.after.issubset(released):
1751 remaining.add(search_dir_state.search_dir.fs_path)
1752 continue
1753 search_dirs_in_order.append(search_dir_state)
1754 released.add(search_dir_state.search_dir.fs_path)
1756 if current_released == len(released):
1757 names = ", ".join(remaining)
1758 _error(
1759 f"There is a circular dependency (somewhere) between the search dirs: {names}."
1760 " Note that the search directories across all packages have to be ordered (and the"
1761 " source root should generally be last)"
1762 )
1763 remaining -= released
1765 search_dirs_in_order.append(
1766 SearchDirOrderState(
1767 source_root,
1768 all_pkgs,
1769 )
1770 )
1772 return tuple(
1773 # Avoid duplicating all_pkgs
1774 SearchDir(
1775 s.search_dir,
1776 frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
1777 )
1778 for s in search_dirs_in_order
1779 )