Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ static struct InitFiu
REGULAR(rmt_delay_execute_drop_range) \
REGULAR(rmt_delay_commit_part) \
ONCE(local_object_storage_network_error_during_remove) \
ONCE(parallel_replicas_check_read_mode_always)
ONCE(parallel_replicas_check_read_mode_always)\
REGULAR(lightweight_show_tables)

namespace FailPoints
{
Expand Down
119 changes: 39 additions & 80 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTDataType.h>
#include <Common/FailPoint.h>

namespace DB
{
Expand Down Expand Up @@ -97,6 +98,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

namespace FailPoints
{
extern const char lightweight_show_tables[];
}

DatabaseDataLake::DatabaseDataLake(
const std::string & database_name_,
const std::string & url_,
Expand Down Expand Up @@ -448,6 +454,12 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
auto catalog = getCatalog();
auto table_metadata = DataLake::TableMetadata().withSchema().withLocation().withDataLakeSpecificProperties();

/// This is added to test that lightweight queries like 'SHOW TABLES' dont end up fetching the table
fiu_do_on(FailPoints::lightweight_show_tables,
{
std::this_thread::sleep_for(std::chrono::seconds(10));
});

const bool with_vended_credentials = settings[DatabaseDataLakeSetting::vended_credentials].value;
if (!lightweight && with_vended_credentials)
table_metadata = table_metadata.withStorageCredentials();
Expand Down Expand Up @@ -665,15 +677,33 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator(
pool.scheduleOrThrow(
[this, table_name, skip_not_loaded, context_, promise=promises.back()]() mutable
{
StoragePtr storage = nullptr;
try
{
auto storage = tryGetTableImpl(table_name, context_, false, skip_not_loaded);
promise->set_value(storage);
LOG_INFO(log, "Get table information for table {}", table_name);
storage = tryGetTableImpl(table_name, context_, false, skip_not_loaded);
}
catch (...)
{
promise->set_exception(std::current_exception());
if (context_->getSettingsRef()[Setting::database_datalake_require_metadata_access])
{
auto error_code = getCurrentExceptionCode();
auto error_message = getCurrentExceptionMessage(true, false, true, true);
auto enhanced_message = fmt::format(
"Received error {} while fetching table metadata for existing table '{}'. "
"If you want this error to be ignored, use database_datalake_require_metadata_access=0. Error: {}",
error_code,
table_name,
error_message);
promise->set_exception(std::make_exception_ptr(Exception::createRuntime(
error_code,
enhanced_message)));
return;
}
else
tryLogCurrentException(log, fmt::format("Ignoring table {}", table_name));
}
promise->set_value(storage);
});
}
catch (...)
Expand Down Expand Up @@ -701,15 +731,14 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator(
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
}

DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator(
ContextPtr context_,
std::vector<LightWeightTableDetails> DatabaseDataLake::getLightweightTablesIterator(
ContextPtr /*context_*/,
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded) const
bool /*skip_not_loaded*/) const
{
Tables tables;

auto catalog = getCatalog();
DB::Names iceberg_tables;
std::vector<LightWeightTableDetails> result;

/// Do not throw here, because this might be, for example, a query to system.tables.
/// It must not fail on case of some datalake error.
Expand All @@ -722,84 +751,14 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator(
tryLogCurrentException(__PRETTY_FUNCTION__);
}

auto & pool = Context::getGlobalContextInstance()->getIcebergCatalogThreadpool();

std::vector<std::shared_ptr<std::promise<StoragePtr>>> promises;
std::vector<std::future<StoragePtr>> futures;

for (const auto & table_name : iceberg_tables)
{
if (filter_by_table_name && !filter_by_table_name(table_name))
continue;

/// NOTE: There are one million of different ways how we can receive
/// weird response from different catalogs. tryGetTableImpl will not
/// throw only in case of expected errors, but sometimes we can receive
/// completely unexpected results for some objects which can be stored
/// in catalogs. But this function is used in SHOW TABLES query which
/// should return at least properly described tables. That is why we
/// have this try/catch here.
try
{
promises.emplace_back(std::make_shared<std::promise<StoragePtr>>());
futures.emplace_back(promises.back()->get_future());

pool.scheduleOrThrow(
[this, table_name, skip_not_loaded, context_, promise = promises.back()] mutable
{
StoragePtr storage = nullptr;
try
{
storage = tryGetTableImpl(table_name, context_, true, skip_not_loaded);
}
catch (...)
{
if (context_->getSettingsRef()[Setting::database_datalake_require_metadata_access])
{
auto error_code = getCurrentExceptionCode();
auto error_message = getCurrentExceptionMessage(true, false, true, true);
auto enhanced_message = fmt::format(
"Received error {} while fetching table metadata for existing table '{}'. "
"If you want this error to be ignored, use database_datalake_require_metadata_access=0. Error: {}",
error_code,
table_name,
error_message);
promise->set_exception(std::make_exception_ptr(Exception::createRuntime(
error_code,
enhanced_message)));
return;
}
else
tryLogCurrentException(log, fmt::format("Ignoring table {}", table_name));
}
promise->set_value(storage);
});
}
catch (...)
{
promises.back()->set_value(nullptr);
tryLogCurrentException(log, "Failed to schedule task into pool");
}
}

for (const auto & future : futures)
future.wait();

size_t future_index = 0;
for (const auto & table_name : iceberg_tables)
{
if (filter_by_table_name && !filter_by_table_name(table_name))
continue;

if (auto storage_ptr = futures[future_index].get(); storage_ptr != nullptr)
{
[[maybe_unused]] bool inserted = tables.emplace(table_name, storage_ptr).second;
chassert(inserted);
}
future_index++;
result.emplace_back(table_name);
}

return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
return result;
}

ASTPtr DatabaseDataLake::getCreateDatabaseQueryImpl() const
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/DataLake/DatabaseDataLake.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
bool skip_not_loaded) const override;

/// skip_not_loaded flag ignores all non-iceberg tables
DatabaseTablesIteratorPtr getLightweightTablesIterator(
std::vector<LightWeightTableDetails> getLightweightTablesIterator(
ContextPtr context,
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded) const override;
Expand Down
19 changes: 17 additions & 2 deletions src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ struct ParsedTablesMetadata;
struct QualifiedTableName;
class IRestoreCoordination;

/// This structure is returned when getLightweightTablesIterator is called
/// It contains basic details of the table, currently only the table name
struct LightWeightTableDetails
{
String name;
};

class IDatabaseTablesIterator
{
public:
Expand Down Expand Up @@ -271,9 +278,17 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>

/// Same as above, but may return non-fully initialized StoragePtr objects which are not suitable for reading.
/// Useful for queries like "SHOW TABLES"
virtual DatabaseTablesIteratorPtr getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const /// NOLINT
virtual std::vector<LightWeightTableDetails> getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const /// NOLINT
{
return getTablesIterator(context, filter_by_table_name, skip_not_loaded);
std::vector<LightWeightTableDetails> result;

for (auto iterator = getTablesIterator(context, filter_by_table_name, skip_not_loaded); iterator->isValid(); iterator->next())
{
if (const auto & table = iterator->table())
result.emplace_back(iterator->name());
}

return result;
}

virtual DatabaseDetachedTablesSnapshotIteratorPtr getDetachedTablesIterator(
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ bool InterpreterSystemQuery::dropStorageReplica(const String & query_replica, co

void InterpreterSystemQuery::dropStorageReplicasFromDatabase(const String & query_replica, DatabasePtr database)
{
for (auto iterator = database->getLightweightTablesIterator(getContext()); iterator->isValid(); iterator->next())
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
dropStorageReplica(query_replica, iterator->table());

LOG_TRACE(log, "Dropped storage replica from {} of database {}", query_replica, backQuoteIfNeed(database->getDatabaseName()));
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemColumns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ void ReadFromSystemColumns::initializePipeline(QueryPipelineBuilder & pipeline,
else
{
const DatabasePtr & database = databases.at(database_name);
for (auto iterator = database->getLightweightTablesIterator(context); iterator->isValid(); iterator->next())
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (const auto & table = iterator->table())
{
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemCompletions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void fillDataWithDatabasesTablesColumns(MutableColumns & res_columns, const Cont
res_columns[1]->insert(DATABASE_CONTEXT);
res_columns[2]->insertDefault();

for (auto iterator = database_ptr->getLightweightTablesIterator(context); iterator->isValid(); iterator->next())
for (auto iterator = database_ptr->getTablesIterator(context); iterator->isValid(); iterator->next())
{
const auto & table_name = iterator->name();
const auto & table = iterator->table();
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemIcebergHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void StorageSystemIcebergHistory::fillData([[maybe_unused]] MutableColumns & res
for (const auto & db: databases)
{
/// with last flag we are filtering out all non iceberg table
for (auto iterator = db.second->getLightweightTablesIterator(context_copy, {}, true); iterator->isValid(); iterator->next())
for (auto iterator = db.second->getTablesIterator(context_copy, {}, true); iterator->isValid(); iterator->next())
{
StoragePtr storage = iterator->table();

Expand Down
82 changes: 71 additions & 11 deletions src/Storages/System/StorageSystemTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,29 @@ ColumnPtr getFilteredTables(
}
else
{
auto table_it = database->getLightweightTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);
for (; table_it->isValid(); table_it->next())
if (engine_column || uuid_column)
{
table_column->insert(table_it->name());
if (engine_column)
engine_column->insert(table_it->table()->getName());
if (uuid_column)
uuid_column->insert(table_it->table()->getStorageID().uuid);
auto table_it = database->getTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);
for (; table_it->isValid(); table_it->next())
{
table_column->insert(table_it->name());
if (engine_column)
engine_column->insert(table_it->table()->getName());
if (uuid_column)
uuid_column->insert(table_it->table()->getStorageID().uuid);
}
}
else
{
auto table_details = database->getLightweightTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);
for (const auto & table_detail : table_details)
{
table_column->insert(table_detail.name);
}
}
}
}
Expand Down Expand Up @@ -291,6 +304,39 @@ class TablesBlockSource : public ISource
}
}


size_t fillTableNamesOnly(MutableColumns & res_columns)
{
auto table_details = database->getLightweightTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);

size_t count = 0;

const auto access = context->getAccess();
for (const auto & table_detail: table_details)
{
if (!tables.contains(table_detail.name))
continue;

size_t src_index = 0;
size_t res_index = 0;

if (!access->isGranted(AccessType::SHOW_TABLES, database_name, table_detail.name))
continue;

if (columns_mask[src_index++])
res_columns[res_index++]->insert(database_name);

if (columns_mask[src_index++])
res_columns[res_index++]->insert(table_detail.name);

++count;
}
++database_idx;
return count;
}

Chunk generate() override
{
if (done)
Expand Down Expand Up @@ -449,8 +495,23 @@ class TablesBlockSource : public ISource

const bool need_to_check_access_for_tables = need_to_check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);

/// This is for queries similar to 'show tables', where only name of the table is needed
auto needed_columns = getPort().getHeader().getColumnsWithTypeAndName();
bool needs_one_column = (needed_columns.size() == 1 && needed_columns[0].name == "name");

bool needs_two_columns = (needed_columns.size() == 2 &&
((needed_columns[0].name == "name" && needed_columns[1].name == "database") ||
(needed_columns[0].name == "database" && needed_columns[1].name == "name")));

if ((needs_one_column || needs_two_columns) && !need_to_check_access_for_tables)
{
size_t rows_added = fillTableNamesOnly(res_columns);
rows_count += rows_added;
continue;
}

if (!tables_it || !tables_it->isValid())
tables_it = database->getLightweightTablesIterator(context,
tables_it = database->getTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);

Expand Down Expand Up @@ -923,7 +984,6 @@ class TablesBlockSource : public ISource
}
}
}

UInt64 num_rows = res_columns.at(0)->size();
return Chunk(std::move(res_columns), num_rows);
}
Expand Down
Loading
Loading