Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion backend/biz/task/handler/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"errors"
"fmt"
"log/slog"
"strconv"
"strings"
"time"

"github.com/GoYoko/web"
Expand Down Expand Up @@ -469,6 +471,7 @@ func buildTaskStreamsFromLogEntries(entries []tasklog.Entry, logger *slog.Logger
Type: consts.TaskStreamType(entry.Event),
Data: normalizeTaskStreamData(entry.Event, []byte(entry.Data)),
Kind: entry.Kind,
Seq: msgSeqStart(entry.MsgSeq),
Timestamp: entry.TS.UnixMilli(),
})
if entry.Event == "task-ended" {
Expand All @@ -479,6 +482,18 @@ func buildTaskStreamsFromLogEntries(entries []tasklog.Entry, logger *slog.Logger
return streams, ended
}

func msgSeqStart(msgSeq string) uint64 {
if msgSeq == "" {
return 0
}
start, _, _ := strings.Cut(msgSeq, "-")
seq, err := strconv.ParseUint(start, 10, 64)
if err != nil {
return 0
}
return seq
}

type taskUserInputStoragePayload struct {
Encoding string `json:"encoding"`
Content string `json:"content"`
Expand Down Expand Up @@ -554,6 +569,7 @@ func (h *TaskHandler) consumeLiveStream(ctx context.Context, cancel context.Canc
Type: consts.TaskStreamType(chunk.Event),
Data: normalizeTaskStreamData(chunk.Event, chunk.Data),
Kind: chunk.Kind,
Seq: chunk.Seq,
Timestamp: chunk.Timestamp / 1e6,
}); err != nil {
return
Expand All @@ -572,6 +588,7 @@ func (h *TaskHandler) subscribeRealtimeStream(ctx context.Context, cancel contex
Type: consts.TaskStreamType(chunk.Event),
Data: normalizeTaskStreamData(chunk.Event, chunk.Data),
Kind: chunk.Kind,
Seq: chunk.Seq,
Timestamp: chunk.Timestamp / 1e6,
}); err != nil {
return fmt.Errorf("failed to write to websocket: %w", err)
Expand Down Expand Up @@ -788,7 +805,8 @@ func (h *TaskHandler) TaskTurns(c *web.Context, req domain.TaskRoundsReq) error
Event: c.Event,
Kind: c.Kind,
Timestamp: c.Timestamp,
Seq: c.TurnSeq,
Seq: c.Seq,
TurnSeq: c.TurnSeq,
Labels: c.Labels,
})
}
Expand Down
5 changes: 4 additions & 1 deletion backend/biz/task/handler/v1/task_attach_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestBuildTaskStreamsFromLogEntriesKeepsStreamingWhenNotEnded(t *testing.T)

streams, ended := buildTaskStreamsFromLogEntries([]tasklog.Entry{
{TaskID: uuid.Nil, TS: base, Event: "task-started", Kind: "acp_event"},
{TaskID: uuid.Nil, TS: base.Add(time.Second), Event: "task-running", Kind: "agent_message_chunk"},
{TaskID: uuid.Nil, TS: base.Add(time.Second), Event: "task-running", Kind: "agent_message_chunk", MsgSeq: "13-16"},
}, logger)

if ended {
Expand All @@ -46,6 +46,9 @@ func TestBuildTaskStreamsFromLogEntriesKeepsStreamingWhenNotEnded(t *testing.T)
if len(streams) != 2 {
t.Fatalf("len(streams) = %d, want 2", len(streams))
}
if streams[1].Seq != 13 {
t.Fatalf("stream seq = %d, want 13", streams[1].Seq)
}
}

func TestNormalizeUserInputDataWrapsLegacyText(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions backend/biz/task/handler/v1/task_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func (h *TaskHandler) controlSubscribeTaskEvents(ctx context.Context, wsConn *ws
Type: consts.TaskStreamTypeTaskEvent,
Data: chunk.Data,
Kind: chunk.Kind,
Seq: chunk.Seq,
Timestamp: chunk.Timestamp / 1e6,
})
})
Expand Down
4 changes: 4 additions & 0 deletions backend/docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -12689,6 +12689,10 @@
}
},
"seq": {
"description": "消息序号,来自 ClickHouse msg_seq_start",
"type": "integer"
},
"turn_seq": {
"description": "轮次号,可作为 cursor 翻页;仅日志存储为 ClickHouse 时有值",
"type": "integer"
},
Expand Down
4 changes: 3 additions & 1 deletion backend/domain/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ type TaskStream struct {
Type consts.TaskStreamType `json:"type"`
Data []byte `json:"data"` // user-input 事件使用 TaskUserInputPayload 的 JSON 字符串
Kind string `json:"kind"`
Seq uint64 `json:"seq,omitempty"`
Timestamp int64 `json:"timestamp"`
}

