Coverage for src/debputy/commands/debputy_cmd/context.py: 35%
348 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import argparse
2import dataclasses
3import errno
4import logging
5import os
6from typing import (
7 Optional,
8 Tuple,
9 FrozenSet,
10 Set,
11 Union,
12 Dict,
13 TYPE_CHECKING,
14 Literal,
15)
16from collections.abc import Mapping, Sequence, Iterable, Callable
18from debian.debian_support import DpkgArchTable
20from debputy._deb_options_profiles import DebBuildOptionsAndProfiles
21from debputy.architecture_support import (
22 DpkgArchitectureBuildProcessValuesTable,
23 dpkg_architecture_table,
24)
25from debputy.dh.dh_assistant import read_dh_addon_sequences
26from debputy.exceptions import DebputyRuntimeError
27from debputy.filesystem_scan import FSROOverlay
28from debputy.highlevel_manifest import HighLevelManifest
29from debputy.highlevel_manifest_parser import YAMLManifestParser
30from debputy.integration_detection import determine_debputy_integration_mode
31from debputy.packages import (
32 SourcePackage,
33 BinaryPackage,
34 DctrlParser,
35)
36from debputy.plugin.api import VirtualPath
37from debputy.plugin.api.impl import load_plugin_features
38from debputy.plugin.api.feature_set import PluginProvidedFeatureSet
39from debputy.plugin.api.spec import DebputyIntegrationMode
40from debputy.substitution import (
41 Substitution,
42 VariableContext,
43 SubstitutionImpl,
44 NULL_SUBSTITUTION,
45)
46from debputy.util import (
47 _error,
48 PKGNAME_REGEX,
49 resolve_source_date_epoch,
50 setup_logging,
51 PRINT_COMMAND,
52 change_log_level,
53 _warn,
54 TRACE_LOG,
55)
57if TYPE_CHECKING:
58 from argparse import _SubParsersAction
# Signature of a subcommand handler: it receives the `CommandContext` for the
# invocation and returns nothing.
CommandHandler = Callable[["CommandContext"], None]
# Signature of a callback that is handed an `argparse.ArgumentParser` to
# configure (see `add_arg` for a factory producing such callbacks).
ArgparserConfigurator = Callable[[argparse.ArgumentParser], None]
def add_arg(
    *name_or_flags: str,
    **kwargs,
) -> Callable[[argparse.ArgumentParser], None]:
    """Create a deferred ``ArgumentParser.add_argument`` call.

    The positional and keyword arguments are captured as-is and replayed on
    whichever parser the returned callback is later invoked with.

    :param name_or_flags: Option strings or the positional name, exactly as
      accepted by ``ArgumentParser.add_argument``.
    :param kwargs: Keyword arguments forwarded to ``add_argument`` unchanged.
    :return: A callback taking the parser to configure; it returns ``None``.
    """

    def _apply_argument(argparser: argparse.ArgumentParser) -> None:
        argparser.add_argument(*name_or_flags, **kwargs)

    return _apply_argument
@dataclasses.dataclass(slots=True, frozen=True)
class CommandArg:
    """Immutable bundle of inputs handed to a command's ``__call__``."""

    # The namespace with the parsed command line arguments.
    parsed_args: argparse.Namespace
    # Forwarded to `CommandContext`, which passes it on to `load_plugin_features`.
    plugin_search_dirs: Sequence[str]
@dataclasses.dataclass
class Command:
    """Record pairing a handler with the context options it wants.

    NOTE(review): not referenced elsewhere in the visible part of this module;
    the field names mirror the `CommandContext` constructor parameters.
    """

    # Callable invoked with the `CommandContext`.
    handler: Callable[["CommandContext"], None]
    # Same name as the `CommandContext` parameter; defaults to True.
    require_substitution: bool = True
    # Same name as the `CommandContext` parameter; defaults to False.
    requested_plugins_only: bool = False
