From 54db744f03b4ec9c41d18beeb8e96feccf1ad4ee Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Tue, 23 Jun 2026 17:18:36 +0530 Subject: [PATCH 1/7] HIVE-29679: Update Tez AM K8s Operator Auto-Scaling to scale down idle AMs --- .../tezplugins/LlapTaskSchedulerService.java | 9 + .../metrics/LlapTaskSchedulerInfo.java | 3 +- .../metrics/LlapTaskSchedulerMetrics.java | 8 + .../hive-operator/templates/clusterrole.yaml | 4 + packaging/src/kubernetes/pom.xml | 5 + .../autoscaling/HiveClusterAutoscaler.java | 74 ++++++- .../autoscaling/TezAmBusyMetrics.java | 51 +++++ .../autoscaling/TezAmZkDeregistrar.java | 183 ++++++++++++++++++ .../dependent/HiveDependentResource.java | 9 +- .../dependent/LlapResourceBuilder.java | 138 +++++++++++-- .../reconciler/HiveClusterReconciler.java | 53 +++-- 11 files changed, 494 insertions(+), 43 deletions(-) create mode 100644 packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java create mode 100644 packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index 9ac108728fb7..c75800c5546a 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -1080,6 +1080,9 @@ public void dagComplete() { writeLock.lock(); try { dagRunning = false; + if (metrics != null) { + metrics.setDagRunning(false); + } dagStats = new StatsPerDag(); int pendingCount = 0; for (Entry> entry : pendingTasks.entrySet()) { @@ -1173,6 +1176,9 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin metrics.setDagId(id.getDAGID().toString()); } dagRunning = true; + if (metrics != null) { + metrics.setDagRunning(true); + } } 
dagStats.registerTaskRequest(hosts, racks); addPendingTask(taskInfo); @@ -1194,6 +1200,9 @@ public void allocateTask(Object task, Resource capability, ContainerId container metrics.setDagId(id.getDAGID().toString()); } dagRunning = true; + if (metrics != null) { + metrics.setDagRunning(true); + } } dagStats.registerTaskRequest(null, null); addPendingTask(taskInfo); diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java index 0750dc6aa7ba..1ae0d4ec6e55 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java @@ -38,7 +38,8 @@ public enum LlapTaskSchedulerInfo implements MetricsInfo { SchedulerRunningTaskCount("Total number of running tasks"), SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"), SchedulerPreemptedTaskCount("Total number of tasks pre-empted"), - SchedulerCompletedDagCount("Number of DAGs completed"); + SchedulerCompletedDagCount("Number of DAGs completed"), + SchedulerDagRunning("Binary that represents if the AM is idle or running a DAG"); private final String desc; diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java index 0dd645da82cc..f5c684b7ab3d 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount; import static 
org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount; import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount; +import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDagRunning; import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount; import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount; import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount; @@ -84,6 +85,8 @@ public class LlapTaskSchedulerMetrics implements MetricsSource { @Metric MutableCounterInt completedDagcount; @Metric + MutableGaugeInt dagRunning; + @Metric MutableCounterInt pendingPreemptionTasksCount; @Metric MutableGaugeInt wmUnusedGuaranteedCount; @@ -276,6 +279,7 @@ private void getTaskSchedulerStats(MetricsRecordBuilder rb) { .addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value()) .addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value()) .addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value()) + .addGauge(SchedulerDagRunning, dagRunning.value()) .addCounter(SchedulerPendingTaskCount, pendingTasksCount.value()) .addCounter(SchedulerSchedulableTaskCount, schedulableTasksCount.value()) .addCounter(SchedulerRunningTaskCount, runningTasksCount.value()) @@ -285,6 +289,10 @@ private void getTaskSchedulerStats(MetricsRecordBuilder rb) { .addCounter(SchedulerCompletedDagCount, completedDagcount.value()); } + public void setDagRunning(boolean running) { + dagRunning.set(running ? 
1 : 0); + } + public JvmMetrics getJvmMetrics() { return jvmMetrics; } diff --git a/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml b/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml index 3b0eb0e8e40f..3316ce110546 100644 --- a/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml +++ b/packaging/src/kubernetes/helm/hive-operator/templates/clusterrole.yaml @@ -58,3 +58,7 @@ rules: - apiGroups: ["policy"] resources: ["poddisruptionbudgets"] verbs: ["get", "list", "watch", "create", "update", "patch", "delete"] + # EndpointSlices: operator manages a custom per-pod-hostname slice for TezAM DNS + - apiGroups: ["discovery.k8s.io"] + resources: ["endpointslices"] + verbs: ["get", "list", "create", "patch", "delete"] diff --git a/packaging/src/kubernetes/pom.xml b/packaging/src/kubernetes/pom.xml index f9e7bd046de8..bc2a2a60b37d 100644 --- a/packaging/src/kubernetes/pom.xml +++ b/packaging/src/kubernetes/pom.xml @@ -73,6 +73,11 @@ ${fabric8.version} provided + + org.apache.curator + curator-framework + ${curator.version} + org.slf4j slf4j-api diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java index df46ced674c6..10c38d5de6dc 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.ToIntFunction; +import java.util.stream.Collectors; import io.fabric8.kubernetes.client.KubernetesClient; import org.apache.hive.kubernetes.operator.model.HiveCluster; @@ -178,8 +180,9 @@ public AutoscalingEvaluation evaluate(HiveCluster 
cluster, KubernetesClient clie } else { // Pod deletion cost only applies to Deployments (ReplicaSet controller). // StatefulSets always scale down by highest ordinal regardless of this - // annotation. LLAP/TezAM graceful drain is handled by preStop hooks. - updateDeploymentPodDeletionCost(client, namespace, hs2Metrics, "hs2_open_sessions"); + // annotation. LLAP graceful drain is handled by preStop hooks. + updateDeploymentPodDeletionCost(client, namespace, hs2Metrics, + pm -> pm.metrics().getOrDefault("hs2_open_sessions", 0.0).intValue()); Map hs2Patches = new HashMap<>(); evaluateComponent(cluster, client, namespace, clusterName, @@ -251,9 +254,54 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster, KubernetesClient clie tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds()); String tezKey = cacheKey(namespace, clusterName, tezAmComponentKey); List tezMetrics = metricsCache.getOrEmpty(tezKey, tezAuto.metricsScrapeIntervalSeconds() * 3); + + updateDeploymentPodDeletionCost(client, namespace, tezMetrics, + pm -> TezAmBusyMetrics.deletionCost(pm.metrics())); + + Map tezPatches = new HashMap<>(); evaluateComponent(cluster, client, namespace, clusterName, - tezAmComponentKey, tezAuto, - perLlapTezAm.replicas(), patches, statuses, tezMetrics); + tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), tezPatches, statuses, tezMetrics); + + Integer tezPatch = tezPatches.get(tezAmComponentKey); + int currentTezReplicas = getCurrentReplicas(client, namespace, clusterName, tezAmComponentKey); + if (tezPatch != null && tezPatch < currentTezReplicas) { + // Scale-down: effective target = max(desired, busyCount). + // Idle pods are removed first as per pod-deletion-cost + PendingScaleDown pending = pendingScaleDowns.get(tezKey); + if (pending != null) { + if (Duration.between(pending.annotatedAt(), Instant.now()).toSeconds() >= 2) { + // Deregister idle AMs from ZK before applying the scale patch. 
+ // All HS2 instances (active-active HA) see CHILD_REMOVED and stop routing to + // these AMs — no new DAGs can arrive on pods that are about to be terminated. + List idlePodNames = tezMetrics.stream() + .filter(pm -> !TezAmBusyMetrics.hasActiveDag(pm.metrics())) + .map(PodMetrics::podName) + .collect(Collectors.toList()); + TezAmZkDeregistrar.deregisterIdlePods( + cluster.getSpec().zookeeper().quorum(), llapSpec.name(), idlePodNames, + cluster.getSpec().hiveServer2().configOverrides()); + patches.put(tezAmComponentKey, pending.targetReplicas()); + MANAGED_REPLICAS.put(tezKey, pending.targetReplicas()); + lastScaleTimes.put(tezKey, Instant.now().toString()); + pendingScaleDowns.remove(tezKey); + LOG.info("[{}] Applying deferred scale-down to {} replicas", tezAmComponentKey, + pending.targetReplicas()); + } + } else { + int busyCount = countBusyPods(tezMetrics); + int effectivePatch = Math.max(tezPatch, busyCount); + if (effectivePatch != tezPatch) { + LOG.info("[{}] Scale-down target adjusted to {} (desired={}, busy AMs={})", + tezAmComponentKey, effectivePatch, tezPatch, busyCount); + } + pendingScaleDowns.put(tezKey, new PendingScaleDown(effectivePatch, Instant.now())); + LOG.info("[{}] Deferring scale-down to {} (waiting for deletion-cost propagation)", + tezAmComponentKey, effectivePatch); + } + } else if (tezPatch != null) { + patches.put(tezAmComponentKey, tezPatch); + MANAGED_REPLICAS.put(tezKey, tezPatch); + } } } @@ -362,8 +410,7 @@ private int getCurrentReplicas(KubernetesClient client, String namespace, } else { workloadName = clusterName + "-" + component; } - if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-") - || component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) { + if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) { var ss = client.apps().statefulSets() .inNamespace(namespace).withName(workloadName).get(); return ss != null && ss.getSpec().getReplicas() != null ? 
ss.getSpec().getReplicas() : 0; @@ -375,17 +422,24 @@ private int getCurrentReplicas(KubernetesClient client, String namespace, } } + /** Counts TezAM pods with active DAG work */ + private int countBusyPods(List tezMetrics) { + return (int) tezMetrics.stream() + .filter(pm -> TezAmBusyMetrics.hasActiveDag(pm.metrics())) + .count(); + } + /** * Patches each pod's deletion cost annotation based on its active session count. * Kubernetes uses this during scale-down to kill idle pods first (lower cost = killed first). *

- * Only meaningful for Deployments (HS2, Metastore) — the ReplicaSet controller + * Only meaningful for Deployments (HS2, Metastore, TezAM) — the ReplicaSet controller * respects this annotation. StatefulSets ignore it and always terminate by ordinal. */ private void updateDeploymentPodDeletionCost(KubernetesClient client, String namespace, - List metrics, String metricName) { + List metrics, ToIntFunction costFunction) { for (PodMetrics pm : metrics) { - int sessions = pm.metrics().getOrDefault(metricName, 0.0).intValue(); + int cost = costFunction.applyAsInt(pm); try { client.pods().inNamespace(namespace).withName(pm.podName()) .edit(pod -> { @@ -393,7 +447,7 @@ private void updateDeploymentPodDeletionCost(KubernetesClient client, String nam pod.getMetadata().setAnnotations(new java.util.HashMap<>()); } pod.getMetadata().getAnnotations() - .put("controller.kubernetes.io/pod-deletion-cost", String.valueOf(sessions)); + .put("controller.kubernetes.io/pod-deletion-cost", String.valueOf(cost)); return pod; }); } catch (Exception e) { diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java new file mode 100644 index 000000000000..40f922b248c8 --- /dev/null +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.kubernetes.operator.autoscaling; + +import java.util.Map; + +/** + * Interprets TezAM JMX Exporter metrics to determine whether an AM is busy executing + * a DAG or idle and safe to remove during scale-down. + *

+ * Signal: {@code tez_am_dag_running} (SchedulerDagRunning gauge in LlapTaskSchedulerMetrics). + * Set to 1 when the AM receives its first task for a new DAG, cleared to 0 in dagComplete(). + */ +public final class TezAmBusyMetrics { + + public static final String METRIC_DAG_RUNNING = "tez_am_dag_running"; + + private TezAmBusyMetrics() {} + + /** + * Returns true when the TezAM has active DAG work (running or pending tasks). + */ + public static boolean hasActiveDag(Map metrics) { + return metrics.getOrDefault(METRIC_DAG_RUNNING, 0.0) > 0; + } + + /** + * Returns the pod-deletion-cost for this AM: + * 0 if the AM is idle + * 1 if the AM is running a DAG + */ + public static int deletionCost(Map metrics) { + return hasActiveDag(metrics) ? 1 : 0; + } +} diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java new file mode 100644 index 000000000000..2d1b78df16e5 --- /dev/null +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.kubernetes.operator.autoscaling; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.hive.kubernetes.operator.util.ConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Deletes idle TezAM ZooKeeper registration nodes before a scale-down is applied. + *

+ * Each TezAM registers at {@code /tez-external-sessions//}. + * When we delete these nodes, every HS2 instance (including all replicas in an + * active-active HA setup) sees a {@code CHILD_REMOVED} ZK event and immediately + * stops routing new sessions to those AMs. This prevents new DAGs from arriving + * on a pod that is about to be terminated. + *

+ * Only the registration node is deleted — the ephemeral claim node + * ({@code /tez-external-sessions/-claims/}) is not touched; + * it belongs to the HS2 ZK session and disappears naturally when HS2 releases it. + *

+ * ZK connection parameters are read from the cluster's HS2 configOverrides using the + * same keys that {@code ZookeeperExternalSessionsRegistryClient} reads from HiveConf: + *

<ul>
+ *   <li>{@code hive.zookeeper.connection.timeout} (default: 15s → 15000 ms)</li>
+ *   <li>{@code hive.zookeeper.session.timeout} (default: 120000ms)</li>
+ *   <li>{@code hive.zookeeper.connection.basesleeptime} (default: 1000ms)</li>
+ *   <li>{@code hive.zookeeper.connection.max.retries} (default: 3)</li>
+ * </ul>
+ */ +public final class TezAmZkDeregistrar { + + private static final Logger LOG = LoggerFactory.getLogger(TezAmZkDeregistrar.class); + + private TezAmZkDeregistrar() {} + + /** + * Deletes ZK registration nodes for the given idle TezAM pods. + * Failures are logged as warnings and do not block the scale-down — the + * preStop drain on the pod provides a safety net. + * + * @param zkQuorum ZooKeeper connection string + * @param llapName LLAP cluster name (e.g. "llap0"), used as the ZK namespace + * @param idlePodNames pod names that are idle and about to be removed + * @param hiveSiteConfig HS2 configOverrides map — used to read ZK connection settings + * using the same keys as {@code ZookeeperExternalSessionsRegistryClient} + */ + public static void deregisterIdlePods(String zkQuorum, String llapName, + List idlePodNames, Map hiveSiteConfig) { + if (idlePodNames.isEmpty()) { + return; + } + int connTimeoutMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.connection.timeout", 15000); + int sessionTimeoutMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.session.timeout", 120000); + int baseSleepMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.connection.basesleeptime", 1000); + int maxRetries = getInt(hiveSiteConfig, "hive.zookeeper.connection.max.retries", 3); + + String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX + "/" + llapName; + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(zkQuorum) + .connectionTimeoutMs(connTimeoutMs) + .sessionTimeoutMs(sessionTimeoutMs) + .retryPolicy(new ExponentialBackoffRetry(baseSleepMs, maxRetries)) + .build(); + try { + client.start(); + if (!client.blockUntilConnected(connTimeoutMs, TimeUnit.MILLISECONDS)) { + LOG.warn("[tezam-{}] ZK connect timeout — skipping deregistration", llapName); + return; + } + if (client.checkExists().forPath(registryPath) == null) { + return; + } + for (String appId : client.getChildren().forPath(registryPath)) { + String nodePath = registryPath + "/" + appId; + 
byte[] data = client.getData().forPath(nodePath); + if (data == null || data.length == 0) { + continue; + } + String hostName = extractHostName(new String(data, StandardCharsets.UTF_8)); + if (hostName == null) { + continue; + } + // hostName = "...svc.cluster.local" + String podName = hostName.contains(".") ? hostName.substring(0, hostName.indexOf('.')) : hostName; + if (idlePodNames.contains(podName)) { + try { + client.delete().forPath(nodePath); + LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will stop routing to it", + llapName, podName, appId); + } catch (Exception e) { + LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName, podName, e.getMessage()); + } + } + } + } catch (Exception e) { + LOG.warn("[tezam-{}] ZK deregistration error: {}", llapName, e.getMessage()); + } finally { + client.close(); + } + } + + /** + * Reads a time value (in milliseconds) from the config map. + * If the key is absent or un-parseable the defaultMs value is returned. 
+ */ + static int getTimeMs(Map config, String key, int defaultMs) { + if (config == null) { + return defaultMs; + } + String val = config.get(key); + if (val == null) { + return defaultMs; + } + val = val.trim(); + try { + if (val.endsWith("ms")) { + return Integer.parseInt(val.substring(0, val.length() - 2).trim()); + } + if (val.endsWith("s")) { + return (int) (Double.parseDouble(val.substring(0, val.length() - 1).trim()) * 1_000); + } + if (val.endsWith("m")) { + return (int) (Double.parseDouble(val.substring(0, val.length() - 1).trim()) * 60_000); + } + return Integer.parseInt(val); + } catch (NumberFormatException e) { + LOG.debug("Unparseable ZK config '{}' = '{}', using default {}ms", key, val, defaultMs); + return defaultMs; + } + } + + static int getInt(Map config, String key, int defaultVal) { + if (config == null) { + return defaultVal; + } + String val = config.get(key); + if (val == null) { + return defaultVal; + } + try { + return Integer.parseInt(val.trim()); + } catch (NumberFormatException e) { + LOG.debug("Unparseable ZK config '{}' = '{}', using default {}", key, val, defaultVal); + return defaultVal; + } + } + + static String extractHostName(String json) { + String marker = "\"hostName\":\""; + int start = json.indexOf(marker); + if (start < 0) { + return null; + } + start += marker.length(); + int end = json.indexOf('"', start); + return end > start ? 
json.substring(start, end) : null; + } +} diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java index 2315b455d760..4dfa73fb774a 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveDependentResource.java @@ -855,9 +855,12 @@ private static String buildJmxExporterConfig(String component) { sb.append(" type: GAUGE\n"); break; case ConfigUtils.COMPONENT_TEZAM: - // TezAM DAG execution metrics - sb.append("- pattern: 'Hadoop<>(.+)'\n"); - sb.append(" name: tez_am_$1\n"); + // LlapMetricsSystem registers beans under LlapTaskScheduler service + // SchedulerDagRunning tracks if the AM is running a dag or is idle as a binary + // so exported as GAUGE. + String schedulerBean = "Hadoop<>"; + sb.append("- pattern: '").append(schedulerBean).append("SchedulerDagRunning'\n"); + sb.append(" name: tez_am_dag_running\n"); sb.append(" type: GAUGE\n"); break; default: diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java index 504901197168..933b66f171a8 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java @@ -19,6 +19,7 @@ package org.apache.hive.kubernetes.operator.dependent; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,16 +32,24 @@ import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder; import 
io.fabric8.kubernetes.api.model.IntOrString; +import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.Probe; import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.ServiceBuilder; import io.fabric8.kubernetes.api.model.Volume; import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder; +import io.fabric8.kubernetes.api.model.discovery.v1.Endpoint; +import io.fabric8.kubernetes.api.model.discovery.v1.EndpointBuilder; +import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSlice; +import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSliceBuilder; import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget; import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder; import io.javaoperatorsdk.operator.api.reconciler.Context; +import org.apache.hive.kubernetes.operator.autoscaling.TezAmBusyMetrics; import org.apache.hive.kubernetes.operator.model.HiveCluster; import org.apache.hive.kubernetes.operator.model.HiveClusterSpec; import org.apache.hive.kubernetes.operator.model.spec.AutoscalingSpec; @@ -193,7 +202,7 @@ public static PodDisruptionBudget buildPdb(HiveCluster hc, LlapSpec llap) { // --- TezAM resource builders (one TezAM per LLAP cluster) --- - /** TezAM StatefulSet name for a specific LLAP cluster. */ + /** TezAM Deployment/Service name for a specific LLAP cluster. */ public static String tezAmResourceName(HiveCluster hc, LlapSpec llap) { return hc.getMetadata().getName() + TEZAM_INFIX + llap.name(); } @@ -229,9 +238,81 @@ public static PodDisruptionBudget buildTezAmPdb(HiveCluster hc, LlapSpec llap) { .build(); } - /** Builds the TezAM StatefulSet for a specific LLAP cluster. 
*/ - public static StatefulSet buildTezAmStatefulSet(HiveCluster hc, LlapSpec llap, Integer replicas) { - return INSTANCE.doBuildTezAmStatefulSet(hc, llap, replicas); + /** Builds the TezAM Deployment for a specific LLAP cluster. */ + public static Deployment buildTezAmDeployment(HiveCluster hc, LlapSpec llap, Integer replicas) { + return INSTANCE.doBuildTezAmDeployment(hc, llap, replicas); + } + + /** + * Name for the operator-managed EndpointSlice that provides per-pod DNS for TezAM. + * CoreDNS creates {@code ...svc.cluster.local} A-records using it. + */ + public static String tezAmEndpointSliceName(HiveCluster hc, LlapSpec llap) { + return tezAmResourceName(hc, llap) + "-hostnames"; + } + + /** + * Builds a custom EndpointSlice for the TezAM headless Service. + *

+ * Kubernetes only creates per-pod DNS records ({@code ...svc.cluster.local}) + * when the Endpoints/EndpointSlice has a {@code hostname} field for each address. The + * default EndpointSlice controller omits {@code hostname} for Deployment pods (it only + * sets it automatically for StatefulSet pods). This operator-managed EndpointSlice fills + * that gap, giving every ready TezAM pod a resolvable FQDN. + * + * @param pods list of TezAM pods + */ + public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec llap, List pods) { + String ns = hc.getMetadata().getNamespace(); + String svcName = tezAmResourceName(hc, llap); + Map labels = new HashMap<>(Map.of( + "kubernetes.io/service-name", svcName, + "endpointslice.kubernetes.io/managed-by", "hive-kubernetes-operator", + Labels.MANAGED_BY, Labels.MANAGED_BY_VALUE, + Labels.APP_INSTANCE, hc.getMetadata().getName(), + Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM)); + + List endpoints = new ArrayList<>(); + for (var pod : pods) { + String ip = pod.getStatus() != null ? 
pod.getStatus().getPodIP() : null; + if (ip == null || ip.isEmpty()) { + continue; + } + boolean ready = isPodReady(pod); + endpoints.add(new EndpointBuilder() + .withHostname(pod.getMetadata().getName()) + .withAddresses(ip) + .withNewConditions() + .withReady(ready) + .withServing(ready) + .withTerminating(false) + .endConditions() + .withNewTargetRef() + .withKind("Pod") + .withNamespace(ns) + .withName(pod.getMetadata().getName()) + .endTargetRef() + .build()); + } + + return new EndpointSliceBuilder() + .withNewMetadata() + .withName(tezAmEndpointSliceName(hc, llap)) + .withNamespace(ns) + .withLabels(labels) + .withOwnerReferences(ownerRef(hc)) + .endMetadata() + .withAddressType("IPv4") + .withEndpoints(endpoints) + .build(); + } + + private static boolean isPodReady(io.fabric8.kubernetes.api.model.Pod pod) { + if (pod.getStatus() == null || pod.getStatus().getConditions() == null) { + return false; + } + return pod.getStatus().getConditions().stream() + .anyMatch(c -> "Ready".equals(c.getType()) && "True".equals(c.getStatus())); } /** Builds the headless Service for a TezAM cluster. */ @@ -261,6 +342,11 @@ public static ConfigMap buildTezAmConfigMap(HiveCluster hc, LlapSpec llap) { Map labels = Labels.forTezAmCluster(hc, llap.name()); Map tezSite = HiveConfigBuilder.getTezSite(spec, llap); + // Register the Hadoop Metrics2 JMX sink so LlapTaskSchedulerMetrics are exposed as + // JMX MBeans. The metrics system has no JMX sink and the JMX Exporter agent cannot + // see any Hadoop/LLAP metrics without this being present. 
+ String hadoopMetrics2 = "*.sink.jmx.class=org.apache.hadoop.metrics2.sink.JmxSink\n"; + return new ConfigMapBuilder() .withNewMetadata() .withName(tezAmConfigMapName(hc, llap)) @@ -269,16 +355,17 @@ public static ConfigMap buildTezAmConfigMap(HiveCluster hc, LlapSpec llap) { .withOwnerReferences(ownerRef(hc)) .endMetadata() .addToData("tez-site.xml", HadoopXmlBuilder.buildXml(tezSite)) + .addToData("hadoop-metrics2.properties", hadoopMetrics2) .build(); } // --- Private instance methods that use protected helpers from HiveDependentResource --- - private StatefulSet doBuildTezAmStatefulSet(HiveCluster hiveCluster, LlapSpec llap, + private Deployment doBuildTezAmDeployment(HiveCluster hiveCluster, LlapSpec llap, Integer replicas) { HiveClusterSpec spec = hiveCluster.getSpec(); String ns = hiveCluster.getMetadata().getNamespace(); - String ssName = tezAmResourceName(hiveCluster, llap); + String deployName = tezAmResourceName(hiveCluster, llap); Map allLabels = Labels.forTezAmCluster(hiveCluster, llap.name()); Map selectorLabels = Labels.selectorForTezAmCluster(hiveCluster, llap.name()); @@ -321,24 +408,28 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster hiveCluster, LlapSpec ll addExternalJars(spec.image(), spec.externalJars(), initContainers, volumeMounts, volumes, envVars); replaceConfMountWithSubPaths(volumeMounts, HIVE_CONFIG_VOLUME, - "hive-site.xml", "tez-site.xml", "core-site.xml"); + "hive-site.xml", "tez-site.xml", "core-site.xml", "hadoop-metrics2.properties"); + + AutoscalingSpec tezAutoscaling = llap.tezAm().autoscaling(); + if (tezAutoscaling.isEnabled()) { + addJmxExporter(spec.image(), ConfigUtils.COMPONENT_TEZAM, tezAutoscaling.metricsPort(), + initContainers, volumeMounts, volumes, envVars, ports); + } String configHash = sha256( HadoopXmlBuilder.buildXml(HiveConfigBuilder.getHiveServer2HiveSite(hiveCluster, spec)), HadoopXmlBuilder.buildXml(HiveConfigBuilder.getTezSite(spec, llap)), 
HadoopXmlBuilder.buildXml(HiveConfigBuilder.getHadoopCoreSite(spec))); - StatefulSet statefulSet = new StatefulSetBuilder() + Deployment deployment = new DeploymentBuilder() .withNewMetadata() - .withName(ssName) + .withName(deployName) .withNamespace(ns) .withLabels(allLabels) .withOwnerReferences(ownerRef(hiveCluster)) .endMetadata() .withNewSpec() .withReplicas(replicas) - .withPodManagementPolicy("Parallel") - .withServiceName(ssName) .withNewSelector() .withMatchLabels(selectorLabels) .endSelector() @@ -350,6 +441,7 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster hiveCluster, LlapSpec ll .addToAnnotations("hive.apache.org/config-hash", configHash) .endMetadata() .withNewSpec() + .withSubdomain(deployName) .withInitContainers(initContainers) .addNewContainer() .withName(ConfigUtils.COMPONENT_TEZAM) @@ -367,13 +459,31 @@ private StatefulSet doBuildTezAmStatefulSet(HiveCluster hiveCluster, LlapSpec ll .build(); applySpreadAffinityIfAbsent( - statefulSet.getSpec().getTemplate().getSpec(), selectorLabels); + deployment.getSpec().getTemplate().getSpec(), selectorLabels); - appendUserVolumes(statefulSet.getSpec().getTemplate().getSpec(), + appendUserVolumes(deployment.getSpec().getTemplate().getSpec(), spec.volumes(), spec.volumeMounts(), spec.tezAm().extraVolumes(), spec.tezAm().extraVolumeMounts()); - return statefulSet; + // When autoscaling is enabled, add a preStop hook that waits for any in-flight DAG + // to complete before exiting. The operator has already deleted the ZK registration + // node (see HiveClusterAutoscaler) so no new DAGs can arrive. If a DAG arrives in + // via the brief race window before ZK delete, we wait for it to finish. + // Once tez_am_dag_running reaches 0, the hook exits and Kubernetes terminates the pod. + if (tezAutoscaling.isEnabled()) { + String preStopScript = buildDrainScript( + "Waiting for active DAG to complete", + TezAmBusyMetrics.METRIC_DAG_RUNNING, "DAG", + "TezAM is idle. 
preStop complete, K8s will terminate pod.", + 10, 6, null, tezAutoscaling.metricsPort()); + applyAutoscalingLifecycle( + deployment.getSpec().getTemplate().getSpec(), + deployment.getSpec().getTemplate().getMetadata(), + preStopScript, tezAutoscaling.gracePeriodSeconds(), + tezAutoscaling.metricsScrapeIntervalSeconds()); + } + + return deployment; } private StatefulSet doBuildStatefulSet(HiveCluster hiveCluster, LlapSpec llap, Integer replicas) { diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java index 1bdc6a4ebe27..5f0739baeedb 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java @@ -30,6 +30,7 @@ import io.fabric8.kubernetes.api.model.Condition; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -312,7 +313,7 @@ private HiveClusterStatus buildStatus(HiveCluster resource, ? 
perLlapTezAm.autoscaling().minReplicas() : perLlapTezAm.replicas(); tezAmStatuses.put(llapSpec.name(), - buildComponentStatus(context, StatefulSet.class, tezAmSsName, + buildComponentStatus(context, Deployment.class, tezAmSsName, perLlapTezAm.replicas(), tezAmMin)); } status.setTezAmClusters(tezAmStatuses); @@ -562,8 +563,7 @@ private void patchReplicas(KubernetesClient client, HiveCluster resource, workloadName = resource.getMetadata().getName() + "-" + component; } try { - if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-") - || component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) { + if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) { client.apps().statefulSets().inNamespace(namespace).withName(workloadName).scale(replicas); } else { client.apps().deployments().inNamespace(namespace).withName(workloadName).scale(replicas); @@ -636,16 +636,18 @@ private void reconcileLlapClusters(HiveCluster resource, KubernetesClient client // --- Per-LLAP TezAM resources (one TezAM per LLAP cluster) --- if (resource.getSpec().tezAm().isEnabled()) { int tezAmReplicas = resolveTezAmReplicaCount(resource, ns, clusterName, llapSpec); + String tezAmName = LlapResourceBuilder.tezAmResourceName(resource, llapSpec); client.configMaps().inNamespace(ns) .resource(LlapResourceBuilder.buildTezAmConfigMap(resource, llapSpec)) .serverSideApply(); client.services().inNamespace(ns) .resource(LlapResourceBuilder.buildTezAmService(resource, llapSpec)) .serverSideApply(); - client.apps().statefulSets().inNamespace(ns) - .resource(LlapResourceBuilder.buildTezAmStatefulSet(resource, llapSpec, tezAmReplicas)) + client.apps().deployments().inNamespace(ns) + .resource(LlapResourceBuilder.buildTezAmDeployment(resource, llapSpec, tezAmReplicas)) .forceConflicts() .serverSideApply(); + reconcileTezAmEndpointSlice(resource, client, llapSpec); if (llapSpec.tezAm().autoscaling().isEnabled()) { client.policy().v1().podDisruptionBudget().inNamespace(ns) 
.resource(LlapResourceBuilder.buildTezAmPdb(resource, llapSpec)) @@ -743,24 +745,45 @@ private void garbageCollectLlapResources(KubernetesClient client, String ns, Labels.APP_INSTANCE, clusterName, Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM); - client.apps().statefulSets().inNamespace(ns).withLabels(tezamSelector).list().getItems() + client.apps().deployments().inNamespace(ns).withLabels(tezamSelector).list().getItems() .stream() - .filter(ss -> { - String llapName = ss.getMetadata().getLabels().get(Labels.LLAP_CLUSTER); + .filter(d -> { + String llapName = d.getMetadata().getLabels().get(Labels.LLAP_CLUSTER); return llapName != null && !desiredNames.contains(llapName); }) - .forEach(ss -> { - String llapName = ss.getMetadata().getLabels().get(Labels.LLAP_CLUSTER); + .forEach(d -> { + String llapName = d.getMetadata().getLabels().get(Labels.LLAP_CLUSTER); LOG.info("Garbage-collecting TezAM for LLAP cluster '{}' in {}/{}", llapName, ns, clusterName); - client.apps().statefulSets().inNamespace(ns).withName(ss.getMetadata().getName()).delete(); - client.services().inNamespace(ns).withName(ss.getMetadata().getName()).delete(); + client.apps().deployments().inNamespace(ns).withName(d.getMetadata().getName()).delete(); + client.services().inNamespace(ns).withName(d.getMetadata().getName()).delete(); client.configMaps().inNamespace(ns) - .withName(ss.getMetadata().getName() + "-config").delete(); + .withName(d.getMetadata().getName() + "-config").delete(); client.policy().v1().podDisruptionBudget().inNamespace(ns) - .withName(ss.getMetadata().getName() + "-pdb").delete(); + .withName(d.getMetadata().getName() + "-pdb").delete(); + client.discovery().v1().endpointSlices().inNamespace(ns) + .withName(d.getMetadata().getName() + "-hostnames").delete(); }); } + /** + * Maintains an operator-managed EndpointSlice for the TezAM headless Service. 
+ * The default EndpointSlice controller does not set the {@code hostname} field for + * Deployment pods, so per-pod DNS records are not created by CoreDNS. This method + * creates/updates an EndpointSlice (with managed-by=hive-kubernetes-operator) + * that includes {@code hostname} for each ready TezAM pod, giving CoreDNS the data + * it needs to serve {@code <pod-name>.<service>.<namespace>.svc.cluster.local} A-records. + */ + private void reconcileTezAmEndpointSlice(HiveCluster resource, KubernetesClient client, LlapSpec llapSpec) { + String ns = resource.getMetadata().getNamespace(); + Map selector = Labels.selectorForTezAmCluster(resource, llapSpec.name()); + List pods = client.pods().inNamespace(ns).withLabels(selector).list().getItems(); + var slice = LlapResourceBuilder.buildTezAmEndpointSlice(resource, llapSpec, pods); + client.discovery().v1().endpointSlices().inNamespace(ns) + .resource(slice) + .forceConflicts() + .serverSideApply(); + } + // --- Auto-Suspend / Wake --- enum SuspendAction { RUNNING, IDLE_START, IDLE_WAITING, SUSPEND_NOW, STAY_SUSPENDED, WAKE } @@ -829,7 +852,7 @@ private boolean isClusterIdle(HiveCluster resource, KubernetesClient client) { if (spec.tezAm().isEnabled()) { for (var llap : spec.llapClusters()) { if (llap.isEnabled() - && !isAtMinReplicas(client, ns, name + "-tezam-" + llap.name(), true, + && !isAtMinReplicas(client, ns, name + "-tezam-" + llap.name(), false, llap.tezAm().autoscaling().minReplicas())) { return false; } From 0af4cb8d4b5c695f653caf37a816dd598771c8c1 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Wed, 24 Jun 2026 19:58:48 +0530 Subject: [PATCH 2/7] Add cluster level serviceAccountName support in K8s Operator --- .../hive-operator/crds/hiveclusters.hive.apache.org-v1.yml | 4 ++++ .../kubernetes/helm/hive-operator/templates/hivecluster.yaml | 4 +++- packaging/src/kubernetes/helm/hive-operator/values.yaml | 5 ++++- .../operator/dependent/HiveServer2DeploymentDependent.java | 1 + .../kubernetes/operator/dependent/LlapResourceBuilder.java
| 2 ++ .../operator/dependent/MetastoreDeploymentDependent.java | 1 + .../operator/dependent/SchemaInitJobDependent.java | 1 + .../hive/kubernetes/operator/model/HiveClusterSpec.java | 3 +++ .../operator/reconciler/HiveClusterReconciler.java | 3 ++- 9 files changed, 21 insertions(+), 3 deletions(-) diff --git a/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml b/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml index 81947c1c4910..9947f44b75b2 100644 --- a/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml +++ b/packaging/src/kubernetes/helm/hive-operator/crds/hiveclusters.hive.apache.org-v1.yml @@ -692,6 +692,10 @@ spec: type: string type: object x-kubernetes-preserve-unknown-fields: true + serviceAccountName: + description: "Kubernetes ServiceAccount name for all component pods.\ + \ If not specified, pods use the namespace default service account." + type: string suspend: description: "When true, the cluster is immediately suspended (all\ \ components scaled to 0). Set to false to wake a suspended cluster." 
diff --git a/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml b/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml index 67ec6c168fb9..278ae40e8df0 100644 --- a/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml +++ b/packaging/src/kubernetes/helm/hive-operator/templates/hivecluster.yaml @@ -26,6 +26,9 @@ metadata: spec: image: {{ .Values.cluster.image }} imagePullPolicy: {{ .Values.cluster.imagePullPolicy }} + {{- if .Values.cluster.serviceAccountName }} + serviceAccountName: {{ .Values.cluster.serviceAccountName }} + {{- end }} metastore: enabled: {{ .Values.cluster.metastore.enabled }} @@ -178,7 +181,6 @@ spec: tezAm: enabled: {{ .Values.cluster.tezAm.enabled }} {{- if .Values.cluster.tezAm.enabled }} - replicas: {{ .Values.cluster.tezAm.replicas }} scratchStorageSize: {{ .Values.cluster.tezAm.scratchStorageSize | quote }} {{- if .Values.cluster.tezAm.scratchStorageClassName }} scratchStorageClassName: {{ .Values.cluster.tezAm.scratchStorageClassName | quote }} diff --git a/packaging/src/kubernetes/helm/hive-operator/values.yaml b/packaging/src/kubernetes/helm/hive-operator/values.yaml index 26f7eeb5af5b..c16f7240ea20 100644 --- a/packaging/src/kubernetes/helm/hive-operator/values.yaml +++ b/packaging/src/kubernetes/helm/hive-operator/values.yaml @@ -47,6 +47,10 @@ cluster: image: "apache/hive:4.3.0-SNAPSHOT" imagePullPolicy: IfNotPresent + # ServiceAccount name for all component pods (HS2, Metastore, LLAP, TezAM, schema-init). + # If empty, pods use the namespace default service account. 
+ serviceAccountName: "" + # --------------------------------------------------------------------------- # DATABASE (Required) — RDBMS for the Hive Metastore backend # --------------------------------------------------------------------------- @@ -207,7 +211,6 @@ cluster: # --------------------------------------------------------------------------- tezAm: enabled: true - replicas: 2 scratchStorageSize: "1Gi" scratchStorageClassName: "" resources: {} diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java index 3afb0af118d2..4ef8a21693e2 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/HiveServer2DeploymentDependent.java @@ -251,6 +251,7 @@ protected Deployment desired(HiveCluster hiveCluster, .addToAnnotations("hive.apache.org/config-hash", configHash) .endMetadata() .withNewSpec() + .withServiceAccountName(spec.serviceAccountName()) .withInitContainers(initContainers) .addNewContainer() .withName(COMPONENT) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java index 933b66f171a8..455f8920c05b 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java @@ -441,6 +441,7 @@ private Deployment doBuildTezAmDeployment(HiveCluster hiveCluster, LlapSpec llap .addToAnnotations("hive.apache.org/config-hash", configHash) .endMetadata() .withNewSpec() + 
.withServiceAccountName(spec.serviceAccountName()) .withSubdomain(deployName) .withInitContainers(initContainers) .addNewContainer() @@ -588,6 +589,7 @@ private StatefulSet doBuildStatefulSet(HiveCluster hiveCluster, LlapSpec llap, I .addToAnnotations("hive.apache.org/config-hash", configHash) .endMetadata() .withNewSpec() + .withServiceAccountName(spec.serviceAccountName()) .withInitContainers(initContainers) .addNewContainer() .withName(ConfigUtils.COMPONENT_LLAP) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java index ff19afd5c023..7a8616c35596 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/MetastoreDeploymentDependent.java @@ -161,6 +161,7 @@ protected Deployment desired(HiveCluster hiveCluster, .addToAnnotations("hive.apache.org/config-hash", configHash) .endMetadata() .withNewSpec() + .withServiceAccountName(spec.serviceAccountName()) .withInitContainers(initContainers) .addNewContainer() .withName(COMPONENT) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java index fb4b588401c9..d68af96e4f0e 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/SchemaInitJobDependent.java @@ -128,6 +128,7 @@ protected Job desired(HiveCluster hiveCluster, hiveCluster, COMPONENT)) .endMetadata() .withNewSpec() + .withServiceAccountName(spec.serviceAccountName()) .withRestartPolicy("OnFailure") 
.withInitContainers(initContainers) .addNewContainer() diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java index b9c0faf42c55..cb1428826bbf 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/model/HiveClusterSpec.java @@ -82,6 +82,9 @@ public record HiveClusterSpec( + "(e.g., mounting a GCS key file at /etc/gcs/key.json)") @SchemaFrom(type = Object[].class) @PreserveUnknownFields List volumeMounts, + @JsonPropertyDescription("Kubernetes ServiceAccount name for all component pods. " + + "If not specified, pods use the namespace default service account.") + String serviceAccountName, @JsonPropertyDescription("Auto-suspend configuration. When enabled and all components " + "are idle for the configured timeout, the cluster scales to 0 replicas.") AutoSuspendSpec autoSuspend, diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java index 5f0739baeedb..8af1b66a2bd8 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java @@ -586,7 +586,8 @@ private void patchSuspendSpec(KubernetesClient client, HiveCluster resource, boo oldSpec.hiveServer2(), oldSpec.llapClusters(), oldSpec.llapClusterRouting(), oldSpec.tezAm(), oldSpec.zookeeper(), oldSpec.hadoop(), oldSpec.envVars(), oldSpec.externalJars(), - oldSpec.volumes(), oldSpec.volumeMounts(), oldSpec.autoSuspend(), suspend); + oldSpec.volumes(), oldSpec.volumeMounts(), 
oldSpec.serviceAccountName(), + oldSpec.autoSuspend(), suspend); hc.setSpec(newSpec); return hc; }); From 86a7e298630d541a539fc5872ccc97f0b18c4960 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 25 Jun 2026 14:14:40 +0530 Subject: [PATCH 3/7] Update kubeVersion in helm chart --- packaging/src/kubernetes/helm/hive-operator/Chart.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/src/kubernetes/helm/hive-operator/Chart.yaml b/packaging/src/kubernetes/helm/hive-operator/Chart.yaml index b1e8104b155e..8aea42bdca59 100644 --- a/packaging/src/kubernetes/helm/hive-operator/Chart.yaml +++ b/packaging/src/kubernetes/helm/hive-operator/Chart.yaml @@ -19,7 +19,7 @@ description: Apache Hive Kubernetes Operator - deploys and manages Hive clusters type: application version: "4.3.0-SNAPSHOT" appVersion: "4.3.0-SNAPSHOT" -kubeVersion: ">=1.25.0" +kubeVersion: ">=1.25.0-0" keywords: - hive - hadoop From b1a8ee13164c44048407e052c4559d8f6216953c Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 25 Jun 2026 15:49:47 +0530 Subject: [PATCH 4/7] Address copilot comments - Endpoint slice issue --- .../kubernetes/operator/dependent/LlapResourceBuilder.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java index 455f8920c05b..f955230732af 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java @@ -279,8 +279,12 @@ public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec lla continue; } boolean ready = isPodReady(pod); + String hostname = pod.getMetadata().getName(); + if (hostname.length() > 63) { + hostname = 
hostname.substring(0, 63).replaceAll("-+$", ""); + } endpoints.add(new EndpointBuilder() - .withHostname(pod.getMetadata().getName()) + .withHostname(hostname) .withAddresses(ip) .withNewConditions() .withReady(ready) From aab81772d22187c8d8b5679ff0de0c168e9ec0be Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 25 Jun 2026 20:28:57 +0530 Subject: [PATCH 5/7] Address copilot comments - hardcoded IP address format --- .../dependent/LlapResourceBuilder.java | 19 ++++++++++++++++++- .../reconciler/HiveClusterReconciler.java | 14 +++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java index f955230732af..4b2b1aabe092 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/dependent/LlapResourceBuilder.java @@ -261,6 +261,7 @@ public static String tezAmEndpointSliceName(HiveCluster hc, LlapSpec llap) { * that gap, giving every ready TezAM pod a resolvable FQDN. * * @param pods list of TezAM pods + * @return the EndpointSlice, or {@code null} if there are no pod IPs yet or if pods have mixed IPv4/IPv6 addresses */ public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec llap, List pods) { String ns = hc.getMetadata().getNamespace(); @@ -273,11 +274,18 @@ public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec lla Labels.APP_COMPONENT, ConfigUtils.COMPONENT_TEZAM)); List endpoints = new ArrayList<>(); + String addressType = null; for (var pod : pods) { String ip = pod.getStatus() != null ? 
pod.getStatus().getPodIP() : null; if (ip == null || ip.isEmpty()) { continue; } + String ipFamily = ipAddressType(ip); + if (addressType == null) { + addressType = ipFamily; + } else if (!addressType.equals(ipFamily)) { + return null; + } boolean ready = isPodReady(pod); String hostname = pod.getMetadata().getName(); if (hostname.length() > 63) { @@ -299,6 +307,10 @@ public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec lla .build()); } + if (endpoints.isEmpty()) { + return null; + } + return new EndpointSliceBuilder() .withNewMetadata() .withName(tezAmEndpointSliceName(hc, llap)) @@ -306,11 +318,16 @@ public static EndpointSlice buildTezAmEndpointSlice(HiveCluster hc, LlapSpec lla .withLabels(labels) .withOwnerReferences(ownerRef(hc)) .endMetadata() - .withAddressType("IPv4") + .withAddressType(addressType) .withEndpoints(endpoints) .build(); } + /** Returns {@code IPv4} or {@code IPv6} for a pod IP string. */ + private static String ipAddressType(String ip) { + return ip.indexOf(':') >= 0 ? 
"IPv6" : "IPv4"; + } + private static boolean isPodReady(io.fabric8.kubernetes.api.model.Pod pod) { if (pod.getStatus() == null || pod.getStatus().getConditions() == null) { return false; diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java index 8af1b66a2bd8..b4d6c2e9cf72 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java @@ -34,6 +34,7 @@ import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.StatefulSet; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.discovery.v1.EndpointSlice; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.Cleaner; @@ -778,7 +779,18 @@ private void reconcileTezAmEndpointSlice(HiveCluster resource, KubernetesClient String ns = resource.getMetadata().getNamespace(); Map selector = Labels.selectorForTezAmCluster(resource, llapSpec.name()); List pods = client.pods().inNamespace(ns).withLabels(selector).list().getItems(); - var slice = LlapResourceBuilder.buildTezAmEndpointSlice(resource, llapSpec, pods); + EndpointSlice slice = LlapResourceBuilder.buildTezAmEndpointSlice(resource, llapSpec, pods); + String sliceName = LlapResourceBuilder.tezAmEndpointSliceName(resource, llapSpec); + if (slice == null) { + client.discovery().v1().endpointSlices().inNamespace(ns).withName(sliceName).delete(); + return; + } + var existing = client.discovery().v1().endpointSlices().inNamespace(ns).withName(sliceName).get(); + if (existing != null && existing.getAddressType() != null + && 
!existing.getAddressType().equals(slice.getAddressType())) { + client.discovery().v1().endpointSlices().inNamespace(ns).withName(sliceName).withGracePeriod(0L).delete(); + } + client.discovery().v1().endpointSlices().inNamespace(ns) .resource(slice) .forceConflicts() From ca95f51cb3518eb4bc685545dfc30d35669c0407 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 25 Jun 2026 17:14:32 +0530 Subject: [PATCH 6/7] Address SonarQube - 1 --- .../autoscaling/HiveClusterAutoscaler.java | 4 +- .../autoscaling/TezAmBusyMetrics.java | 2 +- .../autoscaling/TezAmZkDeregistrar.java | 40 ++++++++++++------- .../reconciler/HiveClusterReconciler.java | 1 - 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java index 10c38d5de6dc..d433801a0dbd 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java @@ -276,7 +276,7 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster, KubernetesClient clie List idlePodNames = tezMetrics.stream() .filter(pm -> !TezAmBusyMetrics.hasActiveDag(pm.metrics())) .map(PodMetrics::podName) - .collect(Collectors.toList()); + .toList(); TezAmZkDeregistrar.deregisterIdlePods( cluster.getSpec().zookeeper().quorum(), llapSpec.name(), idlePodNames, cluster.getSpec().hiveServer2().configOverrides()); @@ -422,7 +422,7 @@ private int getCurrentReplicas(KubernetesClient client, String namespace, } } - /** Counts TezAM pods with active DAG work */ + /** Counts TezAM pods with active DAG work. 
*/ private int countBusyPods(List tezMetrics) { return (int) tezMetrics.stream() .filter(pm -> TezAmBusyMetrics.hasActiveDag(pm.metrics())) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java index 40f922b248c8..7ffee63c56f1 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmBusyMetrics.java @@ -41,7 +41,7 @@ public static boolean hasActiveDag(Map metrics) { } /** - * Returns the pod-deletion-cost for this AM: + * Returns the pod-deletion-cost for this AM. * 0 if the AM is idle * 1 if the AM is running a DAG */ diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java index 2d1b78df16e5..b26b34463807 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/TezAmZkDeregistrar.java @@ -55,6 +55,7 @@ public final class TezAmZkDeregistrar { private static final Logger LOG = LoggerFactory.getLogger(TezAmZkDeregistrar.class); + private static final String PATH_SEPARATOR = "/"; private TezAmZkDeregistrar() {} @@ -79,7 +80,7 @@ public static void deregisterIdlePods(String zkQuorum, String llapName, int baseSleepMs = getTimeMs(hiveSiteConfig, "hive.zookeeper.connection.basesleeptime", 1000); int maxRetries = getInt(hiveSiteConfig, "hive.zookeeper.connection.max.retries", 3); - String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX + "/" + llapName; + String registryPath = ConfigUtils.TEZ_EXTERNAL_SESSIONS_ZK_PREFIX + 
PATH_SEPARATOR + llapName; CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(zkQuorum) .connectionTimeoutMs(connTimeoutMs) @@ -96,27 +97,23 @@ public static void deregisterIdlePods(String zkQuorum, String llapName, return; } for (String appId : client.getChildren().forPath(registryPath)) { - String nodePath = registryPath + "/" + appId; + String nodePath = registryPath + PATH_SEPARATOR + appId; byte[] data = client.getData().forPath(nodePath); if (data == null || data.length == 0) { continue; } String hostName = extractHostName(new String(data, StandardCharsets.UTF_8)); - if (hostName == null) { - continue; - } - // hostName = "...svc.cluster.local" - String podName = hostName.contains(".") ? hostName.substring(0, hostName.indexOf('.')) : hostName; - if (idlePodNames.contains(podName)) { - try { - client.delete().forPath(nodePath); - LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will stop routing to it", - llapName, podName, appId); - } catch (Exception e) { - LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName, podName, e.getMessage()); + if (hostName != null) { + // hostName = "...svc.cluster.local" + String podName = hostName.contains(".") ? 
hostName.substring(0, hostName.indexOf('.')) : hostName; + if (idlePodNames.contains(podName)) { + deleteRegistrationNode(client, llapName, nodePath, podName, appId); } } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("[tezam-{}] ZK deregistration interrupted: {}", llapName, e.getMessage()); } catch (Exception e) { LOG.warn("[tezam-{}] ZK deregistration error: {}", llapName, e.getMessage()); } finally { @@ -124,6 +121,21 @@ public static void deregisterIdlePods(String zkQuorum, String llapName, } } + private static void deleteRegistrationNode(CuratorFramework client, String llapName, + String nodePath, String podName, String appId) { + try { + client.delete().forPath(nodePath); + LOG.info("[tezam-{}] Deregistered pod {} (appId={}) from ZK — HS2 will stop routing to it", + llapName, podName, appId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("[tezam-{}] Failed to delete ZK node for pod {} (interrupted): {}", + llapName, podName, e.getMessage()); + } catch (Exception e) { + LOG.warn("[tezam-{}] Failed to delete ZK node for pod {}: {}", llapName, podName, e.getMessage()); + } + } + /** * Reads a time value (in milliseconds) from the config map. * If the key is absent or un-parseable the defaultMs value is returned. 
diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java index b4d6c2e9cf72..1afb813aad47 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/reconciler/HiveClusterReconciler.java @@ -638,7 +638,6 @@ private void reconcileLlapClusters(HiveCluster resource, KubernetesClient client // --- Per-LLAP TezAM resources (one TezAM per LLAP cluster) --- if (resource.getSpec().tezAm().isEnabled()) { int tezAmReplicas = resolveTezAmReplicaCount(resource, ns, clusterName, llapSpec); - String tezAmName = LlapResourceBuilder.tezAmResourceName(resource, llapSpec); client.configMaps().inNamespace(ns) .resource(LlapResourceBuilder.buildTezAmConfigMap(resource, llapSpec)) .serverSideApply(); From 82b804c83e12c729ca574b8bfc332e43ffe78cfe Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 25 Jun 2026 23:48:24 +0530 Subject: [PATCH 7/7] Refactor Tez AM auto-scaling evaluate code to fix patches issues --- .../autoscaling/HiveClusterAutoscaler.java | 71 ++++++++++--------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java index d433801a0dbd..44bfb146bf97 100644 --- a/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java +++ b/packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java @@ -254,53 +254,60 @@ public AutoscalingEvaluation evaluate(HiveCluster cluster, KubernetesClient clie tezAuto.metricsPort(), 
tezAuto.metricsScrapeIntervalSeconds()); String tezKey = cacheKey(namespace, clusterName, tezAmComponentKey); List tezMetrics = metricsCache.getOrEmpty(tezKey, tezAuto.metricsScrapeIntervalSeconds() * 3); + int currentTezReplicas = getCurrentReplicas(client, namespace, clusterName, tezAmComponentKey); updateDeploymentPodDeletionCost(client, namespace, tezMetrics, pm -> TezAmBusyMetrics.deletionCost(pm.metrics())); - Map tezPatches = new HashMap<>(); - evaluateComponent(cluster, client, namespace, clusterName, - tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), tezPatches, statuses, tezMetrics); + PendingScaleDown pending = pendingScaleDowns.get(tezKey); + if (pending != null) { + Integer appliedTarget = null; + if (Duration.between(pending.annotatedAt(), Instant.now()).toSeconds() >= 2) { + // Deregister idle AMs from ZK before applying the scale patch. + // All HS2 instances see CHILD_REMOVED and stop routing to these AMs. + // No new DAGs can arrive on pods that are about to be terminated. 
+ List idlePodNames = tezMetrics.stream() + .filter(pm -> !TezAmBusyMetrics.hasActiveDag(pm.metrics())) + .map(PodMetrics::podName) + .toList(); + TezAmZkDeregistrar.deregisterIdlePods( + cluster.getSpec().zookeeper().quorum(), llapSpec.name(), idlePodNames, + cluster.getSpec().hiveServer2().configOverrides()); + appliedTarget = pending.targetReplicas(); + patches.put(tezAmComponentKey, appliedTarget); + MANAGED_REPLICAS.put(tezKey, appliedTarget); + lastScaleTimes.put(tezKey, Instant.now().toString()); + pendingScaleDowns.remove(tezKey); + LOG.info("[{}] Applying deferred scale-down to {} replicas", tezAmComponentKey, appliedTarget); + } + evaluateComponent(cluster, client, namespace, clusterName, + tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), new HashMap<>(), statuses, tezMetrics); + if (pendingScaleDowns.containsKey(tezKey)) { + MANAGED_REPLICAS.put(tezKey, currentTezReplicas); + } else if (appliedTarget != null) { + MANAGED_REPLICAS.put(tezKey, appliedTarget); + } + } else { + Map tezPatches = new HashMap<>(); + evaluateComponent(cluster, client, namespace, clusterName, + tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), tezPatches, statuses, tezMetrics); - Integer tezPatch = tezPatches.get(tezAmComponentKey); - int currentTezReplicas = getCurrentReplicas(client, namespace, clusterName, tezAmComponentKey); - if (tezPatch != null && tezPatch < currentTezReplicas) { - // Scale-down: effective target = max(desired, busyCount). - // Idle pods are removed first as per pod-deletion-cost - PendingScaleDown pending = pendingScaleDowns.get(tezKey); - if (pending != null) { - if (Duration.between(pending.annotatedAt(), Instant.now()).toSeconds() >= 2) { - // Deregister idle AMs from ZK before applying the scale patch. - // All HS2 instances (active-active HA) see CHILD_REMOVED and stop routing to - // these AMs — no new DAGs can arrive on pods that are about to be terminated. 
- List idlePodNames = tezMetrics.stream() - .filter(pm -> !TezAmBusyMetrics.hasActiveDag(pm.metrics())) - .map(PodMetrics::podName) - .toList(); - TezAmZkDeregistrar.deregisterIdlePods( - cluster.getSpec().zookeeper().quorum(), llapSpec.name(), idlePodNames, - cluster.getSpec().hiveServer2().configOverrides()); - patches.put(tezAmComponentKey, pending.targetReplicas()); - MANAGED_REPLICAS.put(tezKey, pending.targetReplicas()); - lastScaleTimes.put(tezKey, Instant.now().toString()); - pendingScaleDowns.remove(tezKey); - LOG.info("[{}] Applying deferred scale-down to {} replicas", tezAmComponentKey, - pending.targetReplicas()); - } - } else { + Integer tezPatch = tezPatches.get(tezAmComponentKey); + if (tezPatch != null && tezPatch < currentTezReplicas) { int busyCount = countBusyPods(tezMetrics); int effectivePatch = Math.max(tezPatch, busyCount); if (effectivePatch != tezPatch) { LOG.info("[{}] Scale-down target adjusted to {} (desired={}, busy AMs={})", tezAmComponentKey, effectivePatch, tezPatch, busyCount); } + MANAGED_REPLICAS.put(tezKey, currentTezReplicas); pendingScaleDowns.put(tezKey, new PendingScaleDown(effectivePatch, Instant.now())); LOG.info("[{}] Deferring scale-down to {} (waiting for deletion-cost propagation)", tezAmComponentKey, effectivePatch); + } else if (tezPatch != null) { + patches.put(tezAmComponentKey, tezPatch); + MANAGED_REPLICAS.put(tezKey, tezPatch); } - } else if (tezPatch != null) { - patches.put(tezAmComponentKey, tezPatch); - MANAGED_REPLICAS.put(tezKey, tezPatch); } } }