Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 60 additions & 3 deletions crates/paimon/src/arrow/format/vortex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ use arrow_array::RecordBatch;
use arrow_schema::{DataType as ArrowDataType, SchemaRef};
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::Stream;
use futures::StreamExt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use vortex::array::arrow::{FromArrowArray, IntoArrowArray};
use vortex::array::dtype::arrow::FromArrowType;
use vortex::array::dtype::DType;
Expand All @@ -38,6 +41,9 @@ use vortex::array::ArrayRef;
use vortex::buffer::{Alignment, ByteBuffer};
use vortex::error::VortexResult;
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
use vortex::io::runtime::tokio::TokioRuntime;
use vortex::io::runtime::BlockingRuntime;
use vortex::io::session::RuntimeSessionExt;
use vortex::io::{IoBuf, VortexReadAt, VortexWrite};
use vortex::scan::selection::Selection;
use vortex::session::VortexSession;
Expand All @@ -50,6 +56,51 @@ use vortex::VortexSessionDefault;
/// Maximum number of concurrent read requests for Vortex file IO.
const DEFAULT_READ_CONCURRENCY: usize = 10;

// ---------------------------------------------------------------------------
// Vortex Runtime
// ---------------------------------------------------------------------------

/// Owns the Vortex `TokioRuntime` from which Vortex sessions in this module are created.
struct PaimonVortexRuntime {
/// Vortex runtime built from the current Tokio handle; sessions are given this runtime's handle.
runtime: TokioRuntime,
}

impl PaimonVortexRuntime {
fn new() -> crate::Result<Arc<Self>> {
let handle = tokio::runtime::Handle::try_current().map_err(|e| Error::DataInvalid {
message: format!("Vortex requires an active Tokio runtime: {e}"),
source: None,
})?;
Ok(Arc::new(Self {
runtime: TokioRuntime::new(handle),
}))
}

fn session(&self) -> VortexSession {
VortexSession::default().with_handle(self.runtime.handle())
}
}

/// Creates a Vortex session together with the runtime wrapper it was derived from.
///
/// The runtime is returned alongside the session so the caller can hold on to it for as
/// long as the session is in use.
///
/// # Errors
///
/// Propagates the error from `PaimonVortexRuntime::new` when the runtime cannot be built.
fn new_vortex_session() -> crate::Result<(VortexSession, Arc<PaimonVortexRuntime>)> {
    PaimonVortexRuntime::new().map(|runtime| (runtime.session(), runtime))
}

/// Record batch stream that carries a reference to the Vortex runtime alongside the stream it wraps.
struct VortexRecordBatchStream {
/// Underlying record batch stream; polling this wrapper delegates to it.
inner: ArrowRecordBatchStream,
/// Never read; held so the runtime wrapper is not dropped before this stream.
/// NOTE(review): presumably the session behind `inner` depends on the runtime's handle — confirm against Vortex.
_runtime: Arc<PaimonVortexRuntime>,
}

// Explicitly marks the wrapper as `Unpin`; `poll_next` relies on this to call `get_mut()` on the pinned reference.
impl Unpin for VortexRecordBatchStream {}

/// Delegating `Stream` implementation: every call is forwarded to the wrapped stream.
impl Stream for VortexRecordBatchStream {
    type Item = crate::Result<RecordBatch>;

    /// Polls the wrapped stream for its next record batch.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Self: Unpin`, so the pinned reference can be converted into a plain `&mut Self`.
        self.get_mut().inner.as_mut().poll_next(cx)
    }

    /// Forwards the wrapped stream's bounds instead of the trait's default `(0, None)`,
    /// so wrapping does not hide size information from consumers.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

// ---------------------------------------------------------------------------
// IO Adapters
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -110,7 +161,7 @@ impl FormatFileReader for VortexFormatReader {
_batch_size: Option<usize>,
row_selection: Option<Vec<RowRange>>,
) -> crate::Result<ArrowRecordBatchStream> {
let session = VortexSession::default();
let (session, runtime) = new_vortex_session()?;

let source = Arc::new(PaimonVortexReadAt {
file_size,
Expand Down Expand Up @@ -201,7 +252,10 @@ impl FormatFileReader for VortexFormatReader {
result.and_then(|vortex_array| vortex_array_to_record_batch(vortex_array, &schema))
});

Ok(Box::pin(stream))
Ok(Box::pin(VortexRecordBatchStream {
inner: Box::pin(stream),
_runtime: runtime,
}))
}
}

Expand Down Expand Up @@ -552,10 +606,13 @@ pub(crate) struct VortexFormatWriter {
write_task: Option<tokio::task::JoinHandle<VortexResult<vortex::file::WriteSummary>>>,
/// Bytes already flushed to storage (updated by the background task).
bytes_written: Arc<AtomicU64>,
/// Keeps the Vortex runtime handle alive while the write task uses the session.
_runtime: Arc<PaimonVortexRuntime>,
}

impl VortexFormatWriter {
pub(crate) async fn new(output: &OutputFile, schema: SchemaRef) -> crate::Result<Self> {
let (session, runtime) = new_vortex_session()?;
let dtype = DType::from_arrow(schema);

// Create channel for streaming arrays to the background writer.
Expand All @@ -575,7 +632,6 @@ impl VortexFormatWriter {
};

// Spawn the background write task.
let session = VortexSession::default();
let write_task = tokio::spawn(async move {
let mut sink = sink;
let result = session
Expand All @@ -593,6 +649,7 @@ impl VortexFormatWriter {
sender: Some(sender),
write_task: Some(write_task),
bytes_written,
_runtime: runtime,
})
}
}
Expand Down
Loading