diff --git a/crates/paimon/src/arrow/format/vortex.rs b/crates/paimon/src/arrow/format/vortex.rs index f0a3a48e..6dfce9c4 100644 --- a/crates/paimon/src/arrow/format/vortex.rs +++ b/crates/paimon/src/arrow/format/vortex.rs @@ -24,9 +24,12 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType as ArrowDataType, SchemaRef}; use async_trait::async_trait; use futures::future::BoxFuture; +use futures::Stream; use futures::StreamExt; +use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::task::{Context, Poll}; use vortex::array::arrow::{FromArrowArray, IntoArrowArray}; use vortex::array::dtype::arrow::FromArrowType; use vortex::array::dtype::DType; @@ -38,6 +41,9 @@ use vortex::array::ArrayRef; use vortex::buffer::{Alignment, ByteBuffer}; use vortex::error::VortexResult; use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt}; +use vortex::io::runtime::tokio::TokioRuntime; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::session::RuntimeSessionExt; use vortex::io::{IoBuf, VortexReadAt, VortexWrite}; use vortex::scan::selection::Selection; use vortex::session::VortexSession; @@ -50,6 +56,51 @@ use vortex::VortexSessionDefault; /// Maximum number of concurrent read requests for Vortex file IO. const DEFAULT_READ_CONCURRENCY: usize = 10; +// --------------------------------------------------------------------------- +// Vortex Runtime +// --------------------------------------------------------------------------- + +struct PaimonVortexRuntime { + runtime: TokioRuntime, +} + +impl PaimonVortexRuntime { + fn new() -> crate::Result> { + let handle = tokio::runtime::Handle::try_current().map_err(|e| Error::DataInvalid { + message: format!("Vortex requires an active Tokio runtime: {e}"), + source: None, + })?; + Ok(Arc::new(Self { + runtime: TokioRuntime::new(handle), + })) + } + + fn session(&self) -> VortexSession { + VortexSession::default().with_handle(self.runtime.handle()) + } +} + +fn new_vortex_session() -> crate::Result<(VortexSession, Arc)> { + let runtime = PaimonVortexRuntime::new()?; + let session = runtime.session(); + Ok((session, runtime)) +} + +struct VortexRecordBatchStream { + inner: ArrowRecordBatchStream, + _runtime: Arc, +} + +impl Unpin for VortexRecordBatchStream {} + +impl Stream for VortexRecordBatchStream { + type Item = crate::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.get_mut().inner.as_mut().poll_next(cx) + } +} + // --------------------------------------------------------------------------- // IO Adapters // --------------------------------------------------------------------------- @@ -110,7 +161,7 @@ impl FormatFileReader for VortexFormatReader { _batch_size: Option, row_selection: Option>, ) -> crate::Result { - let session = VortexSession::default(); + let (session, runtime) = new_vortex_session()?; let source = Arc::new(PaimonVortexReadAt { file_size, @@ -201,7 +252,10 @@ impl FormatFileReader for VortexFormatReader { result.and_then(|vortex_array| vortex_array_to_record_batch(vortex_array, &schema)) }); - Ok(Box::pin(stream)) + Ok(Box::pin(VortexRecordBatchStream { + inner: Box::pin(stream), + _runtime: runtime, + })) } } @@ -552,10 +606,13 @@ pub(crate) struct VortexFormatWriter { write_task: Option>>, /// Bytes already flushed to storage (updated by the background task). bytes_written: Arc, + /// Keeps the Vortex runtime handle alive while the write task uses the session. + _runtime: Arc, } impl VortexFormatWriter { pub(crate) async fn new(output: &OutputFile, schema: SchemaRef) -> crate::Result { + let (session, runtime) = new_vortex_session()?; let dtype = DType::from_arrow(schema); // Create channel for streaming arrays to the background writer. @@ -575,7 +632,6 @@ impl VortexFormatWriter { }; // Spawn the background write task. - let session = VortexSession::default(); let write_task = tokio::spawn(async move { let mut sink = sink; let result = session @@ -593,6 +649,7 @@ impl VortexFormatWriter { sender: Some(sender), write_task: Some(write_task), bytes_written, + _runtime: runtime, }) } }