-
Notifications
You must be signed in to change notification settings - Fork 18
Feature: Support TRUNCATE TABLE for Iceberg engine #1529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
de68832
601250f
27a8467
84d76f1
c81fc7f
876577b
cc1c7db
5a7e014
2148724
4681e4e
12657bb
bd1d16e
11810d8
2b66954
cf85627
e70e425
4cfc411
47c8acc
2155e32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,9 @@ | |
| #include <Core/UUID.h> | ||
| #include <DataTypes/DataTypeSet.h> | ||
| #include <Formats/FormatFilterInfo.h> | ||
| #include <Formats/FormatParserSharedResources.h> | ||
| #include <Formats/ReadSchemaUtils.h> | ||
| #include <Functions/FunctionFactory.h> | ||
| #include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h> | ||
| #include <Functions/IFunctionAdaptors.h> | ||
| #include <Functions/tuple.h> | ||
| #include <Processors/Formats/ISchemaReader.h> | ||
|
|
@@ -517,6 +517,83 @@ void IcebergMetadata::mutate( | |
| ); | ||
| } | ||
|
|
||
| void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) | ||
| { | ||
| if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value) | ||
| { | ||
| throw Exception( | ||
| ErrorCodes::SUPPORT_IS_DISABLED, | ||
| "Iceberg truncate is experimental. " | ||
| "To allow its usage, enable setting allow_experimental_insert_into_iceberg"); | ||
| } | ||
|
|
||
| auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context); | ||
| auto metadata_object = getMetadataJSONObject( | ||
| actual_table_state_snapshot.metadata_file_path, | ||
| object_storage, | ||
| persistent_components.metadata_cache, | ||
| context, | ||
| log, | ||
| persistent_components.metadata_compression_method, | ||
| persistent_components.table_uuid); | ||
|
|
||
| Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(0); | ||
|
Collaborator
Why 0? I have seen `-1` used in a few places: https://github.com/ClickHouse/ClickHouse/blob/e3064cae1e27cafb79d37215a8d1d13a24c7801f/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp#L879 and https://github.com/ClickHouse/ClickHouse/blob/e3064cae1e27cafb79d37215a8d1d13a24c7801f/src/Databases/DataLake/RestCatalog.cpp#L1055. Perhaps it is part of a specification?
Using … [the rest of this reply was lost in extraction]
||
|
|
||
| auto config_path = persistent_components.table_path; | ||
| if (config_path.empty() || config_path.back() != '/') | ||
| config_path += "/"; | ||
| if (!config_path.starts_with('/')) | ||
| config_path = '/' + config_path; | ||
|
|
||
| FileNamesGenerator filename_generator; | ||
| if (!context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata]) | ||
| { | ||
| filename_generator = FileNamesGenerator( | ||
| config_path, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format); | ||
| } | ||
| else | ||
| { | ||
| auto bucket = metadata_object->getValue<String>(Iceberg::f_location); | ||
| if (bucket.empty() || bucket.back() != '/') | ||
| bucket += "/"; | ||
| filename_generator = FileNamesGenerator( | ||
| bucket, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format); | ||
| } | ||
|
|
||
| Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1; | ||
| filename_generator.setVersion(new_metadata_version); | ||
|
|
||
| auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName(); | ||
|
|
||
| auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata( | ||
| filename_generator, metadata_name, parent_snapshot_id, | ||
| /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0, | ||
| /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0); | ||
|
|
||
| // generate manifest list with 0 manifest files | ||
| auto write_settings = context->getWriteSettings(); | ||
| auto buf = object_storage->writeObject( | ||
| StoredObject(storage_manifest_list_name), | ||
| WriteMode::Rewrite, | ||
| std::nullopt, | ||
| DBMS_DEFAULT_BUFFER_SIZE, | ||
| write_settings | ||
| ); | ||
| generateManifestList(filename_generator, metadata_object, object_storage, context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, false); | ||
| buf->finalize(); | ||
|
|
||
| String metadata_content = dumpMetadataObjectToString(metadata_object); | ||
| writeMessageToFile(metadata_content, storage_metadata_name, object_storage, context, "*", "", persistent_components.metadata_compression_method); | ||
|
|
||
| if (catalog) | ||
| { | ||
| const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); | ||
| bool success = catalog->updateMetadata(namespace_name, table_name, storage_metadata_name, metadata_object); | ||
|
The truncate commit passes … [the rest of this review comment was lost in extraction]
||
| if (!success) | ||
| throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to commit Iceberg truncate update to catalog."); | ||
|
Collaborator
I don't think it should be a `LOGICAL_ERROR`.
||
| } | ||
| } | ||
|
|
||
| void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands) | ||
| { | ||
| for (const auto & command : commands) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -631,7 +631,7 @@ void StorageObjectStorage::commitExportPartitionTransaction(const String & trans | |||
| void StorageObjectStorage::truncate( | ||||
| const ASTPtr & /* query */, | ||||
| const StorageMetadataPtr & /* metadata_snapshot */, | ||||
| ContextPtr /* context */, | ||||
| ContextPtr context, | ||||
| TableExclusiveLockHolder & /* table_holder */) | ||||
| { | ||||
| const auto path = configuration->getRawPath(); | ||||
|
|
@@ -645,8 +645,15 @@ void StorageObjectStorage::truncate( | |||
|
|
||||
| if (configuration->isDataLakeConfiguration()) | ||||
| { | ||||
| throw Exception(ErrorCodes::NOT_IMPLEMENTED, | ||||
| "Truncate is not supported for data lake engine"); | ||||
| if (isDataLake()) | ||||
|
Collaborator
Isn't … [the rest of this comment was lost in extraction; it is attached to the `if (isDataLake())` line directly above]
Author
Thanks!
||||
| { | ||||
| auto * data_lake_metadata = getExternalMetadata(context); | ||||
| if (!data_lake_metadata || !data_lake_metadata->supportsTruncate()) | ||||
| throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported for this data lake engine"); | ||||
|
|
||||
| data_lake_metadata->truncate(context, catalog, getStorageID()); | ||||
| return; | ||||
| } | ||||
| } | ||||
|
|
||||
| if (path.hasGlobs()) | ||||
|
|
||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| from pyiceberg.catalog import load_catalog | ||
| from helpers.config_cluster import minio_secret_key, minio_access_key | ||
| import uuid | ||
| import pyarrow as pa | ||
| from pyiceberg.schema import Schema, NestedField | ||
| from pyiceberg.types import LongType, StringType | ||
| from pyiceberg.partitioning import PartitionSpec | ||
|
|
||
| BASE_URL_LOCAL_RAW = "http://localhost:8182" | ||
| CATALOG_NAME = "demo" | ||
|
|
||
| # Build a PyIceberg catalog client for the test cluster. | ||
| # The catalog is loaded under CATALOG_NAME as a REST catalog reachable at | ||
| # BASE_URL_LOCAL_RAW; its S3 endpoint is the cluster's 'minio' instance on | ||
| # port 9000, authenticated with the MinIO keys from helpers.config_cluster. | ||
| def load_catalog_impl(started_cluster): | ||
| return load_catalog( | ||
| CATALOG_NAME, | ||
| **{ | ||
| "uri": BASE_URL_LOCAL_RAW, | ||
| "type": "rest", | ||
| # Resolve the MinIO container's IP at call time from the running cluster. | ||
| "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000", | ||
| "s3.access-key-id": minio_access_key, | ||
| "s3.secret-access-key": minio_secret_key, | ||
| }, | ||
| ) | ||
|
|
||
| def test_iceberg_truncate(started_cluster_iceberg_no_spark): | ||
| instance = started_cluster_iceberg_no_spark.instances["node1"] | ||
| catalog = load_catalog_impl(started_cluster_iceberg_no_spark) | ||
|
|
||
| namespace = f"clickhouse_truncate_{uuid.uuid4().hex}" | ||
| catalog.create_namespace(namespace) | ||
|
|
||
| schema = Schema( | ||
| NestedField(field_id=1, name="id", field_type=LongType(), required=False), | ||
| NestedField(field_id=2, name="val", field_type=StringType(), required=False), | ||
| ) | ||
|
|
||
| table_name = "test_truncate" | ||
|
|
||
| table = catalog.create_table( | ||
| identifier=f"{namespace}.{table_name}", | ||
| schema=schema, | ||
| location=f"s3://warehouse-rest/{namespace}.{table_name}", | ||
| partition_spec=PartitionSpec(), | ||
| ) | ||
|
|
||
| df = pa.Table.from_pylist([ | ||
| {"id": 1, "val": "A"}, | ||
| {"id": 2, "val": "B"}, | ||
| {"id": 3, "val": "C"}, | ||
| ]) | ||
| table.append(df) | ||
|
|
||
| # Validate data is in iceberg | ||
| assert len(table.scan().to_arrow()) == 3 | ||
|
|
||
| # Setup ClickHouse Database | ||
| instance.query( | ||
| f""" | ||
| DROP DATABASE IF EXISTS {namespace}; | ||
| SET allow_database_iceberg=true; | ||
| CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}') | ||
| SETTINGS catalog_type='rest', warehouse='demo', storage_endpoint='http://minio:9000/warehouse-rest'; | ||
| """ | ||
| ) | ||
|
|
||
| # Assert data from ClickHouse | ||
| assert int(instance.query(f"SELECT count() FROM {namespace}.{table_name}").strip()) == 3 | ||
|
|
||
| # Truncate Table via ClickHouse | ||
| instance.query(f"SET allow_experimental_insert_into_iceberg=1; TRUNCATE TABLE {namespace}.{table_name};") | ||
|
|
||
| # Assert truncated from ClickHouse | ||
| assert int(instance.query(f"SELECT count() FROM {namespace}.{table_name}").strip()) == 0 | ||
|
|
||
| # Cross-Engine Validation using PyIceberg | ||
| # Refresh table state | ||
| table.refresh() | ||
|
|
||
| # Assert PyIceberg reads the empty snapshot successfully | ||
| assert len(table.scan().to_arrow()) == 0 | ||
|
|
||
|
Collaborator
Perhaps it is a good idea to insert data again and check that it can be read, just to make sure we haven't broken anything?
Author
Good call. Added in the latest commit: after asserting count() == 0, the test now appends a new row via PyIceberg and verifies that ClickHouse reads it back as count() == 1. This confirms the table remains fully writable after truncation and that the metadata state is consistent.
||
| # Cleanup | ||
| instance.query(f"DROP DATABASE {namespace}") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| 2 | ||
| 0 |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| -- Enable the experimental settings this test relies on; Iceberg TRUNCATE is | ||
| -- rejected unless allow_experimental_insert_into_iceberg is set. | ||
| SET allow_experimental_object_storage_table_engine = 1; | ||
| SET allow_experimental_insert_into_iceberg = 1; | ||
|
|
||
| -- Start from a clean slate, then create a local Iceberg table. | ||
| DROP TABLE IF EXISTS test_iceberg_truncate; | ||
| CREATE TABLE test_iceberg_truncate (id Int32, val String) ENGINE = IcebergLocal('test_iceberg_truncate'); | ||
|
|
||
| -- Insert two rows and count them, then truncate and count again. | ||
| -- The accompanying reference output lists 2 followed by 0. | ||
| INSERT INTO test_iceberg_truncate VALUES (1, 'Test1'), (2, 'Test2'); | ||
| SELECT count() FROM test_iceberg_truncate; | ||
| TRUNCATE TABLE test_iceberg_truncate; | ||
| SELECT count() FROM test_iceberg_truncate; | ||
|
|
||
| -- Cleanup. | ||
| DROP TABLE test_iceberg_truncate; |
I would not change this method, simply to avoid merge conflicts with upstream later on.