Expand Down Expand Up @@ -357,7 +358,8 @@ type TaskChunkEntry struct {
Event string `json:"event"`
Kind string `json:"kind"`
Timestamp int64 `json:"timestamp"`
Seq uint32 `json:"seq,omitempty"` // 轮次号,可作为 cursor 翻页;仅日志存储为 ClickHouse 时有值
Seq uint64 `json:"seq,omitempty"` // 消息序号,来自 ClickHouse msg_seq_start
TurnSeq uint32 `json:"turn_seq,omitempty"` // 轮次号,可作为 cursor 翻页;仅日志存储为 ClickHouse 时有值
Labels map[string]string `json:"labels,omitempty"`
}

Expand Down
1 change: 1 addition & 0 deletions backend/pkg/taskflow/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ type TaskChunk struct {
Data []byte `json:"data,omitempty"`
Event string `json:"event"`
Kind string `json:"kind"`
Seq uint64 `json:"seq,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
}

Expand Down
6 changes: 4 additions & 2 deletions backend/pkg/tasklog/clickhouse_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (p *ClickHouseProvider) QueryTurns(ctx context.Context, taskID uuid.UUID, _
args = append(args, limit)

q := fmt.Sprintf(`
SELECT ts, event, kind, data, turn_seq
SELECT ts, event, kind, data, turn_seq, msg_seq_start
FROM %[1]s
WHERE task_id = ? AND turn_seq IN (
SELECT DISTINCT turn_seq
Expand Down Expand Up @@ -135,8 +135,9 @@ ORDER BY turn_seq %[3]s, ts ASC, msg_seq_start ASC, ingest_id ASC
kind string
data string
turnSeq uint32
seq uint64
)
if err := rows.Scan(&ts, &event, &kind, &data, &turnSeq); err != nil {
if err := rows.Scan(&ts, &event, &kind, &data, &turnSeq, &seq); err != nil {
return nil, err
}
// 行按扫描方向排列,最后一行所在轮即本页边界轮
Expand All @@ -146,6 +147,7 @@ ORDER BY turn_seq %[3]s, ts ASC, msg_seq_start ASC, ingest_id ASC
Event: event,
Kind: kind,
Timestamp: ts.UTC().UnixNano(),
Seq: seq,
TurnSeq: turnSeq,
})
}
Expand Down
40 changes: 26 additions & 14 deletions backend/pkg/tasklog/clickhouse_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ func TestClickHouseProviderQueryTurnsUsesSparseTurnCursor(t *testing.T) {
defer db.Close()

taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111")
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}).
AddRow(time.Unix(1_710_000_010, 0).UTC(), "user-input", "", "latest", uint32(1))
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}).
AddRow(time.Unix(1_710_000_010, 0).UTC(), "user-input", "", "latest", uint32(1), uint64(11))

mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq < \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq < \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
WithArgs(taskID, taskID, uint32(2), 1).
WillReturnRows(chunkRows)

Expand All @@ -137,6 +137,9 @@ func TestClickHouseProviderQueryTurnsUsesSparseTurnCursor(t *testing.T) {
if resp.Chunks[0].TurnSeq != 1 {
t.Fatalf("chunk turn_seq = %d, want 1", resp.Chunks[0].TurnSeq)
}
if resp.Chunks[0].Seq != 11 {
t.Fatalf("chunk seq = %d, want 11", resp.Chunks[0].Seq)
}
if resp.HasMore {
t.Fatal("expected has_more=false")
}
Expand All @@ -157,10 +160,10 @@ func TestClickHouseProviderQueryTurnsBackwardHasMore(t *testing.T) {

taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111")
now := time.Unix(1_710_000_010, 0).UTC()
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}).
AddRow(now, "task-running", "acp_event", "turn-3", uint32(3))
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}).
AddRow(now, "task-running", "acp_event", "turn-3", uint32(3), uint64(31))

mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
WithArgs(taskID, taskID, 1).
WillReturnRows(chunkRows)

