Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -938,9 +938,9 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t
{
sendRequest(endpoint, request_body);
}
catch (const DB::HTTPException &)
catch (const DB::HTTPException & ex)
{
return false;
throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Failed to update metadata via REST: {}", ex.displayText());
}
return true;
}
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ class IDataLakeMetadata : boost::noncopyable

virtual void modifyFormatSettings(FormatSettings &, const Context &) const {}

virtual bool supportsTruncate() const { return false; }
virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName());
}

static constexpr bool supportsTotalRows() { return false; }
virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
static constexpr bool supportsTotalBytes() { return false; }
Expand Down
2 changes: 2 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ DEFINE_ICEBERG_FIELD_ALIAS(partition_spec, partition-spec);
DEFINE_ICEBERG_FIELD_ALIAS(partition_specs, partition-specs);
DEFINE_ICEBERG_FIELD_ALIAS(spec_id, spec-id);
DEFINE_ICEBERG_FIELD_ALIAS(added_records, added-records);
DEFINE_ICEBERG_FIELD_ALIAS(deleted_records, deleted-records);
DEFINE_ICEBERG_FIELD_ALIAS(added_data_files, added-data-files);
DEFINE_ICEBERG_FIELD_ALIAS(deleted_data_files, deleted-data-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_delete_files, added-delete-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_position_delete_files, added-position-delete-files);
DEFINE_ICEBERG_FIELD_ALIAS(added_position_deletes, added-position-deletes);
Expand Down
94 changes: 93 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
#include <Core/UUID.h>
#include <DataTypes/DataTypeSet.h>
#include <Formats/FormatFilterInfo.h>
#include <Formats/FormatParserSharedResources.h>
#include <Formats/ReadSchemaUtils.h>
#include <Functions/FunctionFactory.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/tuple.h>
#include <Processors/Formats/ISchemaReader.h>
Expand Down Expand Up @@ -100,6 +100,7 @@ extern const int NOT_IMPLEMENTED;
extern const int ICEBERG_SPECIFICATION_VIOLATION;
extern const int TABLE_ALREADY_EXISTS;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_DATA;
}

namespace Setting
Expand Down Expand Up @@ -531,6 +532,97 @@ void IcebergMetadata::mutate(
);
}

