File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ package database
2+
3+ import (
4+ "ac/utils"
5+ "sync"
6+ "time"
7+ )
8+
9+ type BatchInserter [T any ] struct {
10+ mu sync.Mutex
11+ buffer []T
12+ maxSize int
13+ ticker * time.Ticker
14+ stopCh chan struct {}
15+ tableName string
16+ }
17+
18+ func NewBatchInserter [T any ](tableName string , maxSize int , flushInterval time.Duration ) * BatchInserter [T ] {
19+ b := & BatchInserter [T ]{
20+ buffer : make ([]T , 0 , maxSize ),
21+ maxSize : maxSize ,
22+ ticker : time .NewTicker (flushInterval ),
23+ stopCh : make (chan struct {}),
24+ tableName : tableName ,
25+ }
26+ go b .run ()
27+ return b
28+ }
29+
30+ func (b * BatchInserter [T ]) run () {
31+ for {
32+ select {
33+ case <- b .ticker .C :
34+ b .Flush ()
35+ case <- b .stopCh :
36+ return
37+ }
38+ }
39+ }
40+
41+ func (b * BatchInserter [T ]) Add (item T ) {
42+ b .mu .Lock ()
43+ b .buffer = append (b .buffer , item )
44+ shouldFlush := len (b .buffer ) >= b .maxSize
45+ b .mu .Unlock ()
46+
47+ if shouldFlush {
48+ b .Flush ()
49+ }
50+ }
51+
52+ func (b * BatchInserter [T ]) Flush () {
53+ b .mu .Lock ()
54+ if len (b .buffer ) == 0 {
55+ b .mu .Unlock ()
56+ return
57+ }
58+ batch := b .buffer
59+ b .buffer = make ([]T , 0 , b .maxSize )
60+ b .mu .Unlock ()
61+
62+ if result := DB .CreateInBatches (& batch , len (batch )); result .Error != nil {
63+ utils .SugarLogger .Errorf ("[DB] Batch insert failed for %s: %s" , b .tableName , result .Error )
64+ return
65+ }
66+ utils .SugarLogger .Infof ("[DB] Flushed %d %s rows" , len (batch ), b .tableName )
67+ }
68+
69+ func (b * BatchInserter [T ]) Stop () {
70+ b .ticker .Stop ()
71+ close (b .stopCh )
72+ b .Flush ()
73+ }
Original file line number Diff line number Diff line change @@ -17,6 +17,10 @@ func main() {
1717
1818 service .RegisterRincon ()
1919 database .InitializeDB ()
20+ service .InitSignalBatch ()
21+ service .InitPingBatch ()
22+ defer service .StopSignalBatch ()
23+ defer service .StopPingBatch ()
2024 mqtt .InitializeMQTT ()
2125 service .SubscribeTopics ()
2226
Original file line number Diff line number Diff line change @@ -11,6 +11,16 @@ import (
1111 "github.com/gaucho-racing/mapache-go"
1212)
1313
14+ var pingBatch * database.BatchInserter [mapache.Ping ]
15+
16+ func InitPingBatch () {
17+ pingBatch = database .NewBatchInserter [mapache.Ping ]("pings" , 5000 , 1 * time .Second )
18+ }
19+
20+ func StopPingBatch () {
21+ pingBatch .Stop ()
22+ }
23+
1424func HandlePing (vehicleID string , nodeID string , payload []byte ) {
1525 utils .SugarLogger .Infof ("[MQ] Received ping from ac/%s/%s" , vehicleID , nodeID )
1626 ping := binary .BigEndian .Uint64 (payload [:8 ])
@@ -53,15 +63,6 @@ func GetPing(vehicleID string, micros int) mapache.Ping {
5363}
5464
5565func CreatePing (ping mapache.Ping ) error {
56- result := database .DB .Create (& ping )
57- if result .Error != nil {
58- return result .Error
59- }
60- utils .SugarLogger .Infow ("[DB] New ping created" ,
61- "vehicle_id" , ping .VehicleID ,
62- "ping" , ping .Ping ,
63- "pong" , ping .Pong ,
64- "latency" , ping .Latency ,
65- )
66+ pingBatch .Add (ping )
6667 return nil
6768}
Original file line number Diff line number Diff line change @@ -2,12 +2,22 @@ package service
22
33import (
44 "ac/database"
5- "ac/utils"
65 "fmt"
6+ "time"
77
88 "github.com/gaucho-racing/mapache-go"
99)
1010
11+ var signalBatch * database.BatchInserter [mapache.Signal ]
12+
13+ func InitSignalBatch () {
14+ signalBatch = database .NewBatchInserter [mapache.Signal ]("signals" , 5000 , 1 * time .Second )
15+ }
16+
17+ func StopSignalBatch () {
18+ signalBatch .Stop ()
19+ }
20+
1121// signalCallbacks is a list of functions that will be called when a signal is created or updated
1222var signalCallbacks = []func (signal mapache.Signal ){}
1323
@@ -39,14 +49,7 @@ func CreateSignal(signal mapache.Signal) error {
3949 if signal .Name == "" {
4050 return fmt .Errorf ("signal name cannot be empty" )
4151 }
42- if result := database .DB .Create (& signal ); result .Error != nil {
43- return result .Error
44- }
45- utils .SugarLogger .Infow ("[DB] Signal inserted" ,
46- "timestamp" , signal .Timestamp ,
47- "vehicle_id" , signal .VehicleID ,
48- "name" , signal .Name ,
49- )
52+ signalBatch .Add (signal )
5053 go signalNotify (signal )
5154 return nil
5255}
You can’t perform that action at this time.
0 commit comments