1#!/usr/bin/env python3 2# SPDX-License-Identifier: LGPL-2.1-or-later 3# 4# Copyright © 2017 Michal Sekletar <msekleta@redhat.com> 5 6# ATTENTION: This uses the *installed* systemd, not the one from the built 7# source tree. 8 9import unittest 10import time 11import os 12import tempfile 13import subprocess 14import sys 15 16from enum import Enum 17 18class UnitFileChange(Enum): 19 NO_CHANGE = 0 20 LINES_SWAPPED = 1 21 COMMAND_ADDED_BEFORE = 2 22 COMMAND_ADDED_AFTER = 3 23 COMMAND_INTERLEAVED = 4 24 REMOVAL = 5 25 26class ExecutionResumeTest(unittest.TestCase): 27 def setUp(self): 28 self.unit = 'test-issue-518.service' 29 self.unitfile_path = '/run/systemd/system/{0}'.format(self.unit) 30 self.output_file = tempfile.mktemp() 31 self.unit_files = {} 32 33 unit_file_content = ''' 34 [Service] 35 Type=oneshot 36 ExecStart=/bin/sleep 3 37 ExecStart=/bin/bash -c "echo foo >> {0}" 38 '''.format(self.output_file) 39 self.unit_files[UnitFileChange.NO_CHANGE] = unit_file_content 40 41 unit_file_content = ''' 42 [Service] 43 Type=oneshot 44 ExecStart=/bin/bash -c "echo foo >> {0}" 45 ExecStart=/bin/sleep 3 46 '''.format(self.output_file) 47 self.unit_files[UnitFileChange.LINES_SWAPPED] = unit_file_content 48 49 unit_file_content = ''' 50 [Service] 51 Type=oneshot 52 ExecStart=/bin/bash -c "echo bar >> {0}" 53 ExecStart=/bin/sleep 3 54 ExecStart=/bin/bash -c "echo foo >> {0}" 55 '''.format(self.output_file) 56 self.unit_files[UnitFileChange.COMMAND_ADDED_BEFORE] = unit_file_content 57 58 unit_file_content = ''' 59 [Service] 60 Type=oneshot 61 ExecStart=/bin/sleep 3 62 ExecStart=/bin/bash -c "echo foo >> {0}" 63 ExecStart=/bin/bash -c "echo bar >> {0}" 64 '''.format(self.output_file) 65 self.unit_files[UnitFileChange.COMMAND_ADDED_AFTER] = unit_file_content 66 67 unit_file_content = ''' 68 [Service] 69 Type=oneshot 70 ExecStart=/bin/bash -c "echo baz >> {0}" 71 ExecStart=/bin/sleep 3 72 ExecStart=/bin/bash -c "echo foo >> {0}" 73 ExecStart=/bin/bash -c "echo bar >> {0}" 74 '''.format(self.output_file) 75 self.unit_files[UnitFileChange.COMMAND_INTERLEAVED] = unit_file_content 76 77 unit_file_content = ''' 78 [Service] 79 Type=oneshot 80 ExecStart=/bin/bash -c "echo bar >> {0}" 81 ExecStart=/bin/bash -c "echo baz >> {0}" 82 '''.format(self.output_file) 83 self.unit_files[UnitFileChange.REMOVAL] = unit_file_content 84 85 def reload(self): 86 subprocess.check_call(['systemctl', 'daemon-reload']) 87 88 def write_unit_file(self, unit_file_change): 89 if not isinstance(unit_file_change, UnitFileChange): 90 raise ValueError('Unknown unit file change') 91 92 content = self.unit_files[unit_file_change] 93 94 with open(self.unitfile_path, 'w') as f: 95 f.write(content) 96 97 self.reload() 98 99 def check_output(self, expected_output): 100 try: 101 with open(self.output_file, 'r') as log: 102 output = log.read() 103 except IOError: 104 self.fail() 105 106 self.assertEqual(output, expected_output) 107 108 def setup_unit(self): 109 self.write_unit_file(UnitFileChange.NO_CHANGE) 110 subprocess.check_call(['systemctl', '--job-mode=replace', '--no-block', 'start', self.unit]) 111 time.sleep(1) 112 113 def test_no_change(self): 114 expected_output = 'foo\n' 115 116 self.setup_unit() 117 self.reload() 118 time.sleep(4) 119 120 self.check_output(expected_output) 121 122 def test_swapped(self): 123 expected_output = '' 124 125 self.setup_unit() 126 self.write_unit_file(UnitFileChange.LINES_SWAPPED) 127 self.reload() 128 time.sleep(4) 129 130 self.assertTrue(not os.path.exists(self.output_file)) 131 132 def test_added_before(self): 133 expected_output = 'foo\n' 134 135 self.setup_unit() 136 self.write_unit_file(UnitFileChange.COMMAND_ADDED_BEFORE) 137 self.reload() 138 time.sleep(4) 139 140 self.check_output(expected_output) 141 142 def test_added_after(self): 143 expected_output = 'foo\nbar\n' 144 145 self.setup_unit() 146 self.write_unit_file(UnitFileChange.COMMAND_ADDED_AFTER) 147 self.reload() 148 time.sleep(4) 149 150 self.check_output(expected_output) 151 152 def test_interleaved(self): 153 expected_output = 'foo\nbar\n' 154 155 self.setup_unit() 156 self.write_unit_file(UnitFileChange.COMMAND_INTERLEAVED) 157 self.reload() 158 time.sleep(4) 159 160 self.check_output(expected_output) 161 162 def test_removal(self): 163 self.setup_unit() 164 self.write_unit_file(UnitFileChange.REMOVAL) 165 self.reload() 166 time.sleep(4) 167 168 self.assertTrue(not os.path.exists(self.output_file)) 169 170 def test_issue_6533(self): 171 unit = "test-issue-6533.service" 172 unitfile_path = "/run/systemd/system/{}".format(unit) 173 174 content = ''' 175 [Service] 176 ExecStart=/bin/sleep 5 177 ''' 178 179 with open(unitfile_path, 'w') as f: 180 f.write(content) 181 182 self.reload() 183 184 subprocess.check_call(['systemctl', '--job-mode=replace', '--no-block', 'start', unit]) 185 time.sleep(2) 186 187 content = ''' 188 [Service] 189 ExecStart=/bin/sleep 5 190 ExecStart=/bin/true 191 ''' 192 193 with open(unitfile_path, 'w') as f: 194 f.write(content) 195 196 self.reload() 197 time.sleep(5) 198 199 self.assertTrue(subprocess.call("journalctl -b _PID=1 | grep -q 'Freezing execution'", shell=True) != 0) 200 201 def tearDown(self): 202 for f in [self.output_file, self.unitfile_path]: 203 try: 204 os.remove(f) 205 except OSError: 206 # ignore error if log file doesn't exist 207 pass 208 209 self.reload() 210 211if __name__ == '__main__': 212 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=3)) 213