From dcccf592e6a75fd2b4c6dd16b6821767116cac5a Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Thu, 5 Mar 2026 01:53:21 -0500 Subject: [PATCH 1/3] mypy --strict --- telnetlib3/fingerprinting_display.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telnetlib3/fingerprinting_display.py b/telnetlib3/fingerprinting_display.py index 314e523..fb608e3 100644 --- a/telnetlib3/fingerprinting_display.py +++ b/telnetlib3/fingerprinting_display.py @@ -770,7 +770,7 @@ def _build_seen_counts( return "" -def _color_match(term: "blessed.Terminal", name: str, score: float) -> str: +def _color_match(term: Optional["blessed.Terminal"], name: str, score: float) -> str: """ Color a nearest-match result by confidence threshold. @@ -790,7 +790,7 @@ def _color_match(term: "blessed.Terminal", name: str, score: float) -> str: def _nearest_match_lines( data: Dict[str, Any], names: Dict[str, str], - term: "blessed.Terminal", + term: Optional["blessed.Terminal"], telnet_unknown: bool = False, terminal_unknown: bool = False, ) -> List[str]: From 5168e8f06032793d2d7cc16b61f0815ec9852b60 Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Thu, 5 Mar 2026 01:54:23 -0500 Subject: [PATCH 2/3] and add --strict --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 72bb27f..c8299ef 100644 --- a/tox.ini +++ b/tox.ini @@ -105,7 +105,7 @@ commands = [testenv:mypy] deps = - mypy + mypy --strict commands = mypy telnetlib3 From 4bfd0ff072be88505f83b1ee35a14493d8b97d2e Mon Sep 17 00:00:00 2001 From: Jeff Quast Date: Thu, 5 Mar 2026 05:04:36 -0500 Subject: [PATCH 3/3] bugfix connection burst sometimes loses ECHO mode (#135) * bugfix connection burst sometimes loses ECHO mode * asyncio.get_event_loop() -> asyncio.get_running_loop() --- docs/history.rst | 4 + telnetlib3/client.py | 4 +- telnetlib3/client_shell.py | 141 ++++++++++++++---- telnetlib3/fingerprinting.py | 5 +- telnetlib3/server.py | 8 +- telnetlib3/server_fingerprinting.py | 2 +- telnetlib3/server_pty_shell.py | 4 +- telnetlib3/tests/accessories.py | 2 +- telnetlib3/tests/test_client_unit.py | 2 +- telnetlib3/tests/test_pty_shell.py | 7 +- telnetlib3/tests/test_server.py | 35 ++--- .../tests/test_server_fingerprinting.py | 4 +- 12 files changed, 152 insertions(+), 66 deletions(-) diff --git a/docs/history.rst b/docs/history.rst index 9868190..5f6972e 100644 --- a/docs/history.rst +++ b/docs/history.rst @@ -1,5 +1,9 @@ History ======= +4.0.1 + * bugfix: ``telnetlib3-client`` could begin a shell in wrong ECHO mode, depending on order of + options in a "connection burst". + 4.0.0 * removed: ``telnetlib3.color_filter``. ``ColorFilter``, ``ColorConfig``, ``PALETTES``, ``PetsciiColorFilter``, and ``AtasciiControlFilter`` have all been moved to the downstream diff --git a/telnetlib3/client.py b/telnetlib3/client.py index dabc2b5..29d1879 100755 --- a/telnetlib3/client.py +++ b/telnetlib3/client.py @@ -623,7 +623,7 @@ def connection_factory() -> client_base.BaseClient: try: _, protocol = await asyncio.wait_for( - asyncio.get_event_loop().create_connection( + asyncio.get_running_loop().create_connection( connection_factory, host or "localhost", port, **conn_kwargs ), timeout=connect_timeout, @@ -1267,7 +1267,7 @@ def patched_send_env(keys: Sequence[str]) -> Dict[str, Any]: else: fp_ssl = ssl_module.create_default_context() - waiter_closed: asyncio.Future[None] = asyncio.get_event_loop().create_future() + waiter_closed: asyncio.Future[None] = asyncio.get_running_loop().create_future() fp_conn_kwargs: Dict[str, Any] = { "host": args.host, diff --git a/telnetlib3/client_shell.py b/telnetlib3/client_shell.py index f5fdc6c..27c6206 100644 --- a/telnetlib3/client_shell.py +++ b/telnetlib3/client_shell.py @@ -173,7 +173,15 @@ def feed(self, data: bytes) -> bytes: @dataclass class _RawLoopState: - """Mutable state bundle for :func:`_raw_event_loop`.""" + """ + Mutable state bundle for :func:`_raw_event_loop`. + + Initialised by :func:`telnet_client_shell` before the loop starts and mutated + in-place as mid-session negotiation arrives (e.g. server WILL ECHO toggling + after login, LINEMODE EDIT confirmed by server). On loop exit, + ``switched_to_raw`` and ``reactivate_repl`` reflect final state so the caller + can decide whether to restart a REPL. + """ switched_to_raw: bool last_will_echo: bool @@ -529,7 +537,7 @@ async def make_stdout(self) -> asyncio.StreamWriter: write_fobj = sys.stdout if self._istty: write_fobj = sys.stdin - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() writer_transport, writer_protocol = await loop.connect_write_pipe( asyncio.streams.FlowControlMixin, write_fobj ) @@ -544,7 +552,7 @@ async def connect_stdin(self) -> asyncio.StreamReader: """ reader = asyncio.StreamReader() reader_protocol = asyncio.StreamReaderProtocol(reader) - transport, _ = await asyncio.get_event_loop().connect_read_pipe( + transport, _ = await asyncio.get_running_loop().connect_read_pipe( lambda: reader_protocol, sys.stdin ) self._stdin_transport = transport @@ -628,17 +636,37 @@ def _send_stdin( return new_timer, pending def _get_raw_mode(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "bool | None": - """Return the writer's ``ctx.raw_mode`` (``None``, ``True``, or ``False``).""" + """ + Return the raw-mode override from the writer's session context. + + ``None`` = auto-detect from server negotiation (default), + ``True`` = force raw / character-at-a-time, + ``False`` = force line mode. + """ return writer.ctx.raw_mode def _ensure_autoreply_engine( telnet_writer: Union[TelnetWriter, TelnetWriterUnicode], ) -> "Optional[Any]": - """Return the autoreply engine from the writer's context, if set.""" + """ + Return the autoreply engine from the writer's session context, or ``None``. + + The autoreply engine is optional application-level machinery (e.g. a macro + engine in a MUD client) that watches server output and sends pre-configured + replies. It is absent in standalone telnetlib3 and supplied by the host + application via ``writer.ctx.autoreply_engine``. + """ return telnet_writer.ctx.autoreply_engine def _get_linemode_buffer(writer: Union[TelnetWriter, TelnetWriterUnicode]) -> "LinemodeBuffer": - """Return (or lazily create) the LinemodeBuffer attached to *writer*.""" + """ + Return (or lazily create) the :class:`LinemodeBuffer` attached to *writer*. + + The buffer is stored as ``writer._linemode_buf`` so it persists across loop + iterations and accumulates characters between :meth:`LinemodeBuffer.feed` + calls. Created on first use because LINEMODE negotiation may complete after + the shell has already started. + """ buf: Optional[LinemodeBuffer] = getattr(writer, "_linemode_buf", None) if buf is None: buf = LinemodeBuffer( @@ -749,7 +777,7 @@ async def _raw_event_loop( ar_engine = _ensure_autoreply_engine(telnet_writer) if ar_engine is not None: ar_engine.feed(out) - if raw_mode is None: + if raw_mode is None or (raw_mode is True and state.switched_to_raw): mode_result = tty_shell.check_auto_mode( state.switched_to_raw, state.last_will_echo ) @@ -763,7 +791,7 @@ async def _raw_event_loop( # becomes \r\n for correct display. if state.switched_to_raw and not in_raw: out = out.replace("\n", "\r\n") - if want_repl(): + if raw_mode is None and want_repl(): state.reactivate_repl = True stdout.write(out.encode()) _ts_file = telnet_writer.ctx.typescript_file @@ -807,49 +835,102 @@ async def telnet_client_shell( stdout = await tty_shell.make_stdout() tty_shell.setup_winch() - # EOR/GA-based command pacing for raw-mode autoreplies. - prompt_ready_raw = asyncio.Event() - prompt_ready_raw.set() - ga_detected_raw = False - - _sh_ctx: TelnetSessionContext = telnet_writer.ctx - - def _on_prompt_signal_raw(_cmd: bytes) -> None: - nonlocal ga_detected_raw - ga_detected_raw = True - prompt_ready_raw.set() - ar = _sh_ctx.autoreply_engine + # Prompt-pacing via IAC GA / IAC EOR. + # + # MUD servers emit IAC GA (Go-Ahead, RFC 854) or IAC EOR (End-of-Record, RFC 885) after + # each prompt to signal "output is complete, awaiting your input." The autoreply engine + # uses this to pace its replies. It calls ctx.autoreply_wait_fn() before sending each + # reply, preventing races where a reply arrives before the server has finished rendering + # the prompt. + # + # 'server_uses_ga' becomes True on the first GA/EOR received. _wait_for_prompt is does + # nothing until 'server_uses_ga', so servers that never send GA/EOR (Most everything but + # MUDs these days) are silently unaffected. + # + # prompt_event starts SET so the first autoreply fires immediately — there is no prior + # GA to wait for. _on_ga_or_eor re-sets it on each prompt signal; _wait_for_prompt + # clears it after consuming the signal so the next autoreply waits for the following + # prompt. + prompt_event = asyncio.Event() + prompt_event.set() + server_uses_ga = False + + # The session context is the decoupling point between this shell and the + # autoreply engine (which may live in a separate module). Storing + # _wait_for_prompt on it lets the engine call back into our local event state + # without a direct import or reference to this closure. + ctx: TelnetSessionContext = telnet_writer.ctx + + def _on_ga_or_eor(_cmd: bytes) -> None: + nonlocal server_uses_ga + server_uses_ga = True + prompt_event.set() + ar = ctx.autoreply_engine if ar is not None: ar.on_prompt() from .telopt import GA, CMD_EOR - telnet_writer.set_iac_callback(GA, _on_prompt_signal_raw) - telnet_writer.set_iac_callback(CMD_EOR, _on_prompt_signal_raw) + telnet_writer.set_iac_callback(GA, _on_ga_or_eor) + telnet_writer.set_iac_callback(CMD_EOR, _on_ga_or_eor) + + async def _wait_for_prompt() -> None: + """ + Wait for the next prompt signal before the autoreply engine sends a reply. - async def _wait_for_prompt_raw() -> None: - if not ga_detected_raw: + No-op until the first GA/EOR confirms this server uses prompt signalling. + After that, blocks until :func:`_on_ga_or_eor` fires the event, then clears + it to arm the wait for the following prompt. A 2-second safety timeout + prevents stalling if the server stops sending GA mid-session. + """ + if not server_uses_ga: return try: - await asyncio.wait_for(prompt_ready_raw.wait(), timeout=2.0) + await asyncio.wait_for(prompt_event.wait(), timeout=2.0) except asyncio.TimeoutError: pass - prompt_ready_raw.clear() + prompt_event.clear() - _sh_ctx.autoreply_wait_fn = _wait_for_prompt_raw + ctx.autoreply_wait_fn = _wait_for_prompt escape_name = accessories.name_unicode(keyboard_escape) banner_sep = "\r\n" if tty_shell._istty else linesep stdout.write(f"Escape character is '{escape_name}'.{banner_sep}".encode()) def _handle_close(msg: str) -> None: + # \033[m resets all SGR attributes so server-set colours do not + # bleed into the terminal after disconnect. stdout.write(f"\033[m{linesep}{msg}{linesep}".encode()) tty_shell.cleanup_winch() - def _want_repl() -> bool: + def _should_reactivate_repl() -> bool: + # Extension point for callers that embed a REPL (e.g. a MUD client). + # Return True to break _raw_event_loop and return to the REPL when + # the server puts the terminal back into local mode. The base shell + # has no REPL, so this always returns False. return False - # Standard event loop (byte-at-a-time). + # Wait up to 50 ms for subsequent WILL ECHO / WILL SGA packets to arrive before + # committing to a terminal mode. + # + # check_negotiation() declares the handshake complete as soon as TTYPE and NEW_ENVIRON / + # CHARSET are settled, without waiting for ECHO / SGA. Those options typically travel + # in the same "initial negotiation burst" but may not have not yet have "arrived" at + # this point in our TCP read until a few milliseconds later. Servers that never send + # WILL ECHO (rlogin, basically) simply time out and proceed correctly. + raw_mode = _get_raw_mode(telnet_writer) + if raw_mode is not False and tty_shell._istty: + try: + await asyncio.wait_for( + telnet_writer.wait_for_condition(lambda w: w.mode != "local"), timeout=0.05 + ) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + + # Commit the terminal to raw mode now that will_echo is stable. suppress_echo=True + # disables the kernel's local ECHO because the server will echo (or we handle it in + # software). local_echo is set to True only when the server will NOT echo, so we + # reproduce keystrokes ourselves. if not switched_to_raw and tty_shell._istty and tty_shell._save_mode is not None: tty_shell.set_mode(tty_shell._make_raw(tty_shell._save_mode, suppress_echo=True)) switched_to_raw = True @@ -871,6 +952,6 @@ def _want_repl() -> bool: keyboard_escape, state, _handle_close, - _want_repl, + _should_reactivate_repl, ) tty_shell.disconnect_stdin(stdin) diff --git a/telnetlib3/fingerprinting.py b/telnetlib3/fingerprinting.py index adcc0b1..32aa910 100644 --- a/telnetlib3/fingerprinting.py +++ b/telnetlib3/fingerprinting.py @@ -358,8 +358,9 @@ async def probe_client_capabilities( await writer.drain() - deadline = asyncio.get_event_loop().time() + timeout - while asyncio.get_event_loop().time() < deadline: + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + while loop.time() < deadline: all_responded = all( writer.remote_option.get(opt) is not None for opt, name, desc in to_probe diff --git a/telnetlib3/server.py b/telnetlib3/server.py index f8b4da0..0c50e40 100755 --- a/telnetlib3/server.py +++ b/telnetlib3/server.py @@ -780,7 +780,7 @@ async def _upgrade_to_tls(self) -> None: (rewritten in 3.11). See https://github.com/python/cpython/issues/79156 """ - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() assert self._transport is not None protocol = self._real_factory() try: @@ -1138,13 +1138,13 @@ def _make_telnet_protocol() -> asyncio.Protocol: def factory() -> asyncio.Protocol: return _TLSAutoDetectProtocol(ssl, _make_telnet_protocol) - telnet_server._server = await asyncio.get_event_loop().create_server(factory, host, port) + telnet_server._server = await asyncio.get_running_loop().create_server(factory, host, port) else: def factory() -> asyncio.Protocol: return _make_telnet_protocol() - telnet_server._server = await asyncio.get_event_loop().create_server( + telnet_server._server = await asyncio.get_running_loop().create_server( factory, host, port, ssl=ssl ) @@ -1392,7 +1392,7 @@ async def guarded_shell( _cfg_mapping = ", ".join((f"{field}={{{field}}}" for field in CONFIG._fields)).format(**_locals) logger.debug("Server configuration: %s", _cfg_mapping) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() # bind server = await create_server( diff --git a/telnetlib3/server_fingerprinting.py b/telnetlib3/server_fingerprinting.py index 868c503..6589b05 100644 --- a/telnetlib3/server_fingerprinting.py +++ b/telnetlib3/server_fingerprinting.py @@ -990,7 +990,7 @@ async def _read_banner_until_quiet( stripped_accum = bytearray() esc_responded = False menu_responded = False - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() deadline = loop.time() + max_wait while loop.time() < deadline: remaining = min(quiet_time, deadline - loop.time()) diff --git a/telnetlib3/server_pty_shell.py b/telnetlib3/server_pty_shell.py index 8c13276..0fea9b5 100644 --- a/telnetlib3/server_pty_shell.py +++ b/telnetlib3/server_pty_shell.py @@ -305,7 +305,7 @@ async def run(self) -> None: """Bridge loop between telnet and PTY.""" import errno - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() pty_read_event = asyncio.Event() pty_data_queue: asyncio.Queue[bytes] = asyncio.Queue() @@ -583,7 +583,7 @@ async def _wait_for_terminal_info( :param writer: TelnetWriter instance. :param timeout: Maximum time to wait in seconds. """ - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() start = loop.time() while loop.time() - start < timeout: diff --git a/telnetlib3/tests/accessories.py b/telnetlib3/tests/accessories.py index f4f8cfe..2e4ae6a 100644 --- a/telnetlib3/tests/accessories.py +++ b/telnetlib3/tests/accessories.py @@ -145,7 +145,7 @@ async def asyncio_server(protocol_factory, host, port): class TrackingProtocol(_TrackingProtocol, protocol_factory): _transports = transports - server = await asyncio.get_event_loop().create_server(TrackingProtocol, host, port) + server = await asyncio.get_running_loop().create_server(TrackingProtocol, host, port) try: yield server finally: diff --git a/telnetlib3/tests/test_client_unit.py b/telnetlib3/tests/test_client_unit.py index e1c74dc..b832245 100644 --- a/telnetlib3/tests/test_client_unit.py +++ b/telnetlib3/tests/test_client_unit.py @@ -389,7 +389,7 @@ async def test_begin_shell_cancelled_future(): client = BaseClient.__new__(BaseClient) client.log = types.SimpleNamespace(debug=lambda *a, **kw: None, isEnabledFor=lambda _: False) client.shell = lambda r, w: None - fut = asyncio.get_event_loop().create_future() + fut = asyncio.get_running_loop().create_future() fut.cancel() client.begin_shell(fut) diff --git a/telnetlib3/tests/test_pty_shell.py b/telnetlib3/tests/test_pty_shell.py index fd8aa09..2558da7 100644 --- a/telnetlib3/tests/test_pty_shell.py +++ b/telnetlib3/tests/test_pty_shell.py @@ -112,9 +112,10 @@ def begin_shell(self, result): await writer.drain() result = "" - deadline = asyncio.get_event_loop().time() + 2.0 + loop = asyncio.get_running_loop() + deadline = loop.time() + 2.0 while "hello world" not in result: - remaining = deadline - asyncio.get_event_loop().time() + remaining = deadline - loop.time() if remaining <= 0: break chunk = await asyncio.wait_for(reader.read(50), remaining) @@ -892,7 +893,7 @@ async def noop_bridge(*a): with ( patch("os.waitpid", return_value=(0, 0)), - patch("asyncio.get_event_loop", return_value=mock_loop), + patch("asyncio.get_running_loop", return_value=mock_loop), patch.object(session, "_bridge_loop", side_effect=noop_bridge), ): await session.run() diff --git a/telnetlib3/tests/test_server.py b/telnetlib3/tests/test_server.py index 04d6d6b..e19f446 100644 --- a/telnetlib3/tests/test_server.py +++ b/telnetlib3/tests/test_server.py @@ -18,7 +18,7 @@ async def test_connection_lost_closes_transport_despite_set_protocol_error(): server = BaseServer.__new__(BaseServer) server.log = __import__("logging").getLogger("test_server") server._tasks = [] - server._waiter_connected = asyncio.get_event_loop().create_future() + server._waiter_connected = asyncio.get_running_loop().create_future() server._extra = {} server.shell = None @@ -54,7 +54,7 @@ async def test_connection_lost_remove_done_callback_raises(): server.shell = None server._closing = False - waiter = asyncio.get_event_loop().create_future() + waiter = asyncio.get_running_loop().create_future() class _BadWaiter: done = waiter.done @@ -246,7 +246,7 @@ async def test_tls_upgrade_handshake_failure(): transport.is_closing.return_value = False proto._transport = transport - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() with patch.object(loop, "start_tls", side_effect=ssl_module.SSLError("handshake failed")): await proto._upgrade_to_tls() @@ -299,19 +299,20 @@ async def mock_create_server(*args, **kwargs): created_server.shell = kwargs.get("shell") return created_server + loop = asyncio.get_running_loop() with patch("telnetlib3.server.create_server", side_effect=mock_create_server): - with patch("asyncio.get_event_loop") as mock_loop: - mock_loop.return_value = asyncio.get_event_loop() - try: - await run_server( - host="127.0.0.1", - port=0, - shell=lambda r, w: None, - robot_check=True, - pty_fork_limit=2, - ) - except (asyncio.CancelledError, OSError): - pass + with patch.object(loop, "add_signal_handler"): + with patch.object(loop, "remove_signal_handler"): + try: + await run_server( + host="127.0.0.1", + port=0, + shell=lambda r, w: None, + robot_check=True, + pty_fork_limit=2, + ) + except (asyncio.CancelledError, OSError): + pass assert created_server.shell is not None @@ -322,7 +323,8 @@ async def test_run_server_status_logger_lifecycle(): from telnetlib3.server import run_server created_server = MagicMock() - wait_future = asyncio.get_event_loop().create_future() + loop = asyncio.get_running_loop() + wait_future = loop.create_future() wait_future.set_result(None) created_server.wait_closed = MagicMock(return_value=wait_future) created_server.sockets = [] @@ -331,7 +333,6 @@ async def mock_create_server(*args, **kwargs): return created_server with patch("telnetlib3.server.create_server", side_effect=mock_create_server): - loop = asyncio.get_event_loop() with patch.object(loop, "add_signal_handler"): with patch.object(loop, "remove_signal_handler"): await run_server( diff --git a/telnetlib3/tests/test_server_fingerprinting.py b/telnetlib3/tests/test_server_fingerprinting.py index 4e03954..e1043e3 100644 --- a/telnetlib3/tests/test_server_fingerprinting.py +++ b/telnetlib3/tests/test_server_fingerprinting.py @@ -798,9 +798,7 @@ async def test_probe_skipped_when_closing(tmp_path): pytest.param( b"Press twice for the BBS ... ", b"\x1b\x1b", None, id="esc_twice_angle_brackets" ), - pytest.param( - b"Press (ESC) twice", b"\x1b\x1b", None, id="esc_twice_parens" - ), + pytest.param(b"Press (ESC) twice", b"\x1b\x1b", None, id="esc_twice_parens"), pytest.param( b"\x1b[33mPress [.ESC.] twice within 10 seconds\x1b[0m", b"\x1b\x1b",