Coverage for src/debputy/commands/debputy_cmd/context.py: 38%

347 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import argparse 

2import dataclasses 

3import errno 

4import logging 

5import os 

6from typing import ( 

7 Optional, 

8 Tuple, 

9 Mapping, 

10 FrozenSet, 

11 Set, 

12 Union, 

13 Sequence, 

14 Iterable, 

15 Callable, 

16 Dict, 

17 TYPE_CHECKING, 

18 Literal, 

19) 

20 

21from debian.debian_support import DpkgArchTable 

22 

23from debputy._deb_options_profiles import DebBuildOptionsAndProfiles 

24from debputy.architecture_support import ( 

25 DpkgArchitectureBuildProcessValuesTable, 

26 dpkg_architecture_table, 

27) 

28from debputy.dh.dh_assistant import read_dh_addon_sequences 

29from debputy.exceptions import DebputyRuntimeError 

30from debputy.filesystem_scan import FSROOverlay 

31from debputy.highlevel_manifest import HighLevelManifest 

32from debputy.highlevel_manifest_parser import YAMLManifestParser 

33from debputy.integration_detection import determine_debputy_integration_mode 

34from debputy.packages import ( 

35 SourcePackage, 

36 BinaryPackage, 

37 DctrlParser, 

38) 

39from debputy.plugin.api import VirtualPath 

40from debputy.plugin.api.impl import load_plugin_features 

41from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

42from debputy.plugin.api.spec import DebputyIntegrationMode 

43from debputy.substitution import ( 

44 Substitution, 

45 VariableContext, 

46 SubstitutionImpl, 

47 NULL_SUBSTITUTION, 

48) 

49from debputy.util import ( 

50 _error, 

51 PKGNAME_REGEX, 

52 resolve_source_date_epoch, 

53 setup_logging, 

54 PRINT_COMMAND, 

55 change_log_level, 

56 _warn, 

57 TRACE_LOG, 

58) 

59 

60if TYPE_CHECKING: 

61 from argparse import _SubParsersAction 

62 

63 

# Signature of a subcommand handler: it is called with the CommandContext and
# returns nothing.
CommandHandler = Callable[["CommandContext"], None]
# Signature of a callable that registers arguments on an argparse parser.
ArgparserConfigurator = Callable[[argparse.ArgumentParser], None]

66 

67 

def add_arg(
    *name_or_flags: str,
    **kwargs,
) -> Callable[[argparse.ArgumentParser], None]:
    """Capture an ``add_argument`` call so it can be replayed on a parser later.

    :param name_or_flags: Positional name or option strings, exactly as
      ``ArgumentParser.add_argument`` accepts them.
    :param kwargs: Keyword options forwarded unchanged to ``add_argument``.
    :return: A callable that, given a parser, registers the captured argument
      on it and returns ``None``.
    """

    def _register_on(argparser: argparse.ArgumentParser) -> None:
        # The Action returned by add_argument is deliberately discarded.
        argparser.add_argument(*name_or_flags, **kwargs)

    return _register_on

79 

80 

81@dataclasses.dataclass(slots=True, frozen=True) 

82class CommandArg: 

83 parsed_args: argparse.Namespace 

84 plugin_search_dirs: Sequence[str] 

85 

86 

@dataclasses.dataclass
class Command:
    """A handler callable together with the two flags describing its needs."""

    # Callable invoked with the CommandContext.
    handler: Callable[["CommandContext"], None]
    # Defaults to True; same name and default as the CommandContext parameter.
    require_substitution: bool = True
    # Defaults to False; same name and default as the CommandContext parameter.
    requested_plugins_only: bool = False

92 

93 

94def _host_dpo_to_dbo(opt_and_profiles: "DebBuildOptionsAndProfiles", v: str) -> bool: 

95 

96 if ( 

97 v in opt_and_profiles.deb_build_profiles 

98 and v not in opt_and_profiles.deb_build_options 

99 ): 

100 val = os.environ.get("DEB_BUILD_OPTIONS", "") + " " + v 

101 _warn( 

102 f'Copying "{v}" into DEB_BUILD_OPTIONS: It was in DEB_BUILD_PROFILES but not in DEB_BUILD_OPTIONS' 

103 ) 

104 os.environ["DEB_BUILD_OPTIONS"] = val.lstrip() 

