1# SPDX-License-Identifier: GPL-2.0 2# 3# Parses KTAP test results from a kernel dmesg log and incrementally prints 4# results with reader-friendly format. Stores and returns test results in a 5# Test object. 6# 7# Copyright (C) 2019, Google LLC. 8# Author: Felix Guo <felixguoxiuping@gmail.com> 9# Author: Brendan Higgins <brendanhiggins@google.com> 10# Author: Rae Moar <rmoar@google.com> 11 12from __future__ import annotations 13import re 14import sys 15 16from enum import Enum, auto 17from typing import Iterable, Iterator, List, Optional, Tuple 18 19from kunit_printer import stdout 20 21class Test: 22 """ 23 A class to represent a test parsed from KTAP results. All KTAP 24 results within a test log are stored in a main Test object as 25 subtests. 26 27 Attributes: 28 status : TestStatus - status of the test 29 name : str - name of the test 30 expected_count : int - expected number of subtests (0 if single 31 test case and None if unknown expected number of subtests) 32 subtests : List[Test] - list of subtests 33 log : List[str] - log of KTAP lines that correspond to the test 34 counts : TestCounts - counts of the test statuses and errors of 35 subtests or of the test itself if the test is a single 36 test case. 37 """ 38 def __init__(self) -> None: 39 """Creates Test object with default attributes.""" 40 self.status = TestStatus.TEST_CRASHED 41 self.name = '' 42 self.expected_count = 0 # type: Optional[int] 43 self.subtests = [] # type: List[Test] 44 self.log = [] # type: List[str] 45 self.counts = TestCounts() 46 47 def __str__(self) -> str: 48 """Returns string representation of a Test class object.""" 49 return (f'Test({self.status}, {self.name}, {self.expected_count}, ' 50 f'{self.subtests}, {self.log}, {self.counts})') 51 52 def __repr__(self) -> str: 53 """Returns string representation of a Test class object.""" 54 return str(self) 55 56 def add_error(self, error_message: str) -> None: 57 """Records an error that occurred while parsing this test.""" 58 self.counts.errors += 1 59 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}') 60 61class TestStatus(Enum): 62 """An enumeration class to represent the status of a test.""" 63 SUCCESS = auto() 64 FAILURE = auto() 65 SKIPPED = auto() 66 TEST_CRASHED = auto() 67 NO_TESTS = auto() 68 FAILURE_TO_PARSE_TESTS = auto() 69 70class TestCounts: 71 """ 72 Tracks the counts of statuses of all test cases and any errors within 73 a Test. 74 75 Attributes: 76 passed : int - the number of tests that have passed 77 failed : int - the number of tests that have failed 78 crashed : int - the number of tests that have crashed 79 skipped : int - the number of tests that have skipped 80 errors : int - the number of errors in the test and subtests 81 """ 82 def __init__(self): 83 """Creates TestCounts object with counts of all test 84 statuses and test errors set to 0. 85 """ 86 self.passed = 0 87 self.failed = 0 88 self.crashed = 0 89 self.skipped = 0 90 self.errors = 0 91 92 def __str__(self) -> str: 93 """Returns the string representation of a TestCounts object.""" 94 statuses = [('passed', self.passed), ('failed', self.failed), 95 ('crashed', self.crashed), ('skipped', self.skipped), 96 ('errors', self.errors)] 97 return f'Ran {self.total()} tests: ' + \ 98 ', '.join(f'{s}: {n}' for s, n in statuses if n > 0) 99 100 def total(self) -> int: 101 """Returns the total number of test cases within a test 102 object, where a test case is a test with no subtests. 103 """ 104 return (self.passed + self.failed + self.crashed + 105 self.skipped) 106 107 def add_subtest_counts(self, counts: TestCounts) -> None: 108 """ 109 Adds the counts of another TestCounts object to the current 110 TestCounts object. Used to add the counts of a subtest to the 111 parent test. 112 113 Parameters: 114 counts - a different TestCounts object whose counts 115 will be added to the counts of the TestCounts object 116 """ 117 self.passed += counts.passed 118 self.failed += counts.failed 119 self.crashed += counts.crashed 120 self.skipped += counts.skipped 121 self.errors += counts.errors 122 123 def get_status(self) -> TestStatus: 124 """Returns the aggregated status of a Test using test 125 counts. 126 """ 127 if self.total() == 0: 128 return TestStatus.NO_TESTS 129 if self.crashed: 130 # Crashes should take priority. 131 return TestStatus.TEST_CRASHED 132 if self.failed: 133 return TestStatus.FAILURE 134 if self.passed: 135 # No failures or crashes, looks good! 136 return TestStatus.SUCCESS 137 # We have only skipped tests. 138 return TestStatus.SKIPPED 139 140 def add_status(self, status: TestStatus) -> None: 141 """Increments the count for `status`.""" 142 if status == TestStatus.SUCCESS: 143 self.passed += 1 144 elif status == TestStatus.FAILURE: 145 self.failed += 1 146 elif status == TestStatus.SKIPPED: 147 self.skipped += 1 148 elif status != TestStatus.NO_TESTS: 149 self.crashed += 1 150 151class LineStream: 152 """ 153 A class to represent the lines of kernel output. 154 Provides a lazy peek()/pop() interface over an iterator of 155 (line#, text). 156 """ 157 _lines: Iterator[Tuple[int, str]] 158 _next: Tuple[int, str] 159 _need_next: bool 160 _done: bool 161 162 def __init__(self, lines: Iterator[Tuple[int, str]]): 163 """Creates a new LineStream that wraps the given iterator.""" 164 self._lines = lines 165 self._done = False 166 self._need_next = True 167 self._next = (0, '') 168 169 def _get_next(self) -> None: 170 """Advances the LineSteam to the next line, if necessary.""" 171 if not self._need_next: 172 return 173 try: 174 self._next = next(self._lines) 175 except StopIteration: 176 self._done = True 177 finally: 178 self._need_next = False 179 180 def peek(self) -> str: 181 """Returns the current line, without advancing the LineStream. 182 """ 183 self._get_next() 184 return self._next[1] 185 186 def pop(self) -> str: 187 """Returns the current line and advances the LineStream to 188 the next line. 189 """ 190 s = self.peek() 191 if self._done: 192 raise ValueError(f'LineStream: going past EOF, last line was {s}') 193 self._need_next = True 194 return s 195 196 def __bool__(self) -> bool: 197 """Returns True if stream has more lines.""" 198 self._get_next() 199 return not self._done 200 201 # Only used by kunit_tool_test.py. 202 def __iter__(self) -> Iterator[str]: 203 """Empties all lines stored in LineStream object into 204 Iterator object and returns the Iterator object. 205 """ 206 while bool(self): 207 yield self.pop() 208 209 def line_number(self) -> int: 210 """Returns the line number of the current line.""" 211 self._get_next() 212 return self._next[0] 213 214# Parsing helper methods: 215 216KTAP_START = re.compile(r'KTAP version ([0-9]+)$') 217TAP_START = re.compile(r'TAP version ([0-9]+)$') 218KTAP_END = re.compile('(List of all partitions:|' 219 'Kernel panic - not syncing: VFS:|reboot: System halted)') 220 221def extract_tap_lines(kernel_output: Iterable[str], lstrip=True) -> LineStream: 222 """Extracts KTAP lines from the kernel output.""" 223 def isolate_ktap_output(kernel_output: Iterable[str]) \ 224 -> Iterator[Tuple[int, str]]: 225 line_num = 0 226 started = False 227 for line in kernel_output: 228 line_num += 1 229 line = line.rstrip() # remove trailing \n 230 if not started and KTAP_START.search(line): 231 # start extracting KTAP lines and set prefix 232 # to number of characters before version line 233 prefix_len = len( 234 line.split('KTAP version')[0]) 235 started = True 236 yield line_num, line[prefix_len:] 237 elif not started and TAP_START.search(line): 238 # start extracting KTAP lines and set prefix 239 # to number of characters before version line 240 prefix_len = len(line.split('TAP version')[0]) 241 started = True 242 yield line_num, line[prefix_len:] 243 elif started and KTAP_END.search(line): 244 # stop extracting KTAP lines 245 break 246 elif started: 247 # remove the prefix and optionally any leading 248 # whitespace. Our parsing logic relies on this. 249 line = line[prefix_len:] 250 if lstrip: 251 line = line.lstrip() 252 yield line_num, line 253 return LineStream(lines=isolate_ktap_output(kernel_output)) 254 255KTAP_VERSIONS = [1] 256TAP_VERSIONS = [13, 14] 257 258def check_version(version_num: int, accepted_versions: List[int], 259 version_type: str, test: Test) -> None: 260 """ 261 Adds error to test object if version number is too high or too 262 low. 263 264 Parameters: 265 version_num - The inputted version number from the parsed KTAP or TAP 266 header line 267 accepted_version - List of accepted KTAP or TAP versions 268 version_type - 'KTAP' or 'TAP' depending on the type of 269 version line. 270 test - Test object for current test being parsed 271 """ 272 if version_num < min(accepted_versions): 273 test.add_error(f'{version_type} version lower than expected!') 274 elif version_num > max(accepted_versions): 275 test.add_error(f'{version_type} version higer than expected!') 276 277def parse_ktap_header(lines: LineStream, test: Test) -> bool: 278 """ 279 Parses KTAP/TAP header line and checks version number. 280 Returns False if fails to parse KTAP/TAP header line. 281 282 Accepted formats: 283 - 'KTAP version [version number]' 284 - 'TAP version [version number]' 285 286 Parameters: 287 lines - LineStream of KTAP output to parse 288 test - Test object for current test being parsed 289 290 Return: 291 True if successfully parsed KTAP/TAP header line 292 """ 293 ktap_match = KTAP_START.match(lines.peek()) 294 tap_match = TAP_START.match(lines.peek()) 295 if ktap_match: 296 version_num = int(ktap_match.group(1)) 297 check_version(version_num, KTAP_VERSIONS, 'KTAP', test) 298 elif tap_match: 299 version_num = int(tap_match.group(1)) 300 check_version(version_num, TAP_VERSIONS, 'TAP', test) 301 else: 302 return False 303 test.log.append(lines.pop()) 304 return True 305 306TEST_HEADER = re.compile(r'^# Subtest: (.*)$') 307 308def parse_test_header(lines: LineStream, test: Test) -> bool: 309 """ 310 Parses test header and stores test name in test object. 311 Returns False if fails to parse test header line. 312 313 Accepted format: 314 - '# Subtest: [test name]' 315 316 Parameters: 317 lines - LineStream of KTAP output to parse 318 test - Test object for current test being parsed 319 320 Return: 321 True if successfully parsed test header line 322 """ 323 match = TEST_HEADER.match(lines.peek()) 324 if not match: 325 return False 326 test.log.append(lines.pop()) 327 test.name = match.group(1) 328 return True 329 330TEST_PLAN = re.compile(r'1\.\.([0-9]+)') 331 332def parse_test_plan(lines: LineStream, test: Test) -> bool: 333 """ 334 Parses test plan line and stores the expected number of subtests in 335 test object. Reports an error if expected count is 0. 336 Returns False and sets expected_count to None if there is no valid test 337 plan. 338 339 Accepted format: 340 - '1..[number of subtests]' 341 342 Parameters: 343 lines - LineStream of KTAP output to parse 344 test - Test object for current test being parsed 345 346 Return: 347 True if successfully parsed test plan line 348 """ 349 match = TEST_PLAN.match(lines.peek()) 350 if not match: 351 test.expected_count = None 352 return False 353 test.log.append(lines.pop()) 354 expected_count = int(match.group(1)) 355 test.expected_count = expected_count 356 return True 357 358TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$') 359 360TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$') 361 362def peek_test_name_match(lines: LineStream, test: Test) -> bool: 363 """ 364 Matches current line with the format of a test result line and checks 365 if the name matches the name of the current test. 366 Returns False if fails to match format or name. 367 368 Accepted format: 369 - '[ok|not ok] [test number] [-] [test name] [optional skip 370 directive]' 371 372 Parameters: 373 lines - LineStream of KTAP output to parse 374 test - Test object for current test being parsed 375 376 Return: 377 True if matched a test result line and the name matching the 378 expected test name 379 """ 380 line = lines.peek() 381 match = TEST_RESULT.match(line) 382 if not match: 383 return False 384 name = match.group(4) 385 return name == test.name 386 387def parse_test_result(lines: LineStream, test: Test, 388 expected_num: int) -> bool: 389 """ 390 Parses test result line and stores the status and name in the test 391 object. Reports an error if the test number does not match expected 392 test number. 393 Returns False if fails to parse test result line. 394 395 Note that the SKIP directive is the only direction that causes a 396 change in status. 397 398 Accepted format: 399 - '[ok|not ok] [test number] [-] [test name] [optional skip 400 directive]' 401 402 Parameters: 403 lines - LineStream of KTAP output to parse 404 test - Test object for current test being parsed 405 expected_num - expected test number for current test 406 407 Return: 408 True if successfully parsed a test result line. 409 """ 410 line = lines.peek() 411 match = TEST_RESULT.match(line) 412 skip_match = TEST_RESULT_SKIP.match(line) 413 414 # Check if line matches test result line format 415 if not match: 416 return False 417 test.log.append(lines.pop()) 418 419 # Set name of test object 420 if skip_match: 421 test.name = skip_match.group(4) 422 else: 423 test.name = match.group(4) 424 425 # Check test num 426 num = int(match.group(2)) 427 if num != expected_num: 428 test.add_error(f'Expected test number {expected_num} but found {num}') 429 430 # Set status of test object 431 status = match.group(1) 432 if skip_match: 433 test.status = TestStatus.SKIPPED 434 elif status == 'ok': 435 test.status = TestStatus.SUCCESS 436 else: 437 test.status = TestStatus.FAILURE 438 return True 439 440def parse_diagnostic(lines: LineStream) -> List[str]: 441 """ 442 Parse lines that do not match the format of a test result line or 443 test header line and returns them in list. 444 445 Line formats that are not parsed: 446 - '# Subtest: [test name]' 447 - '[ok|not ok] [test number] [-] [test name] [optional skip 448 directive]' 449 450 Parameters: 451 lines - LineStream of KTAP output to parse 452 453 Return: 454 Log of diagnostic lines 455 """ 456 log = [] # type: List[str] 457 while lines and not TEST_RESULT.match(lines.peek()) and not \ 458 TEST_HEADER.match(lines.peek()): 459 log.append(lines.pop()) 460 return log 461 462 463# Printing helper methods: 464 465DIVIDER = '=' * 60 466 467def format_test_divider(message: str, len_message: int) -> str: 468 """ 469 Returns string with message centered in fixed width divider. 470 471 Example: 472 '===================== message example =====================' 473 474 Parameters: 475 message - message to be centered in divider line 476 len_message - length of the message to be printed such that 477 any characters of the color codes are not counted 478 479 Return: 480 String containing message centered in fixed width divider 481 """ 482 default_count = 3 # default number of dashes 483 len_1 = default_count 484 len_2 = default_count 485 difference = len(DIVIDER) - len_message - 2 # 2 spaces added 486 if difference > 0: 487 # calculate number of dashes for each side of the divider 488 len_1 = int(difference / 2) 489 len_2 = difference - len_1 490 return ('=' * len_1) + f' {message} ' + ('=' * len_2) 491 492def print_test_header(test: Test) -> None: 493 """ 494 Prints test header with test name and optionally the expected number 495 of subtests. 496 497 Example: 498 '=================== example (2 subtests) ===================' 499 500 Parameters: 501 test - Test object representing current test being printed 502 """ 503 message = test.name 504 if test.expected_count: 505 if test.expected_count == 1: 506 message += ' (1 subtest)' 507 else: 508 message += f' ({test.expected_count} subtests)' 509 stdout.print_with_timestamp(format_test_divider(message, len(message))) 510 511def print_log(log: Iterable[str]) -> None: 512 """Prints all strings in saved log for test in yellow.""" 513 for m in log: 514 stdout.print_with_timestamp(stdout.yellow(m)) 515 516def format_test_result(test: Test) -> str: 517 """ 518 Returns string with formatted test result with colored status and test 519 name. 520 521 Example: 522 '[PASSED] example' 523 524 Parameters: 525 test - Test object representing current test being printed 526 527 Return: 528 String containing formatted test result 529 """ 530 if test.status == TestStatus.SUCCESS: 531 return stdout.green('[PASSED] ') + test.name 532 if test.status == TestStatus.SKIPPED: 533 return stdout.yellow('[SKIPPED] ') + test.name 534 if test.status == TestStatus.NO_TESTS: 535 return stdout.yellow('[NO TESTS RUN] ') + test.name 536 if test.status == TestStatus.TEST_CRASHED: 537 print_log(test.log) 538 return stdout.red('[CRASHED] ') + test.name 539 print_log(test.log) 540 return stdout.red('[FAILED] ') + test.name 541 542def print_test_result(test: Test) -> None: 543 """ 544 Prints result line with status of test. 545 546 Example: 547 '[PASSED] example' 548 549 Parameters: 550 test - Test object representing current test being printed 551 """ 552 stdout.print_with_timestamp(format_test_result(test)) 553 554def print_test_footer(test: Test) -> None: 555 """ 556 Prints test footer with status of test. 557 558 Example: 559 '===================== [PASSED] example =====================' 560 561 Parameters: 562 test - Test object representing current test being printed 563 """ 564 message = format_test_result(test) 565 stdout.print_with_timestamp(format_test_divider(message, 566 len(message) - stdout.color_len())) 567 568def print_summary_line(test: Test) -> None: 569 """ 570 Prints summary line of test object. Color of line is dependent on 571 status of test. Color is green if test passes, yellow if test is 572 skipped, and red if the test fails or crashes. Summary line contains 573 counts of the statuses of the tests subtests or the test itself if it 574 has no subtests. 575 576 Example: 577 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0, 578 Errors: 0" 579 580 test - Test object representing current test being printed 581 """ 582 if test.status == TestStatus.SUCCESS: 583 color = stdout.green 584 elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS): 585 color = stdout.yellow 586 else: 587 color = stdout.red 588 stdout.print_with_timestamp(color(f'Testing complete. {test.counts}')) 589 590# Other methods: 591 592def bubble_up_test_results(test: Test) -> None: 593 """ 594 If the test has subtests, add the test counts of the subtests to the 595 test and check if any of the tests crashed and if so set the test 596 status to crashed. Otherwise if the test has no subtests add the 597 status of the test to the test counts. 598 599 Parameters: 600 test - Test object for current test being parsed 601 """ 602 subtests = test.subtests 603 counts = test.counts 604 status = test.status 605 for t in subtests: 606 counts.add_subtest_counts(t.counts) 607 if counts.total() == 0: 608 counts.add_status(status) 609 elif test.counts.get_status() == TestStatus.TEST_CRASHED: 610 test.status = TestStatus.TEST_CRASHED 611 612def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: 613 """ 614 Finds next test to parse in LineStream, creates new Test object, 615 parses any subtests of the test, populates Test object with all 616 information (status, name) about the test and the Test objects for 617 any subtests, and then returns the Test object. The method accepts 618 three formats of tests: 619 620 Accepted test formats: 621 622 - Main KTAP/TAP header 623 624 Example: 625 626 KTAP version 1 627 1..4 628 [subtests] 629 630 - Subtest header line 631 632 Example: 633 634 # Subtest: name 635 1..3 636 [subtests] 637 ok 1 name 638 639 - Test result line 640 641 Example: 642 643 ok 1 - test 644 645 Parameters: 646 lines - LineStream of KTAP output to parse 647 expected_num - expected test number for test to be parsed 648 log - list of strings containing any preceding diagnostic lines 649 corresponding to the current test 650 651 Return: 652 Test object populated with characteristics and any subtests 653 """ 654 test = Test() 655 test.log.extend(log) 656 parent_test = False 657 main = parse_ktap_header(lines, test) 658 if main: 659 # If KTAP/TAP header is found, attempt to parse 660 # test plan 661 test.name = "main" 662 parse_test_plan(lines, test) 663 parent_test = True 664 else: 665 # If KTAP/TAP header is not found, test must be subtest 666 # header or test result line so parse attempt to parser 667 # subtest header 668 parent_test = parse_test_header(lines, test) 669 if parent_test: 670 # If subtest header is found, attempt to parse 671 # test plan and print header 672 parse_test_plan(lines, test) 673 print_test_header(test) 674 expected_count = test.expected_count 675 subtests = [] 676 test_num = 1 677 while parent_test and (expected_count is None or test_num <= expected_count): 678 # Loop to parse any subtests. 679 # Break after parsing expected number of tests or 680 # if expected number of tests is unknown break when test 681 # result line with matching name to subtest header is found 682 # or no more lines in stream. 683 sub_log = parse_diagnostic(lines) 684 sub_test = Test() 685 if not lines or (peek_test_name_match(lines, test) and 686 not main): 687 if expected_count and test_num <= expected_count: 688 # If parser reaches end of test before 689 # parsing expected number of subtests, print 690 # crashed subtest and record error 691 test.add_error('missing expected subtest!') 692 sub_test.log.extend(sub_log) 693 test.counts.add_status( 694 TestStatus.TEST_CRASHED) 695 print_test_result(sub_test) 696 else: 697 test.log.extend(sub_log) 698 break 699 else: 700 sub_test = parse_test(lines, test_num, sub_log) 701 subtests.append(sub_test) 702 test_num += 1 703 test.subtests = subtests 704 if not main: 705 # If not main test, look for test result line 706 test.log.extend(parse_diagnostic(lines)) 707 if (parent_test and peek_test_name_match(lines, test)) or \ 708 not parent_test: 709 parse_test_result(lines, test, expected_num) 710 else: 711 test.add_error('missing subtest result line!') 712 713 # Check for there being no tests 714 if parent_test and len(subtests) == 0: 715 # Don't override a bad status if this test had one reported. 716 # Assumption: no subtests means CRASHED is from Test.__init__() 717 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS): 718 test.status = TestStatus.NO_TESTS 719 test.add_error('0 tests run!') 720 721 # Add statuses to TestCounts attribute in Test object 722 bubble_up_test_results(test) 723 if parent_test and not main: 724 # If test has subtests and is not the main test object, print 725 # footer. 726 print_test_footer(test) 727 elif not main: 728 print_test_result(test) 729 return test 730 731def parse_run_tests(kernel_output: Iterable[str]) -> Test: 732 """ 733 Using kernel output, extract KTAP lines, parse the lines for test 734 results and print condensed test results and summary line. 735 736 Parameters: 737 kernel_output - Iterable object contains lines of kernel output 738 739 Return: 740 Test - the main test object with all subtests. 741 """ 742 stdout.print_with_timestamp(DIVIDER) 743 lines = extract_tap_lines(kernel_output) 744 test = Test() 745 if not lines: 746 test.name = '<missing>' 747 test.add_error('could not find any KTAP output!') 748 test.status = TestStatus.FAILURE_TO_PARSE_TESTS 749 else: 750 test = parse_test(lines, 0, []) 751 if test.status != TestStatus.NO_TESTS: 752 test.status = test.counts.get_status() 753 stdout.print_with_timestamp(DIVIDER) 754 print_summary_line(test) 755 return test 756