Coverage for src/debputy/plugins/debputy/service_management.py: 82%

164 statements  

« prev     ^ index     » next       coverage.py v7.8.2, created at 2025-10-12 15:06 +0000

1import collections 

2import dataclasses 

3import os 

4import textwrap 

5from typing import Dict, List, Literal 

6from collections.abc import Iterable, Sequence 

7 

8from debputy.packages import BinaryPackage 

9from debputy.plugin.api.spec import ( 

10 ServiceRegistry, 

11 VirtualPath, 

12 PackageProcessingContext, 

13 BinaryCtrlAccessor, 

14 ServiceDefinition, 

15) 

16from debputy.util import _error, assume_not_none 

17 

# Shell expansion of the DPKG_ROOT variable, already wrapped in double quotes so
# that it can be placed directly in front of a separately escaped path inside a
# maintscript snippet.
DPKG_ROOT = '"${DPKG_ROOT}"'
# Shell test that succeeds only when DPKG_ROOT is unset or empty.
EMPTY_DPKG_ROOT_CONDITION = '[ -z "${DPKG_ROOT}" ]'
# Shell test that succeeds when /run/systemd/system is a directory; the snippets
# in this module use it to decide whether systemd is the running service manager.
SERVICE_MANAGER_IS_SYSTEMD_CONDITION = "[ -d /run/systemd/system ]"

21 

22 

@dataclasses.dataclass(slots=True)
class SystemdServiceContext:
    """Extra per-service data attached to systemd services registered by this module."""

    # Whether the unit file contained an `[Install]` section.
    had_install_section: bool

26 

27 

@dataclasses.dataclass(slots=True)
class SystemdUnit:
    """A systemd unit file found in a package together with the facts derived from it."""

    # The unit file itself.
    path: VirtualPath
    # The unit's file name first, followed by its other known names (aliases
    # and, for `.service` names, the variants without the `.service` suffix).
    names: list[str]
    # The unit file's extension (the text after the last "." of its name).
    type_of_service: str
    # The systemd directory flavour the unit was found under ("system" or "user").
    service_scope: str
    enable_by_default: bool
    start_by_default: bool
    # Whether the unit file contained an `[Install]` section.
    had_install_section: bool

37 

38 

def detect_systemd_service_files(
    fs_root: VirtualPath,
    service_registry: ServiceRegistry[SystemdServiceContext],
    context: PackageProcessingContext,
) -> None:
    """Register every "system" scope systemd unit found below ``fs_root``.

    Each unit reported by ``_find_and_analyze_systemd_service_files`` is handed
    to ``service_registry`` together with a ``SystemdServiceContext`` recording
    whether the unit had an ``[Install]`` section.

    :param fs_root: Root of the package file system to scan.
    :param service_registry: Registry receiving the detected services.
    :param context: Processing context; only its ``binary_package`` is used.
    """
    detected_units = _find_and_analyze_systemd_service_files(
        context.binary_package,
        fs_root,
        "system",
    )
    for detected in detected_units:
        # Units that are started by default are restarted on upgrade; anything
        # else is left alone.
        if detected.start_by_default:
            upgrade_rule = "restart"
        else:
            upgrade_rule = "do-nothing"
        install_info = SystemdServiceContext(detected.had_install_section)
        service_registry.register_service(
            detected.path,
            detected.names,
            type_of_service=detected.type_of_service,
            service_scope=detected.service_scope,
            enable_by_default=detected.enable_by_default,
            start_by_default=detected.start_by_default,
            default_upgrade_rule=upgrade_rule,
            service_context=install_info,
        )

59 

60 

