Coverage for src/debputy/commands/debputy_cmd/context.py: 36%

344 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-01-16 17:20 +0000

1import argparse 

2import dataclasses 

3import errno 

4import logging 

5import os 

6from typing import ( 

7 Optional, 

8 Tuple, 

9 FrozenSet, 

10 Set, 

11 Union, 

12 Dict, 

13 TYPE_CHECKING, 

14 Literal, 

15) 

16from collections.abc import Mapping, Sequence, Iterable, Callable 

17 

18from debian.debian_support import DpkgArchTable 

19 

20from debputy._deb_options_profiles import DebBuildOptionsAndProfiles 

21from debputy.architecture_support import DpkgArchitectureBuildProcessValuesTable 

22from debputy.dh.dh_assistant import read_dh_addon_sequences 

23from debputy.exceptions import DebputyRuntimeError 

24from debputy.filesystem_scan import OSFSROOverlay 

25from debputy.highlevel_manifest import HighLevelManifest 

26from debputy.highlevel_manifest_parser import YAMLManifestParser 

27from debputy.integration_detection import determine_debputy_integration_mode 

28from debputy.packages import ( 

29 SourcePackage, 

30 BinaryPackage, 

31 DctrlParser, 

32) 

33from debputy.plugin.api import VirtualPath 

34from debputy.plugin.api.impl import load_plugin_features 

35from debputy.plugin.api.feature_set import PluginProvidedFeatureSet 

36from debputy.plugin.api.spec import DebputyIntegrationMode 

37from debputy.substitution import ( 

38 Substitution, 

39 VariableContext, 

40 SubstitutionImpl, 

41 NULL_SUBSTITUTION, 

42) 

43from debputy.util import ( 

44 _error, 

45 PKGNAME_REGEX, 

46 resolve_source_date_epoch, 

47 setup_logging, 

48 PRINT_COMMAND, 

49 change_log_level, 

50 _warn, 

51 TRACE_LOG, 

52) 

53 

54if TYPE_CHECKING: 

55 from argparse import _SubParsersAction 

56 

57 

# Signature of a subcommand entry point: it receives the per-invocation
# CommandContext and returns nothing.
CommandHandler = Callable[["CommandContext"], None]

# Signature of a hook that adds arguments/options to an argparse parser.
ArgparserConfigurator = Callable[[argparse.ArgumentParser], None]

60 

61 

def add_arg(
    *name_or_flags: str,
    **kwargs,
) -> Callable[[argparse.ArgumentParser], None]:
    """Build a deferred ``ArgumentParser.add_argument`` call.

    :param name_or_flags: Positional name or option flags, forwarded verbatim
      to ``add_argument``.
    :param kwargs: Keyword arguments forwarded verbatim to ``add_argument``.
    :return: A callable that, given a parser, registers the argument on it.
    """

    def _configurator(argparser: argparse.ArgumentParser) -> None:
        # The captured arguments are only applied once a parser is supplied.
        argparser.add_argument(*name_or_flags, **kwargs)

    return _configurator

73 

74 

75@dataclasses.dataclass(slots=True, frozen=True) 

76class CommandArg: 

77 parsed_args: argparse.Namespace 

78 plugin_search_dirs: Sequence[str] 

79 

80 

@dataclasses.dataclass
class Command:
    """A command handler together with the options controlling its context."""

    # Function invoked with the CommandContext.
    handler: Callable[["CommandContext"], None]
    # Whether the command needs a working substitution (default: yes).
    require_substitution: bool = True
    # Whether only the requested plugins should be loaded (default: no).
    requested_plugins_only: bool = False

86 

87 