105 # Note: It will not be immediately visible since `DebBuildOptionsAndProfiles` caches the result 

106 return True 

107 return False 

108 

109 

class CommandContext:
    """Per-invocation state made available to a subcommand handler.

    Most of the state (substitution, ``debian/control`` data, plugins,
    integration mode) is resolved on first use and cached on the instance.
    """

    def __init__(
        self,
        parsed_args: argparse.Namespace,
        plugin_search_dirs: Sequence[str],
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
    ) -> None:
        """Store the inputs and initialize every cache as "not yet resolved".

        :param parsed_args: The parsed command line arguments.
        :param plugin_search_dirs: Directories handed to ``load_plugin_features``.
        :param require_substitution: Whether the command insists on a real
          substitution (see ``_create_substitution``).
        :param requested_plugins_only: When true, ``load_plugins`` restricts
          loading to the plugins returned by ``requested_plugins``.
        """
        self.parsed_args = parsed_args
        self.plugin_search_dirs = plugin_search_dirs
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._debputy_plugin_feature_set: PluginProvidedFeatureSet = (
            PluginProvidedFeatureSet()
        )
        self._debian_dir = FSROOverlay.create_root_dir("debian", "debian")
        self._mtime: Optional[int] = None
        self._source_variables: Optional[Mapping[str, str]] = None
        self._substitution: Optional[Substitution] = None
        self._requested_plugins: Optional[Sequence[str]] = None
        self._plugins_loaded = False
        self._dctrl_parser: Optional[DctrlParser] = None
        self.debputy_integration_mode: Optional[DebputyIntegrationMode] = None
        self._dctrl_data: Optional[
            Tuple[
                "SourcePackage",
                Mapping[str, "BinaryPackage"],
            ]
        ] = None
        self._package_set: Literal["both", "arch", "indep"] = "both"

    @property
    def package_set(self) -> Literal["both", "arch", "indep"]:
        """Package selection later passed on to the ``DctrlParser``."""
        return self._package_set

    @package_set.setter
    def package_set(self, new_value: Literal["both", "arch", "indep"]) -> None:
        """Change the package selection.

        :raises TypeError: If the ``debian/control`` parser was already created,
          since it has consumed the previous value by then.
        """
        if self._dctrl_parser is not None:
            raise TypeError(
                "package_set cannot be redefined once the debian/control parser has been initialized"
            )
        self._package_set = new_value

    @property
    def debian_dir(self) -> VirtualPath:
        """The ``debian`` directory as a virtual path."""
        return self._debian_dir

    @property
    def mtime(self) -> int:
        """Value from ``resolve_source_date_epoch``, computed once and cached."""
        if self._mtime is None:
            self._mtime = resolve_source_date_epoch(
                None,
                substitution=self.substitution,
            )
        return self._mtime

    @property
    def dctrl_parser(self) -> DctrlParser:
        """The ``debian/control`` parser, created on first access and cached.

        As a side effect of creating the parser, ``nodoc`` and ``nocheck`` are
        copied into ``DEB_BUILD_OPTIONS`` when they only appear as build
        profiles (see ``_host_dpo_to_dbo``).
        """
        parser = self._dctrl_parser
        if parser is None:
            packages: Union[Set[str], FrozenSet[str]] = frozenset()
            if hasattr(self.parsed_args, "packages"):
                packages = self.parsed_args.packages

            instance = DebBuildOptionsAndProfiles(environ=os.environ)

            # Both calls must run (no short-circuit), hence the operand order.
            dirty = _host_dpo_to_dbo(instance, "nodoc")
            dirty = _host_dpo_to_dbo(instance, "nocheck") or dirty

            if dirty:
                # The previous instance cached the old environment; rebuild it
                # so the updated DEB_BUILD_OPTIONS is visible.
                instance = DebBuildOptionsAndProfiles(environ=os.environ)

            parser = DctrlParser(
                packages,  # -p/--package
                set(),  # -N/--no-package
                # binary-indep and binary-arch (dpkg BuildDriver integration only)
                self._package_set == "indep",
                self._package_set == "arch",
                deb_options_and_profiles=instance,
                dpkg_architecture_variables=dpkg_architecture_table(),
                dpkg_arch_query_table=DpkgArchTable.load_arch_table(),
            )
            self._dctrl_parser = parser
        return parser

    def source_package(self) -> SourcePackage:
        """Return the source package parsed from ``debian/control``."""
        source, _ = self._parse_dctrl()
        return source

    def binary_packages(self) -> Mapping[str, "BinaryPackage"]:
        """Return the binary package table parsed from ``debian/control``."""
        _, binary_package_table = self._parse_dctrl()
        return binary_package_table

    def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
        """Return the dpkg architecture table held by the control parser."""
        return self.dctrl_parser.dpkg_architecture_variables

    def requested_plugins(self) -> Sequence[str]:
        """Return the plugins named in ``Build-Depends`` (resolved once)."""
        if self._requested_plugins is None:
            self._requested_plugins = self._resolve_requested_plugins()
        return self._requested_plugins

    def required_plugins(self) -> Set[str]:
        """Return the ``required_plugins`` argument as a set.

        An absent, ``None`` or empty attribute yields an empty set. The
        attribute is optional so that subcommands that never declare the option
        do not crash here (matching how ``debug_mode`` is read in
        ``load_plugins``).
        """
        return set(getattr(self.parsed_args, "required_plugins", None) or ())

    @property
    def deb_build_options_and_profiles(self) -> "DebBuildOptionsAndProfiles":
        """The build options/profiles object held by the control parser."""
        return self.dctrl_parser.deb_options_and_profiles

    @property
    def deb_build_options(self) -> Mapping[str, Optional[str]]:
        """Shortcut for ``deb_build_options_and_profiles.deb_build_options``."""
        return self.deb_build_options_and_profiles.deb_build_options

    def _create_substitution(
        self,
        parsed_args: argparse.Namespace,
        plugin_feature_set: PluginProvidedFeatureSet,
        debian_dir: VirtualPath,
    ) -> Substitution:
        """Create the substitution instance for this command.

        The ``substitution`` attribute of ``parsed_args`` (when present)
        overrides the command default. Explicitly disabling substitution for a
        command that requires it is reported via ``_error``.

        :return: A ``SubstitutionImpl`` unless substitution was disabled, in
          which case ``NULL_SUBSTITUTION`` is returned.
        """
        requested_subst = self._require_substitution
        if hasattr(parsed_args, "substitution"):
            requested_subst = parsed_args.substitution
        if requested_subst is False and self._require_substitution:
            _error(f"--no-substitution cannot be used with {parsed_args.command}")
        if self._require_substitution or requested_subst is not False:
            variable_context = VariableContext(debian_dir)
            return SubstitutionImpl(
                plugin_feature_set=plugin_feature_set,
                unresolvable_substitutions=frozenset(["PACKAGE"]),
                variable_context=variable_context,
            )
        return NULL_SUBSTITUTION

    def load_plugins(self) -> PluginProvidedFeatureSet:
        """Load the plugin features on first call and return the feature set.

        Later calls return the same feature set without loading again.
        """
        if not self._plugins_loaded:
            requested_plugins = None
            required_plugins = self.required_plugins()
            if self._requested_plugins_only:
                requested_plugins = self.requested_plugins()
            debug_mode = getattr(self.parsed_args, "debug_mode", False)
            load_plugin_features(
                self.plugin_search_dirs,
                self.substitution,
                requested_plugins_only=requested_plugins,
                required_plugins=required_plugins,
                plugin_feature_set=self._debputy_plugin_feature_set,
                debug_mode=debug_mode,
            )
            self._plugins_loaded = True
        return self._debputy_plugin_feature_set

    @staticmethod
    def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]:
        """Yield plugin names for ``debputy-plugin-*`` entries of a dependency field.

        Only the first alternative of each comma separated clause is
        considered; the ``debputy-plugin-`` prefix is removed from the yielded
        name.
        """
        package_prefix = "debputy-plugin-"
        for dep_clause in (d.strip() for d in dep_field.split(",")):
            # Alternatives after the first `|` are ignored.
            dep = dep_clause.split("|")[0].strip()
            if not dep.startswith(package_prefix):
                continue
            # The regex isolates the package name from whatever else is in the
            # clause.
            m = PKGNAME_REGEX.search(dep)
            assert m
            package_name = m.group(0)
            plugin_name = package_name[len(package_prefix) :]
            yield plugin_name

    def _resolve_requested_plugins(self) -> Sequence[str]:
        """Collect the plugins requested via ``Build-Depends``.

        :raises DebputyRuntimeError: If a plugin dependency is listed in
          ``Build-Depends-Arch`` or ``Build-Depends-Indep``.
        """
        source_package, _ = self._parse_dctrl()
        bd = source_package.fields.get("Build-Depends", "")
        plugins = list(self._plugin_from_dependency_field(bd))
        for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"):
            f = source_package.fields.get(field_name)
            if not f:
                continue
            # Any plugin found here is an error; the loop body therefore runs at
            # most once.
            for plugin in self._plugin_from_dependency_field(f):
                raise DebputyRuntimeError(
                    f"Cannot load plugins via {field_name}:"
                    f" Please move debputy-plugin-{plugin} dependency to Build-Depends."
                )

        return plugins

    @property
    def substitution(self) -> Substitution:
        """The substitution instance, created on first access and cached."""
        if self._substitution is None:
            self._substitution = self._create_substitution(
                self.parsed_args,
                self._debputy_plugin_feature_set,
                self.debian_dir,
            )
        return self._substitution

    def must_be_called_in_source_root(self) -> None:
        """Report an error via ``_error`` unless ``debian/control`` exists."""
        if self.debian_dir.get("control") is None:
            _error(
                "This subcommand must be run from a source package root; expecting debian/control to exist."
            )

    def _parse_dctrl(
        self,
    ) -> Tuple[
        "SourcePackage",
        Mapping[str, "BinaryPackage"],
    ]:
        """Parse ``debian/control`` once and return ``(source, binaries)``.

        A missing ``debian/control`` is reported via ``_error``.
        """
        if self._dctrl_data is None:
            try:
                debian_control = self.debian_dir.get("control")
                if debian_control is None:
                    raise FileNotFoundError(
                        errno.ENOENT,
                        os.strerror(errno.ENOENT),
                        os.path.join(self.debian_dir.fs_path, "control"),
                    )
                with debian_control.open() as fd:
                    _, source_package, binary_packages = (
                        self.dctrl_parser.parse_source_debian_control(
                            fd,
                        )
                    )
            except FileNotFoundError:
                # We are not using `must_be_called_in_source_root`, because we (in this case) require
                # the file to be readable (that is, parse_source_debian_control can also raise a
                # FileNotFoundError when trying to open the file).
                _error(
                    "This subcommand must be run from a source package root; expecting debian/control to exist."
                )

            self._dctrl_data = (
                source_package,
                binary_packages,
            )

        return self._dctrl_data

    @property
    def has_dctrl_file(self) -> bool:
        """Whether ``debian/control`` is present in the ``debian`` directory."""
        debian_control = self.debian_dir.get("control")
        return debian_control is not None

    def resolve_integration_mode(
        self,
        require_integration: bool = True,
    ) -> DebputyIntegrationMode:
        """Determine (and cache) the debputy integration mode of the package.

        :param require_integration: When true (the default), failing to
          determine the mode is reported via ``_error``. When false, the
          undetermined mode (``None``) is returned to the caller instead.
        :return: The integration mode; ``None`` is only possible when
          ``require_integration`` is false.
        """
        integration_mode = self.debputy_integration_mode
        if integration_mode is None:
            r = read_dh_addon_sequences(self.debian_dir)
            bd_sequences, dr_sequences, _ = r
            all_sequences = bd_sequences | dr_sequences
            integration_mode = determine_debputy_integration_mode(
                self.source_package().fields,
                all_sequences,
            )
            # The check was previously negated, which aborted only for callers
            # that did NOT require integration and silently handed `None` to
            # those that did.
            if integration_mode is None and require_integration:
                _error(
                    "Cannot resolve the integration mode expected for this package. Is this package using `debputy`?"
                )
            self.debputy_integration_mode = integration_mode
        return integration_mode

    def set_log_level_for_build_subcommand(self) -> Optional[int]:
        """Adjust the log level from ``DH_VERBOSE`` and the debug flag.

        A non-empty ``DH_VERBOSE`` selects ``PRINT_COMMAND``; the ``debug_mode``
        argument takes precedence and selects ``logging.DEBUG``.

        :return: The level that was applied, or ``None`` when the log level was
          left untouched.
        """
        parsed_args = self.parsed_args
        log_level: Optional[int] = None
        if os.environ.get("DH_VERBOSE", "") != "":
            log_level = PRINT_COMMAND
        # `debug_mode` is optional here, as it is in `load_plugins`.
        if getattr(parsed_args, "debug_mode", False):
            log_level = logging.DEBUG
        if log_level is not None:
            change_log_level(log_level)
        return log_level

    def manifest_parser(
        self,
        *,
        manifest_path: Optional[str] = None,
    ) -> YAMLManifestParser:
        """Create a manifest parser for this package.

        :param manifest_path: Path of the manifest. The ``debputy_manifest``
          command line argument overrides it when set; without either, the
          path defaults to ``debputy.manifest`` in the ``debian`` directory.
        """
        substitution = self.substitution
        dctrl_parser = self.dctrl_parser

        source_package, binary_packages = self._parse_dctrl()

        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        return YAMLManifestParser(
            manifest_path,
            source_package,
            binary_packages,
            substitution,
            dctrl_parser.dpkg_architecture_variables,
            dctrl_parser.dpkg_arch_query_table,
            dctrl_parser.deb_options_and_profiles,
            self.load_plugins(),
            self.resolve_integration_mode(),
            debian_dir=self.debian_dir,
        )

    def parse_manifest(
        self,
        *,
        manifest_path: Optional[str] = None,
    ) -> HighLevelManifest:
        """Parse the manifest, or build an empty-input manifest when it is absent.

        Also exports ``SOURCE_DATE_EPOCH`` into ``os.environ`` using the value
        resolved by the substitution.

        :param manifest_path: Path of the manifest; resolved the same way as in
          ``manifest_parser``. A path given via the ``debputy_manifest``
          argument must exist as a file, otherwise ``_error`` is called.
        """
        substitution = self.substitution
        manifest_required = False

        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
            # An explicitly requested manifest must exist.
            manifest_required = True
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        parser = self.manifest_parser(manifest_path=manifest_path)

        os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute(
            "{{SOURCE_DATE_EPOCH}}",
            "Internal resolution",
        )
        if os.path.isfile(manifest_path):
            return parser.parse_manifest()
        if manifest_required:
            _error(f'The path "{manifest_path}" is not a file!')
        return parser.build_manifest()