def _host_dpo_to_dbo(opt_and_profiles: "DebBuildOptionsAndProfiles", v: str) -> bool:
    """Copy `v` into ``DEB_BUILD_OPTIONS`` when it is only a build profile.

    :param opt_and_profiles: Provides the ``deb_build_profiles`` and
      ``deb_build_options`` containers that are checked for `v`.
    :param v: The option/profile name to check (e.g. ``nodoc``).
    :return: True if ``os.environ["DEB_BUILD_OPTIONS"]`` was rewritten,
      otherwise False.
    """
    if v not in opt_and_profiles.deb_build_profiles:
        return False
    if v in opt_and_profiles.deb_build_options:
        return False

    current_options = os.environ.get("DEB_BUILD_OPTIONS", "")
    _warn(
        f'Copying "{v}" into DEB_BUILD_OPTIONS: It was in DEB_BUILD_PROFILES but not in DEB_BUILD_OPTIONS'
    )
    # The `lstrip` removes the separator when the variable was unset or empty.
    os.environ["DEB_BUILD_OPTIONS"] = f"{current_options} {v}".lstrip()
    # Note: It will not be immediately visible since `DebBuildOptionsAndProfiles` caches the result
    return True
class CommandContext:
    """Per-invocation state handed to the handler of a subcommand.

    Most of the state (the ``debian/control`` parser and its parse result, the
    substitution, the requested plugins, the loaded plugin feature set and the
    integration mode) is computed on first use and then cached on the instance.
    """

    def __init__(
        self,
        parsed_args: argparse.Namespace,
        plugin_search_dirs: Sequence[str],
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
    ) -> None:
        """Set up the context; nothing is parsed or loaded at this point.

        :param parsed_args: The parsed command line arguments.
        :param plugin_search_dirs: Passed to `load_plugin_features` when the
          plugins are loaded.
        :param require_substitution: Whether the command insists on a real
          substitution (see `_create_substitution`).
        :param requested_plugins_only: When True, `load_plugins` restricts the
          load to the plugins returned by `requested_plugins`.
        """
        self.parsed_args = parsed_args
        self.plugin_search_dirs = plugin_search_dirs
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._debputy_plugin_feature_set: PluginProvidedFeatureSet = (
            PluginProvidedFeatureSet()
        )
        self._debian_dir = FSROOverlay.create_root_dir("debian", "debian")
        self._mtime: int | None = None
        self._source_variables: Mapping[str, str] | None = None
        self._substitution: Substitution | None = None
        self._requested_plugins: Sequence[str] | None = None
        self._plugins_loaded = False
        self._dctrl_parser: DctrlParser | None = None
        self.debputy_integration_mode: DebputyIntegrationMode | None = None
        self._dctrl_data: None | (
            tuple[
                "SourcePackage",
                Mapping[str, "BinaryPackage"],
            ]
        ) = None
        self._package_set: Literal["both", "arch", "indep"] = "both"

    @property
    def package_set(self) -> Literal["both", "arch", "indep"]:
        """Which package set the ``debian/control`` parser is limited to."""
        return self._package_set

    @package_set.setter
    def package_set(self, new_value: Literal["both", "arch", "indep"]) -> None:
        """Change the package set.

        :raises TypeError: If the ``debian/control`` parser already exists,
          since the value is consumed when the parser is created.
        """
        if self._dctrl_parser is not None:
            raise TypeError(
                "package_set cannot be redefined once the debian/control parser has been initialized"
            )
        self._package_set = new_value

    @property
    def debian_dir(self) -> VirtualPath:
        """The read-only overlay rooted at the ``debian`` directory."""
        return self._debian_dir

    @property
    def mtime(self) -> int:
        """Value of `resolve_source_date_epoch`, resolved once and cached."""
        if self._mtime is None:
            self._mtime = resolve_source_date_epoch(
                None,
                substitution=self.substitution,
            )
        return self._mtime

    @property
    def dctrl_parser(self) -> DctrlParser:
        """The `DctrlParser` for this invocation, created on first access.

        Creating it may rewrite ``DEB_BUILD_OPTIONS`` in ``os.environ`` (see
        `_host_dpo_to_dbo`) for ``nodoc`` and ``nocheck``.
        """
        parser = self._dctrl_parser
        if parser is None:
            packages: set[str] | frozenset[str] = frozenset()
            if hasattr(self.parsed_args, "packages"):
                packages = self.parsed_args.packages

            instance = DebBuildOptionsAndProfiles(environ=os.environ)

            dirty = _host_dpo_to_dbo(instance, "nodoc")
            dirty = _host_dpo_to_dbo(instance, "nocheck") or dirty

            if dirty:
                # The first instance may hold stale values; recreate it so the
                # updated environment is what the parser sees.
                instance = DebBuildOptionsAndProfiles(environ=os.environ)

            parser = DctrlParser(
                packages,  # -p/--package
                set(),  # -N/--no-package
                # binary-indep and binary-arch (dpkg BuildDriver integration only)
                self._package_set == "indep",
                self._package_set == "arch",
                deb_options_and_profiles=instance,
                dpkg_architecture_variables=dpkg_architecture_table(),
                dpkg_arch_query_table=DpkgArchTable.load_arch_table(),
            )
            self._dctrl_parser = parser
        return parser

    def source_package(self) -> SourcePackage:
        """Return the source package parsed from ``debian/control``."""
        source, _ = self._parse_dctrl()
        return source

    def binary_packages(self) -> Mapping[str, "BinaryPackage"]:
        """Return the binary package table parsed from ``debian/control``."""
        _, binary_package_table = self._parse_dctrl()
        return binary_package_table

    def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
        """Return the architecture table held by the ``debian/control`` parser."""
        return self.dctrl_parser.dpkg_architecture_variables

    def requested_plugins(self) -> Sequence[str]:
        """Return the plugins requested via ``Build-Depends`` (cached)."""
        if self._requested_plugins is None:
            self._requested_plugins = self._resolve_requested_plugins()
        return self._requested_plugins

    def required_plugins(self) -> set[str]:
        """Return the plugins listed in ``parsed_args.required_plugins``.

        A missing attribute, ``None`` or an empty value all give an empty set.
        """
        return set(getattr(self.parsed_args, "required_plugins", None) or ())

    @property
    def deb_build_options_and_profiles(self) -> "DebBuildOptionsAndProfiles":
        """The build options/profiles instance used by the control parser."""
        return self.dctrl_parser.deb_options_and_profiles

    @property
    def deb_build_options(self) -> Mapping[str, str | None]:
        """Shorthand for ``deb_build_options_and_profiles.deb_build_options``."""
        return self.deb_build_options_and_profiles.deb_build_options

    def _create_substitution(
        self,
        parsed_args: argparse.Namespace,
        plugin_feature_set: PluginProvidedFeatureSet,
        debian_dir: VirtualPath,
    ) -> Substitution:
        """Create the substitution, or `NULL_SUBSTITUTION` when disabled.

        ``parsed_args.substitution`` (when present) overrides the command's
        default. Disabling substitution for a command that requires it is
        reported via `_error`.
        """
        requested_subst = self._require_substitution
        if hasattr(parsed_args, "substitution"):
            requested_subst = parsed_args.substitution
        if requested_subst is False and self._require_substitution:
            _error(f"--no-substitution cannot be used with {parsed_args.command}")
        if self._require_substitution or requested_subst is not False:
            variable_context = VariableContext(debian_dir)
            return SubstitutionImpl(
                plugin_feature_set=plugin_feature_set,
                unresolvable_substitutions=frozenset(["PACKAGE"]),
                variable_context=variable_context,
            )
        return NULL_SUBSTITUTION

    def load_plugins(self) -> PluginProvidedFeatureSet:
        """Load the plugin features (first call only) and return the feature set."""
        if not self._plugins_loaded:
            requested_plugins = None
            required_plugins = self.required_plugins()
            if self._requested_plugins_only:
                requested_plugins = self.requested_plugins()
            debug_mode = getattr(self.parsed_args, "debug_mode", False)
            load_plugin_features(
                self.plugin_search_dirs,
                self.substitution,
                requested_plugins_only=requested_plugins,
                required_plugins=required_plugins,
                plugin_feature_set=self._debputy_plugin_feature_set,
                debug_mode=debug_mode,
            )
            self._plugins_loaded = True
        return self._debputy_plugin_feature_set

    @staticmethod
    def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]:
        """Yield plugin names for ``debputy-plugin-*`` packages in `dep_field`.

        Only the first alternative of each comma-separated clause is examined.
        """
        package_prefix = "debputy-plugin-"
        for dep_clause in (d.strip() for d in dep_field.split(",")):
            dep = dep_clause.split("|")[0].strip()
            if not dep.startswith(package_prefix):
                continue
            m = PKGNAME_REGEX.search(dep)
            assert m
            package_name = m.group(0)
            plugin_name = package_name[len(package_prefix) :]
            yield plugin_name

    def _resolve_requested_plugins(self) -> Sequence[str]:
        """Collect the plugins named in the ``Build-Depends`` field.

        :raises DebputyRuntimeError: If a plugin dependency is found in
          ``Build-Depends-Arch`` or ``Build-Depends-Indep``.
        """
        source_package, _ = self._parse_dctrl()
        bd = source_package.fields.get("Build-Depends", "")
        plugins = list(self._plugin_from_dependency_field(bd))
        for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"):
            f = source_package.fields.get(field_name)
            if not f:
                continue
            # Any hit is an error; the loop body runs at most once.
            for plugin in self._plugin_from_dependency_field(f):
                raise DebputyRuntimeError(
                    f"Cannot load plugins via {field_name}:"
                    f" Please move debputy-plugin-{plugin} dependency to Build-Depends."
                )

        return plugins

    @property
    def substitution(self) -> Substitution:
        """The substitution for this invocation, created on first access."""
        if self._substitution is None:
            self._substitution = self._create_substitution(
                self.parsed_args,
                self._debputy_plugin_feature_set,
                self.debian_dir,
            )
        return self._substitution

    def must_be_called_in_source_root(self) -> None:
        """Report an error via `_error` unless ``debian/control`` exists."""
        if self.debian_dir.get("control") is None:
            _error(
                "This subcommand must be run from a source package root; expecting debian/control to exist."
            )

    def _parse_dctrl(
        self,
    ) -> tuple[
        "SourcePackage",
        Mapping[str, "BinaryPackage"],
    ]:
        """Parse ``debian/control`` (first call only) and return the result.

        :return: The source package and the table of binary packages.
        """
        if self._dctrl_data is None:
            try:
                debian_control = self.debian_dir.get("control")
                if debian_control is None:
                    raise FileNotFoundError(
                        errno.ENOENT,
                        os.strerror(errno.ENOENT),
                        os.path.join(self.debian_dir.fs_path, "control"),
                    )
                with debian_control.open() as fd:
                    _, source_package, binary_packages = (
                        self.dctrl_parser.parse_source_debian_control(
                            fd,
                        )
                    )
            except FileNotFoundError:
                # We are not using `must_be_called_in_source_root`, because we (in this case) require
                # the file to be readable (that is, parse_source_debian_control can also raise a
                # FileNotFoundError when trying to open the file).
                _error(
                    "This subcommand must be run from a source package root; expecting debian/control to exist."
                )

            self._dctrl_data = (
                source_package,
                binary_packages,
            )

        return self._dctrl_data

    @property
    def has_dctrl_file(self) -> bool:
        """Whether ``debian/control`` exists in the debian directory."""
        debian_control = self.debian_dir.get("control")
        return debian_control is not None

    def resolve_integration_mode(
        self,
        require_integration: bool = True,
    ) -> DebputyIntegrationMode:
        """Return the integration mode, detecting and caching it when unset.

        A mode already stored in `debputy_integration_mode` is returned as-is.
        """
        integration_mode = self.debputy_integration_mode
        if integration_mode is None:
            r = read_dh_addon_sequences(self.debian_dir)
            bd_sequences, dr_sequences, _ = r
            all_sequences = bd_sequences | dr_sequences
            integration_mode = determine_debputy_integration_mode(
                self.source_package().fields,
                all_sequences,
            )
            # NOTE(review): this guard only fires when `require_integration` is
            # False, which reads inverted relative to the parameter name. With
            # the default (True), an unresolved mode is returned as None.
            # Behaviour deliberately left unchanged; confirm against callers.
            if integration_mode is None and not require_integration:
                _error(
                    "Cannot resolve the integration mode expected for this package. Is this package using `debputy`?"
                )
            self.debputy_integration_mode = integration_mode
        return integration_mode

    def set_log_level_for_build_subcommand(self) -> int | None:
        """Adjust the log level from ``DH_VERBOSE`` and ``debug_mode``.

        ``debug_mode`` takes precedence over a non-empty ``DH_VERBOSE``.

        :return: The log level that was applied, or None if it was left alone.
        """
        parsed_args = self.parsed_args
        log_level: int | None = None
        if os.environ.get("DH_VERBOSE", "") != "":
            log_level = PRINT_COMMAND
        if parsed_args.debug_mode:
            log_level = logging.DEBUG
        if log_level is not None:
            change_log_level(log_level)
        return log_level

    def manifest_parser(
        self,
        *,
        manifest_path: str | None = None,
    ) -> YAMLManifestParser:
        """Create a `YAMLManifestParser` for the manifest.

        :param manifest_path: Path to use when ``parsed_args.debputy_manifest``
          is None. Falls back to ``debputy.manifest`` in the debian directory.
        """
        substitution = self.substitution
        dctrl_parser = self.dctrl_parser

        source_package, binary_packages = self._parse_dctrl()

        # The command line option wins over the caller provided path.
        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        return YAMLManifestParser(
            manifest_path,
            source_package,
            binary_packages,
            substitution,
            dctrl_parser.dpkg_architecture_variables,
            dctrl_parser.dpkg_arch_query_table,
            dctrl_parser.deb_options_and_profiles,
            self.load_plugins(),
            self.resolve_integration_mode(),
            debian_dir=self.debian_dir,
        )

    def parse_manifest(
        self,
        *,
        manifest_path: str | None = None,
    ) -> HighLevelManifest:
        """Parse the manifest file, or build a manifest when there is no file.

        As a side effect, ``SOURCE_DATE_EPOCH`` in ``os.environ`` is set to the
        substituted value of ``{{SOURCE_DATE_EPOCH}}``.

        :param manifest_path: Path to use when ``parsed_args.debputy_manifest``
          is None. Falls back to ``debputy.manifest`` in the debian directory.
        """
        substitution = self.substitution
        manifest_required = False

        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
            # An explicitly requested manifest must exist.
            manifest_required = True
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        parser = self.manifest_parser(manifest_path=manifest_path)

        os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute(
            "{{SOURCE_DATE_EPOCH}}",
            "Internal resolution",
        )
        if os.path.isfile(manifest_path):
            return parser.parse_manifest()
        if manifest_required:
            _error(f'The path "{manifest_path}" is not a file!')
        return parser.build_manifest()
