-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathstreamsql_strategy_test.go
More file actions
150 lines (135 loc) · 5.71 KB
/
streamsql_strategy_test.go
File metadata and controls
150 lines (135 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package streamsql
import (
"testing"
"time"
"github.com/rulego/streamsql/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestSQLIntegration_StrategyBlock 测试 SQL 集成下的阻塞策略
func TestSQLIntegration_StrategyBlock(t *testing.T) {
// 配置:输出缓冲为 1,阻塞策略,超时 100ms
ssql := New(WithCustomPerformance(types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 100,
ResultChannelSize: 100,
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyBlock,
BlockTimeout: 100 * time.Millisecond,
AllowDataLoss: true,
},
WorkerConfig: types.WorkerConfig{
SinkPoolSize: 0, // 无缓冲任务队列
SinkWorkerCount: 1, // 1个 worker
},
}))
defer ssql.Stop()
// SQL: 每条数据触发一次窗口
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
err := ssql.Execute(rsql)
require.NoError(t, err)
// 添加同步 Sink 阻塞 Stream 处理,从而反压 Window
// 注意:必须在 Execute 之后添加,因为 Execute 才会创建 stream
ssql.AddSyncSink(func(results []map[string]interface{}) {
time.Sleep(500 * time.Millisecond)
})
// 发送 5 条数据
// d1: Worker 处理中 (阻塞 500ms)
// d2: Stream 尝试写入 WorkerPool -> 阻塞 (无缓冲)
// d3: Window OutputChan (size 1) -> 填满
// d4: Window OutputChan 满 -> 尝试写入 -> 阻塞 (Window Add) -> 放入 TriggerChan (size=1)
// d5: Window Add -> TriggerChan 满 -> 阻塞? No, Emit 是异步的?
// Emit 往 dataChan 写. DataProcessor 读 dataChan -> Window.Add.
// Window.Add 往 triggerChan 写.
//
// 修正分析:
// Window.Add 是非阻塞的 (如果 triggerChan 不满).
// CountingWindow triggerChan size = bufferSize = 1.
// Worker 协程: 从 triggerChan 读 -> 处理 -> sendResult (到 OutputChan).
//
// d1: Worker读triggerChan -> OutputChan -> Stream -> WorkerPool -> Worker(busy).
// d2: Worker读triggerChan -> OutputChan -> Stream -> Blocked on WorkerPool.
// 此时 Stream 持有 d2. OutputChan 空.
// Worker 协程 阻塞在 sendResult(d2)? No, Stream 取走了 d2, Stream 阻塞在 dispatch.
// 所以 OutputChan 是空的!
// Wait, Stream loop:
// result := <-OutputChan. (Stream has d2).
// handleResult(d2) -> Blocked.
// So OutputChan is empty.
// d3: Worker读triggerChan -> OutputChan (d3). Success.
// OutputChan has d3.
// d4: Worker读triggerChan -> OutputChan (d4). Blocked (OutputChan full).
// Worker 协程 阻塞在 sendResult(d4).
// d5: Add -> triggerChan (d5). Success (triggerChan size 1).
// d6: Add -> triggerChan (d6). Blocked (triggerChan full).
// Add blocks. DataProcessor blocks. Emit succeeds (dataChan).
//
// 所以 Window Worker 只有在 sendResult 阻塞时才触发 Drop logic.
// sendResult 只有在 OutputChan 满且超时时才 Drop.
//
// d4 阻塞在 sendResult.
// 100ms 后超时 -> Drop d4.
// Worker 继续.
//
// 所以 d4 应该是被 Drop 的那个.
// Sent: d1, d2, d3. (d5 在 triggerChan, d6 在 dataChan).
// Wait, d5 is in triggerChan, not processed yet.
// So Sent = 3. Dropped = 1 (d4).
for _, id := range []string{"d1", "d2", "d3", "d4", "d5"} {
ssql.Emit(map[string]interface{}{"deviceId": id})
time.Sleep(10 * time.Millisecond)
}
// 等待足够长的时间让 Stream 醒来并处理完,以及 Window 丢弃逻辑执行
time.Sleep(1000 * time.Millisecond)
// 获取统计信息
// d1: Stream 处理完
// d2: Stream 处理完 (Worker 醒来后处理 d2)
// d3: Dropped (Worker 阻塞 -> 超时)
// d4: Dropped (Worker 阻塞 -> 超时)
// d5: Dropped (Worker 阻塞 -> 超时)
// Total Sent: 2 (d1, d2).
// Dropped: 3 (d3, d4, d5).
stats := ssql.stream.GetStats()
assert.Equal(t, int64(3), stats["droppedCount"], "Should have 3 dropped window result due to overflow")
assert.Equal(t, int64(2), stats["sentCount"], "Should have 2 sent window result")
}
// TestSQLIntegration_StrategyDrop 测试 SQL 集成下的丢弃策略
func TestSQLIntegration_StrategyDrop(t *testing.T) {
// 配置:输出缓冲为 1,丢弃策略
ssql := New(WithCustomPerformance(types.PerformanceConfig{
BufferConfig: types.BufferConfig{
DataChannelSize: 100,
ResultChannelSize: 100,
WindowOutputSize: 1,
},
OverflowConfig: types.OverflowConfig{
Strategy: types.OverflowStrategyDrop,
},
}))
defer ssql.Stop()
// SQL: 每条数据触发一次窗口
rsql := "SELECT deviceId FROM stream GROUP BY deviceId, CountingWindow(1)"
err := ssql.Execute(rsql)
require.NoError(t, err)
// 连续发送 3 条数据
ssql.Emit(map[string]interface{}{"deviceId": "d1"})
ssql.Emit(map[string]interface{}{"deviceId": "d2"})
ssql.Emit(map[string]interface{}{"deviceId": "d3"})
// 等待处理完成
time.Sleep(200 * time.Millisecond)
// 对于 StrategyDrop,它会挤掉旧数据,所以 sentCount 应该持续增加
stats := ssql.stream.GetStats()
// d1, d2, d3 都会成功发送(虽然 d1, d2 可能被挤掉,但 sendResult 逻辑中挤掉旧的后写入新的算发送成功)
assert.Equal(t, int64(3), stats["sentCount"])
// 验证最终留在缓冲区的是最后一条数据 (d3)
// 注意:AddSink 会启动 worker 从 OutputChan 读。
// 为了验证,我们直接从 Window 的 OutputChan 读
select {
case result := <-ssql.stream.Window.OutputChan():
assert.Equal(t, "d3", result[0].Data.(map[string]interface{})["deviceId"])
case <-time.After(100 * time.Millisecond):
// 如果已经被 AddSink 的 worker 读走了也正常,但由于我们没加 Sink,所以应该在里面
}
}