-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcollector.go
More file actions
118 lines (107 loc) · 2.65 KB
/
collector.go
File metadata and controls
118 lines (107 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package glsensor
import (
"log"
"sync"
"github.com/dipoll/glsensor/destinations"
"github.com/dipoll/glsensor/sensors"
)
//Server Main service structure
type Server struct {
configuration []*DeviceConf
//destinations destinations.Router
//destinations []destinations.Sender
currentMeasurements []destinations.MetricValue
mtx sync.RWMutex
readyToSend chan *destinations.MetricValue
halt chan bool
}
//NewServer creates a new collector server
func NewServer(conf []*DeviceConf) *Server {
s := Server{
configuration: conf,
readyToSend: make(chan *destinations.MetricValue),
halt: make(chan bool),
}
return &s
}
//Start runs listen and forward data to senders
func (m *Server) Start() {
log.Println("Starting server")
m.forwardToSender()
}
func (m *Server) forwardToSender() bool {
for {
select {
case msg := <-m.readyToSend:
log.Println("Sending some data", msg)
//m.destinations.Send(msg)
for _, d := range m.configuration {
for _, dest := range d.Destinations {
dest.Destination.Send(msg)
}
}
case shutDown := <-m.halt:
if shutDown {
log.Println("Got shutdown signal, stopping ...")
return true
}
default:
continue
}
}
}
//Shutdown sends signal to stop handling of destinations
func (m *Server) Shutdown() {
log.Println("Shutdown the server!")
m.halt <- true
}
// CollectAll runs all measurers to collect data
// into the memory and sends all notifications to the
// destinations
func (m *Server) CollectAll() error {
log.Println("Starting collecting all sensors")
for _, device := range m.configuration {
log.Println("Collecting from device ", device)
err := m.collectFromDevice(device)
if err != nil {
return err
}
}
return nil
}
func (m *Server) collectFromDevice(d *DeviceConf) error {
fin := make(chan int, len(d.Sensors))
for _, sconf := range d.Sensors {
go func(sens sensors.Measurer, c chan int) {
defer func() {
if r := recover(); r != nil {
c <- 0
}
}()
measurements, err := sens.Measure()
if err != nil {
return
}
for _, meas := range measurements {
go m.addRetreivedValue(meas, d, true)
}
c <- 1
}(sconf.Sensor, fin)
}
for i := 0; i < len(d.Sensors); i++ {
<-fin
}
close(fin)
return nil
}
//Added retrieved value
func (m *Server) addRetreivedValue(meas sensors.Measurement, d *DeviceConf, notify bool) {
log.Println("Adding value of ", d, meas)
m.mtx.Lock()
rval := destinations.MetricValue{M: &meas, Name: d.Name, Location: d.Location, Region: d.Region}
m.currentMeasurements = append(m.currentMeasurements, rval)
m.mtx.Unlock()
if notify {
m.readyToSend <- &rval
}
}