feat: Add RecordBatchLogReader for bounded log reading#446
feat: Add RecordBatchLogReader for bounded log reading#446charlesdong1991 wants to merge 1 commit intoapache:mainfrom
Conversation
| arrow_schema: SchemaRef, | ||
| /// Serializes overlapping `poll` / `poll_batches` across clones sharing this `Arc`. | ||
| /// | ||
| /// TODO: Consider an API that consumes |
There was a problem hiding this comment.
it is cheap to clone for this record batch log scanner, but all clones will share one Arc , so two overlapping poll is not supported under current usage model, i add a client-side guard with poll_session so overlapping calls can fail fast.
Not sure what you think, i am happy to create a new issue and do a follow-up on that, or if you prefer i can have a stricter API in this PR?
There was a problem hiding this comment.
Let's do it properly in this PR. The reader should take ownership of the scanner (move, not clone). That way the compiler prevents concurrent polls - no mutex needed.
fresh-borzoni
left a comment
There was a problem hiding this comment.
@charlesdong1991 Ty for the PR. Left comments, PTAL
| /// Each call may internally poll multiple batches from the scanner, | ||
| /// buffer them, and return one at a time. Batches that cross a stopping | ||
| /// offset boundary are sliced to exclude records at or beyond the stop point. | ||
| pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> { |
There was a problem hiding this comment.
next_batch() returns RecordBatch discarding bucket/offset metadata that was in use before with ScanRecord
| /// The projected row type to use for record-based scanning | ||
| projected_row_type: fcore::metadata::RowType, | ||
| /// Cache for partition_id -> partition_name mapping (avoids repeated list_partition_infos calls) | ||
| partition_name_cache: std::sync::RwLock<Option<HashMap<i64, String>>>, |
There was a problem hiding this comment.
Why have we removed this?
| m.add_class::<Lookuper>()?; | ||
| m.add_class::<Schema>()?; | ||
| m.add_class::<LogScanner>()?; | ||
| m.add_class::<PyRecordBatchLogReader>()?; |
There was a problem hiding this comment.
isn't it internal iterator?
|
|
||
| fn __next__(&mut self, py: Python) -> PyResult<Option<Py<PyAny>>> { | ||
| let batch = py | ||
| .detach(|| TOKIO_RUNTIME.block_on(self.reader.next_batch())) |
There was a problem hiding this comment.
PyRecordBatchLogReader holds async RecordBatchLogReader and calls TOKIO_RUNTIME.block_on() directly, duplicating what SyncRecordBatchLogReader already does.
Per the design spec, Python should use the shared sync adapter wrapped in py.detach().
| buffer.push_back(batch); | ||
|
|
||
| if last_offset >= stop_at - 1 { | ||
| stopping_offsets.remove(&bucket); |
There was a problem hiding this comment.
Shall we unsibscribe as well?
| }); | ||
| } | ||
|
|
||
| let stopping_offsets = query_latest_offsets(admin, &scanner, &subscribed).await?; |
There was a problem hiding this comment.
Buckets where subscribed offset >= latest offset stay in stopping_offsets forever, next_batch() loops indefinitely on empty polls
| arrow_schema: SchemaRef, | ||
| /// Serializes overlapping `poll` / `poll_batches` across clones sharing this `Arc`. | ||
| /// | ||
| /// TODO: Consider an API that consumes |
There was a problem hiding this comment.
Let's do it properly in this PR. The reader should take ownership of the scanner (move, not clone). That way the compiler prevents concurrent polls - no mutex needed.
Purpose
Move query_latest_offsets and poll-until-offsets logic from Python binding into Rust core as RecordBatchLogReader.
This enables both Python and C++ bindings to share the same bounded-read implementation.
Linked issue: close #406
Tests
Tests are passed locally
API and Format
Documentation