diff --git a/rust/Cargo.toml b/rust/Cargo.toml index dd08d57..79e5dca 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "freenet-stdlib" -version = "0.3.3" +version = "0.3.4" edition = "2021" rust-version = "1.80" publish = true diff --git a/rust/src/memory.rs b/rust/src/memory.rs index a9aac7a..512d9cb 100644 --- a/rust/src/memory.rs +++ b/rust/src/memory.rs @@ -45,6 +45,20 @@ pub mod wasm_interface { Ok(()) } + use std::io::Read; + + /// Read all bytes from a streaming buffer into a Vec. + fn read_streaming_bytes(ptr: i64) -> Result, ContractInterfaceResult> { + let mut reader = unsafe { super::buf::StreamingBuffer::from_ptr(ptr) }; + let mut bytes = Vec::with_capacity(reader.total_remaining()); + reader.read_to_end(&mut bytes).map_err(|e| { + ContractInterfaceResult::from(Err::(ContractError::Other(format!( + "streaming read failed: {e}" + )))) + })?; + Ok(bytes) + } + pub fn inner_validate_state( parameters: i64, state: i64, @@ -53,31 +67,28 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &*(parameters as *const super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), }; - let state = unsafe { - let state_buf = &*(state as *const super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; - let related: RelatedContracts = unsafe { - let related = &*(related as *const super::buf::BufferBuilder); - let bytes = &*std::ptr::slice_from_raw_parts(related.start(), related.bytes_written()); - match bincode::deserialize(bytes) { - Ok(v) => v, + let related_bytes = match read_streaming_bytes(related) { + Ok(bytes) => bytes, + Err(e) => return e.into_raw(), + }; + let related: RelatedContracts<'static> = + match bincode::deserialize::(&related_bytes) { + Ok(v) => v.into_owned(), Err(err) => { return ContractInterfaceResult::from(Err::<::core::primitive::bool, _>( ContractError::Deser(format!("{}", err)), )) .into_raw() } - } - }; + }; let result = ::validate_state(parameters, state, related); ContractInterfaceResult::from(result).into_raw() } @@ -90,31 +101,28 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &mut *(parameters as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), + }; + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; - let state = unsafe { - let state_buf = &mut *(state as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let updates_bytes = match read_streaming_bytes(updates) { + Ok(bytes) => bytes, + Err(e) => return e.into_raw(), }; - let updates = unsafe { - let updates = &mut *(updates as *mut super::buf::BufferBuilder); - let bytes = &*std::ptr::slice_from_raw_parts(updates.start(), updates.bytes_written()); - match bincode::deserialize(bytes) { - Ok(v) => v, + let updates: Vec> = + match bincode::deserialize::>(&updates_bytes) { + Ok(v) => v.into_iter().map(|u| u.into_owned()).collect(), Err(err) => { return ContractInterfaceResult::from(Err::( ContractError::Deser(format!("{}", err)), )) .into_raw() } - } - }; + }; let result = ::update_state(parameters, state, updates); ContractInterfaceResult::from(result).into_raw() } @@ -123,17 +131,13 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &mut *(parameters as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), }; - let state = unsafe { - let state_buf = &mut *(state as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; let summary = ::summarize_state(parameters, state); ContractInterfaceResult::from(summary).into_raw() @@ -147,23 +151,17 @@ pub mod wasm_interface { if let Err(e) = set_logger().map_err(|e| e.into_raw()) { return e; } - let parameters = unsafe { - let param_buf = &mut *(parameters as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(param_buf.start(), param_buf.bytes_written()); - Parameters::from(bytes) + let parameters = match read_streaming_bytes(parameters) { + Ok(bytes) => Parameters::from(bytes), + Err(e) => return e.into_raw(), }; - let state = unsafe { - let state_buf = &mut *(state as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(state_buf.start(), state_buf.bytes_written()); - State::from(bytes) + let state = match read_streaming_bytes(state) { + Ok(bytes) => State::from(bytes), + Err(e) => return e.into_raw(), }; - let summary = unsafe { - let summary_buf = &mut *(summary as *mut super::buf::BufferBuilder); - let bytes = - &*std::ptr::slice_from_raw_parts(summary_buf.start(), summary_buf.bytes_written()); - StateSummary::from(bytes) + let summary = match read_streaming_bytes(summary) { + Ok(bytes) => StateSummary::from(bytes), + Err(e) => return e.into_raw(), }; let new_delta = ::get_state_delta(parameters, state, summary); ContractInterfaceResult::from(new_delta).into_raw() diff --git a/rust/src/memory/buf.rs b/rust/src/memory/buf.rs index 46d9e4a..9de06fd 100644 --- a/rust/src/memory/buf.rs +++ b/rust/src/memory/buf.rs @@ -32,11 +32,36 @@ impl BufferBuilder { unsafe { *(self.last_write as *mut u32) as usize } } + /// Returns the number of bytes read from the buffer. + #[cfg(feature = "contract")] + pub fn bytes_read(&self) -> usize { + unsafe { *(self.last_read as *mut u32) as usize } + } + + /// Resets the read and write pointers to 0 (contract-side). + #[cfg(feature = "contract")] + pub fn reset_pointers(&mut self) { + unsafe { + *(self.last_read as *mut u32) = 0; + *(self.last_write as *mut u32) = 0; + } + } + /// Returns the first byte of buffer. pub fn start(&self) -> *mut u8 { self.start as _ } + /// Returns the raw pointer to the read position tracker (for host-side reset). + pub fn last_read_ptr(&self) -> *mut u32 { + self.last_read as *mut u32 + } + + /// Returns the raw pointer to the write position tracker (for host-side reset). + pub fn last_write_ptr(&self) -> *mut u32 { + self.last_write as *mut u32 + } + /// # Safety /// Requires that there are no living references to the current /// underlying buffer or will trigger UB @@ -206,7 +231,7 @@ impl std::io::Write for BufferMut<'_> { } #[inline(always)] -pub(crate) fn compute_ptr(ptr: *mut T, linear_mem_space: &WasmLinearMem) -> *mut T { +pub fn compute_ptr(ptr: *mut T, linear_mem_space: &WasmLinearMem) -> *mut T { let mem_start_ptr = linear_mem_space.start_ptr; (mem_start_ptr as isize + ptr as isize) as _ } @@ -323,6 +348,104 @@ impl<'instance> Buffer<'instance> { } } +// --------------------------------------------------------------------------- +// Streaming refill buffer (contract-side only) +// --------------------------------------------------------------------------- + +// Host import for refilling a buffer. Called by the contract when it has +// exhausted the current buffer contents and needs more data from the host. +// Returns the number of bytes the host wrote into the buffer, or 0 for EOF. +#[cfg(all(feature = "contract", not(test)))] +#[link(wasm_import_module = "freenet_contract_io")] +extern "C" { + fn __frnt__fill_buffer(id: i64, buf_ptr: i64) -> u32; +} + +// Test stub: returns 0 (EOF) since tests don't have a WASM host. +// This means tests can only exercise the non-refill path. +#[cfg(all(feature = "contract", test))] +unsafe extern "C" fn __frnt__fill_buffer(_id: i64, _buf_ptr: i64) -> u32 { + 0 +} + +/// Contract-side streaming reader for refill-pattern buffers. +/// +/// The host writes a `[total_len: u32]` header followed by as much data as +/// fits into a small buffer. The contract reads through this wrapper; when +/// the buffer is exhausted, [`Read::read`] calls the host to refill it. +#[cfg(feature = "contract")] +pub struct StreamingBuffer { + buf_ptr: *mut BufferBuilder, + /// Total bytes remaining to be read (initialized from the header). + total_remaining: usize, +} + +#[cfg(feature = "contract")] +impl StreamingBuffer { + /// Create a streaming reader from a buffer pointer. + /// + /// Reads the `[total_len: u32]` header and prepares for streaming. + /// + /// # Safety + /// `ptr` must point to a valid `BufferBuilder` in WASM linear memory + /// whose first 4 bytes of data contain the total payload length as LE u32. + /// Returns the total number of payload bytes remaining to be read. + pub fn total_remaining(&self) -> usize { + self.total_remaining + } + + pub unsafe fn from_ptr(ptr: i64) -> Self { + let buf_ptr = ptr as *mut BufferBuilder; + let builder = &*buf_ptr; + // Read the total_len header (first 4 bytes) + let data_start = builder.start() as *const u8; + let total_len = u32::from_le_bytes([ + *data_start, + *data_start.add(1), + *data_start.add(2), + *data_start.add(3), + ]) as usize; + // Advance the read pointer past the header + let read_ptr = builder.last_read as *mut u32; + *read_ptr = 4; + StreamingBuffer { + buf_ptr, + total_remaining: total_len, + } + } +} + +#[cfg(feature = "contract")] +impl std::io::Read for StreamingBuffer { + fn read(&mut self, out: &mut [u8]) -> std::io::Result { + if self.total_remaining == 0 { + return Ok(0); // EOF — all expected data has been read + } + let builder = unsafe { &*self.buf_ptr }; + let mut available = builder.bytes_written().saturating_sub(builder.bytes_read()); + if available == 0 { + // Buffer exhausted — ask host to refill + let filled = + unsafe { __frnt__fill_buffer(crate::global::INSTANCE_ID, self.buf_ptr as i64) }; + if filled == 0 { + return Ok(0); // Host says EOF + } + available = filled as usize; + } + let n = out.len().min(available).min(self.total_remaining); + // Copy from buffer at current read position + let read_pos = builder.bytes_read(); + unsafe { + let src = builder.start().add(read_pos); + std::ptr::copy_nonoverlapping(src, out.as_mut_ptr(), n); + // Advance the read pointer + *(builder.last_read as *mut u32) = (read_pos + n) as u32; + } + self.total_remaining -= n; + Ok(n) + } +} + /// Returns the pointer to a new BufferBuilder. /// /// This buffer leaks it's own memory and will only be freed by the runtime when a contract instance is dropped. @@ -451,6 +574,108 @@ mod test_io_write { } } +/// Tests for StreamingBuffer (contract-side reader). +/// These test the non-refill path only — when all data fits in the initial buffer. +/// The refill path requires a WASM runtime and is tested via integration tests. +#[cfg(all(test, feature = "contract"))] +mod test_streaming_read { + use super::*; + use std::io::Read; + + /// Create a host-memory buffer pre-loaded with `[total_len: u32 LE][data...]`. + unsafe fn host_streaming_buffer(data: &[u8]) -> StreamingBuffer { + let total_with_header = data.len() + 4; + let ptr = __frnt__initiate_buffer(total_with_header as u32); + let builder = &mut *(ptr as *mut BufferBuilder); + + // Write total_len header (LE u32) + let header = (data.len() as u32).to_le_bytes(); + let start = builder.start(); + std::ptr::copy_nonoverlapping(header.as_ptr(), start, 4); + std::ptr::copy_nonoverlapping(data.as_ptr(), start.add(4), data.len()); + + // Set write pointer to total bytes written + *(builder.last_write as *mut u32) = total_with_header as u32; + + StreamingBuffer::from_ptr(ptr) + } + + #[test] + fn read_basic() { + let data = b"hello streaming"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut out = vec![0u8; data.len()]; + reader.read_exact(&mut out).unwrap(); + assert_eq!(&out, data); + } + + #[test] + fn read_to_end_collects_all() { + let data = b"the quick brown fox jumps over the lazy dog"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut out = Vec::new(); + reader.read_to_end(&mut out).unwrap(); + assert_eq!(&out, data); + } + + #[test] + fn read_empty_payload() { + let mut reader = unsafe { host_streaming_buffer(b"") }; + let mut out = Vec::new(); + let n = reader.read_to_end(&mut out).unwrap(); + assert_eq!(n, 0); + assert!(out.is_empty()); + } + + #[test] + fn read_in_small_chunks() { + let data = b"abcdefghij"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut result = Vec::new(); + let mut buf = [0u8; 3]; + loop { + let n = reader.read(&mut buf).unwrap(); + if n == 0 { + break; + } + result.extend_from_slice(&buf[..n]); + } + assert_eq!(&result, data); + } + + #[test] + fn total_remaining_decreases() { + let data = b"1234567890"; + let mut reader = unsafe { host_streaming_buffer(data) }; + assert_eq!(reader.total_remaining(), 10); + let mut buf = [0u8; 4]; + reader.read(&mut buf).unwrap(); + assert_eq!(reader.total_remaining(), 6); + } + + #[test] + fn eof_after_all_read() { + let data = b"abc"; + let mut reader = unsafe { host_streaming_buffer(data) }; + let mut out = vec![0u8; 3]; + reader.read_exact(&mut out).unwrap(); + assert_eq!(reader.total_remaining(), 0); + let n = reader.read(&mut out).unwrap(); + assert_eq!(n, 0); + } + + #[test] + fn bincode_roundtrip_through_streaming() { + let original: Vec = vec![42, 99, 1337, 0, u32::MAX]; + let serialized = bincode::serialize(&original).unwrap(); + let mut reader = unsafe { host_streaming_buffer(&serialized) }; + let mut bytes = Vec::with_capacity(reader.total_remaining()); + reader.read_to_end(&mut bytes).unwrap(); + let result: Vec = bincode::deserialize(&bytes).unwrap(); + assert_eq!(result, original); + } +} + #[cfg(all(test, any(unix, windows), feature = "wasmer-tests"))] mod test { use super::*;