Coverage for src/debputy/plugins/debputy/service_management.py: 82%
164 statements
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
« prev ^ index » next coverage.py v7.8.2, created at 2025-10-12 15:06 +0000
1import collections
2import dataclasses
3import os
4import textwrap
5from typing import Dict, List, Literal
6from collections.abc import Iterable, Sequence
8from debputy.packages import BinaryPackage
9from debputy.plugin.api.spec import (
10 ServiceRegistry,
11 VirtualPath,
12 PackageProcessingContext,
13 BinaryCtrlAccessor,
14 ServiceDefinition,
15)
16from debputy.util import _error, assume_not_none
# Shell fragment that expands to the (double-quoted) value of the DPKG_ROOT
# environment variable. It is prepended to paths in the generated maintscripts.
DPKG_ROOT = '"${DPKG_ROOT}"'
# Shell test that succeeds when DPKG_ROOT is empty or unset.
EMPTY_DPKG_ROOT_CONDITION = '[ -z "${DPKG_ROOT}" ]'
# Shell test that succeeds when the directory /run/systemd/system exists; the
# generated snippets use it to decide whether to talk to systemd.
SERVICE_MANAGER_IS_SYSTEMD_CONDITION = "[ -d /run/systemd/system ]"
@dataclasses.dataclass(slots=True)
class SystemdServiceContext:
    """Extra per-service data attached to systemd service registrations."""

    # True when the unit file contained an "[Install]" section header.
    had_install_section: bool
@dataclasses.dataclass(slots=True)
class SystemdUnit:
    """Result of analyzing one systemd unit file found in the package."""

    # The unit file inside the package file system.
    path: VirtualPath
    # The file name first, followed by the other names the unit is known by
    # (aliases and, for ".service" units, the names without that suffix).
    names: list[str]
    # The part of the file name after the last "." (e.g. "service").
    type_of_service: str
    # The systemd directory flavour the unit was found under ("system"/"user").
    service_scope: str
    # Default for whether the unit should be enabled.
    enable_by_default: bool
    # Default for whether the unit should be started.
    start_by_default: bool
    # True when the unit file contained an "[Install]" section header.
    had_install_section: bool
def detect_systemd_service_files(
    fs_root: VirtualPath,
    service_registry: ServiceRegistry[SystemdServiceContext],
    context: PackageProcessingContext,
) -> None:
    """Register every systemd "system" unit found in the package.

    :param fs_root: Root of the package file system to scan.
    :param service_registry: Registry receiving one registration per unit.
    :param context: Processing context; only its binary package is used.
    """
    binary_package = context.binary_package
    for unit in _find_and_analyze_systemd_service_files(
        binary_package, fs_root, "system"
    ):
        # Units started by default are restarted on upgrade; the rest are
        # left alone.
        upgrade_rule = "restart" if unit.start_by_default else "do-nothing"
        unit_context = SystemdServiceContext(unit.had_install_section)
        service_registry.register_service(
            unit.path,
            unit.names,
            type_of_service=unit.type_of_service,
            service_scope=unit.service_scope,
            enable_by_default=unit.enable_by_default,
            start_by_default=unit.start_by_default,
            default_upgrade_rule=upgrade_rule,
            service_context=unit_context,
        )
