diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f06c367..2f322a9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -155,6 +155,7 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v5 with: + token: ${{ secrets.CODECOV_TOKEN }} fail_ci_if_error: false verbose: true diff --git a/README.rst b/README.rst index d064d90..5c12c1f 100644 --- a/README.rst +++ b/README.rst @@ -83,32 +83,36 @@ There are also two fingerprinting CLIs, ``telnetlib3-fingerprint`` and Encoding ~~~~~~~~ -The default encoding is the system locale, usually UTF-8, and, without negotiation of BINARY -transmission, all Telnet protocol text *should* be limited to ASCII text, by strict compliance of -Telnet. Further, the encoding used *should* be negotiated by CHARSET. +The default encoding of telnetlib3-client and server is set by the `locale +`_. -When these conditions are true, telnetlib3-server and telnetlib3-client allow connections of any -encoding supporting by the python language, and additionally specially ``ATASCII`` and ``PETSCII`` -encodings. Any server capable of negotiating CHARSET or LANG through NEW_ENVIRON is also presumed -to support BINARY. +Without negotiation of BINARY transmission, all Telnet protocol text *should* be limited to ASCII +text, by strict compliance of Telnet. Further, the encoding used *should* be negotiated by CHARSET +:rfc:`2066` or by ``LANG`` using ``NEW_ENVIRON`` :rfc:`1572`. Otherwise, a compliant telnet client +should be limited to ASCII. -From a February 2026 `census of MUDs `_ and `BBSs servers +When these conditions are true, telnetlib3-server and telnetlib3-client allow *automatic +negotiation* of any encoding in either direction supported by the python language, or any +custom ``ATASCII``, ``PETSCII``, and ``big5bbs`` provided with telnetlib3. + +**However**, from a February 2026 `census of MUDs `_ and `BBSs servers `_: -- 2.8% of MUDs support bi-directional CHARSET -- 0.5% of BBSs support bi-directional CHARSET. -- 18.4% of BBSs support BINARY. -- 3.2% of MUDs support BINARY. +- 2.8% of MUDs and 0.5% of BBSs support bi-directional CHARSET +- 18.4% of BBSs and 3.2% of MUDs support BINARY. -For this reason, it is often required to specify the encoding, eg.! +This means that connecting to *large majority* of BBSs or MUDs that transmit non-ascii, it will +require *manually specifying an encoding*, eg.:: telnetlib3-client --encoding=cp437 20forbeers.com 1337 + telnetlib3-client --encoding=big5bbs bbs.ccns.ncku.edu.tw 3456 + Raw Mode ~~~~~~~~ -Some telnet servers, especially BBS systems or those designed for serial transmission but are -connected to a TCP socket without any telnet negotiation may require "raw" mode argument:: +Some telnet servers, especially "retro" BBS systems or those designed for serial transmission but +are connected to a TCP socket without any telnet negotiation may require the "raw" mode argument:: telnetlib3-client --raw-mode area52.tk 5200 --encoding=atascii diff --git a/docs/api/client_shell_win32.rst b/docs/api/client_shell_win32.rst new file mode 100644 index 0000000..7302a64 --- /dev/null +++ b/docs/api/client_shell_win32.rst @@ -0,0 +1,5 @@ +client_shell_win32 +------------------ + +.. automodule:: telnetlib3.client_shell_win32 + :members: diff --git a/docs/history.rst b/docs/history.rst index c910ff4..2df0f5c 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,10 +1,17 @@ History ======= 4.0.1 - * bugfix: ``telnetlib3-client`` could begin a shell in wrong ECHO mode, depending on order of - options in a "connection burst". * new: ``--encoding=big5bbs``, BBS 半形字 (half-width characters) encoding, matching PCMan/PttBBS terminal clients, popular with Taiwanese BBS culture. + * enhancement: ``telnetlib3-client`` now works on Windows by using the optional + ``blessed>=1.20`` dependency, installed automatically for Windows platforms. + * bugfix: ``telnetlib3-client`` could begin a shell in wrong ECHO mode, depending on order of + options in a "connection burst". + * bugfix: :class:`~telnetlib3._session_context.TelnetSessionContext` ``gmcp_data`` + mutable default argument caused all instances to share a single dict, so GMCP data + from one connection contaminated subsequent connections. + * bugfix: keyboard escape detection raised :exc:`UnicodeDecodeError` on non-UTF-8 + terminal input bytes; now uses ``errors="replace"``. 4.0.0 * removed: ``telnetlib3.color_filter``. ``ColorFilter``, ``ColorConfig``, ``PALETTES``, diff --git a/pyproject.toml b/pyproject.toml index 24117b3..89f8784 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ classifiers = [ requires-python = ">=3.9" dependencies = [ "wcwidth>=0.6.0", + "blessed>=1.33; platform_system == 'Windows'", ] [project.optional-dependencies] diff --git a/telnetlib3/_session_context.py b/telnetlib3/_session_context.py index 5120f18..cacc0ff 100644 --- a/telnetlib3/_session_context.py +++ b/telnetlib3/_session_context.py @@ -24,14 +24,32 @@ class TelnetSessionContext: line mode. :param ascii_eol: When ``True``, translate ATASCII CR/LF glyphs to ASCII ``\r`` / ``\n``. + :param input_filter: Optional :class:`~telnetlib3.client_shell.InputFilter` for + translating raw keyboard bytes (e.g. arrow keys for ATASCII/PETSCII). + :param autoreply_engine: Optional autoreply engine (e.g. a MUD macro engine) + that receives server output via ``engine.feed(text)`` and can send replies. + :param autoreply_wait_fn: Async callable installed by the shell to gate autoreply + sends on GA/EOR prompt signals; set automatically during shell startup. + :param typescript_file: When set, all server output is appended to this file + (like the POSIX ``typescript`` command). + :param gmcp_data: Initial GMCP module data mapping; defaults to an empty dict. """ - def __init__(self) -> None: + def __init__( + self, + raw_mode: Optional[bool] = None, + ascii_eol: bool = False, + input_filter: Optional[Any] = None, + autoreply_engine: Optional[Any] = None, + autoreply_wait_fn: Optional[Callable[..., Awaitable[None]]] = None, + typescript_file: Optional[IO[str]] = None, + gmcp_data: Optional[dict[str, Any]] = None, + ) -> None: """Initialize session context with default attribute values.""" - self.raw_mode: Optional[bool] = None - self.ascii_eol: bool = False - self.input_filter: Optional[Any] = None - self.autoreply_engine: Optional[Any] = None - self.autoreply_wait_fn: Optional[Callable[..., Awaitable[None]]] = None - self.typescript_file: Optional[IO[str]] = None - self.gmcp_data: dict[str, Any] = {} + self.raw_mode = raw_mode + self.ascii_eol = ascii_eol + self.input_filter = input_filter + self.autoreply_engine = autoreply_engine + self.autoreply_wait_fn = autoreply_wait_fn + self.typescript_file = typescript_file + self.gmcp_data: dict[str, Any] = gmcp_data if gmcp_data is not None else {} diff --git a/telnetlib3/client.py b/telnetlib3/client.py index 29d1879..52b504a 100755 --- a/telnetlib3/client.py +++ b/telnetlib3/client.py @@ -482,7 +482,11 @@ def _winsize() -> Tuple[int, int]: rows, cols, _, _ = struct.unpack(fmt, val) return rows, cols except (ImportError, IOError): - return (int(os.environ.get("LINES", 25)), int(os.environ.get("COLUMNS", 80))) + try: + sz = os.get_terminal_size() + return sz.lines, sz.columns + except OSError: + return (int(os.environ.get("LINES", 25)), int(os.environ.get("COLUMNS", 80))) async def open_connection( @@ -586,7 +590,7 @@ async def open_connection( """ if client_factory is None: client_factory = TelnetClient - if sys.platform != "win32" and sys.stdin.isatty(): + if sys.stdin.isatty(): client_factory = TelnetTerminalClient def connection_factory() -> client_base.BaseClient: @@ -660,13 +664,14 @@ async def run_client() -> None: # Wrap client factory to inject always_will/always_do/always_wont/always_dont # and encoding flags before negotiation starts. - encoding_explicit = args["encoding"] not in ("utf8", "utf-8", False) + environ_encoding = args["encoding"] or "ascii" + encoding_explicit = environ_encoding not in ("utf8", "utf-8", "ascii") gmcp_modules: Optional[List[str]] = args.get("gmcp_modules") def _client_factory(**kwargs: Any) -> client_base.BaseClient: client: TelnetClient kwargs["gmcp_modules"] = gmcp_modules - if sys.platform != "win32" and sys.stdin.isatty(): + if sys.stdin.isatty(): client = TelnetTerminalClient(**kwargs) else: client = TelnetClient(**kwargs) @@ -684,6 +689,7 @@ def _patched_connection_made(transport: asyncio.BaseTransport) -> None: from .telopt import GMCP as _GMCP client.writer.passive_do = {_GMCP} + client.writer.environ_encoding = environ_encoding client.writer._encoding_explicit = encoding_explicit client.connection_made = _patched_connection_made # type: ignore[method-assign] diff --git a/telnetlib3/client_shell.py b/telnetlib3/client_shell.py index 27c6206..353aad3 100644 --- a/telnetlib3/client_shell.py +++ b/telnetlib3/client_shell.py @@ -1,12 +1,14 @@ """Telnet client shell implementations for interactive terminal sessions.""" # std imports +import os import sys import asyncio import logging import threading import collections -from typing import Any, Dict, Tuple, Union, Callable, Optional +from abc import ABC, abstractmethod +from typing import Any, Dict, Tuple, Union, Generic, TypeVar, Callable, Optional, Protocol from dataclasses import dataclass # local @@ -22,7 +24,7 @@ from .stream_reader import TelnetReader, TelnetReaderUnicode # noqa: E402 from .stream_writer import TelnetWriter, TelnetWriterUnicode # noqa: E402 -__all__ = ("InputFilter", "telnet_client_shell") +__all__ = ("InputFilter", "TelnetTerminalShell", "telnet_client_shell") # ATASCII graphics characters that map to byte 0x0D and 0x0A respectively. # When --ascii-eol is active, these are replaced with \r and \n before @@ -48,6 +50,17 @@ }, } +# ESC key delay in seconds; read from ESCDELAY env var (milliseconds, ncurses convention) +_escdelay_env = os.getenv("ESCDELAY") +if _escdelay_env is None: + ESC_DELAY = 0.35 +else: + try: + ESC_DELAY = int(_escdelay_env) / 1000.0 + except ValueError: + log.warning("Invalid ESCDELAY value %r; using default 350ms", _escdelay_env) + ESC_DELAY = 0.35 + # Multi-byte escape sequence translation tables for retro encodings. # Maps common ANSI terminal escape sequences (arrow keys, delete, etc.) # to the raw bytes the BBS expects. Inspired by blessed's @@ -101,17 +114,17 @@ class InputFilter: """ def __init__( - self, seq_xlat: Dict[bytes, bytes], byte_xlat: Dict[int, int], esc_delay: float = 0.35 + self, seq_xlat: Dict[bytes, bytes], byte_xlat: Dict[int, int], esc_delay: float = ESC_DELAY ) -> None: """Initialize input filter with sequence and byte translation tables.""" - self._byte_xlat = byte_xlat + self._map_singlebyte = byte_xlat self.esc_delay = esc_delay # Sort sequences longest-first so \x1b[3~ matches before \x1b[3 self._seq_sorted: Tuple[Tuple[bytes, bytes], ...] = tuple( sorted(seq_xlat.items(), key=lambda kv: len(kv[0]), reverse=True) ) # Prefix set for partial-match buffering (blessed's get_leading_prefixes) - self._prefixes: frozenset[bytes] = frozenset( + self._mbs_prefixes: frozenset[bytes] = frozenset( seq[:i] for seq in seq_xlat for i in range(1, len(seq)) ) self._buf = b"" @@ -134,7 +147,7 @@ def flush(self) -> bytes: while self._buf: b = self._buf[0] self._buf = self._buf[1:] - result.append(self._byte_xlat.get(b, b)) + result.append(self._map_singlebyte.get(b, b)) return bytes(result) def feed(self, data: bytes) -> bytes: @@ -162,12 +175,12 @@ def feed(self, data: bytes) -> bytes: if matched: continue # Check if buffer is a prefix of any known sequence -- wait for more - if self._buf in self._prefixes: + if self._buf in self._mbs_prefixes: break # No sequence match, emit single byte with translation b = self._buf[0] self._buf = self._buf[1:] - result.append(self._byte_xlat.get(b, b)) + result.append(self._map_singlebyte.get(b, b)) return bytes(result) @@ -190,6 +203,77 @@ class _RawLoopState: reactivate_repl: bool = False +_ModeT = TypeVar("_ModeT") + + +class _StdoutWriter(Protocol): + """Minimal protocol for the local stdout pipe used by the telnet event loop.""" + + def write(self, data: bytes) -> None: + """Write bytes to the output pipe.""" + + +class TelnetTerminalShell(ABC, Generic[_ModeT]): + """ + Abstract base for telnet client terminal context managers. + + Defines the interface used by ``_telnet_client_shell_impl`` and + ``_raw_event_loop``. Concrete implementations are + :class:`~telnetlib3.client_shell.Terminal` (POSIX) and + :class:`~telnetlib3.client_shell_win32.Terminal` (Windows). + + The type parameter ``_ModeT`` is the platform-specific terminal mode + descriptor (a namedtuple). Subclasses bind it to their own concrete mode + type so that :meth:`set_mode` and ``_make_raw`` are type-safe within + each platform. + """ + + software_echo: bool + _istty: bool + _save_mode: Optional[_ModeT] + + @abstractmethod + async def make_stdout(self) -> _StdoutWriter: + """Return a writer for local terminal output.""" + + @abstractmethod + def setup_winch(self) -> None: + """Register a terminal resize handler.""" + + @abstractmethod + def cleanup_winch(self) -> None: + """Deregister the terminal resize handler.""" + + @abstractmethod + async def connect_stdin(self) -> asyncio.StreamReader: + """Connect stdin to an asyncio :class:`~asyncio.StreamReader` and return it.""" + + @abstractmethod + def disconnect_stdin(self, reader: asyncio.StreamReader) -> None: + """Disconnect the stdin pipe and signal EOF to *reader*.""" + + @abstractmethod + def set_mode(self, mode: Optional[_ModeT]) -> None: + """Apply terminal mode settings; a ``None`` *mode* is a no-op.""" + + @abstractmethod + def _make_raw(self, mode: _ModeT, suppress_echo: bool = True) -> _ModeT: + """Return *mode* modified for raw character-at-a-time input.""" + + @abstractmethod + def check_auto_mode( + self, switched_to_raw: bool, last_will_echo: bool + ) -> Optional[Tuple[bool, bool, bool]]: + """ + Check whether terminal mode should change mid-session. + + :param switched_to_raw: Whether the terminal is already in raw mode. + :param last_will_echo: Previous value of the server's WILL ECHO state. + :returns: ``(switched_to_raw, last_will_echo, local_echo)`` if a mode + change is warranted, or ``None`` if no change is needed. + """ + + class LinemodeBuffer: """ Client-side line buffer for LINEMODE EDIT mode (RFC 1184 §3.1). @@ -277,21 +361,396 @@ def feed(self, char: str) -> Tuple[str, Optional[bytes]]: return (char, None) -if sys.platform == "win32": +def _transform_output( + out: str, writer: Union[TelnetWriter, TelnetWriterUnicode], in_raw_mode: bool +) -> str: + r""" + Apply ASCII EOL substitution and CRLF normalization. + + :param out: Server output text to transform. + :param writer: Telnet writer (``ctx`` provides ascii_eol). + :param in_raw_mode: When ``True``, normalize line endings to ``\r\n``. + :returns: Transformed output string. + """ + ctx: TelnetSessionContext = writer.ctx + if ctx.ascii_eol: + out = out.replace(_ATASCII_CR_CHAR, "\r").replace(_ATASCII_LF_CHAR, "\n") + if in_raw_mode: + out = out.replace("\r\n", "\n").replace("\n", "\r\n") + else: + # Cooked mode: PTY ONLCR converts \n -> \r\n, so strip \r before \n + # to avoid doubling (\r\n -> \r\r\n). + out = out.replace("\r\n", "\n") + return out + + +def _send_stdin( + inp: bytes, + telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], + stdout: _StdoutWriter, + local_echo: bool, +) -> "tuple[Optional[asyncio.Task[None]], bool]": + """ + Send stdin input to server and optionally echo locally. + + :param inp: Raw bytes from terminal stdin. + :param telnet_writer: Telnet writer for sending to server. + :param stdout: Local stdout writer for software echo. + :param local_echo: When ``True``, echo input bytes to stdout. + :returns: ``(esc_timer_task_or_None, has_pending)`` tuple. + """ + ctx: TelnetSessionContext = telnet_writer.ctx + inf = ctx.input_filter + pending = False + new_timer: Optional[asyncio.Task[None]] = None + if inf is not None: + translated = inf.feed(inp) + if translated: + telnet_writer._write(translated) + if inf.has_pending: + pending = True + new_timer = asyncio.ensure_future(asyncio.sleep(inf.esc_delay)) + else: + telnet_writer._write(inp) + if local_echo: + echo_buf = bytearray() + for b in inp: + if b in (0x7F, 0x08): + echo_buf.extend(b"\b \b") + elif b == 0x0D: + echo_buf.extend(b"\r\n") + elif b >= 0x20: + echo_buf.append(b) + if echo_buf: + stdout.write(bytes(echo_buf)) + return new_timer, pending + + +def _get_raw_mode(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "bool | None": + """ + Return the raw-mode override from the writer's session context. + + ``None`` = auto-detect from server negotiation (default), + ``True`` = force raw / character-at-a-time, + ``False`` = force line mode. + """ + return writer.ctx.raw_mode + + +def _ensure_autoreply_engine( + telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], +) -> "Optional[Any]": + """ + Return the autoreply engine from the writer's session context, or ``None``. + + The autoreply engine is optional application-level machinery (e.g. a macro + engine in a MUD client) that watches server output and sends pre-configured + replies. It is absent in standalone telnetlib3 and supplied by the host + application via ``writer.ctx.autoreply_engine``. + """ + return telnet_writer.ctx.autoreply_engine + + +def _get_linemode_buffer(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "LinemodeBuffer": + """ + Return (or lazily create) the :class:`LinemodeBuffer` attached to *writer*. + + The buffer is stored as ``writer._linemode_buf`` so it persists across loop + iterations and accumulates characters between :meth:`LinemodeBuffer.feed` + calls. Created on first use because LINEMODE negotiation may complete after + the shell has already started. + """ + buf: Optional[LinemodeBuffer] = getattr(writer, "_linemode_buf", None) + if buf is None: + buf = LinemodeBuffer( + slctab=writer.slctab, forwardmask=writer.forwardmask, trapsig=writer.linemode.trapsig + ) + writer._linemode_buf = buf + return buf + + +async def _raw_event_loop( + telnet_reader: Union[TelnetReader, TelnetReaderUnicode], + telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], + tty_shell: TelnetTerminalShell[Any], + stdin: asyncio.StreamReader, + stdout: _StdoutWriter, + keyboard_escape: str, + state: _RawLoopState, + handle_close: Callable[[str], None], + want_repl: Callable[[], bool], +) -> None: + """Standard byte-at-a-time event loop (mutates *state* in-place).""" + stdin_task = accessories.make_reader_task(stdin) + telnet_task = accessories.make_reader_task(telnet_reader, size=2**24) + esc_timer_task: Optional[asyncio.Task[None]] = None + wait_for: set[asyncio.Task[Any]] = {stdin_task, telnet_task} + + while wait_for: + done, _ = await asyncio.wait(wait_for, return_when=asyncio.FIRST_COMPLETED) + if stdin_task in done: + task = stdin_task + done.discard(task) + else: + task = done.pop() + wait_for.discard(task) + + telnet_writer.log.log(TRACE, "task=%s, wait_for=%s", task, wait_for) + + # ESC_DELAY timer fired -- flush buffered partial sequence + if task is esc_timer_task: + esc_timer_task = None + inf = telnet_writer.ctx.input_filter + if inf is not None and inf.has_pending: + flushed = inf.flush() + if flushed: + telnet_writer._write(flushed) + continue + + # client input + if task == stdin_task: + if esc_timer_task is not None and esc_timer_task in wait_for: + esc_timer_task.cancel() + wait_for.discard(esc_timer_task) + esc_timer_task = None + inp = task.result() + if not inp: + telnet_writer.log.debug("EOF from client stdin") + continue + if keyboard_escape in inp.decode("utf-8", errors="replace"): + telnet_writer.close() + if telnet_task in wait_for: + telnet_task.cancel() + wait_for.remove(telnet_task) + handle_close("Connection closed.") + break + linemode_edit = ( + telnet_writer.local_option.enabled(LINEMODE) and telnet_writer.linemode.edit + ) + if linemode_edit and state.switched_to_raw: + # Raw PTY or non-TTY: kernel not doing line editing, use LinemodeBuffer + lmbuf = _get_linemode_buffer(telnet_writer) + for ch in inp.decode(errors="replace"): + echo, data = lmbuf.feed(ch) + if echo: + stdout.write(echo.encode()) + if data: + telnet_writer._write(data) + new_timer, has_pending = None, False + elif linemode_edit: + # Cooked PTY: kernel already handled EC/EL/echo; forward line directly + new_timer, has_pending = _send_stdin(inp, telnet_writer, stdout, False) + else: + new_timer, has_pending = _send_stdin(inp, telnet_writer, stdout, state.local_echo) + if has_pending and esc_timer_task not in wait_for: + esc_timer_task = new_timer + if esc_timer_task is not None: + wait_for.add(esc_timer_task) + stdin_task = accessories.make_reader_task(stdin) + wait_for.add(stdin_task) + + # server output + elif task == telnet_task: + out = task.result() + if not out and telnet_reader.at_eof(): + if stdin_task in wait_for: + stdin_task.cancel() + wait_for.remove(stdin_task) + handle_close("Connection closed by foreign host.") + continue + raw_mode = _get_raw_mode(telnet_writer) + in_raw = raw_mode is True or (raw_mode is None and state.switched_to_raw) + out = _transform_output(out, telnet_writer, in_raw) + ar_engine = _ensure_autoreply_engine(telnet_writer) + if ar_engine is not None: + ar_engine.feed(out) + if raw_mode is None or (raw_mode is True and state.switched_to_raw): + mode_result = tty_shell.check_auto_mode(state.switched_to_raw, state.last_will_echo) + if mode_result is not None: + if not state.switched_to_raw: + state.linesep = "\r\n" + state.switched_to_raw, state.last_will_echo, state.local_echo = mode_result + # When transitioning cooked -> raw, the data was + # processed for ONLCR (\r\n -> \n) but the terminal + # now has ONLCR disabled. Re-normalize so bare \n + # becomes \r\n for correct display. + if state.switched_to_raw and not in_raw: + out = out.replace("\n", "\r\n") + if raw_mode is None and want_repl(): + state.reactivate_repl = True + stdout.write(out.encode()) + _ts_file = telnet_writer.ctx.typescript_file + if _ts_file is not None: + _ts_file.write(out) + _ts_file.flush() + if state.reactivate_repl: + telnet_writer.log.debug("mode returned to local, reactivating REPL") + if stdin_task in wait_for: + stdin_task.cancel() + wait_for.discard(stdin_task) + state.switched_to_raw = False + break + telnet_task = accessories.make_reader_task(telnet_reader, size=2**24) + wait_for.add(telnet_task) - async def telnet_client_shell( - telnet_reader: Union[TelnetReader, TelnetReaderUnicode], - telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], - ) -> None: - """Win32 telnet client shell (not implemented).""" - raise NotImplementedError("win32 not yet supported as telnet client. Please contribute!") + +async def _telnet_client_shell_impl( + telnet_reader: Union[TelnetReader, TelnetReaderUnicode], + telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], + tty_shell: TelnetTerminalShell[_ModeT], +) -> None: + """ + Shared implementation body for :func:`telnet_client_shell` on all platforms. + + Called with an already-entered terminal context manager (*tty_shell*). Handles mode negotiation, + GA/EOR pacing, and the raw event loop. + """ + keyboard_escape = "\x1d" + linesep = "\n" + switched_to_raw = False + last_will_echo = False + local_echo = tty_shell.software_echo + if tty_shell._istty: + raw_mode = _get_raw_mode(telnet_writer) + if telnet_writer.will_echo or raw_mode is True: + linesep = "\r\n" + stdout = await tty_shell.make_stdout() + tty_shell.setup_winch() + + # Prompt-pacing via IAC GA / IAC EOR. + # + # MUD servers emit IAC GA (Go-Ahead, RFC 854) or IAC EOR (End-of-Record, RFC 885) after + # each prompt to signal "output is complete, awaiting your input." The autoreply engine + # uses this to pace its replies. It calls ctx.autoreply_wait_fn() before sending each + # reply, preventing races where a reply arrives before the server has finished rendering + # the prompt. + # + # 'server_uses_ga' becomes True on the first GA/EOR received. _wait_for_prompt does + # nothing until 'server_uses_ga', so servers that never send GA/EOR (Most everything but + # MUDs these days) are silently unaffected. + # + # prompt_event starts SET so the first autoreply fires immediately -- there is no prior + # GA to wait for. _on_ga_or_eor re-sets it on each prompt signal; _wait_for_prompt + # clears it after consuming the signal so the next autoreply waits for the following + # prompt. + prompt_event = asyncio.Event() + prompt_event.set() + server_uses_ga = False + + # The session context is the decoupling point between this shell and the + # autoreply engine (which may live in a separate module). Storing + # _wait_for_prompt on it lets the engine call back into our local event state + # without a direct import or reference to this closure. + ctx: TelnetSessionContext = telnet_writer.ctx + + def _on_ga_or_eor(_cmd: bytes) -> None: + nonlocal server_uses_ga + server_uses_ga = True + prompt_event.set() + ar = ctx.autoreply_engine + if ar is not None: + ar.on_prompt() + + from .telopt import GA, CMD_EOR + + telnet_writer.set_iac_callback(GA, _on_ga_or_eor) + telnet_writer.set_iac_callback(CMD_EOR, _on_ga_or_eor) + + async def _wait_for_prompt() -> None: + """ + Wait for the next prompt signal before the autoreply engine sends a reply. + + No-op until the first GA/EOR confirms this server uses prompt signalling. + After that, blocks until :func:`_on_ga_or_eor` fires the event, then clears + it to arm the wait for the following prompt. A 2-second safety timeout + prevents stalling if the server stops sending GA mid-session. + """ + if not server_uses_ga: + return + try: + await asyncio.wait_for(prompt_event.wait(), timeout=2.0) + except asyncio.TimeoutError: + pass + prompt_event.clear() + + ctx.autoreply_wait_fn = _wait_for_prompt + + escape_name = accessories.name_unicode(keyboard_escape) + banner_sep = "\r\n" if tty_shell._istty else linesep + stdout.write(f"Escape character is '{escape_name}'.{banner_sep}".encode()) + + def _handle_close(msg: str) -> None: + # \033[m resets all SGR attributes so server-set colours do not + # bleed into the terminal after disconnect. + stdout.write(f"\033[m{linesep}{msg}{linesep}".encode()) + tty_shell.cleanup_winch() + + def _should_reactivate_repl() -> bool: + # Extension point for callers that embed a REPL (e.g. a MUD client). + # Return True to break _raw_event_loop and return to the REPL when + # the server puts the terminal back into local mode. The base shell + # has no REPL, so this always returns False. + return False + + # Wait up to 50 ms for subsequent WILL ECHO / WILL SGA packets to arrive before + # committing to a terminal mode. + # + # check_negotiation() declares the handshake complete as soon as TTYPE and NEW_ENVIRON / + # CHARSET are settled, without waiting for ECHO / SGA. Those options typically travel + # in the same "initial negotiation burst" but may not have not yet have "arrived" at + # this point in our TCP read until a few milliseconds later. Servers that never send + # WILL ECHO (rlogin, basically) simply time out and proceed correctly. + raw_mode = _get_raw_mode(telnet_writer) + if raw_mode is not False and tty_shell._istty: + try: + await asyncio.wait_for( + telnet_writer.wait_for_condition(lambda w: w.mode != "local"), timeout=0.05 + ) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + + # Commit the terminal to raw mode now that will_echo is stable. suppress_echo=True + # disables the kernel's local ECHO because the server will echo (or we handle it in + # software). local_echo is set to True only when the server will NOT echo, so we + # reproduce keystrokes ourselves. + if not switched_to_raw and tty_shell._istty and tty_shell._save_mode is not None: + tty_shell.set_mode(tty_shell._make_raw(tty_shell._save_mode, suppress_echo=True)) + switched_to_raw = True + local_echo = not telnet_writer.will_echo + linesep = "\r\n" + stdin = await tty_shell.connect_stdin() + state = _RawLoopState( + switched_to_raw=switched_to_raw, + last_will_echo=last_will_echo, + local_echo=local_echo, + linesep=linesep, + ) + await _raw_event_loop( + telnet_reader, + telnet_writer, + tty_shell, + stdin, + stdout, + keyboard_escape, + state, + _handle_close, + _should_reactivate_repl, + ) + tty_shell.disconnect_stdin(stdin) + + +if sys.platform == "win32": + from .client_shell_win32 import Terminal, telnet_client_shell # noqa: F401 else: - import os import signal import termios - class Terminal: + _PosixMode = collections.namedtuple( + "_PosixMode", ["iflag", "oflag", "cflag", "lflag", "ispeed", "ospeed", "cc"] + ) + + class Terminal(TelnetTerminalShell[_PosixMode]): """ Context manager for terminal mode handling on POSIX systems. @@ -299,15 +758,13 @@ class Terminal: negotiated for the given telnet_writer. """ - ModeDef = collections.namedtuple( - "ModeDef", ["iflag", "oflag", "cflag", "lflag", "ispeed", "ospeed", "cc"] - ) + ModeDef = _PosixMode def __init__(self, telnet_writer: Union[TelnetWriter, TelnetWriterUnicode]) -> None: self.telnet_writer = telnet_writer self._fileno = sys.stdin.fileno() self._istty = os.path.sameopenfile(0, 1) - self._save_mode: Optional[Terminal.ModeDef] = None + self._save_mode: Optional[_PosixMode] = None self.software_echo = False self._remove_winch = False self._resize_pending = threading.Event() @@ -318,11 +775,16 @@ def setup_winch(self) -> None: """Register SIGWINCH handler to set ``_resize_pending`` flag.""" if not self._istty or not hasattr(signal, "SIGWINCH"): return + from .telopt import NAWS + + writer = self.telnet_writer try: loop = asyncio.get_event_loop() def _on_winch() -> None: self._resize_pending.set() + if writer.local_option.enabled(NAWS): + writer._send_naws() loop.add_signal_handler(signal.SIGWINCH, _on_winch) self._remove_winch = True @@ -351,18 +813,20 @@ def __exit__(self, *_: Any) -> None: assert self._save_mode is not None termios.tcsetattr(self._fileno, termios.TCSADRAIN, list(self._save_mode)) - def get_mode(self) -> Optional["Terminal.ModeDef"]: + def get_mode(self) -> Optional[_PosixMode]: """Return current terminal mode if attached to a tty, otherwise None.""" if self._istty: return self.ModeDef(*termios.tcgetattr(self._fileno)) return None - def set_mode(self, mode: "Terminal.ModeDef") -> None: - """Set terminal mode attributes.""" + def set_mode(self, mode: Optional[_PosixMode]) -> None: + """Set terminal mode attributes; a ``None`` *mode* is a no-op.""" + if mode is None: + return termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, list(mode)) @staticmethod - def _suppress_echo(mode: "Terminal.ModeDef") -> "Terminal.ModeDef": + def _suppress_echo(mode: _PosixMode) -> _PosixMode: """Return copy of *mode* with local ECHO disabled, keeping ICANON.""" return Terminal.ModeDef( iflag=mode.iflag, @@ -374,9 +838,7 @@ def _suppress_echo(mode: "Terminal.ModeDef") -> "Terminal.ModeDef": cc=mode.cc, ) - def _make_raw( - self, mode: "Terminal.ModeDef", suppress_echo: bool = True - ) -> "Terminal.ModeDef": + def _make_raw(self, mode: _PosixMode, suppress_echo: bool = True) -> _PosixMode: """ Return copy of *mode* with raw terminal attributes set. @@ -415,7 +877,7 @@ def _server_will_sga(self) -> bool: def check_auto_mode( self, switched_to_raw: bool, last_will_echo: bool - ) -> "tuple[bool, bool, bool] | None": + ) -> Optional[Tuple[bool, bool, bool]]: """ Check if auto-mode switching is needed. @@ -468,7 +930,7 @@ def check_auto_mode( ) return (True if should_go_raw else switched_to_raw, wecho, not wecho) - def determine_mode(self, mode: "Terminal.ModeDef") -> "Terminal.ModeDef": + def determine_mode(self, mode: _PosixMode) -> _PosixMode: """ Return copy of 'mode' with changes suggested for telnet connection. @@ -572,242 +1034,6 @@ async def make_stdio(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: stdin = await self.connect_stdin() return stdin, stdout - def _transform_output( - out: str, writer: Union[TelnetWriter, TelnetWriterUnicode], in_raw_mode: bool - ) -> str: - r""" - Apply ASCII EOL substitution and CRLF normalization. - - :param out: Server output text to transform. - :param writer: Telnet writer (``ctx`` provides ascii_eol). - :param in_raw_mode: When ``True``, normalize line endings to ``\r\n``. - :returns: Transformed output string. - """ - ctx: TelnetSessionContext = writer.ctx - if ctx.ascii_eol: - out = out.replace(_ATASCII_CR_CHAR, "\r").replace(_ATASCII_LF_CHAR, "\n") - if in_raw_mode: - out = out.replace("\r\n", "\n").replace("\n", "\r\n") - else: - # Cooked mode: PTY ONLCR converts \n -> \r\n, so strip \r before \n - # to avoid doubling (\r\n -> \r\r\n). - out = out.replace("\r\n", "\n") - return out - - def _send_stdin( - inp: bytes, - telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], - stdout: asyncio.StreamWriter, - local_echo: bool, - ) -> "tuple[Optional[asyncio.Task[None]], bool]": - """ - Send stdin input to server and optionally echo locally. - - :param inp: Raw bytes from terminal stdin. - :param telnet_writer: Telnet writer for sending to server. - :param stdout: Local stdout writer for software echo. - :param local_echo: When ``True``, echo input bytes to stdout. - :returns: ``(esc_timer_task_or_None, has_pending)`` tuple. - """ - ctx: TelnetSessionContext = telnet_writer.ctx - inf = ctx.input_filter - pending = False - new_timer: Optional[asyncio.Task[None]] = None - if inf is not None: - translated = inf.feed(inp) - if translated: - telnet_writer._write(translated) - if inf.has_pending: - pending = True - new_timer = asyncio.ensure_future(asyncio.sleep(inf.esc_delay)) - else: - telnet_writer._write(inp) - if local_echo: - echo_buf = bytearray() - for b in inp: - if b in (0x7F, 0x08): - echo_buf.extend(b"\b \b") - elif b == 0x0D: - echo_buf.extend(b"\r\n") - elif b >= 0x20: - echo_buf.append(b) - if echo_buf: - stdout.write(bytes(echo_buf)) - return new_timer, pending - - def _get_raw_mode(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "bool | None": - """ - Return the raw-mode override from the writer's session context. - - ``None`` = auto-detect from server negotiation (default), - ``True`` = force raw / character-at-a-time, - ``False`` = force line mode. - """ - return writer.ctx.raw_mode - - def _ensure_autoreply_engine( - telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], - ) -> "Optional[Any]": - """ - Return the autoreply engine from the writer's session context, or ``None``. - - The autoreply engine is optional application-level machinery (e.g. a macro - engine in a MUD client) that watches server output and sends pre-configured - replies. It is absent in standalone telnetlib3 and supplied by the host - application via ``writer.ctx.autoreply_engine``. - """ - return telnet_writer.ctx.autoreply_engine - - def _get_linemode_buffer(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "LinemodeBuffer": - """ - Return (or lazily create) the :class:`LinemodeBuffer` attached to *writer*. - - The buffer is stored as ``writer._linemode_buf`` so it persists across loop - iterations and accumulates characters between :meth:`LinemodeBuffer.feed` - calls. Created on first use because LINEMODE negotiation may complete after - the shell has already started. - """ - buf: Optional[LinemodeBuffer] = getattr(writer, "_linemode_buf", None) - if buf is None: - buf = LinemodeBuffer( - slctab=writer.slctab, - forwardmask=writer.forwardmask, - trapsig=writer.linemode.trapsig, - ) - writer._linemode_buf = buf - return buf - - async def _raw_event_loop( - telnet_reader: Union[TelnetReader, TelnetReaderUnicode], - telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], - tty_shell: "Terminal", - stdin: asyncio.StreamReader, - stdout: asyncio.StreamWriter, - keyboard_escape: str, - state: _RawLoopState, - handle_close: Callable[[str], None], - want_repl: Callable[[], bool], - ) -> None: - """Standard byte-at-a-time event loop (mutates *state* in-place).""" - stdin_task = accessories.make_reader_task(stdin) - telnet_task = accessories.make_reader_task(telnet_reader, size=2**24) - esc_timer_task: Optional[asyncio.Task[None]] = None - wait_for: set[asyncio.Task[Any]] = {stdin_task, telnet_task} - - while wait_for: - done, _ = await asyncio.wait(wait_for, return_when=asyncio.FIRST_COMPLETED) - if stdin_task in done: - task = stdin_task - done.discard(task) - else: - task = done.pop() - wait_for.discard(task) - - telnet_writer.log.log(TRACE, "task=%s, wait_for=%s", task, wait_for) - - # ESC_DELAY timer fired -- flush buffered partial sequence - if task is esc_timer_task: - esc_timer_task = None - inf = telnet_writer.ctx.input_filter - if inf is not None and inf.has_pending: - flushed = inf.flush() - if flushed: - telnet_writer._write(flushed) - continue - - # client input - if task == stdin_task: - if esc_timer_task is not None and esc_timer_task in wait_for: - esc_timer_task.cancel() - wait_for.discard(esc_timer_task) - esc_timer_task = None - inp = task.result() - if not inp: - telnet_writer.log.debug("EOF from client stdin") - continue - if keyboard_escape in inp.decode(): - try: - telnet_writer.close() - except Exception: - pass - if telnet_task in wait_for: - telnet_task.cancel() - wait_for.remove(telnet_task) - handle_close("Connection closed.") - break - linemode_edit = ( - telnet_writer.local_option.enabled(LINEMODE) and telnet_writer.linemode.edit - ) - if linemode_edit and state.switched_to_raw: - # Raw PTY or non-TTY: kernel not doing line editing, use LinemodeBuffer - lmbuf = _get_linemode_buffer(telnet_writer) - for ch in inp.decode(errors="replace"): - echo, data = lmbuf.feed(ch) - if echo: - stdout.write(echo.encode()) - if data: - telnet_writer._write(data) - new_timer, has_pending = None, False - elif linemode_edit: - # Cooked PTY: kernel already handled EC/EL/echo; forward line directly - new_timer, has_pending = _send_stdin(inp, telnet_writer, stdout, False) - else: - new_timer, has_pending = _send_stdin( - inp, telnet_writer, stdout, state.local_echo - ) - if has_pending and esc_timer_task not in wait_for: - esc_timer_task = new_timer - if esc_timer_task is not None: - wait_for.add(esc_timer_task) - stdin_task = accessories.make_reader_task(stdin) - wait_for.add(stdin_task) - - # server output - elif task == telnet_task: - out = task.result() - if not out and telnet_reader.at_eof(): - if stdin_task in wait_for: - stdin_task.cancel() - wait_for.remove(stdin_task) - handle_close("Connection closed by foreign host.") - continue - raw_mode = _get_raw_mode(telnet_writer) - in_raw = raw_mode is True or (raw_mode is None and state.switched_to_raw) - out = _transform_output(out, telnet_writer, in_raw) - ar_engine = _ensure_autoreply_engine(telnet_writer) - if ar_engine is not None: - ar_engine.feed(out) - if raw_mode is None or (raw_mode is True and state.switched_to_raw): - mode_result = tty_shell.check_auto_mode( - state.switched_to_raw, state.last_will_echo - ) - if mode_result is not None: - if not state.switched_to_raw: - state.linesep = "\r\n" - state.switched_to_raw, state.last_will_echo, state.local_echo = mode_result - # When transitioning cooked -> raw, the data was - # processed for ONLCR (\r\n -> \n) but the terminal - # now has ONLCR disabled. Re-normalize so bare \n - # becomes \r\n for correct display. - if state.switched_to_raw and not in_raw: - out = out.replace("\n", "\r\n") - if raw_mode is None and want_repl(): - state.reactivate_repl = True - stdout.write(out.encode()) - _ts_file = telnet_writer.ctx.typescript_file - if _ts_file is not None: - _ts_file.write(out) - _ts_file.flush() - if state.reactivate_repl: - telnet_writer.log.debug("mode returned to local, reactivating REPL") - if stdin_task in wait_for: - stdin_task.cancel() - wait_for.discard(stdin_task) - state.switched_to_raw = False - break - telnet_task = accessories.make_reader_task(telnet_reader, size=2**24) - wait_for.add(telnet_task) - async def telnet_client_shell( telnet_reader: Union[TelnetReader, TelnetReaderUnicode], telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], @@ -821,137 +1047,5 @@ async def telnet_client_shell( stdin or stdout may also be a pipe or file, behaving much like nc(1). """ - keyboard_escape = "\x1d" - with Terminal(telnet_writer=telnet_writer) as tty_shell: - linesep = "\n" - switched_to_raw = False - last_will_echo = False - local_echo = tty_shell.software_echo - if tty_shell._istty: - raw_mode = _get_raw_mode(telnet_writer) - if telnet_writer.will_echo or raw_mode is True: - linesep = "\r\n" - stdout = await tty_shell.make_stdout() - tty_shell.setup_winch() - - # Prompt-pacing via IAC GA / IAC EOR. - # - # MUD servers emit IAC GA (Go-Ahead, RFC 854) or IAC EOR (End-of-Record, RFC 885) after - # each prompt to signal "output is complete, awaiting your input." The autoreply engine - # uses this to pace its replies. It calls ctx.autoreply_wait_fn() before sending each - # reply, preventing races where a reply arrives before the server has finished rendering - # the prompt. - # - # 'server_uses_ga' becomes True on the first GA/EOR received. _wait_for_prompt is does - # nothing until 'server_uses_ga', so servers that never send GA/EOR (Most everything but - # MUDs these days) are silently unaffected. - # - # prompt_event starts SET so the first autoreply fires immediately — there is no prior - # GA to wait for. _on_ga_or_eor re-sets it on each prompt signal; _wait_for_prompt - # clears it after consuming the signal so the next autoreply waits for the following - # prompt. - prompt_event = asyncio.Event() - prompt_event.set() - server_uses_ga = False - - # The session context is the decoupling point between this shell and the - # autoreply engine (which may live in a separate module). Storing - # _wait_for_prompt on it lets the engine call back into our local event state - # without a direct import or reference to this closure. - ctx: TelnetSessionContext = telnet_writer.ctx - - def _on_ga_or_eor(_cmd: bytes) -> None: - nonlocal server_uses_ga - server_uses_ga = True - prompt_event.set() - ar = ctx.autoreply_engine - if ar is not None: - ar.on_prompt() - - from .telopt import GA, CMD_EOR - - telnet_writer.set_iac_callback(GA, _on_ga_or_eor) - telnet_writer.set_iac_callback(CMD_EOR, _on_ga_or_eor) - - async def _wait_for_prompt() -> None: - """ - Wait for the next prompt signal before the autoreply engine sends a reply. - - No-op until the first GA/EOR confirms this server uses prompt signalling. - After that, blocks until :func:`_on_ga_or_eor` fires the event, then clears - it to arm the wait for the following prompt. A 2-second safety timeout - prevents stalling if the server stops sending GA mid-session. - """ - if not server_uses_ga: - return - try: - await asyncio.wait_for(prompt_event.wait(), timeout=2.0) - except asyncio.TimeoutError: - pass - prompt_event.clear() - - ctx.autoreply_wait_fn = _wait_for_prompt - - escape_name = accessories.name_unicode(keyboard_escape) - banner_sep = "\r\n" if tty_shell._istty else linesep - stdout.write(f"Escape character is '{escape_name}'.{banner_sep}".encode()) - - def _handle_close(msg: str) -> None: - # \033[m resets all SGR attributes so server-set colours do not - # bleed into the terminal after disconnect. - stdout.write(f"\033[m{linesep}{msg}{linesep}".encode()) - tty_shell.cleanup_winch() - - def _should_reactivate_repl() -> bool: - # Extension point for callers that embed a REPL (e.g. a MUD client). - # Return True to break _raw_event_loop and return to the REPL when - # the server puts the terminal back into local mode. The base shell - # has no REPL, so this always returns False. - return False - - # Wait up to 50 ms for subsequent WILL ECHO / WILL SGA packets to arrive before - # committing to a terminal mode. - # - # check_negotiation() declares the handshake complete as soon as TTYPE and NEW_ENVIRON / - # CHARSET are settled, without waiting for ECHO / SGA. Those options typically travel - # in the same "initial negotiation burst" but may not have not yet have "arrived" at - # this point in our TCP read until a few milliseconds later. Servers that never send - # WILL ECHO (rlogin, basically) simply time out and proceed correctly. - raw_mode = _get_raw_mode(telnet_writer) - if raw_mode is not False and tty_shell._istty: - try: - await asyncio.wait_for( - telnet_writer.wait_for_condition(lambda w: w.mode != "local"), timeout=0.05 - ) - except (asyncio.TimeoutError, asyncio.CancelledError): - pass - - # Commit the terminal to raw mode now that will_echo is stable. suppress_echo=True - # disables the kernel's local ECHO because the server will echo (or we handle it in - # software). local_echo is set to True only when the server will NOT echo, so we - # reproduce keystrokes ourselves. - if not switched_to_raw and tty_shell._istty and tty_shell._save_mode is not None: - tty_shell.set_mode(tty_shell._make_raw(tty_shell._save_mode, suppress_echo=True)) - switched_to_raw = True - local_echo = not telnet_writer.will_echo - linesep = "\r\n" - stdin = await tty_shell.connect_stdin() - state = _RawLoopState( - switched_to_raw=switched_to_raw, - last_will_echo=last_will_echo, - local_echo=local_echo, - linesep=linesep, - ) - await _raw_event_loop( - telnet_reader, - telnet_writer, - tty_shell, - stdin, - stdout, - keyboard_escape, - state, - _handle_close, - _should_reactivate_repl, - ) - tty_shell.disconnect_stdin(stdin) + await _telnet_client_shell_impl(telnet_reader, telnet_writer, tty_shell) diff --git a/telnetlib3/client_shell_win32.py b/telnetlib3/client_shell_win32.py new file mode 100644 index 0000000..970269b --- /dev/null +++ b/telnetlib3/client_shell_win32.py @@ -0,0 +1,248 @@ +"""Windows telnet client shell implementation using blessed/jinxed.""" + +# std imports +import os +import sys +import asyncio +import threading +import contextlib +import collections +from typing import Any, Tuple, Union, Callable, Optional + +# local +from .client_shell import TelnetTerminalShell, _get_raw_mode, _telnet_client_shell_impl +from .stream_reader import TelnetReader, TelnetReaderUnicode +from .stream_writer import TelnetWriter, TelnetWriterUnicode + +_WinMode = collections.namedtuple("_WinMode", ["raw", "echo"]) + + +class Terminal(TelnetTerminalShell[_WinMode]): + """ + Context manager for terminal mode handling on Windows via blessed/jinxed. + + Blessed is a guaranteed dependency on Windows (pyproject.toml environment marker). + Mirrors the interface of the POSIX ``Terminal`` class in :mod:`telnetlib3.client_shell`. + """ + + ModeDef = _WinMode + + def __init__(self, telnet_writer: Union[TelnetWriter, TelnetWriterUnicode]) -> None: + """Class Initializer.""" + # imported locally, so that this module may be safely imported by non-windows systems + # without blessed, mainly just so that documentation (sphinx builds) work, doesn't matter + # otherwise. + import blessed + + self.telnet_writer = telnet_writer + self._bt = blessed.Terminal() + self._istty = self._bt.is_a_tty + self._save_mode: Optional[_WinMode] = None + self.software_echo = False + self._raw_ctx: Optional[contextlib.ExitStack] = None + self._resize_pending = threading.Event() + self.on_resize: Optional[Callable[[int, int], None]] = None + self._stop_resize = threading.Event() + self._stop_stdin = threading.Event() + self._resize_thread: Optional[threading.Thread] = None + self._stdin_thread: Optional[threading.Thread] = None + + def __enter__(self) -> "Terminal": + self._save_mode = self.get_mode() + if self._istty and self._save_mode is not None: + self.set_mode(self.determine_mode(self._save_mode)) + return self + + def __exit__(self, *_: Any) -> None: + self.cleanup_winch() + if self._istty and self._save_mode is not None: + self.set_mode(self._save_mode) + + def get_mode(self) -> Optional[_WinMode]: + """Return current terminal mode if attached to a tty, otherwise None.""" + if not self._istty: + return None + return self.ModeDef(raw=False, echo=True) + + def set_mode(self, mode: Optional[_WinMode]) -> None: + """Switch terminal to raw or cooked mode using blessed context managers.""" + if mode is None: + return + ctx = self._raw_ctx + if mode.raw and ctx is None: + self._raw_ctx = contextlib.ExitStack() + self._raw_ctx.enter_context(self._bt.raw()) + elif not mode.raw and ctx is not None: + ctx.close() + self._raw_ctx = None + + def _make_raw(self, mode: _WinMode, suppress_echo: bool = True) -> _WinMode: + """Return a raw ModeDef (mirrors POSIX Terminal._make_raw interface).""" + return self.ModeDef(raw=True, echo=not suppress_echo) + + @staticmethod + def _suppress_echo(mode: _WinMode) -> _WinMode: + """Return copy of *mode* with echo disabled.""" + return Terminal.ModeDef(raw=mode.raw, echo=False) + + def _server_will_sga(self) -> bool: + """Whether SGA has been negotiated (either direction).""" + from .telopt import SGA + + w = self.telnet_writer + return bool(w.client and (w.remote_option.enabled(SGA) or w.local_option.enabled(SGA))) + + def determine_mode(self, mode: _WinMode) -> _WinMode: + """ + Return the appropriate mode for the current telnet negotiation state. + + Windows equivalent of the POSIX ``Terminal.determine_mode``, using + ``ModeDef`` (raw/echo flags instead of termios bitfields). + """ + raw_mode = _get_raw_mode(self.telnet_writer) + will_echo = self.telnet_writer.will_echo + will_sga = self._server_will_sga() + if raw_mode is None: + if will_echo and will_sga: + return self._make_raw(mode) + if will_echo: + return self._suppress_echo(mode) + if will_sga: + self.software_echo = True + return self._make_raw(mode, suppress_echo=False) + return mode + if not raw_mode: + return mode + return self._make_raw(mode) + + def check_auto_mode( + self, switched_to_raw: bool, last_will_echo: bool + ) -> Optional[Tuple[bool, bool, bool]]: + """ + Check if auto-mode switching is needed. + + Windows equivalent of the POSIX ``Terminal.check_auto_mode``. + + :param switched_to_raw: Whether terminal has already switched to raw mode. + :param last_will_echo: Previous value of server's WILL ECHO state. + :returns: ``(switched_to_raw, last_will_echo, local_echo)`` tuple + if mode changed, or ``None`` if no change needed. + """ + if not self._istty: + return None + wecho = self.telnet_writer.will_echo + wsga = self._server_will_sga() + should_go_raw = not switched_to_raw and wsga + should_suppress_echo = not switched_to_raw and wecho and not wsga + echo_changed = switched_to_raw and wecho != last_will_echo + if not (should_go_raw or should_suppress_echo or echo_changed): + return None + assert self._save_mode is not None + if should_suppress_echo: + self.set_mode(self._suppress_echo(self._save_mode)) + return (False, wecho, False) + self.set_mode(self._make_raw(self._save_mode, suppress_echo=True)) + return (True if should_go_raw else switched_to_raw, wecho, not wecho) + + def setup_winch(self) -> None: + """Poll for terminal size changes in a background thread.""" + if not self._istty: + return + self._stop_resize.clear() + try: + last_size = os.get_terminal_size() + except OSError: + return + + from .telopt import NAWS + + writer = self.telnet_writer + loop = asyncio.get_running_loop() + + def _poll() -> None: + nonlocal last_size + while not self._stop_resize.wait(0.5): + try: + new_size = os.get_terminal_size() + if new_size != last_size: + last_size = new_size + self._resize_pending.set() + if writer.local_option.enabled(NAWS): + loop.call_soon_threadsafe(writer._send_naws) + except OSError: + pass + + self._resize_thread = threading.Thread( + target=_poll, daemon=True, name="telnetlib3-resize-poll" + ) + self._resize_thread.start() + + def cleanup_winch(self) -> None: + """Stop the resize polling thread.""" + self._stop_resize.set() + thread = self._resize_thread + self._resize_thread = None + if thread is not None and thread is not threading.current_thread(): + thread.join(timeout=1.0) + + async def make_stdout(self) -> Any: + """Return a StreamWriter-compatible wrapper for sys.stdout.""" + + class _WindowsWriter: + def write(self, data: bytes) -> None: + """Write bytes to stdout and flush immediately.""" + sys.stdout.buffer.write(data) + sys.stdout.buffer.flush() + + async def drain(self) -> None: + """No-op drain; stdout writes are synchronous.""" + pass # pylint: disable=unnecessary-pass + + return _WindowsWriter() + + async def connect_stdin(self) -> asyncio.StreamReader: + """ + Return an asyncio StreamReader fed by a blessed inkey() thread. + + Uses blessed/jinxed to read one keypress at a time in raw mode. Each keystroke is encoded as + UTF-8 and fed to the reader. + """ + reader = asyncio.StreamReader() + loop = asyncio.get_running_loop() + self._stop_stdin.clear() + bt = self._bt + + def _reader_thread() -> None: + while not self._stop_stdin.is_set(): + key = bt.inkey(timeout=0.1) + if key: + data = str(key).encode("utf-8", errors="replace") + loop.call_soon_threadsafe(reader.feed_data, data) + loop.call_soon_threadsafe(reader.feed_eof) + + self._stdin_thread = threading.Thread( + target=_reader_thread, daemon=True, name="telnetlib3-stdin-reader" + ) + self._stdin_thread.start() + return reader + + def disconnect_stdin(self, reader: asyncio.StreamReader) -> None: + """Stop the stdin reader thread and signal EOF.""" + self._stop_stdin.set() + reader.feed_eof() + if self._stdin_thread is not None and self._stdin_thread is not threading.current_thread(): + self._stdin_thread.join(timeout=1.0) + + +async def telnet_client_shell( + telnet_reader: Union[TelnetReader, TelnetReaderUnicode], + telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], +) -> None: + """ + Windows telnet client shell using blessed/jinxed Terminal. + + Requires ``blessed>=1.20`` (installed automatically on Windows via the + ``blessed; platform_system == 'Windows'`` dependency in pyproject.toml). + """ + with Terminal(telnet_writer=telnet_writer) as tty_shell: + await _telnet_client_shell_impl(telnet_reader, telnet_writer, tty_shell) diff --git a/telnetlib3/encodings/big5bbs.py b/telnetlib3/encodings/big5bbs.py index d825d9c..a2e172e 100644 --- a/telnetlib3/encodings/big5bbs.py +++ b/telnetlib3/encodings/big5bbs.py @@ -11,7 +11,12 @@ Decoding algorithm: - When a Big5 lead byte (0xA1-0xFE) is followed by a valid Big5 second byte - (0x40-0x7E or 0xA1-0xFE), the pair is decoded as Big5. + (0x40-0x7E or 0xA1-0xFE) AND the pair maps to a defined Big5 character, + the pair is decoded as Big5. +- When a lead byte is followed by a structurally valid second byte but the + pair is undefined in Big5 (e.g. 0xF9 0xF9), the lone lead byte is decoded + via CP437 and the second byte is re-processed. This handles BBS art that + uses repeated high bytes as decorative fills (e.g. ∙∙∙∙ from 0xF9 runs). - When a lead byte is followed by any other byte (e.g. ESC), the lone lead byte is decoded via CP437 and the following byte is re-processed. - Bytes below 0xA1 are decoded via latin-1 (identical to ASCII for 0x00-0x7F). @@ -91,8 +96,14 @@ def decode(self, input: bytes, final: bool = False) -> str: # type: ignore[over if i + 1 < len(data): b2 = data[i + 1] if (0x40 <= b2 <= 0x7E) or (0xA1 <= b2 <= 0xFE): - result.append(bytes([b, b2]).decode("big5", errors=self.errors)) - i += 2 + try: + result.append(bytes([b, b2]).decode("big5", errors="strict")) + i += 2 + except UnicodeDecodeError: + # Structurally valid but undefined in Big5 — treat + # the lone lead byte as a CP437 half-width character. + result.append(bytes([b]).decode("cp437")) + i += 1 else: result.append(bytes([b]).decode("cp437")) i += 1 diff --git a/telnetlib3/server_fingerprinting.py b/telnetlib3/server_fingerprinting.py index 6589b05..ce7f5f4 100644 --- a/telnetlib3/server_fingerprinting.py +++ b/telnetlib3/server_fingerprinting.py @@ -87,6 +87,9 @@ # Match "color?" prompts -- many MUDs ask if the user wants color. _COLOR_RE = re.compile(rb"(?i)color\s*\?") +# Match "ANSI color standard" prompts, e.g. "Do you support the ANSI color standard (Yn)?" +_ANSI_COLOR_STANDARD_RE = re.compile(rb"(?i)ANSI\s+color\s+standard") + # Match numbered menu items offering UTF-8, e.g. "5) UTF-8", "[3] UTF-8", # "2. UTF-8", or "1 ... UTF-8". Many BBS/MUD systems present a charset # selection menu at connect time. The optional \S+/ prefix handles @@ -113,9 +116,10 @@ # Match single "Press [ESC]" prompts without "twice" (e.g. Herbie's BBS). _ESC_ONCE_RE = re.compile(rb"(?i)press\s+[\[<(]?\.?esc\.?[\]>)]?(?!\s+twice)") -# Match "HIT RETURN", "PRESS RETURN", "PRESS ENTER", "HIT ENTER", etc. -# Common on Worldgroup/MajorBBS and other vintage BBS systems. -_RETURN_PROMPT_RE = re.compile(rb"(?i)(?:hit|press)\s+(?:return|enter)\s*[:\.]?") +# Match "HIT RETURN", "PRESS RETURN", "PRESS ENTER", "HIT ENTER", etc., including +# variants with angle brackets or other punctuation around the key name, e.g. +# "press ". Common on Worldgroup/MajorBBS and other vintage BBS systems. +_RETURN_PROMPT_RE = re.compile(rb"(?i)(?:hit|press)\s+[\[<(]?(?:return|enter)[\]>)]?\s*[:\.]?") # Match "Press the BACKSPACE key" prompts -- standard telnet terminal # detection (e.g. TelnetBible.com). Respond with ASCII BS (0x08). @@ -363,6 +367,8 @@ def _detect_yn_prompt(banner: bytes) -> _PromptResult: return _PromptResult(b"y\r\n") if _COLOR_RE.search(stripped): return _PromptResult(b"y\r\n") + if _ANSI_COLOR_STANDARD_RE.search(stripped): + return _PromptResult(b"y\r\n") menu_match = _MENU_UTF8_RE.search(stripped) if menu_match: return _PromptResult(menu_match.group(1) + b"\r\n", encoding="utf-8") diff --git a/telnetlib3/tests/test_client_shell_win32.py b/telnetlib3/tests/test_client_shell_win32.py new file mode 100644 index 0000000..dbcc719 --- /dev/null +++ b/telnetlib3/tests/test_client_shell_win32.py @@ -0,0 +1,366 @@ +""" +Tests for telnetlib3.client_shell_win32 (Windows terminal mode handling). + +Runs cross-platform by injecting a mock ``blessed`` module before import. +""" + +# std imports +import sys +import types +import asyncio +import threading +import contextlib +from unittest import mock + +# 3rd party +import pytest + +# Inject a minimal blessed stub so the module can be imported on Linux. +if "blessed" not in sys.modules: + _mock_blessed = types.ModuleType("blessed") + + class _MockBlessedTerminal: + is_a_tty = False + + @contextlib.contextmanager + def raw(self): + yield + + def inkey(self, timeout=None): + return "" + + _mock_blessed.Terminal = _MockBlessedTerminal + sys.modules["blessed"] = _mock_blessed + +# local +from telnetlib3._session_context import TelnetSessionContext # noqa: E402 +from telnetlib3.client_shell_win32 import Terminal # noqa: E402 + + +class _MockOption: + def __init__(self, opts: "dict[bytes, bool]") -> None: + self._opts = opts + + def enabled(self, key: bytes) -> bool: + return self._opts.get(key, False) + + +def _make_writer( + will_echo: bool = False, + raw_mode: "bool | None" = None, + will_sga: bool = False, + local_sga: bool = False, +) -> object: + from telnetlib3.telopt import SGA + + ctx = TelnetSessionContext() + ctx.raw_mode = raw_mode + return types.SimpleNamespace( + will_echo=will_echo, + client=True, + remote_option=_MockOption({SGA: will_sga}), + local_option=_MockOption({SGA: local_sga}), + log=types.SimpleNamespace(debug=lambda *a, **kw: None), + ctx=ctx, + ) + + +def _make_term(writer: object, istty: bool = False) -> Terminal: + term = Terminal.__new__(Terminal) + term.telnet_writer = writer + term._bt = sys.modules["blessed"].Terminal() + term._istty = istty + term._save_mode = None + term.software_echo = False + term._raw_ctx = None + term._resize_pending = threading.Event() + term.on_resize = None + term._stop_resize = threading.Event() + term._stop_stdin = threading.Event() + term._resize_thread = None + term._stdin_thread = None + return term + + +def _cooked() -> Terminal.ModeDef: + return Terminal.ModeDef(raw=False, echo=True) + + +def test_modedef_fields() -> None: + m = Terminal.ModeDef(raw=True, echo=False) + assert m.raw is True + assert m.echo is False + + +@pytest.mark.parametrize("suppress_echo", [True, False]) +def test_make_raw(suppress_echo: bool) -> None: + term = _make_term(_make_writer()) + result = term._make_raw(_cooked(), suppress_echo=suppress_echo) + assert result.raw is True + assert result.echo is not suppress_echo + + +def test_suppress_echo_clears_echo() -> None: + result = Terminal._suppress_echo(Terminal.ModeDef(raw=False, echo=True)) + assert result.echo is False + assert result.raw is False + + +def test_suppress_echo_preserves_raw() -> None: + result = Terminal._suppress_echo(Terminal.ModeDef(raw=True, echo=True)) + assert result.raw is True + assert result.echo is False + + +def test_server_will_sga_remote() -> None: + assert _make_term(_make_writer(will_sga=True))._server_will_sga() is True + + +def test_server_will_sga_local() -> None: + assert _make_term(_make_writer(local_sga=True))._server_will_sga() is True + + +def test_server_will_sga_absent() -> None: + assert _make_term(_make_writer())._server_will_sga() is False + + +def test_get_mode_not_tty_returns_none() -> None: + term = _make_term(_make_writer(), istty=False) + assert term.get_mode() is None + + +def test_get_mode_tty_returns_cooked_modedef() -> None: + term = _make_term(_make_writer(), istty=True) + mode = term.get_mode() + assert mode is not None + assert mode.raw is False + assert mode.echo is True + + +def test_set_mode_none_is_noop() -> None: + term = _make_term(_make_writer()) + term.set_mode(None) # should not raise + assert term._raw_ctx is None + + +def test_set_mode_raw_enters_context() -> None: + entered = [] + + class _TrackingCtx: + def __enter__(self): + entered.append(True) + return self + + def __exit__(self, *_): + pass + + term = _make_term(_make_writer()) + term._bt = mock.Mock() + term._bt.raw.return_value = _TrackingCtx() + term.set_mode(Terminal.ModeDef(raw=True, echo=False)) + assert len(entered) == 1 + assert term._raw_ctx is not None + + +def test_set_mode_cooked_exits_context() -> None: + closed = [] + + class _TrackingCtx: + def close(self): + closed.append(True) + + term = _make_term(_make_writer()) + term._raw_ctx = _TrackingCtx() + term.set_mode(Terminal.ModeDef(raw=False, echo=True)) + assert len(closed) == 1 + assert term._raw_ctx is None + + +def test_set_mode_raw_when_already_raw_is_noop() -> None: + term = _make_term(_make_writer()) + existing_ctx = mock.MagicMock() + term._raw_ctx = existing_ctx + term.set_mode(Terminal.ModeDef(raw=True, echo=False)) + existing_ctx.__enter__.assert_not_called() + + +@pytest.mark.parametrize( + "will_echo,will_sga,raw_mode", + [(False, False, None), (False, False, False), (True, False, False)], +) +def test_determine_mode_unchanged(will_echo: bool, will_sga: bool, raw_mode: "bool | None") -> None: + term = _make_term(_make_writer(will_echo=will_echo, will_sga=will_sga, raw_mode=raw_mode)) + mode = _cooked() + assert term.determine_mode(mode) is mode + + +@pytest.mark.parametrize( + "will_echo,will_sga,raw_mode", + [(True, True, None), (False, True, None), (False, False, True), (True, False, True)], +) +def test_determine_mode_goes_raw(will_echo: bool, will_sga: bool, raw_mode: "bool | None") -> None: + term = _make_term(_make_writer(will_echo=will_echo, will_sga=will_sga, raw_mode=raw_mode)) + result = term.determine_mode(_cooked()) + assert result.raw is True + + +def test_determine_mode_will_echo_only_suppresses_echo() -> None: + term = _make_term(_make_writer(will_echo=True, will_sga=False, raw_mode=None)) + result = term.determine_mode(_cooked()) + assert result.raw is False + assert result.echo is False + + +def test_determine_mode_will_sga_only_sets_software_echo() -> None: + term = _make_term(_make_writer(will_echo=False, will_sga=True, raw_mode=None)) + term.determine_mode(_cooked()) + assert term.software_echo is True + + +def test_determine_mode_explicit_raw_suppresses_echo() -> None: + term = _make_term(_make_writer(raw_mode=True)) + result = term.determine_mode(_cooked()) + assert result.raw is True + assert result.echo is False + + +def test_check_auto_mode_not_istty_returns_none() -> None: + term = _make_term(_make_writer(will_echo=True, will_sga=True), istty=False) + assert term.check_auto_mode(switched_to_raw=False, last_will_echo=False) is None + + +def test_check_auto_mode_no_change_returns_none() -> None: + term = _make_term(_make_writer(will_echo=False, will_sga=False), istty=True) + term._save_mode = _cooked() + assert term.check_auto_mode(switched_to_raw=False, last_will_echo=False) is None + + +def test_check_auto_mode_suppress_echo_only() -> None: + term = _make_term(_make_writer(will_echo=True, will_sga=False), istty=True) + term._save_mode = _cooked() + set_modes: list[Terminal.ModeDef] = [] + term.set_mode = set_modes.append # type: ignore[method-assign] + result = term.check_auto_mode(switched_to_raw=False, last_will_echo=False) + assert result == (False, True, False) + assert len(set_modes) == 1 + assert set_modes[0].echo is False + assert set_modes[0].raw is False + + +def test_check_auto_mode_sga_goes_raw() -> None: + term = _make_term(_make_writer(will_echo=False, will_sga=True), istty=True) + term._save_mode = _cooked() + set_modes: list[Terminal.ModeDef] = [] + term.set_mode = set_modes.append # type: ignore[method-assign] + result = term.check_auto_mode(switched_to_raw=False, last_will_echo=False) + assert result is not None + switched_to_raw, _, _ = result + assert switched_to_raw is True + assert set_modes[0].raw is True + + +def test_check_auto_mode_echo_changed_while_raw() -> None: + term = _make_term(_make_writer(will_echo=True, will_sga=False), istty=True) + term._save_mode = _cooked() + set_modes: list[Terminal.ModeDef] = [] + term.set_mode = set_modes.append # type: ignore[method-assign] + result = term.check_auto_mode(switched_to_raw=True, last_will_echo=False) + assert result is not None + _, last_will_echo, local_echo = result + assert last_will_echo is True + assert local_echo is False + + +def test_check_auto_mode_already_raw_no_echo_change_returns_none() -> None: + term = _make_term(_make_writer(will_echo=True, will_sga=False), istty=True) + term._save_mode = _cooked() + assert term.check_auto_mode(switched_to_raw=True, last_will_echo=True) is None + + +def test_setup_winch_not_istty_skips() -> None: + term = _make_term(_make_writer(), istty=False) + term.setup_winch() + assert term._resize_thread is None + + +@pytest.mark.asyncio +async def test_setup_winch_starts_thread(monkeypatch: pytest.MonkeyPatch) -> None: + import os + + monkeypatch.setattr(os, "get_terminal_size", lambda: os.terminal_size((80, 24))) + term = _make_term(_make_writer(), istty=True) + term.setup_winch() + assert term._resize_thread is not None + assert term._resize_thread.is_alive() + term.cleanup_winch() + + +@pytest.mark.asyncio +async def test_cleanup_winch_stops_thread(monkeypatch: pytest.MonkeyPatch) -> None: + import os + + monkeypatch.setattr(os, "get_terminal_size", lambda: os.terminal_size((80, 24))) + term = _make_term(_make_writer(), istty=True) + term.setup_winch() + assert term._resize_thread is not None + term.cleanup_winch() + assert term._stop_resize.is_set() + assert term._resize_thread is None + + +def test_setup_winch_os_error_skips(monkeypatch: pytest.MonkeyPatch) -> None: + import os + + monkeypatch.setattr(os, "get_terminal_size", mock.Mock(side_effect=OSError)) + term = _make_term(_make_writer(), istty=True) + term.setup_winch() + assert term._resize_thread is None + + +@pytest.mark.asyncio +async def test_resize_poll_detects_change(monkeypatch: pytest.MonkeyPatch) -> None: + import os + + sizes = [os.terminal_size((80, 24)), os.terminal_size((100, 30))] + call_count = 0 + + def _fake_gts(): + nonlocal call_count + s = sizes[min(call_count, len(sizes) - 1)] + call_count += 1 + return s + + monkeypatch.setattr(os, "get_terminal_size", _fake_gts) + term = _make_term(_make_writer(), istty=True) + term.setup_winch() + await asyncio.sleep(0.7) + term.cleanup_winch() + assert term._resize_pending.is_set() + + +def test_disconnect_stdin_sets_stop_flag() -> None: + term = _make_term(_make_writer()) + reader = mock.Mock(spec=asyncio.StreamReader) + term.disconnect_stdin(reader) + assert term._stop_stdin.is_set() + + +def test_disconnect_stdin_feeds_eof() -> None: + term = _make_term(_make_writer()) + reader = mock.Mock(spec=asyncio.StreamReader) + term.disconnect_stdin(reader) + reader.feed_eof.assert_called_once() + + +@pytest.mark.asyncio +async def test_make_stdout_write_and_drain() -> None: + term = _make_term(_make_writer()) + buf = bytearray() + with mock.patch("sys.stdout") as mock_stdout: + mock_stdout.buffer = mock.Mock() + mock_stdout.buffer.write = buf.extend + mock_stdout.buffer.flush = mock.Mock() + writer = await term.make_stdout() + writer.write(b"hello") + await writer.drain() + assert bytes(buf) == b"hello" diff --git a/telnetlib3/tests/test_linemode.py b/telnetlib3/tests/test_linemode.py index 5c582cb..b39d033 100644 --- a/telnetlib3/tests/test_linemode.py +++ b/telnetlib3/tests/test_linemode.py @@ -42,7 +42,6 @@ class ServerTestLinemode(telnetlib3.BaseServer): def begin_negotiation(self): super().begin_negotiation() self.writer.iac(DO, LINEMODE) - asyncio.get_event_loop().call_later(0.1, self.connection_lost, None) async with create_server( protocol_factory=ServerTestLinemode, host=bind_host, port=unused_tcp_port @@ -64,15 +63,16 @@ def begin_negotiation(self): assert result == expect_stage2 client_writer.write(reply_stage2) - srv_instance = await asyncio.wait_for(server.wait_for_client(), 0.1) + srv_instance = await asyncio.wait_for(server.wait_for_client(), 1.0) await asyncio.wait_for( srv_instance.writer.wait_for( remote={"LINEMODE": True}, pending={"LINEMODE": False} ), - 0.1, + 1.0, ) - # server sends SLC table after MODE ACK; drain remaining bytes to reach EOF + # server sends SLC table after MODE ACK; close server side to get EOF + asyncio.get_running_loop().call_soon(srv_instance.connection_lost, None) result = await client_reader.read() assert result.startswith(IAC + SB + LINEMODE + LMODE_SLC) @@ -90,7 +90,6 @@ class ServerTestLinemode(telnetlib3.BaseServer): def begin_negotiation(self): super().begin_negotiation() self.writer.iac(DO, LINEMODE) - asyncio.get_event_loop().call_later(0.1, self.connection_lost, None) async with create_server( protocol_factory=ServerTestLinemode, host=bind_host, port=unused_tcp_port @@ -113,15 +112,16 @@ def begin_negotiation(self): assert result == expect_stage2 client_writer.write(reply_stage2) - srv_instance = await asyncio.wait_for(server.wait_for_client(), 0.1) + srv_instance = await asyncio.wait_for(server.wait_for_client(), 1.0) await asyncio.wait_for( srv_instance.writer.wait_for( remote={"LINEMODE": True}, pending={"LINEMODE": False} ), - 0.1, + 1.0, ) - # server sends SLC table after MODE ACK; drain remaining bytes to reach EOF + # server sends SLC table after MODE ACK; close server side to get EOF + asyncio.get_running_loop().call_soon(srv_instance.connection_lost, None) result = await client_reader.read() assert result.startswith(IAC + SB + LINEMODE + LMODE_SLC) diff --git a/telnetlib3/tests/test_platform.py b/telnetlib3/tests/test_platform.py deleted file mode 100644 index e4a07e3..0000000 --- a/telnetlib3/tests/test_platform.py +++ /dev/null @@ -1,15 +0,0 @@ -# std imports -import sys - -# 3rd party -import pytest - - -@pytest.mark.skipif(sys.platform != "win32", reason="Windows-only code path") -@pytest.mark.asyncio -async def test_client_shell_win32_not_implemented(): - """telnet_client_shell raises NotImplementedError on Windows.""" - from telnetlib3.client_shell import telnet_client_shell - - with pytest.raises(NotImplementedError, match="win32"): - await telnet_client_shell(None, None)