diff --git a/benchmarks/duckdb-bench/src/lib.rs b/benchmarks/duckdb-bench/src/lib.rs index fed9f82b004..308553852c0 100644 --- a/benchmarks/duckdb-bench/src/lib.rs +++ b/benchmarks/duckdb-bench/src/lib.rs @@ -90,7 +90,7 @@ impl DuckClient { let connection = db.connect()?; vortex_duckdb::initialize(&db)?; - // Enable Parquet metadata cache for all benchmark runs. + // Enable metadata caches for all benchmark runs. // // `parquet_metadata_cache` is an extension-specific option that's // only available after the Parquet extension is loaded. The Parquet @@ -100,6 +100,7 @@ impl DuckClient { // "Invalid Input Error: The following options were not recognized: // parquet_metadata_cache" when running DuckDB in debug mode. connection.query("SET parquet_metadata_cache = true")?; + connection.query("SET vortex_metadata_cache = true")?; Ok((db, connection)) } diff --git a/vortex-duckdb/build.rs b/vortex-duckdb/build.rs index e435e9ba063..e1ef334334f 100644 --- a/vortex-duckdb/build.rs +++ b/vortex-duckdb/build.rs @@ -20,7 +20,7 @@ const DUCKDB_SOURCE_COMMIT_URL: &str = "https://github.com/duckdb/duckdb/archive const BUILD_ARTIFACTS: [&str; 3] = ["libduckdb.dylib", "libduckdb.so", "libduckdb_static.a"]; -const SOURCE_FILES: [&str; 17] = [ +const SOURCE_FILES: [&str; 19] = [ "cpp/client_context.cpp", "cpp/config.cpp", "cpp/copy_function.cpp", @@ -30,6 +30,8 @@ const SOURCE_FILES: [&str; 17] = [ "cpp/expr.cpp", "cpp/file_system.cpp", "cpp/logical_type.cpp", + "cpp/multi_file_function.cpp", + "cpp/object_cache.cpp", "cpp/replacement_scan.cpp", "cpp/reusable_dict.cpp", "cpp/scalar_function.cpp", diff --git a/vortex-duckdb/cpp/file_system.cpp b/vortex-duckdb/cpp/file_system.cpp index 11083ad6b86..562a12b0855 100644 --- a/vortex-duckdb/cpp/file_system.cpp +++ b/vortex-duckdb/cpp/file_system.cpp @@ -12,11 +12,21 @@ DUCKDB_INCLUDES_BEGIN #include DUCKDB_INCLUDES_END +#include #include using namespace duckdb; using vortex::SetError; +struct duckdb_vx_file_handle_ { + explicit 
duckdb_vx_file_handle_(shared_ptr context, unique_ptr handle) + : context(std::move(context)), handle(std::move(handle)) { + } + + shared_ptr context; + unique_ptr handle; +}; + extern "C" duckdb_vx_file_handle duckdb_vx_fs_open(duckdb_client_context ctx, const char *path, duckdb_vx_error *error_out) { if (!ctx || !path) { @@ -29,7 +39,7 @@ duckdb_vx_fs_open(duckdb_client_context ctx, const char *path, duckdb_vx_error * try { auto &fs = FileSystem::GetFileSystem(*client_context); auto handle = fs.OpenFile(path, FileFlags::FILE_FLAGS_READ | FileFlags::FILE_FLAGS_PARALLEL_ACCESS); - return reinterpret_cast(handle.release()); + return new duckdb_vx_file_handle_(client_context->shared_from_this(), std::move(handle)); } catch (const std::exception &e) { SetError(error_out, e.what()); return nullptr; @@ -50,7 +60,7 @@ duckdb_vx_fs_create(duckdb_client_context ctx, const char *path, duckdb_vx_error try { auto &fs = FileSystem::GetFileSystem(*client_context); auto handle = fs.OpenFile(path, flags); - return reinterpret_cast(handle.release()); + return new duckdb_vx_file_handle_(client_context->shared_from_this(), std::move(handle)); } catch (const std::exception &e) { SetError(error_out, e.what()); return nullptr; @@ -59,7 +69,7 @@ duckdb_vx_fs_create(duckdb_client_context ctx, const char *path, duckdb_vx_error extern "C" void duckdb_vx_fs_close(duckdb_vx_file_handle *handle) { if (handle && *handle) { - delete reinterpret_cast(std::exchange(*handle, nullptr)); + delete std::exchange(*handle, nullptr); } } @@ -70,7 +80,7 @@ duckdb_vx_fs_get_size(duckdb_vx_file_handle handle, idx_t *size_out, duckdb_vx_e } try { - *size_out = reinterpret_cast(handle)->GetFileSize(); + *size_out = handle->handle->GetFileSize(); } catch (const std::exception &e) { return SetError(error_out, e.what()); } @@ -88,7 +98,7 @@ extern "C" duckdb_state duckdb_vx_fs_read(duckdb_vx_file_handle handle, } try { - reinterpret_cast(handle)->Read(buffer, len, offset); + handle->handle->Read(buffer, len, 
offset); *out_len = len; } catch (const std::exception &e) { return SetError(error_out, e.what()); @@ -107,7 +117,7 @@ extern "C" duckdb_state duckdb_vx_fs_write(duckdb_vx_file_handle handle, } try { - reinterpret_cast(handle)->Write(QueryContext(), buffer, len, offset); + handle->handle->Write(QueryContext(), buffer, len, offset); *out_len = len; } catch (const std::exception &e) { return SetError(error_out, e.what()); @@ -144,7 +154,7 @@ extern "C" duckdb_state duckdb_vx_fs_sync(duckdb_vx_file_handle handle, duckdb_v } try { - reinterpret_cast(handle)->Sync(); + handle->handle->Sync(); } catch (const std::exception &e) { return SetError(error_out, e.what()); } diff --git a/vortex-duckdb/cpp/include/duckdb_vx.h b/vortex-duckdb/cpp/include/duckdb_vx.h index dcad0ae1487..afe56803b2f 100644 --- a/vortex-duckdb/cpp/include/duckdb_vx.h +++ b/vortex-duckdb/cpp/include/duckdb_vx.h @@ -12,6 +12,8 @@ #include "duckdb_vx/expr.h" #include "duckdb_vx/file_system.h" #include "duckdb_vx/logical_type.h" +#include "duckdb_vx/multi_file_function.h" +#include "duckdb_vx/object_cache.h" #include "duckdb_vx/reusable_dict.h" #include "duckdb_vx/replacement_scan.h" #include "duckdb_vx/scalar_function.h" diff --git a/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h new file mode 100644 index 00000000000..dd7350c0e98 --- /dev/null +++ b/vortex-duckdb/cpp/include/duckdb_vx/multi_file_function.h @@ -0,0 +1,279 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +/** + * C ABI for registering a DuckDB MultiFileFunction-backed table function. + * + * Unlike duckdb_vx_tfunc_register (which wraps a single TableFunction), this exposes + * DuckDB's templated MultiFileFunction machinery: file globbing, per-file readers, + * hive partitioning, virtual columns, etc. are all driven by DuckDB itself; the + * extension only supplies a per-format reader. 
+ * + * Lifecycle, mirroring DuckDB's Parquet reader: + * 1. create_options / initialize_bind_data / bind_reader collect bind-time options, + * metadata, and schema. + * 2. init_global / init_local create per-query and per-worker state. + * 3. create_reader opens one file. DuckDB has dropped the global multi-file scheduling + * mutex before this call and holds a per-file mutex for this reader. + * 4. prepare_reader maps the projection and filters onto the opened reader. + * 5. try_initialize_scan is called with DuckDB's global multi-file scheduling mutex held. + * It must only claim one cheap unit of scan work into local state. + * 6. prepare_scan runs outside that scheduling mutex and initializes local scan state + * for the work claimed by try_initialize_scan. + * 7. scan drains the local state prepared by prepare_scan into DuckDB chunks. + * + * Owned-pointer convention: every non-null handle the extension returns is allocated by + * the extension and held by DuckDB, which releases it via the matching free_* callback. + * Borrowed pointers (passed in to callbacks) must not be freed. + */ +#pragma once + +#include "duckdb_vx/data.h" +#include "error.h" +#include "table_function.h" +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +// Opaque, extension-owned. Lifetime is tied to the corresponding free_* callback. +typedef struct duckdb_vx_mff_options_ *duckdb_vx_mff_options; +typedef struct duckdb_vx_mff_bind_data_ *duckdb_vx_mff_bind_data; +typedef struct duckdb_vx_mff_global_ *duckdb_vx_mff_global; +typedef struct duckdb_vx_mff_local_ *duckdb_vx_mff_local; +typedef struct duckdb_vx_mff_reader_ *duckdb_vx_mff_reader; + +// Opaque writers populated by the extension during bind. +typedef struct duckdb_vx_mff_schema_writer_ *duckdb_vx_mff_schema_writer; + +// A single scan column passed to prepare_reader. `name` is borrowed for the +// duration of the call. `is_projected` distinguishes final output columns from +// filter-only scan columns.
+typedef struct { + const char *name; + size_t name_len; + uint64_t column_id; + bool is_virtual; + bool is_projected; +} duckdb_vx_mff_column; + +// Exact per-file partition statistics for DuckDB's aggregate/statistics +// optimizer. Currently only row counts are exposed. +typedef struct { + uint64_t row_count; +} duckdb_vx_mff_partition_stats; + +// Writer for EXPLAIN/to_string output. This is a plain typedef alias of +// duckdb_vx_string_map, so the two names are interchangeable to the compiler. +typedef duckdb_vx_string_map duckdb_vx_mff_string_map; + +/** + * Append a column to the bind schema. The name is copied; the logical type is + * cloned. Both arguments remain owned by the caller. + */ +void duckdb_vx_mff_schema_writer_add_column(duckdb_vx_mff_schema_writer writer, + const char *name, + size_t name_len, + duckdb_logical_type type); + +// vtable mirroring the subset of MultiFileReaderInterface + BaseFileReader we expose. +// All callbacks are required and must be non-null. +typedef struct { + /** Function name, e.g. "read_vortex". Must outlive the registered function. */ + const char *name; + + /** Whether DuckDB may pass pushed table filters to prepare_reader. */ + bool filter_pushdown; + + /** Whether DuckDB may omit filter-only columns from final table-scan output. */ + bool filter_prune; + + /** + * Try to push a complex filter expression into bind data. Returns true when + * the filter is handled exactly and DuckDB may remove the standalone filter. + */ + bool (*pushdown_complex_filter)(duckdb_vx_mff_bind_data bind_data, + duckdb_vx_expr expr, + duckdb_vx_error *error_out); + + // --------------------------------------------------------------------- + // Options lifecycle + // --------------------------------------------------------------------- + + /** Create a fresh, default options object. Called once per bind. */ + duckdb_vx_mff_options (*create_options)(duckdb_client_context ctx, duckdb_vx_error *error); + /** Release options created by create_options. Must accept null.
*/ + void (*free_options)(duckdb_vx_mff_options options); + + // --------------------------------------------------------------------- + // Bind lifecycle + // --------------------------------------------------------------------- + + /** + * Initialize bind data from options. Called once per bind, after options. + * Takes ownership of `options` (must be freed via free_options if the + * extension does not retain it). + */ + duckdb_vx_mff_bind_data (*initialize_bind_data)(duckdb_vx_mff_options options, + duckdb_vx_error *error); + /** Clone bind data. Used when DuckDB rewrites plans, e.g. late materialization. */ + duckdb_vx_mff_bind_data (*clone_bind_data)(duckdb_vx_mff_bind_data bind_data, + duckdb_vx_error *error); + /** Release bind data. Must accept null. */ + void (*free_bind_data)(duckdb_vx_mff_bind_data bind_data); + + /** + * Bind the reader's schema. Called by DuckDB after the first file in the + * file list is known. The extension should open the file (or a metadata- + * only handle) and append result columns via the schema_writer. + * + * `first_file_path` is borrowed (not nul-terminated, length given). 
+ */ + void (*bind_reader)(duckdb_client_context ctx, + duckdb_vx_mff_bind_data bind_data, + const char *first_file_path, + size_t path_len, + duckdb_vx_mff_schema_writer schema_out, + duckdb_vx_error *error); + + // --------------------------------------------------------------------- + // Per-query state lifecycle + // --------------------------------------------------------------------- + + duckdb_vx_mff_global (*init_global)(duckdb_client_context ctx, + duckdb_vx_mff_bind_data bind_data, + duckdb_vx_error *error); + void (*free_global)(duckdb_vx_mff_global global); + + duckdb_vx_mff_local (*init_local)(duckdb_vx_mff_global global); + void (*free_local)(duckdb_vx_mff_local local); + + // --------------------------------------------------------------------- + // Per-file reader lifecycle + // --------------------------------------------------------------------- + + /** + * Open a per-file reader. Called once per file when DuckDB first opens + * that file for scanning. This may open file metadata, but should not do + * per-scan work because projection/filter state has not been prepared yet. + */ + duckdb_vx_mff_reader (*create_reader)(duckdb_client_context ctx, + duckdb_vx_mff_global global, + duckdb_vx_mff_bind_data bind_data, + const char *file_path, + size_t path_len, + size_t file_idx, + duckdb_vx_error *error); + void (*free_reader)(duckdb_vx_mff_reader reader); + + /** + * Configure the reader with the columns it should produce and any filters + * pushed down by DuckDB. Called once per (reader, scan) pair before any + * try_initialize_scan / scan calls. `projection` is the ordered list of + * intermediate scan columns DuckDB needs the chunks to contain. Columns + * marked `is_projected=false` are only needed for pushed filters and are + * not referenced by DuckDB's final output expressions. `filters` may be + * null when no filters were pushed down. 
+ */ + void (*prepare_reader)(duckdb_vx_mff_reader reader, + const duckdb_vx_mff_column *projection, + size_t projection_count, + duckdb_vx_table_filter_set filters, + duckdb_vx_error *error); + + /** + * Try to initialize a scan over `reader`. Returns true if a scan can begin, + * false if the reader is exhausted. Called with DuckDB's multi-file global + * scheduling mutex held; must not block on I/O, run async work, or build + * expensive scan pipelines. Store only the claimed work descriptor in `local`. + */ + bool (*try_initialize_scan)(duckdb_vx_mff_reader reader, + duckdb_vx_mff_global global, + duckdb_vx_mff_local local, + duckdb_vx_error *error); + + /** + * Prepare local scan state for the work claimed by try_initialize_scan. + * Called outside DuckDB's multi-file global scheduling mutex, mirroring + * DuckDB's BaseFileReader::PrepareScan hook. + */ + void (*prepare_scan)(duckdb_vx_mff_reader reader, + duckdb_vx_mff_global global, + duckdb_vx_mff_local local, + duckdb_vx_error *error); + + /** + * Produce the next batch of data into `chunk_out`. Called outside DuckDB's + * multi-file global scheduling mutex after prepare_scan. Returns: + * - true with chunk size > 0 : more data may follow. + * - true with chunk size == 0 : reader is exhausted; DuckDB will move on. + * - false : an error occurred (see error_out). + */ + bool (*scan)(duckdb_vx_mff_reader reader, + duckdb_vx_mff_global global, + duckdb_vx_mff_local local, + duckdb_data_chunk chunk_out, + duckdb_vx_error *error); + + /** + * Get bind-time per-column statistics by name. Used when DuckDB asks for + * scan stats after copying bind data, before a per-file reader exists. + * Returns false if no stats are available. + */ + bool (*statistics)(duckdb_vx_mff_bind_data bind_data, + const char *col_name, + size_t name_len, + duckdb_column_statistics *stats_out); + + /** + * Get per-column statistics by name. Returns false if no stats are + * available. 
Same convention as duckdb_vx_tfunc_vtab_t::statistics. + */ + bool (*get_statistics)(duckdb_vx_mff_reader reader, + const char *col_name, + size_t name_len, + duckdb_column_statistics *stats_out); + + /** Scan progress within a file in [0.0, 100.0]. */ + double (*progress_in_file)(duckdb_vx_mff_reader reader); + + /** + * Estimated cardinality across `file_count` files. Returning false leaves + * cardinality unknown (DuckDB falls back to its own heuristic). + */ + bool (*cardinality)(duckdb_vx_mff_bind_data bind_data, + size_t file_count, + duckdb_vx_node_statistics *out); + + /** + * Get exact row count statistics for one file. Returning false means the + * stats are not currently available; DuckDB will skip statistics-based + * aggregate rewrites unless every file returns exact stats. + */ + bool (*partition_stats)(duckdb_client_context ctx, + duckdb_vx_mff_bind_data bind_data, + const char *file_path, + size_t path_len, + duckdb_vx_mff_partition_stats *out, + duckdb_vx_error *error); + + /** + * Populate the bind-time EXPLAIN map with key/value pairs (e.g. "Filters", + * "Projection"). Called whenever DuckDB renders the table function in an + * EXPLAIN output. + */ + void (*to_string)(duckdb_vx_mff_bind_data bind_data, duckdb_vx_mff_string_map map); +} duckdb_vx_mff_vtab_t; + +/** + * Register the multi-file function described by `vtab` against `ffi_db`. The + * vtab is copied into a TableFunctionInfo owned by the catalog, so the caller + * may free it after this returns. 
+ */ +duckdb_state duckdb_vx_mff_register(duckdb_database ffi_db, const duckdb_vx_mff_vtab_t *vtab); + +#ifdef __cplusplus +} +#endif diff --git a/vortex-duckdb/cpp/include/duckdb_vx/object_cache.h b/vortex-duckdb/cpp/include/duckdb_vx/object_cache.h new file mode 100644 index 00000000000..3ac4ddeef85 --- /dev/null +++ b/vortex-duckdb/cpp/include/duckdb_vx/object_cache.h @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#pragma once + +#include "duckdb_vx/duckdb_diagnostics.h" + +DUCKDB_INCLUDES_BEGIN +#include +DUCKDB_INCLUDES_END + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct duckdb_vx_object_cache_entry_ *duckdb_vx_object_cache_entry; + +duckdb_vx_object_cache_entry duckdb_vx_object_cache_get(duckdb_client_context ctx, + const char *key, + size_t key_len, + const char *object_type); + +void *duckdb_vx_object_cache_entry_get_data(duckdb_vx_object_cache_entry entry); + +void duckdb_vx_object_cache_entry_free(duckdb_vx_object_cache_entry *entry); + +duckdb_state duckdb_vx_object_cache_put(duckdb_client_context ctx, + const char *key, + size_t key_len, + const char *object_type, + idx_t estimated_memory, + void *data, + duckdb_delete_callback_t delete_callback); + +#ifdef __cplusplus +} +#endif diff --git a/vortex-duckdb/cpp/multi_file_function.cpp b/vortex-duckdb/cpp/multi_file_function.cpp new file mode 100644 index 00000000000..e81d14bd82c --- /dev/null +++ b/vortex-duckdb/cpp/multi_file_function.cpp @@ -0,0 +1,705 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +/** + * C++ adapters that bridge a duckdb_vx_mff_vtab_t to DuckDB's MultiFileFunction. 
+ * + * Layered design: + * - VortexBaseFileReaderOptions : BaseFileReaderOptions - opaque options handle + * - VortexFileReader : BaseFileReader - per-file scan adapter + * - VortexMultiFileReaderInterface : MultiFileReaderInterface - cross-file orchestrator + * - VortexMultiFileFunctionOp - OP type for MultiFileFunction + * + * Each adapter holds a non-owning pointer to the registered vtab and an + * extension-owned FFI handle. The FFI handle is freed via the vtab's free_* + * callback in the destructor. + */ + +#include "duckdb_vx/data.hpp" +#include "duckdb_vx/duckdb_diagnostics.h" +#include "duckdb_vx/error.hpp" +#include "duckdb_vx/multi_file_function.h" + +#include +#include +#include +#include + +DUCKDB_INCLUDES_BEGIN +#include "duckdb.h" +#include "duckdb/catalog/catalog.hpp" +#include "duckdb/common/multi_file/base_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_data.hpp" +#include "duckdb/common/multi_file/multi_file_function.hpp" +#include "duckdb/common/multi_file/multi_file_reader.hpp" +#include "duckdb/common/multi_file/multi_file_states.hpp" +#include "duckdb/function/partition_stats.hpp" +#include "duckdb/main/capi/capi_internal.hpp" +#include "duckdb/parser/parsed_data/create_table_function_info.hpp" +DUCKDB_INCLUDES_END + +using namespace duckdb; +using vortex::IntoErrString; +constexpr column_t COLUMN_IDENTIFIER_FILE_INDEX = MultiFileReader::COLUMN_IDENTIFIER_FILE_INDEX; +constexpr column_t COLUMN_IDENTIFIER_FILE_ROW_NUMBER = MultiFileReader::COLUMN_IDENTIFIER_FILE_ROW_NUMBER; + +namespace { + +/** + * Function info stored on the catalog-owned function. We keep the vtable + * here so per-bind/per-file adapters can find it without a separate registry.
+ */ +struct VortexMultiFileFunctionInfo : TableFunctionInfo { + explicit VortexMultiFileFunctionInfo(const duckdb_vx_mff_vtab_t &vtab_p) : vtab(vtab_p) { + } + + const duckdb_vx_mff_vtab_t vtab; +}; + +class VortexBaseFileReaderOptions : public BaseFileReaderOptions { +public: + VortexBaseFileReaderOptions(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_options handle) + : vtab(vtab), handle(handle) { + } + ~VortexBaseFileReaderOptions() override { + if (handle) { + vtab.free_options(handle); + } + } + + /** Release ownership of the FFI handle to the caller. */ + duckdb_vx_mff_options Release() { + auto out = handle; + handle = nullptr; + return out; + } + + const duckdb_vx_mff_vtab_t &vtab; + +private: + duckdb_vx_mff_options handle; +}; + +/** + * Bind data attached to the MultiFileBindData. Holds the FFI bind-data handle + * for the lifetime of the prepared statement. + */ +struct VortexMultiFileBindData : public TableFunctionData { + VortexMultiFileBindData(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_bind_data handle) + : vtab(vtab), handle(handle) { + } + ~VortexMultiFileBindData() override { + if (handle) { + vtab.free_bind_data(handle); + } + } + + bool SupportStatementCache() const override { + return false; + } + + unique_ptr Copy() const override { + duckdb_vx_error error_out = nullptr; + auto cloned = vtab.clone_bind_data(handle, &error_out); + if (error_out) { + throw InternalException(IntoErrString(error_out)); + } + return make_uniq(vtab, cloned); + } + + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_bind_data handle; +}; + +/** + * Global state for a single multi-file scan. Distinct from MultiFileGlobalState + * (which DuckDB owns); this is the *interface*-owned global state slot. 
+ */ +class VortexInterfaceGlobalState : public GlobalTableFunctionState { +public: + VortexInterfaceGlobalState(const duckdb_vx_mff_vtab_t &vtab, + duckdb_vx_mff_global handle, + const MultiFileGlobalState &multi_file_state) + : vtab(vtab), handle(handle), multi_file_state(&multi_file_state) { + } + ~VortexInterfaceGlobalState() override { + if (handle) { + vtab.free_global(handle); + } + } + + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_global handle; + const MultiFileGlobalState *multi_file_state; +}; + +class VortexInterfaceLocalState : public LocalTableFunctionState { +public: + VortexInterfaceLocalState(const duckdb_vx_mff_vtab_t &vtab, duckdb_vx_mff_local handle) + : vtab(vtab), handle(handle) { + } + ~VortexInterfaceLocalState() override { + if (handle) { + vtab.free_local(handle); + } + } + + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_local handle; +}; + +static Value &UnwrapValue(duckdb_value value) { + return *(reinterpret_cast(value)); +} + +void DestroyValues(duckdb_column_statistics &stats) { + if (stats.min) { + duckdb_destroy_value(&stats.min); + } + if (stats.max) { + duckdb_destroy_value(&stats.max); + } +} + +unique_ptr NumericStatsFrom(duckdb_column_statistics &stats, const LogicalType &type) { + BaseStatistics out = BaseStatistics::CreateUnknown(type); + if (stats.min) { + NumericStats::SetMin(out, UnwrapValue(stats.min)); + duckdb_destroy_value(&stats.min); + } + if (stats.max) { + NumericStats::SetMax(out, UnwrapValue(stats.max)); + duckdb_destroy_value(&stats.max); + } + if (!stats.has_null) { + out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); + } + return out.ToUnique(); +} + +unique_ptr StringStatsFrom(duckdb_column_statistics &stats, const LogicalType &type) { + BaseStatistics out = BaseStatistics::CreateUnknown(type); + if (stats.min) { + StringStats::SetMin(out, StringValue::Get(UnwrapValue(stats.min))); + duckdb_destroy_value(&stats.min); + } + if (stats.max) { + StringStats::SetMax(out, 
StringValue::Get(UnwrapValue(stats.max))); + duckdb_destroy_value(&stats.max); + } + if (stats.max_string_length >> 63) { + StringStats::SetMaxStringLength(out, uint32_t(stats.max_string_length)); + } + if (!stats.has_null) { + out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); + } + return out.ToUnique(); +} + +unique_ptr BaseStatsFrom(duckdb_column_statistics &stats, const LogicalType &type) { + BaseStatistics out = BaseStatistics::CreateUnknown(type); + DestroyValues(stats); + if (!stats.has_null) { + out.Set(StatsInfo::CANNOT_HAVE_NULL_VALUES); + } + return out.ToUnique(); +} + +unique_ptr ColumnStatsFrom(duckdb_column_statistics &stats, const LogicalType &type) { + switch (type.id()) { + case LogicalTypeId::BOOLEAN: + case LogicalTypeId::TINYINT: + case LogicalTypeId::SMALLINT: + case LogicalTypeId::INTEGER: + case LogicalTypeId::BIGINT: + case LogicalTypeId::FLOAT: + case LogicalTypeId::DOUBLE: + case LogicalTypeId::UTINYINT: + case LogicalTypeId::USMALLINT: + case LogicalTypeId::UINTEGER: + case LogicalTypeId::UBIGINT: + case LogicalTypeId::UHUGEINT: + case LogicalTypeId::HUGEINT: + return NumericStatsFrom(stats, type); + case LogicalTypeId::VARCHAR: + case LogicalTypeId::BLOB: + return StringStatsFrom(stats, type); + case LogicalTypeId::STRUCT: + DestroyValues(stats); + return nullptr; + default: + return BaseStatsFrom(stats, type); + } +} + +/** + * Per-file reader adapter. DuckDB's MultiFileFunction drives one of these + * per opened file; each Scan call asks the extension for the next chunk. 
+ */ +class VortexFileReader : public BaseFileReader { +public: + VortexFileReader(OpenFileInfo file_p, + const duckdb_vx_mff_vtab_t &vtab_p, + duckdb_vx_mff_reader handle_p) + : BaseFileReader(std::move(file_p)), vtab(vtab_p), handle(handle_p) { + } + ~VortexFileReader() override { + if (handle) { + vtab.free_reader(handle); + } + } + + string GetReaderType() const override { + return "vortex"; + } + + void AddVirtualColumn(column_t virtual_column_id) override { + if (columns.empty()) { + throw InternalException("Vortex reader received virtual column before column registration"); + } + virtual_column_ids[columns.size() - 1] = virtual_column_id; + } + + void PrepareReader(ClientContext &, GlobalTableFunctionState &gstate) override { + // Translate the multi-file column ids into projected column names, then + // hand DuckDB's TableFilterSet through to Rust as a borrow. The reader + // stores the resulting projection/filter so it can apply them when the + // scan starts. + auto &g = gstate.Cast(); + std::unordered_set projected_column_ids; + if (g.multi_file_state && !g.multi_file_state->projection_ids.empty()) { + projected_column_ids.reserve(g.multi_file_state->projection_ids.size()); + for (const auto &projection_id : g.multi_file_state->projection_ids) { + if (projection_id >= g.multi_file_state->column_indexes.size()) { + throw InternalException("Vortex projection id out of range"); + } + projected_column_ids.insert(g.multi_file_state->column_indexes[projection_id].GetPrimaryIndex()); + } + } + + std::vector ffi_proj; + ffi_proj.reserve(column_ids.size()); + for (idx_t i = 0; i < column_ids.size(); i++) { + auto local_id = column_ids[MultiFileLocalIndex(i)]; + // `local_id` is an index into our local `columns` schema. Physical + // columns use their local id directly; non-constant virtual columns + // are appended to `columns` by DuckDB's mapper and announced via + // AddVirtualColumn. 
+ const auto &col = columns[local_id]; + auto virtual_entry = virtual_column_ids.find(local_id.GetId()); + const bool is_virtual = virtual_entry != virtual_column_ids.end(); + const auto column_id = is_virtual ? virtual_entry->second : local_id.GetId(); + const bool is_projected = + projected_column_ids.empty() || projected_column_ids.find(column_id) != projected_column_ids.end(); + ffi_proj.push_back({col.name.c_str(), col.name.size(), column_id, is_virtual, is_projected}); + } + auto filter_ptr = reinterpret_cast(filters.get()); + duckdb_vx_error error_out = nullptr; + vtab.prepare_reader(handle, ffi_proj.data(), ffi_proj.size(), filter_ptr, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + } + + bool TryInitializeScan(ClientContext &, + GlobalTableFunctionState &gstate, + LocalTableFunctionState &lstate) override { + auto &g = gstate.Cast(); + auto &l = lstate.Cast(); + duckdb_vx_error error_out = nullptr; + const bool ok = vtab.try_initialize_scan(handle, g.handle, l.handle, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + return ok; + } + + void PrepareScan(ClientContext &, + GlobalTableFunctionState &gstate, + LocalTableFunctionState &lstate) override { + auto &g = gstate.Cast(); + auto &l = lstate.Cast(); + duckdb_vx_error error_out = nullptr; + vtab.prepare_scan(handle, g.handle, l.handle, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + } + + AsyncResult Scan(ClientContext &, + GlobalTableFunctionState &gstate, + LocalTableFunctionState &lstate, + DataChunk &chunk) override { + auto &g = gstate.Cast(); + auto &l = lstate.Cast(); + duckdb_vx_error error_out = nullptr; + auto chunk_handle = reinterpret_cast(&chunk); + const bool ok = vtab.scan(handle, g.handle, l.handle, chunk_handle, &error_out); + if (!ok || error_out) { + throw IOException(IntoErrString(error_out)); + } + // Translate "0 rows" into FINISHED so the multi-file scanner advances + // to 
the next file. Otherwise, signal we may have more. + return chunk.size() == 0 ? AsyncResult(SourceResultType::FINISHED) + : AsyncResult(SourceResultType::HAVE_MORE_OUTPUT); + } + + unique_ptr GetStatistics(ClientContext &, const string &name) override { + for (auto &col : columns) { + if (col.name != name) { + continue; + } + duckdb_column_statistics stats = {}; + if (!vtab.get_statistics(handle, name.c_str(), name.size(), &stats)) { + return nullptr; + } + return ColumnStatsFrom(stats, col.type); + } + return nullptr; + } + + double GetProgressInFile(ClientContext &) override { + return vtab.progress_in_file(handle); + } + +private: + const duckdb_vx_mff_vtab_t &vtab; + duckdb_vx_mff_reader handle; + std::unordered_map virtual_column_ids; +}; + +/** + * Cross-file orchestrator. Implements the methods the basic scan pipeline needs, + * declines COPY/reader options, rejects UNION BY NAME readers, and advertises + * file_row_number as a virtual column; anything not overridden keeps the base-class behaviour. + */ +class VortexMultiFileReaderInterface : public MultiFileReaderInterface { +public: + VortexMultiFileReaderInterface() = default; + + unique_ptr InitializeOptions(ClientContext &context, + optional_ptr info) override { + if (!info) { + throw BinderException("Vortex multi-file function requires TableFunctionInfo"); + } + vtab = &info->Cast().vtab; + auto &vtab = Vtab(); + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + auto handle = vtab.create_options(ctx, &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + return make_uniq(vtab, handle); + } + + bool ParseCopyOption(ClientContext &, const string &, const vector &, + BaseFileReaderOptions &, vector &, vector &) override { + return false; + } + + bool ParseOption(ClientContext &, const string &, const Value &, MultiFileOptions &, + BaseFileReaderOptions &) override { + return false; + } + + unique_ptr InitializeBindData(MultiFileBindData &, + unique_ptr options) override { + auto &vtab = Vtab(); +
auto &vortex_options = options->Cast(); + // Take ownership of the options handle and pass it to the FFI. + duckdb_vx_error error_out = nullptr; + auto bind_handle = vtab.initialize_bind_data(vortex_options.Release(), &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + return make_uniq(vtab, bind_handle); + } + + void BindReader(ClientContext &context, vector &return_types, vector &names, + MultiFileBindData &bind_data) override { + auto &vtab = Vtab(); + auto first_file = bind_data.file_list->GetFirstFile(); + auto &vortex_bind = bind_data.bind_data->Cast(); + + // Schema collection writer: a pair of vectors that the FFI populates. + struct SchemaWriter { + vector &names; + vector &types; + }; + SchemaWriter writer = {names, return_types}; + + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + vtab.bind_reader(ctx, vortex_bind.handle, first_file.path.c_str(), first_file.path.size(), + reinterpret_cast(&writer), &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + } + + unique_ptr InitializeGlobalState(ClientContext &context, + MultiFileBindData &bind_data, + MultiFileGlobalState &multi_file_state) override { + auto &vtab = Vtab(); + auto &vortex_bind = bind_data.bind_data->Cast(); + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + auto handle = vtab.init_global(ctx, vortex_bind.handle, &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + return make_uniq(vtab, handle, multi_file_state); + } + + unique_ptr InitializeLocalState(ExecutionContext &, + GlobalTableFunctionState &gstate) override { + auto &vtab = Vtab(); + auto &g = gstate.Cast(); + auto handle = vtab.init_local(g.handle); + return make_uniq(vtab, handle); + } + + void GetVirtualColumns(ClientContext &, MultiFileBindData &, virtual_column_map_t &result) override { + result.insert(make_pair(COLUMN_IDENTIFIER_FILE_ROW_NUMBER, + 
TableColumn("file_row_number", LogicalType::BIGINT))); + } + + shared_ptr CreateReader(ClientContext &, GlobalTableFunctionState &, BaseUnionData &, + const MultiFileBindData &) override { + // UNION BY NAME path - not supported yet. + throw NotImplementedException("UNION BY NAME is not yet supported by the Vortex multi-file function"); + } + + shared_ptr CreateReader(ClientContext &context, GlobalTableFunctionState &gstate, + const OpenFileInfo &file, idx_t file_idx, + const MultiFileBindData &bind_data) override { + auto &vtab = Vtab(); + auto &vortex_bind = bind_data.bind_data->Cast(); + auto &vortex_g = gstate.Cast(); + duckdb_vx_error error_out = nullptr; + auto ctx = reinterpret_cast(&context); + auto handle = vtab.create_reader(ctx, vortex_g.handle, vortex_bind.handle, file.path.c_str(), + file.path.size(), file_idx, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + auto reader = make_shared_ptr(file, vtab, handle); + // BaseFileReader exposes its file-local schema via the `columns` field; + // the multi-file reader uses it to build the global<->local column + // mapping. We don't yet support per-file schema variation, so inherit + // the bind-time global schema directly. 
+ reader->columns = bind_data.columns; + return reader; + } + + unique_ptr GetCardinality(ClientContext &context, const MultiFileBindData &data, + idx_t file_count) override { + auto &vtab = Vtab(); + auto &vortex_bind = data.bind_data->Cast(); + duckdb_vx_node_statistics stats = {}; + if (!vtab.cardinality(vortex_bind.handle, file_count, &stats)) { + return MultiFileReaderInterface::GetCardinality(context, data, file_count); + } + auto out = make_uniq(); + out->has_estimated_cardinality = stats.has_estimated_cardinality; + out->estimated_cardinality = stats.estimated_cardinality; + out->has_max_cardinality = stats.has_max_cardinality; + out->max_cardinality = stats.max_cardinality; + return out; + } + + unique_ptr Copy() override { + auto copy = make_uniq(); + copy->vtab = vtab; + return copy; + } + +private: + const duckdb_vx_mff_vtab_t &Vtab() const { + if (!vtab) { + throw InternalException("VortexMultiFileReaderInterface used before InitializeOptions"); + } + return *vtab; + } + + const duckdb_vx_mff_vtab_t *vtab = nullptr; +}; + +/** + * The OP type required by MultiFileFunction. Stateless: CreateInterface returns an + * unbound VortexMultiFileReaderInterface that resolves its vtab in InitializeOptions. 
+ */ +struct VortexMultiFileFunctionOp { + static unique_ptr CreateInterface(ClientContext &) { + return make_uniq(); + } +}; + +void mff_pushdown_complex_filter(ClientContext &context, + LogicalGet &get, + FunctionData *bind_data_p, + vector> &filters) { + auto &data = bind_data_p->Cast(); + + MultiFilePushdownInfo info(get); + auto new_list = + data.multi_file_reader->ComplexFilterPushdown(context, *data.file_list, data.file_options, info, filters); + + if (new_list) { + data.file_list = std::move(new_list); + MultiFileReader::PruneReaders(data, *data.file_list); + } + + auto &vortex_bind = data.bind_data->Cast(); + duckdb_vx_error error_out = nullptr; + for (auto iter = filters.begin(); iter != filters.end();) { + duckdb_vx_expr ffi_expr = reinterpret_cast(iter->get()); + const bool pushed = vortex_bind.vtab.pushdown_complex_filter(vortex_bind.handle, ffi_expr, &error_out); + if (error_out) { + throw BinderException(IntoErrString(error_out)); + } + iter = pushed ? filters.erase(iter) : std::next(iter); + } +} + +unique_ptr mff_statistics(ClientContext &context, const FunctionData *bind_data_p, + column_t column_index) { + auto stats = MultiFileFunction::MultiFileScanStats(context, bind_data_p, + column_index); + if (stats) { + return stats; + } + + auto &data = bind_data_p->Cast(); + if (IsVirtualColumn(column_index) || !data.bind_data || !data.file_list) { + return nullptr; + } + if (data.file_list->GetExpandResult() == FileExpandResult::MULTIPLE_FILES) { + return nullptr; + } + if (column_index >= data.names.size() || column_index >= data.types.size()) { + return nullptr; + } + + auto &vortex_bind = data.bind_data->Cast(); + if (!vortex_bind.vtab.statistics) { + return nullptr; + } + + duckdb_column_statistics raw_stats = {}; + const auto &name = data.names[column_index]; + if (!vortex_bind.vtab.statistics(vortex_bind.handle, name.c_str(), name.size(), &raw_stats)) { + return nullptr; + } + return ColumnStatsFrom(raw_stats, data.types[column_index]); +} + 
+vector mff_get_partition_stats(ClientContext &context, GetPartitionStatsInput &input) { + vector result; + if (!input.bind_data) { + return result; + } + + auto &data = input.bind_data->Cast(); + if (!data.bind_data || !data.file_list) { + return result; + } + + auto &vortex_bind = data.bind_data->Cast(); + if (!vortex_bind.vtab.partition_stats) { + return result; + } + + auto ctx = reinterpret_cast(&context); + idx_t row_start = 0; + for (const auto &file : data.file_list->Files()) { + duckdb_vx_mff_partition_stats ffi_stats = {}; + duckdb_vx_error error_out = nullptr; + const bool found = vortex_bind.vtab.partition_stats(ctx, vortex_bind.handle, file.path.c_str(), + file.path.size(), &ffi_stats, &error_out); + if (error_out) { + throw IOException(IntoErrString(error_out)); + } + if (!found) { + return {}; + } + + PartitionStatistics stats; + stats.row_start = optional_idx(row_start); + stats.count = static_cast(ffi_stats.row_count); + stats.count_type = CountType::COUNT_EXACT; + result.push_back(std::move(stats)); + row_start += static_cast(ffi_stats.row_count); + } + return result; +} + +} // namespace + +extern "C" void duckdb_vx_mff_schema_writer_add_column(duckdb_vx_mff_schema_writer writer, + const char *name, + size_t name_len, + duckdb_logical_type type) { + struct SchemaWriter { + vector &names; + vector &types; + }; + auto &w = *reinterpret_cast(writer); + w.names.emplace_back(name, name_len); + w.types.emplace_back(*reinterpret_cast(type)); +} + +extern "C" duckdb_state duckdb_vx_mff_register(duckdb_database ffi_db, const duckdb_vx_mff_vtab_t *vtab) { + D_ASSERT(ffi_db); + D_ASSERT(vtab); + + const auto &wrapper = *reinterpret_cast(ffi_db); + auto &db = *wrapper.database->instance; + + // The catalog-owned TableFunctionInfo carries the vtab copy that each bind + // resolves through InitializeOptions. Keeping it there avoids a shared + // global pointer across databases/tests. 
+ auto info = make_shared_ptr(*vtab); + + MultiFileFunction mff(vtab->name); + mff.function_info = info; + mff.statistics = mff_statistics; + mff.filter_pushdown = vtab->filter_pushdown; + mff.filter_prune = vtab->filter_prune; + mff.pushdown_complex_filter = mff_pushdown_complex_filter; + mff.get_partition_stats = mff_get_partition_stats; + mff.late_materialization = true; + mff.get_row_id_columns = [](ClientContext &, optional_ptr) -> vector { + return {COLUMN_IDENTIFIER_FILE_INDEX, COLUMN_IDENTIFIER_FILE_ROW_NUMBER}; + }; + + // Bind-time EXPLAIN output. Adds keys like "Function", "Files", + // "Projection", "Filters". MultiFileFunction also installs a + // dynamic_to_string that lists files at scan time; we leave that as-is. + mff.to_string = [](TableFunctionToStringInput &input) { + InsertionOrderPreservingMap result; + const auto &bind = input.bind_data->Cast(); + const auto &vortex_bind = bind.bind_data->Cast(); + auto map = reinterpret_cast(&result); + vortex_bind.vtab.to_string(vortex_bind.handle, map); + return result; + }; + + try { + // CreateFunctionSet returns a TableFunctionSet that bundles both the + // single-VARCHAR and LIST(VARCHAR) overloads (matching read_parquet's + // shape). This is what enables `read_vortex_v2(['a.vortex','b.vortex'])`. 
+ auto function_set = MultiFileReader::CreateFunctionSet(mff); + auto &system_catalog = Catalog::GetSystemCatalog(db); + auto data = CatalogTransaction::GetSystemTransaction(db); + CreateTableFunctionInfo tf_info(function_set); + tf_info.on_conflict = OnCreateConflict::ALTER_ON_CONFLICT; + system_catalog.CreateFunction(data, tf_info); + } catch (const std::exception &e) { + ErrorData err(e); + DUCKDB_LOG_ERROR(db, "Failed to create Vortex multi-file function:\t" + err.Message()); + return DuckDBError; + } + return DuckDBSuccess; +} diff --git a/vortex-duckdb/cpp/object_cache.cpp b/vortex-duckdb/cpp/object_cache.cpp new file mode 100644 index 00000000000..953a1b63ca1 --- /dev/null +++ b/vortex-duckdb/cpp/object_cache.cpp @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "duckdb_vx/object_cache.h" + +DUCKDB_INCLUDES_BEGIN +#include +#include +#include +DUCKDB_INCLUDES_END + +#include +#include + +namespace { + +class VortexObjectCacheEntry final : public duckdb::ObjectCacheEntry { +public: + VortexObjectCacheEntry(std::string object_type_p, + duckdb::idx_t estimated_memory_p, + void *data_p, + duckdb_delete_callback_t delete_callback_p) + : object_type(std::move(object_type_p)), estimated_memory(estimated_memory_p), data(data_p), + delete_callback(delete_callback_p) { + } + + ~VortexObjectCacheEntry() override { + if (delete_callback && data) { + delete_callback(data); + } + } + + std::string GetObjectType() override { + return object_type; + } + + duckdb::optional_idx GetEstimatedCacheMemory() const override { + return estimated_memory; + } + + void *GetData() const { + return data; + } + +private: + std::string object_type; + duckdb::idx_t estimated_memory; + void *data; + duckdb_delete_callback_t delete_callback; +}; + +} // namespace + +struct duckdb_vx_object_cache_entry_ { + explicit duckdb_vx_object_cache_entry_(duckdb::shared_ptr entry_p) + : entry(std::move(entry_p)) { + } + + 
duckdb::shared_ptr entry; +}; + +extern "C" duckdb_vx_object_cache_entry duckdb_vx_object_cache_get(duckdb_client_context ctx, + const char *key, + size_t key_len, + const char *object_type) { + if (!ctx || !key || !object_type) { + return nullptr; + } + + try { + auto &context = *reinterpret_cast(ctx); + auto object = duckdb::ObjectCache::GetObjectCache(context).GetObject(std::string(key, key_len)); + if (!object || object->GetObjectType() != object_type) { + return nullptr; + } + if (!dynamic_cast(object.get())) { + return nullptr; + } + return new duckdb_vx_object_cache_entry_(std::move(object)); + } catch (...) { + return nullptr; + } +} + +extern "C" void *duckdb_vx_object_cache_entry_get_data(duckdb_vx_object_cache_entry entry) { + if (!entry) { + return nullptr; + } + + auto *vortex_entry = dynamic_cast(entry->entry.get()); + return vortex_entry ? vortex_entry->GetData() : nullptr; +} + +extern "C" void duckdb_vx_object_cache_entry_free(duckdb_vx_object_cache_entry *entry) { + if (!entry || !*entry) { + return; + } + delete *entry; + *entry = nullptr; +} + +extern "C" duckdb_state duckdb_vx_object_cache_put(duckdb_client_context ctx, + const char *key, + size_t key_len, + const char *object_type, + idx_t estimated_memory, + void *data, + duckdb_delete_callback_t delete_callback) { + bool entry_created = false; + try { + if (!ctx || !key || !object_type || !data) { + if (delete_callback && data) { + delete_callback(data); + } + return DuckDBError; + } + + auto &context = *reinterpret_cast(ctx); + auto entry = duckdb::make_shared_ptr( + object_type, estimated_memory, data, delete_callback); + entry_created = true; + duckdb::ObjectCache::GetObjectCache(context).Put(std::string(key, key_len), std::move(entry)); + return DuckDBSuccess; + } catch (...) 
{ + if (!entry_created && delete_callback && data) { + delete_callback(data); + } + return DuckDBError; + } +} diff --git a/vortex-duckdb/src/datasource.rs b/vortex-duckdb/src/datasource.rs index 1f07155e1f6..8921466bf40 100644 --- a/vortex-duckdb/src/datasource.rs +++ b/vortex-duckdb/src/datasource.rs @@ -63,7 +63,6 @@ use vortex_utils::parallelism::get_available_parallelism; use crate::RUNTIME; use crate::SESSION; use crate::convert::ToDuckDBScalar; -use crate::convert::try_from_bound_expression; use crate::convert::try_from_table_filter; use crate::convert::try_from_virtual_column_filter; use crate::duckdb::BindInputRef; @@ -497,23 +496,13 @@ impl TableFunction for T { } fn pushdown_complex_filter( - bind_data: &mut Self::BindData, + _bind_data: &mut Self::BindData, expr: &ExpressionRef, ) -> VortexResult { tracing::debug!("Attempting to push down filter expression: {expr}"); - let Some(expr) = try_from_bound_expression(expr)? else { - return Ok(false); - }; - bind_data.filter_exprs.push(expr); - - // NOTE(ngates): Vortex does indeed run exact filters, so in theory we should return `true` - // here to tell DuckDB we've handled the filter. However, DuckDB applies some crude - // cardinality estimation heuristics (e.g. an equality filter => 20% selectivity) that - // means by returning false, DuckDB runs an additional filter (a little bit of overhead) - // but tends to end up with a better query plan. - // If we plumb row count estimation into the layout tree, perhaps we could use zone maps - // etc. to return estimates. But this function is probably called too late anyway. Maybe - // we need our own cardinality heuristics. + // Returning false keeps DuckDB's cardinality heuristics in the plan. Since DuckDB will + // still evaluate the filter, do not also push it into Vortex: that only evaluates the same + // predicate twice. 
Ok(false) } diff --git a/vortex-duckdb/src/duckdb/mod.rs b/vortex-duckdb/src/duckdb/mod.rs index c42fbdaf1e4..5fb36350136 100644 --- a/vortex-duckdb/src/duckdb/mod.rs +++ b/vortex-duckdb/src/duckdb/mod.rs @@ -13,6 +13,8 @@ mod expr; mod file_system; mod logical_type; mod macro_; +mod multi_file_function; +mod object_cache; mod query_result; mod reusable_dict; mod scalar_function; @@ -37,6 +39,7 @@ pub use ddb_string::*; pub use expr::*; pub use file_system::*; pub use logical_type::*; +pub use multi_file_function::*; pub use query_result::*; pub use reusable_dict::*; pub use scalar_function::*; diff --git a/vortex-duckdb/src/duckdb/multi_file_function.rs b/vortex-duckdb/src/duckdb/multi_file_function.rs new file mode 100644 index 00000000000..8898f00e7a5 --- /dev/null +++ b/vortex-duckdb/src/duckdb/multi_file_function.rs @@ -0,0 +1,702 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Rust-side wrapper for DuckDB's `MultiFileFunction` template. +//! +//! Lets a table-function-like type plug into DuckDB's native multi-file machinery +//! (file globbing, virtual columns, hive partitioning, COPY support, etc.) by +//! supplying only what's format-specific: how to open a file, how to read its +//! schema, and how to scan a chunk. The cross-file orchestration is owned by +//! DuckDB. +//! +//! The trait pair mirrors the DuckDB C++ surface: +//! - [`MultiFileFunction`] ↔ `MultiFileReaderInterface` +//! - [`BaseFileReader`] ↔ `BaseFileReader` +//! +//! The wrapper is generic over one [`MultiFileFunction`] implementation so each +//! registered function gets statically-monomorphised callbacks (no per-call dyn +//! dispatch). +//! +//! Callback lifecycle, matching DuckDB's `MultiFileFunction` / Parquet reader +//! model: +//! +//! 1. Bind: [`MultiFileFunction::create_options`], +//! [`MultiFileFunction::initialize_bind_data`], and +//! [`MultiFileFunction::bind_reader`] run once to collect options, bind-time +//! 
state, and schema. +//! 2. Query init: [`MultiFileFunction::init_global`] and +//! [`MultiFileFunction::init_local`] create per-query and per-worker state. +//! 3. File open: [`MultiFileFunction::create_reader`] is called when DuckDB +//! decides to open a file. DuckDB does not hold the global multi-file +//! scheduling mutex while opening; it switches to a per-file mutex so other +//! workers can wait for that specific reader. +//! 4. Reader preparation: [`BaseFileReader::prepare_reader`] maps projection +//! and filters onto the opened reader. It happens once per reader before any +//! scan assignment for that reader. +//! 5. Scan assignment: [`BaseFileReader::try_initialize_scan`] is called while +//! DuckDB holds its global multi-file scheduling mutex. This must be a cheap +//! claim of one independent unit of work, e.g. a row group or row range. It +//! must not perform I/O, block on async work, or construct expensive scan +//! pipelines. +//! 6. Scan execution: DuckDB releases the scheduling mutex before calling its +//! `PrepareScan` hook, exposed here as [`BaseFileReader::prepare_scan`]. +//! Reader implementations should build per-assignment local scan state +//! there, then [`BaseFileReader::scan`] drains that local state into chunks. 
+ +use std::ffi::CStr; +use std::ffi::CString; +use std::fmt::Debug; +use std::ptr; +use std::slice; + +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::error::vortex_err; + +use crate::cpp; +use crate::duckdb::Cardinality; +use crate::duckdb::ClientContext; +use crate::duckdb::ClientContextRef; +use crate::duckdb::ColumnStatistics; +use crate::duckdb::DataChunk; +use crate::duckdb::DataChunkRef; +use crate::duckdb::DatabaseRef; +use crate::duckdb::DuckdbStringMap; +use crate::duckdb::DuckdbStringMapRef; +use crate::duckdb::ExpressionRef; +use crate::duckdb::LogicalTypeRef; +use crate::duckdb::TableFilterSet; +use crate::duckdb::TableFilterSetRef; +use crate::duckdb::try_or; +use crate::duckdb_try; + +/// A table function backed by DuckDB's `MultiFileFunction` template. +/// +/// Implementors describe a per-format reader; DuckDB owns the cross-file +/// orchestration (globbing, parallelism, virtual columns). +pub trait MultiFileFunction: Sized + Debug { + /// Per-format options collected from the `TABLE(...)` named parameters. + /// For minimal implementations this can be a unit struct. + type ReaderOptions: Send + Sync; + + /// Bind-time data, populated from options and (the schema of) the first + /// file. Must be `Send` because DuckDB may move it across threads. + type BindData: Clone + Send; + + /// Global state for one query invocation. Shared across worker threads. + type GlobalState: Send + Sync; + + /// Per-thread local state. + type LocalState; + + /// Per-file reader. Created when DuckDB first opens a file, dropped when + /// scanning of that file finishes. DuckDB stores it in a shared pointer and + /// may call read-only scan callbacks from multiple workers, so shared + /// callbacks must be thread-safe. + type Reader: BaseFileReader + Sync; + + /// Whether DuckDB may pass pushed table filters to + /// [`BaseFileReader::prepare_reader`]. 
+ const FILTER_PUSHDOWN: bool = false; + + /// Whether DuckDB may omit filter-only columns from final table-scan + /// output. + /// + /// Only meaningful when [`Self::FILTER_PUSHDOWN`] is true. + const FILTER_PRUNE: bool = false; + + /// Construct default options. Called once per bind. + fn create_options(ctx: &ClientContextRef) -> VortexResult; + + /// Push a complex filter expression into bind data. + /// + /// Returning `true` tells DuckDB the filter is handled exactly and may be + /// removed from the remaining plan. Returning `false` leaves it for DuckDB + /// to apply above the scan or turn into a regular table filter. + fn pushdown_complex_filter( + bind_data: &mut Self::BindData, + expr: &ExpressionRef, + ) -> VortexResult { + let _ = (bind_data, expr); + Ok(false) + } + + /// Build bind data from options. Takes ownership of the options struct. + fn initialize_bind_data(options: Self::ReaderOptions) -> VortexResult; + + /// Populate the result schema. DuckDB picks the first file in the file list + /// to bind against; the implementation should open it (cheaply, metadata- + /// only if possible), record any bind-time metadata it needs, and append + /// columns to `schema`. + fn bind_reader( + ctx: &ClientContextRef, + bind_data: &mut Self::BindData, + first_file: &str, + schema: &mut SchemaBuilder, + ) -> VortexResult<()>; + + /// Initialize global state for one query. + fn init_global( + ctx: &ClientContextRef, + bind_data: &Self::BindData, + ) -> VortexResult; + + /// Initialize per-thread state. + fn init_local(global: &Self::GlobalState) -> Self::LocalState; + + /// Open a per-file reader. Called once per file, on the thread that won the + /// race to open it. DuckDB has dropped the global multi-file scheduling + /// mutex before this call, but holds a per-file mutex for this reader. It is + /// reasonable to open file metadata here; do not do per-scan or per-split + /// work here because projection/filter state is not fully prepared yet. 
+ fn create_reader( + ctx: &ClientContextRef, + global: &Self::GlobalState, + bind_data: &Self::BindData, + file_path: &str, + file_idx: usize, + ) -> VortexResult; + + /// Estimated cardinality across `file_count` files. Default returns + /// [`Cardinality::Unknown`] (DuckDB falls back to its own heuristic). + fn cardinality(_bind_data: &Self::BindData, _file_count: usize) -> Cardinality { + Cardinality::Unknown + } + + /// Exact partition statistics for a file, if already available cheaply. + /// + /// DuckDB uses these to fold aggregates such as `COUNT(*)` during + /// optimization. Returning `None` leaves the scan plan unchanged. + fn partition_stats( + _ctx: &ClientContextRef, + _bind_data: &Self::BindData, + _file_path: &str, + ) -> VortexResult> { + Ok(None) + } + + /// Per-column statistics available from bind-time metadata. Default + /// returns `None`. + fn statistics(_bind_data: &Self::BindData, _name: &str) -> Option { + None + } + + /// Populate the bind-time EXPLAIN map with key/value pairs (typical keys: + /// `Function`, `Files`, `Projection`, `Filters`). Default no-op. + fn to_string(_bind_data: &Self::BindData, _map: &mut DuckdbStringMapRef) {} +} + +/// Exact per-file partition statistics exposed to DuckDB's optimizer. +#[derive(Clone, Copy, Debug)] +pub struct PartitionStats { + /// Exact number of rows in this file. + pub row_count: u64, +} + +/// A column DuckDB asks a [`BaseFileReader`] to produce in the intermediate +/// scan chunk. +#[derive(Clone, Copy, Debug)] +pub struct ProjectedColumn<'a> { + /// Column name in the file-local scan chunk. + pub name: &'a str, + /// DuckDB column id. Physical columns use a file-local id; virtual columns + /// use DuckDB's global virtual column id. + pub column_id: u64, + /// True when this column is one of DuckDB's virtual columns. + pub is_virtual: bool, + /// True when DuckDB's final output expressions reference this column. 
+ /// + /// False columns are filter-only: the reader may use them for pushed filter + /// evaluation, but does not need to materialize them into the scan chunk. + pub is_projected: bool, +} + +/// Per-file reader contract. Implementations are owned by DuckDB once handed +/// off via [`MultiFileFunction::create_reader`] and dropped when scanning of +/// that file completes. +/// +/// DuckDB calls [`Self::try_initialize_scan`] while holding its global +/// multi-file lock. That method should claim one independent unit of scan work +/// and store only its descriptor in `LocalState`. [`Self::prepare_scan`] then +/// initializes actual per-worker state outside that lock. [`Self::scan`] drains +/// only that local state and may overlap with later +/// [`Self::try_initialize_scan`] calls on the same reader. +pub trait BaseFileReader { + /// Configure projection and filter pushdown. Called once after the reader + /// is created and before any [`Self::try_initialize_scan`] call. + /// `projection` is the ordered list of intermediate scan columns DuckDB + /// allocated for this reader. Filter-only columns have + /// [`ProjectedColumn::is_projected`] set to false. `filters` carries any + /// filters DuckDB pushed down for this scan. + /// + /// Default: no-op (reader scans all columns, no filter pushdown). + fn prepare_reader( + &mut self, + projection: &[ProjectedColumn<'_>], + filters: Option<&TableFilterSetRef>, + ) -> VortexResult<()> { + let _ = (projection, filters); + Ok(()) + } + + /// Set up scan state for the next batch. Called under DuckDB's global + /// multi-file scheduling lock; this should only claim work into `local`. + /// Do not open readers, call `block_on`, or construct scan iterators here. + /// Return `false` once exhausted. + fn try_initialize_scan( + &self, + global: &GlobalState, + local: &mut LocalState, + ) -> VortexResult; + + /// Initialize local scan state for the work claimed by + /// [`Self::try_initialize_scan`]. 
DuckDB calls this outside its global + /// multi-file scheduling lock, so implementations may open per-split + /// iterators, block on async setup, or build scan pipelines here. + /// + /// Default: no-op. + fn prepare_scan(&self, global: &GlobalState, local: &mut LocalState) -> VortexResult<()> { + let _ = (global, local); + Ok(()) + } + + /// Produce the next batch into `chunk`. Setting `chunk` to size 0 signals + /// end-of-assignment; otherwise non-empty implies more may follow. This is + /// called outside DuckDB's global multi-file scheduling lock after + /// [`Self::prepare_scan`]. + fn scan( + &self, + global: &GlobalState, + local: &mut LocalState, + chunk: &mut DataChunkRef, + ) -> VortexResult<()>; + + /// Per-column statistics by name. Default returns `None`. + fn get_statistics(&self, _name: &str) -> Option { + None + } + + /// Scan progress within this file in `[0.0, 100.0]`. Default `0.0`. + fn progress_in_file(&self) -> f64 { + 0.0 + } +} + +/// Append-only schema builder passed to [`MultiFileFunction::bind_reader`]. +/// +/// Wraps the C++ `vector` / `vector` pair via +/// `duckdb_vx_mff_schema_writer_add_column`. +pub struct SchemaBuilder { + raw: cpp::duckdb_vx_mff_schema_writer, +} + +impl SchemaBuilder { + /// Append `(name, type)` to the result schema. + pub fn add_column(&mut self, name: &str, logical_type: &LogicalTypeRef) { + unsafe { + cpp::duckdb_vx_mff_schema_writer_add_column( + self.raw, + name.as_ptr().cast(), + name.len(), + logical_type.as_ptr(), + ); + } + } +} + +impl DatabaseRef { + /// Register `T` as a multi-file table function on this database under + /// `name`. + /// + /// The vtable is statically derived from `T` and copied into a C++ + /// `TableFunctionInfo` owned by the catalog; `T` itself is never instanced. 
+ pub fn register_multi_file_function( + &self, + name: &CStr, + ) -> VortexResult<()> { + let vtab = cpp::duckdb_vx_mff_vtab_t { + name: name.as_ptr(), + filter_pushdown: T::FILTER_PUSHDOWN, + filter_prune: T::FILTER_PRUNE, + pushdown_complex_filter: Some(pushdown_complex_filter::), + create_options: Some(create_options::), + free_options: Some(free_options::), + initialize_bind_data: Some(initialize_bind_data::), + clone_bind_data: Some(clone_bind_data::), + free_bind_data: Some(free_bind_data::), + bind_reader: Some(bind_reader::), + init_global: Some(init_global::), + free_global: Some(free_global::), + init_local: Some(init_local::), + free_local: Some(free_local::), + create_reader: Some(create_reader::), + free_reader: Some(free_reader::), + prepare_reader: Some(prepare_reader::), + try_initialize_scan: Some(try_initialize_scan::), + prepare_scan: Some(prepare_scan::), + scan: Some(scan::), + statistics: Some(statistics::), + get_statistics: Some(get_statistics::), + progress_in_file: Some(progress_in_file::), + cardinality: Some(cardinality::), + partition_stats: Some(partition_stats::), + to_string: Some(to_string::), + }; + + duckdb_try!( + unsafe { cpp::duckdb_vx_mff_register(self.as_ptr(), &raw const vtab) }, + "Failed to register multi-file function '{}'", + name.to_string_lossy() + ); + + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// FFI shim: each callback boxes/unboxes the trait's associated type and +// dispatches to the corresponding trait method. 
+// --------------------------------------------------------------------------- + +unsafe extern "C-unwind" fn create_options( + ctx: cpp::duckdb_client_context, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_options { + let ctx = unsafe { ClientContext::borrow(ctx) }; + try_or(error_out, || { + let opts = T::create_options(ctx)?; + Ok(Box::into_raw(Box::new(opts)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_options(opts: cpp::duckdb_vx_mff_options) { + if !opts.is_null() { + drop(unsafe { Box::from_raw(opts.cast::()) }); + } +} + +unsafe extern "C-unwind" fn initialize_bind_data( + opts: cpp::duckdb_vx_mff_options, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_bind_data { + let opts = unsafe { Box::from_raw(opts.cast::()) }; + try_or(error_out, || { + let bind_data = T::initialize_bind_data(*opts)?; + Ok(Box::into_raw(Box::new(bind_data)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_bind_data( + bind_data: cpp::duckdb_vx_mff_bind_data, +) { + if !bind_data.is_null() { + drop(unsafe { Box::from_raw(bind_data.cast::()) }); + } +} + +unsafe extern "C-unwind" fn clone_bind_data( + bind_data: cpp::duckdb_vx_mff_bind_data, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_bind_data { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + Ok(Box::into_raw(Box::new(bind_data.clone())).cast()) + }) +} + +unsafe extern "C-unwind" fn pushdown_complex_filter( + bind_data: cpp::duckdb_vx_mff_bind_data, + expr: cpp::duckdb_vx_expr, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let bind_data = + unsafe { bind_data.cast::().as_mut() }.vortex_expect("bind_data null"); + let expr = unsafe { crate::duckdb::Expression::borrow(expr) }; + try_or(error_out, || T::pushdown_complex_filter(bind_data, expr)) +} + +unsafe extern "C-unwind" fn bind_reader( + ctx: cpp::duckdb_client_context, + bind_data: cpp::duckdb_vx_mff_bind_data, + file_path: *const 
std::os::raw::c_char, + path_len: usize, + schema_writer: cpp::duckdb_vx_mff_schema_writer, + error_out: *mut cpp::duckdb_vx_error, +) { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_mut() }.vortex_expect("bind_data null"); + let mut builder = SchemaBuilder { raw: schema_writer }; + try_or(error_out, || { + let path_bytes = unsafe { slice::from_raw_parts(file_path.cast::(), path_len) }; + let path = std::str::from_utf8(path_bytes) + .map_err(|e| vortex_err!("file path is not UTF-8: {e}"))?; + T::bind_reader(ctx, bind_data, path, &mut builder) + }) +} + +unsafe extern "C-unwind" fn init_global( + ctx: cpp::duckdb_client_context, + bind_data: cpp::duckdb_vx_mff_bind_data, + error_out: *mut cpp::duckdb_vx_error, +) -> cpp::duckdb_vx_mff_global { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + let global = T::init_global(ctx, bind_data)?; + Ok(Box::into_raw(Box::new(global)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_global(global: cpp::duckdb_vx_mff_global) { + if !global.is_null() { + drop(unsafe { Box::from_raw(global.cast::()) }); + } +} + +unsafe extern "C-unwind" fn init_local( + global: cpp::duckdb_vx_mff_global, +) -> cpp::duckdb_vx_mff_local { + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = T::init_local(global); + Box::into_raw(Box::new(local)).cast() +} + +unsafe extern "C-unwind" fn free_local(local: cpp::duckdb_vx_mff_local) { + if !local.is_null() { + drop(unsafe { Box::from_raw(local.cast::()) }); + } +} + +#[allow(clippy::too_many_arguments)] +unsafe extern "C-unwind" fn create_reader( + ctx: cpp::duckdb_client_context, + global: cpp::duckdb_vx_mff_global, + bind_data: cpp::duckdb_vx_mff_bind_data, + file_path: *const std::os::raw::c_char, + path_len: usize, + file_idx: usize, + error_out: *mut cpp::duckdb_vx_error, +) -> 
cpp::duckdb_vx_mff_reader { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + let path_bytes = unsafe { slice::from_raw_parts(file_path.cast::(), path_len) }; + let path = std::str::from_utf8(path_bytes) + .map_err(|e| vortex_err!("file path is not UTF-8: {e}"))?; + let reader = T::create_reader(ctx, global, bind_data, path, file_idx)?; + Ok(Box::into_raw(Box::new(reader)).cast()) + }) +} + +unsafe extern "C-unwind" fn free_reader(reader: cpp::duckdb_vx_mff_reader) { + if !reader.is_null() { + drop(unsafe { Box::from_raw(reader.cast::()) }); + } +} + +unsafe extern "C-unwind" fn prepare_reader( + reader: cpp::duckdb_vx_mff_reader, + projection: *const cpp::duckdb_vx_mff_column, + projection_count: usize, + filters: cpp::duckdb_vx_table_filter_set, + error_out: *mut cpp::duckdb_vx_error, +) { + let reader = unsafe { reader.cast::().as_mut() }.vortex_expect("reader null"); + let filter_ref = if filters.is_null() { + None + } else { + Some(unsafe { TableFilterSet::borrow(filters) }) + }; + try_or(error_out, || { + // Materialize column metadata with &str borrows scoped to this call. 
+ let mut projected_columns = Vec::with_capacity(projection_count); + for i in 0..projection_count { + let col = unsafe { &*projection.add(i) }; + let bytes = unsafe { slice::from_raw_parts(col.name.cast::(), col.name_len) }; + let name = std::str::from_utf8(bytes) + .map_err(|e| vortex_err!("projection column name not UTF-8: {e}"))?; + projected_columns.push(ProjectedColumn { + name, + column_id: col.column_id, + is_virtual: col.is_virtual, + is_projected: col.is_projected, + }); + } + reader.prepare_reader(&projected_columns, filter_ref) + }); +} + +unsafe extern "C-unwind" fn try_initialize_scan( + reader: cpp::duckdb_vx_mff_reader, + global: cpp::duckdb_vx_mff_global, + local: cpp::duckdb_vx_mff_local, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = unsafe { local.cast::().as_mut() }.vortex_expect("local null"); + try_or(error_out, || reader.try_initialize_scan(global, local)) +} + +unsafe extern "C-unwind" fn prepare_scan( + reader: cpp::duckdb_vx_mff_reader, + global: cpp::duckdb_vx_mff_global, + local: cpp::duckdb_vx_mff_local, + error_out: *mut cpp::duckdb_vx_error, +) { + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = unsafe { local.cast::().as_mut() }.vortex_expect("local null"); + try_or(error_out, || reader.prepare_scan(global, local)) +} + +unsafe extern "C-unwind" fn scan( + reader: cpp::duckdb_vx_mff_reader, + global: cpp::duckdb_vx_mff_global, + local: cpp::duckdb_vx_mff_local, + chunk: cpp::duckdb_data_chunk, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); + let global = unsafe { global.cast::().as_ref() }.vortex_expect("global null"); + let local = unsafe { 
local.cast::().as_mut() }.vortex_expect("local null"); + let chunk_ref = unsafe { DataChunk::borrow_mut(chunk) }; + match reader.scan(global, local, chunk_ref) { + Ok(()) => { + unsafe { error_out.write(ptr::null_mut()) }; + true + } + Err(err) => { + let msg = err.to_string(); + unsafe { error_out.write(cpp::duckdb_vx_error_create(msg.as_ptr().cast(), msg.len())) }; + false + } + } +} + +unsafe extern "C-unwind" fn get_statistics( + reader: cpp::duckdb_vx_mff_reader, + name: *const std::os::raw::c_char, + name_len: usize, + stats_out: *mut cpp::duckdb_column_statistics, +) -> bool { + let reader = unsafe { reader.cast::().as_ref() }.vortex_expect("reader null"); + let name = unsafe { slice::from_raw_parts(name.cast::(), name_len) }; + let Ok(name) = std::str::from_utf8(name) else { + return false; + }; + let Some(stats) = reader.get_statistics(name) else { + return false; + }; + write_column_statistics(stats_out, stats); + true +} + +unsafe extern "C-unwind" fn statistics( + bind_data: cpp::duckdb_vx_mff_bind_data, + name: *const std::os::raw::c_char, + name_len: usize, + stats_out: *mut cpp::duckdb_column_statistics, +) -> bool { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + let name = unsafe { slice::from_raw_parts(name.cast::(), name_len) }; + let Ok(name) = std::str::from_utf8(name) else { + return false; + }; + let Some(stats) = T::statistics(bind_data, name) else { + return false; + }; + write_column_statistics(stats_out, stats); + true +} + +fn write_column_statistics(stats_out: *mut cpp::duckdb_column_statistics, stats: ColumnStatistics) { + let out = unsafe { &mut *stats_out }; + out.min = stats.min.map_or(ptr::null_mut(), |v| v.into_ptr()); + out.max = stats.max.map_or(ptr::null_mut(), |v| v.into_ptr()); + out.max_string_length = stats.max_string_length; + out.has_null = stats.has_null; +} + +unsafe extern "C-unwind" fn progress_in_file( + reader: cpp::duckdb_vx_mff_reader, +) -> f64 { + let reader = unsafe 
{ reader.cast::().as_ref() }.vortex_expect("reader null"); + reader.progress_in_file() +} + +unsafe extern "C-unwind" fn cardinality( + bind_data: cpp::duckdb_vx_mff_bind_data, + file_count: usize, + out: *mut cpp::duckdb_vx_node_statistics, +) -> bool { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + let out = unsafe { &mut *out }; + match T::cardinality(bind_data, file_count) { + Cardinality::Unknown => false, + Cardinality::Estimate(c) => { + out.has_estimated_cardinality = true; + out.estimated_cardinality = c; + true + } + Cardinality::Maximum(c) => { + out.has_max_cardinality = true; + out.max_cardinality = c; + out.has_estimated_cardinality = true; + out.estimated_cardinality = c; + true + } + } +} + +unsafe extern "C-unwind" fn partition_stats( + ctx: cpp::duckdb_client_context, + bind_data: cpp::duckdb_vx_mff_bind_data, + file_path: *const std::os::raw::c_char, + path_len: usize, + out: *mut cpp::duckdb_vx_mff_partition_stats, + error_out: *mut cpp::duckdb_vx_error, +) -> bool { + let ctx = unsafe { ClientContext::borrow(ctx) }; + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + try_or(error_out, || { + let path_bytes = unsafe { slice::from_raw_parts(file_path.cast::(), path_len) }; + let path = std::str::from_utf8(path_bytes) + .map_err(|e| vortex_err!("file path is not UTF-8: {e}"))?; + let Some(stats) = T::partition_stats(ctx, bind_data, path)? 
else { + return Ok(false); + }; + let out = unsafe { &mut *out }; + out.row_count = stats.row_count; + Ok(true) + }) +} + +unsafe extern "C-unwind" fn to_string( + bind_data: cpp::duckdb_vx_mff_bind_data, + map: cpp::duckdb_vx_string_map, +) { + let bind_data = + unsafe { bind_data.cast::().as_ref() }.vortex_expect("bind_data null"); + let map = unsafe { DuckdbStringMap::borrow_mut(map) }; + T::to_string(bind_data, map); +} + +// --------------------------------------------------------------------------- +// Helpers used by Phase 5 (concrete implementations). +// --------------------------------------------------------------------------- + +/// Build a `CStr` literal-equivalent at runtime. Convenient for type names +/// passed to `LogicalType` / DuckDB FFI. +#[allow(dead_code)] +pub(crate) fn cstring(s: &str) -> CString { + CString::new(s).unwrap_or_else(|_| CString::default()) +} diff --git a/vortex-duckdb/src/duckdb/object_cache.rs b/vortex-duckdb/src/duckdb/object_cache.rs new file mode 100644 index 00000000000..67b7840836b --- /dev/null +++ b/vortex-duckdb/src/duckdb/object_cache.rs @@ -0,0 +1,132 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ffi::CStr; +use std::ffi::CString; + +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::error::vortex_err; + +use crate::cpp; +use crate::duckdb::ClientContextRef; +use crate::duckdb::drop_boxed; +use crate::lifetime_wrapper; + +lifetime_wrapper!( + /// A borrowed DuckDB object-cache entry handle. + ObjectCacheEntry, + cpp::duckdb_vx_object_cache_entry, + cpp::duckdb_vx_object_cache_entry_free +); + +impl ObjectCacheEntryRef { + fn data_ptr(&self) -> *mut std::ffi::c_void { + unsafe { cpp::duckdb_vx_object_cache_entry_get_data(self.as_ptr()) } + } +} + +impl ClientContextRef { + /// Retrieve a cloned Rust value from DuckDB's per-database object cache. 
+ /// + /// `object_type` must match the type string used when storing the value. + /// + /// # Safety + /// + /// The caller must ensure that every object stored under the `(key, + /// object_type)` pair was inserted as the same Rust type `T`. + pub unsafe fn object_cache_get_cloned( + &self, + key: &str, + object_type: &CStr, + ) -> VortexResult> { + let key = cache_key(key)?; + let entry = unsafe { + cpp::duckdb_vx_object_cache_get( + self.as_ptr(), + key.as_ptr(), + key.as_bytes().len(), + object_type.as_ptr(), + ) + }; + if entry.is_null() { + return Ok(None); + } + + let entry = unsafe { ObjectCacheEntry::own(entry) }; + let data = entry.data_ptr(); + if data.is_null() { + return Ok(None); + } + + Ok(Some(unsafe { (&*data.cast::()).clone() })) + } + + /// Store a Rust value in DuckDB's per-database object cache. + /// + /// `estimated_memory` is reported to DuckDB's object cache in bytes for + /// eviction accounting. + pub fn object_cache_put( + &self, + key: &str, + object_type: &CStr, + estimated_memory: usize, + value: T, + ) -> VortexResult<()> { + let key = cache_key(key)?; + let estimated_memory = cpp::idx_t::try_from(estimated_memory) + .map_err(|_| vortex_err!("object cache memory estimate does not fit idx_t"))?; + let data = Box::into_raw(Box::new(value)); + let state = unsafe { + cpp::duckdb_vx_object_cache_put( + self.as_ptr(), + key.as_ptr(), + key.as_bytes().len(), + object_type.as_ptr(), + estimated_memory, + data.cast(), + Some(drop_boxed::), + ) + }; + if state != cpp::duckdb_state::DuckDBSuccess { + vortex_bail!("failed to store object in DuckDB object cache"); + } + Ok(()) + } +} + +fn cache_key(key: &str) -> VortexResult { + CString::new(key).map_err(|_| vortex_err!("object cache key contains an interior NUL byte")) +} + +#[cfg(test)] +mod tests { + use vortex::error::VortexResult; + + use crate::duckdb::Database; + + #[test] + fn object_cache_round_trip_clones_stored_value() -> VortexResult<()> { + let db = Database::open_in_memory()?; + 
let conn = db.connect()?; + let ctx = conn.client_context()?; + + assert_eq!( + unsafe { ctx.object_cache_get_cloned::("cache-key", c"vortex_test") }?, + None + ); + + ctx.object_cache_put("cache-key", c"vortex_test", 5, String::from("value"))?; + + assert_eq!( + unsafe { ctx.object_cache_get_cloned::("cache-key", c"vortex_test") }?, + Some(String::from("value")) + ); + assert_eq!( + unsafe { ctx.object_cache_get_cloned::("cache-key", c"other_type") }?, + None + ); + + Ok(()) + } +} diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 8e65d26ed6f..2c0b2c16481 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -3,7 +3,6 @@ //! This module contains tests for the `vortex_scan` table function. -use std::ffi::CStr; use std::io::Write; use std::net::TcpListener; use std::path::Path; @@ -185,8 +184,7 @@ fn test_scan_function_registration() { let chunk = result.into_iter().next().unwrap(); let vec = chunk.get_vector(0); let mut result = vec.as_slice_with_len::(chunk.len().as_())[0]; - let string = - unsafe { CStr::from_ptr(cpp::duckdb_string_t_data(&raw mut result)).to_string_lossy() }; + let string = String::from_duckdb_value(&mut result); assert_eq!(string, "vortex_scan"); } @@ -993,3 +991,407 @@ fn test_vortex_encodings_roundtrip() { let fixed_child_values = fixed_child.as_slice_with_len::(10); // 10 total child elements assert_eq!(fixed_child_values, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); } + +#[test] +fn test_read_vortex_v2_basic() { + let file = RUNTIME.block_on(async { + let numbers = buffer![1i32, 2, 3, 4, 5]; + write_single_column_vortex_file("number", numbers).await + }); + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT SUM(number) FROM read_vortex_v2('{file_path}')" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = 
chunk.get_vector(0); + let sum = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(sum, 15); +} + +#[test] +fn test_read_vortex_v2_strings() { + let file = RUNTIME.block_on(async { + let strings = VarBinArray::from(vec!["alpha", "beta", "gamma"]); + write_single_column_vortex_file("s", strings).await + }); + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT string_agg(s, ',') FROM read_vortex_v2('{file_path}')" + )) + .unwrap(); + let mut chunk = result.into_iter().next().unwrap(); + let len = chunk.len().as_(); + let vec = chunk.get_vector_mut(0); + let mut s = unsafe { vec.as_slice_mut::(len) }[0]; + let aggregated = String::from_duckdb_value(&mut s); + assert_eq!(aggregated, "alpha,beta,gamma"); +} + +#[test] +fn test_read_vortex_v2_multiple_files() { + let (tempdir, _f1, _f2) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let f1 = write_vortex_file_to_dir(tempdir.path(), "numbers", buffer![10i32, 20, 30]).await; + let f2 = write_vortex_file_to_dir(tempdir.path(), "numbers", buffer![40i32, 50, 60]).await; + (tempdir, f1, f2) + }); + + let glob_pattern = format!("{}/*.vortex", tempdir.path().display()); + let conn = database_connection(); + let result = conn + .query(&format!( + "SELECT SUM(numbers) FROM read_vortex_v2('{glob_pattern}')" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = chunk.get_vector(0); + let total = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(total, 210); +} + +#[test] +fn test_read_vortex_v2_filters_on_unprojected_column() { + let file = RUNTIME.block_on(async { + write_vortex_file( + [ + ( + "payload", + PrimitiveArray::from_iter([10i32, 20, 30, 40, 50]), + ), + ("unused_a", PrimitiveArray::from_iter([1i32, 1, 1, 1, 1])), + ("unused_b", PrimitiveArray::from_iter([2i32, 2, 2, 2, 2])), + ("filter_key", PrimitiveArray::from_iter([1i32, 2, 3, 4, 5])), + ] + .into_iter(), + ) + 
.await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT SUM(payload) FROM read_vortex_v2('{file_path}') WHERE filter_key <= 2" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = chunk.get_vector(0); + let total = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(total, 30); +} + +#[test] +fn test_read_vortex_v2_exposes_dynamic_filter_pushdown() { + let file = RUNTIME.block_on(async { + let numbers = PrimitiveArray::from_iter(0i32..10_000); + write_single_column_vortex_file("number", numbers).await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "EXPLAIN SELECT number FROM read_vortex_v2('{file_path}') ORDER BY number LIMIT 5" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for value in unsafe { vector.as_slice_mut::(len) } { + explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + explain.contains("Dynamic Filter"), + "expected read_vortex_v2 EXPLAIN to include a pushed dynamic filter, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_pushes_complex_contains_filter() { + let file = RUNTIME.block_on(async { + write_vortex_file( + [ + ( + "URL", + VarBinArray::from(vec![ + "https://example.com", + "https://www.google.com/search", + "https://mail.google.com", + "https://vortex.dev", + ]) + .into_array(), + ), + ( + "EventTime", + PrimitiveArray::from_iter([40i32, 30, 20, 10]).into_array(), + ), + ] + .into_iter(), + ) + .await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let count = conn + .query(&format!( + "SELECT COUNT(*) FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%'" + 
)) + .unwrap() + .into_iter() + .next() + .unwrap() + .get_vector(0) + .as_slice_with_len::(1)[0]; + assert_eq!(count, 2); + + let result = conn + .query(&format!( + "EXPLAIN SELECT * FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 10" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for value in unsafe { vector.as_slice_mut::(len) } { + explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + !explain.contains("│ FILTER │"), + "expected the URL contains filter to be removed from the standalone DuckDB FILTER, got:\n{explain}" + ); + assert!( + !explain.contains("contains(URL, 'google')"), + "expected the URL contains filter to be fully pushed into read_vortex_v2, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_leaves_non_consumed_complex_or_filter_to_duckdb() { + let file = RUNTIME.block_on(async { + write_vortex_file( + [ + ("a", PrimitiveArray::from_iter([1i32, 2, 3, 4]).into_array()), + ( + "b", + PrimitiveArray::from_iter([10i32, 20, 20, 40]).into_array(), + ), + ] + .into_iter(), + ) + .await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let count = conn + .query(&format!( + "SELECT COUNT(*) FROM read_vortex_v2('{file_path}') WHERE a = 1 OR b = 20" + )) + .unwrap() + .into_iter() + .next() + .unwrap() + .get_vector(0) + .as_slice_with_len::(1)[0]; + assert_eq!(count, 3); + + let result = conn + .query(&format!( + "EXPLAIN SELECT * FROM read_vortex_v2('{file_path}') WHERE a = 1 OR b = 20" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for value in unsafe { vector.as_slice_mut::(len) } { + 
explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + !explain.contains("$.a") && !explain.contains("$.b"), + "expected the non-consumed OR filter to stay out of read_vortex_v2, got:\n{explain}" + ); + assert!( + explain.contains("│ FILTER │"), + "expected DuckDB to keep its standalone OR filter for planning, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_uses_file_stats_for_join_filter_pushdown() { + let fact = RUNTIME.block_on(async { + write_single_column_vortex_file("k", PrimitiveArray::from_iter(0i32..1000)).await + }); + let dim = RUNTIME.block_on(async { + write_single_column_vortex_file("k", PrimitiveArray::from_iter(10i32..21)).await + }); + + let conn = database_connection(); + let fact_path = fact.path().to_string_lossy(); + let dim_path = dim.path().to_string_lossy(); + let count = conn + .query(&format!( + "SELECT COUNT(*) FROM read_vortex_v2('{fact_path}') fact JOIN read_vortex_v2('{dim_path}') dim USING (k)" + )) + .unwrap() + .into_iter() + .next() + .unwrap() + .get_vector(0) + .as_slice_with_len::(1)[0]; + assert_eq!(count, 11); + + let result = conn + .query(&format!( + "EXPLAIN SELECT COUNT(*) FROM read_vortex_v2('{fact_path}') fact JOIN read_vortex_v2('{dim_path}') dim USING (k)" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for value in unsafe { vector.as_slice_mut::(len) } { + explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + explain.contains("k>=10") && explain.contains("k<=20"), + "expected file statistics to produce a join-derived scan filter, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_uses_late_materialization_for_top_n() { + let file = RUNTIME.block_on(async { + write_vortex_file( + [ + ( + "URL", + VarBinArray::from(vec![ + "https://example.com", 
+ "https://www.google.com/search", + "https://mail.google.com", + "https://vortex.dev", + ]) + .into_array(), + ), + ( + "EventTime", + PrimitiveArray::from_iter([40i32, 30, 20, 10]).into_array(), + ), + ( + "WatchID", + PrimitiveArray::from_iter([100i64, 200, 300, 400]).into_array(), + ), + ( + "JavaEnable", + PrimitiveArray::from_iter([1i8, 0, 1, 0]).into_array(), + ), + ] + .into_iter(), + ) + .await + }); + + let conn = database_connection(); + let file_path = file.path().to_string_lossy(); + let result = conn + .query(&format!( + "SELECT string_agg(URL, ',') FROM (SELECT * FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 2)" + )) + .unwrap(); + let mut chunk = result.into_iter().next().unwrap(); + let len = chunk.len().as_(); + let vector = chunk.get_vector_mut(0); + let mut value = unsafe { vector.as_slice_mut::(len) }[0]; + assert_eq!( + String::from_duckdb_value(&mut value), + "https://mail.google.com,https://www.google.com/search" + ); + + let result = conn + .query(&format!( + "EXPLAIN SELECT * FROM read_vortex_v2('{file_path}') WHERE URL LIKE '%google%' ORDER BY EventTime LIMIT 2" + )) + .unwrap(); + + let mut explain = String::new(); + for mut chunk in result { + let len = chunk.len().as_(); + for column_idx in 0..chunk.column_count() { + let vector = chunk.get_vector_mut(column_idx); + for value in unsafe { vector.as_slice_mut::(len) } { + explain.push_str(&String::from_duckdb_value(value)); + explain.push('\n'); + } + } + } + + assert!( + explain.contains("HASH_JOIN") && explain.contains("SEMI"), + "expected read_vortex_v2 TopN plan to use a late-materialization semi join, got:\n{explain}" + ); + assert!( + explain.contains("file_row_number"), + "expected late materialization to project file_row_number as a row id, got:\n{explain}" + ); +} + +#[test] +fn test_read_vortex_v2_many_large_files_parallel_scan() { + let (tempdir, _files) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + let mut 
files = Vec::new(); + for file_idx in 0..32 { + let start = file_idx * 10_000; + let numbers = PrimitiveArray::from_iter(start..start + 10_000); + files.push(write_vortex_file_to_dir(tempdir.path(), "number", numbers).await); + } + (tempdir, files) + }); + + let conn = database_connection(); + conn.query("SET threads = 8").unwrap(); + + let glob_pattern = format!("{}/*.vortex", tempdir.path().display()); + let result = conn + .query(&format!( + "SELECT SUM(number) FROM read_vortex_v2('{glob_pattern}')" + )) + .unwrap(); + let chunk = result.into_iter().next().unwrap(); + let vec = chunk.get_vector(0); + let total = vec.as_slice_with_len::(chunk.len().as_())[0]; + assert_eq!(total, 51_199_840_000); +} diff --git a/vortex-duckdb/src/exporter/mod.rs b/vortex-duckdb/src/exporter/mod.rs index 517776f5521..7c8f2943be4 100644 --- a/vortex-duckdb/src/exporter/mod.rs +++ b/vortex-duckdb/src/exporter/mod.rs @@ -46,6 +46,11 @@ pub struct ArrayExporter { /// Columns DuckDB requested to read from file. If empty, it's a zero-column /// projection and should be handled accordingly, see ArrayExporter::export. fields: Vec>, + /// Optional sparse mapping from Vortex struct fields to DuckDB chunk + /// vectors. Used by DuckDB's multi-file scan when filter-only columns are + /// present in the intermediate chunk but not materialized by Vortex. 
+ field_positions: Option>, + chunk_column_count: Option, array_len: usize, remaining: usize, } @@ -67,6 +72,41 @@ impl ArrayExporter { Ok(Self { ctx, fields, + field_positions: None, + chunk_column_count: None, + array_len: array.len(), + remaining: array.len(), + }) + } + + pub fn try_new_with_positions( + array: &StructArray, + cache: &ConversionCache, + mut ctx: ExecutionCtx, + field_positions: Vec, + chunk_column_count: usize, + ) -> VortexResult { + let validity = array.validity()?.execute_mask(array.len(), &mut ctx)?; + assert!(validity.all_true()); + + let fields = array + .iter_unmasked_fields() + .map(|field| new_array_exporter(field.clone(), cache, &mut ctx)) + .collect::>>()?; + + if fields.len() != field_positions.len() { + vortex_bail!( + "Expected {} output positions for {} fields", + fields.len(), + field_positions.len() + ); + } + + Ok(Self { + ctx, + fields, + field_positions: Some(field_positions), + chunk_column_count: Some(chunk_column_count), array_len: array.len(), remaining: array.len(), }) @@ -88,6 +128,32 @@ impl ArrayExporter { let zero_projection = self.fields.is_empty(); + if let Some(field_positions) = &self.field_positions { + let expected_cols = self + .chunk_column_count + .vortex_expect("sparse exporter missing chunk column count"); + let chunk_cols = chunk.column_count(); + if chunk_cols != expected_cols { + vortex_bail!("Expected {expected_cols} columns in output chunk, got {chunk_cols}"); + } + + let chunk_len = duckdb_vector_size().min(self.remaining); + let position = self.array_len - self.remaining; + self.remaining -= chunk_len; + chunk.set_len(chunk_len); + + for (field, pos) in self.fields.iter().zip(field_positions.iter().copied()) { + field.export( + position, + chunk_len, + chunk.get_vector_mut(pos), + &mut self.ctx, + )?; + } + + return Ok(true); + } + // file_row_number column is already populated in scan construction let expected_cols = self.fields.len() + file_index_column_pos.is_some() as usize; let chunk_cols = 
chunk.column_count(); diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index 413a71c611e..902c501e3a0 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -22,6 +22,7 @@ use crate::duckdb::LogicalType; use crate::duckdb::Value; use crate::multi_file::VortexMultiFileScan; use crate::multi_file::VortexMultiFileScanList; +use crate::multi_file_function::VortexMultiFileFunction; mod convert; mod datasource; @@ -29,6 +30,7 @@ pub mod duckdb; mod exporter; mod filesystem; mod multi_file; +mod multi_file_function; #[rustfmt::skip] #[path = "./cpp.rs"] @@ -45,6 +47,32 @@ static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRunt static SESSION: LazyLock = LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +/// Returns true if the user has opted into the experimental MultiFileFunction- +/// backed scan path via `VX_DUCKDB_MULTI_FILE_FUNCTION=1` (or `=true`). +/// +/// Used to switch between the existing TableFunction-driven `read_vortex` and +/// the new `MultiFileFunction`-driven path during benchmarking. Defaults +/// to off so the existing scan remains the path of record. +/// +/// Known gaps in the v2 path (compared to v1) at time of writing: +/// - No batch parallelism within a file (`TryInitializeScan` is one-shot, so +/// each Vortex file is scanned by a single worker). +/// - No `union_by_name`, hive partitioning columns, or `filename` virtual +/// column wired through. +/// - No support for the named parameters DuckDB's `MultiFileReader` adds +/// (`union_by_name`, `hive_partitioning`, …) — `ParseOption` returns false. +/// - No `COPY ... FROM 'x.vortex'` via this path. +/// +/// These are tracked as follow-up work; for now `read_vortex_v2` exists +/// alongside `read_vortex` so orchestration paths can be benchmarked +/// side-by-side. 
+fn use_multi_file_function() -> bool { + matches!( + std::env::var("VX_DUCKDB_MULTI_FILE_FUNCTION").as_deref(), + Ok("1") | Ok("true") | Ok("TRUE") + ) +} + /// Initialize the Vortex extension by registering the extension functions. /// Note: This also registers extension options. If you want to register options /// separately (e.g., before creating connections), call `register_extension_options` first. @@ -55,11 +83,29 @@ pub fn initialize(db: &DatabaseRef) -> VortexResult<()> { LogicalType::varchar(), Value::from("vortex"), )?; - db.register_table_function::(c"vortex_scan")?; - db.register_table_function::(c"read_vortex")?; - // Register list overloads for multi-glob scanning (e.g., read_vortex(['a.vortex', 'b.vortex'])) - db.register_table_function::(c"vortex_scan")?; - db.register_table_function::(c"read_vortex")?; + db.config().add_extension_options( + "vortex_metadata_cache", + "Cache Vortex file metadata - useful when reading the same files multiple times.", + LogicalType::bool(), + Value::from(false), + )?; + if use_multi_file_function() { + // Replace the table-function-based scan with the MultiFileFunction + // path under the canonical names. Also expose under v2 names so an A/B + // test can run both registrations side-by-side. + db.register_multi_file_function::(c"vortex_scan")?; + db.register_multi_file_function::(c"read_vortex")?; + } else { + db.register_table_function::(c"vortex_scan")?; + db.register_table_function::(c"read_vortex")?; + // Register list overloads for multi-glob scanning (e.g., read_vortex(['a.vortex', 'b.vortex'])) + db.register_table_function::(c"vortex_scan")?; + db.register_table_function::(c"read_vortex")?; + } + // Always expose the v2 path under its own name so it can be invoked + // explicitly without flipping the env var (useful for A/B testing within + // a single process). 
+ db.register_multi_file_function::(c"read_vortex_v2")?; db.register_copy_function::(c"vortex", c"vortex") } diff --git a/vortex-duckdb/src/multi_file.rs b/vortex-duckdb/src/multi_file.rs index 3f99a854a22..2c4646aa08c 100644 --- a/vortex-duckdb/src/multi_file.rs +++ b/vortex-duckdb/src/multi_file.rs @@ -30,7 +30,7 @@ use crate::filesystem::resolve_filesystem; /// Accepts full URLs (e.g. `s3://bucket/prefix/*.vortex`, `file:///data/*.vortex`) as well as /// bare file paths. For bare paths, the path is made absolute (without requiring it to exist) /// so that relative paths such as `./data/*.vortex` or `../data/*.vortex` are resolved correctly. -fn parse_glob_url(glob_url_str: &str) -> VortexResult { +pub(crate) fn parse_glob_url(glob_url_str: &str) -> VortexResult { Url::parse(glob_url_str).or_else(|_| { let path = absolute(Path::new(glob_url_str)) .map_err(|e| vortex_err!("Failed making {glob_url_str} absolute: {e}"))?; diff --git a/vortex-duckdb/src/multi_file_function.rs b/vortex-duckdb/src/multi_file_function.rs new file mode 100644 index 00000000000..bee8734aadb --- /dev/null +++ b/vortex-duckdb/src/multi_file_function.rs @@ -0,0 +1,1423 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Vortex implementation of [`MultiFileFunction`]. +//! +//! Plugs Vortex into DuckDB's `MultiFileFunction` template: cross-file +//! orchestration (globbing, parallelism, virtual columns, hive partitioning) +//! is handled by DuckDB. This module supplies the per-file reader using +//! [`VortexFile`] directly so file-level statistics, dtype, and pruning are +//! available without going through `MultiLayoutDataSource`. 
+ +use std::collections::VecDeque; +use std::ffi::CStr; +use std::fmt::Debug; +use std::ops::Range; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use itertools::Itertools; +use parking_lot::Mutex; +use vortex::array::ArrayRef; +use vortex::array::Canonical; +use vortex::array::VortexSessionExecute; +use vortex::array::arrays::ScalarFn; +use vortex::array::arrays::Struct; +use vortex::array::arrays::StructArray; +use vortex::array::arrays::scalar_fn::ScalarFnArrayExt; +use vortex::buffer::Buffer; +use vortex::dtype::DType; +use vortex::dtype::FieldName; +use vortex::dtype::FieldNames; +use vortex::dtype::PType; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::expr::Expression; +use vortex::expr::VortexExprExt; +use vortex::expr::and_collect; +use vortex::expr::cast; +use vortex::expr::col; +use vortex::expr::merge; +use vortex::expr::pack; +use vortex::expr::root; +use vortex::expr::select; +use vortex::file::Footer; +use vortex::file::OpenOptionsSessionExt; +use vortex::file::VortexFile; +use vortex::io::runtime::BlockingRuntime; +use vortex::io::runtime::Task; +use vortex::layout::layouts::row_idx::row_idx; +use vortex::scalar_fn::fns::pack::Pack; +use vortex::scan::selection::Selection; + +use crate::RUNTIME; +use crate::SESSION; +use crate::convert::try_from_bound_expression; +use crate::convert::try_from_table_filter; +use crate::convert::try_from_virtual_column_filter; +use crate::cpp::DUCKDB_VX_EXPR_TYPE; +use crate::duckdb::BaseFileReader; +use crate::duckdb::Cardinality; +use crate::duckdb::ClientContextRef; +use crate::duckdb::ColumnStatistics; +use crate::duckdb::DataChunkRef; +use crate::duckdb::DuckdbStringMapRef; +use crate::duckdb::ExpressionClass; +use crate::duckdb::ExpressionRef; +use crate::duckdb::ExtractedValue; +use crate::duckdb::LogicalType; +use crate::duckdb::MultiFileFunction; +use crate::duckdb::PartitionStats; +use crate::duckdb::ProjectedColumn; 
+use crate::duckdb::SchemaBuilder; +use crate::duckdb::TableFilterSetRef; +use crate::duckdb::duckdb_vector_size; +use crate::exporter::ArrayExporter; +use crate::exporter::ConversionCache; +use crate::filesystem::resolve_filesystem; +use crate::multi_file::parse_glob_url; + +type ScanTask = Task>>; + +const VORTEX_METADATA_CACHE_SETTING: &CStr = c"vortex_metadata_cache"; +const VORTEX_FOOTER_CACHE_TYPE: &CStr = c"vortex_footer"; +const DEFAULT_FOOTER_CACHE_BYTES: usize = 10 * 1024; +const FILE_ROW_NUMBER_COLUMN_ID: u64 = 9223372036854775809; +const FILE_INDEX_COLUMN_ID: u64 = 9223372036854775810; + +/// Open a [`VortexFile`] using whichever filesystem the user has configured +/// via the `vortex_filesystem` extension option. DuckDB has already expanded +/// any glob and chosen this exact path; we only need the right reader for +/// the URL scheme. Routing through the filesystem also lets HTTP/S3/etc. +/// transparently use DuckDB's `httpfs` when the user picks `'duckdb'`. +fn open_vortex_file(ctx: &ClientContextRef, path: &str) -> VortexResult { + let metadata_cache_enabled = vortex_metadata_cache_enabled(ctx); + let cached_footer = if metadata_cache_enabled { + // SAFETY: this module is the only writer for `vortex_footer` entries, + // and it stores exactly `Footer` values for this object type. + unsafe { ctx.object_cache_get_cloned::