def generate_snippets_for_systemd_units(
    services: Sequence[ServiceDefinition[SystemdServiceContext]],
    ctrl: BinaryCtrlAccessor,
    _context: PackageProcessingContext,
) -> None:
    """Generate the maintscript snippets that manage the given systemd units.

    For every service an enable/update-state snippet is added to the
    configure step (when the unit has an ``[Install]`` section).  Afterwards
    combined snippets are emitted for starting units on installation, acting
    on them on upgrade, stopping them in ``preinst`` for the
    ``stop-then-start`` rule, purging the deb-systemd-helper state and
    reloading the systemd daemon on removal.

    :param services: The service definitions to handle; must be non-empty.
    :param ctrl: Accessor used to register the maintscript snippets.
    :param _context: Unused.
    :raises AssertionError: If a service uses an unsupported ``on_upgrade``
      rule.
    """
    # Nothing adds to this list at the moment, so the "stop before removal"
    # snippet further down is currently never emitted.
    stop_before_upgrade: list[str] = []
    stop_then_start_scripts = []
    on_purge = []
    start_on_install = []
    action_on_upgrade = collections.defaultdict(list)
    assert services

    for service_def in services:
        # NOTE(review): the comments embedded in the templates suggest that
        # the *second* template is the one that enables a unit on first
        # installation ("new installations run enable"), while the first one
        # only acts when `debian-installed` holds.  Please confirm that the
        # two templates are not swapped relative to `auto_enable_on_install`.
        if service_def.auto_enable_on_install:
            template = """\
                if deb-systemd-helper debian-installed {UNITFILE}; then
                    # The following line should be removed in trixie or trixie+1
                    deb-systemd-helper unmask {UNITFILE} >/dev/null || true

                    if deb-systemd-helper --quiet was-enabled {UNITFILE}; then
                        # Create new symlinks, if any.
                        deb-systemd-helper enable {UNITFILE} >/dev/null || true
                    fi
                fi

                # Update the statefile to add new symlinks (if any), which need to be cleaned
                # up on purge. Also remove old symlinks.
                deb-systemd-helper update-state {UNITFILE} >/dev/null || true
            """
        else:
            template = """\
                # The following line should be removed in trixie or trixie+1
                deb-systemd-helper unmask {UNITFILE} >/dev/null || true

                # was-enabled defaults to true, so new installations run enable.
                if deb-systemd-helper --quiet was-enabled {UNITFILE}; then
                    # Enables the unit on first installation, creates new
                    # symlinks on upgrades if the unit file has changed.
                    deb-systemd-helper enable {UNITFILE} >/dev/null || true
                else
                    # Update the statefile to add new symlinks (if any), which need to be
                    # cleaned up on purge. Also remove old symlinks.
                    deb-systemd-helper update-state {UNITFILE} >/dev/null || true
                fi
            """
        service_name = service_def.name

        if assume_not_none(service_def.service_context).had_install_section:
            ctrl.maintscript.on_configure(
                template.format(
                    UNITFILE=ctrl.maintscript.escape_shell_words(service_name),
                )
            )
            # Only units handled via deb-systemd-helper have state to purge.
            on_purge.append(service_name)
        elif service_def.auto_enable_on_install:
            _error(
                f'The service "{service_name}" cannot be enabled under "systemd" as'
                f' it has no "[Install]" section. Please correct {service_def.definition_source}'
                f' so that it does not enable the service or does not apply to "systemd"'
            )

        if service_def.auto_start_on_install:
            start_on_install.append(service_name)
        if service_def.on_upgrade == "stop-then-start":
            stop_then_start_scripts.append(service_name)
        elif service_def.on_upgrade in ("restart", "reload"):
            action: str = service_def.on_upgrade
            action_on_upgrade[action].append(service_name)
        elif service_def.on_upgrade != "do-nothing":
            raise AssertionError(
                f"Missing support for on_upgrade rule: {service_def.on_upgrade}"
            )

    # `stop_then_start_scripts` is part of the guard: those units are stopped
    # in preinst and must be started again here even when no other unit needs
    # a start/restart action.
    if start_on_install or action_on_upgrade or stop_then_start_scripts:
        lines = [
            "if {EMPTY_DPKG_ROOT_CONDITION} && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION}; then".format(
                EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION,
                SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION,
            ),
            "    systemctl --system daemon-reload >/dev/null || true",
        ]
        if stop_then_start_scripts:
            unit_files = ctrl.maintscript.escape_shell_words(*stop_then_start_scripts)
            lines.append(
                "    deb-systemd-invoke start {UNITFILES} >/dev/null || true".format(
                    UNITFILES=unit_files,
                )
            )
        if start_on_install:
            # An empty "$2" means there is no previously configured version,
            # i.e. this is a fresh installation.
            lines.append('    if [ -z "$2" ]; then')
            lines.append(
                "        deb-systemd-invoke start {UNITFILES} >/dev/null || true".format(
                    UNITFILES=ctrl.maintscript.escape_shell_words(*start_on_install),
                )
            )
            lines.append("    fi")
        if action_on_upgrade:
            lines.append('    if [ -n "$2" ]; then')
            for action, units in action_on_upgrade.items():
                lines.append(
                    "        deb-systemd-invoke {ACTION} {UNITFILES} >/dev/null || true".format(
                        ACTION=action,
                        UNITFILES=ctrl.maintscript.escape_shell_words(*units),
                    )
                )
            lines.append("    fi")
        lines.append("fi")
        combined = "".join(x if x.endswith("\n") else f"{x}\n" for x in lines)
        ctrl.maintscript.on_configure(combined)

    if stop_then_start_scripts:
        ctrl.maintscript.unconditionally_in_script(
            "preinst",
            textwrap.dedent(
                """\
                if {EMPTY_DPKG_ROOT_CONDITION} && [ "$1" = upgrade ] && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then
                    deb-systemd-invoke stop {UNIT_FILES} >/dev/null || true
                fi
                """.format(
                    EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION,
                    SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION,
                    UNIT_FILES=ctrl.maintscript.escape_shell_words(
                        *stop_then_start_scripts
                    ),
                )
            ),
        )

    if stop_before_upgrade:
        ctrl.maintscript.on_before_removal(
            """\
            if {EMPTY_DPKG_ROOT_CONDITION} && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then
                deb-systemd-invoke stop {UNIT_FILES} >/dev/null || true
            fi
            """.format(
                EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION,
                SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION,
                UNIT_FILES=ctrl.maintscript.escape_shell_words(*stop_before_upgrade),
            )
        )
    if on_purge:
        ctrl.maintscript.on_purge(
            """\
            if [ -x "/usr/bin/deb-systemd-helper" ]; then
                deb-systemd-helper purge {UNITFILES} >/dev/null || true
            fi
            """.format(
                # Purge exactly the units collected in `on_purge`; using the
                # (always empty) `stop_before_upgrade` list here would emit a
                # purge command without any unit names.
                UNITFILES=ctrl.maintscript.escape_shell_words(*on_purge),
            )
        )
    ctrl.maintscript.on_removed(
        textwrap.dedent(
            """\
            if {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then
                systemctl --system daemon-reload >/dev/null || true
            fi
            """.format(
                SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION
            )
        )
    )
