Coverage for src/debputy/lsp/apt_cache.py: 33%

105 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2025-01-27 13:59 +0000

1import asyncio 

2import dataclasses 

3import subprocess 

4import sys 

5from collections import defaultdict 

6from typing import Literal, Optional, Sequence, Iterable, Mapping 

7 

8from debian.deb822 import Deb822 

9from debian.debian_support import Version 

10 

11AptCacheState = Literal[ 

12 "not-loaded", 

13 "loading", 

14 "loaded", 

15 "failed", 

16 "tooling-not-available", 

17 "empty-cache", 

18] 

19 

20 

21@dataclasses.dataclass(slots=True) 

22class PackageInformation: 

23 name: str 

24 architecture: str 

25 version: Version 

26 multi_arch: str 

27 # suites: Sequence[Tuple[str, ...]] 

28 synopsis: str 

29 section: str 

30 provides: Optional[str] 

31 upstream_homepage: Optional[str] 

32 

33 

34@dataclasses.dataclass(slots=True, frozen=True) 

35class PackageLookup: 

36 name: str 

37 package: Optional[PackageInformation] 

38 provided_by: Sequence[PackageInformation] 

39 

40 

41class AptCache: 

42 

43 def __init__(self) -> None: 

44 self._state: AptCacheState = "not-loaded" 

45 self._load_error: Optional[str] = None 

46 self._lookups: Mapping[str, PackageLookup] = {} 

47 

48 @property 

49 def state(self) -> AptCacheState: 

50 return self._state 

51 

52 @property 

53 def load_error(self) -> Optional[str]: 

54 return self._load_error 

55 

56 def lookup(self, name: str) -> Optional[PackageLookup]: 

57 return self._lookups.get(name) 

58 

59 async def load(self) -> None: 

60 if self._state in ("loading", "loaded"): 

61 raise RuntimeError(f"Already {self._state}") 

62 self._load_error = None 

63 self._state = "loading" 

64 try: 

65 files_raw = subprocess.check_output( 

66 [ 

67 "apt-get", 

68 "indextargets", 

69 "--format", 

70 "$(IDENTIFIER)\x1f$(FILENAME)", 

71 ] 

72 ).decode("utf-8") 

73 except FileNotFoundError: 

74 self._state = "tooling-not-available" 

75 self._load_error = "apt-get not available in PATH" 

76 return 

77 except subprocess.CalledProcessError as e: 

78 self._state = "failed" 

79 self._load_error = f"apt-get exited with {e.returncode}" 

80 return 

81 packages = {} 

82 for raw_file_line in files_raw.split("\n"): 

83 if not raw_file_line or raw_file_line.isspace(): 

84 continue 

85 identifier, filename = raw_file_line.split("\x1f") 

86 if identifier not in ("Packages",): 

87 continue 

88 try: 

89 for package_info in parse_apt_file(filename): 

90 # Let other computations happen if needed. 

91 await asyncio.sleep(0) 

92 existing = packages.get(package_info.name) 

93 if existing and package_info.version < existing.version: 

94 continue 

95 packages[package_info.name] = package_info 

96 except FileNotFoundError: 

97 self._state = "tooling-not-available" 

98 self._load_error = "/usr/lib/apt/apt-helper not available" 

99 return 

100 except (AttributeError, RuntimeError, IndexError) as e: 

101 self._state = "failed" 

102 self._load_error = str(e) 

103 return 

104 provides = defaultdict(list) 

105 for package_info in packages.values(): 

106 if not package_info.provides: 

107 continue 

108 # Some packages (`debhelper`) provides the same package multiple times (`debhelper-compat`). 

109 # Normalize that into one. 

110 deps = { 

111 clause.split("(")[0].strip() 

112 for clause in package_info.provides.split(",") 

113 } 

114 for dep in sorted(deps): 

115 provides[dep].append(package_info) 

116 

117 self._lookups = { 

118 name: PackageLookup( 

119 name, 

120 packages.get(name), 

121 tuple(provides.get(name, [])), 

122 ) 

123 for name in packages.keys() | provides.keys() 

124 } 

125 self._state = "loaded" 

126 

127 

128def parse_apt_file(filename: str) -> Iterable[PackageInformation]: 

129 proc = subprocess.Popen( 

130 ["/usr/lib/apt/apt-helper", "cat-file", filename], 

131 stdin=subprocess.DEVNULL, 

132 stdout=subprocess.PIPE, 

133 ) 

134 with proc: 

135 for stanza in Deb822.iter_paragraphs(proc.stdout): 

136 pkg_info = stanza_to_package_info(stanza) 

137 if pkg_info is not None: 

138 yield pkg_info 

139 

140 

141def stanza_to_package_info(stanza: Deb822) -> Optional[PackageInformation]: 

142 try: 

143 name = stanza["Package"] 

144 architecture = sys.intern(stanza["Architecture"]) 

145 version = Version(stanza["Version"]) 

146 multi_arch = sys.intern(stanza.get("Multi-Arch", "no")) 

147 synopsis = stanza["Description"] 

148 section = sys.intern(stanza["Section"]) 

149 provides = stanza.get("Provides") 

150 homepage = stanza.get("Homepage") 

151 except KeyError: 

152 return None 

153 if "\n" in synopsis: 

154 # "Modern" Packages files do not have the full description. But in case we see a (very old one) 

155 # have consistent behavior with the modern ones. 

156 synopsis = synopsis.split("\n")[0] 

157 

158 return PackageInformation( 

159 name, 

160 architecture, 

161 version, 

162 multi_arch, 

163 synopsis, 

164 section, 

165 provides, 

166 homepage, 

167 )