Coverage for src/debputy/plugins/debputy/service_management.py: 82%

164 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2026-03-22 09:00 +0000

1import collections 

2import dataclasses 

3import os 

4import textwrap 

5from typing import Literal 

6from collections.abc import Iterable, Sequence 

7 

8from debputy.packages import BinaryPackage 

9from debputy.plugin.api.spec import ( 

10 ServiceRegistry, 

11 VirtualPath, 

12 PackageProcessingContext, 

13 BinaryCtrlAccessor, 

14 ServiceDefinition, 

15) 

16from debputy.util import _error, assume_not_none 

17 

18DPKG_ROOT = '"${DPKG_ROOT}"' 

19EMPTY_DPKG_ROOT_CONDITION = '[ -z "${DPKG_ROOT}" ]' 

20SERVICE_MANAGER_IS_SYSTEMD_CONDITION = "[ -d /run/systemd/system ]" 

21 

22 

23@dataclasses.dataclass(slots=True) 

24class SystemdServiceContext: 

25 had_install_section: bool 

26 

27 

28@dataclasses.dataclass(slots=True) 

29class SystemdUnit: 

30 path: VirtualPath 

31 names: list[str] 

32 type_of_service: str 

33 service_scope: str 

34 enable_by_default: bool 

35 start_by_default: bool 

36 had_install_section: bool 

37 

38 

39def detect_systemd_service_files( 

40 fs_root: VirtualPath, 

41 service_registry: ServiceRegistry[SystemdServiceContext], 

42 context: PackageProcessingContext, 

43) -> None: 

44 pkg = context.binary_package 

45 systemd_units = _find_and_analyze_systemd_service_files(pkg, fs_root, "system") 

46 for unit in systemd_units: 

47 service_registry.register_service( 

48 unit.path, 

49 unit.names, 

50 type_of_service=unit.type_of_service, 

51 service_scope=unit.service_scope, 

52 enable_by_default=unit.enable_by_default, 

53 start_by_default=unit.start_by_default, 

54 default_upgrade_rule="restart" if unit.start_by_default else "do-nothing", 

55 service_context=SystemdServiceContext( 

56 unit.had_install_section, 

57 ), 

58 ) 

59 

60 

61def generate_snippets_for_systemd_units( 

62 services: Sequence[ServiceDefinition[SystemdServiceContext]], 

63 ctrl: BinaryCtrlAccessor, 

64 _context: PackageProcessingContext, 

65) -> None: 

66 stop_before_upgrade: list[str] = [] 

67 stop_then_start_scripts = [] 

68 on_purge = [] 

69 start_on_install = [] 

70 action_on_upgrade = collections.defaultdict(list) 

71 assert services 

72 

73 for service_def in services: 

74 if service_def.auto_enable_on_install: 

75 template = """\ 

76 if deb-systemd-helper debian-installed {UNITFILE}; then 

77 # The following line should be removed in trixie or trixie+1 

78 deb-systemd-helper unmask {UNITFILE} >/dev/null || true 

79 

80 if deb-systemd-helper --quiet was-enabled {UNITFILE}; then 

81 # Create new symlinks, if any. 

82 deb-systemd-helper enable {UNITFILE} >/dev/null || true 

83 fi 

84 fi 

85 

86 # Update the statefile to add new symlinks (if any), which need to be cleaned 

87 # up on purge. Also remove old symlinks. 

88 deb-systemd-helper update-state {UNITFILE} >/dev/null || true 

89 """ 

90 else: 

91 template = """\ 

92 # The following line should be removed in trixie or trixie+1 

93 deb-systemd-helper unmask {UNITFILE} >/dev/null || true 

94 

95 # was-enabled defaults to true, so new installations run enable. 

96 if deb-systemd-helper --quiet was-enabled {UNITFILE}; then 

97 # Enables the unit on first installation, creates new 

98 # symlinks on upgrades if the unit file has changed. 

99 deb-systemd-helper enable {UNITFILE} >/dev/null || true 

100 else 

101 # Update the statefile to add new symlinks (if any), which need to be 

102 # cleaned up on purge. Also remove old symlinks. 

103 deb-systemd-helper update-state {UNITFILE} >/dev/null || true 

104 fi 

105 """ 

106 service_name = service_def.name 

107 

108 if assume_not_none(service_def.service_context).had_install_section: 

109 ctrl.maintscript.on_configure( 

110 template.format( 

111 UNITFILE=ctrl.maintscript.escape_shell_words(service_name), 

112 ) 

113 ) 

114 on_purge.append(service_name) 

115 elif service_def.auto_enable_on_install: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true

116 _error( 

117 f'The service "{service_name}" cannot be enabled under "systemd" as' 

118 f' it has no "[Install]" section. Please correct {service_def.definition_source}' 

119 f' so that it does not enable the service or does not apply to "systemd"' 

120 ) 

