Coverage for src/debputy/dh/dh_assistant.py: 79%
97 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import dataclasses
2import json
3import re
4import subprocess
5from typing import Iterable, FrozenSet, Optional, List, Union, Mapping, Any, Set, Tuple
7from debian.deb822 import Deb822
8from debputy.exceptions import PureVirtualPathError
10from debputy.plugin.api import VirtualPath
# Captures the add-on list handed to ``dh`` via ``--with``, in either the
# ``--with foo,bar`` or the ``--with=foo,bar`` spelling. The captured group is
# the raw, still comma-separated, value.
_FIND_DH_WITH = re.compile(r"--with(?:\s+|=)(\S+)")
# Captures the leading package name of a dependency clause: ASCII lower-case
# letters, digits and ``-+.``, at least two characters, anchored at the start.
_DEP_REGEX = re.compile(r"^([a-z0-9][-+.a-z0-9]+)", re.ASCII)
16@dataclasses.dataclass(frozen=True, slots=True)
17class DhListCommands:
18 active_commands: FrozenSet[str]
19 disabled_commands: FrozenSet[str]
22@dataclasses.dataclass(frozen=True, slots=True)
23class DhSequencerData:
24 sequences: FrozenSet[str]
25 uses_dh_sequencer: bool
def _parse_dh_cmd_list(
    cmd_list: Optional[List[Union[Mapping[str, Any], object]]],
) -> Iterable[str]:
    """Yield the command names found in a decoded command list.

    The input is a value taken straight from decoded JSON, so nothing about
    its shape is trusted.

    :param cmd_list: The decoded value. Anything that is not a ``list``
      (including ``None``) produces no names.
    :returns: The ``"command"`` value of every ``dict`` entry whose
      ``"command"`` is a string, in input order. Entries of any other shape
      are skipped silently.
    """
    if not isinstance(cmd_list, list):
        return

    for entry in cmd_list:
        if not isinstance(entry, dict):
            continue
        name = entry.get("command")
        if isinstance(name, str):
            yield name
