diff --git a/go/fory/buffer.go b/go/fory/buffer.go index f008333001..67641faeb9 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -20,6 +20,7 @@ package fory import ( "encoding/binary" "fmt" + "io" "math" "unsafe" ) @@ -28,12 +29,76 @@ type ByteBuffer struct { data []byte // Most accessed field first for cache locality writerIndex int readerIndex int + reader io.Reader + minCap int } func NewByteBuffer(data []byte) *ByteBuffer { return &ByteBuffer{data: data} } +func NewByteBufferFromReader(r io.Reader, minCap int) *ByteBuffer { + if minCap <= 0 { + minCap = 4096 + } + return &ByteBuffer{ + data: make([]byte, 0, minCap), + reader: r, + minCap: minCap, + } +} + +//go:noinline +func (b *ByteBuffer) fill(n int) bool { + if b.reader == nil { + return false + } + + available := len(b.data) - b.readerIndex + if available >= n { + return true + } + + if b.readerIndex > 0 { + copy(b.data, b.data[b.readerIndex:]) + b.writerIndex -= b.readerIndex + b.readerIndex = 0 + b.data = b.data[:b.writerIndex] + } + + if cap(b.data) < n { + newCap := cap(b.data) * 2 + if newCap < n { + newCap = n + } + if newCap < b.minCap { + newCap = b.minCap + } + newData := make([]byte, len(b.data), newCap) + copy(newData, b.data) + b.data = newData + } + + for len(b.data) < n { + spare := b.data[len(b.data):cap(b.data)] + if len(spare) == 0 { + return false + } + readBytes, err := b.reader.Read(spare) + if readBytes > 0 { + b.data = b.data[:len(b.data)+readBytes] + b.writerIndex += readBytes + } + if err != nil { + if len(b.data) >= n { + return true + } + return false + } + } + return true +} + // grow ensures there's space for n more bytes. Hot path is inlined. // //go:inline @@ -185,8 +250,10 @@ func (b *ByteBuffer) WriteBinary(p []byte) { //go:inline func (b *ByteBuffer) ReadBool(err *Error) bool { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return false + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return false + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -198,8 +265,10 @@ func (b *ByteBuffer) ReadBool(err *Error) bool { //go:inline func (b *ByteBuffer) ReadByte(err *Error) byte { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -211,8 +280,10 @@ func (b *ByteBuffer) ReadByte(err *Error) byte { //go:inline func (b *ByteBuffer) ReadInt8(err *Error) int8 { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := int8(b.data[b.readerIndex]) b.readerIndex++ @@ -224,8 +295,10 @@ func (b *ByteBuffer) ReadInt8(err *Error) int8 { //go:inline func (b *ByteBuffer) ReadInt16(err *Error) int16 { if b.readerIndex+2 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) - return 0 + if !b.fill(2) { + *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + return 0 + } } v := int16(binary.LittleEndian.Uint16(b.data[b.readerIndex:])) b.readerIndex += 2 @@ -237,8 +310,10 @@ func (b *ByteBuffer) ReadInt16(err *Error) int16 { //go:inline func (b *ByteBuffer) ReadUint16(err *Error) uint16 { if b.readerIndex+2 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) - return 0 + if !b.fill(2) { + *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + return 0 + } } v := binary.LittleEndian.Uint16(b.data[b.readerIndex:]) b.readerIndex += 2 @@ -250,8 +325,10 @@ func (b *ByteBuffer) ReadUint16(err *Error) uint16 { //go:inline func (b *ByteBuffer) ReadUint32(err *Error) uint32 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } i := binary.LittleEndian.Uint32(b.data[b.readerIndex:]) b.readerIndex += 4 @@ -263,8 +340,10 @@ func (b *ByteBuffer) ReadUint32(err *Error) uint32 { //go:inline func (b *ByteBuffer) ReadUint64(err *Error) uint64 { if b.readerIndex+8 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data)) - return 0 + if !b.fill(8) { + *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data)) + return 0 + } } i := binary.LittleEndian.Uint64(b.data[b.readerIndex:]) b.readerIndex += 8 @@ -308,9 +387,20 @@ func (b *ByteBuffer) Read(p []byte) (n int, err error) { // ReadBinary reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte { if b.readerIndex+length > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) - return nil + if !b.fill(length) { + *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + return nil + } + } + + if b.reader != nil { + // In stream mode, compaction might overwrite these bytes, so we must copy + result := make([]byte, length) + copy(result, b.data[b.readerIndex:b.readerIndex+length]) + b.readerIndex += length + return result } + v := b.data[b.readerIndex : b.readerIndex+length] b.readerIndex += length return v @@ -360,13 +450,27 @@ func (b *ByteBuffer) SetReaderIndex(index int) { func (b *ByteBuffer) Reset() { b.readerIndex = 0 b.writerIndex = 0 + b.reader = nil // Keep the underlying buffer if it's reasonable sized to reduce allocations // Only nil it out if we want to release memory if cap(b.data) > 64*1024 { b.data = nil } - // Note: We keep b.data as-is (with its current length) to avoid issues - // with grow() needing to expand the slice on first write +} + +func (b *ByteBuffer) ResetWithReader(r io.Reader, minCap int) { + b.readerIndex = 0 + b.writerIndex = 0 + b.reader = r + if minCap <= 0 { + minCap = 4096 + } + b.minCap = minCap + if cap(b.data) < minCap { + b.data = make([]byte, 0, minCap) + } else { + b.data = b.data[:0] + } } // Reserve ensures buffer has at least n bytes available for writing from current position. @@ -877,7 +981,7 @@ func (b *ByteBuffer) WriteVaruint36Small(value uint64) { // //go:inline func (b *ByteBuffer) ReadVaruint36Small(err *Error) uint64 { - if b.remaining() >= 8 { + if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { return b.readVaruint36SmallFast() } return b.readVaruint36SmallSlow(err) @@ -921,7 +1025,13 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) uint64 { var result uint64 var shift uint - for b.readerIndex < len(b.data) { + for { + if b.readerIndex >= len(b.data) { + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } + } byteVal := b.data[b.readerIndex] b.readerIndex++ result |= uint64(byteVal&0x7F) << shift @@ -934,8 +1044,6 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) uint64 { return 0 } } - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 } // ReadVarint64 reads the varint encoded with zig-zag (compatible with Java's readVarint64). @@ -975,8 +1083,10 @@ func (b *ByteBuffer) WriteTaggedInt64(value int64) { // Otherwise, skip flag byte and read 8 bytes as int64. func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } var i int32 if isLittleEndian { @@ -989,8 +1099,10 @@ func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { return int64(i >> 1) // arithmetic right shift } if b.readerIndex+9 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) - return 0 + if !b.fill(9) { + *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + return 0 + } } var value int64 if isLittleEndian { @@ -1026,8 +1138,10 @@ func (b *ByteBuffer) WriteTaggedUint64(value uint64) { // Otherwise, skip flag byte and read 8 bytes as uint64. func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } var i uint32 if isLittleEndian { @@ -1040,8 +1154,10 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { return uint64(i >> 1) } if b.readerIndex+9 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) - return 0 + if !b.fill(9) { + *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + return 0 + } } var value uint64 if isLittleEndian { @@ -1057,7 +1173,7 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint64(err *Error) uint64 { - if b.remaining() >= 9 { + if b.remaining() >= 9 || (b.reader != nil && b.fill(9)) { return b.readVarUint64Fast() } return b.readVarUint64Slow(err) @@ -1122,8 +1238,10 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { var shift uint for i := 0; i < 8; i++ { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1134,8 +1252,10 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { shift += 7 } if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1154,8 +1274,10 @@ func (b *ByteBuffer) remaining() int { //go:inline func (b *ByteBuffer) ReadUint8(err *Error) uint8 { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -1228,7 +1350,7 @@ func (b *ByteBuffer) UnsafeReadVarUint64() uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint32(err *Error) uint32 { - if b.remaining() >= 8 { // Need 8 bytes for bulk uint64 read in fast path + if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { // Need 8 bytes for bulk uint64 read in fast path return b.readVarUint32Fast() } return b.readVarUint32Slow(err) @@ -1276,8 +1398,10 @@ func (b *ByteBuffer) readVarUint32Slow(err *Error) uint32 { var shift uint for { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1437,8 +1561,10 @@ func (b *ByteBuffer) UnsafePutTaggedUint64(offset int, value uint64) int { // ReadVarUint32Small7 reads a VarUint32 in small-7 format with error checking func (b *ByteBuffer) ReadVarUint32Small7(err *Error) uint32 { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } readIdx := b.readerIndex v := b.data[readIdx] @@ -1486,47 +1612,25 @@ func (b *ByteBuffer) continueReadVarUint32(readIdx int, bulkRead, value uint32) } func (b *ByteBuffer) readVaruint36Slow(err *Error) uint64 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b0 := b.data[b.readerIndex] - b.readerIndex++ - result := uint64(b0 & 0x7F) - if b0&0x80 != 0 { + var shift uint + var result uint64 + for { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b1 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b1&0x7F) << 7 - if b1&0x80 != 0 { - if b.readerIndex >= len(b.data) { + if !b.fill(1) { *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) return 0 } - b2 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b2&0x7F) << 14 - if b2&0x80 != 0 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b3 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b3&0x7F) << 21 - if b3&0x80 != 0 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b4 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b4) << 28 - } - } + } + b0 := b.data[b.readerIndex] + b.readerIndex++ + result |= uint64(b0&0x7F) << shift + if b0&0x80 == 0 { + break + } + shift += 7 + if shift >= 35 { + *err = DeserializationError("varuint36 overflow") + return 0 } } return result @@ -1549,9 +1653,20 @@ func (b *ByteBuffer) IncreaseReaderIndex(n int) { // ReadBytes reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { if b.readerIndex+n > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) - return nil + if !b.fill(n) { + *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) + return nil + } + } + + if b.reader != nil { + // In stream mode, compaction might overwrite these bytes, so we must copy + result := make([]byte, n) + copy(result, b.data[b.readerIndex:b.readerIndex+n]) + b.readerIndex += n + return result } + p := b.data[b.readerIndex : b.readerIndex+n] b.readerIndex += n return p @@ -1560,8 +1675,10 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { // Skip skips n bytes and sets error on bounds violation func (b *ByteBuffer) Skip(length int, err *Error) { if b.readerIndex+length > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) - return + if !b.fill(length) { + *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + return + } } b.readerIndex += length } diff --git a/go/fory/fory.go b/go/fory/fory.go index 09a0e3c6d2..c04e822d5a 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -20,6 +20,7 @@ package fory import ( "errors" "fmt" + "io" "reflect" "strconv" "strings" @@ -495,6 +496,105 @@ func (f *Fory) Deserialize(data []byte, v any) error { return nil } +// StreamReader supports robust sequential deserialization from a stream. +// It maintains the ByteBuffer and ReadContext state across multiple Deserialize calls, +// preventing data loss from prefetched buffers and preserving TypeResolver metadata +// (Meta Sharing) across object boundaries. +type StreamReader struct { + fory *Fory + reader io.Reader + buffer *ByteBuffer +} + +// NewStreamReader creates a new StreamReader that reads from the provided io.Reader. +// The StreamReader owns the buffer and maintains state across sequential Deserialize calls. +func (f *Fory) NewStreamReader(r io.Reader) *StreamReader { + return f.NewStreamReaderWithMinCap(r, 0) +} + +// NewStreamReaderWithMinCap creates a new StreamReader with a specified minimum buffer capacity. +func (f *Fory) NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { + buf := NewByteBufferFromReader(r, minCap) + return &StreamReader{ + fory: f, + reader: r, + buffer: buf, + } +} + +// Deserialize reads the next object from the stream into the provided value. +// It uses a shared ReadContext for the lifetime of the StreamReader, clearing +// temporary state between calls but preserving the buffer and TypeResolver state. +func (sr *StreamReader) Deserialize(v any) error { + f := sr.fory + + // We only reset the temporary read state (like refTracker and outOfBand buffers), + // NOT the buffer or the type mapping, which must persist. + defer func() { + f.readCtx.refReader.Reset() + f.readCtx.outOfBandBuffers = nil + f.readCtx.outOfBandIndex = 0 + f.readCtx.err = Error{} + if f.readCtx.refResolver != nil { + f.readCtx.refResolver.resetRead() + } + }() + + // Temporarily swap buffer + origBuffer := f.readCtx.buffer + f.readCtx.buffer = sr.buffer + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + if isNull { + f.readCtx.buffer = origBuffer + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + // Restore original buffer + f.readCtx.buffer = origBuffer + + return nil +} + +// For Sequential Streaming use NewStreamReader instead of DeserializeFromReader. +// DeserializeFromReader deserializes a single object from a stream but will discard prefetched data +// and type metadata after the call. +func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { + defer f.resetReadState() + if f.readCtx.buffer.reader != r { + f.readCtx.buffer.ResetWithReader(r, 0) + } + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + if isNull { + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + return nil +} + // resetReadState resets read context state without allocation func (f *Fory) resetReadState() { f.readCtx.Reset() diff --git a/go/fory/reader.go b/go/fory/reader.go index e7a1df1710..a0a37d92fb 100644 --- a/go/fory/reader.go +++ b/go/fory/reader.go @@ -83,6 +83,7 @@ func (c *ReadContext) SetData(data []byte) { c.buffer.data = data c.buffer.readerIndex = 0 c.buffer.writerIndex = len(data) + c.buffer.reader = nil } } diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go new file mode 100644 index 0000000000..e48fa37868 --- /dev/null +++ b/go/fory/stream_test.go @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package fory + +import ( + "bytes" + "io" + "testing" +) + +type StreamTestStruct struct { + ID int32 + Name string + Data []byte +} + +func TestStreamDeserialization(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "Stream Test", + Data: []byte{1, 2, 3, 4, 5}, + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // 1. Test normal reader + reader := bytes.NewReader(data) + var decoded StreamTestStruct + err = f.DeserializeFromReader(reader, &decoded) + if err != nil { + t.Fatalf("DeserializeFromReader failed: %v", err) + } + + if decoded.ID != original.ID || decoded.Name != original.Name || !bytes.Equal(decoded.Data, original.Data) { + t.Errorf("Decoded value mismatch. Got: %+v, Want: %+v", decoded, original) + } +} + +// slowReader returns data byte by byte to test fill() logic and compaction +type slowReader struct { + data []byte + pos int +} + +func (r *slowReader) Read(p []byte) (n int, err error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + if len(p) == 0 { + return 0, nil + } + p[0] = r.data[r.pos] + r.pos++ + return 1, nil +} + +func TestStreamDeserializationSlow(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "Slow Stream Test with a reasonably long string and some data to trigger multiple fills", + Data: bytes.Repeat([]byte{0xAA}, 100), + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // Test with slow reader and small minCap to force compaction/growth + reader := &slowReader{data: data} + var decoded StreamTestStruct + // Use small minCap (16) to force frequent fills and compactions + f.readCtx.buffer.ResetWithReader(reader, 16) + + err = f.DeserializeFromReader(reader, &decoded) + if err != nil { + t.Fatalf("DeserializeFromReader (slow) failed: %v", err) + } + + if decoded.ID != original.ID || decoded.Name != original.Name || !bytes.Equal(decoded.Data, original.Data) { + t.Errorf("Decoded value mismatch (slow). Got: %+v, Want: %+v", decoded, original) + } +} + +func TestStreamDeserializationEOF(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "EOF Test", + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // Truncate data to cause unexpected EOF during reading Name + truncated := data[:len(data)-2] + reader := bytes.NewReader(truncated) + var decoded StreamTestStruct + err = f.DeserializeFromReader(reader, &decoded) + if err == nil { + t.Fatal("Expected error on truncated stream, got nil") + } + + // Ideally it should be a BufferOutOfBoundError + if _, ok := err.(Error); !ok { + t.Errorf("Expected fory.Error, got %T: %v", err, err) + } +} + +func TestStreamReaderSequential(t *testing.T) { + f := New() + // Register type in compatible mode to test Meta Sharing across sequential reads + f.config.Compatible = true + f.RegisterStruct(&StreamTestStruct{}, 100) + + msg1 := &StreamTestStruct{ID: 1, Name: "Msg 1", Data: []byte{1, 1}} + msg2 := &StreamTestStruct{ID: 2, Name: "Msg 2", Data: []byte{2, 2}} + msg3 := &StreamTestStruct{ID: 3, Name: "Msg 3", Data: []byte{3, 3}} + + var buf bytes.Buffer + + // Serialize sequentially into one stream + data1, _ := f.Serialize(msg1) + buf.Write(data1) + data2, _ := f.Serialize(msg2) + buf.Write(data2) + data3, _ := f.Serialize(msg3) + buf.Write(data3) + + fDec := New() + fDec.config.Compatible = true + fDec.RegisterStruct(&StreamTestStruct{}, 100) + + // Create a StreamReader + sr := fDec.NewStreamReader(&buf) + + // Deserialize sequentially + var out1, out2, out3 StreamTestStruct + + err := sr.Deserialize(&out1) + if err != nil { + t.Fatalf("Deserialize 1 failed: %v", err) + } + if out1.ID != msg1.ID || out1.Name != msg1.Name || !bytes.Equal(out1.Data, msg1.Data) { + t.Errorf("Msg 1 mismatch. Got: %+v, Want: %+v", out1, msg1) + } + + err = sr.Deserialize(&out2) + if err != nil { + t.Fatalf("Deserialize 2 failed: %v", err) + } + if out2.ID != msg2.ID || out2.Name != msg2.Name || !bytes.Equal(out2.Data, msg2.Data) { + t.Errorf("Msg 2 mismatch. Got: %+v, Want: %+v", out2, msg2) + } + + err = sr.Deserialize(&out3) + if err != nil { + t.Fatalf("Deserialize 3 failed: %v", err) + } + if out3.ID != msg3.ID || out3.Name != msg3.Name || !bytes.Equal(out3.Data, msg3.Data) { + t.Errorf("Msg 3 mismatch. Got: %+v, Want: %+v", out3, msg3) + } +}