224def _remove_quote(v: str) -> str:
225 if v and v[0] == v[-1] and v[0] in ('"', "'"): 225 ↛ 227line 225 didn't jump to line 227 because the condition on line 225 was always true
226 return v[1:-1]
227 return v
def _find_and_analyze_systemd_service_files(
    pkg: BinaryPackage,
    fs_root: VirtualPath,
    systemd_service_dir: Literal["system", "user"],
) -> Iterable[SystemdUnit]:
    """Locate the systemd unit files of a package and describe each of them.

    Both ``./usr/lib/systemd/<dir>`` and ``./lib/systemd/<dir>`` are scanned.
    Symlinks are recorded as aliases of their target's base name.  Regular
    files are collected once per file name and, unless templated (``@`` in
    the name), parsed for an ``[Install]`` header and ``Alias=``/``Also=``
    lines.

    :param pkg: The binary package; only used for the error message.
    :param fs_root: Root of the package file system.
    :param systemd_service_dir: Which systemd directory flavour to scan.
    :returns: A generator with one `SystemdUnit` per non-templated unit file.
      After the last unit, any unit named via ``Also=`` that was not found
      is reported through `_error`.
    """
    units_with_install_section = set()
    alias_table: dict[str, list[str]] = collections.defaultdict(list)
    known_names = set()
    unit_files = []
    pending_also_units = set()
    also_requesters = collections.defaultdict(list)

    for dir_path in (
        f"./usr/lib/systemd/{systemd_service_dir}",
        f"./lib/systemd/{systemd_service_dir}",
    ):
        unit_dir = fs_root.lookup(dir_path)
        if not unit_dir:
            continue
        for entry in unit_dir.iterdir:
            if entry.is_symlink:
                target = os.path.basename(entry.readlink())
                alias_table[target].append(entry.name)
                continue
            if not entry.is_file or entry.name in known_names:
                continue
            known_names.add(entry.name)
            unit_files.append(entry)
            if "@" in entry.name:
                # dh_installsystemd does not check the contents of templated services,
                # and we match that.
                continue
            with entry.open() as fd:
                for raw_line in fd:
                    stripped = raw_line.strip()
                    lowered = stripped.lower()
                    if lowered == "[install]":
                        units_with_install_section.add(entry.name)
                    elif lowered.startswith("alias="):
                        # Splitting on whitespace relies on unit names never
                        # containing spaces; do not reuse this for fields
                        # where that does not hold.
                        alias_table[entry.name].extend(
                            _remove_quote(word) for word in stripped[6:].split()
                        )
                    elif lowered.startswith("also="):
                        # Same whitespace-splitting assumption as for Alias=.
                        for wanted in map(_remove_quote, stripped[5:].split()):
                            also_requesters[wanted].append(entry.absolute)
                            pending_also_units.add(wanted)

    for unit_file in unit_files:
        file_name = unit_file.name
        if "@" in file_name:
            # Match dh_installsystemd, which skips templated services
            continue
        unit_names = alias_table[file_name]
        _stem, unit_type = file_name.rsplit(".", 1)
        # Everything this unit provides no longer counts as missing.
        pending_also_units.difference_update(unit_names)
        pending_also_units.discard(file_name)
        # ".service" units are also known by their name without the suffix.
        unit_names += [
            alias[:-8] for alias in unit_names if alias.endswith(".service")
        ]
        leading_names = [file_name]
        if file_name.endswith(".service"):
            leading_names.append(file_name[:-8])
        unit_names[:0] = leading_names
        has_install_section = file_name in units_with_install_section
        yield SystemdUnit(
            path=unit_file,
            names=unit_names,
            type_of_service=unit_type,
            service_scope=systemd_service_dir,
            # Bug (?) compat with dh_installsystemd. All units are started, but only
            # enable those with an `[Install]` section.
            # Possibly related bug #1055599
            enable_by_default=has_install_section,
            start_by_default=True,
            had_install_section=has_install_section,
        )

    for unit_name in pending_also_units:
        required_names = ", ".join(also_requesters[unit_name])
        _error(
            f"The unit {unit_name} was required by {required_names} (via Also=...)"
            f" but was not present in the package {pkg.name}"
        )
