Coverage for src/debputy/lsp/apt_cache.py: 31%
106 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import asyncio
2import dataclasses
3import subprocess
4import sys
5from collections import defaultdict
6from typing import Literal, Optional
7from collections.abc import Sequence, Iterable, Mapping
9from debian.deb822 import Deb822
10from debian.debian_support import Version
# Lifecycle states reported by `AptCache.state`.
AptCacheState = Literal[
    # No load has been attempted yet.
    "not-loaded",
    # A load is currently in progress.
    "loading",
    # The load completed and lookups are available.
    "loaded",
    # The load was attempted but did not complete; see `AptCache.load_error`.
    "failed",
    # A required external tool (`apt-get` or `apt-helper`) could not be found.
    "tooling-not-available",
    # NOTE(review): not assigned anywhere in this module; presumably reserved
    # for callers that want to flag a cache without any packages - confirm.
    "empty-cache",
]
@dataclasses.dataclass(slots=True)
class PackageInformation:
    """Subset of a binary package stanza from an APT `Packages` index.

    The field order is part of the interface: instances are constructed
    positionally.
    """

    # Value of the `Package` field.
    name: str
    # Value of the `Architecture` field.
    architecture: str
    # Parsed value of the `Version` field.
    version: Version
    # Value of the `Multi-Arch` field.
    multi_arch: str
    # suites: Sequence[Tuple[str, ...]]
    # First line of the `Description` field.
    synopsis: str
    # Value of the `Section` field.
    section: str
    # Raw, unparsed value of the `Provides` field, when present.
    provides: str | None
    # Value of the `Homepage` field, when present.
    upstream_homepage: str | None
@dataclasses.dataclass(slots=True, frozen=True)
class PackageLookup:
    """Immutable result of looking up a package name in the APT cache."""

    # The name that was looked up.
    name: str
    # The real package carrying that name, or None when the name is only
    # known through the `Provides` field of other packages.
    package: PackageInformation | None
    # Packages that list `name` in their `Provides` field (possibly empty).
    provided_by: Sequence[PackageInformation]
class AptCache:
    """In-memory index of the binary packages listed in the local APT indices.

    The cache starts out empty ("not-loaded") and is populated by `load()`.
    Once loaded, `lookup()` resolves both real package names and names that
    only exist via the `Provides` field of other packages.
    """

    def __init__(self) -> None:
        self._state: AptCacheState = "not-loaded"
        self._load_error: str | None = None
        self._lookups: Mapping[str, PackageLookup] = {}

    @property
    def state(self) -> "AptCacheState":
        """Current lifecycle state of the cache."""
        return self._state

    @property
    def load_error(self) -> "str | None":
        """Description of why the last load did not succeed, if it did not."""
        return self._load_error

    def lookup(self, name: str) -> "PackageLookup | None":
        """Return the lookup entry for `name`, or None when it is unknown."""
        return self._lookups.get(name)

    async def load(self) -> None:
        """Populate the cache from the APT package indices.

        Expected problems (missing tooling, `apt-get` failing, parse errors)
        are reported through `state` and `load_error` rather than raised.

        Raises:
            RuntimeError: if the cache is already loading or loaded.
            BaseException: any unexpected error (including task cancellation)
                is re-raised after the cache has been marked as "failed".
        """
        if self._state in ("loading", "loaded"):
            raise RuntimeError(f"Already {self._state}")
        self._load_error = None
        self._state = "loading"
        try:
            await self._load_from_apt()
        except BaseException as e:
            # Never leave the cache stuck in "loading": the guard above would
            # otherwise reject every later attempt to load it again.
            if self._state == "loading":
                self._state = "failed"
                self._load_error = str(e) or type(e).__name__
            raise

    async def _load_from_apt(self) -> None:
        """Run the actual load; sets the final state for all expected outcomes."""
        # NOTE: `subprocess.check_output` is synchronous, so the event loop is
        # blocked until `apt-get` has finished.
        try:
            files_raw = subprocess.check_output(
                [
                    "apt-get",
                    "indextargets",
                    "--format",
                    "$(IDENTIFIER)\x1f$(FILENAME)",
                ]
            ).decode("utf-8")
        except FileNotFoundError:
            self._state = "tooling-not-available"
            self._load_error = "apt-get not available in PATH"
            return
        except subprocess.CalledProcessError as e:
            self._state = "failed"
            self._load_error = f"apt-get exited with {e.returncode}"
            return

        packages: dict[str, PackageInformation] = {}
        for raw_file_line in files_raw.split("\n"):
            if not raw_file_line or raw_file_line.isspace():
                continue
            identifier, filename = raw_file_line.split("\x1f")
            if identifier != "Packages":
                continue
            try:
                for package_info in parse_apt_file(filename):
                    # Let other computations happen if needed.
                    await asyncio.sleep(0)
                    existing = packages.get(package_info.name)
                    # Keep only the highest version seen for each name.
                    if (
                        existing is not None
                        and package_info.version < existing.version
                    ):
                        continue
                    packages[package_info.name] = package_info
            except FileNotFoundError:
                self._state = "tooling-not-available"
                self._load_error = "/usr/lib/apt/apt-helper not available"
                return
            except (AttributeError, RuntimeError, IndexError) as e:
                self._state = "failed"
                self._load_error = str(e)
                return

        provides = self._index_provides(packages.values())
        self._lookups = {
            name: PackageLookup(
                name,
                packages.get(name),
                tuple(provides.get(name, [])),
            )
            for name in packages.keys() | provides.keys()
        }
        self._state = "loaded"

    @staticmethod
    def _index_provides(package_infos):
        """Map each provided name to the list of packages providing it."""
        provides = defaultdict(list)
        for package_info in package_infos:
            if not package_info.provides:
                continue
            # Some packages (`debhelper`) provide the same package multiple
            # times (`debhelper-compat`). Normalize that into one.
            names = {
                clause.split("(")[0].strip()
                for clause in package_info.provides.split(",")
            }
            # An empty clause (e.g. from a trailing comma) is not a package name.
            names.discard("")
            for provided_name in sorted(names):
                provides[provided_name].append(package_info)
        return provides
