diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp index ea95bb67bc39..bfe2f09dd049 100644 --- a/src/Databases/DataLake/GlueCatalog.cpp +++ b/src/Databases/DataLake/GlueCatalog.cpp @@ -685,7 +685,7 @@ void GlueCatalog::createTable(const String & namespace_name, const String & tabl throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Can not create metadata in glue catalog: {}", response.GetError().GetMessage()); } -bool GlueCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*new_snapshot*/) const +CommitOutcome GlueCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr /*new_snapshot*/) const { Aws::Glue::Model::UpdateTableRequest request; request.SetDatabaseName(namespace_name); @@ -725,7 +725,7 @@ bool GlueCatalog::updateMetadata(const String & namespace_name, const String & t if (!response.IsSuccess()) throw DB::Exception(DB::ErrorCodes::DATALAKE_DATABASE_ERROR, "Can not update metadata in glue catalog {}", response.GetError().GetMessage()); - return true; + return CommitOutcome::Committed; } void GlueCatalog::dropTable(const String & namespace_name, const String & table_name) const diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h index 21674c25edc7..6a8c4106ba9e 100644 --- a/src/Databases/DataLake/GlueCatalog.h +++ b/src/Databases/DataLake/GlueCatalog.h @@ -68,7 +68,7 @@ class GlueCatalog final : public ICatalog, private DB::WithContext void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const override; - bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override; + CommitOutcome updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override; void dropTable(const String & namespace_name, const String & table_name) const override; /// Returns a callback that re-vends fresh AWS credentials from the configured diff --git a/src/Databases/DataLake/ICatalog.cpp b/src/Databases/DataLake/ICatalog.cpp index a70038741bdc..c192388148b5 100644 --- a/src/Databases/DataLake/ICatalog.cpp +++ b/src/Databases/DataLake/ICatalog.cpp @@ -361,7 +361,7 @@ void ICatalog::createTable(const String & /*namespace_name*/, const String & /*t throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "createTable is not implemented"); } -bool ICatalog::updateMetadata(const String & /*namespace_name*/, const String & /*table_name*/, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr /*new_snapshot*/) const +CommitOutcome ICatalog::updateMetadata(const String & /*namespace_name*/, const String & /*table_name*/, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr /*new_snapshot*/) const { throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "updateMetadata is not implemented"); } diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h index cbd700fbabcc..6b8b73ed1aea 100644 --- a/src/Databases/DataLake/ICatalog.h +++ b/src/Databases/DataLake/ICatalog.h @@ -155,6 +155,19 @@ struct CatalogSettings DB::SettingsChanges allChanged() const; }; +enum class CommitOutcome +{ + /// The commit is confirmed live in the catalog. The caller must NOT delete any files. + Committed, + /// The commit was cleanly rejected and never became visible. The files we wrote are + /// orphans and the caller may safely delete them. + RejectedCleanly, + /// Could not determine whether the commit landed (e.g. the post-failure re-read + /// also failed, or the catalog response lacked snapshot state). The caller must + /// preserve all files: a recoverable leak is preferable to unrecoverable corruption. + Unknown, +}; + /// Base class for catalog implementation. /// Used for communication with the catalog. class ICatalog @@ -203,8 +216,9 @@ class ICatalog /// Creates new table in catalog. virtual void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const; - /// Updates metadata in catalog. - virtual bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const; + /// Returns a `CommitOutcome` describing whether the commit became live, was cleanly + /// rejected, or is of unknown status. Callers must only delete written files on`RejectedCleanly`. + virtual CommitOutcome updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const; /// Drop table from catalog. virtual void dropTable(const String & namespace_name, const String & table_name) const; diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index e030c873a152..89a623425183 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -2,9 +2,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -41,6 +43,7 @@ #include #include #include +#include #include #include #include @@ -1170,7 +1173,7 @@ void RestCatalog::createTable(const String & namespace_name, const String & tabl } -bool RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const +CommitOutcome RestCatalog::updateMetadata(const String & namespace_name, const String & table_name, const String & /*new_metadata_path*/, Poco::JSON::Object::Ptr new_snapshot) const { const std::string endpoint = base_url / config.prefix / "namespaces" / namespace_name / "tables" / table_name; @@ -1232,9 +1235,135 @@ bool RestCatalog::updateMetadata(const String & namespace_name, const String & t } catch (const DB::HTTPException &) { - return false; + return classifyCommitOutcomeAfterFailure(namespace_name, table_name, new_snapshot); } - return true; + catch (const DB::NetException &) + { + return classifyCommitOutcomeAfterFailure(namespace_name, table_name, new_snapshot); + } + catch (const Poco::Net::NetException &) + { + return classifyCommitOutcomeAfterFailure(namespace_name, table_name, new_snapshot); + } + return CommitOutcome::Committed; +} + +Poco::JSON::Object::Ptr RestCatalog::getRawTableMetadataObject( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr /*context_*/) const +{ + const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encodeNamespaceForURI(namespace_name) / "tables" / table_name; + + String json_str; + { + /// Always a fresh network GET: this read path consults no metadata cache, so it + /// reflects the catalog's current server-side state (required to classify a commit). + auto buf = createReadBuffer(config.prefix / endpoint); + if (buf->eof()) + return nullptr; + readJSONObjectPossiblyInvalid(json_str, *buf); + } + + Poco::JSON::Parser parser; + Poco::Dynamic::Var json = parser.parse(json_str); + const Poco::JSON::Object::Ptr & object = json.extract(); + if (!object || !object->has("metadata")) + return nullptr; + return object->get("metadata").extract(); +} + +CommitOutcome RestCatalog::classifyCommitOutcomeAfterFailure( + const std::string & namespace_name, + const std::string & table_name, + Poco::JSON::Object::Ptr new_snapshot) const +{ + /// Snapshot-expiration commits pass no snapshot (nullptr); there is no id to confirm, + /// so we cannot prove the commit landed or was rejected -> preserve files. + if (!new_snapshot || !new_snapshot->has(DB::Iceberg::f_metadata_snapshot_id)) + return CommitOutcome::Unknown; + + const Int64 our_snapshot_id = new_snapshot->getValue(DB::Iceberg::f_metadata_snapshot_id); + + Poco::JSON::Object::Ptr metadata_object; + try + { + metadata_object = getRawTableMetadataObject(namespace_name, table_name, getContext()); + } + catch (...) + { + LOG_WARNING( + log, + "Commit response lost and post-failure re-read of table {}.{} also failed; " + "classifying snapshot {} as Unknown to preserve files: {}", + namespace_name, table_name, our_snapshot_id, DB::getCurrentExceptionMessage(false)); + return CommitOutcome::Unknown; + } + + if (!metadata_object) + { + LOG_WARNING( + log, + "Post-failure re-read of table {}.{} returned no metadata; " + "classifying snapshot {} as Unknown", + namespace_name, table_name, our_snapshot_id); + return CommitOutcome::Unknown; + } + + /// Our snapshot is the current ref target: the commit landed and only its response was lost. + bool had_current_snapshot_id = metadata_object->has(DB::Iceberg::f_current_snapshot_id) + && !metadata_object->get(DB::Iceberg::f_current_snapshot_id).isEmpty(); + if (had_current_snapshot_id) + { + const Int64 current_snapshot_id = metadata_object->getValue(DB::Iceberg::f_current_snapshot_id); + if (current_snapshot_id == our_snapshot_id) + { + LOG_DEBUG( + log, "Commit confirmed: snapshot {} is current in catalog for {}.{}", + our_snapshot_id, namespace_name, table_name); + return CommitOutcome::Committed; + } + } + + /// Our snapshot is in the history but no longer current: we landed, then a concurrent + /// writer superseded us. We test membership rather than equality with current, because + /// otherwise a later concurrent commit would make our real commit look rejected. + bool had_snapshots_array = false; + if (auto snapshots = metadata_object->getArray(DB::Iceberg::f_snapshots)) + { + had_snapshots_array = true; + for (UInt32 i = 0; i < snapshots->size(); ++i) + { + auto snapshot = snapshots->getObject(i); + if (snapshot + && snapshot->has(DB::Iceberg::f_metadata_snapshot_id) + && snapshot->getValue(DB::Iceberg::f_metadata_snapshot_id) == our_snapshot_id) + { + LOG_DEBUG( + log, + "Commit confirmed: snapshot {} present in catalog history for {}.{} " + "(superseded by a later snapshot)", + our_snapshot_id, namespace_name, table_name); + return CommitOutcome::Committed; + } + } + } + + if (had_current_snapshot_id || had_snapshots_array) + { + LOG_DEBUG( + log, "Commit cleanly rejected: snapshot {} absent from catalog state for {}.{}", + our_snapshot_id, namespace_name, table_name); + return CommitOutcome::RejectedCleanly; + } + + /// The response carried no snapshot state at all, so we cannot prove anything: preserve files. + LOG_WARNING( + log, + "Post-failure re-read of table {}.{} carried no snapshot state; " + "classifying snapshot {} as Unknown", + namespace_name, table_name, our_snapshot_id); + return CommitOutcome::Unknown; } void RestCatalog::dropTable(const String & namespace_name, const String & table_name) const diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 9184e93e4530..59a2d8cd5aea 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -87,7 +87,7 @@ class RestCatalog : public ICatalog, public DB::WithContext void createTable(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr metadata_content) const override; - bool updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override; + CommitOutcome updateMetadata(const String & namespace_name, const String & table_name, const String & new_metadata_path, Poco::JSON::Object::Ptr new_snapshot) const override; bool isTransactional() const override { return true; } @@ -204,6 +204,20 @@ class RestCatalog : public ICatalog, public DB::WithContext DB::ContextPtr context_, TableMetadata & result) const; + Poco::JSON::Object::Ptr getRawTableMetadataObject( + const std::string & namespace_name, + const std::string & table_name, + DB::ContextPtr context_) const; + + /// After a failed commit POST, re-reads the catalog and classifies whether our snapshot + /// nonetheless became live (lost-response case) or was cleanly rejected. See the four + /// classification arms in the implementation. Never returns `RejectedCleanly` unless the + /// re-read succeeded and our snapshot id is provably absent from populated snapshot state. + CommitOutcome classifyCommitOutcomeAfterFailure( + const std::string & namespace_name, + const std::string & table_name, + Poco::JSON::Object::Ptr new_snapshot) const; + Config loadConfig(); virtual DB::HTTPHeaderEntries getAuthHeaders( bool update_token, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp index 9819a8783516..912c2c90f94c 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp @@ -719,9 +719,12 @@ void IcebergMetadata::truncate(ContextPtr context, std::shared_ptrupdateMetadata(namespace_name, table_name, path_resolver.resolveForCatalog(metadata_info.path), new_snapshot)) + /// Truncate performs no destructive file cleanup here, so anything other than a + /// confirmed commit is surfaced as a failure (preserving files). + const auto outcome = catalog->updateMetadata(namespace_name, table_name, path_resolver.resolveForCatalog(metadata_info.path), new_snapshot); + if (outcome != DataLake::CommitOutcome::Committed) throw Exception(ErrorCodes::INCORRECT_DATA, - "Failed to commit Iceberg truncate update to catalog."); + "Failed to commit Iceberg truncate update to catalog (commit was not confirmed)."); } } @@ -1811,11 +1814,21 @@ bool IcebergMetadata::commitImportPartitionTransactionImpl( auto catalog_filename = resolver.resolveForCatalog(metadata_info.path); const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName()); - if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) + const auto outcome = catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot); + if (outcome == DataLake::CommitOutcome::RejectedCleanly) { cleanup(true); return false; } + if (outcome == DataLake::CommitOutcome::Unknown) + { + LOG_ERROR( + log, + "Iceberg commit for {}.{} is of unknown status after a lost response; " + "preserving written files to avoid corrupting a possibly-committed snapshot.", + namespace_name, table_name); + return false; + } /// Catalog has accepted the commit - the new snapshot is now live and references /// storage_manifest_entry_name / storage_manifest_list_name. From here on, any diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp index 026e19dfcd04..8f8c128b2900 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp @@ -1300,6 +1300,11 @@ bool IcebergStorageSink::initializeMetadata() } }; + /// Becomes true once the catalog has confirmed the commit (the snapshot is live and + /// references the manifest entry / manifest list we wrote). From that point any failure + /// must NOT delete those files, otherwise the live snapshot is corrupted. + bool published = false; + try { for (const auto & [partition_key, writer] : writer_per_partition_key) @@ -1392,11 +1397,24 @@ bool IcebergStorageSink::initializeMetadata() auto catalog_filename = resolver.resolveForCatalog(metadata_info.path); const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName()); - if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) + const auto outcome = catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot); + if (outcome == DataLake::CommitOutcome::RejectedCleanly) { cleanup(true); return false; } + if (outcome == DataLake::CommitOutcome::Unknown) + { + LOG_ERROR( + log, + "Iceberg commit for {}.{} is of unknown status after a lost response; " + "preserving written files to avoid corrupting a possibly-committed snapshot. " + "Orphaned files may need manual cleanup if the commit did not actually land.", + namespace_name, table_name); + return false; + } + /// Committed: the snapshot is now live. + published = true; } } @@ -1412,6 +1430,17 @@ bool IcebergStorageSink::initializeMetadata() } catch (...) { + if (published) + { + /// The commit is already live in the catalog. A failure in trailing post-publish + /// work (e.g. metadata-cache invalidation) must NOT delete the manifest files the + /// live snapshot references. + tryLogCurrentException( + log, + "Post-publish work failed after Iceberg snapshot was committed; " + "skipping cleanup to preserve the published snapshot"); + return true; + } cleanup(false); throw; } diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp index 10d75fd4f3e8..397679f61e48 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Mutations.cpp @@ -545,11 +545,21 @@ static bool writeMetadataFiles( { auto catalog_filename = path_resolver.resolveForCatalog(metadata_info.path); const auto & [namespace_name, table_name] = DataLake::parseTableName(table_id.getTableName()); - if (!catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot)) + const auto outcome = catalog->updateMetadata(namespace_name, table_name, catalog_filename, new_snapshot); + if (outcome == DataLake::CommitOutcome::RejectedCleanly) { cleanup(); return false; } + if (outcome == DataLake::CommitOutcome::Unknown) + { + LOG_ERROR( + getLogger("IcebergMutations"), + "Iceberg mutation commit for {}.{} is of unknown status after a lost response; " + "preserving written files to avoid corrupting a possibly-committed snapshot.", + namespace_name, table_name); + return false; + } } } } @@ -1417,7 +1427,8 @@ ExpireSnapshotsResult expireSnapshots( { auto catalog_filename = persistent_table_components.path_resolver.resolveForCatalog(metadata_info.path); const auto & [namespace_name, parsed_table_name] = DataLake::parseTableName(table_name); - if (!catalog->updateMetadata(namespace_name, parsed_table_name, catalog_filename, nullptr)) + const auto outcome = catalog->updateMetadata(namespace_name, parsed_table_name, catalog_filename, nullptr); + if (outcome != DataLake::CommitOutcome::Committed) { throw Exception( ErrorCodes::LOGICAL_ERROR, diff --git a/tests/integration/test_storage_iceberg_no_spark/catalog_fault_proxy.py b/tests/integration/test_storage_iceberg_no_spark/catalog_fault_proxy.py new file mode 100644 index 000000000000..c05943e4b83c --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/catalog_fault_proxy.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +"""Transparent reverse proxy for an Iceberg REST catalog with controllable fault injection. + +Forwards every request unchanged by default. A test arms a one-shot fault via the +/__fault control API; once armed, the proxy forwards the matching commit POST upstream +(so the catalog commits), consumes the upstream response, then RSTs the client connection +without returning that response. ClickHouse then retries the identical POST into a now +stale-assert state, reproducing the "commit succeeded, response lost" window. + +Request bodies (including Transfer-Encoding: chunked) are fully decoded before being +forwarded, and the upstream status plus forwarded body length of every /tables/ POST are +recorded and exposed via /__fault/status for the test to assert on. +""" + +import http.client +import json +import os +import socket +import struct +import sys +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +UPSTREAM = os.environ.get("UPSTREAM", "http://rest:8181") +LISTEN = os.environ.get("LISTEN", "0.0.0.0:8181") + +_HOP_BY_HOP = { + "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", + "te", "trailers", "transfer-encoding", "upgrade", +} + + +def _parse_upstream(u): + assert u.startswith("http://"), "only http upstream supported" + hostport = u[len("http://"):].rstrip("/") + if ":" in hostport: + host, port = hostport.split(":", 1) + return host, int(port) + return hostport, 80 + + +UP_HOST, UP_PORT = _parse_upstream(UPSTREAM) + + +class _Fault: + def __init__(self): + self.lock = threading.Lock() + self.armed = None + self.budget = 0 + self.seen = 0 + self.faulted = 0 + self.fault_upstream_status = None + self.commit_posts = [] + + def arm(self, rule): + with self.lock: + self.armed = rule + self.budget = int(rule.get("count", 1)) + self.seen = 0 + self.faulted = 0 + self.fault_upstream_status = None + self.commit_posts = [] + + def disarm(self): + with self.lock: + self.armed = None + self.budget = 0 + + def should_fault(self, method, path): + with self.lock: + if not self.armed or self.budget <= 0: + return False + if self.armed.get("method", "POST").upper() != method.upper(): + return False + if self.armed.get("match_path_substr", "/tables/") not in path: + return False + self.seen += 1 + self.budget -= 1 + self.faulted += 1 + return True + + def record_commit(self, status, faulted, body_len): + with self.lock: + self.commit_posts.append( + {"status": status, "faulted": faulted, "body_len": body_len}) + if faulted: + self.fault_upstream_status = status + + def status(self): + with self.lock: + return { + "armed": self.armed, "budget": self.budget, + "seen": self.seen, "faulted": self.faulted, + "fault_upstream_status": self.fault_upstream_status, + "commit_posts": list(self.commit_posts), + } + + +FAULT = _Fault() + + +class Handler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def _read_body(self): + te = (self.headers.get("Transfer-Encoding") or "").lower() + if "chunked" in te: + return self._read_chunked() + n = int(self.headers.get("Content-Length", 0) or 0) + return self.rfile.read(n) if n else b"" + + def _read_chunked(self): + chunks = [] + while True: + line = self.rfile.readline() + if not line: + break + line = line.strip() + if not line: + continue + try: + size = int(line.split(b";", 1)[0], 16) + except ValueError: + break + if size == 0: + while True: + t = self.rfile.readline() + if t in (b"\r\n", b"\n", b""): + break + break + chunks.append(self.rfile.read(size)) + self.rfile.read(2) + return b"".join(chunks) + + def _json(self, status, obj): + body = json.dumps(obj).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _control(self): + if self.command == "POST" and self.path.startswith("/__fault/arm"): + rule = json.loads(self._read_body() or b"{}") + FAULT.arm(rule) + self.log_message("ARM %s", rule) + return self._json(200, {"ok": True, "armed": rule}) + if self.command == "POST" and self.path.startswith("/__fault/disarm"): + self._read_body() + FAULT.disarm() + return self._json(200, {"ok": True}) + if self.command == "GET" and self.path.startswith("/__fault/status"): + return self._json(200, FAULT.status()) + return self._json(404, {"error": "unknown control path"}) + + def _upstream_headers(self): + h = {} + for k, v in self.headers.items(): + kl = k.lower() + if kl in _HOP_BY_HOP or kl in ("host", "content-length"): + continue + h[k] = v + h["Host"] = f"{UP_HOST}:{UP_PORT}" + return h + + def _forward(self, body): + conn = http.client.HTTPConnection(UP_HOST, UP_PORT, timeout=60) + conn.request(self.command, self.path, body=body, headers=self._upstream_headers()) + resp = conn.getresponse() + data = resp.read() + headers = [(k, v) for (k, v) in resp.getheaders() if k.lower() not in _HOP_BY_HOP] + status = resp.status + conn.close() + return status, headers, data + + def _reset_connection(self): + try: + self.connection.setsockopt( + socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)) + except OSError: + pass + self.close_connection = True + try: + self.connection.close() + except OSError: + pass + + def _relay(self, status, headers, data): + self.send_response(status) + sent_len = False + for k, v in headers: + if k.lower() == "content-length": + sent_len = True + self.send_header(k, v) + if not sent_len: + self.send_header("Content-Length", str(len(data))) + self.end_headers() + if self.command != "HEAD": + self.wfile.write(data) + + def _handle(self): + if self.path.startswith("/__fault"): + return self._control() + body = self._read_body() + fault = FAULT.should_fault(self.command, self.path) + try: + status, headers, data = self._forward(body) + except Exception as e: + self.log_message("upstream forward FAILED for %s %s: %s", self.command, self.path, e) + return self._json(502, {"error": "upstream forward failed"}) + + is_commit = self.command == "POST" and "/tables/" in self.path + if is_commit: + FAULT.record_commit(status, fault, len(body)) + self.log_message("commit POST -> upstream %s (faulted=%s, fwd_body=%dB) %s", + status, fault, len(body), self.path) + + if fault: + self.log_message("FAULT: upstream returned %s; dropping response to client (RST)", status) + return self._reset_connection() + return self._relay(status, headers, data) + + do_GET = _handle + do_POST = _handle + do_PUT = _handle + do_DELETE = _handle + do_HEAD = _handle + + def log_message(self, fmt, *args): + sys.stderr.write("[proxy] " + (fmt % args) + "\n") + sys.stderr.flush() + + +def main(): + host, port = LISTEN.split(":") + httpd = ThreadingHTTPServer((host, int(port)), Handler) + print(f"catalog_fault_proxy: listening {LISTEN}, upstream {UPSTREAM}", flush=True) + httpd.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/tests/integration/test_storage_iceberg_no_spark/conftest.py b/tests/integration/test_storage_iceberg_no_spark/conftest.py index b6bf3bf49007..ba374f97dcf5 100644 --- a/tests/integration/test_storage_iceberg_no_spark/conftest.py +++ b/tests/integration/test_storage_iceberg_no_spark/conftest.py @@ -1,3 +1,4 @@ +import os import pytest import logging import pyspark @@ -45,6 +46,12 @@ def started_cluster_iceberg_no_spark(): stay_alive=True, ) + # --- fault-injection proxy for the catalog commit-safety tests --- + # Merge the rest-proxy compose file into the cluster's compose args. + here = os.path.dirname(os.path.realpath(__file__)) + os.environ["PROXY_SCRIPT"] = os.path.join(here, "catalog_fault_proxy.py") + cluster.base_cmd += ["--file", os.path.join(here, "docker_compose_rest_proxy.yml")] + logging.info("Starting cluster...") cluster.start() diff --git a/tests/integration/test_storage_iceberg_no_spark/docker_compose_rest_proxy.yml b/tests/integration/test_storage_iceberg_no_spark/docker_compose_rest_proxy.yml new file mode 100644 index 000000000000..2073444ffe45 --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/docker_compose_rest_proxy.yml @@ -0,0 +1,12 @@ +services: + rest-proxy: + image: python:3.11-slim + hostname: rest-proxy + environment: + UPSTREAM: "http://rest:8181" + LISTEN: "0.0.0.0:8181" + volumes: + - ${PROXY_SCRIPT}:/catalog_fault_proxy.py:ro + command: ["python", "/catalog_fault_proxy.py"] + ports: + - "8999:8181" diff --git a/tests/integration/test_storage_iceberg_no_spark/test_catalog_commit_safety.py b/tests/integration/test_storage_iceberg_no_spark/test_catalog_commit_safety.py new file mode 100644 index 000000000000..563f99285138 --- /dev/null +++ b/tests/integration/test_storage_iceberg_no_spark/test_catalog_commit_safety.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +"""Tests for the Iceberg REST catalog commit-safety bug. + +Bug: on a network failure AFTER the catalog commit succeeds, ClickHouse's HTTP +layer retries the byte-identical commit POST, the catalog returns 409 (its +assert-ref-snapshot-id no longer matches), and -- before the fix -- +`RestCatalog::updateMetadata` collapsed that 409 to `return false`, so the caller +ran cleanup() and deleted files the now-live snapshot references. + +These tests interpose `catalog_fault_proxy.py` between ClickHouse and the real +REST catalog to make the "commit succeeds, response lost" window deterministic, +then assert the corruption invariant: + + NO catalog-referenced snapshot may point at a manifest list that is missing + from object storage. + +Wiring (already in place): + * conftest.py merges docker_compose_rest_proxy.yml into the cluster; that runs + the `rest-proxy` service on the shared network and publishes 8999:8181. + * ClickHouse reaches the catalog THROUGH the proxy (CREATE DATABASE below uses + `http://rest-proxy:8181/v1`); pyiceberg keeps talking to the real catalog at + localhost:8182, so it observes true committed state. + * proxy_control_url() returns http://localhost:8999, the host-published port. + +The fix is re-read-and-classify by snapshot-id. On a failed commit, +`RestCatalog::updateMetadata` re-reads the catalog and returns a typed +`CommitOutcome`: + * Committed -- our snapshot-id is the catalog's current-snapshot-id, or is + present in snapshots[] (we landed, then were superseded). + Callers skip cleanup. + * RejectedCleanly -- the re-read succeeded and our snapshot-id is provably absent + from populated snapshot state. Only here is cleanup safe. + * Unknown -- the re-read failed or carried no snapshot state. Callers + preserve all files, because a recoverable leak beats + unrecoverable corruption. +Making 409 non-retriable at the HTTP layer is an optional efficiency, not the +correctness fix: classification removes the corruption regardless of retry behavior. +""" + +import uuid + +import boto3 +import pyarrow as pa +import pytest +import requests +from helpers.config_cluster import minio_access_key, minio_secret_key +from pyiceberg.catalog import load_catalog +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema, NestedField +from pyiceberg.types import LongType, StringType + +# pyiceberg -> REAL catalog (host-published port, as in test_iceberg_truncate.py) +REST_CATALOG_HOST_URL = "http://localhost:8182" +# ClickHouse -> catalog THROUGH the fault proxy (in-network hostname) +CH_CATALOG_URL_VIA_PROXY = "http://rest-proxy:8181/v1" +CATALOG_NAME = "demo" +BUCKET = "warehouse-rest" + + +# --------------------------------------------------------------------------- # +# Helpers +# --------------------------------------------------------------------------- # +def load_catalog_real(started_cluster): + return load_catalog( + CATALOG_NAME, + **{ + "uri": REST_CATALOG_HOST_URL, + "type": "rest", + "s3.endpoint": f"http://{started_cluster.get_instance_ip('minio')}:9000", + "s3.access-key-id": minio_access_key, + "s3.secret-access-key": minio_secret_key, + }, + ) + + +def proxy_control_url(started_cluster): + # rest-proxy publishes its control+forward port as 8999:8181 + # (docker_compose_rest_proxy.yml), so the host reaches it at localhost:8999. + return "http://localhost:8999" + + +def arm_fault(started_cluster, rule): + r = requests.post(f"{proxy_control_url(started_cluster)}/__fault/arm", + json=rule, timeout=10) + r.raise_for_status() + + +def disarm_fault(started_cluster): + requests.post(f"{proxy_control_url(started_cluster)}/__fault/disarm", timeout=10) + + +def fault_status(started_cluster): + return requests.get(f"{proxy_control_url(started_cluster)}/__fault/status", + timeout=10).json() + + +def _s3(started_cluster): + ip = started_cluster.get_instance_ip("minio") + return boto3.client( + "s3", + endpoint_url=f"http://{ip}:9000", + aws_access_key_id=minio_access_key, + aws_secret_access_key=minio_secret_key, + ) + + +def s3_object_exists(started_cluster, s3_uri): + assert s3_uri and s3_uri.startswith("s3://"), s3_uri + bucket, _, key = s3_uri[len("s3://"):].partition("/") + try: + _s3(started_cluster).head_object(Bucket=bucket, Key=key) + return True + except Exception: + return False + + +def snapshots_with_missing_manifest_list(started_cluster, table): + """Catalog-referenced snapshots whose manifest list is gone from storage. + Empty list == healthy.""" + missing = [] + for snap in (table.metadata.snapshots or []): + ml = getattr(snap, "manifest_list", None) + if ml and not s3_object_exists(started_cluster, ml): + missing.append(snap) + return missing + + +def _setup_table_with_one_snapshot(started_cluster, kind): + """Create namespace+table, point CH at the proxy, seed snapshot N so the next + commit carries assert-ref-snapshot-id. Returns (catalog, namespace, ch_ident, + identifier, snap_N, instance).""" + instance = started_cluster.instances["node1"] + catalog = load_catalog_real(started_cluster) + + namespace = f"ch_commit_safety_{kind}_{uuid.uuid4().hex}" + catalog.create_namespace(namespace) + schema = Schema( + NestedField(field_id=1, name="id", field_type=LongType(), required=False), + NestedField(field_id=2, name="val", field_type=StringType(), required=False), + ) + table_name = "t" + identifier = f"{namespace}.{table_name}" + catalog.create_table( + identifier=identifier, + schema=schema, + location=f"s3://{BUCKET}/{identifier}", + partition_spec=PartitionSpec(), + ) + ch_ident = f"`{namespace}.{table_name}`" + + instance.query(f"DROP DATABASE IF EXISTS {namespace}") + instance.query( + f""" + CREATE DATABASE {namespace} + ENGINE = DataLakeCatalog('{CH_CATALOG_URL_VIA_PROXY}', 'minio', '{minio_secret_key}') + SETTINGS + catalog_type='rest', + warehouse='{CATALOG_NAME}', + storage_endpoint='http://minio:9000/{BUCKET}'; + """, + settings={"allow_database_iceberg": 1}, + ) + + # Seed snapshot N (via pyiceberg/real catalog) so the next CH commit asserts on it. + catalog.load_table(identifier).append( + pa.Table.from_pylist([{"id": 1, "val": "A"}]) + ) + snap_N = catalog.load_table(identifier).metadata.current_snapshot_id + assert int(instance.query( + f"SELECT count() FROM {namespace}.{ch_ident}").strip()) == 1 + return catalog, namespace, ch_ident, identifier, snap_N, instance + + +def test_insert_without_fault_is_clean_baseline(started_cluster_iceberg_no_spark): + cluster = started_cluster_iceberg_no_spark + catalog, namespace, ch_ident, identifier, snap_N, instance = \ + _setup_table_with_one_snapshot(cluster, "insert_baseline") + + disarm_fault(cluster) # ensure nothing left armed from a prior test + + insert_error = None + try: + instance.query( + f"INSERT INTO {namespace}.{ch_ident} VALUES (2, 'B')", + settings={"allow_experimental_insert_into_iceberg": 1}, + ) + except Exception as e: + insert_error = str(e) + + st = fault_status(cluster) + assert st["faulted"] == 0, f"no fault should have fired in baseline: {st}" + + after = catalog.load_table(identifier) + missing = snapshots_with_missing_manifest_list(cluster, after) + + print("BASELINE INSERT raised:", insert_error, + "| current_snapshot:", after.metadata.current_snapshot_id, + "| missing:", [s.snapshot_id for s in missing]) + + assert insert_error is None, f"baseline INSERT should succeed, got: {insert_error}" + assert not missing, ( + f"baseline must not corrupt; missing manifest lists: " + f"{[s.snapshot_id for s in missing]}") + assert int(instance.query( + f"SELECT count() FROM {namespace}.{ch_ident}").strip()) == 2 + + +def test_insert_commit_response_loss_is_handled(started_cluster_iceberg_no_spark): + cluster = started_cluster_iceberg_no_spark + catalog, namespace, ch_ident, identifier, snap_N, instance = \ + _setup_table_with_one_snapshot(cluster, "insert") + + # Drop the response of the FIRST commit POST for this table. + arm_fault(cluster, {"method": "POST", "match_path_substr": "/tables/t", + "mode": "commit_then_drop", "count": 1}) + + insert_error = None + try: + instance.query( + f"INSERT INTO {namespace}.{ch_ident} VALUES (2, 'B')", + settings={"allow_experimental_insert_into_iceberg": 1}, + ) + except Exception as e: + insert_error = str(e) + + st = fault_status(cluster) + assert st["faulted"] == 1, f"fault did not fire (check wiring/precondition): {st}" + + after = catalog.load_table(identifier) + missing = snapshots_with_missing_manifest_list(cluster, after) + + print("INSERT raised:", insert_error, + "| current_snapshot:", after.metadata.current_snapshot_id, + "| missing:", [s.snapshot_id for s in missing]) + + assert insert_error is None, ( + f"lost-response INSERT should succeed cleanly, got: {insert_error}") + assert not missing, ( + "no catalog-referenced snapshot may point at a deleted manifest list; " + f"corrupted: {[s.snapshot_id for s in missing]}") + assert int(instance.query( + f"SELECT count() FROM {namespace}.{ch_ident}").strip()) == 2