121 

122 if service_def.auto_start_on_install: 122 ↛ 124line 122 didn't jump to line 124 because the condition on line 122 was always true

123 start_on_install.append(service_name) 

124 if service_def.on_upgrade == "stop-then-start": 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true

125 stop_then_start_scripts.append(service_name) 

126 elif service_def.on_upgrade in ("restart", "reload"): 126 ↛ 129line 126 didn't jump to line 129 because the condition on line 126 was always true

127 action: str = service_def.on_upgrade 

128 action_on_upgrade[action].append(service_name) 

129 elif service_def.on_upgrade != "do-nothing": 

130 raise AssertionError( 

131 f"Missing support for on_upgrade rule: {service_def.on_upgrade}" 

132 ) 

133 

134 if start_on_install or action_on_upgrade: 134 ↛ 171line 134 didn't jump to line 171 because the condition on line 134 was always true

135 lines = [ 

136 "if {EMPTY_DPKG_ROOT_CONDITION} && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION}; then".format( 

137 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

138 SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION, 

139 ), 

140 " systemctl --system daemon-reload >/dev/null || true", 

141 ] 

142 if stop_then_start_scripts: 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true

143 unit_files = ctrl.maintscript.escape_shell_words(*stop_then_start_scripts) 

144 lines.append( 

145 " deb-systemd-invoke start {UNITFILES} >/dev/null || true".format( 

146 UNITFILES=unit_files, 

147 ) 

148 ) 

149 if start_on_install: 149 ↛ 157line 149 didn't jump to line 157 because the condition on line 149 was always true

150 lines.append(' if [ -z "$2" ]; then') 

151 lines.append( 

152 " deb-systemd-invoke start {UNITFILES} >/dev/null || true".format( 

153 UNITFILES=ctrl.maintscript.escape_shell_words(*start_on_install), 

154 ) 

155 ) 

156 lines.append(" fi") 

157 if action_on_upgrade: 157 ↛ 167line 157 didn't jump to line 167 because the condition on line 157 was always true

158 lines.append(' if [ -n "$2" ]; then') 

159 for action, units in action_on_upgrade.items(): 

160 lines.append( 

161 " deb-systemd-invoke {ACTION} {UNITFILES} >/dev/null || true".format( 

162 ACTION=action, 

163 UNITFILES=ctrl.maintscript.escape_shell_words(*units), 

164 ) 

165 ) 

166 lines.append(" fi") 

167 lines.append("fi") 

168 combined = "".join(x if x.endswith("\n") else f"{x}\n" for x in lines) 

169 ctrl.maintscript.on_configure(combined) 

170 

171 if stop_then_start_scripts: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true

172 ctrl.maintscript.unconditionally_in_script( 

173 "preinst", 

174 textwrap.dedent( 

175 """\ 

176 if {EMPTY_DPKG_ROOT_CONDITION} && [ "$1" = upgrade ] && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then 

177 deb-systemd-invoke stop {UNIT_FILES} >/dev/null || true 

178 fi 

179 """.format( 

180 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

181 SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION, 

182 UNIT_FILES=ctrl.maintscript.escape_shell_words( 

183 *stop_then_start_scripts 

184 ), 

185 ) 

186 ), 

187 ) 

188 

189 if stop_before_upgrade: 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true

190 ctrl.maintscript.on_before_removal( 

191 """\ 

192 if {EMPTY_DPKG_ROOT_CONDITION} && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then 

193 deb-systemd-invoke stop {UNIT_FILES} >/dev/null || true 

194 fi 

195 """.format( 

196 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

197 SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION, 

198 UNIT_FILES=ctrl.maintscript.escape_shell_words(*stop_before_upgrade), 

199 ) 

200 ) 

201 if on_purge: 201 ↛ 209line 201 didn't jump to line 209 because the condition on line 201 was always true

202 ctrl.maintscript.on_purge( 

203 """\ 

204 deb-systemd-helper purge {UNITFILES} >/dev/null || true 

205 """.format( 

206 UNITFILES=ctrl.maintscript.escape_shell_words(*stop_before_upgrade), 

207 ) 

208 ) 

209 ctrl.maintscript.on_removed( 

210 textwrap.dedent( 

211 """\ 

212 if {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then 

213 systemctl --system daemon-reload >/dev/null || true 

214 fi 

215 """.format( 

216 SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION 

217 ) 

218 ) 

219 ) 

220 

221 

222def _remove_quote(v: str) -> str: 

223 if v and v[0] == v[-1] and v[0] in ('"', "'"): 223 ↛ 225line 223 didn't jump to line 225 because the condition on line 223 was always true

224 return v[1:-1] 

225 return v 

226 

227 

