Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,9 @@ public void dagComplete() {
writeLock.lock();
try {
dagRunning = false;
if (metrics != null) {
metrics.setDagRunning(false);
}
dagStats = new StatsPerDag();
int pendingCount = 0;
for (Entry<Priority, List<TaskInfo>> entry : pendingTasks.entrySet()) {
Expand Down Expand Up @@ -1173,6 +1176,9 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin
metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
if (metrics != null) {
metrics.setDagRunning(true);
}
}
dagStats.registerTaskRequest(hosts, racks);
addPendingTask(taskInfo);
Expand All @@ -1194,6 +1200,9 @@ public void allocateTask(Object task, Resource capability, ContainerId container
metrics.setDagId(id.getDAGID().toString());
}
dagRunning = true;
if (metrics != null) {
metrics.setDagRunning(true);
}
}
dagStats.registerTaskRequest(null, null);
addPendingTask(taskInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
SchedulerRunningTaskCount("Total number of running tasks"),
SchedulerPendingPreemptionTaskCount("Total number of tasks pending for pre-emption"),
SchedulerPreemptedTaskCount("Total number of tasks pre-empted"),
SchedulerCompletedDagCount("Number of DAGs completed");
SchedulerCompletedDagCount("Number of DAGs completed"),
SchedulerDagRunning("Binary that represents if the AM is idle or running a DAG");

Check failure on line 42 in llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerInfo.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this constant name to match the regular expression '^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ762YtDDH3uf3D_gE61&open=AZ762YtDDH3uf3D_gE61&pullRequest=6561

private final String desc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingPreemptionTaskCount;
import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPendingTaskCount;
import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerPreemptedTaskCount;
import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerDagRunning;
import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerRunningTaskCount;
import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSchedulableTaskCount;
import static org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerInfo.SchedulerSuccessfulTaskCount;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class LlapTaskSchedulerMetrics implements MetricsSource {
@Metric
MutableCounterInt completedDagcount;
@Metric
MutableGaugeInt dagRunning;
@Metric
MutableCounterInt pendingPreemptionTasksCount;
@Metric
MutableGaugeInt wmUnusedGuaranteedCount;
Expand Down Expand Up @@ -276,6 +279,7 @@ private void getTaskSchedulerStats(MetricsRecordBuilder rb) {
.addGauge(SchedulerMemoryPerInstance, memoryPerInstance.value())
.addGauge(SchedulerCpuCoresPerInstance, cpuCoresPerInstance.value())
.addGauge(SchedulerDisabledNodeCount, disabledNodeCount.value())
.addGauge(SchedulerDagRunning, dagRunning.value())
.addCounter(SchedulerPendingTaskCount, pendingTasksCount.value())
.addCounter(SchedulerSchedulableTaskCount, schedulableTasksCount.value())
.addCounter(SchedulerRunningTaskCount, runningTasksCount.value())
Expand All @@ -285,6 +289,10 @@ private void getTaskSchedulerStats(MetricsRecordBuilder rb) {
.addCounter(SchedulerCompletedDagCount, completedDagcount.value());
}

/**
 * Records in the {@code dagRunning} gauge whether a DAG is currently running.
 *
 * @param running {@code true} stores 1 in the gauge, {@code false} stores 0
 */
public void setDagRunning(boolean running) {
  if (running) {
    dagRunning.set(1);
  } else {
    dagRunning.set(0);
  }
}

/**
 * Returns the JVM metrics object held by this metrics source.
 *
 * @return the current value of the {@code jvmMetrics} field
 */
public JvmMetrics getJvmMetrics() {
return jvmMetrics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,10 @@ spec:
type: string
type: object
x-kubernetes-preserve-unknown-fields: true
serviceAccountName:
description: "Kubernetes ServiceAccount name for all component pods.\
\ If not specified, pods use the namespace default service account."
type: string
suspend:
description: "When true, the cluster is immediately suspended (all\
\ components scaled to 0). Set to false to wake a suspended cluster."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ rules:
- apiGroups: ["policy"]
resources: ["poddisruptionbudgets"]
verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
# EndpointSlices: operator manages a custom per-pod-hostname slice for TezAM DNS
- apiGroups: ["discovery.k8s.io"]
resources: ["endpointslices"]
verbs: ["get", "list", "create", "patch", "delete"]
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ metadata:
spec:
image: {{ .Values.cluster.image }}
imagePullPolicy: {{ .Values.cluster.imagePullPolicy }}
{{- if .Values.cluster.serviceAccountName }}
serviceAccountName: {{ .Values.cluster.serviceAccountName }}
{{- end }}

metastore:
enabled: {{ .Values.cluster.metastore.enabled }}
Expand Down Expand Up @@ -178,7 +181,6 @@ spec:
tezAm:
enabled: {{ .Values.cluster.tezAm.enabled }}
{{- if .Values.cluster.tezAm.enabled }}
replicas: {{ .Values.cluster.tezAm.replicas }}
scratchStorageSize: {{ .Values.cluster.tezAm.scratchStorageSize | quote }}
{{- if .Values.cluster.tezAm.scratchStorageClassName }}
scratchStorageClassName: {{ .Values.cluster.tezAm.scratchStorageClassName | quote }}
Comment on lines 181 to 186
Expand Down
5 changes: 4 additions & 1 deletion packaging/src/kubernetes/helm/hive-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ cluster:
image: "apache/hive:4.3.0-SNAPSHOT"
imagePullPolicy: IfNotPresent

# ServiceAccount name for all component pods (HS2, Metastore, LLAP, TezAM, schema-init).
# If empty, pods use the namespace default service account.
serviceAccountName: ""

# ---------------------------------------------------------------------------
# DATABASE (Required) — RDBMS for the Hive Metastore backend
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -207,7 +211,6 @@ cluster:
# ---------------------------------------------------------------------------
tezAm:
enabled: true
replicas: 2
scratchStorageSize: "1Gi"
scratchStorageClassName: ""
resources: {}
Comment on lines 211 to 216
Expand Down
5 changes: 5 additions & 0 deletions packaging/src/kubernetes/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@
<version>${fabric8.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.client.KubernetesClient;
import org.apache.hive.kubernetes.operator.model.HiveCluster;
Expand Down Expand Up @@ -131,7 +133,7 @@
* @param client the Kubernetes client (for reading current replica counts)
* @return evaluation result with patches and per-component autoscaling statuses
*/
public AutoscalingEvaluation evaluate(HiveCluster cluster, KubernetesClient client) {

Check warning on line 136 in packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Method evaluate length is 174 lines (max allowed is 150).

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ762YrCDH3uf3D_gE6x&open=AZ762YrCDH3uf3D_gE6x&pullRequest=6561
Map<String, Integer> patches = new HashMap<>();
Map<String, AutoscalingStatus> statuses = new HashMap<>();
HiveClusterSpec spec = cluster.getSpec();
Expand Down Expand Up @@ -178,8 +180,9 @@
} else {
// Pod deletion cost only applies to Deployments (ReplicaSet controller).
// StatefulSets always scale down by highest ordinal regardless of this
// annotation. LLAP/TezAM graceful drain is handled by preStop hooks.
updateDeploymentPodDeletionCost(client, namespace, hs2Metrics, "hs2_open_sessions");
// annotation. LLAP graceful drain is handled by preStop hooks.
updateDeploymentPodDeletionCost(client, namespace, hs2Metrics,
pm -> pm.metrics().getOrDefault("hs2_open_sessions", 0.0).intValue());

Map<String, Integer> hs2Patches = new HashMap<>();
evaluateComponent(cluster, client, namespace, clusterName,
Expand Down Expand Up @@ -251,9 +254,54 @@
tezAuto.metricsPort(), tezAuto.metricsScrapeIntervalSeconds());
String tezKey = cacheKey(namespace, clusterName, tezAmComponentKey);
List<PodMetrics> tezMetrics = metricsCache.getOrEmpty(tezKey, tezAuto.metricsScrapeIntervalSeconds() * 3);

updateDeploymentPodDeletionCost(client, namespace, tezMetrics,
pm -> TezAmBusyMetrics.deletionCost(pm.metrics()));

Map<String, Integer> tezPatches = new HashMap<>();
evaluateComponent(cluster, client, namespace, clusterName,
tezAmComponentKey, tezAuto,
perLlapTezAm.replicas(), patches, statuses, tezMetrics);
tezAmComponentKey, tezAuto, perLlapTezAm.replicas(), tezPatches, statuses, tezMetrics);

Integer tezPatch = tezPatches.get(tezAmComponentKey);
int currentTezReplicas = getCurrentReplicas(client, namespace, clusterName, tezAmComponentKey);
if (tezPatch != null && tezPatch < currentTezReplicas) {
// Scale-down: effective target = max(desired, busyCount).
// Idle pods are removed first as per pod-deletion-cost
PendingScaleDown pending = pendingScaleDowns.get(tezKey);
if (pending != null) {
if (Duration.between(pending.annotatedAt(), Instant.now()).toSeconds() >= 2) {
// Deregister idle AMs from ZK before applying the scale patch.
// All HS2 instances (active-active HA) see CHILD_REMOVED and stop routing to
// these AMs — no new DAGs can arrive on pods that are about to be terminated.
List<String> idlePodNames = tezMetrics.stream()
.filter(pm -> !TezAmBusyMetrics.hasActiveDag(pm.metrics()))
.map(PodMetrics::podName)
.collect(Collectors.toList());

Check warning on line 279 in packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this usage of 'Stream.collect(Collectors.toList())' with 'Stream.toList()' and ensure that the list is unmodified.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ762YrCDH3uf3D_gE6w&open=AZ762YrCDH3uf3D_gE6w&pullRequest=6561
TezAmZkDeregistrar.deregisterIdlePods(
cluster.getSpec().zookeeper().quorum(), llapSpec.name(), idlePodNames,
cluster.getSpec().hiveServer2().configOverrides());
patches.put(tezAmComponentKey, pending.targetReplicas());
MANAGED_REPLICAS.put(tezKey, pending.targetReplicas());
lastScaleTimes.put(tezKey, Instant.now().toString());
pendingScaleDowns.remove(tezKey);
LOG.info("[{}] Applying deferred scale-down to {} replicas", tezAmComponentKey,
pending.targetReplicas());
}
} else {
int busyCount = countBusyPods(tezMetrics);
int effectivePatch = Math.max(tezPatch, busyCount);
if (effectivePatch != tezPatch) {
LOG.info("[{}] Scale-down target adjusted to {} (desired={}, busy AMs={})",
tezAmComponentKey, effectivePatch, tezPatch, busyCount);
}
pendingScaleDowns.put(tezKey, new PendingScaleDown(effectivePatch, Instant.now()));
LOG.info("[{}] Deferring scale-down to {} (waiting for deletion-cost propagation)",
tezAmComponentKey, effectivePatch);
}
} else if (tezPatch != null) {
patches.put(tezAmComponentKey, tezPatch);
MANAGED_REPLICAS.put(tezKey, tezPatch);
}
}
}

Expand Down Expand Up @@ -362,8 +410,7 @@
} else {
workloadName = clusterName + "-" + component;
}
if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")
|| component.startsWith(ConfigUtils.COMPONENT_TEZAM + "-")) {
if (component.startsWith(ConfigUtils.COMPONENT_LLAP + "-")) {
var ss = client.apps().statefulSets()
.inNamespace(namespace).withName(workloadName).get();
return ss != null && ss.getSpec().getReplicas() != null ? ss.getSpec().getReplicas() : 0;
Expand All @@ -375,25 +422,32 @@
}
}

/** Counts TezAM pods with active DAG work. */

Check warning on line 425 in packaging/src/kubernetes/src/java/org/apache/hive/kubernetes/operator/autoscaling/HiveClusterAutoscaler.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

First sentence should end with a period.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ762YrCDH3uf3D_gE6y&open=AZ762YrCDH3uf3D_gE6y&pullRequest=6561
private int countBusyPods(List<PodMetrics> tezMetrics) {
  int busyPods = 0;
  for (PodMetrics podMetrics : tezMetrics) {
    // A pod is busy when its metrics report an active DAG.
    if (TezAmBusyMetrics.hasActiveDag(podMetrics.metrics())) {
      busyPods++;
    }
  }
  return busyPods;
}

/**
* Patches each pod's deletion cost annotation with the value computed by {@code costFunction}.
* Kubernetes uses this during scale-down to kill idle pods first (lower cost = killed first).
* <p>
* Only meaningful for Deployments (HS2, Metastore) — the ReplicaSet controller
* Only meaningful for Deployments (HS2, Metastore, TezAM) — the ReplicaSet controller
* respects this annotation. StatefulSets ignore it and always terminate by ordinal.
*/
private void updateDeploymentPodDeletionCost(KubernetesClient client, String namespace,
List<PodMetrics> metrics, String metricName) {
List<PodMetrics> metrics, ToIntFunction<PodMetrics> costFunction) {
for (PodMetrics pm : metrics) {
int sessions = pm.metrics().getOrDefault(metricName, 0.0).intValue();
int cost = costFunction.applyAsInt(pm);
try {
client.pods().inNamespace(namespace).withName(pm.podName())
.edit(pod -> {
if (pod.getMetadata().getAnnotations() == null) {
pod.getMetadata().setAnnotations(new java.util.HashMap<>());
}
pod.getMetadata().getAnnotations()
.put("controller.kubernetes.io/pod-deletion-cost", String.valueOf(sessions));
.put("controller.kubernetes.io/pod-deletion-cost", String.valueOf(cost));
return pod;
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.kubernetes.operator.autoscaling;

import java.util.Map;

/**
* Interprets TezAM JMX Exporter metrics to determine whether an AM is busy executing
* a DAG or idle and safe to remove during scale-down.
* <p>
* Signal: {@code tez_am_dag_running} (SchedulerDagRunning gauge in LlapTaskSchedulerMetrics).
* Set to 1 when the AM receives its first task for a new DAG, cleared to 0 in dagComplete().
*/
public final class TezAmBusyMetrics {

  /** Name of the DAG-running gauge as it appears in the per-pod metrics map. */
  public static final String METRIC_DAG_RUNNING = "tez_am_dag_running";

  private TezAmBusyMetrics() {
    // Static utility class; not meant to be instantiated.
  }

  /**
   * Returns whether the TezAM reports a running DAG.
   * <p>
   * The AM is considered busy only when {@link #METRIC_DAG_RUNNING} is present with a
   * value greater than zero. A {@code null} map, a missing sample, a {@code null} sample
   * and a {@code NaN} sample are all treated as idle.
   *
   * @param metrics metric name to value map for a single pod; may be {@code null}
   * @return {@code true} if the DAG-running gauge is greater than zero
   */
  public static boolean hasActiveDag(Map<String, Double> metrics) {
    if (metrics == null) {
      return false;
    }
    // Deliberately not getOrDefault(...): it returns null for a key that is explicitly
    // mapped to null, and auto-unboxing that null in the comparison would throw.
    Double dagRunning = metrics.get(METRIC_DAG_RUNNING);
    return dagRunning != null && dagRunning > 0;
  }

  /**
   * Returns the pod-deletion-cost for this AM.
   * <ul>
   *   <li>0 if the AM is idle</li>
   *   <li>1 if the AM is running a DAG</li>
   * </ul>
   *
   * @param metrics metric name to value map for a single pod; may be {@code null}
   * @return 1 when {@link #hasActiveDag(Map)} is {@code true}, otherwise 0
   */
  public static int deletionCost(Map<String, Double> metrics) {
    return hasActiveDag(metrics) ? 1 : 0;
  }
}
Loading
Loading