From 50264b0a62396bbea89bd530d7b85dbd644b28cb Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Tue, 7 Apr 2026 15:44:15 +0800 Subject: [PATCH 01/12] Replace addr with factory in TServerSocket --- lib/go/thrift/server_socket.go | 96 +++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 30 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index 164221e92b2..8575bbb9d1b 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -17,6 +17,7 @@ * under the License. */ + package thrift import ( @@ -25,16 +26,20 @@ import ( "time" ) +// TServerSocketListenerFactory abstracts how listeners are created. +type TServerSocketListenerFactory func(listen bool) (net.Addr, net.Listener, error) + type TServerSocket struct { - addr net.Addr + factory TServerSocketListenerFactory clientTimeout time.Duration - // Protects the listener and interrupted fields to make them thread safe. mu sync.RWMutex listener net.Listener interrupted bool } +// --- Constructors --- + func NewTServerSocket(listenAddr string) (*TServerSocket, error) { return NewTServerSocketTimeout(listenAddr, 0) } @@ -44,28 +49,62 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T if err != nil { return nil, err } - return &TServerSocket{addr: addr, clientTimeout: clientTimeout}, nil + + return NewTServerSocketFromAddrTimeout(addr, clientTimeout), nil } -// Creates a TServerSocket from a net.Addr func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { - return &TServerSocket{addr: addr, clientTimeout: clientTimeout} + factory := func(listen bool) (net.Addr, net.Listener, error) { + var listener net.Listener + var err error + if (listen){ + listener, err = net.Listen(addr.Network(), addr.String()) + } + return addr, listener, err + } + + return NewTServerSocketFromFactoryTimeout(factory, clientTimeout) } -func (p *TServerSocket) Listen() error { +// Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) +func NewTServerSocketFromFactoryTimeout(factory TServerSocketListenerFactory, clientTimeout time.Duration) *TServerSocket { + return &TServerSocket{ + factory: factory, + clientTimeout: clientTimeout, + } +} + +// --- Core methods --- + +func (p *TServerSocket) try_listen(raise bool) error { p.mu.Lock() defer p.mu.Unlock() - if p.IsListening() { + + if p.listener != nil { + if (raise) { + return NewTTransportException(ALREADY_OPEN, "Server socket already open") + } return nil } - l, err := net.Listen(p.addr.Network(), p.addr.String()) + + _, l, err := p.factory(true) if err != nil { return err } + p.listener = l + p.interrupted = false return nil } +func (p *TServerSocket) Open() error { + return p.try_listen(true) +} + +func (p *TServerSocket) Listen() error { + return p.try_listen(false) +} + func (p *TServerSocket) Accept() (TTransport, error) { p.mu.RLock() interrupted := p.interrupted @@ -87,51 +126,48 @@ func (p *TServerSocket) Accept() (TTransport, error) { return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil } -// Checks whether the socket is listening. +// --- State helpers --- + func (p *TServerSocket) IsListening() bool { + p.mu.RLock() + defer p.mu.RUnlock() return p.listener != nil } -// Connects the socket, creating a new socket object if necessary. -func (p *TServerSocket) Open() error { - p.mu.Lock() - defer p.mu.Unlock() - if p.IsListening() { - return NewTTransportException(ALREADY_OPEN, "Server socket already open") - } - if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil { - return err - } else { - p.listener = l - } - return nil -} - func (p *TServerSocket) Addr() net.Addr { p.mu.RLock() defer p.mu.RUnlock() - if p.IsListening() { + + if p.listener != nil { return p.listener.Addr() } - return p.addr + addr, _, _ := p.factory(false) + return addr } +// --- Shutdown / control --- + func (p *TServerSocket) Close() error { - var err error p.mu.Lock() - if p.IsListening() { + defer p.mu.Unlock() + + var err error + if p.listener != nil { err = p.listener.Close() p.listener = nil } - p.mu.Unlock() return err } func (p *TServerSocket) Interrupt() error { p.mu.Lock() p.interrupted = true + listener := p.listener + p.listener = nil p.mu.Unlock() - p.Close() + if listener != nil { + return listener.Close() + } return nil } From 160aec18e96ba3df5a2a9ff84412eaa41081fa54 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Thu, 9 Apr 2026 08:23:59 +0300 Subject: [PATCH 02/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 44 +++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index 8575bbb9d1b..ffd2ad1ec00 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -26,11 +26,9 @@ import ( "time" ) -// TServerSocketListenerFactory abstracts how listeners are created. -type TServerSocketListenerFactory func(listen bool) (net.Addr, net.Listener, error) - type TServerSocket struct { - factory TServerSocketListenerFactory + // TServerSocketListenerFactory abstracts how listeners are created. + factory func(bool) (net.Addr, net.Listener, error) clientTimeout time.Duration mu sync.RWMutex @@ -67,7 +65,20 @@ func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) } // Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) -func NewTServerSocketFromFactoryTimeout(factory TServerSocketListenerFactory, clientTimeout time.Duration) *TServerSocket { +func NewTServerSocketFromFactoryAddrTimeout(proc func(addr net.Addr) (listener net.Listener, err error),addr net.Addr, clientTimeout time.Duration) *TServerSocket { + factory := func(listen bool) (net.Addr, net.Listener, error) { + var listener net.Listener + var err error + if (listen){ + listener, err = proc(addr) + } + return addr, listener, err + } + return NewTServerSocketFromFactoryTimeout(factory, clientTimeout) +} + +// Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) +func NewTServerSocketFromFactoryTimeout(factory func(listen bool) (addr net.Addr, listener net.Listener, err error), clientTimeout time.Duration) *TServerSocket { return &TServerSocket{ factory: factory, clientTimeout: clientTimeout, @@ -147,11 +158,14 @@ func (p *TServerSocket) Addr() net.Addr { // --- Shutdown / control --- -func (p *TServerSocket) Close() error { +func (p *TServerSocket) try_close(interrupt bool) error { p.mu.Lock() defer p.mu.Unlock() + if (interrupt){ + p.interrupted = true + } - var err error + var err error = nil if p.listener != nil { err = p.listener.Close() p.listener = nil @@ -159,15 +173,11 @@ func (p *TServerSocket) Close() error { return err } -func (p *TServerSocket) Interrupt() error { - p.mu.Lock() - p.interrupted = true - listener := p.listener - p.listener = nil - p.mu.Unlock() - if listener != nil { - return listener.Close() - } - return nil +func (p *TServerSocket) Close() error { + return p.try_close(false) +} + +func (p *TServerSocket) Interrupt() error { + return p.try_close(true) } From efb11f6117f1c16a17e3ca9ce9c2ee869019be44 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Thu, 9 Apr 2026 23:03:42 +0300 Subject: [PATCH 03/12] Update lib/go/thrift/server_socket.go Co-authored-by: Yuxuan 'fishy' Wang --- lib/go/thrift/server_socket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index ffd2ad1ec00..d68c7d28fa5 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -109,7 +109,7 @@ func (p *TServerSocket) try_listen(raise bool) error { } func (p *TServerSocket) Open() error { - return p.try_listen(true) + return p.try_listen(true /* raise */) } func (p *TServerSocket) Listen() error { From 5e9dea8de594047bf57bfe0e49d86f1beeaae8a1 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Thu, 9 Apr 2026 23:09:38 +0300 Subject: [PATCH 04/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index d68c7d28fa5..e25a2bb3708 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -109,11 +109,11 @@ func (p *TServerSocket) try_listen(raise bool) error { } func (p *TServerSocket) Open() error { - return p.try_listen(true /* raise */) + return p.try_listen(true /* raise error if listening */) } func (p *TServerSocket) Listen() error { - return p.try_listen(false) + return p.try_listen(false /* do not raise error if listening */) } func (p *TServerSocket) Accept() (TTransport, error) { @@ -175,9 +175,9 @@ func (p *TServerSocket) try_close(interrupt bool) error { func (p *TServerSocket) Close() error { - return p.try_close(false) + return p.try_close(false /* do not set interrupted flag */) } func (p *TServerSocket) Interrupt() error { - return p.try_close(true) + return p.try_close(true /* set interrupted flag */) } From 728229492209a8ee01d04df61681999f04f37503 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Thu, 9 Apr 2026 23:15:47 +0300 Subject: [PATCH 05/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index e25a2bb3708..c48e9910e2d 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -65,7 +65,7 @@ func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) } // Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) -func NewTServerSocketFromFactoryAddrTimeout(proc func(addr net.Addr) (listener net.Listener, err error),addr net.Addr, clientTimeout time.Duration) *TServerSocket { +func NewTServerSocketFromListenerAddrTimeout(listener func(addr net.Addr) (listener net.Listener, err error), addr net.Addr, clientTimeout time.Duration) *TServerSocket { factory := func(listen bool) (net.Addr, net.Listener, error) { var listener net.Listener var err error From ac4ef48e31aa91862f0be2739857d4a1cdeee572 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Thu, 9 Apr 2026 23:20:12 +0300 Subject: [PATCH 06/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index c48e9910e2d..9d38fcc73c2 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -64,19 +64,6 @@ func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) return NewTServerSocketFromFactoryTimeout(factory, clientTimeout) } -// Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) -func NewTServerSocketFromListenerAddrTimeout(listener func(addr net.Addr) (listener net.Listener, err error), addr net.Addr, clientTimeout time.Duration) *TServerSocket { - factory := func(listen bool) (net.Addr, net.Listener, error) { - var listener net.Listener - var err error - if (listen){ - listener, err = proc(addr) - } - return addr, listener, err - } - return NewTServerSocketFromFactoryTimeout(factory, clientTimeout) -} - // Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) func NewTServerSocketFromFactoryTimeout(factory func(listen bool) (addr net.Addr, listener net.Listener, err error), clientTimeout time.Duration) *TServerSocket { return &TServerSocket{ From cd23742da732d2b0ab50eb7a01d70b3d3fc89c6c Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Fri, 10 Apr 2026 01:36:04 +0300 Subject: [PATCH 07/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index 9d38fcc73c2..69f762d47d7 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -28,7 +28,8 @@ import ( type TServerSocket struct { // TServerSocketListenerFactory abstracts how listeners are created. - factory func(bool) (net.Addr, net.Listener, error) + factory func(net.Addr) (net.Listener, error) + addr net.Addr clientTimeout time.Duration mu sync.RWMutex @@ -52,22 +53,18 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T } func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { - factory := func(listen bool) (net.Addr, net.Listener, error) { - var listener net.Listener - var err error - if (listen){ - listener, err = net.Listen(addr.Network(), addr.String()) - } - return addr, listener, err + factory := func(addr net.Addr) (net.Listener, error) { + return net.Listen(addr.Network(), addr.String()) } - return NewTServerSocketFromFactoryTimeout(factory, clientTimeout) + return NewTServerSocketFromFactoryTimeout(factory, addr, clientTimeout) } // Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) -func NewTServerSocketFromFactoryTimeout(factory func(listen bool) (addr net.Addr, listener net.Listener, err error), clientTimeout time.Duration) *TServerSocket { +func NewTServerSocketFromFactoryTimeout(factory func(addr net.Addr) (listener net.Listener, err error), addr net.Addr, clientTimeout time.Duration) *TServerSocket { return &TServerSocket{ factory: factory, + addr: addr, clientTimeout: clientTimeout, } } From 90c854008f820921307da60a3864ef7908f0bde5 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Fri, 10 Apr 2026 01:39:01 +0300 Subject: [PATCH 08/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index 69f762d47d7..9d537925cb6 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -31,7 +31,8 @@ type TServerSocket struct { factory func(net.Addr) (net.Listener, error) addr net.Addr clientTimeout time.Duration - + + // Protects the listener and interrupted fields to make them thread safe. mu sync.RWMutex listener net.Listener interrupted bool From fad587a15ce3598942b8208cd734e8d06a4ef7db Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Fri, 10 Apr 2026 01:46:44 +0300 Subject: [PATCH 09/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index 9d537925cb6..7b38c1c6868 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -31,15 +31,13 @@ type TServerSocket struct { factory func(net.Addr) (net.Listener, error) addr net.Addr clientTimeout time.Duration - + // Protects the listener and interrupted fields to make them thread safe. mu sync.RWMutex listener net.Listener interrupted bool } -// --- Constructors --- - func NewTServerSocket(listenAddr string) (*TServerSocket, error) { return NewTServerSocketTimeout(listenAddr, 0) } @@ -53,6 +51,7 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T return NewTServerSocketFromAddrTimeout(addr, clientTimeout), nil } +// Creates a TServerSocket from a net.Addr func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { factory := func(addr net.Addr) (net.Listener, error) { return net.Listen(addr.Network(), addr.String()) @@ -70,8 +69,6 @@ func NewTServerSocketFromFactoryTimeout(factory func(addr net.Addr) (listener ne } } -// --- Core methods --- - func (p *TServerSocket) try_listen(raise bool) error { p.mu.Lock() defer p.mu.Unlock() @@ -93,6 +90,7 @@ func (p *TServerSocket) try_listen(raise bool) error { return nil } +// Connects the socket, creating a new socket object if necessary. func (p *TServerSocket) Open() error { return p.try_listen(true /* raise error if listening */) } @@ -122,8 +120,7 @@ func (p *TServerSocket) Accept() (TTransport, error) { return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil } -// --- State helpers --- - +// Checks whether the socket is listening. func (p *TServerSocket) IsListening() bool { p.mu.RLock() defer p.mu.RUnlock() @@ -141,8 +138,6 @@ func (p *TServerSocket) Addr() net.Addr { return addr } -// --- Shutdown / control --- - func (p *TServerSocket) try_close(interrupt bool) error { p.mu.Lock() defer p.mu.Unlock() @@ -158,7 +153,6 @@ func (p *TServerSocket) try_close(interrupt bool) error { return err } - func (p *TServerSocket) Close() error { return p.try_close(false /* do not set interrupted flag */) } From 790f70331b9105b8c3e0ac5d8033930eafd01c97 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Fri, 10 Apr 2026 05:18:27 +0300 Subject: [PATCH 10/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index 7b38c1c6868..e98846c604f 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -51,6 +51,7 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T return NewTServerSocketFromAddrTimeout(addr, clientTimeout), nil } +// NewTServerSocketFromFactoryTimeout // Creates a TServerSocket from a net.Addr func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { factory := func(addr net.Addr) (net.Listener, error) { @@ -80,7 +81,7 @@ func (p *TServerSocket) try_listen(raise bool) error { return nil } - _, l, err := p.factory(true) + l, err := p.factory(p.addr) if err != nil { return err } @@ -134,8 +135,7 @@ func (p *TServerSocket) Addr() net.Addr { if p.listener != nil { return p.listener.Addr() } - addr, _, _ := p.factory(false) - return addr + return p.addr } func (p *TServerSocket) try_close(interrupt bool) error { From 94d42fffc4706cb50919fa10106d0355f7a01379 Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Fri, 10 Apr 2026 05:22:00 +0300 Subject: [PATCH 11/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index e98846c604f..812c403a6b4 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -51,7 +51,7 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T return NewTServerSocketFromAddrTimeout(addr, clientTimeout), nil } -// NewTServerSocketFromFactoryTimeout +// NewTServerSocketFromAddrTimeout ... // Creates a TServerSocket from a net.Addr func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { factory := func(addr net.Addr) (net.Listener, error) { @@ -61,6 +61,7 @@ func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) return NewTServerSocketFromFactoryTimeout(factory, addr, clientTimeout) } +// NewTServerSocketFromFactoryTimeout ... // Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) func NewTServerSocketFromFactoryTimeout(factory func(addr net.Addr) (listener net.Listener, err error), addr net.Addr, clientTimeout time.Duration) *TServerSocket { return &TServerSocket{ @@ -91,6 +92,7 @@ func (p *TServerSocket) try_listen(raise bool) error { return nil } +// Open ... // Connects the socket, creating a new socket object if necessary. func (p *TServerSocket) Open() error { return p.try_listen(true /* raise error if listening */) @@ -121,6 +123,7 @@ func (p *TServerSocket) Accept() (TTransport, error) { return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil } +// IsListening ... // Checks whether the socket is listening. func (p *TServerSocket) IsListening() bool { p.mu.RLock() From e9b18b06901188563b92cc46a2578d367c34f88a Mon Sep 17 00:00:00 2001 From: qr243vbi Date: Fri, 10 Apr 2026 05:26:05 +0300 Subject: [PATCH 12/12] Update server_socket.go --- lib/go/thrift/server_socket.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/go/thrift/server_socket.go b/lib/go/thrift/server_socket.go index 812c403a6b4..8b38c0bfd15 100644 --- a/lib/go/thrift/server_socket.go +++ b/lib/go/thrift/server_socket.go @@ -51,7 +51,7 @@ func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*T return NewTServerSocketFromAddrTimeout(addr, clientTimeout), nil } -// NewTServerSocketFromAddrTimeout ... +// NewTServerSocketFromAddrTimeout returns TServerSocket // Creates a TServerSocket from a net.Addr func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) *TServerSocket { factory := func(addr net.Addr) (net.Listener, error) { @@ -61,7 +61,7 @@ func NewTServerSocketFromAddrTimeout(addr net.Addr, clientTimeout time.Duration) return NewTServerSocketFromFactoryTimeout(factory, addr, clientTimeout) } -// NewTServerSocketFromFactoryTimeout ... +// NewTServerSocketFromFactoryTimeout returns TServerSocket // Allows full customization (TLS, mocks, unix sockets, windows named pipes, etc.) func NewTServerSocketFromFactoryTimeout(factory func(addr net.Addr) (listener net.Listener, err error), addr net.Addr, clientTimeout time.Duration) *TServerSocket { return &TServerSocket{ @@ -92,7 +92,7 @@ func (p *TServerSocket) try_listen(raise bool) error { return nil } -// Open ... +// Open does try to listen and return on failure // Connects the socket, creating a new socket object if necessary. func (p *TServerSocket) Open() error { return p.try_listen(true /* raise error if listening */) @@ -123,7 +123,7 @@ func (p *TServerSocket) Accept() (TTransport, error) { return NewTSocketFromConnTimeout(conn, p.clientTimeout), nil } -// IsListening ... +// IsListening returns listener != nil // Checks whether the socket is listening. func (p *TServerSocket) IsListening() bool { p.mu.RLock()