From 5dfd654ef2424b51af0592aa0db0dcca5de17977 Mon Sep 17 00:00:00 2001 From: MoeexT Date: Sat, 28 Feb 2026 14:28:51 +0800 Subject: [PATCH 1/3] :sparkles: support retry task --- frontend/src/i18n/locales/en/common.json | 2 + frontend/src/i18n/locales/zh/common.json | 2 + .../DataCollection/Home/TaskManagement.tsx | 90 +++++++++++++++---- .../module/collection/client/datax_client.py | 6 ++ .../module/collection/interface/collection.py | 23 +++++ .../module/collection/service/collection.py | 4 + 6 files changed, 112 insertions(+), 15 deletions(-) diff --git a/frontend/src/i18n/locales/en/common.json b/frontend/src/i18n/locales/en/common.json index 2e0261a18..d9514d7f8 100644 --- a/frontend/src/i18n/locales/en/common.json +++ b/frontend/src/i18n/locales/en/common.json @@ -82,12 +82,14 @@ "actions": { "start": "Start", "stop": "Stop", + "retry": "Retry", "executionRecords": "Execution Records", "delete": "Delete" }, "messages": { "startSuccess": "Task start request sent", "stopSuccess": "Task stop request sent", + "retrySuccess": "Task retry request sent", "deleteSuccess": "Task deleted", "deleteConfirm": "Are you sure you want to delete this task? This action cannot be undone.", "deleteConfirmMessage": "Are you sure you want to delete task \"{{taskName}}\"? This action cannot be undone.", diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index fa200d1b0..5ce0728b9 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -82,12 +82,14 @@ "actions": { "start": "启动", "stop": "停止", + "retry": "重试", "executionRecords": "执行记录", "delete": "删除" }, "messages": { "startSuccess": "任务启动请求已发送", "stopSuccess": "任务停止请求已发送", + "retrySuccess": "任务重试请求已发送", "deleteSuccess": "任务已删除", "deleteConfirm": "确定要删除该任务吗?此操作不可撤销。", "deleteConfirmMessage": "确定要删除任务「{{taskName}}」吗?删除后将无法恢复。", diff --git a/frontend/src/pages/DataCollection/Home/TaskManagement.tsx b/frontend/src/pages/DataCollection/Home/TaskManagement.tsx index 46ece2353..d3b0f75ae 100644 --- a/frontend/src/pages/DataCollection/Home/TaskManagement.tsx +++ b/frontend/src/pages/DataCollection/Home/TaskManagement.tsx @@ -1,9 +1,11 @@ import { App, Button, Card, Modal, Table, Tag, Tooltip } from "antd"; import { DeleteOutlined, + EditOutlined, PauseCircleOutlined, PlayCircleOutlined, ProfileOutlined, + ReloadOutlined, } from "@ant-design/icons"; import { SearchControls } from "@/components/SearchControls"; import { @@ -35,6 +37,7 @@ export default function TaskManagement() { taskId: "", taskName: "", }); + const filters = [ { key: "status", @@ -101,8 +104,26 @@ export default function TaskManagement() { setDeleteModal({ visible: false, taskId: "", taskName: "" }); }; + const handleRetryTask = async (taskId: string) => { + await executeTaskByIdUsingPost(taskId); + message.success(t("dataCollection.taskManagement.messages.retrySuccess")); + fetchData(); + }; + + const handleEditTask = (record: CollectionTask) => { + navigate(`/data/collection/create-task?taskId=${encodeURIComponent(record.id)}`); + }; + const taskOperations = (record: CollectionTask) => { - const isStopped = record.status === TaskStatus.STOPPED; + // 获取实际的枚举值 + const statusValue = record.status?.value || record.status; + + const isStopped = statusValue === TaskStatus.STOPPED; + const isFailed = statusValue === TaskStatus.FAILED; + const isPending = statusValue === TaskStatus.PENDING; + const isRunning = statusValue === TaskStatus.RUNNING; + const isCompleted = statusValue === TaskStatus.COMPLETED; + const startButton = { key: "start", label: t("dataCollection.taskManagement.actions.start"), @@ -115,7 +136,20 @@ export default function TaskManagement() { icon: , onClick: () => handleStopTask(record.id), }; - return [ + const retryButton = { + key: "retry", + label: t("dataCollection.taskManagement.actions.retry"), + icon: , + onClick: () => handleRetryTask(record.id), + }; + const editButton = { + key: "edit", + label: t("dataCollection.taskManagement.actions.edit"), + icon: , + onClick: () => handleEditTask(record), + }; + + const operations = [ { key: "executions", label: t("dataCollection.taskManagement.actions.executionRecords"), @@ -125,20 +159,46 @@ export default function TaskManagement() { `/data/collection?tab=task-execution&taskId=${encodeURIComponent(record.id)}` ), }, - { - key: "delete", - label: t("dataCollection.taskManagement.actions.delete"), - danger: true, - icon: , - modal: { - title: t("dataCollection.taskManagement.messages.deleteConfirm"), - okText: t("dataCollection.taskManagement.messages.confirmDelete"), - cancelText: t("dataCollection.taskManagement.messages.cancel"), - okType: "danger", - }, - onClick: () => showDeleteConfirm(record.id, record.name), - }, ]; + + // 根据状态添加不同的操作按钮 + // PENDING 状态可以启动和编辑 + if (isPending) { + operations.push(startButton); + operations.push(editButton); + } + // RUNNING 状态可以编辑 + else if (isRunning) { + operations.push(editButton); + } + // FAILED 状态可以重试和编辑 + else if (isFailed) { + operations.push(retryButton); + operations.push(editButton); + } + // STOPPED 状态可以启动和编辑 + else if (isStopped) { + operations.push(startButton); + operations.push(editButton); + } + // COMPLETED 状态的任务不可编辑 + else if (isCompleted) { + // 不添加任何操作 + } + // 其他状态(如 DRAFT)可以编辑 + else { + operations.push(editButton); + } + + operations.push({ + key: "delete", + label: t("dataCollection.taskManagement.actions.delete"), + danger: true, + icon: , + onClick: () => showDeleteConfirm(record.id, record.name), + }); + + return operations; }; const columns = [ diff --git a/runtime/datamate-python/app/module/collection/client/datax_client.py b/runtime/datamate-python/app/module/collection/client/datax_client.py index ff6338289..254f704bf 100644 --- a/runtime/datamate-python/app/module/collection/client/datax_client.py +++ b/runtime/datamate-python/app/module/collection/client/datax_client.py @@ -153,8 +153,14 @@ def run_datax_job(self): self.execution.error_message = f"执行异常: {e}" self.execution.status = TaskStatus.FAILED.name logger.error(f"执行异常: {e}", exc_info=True) + + # 根据同步模式更新任务状态 if self.task.sync_mode == SyncMode.ONCE: + # 一次性任务:使用执行结果作为最终状态 self.task.status = self.execution.status + else: + # 定时任务:恢复为 PENDING 状态,等待下次执行 + self.task.status = TaskStatus.PENDING.name def rename_collection_result(self): if self.template.target_type != "txtfilewriter": diff --git a/runtime/datamate-python/app/module/collection/interface/collection.py b/runtime/datamate-python/app/module/collection/interface/collection.py index 6a350ab9f..72ffc40fd 100644 --- a/runtime/datamate-python/app/module/collection/interface/collection.py +++ b/runtime/datamate-python/app/module/collection/interface/collection.py @@ -120,6 +120,29 @@ async def list_tasks( ) +@router.post("/{task_id}/execute", response_model=StandardResponse[str], operation_id="execute_task") +async def execute_task( + task_id: str, + db: AsyncSession = Depends(get_db) +): + """执行归集任务""" + from app.module.collection.service.collection import CollectionTaskService + + # 验证任务是否存在 + task = await db.execute(select(CollectionTask).where(CollectionTask.id == task_id)) + task = task.scalar_one_or_none() + if not task: + raise BusinessError(ErrorCodes.COLLECTION_TASK_NOT_FOUND, data={"task_id": task_id}) + + # 调用服务执行任务 + await CollectionTaskService.run_async(task_id) + + return SuccessResponse( + data=task_id, + message="Task execution started" + ) + + @router.delete("", response_model=StandardResponse[str], status_code=200) async def delete_collection_tasks( ids: list[str] = Query(..., description="List of task IDs to delete"), diff --git a/runtime/datamate-python/app/module/collection/service/collection.py b/runtime/datamate-python/app/module/collection/service/collection.py index f76275e20..859ed240a 100644 --- a/runtime/datamate-python/app/module/collection/service/collection.py +++ b/runtime/datamate-python/app/module/collection/service/collection.py @@ -72,6 +72,10 @@ async def run_async(task_id: str, dataset_id: str = None): if not template: logger.error(f"template {task.template_name} not exist") return + + # 设置任务状态为 RUNNING + task.status = TaskStatus.RUNNING.name + await session.commit() task_execution = create_execute_record(task) session.add(task_execution) await session.commit() From 08f69f74d3e79aa61daa80e7e44db9d45ae1339c Mon Sep 17 00:00:00 2001 From: MoeexT Date: Mon, 2 Mar 2026 11:58:01 +0800 Subject: [PATCH 2/3] :sparkles: support edit task --- frontend/src/i18n/locales/en/common.json | 10 + frontend/src/i18n/locales/zh/common.json | 10 + .../DataCollection/Create/CreateTask.tsx | 201 +++++++++++++++--- .../pages/DataCollection/collection.apis.ts | 4 + .../module/collection/interface/collection.py | 112 ++++++++-- .../module/collection/schema/collection.py | 32 ++- 6 files changed, 321 insertions(+), 48 deletions(-) diff --git a/frontend/src/i18n/locales/en/common.json b/frontend/src/i18n/locales/en/common.json index d9514d7f8..f9bc02d7c 100644 --- a/frontend/src/i18n/locales/en/common.json +++ b/frontend/src/i18n/locales/en/common.json @@ -83,6 +83,7 @@ "start": "Start", "stop": "Stop", "retry": "Retry", + "edit": "Edit", "executionRecords": "Execution Records", "delete": "Delete" }, @@ -90,11 +91,18 @@ "startSuccess": "Task start request sent", "stopSuccess": "Task stop request sent", "retrySuccess": "Task retry request sent", + "updateSuccess": "Task updated successfully", + "updateFailed": "Failed to update task", "deleteSuccess": "Task deleted", "deleteConfirm": "Are you sure you want to delete this task? This action cannot be undone.", "deleteConfirmMessage": "Are you sure you want to delete task \"{{taskName}}\"? This action cannot be undone.", "confirmDelete": "Delete", "cancel": "Cancel" + }, + "edit": { + "title": "Edit Task", + "taskInfo": "Task Information", + "resetHint": "After saving, the task status will be reset to PENDING and can be re-executed" } }, "templateManagement": { @@ -167,9 +175,11 @@ }, "createTask": { "title": "Create Collection Task", + "editTitle": "Edit Collection Task", "back": "Back", "cancel": "Cancel", "submit": "Create Task", + "updateButton": "Update Task", "actions": { "formatJson": "Format JSON" }, diff --git a/frontend/src/i18n/locales/zh/common.json b/frontend/src/i18n/locales/zh/common.json index 5ce0728b9..2edef769e 100644 --- a/frontend/src/i18n/locales/zh/common.json +++ b/frontend/src/i18n/locales/zh/common.json @@ -83,6 +83,7 @@ "start": "启动", "stop": "停止", "retry": "重试", + "edit": "编辑", "executionRecords": "执行记录", "delete": "删除" }, @@ -90,11 +91,18 @@ "startSuccess": "任务启动请求已发送", "stopSuccess": "任务停止请求已发送", "retrySuccess": "任务重试请求已发送", + "updateSuccess": "任务更新成功", + "updateFailed": "任务更新失败", "deleteSuccess": "任务已删除", "deleteConfirm": "确定要删除该任务吗?此操作不可撤销。", "deleteConfirmMessage": "确定要删除任务「{{taskName}}」吗?删除后将无法恢复。", "confirmDelete": "删除", "cancel": "取消" + }, + "edit": { + "title": "编辑任务", + "taskInfo": "任务信息", + "resetHint": "保存后任务状态将重置为待执行,可以重新执行" } }, "templateManagement": { @@ -167,9 +175,11 @@ }, "createTask": { "title": "创建归集任务", + "editTitle": "编辑归集任务", "back": "返回", "cancel": "取消", "submit": "创建任务", + "updateButton": "更新任务", "actions": { "formatJson": "格式化JSON" }, diff --git a/frontend/src/pages/DataCollection/Create/CreateTask.tsx b/frontend/src/pages/DataCollection/Create/CreateTask.tsx index 8134e9924..a3f97ddaf 100644 --- a/frontend/src/pages/DataCollection/Create/CreateTask.tsx +++ b/frontend/src/pages/DataCollection/Create/CreateTask.tsx @@ -1,8 +1,8 @@ import { useEffect, useState } from "react"; import { Input, Button, Radio, Form, App, Select, InputNumber } from "antd"; -import { Link, useNavigate } from "react-router"; +import { Link, useNavigate, useSearchParams } from "react-router"; import { ArrowLeft } from "lucide-react"; -import { createTaskUsingPost, queryDataXTemplatesUsingGet } from "../collection.apis"; +import { createTaskUsingPost, updateTaskUsingPut, queryDataXTemplatesUsingGet, queryTasksUsingGet } from "../collection.apis"; import SimpleCronScheduler from "@/pages/DataCollection/Create/SimpleCronScheduler"; import { getSyncModeMap } from "../collection.const"; import { SyncMode } from "../collection.model"; @@ -39,11 +39,17 @@ type TemplateFieldDef = { export default function CollectionTaskCreate() { const navigate = useNavigate(); + const [searchParams] = useSearchParams(); const [form] = Form.useForm(); const { message } = App.useApp(); const { t } = useTranslation(); const syncModeOptions = Object.values(getSyncModeMap(t)); + // 编辑模式 + const taskId = searchParams.get("taskId"); + const isEditMode = !!taskId; + const [editLoading, setEditLoading] = useState(false); + const [templates, setTemplates] = useState([]); const [templatesLoading, setTemplatesLoading] = useState(false); const [selectedTemplateId, setSelectedTemplateId] = useState(undefined); @@ -65,8 +71,52 @@ export default function CollectionTaskCreate() { cronExpression: "0 0 * * *", }); + // 解析 cron 表达式 + const parseCronExpression = (cronExpr: string) => { + const parts = cronExpr.trim().split(/\s+/); + if (parts.length !== 5) { + // 无效的 cron 表达式,返回默认值 + return { + type: "daily" as const, + time: "00:00", + cronExpression: cronExpr, + }; + } + + const [minute, hour, day, month, weekday] = parts; + const formattedHour = hour.padStart(2, "0"); + const formattedMinute = minute.padStart(2, "0"); + const time = `${formattedHour}:${formattedMinute}`; + + // 判断类型:monthly (指定日期), weekly (指定星期), daily (都是 *) + if (day !== "*" && month === "*") { + // monthly: 例如 "0 9 1 * *" 表示每月1号9点 + return { + type: "monthly" as const, + time, + monthDay: parseInt(day, 10), + cronExpression: cronExpr, + }; + } else if (weekday !== "*" && day === "*") { + // weekly: 例如 "0 9 * * 1" 表示每周一9点 + return { + type: "weekly" as const, + time, + weekDay: parseInt(weekday, 10), + cronExpression: cronExpr, + }; + } else { + // daily: 例如 "0 9 * * *" 表示每天9点 + return { + type: "daily" as const, + time, + cronExpression: cronExpr, + }; + } + }; + useEffect(() => { - const run = async () => { + const loadTemplates = async () => { setTemplatesLoading(true); try { const resp: any = await queryDataXTemplatesUsingGet({ page: 1, size: 1000 }); @@ -78,8 +128,56 @@ export default function CollectionTaskCreate() { setTemplatesLoading(false); } }; - run() - }, []); + + const loadTask = async () => { + if (!taskId) return; + setEditLoading(true); + try { + const resp: any = await queryTasksUsingGet({ page: 1, size: 1 }); + const task = resp?.data?.content?.find((t: any) => t.id === taskId); + if (task) { + // 设置表单值 + setSelectedTemplateId(task.templateId); + form.setFieldsValue({ + name: task.name, + description: task.description, + syncMode: task.syncMode, + scheduleExpression: task.scheduleExpression || "", + timeoutSeconds: task.timeoutSeconds || 3600, + templateId: task.templateId, + config: task.config || { parameter: {}, reader: {}, writer: {} }, + }); + setNewTask({ + name: task.name, + description: task.description, + syncMode: task.syncMode, + scheduleExpression: task.scheduleExpression || "", + timeoutSeconds: task.timeoutSeconds || 3600, + templateId: task.templateId, + config: task.config || { parameter: {}, reader: {}, writer: {} }, + }); + // 解析 cron 表达式 + if (task.scheduleExpression) { + const parsedSchedule = parseCronExpression(task.scheduleExpression); + setScheduleExpression(parsedSchedule); + } + } else { + message.error(t("dataCollection.taskManagement.messages.updateFailed")); + navigate("/data/collection"); + } + } catch (e) { + message.error(t("dataCollection.taskManagement.messages.updateFailed")); + navigate("/data/collection"); + } finally { + setEditLoading(false); + } + }; + + loadTemplates(); + if (isEditMode) { + loadTask(); + } + }, [taskId]); const parseJsonObjectInput = (value: any) => { if (value === undefined || value === null) return value; @@ -195,10 +293,30 @@ export default function CollectionTaskCreate() { ), }; } - await createTaskUsingPost(payload); - message.success(t("dataCollection.createTask.messages.createSuccess")); + + if (isEditMode) { + // 编辑模式:只更新允许的字段 + const updateData: any = { + description: payload.description, + timeoutSeconds: payload.timeoutSeconds, + config: payload.config, + }; + if (payload.syncMode === SyncMode.SCHEDULED && payload.scheduleExpression) { + updateData.scheduleExpression = payload.scheduleExpression; + } + await updateTaskUsingPut(taskId!, updateData); + message.success(t("dataCollection.taskManagement.messages.updateSuccess")); + } else { + // 创建模式 + await createTaskUsingPost(payload); + message.success(t("dataCollection.createTask.messages.createSuccess")); + } navigate("/data/collection"); } catch (error) { + if (error.errorFields) { + // 表单验证错误,不显示消息 + return; + } message.error( t("dataCollection.createTask.messages.errorWithDetail", { message: error?.data?.message ?? "", @@ -412,22 +530,30 @@ export default function CollectionTaskCreate() { return (
-
-
- - - -

- {t("dataCollection.createTask.title")} -

+ {editLoading ? ( +
+
{t("common.loading")}
-
+ ) : ( + <> +
+
+ + + +

+ {isEditMode + ? t("dataCollection.createTask.editTitle") + : t("dataCollection.createTask.title")} +

+
+
-
-
-
+
+ - + { const value = e.target.value; setNewTask({ @@ -532,6 +662,7 @@ export default function CollectionTaskCreate() {