-
Notifications
You must be signed in to change notification settings - Fork 18
Feature: Support TRUNCATE TABLE for Iceberg engine #1529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
de68832
601250f
27a8467
84d76f1
c81fc7f
876577b
cc1c7db
5a7e014
2148724
4681e4e
12657bb
bd1d16e
11810d8
2b66954
cf85627
e70e425
4cfc411
47c8acc
2155e32
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,9 @@ | |
| #include <Core/UUID.h> | ||
| #include <DataTypes/DataTypeSet.h> | ||
| #include <Formats/FormatFilterInfo.h> | ||
| #include <Formats/FormatParserSharedResources.h> | ||
| #include <Formats/ReadSchemaUtils.h> | ||
| #include <Functions/FunctionFactory.h> | ||
| #include <Storages/ObjectStorage/DataLakes/Iceberg/MetadataGenerator.h> | ||
| #include <Functions/IFunctionAdaptors.h> | ||
| #include <Functions/tuple.h> | ||
| #include <Processors/Formats/ISchemaReader.h> | ||
|
|
@@ -531,6 +531,86 @@ void IcebergMetadata::mutate( | |
| ); | ||
| } | ||
|
|
||
| void IcebergMetadata::truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) | ||
| { | ||
| if (!context->getSettingsRef()[Setting::allow_experimental_insert_into_iceberg].value) | ||
| { | ||
| throw Exception( | ||
| ErrorCodes::SUPPORT_IS_DISABLED, | ||
| "Iceberg truncate is experimental. " | ||
| "To allow its usage, enable setting allow_experimental_insert_into_iceberg"); | ||
| } | ||
|
|
||
| auto [actual_data_snapshot, actual_table_state_snapshot] = getRelevantState(context); | ||
| auto metadata_object = getMetadataJSONObject( | ||
| actual_table_state_snapshot.metadata_file_path, | ||
| object_storage, | ||
| persistent_components.metadata_cache, | ||
| context, | ||
| log, | ||
| persistent_components.metadata_compression_method, | ||
| persistent_components.table_uuid); | ||
|
|
||
| Int64 parent_snapshot_id = actual_table_state_snapshot.snapshot_id.value_or(0); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Using Useful? React with 👍 / 👎. |
||
| auto config_path = persistent_components.table_path; | ||
| if (config_path.empty() || config_path.back() != '/') | ||
| config_path += "/"; | ||
| if (!config_path.starts_with('/')) | ||
| config_path = '/' + config_path; | ||
|
|
||
| FileNamesGenerator filename_generator; | ||
| if (!context->getSettingsRef()[Setting::write_full_path_in_iceberg_metadata]) | ||
| { | ||
| filename_generator = FileNamesGenerator( | ||
| config_path, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format); | ||
| } | ||
| else | ||
| { | ||
| auto bucket = metadata_object->getValue<String>(Iceberg::f_location); | ||
| if (bucket.empty() || bucket.back() != '/') | ||
| bucket += "/"; | ||
| filename_generator = FileNamesGenerator( | ||
| bucket, config_path, (catalog != nullptr && catalog->isTransactional()), persistent_components.metadata_compression_method, write_format); | ||
| } | ||
|
|
||
| Int32 new_metadata_version = actual_table_state_snapshot.metadata_version + 1; | ||
| filename_generator.setVersion(new_metadata_version); | ||
| auto [metadata_name, storage_metadata_name] = filename_generator.generateMetadataName(); | ||
|
|
||
| auto [new_snapshot, manifest_list_name, storage_manifest_list_name] = MetadataGenerator(metadata_object).generateNextMetadata( | ||
| filename_generator, metadata_name, parent_snapshot_id, | ||
| /* added_files */ 0, /* added_records */ 0, /* added_files_size */ 0, | ||
| /* num_partitions */ 0, /* added_delete_files */ 0, /* num_deleted_rows */ 0); | ||
|
|
||
| // generate manifest list with 0 manifest files | ||
| int format_version = 1; | ||
| if (metadata_object->has("format-version")) | ||
| format_version = metadata_object->getValue<int>("format-version"); | ||
| bool is_v2 = (format_version == 2); | ||
|
|
||
| // generate manifest list with 0 manifest files | ||
| auto write_settings = context->getWriteSettings(); | ||
| auto buf = object_storage->writeObject( | ||
| StoredObject(storage_manifest_list_name), | ||
| WriteMode::Rewrite, | ||
| std::nullopt, | ||
| DBMS_DEFAULT_BUFFER_SIZE, | ||
| write_settings | ||
| ); | ||
| generateManifestList(filename_generator, metadata_object, object_storage, context, {}, new_snapshot, 0, *buf, Iceberg::FileContentType::DATA, is_v2); | ||
| buf->finalize(); | ||
|
|
||
| String metadata_content = dumpMetadataObjectToString(metadata_object); | ||
| writeMessageToFile(metadata_content, storage_metadata_name, object_storage, context, "*", "", persistent_components.metadata_compression_method); | ||
| if (catalog) | ||
| { | ||
| const auto & [namespace_name, table_name] = DataLake::parseTableName(storage_id.getTableName()); | ||
| bool success = catalog->updateMetadata(namespace_name, table_name, storage_metadata_name, metadata_object); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
The truncate commit passes Useful? React with 👍 / 👎. |
||
| if (!success) | ||
| throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to commit Iceberg truncate update to catalog."); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't think it should be a `LOGICAL_ERROR`. |
||
| } | ||
| } | ||
|
|
||
| void IcebergMetadata::checkMutationIsPossible(const MutationCommands & commands) | ||
| { | ||
| for (const auto & command : commands) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| #!/usr/bin/env python3 | ||
|
|
||
| from pyiceberg.catalog import load_catalog | ||
| from helpers.config_cluster import minio_secret_key, minio_access_key | ||
| import uuid | ||
| import pyarrow as pa | ||
| from pyiceberg.schema import Schema, NestedField | ||
| from pyiceberg.types import LongType, StringType | ||
| from pyiceberg.partitioning import PartitionSpec | ||
|
|
||
| BASE_URL_LOCAL_RAW = "http://localhost:8182" | ||
| CATALOG_NAME = "demo" | ||
|
|
||
| def load_catalog_impl(started_cluster): | ||
| return load_catalog( | ||
| CATALOG_NAME, | ||
| **{ | ||
| "uri": BASE_URL_LOCAL_RAW, | ||
| "type": "rest", | ||
| "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000", | ||
| "s3.access-key-id": minio_access_key, | ||
| "s3.secret-access-key": minio_secret_key, | ||
| }, | ||
| ) | ||
|
|
||
|
|
||
| def test_iceberg_truncate(started_cluster_iceberg_no_spark): | ||
| instance = started_cluster_iceberg_no_spark.instances["node1"] | ||
| catalog = load_catalog_impl(started_cluster_iceberg_no_spark) | ||
|
|
||
| # 1. Setup PyIceberg Namespace and Table | ||
| namespace = f"clickhouse_truncate_{uuid.uuid4().hex}" | ||
| catalog.create_namespace(namespace) | ||
|
|
||
| schema = Schema( | ||
| NestedField(field_id=1, name="id", field_type=LongType(), required=False), | ||
| NestedField(field_id=2, name="val", field_type=StringType(), required=False), | ||
| ) | ||
|
|
||
| table_name = "test_truncate" | ||
| table = catalog.create_table( | ||
| identifier=f"{namespace}.{table_name}", | ||
| schema=schema, | ||
| location=f"s3://warehouse-rest/{namespace}.{table_name}", | ||
| partition_spec=PartitionSpec(), | ||
| ) | ||
|
|
||
| # 2. Populate Data | ||
| df = pa.Table.from_pylist([ | ||
| {"id": 1, "val": "A"}, | ||
| {"id": 2, "val": "B"}, | ||
| {"id": 3, "val": "C"}, | ||
| ]) | ||
| table.append(df) | ||
|
|
||
| # Validate data is in iceberg | ||
| assert len(table.scan().to_arrow()) == 3 | ||
|
|
||
| # 3. Setup ClickHouse Database | ||
| instance.query(f"DROP DATABASE IF EXISTS {namespace}") | ||
| instance.query( | ||
| f""" | ||
| CREATE DATABASE {namespace} ENGINE = DataLakeCatalog('http://rest:8181/v1', 'minio', '{minio_secret_key}') | ||
| SETTINGS | ||
| catalog_type='rest', | ||
| warehouse='demo', | ||
| storage_endpoint='http://minio:9000/warehouse-rest'; | ||
| """, | ||
| settings={"allow_database_iceberg": 1} | ||
| ) | ||
|
|
||
| # 4. Formulate the ClickHouse Table Identifier | ||
| # MUST wrap the inner table name in backticks so ClickHouse parses the Iceberg namespace correctly | ||
| ch_table_identifier = f"`{namespace}.{table_name}`" | ||
|
|
||
| # Assert data from ClickHouse | ||
| assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 3 | ||
|
|
||
| # 5. Truncate Table via ClickHouse | ||
| instance.query( | ||
| f"TRUNCATE TABLE {namespace}.{ch_table_identifier}", | ||
| settings={"allow_experimental_insert_into_iceberg": 1} | ||
| ) | ||
|
|
||
| # Assert truncated from ClickHouse | ||
| assert int(instance.query(f"SELECT count() FROM {namespace}.{ch_table_identifier}").strip()) == 0 | ||
|
|
||
| # 6. Cross-Engine Validation using PyIceberg | ||
| # Refresh table state to grab the new v<N>.metadata.json you generated | ||
| table.refresh() | ||
|
|
||
| # Assert PyIceberg reads the empty snapshot successfully | ||
| assert len(table.scan().to_arrow()) == 0 | ||
|
|
||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Perhaps it is a good idea to insert data again and check that it can be read, just to make sure we haven't broken anything?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Good call. Added in the latest commit — after asserting count() == 0, the test now appends a new row via PyIceberg and verifies ClickHouse reads it back as count() == 1. This confirms the table remains fully writable after truncation and that the metadata state is consistent. |
||
| # Cleanup | ||
| instance.query(f"DROP DATABASE {namespace}") | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why 0? I have seen "-1" in a few places https://github.com/ClickHouse/ClickHouse/blob/e3064cae1e27cafb79d37215a8d1d13a24c7801f/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp#L879 and https://github.com/ClickHouse/ClickHouse/blob/e3064cae1e27cafb79d37215a8d1d13a24c7801f/src/Databases/DataLake/RestCatalog.cpp#L1055.
Perhaps it is part of the specification?