1#!/usr/bin/env python3
2# SPDX-License-Identifier: LGPL-2.1-or-later
3#
4# systemd-sysv-generator integration test
5#
6# © 2015 Canonical Ltd.
7# Author: Martin Pitt <martin.pitt@ubuntu.com>
8
9import collections
10import os
11import shutil
12import subprocess
13import sys
14import tempfile
15import unittest
16
17from configparser import RawConfigParser
18from glob import glob
19
# Path of the generator binary under test; it is relative, so it is looked up
# from the current working directory of the test run.
sysv_generator = './systemd-sysv-generator'
21
class MultiDict(collections.OrderedDict):
    '''Ordered dict that accumulates list values assigned to the same key.

    Assigning a list to a key that is already present extends the stored
    value in place instead of replacing it. Any other assignment behaves
    like a plain OrderedDict assignment.
    '''

    def __setitem__(self, key, value):
        if not (isinstance(value, list) and key in self):
            # first assignment of this key, or a non-list value: store as-is
            super().__setitem__(key, value)
            return
        # repeated list assignment: append to what is already stored
        self[key].extend(value)
28
29class SysvGeneratorTest(unittest.TestCase):
30    def setUp(self):
31        self.workdir = tempfile.mkdtemp(prefix='sysv-gen-test.')
32        self.init_d_dir = os.path.join(self.workdir, 'init.d')
33        os.mkdir(self.init_d_dir)
34        self.rcnd_dir = self.workdir
35        self.unit_dir = os.path.join(self.workdir, 'systemd')
36        os.mkdir(self.unit_dir)
37        self.out_dir = os.path.join(self.workdir, 'output')
38        os.mkdir(self.out_dir)
39
40    def tearDown(self):
41        shutil.rmtree(self.workdir)
42
43    #
44    # Helper methods
45    #
46
47    def run_generator(self, expect_error=False):
48        '''Run sysv-generator.
49
50        Fail if stderr contains any "Fail", unless expect_error is True.
51        Return (stderr, filename -> ConfigParser) pair with output to stderr and
52        parsed generated units.
53        '''
54        env = os.environ.copy()
55        env['SYSTEMD_LOG_LEVEL'] = 'debug'
56        env['SYSTEMD_LOG_TARGET'] = 'console'
57        env['SYSTEMD_SYSVINIT_PATH'] = self.init_d_dir
58        env['SYSTEMD_SYSVRCND_PATH'] = self.rcnd_dir
59        env['SYSTEMD_UNIT_PATH'] = self.unit_dir
60        gen = subprocess.Popen(
61            [sysv_generator, 'ignored', 'ignored', self.out_dir],
62            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
63            universal_newlines=True, env=env)
64        (out, err) = gen.communicate()
65        if not expect_error:
66            self.assertFalse('Fail' in err, err)
67        self.assertEqual(gen.returncode, 0, err)
68
69        results = {}
70        for service in glob(self.out_dir + '/*.service'):
71            if os.path.islink(service):
72                continue
73            try:
74                # for python3 we need here strict=False to parse multiple
75                # lines with the same key
76                cp = RawConfigParser(dict_type=MultiDict, strict=False)
77            except TypeError:
78                # RawConfigParser in python2 does not have the strict option
79                # but it allows multiple lines with the same key by default
80                cp = RawConfigParser(dict_type=MultiDict)
81            cp.optionxform = lambda o: o  # don't lower-case option names
82            with open(service) as f:
83                cp.readfp(f)
84            results[os.path.basename(service)] = cp
85
86        return (err, results)
87
88    def add_sysv(self, fname, keys, enable=False, prio=1):
89        '''Create a SysV init script with the given keys in the LSB header
90
91        There are sensible default values for all fields.
92        If enable is True, links will be created in the rcN.d dirs. In that
93        case, the priority can be given with "prio" (default to 1).
94
95        Return path of generated script.
96        '''
97        name_without_sh = fname.endswith('.sh') and fname[:-3] or fname
98        keys.setdefault('Provides', name_without_sh)
99        keys.setdefault('Required-Start', '$local_fs')
100        keys.setdefault('Required-Stop', keys['Required-Start'])
101        keys.setdefault('Default-Start', '2 3 4 5')
102        keys.setdefault('Default-Stop', '0 1 6')
103        keys.setdefault('Short-Description', 'test {} service'.format(name_without_sh))
104        keys.setdefault('Description', 'long description for test {} service'.format(name_without_sh))
105        script = os.path.join(self.init_d_dir, fname)
106        with open(script, 'w') as f:
107            f.write('#!/bin/init-d-interpreter\n### BEGIN INIT INFO\n')
108            for k, v in keys.items():
109                if v is not None:
110                    f.write('#{:>20} {}\n'.format(k + ':', v))
111            f.write('### END INIT INFO\ncode --goes here\n')
112        os.chmod(script, 0o755)
113
114        if enable:
115            def make_link(prefix, runlevel):
116                d = os.path.join(self.rcnd_dir, 'rc{}.d'.format(runlevel))
117                if not os.path.isdir(d):
118                    os.mkdir(d)
119                os.symlink('../init.d/' + fname, os.path.join(d, prefix + fname))
120
121            for rl in keys['Default-Start'].split():
122                make_link('S%02i' % prio, rl)
123            for rl in keys['Default-Stop'].split():
124                make_link('K%02i' % (99 - prio), rl)
125
126        return script
127
128    def assert_enabled(self, unit, targets):
129        '''assert that a unit is enabled in precisely the given targets'''
130
131        all_targets = ['multi-user', 'graphical']
132
133        # should be enabled
134        for target in all_targets:
135            link = os.path.join(self.out_dir, '{}.target.wants'.format(target), unit)
136            if target in targets:
137                unit_file = os.readlink(link)
138                # os.path.exists() will fail on a dangling symlink
139                self.assertTrue(os.path.exists(link))
140                self.assertEqual(os.path.basename(unit_file), unit)
141            else:
142                self.assertFalse(os.path.exists(link),
143                                 '{} unexpectedly exists'.format(link))
144
145    #
146    # test cases
147    #
148
149    def test_nothing(self):
150        '''no input files'''
151
152        results = self.run_generator()[1]
153        self.assertEqual(results, {})
154        self.assertEqual(os.listdir(self.out_dir), [])
155
156    def test_simple_disabled(self):
157        '''simple service without dependencies, disabled'''
158
159        self.add_sysv('foo', {}, enable=False)
160        err, results = self.run_generator()
161        self.assertEqual(len(results), 1)
162
163        # no enablement links or other stuff
164        self.assertEqual(os.listdir(self.out_dir), ['foo.service'])
165
166        s = results['foo.service']
167        self.assertEqual(s.sections(), ['Unit', 'Service'])
168        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
169        # $local_fs does not need translation, don't expect any dependency
170        # fields here
171        self.assertEqual(set(s.options('Unit')),
172                         set(['Documentation', 'SourcePath', 'Description']))
173
174        self.assertEqual(s.get('Service', 'Type'), 'forking')
175        init_script = os.path.join(self.init_d_dir, 'foo')
176        self.assertEqual(s.get('Service', 'ExecStart'),
177                         '{} start'.format(init_script))
178        self.assertEqual(s.get('Service', 'ExecStop'),
179                         '{} stop'.format(init_script))
180
181        self.assertNotIn('Overwriting', err)
182
183    def test_simple_enabled_all(self):
184        '''simple service without dependencies, enabled in all runlevels'''
185
186        self.add_sysv('foo', {}, enable=True)
187        err, results = self.run_generator()
188        self.assertEqual(list(results), ['foo.service'])
189        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
190        self.assertNotIn('Overwriting', err)
191
192    def test_simple_escaped(self):
193        '''simple service without dependencies, that requires escaping the name'''
194
195        self.add_sysv('foo+', {})
196        self.add_sysv('foo-admin', {})
197        err, results = self.run_generator()
198        self.assertEqual(set(results), {'foo-admin.service', 'foo\\x2b.service'})
199        self.assertNotIn('Overwriting', err)
200
201    def test_simple_enabled_some(self):
202        '''simple service without dependencies, enabled in some runlevels'''
203
204        self.add_sysv('foo', {'Default-Start': '2 4'}, enable=True)
205        err, results = self.run_generator()
206        self.assertEqual(list(results), ['foo.service'])
207        self.assert_enabled('foo.service', ['multi-user'])
208
209    def test_lsb_macro_dep_single(self):
210        '''single LSB macro dependency: $network'''
211
212        self.add_sysv('foo', {'Required-Start': '$network'})
213        s = self.run_generator()[1]['foo.service']
214        self.assertEqual(set(s.options('Unit')),
215                         set(['Documentation', 'SourcePath', 'Description', 'After', 'Wants']))
216        self.assertEqual(s.get('Unit', 'After'), 'network-online.target')
217        self.assertEqual(s.get('Unit', 'Wants'), 'network-online.target')
218
219    def test_lsb_macro_dep_multi(self):
220        '''multiple LSB macro dependencies'''
221
222        self.add_sysv('foo', {'Required-Start': '$named $portmap'})
223        s = self.run_generator()[1]['foo.service']
224        self.assertEqual(set(s.options('Unit')),
225                         set(['Documentation', 'SourcePath', 'Description', 'After']))
226        self.assertEqual(s.get('Unit', 'After').split(), ['nss-lookup.target', 'rpcbind.target'])
227
228    def test_lsb_deps(self):
229        '''LSB header dependencies to other services'''
230
231        # also give symlink priorities here; they should be ignored
232        self.add_sysv('foo', {'Required-Start': 'must1 must2',
233                              'Should-Start': 'may1 ne_may2'},
234                      enable=True, prio=40)
235        self.add_sysv('must1', {}, enable=True, prio=10)
236        self.add_sysv('must2', {}, enable=True, prio=15)
237        self.add_sysv('may1', {}, enable=True, prio=20)
238        # do not create ne_may2
239        err, results = self.run_generator()
240        self.assertEqual(sorted(results),
241                         ['foo.service', 'may1.service', 'must1.service', 'must2.service'])
242
243        # foo should depend on all of them
244        self.assertEqual(sorted(results['foo.service'].get('Unit', 'After').split()),
245                         ['may1.service', 'must1.service', 'must2.service', 'ne_may2.service'])
246
247        # other services should not depend on each other
248        self.assertFalse(results['must1.service'].has_option('Unit', 'After'))
249        self.assertFalse(results['must2.service'].has_option('Unit', 'After'))
250        self.assertFalse(results['may1.service'].has_option('Unit', 'After'))
251
252    def test_symlink_prio_deps(self):
253        '''script without LSB headers use rcN.d priority'''
254
255        # create two init.d scripts without LSB header and enable them with
256        # startup priorities
257        for prio, name in [(10, 'provider'), (15, 'consumer')]:
258            with open(os.path.join(self.init_d_dir, name), 'w') as f:
259                f.write('#!/bin/init-d-interpreter\ncode --goes here\n')
260                os.fchmod(f.fileno(), 0o755)
261
262            d = os.path.join(self.rcnd_dir, 'rc2.d')
263            if not os.path.isdir(d):
264                os.mkdir(d)
265            os.symlink('../init.d/' + name, os.path.join(d, 'S{:>2}{}'.format(prio, name)))
266
267        err, results = self.run_generator()
268        self.assertEqual(sorted(results), ['consumer.service', 'provider.service'])
269        self.assertFalse(results['provider.service'].has_option('Unit', 'After'))
270        self.assertEqual(results['consumer.service'].get('Unit', 'After'),
271                         'provider.service')
272
273    def test_multiple_provides(self):
274        '''multiple Provides: names'''
275
276        self.add_sysv('foo', {'Provides': 'foo bar baz'})
277        err, results = self.run_generator()
278        self.assertEqual(list(results), ['foo.service'])
279        self.assertEqual(set(results['foo.service'].options('Unit')),
280                         set(['Documentation', 'SourcePath', 'Description']))
281        # should create symlinks for the alternative names
282        for f in ['bar.service', 'baz.service']:
283            self.assertEqual(os.readlink(os.path.join(self.out_dir, f)),
284                             'foo.service')
285        self.assertNotIn('Overwriting', err)
286
287    def test_provides_escaped(self):
288        '''a script that Provides: a name that requires escaping'''
289
290        self.add_sysv('foo', {'Provides': 'foo foo+'})
291        err, results = self.run_generator()
292        self.assertEqual(list(results), ['foo.service'])
293        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'foo\\x2b.service')),
294                         'foo.service')
295        self.assertNotIn('Overwriting', err)
296
297    def test_same_provides_in_multiple_scripts(self):
298        '''multiple init.d scripts provide the same name'''
299
300        self.add_sysv('foo', {'Provides': 'foo common'}, enable=True, prio=1)
301        self.add_sysv('bar', {'Provides': 'bar common'}, enable=True, prio=2)
302        err, results = self.run_generator()
303        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
304        # should create symlink for the alternative name for either unit
305        self.assertIn(os.readlink(os.path.join(self.out_dir, 'common.service')),
306                      ['foo.service', 'bar.service'])
307
308    def test_provide_other_script(self):
309        '''init.d scripts provides the name of another init.d script'''
310
311        self.add_sysv('foo', {'Provides': 'foo bar'}, enable=True)
312        self.add_sysv('bar', {'Provides': 'bar'}, enable=True)
313        err, results = self.run_generator()
314        self.assertEqual(sorted(results), ['bar.service', 'foo.service'])
315        # we do expect an overwrite here, bar.service should overwrite the
316        # alias link from foo.service
317        self.assertIn('Overwriting', err)
318
319    def test_nonexecutable_script(self):
320        '''ignores non-executable init.d script'''
321
322        os.chmod(self.add_sysv('foo', {}), 0o644)
323        err, results = self.run_generator()
324        self.assertEqual(results, {})
325
326    def test_sh_suffix(self):
327        '''init.d script with .sh suffix'''
328
329        self.add_sysv('foo.sh', {}, enable=True)
330        err, results = self.run_generator()
331        s = results['foo.service']
332
333        self.assertEqual(s.sections(), ['Unit', 'Service'])
334        # should not have a .sh
335        self.assertEqual(s.get('Unit', 'Description'), 'LSB: test foo service')
336
337        # calls correct script with .sh
338        init_script = os.path.join(self.init_d_dir, 'foo.sh')
339        self.assertEqual(s.get('Service', 'ExecStart'),
340                         '{} start'.format(init_script))
341        self.assertEqual(s.get('Service', 'ExecStop'),
342                         '{} stop'.format(init_script))
343
344        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
345
346    def test_sh_suffix_with_provides(self):
347        '''init.d script with .sh suffix and Provides:'''
348
349        self.add_sysv('foo.sh', {'Provides': 'foo bar'})
350        err, results = self.run_generator()
351        # ensure we don't try to create a symlink to itself
352        self.assertNotIn('itself', err)
353        self.assertEqual(list(results), ['foo.service'])
354        self.assertEqual(results['foo.service'].get('Unit', 'Description'),
355                         'LSB: test foo service')
356
357        # should create symlink for the alternative name
358        self.assertEqual(os.readlink(os.path.join(self.out_dir, 'bar.service')),
359                         'foo.service')
360
361    def test_hidden_files(self):
362        '''init.d script with hidden file suffix'''
363
364        script = self.add_sysv('foo', {}, enable=True)
365        # backup files (not enabled in rcN.d/)
366        shutil.copy(script, script + '.dpkg-new')
367        shutil.copy(script, script + '.dpkg-dist')
368        shutil.copy(script, script + '.swp')
369        shutil.copy(script, script + '.rpmsave')
370
371        err, results = self.run_generator()
372        self.assertEqual(list(results), ['foo.service'])
373
374        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
375
376    def test_backup_file(self):
377        '''init.d script with backup file'''
378
379        script = self.add_sysv('foo', {}, enable=True)
380        # backup files (not enabled in rcN.d/)
381        shutil.copy(script, script + '.bak')
382        shutil.copy(script, script + '.old')
383        shutil.copy(script, script + '.tmp')
384        shutil.copy(script, script + '.new')
385
386        err, results = self.run_generator()
387        print(err)
388        self.assertEqual(sorted(results), ['foo.service', 'foo.tmp.service'])
389
390        # ensure we don't try to create a symlink to itself
391        self.assertNotIn('itself', err)
392
393        self.assert_enabled('foo.service', ['multi-user', 'graphical'])
394        self.assert_enabled('foo.bak.service', [])
395        self.assert_enabled('foo.old.service', [])
396
397    def test_existing_native_unit(self):
398        '''existing native unit'''
399
400        with open(os.path.join(self.unit_dir, 'foo.service'), 'w') as f:
401            f.write('[Unit]\n')
402
403        self.add_sysv('foo.sh', {'Provides': 'foo bar'}, enable=True)
404        err, results = self.run_generator()
405        self.assertEqual(list(results), [])
406        # no enablement or alias links, as native unit is disabled
407        self.assertEqual(os.listdir(self.out_dir), [])
408
409
if __name__ == '__main__':
    # report verbosely on stdout instead of the runner's default stream
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=2)
    unittest.main(testRunner=runner)
412