228def _find_and_analyze_systemd_service_files( 

229 pkg: BinaryPackage, 

230 fs_root: VirtualPath, 

231 systemd_service_dir: Literal["system", "user"], 

232) -> Iterable[SystemdUnit]: 

233 service_dirs = [ 

234 f"./usr/lib/systemd/{systemd_service_dir}", 

235 f"./lib/systemd/{systemd_service_dir}", 

236 ] 

237 had_install_sections = set() 

238 aliases: dict[str, list[str]] = collections.defaultdict(list) 

239 seen = set() 

240 all_files = [] 

241 expected_units = set() 

242 expected_units_required_by = collections.defaultdict(list) 

243 

244 for d in service_dirs: 

245 system_dir = fs_root.lookup(d) 

246 if not system_dir: 

247 continue 

248 for child in system_dir.iterdir(): 

249 if child.is_symlink: 

250 dest = os.path.basename(child.readlink()) 

251 aliases[dest].append(child.name) 

252 elif child.is_file and child.name not in seen: 252 ↛ 248line 252 didn't jump to line 248 because the condition on line 252 was always true

253 seen.add(child.name) 

254 all_files.append(child) 

255 if "@" in child.name: 

256 # dh_installsystemd does not check the contents of templated services, 

257 # and we match that. 

258 continue 

259 with child.open() as fd: 

260 for line in fd: 

261 line = line.strip() 

262 line_lc = line.lower() 

263 if line_lc == "[install]": 

264 had_install_sections.add(child.name) 

265 elif line_lc.startswith("alias="): 265 ↛ 271line 265 didn't jump to line 271 because the condition on line 265 was always true

266 # This code assumes service names cannot contain spaces (as in 

267 # if you copy-paste it for another field it might not work) 

268 aliases[child.name].extend( 

269 _remove_quote(x) for x in line[6:].split() 

270 ) 

271 elif line_lc.startswith("also="): 

272 # This code assumes service names cannot contain spaces (as in 

273 # if you copy-paste it for another field it might not work) 

274 for unit in (_remove_quote(x) for x in line[5:].split()): 

275 expected_units_required_by[unit].append(child.absolute) 

276 expected_units.add(unit) 

277 for path in all_files: 

278 if "@" in path.name: 

279 # Match dh_installsystemd, which skips templated services 

280 continue 

281 names = aliases[path.name] 

282 _, type_of_service = path.name.rsplit(".", 1) 

283 expected_units.difference_update(names) 

284 expected_units.discard(path.name) 

285 names.extend(x[:-8] for x in list(names) if x.endswith(".service")) 

286 names.insert(0, path.name) 

287 if path.name.endswith(".service"): 

288 names.insert(1, path.name[:-8]) 

289 yield SystemdUnit( 

290 path, 

291 names, 

292 type_of_service, 

293 systemd_service_dir, 

294 # Bug (?) compat with dh_installsystemd. All units are started, but only 

295 # enable those with an `[Install]` section. 

296 # Possibly related bug #1055599 

297 enable_by_default=path.name in had_install_sections, 

298 start_by_default=True, 

299 had_install_section=path.name in had_install_sections, 

300 ) 

301 

302 if expected_units: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true

303 for unit_name in expected_units: 

304 required_by = expected_units_required_by[unit_name] 

305 required_names = ", ".join(required_by) 

306 _error( 

307 f"The unit {unit_name} was required by {required_names} (via Also=...)" 

308 f" but was not present in the package {pkg.name}" 

309 ) 

310 

311 

312def generate_snippets_for_init_scripts( 

313 services: Sequence[ServiceDefinition[None]], 

314 ctrl: BinaryCtrlAccessor, 

315 _context: PackageProcessingContext, 

316) -> None: 

317 for service_def in services: 

318 script_name = service_def.path.name 

319 script_installed_path = service_def.path.absolute 

320 

321 update_rcd_params = ( 

322 "defaults" if service_def.auto_enable_on_install else "defaults-disabled" 

323 ) 

324 

325 ctrl.maintscript.unconditionally_in_script( 

326 "preinst", 

327 textwrap.dedent( 

328 """\ 

329 if [ "$1" = "install" ] && [ -n "$2" ] && [ -x {DPKG_ROOT}{SCRIPT_PATH} ] ; then 

330 chmod +x {DPKG_ROOT}{SCRIPT_PATH} >/dev/null || true 

331 fi 

332 """.format( 

333 DPKG_ROOT=DPKG_ROOT, 

334 SCRIPT_PATH=ctrl.maintscript.escape_shell_words( 

335 script_installed_path 

336 ), 

337 ) 

338 ), 

339 ) 

340 

341 lines = [ 

342 "if [ -x {SCRIPT_PATH} ]; then", 

343 " update-rc.d {SCRIPT_NAME} {UPDATE_RCD_PARAMS} >/dev/null || exit 1", 

344 ] 

