From b03eb54da11484cfbff34bb7b32decbb14a4d0ec Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Tue, 24 Mar 2026 10:02:13 +0100 Subject: [PATCH] feat: add streaming refill buffers for WASM contract state transfer Instead of allocating the full state size in WASM linear memory upfront, the host now writes data into small 64KB buffers with a [total_len: u32] header. The contract reads via StreamingBuffer, which calls a host import (__frnt__fill_buffer) to refill the buffer when exhausted. This reduces WASM memory usage from O(state_size) to O(64KB) for large contracts. For small states (<64KB), all data fits in one buffer with zero host callbacks. Changes: - Add StreamingBuffer with std::io::Read impl (contract-side) - Add __frnt__fill_buffer host import declaration - Add bytes_read(), reset_pointers(), last_read_ptr(), last_write_ptr() to BufferBuilder - Make compute_ptr public for host-side use - Update wasm_interface to use streaming reads for all contract operations - Add 7 unit tests for StreamingBuffer (non-refill path) - Bump version to 0.3.4 --- rust/Cargo.toml | 2 +- rust/src/memory.rs | 116 +++++++++++---------- rust/src/memory/buf.rs | 227 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 284 insertions(+), 61 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index dd08d57..79e5dca 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "freenet-stdlib" -version = "0.3.3" +version = "0.3.4" edition = "2021" rust-version = "1.80" publish = true diff --git a/rust/src/memory.rs b/rust/src/memory.rs index a9aac7a..512d9cb 100644 --- a/rust/src/memory.rs +++ b/rust/src/memory.rs @@ -45,6 +45,20 @@ pub mod wasm_interface { Ok(()) } + use std::io::Read; + + /// Read all bytes from a streaming buffer into a Vec. 
+ fn read_streaming_bytes(ptr: i64) -> Result, ContractInterfaceResult> { + let mut reader = unsafe { super::buf::StreamingBuffer::from_ptr(ptr) }; + let mut bytes = Vec::with_capacity(reader.total_remaining()); + reader.read_to_end(&mut bytes).map_err(|e| { + ContractInterfaceResult::from(Err::(ContractError::Other(format!( + "streaming read failed: {e}" + )))) + })?; + Ok(bytes) + } + pub fn inner_validate_state( parameters: i64, state: i64, @@ -53,31 +67,28 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &*(parameters as *const super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), }; - let state = unsafe { - let state_buf = &*(state as *const super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; - let related: RelatedContracts = unsafe { - let related = &*(related as *const super::buf::BufferBuilder); - let bytes = &*std::ptr::slice_from_raw_parts(related.start(), related.bytes_written()); - match bincode::deserialize(bytes) { - Ok(v) => v, + let related_bytes = match read_streaming_bytes(related) { + Ok(bytes) => bytes, + Err(e) => return e.into_raw(), + }; + let related: RelatedContracts<'static> = + match bincode::deserialize::(&related_bytes) { + Ok(v) => v.into_owned(), Err(err) => { return ContractInterfaceResult::from(Err::<::core::primitive::bool, _>( ContractError::Deser(format!("{}", err)), )) .into_raw() } - } - }; + }; let result = ::validate_state(parameters, state, related); ContractInterfaceResult::from(result).into_raw() 
} @@ -90,31 +101,28 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &mut *(parameters as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), + }; + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; - let state = unsafe { - let state_buf = &mut *(state as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let updates_bytes = match read_streaming_bytes(updates) { + Ok(bytes) => bytes, + Err(e) => return e.into_raw(), }; - let updates = unsafe { - let updates = &mut *(updates as *mut super::buf::BufferBuilder); - let bytes = &*std::ptr::slice_from_raw_parts(updates.start(), updates.bytes_written()); - match bincode::deserialize(bytes) { - Ok(v) => v, + let updates: Vec> = + match bincode::deserialize::>(&updates_bytes) { + Ok(v) => v.into_iter().map(|u| u.into_owned()).collect(), Err(err) => { return ContractInterfaceResult::from(Err::( ContractError::Deser(format!("{}", err)), )) .into_raw() } - } - }; + }; let result = ::update_state(parameters, state, updates); ContractInterfaceResult::from(result).into_raw() } @@ -123,17 +131,13 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &mut *(parameters as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), 
}; - let state = unsafe { - let state_buf = &mut *(state as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; let summary = ::summarize_state(parameters, state); ContractInterfaceResult::from(summary).into_raw() @@ -147,23 +151,17 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &mut *(parameters as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), }; - let state = unsafe { - let state_buf = &mut *(state as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; - let summary = unsafe { - let summary_buf = &mut *(summary as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(summary_buf.start(), summary_buf.bytes_written()); - StateSummary::from(bytes) + let summary = match read_streaming_bytes(summary) { + Ok(bytes) => StateSummary::from(bytes), + Err(e) => return e.into_raw(), }; let new_delta = ::get_state_delta(parameters, state, summary); ContractInterfaceResult::from(new_delta).into_raw() diff --git a/rust/src/memory/buf.rs b/rust/src/memory/buf.rs index 46d9e4a..9de06fd 100644 --- a/rust/src/memory/buf.rs +++ b/rust/src/memory/buf.rs @@ -32,11 +32,36 @@ impl BufferBuilder { unsafe { *(self.last_write as *mut u32) as usize } } + /// Returns the number of bytes read from the 
buffer. + #[cfg(feature = "contract")] + pub fn bytes_read(&self) -> usize { + unsafe { *(self.last_read as *mut u32) as usize } + } + + /// Resets the read and write pointers to 0 (contract-side). + #[cfg(feature = "contract")] + pub fn reset_pointers(&mut self) { + unsafe { + *(self.last_read as *mut u32) = 0; + *(self.last_write as *mut u32) = 0; + } + } + /// Returns the first byte of buffer. pub fn start(&self) -> *mut u8 { self.start as _ } + /// Returns the raw pointer to the read position tracker (for host-side reset). + pub fn last_read_ptr(&self) -> *mut u32 { + self.last_read as *mut u32 + } + + /// Returns the raw pointer to the write position tracker (for host-side reset). + pub fn last_write_ptr(&self) -> *mut u32 { + self.last_write as *mut u32 + } + /// # Safety /// Requires that there are no living references to the current /// underlying buffer or will trigger UB @@ -206,7 +231,7 @@ impl std::io::Write for BufferMut<'_> { } #[inline(always)] -pub(crate) fn compute_ptr(ptr: *mut T, linear_mem_space: &WasmLinearMem) -> *mut T { +pub fn compute_ptr(ptr: *mut T, linear_mem_space: &WasmLinearMem) -> *mut T { let mem_start_ptr = linear_mem_space.start_ptr; (mem_start_ptr as isize + ptr as isize) as _ } @@ -323,6 +348,104 @@ impl<'instance> Buffer<'instance> { } } +// --------------------------------------------------------------------------- +// Streaming refill buffer (contract-side only) +// --------------------------------------------------------------------------- + +// Host import for refilling a buffer. Called by the contract when it has +// exhausted the current buffer contents and needs more data from the host. +// Returns the number of bytes the host wrote into the buffer, or 0 for EOF. +#[cfg(all(feature = "contract", not(test)))] +#[link(wasm_import_module = "freenet_contract_io")] +extern "C" { + fn __frnt__fill_buffer(id: i64, buf_ptr: i64) -> u32; +} + +// Test stub: returns 0 (EOF) since tests don't have a WASM host. 
+// This means tests can only exercise the non-refill path.
+#[cfg(all(feature = "contract", test))]
+unsafe extern "C" fn __frnt__fill_buffer(_id: i64, _buf_ptr: i64) -> u32 {
+    0
+}
+
+/// Contract-side streaming reader for refill-pattern buffers.
+///
+/// The host writes a `[total_len: u32]` header followed by as much data as
+/// fits into a small buffer. The contract reads through this wrapper; when
+/// the buffer is exhausted, [`std::io::Read::read`] calls the host to refill it.
+#[cfg(feature = "contract")]
+pub struct StreamingBuffer {
+    buf_ptr: *mut BufferBuilder,
+    /// Total bytes remaining to be read (initialized from the header).
+    total_remaining: usize,
+}
+
+#[cfg(feature = "contract")]
+impl StreamingBuffer {
+    /// Returns the total number of payload bytes remaining to be read.
+    pub fn total_remaining(&self) -> usize {
+        self.total_remaining
+    }
+
+    /// Create a streaming reader from a buffer pointer.
+    ///
+    /// Reads the `[total_len: u32]` header and prepares for streaming.
+    ///
+    /// # Safety
+    /// `ptr` must point to a valid `BufferBuilder` in WASM linear memory
+    /// whose first 4 bytes of data contain the total payload length as LE u32.
+    pub unsafe fn from_ptr(ptr: i64) -> Self {
+        let buf_ptr = ptr as *mut BufferBuilder;
+        let builder = &*buf_ptr;
+        // Read the total_len header (first 4 bytes)
+        let data_start = builder.start() as *const u8;
+        let total_len = u32::from_le_bytes([
+            *data_start,
+            *data_start.add(1),
+            *data_start.add(2),
+            *data_start.add(3),
+        ]) as usize;
+        // Advance the read pointer past the header
+        let read_ptr = builder.last_read as *mut u32;
+        *read_ptr = 4;
+        StreamingBuffer {
+            buf_ptr,
+            total_remaining: total_len,
+        }
+    }
+}
+
+#[cfg(feature = "contract")]
+impl std::io::Read for StreamingBuffer {
+    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
+        if self.total_remaining == 0 {
+            return Ok(0); // EOF — all expected data has been read
+        }
+        let builder = unsafe { &*self.buf_ptr };
+        let mut available = builder.bytes_written().saturating_sub(builder.bytes_read());
+        if available == 0 {
+            // Buffer drained: ask the host to refill. Zero bytes here means a truncated payload.
+            let filled =
+                unsafe { __frnt__fill_buffer(crate::global::INSTANCE_ID, self.buf_ptr as i64) };
+            if filled == 0 {
+                return Err(std::io::ErrorKind::UnexpectedEof.into());
+            }
+            available = filled as usize;
+        }
+        let n = out.len().min(available).min(self.total_remaining);
+        // Copy from buffer at current read position
+        let read_pos = builder.bytes_read();
+        unsafe {
+            let src = builder.start().add(read_pos);
+            std::ptr::copy_nonoverlapping(src, out.as_mut_ptr(), n);
+            // Advance the read pointer
+            *(builder.last_read as *mut u32) = (read_pos + n) as u32;
+        }
+        self.total_remaining -= n;
+        Ok(n)
+    }
+}
+
 /// Returns the pointer to a new BufferBuilder.
 ///
 /// This buffer leaks it's own memory and will only be freed by the runtime when a contract instance is dropped.
@@ -451,6 +574,108 @@ mod test_io_write {
     }
 }
 
+/// Tests for StreamingBuffer (contract-side reader).
+/// These test the non-refill path only — when all data fits in the initial buffer.
+/// The refill path requires a WASM runtime and is tested via integration tests. +#[cfg(all(test, feature = "contract"))] +mod test_streaming_read { + use super::*; + use std::io::Read; + + /// Create a host-memory buffer pre-loaded with `[total_len: u32 LE][data...]`. + unsafe fn host_streaming_buffer(data: &[u8]) -> StreamingBuffer { + let total_with_header = data.len() + 4; + let ptr = __frnt__initiate_buffer(total_with_header as u32); + let builder = &mut *(ptr as *mut BufferBuilder); + + // Write total_len header (LE u32) + let header = (data.len() as u32).to_le_bytes(); + let start = builder.start(); + std::ptr::copy_nonoverlapping(header.as_ptr(), start, 4); + std::ptr::copy_nonoverlapping(data.as_ptr(), start.add(4), data.len()); + + // Set write pointer to total bytes written + *(builder.last_write as *mut u32) = total_with_header as u32; + + StreamingBuffer::from_ptr(ptr) + } + + #[test] + fn read_basic() { + let data = b"hello streaming"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut out = vec![0u8; data.len()]; + reader.read_exact(&mut out).unwrap(); + assert_eq!(&out, data); + } + + #[test] + fn read_to_end_collects_all() { + let data = b"the quick brown fox jumps over the lazy dog"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut out = Vec::new(); + reader.read_to_end(&mut out).unwrap(); + assert_eq!(&out, data); + } + + #[test] + fn read_empty_payload() { + let mut reader = unsafe { host_streaming_buffer(b"") }; + let mut out = Vec::new(); + let n = reader.read_to_end(&mut out).unwrap(); + assert_eq!(n, 0); + assert!(out.is_empty()); + } + + #[test] + fn read_in_small_chunks() { + let data = b"abcdefghij"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut result = Vec::new(); + let mut buf = [0u8; 3]; + loop { + let n = reader.read(&mut buf).unwrap(); + if n == 0 { + break; + } + result.extend_from_slice(&buf[..n]); + } + assert_eq!(&result, data); + } + + #[test] + fn 
total_remaining_decreases() { + let data = b"1234567890"; + let mut reader = unsafe { host_streaming_buffer(data) }; + assert_eq!(reader.total_remaining(), 10); + let mut buf = [0u8; 4]; + reader.read(&mut buf).unwrap(); + assert_eq!(reader.total_remaining(), 6); + } + + #[test] + fn eof_after_all_read() { + let data = b"abc"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut out = vec![0u8; 3]; + reader.read_exact(&mut out).unwrap(); + assert_eq!(reader.total_remaining(), 0); + let n = reader.read(&mut out).unwrap(); + assert_eq!(n, 0); + } + + #[test] + fn bincode_roundtrip_through_streaming() { + let original: Vec = vec![42, 99, 1337, 0, u32::MAX]; + let serialized = bincode::serialize(&original).unwrap(); + let mut reader = unsafe { host_streaming_buffer(&serialized) }; + let mut bytes = Vec::with_capacity(reader.total_remaining()); + reader.read_to_end(&mut bytes).unwrap(); + let result: Vec = bincode::deserialize(&bytes).unwrap(); + assert_eq!(result, original); + } +} + #[cfg(all(test, any(unix, windows), feature = "wasmer-tests"))] mod test { use super::*;