def _host_dpo_to_dbo(opt_and_profiles: DebBuildOptionsAndProfiles, v: str) -> None:
    """Copy *v* into ``DEB_BUILD_OPTIONS`` when it is only set as a build profile.

    Nothing happens unless *v* is present in
    ``opt_and_profiles.deb_build_profiles`` and absent from
    ``opt_and_profiles.deb_build_options``. In that case a warning is emitted
    and *v* is appended to the ``DEB_BUILD_OPTIONS`` environment variable of
    the current process.

    :param opt_and_profiles: The build options and profiles to inspect.
    :param v: The option/profile name to mirror (for example ``nodoc``).
    """
    if v not in opt_and_profiles.deb_build_profiles:
        return
    if v in opt_and_profiles.deb_build_options:
        return

    current = os.environ.get("DEB_BUILD_OPTIONS", "")
    _warn(
        f'Copying "{v}" into DEB_BUILD_OPTIONS: It was in DEB_BUILD_PROFILES but not in DEB_BUILD_OPTIONS'
    )
    # lstrip() removes the separator when the variable was previously empty.
    os.environ["DEB_BUILD_OPTIONS"] = f"{current} {v}".lstrip()

99 

100 

class CommandContext:
    """Per-invocation state handed to a subcommand handler.

    The context wraps the parsed command line and the plugin search
    directories. The more expensive pieces (the ``debian/control`` parser and
    its parsed data, the substitution, the plugin feature set, the requested
    plugins, the mtime and the integration mode) are resolved on first use and
    cached on the instance.
    """

    def __init__(
        self,
        parsed_args: argparse.Namespace,
        plugin_search_dirs: Sequence[str],
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
    ) -> None:
        """Create a context for one command invocation.

        :param parsed_args: The argparse namespace for this invocation.
        :param plugin_search_dirs: Directories passed on to the plugin loader.
        :param require_substitution: Whether the command needs a real
          substitution (``--no-substitution`` is then rejected).
        :param requested_plugins_only: Whether plugin loading is restricted to
          the plugins requested via ``Build-Depends``.
        """
        self.parsed_args = parsed_args
        self.plugin_search_dirs = plugin_search_dirs
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._debputy_plugin_feature_set: PluginProvidedFeatureSet = (
            PluginProvidedFeatureSet()
        )
        self._debian_dir = OSFSROOverlay.create_root_dir("debian", "debian")
        # Everything below is a lazily filled cache; None means "not resolved yet".
        self._mtime: int | None = None
        self._source_variables: Mapping[str, str] | None = None
        self._substitution: Substitution | None = None
        self._requested_plugins: Sequence[str] | None = None
        self._plugins_loaded = False
        self._dctrl_parser: DctrlParser | None = None
        self.debputy_integration_mode: DebputyIntegrationMode | None = None
        self._dctrl_data: None | (
            tuple[
                "SourcePackage",
                Mapping[str, "BinaryPackage"],
            ]
        ) = None
        self._package_set: Literal["both", "arch", "indep"] = "both"

    @property
    def package_set(self) -> Literal["both", "arch", "indep"]:
        """Which package set ("both", "arch" or "indep") the parser is built for."""
        return self._package_set

    @package_set.setter
    def package_set(self, new_value: Literal["both", "arch", "indep"]) -> None:
        # The value is baked into the DctrlParser when it is created, so a
        # later change would silently have no effect.
        if self._dctrl_parser is not None:
            raise TypeError(
                "package_set cannot be redefined once the debian/control parser has been initialized"
            )
        self._package_set = new_value

    @property
    def debian_dir(self) -> VirtualPath:
        """The read-only overlay rooted at the ``debian`` directory."""
        return self._debian_dir

    @property
    def mtime(self) -> int:
        """The value of ``resolve_source_date_epoch``, resolved once and cached."""
        if self._mtime is None:
            self._mtime = resolve_source_date_epoch(
                None,
                substitution=self.substitution,
            )
        return self._mtime

    @property
    def dctrl_parser(self) -> DctrlParser:
        """The ``debian/control`` parser, created on first access.

        Creating the parser also mirrors the ``nodoc`` and ``nocheck`` build
        profiles into ``DEB_BUILD_OPTIONS`` (see ``_host_dpo_to_dbo``).
        """
        parser = self._dctrl_parser
        if parser is None:
            packages: set[str] | frozenset[str] = frozenset()
            if hasattr(self.parsed_args, "packages"):
                packages = self.parsed_args.packages

            instance = DebBuildOptionsAndProfiles()

            _host_dpo_to_dbo(instance, "nodoc")
            _host_dpo_to_dbo(instance, "nocheck")

            parser = DctrlParser(
                packages,  # -p/--package
                set(),  # -N/--no-package
                # binary-indep and binary-arch (dpkg BuildDriver integration only)
                self._package_set == "indep",
                self._package_set == "arch",
                deb_options_and_profiles=instance,
                dpkg_architecture_variables=DpkgArchitectureBuildProcessValuesTable(),
                dpkg_arch_query_table=DpkgArchTable.load_arch_table(),
            )
            self._dctrl_parser = parser
        return parser

    def source_package(self) -> SourcePackage:
        """Return the source package parsed from ``debian/control``."""
        source, _ = self._parse_dctrl()
        return source

    def binary_packages(self) -> Mapping[str, "BinaryPackage"]:
        """Return the binary package table parsed from ``debian/control``."""
        _, binary_package_table = self._parse_dctrl()
        return binary_package_table

    def dpkg_architecture_variables(self) -> DpkgArchitectureBuildProcessValuesTable:
        """Return the architecture variables table held by the control parser."""
        return self.dctrl_parser.dpkg_architecture_variables

    def requested_plugins(self) -> Sequence[str]:
        """Return the plugins requested via ``Build-Depends`` (cached)."""
        if self._requested_plugins is None:
            self._requested_plugins = self._resolve_requested_plugins()
        return self._requested_plugins

    def required_plugins(self) -> set[str]:
        """Return the ``required_plugins`` from the parsed arguments as a set.

        A missing or empty ``required_plugins`` attribute yields an empty set.
        """
        return set(getattr(self.parsed_args, "required_plugins", None) or [])

    @property
    def deb_build_options_and_profiles(self) -> DebBuildOptionsAndProfiles:
        """The build options and profiles instance held by the control parser."""
        return self.dctrl_parser.deb_options_and_profiles

    @property
    def deb_build_options(self) -> Mapping[str, str | None]:
        """The ``deb_build_options`` of the build options and profiles instance."""
        return self.deb_build_options_and_profiles.deb_build_options

    def _create_substitution(
        self,
        parsed_args: argparse.Namespace,
        plugin_feature_set: PluginProvidedFeatureSet,
        debian_dir: VirtualPath,
    ) -> Substitution:
        """Create the substitution for this command.

        ``parsed_args.substitution`` (when present) overrides the command's
        default. Explicitly disabling substitution for a command that
        requires it is reported via ``_error``.

        :return: A ``SubstitutionImpl`` when substitution is required or was
          not disabled; otherwise ``NULL_SUBSTITUTION``.
        """
        requested_subst = self._require_substitution
        if hasattr(parsed_args, "substitution"):
            requested_subst = parsed_args.substitution
        if requested_subst is False and self._require_substitution:
            _error(f"--no-substitution cannot be used with {parsed_args.command}")
        if self._require_substitution or requested_subst is not False:
            variable_context = VariableContext(debian_dir)
            return SubstitutionImpl(
                plugin_feature_set=plugin_feature_set,
                unresolvable_substitutions=frozenset(["PACKAGE"]),
                variable_context=variable_context,
            )
        return NULL_SUBSTITUTION

    def load_plugins(self) -> PluginProvidedFeatureSet:
        """Load the plugin features (once) and return the feature set."""
        if not self._plugins_loaded:
            requested_plugins = None
            required_plugins = self.required_plugins()
            if self._requested_plugins_only:
                requested_plugins = self.requested_plugins()
            debug_mode = getattr(self.parsed_args, "debug_mode", False)
            load_plugin_features(
                self.plugin_search_dirs,
                self.substitution,
                requested_plugins_only=requested_plugins,
                required_plugins=required_plugins,
                plugin_feature_set=self._debputy_plugin_feature_set,
                debug_mode=debug_mode,
            )
            self._plugins_loaded = True
        return self._debputy_plugin_feature_set

    @staticmethod
    def _plugin_from_dependency_field(dep_field: str) -> Iterable[str]:
        """Yield plugin names for ``debputy-plugin-*`` entries in a dependency field.

        Only the first alternative of each comma-separated clause is considered.
        """
        package_prefix = "debputy-plugin-"
        for dep_clause in (d.strip() for d in dep_field.split(",")):
            dep = dep_clause.split("|")[0].strip()
            if not dep.startswith(package_prefix):
                continue
            m = PKGNAME_REGEX.search(dep)
            assert m
            package_name = m.group(0)
            plugin_name = package_name[len(package_prefix) :]
            yield plugin_name

    def _resolve_requested_plugins(self) -> Sequence[str]:
        """Collect the plugins named in ``Build-Depends``.

        :raises DebputyRuntimeError: If a plugin dependency appears in
          ``Build-Depends-Arch`` or ``Build-Depends-Indep``.
        """
        source_package, _ = self._parse_dctrl()
        bd = source_package.fields.get("Build-Depends", "")
        plugins = list(self._plugin_from_dependency_field(bd))
        for field_name in ("Build-Depends-Arch", "Build-Depends-Indep"):
            f = source_package.fields.get(field_name)
            if not f:
                continue
            # Any plugin found in these fields is an error; raise on the first.
            for plugin in self._plugin_from_dependency_field(f):
                raise DebputyRuntimeError(
                    f"Cannot load plugins via {field_name}:"
                    f" Please move debputy-plugin-{plugin} dependency to Build-Depends."
                )

        return plugins

    @property
    def substitution(self) -> Substitution:
        """The substitution for this command, created on first access."""
        if self._substitution is None:
            self._substitution = self._create_substitution(
                self.parsed_args,
                self._debputy_plugin_feature_set,
                self.debian_dir,
            )
        return self._substitution

    def must_be_called_in_source_root(self) -> None:
        """Report an error via ``_error`` unless ``debian/control`` exists."""
        if self.debian_dir.get("control") is None:
            _error(
                "This subcommand must be run from a source package root; expecting debian/control to exist."
            )

    def _parse_dctrl(
        self,
    ) -> tuple[
        "SourcePackage",
        Mapping[str, "BinaryPackage"],
    ]:
        """Parse ``debian/control`` (once) and return ``(source, binaries)``."""
        if self._dctrl_data is None:
            try:
                debian_control = self.debian_dir.get("control")
                if debian_control is None:
                    raise FileNotFoundError(
                        errno.ENOENT,
                        os.strerror(errno.ENOENT),
                        os.path.join(self.debian_dir.fs_path, "control"),
                    )
                with debian_control.open() as fd:
                    _, source_package, binary_packages = (
                        self.dctrl_parser.parse_source_debian_control(
                            fd,
                        )
                    )
            except FileNotFoundError:
                # We are not using `must_be_called_in_source_root`, because we (in this case) require
                # the file to be readable (that is, parse_source_debian_control can also raise a
                # FileNotFoundError when trying to open the file).
                _error(
                    "This subcommand must be run from a source package root; expecting debian/control to exist."
                )

            self._dctrl_data = (
                source_package,
                binary_packages,
            )

        return self._dctrl_data

    @property
    def has_dctrl_file(self) -> bool:
        """Whether ``debian/control`` is present in the debian directory."""
        debian_control = self.debian_dir.get("control")
        return debian_control is not None

    def resolve_integration_mode(
        self,
        require_integration: bool = True,
    ) -> DebputyIntegrationMode:
        """Return the integration mode, resolving and caching it on first use.

        When ``debputy_integration_mode`` is unset, the mode is derived from
        the dh add-on sequences and the source package fields.

        :param require_integration: When true (the default), an unresolvable
          mode is reported via ``_error``. When false, the unresolved value
          (``None``) is returned to the caller instead.
        :return: The resolved integration mode.
        """
        integration_mode = self.debputy_integration_mode
        if integration_mode is None:
            r = read_dh_addon_sequences(self.debian_dir)
            bd_sequences, dr_sequences, _ = r
            all_sequences = bd_sequences | dr_sequences
            integration_mode = determine_debputy_integration_mode(
                self.source_package().fields,
                all_sequences,
            )
            # Only complain when the caller actually needs an integration mode.
            if integration_mode is None and require_integration:
                _error(
                    "Cannot resolve the integration mode expected for this package. Is this package using `debputy`?"
                )
            self.debputy_integration_mode = integration_mode
        return integration_mode

    def set_log_level_for_build_subcommand(self) -> int | None:
        """Adjust the log level from ``DH_VERBOSE`` and the debug flag.

        A non-empty ``DH_VERBOSE`` selects ``PRINT_COMMAND``; the debug flag
        takes precedence and selects ``logging.DEBUG``.

        :return: The level that was applied, or ``None`` if nothing changed.
        """
        parsed_args = self.parsed_args
        log_level: int | None = None
        if os.environ.get("DH_VERBOSE", "") != "":
            log_level = PRINT_COMMAND
        if parsed_args.debug_mode:
            log_level = logging.DEBUG
        if log_level is not None:
            change_log_level(log_level)
        return log_level

    def manifest_parser(
        self,
        *,
        manifest_path: str | None = None,
    ) -> YAMLManifestParser:
        """Create a manifest parser for this package.

        :param manifest_path: Path of the manifest. The ``debputy_manifest``
          command line value takes precedence when set; when neither is
          given, ``debputy.manifest`` inside the debian directory is used.
        """
        substitution = self.substitution
        dctrl_parser = self.dctrl_parser

        source_package, binary_packages = self._parse_dctrl()

        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        return YAMLManifestParser(
            manifest_path,
            source_package,
            binary_packages,
            substitution,
            dctrl_parser.dpkg_architecture_variables,
            dctrl_parser.dpkg_arch_query_table,
            dctrl_parser.deb_options_and_profiles,
            self.load_plugins(),
            self.resolve_integration_mode(),
            debian_dir=self.debian_dir,
        )

    def parse_manifest(
        self,
        *,
        manifest_path: str | None = None,
    ) -> HighLevelManifest:
        """Parse the manifest, or build one when no manifest file exists.

        Also exports ``SOURCE_DATE_EPOCH`` into the process environment.

        :param manifest_path: Path of the manifest. The ``debputy_manifest``
          command line value takes precedence when set, and in that case the
          file must exist; a missing file is reported via ``_error``.
        """
        substitution = self.substitution
        manifest_required = False

        if self.parsed_args.debputy_manifest is not None:
            manifest_path = self.parsed_args.debputy_manifest
            manifest_required = True
        if manifest_path is None:
            manifest_path = os.path.join(self.debian_dir.fs_path, "debputy.manifest")
        parser = self.manifest_parser(manifest_path=manifest_path)

        os.environ["SOURCE_DATE_EPOCH"] = substitution.substitute(
            "{{SOURCE_DATE_EPOCH}}",
            "Internal resolution",
        )
        if os.path.isfile(manifest_path):
            return parser.parse_manifest()
        if manifest_required:
            _error(f'The path "{manifest_path}" is not a file!')
        return parser.build_manifest()

