diff --git a/backend/biz/task/handler/v1/task.go b/backend/biz/task/handler/v1/task.go index ebbf009a..429eb2fb 100644 --- a/backend/biz/task/handler/v1/task.go +++ b/backend/biz/task/handler/v1/task.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "log/slog" + "strconv" + "strings" "time" "github.com/GoYoko/web" @@ -469,6 +471,7 @@ func buildTaskStreamsFromLogEntries(entries []tasklog.Entry, logger *slog.Logger Type: consts.TaskStreamType(entry.Event), Data: normalizeTaskStreamData(entry.Event, []byte(entry.Data)), Kind: entry.Kind, + Seq: msgSeqStart(entry.MsgSeq), Timestamp: entry.TS.UnixMilli(), }) if entry.Event == "task-ended" { @@ -479,6 +482,18 @@ func buildTaskStreamsFromLogEntries(entries []tasklog.Entry, logger *slog.Logger return streams, ended } +func msgSeqStart(msgSeq string) uint64 { + if msgSeq == "" { + return 0 + } + start, _, _ := strings.Cut(msgSeq, "-") + seq, err := strconv.ParseUint(start, 10, 64) + if err != nil { + return 0 + } + return seq +} + type taskUserInputStoragePayload struct { Encoding string `json:"encoding"` Content string `json:"content"` @@ -554,6 +569,7 @@ func (h *TaskHandler) consumeLiveStream(ctx context.Context, cancel context.Canc Type: consts.TaskStreamType(chunk.Event), Data: normalizeTaskStreamData(chunk.Event, chunk.Data), Kind: chunk.Kind, + Seq: chunk.Seq, Timestamp: chunk.Timestamp / 1e6, }); err != nil { return @@ -572,6 +588,7 @@ func (h *TaskHandler) subscribeRealtimeStream(ctx context.Context, cancel contex Type: consts.TaskStreamType(chunk.Event), Data: normalizeTaskStreamData(chunk.Event, chunk.Data), Kind: chunk.Kind, + Seq: chunk.Seq, Timestamp: chunk.Timestamp / 1e6, }); err != nil { return fmt.Errorf("failed to write to websocket: %w", err) @@ -788,7 +805,8 @@ func (h *TaskHandler) TaskTurns(c *web.Context, req domain.TaskRoundsReq) error Event: c.Event, Kind: c.Kind, Timestamp: c.Timestamp, - Seq: c.TurnSeq, + Seq: c.Seq, + TurnSeq: c.TurnSeq, Labels: c.Labels, }) } diff --git a/backend/biz/task/handler/v1/task_attach_test.go b/backend/biz/task/handler/v1/task_attach_test.go index 5ea8a942..80a05f6c 100644 --- a/backend/biz/task/handler/v1/task_attach_test.go +++ b/backend/biz/task/handler/v1/task_attach_test.go @@ -37,7 +37,7 @@ func TestBuildTaskStreamsFromLogEntriesKeepsStreamingWhenNotEnded(t *testing.T) streams, ended := buildTaskStreamsFromLogEntries([]tasklog.Entry{ {TaskID: uuid.Nil, TS: base, Event: "task-started", Kind: "acp_event"}, - {TaskID: uuid.Nil, TS: base.Add(time.Second), Event: "task-running", Kind: "agent_message_chunk"}, + {TaskID: uuid.Nil, TS: base.Add(time.Second), Event: "task-running", Kind: "agent_message_chunk", MsgSeq: "13-16"}, }, logger) if ended { @@ -46,6 +46,9 @@ func TestBuildTaskStreamsFromLogEntriesKeepsStreamingWhenNotEnded(t *testing.T) if len(streams) != 2 { t.Fatalf("len(streams) = %d, want 2", len(streams)) } + if streams[1].Seq != 13 { + t.Fatalf("stream seq = %d, want 13", streams[1].Seq) + } } func TestNormalizeUserInputDataWrapsLegacyText(t *testing.T) { diff --git a/backend/biz/task/handler/v1/task_control.go b/backend/biz/task/handler/v1/task_control.go index 0a88f0bb..fbfd3c2d 100644 --- a/backend/biz/task/handler/v1/task_control.go +++ b/backend/biz/task/handler/v1/task_control.go @@ -418,6 +418,7 @@ func (h *TaskHandler) controlSubscribeTaskEvents(ctx context.Context, wsConn *ws Type: consts.TaskStreamTypeTaskEvent, Data: chunk.Data, Kind: chunk.Kind, + Seq: chunk.Seq, Timestamp: chunk.Timestamp / 1e6, }) }) diff --git a/backend/docs/swagger.json b/backend/docs/swagger.json index de2e92fe..aab3ffd1 100644 --- a/backend/docs/swagger.json +++ b/backend/docs/swagger.json @@ -12689,6 +12689,10 @@ } }, "seq": { + "description": "消息序号,来自 ClickHouse msg_seq_start", + "type": "integer" + }, + "turn_seq": { "description": "轮次号,可作为 cursor 翻页;仅日志存储为 ClickHouse 时有值", "type": "integer" }, diff --git a/backend/domain/task.go b/backend/domain/task.go index 28949828..bb6c3052 100644 --- a/backend/domain/task.go +++ b/backend/domain/task.go @@ -315,6 +315,7 @@ type TaskStream struct { Type consts.TaskStreamType `json:"type"` Data []byte `json:"data"` // user-input 事件使用 TaskUserInputPayload 的 JSON 字符串 Kind string `json:"kind"` + Seq uint64 `json:"seq,omitempty"` Timestamp int64 `json:"timestamp"` } @@ -357,7 +358,8 @@ type TaskChunkEntry struct { Event string `json:"event"` Kind string `json:"kind"` Timestamp int64 `json:"timestamp"` - Seq uint32 `json:"seq,omitempty"` // 轮次号,可作为 cursor 翻页;仅日志存储为 ClickHouse 时有值 + Seq uint64 `json:"seq,omitempty"` // 消息序号,来自 ClickHouse msg_seq_start + TurnSeq uint32 `json:"turn_seq,omitempty"` // 轮次号,可作为 cursor 翻页;仅日志存储为 ClickHouse 时有值 Labels map[string]string `json:"labels,omitempty"` } diff --git a/backend/pkg/taskflow/types.go b/backend/pkg/taskflow/types.go index 3f769c12..cb0f53d0 100644 --- a/backend/pkg/taskflow/types.go +++ b/backend/pkg/taskflow/types.go @@ -362,6 +362,7 @@ type TaskChunk struct { Data []byte `json:"data,omitempty"` Event string `json:"event"` Kind string `json:"kind"` + Seq uint64 `json:"seq,omitempty"` Timestamp int64 `json:"timestamp,omitempty"` } diff --git a/backend/pkg/tasklog/clickhouse_provider.go b/backend/pkg/tasklog/clickhouse_provider.go index 908e61e3..3b76704a 100644 --- a/backend/pkg/tasklog/clickhouse_provider.go +++ b/backend/pkg/tasklog/clickhouse_provider.go @@ -107,7 +107,7 @@ func (p *ClickHouseProvider) QueryTurns(ctx context.Context, taskID uuid.UUID, _ args = append(args, limit) q := fmt.Sprintf(` -SELECT ts, event, kind, data, turn_seq +SELECT ts, event, kind, data, turn_seq, msg_seq_start FROM %[1]s WHERE task_id = ? AND turn_seq IN ( SELECT DISTINCT turn_seq @@ -135,8 +135,9 @@ ORDER BY turn_seq %[3]s, ts ASC, msg_seq_start ASC, ingest_id ASC kind string data string turnSeq uint32 + seq uint64 ) - if err := rows.Scan(&ts, &event, &kind, &data, &turnSeq); err != nil { + if err := rows.Scan(&ts, &event, &kind, &data, &turnSeq, &seq); err != nil { return nil, err } // 行按扫描方向排列,最后一行所在轮即本页边界轮 @@ -146,6 +147,7 @@ ORDER BY turn_seq %[3]s, ts ASC, msg_seq_start ASC, ingest_id ASC Event: event, Kind: kind, Timestamp: ts.UTC().UnixNano(), + Seq: seq, TurnSeq: turnSeq, }) } diff --git a/backend/pkg/tasklog/clickhouse_provider_test.go b/backend/pkg/tasklog/clickhouse_provider_test.go index 434ce264..8107730f 100644 --- a/backend/pkg/tasklog/clickhouse_provider_test.go +++ b/backend/pkg/tasklog/clickhouse_provider_test.go @@ -115,10 +115,10 @@ func TestClickHouseProviderQueryTurnsUsesSparseTurnCursor(t *testing.T) { defer db.Close() taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111") - chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}). - AddRow(time.Unix(1_710_000_010, 0).UTC(), "user-input", "", "latest", uint32(1)) + chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}). + AddRow(time.Unix(1_710_000_010, 0).UTC(), "user-input", "", "latest", uint32(1), uint64(11)) - mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq < \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). + mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq < \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). WithArgs(taskID, taskID, uint32(2), 1). WillReturnRows(chunkRows) @@ -137,6 +137,9 @@ func TestClickHouseProviderQueryTurnsUsesSparseTurnCursor(t *testing.T) { if resp.Chunks[0].TurnSeq != 1 { t.Fatalf("chunk turn_seq = %d, want 1", resp.Chunks[0].TurnSeq) } + if resp.Chunks[0].Seq != 11 { + t.Fatalf("chunk seq = %d, want 11", resp.Chunks[0].Seq) + } if resp.HasMore { t.Fatal("expected has_more=false") } @@ -157,10 +160,10 @@ func TestClickHouseProviderQueryTurnsBackwardHasMore(t *testing.T) { taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111") now := time.Unix(1_710_000_010, 0).UTC() - chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}). - AddRow(now, "task-running", "acp_event", "turn-3", uint32(3)) + chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}). + AddRow(now, "task-running", "acp_event", "turn-3", uint32(3), uint64(31)) - mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). + mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). WithArgs(taskID, taskID, 1). WillReturnRows(chunkRows) @@ -179,6 +182,9 @@ func TestClickHouseProviderQueryTurnsBackwardHasMore(t *testing.T) { if string(resp.Chunks[0].Data) != "turn-3" { t.Fatalf("chunk data = %q, want turn-3", string(resp.Chunks[0].Data)) } + if resp.Chunks[0].Seq != 31 { + t.Fatalf("chunk seq = %d, want 31", resp.Chunks[0].Seq) + } if !resp.HasMore { t.Fatal("expected has_more=true") } @@ -199,11 +205,11 @@ func TestClickHouseProviderQueryTurnsForward(t *testing.T) { taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111") now := time.Unix(1_710_000_010, 0).UTC() - chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}). - AddRow(now, "user-input", "", "turn-4", uint32(4)). - AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5)) + chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}). + AddRow(now, "user-input", "", "turn-4", uint32(4), uint64(41)). + AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5), uint64(51)) - mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq > \\?[\\s\\S]*ORDER BY turn_seq ASC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq ASC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). + mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq > \\?[\\s\\S]*ORDER BY turn_seq ASC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq ASC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). WithArgs(taskID, taskID, uint32(3), 2). WillReturnRows(chunkRows) @@ -226,6 +232,9 @@ func TestClickHouseProviderQueryTurnsForward(t *testing.T) { if resp.Chunks[0].TurnSeq != 4 || resp.Chunks[1].TurnSeq != 5 { t.Fatalf("chunk turn_seqs = [%d, %d], want [4, 5]", resp.Chunks[0].TurnSeq, resp.Chunks[1].TurnSeq) } + if resp.Chunks[0].Seq != 41 || resp.Chunks[1].Seq != 51 { + t.Fatalf("chunk seqs = [%d, %d], want [41, 51]", resp.Chunks[0].Seq, resp.Chunks[1].Seq) + } if !resp.HasMore { t.Fatal("expected has_more=true") } @@ -324,11 +333,11 @@ func TestClickHouseProviderQueryTurnsBackwardInclusive(t *testing.T) { taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111") now := time.Unix(1_710_000_010, 0).UTC() - chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}). - AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5)). - AddRow(now, "user-input", "", "turn-4", uint32(4)) + chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}). + AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5), uint64(51)). + AddRow(now, "user-input", "", "turn-4", uint32(4), uint64(41)) - mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*turn_seq <= \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). + mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*turn_seq <= \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$"). WithArgs(taskID, taskID, uint32(5), 2). WillReturnRows(chunkRows) @@ -351,6 +360,9 @@ func TestClickHouseProviderQueryTurnsBackwardInclusive(t *testing.T) { if resp.Chunks[0].TurnSeq != 5 || resp.Chunks[1].TurnSeq != 4 { t.Fatalf("chunk turn_seqs = [%d, %d], want [5, 4]", resp.Chunks[0].TurnSeq, resp.Chunks[1].TurnSeq) } + if resp.Chunks[0].Seq != 51 || resp.Chunks[1].Seq != 41 { + t.Fatalf("chunk seqs = [%d, %d], want [51, 41]", resp.Chunks[0].Seq, resp.Chunks[1].Seq) + } if resp.HasMore { t.Fatal("expected has_more=false") } diff --git a/backend/pkg/tasklog/types.go b/backend/pkg/tasklog/types.go index 3e52f08a..82c34a56 100644 --- a/backend/pkg/tasklog/types.go +++ b/backend/pkg/tasklog/types.go @@ -51,6 +51,7 @@ type TurnChunk struct { Event string Kind string Timestamp int64 + Seq uint64 // 消息序号,来自 ClickHouse msg_seq_start TurnSeq uint32 // 轮次号,仅 ClickHouse 有值,Loki 为 0 Labels map[string]string }