Coverage for src/debputy/lsp/apt_cache.py: 33%
105 statements
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2025-01-27 13:59 +0000
1import asyncio
2import dataclasses
3import subprocess
4import sys
5from collections import defaultdict
6from typing import Literal, Optional, Sequence, Iterable, Mapping
8from debian.deb822 import Deb822
9from debian.debian_support import Version
# Life-cycle states reported by ``AptCache.state``.
AptCacheState = Literal[
    # Initial state of a freshly constructed cache.
    "not-loaded",
    # A call to ``AptCache.load`` is in progress.
    "loading",
    # ``AptCache.load`` completed and the lookup table is populated.
    "loaded",
    # ``apt-get`` exited with an error or an index file could not be parsed.
    "failed",
    # ``apt-get`` or ``/usr/lib/apt/apt-helper`` could not be found.
    "tooling-not-available",
    # NOTE(review): not assigned anywhere in the visible code; presumably
    # meant for "APT knows no packages" - confirm against callers.
    "empty-cache",
]
@dataclasses.dataclass(slots=True)
class PackageInformation:
    """Subset of one binary package stanza from an APT ``Packages`` index.

    Instances are created by ``stanza_to_package_info``; the field order is
    part of the interface because they are constructed positionally.
    """

    # Value of the ``Package`` field.
    name: str
    # Value of the ``Architecture`` field.
    architecture: str
    # Parsed value of the ``Version`` field.
    version: Version
    # Value of the ``Multi-Arch`` field; "no" when the stanza omits it.
    multi_arch: str
    # suites: Sequence[Tuple[str, ...]]
    # First line of the ``Description`` field.
    synopsis: str
    # Value of the ``Section`` field.
    section: str
    # Raw, unparsed ``Provides`` field, or None when absent.
    provides: Optional[str]
    # Value of the ``Homepage`` field, or None when absent.
    upstream_homepage: Optional[str]
@dataclasses.dataclass(slots=True, frozen=True)
class PackageLookup:
    """Immutable answer to "what does APT know about this package name?"."""

    # The package name that was indexed.
    name: str
    # The real package carrying that name, or None when the name only exists
    # as a virtual package (i.e. only via ``Provides``).
    package: Optional[PackageInformation]
    # Packages whose ``Provides`` field lists ``name``; empty when there are none.
    provided_by: Sequence[PackageInformation]
