Coverage for src/debputy/dh/dh_assistant.py: 79%

98 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import dataclasses 

2import json 

3import re 

4import subprocess 

5from typing import FrozenSet, Optional, List, Union, Any, Set, Tuple 

6from collections.abc import Iterable, Mapping 

7 

8from debian.deb822 import Deb822 

9from debputy.exceptions import PureVirtualPathError 

10 

11from debputy.plugin.api import VirtualPath 

12 

13_FIND_DH_WITH = re.compile(r"--with(?:\s+|=)(\S+)") 

14_DEP_REGEX = re.compile("^([a-z0-9][-+.a-z0-9]+)", re.ASCII) 

15 

16 

17@dataclasses.dataclass(frozen=True, slots=True) 

18class DhListCommands: 

19 active_commands: frozenset[str] 

20 disabled_commands: frozenset[str] 

21 

22 

23@dataclasses.dataclass(frozen=True, slots=True) 

24class DhSequencerData: 

25 sequences: frozenset[str] 

26 uses_dh_sequencer: bool 

27 

28 

29def _parse_dh_cmd_list( 

30 cmd_list: list[Mapping[str, Any] | object] | None, 

31) -> Iterable[str]: 

32 if not isinstance(cmd_list, list): 

33 return 

34 

35 for command in cmd_list: 

36 if not isinstance(command, dict): 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true

37 continue 

38 command_name = command.get("command") 

39 if isinstance(command_name, str): 39 ↛ 35line 39 didn't jump to line 35 because the condition on line 39 was always true

40 yield command_name 

41 

42 

43def resolve_active_and_inactive_dh_commands( 

44 dh_rules_addons: Iterable[str], 

45 *, 

46 source_root: str | None = None, 

47) -> DhListCommands: 

48 cmd = ["dh_assistant", "list-commands", "--output-format=json"] 

49 if dh_rules_addons: 

50 addons = ",".join(dh_rules_addons) 

51 cmd.append(f"--with={addons}") 

52 try: 

53 output = subprocess.check_output( 

54 cmd, 

55 stderr=subprocess.DEVNULL, 

56 cwd=source_root, 

57 ) 

58 except (subprocess.CalledProcessError, FileNotFoundError): 

59 return DhListCommands( 

60 frozenset(), 

61 frozenset(), 

62 ) 

63 else: 

64 result = json.loads(output) 

65 active_commands = frozenset(_parse_dh_cmd_list(result.get("commands"))) 

66 disabled_commands = frozenset( 

67 _parse_dh_cmd_list(result.get("disabled-commands")) 

68 ) 

69 return DhListCommands( 

70 active_commands, 

71 disabled_commands, 

72 ) 

73 

74 

75def parse_drules_for_addons(lines: Iterable[str], sequences: set[str]) -> bool: 

76 saw_dh = False 

77 for line in lines: 

78 if not line.startswith("\tdh "): 78 ↛ 80line 78 didn't jump to line 80 because the condition on line 78 was always true

79 continue 

80 saw_dh = True 

81 for match in _FIND_DH_WITH.finditer(line): 

82 sequence_def = match.group(1) 

83 sequences.update(sequence_def.split(",")) 

84 return saw_dh 

85 

86 

87def extract_dh_addons_from_control( 

88 source_paragraph: Mapping[str, str] | Deb822, 

89 sequences: set[str], 

90) -> None: 

91 for f in ("Build-Depends", "Build-Depends-Indep", "Build-Depends-Arch"): 

92 field = source_paragraph.get(f) 

93 if not field: 

94 continue 

95 

96 for dep_clause in (d.strip() for d in field.split(",")): 

97 match = _DEP_REGEX.match(dep_clause.strip()) 

98 if not match: 

99 continue 

100 dep = match.group(1) 

101 if not dep.startswith("dh-sequence-"): 

102 continue 

103 sequences.add(dep[12:]) 

104 

105 

106def read_dh_addon_sequences( 

107 debian_dir: VirtualPath, 

108) -> tuple[set[str], set[str], bool] | None: 

109 ctrl_file = debian_dir.get("control") 

110 if ctrl_file: 

111 dr_sequences: set[str] = set() 

112 bd_sequences: set[str] = set() 

113 

114 drules = debian_dir.get("rules") 

115 saw_dh = False 

116 if drules and drules.is_file: 116 ↛ 117line 116 didn't jump to line 117 because the condition on line 116 was never true

117 try: 

118 with drules.open() as fd: 

119 saw_dh = parse_drules_for_addons(fd, dr_sequences) 

120 except PureVirtualPathError: 

121 pass 

122 with ctrl_file.open() as fd: 

123 ctrl = list(Deb822.iter_paragraphs(fd)) 

124 source_paragraph = ctrl[0] if ctrl else {} 

125 

126 extract_dh_addons_from_control(source_paragraph, bd_sequences) 

127 return bd_sequences, dr_sequences, saw_dh 

128 return None 

129 

130 

131def extract_dh_compat_level(*, cwd=None) -> tuple[int | None, int]: 

132 try: 

133 output = subprocess.check_output( 

134 ["dh_assistant", "active-compat-level"], 

135 stderr=subprocess.DEVNULL, 

136 cwd=cwd, 

137 ) 

138 except (FileNotFoundError, subprocess.CalledProcessError) as e: 

139 exit_code = 127 

140 if isinstance(e, subprocess.CalledProcessError): 

141 exit_code = e.returncode 

142 return None, exit_code 

143 else: 

144 data = json.loads(output) 

145 active_compat_level = data.get("active-compat-level") 

146 exit_code = 0 

147 if not isinstance(active_compat_level, int) or active_compat_level < 1: 147 ↛ 148line 147 didn't jump to line 148 because the condition on line 147 was never true

148 active_compat_level = None 

149 exit_code = 255 

150 return active_compat_level, exit_code