diff --git a/examples/grafana/METRICS.md b/examples/grafana/METRICS.md
index fab3e92b..4dc0287e 100644
--- a/examples/grafana/METRICS.md
+++ b/examples/grafana/METRICS.md
@@ -58,6 +58,10 @@ These metrics are reported by Iceberg clients when they perform operations on ta
| `iceberg_commit_added_equality_deletes_total` | Counter | catalog, namespace, table, operation | Total number of equality deletes added in commits |
| `iceberg_commit_total_files_size_bytes` | Counter | catalog, namespace, table, operation | Total size in bytes of files involved in commits |
| `iceberg_commit_duration_seconds` | Histogram | catalog, namespace, table, operation | Duration of commit operations in seconds |
+| `iceberg_commit_retries_total` | Counter | catalog, namespace, table | Server-side retries after a commit CAS conflict (`CommitFailedException`) in the REST catalog commit loop; tune `commitRetry` in `.ice-rest-catalog.yaml` if this grows under parallel writers |
+| `iceberg_commit_lock_acquire_seconds` | Histogram | catalog | Time to acquire the etcd per-table commit lock (`commitLock` in `.ice-rest-catalog.yaml`; etcd backend only) |
+| `iceberg_commit_lock_held_seconds` | Histogram | catalog | Time the etcd commit lock was held during a table commit |
+| `iceberg_commit_lock_acquire_timeouts_total` | Counter | catalog | Acquire attempts that exceeded `commitLock.acquireTimeoutMs` (HTTP 503 to clients) |
#### Reporter Metrics
diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml
index 2cd11bf3..ea47bfe9 100644
--- a/examples/scratch/.ice-rest-catalog.yaml
+++ b/examples/scratch/.ice-rest-catalog.yaml
@@ -17,4 +17,4 @@ bearerTokens:
anonymousAccess:
enabled: true
- accessConfig: {}
+ accessConfig: {}
\ No newline at end of file
diff --git a/ice-rest-catalog/README.md b/ice-rest-catalog/README.md
index e0ee2d05..446249b5 100644
--- a/ice-rest-catalog/README.md
+++ b/ice-rest-catalog/README.md
@@ -11,6 +11,19 @@ That's it.
Examples of `.ice-rest-catalog.yaml` (as well as Kubernetes deployment manifests) can be found [here](../examples/).
+## Parallel writers (`commitLock`)
+
+Many concurrent commits to the **same table** can cause repeated `CommitFailedException` (optimistic concurrency). For the **etcd** metastore you can serialize commits per table using etcd’s lock API:
+
+```yaml
+commitLock:
+ enabled: true
+ leaseTtlSeconds: 30
+ acquireTimeoutMs: 30000
+```
+
+If `enabled` is true but the catalog backend is not etcd, the lock is ignored (warning in logs). When lock acquisition exceeds `acquireTimeoutMs`, the server responds with HTTP **503** so clients can retry.
+
## Documentation
- [Architecture](../docs/architecture.md) -- components, design principles, HA, backup/recovery
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
index a901f999..4d94eff7 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
@@ -19,6 +19,7 @@
import com.altinity.ice.rest.catalog.internal.aws.CredentialsProvider;
import com.altinity.ice.rest.catalog.internal.config.Config;
import com.altinity.ice.rest.catalog.internal.config.MaintenanceConfig;
+import com.altinity.ice.rest.catalog.internal.etcd.CommitLock;
import com.altinity.ice.rest.catalog.internal.etcd.EtcdCatalog;
import com.altinity.ice.rest.catalog.internal.maintenance.DataCompaction;
import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceJob;
@@ -279,7 +280,8 @@ private static Server createBaseServer(
if (requireAuth) {
mux.insertHandler(createAuthorizationHandler(config.bearerTokens(), config));
- restCatalogAdapter = new RESTCatalogAdapter(catalog);
+ restCatalogAdapter =
+ new RESTCatalogAdapter(catalog, config.commitRetry(), maybeCommitLock(catalog, config));
var globalConfig = config.toIcebergConfigDefaults();
if (!globalConfig.isEmpty()) {
restCatalogAdapter = new RESTCatalogMiddlewareConfig(restCatalogAdapter, globalConfig);
@@ -299,7 +301,8 @@ private static Server createBaseServer(
new RESTCatalogMiddlewareCredentials(restCatalogAdapter, auth), auth);
}
} else {
- restCatalogAdapter = new RESTCatalogAdapter(catalog);
+ restCatalogAdapter =
+ new RESTCatalogAdapter(catalog, config.commitRetry(), maybeCommitLock(catalog, config));
var globalConfig = config.toIcebergConfigDefaults();
if (!globalConfig.isEmpty()) {
restCatalogAdapter = new RESTCatalogMiddlewareConfig(restCatalogAdapter, globalConfig);
@@ -319,6 +322,18 @@ private static Server createBaseServer(
}
}
+ logger.info(
+ "Commit retry config: numRetries={} minWaitMs={} maxWaitMs={} totalTimeoutMs={}",
+ config.commitRetry().numRetries(),
+ config.commitRetry().minWaitMs(),
+ config.commitRetry().maxWaitMs(),
+ config.commitRetry().totalTimeoutMs());
+ logger.info(
+ "Commit lock config: enabled={} leaseTtlSeconds={} acquireTimeoutMs={}",
+ config.commitLock().enabled(),
+ config.commitLock().leaseTtlSeconds(),
+ config.commitLock().acquireTimeoutMs());
+
var h = new ServletHolder(new RESTCatalogServlet(restCatalogAdapter));
mux.addServlet(h, "/*");
@@ -395,6 +410,22 @@ private static RESTCatalogAuthorizationHandler createAuthorizationHandler(
return new RESTCatalogAuthorizationHandler(tokens, anonymousSession);
}
+ /**
+ * Per-table etcd commit lock for {@link EtcdCatalog}; ignored when disabled or when not using
+ * etcd.
+ */
+ static CommitLock maybeCommitLock(Catalog catalog, Config config) {
+ if (!config.commitLock().enabled()) {
+ return null;
+ }
+ if (!(catalog instanceof EtcdCatalog etcd)) {
+ logger.warn(
+ "commitLock.enabled is true but catalog is not EtcdCatalog; commit lock disabled");
+ return null;
+ }
+ return new CommitLock(etcd.etcdClient(), catalog.name(), config.commitLock());
+ }
+
private static void overrideJettyDefaults(Server s) {
ServerConfig.setQuiet(s);
s.setErrorHandler(new PlainErrorHandler());
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitLockConfig.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitLockConfig.java
new file mode 100644
index 00000000..6bbc80ba
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitLockConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.config;
+
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+
+/**
+ * Optional per-table etcd commit lock for {@code ice-rest-catalog} when using the etcd metastore.
+ *
+ *
Serializes commits to the same table so concurrent writers do not lose optimistic concurrency
+ * races indefinitely.
+ */
+public record CommitLockConfig(
+ @JsonPropertyDescription(
+ "Enable etcd mutual-exclusion lock around table commits (etcd backend only; default false)")
+ boolean enabled,
+ @JsonPropertyDescription(
+ "Lease TTL for the lock in seconds (must exceed slow commits; default 30)")
+ long leaseTtlSeconds,
+ @JsonPropertyDescription("Max time to wait to acquire the lock in milliseconds (default 30000)")
+ long acquireTimeoutMs) {
+
+ public CommitLockConfig {
+ if (leaseTtlSeconds <= 0) {
+ leaseTtlSeconds = 30;
+ }
+ if (acquireTimeoutMs <= 0) {
+ acquireTimeoutMs = 30_000L;
+ }
+ }
+
+ public static CommitLockConfig defaults() {
+ return new CommitLockConfig(false, 30, 30_000L);
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfig.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfig.java
new file mode 100644
index 00000000..a3c0a94a
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.config;
+
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import org.apache.iceberg.TableProperties;
+
+/**
+ * Server-side tuning for the REST catalog commit retry loop (OCC compare-and-swap failures).
+ *
+ *
Defaults match Iceberg's {@link TableProperties} commit retry defaults.
+ */
+public record CommitRetryConfig(
+ @JsonPropertyDescription(
+ "Number of retries on CommitFailedException (default: Iceberg commit.retry.num-retries = 4)")
+ int numRetries,
+ @JsonPropertyDescription(
+ "Minimum backoff between retries in ms (default: Iceberg commit.retry.min-wait-ms)")
+ long minWaitMs,
+ @JsonPropertyDescription(
+ "Maximum backoff between retries in ms (default: Iceberg commit.retry.max-wait-ms)")
+ long maxWaitMs,
+ @JsonPropertyDescription(
+ "Total time budget for the retry loop in ms (default: Iceberg commit.retry.total-timeout-ms)")
+ long totalTimeoutMs) {
+
+ public CommitRetryConfig {
+ if (numRetries <= 0) {
+ numRetries = TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+ }
+ if (minWaitMs <= 0) {
+ minWaitMs = TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+ }
+ if (maxWaitMs <= 0) {
+ maxWaitMs = TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+ }
+ if (totalTimeoutMs <= 0) {
+ totalTimeoutMs = TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+ }
+ }
+
+ public static CommitRetryConfig defaults() {
+ return new CommitRetryConfig(
+ TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
+ TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+ TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+ TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT);
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
index ab554ebb..cb98cded 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
@@ -56,6 +56,12 @@ public record Config(
"Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". Empty schedule disables automatic maintenance (default)")
String maintenanceSchedule,
@JsonPropertyDescription("Maintenance config") MaintenanceConfig maintenance,
+ @JsonPropertyDescription(
+ "Server-side commit retry config; tune up for high-contention workloads (e.g., parallel `ice insert` to one table)")
+ CommitRetryConfig commitRetry,
+ @JsonPropertyDescription(
+ "Optional etcd per-table commit lock (etcd metastore only). Reduces CommitFailedException under concurrent writers.")
+ CommitLockConfig commitLock,
@JsonPropertyDescription(
"(experimental) Extra properties to include in loadTable REST response.")
Map loadTableProperties,
@@ -81,6 +87,8 @@ public Config(
AnonymousAccess anonymousAccess,
String maintenanceSchedule,
MaintenanceConfig maintenance,
+ CommitRetryConfig commitRetry,
+ CommitLockConfig commitLock,
Map loadTableProperties,
@JsonProperty("iceberg") Map icebergProperties) {
this.addr = Strings.orDefault(addr, DEFAULT_ADDR);
@@ -98,6 +106,8 @@ public Config(
this.maintenance =
Objects.requireNonNullElseGet(
maintenance, () -> new MaintenanceConfig(null, 0, 0, 0, 0, 0, 0, null, false));
+ this.commitRetry = Objects.requireNonNullElse(commitRetry, CommitRetryConfig.defaults());
+ this.commitLock = Objects.requireNonNullElse(commitLock, CommitLockConfig.defaults());
this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of());
this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of());
}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLock.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLock.java
new file mode 100644
index 00000000..84fb3966
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLock.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.etcd;
+
+import com.altinity.ice.rest.catalog.internal.config.CommitLockConfig;
+import com.altinity.ice.rest.catalog.internal.metrics.CatalogMetrics;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.Lease;
+import io.etcd.jetcd.Lock;
+import io.etcd.jetcd.lock.LockResponse;
+import io.etcd.jetcd.support.CloseableClient;
+import io.grpc.stub.StreamObserver;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mutual exclusion for Iceberg table commits using etcd leases + the lock API (same cluster as
+ * {@link EtcdCatalog}).
+ */
+public final class CommitLock {
+
+ private static final Logger logger = LoggerFactory.getLogger(CommitLock.class);
+
+ private static final StreamObserver NOOP_KEEPALIVE =
+ new StreamObserver<>() {
+ @Override
+ public void onNext(io.etcd.jetcd.lease.LeaseKeepAliveResponse value) {}
+
+ @Override
+ public void onError(Throwable t) {}
+
+ @Override
+ public void onCompleted() {}
+ };
+
+ private final Lock lockApi;
+ private final Lease leaseApi;
+ private final String catalogName;
+ private final CommitLockConfig config;
+
+ public CommitLock(Client etcdClient, String catalogName, CommitLockConfig config) {
+ this.lockApi = etcdClient.getLockClient();
+ this.leaseApi = etcdClient.getLeaseClient();
+ this.catalogName = catalogName;
+ this.config = config;
+ }
+
+ /**
+ * Acquire the commit lock for {@code ident}. Caller must {@link Handle#close()} to release.
+ *
+ * @throws CommitLockTimeoutException if the lock is not acquired within {@link
+ * CommitLockConfig#acquireTimeoutMs()}
+ */
+ public Handle acquire(TableIdentifier ident) {
+ String path = lockPath(catalogName, ident);
+ ByteSequence name = ByteSequence.from(path, StandardCharsets.UTF_8);
+ long waitStartNanos = System.nanoTime();
+
+ long leaseId;
+ CloseableClient keepAlive;
+ try {
+ leaseId = leaseApi.grant(config.leaseTtlSeconds()).get().getID();
+ keepAlive = leaseApi.keepAlive(leaseId, NOOP_KEEPALIVE);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ }
+
+ try {
+ LockResponse lr =
+ lockApi.lock(name, leaseId).get(config.acquireTimeoutMs(), TimeUnit.MILLISECONDS);
+ long acquireEndNanos = System.nanoTime();
+ CatalogMetrics.getInstance()
+ .recordCommitLockAcquireSeconds(
+ catalogName, (acquireEndNanos - waitStartNanos) / 1_000_000_000.0);
+ return new Handle(lockApi, leaseApi, lr.getKey(), leaseId, keepAlive, acquireEndNanos);
+ } catch (TimeoutException e) {
+ cleanupAfterFailedAcquire(keepAlive, leaseId);
+ CatalogMetrics.getInstance().recordCommitLockAcquireTimeout(catalogName);
+ throw new CommitLockTimeoutException(
+ "commit lock acquire timed out after " + config.acquireTimeoutMs() + " ms", e);
+ } catch (InterruptedException e) {
+ cleanupAfterFailedAcquire(keepAlive, leaseId);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ cleanupAfterFailedAcquire(keepAlive, leaseId);
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
+ private void cleanupAfterFailedAcquire(CloseableClient keepAlive, long leaseId) {
+ try {
+ keepAlive.close();
+ } catch (RuntimeException ex) {
+ logger.warn("keepAlive.close failed after lock acquire failure", ex);
+ }
+ try {
+ leaseApi.revoke(leaseId).get();
+ } catch (Exception ex) {
+ logger.warn("lease revoke failed after lock acquire failure", ex);
+ }
+ }
+
+ /**
+ * Acquire locks for every identifier in {@code sorted} order (caller must sort for deadlock-free
+ * ordering), run {@code action}, then release in reverse order.
+ */
+ public void withLocks(List sorted, Runnable action) {
+ List handles = new ArrayList<>(sorted.size());
+ try {
+ for (TableIdentifier id : sorted) {
+ handles.add(acquire(id));
+ }
+ action.run();
+ } finally {
+ for (int i = handles.size() - 1; i >= 0; i--) {
+ try {
+ handles.get(i).close();
+ } catch (RuntimeException e) {
+ logger.warn("failed to release commit lock handle", e);
+ }
+ }
+ }
+ }
+
+ static String lockPath(String catalogName, TableIdentifier ident) {
+ return "locks/v1/" + catalogName + "/" + ident;
+ }
+
+ /** Lease-backed etcd lock; closes to unlock and revoke the lease. */
+ public final class Handle implements AutoCloseable {
+
+ private final Lock lockApi;
+ private final Lease leaseApi;
+ private final ByteSequence lockKey;
+ private final long leaseId;
+ private final CloseableClient keepAlive;
+ private final long acquiredAtNanos;
+
+ private Handle(
+ Lock lockApi,
+ Lease leaseApi,
+ ByteSequence lockKey,
+ long leaseId,
+ CloseableClient keepAlive,
+ long acquiredAtNanos) {
+ this.lockApi = lockApi;
+ this.leaseApi = leaseApi;
+ this.lockKey = lockKey;
+ this.leaseId = leaseId;
+ this.keepAlive = keepAlive;
+ this.acquiredAtNanos = acquiredAtNanos;
+ }
+
+ @Override
+ public void close() {
+ long heldNanos = System.nanoTime() - acquiredAtNanos;
+ try {
+ lockApi.unlock(lockKey).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ } finally {
+ try {
+ keepAlive.close();
+ } catch (RuntimeException e) {
+ logger.warn("keepAlive.close failed during unlock", e);
+ }
+ try {
+ leaseApi.revoke(leaseId).get();
+ } catch (Exception e) {
+ logger.warn("lease revoke failed during unlock", e);
+ }
+ }
+ CatalogMetrics.getInstance()
+ .recordCommitLockHeldSeconds(catalogName, heldNanos / 1_000_000_000.0);
+ }
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLockTimeoutException.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLockTimeoutException.java
new file mode 100644
index 00000000..5f99b53b
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/CommitLockTimeoutException.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.etcd;
+
+/**
+ * Thrown when acquiring the etcd commit lock exceeds the configured timeout. Mapped to HTTP 503 so
+ * clients can retry.
+ */
+public final class CommitLockTimeoutException extends RuntimeException {
+
+ public CommitLockTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java
index 6d76830e..11c49473 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalog.java
@@ -82,6 +82,11 @@ public EtcdCatalog(String name, String uri, String warehouseLocation, FileIO io)
this.io = io;
}
+ /** Shared jetcd client for auxiliary features (e.g. commit locks) without a second connection. */
+ public Client etcdClient() {
+ return client;
+ }
+
// Used by EtcdCatalogTest to test concurrent modifications.
protected Txn kvtx() {
return kv.txn();
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java
index ce7a4d94..f0ba0cdc 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/CatalogMetrics.java
@@ -16,10 +16,22 @@
import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.CATALOG_OPERATION_LABELS;
import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.CATALOG_TABLES_HELP;
import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.CATALOG_TABLES_NAME;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_SECONDS_HELP;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_SECONDS_NAME;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_HELP;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_NAME;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_HELD_SECONDS_HELP;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_HELD_SECONDS_NAME;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_LOCK_LABELS;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_RETRIES_TOTAL_HELP;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_RETRIES_TOTAL_NAME;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.COMMIT_RETRY_LABELS;
+import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.DURATION_BUCKETS;
import static com.altinity.ice.rest.catalog.internal.metrics.IcebergMetricNames.LABEL_CATALOG;
import io.prometheus.metrics.core.metrics.Counter;
import io.prometheus.metrics.core.metrics.Gauge;
+import io.prometheus.metrics.core.metrics.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +64,10 @@ private static class Holder {
private final Gauge tablesTotal;
private final Gauge namespacesTotal;
private final Counter operationsTotal;
+ private final Counter commitRetriesTotal;
+ private final Histogram commitLockAcquireSeconds;
+ private final Histogram commitLockHeldSeconds;
+ private final Counter commitLockAcquireTimeoutsTotal;
/** Returns the singleton instance of the catalog metrics. */
public static CatalogMetrics getInstance() {
@@ -80,6 +96,36 @@ private CatalogMetrics() {
.labelNames(CATALOG_OPERATION_LABELS)
.register();
+ this.commitRetriesTotal =
+ Counter.builder()
+ .name(COMMIT_RETRIES_TOTAL_NAME)
+ .help(COMMIT_RETRIES_TOTAL_HELP)
+ .labelNames(COMMIT_RETRY_LABELS)
+ .register();
+
+ this.commitLockAcquireSeconds =
+ Histogram.builder()
+ .name(COMMIT_LOCK_ACQUIRE_SECONDS_NAME)
+ .help(COMMIT_LOCK_ACQUIRE_SECONDS_HELP)
+ .labelNames(COMMIT_LOCK_LABELS)
+ .classicUpperBounds(DURATION_BUCKETS)
+ .register();
+
+ this.commitLockHeldSeconds =
+ Histogram.builder()
+ .name(COMMIT_LOCK_HELD_SECONDS_NAME)
+ .help(COMMIT_LOCK_HELD_SECONDS_HELP)
+ .labelNames(COMMIT_LOCK_LABELS)
+ .classicUpperBounds(DURATION_BUCKETS)
+ .register();
+
+ this.commitLockAcquireTimeoutsTotal =
+ Counter.builder()
+ .name(COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_NAME)
+ .help(COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_HELP)
+ .labelNames(COMMIT_LOCK_LABELS)
+ .register();
+
logger.info("Catalog Prometheus metrics initialized");
}
@@ -118,6 +164,26 @@ public void recordOperation(String catalog, String operation) {
operationsTotal.labelValues(catalog, operation).inc();
}
+ /** Record one server-side commit retry after a commit CAS conflict (CommitFailedException). */
+ public void recordCommitRetry(String catalog, String namespace, String table) {
+ commitRetriesTotal.labelValues(catalog, namespace, table).inc();
+ }
+
+ /** Record duration of etcd commit lock acquisition (wait time). */
+ public void recordCommitLockAcquireSeconds(String catalog, double seconds) {
+ commitLockAcquireSeconds.labelValues(catalog).observe(seconds);
+ }
+
+ /** Record duration the etcd commit lock was held during a commit. */
+ public void recordCommitLockHeldSeconds(String catalog, double seconds) {
+ commitLockHeldSeconds.labelValues(catalog).observe(seconds);
+ }
+
+ /** Record a commit lock acquire that exceeded {@code acquireTimeoutMs}. */
+ public void recordCommitLockAcquireTimeout(String catalog) {
+ commitLockAcquireTimeoutsTotal.labelValues(catalog).inc();
+ }
+
/** Record a table creation. */
public void recordTableCreated(String catalog) {
incrementTablesTotal(catalog);
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java
index c189eead..d27603c0 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/metrics/IcebergMetricNames.java
@@ -152,6 +152,28 @@ private IcebergMetricNames() {}
public static final String COMMIT_DURATION_NAME = "iceberg_commit_duration_seconds";
public static final String COMMIT_DURATION_HELP = "Duration of commit operations in seconds";
+ public static final String COMMIT_RETRIES_TOTAL_NAME = "iceberg_commit_retries_total";
+ public static final String COMMIT_RETRIES_TOTAL_HELP =
+ "Total number of CommitFailedException retries triggered by the server-side commit retry loop";
+
+ public static final String[] COMMIT_RETRY_LABELS = {LABEL_CATALOG, LABEL_NAMESPACE, LABEL_TABLE};
+
+ public static final String COMMIT_LOCK_ACQUIRE_SECONDS_NAME =
+ "iceberg_commit_lock_acquire_seconds";
+ public static final String COMMIT_LOCK_ACQUIRE_SECONDS_HELP =
+ "Time taken to acquire the etcd table commit lock (seconds)";
+
+ public static final String COMMIT_LOCK_HELD_SECONDS_NAME = "iceberg_commit_lock_held_seconds";
+ public static final String COMMIT_LOCK_HELD_SECONDS_HELP =
+ "Time the etcd table commit lock was held during commit (seconds)";
+
+ public static final String COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_NAME =
+ "iceberg_commit_lock_acquire_timeouts_total";
+ public static final String COMMIT_LOCK_ACQUIRE_TIMEOUTS_TOTAL_HELP =
+ "Total number of etcd commit lock acquire attempts that exceeded acquireTimeoutMs";
+
+ public static final String[] COMMIT_LOCK_LABELS = {LABEL_CATALOG};
+
// ==========================================================================
// Histogram Buckets
// ==========================================================================
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java
index 5c782a58..66b0b725 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapter.java
@@ -18,15 +18,13 @@
*/
package com.altinity.ice.rest.catalog.internal.rest;
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
-
import com.altinity.ice.rest.catalog.internal.auth.Session;
+import com.altinity.ice.rest.catalog.internal.config.CommitRetryConfig;
+import com.altinity.ice.rest.catalog.internal.etcd.CommitLock;
import com.altinity.ice.rest.catalog.internal.metrics.CatalogMetrics;
import com.altinity.ice.rest.catalog.internal.metrics.PrometheusMetricsReporter;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,12 +37,14 @@
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
+import org.apache.iceberg.UpdateRequirement;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.CatalogHandlers;
import org.apache.iceberg.rest.Endpoint;
@@ -60,21 +60,43 @@
import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RESTCatalogAdapter implements RESTCatalogHandler {
+ private static final Logger logger = LoggerFactory.getLogger(RESTCatalogAdapter.class);
+
private final Catalog catalog;
private final SupportsNamespaces asNamespaceCatalog;
private final ViewCatalog asViewCatalog;
+ private final CommitRetryConfig commitRetry;
+ private final CommitLock commitLock;
public RESTCatalogAdapter(Catalog catalog) {
+ this(catalog, CommitRetryConfig.defaults(), null);
+ }
+
+ public RESTCatalogAdapter(Catalog catalog, CommitRetryConfig commitRetry) {
+ this(catalog, commitRetry, null);
+ }
+
+ public RESTCatalogAdapter(Catalog catalog, CommitRetryConfig commitRetry, CommitLock commitLock) {
this.catalog = catalog;
this.asNamespaceCatalog =
catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
this.asViewCatalog = catalog instanceof ViewCatalog ? (ViewCatalog) catalog : null;
+ this.commitRetry = commitRetry != null ? commitRetry : CommitRetryConfig.defaults();
+ this.commitLock = commitLock;
+ }
+
+ private static String namespaceLabel(TableIdentifier ident) {
+ String[] levels = ident.namespace().levels();
+ return levels.length == 0 ? "" : String.join(".", levels);
}
@Override
@@ -235,7 +257,7 @@ public T handle(
{
TableIdentifier ident = tableIdentFromPathVars(vars);
UpdateTableRequest request = castRequest(UpdateTableRequest.class, requestBody);
- var response = CatalogHandlers.updateTable(catalog, ident, request);
+ var response = updateTable(catalog, ident, request);
// Check if this update contains schema changes
boolean hasSchemaUpdate =
@@ -274,7 +296,7 @@ public T handle(
{
CommitTransactionRequest request =
castRequest(CommitTransactionRequest.class, requestBody);
- commitTransaction(catalog, request);
+ commitTransaction(request);
return null;
}
@@ -384,15 +406,86 @@ private static OAuthTokenResponse handleOAuthRequest(Object body) {
}
}
+ private LoadTableResponse updateTable(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ if (commitLock != null) {
+ try (CommitLock.Handle ignored = commitLock.acquire(ident)) {
+ return loadAndCommitTable(catalog, ident, request);
+ }
+ }
+ return loadAndCommitTable(catalog, ident, request);
+ }
+
+ /**
+ * Matches {@link CatalogHandlers#updateTable(Catalog, TableIdentifier, UpdateTableRequest)} so
+ * staged-create commits ({@link UpdateRequirement.AssertTableDoesNotExist}) work instead of
+ * failing with {@link org.apache.iceberg.exceptions.NoSuchTableException} from {@link
+ * Catalog#loadTable(TableIdentifier)}.
+ */
+ private static boolean isCreate(UpdateTableRequest request) {
+ boolean isCreate =
+ request.requirements().stream()
+ .anyMatch(UpdateRequirement.AssertTableDoesNotExist.class::isInstance);
+
+ if (isCreate) {
+ List> invalidRequirements =
+ request.requirements().stream()
+ .filter(req -> !(req instanceof UpdateRequirement.AssertTableDoesNotExist))
+ .collect(Collectors.toList());
+ Preconditions.checkArgument(
+ invalidRequirements.isEmpty(), "Invalid create requirements: %s", invalidRequirements);
+ }
+
+ return isCreate;
+ }
+
+ private LoadTableResponse loadAndCommitTable(
+ Catalog catalog, TableIdentifier ident, UpdateTableRequest request) {
+ if (isCreate(request)) {
+ logger.info("Committing staged table create for {}", ident);
+ return CatalogHandlers.updateTable(catalog, ident, request);
+ }
+ Table table = catalog.loadTable(ident);
+ if (!(table instanceof BaseTable)) {
+ throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+ }
+ TableOperations ops = ((BaseTable) table).operations();
+ TableMetadata updated = commit(ops, request, ident);
+ return LoadTableResponse.builder().withTableMetadata(updated).build();
+ }
+
/**
* This is a very simplistic approach that only validates the requirements for each table and does
* not do any other conflict detection. Therefore, it does not guarantee true transactional
* atomicity, which is left to the implementation details of a REST server.
*/
- private static void commitTransaction(Catalog catalog, CommitTransactionRequest request) {
+ private void commitTransaction(CommitTransactionRequest request) {
+ List sorted =
+ request.tableChanges().stream()
+ .map(UpdateTableRequest::identifier)
+ .distinct()
+ .sorted(Comparator.comparing(TableIdentifier::toString))
+ .toList();
+
+ Runnable body = () -> commitTransactionBody(request);
+ if (commitLock != null) {
+ commitLock.withLocks(sorted, body);
+ } else {
+ body.run();
+ }
+ }
+
+ private void commitTransactionBody(CommitTransactionRequest request) {
List transactions = Lists.newArrayList();
for (UpdateTableRequest tableChange : request.tableChanges()) {
+ if (isCreate(tableChange)) {
+ logger.info(
+ "Committing staged table create (multi-table txn) for {}", tableChange.identifier());
+ CatalogHandlers.updateTable(catalog, tableChange.identifier(), tableChange);
+ continue;
+ }
+
Table table = catalog.loadTable(tableChange.identifier());
if (table instanceof BaseTable) {
Transaction transaction =
@@ -404,7 +497,7 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest
(BaseTransaction.TransactionTable) transaction.table();
// this performs validations and makes temporary commits that are in-memory
- commit(txTable.operations(), tableChange);
+ commit(txTable.operations(), tableChange, tableChange.identifier());
} else {
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
}
@@ -414,18 +507,26 @@ private static void commitTransaction(Catalog catalog, CommitTransactionRequest
transactions.forEach(Transaction::commitTransaction);
}
- // Copied from CatalogHandlers.commit.
- static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
+ // Copied from CatalogHandlers.commit; retry budget is configurable (see CommitRetryConfig).
+ private TableMetadata commit(
+ TableOperations ops, UpdateTableRequest request, TableIdentifier ident) {
AtomicBoolean isRetry = new AtomicBoolean(false);
try {
Tasks.foreach(ops)
- .retry(COMMIT_NUM_RETRIES_DEFAULT)
+ .retry(commitRetry.numRetries())
.exponentialBackoff(
- COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
- COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
- COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+ commitRetry.minWaitMs(),
+ commitRetry.maxWaitMs(),
+ commitRetry.totalTimeoutMs(),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
+ .onFailure(
+ (task, ex) -> {
+ if (ex instanceof CommitFailedException) {
+ CatalogMetrics.getInstance()
+ .recordCommitRetry(catalog.name(), namespaceLabel(ident), ident.name());
+ }
+ })
.run(
taskOps -> {
TableMetadata base = isRetry.get() ? taskOps.refresh() : taskOps.current();
@@ -446,6 +547,11 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {
TableMetadata updated = metadataBuilder.build();
if (updated.changes().isEmpty()) {
// do not commit if the metadata has not changed
+ logger.warn(
+ "commit no-op for table {}: empty metadata changes after refresh "
+ + "(isRetry={}); client requirements validated but updates produced no changes",
+ ident,
+ isRetry.get());
return;
}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java
index 4b2563ec..65926cca 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogServlet.java
@@ -19,6 +19,7 @@
package com.altinity.ice.rest.catalog.internal.rest;
import com.altinity.ice.rest.catalog.internal.auth.Session;
+import com.altinity.ice.rest.catalog.internal.etcd.CommitLockTimeoutException;
import com.altinity.ice.rest.catalog.internal.metrics.HttpMetrics;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
@@ -70,6 +71,7 @@ public class RESTCatalogServlet extends HttpServlet {
.put(CommitFailedException.class, 409)
.put(UnprocessableEntityException.class, 422)
.put(CommitStateUnknownException.class, 500)
+ .put(CommitLockTimeoutException.class, HttpServletResponse.SC_SERVICE_UNAVAILABLE)
.buildOrThrow();
private final RESTCatalogHandler restCatalogAdapter;
diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java
index bfdc07cd..df8fee2a 100644
--- a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java
+++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/RESTCatalogTestBase.java
@@ -102,6 +102,8 @@ public void setUp() throws Exception {
false, null)), // anonymousAccess - enable with read-write for testing
null, // maintenanceSchedule
null, // maintenance
+ null, // commitRetry
+ null, // commitLock
null, // loadTableProperties
null // icebergProperties
);
diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfigTest.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfigTest.java
new file mode 100644
index 00000000..ea74edd8
--- /dev/null
+++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/config/CommitRetryConfigTest.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.config;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.TableProperties;
+import org.junit.Test;
+
+public class CommitRetryConfigTest {
+
+ @Test
+ public void defaultsMatchIcebergTableProperties() {
+ CommitRetryConfig d = CommitRetryConfig.defaults();
+ assertThat(d.numRetries()).isEqualTo(TableProperties.COMMIT_NUM_RETRIES_DEFAULT);
+ assertThat(d.minWaitMs()).isEqualTo(TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT);
+ assertThat(d.maxWaitMs()).isEqualTo(TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT);
+ assertThat(d.totalTimeoutMs()).isEqualTo(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT);
+ }
+
+ @Test
+ public void zeroOrNegativeFieldsFallBackToDefaults() {
+ CommitRetryConfig c = new CommitRetryConfig(0, 0, 0, 0);
+ assertThat(c.numRetries()).isEqualTo(TableProperties.COMMIT_NUM_RETRIES_DEFAULT);
+ assertThat(c.minWaitMs()).isEqualTo(TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT);
+ assertThat(c.maxWaitMs()).isEqualTo(TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT);
+ assertThat(c.totalTimeoutMs()).isEqualTo(TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT);
+ }
+
+ @Test
+ public void explicitPositiveValuesPreserved() {
+ CommitRetryConfig c = new CommitRetryConfig(20, 50L, 10_000L, 500_000L);
+ assertThat(c.numRetries()).isEqualTo(20);
+ assertThat(c.minWaitMs()).isEqualTo(50L);
+ assertThat(c.maxWaitMs()).isEqualTo(10_000L);
+ assertThat(c.totalTimeoutMs()).isEqualTo(500_000L);
+ }
+}
diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java
index 2b23b10d..d80a7880 100644
--- a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java
+++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/etcd/EtcdCatalogIT.java
@@ -12,12 +12,19 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import com.altinity.ice.rest.catalog.internal.config.CommitLockConfig;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.Txn;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseMetastoreTableOperations;
@@ -315,6 +322,40 @@ public void testTableRegister() {
assertThat(catalog.dropNamespace(namespace)).isTrue();
}
+ @Test
+ public void commitLockSerializesConcurrentAcquires() throws Exception {
+ CommitLock lock =
+ new CommitLock(
+ catalog.etcdClient(), catalog.name(), new CommitLockConfig(true, 30, 60_000));
+ TableIdentifier id = TableIdentifier.of(Namespace.of("ns"), "t");
+ AtomicInteger concurrent = new AtomicInteger(0);
+ AtomicInteger maxConcurrent = new AtomicInteger(0);
+ int threads = 10;
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ CountDownLatch start = new CountDownLatch(1);
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(
+ pool.submit(
+ () -> {
+ start.await();
+ try (CommitLock.Handle h = lock.acquire(id)) {
+ int c = concurrent.incrementAndGet();
+ maxConcurrent.updateAndGet(m -> Math.max(m, c));
+ Thread.sleep(5);
+ concurrent.decrementAndGet();
+ }
+ return null;
+ }));
+ }
+ start.countDown();
+ for (Future> f : futures) {
+ f.get();
+ }
+ pool.shutdown();
+ assertThat(maxConcurrent.get()).isEqualTo(1);
+ }
+
private static String rand() {
return UUID.randomUUID().toString();
}
diff --git a/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapterCreateIT.java b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapterCreateIT.java
new file mode 100644
index 00000000..ebde55a2
--- /dev/null
+++ b/ice-rest-catalog/src/test/java/com/altinity/ice/rest/catalog/internal/rest/RESTCatalogAdapterCreateIT.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.altinity.ice.rest.catalog.internal.auth.Session;
+import com.altinity.ice.rest.catalog.internal.config.Config;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.MetadataUpdate;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateRequirements;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.rest.HTTPRequest;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.iceberg.types.Types;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Covers staged table create (REST {@code stageCreate} + commit) which must use {@link
+ * org.apache.iceberg.rest.CatalogHandlers#updateTable} create path instead of {@link
+ * org.apache.iceberg.catalog.Catalog#loadTable(TableIdentifier)} first. Uses an in-memory {@code
+ * jdbc:sqlite} metastore (same pattern as {@link
+ * com.altinity.ice.rest.catalog.RESTCatalogTestBase}).
+ */
+public class RESTCatalogAdapterCreateIT {
+
+ private static final Schema SCHEMA =
+ new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
+
+ private Path warehouseDir;
+ private Catalog catalog;
+ private RESTCatalogAdapter adapter;
+ private Session session;
+
+ @BeforeClass
+ public void beforeClass() throws IOException {
+ warehouseDir = Files.createTempDirectory("restcat-create-it-");
+ String warehouseUri = warehouseDir.toUri().toString();
+ if (!warehouseUri.endsWith("/")) {
+ warehouseUri = warehouseUri + "/";
+ }
+ Config config =
+ new Config(
+ "localhost:8080",
+ "localhost:8081",
+ null,
+ "it",
+ "jdbc:sqlite::memory:",
+ warehouseUri,
+ null,
+ null,
+ null,
+ new Config.AnonymousAccess(true, new Config.AccessConfig(false, null)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ catalog = CatalogUtil.buildIcebergCatalog("it", config.toIcebergConfig(), null);
+ adapter = new RESTCatalogAdapter(catalog);
+ session = new Session("it", false, null);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ if (catalog instanceof Closeable c) {
+ c.close();
+ }
+ if (warehouseDir != null) {
+ try (var walk = Files.walk(warehouseDir)) {
+ walk.sorted((a, b) -> -a.compareTo(b))
+ .forEach(
+ p -> {
+ try {
+ Files.deleteIfExists(p);
+ } catch (IOException ignored) {
+ // best-effort cleanup
+ }
+ });
+ }
+ }
+ }
+
+ private static String shortId() {
+ return UUID.randomUUID().toString().replace("-", "").substring(0, 8);
+ }
+
+ @Test
+ public void stagedCreateCommitRegistersTable() {
+ String nsName = "ns_" + shortId();
+ Namespace ns = Namespace.of(nsName);
+ ((SupportsNamespaces) catalog).createNamespace(ns);
+ String tableName = "t_" + shortId();
+ TableIdentifier ident = TableIdentifier.of(ns, tableName);
+
+ CreateTableRequest stageReq =
+ CreateTableRequest.builder().withName(tableName).withSchema(SCHEMA).stageCreate().build();
+ stageReq.validate();
+
+ Map createVars = Map.of("namespace", RESTUtil.encodeNamespace(ns));
+ LoadTableResponse staged =
+ adapter.handle(session, Route.CREATE_TABLE, createVars, stageReq, LoadTableResponse.class);
+ assertThat(staged.tableMetadata()).isNotNull();
+
+ List updates = staged.tableMetadata().changes();
+ UpdateTableRequest commitReq =
+ UpdateTableRequest.create(ident, UpdateRequirements.forCreateTable(updates), updates);
+ commitReq.validate();
+
+ Map updateVars =
+ Map.of(
+ "namespace", RESTUtil.encodeNamespace(ns),
+ "table", RESTUtil.encodeString(tableName));
+ LoadTableResponse committed =
+ adapter.handle(session, Route.UPDATE_TABLE, updateVars, commitReq, LoadTableResponse.class);
+ assertThat(committed.tableMetadata()).isNotNull();
+ assertThat(catalog.tableExists(ident)).isTrue();
+ assertThat(catalog.loadTable(ident).schema().sameSchema(SCHEMA)).isTrue();
+ }
+
+ @Test
+ public void stagedCreateCommitDuplicateFails() {
+ String nsName = "ns_" + shortId();
+ Namespace ns = Namespace.of(nsName);
+ ((SupportsNamespaces) catalog).createNamespace(ns);
+ String tableName = "t_" + shortId();
+ TableIdentifier ident = TableIdentifier.of(ns, tableName);
+
+ CreateTableRequest stageReq =
+ CreateTableRequest.builder().withName(tableName).withSchema(SCHEMA).stageCreate().build();
+ stageReq.validate();
+
+ Map createVars = Map.of("namespace", RESTUtil.encodeNamespace(ns));
+ LoadTableResponse staged =
+ adapter.handle(session, Route.CREATE_TABLE, createVars, stageReq, LoadTableResponse.class);
+
+ List updates = staged.tableMetadata().changes();
+ UpdateTableRequest commitReq =
+ UpdateTableRequest.create(ident, UpdateRequirements.forCreateTable(updates), updates);
+ commitReq.validate();
+
+ Map updateVars =
+ Map.of(
+ "namespace", RESTUtil.encodeNamespace(ns),
+ "table", RESTUtil.encodeString(tableName));
+
+ adapter.handle(session, Route.UPDATE_TABLE, updateVars, commitReq, LoadTableResponse.class);
+ assertThat(catalog.tableExists(ident)).isTrue();
+
+ assertThatThrownBy(
+ () ->
+ adapter.handle(
+ session, Route.UPDATE_TABLE, updateVars, commitReq, LoadTableResponse.class))
+ .isInstanceOfAny(AlreadyExistsException.class, CommitFailedException.class);
+ }
+
+ @Test
+ public void routeFromMatchesStagedCreatePaths() {
+ assertThat(Route.from(HTTPRequest.HTTPMethod.POST, "v1/namespaces/ns1/tables").first())
+ .isEqualTo(Route.CREATE_TABLE);
+ assertThat(Route.from(HTTPRequest.HTTPMethod.POST, "v1/namespaces/ns1/tables/t1").first())
+ .isEqualTo(Route.UPDATE_TABLE);
+ }
+}
diff --git a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl
index de9184a2..d82a93e6 100644
--- a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl
+++ b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/run.sh.tmpl
@@ -30,6 +30,10 @@ echo "OK list-namespaces listed ${NAMESPACE_NAME}"
{{ICE_CLI}} --config {{CLI_CONFIG}} describe
echo "OK Listed namespaces"
+# Explicit create-table (staged create + commit); regression for RESTCatalogAdapter UPDATE_TABLE path
+{{ICE_CLI}} --config {{CLI_CONFIG}} create-table ${TABLE_CREATE_VIA_CLI} --schema-from-parquet "file://${INPUT_PATH}"
+echo "OK create-table ${TABLE_CREATE_VIA_CLI}"
+
# Insert from file (like README: ice insert flowers.iris -p file://...)
{{ICE_CLI}} --config {{CLI_CONFIG}} insert ${TABLE_IRIS} -p "file://${INPUT_PATH}"
echo "Inserted from file into ${TABLE_IRIS}"
@@ -213,6 +217,7 @@ echo "OK list-tables listed tables in ${NAMESPACE_NAME}"
{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_IRIS}
{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_PARTITIONED}
{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_SORTED}
+{{ICE_CLI}} --config {{CLI_CONFIG}} delete-table ${TABLE_CREATE_VIA_CLI}
echo "OK Deleted tables"
# Delete the namespace via CLI
diff --git a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml
index e1f9ca56..6d410aea 100644
--- a/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml
+++ b/ice-rest-catalog/src/test/resources/scenarios/basic-operations/scenario.yaml
@@ -10,6 +10,7 @@ env:
NAMESPACE_NAME: "test_ns"
INPUT_FILE: "input.parquet"
TABLE_IRIS: "test_ns.iris"
+ TABLE_CREATE_VIA_CLI: "test_ns.iris_create_cli"
TABLE_PARTITIONED: "test_ns.taxis_p_by_day"
TABLE_SORTED: "test_ns.taxis_s_by_day"
TABLE_NO_COPY: "test_ns.iris_no_copy"
diff --git a/ice/README.md b/ice/README.md
index 69687120..c0410008 100644
--- a/ice/README.md
+++ b/ice/README.md
@@ -13,6 +13,7 @@ A CLI for loading data into Iceberg REST catalogs.
- [Delete Partition](#delete-partition)
- [Insert Without Copy](#insert-without-copy)
- [Multiple Files](#multiple-files)
+ - [Parallel inserts and commit retries](#parallel-inserts-and-commit-retries)
- [Namespace Management](#namespace-management)
- [Inspect](#inspect)
- [S3 with Public Data](#s3-with-public-data)
@@ -135,6 +136,24 @@ cat filelist | ice insert flowers.iris -p -
where `filelist` contains one file path per line. If any file fails, the entire transaction is rolled back.
+### Parallel inserts and commit retries
+
+Several concurrent `ice insert` processes (or high catalog contention) can hit optimistic concurrency: one commit wins and others see `CommitFailedException` with a message like `Requirement failed: branch main has changed`. That means the table moved forward while this client was committing; it is not a data corruption signal.
+
+By default, `ice insert` performs **outer** commit retries: it reloads table metadata, re-appends the staged data files, and tries again (default **10** rounds within **300000** ms / 5 minutes). Tune with:
+
+```shell
+# disable outer retries (previous behavior)
+ice insert ns.table file://a.parquet --commit-retries=0
+
+# more headroom under heavy parallel load
+ice insert ns.table file://a.parquet --commit-retries=20 --commit-retry-total-ms=600000
+```
+
+Note: `--commit-retry-total-ms` only applies when `--commit-retries` is greater than zero.
+
+If all retries are exhausted, `ice` logs each **orphaned** data file path (uploaded but not registered in the table). You can delete those objects from object storage if you do not plan to retry.
+
### Namespace Management
```shell
diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java
index 7d8753fd..7f79483b 100644
--- a/ice/src/main/java/com/altinity/ice/cli/Main.java
+++ b/ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -516,6 +516,18 @@ void insert(
description = "Number of threads to use for inserting data",
defaultValue = "-1")
int threadCount,
+ @CommandLine.Option(
+ names = {"--commit-retries"},
+ description =
+ "Outer retry rounds after CommitFailedException (reload metadata and re-append;"
+ + " set to 0 to disable)",
+ defaultValue = "10")
+ int commitRetries,
+ @CommandLine.Option(
+ names = {"--commit-retry-total-ms"},
+ description = "Total wall-clock budget (ms) for outer commit retries",
+ defaultValue = "300000")
+ long commitRetryTotalMs,
@CommandLine.Option(
names = {"--compression"},
description =
@@ -617,6 +629,8 @@ void insert(
.sortOrderList(sortOrders)
.threadCount(
threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount)
+ .commitRetries(commitRetries)
+ .commitRetryTotalMs(commitRetryTotalMs)
.compression(compression)
.build();
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 858cf042..af7ddc33 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -36,6 +36,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -66,6 +67,8 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.CloseableIterable;
@@ -125,6 +128,16 @@ public static Result run(
}
}
+ if (options.commitRetries() < 0 || options.commitRetryTotalMs() < 0) {
+ throw new IllegalArgumentException(
+ "--commit-retries and --commit-retry-total-ms must be non-negative");
+ }
+ if (options.commitRetries() == 0 && options.commitRetryTotalMs() > 0) {
+ logger.warn(
+ "--commit-retry-total-ms has no effect when --commit-retries is 0; "
+ + "set --commit-retries > 0 to enable outer commit retries");
+ }
+
Table table = catalog.loadTable(nsTable);
// Create transaction and pass it to updatePartitionAndSortOrderMetadata
@@ -183,6 +196,7 @@ public static Result run(
// appendOp to use the same transaction.
AppendFiles appendOp = txn.newAppend();
+ List stagedFiles = new ArrayList<>();
try (FileIO inputIO = Input.newIO(filesExpanded.getFirst(), table, s3ClientLazy);
RetryLog retryLog =
@@ -241,6 +255,7 @@ public static Result run(
for (DataFile df : dataFiles) {
atLeastOneFileAppended = true;
appendOp.appendFile(df); // Only main thread appends now
+ stagedFiles.add(df);
}
} catch (ExecutionException e) {
failed++;
@@ -257,17 +272,34 @@ public static Result run(
if (!options.noCommit()) {
// TODO: log
if (atLeastOneFileAppended) {
- appendOp.commit();
+ try {
+ appendOp.commit();
+ txn.commitTransaction();
+ verifyCommitOrThrow(catalog, nsTable, stagedFiles);
+ } catch (CommitFailedException e) {
+ logger.error("CommitFailedException");
+ commitWithRetryAfterInitialFailure(catalog, nsTable, stagedFiles, options, e);
+ } catch (CommitStateUnknownException e) {
+ logCommitOrphans(stagedFiles);
+ throw new IOException(
+ "DATA LOSS RISK: commit state unknown; "
+ + stagedFiles.size()
+ + " data file(s) uploaded; manual reconciliation required",
+ e);
+ } catch (RuntimeException e) {
+ logCommitOrphans(stagedFiles);
+ throw new IOException(
+ "DATA LOSS: unexpected error during commit; "
+ + stagedFiles.size()
+ + " data file(s) may NOT be registered",
+ e);
+ }
+ if (retryLog != null) {
+ retryLog.commit();
+ }
} else {
logger.warn("Table commit skipped (no files to append)");
}
- if (retryLog != null) {
- retryLog.commit();
- }
- if (atLeastOneFileAppended) {
- // Commit transaction.
- txn.commitTransaction();
- }
} else {
logger.warn("Table commit skipped (--no-commit)");
}
@@ -840,6 +872,151 @@ public String get(PartitionSpec spec, StructLike partitionData, String file) {
}
}
+ private static void sleepCommitRetryBackoff(int round) throws InterruptedException {
+ int shift = Math.min(round - 1, 20);
+ long capMs = Math.min(100L << shift, 30_000L);
+ long sleepMs = ThreadLocalRandom.current().nextLong(0, capMs + 1);
+ Thread.sleep(sleepMs);
+ }
+
+ private static void logCommitOrphans(List stagedFiles) {
+ for (DataFile df : stagedFiles) {
+ logger.error(
+ "DATA LOSS: orphaned data file after exhausted commit retries: {}", df.location());
+ }
+ }
+
+ /**
+ * Ensures every staged data file appears in the table after commit (detects server-side no-op
+ * commits where HTTP succeeds but metadata did not change).
+ */
+ private static void verifyCommitOrThrow(
+ RESTCatalog catalog, TableIdentifier tableId, List stagedFiles) throws IOException {
+ if (stagedFiles.isEmpty()) {
+ return;
+ }
+ Set committed = new HashSet<>();
+ Table fresh = catalog.loadTable(tableId);
+ try (var plan = fresh.newScan().planFiles()) {
+ for (var task : plan) {
+ committed.add(task.file().location());
+ }
+ }
+ List missing =
+ stagedFiles.stream()
+ .filter(df -> !committed.contains(df.location()))
+ .collect(Collectors.toList());
+ if (!missing.isEmpty()) {
+ for (DataFile df : missing) {
+ logger.error(
+ "DATA LOSS: post-commit verification failed; staged file not present in table: {}",
+ df.location());
+ }
+ throw new IOException(
+ "DATA LOSS: "
+ + missing.size()
+ + " of "
+ + stagedFiles.size()
+ + " staged file(s) NOT found in table after commit; "
+ + "client believed commit succeeded but server may have early-returned (empty metadata changes)");
+ }
+ }
+
+ /**
+ * Retries transaction commit after {@link CommitFailedException} by reloading table metadata and
+ * re-appending staged {@link DataFile}s (fixes stale snapshot requirements under contention).
+ */
+ private static void commitWithRetryAfterInitialFailure(
+ RESTCatalog catalog,
+ TableIdentifier tableId,
+ List stagedFiles,
+ Options options,
+ CommitFailedException initialFailure)
+ throws IOException, InterruptedException {
+
+ logger.warn(
+ "Outer commit retry engaged for {} after initial CommitFailedException "
+ + "(commit-retries={}, commit-retry-total-ms={}, stagedFiles={}): {}",
+ tableId,
+ options.commitRetries(),
+ options.commitRetryTotalMs(),
+ stagedFiles.size(),
+ initialFailure.getMessage());
+
+ if (options.commitRetries() == 0) {
+ logCommitOrphans(stagedFiles);
+ throw new IOException(
+ "DATA LOSS: commit failed (--commit-retries is 0); data file(s) may not be registered",
+ initialFailure);
+ }
+
+ long deadlineNs =
+ System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(options.commitRetryTotalMs());
+ CommitFailedException last = initialFailure;
+
+ for (int round = 1; round <= options.commitRetries(); round++) {
+ if (System.nanoTime() > deadlineNs) {
+ logger.warn(
+ "Commit retry wall-clock budget ({} ms) exhausted before completing outer retry round "
+ + "{}/{}",
+ options.commitRetryTotalMs(),
+ round,
+ options.commitRetries());
+ break;
+ }
+
+ sleepCommitRetryBackoff(round);
+
+ if (System.nanoTime() > deadlineNs) {
+ break;
+ }
+
+ try {
+ Table fresh = catalog.loadTable(tableId);
+ Transaction freshTxn = fresh.newTransaction();
+ AppendFiles freshAppend = freshTxn.newAppend();
+ for (DataFile df : stagedFiles) {
+ freshAppend.appendFile(df);
+ }
+ freshAppend.commit();
+ freshTxn.commitTransaction();
+ verifyCommitOrThrow(catalog, tableId, stagedFiles);
+ return;
+ } catch (CommitFailedException retryEx) {
+ last = retryEx;
+ logger.warn(
+ "Commit retry {}/{} failed: {}", round, options.commitRetries(), retryEx.getMessage());
+ } catch (IOException verifyEx) {
+ logCommitOrphans(stagedFiles);
+ throw new IOException(
+ String.format(
+ "DATA LOSS: post-commit verification failed during outer retry round %d/%d; "
+ + "halting retries to avoid duplicate appends. Manual reconciliation required.",
+ round, options.commitRetries()),
+ verifyEx);
+ } catch (CommitStateUnknownException e) {
+ logCommitOrphans(stagedFiles);
+ throw new IOException(
+ String.format(
+ "DATA LOSS RISK: commit state unknown during outer retry round %d/%d; manual reconciliation required",
+ round, options.commitRetries()),
+ e);
+ } catch (RuntimeException e) {
+ last = new CommitFailedException(e, "Commit retry failed unexpectedly");
+ logger.warn("Commit retry {}/{} failed unexpectedly", round, options.commitRetries(), e);
+ }
+ }
+
+ logCommitOrphans(stagedFiles);
+ throw new IOException(
+ "DATA LOSS: commit failed after "
+ + options.commitRetries()
+ + " outer commit retries. "
+ + stagedFiles.size()
+ + " data file(s) uploaded but NOT registered in the table.",
+ last);
+ }
+
public record Options(
DataFileNamingStrategy.Name dataFileNamingStrategy,
boolean skipDuplicates,
@@ -857,7 +1034,9 @@ public record Options(
@Nullable List partitionList,
@Nullable List sortOrderList,
int threadCount,
- @Nullable String compression) {
+ @Nullable String compression,
+ int commitRetries,
+ long commitRetryTotalMs) {
public static Builder builder() {
return new Builder();
@@ -881,6 +1060,8 @@ public static final class Builder {
private List sortOrderList = List.of();
private int threadCount = Runtime.getRuntime().availableProcessors();
private String compression;
+ private int commitRetries = 10;
+ private long commitRetryTotalMs = 300_000L;
private Builder() {}
@@ -969,6 +1150,16 @@ public Builder compression(String compression) {
return this;
}
+ public Builder commitRetries(int commitRetries) {
+ this.commitRetries = commitRetries;
+ return this;
+ }
+
+ public Builder commitRetryTotalMs(long commitRetryTotalMs) {
+ this.commitRetryTotalMs = commitRetryTotalMs;
+ return this;
+ }
+
public Options build() {
return new Options(
dataFileNamingStrategy,
@@ -987,7 +1178,9 @@ public Options build() {
partitionList,
sortOrderList,
threadCount,
- compression);
+ compression,
+ commitRetries,
+ commitRetryTotalMs);
}
}
}
diff --git a/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java b/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java
index 4ca5b19f..5122ce1b 100644
--- a/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java
+++ b/ice/src/main/java/com/altinity/ice/internal/logback/ColorAwarePatternLayout.java
@@ -21,6 +21,10 @@
public class ColorAwarePatternLayout extends PatternLayout {
static {
+ // Logback 1.5.x resolves patterns from DEFAULT_CONVERTER_SUPPLIER_MAP; %processId is not
+ // built into logback-classic 1.5.18, so we register our own.
+ DEFAULT_CONVERTER_SUPPLIER_MAP.put("processId", ProcessIdClassicConverter::new);
+ DEFAULT_CONVERTER_MAP.put("processId", ProcessIdClassicConverter.class.getName());
if (!CommandLine.Help.Ansi.AUTO.enabled()) { // Usage of Picocli heuristic
DEFAULT_CONVERTER_MAP.put("black", NoColorConverter.class.getName());
DEFAULT_CONVERTER_MAP.put("red", NoColorConverter.class.getName());
diff --git a/ice/src/main/java/com/altinity/ice/internal/logback/ProcessIdClassicConverter.java b/ice/src/main/java/com/altinity/ice/internal/logback/ProcessIdClassicConverter.java
new file mode 100644
index 00000000..81ce3027
--- /dev/null
+++ b/ice/src/main/java/com/altinity/ice/internal/logback/ProcessIdClassicConverter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.internal.logback;
+
+import ch.qos.logback.classic.pattern.ClassicConverter;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+
+/**
+ * Emits the JVM process id for use in log patterns as {@code %processId}. Logback does not ship
+ * this conversion word in {@code logback-classic} 1.5.x; we register it from {@link
+ * ColorAwarePatternLayout}.
+ */
+public class ProcessIdClassicConverter extends ClassicConverter {
+
+ private static final String PID = String.valueOf(ProcessHandle.current().pid());
+
+ @Override
+ public String convert(ILoggingEvent event) {
+ return PID;
+ }
+}
diff --git a/ice/src/main/resources/logback.xml b/ice/src/main/resources/logback.xml
index d731bd8f..27fc8463 100644
--- a/ice/src/main/resources/logback.xml
+++ b/ice/src/main/resources/logback.xml
@@ -33,7 +33,7 @@
- %gray(%d{yyyy-MM-dd HH:mm:ss} [%.11thread]) %highlight(%-4level) %gray(%logger{27} >) %X{msgContext}%msg%n%replace(%ex{full,
+ %gray(%d{yyyy-MM-dd HH:mm:ss} [%.11thread/%processId]) %highlight(%-4level) %gray(%logger{27} >) %X{msgContext}%msg%n%replace(%ex{full,
org.eclipse.jetty,
jakarta.servlet,
software.amazon.awssdk.core.internal,