Skip to content

Commit 64f58f1

Browse files
committed
#AI commit# perf: 优化引擎创建和复用服务性能
- 在 AMUtils 中引入复用线程池,将线程数设置为 5 来提升指标更新效率
- 修改 DefaultEngineAskEngineService 中的条件逻辑结构,优化引擎复用节点的处理流程
- 将 DefaultEngineCreateService 中的日志级别从 info 提升到 warn 并添加异常堆栈信息
- 在 DefaultEngineCreateService 和 DefaultEngineReuseService 中添加空值检查来避免 NPE
- 使用 tryCatch 包装指标更新逻辑以提高系统稳定性
- 从引擎复用过滤条件中移除节点状态解锁检查以提升复用率
1 parent 8d1ade2 commit 64f58f1

4 files changed

Lines changed: 96 additions & 60 deletions

File tree

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -247,37 +247,50 @@ class DefaultEngineAskEngineService
247247
null
248248
}
249249
}
250-
251-
val engineCreateRequest = new EngineCreateRequest
252-
engineCreateRequest.setLabels(engineAskRequest.getLabels)
253-
engineCreateRequest.setTimeout(engineAskRequest.getTimeOut)
254-
engineCreateRequest.setUser(engineAskRequest.getUser)
255-
engineCreateRequest.setProperties(engineAskRequest.getProperties)
256-
engineCreateRequest.setCreateService(engineAskRequest.getCreateService)
257-
258-
val createNode = engineCreateService.createEngine(engineCreateRequest, sender)
259-
val timeout =
260-
if (engineCreateRequest.getTimeout <= 0) {
261-
AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
262-
} else engineCreateRequest.getTimeout
263-
// UseEngine requires a timeout (useEngine 需要加上超时)
264-
val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout)
265-
if (null == createEngineNode) {
266-
throw new LinkisRetryException(
267-
AMConstant.EM_ERROR_CODE,
268-
s"create engine${createNode.getServiceInstance} success, but to use engine failed"
269-
)
270-
}
271-
logger.info(
272-
s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode"
273-
)
274-
if (null != sender) {
275-
sender.send(EngineCreateSuccess(engineAskAsyncId, createEngineNode))
250+
if (reuseNode != null) {
276251
logger.info(
277-
s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=false) to Entrance."
252+
s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode"
278253
)
254+
if (null != sender) {
255+
sender.send(EngineCreateSuccess(engineAskAsyncId, reuseNode, true))
256+
logger.info(
257+
s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=true) to Entrance."
258+
)
259+
} else {
260+
logger.warn(f"Task: $taskId will not send async using null sender.")
261+
}
279262
} else {
280-
logger.warn(s"Task: $taskId will not send async using null sender.")
263+
val engineCreateRequest = new EngineCreateRequest
264+
engineCreateRequest.setLabels(engineAskRequest.getLabels)
265+
engineCreateRequest.setTimeout(engineAskRequest.getTimeOut)
266+
engineCreateRequest.setUser(engineAskRequest.getUser)
267+
engineCreateRequest.setProperties(engineAskRequest.getProperties)
268+
engineCreateRequest.setCreateService(engineAskRequest.getCreateService)
269+
270+
val createNode = engineCreateService.createEngine(engineCreateRequest, sender)
271+
val timeout =
272+
if (engineCreateRequest.getTimeout <= 0) {
273+
AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
274+
} else engineCreateRequest.getTimeout
275+
// UseEngine requires a timeout (useEngine 需要加上超时)
276+
val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout)
277+
if (null == createEngineNode) {
278+
throw new LinkisRetryException(
279+
AMConstant.EM_ERROR_CODE,
280+
s"create engine${createNode.getServiceInstance} success, but to use engine failed"
281+
)
282+
}
283+
logger.info(
284+
s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode"
285+
)
286+
if (null != sender) {
287+
sender.send(EngineCreateSuccess(engineAskAsyncId, createEngineNode))
288+
logger.info(
289+
s"Task: $taskId has sent EngineCreateSuccess($engineAskAsyncId, reuse=false) to Entrance."
290+
)
291+
} else {
292+
logger.warn(s"Task: $taskId will not send async using null sender.")
293+
}
281294
}
282295
} {
283296
Utils.tryAndWarn {

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,10 @@ class DefaultEngineCreateService
236236

237237
val engineNode = Utils.tryCatch(getEMService().createEngine(engineBuildRequest, emNode)) {
238238
case t: Throwable =>
239-
logger.info(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}")
239+
logger.warn(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}", t)
240240
val failedEcNode = getEngineNodeManager.getEngineNode(oldServiceInstance)
241241
if (null == failedEcNode) {
242-
logger.info(s" engineConn does not exist in db: $oldServiceInstance ")
242+
logger.warn(s" engineConn does not exist in db: $oldServiceInstance ")
243243
} else {
244244
failedEcNode.setLabels(nodeLabelService.getNodeLabels(oldServiceInstance))
245245
failedEcNode.getLabels.addAll(
@@ -289,18 +289,28 @@ class DefaultEngineCreateService
289289
s"Failed to update engineNode: ${t.getMessage}"
290290
)
291291
}
292-
if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) {
293-
val emInstance = engineNode.getServiceInstance.getInstance
294-
val ecmInstance = engineNode.getEMNode.getServiceInstance.getInstance
295-
// 8. Update job history metrics after successful engine creation - 异步执行
296-
AMUtils.updateMetricsAsync(
297-
taskId,
298-
resourceTicketId,
299-
emInstance,
300-
ecmInstance,
301-
null,
302-
isReuse = false
303-
)
292+
Utils.tryCatch {
293+
if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) {
294+
val emInstance = engineNode.getServiceInstance.getInstance
295+
val ecmInstance = engineNode.getEMNode.getServiceInstance.getInstance
296+
if ((null != emInstance) && (null != ecmInstance)) {
297+
// 8. Update job history metrics after successful engine creation - 异步执行
298+
AMUtils.updateMetricsAsync(
299+
taskId,
300+
resourceTicketId,
301+
emInstance,
302+
ecmInstance,
303+
null,
304+
isReuse = false
305+
)
306+
} else {
307+
logger.info(
308+
s"CreateEngine:Failed to update metrics for emInstance: $emInstance, ecmInstance: $ecmInstance"
309+
)
310+
}
311+
}
312+
} { case e: Exception =>
313+
logger.error(s"Failed to update metrics for taskId: $taskId", e)
304314
}
305315
// 9. Add the Label of EngineConn, and add the Alias of engineConn
306316
val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
236236
StringUtils.isNotBlank(templateName) && AMConfiguration.EC_REUSE_WITH_TEMPLATE_RULE_ENABLE
237237
) {
238238
engineScoreList = engineScoreList
239-
.filter(engine => engine.getNodeStatus == NodeStatus.Unlock)
240239
.filter(engine => {
241240
val oldTemplateName: String =
242241
getValueByKeyFromProps(confTemplateNameKey, parseParamsToMap(engine.getParams))
@@ -276,7 +275,6 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
276275

277276
// 过滤掉资源不满足的引擎
278277
engineScoreList = engineScoreList
279-
.filter(engine => engine.getNodeStatus == NodeStatus.Unlock)
280278
.filter(engine => {
281279
val enginePythonVersion: String = getPythonVersion(parseParamsToMap(engine.getParams))
282280
var pythonVersionMatch: Boolean = true
@@ -383,20 +381,29 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe
383381
.toJson(engine) + " from engineLabelMap : " + AMUtils.GSON.toJson(instances)
384382
)
385383
}
386-
if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) {
387-
val engineNode =
388-
ecResourceInfoService.getECResourceInfoRecordByInstance(
389-
engine.getServiceInstance.getInstance
390-
)
391-
// 异步更新 metrics
392-
AMUtils.updateMetricsAsync(
393-
taskId,
394-
engineNode.getTicketId,
395-
engineNode.getServiceInstance,
396-
engineNode.getEcmInstance,
397-
engineNode.getLogDirSuffix,
398-
isReuse = true
399-
)
384+
Utils.tryCatch {
385+
if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) {
386+
val engineNode =
387+
ecResourceInfoService.getECResourceInfoRecordByInstance(
388+
engine.getServiceInstance.getInstance
389+
)
390+
if (null != engineNode) {
391+
// 异步更新 metrics
392+
AMUtils.updateMetricsAsync(
393+
taskId,
394+
engineNode.getTicketId,
395+
engineNode.getServiceInstance,
396+
engineNode.getEcmInstance,
397+
engineNode.getLogDirSuffix,
398+
isReuse = true
399+
)
400+
} else {
401+
logger.info(s"ReuseEngine:Failed to update metrics for engineNode: $engineNode")
402+
}
403+
404+
}
405+
} { case e: Exception =>
406+
logger.error(s"Failed to update metrics for taskId: $taskId", e)
400407
}
401408
engine
402409
}

linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/utils/AMUtils.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,16 @@ import java.io.File
4444
import java.util
4545

4646
import scala.collection.JavaConverters._
47+
import scala.concurrent.ExecutionContextExecutorService
4748

4849
import com.google.gson.JsonObject
4950

5051
object AMUtils extends Logging {
5152

53+
// 优化:线程池复用,线程数设置为5
54+
private implicit val updateMetricsExecutor: ExecutionContextExecutorService =
55+
Utils.newCachedExecutionContext(5, "UpdateMetrics-Thread-")
56+
5257
lazy val GSON = BDPJettyServerHelper.gson
5358

5459
private val SUCCESS_FLAG = 0
@@ -409,14 +414,15 @@ object AMUtils extends Logging {
409414
import scala.concurrent.Future
410415
import scala.util.{Failure, Success}
411416

417+
// 优化:使用复用的线程池,线程数设置为5
412418
Future {
413419
updateMetrics(taskId, resourceTicketId, emInstance, ecmInstance, engineLogPath, isReuse)
414-
}(Utils.newCachedExecutionContext(1, "UpdateMetrics-Thread-")).onComplete {
420+
}(updateMetricsExecutor).onComplete {
415421
case Success(_) =>
416422
logger.debug(s"Task: $taskId metrics update completed successfully for engine: $emInstance")
417423
case Failure(t) =>
418424
logger.warn(s"Task: $taskId metrics update failed for engine: $emInstance", t)
419-
}(Utils.newCachedExecutionContext(1, "UpdateMetrics-Thread-"))
425+
}(updateMetricsExecutor)
420426
}
421427

422428
}

0 commit comments

Comments
 (0)