diff --git a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json index bbb4d7b11a55..bfd4d667363b 100644 --- a/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json +++ b/extensions-contrib/prometheus-emitter/src/main/resources/defaultMetrics.json @@ -150,9 +150,14 @@ "segment/nuked/bytes" : { "dimensions" : ["dataSource", "taskType"], "type" : "count", "help": "Size in bytes of segments deleted via the Kill Task." }, "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of successful tasks per emission period."}, "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of failed tasks per emission period."}, - "task/running/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current running tasks."}, + "task/running/count" : { "dimensions" : ["dataSource", "category"], "type" : "count", "help": "Number of current running tasks."}, "task/pending/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current pending tasks."}, "task/waiting/count" : { "dimensions" : ["dataSource"], "type" : "count", "help": "Number of current waiting tasks."}, + "taskSlot/total/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of total task slots per emission period."}, + "taskSlot/idle/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of idle task slots per emission period."}, + "taskSlot/used/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of busy task slots per emission period."}, + "taskSlot/lazy/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of total task slots in lazy marked Middle Managers and Indexers per emission period."}, + "taskSlot/blacklisted/count" : { "dimensions" : ["category"], "type" : "gauge", "help": "Number of total task slots in blacklisted Middle Managers and Indexers per emission period."}, "supervisor/count" : { "dimensions" : ["supervisorId", "type", "state", "detailedState"], "type" : "gauge", "help": "Count of active supervisors. Each supervisor emits 1, tagged with its state. Available only if the SupervisorStatsMonitor module is included."}, "segment/assigned/count" : { "dimensions" : ["tier"], "type" : "count", "help": "Number of segments assigned to be loaded in the cluster."}, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index db17c0ed8662..5d34662f7431 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -32,6 +32,7 @@ import org.apache.druid.server.metrics.TaskSlotCountStatsProvider; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -183,6 +184,17 @@ public Map getRunningTaskCount() Optional taskQueue = getTaskQueue(); if (taskQueue.isPresent()) { return taskQueue.get().getRunningTaskCount(); + } else { + return Collections.emptyMap(); + } + } + + @Override + public Map> getRunningTaskCountByCategory() + { + Optional taskQueue = getTaskQueue(); + if (taskQueue.isPresent()) { + return taskQueue.get().getRunningTaskCountByCategory(); } else { return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 405e6f8f60dc..73465adc6524 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -947,6 +947,26 @@ public Map getRunningTaskCount() )); } + public Map> getRunningTaskCountByCategory() + { + final Map> byCategory = + taskRunner.getRunningTasksByCategory(); + if (byCategory.isEmpty()) { + return null; + } + final Map taskDatasources = getCurrentTaskDatasources(); + final Map> result = new HashMap<>(); + byCategory.forEach((category, items) -> { + final Map countsByKey = new HashMap<>(); + for (final TaskRunnerWorkItem item : items) { + final RowKey key = taskDatasources.getOrDefault(item.getTaskId(), RowKey.empty()); + countsByKey.merge(key, 1L, Long::sum); + } + result.put(category, countsByKey); + }); + return result; + } + public Map getPendingTaskCount() { final Map taskDatasources = getCurrentTaskDatasources(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 2178bc433dfd..af2f3d58a769 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -111,6 +112,10 @@ default RunnerTaskState getRunnerTaskState(String taskId) return null; } + default Map> getRunningTasksByCategory() + { + return Collections.emptyMap(); + } default TaskLocation getTaskLocation(String taskId) { return TaskLocation.unknown(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index dcf4bdf266ec..875bd42104cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1701,6 +1701,19 @@ public int getTotalCapacity() return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); } + @Override + @SuppressWarnings("GuardedBy") // Read on tasks is safe + public Map> getRunningTasksByCategory() + { + final Map> grouped = new HashMap<>(); + for (final HttpRemoteTaskRunnerWorkItem item : tasks.values()) { + if (item.getState() == HttpRemoteTaskRunnerWorkItem.State.RUNNING && item.getWorker() != null) { + grouped.computeIfAbsent(item.getWorker().getCategory(), k -> new ArrayList<>()).add(item); + } + } + return new HashMap<>(grouped); + } + /** * Retrieves the maximum capacity of the task runner when autoscaling is enabled.* diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java index f94deb562bf5..bf5f99c47cd4 100644 --- a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java +++ b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java @@ -56,6 +56,17 @@ public interface TaskCountStatsProvider @Deprecated Map getWaitingTaskCount(); + /** + * Return the number of current running tasks grouped by workerCategory, then by datasource and task type. + * The outer key is the worker category string; the inner map mirrors the structure of + * {@link #getRunningTaskCount()}. Returns null if the underlying implementation does not support + * per-category grouping (e.g., when not using a worker-pool-based task runner). + */ + default Map> getRunningTaskCountByCategory() + { + return null; + } + /** * Collects all task level stats. This method deprecates the other task stats * methods such as {@link #getPendingTaskCount()}, {@link #getWaitingTaskCount()}