From ba26d6a615fe2e02a1fe12c668e608f1b585d1cb Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 10:15:36 +0530 Subject: [PATCH 01/12] added io.Reader to ByteBuffer for streaming deserialization --- go/fory/buffer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index f008333001..70e47a5320 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -20,6 +20,7 @@ package fory import ( "encoding/binary" "fmt" + "io" "math" "unsafe" ) @@ -28,6 +29,8 @@ type ByteBuffer struct { data []byte // Most accessed field first for cache locality writerIndex int readerIndex int + reader io.Reader + minCap int } func NewByteBuffer(data []byte) *ByteBuffer { From 8ba4dba6de8d40049a2b0ee0ccc974301cec37e3 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:01:07 +0530 Subject: [PATCH 02/12] added NewByteBufferFromReader and fill method --- go/fory/buffer.go | 66 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index 70e47a5320..f052debe24 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -29,14 +29,76 @@ type ByteBuffer struct { data []byte // Most accessed field first for cache locality writerIndex int readerIndex int - reader io.Reader - minCap int + reader io.Reader + minCap int } func NewByteBuffer(data []byte) *ByteBuffer { return &ByteBuffer{data: data} } +func NewByteBufferFromReader(r io.Reader, minCap int) *ByteBuffer { + if minCap <= 0 { + minCap = 4096 + } + return &ByteBuffer{ + data: make([]byte, 0, minCap), + reader: r, + minCap: minCap, + } +} + +//go:noinline +func (b *ByteBuffer) fill(n int) bool { + if b.reader == nil { + return false + } + + available := len(b.data) - b.readerIndex + if available >= n { + return true + } + + if b.readerIndex > 0 { + copy(b.data, b.data[b.readerIndex:]) + b.writerIndex -= b.readerIndex + b.readerIndex = 0 + b.data = b.data[:b.writerIndex] + } + + if cap(b.data) < n { + newCap := cap(b.data) * 2 + if newCap < n { + newCap = n + } + if newCap < b.minCap { + newCap = b.minCap + } + newData := make([]byte, len(b.data), newCap) + copy(newData, b.data) + b.data = newData + } + + for len(b.data) < n { + spare := b.data[len(b.data):cap(b.data)] + if len(spare) == 0 { + return false + } + readBytes, err := b.reader.Read(spare) + if readBytes > 0 { + b.data = b.data[:len(b.data)+readBytes] + b.writerIndex += readBytes + } + if err != nil { + if len(b.data) >= n { + return true + } + return false + } + } + return true +} + // grow ensures there's space for n more bytes. Hot path is inlined. // //go:inline From e4baf32cb4133bd25403ed026de366c2d44eccea Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:32:02 +0530 Subject: [PATCH 03/12] added condition to check for read stream and ResetByteBuffer method --- go/fory/buffer.go | 200 +++++++++++++++++++++++++++------------------- 1 file changed, 117 insertions(+), 83 deletions(-) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index f052debe24..e4a43b701e 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -250,8 +250,10 @@ func (b *ByteBuffer) WriteBinary(p []byte) { //go:inline func (b *ByteBuffer) ReadBool(err *Error) bool { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return false + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return false + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -263,8 +265,10 @@ func (b *ByteBuffer) ReadBool(err *Error) bool { //go:inline func (b *ByteBuffer) ReadByte(err *Error) byte { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -276,8 +280,10 @@ func (b *ByteBuffer) ReadByte(err *Error) byte { //go:inline func (b *ByteBuffer) ReadInt8(err *Error) int8 { if b.readerIndex+1 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := int8(b.data[b.readerIndex]) b.readerIndex++ @@ -289,8 +295,10 @@ func (b *ByteBuffer) ReadInt8(err *Error) int8 { //go:inline func (b *ByteBuffer) ReadInt16(err *Error) int16 { if b.readerIndex+2 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) - return 0 + if !b.fill(2) { + *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + return 0 + } } v := int16(binary.LittleEndian.Uint16(b.data[b.readerIndex:])) b.readerIndex += 2 @@ -302,8 +310,10 @@ func (b *ByteBuffer) ReadInt16(err *Error) int16 { //go:inline func (b *ByteBuffer) ReadUint16(err *Error) uint16 { if b.readerIndex+2 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) - return 0 + if !b.fill(2) { + *err = BufferOutOfBoundError(b.readerIndex, 2, len(b.data)) + return 0 + } } v := binary.LittleEndian.Uint16(b.data[b.readerIndex:]) b.readerIndex += 2 @@ -315,8 +325,10 @@ func (b *ByteBuffer) ReadUint16(err *Error) uint16 { //go:inline func (b *ByteBuffer) ReadUint32(err *Error) uint32 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } i := binary.LittleEndian.Uint32(b.data[b.readerIndex:]) b.readerIndex += 4 @@ -328,8 +340,10 @@ func (b *ByteBuffer) ReadUint32(err *Error) uint32 { //go:inline func (b *ByteBuffer) ReadUint64(err *Error) uint64 { if b.readerIndex+8 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data)) - return 0 + if !b.fill(8) { + *err = BufferOutOfBoundError(b.readerIndex, 8, len(b.data)) + return 0 + } } i := binary.LittleEndian.Uint64(b.data[b.readerIndex:]) b.readerIndex += 8 @@ -373,8 +387,10 @@ func (b *ByteBuffer) Read(p []byte) (n int, err error) { // ReadBinary reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte { if b.readerIndex+length > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) - return nil + if !b.fill(length) { + *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + return nil + } } v := b.data[b.readerIndex : b.readerIndex+length] b.readerIndex += length @@ -425,13 +441,27 @@ func (b *ByteBuffer) SetReaderIndex(index int) { func (b *ByteBuffer) Reset() { b.readerIndex = 0 b.writerIndex = 0 + b.reader = nil // Keep the underlying buffer if it's reasonable sized to reduce allocations // Only nil it out if we want to release memory if cap(b.data) > 64*1024 { b.data = nil } - // Note: We keep b.data as-is (with its current length) to avoid issues - // with grow() needing to expand the slice on first write +} + +func (b *ByteBuffer) ResetWithReader(r io.Reader, minCap int) { + b.readerIndex = 0 + b.writerIndex = 0 + b.reader = r + if minCap <= 0 { + minCap = 4096 + } + b.minCap = minCap + if cap(b.data) < minCap { + b.data = make([]byte, 0, minCap) + } else { + b.data = b.data[:0] + } } // Reserve ensures buffer has at least n bytes available for writing from current position. @@ -942,7 +972,7 @@ func (b *ByteBuffer) WriteVaruint36Small(value uint64) { // //go:inline func (b *ByteBuffer) ReadVaruint36Small(err *Error) uint64 { - if b.remaining() >= 8 { + if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { return b.readVaruint36SmallFast() } return b.readVaruint36SmallSlow(err) @@ -986,7 +1016,13 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) uint64 { var result uint64 var shift uint - for b.readerIndex < len(b.data) { + for { + if b.readerIndex >= len(b.data) { + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } + } byteVal := b.data[b.readerIndex] b.readerIndex++ result |= uint64(byteVal&0x7F) << shift @@ -999,8 +1035,6 @@ func (b *ByteBuffer) readVaruint36SmallSlow(err *Error) uint64 { return 0 } } - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 } // ReadVarint64 reads the varint encoded with zig-zag (compatible with Java's readVarint64). @@ -1040,8 +1074,10 @@ func (b *ByteBuffer) WriteTaggedInt64(value int64) { // Otherwise, skip flag byte and read 8 bytes as int64. func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } var i int32 if isLittleEndian { @@ -1054,8 +1090,10 @@ func (b *ByteBuffer) ReadTaggedInt64(err *Error) int64 { return int64(i >> 1) // arithmetic right shift } if b.readerIndex+9 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) - return 0 + if !b.fill(9) { + *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + return 0 + } } var value int64 if isLittleEndian { @@ -1091,8 +1129,10 @@ func (b *ByteBuffer) WriteTaggedUint64(value uint64) { // Otherwise, skip flag byte and read 8 bytes as uint64. func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { if b.readerIndex+4 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) - return 0 + if !b.fill(4) { + *err = BufferOutOfBoundError(b.readerIndex, 4, len(b.data)) + return 0 + } } var i uint32 if isLittleEndian { @@ -1105,8 +1145,10 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { return uint64(i >> 1) } if b.readerIndex+9 > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) - return 0 + if !b.fill(9) { + *err = BufferOutOfBoundError(b.readerIndex, 9, len(b.data)) + return 0 + } } var value uint64 if isLittleEndian { @@ -1122,7 +1164,7 @@ func (b *ByteBuffer) ReadTaggedUint64(err *Error) uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint64(err *Error) uint64 { - if b.remaining() >= 9 { + if b.remaining() >= 9 || (b.reader != nil && b.fill(9)) { return b.readVarUint64Fast() } return b.readVarUint64Slow(err) @@ -1187,8 +1229,10 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { var shift uint for i := 0; i < 8; i++ { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1199,8 +1243,10 @@ func (b *ByteBuffer) readVarUint64Slow(err *Error) uint64 { shift += 7 } if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1219,8 +1265,10 @@ func (b *ByteBuffer) remaining() int { //go:inline func (b *ByteBuffer) ReadUint8(err *Error) uint8 { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } v := b.data[b.readerIndex] b.readerIndex++ @@ -1293,7 +1341,7 @@ func (b *ByteBuffer) UnsafeReadVarUint64() uint64 { // //go:inline func (b *ByteBuffer) ReadVarUint32(err *Error) uint32 { - if b.remaining() >= 8 { // Need 8 bytes for bulk uint64 read in fast path + if b.remaining() >= 8 || (b.reader != nil && b.fill(8)) { // Need 8 bytes for bulk uint64 read in fast path return b.readVarUint32Fast() } return b.readVarUint32Slow(err) @@ -1341,8 +1389,10 @@ func (b *ByteBuffer) readVarUint32Slow(err *Error) uint32 { var shift uint for { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } byteVal := b.data[b.readerIndex] b.readerIndex++ @@ -1502,8 +1552,10 @@ func (b *ByteBuffer) UnsafePutTaggedUint64(offset int, value uint64) int { // ReadVarUint32Small7 reads a VarUint32 in small-7 format with error checking func (b *ByteBuffer) ReadVarUint32Small7(err *Error) uint32 { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 + if !b.fill(1) { + *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) + return 0 + } } readIdx := b.readerIndex v := b.data[readIdx] @@ -1551,47 +1603,25 @@ func (b *ByteBuffer) continueReadVarUint32(readIdx int, bulkRead, value uint32) } func (b *ByteBuffer) readVaruint36Slow(err *Error) uint64 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b0 := b.data[b.readerIndex] - b.readerIndex++ - result := uint64(b0 & 0x7F) - if b0&0x80 != 0 { + var shift uint + var result uint64 + for { if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b1 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b1&0x7F) << 7 - if b1&0x80 != 0 { - if b.readerIndex >= len(b.data) { + if !b.fill(1) { *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) return 0 } - b2 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b2&0x7F) << 14 - if b2&0x80 != 0 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b3 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b3&0x7F) << 21 - if b3&0x80 != 0 { - if b.readerIndex >= len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, 1, len(b.data)) - return 0 - } - b4 := b.data[b.readerIndex] - b.readerIndex++ - result |= uint64(b4) << 28 - } - } + } + b0 := b.data[b.readerIndex] + b.readerIndex++ + result |= uint64(b0&0x7F) << shift + if b0&0x80 == 0 { + break + } + shift += 7 + if shift >= 35 { + *err = DeserializationError("varuint36 overflow") + return 0 } } return result @@ -1614,8 +1644,10 @@ func (b *ByteBuffer) IncreaseReaderIndex(n int) { // ReadBytes reads n bytes and sets error on bounds violation func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { if b.readerIndex+n > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) - return nil + if !b.fill(n) { + *err = BufferOutOfBoundError(b.readerIndex, n, len(b.data)) + return nil + } } p := b.data[b.readerIndex : b.readerIndex+n] b.readerIndex += n @@ -1625,8 +1657,10 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { // Skip skips n bytes and sets error on bounds violation func (b *ByteBuffer) Skip(length int, err *Error) { if b.readerIndex+length > len(b.data) { - *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) - return + if !b.fill(length) { + *err = BufferOutOfBoundError(b.readerIndex, length, len(b.data)) + return + } } b.readerIndex += length } From f751916336992e55929b442c59bef94768e68e11 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:41:23 +0530 Subject: [PATCH 04/12] added stream deserializer and initialized buffer reader to 0 --- go/fory/fory.go | 23 +++++++++++++++++++++++ go/fory/reader.go | 1 + 2 files changed, 24 insertions(+) diff --git a/go/fory/fory.go b/go/fory/fory.go index 09a0e3c6d2..86b2e3c041 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -20,6 +20,7 @@ package fory import ( "errors" "fmt" + "io" "reflect" "strconv" "strings" @@ -495,6 +496,28 @@ func (f *Fory) Deserialize(data []byte, v any) error { return nil } +func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { + defer f.resetReadState() + f.readCtx.buffer.ResetWithReader(r, 0) + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + if isNull { + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + return f.readCtx.TakeError() + } + + return nil +} + // resetReadState resets read context state without allocation func (f *Fory) resetReadState() { f.readCtx.Reset() diff --git a/go/fory/reader.go b/go/fory/reader.go index e7a1df1710..a0a37d92fb 100644 --- a/go/fory/reader.go +++ b/go/fory/reader.go @@ -83,6 +83,7 @@ func (c *ReadContext) SetData(data []byte) { c.buffer.data = data c.buffer.readerIndex = 0 c.buffer.writerIndex = len(data) + c.buffer.reader = nil } } From 88ecb2dc1757736cecd4ec5a1b35cb13b0c8d030 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:48:54 +0530 Subject: [PATCH 05/12] added stream test suites --- go/fory/stream_test.go | 119 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 go/fory/stream_test.go diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go new file mode 100644 index 0000000000..c2da9e1638 --- /dev/null +++ b/go/fory/stream_test.go @@ -0,0 +1,119 @@ +package fory + +import ( + "bytes" + "io" + "testing" +) + +type StreamTestStruct struct { + ID int32 + Name string + Data []byte +} + +func TestStreamDeserialization(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "Stream Test", + Data: []byte{1, 2, 3, 4, 5}, + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // 1. Test normal reader + reader := bytes.NewReader(data) + var decoded StreamTestStruct + err = f.DeserializeFromReader(reader, &decoded) + if err != nil { + t.Fatalf("DeserializeFromReader failed: %v", err) + } + + if decoded.ID != original.ID || decoded.Name != original.Name || !bytes.Equal(decoded.Data, original.Data) { + t.Errorf("Decoded value mismatch. Got: %+v, Want: %+v", decoded, original) + } +} + +// slowReader returns data byte by byte to test fill() logic and compaction +type slowReader struct { + data []byte + pos int +} + +func (r *slowReader) Read(p []byte) (n int, err error) { + if r.pos >= len(r.data) { + return 0, io.EOF + } + if len(p) == 0 { + return 0, nil + } + p[0] = r.data[r.pos] + r.pos++ + return 1, nil +} + +func TestStreamDeserializationSlow(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "Slow Stream Test with a reasonably long string and some data to trigger multiple fills", + Data: bytes.Repeat([]byte{0xAA}, 100), + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // Test with slow reader and small minCap to force compaction/growth + reader := &slowReader{data: data} + var decoded StreamTestStruct + // Use small minCap (16) to force frequent fills and compactions + f.readCtx.buffer.ResetWithReader(reader, 16) + + err = f.DeserializeFromReader(reader, &decoded) + if err != nil { + t.Fatalf("DeserializeFromReader (slow) failed: %v", err) + } + + if decoded.ID != original.ID || decoded.Name != original.Name || !bytes.Equal(decoded.Data, original.Data) { + t.Errorf("Decoded value mismatch (slow). Got: %+v, Want: %+v", decoded, original) + } +} + +func TestStreamDeserializationEOF(t *testing.T) { + f := New() + f.RegisterStruct(&StreamTestStruct{}, 100) + + original := &StreamTestStruct{ + ID: 42, + Name: "EOF Test", + } + + data, err := f.Serialize(original) + if err != nil { + t.Fatalf("Serialize failed: %v", err) + } + + // Truncate data to cause unexpected EOF during reading Name + truncated := data[:len(data)-2] + reader := bytes.NewReader(truncated) + var decoded StreamTestStruct + err = f.DeserializeFromReader(reader, &decoded) + if err == nil { + t.Fatal("Expected error on truncated stream, got nil") + } + + // Ideally it should be a BufferOutOfBoundError + if _, ok := err.(Error); !ok { + t.Errorf("Expected fory.Error, got %T: %v", err, err) + } +} From a768ac1149e605d191dc29d4fd528451265b20e9 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Fri, 20 Feb 2026 14:59:03 +0530 Subject: [PATCH 06/12] fix ci --- go/fory/stream_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index c2da9e1638..7388a78bb6 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package fory import ( From 1742a5d925445be16002c132abf798f408100cf1 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Wed, 25 Feb 2026 21:16:13 +0530 Subject: [PATCH 07/12] fix(docs): updated compiler guide --- docs/compiler/compiler-guide.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/compiler/compiler-guide.md b/docs/compiler/compiler-guide.md index 833ea0d3ff..49a3787b2e 100644 --- a/docs/compiler/compiler-guide.md +++ b/docs/compiler/compiler-guide.md @@ -63,9 +63,10 @@ Compile options: | `--cpp_out=DST_DIR` | Generate C++ code in DST_DIR | (none) | | `--go_out=DST_DIR` | Generate Go code in DST_DIR | (none) | | `--rust_out=DST_DIR` | Generate Rust code in DST_DIR | (none) | -| `--go_nested_type_style` | Go nested type naming: `camelcase` or `underscore` | from schema/default | -| `--emit-fdl` | Print translated Fory IDL for non-`.fdl` inputs | `false` | -| `--emit-fdl-path` | Write translated Fory IDL to a file or directory | (stdout) | +| `--grpc` | Generate gRPC service code (in development) | `false` | +| `--go_nested_type_style` | Go nested type naming: `camelcase` or `underscore` | `underscore` | +| `--emit-fdl` | Emit translated FDL (for non-FDL inputs) | `false` | +| `--emit-fdl-path` | Write translated FDL to this path (file or directory) | (stdout) | Scan options (with `--scan-generated`): @@ -179,6 +180,12 @@ foryc schema.proto --emit-fdl foryc schema.fbs --emit-fdl --emit-fdl-path ./translated ``` +**Generate gRPC service code:** + +```bash +foryc schema.fdl --grpc --lang go,java +``` + ## Import Path Resolution When compiling Fory IDL files with imports, the compiler searches for imported files in this order: From e205d5dca58828bdbca0403152d7aa8822f8f4f8 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:03:54 +0530 Subject: [PATCH 08/12] fix: create a copy of slice while desrlz to prevent overwriting --- go/fory/buffer.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/go/fory/buffer.go b/go/fory/buffer.go index e4a43b701e..67641faeb9 100644 --- a/go/fory/buffer.go +++ b/go/fory/buffer.go @@ -392,6 +392,15 @@ func (b *ByteBuffer) ReadBinary(length int, err *Error) []byte { return nil } } + + if b.reader != nil { + // In stream mode, compaction might overwrite these bytes, so we must copy + result := make([]byte, length) + copy(result, b.data[b.readerIndex:b.readerIndex+length]) + b.readerIndex += length + return result + } + v := b.data[b.readerIndex : b.readerIndex+length] b.readerIndex += length return v @@ -1649,6 +1658,15 @@ func (b *ByteBuffer) ReadBytes(n int, err *Error) []byte { return nil } } + + if b.reader != nil { + // In stream mode, compaction might overwrite these bytes, so we must copy + result := make([]byte, n) + copy(result, b.data[b.readerIndex:b.readerIndex+n]) + b.readerIndex += n + return result + } + p := b.data[b.readerIndex : b.readerIndex+n] b.readerIndex += n return p From 4fdb8ecf3e7b38c69164068773e7d781e49c7000 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:12:35 +0530 Subject: [PATCH 09/12] added StreamReader struct and NewStreamReader method which would handle stateful deserialization --- go/fory/fory.go | 80 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/go/fory/fory.go b/go/fory/fory.go index 86b2e3c041..868a480f2e 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -496,9 +496,87 @@ func (f *Fory) Deserialize(data []byte, v any) error { return nil } +// StreamReader supports robust sequential deserialization from a stream. +// It maintains the ByteBuffer and ReadContext state across multiple Deserialize calls, +// preventing data loss from prefetched buffers and preserving TypeResolver metadata +// (Meta Sharing) across object boundaries. +type StreamReader struct { + fory *Fory + reader io.Reader + buffer *ByteBuffer +} + +// NewStreamReader creates a new StreamReader that reads from the provided io.Reader. +// The StreamReader owns the buffer and maintains state across sequential Deserialize calls. +func (f *Fory) NewStreamReader(r io.Reader) *StreamReader { + return f.NewStreamReaderWithMinCap(r, 0) +} + +// NewStreamReaderWithMinCap creates a new StreamReader with a specified minimum buffer capacity. +func (f *Fory) NewStreamReaderWithMinCap(r io.Reader, minCap int) *StreamReader { + buf := NewByteBufferFromReader(r, minCap) + return &StreamReader{ + fory: f, + reader: r, + buffer: buf, + } +} + +// Deserialize reads the next object from the stream into the provided value. +// It uses a shared ReadContext for the lifetime of the StreamReader, clearing +// temporary state between calls but preserving the buffer and TypeResolver state. +func (sr *StreamReader) Deserialize(v any) error { + f := sr.fory + + // We only reset the temporary read state (like refTracker and outOfBand buffers), + // NOT the buffer or the type mapping, which must persist. + defer func() { + f.readCtx.refReader.Reset() + f.readCtx.outOfBandBuffers = nil + f.readCtx.outOfBandIndex = 0 + f.readCtx.err = Error{} + if f.readCtx.refResolver != nil { + f.readCtx.refResolver.resetRead() + } + // Do NOT reset typeResolver here. It must persist across objects in a stream. + }() + + // Temporarily swap buffer + origBuffer := f.readCtx.buffer + f.readCtx.buffer = sr.buffer + + isNull := readHeader(f.readCtx) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + if isNull { + f.readCtx.buffer = origBuffer + return nil + } + + target := reflect.ValueOf(v).Elem() + f.readCtx.ReadValue(target, RefModeTracking, true) + if f.readCtx.HasError() { + f.readCtx.buffer = origBuffer + return f.readCtx.TakeError() + } + + // Restore original buffer + f.readCtx.buffer = origBuffer + + return nil +} + +// DeserializeFromReader is deprecated for sequential streaming. Use NewStreamReader instead. +// It deserializes a single object from a stream but will discard prefetched data +// and type metadata after the call. func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { defer f.resetReadState() - f.readCtx.buffer.ResetWithReader(r, 0) + if f.readCtx.buffer.reader != r { + f.readCtx.buffer.ResetWithReader(r, 0) + } isNull := readHeader(f.readCtx) if f.readCtx.HasError() { From 1a8c100a893a20a0cb3ad29bb8e0f2fb7ca5a77c Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:13:12 +0530 Subject: [PATCH 10/12] added StreamReader struct and NewStreamReader method which would handle stateful deserialization --- go/fory/fory.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/go/fory/fory.go b/go/fory/fory.go index 868a480f2e..c04e822d5a 100644 --- a/go/fory/fory.go +++ b/go/fory/fory.go @@ -538,7 +538,6 @@ func (sr *StreamReader) Deserialize(v any) error { if f.readCtx.refResolver != nil { f.readCtx.refResolver.resetRead() } - // Do NOT reset typeResolver here. It must persist across objects in a stream. }() // Temporarily swap buffer @@ -569,8 +568,8 @@ func (sr *StreamReader) Deserialize(v any) error { return nil } -// DeserializeFromReader is deprecated for sequential streaming. Use NewStreamReader instead. -// It deserializes a single object from a stream but will discard prefetched data +// For Sequential Streaming use NewStreamReader instead of DeserializeFromReader. +// DeserializeFromReader deserializes a single object from a stream but will discard prefetched data // and type metadata after the call. func (f *Fory) DeserializeFromReader(r io.Reader, v any) error { defer f.resetReadState() From 92c207d4697e9264981db63172747a4f2899ccdc Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:14:08 +0530 Subject: [PATCH 11/12] added tests for stream reader --- go/fory/stream_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index 7388a78bb6..c0392c0b59 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -134,3 +134,58 @@ func TestStreamDeserializationEOF(t *testing.T) { t.Errorf("Expected fory.Error, got %T: %v", err, err) } } + +func TestStreamReaderSequential(t *testing.T) { + f := New() + // Register type in compatible mode to test Meta Sharing across sequential reads + f.config.Compatible = true + f.RegisterStruct(&StreamTestStruct{}, 100) + + msg1 := &StreamTestStruct{ID: 1, Name: "Msg 1", Data: []byte{1, 1}} + msg2 := &StreamTestStruct{ID: 2, Name: "Msg 2", Data: []byte{2, 2}} + msg3 := &StreamTestStruct{ID: 3, Name: "Msg 3", Data: []byte{3, 3}} + + var buf bytes.Buffer + + // Serialize sequentially into one stream + data1, _ := f.Serialize(msg1) + buf.Write(data1) + data2, _ := f.Serialize(msg2) + buf.Write(data2) + data3, _ := f.Serialize(msg3) + buf.Write(data3) + + fDec := New() + fDec.config.Compatible = true + fDec.RegisterStruct(&StreamTestStruct{}, 100) + + // Create a StreamReader + sr := fDec.NewStreamReader(&buf) + + // Deserialize sequentially + var out1, out2, out3 StreamTestStruct + + err := sr.Deserialize(&out1) + if err != nil { + t.Fatalf("Deserialize 1 failed: %v", err) + } + if out1.ID != msg1.ID || out1.Name != msg1.Name || !bytes.Equal(out1.Data, msg1.Data) { + t.Errorf("Msg 1 mismatch. Got: %+v, Want: %+v", out1, msg1) + } + + err = sr.Deserialize(&out2) + if err != nil { + t.Fatalf("Deserialize 2 failed: %v", err) + } + if out2.ID != msg2.ID || out2.Name != msg2.Name || !bytes.Equal(out2.Data, msg2.Data) { + t.Errorf("Msg 2 mismatch. Got: %+v, Want: %+v", out2, msg2) + } + + err = sr.Deserialize(&out3) + if err != nil { + t.Fatalf("Deserialize 3 failed: %v", err) + } + if out3.ID != msg3.ID || out3.Name != msg3.Name || !bytes.Equal(out3.Data, msg3.Data) { + t.Errorf("Msg 3 mismatch. Got: %+v, Want: %+v", out3, msg3) + } +} From d90c55d4ec85ef882edc5f59643d1429d05f1946 Mon Sep 17 00:00:00 2001 From: ayush00git Date: Thu, 26 Feb 2026 17:26:12 +0530 Subject: [PATCH 12/12] code lint checks --- go/fory/stream_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/fory/stream_test.go b/go/fory/stream_test.go index c0392c0b59..e48fa37868 100644 --- a/go/fory/stream_test.go +++ b/go/fory/stream_test.go @@ -146,7 +146,7 @@ func TestStreamReaderSequential(t *testing.T) { msg3 := &StreamTestStruct{ID: 3, Name: "Msg 3", Data: []byte{3, 3}} var buf bytes.Buffer - + // Serialize sequentially into one stream data1, _ := f.Serialize(msg1) buf.Write(data1)