From 7c1b3fb2adbe2154be53cdc3154a1f5b45746454 Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Sun, 22 Mar 2026 20:35:31 +0100 Subject: [PATCH 1/8] Add passive port multiplexing by client IP --- README.md | 1 + driver.go | 1 + passive_multiplexer.go | 398 ++++++++++++++++++++++++++++++++++++ passive_multiplexer_test.go | 131 ++++++++++++ server.go | 8 + transfer_pasv.go | 105 +++++++--- transfer_test.go | 10 +- 7 files changed, 617 insertions(+), 37 deletions(-) create mode 100644 passive_multiplexer.go create mode 100644 passive_multiplexer_test.go diff --git a/README.md b/README.md index 4641d503..36c98677 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,7 @@ type Settings struct { PublicHost string // Public IP to expose (only an IP address is accepted at this stage) PublicIPResolver PublicIPResolver // (Optional) To fetch a public IP lookup PassiveTransferPortRange PasvPortGetter // (Optional) Port Range for data connections. Random if not specified + PassiveTransferPortMultiplexing bool // Allow different client IPs to share passive listener ports ActiveTransferPortNon20 bool // Do not impose the port 20 for active data transfer (#88, RFC 1579) IdleTimeout int // Maximum inactivity time before disconnecting (#58) ConnectionTimeout int // Maximum time to establish passive or active transfer connections diff --git a/driver.go b/driver.go index 63443623..9fc8793c 100644 --- a/driver.go +++ b/driver.go @@ -308,6 +308,7 @@ type Settings struct { PublicHost string // Public IP to expose (only an IP address is accepted at this stage) Banner string // Banner to use in server status response PassiveTransferPortRange PasvPortGetter // (Optional) Port Mapping for data connections. Random if not specified + PassiveTransferPortMultiplexing bool // Allow different client IPs to share passive listener ports PublicIPResolver PublicIPResolver // (Optional) To fetch a public IP lookup IdleTimeout int // Maximum inactivity time before disconnecting (#58) ConnectionTimeout int // Maximum time to establish passive or active transfer connections diff --git a/passive_multiplexer.go b/passive_multiplexer.go new file mode 100644 index 00000000..dbc0d929 --- /dev/null +++ b/passive_multiplexer.go @@ -0,0 +1,398 @@ +package ftpserver + +import ( + "errors" + "fmt" + "log/slog" + "net" + "sync" + "time" +) + +var errPassiveListenerReservedForIP = errors.New("passive listener already reserved for client ip") + +type passiveDeadlineSetter interface { + SetDeadline(time.Time) error +} + +type passivePortCandidate struct { + exposedPort int + listenedPort int +} + +type passiveListenersManager struct { + logger *slog.Logger + mu sync.Mutex + listeners map[int]*sharedPassiveListener + closed bool +} + +func newPassiveListenersManager(logger *slog.Logger) *passiveListenersManager { + return &passiveListenersManager{ + logger: logger, + listeners: make(map[int]*sharedPassiveListener), + } +} + +func (m *passiveListenersManager) reserve(remoteIP net.IP, portRange PasvPortGetter) (int, net.Listener, passiveDeadlineSetter, error) { + for _, candidate := range getPassivePortCandidates(portRange) { + listener, err := m.getOrCreate(candidate.listenedPort) + if err != nil { + continue + } + + reservation, err := listener.reserve(remoteIP) + if err == nil { + return candidate.exposedPort, reservation, reservation, nil + } + if errors.Is(err, errPassiveListenerReservedForIP) { + continue + } + } + + return 0, nil, nil, ErrNoAvailableListeningPort +} + +func (m *passiveListenersManager) close() error { + m.mu.Lock() + if m.closed { + m.mu.Unlock() + return nil + } + + m.closed = true + listeners := make([]*sharedPassiveListener, 0, len(m.listeners)) + for _, listener := range m.listeners { + listeners = append(listeners, listener) + } + m.mu.Unlock() + + var closeErr error + for _, listener := range listeners { + if err := listener.close(); err != nil && closeErr == nil { + closeErr = err + } + } + + return closeErr +} + +func (m *passiveListenersManager) getOrCreate(port int) (*sharedPassiveListener, error) { + m.mu.Lock() + if m.closed { + m.mu.Unlock() + return nil, net.ErrClosed + } + if listener, ok := m.listeners[port]; ok { + m.mu.Unlock() + return listener, nil + } + m.mu.Unlock() + + listener, err := newSharedPassiveListener(port, m.logger.With("passivePort", port)) + if err != nil { + return nil, err + } + + m.mu.Lock() + defer m.mu.Unlock() + + if m.closed { + _ = listener.close() + return nil, net.ErrClosed + } + + if existing, ok := m.listeners[port]; ok { + _ = listener.close() + return existing, nil + } + + m.listeners[port] = listener + return listener, nil +} + +type sharedPassiveListener struct { + logger *slog.Logger + listener *net.TCPListener + mu sync.Mutex + reservations map[string]*passiveReservationListener + closed bool +} + +func newSharedPassiveListener(port int, logger *slog.Logger) (*sharedPassiveListener, error) { + laddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", port)) + if err != nil { + return nil, newNetworkError(fmt.Sprintf("could not resolve port %d", port), err) + } + + tcpListener, err := net.ListenTCP("tcp", laddr) + if err != nil { + return nil, err + } + + result := &sharedPassiveListener{ + logger: logger, + listener: tcpListener, + reservations: make(map[string]*passiveReservationListener), + } + + go result.serve() + + return result, nil +} + +func (l *sharedPassiveListener) serve() { + for { + conn, err := l.listener.Accept() + if err != nil { + if isClosedListenerError(err) { + l.failAll(net.ErrClosed) + return + } + + var netErr net.Error + if errors.As(err, &netErr) && netErr.Temporary() { //nolint:staticcheck + l.logger.Warn("Temporary passive accept error", "err", err) + continue + } + + l.failAll(err) + return + } + + l.dispatch(conn) + } +} + +func (l *sharedPassiveListener) reserve(remoteIP net.IP) (*passiveReservationListener, error) { + key := remoteIP.String() + + l.mu.Lock() + defer l.mu.Unlock() + + if l.closed { + return nil, net.ErrClosed + } + if _, ok := l.reservations[key]; ok { + return nil, errPassiveListenerReservedForIP + } + + reservation := &passiveReservationListener{ + parent: l, + remoteIP: key, + connCh: make(chan net.Conn, 1), + closedCh: make(chan struct{}), + } + l.reservations[key] = reservation + + return reservation, nil +} + +func (l *sharedPassiveListener) dispatch(conn net.Conn) { + ipAddress, err := getIPFromRemoteAddr(conn.RemoteAddr()) + if err != nil { + l.logger.Warn("Could not parse passive data connection IP", "err", err) + _ = conn.Close() + return + } + + key := ipAddress.String() + + l.mu.Lock() + reservation := l.reservations[key] + if reservation != nil { + delete(l.reservations, key) + } + l.mu.Unlock() + + if reservation == nil || !reservation.deliver(conn) { + _ = conn.Close() + } +} + +func (l *sharedPassiveListener) release(remoteIP string) { + l.mu.Lock() + defer l.mu.Unlock() + + if reservation, ok := l.reservations[remoteIP]; ok { + delete(l.reservations, remoteIP) + reservation.markReleased() + } +} + +func (l *sharedPassiveListener) failAll(err error) { + l.mu.Lock() + if l.closed { + l.mu.Unlock() + return + } + + l.closed = true + reservations := make([]*passiveReservationListener, 0, len(l.reservations)) + for _, reservation := range l.reservations { + reservations = append(reservations, reservation) + } + l.reservations = nil + l.mu.Unlock() + + for _, reservation := range reservations { + reservation.fail(err) + } +} + +func (l *sharedPassiveListener) close() error { + err := l.listener.Close() + l.failAll(net.ErrClosed) + return err +} + +type passiveReservationListener struct { + parent *sharedPassiveListener + remoteIP string + connCh chan net.Conn + closedCh chan struct{} + closeOnce sync.Once + stateMu sync.Mutex + deadline time.Time + released bool + failureErr error +} + +func (l *passiveReservationListener) Accept() (net.Conn, error) { + timeout := l.getDeadline() + var timerCh <-chan time.Time + var timer *time.Timer + + if !timeout.IsZero() { + wait := time.Until(timeout) + if wait <= 0 { + return nil, newPassiveAcceptTimeoutError() + } + + timer = time.NewTimer(wait) + timerCh = timer.C + defer timer.Stop() + } + + select { + case conn := <-l.connCh: + if conn == nil { + return nil, l.getFailure() + } + return conn, nil + case <-l.closedCh: + return nil, l.getFailure() + case <-timerCh: + return nil, newPassiveAcceptTimeoutError() + } +} + +func (l *passiveReservationListener) Close() error { + l.closeOnce.Do(func() { + l.parent.release(l.remoteIP) + l.markReleased() + + select { + case conn := <-l.connCh: + if conn != nil { + _ = conn.Close() + } + default: + } + + close(l.closedCh) + }) + + return nil +} + +func (l *passiveReservationListener) Addr() net.Addr { + return l.parent.listener.Addr() +} + +func (l *passiveReservationListener) SetDeadline(deadline time.Time) error { + l.stateMu.Lock() + defer l.stateMu.Unlock() + + l.deadline = deadline + return nil +} + +func (l *passiveReservationListener) deliver(conn net.Conn) bool { + select { + case <-l.closedCh: + return false + default: + } + + select { + case l.connCh <- conn: + l.markReleased() + return true + case <-l.closedCh: + return false + } +} + +func (l *passiveReservationListener) fail(err error) { + l.stateMu.Lock() + if l.failureErr == nil { + l.failureErr = err + } + l.stateMu.Unlock() + + _ = l.Close() +} + +func (l *passiveReservationListener) markReleased() { + l.stateMu.Lock() + defer l.stateMu.Unlock() + + l.released = true +} + +func (l *passiveReservationListener) getDeadline() time.Time { + l.stateMu.Lock() + defer l.stateMu.Unlock() + + return l.deadline +} + +func (l *passiveReservationListener) getFailure() error { + l.stateMu.Lock() + defer l.stateMu.Unlock() + + if l.failureErr != nil { + return l.failureErr + } + + return net.ErrClosed +} + +type passiveAcceptTimeoutError struct{} + +func (passiveAcceptTimeoutError) Error() string { return "i/o timeout" } +func (passiveAcceptTimeoutError) Timeout() bool { return true } +func (passiveAcceptTimeoutError) Temporary() bool { return true } + +func newPassiveAcceptTimeoutError() error { + return &net.OpError{ + Op: "accept", + Net: "tcp", + Err: passiveAcceptTimeoutError{}, + } +} + +func isClosedListenerError(err error) bool { + if errors.Is(err, net.ErrClosed) { + return true + } + + errOp := &net.OpError{} + if errors.As(err, &errOp) && errOp.Err != nil { + return errOp.Err.Error() == "use of closed network connection" + } + + return false +} + diff --git a/passive_multiplexer_test.go b/passive_multiplexer_test.go new file mode 100644 index 00000000..a1163c1d --- /dev/null +++ b/passive_multiplexer_test.go @@ -0,0 +1,131 @@ +package ftpserver + +import ( + "io" + "log/slog" + "net" + "testing" + + "github.com/secsy/goftp" + "github.com/stretchr/testify/require" +) + +func getFreePassivePort(t *testing.T) int { + t.Helper() + + listener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 0}) + require.NoError(t, err) + defer func() { + require.NoError(t, listener.Close()) + }() + + return listener.Addr().(*net.TCPAddr).Port +} + +func TestPassiveListenersManagerMultiplexesByClientIP(t *testing.T) { + req := require.New(t) + port := getFreePassivePort(t) + manager := newPassiveListenersManager(slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + defer func() { + req.NoError(manager.close()) + }() + + portRange := &PortRange{Start: port, End: port} + ip1 := net.ParseIP("127.0.0.2") + ip2 := net.ParseIP("127.0.0.3") + + exposedPort1, listener1, _, err := manager.reserve(ip1, portRange) + req.NoError(err) + req.Equal(port, exposedPort1) + + exposedPort2, listener2, _, err := manager.reserve(ip2, portRange) + req.NoError(err) + req.Equal(port, exposedPort2) + + req.Len(manager.listeners, 1) + sharedListener := manager.listeners[port] + + conn1 := &testNetConn{remoteAddr: &net.TCPAddr{IP: ip1, Port: 40001}} + sharedListener.dispatch(conn1) + accepted1, err := listener1.Accept() + req.NoError(err) + req.Same(conn1, accepted1) + + conn2 := &testNetConn{remoteAddr: &net.TCPAddr{IP: ip2, Port: 40002}} + sharedListener.dispatch(conn2) + accepted2, err := listener2.Accept() + req.NoError(err) + req.Same(conn2, accepted2) +} + +func TestPassiveListenersManagerRejectsSameIPForSamePort(t *testing.T) { + req := require.New(t) + port := getFreePassivePort(t) + manager := newPassiveListenersManager(slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + defer func() { + req.NoError(manager.close()) + }() + + portRange := &PortRange{Start: port, End: port} + ip := net.ParseIP("127.0.0.2") + + exposedPort, _, _, err := manager.reserve(ip, portRange) + req.Equal(port, exposedPort) + req.NoError(err) + + _, _, _, err = manager.reserve(ip, portRange) + req.ErrorIs(err, ErrNoAvailableListeningPort) +} + +func TestPassiveListenersManagerCloseReleasesReservation(t *testing.T) { + req := require.New(t) + port := getFreePassivePort(t) + manager := newPassiveListenersManager(slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + defer func() { + req.NoError(manager.close()) + }() + + portRange := &PortRange{Start: port, End: port} + ip := net.ParseIP("127.0.0.2") + + _, listener, _, err := manager.reserve(ip, portRange) + req.NoError(err) + req.NoError(listener.Close()) + + _, _, _, err = manager.reserve(ip, portRange) + req.NoError(err) +} + +func TestPassivePortMultiplexingSameClientExhaustion(t *testing.T) { + req := require.New(t) + port := getFreePassivePort(t) + driver := &TestServerDriver{ + Settings: &Settings{ + ListenAddr: "127.0.0.1:0", + DefaultTransferType: TransferTypeBinary, + PassiveTransferPortRange: &PortRange{Start: port, End: port}, + PassiveTransferPortMultiplexing: true, + }, + } + server := NewTestServerWithTestDriver(t, driver) + + client, err := goftp.DialConfig(goftp.Config{ + User: authUser, + Password: authPass, + }, server.Addr()) + req.NoError(err) + defer func() { panicOnError(client.Close()) }() + + raw, err := client.OpenRawConn() + req.NoError(err) + defer func() { req.NoError(raw.Close()) }() + + returnCode, message, err := raw.SendCommand("PASV") + req.NoError(err) + req.Equal(StatusEnteringPASV, returnCode, message) + + returnCode, message, err = raw.SendCommand("PASV") + req.NoError(err) + req.Equal(StatusServiceNotAvailable, returnCode, message) + req.Contains(message, ErrNoAvailableListeningPort.Error()) +} diff --git a/server.go b/server.go index 942c6fa3..6a28e84e 100644 --- a/server.go +++ b/server.go @@ -129,6 +129,7 @@ type FtpServer struct { Logger *slog.Logger // Structured logger (log/slog) settings *Settings // General settings listener net.Listener // listener used to receive files + passiveListeners *passiveListenersManager clientCounter uint32 // Clients counter driver MainDriver // Driver to handle the client authentication and the file access driver selection } @@ -165,6 +166,7 @@ func (server *FtpServer) loadSettings() error { } server.settings = settings + server.passiveListeners = newPassiveListenersManager(server.Logger) return nil } @@ -351,6 +353,12 @@ func (server *FtpServer) Stop() error { return newNetworkError("couln't close listener", err) } + if server.passiveListeners != nil { + if err := server.passiveListeners.close(); err != nil && !errors.Is(err, net.ErrClosed) { + server.Logger.Warn("Could not close passive listeners", "err", err) + } + } + return nil } diff --git a/transfer_pasv.go b/transfer_pasv.go index 3044d5a3..8cb70d3c 100644 --- a/transfer_pasv.go +++ b/transfer_pasv.go @@ -29,13 +29,13 @@ var _ transferHandler = (*passiveTransferHandler)(nil) // Passive connection type passiveTransferHandler struct { - listener net.Listener // TCP or SSL Listener - tcpListener *net.TCPListener // TCP Listener (only keeping it to define a deadline during the accept) - Port int // TCP Port we are listening on - connection net.Conn // TCP Connection established - settings *Settings // Settings - info string // transfer info - logger *slog.Logger // Logger + listener net.Listener // TCP or SSL Listener + deadlineSetter passiveDeadlineSetter // Listener used to set accept deadlines + Port int // TCP Port we are listening on + connection net.Conn // TCP Connection established + settings *Settings // Settings + info string // transfer info + logger *slog.Logger // Logger // data connection requirement checker checkDataConn func(dataConnIP net.IP, channelType DataChannel) error } @@ -85,38 +85,63 @@ const ( portSearchMaxAttempts = 1000 ) -func (c *clientHandler) findListenerWithinPortRange(portMapping PasvPortGetter) (int, *net.TCPListener, error) { +func getPassivePortCandidates(portMapping PasvPortGetter) []passivePortCandidate { nbAttempts := portMapping.NumberAttempts() - // Making sure we trying a reasonable amount of ports before giving up if nbAttempts < portSearchMinAttempts { nbAttempts = portSearchMinAttempts } else if nbAttempts > portSearchMaxAttempts { nbAttempts = portSearchMaxAttempts } - for i := 0; i < nbAttempts; i++ { + maxFetches := nbAttempts * 4 + if maxFetches < nbAttempts { + maxFetches = nbAttempts + } + + result := make([]passivePortCandidate, 0, nbAttempts) + tried := make(map[int]struct{}, nbAttempts) + + for i := 0; len(result) < nbAttempts && i < maxFetches; i++ { exposedPort, listenedPort, ok := portMapping.FetchNext() if !ok { break } - laddr, errResolve := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", listenedPort)) + if _, ok := tried[listenedPort]; ok { + continue + } + + tried[listenedPort] = struct{}{} + result = append(result, passivePortCandidate{ + exposedPort: exposedPort, + listenedPort: listenedPort, + }) + } + + return result +} + +func (c *clientHandler) findListenerWithinPortRange(portMapping PasvPortGetter) (int, *net.TCPListener, error) { + candidates := getPassivePortCandidates(portMapping) + + for _, candidate := range candidates { + laddr, errResolve := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", candidate.listenedPort)) if errResolve != nil { - c.logger.Error("Problem resolving local port", "err", errResolve, "port", listenedPort) + c.logger.Error("Problem resolving local port", "err", errResolve, "port", candidate.listenedPort) - return 0, nil, newNetworkError(fmt.Sprintf("could not resolve port %d", listenedPort), errResolve) + return 0, nil, newNetworkError(fmt.Sprintf("could not resolve port %d", candidate.listenedPort), errResolve) } tcpListener, errListen := net.ListenTCP("tcp", laddr) if errListen == nil { - return exposedPort, tcpListener, nil + return candidate.exposedPort, tcpListener, nil } } c.logger.Warn( "Could not find any free port", - "nbAttempts", nbAttempts, + "nbAttempts", len(candidates), ) return 0, nil, ErrNoAvailableListeningPort @@ -125,15 +150,31 @@ func (c *clientHandler) findListenerWithinPortRange(portMapping PasvPortGetter) func (c *clientHandler) handlePASV(_ string) error { command := c.GetLastCommand() addr, _ := net.ResolveTCPAddr("tcp", ":0") - var tcpListener *net.TCPListener var err error portMapping := c.server.settings.PassiveTransferPortRange exposedPort := 0 + var listener net.Listener + var deadlineSetter passiveDeadlineSetter + + if c.server.settings.PassiveTransferPortMultiplexing && portMapping != nil { + controlConnIP, errIP := getIPFromRemoteAddr(c.RemoteAddr()) + if errIP != nil { + c.writeMessage(StatusServiceNotAvailable, fmt.Sprintf("Could not listen for passive connection: %v", errIP)) + return nil + } - if portMapping != nil { - exposedPort, tcpListener, err = c.findListenerWithinPortRange(portMapping) + exposedPort, listener, deadlineSetter, err = c.server.passiveListeners.reserve(controlConnIP, portMapping) } else { - tcpListener, err = net.ListenTCP("tcp", addr) + var tcpListener *net.TCPListener + if portMapping != nil { + exposedPort, tcpListener, err = c.findListenerWithinPortRange(portMapping) + } else { + tcpListener, err = net.ListenTCP("tcp", addr) + } + if err == nil { + listener = tcpListener + deadlineSetter = tcpListener + } } if err != nil { @@ -142,9 +183,6 @@ func (c *clientHandler) handlePASV(_ string) error { return nil } - // The listener will either be plain TCP or TLS - var listener net.Listener - listener = tcpListener if wrapper, ok := c.server.driver.(MainDriverExtensionPassiveWrapper); ok { listener, err = wrapper.WrapPassiveListener(listener) @@ -167,17 +205,17 @@ func (c *clientHandler) handlePASV(_ string) error { } if exposedPort == 0 { - if tcpAddr, ok := tcpListener.Addr().(*net.TCPAddr); ok { + if tcpAddr, ok := listener.Addr().(*net.TCPAddr); ok { exposedPort = tcpAddr.Port } } transferHandler := &passiveTransferHandler{ - tcpListener: tcpListener, - listener: listener, - Port: exposedPort, - settings: c.server.settings, - logger: c.logger, - checkDataConn: c.checkDataConnectionRequirement, + listener: listener, + deadlineSetter: deadlineSetter, + Port: exposedPort, + settings: c.server.settings, + logger: c.logger, + checkDataConn: c.checkDataConnectionRequirement, } // We should rewrite this part @@ -227,7 +265,10 @@ func (c *clientHandler) handlePassivePASV(transferHandler *passiveTransferHandle func (p *passiveTransferHandler) ConnectionWait(wait time.Duration) (net.Conn, error) { if p.connection == nil { var err error - if err = p.tcpListener.SetDeadline(time.Now().Add(wait)); err != nil { + if p.deadlineSetter != nil { + err = p.deadlineSetter.SetDeadline(time.Now().Add(wait)) + } + if err != nil { return nil, fmt.Errorf("failed to set deadline: %w", err) } @@ -270,8 +311,8 @@ func (p *passiveTransferHandler) Open() (net.Conn, error) { // Closing only the client connection is not supported at that time func (p *passiveTransferHandler) Close() error { - if p.tcpListener != nil { - if err := p.tcpListener.Close(); err != nil { + if p.listener != nil { + if err := p.listener.Close(); err != nil && !errors.Is(err, net.ErrClosed) { p.logger.Warn("Problem closing passive listener", "err", err) } } diff --git a/transfer_test.go b/transfer_test.go index 8557cb3a..edfb9d0f 100644 --- a/transfer_test.go +++ b/transfer_test.go @@ -1117,11 +1117,11 @@ func TestPASVConnectionWait(t *testing.T) { remoteAddr: &net.TCPAddr{IP: nil, Port: 21}, // invalid IP }, }, - tcpListener: tcpListener, - Port: tcpListener.Addr().(*net.TCPAddr).Port, - settings: cltHandler.server.settings, - logger: slog.New(slog.NewTextHandler(io.Discard, nil)), //nolint:sloglint // DiscardHandler requires Go 1.23+ - checkDataConn: cltHandler.checkDataConnectionRequirement, + deadlineSetter: tcpListener, + Port: tcpListener.Addr().(*net.TCPAddr).Port, + settings: cltHandler.server.settings, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), //nolint:sloglint // DiscardHandler requires Go 1.23+ + checkDataConn: cltHandler.checkDataConnectionRequirement, } defer func() { From 146c9377f3a249f72e45166cb881719115fb8dbe Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Sun, 22 Mar 2026 20:42:32 +0100 Subject: [PATCH 2/8] Run gofmt on passive listener multiplexing --- driver.go | 46 ++++++++++++++++++------------------- passive_multiplexer.go | 19 ++++++++------- passive_multiplexer_test.go | 8 +++---- server.go | 10 ++++---- transfer_pasv.go | 14 +++++------ 5 files changed, 48 insertions(+), 49 deletions(-) diff --git a/driver.go b/driver.go index 9fc8793c..100ff11c 100644 --- a/driver.go +++ b/driver.go @@ -303,29 +303,29 @@ const ( // Settings defines all the server settings type Settings struct { - Listener net.Listener // (Optional) To provide an already initialized listener - ListenAddr string // Listening address - PublicHost string // Public IP to expose (only an IP address is accepted at this stage) - Banner string // Banner to use in server status response - PassiveTransferPortRange PasvPortGetter // (Optional) Port Mapping for data connections. Random if not specified - PassiveTransferPortMultiplexing bool // Allow different client IPs to share passive listener ports - PublicIPResolver PublicIPResolver // (Optional) To fetch a public IP lookup - IdleTimeout int // Maximum inactivity time before disconnecting (#58) - ConnectionTimeout int // Maximum time to establish passive or active transfer connections - ActiveTransferPortNon20 bool // Do not impose the port 20 for active data transfer (#88, RFC 1579) - DisableMLSD bool // Disable MLSD support - DisableMLST bool // Disable MLST support - DisableMFMT bool // Disable MFMT support (modify file mtime) - TLSRequired TLSRequirement // defines the TLS mode - DisableLISTArgs bool // Disable ls like options (-a,-la etc.) for directory listing - DisableSite bool // Disable SITE command - DisableActiveMode bool // Disable Active FTP - EnableHASH bool // Enable support for calculating hash value of files - DisableSTAT bool // Disable Server STATUS, STAT on files and directories will still work - DisableSYST bool // Disable SYST - EnableCOMB bool // Enable COMB support - DeflateCompressionLevel int // Deflate compression level (0-9). 0 means disabled - DefaultTransferType TransferType // Transfer type to use if the client don't send the TYPE command + Listener net.Listener // (Optional) To provide an already initialized listener + ListenAddr string // Listening address + PublicHost string // Public IP to expose (only an IP address is accepted at this stage) + Banner string // Banner to use in server status response + PassiveTransferPortRange PasvPortGetter // (Optional) Port Mapping for data connections. Random if not specified + PassiveTransferPortMultiplexing bool // Allow different client IPs to share passive listener ports + PublicIPResolver PublicIPResolver // (Optional) To fetch a public IP lookup + IdleTimeout int // Maximum inactivity time before disconnecting (#58) + ConnectionTimeout int // Maximum time to establish passive or active transfer connections + ActiveTransferPortNon20 bool // Do not impose the port 20 for active data transfer (#88, RFC 1579) + DisableMLSD bool // Disable MLSD support + DisableMLST bool // Disable MLST support + DisableMFMT bool // Disable MFMT support (modify file mtime) + TLSRequired TLSRequirement // defines the TLS mode + DisableLISTArgs bool // Disable ls like options (-a,-la etc.) for directory listing + DisableSite bool // Disable SITE command + DisableActiveMode bool // Disable Active FTP + EnableHASH bool // Enable support for calculating hash value of files + DisableSTAT bool // Disable Server STATUS, STAT on files and directories will still work + DisableSYST bool // Disable SYST + EnableCOMB bool // Enable COMB support + DeflateCompressionLevel int // Deflate compression level (0-9). 0 means disabled + DefaultTransferType TransferType // Transfer type to use if the client don't send the TYPE command // ActiveConnectionsCheck defines the security requirements for active connections ActiveConnectionsCheck DataConnectionRequirement // PasvConnectionsCheck defines the security requirements for passive connections diff --git a/passive_multiplexer.go b/passive_multiplexer.go index dbc0d929..178343e7 100644 --- a/passive_multiplexer.go +++ b/passive_multiplexer.go @@ -247,15 +247,15 @@ func (l *sharedPassiveListener) close() error { } type passiveReservationListener struct { - parent *sharedPassiveListener - remoteIP string - connCh chan net.Conn - closedCh chan struct{} - closeOnce sync.Once - stateMu sync.Mutex - deadline time.Time - released bool - failureErr error + parent *sharedPassiveListener + remoteIP string + connCh chan net.Conn + closedCh chan struct{} + closeOnce sync.Once + stateMu sync.Mutex + deadline time.Time + released bool + failureErr error } func (l *passiveReservationListener) Accept() (net.Conn, error) { @@ -395,4 +395,3 @@ func isClosedListenerError(err error) bool { return false } - diff --git a/passive_multiplexer_test.go b/passive_multiplexer_test.go index a1163c1d..721bc712 100644 --- a/passive_multiplexer_test.go +++ b/passive_multiplexer_test.go @@ -101,10 +101,10 @@ func TestPassivePortMultiplexingSameClientExhaustion(t *testing.T) { port := getFreePassivePort(t) driver := &TestServerDriver{ Settings: &Settings{ - ListenAddr: "127.0.0.1:0", - DefaultTransferType: TransferTypeBinary, - PassiveTransferPortRange: &PortRange{Start: port, End: port}, - PassiveTransferPortMultiplexing: true, + ListenAddr: "127.0.0.1:0", + DefaultTransferType: TransferTypeBinary, + PassiveTransferPortRange: &PortRange{Start: port, End: port}, + PassiveTransferPortMultiplexing: true, }, } server := NewTestServerWithTestDriver(t, driver) diff --git a/server.go b/server.go index 6a28e84e..d3f9b0bd 100644 --- a/server.go +++ b/server.go @@ -126,12 +126,12 @@ var specialAttentionCommands = []string{"ABOR", "STAT", "QUIT"} //nolint:gocheck // FtpServer is where everything is stored // We want to keep it as simple as possible type FtpServer struct { - Logger *slog.Logger // Structured logger (log/slog) - settings *Settings // General settings - listener net.Listener // listener used to receive files + Logger *slog.Logger // Structured logger (log/slog) + settings *Settings // General settings + listener net.Listener // listener used to receive files passiveListeners *passiveListenersManager - clientCounter uint32 // Clients counter - driver MainDriver // Driver to handle the client authentication and the file access driver selection + clientCounter uint32 // Clients counter + driver MainDriver // Driver to handle the client authentication and the file access driver selection } func (server *FtpServer) loadSettings() error { diff --git a/transfer_pasv.go b/transfer_pasv.go index 8cb70d3c..e4b8b211 100644 --- a/transfer_pasv.go +++ b/transfer_pasv.go @@ -29,13 +29,13 @@ var _ transferHandler = (*passiveTransferHandler)(nil) // Passive connection type passiveTransferHandler struct { - listener net.Listener // TCP or SSL Listener - deadlineSetter passiveDeadlineSetter // Listener used to set accept deadlines - Port int // TCP Port we are listening on - connection net.Conn // TCP Connection established - settings *Settings // Settings - info string // transfer info - logger *slog.Logger // Logger + listener net.Listener // TCP or SSL Listener + deadlineSetter passiveDeadlineSetter // Listener used to set accept deadlines + Port int // TCP Port we are listening on + connection net.Conn // TCP Connection established + settings *Settings // Settings + info string // transfer info + logger *slog.Logger // Logger // data connection requirement checker checkDataConn func(dataConnIP net.IP, channelType DataChannel) error } From a1a06da15adc9683f86e90bbedc37d6454ecbc8f Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Sun, 22 Mar 2026 22:25:53 +0100 Subject: [PATCH 3/8] style(transfer): fix lint issues in passive multiplexing --- driver.go | 22 ++++---- passive_multiplexer.go | 14 ++++- passive_multiplexer_test.go | 27 +++++++--- transfer_pasv.go | 104 +++++++++++++++++++----------------- 4 files changed, 99 insertions(+), 68 deletions(-) diff --git a/driver.go b/driver.go index 100ff11c..54562563 100644 --- a/driver.go +++ b/driver.go @@ -303,11 +303,12 @@ const ( // Settings defines all the server settings type Settings struct { - Listener net.Listener // (Optional) To provide an already initialized listener - ListenAddr string // Listening address - PublicHost string // Public IP to expose (only an IP address is accepted at this stage) - Banner string // Banner to use in server status response - PassiveTransferPortRange PasvPortGetter // (Optional) Port Mapping for data connections. Random if not specified + Listener net.Listener // (Optional) To provide an already initialized listener + ListenAddr string // Listening address + PublicHost string // Public IP to expose (only an IP address is accepted at this stage) + Banner string // Banner to use in server status response + // PassiveTransferPortRange is the optional port mapping for passive data connections. + PassiveTransferPortRange PasvPortGetter PassiveTransferPortMultiplexing bool // Allow different client IPs to share passive listener ports PublicIPResolver PublicIPResolver // (Optional) To fetch a public IP lookup IdleTimeout int // Maximum inactivity time before disconnecting (#58) @@ -321,11 +322,12 @@ type Settings struct { DisableSite bool // Disable SITE command DisableActiveMode bool // Disable Active FTP EnableHASH bool // Enable support for calculating hash value of files - DisableSTAT bool // Disable Server STATUS, STAT on files and directories will still work - DisableSYST bool // Disable SYST - EnableCOMB bool // Enable COMB support - DeflateCompressionLevel int // Deflate compression level (0-9). 0 means disabled - DefaultTransferType TransferType // Transfer type to use if the client don't send the TYPE command + // DisableSTAT disables Server STATUS. STAT on files and directories still works. + DisableSTAT bool + DisableSYST bool // Disable SYST + EnableCOMB bool // Enable COMB support + DeflateCompressionLevel int // Deflate compression level (0-9). 0 means disabled + DefaultTransferType TransferType // Transfer type to use if the client don't send the TYPE command // ActiveConnectionsCheck defines the security requirements for active connections ActiveConnectionsCheck DataConnectionRequirement // PasvConnectionsCheck defines the security requirements for passive connections diff --git a/passive_multiplexer.go b/passive_multiplexer.go index 178343e7..c7ef57e6 100644 --- a/passive_multiplexer.go +++ b/passive_multiplexer.go @@ -12,7 +12,7 @@ import ( var errPassiveListenerReservedForIP = errors.New("passive listener already reserved for client ip") type passiveDeadlineSetter interface { - SetDeadline(time.Time) error + SetDeadline(deadline time.Time) error } type passivePortCandidate struct { @@ -34,7 +34,10 @@ func newPassiveListenersManager(logger *slog.Logger) *passiveListenersManager { } } -func (m *passiveListenersManager) reserve(remoteIP net.IP, portRange PasvPortGetter) (int, net.Listener, passiveDeadlineSetter, error) { +func (m *passiveListenersManager) reserve( + remoteIP net.IP, + portRange PasvPortGetter, +) (int, net.Listener, passiveDeadlineSetter, error) { for _, candidate := range getPassivePortCandidates(portRange) { listener, err := m.getOrCreate(candidate.listenedPort) if err != nil { @@ -57,6 +60,7 @@ func (m *passiveListenersManager) close() error { m.mu.Lock() if m.closed { m.mu.Unlock() + return nil } @@ -81,10 +85,12 @@ func (m *passiveListenersManager) getOrCreate(port int) (*sharedPassiveListener, m.mu.Lock() if m.closed { m.mu.Unlock() + return nil, net.ErrClosed } if listener, ok := m.listeners[port]; ok { m.mu.Unlock() + return listener, nil } m.mu.Unlock() @@ -99,15 +105,18 @@ func (m *passiveListenersManager) getOrCreate(port int) (*sharedPassiveListener, if m.closed { _ = listener.close() + return nil, net.ErrClosed } if existing, ok := m.listeners[port]; ok { _ = listener.close() + return existing, nil } m.listeners[port] = listener + return listener, nil } @@ -153,6 +162,7 @@ func (l *sharedPassiveListener) serve() { var netErr net.Error if errors.As(err, &netErr) && netErr.Temporary() { //nolint:staticcheck l.logger.Warn("Temporary passive accept error", "err", err) + continue } diff --git a/passive_multiplexer_test.go b/passive_multiplexer_test.go index 721bc712..32842648 100644 --- a/passive_multiplexer_test.go +++ b/passive_multiplexer_test.go @@ -19,7 +19,10 @@ func getFreePassivePort(t *testing.T) int { require.NoError(t, listener.Close()) }() - return listener.Addr().(*net.TCPAddr).Port + addr, ok := listener.Addr().(*net.TCPAddr) + require.True(t, ok) + + return addr.Port } func TestPassiveListenersManagerMultiplexesByClientIP(t *testing.T) { @@ -67,14 +70,19 @@ func TestPassiveListenersManagerRejectsSameIPForSamePort(t *testing.T) { }() portRange := &PortRange{Start: port, End: port} - ip := net.ParseIP("127.0.0.2") + clientIP := net.ParseIP("127.0.0.2") - exposedPort, _, _, err := manager.reserve(ip, portRange) + exposedPort, listener, deadlineSetter, err := manager.reserve(clientIP, portRange) req.Equal(port, exposedPort) req.NoError(err) + req.NotNil(listener) + req.NotNil(deadlineSetter) - _, _, _, err = manager.reserve(ip, portRange) + exposedPort, listener, deadlineSetter, err = manager.reserve(clientIP, portRange) req.ErrorIs(err, ErrNoAvailableListeningPort) + req.Zero(exposedPort) + req.Nil(listener) + req.Nil(deadlineSetter) } func TestPassiveListenersManagerCloseReleasesReservation(t *testing.T) { @@ -86,14 +94,19 @@ func TestPassiveListenersManagerCloseReleasesReservation(t *testing.T) { }() portRange := &PortRange{Start: port, End: port} - ip := net.ParseIP("127.0.0.2") + clientIP := net.ParseIP("127.0.0.2") - _, listener, _, err := manager.reserve(ip, portRange) + exposedPort, listener, deadlineSetter, err := manager.reserve(clientIP, portRange) req.NoError(err) + req.Equal(port, exposedPort) + req.NotNil(deadlineSetter) req.NoError(listener.Close()) - _, _, _, err = manager.reserve(ip, portRange) + exposedPort, listener, deadlineSetter, err = manager.reserve(clientIP, portRange) req.NoError(err) + req.Equal(port, exposedPort) + req.NotNil(listener) + req.NotNil(deadlineSetter) } func TestPassivePortMultiplexingSameClientExhaustion(t *testing.T) { diff --git a/transfer_pasv.go b/transfer_pasv.go index e4b8b211..6038164c 100644 --- a/transfer_pasv.go +++ b/transfer_pasv.go @@ -149,33 +149,7 @@ func (c *clientHandler) findListenerWithinPortRange(portMapping PasvPortGetter) func (c *clientHandler) handlePASV(_ string) error { command := c.GetLastCommand() - addr, _ := net.ResolveTCPAddr("tcp", ":0") - var err error - portMapping := c.server.settings.PassiveTransferPortRange - exposedPort := 0 - var listener net.Listener - var deadlineSetter passiveDeadlineSetter - - if c.server.settings.PassiveTransferPortMultiplexing && portMapping != nil { - controlConnIP, errIP := getIPFromRemoteAddr(c.RemoteAddr()) - if errIP != nil { - c.writeMessage(StatusServiceNotAvailable, fmt.Sprintf("Could not listen for passive connection: %v", errIP)) - return nil - } - - exposedPort, listener, deadlineSetter, err = c.server.passiveListeners.reserve(controlConnIP, portMapping) - } else { - var tcpListener *net.TCPListener - if portMapping != nil { - exposedPort, tcpListener, err = c.findListenerWithinPortRange(portMapping) - } else { - tcpListener, err = net.ListenTCP("tcp", addr) - } - if err == nil { - listener = tcpListener - deadlineSetter = tcpListener - } - } + exposedPort, listener, deadlineSetter, err := c.getPassiveListener() if err != nil { c.logger.Error("Could not listen for passive connection", "err", err) @@ -239,6 +213,36 @@ func (c *clientHandler) handlePASV(_ string) error { return nil } +func (c *clientHandler) getPassiveListener() (int, net.Listener, passiveDeadlineSetter, error) { + portMapping := c.server.settings.PassiveTransferPortRange + if c.server.settings.PassiveTransferPortMultiplexing && portMapping != nil { + controlConnIP, err := getIPFromRemoteAddr(c.RemoteAddr()) + if err != nil { + return 0, nil, nil, err + } + + return c.server.passiveListeners.reserve(controlConnIP, portMapping) + } + + addr, _ := net.ResolveTCPAddr("tcp", ":0") + var ( + exposedPort int + tcpListener *net.TCPListener + err error + ) + + if portMapping != nil { + exposedPort, tcpListener, err = c.findListenerWithinPortRange(portMapping) + } else { + tcpListener, err = net.ListenTCP("tcp", addr) + } + if err != nil { + return 0, nil, nil, err + } + + return exposedPort, tcpListener, tcpListener, nil +} + func (c *clientHandler) handlePassivePASV(transferHandler *passiveTransferHandler) bool { portByte1 := transferHandler.Port / 256 portByte2 := transferHandler.Port - (portByte1 * 256) @@ -263,33 +267,35 @@ func (c *clientHandler) handlePassivePASV(transferHandler *passiveTransferHandle } func (p *passiveTransferHandler) ConnectionWait(wait time.Duration) (net.Conn, error) { - if p.connection == nil { - var err error - if p.deadlineSetter != nil { - err = p.deadlineSetter.SetDeadline(time.Now().Add(wait)) - } - if err != nil { - return nil, fmt.Errorf("failed to set deadline: %w", err) - } + if p.connection != nil { + return p.connection, nil + } - p.connection, err = p.listener.Accept() - if err != nil { - return nil, fmt.Errorf("failed to accept passive transfer connection: %w", err) - } + var err error + if p.deadlineSetter != nil { + err = p.deadlineSetter.SetDeadline(time.Now().Add(wait)) + } + if err != nil { + return nil, fmt.Errorf("failed to set deadline: %w", err) + } - ipAddress, err := getIPFromRemoteAddr(p.connection.RemoteAddr()) - if err != nil { - p.logger.Warn("Could get remote passive IP address", "err", err) + p.connection, err = p.listener.Accept() + if err != nil { + return nil, fmt.Errorf("failed to accept passive transfer connection: %w", err) + } - return nil, err - } + ipAddress, err := getIPFromRemoteAddr(p.connection.RemoteAddr()) + if err != nil { + p.logger.Warn("Could get remote passive IP address", "err", err) - if err := p.checkDataConn(ipAddress, DataChannelPassive); err != nil { - // we don't want to expose the full error to the client, we just log it - p.logger.Warn("Could not validate passive data connection requirement", "err", err) + return nil, err + } - return nil, &ipValidationError{error: "data connection security requirements not met"} - } + if err := p.checkDataConn(ipAddress, DataChannelPassive); err != nil { + // we don't want to expose the full error to the client, we just log it + p.logger.Warn("Could not validate passive data connection requirement", "err", err) + + return nil, &ipValidationError{error: "data connection security requirements not met"} } return p.connection, nil From a00a6deff68aaed978d957db4d28330c738d99c9 Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Mon, 23 Mar 2026 02:32:14 +0100 Subject: [PATCH 4/8] style(transfer): satisfy nlreturn in passive multiplexer --- passive_multiplexer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/passive_multiplexer.go b/passive_multiplexer.go index c7ef57e6..b626ed0f 100644 --- a/passive_multiplexer.go +++ b/passive_multiplexer.go @@ -156,6 +156,7 @@ func (l *sharedPassiveListener) serve() { if err != nil { if isClosedListenerError(err) { l.failAll(net.ErrClosed) + return } @@ -167,6 +168,7 @@ func (l *sharedPassiveListener) serve() { } l.failAll(err) + return } @@ -203,6 +205,7 @@ func (l *sharedPassiveListener) dispatch(conn net.Conn) { if err != nil { l.logger.Warn("Could not parse passive data connection IP", "err", err) _ = conn.Close() + return } From b826922c715319f93aded18694ddfef5588971b3 Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Mon, 23 Mar 2026 02:35:13 +0100 Subject: [PATCH 5/8] style(transfer): satisfy remaining nlreturn checks --- passive_multiplexer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/passive_multiplexer.go b/passive_multiplexer.go index b626ed0f..d19383cf 100644 --- a/passive_multiplexer.go +++ b/passive_multiplexer.go @@ -237,6 +237,7 @@ func (l *sharedPassiveListener) failAll(err error) { l.mu.Lock() if l.closed { l.mu.Unlock() + return } @@ -256,6 +257,7 @@ func (l *sharedPassiveListener) failAll(err error) { func (l *sharedPassiveListener) close() error { err := l.listener.Close() l.failAll(net.ErrClosed) + return err } @@ -292,6 +294,7 @@ func (l *passiveReservationListener) Accept() (net.Conn, error) { if conn == nil { return nil, l.getFailure() } + return conn, nil case <-l.closedCh: return nil, l.getFailure() From 6b85e1052c785049c8acb539e626c14a2aa39dd4 Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Mon, 23 Mar 2026 02:37:33 +0100 Subject: [PATCH 6/8] style(transfer): fix remaining nlreturn findings --- passive_multiplexer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/passive_multiplexer.go b/passive_multiplexer.go index d19383cf..759bad7d 100644 --- a/passive_multiplexer.go +++ b/passive_multiplexer.go @@ -331,6 +331,7 @@ func (l *passiveReservationListener) SetDeadline(deadline time.Time) error { defer l.stateMu.Unlock() l.deadline = deadline + return nil } @@ -344,6 +345,7 @@ func (l *passiveReservationListener) deliver(conn net.Conn) bool { select { case l.connCh <- conn: l.markReleased() + return true case <-l.closedCh: return false From c4759ff42dce574dc8ce063fbd8daffeef4c4175 Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Mon, 23 Mar 2026 02:43:30 +0100 Subject: [PATCH 7/8] test(transfer): cover passive multiplexer error paths --- passive_multiplexer_test.go | 112 ++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) diff --git a/passive_multiplexer_test.go b/passive_multiplexer_test.go index 32842648..11284192 100644 --- a/passive_multiplexer_test.go +++ b/passive_multiplexer_test.go @@ -1,15 +1,28 @@ package ftpserver import ( + "errors" "io" "log/slog" "net" "testing" + "time" "github.com/secsy/goftp" "github.com/stretchr/testify/require" ) +type trackingTestConn struct { + testNetConn + closed bool +} + +func (c *trackingTestConn) Close() error { + c.closed = true + + return nil +} + func getFreePassivePort(t *testing.T) int { t.Helper() @@ -142,3 +155,102 @@ func TestPassivePortMultiplexingSameClientExhaustion(t *testing.T) { req.Equal(StatusServiceNotAvailable, returnCode, message) req.Contains(message, ErrNoAvailableListeningPort.Error()) } + +func TestPassiveReservationListenerTimeoutAndHelpers(t *testing.T) { + req := require.New(t) + port := getFreePassivePort(t) + listener, err := newSharedPassiveListener(port, slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + req.NoError(err) + defer func() { + req.NoError(listener.close()) + }() + + reservation, err := listener.reserve(net.ParseIP("127.0.0.2")) + req.NoError(err) + req.Equal(listener.listener.Addr(), reservation.Addr()) + + req.NoError(reservation.SetDeadline(time.Now().Add(-time.Second))) + + _, err = reservation.Accept() + req.Error(err) + + var opErr *net.OpError + req.ErrorAs(err, &opErr) + + netErr, ok := opErr.Err.(net.Error) + req.True(ok) + req.Equal("i/o timeout", netErr.Error()) + req.True(netErr.Timeout()) + req.True(netErr.Temporary()) + + req.True(isClosedListenerError(net.ErrClosed)) + req.True(isClosedListenerError(&net.OpError{Err: errors.New("use of closed network connection")})) + req.False(isClosedListenerError(errors.New("different error"))) +} + +func TestPassiveReservationListenerCloseAndFailures(t *testing.T) { + req := require.New(t) + port := getFreePassivePort(t) + listener, err := newSharedPassiveListener(port, slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + req.NoError(err) + defer func() { + req.NoError(listener.close()) + }() + + reservation, err := listener.reserve(net.ParseIP("127.0.0.2")) + req.NoError(err) + + expectedErr := errors.New("boom") + reservation.stateMu.Lock() + reservation.failureErr = expectedErr + reservation.stateMu.Unlock() + reservation.connCh <- nil + + _, err = reservation.Accept() + req.ErrorIs(err, expectedErr) + + reservation2, err := listener.reserve(net.ParseIP("127.0.0.3")) + req.NoError(err) + + conn := &trackingTestConn{} + reservation2.connCh <- conn + req.NoError(reservation2.Close()) + req.True(conn.closed) + req.False(reservation2.deliver(&trackingTestConn{})) + + _, err = reservation2.Accept() + req.ErrorIs(err, net.ErrClosed) +} + +func TestSharedPassiveListenerDispatchRejectionsAndClosedManager(t *testing.T) { + req := require.New(t) + port := getFreePassivePort(t) + manager := newPassiveListenersManager(slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + req.NoError(manager.close()) + req.NoError(manager.close()) + + _, err := manager.getOrCreate(port) + req.ErrorIs(err, net.ErrClosed) + + listener, err := newSharedPassiveListener(port, slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + req.NoError(err) + + unknownConn := &trackingTestConn{ + testNetConn: testNetConn{remoteAddr: &net.TCPAddr{IP: net.ParseIP("127.0.0.9"), Port: 40003}}, + } + listener.dispatch(unknownConn) + req.True(unknownConn.closed) + + invalidConn := &trackingTestConn{ + testNetConn: testNetConn{remoteAddr: &net.UnixAddr{Name: "sock", Net: "unix"}}, + } + listener.dispatch(invalidConn) + req.True(invalidConn.closed) + + req.NoError(listener.close()) + _, err = listener.reserve(net.ParseIP("127.0.0.4")) + req.ErrorIs(err, net.ErrClosed) + + _, err = newSharedPassiveListener(-1, slog.New(slog.NewTextHandler(io.Discard, nil))) //nolint:sloglint + req.Error(err) +} From b302a4f1421abf9dcbe81e1b1be9f20fe569dbfc Mon Sep 17 00:00:00 2001 From: "Francisco A. Lozano" Date: Mon, 23 Mar 2026 02:45:36 +0100 Subject: [PATCH 8/8] test(transfer): lint coverage tests for passive multiplexer --- passive_multiplexer_test.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/passive_multiplexer_test.go b/passive_multiplexer_test.go index 11284192..bbf4b9ae 100644 --- a/passive_multiplexer_test.go +++ b/passive_multiplexer_test.go @@ -17,6 +17,12 @@ type trackingTestConn struct { closed bool } +var ( + errTestClosedListener = errors.New("use of closed network connection") + errTestDifferent = errors.New("different error") + errTestPassiveFailure = errors.New("boom") +) + func (c *trackingTestConn) Close() error { c.closed = true @@ -177,15 +183,14 @@ func TestPassiveReservationListenerTimeoutAndHelpers(t *testing.T) { var opErr *net.OpError req.ErrorAs(err, &opErr) - netErr, ok := opErr.Err.(net.Error) - req.True(ok) - req.Equal("i/o timeout", netErr.Error()) - req.True(netErr.Timeout()) - req.True(netErr.Temporary()) + var timeoutErr passiveAcceptTimeoutError + req.ErrorAs(err, &timeoutErr) + req.Equal("i/o timeout", timeoutErr.Error()) + req.True(timeoutErr.Timeout()) req.True(isClosedListenerError(net.ErrClosed)) - req.True(isClosedListenerError(&net.OpError{Err: errors.New("use of closed network connection")})) - req.False(isClosedListenerError(errors.New("different error"))) + req.True(isClosedListenerError(&net.OpError{Err: errTestClosedListener})) + req.False(isClosedListenerError(errTestDifferent)) } func TestPassiveReservationListenerCloseAndFailures(t *testing.T) { @@ -200,7 +205,7 @@ func TestPassiveReservationListenerCloseAndFailures(t *testing.T) { reservation, err := listener.reserve(net.ParseIP("127.0.0.2")) req.NoError(err) - expectedErr := errors.New("boom") + expectedErr := errTestPassiveFailure reservation.stateMu.Lock() reservation.failureErr = expectedErr reservation.stateMu.Unlock()