Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ static struct InitFiu
REGULAR(rmt_delay_execute_drop_range) \
REGULAR(rmt_delay_commit_part) \
ONCE(local_object_storage_network_error_during_remove) \
ONCE(parallel_replicas_check_read_mode_always)
ONCE(parallel_replicas_check_read_mode_always)\
REGULAR(lightweight_show_tables)

namespace FailPoints
{
Expand Down
119 changes: 39 additions & 80 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTDataType.h>
#include <Common/FailPoint.h>

namespace DB
{
Expand Down Expand Up @@ -97,6 +98,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

namespace FailPoints
{
extern const char lightweight_show_tables[];
}

DatabaseDataLake::DatabaseDataLake(
const std::string & database_name_,
const std::string & url_,
Expand Down Expand Up @@ -448,6 +454,12 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
auto catalog = getCatalog();
auto table_metadata = DataLake::TableMetadata().withSchema().withLocation().withDataLakeSpecificProperties();

/// This is added to test that lightweight queries like 'SHOW TABLES' don't end up fetching the table metadata
fiu_do_on(FailPoints::lightweight_show_tables,
{
std::this_thread::sleep_for(std::chrono::seconds(10));
});

const bool with_vended_credentials = settings[DatabaseDataLakeSetting::vended_credentials].value;
if (!lightweight && with_vended_credentials)
table_metadata = table_metadata.withStorageCredentials();
Expand Down Expand Up @@ -665,15 +677,33 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator(
pool.scheduleOrThrow(
[this, table_name, skip_not_loaded, context_, promise=promises.back()]() mutable
{
StoragePtr storage = nullptr;
try
{
auto storage = tryGetTableImpl(table_name, context_, false, skip_not_loaded);
promise->set_value(storage);
LOG_INFO(log, "Get table information for table {}", table_name);
storage = tryGetTableImpl(table_name, context_, false, skip_not_loaded);
}
catch (...)
{
promise->set_exception(std::current_exception());
if (context_->getSettingsRef()[Setting::database_datalake_require_metadata_access])
{
auto error_code = getCurrentExceptionCode();
auto error_message = getCurrentExceptionMessage(true, false, true, true);
auto enhanced_message = fmt::format(
"Received error {} while fetching table metadata for existing table '{}'. "
"If you want this error to be ignored, use database_datalake_require_metadata_access=0. Error: {}",
error_code,
table_name,
error_message);
promise->set_exception(std::make_exception_ptr(Exception::createRuntime(
error_code,
enhanced_message)));
return;
}
else
tryLogCurrentException(log, fmt::format("Ignoring table {}", table_name));
}
promise->set_value(storage);
});
}
catch (...)
Expand Down Expand Up @@ -701,15 +731,14 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getTablesIterator(
return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
}

DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator(
ContextPtr context_,
std::vector<LightWeightTableDetails> DatabaseDataLake::getLightweightTablesIterator(
ContextPtr /*context_*/,
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded) const
bool /*skip_not_loaded*/) const
{
Tables tables;

auto catalog = getCatalog();
DB::Names iceberg_tables;
std::vector<LightWeightTableDetails> result;

/// Do not throw here, because this might be, for example, a query to system.tables.
/// It must not fail on case of some datalake error.
Expand All @@ -722,84 +751,14 @@ DatabaseTablesIteratorPtr DatabaseDataLake::getLightweightTablesIterator(
tryLogCurrentException(__PRETTY_FUNCTION__);
}

auto & pool = Context::getGlobalContextInstance()->getIcebergCatalogThreadpool();

std::vector<std::shared_ptr<std::promise<StoragePtr>>> promises;
std::vector<std::future<StoragePtr>> futures;

for (const auto & table_name : iceberg_tables)
{
if (filter_by_table_name && !filter_by_table_name(table_name))
continue;

/// NOTE: There are a million different ways in which we can receive a
/// weird response from different catalogs. tryGetTableImpl will not
/// throw only in case of expected errors, but sometimes we can receive
/// completely unexpected results for some objects which can be stored
/// in catalogs. But this function is used in SHOW TABLES query which
/// should return at least properly described tables. That is why we
/// have this try/catch here.
try
{
promises.emplace_back(std::make_shared<std::promise<StoragePtr>>());
futures.emplace_back(promises.back()->get_future());

pool.scheduleOrThrow(
[this, table_name, skip_not_loaded, context_, promise = promises.back()] mutable
{
StoragePtr storage = nullptr;
try
{
storage = tryGetTableImpl(table_name, context_, true, skip_not_loaded);
}
catch (...)
{
if (context_->getSettingsRef()[Setting::database_datalake_require_metadata_access])
{
auto error_code = getCurrentExceptionCode();
auto error_message = getCurrentExceptionMessage(true, false, true, true);
auto enhanced_message = fmt::format(
"Received error {} while fetching table metadata for existing table '{}'. "
"If you want this error to be ignored, use database_datalake_require_metadata_access=0. Error: {}",
error_code,
table_name,
error_message);
promise->set_exception(std::make_exception_ptr(Exception::createRuntime(
error_code,
enhanced_message)));
return;
}
else
tryLogCurrentException(log, fmt::format("Ignoring table {}", table_name));
}
promise->set_value(storage);
});
}
catch (...)
{
promises.back()->set_value(nullptr);
tryLogCurrentException(log, "Failed to schedule task into pool");
}
}

for (const auto & future : futures)
future.wait();

size_t future_index = 0;
for (const auto & table_name : iceberg_tables)
{
if (filter_by_table_name && !filter_by_table_name(table_name))
continue;

if (auto storage_ptr = futures[future_index].get(); storage_ptr != nullptr)
{
[[maybe_unused]] bool inserted = tables.emplace(table_name, storage_ptr).second;
chassert(inserted);
}
future_index++;
result.emplace_back(table_name);
}

return std::make_unique<DatabaseTablesSnapshotIterator>(tables, getDatabaseName());
return result;
}

ASTPtr DatabaseDataLake::getCreateDatabaseQueryImpl() const
Expand Down
2 changes: 1 addition & 1 deletion src/Databases/DataLake/DatabaseDataLake.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
bool skip_not_loaded) const override;

/// skip_not_loaded flag ignores all non-iceberg tables
DatabaseTablesIteratorPtr getLightweightTablesIterator(
std::vector<LightWeightTableDetails> getLightweightTablesIterator(
ContextPtr context,
const FilterByNameFunction & filter_by_table_name,
bool skip_not_loaded) const override;
Expand Down
19 changes: 17 additions & 2 deletions src/Databases/IDatabase.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ struct ParsedTablesMetadata;
struct QualifiedTableName;
class IRestoreCoordination;

/// Lightweight per-table information returned by IDatabase::getLightweightTablesIterator().
/// Unlike a full tables iterator it carries no StoragePtr, so producing it does not
/// require loading or initializing the table — useful for queries such as SHOW TABLES.
/// Currently only the table name is available; extend this struct if more cheap-to-fetch
/// details are needed.
struct LightWeightTableDetails
{
    String name;
};

class IDatabaseTablesIterator
{
public:
Expand Down Expand Up @@ -271,9 +278,17 @@ class IDatabase : public std::enable_shared_from_this<IDatabase>

/// Same as above, but may return non-fully initialized StoragePtr objects which are not suitable for reading.
/// Useful for queries like "SHOW TABLES"
virtual DatabaseTablesIteratorPtr getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const /// NOLINT
virtual std::vector<LightWeightTableDetails> getLightweightTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const /// NOLINT
{
return getTablesIterator(context, filter_by_table_name, skip_not_loaded);
std::vector<LightWeightTableDetails> result;

for (auto iterator = getTablesIterator(context, filter_by_table_name, skip_not_loaded); iterator->isValid(); iterator->next())
{
if (const auto & table = iterator->table())
result.emplace_back(iterator->name());
}

return result;
}

virtual DatabaseDetachedTablesSnapshotIteratorPtr getDetachedTablesIterator(
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ bool InterpreterSystemQuery::dropStorageReplica(const String & query_replica, co

void InterpreterSystemQuery::dropStorageReplicasFromDatabase(const String & query_replica, DatabasePtr database)
{
for (auto iterator = database->getLightweightTablesIterator(getContext()); iterator->isValid(); iterator->next())
for (auto iterator = database->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
dropStorageReplica(query_replica, iterator->table());

LOG_TRACE(log, "Dropped storage replica from {} of database {}", query_replica, backQuoteIfNeed(database->getDatabaseName()));
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemColumns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ void ReadFromSystemColumns::initializePipeline(QueryPipelineBuilder & pipeline,
else
{
const DatabasePtr & database = databases.at(database_name);
for (auto iterator = database->getLightweightTablesIterator(context); iterator->isValid(); iterator->next())
for (auto iterator = database->getTablesIterator(context); iterator->isValid(); iterator->next())
{
if (const auto & table = iterator->table())
{
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemCompletions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ void fillDataWithDatabasesTablesColumns(MutableColumns & res_columns, const Cont
res_columns[1]->insert(DATABASE_CONTEXT);
res_columns[2]->insertDefault();

for (auto iterator = database_ptr->getLightweightTablesIterator(context); iterator->isValid(); iterator->next())
for (auto iterator = database_ptr->getTablesIterator(context); iterator->isValid(); iterator->next())
{
const auto & table_name = iterator->name();
const auto & table = iterator->table();
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/System/StorageSystemIcebergHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void StorageSystemIcebergHistory::fillData([[maybe_unused]] MutableColumns & res
for (const auto & db: databases)
{
/// with last flag we are filtering out all non iceberg table
for (auto iterator = db.second->getLightweightTablesIterator(context_copy, {}, true); iterator->isValid(); iterator->next())
for (auto iterator = db.second->getTablesIterator(context_copy, {}, true); iterator->isValid(); iterator->next())
{
StoragePtr storage = iterator->table();

Expand Down
82 changes: 71 additions & 11 deletions src/Storages/System/StorageSystemTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,29 @@ ColumnPtr getFilteredTables(
}
else
{
auto table_it = database->getLightweightTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);
for (; table_it->isValid(); table_it->next())
if (engine_column || uuid_column)
{
table_column->insert(table_it->name());
if (engine_column)
engine_column->insert(table_it->table()->getName());
if (uuid_column)
uuid_column->insert(table_it->table()->getStorageID().uuid);
auto table_it = database->getTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);
for (; table_it->isValid(); table_it->next())
{
table_column->insert(table_it->name());
if (engine_column)
engine_column->insert(table_it->table()->getName());
if (uuid_column)
uuid_column->insert(table_it->table()->getStorageID().uuid);
}
}
else
{
auto table_details = database->getLightweightTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);
for (const auto & table_detail : table_details)
{
table_column->insert(table_detail.name);
}
}
}
}
Expand Down Expand Up @@ -291,6 +304,39 @@ class TablesBlockSource : public ISource
}
}


/// Fast path for queries that only need the "database"/"name" columns
/// (e.g. SHOW TABLES): fetch just the table names via the lightweight
/// iterator instead of instantiating every table's storage.
/// Returns the number of rows appended; advances database_idx past the
/// whole database, since it is consumed in a single call.
size_t fillTableNamesOnly(MutableColumns & res_columns)
{
    const auto access = context->getAccess();

    auto lightweight_tables = database->getLightweightTablesIterator(context,
        /* filter_by_table_name */ {},
        /* skip_not_loaded */ false);

    size_t rows_added = 0;

    for (const auto & details : lightweight_tables)
    {
        /// `tables` already holds the pre-filtered set of table names for this database.
        if (!tables.contains(details.name))
            continue;

        if (!access->isGranted(AccessType::SHOW_TABLES, database_name, details.name))
            continue;

        /// Walk the columns mask exactly like the generic path does:
        /// slot 0 is "database", slot 1 is "name".
        size_t src_index = 0;
        size_t res_index = 0;

        if (columns_mask[src_index++])
            res_columns[res_index++]->insert(database_name);

        if (columns_mask[src_index++])
            res_columns[res_index++]->insert(details.name);

        ++rows_added;
    }

    /// The whole database was emitted at once — move on to the next one.
    ++database_idx;
    return rows_added;
}

Chunk generate() override
{
if (done)
Expand Down Expand Up @@ -449,8 +495,23 @@ class TablesBlockSource : public ISource

const bool need_to_check_access_for_tables = need_to_check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);

/// This is for queries similar to 'show tables', where only name of the table is needed
auto needed_columns = getPort().getHeader().getColumnsWithTypeAndName();
bool needs_one_column = (needed_columns.size() == 1 && needed_columns[0].name == "name");

bool needs_two_columns = (needed_columns.size() == 2 &&
((needed_columns[0].name == "name" && needed_columns[1].name == "database") ||
(needed_columns[0].name == "database" && needed_columns[1].name == "name")));

if ((needs_one_column || needs_two_columns) && !need_to_check_access_for_tables)
{
size_t rows_added = fillTableNamesOnly(res_columns);
rows_count += rows_added;
continue;
}

if (!tables_it || !tables_it->isValid())
tables_it = database->getLightweightTablesIterator(context,
tables_it = database->getTablesIterator(context,
/* filter_by_table_name */ {},
/* skip_not_loaded */ false);

Expand Down Expand Up @@ -923,7 +984,6 @@ class TablesBlockSource : public ISource
}
}
}

UInt64 num_rows = res_columns.at(0)->size();
return Chunk(std::move(res_columns), num_rows);
}
Expand Down
Loading
Loading