From 89092fc8b05cb7719e77713f39887217fc28dc65 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Sun, 8 Feb 2026 04:54:40 +0000 Subject: [PATCH 01/14] implementation of prefetcher --- cpp/bifrost/async_prefetcher.hpp | 10 +++ cpp/bifrost/column_streamer.hpp | 19 +++++ cpp/deeplake_pg/deeplake_executor.cpp | 5 ++ cpp/deeplake_pg/duckdb_deeplake_scan.cpp | 41 +++++++++- cpp/deeplake_pg/duckdb_executor.cpp | 52 +++++++++++++ cpp/deeplake_pg/extension_init.cpp | 54 +++++++++++++ cpp/deeplake_pg/table_am.cpp | 11 +++ cpp/deeplake_pg/table_data.hpp | 12 +++ cpp/deeplake_pg/table_data_impl.hpp | 93 +++++++++++++++++++++++ cpp/deeplake_pg/utils.hpp | 6 ++ postgres/tests/sql/tpch/create_schema.sql | 12 ++- 11 files changed, 311 insertions(+), 4 deletions(-) diff --git a/cpp/bifrost/async_prefetcher.hpp b/cpp/bifrost/async_prefetcher.hpp index 6d0244cd38..8e30c6999f 100644 --- a/cpp/bifrost/async_prefetcher.hpp +++ b/cpp/bifrost/async_prefetcher.hpp @@ -38,6 +38,16 @@ class async_prefetcher void start(); void stop() noexcept; + /** + * @brief Wait for the first batch to be ready (for cold run optimization). + * @param timeout_ms Maximum time to wait in milliseconds. + * + * This method is used for eager prefetching during cold runs. + * It fetches and caches the first batch so that subsequent next_batch() + * calls return immediately without blocking. + */ + void wait_for_first_batch(int64_t timeout_ms = 30000); + bool is_started() const noexcept; heimdall::dataset_view_ptr dataset() const noexcept; diff --git a/cpp/bifrost/column_streamer.hpp b/cpp/bifrost/column_streamer.hpp index dc4f7c7186..4fb1379501 100644 --- a/cpp/bifrost/column_streamer.hpp +++ b/cpp/bifrost/column_streamer.hpp @@ -35,6 +35,25 @@ class column_streamer return b.columns()[0].array(); } + /** + * @brief Returns the async promise for the next batch (for parallel warming). 
+ */ + async::promise next_batch_async() + { + return prefetcher_.next_batch_async().then([](deeplake_core::batch b) { + return b.columns()[0].array(); + }); + } + + /** + * @brief Pre-fetch and cache the first batch for cold run optimization. + * @param timeout_ms Maximum time to wait in milliseconds. + */ + void ensure_first_batch_ready(int64_t timeout_ms = 30000) + { + prefetcher_.wait_for_first_batch(timeout_ms); + } + bool empty() const noexcept { return prefetcher_.size() == 0; diff --git a/cpp/deeplake_pg/deeplake_executor.cpp b/cpp/deeplake_pg/deeplake_executor.cpp index 5c89ecc4a2..3f7834ca8b 100644 --- a/cpp/deeplake_pg/deeplake_executor.cpp +++ b/cpp/deeplake_pg/deeplake_executor.cpp @@ -85,6 +85,11 @@ void analyze_plan(PlannedStmt* plan) } } } + + // Warm all streamers in parallel for cold run optimization + if (pg::eager_batch_prefetch) { + table_data->get_streamers().warm_all_streamers(); + } } pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake); } diff --git a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp index 19f00b02ed..69a44f781c 100644 --- a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp +++ b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp @@ -988,10 +988,47 @@ void deeplake_scan_function(duckdb::ClientContext& context, duckdb::TableFunctio deeplake_scan_function_helper helper(context, data, output); try { helper.scan(); + } catch (const duckdb::OutOfMemoryException& e) { + // Provide helpful error message with configuration hints for OOM + elog(ERROR, + "DuckDB out of memory during Deeplake scan: %s. 
" + "Consider increasing pg_deeplake.duckdb_memory_limit_mb or " + "setting pg_deeplake.duckdb_temp_directory for disk spilling.", + e.what()); } catch (const duckdb::Exception& e) { - elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what()); + // Check if the error message indicates memory issues + std::string msg = e.what(); + std::string msg_lower; + msg_lower.reserve(msg.size()); + for (char c : msg) { + msg_lower.push_back(static_cast(std::tolower(static_cast(c)))); + } + if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) { + elog(ERROR, + "DuckDB memory error during Deeplake scan: %s. " + "Consider increasing pg_deeplake.duckdb_memory_limit_mb or " + "setting pg_deeplake.duckdb_temp_directory for disk spilling.", + e.what()); + } else { + elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what()); + } } catch (const std::exception& e) { - elog(ERROR, "STD exception during Deeplake scan: %s", e.what()); + // Check if the error message indicates memory issues + std::string msg = e.what(); + std::string msg_lower; + msg_lower.reserve(msg.size()); + for (char c : msg) { + msg_lower.push_back(static_cast(std::tolower(static_cast(c)))); + } + if (msg_lower.find("memory") != std::string::npos || msg_lower.find("oom") != std::string::npos) { + elog(ERROR, + "Memory error during Deeplake scan: %s. " + "Consider increasing pg_deeplake.duckdb_memory_limit_mb or " + "setting pg_deeplake.duckdb_temp_directory for disk spilling.", + e.what()); + } else { + elog(ERROR, "STD exception during Deeplake scan: %s", e.what()); + } } catch (...) 
{ elog(ERROR, "Unknown exception during Deeplake scan"); } diff --git a/cpp/deeplake_pg/duckdb_executor.cpp b/cpp/deeplake_pg/duckdb_executor.cpp index 0b58d60eb0..9b298c7ea8 100644 --- a/cpp/deeplake_pg/duckdb_executor.cpp +++ b/cpp/deeplake_pg/duckdb_executor.cpp @@ -17,6 +17,8 @@ #include "table_storage.hpp" #include "utils.hpp" +#include + #include #include #include @@ -74,6 +76,56 @@ std::unique_ptr create_connections() // Register the deeplake_scan table function for zero-copy access pg::register_deeplake_scan_function(*(conns->con_cpp)); + // Configure DuckDB memory management for large operations (e.g., JOINs at SF100+) + // This prevents segfaults during memory-intensive operations by enabling disk spilling + // + // Memory configuration: + // - If duckdb_memory_limit_mb > 0, use the explicit setting (in MB) + // - Otherwise, auto-detect using 80% of system memory with 256MB minimum floor + // - For containerized environments with cgroup limits, auto-detection may use host + // memory instead of container limits; set pg_deeplake.duckdb_memory_limit_mb explicitly + // + // All memory values use MB units consistently throughout this codebase + uint64_t mem_limit_mb = 0; + if (pg::duckdb_memory_limit_mb > 0) { + mem_limit_mb = static_cast(pg::duckdb_memory_limit_mb); + } else { + // Auto-detect: use 80% of system memory + uint64_t total_bytes = base::system_report::total_memory(); + mem_limit_mb = static_cast(total_bytes * 0.8 / (1024ULL * 1024ULL)); + if (mem_limit_mb < 256) { + mem_limit_mb = 256; // Minimum floor of 256MB + } + } + + // Apply memory limit to DuckDB + auto mem_result = conns->con_cpp->Query(fmt::format("SET memory_limit='{}MB'", mem_limit_mb)); + if (!mem_result || mem_result->HasError()) { + elog(WARNING, "Failed to set DuckDB memory_limit: %s", + mem_result ? 
mem_result->GetError().c_str() : "null result"); + } + + // Configure temp directory for disk spilling (if specified) + if (pg::duckdb_temp_directory != nullptr && std::strlen(pg::duckdb_temp_directory) > 0) { + auto temp_result = conns->con_cpp->Query( + fmt::format("SET temp_directory='{}'", pg::duckdb_temp_directory)); + if (!temp_result || temp_result->HasError()) { + elog(WARNING, "Failed to set DuckDB temp_directory: %s", + temp_result ? temp_result->GetError().c_str() : "null result"); + } + } + + // Log DuckDB settings at INFO level for operational visibility + auto verify_mem = conns->con_cpp->Query("SELECT current_setting('memory_limit')"); + if (verify_mem && !verify_mem->HasError() && verify_mem->RowCount() > 0) { + elog(INFO, "DuckDB memory_limit configured: %s", verify_mem->GetValue(0, 0).ToString().c_str()); + } + + auto verify_temp = conns->con_cpp->Query("SELECT current_setting('temp_directory')"); + if (verify_temp && !verify_temp->HasError() && verify_temp->RowCount() > 0) { + elog(INFO, "DuckDB temp_directory configured: %s", verify_temp->GetValue(0, 0).ToString().c_str()); + } + // For now, we'll use C++ API for queries since table functions require it // The C API connection will be used later when we can restructure to avoid table functions // or when DuckDB provides a way to register table functions via C API diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp index 5444a07499..09895661e9 100644 --- a/cpp/deeplake_pg/extension_init.cpp +++ b/cpp/deeplake_pg/extension_init.cpp @@ -64,6 +64,11 @@ bool use_shared_mem_for_refresh = false; bool enable_dataset_logging = false; // Enable dataset operation logging for debugging bool allow_custom_paths = true; // Allow dataset_path in CREATE TABLE options bool stateless_enabled = false; // Enable stateless catalog sync across instances +bool eager_batch_prefetch = true; // Enable eager prefetch of first batch for cold run optimization + +// DuckDB memory management - 
controls memory_limit and temp_directory for large operations +int32_t duckdb_memory_limit_mb = 0; // 0 = auto-detect (80% of system memory) +char* duckdb_temp_directory = nullptr; // nullptr/empty = DuckDB's default temp location } // namespace pg @@ -245,6 +250,19 @@ void initialize_guc_parameters() nullptr, nullptr); + DefineCustomBoolVariable("pg_deeplake.eager_batch_prefetch", + "Enable eager prefetch of first batch for cold run optimization.", + "When enabled, the first batch of data for all columns is prefetched in parallel " + "when a scan begins. This significantly improves cold run performance by overlapping " + "the initial data fetches for multiple columns.", + &pg::eager_batch_prefetch, + true, + PGC_USERSET, + 0, + nullptr, + nullptr, + nullptr); + DefineCustomBoolVariable("pg_deeplake.enable_dataset_logging", "Enable operation logging for deeplake datasets.", "When enabled, all dataset operations (append_row, update_row, delete_row, etc.) " @@ -277,6 +295,42 @@ void initialize_guc_parameters() ); + // DuckDB memory management GUC parameters + // These control DuckDB's internal memory budget for large operations like JOINs + DefineCustomIntVariable( + "pg_deeplake.duckdb_memory_limit_mb", + "Memory limit for DuckDB's internal operations in MB.", + "Controls DuckDB's memory budget for large operations like JOINs and aggregations. " + "Set to 0 (default) for auto-detection using 80% of system memory. " + "When the limit is exceeded, DuckDB spills to disk using temp_directory. 
" + "For containerized environments with cgroup limits, set this explicitly as " + "auto-detection may use host memory instead of container limits.", + &pg::duckdb_memory_limit_mb, // linked C variable + 0, // default value (0 = auto-detect) + 0, // min value (0 = unlimited/auto) + INT_MAX, // max value + PGC_USERSET, // context - can be set by any user + GUC_UNIT_MB, // flags - treat as MB + nullptr, // check_hook + nullptr, // assign_hook + nullptr // show_hook + ); + + DefineCustomStringVariable( + "pg_deeplake.duckdb_temp_directory", + "Temporary directory for DuckDB disk spilling during large operations.", + "Specifies where DuckDB writes temporary files when memory_limit is exceeded. " + "Empty string (default) uses DuckDB's default temp location. " + "DuckDB will validate the path at runtime and fail gracefully if invalid.", + &pg::duckdb_temp_directory, // linked C variable + "", // default value (empty = DuckDB default) + PGC_USERSET, // context - can be set by any user + 0, // flags + nullptr, // check_hook + nullptr, // assign_hook + nullptr // show_hook + ); + // Initialize PostgreSQL memory tracking pg::memory_tracker::initialize_guc_parameters(); diff --git a/cpp/deeplake_pg/table_am.cpp b/cpp/deeplake_pg/table_am.cpp index c9d071e392..c3d462e284 100644 --- a/cpp/deeplake_pg/table_am.cpp +++ b/cpp/deeplake_pg/table_am.cpp @@ -373,6 +373,12 @@ double deeplake_index_build_range_scan(Relation heap_rel, td.create_streamer(attnum, -1); } } + + // Warm all streamers in parallel for cold run optimization + if (pg::eager_batch_prefetch) { + td.get_streamers().warm_all_streamers(); + } + std::vector values(nkeys, 0); std::vector nulls(nkeys, 0); pg::table_scan tscan(table_id, false, false); @@ -728,6 +734,11 @@ TableScanDesc deeplake_table_am_routine::scan_begin(Relation relation, } } + // Warm all streamers in parallel for cold run optimization + if (pg::eager_batch_prefetch) { + td.get_streamers().warm_all_streamers(); + } + if (nkeys > 0) { 
extended_scan->scan_state.nkeys = nkeys; // copy ScanKeyData because Postgres only gave us a pointer diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index be7b419625..65f02ff188 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -130,13 +130,25 @@ struct table_data std::vector column_to_batches; std::vector> streamers; + std::vector> first_batch_cache_; inline void reset() noexcept { column_to_batches.clear(); streamers.clear(); + first_batch_cache_.clear(); } + /** + * @brief Pre-warm all streamers by triggering parallel first batch downloads. + * + * This method initiates the download of the first batch for all active + * streamers in parallel, then waits for all downloads to complete. + * This significantly improves cold run performance by overlapping the + * initial data fetches. + */ + inline void warm_all_streamers(); + inline nd::array get_sample(int32_t column_number, int64_t row_number); template diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 9f90517fe1..0ecc4f8f2d 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -9,6 +9,8 @@ #include +#include + // Inline implementation functions for table_data // This file should be included at the end of table_data.hpp @@ -112,6 +114,20 @@ inline void table_data::open_dataset(bool create) ASSERT(dataset_ != nullptr); num_total_rows_ = dataset_->num_rows(); + // Validate row count against TID conversion limits + // With TUPLES_PER_BLOCK=256 and BlockNumber=uint32_t, max is ~1.1 trillion rows + // which is safe for foreseeable scale factors, but provide early warning + constexpr int64_t MAX_SUPPORTED_ROWS = + static_cast(UINT32_MAX) * static_cast(pg::DEEPLAKE_TUPLES_PER_BLOCK); + if (num_total_rows_ > MAX_SUPPORTED_ROWS) { + elog(WARNING, + "Table '%s' has %ld rows, exceeding max supported %ld for TID conversion. 
" + "Consider partitioning or sharding.", + table_name_.c_str(), + num_total_rows_, + MAX_SUPPORTED_ROWS); + } + // Enable logging if GUC parameter is set if (pg::enable_dataset_logging && dataset_ && !dataset_->is_logging_enabled()) { dataset_->start_logging(); @@ -562,6 +578,42 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id) streamers_.column_to_batches[idx].batches.resize(batch_count); } +inline void table_data::streamer_info::warm_all_streamers() +{ + // Pre-warm all streamers in parallel by triggering first batch downloads. + // Uses next_batch_async() + async::combine() to fetch all first batches in parallel. + // Results are stored in first_batch_cache_ for use by get_sample/value_ptr. + + first_batch_cache_.resize(streamers.size()); + + icm::vector indices; + icm::vector> promises; + indices.reserve(streamers.size()); + promises.reserve(streamers.size()); + + for (size_t i = 0; i < streamers.size(); ++i) { + if (streamers[i] && !streamers[i]->empty()) { + promises.push_back(streamers[i]->next_batch_async()); + indices.push_back(i); + } + } + + if (promises.empty()) { + return; + } + + try { + auto results = async::combine(std::move(promises)).get_future().get(); + for (size_t j = 0; j < indices.size(); ++j) { + first_batch_cache_[indices[j]] = std::move(results[j]); + } + } catch (const std::exception& e) { + base::log_warning(base::log_channel::async, "warm_all_streamers failed: {}", e.what()); + // Non-fatal - subsequent batch fetches will retry via normal path + first_batch_cache_.clear(); + } +} + inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number) { const int64_t batch_index = row_number >> batch_size_log2_; @@ -569,6 +621,19 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + + // Check first_batch_cache_ for batch 0 (from warm_all_streamers) + if 
(batch_index == 0 && !first_batch_cache_.empty() && + static_cast(column_number) < first_batch_cache_.size() && + first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -595,6 +660,20 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + + // Check first_batch_cache_ for batch 0 (from warm_all_streamers) + if (batch_index == 0 && !first_batch_cache_.empty() && + static_cast(column_number) < first_batch_cache_.size() && + first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = utils::eval_with_nones(std::move(*first_batch_cache_[column_number])); + batch.data_ = batch.owner_.data().data(); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -617,6 +696,20 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + + // Check first_batch_cache_ for batch 0 (from warm_all_streamers) + if (batch_index == 0 && !first_batch_cache_.empty() && + static_cast(column_number) < first_batch_cache_.size() && + first_batch_cache_[column_number].has_value()) { + std::lock_guard 
lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + batch.holder_ = impl::string_stream_array_holder(batch.owner_); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { diff --git a/cpp/deeplake_pg/utils.hpp b/cpp/deeplake_pg/utils.hpp index 3cde8d0a2d..84e91d386e 100644 --- a/cpp/deeplake_pg/utils.hpp +++ b/cpp/deeplake_pg/utils.hpp @@ -57,6 +57,12 @@ extern bool use_shared_mem_for_refresh; extern bool enable_dataset_logging; extern bool allow_custom_paths; extern bool stateless_enabled; +extern bool eager_batch_prefetch; + +// DuckDB memory management GUC variables - defined in extension_init.cpp +// These control DuckDB's internal memory limit and temp directory for disk spilling +extern int32_t duckdb_memory_limit_mb; +extern char* duckdb_temp_directory; namespace utils { diff --git a/postgres/tests/sql/tpch/create_schema.sql b/postgres/tests/sql/tpch/create_schema.sql index e0a408ec78..40e090cdfc 100644 --- a/postgres/tests/sql/tpch/create_schema.sql +++ b/postgres/tests/sql/tpch/create_schema.sql @@ -1,3 +1,9 @@ +-- TPC-H Schema for pg_deeplake (v2: BIGINT keys for SF1000+ support, 2026-02) +-- +-- MIGRATION: Existing SF1000 data with INT overflow issues is already corrupted. +-- Re-ingestion with this schema is required to recover from overflow errors. +-- SF10 and SF100 data will continue to work since o_orderkey maxes at ~600M for SF100. 
+ DROP TABLE IF EXISTS customer; DROP TABLE IF EXISTS lineitem; DROP TABLE IF EXISTS nation; @@ -18,8 +24,9 @@ CREATE TABLE customer ( c_comment VARCHAR(117) NOT NULL ) USING deeplake; +-- Note: l_orderkey uses BIGINT to support SF1000+ (values exceed 2.1B INT4 limit) CREATE TABLE lineitem ( - l_orderkey int NOT NULL, + l_orderkey bigint NOT NULL, l_partkey int NOT NULL, l_suppkey int not null, l_linenumber int not null, @@ -44,8 +51,9 @@ CREATE TABLE nation ( n_comment varchar(152) NULL ) USING deeplake; +-- Note: o_orderkey uses BIGINT to support SF1000+ (values exceed 2.1B INT4 limit) CREATE TABLE orders ( - o_orderkey int NOT NULL, + o_orderkey bigint NOT NULL, o_custkey int NOT NULL, o_orderstatus VARCHAR(1) NOT NULL, o_totalprice decimal(15, 2) NOT NULL, From 62706af453d4424c4b519957baaa47c1064afc2f Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Sun, 8 Feb 2026 06:49:16 +0000 Subject: [PATCH 02/14] minor fixes --- cpp/deeplake_pg/duckdb_executor.cpp | 8 ++- cpp/deeplake_pg/table_am.cpp | 93 +++++++++++++++++++++++++---- cpp/deeplake_pg/table_data.hpp | 2 - cpp/deeplake_pg/table_data_impl.hpp | 81 +++++-------------------- cpp/deeplake_pg/table_scan.hpp | 4 +- cpp/deeplake_pg/table_scan_impl.hpp | 4 +- 6 files changed, 107 insertions(+), 85 deletions(-) diff --git a/cpp/deeplake_pg/duckdb_executor.cpp b/cpp/deeplake_pg/duckdb_executor.cpp index 9b298c7ea8..071ef7ff1c 100644 --- a/cpp/deeplake_pg/duckdb_executor.cpp +++ b/cpp/deeplake_pg/duckdb_executor.cpp @@ -107,8 +107,14 @@ std::unique_ptr create_connections() // Configure temp directory for disk spilling (if specified) if (pg::duckdb_temp_directory != nullptr && std::strlen(pg::duckdb_temp_directory) > 0) { + // Sanitize the directory path to prevent SQL injection + std::string safe_dir(pg::duckdb_temp_directory); + // Escape single quotes by doubling them (standard SQL escaping) + for (size_t pos = 0; (pos = safe_dir.find('\'', pos)) != std::string::npos; pos += 2) { + safe_dir.insert(pos, 1, '\''); 
+ } auto temp_result = conns->con_cpp->Query( - fmt::format("SET temp_directory='{}'", pg::duckdb_temp_directory)); + fmt::format("SET temp_directory='{}'", safe_dir)); if (!temp_result || temp_result->HasError()) { elog(WARNING, "Failed to set DuckDB temp_directory: %s", temp_result ? temp_result->GetError().c_str() : "null result"); diff --git a/cpp/deeplake_pg/table_am.cpp b/cpp/deeplake_pg/table_am.cpp index c3d462e284..31b65c8092 100644 --- a/cpp/deeplake_pg/table_am.cpp +++ b/cpp/deeplake_pg/table_am.cpp @@ -294,8 +294,20 @@ bool deeplake_scan_analyze_next_tuple( return false; } - if (!scan_data->scan_state.get_next_tuple(slot)) { - return false; // no more tuples + try { + if (!scan_data->scan_state.get_next_tuple(slot)) { + return false; // no more tuples + } + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake ANALYZE scan failed: %s", e.what()))); + return false; + } catch (...) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake ANALYZE scan failed: unknown exception"))); + return false; } /* @@ -797,7 +809,18 @@ bool deeplake_table_am_routine::scan_getnextslot(TableScanDesc scan, ScanDirecti ++scan_data->progress_bar; } - return scan_data->scan_state.get_next_tuple(slot); + try { + return scan_data->scan_state.get_next_tuple(slot); + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake scan failed: %s", e.what()))); + } catch (...) 
{ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake scan failed: unknown exception"))); + } + return false; } void deeplake_table_am_routine::scan_set_tidrange(TableScanDesc scan, ItemPointer mintid, ItemPointer maxtid) @@ -844,7 +867,18 @@ bool deeplake_table_am_routine::scan_getnextslot_tidrange(TableScanDesc scan, // Switch to the dedicated memory context for this scan pg::utils::memory_context_switcher context_switcher(scan_data->memory_context); - return scan_data->scan_state.get_next_tuple(slot); + try { + return scan_data->scan_state.get_next_tuple(slot); + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake TID range scan failed: %s", e.what()))); + } catch (...) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake TID range scan failed: unknown exception"))); + } + return false; } #if PG_VERSION_NUM >= PG_VERSION_NUM_18 @@ -897,7 +931,18 @@ bool deeplake_table_am_routine::scan_bitmap_next_tuple( // Get next row number int64_t row_num = scan_data->bitmap_row_numbers[scan_data->current_offset++]; scan_data->scan_state.set_current_position(row_num); - return scan_data->scan_state.get_next_tuple(slot); + try { + return scan_data->scan_state.get_next_tuple(slot); + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake bitmap scan failed: %s", e.what()))); + } catch (...) 
{ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake bitmap scan failed: unknown exception"))); + } + return false; } #endif @@ -945,13 +990,23 @@ bool deeplake_table_am_routine::scan_sample_next_tuple(TableScanDesc scan, sample_fraction = sampler->fraction; } - while (scan_data->scan_state.get_next_tuple(slot)) { - // random fraction, should come from scanstate->tsm_state - const double random_value = (double)random() / RAND_MAX; + try { + while (scan_data->scan_state.get_next_tuple(slot)) { + // random fraction, should come from scanstate->tsm_state + const double random_value = (double)random() / RAND_MAX; - if (random_value <= sample_fraction) { - return true; + if (random_value <= sample_fraction) { + return true; + } } + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake sample scan failed: %s", e.what()))); + } catch (...) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake sample scan failed: unknown exception"))); } return false; @@ -997,10 +1052,22 @@ bool deeplake_table_am_routine::index_fetch_tuple(struct IndexFetchTableData* sc pg::utils::memory_context_switcher context_switcher(idx_scan->memory_context); idx_scan->scan_state.set_current_position(utils::tid_to_row_number(tid)); - if (!idx_scan->scan_state.get_next_tuple(slot)) { - if (all_dead != nullptr) { - *all_dead = true; + try { + if (!idx_scan->scan_state.get_next_tuple(slot)) { + if (all_dead != nullptr) { + *all_dead = true; + } + return false; } + } catch (const std::exception& e) { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake index fetch failed: %s", e.what()))); + return false; + } catch (...) 
{ + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Deeplake index fetch failed: unknown exception"))); return false; } diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index 65f02ff188..fd6fe1a097 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -130,13 +130,11 @@ struct table_data std::vector column_to_batches; std::vector> streamers; - std::vector> first_batch_cache_; inline void reset() noexcept { column_to_batches.clear(); streamers.clear(); - first_batch_cache_.clear(); } /** diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 0ecc4f8f2d..30bb55f9c3 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -580,37 +580,26 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id) inline void table_data::streamer_info::warm_all_streamers() { - // Pre-warm all streamers in parallel by triggering first batch downloads. - // Uses next_batch_async() + async::combine() to fetch all first batches in parallel. - // Results are stored in first_batch_cache_ for use by get_sample/value_ptr. - - first_batch_cache_.resize(streamers.size()); - - icm::vector indices; - icm::vector> promises; - indices.reserve(streamers.size()); - promises.reserve(streamers.size()); + // Pre-warm all streamers in parallel by using the prefetcher's built-in caching. + // This calls ensure_first_batch_ready() on each streamer, which internally caches the + // first batch inside the async_prefetcher. When next_batch() is later called, it returns + // the cached batch without re-fetching. 
+ // + // This approach is safe because: + // - Each streamer manages its own cache independently (no "all or nothing" failure) + // - The prefetcher's internal cache is consumed by next_batch() transparently + // - No external first_batch_cache_ needed, avoiding race conditions for (size_t i = 0; i < streamers.size(); ++i) { if (streamers[i] && !streamers[i]->empty()) { - promises.push_back(streamers[i]->next_batch_async()); - indices.push_back(i); - } - } - - if (promises.empty()) { - return; - } - - try { - auto results = async::combine(std::move(promises)).get_future().get(); - for (size_t j = 0; j < indices.size(); ++j) { - first_batch_cache_[indices[j]] = std::move(results[j]); + try { + streamers[i]->ensure_first_batch_ready(); + } catch (const std::exception& e) { + base::log_error(base::log_channel::async, + "warm_all_streamers: streamer {} failed: {}", i, e.what()); + // Non-fatal per-streamer: subsequent next_batch() will retry via normal path + } } - } catch (const std::exception& e) { - base::log_warning(base::log_channel::async, "warm_all_streamers failed: {}", e.what()); - // Non-fatal - subsequent batch fetches will retry via normal path - first_batch_cache_.clear(); } } @@ -622,18 +611,6 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; - // Check first_batch_cache_ for batch 0 (from warm_all_streamers) - if (batch_index == 0 && !first_batch_cache_.empty() && - static_cast(column_number) < first_batch_cache_.size() && - first_batch_cache_[column_number].has_value()) { - std::lock_guard lock(col_data.mutex_); - if (!batch.initialized_.load(std::memory_order_relaxed)) { - batch.owner_ = std::move(*first_batch_cache_[column_number]); - first_batch_cache_[column_number].reset(); - batch.initialized_.store(true, std::memory_order_release); - } - } - if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { 
std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -661,19 +638,6 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; - // Check first_batch_cache_ for batch 0 (from warm_all_streamers) - if (batch_index == 0 && !first_batch_cache_.empty() && - static_cast(column_number) < first_batch_cache_.size() && - first_batch_cache_[column_number].has_value()) { - std::lock_guard lock(col_data.mutex_); - if (!batch.initialized_.load(std::memory_order_relaxed)) { - batch.owner_ = utils::eval_with_nones(std::move(*first_batch_cache_[column_number])); - batch.data_ = batch.owner_.data().data(); - first_batch_cache_[column_number].reset(); - batch.initialized_.store(true, std::memory_order_release); - } - } - if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -697,19 +661,6 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; - // Check first_batch_cache_ for batch 0 (from warm_all_streamers) - if (batch_index == 0 && !first_batch_cache_.empty() && - static_cast(column_number) < first_batch_cache_.size() && - first_batch_cache_[column_number].has_value()) { - std::lock_guard lock(col_data.mutex_); - if (!batch.initialized_.load(std::memory_order_relaxed)) { - batch.owner_ = std::move(*first_batch_cache_[column_number]); - batch.holder_ = impl::string_stream_array_holder(batch.owner_); - first_batch_cache_[column_number].reset(); - batch.initialized_.store(true, std::memory_order_release); - } - } - if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { diff --git 
a/cpp/deeplake_pg/table_scan.hpp b/cpp/deeplake_pg/table_scan.hpp index afe38f1a42..028b58eb60 100644 --- a/cpp/deeplake_pg/table_scan.hpp +++ b/cpp/deeplake_pg/table_scan.hpp @@ -32,8 +32,8 @@ class table_scan inline table_scan& operator=(table_scan&&) = delete; inline ~table_scan() = default; - inline std::pair get_datum(int32_t column_number, int64_t row_number) const noexcept; - inline void convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const noexcept; + inline std::pair get_datum(int32_t column_number, int64_t row_number) const; + inline void convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const; inline bool get_next_tuple(TupleTableSlot* slot); diff --git a/cpp/deeplake_pg/table_scan_impl.hpp b/cpp/deeplake_pg/table_scan_impl.hpp index caca926b25..088302d3e6 100644 --- a/cpp/deeplake_pg/table_scan_impl.hpp +++ b/cpp/deeplake_pg/table_scan_impl.hpp @@ -57,7 +57,7 @@ inline table_scan::table_scan(Oid table_id, bool is_parallel, bool streamer_only } } -inline std::pair table_scan::get_datum(int32_t column_number, int64_t row_number) const noexcept +inline std::pair table_scan::get_datum(int32_t column_number, int64_t row_number) const { const auto base_typeid = table_data_.get_base_atttypid(column_number); const auto column_typmod = table_data_.get_atttypmod(column_number); @@ -155,7 +155,7 @@ inline std::pair table_scan::get_datum(int32_t column_number, int64 return {(Datum)0, true}; } -inline void table_scan::convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const noexcept +inline void table_scan::convert_nd_to_pg(int64_t row_number, Datum* values, bool* nulls) const { for (auto col : null_columns_) { nulls[col] = true; From ea991204b7c99aaa2c2b4f9f9e331ad476b3e27e Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Sun, 8 Feb 2026 08:00:54 +0000 Subject: [PATCH 03/14] fixes --- cpp/CMakePresets.json | 4 +- cpp/deeplake_pg/table_am.cpp | 2 + cpp/deeplake_pg/table_data.hpp | 2 + 
cpp/deeplake_pg/table_data_impl.hpp | 80 ++++++++++++++++++++++++----- 4 files changed, 74 insertions(+), 14 deletions(-) diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index 00ef436ad4..3fa323c23f 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -28,7 +28,7 @@ "inherits": "template-unix", "cacheVariables": { "AL_PG": "ON", - "AL_ASSERTIONS": "ON" + "AL_ASSERTIONS": "OFF" } }, { @@ -36,7 +36,7 @@ "inherits": "template-windows", "cacheVariables": { "AL_PG": "ON", - "AL_ASSERTIONS": "ON" + "AL_ASSERTIONS": "OFF" } }, { diff --git a/cpp/deeplake_pg/table_am.cpp b/cpp/deeplake_pg/table_am.cpp index 31b65c8092..5b7758c68d 100644 --- a/cpp/deeplake_pg/table_am.cpp +++ b/cpp/deeplake_pg/table_am.cpp @@ -389,6 +389,7 @@ double deeplake_index_build_range_scan(Relation heap_rel, // Warm all streamers in parallel for cold run optimization if (pg::eager_batch_prefetch) { td.get_streamers().warm_all_streamers(); + base::log_info(base::log_channel::generic, "Eager batch prefetch completed for index_build_range_scan"); } std::vector values(nkeys, 0); @@ -749,6 +750,7 @@ TableScanDesc deeplake_table_am_routine::scan_begin(Relation relation, // Warm all streamers in parallel for cold run optimization if (pg::eager_batch_prefetch) { td.get_streamers().warm_all_streamers(); + base::log_info(base::log_channel::generic, "Eager batch prefetch completed for scan_begin"); } if (nkeys > 0) { diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index fd6fe1a097..d0a4e7eb40 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -130,11 +130,13 @@ struct table_data std::vector column_to_batches; std::vector> streamers; + icm::vector> first_batch_cache_; inline void reset() noexcept { column_to_batches.clear(); streamers.clear(); + first_batch_cache_.clear(); } /** diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 30bb55f9c3..1fbe05fc28 100644 --- 
a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -580,27 +580,45 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id) inline void table_data::streamer_info::warm_all_streamers() { - // Pre-warm all streamers in parallel by using the prefetcher's built-in caching. - // This calls ensure_first_batch_ready() on each streamer, which internally caches the - // first batch inside the async_prefetcher. When next_batch() is later called, it returns - // the cached batch without re-fetching. - // - // This approach is safe because: - // - Each streamer manages its own cache independently (no "all or nothing" failure) - // - The prefetcher's internal cache is consumed by next_batch() transparently - // - No external first_batch_cache_ needed, avoiding race conditions + // Pre-warm all streamers in parallel using async::combine() for maximum parallelism. + // This triggers concurrent downloads for all first batches, significantly reducing + // cold run latency compared to sequential warming. 
+ + first_batch_cache_.resize(streamers.size()); + + icm::vector indices; + icm::vector> promises; for (size_t i = 0; i < streamers.size(); ++i) { if (streamers[i] && !streamers[i]->empty()) { try { - streamers[i]->ensure_first_batch_ready(); + promises.push_back(streamers[i]->next_batch_async()); + indices.push_back(i); } catch (const std::exception& e) { base::log_error(base::log_channel::async, - "warm_all_streamers: streamer {} failed: {}", i, e.what()); - // Non-fatal per-streamer: subsequent next_batch() will retry via normal path + "warm_all_streamers: streamer {} failed to start async: {}", i, e.what()); } } } + + if (promises.empty()) { + return; + } + + try { + // Wait for all promises in parallel + auto results = async::combine(std::move(promises)).get_future().get(); + + // Store results in first_batch_cache_ + for (size_t j = 0; j < indices.size(); ++j) { + first_batch_cache_[indices[j]] = std::move(results[j]); + } + } catch (const std::exception& e) { + base::log_error(base::log_channel::async, + "warm_all_streamers: async::combine failed: {}", e.what()); + // Clear cache on failure - subsequent next_batch() will retry normally + first_batch_cache_.clear(); + } } inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number) @@ -611,6 +629,18 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + // Check first_batch_cache_ for batch_index 0 + if (batch_index == 0 && !first_batch_cache_.empty() && + static_cast(column_number) < first_batch_cache_.size() && + first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if 
(!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -638,6 +668,19 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + // Check first_batch_cache_ for batch_index 0 + if (batch_index == 0 && !first_batch_cache_.empty() && + static_cast(column_number) < first_batch_cache_.size() && + first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = utils::eval_with_nones(std::move(*first_batch_cache_[column_number])); + batch.data_ = batch.owner_.data().data(); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -661,6 +704,19 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + // Check first_batch_cache_ for batch_index 0 + if (batch_index == 0 && !first_batch_cache_.empty() && + static_cast(column_number) < first_batch_cache_.size() && + first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + batch.holder_ = impl::string_stream_array_holder(batch.owner_); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { From 
131592f79193943d498cdc194f0f8e028c646776 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Sun, 8 Feb 2026 16:57:22 +0000 Subject: [PATCH 04/14] minor update --- cpp/deeplake_pg/table_data_impl.hpp | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 1fbe05fc28..bda94a4b69 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -7,6 +7,7 @@ #include "table_version.hpp" #include "utils.hpp" +#include #include #include @@ -584,6 +585,11 @@ inline void table_data::streamer_info::warm_all_streamers() // This triggers concurrent downloads for all first batches, significantly reducing // cold run latency compared to sequential warming. + // Early exit if no streamers + if (streamers.empty()) { + return; + } + first_batch_cache_.resize(streamers.size()); icm::vector indices; @@ -592,11 +598,16 @@ inline void table_data::streamer_info::warm_all_streamers() for (size_t i = 0; i < streamers.size(); ++i) { if (streamers[i] && !streamers[i]->empty()) { try { - promises.push_back(streamers[i]->next_batch_async()); + auto promise = streamers[i]->next_batch_async(); + // Only add promises that are valid (not error promises) + promises.push_back(std::move(promise)); indices.push_back(i); } catch (const std::exception& e) { - base::log_error(base::log_channel::async, + base::log_warning(base::log_channel::async, "warm_all_streamers: streamer {} failed to start async: {}", i, e.what()); + } catch (...) 
{ + base::log_warning(base::log_channel::async, + "warm_all_streamers: streamer {} failed with unknown exception", i); } } } @@ -613,11 +624,19 @@ inline void table_data::streamer_info::warm_all_streamers() for (size_t j = 0; j < indices.size(); ++j) { first_batch_cache_[indices[j]] = std::move(results[j]); } + } catch (const bifrost::stop_iteration&) { + // Expected when there are no more batches - silently ignore + base::log_debug(base::log_channel::async, "warm_all_streamers: no batches to warm (stop_iteration)"); + first_batch_cache_.clear(); } catch (const std::exception& e) { - base::log_error(base::log_channel::async, + base::log_warning(base::log_channel::async, "warm_all_streamers: async::combine failed: {}", e.what()); // Clear cache on failure - subsequent next_batch() will retry normally first_batch_cache_.clear(); + } catch (...) { + base::log_warning(base::log_channel::async, + "warm_all_streamers: async::combine failed with unknown exception"); + first_batch_cache_.clear(); } } From 56b74f43f1d1a5b950d8e76d0c8aebfb6a97959f Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Sun, 8 Feb 2026 21:56:30 +0000 Subject: [PATCH 05/14] cpp updates --- cpp/CMakeLists.pg.cmake | 47 +++++++++++++- cpp/bifrost/column_streamer.hpp | 14 ++-- cpp/cmake/modules/FindPostgres.cmake | 37 ++++++----- cpp/deeplake_pg/deeplake_executor.cpp | 1 + cpp/deeplake_pg/extension_init.cpp | 5 ++ cpp/deeplake_pg/table_data.hpp | 2 - cpp/deeplake_pg/table_data_impl.hpp | 94 ++++----------------------- 7 files changed, 88 insertions(+), 112 deletions(-) diff --git a/cpp/CMakeLists.pg.cmake b/cpp/CMakeLists.pg.cmake index fa53cf215f..0f77ac6c89 100644 --- a/cpp/CMakeLists.pg.cmake +++ b/cpp/CMakeLists.pg.cmake @@ -54,11 +54,48 @@ include_directories(${DEFAULT_PARENT_DIR}/.ext/duckdb/src/include) set(POSTGRES_DIR "${DEFAULT_PARENT_DIR}/../postgres") +# Build fingerprint: git hash + dirty state for hot-reload verification. 
+# Always recomputed at configure time so the .so reflects the current source. +# -c safe.directory=* is needed because the Docker container runs as root +# but the bind-mounted /deeplake is owned by the host user. +execute_process( + COMMAND git -c safe.directory=* rev-parse --short=12 HEAD + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + OUTPUT_VARIABLE _GIT_HASH + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET + RESULT_VARIABLE _GIT_RESULT) +if(_GIT_RESULT EQUAL 0) + execute_process( + COMMAND git -c safe.directory=* diff --quiet + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + RESULT_VARIABLE _GIT_DIRTY) + if(_GIT_DIRTY) + set(PG_DEEPLAKE_BUILD_HASH "${_GIT_HASH}-dirty") + else() + set(PG_DEEPLAKE_BUILD_HASH "${_GIT_HASH}") + endif() +else() + set(PG_DEEPLAKE_BUILD_HASH "unknown") +endif() +message(STATUS "PG_DEEPLAKE_BUILD_HASH: ${PG_DEEPLAKE_BUILD_HASH}") + foreach(PG_VERSION ${PG_VERSIONS}) set(PG_LIB "pg_deeplake_${PG_VERSION}") message(STATUS "Creating library ${PG_LIB} with sources: ${PG_SOURCES}") ADD_LIBRARY(${PG_LIB} SHARED ${PG_SOURCES}) + # Embed build fingerprint for hot-reload verification. + # Isolated in a generated .cpp so hash changes only recompile one file + # instead of every source file in the target (configure_file only + # rewrites when content actually changes, so same-hash = zero work). + configure_file( + "${CMAKE_CURRENT_SOURCE_DIR}/deeplake_pg/build_info.cpp.in" + "${CMAKE_CURRENT_BINARY_DIR}/build_info_${PG_VERSION}.cpp" + @ONLY) + target_sources(${PG_LIB} PRIVATE + "${CMAKE_CURRENT_BINARY_DIR}/build_info_${PG_VERSION}.cpp") + set(PG_TARGET_NAME "configure_postgres_REL_${PG_VERSION}_0") if(TARGET ${PG_TARGET_NAME}) @@ -151,8 +188,14 @@ foreach(PG_VERSION ${PG_VERSIONS}) DESTINATION ${PG_SHAREDIR}/extension ) + # Symlink instead of copy: always points at the build output, zero I/O, + # and survives incremental builds where the target is up-to-date (the + # existing symlink still resolves to the current .so). 
add_custom_command(TARGET ${PG_LIB} POST_BUILD - COMMAND ${CMAKE_COMMAND} -E copy ${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} "${POSTGRES_DIR}/" - COMMENT "Copied ${CMAKE_BINARY_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} to ${POSTGRES_DIR}/" + COMMAND ${CMAKE_COMMAND} -E rm -f "${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX}" + COMMAND ${CMAKE_COMMAND} -E create_symlink + "$" + "${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX}" + COMMENT "Symlinked ${POSTGRES_DIR}/${PG_LIB}${CMAKE_SHARED_LIBRARY_SUFFIX} -> $" ) endforeach() diff --git a/cpp/bifrost/column_streamer.hpp b/cpp/bifrost/column_streamer.hpp index 4fb1379501..f2fa919e8e 100644 --- a/cpp/bifrost/column_streamer.hpp +++ b/cpp/bifrost/column_streamer.hpp @@ -35,19 +35,13 @@ class column_streamer return b.columns()[0].array(); } - /** - * @brief Returns the async promise for the next batch (for parallel warming). - */ - async::promise next_batch_async() - { - return prefetcher_.next_batch_async().then([](deeplake_core::batch b) { - return b.columns()[0].array(); - }); - } - /** * @brief Pre-fetch and cache the first batch for cold run optimization. * @param timeout_ms Maximum time to wait in milliseconds. + * + * This method waits for the first batch to be downloaded and cached + * internally. Subsequent calls to next_batch() will return immediately + * for the first batch. 
*/ void ensure_first_batch_ready(int64_t timeout_ms = 30000) { diff --git a/cpp/cmake/modules/FindPostgres.cmake b/cpp/cmake/modules/FindPostgres.cmake index 62531f6250..815da0b209 100644 --- a/cpp/cmake/modules/FindPostgres.cmake +++ b/cpp/cmake/modules/FindPostgres.cmake @@ -1,21 +1,28 @@ include(FetchContent) include(ExternalProject) -# Define PostgreSQL versions -set(postgres_versions - "REL_16_0" - "REL_17_0" - "REL_18_0" -) - -# Define corresponding SHA256 checksums for each version -set(postgres_SHA256_CHECKSUMS - "37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf" - "16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34" - "b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc" -) - -# Loop through each PostgreSQL version +# Map BUILD_PG_* options to version tags and SHA256 checksums. +# Only download/build PostgreSQL versions that are actually enabled, +# avoiding unnecessary network downloads and compilation. +set(postgres_versions) +set(postgres_SHA256_CHECKSUMS) + +if(BUILD_PG_16) + list(APPEND postgres_versions "REL_16_0") + list(APPEND postgres_SHA256_CHECKSUMS "37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf") +endif() + +if(BUILD_PG_17) + list(APPEND postgres_versions "REL_17_0") + list(APPEND postgres_SHA256_CHECKSUMS "16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34") +endif() + +if(BUILD_PG_18) + list(APPEND postgres_versions "REL_18_0") + list(APPEND postgres_SHA256_CHECKSUMS "b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc") +endif() + +# Loop through each enabled PostgreSQL version foreach(postgres_version IN LISTS postgres_versions) # Find the index of the current version list(FIND postgres_versions ${postgres_version} postgres_index) diff --git a/cpp/deeplake_pg/deeplake_executor.cpp b/cpp/deeplake_pg/deeplake_executor.cpp index 3f7834ca8b..8ac9bbc4b5 100644 --- a/cpp/deeplake_pg/deeplake_executor.cpp +++ b/cpp/deeplake_pg/deeplake_executor.cpp @@ -89,6 
+89,7 @@ void analyze_plan(PlannedStmt* plan) // Warm all streamers in parallel for cold run optimization if (pg::eager_batch_prefetch) { table_data->get_streamers().warm_all_streamers(); + base::log_info(base::log_channel::generic, "Eager batch prefetch completed for deeplake_executor"); } } pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake); diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp index 09895661e9..1f65118b79 100644 --- a/cpp/deeplake_pg/extension_init.cpp +++ b/cpp/deeplake_pg/extension_init.cpp @@ -8,6 +8,11 @@ extern "C" { #include +// Build fingerprint for hot-reload verification. +// Defined in the generated build_info_.cpp (see build_info.cpp.in). +// Isolated there so that hash changes only recompile one file. +// Extract with: strings pg_deeplake_18.so | grep PG_DEEPLAKE_BUILD: + #include #include #include diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index d0a4e7eb40..fd6fe1a097 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -130,13 +130,11 @@ struct table_data std::vector column_to_batches; std::vector> streamers; - icm::vector> first_batch_cache_; inline void reset() noexcept { column_to_batches.clear(); streamers.clear(); - first_batch_cache_.clear(); } /** diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index bda94a4b69..cc4d4c07e3 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -581,63 +581,29 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id) inline void table_data::streamer_info::warm_all_streamers() { - // Pre-warm all streamers in parallel using async::combine() for maximum parallelism. - // This triggers concurrent downloads for all first batches, significantly reducing - // cold run latency compared to sequential warming. 
- - // Early exit if no streamers - if (streamers.empty()) { - return; - } - - first_batch_cache_.resize(streamers.size()); - - icm::vector indices; - icm::vector> promises; + // Pre-warm all streamers by calling ensure_first_batch_ready() on each. + // This uses the async_prefetcher's internal caching mechanism which is + // properly coordinated with next_batch() to avoid race conditions. + // + // Note: This is sequential, not parallel. The async_prefetcher already + // does background prefetching, so calling ensure_first_batch_ready() + // just waits for the already-in-progress first batch to complete. for (size_t i = 0; i < streamers.size(); ++i) { if (streamers[i] && !streamers[i]->empty()) { try { - auto promise = streamers[i]->next_batch_async(); - // Only add promises that are valid (not error promises) - promises.push_back(std::move(promise)); - indices.push_back(i); + streamers[i]->ensure_first_batch_ready(); + } catch (const bifrost::stop_iteration&) { + // Expected when there are no batches - continue with next streamer } catch (const std::exception& e) { base::log_warning(base::log_channel::async, - "warm_all_streamers: streamer {} failed to start async: {}", i, e.what()); + "warm_all_streamers: streamer {} failed: {}", i, e.what()); } catch (...) 
{ base::log_warning(base::log_channel::async, "warm_all_streamers: streamer {} failed with unknown exception", i); } } } - - if (promises.empty()) { - return; - } - - try { - // Wait for all promises in parallel - auto results = async::combine(std::move(promises)).get_future().get(); - - // Store results in first_batch_cache_ - for (size_t j = 0; j < indices.size(); ++j) { - first_batch_cache_[indices[j]] = std::move(results[j]); - } - } catch (const bifrost::stop_iteration&) { - // Expected when there are no more batches - silently ignore - base::log_debug(base::log_channel::async, "warm_all_streamers: no batches to warm (stop_iteration)"); - first_batch_cache_.clear(); - } catch (const std::exception& e) { - base::log_warning(base::log_channel::async, - "warm_all_streamers: async::combine failed: {}", e.what()); - // Clear cache on failure - subsequent next_batch() will retry normally - first_batch_cache_.clear(); - } catch (...) { - base::log_warning(base::log_channel::async, - "warm_all_streamers: async::combine failed with unknown exception"); - first_batch_cache_.clear(); - } } inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number) @@ -648,18 +614,6 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; - // Check first_batch_cache_ for batch_index 0 - if (batch_index == 0 && !first_batch_cache_.empty() && - static_cast(column_number) < first_batch_cache_.size() && - first_batch_cache_[column_number].has_value()) { - std::lock_guard lock(col_data.mutex_); - if (!batch.initialized_.load(std::memory_order_relaxed)) { - batch.owner_ = std::move(*first_batch_cache_[column_number]); - first_batch_cache_[column_number].reset(); - batch.initialized_.store(true, std::memory_order_release); - } - } - if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard 
lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -687,19 +641,6 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; - // Check first_batch_cache_ for batch_index 0 - if (batch_index == 0 && !first_batch_cache_.empty() && - static_cast(column_number) < first_batch_cache_.size() && - first_batch_cache_[column_number].has_value()) { - std::lock_guard lock(col_data.mutex_); - if (!batch.initialized_.load(std::memory_order_relaxed)) { - batch.owner_ = utils::eval_with_nones(std::move(*first_batch_cache_[column_number])); - batch.data_ = batch.owner_.data().data(); - first_batch_cache_[column_number].reset(); - batch.initialized_.store(true, std::memory_order_release); - } - } - if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { @@ -723,19 +664,6 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; - // Check first_batch_cache_ for batch_index 0 - if (batch_index == 0 && !first_batch_cache_.empty() && - static_cast(column_number) < first_batch_cache_.size() && - first_batch_cache_[column_number].has_value()) { - std::lock_guard lock(col_data.mutex_); - if (!batch.initialized_.load(std::memory_order_relaxed)) { - batch.owner_ = std::move(*first_batch_cache_[column_number]); - batch.holder_ = impl::string_stream_array_holder(batch.owner_); - first_batch_cache_[column_number].reset(); - batch.initialized_.store(true, std::memory_order_release); - } - } - if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { From 588903423f6f0b73a4009a0b85eafd27360edc48 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan 
Date: Sun, 8 Feb 2026 21:57:01 +0000 Subject: [PATCH 06/14] added --- cpp/deeplake_pg/build_info.cpp.in | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 cpp/deeplake_pg/build_info.cpp.in diff --git a/cpp/deeplake_pg/build_info.cpp.in b/cpp/deeplake_pg/build_info.cpp.in new file mode 100644 index 0000000000..ecf48e52d4 --- /dev/null +++ b/cpp/deeplake_pg/build_info.cpp.in @@ -0,0 +1,8 @@ +// Auto-generated at CMake configure time — do not edit manually. +// Isolated so that build-hash changes only recompile this single file +// instead of all pg_deeplake source files (18+). +// configure_file() only rewrites when the content actually changes, +// so an unchanged hash means zero recompilation. + +__attribute__((visibility("default"), used)) +const char pg_deeplake_build_hash[] = "PG_DEEPLAKE_BUILD:@PG_DEEPLAKE_BUILD_HASH@"; From 39df23d9fb95a5a1940c9f8a1bee460a97073139 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Sun, 8 Feb 2026 22:15:33 +0000 Subject: [PATCH 07/14] startup scripts improved --- cpp/CMakeLists.pg.cmake | 52 +++++++++++++++++++++++++++++++ postgres/scripts/run_pg_server.sh | 17 ++++++++-- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/cpp/CMakeLists.pg.cmake b/cpp/CMakeLists.pg.cmake index fa53cf215f..730ae506ee 100644 --- a/cpp/CMakeLists.pg.cmake +++ b/cpp/CMakeLists.pg.cmake @@ -48,6 +48,58 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules/FindDuckDB.cmake) set(CMAKE_PREFIX_PATH ${CMAKE_CURRENT_SOURCE_DIR}/.ext/deeplake_api ${CMAKE_PREFIX_PATH}) find_package(DeepLakeAPI REQUIRED PATHS ${CMAKE_CURRENT_SOURCE_DIR}/.ext/deeplake_api/lib/cmake/deeplake_api NO_DEFAULT_PATH) +# AWS SDK - required by deeplake_api (symbols not bundled in the prebuilt library) +find_package(AWSSDK COMPONENTS core s3 identity-management) +if(AWSSDK_FOUND) + message(STATUS "Found AWS SDK: ${AWSSDK_LIBRARIES}") + list(APPEND DEEPLAKE_STATIC_LINK_LIBS ${AWSSDK_LIBRARIES}) +else() + message(STATUS "AWS SDK not found via find_package, 
trying manual discovery...")
+  # Try to find AWS SDK libraries in common vcpkg locations
+  set(_AWS_SEARCH_PATHS
+    "$ENV{VCPKG_ROOT}/installed/arm64-linux/lib"
+    "$ENV{VCPKG_ROOT}/packages/aws-sdk-cpp_arm64-linux/lib"
+  )
+  # Also check for AWS libs in the build's vcpkg_installed
+  file(GLOB _VCPKG_INSTALLED_DIRS "${CMAKE_BINARY_DIR}/vcpkg_installed/*/lib")
+  list(APPEND _AWS_SEARCH_PATHS ${_VCPKG_INSTALLED_DIRS})
+
+  set(_AWS_LIBS
+    aws-cpp-sdk-s3
+    aws-cpp-sdk-core
+    aws-cpp-sdk-identity-management
+    aws-cpp-sdk-cognito-identity
+    aws-cpp-sdk-sts
+    aws-crt-cpp
+    aws-c-s3
+    aws-c-auth
+    aws-c-http
+    aws-c-mqtt
+    aws-c-event-stream
+    aws-c-io
+    aws-c-cal
+    aws-c-compression
+    aws-c-sdkutils
+    aws-c-common
+    aws-checksums
+    s2n
+  )
+  set(_FOUND_AWS_LIBS)
+  foreach(_lib ${_AWS_LIBS})
+    find_library(_LIB_${_lib} NAMES ${_lib} PATHS ${_AWS_SEARCH_PATHS} NO_DEFAULT_PATH)
+    if(_LIB_${_lib})
+      list(APPEND _FOUND_AWS_LIBS ${_LIB_${_lib}})
+      message(STATUS "  Found: ${_LIB_${_lib}}")
+    endif()
+  endforeach()
+  if(_FOUND_AWS_LIBS)
+    list(APPEND DEEPLAKE_STATIC_LINK_LIBS ${_FOUND_AWS_LIBS})
+    message(STATUS "Linked AWS SDK libraries manually: ${_FOUND_AWS_LIBS}")
+  else()
+    message(WARNING "AWS SDK libraries not found. 
pg_deeplake may fail to load at runtime.")
+  endif()
+endif()
+
 # }

 include_directories(${DEFAULT_PARENT_DIR}/.ext/duckdb/src/include)
diff --git a/postgres/scripts/run_pg_server.sh b/postgres/scripts/run_pg_server.sh
index b6fc812218..effaef0b06 100755
--- a/postgres/scripts/run_pg_server.sh
+++ b/postgres/scripts/run_pg_server.sh
@@ -78,12 +78,25 @@ if [ -d "$POSTGRES_DATA" ]; then
 fi
 install_extension
-# Initialize database
-"$POSTGRES_INSTALL/bin/initdb" -D "$POSTGRES_DATA" -U "$USER"
+# Initialize database with 'postgres' superuser (matches Docker POSTGRES_USER=postgres)
+"$POSTGRES_INSTALL/bin/initdb" -D "$POSTGRES_DATA" -U postgres
 echo "shared_preload_libraries = 'pg_deeplake'" >> "$POSTGRES_DATA/postgresql.conf"
 echo "max_connections = 300" >> "$POSTGRES_DATA/postgresql.conf"
 echo "shared_buffers = 128MB" >> "$POSTGRES_DATA/postgresql.conf"
 #echo "log_min_messages = debug1" >> "$POSTGRES_DATA/postgresql.conf"
+# Configure pg_hba.conf: use scram-sha-256 for TCP connections (matches Docker behavior)
+# Keep trust for local socket connections (for admin convenience)
+# NOTE: use POSIX [[:space:]] rather than GNU-only \s so the patterns also
+# match under BSD sed on macOS (the Darwin branch below).
+if [[ "$(uname)" == "Darwin" ]]; then
+    sed -i '' 's/^\(host.*all.*all.*127\.0\.0\.1\/32[[:space:]]*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf"
+    sed -i '' 's/^\(host.*all.*all.*::1\/128[[:space:]]*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf"
+else
+    sed -i 's/^\(host.*all.*all.*127\.0\.0\.1\/32[[:space:]]*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf"
+    sed -i 's/^\(host.*all.*all.*::1\/128[[:space:]]*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf"
+fi
+
 # Start PostgreSQL
 "$POSTGRES_INSTALL/bin/pg_ctl" -D "$POSTGRES_DATA" -l "$TEST_LOGFILE" start
+
+# Set postgres password (matches Docker POSTGRES_PASSWORD=password)
+"$POSTGRES_INSTALL/bin/psql" -U postgres -c "ALTER USER postgres PASSWORD 'password';"
From aef73f774f1acee1d6eb8983614d2f240061d78b Mon Sep 17 00:00:00 2001
From: Davit Buniatyan
Date: Sun, 8 Feb 2026 23:18:26 +0000
Subject: [PATCH 08/14] build system improvements
---
 cpp/deeplake_pg/table_data_impl.hpp | 60 +++++++++++++++++++++--------
 postgres/scripts/run_pg_server.sh   |  4 +-
 scripts/build_pg_ext.py             | 52 ++++++++++++++++++++++---
 3 files changed, 91 insertions(+), 25 deletions(-)

diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp
index cc4d4c07e3..11658ed091 100644
--- a/cpp/deeplake_pg/table_data_impl.hpp
+++ b/cpp/deeplake_pg/table_data_impl.hpp
@@ -11,6 +11,7 @@
 #include
 #include
+#include

 // Inline implementation functions for table_data
 // This file should be included at the end of table_data.hpp
@@ -579,29 +580,54 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id)
     streamers_.column_to_batches[idx].batches.resize(batch_count);
 }

+namespace detail {
+
+// SFINAE check: does column_streamer have ensure_first_batch_ready()?
+// This allows warm_all_streamers to compile against both the released API
+// (which lacks the method) and the local dev API (which has it).
+template <typename, typename = void>
+struct has_ensure_first_batch_ready : std::false_type {};
+
+template <typename T>
+struct has_ensure_first_batch_ready<T, std::void_t<decltype(std::declval<T>().ensure_first_batch_ready())>> : std::true_type {};
+
+template <typename Streamer>
+inline void try_warm_streamer([[maybe_unused]] Streamer& s, [[maybe_unused]] size_t idx)
+{
+    if constexpr (has_ensure_first_batch_ready<Streamer>::value) {
+        try {
+            s.ensure_first_batch_ready();
+        } catch (const bifrost::stop_iteration&) {
+            // Expected when there are no batches
+        } catch (const std::exception& e) {
+            base::log_warning(base::log_channel::async,
+                "warm_all_streamers: streamer {} failed: {}", idx, e.what());
+        } catch (...) {
+            base::log_warning(base::log_channel::async,
+                "warm_all_streamers: streamer {} failed with unknown exception", idx);
+        }
+    }
+    // If the method doesn't exist, this is a no-op.
+    // The async_prefetcher background prefetch is still active. 
+} + +} // namespace detail + inline void table_data::streamer_info::warm_all_streamers() { - // Pre-warm all streamers by calling ensure_first_batch_ready() on each. - // This uses the async_prefetcher's internal caching mechanism which is - // properly coordinated with next_batch() to avoid race conditions. + // Pre-warm all streamers by calling ensure_first_batch_ready() on each + // (when available). This uses the async_prefetcher's internal caching + // mechanism which is properly coordinated with next_batch() to avoid + // race conditions. // - // Note: This is sequential, not parallel. The async_prefetcher already - // does background prefetching, so calling ensure_first_batch_ready() - // just waits for the already-in-progress first batch to complete. + // When ensure_first_batch_ready() is not available (released API < 4.6), + // this is a no-op -- the async_prefetcher already starts background + // prefetching in the column_streamer constructor. for (size_t i = 0; i < streamers.size(); ++i) { if (streamers[i] && !streamers[i]->empty()) { - try { - streamers[i]->ensure_first_batch_ready(); - } catch (const bifrost::stop_iteration&) { - // Expected when there are no batches - continue with next streamer - } catch (const std::exception& e) { - base::log_warning(base::log_channel::async, - "warm_all_streamers: streamer {} failed: {}", i, e.what()); - } catch (...) 
{ - base::log_warning(base::log_channel::async, - "warm_all_streamers: streamer {} failed with unknown exception", i); - } + detail::try_warm_streamer(*streamers[i], i); } } } diff --git a/postgres/scripts/run_pg_server.sh b/postgres/scripts/run_pg_server.sh index effaef0b06..e35bf5ea2d 100755 --- a/postgres/scripts/run_pg_server.sh +++ b/postgres/scripts/run_pg_server.sh @@ -95,8 +95,8 @@ else sed -i 's/^\(host.*all.*all.*::1\/128\s*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf" fi -# Start PostgreSQL -"$POSTGRES_INSTALL/bin/pg_ctl" -D "$POSTGRES_DATA" -l "$TEST_LOGFILE" start +# Start PostgreSQL (extended timeout for shared_preload_libraries loading) +"$POSTGRES_INSTALL/bin/pg_ctl" -D "$POSTGRES_DATA" -l "$TEST_LOGFILE" -t 120 start # Set postgres password (matches Docker POSTGRES_PASSWORD=password) "$POSTGRES_INSTALL/bin/psql" -U postgres -c "ALTER USER postgres PASSWORD 'password';" diff --git a/scripts/build_pg_ext.py b/scripts/build_pg_ext.py index 01e2e62198..c770d55a1d 100644 --- a/scripts/build_pg_ext.py +++ b/scripts/build_pg_ext.py @@ -1,4 +1,3 @@ -import distutils.sysconfig as sysconfig import json import os import sys @@ -14,6 +13,7 @@ Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16,17,18 #Build for PostgreSQL 16, 17, and 18 Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16 #Build for PostgreSQL 16 only Usage: python3 scripts/build_pg_ext.py prod --pg-versions all #Build for all supported PostgreSQL versions +Usage: python3 scripts/build_pg_ext.py dev --local-api /path/to/package #Use local deeplake API package instead of downloading """ def get_pinned_version(): @@ -114,7 +114,38 @@ def download_api_lib(api_root_dir, overwrite=True): print(f"Successfully installed deeplake API library version {version}") -def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_versions: list = None): + +def install_local_api_lib(api_root_dir, local_path): + """ + Install the deeplake API library from a local package 
directory (e.g. from an indra build). + This copies include/, lib/, and cmake/ from the local package into .ext/deeplake_api/. + """ + if not os.path.isdir(local_path): + raise Exception(f"Local API path does not exist: {local_path}") + + # Verify the local package has the expected structure + local_lib = os.path.join(local_path, "lib") + local_include = os.path.join(local_path, "include") + if not os.path.isdir(local_lib) or not os.path.isdir(local_include): + raise Exception(f"Local API path missing lib/ or include/ directories: {local_path}") + + os.makedirs(api_root_dir, exist_ok=True) + + print(f"Installing local deeplake API from {local_path} ...") + + # Remove existing and copy from local + err = os.system(f"rm -rf {api_root_dir}/*") + if err: + raise Exception(f"Failed to clean {api_root_dir}. Command exited with code {err}.") + + err = os.system(f"cp -r {local_path}/* {api_root_dir}/") + if err: + raise Exception(f"Failed to copy local API library. Command exited with code {err}.") + + print(f"Successfully installed local deeplake API from {local_path}") + + +def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_versions: list = None, local_api_path: str = None): modes = ["debug", "dev", "prod"] if mode not in modes: raise Exception(f"Invalid mode - '{mode}'. 
Possible values - {', '.join(modes)}") @@ -128,9 +159,12 @@ def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_version try: if not incremental: - # Download DeepLake API library before CMake configuration + # Install DeepLake API library before CMake configuration # (CMake's find_package needs it to exist during configuration) - download_api_lib(".ext/deeplake_api") + if local_api_path: + install_local_api_lib(".ext/deeplake_api", local_api_path) + else: + download_api_lib(".ext/deeplake_api") cmake_cmd = (f"cmake " f"{preset} ") @@ -221,6 +255,7 @@ def write_mode(mode: str): mode = sys.argv[1] deeplake_link_type = None pg_versions = None + local_api_path = None # Parse optional flags i = 2 @@ -232,6 +267,11 @@ def write_mode(mode: str): elif arg == "--deeplake-static": deeplake_link_type = "static" i += 1 + elif arg == "--local-api": + if i + 1 >= len(sys.argv): + raise Exception("--local-api requires a path to the local deeplake API package directory") + local_api_path = os.path.abspath(sys.argv[i + 1]) + i += 2 elif arg == "--pg-versions": if i + 1 >= len(sys.argv): raise Exception("--pg-versions requires a value (e.g., '16,17,18' or 'all')") @@ -250,6 +290,6 @@ def write_mode(mode: str): raise Exception(f"Invalid --pg-versions format: '{versions_str}'. Use comma-separated numbers (e.g., '16,17,18') or 'all'") i += 2 else: - raise Exception(f"Invalid option '{arg}'. Use --deeplake-shared, --deeplake-static, or --pg-versions") + raise Exception(f"Invalid option '{arg}'. 
Use --deeplake-shared, --deeplake-static, --local-api, or --pg-versions") - run(mode=mode, incremental=False, deeplake_link_type=deeplake_link_type, pg_versions=pg_versions) + run(mode=mode, incremental=False, deeplake_link_type=deeplake_link_type, pg_versions=pg_versions, local_api_path=local_api_path) From b176cd6978f4ccd68c7bc18659850df7289b4981 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Mon, 9 Feb 2026 00:10:31 +0000 Subject: [PATCH 09/14] table data impl --- cpp/deeplake_pg/table_data_impl.hpp | 31 ++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 11658ed091..ac907e871d 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -624,10 +624,39 @@ inline void table_data::streamer_info::warm_all_streamers() // When ensure_first_batch_ready() is not available (released API < 4.6), // this is a no-op -- the async_prefetcher already starts background // prefetching in the column_streamer constructor. + // + // CRITICAL: We run all warmings in PARALLEL to overlap network I/O + // for the first batch of each column. This is essential for cold run + // performance optimization. 
+ // Collect indices of valid streamers to warm + std::vector indices; for (size_t i = 0; i < streamers.size(); ++i) { if (streamers[i] && !streamers[i]->empty()) { - detail::try_warm_streamer(*streamers[i], i); + indices.push_back(i); + } + } + + if (indices.empty()) { + return; + } + + // Spawn threads to warm all streamers in parallel + // Each thread waits for its first batch to be ready (blocking until download complete) + std::vector threads; + threads.reserve(indices.size()); + + for (size_t idx : indices) { + threads.emplace_back([this, idx]() { + detail::try_warm_streamer(*streamers[idx], idx); + }); + } + + // Wait for all warming threads to complete + // This ensures all first batches are downloaded before query execution starts + for (auto& t : threads) { + if (t.joinable()) { + t.join(); } } } From 293323d5bb559562a334d017bfef1e1ac4605b37 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Mon, 9 Feb 2026 01:18:02 +0000 Subject: [PATCH 10/14] improvements --- cpp/deeplake_pg/duckdb_deeplake_scan.cpp | 16 +++++ cpp/deeplake_pg/table_data.hpp | 14 ++++ cpp/deeplake_pg/table_data_impl.hpp | 86 +++++++++++++++++++++++- 3 files changed, 113 insertions(+), 3 deletions(-) diff --git a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp index 69a44f781c..a91066777b 100644 --- a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp +++ b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp @@ -955,6 +955,22 @@ class deeplake_scan_function_helper } ASSERT(output_.ColumnCount() == global_state_.column_ids.size()); + + // Pre-trigger parallel batch initialization for all streaming columns. + // Without this, each column's batch download would block sequentially, + // serializing I/O waits. This overlaps all column batch downloads. 
+ if (!has_index_search() && pg::eager_batch_prefetch) { + std::vector streaming_cols; + for (unsigned i = 0; i < global_state_.column_ids.size(); ++i) { + const auto col_idx = global_state_.column_ids[i]; + if (bind_data_.table_data.is_column_requested(col_idx) && + bind_data_.table_data.column_has_streamer(col_idx)) { + streaming_cols.push_back(col_idx); + } + } + bind_data_.table_data.get_streamers().prefetch_batches_for_row(streaming_cols, current_row); + } + icm::vector> column_promises; // Fill output vectors column by column using table_data streamers for (unsigned i = 0; i < global_state_.column_ids.size(); ++i) { diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index fd6fe1a097..02418b4cee 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -117,11 +117,14 @@ struct table_data { std::mutex mutex_; std::vector batches; + /// Pre-fetched raw batch from parallel prefetch. Protected by mutex_. + std::optional prefetched_raw_batch_; column_data() = default; column_data(const column_data&) = delete; column_data(column_data&& other) noexcept : batches(std::move(other.batches)) + , prefetched_raw_batch_(std::move(other.prefetched_raw_batch_)) { } column_data& operator=(const column_data&) = delete; @@ -147,6 +150,17 @@ struct table_data */ inline void warm_all_streamers(); + /** + * @brief Pre-initialize batches for all given columns at the specified row in parallel. + * + * For a cold run, batch initialization blocks on I/O. Without this method, + * the scan processes columns sequentially, serializing I/O waits. + * This method triggers batch downloads for all columns that need it concurrently, + * then waits for all to complete, so the subsequent sequential column processing + * finds all batches already initialized. 
+ */ + inline void prefetch_batches_for_row(const std::vector& column_indices, int64_t row_number); + inline nd::array get_sample(int32_t column_number, int64_t row_number); template diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index ac907e871d..c77400e15d 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -661,6 +661,66 @@ inline void table_data::streamer_info::warm_all_streamers() } } +inline void table_data::streamer_info::prefetch_batches_for_row( + const std::vector& column_indices, int64_t row_number) +{ + const int64_t batch_index = row_number >> batch_size_log2_; + + // Fast check: find columns that need batch initialization + std::vector need_init; + for (int32_t col : column_indices) { + if (col < 0 || static_cast(col) >= column_to_batches.size()) continue; + auto& col_data = column_to_batches[col]; + if (static_cast(batch_index) < col_data.batches.size() && + !col_data.batches[batch_index].initialized_.load(std::memory_order_acquire)) { + need_init.push_back(col); + } + } + + if (need_init.size() <= 1) { + // 0 or 1 columns need init - no benefit from parallelism + return; + } + + // Multiple columns need batch initialization: call next_batch() for each in parallel. + // This overlaps the I/O waits across all columns. The raw batch results are stored + // in prefetched_raw_batch_ and consumed by the type-specific value_ptr/value/get_sample + // methods during the subsequent sequential column processing. 
+ std::vector threads; + threads.reserve(need_init.size()); + + for (int32_t col : need_init) { + threads.emplace_back([this, col, batch_index]() { + try { + auto& col_data = column_to_batches[col]; + // Fetch all uninitialized batches up to batch_index for this column + std::lock_guard lock(col_data.mutex_); + for (int64_t i = 0; i <= batch_index; ++i) { + if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { + // Only pre-fetch one batch (the next one from the streamer). + // Store it as a raw batch for the type-specific code to consume. + if (!col_data.prefetched_raw_batch_.has_value()) { + col_data.prefetched_raw_batch_ = streamers[col]->next_batch(); + } + break; // Only need to pre-fetch the next pending batch + } + } + } catch (const bifrost::stop_iteration&) { + // Expected when streamer is exhausted + } catch (const std::exception& e) { + base::log_warning(base::log_channel::async, + "prefetch_batches_for_row: column {} failed: {}", col, e.what()); + } + }); + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } +} + inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number) { const int64_t batch_index = row_number >> batch_size_log2_; @@ -673,7 +733,13 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - col_data.batches[i].owner_ = streamers[column_number]->next_batch(); + // Use pre-fetched batch if available, otherwise fetch synchronously + if (col_data.prefetched_raw_batch_.has_value()) { + col_data.batches[i].owner_ = std::move(*col_data.prefetched_raw_batch_); + col_data.prefetched_raw_batch_.reset(); + } else { + col_data.batches[i].owner_ = streamers[column_number]->next_batch(); + } col_data.batches[i].initialized_.store(true, std::memory_order_release); } } @@ -700,7 +766,15 @@ 
inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - col_data.batches[i].owner_ = utils::eval_with_nones(streamers[column_number]->next_batch()); + // Use pre-fetched batch if available, otherwise fetch synchronously + nd::array raw_batch; + if (col_data.prefetched_raw_batch_.has_value()) { + raw_batch = std::move(*col_data.prefetched_raw_batch_); + col_data.prefetched_raw_batch_.reset(); + } else { + raw_batch = streamers[column_number]->next_batch(); + } + col_data.batches[i].owner_ = utils::eval_with_nones(std::move(raw_batch)); col_data.batches[i].data_ = col_data.batches[i].owner_.data().data(); col_data.batches[i].initialized_.store(true, std::memory_order_release); } @@ -723,7 +797,13 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - col_data.batches[i].owner_ = streamers[column_number]->next_batch(); + // Use pre-fetched batch if available, otherwise fetch synchronously + if (col_data.prefetched_raw_batch_.has_value()) { + col_data.batches[i].owner_ = std::move(*col_data.prefetched_raw_batch_); + col_data.prefetched_raw_batch_.reset(); + } else { + col_data.batches[i].owner_ = streamers[column_number]->next_batch(); + } col_data.batches[i].holder_ = impl::string_stream_array_holder(col_data.batches[i].owner_); col_data.batches[i].initialized_.store(true, std::memory_order_release); } From 1de810cc0ed815b3cbcb9a95128a7779d8006bbe Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Mon, 9 Feb 2026 06:35:54 +0000 Subject: [PATCH 11/14] updates --- cpp/deeplake_pg/table_data.hpp | 8 +-- cpp/deeplake_pg/table_data_impl.hpp | 81 +++++++++++++++++++---------- scripts/build_pg_ext.py | 10 
+++- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index 02418b4cee..0ba749fd20 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -117,14 +117,16 @@ struct table_data { std::mutex mutex_; std::vector batches; - /// Pre-fetched raw batch from parallel prefetch. Protected by mutex_. - std::optional prefetched_raw_batch_; + /// Pre-fetched raw batches from parallel prefetch. Protected by mutex_. + /// warm_all_streamers() drains the streamer into this deque; + /// value_ptr/get_sample/value consume from the front. + std::deque prefetched_raw_batches_; column_data() = default; column_data(const column_data&) = delete; column_data(column_data&& other) noexcept : batches(std::move(other.batches)) - , prefetched_raw_batch_(std::move(other.prefetched_raw_batch_)) + , prefetched_raw_batches_(std::move(other.prefetched_raw_batches_)) { } column_data& operator=(const column_data&) = delete; diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index c77400e15d..3c84936c74 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -616,18 +616,15 @@ inline void try_warm_streamer([[maybe_unused]] Streamer& s, [[maybe_unused]] siz inline void table_data::streamer_info::warm_all_streamers() { - // Pre-warm all streamers by calling ensure_first_batch_ready() on each - // (when available). This uses the async_prefetcher's internal caching - // mechanism which is properly coordinated with next_batch() to avoid - // race conditions. + // Full pre-warming: download ALL batches for ALL columns in parallel. + // This eliminates cold-run latency by pre-populating the raw batch cache + // before query execution starts. 
// - // When ensure_first_batch_ready() is not available (released API < 4.6), - // this is a no-op -- the async_prefetcher already starts background - // prefetching in the column_streamer constructor. - // - // CRITICAL: We run all warmings in PARALLEL to overlap network I/O - // for the first batch of each column. This is essential for cold run - // performance optimization. + // Each column's async_prefetcher has already queued all batch downloads + // in its start() method. Here we drain those queues in parallel across + // columns, storing the raw nd::array results in prefetched_raw_batches_. + // The type-specific initialization (eval_with_nones, string_stream_array_holder) + // happens lazily when the batch is first accessed by value_ptr/get_sample/value. // Collect indices of valid streamers to warm std::vector indices; @@ -641,19 +638,47 @@ inline void table_data::streamer_info::warm_all_streamers() return; } - // Spawn threads to warm all streamers in parallel - // Each thread waits for its first batch to be ready (blocking until download complete) + // Spawn threads to pre-download ALL batches for each column in parallel. + // Each thread drains the column's async_prefetcher, storing raw batches + // in prefetched_raw_batches_ for later consumption by value_ptr/get_sample/value. 
std::vector threads; threads.reserve(indices.size()); for (size_t idx : indices) { threads.emplace_back([this, idx]() { - detail::try_warm_streamer(*streamers[idx], idx); + try { + auto& col_data = column_to_batches[idx]; + const size_t num_batches = col_data.batches.size(); + // Count how many batches are already initialized + size_t batches_to_fetch = 0; + for (size_t i = 0; i < num_batches; ++i) { + if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { + ++batches_to_fetch; + } + } + if (batches_to_fetch == 0) { + return; + } + // Drain all pending batches from the streamer into the prefetch queue + std::lock_guard lock(col_data.mutex_); + for (size_t i = 0; i < batches_to_fetch; ++i) { + try { + col_data.prefetched_raw_batches_.push_back(streamers[idx]->next_batch()); + } catch (const bifrost::stop_iteration&) { + break; // No more batches + } + } + } catch (const std::exception& e) { + base::log_warning(base::log_channel::async, + "warm_all_streamers: column {} failed: {}", idx, e.what()); + } catch (...) { + base::log_warning(base::log_channel::async, + "warm_all_streamers: column {} failed with unknown exception", idx); + } }); } - // Wait for all warming threads to complete - // This ensures all first batches are downloaded before query execution starts + // Wait for all columns to finish downloading for (auto& t : threads) { if (t.joinable()) { t.join(); @@ -684,7 +709,7 @@ inline void table_data::streamer_info::prefetch_batches_for_row( // Multiple columns need batch initialization: call next_batch() for each in parallel. // This overlaps the I/O waits across all columns. The raw batch results are stored - // in prefetched_raw_batch_ and consumed by the type-specific value_ptr/value/get_sample + // in prefetched_raw_batches_ and consumed by the type-specific value_ptr/value/get_sample // methods during the subsequent sequential column processing. 
std::vector threads; threads.reserve(need_init.size()); @@ -699,8 +724,8 @@ inline void table_data::streamer_info::prefetch_batches_for_row( if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { // Only pre-fetch one batch (the next one from the streamer). // Store it as a raw batch for the type-specific code to consume. - if (!col_data.prefetched_raw_batch_.has_value()) { - col_data.prefetched_raw_batch_ = streamers[col]->next_batch(); + if (col_data.prefetched_raw_batches_.empty()) { + col_data.prefetched_raw_batches_.push_back(streamers[col]->next_batch()); } break; // Only need to pre-fetch the next pending batch } @@ -734,9 +759,9 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { // Use pre-fetched batch if available, otherwise fetch synchronously - if (col_data.prefetched_raw_batch_.has_value()) { - col_data.batches[i].owner_ = std::move(*col_data.prefetched_raw_batch_); - col_data.prefetched_raw_batch_.reset(); + if (!col_data.prefetched_raw_batches_.empty()) { + col_data.batches[i].owner_ = std::move(col_data.prefetched_raw_batches_.front()); + col_data.prefetched_raw_batches_.pop_front(); } else { col_data.batches[i].owner_ = streamers[column_number]->next_batch(); } @@ -768,9 +793,9 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { // Use pre-fetched batch if available, otherwise fetch synchronously nd::array raw_batch; - if (col_data.prefetched_raw_batch_.has_value()) { - raw_batch = std::move(*col_data.prefetched_raw_batch_); - col_data.prefetched_raw_batch_.reset(); + if (!col_data.prefetched_raw_batches_.empty()) { + raw_batch = std::move(col_data.prefetched_raw_batches_.front()); + col_data.prefetched_raw_batches_.pop_front(); } else { raw_batch = 
streamers[column_number]->next_batch(); } @@ -798,9 +823,9 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { // Use pre-fetched batch if available, otherwise fetch synchronously - if (col_data.prefetched_raw_batch_.has_value()) { - col_data.batches[i].owner_ = std::move(*col_data.prefetched_raw_batch_); - col_data.prefetched_raw_batch_.reset(); + if (!col_data.prefetched_raw_batches_.empty()) { + col_data.batches[i].owner_ = std::move(col_data.prefetched_raw_batches_.front()); + col_data.prefetched_raw_batches_.pop_front(); } else { col_data.batches[i].owner_ = streamers[column_number]->next_batch(); } diff --git a/scripts/build_pg_ext.py b/scripts/build_pg_ext.py index c770d55a1d..8f4cacc605 100644 --- a/scripts/build_pg_ext.py +++ b/scripts/build_pg_ext.py @@ -2,7 +2,12 @@ import os import sys import platform -import requests + +try: + import requests +except ImportError: + os.system("pip install requests --user") + import requests """ Usage: python3 scripts/build_pg_ext.py debug #Debug build @@ -16,6 +21,8 @@ Usage: python3 scripts/build_pg_ext.py dev --local-api /path/to/package #Use local deeplake API package instead of downloading """ + + def get_pinned_version(): """ Read the pinned deeplake API version from DEEPLAKE_API_VERSION file. @@ -77,6 +84,7 @@ def download_api_lib(api_root_dir, overwrite=True): print(f"Downloading prebuilt api libraries from {asset_url} ...") + response = requests.get(asset_url) if response.status_code != 200: raise Exception(f"Failed to download api libraries from {asset_url}. 
Status code: {response.status_code}") From 1f88eb85d53b66ce88b3e4e31d764c1ff0d9bf79 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Mon, 9 Feb 2026 08:00:07 +0000 Subject: [PATCH 12/14] further iterations --- cpp/deeplake_pg/deeplake_executor.cpp | 13 ++++++++++--- cpp/deeplake_pg/duckdb_executor.cpp | 21 ++++++++++++++++++++- cpp/deeplake_pg/duckdb_executor.hpp | 4 ++++ cpp/deeplake_pg/table_data.hpp | 10 ++++++++++ cpp/deeplake_pg/table_data_impl.hpp | 11 +++++++++++ 5 files changed, 55 insertions(+), 4 deletions(-) diff --git a/cpp/deeplake_pg/deeplake_executor.cpp b/cpp/deeplake_pg/deeplake_executor.cpp index 8ac9bbc4b5..11e08972e2 100644 --- a/cpp/deeplake_pg/deeplake_executor.cpp +++ b/cpp/deeplake_pg/deeplake_executor.cpp @@ -86,13 +86,20 @@ void analyze_plan(PlannedStmt* plan) } } - // Warm all streamers in parallel for cold run optimization + // Start async warming of streamers for cold run optimization. + // Non-blocking: DuckDB query starts immediately while batches prefetch in background. if (pg::eager_batch_prefetch) { - table_data->get_streamers().warm_all_streamers(); - base::log_info(base::log_channel::generic, "Eager batch prefetch completed for deeplake_executor"); + table_data->get_streamers().warm_all_streamers_async(); } } pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake); + + // Pre-initialize DuckDB connection while batch prefetching runs in the background. + // This overlaps DuckDB init (DB creation, table registration) with I/O, + // reducing cold run latency. 
+ if (pg::eager_batch_prefetch && pg::query_info::current().is_deeplake_table_referenced()) { + pg::ensure_duckdb_initialized(); + } } } // namespace pg diff --git a/cpp/deeplake_pg/duckdb_executor.cpp b/cpp/deeplake_pg/duckdb_executor.cpp index 071ef7ff1c..cb3359ca8b 100644 --- a/cpp/deeplake_pg/duckdb_executor.cpp +++ b/cpp/deeplake_pg/duckdb_executor.cpp @@ -432,7 +432,7 @@ void* duckdb_result_holder::get_chunk_ptr(size_t chunk_idx) const // Execute SQL query and return DuckDB results using C API // Note: Currently we still use C++ API for execution due to table function requirements // This function converts C++ results to C API format for processing -duckdb_result_holder execute_sql_query_direct(const std::string& query_string) +static std::unique_ptr& get_duckdb_conns() { static std::unique_ptr conns; if (conns == nullptr || !pg::table_storage::instance().is_up_to_date()) { @@ -444,6 +444,25 @@ duckdb_result_holder execute_sql_query_direct(const std::string& query_string) register_views(conns.get()); pg::table_storage::instance().set_up_to_date(true); } + return conns; +} + +} // unnamed namespace + +namespace pg { + +void ensure_duckdb_initialized() +{ + get_duckdb_conns(); +} + +} // namespace pg + +namespace pg { + +duckdb_result_holder execute_sql_query_direct(const std::string& query_string) +{ + auto& conns = get_duckdb_conns(); std::string duckdb_query = pg_to_duckdb_translator::translate(query_string); diff --git a/cpp/deeplake_pg/duckdb_executor.hpp b/cpp/deeplake_pg/duckdb_executor.hpp index b4ff441d23..8a90767dd2 100644 --- a/cpp/deeplake_pg/duckdb_executor.hpp +++ b/cpp/deeplake_pg/duckdb_executor.hpp @@ -54,6 +54,10 @@ struct duckdb_result_holder void* get_chunk_ptr(size_t chunk_idx) const; }; +// Pre-initialize DuckDB connection (for cold run optimization). +// Can be called early (e.g., during analyze_plan) to overlap DuckDB init with batch prefetching. 
+void ensure_duckdb_initialized(); + // Execute SQL query using DuckDB and return results directly without conversion duckdb_result_holder execute_sql_query_direct(const std::string& query_string); diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index 0ba749fd20..cd67aba496 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -152,6 +152,16 @@ struct table_data */ inline void warm_all_streamers(); + /** + * @brief Start warming all streamers in the background (non-blocking). + * + * Launches warm_all_streamers() in a detached background thread. + * The DuckDB executor can begin query processing immediately while + * batch data is being prefetched. The prefetched batches are stored + * in prefetched_raw_batches_ and consumed by value_ptr/get_sample. + */ + inline void warm_all_streamers_async(); + /** * @brief Pre-initialize batches for all given columns at the specified row in parallel. * diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 3c84936c74..66b910f6da 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -686,6 +686,17 @@ inline void table_data::streamer_info::warm_all_streamers() } } +inline void table_data::streamer_info::warm_all_streamers_async() +{ + // Launch warm_all_streamers() in a detached background thread. + // This allows the DuckDB executor to start query processing immediately + // while batch data is being prefetched in the background. + // The prefetched data is consumed via prefetched_raw_batches_ in value_ptr/get_sample. 
+ std::thread([this]() { + warm_all_streamers(); + }).detach(); +} + inline void table_data::streamer_info::prefetch_batches_for_row( const std::vector& column_indices, int64_t row_number) { From 3811d7d0d406f71d31ebfc95b54e28f2de8a09a7 Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Mon, 9 Feb 2026 18:22:49 +0000 Subject: [PATCH 13/14] improvements --- cpp/bifrost/column_streamer.hpp | 7 ++ cpp/deeplake_pg/deeplake_executor.cpp | 11 ++- cpp/deeplake_pg/table_data.hpp | 10 +-- cpp/deeplake_pg/table_data_impl.hpp | 116 +++++--------------------- scripts/build_pg_ext.py | 11 ++- 5 files changed, 43 insertions(+), 112 deletions(-) diff --git a/cpp/bifrost/column_streamer.hpp b/cpp/bifrost/column_streamer.hpp index f2fa919e8e..7e8b89a7ad 100644 --- a/cpp/bifrost/column_streamer.hpp +++ b/cpp/bifrost/column_streamer.hpp @@ -35,6 +35,13 @@ class column_streamer return b.columns()[0].array(); } + async::promise next_batch_async() + { + return prefetcher_.next_batch_async().then([](deeplake_core::batch b) { + return b.columns()[0].array(); + }); + } + /** * @brief Pre-fetch and cache the first batch for cold run optimization. * @param timeout_ms Maximum time to wait in milliseconds. diff --git a/cpp/deeplake_pg/deeplake_executor.cpp b/cpp/deeplake_pg/deeplake_executor.cpp index 11e08972e2..3a5d6a70f2 100644 --- a/cpp/deeplake_pg/deeplake_executor.cpp +++ b/cpp/deeplake_pg/deeplake_executor.cpp @@ -86,17 +86,16 @@ void analyze_plan(PlannedStmt* plan) } } - // Start async warming of streamers for cold run optimization. - // Non-blocking: DuckDB query starts immediately while batches prefetch in background. + // Warm first batches for all streamers in parallel for cold run optimization. + // This blocks until all first batches are downloaded but overlaps I/O across columns. 
if (pg::eager_batch_prefetch) { - table_data->get_streamers().warm_all_streamers_async(); + table_data->get_streamers().warm_all_streamers(); } } pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake); - // Pre-initialize DuckDB connection while batch prefetching runs in the background. - // This overlaps DuckDB init (DB creation, table registration) with I/O, - // reducing cold run latency. + // Pre-initialize DuckDB connection early so it's ready when query execution starts. + // This reduces cold run latency by front-loading DuckDB init. if (pg::eager_batch_prefetch && pg::query_info::current().is_deeplake_table_referenced()) { pg::ensure_duckdb_initialized(); } diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index cd67aba496..bc299affb7 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -152,15 +153,6 @@ struct table_data */ inline void warm_all_streamers(); - /** - * @brief Start warming all streamers in the background (non-blocking). - * - * Launches warm_all_streamers() in a detached background thread. - * The DuckDB executor can begin query processing immediately while - * batch data is being prefetched. The prefetched batches are stored - * in prefetched_raw_batches_ and consumed by value_ptr/get_sample. - */ - inline void warm_all_streamers_async(); /** * @brief Pre-initialize batches for all given columns at the specified row in parallel. diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index 66b910f6da..a0e69a7719 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -580,123 +580,47 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id) streamers_.column_to_batches[idx].batches.resize(batch_count); } -namespace detail { - -// SFINAE check: does column_streamer have ensure_first_batch_ready()? 
-// This allows warm_all_streamers to compile against both the released API -// (which lacks the method) and the local dev API (which has it). -template -struct has_ensure_first_batch_ready : std::false_type {}; - -template -struct has_ensure_first_batch_ready().ensure_first_batch_ready())>> : std::true_type {}; - -template -inline void try_warm_streamer([[maybe_unused]] Streamer& s, [[maybe_unused]] size_t idx) -{ - if constexpr (has_ensure_first_batch_ready::value) { - try { - s.ensure_first_batch_ready(); - } catch (const bifrost::stop_iteration&) { - // Expected when there are no batches - } catch (const std::exception& e) { - base::log_warning(base::log_channel::async, - "warm_all_streamers: streamer {} failed: {}", idx, e.what()); - } catch (...) { - base::log_warning(base::log_channel::async, - "warm_all_streamers: streamer {} failed with unknown exception", idx); - } - } - // If the method doesn't exist, this is a no-op. - // The async_prefetcher background prefetch is still active. -} - -} // namespace detail - inline void table_data::streamer_info::warm_all_streamers() { - // Full pre-warming: download ALL batches for ALL columns in parallel. - // This eliminates cold-run latency by pre-populating the raw batch cache - // before query execution starts. - // - // Each column's async_prefetcher has already queued all batch downloads - // in its start() method. Here we drain those queues in parallel across - // columns, storing the raw nd::array results in prefetched_raw_batches_. - // The type-specific initialization (eval_with_nones, string_stream_array_holder) - // happens lazily when the batch is first accessed by value_ptr/get_sample/value. - - // Collect indices of valid streamers to warm - std::vector indices; + // Warm the first batch of every streamer in parallel by calling + // ensure_first_batch_ready() on each one. 
This caches the first batch + // INSIDE the async_prefetcher, so the normal next_batch() flow works + // unchanged - it just returns the cached batch immediately instead of + // blocking on I/O. This is safe to call multiple times (idempotent). + + std::vector active; for (size_t i = 0; i < streamers.size(); ++i) { if (streamers[i] && !streamers[i]->empty()) { - indices.push_back(i); + active.push_back(i); } } - if (indices.empty()) { + if (active.size() <= 1) { + // 0 or 1 streamers: warm inline, no benefit from parallelism + for (size_t i : active) { + streamers[i]->ensure_first_batch_ready(); + } return; } - // Spawn threads to pre-download ALL batches for each column in parallel. - // Each thread drains the column's async_prefetcher, storing raw batches - // in prefetched_raw_batches_ for later consumption by value_ptr/get_sample/value. + // Warm all streamers in parallel using threads std::vector threads; - threads.reserve(indices.size()); - - for (size_t idx : indices) { - threads.emplace_back([this, idx]() { + threads.reserve(active.size()); + for (size_t i : active) { + threads.emplace_back([this, i]() { try { - auto& col_data = column_to_batches[idx]; - const size_t num_batches = col_data.batches.size(); - // Count how many batches are already initialized - size_t batches_to_fetch = 0; - for (size_t i = 0; i < num_batches; ++i) { - if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - ++batches_to_fetch; - } - } - if (batches_to_fetch == 0) { - return; - } - // Drain all pending batches from the streamer into the prefetch queue - std::lock_guard lock(col_data.mutex_); - for (size_t i = 0; i < batches_to_fetch; ++i) { - try { - col_data.prefetched_raw_batches_.push_back(streamers[idx]->next_batch()); - } catch (const bifrost::stop_iteration&) { - break; // No more batches - } - } + streamers[i]->ensure_first_batch_ready(); } catch (const std::exception& e) { base::log_warning(base::log_channel::async, - "warm_all_streamers: column {} 
failed: {}", idx, e.what()); - } catch (...) { - base::log_warning(base::log_channel::async, - "warm_all_streamers: column {} failed with unknown exception", idx); + "warm_all_streamers: streamer {} failed: {}", i, e.what()); } }); } - - // Wait for all columns to finish downloading for (auto& t : threads) { - if (t.joinable()) { - t.join(); - } + t.join(); } } -inline void table_data::streamer_info::warm_all_streamers_async() -{ - // Launch warm_all_streamers() in a detached background thread. - // This allows the DuckDB executor to start query processing immediately - // while batch data is being prefetched in the background. - // The prefetched data is consumed via prefetched_raw_batches_ in value_ptr/get_sample. - std::thread([this]() { - warm_all_streamers(); - }).detach(); -} - inline void table_data::streamer_info::prefetch_batches_for_row( const std::vector& column_indices, int64_t row_number) { diff --git a/scripts/build_pg_ext.py b/scripts/build_pg_ext.py index 8f4cacc605..673f31e02e 100644 --- a/scripts/build_pg_ext.py +++ b/scripts/build_pg_ext.py @@ -1,3 +1,11 @@ +#!/usr/bin/env -S uv run +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "requests>=2.28", +# ] +# /// + import json import os import sys @@ -6,7 +14,7 @@ try: import requests except ImportError: - os.system("pip install requests --user") + os.system("pip install requests --user --break-system-packages") import requests """ @@ -23,6 +31,7 @@ + def get_pinned_version(): """ Read the pinned deeplake API version from DEEPLAKE_API_VERSION file. 
From 9c64d2b16aa4ce0981bc031de9bd68f42189502d Mon Sep 17 00:00:00 2001 From: Davit Buniatyan Date: Tue, 10 Feb 2026 00:20:26 +0000 Subject: [PATCH 14/14] cpp updates --- cpp/bifrost/column_streamer.hpp | 6 +- cpp/deeplake_pg/deeplake_executor.cpp | 8 +- cpp/deeplake_pg/table_am.cpp | 18 ++- cpp/deeplake_pg/table_data.hpp | 9 +- cpp/deeplake_pg/table_data_impl.hpp | 178 +++++++++++--------------- postgres/scripts/run_pg_server.sh | 8 +- 6 files changed, 107 insertions(+), 120 deletions(-) diff --git a/cpp/bifrost/column_streamer.hpp b/cpp/bifrost/column_streamer.hpp index 7e8b89a7ad..69ca7aed8c 100644 --- a/cpp/bifrost/column_streamer.hpp +++ b/cpp/bifrost/column_streamer.hpp @@ -35,11 +35,9 @@ class column_streamer return b.columns()[0].array(); } - async::promise next_batch_async() + async::promise next_batch_async() { - return prefetcher_.next_batch_async().then([](deeplake_core::batch b) { - return b.columns()[0].array(); - }); + return prefetcher_.next_batch_async(); } /** diff --git a/cpp/deeplake_pg/deeplake_executor.cpp b/cpp/deeplake_pg/deeplake_executor.cpp index 3a5d6a70f2..3f968ea3e2 100644 --- a/cpp/deeplake_pg/deeplake_executor.cpp +++ b/cpp/deeplake_pg/deeplake_executor.cpp @@ -89,7 +89,13 @@ void analyze_plan(PlannedStmt* plan) // Warm first batches for all streamers in parallel for cold run optimization. // This blocks until all first batches are downloaded but overlaps I/O across columns. if (pg::eager_batch_prefetch) { - table_data->get_streamers().warm_all_streamers(); + try { + table_data->get_streamers().warm_all_streamers(); + } catch (const std::exception& e) { + elog(WARNING, "Eager batch prefetch failed during analyze_plan: %s", e.what()); + } catch (...) 
{ + elog(WARNING, "Eager batch prefetch failed during analyze_plan with unknown exception"); + } } } pg::query_info::current().set_all_tables_are_deeplake(all_tables_are_deeplake); diff --git a/cpp/deeplake_pg/table_am.cpp b/cpp/deeplake_pg/table_am.cpp index 5b7758c68d..12746a9944 100644 --- a/cpp/deeplake_pg/table_am.cpp +++ b/cpp/deeplake_pg/table_am.cpp @@ -388,8 +388,13 @@ double deeplake_index_build_range_scan(Relation heap_rel, // Warm all streamers in parallel for cold run optimization if (pg::eager_batch_prefetch) { - td.get_streamers().warm_all_streamers(); - base::log_info(base::log_channel::generic, "Eager batch prefetch completed for index_build_range_scan"); + try { + td.get_streamers().warm_all_streamers(); + } catch (const std::exception& e) { + elog(WARNING, "Eager batch prefetch failed during index_build_range_scan: %s", e.what()); + } catch (...) { + elog(WARNING, "Eager batch prefetch failed during index_build_range_scan with unknown exception"); + } } std::vector values(nkeys, 0); @@ -749,8 +754,13 @@ TableScanDesc deeplake_table_am_routine::scan_begin(Relation relation, // Warm all streamers in parallel for cold run optimization if (pg::eager_batch_prefetch) { - td.get_streamers().warm_all_streamers(); - base::log_info(base::log_channel::generic, "Eager batch prefetch completed for scan_begin"); + try { + td.get_streamers().warm_all_streamers(); + } catch (const std::exception& e) { + elog(WARNING, "Eager batch prefetch failed during scan_begin: %s", e.what()); + } catch (...) { + elog(WARNING, "Eager batch prefetch failed during scan_begin with unknown exception"); + } } if (nkeys > 0) { diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index bc299affb7..ce51f3d5f8 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -118,16 +118,11 @@ struct table_data { std::mutex mutex_; std::vector batches; - /// Pre-fetched raw batches from parallel prefetch. Protected by mutex_. 
- /// warm_all_streamers() drains the streamer into this deque; - /// value_ptr/get_sample/value consume from the front. - std::deque prefetched_raw_batches_; column_data() = default; column_data(const column_data&) = delete; column_data(column_data&& other) noexcept : batches(std::move(other.batches)) - , prefetched_raw_batches_(std::move(other.prefetched_raw_batches_)) { } column_data& operator=(const column_data&) = delete; @@ -136,11 +131,15 @@ struct table_data std::vector column_to_batches; std::vector> streamers; + std::vector> first_batch_cache_; + bool warmed_ = false; inline void reset() noexcept { column_to_batches.clear(); streamers.clear(); + first_batch_cache_.clear(); + warmed_ = false; } /** diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index a0e69a7719..a0a37669e9 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -582,105 +582,57 @@ inline void table_data::create_streamer(int32_t idx, int32_t worker_id) inline void table_data::streamer_info::warm_all_streamers() { - // Warm the first batch of every streamer in parallel by calling - // ensure_first_batch_ready() on each one. This caches the first batch - // INSIDE the async_prefetcher, so the normal next_batch() flow works - // unchanged - it just returns the cached batch immediately instead of - // blocking on I/O. This is safe to call multiple times (idempotent). 
- - std::vector active; - for (size_t i = 0; i < streamers.size(); ++i) { - if (streamers[i] && !streamers[i]->empty()) { - active.push_back(i); - } - } - - if (active.size() <= 1) { - // 0 or 1 streamers: warm inline, no benefit from parallelism - for (size_t i : active) { - streamers[i]->ensure_first_batch_ready(); - } + if (warmed_) { return; } + warmed_ = true; - // Warm all streamers in parallel using threads - std::vector threads; - threads.reserve(active.size()); - for (size_t i : active) { - threads.emplace_back([this, i]() { - try { - streamers[i]->ensure_first_batch_ready(); - } catch (const std::exception& e) { - base::log_warning(base::log_channel::async, - "warm_all_streamers: streamer {} failed: {}", i, e.what()); - } - }); - } - for (auto& t : threads) { - t.join(); - } -} + first_batch_cache_.resize(streamers.size()); -inline void table_data::streamer_info::prefetch_batches_for_row( - const std::vector& column_indices, int64_t row_number) -{ - const int64_t batch_index = row_number >> batch_size_log2_; + icm::vector indices; + icm::vector> promises; - // Fast check: find columns that need batch initialization - std::vector need_init; - for (int32_t col : column_indices) { - if (col < 0 || static_cast(col) >= column_to_batches.size()) continue; - auto& col_data = column_to_batches[col]; - if (static_cast(batch_index) < col_data.batches.size() && - !col_data.batches[batch_index].initialized_.load(std::memory_order_acquire)) { - need_init.push_back(col); + for (size_t i = 0; i < streamers.size(); ++i) { + if (streamers[i]) { + promises.push_back(streamers[i]->next_batch_async()); + indices.push_back(i); } } - if (need_init.size() <= 1) { - // 0 or 1 columns need init - no benefit from parallelism + if (promises.empty()) { return; } - // Multiple columns need batch initialization: call next_batch() for each in parallel. - // This overlaps the I/O waits across all columns. 
The raw batch results are stored - // in prefetched_raw_batches_ and consumed by the type-specific value_ptr/value/get_sample - // methods during the subsequent sequential column processing. - std::vector threads; - threads.reserve(need_init.size()); - - for (int32_t col : need_init) { - threads.emplace_back([this, col, batch_index]() { - try { - auto& col_data = column_to_batches[col]; - // Fetch all uninitialized batches up to batch_index for this column - std::lock_guard lock(col_data.mutex_); - for (int64_t i = 0; i <= batch_index; ++i) { - if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - // Only pre-fetch one batch (the next one from the streamer). - // Store it as a raw batch for the type-specific code to consume. - if (col_data.prefetched_raw_batches_.empty()) { - col_data.prefetched_raw_batches_.push_back(streamers[col]->next_batch()); - } - break; // Only need to pre-fetch the next pending batch - } - } - } catch (const bifrost::stop_iteration&) { - // Expected when streamer is exhausted - } catch (const std::exception& e) { - base::log_warning(base::log_channel::async, - "prefetch_batches_for_row: column {} failed: {}", col, e.what()); - } - }); - } + try { + auto results = async::combine(std::move(promises)).get_future().get(); - for (auto& t : threads) { - if (t.joinable()) { - t.join(); + for (size_t j = 0; j < indices.size(); ++j) { + auto& batch = results[j]; + first_batch_cache_[indices[j]] = batch.columns()[0].array(); } + } catch (const std::exception& e) { + elog(WARNING, "warm_all_streamers failed: %s", e.what()); + first_batch_cache_.clear(); + } catch (...) { + elog(WARNING, "warm_all_streamers failed with unknown exception"); + first_batch_cache_.clear(); } } +inline void table_data::streamer_info::prefetch_batches_for_row( + const std::vector& /*column_indices*/, int64_t /*row_number*/) +{ + // The async_prefetcher already downloads batches ahead of time in the + // background. 
Each column_streamer's prefetcher queues up to max_task_count_ + // batch requests on construction. By the time the scan reaches a batch + // boundary, the download is typically already complete or nearly ready. + // + // Previously this method spawned OS threads per column per batch boundary + // to overlap I/O, but thread creation (pthread_create/clone3) was the + // dominant overhead, as shown by profiling. The async prefetcher provides + // the same I/O overlap without thread creation cost. +} + inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number) { const int64_t batch_index = row_number >> batch_size_log2_; @@ -689,17 +641,22 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, in auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + if (batch_index == 0 && !first_batch_cache_.empty() + && static_cast(column_number) < first_batch_cache_.size() + && first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + first_batch_cache_[column_number].reset(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - // Use pre-fetched batch if available, otherwise fetch synchronously - if (!col_data.prefetched_raw_batches_.empty()) { - col_data.batches[i].owner_ = std::move(col_data.prefetched_raw_batches_.front()); - col_data.prefetched_raw_batches_.pop_front(); - } else { - col_data.batches[i].owner_ = streamers[column_number]->next_batch(); - } + col_data.batches[i].owner_ = streamers[column_number]->next_batch(); col_data.batches[i].initialized_.store(true, 
std::memory_order_release); } } @@ -722,18 +679,23 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int6 auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + if (batch_index == 0 && !first_batch_cache_.empty() + && static_cast(column_number) < first_batch_cache_.size() + && first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = utils::eval_with_nones(std::move(*first_batch_cache_[column_number])); + first_batch_cache_[column_number].reset(); + batch.data_ = batch.owner_.data().data(); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - // Use pre-fetched batch if available, otherwise fetch synchronously - nd::array raw_batch; - if (!col_data.prefetched_raw_batches_.empty()) { - raw_batch = std::move(col_data.prefetched_raw_batches_.front()); - col_data.prefetched_raw_batches_.pop_front(); - } else { - raw_batch = streamers[column_number]->next_batch(); - } + nd::array raw_batch = streamers[column_number]->next_batch(); col_data.batches[i].owner_ = utils::eval_with_nones(std::move(raw_batch)); col_data.batches[i].data_ = col_data.batches[i].owner_.data().data(); col_data.batches[i].initialized_.store(true, std::memory_order_release); @@ -753,17 +715,23 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, auto& col_data = column_to_batches[column_number]; auto& batch = col_data.batches[batch_index]; + if (batch_index == 0 && !first_batch_cache_.empty() + && static_cast(column_number) < first_batch_cache_.size() + && first_batch_cache_[column_number].has_value()) { + std::lock_guard lock(col_data.mutex_); + 
if (!batch.initialized_.load(std::memory_order_relaxed)) { + batch.owner_ = std::move(*first_batch_cache_[column_number]); + first_batch_cache_[column_number].reset(); + batch.holder_ = impl::string_stream_array_holder(batch.owner_); + batch.initialized_.store(true, std::memory_order_release); + } + } + if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] { std::lock_guard lock(col_data.mutex_); for (int64_t i = 0; i <= batch_index; ++i) { if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) { - // Use pre-fetched batch if available, otherwise fetch synchronously - if (!col_data.prefetched_raw_batches_.empty()) { - col_data.batches[i].owner_ = std::move(col_data.prefetched_raw_batches_.front()); - col_data.prefetched_raw_batches_.pop_front(); - } else { - col_data.batches[i].owner_ = streamers[column_number]->next_batch(); - } + col_data.batches[i].owner_ = streamers[column_number]->next_batch(); col_data.batches[i].holder_ = impl::string_stream_array_holder(col_data.batches[i].owner_); col_data.batches[i].initialized_.store(true, std::memory_order_release); } diff --git a/postgres/scripts/run_pg_server.sh b/postgres/scripts/run_pg_server.sh index e35bf5ea2d..5f0304e6cf 100755 --- a/postgres/scripts/run_pg_server.sh +++ b/postgres/scripts/run_pg_server.sh @@ -95,8 +95,14 @@ else sed -i 's/^\(host.*all.*all.*::1\/128\s*\)trust/\1scram-sha-256/' "$POSTGRES_DATA/pg_hba.conf" fi -# Start PostgreSQL (extended timeout for shared_preload_libraries loading) +# Start PostgreSQL temporarily to set password "$POSTGRES_INSTALL/bin/pg_ctl" -D "$POSTGRES_DATA" -l "$TEST_LOGFILE" -t 120 start # Set postgres password (matches Docker POSTGRES_PASSWORD=password) "$POSTGRES_INSTALL/bin/psql" -U postgres -c "ALTER USER postgres PASSWORD 'password';" + +# Stop PostgreSQL and restart in foreground +stop_postgres + +echo "Starting PostgreSQL in foreground..." +exec "$POSTGRES_INSTALL/bin/postgres" -D "$POSTGRES_DATA"