428 

429 

class CommandBase:
    """Common interface of all command objects: configurable and callable."""

    __slots__ = ()

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Register arguments on ``argparser``; the base class registers none."""
        return None

    def __call__(self, command_arg: CommandArg) -> None:
        """Run the command. Subclasses have to provide the implementation."""
        raise NotImplementedError()

439 

440 

class SubcommandBase(CommandBase):
    """A command that can be attached to an argparse sub-parser collection."""

    __slots__ = ("name", "aliases", "help_description")

    def __init__(
        self,
        name: str,
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
    ) -> None:
        """Record the name, the aliases and the help text of the subcommand."""
        self.name = name
        self.aliases = aliases
        self.help_description = help_description

    def add_subcommand_to_subparser(
        self,
        subparser: "_SubParsersAction",
    ) -> argparse.ArgumentParser:
        """Create the parser for this subcommand and let ``configure`` fill it.

        :param subparser: The sub-parser collection to add this command to.
        :return: The newly created parser (created with ``allow_abbrev=False``).
        """
        parser_options = {
            "aliases": self.aliases,
            "help": self.help_description,
            "allow_abbrev": False,
        }
        new_parser = subparser.add_parser(self.name, **parser_options)
        self.configure(new_parser)
        return new_parser

467 

468 

class GenericSubCommand(SubcommandBase):
    """Subcommand backed by a plain handler function.

    When called, it builds a ``CommandContext``, sets up the log level and then
    delegates to the handler.
    """

    __slots__ = (
        "_handler",
        "_configure_handler",
        "_require_substitution",
        "_requested_plugins_only",
        "_log_only_to_stderr",
        "_default_log_level",
    )

    def __init__(
        self,
        name: str,
        handler: Callable[[CommandContext], None],
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
        configure_handler: Optional[Callable[[argparse.ArgumentParser], None]] = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
        default_log_level: Union[int, Callable[[CommandContext], int]] = logging.INFO,
    ) -> None:
        """Store the handler and the options controlling its invocation.

        :param handler: Called with the ``CommandContext`` to run the command.
        :param configure_handler: Optional callable that registers the
          command's arguments on its parser.
        :param require_substitution: Forwarded to ``CommandContext``.
        :param requested_plugins_only: Forwarded to ``CommandContext``.
        :param log_only_to_stderr: When true, logging is reconfigured to only
          log to stderr before the handler runs.
        :param default_log_level: Either a log level or a callable computing
          one from the ``CommandContext``.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        self._handler = handler
        self._configure_handler = configure_handler
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._log_only_to_stderr = log_only_to_stderr
        self._default_log_level = default_log_level

    def configure_handler(
        self,
        handler: Callable[[argparse.ArgumentParser], None],
    ) -> None:
        """Install the argument configurator.

        :raises TypeError: If a configurator was already provided.
        """
        already_set = self._configure_handler is not None
        if already_set:
            raise TypeError("Only one argument handler can be provided")
        self._configure_handler = handler

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Run the argument configurator (if any) on ``argparser``."""
        configurator = self._configure_handler
        if configurator is None:
            return
        configurator(argparser)

    def __call__(self, command_arg: CommandArg) -> None:
        """Build the context, apply the log level and invoke the handler."""
        context = CommandContext(
            command_arg.parsed_args,
            command_arg.plugin_search_dirs,
            self._require_substitution,
            self._requested_plugins_only,
        )
        if self._log_only_to_stderr:
            setup_logging(reconfigure_logging=True, log_only_to_stderr=True)

        configured_level = self._default_log_level
        if not isinstance(configured_level, int):
            # Not a fixed level, so it has to be the callable variant.
            assert callable(configured_level)
            level = configured_level(context)
        else:
            level = configured_level
        change_log_level(level)

        # Debug mode and DEBPUTY_DEBUG=trace can only make logging more verbose.
        if level > logging.DEBUG and context.parsed_args.debug_mode:
            change_log_level(logging.DEBUG)
        wants_trace = level > TRACE_LOG and os.environ.get("DEBPUTY_DEBUG", "") == "trace"
        if wants_trace:
            change_log_level(TRACE_LOG)
        return self._handler(context)

535 

536 

class DispatchingCommandMixin(CommandBase):
    """Helpers for commands that dispatch to registered subcommands.

    Implementations only need to provide ``add_subcommand``.
    """

    __slots__ = ()

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register ``subcommand``; must be provided by the implementing class."""
        raise NotImplementedError()

    def add_dispatching_subcommand(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
        metavar: str = "command",
        default_subcommand: Optional[str] = None,
    ) -> "DispatcherCommand":
        """Create a nested ``DispatcherCommand``, register it and return it.

        All parameters are forwarded unchanged to ``DispatcherCommand``.
        """
        nested_dispatcher = DispatcherCommand(
            name,
            dest,
            aliases=aliases,
            help_description=help_description,
            metavar=metavar,
            default_subcommand=default_subcommand,
        )
        self.add_subcommand(nested_dispatcher)
        return nested_dispatcher

    def register_subcommand(
        self,
        name: Union[str, Sequence[str]],
        *,
        help_description: Optional[str] = None,
        argparser: Optional[
            Union[ArgparserConfigurator, Sequence[ArgparserConfigurator]]
        ] = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
        default_log_level: Union[int, Callable[[CommandContext], int]] = logging.INFO,
    ) -> Callable[[CommandHandler], GenericSubCommand]:
        """Return a decorator that registers a handler as a ``GenericSubCommand``.

        :param name: The command name, or a sequence whose first element is
          the name and whose remaining elements are aliases.
        :param argparser: A single argument configurator or a sequence of
          them; a sequence is applied in order.
        :return: A decorator that wraps the handler in a ``GenericSubCommand``,
          registers it via ``add_subcommand`` and returns the subcommand.

        The remaining parameters are forwarded to ``GenericSubCommand``.
        """
        if isinstance(name, str):
            cmd_name, aliases = name, []
        else:
            cmd_name, aliases = name[0], name[1:]

        configure_handler = argparser
        if argparser is not None and not callable(argparser):
            configurators = argparser

            def _apply_all(parser: argparse.ArgumentParser) -> None:
                for configurator in configurators:
                    configurator(parser)

            configure_handler = _apply_all

        def _annotation_impl(func: CommandHandler) -> GenericSubCommand:
            subcommand = GenericSubCommand(
                cmd_name,
                func,
                aliases=aliases,
                help_description=help_description,
                require_substitution=require_substitution,
                requested_plugins_only=requested_plugins_only,
                log_only_to_stderr=log_only_to_stderr,
                default_log_level=default_log_level,
            )
            self.add_subcommand(subcommand)
            if configure_handler is not None:
                subcommand.configure_handler(configure_handler)

            return subcommand

        return _annotation_impl