def generate_snippets_for_systemd_units(
    services: Sequence[ServiceDefinition[SystemdServiceContext]],
    ctrl: BinaryCtrlAccessor,
    _context: PackageProcessingContext,
) -> None:
    """Emit the maintscript snippets that manage ``services`` under systemd.

    For every service whose unit had an ``[Install]`` section, a configure-time
    snippet is emitted that either enables the unit (``auto_enable_on_install``)
    or only refreshes the state recorded by ``deb-systemd-helper``; those
    services are also handed to ``deb-systemd-helper purge`` on purge.  A
    service that requests auto-enable without an ``[Install]`` section is
    reported via ``_error``.

    Start / restart / reload / stop requests are collected over all services
    and emitted as one snippet per maintscript phase.  Those snippets only act
    when ``DPKG_ROOT`` is empty and ``/run/systemd/system`` exists.

    :param services: The services to generate snippets for; must not be empty.
    :param ctrl: Accessor whose ``maintscript`` receives the snippets.
    :param _context: Unused.
    :raises AssertionError: If ``services`` is empty or a service uses an
        ``on_upgrade`` rule this function does not know about.
    """
    # Used when the unit should be enabled on installation.
    enable_template = textwrap.dedent(
        """\
        # The following line should be removed in trixie or trixie+1
        deb-systemd-helper unmask {UNITFILE} >/dev/null || true

        # was-enabled defaults to true, so new installations run enable.
        if deb-systemd-helper --quiet was-enabled {UNITFILE}; then
            # Enables the unit on first installation, creates new
            # symlinks on upgrades if the unit file has changed.
            deb-systemd-helper enable {UNITFILE} >/dev/null || true
        else
            # Update the statefile to add new symlinks (if any), which need to be
            # cleaned up on purge. Also remove old symlinks.
            deb-systemd-helper update-state {UNITFILE} >/dev/null || true
        fi
        """
    )
    # Used when the unit must NOT be enabled on installation: symlinks are only
    # refreshed for units deb-systemd-helper already knows about.
    dont_enable_template = textwrap.dedent(
        """\
        if deb-systemd-helper debian-installed {UNITFILE}; then
            # The following line should be removed in trixie or trixie+1
            deb-systemd-helper unmask {UNITFILE} >/dev/null || true

            if deb-systemd-helper --quiet was-enabled {UNITFILE}; then
                # Create new symlinks, if any.
                deb-systemd-helper enable {UNITFILE} >/dev/null || true
            fi
        fi

        # Update the statefile to add new symlinks (if any), which need to be cleaned
        # up on purge. Also remove old symlinks.
        deb-systemd-helper update-state {UNITFILE} >/dev/null || true
        """
    )

    # NOTE(review): nothing in this function ever adds to `stop_before_upgrade`,
    # so the "before removal" snippet below is currently never emitted.
    stop_before_upgrade: list[str] = []
    stop_then_start_scripts: list[str] = []
    on_purge: list[str] = []
    start_on_install: list[str] = []
    action_on_upgrade: dict[str, list[str]] = collections.defaultdict(list)
    assert services

    escape = ctrl.maintscript.escape_shell_words

    for service_def in services:
        service_name = service_def.name
        if service_def.auto_enable_on_install:
            template = enable_template
        else:
            template = dont_enable_template

        if assume_not_none(service_def.service_context).had_install_section:
            ctrl.maintscript.on_configure(
                template.format(UNITFILE=escape(service_name)),
            )
            # deb-systemd-helper now tracks state for this unit, so it has to
            # be cleaned up on purge.
            on_purge.append(service_name)
        elif service_def.auto_enable_on_install:
            _error(
                f'The service "{service_name}" cannot be enabled under "systemd" as'
                f' it has no "[Install]" section. Please correct {service_def.definition_source}'
                f' so that it does not enable the service or does not apply to "systemd"'
            )

        if service_def.auto_start_on_install:
            start_on_install.append(service_name)
        if service_def.on_upgrade == "stop-then-start":
            stop_then_start_scripts.append(service_name)
        elif service_def.on_upgrade in ("restart", "reload"):
            action_on_upgrade[service_def.on_upgrade].append(service_name)
        elif service_def.on_upgrade != "do-nothing":
            raise AssertionError(
                f"Missing support for on_upgrade rule: {service_def.on_upgrade}"
            )

    # `stop_then_start_scripts` must be part of this condition: those units are
    # stopped in preinst and would otherwise never be started again when no
    # other unit needs a postinst action.
    if start_on_install or action_on_upgrade or stop_then_start_scripts:
        lines = [
            f"if {EMPTY_DPKG_ROOT_CONDITION} && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION}; then",
            "    systemctl --system daemon-reload >/dev/null || true",
        ]
        if stop_then_start_scripts:
            lines.append(
                "    deb-systemd-invoke start {UNITFILES} >/dev/null || true".format(
                    UNITFILES=escape(*stop_then_start_scripts),
                )
            )
        if start_on_install:
            # An empty "$2" is how the snippet recognises a fresh installation.
            lines.append('    if [ -z "$2" ]; then')
            lines.append(
                "        deb-systemd-invoke start {UNITFILES} >/dev/null || true".format(
                    UNITFILES=escape(*start_on_install),
                )
            )
            lines.append("    fi")
        if action_on_upgrade:
            # A non-empty "$2" is how the snippet recognises an upgrade.
            lines.append('    if [ -n "$2" ]; then')
            for action, units in action_on_upgrade.items():
                lines.append(
                    "        deb-systemd-invoke {ACTION} {UNITFILES} >/dev/null || true".format(
                        ACTION=action,
                        UNITFILES=escape(*units),
                    )
                )
            lines.append("    fi")
        lines.append("fi")
        ctrl.maintscript.on_configure("\n".join(lines) + "\n")

    if stop_then_start_scripts:
        ctrl.maintscript.unconditionally_in_script(
            "preinst",
            textwrap.dedent(
                """\
                if {EMPTY_DPKG_ROOT_CONDITION} && [ "$1" = upgrade ] && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then
                    deb-systemd-invoke stop {UNIT_FILES} >/dev/null || true
                fi
                """
            ).format(
                EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION,
                SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION,
                UNIT_FILES=escape(*stop_then_start_scripts),
            ),
        )

    if stop_before_upgrade:
        ctrl.maintscript.on_before_removal(
            textwrap.dedent(
                """\
                if {EMPTY_DPKG_ROOT_CONDITION} && {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then
                    deb-systemd-invoke stop {UNIT_FILES} >/dev/null || true
                fi
                """
            ).format(
                EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION,
                SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION,
                UNIT_FILES=escape(*stop_before_upgrade),
            )
        )

    if on_purge:
        ctrl.maintscript.on_purge(
            textwrap.dedent(
                """\
                if [ -x "/usr/bin/deb-systemd-helper" ]; then
                    deb-systemd-helper purge {UNITFILES} >/dev/null || true
                fi
                """
            ).format(
                # Purge exactly the units that were registered with
                # deb-systemd-helper in the configure snippets above.
                UNITFILES=escape(*on_purge),
            )
        )

    ctrl.maintscript.on_removed(
        textwrap.dedent(
            """\
            if {SERVICE_MANAGER_IS_SYSTEMD_CONDITION} ; then
                systemctl --system daemon-reload >/dev/null || true
            fi
            """
        ).format(
            SERVICE_MANAGER_IS_SYSTEMD_CONDITION=SERVICE_MANAGER_IS_SYSTEMD_CONDITION,
        )
    )

