# SPDX-License-Identifier: GPL-2.0
#
# Runs UML kernel, collects output, and handles errors.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>

import importlib.abc
import importlib.util
import logging
import subprocess
import os
import shlex
import shutil
import signal
import threading
from typing import Iterator, List, Optional, Tuple

import kunit_config
from kunit_printer import stdout
import qemu_config

KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
OUTFILE_PATH = 'test.log'
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')


class ConfigError(Exception):
    """Represents an error trying to configure the Linux kernel."""


class BuildError(Exception):
    """Represents an error trying to build the Linux kernel."""


class LinuxSourceTreeOperations:
    """An abstraction over command line operations performed on a source tree."""

    def __init__(self, linux_arch: str, cross_compile: Optional[str]):
        self._linux_arch = linux_arch
        self._cross_compile = cross_compile

    def make_mrproper(self) -> None:
        """Runs `make mrproper`.

        Raises:
            ConfigError: if `make` could not be executed or exited non-zero
                (in which case the message is make's combined output).
        """
        try:
            subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the Kconfig to use for this arch.

        The base implementation returns `base_kunitconfig` unchanged;
        subclasses merge it into their arch-specific configuration.
        """
        return base_kunitconfig

    def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Runs `make olddefconfig` against `build_dir`.

        Args:
            build_dir: passed to make as `O=<build_dir>`.
            make_options: extra arguments appended to the make command, if any.

        Raises:
            ConfigError: if `make` could not be executed or exited non-zero.
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        if make_options:
            command.extend(make_options)
        print('Populating config with:\n$', ' '.join(command))
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Builds the kernel with `make`.

        stdout of the build is discarded; stderr is captured and printed
        after a successful build (or used as the error message on failure).

        Args:
            jobs: value for make's `--jobs=` flag.
            build_dir: passed to make as `O=<build_dir>`.
            make_options: extra arguments appended to the make command, if any.

        Raises:
            BuildError: if `make` could not be executed or exited non-zero.
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
        if make_options:
            command.extend(make_options)
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        print('Building with:\n$', ' '.join(command))
        try:
            # Popen() itself never raises CalledProcessError; a failing build
            # is detected through the return code below.
            proc = subprocess.Popen(command,
                                    stderr=subprocess.PIPE,
                                    stdout=subprocess.DEVNULL)
        except OSError as e:
            raise BuildError('Could not call execute make: ' + str(e)) from e
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise BuildError(stderr.decode())
        if stderr:  # likely only due to build warnings
            print(stderr.decode())

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Starts the built kernel; must be overridden by subclasses."""
        raise RuntimeError('not implemented!')


