1#!/usr/bin/env python3 2# SPDX-License-Identifier: LGPL-2.1-or-later 3# 4# systemd-sysv-generator integration test 5# 6# © 2015 Canonical Ltd. 7# Author: Martin Pitt <martin.pitt@ubuntu.com> 8 9import collections 10import os 11import shutil 12import subprocess 13import sys 14import tempfile 15import unittest 16 17from configparser import RawConfigParser 18from glob import glob 19 20sysv_generator = './systemd-sysv-generator' 21 22class MultiDict(collections.OrderedDict): 23 def __setitem__(self, key, value): 24 if isinstance(value, list) and key in self: 25 self[key].extend(value) 26 else: 27 super(MultiDict, self).__setitem__(key, value) 28 29class SysvGeneratorTest(unittest.TestCase): 30 def setUp(self): 31 self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.') 32 self.init_d_dir = os.path.join(self.workdir, 'init.d') 33 os.mkdir(self.init_d_dir) 34 self.rcnd_dir = self.workdir 35 self.unit_dir = os.path.join(self.workdir, 'systemd') 36 os.mkdir(self.unit_dir) 37 self.out_dir = os.path.join(self.workdir, 'output') 38 os.mkdir(self.out_dir) 39 40 def tearDown(self): 41 shutil.rmtree(self.workdir) 42 43 # 44 # Helper methods 45 # 46 47 def run_generator(self, expect_error=False): 48 '''Run sysv-generator. 49 50 Fail if stderr contains any "Fail", unless expect_error is True. 51 Return (stderr, filename -> ConfigParser) pair with output to stderr and 52 parsed generated units. 53 ''' 54 env = os.environ.copy() 55 env['SYSTEMD_LOG_LEVEL'] = 'debug' 56 env['SYSTEMD_LOG_TARGET'] = 'console' 57 env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir 58 env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir 59 env['SYSTEMD_UNIT_PATH'] = self.unit_dir 60 gen = subprocess.Popen( 61 [sysv_generator, 'ignored', 'ignored', self.out_dir], 62 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 63 universal_newlines=True, env=env) 64 (out, err) = gen.communicate() 65 if not expect_error: 66 self.assertFalse('Fail' in err, err) 67 self.assertEqual(gen.returncode, 0, err) 68 69 results = {} 70 for service in glob(self.out_dir + '/*.service'): 71 if os.path.islink(service): 72 continue 73 try: 74 # for python3 we need here strict=False to parse multiple 75 # lines with the same key 76 cp = RawConfigParser(dict_type=MultiDict, strict=False) 77 except TypeError: 78 # RawConfigParser in python2 does not have the strict option 79 # but it allows multiple lines with the same key by default 80 cp = RawConfigParser(dict_type=MultiDict) 81 cp.optionxform = lambda o: o # don't lower-case option names 82 with open(service) as f: 83 cp.readfp(f) 84 results[os.path.basename(service)] = cp 85 86 return (err, results) 87 88 def add_sysv(self, fname, keys, enable=False, prio=1): 89 '''Create a SysV init script with the given keys in the LSB header 90 91 There are sensible default values for all fields. 92 If enable is True, links will be created in the rcN.d dirs. In that 93 case, the priority can be given with "prio" (default to 1). 94 95 Return path of generated script. 96 ''' 97 name_without_sh = fname.endswith('.sh') and fname[:-3] or fname 98 keys.setdefault('Provides', name_without_sh) 99 keys.setdefault('Required-Start', '$local_fs') 100 keys.setdefault('Required-Stop', keys['Required-Start']) 101 keys.setdefault('Default-Start', '2 3 4 5') 102 keys.setdefault('Default-Stop', '0 1 6') 103 keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh)) 104 keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh)) 105 script = os.path.join(self.init_d_dir, fname) 106 with open(script, 'w') as f: 107 f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n') 108 for k, v in keys.items(): 109 if v is not None: 110 f.write('#{:>20} {}\n'.format(k + ':', v)) 111 f.write('### END INIT INFO\ncode --goes here\n') 112 os.chmod(script, 0o755) 113 114 if enable: 115 def make_link(prefix, runlevel): 116 d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel)) 117 if not os.path.isdir(d): 118 os.mkdir(d) 119 os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname)) 120 121 for rl in keys['Default-Start'].split(): 122 make_link('S%02i' % prio, rl) 123 for rl in keys['Default-Stop'].split(): 124 make_link('K%02i' % (99 - prio), rl) 125 126 return script 127 128 def assert_enabled(self, unit, targets): 129 '''assert that a unit is enabled in precisely the given targets''' 130 131 all_targets = ['multi-user', 'graphical'] 132 133 # should be enabled 134 for target in all_targets: 135 link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit) 136 if target in targets: 137 unit_file = os.readlink(link) 138 # os.path.exists() will fail on a dangling symlink 139 self.assertTrue(os.path.exists(link)) 140 self.assertEqual(os.path.basename(unit_file), unit) 141 else: 142 self.assertFalse(os.path.exists(link), 143 '{} unexpectedly exists'.format(link)) 144 145 # 146 # test cases 147 # 148 149 def test_nothing(self): 150 '''no input files''' 151 152 results = self.run_generator()[1] 153 self.assertEqual(results, {}) 154 self.assertEqual(os.listdir(self.out_dir), []) 155 156 def test_simple_disabled(self): 157 '''simple service without dependencies, disabled''' 158 159 self.add_sysv('foo', {}, enable=False) 160 err, results = self.run_generator() 161 self.assertEqual(len(results), 1) 162 163 # no enablement links or other stuff 164 self.assertEqual(os.listdir(self.out_dir), ['foo.service']) 165 166 s = results['foo.service'] 167 self.assertEqual(s.sections(), ['Unit', 'Service']) 168 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') 169 # $local_fs does not need translation, don't expect any dependency 170 # fields here 171 self.assertEqual(set(s.options('Unit')), 172 set(['Documentation', 'SourcePath', 'Description'])) 173 174 self.assertEqual(s.get('Service', 'Type'), 'forking') 175 init_script = os.path.join(self.init_d_dir, 'foo') 176 self.assertEqual(s.get('Service', 'ExecStart'), 177 '{} start'.format(init_script)) 178 self.assertEqual(s.get('Service', 'ExecStop'), 179 '{} stop'.format(init_script)) 180 181 self.assertNotIn('Overwriting', err) 182 183 def test_simple_enabled_all(self): 184 '''simple service without dependencies, enabled in all runlevels''' 185 186 self.add_sysv('foo', {}, enable=True) 187 err, results = self.run_generator() 188 self.assertEqual(list(results), ['foo.service']) 189 self.assert_enabled('foo.service', ['multi-user', 'graphical']) 190 self.assertNotIn('Overwriting', err) 191 192 def test_simple_escaped(self): 193 '''simple service without dependencies, that requires escaping the name''' 194 195 self.add_sysv('foo+', {}) 196 self.add_sysv('foo-admin', {}) 197 err, results = self.run_generator() 198 self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'}) 199 self.assertNotIn('Overwriting', err) 200 201 def test_simple_enabled_some(self): 202 '''simple service without dependencies, enabled in some runlevels''' 203 204 self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True) 205 err, results = self.run_generator() 206 self.assertEqual(list(results), ['foo.service']) 207 self.assert_enabled('foo.service', ['multi-user']) 208 209 def test_lsb_macro_dep_single(self): 210 '''single LSB macro dependency: $network''' 211 212 self.add_sysv('foo', {'Required-Start': '$network'}) 213 s = self.run_generator()[1]['foo.service'] 214 self.assertEqual(set(s.options('Unit')), 215 set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants'])) 216 self.assertEqual(s.get('Unit', 'After'), 'network-online.target') 217 self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target') 218 219 def test_lsb_macro_dep_multi(self): 220 '''multiple LSB macro dependencies''' 221 222 self.add_sysv('foo', {'Required-Start': '$named $portmap'}) 223 s = self.run_generator()[1]['foo.service'] 224 self.assertEqual(set(s.options('Unit')), 225 set(['Documentation', 'SourcePath', 'Description', 'After'])) 226 self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target']) 227 228 def test_lsb_deps(self): 229 '''LSB header dependencies to other services''' 230 231 # also give symlink priorities here; they should be ignored 232 self.add_sysv('foo', {'Required-Start': 'must1 must2', 233 'Should-Start': 'may1 ne_may2'}, 234 enable=True, prio=40) 235 self.add_sysv('must1', {}, enable=True, prio=10) 236 self.add_sysv('must2', {}, enable=True, prio=15) 237 self.add_sysv('may1', {}, enable=True, prio=20) 238 # do not create ne_may2 239 err, results = self.run_generator() 240 self.assertEqual(sorted(results), 241 ['foo.service', 'may1.service', 'must1.service', 'must2.service']) 242 243 # foo should depend on all of them 244 self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()), 245 ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service']) 246 247 # other services should not depend on each other 248 self.assertFalse(results['must1.service'].has_option('Unit', 'After')) 249 self.assertFalse(results['must2.service'].has_option('Unit', 'After')) 250 self.assertFalse(results['may1.service'].has_option('Unit', 'After')) 251 252 def test_symlink_prio_deps(self): 253 '''script without LSB headers use rcN.d priority''' 254 255 # create two init.d scripts without LSB header and enable them with 256 # startup priorities 257 for prio, name in [(10, 'provider'), (15, 'consumer')]: 258 with open(os.path.join(self.init_d_dir, name), 'w') as f: 259 f.write('#!/bin/init-d-interpreter\ncode --goes here\n') 260 os.fchmod(f.fileno(), 0o755) 261 262 d = os.path.join(self.rcnd_dir, 'rc2.d') 263 if not os.path.isdir(d): 264 os.mkdir(d) 265 os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name))) 266 267 err, results = self.run_generator() 268 self.assertEqual(sorted(results), ['consumer.service', 'provider.service']) 269 self.assertFalse(results['provider.service'].has_option('Unit', 'After')) 270 self.assertEqual(results['consumer.service'].get('Unit', 'After'), 271 'provider.service') 272 273 def test_multiple_provides(self): 274 '''multiple Provides: names''' 275 276 self.add_sysv('foo', {'Provides': 'foo bar baz'}) 277 err, results = self.run_generator() 278 self.assertEqual(list(results), ['foo.service']) 279 self.assertEqual(set(results['foo.service'].options('Unit')), 280 set(['Documentation', 'SourcePath', 'Description'])) 281 # should create symlinks for the alternative names 282 for f in ['bar.service', 'baz.service']: 283 self.assertEqual(os.readlink(os.path.join(self.out_dir, f)), 284 'foo.service') 285 self.assertNotIn('Overwriting', err) 286 287 def test_provides_escaped(self): 288 '''a script that Provides: a name that requires escaping''' 289 290 self.add_sysv('foo', {'Provides': 'foo foo+'}) 291 err, results = self.run_generator() 292 self.assertEqual(list(results), ['foo.service']) 293 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')), 294 'foo.service') 295 self.assertNotIn('Overwriting', err) 296 297 def test_same_provides_in_multiple_scripts(self): 298 '''multiple init.d scripts provide the same name''' 299 300 self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1) 301 self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2) 302 err, results = self.run_generator() 303 self.assertEqual(sorted(results), ['bar.service', 'foo.service']) 304 # should create symlink for the alternative name for either unit 305 self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')), 306 ['foo.service', 'bar.service']) 307 308 def test_provide_other_script(self): 309 '''init.d scripts provides the name of another init.d script''' 310 311 self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True) 312 self.add_sysv('bar', {'Provides': 'bar'}, enable=True) 313 err, results = self.run_generator() 314 self.assertEqual(sorted(results), ['bar.service', 'foo.service']) 315 # we do expect an overwrite here, bar.service should overwrite the 316 # alias link from foo.service 317 self.assertIn('Overwriting', err) 318 319 def test_nonexecutable_script(self): 320 '''ignores non-executable init.d script''' 321 322 os.chmod(self.add_sysv('foo', {}), 0o644) 323 err, results = self.run_generator() 324 self.assertEqual(results, {}) 325 326 def test_sh_suffix(self): 327 '''init.d script with .sh suffix''' 328 329 self.add_sysv('foo.sh', {}, enable=True) 330 err, results = self.run_generator() 331 s = results['foo.service'] 332 333 self.assertEqual(s.sections(), ['Unit', 'Service']) 334 # should not have a .sh 335 self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service') 336 337 # calls correct script with .sh 338 init_script = os.path.join(self.init_d_dir, 'foo.sh') 339 self.assertEqual(s.get('Service', 'ExecStart'), 340 '{} start'.format(init_script)) 341 self.assertEqual(s.get('Service', 'ExecStop'), 342 '{} stop'.format(init_script)) 343 344 self.assert_enabled('foo.service', ['multi-user', 'graphical']) 345 346 def test_sh_suffix_with_provides(self): 347 '''init.d script with .sh suffix and Provides:''' 348 349 self.add_sysv('foo.sh', {'Provides': 'foo bar'}) 350 err, results = self.run_generator() 351 # ensure we don't try to create a symlink to itself 352 self.assertNotIn('itself', err) 353 self.assertEqual(list(results), ['foo.service']) 354 self.assertEqual(results['foo.service'].get('Unit', 'Description'), 355 'LSB: test foo service') 356 357 # should create symlink for the alternative name 358 self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')), 359 'foo.service') 360 361 def test_hidden_files(self): 362 '''init.d script with hidden file suffix''' 363 364 script = self.add_sysv('foo', {}, enable=True) 365 # backup files (not enabled in rcN.d/) 366 shutil.copy(script, script + '.dpkg-new') 367 shutil.copy(script, script + '.dpkg-dist') 368 shutil.copy(script, script + '.swp') 369 shutil.copy(script, script + '.rpmsave') 370 371 err, results = self.run_generator() 372 self.assertEqual(list(results), ['foo.service']) 373 374 self.assert_enabled('foo.service', ['multi-user', 'graphical']) 375 376 def test_backup_file(self): 377 '''init.d script with backup file''' 378 379 script = self.add_sysv('foo', {}, enable=True) 380 # backup files (not enabled in rcN.d/) 381 shutil.copy(script, script + '.bak') 382 shutil.copy(script, script + '.old') 383 shutil.copy(script, script + '.tmp') 384 shutil.copy(script, script + '.new') 385 386 err, results = self.run_generator() 387 print(err) 388 self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service']) 389 390 # ensure we don't try to create a symlink to itself 391 self.assertNotIn('itself', err) 392 393 self.assert_enabled('foo.service', ['multi-user', 'graphical']) 394 self.assert_enabled('foo.bak.service', []) 395 self.assert_enabled('foo.old.service', []) 396 397 def test_existing_native_unit(self): 398 '''existing native unit''' 399 400 with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f: 401 f.write('[Unit]\n') 402 403 self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True) 404 err, results = self.run_generator() 405 self.assertEqual(list(results), []) 406 # no enablement or alias links, as native unit is disabled 407 self.assertEqual(os.listdir(self.out_dir), []) 408 409 410if __name__ == '__main__': 411 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2)) 412