Skip to content

Commit 78a59c7

Browse files
committed
conflict resolve
1 parent d9276c9 commit 78a59c7

4 files changed

Lines changed: 164 additions & 9 deletions

File tree

rust/fory-core/src/buffer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,3 +1167,5 @@ impl<'a> Reader<'a> {
11671167

11681168
#[allow(clippy::needless_lifetimes)]
11691169
unsafe impl<'a> Send for Reader<'a> {}
1170+
#[allow(clippy::needless_lifetimes)]
1171+
unsafe impl<'a> Sync for Reader<'a> {}

rust/fory-core/src/fory.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -982,8 +982,17 @@ impl Fory {
982982
// SAFETY: same invariant as Reader::from_stream and fill_to:
983983
// bf points into Box-owned stream buffer, owned by reader.stream,
984984
// which lives as long as reader.
985-
if let Some(ref s) = reader.stream {
985+
if let Some(ref mut s) = reader.stream {
986+
// Sync stream's read_pos with the reader cursor position
987+
// before shrinking — the detached reader may have advanced
988+
// cursor without updating stream.read_pos.
989+
let _ = s.set_reader_index(reader.cursor);
990+
// Mirror C++ StreamShrinkGuard: compact consumed bytes after
991+
// deserialization to prevent unbounded buffer growth on
992+
// long-lived streams.
993+
s.shrink_buffer();
986994
reader.bf = unsafe { std::slice::from_raw_parts(s.data(), s.size()) };
995+
reader.cursor = s.reader_index();
987996
}
988997
result
989998
} else {
@@ -1027,7 +1036,11 @@ impl Fory {
10271036
let reader = Reader::from_stream(stream);
10281037
context.attach_reader(reader);
10291038
let result = self.deserialize_with_context(context);
1030-
context.detach_reader();
1039+
// Mirror C++ StreamShrinkGuard: shrink_buffer on detach.
1040+
let mut returned = context.detach_reader();
1041+
if let Some(ref mut s) = returned.stream {
1042+
s.shrink_buffer();
1043+
}
10311044
result
10321045
})
10331046
}

rust/fory-core/src/stream.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub struct ForyStreamBuf {
3434
valid_len: usize,
3535
/// Current read cursor — equivalent of `gptr() - eback()`
3636
read_pos: usize,
37+
/// Initial capacity for shrink_buffer target — mirrors C++ `initial_buffer_size_`
38+
initial_buffer_size: usize,
3739
}
3840

3941
impl ForyStreamBuf {
@@ -51,6 +53,7 @@ impl ForyStreamBuf {
5153
buffer,
5254
valid_len: 0,
5355
read_pos: 0,
56+
initial_buffer_size: cap,
5457
}
5558
}
5659

@@ -198,6 +201,44 @@ impl ForyStreamBuf {
198201
pub fn is_stream_backed(&self) -> bool {
199202
true
200203
}
204+
205+
/// Compact consumed bytes and optionally shrink capacity.
206+
///
207+
/// Mirrors C++ `ForyInputStream::shrink_buffer()` exactly:
208+
/// 1. Memmove remaining bytes to front of buffer
209+
/// 2. Reset read_pos = 0, valid_len = remaining
210+
/// 3. If capacity > initial_buffer_size and utilization is low,
211+
/// shrink back toward initial size
212+
pub fn shrink_buffer(&mut self) {
213+
let remaining = self.remaining();
214+
215+
// Phase 1: compact — memmove remaining data to front
216+
if self.read_pos > 0 {
217+
if remaining > 0 {
218+
self.buffer.copy_within(self.read_pos..self.valid_len, 0);
219+
}
220+
self.read_pos = 0;
221+
self.valid_len = remaining;
222+
}
223+
224+
// Phase 2: optionally shrink capacity back toward initial_buffer_size
225+
let current_capacity = self.buffer.len();
226+
let mut target_capacity = current_capacity;
227+
228+
if current_capacity > self.initial_buffer_size {
229+
if remaining == 0 {
230+
target_capacity = self.initial_buffer_size;
231+
} else if remaining <= current_capacity / 4 {
232+
let doubled = remaining.saturating_mul(2).max(1);
233+
target_capacity = self.initial_buffer_size.max(doubled);
234+
}
235+
}
236+
237+
if target_capacity < current_capacity {
238+
self.buffer.truncate(target_capacity);
239+
self.buffer.shrink_to_fit();
240+
}
241+
}
201242
}
202243

203244
#[cfg(test)]

rust/tests/tests/stream_test.rs

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ mod stream_tests {
2020
use fory_core::buffer::Reader;
2121
use fory_core::stream::ForyStreamBuf;
2222
use fory_core::Fory;
23+
use std::fmt::Debug;
2324
use std::io::Cursor;
2425

26+
// ========================================================================
27+
// OneByteStream — mirrors C++ OneByteStreamBuf / OneByteIStream
28+
// Delivers exactly 1 byte per read() call for maximum streaming stress.
29+
// ========================================================================
2530
struct OneByte(Cursor<Vec<u8>>);
2631
impl std::io::Read for OneByte {
2732
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
@@ -39,22 +44,57 @@ mod stream_tests {
3944
}
4045
}
4146

47+
// ========================================================================
48+
// Deserialize helper — per maintainer requirement:
49+
// "Create a Deserialize help methods in tests, then use that instead of
50+
// fory.Deserialize for deserialization, and in the Deserialize test
51+
// helper, first deserialize from bytes, then wrap it into a
52+
// OneByteStream to deserialize it to ensure deserialization works."
53+
// ========================================================================
54+
fn deserialize_helper<T>(fory: &Fory, bytes: &[u8]) -> T
55+
where
56+
T: fory_core::Serializer + fory_core::ForyDefault + PartialEq + Debug,
57+
{
58+
// Path 1: deserialize from bytes (standard in-memory path)
59+
let from_bytes: T = fory.deserialize(bytes).expect("bytes deserialize failed");
60+
61+
// Path 2: deserialize from OneByteStream (streaming path)
62+
let from_stream: T = fory
63+
.deserialize_from_stream(OneByte(Cursor::new(bytes.to_vec())))
64+
.expect("stream deserialize failed");
65+
66+
// Assert both paths produce the same result
67+
assert_eq!(
68+
from_bytes, from_stream,
69+
"bytes vs stream deserialization mismatch"
70+
);
71+
72+
from_bytes
73+
}
74+
75+
// ========================================================================
76+
// Test: PrimitiveAndStringRoundTrip
77+
// Mirrors C++ StreamSerializationTest::PrimitiveAndStringRoundTrip
78+
// ========================================================================
4279
#[test]
43-
fn test_primitive_stream_roundtrip() {
80+
fn test_primitive_and_string_round_trip() {
4481
let fory = Fory::default();
82+
83+
// i64 round-trip
4584
let bytes = fory.serialize(&-9876543212345i64).unwrap();
46-
let result: i64 = fory
47-
.deserialize_from_stream(OneByte(Cursor::new(bytes)))
48-
.unwrap();
85+
let result = deserialize_helper::<i64>(&fory, &bytes);
4986
assert_eq!(result, -9876543212345i64);
5087

88+
// String round-trip (with unicode)
5189
let bytes = fory.serialize(&"stream-hello-世界".to_string()).unwrap();
52-
let result: String = fory
53-
.deserialize_from_stream(OneByte(Cursor::new(bytes)))
54-
.unwrap();
90+
let result = deserialize_helper::<String>(&fory, &bytes);
5591
assert_eq!(result, "stream-hello-世界");
5692
}
5793

94+
// ========================================================================
95+
// Test: SequentialDeserializeFromSingleStream
96+
// Mirrors C++ StreamSerializationTest::SequentialDeserializeFromSingleStream
97+
// ========================================================================
5898
#[test]
5999
fn test_sequential_stream_reads() {
60100
let fory = Fory::default();
@@ -74,6 +114,10 @@ mod stream_tests {
74114
assert_eq!(third, 99);
75115
}
76116

117+
// ========================================================================
118+
// Test: TruncatedStreamReturnsError
119+
// Mirrors C++ StreamSerializationTest::TruncatedStreamReturnsError
120+
// ========================================================================
77121
#[test]
78122
fn test_truncated_stream_returns_error() {
79123
let fory = Fory::default();
@@ -82,4 +126,59 @@ mod stream_tests {
82126
let result: Result<String, _> = fory.deserialize_from_stream(Cursor::new(bytes));
83127
assert!(result.is_err());
84128
}
129+
130+
// ========================================================================
131+
// Test: ShrinkBuffer compacts consumed bytes
132+
// Validates the C++ shrink_buffer() behavior is correctly implemented
133+
// ========================================================================
134+
#[test]
135+
fn test_shrink_buffer_compacts_consumed_bytes() {
136+
let fory = Fory::default();
137+
138+
// Serialize multiple values into a single buffer
139+
let mut bytes = Vec::new();
140+
fory.serialize_to(&mut bytes, &42i32).unwrap();
141+
fory.serialize_to(&mut bytes, &"shrink-test".to_string())
142+
.unwrap();
143+
fory.serialize_to(&mut bytes, &100i64).unwrap();
144+
145+
// Use a small initial buffer to force multiple fills
146+
let mut reader =
147+
Reader::from_stream(ForyStreamBuf::with_capacity(OneByte(Cursor::new(bytes)), 4));
148+
149+
// After each deserialize_from, shrink_buffer should compact the stream.
150+
let first: i32 = fory.deserialize_from(&mut reader).unwrap();
151+
assert_eq!(first, 42);
152+
153+
let second: String = fory.deserialize_from(&mut reader).unwrap();
154+
assert_eq!(second, "shrink-test");
155+
156+
let third: i64 = fory.deserialize_from(&mut reader).unwrap();
157+
assert_eq!(third, 100);
158+
}
159+
160+
// ========================================================================
161+
// Test: Additional primitive types through deserialize_helper
162+
// ========================================================================
163+
#[test]
164+
fn test_additional_primitive_types() {
165+
let fory = Fory::default();
166+
167+
// bool
168+
let bytes = fory.serialize(&true).unwrap();
169+
assert_eq!(deserialize_helper::<bool>(&fory, &bytes), true);
170+
171+
// i32
172+
let bytes = fory.serialize(&-42i32).unwrap();
173+
assert_eq!(deserialize_helper::<i32>(&fory, &bytes), -42i32);
174+
175+
// f64
176+
let bytes = fory.serialize(&3.14159f64).unwrap();
177+
assert_eq!(deserialize_helper::<f64>(&fory, &bytes), 3.14159f64);
178+
179+
// Vec<i32>
180+
let vec = vec![1i32, 2, 3, 5, 8];
181+
let bytes = fory.serialize(&vec).unwrap();
182+
assert_eq!(deserialize_helper::<Vec<i32>>(&fory, &bytes), vec);
183+
}
85184
}

0 commit comments

Comments
 (0)