def generate_snippets_for_init_scripts(
    services: Sequence[ServiceDefinition[None]],
    ctrl: BinaryCtrlAccessor,
    _context: PackageProcessingContext,
) -> None:
    """Generate the maintscript snippets that manage sysvinit scripts.

    For every service this emits: a ``preinst`` snippet restoring the
    executable bit of the init script, a configure snippet that runs
    ``update-rc.d`` and starts/restarts/reloads the service as requested, a
    removal snippet dropping the executable bit and a purge snippet that
    runs ``update-rc.d ... remove``.  For the ``stop-then-start`` rule, an
    additional ``preinst`` snippet stops the service on upgrade.

    :param services: The init script service definitions to handle.
    :param ctrl: Accessor used to register the maintscript snippets.
    :param _context: Unused.
    :raises AssertionError: If a service uses an unsupported ``on_upgrade``
      rule.
    """
    escape = ctrl.maintscript.escape_shell_words
    for service_def in services:
        # The escaped values are interpolated exactly once (via f-strings) so
        # that a "{" or "}" in a script name cannot be misread as a format
        # placeholder by a second formatting pass.
        script_name = escape(service_def.path.name)
        script_path = escape(service_def.path.absolute)
        on_upgrade = service_def.on_upgrade
        invoke_rcd = f"invoke-rc.d --skip-systemd-native {script_name}"

        update_rcd_params = (
            "defaults" if service_def.auto_enable_on_install else "defaults-disabled"
        )

        # The removal snippet below drops the executable bit of the script, so
        # the test here must be for existence (-e): testing for -x would make
        # the `chmod +x` unreachable in exactly the case it is needed for.
        ctrl.maintscript.unconditionally_in_script(
            "preinst",
            textwrap.dedent(
                f"""\
                if [ "$1" = "install" ] && [ -n "$2" ] && [ -e {DPKG_ROOT}{script_path} ] ; then
                    chmod +x {DPKG_ROOT}{script_path} >/dev/null || true
                fi
                """
            ),
        )

        lines = [
            f"if {EMPTY_DPKG_ROOT_CONDITION} && [ -x {script_path} ]; then",
            f"    update-rc.d {script_name} {update_rcd_params} >/dev/null || exit 1",
        ]

        if service_def.auto_start_on_install and on_upgrade != "stop-then-start":
            # An empty "$2" means there is no previously configured version.
            lines.append('    if [ -z "$2" ]; then')
            lines.append(f"        {invoke_rcd} start >/dev/null || exit 1")
            lines.append("    fi")

        if on_upgrade in ("restart", "reload"):
            lines.append('    if [ -n "$2" ]; then')
            lines.append(f"        {invoke_rcd} {on_upgrade} >/dev/null || exit 1")
            lines.append("    fi")
        elif on_upgrade == "stop-then-start":
            # The service is stopped in preinst (below) and started again
            # once the new version has been configured.
            lines.append(f"    {invoke_rcd} start >/dev/null || exit 1")
            ctrl.maintscript.unconditionally_in_script(
                "preinst",
                textwrap.dedent(
                    f"""\
                    if {EMPTY_DPKG_ROOT_CONDITION} && [ "$1" = "upgrade" ] && [ -x {script_path} ]; then
                        {invoke_rcd} stop > /dev/null || true
                    fi
                    """
                ),
            )
        elif on_upgrade != "do-nothing":
            raise AssertionError(
                f"Missing support for on_upgrade rule: {service_def.on_upgrade}"
            )

        lines.append("fi")
        ctrl.maintscript.on_configure("\n".join(lines) + "\n")

        ctrl.maintscript.on_removed(
            textwrap.dedent(
                f"""\
                if [ -x {DPKG_ROOT}{script_path} ]; then
                    chmod -x {DPKG_ROOT}{script_path} > /dev/null || true
                fi
                """
            )
        )
        ctrl.maintscript.on_purge(
            textwrap.dedent(
                f"""\
                if {EMPTY_DPKG_ROOT_CONDITION} ; then
                    update-rc.d {script_name} remove >/dev/null
                fi
                """
            )
        )
def detect_sysv_init_service_files(
    fs_root: VirtualPath,
    service_registry: ServiceRegistry[None],
    _context: PackageProcessingContext,
) -> None:
    """Register every executable non-directory entry of ``/etc/init.d``.

    :param fs_root: Root of the package file system to scan.
    :param service_registry: Registry receiving one registration per script,
      named after the script's file name.
    :param _context: Unused.
    """
    init_d = fs_root.lookup("/etc/init.d")
    if init_d:
        for candidate in init_d.iterdir:
            if not candidate.is_dir and candidate.is_executable:
                service_registry.register_service(candidate, candidate.name)