Coverage for src/debputy/dh/dh_assistant.py: 79%
98 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import dataclasses
2import json
3import re
4import subprocess
5from typing import FrozenSet, Optional, List, Union, Any, Set, Tuple
6from collections.abc import Iterable, Mapping
8from debian.deb822 import Deb822
9from debputy.exceptions import PureVirtualPathError
11from debputy.plugin.api import VirtualPath
# Captures the argument of every ``--with`` option on a line; both the
# ``--with X`` (whitespace) and ``--with=X`` spellings are accepted.  The
# captured value may itself be a comma-separated list.
_FIND_DH_WITH = re.compile(r"--with(?:\s+|=)(\S+)")
# Captures the name at the start of a dependency clause: one character from
# ``[a-z0-9]`` followed by one or more characters from ``[-+.a-z0-9]``.
_DEP_REGEX = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII)
@dataclasses.dataclass(frozen=True, slots=True)
class DhListCommands:
    """Immutable pair of command-name sets reported by ``dh_assistant list-commands``."""

    # Names read from the "commands" entry of the JSON output.
    active_commands: frozenset[str]
    # Names read from the "disabled-commands" entry of the JSON output.
    disabled_commands: frozenset[str]
@dataclasses.dataclass(frozen=True, slots=True)
class DhSequencerData:
    """Immutable record describing a package's use of the ``dh`` sequencer.

    NOTE(review): nothing in the visible part of this file constructs this
    class, so the field descriptions below are inferred from the names only;
    confirm against the callers.
    """

    # Presumably the names of the dh addon sequences in use -- TODO confirm.
    sequences: frozenset[str]
    # Presumably whether the package's rules file invokes ``dh`` -- TODO confirm.
    uses_dh_sequencer: bool