416 

417 

class CommandBase:
    """Common interface of everything that can be configured and invoked."""

    __slots__ = ()

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Add this command's arguments to *argparser*; a no-op by default."""
        return None

    def __call__(self, command_arg: CommandArg) -> None:
        """Run the command; subclasses must override this."""
        raise NotImplementedError

427 

428 

class SubcommandBase(CommandBase):
    """A named command that can be attached to an argparse sub-parser group."""

    __slots__ = ("name", "aliases", "help_description")

    def __init__(
        self,
        name: str,
        *,
        aliases: Sequence[str] = tuple(),
        help_description: str | None = None,
    ) -> None:
        """Record the name, the aliases and the help text of the subcommand."""
        self.name = name
        self.aliases = aliases
        self.help_description = help_description

    def add_subcommand_to_subparser(
        self,
        subparser: "_SubParsersAction",
    ) -> argparse.ArgumentParser:
        """Register this subcommand on *subparser* and configure its parser.

        :param subparser: The object returned by ``add_subparsers``.
        :return: The parser created for this subcommand, after ``configure``
          has been applied to it.
        """
        subcommand_parser = subparser.add_parser(
            self.name,
            aliases=self.aliases,
            help=self.help_description,
            allow_abbrev=False,
        )
        self.configure(subcommand_parser)
        return subcommand_parser

