diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 95f30801..1f424495 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -780,6 +780,26 @@ class LogScanner: or timeout expires. """ ... + def to_arrow_batch_reader(self) -> pa.RecordBatchReader: + """Create a lazy Arrow RecordBatchReader that reads until latest offsets. + + Returns a ``pyarrow.RecordBatchReader`` that lazily polls batches one at + a time (streaming). Prefer this when you want to process batches without + holding the full result in memory at once. + + Do not call ``poll_arrow`` / ``poll_record_batch`` on this scanner while + iterating the reader; they share the same underlying scanner state. + Overlapping calls fail immediately with ``FlussError`` (client-side), not + by blocking behind the active session. + + Requires a batch-based scanner (created with ``new_scan().create_record_batch_log_scanner()``). + You must call ``subscribe()``, ``subscribe_buckets()``, ``subscribe_partition()``, + or ``subscribe_partition_buckets()`` first. + + Returns: + ``pyarrow.RecordBatchReader`` yielding ``RecordBatch`` objects. + """ + ... def to_pandas(self) -> pd.DataFrame: """Convert all data to Pandas DataFrame. @@ -792,6 +812,11 @@ class LogScanner: def to_arrow(self) -> pa.Table: """Convert all data to Arrow Table. + Batches are collected in Rust then combined into one table (no per-batch + Python iteration). Do not interleave with ``poll_arrow`` / ``poll_record_batch`` + for the same subscription session; overlapping use fails immediately with + ``FlussError``. + Requires a batch-based scanner (created with new_scan().create_record_batch_log_scanner()). Reads from currently subscribed buckets until reaching their latest offsets. 
diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 6890e088..5d7cccae 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -118,6 +118,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 660cd6be..de97ee52 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -21,7 +21,6 @@ use arrow::array::RecordBatch as ArrowRecordBatch; use arrow_pyarrow::{FromPyArrow, ToPyArrow}; use arrow_schema::SchemaRef; use fluss::record::to_arrow_schema; -use fluss::rpc::message::OffsetSpec; use indexmap::IndexMap; use pyo3::exceptions::{PyIndexError, PyRuntimeError, PyTypeError}; use pyo3::sync::PyOnceLock; @@ -1856,6 +1855,40 @@ fn get_type_name(value: &Bound) -> String { .unwrap_or_else(|_| "unknown".to_string()) } +/// Python iterator that lazily yields PyArrow RecordBatches from a +/// [`fcore::client::RecordBatchLogReader`]. Used as the backing iterator +/// for ``pa.RecordBatchReader.from_batches()``. +/// +/// **Concurrency:** Do not call ``poll_arrow`` / ``poll_record_batch`` on the +/// same logical scanner while this iterator is active. 
+#[pyclass] +pub struct PyRecordBatchLogReader { + reader: fcore::client::RecordBatchLogReader, +} + +#[pymethods] +impl PyRecordBatchLogReader { + fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> { + slf + } + + fn __next__(&mut self, py: Python) -> PyResult>> { + let batch = py + .detach(|| TOKIO_RUNTIME.block_on(self.reader.next_batch())) + .map_err(|e| FlussError::from_core_error(&e))?; + + match batch { + Some(b) => { + let py_batch = b + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert batch: {e}")))?; + Ok(Some(py_batch.unbind())) + } + None => Ok(None), + } + } +} + /// Wraps the two scanner variants so we never have an impossible state /// (both None or both Some). enum ScannerKind { @@ -1908,8 +1941,6 @@ pub struct LogScanner { projected_schema: SchemaRef, /// The projected row type to use for record-based scanning projected_row_type: fcore::metadata::RowType, - /// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls) - partition_name_cache: std::sync::RwLock>>, } #[pymethods] @@ -2157,29 +2188,87 @@ impl LogScanner { Ok(empty_table.into()) } + /// Create a lazy Arrow RecordBatchReader that reads until latest offsets. + /// + /// Returns a PyArrow RecordBatchReader that lazily polls batches one at a + /// time. This is more memory-efficient than ``to_arrow()`` which loads all + /// data into a single table. + /// + /// **Concurrency:** The reader shares the same underlying scanner state as + /// this ``LogScanner``. Do not call ``poll_arrow``, ``poll_record_batch``, + /// or other poll methods while iterating this reader. + /// + /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), + /// or subscribe_partition_buckets() first. 
+ /// + /// Returns: + /// ``pyarrow.RecordBatchReader`` yielding ``RecordBatch`` objects + fn to_arrow_batch_reader(&self, py: Python) -> PyResult> { + let scanner = self.scanner.as_batch()?; + + let reader = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { + fcore::client::RecordBatchLogReader::new_until_latest( + scanner.clone(), + &self.admin, + ) + .await + }) + }) + .map_err(|e| FlussError::from_core_error(&e))?; + + let py_schema = reader + .schema() + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?; + + let py_iter = Py::new(py, PyRecordBatchLogReader { reader })?; + + let pyarrow = py.import("pyarrow")?; + let batch_reader = pyarrow + .getattr("RecordBatchReader")? + .call_method1("from_batches", (py_schema, py_iter))?; + + Ok(batch_reader.into()) + } + /// Convert all data to Arrow Table. /// /// Reads from currently subscribed buckets until reaching their latest offsets. /// Works for both partitioned and non-partitioned tables. /// + /// Materializes batches in Rust (``RecordBatchLogReader::collect_all_batches``) + /// then builds one PyArrow table, avoiding per-batch Python iteration. + /// /// You must call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first. /// /// Returns: /// PyArrow Table containing all data from subscribed buckets fn to_arrow(&self, py: Python) -> PyResult> { let scanner = self.scanner.as_batch()?; - let subscribed = scanner.get_subscribed_buckets(); - if subscribed.is_empty() { - return Err(FlussError::new_err( - "No buckets subscribed. Call subscribe(), subscribe_buckets(), subscribe_partition(), or subscribe_partition_buckets() first.", - )); - } - // 2. 
Query latest offsets for all subscribed buckets - let stopping_offsets = self.query_latest_offsets(py, &subscribed)?; + let batches: Vec> = py + .detach(|| { + TOKIO_RUNTIME.block_on(async { + let mut reader = fcore::client::RecordBatchLogReader::new_until_latest( + scanner.clone(), + &self.admin, + ) + .await?; + let batches = reader.collect_all_batches().await?; + Ok::<_, fcore::error::Error>( + batches.into_iter().map(std::sync::Arc::new).collect(), + ) + }) + }) + .map_err(|e| FlussError::from_core_error(&e))?; + + if batches.is_empty() { + return self.create_empty_table(py); + } - // 3. Poll until all buckets reach their stopping offsets - self.poll_until_offsets(py, stopping_offsets) + Utils::combine_batches_to_table(py, batches) } /// Convert all data to Pandas DataFrame. @@ -2218,207 +2307,8 @@ impl LogScanner { table_info, projected_schema, projected_row_type, - partition_name_cache: std::sync::RwLock::new(None), } } - - /// Get partition_id -> partition_name mapping, using cache if available - fn get_partition_name_map( - &self, - py: Python, - table_path: &fcore::metadata::TablePath, - ) -> PyResult> { - // Check cache first (read lock) - { - let cache = self.partition_name_cache.read().unwrap(); - if let Some(map) = cache.as_ref() { - return Ok(map.clone()); - } - } - - // Fetch partition infos (releases GIL during async call) - let partition_infos: Vec = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { self.admin.list_partition_infos(table_path).await }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - // Build and cache the mapping - let map: HashMap = partition_infos - .into_iter() - .map(|info| (info.get_partition_id(), info.get_partition_name())) - .collect(); - - // Store in cache (write lock) - { - let mut cache = self.partition_name_cache.write().unwrap(); - *cache = Some(map.clone()); - } - - Ok(map) - } - - /// Query latest offsets for subscribed buckets (handles both partitioned and non-partitioned) - fn query_latest_offsets( - 
&self, - py: Python, - subscribed: &[(fcore::metadata::TableBucket, i64)], - ) -> PyResult> { - let scanner = self.scanner.as_batch()?; - let is_partitioned = scanner.is_partitioned(); - let table_path = &self.table_info.table_path; - - if !is_partitioned { - // Non-partitioned: simple case - just query all bucket IDs - let bucket_ids: Vec = subscribed.iter().map(|(tb, _)| tb.bucket_id()).collect(); - - let offsets: HashMap = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { - self.admin - .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) - .await - }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - // Convert to TableBucket-keyed map - let table_id = self.table_info.table_id; - Ok(offsets - .into_iter() - .filter(|(_, offset)| *offset > 0) - .map(|(bucket_id, offset)| { - ( - fcore::metadata::TableBucket::new(table_id, bucket_id), - offset, - ) - }) - .collect()) - } else { - // Partitioned: need to query per partition - self.query_partitioned_offsets(py, subscribed) - } - } - - /// Query offsets for partitioned table subscriptions - fn query_partitioned_offsets( - &self, - py: Python, - subscribed: &[(fcore::metadata::TableBucket, i64)], - ) -> PyResult> { - let table_path = &self.table_info.table_path; - - // Get partition_id -> partition_name mapping (cached) - let partition_id_to_name = self.get_partition_name_map(py, table_path)?; - - // Group subscribed buckets by partition_id - let mut by_partition: HashMap> = HashMap::new(); - for (tb, _) in subscribed { - if let Some(partition_id) = tb.partition_id() { - by_partition - .entry(partition_id) - .or_default() - .push(tb.bucket_id()); - } - } - - // Query offsets for each partition - let mut result: HashMap = HashMap::new(); - let table_id = self.table_info.table_id; - - for (partition_id, bucket_ids) in by_partition { - let partition_name = partition_id_to_name.get(&partition_id).ok_or_else(|| { - FlussError::new_err(format!("Unknown partition_id: {partition_id}")) - })?; - - let offsets: 
HashMap = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { - self.admin - .list_partition_offsets( - table_path, - partition_name, - &bucket_ids, - OffsetSpec::Latest, - ) - .await - }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - for (bucket_id, offset) in offsets { - if offset > 0 { - let tb = fcore::metadata::TableBucket::new_with_partition( - table_id, - Some(partition_id), - bucket_id, - ); - result.insert(tb, offset); - } - } - } - - Ok(result) - } - - /// Poll until all buckets reach their stopping offsets - fn poll_until_offsets( - &self, - py: Python, - mut stopping_offsets: HashMap, - ) -> PyResult> { - let scanner = self.scanner.as_batch()?; - let mut all_batches = Vec::new(); - - while !stopping_offsets.is_empty() { - let scan_batches = py - .detach(|| { - TOKIO_RUNTIME.block_on(async { scanner.poll(Duration::from_millis(500)).await }) - }) - .map_err(|e| FlussError::from_core_error(&e))?; - - if scan_batches.is_empty() { - continue; - } - - for scan_batch in scan_batches { - let table_bucket = scan_batch.bucket().clone(); - - // Check if this bucket is still being tracked - let Some(&stop_at) = stopping_offsets.get(&table_bucket) else { - continue; - }; - - let base_offset = scan_batch.base_offset(); - let last_offset = scan_batch.last_offset(); - - // If the batch starts at or after the stop_at offset, the bucket is exhausted - if base_offset >= stop_at { - stopping_offsets.remove(&table_bucket); - continue; - } - - let batch = if last_offset >= stop_at { - // Slice batch to keep only records where offset < stop_at - let num_to_keep = (stop_at - base_offset) as usize; - let b = scan_batch.into_batch(); - let limit = num_to_keep.min(b.num_rows()); - b.slice(0, limit) - } else { - scan_batch.into_batch() - }; - - all_batches.push(Arc::new(batch)); - - // Check if we're done with this bucket - if last_offset >= stop_at - 1 { - stopping_offsets.remove(&table_bucket); - } - } - } - - Utils::combine_batches_to_table(py, all_batches) - } } 
#[cfg(test)] diff --git a/crates/fluss/src/client/table/mod.rs b/crates/fluss/src/client/table/mod.rs index 4c3dfe2d..7cdc0447 100644 --- a/crates/fluss/src/client/table/mod.rs +++ b/crates/fluss/src/client/table/mod.rs @@ -28,12 +28,14 @@ mod lookup; mod log_fetch_buffer; mod partition_getter; +mod reader; mod remote_log; mod scanner; mod upsert; pub use append::{AppendWriter, TableAppend}; pub use lookup::{LookupResult, Lookuper, TableLookup}; +pub use reader::{RecordBatchLogReader, SyncRecordBatchLogReader}; pub use remote_log::{ DEFAULT_REMOTE_FILE_DOWNLOAD_THREAD_NUM, DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM, }; diff --git a/crates/fluss/src/client/table/reader.rs b/crates/fluss/src/client/table/reader.rs new file mode 100644 index 00000000..3317ced7 --- /dev/null +++ b/crates/fluss/src/client/table/reader.rs @@ -0,0 +1,500 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Bounded log reader that polls until stopping offsets, then terminates. +//! +//! Unlike [`RecordBatchLogScanner`] which is unbounded (continuous streaming), +//! [`RecordBatchLogReader`] reads log data up to a finite set of stopping +//! offsets and then signals completion. This enables "snapshot-style" reads +//! 
from a streaming log: capture the latest offsets, then consume all data +//! up to those offsets. +//! +//! The reader also provides a synchronous [`arrow::record_batch::RecordBatchReader`] +//! adapter via [`RecordBatchLogReader::to_record_batch_reader`] for Arrow +//! ecosystem interop and FFI consumers (Python, C++). + +use crate::client::admin::FlussAdmin; +use crate::client::table::RecordBatchLogScanner; +use crate::error::{Error, Result}; +use crate::metadata::TableBucket; +use crate::record::ScanBatch; +use crate::rpc::message::OffsetSpec; +use arrow::record_batch::RecordBatch; +use arrow_schema::SchemaRef; +use std::collections::{HashMap, VecDeque}; +use std::time::Duration; + +const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_millis(500); + +/// Bounded log reader that consumes log data up to specified stopping offsets. +/// +/// This type wraps a [`RecordBatchLogScanner`] and adds stopping semantics: +/// it polls batches from the scanner, filters/slices them against per-bucket +/// stopping offsets, and signals completion when all buckets are caught up. +/// +/// # Concurrent use (important) +/// +/// [`RecordBatchLogScanner`] is cheaply clonable and all clones share the same +/// underlying scanner state (fetch buffer, subscription state, in-flight +/// fetches). A `RecordBatchLogReader` typically takes one clone while the +/// [`crate::client::FlussTable`] scan path may still hold another handle to the +/// same logical scanner. +/// +/// **Do not** interleave [`RecordBatchLogScanner::poll`] (or Python +/// `poll_arrow` / `poll_record_batch`) with [`next_batch`](RecordBatchLogReader::next_batch) +/// on scanners that share this state. Use either the bounded reader **or** the +/// low-level poll loop for a given subscription session, not both at once. +/// Overlapping calls fail fast with [`crate::error::Error::UnsupportedOperation`] +/// (serialized in the client via `LogScannerInner::poll_session`). 
+/// +/// # Construction +/// +/// Use [`RecordBatchLogReader::new_until_latest`] for the common case of +/// reading all currently-available data, or [`RecordBatchLogReader::new_until_offsets`] +/// for custom stopping offsets. +/// +/// # Async iteration +/// +/// Call [`next_batch`](RecordBatchLogReader::next_batch) repeatedly to get +/// `RecordBatch`es lazily, one at a time. Returns `None` when all buckets +/// have reached their stopping offsets. +/// +/// # Sync adapter +/// +/// Call [`to_record_batch_reader`](RecordBatchLogReader::to_record_batch_reader) +/// to get a synchronous [`arrow::record_batch::RecordBatchReader`] suitable +/// for Arrow FFI consumers. +pub struct RecordBatchLogReader { + scanner: RecordBatchLogScanner, + stopping_offsets: HashMap, + buffer: VecDeque, + schema: SchemaRef, +} + +impl RecordBatchLogReader { + /// Create a reader that reads until the latest offsets at the time of creation. + /// + /// Queries the server for the current latest offset of each subscribed + /// bucket, then reads until those offsets are reached. + pub async fn new_until_latest( + scanner: RecordBatchLogScanner, + admin: &FlussAdmin, + ) -> Result { + let subscribed = scanner.get_subscribed_buckets(); + if subscribed.is_empty() { + return Err(Error::IllegalArgument { + message: "No buckets subscribed. Call subscribe() before creating a reader." + .to_string(), + }); + } + + let stopping_offsets = query_latest_offsets(admin, &scanner, &subscribed).await?; + let schema = scanner.schema(); + + Ok(Self { + scanner, + stopping_offsets, + buffer: VecDeque::new(), + schema, + }) + } + + /// Create a reader with explicit stopping offsets per bucket. + pub fn new_until_offsets( + scanner: RecordBatchLogScanner, + stopping_offsets: HashMap, + ) -> Self { + let schema = scanner.schema(); + Self { + scanner, + stopping_offsets, + buffer: VecDeque::new(), + schema, + } + } + + /// Returns the Arrow schema for batches produced by this reader. 
+ pub fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + /// Drain all remaining batches until stopping offsets are satisfied. + /// + /// This is a convenience for callers (e.g. bindings building a single Arrow + /// table) that want to materialize the full result in Rust without per-batch + /// Python iteration. + pub async fn collect_all_batches(&mut self) -> Result> { + let mut out = Vec::new(); + while let Some(b) = self.next_batch().await? { + out.push(b); + } + Ok(out) + } + + /// Fetch the next `RecordBatch`, or `None` if all buckets are caught up. + /// + /// Each call may internally poll multiple batches from the scanner, + /// buffer them, and return one at a time. Batches that cross a stopping + /// offset boundary are sliced to exclude records at or beyond the stop point. + pub async fn next_batch(&mut self) -> Result> { + loop { + if let Some(batch) = self.buffer.pop_front() { + return Ok(Some(batch)); + } + + if self.stopping_offsets.is_empty() { + return Ok(None); + } + + let scan_batches = self.scanner.poll(DEFAULT_POLL_TIMEOUT).await?; + + if scan_batches.is_empty() { + continue; + } + + filter_batches(scan_batches, &mut self.stopping_offsets, &mut self.buffer); + } + } + + /// Convert this async reader into a synchronous [`arrow::record_batch::RecordBatchReader`]. + /// + /// The returned adapter calls [`tokio::runtime::Handle::block_on`] on each + /// iterator step. **Do not** call this from inside a Tokio worker thread + /// while the same runtime is driving async work (nested `block_on` can + /// panic or deadlock). Prefer [`next_batch`](RecordBatchLogReader::next_batch) + /// in async Rust code. This is intended for sync/FFI boundaries (C++, some + /// Python call paths). + pub fn to_record_batch_reader( + self, + handle: tokio::runtime::Handle, + ) -> SyncRecordBatchLogReader { + SyncRecordBatchLogReader { + reader: self, + handle, + } + } +} + +/// Synchronous adapter that implements [`arrow::record_batch::RecordBatchReader`]. 
+/// +/// Created via [`RecordBatchLogReader::to_record_batch_reader`]. +/// Blocks the current thread on each `next()` call using the provided +/// Tokio runtime handle. +pub struct SyncRecordBatchLogReader { + reader: RecordBatchLogReader, + handle: tokio::runtime::Handle, +} + +impl Iterator for SyncRecordBatchLogReader { + type Item = std::result::Result; + + fn next(&mut self) -> Option { + match self.handle.block_on(self.reader.next_batch()) { + Ok(Some(batch)) => Some(Ok(batch)), + Ok(None) => None, + Err(e) => Some(Err(arrow::error::ArrowError::ExternalError(Box::new(e)))), + } + } +} + +impl arrow::record_batch::RecordBatchReader for SyncRecordBatchLogReader { + fn schema(&self) -> SchemaRef { + self.reader.schema() + } +} + +/// Query latest offsets for all subscribed buckets, handling both partitioned +/// and non-partitioned tables. +async fn query_latest_offsets( + admin: &FlussAdmin, + scanner: &RecordBatchLogScanner, + subscribed: &[(TableBucket, i64)], +) -> Result> { + let table_path = scanner.table_path(); + + if !scanner.is_partitioned() { + let bucket_ids: Vec = subscribed.iter().map(|(tb, _)| tb.bucket_id()).collect(); + + let offsets = admin + .list_offsets(table_path, &bucket_ids, OffsetSpec::Latest) + .await?; + + let table_id = scanner.table_id(); + Ok(offsets + .into_iter() + .filter(|(_, offset)| *offset > 0) + .map(|(bucket_id, offset)| (TableBucket::new(table_id, bucket_id), offset)) + .collect()) + } else { + query_partitioned_offsets(admin, scanner, subscribed).await + } +} + +/// Query offsets for partitioned table subscriptions. 
+async fn query_partitioned_offsets( + admin: &FlussAdmin, + scanner: &RecordBatchLogScanner, + subscribed: &[(TableBucket, i64)], +) -> Result> { + let table_path = scanner.table_path(); + let table_id = scanner.table_id(); + + let partition_infos = admin.list_partition_infos(table_path).await?; + let partition_id_to_name: HashMap = partition_infos + .into_iter() + .map(|info| (info.get_partition_id(), info.get_partition_name())) + .collect(); + + let mut by_partition: HashMap> = HashMap::new(); + for (tb, _) in subscribed { + if let Some(partition_id) = tb.partition_id() { + by_partition + .entry(partition_id) + .or_default() + .push(tb.bucket_id()); + } + } + + let mut result: HashMap = HashMap::new(); + + for (partition_id, bucket_ids) in by_partition { + let partition_name = + partition_id_to_name + .get(&partition_id) + .ok_or_else(|| Error::UnexpectedError { + message: format!("Unknown partition_id: {partition_id}"), + source: None, + })?; + + let offsets = admin + .list_partition_offsets(table_path, partition_name, &bucket_ids, OffsetSpec::Latest) + .await?; + + for (bucket_id, offset) in offsets { + if offset > 0 { + let tb = TableBucket::new_with_partition(table_id, Some(partition_id), bucket_id); + result.insert(tb, offset); + } + } + } + + Ok(result) +} + +/// Filter and slice scan batches against per-bucket stopping offsets. +/// +/// For each batch: +/// - If the batch's bucket is not in `stopping_offsets`, skip it. +/// - If `base_offset >= stop_at`, the bucket is exhausted; remove from map. +/// - If `last_offset >= stop_at`, slice to keep only records before stop_at. +/// - Otherwise, keep the full batch. +/// +/// Accepted batches are pushed to `buffer`. Exhausted buckets are removed +/// from `stopping_offsets`. 
+fn filter_batches( + scan_batches: Vec, + stopping_offsets: &mut HashMap, + buffer: &mut VecDeque, +) { + for scan_batch in scan_batches { + let bucket = scan_batch.bucket().clone(); + let Some(&stop_at) = stopping_offsets.get(&bucket) else { + continue; + }; + + let base_offset = scan_batch.base_offset(); + let last_offset = scan_batch.last_offset(); + + if base_offset >= stop_at { + stopping_offsets.remove(&bucket); + continue; + } + + let batch = if last_offset >= stop_at { + let num_to_keep = (stop_at - base_offset) as usize; + let b = scan_batch.into_batch(); + let limit = num_to_keep.min(b.num_rows()); + b.slice(0, limit) + } else { + scan_batch.into_batch() + }; + + buffer.push_back(batch); + + if last_offset >= stop_at - 1 { + stopping_offsets.remove(&bucket); + } + } +} + +// TODO: Add an end-to-end test with `FlussTestingCluster` (feature +// `integration_tests`) covering `new_until_latest`, partitioned tables, and +// `new_until_offsets` stopping semantics. +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int32Array; + use arrow_schema::{DataType, Field, Schema}; + use std::sync::Arc; + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)])) + } + + fn make_batch(values: &[i32]) -> RecordBatch { + RecordBatch::try_new( + test_schema(), + vec![Arc::new(Int32Array::from(values.to_vec()))], + ) + .unwrap() + } + + fn make_scan_batch(bucket: TableBucket, base_offset: i64, values: &[i32]) -> ScanBatch { + ScanBatch::new(bucket, make_batch(values), base_offset) + } + + fn bucket(id: i32) -> TableBucket { + TableBucket::new(1, id) + } + + #[test] + fn filter_batch_entirely_before_stop() { + let mut offsets = HashMap::from([(bucket(0), 100)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].num_rows(), 3); + 
assert!(offsets.contains_key(&bucket(0))); + } + + #[test] + fn filter_batch_crossing_stop_offset_is_sliced() { + let mut offsets = HashMap::from([(bucket(0), 12)]); + let mut buffer = VecDeque::new(); + + // base_offset=10, 5 rows -> offsets 10,11,12,13,14; stop_at=12 -> keep 2 + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3, 4, 5])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].num_rows(), 2); + assert!(!offsets.contains_key(&bucket(0))); + } + + #[test] + fn filter_batch_at_or_after_stop_offset_is_skipped() { + let mut offsets = HashMap::from([(bucket(0), 10)]); + let mut buffer = VecDeque::new(); + + // base_offset=10, stop_at=10 -> base >= stop, skip entirely + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert!(buffer.is_empty()); + assert!(!offsets.contains_key(&bucket(0))); + } + + #[test] + fn filter_batch_ending_exactly_at_stop_minus_one() { + let mut offsets = HashMap::from([(bucket(0), 13)]); + let mut buffer = VecDeque::new(); + + // base_offset=10, 3 rows -> offsets 10,11,12; last_offset=12, stop_at=13 + // last_offset (12) >= stop_at - 1 (12) => bucket done + let batches = vec![make_scan_batch(bucket(0), 10, &[1, 2, 3])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].num_rows(), 3); + assert!(!offsets.contains_key(&bucket(0))); + } + + #[test] + fn filter_unknown_bucket_is_ignored() { + let mut offsets = HashMap::from([(bucket(0), 100)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(99), 0, &[1, 2])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert!(buffer.is_empty()); + assert!(offsets.contains_key(&bucket(0))); + } + + #[test] + fn filter_multiple_buckets_independent_tracking() { + let mut offsets = HashMap::from([(bucket(0), 12), (bucket(1), 5)]); + let mut buffer = 
VecDeque::new(); + + let batches = vec![ + make_scan_batch(bucket(0), 10, &[1, 2, 3]), // last=12, stop=12 -> keep 2, done + make_scan_batch(bucket(1), 0, &[10, 20, 30]), // last=2, stop=5 -> keep all, not done + ]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 2); + assert_eq!(buffer[0].num_rows(), 2); // bucket 0: sliced + assert_eq!(buffer[1].num_rows(), 3); // bucket 1: full + assert!(!offsets.contains_key(&bucket(0))); // bucket 0: done + assert!(offsets.contains_key(&bucket(1))); // bucket 1: still tracking + } + + #[test] + fn filter_empty_batch_at_stop() { + let mut offsets = HashMap::from([(bucket(0), 5)]); + let mut buffer = VecDeque::new(); + + // empty batch: base_offset=5, 0 rows -> last_offset = base-1 = 4 + // base_offset (5) >= stop_at (5) -> skip, remove + let batches = vec![make_scan_batch(bucket(0), 5, &[])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert!(buffer.is_empty()); + assert!(!offsets.contains_key(&bucket(0))); + } + + #[test] + fn filter_single_row_batch_before_stop() { + let mut offsets = HashMap::from([(bucket(0), 10)]); + let mut buffer = VecDeque::new(); + + let batches = vec![make_scan_batch(bucket(0), 5, &[42])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].num_rows(), 1); + assert!(offsets.contains_key(&bucket(0))); + } + + #[test] + fn filter_single_row_batch_at_stop_boundary() { + let mut offsets = HashMap::from([(bucket(0), 5)]); + let mut buffer = VecDeque::new(); + + // base_offset=4, 1 row -> last_offset=4, stop=5 + // last < stop -> keep all; last (4) >= stop-1 (4) -> done + let batches = vec![make_scan_batch(bucket(0), 4, &[42])]; + filter_batches(batches, &mut offsets, &mut buffer); + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer[0].num_rows(), 1); + assert!(!offsets.contains_key(&bucket(0))); + } +} diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs 
index 43025393..a72d0877 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -254,6 +254,17 @@ pub struct LogScanner { /// /// More efficient than [`LogScanner`] for batch-level analytics where per-record /// metadata (offsets, timestamps) is not needed. +/// +/// Cloning is cheap (shared `Arc` internals). Multiple clones share the same +/// fetch buffer, subscription state, and in-flight fetches. +/// +/// **Concurrency:** Do not overlap read work across clones that share this state. +/// If [`RecordBatchLogScanner::poll`] runs concurrently with another +/// [`RecordBatchLogScanner::poll`], [`LogScanner::poll`], or +/// [`crate::client::RecordBatchLogReader::next_batch`], the overlapping call +/// fails fast with [`Error::UnsupportedOperation`](crate::error::Error::UnsupportedOperation) +/// (it is not queued). +#[derive(Clone)] pub struct RecordBatchLogScanner { inner: Arc, } @@ -266,6 +277,14 @@ struct LogScannerInner { log_scanner_status: Arc, log_fetcher: LogFetcher, is_partitioned_table: bool, + arrow_schema: SchemaRef, + /// Serializes overlapping `poll` / `poll_batches` across clones sharing this `Arc`. + /// + /// TODO: Consider an API that consumes + /// the scanner when building [`crate::client::RecordBatchLogReader`] (or an explicit + /// `ScanSession`) so only one driver owns the fetch loop; discuss trade-offs vs. cheap + /// `Clone` over shared state. 
+ poll_session: tokio::sync::Mutex<()>, } impl LogScannerInner { @@ -277,6 +296,20 @@ impl LogScannerInner { projected_fields: Option>, ) -> Result { let log_scanner_status = Arc::new(LogScannerStatus::new()); + + let full_row_type = table_info.get_row_type(); + let arrow_schema = match &projected_fields { + Some(indices) => { + let projected_fields_vec: Vec<_> = indices + .iter() + .map(|&i| full_row_type.fields()[i].clone()) + .collect(); + let projected_row_type = crate::metadata::RowType::new(projected_fields_vec); + to_arrow_schema(&projected_row_type)? + } + None => to_arrow_schema(full_row_type)?, + }; + Ok(Self { table_path: table_info.table_path.clone(), table_id: table_info.table_id, @@ -285,16 +318,30 @@ impl LogScannerInner { log_scanner_status: log_scanner_status.clone(), log_fetcher: LogFetcher::new( table_info.clone(), - connections.clone(), - metadata.clone(), + connections, + metadata, log_scanner_status.clone(), config, projected_fields, )?, + arrow_schema, + poll_session: tokio::sync::Mutex::new(()), }) } + fn concurrent_log_scan_error() -> Error { + UnsupportedOperation { + message: "Concurrent log scan: another poll or RecordBatchLogReader session is active on this scanner. \ +Wait for it to finish, or create a separate TableScan." 
+ .to_string(), + } + } + async fn poll_records(&self, timeout: Duration) -> Result<ScanRecords> { + let _poll_guard = self + .poll_session + .try_lock() + .map_err(|_| Self::concurrent_log_scan_error())?; let start = Instant::now(); let deadline = start + timeout; @@ -469,6 +516,10 @@ impl LogScannerInner { } async fn poll_batches(&self, timeout: Duration) -> Result<Vec<RecordBatch>> { + let _poll_guard = self + .poll_session + .try_lock() + .map_err(|_| Self::concurrent_log_scan_error())?; let start = Instant::now(); let deadline = start + timeout; @@ -612,6 +663,19 @@ impl RecordBatchLogScanner { ) -> Result<()> { self.inner.unsubscribe_partition(partition_id, bucket).await } + + /// Returns the Arrow schema for batches produced by this scanner. + pub fn schema(&self) -> SchemaRef { + self.inner.arrow_schema.clone() + } + + pub fn table_path(&self) -> &TablePath { + &self.inner.table_path + } + + pub fn table_id(&self) -> TableId { + self.inner.table_id + } } struct LogFetcher { @@ -1990,6 +2054,34 @@ mod tests { let result = validate_scan_support(&table_path, &table_info); assert!(result.is_ok()); } + + /// When `poll_session` is already held, [`RecordBatchLogScanner::poll`] must fail without + /// blocking (mirrors concurrent `poll` / [`crate::client::RecordBatchLogReader::next_batch`]). 
+ #[tokio::test] + async fn poll_batches_rejects_when_poll_session_held() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let inner = Arc::new(LogScannerInner::new( + &table_info, + metadata, + Arc::new(RpcClient::new()), + &crate::config::Config::default(), + None, + )?); + let scanner = RecordBatchLogScanner { + inner: inner.clone(), + }; + let _hold = inner.poll_session.lock().await; + let err = scanner + .poll(std::time::Duration::from_millis(1)) + .await + .expect_err("expected concurrent poll error"); + assert!(matches!(err, UnsupportedOperation { .. })); + Ok(()) + } + #[tokio::test] async fn prepare_fetch_log_requests_uses_configured_fetch_params() -> Result<()> { let table_path = TablePath::new("db".to_string(), "tbl".to_string()); diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index a4b594bc..be8cdaf0 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -164,9 +164,12 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`. 
| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) | | `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) | | `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) | +| `.to_arrow_batch_reader() -> pa.RecordBatchReader` | Lazy Arrow RecordBatchReader reading until latest offsets (batch scanner only) | | `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) | | `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) | +> **Note:** Overlapping `poll_*` / `to_arrow*` / `to_arrow_batch_reader` calls on the same underlying scanner fail immediately with `FlussError` (they are not queued). + ## `ScanRecords` Returned by `LogScanner.poll()`. Records are grouped by bucket. diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index fbe3428c..653e7935 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -148,6 +148,8 @@ Complete API reference for the Fluss Rust client. ## `RecordBatchLogScanner` +Overlapping `poll` calls on clones that share state, or `poll` concurrent with `RecordBatchLogReader::next_batch`, fail fast with `fluss::error::Error::UnsupportedOperation` (not queued). + | Method | Description | |-----------------------------------------------------------------------------------------------------------|----------------------------------------------------------| | `async fn subscribe(&self, bucket_id: i32, start_offset: i64) -> Result<()>` | Subscribe to a bucket | @@ -159,6 +161,23 @@ Complete API reference for the Fluss Rust client. 
| `async fn poll(&self, timeout: Duration) -> Result<Vec<RecordBatch>>` | Poll for Arrow record batches | | `fn is_partitioned(&self) -> bool` | Check if the table is partitioned | | `fn get_subscribed_buckets(&self) -> Vec<(TableBucket, i64)>` | Get all current subscriptions as (bucket, offset) pairs | +| `fn schema(&self) -> SchemaRef` | Get the Arrow schema for batches produced by this scanner| +| `fn table_path(&self) -> &TablePath` | Get the table path | +| `fn table_id(&self) -> TableId` | Get the table ID | + +## `RecordBatchLogReader` + +Bounded log reader that consumes data up to specified stopping offsets, then terminates. +Unlike `RecordBatchLogScanner` which polls indefinitely, this reader stops automatically. + +| Method | Description | +|-------------------------------------------------------------------------------------------------------------|----------------------------------------------------------| +| `async fn new_until_latest(scanner, admin) -> Result<Self>` | Read until the latest offsets at time of creation | +| `fn new_until_offsets(scanner, stopping_offsets) -> Self` | Read until custom stopping offsets per bucket | +| `async fn next_batch(&mut self) -> Result<Option<RecordBatch>>` | Get the next batch, or `None` when all buckets caught up | +| `async fn collect_all_batches(&mut self) -> Result<Vec<RecordBatch>>` | Drain all batches until stopping offsets are satisfied | +| `fn schema(&self) -> SchemaRef` | Arrow schema for produced batches | +| `fn to_record_batch_reader(self, handle) -> SyncRecordBatchLogReader` | Sync adapter implementing `arrow::RecordBatchReader` | ## `ScanRecord`