222 

223 

224def _remove_quote(v: str) -> str: 

225 if v and v[0] == v[-1] and v[0] in ('"', "'"): 225 ↛ 227line 225 didn't jump to line 227 because the condition on line 225 was always true

226 return v[1:-1] 

227 return v 

228 

229 

def _find_and_analyze_systemd_service_files(
    pkg: BinaryPackage,
    fs_root: VirtualPath,
    systemd_service_dir: Literal["system", "user"],
) -> Iterable[SystemdUnit]:
    """Yield a ``SystemdUnit`` for each non-templated unit file in the package.

    Both ``./usr/lib/systemd/<dir>`` and ``./lib/systemd/<dir>`` are scanned.
    Symlinks are recorded as aliases of the file name they point to; regular
    files are read for an ``[Install]`` section and for ``Alias=`` / ``Also=``
    lines.  Templated units (names containing ``@``) are neither read nor
    yielded.

    Once all units have been yielded, every unit named by an ``Also=`` line
    that was not found in the package is reported via ``_error``.

    :param pkg: The binary package being processed (used in error messages).
    :param fs_root: Root of the package file system to scan.
    :param systemd_service_dir: Which systemd unit directory to look into.
    """
    units_with_install_section = set()
    alias_table: dict[str, list[str]] = collections.defaultdict(list)
    known_file_names = set()
    unit_files = []
    unresolved_also = set()
    also_requested_by = collections.defaultdict(list)

    search_dirs = (
        f"./usr/lib/systemd/{systemd_service_dir}",
        f"./lib/systemd/{systemd_service_dir}",
    )
    for search_dir in search_dirs:
        unit_dir = fs_root.lookup(search_dir)
        if not unit_dir:
            continue
        for entry in unit_dir.iterdir:
            if entry.is_symlink:
                # A symlink makes its own name an alias of whatever it points to.
                target_name = os.path.basename(entry.readlink())
                alias_table[target_name].append(entry.name)
                continue
            if not entry.is_file or entry.name in known_file_names:
                continue
            known_file_names.add(entry.name)
            unit_files.append(entry)
            if "@" in entry.name:
                # dh_installsystemd does not check the contents of templated services,
                # and we match that.
                continue
            with entry.open() as fd:
                for raw_line in fd:
                    stripped = raw_line.strip()
                    lowered = stripped.lower()
                    if lowered == "[install]":
                        units_with_install_section.add(entry.name)
                    elif lowered.startswith("alias="):
                        # This code assumes service names cannot contain spaces (as in
                        # if you copy-paste it for another field it might not work)
                        alias_table[entry.name].extend(
                            _remove_quote(word) for word in stripped[6:].split()
                        )
                    elif lowered.startswith("also="):
                        # This code assumes service names cannot contain spaces (as in
                        # if you copy-paste it for another field it might not work)
                        for wanted in (
                            _remove_quote(word) for word in stripped[5:].split()
                        ):
                            also_requested_by[wanted].append(entry.absolute)
                            unresolved_also.add(wanted)

    for unit_file in unit_files:
        unit_name = unit_file.name
        if "@" in unit_name:
            # Match dh_installsystemd, which skips templated services
            continue
        all_names = alias_table[unit_name]
        _, type_of_service = unit_name.rsplit(".", 1)
        # Anything this unit provides no longer counts as a missing `Also=` unit.
        unresolved_also.difference_update(all_names)
        unresolved_also.discard(unit_name)
        # `.service` names are also known by their suffix-less short form.
        short_aliases = [
            alias[:-8] for alias in all_names if alias.endswith(".service")
        ]
        all_names.extend(short_aliases)
        all_names.insert(0, unit_name)
        if unit_name.endswith(".service"):
            all_names.insert(1, unit_name[:-8])
        has_install_section = unit_name in units_with_install_section
        yield SystemdUnit(
            unit_file,
            all_names,
            type_of_service,
            systemd_service_dir,
            # Bug (?) compat with dh_installsystemd. All units are started, but only
            # enable those with an `[Install]` section.
            # Possibly related bug #1055599
            enable_by_default=has_install_section,
            start_by_default=True,
            had_install_section=has_install_section,
        )

    for unit_name in unresolved_also:
        required_names = ", ".join(also_requested_by[unit_name])
        _error(
            f"The unit {unit_name} was required by {required_names} (via Also=...)"
            f" but was not present in the package {pkg.name}"
        )