455 

456 

class GenericSubCommand(SubcommandBase):
    """Subcommand that runs a handler function with a fresh ``CommandContext``."""

    __slots__ = (
        "_handler",
        "_configure_handler",
        "_require_substitution",
        "_requested_plugins_only",
        "_log_only_to_stderr",
        "_default_log_level",
    )

    def __init__(
        self,
        name: str,
        handler: Callable[[CommandContext], None],
        *,
        aliases: Sequence[str] = tuple(),
        help_description: str | None = None,
        configure_handler: Callable[[argparse.ArgumentParser], None] | None = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
        default_log_level: int | Callable[[CommandContext], int] = logging.INFO,
    ) -> None:
        """Store the handler and the options used to build its context.

        :param name: Name of the subcommand.
        :param handler: Function invoked with the ``CommandContext``.
        :param aliases: Alternative names of the subcommand.
        :param help_description: Help text for the subcommand.
        :param configure_handler: Optional hook that adds arguments to the
          subcommand's parser.
        :param require_substitution: Passed on to ``CommandContext``.
        :param requested_plugins_only: Passed on to ``CommandContext``.
        :param log_only_to_stderr: Whether logging is reconfigured to only
          use stderr before the handler runs.
        :param default_log_level: A log level, or a callable computing one
          from the context.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        self._handler = handler
        self._configure_handler = configure_handler
        self._require_substitution = require_substitution
        self._requested_plugins_only = requested_plugins_only
        self._log_only_to_stderr = log_only_to_stderr
        self._default_log_level = default_log_level

    def configure_handler(
        self,
        handler: Callable[[argparse.ArgumentParser], None],
    ) -> None:
        """Install the argument configuration hook.

        :raises TypeError: If a hook has already been installed.
        """
        if self._configure_handler is None:
            self._configure_handler = handler
            return
        raise TypeError("Only one argument handler can be provided")

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Apply the configuration hook to *argparser*, if there is one."""
        configurator = self._configure_handler
        if configurator is None:
            return
        configurator(argparser)

    def __call__(self, command_arg: CommandArg) -> None:
        """Build the context, set up the log level and run the handler."""
        context = CommandContext(
            command_arg.parsed_args,
            command_arg.plugin_search_dirs,
            self._require_substitution,
            self._requested_plugins_only,
        )
        if self._log_only_to_stderr:
            setup_logging(reconfigure_logging=True, log_only_to_stderr=True)

        level_spec = self._default_log_level
        if isinstance(level_spec, int):
            level = level_spec
        else:
            assert callable(level_spec)
            level = level_spec(context)
        change_log_level(level)

        # The debug flag and DEBPUTY_DEBUG=trace can only make logging more
        # verbose than the command's own default, never less.
        debug_requested = context.parsed_args.debug_mode
        if level > logging.DEBUG and debug_requested:
            change_log_level(logging.DEBUG)
        trace_requested = os.environ.get("DEBPUTY_DEBUG", "") == "trace"
        if level > TRACE_LOG and trace_requested:
            change_log_level(TRACE_LOG)
        return self._handler(context)

