Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 79 additions & 23 deletions tests/tap_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
import argparse
import importlib
import io
import os
import sys
import threading
import traceback
import unittest


class DiagnosticCapture(io.TextIOBase):
"""Capture writes and re-emit them as TAP diagnostic lines (# ...)."""

def __init__(self, real_stdout: io.TextIOBase) -> None:
self._real = real_stdout
def __init__(self, stream: io.TextIOBase) -> None:
self._real = stream
self._buf = ''

def write(self, text: str) -> int:
Expand All @@ -42,64 +44,109 @@ def flush(self) -> None:
self._real.flush()


class FDCapture:
"""Redirect a file descriptor at the OS level and re-emit captured output
as TAP diagnostic lines. This intercepts writes from subprocesses which
bypass the Python-level sys.stderr redirect."""

def __init__(self, fd: int, real_stdout: io.TextIOBase) -> None:
self._fd = fd
self._real = real_stdout
self._saved_fd = os.dup(fd)
r_fd, w_fd = os.pipe()
os.dup2(w_fd, fd)
os.close(w_fd)
self._thread = threading.Thread(target=self._reader, args=(r_fd,),
daemon=True)
# daemon=True: if restore() is somehow never called (e.g. os._exit()),
# the process can still exit rather than hang on a blocking read.
self._thread.start()

def _reader(self, r_fd: int) -> None:
buf = b''
# Open unbuffered (bufsize=0) so bytes are delivered to the reader
# as soon as they are written, without waiting for a buffer to fill.
with open(r_fd, 'rb', 0) as f:
while True:
chunk = f.read(4096)
if not chunk:
break
buf += chunk
while b'\n' in buf:
line, buf = buf.split(b'\n', 1)
self._real.write(
'# {}\n'.format(line.decode('utf-8', errors='replace')))
self._real.flush()
if buf:
self._real.write(
'# {}\n'.format(buf.decode('utf-8', errors='replace')))
self._real.flush()

def restore(self) -> None:
"""Restore the original file descriptor and wait for the reader to drain."""
os.dup2(self._saved_fd, self._fd)
os.close(self._saved_fd)
self._thread.join()


class TAPTestResult(unittest.TestResult):
"""Collect unittest results and render them as TAP version 13."""

def __init__(self) -> None:
def __init__(self, stream: io.TextIOBase) -> None:
super().__init__()
self._stream = stream
self._test_count = 0
self._lines: list[str] = []

def _description(self, test: unittest.TestCase) -> str:
return '{} ({})'.format(test._testMethodName, type(test).__name__)

def addSuccess(self, test: unittest.TestCase) -> None:
super().addSuccess(test)
self._test_count += 1
self._lines.append('ok {} - {}\n'.format(
self._stream.write('ok {} - {}\n'.format(
self._test_count, self._description(test)))
self._stream.flush()

def addError(self, test: unittest.TestCase, err: object) -> None:
super().addError(test, err)
self._test_count += 1
self._lines.append('not ok {} - {}\n'.format(
self._stream.write('not ok {} - {}\n'.format(
self._test_count, self._description(test)))
for line in traceback.format_exception(*err): # type: ignore[misc]
for subline in line.splitlines():
self._lines.append('# {}\n'.format(subline))
self._stream.write('# {}\n'.format(subline))
self._stream.flush()

def addFailure(self, test: unittest.TestCase, err: object) -> None:
super().addFailure(test, err)
self._test_count += 1
self._lines.append('not ok {} - {}\n'.format(
self._stream.write('not ok {} - {}\n'.format(
self._test_count, self._description(test)))
for line in traceback.format_exception(*err): # type: ignore[misc]
for subline in line.splitlines():
self._lines.append('# {}\n'.format(subline))
self._stream.write('# {}\n'.format(subline))
self._stream.flush()

def addSkip(self, test: unittest.TestCase, reason: str) -> None:
super().addSkip(test, reason)
self._test_count += 1
self._lines.append('ok {} - {} # SKIP {}\n'.format(
self._stream.write('ok {} - {} # SKIP {}\n'.format(
self._test_count, self._description(test), reason))
self._stream.flush()

def addExpectedFailure(self, test: unittest.TestCase, err: object) -> None:
super().addExpectedFailure(test, err)
self._test_count += 1
self._lines.append('ok {} - {} # TODO expected failure\n'.format(
self._stream.write('ok {} - {} # TODO expected failure\n'.format(
self._test_count, self._description(test)))
self._stream.flush()

def addUnexpectedSuccess(self, test: unittest.TestCase) -> None:
super().addUnexpectedSuccess(test)
self._test_count += 1
self._lines.append('not ok {} - {} # TODO unexpected success\n'.format(
self._stream.write('not ok {} - {} # TODO unexpected success\n'.format(
self._test_count, self._description(test)))

def print_tap(self, stream: io.TextIOBase) -> None:
stream.write('1..{}\n'.format(self._test_count))
for line in self._lines:
stream.write(line)
stream.flush()
self._stream.flush()


def run_tests(test_module_name: str, start_dir: str | None = None) -> bool:
Expand All @@ -112,21 +159,30 @@ def run_tests(test_module_name: str, start_dir: str | None = None) -> bool:
suite = loader.loadTestsFromModule(module)

real_stdout = sys.stdout
# TAP version header must be the very first line on stdout.
real_stderr = sys.stderr
# TAP version header and plan must appear before any test output.
real_stdout.write('TAP version 13\n')
real_stdout.write('1..{}\n'.format(suite.countTestCases()))
real_stdout.flush()

# Redirect stdout so any print() calls from setUp/tearDown/tests are
# re-emitted as TAP diagnostic lines and do not break the TAP stream.
# Redirect stdout and stderr so any print()/sys.stderr.write() calls from
# setUp/tearDown/tests are re-emitted as TAP diagnostic lines and do not
# break the TAP stream.
sys.stdout = DiagnosticCapture(real_stdout) # type: ignore[assignment]
sys.stderr = DiagnosticCapture(real_stdout) # type: ignore[assignment]
# Also redirect fd 2 at the OS level so that subprocess stderr (which
# inherits the raw file descriptor and bypasses sys.stderr) is captured.
stderr_fd_capture = FDCapture(2, real_stdout)
try:
result = TAPTestResult()
result = TAPTestResult(real_stdout)
suite.run(result)
finally:
sys.stdout.flush()
sys.stdout = real_stdout
sys.stderr.flush()
sys.stderr = real_stderr
stderr_fd_capture.restore()

result.print_tap(real_stdout)
return result.wasSuccessful()


Expand Down
Loading