diff --git a/CHANGELOG.md b/CHANGELOG.md index e677ea0..98d5529 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## [2.17.0] - yyyy-mm-dd - Improved Web/WASM compatibility by updating `SSHSocket` conditional imports so web runtimes consistently use the web socket shim and avoid incorrect native socket selection [#88]. Thanks [@vicajilau]. +- Added local dynamic forwarding (`SSHClient.forwardDynamic`) with SOCKS5 `NO AUTH` + `CONNECT`, including configurable handshake/connect timeouts and connection limits. ## [2.16.0] - 2026-03-24 - **BREAKING**: Changed `SSHChannelController.sendEnv()` from `void` to `Future` to properly await environment variable setup responses and avoid race conditions with PTY requests [#102]. Thanks [@itzhoujun] and [@vicajilau]. diff --git a/README.md b/README.md index 6018799..53ea6e2 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ SSH and SFTP client written in pure Dart, aiming to be feature-rich as well as e - **Pure Dart**: Working with both Dart VM and Flutter. - **SSH Session**: Executing commands, spawning shells, setting environment variables, pseudo terminals, etc. - **Authentication**: Supports password, private key and interactive authentication method. -- **Forwarding**: Supports local forwarding and remote forwarding. +- **Forwarding**: Supports local forwarding, remote forwarding, and dynamic forwarding (SOCKS5 CONNECT). - **SFTP**: Supports all operations defined in [SFTPv3 protocol](https://datatracker.ietf.org/doc/html/draft-ietf-secsh-filexfer-02) including upload, download, list, link, remove, rename, etc. ## 🧬 Built with dartssh2 @@ -296,6 +296,40 @@ void main() async { } ``` +### Start a local SOCKS5 proxy through SSH (`ssh -D` style) + +```dart +void main() async { + final dynamicForward = await client.forwardDynamic( + bindHost: '127.0.0.1', + bindPort: 1080, + options: const SSHDynamicForwardOptions( + handshakeTimeout: Duration(seconds: 10), + connectTimeout: Duration(seconds: 15), + maxConnections: 128, + ), + filter: (host, port) { + // Optional allow/deny policy. + return true; + }, + ); + + print('SOCKS5 proxy at ${dynamicForward.host}:${dynamicForward.port}'); +} +``` + +This currently supports SOCKS5 `NO AUTH` + `CONNECT`. +It requires `dart:io` and is not available on web runtimes. + +Quick verification from your terminal: + +```sh +curl --proxy socks5h://127.0.0.1:1080 https://ifconfig.me +``` + +If the proxy is working, this command returns the public egress IP seen through +the SSH tunnel. + ### Authenticate with public keys ```dart diff --git a/example/forward_dynamic.dart b/example/forward_dynamic.dart new file mode 100644 index 0000000..41bc860 --- /dev/null +++ b/example/forward_dynamic.dart @@ -0,0 +1,48 @@ +import 'dart:io'; + +import 'package:dartssh2/dartssh2.dart'; + +Future main() async { + final host = Platform.environment['SSH_HOST'] ?? 'localhost'; + final port = int.tryParse(Platform.environment['SSH_PORT'] ?? '') ?? 22; + final username = Platform.environment['SSH_USERNAME'] ?? 'root'; + final password = Platform.environment['SSH_PASSWORD']; + + final socket = await SSHSocket.connect(host, port); + + final client = SSHClient( + socket, + username: username, + onPasswordRequest: () => password, + ); + + await client.authenticated; + + final dynamicForward = await client.forwardDynamic( + bindHost: '127.0.0.1', + bindPort: 1080, + options: const SSHDynamicForwardOptions( + handshakeTimeout: Duration(seconds: 10), + connectTimeout: Duration(seconds: 15), + maxConnections: 64, + ), + filter: (targetHost, targetPort) { + // Allow only web ports in this sample. + return targetPort == 80 || targetPort == 443; + }, + ); + + print( + 'SOCKS5 proxy ready on ${dynamicForward.host}:${dynamicForward.port}.', + ); + print('Press Ctrl+C to stop.'); + + ProcessSignal.sigint.watch().listen((_) async { + await dynamicForward.close(); + client.close(); + await client.done; + exit(0); + }); + + await Future.delayed(const Duration(days: 365)); +} diff --git a/lib/src/dynamic_forward.dart b/lib/src/dynamic_forward.dart new file mode 100644 index 0000000..f9d3e7d --- /dev/null +++ b/lib/src/dynamic_forward.dart @@ -0,0 +1,24 @@ +import 'package:dartssh2/src/dynamic_forward_stub.dart' + if (dart.library.io) 'package:dartssh2/src/dynamic_forward_io.dart' as impl; +import 'package:dartssh2/src/ssh_forward.dart'; + +typedef SSHDynamicDial = Future Function( + String host, + int port, +); + +Future startDynamicForward({ + required String bindHost, + required int? bindPort, + required SSHDynamicForwardOptions options, + SSHDynamicConnectionFilter? filter, + required SSHDynamicDial dial, +}) { + return impl.startDynamicForward( + bindHost: bindHost, + bindPort: bindPort, + options: options, + filter: filter, + dial: dial, + ); +} diff --git a/lib/src/dynamic_forward_io.dart b/lib/src/dynamic_forward_io.dart new file mode 100644 index 0000000..191b334 --- /dev/null +++ b/lib/src/dynamic_forward_io.dart @@ -0,0 +1,377 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:dartssh2/src/ssh_forward.dart'; + +typedef SSHDynamicDial = Future Function( + String host, + int port, +); + +Future startDynamicForward({ + required String bindHost, + required int? bindPort, + required SSHDynamicForwardOptions options, + SSHDynamicConnectionFilter? filter, + required SSHDynamicDial dial, +}) async { + final server = await ServerSocket.bind(bindHost, bindPort ?? 0); + return _SSHDynamicForwardImpl( + server, + options: options, + filter: filter, + dial: dial, + ); +} + +class _SSHDynamicForwardImpl implements SSHDynamicForward { + _SSHDynamicForwardImpl( + this._server, { + required this.options, + required this.filter, + required this.dial, + }) { + _serverSub = _server.listen(_handleClient); + } + + final ServerSocket _server; + final SSHDynamicForwardOptions options; + final SSHDynamicConnectionFilter? filter; + final SSHDynamicDial dial; + late final StreamSubscription _serverSub; + final _connections = <_SocksConnection>{}; + bool _closed = false; + + @override + String get host => _server.address.host; + + @override + int get port => _server.port; + + @override + bool get isClosed => _closed; + + void _handleClient(Socket client) { + if (_closed) { + client.destroy(); + return; + } + + late final _SocksConnection connection; + connection = _SocksConnection( + client, + options: options, + filter: filter, + canOpenTunnel: () => _connections.length <= options.maxConnections, + dial: dial, + onClosed: () => _connections.remove(connection), + ); + + _connections.add(connection); + connection.start(); + } + + @override + Future close() async { + if (_closed) return; + _closed = true; + + await _serverSub.cancel(); + await _server.close(); + + final closes = + _connections.map((connection) => connection.close()).toList(); + await Future.wait(closes); + _connections.clear(); + } +} + +class _SocksConnection { + _SocksConnection( + this._client, { + required this.options, + required this.filter, + required this.canOpenTunnel, + required this.dial, + required this.onClosed, + }); + + static const _socksVersion = 0x05; + + final Socket _client; + final SSHDynamicForwardOptions options; + final SSHDynamicConnectionFilter? filter; + final bool Function() canOpenTunnel; + final SSHDynamicDial dial; + final void Function() onClosed; + + final _buffer = _ByteBuffer(); + + SSHForwardChannel? _remote; + StreamSubscription>? _clientSub; + StreamSubscription? _remoteSub; + Timer? _handshakeTimer; + bool _closed = false; + _SocksState _state = _SocksState.greeting; + + void start() { + _handshakeTimer = Timer(options.handshakeTimeout, () async { + _sendReply(_SocksReply.ttlExpired); + await close(); + }); + + _clientSub = _client.listen( + _onClientData, + onDone: close, + onError: (_, __) => close(), + cancelOnError: true, + ); + } + + Future close() async { + if (_closed) return; + _closed = true; + + await _clientSub?.cancel(); + await _remoteSub?.cancel(); + _handshakeTimer?.cancel(); + + _remote?.destroy(); + _client.destroy(); + + onClosed(); + } + + Future _onClientData(List chunk) async { + if (_closed) return; + + if (_state == _SocksState.streaming) { + _remote?.sink.add(chunk); + return; + } + + _buffer.add(chunk); + + try { + await _consumeHandshake(); + } catch (_) { + await close(); + } + } + + Future _consumeHandshake() async { + if (_state == _SocksState.greeting) { + final parsed = _parseGreeting(); + if (!parsed) return; + _state = _SocksState.request; + } + + if (_state == _SocksState.request) { + final target = _parseConnectRequest(); + if (target == null) return; + + if (filter != null && !filter!(target.host, target.port)) { + _sendReply(_SocksReply.connectionNotAllowed); + await close(); + return; + } + + if (!canOpenTunnel()) { + _sendReply(_SocksReply.connectionRefused); + await close(); + return; + } + + try { + _remote = await dial(target.host, target.port).timeout( + options.connectTimeout, + ); + } catch (_) { + _sendReply(_SocksReply.hostUnreachable); + await close(); + return; + } + + _remoteSub = _remote!.stream.listen( + _client.add, + onDone: close, + onError: (_, __) => close(), + cancelOnError: true, + ); + + _sendReply(_SocksReply.succeeded); + _handshakeTimer?.cancel(); + _handshakeTimer = null; + _state = _SocksState.streaming; + + final pending = _buffer.takeAll(); + if (pending.isNotEmpty) { + _remote!.sink.add(pending); + } + } + } + + bool _parseGreeting() { + if (_buffer.length < 2) return false; + + final version = _buffer.peek(0); + final methodsCount = _buffer.peek(1); + final totalLength = 2 + methodsCount; + + if (_buffer.length < totalLength) return false; + + final payload = _buffer.read(totalLength); + + if (version != _socksVersion) { + _sendMethodSelection(0xFF); + throw StateError('Unsupported SOCKS version'); + } + + final methods = payload.sublist(2); + if (methods.contains(0x00)) { + _sendMethodSelection(0x00); + } else { + _sendMethodSelection(0xFF); + throw StateError('No supported authentication method'); + } + + return true; + } + + _TargetAddress? _parseConnectRequest() { + if (_buffer.length < 4) return null; + + final version = _buffer.peek(0); + final command = _buffer.peek(1); + final atyp = _buffer.peek(3); + + if (version != _socksVersion) { + _sendReply(_SocksReply.generalFailure); + throw StateError('Unsupported SOCKS version'); + } + + if (command != 0x01) { + _sendReply(_SocksReply.commandNotSupported); + throw StateError('Unsupported SOCKS command'); + } + + int requiredLength; + if (atyp == 0x01) { + requiredLength = 10; + } else if (atyp == 0x03) { + if (_buffer.length < 5) return null; + requiredLength = 7 + _buffer.peek(4); + } else if (atyp == 0x04) { + requiredLength = 22; + } else { + _sendReply(_SocksReply.addressTypeNotSupported); + throw StateError('Unsupported SOCKS address type'); + } + + if (_buffer.length < requiredLength) return null; + + final request = _buffer.read(requiredLength); + final host = _decodeHost(request, atyp); + final portOffset = requiredLength - 2; + final port = (request[portOffset] << 8) | request[portOffset + 1]; + + return _TargetAddress(host, port); + } + + String _decodeHost(Uint8List request, int atyp) { + if (atyp == 0x01) { + return '${request[4]}.${request[5]}.${request[6]}.${request[7]}'; + } + + if (atyp == 0x03) { + final length = request[4]; + final bytes = request.sublist(5, 5 + length); + return utf8.decode(bytes); + } + + final raw = request.sublist(4, 20); + return InternetAddress.fromRawAddress(Uint8List.fromList(raw)).address; + } + + void _sendMethodSelection(int method) { + _client.add([_socksVersion, method]); + } + + void _sendReply(_SocksReply reply) { + _client.add([ + _socksVersion, + reply.code, + 0x00, + 0x01, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + 0x00, + ]); + } +} + +class _ByteBuffer { + final _data = []; + int _offset = 0; + + int get length => _data.length - _offset; + + void add(List chunk) { + _data.addAll(chunk); + } + + int peek(int index) => _data[_offset + index]; + + Uint8List read(int count) { + final slice = Uint8List.fromList(_data.sublist(_offset, _offset + count)); + _offset += count; + + if (_offset >= _data.length) { + _data.clear(); + _offset = 0; + } else if (_offset > 1024 && _offset * 2 > _data.length) { + _data.removeRange(0, _offset); + _offset = 0; + } + + return slice; + } + + Uint8List takeAll() { + if (length == 0) return Uint8List(0); + return read(length); + } +} + +class _TargetAddress { + final String host; + final int port; + + const _TargetAddress(this.host, this.port); +} + +enum _SocksState { + greeting, + request, + streaming, +} + +enum _SocksReply { + succeeded(0x00), + generalFailure(0x01), + connectionNotAllowed(0x02), + connectionRefused(0x05), + ttlExpired(0x06), + hostUnreachable(0x04), + commandNotSupported(0x07), + addressTypeNotSupported(0x08); + + final int code; + + const _SocksReply(this.code); +} diff --git a/lib/src/dynamic_forward_stub.dart b/lib/src/dynamic_forward_stub.dart new file mode 100644 index 0000000..dc7e239 --- /dev/null +++ b/lib/src/dynamic_forward_stub.dart @@ -0,0 +1,18 @@ +import 'package:dartssh2/src/ssh_forward.dart'; + +typedef SSHDynamicDial = Future Function( + String host, + int port, +); + +Future startDynamicForward({ + required String bindHost, + required int? bindPort, + required SSHDynamicForwardOptions options, + SSHDynamicConnectionFilter? filter, + required SSHDynamicDial dial, +}) { + throw UnsupportedError( + 'Dynamic forwarding requires dart:io and is not supported on this platform.', + ); +} diff --git a/lib/src/ssh_client.dart b/lib/src/ssh_client.dart index c3bbc2c..a59d71a 100644 --- a/lib/src/ssh_client.dart +++ b/lib/src/ssh_client.dart @@ -4,6 +4,7 @@ import 'dart:typed_data'; import 'package:dartssh2/src/http/http_client.dart'; import 'package:dartssh2/src/sftp/sftp_client.dart'; +import 'package:dartssh2/src/dynamic_forward.dart'; import 'package:dartssh2/src/ssh_algorithm.dart'; import 'package:dartssh2/src/ssh_agent.dart'; import 'package:dartssh2/src/ssh_channel.dart'; @@ -382,6 +383,30 @@ class SSHClient { return SSHForwardChannel(channelController.channel); } + /// Start a local SOCKS5 server that forwards outbound `CONNECT` requests + /// through this SSH connection. + /// + /// This is similar to `ssh -D`. Only SOCKS5 with `NO AUTH` and `CONNECT` + /// is supported. Use [filter] to optionally deny specific target + /// destinations. Use [options] to tune timeouts and connection limits. + /// + /// Not supported on platforms without `dart:io`. + Future forwardDynamic({ + String bindHost = '127.0.0.1', + int? bindPort, + SSHDynamicForwardOptions options = const SSHDynamicForwardOptions(), + SSHDynamicConnectionFilter? filter, + }) async { + await _authenticated.future; + return startDynamicForward( + bindHost: bindHost, + bindPort: bindPort, + options: options, + filter: filter, + dial: forwardLocal, + ); + } + /// Forward local connections to a remote Unix domain socket at [remoteSocketPath] on the /// remote side via a `direct-streamlocal@openssh.com` channel. /// diff --git a/lib/src/ssh_forward.dart b/lib/src/ssh_forward.dart index f2b462b..2a03f2c 100644 --- a/lib/src/ssh_forward.dart +++ b/lib/src/ssh_forward.dart @@ -4,6 +4,44 @@ import 'dart:typed_data'; import 'package:dartssh2/src/socket/ssh_socket.dart'; import 'package:dartssh2/src/ssh_channel.dart'; +/// Filters outbound targets requested through a dynamic forward (SOCKS proxy). +/// +/// Return `true` to allow connecting to `[host]:[port]`, `false` to deny. +typedef SSHDynamicConnectionFilter = bool Function(String host, int port); + +/// Configuration for [SSHClient.forwardDynamic]. +class SSHDynamicForwardOptions { + /// Maximum time allowed to complete the SOCKS5 handshake and target request. + final Duration handshakeTimeout; + + /// Maximum time allowed to establish the SSH forwarded connection to target. + final Duration connectTimeout; + + /// Maximum number of simultaneous SOCKS client connections. + final int maxConnections; + + const SSHDynamicForwardOptions({ + this.handshakeTimeout = const Duration(seconds: 10), + this.connectTimeout = const Duration(seconds: 15), + this.maxConnections = 128, + }) : assert(maxConnections > 0, 'maxConnections must be greater than zero'); +} + +/// A local dynamic forwarding server (SOCKS5 CONNECT) managed by [SSHClient]. +abstract class SSHDynamicForward { + /// Host/interface the local SOCKS server is bound to. + String get host; + + /// Bound local port of the SOCKS server. + int get port; + + /// Whether this forwarder has already been closed. + bool get isClosed; + + /// Stops accepting new SOCKS connections and closes active ones. + Future close(); +} + class SSHForwardChannel implements SSHSocket { final SSHChannel _channel; diff --git a/test/src/socket/dynamic_forward_io_test.dart b/test/src/socket/dynamic_forward_io_test.dart new file mode 100644 index 0000000..f52fe7d --- /dev/null +++ b/test/src/socket/dynamic_forward_io_test.dart @@ -0,0 +1,481 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:dartssh2/src/dynamic_forward.dart'; +import 'package:dartssh2/src/message/msg_channel.dart'; +import 'package:dartssh2/src/ssh_channel.dart'; +import 'package:dartssh2/src/ssh_forward.dart'; +import 'package:test/test.dart'; + +void main() { + group('startDynamicForward (io)', () { + test('accepts SOCKS5 connect and proxies data', () async { + late _DialedTunnel dialed; + String? dialHost; + int? dialPort; + + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (host, port) async { + dialHost = host; + dialPort = port; + dialed = _DialedTunnel.create(); + return dialed.channel; + }, + ); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() async { + await client.close(); + await forward.close(); + dialed.dispose(); + }); + + await _sendGreeting(client, incoming); + final reply = + await _sendConnectDomain(client, incoming, 'example.com', 443); + + expect(reply[0], 0x05); + expect(reply[1], 0x00); + expect(dialHost, 'example.com'); + expect(dialPort, 443); + + client.add(utf8.encode('hello')); + await Future.delayed(const Duration(milliseconds: 20)); + expect(utf8.decode(dialed.sentToRemote), 'hello'); + + dialed.pushFromRemote(utf8.encode('world')); + final tunneled = await _readAtLeast(incoming, 5); + expect(utf8.decode(tunneled), 'world'); + }); + + test('rejects connection when filter returns false', () async { + var dialCalled = false; + + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + filter: (_, __) => false, + dial: (_, __) async { + dialCalled = true; + return _DialedTunnel.create().channel; + }, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + await _sendGreeting(client, incoming); + final reply = + await _sendConnectDomain(client, incoming, 'blocked.test', 80); + + expect(reply[1], 0x02); // connection not allowed + expect(dialCalled, isFalse); + }); + + test('rejects new connection when maxConnections is exceeded', () async { + final tunnels = <_DialedTunnel>[]; + + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(maxConnections: 1), + dial: (_, __) async { + final tunnel = _DialedTunnel.create(); + tunnels.add(tunnel); + return tunnel.channel; + }, + ); + addTearDown(() async { + for (final tunnel in tunnels) { + tunnel.dispose(); + } + await forward.close(); + }); + + final first = await Socket.connect(forward.host, forward.port); + final firstIncoming = first.asBroadcastStream(); + addTearDown(() => first.close()); + await _sendGreeting(first, firstIncoming); + final firstReply = + await _sendConnectDomain(first, firstIncoming, 'one.test', 80); + expect(firstReply[1], 0x00); + + final second = await Socket.connect(forward.host, forward.port); + final secondIncoming = second.asBroadcastStream(); + addTearDown(() => second.close()); + await _sendGreeting(second, secondIncoming); + final secondReply = await _sendConnectDomain( + second, + secondIncoming, + 'two.test', + 80, + ); + expect(secondReply[1], 0x05); // connection refused + }); + + test('returns host unreachable when dial times out', () async { + final neverCompletes = Completer(); + + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions( + connectTimeout: Duration(milliseconds: 30), + ), + dial: (_, __) => neverCompletes.future, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + await _sendGreeting(client, incoming); + final reply = + await _sendConnectDomain(client, incoming, 'timeout.test', 80); + + expect(reply[1], 0x04); // host unreachable + }); + + test('expires idle handshake when no greeting is sent', () async { + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions( + handshakeTimeout: Duration(milliseconds: 40), + ), + dial: (_, __) async => _DialedTunnel.create().channel, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + final reply = await _readAtLeast(incoming, 10); + expect(reply[0], 0x05); + expect(reply[1], 0x06); // ttl expired + }); + + test('forwards pending bytes sent with CONNECT request', () async { + late _DialedTunnel dialed; + + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (_, __) async { + dialed = _DialedTunnel.create(); + return dialed.channel; + }, + ); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() async { + await client.close(); + await forward.close(); + dialed.dispose(); + }); + + await _sendGreeting(client, incoming); + + final hostBytes = utf8.encode('pending.test'); + client.add([ + 0x05, + 0x01, + 0x00, + 0x03, + hostBytes.length, + ...hostBytes, + 0x00, + 0x50, + ...utf8.encode('EXTRA'), + ]); + + final reply = await _readAtLeast(incoming, 10); + expect(reply[1], 0x00); + + await Future.delayed(const Duration(milliseconds: 20)); + expect(utf8.decode(dialed.sentToRemote), 'EXTRA'); + }); + + test('rejects unsupported greeting version', () async { + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (_, __) async => _DialedTunnel.create().channel, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + client.add([0x04, 0x01, 0x00]); + final reply = await _readAtLeast(incoming, 2); + expect(reply[0], 0x05); + expect(reply[1], 0xFF); + }); + + test('rejects unsupported authentication method', () async { + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (_, __) async => _DialedTunnel.create().channel, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + client.add([0x05, 0x01, 0x02]); + final reply = await _readAtLeast(incoming, 2); + expect(reply[0], 0x05); + expect(reply[1], 0xFF); + }); + + test('rejects unsupported request version', () async { + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (_, __) async => _DialedTunnel.create().channel, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + await _sendGreeting(client, incoming); + client.add([0x04, 0x01, 0x00, 0x01, 127, 0, 0, 1, 0, 22]); + final reply = await _readAtLeast(incoming, 10); + expect(reply[1], 0x01); + }); + + test('rejects unsupported request command', () async { + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (_, __) async => _DialedTunnel.create().channel, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + await _sendGreeting(client, incoming); + client.add([0x05, 0x02, 0x00, 0x01, 127, 0, 0, 1, 0, 22]); + final reply = await _readAtLeast(incoming, 10); + expect(reply[1], 0x07); + }); + + test('rejects unsupported address type', () async { + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (_, __) async => _DialedTunnel.create().channel, + ); + addTearDown(() => forward.close()); + + final client = await Socket.connect(forward.host, forward.port); + final incoming = client.asBroadcastStream(); + addTearDown(() => client.close()); + + await _sendGreeting(client, incoming); + client.add([0x05, 0x01, 0x00, 0x7F, 0x00, 0x00]); + final reply = await _readAtLeast(incoming, 10); + expect(reply[1], 0x08); + }); + + test('supports IPv4 and IPv6 target addresses', () async { + final tunnels = <_DialedTunnel>[]; + final dialedHosts = []; + + final forward = await startDynamicForward( + bindHost: '127.0.0.1', + bindPort: 0, + options: const SSHDynamicForwardOptions(), + dial: (host, _) async { + dialedHosts.add(host); + final tunnel = _DialedTunnel.create(); + tunnels.add(tunnel); + return tunnel.channel; + }, + ); + addTearDown(() async { + for (final tunnel in tunnels) { + tunnel.dispose(); + } + await forward.close(); + }); + + final ipv4 = await Socket.connect(forward.host, forward.port); + final ipv4Incoming = ipv4.asBroadcastStream(); + addTearDown(() => ipv4.close()); + await _sendGreeting(ipv4, ipv4Incoming); + ipv4.add([0x05, 0x01, 0x00, 0x01, 192, 168, 1, 2, 0, 80]); + final ipv4Reply = await _readAtLeast(ipv4Incoming, 10); + expect(ipv4Reply[1], 0x00); + + final ipv6 = await Socket.connect(forward.host, forward.port); + final ipv6Incoming = ipv6.asBroadcastStream(); + addTearDown(() => ipv6.close()); + await _sendGreeting(ipv6, ipv6Incoming); + ipv6.add([ + 0x05, + 0x01, + 0x00, + 0x04, + 0x20, + 0x01, + 0x0d, + 0xb8, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 22, + ]); + final ipv6Reply = await _readAtLeast(ipv6Incoming, 10); + expect(ipv6Reply[1], 0x00); + + expect(dialedHosts.length, 2); + expect(dialedHosts[0], '192.168.1.2'); + expect(dialedHosts[1], contains(':')); + }); + }); +} + +Future _sendGreeting(Socket socket, Stream incoming) async { + socket.add([0x05, 0x01, 0x00]); + final greeting = await _readAtLeast(incoming, 2); + expect(greeting[0], 0x05); + expect(greeting[1], 0x00); +} + +Future _sendConnectDomain( + Socket socket, + Stream incoming, + String host, + int port, +) async { + final hostBytes = utf8.encode(host); + socket.add([ + 0x05, + 0x01, + 0x00, + 0x03, + hostBytes.length, + ...hostBytes, + (port >> 8) & 0xff, + port & 0xff, + ]); + return _readAtLeast(incoming, 10); +} + +Future _readAtLeast( + Stream incoming, + int minBytes, { + Duration timeout = const Duration(seconds: 1), +}) async { + final completer = Completer(); + final buffer = []; + late final StreamSubscription sub; + + sub = incoming.listen( + (chunk) { + buffer.addAll(chunk); + if (buffer.length >= minBytes && !completer.isCompleted) { + completer.complete(Uint8List.fromList(buffer)); + } + }, + onDone: () { + if (!completer.isCompleted) { + completer.complete(Uint8List.fromList(buffer)); + } + }, + onError: (Object error, StackTrace stackTrace) { + if (!completer.isCompleted) { + completer.completeError(error, stackTrace); + } + }, + cancelOnError: true, + ); + + try { + return await completer.future.timeout(timeout); + } finally { + await sub.cancel(); + } +} + +class _DialedTunnel { + _DialedTunnel._(this.channel, this._controller, this.sentToRemote); + + final SSHForwardChannel channel; + final SSHChannelController _controller; + final List sentToRemote; + + factory _DialedTunnel.create() { + final sentToRemote = []; + + final controller = SSHChannelController( + localId: 1, + localMaximumPacketSize: 1024 * 1024, + localInitialWindowSize: 1024 * 1024, + remoteId: 2, + remoteMaximumPacketSize: 1024 * 1024, + remoteInitialWindowSize: 1024 * 1024, + sendMessage: (message) { + if (message is SSH_Message_Channel_Data) { + sentToRemote.addAll(message.data); + } + }, + ); + + return _DialedTunnel._( + SSHForwardChannel(controller.channel), + controller, + sentToRemote, + ); + } + + void pushFromRemote(List data) { + _controller.handleMessage( + SSH_Message_Channel_Data( + recipientChannel: _controller.localId, + data: Uint8List.fromList(data), + ), + ); + } + + void dispose() { + _controller.destroy(); + } +} diff --git a/test/src/ssh_client_forward_dynamic_test.dart b/test/src/ssh_client_forward_dynamic_test.dart new file mode 100644 index 0000000..e1e3b89 --- /dev/null +++ b/test/src/ssh_client_forward_dynamic_test.dart @@ -0,0 +1,86 @@ +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:dartssh2/dartssh2.dart'; +import 'package:dartssh2/src/message/msg_userauth.dart'; +import 'package:test/test.dart'; + +void main() { + group('SSHClient.forwardDynamic', () { + test('waits for authentication before starting', () async { + final client = SSHClient( + _FakeSSHSocket(), + username: 'demo', + keepAliveInterval: null, + ); + + // Simulate server auth success so forwardDynamic can proceed. + scheduleMicrotask(() { + client.handlePacket(SSH_Message_Userauth_Success().encode()); + }); + + final dynamicForward = await client.forwardDynamic( + bindHost: '127.0.0.1', + bindPort: 0, + ); + + expect(dynamicForward.port, greaterThan(0)); + expect(dynamicForward.isClosed, isFalse); + + await dynamicForward.close(); + expect(dynamicForward.isClosed, isTrue); + + client.close(); + await client.done; + }); + }); +} + +class _FakeSSHSocket implements SSHSocket { + final _inputController = StreamController(); + final _doneCompleter = Completer(); + + @override + Stream get stream => _inputController.stream; + + @override + StreamSink> get sink => _NoopSink(); + + @override + Future get done => _doneCompleter.future; + + @override + Future close() async { + if (!_doneCompleter.isCompleted) { + _doneCompleter.complete(); + } + await _inputController.close(); + } + + @override + void destroy() { + if (!_doneCompleter.isCompleted) { + _doneCompleter.complete(); + } + unawaited(_inputController.close()); + } +} + +class _NoopSink implements StreamSink> { + @override + void add(List data) {} + + @override + void addError(Object error, [StackTrace? stackTrace]) {} + + @override + Future addStream(Stream> stream) async { + await for (final _ in stream) {} + } + + @override + Future close() async {} + + @override + Future get done async {} +} diff --git a/test/src/ssh_client_test.dart b/test/src/ssh_client_test.dart index 0775069..4db35e1 100644 --- a/test/src/ssh_client_test.dart +++ b/test/src/ssh_client_test.dart @@ -163,6 +163,25 @@ void main() { }); }); + group('SSHClient.forwardDynamic', () { + test('starts and closes local dynamic forward', () async { + final client = await getTestClient(); + + final dynamicForward = await client.forwardDynamic( + bindHost: '127.0.0.1', + bindPort: 0, + ); + + expect(dynamicForward.port, greaterThan(0)); + expect(dynamicForward.isClosed, isFalse); + + await dynamicForward.close(); + expect(dynamicForward.isClosed, isTrue); + + client.close(); + }); + }); + group('SSHClient.runWithResult', () { test('returns command output and exit code', () async { final client = await getTestClient();