523 

524 

class DispatchingCommandMixin(CommandBase):
    """Helpers for commands that own a set of subcommands."""

    __slots__ = ()

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register *subcommand*; must be provided by the concrete class."""
        raise NotImplementedError

    def add_dispatching_subcommand(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = tuple(),
        help_description: str | None = None,
        metavar: str = "command",
        default_subcommand: str | None = None,
    ) -> "DispatcherCommand":
        """Create a nested ``DispatcherCommand``, register it and return it.

        All parameters are passed on to ``DispatcherCommand``.
        """
        dispatcher = DispatcherCommand(
            name,
            dest,
            aliases=aliases,
            help_description=help_description,
            metavar=metavar,
            default_subcommand=default_subcommand,
        )
        self.add_subcommand(dispatcher)
        return dispatcher

    def register_subcommand(
        self,
        name: str | Sequence[str],
        *,
        help_description: str | None = None,
        argparser: None | (
            ArgparserConfigurator | Sequence[ArgparserConfigurator]
        ) = None,
        require_substitution: bool = True,
        requested_plugins_only: bool = False,
        log_only_to_stderr: bool = False,
        default_log_level: int | Callable[[CommandContext], int] = logging.INFO,
    ) -> Callable[[CommandHandler], GenericSubCommand]:
        """Return a decorator that registers a handler as a subcommand.

        :param name: The command name, or a sequence whose first item is the
          name and whose remaining items are aliases.
        :param help_description: Help text for the subcommand.
        :param argparser: A single parser configurator or a sequence of them;
          a sequence is applied in order.
        :param require_substitution: Passed on to ``GenericSubCommand``.
        :param requested_plugins_only: Passed on to ``GenericSubCommand``.
        :param log_only_to_stderr: Passed on to ``GenericSubCommand``.
        :param default_log_level: Passed on to ``GenericSubCommand``.
        :return: A decorator that wraps the handler in a ``GenericSubCommand``,
          registers it and returns the subcommand object.
        """
        if isinstance(name, str):
            cmd_name, aliases = name, []
        else:
            cmd_name, aliases = name[0], name[1:]

        parser_hook = argparser
        if argparser is not None and not callable(argparser):
            configurators = argparser

            def _wrapper(parser: argparse.ArgumentParser) -> None:
                # Fold the sequence of configurators into a single hook.
                for configurator in configurators:
                    configurator(parser)

            parser_hook = _wrapper

        def _annotation_impl(func: CommandHandler) -> GenericSubCommand:
            subcommand = GenericSubCommand(
                cmd_name,
                func,
                aliases=aliases,
                help_description=help_description,
                require_substitution=require_substitution,
                requested_plugins_only=requested_plugins_only,
                log_only_to_stderr=log_only_to_stderr,
                default_log_level=default_log_level,
            )
            self.add_subcommand(subcommand)
            if parser_hook is not None:
                subcommand.configure_handler(parser_hook)

            return subcommand

        return _annotation_impl

599 

600 

class DispatcherCommand(SubcommandBase, DispatchingCommandMixin):
    """Subcommand that dispatches to one of its registered subcommands."""

    __slots__ = (
        "_subcommands",
        "_aliases",
        "_dest",
        "_metavar",
        "_required",
        "_default_subcommand",
        "_argparser",
    )

    def __init__(
        self,
        name: str,
        dest: str,
        *,
        aliases: Sequence[str] = tuple(),
        help_description: str | None = None,
        metavar: str = "command",
        default_subcommand: str | None = None,
    ) -> None:
        """Create an empty dispatcher.

        :param name: Name of this command.
        :param dest: Attribute of the parsed arguments that holds the
          selected subcommand.
        :param aliases: Alternative names of this command.
        :param help_description: Help text for this command.
        :param metavar: Metavar used for the sub-parser group.
        :param default_subcommand: Subcommand (or ``--help``/``-h``) used
          when none was given on the command line.
        """
        super().__init__(name, aliases=aliases, help_description=help_description)
        # Every name and alias -> subcommand; used for lookup and dispatch.
        self._aliases: dict[str, SubcommandBase] = {}
        # Primary name -> subcommand; used when building the argparse parsers.
        self._subcommands: dict[str, SubcommandBase] = {}
        self._dest = dest
        self._metavar = metavar
        self._default_subcommand = default_subcommand
        self._argparser: argparse.ArgumentParser | None = None

    def add_subcommand(self, subcommand: SubcommandBase) -> None:
        """Register *subcommand* under its name and all of its aliases.

        :raises ValueError: If one of the names is already registered.
        """
        registry = self._aliases
        for candidate in [subcommand.name, *(subcommand.aliases or ())]:
            if candidate in registry:
                raise ValueError(
                    f"Internal error: Multiple handlers for {candidate} on topic {self.name}"
                )
            registry[candidate] = subcommand
        self._subcommands[subcommand.name] = subcommand

    def configure(self, argparser: argparse.ArgumentParser) -> None:
        """Attach the registered subcommands to *argparser*.

        :raises TypeError: If the dispatcher has already been configured.
        :raises ValueError: If no subcommands are registered, or if the
          default subcommand is neither a help flag nor a registered name.
        """
        if self._argparser is not None:
            raise TypeError("Cannot configure twice!")
        self._argparser = argparser

        known = self._subcommands
        if not known:
            raise ValueError(
                f"Internal error: No subcommands for subcommand {self.name} (then why do we have it?)"
            )

        default_subcommand = self._default_subcommand
        if default_subcommand is not None:
            is_help_flag = default_subcommand in ("--help", "-h")
            if not is_help_flag and default_subcommand not in known:
                raise ValueError(
                    f"Internal error: Subcommand {self.name} should have {default_subcommand} as default,"
                    " but it was not registered?"
                )

        # Without a default, argparse itself must insist on a subcommand.
        subparser = argparser.add_subparsers(
            dest=self._dest,
            required=default_subcommand is None,
            metavar=self._metavar,
        )
        for subcommand in known.values():
            subcommand.add_subcommand_to_subparser(subparser)

    def has_command(self, command: str) -> bool:
        """Whether *command* is a registered name or alias."""
        return command in self._aliases

    def __call__(self, command_arg: CommandArg) -> None:
        """Invoke the subcommand selected in the parsed arguments.

        Falls back to the default subcommand when none was selected. A
        ``--help``/``-h`` default is handed to the parser and followed by a
        "Missing command" error.
        """
        argparser = self._argparser
        assert argparser is not None
        selected = getattr(command_arg.parsed_args, self._dest, None)
        if selected is None:
            selected = self._default_subcommand
            if selected in ("--help", "-h"):
                argparser.parse_args([selected])
                _error("Missing command", prog=argparser.prog)

        assert (
            selected is not None
        ), f"Internal error: No default subcommand and argparse did not provide the required subcommand {self._dest}?"
        assert (
            selected in self._aliases
        ), f"Internal error: {selected} was accepted as a topic, but it was not registered?"
        target = self._aliases[selected]
        target(command_arg)

692 

693 

# Top-level dispatcher: the selected subcommand is read from the `command`
# attribute of the parsed arguments.
ROOT_COMMAND = DispatcherCommand("root", dest="command", metavar="COMMAND")