def resolve_active_and_inactive_dh_commands(
    dh_rules_addons: Iterable[str],
    *,
    source_root: Optional[str] = None,
) -> DhListCommands:
    """Ask ``dh_assistant list-commands`` which commands are active/disabled.

    This is a best-effort query: every failure mode produces an empty result
    rather than an exception.

    :param dh_rules_addons: Add-on names passed to the tool as a single
      comma-separated ``--with=`` option. The option is omitted when there
      are none.
    :param source_root: Directory to run the tool in (``None`` keeps the
      current working directory).
    :returns: The command names from the ``"commands"`` and
      ``"disabled-commands"`` entries of the tool's JSON output. Both sets are
      empty when the tool is missing, exits non-zero, or prints something
      that is not a JSON object.
    """
    cmd = ["dh_assistant", "list-commands", "--output-format=json"]
    # Join first and test the joined text: a generator is always truthy, so
    # testing the iterable itself would append a bare "--with=" for an empty
    # one. ``or ()`` keeps a falsy argument (e.g. ``None``) working as before.
    addons = ",".join(dh_rules_addons or ())
    if addons:
        cmd.append(f"--with={addons}")

    no_data = DhListCommands(frozenset(), frozenset())
    try:
        output = subprocess.check_output(
            cmd,
            stderr=subprocess.DEVNULL,
            cwd=source_root,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return no_data

    try:
        result = json.loads(output)
    except ValueError:
        # Covers both undecodable bytes and malformed JSON; treated like the
        # other "tool gave us nothing usable" cases above.
        return no_data
    if not isinstance(result, dict):
        return no_data

    return DhListCommands(
        frozenset(_parse_dh_cmd_list(result.get("commands"))),
        frozenset(_parse_dh_cmd_list(result.get("disabled-commands"))),
    )
def parse_drules_for_addons(lines: Iterable[str], sequences: Set[str]) -> bool:
    """Collect the ``--with`` add-ons from the ``dh`` lines of a rules file.

    Only lines that start with a tab followed by ``dh `` are examined; all
    other lines are ignored.

    :param lines: The lines of the rules file.
    :param sequences: Set updated in place with every comma-separated name
      found after a ``--with`` option on a matching line.
    :returns: ``True`` if at least one matching ``dh`` line was seen.
    """
    saw_dh = False
    for line in lines:
        if not line.startswith("\tdh "):
            continue
        saw_dh = True
        for match in _FIND_DH_WITH.finditer(line):
            sequences.update(match.group(1).split(","))
    return saw_dh
def extract_dh_addons_from_control(
    source_paragraph: Union[Mapping[str, str], Deb822],
    sequences: Set[str],
) -> None:
    """Collect add-ons requested via ``dh-sequence-*`` build-dependencies.

    The ``Build-Depends``, ``Build-Depends-Indep`` and ``Build-Depends-Arch``
    fields are split on commas. The package name at the start of each clause
    is checked, so of a ``a | b`` alternative only ``a`` is considered.

    :param source_paragraph: The source paragraph of the control file.
    :param sequences: Set updated in place with the part of each matching
      package name that follows the ``dh-sequence-`` prefix.
    """
    prefix = "dh-sequence-"
    for field_name in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"):
        field = source_paragraph.get(field_name)
        if not field:
            continue

        for dep_clause in field.split(","):
            match = _DEP_REGEX.match(dep_clause.strip())
            if match is None:
                continue
            dep = match.group(1)
            if dep.startswith(prefix):
                sequences.add(dep[len(prefix) :])
def read_dh_addon_sequences(
    debian_dir: VirtualPath,
) -> Optional[Tuple[Set[str], Set[str], bool]]:
    """Gather the dh add-on sequences declared by a package.

    :param debian_dir: Directory in which the ``control`` and ``rules``
      entries are looked up.
    :returns: ``None`` when there is no ``control`` entry. Otherwise a tuple
      ``(bd_sequences, dr_sequences, saw_dh)``: the add-ons found in the
      build-dependency fields of the first control paragraph, the add-ons
      found on ``dh`` lines of the rules file, and whether the rules file
      contained a ``dh`` line at all.
    """
    ctrl_file = debian_dir.get("control")
    if not ctrl_file:
        return None

    dr_sequences: Set[str] = set()
    bd_sequences: Set[str] = set()

    saw_dh = False
    drules = debian_dir.get("rules")
    if drules and drules.is_file:
        try:
            with drules.open() as fd:
                saw_dh = parse_drules_for_addons(fd, dr_sequences)
        except PureVirtualPathError:
            # NOTE(review): presumably raised when the path has no readable
            # content behind it; the rules file is then treated as
            # contributing nothing - confirm against VirtualPath.open().
            pass

    with ctrl_file.open() as fd:
        ctrl = list(Deb822.iter_paragraphs(fd))
    # An empty control file is handled like a source paragraph with no fields.
    source_paragraph = ctrl[0] if ctrl else {}

    extract_dh_addons_from_control(source_paragraph, bd_sequences)
    return bd_sequences, dr_sequences, saw_dh
def extract_dh_compat_level(*, cwd=None) -> Tuple[Optional[int], int]:
    """Ask ``dh_assistant active-compat-level`` for the active compat level.

    :param cwd: Directory to run the tool in (``None`` keeps the current
      working directory).
    :returns: A ``(compat_level, exit_code)`` pair:

      * ``(level, 0)`` when the tool reported an integer level of at least 1.
      * ``(None, 127)`` when the tool (or ``cwd``) could not be found.
      * ``(None, <tool exit code>)`` when the tool exited non-zero.
      * ``(None, 255)`` when the tool succeeded but its output was unusable.
    """
    try:
        output = subprocess.check_output(
            ["dh_assistant", "active-compat-level"],
            stderr=subprocess.DEVNULL,
            cwd=cwd,
        )
    except FileNotFoundError:
        return None, 127
    except subprocess.CalledProcessError as e:
        return None, e.returncode

    try:
        data = json.loads(output)
    except ValueError:
        # Undecodable bytes or malformed JSON: same outcome as any other
        # unusable output.
        return None, 255

    active_compat_level = (
        data.get("active-compat-level") if isinstance(data, dict) else None
    )
    # ``bool`` is a subclass of ``int``; a JSON ``true`` must not pass as
    # compat level 1.
    if (
        isinstance(active_compat_level, bool)
        or not isinstance(active_compat_level, int)
        or active_compat_level < 1
    ):
        return None, 255
    return active_compat_level, 0