
feat: Add RecordBatchLogReader for bounded log reading #446

Open

charlesdong1991 wants to merge 1 commit into apache:main from charlesdong1991:arrow-batch-reader
Conversation

@charlesdong1991
Contributor

Purpose

Move query_latest_offsets and poll-until-offsets logic from Python binding into Rust core as RecordBatchLogReader.

This enables both Python and C++ bindings to share the same bounded-read implementation.

Linked issue: close #406

Tests

Tests pass locally.

API and Format

Documentation

arrow_schema: SchemaRef,
/// Serializes overlapping `poll` / `poll_batches` across clones sharing this `Arc`.
///
/// TODO: Consider an API that consumes
@charlesdong1991 (Contributor Author) Mar 19, 2026

It is cheap to clone this record batch log scanner, but all clones share one Arc, so two overlapping polls are not supported under the current usage model. I added a client-side guard with poll_session so overlapping calls can fail fast.

Not sure what you think; I am happy to create a new issue and do a follow-up on that, or if you prefer I can make the API stricter in this PR?
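To make the trade-off concrete, here is a minimal sketch of the fail-fast guard described above. All names (`PollSession`, `PollGuard`) are illustrative, not the PR's actual code; the real guard would live alongside the scanner inside its shared `Arc`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// All clones share the flag through the `Arc`, so a second overlapping
/// poll fails fast instead of silently interleaving with the first.
#[derive(Clone)]
struct PollSession {
    in_flight: Arc<AtomicBool>,
}

impl PollSession {
    fn new() -> Self {
        Self { in_flight: Arc::new(AtomicBool::new(false)) }
    }

    /// Returns Err if another poll is already in progress on any clone.
    fn begin(&self) -> Result<PollGuard<'_>, &'static str> {
        if self.in_flight.swap(true, Ordering::Acquire) {
            Err("overlapping poll detected")
        } else {
            Ok(PollGuard { session: self })
        }
    }
}

struct PollGuard<'a> {
    session: &'a PollSession,
}

impl Drop for PollGuard<'_> {
    /// Releasing the guard re-enables polling for every clone.
    fn drop(&mut self) {
        self.session.in_flight.store(false, Ordering::Release);
    }
}
```

The guard detects the misuse only at runtime, which is the weakness the review below points out.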

Contributor

Let's do it properly in this PR. The reader should take ownership of the scanner (move, not clone). That way the compiler prevents concurrent polls - no mutex needed.
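A minimal sketch of the ownership-based alternative being suggested, with placeholder types standing in for the real scanner and batch: the reader takes the scanner by move, and `next_batch` takes `&mut self`, so the borrow checker statically rules out overlapping polls.

```rust
/// Placeholder for the real log scanner.
struct LogScanner;

impl LogScanner {
    /// Stand-in for the real poll; returns dummy bytes.
    fn poll(&mut self) -> Option<Vec<u8>> {
        Some(vec![1u8, 2, 3])
    }
}

struct RecordBatchLogReader {
    /// Owned by move, not cloned out of a shared `Arc`.
    scanner: LogScanner,
}

impl RecordBatchLogReader {
    /// Consumes the scanner; no `Arc`, no mutex, no runtime guard.
    fn new(scanner: LogScanner) -> Self {
        Self { scanner }
    }

    /// `&mut self` means two overlapping calls cannot even compile.
    fn next_batch(&mut self) -> Option<Vec<u8>> {
        self.scanner.poll()
    }
}
```

With this shape the compile-time exclusivity replaces the runtime poll_session check entirely.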

@fresh-borzoni (Contributor) left a comment

@charlesdong1991 Ty for the PR. Left comments, PTAL

/// Each call may internally poll multiple batches from the scanner,
/// buffer them, and return one at a time. Batches that cross a stopping
/// offset boundary are sliced to exclude records at or beyond the stop point.
pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
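The slicing rule in the doc comment can be sketched with a plain slice standing in for arrow's `RecordBatch::slice` (the helper name is hypothetical): rows carry consecutive offsets starting at `base_offset`, and rows at or beyond `stop_at` are excluded.

```rust
/// Keep only the rows whose offsets are strictly below `stop_at`.
/// `rows[i]` is assumed to hold the record at offset `base_offset + i`.
fn slice_to_stop(rows: &[i64], base_offset: i64, stop_at: i64) -> &[i64] {
    // Number of leading rows before the stopping boundary, clamped
    // to the batch bounds.
    let keep = (stop_at - base_offset).clamp(0, rows.len() as i64) as usize;
    &rows[..keep]
}
```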
Contributor

next_batch() returns RecordBatch, discarding the bucket/offset metadata that was previously available via ScanRecord.
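One hypothetical way to keep what `ScanRecord` exposed would be to return the batch together with its bucket and offset range; the type and field names below are illustrative, not from the PR.

```rust
/// A batch plus the per-bucket metadata that would otherwise be lost.
#[derive(Debug)]
struct BatchWithMetadata<B> {
    batch: B,
    bucket: i32,
    first_offset: i64,
    last_offset: i64,
}

impl<B> BatchWithMetadata<B> {
    fn new(batch: B, bucket: i32, first_offset: i64, last_offset: i64) -> Self {
        Self { batch, bucket, first_offset, last_offset }
    }
}
```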

/// The projected row type to use for record-based scanning
projected_row_type: fcore::metadata::RowType,
/// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls)
partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>,
Contributor

Why have we removed this?

m.add_class::<Lookuper>()?;
m.add_class::<Schema>()?;
m.add_class::<LogScanner>()?;
m.add_class::<PyRecordBatchLogReader>()?;
Contributor

Isn't this an internal iterator?


fn __next__(&mut self, py: Python) -> PyResult<Option<Py<PyAny>>> {
let batch = py
.detach(|| TOKIO_RUNTIME.block_on(self.reader.next_batch()))
Contributor

PyRecordBatchLogReader holds async RecordBatchLogReader and calls TOKIO_RUNTIME.block_on() directly, duplicating what SyncRecordBatchLogReader already does.
Per the design spec, Python should use the shared sync adapter wrapped in py.detach().
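A minimal sketch of the layering being asked for, with stdlib stand-ins for the async reader and runtime: the blocking logic lives once in a shared sync adapter, and each language binding wraps the adapter rather than calling `block_on` itself.

```rust
/// Stand-in for the async core reader; `remaining` models buffered batches.
struct RecordBatchLogReader {
    remaining: u32,
}

impl RecordBatchLogReader {
    /// Stand-in for `async fn next_batch`; in the real code the sync
    /// adapter would drive the future with the runtime's `block_on`.
    fn next_batch_inner(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            None
        } else {
            self.remaining -= 1;
            Some(self.remaining)
        }
    }
}

/// The single shared sync adapter; both the Python and C++ bindings
/// would delegate here instead of each duplicating the blocking logic.
struct SyncRecordBatchLogReader {
    inner: RecordBatchLogReader,
}

impl SyncRecordBatchLogReader {
    fn next_batch(&mut self) -> Option<u32> {
        self.inner.next_batch_inner()
    }
}
```

Under this shape the Python `__next__` would only wrap the adapter call in `py.detach()`, with no runtime handling of its own.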

buffer.push_back(batch);

if last_offset >= stop_at - 1 {
stopping_offsets.remove(&bucket);
Contributor

Shall we unsubscribe as well?
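One way the suggestion could look, sketched with stdlib collections and hypothetical names: when the last polled offset reaches the stop point, the bucket is dropped from both the stop map and the subscription set, so the scanner stops fetching from a finished bucket.

```rust
use std::collections::{HashMap, HashSet};

/// Returns true if the bucket just completed and was removed from both
/// the stopping-offset map and the subscription set.
fn complete_bucket_if_done(
    bucket: u32,
    last_offset: i64,
    stopping_offsets: &mut HashMap<u32, i64>,
    subscriptions: &mut HashSet<u32>,
) -> bool {
    match stopping_offsets.get(&bucket) {
        // Offsets are inclusive, so the bucket is done once we have
        // seen offset `stop_at - 1`.
        Some(&stop_at) if last_offset >= stop_at - 1 => {
            stopping_offsets.remove(&bucket);
            subscriptions.remove(&bucket); // the "unsubscribe" step
            true
        }
        _ => false,
    }
}
```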

});
}

let stopping_offsets = query_latest_offsets(admin, &scanner, &subscribed).await?;
Contributor

Buckets where the subscribed offset >= latest offset stay in stopping_offsets forever, so next_batch() loops indefinitely on empty polls.
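One possible fix, sketched with stdlib maps and hypothetical names: filter out already-exhausted buckets when the stopping offsets are first computed, so next_batch() never waits on a bucket that has nothing left to read.

```rust
use std::collections::HashMap;

/// Build the stop map, keeping only buckets with records still to read
/// (subscribed offset strictly below the latest offset).
fn bounded_stopping_offsets(
    subscribed: &HashMap<u32, i64>, // bucket -> offset we subscribed from
    latest: &HashMap<u32, i64>,     // bucket -> latest offset from admin
) -> HashMap<u32, i64> {
    let mut stops = HashMap::new();
    for (&bucket, &stop) in latest {
        if subscribed.get(&bucket).map_or(false, |&start| start < stop) {
            stops.insert(bucket, stop);
        }
    }
    stops
}
```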



Development

Successfully merging this pull request may close these issues.

NO to_arrow_batch_reader support in python binding
