Skip to content

Commit a5de797

Browse files
authored
#AI commit# 开发阶段:新增功能 - 添加批量获取队列资源功能支持 (#935)
1 parent efe408b commit a5de797

5 files changed

Lines changed: 268 additions & 2 deletions

File tree

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/ExternalResourceService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ List<ExternalAppInfo> getAppInfo(
5050
ExternalResourceIdentifier identifier)
5151
throws RMErrorException;
5252

53+
Map<String, NodeResource> getBatchResource(
54+
ResourceType resourceType,
55+
RMLabelContainer labelContainer,
56+
List<ExternalResourceIdentifier> identifiers)
57+
throws RMErrorException;
58+
5359
ExternalResourceProvider chooseProvider(
5460
ResourceType resourceType, RMLabelContainer labelContainer) throws RMErrorException;
5561
}

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.linkis.manager.rm.external.service.impl;
1919

2020
import org.apache.linkis.manager.common.conf.RMConfiguration;
21+
import org.apache.linkis.manager.common.entity.resource.CommonNodeResource;
2122
import org.apache.linkis.manager.common.entity.resource.NodeResource;
2223
import org.apache.linkis.manager.common.entity.resource.ResourceType;
2324
import org.apache.linkis.manager.common.exception.RMErrorException;
@@ -33,6 +34,8 @@
3334
import org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser;
3435
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
3536
import org.apache.linkis.manager.rm.external.service.ExternalResourceService;
37+
import org.apache.linkis.manager.rm.external.yarn.YarnQueueInfo;
38+
import org.apache.linkis.manager.rm.external.yarn.YarnResourceIdentifier;
3639
import org.apache.linkis.manager.rm.external.yarn.YarnResourceRequester;
3740
import org.apache.linkis.manager.rm.utils.RMUtils;
3841

@@ -44,11 +47,13 @@
4447

4548
import java.net.ConnectException;
4649
import java.text.MessageFormat;
50+
import java.util.HashMap;
4751
import java.util.List;
4852
import java.util.Map;
4953
import java.util.concurrent.ExecutionException;
5054
import java.util.concurrent.TimeUnit;
5155
import java.util.function.Function;
56+
import java.util.stream.Collectors;
5257

5358
import com.fasterxml.jackson.core.JsonParseException;
5459
import com.google.common.cache.CacheBuilder;
@@ -141,6 +146,60 @@ public List<ExternalAppInfo> getAppInfo(
141146
return appInfos;
142147
}
143148