class AptCache:
    """In-memory index of the packages listed in the local APT ``Packages`` files.

    A new instance is ``"not-loaded"``. Calling :meth:`load` fills the index
    and records the outcome in :attr:`state` (plus :attr:`load_error` when the
    load did not succeed). Entries are retrieved with :meth:`lookup`.
    """

    def __init__(self) -> None:
        self._state: AptCacheState = "not-loaded"
        self._load_error: Optional[str] = None
        self._lookups: Mapping[str, PackageLookup] = {}

    @property
    def state(self) -> AptCacheState:
        """Current life-cycle state of the cache."""
        return self._state

    @property
    def load_error(self) -> Optional[str]:
        """Description of why the last load did not succeed, or ``None``."""
        return self._load_error

    def lookup(self, name: str) -> Optional[PackageLookup]:
        """Return the entry for package *name*, or ``None`` if it is unknown."""
        return self._lookups.get(name)

    async def load(self) -> None:
        """Populate the cache from the APT index files.

        Anticipated problems (missing tooling, ``apt-get`` failing, parse
        errors) are reported through :attr:`state` and :attr:`load_error`
        without raising.

        Raises:
            RuntimeError: if the cache is already loading or loaded.
            BaseException: any unanticipated exception (including task
                cancellation) is re-raised after the state has been set to
                ``"failed"``, so that a later retry is possible.
        """
        if self._state in ("loading", "loaded"):
            raise RuntimeError(f"Already {self._state}")
        self._load_error = None
        self._state = "loading"
        try:
            lookups = await self._load_lookups()
        except BaseException as e:
            # Without this the state would stay "loading" forever and every
            # subsequent call to load() would be rejected. BaseException is
            # used on purpose so asyncio.CancelledError is covered as well.
            detail = str(e)
            name = type(e).__name__
            self._state = "failed"
            self._load_error = f"{name}: {detail}" if detail else name
            raise
        if lookups is None:
            # An anticipated failure; state and load_error are already set.
            return
        # NOTE(review): the "empty-cache" state is never assigned here; an
        # empty result is reported as "loaded" - confirm whether intended.
        self._lookups = lookups
        self._state = "loaded"

    async def _load_lookups(self) -> Optional[Mapping[str, PackageLookup]]:
        """Build the lookup table, or return ``None`` after recording a failure."""
        try:
            # Run the blocking subprocess call in a worker thread so the
            # event loop keeps serving other tasks meanwhile.
            files_raw = (
                await asyncio.to_thread(
                    subprocess.check_output,
                    [
                        "apt-get",
                        "indextargets",
                        "--format",
                        "$(IDENTIFIER)\x1f$(FILENAME)",
                    ],
                )
            ).decode("utf-8")
        except FileNotFoundError:
            self._state = "tooling-not-available"
            self._load_error = "apt-get not available in PATH"
            return None
        except subprocess.CalledProcessError as e:
            self._state = "failed"
            self._load_error = f"apt-get exited with {e.returncode}"
            return None

        packages: dict[str, PackageInformation] = {}
        for raw_file_line in files_raw.split("\n"):
            if not raw_file_line or raw_file_line.isspace():
                continue
            # partition() tolerates a separator inside the filename; a line
            # without any separator cannot name an index file and is skipped.
            identifier, separator, filename = raw_file_line.partition("\x1f")
            if not separator or identifier != "Packages":
                continue
            try:
                for package_info in parse_apt_file(filename):
                    # Let other computations happen if needed.
                    await asyncio.sleep(0)
                    existing = packages.get(package_info.name)
                    # Keep the highest version seen for each package name.
                    if existing and package_info.version < existing.version:
                        continue
                    packages[package_info.name] = package_info
            except FileNotFoundError:
                self._state = "tooling-not-available"
                self._load_error = "/usr/lib/apt/apt-helper not available"
                return None
            except (AttributeError, RuntimeError, IndexError) as e:
                self._state = "failed"
                self._load_error = str(e)
                return None

        provides: defaultdict[str, list[PackageInformation]] = defaultdict(list)
        for package_info in packages.values():
            if not package_info.provides:
                continue
            # Some packages (`debhelper`) provides the same package multiple times (`debhelper-compat`).
            # Normalize that into one.
            provided_names = {
                clause.split("(")[0].strip()
                for clause in package_info.provides.split(",")
            }
            # A stray or trailing comma would otherwise index the package
            # under the empty string.
            provided_names.discard("")
            for provided_name in sorted(provided_names):
                provides[provided_name].append(package_info)

        return {
            name: PackageLookup(
                name,
                packages.get(name),
                tuple(provides.get(name, ())),
            )
            for name in packages.keys() | provides.keys()
        }
def parse_apt_file(filename: str) -> Iterable[PackageInformation]:
    """Yield the packages described in the APT index file *filename*.

    The file content is obtained by running
    ``/usr/lib/apt/apt-helper cat-file`` and parsed as deb822 paragraphs.
    Paragraphs for which ``stanza_to_package_info`` returns ``None`` are
    skipped.

    Being a generator, the helper process is only started on first
    iteration; ``FileNotFoundError`` surfaces there if the helper is missing.
    The helper's exit status is not inspected.
    """
    command = ["/usr/lib/apt/apt-helper", "cat-file", filename]
    with subprocess.Popen(
        command,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    ) as helper:
        for paragraph in Deb822.iter_paragraphs(helper.stdout):
            if (info := stanza_to_package_info(paragraph)) is not None:
                yield info
def stanza_to_package_info(stanza: Deb822) -> Optional[PackageInformation]:
    """Convert one ``Packages`` stanza into a :class:`PackageInformation`.

    Returns ``None`` for stanzas that cannot be used: one of the mandatory
    fields (``Package``, ``Architecture``, ``Version``, ``Description``,
    ``Section``) is missing, or the ``Version`` value is not a valid Debian
    version. ``Multi-Arch`` defaults to ``"no"``; ``Provides`` and
    ``Homepage`` are optional.
    """
    try:
        name = stanza["Package"]
        # Interned, presumably because the same few values recur across
        # many stanzas.
        architecture = sys.intern(stanza["Architecture"])
        raw_version = stanza["Version"]
        synopsis = stanza["Description"]
        section = sys.intern(stanza["Section"])
    except KeyError:
        return None
    try:
        version = Version(raw_version)
    except ValueError:
        # A malformed version makes the stanza as unusable as a missing
        # field; skip it rather than aborting the whole index parse.
        return None
    multi_arch = sys.intern(stanza.get("Multi-Arch", "no"))
    provides = stanza.get("Provides")
    homepage = stanza.get("Homepage")
    # "Modern" Packages files do not have the full description. But in case
    # we see a (very old) one, have consistent behavior with the modern ones
    # by keeping only the first line.
    synopsis = synopsis.partition("\n")[0]

    return PackageInformation(
        name,
        architecture,
        version,
        multi_arch,
        synopsis,
        section,
        provides,
        homepage,
    )