Skip to content

Commit 995e633

Browse files
committed
Fixed statelessness.
1 parent f75f3a4 commit 995e633

9 files changed

Lines changed: 369 additions & 306 deletions

File tree

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 6 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -37,10 +37,8 @@ std::string join_path(const std::string& root, const std::string& name)
3737
return root + "/" + k_catalog_dir + "/" + name;
3838
}
3939

40-
std::shared_ptr<deeplake_api::catalog_table> open_or_create_table(
41-
const std::string& path,
42-
deeplake_api::catalog_table_schema schema,
43-
icm::string_map<> creds)
40+
std::shared_ptr<deeplake_api::catalog_table>
41+
open_or_create_table(const std::string& path, deeplake_api::catalog_table_schema schema, icm::string_map<> creds)
4442
{
4543
return deeplake_api::open_or_create_catalog_table(path, schema, std::move(creds)).get_future().get();
4644
}
@@ -51,9 +49,8 @@ int64_t now_ms()
5149
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
5250
}
5351

54-
std::shared_ptr<deeplake_api::catalog_table> open_catalog_table(const std::string& root_path,
55-
const std::string& name,
56-
icm::string_map<> creds)
52+
std::shared_ptr<deeplake_api::catalog_table>
53+
open_catalog_table(const std::string& root_path, const std::string& name, icm::string_map<> creds)
5754
{
5855
const auto path = join_path(root_path, name);
5956
return deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get();
@@ -213,8 +210,8 @@ std::vector<table_meta> load_tables(const std::string& root_path, icm::string_ma
213210
auto path_it = row.find("dataset_path");
214211
auto state_it = row.find("state");
215212
auto updated_it = row.find("updated_at");
216-
if (table_id_it == row.end() || schema_it == row.end() || table_it == row.end() ||
217-
path_it == row.end() || state_it == row.end() || updated_it == row.end()) {
213+
if (table_id_it == row.end() || schema_it == row.end() || table_it == row.end() || path_it == row.end() ||
214+
state_it == row.end() || updated_it == row.end()) {
218215
continue;
219216
}
220217

cpp/deeplake_pg/extension_init.cpp

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,7 @@ bool treat_numeric_as_double = true; // Treat numeric types as double by default
5555
bool print_progress_during_seq_scan = false;
5656
bool use_shared_mem_for_refresh = false;
5757
bool enable_dataset_logging = false; // Enable dataset operation logging for debugging
58-
bool allow_custom_paths = true; // Allow dataset_path in CREATE TABLE options
58+
bool allow_custom_paths = true; // Allow dataset_path in CREATE TABLE options
5959

6060
} // namespace pg
6161

@@ -133,7 +133,6 @@ void initialize_guc_parameters()
133133
nullptr // check_hook, assign_hook, show_hook
134134
);
135135

136-
137136
DefineCustomBoolVariable("pg_deeplake.print_runtime_stats",
138137
"Enable runtime statistics printing for pg_deeplake operations.",
139138
nullptr, // optional long description

cpp/deeplake_pg/table_am.cpp

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -13,22 +13,22 @@ extern "C" {
1313
#include <catalog/namespace.h>
1414
#include <catalog/storage.h>
1515
#include <catalog/storage_xlog.h>
16-
#include <storage/smgr.h>
17-
#include <storage/bufmgr.h>
1816
#include <commands/vacuum.h> // For VacuumParams and VACOPT_* flags
1917
#include <miscadmin.h>
2018
#include <nodes/bitmapset.h> // For bitmap operations
2119
#include <nodes/nodes.h> // For node types
2220
#include <nodes/parsenodes.h> // For parse nodes
2321
#include <pgstat.h> // For statistics collector integration
2422
#include <storage/block.h>
23+
#include <storage/bufmgr.h>
2524
#include <storage/relfilelocator.h>
25+
#include <storage/smgr.h>
2626
#include <utils/builtins.h> // For text conversion functions
2727
#include <utils/lsyscache.h>
2828
#include <utils/rel.h>
2929
#include <utils/relcache.h>
3030
#include <utils/snapshot.h> // For SNAPSHOT_DIRTY and snapshot types
31-
#include <utils/varlena.h> // For text functions
31+
#include <utils/varlena.h> // For text functions
3232

3333
#ifdef __cplusplus
3434
}
@@ -261,8 +261,7 @@ void deeplake_estimate_rel_size(
261261
}
262262
if (pages != nullptr) {
263263
constexpr uint32_t min_pages = 1;
264-
const uint32_t num_blocks =
265-
static_cast<uint32_t>(std::ceil(static_cast<double>(total_rows) / 65536.0));
264+
const uint32_t num_blocks = static_cast<uint32_t>(std::ceil(static_cast<double>(total_rows) / 65536.0));
266265
*pages = std::max(min_pages, num_blocks);
267266
}
268267

@@ -338,15 +337,15 @@ bool deeplake_scan_analyze_next_block(TableScanDesc scan, ReadStream* stream)
338337
Buffer buf = read_stream_next_buffer(stream, NULL);
339338

340339
if (!BufferIsValid(buf)) {
341-
return false; // No more blocks to sample
340+
return false; // No more blocks to sample
342341
}
343342

344343
// Release the buffer immediately - we don't actually need the physical data
345344
// because our columnar storage provides the data via scan_analyze_next_tuple.
346345
// But we needed to consume the stream to increment bs.m.
347346
ReleaseBuffer(buf);
348347

349-
return true; // Indicate we have data to process
348+
return true; // Indicate we have data to process
350349
}
351350
#endif
352351

@@ -754,7 +753,8 @@ void deeplake_table_am_routine::scan_rescan(TableScanDesc scan,
754753
std::string pre_msg = "Progress of sequential scan for table '" +
755754
scan_data->scan_state.get_table_data().get_table_name() + "'" + " (rescan " +
756755
std::to_string(scan_data->num_rescans) + ")";
757-
scan_data->progress_bar.restart(scan_data->scan_state.get_table_data().num_total_rows(), std::move(pre_msg));
756+
scan_data->progress_bar.restart(scan_data->scan_state.get_table_data().num_total_rows(),
757+
std::move(pre_msg));
758758
}
759759
}
760760
}
@@ -1180,6 +1180,10 @@ void deeplake_table_am_routine::relation_set_new_node(
11801180

11811181
convert_schema(tupdesc);
11821182

1183+
// Set DDL context to prevent auto-creation of tables from catalog during
1184+
// concurrent table creation (which causes race conditions).
1185+
table_storage::ddl_context_guard ddl_guard;
1186+
11831187
try {
11841188
table_storage::instance().create_table(table_name, RelationGetRelid(rel), tupdesc);
11851189
} catch (const std::exception& e) {

cpp/deeplake_pg/table_storage.cpp

Lines changed: 32 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -21,24 +21,24 @@ extern "C" {
2121
#include <utils/guc.h>
2222
#include <utils/lsyscache.h>
2323
#include <utils/rel.h>
24-
#include <utils/syscache.h>
2524
#include <utils/snapmgr.h>
25+
#include <utils/syscache.h>
2626

2727
#ifdef __cplusplus
2828
}
2929
#endif
3030

3131
#include "table_storage.hpp"
3232

33-
#include "exceptions.hpp"
3433
#include "dl_catalog.hpp"
35-
#include <storage/exceptions.hpp>
34+
#include "exceptions.hpp"
3635
#include "logger.hpp"
3736
#include "memory_tracker.hpp"
3837
#include "nd_utils.hpp"
3938
#include "table_ddl_lock.hpp"
4039
#include "table_scan.hpp"
4140
#include "utils.hpp"
41+
#include <storage/exceptions.hpp>
4242

4343
#include <icm/json.hpp>
4444
#include <icm/string_map.hpp>
@@ -276,7 +276,13 @@ void table_storage::load_table_metadata()
276276
auto* rel = makeRangeVar(pstrdup(meta.schema_name.c_str()), pstrdup(meta.table_name.c_str()), -1);
277277
Oid relid = RangeVarGetRelid(rel, NoLock, true);
278278
if (!OidIsValid(relid)) {
279-
// Attempt to create missing table from catalog entry.
279+
// Table exists in catalog but not in PostgreSQL.
280+
if (in_ddl_context()) {
281+
// During DDL (CREATE TABLE), skip auto-creation to avoid races.
282+
// The table might be in the middle of being created by another backend.
283+
continue;
284+
}
285+
// Not in DDL context (e.g., SET root_path) - safe to auto-create.
280286
pg::utils::memory_context_switcher context_switcher;
281287
pg::utils::spi_connector connector;
282288
bool pushed_snapshot = false;
@@ -293,14 +299,10 @@ void table_storage::load_table_metadata()
293299
SPI_execute(buf.data, false, 0);
294300

295301
resetStringInfo(&buf);
296-
appendStringInfo(&buf,
297-
"SELECT create_deeplake_table(%s, %s)",
298-
quote_literal_cstr(qualified_name.c_str()),
299-
qpath);
302+
appendStringInfo(
303+
&buf, "SELECT create_deeplake_table(%s, %s)", quote_literal_cstr(qualified_name.c_str()), qpath);
300304
if (SPI_execute(buf.data, false, 0) != SPI_OK_SELECT) {
301-
elog(WARNING,
302-
"Failed to auto-create deeplake table %s from catalog",
303-
qualified_name.c_str());
305+
elog(WARNING, "Failed to auto-create deeplake table %s from catalog", qualified_name.c_str());
304306
}
305307
if (pushed_snapshot) {
306308
PopActiveSnapshot();
@@ -334,7 +336,8 @@ void table_storage::load_table_metadata()
334336
return;
335337
}
336338

337-
struct snapshot_guard {
339+
struct snapshot_guard
340+
{
338341
bool active = false;
339342
snapshot_guard()
340343
{
@@ -355,22 +358,21 @@ void table_storage::load_table_metadata()
355358
// If not, drop and recreate the table with the correct schema
356359
if (!pg::utils::check_column_exists("pg_deeplake_tables", "table_oid")) {
357360
base::log_warning(base::log_channel::generic,
358-
"Detected old schema for pg_deeplake_tables without table_oid column. "
359-
"Dropping and recreating table to match current schema.");
361+
"Detected old schema for pg_deeplake_tables without table_oid column. "
362+
"Dropping and recreating table to match current schema.");
360363

361364
pg::utils::spi_connector connector;
362365
const char* drop_query = "DROP TABLE IF EXISTS public.pg_deeplake_tables CASCADE";
363366
if (SPI_execute(drop_query, false, 0) != SPI_OK_UTILITY) {
364367
base::log_warning(base::log_channel::generic, "Failed to drop old pg_deeplake_tables table");
365368
}
366369

367-
const char* create_query =
368-
"CREATE TABLE public.pg_deeplake_tables ("
369-
" id SERIAL PRIMARY KEY,"
370-
" table_oid OID NOT NULL UNIQUE,"
371-
" table_name NAME NOT NULL UNIQUE,"
372-
" ds_path TEXT NOT NULL UNIQUE"
373-
")";
370+
const char* create_query = "CREATE TABLE public.pg_deeplake_tables ("
371+
" id SERIAL PRIMARY KEY,"
372+
" table_oid OID NOT NULL UNIQUE,"
373+
" table_name NAME NOT NULL UNIQUE,"
374+
" ds_path TEXT NOT NULL UNIQUE"
375+
")";
374376
if (SPI_execute(create_query, false, 0) != SPI_OK_UTILITY) {
375377
base::log_warning(base::log_channel::generic, "Failed to create new pg_deeplake_tables table");
376378
}
@@ -445,8 +447,10 @@ void table_storage::load_table_metadata()
445447
// Use the actual relation name from PostgreSQL catalog, not the cached metadata name
446448
// This ensures we have the current name even if the table was renamed
447449
std::string actual_table_name = get_qualified_table_name(rel);
448-
elog(DEBUG1, "Loading table from metadata: cached_name=%s, actual_name=%s",
449-
table_name, actual_table_name.c_str());
450+
elog(DEBUG1,
451+
"Loading table from metadata: cached_name=%s, actual_name=%s",
452+
table_name,
453+
actual_table_name.c_str());
450454
table_data td(
451455
relid, actual_table_name, CreateTupleDescCopy(RelationGetDescr(rel)), std::string(ds_path), creds);
452456
auto it2status = tables_.emplace(relid, std::move(td));
@@ -570,10 +574,11 @@ void table_storage::create_table(const std::string& table_name, Oid table_id, Tu
570574
// Use provided dataset path or construct default path
571575
if (!options.dataset_path().empty()) {
572576
if (!pg::allow_custom_paths) {
573-
ereport(ERROR,
574-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
575-
errmsg("Custom dataset_path is disabled"),
576-
errhint("Set deeplake.allow_custom_paths=on or omit dataset_path and configure deeplake.root_path")));
577+
ereport(
578+
ERROR,
579+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
580+
errmsg("Custom dataset_path is disabled"),
581+
errhint("Set deeplake.allow_custom_paths=on or omit dataset_path and configure deeplake.root_path")));
577582
}
578583
// Explicit path provided via WITH clause
579584
dataset_path = options.dataset_path();

cpp/deeplake_pg/table_storage.hpp

Lines changed: 32 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ namespace pg {
77
// Session-level credentials management
88
struct session_credentials
99
{
10-
static char* creds_guc_string; // GUC string variable for credentials
10+
static char* creds_guc_string; // GUC string variable for credentials
1111
static char* root_path_guc_string; // GUC string variable for root path
1212

1313
// Get credentials from current session
@@ -228,7 +228,37 @@ class table_storage
228228
up_to_date_ = up_to_date;
229229
}
230230

231+
/**
232+
* RAII guard to suppress auto-creation of tables during load_table_metadata().
233+
*
234+
* When creating a table concurrently, we don't want load_table_metadata() to
235+
* auto-create tables from the catalog, as this causes race conditions with
236+
* other backends that are also creating tables.
237+
*
238+
* Usage: Create this guard before calling table_storage::instance() during DDL.
239+
*/
240+
class ddl_context_guard
241+
{
242+
public:
243+
ddl_context_guard()
244+
{
245+
in_ddl_context_ = true;
246+
}
247+
~ddl_context_guard()
248+
{
249+
in_ddl_context_ = false;
250+
}
251+
ddl_context_guard(const ddl_context_guard&) = delete;
252+
ddl_context_guard& operator=(const ddl_context_guard&) = delete;
253+
};
254+
255+
static bool in_ddl_context() noexcept
256+
{
257+
return in_ddl_context_;
258+
}
259+
231260
private:
261+
static inline thread_local bool in_ddl_context_ = false;
232262
table_storage() = default;
233263

234264
void save_table_metadata(const table_data& td);
@@ -242,4 +272,4 @@ class table_storage
242272
int64_t catalog_version_ = 0;
243273
};
244274

245-
} // namespace pg
275+
} // namespace pg

0 commit comments

Comments
 (0)