Merged

19 commits
de68832
[WIP] (DONOTMERGE) Feature: Support TRUNCATE TABLE for Iceberg engine
Mar 16, 2026
601250f
Fix: Address code review feedback and drop stateless test
Mar 17, 2026
27a8467
Fix: Properly escape Iceberg namespace in DataLakeCatalog queries
Mar 18, 2026
84d76f1
Fix: Add missing S3 credentials to ClickHouse DataLakeCatalog settings
Mar 18, 2026
c81fc7f
Fix: Remove invalid DataLakeCatalog S3 settings
Mar 19, 2026
876577b
Fix: Restore minio_secret_key variable for DataLakeCatalog auth
Mar 19, 2026
cc1c7db
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 19, 2026
5a7e014
Fix: Dynamically determine Iceberg format version to prevent Avro seg…
Mar 19, 2026
2148724
feat(iceberg): Implement TRUNCATE TABLE for Iceberg Engine (REST cata…
Mar 20, 2026
4681e4e
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 20, 2026
12657bb
fix(iceberg): pass new_snapshot to updateMetadata in IcebergStorageSink
Mar 21, 2026
bd1d16e
fix(iceberg): restore return false in RestCatalog::updateMetadata
Mar 24, 2026
11810d8
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 25, 2026
2b66954
fix(iceberg): revert Mutations.cpp updateMetadata to pass new_snapshot
Mar 25, 2026
cf85627
refactor(iceberg): add comment explaining Avro zigzag encoding in gen…
Mar 26, 2026
e70e425
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 26, 2026
4cfc411
refactor(iceberg): address code review feedback on TRUNCATE implement…
Mar 27, 2026
47c8acc
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 27, 2026
2155e32
Merge branch 'antalya-26.1' into feature/iceberg-truncate
il9ue Mar 27, 2026
6 changes: 6 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -144,6 +144,12 @@ class IDataLakeMetadata : boost::noncopyable

virtual void modifyFormatSettings(FormatSettings &, const Context &) const {}

virtual bool supportsTruncate() const { return false; }
virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName());
}

static constexpr bool supportsTotalRows() { return false; }
virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
static constexpr bool supportsTotalBytes() { return false; }
82 changes: 81 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -9,9 +9,9 @@
#include <Core/UUID.h>
#include <DataTypes/DataTypeSet.h>
#include <Formats/FormatFilterInfo.h>
#include <Formats/FormatParserSharedResources.h>
#include <Formats/ReadSchemaUtils.h>
#include <Functions/FunctionFactory.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/tuple.h>
#include <Processors/Formats/ISchemaReader.h>
@@ -531,6 +531,86 @@ void IcebergMetadata::mutate(
);
}

void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id)
{
if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value)
{
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Iceberg truncate is experimental. "
"To allow its usage, enable setting allow_experimental_insert_into_iceberg");
}

auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context);
auto metadata_object = getMetadataJSONObject(
actual_table_state_snapshot.metadata_file_path,
object_storage,
persistent_components.metadata_cache,
context,
log,
persistent_components.metadata_compression_method,
persistent_components.table_uuid);

Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(0);
P2: Keep the no-parent sentinel when the table has no snapshot

Using value_or(0) fabricates parent snapshot id 0 for tables that currently have no snapshot. Catalog commits then assert that main is at snapshot 0 instead of using the existing no-parent sentinel (-1, used in other Iceberg write paths), so truncating a freshly created empty table can fail optimistic-lock checks.
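A minimal sketch of the fix this comment suggests, assuming -1 is the no-parent sentinel used by the other Iceberg write paths it references:

// Sketch only: fall back to the no-parent sentinel instead of fabricating
// snapshot id 0 for a table that has never had a snapshot.
constexpr Int64 NO_PARENT_SNAPSHOT_ID = -1; /// assumed sentinel value
Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(NO_PARENT_SNAPSHOT_ID);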

auto config_path = persistent_components.table_path;
if (config_path.empty() || config_path.back() != '/')
config_path += "/";
if (!config_path.starts_with('/'))
config_path = '/' + config_path;

FileNamesGenerator filename_generator;
if (!context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata])
{
filename_generator = FileNamesGenerator(
config_path, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format);
}
else
{
auto bucket = metadata_object->getValue<String>(Iceberg::f_location);
if (bucket.empty() || bucket.back() != '/')
bucket += "/";
filename_generator = FileNamesGenerator(
bucket, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format);
}

Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1;
filename_generator.setVersion(new_metadata_version);
auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName();

auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata(
filename_generator, metadata_name, parent_snapshot_id,
/* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0,
/* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0);

// determine the table's format version so the manifest list is written with the matching Avro schema
int format_version = 1;
if (metadata_object->has("format-version"))
format_version = metadata_object->getValue<int>("format-version");
bool is_v2 = (format_version == 2);

// generate manifest list with 0 manifest files
auto write_settings = context->getWriteSettings();
auto buf = object_storage->writeObject(
StoredObject(storage_manifest_list_name),
WriteMode::Rewrite,
std::nullopt,
DBMS_DEFAULT_BUFFER_SIZE,
write_settings
);
generateManifestList(filename_generator, metadata_object, object_storage, context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, is_v2);
buf->finalize();

String metadata_content = dumpMetadataObjectToString(metadata_object);
writeMessageToFile(metadata_content, storage_metadata_name, object_storage, context, "*", "", persistent_components.metadata_compression_method);
if (catalog)
{
const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName());
bool success = catalog->updateMetadata(namespace_name, table_name, storage_metadata_name, metadata_object);
P1: Pass the generated snapshot to updateMetadata

truncate() builds new_snapshot but calls catalog->updateMetadata(..., metadata_object). For REST catalogs, RestCatalog::updateMetadata treats this argument as a snapshot and reads its snapshot-id; table metadata objects do not have that field, so truncate fails before the request is sent. As a result, TRUNCATE TABLE fails for Iceberg tables backed by REST catalogs.
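A minimal sketch of the corrected call shape, assuming the final argument is the snapshot object the REST path expects:

// Sketch only: pass the snapshot generated above so that
// RestCatalog::updateMetadata can read its snapshot-id field.
bool success = catalog->updateMetadata(namespace_name, table_name, storage_metadata_name, new_snapshot);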

P1: Commit truncate with the catalog-visible metadata path

The truncate commit passes storage_metadata_name to updateMetadata, but catalog updates need the catalog-facing metadata URI, as in normal Iceberg writes. With Glue, this value is written to metadata_location; using an internal storage path instead of an s3://... URI leaves the catalog pointing at a non-resolvable location after truncate.
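Taken together with the previous comment, the commit call would plausibly look like the sketch below; it assumes metadata_name, the first element returned by generateMetadataName() above, is the catalog-facing URI, mirroring the normal write path:

// Sketch only: commit the catalog-visible metadata URI (e.g.
// s3://warehouse-rest/<table>/metadata/v<N>.metadata.json) together with
// the generated snapshot, not the internal storage path.
bool success = catalog->updateMetadata(namespace_name, table_name, metadata_name, new_snapshot);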

if (!success)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to commit Iceberg truncate update to catalog.");
Collaborator: I don't think it should be a LOGICAL_ERROR. This will crash ClickHouse
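A sketch of the alternative the reviewer is pointing at; ICEBERG_CATALOG_ERROR is an assumption here, and any user-facing (non-logical) error code would serve:

// Sketch only: a failed catalog commit is an external failure, so raise a
// regular exception rather than LOGICAL_ERROR, which aborts builds with
// assertions enabled.
if (!success)
    throw Exception(ErrorCodes::ICEBERG_CATALOG_ERROR, "Failed to commit Iceberg truncate update to catalog");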

}
}

void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands)
{
for (const auto & command : commands)
src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h
@@ -83,6 +83,9 @@ class IcebergMetadata : public IDataLakeMetadata
bool supportsUpdate() const override { return true; }
bool supportsWrites() const override { return true; }
bool supportsParallelInsert() const override { return true; }
bool supportsTruncate() const override { return true; }

void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) override;

IcebergHistory getHistory(ContextPtr local_context) const;

10 changes: 7 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -625,7 +625,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans
void StorageObjectStorage::truncate(
const ASTPtr & /* query */,
const StorageMetadataPtr & /* metadata_snapshot */,
ContextPtr /* context */,
ContextPtr context,
TableExclusiveLockHolder & /* table_holder */)
{
const auto path = configuration->getRawPath();
@@ -639,8 +639,12 @@

if (configuration->isDataLakeConfiguration())
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Truncate is not supported for data lake engine");
auto * data_lake_metadata = getExternalMetadata(context);
if (!data_lake_metadata || !data_lake_metadata->supportsTruncate())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine");

data_lake_metadata->truncate(context, catalog, getStorageID());
return;
}

if (path.hasGlobs())
@@ -0,0 +1,96 @@
#!/usr/bin/env python3

from pyiceberg.catalog import load_catalog
from helpers.config_cluster import minio_secret_key, minio_access_key
import uuid
import pyarrow as pa
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import LongType, StringType
from pyiceberg.partitioning import PartitionSpec

BASE_URL_LOCAL_RAW = "http://localhost:8182"
CATALOG_NAME = "demo"

def load_catalog_impl(started_cluster):
    return load_catalog(
        CATALOG_NAME,
        **{
            "uri": BASE_URL_LOCAL_RAW,
            "type": "rest",
            "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000",
            "s3.access-key-id": minio_access_key,
            "s3.secret-access-key": minio_secret_key,
        },
    )


def test_iceberg_truncate(started_cluster_iceberg_no_spark):
    instance = started_cluster_iceberg_no_spark.instances["node1"]
    catalog = load_catalog_impl(started_cluster_iceberg_no_spark)

    # 1. Setup PyIceberg namespace and table
    namespace = f"clickhouse_truncate_{uuid.uuid4().hex}"
    catalog.create_namespace(namespace)

    schema = Schema(
        NestedField(field_id=1, name="id", field_type=LongType(), required=False),
        NestedField(field_id=2, name="val", field_type=StringType(), required=False),
    )

    table_name = "test_truncate"
    table = catalog.create_table(
        identifier=f"{namespace}.{table_name}",
        schema=schema,
        location=f"s3://warehouse-rest/{namespace}.{table_name}",
        partition_spec=PartitionSpec(),
    )

    # 2. Populate data
    df = pa.Table.from_pylist([
        {"id": 1, "val": "A"},
        {"id": 2, "val": "B"},
        {"id": 3, "val": "C"},
    ])
    table.append(df)

    # Validate the data is in Iceberg
    assert len(table.scan().to_arrow()) == 3

    # 3. Setup ClickHouse database
    instance.query(f"DROP DATABASE IF EXISTS {namespace}")
    instance.query(
        f"""
        CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}')
        SETTINGS
            catalog_type='rest',
            warehouse='demo',
            storage_endpoint='http://minio:9000/warehouse-rest';
        """,
        settings={"allow_database_iceberg": 1},
    )

    # 4. Formulate the ClickHouse table identifier.
    # MUST wrap the inner table name in backticks so ClickHouse parses the Iceberg namespace correctly.
    ch_table_identifier = f"`{namespace}.{table_name}`"

    # Assert the data is visible from ClickHouse
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 3

    # 5. Truncate the table via ClickHouse
    instance.query(
        f"TRUNCATE TABLE {namespace}.{ch_table_identifier}",
        settings={"allow_experimental_insert_into_iceberg": 1},
    )

    # Assert the table is empty from ClickHouse
    assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0

    # 6. Cross-engine validation using PyIceberg.
    # Refresh table state to pick up the newly generated v<N>.metadata.json.
    table.refresh()

    # Assert PyIceberg reads the empty snapshot successfully
    assert len(table.scan().to_arrow()) == 0

Collaborator: Perhaps it is a good idea to insert data again and check it can be read, just to make sure we haven't broken anything?

Author: Good call. Added in the latest commit: after asserting count() == 0, the test now appends a new row via PyIceberg and verifies ClickHouse reads it back as count() == 1. This confirms the table remains fully writable after truncation and that the metadata state is consistent.

    # Cleanup
    instance.query(f"DROP DATABASE {namespace}")