312 

313 

def generate_snippets_for_init_scripts(
    services: Sequence[ServiceDefinition[None]],
    ctrl: BinaryCtrlAccessor,
    _context: PackageProcessingContext,
) -> None:
    """Emit the maintscript snippets that manage ``services`` as sysvinit scripts.

    Per service the following snippets are generated:

    * preinst: restore the executable bit of an init script left behind by an
      earlier removal (and, for ``stop-then-start``, stop the service on
      upgrade).
    * configure: register the script with ``update-rc.d`` and start / restart /
      reload it via ``invoke-rc.d`` as requested by the service definition.
    * removal: drop the executable bit from the init script.
    * purge: unregister the script via ``update-rc.d ... remove``.

    :param services: The services to generate snippets for.
    :param ctrl: Accessor whose ``maintscript`` receives the snippets.
    :param _context: Unused.
    :raises AssertionError: If a service uses an ``on_upgrade`` rule this
        function does not know about.
    """
    escape = ctrl.maintscript.escape_shell_words

    for service_def in services:
        # Escape once; every snippet below interpolates these exactly one time,
        # so names containing "{" or "}" cannot upset a later format pass.
        script_name = escape(service_def.path.name)
        script_path = escape(service_def.path.absolute)
        on_upgrade = service_def.on_upgrade

        update_rcd_params = (
            "defaults" if service_def.auto_enable_on_install else "defaults-disabled"
        )

        # The removal snippet below clears the executable bit, so this test
        # must be for existence (-e): an executable test would only ever run
        # `chmod +x` on files that are executable already.
        ctrl.maintscript.unconditionally_in_script(
            "preinst",
            textwrap.dedent(
                """\
                if [ "$1" = "install" ] && [ -n "$2" ] && [ -e {DPKG_ROOT}{SCRIPT_PATH} ] ; then
                    chmod +x {DPKG_ROOT}{SCRIPT_PATH} >/dev/null || true
                fi
                """
            ).format(
                DPKG_ROOT=DPKG_ROOT,
                SCRIPT_PATH=script_path,
            ),
        )

        lines = [
            f"if {EMPTY_DPKG_ROOT_CONDITION} && [ -x {script_path} ]; then",
            f"    update-rc.d {script_name} {update_rcd_params} >/dev/null || exit 1",
        ]

        if service_def.auto_start_on_install and on_upgrade != "stop-then-start":
            # An empty "$2" is how the snippet recognises a fresh installation.
            lines += [
                '    if [ -z "$2" ]; then',
                f"        invoke-rc.d --skip-systemd-native {script_name} start >/dev/null || exit 1",
                "    fi",
            ]

        if on_upgrade in ("restart", "reload"):
            # A non-empty "$2" is how the snippet recognises an upgrade.
            lines += [
                '    if [ -n "$2" ]; then',
                f"        invoke-rc.d --skip-systemd-native {script_name} {on_upgrade} >/dev/null || exit 1",
                "    fi",
            ]
        elif on_upgrade == "stop-then-start":
            # Started unconditionally in postinst; the matching stop happens in
            # preinst on upgrade.
            lines.append(
                f"    invoke-rc.d --skip-systemd-native {script_name} start >/dev/null || exit 1"
            )
            ctrl.maintscript.unconditionally_in_script(
                "preinst",
                textwrap.dedent(
                    """\
                    if {EMPTY_DPKG_ROOT_CONDITION} && [ "$1" = "upgrade" ] && [ -x {SCRIPT_PATH} ]; then
                        invoke-rc.d --skip-systemd-native {SCRIPT_NAME} stop > /dev/null || true
                    fi
                    """
                ).format(
                    EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION,
                    SCRIPT_PATH=script_path,
                    SCRIPT_NAME=script_name,
                ),
            )
        elif on_upgrade != "do-nothing":
            raise AssertionError(
                f"Missing support for on_upgrade rule: {service_def.on_upgrade}"
            )

        lines.append("fi")
        ctrl.maintscript.on_configure("\n".join(lines) + "\n")

        ctrl.maintscript.on_removed(
            textwrap.dedent(
                """\
                if [ -x {DPKG_ROOT}{SCRIPT_PATH} ]; then
                    chmod -x {DPKG_ROOT}{SCRIPT_PATH} > /dev/null || true
                fi
                """
            ).format(
                DPKG_ROOT=DPKG_ROOT,
                SCRIPT_PATH=script_path,
            )
        )
        ctrl.maintscript.on_purge(
            textwrap.dedent(
                """\
                if {EMPTY_DPKG_ROOT_CONDITION} ; then
                    update-rc.d {SCRIPT_NAME} remove >/dev/null
                fi
                """
            ).format(
                SCRIPT_NAME=script_name,
                EMPTY_DPKG_ROOT_CONDITION=EMPTY_DPKG_ROOT_CONDITION,
            )
        )

434 

435 

def detect_sysv_init_service_files(
    fs_root: VirtualPath,
    service_registry: ServiceRegistry[None],
    _context: PackageProcessingContext,
) -> None:
    """Register every executable non-directory entry of ``/etc/init.d`` as a service.

    Nothing is registered when the package has no ``/etc/init.d``.

    :param fs_root: Root of the package file system to scan.
    :param service_registry: Registry receiving the detected services; each
        service is registered under the name of its init script.
    :param _context: Unused.
    """
    init_dir = fs_root.lookup("/etc/init.d")
    if init_dir:
        for candidate in init_dir.iterdir:
            if not candidate.is_dir and candidate.is_executable:
                service_registry.register_service(candidate, candidate.name)