class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
    """Source tree operations for kernels that are run under QEMU."""

    def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
        super().__init__(linux_arch=qemu_arch_params.linux_arch,
                         cross_compile=cross_compile)
        self._kconfig = qemu_arch_params.kconfig
        self._qemu_arch = qemu_arch_params.qemu_arch
        self._kernel_path = qemu_arch_params.kernel_path
        self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
        self._extra_qemu_params = qemu_arch_params.extra_qemu_params

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the QEMU config's Kconfig with `base_kunitconfig` merged in."""
        kconfig = kunit_config.parse_from_string(self._kconfig)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Launches `qemu-system-<arch>` on the kernel image inside `build_dir`.

        `params` are joined with the configured kernel command line and passed
        via `-append`. The returned process has stdin/stdout piped in text
        mode, with stderr redirected into stdout.
        """
        kernel_path = os.path.join(build_dir, self._kernel_path)
        qemu_command = ['qemu-system-' + self._qemu_arch,
                        '-nodefaults',
                        '-m', '1024',
                        '-kernel', kernel_path,
                        '-append', ' '.join(params + [self._kernel_command_line]),
                        '-no-reboot',
                        '-nographic',
                        '-serial', 'stdio'] + self._extra_qemu_params
        # Note: shlex.join() does what we want, but requires python 3.8+.
        print('Running tests with:\n$', ' '.join(shlex.quote(arg) for arg in qemu_command))
        return subprocess.Popen(qemu_command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
    """Source tree operations for User Mode Linux (ARCH=um) kernels."""

    def __init__(self, cross_compile: Optional[str]=None):
        super().__init__(linux_arch='um', cross_compile=cross_compile)

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Returns the UML Kconfig (UML_KCONFIG_PATH) with `base_kunitconfig` merged in."""
        kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Runs the Linux UML binary. Must be named 'linux'.

        `params` is not modified; the UML-specific arguments are appended to a
        fresh command list. The returned process has stdin/stdout piped in
        text mode, with stderr redirected into stdout.
        """
        linux_bin = os.path.join(build_dir, 'linux')
        # Build a new list instead of extending `params` in place so the
        # caller's list does not accumulate arguments across invocations.
        command = [linux_bin] + params + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
        return subprocess.Popen(command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')


def get_kconfig_path(build_dir: str) -> str:
    """Returns the path of the .config file inside `build_dir`."""
    return os.path.join(build_dir, KCONFIG_PATH)


def get_kunitconfig_path(build_dir: str) -> str:
    """Returns the path of the .kunitconfig file inside `build_dir`."""
    return os.path.join(build_dir, KUNITCONFIG_PATH)


def get_old_kunitconfig_path(build_dir: str) -> str:
    """Returns the path where the last used kunitconfig is recorded."""
    return os.path.join(build_dir, OLD_KUNITCONFIG_PATH)


def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
    """Parses the kunitconfig(s) to use for this run.

    With no `kunitconfig_paths`, `<build_dir>/.kunitconfig` is used, and is
    first created from DEFAULT_KUNITCONFIG_PATH if it does not exist.
    Otherwise every given path is parsed (a directory means the .kunitconfig
    inside it) and the results are merged.

    Raises:
        ConfigError: if a specified path does not exist, or if two of the
            given files specify conflicting values for the same option.
    """
    if not kunitconfig_paths:
        path = get_kunitconfig_path(build_dir)
        if not os.path.exists(path):
            shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, path)
        return kunit_config.parse_file(path)

    merged = kunit_config.Kconfig()

    for path in kunitconfig_paths:
        if os.path.isdir(path):
            path = os.path.join(path, KUNITCONFIG_PATH)
        if not os.path.exists(path):
            raise ConfigError(f'Specified kunitconfig ({path}) does not exist')

        partial = kunit_config.parse_file(path)
        diff = merged.conflicting_options(partial)
        if diff:
            diff_str = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in diff)
            raise ConfigError(f'Multiple values specified for {len(diff)} options in kunitconfig:\n{diff_str}')
        merged.merge_in_entries(partial)
    return merged


def get_outfile_path(build_dir: str) -> str:
    """Returns the path of the kernel output log inside `build_dir`."""
    return os.path.join(build_dir, OUTFILE_PATH)


def _default_qemu_config_path(arch: str) -> str:
    """Returns the path of the bundled QEMU config for `arch`.

    Raises:
        ConfigError: if QEMU_CONFIGS_DIR has no `<arch>.py`; the message lists
            the archs that do have one.
    """
    config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
    if os.path.isfile(config_path):
        return config_path

    options = [f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py')]
    raise ConfigError(arch + ' is not a valid arch, options are ' + str(sorted(options)))


def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
    """Loads the QEMU config module at `config_path`.

    Returns:
        A `(linux_arch, ops)` tuple built from the module's `QEMU_ARCH`
        attribute, with `extra_qemu_args` appended to its extra QEMU params.

    Raises:
        ValueError: if the loaded module does not define `QEMU_ARCH`.
    """
    # The module name/path has very little to do with where the actual file
    # exists (I learned this through experimentation and could not find it
    # anywhere in the Python documentation).
    #
    # Basically, we completely ignore the actual file location of the config
    # we are loading and just tell Python that the module lives in the
    # QEMU_CONFIGS_DIR for import purposes regardless of where it actually
    # exists as a file.
    module_path = '.' + os.path.join(os.path.basename(QEMU_CONFIGS_DIR), os.path.basename(config_path))
    spec = importlib.util.spec_from_file_location(module_path, config_path)
    assert spec is not None
    config = importlib.util.module_from_spec(spec)
    # See https://github.com/python/typeshed/pull/2626 for context.
    assert isinstance(spec.loader, importlib.abc.Loader)
    spec.loader.exec_module(config)

    if not hasattr(config, 'QEMU_ARCH'):
        raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)
    params: qemu_config.QemuArchParams = config.QEMU_ARCH  # type: ignore
    if extra_qemu_args:
        params.extra_qemu_params.extend(extra_qemu_args)
    return params.linux_arch, LinuxSourceTreeOperationsQemu(
        params, cross_compile=cross_compile)


class LinuxSourceTree:
    """Represents a Linux kernel source tree with KUnit tests."""

    def __init__(
          self,
          build_dir: str,
          kunitconfig_paths: Optional[List[str]]=None,
          kconfig_add: Optional[List[str]]=None,
          arch: Optional[str]=None,
          cross_compile: Optional[str]=None,
          qemu_config_path: Optional[str]=None,
          extra_qemu_args: Optional[List[str]]=None) -> None:
        """Selects the arch-specific operations and parses the kunitconfig.

        Also installs `signal_handler` as the process-wide SIGINT handler.

        Args:
            build_dir: kernel build directory.
            kunitconfig_paths: kunitconfig files/directories to merge; see
                get_parsed_kunitconfig().
            kconfig_add: extra Kconfig lines merged on top of the kunitconfig.
            arch: architecture name; None means 'um'. Ignored when
                `qemu_config_path` is given.
            cross_compile: value for make's CROSS_COMPILE=, if any.
            qemu_config_path: explicit QEMU config module to load.
            extra_qemu_args: extra arguments for the QEMU command line.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        if qemu_config_path:
            self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
        else:
            self._arch = 'um' if arch is None else arch
            if self._arch == 'um':
                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
            else:
                qemu_config_path = _default_qemu_config_path(self._arch)
                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

        self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
        if kconfig_add:
            kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
            self._kconfig.merge_in_entries(kconfig)

    def arch(self) -> str:
        """Returns the architecture name this tree is configured for."""
        return self._arch

    def clean(self) -> bool:
        """Runs `make mrproper`; logs the error and returns False on failure."""
        try:
            self._ops.make_mrproper()
        except ConfigError as e:
            logging.error(e)
            return False
        return True

    def validate_config(self, build_dir: str) -> bool:
        """Checks that the wanted Kconfig is a subset of the generated .config.

        Returns:
            True if it is; otherwise logs the missing entries and returns False.
        """
        kconfig_path = get_kconfig_path(build_dir)
        validated_kconfig = kunit_config.parse_file(kconfig_path)
        if self._kconfig.is_subset_of(validated_kconfig):
            return True
        missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
        message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
                  'This is probably due to unsatisfied dependencies.\n' \
                  'Missing: ' + ', '.join(str(e) for e in missing)
        if self._arch == 'um':
            message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                       'on a different architecture with something like "--arch=x86_64".'
        logging.error(message)
        return False

    def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Writes a fresh .config into `build_dir` and validates it.

        Creates `build_dir` if needed, writes the arch-merged Kconfig, runs
        `make olddefconfig`, validates the result and finally records the
        Kconfig that was used so later runs can detect changes.

        Returns:
            True on success; False (after logging) if configuring or
            validation failed.
        """
        kconfig_path = get_kconfig_path(build_dir)
        if build_dir and not os.path.exists(build_dir):
            os.mkdir(build_dir)
        try:
            self._kconfig = self._ops.make_arch_config(self._kconfig)
            self._kconfig.write_to_file(kconfig_path)
            self._ops.make_olddefconfig(build_dir, make_options)
        except ConfigError as e:
            logging.error(e)
            return False
        if not self.validate_config(build_dir):
            return False

        old_path = get_old_kunitconfig_path(build_dir)
        if os.path.exists(old_path):
            os.remove(old_path)  # write_to_file appends to the file
        self._kconfig.write_to_file(old_path)
        return True

    def _kunitconfig_changed(self, build_dir: str) -> bool:
        """Returns True if the Kconfig differs from the one recorded by the last build_config()."""
        old_path = get_old_kunitconfig_path(build_dir)
        if not os.path.exists(old_path):
            return True

        old_kconfig = kunit_config.parse_file(old_path)
        return old_kconfig != self._kconfig

    def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Regenerates the .config only when it is missing or out of date.

        The existing .config is kept when the wanted Kconfig is a subset of it
        and matches the Kconfig recorded by the previous configuration;
        otherwise the .config is removed and rebuilt via build_config().
        """
        kconfig_path = get_kconfig_path(build_dir)
        if not os.path.exists(kconfig_path):
            print('Generating .config ...')
            return self.build_config(build_dir, make_options)

        existing_kconfig = kunit_config.parse_file(kconfig_path)
        self._kconfig = self._ops.make_arch_config(self._kconfig)

        if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
            return True
        print('Regenerating .config ...')
        os.remove(kconfig_path)
        return self.build_config(build_dir, make_options)

    def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Runs `make olddefconfig` and then builds the kernel.

        Returns:
            False (after logging) if configuring or building failed; otherwise
            the result of validate_config().
        """
        try:
            self._ops.make_olddefconfig(build_dir, make_options)
            self._ops.make(jobs, build_dir, make_options)
        except (ConfigError, BuildError) as e:
            logging.error(e)
            return False
        return self.validate_config(build_dir)

    def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', timeout: Optional[float]=None) -> Iterator[str]:
        """Runs the kernel and yields its output line by line.

        This is a generator: nothing is started until iteration begins. Every
        line is also written to the log file at get_outfile_path(build_dir).

        Args:
            args: extra kernel command line arguments; the list is not modified.
            build_dir: directory holding the built kernel and the log file.
            filter_glob: if non-empty, passed as `kunit.filter_glob=<glob>`.
            timeout: seconds to wait before terminating the kernel process;
                None waits indefinitely.
        """
        # Copy so that the flags appended below never leak into the caller's
        # list (which may be reused for several runs).
        args = list(args) if args else []
        if filter_glob:
            args.append('kunit.filter_glob=' + filter_glob)
        args.append('kunit.enable=1')

        process = self._ops.start(args, build_dir)
        assert process.stdout is not None  # tell mypy it's set

        # Enforce the timeout in a background thread.
        def _wait_proc():
            try:
                process.wait(timeout=timeout)
            except Exception as e:
                print(e)
                process.terminate()
                process.wait()
        waiter = threading.Thread(target=_wait_proc)
        waiter.start()

        # The context manager guarantees the log file is closed even if
        # flushing the leftover output below fails.
        with open(get_outfile_path(build_dir), 'w') as output:
            try:
                # Tee the output to the file and to our caller in real time.
                for line in process.stdout:
                    output.write(line)
                    yield line
            # This runs even if our caller doesn't consume every line.
            finally:
                # Flush any leftover output to the file
                output.write(process.stdout.read())
                process.stdout.close()

        waiter.join()
        subprocess.call(['stty', 'sane'])

    def signal_handler(self, unused_sig, unused_frame) -> None:
        """SIGINT handler: logs the interruption and resets the terminal with `stty sane`."""
        logging.error('Build interruption occurred. Cleaning console.')
        subprocess.call(['stty', 'sane'])