void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id)
{
    /// Implements TRUNCATE TABLE for an Iceberg table: produces a new snapshot
    /// whose manifest list is empty (an "overwrite" operation that drops every
    /// data file), writes a version-bumped metadata file, and commits the new
    /// metadata to the catalog when one is attached.
    ///
    /// @param context     query context (feature-gate settings, write settings).
    /// @param catalog     data-lake catalog to commit the update to; may be null
    ///                    for catalog-less tables.
    /// @param storage_id  storage id whose table name encodes "namespace.table".
    /// @throws Exception with SUPPORT_IS_DISABLED if the experimental setting is
    ///         off, or INCORRECT_DATA if the catalog rejects the commit.
    if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value)
        throw Exception(
            ErrorCodes::SUPPORT_IS_DISABLED,
            "Iceberg truncate is experimental. "
            "To allow its usage, enable setting allow_experimental_insert_into_iceberg");

    /// Deliberately no isTransactional() guard here: REST/transactional
    /// catalogs are the primary target of this feature.
    auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
    auto metadata_object = getMetadataJSONObject(
        actual_table_state_snapshot.metadata_file_path,
        object_storage,
        persistent_components.metadata_cache,
        context,
        log,
        persistent_components.metadata_compression_method,
        persistent_components.table_uuid);

    /// -1 is the Iceberg sentinel for "no parent snapshot".
    Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(-1);

    auto config_path = persistent_components.table_path;
    if (!config_path.starts_with('/'))
        config_path = '/' + config_path;
    if (!config_path.ends_with('/'))
        config_path += "/";

    bool is_transactional = (catalog != nullptr && catalog->isTransactional());
    FileNamesGenerator filename_generator;
    /// Transactional catalogs (REST) require full S3 URIs — force location-based
    /// paths. Non-transactional catalogs respect the
    /// write_full_path_in_iceberg_metadata setting.
    if (is_transactional || context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata])
    {
        String location = metadata_object->getValue<String>(Iceberg::f_location);
        if (!location.ends_with("/"))
            location += "/";
        filename_generator = FileNamesGenerator(
            location, config_path, is_transactional,
            persistent_components.metadata_compression_method, write_format);
    }
    else
    {
        filename_generator = FileNamesGenerator(
            config_path, config_path, false,
            persistent_components.metadata_compression_method, write_format);
    }

    Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1;
    filename_generator.setVersion(new_metadata_version);

    auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();

    /// All file/record counters are zero and is_truncate=true, so the generated
    /// snapshot summary reports an "overwrite" deleting all previous records.
    auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata(
        filename_generator, metadata_name, parent_snapshot_id,
        0, 0, 0, 0, 0, 0, std::nullopt, std::nullopt, /*is_truncate=*/true);

    auto write_settings = context->getWriteSettings();
    auto buf = object_storage->writeObject(
        StoredObject(storage_manifest_list_name),
        WriteMode::Rewrite, std::nullopt,
        DBMS_DEFAULT_BUFFER_SIZE, write_settings);

    /// Empty entry list + use_previous_snapshots=false produces an empty
    /// manifest list, i.e. the table contains no data files after truncate.
    generateManifestList(filename_generator, metadata_object, object_storage, context,
        {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, /*use_previous_snapshots=*/false);
    buf->finalize();

    String metadata_content = dumpMetadataObjectToString(metadata_object);
    writeMessageToFile(metadata_content, storage_metadata_name, object_storage,
        context, "*", "", persistent_components.metadata_compression_method);

    /// Commit the new metadata file to the catalog, matching the commit pattern
    /// in IcebergWrites.cpp / Mutations.cpp.
    if (catalog)
    {
        /// Catalog-visible path: full blob URI for transactional catalogs,
        /// bare metadata path otherwise.
        String catalog_filename = metadata_name;
        if (is_transactional)
        {
            const String blob_storage_type_name = Poco::toLower(String(magic_enum::enum_name(object_storage->getType())));
            const auto blob_storage_namespace_name = persistent_components.table_path;
            catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name;
        }

        const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
        /// Pass the full metadata object (not just the new snapshot) — this is
        /// the contract updateMetadata expects, as used in IcebergWrites.cpp and
        /// fixed in Mutations.cpp; passing new_snapshot here contradicted the
        /// comment above this call and the sibling call sites.
        if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, metadata_object))
            throw Exception(ErrorCodes::INCORRECT_DATA, "Failed to commit Iceberg truncate update to catalog.");
    }
}

void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands)
{
for (const auto & command : commands)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class IcebergMetadata : public IDataLakeMetadata
bool supportsUpdate() const override { return true; }
bool supportsWrites() const override { return true; }
bool supportsParallelInsert() const override { return true; }
bool supportsTruncate() const override { return true; }

void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) override;

IcebergHistory getHistory(ContextPtr local_context) const;

