Skip to content

Commit 4118e5f

Browse files
committed
drpcyamux: add yamux multiplexing support for concurrent RPCs
This package enables multiple concurrent RPCs over a single connection using HashiCorp's yamux multiplexer. It's based on https://gitea.elara.ws/Elara6331/drpc but almost rewritten to fix critical concurrency issues and resource leaks. Key improvements over the original: - Fixed race conditions on connection state using sync.Once and channels - Eliminated goroutine leaks in stream cleanup and server shutdown paths - Proper graceful shutdown with WaitGroups throughout the server stack - Thread-safe idempotent Close() on both client and server - Simplified error handling in session Accept loop - Context-aware shutdown that properly unblocks blocking operations The package provides two main components: - Conn: Client-side drpc.Conn implementation with yamux multiplexing - Server: Server that accepts yamux sessions and handles streams concurrently
1 parent 78d4e12 commit 4118e5f

4 files changed

Lines changed: 268 additions & 1 deletion

File tree

drpcyamux/conn.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
// Copyright (C) 2023 Elara Musayelyan
2+
// Copyright (C) 2025 Cockroach Labs
3+
// See LICENSE for copying information.
4+
5+
package drpcyamux
6+
7+
import (
8+
"context"
9+
"errors"
10+
"io"
11+
"sync"
12+
13+
"github.com/hashicorp/yamux"
14+
"storj.io/drpc"
15+
"storj.io/drpc/drpcconn"
16+
)
17+
18+
var ErrClosed = errors.New("connection closed")
19+
20+
var _ drpc.Conn = &Conn{}
21+
22+
// Conn implements drpc.Conn using the yamux multiplexer to allow concurrent
23+
// RPCs
24+
type Conn struct {
25+
conn io.ReadWriteCloser
26+
sess *yamux.Session
27+
28+
closeOnce sync.Once
29+
closeErr error
30+
closed chan struct{}
31+
}
32+
33+
// NewConn returns a new multiplexed DRPC connection as a client
34+
func NewConn(conn io.ReadWriteCloser) (*Conn, error) {
35+
return NewConnWithConfig(conn, nil)
36+
}
37+
38+
// NewConnWithConfig returns a new multiplexed DRPC connection as a client
39+
// with the given yamux configuration
40+
func NewConnWithConfig(conn io.ReadWriteCloser, config *yamux.Config) (*Conn, error) {
41+
sess, err := yamux.Client(conn, config)
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
return &Conn{
47+
conn: conn,
48+
sess: sess,
49+
closed: make(chan struct{}),
50+
}, nil
51+
}
52+
53+
// Close closes the multiplexer session and the underlying connection. It is
54+
// safe to call Close multiple times.
55+
func (c *Conn) Close() error {
56+
c.closeOnce.Do(func() {
57+
close(c.closed)
58+
59+
// Close session first to stop accepting new streams
60+
sessErr := c.sess.Close()
61+
62+
// Always close the underlying connection
63+
connErr := c.conn.Close()
64+
65+
// Return the first error encountered
66+
if sessErr != nil {
67+
c.closeErr = sessErr
68+
} else {
69+
c.closeErr = connErr
70+
}
71+
})
72+
return c.closeErr
73+
}
74+
75+
// Closed returns a channel that will be closed
76+
// when the connection is closed
77+
func (c *Conn) Closed() <-chan struct{} {
78+
return c.closed
79+
}
80+
81+
// Invoke issues the rpc on the transport serializing in, waits for a response,
82+
// and deserializes it into out.
83+
func (c *Conn) Invoke(
84+
ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message,
85+
) error {
86+
select {
87+
case <-c.closed:
88+
return ErrClosed
89+
default:
90+
}
91+
92+
stream, err := c.sess.Open()
93+
if err != nil {
94+
return err
95+
}
96+
defer stream.Close()
97+
98+
dconn := drpcconn.New(stream)
99+
defer dconn.Close()
100+
101+
return dconn.Invoke(ctx, rpc, enc, in, out)
102+
}
103+
104+
// NewStream begins a streaming rpc on the connection.
105+
func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) {
106+
select {
107+
case <-c.closed:
108+
return nil, ErrClosed
109+
default:
110+
}
111+
112+
stream, err := c.sess.Open()
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
dconn := drpcconn.New(stream)
118+
119+
s, err := dconn.NewStream(ctx, rpc, enc)
120+
if err != nil {
121+
dconn.Close()
122+
stream.Close()
123+
return nil, err
124+
}
125+
126+
// Clean up the yamux stream when the drpc connection closes.
127+
// This goroutine will exit when dconn.Closed() is signaled.
128+
go func() {
129+
<-dconn.Closed()
130+
stream.Close()
131+
}()
132+
133+
return s, nil
134+
}

