Coverage for src/debputy/lsp/apt_cache.py: 31%

106 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import asyncio 

2import dataclasses 

3import subprocess 

4import sys 

5from collections import defaultdict 

6from typing import Literal, Optional 

7from collections.abc import Sequence, Iterable, Mapping 

8 

9from debian.deb822 import Deb822 

10from debian.debian_support import Version 

11 

12AptCacheState = Literal[ 

13 "not-loaded", 

14 "loading", 

15 "loaded", 

16 "failed", 

17 "tooling-not-available", 

18 "empty-cache", 

19] 

20 

21 

22@dataclasses.dataclass(slots=True) 

23class PackageInformation: 

24 name: str 

25 architecture: str 

26 version: Version 

27 multi_arch: str 

28 # suites: Sequence[Tuple[str, ...]] 

29 synopsis: str 

30 section: str 

31 provides: str | None 

32 upstream_homepage: str | None 

33 

34 

35@dataclasses.dataclass(slots=True, frozen=True) 

36class PackageLookup: 

37 name: str 

38 package: PackageInformation | None 

39 provided_by: Sequence[PackageInformation] 

40 

41 

42class AptCache: 

43 

44 def __init__(self) -> None: 

45 self._state: AptCacheState = "not-loaded" 

46 self._load_error: str | None = None 

47 self._lookups: Mapping[str, PackageLookup] = {} 

48 

49 @property 

50 def state(self) -> AptCacheState: 

51 return self._state 

52 

53 @property 

54 def load_error(self) -> str | None: 

55 return self._load_error 

56 

57 def lookup(self, name: str) -> PackageLookup | None: 

58 return self._lookups.get(name) 

59 

60 async def load(self) -> None: 

61 if self._state in ("loading", "loaded"): 

62 raise RuntimeError(f"Already {self._state}") 

63 self._load_error = None 

64 self._state = "loading" 

65 try: 

66 files_raw = subprocess.check_output( 

67 [ 

68 "apt-get", 

69 "indextargets", 

70 "--format", 

71 "$(IDENTIFIER)\x1f$(FILENAME)", 

72 ] 

73 ).decode("utf-8") 

74 except FileNotFoundError: 

75 self._state = "tooling-not-available" 

76 self._load_error = "apt-get not available in PATH" 

77 return 

78 except subprocess.CalledProcessError as e: 

79 self._state = "failed" 

80 self._load_error = f"apt-get exited with {e.returncode}" 

81 return 

82 packages = {} 

83 for raw_file_line in files_raw.split("\n"): 

84 if not raw_file_line or raw_file_line.isspace(): 

85 continue 

86 identifier, filename = raw_file_line.split("\x1f") 

87 if identifier not in ("Packages",): 

88 continue 

89 try: 

90 for package_info in parse_apt_file(filename): 

91 # Let other computations happen if needed. 

92 await asyncio.sleep(0) 

93 existing = packages.get(package_info.name) 

94 if existing and package_info.version < existing.version: 

95 continue 

96 packages[package_info.name] = package_info 

97 except FileNotFoundError: 

98 self._state = "tooling-not-available" 

99 self._load_error = "/usr/lib/apt/apt-helper not available" 

100 return 

101 except (AttributeError, RuntimeError, IndexError) as e: 

102 self._state = "failed" 

103 self._load_error = str(e) 

104 return 

105 provides = defaultdict(list) 

106 for package_info in packages.values(): 

107 if not package_info.provides: 

108 continue 

109 # Some packages (`debhelper`) provides the same package multiple times (`debhelper-compat`). 

110 # Normalize that into one. 

111 deps = { 

112 clause.split("(")[0].strip() 

113 for clause in package_info.provides.split(",") 

114 } 

115 for dep in sorted(deps): 

116 provides[dep].append(package_info) 

117 

118 self._lookups = { 

119 name: PackageLookup( 

120 name, 

121 packages.get(name), 

122 tuple(provides.get(name, [])), 

123 ) 

124 for name in packages.keys() | provides.keys() 

125 } 

126 self._state = "loaded" 

127 

128 

129def parse_apt_file(filename: str) -> Iterable[PackageInformation]: 

130 proc = subprocess.Popen( 

131 ["/usr/lib/apt/apt-helper", "cat-file", filename], 

132 stdin=subprocess.DEVNULL, 

133 stdout=subprocess.PIPE, 

134 ) 

135 with proc: 

136 for stanza in Deb822.iter_paragraphs(proc.stdout): 

137 pkg_info = stanza_to_package_info(stanza) 

138 if pkg_info is not None: 

139 yield pkg_info 

140 

141 

142def stanza_to_package_info(stanza: Deb822) -> PackageInformation | None: 

143 try: 

144 name = stanza["Package"] 

145 architecture = sys.intern(stanza["Architecture"]) 

146 version = Version(stanza["Version"]) 

147 multi_arch = sys.intern(stanza.get("Multi-Arch", "no")) 

148 synopsis = stanza["Description"] 

149 section = sys.intern(stanza["Section"]) 

150 provides = stanza.get("Provides") 

151 homepage = stanza.get("Homepage") 

152 except KeyError: 

153 return None 

154 if "\n" in synopsis: 

155 # "Modern" Packages files do not have the full description. But in case we see a (very old one) 

156 # have consistent behavior with the modern ones. 

157 synopsis = synopsis.split("\n")[0] 

158 

159 return PackageInformation( 

160 name, 

161 architecture, 

162 version, 

163 multi_arch, 

164 synopsis, 

165 section, 

166 provides, 

167 homepage, 

168 )