Skip to content

Commit 888743b

Browse files
committed
*: move frame assembly from reader to stream
Previously, Reader assembled wire frames into complete packets before handing them to the manager. This change makes Reader return individual frames (ReadFrame), and the stream handles frame assembly itself (HandleFrame). The manager now enforces global frame ID monotonicity and other validation that are beyond a stream's scope. This is groundwork for stream multiplexing, where frames from different streams will be interleaved on the wire and must be routed to the correct stream before assembly. Re-enables TestManageReader_FirstFrameMustBeInvoke and TestManageReader_InvokeOnExistingStream, which were disabled in the previous commit because the old code did not enforce these conditions. This change also includes wire-protocol.md which can be considered as spec.
1 parent 2e067fd commit 888743b

7 files changed

Lines changed: 793 additions & 330 deletions

File tree

drpcmanager/manager.go

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type Manager struct {
7272
rd *drpcwire.Reader
7373
opts Options
7474

75+
lastFrameID drpcwire.ID
76+
lastFrameKind drpcwire.Kind
77+
7578
sem drpcsignal.Chan // held by the active stream
7679
sbuf streamBuffer // largest stream id created
7780
pkts chan drpcwire.Packet // channel for invoke packets
@@ -213,27 +216,15 @@ func (m *Manager) terminate(err error) {
213216
// manage reader
214217
//
215218

216-
// manageReader is always reading a packet and dispatching it to the appropriate
217-
// stream or queue. It sets the read signal when it exits so that one can wait
218-
// to ensure that no one is reading on the reader. It sets the term signal if
219-
// there is any error reading packets.
219+
// manageReader reads the frame and dispatches them to the appropriate stream or
220+
// queue. It sets the read signal when it exits so that one can wait to ensure
221+
// that no one is reading on the reader. It sets the term signal if there is any
222+
// error reading packets.
220223
func (m *Manager) manageReader() {
221224
defer m.sigs.read.Set(nil)
222225

223-
var pkt drpcwire.Packet
224-
var err error
225-
var run int
226-
227226
for !m.sigs.term.IsSet() {
228-
// if we have a run of "small" packets, drop the buffer to release
229-
// memory so that a burst of large packets does not cause eternally
230-
// large heap usage.
231-
if run > 10 {
232-
pkt.Data = nil
233-
run = 0
234-
}
235-
236-
pkt, err = m.rd.ReadPacketUsing(pkt.Data[:0])
227+
incomingFrame, err := m.rd.ReadFrame()
237228
if err != nil {
238229
if isConnectionReset(err) {
239230
err = drpc.ClosedError.Wrap(err)
@@ -242,33 +233,35 @@ func (m *Manager) manageReader() {
242233
return
243234
}
244235

245-
if len(pkt.Data) < cap(pkt.Data)/4 {
246-
run++
247-
} else {
248-
run = 0
249-
}
236+
m.log("READ", incomingFrame.String)
250237

251-
m.log("READ", pkt.String)
238+
if incomingFrame.ID.Less(m.lastFrameID) {
239+
m.terminate(managerClosed.Wrap(drpc.ProtocolError.New("id monotonicity violation")))
240+
return
241+
}
252242

253243
again:
254244
switch curr := m.sbuf.Get(); {
255-
// if the packet is for the current stream, deliver it.
256-
case curr != nil && pkt.ID.Stream == curr.ID():
257-
if err := curr.HandlePacket(pkt); err != nil {
245+
// If the frame is for the current stream, deliver it.
246+
case curr != nil && incomingFrame.ID.Stream == curr.ID():
247+
if err := curr.HandleFrame(incomingFrame); err != nil {
258248
m.terminate(managerClosed.Wrap(err))
259249
return
260250
}
261251

262-
// if an old message has been sent, just ignore it.
263-
case curr != nil && pkt.ID.Stream < curr.ID():
252+
case curr != nil && incomingFrame.ID.Stream < curr.ID():
253+
254+
// Either we are receiving first frame or a frame greater than the
255+
// existing one in which case it should be an Invoke frame.
264256

265-
// if any invoke sequence is being sent, close any old unterminated
257+
// If any invoke sequence is being sent, close any old unterminated
266258
// stream and forward it to be handled.
267-
case pkt.Kind == drpcwire.KindInvoke || pkt.Kind == drpcwire.KindInvokeMetadata:
259+
case incomingFrame.Kind == drpcwire.KindInvoke || incomingFrame.Kind == drpcwire.KindInvokeMetadata:
268260
if curr != nil && !curr.IsTerminated() {
269261
curr.Cancel(context.Canceled)
270262
}
271263

264+
pkt := drpcwire.Packet{ID: incomingFrame.ID, Kind: incomingFrame.Kind, Data: incomingFrame.Data}
272265
select {
273266
case m.pkts <- pkt:
274267
m.pdone.Recv()
@@ -277,19 +270,32 @@ func (m *Manager) manageReader() {
277270
return
278271
}
279272

280-
// a non-invoke packet should be delivered to some stream so we wait for
281-
// a new stream to be created and try again. like an invoke, we
282-
// implicitly close any previous stream.
273+
// A non-invoke packet is delivered while the stream is being created
274+
// with an invoke so we wait for a new stream to be created and try
275+
// again. Like an invoke, we implicitly close any previous stream.
283276
default:
277+
if m.lastFrameKind != drpcwire.KindInvoke {
278+
m.terminate(managerClosed.Wrap(
279+
drpc.ProtocolError.New("first message of a stream cannot be of kind non-Invoke")))
280+
return
281+
}
282+
284283
if curr != nil && !curr.IsTerminated() {
285284
curr.Cancel(context.Canceled)
286285
}
287286

288287
if !m.sbuf.Wait(curr.ID()) {
289288
return
290289
}
290+
// Note: it is safe to not update lastFrameKind and lastFrameID, as we are merely retrying
291291
goto again
292292
}
293+
294+
m.lastFrameKind = incomingFrame.Kind
295+
m.lastFrameID = incomingFrame.ID
296+
if incomingFrame.Done {
297+
m.lastFrameID.Message += 1
298+
}
293299
}
294300
}
295301

drpcmanager/manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ func TestManageReader_OldStreamFramesIgnored(t *testing.T) {
365365

366366
// The first frame for a new stream must be KindInvoke or KindInvokeMetadata.
367367
// A non-invoke kind causes a protocol error.
368-
func Disabled_TestManageReader_FirstFrameMustBeInvoke(t *testing.T) {
368+
func TestManageReader_FirstFrameMustBeInvoke(t *testing.T) {
369369
for _, kind := range []drpcwire.Kind{
370370
drpcwire.KindMessage,
371371
drpcwire.KindCancel,
@@ -552,7 +552,7 @@ func TestManageReader_MultiFrameWithSkippedMessageID(t *testing.T) {
552552

553553
// A second invoke for the same stream ID is rejected — the stream treats
554554
// it as a protocol error, terminating the manager.
555-
func Disabled_TestManageReader_InvokeOnExistingStream(t *testing.T) {
555+
func TestManageReader_InvokeOnExistingStream(t *testing.T) {
556556
ctx := drpctest.NewTracker(t)
557557
defer ctx.Close()
558558

drpcstream/stream.go

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
// Options controls configuration settings for a stream.
2525
type Options struct {
26-
// SplitSize controls the default size we split packets into frames.
26+
// SplitSize controls the default size we split data packets into frames.
2727
SplitSize int
2828

2929
// ManualFlush controls if the stream will automatically flush after every
@@ -52,6 +52,11 @@ type Stream struct {
5252
read inspectMutex
5353
flush sync.Once
5454

55+
assembling bool
56+
pktBuf []byte
57+
pktKind drpcwire.Kind
58+
nextMessageID uint64
59+
5560
id drpcwire.ID
5661
wr *drpcwire.Writer
5762
pbuf packetBuffer
@@ -98,6 +103,9 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O
98103
fin: drpcopts.GetStreamFin(&opts.Internal),
99104
task: task,
100105

106+
// TODO: add a test case.
107+
nextMessageID: 1,
108+
101109
id: drpcwire.ID{Stream: sid},
102110
wr: wr.Reset(),
103111
}
@@ -214,15 +222,54 @@ func (s *Stream) SetManualFlush(mf bool) { s.opts.ManualFlush = mf }
214222
// packet handler
215223
//
216224

225+
func (s *Stream) HandleFrame(fr drpcwire.Frame) (err error) {
226+
if fr.ID.Stream != s.ID() {
227+
return drpc.ProtocolError.New("frame doesn't belong to this stream (fr: %v)", fr.ID)
228+
}
229+
230+
if fr.ID.Message < s.nextMessageID {
231+
return drpc.ProtocolError.New(
232+
"id monotonicity violation (current frame:%v last frame:%v)", fr.ID, s.nextMessageID)
233+
} else if fr.ID.Message > s.nextMessageID || !s.assembling {
234+
s.pktBuf = s.pktBuf[:0]
235+
s.assembling = true
236+
s.nextMessageID = fr.ID.Message
237+
} else if fr.Kind != s.pktKind {
238+
return drpc.ProtocolError.New("packet kind change (fr:%v pkt:%v)", fr.Kind, s.pktKind)
239+
}
240+
241+
// TODO(shubham): add buf reuse
242+
s.pktBuf = append(s.pktBuf, fr.Data...)
243+
244+
s.pktKind = fr.Kind
245+
246+
// TODO:
247+
// case len(pkt.Data) > r.opts.MaximumBufferSize:
248+
// return Packet{}, drpc.ProtocolError.New("data overflow (len:%v)", len(pkt.Data))
249+
250+
if !fr.Done {
251+
return nil
252+
}
253+
254+
s.assembling = false
255+
s.nextMessageID = fr.ID.Message + 1
256+
257+
err = s.handlePacket(drpcwire.Packet{
258+
ID: fr.ID,
259+
Kind: fr.Kind,
260+
Data: s.pktBuf,
261+
})
262+
263+
// TODO(shubham): add buf reuse
264+
s.pktBuf = nil
265+
return err
266+
}
267+
217268
// HandlePacket advances the stream state machine by inspecting the packet. It
218269
// returns any major errors that should terminate the transport the stream is
219270
// operating on as well as a boolean indicating if the stream expects more
220271
// packets.
221-
func (s *Stream) HandlePacket(pkt drpcwire.Packet) (err error) {
222-
if pkt.ID.Stream != s.id.Stream {
223-
return nil
224-
}
225-
272+
func (s *Stream) handlePacket(pkt drpcwire.Packet) (err error) {
226273
drpcopts.GetStreamStats(&s.opts.Internal).AddRead(uint64(len(pkt.Data)))
227274

228275
if s.sigs.term.IsSet() {
@@ -240,7 +287,7 @@ func (s *Stream) HandlePacket(pkt drpcwire.Packet) (err error) {
240287
defer s.mu.Unlock()
241288

242289
switch pkt.Kind {
243-
case drpcwire.KindInvoke:
290+
case drpcwire.KindInvoke, drpcwire.KindInvokeMetadata:
244291
err := drpc.ProtocolError.New("invoke on existing stream")
245292
s.terminate(err)
246293
return err
@@ -375,9 +422,13 @@ func (s *Stream) RawWrite(kind drpcwire.Kind, data []byte) (err error) {
375422

376423
// rawWriteLocked does the body of RawWrite assuming the caller is holding the
377424
// appropriate locks.
425+
// TODO(shubham): can we merge this with sendPacketLocked?
378426
func (s *Stream) rawWriteLocked(kind drpcwire.Kind, data []byte) (err error) {
379427
fr := s.newFrameLocked(kind)
380-
n := s.opts.SplitSize
428+
n := 0
429+
if kind == drpcwire.KindMessage {
430+
n = s.opts.SplitSize
431+
}
381432

382433
for {
383434
switch {

0 commit comments

Comments
 (0)