class CommandBase:
    """Common base for commands: optional parser setup plus a call hook."""

    __slots__ = ()

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Configure `argparser` for this command; the base version is a no-op."""
        return None

    def __call__(self, command_arg: CommandArg) -> None:
        """Run the command; subclasses must override this."""
        raise NotImplementedError()
class SubcommandBase(CommandBase):
    """A named command that can register itself on an argparse subparser."""

    __slots__ = ("name", "aliases", "help_description")

    def __init__(
        self,
        name: str,
        *,
        aliases: Sequence[str] = (),
        help_description: str | None = None,
    ) -> None:
        """Record the name, the alternative names and the help text."""
        self.name = name
        self.aliases = aliases
        self.help_description = help_description

    def add_subcommand_to_subparser(
        self,
        subparser: "_SubParsersAction",
    ) -> argparse.ArgumentParser:
        """Add a parser for this subcommand to `subparser` and configure it.

        :return: The newly created parser, after `configure` has run on it.
        """
        parser_options = {
            "aliases": self.aliases,
            "help": self.help_description,
            "allow_abbrev": False,
        }
        created = subparser.add_parser(self.name, **parser_options)
        self.configure(created)
        return created
class GenericSubCommand(SubcommandBase):
    """Subcommand that wraps a handler function taking a `CommandContext`."""

    __slots__ = (
        "_handler",
        "_configure_handler",
        "_require_substitution",
        "_requested_plugins_only",
        "_log_only_to_stderr",
        "_default_log_level",
    )

    def __init__(
        self,
        name: str,
        handler: Callable[[CommandContext], None],
        *,
        aliases: Sequence[str] = (),
        help_description: str | None = None,
        configure_handler: Callable[[argparse.ArgumentParser], None] | None = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
        default_log_level: int | Callable[[CommandContext], int] = logging.INFO,
    ) -> None:
        """Store the handler and the options used when it is invoked.

        :param handler: Called with the `CommandContext` when the command runs.
        :param configure_handler: Optional callback that sets up the parser.
        :param require_substitution: Forwarded to `CommandContext`.
        :param requested_plugins_only: Forwarded to `CommandContext`.
        :param log_only_to_stderr: When True, logging is reconfigured to only
          use stderr before the handler runs.
        :param default_log_level: A log level, or a callable computing one
          from the `CommandContext`.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        self._handler = handler
        self._configure_handler = configure_handler
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._log_only_to_stderr = log_only_to_stderr
        self._default_log_level = default_log_level

    def configure_handler(
        self,
        handler: Callable[[argparse.ArgumentParser], None],
    ) -> None:
        """Install the parser configuration callback.

        :raises TypeError: If a callback has already been installed.
        """
        if self._configure_handler is None:
            self._configure_handler = handler
            return
        raise TypeError("Only one argument handler can be provided")

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Run the configuration callback on `argparser`, if there is one."""
        callback = self._configure_handler
        if callback is None:
            return
        callback(argparser)

    def __call__(self, command_arg: CommandArg) -> None:
        """Create the context, set up the log level and invoke the handler."""
        context = CommandContext(
            command_arg.parsed_args,
            command_arg.plugin_search_dirs,
            self._require_substitution,
            self._requested_plugins_only,
        )
        if self._log_only_to_stderr:
            setup_logging(reconfigure_logging=True, log_only_to_stderr=True)

        configured_level = self._default_log_level
        if isinstance(configured_level, int):
            level = configured_level
        else:
            assert callable(configured_level)
            level = configured_level(context)
        change_log_level(level)

        # Both overrides compare against the level computed above, not against
        # whatever the previous override may have switched to.
        if level > logging.DEBUG and context.parsed_args.debug_mode:
            change_log_level(logging.DEBUG)
        if level > TRACE_LOG and os.environ.get("DEBPUTY_DEBUG", "") == "trace":
            change_log_level(TRACE_LOG)
        return self._handler(context)
class DispatchingCommandMixin(CommandBase):
    """Helpers for commands that can have subcommands registered on them."""

    __slots__ = ()

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register `subcommand`; must be provided by the concrete class."""
        raise NotImplementedError()

    def add_dispatching_subcommand(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = (),
        help_description: str | None = None,
        metavar: str = "command",
        default_subcommand: str | None = None,
    ) -> "DispatcherCommand":
        """Create a nested `DispatcherCommand`, register it and return it."""
        dispatcher = DispatcherCommand(
            name,
            dest,
            aliases=aliases,
            help_description=help_description,
            metavar=metavar,
            default_subcommand=default_subcommand,
        )
        self.add_subcommand(dispatcher)
        return dispatcher

    def register_subcommand(
        self,
        name: str | Sequence[str],
        *,
        help_description: str | None = None,
        argparser: None | (
            ArgparserConfigurator | Sequence[ArgparserConfigurator]
        ) = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
        default_log_level: int | Callable[[CommandContext], int] = logging.INFO,
    ) -> Callable[[CommandHandler], GenericSubCommand]:
        """Return a decorator that registers a handler as a subcommand.

        :param name: The command name, or a sequence where the first element
          is the name and the remaining elements are aliases.
        :param argparser: A parser configurator, or a sequence of them that
          are applied in order.
        :return: A decorator turning the handler into a registered
          `GenericSubCommand` (which is what the decorated name becomes).
        """
        if isinstance(name, str):
            cmd_name, aliases = name, []
        else:
            cmd_name, aliases = name[0], name[1:]

        parser_setup = argparser
        if argparser is not None and not callable(argparser):
            configurators = argparser

            def _apply_all(parser: argparse.ArgumentParser) -> None:
                for configurator in configurators:
                    configurator(parser)

            parser_setup = _apply_all

        def _annotation_impl(func: CommandHandler) -> GenericSubCommand:
            registered = GenericSubCommand(
                cmd_name,
                func,
                aliases=aliases,
                help_description=help_description,
                require_substitution=require_substitution,
                requested_plugins_only=requested_plugins_only,
                log_only_to_stderr=log_only_to_stderr,
                default_log_level=default_log_level,
            )
            self.add_subcommand(registered)
            if parser_setup is not None:
                registered.configure_handler(parser_setup)

            return registered

        return _annotation_impl
