-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathstreamsql_test.go
More file actions
3007 lines (2471 loc) · 90.5 KB
/
streamsql_test.go
File metadata and controls
3007 lines (2471 loc) · 90.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package streamsql
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/rulego/streamsql/utils/cast"
"math/rand"
"github.com/rulego/streamsql/functions"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestStreamData 测试 StreamSQL 的流式数据处理功能
// 这个测试演示了 StreamSQL 的完整工作流程:从创建实例到数据处理再到结果验证
func TestStreamData(t *testing.T) {
// 步骤1: 创建 StreamSQL 实例
// StreamSQL 是流式 SQL 处理引擎的核心组件,负责管理整个流处理生命周期
ssql := New()
// 确保测试结束时停止流处理,释放资源
defer ssql.Stop()
// 步骤2: 定义流式 SQL 查询语句
// 这个 SQL 语句展示了 StreamSQL 的核心功能:
// - SELECT: 选择要输出的字段和聚合函数
// - FROM stream: 指定数据源为流数据
// - WHERE: 过滤条件,排除 device3 的数据
// - GROUP BY: 按设备ID分组,配合滚动窗口进行聚合
// - TumblingWindow('5s'): 5秒滚动窗口,每5秒触发一次计算
// - avg(), min(): 聚合函数,计算平均值和最小值
// - window_start(), window_end(): 窗口函数,获取窗口的开始和结束时间
rsql := "SELECT deviceId,avg(temperature) as avg_temp,min(humidity) as min_humidity ," +
"window_start() as start,window_end() as end FROM stream where deviceId!='device3' group by deviceId,TumblingWindow('5s')"
// 步骤3: 执行 SQL 语句,启动流式分析任务
// Execute 方法会解析 SQL、构建执行计划、初始化窗口管理器和聚合器
err := ssql.Execute(rsql)
if err != nil {
panic(err)
}
// 步骤4: 设置测试环境和并发控制
var wg sync.WaitGroup
wg.Add(1)
// 设置30秒测试超时时间,防止测试无限运行
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 步骤5: 启动数据生产者协程
// 模拟实时数据流,持续向 StreamSQL 输入数据
go func() {
defer wg.Done()
// 创建定时器,每秒触发一次数据生成
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 每秒生成10条随机测试数据,模拟高频数据流
// 这种数据密度可以测试 StreamSQL 的实时处理能力
for i := 0; i < 10; i++ {
// 构造设备数据,包含设备ID、温度和湿度
randomData := map[string]interface{}{
"deviceId": fmt.Sprintf("device%d", rand.Intn(3)+1), // 随机选择 device1, device2, device3
"temperature": 20.0 + rand.Float64()*10, // 温度范围: 20-30度
"humidity": 50.0 + rand.Float64()*20, // 湿度范围: 50-70%
}
// 将数据添加到流中,触发 StreamSQL 的实时处理
// Emit 会将数据分发到相应的窗口和聚合器中
ssql.Emit(randomData)
}
case <-ctx.Done():
// 超时或取消信号,停止数据生成
return
}
}
}()
// 步骤6: 设置结果处理管道
resultChan := make(chan interface{}, 10)
// 添加计算结果回调函数(Sink)
// 当窗口触发计算时,结果会通过这个回调函数输出
ssql.stream.AddSink(func(result []map[string]interface{}) {
// 非阻塞发送,避免阻塞 sink worker
select {
case resultChan <- result:
default:
// Channel 已满,忽略(非阻塞发送)
}
})
// 步骤7: 启动结果消费者协程
// 记录收到的结果数量,用于验证测试效果
var resultCount int64
var countMutex sync.Mutex
var consumerWg sync.WaitGroup
consumerWg.Add(1)
go func() {
defer consumerWg.Done()
for {
select {
case <-resultChan:
// 每当收到一个窗口的计算结果时,计数器加1
// 注释掉的代码可以用于调试,打印每个结果的详细信息
//fmt.Printf("打印结果: [%s] %v\n", time.Now().Format("15:04:05.000"), result)
countMutex.Lock()
resultCount++
countMutex.Unlock()
case <-ctx.Done():
// 测试超时,退出消费者 goroutine
// 不关闭 channel,让主程序自动退出时清理
return
}
}
}()
// 步骤8: 等待测试完成
// 等待数据生产者协程结束(30秒超时或手动取消)
wg.Wait()
// 停止流处理,确保所有 goroutine 正确退出
ssql.Stop()
// 等待一小段时间,确保所有 sink worker 完成当前任务
// 这样可以确保所有结果都被发送到 channel
time.Sleep(100 * time.Millisecond)
// 取消 context,通知消费者 goroutine 退出
cancel()
// 等待消费者 goroutine 完成(处理完 channel 中剩余的数据或收到取消信号)
consumerWg.Wait()
// 步骤9: 验证测试结果
// 预期在30秒内应该收到5个窗口的计算结果(每5秒一个窗口)
// 这验证了 StreamSQL 的窗口触发机制是否正常工作
countMutex.Lock()
finalCount := resultCount
countMutex.Unlock()
assert.Equal(t, finalCount, int64(5))
}
func TestStreamsql(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
var rsql = "SELECT device,max(temperature) as max_temp,min(humidity) as min_humidity,window_start() as start,window_end() as end FROM stream group by device,SlidingWindow('2s','1s')"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 不使用事件时间,不需要时间戳字段
testData := []map[string]interface{}{
{"device": "aa", "temperature": 25.0, "humidity": 60},
{"device": "aa", "temperature": 30.0, "humidity": 55},
{"device": "bb", "temperature": 22.0, "humidity": 70},
}
for _, data := range testData {
strm.Emit(data)
}
// 捕获结果
resultChan := make(chan interface{}, 10)
strm.AddSink(func(result []map[string]interface{}) {
select {
case resultChan <- result:
default:
// 非阻塞发送,避免阻塞
}
})
// 等待窗口触发
// 由于使用事件时间,需要等待 watermark 推进(IDLETIMEOUT='2s' 会在2秒后推进)
time.Sleep(3 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
cancel()
case <-ctx.Done():
t.Fatal("Timeout waiting for results")
}
expected := []map[string]interface{}{
{
"device": "aa",
"max_temp": 30.0,
"min_humidity": 55.0,
},
{
"device": "bb",
"max_temp": 22.0,
"min_humidity": 70.0,
},
}
assert.IsType(t, []map[string]interface{}{}, actual)
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok)
assert.Len(t, resultSlice, 2)
for _, expectedResult := range expected {
found := false
for _, resultMap := range resultSlice {
if resultMap["device"] == expectedResult["device"] {
assert.InEpsilon(t, expectedResult["max_temp"].(float64), resultMap["max_temp"].(float64), 0.0001)
assert.InEpsilon(t, expectedResult["min_humidity"].(float64), resultMap["min_humidity"].(float64), 0.0001)
// 事件时间模式下,窗口时间基于事件时间戳,检查字段存在和有效性
assert.Contains(t, resultMap, "start")
assert.Contains(t, resultMap, "end")
start, ok1 := resultMap["start"].(int64)
end, ok2 := resultMap["end"].(int64)
assert.True(t, ok1 && ok2, "start and end should be int64")
assert.Greater(t, end, start, "end should be greater than start")
found = true
break
}
}
assert.True(t, found, fmt.Sprintf("Expected result for device %v not found", expectedResult["device"]))
}
}
func TestStreamsqlWithoutGroupBy(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
var rsql = "SELECT max(temperature) as max_temp,min(humidity) as min_humidity,window_start() as start,window_end() as end FROM stream SlidingWindow('2s','1s')"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 不使用事件时间,不需要时间戳字段
testData := []map[string]interface{}{
{"device": "aa", "temperature": 25.0, "humidity": 60},
{"device": "aa", "temperature": 30.0, "humidity": 55},
{"device": "bb", "temperature": 22.0, "humidity": 70},
}
for _, data := range testData {
strm.Emit(data)
}
// 捕获结果
resultChan := make(chan interface{}, 10)
strm.AddSink(func(result []map[string]interface{}) {
select {
case resultChan <- result:
default:
// 非阻塞发送
}
})
// 等待窗口触发(事件时间模式需要等待 watermark 推进)
time.Sleep(3 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
cancel()
case <-ctx.Done():
t.Fatal("Timeout waiting for results")
}
expected := []map[string]interface{}{
{
"max_temp": 30.0,
"min_humidity": 55.0,
},
}
assert.IsType(t, []map[string]interface{}{}, actual)
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok)
assert.Len(t, resultSlice, 1)
for _, expectedResult := range expected {
for _, resultMap := range resultSlice {
assert.InEpsilon(t, expectedResult["max_temp"].(float64), resultMap["max_temp"].(float64), 0.0001)
assert.InEpsilon(t, expectedResult["min_humidity"].(float64), resultMap["min_humidity"].(float64), 0.0001)
// 事件时间模式下,窗口时间基于事件时间戳,检查字段存在
assert.Contains(t, resultMap, "start")
assert.Contains(t, resultMap, "end")
start, ok1 := resultMap["start"].(int64)
end, ok2 := resultMap["end"].(int64)
assert.True(t, ok1 && ok2, "start and end should be int64")
assert.Greater(t, end, start, "end should be greater than start")
}
}
}
func TestStreamsqlDistinct(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
// 测试 SELECT DISTINCT 功能 - 使用聚合函数和 GROUP BY
var rsql = "SELECT DISTINCT device, AVG(temperature) as avg_temp FROM stream GROUP BY device, TumblingWindow('1s')"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
//fmt.Println("开始测试 SELECT DISTINCT 功能")
// 添加测试数据,包含重复的设备数据
// 不使用事件时间,不需要时间戳字段
testData := []map[string]interface{}{
{"device": "aa", "temperature": 25.0},
{"device": "aa", "temperature": 35.0}, // 相同设备,不同温度
{"device": "bb", "temperature": 22.0},
{"device": "bb", "temperature": 28.0}, // 相同设备,不同温度
{"device": "cc", "temperature": 30.0},
}
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.Emit(data)
}
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
//fmt.Printf("接收到结果: %v\n", result)
// 使用 recover 防止在 channel 关闭后发送数据导致 panic
defer func() {
if r := recover(); r != nil {
// Channel 已关闭,忽略错误
}
}()
select {
case resultChan <- result:
default:
// 非阻塞发送
}
})
// 等待窗口触发(处理时间模式)
//fmt.Println("等待窗口初始化...")
time.Sleep(1500 * time.Millisecond)
// 手动触发窗口
//fmt.Println("手动触发窗口")
strm.Window.Trigger()
// 等待结果,增加超时时间以确保窗口有足够时间触发
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
//fmt.Println("成功接收到结果")
cancel()
case <-ctx.Done():
t.Fatal("测试超时,未收到结果")
}
// 验证结果
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// 验证去重后的结果数量
assert.Len(t, resultSlice, 3, "应该有3个设备的聚合结果")
// 检查是否包含所有预期的设备
deviceFound := make(map[string]bool)
for _, result := range resultSlice {
device, ok := result["device"].(string)
if ok {
deviceFound[device] = true
}
}
assert.True(t, deviceFound["aa"], "结果应包含设备aa")
assert.True(t, deviceFound["bb"], "结果应包含设备bb")
assert.True(t, deviceFound["cc"], "结果应包含设备cc")
// 验证聚合结果 - aa设备的平均温度应为(25+35)/2=30
for _, result := range resultSlice {
device, _ := result["device"].(string)
avgTemp, ok := result["avg_temp"].(float64)
assert.True(t, ok, "avg_temp应该是float64类型")
if device == "aa" {
assert.InEpsilon(t, 30.0, avgTemp, 0.001, "aa设备的平均温度应为30")
} else if device == "bb" {
assert.InEpsilon(t, 25.0, avgTemp, 0.001, "bb设备的平均温度应为25")
} else if device == "cc" {
assert.InEpsilon(t, 30.0, avgTemp, 0.001, "cc设备的平均温度应为30")
}
}
//fmt.Println("测试完成")
}
func TestStreamsqlLimit(t *testing.T) {
// 测试场景1:简单LIMIT功能,不使用窗口函数
t.Run("简单LIMIT查询", func(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
var rsql = "SELECT * FROM stream LIMIT 2"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果接收器
strm.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
// 添加测试数据
testData := []map[string]interface{}{
{"device": "aa", "temperature": 25.0},
{"device": "bb", "temperature": 22.0},
{"device": "cc", "temperature": 30.0},
{"device": "dd", "temperature": 28.0},
}
// 实时验证:添加一条数据,立即验证一条结果
for i, data := range testData {
// 添加数据
strm.Emit(data)
// 立即等待并验证结果
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
select {
case result := <-resultChan:
// 验证结果格式
resultSlice, ok := result.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// 验证LIMIT限制:每个batch最多2条记录
assert.LessOrEqual(t, len(resultSlice), 2, "每个batch最多2条记录")
assert.Greater(t, len(resultSlice), 0, "应该有结果")
// 验证字段
for _, item := range resultSlice {
assert.Contains(t, item, "device", "结果应包含device字段")
assert.Contains(t, item, "temperature", "结果应包含temperature字段")
}
_ = i
//t.Logf("第%d条数据处理完成,收到%d条结果记录", i+1, len(resultSlice))
cancel()
case <-ctx.Done():
cancel()
//t.Fatalf("第%d条数据添加后超时,未收到实时结果", i+1)
}
}
// 验证总体处理:由于LIMIT 2,应该处理完4条数据
//t.Log("所有数据都得到了实时处理,符合非聚合场景的流处理特性")
})
// 测试场景2:聚合查询 + LIMIT
t.Run("聚合查询与LIMIT", func(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
var rsql = "SELECT device, avg(temperature) as avg_temp, count(*) as cnt FROM stream GROUP BY device LIMIT 2"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
// 添加测试数据 - 多个设备的温度数据
testData := []map[string]interface{}{
{"device": "sensor1", "temperature": 20.0},
{"device": "sensor1", "temperature": 22.0},
{"device": "sensor2", "temperature": 25.0},
{"device": "sensor2", "temperature": 27.0},
{"device": "sensor3", "temperature": 30.0},
{"device": "sensor3", "temperature": 32.0},
{"device": "sensor4", "temperature": 35.0},
{"device": "sensor4", "temperature": 37.0},
}
// 添加数据
for _, data := range testData {
strm.Emit(data)
}
// 等待聚合
time.Sleep(500 * time.Millisecond)
// 手动触发窗口
strm.Window.Trigger()
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
cancel()
case <-ctx.Done():
t.Fatal("测试超时,未收到聚合结果")
}
// 验证聚合结果
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// LIMIT 2 应该只返回2个设备的聚合结果
assert.LessOrEqual(t, len(resultSlice), 2, "聚合结果应该限制在2条以内")
assert.Greater(t, len(resultSlice), 0, "应该有聚合结果")
// 验证聚合字段
for _, result := range resultSlice {
assert.Contains(t, result, "device", "结果应包含device字段")
assert.Contains(t, result, "avg_temp", "结果应包含avg_temp字段")
assert.Contains(t, result, "cnt", "结果应包含cnt字段")
// 验证聚合值的类型和合理性
avgTemp, ok := result["avg_temp"].(float64)
assert.True(t, ok, "avg_temp应该是float64类型")
assert.Greater(t, avgTemp, 0.0, "平均温度应该大于0")
cnt, ok := result["cnt"].(float64)
assert.True(t, ok, "cnt应该是float64类型")
assert.GreaterOrEqual(t, cnt, 1.0, "计数应该至少为1")
}
})
// 测试场景3:窗口聚合 + LIMIT
t.Run("窗口聚合与LIMIT", func(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
var rsql = "SELECT device, max(temperature) as max_temp, min(temperature) as min_temp FROM stream GROUP BY device, TumblingWindow('1s') LIMIT 3"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
// 添加测试数据 - 5个设备的数据
testData := []map[string]interface{}{
{"device": "dev1", "temperature": 20.0},
{"device": "dev2", "temperature": 25.0},
{"device": "dev3", "temperature": 30.0},
{"device": "dev4", "temperature": 35.0},
{"device": "dev5", "temperature": 40.0},
}
// 添加数据
for _, data := range testData {
strm.Emit(data)
}
// 等待窗口触发
time.Sleep(1200 * time.Millisecond) // 等待超过窗口大小
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
cancel()
case <-ctx.Done():
t.Fatal("测试超时,未收到窗口聚合结果")
}
// 验证窗口聚合结果
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// LIMIT 3 应该只返回3个设备的聚合结果
assert.LessOrEqual(t, len(resultSlice), 3, "窗口聚合结果应该限制在3条以内")
assert.Greater(t, len(resultSlice), 0, "应该有窗口聚合结果")
// 验证聚合字段
for _, result := range resultSlice {
assert.Contains(t, result, "device", "结果应包含device字段")
assert.Contains(t, result, "max_temp", "结果应包含max_temp字段")
assert.Contains(t, result, "min_temp", "结果应包含min_temp字段")
// 验证最大值和最小值
maxTemp, ok := result["max_temp"].(float64)
assert.True(t, ok, "max_temp应该是float64类型")
minTemp, ok := result["min_temp"].(float64)
assert.True(t, ok, "min_temp应该是float64类型")
assert.GreaterOrEqual(t, maxTemp, minTemp, "最大值应该大于等于最小值")
}
})
// 测试场景4:HAVING + LIMIT 组合
t.Run("HAVING与LIMIT组合", func(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
var rsql = "SELECT device, avg(temperature) as avg_temp FROM stream GROUP BY device HAVING avg_temp > 25 LIMIT 2"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
resultChan <- result
})
// 添加测试数据 - 设计一些平均温度大于25的设备
testData := []map[string]interface{}{
{"device": "cold_sensor", "temperature": 15.0},
{"device": "cold_sensor", "temperature": 18.0}, // 平均16.5,不满足条件
{"device": "warm_sensor1", "temperature": 26.0},
{"device": "warm_sensor1", "temperature": 28.0}, // 平均27,满足条件
{"device": "warm_sensor2", "temperature": 30.0},
{"device": "warm_sensor2", "temperature": 32.0}, // 平均31,满足条件
{"device": "warm_sensor3", "temperature": 35.0},
{"device": "warm_sensor3", "temperature": 37.0}, // 平均36,满足条件
}
// 添加数据
for _, data := range testData {
strm.Emit(data)
}
// 等待聚合
time.Sleep(500 * time.Millisecond)
// 手动触发窗口
strm.Window.Trigger()
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
cancel()
case <-ctx.Done():
t.Fatal("测试超时,未收到HAVING+LIMIT结果")
}
// 验证HAVING + LIMIT结果
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// LIMIT 2 + HAVING条件,最多2条符合条件的结果
assert.LessOrEqual(t, len(resultSlice), 2, "HAVING+LIMIT结果应该限制在2条以内")
// 验证所有结果都满足HAVING条件
for _, result := range resultSlice {
assert.Contains(t, result, "device", "结果应包含device字段")
assert.Contains(t, result, "avg_temp", "结果应包含avg_temp字段")
// 验证不包含cold_sensor(平均温度<25)
assert.NotEqual(t, "cold_sensor", result["device"], "结果不应包含cold_sensor")
// 验证平均温度确实大于25
avgTemp, ok := result["avg_temp"].(float64)
assert.True(t, ok, "avg_temp应该是float64类型")
assert.Greater(t, avgTemp, 25.0, "avg_temp应该大于25(满足HAVING条件)")
}
})
}
func TestSimpleQuery(t *testing.T) {
strm := New()
// 测试结束时确保关闭流处理
defer strm.Stop()
// 测试简单查询,不使用窗口函数
var rsql = "SELECT device, temperature FROM stream"
err := strm.Execute(rsql)
assert.Nil(t, err)
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加sink
strm.stream.AddSink(func(result []map[string]interface{}) {
//fmt.Printf("接收到结果: %v\n", result)
resultChan <- result
})
//添加数据
testData := []map[string]interface{}{
{"device": "test-device", "temperature": 25.5},
}
// 发送数据
//fmt.Println("添加数据...")
for _, data := range testData {
strm.Emit(data)
}
// 等待结果
//fmt.Println("等待结果...")
//等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
select {
case result := <-resultChan:
//fmt.Printf("收到结果: %v\n", result)
// 验证结果
resultSlice, ok := result.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
require.Len(t, resultSlice, 1, "应该只有一条结果")
item := resultSlice[0]
assert.Equal(t, "test-device", item["device"], "device字段应该正确")
assert.Equal(t, 25.5, item["temperature"], "temperature字段应该正确")
cancel()
case <-ctx.Done():
t.Fatal("测试超时,未收到结果")
}
time.Sleep(500 * time.Millisecond)
}
func TestHavingClause(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
// 定义SQL语句,使用HAVING子句
rsql := "SELECT device, avg(temperature) as avg_temp FROM stream GROUP BY device HAVING avg_temp > 25"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
//fmt.Printf("接收到结果: %v\n", result)
resultChan <- result
})
// 添加测试数据,确保有不同的聚合结果
testData := []map[string]interface{}{
{"device": "dev1", "temperature": 20.0},
{"device": "dev1", "temperature": 22.0},
{"device": "dev2", "temperature": 26.0},
{"device": "dev2", "temperature": 28.0},
{"device": "dev3", "temperature": 30.0},
}
// 添加数据
for _, data := range testData {
strm.Emit(data)
}
// 等待窗口初始化
time.Sleep(500 * time.Millisecond)
// 手动触发窗口
strm.Window.Trigger()
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
cancel()
case <-ctx.Done():
t.Fatal("测试超时,未收到结果")
}
// 验证结果
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// HAVING avg_temp > 25 应该只返回dev2和dev3
// 验证结果中不包含dev1
for _, result := range resultSlice {
assert.NotEqual(t, "dev1", result["device"], "结果不应包含dev1")
assert.Contains(t, []string{"dev2", "dev3"}, result["device"], "结果应只包含dev2和dev3")
// 验证平均温度确实大于25
avgTemp, ok := result["avg_temp"].(float64)
assert.True(t, ok, "avg_temp应该是float64类型")
assert.Greater(t, avgTemp, 25.0, "avg_temp应该大于25")
}
}
func TestSessionWindow(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
// 使用 SESSION 窗口,超时时间为 2 秒
rsql := "SELECT device, avg(temperature) as avg_temp FROM stream GROUP BY device, SESSIONWINDOW('2s')"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
//fmt.Printf("接收到结果: %v\n", result)
resultChan <- result
})
// 添加测试数据 - 两个设备,不同的时间
// 不使用事件时间,不需要时间戳字段
testData := []struct {
data map[string]interface{}
wait time.Duration
}{
// 第一组数据 - device1
{map[string]interface{}{"device": "device1", "temperature": 20.0}, 0},
{map[string]interface{}{"device": "device1", "temperature": 22.0}, 500 * time.Millisecond},
// 第二组数据 - device2
{map[string]interface{}{"device": "device2", "temperature": 25.0}, time.Second},
{map[string]interface{}{"device": "device2", "temperature": 27.0}, 500 * time.Millisecond},
// 间隔超过会话超时
// 第三组数据 - device1,新会话
{map[string]interface{}{"device": "device1", "temperature": 30.0}, 3 * time.Second},
}
// 按指定的间隔添加数据
for _, item := range testData {
if item.wait > 0 {
time.Sleep(item.wait)
}
strm.Emit(item.data)
}
// 等待会话超时,使最后一个会话触发
time.Sleep(3 * time.Second)
// 手动触发所有窗口,确保数据被处理
strm.Window.Trigger()
// 收集结果
var results []interface{}
var resultsMutex sync.Mutex
// 等待接收结果
timeout := time.After(5 * time.Second)
done := false
for !done {
select {
case result := <-resultChan:
resultsMutex.Lock()
results = append(results, result)
resultCount := len(results)
resultsMutex.Unlock()
// 我们期望至少 3 个会话结果
if resultCount >= 3 {
done = true
}
case <-timeout:
// 超时,可能没有收到足够的结果
done = true
}
}
// 验证结果
resultsMutex.Lock()
resultCount := len(results)
resultsCopy := make([]interface{}, len(results))
copy(resultsCopy, results)
resultsMutex.Unlock()
assert.GreaterOrEqual(t, resultCount, 2, "应该至少收到两个会话的结果")
// 检查结果中是否包含两个设备的会话
hasDevice1 := false
hasDevice2 := false
for _, result := range resultsCopy {
resultSlice, ok := result.([]map[string]interface{})
assert.True(t, ok, "结果应该是[]map[string]interface{}类型")
for _, item := range resultSlice {
device, ok := item["device"].(string)
assert.True(t, ok, "device字段应该是string类型")
if device == "device1" {
hasDevice1 = true
} else if device == "device2" {
hasDevice2 = true
}
}
}
assert.True(t, hasDevice1, "结果中应该包含device1的会话")
assert.True(t, hasDevice2, "结果中应该包含device2的会话")
}
func TestExpressionInAggregation(t *testing.T) {
streamsql := New()
defer streamsql.Stop()
// 测试在聚合函数中使用表达式
var rsql = "SELECT device, AVG(temperature * 1.8 + 32) as fahrenheit FROM stream GROUP BY device, TumblingWindow('1s')"
err := streamsql.Execute(rsql)
assert.Nil(t, err)
strm := streamsql.stream
//fmt.Println("开始测试表达式功能")
// 添加测试数据,温度使用摄氏度
// 不使用事件时间,不需要时间戳字段
testData := []map[string]interface{}{
{"device": "aa", "temperature": 0.0}, // 华氏度应为 32
{"device": "aa", "temperature": 100.0}, // 华氏度应为 212
{"device": "bb", "temperature": 20.0}, // 华氏度应为 68
{"device": "bb", "temperature": 30.0}, // 华氏度应为 86
}
// 添加数据
//fmt.Println("添加测试数据")
for _, data := range testData {
strm.Emit(data)
}
// 创建结果接收通道
resultChan := make(chan interface{}, 10)
// 添加结果回调
strm.AddSink(func(result []map[string]interface{}) {
//fmt.Printf("接收到结果: %v\n", result)
resultChan <- result
})
// 等待窗口触发(处理时间模式)
//fmt.Println("等待窗口初始化...")
time.Sleep(1 * time.Second)
// 手动触发窗口
//fmt.Println("手动触发窗口")
strm.Window.Trigger()
// 等待结果
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var actual interface{}
select {
case actual = <-resultChan:
//fmt.Println("成功接收到结果")
cancel()
case <-ctx.Done():
t.Fatal("测试超时,未收到结果")
}
// 验证结果
resultSlice, ok := actual.([]map[string]interface{})
require.True(t, ok, "结果应该是[]map[string]interface{}类型")
// 验证结果数量
assert.Len(t, resultSlice, 2, "应该有2个设备的聚合结果")
// 检查设备及其华氏度温度
for _, result := range resultSlice {
device, _ := result["device"].(string)
fahrenheit, ok := result["fahrenheit"].(float64)
assert.True(t, ok, "fahrenheit应该是float64类型")
if device == "aa" {
// (0 + 100)/2 = 50 摄氏度,转华氏度为 50*1.8+32 = 122
assert.InEpsilon(t, 122.0, fahrenheit, 0.001, "aa设备的平均华氏温度应为122")
} else if device == "bb" {
// (20 + 30)/2 = 25 摄氏度,转华氏度为 25*1.8+32 = 77
assert.InEpsilon(t, 77.0, fahrenheit, 0.001, "bb设备的平均华氏温度应为77")
}
}