29def _parse_dh_cmd_list(
30 cmd_list: list[Mapping[str, Any] | object] | None,
31) -> Iterable[str]:
32 if not isinstance(cmd_list, list):
33 return
35 for command in cmd_list:
36 if not isinstance(command, dict): 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true
37 continue
38 command_name = command.get("command")
39 if isinstance(command_name, str): 39 ↛ 35line 39 didn't jump to line 35 because the condition on line 39 was always true
40 yield command_name
def resolve_active_and_inactive_dh_commands(
    dh_rules_addons: Iterable[str],
    *,
    source_root: str | None = None,
) -> DhListCommands:
    """Ask ``dh_assistant list-commands`` which dh commands are active or disabled.

    :param dh_rules_addons: Addon names passed to ``dh_assistant`` as one
      comma-separated ``--with=`` option. The option is omitted when there are
      no names to pass.
    :param source_root: Working directory for the ``dh_assistant`` subprocess;
      ``None`` runs it in the current working directory.
    :return: The active and disabled command names. Both sets are empty when
      ``dh_assistant`` cannot be run, exits unsuccessfully, or prints output
      that is not a JSON object.
    """
    nothing_known = DhListCommands(
        frozenset(),
        frozenset(),
    )

    # Join first and test the result: a lazy iterable (such as a generator) is
    # always truthy, so testing the iterable itself would emit a bare
    # "--with=" when it turns out to be empty.
    addons = ",".join(dh_rules_addons) if dh_rules_addons is not None else ""

    cmd = ["dh_assistant", "list-commands", "--output-format=json"]
    if addons:
        cmd.append(f"--with={addons}")

    try:
        output = subprocess.check_output(
            cmd,
            stderr=subprocess.DEVNULL,
            cwd=source_root,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return nothing_known

    # Unparsable output gets the same best-effort fallback as a failed run.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        result = json.loads(output)
    except ValueError:
        return nothing_known
    if not isinstance(result, dict):
        return nothing_known

    return DhListCommands(
        frozenset(_parse_dh_cmd_list(result.get("commands"))),
        frozenset(_parse_dh_cmd_list(result.get("disabled-commands"))),
    )
def parse_drules_for_addons(lines: Iterable[str], sequences: set[str]) -> bool:
    """Collect ``--with`` addons from the ``dh`` invocations found in *lines*.

    Only lines that begin with a TAB followed by ``dh `` are inspected. Every
    ``--with`` argument on such a line is split on commas and the pieces are
    added to *sequences*, which is modified in place.

    :return: ``True`` if at least one ``dh`` invocation line was seen.
    """
    found_dh_call = False
    for rules_line in lines:
        if rules_line.startswith("\tdh "):
            found_dh_call = True
            for with_option in _FIND_DH_WITH.finditer(rules_line):
                addon_names = with_option.group(1).split(",")
                sequences.update(addon_names)
    return found_dh_call
def extract_dh_addons_from_control(
    source_paragraph: Mapping[str, str] | Deb822,
    sequences: set[str],
) -> None:
    """Add addons implied by ``dh-sequence-*`` build-dependencies to *sequences*.

    The ``Build-Depends``, ``Build-Depends-Indep`` and ``Build-Depends-Arch``
    fields of *source_paragraph* are split on commas. For each clause, the
    name matched at its start is checked; when it begins with
    ``dh-sequence-``, the remainder of the name is added to *sequences*
    (modified in place).
    """
    addon_prefix = "dh-sequence-"
    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        raw_value = source_paragraph.get(field_name)
        if not raw_value:
            continue

        for clause in raw_value.split(","):
            name_match = _DEP_REGEX.match(clause.strip())
            if name_match is None:
                continue
            package = name_match.group(1)
            if package.startswith(addon_prefix):
                sequences.add(package[len(addon_prefix):])
def read_dh_addon_sequences(
    debian_dir: VirtualPath,
) -> tuple[set[str], set[str], bool] | None:
    """Gather the dh addon sequences declared in a ``debian/`` directory.

    :param debian_dir: Directory that is looked up for ``control`` and
      ``rules`` entries.
    :return: ``None`` when there is no ``control`` entry. Otherwise a tuple of
      (addons from the build-dependency fields of the first control paragraph,
      addons from ``--with`` options in ``rules``, whether ``rules`` contained
      a ``dh`` invocation line).
    """
    control_path = debian_dir.get("control")
    if not control_path:
        return None

    from_rules: set[str] = set()
    from_build_deps: set[str] = set()
    uses_dh = False

    rules_path = debian_dir.get("rules")
    if rules_path and rules_path.is_file:
        try:
            with rules_path.open() as rules_fd:
                uses_dh = parse_drules_for_addons(rules_fd, from_rules)
        except PureVirtualPathError:
            # Best effort: the rules file is treated as absent here.
            # NOTE(review): presumably raised when the path has no backing
            # file that can be opened -- confirm against debputy.exceptions.
            pass

    with control_path.open() as control_fd:
        paragraphs = list(Deb822.iter_paragraphs(control_fd))

    # An empty control file is handled as an empty source paragraph.
    first_paragraph = paragraphs[0] if paragraphs else {}
    extract_dh_addons_from_control(first_paragraph, from_build_deps)
    return from_build_deps, from_rules, uses_dh
131def extract_dh_compat_level(*, cwd=None) -> tuple[int | None, int]:
132 try:
133 output = subprocess.check_output(
134 ["dh_assistant", "active-compat-level"],
135 stderr=subprocess.DEVNULL,
136 cwd=cwd,
137 )
138 except (FileNotFoundError, subprocess.CalledProcessError) as e:
139 exit_code = 127
140 if isinstance(e, subprocess.CalledProcessError):
141 exit_code = e.returncode
142 return None, exit_code
143 else:
144 data = json.loads(output)
145 active_compat_level = data.get("active-compat-level")
146 exit_code = 0
147 if not isinstance(active_compat_level, int) or active_compat_level < 1: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true
148 active_compat_level = None
149 exit_code = 255
150 return active_compat_level, exit_code