class DispatcherCommand(SubcommandBase, DispatchingCommandMixin):
    """Subcommand that forwards the call to one of its own subcommands."""

    __slots__ = (
        "_subcommands",
        "_aliases",
        "_dest",
        "_metavar",
        "_required",
        "_default_subcommand",
        "_argparser",
    )

    def __init__(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = (),
        help_description: str | None = None,
        metavar: str = "command",
        default_subcommand: str | None = None,
    ) -> None:
        """Set up an empty dispatcher.

        :param dest: Attribute of the parsed arguments holding the chosen
          subcommand.
        :param metavar: Passed to ``add_subparsers``.
        :param default_subcommand: Subcommand to use when none was given;
          ``--help`` and ``-h`` are accepted as special values.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        # Every name (primary or alias) -> subcommand.
        self._aliases: dict[str, SubcommandBase] = {}
        # Primary name -> subcommand.
        self._subcommands: dict[str, SubcommandBase] = {}
        self._dest = dest
        self._metavar = metavar
        self._default_subcommand = default_subcommand
        self._argparser: argparse.ArgumentParser | None = None

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register `subcommand` under its name and all of its aliases.

        :raises ValueError: If one of the names is already taken.
        """
        registry = self._aliases
        for candidate in (subcommand.name, *(subcommand.aliases or ())):
            if candidate in registry:
                raise ValueError(
                    f"Internal error: Multiple handlers for {candidate} on topic {self.name}"
                )

            registry[candidate] = subcommand
        self._subcommands[subcommand.name] = subcommand

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Attach all registered subcommands to `argparser`.

        :raises TypeError: If the dispatcher was configured before.
        :raises ValueError: If no subcommands are registered, or the default
          subcommand is not one of the registered primary names.
        """
        if self._argparser is not None:
            raise TypeError("Cannot configure twice!")
        self._argparser = argparser

        known = self._subcommands
        if not known:
            raise ValueError(
                f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)"
            )

        default_subcommand = self._default_subcommand
        # Without a default, argparse has to insist on a subcommand.
        required = default_subcommand is None
        if (
            not required
            and default_subcommand not in ("--help", "-h")
            and default_subcommand not in known
        ):
            raise ValueError(
                f"Internal error: Subcommand {self.name} should have {default_subcommand} as default,"
                " but it was not registered?"
            )

        subparser = argparser.add_subparsers(
            dest=self._dest,
            required=required,
            metavar=self._metavar,
        )
        for entry in known.values():
            entry.add_subcommand_to_subparser(subparser)

    def has_command(self, command: str) -> bool:
        """Whether `command` is a registered name or alias."""
        return command in self._aliases

    def __call__(self, command_arg: CommandArg) -> None:
        """Invoke the subcommand selected on the command line (or the default)."""
        argparser = self._argparser
        assert argparser is not None
        chosen = getattr(command_arg.parsed_args, self._dest, None)
        if chosen is None:
            chosen = self._default_subcommand
            if chosen in ("--help", "-h"):
                # Have argparse act on the help option, then report the
                # missing command.
                argparser.parse_args([chosen])
                _error("Missing command", prog=argparser.prog)

        assert (
            chosen is not None
        ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?"
        assert (
            chosen in self._aliases
        ), f"Internal error: {chosen} was accepted as a topic, but it was not registered?"
        self._aliases[chosen](command_arg)
# Top-level dispatcher: the chosen subcommand is read from the `command`
# attribute of the parsed arguments. No default subcommand is set, so argparse
# will require one once `configure` has run.
ROOT_COMMAND = DispatcherCommand(
    "root",
    dest="command",
    metavar="COMMAND",
)