Coverage for src/debputy/commands/debputy_cmd/context.py: 36%
344 statements
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-16 17:20 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2026-01-16 17:20 +0000
1import argparse
2import dataclasses
3import errno
4import logging
5import os
6from typing import (
7 Optional,
8 Tuple,
9 FrozenSet,
10 Set,
11 Union,
12 Dict,
13 TYPE_CHECKING,
14 Literal,
15)
16from collections.abc import Mapping, Sequence, Iterable, Callable
18from debian.debian_support import DpkgArchTable
20from debputy._deb_options_profiles import DebBuildOptionsAndProfiles
21from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable
22from debputy.dh.dh_assistant import read_dh_addon_sequences
23from debputy.exceptions import DebputyRuntimeError
24from debputy.filesystem_scan import OSFSROOverlay
25from debputy.highlevel_manifest import HighLevelManifest
26from debputy.highlevel_manifest_parser import YAMLManifestParser
27from debputy.integration_detection import determine_debputy_integration_mode
28from debputy.packages import (
29 SourcePackage,
30 BinaryPackage,
31 DctrlParser,
32)
33from debputy.plugin.api import VirtualPath
34from debputy.plugin.api.impl import load_plugin_features
35from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
36from debputy.plugin.api.spec import DebputyIntegrationMode
37from debputy.substitution import (
38 Substitution,
39 VariableContext,
40 SubstitutionImpl,
41 NULL_SUBSTITUTION,
42)
43from debputy.util import (
44 _error,
45 PKGNAME_REGEX,
46 resolve_source_date_epoch,
47 setup_logging,
48 PRINT_COMMAND,
49 change_log_level,
50 _warn,
51 TRACE_LOG,
52)
54if TYPE_CHECKING:
55 from argparse import _SubParsersAction
58CommandHandler = Callable[["CommandContext"], None]
59ArgparserConfigurator = Callable[[argparse.ArgumentParser], None]
62def add_arg(
63 *name_or_flags: str,
64 **kwargs,
65) -> Callable[[argparse.ArgumentParser], None]:
66 def _configurator(argparser: argparse.ArgumentParser) -> None:
67 argparser.add_argument(
68 *name_or_flags,
69 **kwargs,
70 )
72 return _configurator
75@dataclasses.dataclass(slots=True, frozen=True)
76class CommandArg:
77 parsed_args: argparse.Namespace
78 plugin_search_dirs: Sequence[str]
81@dataclasses.dataclass
82class Command:
83 handler: Callable[["CommandContext"], None]
84 require_substitution: bool = True
85 requested_plugins_only: bool = False
88def _host_dpo_to_dbo(opt_and_profiles: DebBuildOptionsAndProfiles, v: str) -> None:
90 if (
91 v in opt_and_profiles.deb_build_profiles
92 and v not in opt_and_profiles.deb_build_options
93 ):
94 val = os.environ.get("DEB_BUILD_OPTIONS", "") + " " + v
95 _warn(
96 f'Copying "{v}" into DEB_BUILD_OPTIONS: It was in DEB_BUILD_PROFILES but not in DEB_BUILD_OPTIONS'
97 )
98 os.environ["DEB_BUILD_OPTIONS"] = val.lstrip()
101class CommandContext:
102 def __init__(
103 self,
104 parsed_args: argparse.Namespace,
105 plugin_search_dirs: Sequence[str],
106 require_substitution: bool = True,
107 requested_plugins_only: bool = False,
108 ) -> None:
109 self.parsed_args = parsed_args
110 self.plugin_search_dirs = plugin_search_dirs
111 self._require_substitution = require_substitution
112 self._requested_plugins_only = requested_plugins_only
113 self._debputy_plugin_feature_set: PluginProvidedFeatureSet = (
114 PluginProvidedFeatureSet()
115 )
116 self._debian_dir = OSFSROOverlay.create_root_dir("debian", "debian")
117 self._mtime: int | None = None
118 self._source_variables: Mapping[str, str] | None = None
119 self._substitution: Substitution | None = None
120 self._requested_plugins: Sequence[str] | None = None
121 self._plugins_loaded = False
122 self._dctrl_parser: DctrlParser | None = None
123 self.debputy_integration_mode: DebputyIntegrationMode | None = None
124 self._dctrl_data: None | (
125 tuple[
126 "SourcePackage",
127 Mapping[str, "BinaryPackage"],
128 ]
129 ) = None
130 self._package_set: Literal["both", "arch", "indep"] = "both"
132 @property
133 def package_set(self) -> Literal["both", "arch", "indep"]:
134 return self._package_set
136 @package_set.setter
137 def package_set(self, new_value: Literal["both", "arch", "indep"]) -> None:
138 if self._dctrl_parser is not None:
139 raise TypeError(
140 "package_set cannot be redefined once the debian/control parser has been initialized"
141 )
142 self._package_set = new_value
144 @property
145 def debian_dir(self) -> VirtualPath:
146 return self._debian_dir
148 @property
149 def mtime(self) -> int:
150 if self._mtime is None:
151 self._mtime = resolve_source_date_epoch(
152 None,
153 substitution=self.substitution,
154 )
155 return self._mtime
157 @property
158 def dctrl_parser(self) -> DctrlParser:
159 parser = self._dctrl_parser
160 if parser is None:
161 packages: set[str] | frozenset[str] = frozenset()
162 if hasattr(self.parsed_args, "packages"):
163 packages = self.parsed_args.packages
165 instance = DebBuildOptionsAndProfiles()
167 _host_dpo_to_dbo(instance, "nodoc")
168 _host_dpo_to_dbo(instance, "nocheck")
170 parser = DctrlParser(
171 packages, # -p/--package
172 set(), # -N/--no-package
173 # binary-indep and binary-indep (dpkg BuildDriver integration only)
174 self._package_set == "indep",
175 self._package_set == "arch",
176 deb_options_and_profiles=instance,
177 dpkg_architecture_variables=DpkgArchitectureBuildProcessValuesTable(),
178 dpkg_arch_query_table=DpkgArchTable.load_arch_table(),
179 )
180 self._dctrl_parser = parser
181 return parser
183 def source_package(self) -> SourcePackage:
184 source, _ = self._parse_dctrl()
185 return source
187 def binary_packages(self) -> Mapping[str, "BinaryPackage"]:
188 _, binary_package_table = self._parse_dctrl()
189 return binary_package_table
191 def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
192 return self.dctrl_parser.dpkg_architecture_variables
194 def requested_plugins(self) -> Sequence[str]:
195 if self._requested_plugins is None:
196 self._requested_plugins = self._resolve_requested_plugins()
197 return self._requested_plugins
199 def required_plugins(self) -> set[str]:
200 return set(getattr(self.parsed_args, "required_plugins") or [])
202 @property
203 def deb_build_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
204 return self.dctrl_parser.deb_options_and_profiles
206 @property
207 def deb_build_options(self) -> Mapping[str, str | None]:
208 return self.deb_build_options_and_profiles.deb_build_options
210 def _create_substitution(
211 self,
212 parsed_args: argparse.Namespace,
213 plugin_feature_set: PluginProvidedFeatureSet,
214 debian_dir: VirtualPath,
215 ) -> Substitution:
216 requested_subst = self._require_substitution
217 if hasattr(parsed_args, "substitution"):
218 requested_subst = parsed_args.substitution
219 if requested_subst is False and self._require_substitution:
220 _error(f"--no-substitution cannot be used with {parsed_args.command}")
221 if self._require_substitution or requested_subst is not False:
222 variable_context = VariableContext(debian_dir)
223 return SubstitutionImpl(
224 plugin_feature_set=plugin_feature_set,
225 unresolvable_substitutions=frozenset(["PACKAGE"]),
226 variable_context=variable_context,
227 )
228 return NULL_SUBSTITUTION
230 def load_plugins(self) -> PluginProvidedFeatureSet:
231 if not self._plugins_loaded:
232 requested_plugins = None
233 required_plugins = self.required_plugins()
234 if self._requested_plugins_only:
235 requested_plugins = self.requested_plugins()
236 debug_mode = getattr(self.parsed_args, "debug_mode", False)
237 load_plugin_features(
238 self.plugin_search_dirs,
239 self.substitution,
240 requested_plugins_only=requested_plugins,
241 required_plugins=required_plugins,
242 plugin_feature_set=self._debputy_plugin_feature_set,
243 debug_mode=debug_mode,
244 )
245 self._plugins_loaded = True
246 return self._debputy_plugin_feature_set
248 @staticmethod
249 def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]:
250 package_prefix = "debputy-plugin-"
251 for dep_clause in (d.strip() for d in dep_field.split(",")):
252 dep = dep_clause.split("|")[0].strip()
253 if not dep.startswith(package_prefix):
254 continue
255 m = PKGNAME_REGEX.search(dep)
256 assert m
257 package_name = m.group(0)
258 plugin_name = package_name[len(package_prefix) :]
259 yield plugin_name
261 def _resolve_requested_plugins(self) -> Sequence[str]:
262 source_package, _ = self._parse_dctrl()
263 bd = source_package.fields.get("Build-Depends", "")
264 plugins = list(self._plugin_from_dependency_field(bd))
265 for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"):
266 f = source_package.fields.get(field_name)
267 if not f:
268 continue
269 for plugin in self._plugin_from_dependency_field(f):
270 raise DebputyRuntimeError(
271 f"Cannot load plugins via {field_name}:"
272 f" Please move debputy-plugin-{plugin} dependency to Build-Depends."
273 )
275 return plugins
277 @property
278 def substitution(self) -> Substitution:
279 if self._substitution is None:
280 self._substitution = self._create_substitution(
281 self.parsed_args,
282 self._debputy_plugin_feature_set,
283 self.debian_dir,
284 )
285 return self._substitution
287 def must_be_called_in_source_root(self) -> None:
288 if self.debian_dir.get("control") is None:
289 _error(
290 "This subcommand must be run from a source package root; expecting debian/control to exist."
291 )
293 def _parse_dctrl(
294 self,
295 ) -> tuple[
296 "SourcePackage",
297 Mapping[str, "BinaryPackage"],
298 ]:
299 if self._dctrl_data is None:
300 try:
301 debian_control = self.debian_dir.get("control")
302 if debian_control is None:
303 raise FileNotFoundError(
304 errno.ENOENT,
305 os.strerror(errno.ENOENT),
306 os.path.join(self.debian_dir.fs_path, "control"),
307 )
308 with debian_control.open() as fd:
309 _, source_package, binary_packages = (
310 self.dctrl_parser.parse_source_debian_control(
311 fd,
312 )
313 )
314 except FileNotFoundError:
315 # We are not using `must_be_called_in_source_root`, because we (in this case) require
316 # the file to be readable (that is, parse_source_debian_control can also raise a
317 # FileNotFoundError when trying to open the file).
318 _error(
319 "This subcommand must be run from a source package root; expecting debian/control to exist."
320 )
322 self._dctrl_data = (
323 source_package,
324 binary_packages,
325 )
327 return self._dctrl_data
329 @property
330 def has_dctrl_file(self) -> bool:
331 debian_control = self.debian_dir.get("control")
332 return debian_control is not None
334 def resolve_integration_mode(
335 self,
336 require_integration: bool = True,
337 ) -> DebputyIntegrationMode:
338 integration_mode = self.debputy_integration_mode
339 if integration_mode is None:
340 r = read_dh_addon_sequences(self.debian_dir)
341 bd_sequences, dr_sequences, _ = r
342 all_sequences = bd_sequences | dr_sequences
343 integration_mode = determine_debputy_integration_mode(
344 self.source_package().fields,
345 all_sequences,
346 )
347 if integration_mode is None and not require_integration:
348 _error(
349 "Cannot resolve the integration mode expected for this package. Is this package using `debputy`?"
350 )
351 self.debputy_integration_mode = integration_mode
352 return integration_mode
354 def set_log_level_for_build_subcommand(self) -> int | None:
355 parsed_args = self.parsed_args
356 log_level: int | None = None
357 if os.environ.get("DH_VERBOSE", "") != "":
358 log_level = PRINT_COMMAND
359 if parsed_args.debug_mode:
360 log_level = logging.DEBUG
361 if log_level is not None:
362 change_log_level(log_level)
363 return log_level
365 def manifest_parser(
366 self,
367 *,
368 manifest_path: str | None = None,
369 ) -> YAMLManifestParser:
370 substitution = self.substitution
371 dctrl_parser = self.dctrl_parser
373 source_package, binary_packages = self._parse_dctrl()
375 if self.parsed_args.debputy_manifest is not None:
376 manifest_path = self.parsed_args.debputy_manifest
377 if manifest_path is None:
378 manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
379 return YAMLManifestParser(
380 manifest_path,
381 source_package,
382 binary_packages,
383 substitution,
384 dctrl_parser.dpkg_architecture_variables,
385 dctrl_parser.dpkg_arch_query_table,
386 dctrl_parser.deb_options_and_profiles,
387 self.load_plugins(),
388 self.resolve_integration_mode(),
389 debian_dir=self.debian_dir,
390 )
392 def parse_manifest(
393 self,
394 *,
395 manifest_path: str | None = None,
396 ) -> HighLevelManifest:
397 substitution = self.substitution
398 manifest_required = False
400 if self.parsed_args.debputy_manifest is not None:
401 manifest_path = self.parsed_args.debputy_manifest
402 manifest_required = True
403 if manifest_path is None:
404 manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
405 parser = self.manifest_parser(manifest_path=manifest_path)
407 os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute(
408 "{{SOURCE_DATE_EPOCH}}",
409 "Internal resolution",
410 )
411 if os.path.isfile(manifest_path):
412 return parser.parse_manifest()
413 if manifest_required:
414 _error(f'The path "{manifest_path}" is not a file!')
415 return parser.build_manifest()
418class CommandBase:
419 __slots__ = ()
421 def configure(self, argparser: argparse.ArgumentParser) -> None:
422 # Does nothing by default
423 pass
425 def __call__(self, command_arg: CommandArg) -> None:
426 raise NotImplementedError
429class SubcommandBase(CommandBase):
430 __slots__ = ("name", "aliases", "help_description")
432 def __init__(
433 self,
434 name: str,
435 *,
436 aliases: Sequence[str] = tuple(),
437 help_description: str | None = None,
438 ) -> None:
439 self.name = name
440 self.aliases = aliases
441 self.help_description = help_description
443 def add_subcommand_to_subparser(
444 self,
445 subparser: "_SubParsersAction",
446 ) -> argparse.ArgumentParser:
447 parser = subparser.add_parser(
448 self.name,
449 aliases=self.aliases,
450 help=self.help_description,
451 allow_abbrev=False,
452 )
453 self.configure(parser)
454 return parser
457class GenericSubCommand(SubcommandBase):
458 __slots__ = (
459 "_handler",
460 "_configure_handler",
461 "_require_substitution",
462 "_requested_plugins_only",
463 "_log_only_to_stderr",
464 "_default_log_level",
465 )
467 def __init__(
468 self,
469 name: str,
470 handler: Callable[[CommandContext], None],
471 *,
472 aliases: Sequence[str] = tuple(),
473 help_description: str | None = None,
474 configure_handler: Callable[[argparse.ArgumentParser], None] | None = None,
475 require_substitution: bool = True,
476 requested_plugins_only: bool = False,
477 log_only_to_stderr: bool = False,
478 default_log_level: int | Callable[[CommandContext], int] = logging.INFO,
479 ) -> None:
480 super().__init__(name, aliases=aliases, help_description=help_description)
481 self._handler = handler
482 self._configure_handler = configure_handler
483 self._require_substitution = require_substitution
484 self._requested_plugins_only = requested_plugins_only
485 self._log_only_to_stderr = log_only_to_stderr
486 self._default_log_level = default_log_level
488 def configure_handler(
489 self,
490 handler: Callable[[argparse.ArgumentParser], None],
491 ) -> None:
492 if self._configure_handler is not None: 492 ↛ 493line 492 didn't jump to line 493 because the condition on line 492 was never true
493 raise TypeError("Only one argument handler can be provided")
494 self._configure_handler = handler
496 def configure(self, argparser: argparse.ArgumentParser) -> None:
497 handler = self._configure_handler
498 if handler is not None:
499 handler(argparser)
501 def __call__(self, command_arg: CommandArg) -> None:
502 context = CommandContext(
503 command_arg.parsed_args,
504 command_arg.plugin_search_dirs,
505 self._require_substitution,
506 self._requested_plugins_only,
507 )
508 if self._log_only_to_stderr:
509 setup_logging(reconfigure_logging=True, log_only_to_stderr=True)
511 default_log_level = self._default_log_level
512 if isinstance(default_log_level, int):
513 level = default_log_level
514 else:
515 assert callable(default_log_level)
516 level = default_log_level(context)
517 change_log_level(level)
518 if level > logging.DEBUG and context.parsed_args.debug_mode:
519 change_log_level(logging.DEBUG)
520 if level > TRACE_LOG and os.environ.get("DEBPUTY_DEBUG", "") == "trace":
521 change_log_level(TRACE_LOG)
522 return self._handler(context)
525class DispatchingCommandMixin(CommandBase):
526 __slots__ = ()
528 def add_subcommand(self, subcommand: SubcommandBase) -> None:
529 raise NotImplementedError
531 def add_dispatching_subcommand(
532 self,
533 name: str,
534 dest: str,
535 *,
536 aliases: Sequence[str] = tuple(),
537 help_description: str | None = None,
538 metavar: str = "command",
539 default_subcommand: str | None = None,
540 ) -> "DispatcherCommand":
541 ds = DispatcherCommand(
542 name,
543 dest,
544 aliases=aliases,
545 help_description=help_description,
546 metavar=metavar,
547 default_subcommand=default_subcommand,
548 )
549 self.add_subcommand(ds)
550 return ds
552 def register_subcommand(
553 self,
554 name: str | Sequence[str],
555 *,
556 help_description: str | None = None,
557 argparser: None | (
558 ArgparserConfigurator | Sequence[ArgparserConfigurator]
559 ) = None,
560 require_substitution: bool = True,
561 requested_plugins_only: bool = False,
562 log_only_to_stderr: bool = False,
563 default_log_level: int | Callable[[CommandContext], int] = logging.INFO,
564 ) -> Callable[[CommandHandler], GenericSubCommand]:
565 if isinstance(name, str):
566 cmd_name = name
567 aliases = []
568 else:
569 cmd_name = name[0]
570 aliases = name[1:]
572 if argparser is not None and not callable(argparser):
573 args = argparser
575 def _wrapper(parser: argparse.ArgumentParser) -> None:
576 for configurator in args:
577 configurator(parser)
579 argparser = _wrapper
581 def _annotation_impl(func: CommandHandler) -> GenericSubCommand:
582 subcommand = GenericSubCommand(
583 cmd_name,
584 func,
585 aliases=aliases,
586 help_description=help_description,
587 require_substitution=require_substitution,
588 requested_plugins_only=requested_plugins_only,
589 log_only_to_stderr=log_only_to_stderr,
590 default_log_level=default_log_level,
591 )
592 self.add_subcommand(subcommand)
593 if argparser is not None:
594 subcommand.configure_handler(argparser)
596 return subcommand
598 return _annotation_impl
601class DispatcherCommand(SubcommandBase, DispatchingCommandMixin):
602 __slots__ = (
603 "_subcommands",
604 "_aliases",
605 "_dest",
606 "_metavar",
607 "_required",
608 "_default_subcommand",
609 "_argparser",
610 )
612 def __init__(
613 self,
614 name: str,
615 dest: str,
616 *,
617 aliases: Sequence[str] = tuple(),
618 help_description: str | None = None,
619 metavar: str = "command",
620 default_subcommand: str | None = None,
621 ) -> None:
622 super().__init__(name, aliases=aliases, help_description=help_description)
623 self._aliases: dict[str, SubcommandBase] = {}
624 self._subcommands: dict[str, SubcommandBase] = {}
625 self._dest = dest
626 self._metavar = metavar
627 self._default_subcommand = default_subcommand
628 self._argparser: argparse.ArgumentParser | None = None
630 def add_subcommand(self, subcommand: SubcommandBase) -> None:
631 all_names = [subcommand.name]
632 if subcommand.aliases:
633 all_names.extend(subcommand.aliases)
634 aliases = self._aliases
635 for n in all_names:
636 if n in aliases: 636 ↛ 637line 636 didn't jump to line 637 because the condition on line 636 was never true
637 raise ValueError(
638 f"Internal error: Multiple handlers for {n} on topic {self.name}"
639 )
641 aliases[n] = subcommand
642 self._subcommands[subcommand.name] = subcommand
644 def configure(self, argparser: argparse.ArgumentParser) -> None:
645 if self._argparser is not None:
646 raise TypeError("Cannot configure twice!")
647 self._argparser = argparser
648 subcommands = self._subcommands
649 if not subcommands:
650 raise ValueError(
651 f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)"
652 )
653 default_subcommand = self._default_subcommand
654 required = default_subcommand is None
655 if (
656 default_subcommand is not None
657 and default_subcommand not in ("--help", "-h")
658 and default_subcommand not in subcommands
659 ):
660 raise ValueError(
661 f"Internal error: Subcommand {self.name} should have {default_subcommand} as default,"
662 " but it was not registered?"
663 )
664 subparser = argparser.add_subparsers(
665 dest=self._dest,
666 required=required,
667 metavar=self._metavar,
668 )
669 for subcommand in subcommands.values():
670 subcommand.add_subcommand_to_subparser(subparser)
672 def has_command(self, command: str) -> bool:
673 return command in self._aliases
675 def __call__(self, command_arg: CommandArg) -> None:
676 argparser = self._argparser
677 assert argparser is not None
678 v = getattr(command_arg.parsed_args, self._dest, None)
679 if v is None:
680 v = self._default_subcommand
681 if v in ("--help", "-h"):
682 argparser.parse_args([v])
683 _error("Missing command", prog=argparser.prog)
685 assert (
686 v is not None
687 ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?"
688 assert (
689 v in self._aliases
690 ), f"Internal error: {v} was accepted as a topic, but it was not registered?"
691 self._aliases[v](command_arg)
694ROOT_COMMAND = DispatcherCommand(
695 "root",
696 dest="command",
697 metavar="COMMAND",
698)