1#!/usr/bin/env python3
2# SPDX-License-Identifier: LGPL-2.1-or-later
3#
4#  Copyright © 2017 Michal Sekletar <msekleta@redhat.com>
5
6# ATTENTION: This uses the *installed* systemd, not the one from the built
7# source tree.
8
9import unittest
10import time
11import os
12import tempfile
13import subprocess
14import sys
15
16from enum import Enum
17
class UnitFileChange(Enum):
    """Kinds of edits applied to the test unit file while the unit is running.

    The members are used as keys of the ``unit_files`` mapping that the
    test case builds, and as the argument of ``write_unit_file()``.
    """

    # Original unit file: the sleep command followed by "echo foo".
    NO_CHANGE = 0

    # The two original ExecStart= lines in reverse order.
    LINES_SWAPPED = 1

    # An extra "echo bar" command placed before the original commands.
    COMMAND_ADDED_BEFORE = 2

    # An extra "echo bar" command placed after the original commands.
    COMMAND_ADDED_AFTER = 3

    # Extra commands both before ("echo baz") and after ("echo bar") the
    # original ones.
    COMMAND_INTERLEAVED = 4

    # Both original commands dropped and replaced by unrelated ones.
    REMOVAL = 5
25
26class ExecutionResumeTest(unittest.TestCase):
27    def setUp(self):
28        self.unit = 'test-issue-518.service'
29        self.unitfile_path = '/run/systemd/system/{0}'.format(self.unit)
30        self.output_file = tempfile.mktemp()
31        self.unit_files = {}
32
33        unit_file_content = '''
34        [Service]
35        Type=oneshot
36        ExecStart=/bin/sleep 3
37        ExecStart=/bin/bash -c "echo foo >> {0}"
38        '''.format(self.output_file)
39        self.unit_files[UnitFileChange.NO_CHANGE] = unit_file_content
40
41        unit_file_content = '''
42        [Service]
43        Type=oneshot
44        ExecStart=/bin/bash -c "echo foo >> {0}"
45        ExecStart=/bin/sleep 3
46        '''.format(self.output_file)
47        self.unit_files[UnitFileChange.LINES_SWAPPED] = unit_file_content
48
49        unit_file_content = '''
50        [Service]
51        Type=oneshot
52        ExecStart=/bin/bash -c "echo bar >> {0}"
53        ExecStart=/bin/sleep 3
54        ExecStart=/bin/bash -c "echo foo >> {0}"
55        '''.format(self.output_file)
56        self.unit_files[UnitFileChange.COMMAND_ADDED_BEFORE] = unit_file_content
57
58        unit_file_content = '''
59        [Service]
60        Type=oneshot
61        ExecStart=/bin/sleep 3
62        ExecStart=/bin/bash -c "echo foo >> {0}"
63        ExecStart=/bin/bash -c "echo bar >> {0}"
64        '''.format(self.output_file)
65        self.unit_files[UnitFileChange.COMMAND_ADDED_AFTER] = unit_file_content
66
67        unit_file_content = '''
68        [Service]
69        Type=oneshot
70        ExecStart=/bin/bash -c "echo baz >> {0}"
71        ExecStart=/bin/sleep 3
72        ExecStart=/bin/bash -c "echo foo >> {0}"
73        ExecStart=/bin/bash -c "echo bar >> {0}"
74        '''.format(self.output_file)
75        self.unit_files[UnitFileChange.COMMAND_INTERLEAVED] = unit_file_content
76
77        unit_file_content = '''
78        [Service]
79        Type=oneshot
80        ExecStart=/bin/bash -c "echo bar >> {0}"
81        ExecStart=/bin/bash -c "echo baz >> {0}"
82        '''.format(self.output_file)
83        self.unit_files[UnitFileChange.REMOVAL] = unit_file_content
84
85    def reload(self):
86        subprocess.check_call(['systemctl', 'daemon-reload'])
87
88    def write_unit_file(self, unit_file_change):
89        if not isinstance(unit_file_change, UnitFileChange):
90            raise ValueError('Unknown unit file change')
91
92        content = self.unit_files[unit_file_change]
93
94        with open(self.unitfile_path, 'w') as f:
95            f.write(content)
96
97        self.reload()
98
99    def check_output(self, expected_output):
100        try:
101            with open(self.output_file, 'r') as log:
102                output = log.read()
103        except IOError:
104            self.fail()
105
106        self.assertEqual(output, expected_output)
107
108    def setup_unit(self):
109        self.write_unit_file(UnitFileChange.NO_CHANGE)
110        subprocess.check_call(['systemctl', '--job-mode=replace', '--no-block', 'start', self.unit])
111        time.sleep(1)
112
113    def test_no_change(self):
114        expected_output = 'foo\n'
115
116        self.setup_unit()
117        self.reload()
118        time.sleep(4)
119
120        self.check_output(expected_output)
121
122    def test_swapped(self):
123        expected_output = ''
124
125        self.setup_unit()
126        self.write_unit_file(UnitFileChange.LINES_SWAPPED)
127        self.reload()
128        time.sleep(4)
129
130        self.assertTrue(not os.path.exists(self.output_file))
131
132    def test_added_before(self):
133        expected_output = 'foo\n'
134
135        self.setup_unit()
136        self.write_unit_file(UnitFileChange.COMMAND_ADDED_BEFORE)
137        self.reload()
138        time.sleep(4)
139
140        self.check_output(expected_output)
141
142    def test_added_after(self):
143        expected_output = 'foo\nbar\n'
144
145        self.setup_unit()
146        self.write_unit_file(UnitFileChange.COMMAND_ADDED_AFTER)
147        self.reload()
148        time.sleep(4)
149
150        self.check_output(expected_output)
151
152    def test_interleaved(self):
153        expected_output = 'foo\nbar\n'
154
155        self.setup_unit()
156        self.write_unit_file(UnitFileChange.COMMAND_INTERLEAVED)
157        self.reload()
158        time.sleep(4)
159
160        self.check_output(expected_output)
161
162    def test_removal(self):
163        self.setup_unit()
164        self.write_unit_file(UnitFileChange.REMOVAL)
165        self.reload()
166        time.sleep(4)
167
168        self.assertTrue(not os.path.exists(self.output_file))
169
170    def test_issue_6533(self):
171        unit = "test-issue-6533.service"
172        unitfile_path = "/run/systemd/system/{}".format(unit)
173
174        content = '''
175        [Service]
176        ExecStart=/bin/sleep 5
177        '''
178
179        with open(unitfile_path, 'w') as f:
180            f.write(content)
181
182        self.reload()
183
184        subprocess.check_call(['systemctl', '--job-mode=replace', '--no-block', 'start', unit])
185        time.sleep(2)
186
187        content = '''
188        [Service]
189        ExecStart=/bin/sleep 5
190        ExecStart=/bin/true
191        '''
192
193        with open(unitfile_path, 'w') as f:
194            f.write(content)
195
196        self.reload()
197        time.sleep(5)
198
199        self.assertTrue(subprocess.call("journalctl -b _PID=1  | grep -q 'Freezing execution'", shell=True) != 0)
200
201    def tearDown(self):
202        for f in [self.output_file, self.unitfile_path]:
203            try:
204                os.remove(f)
205            except OSError:
206                # ignore error if log file doesn't exist
207                pass
208
209        self.reload()
210
if __name__ == '__main__':
    # Send the report to stdout at verbosity level 3 instead of using
    # the runner's default settings.
    runner = unittest.TextTestRunner(stream=sys.stdout, verbosity=3)
    unittest.main(testRunner=runner)
213