Expand Down
47 changes: 47 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,53 @@ void generateManifestList(
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown iceberg version {}", version);

// For empty manifest list (e.g. TRUNCATE), write a valid Avro container
// file manually so we can embed the full schema JSON with field-ids intact,
// without triggering the DataFileWriter constructor's eager writeHeader()
// which commits encoder state before we can override avro.schema.
if (manifest_entry_names.empty() && !use_previous_snapshots)
{
auto write_avro_long = [](WriteBuffer & out, int64_t val)
{
uint64_t n = (static_cast<uint64_t>(val) << 1) ^ static_cast<uint64_t>(val >> 63);
while (n & ~0x7fULL)
{
char c = static_cast<char>((n & 0x7f) | 0x80);
out.write(&c, 1);
n >>= 7;
}
char c = static_cast<char>(n);
out.write(&c, 1);
};

auto write_avro_bytes = [&](WriteBuffer & out, const String & s)
{
write_avro_long(out, static_cast<int64_t>(s.size()));
out.write(s.data(), s.size());
};

// Avro Object Container File header
buf.write("Obj\x01", 4);

// Metadata map: 2 entries (codec + schema)
write_avro_long(buf, 2);
write_avro_bytes(buf, "avro.codec");
write_avro_bytes(buf, "null");
write_avro_bytes(buf, "avro.schema");
write_avro_bytes(buf, schema_representation); // full JSON, field-ids intact

// End of metadata map
write_avro_long(buf, 0);

// Sync marker (16 zero bytes — valid, no data blocks follow)
static const char sync_marker[16] = {};
buf.write(sync_marker, 16);

// No data blocks for empty manifest list
buf.finalize();
return;
}

auto schema = avro::compileJsonSchemaFromString(schema_representation); // NOLINT

auto adapter = std::make_unique<OutputStreamWriteBufferAdapter>(buf);
Expand Down
21 changes: 18 additions & 3 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(
Int32 added_delete_files,
Int32 num_deleted_rows,
std::optional<Int64> user_defined_snapshot_id,
std::optional<Int64> user_defined_timestamp)
std::optional<Int64> user_defined_timestamp,
bool is_truncate)
{
int format_version = metadata_object->getValue<Int32>(Iceberg::f_format_version);
Poco::JSON::Object::Ptr new_snapshot = new Poco::JSON::Object;
Expand All @@ -137,7 +138,16 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(

auto parent_snapshot = getParentSnapshot(parent_snapshot_id);
Poco::JSON::Object::Ptr summary = new Poco::JSON::Object;
if (num_deleted_rows == 0)
if (is_truncate)
{
summary->set(Iceberg::f_operation, Iceberg::f_overwrite);
Int32 prev_total_records = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_records) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_records)) : 0;
Int32 prev_total_data_files = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(Iceberg::f_total_data_files) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(Iceberg::f_total_data_files)) : 0;

summary->set(Iceberg::f_deleted_records, std::to_string(prev_total_records));
summary->set(Iceberg::f_deleted_data_files, std::to_string(prev_total_data_files));
}
else if (num_deleted_rows == 0)
{
summary->set(Iceberg::f_operation, Iceberg::f_append);
summary->set(Iceberg::f_added_data_files, std::to_string(added_files));
Expand All @@ -157,7 +167,12 @@ MetadataGenerator::NextMetadataResult MetadataGenerator::generateNextMetadata(

auto sum_with_parent_snapshot = [&](const char * field_name, Int32 snapshot_value)
{
Int32 prev_value = parent_snapshot ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
if (is_truncate)
{
summary->set(field_name, std::to_string(0));
return;
}
Int32 prev_value = parent_snapshot && parent_snapshot->has(Iceberg::f_summary) && parent_snapshot->getObject(Iceberg::f_summary)->has(field_name) ? std::stoi(parent_snapshot->getObject(Iceberg::f_summary)->getValue<String>(field_name)) : 0;
summary->set(field_name, std::to_string(prev_value + snapshot_value));
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class MetadataGenerator
Int32 added_delete_files,
Int32 num_deleted_rows,
std::optional<Int64> user_defined_snapshot_id = std::nullopt,
std::optional<Int64> user_defined_timestamp = std::nullopt);
std::optional<Int64> user_defined_timestamp = std::nullopt,
bool is_truncate = false);

void generateAddColumnMetadata(const String & column_name, DataTypePtr type);
void generateDropColumnMetadata(const String & column_name);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ static bool writeMetadataFiles(
catalog_filename = blob_storage_type_name + "://" + blob_storage_namespace_name + "/" + metadata_name;

const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName());
if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot))
if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, metadata))
{
cleanup();
return false;
Expand Down
10 changes: 7 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans
void StorageObjectStorage::truncate(
const ASTPtr & /* query */,
const StorageMetadataPtr & /* metadata_snapshot */,
ContextPtr /* context */,
ContextPtr context,
TableExclusiveLockHolder & /* table_holder */)
{
const auto path = configuration->getRawPath();
Expand All @@ -639,8 +639,12 @@ void StorageObjectStorage::truncate(

if (configuration->isDataLakeConfiguration())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Truncate is not supported for data lake engine");
auto * data_lake_metadata = getExternalMetadata(context);
if (!data_lake_metadata || !data_lake_metadata->supportsTruncate())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine");

data_lake_metadata->truncate(context, catalog, getStorageID());
return;
}

if (path.hasGlobs())
Expand Down
Loading
Loading