From d2ba3963cbfaf868c18c5a642e9d30a9739ec368 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 6 May 2026 20:39:23 -0400 Subject: [PATCH 1/9] feat(file): add FileStatistics::get_by_name for column-name lookups Per-column stats access by struct field name. Used by the upcoming MultiFileFunction-backed DuckDB scan, where DuckDB's BaseFileReader:: GetStatistics is keyed by name rather than index. Signed-off-by: Nicholas Gates Signed-off-by: Nicholas Gates --- vortex-file/public-api.lock | 2 ++ vortex-file/src/footer/file_statistics.rs | 14 ++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/vortex-file/public-api.lock b/vortex-file/public-api.lock index a20ab092751..a96459bf37d 100644 --- a/vortex-file/public-api.lock +++ b/vortex-file/public-api.lock @@ -140,6 +140,8 @@ pub fn vortex_file::FileStatistics::from_flatbuffer<'a>(&vortex_flatbuffers::foo pub fn vortex_file::FileStatistics::get(&self, usize) -> (&vortex_array::stats::stats_set::StatsSet, &vortex_array::dtype::DType) +pub fn vortex_file::FileStatistics::get_by_name(&self, &vortex_array::dtype::DType, &str) -> core::option::Option<(&vortex_array::stats::stats_set::StatsSet, &vortex_array::dtype::DType)> + pub fn vortex_file::FileStatistics::new(alloc::sync::Arc<[vortex_array::stats::stats_set::StatsSet]>, alloc::sync::Arc<[vortex_array::dtype::DType]>) -> Self pub fn vortex_file::FileStatistics::new_with_dtype(alloc::sync::Arc<[vortex_array::stats::stats_set::StatsSet]>, &vortex_array::dtype::DType) -> Self diff --git a/vortex-file/src/footer/file_statistics.rs b/vortex-file/src/footer/file_statistics.rs index 4fac3ad8482..5abd80339d6 100644 --- a/vortex-file/src/footer/file_statistics.rs +++ b/vortex-file/src/footer/file_statistics.rs @@ -146,6 +146,20 @@ impl FileStatistics { pub fn get(&self, field_idx: usize) -> (&StatsSet, &DType) { (&self.stats[field_idx], &self.dtypes[field_idx]) } + + /// Returns the statistics and data type for a struct field, looked up by name. 
+ /// + /// This is a convenience for callers that key columns by name (e.g. DuckDB's + /// `BaseFileReader::GetStatistics`). Requires `file_dtype` to be a struct so + /// that field names can be matched against the stats indices. + /// + /// Returns `None` if `file_dtype` is not a struct, if `name` matches no field, or + /// if the matched index has no stats entry (e.g. a mismatched `file_dtype`). + pub fn get_by_name(&self, file_dtype: &DType, name: &str) -> Option<(&StatsSet, &DType)> { + let fields = file_dtype.as_struct_fields_opt()?; + let idx = fields.names().iter().position(|n| n.as_ref() == name)?; + Some((self.stats.get(idx)?, self.dtypes.get(idx)?)) + } } impl<'a> IntoIterator for &'a FileStatistics { From 103d32b092d02e3c32ffe4933c9a9ee598050c43 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 6 May 2026 20:39:35 -0400 Subject: [PATCH 2/9] feat(duckdb): add MultiFileFunction trait + FFI to DuckDB MultiFileFunction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wraps DuckDB's templated MultiFileFunction machinery so an extension can plug in a per-format reader and inherit cross-file orchestration (file globbing, virtual columns, hive partitioning, COPY support) for free. Layered: - cpp/include/duckdb_vx/multi_file_function.h — C-ABI vtable - cpp/multi_file_function.cpp — VortexMultiFileReaderInterface and VortexFileReader subclass MultiFileReaderInterface and BaseFileReader, forwarding virtual calls to the FFI vtable - src/duckdb/multi_file_function.rs — Rust MultiFileFunction + BaseFileReader traits with associated types (mirroring the existing TableFunction shape) and a register_multi_file_function method on DatabaseRef This commit does not yet register a function — the next commit adds VortexMultiFileFunction and wires it up. 
Signed-off-by: Nicholas Gates Signed-off-by: Nicholas Gates --- vortex-duckdb/build.rs | 3 +- vortex-duckdb/cpp/include/duckdb_vx.h | 1 + .../include/duckdb_vx/multi_file_function.h | 163 +++++++ vortex-duckdb/cpp/multi_file_function.cpp | 417 ++++++++++++++++++ vortex-duckdb/src/duckdb/mod.rs | 2 + .../src/duckdb/multi_file_function.rs | 399 +++++++++++++++++ 6 files changed, 984 insertions(+), 1 deletion(-) create mode 100644 vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h create mode 100644 vortex-duckdb/cpp/multi_file_function.cpp create mode 100644 vortex-duckdb/src/duckdb/multi_file_function.rs diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index e435e9ba063..fa6461c657f 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -20,7 +20,7 @@ const DUCKDB_SOURCE_COMMIT_URL: &str = "https://github.com/duckdb/duckdb/archive const BUILD_ARTIFACTS: [&str; 3] = ["libduckdb.dylib", "libduckdb.so", "libduckdb_static.a"]; -const SOURCE_FILES: [&str; 17] = [ +const SOURCE_FILES: [&str; 18] = [ "cpp/client_context.cpp", "cpp/config.cpp", "cpp/copy_function.cpp", @@ -30,6 +30,7 @@ const SOURCE_FILES: [&str; 17] = [ "cpp/expr.cpp", "cpp/file_system.cpp", "cpp/logical_type.cpp", + "cpp/multi_file_function.cpp", "cpp/replacement_scan.cpp", "cpp/reusable_dict.cpp", "cpp/scalar_function.cpp", diff --git a/vortex-duckdb/cpp/include/duckdb_vx.h b/vortex-duckdb/cpp/include/duckdb_vx.h index dcad0ae1487..862a334a4e5 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx.h +++ b/vortex-duckdb/cpp/include/duckdb_vx.h @@ -12,6 +12,7 @@ #include "duckdb_vx/expr.h" #include "duckdb_vx/file_system.h" #include "duckdb_vx/logical_type.h" +#include "duckdb_vx/multi_file_function.h" #include "duckdb_vx/reusable_dict.h" #include "duckdb_vx/replacement_scan.h" #include "duckdb_vx/scalar_function.h" diff --git a/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h new file mode 100644 index 
00000000000..b761a0355d1 --- /dev/null +++ b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h @@ -0,0 +1,163 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +/** + * C ABI for registering a DuckDB MultiFileFunction-backed table function. + * + * Unlike duckdb_vx_tfunc_register (which wraps a single TableFunction), this exposes + * DuckDB's templated MultiFileFunction machinery: file globbing, per-file readers, + * hive partitioning, virtual columns, etc. are all driven by DuckDB itself; the + * extension only supplies a per-format reader. + * + * Owned-pointer convention: every non-null pointer the extension returns is owned by + * DuckDB and must be released by the corresponding free_* callback. Borrowed pointers + * (passed in to callbacks) must not be freed. + */ +#pragma once + +#include "duckdb_vx/data.h" +#include "error.h" +#include "table_function.h" +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// Opaque, extension-owned. Lifetime is tied to the corresponding free_* callback. +typedef struct duckdb_vx_mff_options_ *duckdb_vx_mff_options; +typedef struct duckdb_vx_mff_bind_data_ *duckdb_vx_mff_bind_data; +typedef struct duckdb_vx_mff_global_ *duckdb_vx_mff_global; +typedef struct duckdb_vx_mff_local_ *duckdb_vx_mff_local; +typedef struct duckdb_vx_mff_reader_ *duckdb_vx_mff_reader; + +// Opaque writers populated by the extension during bind. +typedef struct duckdb_vx_mff_schema_writer_ *duckdb_vx_mff_schema_writer; + +/** + * Append a column to the bind schema. The name is copied; the logical type is + * cloned. Both arguments remain owned by the caller. + */ +void duckdb_vx_mff_schema_writer_add_column(duckdb_vx_mff_schema_writer writer, + const char *name, + size_t name_len, + duckdb_logical_type type); + +// vtable mirroring the subset of MultiFileReaderInterface + BaseFileReader we expose. +// All callbacks are required and must be non-null. 
+typedef struct { + /** Function name, e.g. "read_vortex". Must outlive the registered function. */ + const char *name; + + // --------------------------------------------------------------------- + // Options lifecycle + // --------------------------------------------------------------------- + + /** Create a fresh, default options object. Called once per bind. */ + duckdb_vx_mff_options (*create_options)(duckdb_client_context ctx, duckdb_vx_error *error); + /** Release options created by create_options. Must accept null. */ + void (*free_options)(duckdb_vx_mff_options options); + + // --------------------------------------------------------------------- + // Bind lifecycle + // --------------------------------------------------------------------- + + /** + * Initialize bind data from options. Called once per bind, after options. + * Takes ownership of `options` (must be freed via free_options if the + * extension does not retain it). + */ + duckdb_vx_mff_bind_data (*initialize_bind_data)(duckdb_vx_mff_options options, + duckdb_vx_error *error); + /** Release bind data. Must accept null. */ + void (*free_bind_data)(duckdb_vx_mff_bind_data bind_data); + + /** + * Bind the reader's schema. Called by DuckDB after the first file in the + * file list is known. The extension should open the file (or a metadata- + * only handle) and append result columns via the schema_writer. + * + * `first_file_path` is borrowed (not nul-terminated, length given). 
+ */ + void (*bind_reader)(duckdb_client_context ctx, + duckdb_vx_mff_bind_data bind_data, + const char *first_file_path, + size_t path_len, + duckdb_vx_mff_schema_writer schema_out, + duckdb_vx_error *error); + + // --------------------------------------------------------------------- + // Per-query state lifecycle + // --------------------------------------------------------------------- + + duckdb_vx_mff_global (*init_global)(duckdb_client_context ctx, + duckdb_vx_mff_bind_data bind_data, + duckdb_vx_error *error); + void (*free_global)(duckdb_vx_mff_global global); + + duckdb_vx_mff_local (*init_local)(duckdb_vx_mff_global global); + void (*free_local)(duckdb_vx_mff_local local); + + // --------------------------------------------------------------------- + // Per-file reader lifecycle + // --------------------------------------------------------------------- + + /** + * Open a per-file reader. Called once per file when DuckDB first opens + * that file for scanning. + */ + duckdb_vx_mff_reader (*create_reader)(duckdb_client_context ctx, + duckdb_vx_mff_global global, + duckdb_vx_mff_bind_data bind_data, + const char *file_path, + size_t path_len, + size_t file_idx, + duckdb_vx_error *error); + void (*free_reader)(duckdb_vx_mff_reader reader); + + /** + * Try to initialize a scan over `reader`. Returns true if a scan can begin, + * false if the reader is exhausted. Called with the multi-file global lock + * held; must not block on I/O. + */ + bool (*try_initialize_scan)(duckdb_vx_mff_reader reader, + duckdb_vx_mff_global global, + duckdb_vx_mff_local local, + duckdb_vx_error *error); + + /** + * Produce the next batch of data into `chunk_out`. Returns: + * - true with chunk size > 0 : more data may follow. + * - true with chunk size == 0 : reader is exhausted; DuckDB will move on. + * - false : an error occurred (see `error`). 
+ */ + bool (*scan)(duckdb_vx_mff_reader reader, + duckdb_vx_mff_global global, + duckdb_vx_mff_local local, + duckdb_data_chunk chunk_out, + duckdb_vx_error *error); + + /** + * Get per-column statistics by name. Returns false if no stats are + * available. Same convention as duckdb_vx_tfunc_vtab_t::statistics. + */ + bool (*get_statistics)(duckdb_vx_mff_reader reader, + const char *col_name, + size_t name_len, + duckdb_column_statistics *stats_out); + + /** Scan progress within a file in [0.0, 100.0]. */ + double (*progress_in_file)(duckdb_vx_mff_reader reader); +} duckdb_vx_mff_vtab_t; + +/** + * Register the multi-file function described by `vtab` against `ffi_db`. The + * vtab is copied into a TableFunctionInfo owned by the catalog, so the caller + * may free it after this returns. + */ +duckdb_state duckdb_vx_mff_register(duckdb_database ffi_db, const duckdb_vx_mff_vtab_t *vtab); + +#ifdef __cplusplus +} +#endif diff --git a/vortex-duckdb/cpp/multi_file_function.cpp b/vortex-duckdb/cpp/multi_file_function.cpp new file mode 100644 index 00000000000..79771fdeba3 --- /dev/null +++ b/vortex-duckdb/cpp/multi_file_function.cpp @@ -0,0 +1,417 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +/** + * C++ adapters that bridge a duckdb_vx_mff_vtab_t to DuckDB's MultiFileFunction. + * + * Layered design: + * - VortexBaseFileReaderOptions : BaseFileReaderOptions - opaque options handle + * - VortexFileReader : BaseFileReader - per-file scan adapter + * - VortexMultiFileReaderInterface : MultiFileReaderInterface - cross-file orchestrator + * - VortexMultiFileFunctionOp - OP type for MultiFileFunction + * + * Each adapter holds a non-owning pointer to the registered vtab and an + * extension-owned FFI handle. The FFI handle is freed via the vtab's free_* + * callback in the destructor. 
+ */ + +#include "duckdb_vx/data.hpp" +#include "duckdb_vx/duckdb_diagnostics.h" +#include "duckdb_vx/error.hpp" +#include "duckdb_vx/multi_file_function.h" + +#include +#include + +DUCKDB_INCLUDES_BEGIN +#include "duckdb.h" +#include "duckdb/catalog/catalog.hpp" +#include "duckdb/common/multi_file/base_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_data.hpp" +#include "duckdb/common/multi_file/multi_file_function.hpp" +#include "duckdb/common/multi_file/multi_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_states.hpp" +#include "duckdb/main/capi/capi_internal.hpp" +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" +DUCKDB_INCLUDES_END + +using namespace duckdb; +using vortex::IntoErrString; + +namespace { + +/** + * Internal bind data stored on the catalog-owned function. We keep the vtable + * here so per-bind/per-file adapters can find it without a separate registry. + */ +struct VortexMultiFileFunctionInfo : TableFunctionInfo { + explicit VortexMultiFileFunctionInfo(const duckdb_vx_mff_vtab_t &vtab_p) : vtab(vtab_p) { + } + + const duckdb_vx_mff_vtab_t vtab; +}; + +class VortexBaseFileReaderOptions : public BaseFileReaderOptions { +public: + VortexBaseFileReaderOptions(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_options handle) + : vtab(vtab), handle(handle) { + } + ~VortexBaseFileReaderOptions() override { + if (handle) { + vtab.free_options(handle); + } + } + + /** Release ownership of the FFI handle to the caller. */ + duckdb_vx_mff_options Release() { + auto out = handle; + handle = nullptr; + return out; + } + + const duckdb_vx_mff_vtab_t &vtab; + +private: + duckdb_vx_mff_options handle; +}; + +/** + * Bind data attached to the MultiFileBindData. Holds the FFI bind-data handle + * for the lifetime of the prepared statement. 
+ */ +struct VortexMultiFileBindData : public TableFunctionData { + VortexMultiFileBindData(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_bind_data handle) + : vtab(vtab), handle(handle) { + } + ~VortexMultiFileBindData() override { + if (handle) { + vtab.free_bind_data(handle); + } + } + + bool SupportStatementCache() const override { + return false; + } + + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_bind_data handle; +}; + +/** + * Global state for a single multi-file scan. Distinct from MultiFileGlobalState + * (which DuckDB owns); this is the *interface*-owned global state slot. + */ +class VortexInterfaceGlobalState : public GlobalTableFunctionState { +public: + VortexInterfaceGlobalState(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_global handle) + : vtab(vtab), handle(handle) { + } + ~VortexInterfaceGlobalState() override { + if (handle) { + vtab.free_global(handle); + } + } + + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_global handle; +}; + +class VortexInterfaceLocalState : public LocalTableFunctionState { +public: + VortexInterfaceLocalState(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_local handle) + : vtab(vtab), handle(handle) { + } + ~VortexInterfaceLocalState() override { + if (handle) { + vtab.free_local(handle); + } + } + + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_local handle; +}; + +/** + * Per-file reader adapter. DuckDB's MultiFileFunction drives one of these + * per opened file; each Scan call asks the extension for the next chunk. 
+ */ +class VortexFileReader : public BaseFileReader { +public: + VortexFileReader(OpenFileInfo file_p, + const duckdb_vx_mff_vtab_t &vtab_p, + duckdb_vx_mff_reader handle_p) + : BaseFileReader(std::move(file_p)), vtab(vtab_p), handle(handle_p) { + } + ~VortexFileReader() override { + if (handle) { + vtab.free_reader(handle); + } + } + + string GetReaderType() const override { + return "vortex"; + } + + bool TryInitializeScan(ClientContext &, + GlobalTableFunctionState &gstate, + LocalTableFunctionState &lstate) override { + auto &g = gstate.Cast(); + auto &l = lstate.Cast(); + duckdb_vx_error error_out = nullptr; + const bool ok = vtab.try_initialize_scan(handle, g.handle, l.handle, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + return ok; + } + + AsyncResult Scan(ClientContext &, + GlobalTableFunctionState &gstate, + LocalTableFunctionState &lstate, + DataChunk &chunk) override { + auto &g = gstate.Cast(); + auto &l = lstate.Cast(); + duckdb_vx_error error_out = nullptr; + auto chunk_handle = reinterpret_cast(&chunk); + const bool ok = vtab.scan(handle, g.handle, l.handle, chunk_handle, &error_out); + if (!ok || error_out) { + throw IOException(IntoErrString(error_out)); + } + // Translate "0 rows" into FINISHED so the multi-file scanner advances + // to the next file. Otherwise, signal we may have more. + return chunk.size() == 0 ? AsyncResult(SourceResultType::FINISHED) + : AsyncResult(SourceResultType::HAVE_MORE_OUTPUT); + } + + unique_ptr GetStatistics(ClientContext &, const string &name) override { + duckdb_column_statistics stats = {}; + if (!vtab.get_statistics(handle, name.c_str(), name.size(), &stats)) { + return nullptr; + } + // Materialize into a BaseStatistics matching the column's type. We reuse the + // logic in cpp/table_function.cpp by constructing an Unknown stats and setting + // the bits we have. Because we don't carry the type here (BaseFileReader has + // it via columns), look it up. 
+ for (auto &col : columns) { + if (col.name != name) { + continue; + } + BaseStatistics out = BaseStatistics::CreateUnknown(col.type); + if (stats.min) { + auto min_val = *reinterpret_cast(stats.min); + duckdb_destroy_value(&stats.min); + if (col.type.IsNumeric()) { + NumericStats::SetMin(out, min_val); + } else if (col.type.id() == LogicalTypeId::VARCHAR || + col.type.id() == LogicalTypeId::BLOB) { + StringStats::SetMin(out, StringValue::Get(min_val)); + } + } + if (stats.max) { + auto max_val = *reinterpret_cast(stats.max); + duckdb_destroy_value(&stats.max); + if (col.type.IsNumeric()) { + NumericStats::SetMax(out, max_val); + } else if (col.type.id() == LogicalTypeId::VARCHAR || + col.type.id() == LogicalTypeId::BLOB) { + StringStats::SetMax(out, StringValue::Get(max_val)); + } + } + if (!stats.has_null) { + out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); + } + return out.ToUnique(); + } + return nullptr; + } + + double GetProgressInFile(ClientContext &) override { + return vtab.progress_in_file(handle); + } + +private: + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_reader handle; +}; + +/** + * Cross-file orchestrator. Implements only the methods the basic scan pipeline + * needs; everything else (hive partitioning, COPY, union-by-name, virtual cols) + * defaults to the base-class behaviour. 
+ */ +class VortexMultiFileReaderInterface : public MultiFileReaderInterface { +public: + explicit VortexMultiFileReaderInterface(const duckdb_vx_mff_vtab_t &vtab_p) : vtab(vtab_p) { + } + + unique_ptr InitializeOptions(ClientContext &context, + optional_ptr) override { + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + auto handle = vtab.create_options(ctx, &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + return make_uniq(vtab, handle); + } + + bool ParseCopyOption(ClientContext &, const string &, const vector &, + BaseFileReaderOptions &, vector &, vector &) override { + return false; + } + + bool ParseOption(ClientContext &, const string &, const Value &, MultiFileOptions &, + BaseFileReaderOptions &) override { + return false; + } + + unique_ptr InitializeBindData(MultiFileBindData &, + unique_ptr options) override { + auto &vortex_options = options->Cast(); + // Take ownership of the options handle and pass it to the FFI. + duckdb_vx_error error_out = nullptr; + auto bind_handle = vtab.initialize_bind_data(vortex_options.Release(), &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + return make_uniq(vtab, bind_handle); + } + + void BindReader(ClientContext &context, vector &return_types, vector &names, + MultiFileBindData &bind_data) override { + auto first_file = bind_data.file_list->GetFirstFile(); + auto &vortex_bind = bind_data.bind_data->Cast(); + + // Schema collection writer: a pair of vectors that the FFI populates. 
+ struct SchemaWriter { + vector &names; + vector &types; + }; + SchemaWriter writer = {names, return_types}; + + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + vtab.bind_reader(ctx, vortex_bind.handle, first_file.path.c_str(), first_file.path.size(), + reinterpret_cast(&writer), &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + } + + unique_ptr InitializeGlobalState(ClientContext &context, + MultiFileBindData &bind_data, + MultiFileGlobalState &) override { + auto &vortex_bind = bind_data.bind_data->Cast(); + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + auto handle = vtab.init_global(ctx, vortex_bind.handle, &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + return make_uniq(vtab, handle); + } + + unique_ptr InitializeLocalState(ExecutionContext &, + GlobalTableFunctionState &gstate) override { + auto &g = gstate.Cast(); + auto handle = vtab.init_local(g.handle); + return make_uniq(vtab, handle); + } + + shared_ptr CreateReader(ClientContext &, GlobalTableFunctionState &, BaseUnionData &, + const MultiFileBindData &) override { + // UNION BY NAME path - not supported yet. 
+ throw NotImplementedException("UNION BY NAME is not yet supported by the Vortex multi-file function"); + } + + shared_ptr CreateReader(ClientContext &context, GlobalTableFunctionState &gstate, + const OpenFileInfo &file, idx_t file_idx, + const MultiFileBindData &bind_data) override { + auto &vortex_bind = bind_data.bind_data->Cast(); + auto &vortex_g = gstate.Cast(); + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + auto handle = vtab.create_reader(ctx, vortex_g.handle, vortex_bind.handle, file.path.c_str(), + file.path.size(), file_idx, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + auto reader = make_shared_ptr(file, vtab, handle); + // BaseFileReader exposes its file-local schema via the `columns` field; + // the multi-file reader uses it to build the global<->local column + // mapping. We don't yet support per-file schema variation, so inherit + // the bind-time global schema directly. + reader->columns = bind_data.columns; + return reader; + } + + unique_ptr Copy() override { + return make_uniq(vtab); + } + +private: + const duckdb_vx_mff_vtab_t &vtab; +}; + +/** + * The OP type required by MultiFileFunction. Holds a pointer to the vtab so + * CreateInterface can construct a VortexMultiFileReaderInterface bound to it. 
+ */ +struct VortexMultiFileFunctionOp { + static const duckdb_vx_mff_vtab_t *current_vtab; + + static unique_ptr CreateInterface(ClientContext &) { + if (!current_vtab) { + throw InternalException("VortexMultiFileFunctionOp::CreateInterface called without a registered vtab"); + } + return make_uniq(*current_vtab); + } +}; + +const duckdb_vx_mff_vtab_t *VortexMultiFileFunctionOp::current_vtab = nullptr; + +} // namespace + +extern "C" void duckdb_vx_mff_schema_writer_add_column(duckdb_vx_mff_schema_writer writer, + const char *name, + size_t name_len, + duckdb_logical_type type) { + struct SchemaWriter { + vector &names; + vector &types; + }; + auto &w = *reinterpret_cast(writer); + w.names.emplace_back(name, name_len); + w.types.emplace_back(*reinterpret_cast(type)); +} + +extern "C" duckdb_state duckdb_vx_mff_register(duckdb_database ffi_db, const duckdb_vx_mff_vtab_t *vtab) { + D_ASSERT(ffi_db); + D_ASSERT(vtab); + + const auto &wrapper = *reinterpret_cast(ffi_db); + auto &db = *wrapper.database->instance; + + // Capture the vtab pointer so MultiFileFunction::MultiFileBind can find it + // when DuckDB re-enters CreateInterface during bind. The catalog will also + // hold a copy via TableFunctionInfo so this stays alive for the lifetime of + // the registered function. 
+ auto info = make_shared_ptr(*vtab); + VortexMultiFileFunctionOp::current_vtab = &info->vtab; + + MultiFileFunction mff(vtab->name); + mff.function_info = info; + + try { + auto &system_catalog = Catalog::GetSystemCatalog(db); + auto data = CatalogTransaction::GetSystemTransaction(db); + CreateTableFunctionInfo tf_info(mff); + tf_info.on_conflict = OnCreateConflict::ALTER_ON_CONFLICT; + system_catalog.CreateFunction(data, tf_info); + } catch (const std::exception &e) { + ErrorData err(e); + DUCKDB_LOG_ERROR(db, "Failed to create Vortex multi-file function:\t" + err.Message()); + return DuckDBError; + } + return DuckDBSuccess; +} diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index c42fbdaf1e4..eb595633ec1 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -13,6 +13,7 @@ mod expr; mod file_system; mod logical_type; mod macro_; +mod multi_file_function; mod query_result; mod reusable_dict; mod scalar_function; @@ -37,6 +38,7 @@ pub use ddb_string::*; pub use expr::*; pub use file_system::*; pub use logical_type::*; +pub use multi_file_function::*; pub use query_result::*; pub use reusable_dict::*; pub use scalar_function::*; diff --git a/vortex-duckdb/src/duckdb/multi_file_function.rs b/vortex-duckdb/src/duckdb/multi_file_function.rs new file mode 100644 index 00000000000..52c1fd35c45 --- /dev/null +++ b/vortex-duckdb/src/duckdb/multi_file_function.rs @@ -0,0 +1,399 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Rust-side wrapper for DuckDB's `MultiFileFunction` template. +//! +//! Lets a table-function-like type plug into DuckDB's native multi-file machinery +//! (file globbing, virtual columns, hive partitioning, COPY support, etc.) by +//! supplying only what's format-specific: how to open a file, how to read its +//! schema, and how to scan a chunk. The cross-file orchestration is owned by +//! DuckDB. +//! +//! 
The trait pair mirrors the DuckDB C++ surface: +//! - [`MultiFileFunction`] ↔ `MultiFileReaderInterface` +//! - [`BaseFileReader`] ↔ `BaseFileReader` +//! +//! Both are non-object-safe with associated types so each implementation gets +//! statically-monomorphised callbacks (no per-call dyn dispatch). + +use std::ffi::CStr; +use std::ffi::CString; +use std::fmt::Debug; +use std::ptr; +use std::slice; + +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::error::vortex_err; + +use crate::cpp; +use crate::duckdb::ClientContext; +use crate::duckdb::ClientContextRef; +use crate::duckdb::ColumnStatistics; +use crate::duckdb::DataChunk; +use crate::duckdb::DataChunkRef; +use crate::duckdb::DatabaseRef; +use crate::duckdb::LogicalTypeRef; +use crate::duckdb::try_or; +use crate::duckdb_try; + +/// A table function backed by DuckDB's `MultiFileFunction` template. +/// +/// Implementors describe a per-format reader; DuckDB owns the cross-file +/// orchestration (globbing, parallelism, virtual columns). +pub trait MultiFileFunction: Sized + Debug { + /// Per-format options collected from the `TABLE(...)` named parameters. + /// For minimal implementations this can be a unit struct. + type ReaderOptions: Send + Sync; + + /// Bind-time data, populated from options and (the schema of) the first + /// file. Must be `Send` because DuckDB may move it across threads. + type BindData: Send; + + /// Global state for one query invocation. Shared across worker threads. + type GlobalState: Send + Sync; + + /// Per-thread local state. + type LocalState; + + /// Per-file reader. Created when DuckDB first opens a file, dropped when + /// scanning of that file finishes. + type Reader: BaseFileReader; + + /// Construct default options. Called once per bind. + fn create_options(ctx: &ClientContextRef) -> VortexResult; + + /// Build bind data from options. Takes ownership of the options struct. 
+ fn initialize_bind_data(options: Self::ReaderOptions) -> VortexResult; + + /// Populate the result schema. DuckDB picks the first file in the file list + /// to bind against; the implementation should open it (cheaply, metadata- + /// only if possible) and append columns to `schema`. + fn bind_reader( + ctx: &ClientContextRef, + bind_data: &Self::BindData, + first_file: &str, + schema: &mut SchemaBuilder, + ) -> VortexResult<()>; + + /// Initialize global state for one query. + fn init_global( + ctx: &ClientContextRef, + bind_data: &Self::BindData, + ) -> VortexResult; + + /// Initialize per-thread state. + fn init_local(global: &Self::GlobalState) -> Self::LocalState; + + /// Open a per-file reader. Called once per file, on the thread that won the + /// race to open it. + fn create_reader( + ctx: &ClientContextRef, + global: &Self::GlobalState, + bind_data: &Self::BindData, + file_path: &str, + file_idx: usize, + ) -> VortexResult; +} + +/// Per-file reader contract. Implementations are owned by DuckDB once handed +/// off via [`MultiFileFunction::create_reader`] and dropped when scanning of +/// that file completes. +/// +/// Note: this trait is intentionally not `Send`, on the assumption that DuckDB's +/// `MultiFileFunction` gives one-thread-at-a-time access to a given reader; +/// requiring `Send` would force impls to wrap their scan iterators in locks. +/// NOTE(review): confirm that assumption — DuckDB may let several workers scan +/// the same reader concurrently, each with its own local state. +pub trait BaseFileReader { + type GlobalState; + type LocalState; + + /// Set up scan state for the next batch. Per the C vtable contract this runs with + /// the multi-file global lock held, so it must not block on I/O. Return `false` once exhausted. + fn try_initialize_scan( + &mut self, + global: &Self::GlobalState, + local: &mut Self::LocalState, + ) -> VortexResult; + + /// Produce the next batch into `chunk`. Setting `chunk` to size 0 signals + /// end-of-file; otherwise non-empty implies more may follow. 
+ fn scan( + &mut self, + global: &Self::GlobalState, + local: &mut Self::LocalState, + chunk: &mut DataChunkRef, + ) -> VortexResult<()>; + + /// Per-column statistics by name. Default returns `None`. + fn get_statistics(&self, _name: &str) -> Option { + None + } + + /// Scan progress within this file in `[0.0, 100.0]`. Default `0.0`. + fn progress_in_file(&self) -> f64 { + 0.0 + } +} + +/// Append-only schema builder passed to [`MultiFileFunction::bind_reader`]. +/// +/// Wraps the C++ `vector` / `vector` pair via +/// `duckdb_vx_mff_schema_writer_add_column`. +pub struct SchemaBuilder { + raw: cpp::duckdb_vx_mff_schema_writer, +} + +impl SchemaBuilder { + /// Append `(name, type)` to the result schema. + pub fn add_column(&mut self, name: &str, logical_type: &LogicalTypeRef) { + unsafe { + cpp::duckdb_vx_mff_schema_writer_add_column( + self.raw, + name.as_ptr().cast(), + name.len(), + logical_type.as_ptr(), + ); + } + } +} + +impl DatabaseRef { + /// Register `T` as a multi-file table function on this database under + /// `name`. + /// + /// The vtable is statically derived from `T` and copied into a C++ + /// `TableFunctionInfo` owned by the catalog; `T` itself is never instanced. 
+ pub fn register_multi_file_function( + &self, + name: &CStr, + ) -> VortexResult<()> { + let vtab = cpp::duckdb_vx_mff_vtab_t { + name: name.as_ptr(), + create_options: Some(create_options::), + free_options: Some(free_options::), + initialize_bind_data: Some(initialize_bind_data::), + free_bind_data: Some(free_bind_data::), + bind_reader: Some(bind_reader::), + init_global: Some(init_global::), + free_global: Some(free_global::), + init_local: Some(init_local::), + free_local: Some(free_local::), + create_reader: Some(create_reader::), + free_reader: Some(free_reader::), + try_initialize_scan: Some(try_initialize_scan::), + scan: Some(scan::), + get_statistics: Some(get_statistics::), + progress_in_file: Some(progress_in_file::), + }; + + duckdb_try!( + unsafe { cpp::duckdb_vx_mff_register(self.as_ptr(), &raw const vtab) }, + "Failed to register multi-file function '{}'", + name.to_string_lossy() + ); + + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// FFI shim: each callback boxes/unboxes the trait's associated type and +// dispatches to the corresponding trait method. 
+// --------------------------------------------------------------------------- + +unsafe extern "C-unwind" fn create_options( + ctx: cpp::duckdb_client_context, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_options { + let ctx = unsafe { ClientContext::borrow(ctx) }; + try_or(error_out, || { + let opts = T::create_options(ctx)?; + Ok(Box::into_raw(Box::new(opts)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_options(opts: cpp::duckdb_vx_mff_options) { + if !opts.is_null() { + drop(unsafe { Box::from_raw(opts.cast::()) }); + } +} + +unsafe extern "C-unwind" fn initialize_bind_data( + opts: cpp::duckdb_vx_mff_options, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_bind_data { + let opts = unsafe { Box::from_raw(opts.cast::()) }; + try_or(error_out, || { + let bind_data = T::initialize_bind_data(*opts)?; + Ok(Box::into_raw(Box::new(bind_data)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_bind_data( + bind_data: cpp::duckdb_vx_mff_bind_data, +) { + if !bind_data.is_null() { + drop(unsafe { Box::from_raw(bind_data.cast::()) }); + } +} + +unsafe extern "C-unwind" fn bind_reader( + ctx: cpp::duckdb_client_context, + bind_data: cpp::duckdb_vx_mff_bind_data, + file_path: *const std::os::raw::c_char, + path_len: usize, + schema_writer: cpp::duckdb_vx_mff_schema_writer, + error_out: *mut cpp::duckdb_vx_error, +) { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + let mut builder = SchemaBuilder { raw: schema_writer }; + try_or(error_out, || { + let path_bytes = unsafe { slice::from_raw_parts(file_path.cast::(), path_len) }; + let path = std::str::from_utf8(path_bytes) + .map_err(|e| vortex_err!("file path is not UTF-8: {e}"))?; + T::bind_reader(ctx, bind_data, path, &mut builder) + }) +} + +unsafe extern "C-unwind" fn init_global( + ctx: cpp::duckdb_client_context, + bind_data: cpp::duckdb_vx_mff_bind_data, + error_out: *mut 
cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_global { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + let global = T::init_global(ctx, bind_data)?; + Ok(Box::into_raw(Box::new(global)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_global(global: cpp::duckdb_vx_mff_global) { + if !global.is_null() { + drop(unsafe { Box::from_raw(global.cast::()) }); + } +} + +unsafe extern "C-unwind" fn init_local( + global: cpp::duckdb_vx_mff_global, +) -> cpp::duckdb_vx_mff_local { + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = T::init_local(global); + Box::into_raw(Box::new(local)).cast() +} + +unsafe extern "C-unwind" fn free_local(local: cpp::duckdb_vx_mff_local) { + if !local.is_null() { + drop(unsafe { Box::from_raw(local.cast::()) }); + } +} + +#[allow(clippy::too_many_arguments)] +unsafe extern "C-unwind" fn create_reader( + ctx: cpp::duckdb_client_context, + global: cpp::duckdb_vx_mff_global, + bind_data: cpp::duckdb_vx_mff_bind_data, + file_path: *const std::os::raw::c_char, + path_len: usize, + file_idx: usize, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_reader { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + let path_bytes = unsafe { slice::from_raw_parts(file_path.cast::(), path_len) }; + let path = std::str::from_utf8(path_bytes) + .map_err(|e| vortex_err!("file path is not UTF-8: {e}"))?; + let reader = T::create_reader(ctx, global, bind_data, path, file_idx)?; + Ok(Box::into_raw(Box::new(reader)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_reader(reader: cpp::duckdb_vx_mff_reader) { + if !reader.is_null() { + drop(unsafe { Box::from_raw(reader.cast::()) }); + } 
+} + +unsafe extern "C-unwind" fn try_initialize_scan( + reader: cpp::duckdb_vx_mff_reader, + global: cpp::duckdb_vx_mff_global, + local: cpp::duckdb_vx_mff_local, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let reader = unsafe { reader.cast::().as_mut() }.vortex_expect("reader null"); + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = unsafe { local.cast::().as_mut() }.vortex_expect("local null"); + try_or(error_out, || reader.try_initialize_scan(global, local)) +} + +unsafe extern "C-unwind" fn scan( + reader: cpp::duckdb_vx_mff_reader, + global: cpp::duckdb_vx_mff_global, + local: cpp::duckdb_vx_mff_local, + chunk: cpp::duckdb_data_chunk, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let reader = unsafe { reader.cast::().as_mut() }.vortex_expect("reader null"); + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = unsafe { local.cast::().as_mut() }.vortex_expect("local null"); + let chunk_ref = unsafe { DataChunk::borrow_mut(chunk) }; + match reader.scan(global, local, chunk_ref) { + Ok(()) => { + unsafe { error_out.write(ptr::null_mut()) }; + true + } + Err(err) => { + let msg = err.to_string(); + unsafe { error_out.write(cpp::duckdb_vx_error_create(msg.as_ptr().cast(), msg.len())) }; + false + } + } +} + +unsafe extern "C-unwind" fn get_statistics( + reader: cpp::duckdb_vx_mff_reader, + name: *const std::os::raw::c_char, + name_len: usize, + stats_out: *mut cpp::duckdb_column_statistics, +) -> bool { + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); + let name = unsafe { slice::from_raw_parts(name.cast::(), name_len) }; + let Ok(name) = std::str::from_utf8(name) else { + return false; + }; + let Some(stats) = reader.get_statistics(name) else { + return false; + }; + let out = unsafe { &mut *stats_out }; + out.min = stats.min.map_or(ptr::null_mut(), |v| v.into_ptr()); + out.max = stats.max.map_or(ptr::null_mut(), |v| 
v.into_ptr()); + out.max_string_length = stats.max_string_length; + out.has_null = stats.has_null; + true +} + +unsafe extern "C-unwind" fn progress_in_file( + reader: cpp::duckdb_vx_mff_reader, +) -> f64 { + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); + reader.progress_in_file() +} + +// --------------------------------------------------------------------------- +// Helpers used by Phase 5 (concrete implementations). +// --------------------------------------------------------------------------- + +/// Build a `CStr` literal-equivalent at runtime. Convenient for type names +/// passed to `LogicalType` / DuckDB FFI. +#[allow(dead_code)] +pub(crate) fn cstring(s: &str) -> CString { + CString::new(s).unwrap_or_else(|_| CString::default()) +} From e1de4e5001b045264569ab8026ed19cbd316144c Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 6 May 2026 20:39:47 -0400 Subject: [PATCH 3/9] feat(duckdb): VortexMultiFileFunction + read_vortex_v2 + env-var swap Concrete MultiFileFunction implementation that drives a per-file scan via VortexFile directly (rather than going through MultiLayoutDataSource), making file-level statistics, dtype, and pruning available without LayoutReader downcasts. Registration: - read_vortex_v2 is always registered for direct comparison. - VX_DUCKDB_MULTI_FILE_FUNCTION=1 swaps read_vortex / vortex_scan over to the v2 path so existing benchmarks and SQL can run unchanged. Smoke tests cover single-file, strings, and multi-file glob. Known v2 gaps vs the existing TableFunction-backed scan (documented on use_multi_file_function): no projection or filter pushdown; no Vortex filesystem integration; no list-of-paths overload; no union_by_name. The v2 path is intended for benchmarking the orchestration layer first; parity work is follow-up. 
Signed-off-by: Nicholas Gates Signed-off-by: Nicholas Gates --- .../src/e2e_test/vortex_scan_test.rs | 63 ++++ vortex-duckdb/src/lib.rs | 49 ++- vortex-duckdb/src/multi_file_function.rs | 297 ++++++++++++++++++ 3 files changed, 404 insertions(+), 5 deletions(-) create mode 100644 vortex-duckdb/src/multi_file_function.rs diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 8e65d26ed6f..15fbf7d7143 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -993,3 +993,66 @@ fn test_vortex_encodings_roundtrip() { let fixed_child_values = fixed_child.as_slice_with_len::(10); // 10 total child elements assert_eq!(fixed_child_values, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); } + +#[test] +fn test_read_vortex_v2_basic() { + let file = RUNTIME.block_on(async { + let numbers = buffer![1i32, 2, 3, 4, 5]; + write_single_column_vortex_file("number", numbers).await + }); + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT SUM(number) FROM read_vortex_v2('{file_path}')" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = chunk.get_vector(0); + let sum = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(sum, 15); +} + +#[test] +fn test_read_vortex_v2_strings() { + let file = RUNTIME.block_on(async { + let strings = VarBinArray::from(vec!["alpha", "beta", "gamma"]); + write_single_column_vortex_file("s", strings).await + }); + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT string_agg(s, ',') FROM read_vortex_v2('{file_path}')" + )) + .unwrap(); + let mut chunk = result.into_iter().next().unwrap(); + let len = chunk.len().as_(); + let vec = chunk.get_vector_mut(0); + let mut s = unsafe { vec.as_slice_mut::(len) }[0]; + let path = unsafe { 
CStr::from_ptr(cpp::duckdb_string_t_data(&raw mut s)).to_string_lossy() }; + let aggregated: String = path.into_owned(); + assert_eq!(aggregated, "alpha,beta,gamma"); +} + +#[test] +fn test_read_vortex_v2_multiple_files() { + let (tempdir, _f1, _f2) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let f1 = write_vortex_file_to_dir(tempdir.path(), "numbers", buffer![10i32, 20, 30]).await; + let f2 = write_vortex_file_to_dir(tempdir.path(), "numbers", buffer![40i32, 50, 60]).await; + (tempdir, f1, f2) + }); + + let glob_pattern = format!("{}/*.vortex", tempdir.path().display()); + let conn = database_connection(); + let result = conn + .query(&format!( + "SELECT SUM(numbers) FROM read_vortex_v2('{glob_pattern}')" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = chunk.get_vector(0); + let total = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(total, 210); +} diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index 413a71c611e..ccb2e406225 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -22,6 +22,7 @@ use crate::duckdb::LogicalType; use crate::duckdb::Value; use crate::multi_file::VortexMultiFileScan; use crate::multi_file::VortexMultiFileScanList; +use crate::multi_file_function::VortexMultiFileFunction; mod convert; mod datasource; @@ -29,6 +30,7 @@ pub mod duckdb; mod exporter; mod filesystem; mod multi_file; +mod multi_file_function; #[rustfmt::skip] #[path = "./cpp.rs"] @@ -45,6 +47,31 @@ static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRunt static SESSION: LazyLock = LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +/// Returns true if the user has opted into the experimental MultiFileFunction- +/// backed scan path via `VX_DUCKDB_MULTI_FILE_FUNCTION=1` (or `=true`). +/// +/// Used to switch between the existing TableFunction-driven `read_vortex` and +/// the new `MultiFileFunction`-driven path during benchmarking. 
Defaults +/// to off so the existing scan remains the path of record. +/// +/// Known gaps in the v2 path (compared to v1) at time of writing: +/// - no projection pushdown: scans always read every column, which mis-sizes +/// output chunks when DuckDB only requested a subset. +/// - no Vortex filesystem integration: the `vortex_filesystem` extension +/// option is ignored; DuckDB's filesystem opens the files. +/// - no `read_vortex(['a.vortex','b.vortex'])` list-of-paths overload. +/// - no filter pushdown into `Vortex` scans (DuckDB still applies filters +/// after reading). +/// - no support for `union_by_name`. +/// These are tracked as follow-up work; for now `read_vortex_v2` exists for +/// benchmark comparisons of the orchestration layer rather than feature parity. +fn use_multi_file_function() -> bool { + matches!( + std::env::var("VX_DUCKDB_MULTI_FILE_FUNCTION").as_deref(), + Ok("1") | Ok("true") | Ok("TRUE") + ) +} + /// Initialize the Vortex extension by registering the extension functions. /// Note: This also registers extension options. If you want to register options /// separately (e.g., before creating connections), call `register_extension_options` first. @@ -55,11 +82,23 @@ pub fn initialize(db: &DatabaseRef) -> VortexResult<()> { LogicalType::varchar(), Value::from("vortex"), )?; - db.register_table_function::(c"vortex_scan")?; - db.register_table_function::(c"read_vortex")?; - // Register list overloads for multi-glob scanning (e.g., read_vortex(['a.vortex', 'b.vortex'])) - db.register_table_function::(c"vortex_scan")?; - db.register_table_function::(c"read_vortex")?; + if use_multi_file_function() { + // Replace the table-function-based scan with the MultiFileFunction + // path under the canonical names. Also expose under v2 names so an A/B + // test can run both registrations side-by-side. 
+ db.register_multi_file_function::(c"vortex_scan")?; + db.register_multi_file_function::(c"read_vortex")?; + } else { + db.register_table_function::(c"vortex_scan")?; + db.register_table_function::(c"read_vortex")?; + // Register list overloads for multi-glob scanning (e.g., read_vortex(['a.vortex', 'b.vortex'])) + db.register_table_function::(c"vortex_scan")?; + db.register_table_function::(c"read_vortex")?; + } + // Always expose the v2 path under its own name so it can be invoked + // explicitly without flipping the env var (useful for A/B testing within + // a single process). + db.register_multi_file_function::(c"read_vortex_v2")?; db.register_copy_function::(c"vortex", c"vortex") } diff --git a/vortex-duckdb/src/multi_file_function.rs b/vortex-duckdb/src/multi_file_function.rs new file mode 100644 index 00000000000..fdf8b2d61e1 --- /dev/null +++ b/vortex-duckdb/src/multi_file_function.rs @@ -0,0 +1,297 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Vortex implementation of [`MultiFileFunction`]. +//! +//! Plugs Vortex into DuckDB's `MultiFileFunction` template: cross-file +//! orchestration (globbing, parallelism, virtual columns, hive partitioning) +//! is handled by DuckDB. This module supplies the per-file reader using +//! [`VortexFile`] directly so file-level statistics, dtype, and pruning are +//! available without going through `MultiLayoutDataSource`. 
+ +use std::fmt::Debug; + +use vortex::array::ArrayRef; +use vortex::array::Canonical; +use vortex::array::VortexSessionExecute; +use vortex::array::arrays::Struct; +use vortex::array::arrays::StructArray; +use vortex::array::iter::ArrayIterator; +use vortex::dtype::DType; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::file::OpenOptionsSessionExt; +use vortex::file::VortexFile; +use vortex::io::runtime::BlockingRuntime; + +use crate::RUNTIME; +use crate::SESSION; +use crate::duckdb::BaseFileReader; +use crate::duckdb::ClientContextRef; +use crate::duckdb::ColumnStatistics; +use crate::duckdb::DataChunkRef; +use crate::duckdb::LogicalType; +use crate::duckdb::MultiFileFunction; +use crate::duckdb::SchemaBuilder; +use crate::exporter::ArrayExporter; +use crate::exporter::ConversionCache; + +/// Multi-file Vortex scan registered via `MultiFileFunction`. +/// +/// Compared to [`crate::multi_file::VortexMultiFileScan`] (the table-function +/// path), this delegates file globbing, virtual columns, and hive partitioning +/// to DuckDB's native machinery, and reads each file via [`VortexFile`]. +#[derive(Debug)] +pub struct VortexMultiFileFunction; + +#[derive(Default)] +pub struct VortexReaderOptions; + +#[derive(Debug)] +pub struct VortexBindData; + +#[derive(Debug)] +pub struct VortexGlobal; + +#[derive(Debug, Default)] +pub struct VortexLocal; + +/// Per-file scan state. Holds the open [`VortexFile`] plus an iterator over the +/// arrays it produces. The iterator is created lazily on first +/// `try_initialize_scan` and drained one batch at a time by `scan`. +pub struct VortexFileReader { + file: VortexFile, + file_idx: usize, + /// Sync-blocking iterator over the file's array stream. Lazily initialized + /// inside `try_initialize_scan` so opening many files in parallel doesn't + /// each spin up a scan task before they're needed. + iter: Option>, + /// Current chunk being drained. 
`ArrayExporter::export` returns false when + /// it's empty, at which point we pull the next array from `iter`. + exporter: Option, + cache: ConversionCache, + /// Cached schema name list for stats lookups. + column_dtypes: Vec<(String, DType)>, +} + +impl Debug for VortexFileReader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("VortexFileReader") + .field("file_idx", &self.file_idx) + .field("row_count", &self.file.row_count()) + .finish_non_exhaustive() + } +} + +impl MultiFileFunction for VortexMultiFileFunction { + type ReaderOptions = VortexReaderOptions; + type BindData = VortexBindData; + type GlobalState = VortexGlobal; + type LocalState = VortexLocal; + type Reader = VortexFileReader; + + fn create_options(_ctx: &ClientContextRef) -> VortexResult { + Ok(VortexReaderOptions) + } + + fn initialize_bind_data(_options: Self::ReaderOptions) -> VortexResult { + Ok(VortexBindData) + } + + fn bind_reader( + _ctx: &ClientContextRef, + _bind_data: &Self::BindData, + first_file: &str, + schema: &mut SchemaBuilder, + ) -> VortexResult<()> { + // Open the first file to discover the schema. DuckDB picks the first + // file in the multi-file list for binding. 
+ let first_file = first_file.to_string(); + let file = + RUNTIME.block_on(async move { SESSION.open_options().open_path(&first_file).await })?; + let dtype = file.dtype(); + let fields = dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!("Vortex file must contain a struct array at the top level") + })?; + for (name, field_dtype) in fields.names().iter().zip(fields.fields()) { + let logical_type = LogicalType::try_from(&field_dtype)?; + schema.add_column(name.as_ref(), &logical_type); + } + Ok(()) + } + + fn init_global( + _ctx: &ClientContextRef, + _bind_data: &Self::BindData, + ) -> VortexResult { + Ok(VortexGlobal) + } + + fn init_local(_global: &Self::GlobalState) -> Self::LocalState { + VortexLocal + } + + fn create_reader( + _ctx: &ClientContextRef, + _global: &Self::GlobalState, + _bind_data: &Self::BindData, + file_path: &str, + file_idx: usize, + ) -> VortexResult { + let path = file_path.to_string(); + let file = + RUNTIME.block_on(async move { SESSION.open_options().open_path(&path).await })?; + + // Pre-compute (name, dtype) pairs once so per-column stats lookups in + // `get_statistics` don't reparse the struct dtype on each call. + let column_dtypes = file + .dtype() + .as_struct_fields_opt() + .map(|fields| { + fields + .names() + .iter() + .zip(fields.fields()) + .map(|(name, dtype)| (name.to_string(), dtype)) + .collect() + }) + .unwrap_or_default(); + + Ok(VortexFileReader { + file, + file_idx, + iter: None, + exporter: None, + cache: ConversionCache { + file_index: file_idx, + ..Default::default() + }, + column_dtypes, + }) + } +} + +impl BaseFileReader for VortexFileReader { + type GlobalState = VortexGlobal; + type LocalState = VortexLocal; + + fn try_initialize_scan( + &mut self, + _global: &Self::GlobalState, + _local: &mut Self::LocalState, + ) -> VortexResult { + // After the iterator has produced everything, return false to signal + // end-of-file to DuckDB. 
+ if self.iter.is_some() && self.exporter.is_none() { + return Ok(false); + } + if self.iter.is_none() { + let iter = self.file.scan()?.into_array_iter(&*RUNTIME)?; + self.iter = Some(Box::new(iter)); + } + Ok(true) + } + + fn scan( + &mut self, + _global: &Self::GlobalState, + _local: &mut Self::LocalState, + chunk: &mut DataChunkRef, + ) -> VortexResult<()> { + loop { + // Drain the in-flight array if we have one. + if let Some(exporter) = self.exporter.as_mut() { + let has_more = exporter.export(chunk, None, None)?; + if has_more { + return Ok(()); + } + self.exporter = None; + } + + let Some(iter) = self.iter.as_mut() else { + // Can happen if scan is called without try_initialize_scan; treat as end. + chunk.set_len(0); + return Ok(()); + }; + + let Some(next) = iter.next() else { + self.iter.take(); + chunk.set_len(0); + return Ok(()); + }; + let array = next?; + self.exporter = Some(make_exporter(array, &self.cache)?); + } + } + + fn get_statistics(&self, name: &str) -> Option { + let stats = self.file.file_stats()?; + let (stats_set, _) = stats.get_by_name(self.file.dtype(), name)?; + let dtype = &self.column_dtypes.iter().find(|(n, _)| n == name)?.1; + Some(make_column_statistics(stats_set, dtype)) + } + + fn progress_in_file(&self) -> f64 { + // We don't currently track byte-level progress per file; report 0% so + // DuckDB falls back to file-count-based progress. + 0.0 + } +} + +/// Convert the next array off the scan stream into a [`StructArray`] suitable +/// for [`ArrayExporter`]. +fn make_exporter(array: ArrayRef, cache: &ConversionCache) -> VortexResult { + let mut ctx = SESSION.create_execution_ctx(); + let struct_array: StructArray = if let Some(s) = array.as_opt::() { + s.into_owned() + } else { + array.execute::(&mut ctx)?.into_struct() + }; + ArrayExporter::try_new(&struct_array, cache, ctx) +} + +/// Build a [`ColumnStatistics`] from a Vortex `StatsSet`. 
Handles the shared +/// shape (min/max/has_null/max_string_length); same logic as the existing +/// `datasource.rs` path. +fn make_column_statistics( + stats_set: &vortex::array::stats::StatsSet, + dtype: &DType, +) -> ColumnStatistics { + use vortex::expr::stats::Precision; + use vortex::expr::stats::Stat; + use vortex::scalar::Scalar; + + use crate::convert::ToDuckDBScalar; + + let min = match stats_set.get(Stat::Min) { + Some(Precision::Exact(v)) => Scalar::try_new(dtype.clone(), Some(v)) + .ok() + .and_then(|s| s.try_to_duckdb_scalar().ok()), + _ => None, + }; + let max = match stats_set.get(Stat::Max) { + Some(Precision::Exact(v)) => Scalar::try_new(dtype.clone(), Some(v)) + .ok() + .and_then(|s| s.try_to_duckdb_scalar().ok()), + _ => None, + }; + let max_string_length = match stats_set.get(Stat::UncompressedSizeInBytes) { + Some(Precision::Exact(v)) => v + .as_primitive() + .as_u64() + .map(|u| (1u64 << 63) | u) + .unwrap_or(0), + _ => 0, + }; + let has_null = match stats_set.get(Stat::NullCount) { + Some(Precision::Exact(c)) => c.as_primitive().as_u64().map(|u| u > 0).unwrap_or(true), + _ => true, + } && dtype.is_nullable(); + + ColumnStatistics { + min, + max, + max_string_length, + has_null, + } +} From de9f3fbb09faa4fb52a57afa1abc2ad81f835172 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 6 May 2026 21:15:52 -0400 Subject: [PATCH 4/9] feat(duckdb): pushdown, pruning, cardinality, EXPLAIN, filesystem in v2 Brings the MultiFileFunction-backed scan up to feature parity with the existing TableFunction path on the test suite (189/189 pass under both when VX_DUCKDB_MULTI_FILE_FUNCTION=1). FFI surface additions on the multi-file vtable: - prepare_reader(reader, projection, filters): called once after create_reader, hands the per-file reader the columns DuckDB wants (in chunk order) plus the pushed-down TableFilterSet. - cardinality(bind_data, file_count): row-count estimate for the optimizer; falls back to DuckDB's default when not provided. 
- to_string(bind_data, map): bind-time EXPLAIN key/value output. C++ adapter overrides BaseFileReader::PrepareReader to translate column_ids -> projected column names and forward filters; overrides MultiFileReaderInterface::GetCardinality and TableFunction::to_string to delegate to the new FFI callbacks. In the Rust trait MultiFileFunction picks up `cardinality` and `to_string` defaults; BaseFileReader picks up `prepare_reader` with a default no-op so existing impls don't break. VortexMultiFileFunction wires it together: - projection: builds a `select(names, root())` Vortex projection so chunks contain exactly the columns DuckDB expects (also handles SELECT count(*) which is the explicit zero-projection case). - filter: converts each TableFilter via try_from_table_filter and AND-collects into the scan filter. - file-level pruning: VortexFile::can_prune against the combined filter; pruned files short-circuit TryInitializeScan with false. - progress: rows_scanned / file.row_count() in [0, 100]. - cardinality: rough APPROX_ROWS_PER_FILE * file_count estimate (bind_data can't be mutated from bind_reader through the current FFI; better numbers wait on that hop being added). - to_string: emits a "Function" row. Multi-file orchestration: - register through MultiFileReader::CreateFunctionSet so the function set includes both the single-VARCHAR and LIST(VARCHAR) overloads. `read_vortex_v2(['a.vortex','b.vortex'])` works. - file IO routes through resolve_filesystem(base_url, ctx), so the `vortex_filesystem` extension option ('vortex' vs 'duckdb') chooses the same backends as the v1 path. HTTP/S3 work via DuckDB's httpfs when 'duckdb' is selected. Late materialization remains intentionally off until the per-file reader supports AddVirtualColumn for file_index / file_row_number; batch parallelism within a file is also a follow-up (TryInitializeScan is still one-shot per reader). 
Signed-off-by: Nicholas Gates Signed-off-by: Nicholas Gates --- .../include/duckdb_vx/multi_file_function.h | 39 ++++ vortex-duckdb/cpp/multi_file_function.cpp | 62 ++++++- .../src/duckdb/multi_file_function.rs | 96 ++++++++++ vortex-duckdb/src/lib.rs | 21 +-- vortex-duckdb/src/multi_file.rs | 2 +- vortex-duckdb/src/multi_file_function.rs | 168 ++++++++++++++++-- 6 files changed, 357 insertions(+), 31 deletions(-) diff --git a/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h index b761a0355d1..eb17c7e3490 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h @@ -35,6 +35,17 @@ typedef struct duckdb_vx_mff_reader_ *duckdb_vx_mff_reader; // Opaque writers populated by the extension during bind. typedef struct duckdb_vx_mff_schema_writer_ *duckdb_vx_mff_schema_writer; +// A single projected column passed to prepare_reader. `name` is borrowed for +// the duration of the call. +typedef struct { + const char *name; + size_t name_len; +} duckdb_vx_mff_column; + +// Opaque writer for EXPLAIN/to_string output. Same shape as +// duckdb_vx_string_map but kept distinct for FFI hygiene. +typedef duckdb_vx_string_map duckdb_vx_mff_string_map; + /** * Append a column to the bind schema. The name is copied; the logical type is * cloned. Both arguments remain owned by the caller. @@ -116,6 +127,19 @@ typedef struct { duckdb_vx_error *error); void (*free_reader)(duckdb_vx_mff_reader reader); + /** + * Configure the reader with the columns it should produce and any filters + * pushed down by DuckDB. Called once per (reader, scan) pair before any + * try_initialize_scan / scan calls. `projection` is the ordered list of + * column names the output chunks must contain (one entry per chunk + * column). `filters` may be null when no filters were pushed down. 
+ */ + void (*prepare_reader)(duckdb_vx_mff_reader reader, + const duckdb_vx_mff_column *projection, + size_t projection_count, + duckdb_vx_table_filter_set filters, + duckdb_vx_error *error); + /** * Try to initialize a scan over `reader`. Returns true if a scan can begin, * false if the reader is exhausted. Called with the multi-file global lock @@ -149,6 +173,21 @@ typedef struct { /** Scan progress within a file in [0.0, 100.0]. */ double (*progress_in_file)(duckdb_vx_mff_reader reader); + + /** + * Estimated cardinality across `file_count` files. Returning false leaves + * cardinality unknown (DuckDB falls back to its own heuristic). + */ + bool (*cardinality)(duckdb_vx_mff_bind_data bind_data, + size_t file_count, + duckdb_vx_node_statistics *out); + + /** + * Populate the bind-time EXPLAIN map with key/value pairs (e.g. "Filters", + * "Projection"). Called whenever DuckDB renders the table function in an + * EXPLAIN output. + */ + void (*to_string)(duckdb_vx_mff_bind_data bind_data, duckdb_vx_mff_string_map map); } duckdb_vx_mff_vtab_t; /** diff --git a/vortex-duckdb/cpp/multi_file_function.cpp b/vortex-duckdb/cpp/multi_file_function.cpp index 79771fdeba3..0433e65b5d8 100644 --- a/vortex-duckdb/cpp/multi_file_function.cpp +++ b/vortex-duckdb/cpp/multi_file_function.cpp @@ -152,6 +152,29 @@ class VortexFileReader : public BaseFileReader { return "vortex"; } + void PrepareReader(ClientContext &, GlobalTableFunctionState &) override { + // Translate the multi-file column ids into projected column names, then + // hand DuckDB's TableFilterSet through to Rust as a borrow. The reader + // stores the resulting projection/filter so it can apply them when the + // scan starts. + std::vector ffi_proj; + ffi_proj.reserve(column_ids.size()); + for (idx_t i = 0; i < column_ids.size(); i++) { + auto local_id = column_ids[MultiFileLocalIndex(i)]; + // `local_id` is an index into our local `columns` schema. 
The + // multi-file reader only routes physical columns here; virtual + // columns are handled separately and never reach this list. + const auto &col = columns[local_id]; + ffi_proj.push_back({col.name.c_str(), col.name.size()}); + } + auto filter_ptr = reinterpret_cast(filters.get()); + duckdb_vx_error error_out = nullptr; + vtab.prepare_reader(handle, ffi_proj.data(), ffi_proj.size(), filter_ptr, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + } + bool TryInitializeScan(ClientContext &, GlobalTableFunctionState &gstate, LocalTableFunctionState &lstate) override { @@ -345,6 +368,21 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { return reader; } + unique_ptr GetCardinality(ClientContext &context, const MultiFileBindData &data, + idx_t file_count) override { + auto &vortex_bind = data.bind_data->Cast(); + duckdb_vx_node_statistics stats = {}; + if (!vtab.cardinality(vortex_bind.handle, file_count, &stats)) { + return MultiFileReaderInterface::GetCardinality(context, data, file_count); + } + auto out = make_uniq(); + out->has_estimated_cardinality = stats.has_estimated_cardinality; + out->estimated_cardinality = stats.estimated_cardinality; + out->has_max_cardinality = stats.has_max_cardinality; + out->max_cardinality = stats.max_cardinality; + return out; + } + unique_ptr Copy() override { return make_uniq(vtab); } @@ -402,10 +440,32 @@ extern "C" duckdb_state duckdb_vx_mff_register(duckdb_database ffi_db, const duc MultiFileFunction mff(vtab->name); mff.function_info = info; + // Bind-time EXPLAIN output. Adds keys like "Function", "Files", + // "Projection", "Filters". MultiFileFunction also installs a + // dynamic_to_string that lists files at scan time; we leave that as-is. 
+ mff.to_string = [](TableFunctionToStringInput &input) { + InsertionOrderPreservingMap result; + const auto &bind = input.bind_data->Cast(); + const auto &vortex_bind = bind.bind_data->Cast(); + auto map = reinterpret_cast(&result); + vortex_bind.vtab.to_string(vortex_bind.handle, map); + return result; + }; + + // Late materialization is not enabled yet: it requires the per-file reader + // to accept AddVirtualColumn calls for file_index / file_row_number and + // produce those columns at scan time. Until that's wired (see follow-up), + // we leave `late_materialization = false` so DuckDB doesn't request + // virtual columns through paths the reader can't satisfy. + try { + // CreateFunctionSet returns a TableFunctionSet that bundles both the + // single-VARCHAR and LIST(VARCHAR) overloads (matching read_parquet's + // shape). This is what enables `read_vortex_v2(['a.vortex','b.vortex'])`. + auto function_set = MultiFileReader::CreateFunctionSet(mff); auto &system_catalog = Catalog::GetSystemCatalog(db); auto data = CatalogTransaction::GetSystemTransaction(db); - CreateTableFunctionInfo tf_info(mff); + CreateTableFunctionInfo tf_info(function_set); tf_info.on_conflict = OnCreateConflict::ALTER_ON_CONFLICT; system_catalog.CreateFunction(data, tf_info); } catch (const std::exception &e) { diff --git a/vortex-duckdb/src/duckdb/multi_file_function.rs b/vortex-duckdb/src/duckdb/multi_file_function.rs index 52c1fd35c45..c17222dc0af 100644 --- a/vortex-duckdb/src/duckdb/multi_file_function.rs +++ b/vortex-duckdb/src/duckdb/multi_file_function.rs @@ -27,13 +27,18 @@ use vortex::error::VortexResult; use vortex::error::vortex_err; use crate::cpp; +use crate::duckdb::Cardinality; use crate::duckdb::ClientContext; use crate::duckdb::ClientContextRef; use crate::duckdb::ColumnStatistics; use crate::duckdb::DataChunk; use crate::duckdb::DataChunkRef; use crate::duckdb::DatabaseRef; +use crate::duckdb::DuckdbStringMap; +use crate::duckdb::DuckdbStringMapRef; use 
crate::duckdb::LogicalTypeRef; +use crate::duckdb::TableFilterSet; +use crate::duckdb::TableFilterSetRef; use crate::duckdb::try_or; use crate::duckdb_try; @@ -94,6 +99,16 @@ pub trait MultiFileFunction: Sized + Debug { file_path: &str, file_idx: usize, ) -> VortexResult; + + /// Estimated cardinality across `file_count` files. Default returns + /// [`Cardinality::Unknown`] (DuckDB falls back to its own heuristic). + fn cardinality(_bind_data: &Self::BindData, _file_count: usize) -> Cardinality { + Cardinality::Unknown + } + + /// Populate the bind-time EXPLAIN map with key/value pairs (typical keys: + /// `Function`, `Files`, `Projection`, `Filters`). Default no-op. + fn to_string(_bind_data: &Self::BindData, _map: &mut DuckdbStringMapRef) {} } /// Per-file reader contract. Implementations are owned by DuckDB once handed @@ -109,6 +124,22 @@ pub trait BaseFileReader { type GlobalState; type LocalState; + /// Configure projection and filter pushdown. Called once after the reader + /// is created and before any [`Self::try_initialize_scan`] call. + /// `projection` is the ordered list of column names DuckDB needs the + /// chunks to contain (one entry per chunk column). `filters` carries any + /// filters DuckDB pushed down for this scan. + /// + /// Default: no-op (reader scans all columns, no filter pushdown). + fn prepare_reader( + &mut self, + projection: &[&str], + filters: Option<&TableFilterSetRef>, + ) -> VortexResult<()> { + let _ = (projection, filters); + Ok(()) + } + /// Set up scan state for the next batch. Called under DuckDB's per-file /// lock; should not block on I/O. Return `false` once exhausted. 
fn try_initialize_scan( @@ -182,10 +213,13 @@ impl DatabaseRef { free_local: Some(free_local::), create_reader: Some(create_reader::), free_reader: Some(free_reader::), + prepare_reader: Some(prepare_reader::), try_initialize_scan: Some(try_initialize_scan::), scan: Some(scan::), get_statistics: Some(get_statistics::), progress_in_file: Some(progress_in_file::), + cardinality: Some(cardinality::), + to_string: Some(to_string::), }; duckdb_try!( @@ -322,6 +356,33 @@ unsafe extern "C-unwind" fn free_reader(reader: cpp::duckd } } +unsafe extern "C-unwind" fn prepare_reader( + reader: cpp::duckdb_vx_mff_reader, + projection: *const cpp::duckdb_vx_mff_column, + projection_count: usize, + filters: cpp::duckdb_vx_table_filter_set, + error_out: *mut cpp::duckdb_vx_error, +) { + let reader = unsafe { reader.cast::().as_mut() }.vortex_expect("reader null"); + let filter_ref = if filters.is_null() { + None + } else { + Some(unsafe { TableFilterSet::borrow(filters) }) + }; + try_or(error_out, || { + // Materialize column names into &str borrows scoped to this call. 
+ let mut names: Vec<&str> = Vec::with_capacity(projection_count); + for i in 0..projection_count { + let col = unsafe { &*projection.add(i) }; + let bytes = unsafe { slice::from_raw_parts(col.name.cast::(), col.name_len) }; + let name = std::str::from_utf8(bytes) + .map_err(|e| vortex_err!("projection column name not UTF-8: {e}"))?; + names.push(name); + } + reader.prepare_reader(&names, filter_ref) + }); +} + unsafe extern "C-unwind" fn try_initialize_scan( reader: cpp::duckdb_vx_mff_reader, global: cpp::duckdb_vx_mff_global, @@ -387,6 +448,41 @@ unsafe extern "C-unwind" fn progress_in_file( reader.progress_in_file() } +unsafe extern "C-unwind" fn cardinality( + bind_data: cpp::duckdb_vx_mff_bind_data, + file_count: usize, + out: *mut cpp::duckdb_vx_node_statistics, +) -> bool { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + let out = unsafe { &mut *out }; + match T::cardinality(bind_data, file_count) { + Cardinality::Unknown => false, + Cardinality::Estimate(c) => { + out.has_estimated_cardinality = true; + out.estimated_cardinality = c; + true + } + Cardinality::Maximum(c) => { + out.has_max_cardinality = true; + out.max_cardinality = c; + out.has_estimated_cardinality = true; + out.estimated_cardinality = c; + true + } + } +} + +unsafe extern "C-unwind" fn to_string( + bind_data: cpp::duckdb_vx_mff_bind_data, + map: cpp::duckdb_vx_string_map, +) { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + let map = unsafe { DuckdbStringMap::borrow_mut(map) }; + T::to_string(bind_data, map); +} + // --------------------------------------------------------------------------- // Helpers used by Phase 5 (concrete implementations). 
// --------------------------------------------------------------------------- diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index ccb2e406225..0e57b2b342c 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -55,16 +55,17 @@ static SESSION: LazyLock = /// to off so the existing scan remains the path of record. /// /// Known gaps in the v2 path (compared to v1) at time of writing: -/// - no projection pushdown: scans always read every column, which mis-sizes -/// output chunks when DuckDB only requested a subset. -/// - no Vortex filesystem integration: the `vortex_filesystem` extension -/// option is ignored; DuckDB's filesystem opens the files. -/// - no `read_vortex(['a.vortex','b.vortex'])` list-of-paths overload. -/// - no filter pushdown into `Vortex` scans (DuckDB still applies filters -/// after reading). -/// - no support for `union_by_name`. -/// These are tracked as follow-up work; for now `read_vortex_v2` exists for -/// benchmark comparisons of the orchestration layer rather than feature parity. +/// - No batch parallelism within a file (`TryInitializeScan` is one-shot, so +/// each Vortex file is scanned by a single worker). +/// - No `union_by_name`, hive partitioning columns, or `filename` / +/// `file_row_number` virtual columns wired through. +/// - No support for the named parameters DuckDB's `MultiFileReader` adds +/// (`union_by_name`, `hive_partitioning`, …) — `ParseOption` returns false. +/// - No `COPY ... FROM 'x.vortex'` via this path. +/// +/// These are tracked as follow-up work; for now `read_vortex_v2` exists +/// alongside `read_vortex` so orchestration paths can be benchmarked +/// side-by-side. 
fn use_multi_file_function() -> bool { matches!( std::env::var("VX_DUCKDB_MULTI_FILE_FUNCTION").as_deref(), diff --git a/vortex-duckdb/src/multi_file.rs b/vortex-duckdb/src/multi_file.rs index 3f99a854a22..2c4646aa08c 100644 --- a/vortex-duckdb/src/multi_file.rs +++ b/vortex-duckdb/src/multi_file.rs @@ -30,7 +30,7 @@ use crate::filesystem::resolve_filesystem; /// Accepts full URLs (e.g. `s3://bucket/prefix/*.vortex`, `file:///data/*.vortex`) as well as /// bare file paths. For bare paths, the path is made absolute (without requiring it to exist) /// so that relative paths such as `./data/*.vortex` or `../data/*.vortex` are resolved correctly. -fn parse_glob_url(glob_url_str: &str) -> VortexResult { +pub(crate) fn parse_glob_url(glob_url_str: &str) -> VortexResult { Url::parse(glob_url_str).or_else(|_| { let path = absolute(Path::new(glob_url_str)) .map_err(|e| vortex_err!("Failed making {glob_url_str} absolute: {e}"))?; diff --git a/vortex-duckdb/src/multi_file_function.rs b/vortex-duckdb/src/multi_file_function.rs index fdf8b2d61e1..15631b7840c 100644 --- a/vortex-duckdb/src/multi_file_function.rs +++ b/vortex-duckdb/src/multi_file_function.rs @@ -18,23 +18,52 @@ use vortex::array::arrays::Struct; use vortex::array::arrays::StructArray; use vortex::array::iter::ArrayIterator; use vortex::dtype::DType; +use vortex::dtype::FieldName; +use vortex::dtype::FieldNames; use vortex::error::VortexResult; use vortex::error::vortex_err; +use vortex::expr::Expression; +use vortex::expr::and_collect; +use vortex::expr::col; +use vortex::expr::root; +use vortex::expr::select; use vortex::file::OpenOptionsSessionExt; use vortex::file::VortexFile; use vortex::io::runtime::BlockingRuntime; use crate::RUNTIME; use crate::SESSION; +use crate::convert::try_from_table_filter; use crate::duckdb::BaseFileReader; +use crate::duckdb::Cardinality; use crate::duckdb::ClientContextRef; use crate::duckdb::ColumnStatistics; use crate::duckdb::DataChunkRef; +use 
crate::duckdb::DuckdbStringMapRef; use crate::duckdb::LogicalType; use crate::duckdb::MultiFileFunction; use crate::duckdb::SchemaBuilder; +use crate::duckdb::TableFilterSetRef; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; +use crate::filesystem::resolve_filesystem; +use crate::multi_file::parse_glob_url; + +/// Open a [`VortexFile`] using whichever filesystem the user has configured +/// via the `vortex_filesystem` extension option. DuckDB has already expanded +/// any glob and chosen this exact path; we only need the right reader for +/// the URL scheme. Routing through the filesystem also lets HTTP/S3/etc. +/// transparently use DuckDB's `httpfs` when the user picks `'duckdb'`. +fn open_vortex_file(ctx: &ClientContextRef, path: &str) -> VortexResult { + let url = parse_glob_url(path)?; + let mut base_url = url.clone(); + base_url.set_path(""); + let fs = resolve_filesystem(&base_url, ctx)?; + RUNTIME.block_on(async move { + let reader = fs.open_read(url.path()).await?; + SESSION.open_options().open(reader).await + }) +} /// Multi-file Vortex scan registered via `MultiFileFunction`. /// @@ -47,7 +76,14 @@ pub struct VortexMultiFileFunction; #[derive(Default)] pub struct VortexReaderOptions; -#[derive(Debug)] +/// Bind-time data shared across all per-file readers in a query. +/// +/// Bind data is currently empty: the C++ adapter calls +/// [`MultiFileFunction::bind_reader`] with a read-only borrow, so we can't +/// stash per-bind state (e.g. first-file row count, column names) here yet. +/// Cardinality and EXPLAIN consumers fall back to defaults — see the inline +/// comment in `bind_reader`. +#[derive(Debug, Default)] pub struct VortexBindData; #[derive(Debug)] @@ -70,8 +106,26 @@ pub struct VortexFileReader { /// it's empty, at which point we pull the next array from `iter`. exporter: Option, cache: ConversionCache, - /// Cached schema name list for stats lookups. + /// Cached (name, dtype) pairs from the file's struct schema. 
Used by stats + /// lookups in `get_statistics` to avoid re-walking the dtype each call. column_dtypes: Vec<(String, DType)>, + /// Projection set by [`Self::prepare_reader`]. `None` means prepare was + /// never called (defensive — scan all columns). `Some(empty)` is the + /// explicit zero-projection case (e.g. `SELECT count(*)`); the scan + /// produces struct arrays with no fields, and `ArrayExporter` short- + /// circuits on the empty fields list. + projection: Option>, + /// Filter expression set by [`Self::prepare_reader`]. None when no filters + /// were pushed down or when conversion failed. + filter: Option, + /// Set when a filter has been pushed down and file-level statistics prove + /// the file can be skipped. Causes [`Self::try_initialize_scan`] to return + /// false without opening a scan iterator. + file_pruned: bool, + /// Total rows in the file, cached for [`Self::progress_in_file`]. + total_rows: u64, + /// Rows produced so far. Bumped after each chunk in [`Self::scan`]. + rows_scanned: u64, } impl Debug for VortexFileReader { @@ -99,16 +153,14 @@ impl MultiFileFunction for VortexMultiFileFunction { } fn bind_reader( - _ctx: &ClientContextRef, + ctx: &ClientContextRef, _bind_data: &Self::BindData, first_file: &str, schema: &mut SchemaBuilder, ) -> VortexResult<()> { - // Open the first file to discover the schema. DuckDB picks the first - // file in the multi-file list for binding. - let first_file = first_file.to_string(); - let file = - RUNTIME.block_on(async move { SESSION.open_options().open_path(&first_file).await })?; + // Open the first file (using whichever filesystem the user picked via + // the `vortex_filesystem` extension option) to discover the schema. 
+ let file = open_vortex_file(ctx, first_file)?; let dtype = file.dtype(); let fields = dtype.as_struct_fields_opt().ok_or_else(|| { vortex_err!("Vortex file must contain a struct array at the top level") @@ -120,6 +172,23 @@ impl MultiFileFunction for VortexMultiFileFunction { Ok(()) } + fn cardinality(_bind_data: &Self::BindData, file_count: usize) -> Cardinality { + // We don't yet plumb per-file row counts from bind_reader through to + // bind_data (see comment above), so estimate a moderate per-file size. + // The estimate is only used by the optimizer; correctness doesn't + // depend on it. + const APPROX_ROWS_PER_FILE: u64 = 1_000_000; + if file_count == 0 { + Cardinality::Unknown + } else { + Cardinality::Estimate(APPROX_ROWS_PER_FILE.saturating_mul(file_count as u64)) + } + } + + fn to_string(_bind_data: &Self::BindData, map: &mut DuckdbStringMapRef) { + map.push("Function", "Vortex Multi-File Scan"); + } + fn init_global( _ctx: &ClientContextRef, _bind_data: &Self::BindData, @@ -132,15 +201,13 @@ impl MultiFileFunction for VortexMultiFileFunction { } fn create_reader( - _ctx: &ClientContextRef, + ctx: &ClientContextRef, _global: &Self::GlobalState, _bind_data: &Self::BindData, file_path: &str, file_idx: usize, ) -> VortexResult { - let path = file_path.to_string(); - let file = - RUNTIME.block_on(async move { SESSION.open_options().open_path(&path).await })?; + let file = open_vortex_file(ctx, file_path)?; // Pre-compute (name, dtype) pairs once so per-column stats lookups in // `get_statistics` don't reparse the struct dtype on each call. 
@@ -157,6 +224,7 @@ impl MultiFileFunction for VortexMultiFileFunction { }) .unwrap_or_default(); + let total_rows = file.row_count(); Ok(VortexFileReader { file, file_idx, @@ -167,6 +235,11 @@ impl MultiFileFunction for VortexMultiFileFunction { ..Default::default() }, column_dtypes, + projection: None, + filter: None, + file_pruned: false, + total_rows, + rows_scanned: 0, }) } } @@ -175,18 +248,73 @@ impl BaseFileReader for VortexFileReader { type GlobalState = VortexGlobal; type LocalState = VortexLocal; + fn prepare_reader( + &mut self, + projection: &[&str], + filters: Option<&TableFilterSetRef>, + ) -> VortexResult<()> { + // Capture the projection in chunk order. `Some(empty)` is the explicit + // zero-projection case (e.g. SELECT count(*)) and is handled when the + // scan starts. + let proj: Vec = projection.iter().map(|n| FieldName::from(*n)).collect(); + + // Build a Vortex filter expression from DuckDB's table filters. Filter + // indices are positions into the *projected* column list (matching + // the v1 datasource path). + if let Some(filters) = filters { + let dtype = self.file.dtype(); + let mut pieces: Vec = Vec::new(); + for (idx, filter) in filters.into_iter() { + let idx = usize::try_from(idx) + .map_err(|_| vortex_err!("filter column index does not fit usize"))?; + let Some(name) = proj.get(idx) else { + continue; + }; + if let Some(expr) = try_from_table_filter(filter, &col(name.as_ref()), dtype)? { + pieces.push(expr); + } + } + self.filter = and_collect(pieces); + } + self.projection = Some(proj); + + // File-level pruning: if the filter combined with the file's stored + // statistics proves no row can match, skip this file entirely. + if let Some(filter) = &self.filter + && self.file.can_prune(filter)? 
+ { + self.file_pruned = true; + } + Ok(()) + } + fn try_initialize_scan( &mut self, _global: &Self::GlobalState, _local: &mut Self::LocalState, ) -> VortexResult { - // After the iterator has produced everything, return false to signal - // end-of-file to DuckDB. + if self.file_pruned { + return Ok(false); + } + // Single-batch model: hand DuckDB one batch covering the whole file, + // then signal exhaustion. (Per-split parallelism is a follow-up.) if self.iter.is_some() && self.exporter.is_none() { return Ok(false); } if self.iter.is_none() { - let iter = self.file.scan()?.into_array_iter(&*RUNTIME)?; + let mut builder = self.file.scan()?; + // Apply projection. `None` (prepare not called) defaults to all + // columns; `Some` (including the empty case for SELECT count(*)) + // applies an explicit `select` so the resulting struct arrays + // contain exactly the columns DuckDB expects. + if let Some(names) = &self.projection { + let names = FieldNames::from_iter(names.iter().cloned()); + builder = builder.with_projection(select(names, root())); + } + if let Some(filter) = self.filter.clone() { + builder = builder.with_filter(filter); + } + let iter = builder.into_array_iter(&*RUNTIME)?; self.iter = Some(Box::new(iter)); } Ok(true) @@ -203,13 +331,13 @@ impl BaseFileReader for VortexFileReader { if let Some(exporter) = self.exporter.as_mut() { let has_more = exporter.export(chunk, None, None)?; if has_more { + self.rows_scanned = self.rows_scanned.saturating_add(chunk.len()); return Ok(()); } self.exporter = None; } let Some(iter) = self.iter.as_mut() else { - // Can happen if scan is called without try_initialize_scan; treat as end. chunk.set_len(0); return Ok(()); }; @@ -232,9 +360,11 @@ impl BaseFileReader for VortexFileReader { } fn progress_in_file(&self) -> f64 { - // We don't currently track byte-level progress per file; report 0% so - // DuckDB falls back to file-count-based progress. 
- 0.0 + if self.total_rows == 0 { + return 100.0; + } + let pct = (self.rows_scanned as f64 / self.total_rows as f64) * 100.0; + pct.clamp(0.0, 100.0) } } From f39f589b0a03d3f6131d184c1686fb031f1f2b10 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Thu, 7 May 2026 12:33:29 -0400 Subject: [PATCH 5/9] More Signed-off-by: Nicholas Gates --- benchmarks/duckdb-bench/src/lib.rs | 3 +- vortex-datafusion/src/persistent/opener.rs | 2 +- vortex-duckdb/build.rs | 3 +- vortex-duckdb/cpp/include/duckdb_vx.h | 1 + .../include/duckdb_vx/multi_file_function.h | 83 +- vortex-duckdb/cpp/multi_file_function.cpp | 203 ++- vortex-duckdb/src/duckdb/mod.rs | 1 + .../src/duckdb/multi_file_function.rs | 249 +++- .../src/e2e_test/vortex_scan_test.rs | 243 +++- vortex-duckdb/src/exporter/mod.rs | 66 + vortex-duckdb/src/lib.rs | 10 +- vortex-duckdb/src/multi_file_function.rs | 1183 +++++++++++++++-- vortex-file/src/file.rs | 2 +- vortex-layout/public-api.lock | 10 +- vortex-layout/src/layouts/row_idx/mod.rs | 230 ++++ vortex-layout/src/scan/scan_builder.rs | 138 +- vortex-layout/src/scan/split_by.rs | 215 ++- vortex-python/src/dataset.rs | 12 +- 18 files changed, 2398 insertions(+), 256 deletions(-) diff --git a/benchmarks/duckdb-bench/src/lib.rs b/benchmarks/duckdb-bench/src/lib.rs index fed9f82b004..308553852c0 100644 --- a/benchmarks/duckdb-bench/src/lib.rs +++ b/benchmarks/duckdb-bench/src/lib.rs @@ -90,7 +90,7 @@ impl DuckClient { let connection = db.connect()?; vortex_duckdb::initialize(&db)?; - // Enable Parquet metadata cache for all benchmark runs. + // Enable metadata caches for all benchmark runs. // // `parquet_metadata_cache` is an extension-specific option that's // only available after the Parquet extension is loaded. The Parquet @@ -100,6 +100,7 @@ impl DuckClient { // "Invalid Input Error: The following options were not recognized: // parquet_metadata_cache" when running DuckDB in debug mode. 
connection.query("SET parquet_metadata_cache = true")?; + connection.query("SET vortex_metadata_cache = true")?; Ok((db, connection)) } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index acff2a22806..4f064fa7d87 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -468,7 +468,7 @@ fn natural_split_ranges_for_file( fn compute_natural_split_ranges(layout_reader: &dyn LayoutReader) -> DFResult]>> { let row_count = layout_reader.row_count(); let row_range = 0..row_count; - let split_points: Vec<_> = SplitBy::Layout + let split_points: Vec<_> = SplitBy::layout() .splits(layout_reader, &row_range, &[FieldMask::All]) .map_err(|e| exec_datafusion_err!("Failed to compute Vortex natural splits: {e}"))? .into_iter() diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index fa6461c657f..e1ef334334f 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -20,7 +20,7 @@ const DUCKDB_SOURCE_COMMIT_URL: &str = "https://github.com/duckdb/duckdb/archive const BUILD_ARTIFACTS: [&str; 3] = ["libduckdb.dylib", "libduckdb.so", "libduckdb_static.a"]; -const SOURCE_FILES: [&str; 18] = [ +const SOURCE_FILES: [&str; 19] = [ "cpp/client_context.cpp", "cpp/config.cpp", "cpp/copy_function.cpp", @@ -31,6 +31,7 @@ const SOURCE_FILES: [&str; 18] = [ "cpp/file_system.cpp", "cpp/logical_type.cpp", "cpp/multi_file_function.cpp", + "cpp/object_cache.cpp", "cpp/replacement_scan.cpp", "cpp/reusable_dict.cpp", "cpp/scalar_function.cpp", diff --git a/vortex-duckdb/cpp/include/duckdb_vx.h b/vortex-duckdb/cpp/include/duckdb_vx.h index 862a334a4e5..afe56803b2f 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx.h +++ b/vortex-duckdb/cpp/include/duckdb_vx.h @@ -13,6 +13,7 @@ #include "duckdb_vx/file_system.h" #include "duckdb_vx/logical_type.h" #include "duckdb_vx/multi_file_function.h" +#include "duckdb_vx/object_cache.h" #include "duckdb_vx/reusable_dict.h" #include 
"duckdb_vx/replacement_scan.h" #include "duckdb_vx/scalar_function.h" diff --git a/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h index eb17c7e3490..552a9815d14 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h +++ b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h @@ -9,6 +9,19 @@ * hive partitioning, virtual columns, etc. are all driven by DuckDB itself; the * extension only supplies a per-format reader. * + * Lifecycle, mirroring DuckDB's Parquet reader: + * 1. create_options / initialize_bind_data / bind_reader collect bind-time options, + * metadata, and schema. + * 2. init_global / init_local create per-query and per-worker state. + * 3. create_reader opens one file. DuckDB has dropped the global multi-file scheduling + * mutex before this call and holds a per-file mutex for this reader. + * 4. prepare_reader maps the projection and filters onto the opened reader. + * 5. try_initialize_scan is called with DuckDB's global multi-file scheduling mutex held. + * It must only claim one cheap unit of scan work into local state. + * 6. prepare_scan runs outside that scheduling mutex and initializes local scan state + * for the work claimed by try_initialize_scan. + * 7. scan drains the local state prepared by prepare_scan into DuckDB chunks. + * * Owned-pointer convention: every non-null pointer the extension returns is owned by * DuckDB and must be released by the corresponding free_* callback. Borrowed pointers * (passed in to callbacks) must not be freed. @@ -35,13 +48,23 @@ typedef struct duckdb_vx_mff_reader_ *duckdb_vx_mff_reader; // Opaque writers populated by the extension during bind. typedef struct duckdb_vx_mff_schema_writer_ *duckdb_vx_mff_schema_writer; -// A single projected column passed to prepare_reader. `name` is borrowed for -// the duration of the call. +// A single scan column passed to prepare_reader. 
`name` is borrowed for the +// duration of the call. `is_projected` distinguishes final output columns from +// filter-only scan columns. typedef struct { const char *name; size_t name_len; + uint64_t column_id; + bool is_virtual; + bool is_projected; } duckdb_vx_mff_column; +// Exact per-file partition statistics for DuckDB's aggregate/statistics +// optimizer. Currently only row counts are exposed. +typedef struct { + uint64_t row_count; +} duckdb_vx_mff_partition_stats; + // Opaque writer for EXPLAIN/to_string output. Same shape as // duckdb_vx_string_map but kept distinct for FFI hygiene. typedef duckdb_vx_string_map duckdb_vx_mff_string_map; @@ -61,6 +84,20 @@ typedef struct { /** Function name, e.g. "read_vortex". Must outlive the registered function. */ const char *name; + /** Whether DuckDB may pass pushed table filters to prepare_reader. */ + bool filter_pushdown; + + /** Whether DuckDB may omit filter-only columns from final table-scan output. */ + bool filter_prune; + + /** + * Try to push a complex filter expression into bind data. Returns true when + * the filter is handled exactly and DuckDB may remove the standalone filter. + */ + bool (*pushdown_complex_filter)(duckdb_vx_mff_bind_data bind_data, + duckdb_vx_expr expr, + duckdb_vx_error *error_out); + // --------------------------------------------------------------------- // Options lifecycle // --------------------------------------------------------------------- @@ -81,6 +118,9 @@ typedef struct { */ duckdb_vx_mff_bind_data (*initialize_bind_data)(duckdb_vx_mff_options options, duckdb_vx_error *error); + /** Clone bind data. Used when DuckDB rewrites plans, e.g. late materialization. */ + duckdb_vx_mff_bind_data (*clone_bind_data)(duckdb_vx_mff_bind_data bind_data, + duckdb_vx_error *error); /** Release bind data. Must accept null. */ void (*free_bind_data)(duckdb_vx_mff_bind_data bind_data); @@ -116,7 +156,8 @@ typedef struct { /** * Open a per-file reader. 
Called once per file when DuckDB first opens - * that file for scanning. + * that file for scanning. This may open file metadata, but should not do + * per-scan work because projection/filter state has not been prepared yet. */ duckdb_vx_mff_reader (*create_reader)(duckdb_client_context ctx, duckdb_vx_mff_global global, @@ -131,8 +172,10 @@ typedef struct { * Configure the reader with the columns it should produce and any filters * pushed down by DuckDB. Called once per (reader, scan) pair before any * try_initialize_scan / scan calls. `projection` is the ordered list of - * column names the output chunks must contain (one entry per chunk - * column). `filters` may be null when no filters were pushed down. + * intermediate scan columns DuckDB needs the chunks to contain. Columns + * marked `is_projected=false` are only needed for pushed filters and are + * not referenced by DuckDB's final output expressions. `filters` may be + * null when no filters were pushed down. */ void (*prepare_reader)(duckdb_vx_mff_reader reader, const duckdb_vx_mff_column *projection, @@ -142,8 +185,9 @@ typedef struct { /** * Try to initialize a scan over `reader`. Returns true if a scan can begin, - * false if the reader is exhausted. Called with the multi-file global lock - * held; must not block on I/O. + * false if the reader is exhausted. Called with DuckDB's multi-file global + * scheduling mutex held; must not block on I/O, run async work, or build + * expensive scan pipelines. Store only the claimed work descriptor in `local`. */ bool (*try_initialize_scan)(duckdb_vx_mff_reader reader, duckdb_vx_mff_global global, @@ -151,7 +195,18 @@ typedef struct { duckdb_vx_error *error); /** - * Produce the next batch of data into `chunk_out`. Returns: + * Prepare local scan state for the work claimed by try_initialize_scan. + * Called outside DuckDB's multi-file global scheduling mutex, mirroring + * DuckDB's BaseFileReader::PrepareScan hook. 
+ */ + void (*prepare_scan)(duckdb_vx_mff_reader reader, + duckdb_vx_mff_global global, + duckdb_vx_mff_local local, + duckdb_vx_error *error); + + /** + * Produce the next batch of data into `chunk_out`. Called outside DuckDB's + * multi-file global scheduling mutex after prepare_scan. Returns: * - true with chunk size > 0 : more data may follow. * - true with chunk size == 0 : reader is exhausted; DuckDB will move on. * - false : an error occurred (see error_out). @@ -182,6 +237,18 @@ typedef struct { size_t file_count, duckdb_vx_node_statistics *out); + /** + * Get exact row count statistics for one file. Returning false means the + * stats are not currently available; DuckDB will skip statistics-based + * aggregate rewrites unless every file returns exact stats. + */ + bool (*partition_stats)(duckdb_client_context ctx, + duckdb_vx_mff_bind_data bind_data, + const char *file_path, + size_t path_len, + duckdb_vx_mff_partition_stats *out, + duckdb_vx_error *error); + /** * Populate the bind-time EXPLAIN map with key/value pairs (e.g. "Filters", * "Projection"). 
Called whenever DuckDB renders the table function in an diff --git a/vortex-duckdb/cpp/multi_file_function.cpp b/vortex-duckdb/cpp/multi_file_function.cpp index 0433e65b5d8..27ab823bf5f 100644 --- a/vortex-duckdb/cpp/multi_file_function.cpp +++ b/vortex-duckdb/cpp/multi_file_function.cpp @@ -21,6 +21,8 @@ #include "duckdb_vx/multi_file_function.h" #include +#include +#include #include DUCKDB_INCLUDES_BEGIN @@ -31,12 +33,15 @@ DUCKDB_INCLUDES_BEGIN #include "duckdb/common/multi_file/multi_file_function.hpp" #include "duckdb/common/multi_file/multi_file_reader.hpp" #include "duckdb/common/multi_file/multi_file_states.hpp" +#include "duckdb/function/partition_stats.hpp" #include "duckdb/main/capi/capi_internal.hpp" #include "duckdb/parser/parsed_data/create_table_function_info.hpp" DUCKDB_INCLUDES_END using namespace duckdb; using vortex::IntoErrString; +constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX; +constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER; namespace { @@ -93,6 +98,15 @@ struct VortexMultiFileBindData : public TableFunctionData { return false; } + unique_ptr Copy() const override { + duckdb_vx_error error_out = nullptr; + auto cloned = vtab.clone_bind_data(handle, &error_out); + if (error_out) { + throw InternalException(IntoErrString(error_out)); + } + return make_uniq(vtab, cloned); + } + const duckdb_vx_mff_vtab_t &vtab; duckdb_vx_mff_bind_data handle; }; @@ -103,8 +117,10 @@ struct VortexMultiFileBindData : public TableFunctionData { */ class VortexInterfaceGlobalState : public GlobalTableFunctionState { public: - VortexInterfaceGlobalState(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_global handle) - : vtab(vtab), handle(handle) { + VortexInterfaceGlobalState(const duckdb_vx_mff_vtab_t &vtab, + duckdb_vx_mff_global handle, + const MultiFileGlobalState &multi_file_state) + : vtab(vtab), handle(handle), multi_file_state(&multi_file_state) { } 
~VortexInterfaceGlobalState() override { if (handle) { @@ -114,6 +130,7 @@ class VortexInterfaceGlobalState : public GlobalTableFunctionState { const duckdb_vx_mff_vtab_t &vtab; duckdb_vx_mff_global handle; + const MultiFileGlobalState *multi_file_state; }; class VortexInterfaceLocalState : public LocalTableFunctionState { @@ -152,20 +169,45 @@ class VortexFileReader : public BaseFileReader { return "vortex"; } - void PrepareReader(ClientContext &, GlobalTableFunctionState &) override { + void AddVirtualColumn(column_t virtual_column_id) override { + if (columns.empty()) { + throw InternalException("Vortex reader received virtual column before column registration"); + } + virtual_column_ids[columns.size() - 1] = virtual_column_id; + } + + void PrepareReader(ClientContext &, GlobalTableFunctionState &gstate) override { // Translate the multi-file column ids into projected column names, then // hand DuckDB's TableFilterSet through to Rust as a borrow. The reader // stores the resulting projection/filter so it can apply them when the // scan starts. + auto &g = gstate.Cast(); + std::unordered_set projected_column_ids; + if (g.multi_file_state && !g.multi_file_state->projection_ids.empty()) { + projected_column_ids.reserve(g.multi_file_state->projection_ids.size()); + for (const auto &projection_id : g.multi_file_state->projection_ids) { + if (projection_id >= g.multi_file_state->column_indexes.size()) { + throw InternalException("Vortex projection id out of range"); + } + projected_column_ids.insert(g.multi_file_state->column_indexes[projection_id].GetPrimaryIndex()); + } + } + std::vector ffi_proj; ffi_proj.reserve(column_ids.size()); for (idx_t i = 0; i < column_ids.size(); i++) { auto local_id = column_ids[MultiFileLocalIndex(i)]; - // `local_id` is an index into our local `columns` schema. The - // multi-file reader only routes physical columns here; virtual - // columns are handled separately and never reach this list. 
+ // `local_id` is an index into our local `columns` schema. Physical + // columns use their local id directly; non-constant virtual columns + // are appended to `columns` by DuckDB's mapper and announced via + // AddVirtualColumn. const auto &col = columns[local_id]; - ffi_proj.push_back({col.name.c_str(), col.name.size()}); + auto virtual_entry = virtual_column_ids.find(local_id.GetId()); + const bool is_virtual = virtual_entry != virtual_column_ids.end(); + const auto column_id = is_virtual ? virtual_entry->second : local_id.GetId(); + const bool is_projected = + projected_column_ids.empty() || projected_column_ids.find(column_id) != projected_column_ids.end(); + ffi_proj.push_back({col.name.c_str(), col.name.size(), column_id, is_virtual, is_projected}); } auto filter_ptr = reinterpret_cast(filters.get()); duckdb_vx_error error_out = nullptr; @@ -188,6 +230,18 @@ class VortexFileReader : public BaseFileReader { return ok; } + void PrepareScan(ClientContext &, + GlobalTableFunctionState &gstate, + LocalTableFunctionState &lstate) override { + auto &g = gstate.Cast(); + auto &l = lstate.Cast(); + duckdb_vx_error error_out = nullptr; + vtab.prepare_scan(handle, g.handle, l.handle, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + } + AsyncResult Scan(ClientContext &, GlobalTableFunctionState &gstate, LocalTableFunctionState &lstate, @@ -255,6 +309,7 @@ class VortexFileReader : public BaseFileReader { private: const duckdb_vx_mff_vtab_t &vtab; duckdb_vx_mff_reader handle; + std::unordered_map virtual_column_ids; }; /** @@ -264,11 +319,15 @@ class VortexFileReader : public BaseFileReader { */ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { public: - explicit VortexMultiFileReaderInterface(const duckdb_vx_mff_vtab_t &vtab_p) : vtab(vtab_p) { - } + VortexMultiFileReaderInterface() = default; unique_ptr InitializeOptions(ClientContext &context, - optional_ptr) override { + optional_ptr info) override { + if 
(!info) { + throw BinderException("Vortex multi-file function requires TableFunctionInfo"); + } + vtab = &info->Cast().vtab; + auto &vtab = Vtab(); duckdb_vx_error error_out = nullptr; auto ctx = reinterpret_cast(&context); auto handle = vtab.create_options(ctx, &error_out); @@ -290,6 +349,7 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { unique_ptr InitializeBindData(MultiFileBindData &, unique_ptr options) override { + auto &vtab = Vtab(); auto &vortex_options = options->Cast(); // Take ownership of the options handle and pass it to the FFI. duckdb_vx_error error_out = nullptr; @@ -302,6 +362,7 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { void BindReader(ClientContext &context, vector &return_types, vector &names, MultiFileBindData &bind_data) override { + auto &vtab = Vtab(); auto first_file = bind_data.file_list->GetFirstFile(); auto &vortex_bind = bind_data.bind_data->Cast(); @@ -323,7 +384,8 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { unique_ptr InitializeGlobalState(ClientContext &context, MultiFileBindData &bind_data, - MultiFileGlobalState &) override { + MultiFileGlobalState &multi_file_state) override { + auto &vtab = Vtab(); auto &vortex_bind = bind_data.bind_data->Cast(); duckdb_vx_error error_out = nullptr; auto ctx = reinterpret_cast(&context); @@ -331,16 +393,22 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { if (error_out) { throw BinderException(IntoErrString(error_out)); } - return make_uniq(vtab, handle); + return make_uniq(vtab, handle, multi_file_state); } unique_ptr InitializeLocalState(ExecutionContext &, GlobalTableFunctionState &gstate) override { + auto &vtab = Vtab(); auto &g = gstate.Cast(); auto handle = vtab.init_local(g.handle); return make_uniq(vtab, handle); } + void GetVirtualColumns(ClientContext &, MultiFileBindData &, virtual_column_map_t &result) override { + 
result.insert(make_pair(COLUMN_IDENTIFIER_FILE_ROW_NUMBER, + TableColumn("file_row_number", LogicalType::BIGINT))); + } + shared_ptr CreateReader(ClientContext &, GlobalTableFunctionState &, BaseUnionData &, const MultiFileBindData &) override { // UNION BY NAME path - not supported yet. @@ -350,6 +418,7 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { shared_ptr CreateReader(ClientContext &context, GlobalTableFunctionState &gstate, const OpenFileInfo &file, idx_t file_idx, const MultiFileBindData &bind_data) override { + auto &vtab = Vtab(); auto &vortex_bind = bind_data.bind_data->Cast(); auto &vortex_g = gstate.Cast(); duckdb_vx_error error_out = nullptr; @@ -370,6 +439,7 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { unique_ptr GetCardinality(ClientContext &context, const MultiFileBindData &data, idx_t file_count) override { + auto &vtab = Vtab(); auto &vortex_bind = data.bind_data->Cast(); duckdb_vx_node_statistics stats = {}; if (!vtab.cardinality(vortex_bind.handle, file_count, &stats)) { @@ -384,11 +454,20 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { } unique_ptr Copy() override { - return make_uniq(vtab); + auto copy = make_uniq(); + copy->vtab = vtab; + return copy; } private: - const duckdb_vx_mff_vtab_t &vtab; + const duckdb_vx_mff_vtab_t &Vtab() const { + if (!vtab) { + throw InternalException("VortexMultiFileReaderInterface used before InitializeOptions"); + } + return *vtab; + } + + const duckdb_vx_mff_vtab_t *vtab = nullptr; }; /** @@ -396,17 +475,77 @@ class VortexMultiFileReaderInterface : public MultiFileReaderInterface { * CreateInterface can construct a VortexMultiFileReaderInterface bound to it. 
*/ struct VortexMultiFileFunctionOp { - static const duckdb_vx_mff_vtab_t *current_vtab; - static unique_ptr CreateInterface(ClientContext &) { - if (!current_vtab) { - throw InternalException("VortexMultiFileFunctionOp::CreateInterface called without a registered vtab"); - } - return make_uniq(*current_vtab); + return make_uniq(); } }; -const duckdb_vx_mff_vtab_t *VortexMultiFileFunctionOp::current_vtab = nullptr; +void mff_pushdown_complex_filter(ClientContext &context, + LogicalGet &get, + FunctionData *bind_data_p, + vector> &filters) { + auto &data = bind_data_p->Cast(); + + MultiFilePushdownInfo info(get); + auto new_list = + data.multi_file_reader->ComplexFilterPushdown(context, *data.file_list, data.file_options, info, filters); + + if (new_list) { + data.file_list = std::move(new_list); + MultiFileReader::PruneReaders(data, *data.file_list); + } + + auto &vortex_bind = data.bind_data->Cast(); + duckdb_vx_error error_out = nullptr; + for (auto iter = filters.begin(); iter != filters.end();) { + duckdb_vx_expr ffi_expr = reinterpret_cast(iter->get()); + const bool pushed = vortex_bind.vtab.pushdown_complex_filter(vortex_bind.handle, ffi_expr, &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + iter = pushed ? 
filters.erase(iter) : std::next(iter); + } +} + +vector mff_get_partition_stats(ClientContext &context, GetPartitionStatsInput &input) { + vector result; + if (!input.bind_data) { + return result; + } + + auto &data = input.bind_data->Cast(); + if (!data.bind_data || !data.file_list) { + return result; + } + + auto &vortex_bind = data.bind_data->Cast(); + if (!vortex_bind.vtab.partition_stats) { + return result; + } + + auto ctx = reinterpret_cast(&context); + idx_t row_start = 0; + for (const auto &file : data.file_list->Files()) { + duckdb_vx_mff_partition_stats ffi_stats = {}; + duckdb_vx_error error_out = nullptr; + const bool found = vortex_bind.vtab.partition_stats(ctx, vortex_bind.handle, file.path.c_str(), + file.path.size(), &ffi_stats, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + if (!found) { + return {}; + } + + PartitionStatistics stats; + stats.row_start = optional_idx(row_start); + stats.count = static_cast(ffi_stats.row_count); + stats.count_type = CountType::COUNT_EXACT; + result.push_back(std::move(stats)); + row_start += static_cast(ffi_stats.row_count); + } + return result; +} } // namespace @@ -430,15 +569,21 @@ extern "C" duckdb_state duckdb_vx_mff_register(duckdb_database ffi_db, const duc const auto &wrapper = *reinterpret_cast(ffi_db); auto &db = *wrapper.database->instance; - // Capture the vtab pointer so MultiFileFunction::MultiFileBind can find it - // when DuckDB re-enters CreateInterface during bind. The catalog will also - // hold a copy via TableFunctionInfo so this stays alive for the lifetime of - // the registered function. + // The catalog-owned TableFunctionInfo carries the vtab copy that each bind + // resolves through InitializeOptions. Keeping it there avoids a shared + // global pointer across databases/tests. 
auto info = make_shared_ptr(*vtab); - VortexMultiFileFunctionOp::current_vtab = &info->vtab; MultiFileFunction mff(vtab->name); mff.function_info = info; + mff.filter_pushdown = vtab->filter_pushdown; + mff.filter_prune = vtab->filter_prune; + mff.pushdown_complex_filter = mff_pushdown_complex_filter; + mff.get_partition_stats = mff_get_partition_stats; + mff.late_materialization = true; + mff.get_row_id_columns = [](ClientContext &, optional_ptr) -> vector { + return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER}; + }; // Bind-time EXPLAIN output. Adds keys like "Function", "Files", // "Projection", "Filters". MultiFileFunction also installs a @@ -452,12 +597,6 @@ extern "C" duckdb_state duckdb_vx_mff_register(duckdb_database ffi_db, const duc return result; }; - // Late materialization is not enabled yet: it requires the per-file reader - // to accept AddVirtualColumn calls for file_index / file_row_number and - // produce those columns at scan time. Until that's wired (see follow-up), - // we leave `late_materialization = false` so DuckDB doesn't request - // virtual columns through paths the reader can't satisfy. - try { // CreateFunctionSet returns a TableFunctionSet that bundles both the // single-VARCHAR and LIST(VARCHAR) overloads (matching read_parquet's diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index eb595633ec1..5fb36350136 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -14,6 +14,7 @@ mod file_system; mod logical_type; mod macro_; mod multi_file_function; +mod object_cache; mod query_result; mod reusable_dict; mod scalar_function; diff --git a/vortex-duckdb/src/duckdb/multi_file_function.rs b/vortex-duckdb/src/duckdb/multi_file_function.rs index c17222dc0af..483cd04c1c2 100644 --- a/vortex-duckdb/src/duckdb/multi_file_function.rs +++ b/vortex-duckdb/src/duckdb/multi_file_function.rs @@ -13,8 +13,35 @@ //! - [`MultiFileFunction`] ↔ `MultiFileReaderInterface` //! 
- [`BaseFileReader`] ↔ `BaseFileReader` //! -//! Both are non-object-safe with associated types so each implementation gets -//! statically-monomorphised callbacks (no per-call dyn dispatch). +//! The wrapper is generic over one [`MultiFileFunction`] implementation so each +//! registered function gets statically-monomorphised callbacks (no per-call dyn +//! dispatch). +//! +//! Callback lifecycle, matching DuckDB's `MultiFileFunction` / Parquet reader +//! model: +//! +//! 1. Bind: [`MultiFileFunction::create_options`], +//! [`MultiFileFunction::initialize_bind_data`], and +//! [`MultiFileFunction::bind_reader`] run once to collect options, bind-time +//! state, and schema. +//! 2. Query init: [`MultiFileFunction::init_global`] and +//! [`MultiFileFunction::init_local`] create per-query and per-worker state. +//! 3. File open: [`MultiFileFunction::create_reader`] is called when DuckDB +//! decides to open a file. DuckDB does not hold the global multi-file +//! scheduling mutex while opening; it switches to a per-file mutex so other +//! workers can wait for that specific reader. +//! 4. Reader preparation: [`BaseFileReader::prepare_reader`] maps projection +//! and filters onto the opened reader. It happens once per reader before any +//! scan assignment for that reader. +//! 5. Scan assignment: [`BaseFileReader::try_initialize_scan`] is called while +//! DuckDB holds its global multi-file scheduling mutex. This must be a cheap +//! claim of one independent unit of work, e.g. a row group or row range. It +//! must not perform I/O, block on async work, or construct expensive scan +//! pipelines. +//! 6. Scan execution: DuckDB releases the scheduling mutex before calling its +//! `PrepareScan` hook, exposed here as [`BaseFileReader::prepare_scan`]. +//! Reader implementations should build per-assignment local scan state +//! there, then [`BaseFileReader::scan`] drains that local state into chunks. 
use std::ffi::CStr; use std::ffi::CString; @@ -36,6 +63,7 @@ use crate::duckdb::DataChunkRef; use crate::duckdb::DatabaseRef; use crate::duckdb::DuckdbStringMap; use crate::duckdb::DuckdbStringMapRef; +use crate::duckdb::ExpressionRef; use crate::duckdb::LogicalTypeRef; use crate::duckdb::TableFilterSet; use crate::duckdb::TableFilterSetRef; @@ -53,7 +81,7 @@ pub trait MultiFileFunction: Sized + Debug { /// Bind-time data, populated from options and (the schema of) the first /// file. Must be `Send` because DuckDB may move it across threads. - type BindData: Send; + type BindData: Clone + Send; /// Global state for one query invocation. Shared across worker threads. type GlobalState: Send + Sync; @@ -62,21 +90,47 @@ pub trait MultiFileFunction: Sized + Debug { type LocalState; /// Per-file reader. Created when DuckDB first opens a file, dropped when - /// scanning of that file finishes. - type Reader: BaseFileReader; + /// scanning of that file finishes. DuckDB stores it in a shared pointer and + /// may call read-only scan callbacks from multiple workers, so shared + /// callbacks must be thread-safe. + type Reader: BaseFileReader + Sync; + + /// Whether DuckDB may pass pushed table filters to + /// [`BaseFileReader::prepare_reader`]. + const FILTER_PUSHDOWN: bool = false; + + /// Whether DuckDB may omit filter-only columns from final table-scan + /// output. + /// + /// Only meaningful when [`Self::FILTER_PUSHDOWN`] is true. + const FILTER_PRUNE: bool = false; /// Construct default options. Called once per bind. fn create_options(ctx: &ClientContextRef) -> VortexResult; + /// Push a complex filter expression into bind data. + /// + /// Returning `true` tells DuckDB the filter is handled exactly and may be + /// removed from the remaining plan. Returning `false` leaves it for DuckDB + /// to apply above the scan or turn into a regular table filter. 
+ fn pushdown_complex_filter( + bind_data: &mut Self::BindData, + expr: &ExpressionRef, + ) -> VortexResult { + let _ = (bind_data, expr); + Ok(false) + } + /// Build bind data from options. Takes ownership of the options struct. fn initialize_bind_data(options: Self::ReaderOptions) -> VortexResult; /// Populate the result schema. DuckDB picks the first file in the file list /// to bind against; the implementation should open it (cheaply, metadata- - /// only if possible) and append columns to `schema`. + /// only if possible), record any bind-time metadata it needs, and append + /// columns to `schema`. fn bind_reader( ctx: &ClientContextRef, - bind_data: &Self::BindData, + bind_data: &mut Self::BindData, first_file: &str, schema: &mut SchemaBuilder, ) -> VortexResult<()>; @@ -91,7 +145,10 @@ pub trait MultiFileFunction: Sized + Debug { fn init_local(global: &Self::GlobalState) -> Self::LocalState; /// Open a per-file reader. Called once per file, on the thread that won the - /// race to open it. + /// race to open it. DuckDB has dropped the global multi-file scheduling + /// mutex before this call, but holds a per-file mutex for this reader. It is + /// reasonable to open file metadata here; do not do per-scan or per-split + /// work here because projection/filter state is not fully prepared yet. fn create_reader( ctx: &ClientContextRef, global: &Self::GlobalState, @@ -106,54 +163,105 @@ pub trait MultiFileFunction: Sized + Debug { Cardinality::Unknown } + /// Exact partition statistics for a file, if already available cheaply. + /// + /// DuckDB uses these to fold aggregates such as `COUNT(*)` during + /// optimization. Returning `None` leaves the scan plan unchanged. + fn partition_stats( + _ctx: &ClientContextRef, + _bind_data: &Self::BindData, + _file_path: &str, + ) -> VortexResult> { + Ok(None) + } + /// Populate the bind-time EXPLAIN map with key/value pairs (typical keys: /// `Function`, `Files`, `Projection`, `Filters`). Default no-op. 
fn to_string(_bind_data: &Self::BindData, _map: &mut DuckdbStringMapRef) {} } +/// Exact per-file partition statistics exposed to DuckDB's optimizer. +#[derive(Clone, Copy, Debug)] +pub struct PartitionStats { + /// Exact number of rows in this file. + pub row_count: u64, +} + +/// A column DuckDB asks a [`BaseFileReader`] to produce in the intermediate +/// scan chunk. +#[derive(Clone, Copy, Debug)] +pub struct ProjectedColumn<'a> { + /// Column name in the file-local scan chunk. + pub name: &'a str, + /// DuckDB column id. Physical columns use a file-local id; virtual columns + /// use DuckDB's global virtual column id. + pub column_id: u64, + /// True when this column is one of DuckDB's virtual columns. + pub is_virtual: bool, + /// True when DuckDB's final output expressions reference this column. + /// + /// False columns are filter-only: the reader may use them for pushed filter + /// evaluation, but does not need to materialize them into the scan chunk. + pub is_projected: bool, +} + /// Per-file reader contract. Implementations are owned by DuckDB once handed /// off via [`MultiFileFunction::create_reader`] and dropped when scanning of /// that file completes. /// -/// Note: this trait is intentionally not `Send`. DuckDB's `MultiFileFunction` -/// guarantees one-thread-at-a-time access to a given reader (it threads them -/// through `MultiFileLocalState` and acquires per-file locks on transitions); -/// requiring `Send` here would force `BaseFileReader` impls to wrap their -/// scan iterators in synchronization primitives unnecessarily. -pub trait BaseFileReader { - type GlobalState; - type LocalState; - +/// DuckDB calls [`Self::try_initialize_scan`] while holding its global +/// multi-file lock. That method should claim one independent unit of scan work +/// and store only its descriptor in `LocalState`. [`Self::prepare_scan`] then +/// initializes actual per-worker state outside that lock. 
[`Self::scan`] drains +/// only that local state and may overlap with later +/// [`Self::try_initialize_scan`] calls on the same reader. +pub trait BaseFileReader { /// Configure projection and filter pushdown. Called once after the reader /// is created and before any [`Self::try_initialize_scan`] call. - /// `projection` is the ordered list of column names DuckDB needs the - /// chunks to contain (one entry per chunk column). `filters` carries any + /// `projection` is the ordered list of intermediate scan columns DuckDB + /// allocated for this reader. Filter-only columns have + /// [`ProjectedColumn::is_projected`] set to false. `filters` carries any /// filters DuckDB pushed down for this scan. /// /// Default: no-op (reader scans all columns, no filter pushdown). fn prepare_reader( &mut self, - projection: &[&str], + projection: &[ProjectedColumn<'_>], filters: Option<&TableFilterSetRef>, ) -> VortexResult<()> { let _ = (projection, filters); Ok(()) } - /// Set up scan state for the next batch. Called under DuckDB's per-file - /// lock; should not block on I/O. Return `false` once exhausted. + /// Set up scan state for the next batch. Called under DuckDB's global + /// multi-file scheduling lock; this should only claim work into `local`. + /// Do not open readers, call `block_on`, or construct scan iterators here. + /// Return `false` once exhausted. fn try_initialize_scan( - &mut self, - global: &Self::GlobalState, - local: &mut Self::LocalState, + &self, + global: &GlobalState, + local: &mut LocalState, ) -> VortexResult; + /// Initialize local scan state for the work claimed by + /// [`Self::try_initialize_scan`]. DuckDB calls this outside its global + /// multi-file scheduling lock, so implementations may open per-split + /// iterators, block on async setup, or build scan pipelines here. + /// + /// Default: no-op. 
+ fn prepare_scan(&self, global: &GlobalState, local: &mut LocalState) -> VortexResult<()> { + let _ = (global, local); + Ok(()) + } + /// Produce the next batch into `chunk`. Setting `chunk` to size 0 signals - /// end-of-file; otherwise non-empty implies more may follow. + /// end-of-assignment; otherwise non-empty implies more may follow. This is + /// called outside DuckDB's global multi-file scheduling lock after + /// [`Self::prepare_scan`]. fn scan( - &mut self, - global: &Self::GlobalState, - local: &mut Self::LocalState, + &self, + global: &GlobalState, + local: &mut LocalState, chunk: &mut DataChunkRef, ) -> VortexResult<()>; @@ -202,9 +310,13 @@ impl DatabaseRef { ) -> VortexResult<()> { let vtab = cpp::duckdb_vx_mff_vtab_t { name: name.as_ptr(), + filter_pushdown: T::FILTER_PUSHDOWN, + filter_prune: T::FILTER_PRUNE, + pushdown_complex_filter: Some(pushdown_complex_filter::), create_options: Some(create_options::), free_options: Some(free_options::), initialize_bind_data: Some(initialize_bind_data::), + clone_bind_data: Some(clone_bind_data::), free_bind_data: Some(free_bind_data::), bind_reader: Some(bind_reader::), init_global: Some(init_global::), @@ -215,10 +327,12 @@ impl DatabaseRef { free_reader: Some(free_reader::), prepare_reader: Some(prepare_reader::), try_initialize_scan: Some(try_initialize_scan::), + prepare_scan: Some(prepare_scan::), scan: Some(scan::), get_statistics: Some(get_statistics::), progress_in_file: Some(progress_in_file::), cardinality: Some(cardinality::), + partition_stats: Some(partition_stats::), to_string: Some(to_string::), }; @@ -273,6 +387,28 @@ unsafe extern "C-unwind" fn free_bind_data( } } +unsafe extern "C-unwind" fn clone_bind_data( + bind_data: cpp::duckdb_vx_mff_bind_data, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_bind_data { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + 
Ok(Box::into_raw(Box::new(bind_data.clone())).cast()) + }) +} + +unsafe extern "C-unwind" fn pushdown_complex_filter( + bind_data: cpp::duckdb_vx_mff_bind_data, + expr: cpp::duckdb_vx_expr, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let bind_data = + unsafe { bind_data.cast::().as_mut() }.vortex_expect("bind_data null"); + let expr = unsafe { crate::duckdb::Expression::borrow(expr) }; + try_or(error_out, || T::pushdown_complex_filter(bind_data, expr)) +} + unsafe extern "C-unwind" fn bind_reader( ctx: cpp::duckdb_client_context, bind_data: cpp::duckdb_vx_mff_bind_data, @@ -283,7 +419,7 @@ unsafe extern "C-unwind" fn bind_reader( ) { let ctx = unsafe { ClientContext::borrow(ctx) }; let bind_data = - unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + unsafe { bind_data.cast::().as_mut() }.vortex_expect("bind_data null"); let mut builder = SchemaBuilder { raw: schema_writer }; try_or(error_out, || { let path_bytes = unsafe { slice::from_raw_parts(file_path.cast::(), path_len) }; @@ -370,16 +506,21 @@ unsafe extern "C-unwind" fn prepare_reader( Some(unsafe { TableFilterSet::borrow(filters) }) }; try_or(error_out, || { - // Materialize column names into &str borrows scoped to this call. - let mut names: Vec<&str> = Vec::with_capacity(projection_count); + // Materialize column metadata with &str borrows scoped to this call. 
+ let mut projected_columns = Vec::with_capacity(projection_count); for i in 0..projection_count { let col = unsafe { &*projection.add(i) }; let bytes = unsafe { slice::from_raw_parts(col.name.cast::(), col.name_len) }; let name = std::str::from_utf8(bytes) .map_err(|e| vortex_err!("projection column name not UTF-8: {e}"))?; - names.push(name); + projected_columns.push(ProjectedColumn { + name, + column_id: col.column_id, + is_virtual: col.is_virtual, + is_projected: col.is_projected, + }); } - reader.prepare_reader(&names, filter_ref) + reader.prepare_reader(&projected_columns, filter_ref) }); } @@ -389,12 +530,24 @@ unsafe extern "C-unwind" fn try_initialize_scan( local: cpp::duckdb_vx_mff_local, error_out: *mut cpp::duckdb_vx_error, ) -> bool { - let reader = unsafe { reader.cast::().as_mut() }.vortex_expect("reader null"); + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); let local = unsafe { local.cast::().as_mut() }.vortex_expect("local null"); try_or(error_out, || reader.try_initialize_scan(global, local)) } +unsafe extern "C-unwind" fn prepare_scan( + reader: cpp::duckdb_vx_mff_reader, + global: cpp::duckdb_vx_mff_global, + local: cpp::duckdb_vx_mff_local, + error_out: *mut cpp::duckdb_vx_error, +) { + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = unsafe { local.cast::().as_mut() }.vortex_expect("local null"); + try_or(error_out, || reader.prepare_scan(global, local)) +} + unsafe extern "C-unwind" fn scan( reader: cpp::duckdb_vx_mff_reader, global: cpp::duckdb_vx_mff_global, @@ -402,7 +555,7 @@ unsafe extern "C-unwind" fn scan( chunk: cpp::duckdb_data_chunk, error_out: *mut cpp::duckdb_vx_error, ) -> bool { - let reader = unsafe { reader.cast::().as_mut() }.vortex_expect("reader null"); + let reader = unsafe { 
reader.cast::().as_ref() }.vortex_expect("reader null"); let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); let local = unsafe { local.cast::().as_mut() }.vortex_expect("local null"); let chunk_ref = unsafe { DataChunk::borrow_mut(chunk) }; @@ -473,6 +626,30 @@ unsafe extern "C-unwind" fn cardinality( } } +unsafe extern "C-unwind" fn partition_stats( + ctx: cpp::duckdb_client_context, + bind_data: cpp::duckdb_vx_mff_bind_data, + file_path: *const std::os::raw::c_char, + path_len: usize, + out: *mut cpp::duckdb_vx_mff_partition_stats, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + let path_bytes = unsafe { slice::from_raw_parts(file_path.cast::(), path_len) }; + let path = std::str::from_utf8(path_bytes) + .map_err(|e| vortex_err!("file path is not UTF-8: {e}"))?; + let Some(stats) = T::partition_stats(ctx, bind_data, path)? else { + return Ok(false); + }; + let out = unsafe { &mut *out }; + out.row_count = stats.row_count; + Ok(true) + }) +} + unsafe extern "C-unwind" fn to_string( bind_data: cpp::duckdb_vx_mff_bind_data, map: cpp::duckdb_vx_string_map, diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 15fbf7d7143..cda8b147ba5 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -3,7 +3,6 @@ //! This module contains tests for the `vortex_scan` table function. 
-use std::ffi::CStr; use std::io::Write; use std::net::TcpListener; use std::path::Path; @@ -185,8 +184,7 @@ fn test_scan_function_registration() { let chunk = result.into_iter().next().unwrap(); let vec = chunk.get_vector(0); let mut result = vec.as_slice_with_len::(chunk.len().as_())[0]; - let string = - unsafe { CStr::from_ptr(cpp::duckdb_string_t_data(&raw mut result)).to_string_lossy() }; + let string = String::from_duckdb_value(&mut result); assert_eq!(string, "vortex_scan"); } @@ -1030,8 +1028,7 @@ fn test_read_vortex_v2_strings() { let len = chunk.len().as_(); let vec = chunk.get_vector_mut(0); let mut s = unsafe { vec.as_slice_mut::(len) }[0]; - let path = unsafe { CStr::from_ptr(cpp::duckdb_string_t_data(&raw mut s)).to_string_lossy() }; - let aggregated: String = path.into_owned(); + let aggregated = String::from_duckdb_value(&mut s); assert_eq!(aggregated, "alpha,beta,gamma"); } @@ -1056,3 +1053,239 @@ fn test_read_vortex_v2_multiple_files() { let total = vec.as_slice_with_len::(chunk.len().as_())[0]; assert_eq!(total, 210); } + +#[test] +fn test_read_vortex_v2_filters_on_unprojected_column() { + let file = RUNTIME.block_on(async { + write_vortex_file( + [ + ( + "payload", + PrimitiveArray::from_iter([10i32, 20, 30, 40, 50]), + ), + ("unused_a", PrimitiveArray::from_iter([1i32, 1, 1, 1, 1])), + ("unused_b", PrimitiveArray::from_iter([2i32, 2, 2, 2, 2])), + ("filter_key", PrimitiveArray::from_iter([1i32, 2, 3, 4, 5])), + ] + .into_iter(), + ) + .await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT SUM(payload) FROM read_vortex_v2('{file_path}') WHERE filter_key <= 2" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = chunk.get_vector(0); + let total = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(total, 30); +} + +#[test] +fn test_read_vortex_v2_exposes_dynamic_filter_pushdown() { + let file = RUNTIME.block_on(async { 
+ let numbers = PrimitiveArray::from_iter(0i32..10_000); + write_single_column_vortex_file("number", numbers).await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "EXPLAIN SELECT number FROM read_vortex_v2('{file_path}') ORDER BY number LIMIT 5" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for value in unsafe { vector.as_slice_mut::(len) } { + explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + explain.contains("Dynamic Filter"), + "expected read_vortex_v2 EXPLAIN to include a pushed dynamic filter, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_pushes_complex_contains_filter() { + let file = RUNTIME.block_on(async { + write_vortex_file( + [ + ( + "URL", + VarBinArray::from(vec![ + "https://example.com", + "https://www.google.com/search", + "https://mail.google.com", + "https://vortex.dev", + ]) + .into_array(), + ), + ( + "EventTime", + PrimitiveArray::from_iter([40i32, 30, 20, 10]).into_array(), + ), + ] + .into_iter(), + ) + .await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let count = conn + .query(&format!( + "SELECT COUNT(*) FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%'" + )) + .unwrap() + .into_iter() + .next() + .unwrap() + .get_vector(0) + .as_slice_with_len::(1)[0]; + assert_eq!(count, 2); + + let result = conn + .query(&format!( + "EXPLAIN SELECT * FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for value in unsafe { 
vector.as_slice_mut::(len) } { + explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + !explain.contains("│ FILTER │"), + "expected the URL contains filter to be removed from the standalone DuckDB FILTER, got:\n{explain}" + ); + assert!( + !explain.contains("contains(URL, 'google')"), + "expected the URL contains filter to be fully pushed into read_vortex_v2, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_uses_late_materialization_for_top_n() { + let file = RUNTIME.block_on(async { + write_vortex_file( + [ + ( + "URL", + VarBinArray::from(vec![ + "https://example.com", + "https://www.google.com/search", + "https://mail.google.com", + "https://vortex.dev", + ]) + .into_array(), + ), + ( + "EventTime", + PrimitiveArray::from_iter([40i32, 30, 20, 10]).into_array(), + ), + ( + "WatchID", + PrimitiveArray::from_iter([100i64, 200, 300, 400]).into_array(), + ), + ( + "JavaEnable", + PrimitiveArray::from_iter([1i8, 0, 1, 0]).into_array(), + ), + ] + .into_iter(), + ) + .await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT string_agg(URL, ',') FROM (SELECT * FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 2)" + )) + .unwrap(); + let mut chunk = result.into_iter().next().unwrap(); + let len = chunk.len().as_(); + let vector = chunk.get_vector_mut(0); + let mut value = unsafe { vector.as_slice_mut::(len) }[0]; + assert_eq!( + String::from_duckdb_value(&mut value), + "https://mail.google.com,https://www.google.com/search" + ); + + let result = conn + .query(&format!( + "EXPLAIN SELECT * FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 2" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for 
value in unsafe { vector.as_slice_mut::(len) } { + explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + explain.contains("HASH_JOIN") && explain.contains("SEMI"), + "expected read_vortex_v2 TopN plan to use a late-materialization semi join, got:\n{explain}" + ); + assert!( + explain.contains("file_row_number"), + "expected late materialization to project file_row_number as a row id, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_many_large_files_parallel_scan() { + let (tempdir, _files) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let mut files = Vec::new(); + for file_idx in 0..32 { + let start = file_idx * 10_000; + let numbers = PrimitiveArray::from_iter(start..start + 10_000); + files.push(write_vortex_file_to_dir(tempdir.path(), "number", numbers).await); + } + (tempdir, files) + }); + + let conn = database_connection(); + conn.query("SET threads = 8").unwrap(); + + let glob_pattern = format!("{}/*.vortex", tempdir.path().display()); + let result = conn + .query(&format!( + "SELECT SUM(number) FROM read_vortex_v2('{glob_pattern}')" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = chunk.get_vector(0); + let total = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(total, 51_199_840_000); +} diff --git a/vortex-duckdb/src/exporter/mod.rs b/vortex-duckdb/src/exporter/mod.rs index 517776f5521..7c8f2943be4 100644 --- a/vortex-duckdb/src/exporter/mod.rs +++ b/vortex-duckdb/src/exporter/mod.rs @@ -46,6 +46,11 @@ pub struct ArrayExporter { /// Columns DuckDB requested to read from file. If empty, it's a zero-column /// projection and should be handled accordingly, see ArrayExporter::export. fields: Vec>, + /// Optional sparse mapping from Vortex struct fields to DuckDB chunk + /// vectors. Used by DuckDB's multi-file scan when filter-only columns are + /// present in the intermediate chunk but not materialized by Vortex. 
+ field_positions: Option>, + chunk_column_count: Option, array_len: usize, remaining: usize, } @@ -67,6 +72,41 @@ impl ArrayExporter { Ok(Self { ctx, fields, + field_positions: None, + chunk_column_count: None, + array_len: array.len(), + remaining: array.len(), + }) + } + + pub fn try_new_with_positions( + array: &StructArray, + cache: &ConversionCache, + mut ctx: ExecutionCtx, + field_positions: Vec, + chunk_column_count: usize, + ) -> VortexResult { + let validity = array.validity()?.execute_mask(array.len(), &mut ctx)?; + assert!(validity.all_true()); + + let fields = array + .iter_unmasked_fields() + .map(|field| new_array_exporter(field.clone(), cache, &mut ctx)) + .collect::>>()?; + + if fields.len() != field_positions.len() { + vortex_bail!( + "Expected {} output positions for {} fields", + fields.len(), + field_positions.len() + ); + } + + Ok(Self { + ctx, + fields, + field_positions: Some(field_positions), + chunk_column_count: Some(chunk_column_count), array_len: array.len(), remaining: array.len(), }) @@ -88,6 +128,32 @@ impl ArrayExporter { let zero_projection = self.fields.is_empty(); + if let Some(field_positions) = &self.field_positions { + let expected_cols = self + .chunk_column_count + .vortex_expect("sparse exporter missing chunk column count"); + let chunk_cols = chunk.column_count(); + if chunk_cols != expected_cols { + vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}"); + } + + let chunk_len = duckdb_vector_size().min(self.remaining); + let position = self.array_len - self.remaining; + self.remaining -= chunk_len; + chunk.set_len(chunk_len); + + for (field, pos) in self.fields.iter().zip(field_positions.iter().copied()) { + field.export( + position, + chunk_len, + chunk.get_vector_mut(pos), + &mut self.ctx, + )?; + } + + return Ok(true); + } + // file_row_number column is already populated in scan construction let expected_cols = self.fields.len() + file_index_column_pos.is_some() as usize; let chunk_cols = 
chunk.column_count(); diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index 0e57b2b342c..902c501e3a0 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -57,8 +57,8 @@ static SESSION: LazyLock = /// Known gaps in the v2 path (compared to v1) at time of writing: /// - No batch parallelism within a file (`TryInitializeScan` is one-shot, so /// each Vortex file is scanned by a single worker). -/// - No `union_by_name`, hive partitioning columns, or `filename` / -/// `file_row_number` virtual columns wired through. +/// - No `union_by_name`, hive partitioning columns, or `filename` virtual +/// column wired through. /// - No support for the named parameters DuckDB's `MultiFileReader` adds /// (`union_by_name`, `hive_partitioning`, …) — `ParseOption` returns false. /// - No `COPY ... FROM 'x.vortex'` via this path. @@ -83,6 +83,12 @@ pub fn initialize(db: &DatabaseRef) -> VortexResult<()> { LogicalType::varchar(), Value::from("vortex"), )?; + db.config().add_extension_options( + "vortex_metadata_cache", + "Cache Vortex file metadata - useful when reading the same files multiple times.", + LogicalType::bool(), + Value::from(false), + )?; if use_multi_file_function() { // Replace the table-function-based scan with the MultiFileFunction // path under the canonical names. Also expose under v2 names so an A/B diff --git a/vortex-duckdb/src/multi_file_function.rs b/vortex-duckdb/src/multi_file_function.rs index 15631b7840c..0ded1c86dc3 100644 --- a/vortex-duckdb/src/multi_file_function.rs +++ b/vortex-duckdb/src/multi_file_function.rs @@ -9,60 +9,128 @@ //! [`VortexFile`] directly so file-level statistics, dtype, and pruning are //! available without going through `MultiLayoutDataSource`. 
+use std::collections::VecDeque; +use std::ffi::CStr; use std::fmt::Debug; +use std::ops::Range; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use itertools::Itertools; +use parking_lot::Mutex; use vortex::array::ArrayRef; use vortex::array::Canonical; use vortex::array::VortexSessionExecute; +use vortex::array::arrays::ScalarFn; use vortex::array::arrays::Struct; use vortex::array::arrays::StructArray; -use vortex::array::iter::ArrayIterator; +use vortex::array::arrays::scalar_fn::ScalarFnArrayExt; +use vortex::buffer::Buffer; use vortex::dtype::DType; use vortex::dtype::FieldName; use vortex::dtype::FieldNames; +use vortex::dtype::PType; use vortex::error::VortexResult; use vortex::error::vortex_err; use vortex::expr::Expression; +use vortex::expr::VortexExprExt; use vortex::expr::and_collect; +use vortex::expr::cast; use vortex::expr::col; +use vortex::expr::merge; +use vortex::expr::pack; use vortex::expr::root; use vortex::expr::select; +use vortex::file::Footer; use vortex::file::OpenOptionsSessionExt; use vortex::file::VortexFile; use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::Task; +use vortex::layout::layouts::row_idx::row_idx; +use vortex::layout::scan::split_by::SplitBy; +use vortex::scalar_fn::fns::pack::Pack; +use vortex::scan::selection::Selection; use crate::RUNTIME; use crate::SESSION; +use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; +use crate::convert::try_from_virtual_column_filter; +use crate::cpp::DUCKDB_VX_EXPR_TYPE; use crate::duckdb::BaseFileReader; use crate::duckdb::Cardinality; use crate::duckdb::ClientContextRef; use crate::duckdb::ColumnStatistics; use crate::duckdb::DataChunkRef; use crate::duckdb::DuckdbStringMapRef; +use crate::duckdb::ExpressionClass; +use crate::duckdb::ExpressionRef; +use crate::duckdb::ExtractedValue; use crate::duckdb::LogicalType; use crate::duckdb::MultiFileFunction; +use 
crate::duckdb::PartitionStats; +use crate::duckdb::ProjectedColumn; use crate::duckdb::SchemaBuilder; use crate::duckdb::TableFilterSetRef; +use crate::duckdb::duckdb_vector_size; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; use crate::filesystem::resolve_filesystem; use crate::multi_file::parse_glob_url; +type ScanTask = Task>>; + +const VORTEX_METADATA_CACHE_SETTING: &CStr = c"vortex_metadata_cache"; +const VORTEX_FOOTER_CACHE_TYPE: &CStr = c"vortex_footer"; +const DEFAULT_FOOTER_CACHE_BYTES: usize = 10 * 1024; +const FILE_ROW_NUMBER_COLUMN_ID: u64 = 9223372036854775809; +const FILE_INDEX_COLUMN_ID: u64 = 9223372036854775810; +// DuckDB drains one multi-file reader at a time, so raw layout-level splits can +// create thousands of tiny scan assignments for ClickBench-sized shards. +const MULTI_FILE_SCAN_MIN_SPLIT_ROWS: usize = 131_072; +const MULTI_FILE_SCAN_MAX_SPLIT_ROWS: usize = MULTI_FILE_SCAN_MIN_SPLIT_ROWS * 2; + /// Open a [`VortexFile`] using whichever filesystem the user has configured /// via the `vortex_filesystem` extension option. DuckDB has already expanded /// any glob and chosen this exact path; we only need the right reader for /// the URL scheme. Routing through the filesystem also lets HTTP/S3/etc. /// transparently use DuckDB's `httpfs` when the user picks `'duckdb'`. fn open_vortex_file(ctx: &ClientContextRef, path: &str) -> VortexResult { + let metadata_cache_enabled = vortex_metadata_cache_enabled(ctx); + let cached_footer = if metadata_cache_enabled { + // SAFETY: this module is the only writer for `vortex_footer` entries, + // and it stores exactly `Footer` values for this object type. + unsafe { ctx.object_cache_get_cloned::