-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathsource.go
More file actions
116 lines (99 loc) · 2.5 KB
/
source.go
File metadata and controls
116 lines (99 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package fluentd
import (
"github.com/sirupsen/logrus"
"github.com/fluent/fluentd-forwarder"
"github.com/op/go-logging"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
"time"
)
// source bundles the state of one fluentd forward input exposed as a stream
// source: the forward input itself, the context and writer handed over by
// GenerateStream, and the settings resolved by NewSource.
type source struct {
	// input is the forward input created in NewSource; it is started in
	// GenerateStream and stopped in Stop.
	input *fluentd_forwarder.ForwardInput

	// ctx and w are assigned in GenerateStream before input is started and
	// are read by Emit when logging and writing tuples.
	ctx *core.Context
	w   core.Writer

	// ioParams supplies the TypeName and Name attached to log entries.
	ioParams *bql.IOParams

	// bind is the address handed to the forward input; NewSource defaults it
	// to "127.0.0.1:24224". tagField is the key under which Emit stores the
	// tag of a record set in each tuple's data; NewSource defaults it to
	// "tag".
	bind, tagField string
}
// Emit converts every record of every given record set into a core.Tuple and
// writes it to the writer stored by GenerateStream.
//
// All tuples produced by a single call share one ProcTimestamp (the UTC time
// at which the call started). Each tuple's Timestamp is built from the
// record's own timestamp, interpreted as whole seconds. The tag of the
// enclosing record set is stored in the tuple's data under s.tagField.
//
// Failures are handled on a best-effort basis: a record whose data cannot be
// turned into a data.Map is logged and skipped, and a failed write is logged
// before moving on to the next record. Emit itself always returns nil.
func (s *source) Emit(rset []fluentd_forwarder.FluentRecordSet) error {
	procTime := time.Now().UTC()

	for _, recordSet := range rset {
		// The tag is the same for every record in the set.
		tag := data.String(recordSet.Tag)

		for _, rec := range recordSet.Records {
			payload, err := data.NewMap(rec.Data)
			if err != nil {
				s.ctx.ErrLog(err).WithFields(logrus.Fields{
					"source_type": s.ioParams.TypeName,
					"source_name": s.ioParams.Name,
					"data":        rec.Data,
				}).Error("Cannot create a data.Map from the data")
				continue
			}
			payload[s.tagField] = tag

			tuple := &core.Tuple{
				Data:          payload,
				Timestamp:     time.Unix(int64(rec.Timestamp), 0),
				ProcTimestamp: procTime,
			}
			if err := s.w.Write(s.ctx, tuple); err != nil {
				s.ctx.ErrLog(err).WithFields(logrus.Fields{
					"source_type": s.ioParams.TypeName,
					"source_name": s.ioParams.Name,
				}).Error("Cannot write a tuple")
			}
		}
	}
	return nil
}
// GenerateStream remembers the context and writer that Emit will use, starts
// the forward input, and then blocks until the input's WaitForShutdown
// returns. It always returns nil.
func (s *source) GenerateStream(ctx *core.Context, w core.Writer) error {
	// The input has not been started at this point, so nothing is reading
	// these fields yet and they can be assigned safely.
	s.ctx, s.w = ctx, w

	s.input.Start()
	s.input.WaitForShutdown()
	return nil
}
// Stop asks the forward input to stop and then waits for its
// WaitForShutdown to return. The context argument is not used. Stop always
// returns nil.
func (s *source) Stop(_ *core.Context) error {
	in := s.input
	in.Stop()
	in.WaitForShutdown()
	return nil
}
// nullWriter is a writer that throws away everything written to it.
type nullWriter struct{}

// Write discards p and reports that all len(p) bytes were written. It never
// returns an error.
func (*nullWriter) Write(p []byte) (int, error) {
	written := len(p)
	return written, nil
}
// NewSource creates a new Source receiving data from fluentd's out_forward.
//
// Two optional parameters are recognized, and each must be convertible with
// data.AsString:
//
//   - "bind": the address handed to the forward input
//     (default "127.0.0.1:24224").
//   - "tag_field": the key under which the tag of a record set is stored in
//     each emitted tuple (default "tag").
//
// An error is returned when a supplied parameter cannot be read as a string,
// when the "fluentd-forwarder" logger cannot be obtained, or when the forward
// input cannot be created. The ctx argument is not used here.
func NewSource(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Source, error) {
	s := &source{
		ioParams: ioParams,
		bind:     "127.0.0.1:24224",
		tagField: "tag",
	}

	// Optional string parameters, checked in this order so that the first
	// invalid one determines the returned error.
	overrides := []struct {
		key string
		dst *string
	}{
		{"bind", &s.bind},
		{"tag_field", &s.tagField},
	}
	for _, o := range overrides {
		v, ok := params[o.key]
		if !ok {
			continue
		}
		str, err := data.AsString(v)
		if err != nil {
			return nil, err
		}
		*o.dst = str
	}

	// TODO: optionally support logging
	// Until then, everything the forwarder logs goes to a writer that
	// discards it.
	logger, err := logging.GetLogger("fluentd-forwarder")
	if err != nil {
		return nil, err
	}
	backend := logging.NewLogBackend(&nullWriter{}, "", 0)
	logger.SetBackend(logging.AddModuleLevel(backend))

	in, err := fluentd_forwarder.NewForwardInput(logger, s.bind, s)
	if err != nil {
		return nil, err
	}
	s.input = in
	return s, nil
}