def parse_apt_file(filename: str) -> Iterable[PackageInformation]:
    """Yield the packages described by one APT `Packages` index file.

    The file content is obtained by running `/usr/lib/apt/apt-helper cat-file`
    on `filename` and parsing its output as deb822 paragraphs. Stanzas that
    cannot be turned into a `PackageInformation` are skipped.

    This is a generator, so the helper process is only started once iteration
    begins; a missing `apt-helper` binary therefore surfaces as a
    `FileNotFoundError` on the first iteration step, not at call time.

    NOTE: the exit status of `apt-helper` is not inspected.
    """
    command = ["/usr/lib/apt/apt-helper", "cat-file", filename]
    with subprocess.Popen(
        command,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    ) as proc:
        for stanza in Deb822.iter_paragraphs(proc.stdout):
            pkg_info = stanza_to_package_info(stanza)
            if pkg_info is not None:
                yield pkg_info
def stanza_to_package_info(stanza: Deb822) -> PackageInformation | None:
    """Convert one `Packages` stanza into a `PackageInformation`.

    Returns None when any of the mandatory fields (`Package`, `Architecture`,
    `Version`, `Description`, `Section`) is missing. `Multi-Arch` defaults to
    "no"; `Provides` and `Homepage` are optional and default to None.
    """
    try:
        name = stanza["Package"]
        # These values repeat across many stanzas, so intern them to share
        # one string object per distinct value.
        architecture = sys.intern(stanza["Architecture"])
        version = Version(stanza["Version"])
        multi_arch = sys.intern(stanza.get("Multi-Arch", "no"))
        synopsis = stanza["Description"]
        section = sys.intern(stanza["Section"])
        provides = stanza.get("Provides")
        homepage = stanza.get("Homepage")
    except KeyError:
        return None

    # "Modern" Packages files do not carry the full description. In case we
    # see a (very old) one that does, keep only the first line so the result
    # is consistent with the modern ones.
    synopsis = synopsis.partition("\n")[0]

    return PackageInformation(
        name,
        architecture,
        version,
        multi_arch,
        synopsis,
        section,
        provides,
        homepage,
    )