Skip to content

Commit 5edff9e

Browse files
authored
Add FileSystem trait and make DuckDB configurable (#6564)
We switched over to using DuckDB's own file system for I/O. For some reason we have really binary performance benchmarks, either the same performance as before, or really really slow. We are going to investigate, but in the meantime this change abstracts the I/O behind a file system and allows us to configure whether to use Vortex I/O or DuckDB I/O. This abstraction was planned and built in #6391 anyway, so just making use of it a little sooner. As part of this change, we have a `FileSystem::list -> Stream` API. We used to use the async-compat crate to provide Tokio support within DuckDB, but that crate doesn't support streams. This PR inlines the core piece of logic from that crate and provides wrappers for I/O. To summarise: 1. Pulls out the FileSystem trait as used in #6391 1. Uses the trait to make fs configurable in DuckDB. Defaults to Vortex I/O as per before #5767 , with extension option to enable DuckDB native file system. 1. Clean up of the vortex-io crate for consistent structure + naming, as well as providing a "compat" module with compatibility wrappers. --------- Signed-off-by: Nicholas Gates <nick@nickgates.com>
1 parent 33966b6 commit 5edff9e

60 files changed

Lines changed: 1566 additions & 533 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 3 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ arrow-ord = "57.1"
9696
arrow-schema = "57.1"
9797
arrow-select = "57.1"
9898
arrow-string = "57.1"
99-
async-compat = "0.2.5"
10099
async-fs = "2.2.0"
101100
async-lock = "3.4"
102101
async-stream = "0.3.6"

benchmarks/duckdb-bench/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ impl DuckClient {
7979
}?;
8080

8181
let connection = db.connect()?;
82-
vortex_duckdb::register_table_functions(&connection)?;
82+
vortex_duckdb::initialize(&db)?;
8383

8484
// Install and load httpfs so DuckDB can access remote files (S3, GCS, HTTP).
8585
connection.query("INSTALL httpfs;")?;

docs/user-guide/duckdb.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,26 @@ without it, DuckDB defaults to CSV.
4040
COPY (SELECT * FROM my_table) TO 'output.vortex' (FORMAT vortex);
4141
```
4242

43+
## Extension Options
44+
45+
### `vortex_filesystem`
46+
47+
Controls which filesystem implementation is used for reading and writing Vortex files.
48+
49+
| Value | Description |
50+
|-------|-------------|
51+
| `'vortex'` (default) | Uses Vortex's built-in object store filesystem. Supports `file://` and `s3://` schemes. |
52+
| `'duckdb'` | Uses DuckDB's built-in filesystem, including any filesystem extensions such as `httpfs`. |
53+
54+
```sql
55+
SET vortex_filesystem = 'duckdb';
56+
```
57+
58+
Use `'duckdb'` when you want to leverage DuckDB's filesystem extensions (e.g., `httpfs` for HTTP
59+
or S3 access with DuckDB's credential management). Use `'vortex'` (the default) for direct
60+
object store access via Vortex's own S3 integration, which reads credentials from environment
61+
variables.
62+
4363
## Python
4464

4565
The DuckDB Python client works with `read_vortex` the same way:

vortex-datafusion/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ mod common_tests {
7070
use vortex::array::ArrayRef;
7171
use vortex::array::arrow::FromArrowArray;
7272
use vortex::file::WriteOptionsSessionExt;
73-
use vortex::io::ObjectStoreWriter;
7473
use vortex::io::VortexWrite;
74+
use vortex::io::object_store::ObjectStoreWrite;
7575
use vortex::session::VortexSession;
7676

7777
use crate::VortexFormatFactory;
@@ -123,7 +123,7 @@ mod common_tests {
123123
P: Into<object_store::path::Path>,
124124
{
125125
let array = ArrayRef::from_arrow(batch, false)?;
126-
let mut write = ObjectStoreWriter::new(self.store.clone(), &path.into()).await?;
126+
let mut write = ObjectStoreWrite::new(self.store.clone(), &path.into()).await?;
127127
VX_SESSION
128128
.write_options()
129129
.write(&mut write, array.to_array_stream())

vortex-datafusion/src/persistent/format.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use vortex::file::EOF_SIZE;
5555
use vortex::file::MAX_POSTSCRIPT_SIZE;
5656
use vortex::file::OpenOptionsSessionExt;
5757
use vortex::file::VORTEX_FILE_EXTENSION;
58-
use vortex::io::file::object_store::ObjectStoreSource;
58+
use vortex::io::object_store::ObjectStoreReadAt;
5959
use vortex::io::session::RuntimeSessionExt;
6060
use vortex::scalar::Scalar;
6161
use vortex::session::VortexSession;
@@ -261,7 +261,7 @@ impl FileFormat for VortexFormat {
261261
}
262262

263263
// Not cached or invalid - open the file
264-
let reader = Arc::new(ObjectStoreSource::new(
264+
let reader = Arc::new(ObjectStoreReadAt::new(
265265
store,
266266
object.location.clone(),
267267
session.handle(),
@@ -328,7 +328,7 @@ impl FileFormat for VortexFormat {
328328
Some(metadata) => metadata,
329329
None => {
330330
// Not cached - open the file
331-
let reader = Arc::new(ObjectStoreSource::new(
331+
let reader = Arc::new(ObjectStoreReadAt::new(
332332
store,
333333
object.location.clone(),
334334
session.handle(),

vortex-datafusion/src/persistent/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ mod tests {
3333
use vortex::array::validity::Validity;
3434
use vortex::buffer::buffer;
3535
use vortex::file::WriteOptionsSessionExt;
36-
use vortex::io::ObjectStoreWriter;
3736
use vortex::io::VortexWrite;
37+
use vortex::io::object_store::ObjectStoreWrite;
3838
use vortex::session::VortexSession;
3939

4040
use crate::common_tests::TestSessionContext;
@@ -65,7 +65,7 @@ mod tests {
6565
Validity::NonNullable,
6666
)?;
6767

68-
let mut writer = ObjectStoreWriter::new(ctx.store.clone(), &"test.vortex".into()).await?;
68+
let mut writer = ObjectStoreWrite::new(ctx.store.clone(), &"test.vortex".into()).await?;
6969

7070
let summary = session
7171
.write_options()

vortex-datafusion/src/persistent/opener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,8 @@ mod tests {
477477
use vortex::array::arrow::FromArrowArray;
478478
use vortex::buffer::Buffer;
479479
use vortex::file::WriteOptionsSessionExt;
480-
use vortex::io::ObjectStoreWriter;
481480
use vortex::io::VortexWrite;
481+
use vortex::io::object_store::ObjectStoreWrite;
482482
use vortex::metrics::DefaultMetricsRegistry;
483483
use vortex::scan::Selection;
484484
use vortex::session::VortexSession;
@@ -540,7 +540,7 @@ mod tests {
540540
let array = ArrayRef::from_arrow(rb, false)?;
541541
let path = Path::parse(path)?;
542542

543-
let mut write = ObjectStoreWriter::new(object_store, &path).await?;
543+
let mut write = ObjectStoreWrite::new(object_store, &path).await?;
544544
let summary = SESSION
545545
.write_options()
546546
.write(&mut write, array.to_array_stream())

vortex-datafusion/src/persistent/reader.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::sync::Arc;
77
use datafusion_common::Result as DFResult;
88
use object_store::ObjectStore;
99
use vortex::io::VortexReadAt;
10-
use vortex::io::file::object_store::ObjectStoreSource;
10+
use vortex::io::object_store::ObjectStoreReadAt;
1111
use vortex::io::session::RuntimeSessionExt;
1212
use vortex::session::VortexSession;
1313

@@ -38,7 +38,7 @@ impl VortexReaderFactory for DefaultVortexReaderFactory {
3838
path: &str,
3939
session: &VortexSession,
4040
) -> DFResult<Arc<dyn VortexReadAt>> {
41-
Ok(Arc::new(ObjectStoreSource::new(
41+
Ok(Arc::new(ObjectStoreReadAt::new(
4242
self.object_store.clone(),
4343
path.into(),
4444
session.handle(),

vortex-datafusion/src/persistent/sink.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ use vortex::dtype::DType;
3232
use vortex::dtype::arrow::FromArrowType;
3333
use vortex::file::WriteOptionsSessionExt;
3434
use vortex::file::WriteSummary;
35-
use vortex::io::ObjectStoreWriter;
3635
use vortex::io::VortexWrite;
36+
use vortex::io::object_store::ObjectStoreWrite;
3737
use vortex::session::VortexSession;
3838

3939
pub struct VortexSink {
@@ -124,9 +124,9 @@ impl FileSink for VortexSink {
124124

125125
let stream_adapter = ArrayStreamAdapter::new(dtype, stream);
126126

127-
let mut object_writer = ObjectStoreWriter::new(object_store, &path)
127+
let mut object_writer = ObjectStoreWrite::new(object_store, &path)
128128
.await
129-
.map_err(|e| exec_datafusion_err!("Failed to create ObjectStoreWriter: {e}"))?;
129+
.map_err(|e| exec_datafusion_err!("Failed to create ObjectStoreWrite: {e}"))?;
130130

131131
let summary = session
132132
.write_options()

0 commit comments

Comments
 (0)