Expand All @@ -179,6 +182,9 @@ func TestClickHouseProviderQueryTurnsBackwardHasMore(t *testing.T) {
if string(resp.Chunks[0].Data) != "turn-3" {
t.Fatalf("chunk data = %q, want turn-3", string(resp.Chunks[0].Data))
}
if resp.Chunks[0].Seq != 31 {
t.Fatalf("chunk seq = %d, want 31", resp.Chunks[0].Seq)
}
if !resp.HasMore {
t.Fatal("expected has_more=true")
}
Expand All @@ -199,11 +205,11 @@ func TestClickHouseProviderQueryTurnsForward(t *testing.T) {

taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111")
now := time.Unix(1_710_000_010, 0).UTC()
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}).
AddRow(now, "user-input", "", "turn-4", uint32(4)).
AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5))
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}).
AddRow(now, "user-input", "", "turn-4", uint32(4), uint64(41)).
AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5), uint64(51))

mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq > \\?[\\s\\S]*ORDER BY turn_seq ASC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq ASC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq IN \\([\\s\\S]*SELECT DISTINCT turn_seq[\\s\\S]*FROM task_logs_test[\\s\\S]*turn_seq > \\?[\\s\\S]*ORDER BY turn_seq ASC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq ASC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
WithArgs(taskID, taskID, uint32(3), 2).
WillReturnRows(chunkRows)

Expand All @@ -226,6 +232,9 @@ func TestClickHouseProviderQueryTurnsForward(t *testing.T) {
if resp.Chunks[0].TurnSeq != 4 || resp.Chunks[1].TurnSeq != 5 {
t.Fatalf("chunk turn_seqs = [%d, %d], want [4, 5]", resp.Chunks[0].TurnSeq, resp.Chunks[1].TurnSeq)
}
if resp.Chunks[0].Seq != 41 || resp.Chunks[1].Seq != 51 {
t.Fatalf("chunk seqs = [%d, %d], want [41, 51]", resp.Chunks[0].Seq, resp.Chunks[1].Seq)
}
if !resp.HasMore {
t.Fatal("expected has_more=true")
}
Expand Down Expand Up @@ -324,11 +333,11 @@ func TestClickHouseProviderQueryTurnsBackwardInclusive(t *testing.T) {

taskID := uuid.MustParse("11111111-1111-1111-1111-111111111111")
now := time.Unix(1_710_000_010, 0).UTC()
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq"}).
AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5)).
AddRow(now, "user-input", "", "turn-4", uint32(4))
chunkRows := sqlmock.NewRows([]string{"ts", "event", "kind", "data", "turn_seq", "msg_seq_start"}).
AddRow(now.Add(time.Second), "user-input", "", "turn-5", uint32(5), uint64(51)).
AddRow(now, "user-input", "", "turn-4", uint32(4), uint64(41))

mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq[\\s\\S]*turn_seq <= \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
mock.ExpectQuery("SELECT ts, event, kind, data, turn_seq, msg_seq_start[\\s\\S]*turn_seq <= \\?[\\s\\S]*ORDER BY turn_seq DESC[\\s\\S]*LIMIT \\?[\\s\\S]*ORDER BY turn_seq DESC, ts ASC, msg_seq_start ASC, ingest_id ASC\\s*$").
WithArgs(taskID, taskID, uint32(5), 2).
WillReturnRows(chunkRows)

Expand All @@ -351,6 +360,9 @@ func TestClickHouseProviderQueryTurnsBackwardInclusive(t *testing.T) {
if resp.Chunks[0].TurnSeq != 5 || resp.Chunks[1].TurnSeq != 4 {
t.Fatalf("chunk turn_seqs = [%d, %d], want [5, 4]", resp.Chunks[0].TurnSeq, resp.Chunks[1].TurnSeq)
}
if resp.Chunks[0].Seq != 51 || resp.Chunks[1].Seq != 41 {
t.Fatalf("chunk seqs = [%d, %d], want [51, 41]", resp.Chunks[0].Seq, resp.Chunks[1].Seq)
}
if resp.HasMore {
t.Fatal("expected has_more=false")
}
Expand Down
1 change: 1 addition & 0 deletions backend/pkg/tasklog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type TurnChunk struct {
Event string
Kind string
Timestamp int64
Seq uint64 // 消息序号,来自 ClickHouse msg_seq_start
TurnSeq uint32 // 轮次号,仅 ClickHouse 有值,Loki 为 0
Labels map[string]string
}
Expand Down