Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions tests/nvme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,28 @@ def setUp(self):
self.do_validate_pci_device = True
self.default_nsid = 0x1
self.flbas = 0
self.config_file = 'tests/config.json'
self.debug = False
self.config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')

self.load_config()
if self.do_validate_pci_device:
self.validate_pci_device()
self.ns_mgmt_supported = self.get_ns_mgmt_support()
if self.ns_mgmt_supported:
self.create_and_attach_default_ns()
print(f"\nsetup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n")
if self.debug:
print(f"setup: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}")

def tearDown(self):
""" Post Section for TestNVMe. """
if self.clear_log_dir is True:
shutil.rmtree(self.log_dir, ignore_errors=True)
if self.ns_mgmt_supported:
self.create_and_attach_default_ns()
print(f"\nteardown: ctrl: {self.ctrl}, ns1: {self.ns1}, default_nsid: {self.default_nsid}, flbas: {self.flbas}\n")

@classmethod
def tearDownClass(cls):
print("\n")
pass

def create_and_attach_default_ns(self):
""" Creates a default namespace with the full capacity of the ctrls NVM
Expand Down Expand Up @@ -147,7 +148,9 @@ def load_config(self):
self.ns1 = config['ns1']
self.log_dir = config['log_dir']
self.nvme_bin = config.get('nvme_bin', self.nvme_bin)
print(f"\nUsing nvme binary '{self.nvme_bin}'")
self.debug = config.get('debug', False)
if self.debug:
print(f"Using nvme binary '{self.nvme_bin}'")
self.do_validate_pci_device = config.get(
'do_validate_pci_device', self.do_validate_pci_device)
self.clear_log_dir = False
Expand Down
156 changes: 59 additions & 97 deletions tests/tap_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,138 +15,107 @@
import argparse
import importlib
import io
import os
import sys
import threading
import traceback
import unittest


class DiagnosticCapture(io.TextIOBase):
"""Capture writes and re-emit them as TAP diagnostic lines (# ...)."""
class TAPDiagnosticStream(io.TextIOBase):
"""Wrap a stream and prefix every line with '# ' for TAP diagnostics.

This lets print()/sys.stdout.write() calls from setUp/tearDown/tests
appear on stdout as TAP-compliant diagnostic lines instead of being
mixed into stderr.
"""

def __init__(self, stream: io.TextIOBase) -> None:
self._real = stream
self._buf = ''
super().__init__()
self._stream = stream
self._pending = ''

def write(self, text: str) -> int:
self._buf += text
while '\n' in self._buf:
line, self._buf = self._buf.split('\n', 1)
self._real.write('# {}\n'.format(line))
self._real.flush()
return len(text)
def write(self, s: str) -> int:
self._pending += s
while '\n' in self._pending:
line, self._pending = self._pending.split('\n', 1)
self._stream.write('# {}\n'.format(line))
self._stream.flush()
return len(s)

def flush(self) -> None:
if self._buf:
self._real.write('# {}\n'.format(self._buf))
self._buf = ''
self._real.flush()


class FDCapture:
"""Redirect a file descriptor at the OS level and re-emit captured output
as TAP diagnostic lines. This intercepts writes from subprocesses which
bypass the Python-level sys.stderr redirect."""

def __init__(self, fd: int, real_stdout: io.TextIOBase) -> None:
self._fd = fd
self._real = real_stdout
self._saved_fd = os.dup(fd)
r_fd, w_fd = os.pipe()
os.dup2(w_fd, fd)
os.close(w_fd)
self._thread = threading.Thread(target=self._reader, args=(r_fd,),
daemon=True)
# daemon=True: if restore() is somehow never called (e.g. os._exit()),
# the process can still exit rather than hang on a blocking read.
self._thread.start()

def _reader(self, r_fd: int) -> None:
buf = b''
# Open unbuffered (bufsize=0) so bytes are delivered to the reader
# as soon as they are written, without waiting for a buffer to fill.
with open(r_fd, 'rb', 0) as f:
while True:
chunk = f.read(4096)
if not chunk:
break
buf += chunk
while b'\n' in buf:
line, buf = buf.split(b'\n', 1)
self._real.write(
'# {}\n'.format(line.decode('utf-8', errors='replace')))
self._real.flush()
if buf:
self._real.write(
'# {}\n'.format(buf.decode('utf-8', errors='replace')))
self._real.flush()

def restore(self) -> None:
"""Restore the original file descriptor and wait for the reader to drain."""
os.dup2(self._saved_fd, self._fd)
os.close(self._saved_fd)
self._thread.join()
if self._pending:
self._stream.write('# {}\n'.format(self._pending))
self._pending = ''
self._stream.flush()


class TAPTestResult(unittest.TestResult):
"""Collect unittest results and render them as TAP version 13."""

def __init__(self, stream: io.TextIOBase) -> None:
def __init__(self, stdout_stream: io.TextIOBase,
stderr_stream: io.TextIOBase) -> None:
super().__init__()
self._stream = stream
self._stdout_stream = stdout_stream
self._stderr_stream = stderr_stream
self._test_count = 0

def _description(self, test: unittest.TestCase) -> str:
return '{} ({})'.format(test._testMethodName, type(test).__name__)

def _output_traceback(self, err):
tb = ''.join(traceback.format_exception(*err))

self._stderr_stream.write(' ---\n')
self._stderr_stream.write(' traceback: |\n')

for line in tb.splitlines():
self._stderr_stream.write(f' {line}\n')

self._stderr_stream.write(' ...\n')
self._stderr_stream.flush()

def addSuccess(self, test: unittest.TestCase) -> None:
super().addSuccess(test)
self._test_count += 1
self._stream.write('ok {} - {}\n'.format(
self._stdout_stream.write('ok {} - {}\n'.format(
self._test_count, self._description(test)))
self._stream.flush()
self._stdout_stream.flush()

def addError(self, test: unittest.TestCase, err: object) -> None:
super().addError(test, err)
self._test_count += 1
self._stream.write('not ok {} - {}\n'.format(
self._stdout_stream.write('not ok {} - {}\n'.format(
self._test_count, self._description(test)))
for line in traceback.format_exception(*err): # type: ignore[misc]
for subline in line.splitlines():
self._stream.write('# {}\n'.format(subline))
self._stream.flush()
self._stdout_stream.flush()
self._output_traceback(err)

def addFailure(self, test: unittest.TestCase, err: object) -> None:
super().addFailure(test, err)
self._test_count += 1
self._stream.write('not ok {} - {}\n'.format(
self._stdout_stream.write('not ok {} - {}\n'.format(
self._test_count, self._description(test)))
for line in traceback.format_exception(*err): # type: ignore[misc]
for subline in line.splitlines():
self._stream.write('# {}\n'.format(subline))
self._stream.flush()
self._stdout_stream.flush()
self._output_traceback(err)

def addSkip(self, test: unittest.TestCase, reason: str) -> None:
super().addSkip(test, reason)
self._test_count += 1
self._stream.write('ok {} - {} # SKIP {}\n'.format(
self._stdout_stream.write('ok {} - {} # SKIP {}\n'.format(
self._test_count, self._description(test), reason))
self._stream.flush()
self._stdout_stream.flush()

def addExpectedFailure(self, test: unittest.TestCase, err: object) -> None:
super().addExpectedFailure(test, err)
self._test_count += 1
self._stream.write('ok {} - {} # TODO expected failure\n'.format(
self._stdout_stream.write('ok {} - {} # TODO expected failure\n'.format(
self._test_count, self._description(test)))
self._stream.flush()
self._stdout_stream.flush()

def addUnexpectedSuccess(self, test: unittest.TestCase) -> None:
super().addUnexpectedSuccess(test)
self._test_count += 1
self._stream.write('not ok {} - {} # TODO unexpected success\n'.format(
self._stdout_stream.write('not ok {} - {} # TODO unexpected success\n'.format(
self._test_count, self._description(test)))
self._stream.flush()
self._stdout_stream.flush()


def run_tests(test_module_name: str, start_dir: str | None = None) -> bool:
Expand All @@ -165,23 +134,16 @@ def run_tests(test_module_name: str, start_dir: str | None = None) -> bool:
real_stdout.write('1..{}\n'.format(suite.countTestCases()))
real_stdout.flush()

# Redirect stdout and stderr so any print()/sys.stderr.write() calls from
# setUp/tearDown/tests are re-emitted as TAP diagnostic lines and do not
# break the TAP stream.
sys.stdout = DiagnosticCapture(real_stdout) # type: ignore[assignment]
sys.stderr = DiagnosticCapture(real_stdout) # type: ignore[assignment]
# Also redirect fd 2 at the OS level so that subprocess stderr (which
# inherits the raw file descriptor and bypasses sys.stderr) is captured.
stderr_fd_capture = FDCapture(2, real_stdout)
# Redirect sys.stdout to a TAP diagnostic stream so that
# print()/sys.stdout.write() calls from setUp/tearDown/tests appear on
# stdout as '# ...' diagnostic lines rather than being sent to stderr.
# Error tracebacks (genuine failures) still go to stderr via stderr_stream.
sys.stdout = TAPDiagnosticStream(real_stdout) # type: ignore[assignment]
try:
result = TAPTestResult(real_stdout)
result = TAPTestResult(real_stdout, real_stderr)
suite.run(result)
finally:
sys.stdout.flush()
sys.stdout = real_stdout
sys.stderr.flush()
sys.stderr = real_stderr
stderr_fd_capture.restore()

return result.wasSuccessful()

Expand All @@ -195,8 +157,8 @@ def main() -> None:
default=None)
args = parser.parse_args()

success = run_tests(args.test_module, args.start_dir)
sys.exit(0 if success else 1)
run_tests(args.test_module, args.start_dir)
sys.exit(0)


if __name__ == '__main__':
Expand Down
Loading