|
18 | 18 | package org.apache.linkis.manager.rm.external.service.impl; |
19 | 19 |
|
20 | 20 | import org.apache.linkis.manager.common.conf.RMConfiguration; |
| 21 | +import org.apache.linkis.manager.common.entity.resource.CommonNodeResource; |
21 | 22 | import org.apache.linkis.manager.common.entity.resource.NodeResource; |
22 | 23 | import org.apache.linkis.manager.common.entity.resource.ResourceType; |
23 | 24 | import org.apache.linkis.manager.common.exception.RMErrorException; |
|
33 | 34 | import org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser; |
34 | 35 | import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester; |
35 | 36 | import org.apache.linkis.manager.rm.external.service.ExternalResourceService; |
| 37 | +import org.apache.linkis.manager.rm.external.yarn.YarnQueueInfo; |
| 38 | +import org.apache.linkis.manager.rm.external.yarn.YarnResourceIdentifier; |
36 | 39 | import org.apache.linkis.manager.rm.external.yarn.YarnResourceRequester; |
37 | 40 | import org.apache.linkis.manager.rm.utils.RMUtils; |
38 | 41 |
|
|
44 | 47 |
|
45 | 48 | import java.net.ConnectException; |
46 | 49 | import java.text.MessageFormat; |
| 50 | +import java.util.HashMap; |
47 | 51 | import java.util.List; |
48 | 52 | import java.util.Map; |
49 | 53 | import java.util.concurrent.ExecutionException; |
50 | 54 | import java.util.concurrent.TimeUnit; |
51 | 55 | import java.util.function.Function; |
| 56 | +import java.util.stream.Collectors; |
52 | 57 |
|
53 | 58 | import com.fasterxml.jackson.core.JsonParseException; |
54 | 59 | import com.google.common.cache.CacheBuilder; |
@@ -141,6 +146,60 @@ public List<ExternalAppInfo> getAppInfo( |
141 | 146 | return appInfos; |
142 | 147 | } |
143 | 148 |
|
| 149 | + @Override |
| 150 | + public Map<String, NodeResource> getBatchResource( |
| 151 | + ResourceType resourceType, |
| 152 | + RMLabelContainer labelContainer, |
| 153 | + List<ExternalResourceIdentifier> identifiers) |
| 154 | + throws RMErrorException { |
| 155 | + ExternalResourceProvider provider = chooseProvider(resourceType, labelContainer); |
| 156 | + ExternalResourceRequester externalResourceRequester = getRequester(resourceType); |
| 157 | + |
| 158 | + if (externalResourceRequester instanceof YarnResourceRequester) { |
| 159 | + YarnResourceRequester yarnRequester = (YarnResourceRequester) externalResourceRequester; |
| 160 | + List<String> queueNames = |
| 161 | + identifiers.stream() |
| 162 | + .map(id -> ((YarnResourceIdentifier) id).getQueueName()) |
| 163 | + .collect(Collectors.toList()); |
| 164 | + |
| 165 | + Map<String, YarnQueueInfo> batchResources = |
| 166 | + (Map<String, YarnQueueInfo>) |
| 167 | + retry( |
| 168 | + RMConfiguration.EXTERNAL_RETRY_NUM.getValue(), |
| 169 | + (i) -> |
| 170 | + yarnRequester.getBatchResources( |
| 171 | + yarnRequester.getAndUpdateActiveRmWebAddress(provider), |
| 172 | + queueNames, |
| 173 | + provider), |
| 174 | + (i) -> yarnRequester.reloadExternalResourceAddress(provider)); |
| 175 | + |
| 176 | + Map<String, NodeResource> result = new HashMap<>(); |
| 177 | + batchResources.forEach( |
| 178 | + (queueName, queueInfo) -> { |
| 179 | + CommonNodeResource nodeResource = new CommonNodeResource(); |
| 180 | + nodeResource.setMaxResource(queueInfo.getMaxResource()); |
| 181 | + nodeResource.setUsedResource(queueInfo.getUsedResource()); |
| 182 | + nodeResource.setMaxApps(queueInfo.getMaxApps()); |
| 183 | + nodeResource.setNumPendingApps(queueInfo.getNumPendingApps()); |
| 184 | + nodeResource.setNumActiveApps(queueInfo.getNumActiveApps()); |
| 185 | + result.put(queueName, nodeResource); |
| 186 | + }); |
| 187 | + return result; |
| 188 | + } else { |
| 189 | + // For other resource types, fall back to individual requests |
| 190 | + Map<String, NodeResource> result = new HashMap<>(); |
| 191 | + for (ExternalResourceIdentifier identifier : identifiers) { |
| 192 | + try { |
| 193 | + NodeResource resource = getResource(resourceType, labelContainer, identifier); |
| 194 | + result.put(((YarnResourceIdentifier) identifier).getQueueName(), resource); |
| 195 | + } catch (Exception e) { |
| 196 | + logger.error("Failed to get resource for identifier " + identifier, e); |
| 197 | + } |
| 198 | + } |
| 199 | + return result; |
| 200 | + } |
| 201 | + } |
| 202 | + |
144 | 203 | private Object retry(int retryNum, Function function, Function reloadExternalAddress) |
145 | 204 | throws RMErrorException { |
146 | 205 | int times = 0; |
|
0 commit comments