Skip to content

Commit 5160c8b

Browse files
committed
#AI commit# feat: 实现starrocks任务重试机制
- 引入ComputationEngineUtils工具类,提供向entrance发送消息的功能
- 添加配置项支持上下文语句和JDBC SET语句的错误索引调整
- 实现任务重试逻辑,支持JDBC和aiSQL类型的断点续跑
- 优化结果集别名数量管理,确保重试时状态正确恢复
- 新增TaskRetryInterceptor拦截器,动态为任务添加重试开关
- 增强错误处理机制,支持多种错误场景的正则匹配重试
- 修复任务重试时结果目录重复创建的问题
- 扩展ResponseTaskStatusWithExecuteCodeIndex协议,增加aliasNum字段
- 优化JDBC任务重试时的SQL索引调整逻辑,处理SET语句场景
- 增加StarRocks引擎类型支持和相关配置选项
1 parent 8d1ade2 commit 5160c8b

17 files changed

Lines changed: 438 additions & 67 deletions

File tree

linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/Configuration.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ object Configuration extends Logging {
104104
val METRICS_INCREMENTAL_UPDATE_ENABLE =
105105
CommonVars[Boolean]("linkis.jobhistory.metrics.incremental.update.enable", false)
106106

107+
val EXECUTE_ERROR_CODE_INDEX =
108+
CommonVars("execute.error.code.index", "-1")
109+
110+
val EXECUTE_RESULTSET_ALIAS_NUM =
111+
CommonVars("execute.resultset.alias.num", "0")
112+
107113
val GLOBAL_CONF_CHN_NAME = "全局设置"
108114

109115
val GLOBAL_CONF_CHN_OLDNAME = "通用设置"

linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,14 @@ case class ResponseTaskStatus(execId: String, status: ExecutionNodeStatus)
5252
class ResponseTaskStatusWithExecuteCodeIndex(
5353
execId: String,
5454
status: ExecutionNodeStatus,
55-
private var _errorIndex: Int = -1
55+
private var _errorIndex: Int = -1,
56+
private var _aliasNum: Int = 0 // 新增:aliasNum字段
5657
) extends ResponseTaskStatus(execId, status) {
5758
def errorIndex: Int = _errorIndex
5859
def errorIndex_=(value: Int): Unit = _errorIndex = value
60+
// 新增:aliasNum的getter和setter
61+
def aliasNum: Int = _aliasNum
62+
def aliasNum_=(value: Int): Unit = _aliasNum = value
5963
}
6064

6165
case class ResponseTaskResultSet(execId: String, output: String, alias: String)

linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,4 +155,16 @@ object ComputationExecutorConf {
155155
val SUPPORT_PARTIAL_RETRY_FOR_FAILED_TASKS_ENABLED: Boolean =
156156
CommonVars[Boolean]("linkis.partial.retry.for.failed.task.enabled", false).getValue
157157

158+
val CONTEXT_STATEMENT_PREFIXES = CommonVars(
159+
"linkis.engineconn.context.statement.prefixes",
160+
"USE ,SET ,ALTER SESSION ,SET ROLE ,SET SCHEMA ",
161+
"SQL context statement prefixes for partial retry"
162+
)
163+
164+
val JDBC_SET_STATEMENT_PREFIXES = CommonVars(
165+
"linkis.engineconn.jdbc.set.statement.prefixes",
166+
"SET QUERY_TIMEOUT,SET QUERY_QUEUE_PENDING_TIMEOUT,SET NEW_PLANNER",
167+
"JDBC SET statement prefixes for error index adjustment"
168+
)
169+
158170
}

linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala

Lines changed: 125 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.linkis.engineconn.computation.executor.execute
1919

2020
import org.apache.linkis.DataWorkCloudApplication
21+
import org.apache.linkis.common.conf.Configuration
2122
import org.apache.linkis.common.log.LogUtils
2223
import org.apache.linkis.common.utils.{Logging, Utils}
2324
import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
@@ -35,16 +36,22 @@ import org.apache.linkis.engineconn.computation.executor.exception.HookExecuteEx
3536
import org.apache.linkis.engineconn.computation.executor.hook.ComputationExecutorHook
3637
import org.apache.linkis.engineconn.computation.executor.metrics.ComputationEngineConnMetrics
3738
import org.apache.linkis.engineconn.computation.executor.upstream.event.TaskStatusChangedForUpstreamMonitorEvent
39+
import org.apache.linkis.engineconn.computation.executor.utlis.ComputationEngineUtils
3840
import org.apache.linkis.engineconn.core.EngineConnObject
3941
import org.apache.linkis.engineconn.core.executor.ExecutorManager
4042
import org.apache.linkis.engineconn.executor.entity.{LabelExecutor, ResourceExecutor}
4143
import org.apache.linkis.engineconn.executor.listener.ExecutorListenerBusContext
4244
import org.apache.linkis.governance.common.entity.ExecutionNodeStatus
4345
import org.apache.linkis.governance.common.paser.CodeParser
44-
import org.apache.linkis.governance.common.protocol.task.{EngineConcurrentInfo, RequestTask}
46+
import org.apache.linkis.governance.common.protocol.task.{
47+
EngineConcurrentInfo,
48+
RequestTask,
49+
ResponseTaskError,
50+
ResponseTaskStatusWithExecuteCodeIndex
51+
}
4552
import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
4653
import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
47-
import org.apache.linkis.manager.label.entity.engine.{EngineType, UserCreatorLabel}
54+
import org.apache.linkis.manager.label.entity.engine.{EngineType, EngineTypeLabel, UserCreatorLabel}
4855
import org.apache.linkis.manager.label.utils.LabelUtil
4956
import org.apache.linkis.protocol.engine.JobProgressInfo
5057
import org.apache.linkis.scheduler.executer._
@@ -264,17 +271,40 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
264271
}
265272
var executeFlag = true
266273
val errorIndex: Int = Integer.valueOf(
267-
engineConnTask.getProperties.getOrDefault("execute.error.code.index", "-1").toString
274+
engineConnTask.getProperties
275+
.getOrDefault(Configuration.EXECUTE_ERROR_CODE_INDEX.key, "-1")
276+
.toString
268277
)
269-
engineExecutionContext.getProperties.put("execute.error.code.index", errorIndex.toString)
278+
engineExecutionContext.getProperties
279+
.put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, errorIndex.toString)
280+
// jdbc执行任务重试,如果sql有被set进sql,会导致sql的index错位,这里会将日志打印的index进行减一,保证用户看的index是正常的,然后重试的errorIndex需要加一,保证重试的位置是正确的
281+
var newIndex = index
282+
var newErrorIndex = errorIndex
283+
if (adjustErrorIndexForSetScenarios(engineConnTask)) {
284+
newIndex = index - 1
285+
newErrorIndex = errorIndex + 1
286+
}
270287
// 重试的时候如果执行过则跳过执行
271-
if (retryEnable && errorIndex > 0 && index < errorIndex) {
272-
engineExecutionContext.appendStdout(
273-
LogUtils.generateInfo(
274-
s"aisql retry with errorIndex: ${errorIndex}, current sql index: ${index} will skip."
288+
if (retryEnable && errorIndex > 0 && index < newErrorIndex) {
289+
val code = codes(index).trim.toUpperCase()
290+
val shouldSkip = !isContextStatement(code)
291+
292+
if (shouldSkip) {
293+
engineExecutionContext.appendStdout(
294+
LogUtils.generateInfo(
295+
s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} will skip."
296+
)
275297
)
276-
)
277-
executeFlag = false
298+
executeFlag = false
299+
} else {
300+
if (newIndex >= 0) {
301+
engineExecutionContext.appendStdout(
302+
LogUtils.generateInfo(
303+
s"task retry with errorIndex: ${errorIndex}, current sql index: ${newIndex} is a context statement, will execute."
304+
)
305+
)
306+
}
307+
}
278308
}
279309
if (executeFlag) {
280310
val code = codes(index)
@@ -289,21 +319,23 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
289319
response match {
290320
case e: ErrorExecuteResponse =>
291321
val props: util.Map[String, String] = engineCreationContext.getOptions
292-
val aiSqlEnable: String = props.getOrDefault("linkis.ai.sql.enable", "false").toString
322+
val taskRetry: String =
323+
props.getOrDefault("linkis.task.retry.switch", "false").toString
293324
val retryNum: Int =
294-
Integer.valueOf(props.getOrDefault("linkis.ai.retry.num", "0").toString)
325+
Integer.valueOf(props.getOrDefault("linkis.task.retry.num", "0").toString)
295326

296-
if (retryEnable && !props.isEmpty && "true".equals(aiSqlEnable) && retryNum > 0) {
327+
if (retryEnable && !props.isEmpty && "true".equals(taskRetry) && retryNum > 0) {
297328
logger.info(
298-
s"aisql execute failed, with index: ${index} retryNum: ${retryNum}, and will retry",
329+
s"task execute failed, with index: ${index} retryNum: ${retryNum}, and will retry",
299330
e.t
300331
)
301332
engineExecutionContext.appendStdout(
302333
LogUtils.generateInfo(
303-
s"aisql execute failed, with index: ${index} retryNum: ${retryNum}, and will retry"
334+
s"task execute failed, with index: ${index} retryNum: ${retryNum}, and will retry"
304335
)
305336
)
306-
engineConnTask.getProperties.put("execute.error.code.index", index.toString)
337+
engineConnTask.getProperties
338+
.put(Configuration.EXECUTE_ERROR_CODE_INDEX.key, index.toString)
307339
return ErrorRetryExecuteResponse(e.message, index, e.t)
308340
} else {
309341
failedTasks.increase()
@@ -320,7 +352,14 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
320352
e.getOutput.substring(0, outputPrintLimit)
321353
} else e.getOutput
322354
engineExecutionContext.appendStdout(output)
323-
if (StringUtils.isNotBlank(e.getOutput)) engineExecutionContext.sendResultSet(e)
355+
if (StringUtils.isNotBlank(e.getOutput)) {
356+
engineConnTask.getProperties
357+
.put(
358+
Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key,
359+
engineExecutionContext.getAliasNum.toString
360+
)
361+
engineExecutionContext.sendResultSet(e)
362+
}
324363
case _: IncompleteExecuteResponse =>
325364
incomplete ++= incompleteSplitter
326365
}
@@ -369,6 +408,31 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
369408
executeResponse match {
370409
case successExecuteResponse: SuccessExecuteResponse =>
371410
transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
411+
case ErrorRetryExecuteResponse(message, index, throwable) =>
412+
ComputationEngineUtils.sendToEntrance(
413+
engineConnTask,
414+
ResponseTaskError(engineConnTask.getTaskId, message)
415+
)
416+
logger.warn(
417+
s"The task begins executing retries,jobId:${engineConnTask.getTaskId},index:${index} ,message:${message}",
418+
throwable
419+
)
420+
421+
val currentAliasNum = Integer.valueOf(
422+
engineConnTask.getProperties
423+
.getOrDefault(Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key, "0")
424+
.toString
425+
)
426+
427+
ComputationEngineUtils.sendToEntrance(
428+
engineConnTask,
429+
new ResponseTaskStatusWithExecuteCodeIndex(
430+
engineConnTask.getTaskId,
431+
ExecutionNodeStatus.Failed,
432+
index,
433+
currentAliasNum
434+
)
435+
)
372436
case errorExecuteResponse: ErrorExecuteResponse =>
373437
listenerBusContext.getEngineConnSyncListenerBus.postToAll(
374438
TaskResponseErrorEvent(engineConnTask.getTaskId, errorExecuteResponse.message)
@@ -404,6 +468,22 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
404468

405469
def getProgressInfo(taskID: String): Array[JobProgressInfo]
406470

471+
/**
 * Decides whether the retry error index has to be shifted for this task.
 *
 * Per the original note on this method, a SET statement at the head of the code is treated by
 * the parser as the first SQL statement, which offsets the statement indexes. The check is
 * only made for tasks whose [[EngineTypeLabel]] reports the JDBC engine type.
 *
 * @param engineConnTask
 *   the task whose labels and code are inspected
 * @return
 *   true when the task is a JDBC task and its trimmed, upper-cased code starts with one of the
 *   prefixes configured in `ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES`; false
 *   otherwise, including when the task carries no engine type label or has no code
 */
protected def adjustErrorIndexForSetScenarios(engineConnTask: EngineConnTask): Boolean = {
  // A missing EngineTypeLabel means "not a JDBC task" instead of a NoSuchElementException.
  val isJdbcTask = engineConnTask.getLables
    .collectFirst { case label: EngineTypeLabel => label.getEngineType }
    .contains(EngineType.JDBC.toString)
  val executionCode = engineConnTask.getCode
  if (!isJdbcTask || executionCode == null) {
    false
  } else {
    // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
    val upperCode = executionCode.toUpperCase(java.util.Locale.ROOT).trim
    ComputationExecutorConf.JDBC_SET_STATEMENT_PREFIXES.getValue
      .split(",")
      // An empty entry (empty config value or ",,") would match every statement.
      .filter(_.nonEmpty)
      .exists(prefix => upperCode.startsWith(prefix))
  }
}
486+
407487
protected def createEngineExecutionContext(
408488
engineConnTask: EngineConnTask
409489
): EngineExecutionContext = {
@@ -427,6 +507,22 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
427507
engineExecutionContext.setJobId(engineConnTask.getTaskId)
428508
engineExecutionContext.getProperties.putAll(engineConnTask.getProperties)
429509
engineExecutionContext.setLabels(engineConnTask.getLables)
510+
511+
val errorIndex: Int = Integer.valueOf(
512+
engineConnTask.getProperties
513+
.getOrDefault(Configuration.EXECUTE_ERROR_CODE_INDEX.key, "-1")
514+
.toString
515+
)
516+
if (errorIndex > 0) {
517+
val savedAliasNum = Integer.valueOf(
518+
engineConnTask.getProperties
519+
.getOrDefault(Configuration.EXECUTE_RESULTSET_ALIAS_NUM.key, "0")
520+
.toString
521+
)
522+
engineExecutionContext.setResultSetNum(savedAliasNum)
523+
logger.info(s"Restore aliasNum to $savedAliasNum for retry task")
524+
}
525+
430526
engineExecutionContext
431527
}
432528

@@ -439,6 +535,18 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
439535
}
440536
}
441537

538+
/**
 * Tells whether a statement is a context statement, i.e. one that is still executed on retry
 * instead of being skipped.
 *
 * @param code
 *   SQL code, expected to be already upper-cased and trimmed by the caller
 * @return
 *   true when `code` starts with one of the prefixes configured in
 *   `ComputationExecutorConf.CONTEXT_STATEMENT_PREFIXES`, false otherwise
 */
private def isContextStatement(code: String): Boolean = {
  ComputationExecutorConf.CONTEXT_STATEMENT_PREFIXES.getValue
    .split(",")
    // Drop empty entries: startsWith("") is always true, so an empty config value or a
    // stray ",," would otherwise mark every statement as a context statement.
    .filter(_.nonEmpty)
    .exists(prefix => code.startsWith(prefix))
}
549+
442550
/**
443551
* job task log print task params info
444552
*

linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/service/TaskExecutionServiceImpl.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -249,17 +249,6 @@ class TaskExecutionServiceImpl
249249
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
250250
logger.error(message, throwable)
251251
sendToEntrance(task, ResponseTaskStatus(task.getTaskId, ExecutionNodeStatus.Failed))
252-
case ErrorRetryExecuteResponse(message, index, throwable) =>
253-
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
254-
logger.error(message, throwable)
255-
sendToEntrance(
256-
task,
257-
new ResponseTaskStatusWithExecuteCodeIndex(
258-
task.getTaskId,
259-
ExecutionNodeStatus.Failed,
260-
index
261-
)
262-
)
263252
case _ =>
264253
}
265254
LoggerUtils.removeJobIdMDC()

linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/utlis/ComputationEngineUtils.scala

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,19 @@
1717

1818
package org.apache.linkis.engineconn.computation.executor.utlis
1919

20+
import org.apache.linkis.common.utils.{Logging, Utils}
21+
import org.apache.linkis.engineconn.computation.executor.entity.EngineConnTask
22+
import org.apache.linkis.governance.common.exception.engineconn.{
23+
EngineConnExecutorErrorCode,
24+
EngineConnExecutorErrorException
25+
}
26+
import org.apache.linkis.protocol.message.RequestProtocol
27+
import org.apache.linkis.rpc.Sender
2028
import org.apache.linkis.server.BDPJettyServerHelper
2129

2230
import com.google.gson.{Gson, GsonBuilder}
2331

24-
object ComputationEngineUtils {
32+
object ComputationEngineUtils extends Logging {
2533

2634
def GSON: Gson = BDPJettyServerHelper.gson
2735

@@ -30,4 +38,24 @@ object ComputationEngineUtils {
3038
private val WORK_DIR_STR = "user.dir"
3139
def getCurrentWorkDir: String = System.getProperty(WORK_DIR_STR)
3240

41+
/**
 * Sends a protocol message to the callback service instance recorded on the task.
 *
 * Nothing is sent when the task, its callback service instance, or the message is null; that
 * case is only logged.
 *
 * @param task
 *   the task that provides the callback service instance
 * @param msg
 *   the message to send
 * @throws EngineConnExecutorErrorException
 *   with code `SEND_TO_ENTRANCE_ERROR` when obtaining the sender or sending fails
 */
def sendToEntrance(task: EngineConnTask, msg: RequestProtocol): Unit = {
  Utils.tryCatch {
    if (null != task && null != task.getCallbackServiceInstance() && null != msg) {
      Sender.getSender(task.getCallbackServiceInstance()).send(msg)
    } else {
      // The message is dropped here, so log it at a level that is visible by default.
      logger.warn("SendtoEntrance error, cannot find entrance instance.")
    }
  } { t =>
    // Use the failure's own message; getCause is frequently null and was previously
    // appended to the text without any separator.
    val errorMsg = s"SendToEntrance error. msg: $msg, cause: ${t.getMessage}"
    logger.error(errorMsg, t)
    throw new EngineConnExecutorErrorException(
      EngineConnExecutorErrorCode.SEND_TO_ENTRANCE_ERROR,
      errorMsg
    )
  }
}
60+
3361
}

linkis-computation-governance/linkis-engineconn/linkis-engineconn-executor/executor-core/src/main/scala/org/apache/linkis/engineconn/executor/ExecutorExecutionContext.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,6 @@ trait ExecutorExecutionContext {
127127

128128
def setResultSetNum(num: Int): Unit = aliasNum.set(num)
129129

130+
def getAliasNum: Int = aliasNum.get()
131+
130132
}

linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/conf/EntranceSpringConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ public EntranceInterceptor[] entranceInterceptors() {
151151
new SQLLimitEntranceInterceptor(),
152152
new CommentInterceptor(),
153153
new SetTenantLabelInterceptor(),
154-
new UserCreatorIPCheckInterceptor()
154+
new UserCreatorIPCheckInterceptor(),
155+
new TaskRetryInterceptor()
155156
};
156157
}
157158

linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/job/EntranceExecutionJob.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,15 @@ public ExecuteRequest jobToExecuteRequest() throws EntranceErrorException {
157157
String resultSetPathRoot = GovernanceCommonConf.RESULT_SET_STORE_PATH().getValue(runtimeMapTmp);
158158

159159
if (!runtimeMapTmp.containsKey(GovernanceCommonConf.RESULT_SET_STORE_PATH().key())) {
160-
String resultParentPath = CommonLogPathUtils.getResultParentPath(jobRequest);
161-
CommonLogPathUtils.buildCommonPath(resultParentPath, true);
162-
resultSetPathRoot = CommonLogPathUtils.getResultPath(jobRequest);
160+
// 修复:任务重试背景下,10:59分提交任务执行,重试时时间变成11:00,重试任务会重新生成结果目录,导致查询结果集时,重试之前执行的结果集丢失
161+
// 新增判断:生成结果目录之前,判断任务之前是否生成结果集,生成过就复用
162+
if (org.apache.commons.lang3.StringUtils.isNotEmpty(jobRequest.getResultLocation())) {
163+
resultSetPathRoot = jobRequest.getResultLocation();
164+
} else {
165+
String resultParentPath = CommonLogPathUtils.getResultParentPath(jobRequest);
166+
CommonLogPathUtils.buildCommonPath(resultParentPath, true);
167+
resultSetPathRoot = CommonLogPathUtils.getResultPath(jobRequest);
168+
}
163169
}
164170

165171
Map<String, Object> jobMap = new HashMap<String, Object>();

0 commit comments

Comments
 (0)