From 37e0acae8a98719c56dd74362b61eb32a396866f Mon Sep 17 00:00:00 2001 From: architjainjain Date: Mon, 1 Jun 2026 15:21:15 +0530 Subject: [PATCH] HIVE-27126: queue level resource stats for YARN RM. --- .../logs/BeelineInPlaceUpdateStream.java | 5 + .../hadoop/hive/common/log/InPlaceUpdate.java | 7 + .../hive/common/log/ProgressMonitor.java | 7 + .../org/apache/hadoop/hive/conf/HiveConf.java | 6 + .../hive/common/log/TestInPlaceUpdate.java | 171 ++++++ .../hadoop/hive/ql/exec/tez/TezSession.java | 2 + .../ql/exec/tez/TezSessionPoolSession.java | 6 + .../hive/ql/exec/tez/TezSessionState.java | 31 ++ .../exec/tez/YarnQueueMetricsCollector.java | 391 ++++++++++++++ .../exec/tez/monitoring/RenderStrategy.java | 18 +- .../ql/exec/tez/monitoring/TezJobMonitor.java | 64 ++- .../tez/monitoring/TezProgressMonitor.java | 69 +++ .../hadoop/hive/ql/session/SessionState.java | 5 + .../tez/TestYarnQueueMetricsCollector.java | 504 ++++++++++++++++++ .../TestTezJobMonitorQueueMetrics.java | 235 ++++++++ .../TestTezProgressMonitorQueueMetrics.java | 441 +++++++++++++++ service-rpc/if/TCLIService.thrift | 1 + .../rpc/thrift/TProgressUpdateResp.java | 77 ++- .../hive/service/cli/JobProgressUpdate.java | 10 +- .../service/cli/thrift/ThriftCLIService.java | 17 +- 20 files changed, 2057 insertions(+), 10 deletions(-) create mode 100644 common/src/test/org/apache/hadoop/hive/common/log/TestInPlaceUpdate.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestYarnQueueMetricsCollector.java create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezJobMonitorQueueMetrics.java create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitorQueueMetrics.java diff --git a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java index 3f74d9444ac7..27e969999753 100644 --- a/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java +++ b/beeline/src/java/org/apache/hive/beeline/logs/BeelineInPlaceUpdateStream.java @@ -96,5 +96,10 @@ public String executionStatus() { public double progressedPercentage() { return response.getProgressedPercentage(); } + + @Override + public String queueMetrics() { + return response.isSetQueueMetrics() ? response.getQueueMetrics() : ""; + } } } diff --git a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java index fe55e7267d3e..9209d30b2c37 100644 --- a/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java +++ b/common/src/java/org/apache/hadoop/hive/common/log/InPlaceUpdate.java @@ -175,6 +175,13 @@ public void render(ProgressMonitor monitor) { reprintLine(SEPARATOR); reprintLineWithColorAsBold(footer, Ansi.Color.RED); reprintLine(SEPARATOR); + + // Display queue metrics if available (may be multi-line: queue name + metrics) + String queueMetrics = monitor.queueMetrics(); + if (queueMetrics != null && !queueMetrics.isEmpty()) { + reprintMultiLine(queueMetrics); + reprintLine(SEPARATOR); + } } diff --git a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java index 67dd7ca02f1e..1781c32820c9 100644 --- a/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java +++ b/common/src/java/org/apache/hadoop/hive/common/log/ProgressMonitor.java @@ -52,6 +52,11 @@ public String executionStatus() { public double progressedPercentage() { return 0; } + + @Override + public String queueMetrics() { + return ""; + } }; List headers(); @@ -65,4 +70,6 @@ public double progressedPercentage() { String executionStatus(); double progressedPercentage(); + + String queueMetrics(); } diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 4c238efb2a27..8ab493a0d6c0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3986,6 +3986,12 @@ public static enum ConfVars { HIVE_SERVER2_TEZ_QUEUE_ACCESS_CHECK("hive.server2.tez.queue.access.check", false, "Whether to check user access to explicitly specified YARN queues. " + "yarn.resourcemanager.webapp.address must be configured to use this."), + HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL("hive.tez.queue.metrics.refresh.interval", "0s", + new TimeValidator(TimeUnit.SECONDS), + "Interval for refreshing YARN queue resource metrics during Tez query execution. " + + "When set to a positive value (e.g. 10s), displays real-time memory, vCore, capacity " + + "and application metrics for the YARN queue being used. " + + "Set to 0 or negative to disable. Minimum effective value is 1 second."), HIVE_SERVER2_TEZ_SESSION_LIFETIME("hive.server2.tez.session.lifetime", "162h", new TimeValidator(TimeUnit.HOURS), "The lifetime of the Tez sessions launched by HS2 when default sessions are enabled.\n" + diff --git a/common/src/test/org/apache/hadoop/hive/common/log/TestInPlaceUpdate.java b/common/src/test/org/apache/hadoop/hive/common/log/TestInPlaceUpdate.java new file mode 100644 index 000000000000..60c65a07b60e --- /dev/null +++ b/common/src/test/org/apache/hadoop/hive/common/log/TestInPlaceUpdate.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.common.log; + +import org.apache.commons.lang3.StringUtils; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for InPlaceUpdate + *

+ * We capture stdout via a ByteArrayOutputStream and inspect the rendered output. + */ +public class TestInPlaceUpdate { + + /** + * Minimal ProgressMonitor stub — returns empty headers/rows/footer. + */ + private static ProgressMonitor makeMonitor(String queueMetrics) { + return new ProgressMonitor() { + @Override + public List headers() { + return Arrays.asList("VERTICES", "MODE", "STATUS", "TOTAL", "COMPLETED", "RUNNING", "PENDING", "FAILED", + "KILLED"); + } + + @Override + public List> rows() { + return Collections.emptyList(); + } + + @Override + public String footerSummary() { + return "VERTICES: 00/00"; + } + + @Override + public long startTime() { + return System.currentTimeMillis(); + } + + @Override + public double progressedPercentage() { + return 0.0; + } + + @Override + public String executionStatus() { + return "RUNNING"; + } + + @Override + public String queueMetrics() { + return queueMetrics; + } + }; + } + + /** + * Expected separator: 94 dashes. + */ + private static final String SEPARATOR = new String(new char[InPlaceUpdate.MIN_TERMINAL_WIDTH]).replace("\0", "-"); + + /** + * When queueMetrics() returns a non-empty string, InPlaceUpdate.render() must + * print a separator line immediately after the metrics block — so total separators + * = 4 (VERTICES table) + 1 (after queue metrics) = 5. + */ + @Test + public void testSeparatorPrintedAfterQueueMetrics() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + InPlaceUpdate inPlace = new InPlaceUpdate(ps); + + String metrics = """ + QUEUE: default (1s ago) + MEMORY: 2.0/8.0 GB (25.00%) | VCORES: 4/16 (25.00%) + CAPACITY: 60.00% | ACTIVE_APPS: 1 | PENDING: 0"""; + + inPlace.render(makeMonitor(metrics)); + ps.flush(); + + String output = baos.toString(); + + // The metrics content should appear + assertTrue("Output should contain QUEUE: line", output.contains("QUEUE: default")); + assertTrue("Output should contain MEMORY: line", output.contains("MEMORY: 2.0/8.0 GB")); + assertTrue("Output should contain CAPACITY: line", output.contains("CAPACITY: 60.00%")); + + // The separator must appear AFTER the CAPACITY line in the rendered output + int capacityIdx = output.indexOf("CAPACITY:"); + int separatorIdx = output.indexOf(SEPARATOR, capacityIdx); + assertTrue( + "Separator must appear after CAPACITY: line (separatorIdx=" + separatorIdx + ", capacityIdx=" + capacityIdx + ")", + separatorIdx > capacityIdx); + + // Total separators = 4 (VERTICES table) + 1 (after queue metrics) = 5 + int count = StringUtils.countMatches(output, SEPARATOR); + assertEquals("With queue metrics, total separators should be 5 (4 VERTICES + 1 after metrics)", 5, count); + } + + /** + * When queueMetrics() returns an empty string, InPlaceUpdate.render() must NOT + * print an extra separator — so total remains 4 (the VERTICES table separators only). + */ + @Test + public void testNoExtraSeparatorWhenQueueMetricsEmpty() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + InPlaceUpdate inPlace = new InPlaceUpdate(ps); + + inPlace.render(makeMonitor("")); + ps.flush(); + + String output = baos.toString(); + + // VERTICES table renders 4 separators (before-header, after-header, before-footer, after-footer) + // With empty queueMetrics there should be exactly 4, not 5. + int count = StringUtils.countMatches(output, SEPARATOR); + assertEquals("With empty queue metrics, only 4 VERTICES-table separators should appear", 4, count); + } + + /** + * When queueMetrics() returns null, behaviour should be identical to empty. + */ + @Test + public void testNoExtraSeparatorWhenQueueMetricsNull() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + InPlaceUpdate inPlace = new InPlaceUpdate(ps); + + inPlace.render(makeMonitor(null)); + ps.flush(); + + String output = baos.toString(); + + int count = StringUtils.countMatches(output, SEPARATOR); + assertEquals("With null queue metrics, only 4 VERTICES-table separators should appear", 4, count); + } + + // ── Verify separator constant length ──────────────────────────────────────── + + @Test + public void testSeparatorLengthEqualsMinTerminalWidth() { + assertTrue("Separator should consist only of dashes", + SEPARATOR.matches("-{" + InPlaceUpdate.MIN_TERMINAL_WIDTH + "}")); + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java index 2f64ec41a58f..6e555ae4adc9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; @@ -89,6 +90,7 @@ public String toString() { HiveConf getConf(); TezClient getTezClient(); DAGClient submitDAG(DAG dag) throws TezException, IOException; + YarnClient getYarnClient(); boolean isOpen(); boolean isOpening(); boolean getDoAsEnabled(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index a473b32e8879..a7aff13bdcc4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.tez.client.TezClient; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; @@ -339,6 +340,11 @@ public TezClient getTezClient() { return baseSession.getTezClient(); } + @Override + public YarnClient getYarnClient() { + return baseSession.getYarnClient(); + } + @Override public DAGClient submitDAG(DAG dag) throws TezException, IOException { return baseSession.submitDAG(dag); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index b50c15cf9b63..bcca29be28e5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -73,6 +73,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; @@ -119,6 +120,7 @@ public class TezSessionState implements TezSession { Path tezScratchDir; protected LocalResource appJarLr; private TezClient session; + private YarnClient yarnClient; private Future sessionFuture; /** Console used for user feedback during async session opening. */ private LogHelper console; @@ -752,6 +754,17 @@ public void close(boolean keepDagFilesDir) throws Exception { closeClient(asyncSession); } } + + // Stop YarnClient if it was initialized + if (yarnClient != null) { + try { + LOG.info("Stopping YarnClient for session: {}", sessionId); + yarnClient.stop(); + yarnClient = null; + } catch (Exception e) { + LOG.warn("Error stopping YarnClient for session {}: {}", sessionId, e.getMessage()); + } + } } finally { try { cleanupScratchDir(); @@ -797,6 +810,19 @@ public String getSessionId() { protected final void setTezClient(TezClient session) { this.session = session; + + // Initialize YarnClient for queue metrics collection + if (session != null && yarnClient == null) { + try { + yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + LOG.info("YarnClient initialized for session: {}", sessionId); + } catch (Exception e) { + LOG.warn("Failed to initialize YarnClient for metrics collection", e); + yarnClient = null; + } + } } @Override @@ -822,6 +848,11 @@ public TezClient getTezClient() { return session; } + @Override + public YarnClient getYarnClient() { + return yarnClient; + } + @Override public DAGClient submitDAG(DAG dag) throws TezException, IOException { return getTezClient().submitDAG(dag); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java new file mode 100644 index 000000000000..7ee897be6f28 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/YarnQueueMetricsCollector.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueStatistics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Collects YARN queue resource metrics in the background using a scheduled executor. + * Provides thread-safe access to the latest metrics snapshot. + *

+ * Implements {@link AutoCloseable} to support try-with-resources for automatic + * resource cleanup. The {@link #close()} method is equivalent to {@link #shutdown()}. + */ +public class YarnQueueMetricsCollector implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(YarnQueueMetricsCollector.class); + private static final Random RANDOM = new Random(); + + private final YarnClient yarnClient; + private final String queueName; + private final ScheduledExecutorService executorService; + private final AtomicReference snapshotRef; + private final AtomicBoolean isShutdown; + + // Circuit breaker for handling repeated failures + private int consecutiveFailures = 0; + private static final int MAX_CONSECUTIVE_FAILURES = 5; + private static final int BACKOFF_THRESHOLD = 3; + + /** + * Creates a new metrics collector that immediately begins collecting queue metrics. + * + * @param yarnClient The YarnClient to use for querying queue info + * @param queueName The queue name to monitor + * @param refreshIntervalMs How often to refresh metrics in milliseconds + * @param queryId The query ID for thread naming + * @throws IllegalArgumentException if yarnClient or queueName is null + */ + public YarnQueueMetricsCollector(YarnClient yarnClient, String queueName, + long refreshIntervalMs, String queryId) { + if (yarnClient == null) { + throw new IllegalArgumentException("YarnClient cannot be null"); + } + if (queueName == null) { + throw new IllegalArgumentException("Queue name cannot be null"); + } + + this.yarnClient = yarnClient; + this.queueName = queueName; + this.snapshotRef = new AtomicReference<>(null); + this.isShutdown = new AtomicBoolean(false); + + // Create named daemon thread for metrics collection + this.executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("yarn-queue-metrics-collector-" + queryId) + .setDaemon(true) + .build() + ); + + try { + // Perform eager initial collection + collectMetrics(); + + // Add random jitter (0–20 % of refresh interval) to prevent thundering herd: + // when many queries start simultaneously they would otherwise all hit YARN RM + // at the same fixed intervals, causing load spikes. + // RANDOM.nextLong(1, 100) returns [1, 99], divide by 100 to get percentage, then multiply by 0.2 for 20% max. + long jitter = (long) (refreshIntervalMs * RANDOM.nextLong(1, 100) / 100.0 * 0.2); + long initialDelay = refreshIntervalMs + jitter; + + // Schedule periodic collection with jittered initial delay + executorService.scheduleWithFixedDelay( + () -> { + try { + collectMetrics(); + } catch (Exception e) { + LOG.error("Unexpected error in scheduled metrics collection for queue {}: {}", + queueName, e.getMessage(), e); + } + }, + initialDelay, + refreshIntervalMs, + TimeUnit.MILLISECONDS + ); + + LOG.info("Started YARN queue metrics collector for queue: {}, refresh interval: {}ms, initial delay: {}ms", + queueName, refreshIntervalMs, initialDelay); + } catch (IllegalArgumentException e) { + // scheduleWithFixedDelay rejects a zero or negative period; clean up and + // rethrow with context so the caller can log and skip gracefully. + executorService.shutdownNow(); + throw new IllegalArgumentException( + "Invalid refresh interval " + refreshIntervalMs + "ms for queue: " + queueName, e); + } catch (RuntimeException e) { + // Any other runtime failure during initialisation — clean up to prevent thread leak. + LOG.error("Failed to initialize metrics collector for queue {}, shutting down executor", + queueName, e); + executorService.shutdownNow(); + throw new IllegalStateException( + "Failed to initialize YARN queue metrics collector for queue: " + queueName, e); + } + } + + /** + * Checks if an exception is or was caused by an InterruptedException. + * + * @param e The exception to check + * @return true if the exception is an InterruptedException or has one as its cause + */ + private boolean isInterruptedException(Exception e) { + return e instanceof InterruptedException || e.getCause() instanceof InterruptedException; + } + + /** + * Collects queue metrics and updates the snapshot. + * Handles all exceptions gracefully by setting snapshot to null. + * Implements circuit breaker pattern to back off on repeated failures. + */ + private void collectMetrics() { + // Circuit breaker: Skip collection if too many consecutive failures + // This prevents hammering a struggling YARN ResourceManager + if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) { + if (consecutiveFailures == MAX_CONSECUTIVE_FAILURES) { + LOG.warn("Queue metrics collection has failed {} times consecutively for queue {}. " + + "Temporarily reducing collection attempts to avoid overloading YARN RM. " + + "Will retry periodically.", MAX_CONSECUTIVE_FAILURES, queueName); + consecutiveFailures++; // Increment to avoid repeated logging + } + // Still attempt collection occasionally, but skip most attempts + if (RANDOM.nextDouble() > 0.1) { // Only try 10% of the time + return; + } + } + + try { + QueueInfo queueInfo = yarnClient.getQueueInfo(queueName); + if (queueInfo != null) { + QueueMetricsSnapshot snapshot = new QueueMetricsSnapshot(queueInfo); + snapshotRef.set(snapshot); + + // Success - reset circuit breaker + if (consecutiveFailures > 0) { + LOG.info("Queue metrics collection recovered for queue {} after {} failures", + queueName, consecutiveFailures); + consecutiveFailures = 0; + } + + LOG.debug("Collected queue metrics for {}: memory={}/{} GB, vCores={}/{}", + queueName, snapshot.memoryUsedGB, snapshot.memoryTotalGB, + snapshot.vCoresUsed, snapshot.vCoresTotal); + } else { + // Null QueueInfo indicates queue doesn't exist or is inaccessible (e.g., wrong name, + // deleted queue, or permission denied). This is likely a configuration issue, not a + // transient failure, so we don't increment consecutiveFailures to avoid triggering + // the circuit breaker. The snapshot remains null, but we'll keep trying each interval. + LOG.warn("QueueInfo is null for queue: {} - queue may not exist or is inaccessible. " + + "Check queue name configuration.", queueName); + snapshotRef.set(null); + } + } catch (Exception e) { + if (isInterruptedException(e)) { + LOG.debug("Metrics collection interrupted for queue: {}", queueName); + Thread.currentThread().interrupt(); + // Don't increment failure counter or set snapshot to null, preserve last good state on interrupt + return; + } + + // Increment failure counter + consecutiveFailures++; + + // Log warnings for first few failures, then reduce logging frequency + if (consecutiveFailures <= BACKOFF_THRESHOLD) { + LOG.warn("Failed to collect queue metrics for queue {} (failure {} of {}): {}", + queueName, consecutiveFailures, MAX_CONSECUTIVE_FAILURES, e.getMessage()); + } else if (consecutiveFailures == MAX_CONSECUTIVE_FAILURES) { + LOG.warn("Queue metrics collection failing repeatedly for queue {} ({} consecutive failures). " + + "This may indicate YARN RM is under heavy load or unreachable.", + queueName, consecutiveFailures); + } + + LOG.debug("Full exception for queue metrics collection failure", e); + snapshotRef.set(null); + } + } + + /** + * Gets the latest metrics snapshot in a thread-safe manner. + * + * @return The latest snapshot, or null if collection failed or not yet completed + */ + public QueueMetricsSnapshot getLatestSnapshot() { + return snapshotRef.get(); + } + + /** + * Gets the queue name being monitored. + * + * @return The queue name + */ + public String getQueueName() { + return queueName; + } + + /** + * Closes this collector, releasing all resources. Equivalent to calling {@link #shutdown()}. + * Enables use with try-with-resources statements. + */ + @Override + public void close() { + shutdown(); + } + + /** + * Shuts down the metrics collector gracefully. + * This method is idempotent and thread-safe. + */ + public synchronized void shutdown() { + if (isShutdown.getAndSet(true)) { + return; // Already shut down + } + + LOG.info("Shutting down YARN queue metrics collector for queue: {}", queueName); + + try { + executorService.shutdownNow(); + boolean terminated = executorService.awaitTermination(5, TimeUnit.SECONDS); + if (!terminated) { + LOG.warn("Metrics collector for queue {} did not terminate within timeout", queueName); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while shutting down metrics collector for queue: {}", queueName); + Thread.currentThread().interrupt(); + } + } + + /** + * Immutable snapshot of queue metrics at a point in time. + */ + public static final class QueueMetricsSnapshot { + private final float memoryUsedGB; + private final float memoryTotalGB; + private final int vCoresUsed; + private final int vCoresTotal; + private final float capacityPercentage; + private final float currentCapacityPercentage; + private final int runningApps; + private final int pendingApps; + private final int allocatedContainers; + private final int pendingContainers; + private final long collectionTimestamp; + + /** + * Creates a snapshot from QueueInfo. + * + * @param queueInfo The queue info to extract metrics from + * @throws IllegalArgumentException if queueInfo is null + */ + public QueueMetricsSnapshot(QueueInfo queueInfo) { + if (queueInfo == null) { + throw new IllegalArgumentException("QueueInfo cannot be null"); + } + + this.collectionTimestamp = System.currentTimeMillis(); + + // Extract queue statistics with null-safe handling + QueueStatistics stats = queueInfo.getQueueStatistics(); + if (stats != null) { + // Convert memory from MB to GB. Total = Allocated + Available + this.memoryUsedGB = stats.getAllocatedMemoryMB() / 1024.0f; + this.memoryTotalGB = (stats.getAllocatedMemoryMB() + stats.getAvailableMemoryMB()) / 1024.0f; + this.vCoresUsed = (int) stats.getAllocatedVCores(); + this.vCoresTotal = (int) (stats.getAllocatedVCores() + stats.getAvailableVCores()); + this.runningApps = (int) stats.getNumAppsRunning(); + this.pendingApps = (int) stats.getNumAppsPending(); + this.allocatedContainers = (int) stats.getAllocatedContainers(); + this.pendingContainers = (int) stats.getPendingContainers(); + } else { + LOG.debug("QueueStatistics is null for queue, using zero values"); + this.memoryUsedGB = 0; + this.memoryTotalGB = 0; + this.vCoresUsed = 0; + this.vCoresTotal = 0; + this.runningApps = 0; + this.pendingApps = 0; + this.allocatedContainers = 0; + this.pendingContainers = 0; + } + + // Get capacity percentages + this.capacityPercentage = queueInfo.getCapacity() * 100; + this.currentCapacityPercentage = queueInfo.getCurrentCapacity() * 100; + } + + public float getMemoryUsedGB() { + return memoryUsedGB; + } + + public float getMemoryTotalGB() { + return memoryTotalGB; + } + + public int getVCoresUsed() { + return vCoresUsed; + } + + public int getVCoresTotal() { + return vCoresTotal; + } + + public float getCapacityPercentage() { + return capacityPercentage; + } + + public float getCurrentCapacityPercentage() { + return currentCapacityPercentage; + } + + public int getRunningApps() { + return runningApps; + } + + public int getPendingApps() { + return pendingApps; + } + + public int getAllocatedContainers() { + return allocatedContainers; + } + + public int getPendingContainers() { + return pendingContainers; + } + + public long getCollectionTimestamp() { + return collectionTimestamp; + } + + /** + * Gets the memory usage percentage as a formatted string. + * + * @return Formatted percentage like "50.25%" or "N/A" if total is zero + */ + public String getMemoryPercentage() { + if (memoryTotalGB > 0) { + return String.format("%.2f%%", (memoryUsedGB / memoryTotalGB) * 100); + } + return "N/A"; + } + + /** + * Gets the vCore usage percentage as a formatted string. + * + * @return Formatted percentage like "75.00%" or "N/A" if total is zero + */ + public String getVCoresPercentage() { + if (vCoresTotal > 0) { + return String.format("%.2f%%", ((float) vCoresUsed / vCoresTotal) * 100); + } + return "N/A"; + } + } +} + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java index 132489e4906a..0e8314cdaf53 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/RenderStrategy.java @@ -63,8 +63,9 @@ private abstract static class BaseUpdateFunction implements UpdateFunction { @Override public void update(DAGStatus status, Map vertexProgressMap) { - renderProgress(monitor.progressMonitor(status, vertexProgressMap)); - String report = getReport(vertexProgressMap); + ProgressMonitor progressMonitor = monitor.progressMonitor(status, vertexProgressMap); + renderProgress(progressMonitor); + String report = getReport(vertexProgressMap, progressMonitor); if (showReport(report)) { renderReport(report); lastReport = report; @@ -83,8 +84,10 @@ private boolean showReport(String report) { Map 1: 0(+1)/1 Reducer 2: 0/1 Map 1: 1/1 Reducer 2: 0(+1)/1 Map 1: 1/1 Reducer 2: 1/1 + When YARN queue metrics are available, they are appended after the vertex progress, e.g.: + Map 1: 1/1 Reducer 2: 1/1 QUEUE: default | MEMORY: 1.5/5.4 GB (27.78% used) | VCORES: 3/7 (42.86% used) | CAPACITY: 46.30% (used), 60.00% (allocated) | APPS: 1 running, 0 pending | CONTAINERS: 4 allocated, 0 pending */ - private String getReport(Map progressMap) { + private String getReport(Map progressMap, ProgressMonitor progressMonitor) { StringWriter reportBuffer = new StringWriter(); SortedSet keys = new TreeSet<>(progressMap.keySet()); @@ -134,6 +137,15 @@ private String getReport(Map progressMap) { } } + // Append queue metrics if available (only when metricsCollector is active) + if (progressMonitor != null) { + String queueMetrics = progressMonitor.queueMetrics(); + if (queueMetrics != null && !queueMetrics.isEmpty()) { + // Replace newlines with spaces so the whole report stays on a single log line + reportBuffer.append("\t").append(queueMetrics.replace("\n", " | ")); + } + } + return reportBuffer.toString(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 871258837af6..ca416cc25215 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.exec.tez.TezSession; import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager; import org.apache.hadoop.hive.ql.exec.tez.Utils; +import org.apache.hadoop.hive.ql.exec.tez.YarnQueueMetricsCollector; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; @@ -119,6 +121,7 @@ public static void initShutdownHook() { private final RenderStrategy.UpdateFunction updateFunction; // compile time tez counters private final TezCounters counters; + private YarnQueueMetricsCollector metricsCollector; public TezJobMonitor(TezSession session, List topSortedWorks, final DAGClient dagClient, HiveConf conf, DAG dag, Context ctx, final TezCounters counters, PerfLogger perfLogger) { @@ -134,6 +137,9 @@ public TezJobMonitor(TezSession session, List topSortedWorks, final DA this.counters = counters; this.shouldCollectSummaryString = conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_HISTORY_ENABLED) && conf.getBoolVar(ConfVars.HIVE_QUERY_HISTORY_EXEC_SUMMARY_ENABLED); + + // Initialize YARN queue metrics collector if enabled + this.metricsCollector = initializeMetricsCollector(); } private RenderStrategy.UpdateFunction updateFunction() { @@ -144,6 +150,52 @@ private RenderStrategy.UpdateFunction updateFunction() { : new RenderStrategy.LogToFileFunction(this, perfLogger); } + private YarnQueueMetricsCollector initializeMetricsCollector() { + // Get refresh interval - controls whether the feature is enabled. + // interval <= 0 means disabled (default is 0s = disabled). + long refreshInterval = HiveConf.getTimeVar(hiveConf, + ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, TimeUnit.MILLISECONDS); + + if (refreshInterval <= 0) { + LOG.debug("Queue metrics collection disabled (refresh interval: {}ms)", refreshInterval); + return null; + } + + try { + // Get YarnClient from session + YarnClient yarnClient = session.getYarnClient(); + if (yarnClient == null) { + LOG.warn("YarnClient not available, skipping queue metrics collection"); + return null; + } + + // Get queue name, default to "default" if not specified + String queueName = session.getQueueName(); + if (queueName == null || queueName.trim().isEmpty()) { + queueName = "default"; + LOG.info("Queue name not specified. For metrics monitoring, using 'default' as queue name"); + } + + // Validate minimum refresh interval (at least 1 second) + if (refreshInterval < 1000) { + LOG.warn("Queue metrics refresh interval {}ms is less than minimum 1000ms, using 1000ms", + refreshInterval); + refreshInterval = 1000; + } + + // Get query ID from DAG name + String queryId = dag.getName(); + + LOG.info("Initializing YARN queue metrics collector for queue: {}, refresh interval: {}ms", + queueName, refreshInterval); + + return new YarnQueueMetricsCollector(yarnClient, queueName, refreshInterval, queryId); + } catch (Exception e) { + LOG.warn("Unable to initialize YARN queue metrics collector", e); + return null; + } + } + private boolean isProfilingEnabled() { return HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || Utilities.isPerfOrAboveLogging(hiveConf); @@ -312,6 +364,16 @@ public int monitorExecution() { synchronized (shutdownList) { shutdownList.remove(dagClient); } + + // Shutdown metrics collector if it was initialized + if (metricsCollector != null) { + try { + metricsCollector.shutdown(); + } catch (Exception e) { + LOG.warn("Error shutting down queue metrics collector", e); + } + } + break; } } @@ -530,7 +592,7 @@ public String getDiagnostics() { ProgressMonitor progressMonitor(DAGStatus status, Map progressMap) { try { return new TezProgressMonitor(dagClient, status, topSortedWorks, progressMap, console, - executionStartTime); + executionStartTime, metricsCollector); } catch (IOException | TezException e) { console.printInfo("Getting Progress Information: " + e.getMessage() + " stack trace: " + ExceptionUtils.getStackTrace(e)); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java index 735442d2d1c8..e3f3b37806c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezProgressMonitor.java @@ -17,7 +17,10 @@ */ package org.apache.hadoop.hive.ql.exec.tez.monitoring; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.common.log.InPlaceUpdate; import org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.hadoop.hive.ql.exec.tez.YarnQueueMetricsCollector; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.tez.dag.api.TezException; @@ -40,10 +43,12 @@ public class TezProgressMonitor implements ProgressMonitor { private static final int COLUMN_1_WIDTH = 16; + private static final String QUEUE_UNAVAILABLE_MSG = "QUEUE: unavailable"; private final List topSortedWork; private final SessionState.LogHelper console; private final long executionStartTime; private final DAGStatus status; + private final YarnQueueMetricsCollector metricsCollector; Map vertexStatusMap = new HashMap<>(); Map progressCountsMap = new HashMap<>(); @@ -54,10 +59,18 @@ public class TezProgressMonitor implements ProgressMonitor { TezProgressMonitor(DAGClient dagClient, DAGStatus status, List topSortedWork, Map progressMap, SessionState.LogHelper console, long executionStartTime) throws IOException, TezException { + this(dagClient, status, topSortedWork, progressMap, console, executionStartTime, null); + } + + TezProgressMonitor(DAGClient dagClient, DAGStatus status, List topSortedWork, + Map progressMap, SessionState.LogHelper console, long executionStartTime, + YarnQueueMetricsCollector metricsCollector) + throws IOException, TezException { this.status = status; this.topSortedWork = topSortedWork; this.console = console; this.executionStartTime = executionStartTime; + this.metricsCollector = metricsCollector; for (Map.Entry entry : progressMap.entrySet()) { String vertexName = entry.getKey(); progressCountsMap.put(vertexName, new VertexProgress(entry.getValue(), status.getState())); @@ -327,6 +340,62 @@ public int hashCode() { } } + @Override + public String queueMetrics() { + if (metricsCollector == null) { + return ""; + } + + try { + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = metricsCollector.getLatestSnapshot(); + if (snapshot == null) { + return QUEUE_UNAVAILABLE_MSG; + } + + // Truncate queue name if too long (leave room for "QUEUE: " prefix) + String displayQueueName = metricsCollector.getQueueName(); + int maxQueueNameLength = InPlaceUpdate.MIN_TERMINAL_WIDTH - "QUEUE: ".length(); + if (displayQueueName.length() > maxQueueNameLength) { + displayQueueName = StringUtils.abbreviate(displayQueueName, maxQueueNameLength); + } + + // Line 1: Queue name + String lineQueueHeader = "QUEUE: " + displayQueueName; + + // Line 2: Memory + VCores (resource usage) + String lineQueueResources = String.format( + "MEMORY: %.1f/%.1f GB (%s used) | VCORES: %d/%d (%s used)", + snapshot.getMemoryUsedGB(), + snapshot.getMemoryTotalGB(), + snapshot.getMemoryPercentage(), + snapshot.getVCoresUsed(), + snapshot.getVCoresTotal(), + snapshot.getVCoresPercentage() + ); + + // Line 3: Capacity (current usage and allocated capacity) + String lineCapacity = String.format( + "CAPACITY: %.2f%% (used), %.2f%% (allocated)", + snapshot.getCurrentCapacityPercentage(), + snapshot.getCapacityPercentage() + ); + + // Line 4: Apps and Containers + String lineAppsAndContainers = String.format( + "APPS: %d running, %d pending | CONTAINERS: %d allocated, %d pending", + snapshot.getRunningApps(), + snapshot.getPendingApps(), + snapshot.getAllocatedContainers(), + snapshot.getPendingContainers() + ); + + return lineQueueHeader + "\n" + lineQueueResources + "\n" + lineCapacity + "\n" + lineAppsAndContainers; + } catch (Exception e) { + console.printInfo("Error formatting queue metrics: " + e.getMessage()); + return QUEUE_UNAVAILABLE_MSG; + } + } + public DAGStatus getStatus() { return status; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 53ee29743449..8b8836ec6d2a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -2251,6 +2251,11 @@ public String executionStatus() { public double progressedPercentage() { return percentage; } + + @Override + public String queueMetrics() { + return ""; + } }; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestYarnQueueMetricsCollector.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestYarnQueueMetricsCollector.java new file mode 100644 index 000000000000..898467f96ea0 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestYarnQueueMetricsCollector.java @@ -0,0 +1,504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez; + +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueStatistics; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mockingDetails; +import static org.mockito.Mockito.when; + + +/** + * Test cases for YarnQueueMetricsCollector. + */ +public class TestYarnQueueMetricsCollector { + + @Mock + private YarnClient mockYarnClient; + + @Mock + private QueueInfo mockQueueInfo; + + @Mock + private QueueStatistics mockQueueStats; + + private AutoCloseable closeable; + + private static final long WAIT_TIMEOUT_MS = 5000; + + @Before + public void setUp() { + closeable = MockitoAnnotations.openMocks(this); + } + + @After + public void tearDown() throws Exception { + if (closeable != null) { + closeable.close(); + } + } + + /** + * Waits for a snapshot to be available (non-null). + */ + private YarnQueueMetricsCollector.QueueMetricsSnapshot waitForSnapshot( + YarnQueueMetricsCollector collector, long timeoutMs) { + long startTime = System.currentTimeMillis(); + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot; + while ((snapshot = collector.getLatestSnapshot()) == null) { + if (System.currentTimeMillis() - startTime > timeoutMs) { + fail("Snapshot not available after " + timeoutMs + "ms"); + } + } + return snapshot; + } + + /** + * Waits for a specific number of invocations with timeout. + */ + private void waitForInvocationCount(Object mock, int minCount, long timeoutMs) { + long startTime = System.currentTimeMillis(); + while (mockingDetails(mock).getInvocations().size() < minCount) { + if (System.currentTimeMillis() - startTime > timeoutMs) { + return; // Don't fail, just return - test will check the count + } + } + } + + /** + * Helper method that configures mock objects with standard happy-path values + * for queue metrics collection tests. Tests that need specific values can + * override individual mocks after calling this method. + *

+ * Default values: + *

+ */ + private void setupHappyPathMocks() throws Exception { + when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(1024L); + when(mockQueueStats.getAvailableMemoryMB()).thenReturn(1024L); + when(mockQueueStats.getAllocatedVCores()).thenReturn(4L); + when(mockQueueStats.getAvailableVCores()).thenReturn(4L); + when(mockQueueStats.getNumAppsRunning()).thenReturn(1L); + when(mockQueueStats.getNumAppsPending()).thenReturn(0L); + when(mockQueueStats.getAllocatedContainers()).thenReturn(2L); + when(mockQueueStats.getPendingContainers()).thenReturn(0L); + when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats); + when(mockQueueInfo.getCapacity()).thenReturn(0.5f); + when(mockQueueInfo.getCurrentCapacity()).thenReturn(0.25f); + when(mockYarnClient.getQueueInfo(anyString())).thenReturn(mockQueueInfo); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorWithNullYarnClient() { + // Constructor throws before any resource is acquired - no try-with-resources needed + new YarnQueueMetricsCollector(null, "default", 1000, "query-1"); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructorWithNullQueueName() { + // Constructor throws before any resource is acquired - no try-with-resources needed + new YarnQueueMetricsCollector(mockYarnClient, null, 1000, "query-1"); + } + + @Test + public void testSuccessfulMetricsCollection() throws Exception { + setupHappyPathMocks(); + when(mockYarnClient.getQueueInfo("default")).thenReturn(mockQueueInfo); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 10000, "test-query-1")) { + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = waitForSnapshot(collector, WAIT_TIMEOUT_MS); + + assertNotNull("Snapshot should not be null", snapshot); + assertEquals("Memory used should be 1GB", 1.0f, snapshot.getMemoryUsedGB(), 0.1f); + assertEquals("Memory total should be 2GB (1+1)", 2.0f, snapshot.getMemoryTotalGB(), 0.1f); + assertEquals("VCores used should be 4", 4, snapshot.getVCoresUsed()); + assertEquals("VCores total should be 8 (4+4)", 8, snapshot.getVCoresTotal()); + assertEquals("Running apps should be 1", 1, snapshot.getRunningApps()); + assertEquals("Pending apps should be 0", 0, snapshot.getPendingApps()); + assertEquals("Allocated containers should be 2", 2, snapshot.getAllocatedContainers()); + assertEquals("Pending containers should be 0", 0, snapshot.getPendingContainers()); + assertEquals("Capacity should be 50%", 50.0f, snapshot.getCapacityPercentage(), 0.1f); + assertEquals("Current capacity should be 25%", 25.0f, snapshot.getCurrentCapacityPercentage(), 0.1f); + assertEquals("Memory percentage", "50.00%", snapshot.getMemoryPercentage()); + assertEquals("VCores percentage", "50.00%", snapshot.getVCoresPercentage()); + } + } + + @Test + public void testMetricsCollectionWithNullQueueInfo() throws Exception { + when(mockYarnClient.getQueueInfo("nonexistent")).thenReturn(null); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "nonexistent", 10000, "test-query-2")) { + // Constructor performs eager collection, snapshot should already be null + // (No need to wait since null QueueInfo means immediate null snapshot) + assertNull("Snapshot should be null for nonexistent queue", + collector.getLatestSnapshot()); + } + } + + @Test + public void testMetricsCollectionWithNullQueueStatistics() throws Exception { + when(mockQueueInfo.getQueueStatistics()).thenReturn(null); + when(mockQueueInfo.getCapacity()).thenReturn(0.5f); + when(mockQueueInfo.getCurrentCapacity()).thenReturn(0.0f); + when(mockYarnClient.getQueueInfo("default")).thenReturn(mockQueueInfo); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 10000, "test-query-3")) { + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = waitForSnapshot(collector, WAIT_TIMEOUT_MS); + + assertNotNull("Snapshot should not be null", snapshot); + assertEquals("Memory used should be 0", 0.0f, snapshot.getMemoryUsedGB(), 0.01f); + assertEquals("Memory total should be 0", 0.0f, snapshot.getMemoryTotalGB(), 0.01f); + assertEquals("VCores used should be 0", 0, snapshot.getVCoresUsed()); + assertEquals("VCores total should be 0", 0, snapshot.getVCoresTotal()); + assertEquals("Capacity should still be 50%", 50.0f, snapshot.getCapacityPercentage(), 0.1f); + assertEquals("Current capacity should be 0%", 0.0f, snapshot.getCurrentCapacityPercentage(), 0.1f); + } + } + + @Test + public void testPercentageCalculationWithZeroTotal() { + // Setup with zero totals + when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(0L); + when(mockQueueStats.getAvailableMemoryMB()).thenReturn(0L); + when(mockQueueStats.getAllocatedVCores()).thenReturn(0L); + when(mockQueueStats.getAvailableVCores()).thenReturn(0L); + when(mockQueueStats.getNumAppsRunning()).thenReturn(0L); + when(mockQueueStats.getNumAppsPending()).thenReturn(0L); + when(mockQueueStats.getAllocatedContainers()).thenReturn(0L); + when(mockQueueStats.getPendingContainers()).thenReturn(0L); + when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats); + when(mockQueueInfo.getCapacity()).thenReturn(0.0f); + when(mockQueueInfo.getCurrentCapacity()).thenReturn(0.0f); + + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = + new YarnQueueMetricsCollector.QueueMetricsSnapshot(mockQueueInfo); + + // Should return "N/A" for percentages when total is zero + assertEquals("Memory percentage should be N/A", "N/A", snapshot.getMemoryPercentage()); + assertEquals("VCores percentage should be N/A", "N/A", snapshot.getVCoresPercentage()); + } + + @Test + public void testShutdownIdempotency() throws Exception { + when(mockYarnClient.getQueueInfo("default")).thenReturn(mockQueueInfo); + + // Use try-with-resources; also call shutdown() explicitly inside to + // verify that repeated close()/shutdown() calls are safe (idempotent). + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 10000, "test-query-4")) { + collector.shutdown(); // explicit close 1 + collector.shutdown(); // explicit close 2 + // implicit close 3 happens at end of try block + assertTrue("Multiple shutdowns should be safe", true); + } + } + + @Test + public void testExceptionDuringCollection() throws Exception { + when(mockYarnClient.getQueueInfo("default")) + .thenThrow(new RuntimeException("RM unavailable")); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 10000, "test-query-5")) { + assertNull("Snapshot should be null after exception", + collector.getLatestSnapshot()); + } + } + + @Test + public void testQueueNameRetrieval() throws Exception { + when(mockYarnClient.getQueueInfo(anyString())).thenReturn(mockQueueInfo); + when(mockQueueInfo.getQueueStatistics()).thenReturn(null); + when(mockQueueInfo.getCapacity()).thenReturn(0.5f); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "production", 10000, "test-query-6")) { + assertEquals("Queue name should match", "production", collector.getQueueName()); + } + } + + @Test + public void testMemoryAndVCoreCalculation() { + // Test with specific values to verify calculation + when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(5120L); // 5GB used + when(mockQueueStats.getAvailableMemoryMB()).thenReturn(15360L); // 15GB available + when(mockQueueStats.getAllocatedVCores()).thenReturn(50L); + when(mockQueueStats.getAvailableVCores()).thenReturn(150L); + when(mockQueueStats.getNumAppsRunning()).thenReturn(3L); + when(mockQueueStats.getNumAppsPending()).thenReturn(2L); + when(mockQueueStats.getAllocatedContainers()).thenReturn(10L); + when(mockQueueStats.getPendingContainers()).thenReturn(7L); + when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats); + when(mockQueueInfo.getCapacity()).thenReturn(0.2f); // 20% + when(mockQueueInfo.getCurrentCapacity()).thenReturn(0.05f); // 5% + + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = + new YarnQueueMetricsCollector.QueueMetricsSnapshot(mockQueueInfo); + + // Total = Used + Available + assertEquals("Memory used", 5.0f, snapshot.getMemoryUsedGB(), 0.01f); + assertEquals("Memory total", 20.0f, snapshot.getMemoryTotalGB(), 0.01f); // 5+15 + assertEquals("Memory percentage", "25.00%", snapshot.getMemoryPercentage()); // 5/20 + + assertEquals("VCores used", 50, snapshot.getVCoresUsed()); + assertEquals("VCores total", 200, snapshot.getVCoresTotal()); // 50+150 + assertEquals("VCores percentage", "25.00%", snapshot.getVCoresPercentage()); // 50/200 + + assertEquals("Running apps", 3, snapshot.getRunningApps()); + assertEquals("Pending apps", 2, snapshot.getPendingApps()); + assertEquals("Allocated containers", 10, snapshot.getAllocatedContainers()); + assertEquals("Pending containers", 7, snapshot.getPendingContainers()); + assertEquals("Capacity", 20.0f, snapshot.getCapacityPercentage(), 0.01f); + assertEquals("Current capacity", 5.0f, snapshot.getCurrentCapacityPercentage(), 0.01f); + } + + @Test(expected = IllegalArgumentException.class) + public void testQueueMetricsSnapshotWithNullQueueInfo() { + new YarnQueueMetricsCollector.QueueMetricsSnapshot(null); + } + + // ------------------------------------------------------------------------- + // Tests for Issue #1: Jitter on initial delay (Thundering Herd prevention) + // ------------------------------------------------------------------------- + // Note: Jitter is implicitly tested by all tests that successfully create collectors. + // Explicit jitter distribution testing would require reflection to access private + // scheduling details, which is fragile and not worth the maintenance cost. + + @Test + public void testExecutorCleanupOnInitializationFailure() throws Exception { + when(mockYarnClient.getQueueInfo(anyString())) + .thenThrow(new RuntimeException("Simulated RM failure during init")); + + // Constructor wraps collection error - first call fails but shouldn't throw from constructor + // (collectMetrics swallows exceptions). This test verifies the try-catch guards are correct. + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 10000, "init-fail-query")) { + assertNull("Snapshot should be null after init failure", + collector.getLatestSnapshot()); + } + } + + // ------------------------------------------------------------------------- + // Tests for Issue #2: Circuit breaker for repeated failures + // ------------------------------------------------------------------------- + @Test + public void testCircuitBreakerActivatesAfterMaxFailures() throws Exception { + when(mockYarnClient.getQueueInfo(anyString())) + .thenThrow(new RuntimeException("YARN RM unavailable")); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 50, "circuit-breaker-query-1")) { + waitForInvocationCount(mockYarnClient, 6, 1000); + + assertNull("Snapshot should be null when circuit breaker active", + collector.getLatestSnapshot()); + + int callCount = mockingDetails(mockYarnClient).getInvocations().size(); + assertTrue("Circuit breaker should reduce calls (got " + callCount + ")", + callCount < 12); + } + } + + @Test + public void testCircuitBreakerResetsOnSuccess() throws Exception { + // Set up QueueStats and QueueInfo mocks first (without touching mockYarnClient.getQueueInfo) + when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(4096L); + when(mockQueueStats.getAvailableMemoryMB()).thenReturn(4096L); + when(mockQueueStats.getAllocatedVCores()).thenReturn(50L); + when(mockQueueStats.getAvailableVCores()).thenReturn(50L); + when(mockQueueStats.getNumAppsRunning()).thenReturn(2L); + when(mockQueueStats.getNumAppsPending()).thenReturn(1L); + when(mockQueueStats.getAllocatedContainers()).thenReturn(5L); + when(mockQueueStats.getPendingContainers()).thenReturn(5L); + when(mockQueueInfo.getQueueStatistics()).thenReturn(mockQueueStats); + when(mockQueueInfo.getCapacity()).thenReturn(0.3f); + when(mockQueueInfo.getCurrentCapacity()).thenReturn(0.2f); + + // Set up getQueueInfo to fail 5 times then succeed — this must be last to avoid being overwritten + when(mockYarnClient.getQueueInfo(anyString())) + .thenThrow(new RuntimeException("Temporary RM failure")) + .thenThrow(new RuntimeException("Temporary RM failure")) + .thenThrow(new RuntimeException("Temporary RM failure")) + .thenThrow(new RuntimeException("Temporary RM failure")) + .thenThrow(new RuntimeException("Temporary RM failure")) + .thenReturn(mockQueueInfo); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 30, "circuit-breaker-recovery-query")) { + waitForInvocationCount(mockYarnClient, 3, 200); + assertNull("Snapshot should be null after circuit breaker activates", + collector.getLatestSnapshot()); + + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = waitForSnapshot(collector, 2000); + assertNotNull("Snapshot should be populated after circuit breaker recovery", snapshot); + assertEquals("Memory used should be 4GB", 4.0f, snapshot.getMemoryUsedGB(), 0.1f); + } + } + + /** + * Verifies that the circuit breaker mechanism does not interfere with normal + * successful metrics collection. This test documents expected behavior: when + * YarnClient consistently returns valid data, the circuit breaker should never + * activate and collection should work without interruption. + *

+ * This test is part of the circuit breaker test suite (alongside failure and + * recovery scenarios) and specifically validates the happy path behavior. While + * it has similar assertions to testSuccessfulMetricsCollection(), its purpose + * is to document circuit breaker behavior in the happy path, not general + * functionality. + * + * @see #testSuccessfulMetricsCollection() for general happy path testing + * @see #testCircuitBreakerActivatesAfterMaxFailures() for failure scenarios + * @see #testCircuitBreakerResetsOnSuccess() for recovery scenarios + */ + @Test + public void testCircuitBreakerDoesNotAffectSuccessfulCollection() throws Exception { + setupHappyPathMocks(); + when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(2048L); + when(mockQueueStats.getAvailableMemoryMB()).thenReturn(2048L); + when(mockQueueStats.getAllocatedVCores()).thenReturn(20L); + when(mockQueueStats.getAvailableVCores()).thenReturn(20L); + when(mockQueueInfo.getCapacity()).thenReturn(0.1f); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 10000, "no-failures-query")) { + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = waitForSnapshot(collector, WAIT_TIMEOUT_MS); + + assertNotNull("Snapshot should be available with no failures", snapshot); + assertEquals("Memory used should be 2GB", 2.0f, snapshot.getMemoryUsedGB(), 0.1f); + assertEquals("VCores used should be 20", 20, snapshot.getVCoresUsed()); + } + } + + @Test + public void testNullQueueInfoDoesNotTriggerCircuitBreaker() throws Exception { + when(mockYarnClient.getQueueInfo(anyString())).thenReturn(null); + + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "nonexistent-queue", 50, "null-queueinfo-query")) { + waitForInvocationCount(mockYarnClient, 8, 800); + + assertNull("Snapshot should remain null for null QueueInfo", + collector.getLatestSnapshot()); + + int callCount = mockingDetails(mockYarnClient).getInvocations().size(); + assertTrue("Null QueueInfo should NOT trigger circuit breaker (got " + callCount + " calls)", + callCount >= 8); + } + } + + @Test + public void testSnapshotCollectionTimestampIsRecent() throws Exception { + setupHappyPathMocks(); + + long beforeCreate = System.currentTimeMillis(); + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", 10000, "timestamp-test")) { + YarnQueueMetricsCollector.QueueMetricsSnapshot snapshot = + waitForSnapshot(collector, WAIT_TIMEOUT_MS); + long afterCollect = System.currentTimeMillis(); + + assertNotNull("Snapshot should not be null", snapshot); + assertTrue("Timestamp should be >= creation time", + snapshot.getCollectionTimestamp() >= beforeCreate); + assertTrue("Timestamp should be <= current time", + snapshot.getCollectionTimestamp() <= afterCollect); + assertTrue("Timestamp should not be zero", snapshot.getCollectionTimestamp() > 0); + } + } + + @Test + public void testRefreshIntervalRespected() throws Exception { + setupHappyPathMocks(); + when(mockQueueStats.getAllocatedMemoryMB()).thenReturn(2048L); + when(mockQueueStats.getAvailableMemoryMB()).thenReturn(2048L); + when(mockQueueStats.getAllocatedVCores()).thenReturn(8L); + when(mockQueueStats.getAvailableVCores()).thenReturn(8L); + when(mockQueueStats.getNumAppsRunning()).thenReturn(2L); + when(mockQueueInfo.getCapacity()).thenReturn(0.6f); + + long intervalMs = 100; + try (YarnQueueMetricsCollector collector = new YarnQueueMetricsCollector( + mockYarnClient, "default", intervalMs, "refresh-interval-test")) { + waitForSnapshot(collector, WAIT_TIMEOUT_MS); + int callsAfterFirst = mockingDetails(mockYarnClient).getInvocations().size(); + + long toleranceMs = intervalMs + (long)(intervalMs * 0.2) + 300; + waitForInvocationCount(mockYarnClient, callsAfterFirst + 1, toleranceMs); + + int callsAfterWait = mockingDetails(mockYarnClient).getInvocations().size(); + assertTrue("At least one refresh should have occurred within interval + tolerance", + callsAfterWait > callsAfterFirst); + } + } + + @Test + public void testZeroRefreshIntervalIsRejected() throws Exception { + // scheduleWithFixedDelay requires a strictly positive period; zero throws + // IllegalArgumentException directly (not wrapped in a generic RuntimeException). + when(mockQueueInfo.getQueueStatistics()).thenReturn(null); + when(mockQueueInfo.getCapacity()).thenReturn(0.5f); + when(mockYarnClient.getQueueInfo(anyString())).thenReturn(mockQueueInfo); + + assertThrows(IllegalArgumentException.class, () -> + new YarnQueueMetricsCollector(mockYarnClient, "default", 0, "zero-interval-test")); + } + + @Test + public void testNegativeRefreshIntervalIsRejected() throws Exception { + // Negative interval is also rejected by scheduleWithFixedDelay and thrown directly + // as IllegalArgumentException (not wrapped in a generic RuntimeException). + when(mockQueueInfo.getQueueStatistics()).thenReturn(null); + when(mockQueueInfo.getCapacity()).thenReturn(0.5f); + when(mockYarnClient.getQueueInfo(anyString())).thenReturn(mockQueueInfo); + + assertThrows(IllegalArgumentException.class, () -> + new YarnQueueMetricsCollector(mockYarnClient, "default", -1000, "negative-interval-test")); + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezJobMonitorQueueMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezJobMonitorQueueMetrics.java new file mode 100644 index 000000000000..f2289261b5c8 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezJobMonitorQueueMetrics.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConfForTest; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.tez.TezSession; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.client.DAGClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test cases for TezJobMonitor queue metrics initialization. + */ +public class TestTezJobMonitorQueueMetrics { + + @Mock + private TezSession mockSession; + @Mock + private DAGClient mockDagClient; + @Mock + private DAG mockDag; + @Mock + private Context mockContext; + @Mock + private PerfLogger mockPerfLogger; + @Mock + private YarnClient mockYarnClient; + @Mock + private TezCounters mockCounters; + + private HiveConf hiveConf; + private List topSortedWorks; + private SessionState sessionState; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + hiveConf = new HiveConfForTest(TestTezJobMonitorQueueMetrics.class); + hiveConf.set("hive.security.authorization.manager", + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory"); + sessionState = SessionState.start(hiveConf); + topSortedWorks = new ArrayList<>(); + when(mockDag.getName()).thenReturn("test-dag-1"); + } + + @After + public void tearDown() throws Exception { + if (sessionState != null) { + sessionState.close(); + } + } + + @Test + public void testMetricsCollectorDisabledByDefault() throws Exception { + when(mockSession.getYarnClient()).thenReturn(mockYarnClient); + when(mockSession.getQueueName()).thenReturn("default"); + + TezJobMonitor monitor = + new TezJobMonitor(mockSession, topSortedWorks, mockDagClient, hiveConf, mockDag, mockContext, mockCounters, + mockPerfLogger); + + assertNotNull("Monitor should be created", monitor); + verify(mockSession, never()).getYarnClient(); + verify(mockYarnClient, never()).getQueueInfo(anyString()); + } + + @Test + public void testMetricsCollectorEnabledWithInterval() { + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, 10, TimeUnit.SECONDS); + + when(mockSession.getYarnClient()).thenReturn(mockYarnClient); + when(mockSession.getQueueName()).thenReturn("default"); + + TezJobMonitor monitor = + new TezJobMonitor(mockSession, topSortedWorks, mockDagClient, hiveConf, mockDag, mockContext, mockCounters, + mockPerfLogger); + + assertNotNull("Monitor should be created", monitor); + verify(mockSession, atLeastOnce()).getYarnClient(); + } + + @Test + public void testMetricsCollectorDisabledWithZeroInterval() throws Exception { + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, 0, TimeUnit.SECONDS); + + when(mockSession.getYarnClient()).thenReturn(mockYarnClient); + when(mockSession.getQueueName()).thenReturn("default"); + + TezJobMonitor monitor = + new TezJobMonitor(mockSession, topSortedWorks, mockDagClient, hiveConf, mockDag, mockContext, mockCounters, + mockPerfLogger); + + assertNotNull("Monitor should be created", monitor); + verify(mockSession, never()).getYarnClient(); + verify(mockYarnClient, never()).getQueueInfo(anyString()); + } + + @Test + public void testMetricsCollectorDisabledWithNegativeInterval() throws Exception { + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, -1, TimeUnit.SECONDS); + + when(mockSession.getYarnClient()).thenReturn(mockYarnClient); + when(mockSession.getQueueName()).thenReturn("default"); + + TezJobMonitor monitor = + new TezJobMonitor(mockSession, topSortedWorks, mockDagClient, hiveConf, mockDag, mockContext, mockCounters, + mockPerfLogger); + + assertNotNull("Monitor should be created", monitor); + verify(mockSession, never()).getYarnClient(); + verify(mockYarnClient, never()).getQueueInfo(anyString()); + } + + @Test + public void testMetricsCollectorWithSmallInterval() { + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, 500, TimeUnit.MILLISECONDS); + + when(mockSession.getYarnClient()).thenReturn(mockYarnClient); + when(mockSession.getQueueName()).thenReturn("default"); + + TezJobMonitor monitor = + new TezJobMonitor(mockSession, topSortedWorks, mockDagClient, hiveConf, mockDag, mockContext, mockCounters, + mockPerfLogger); + + assertNotNull("Monitor should be created with adjusted interval", monitor); + } + + @Test + public void testMetricsCollectorWithCustomQueue() { + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, 15, TimeUnit.SECONDS); + + when(mockSession.getYarnClient()).thenReturn(mockYarnClient); + when(mockSession.getQueueName()).thenReturn("production.analytics"); + + TezJobMonitor monitor = + new TezJobMonitor(mockSession, topSortedWorks, mockDagClient, hiveConf, mockDag, mockContext, mockCounters, + mockPerfLogger); + + verify(mockSession, atLeastOnce()).getQueueName(); + assertNotNull("Monitor should be created with custom queue", monitor); + } + + /** + * Tests that TezJobMonitor handles edge cases gracefully when metrics collection + * cannot be initialized due to null or invalid inputs. + *

+ * This test covers three edge cases in a single parameterized test to avoid + * code duplication (SonarQube rule java:S5976): + *

+ *

+ * All three cases share identical structure (metrics enabled with 10s interval, + * one edge-case input), so they are expressed as a single test with an inline + * parameter table. + * + *

+   * label            | yarnClient  | getQueueName()
+   * nullYarnClient   | null        | "default"
+   * nullQueueName    | mock        | null
+   * blankQueueName   | mock        | "  "
+   * 
+ */ + @Test + public void testMetricsCollectorWithEdgeCaseQueueNameOrYarnClient() { + Object[][] cases = { { "nullYarnClient", null, "default" }, { "nullQueueName", mockYarnClient, null }, + { "blankQueueName", mockYarnClient, " " }, }; + + for (Object[] row : cases) { + String label = (String) row[0]; + YarnClient yarnClient = (YarnClient) row[1]; + String queueName = (String) row[2]; + + // Re-initialise mocks so state from the previous row does not leak. + MockitoAnnotations.openMocks(this); + when(mockDag.getName()).thenReturn("test-dag-1"); + + hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_TEZ_QUEUE_METRICS_REFRESH_INTERVAL, 10, TimeUnit.SECONDS); + when(mockSession.getYarnClient()).thenReturn(yarnClient); + when(mockSession.getQueueName()).thenReturn(queueName); + + TezJobMonitor monitor = + new TezJobMonitor(mockSession, topSortedWorks, mockDagClient, hiveConf, mockDag, mockContext, mockCounters, + mockPerfLogger); + + assertNotNull("Monitor must be created for case [" + label + "]", monitor); + // With a positive interval the code must always reach the YarnClient gate. + verify(mockSession, atLeastOnce()).getYarnClient(); + // When a non-null YarnClient is present, getQueueName() must also be called. + if (yarnClient != null) { + verify(mockSession, atLeastOnce()).getQueueName(); + } + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitorQueueMetrics.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitorQueueMetrics.java new file mode 100644 index 000000000000..ac2b5039e490 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/monitoring/TestTezProgressMonitorQueueMetrics.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.ql.exec.tez.YarnQueueMetricsCollector; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.Progress; +import org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + + +/** + * Test cases for TezProgressMonitor queue metrics functionality. + */ +public class TestTezProgressMonitorQueueMetrics { + + @Mock + private DAGClient mockDagClient; + + @Mock + private DAGStatus mockDagStatus; + + @Mock + private YarnQueueMetricsCollector mockMetricsCollector; + + @Mock + private YarnQueueMetricsCollector.QueueMetricsSnapshot mockSnapshot; + + @Mock + private SessionState.LogHelper mockConsole; + + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + } + + @Test + public void testQueueMetricsWithNullCollector() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), null); + + String result = monitor.queueMetrics(); + + assertEquals("Should return empty string when collector is null", "", result); + } + + @Test + public void testQueueMetricsWithNullSnapshot() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(null); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + + assertEquals("Should return 'unavailable' when snapshot is null", + "QUEUE: unavailable", result); + } + + @Test + public void testQueueMetricsFormatting() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + // Setup snapshot with known values + long now = System.currentTimeMillis(); + when(mockSnapshot.getMemoryUsedGB()).thenReturn(8.5f); + when(mockSnapshot.getMemoryTotalGB()).thenReturn(16.0f); + when(mockSnapshot.getMemoryPercentage()).thenReturn("53.12%"); + when(mockSnapshot.getVCoresUsed()).thenReturn(100); + when(mockSnapshot.getVCoresTotal()).thenReturn(200); + when(mockSnapshot.getVCoresPercentage()).thenReturn("50.00%"); + when(mockSnapshot.getCapacityPercentage()).thenReturn(60.0f); + when(mockSnapshot.getCurrentCapacityPercentage()).thenReturn(25.0f); + when(mockSnapshot.getRunningApps()).thenReturn(5); + when(mockSnapshot.getPendingApps()).thenReturn(2); + when(mockSnapshot.getAllocatedContainers()).thenReturn(12); + when(mockSnapshot.getPendingContainers()).thenReturn(10); + when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 5000); // 5 seconds ago + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot); + when(mockMetricsCollector.getQueueName()).thenReturn("default"); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + + // Verify 4-line format (no staleness) + String[] lines = result.split("\n"); + assertEquals("Should have 4 lines", 4, lines.length); + + // Line 1: Queue name (no staleness) + assertTrue("Line 1 should contain queue name", lines[0].contains("QUEUE: default")); + assertFalse("Line 1 should NOT contain staleness", lines[0].contains("ago")); + + // Line 2: Memory + VCores with "used" label + assertTrue("Line 2 should contain memory info", lines[1].contains("MEMORY: 8.5/16.0 GB")); + assertTrue("Line 2 should contain 'used' label", lines[1].contains("53.12% used")); + assertTrue("Line 2 should contain vCores info", lines[1].contains("VCORES: 100/200")); + assertTrue("Line 2 should contain vCores 'used' label", lines[1].contains("50.00% used")); + + // Line 3: Capacity with (used) and (allocated) labels + assertTrue("Line 3 should contain capacity used", lines[2].contains("CAPACITY: 25.00% (used)")); + assertTrue("Line 3 should contain capacity allocated", lines[2].contains("60.00% (allocated)")); + + // Line 4: Apps and Containers + assertTrue("Line 4 should contain running apps", lines[3].contains("APPS: 5 running")); + assertTrue("Line 4 should contain pending apps", lines[3].contains("2 pending")); + assertTrue("Line 4 should contain allocated containers", lines[3].contains("CONTAINERS: 12 allocated")); + assertTrue("Line 4 should contain pending containers", lines[3].contains("10 pending")); + + } + + @Test + public void testQueueMetricsStalenessBeyond60Seconds() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + long now = System.currentTimeMillis(); + when(mockSnapshot.getMemoryUsedGB()).thenReturn(1.0f); + when(mockSnapshot.getMemoryTotalGB()).thenReturn(10.0f); + when(mockSnapshot.getMemoryPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getVCoresUsed()).thenReturn(10); + when(mockSnapshot.getVCoresTotal()).thenReturn(100); + when(mockSnapshot.getVCoresPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getCapacityPercentage()).thenReturn(50.0f); + when(mockSnapshot.getCurrentCapacityPercentage()).thenReturn(10.0f); + when(mockSnapshot.getRunningApps()).thenReturn(1); + when(mockSnapshot.getPendingApps()).thenReturn(0); + when(mockSnapshot.getAllocatedContainers()).thenReturn(2); + when(mockSnapshot.getPendingContainers()).thenReturn(0); + when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 120000); // 120 seconds ago + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot); + when(mockMetricsCollector.getQueueName()).thenReturn("default"); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + + String[] lines = result.split("\n"); + assertEquals("Should have 4 lines", 4, lines.length); + // Staleness is removed in new format, so line 1 should only have queue name + assertFalse("Line 1 should NOT show staleness (removed from new format)", lines[0].contains("ago")); + assertTrue("Line 1 should contain QUEUE: default", lines[0].contains("QUEUE: default")); + } + + @Test + public void testQueueNameTruncation() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + long now = System.currentTimeMillis(); + when(mockSnapshot.getMemoryUsedGB()).thenReturn(1.0f); + when(mockSnapshot.getMemoryTotalGB()).thenReturn(10.0f); + when(mockSnapshot.getMemoryPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getVCoresUsed()).thenReturn(10); + when(mockSnapshot.getVCoresTotal()).thenReturn(100); + when(mockSnapshot.getVCoresPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getCapacityPercentage()).thenReturn(50.0f); + when(mockSnapshot.getCurrentCapacityPercentage()).thenReturn(10.0f); + when(mockSnapshot.getRunningApps()).thenReturn(1); + when(mockSnapshot.getPendingApps()).thenReturn(0); + when(mockSnapshot.getAllocatedContainers()).thenReturn(2); + when(mockSnapshot.getPendingContainers()).thenReturn(0); + when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 1000); + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot); + // Very long queue name + when(mockMetricsCollector.getQueueName()).thenReturn( + "root.production.analytics.data-engineering.team-alpha.project-beta"); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + + String[] lines = result.split("\n"); + assertEquals("Should have 4 lines", 4, lines.length); + + // Line 1 should not exceed separator width (94 chars) - without staleness, more room for queue name + assertTrue("Line 1 should not exceed 94 characters", lines[0].length() <= 94); + + // When the full queue name would cause line 1 overflow, it should be truncated from start with "..." + if (lines[0].contains("...")) { + // Queue name was truncated + assertFalse("Full long queue name should not appear", + lines[0].contains("root.production.analytics.data-engineering.team-alpha.project-beta")); + assertTrue("Truncated queue name should contain ...", lines[0].contains("...")); + } + + // Line 1 should NOT contain staleness (removed in new format) + assertFalse("Line 1 should NOT contain staleness", lines[0].contains("ago")); + + // Line 2 should contain resource info + assertTrue("Line 2 should contain MEMORY", lines[1].contains("MEMORY:")); + + // Line 3 should contain capacity + assertTrue("Line 3 should contain CAPACITY", lines[2].contains("CAPACITY:")); + + } + + @Test + public void testQueueMetricsWithZeroPercentages() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + long now = System.currentTimeMillis(); + when(mockSnapshot.getMemoryUsedGB()).thenReturn(0.0f); + when(mockSnapshot.getMemoryTotalGB()).thenReturn(0.0f); + when(mockSnapshot.getMemoryPercentage()).thenReturn("N/A"); + when(mockSnapshot.getVCoresUsed()).thenReturn(0); + when(mockSnapshot.getVCoresTotal()).thenReturn(0); + when(mockSnapshot.getVCoresPercentage()).thenReturn("N/A"); + when(mockSnapshot.getCapacityPercentage()).thenReturn(50.0f); + when(mockSnapshot.getCurrentCapacityPercentage()).thenReturn(0.0f); + when(mockSnapshot.getRunningApps()).thenReturn(0); + when(mockSnapshot.getPendingApps()).thenReturn(0); + when(mockSnapshot.getAllocatedContainers()).thenReturn(0); + when(mockSnapshot.getPendingContainers()).thenReturn(0); + when(mockSnapshot.getCollectionTimestamp()).thenReturn(now); + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot); + when(mockMetricsCollector.getQueueName()).thenReturn("empty"); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + + String[] lines = result.split("\n"); + assertEquals("Should have 4 lines", 4, lines.length); + // Line 1: queue name (no staleness) + assertTrue("Line 1 should contain queue name", lines[0].contains("QUEUE: empty")); + assertFalse("Line 1 should NOT contain staleness", lines[0].contains("ago")); + // Line 2: memory + vcores with N/A + assertTrue("Line 2 should contain N/A for memory percentage", lines[1].contains("N/A")); + assertTrue("Line 2 should handle zero values", lines[1].contains("0.0/0.0 GB")); + // Line 3: capacity + assertTrue("Line 3 should contain capacity", lines[2].contains("CAPACITY:")); + // Line 4: apps and containers + assertTrue("Line 4 should contain APPS:", lines[3].contains("APPS:")); + assertTrue("Line 4 should contain CONTAINERS:", lines[3].contains("CONTAINERS:")); + } + + @Test + public void testQueueMetricsExceptionHandling() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenThrow( + new RuntimeException("Unexpected error")); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + + // Should gracefully handle exceptions and return unavailable + assertEquals("Should return unavailable on exception", + "QUEUE: unavailable", result); + } + + @Test + public void testStalenessAtExactly60Seconds() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + long now = System.currentTimeMillis(); + when(mockSnapshot.getMemoryUsedGB()).thenReturn(1.0f); + when(mockSnapshot.getMemoryTotalGB()).thenReturn(10.0f); + when(mockSnapshot.getMemoryPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getVCoresUsed()).thenReturn(10); + when(mockSnapshot.getVCoresTotal()).thenReturn(100); + when(mockSnapshot.getVCoresPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getCapacityPercentage()).thenReturn(50.0f); + when(mockSnapshot.getCurrentCapacityPercentage()).thenReturn(25.0f); + when(mockSnapshot.getRunningApps()).thenReturn(1); + when(mockSnapshot.getPendingApps()).thenReturn(0); + when(mockSnapshot.getAllocatedContainers()).thenReturn(2); + when(mockSnapshot.getPendingContainers()).thenReturn(0); + when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 60000L); // exactly 60s ago + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot); + when(mockMetricsCollector.getQueueName()).thenReturn("default"); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + String[] lines = result.split("\n"); + + assertEquals("Should have 4 lines", 4, lines.length); + // Staleness is removed in new format + assertFalse("Should NOT show staleness (removed in new format)", lines[0].contains("ago")); + assertTrue("Line 1 should contain queue name", lines[0].contains("QUEUE: default")); + } + + @Test + public void testStalenessAtZeroSeconds() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + long now = System.currentTimeMillis(); + when(mockSnapshot.getMemoryUsedGB()).thenReturn(2.0f); + when(mockSnapshot.getMemoryTotalGB()).thenReturn(10.0f); + when(mockSnapshot.getMemoryPercentage()).thenReturn("20.00%"); + when(mockSnapshot.getVCoresUsed()).thenReturn(5); + when(mockSnapshot.getVCoresTotal()).thenReturn(50); + when(mockSnapshot.getVCoresPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getCapacityPercentage()).thenReturn(60.0f); + when(mockSnapshot.getCurrentCapacityPercentage()).thenReturn(30.0f); + when(mockSnapshot.getRunningApps()).thenReturn(2); + when(mockSnapshot.getPendingApps()).thenReturn(0); + when(mockSnapshot.getAllocatedContainers()).thenReturn(4); + when(mockSnapshot.getPendingContainers()).thenReturn(0); + when(mockSnapshot.getCollectionTimestamp()).thenReturn(now); // right now + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot); + when(mockMetricsCollector.getQueueName()).thenReturn("default"); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + String[] lines = result.split("\n"); + + assertEquals("Should have 4 lines", 4, lines.length); + // Staleness is removed in new format + assertFalse("Should NOT show staleness (removed in new format)", lines[0].contains("ago")); + assertTrue("Line 1 should contain queue name", lines[0].contains("QUEUE: default")); + } + + @Test + public void testQueueNameExactlyAtMaxLength() throws Exception { + List works = new ArrayList<>(); + Map progressMap = new HashMap<>(); + + long now = System.currentTimeMillis(); + when(mockSnapshot.getMemoryUsedGB()).thenReturn(1.0f); + when(mockSnapshot.getMemoryTotalGB()).thenReturn(10.0f); + when(mockSnapshot.getMemoryPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getVCoresUsed()).thenReturn(1); + when(mockSnapshot.getVCoresTotal()).thenReturn(10); + when(mockSnapshot.getVCoresPercentage()).thenReturn("10.00%"); + when(mockSnapshot.getCapacityPercentage()).thenReturn(40.0f); + when(mockSnapshot.getCurrentCapacityPercentage()).thenReturn(20.0f); + when(mockSnapshot.getRunningApps()).thenReturn(0); + when(mockSnapshot.getPendingApps()).thenReturn(0); + when(mockSnapshot.getAllocatedContainers()).thenReturn(0); + when(mockSnapshot.getPendingContainers()).thenReturn(0); + when(mockSnapshot.getCollectionTimestamp()).thenReturn(now - 2000); // not used since staleness removed + + when(mockDagStatus.getState()).thenReturn(DAGStatus.State.RUNNING); + when(mockMetricsCollector.getLatestSnapshot()).thenReturn(mockSnapshot); + + // Build a queue name that exactly fills the allowed space: + // Line 1 budget: MIN_TERMINAL_WIDTH (94) - "QUEUE: ".length(7) = 87 (no staleness) + int maxLen = InPlaceUpdate.MIN_TERMINAL_WIDTH - "QUEUE: ".length(); + String exactName = "q".repeat(maxLen); // exactly maxLen characters + when(mockMetricsCollector.getQueueName()).thenReturn(exactName); + + TezProgressMonitor monitor = new TezProgressMonitor( + mockDagClient, mockDagStatus, works, progressMap, mockConsole, + System.currentTimeMillis(), mockMetricsCollector); + + String result = monitor.queueMetrics(); + String[] lines = result.split("\n"); + + assertEquals("Should have 4 lines", 4, lines.length); + // Exactly at max length — should NOT be truncated + assertFalse("Queue name at exact max length should not be truncated", + lines[0].contains("...")); + assertTrue("Full queue name should appear", lines[0].contains(exactName)); + assertTrue("Line 1 should still be within terminal width", + lines[0].length() <= InPlaceUpdate.MIN_TERMINAL_WIDTH); + // No staleness in new format + assertFalse("Should NOT show staleness", lines[0].contains("ago")); + } +} + diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index a399e66445c8..c4a53f9b71a0 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -1286,6 +1286,7 @@ struct TProgressUpdateResp { 4: required TJobExecutionStatus status 5: required string footerSummary 6: required i64 startTime + 7: optional string queueMetrics } struct TGetQueryIdReq { diff --git a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java index 8b2f6a21629e..ef787a15d080 100644 --- a/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java +++ b/service-rpc/src/gen/thrift/gen-javabean/org/apache/hive/service/rpc/thrift/TProgressUpdateResp.java @@ -17,6 +17,7 @@ private static final org.apache.thrift.protocol.TField STATUS_FIELD_DESC = new org.apache.thrift.protocol.TField("status", org.apache.thrift.protocol.TType.I32, (short)4); private static final org.apache.thrift.protocol.TField FOOTER_SUMMARY_FIELD_DESC = new org.apache.thrift.protocol.TField("footerSummary", org.apache.thrift.protocol.TType.STRING, (short)5); private static final org.apache.thrift.protocol.TField START_TIME_FIELD_DESC = new org.apache.thrift.protocol.TField("startTime", org.apache.thrift.protocol.TType.I64, (short)6); + private static final org.apache.thrift.protocol.TField QUEUE_METRICS_FIELD_DESC = new org.apache.thrift.protocol.TField("queueMetrics", org.apache.thrift.protocol.TType.STRING, (short)7); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new TProgressUpdateRespStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new TProgressUpdateRespTupleSchemeFactory(); @@ -27,6 +28,7 @@ private @org.apache.thrift.annotation.Nullable TJobExecutionStatus status; // required private @org.apache.thrift.annotation.Nullable java.lang.String footerSummary; // required private long startTime; // required + private @org.apache.thrift.annotation.Nullable java.lang.String queueMetrics; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -39,7 +41,8 @@ public enum _Fields implements org.apache.thrift.TFieldIdEnum { */ STATUS((short)4, "status"), FOOTER_SUMMARY((short)5, "footerSummary"), - START_TIME((short)6, "startTime"); + START_TIME((short)6, "startTime"), + QUEUE_METRICS((short)7, "queueMetrics"); private static final java.util.Map byName = new java.util.HashMap(); @@ -67,6 +70,8 @@ public static _Fields findByThriftId(int fieldId) { return FOOTER_SUMMARY; case 6: // START_TIME return START_TIME; + case 7: // QUEUE_METRICS + return QUEUE_METRICS; default: return null; } @@ -129,6 +134,8 @@ public java.lang.String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.START_TIME, new org.apache.thrift.meta_data.FieldMetaData("startTime", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.QUEUE_METRICS, new org.apache.thrift.meta_data.FieldMetaData("queueMetrics", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TProgressUpdateResp.class, metaDataMap); } @@ -152,7 +159,6 @@ public TProgressUpdateResp( this.status = status; this.footerSummary = footerSummary; this.startTime = startTime; - setStartTimeIsSet(true); } /** @@ -180,6 +186,9 @@ public TProgressUpdateResp(TProgressUpdateResp other) { this.footerSummary = other.footerSummary; } this.startTime = other.startTime; + if (other.isSetQueueMetrics()) { + this.queueMetrics = other.queueMetrics; + } } public TProgressUpdateResp deepCopy() { @@ -196,6 +205,7 @@ public void clear() { this.footerSummary = null; setStartTimeIsSet(false); this.startTime = 0; + this.queueMetrics = null; } public int getHeaderNamesSize() { @@ -378,6 +388,30 @@ public void setStartTimeIsSet(boolean value) { __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __STARTTIME_ISSET_ID, value); } + @org.apache.thrift.annotation.Nullable + public java.lang.String getQueueMetrics() { + return this.queueMetrics; + } + + public void setQueueMetrics(@org.apache.thrift.annotation.Nullable java.lang.String queueMetrics) { + this.queueMetrics = queueMetrics; + } + + public void unsetQueueMetrics() { + this.queueMetrics = null; + } + + /** Returns true if field queueMetrics is set (has been assigned a value) and false otherwise */ + public boolean isSetQueueMetrics() { + return this.queueMetrics != null; + } + + public void setQueueMetricsIsSet(boolean value) { + if (!value) { + this.queueMetrics = null; + } + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case HEADER_NAMES: @@ -428,6 +462,14 @@ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable } break; + case QUEUE_METRICS: + if (value == null) { + unsetQueueMetrics(); + } else { + setQueueMetrics((java.lang.String)value); + } + break; + } } @@ -452,6 +494,9 @@ public java.lang.Object getFieldValue(_Fields field) { case START_TIME: return getStartTime(); + case QUEUE_METRICS: + return getQueueMetrics(); + } throw new java.lang.IllegalStateException(); } @@ -475,6 +520,8 @@ public boolean isSet(_Fields field) { return isSetFooterSummary(); case START_TIME: return isSetStartTime(); + case QUEUE_METRICS: + return isSetQueueMetrics(); } throw new java.lang.IllegalStateException(); } @@ -546,6 +593,15 @@ public boolean equals(TProgressUpdateResp that) { return false; } + boolean this_present_queueMetrics = true && this.isSetQueueMetrics(); + boolean that_present_queueMetrics = true && that.isSetQueueMetrics(); + if (this_present_queueMetrics || that_present_queueMetrics) { + if (!(this_present_queueMetrics && that_present_queueMetrics)) + return false; + if (!this.queueMetrics.equals(that.queueMetrics)) + return false; + } + return true; } @@ -573,6 +629,10 @@ public int hashCode() { hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(startTime); + hashCode = hashCode * 8191 + ((isSetQueueMetrics()) ? 131071 : 524287); + if (isSetQueueMetrics()) + hashCode = hashCode * 8191 + queueMetrics.hashCode(); + return hashCode; } @@ -851,6 +911,14 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, TProgressUpdateResp org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // QUEUE_METRICS + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.queueMetrics = iprot.readString(); + struct.setQueueMetricsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -911,6 +979,11 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, TProgressUpdateRes oprot.writeFieldBegin(START_TIME_FIELD_DESC); oprot.writeI64(struct.startTime); oprot.writeFieldEnd(); + if (struct.queueMetrics != null) { + oprot.writeFieldBegin(QUEUE_METRICS_FIELD_DESC); + oprot.writeString(struct.queueMetrics); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } diff --git a/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java index 332ef2dace64..987a76b1f105 100644 --- a/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java +++ b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java @@ -28,21 +28,23 @@ public class JobProgressUpdate { private final List headers; private final List> rows; public final String status; + private final String queueMetrics; JobProgressUpdate(ProgressMonitor monitor) { this(monitor.headers(), monitor.rows(), monitor.footerSummary(), monitor.progressedPercentage(), - monitor.startTime(), monitor.executionStatus()); + monitor.startTime(), monitor.executionStatus(), monitor.queueMetrics()); } private JobProgressUpdate(List headers, List> rows, String footerSummary, - double progressedPercentage, long startTimeMillis, String status) { + double progressedPercentage, long startTimeMillis, String status, String queueMetrics) { this.progressedPercentage = progressedPercentage; this.footerSummary = footerSummary; this.startTimeMillis = startTimeMillis; this.headers = headers; this.rows = rows; this.status = status; + this.queueMetrics = queueMetrics; } public List headers() { @@ -52,4 +54,8 @@ public List headers() { public List> rows() { return rows; } + + public String queueMetrics() { + return queueMetrics; + } } diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index ccf576fe50d3..6621bb62996d 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -813,14 +813,27 @@ public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) th } TJobExecutionStatus executionStatus = mapper.forStatus(progressUpdate.status); - resp.setProgressUpdateResponse(new TProgressUpdateResp( + TProgressUpdateResp tProgressUpdateResp = new TProgressUpdateResp( progressUpdate.headers(), progressUpdate.rows(), progressUpdate.progressedPercentage, executionStatus, progressUpdate.footerSummary, progressUpdate.startTimeMillis - )); + ); + // HIVE-27126: Workaround for Thrift code generation bug. + // When queueMetrics field was added and Thrift code regenerated, the generated constructor + // accepts startTimeMillis parameter but FAILS to call setStartTimeIsSet(true). + // Without the isset flag, Thrift serialization treats startTime as unset, causing: + // 1) Clients receive incomplete/invalid progress updates + // 2) Thrift validation may fail or skip the field entirely + // Solution: Explicitly call setStartTime() after construction to set the isset flag. + // This ensures proper serialization and backward compatibility with Thrift clients. + tProgressUpdateResp.setStartTime(progressUpdate.startTimeMillis); + if (progressUpdate.queueMetrics() != null && !progressUpdate.queueMetrics().isEmpty()) { + tProgressUpdateResp.setQueueMetrics(progressUpdate.queueMetrics()); + } + resp.setProgressUpdateResponse(tProgressUpdateResp); if (opException != null) { resp.setSqlState(opException.getSQLState()); resp.setErrorCode(opException.getErrorCode());