Coverage for src/debputy/commands/debputy_cmd/context.py: 38%
347 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import argparse
2import dataclasses
3import errno
4import logging
5import os
6from typing import (
7 Optional,
8 Tuple,
9 Mapping,
10 FrozenSet,
11 Set,
12 Union,
13 Sequence,
14 Iterable,
15 Callable,
16 Dict,
17 TYPE_CHECKING,
18 Literal,
19)
21from debian.debian_support import DpkgArchTable
23from debputy._deb_options_profiles import DebBuildOptionsAndProfiles
24from debputy.architecture_support import (
25 DpkgArchitectureBuildProcessValuesTable,
26 dpkg_architecture_table,
27)
28from debputy.dh.dh_assistant import read_dh_addon_sequences
29from debputy.exceptions import DebputyRuntimeError
30from debputy.filesystem_scan import FSROOverlay
31from debputy.highlevel_manifest import HighLevelManifest
32from debputy.highlevel_manifest_parser import YAMLManifestParser
33from debputy.integration_detection import determine_debputy_integration_mode
34from debputy.packages import (
35 SourcePackage,
36 BinaryPackage,
37 DctrlParser,
38)
39from debputy.plugin.api import VirtualPath
40from debputy.plugin.api.impl import load_plugin_features
41from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
42from debputy.plugin.api.spec import DebputyIntegrationMode
43from debputy.substitution import (
44 Substitution,
45 VariableContext,
46 SubstitutionImpl,
47 NULL_SUBSTITUTION,
48)
49from debputy.util import (
50 _error,
51 PKGNAME_REGEX,
52 resolve_source_date_epoch,
53 setup_logging,
54 PRINT_COMMAND,
55 change_log_level,
56 _warn,
57 TRACE_LOG,
58)
60if TYPE_CHECKING:
61 from argparse import _SubParsersAction
64CommandHandler = Callable[["CommandContext"], None]
65ArgparserConfigurator = Callable[[argparse.ArgumentParser], None]
68def add_arg(
69 *name_or_flags: str,
70 **kwargs,
71) -> Callable[[argparse.ArgumentParser], None]:
72 def _configurator(argparser: argparse.ArgumentParser) -> None:
73 argparser.add_argument(
74 *name_or_flags,
75 **kwargs,
76 )
78 return _configurator
81@dataclasses.dataclass(slots=True, frozen=True)
82class CommandArg:
83 parsed_args: argparse.Namespace
84 plugin_search_dirs: Sequence[str]
87@dataclasses.dataclass
88class Command:
89 handler: Callable[["CommandContext"], None]
90 require_substitution: bool = True
91 requested_plugins_only: bool = False
94def _host_dpo_to_dbo(opt_and_profiles: "DebBuildOptionsAndProfiles", v: str) -> bool:
96 if (
97 v in opt_and_profiles.deb_build_profiles
98 and v not in opt_and_profiles.deb_build_options
99 ):
100 val = os.environ.get("DEB_BUILD_OPTIONS", "") + " " + v
101 _warn(
102 f'Copying "{v}" into DEB_BUILD_OPTIONS: It was in DEB_BUILD_PROFILES but not in DEB_BUILD_OPTIONS'
103 )
104 os.environ["DEB_BUILD_OPTIONS"] = val.lstrip()
105 # Note: It will not be immediately visible since `DebBuildOptionsAndProfiles` caches the result
106 return True
107 return False
110class CommandContext:
111 def __init__(
112 self,
113 parsed_args: argparse.Namespace,
114 plugin_search_dirs: Sequence[str],
115 require_substitution: bool = True,
116 requested_plugins_only: bool = False,
117 ) -> None:
118 self.parsed_args = parsed_args
119 self.plugin_search_dirs = plugin_search_dirs
120 self._require_substitution = require_substitution
121 self._requested_plugins_only = requested_plugins_only
122 self._debputy_plugin_feature_set: PluginProvidedFeatureSet = (
123 PluginProvidedFeatureSet()
124 )
125 self._debian_dir = FSROOverlay.create_root_dir("debian", "debian")
126 self._mtime: Optional[int] = None
127 self._source_variables: Optional[Mapping[str, str]] = None
128 self._substitution: Optional[Substitution] = None
129 self._requested_plugins: Optional[Sequence[str]] = None
130 self._plugins_loaded = False
131 self._dctrl_parser: Optional[DctrlParser] = None
132 self.debputy_integration_mode: Optional[DebputyIntegrationMode] = None
133 self._dctrl_data: Optional[
134 Tuple[
135 "SourcePackage",
136 Mapping[str, "BinaryPackage"],
137 ]
138 ] = None
139 self._package_set: Literal["both", "arch", "indep"] = "both"
141 @property
142 def package_set(self) -> Literal["both", "arch", "indep"]:
143 return self._package_set
145 @package_set.setter
146 def package_set(self, new_value: Literal["both", "arch", "indep"]) -> None:
147 if self._dctrl_parser is not None:
148 raise TypeError(
149 "package_set cannot be redefined once the debian/control parser has been initialized"
150 )
151 self._package_set = new_value
153 @property
154 def debian_dir(self) -> VirtualPath:
155 return self._debian_dir
157 @property
158 def mtime(self) -> int:
159 if self._mtime is None:
160 self._mtime = resolve_source_date_epoch(
161 None,
162 substitution=self.substitution,
163 )
164 return self._mtime
166 @property
167 def dctrl_parser(self) -> DctrlParser:
168 parser = self._dctrl_parser
169 if parser is None:
170 packages: Union[Set[str], FrozenSet[str]] = frozenset()
171 if hasattr(self.parsed_args, "packages"):
172 packages = self.parsed_args.packages
174 instance = DebBuildOptionsAndProfiles(environ=os.environ)
176 dirty = _host_dpo_to_dbo(instance, "nodoc")
177 dirty = _host_dpo_to_dbo(instance, "nocheck") or dirty
179 if dirty:
180 instance = DebBuildOptionsAndProfiles(environ=os.environ)
182 parser = DctrlParser(
183 packages, # -p/--package
184 set(), # -N/--no-package
185 # binary-indep and binary-indep (dpkg BuildDriver integration only)
186 self._package_set == "indep",
187 self._package_set == "arch",
188 deb_options_and_profiles=instance,
189 dpkg_architecture_variables=dpkg_architecture_table(),
190 dpkg_arch_query_table=DpkgArchTable.load_arch_table(),
191 )
192 self._dctrl_parser = parser
193 return parser
195 def source_package(self) -> SourcePackage:
196 source, _ = self._parse_dctrl()
197 return source
199 def binary_packages(self) -> Mapping[str, "BinaryPackage"]:
200 _, binary_package_table = self._parse_dctrl()
201 return binary_package_table
203 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
204 return self.dctrl_parser.dpkg_architecture_variables
206 def requested_plugins(self) -> Sequence[str]:
207 if self._requested_plugins is None:
208 self._requested_plugins = self._resolve_requested_plugins()
209 return self._requested_plugins
211 def required_plugins(self) -> Set[str]:
212 return set(getattr(self.parsed_args, "required_plugins") or [])
214 @property
215 def deb_build_options_and_profiles(self) -> "DebBuildOptionsAndProfiles":
216 return self.dctrl_parser.deb_options_and_profiles
218 @property
219 def deb_build_options(self) -> Mapping[str, Optional[str]]:
220 return self.deb_build_options_and_profiles.deb_build_options
222 def _create_substitution(
223 self,
224 parsed_args: argparse.Namespace,
225 plugin_feature_set: PluginProvidedFeatureSet,
226 debian_dir: VirtualPath,
227 ) -> Substitution:
228 requested_subst = self._require_substitution
229 if hasattr(parsed_args, "substitution"):
230 requested_subst = parsed_args.substitution
231 if requested_subst is False and self._require_substitution:
232 _error(f"--no-substitution cannot be used with {parsed_args.command}")
233 if self._require_substitution or requested_subst is not False:
234 variable_context = VariableContext(debian_dir)
235 return SubstitutionImpl(
236 plugin_feature_set=plugin_feature_set,
237 unresolvable_substitutions=frozenset(["PACKAGE"]),
238 variable_context=variable_context,
239 )
240 return NULL_SUBSTITUTION
242 def load_plugins(self) -> PluginProvidedFeatureSet:
243 if not self._plugins_loaded:
244 requested_plugins = None
245 required_plugins = self.required_plugins()
246 if self._requested_plugins_only:
247 requested_plugins = self.requested_plugins()
248 debug_mode = getattr(self.parsed_args, "debug_mode", False)
249 load_plugin_features(
250 self.plugin_search_dirs,
251 self.substitution,
252 requested_plugins_only=requested_plugins,
253 required_plugins=required_plugins,
254 plugin_feature_set=self._debputy_plugin_feature_set,
255 debug_mode=debug_mode,
256 )
257 self._plugins_loaded = True
258 return self._debputy_plugin_feature_set
260 @staticmethod
261 def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]:
262 package_prefix = "debputy-plugin-"
263 for dep_clause in (d.strip() for d in dep_field.split(",")):
264 dep = dep_clause.split("|")[0].strip()
265 if not dep.startswith(package_prefix):
266 continue
267 m = PKGNAME_REGEX.search(dep)
268 assert m
269 package_name = m.group(0)
270 plugin_name = package_name[len(package_prefix) :]
271 yield plugin_name
273 def _resolve_requested_plugins(self) -> Sequence[str]:
274 source_package, _ = self._parse_dctrl()
275 bd = source_package.fields.get("Build-Depends", "")
276 plugins = list(self._plugin_from_dependency_field(bd))
277 for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"):
278 f = source_package.fields.get(field_name)
279 if not f:
280 continue
281 for plugin in self._plugin_from_dependency_field(f):
282 raise DebputyRuntimeError(
283 f"Cannot load plugins via {field_name}:"
284 f" Please move debputy-plugin-{plugin} dependency to Build-Depends."
285 )
287 return plugins
289 @property
290 def substitution(self) -> Substitution:
291 if self._substitution is None:
292 self._substitution = self._create_substitution(
293 self.parsed_args,
294 self._debputy_plugin_feature_set,
295 self.debian_dir,
296 )
297 return self._substitution
299 def must_be_called_in_source_root(self) -> None:
300 if self.debian_dir.get("control") is None:
301 _error(
302 "This subcommand must be run from a source package root; expecting debian/control to exist."
303 )
305 def _parse_dctrl(
306 self,
307 ) -> Tuple[
308 "SourcePackage",
309 Mapping[str, "BinaryPackage"],
310 ]:
311 if self._dctrl_data is None:
312 try:
313 debian_control = self.debian_dir.get("control")
314 if debian_control is None:
315 raise FileNotFoundError(
316 errno.ENOENT,
317 os.strerror(errno.ENOENT),
318 os.path.join(self.debian_dir.fs_path, "control"),
319 )
320 with debian_control.open() as fd:
321 _, source_package, binary_packages = (
322 self.dctrl_parser.parse_source_debian_control(
323 fd,
324 )
325 )
326 except FileNotFoundError:
327 # We are not using `must_be_called_in_source_root`, because we (in this case) require
328 # the file to be readable (that is, parse_source_debian_control can also raise a
329 # FileNotFoundError when trying to open the file).
330 _error(
331 "This subcommand must be run from a source package root; expecting debian/control to exist."
332 )
334 self._dctrl_data = (
335 source_package,
336 binary_packages,
337 )
339 return self._dctrl_data
341 @property
342 def has_dctrl_file(self) -> bool:
343 debian_control = self.debian_dir.get("control")
344 return debian_control is not None
346 def resolve_integration_mode(
347 self,
348 require_integration: bool = True,
349 ) -> DebputyIntegrationMode:
350 integration_mode = self.debputy_integration_mode
351 if integration_mode is None:
352 r = read_dh_addon_sequences(self.debian_dir)
353 bd_sequences, dr_sequences, _ = r
354 all_sequences = bd_sequences | dr_sequences
355 integration_mode = determine_debputy_integration_mode(
356 self.source_package().fields,
357 all_sequences,
358 )
359 if integration_mode is None and not require_integration:
360 _error(
361 "Cannot resolve the integration mode expected for this package. Is this package using `debputy`?"
362 )
363 self.debputy_integration_mode = integration_mode
364 return integration_mode
366 def set_log_level_for_build_subcommand(self) -> Optional[int]:
367 parsed_args = self.parsed_args
368 log_level: Optional[int] = None
369 if os.environ.get("DH_VERBOSE", "") != "":
370 log_level = PRINT_COMMAND
371 if parsed_args.debug_mode:
372 log_level = logging.DEBUG
373 if log_level is not None:
374 change_log_level(log_level)
375 return log_level
377 def manifest_parser(
378 self,
379 *,
380 manifest_path: Optional[str] = None,
381 ) -> YAMLManifestParser:
382 substitution = self.substitution
383 dctrl_parser = self.dctrl_parser
385 source_package, binary_packages = self._parse_dctrl()
387 if self.parsed_args.debputy_manifest is not None:
388 manifest_path = self.parsed_args.debputy_manifest
389 if manifest_path is None:
390 manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
391 return YAMLManifestParser(
392 manifest_path,
393 source_package,
394 binary_packages,
395 substitution,
396 dctrl_parser.dpkg_architecture_variables,
397 dctrl_parser.dpkg_arch_query_table,
398 dctrl_parser.deb_options_and_profiles,
399 self.load_plugins(),
400 self.resolve_integration_mode(),
401 debian_dir=self.debian_dir,
402 )
404 def parse_manifest(
405 self,
406 *,
407 manifest_path: Optional[str] = None,
408 ) -> HighLevelManifest:
409 substitution = self.substitution
410 manifest_required = False
412 if self.parsed_args.debputy_manifest is not None:
413 manifest_path = self.parsed_args.debputy_manifest
414 manifest_required = True
415 if manifest_path is None:
416 manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
417 parser = self.manifest_parser(manifest_path=manifest_path)
419 os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute(
420 "{{SOURCE_DATE_EPOCH}}",
421 "Internal resolution",
422 )
423 if os.path.isfile(manifest_path):
424 return parser.parse_manifest()
425 if manifest_required:
426 _error(f'The path "{manifest_path}" is not a file!')
427 return parser.build_manifest()
430class CommandBase:
431 __slots__ = ()
433 def configure(self, argparser: argparse.ArgumentParser) -> None:
434 # Does nothing by default
435 pass
437 def __call__(self, command_arg: CommandArg) -> None:
438 raise NotImplementedError
441class SubcommandBase(CommandBase):
442 __slots__ = ("name", "aliases", "help_description")
444 def __init__(
445 self,
446 name: str,
447 *,
448 aliases: Sequence[str] = tuple(),
449 help_description: Optional[str] = None,
450 ) -> None:
451 self.name = name
452 self.aliases = aliases
453 self.help_description = help_description
455 def add_subcommand_to_subparser(
456 self,
457 subparser: "_SubParsersAction",
458 ) -> argparse.ArgumentParser:
459 parser = subparser.add_parser(
460 self.name,
461 aliases=self.aliases,
462 help=self.help_description,
463 allow_abbrev=False,
464 )
465 self.configure(parser)
466 return parser
469class GenericSubCommand(SubcommandBase):
470 __slots__ = (
471 "_handler",
472 "_configure_handler",
473 "_require_substitution",
474 "_requested_plugins_only",
475 "_log_only_to_stderr",
476 "_default_log_level",
477 )
479 def __init__(
480 self,
481 name: str,
482 handler: Callable[[CommandContext], None],
483 *,
484 aliases: Sequence[str] = tuple(),
485 help_description: Optional[str] = None,
486 configure_handler: Optional[Callable[[argparse.ArgumentParser], None]] = None,
487 require_substitution: bool = True,
488 requested_plugins_only: bool = False,
489 log_only_to_stderr: bool = False,
490 default_log_level: Union[int, Callable[[CommandContext], int]] = logging.INFO,
491 ) -> None:
492 super().__init__(name, aliases=aliases, help_description=help_description)
493 self._handler = handler
494 self._configure_handler = configure_handler
495 self._require_substitution = require_substitution
496 self._requested_plugins_only = requested_plugins_only
497 self._log_only_to_stderr = log_only_to_stderr
498 self._default_log_level = default_log_level
500 def configure_handler(
501 self,
502 handler: Callable[[argparse.ArgumentParser], None],
503 ) -> None:
504 if self._configure_handler is not None: 504 ↛ 505line 504 didn't jump to line 505 because the condition on line 504 was never true
505 raise TypeError("Only one argument handler can be provided")
506 self._configure_handler = handler
508 def configure(self, argparser: argparse.ArgumentParser) -> None:
509 handler = self._configure_handler
510 if handler is not None:
511 handler(argparser)
513 def __call__(self, command_arg: CommandArg) -> None:
514 context = CommandContext(
515 command_arg.parsed_args,
516 command_arg.plugin_search_dirs,
517 self._require_substitution,
518 self._requested_plugins_only,
519 )
520 if self._log_only_to_stderr:
521 setup_logging(reconfigure_logging=True, log_only_to_stderr=True)
523 default_log_level = self._default_log_level
524 if isinstance(default_log_level, int):
525 level = default_log_level
526 else:
527 assert callable(default_log_level)
528 level = default_log_level(context)
529 change_log_level(level)
530 if level > logging.DEBUG and context.parsed_args.debug_mode:
531 change_log_level(logging.DEBUG)
532 if level > TRACE_LOG and os.environ.get("DEBPUTY_DEBUG", "") == "trace":
533 change_log_level(TRACE_LOG)
534 return self._handler(context)
537class DispatchingCommandMixin(CommandBase):
538 __slots__ = ()
540 def add_subcommand(self, subcommand: SubcommandBase) -> None:
541 raise NotImplementedError
543 def add_dispatching_subcommand(
544 self,
545 name: str,
546 dest: str,
547 *,
548 aliases: Sequence[str] = tuple(),
549 help_description: Optional[str] = None,
550 metavar: str = "command",
551 default_subcommand: Optional[str] = None,
552 ) -> "DispatcherCommand":
553 ds = DispatcherCommand(
554 name,
555 dest,
556 aliases=aliases,
557 help_description=help_description,
558 metavar=metavar,
559 default_subcommand=default_subcommand,
560 )
561 self.add_subcommand(ds)
562 return ds
564 def register_subcommand(
565 self,
566 name: Union[str, Sequence[str]],
567 *,
568 help_description: Optional[str] = None,
569 argparser: Optional[
570 Union[ArgparserConfigurator, Sequence[ArgparserConfigurator]]
571 ] = None,
572 require_substitution: bool = True,
573 requested_plugins_only: bool = False,
574 log_only_to_stderr: bool = False,
575 default_log_level: Union[int, Callable[[CommandContext], int]] = logging.INFO,
576 ) -> Callable[[CommandHandler], GenericSubCommand]:
577 if isinstance(name, str):
578 cmd_name = name
579 aliases = []
580 else:
581 cmd_name = name[0]
582 aliases = name[1:]
584 if argparser is not None and not callable(argparser):
585 args = argparser
587 def _wrapper(parser: argparse.ArgumentParser) -> None:
588 for configurator in args:
589 configurator(parser)
591 argparser = _wrapper
593 def _annotation_impl(func: CommandHandler) -> GenericSubCommand:
594 subcommand = GenericSubCommand(
595 cmd_name,
596 func,
597 aliases=aliases,
598 help_description=help_description,
599 require_substitution=require_substitution,
600 requested_plugins_only=requested_plugins_only,
601 log_only_to_stderr=log_only_to_stderr,
602 default_log_level=default_log_level,
603 )
604 self.add_subcommand(subcommand)
605 if argparser is not None:
606 subcommand.configure_handler(argparser)
608 return subcommand
610 return _annotation_impl
613class DispatcherCommand(SubcommandBase, DispatchingCommandMixin):
614 __slots__ = (
615 "_subcommands",
616 "_aliases",
617 "_dest",
618 "_metavar",
619 "_required",
620 "_default_subcommand",
621 "_argparser",
622 )
624 def __init__(
625 self,
626 name: str,
627 dest: str,
628 *,
629 aliases: Sequence[str] = tuple(),
630 help_description: Optional[str] = None,
631 metavar: str = "command",
632 default_subcommand: Optional[str] = None,
633 ) -> None:
634 super().__init__(name, aliases=aliases, help_description=help_description)
635 self._aliases: Dict[str, SubcommandBase] = {}
636 self._subcommands: Dict[str, SubcommandBase] = {}
637 self._dest = dest
638 self._metavar = metavar
639 self._default_subcommand = default_subcommand
640 self._argparser: Optional[argparse.ArgumentParser] = None
642 def add_subcommand(self, subcommand: SubcommandBase) -> None:
643 all_names = [subcommand.name]
644 if subcommand.aliases:
645 all_names.extend(subcommand.aliases)
646 aliases = self._aliases
647 for n in all_names:
648 if n in aliases: 648 ↛ 649line 648 didn't jump to line 649 because the condition on line 648 was never true
649 raise ValueError(
650 f"Internal error: Multiple handlers for {n} on topic {self.name}"
651 )
653 aliases[n] = subcommand
654 self._subcommands[subcommand.name] = subcommand
656 def configure(self, argparser: argparse.ArgumentParser) -> None:
657 if self._argparser is not None:
658 raise TypeError("Cannot configure twice!")
659 self._argparser = argparser
660 subcommands = self._subcommands
661 if not subcommands:
662 raise ValueError(
663 f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)"
664 )
665 default_subcommand = self._default_subcommand
666 required = default_subcommand is None
667 if (
668 default_subcommand is not None
669 and default_subcommand not in ("--help", "-h")
670 and default_subcommand not in subcommands
671 ):
672 raise ValueError(
673 f"Internal error: Subcommand {self.name} should have {default_subcommand} as default,"
674 " but it was not registered?"
675 )
676 subparser = argparser.add_subparsers(
677 dest=self._dest,
678 required=required,
679 metavar=self._metavar,
680 )
681 for subcommand in subcommands.values():
682 subcommand.add_subcommand_to_subparser(subparser)
684 def has_command(self, command: str) -> bool:
685 return command in self._aliases
687 def __call__(self, command_arg: CommandArg) -> None:
688 argparser = self._argparser
689 assert argparser is not None
690 v = getattr(command_arg.parsed_args, self._dest, None)
691 if v is None:
692 v = self._default_subcommand
693 if v in ("--help", "-h"):
694 argparser.parse_args([v])
695 _error("Missing command", prog=argparser.prog)
697 assert (
698 v is not None
699 ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?"
700 assert (
701 v in self._aliases
702 ), f"Internal error: {v} was accepted as a topic, but it was not registered?"
703 self._aliases[v](command_arg)
706ROOT_COMMAND = DispatcherCommand(
707 "root",
708 dest="command",
709 metavar="COMMAND",
710)