611 

612 

class DispatcherCommand(SubcommandBase, DispatchingCommandMixin):
    """Subcommand that forwards the invocation to one of its own subcommands."""

    __slots__ = (
        "_subcommands",
        "_aliases",
        "_dest",
        "_metavar",
        "_required",
        "_default_subcommand",
        "_argparser",
    )

    def __init__(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = (),
        help_description: Optional[str] = None,
        metavar: str = "command",
        default_subcommand: Optional[str] = None,
    ) -> None:
        """Create a dispatcher without any subcommands.

        :param dest: Attribute of the parsed arguments that holds the name of
          the selected subcommand.
        :param metavar: The metavar used for the sub-parser collection.
        :param default_subcommand: Subcommand (or ``--help``/``-h``) used when
          none was given; ``None`` makes the subcommand mandatory.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        # Every name and alias -> subcommand (used for dispatching).
        self._aliases: Dict[str, SubcommandBase] = {}
        # Primary name -> subcommand (used for building the parsers).
        self._subcommands: Dict[str, SubcommandBase] = {}
        self._dest = dest
        self._metavar = metavar
        self._default_subcommand = default_subcommand
        self._argparser: Optional[argparse.ArgumentParser] = None

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register ``subcommand`` under its name and all of its aliases.

        :raises ValueError: If one of the names is already taken.
        """
        known = self._aliases
        for candidate in [subcommand.name, *(subcommand.aliases or ())]:
            if candidate in known:
                raise ValueError(
                    f"Internal error: Multiple handlers for {candidate} on topic {self.name}"
                )

            known[candidate] = subcommand
        self._subcommands[subcommand.name] = subcommand

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Attach all registered subcommands to ``argparser``.

        :raises TypeError: If the dispatcher was configured before.
        :raises ValueError: If no subcommand was registered, or if the default
          subcommand is neither a help flag nor a registered subcommand.
        """
        if self._argparser is not None:
            raise TypeError("Cannot configure twice!")
        self._argparser = argparser

        registered = self._subcommands
        if not registered:
            raise ValueError(
                f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)"
            )

        fallback = self._default_subcommand
        if fallback is not None:
            is_help_flag = fallback in ("--help", "-h")
            if not is_help_flag and fallback not in registered:
                raise ValueError(
                    f"Internal error: Subcommand {self.name} should have {fallback} as default,"
                    " but it was not registered?"
                )

        # Without a default, argparse itself has to insist on a subcommand.
        subparser = argparser.add_subparsers(
            dest=self._dest,
            required=fallback is None,
            metavar=self._metavar,
        )
        for registered_subcommand in registered.values():
            registered_subcommand.add_subcommand_to_subparser(subparser)

    def has_command(self, command: str) -> bool:
        """Whether ``command`` is a registered name or alias."""
        return command in self._aliases

    def __call__(self, command_arg: CommandArg) -> None:
        """Invoke the subcommand selected in the parsed arguments.

        When none was selected, the default subcommand is used; a default of
        ``--help``/``-h`` is passed to the parser, followed by a
        "Missing command" error.
        """
        argparser = self._argparser
        assert argparser is not None
        v = getattr(command_arg.parsed_args, self._dest, None)
        if v is None:
            v = self._default_subcommand
            if v in ("--help", "-h"):
                argparser.parse_args([v])
                _error("Missing command", prog=argparser.prog)

        assert (
            v is not None
        ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?"
        assert (
            v in self._aliases
        ), f"Internal error: {v} was accepted as a topic, but it was not registered?"
        selected = self._aliases[v]
        selected(command_arg)

704 

705 

# Module-level dispatcher named "root". It reads the selected subcommand from
# the `command` attribute of the parsed arguments (its `dest`).
ROOT_COMMAND = DispatcherCommand(
    "root",
    dest="command",
    metavar="COMMAND",
)