Coverage for src/debputy/highlevel_manifest.py: 64%
853 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-07-15 05:36 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-07-15 05:36 +0000
1import dataclasses
2import functools
3import os
4import textwrap
5from contextlib import suppress
6from dataclasses import dataclass, field
7from typing import (
8 List,
9 Dict,
10 Iterable,
11 Mapping,
12 Any,
13 Union,
14 Optional,
15 TypeVar,
16 Generic,
17 cast,
18 Set,
19 Tuple,
20 Sequence,
21 FrozenSet,
22 Callable,
23)
25from debian.debian_support import DpkgArchTable
27from debputy.dh.debhelper_emulation import (
28 dhe_dbgsym_root_dir,
29 assert_no_dbgsym_migration,
30 read_dbgsym_file,
31)
32from ._deb_options_profiles import DebBuildOptionsAndProfiles
33from ._manifest_constants import *
34from .architecture_support import DpkgArchitectureBuildProcessValuesTable
35from .builtin_manifest_rules import builtin_mode_normalization_rules
36from .exceptions import (
37 DebputySubstitutionError,
38 DebputyRuntimeErrorWithPreamble,
39)
40from .filesystem_scan import FSPath, FSRootDir, FSROOverlay, FSControlRootDir
41from .installations import (
42 InstallRule,
43 SourcePathMatcher,
44 PathAlreadyInstalledOrDiscardedError,
45 NoMatchForInstallPatternError,
46 InstallRuleContext,
47 BinaryPackageInstallRuleContext,
48 InstallSearchDirContext,
49 SearchDir,
50)
51from .intermediate_manifest import TarMember, PathType, IntermediateManifest
52from .maintscript_snippet import (
53 DpkgMaintscriptHelperCommand,
54 MaintscriptSnippetContainer,
55)
56from .manifest_conditions import ConditionContext
57from .manifest_parser.base_types import (
58 FileSystemMatchRule,
59 FileSystemExactMatchRule,
60 BuildEnvironments,
61)
62from .manifest_parser.util import AttributePath
63from .packager_provided_files import PackagerProvidedFile
64from .packages import BinaryPackage, SourcePackage
65from .plugin.api.feature_set import PluginProvidedFeatureSet
66from .plugin.api.impl import BinaryCtrlAccessorProviderCreator
67from .plugin.api.impl_types import (
68 PackageProcessingContextProvider,
69 PackageDataTable,
70)
71from .plugin.api.spec import (
72 FlushableSubstvars,
73 VirtualPath,
74 DebputyIntegrationMode,
75 INTEGRATION_MODE_DH_DEBPUTY_RRR,
76 INTEGRATION_MODE_FULL,
77)
78from debputy.plugins.debputy.binary_package_rules import ServiceRule
79from debputy.plugins.debputy.build_system_rules import BuildRule
80from .plugin.plugin_state import run_in_context_of_plugin
81from .substitution import Substitution
82from .transformation_rules import (
83 TransformationRule,
84 ModeNormalizationTransformationRule,
85 NormalizeShebangLineTransformation,
86)
87from .util import (
88 _error,
89 _warn,
90 debian_policy_normalize_symlink_target,
91 generated_content_dir,
92 _info,
93)
94from .yaml import MANIFEST_YAML
95from .yaml.compat import CommentedMap, CommentedSeq
98class PathNotCoveredByInstallRulesError(DebputyRuntimeErrorWithPreamble):
100 @property
101 def unmatched_paths(self) -> Sequence[VirtualPath]:
102 return self.args[1]
104 @property
105 def search_dir(self) -> VirtualPath:
106 return self.args[2]
108 def render_preamble(self) -> None:
109 _warn(
110 f"The following paths were present in {self.search_dir.fs_path}, but not installed (nor explicitly discarded)."
111 )
112 _warn("")
113 for entry in self.unmatched_paths:
114 desc = _describe_missing_path(entry)
115 _warn(f" * {desc}")
116 _warn("")
119@dataclass(slots=True)
120class DbgsymInfo:
121 binary_package: BinaryPackage
122 dbgsym_fs_root: FSPath
123 _dbgsym_root_fs: Optional[str]
124 dbgsym_ids: List[str]
126 @property
127 def dbgsym_root_dir(self) -> str:
128 root_dir = self._dbgsym_root_fs
129 if root_dir is None:
130 root_dir = generated_content_dir(
131 package=self.binary_package,
132 subdir_key="dbgsym-fs-root",
133 )
134 self._dbgsym_root_fs = root_dir
135 return root_dir
137 @property
138 def dbgsym_ctrl_dir(self) -> FSControlRootDir:
139 return FSControlRootDir.create_root_dir(
140 os.path.join(self.dbgsym_root_dir, "DEBIAN")
141 )
144@dataclass(slots=True, frozen=True)
145class BinaryPackageData:
146 source_package: SourcePackage
147 binary_package: BinaryPackage
148 binary_staging_root_dir: str
149 fs_root: FSPath
150 substvars: FlushableSubstvars
151 package_metadata_context: PackageProcessingContextProvider
152 ctrl_creator: BinaryCtrlAccessorProviderCreator
153 dbgsym_info: DbgsymInfo
155 @property
156 def control_output_dir(self) -> FSControlRootDir:
157 return FSControlRootDir.create_root_dir(
158 generated_content_dir(
159 package=self.binary_package,
160 subdir_key="DEBIAN",
161 )
162 )
165@dataclass(slots=True)
166class PackageTransformationDefinition:
167 binary_package: BinaryPackage
168 substitution: Substitution
169 is_auto_generated_package: bool
170 binary_version: Optional[str] = None
171 search_dirs: Optional[List[FileSystemExactMatchRule]] = None
172 dpkg_maintscript_helper_snippets: List[DpkgMaintscriptHelperCommand] = field(
173 default_factory=list
174 )
175 maintscript_snippets: Dict[str, MaintscriptSnippetContainer] = field(
176 default_factory=dict
177 )
178 transformations: List[TransformationRule] = field(default_factory=list)
179 reserved_packager_provided_files: Dict[str, List[PackagerProvidedFile]] = field(
180 default_factory=dict
181 )
182 install_rules: List[InstallRule] = field(default_factory=list)
183 requested_service_rules: List[ServiceRule] = field(default_factory=list)
186def _path_to_tar_member(
187 path: FSPath,
188 clamp_mtime_to: int,
189) -> TarMember:
190 mtime = float(clamp_mtime_to)
191 owner, uid, group, gid = path.tar_owner_info
192 mode = path.mode
194 if path.has_fs_path:
195 mtime = min(mtime, path.mtime)
197 if path.is_dir:
198 path_type = PathType.DIRECTORY
199 elif path.is_file:
200 # TODO: someday we will need to deal with hardlinks and it might appear here.
201 path_type = PathType.FILE
202 elif path.is_symlink: 202 ↛ 222line 202 didn't jump to line 222 because the condition on line 202 was always true
203 # Special-case that we resolve immediately (since we need to normalize the target anyway)
204 link_target = debian_policy_normalize_symlink_target(
205 path.path,
206 path.readlink(),
207 )
208 return TarMember.virtual_path(
209 path.tar_path,
210 PathType.SYMLINK,
211 mtime,
212 link_target=link_target,
213 # Force mode to be 0777 as that is the mode we see in the data.tar. In theory, tar lets you set
214 # it to whatever. However, for reproducibility, we have to be well-behaved - and that is 0777.
215 mode=0o0777,
216 owner=owner,
217 uid=uid,
218 group=group,
219 gid=gid,
220 )
221 else:
222 assert not path.is_symlink
223 raise AssertionError(
224 f"Unsupported file type: {path.path} - not a file, dir nor a symlink!"
225 )
227 if not path.has_fs_path:
228 assert not path.is_file
229 return TarMember.virtual_path(
230 path.tar_path,
231 path_type,
232 mtime,
233 mode=mode,
234 owner=owner,
235 uid=uid,
236 group=group,
237 gid=gid,
238 )
239 may_steal_fs_path = path._can_replace_inline
240 return TarMember.from_file(
241 path.tar_path,
242 path.fs_path,
243 mode=mode,
244 uid=uid,
245 owner=owner,
246 gid=gid,
247 group=group,
248 path_type=path_type,
249 path_mtime=mtime,
250 clamp_mtime_to=clamp_mtime_to,
251 may_steal_fs_path=may_steal_fs_path,
252 )
255def _generate_intermediate_manifest(
256 fs_root: FSPath,
257 clamp_mtime_to: int,
258) -> Iterable[TarMember]:
259 symlinks = []
260 for path in fs_root.all_paths():
261 tar_member = _path_to_tar_member(path, clamp_mtime_to)
262 if tar_member.path_type == PathType.SYMLINK:
263 symlinks.append(tar_member)
264 continue
265 yield tar_member
266 yield from symlinks
269ST = TypeVar("ST")
270T = TypeVar("T")
273class AbstractYAMLSubStore(Generic[ST]):
274 def __init__(
275 self,
276 parent_store: Any,
277 parent_key: Optional[Union[int, str]],
278 store: Optional[ST] = None,
279 ) -> None:
280 if parent_store is not None and parent_key is not None:
281 try:
282 from_parent_store = parent_store[parent_key]
283 except (KeyError, IndexError):
284 from_parent_store = None
285 if ( 285 ↛ 290line 285 didn't jump to line 290 because the condition on line 285 was never true
286 store is not None
287 and from_parent_store is not None
288 and store is not parent_store
289 ):
290 raise ValueError(
291 "Store is provided but is not the one already in the parent store"
292 )
293 if store is None: 293 ↛ 295line 293 didn't jump to line 295 because the condition on line 293 was always true
294 store = from_parent_store
295 self._parent_store = parent_store
296 self._parent_key = parent_key
297 self._is_detached = (
298 parent_key is None or parent_store is None or parent_key not in parent_store
299 )
300 assert self._is_detached or store is not None
301 if store is None:
302 store = self._create_new_instance()
303 self._store: ST = store
305 def _create_new_instance(self) -> ST:
306 raise NotImplementedError
308 def create_definition_if_missing(self) -> None:
309 if self._is_detached:
310 self.create_definition()
312 def create_definition(self) -> None:
313 if not self._is_detached: 313 ↛ 314line 313 didn't jump to line 314 because the condition on line 313 was never true
314 raise RuntimeError("Definition is already present")
315 parent_store = self._parent_store
316 if parent_store is None: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true
317 raise RuntimeError(
318 f"Definition is not attached to any parent!? ({self.__class__.__name__})"
319 )
320 if isinstance(parent_store, list):
321 assert self._parent_key is None
322 self._parent_key = len(parent_store)
323 self._parent_store.append(self._store)
324 else:
325 parent_store[self._parent_key] = self._store
326 self._is_detached = False
328 def remove_definition(self) -> None:
329 self._ensure_attached()
330 del self._parent_store[self._parent_key]
331 if isinstance(self._parent_store, list):
332 self._parent_key = None
333 self._is_detached = True
335 def _ensure_attached(self) -> None:
336 if self._is_detached:
337 raise RuntimeError("The definition has been removed!")
340class AbstractYAMLListSubStore(Generic[T], AbstractYAMLSubStore[List[T]]):
341 def _create_new_instance(self) -> List[T]:
342 return CommentedSeq()
345class AbstractYAMLDictSubStore(Generic[T], AbstractYAMLSubStore[Dict[str, T]]):
346 def _create_new_instance(self) -> Dict[str, T]:
347 return CommentedMap()
350class MutableCondition:
351 @classmethod
352 def arch_matches(cls, arch_filter: str) -> CommentedMap:
353 return CommentedMap({MK_CONDITION_ARCH_MATCHES: arch_filter})
355 @classmethod
356 def build_profiles_matches(cls, build_profiles_matches: str) -> CommentedMap:
357 return CommentedMap(
358 {MK_CONDITION_BUILD_PROFILES_MATCHES: build_profiles_matches}
359 )
362class MutableYAMLSymlink(AbstractYAMLDictSubStore[Any]):
363 @classmethod
364 def new_symlink(
365 cls, link_path: str, link_target: str, condition: Optional[Any]
366 ) -> "MutableYAMLSymlink":
367 inner = {
368 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH: link_path,
369 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET: link_target,
370 }
371 content = {MK_TRANSFORMATIONS_CREATE_SYMLINK: inner}
372 if condition is not None: 372 ↛ 373line 372 didn't jump to line 373 because the condition on line 372 was never true
373 inner["when"] = condition
374 return cls(None, None, store=CommentedMap(content))
376 @property
377 def symlink_path(self) -> str:
378 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
379 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
380 ]
382 @symlink_path.setter
383 def symlink_path(self, path: str) -> None:
384 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
385 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_PATH
386 ] = path
388 @property
389 def symlink_target(self) -> Optional[str]:
390 return self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
391 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
392 ]
394 @symlink_target.setter
395 def symlink_target(self, target: str) -> None:
396 self._store[MK_TRANSFORMATIONS_CREATE_SYMLINK][
397 MK_TRANSFORMATIONS_CREATE_SYMLINK_LINK_TARGET
398 ] = target
401class MutableYAMLConffileManagementItem(AbstractYAMLDictSubStore[Any]):
402 @classmethod
403 def rm_conffile(
404 cls,
405 conffile: str,
406 prior_to_version: Optional[str],
407 owning_package: Optional[str],
408 ) -> "MutableYAMLConffileManagementItem":
409 r = cls(
410 None,
411 None,
412 store=CommentedMap(
413 {
414 MK_CONFFILE_MANAGEMENT_REMOVE: CommentedMap(
415 {MK_CONFFILE_MANAGEMENT_REMOVE_PATH: conffile}
416 )
417 }
418 ),
419 )
420 r.prior_to_version = prior_to_version
421 r.owning_package = owning_package
422 return r
424 @classmethod
425 def mv_conffile(
426 cls,
427 old_conffile: str,
428 new_conffile: str,
429 prior_to_version: Optional[str],
430 owning_package: Optional[str],
431 ) -> "MutableYAMLConffileManagementItem":
432 r = cls(
433 None,
434 None,
435 store=CommentedMap(
436 {
437 MK_CONFFILE_MANAGEMENT_RENAME: CommentedMap(
438 {
439 MK_CONFFILE_MANAGEMENT_RENAME_SOURCE: old_conffile,
440 MK_CONFFILE_MANAGEMENT_RENAME_TARGET: new_conffile,
441 }
442 )
443 }
444 ),
445 )
446 r.prior_to_version = prior_to_version
447 r.owning_package = owning_package
448 return r
450 @property
451 def _container(self) -> Dict[str, Any]:
452 assert len(self._store) == 1
453 return next(iter(self._store.values()))
455 @property
456 def command(self) -> str:
457 assert len(self._store) == 1
458 return next(iter(self._store))
460 @property
461 def obsolete_conffile(self) -> str:
462 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
463 return self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH]
464 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
465 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE]
467 @obsolete_conffile.setter
468 def obsolete_conffile(self, value: str) -> None:
469 if self.command == MK_CONFFILE_MANAGEMENT_REMOVE:
470 self._container[MK_CONFFILE_MANAGEMENT_REMOVE_PATH] = value
471 else:
472 assert self.command == MK_CONFFILE_MANAGEMENT_RENAME
473 self._container[MK_CONFFILE_MANAGEMENT_RENAME_SOURCE] = value
475 @property
476 def new_conffile(self) -> str:
477 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
478 raise TypeError(
479 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
480 f" This is a {self.command}"
481 )
482 return self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET]
484 @new_conffile.setter
485 def new_conffile(self, value: str) -> None:
486 if self.command != MK_CONFFILE_MANAGEMENT_RENAME:
487 raise TypeError(
488 f"The new_conffile attribute is only applicable to command {MK_CONFFILE_MANAGEMENT_RENAME}."
489 f" This is a {self.command}"
490 )
491 self._container[MK_CONFFILE_MANAGEMENT_RENAME_TARGET] = value
493 @property
494 def prior_to_version(self) -> Optional[str]:
495 return self._container.get(MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION)
497 @prior_to_version.setter
498 def prior_to_version(self, value: Optional[str]) -> None:
499 if value is None:
500 try:
501 del self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
502 except KeyError:
503 pass
504 else:
505 self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION] = value
507 @property
508 def owning_package(self) -> Optional[str]:
509 return self._container[MK_CONFFILE_MANAGEMENT_X_PRIOR_TO_VERSION]
511 @owning_package.setter
512 def owning_package(self, value: Optional[str]) -> None:
513 if value is None:
514 try:
515 del self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE]
516 except KeyError:
517 pass
518 else:
519 self._container[MK_CONFFILE_MANAGEMENT_X_OWNING_PACKAGE] = value
522class MutableYAMLPackageDefinition(AbstractYAMLDictSubStore):
523 def _list_store(
524 self, key, *, create_if_absent: bool = False
525 ) -> Optional[List[Dict[str, Any]]]:
526 if self._is_detached or key not in self._store:
527 if create_if_absent: 527 ↛ 528line 527 didn't jump to line 528 because the condition on line 527 was never true
528 return None
529 self.create_definition_if_missing()
530 self._store[key] = []
531 return self._store[key]
533 def _insert_item(self, key: str, item: AbstractYAMLDictSubStore) -> None:
534 parent_store = self._list_store(key, create_if_absent=True)
535 assert parent_store is not None
536 if not item._is_detached or ( 536 ↛ 539line 536 didn't jump to line 539 because the condition on line 536 was never true
537 item._parent_store is not None and item._parent_store is not parent_store
538 ):
539 raise RuntimeError(
540 "Item is already attached or associated with a different container"
541 )
542 item._parent_store = parent_store
543 item.create_definition()
545 def add_symlink(self, symlink: MutableYAMLSymlink) -> None:
546 self._insert_item(MK_TRANSFORMATIONS, symlink)
548 def symlinks(self) -> Iterable[MutableYAMLSymlink]:
549 store = self._list_store(MK_TRANSFORMATIONS)
550 if store is None: 550 ↛ 551line 550 didn't jump to line 551 because the condition on line 550 was never true
551 return
552 for i in range(len(store)): 552 ↛ 553line 552 didn't jump to line 553 because the loop on line 552 never started
553 d = store[i]
554 if d and isinstance(d, dict) and len(d) == 1 and "symlink" in d:
555 yield MutableYAMLSymlink(store, i)
557 def conffile_management_items(self) -> Iterable[MutableYAMLConffileManagementItem]:
558 store = self._list_store(MK_CONFFILE_MANAGEMENT)
559 if store is None: 559 ↛ 560line 559 didn't jump to line 560 because the condition on line 559 was never true
560 return
561 yield from (
562 MutableYAMLConffileManagementItem(store, i) for i in range(len(store))
563 )
565 def add_conffile_management(
566 self, conffile_management_item: MutableYAMLConffileManagementItem
567 ) -> None:
568 self._insert_item(MK_CONFFILE_MANAGEMENT, conffile_management_item)
571class AbstractMutableYAMLInstallRule(AbstractYAMLDictSubStore):
572 @property
573 def _container(self) -> Dict[str, Any]:
574 assert len(self._store) == 1
575 return next(iter(self._store.values()))
577 @property
578 def into(self) -> Optional[List[str]]:
579 v = self._container[MK_INSTALLATIONS_INSTALL_INTO]
580 if v is None:
581 return None
582 if isinstance(v, str):
583 return [v]
584 return v
586 @into.setter
587 def into(self, new_value: Optional[Union[str, List[str]]]) -> None:
588 if new_value is None: 588 ↛ 592line 588 didn't jump to line 592 because the condition on line 588 was always true
589 with suppress(KeyError):
590 del self._container[MK_INSTALLATIONS_INSTALL_INTO]
591 return
592 if isinstance(new_value, str):
593 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_value
594 return
595 new_list = CommentedSeq(new_value)
596 self._container[MK_INSTALLATIONS_INSTALL_INTO] = new_list
598 @property
599 def when(self) -> Optional[Union[str, Mapping[str, Any]]]:
600 return self._container[MK_CONDITION_WHEN]
602 @when.setter
603 def when(self, new_value: Optional[Union[str, Mapping[str, Any]]]) -> None:
604 if new_value is None: 604 ↛ 605line 604 didn't jump to line 605 because the condition on line 604 was never true
605 with suppress(KeyError):
606 del self._container[MK_CONDITION_WHEN]
607 return
608 if isinstance(new_value, str): 608 ↛ 609line 608 didn't jump to line 609 because the condition on line 608 was never true
609 self._container[MK_CONDITION_WHEN] = new_value
610 return
611 new_map = CommentedMap(new_value)
612 self._container[MK_CONDITION_WHEN] = new_map
614 @classmethod
615 def install_dest(
616 cls,
617 sources: Union[str, List[str]],
618 into: Optional[Union[str, List[str]]],
619 *,
620 dest_dir: Optional[str] = None,
621 when: Optional[Union[str, Mapping[str, Any]]] = None,
622 ) -> "MutableYAMLInstallRuleInstall":
623 k = MK_INSTALLATIONS_INSTALL_SOURCES
624 if isinstance(sources, str):
625 k = MK_INSTALLATIONS_INSTALL_SOURCE
626 r = MutableYAMLInstallRuleInstall(
627 None,
628 None,
629 store=CommentedMap(
630 {
631 MK_INSTALLATIONS_INSTALL: CommentedMap(
632 {
633 k: sources,
634 }
635 )
636 }
637 ),
638 )
639 r.dest_dir = dest_dir
640 r.into = into
641 if when is not None:
642 r.when = when
643 return r
645 @classmethod
646 def multi_dest_install(
647 cls,
648 sources: Union[str, List[str]],
649 dest_dirs: Sequence[str],
650 into: Optional[Union[str, List[str]]],
651 *,
652 when: Optional[Union[str, Mapping[str, Any]]] = None,
653 ) -> "MutableYAMLInstallRuleInstall":
654 k = MK_INSTALLATIONS_INSTALL_SOURCES
655 if isinstance(sources, str): 655 ↛ 657line 655 didn't jump to line 657 because the condition on line 655 was always true
656 k = MK_INSTALLATIONS_INSTALL_SOURCE
657 r = MutableYAMLInstallRuleInstall(
658 None,
659 None,
660 store=CommentedMap(
661 {
662 MK_INSTALLATIONS_MULTI_DEST_INSTALL: CommentedMap(
663 {
664 k: sources,
665 "dest-dirs": dest_dirs,
666 }
667 )
668 }
669 ),
670 )
671 r.into = into
672 if when is not None: 672 ↛ 673line 672 didn't jump to line 673 because the condition on line 672 was never true
673 r.when = when
674 return r
676 @classmethod
677 def install_as(
678 cls,
679 source: str,
680 install_as: str,
681 into: Optional[Union[str, List[str]]],
682 when: Optional[Union[str, Mapping[str, Any]]] = None,
683 ) -> "MutableYAMLInstallRuleInstall":
684 r = MutableYAMLInstallRuleInstall(
685 None,
686 None,
687 store=CommentedMap(
688 {
689 MK_INSTALLATIONS_INSTALL: CommentedMap(
690 {
691 MK_INSTALLATIONS_INSTALL_SOURCE: source,
692 MK_INSTALLATIONS_INSTALL_AS: install_as,
693 }
694 )
695 }
696 ),
697 )
698 r.into = into
699 if when is not None: 699 ↛ 700line 699 didn't jump to line 700 because the condition on line 699 was never true
700 r.when = when
701 return r
703 @classmethod
704 def install_doc_as(
705 cls,
706 source: str,
707 install_as: str,
708 into: Optional[Union[str, List[str]]],
709 when: Optional[Union[str, Mapping[str, Any]]] = None,
710 ) -> "MutableYAMLInstallRuleInstall":
711 r = MutableYAMLInstallRuleInstall(
712 None,
713 None,
714 store=CommentedMap(
715 {
716 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
717 {
718 MK_INSTALLATIONS_INSTALL_SOURCE: source,
719 MK_INSTALLATIONS_INSTALL_AS: install_as,
720 }
721 )
722 }
723 ),
724 )
725 r.into = into
726 if when is not None:
727 r.when = when
728 return r
730 @classmethod
731 def install_docs(
732 cls,
733 sources: Union[str, List[str]],
734 into: Optional[Union[str, List[str]]],
735 *,
736 dest_dir: Optional[str] = None,
737 when: Optional[Union[str, Mapping[str, Any]]] = None,
738 ) -> "MutableYAMLInstallRuleInstall":
739 k = MK_INSTALLATIONS_INSTALL_SOURCES
740 if isinstance(sources, str):
741 k = MK_INSTALLATIONS_INSTALL_SOURCE
742 r = MutableYAMLInstallRuleInstall(
743 None,
744 None,
745 store=CommentedMap(
746 {
747 MK_INSTALLATIONS_INSTALL_DOCS: CommentedMap(
748 {
749 k: sources,
750 }
751 )
752 }
753 ),
754 )
755 r.into = into
756 r.dest_dir = dest_dir
757 if when is not None:
758 r.when = when
759 return r
761 @classmethod
762 def install_examples(
763 cls,
764 sources: Union[str, List[str]],
765 into: Optional[Union[str, List[str]]],
766 when: Optional[Union[str, Mapping[str, Any]]] = None,
767 ) -> "MutableYAMLInstallRuleInstallExamples":
768 k = MK_INSTALLATIONS_INSTALL_SOURCES
769 if isinstance(sources, str):
770 k = MK_INSTALLATIONS_INSTALL_SOURCE
771 r = MutableYAMLInstallRuleInstallExamples(
772 None,
773 None,
774 store=CommentedMap(
775 {
776 MK_INSTALLATIONS_INSTALL_EXAMPLES: CommentedMap(
777 {
778 k: sources,
779 }
780 )
781 }
782 ),
783 )
784 r.into = into
785 if when is not None: 785 ↛ 786line 785 didn't jump to line 786 because the condition on line 785 was never true
786 r.when = when
787 return r
789 @classmethod
790 def install_man(
791 cls,
792 sources: Union[str, List[str]],
793 into: Optional[Union[str, List[str]]],
794 language: Optional[str],
795 when: Optional[Union[str, Mapping[str, Any]]] = None,
796 ) -> "MutableYAMLInstallRuleMan":
797 k = MK_INSTALLATIONS_INSTALL_SOURCES
798 if isinstance(sources, str): 798 ↛ 799line 798 didn't jump to line 799 because the condition on line 798 was never true
799 k = MK_INSTALLATIONS_INSTALL_SOURCE
800 r = MutableYAMLInstallRuleMan(
801 None,
802 None,
803 store=CommentedMap(
804 {
805 MK_INSTALLATIONS_INSTALL_MAN: CommentedMap(
806 {
807 k: sources,
808 }
809 )
810 }
811 ),
812 )
813 r.language = language
814 r.into = into
815 if when is not None: 815 ↛ 816line 815 didn't jump to line 816 because the condition on line 815 was never true
816 r.when = when
817 return r
819 @classmethod
820 def discard(
821 cls,
822 sources: Union[str, List[str]],
823 ) -> "MutableYAMLInstallRuleDiscard":
824 return MutableYAMLInstallRuleDiscard(
825 None,
826 None,
827 store=CommentedMap({MK_INSTALLATIONS_DISCARD: sources}),
828 )
831class MutableYAMLInstallRuleInstallExamples(AbstractMutableYAMLInstallRule):
832 pass
835class MutableYAMLInstallRuleMan(AbstractMutableYAMLInstallRule):
836 @property
837 def language(self) -> Optional[str]:
838 return self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
840 @language.setter
841 def language(self, new_value: Optional[str]) -> None:
842 if new_value is not None:
843 self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE] = new_value
844 return
845 with suppress(KeyError):
846 del self._container[MK_INSTALLATIONS_INSTALL_MAN_LANGUAGE]
849class MutableYAMLInstallRuleDiscard(AbstractMutableYAMLInstallRule):
850 pass
853class MutableYAMLInstallRuleInstall(AbstractMutableYAMLInstallRule):
854 @property
855 def sources(self) -> List[str]:
856 v = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
857 if isinstance(v, str):
858 return [v]
859 return v
861 @sources.setter
862 def sources(self, new_value: Union[str, List[str]]) -> None:
863 if isinstance(new_value, str):
864 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_value
865 return
866 new_list = CommentedSeq(new_value)
867 self._container[MK_INSTALLATIONS_INSTALL_SOURCES] = new_list
869 @property
870 def dest_dir(self) -> Optional[str]:
871 return self._container.get(MK_INSTALLATIONS_INSTALL_DEST_DIR)
873 @dest_dir.setter
874 def dest_dir(self, new_value: Optional[str]) -> None:
875 if new_value is not None and self.dest_as is not None: 875 ↛ 876line 875 didn't jump to line 876 because the condition on line 875 was never true
876 raise ValueError(
877 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
878 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
879 )
880 if new_value is not None:
881 self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR] = new_value
882 else:
883 with suppress(KeyError):
884 del self._container[MK_INSTALLATIONS_INSTALL_DEST_DIR]
886 @property
887 def dest_as(self) -> Optional[str]:
888 return self._container.get(MK_INSTALLATIONS_INSTALL_AS)
890 @dest_as.setter
891 def dest_as(self, new_value: Optional[str]) -> None:
892 if new_value is not None:
893 if self.dest_dir is not None:
894 raise ValueError(
895 f'Cannot both have a "{MK_INSTALLATIONS_INSTALL_DEST_DIR}" and'
896 f' "{MK_INSTALLATIONS_INSTALL_AS}"'
897 )
899 sources = self._container[MK_INSTALLATIONS_INSTALL_SOURCES]
900 if isinstance(sources, list):
901 if len(sources) != 1:
902 raise ValueError(
903 f'Cannot have "{MK_INSTALLATIONS_INSTALL_AS}" when'
904 f' "{MK_INSTALLATIONS_INSTALL_SOURCES}" is not exactly one item'
905 )
906 self.sources = sources[0]
907 self._container[MK_INSTALLATIONS_INSTALL_AS] = new_value
908 else:
909 with suppress(KeyError):
910 del self._container[MK_INSTALLATIONS_INSTALL_AS]
913class MutableYAMLInstallationsDefinition(AbstractYAMLListSubStore[Any]):
914 def append(self, install_rule: AbstractMutableYAMLInstallRule) -> None:
915 parent_store = self._store
916 if not install_rule._is_detached or ( 916 ↛ 920line 916 didn't jump to line 920 because the condition on line 916 was never true
917 install_rule._parent_store is not None
918 and install_rule._parent_store is not parent_store
919 ):
920 raise RuntimeError(
921 "Item is already attached or associated with a different container"
922 )
923 self.create_definition_if_missing()
924 install_rule._parent_store = parent_store
925 install_rule.create_definition()
927 def extend(self, install_rules: Iterable[AbstractMutableYAMLInstallRule]) -> None:
928 parent_store = self._store
929 for install_rule in install_rules:
930 if not install_rule._is_detached or ( 930 ↛ 934line 930 didn't jump to line 934 because the condition on line 930 was never true
931 install_rule._parent_store is not None
932 and install_rule._parent_store is not parent_store
933 ):
934 raise RuntimeError(
935 "Item is already attached or associated with a different container"
936 )
937 self.create_definition_if_missing()
938 install_rule._parent_store = parent_store
939 install_rule.create_definition()
942class MutableYAMLManifestVariables(AbstractYAMLDictSubStore):
943 @property
944 def variables(self) -> Dict[str, Any]:
945 return self._store
947 def __setitem__(self, key: str, value: Any) -> None:
948 self._store[key] = value
949 self.create_definition_if_missing()
952class MutableYAMLManifestDefinitions(AbstractYAMLDictSubStore):
953 def manifest_variables(
954 self, *, create_if_absent: bool = True
955 ) -> MutableYAMLManifestVariables:
956 d = MutableYAMLManifestVariables(self._store, MK_MANIFEST_VARIABLES)
957 if create_if_absent: 957 ↛ 958line 957 didn't jump to line 958 because the condition on line 957 was never true
958 d.create_definition_if_missing()
959 return d
962class MutableYAMLManifest:
963 def __init__(self, store: Any) -> None:
964 self._store = store
966 @classmethod
967 def empty_manifest(cls) -> "MutableYAMLManifest":
968 return cls(CommentedMap({MK_MANIFEST_VERSION: DEFAULT_MANIFEST_VERSION}))
970 @property
971 def manifest_version(self) -> str:
972 return self._store[MK_MANIFEST_VERSION]
974 @manifest_version.setter
975 def manifest_version(self, version: str) -> None:
976 if version not in SUPPORTED_MANIFEST_VERSIONS:
977 raise ValueError("Unsupported version")
978 self._store[MK_MANIFEST_VERSION] = version
980 def installations(
981 self,
982 *,
983 create_if_absent: bool = True,
984 ) -> MutableYAMLInstallationsDefinition:
985 d = MutableYAMLInstallationsDefinition(self._store, MK_INSTALLATIONS)
986 if create_if_absent: 986 ↛ 987line 986 didn't jump to line 987 because the condition on line 986 was never true
987 d.create_definition_if_missing()
988 return d
990 def manifest_definitions(
991 self,
992 *,
993 create_if_absent: bool = True,
994 ) -> MutableYAMLManifestDefinitions:
995 d = MutableYAMLManifestDefinitions(self._store, MK_MANIFEST_DEFINITIONS)
996 if create_if_absent: 996 ↛ 997line 996 didn't jump to line 997 because the condition on line 996 was never true
997 d.create_definition_if_missing()
998 return d
1000 def package(
1001 self, name: str, *, create_if_absent: bool = True
1002 ) -> MutableYAMLPackageDefinition:
1003 if MK_PACKAGES not in self._store: 1003 ↛ 1005line 1003 didn't jump to line 1005 because the condition on line 1003 was always true
1004 self._store[MK_PACKAGES] = CommentedMap()
1005 packages_store = self._store[MK_PACKAGES]
1006 package = packages_store.get(name)
1007 if package is None: 1007 ↛ 1014line 1007 didn't jump to line 1014 because the condition on line 1007 was always true
1008 if not create_if_absent: 1008 ↛ 1009line 1008 didn't jump to line 1009 because the condition on line 1008 was never true
1009 raise KeyError(name)
1010 assert packages_store is not None
1011 d = MutableYAMLPackageDefinition(packages_store, name)
1012 d.create_definition()
1013 else:
1014 d = MutableYAMLPackageDefinition(packages_store, name)
1015 return d
1017 def write_to(self, fd) -> None:
1018 MANIFEST_YAML.dump(self._store, fd)
1021def _describe_missing_path(entry: VirtualPath) -> str:
1022 if entry.is_dir:
1023 return f"{entry.fs_path}/ (empty directory; possible integration point)"
1024 if entry.is_symlink:
1025 target = os.readlink(entry.fs_path)
1026 return f"{entry.fs_path} (symlink; links to {target})"
1027 if entry.is_file:
1028 return f"{entry.fs_path} (file)"
1029 return f"{entry.fs_path} (other!? Probably not supported by debputy and may need a `remove`)"
1032def _detect_missing_installations(
1033 path_matcher: SourcePathMatcher,
1034 search_dir: VirtualPath,
1035) -> None:
1036 if not search_dir.is_dir: 1036 ↛ 1037line 1036 didn't jump to line 1037 because the condition on line 1036 was never true
1037 return
1038 missing = list(path_matcher.detect_missing(search_dir))
1039 if not missing:
1040 return
1042 excl = textwrap.dedent(
1043 """\
1044 - discard: "*"
1045 """
1046 )
1048 raise PathNotCoveredByInstallRulesError(
1049 "Please review the list and add either install rules or exclusions to `installations` in"
1050 " debian/debputy.manifest. If you do not need any of these paths, add the following to the"
1051 f" end of your 'installations`:\n\n{excl}\n",
1052 missing,
1053 search_dir,
1054 )
1057def _list_automatic_discard_rules(path_matcher: SourcePathMatcher) -> None:
1058 used_discard_rules = path_matcher.used_auto_discard_rules
1059 # Discard rules can match and then be overridden. In that case, they appear
1060 # but have 0 matches.
1061 if not sum((len(v) for v in used_discard_rules.values()), 0):
1062 return
1063 _info("The following automatic discard rules were triggered:")
1064 example_path: Optional[str] = None
1065 for rule in sorted(used_discard_rules):
1066 for fs_path in sorted(used_discard_rules[rule]):
1067 if example_path is None: 1067 ↛ 1069line 1067 didn't jump to line 1069 because the condition on line 1067 was always true
1068 example_path = fs_path
1069 _info(f" * {rule} -> {fs_path}")
1070 assert example_path is not None
1071 _info("")
1072 _info(
1073 "Note that some of these may have been overruled. The overrule detection logic is not"
1074 )
1075 _info("100% reliable.")
1076 _info("")
1077 _info(
1078 "You can overrule an automatic discard rule by explicitly listing the path. As an example:"
1079 )
1080 _info(" installations:")
1081 _info(" - install:")
1082 _info(f" source: {example_path}")
1085def _install_everything_from_source_dir_if_present(
1086 dctrl_bin: BinaryPackage,
1087 substitution: Substitution,
1088 path_matcher: SourcePathMatcher,
1089 install_rule_context: InstallRuleContext,
1090 source_condition_context: ConditionContext,
1091 source_dir: VirtualPath,
1092 *,
1093 into_dir: Optional[VirtualPath] = None,
1094) -> None:
1095 attribute_path = AttributePath.builtin_path()[f"installing {source_dir.fs_path}"]
1096 pkg_set = frozenset([dctrl_bin])
1097 install_rule = run_in_context_of_plugin(
1098 "debputy",
1099 InstallRule.install_dest,
1100 [FileSystemMatchRule.from_path_match("*", attribute_path, substitution)],
1101 None,
1102 pkg_set,
1103 f"Built-in; install everything from {source_dir.fs_path} into {dctrl_bin.name}",
1104 None,
1105 )
1106 pkg_search_dir: Tuple[SearchDir] = (
1107 SearchDir(
1108 source_dir,
1109 pkg_set,
1110 ),
1111 )
1112 replacements = {
1113 "search_dirs": pkg_search_dir,
1114 }
1115 if into_dir is not None: 1115 ↛ 1116line 1115 didn't jump to line 1116 because the condition on line 1115 was never true
1116 binary_package_contexts = dict(install_rule_context.binary_package_contexts)
1117 updated = binary_package_contexts[dctrl_bin.name].replace(fs_root=into_dir)
1118 binary_package_contexts[dctrl_bin.name] = updated
1119 replacements["binary_package_contexts"] = binary_package_contexts
1121 fake_install_rule_context = install_rule_context.replace(**replacements)
1122 try:
1123 install_rule.perform_install(
1124 path_matcher,
1125 fake_install_rule_context,
1126 source_condition_context,
1127 )
1128 except (
1129 NoMatchForInstallPatternError,
1130 PathAlreadyInstalledOrDiscardedError,
1131 ):
1132 # Empty directory or everything excluded by default; ignore the error
1133 pass
1136def _add_build_install_dirs_to_per_package_search_dirs(
1137 build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]],
1138 per_package_search_dirs: Dict[BinaryPackage, List[VirtualPath]],
1139 as_path: Callable[[str], VirtualPath],
1140) -> None:
1141 seen_pp_search_dirs: Set[Tuple[BinaryPackage, str]] = set()
1142 for dest_dir, for_packages in build_system_install_dirs:
1143 dest_path = as_path(dest_dir)
1144 for pkg in for_packages:
1145 seen_key = (pkg, dest_dir)
1146 if seen_key in seen_pp_search_dirs:
1147 continue
1148 seen_pp_search_dirs.add(seen_key)
1149 if pkg not in per_package_search_dirs:
1150 per_package_search_dirs[pkg] = [dest_path]
1151 else:
1152 per_package_search_dirs[pkg].append(dest_path)
1155class HighLevelManifest:
1156 def __init__(
1157 self,
1158 manifest_path: str,
1159 mutable_manifest: Optional[MutableYAMLManifest],
1160 install_rules: Optional[List[InstallRule]],
1161 source_package: SourcePackage,
1162 binary_packages: Mapping[str, BinaryPackage],
1163 substitution: Substitution,
1164 package_transformations: Mapping[str, PackageTransformationDefinition],
1165 dpkg_architecture_variables: DpkgArchitectureBuildProcessValuesTable,
1166 dpkg_arch_query_table: DpkgArchTable,
1167 build_env: DebBuildOptionsAndProfiles,
1168 build_environments: BuildEnvironments,
1169 build_rules: Optional[List[BuildRule]],
1170 plugin_provided_feature_set: PluginProvidedFeatureSet,
1171 debian_dir: VirtualPath,
1172 ) -> None:
1173 self.manifest_path = manifest_path
1174 self.mutable_manifest = mutable_manifest
1175 self._install_rules = install_rules
1176 self._source_package = source_package
1177 self._binary_packages = binary_packages
1178 self.substitution = substitution
1179 self.package_transformations = package_transformations
1180 self._dpkg_architecture_variables = dpkg_architecture_variables
1181 self._dpkg_arch_query_table = dpkg_arch_query_table
1182 self._build_env = build_env
1183 self._used_for: Set[str] = set()
1184 self.build_environments = build_environments
1185 self.build_rules = build_rules
1186 self._plugin_provided_feature_set = plugin_provided_feature_set
1187 self._debian_dir = debian_dir
1188 self._source_condition_context = ConditionContext(
1189 binary_package=None,
1190 substitution=self.substitution,
1191 deb_options_and_profiles=self._build_env,
1192 dpkg_architecture_variables=self._dpkg_architecture_variables,
1193 dpkg_arch_query_table=self._dpkg_arch_query_table,
1194 )
1196 def source_version(self, include_binnmu_version: bool = True) -> str:
1197 # TODO: There should an easier way to determine the source version; really.
1198 version_var = "{{DEB_VERSION}}"
1199 if not include_binnmu_version:
1200 version_var = "{{_DEBPUTY_INTERNAL_NON_BINNMU_SOURCE}}"
1201 try:
1202 return self.substitution.substitute(
1203 version_var, "internal (resolve version)"
1204 )
1205 except DebputySubstitutionError as e:
1206 raise AssertionError(f"Could not resolve {version_var}") from e
1208 @property
1209 def source_condition_context(self) -> ConditionContext:
1210 return self._source_condition_context
1212 @property
1213 def debian_dir(self) -> VirtualPath:
1214 return self._debian_dir
1216 @property
1217 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
1218 return self._dpkg_architecture_variables
1220 @property
1221 def deb_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
1222 return self._build_env
1224 @property
1225 def plugin_provided_feature_set(self) -> PluginProvidedFeatureSet:
1226 return self._plugin_provided_feature_set
1228 @property
1229 def active_packages(self) -> Iterable[BinaryPackage]:
1230 yield from (p for p in self._binary_packages.values() if p.should_be_acted_on)
1232 @property
1233 def all_packages(self) -> Iterable[BinaryPackage]:
1234 yield from self._binary_packages.values()
1236 def package_state_for(self, package: str) -> PackageTransformationDefinition:
1237 return self.package_transformations[package]
1239 def _detect_doc_main_package_for(self, package: BinaryPackage) -> BinaryPackage:
1240 name = package.name
1241 # If it is not a -doc package, then docs should be installed
1242 # under its own package name.
1243 if not name.endswith("-doc"): 1243 ↛ 1245line 1243 didn't jump to line 1245 because the condition on line 1243 was always true
1244 return package
1245 name = name[:-4]
1246 main_package = self._binary_packages.get(name)
1247 if main_package:
1248 return main_package
1249 if name.startswith("lib"):
1250 dev_pkg = self._binary_packages.get(f"{name}-dev")
1251 if dev_pkg:
1252 return dev_pkg
1254 # If we found no better match; default to the doc package itself.
1255 return package
1257 def perform_installations(
1258 self,
1259 integration_mode: DebputyIntegrationMode,
1260 build_system_install_dirs: Sequence[Tuple[str, FrozenSet[BinaryPackage]]],
1261 *,
1262 install_request_context: Optional[InstallSearchDirContext] = None,
1263 ) -> PackageDataTable:
1264 package_data_dict = {}
1265 package_data_table = PackageDataTable(package_data_dict)
1266 enable_manifest_installation_feature = (
1267 integration_mode != INTEGRATION_MODE_DH_DEBPUTY_RRR
1268 )
1270 if build_system_install_dirs: 1270 ↛ 1271line 1270 didn't jump to line 1271 because the condition on line 1270 was never true
1271 if integration_mode != INTEGRATION_MODE_FULL:
1272 raise ValueError(
1273 "The build_system_install_dirs parameter can only be used in full integration mode"
1274 )
1275 if install_request_context:
1276 raise ValueError(
1277 "The build_system_install_dirs parameter cannot be used with install_request_context"
1278 " (not implemented)"
1279 )
1281 if install_request_context is None: 1281 ↛ 1283line 1281 didn't jump to line 1283 because the condition on line 1281 was never true
1283 @functools.lru_cache(None)
1284 def _as_path(fs_path: str) -> VirtualPath:
1285 return FSROOverlay.create_root_dir(".", fs_path)
1287 dtmp_dir = _as_path("debian/tmp")
1288 source_root_dir = _as_path(".")
1289 into = frozenset(self._binary_packages.values())
1290 per_package_search_dirs = {
1291 t.binary_package: [_as_path(f.match_rule.path) for f in t.search_dirs]
1292 for t in self.package_transformations.values()
1293 if t.search_dirs is not None
1294 }
1296 if integration_mode == INTEGRATION_MODE_FULL:
1297 # In this mode, we have no default search dir. Everything ends up being
1298 # per-package instead (since that is easier logic-wise).
1299 #
1300 # Even dtmp_dir is omitted here (since it is not universally applicable).
1301 # Note we still initialize dtmp_dir, since it affects a later guard.
1302 default_search_dirs = []
1303 _add_build_install_dirs_to_per_package_search_dirs(
1304 build_system_install_dirs,
1305 per_package_search_dirs,
1306 _as_path,
1307 )
1309 # We can end here with per_package_search_dirs having no search dirs for any package
1310 # (single binary, where everything is installed into d/<pkg> is the most common case).
1311 #
1312 # This is not a problem in itself as the installation rules can still apply to the
1313 # source root and there should be no reason to install something from d/<pkg> into
1314 # d/<another-pkg>
1315 else:
1316 default_search_dirs = [dtmp_dir]
1318 search_dirs = _determine_search_dir_order(
1319 per_package_search_dirs,
1320 into,
1321 default_search_dirs,
1322 source_root_dir,
1323 )
1324 check_for_uninstalled_dirs = tuple(
1325 s.search_dir
1326 for s in search_dirs
1327 if s.search_dir.fs_path != source_root_dir.fs_path
1328 )
1329 if enable_manifest_installation_feature:
1330 _present_installation_dirs(
1331 search_dirs, check_for_uninstalled_dirs, into
1332 )
1333 else:
1334 dtmp_dir = None
1335 search_dirs = install_request_context.search_dirs
1336 into = frozenset(self._binary_packages.values())
1337 seen: Set[BinaryPackage] = set()
1338 for search_dir in search_dirs:
1339 seen.update(search_dir.applies_to)
1341 missing = into - seen
1342 if missing: 1342 ↛ 1343line 1342 didn't jump to line 1343 because the condition on line 1342 was never true
1343 names = ", ".join(p.name for p in missing)
1344 raise ValueError(
1345 f"The following package(s) had no search dirs: {names}."
1346 " (Generally, the source root would be applicable to all packages)"
1347 )
1348 extra_names = seen - into
1349 if extra_names: 1349 ↛ 1350line 1349 didn't jump to line 1350 because the condition on line 1349 was never true
1350 names = ", ".join(p.name for p in extra_names)
1351 raise ValueError(
1352 f"The install_request_context referenced the following unknown package(s): {names}"
1353 )
1355 check_for_uninstalled_dirs = (
1356 install_request_context.check_for_uninstalled_dirs
1357 )
1359 install_rule_context = InstallRuleContext(search_dirs)
1361 if ( 1361 ↛ 1368line 1361 didn't jump to line 1368 because the condition on line 1361 was never true
1362 enable_manifest_installation_feature
1363 and self._install_rules is None
1364 # TODO: Should we also do this for full mode when build systems provided search dirs?
1365 and dtmp_dir is not None
1366 and os.path.isdir(dtmp_dir.fs_path)
1367 ):
1368 msg = (
1369 "The build system appears to have provided the output of upstream build system's"
1370 " install in debian/tmp. However, these are no provisions for debputy to install"
1371 " any of that into any of the debian packages listed in debian/control."
1372 " To avoid accidentally creating empty packages, debputy will insist that you "
1373 " explicitly define an empty installation definition if you did not want to "
1374 " install any of those files even though they have been provided."
1375 ' Example: "installations: []"'
1376 )
1377 _error(msg)
1378 elif ( 1378 ↛ 1381line 1378 didn't jump to line 1381 because the condition on line 1378 was never true
1379 not enable_manifest_installation_feature and self._install_rules is not None
1380 ):
1381 _error(
1382 f"The `installations` feature cannot be used in {self.manifest_path} with this integration mode."
1383 f" Please remove or comment out the `installations` keyword."
1384 )
1386 for dctrl_bin in self.all_packages:
1387 package = dctrl_bin.name
1388 doc_main_package = self._detect_doc_main_package_for(dctrl_bin)
1390 install_rule_context[package] = BinaryPackageInstallRuleContext(
1391 dctrl_bin,
1392 FSRootDir(),
1393 doc_main_package,
1394 )
1396 if enable_manifest_installation_feature: 1396 ↛ 1401line 1396 didn't jump to line 1401 because the condition on line 1396 was always true
1397 discard_rules = list(
1398 self.plugin_provided_feature_set.auto_discard_rules.values()
1399 )
1400 else:
1401 discard_rules = [
1402 self.plugin_provided_feature_set.auto_discard_rules["debian-dir"]
1403 ]
1404 path_matcher = SourcePathMatcher(discard_rules)
1406 source_condition_context = self._source_condition_context
1408 for dctrl_bin in self.active_packages:
1409 package = dctrl_bin.name
1410 if install_request_context: 1410 ↛ 1415line 1410 didn't jump to line 1415 because the condition on line 1410 was always true
1411 build_system_staging_dir = install_request_context.debian_pkg_dirs.get(
1412 package
1413 )
1414 else:
1415 build_system_staging_dir_fs_path = os.path.join("debian", package)
1416 if os.path.isdir(build_system_staging_dir_fs_path):
1417 build_system_staging_dir = FSROOverlay.create_root_dir(
1418 ".",
1419 build_system_staging_dir_fs_path,
1420 )
1421 else:
1422 build_system_staging_dir = None
1424 if build_system_staging_dir is not None:
1425 _install_everything_from_source_dir_if_present(
1426 dctrl_bin,
1427 self.substitution,
1428 path_matcher,
1429 install_rule_context,
1430 source_condition_context,
1431 build_system_staging_dir,
1432 )
1434 if self._install_rules:
1435 # FIXME: Check that every install rule remains used after transformations have run.
1436 # What we want to check is transformations do not exclude everything from an install
1437 # rule. The hard part here is that renaming (etc.) is fine, so we cannot 1:1 string
1438 # match.
1439 for install_rule in self._install_rules:
1440 install_rule.perform_install(
1441 path_matcher,
1442 install_rule_context,
1443 source_condition_context,
1444 )
1446 if enable_manifest_installation_feature: 1446 ↛ 1450line 1446 didn't jump to line 1450 because the condition on line 1446 was always true
1447 for search_dir in check_for_uninstalled_dirs:
1448 _detect_missing_installations(path_matcher, search_dir)
1450 for dctrl_bin in self.all_packages:
1451 package = dctrl_bin.name
1452 binary_install_rule_context = install_rule_context[package]
1453 build_system_pkg_staging_dir = os.path.join("debian", package)
1454 fs_root = binary_install_rule_context.fs_root
1456 context = self.package_transformations[package]
1457 if dctrl_bin.should_be_acted_on and enable_manifest_installation_feature:
1458 for special_install_rule in context.install_rules: 1458 ↛ 1459line 1458 didn't jump to line 1459 because the loop on line 1458 never started
1459 special_install_rule.perform_install(
1460 path_matcher,
1461 install_rule_context,
1462 source_condition_context,
1463 )
1465 if dctrl_bin.should_be_acted_on:
1466 self.apply_fs_transformations(package, fs_root)
1467 substvars_file = f"debian/{package}.substvars"
1468 substvars = FlushableSubstvars.load_from_path(
1469 substvars_file, missing_ok=True
1470 )
1471 # We do not want to touch the substvars file (non-clean rebuild contamination)
1472 substvars.substvars_path = None
1473 else:
1474 substvars = FlushableSubstvars()
1476 udeb_package = self._binary_packages.get(f"{package}-udeb")
1477 if udeb_package and not udeb_package.is_udeb: 1477 ↛ 1478line 1477 didn't jump to line 1478 because the condition on line 1477 was never true
1478 udeb_package = None
1480 package_metadata_context = PackageProcessingContextProvider(
1481 self,
1482 dctrl_bin,
1483 udeb_package,
1484 package_data_table,
1485 # FIXME: source_package
1486 )
1488 ctrl_creator = BinaryCtrlAccessorProviderCreator(
1489 package_metadata_context,
1490 substvars,
1491 context.maintscript_snippets,
1492 context.substitution,
1493 )
1495 if not enable_manifest_installation_feature: 1495 ↛ 1496line 1495 didn't jump to line 1496 because the condition on line 1495 was never true
1496 assert_no_dbgsym_migration(dctrl_bin)
1497 dh_dbgsym_root_fs = dhe_dbgsym_root_dir(dctrl_bin)
1498 dh_dbgsym_root_path = FSROOverlay.create_root_dir(
1499 "",
1500 dh_dbgsym_root_fs,
1501 )
1502 dbgsym_root_fs = FSRootDir()
1503 _install_everything_from_source_dir_if_present(
1504 dctrl_bin,
1505 self.substitution,
1506 path_matcher,
1507 install_rule_context,
1508 source_condition_context,
1509 dh_dbgsym_root_path,
1510 into_dir=dbgsym_root_fs,
1511 )
1512 dbgsym_build_ids = read_dbgsym_file(dctrl_bin)
1513 dbgsym_info = DbgsymInfo(
1514 dctrl_bin,
1515 dbgsym_root_fs,
1516 os.path.join(dh_dbgsym_root_fs, "DEBIAN"),
1517 dbgsym_build_ids,
1518 )
1519 else:
1520 dbgsym_info = DbgsymInfo(
1521 dctrl_bin,
1522 FSRootDir(),
1523 None,
1524 [],
1525 )
1527 package_data_dict[package] = BinaryPackageData(
1528 self._source_package,
1529 dctrl_bin,
1530 build_system_pkg_staging_dir,
1531 fs_root,
1532 substvars,
1533 package_metadata_context,
1534 ctrl_creator,
1535 dbgsym_info,
1536 )
1538 if enable_manifest_installation_feature: 1538 ↛ 1541line 1538 didn't jump to line 1541 because the condition on line 1538 was always true
1539 _list_automatic_discard_rules(path_matcher)
1541 return package_data_table
1543 def condition_context(
1544 self, binary_package: Optional[Union[BinaryPackage, str]]
1545 ) -> ConditionContext:
1546 if binary_package is None: 1546 ↛ 1547line 1546 didn't jump to line 1547 because the condition on line 1546 was never true
1547 return self._source_condition_context
1548 if not isinstance(binary_package, str): 1548 ↛ 1549line 1548 didn't jump to line 1549 because the condition on line 1548 was never true
1549 binary_package = binary_package.name
1551 package_transformation = self.package_transformations[binary_package]
1552 return self._source_condition_context.replace(
1553 binary_package=package_transformation.binary_package,
1554 substitution=package_transformation.substitution,
1555 )
1557 def apply_fs_transformations(
1558 self,
1559 package: str,
1560 fs_root: FSPath,
1561 ) -> None:
1562 if package in self._used_for: 1562 ↛ 1563line 1562 didn't jump to line 1563 because the condition on line 1562 was never true
1563 raise ValueError(
1564 f"data.tar contents for {package} has already been finalized!?"
1565 )
1566 if package not in self.package_transformations: 1566 ↛ 1567line 1566 didn't jump to line 1567 because the condition on line 1566 was never true
1567 raise ValueError(
1568 f'The package "{package}" was not relevant for the manifest!?'
1569 )
1570 package_transformation = self.package_transformations[package]
1571 condition_context = ConditionContext(
1572 binary_package=package_transformation.binary_package,
1573 substitution=package_transformation.substitution,
1574 deb_options_and_profiles=self._build_env,
1575 dpkg_architecture_variables=self._dpkg_architecture_variables,
1576 dpkg_arch_query_table=self._dpkg_arch_query_table,
1577 )
1578 norm_rules = list(
1579 builtin_mode_normalization_rules(
1580 self._dpkg_architecture_variables,
1581 package_transformation.binary_package,
1582 package_transformation.substitution,
1583 )
1584 )
1585 norm_mode_transformation_rule = ModeNormalizationTransformationRule(norm_rules)
1586 norm_mode_transformation_rule.transform_file_system(fs_root, condition_context)
1587 for transformation in package_transformation.transformations:
1588 transformation.run_transform_file_system(fs_root, condition_context)
1589 interpreter_normalization = NormalizeShebangLineTransformation()
1590 interpreter_normalization.transform_file_system(fs_root, condition_context)
1592 def finalize_data_tar_contents(
1593 self,
1594 package: str,
1595 fs_root: FSPath,
1596 clamp_mtime_to: int,
1597 ) -> IntermediateManifest:
1598 if package in self._used_for: 1598 ↛ 1599line 1598 didn't jump to line 1599 because the condition on line 1598 was never true
1599 raise ValueError(
1600 f"data.tar contents for {package} has already been finalized!?"
1601 )
1602 if package not in self.package_transformations: 1602 ↛ 1603line 1602 didn't jump to line 1603 because the condition on line 1602 was never true
1603 raise ValueError(
1604 f'The package "{package}" was not relevant for the manifest!?'
1605 )
1606 self._used_for.add(package)
1608 # At this point, there so be no further mutations to the file system (because the will not
1609 # be present in the intermediate manifest)
1610 cast("FSRootDir", fs_root).is_read_write = False
1612 intermediate_manifest = list(
1613 _generate_intermediate_manifest(
1614 fs_root,
1615 clamp_mtime_to,
1616 )
1617 )
1618 return intermediate_manifest
1620 def apply_to_binary_staging_directory(
1621 self,
1622 package: str,
1623 fs_root: FSPath,
1624 clamp_mtime_to: int,
1625 ) -> IntermediateManifest:
1626 self.apply_fs_transformations(package, fs_root)
1627 return self.finalize_data_tar_contents(package, fs_root, clamp_mtime_to)
1630@dataclasses.dataclass(slots=True)
1631class SearchDirOrderState:
1632 search_dir: VirtualPath
1633 applies_to: Union[Set[BinaryPackage], FrozenSet[BinaryPackage]] = dataclasses.field(
1634 default_factory=set
1635 )
1636 after: Set[str] = dataclasses.field(default_factory=set)
1639def _present_installation_dirs(
1640 search_dirs: Sequence[SearchDir],
1641 checked_missing_dirs: Sequence[VirtualPath],
1642 all_pkgs: FrozenSet[BinaryPackage],
1643) -> None:
1644 _info("The following directories are considered search dirs (in order):")
1645 max_len = max((len(s.search_dir.fs_path) for s in search_dirs), default=1)
1646 for search_dir in search_dirs:
1647 applies_to = ""
1648 if search_dir.applies_to < all_pkgs:
1649 names = ", ".join(p.name for p in search_dir.applies_to)
1650 applies_to = f" [only applicable to: {names}]"
1651 remark = ""
1652 if not os.path.isdir(search_dir.search_dir.fs_path):
1653 remark = " (skipped; absent)"
1654 _info(f" * {search_dir.search_dir.fs_path:{max_len}}{applies_to}{remark}")
1656 if checked_missing_dirs:
1657 _info('The following directories are considered for "not-installed" paths;')
1658 for d in checked_missing_dirs:
1659 remark = ""
1660 if not os.path.isdir(d.fs_path):
1661 remark = " (skipped; absent)"
1662 _info(f" * {d.fs_path:{max_len}}{remark}")
1665def _determine_search_dir_order(
1666 requested: Mapping[BinaryPackage, List[VirtualPath]],
1667 all_pkgs: FrozenSet[BinaryPackage],
1668 default_search_dirs: List[VirtualPath],
1669 source_root: VirtualPath,
1670) -> Sequence[SearchDir]:
1671 search_dir_table = {}
1672 assert requested.keys() <= all_pkgs
1673 for pkg in all_pkgs:
1674 paths = requested.get(pkg, default_search_dirs)
1675 previous_search_dir: Optional[SearchDirOrderState] = None
1676 for path in paths:
1677 try:
1678 search_dir_state = search_dir_table[path.fs_path]
1679 except KeyError:
1680 search_dir_state = SearchDirOrderState(path)
1681 search_dir_table[path.fs_path] = search_dir_state
1682 search_dir_state.applies_to.add(pkg)
1683 if previous_search_dir is not None:
1684 search_dir_state.after.add(previous_search_dir.search_dir.fs_path)
1685 previous_search_dir = search_dir_state
1687 search_dirs_in_order = []
1688 released = set()
1689 remaining = set()
1690 for search_dir_state in search_dir_table.values():
1691 if not (search_dir_state.after <= released):
1692 remaining.add(search_dir_state.search_dir.fs_path)
1693 continue
1694 search_dirs_in_order.append(search_dir_state)
1695 released.add(search_dir_state.search_dir.fs_path)
1697 while remaining:
1698 current_released = len(released)
1699 for fs_path in remaining:
1700 search_dir_state = search_dir_table[fs_path]
1701 if not search_dir_state.after.issubset(released):
1702 remaining.add(search_dir_state.search_dir.fs_path)
1703 continue
1704 search_dirs_in_order.append(search_dir_state)
1705 released.add(search_dir_state.search_dir.fs_path)
1707 if current_released == len(released):
1708 names = ", ".join(remaining)
1709 _error(
1710 f"There is a circular dependency (somewhere) between the search dirs: {names}."
1711 " Note that the search directories across all packages have to be ordered (and the"
1712 " source root should generally be last)"
1713 )
1714 remaining -= released
1716 search_dirs_in_order.append(
1717 SearchDirOrderState(
1718 source_root,
1719 all_pkgs,
1720 )
1721 )
1723 return tuple(
1724 # Avoid duplicating all_pkgs
1725 SearchDir(
1726 s.search_dir,
1727 frozenset(s.applies_to) if s.applies_to != all_pkgs else all_pkgs,
1728 )
1729 for s in search_dirs_in_order
1730 )