149+
@Override
150+
public Map<String, NodeResource> getBatchResource(
151+
ResourceType resourceType,
152+
RMLabelContainer labelContainer,
153+
List<ExternalResourceIdentifier> identifiers)
154+
throws RMErrorException {
155+
ExternalResourceProvider provider = chooseProvider(resourceType, labelContainer);
156+
ExternalResourceRequester externalResourceRequester = getRequester(resourceType);
157+
158+
if (externalResourceRequester instanceof YarnResourceRequester) {
159+
YarnResourceRequester yarnRequester = (YarnResourceRequester) externalResourceRequester;
160+
List<String> queueNames =
161+
identifiers.stream()
162+
.map(id -> ((YarnResourceIdentifier) id).getQueueName())
163+
.collect(Collectors.toList());
164+
165+
Map<String, YarnQueueInfo> batchResources =
166+
(Map<String, YarnQueueInfo>)
167+
retry(
168+
RMConfiguration.EXTERNAL_RETRY_NUM.getValue(),
169+
(i) ->
170+
yarnRequester.getBatchResources(
171+
yarnRequester.getAndUpdateActiveRmWebAddress(provider),
172+
queueNames,
173+
provider),
174+
(i) -> yarnRequester.reloadExternalResourceAddress(provider));
175+
176+
Map<String, NodeResource> result = new HashMap<>();
177+
batchResources.forEach(
178+
(queueName, queueInfo) -> {
179+
CommonNodeResource nodeResource = new CommonNodeResource();
180+
nodeResource.setMaxResource(queueInfo.getMaxResource());
181+
nodeResource.setUsedResource(queueInfo.getUsedResource());
182+
nodeResource.setMaxApps(queueInfo.getMaxApps());
183+
nodeResource.setNumPendingApps(queueInfo.getNumPendingApps());
184+
nodeResource.setNumActiveApps(queueInfo.getNumActiveApps());
185+
result.put(queueName, nodeResource);
186+
});
187+
return result;
188+
} else {
189+
// For other resource types, fall back to individual requests
190+
Map<String, NodeResource> result = new HashMap<>();
191+
for (ExternalResourceIdentifier identifier : identifiers) {
192+
try {
193+
NodeResource resource = getResource(resourceType, labelContainer, identifier);
194+
result.put(((YarnResourceIdentifier) identifier).getQueueName(), resource);
195+
} catch (Exception e) {
196+
logger.error("Failed to get resource for identifier " + identifier, e);
197+
}
198+
}
199+
return result;
200+
}
201+
}
202+
144203
private Object retry(int retryNum, Function function, Function reloadExternalAddress)
145204
throws RMErrorException {
146205
int times = 0;

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,31 @@ public YarnQueueInfo getResources(
218218
String queueName,
219219
ExternalResourceProvider provider) {
220220
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
221+
return getResourcesFromResponse(resp, realQueueName, queueName, provider);
222+
}
223+
224+
public Map<String, YarnQueueInfo> getBatchResources(
225+
String rmWebAddress, List<String> queueNames, ExternalResourceProvider provider) {
226+
JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
227+
Map<String, YarnQueueInfo> queueInfoMap = new HashMap<>();
228+
for (String queueName : queueNames) {
229+
try {
230+
String realQueueName = queuePrefix + queueName;
231+
if (queueName.startsWith(queuePrefix)) {
232+
realQueueName = queueName;
233+
}
234+
YarnQueueInfo queueInfo =
235+
getResourcesFromResponse(resp, realQueueName, queueName, provider);
236+
queueInfoMap.put(queueName, queueInfo);
237+
} catch (Exception e) {
238+
logger.error("Failed to get resource for queue " + queueName, e);
239+
}
240+
}
241+
return queueInfoMap;
242+
}
243+
244+
private YarnQueueInfo getResourcesFromResponse(
245+
JsonNode resp, String realQueueName, String queueName, ExternalResourceProvider provider) {
221246
JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo");
222247
String schedulerType = schedulerInfo.path("type").asText();
223248
if ("capacityScheduler".equals(schedulerType)) {
@@ -233,7 +258,8 @@ public YarnQueueInfo getResources(
233258
}
234259
JsonNode queueInfo = queue.get();
235260
return new YarnQueueInfo(
236-
maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(),
261+
maxEffectiveHandle(queue, getAndUpdateActiveRmWebAddress(provider), queueName, provider)
262+
.get(),
237263
getYarnResource(queue.map(node -> node.path("resourcesUsed")), queueName).get(),
238264
queueInfo.path("maxApps").asInt(),
239265
queueInfo.path("numPendingApps").asInt(),
@@ -323,7 +349,8 @@ public List<ExternalAppInfo> requestAppInfo(
323349
+ "&states="
324350
+ YarnAppState.RUNNING.getState()
325351
+ ","
326-
+ YarnAppState.ACCEPTED.getState();
352+
+ YarnAppState.ACCEPTED.getState()
353+
+ RMConfiguration.YARN_APPS_FILTER_PARMS.getValue();
327354
resp =
328355
getResponseByUrl("apps" + queryParams, rmWebAddress, provider).path("apps").path("app");
329356
} else {

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import org.apache.linkis.manager.persistence.{
5252
ResourceManagerPersistence
5353
}
5454
import org.apache.linkis.manager.rm.domain.RMLabelContainer
55+
import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier
5556
import org.apache.linkis.manager.rm.external.service.ExternalResourceService
5657
import org.apache.linkis.manager.rm.external.yarn.{YarnAppInfo, YarnResourceIdentifier}
5758
import org.apache.linkis.manager.rm.restful.vo.{UserCreatorEngineType, UserResourceVo}
@@ -574,6 +575,174 @@ class RMMonitorRest extends Logging {
574575
appendMessageData(message, "queues", clusters)
575576
}
576577

578+
@ApiOperation(value = "getBatchQueueResource", notes = "get batch queue resource")
579+
@RequestMapping(path = Array("batchqueueresources"), method = Array(RequestMethod.POST))
580+
def getBatchQueueResource(
581+
request: HttpServletRequest,
582+
@RequestBody param: util.Map[String, AnyRef]
583+
): Message = {
584+
ModuleUserUtils.getOperationUser(request, "getBatchQueueResource")
585+
val message = Message.ok("")
586+
val queueNamesParam = param.get("queueNames")
587+
if (queueNamesParam == null) {
588+
return Message.error("queueNames parameter is required")
589+
}
590+
val queueNames = queueNamesParam match {
591+
case list: java.util.List[_] =>
592+
list.asScala.map(_.toString.trim).filter(StringUtils.isNotBlank).toArray
593+
case array: Array[_] =>
594+
array.map(_.toString.trim).filter(StringUtils.isNotBlank)
595+
case _ =>
596+
return Message.error("queueNames parameter must be an array or comma-separated string")
597+
}
598+
if (queueNames.isEmpty) {
599+
return Message.error("queueNames parameter is empty")
600+
}
601+
var clustername = param.get("clustername").asInstanceOf[String]
602+
val crossCluster = java.lang.Boolean.parseBoolean(
603+
param.getOrDefault("crossCluster", "false").asInstanceOf[String]
604+
)
605+
if (crossCluster) {
606+
clustername = AMConfiguration.PRIORITY_CLUSTER_TARGET
607+
}
608+
val clusterLabel = labelFactory.createLabel(classOf[ClusterLabel])
609+
clusterLabel.setClusterName(clustername)
610+
clusterLabel.setClusterType(param.get("clustertype").asInstanceOf[String])
611+
val labelContainer = new RMLabelContainer(Lists.newArrayList(clusterLabel))
612+
val queueInfoMap = new mutable.HashMap[String, AnyRef]()
613+
614+
try {
615+
// Process queue names and create identifiers
616+
import java.util.ArrayList
617+
val identifiers = new ArrayList[ExternalResourceIdentifier]()
618+
queueNames.foreach { queueName =>
619+
var processedQueueName = queueName
620+
if (
621+
StringUtils.isNotBlank(processedQueueName) && processedQueueName.startsWith(queuePrefix)
622+
) {
623+
logger.info(
624+
"Queue name {} starts with '{}', remove '{}'",
625+
processedQueueName,
626+
queuePrefix,
627+
queuePrefix
628+
)
629+
processedQueueName = processedQueueName.substring(queuePrefix.length)
630+
}
631+
identifiers.add(new YarnResourceIdentifier(processedQueueName))
632+
}
633+
634+
// Use batch API to get all queue resources at once
635+
val batchResources =
636+
externalResourceService.getBatchResource(ResourceType.Yarn, labelContainer, identifiers)
637+
638+
// Process the results
639+
import scala.collection.JavaConverters._
640+
batchResources.asScala.foreach { case (queueName, nodeResource) =>
641+
(
642+
nodeResource.getMaxResource.asInstanceOf[YarnResource],
643+
nodeResource.getUsedResource.asInstanceOf[YarnResource]
644+
) match {
645+
case (maxResource, usedResource) =>
646+
val queueInfo = new mutable.HashMap[String, AnyRef]()
647+
queueInfo.put("queuename", maxResource)
648+
queueInfo.put(
649+
"maxResources",
650+
Map(
651+
"memory" -> maxResource.getQueueMemory,
652+
"cores" -> maxResource.getQueueCores
653+
).asJava
654+
)
655+
queueInfo.put(
656+
"usedResources",
657+
Map(
658+
"memory" -> usedResource.getQueueMemory,
659+
"cores" -> usedResource.getQueueCores
660+
).asJava
661+
)
662+
val usedMemoryPercentage = usedResource.getQueueMemory
663+
.asInstanceOf[Double] / maxResource.getQueueMemory.asInstanceOf[Double]
664+
val usedCPUPercentage = usedResource.getQueueCores
665+
.asInstanceOf[Double] / maxResource.getQueueCores.asInstanceOf[Double]
666+
queueInfo.put(
667+
"usedPercentage",
668+
Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage).asJava
669+
)
670+
queueInfo.put("maxApps", nodeResource.getMaxApps.asInstanceOf[AnyRef])
671+
queueInfo.put("numActiveApps", nodeResource.getNumActiveApps.asInstanceOf[AnyRef])
672+
queueInfo.put("numPendingApps", nodeResource.getNumPendingApps.asInstanceOf[AnyRef])
673+
queueInfoMap.put(queueName, queueInfo.asJava)
674+
case _ =>
675+
logger.warn(s"Failed to get queue resource for $queueName")
676+
}
677+
}
678+
} catch {
679+
case e: Exception =>
680+
logger.error("Failed to get batch queue resources", e)
681+
// Fall back to individual requests if batch API fails
682+
queueNames.foreach { queueName =>
683+
try {
684+
var processedQueueName = queueName
685+
if (
686+
StringUtils
687+
.isNotBlank(processedQueueName) && processedQueueName.startsWith(queuePrefix)
688+
) {
689+
processedQueueName = processedQueueName.substring(queuePrefix.length)
690+
}
691+
val yarnIdentifier = new YarnResourceIdentifier(processedQueueName)
692+
val providedYarnResource =
693+
externalResourceService.getResource(ResourceType.Yarn, labelContainer, yarnIdentifier)
694+
(
695+
providedYarnResource.getMaxResource.asInstanceOf[YarnResource],
696+
providedYarnResource.getUsedResource.asInstanceOf[YarnResource]
697+
) match {
698+
case (maxResource, usedResource) =>
699+
val queueInfo = new mutable.HashMap[String, AnyRef]()
700+
queueInfo.put(
701+
"maxResources",
702+
Map(
703+
"memory" -> maxResource.getQueueMemory,
704+
"cores" -> maxResource.getQueueCores
705+
).asJava
706+
)
707+
queueInfo.put(
708+
"usedResources",
709+
Map(
710+
"memory" -> usedResource.getQueueMemory,
711+
"cores" -> usedResource.getQueueCores
712+
).asJava
713+
)
714+
val usedMemoryPercentage = usedResource.getQueueMemory
715+
.asInstanceOf[Double] / maxResource.getQueueMemory.asInstanceOf[Double]
716+
val usedCPUPercentage = usedResource.getQueueCores
717+
.asInstanceOf[Double] / maxResource.getQueueCores.asInstanceOf[Double]
718+
queueInfo.put(
719+
"usedPercentage",
720+
Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage).asJava
721+
)
722+
queueInfo.put("maxApps", providedYarnResource.getMaxApps.asInstanceOf[AnyRef])
723+
queueInfo.put(
724+
"numActiveApps",
725+
providedYarnResource.getNumActiveApps.asInstanceOf[AnyRef]
726+
)
727+
queueInfo.put(
728+
"numPendingApps",
729+
providedYarnResource.getNumPendingApps.asInstanceOf[AnyRef]
730+
)
731+
queueInfoMap.put(queueName, queueInfo.asJava)
732+
case _ =>
733+
logger.warn(s"Failed to get queue resource for $queueName")
734+
}
735+
} catch {
736+
case ex: Exception =>
737+
logger.error(s"Failed to get queue resource for $queueName", ex)
738+
}
739+
}
740+
}
741+
742+
appendMessageData(message, "queueInfos", queueInfoMap.asJava)
743+
message
744+
}
745+
577746
private def getEngineNodesByUserList(
578747
userList: List[String],
579748
withResource: Boolean = false

linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,9 @@ public class RMConfiguration {
9191

9292
public static final CommonVars<Boolean> YARN_APPS_FILTER_ENABLED =
9393
CommonVars.apply("wds.linkis.rm.yarn.apps.filter.enabled", true);
94+
95+
public static final CommonVars<String> YARN_APPS_FILTER_PARMS =
96+
CommonVars.apply(
97+
"wds.linkis.rm.yarn.apps.filter.parms",
98+
"&deSelects=resourceRequests,timeouts,appNodeLabelExpression,amNodeLabelExpression,resourceInfo");
9499
}

0 commit comments

Comments
 (0)