345 

346 if ( 346 ↛ 363line 346 didn't jump to line 363 because the condition on line 346 was always true

347 service_def.auto_start_on_install 

348 and service_def.on_upgrade != "stop-then-start" 

349 ): 

350 lines.append(' if [ -z "$2" ]; then') 

351 lines.append( 

352 """\ 

353 if {EMPTY_DPKG_ROOT_CONDITION}; then 

354 invoke-rc.d --skip-systemd-native {SCRIPT_NAME} start >/dev/null || exit 1 

355 fi 

356 """.format( 

357 SCRIPT_NAME=ctrl.maintscript.escape_shell_words(script_name), 

358 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

359 ) 

360 ) 

361 lines.append(" fi") 

362 

363 if service_def.on_upgrade in ("restart", "reload"): 363 ↛ 377line 363 didn't jump to line 377 because the condition on line 363 was always true

364 lines.append(' if [ -n "$2" ]; then') 

365 lines.append( 

366 """\ 

367 if {EMPTY_DPKG_ROOT_CONDITION}; then 

368 invoke-rc.d --skip-systemd-native {SCRIPT_NAME} {ACTION} >/dev/null || exit 1 

369 fi 

370 """.format( 

371 SCRIPT_NAME=ctrl.maintscript.escape_shell_words(script_name), 

372 ACTION=service_def.on_upgrade, 

373 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

374 ) 

375 ) 

376 lines.append(" fi") 

377 elif service_def.on_upgrade == "stop-then-start": 

378 lines.append( 

379 """\ 

380 if {EMPTY_DPKG_ROOT_CONDITION}; then 

381 invoke-rc.d --skip-systemd-native {SCRIPT_NAME} start >/dev/null || exit 1 

382 fi 

383 """.format( 

384 SCRIPT_NAME=ctrl.maintscript.escape_shell_words(script_name), 

385 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

386 ) 

387 ) 

388 ctrl.maintscript.unconditionally_in_script( 

389 "preinst", 

390 textwrap.dedent( 

391 """\ 

392 if {EMPTY_DPKG_ROOT_CONDITION} && [ "$1" = "upgrade" ] && [ -x {SCRIPT_PATH} ]; then 

393 invoke-rc.d --skip-systemd-native {SCRIPT_NAME} stop > /dev/null || true 

394 fi 

395 """.format( 

396 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

397 SCRIPT_PATH=ctrl.maintscript.escape_shell_words( 

398 script_installed_path 

399 ), 

400 SCRIPT_NAME=ctrl.maintscript.escape_shell_words(script_name), 

401 ) 

402 ), 

403 ) 

404 elif service_def.on_upgrade != "do-nothing": 

405 raise AssertionError( 

406 f"Missing support for on_upgrade rule: {service_def.on_upgrade}" 

407 ) 

408 

409 lines.append("fi") 

410 combined = "".join(x if x.endswith("\n") else f"{x}\n" for x in lines) 

411 ctrl.maintscript.on_configure( 

412 combined.format( 

413 EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION, 

414 DPKG_ROOT=DPKG_ROOT, 

415 UPDATE_RCD_PARAMS=update_rcd_params, 

416 SCRIPT_PATH=ctrl.maintscript.escape_shell_words(script_installed_path), 

417 SCRIPT_NAME=ctrl.maintscript.escape_shell_words(script_name), 

418 ) 

419 ) 

420 

421 ctrl.maintscript.on_removed( 

422 textwrap.dedent( 

423 """\ 

424 if [ -x {DPKG_ROOT}{SCRIPT_PATH} ]; then 

425 chmod -x {DPKG_ROOT}{SCRIPT_PATH} > /dev/null || true 

426 fi 

427 """.format( 

428 DPKG_ROOT=DPKG_ROOT, 

429 SCRIPT_PATH=ctrl.maintscript.escape_shell_words( 

430 script_installed_path 

431 ), 

432 ) 

433 ) 

434 ) 

435 ctrl.maintscript.on_purge( 

436 textwrap.dedent( 

437 """\ 

438 update-rc.d {SCRIPT_NAME} remove >/dev/null 

439 """.format( 

440 SCRIPT_NAME=ctrl.maintscript.escape_shell_words(script_name), 

441 ) 

442 ) 

443 ) 

444 

445 

446def detect_sysv_init_service_files( 

447 fs_root: VirtualPath, 

448 service_registry: ServiceRegistry[None], 

449 _context: PackageProcessingContext, 

450) -> None: 

451 etc_init = fs_root.lookup("/etc/init.d") 

452 if not etc_init: 

453 return 

454 for path in etc_init.iterdir(): 

455 if path.is_dir or not path.is_executable: 

456 continue 

457 

458 service_registry.register_service( 

459 path, 

460 path.name, 

461 )