drpcyamux/server.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright (C) 2023 Elara Musayelyan
2+
// Copyright (C) 2025 Cockroach Labs
3+
// See LICENSE for copying information.
4+
5+
package drpcyamux
6+
7+
import (
8+
"context"
9+
"crypto/tls"
10+
"errors"
11+
"net"
12+
"sync"
13+
14+
"github.com/hashicorp/yamux"
15+
"storj.io/drpc"
16+
"storj.io/drpc/drpcctx"
17+
"storj.io/drpc/drpcserver"
18+
)
19+
20+
// Server is a DRPC server that handles multiplexed streams
21+
type Server struct {
22+
srv *drpcserver.Server
23+
}
24+
25+
// NewServer creates a new multiplexing DRPC server with default options
26+
func NewServer(handler drpc.Handler) *Server {
27+
return &Server{srv: drpcserver.New(handler)}
28+
}
29+
30+
// NewServerWithOptions creates a new multiplexing DRPC server with custom options
31+
func NewServerWithOptions(handler drpc.Handler, opts drpcserver.Options) *Server {
32+
return &Server{srv: drpcserver.NewWithOptions(handler, opts)}
33+
}
34+
35+
// Serve listens on the given listener and handles all multiplexed streams.
36+
// It blocks until the context is canceled or an unrecoverable error occurs.
37+
func (s *Server) Serve(ctx context.Context, ln net.Listener) error {
38+
var wg sync.WaitGroup
39+
defer wg.Wait()
40+
41+
// Context for coordinating shutdown
42+
ctx, cancel := context.WithCancel(ctx)
43+
defer cancel()
44+
45+
for {
46+
conn, err := ln.Accept()
47+
if err != nil {
48+
// Check if we're shutting down
49+
select {
50+
case <-ctx.Done():
51+
return nil
52+
default:
53+
}
54+
55+
// If listener was closed, treat it as shutdown
56+
var opErr *net.OpError
57+
if errors.As(err, &opErr) && opErr.Op == "accept" {
58+
return nil
59+
}
60+
61+
return err
62+
}
63+
64+
wg.Add(1)
65+
go func() {
66+
defer wg.Done()
67+
s.handleConn(ctx, conn)
68+
}()
69+
}
70+
}
71+
72+
// handleConn processes a single connection with multiplexing
73+
func (s *Server) handleConn(ctx context.Context, conn net.Conn) {
74+
defer conn.Close()
75+
76+
if tlsConn, ok := conn.(*tls.Conn); ok {
77+
err := tlsConn.Handshake()
78+
if err != nil {
79+
return
80+
}
81+
state := tlsConn.ConnectionState()
82+
if len(state.PeerCertificates) > 0 {
83+
ctx = drpcctx.WithPeerConnectionInfo(
84+
ctx, drpcctx.PeerConnectionInfo{Certificates: state.PeerCertificates})
85+
}
86+
}
87+
88+
sess, err := yamux.Server(conn, nil)
89+
if err != nil {
90+
return
91+
}
92+
defer sess.Close()
93+
94+
s.handleSession(ctx, sess)
95+
}
96+
97+
// handleSession accepts and serves streams from a yamux session
98+
func (s *Server) handleSession(ctx context.Context, sess *yamux.Session) {
99+
var wg sync.WaitGroup
100+
defer wg.Wait()
101+
102+
// Close session when context is cancelled
103+
done := make(chan struct{})
104+
defer close(done)
105+
106+
go func() {
107+
select {
108+
case <-ctx.Done():
109+
sess.Close()
110+
case <-done:
111+
}
112+
}()
113+
114+
for {
115+
stream, err := sess.Accept()
116+
if err != nil {
117+
// Any error from Accept means the session is done
118+
// Common errors: io.EOF (graceful close), session closed, etc.
119+
return
120+
}
121+
122+
wg.Add(1)
123+
go func() {
124+
defer wg.Done()
125+
s.srv.ServeOne(ctx, stream)
126+
}()
127+
}
128+
}

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
module storj.io/drpc
22

3-
go 1.19
3+
go 1.23.0
4+
5+
toolchain go1.24.9
46

57
require (
8+
github.com/hashicorp/yamux v0.1.2
69
github.com/stretchr/testify v1.10.0
710
github.com/zeebo/assert v1.3.0
811
github.com/zeebo/errs v1.2.2

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
66
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
77
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
88
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
9+
github.com/hashicorp/yamux v0.1.2 h1:XtB8kyFOyHXYVFnwT5C3+Bdo8gArse7j2AQ0DA0Uey8=
10+
github.com/hashicorp/yamux v0.1.2/go.mod h1:C+zze2n6e/7wshOZep2A70/aQU6QBRWJO/G6FT1wIns=
911
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1012
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
1113
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=

0 commit comments

Comments
 (0)