-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsink.go
More file actions
220 lines (198 loc) · 5.25 KB
/
sink.go
File metadata and controls
220 lines (198 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package mqtt
import (
"fmt"
"github.com/eclipse/paho.mqtt.golang"
"gopkg.in/sensorbee/sensorbee.v0/bql"
"gopkg.in/sensorbee/sensorbee.v0/core"
"gopkg.in/sensorbee/sensorbee.v0/data"
)
type sink struct {
opts *mqtt.ClientOptions
client mqtt.Client
qos byte
retained bool
broker string
user string
password string
payloadPath data.Path
topicPath data.Path
qosPath data.Path
defaultTopic string
}
func (s *sink) Write(ctx *core.Context, t *core.Tuple) error {
if !s.client.IsConnected() {
return nil
}
p, err := t.Data.Get(s.payloadPath)
if err != nil {
return err
}
var b []byte
switch p.Type() {
case data.TypeString:
str, _ := data.AsString(p)
b = []byte(str) // TODO: reduce this data copy
case data.TypeBlob:
b, _ = data.AsBlob(p)
case data.TypeArray, data.TypeMap:
b = []byte(p.String()) // TODO: reduce this data copy
default:
return fmt.Errorf("data type '%v' cannot be used as payload", p.Type())
}
topic := ""
if to, err := t.Data.Get(s.topicPath); err != nil {
if s.defaultTopic == "" {
return fmt.Errorf("topic field '%v' is missing", s.topicPath)
}
topic = s.defaultTopic
} else if topic, err = data.AsString(to); err != nil {
return err
}
qos := s.qos
if q, err := t.Data.Get(s.qosPath); err == nil {
qq, err := data.AsInt(q)
if err != nil {
return err
} else if qq < 0 || qq > 2 {
return fmt.Errorf("wrong QoS: %d", qq)
}
qos = byte(qq)
}
if token := s.client.Publish(topic, qos, s.retained, b); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
func (s *sink) Close(ctx *core.Context) error {
s.client.Disconnect(250)
return nil
}
// NewSink returns a sink as MQTT publisher. To publish a message, a tuple
// inserted into the sink needs to have two fields: "topic" and "payload".
// There is also one optional field: "qos", that should contain MQTT qos to
// publish message with:
//
// {
// "topic": "foo/bar",
// "payload": "any form of data including JSON encoded in string",
// "qos": 1
// }
//
// In the case above, the topic of the message is "foo/bar" and the QoS is
// increased to 1 meaing message must be delivered at least once.
// The field names of topic and payload can be changed by setting topic_field,
// payload_field and qos_field parameters, respectively. When a payload
// needs is a string or a blob, it's directly sent to a broker. The payload can
// also be an array or a map, and it will be sent as JSON.
//
// The sink has following optional parameters:
//
// * broker: the address of the broker in URI "scheme://host:port" format (default: "tcp://127.0.0.1:1883")
// * user: the user name to be connected (default: "")
// * password: the password of the user (default: "")
// * payload_field: the field name in tuples having a payload (default: "payload")
// * topic_field: the field name in tuples having a topic (default: "")
// * default_topic: the default topic used when a tuple doesn't have topic_field (default: "")
// * default_qos: the default to publish tuples with, can be 0, 1 or 2 (default: 0)
func NewSink(ctx *core.Context, ioParams *bql.IOParams, params data.Map) (core.Sink, error) {
s := &sink{
qos: 0,
retained: false,
broker: "tcp://127.0.0.1:1883",
user: "",
password: "",
payloadPath: data.MustCompilePath("payload"),
topicPath: data.MustCompilePath("topic"),
qosPath: data.MustCompilePath("qos"),
defaultTopic: "",
}
if v, ok := params["broker"]; ok {
b, err := data.AsString(v)
if err != nil {
return nil, err
}
s.broker, err = adjustOldBrokerURL(b)
if err != nil {
return nil, err
}
}
if v, ok := params["user"]; ok {
u, err := data.AsString(v)
if err != nil {
return nil, err
}
s.user = u
}
if v, ok := params["password"]; ok {
p, err := data.AsString(v)
if err != nil {
return nil, err
}
s.password = p
}
if v, ok := params["payload_field"]; ok {
name, err := data.AsString(v)
if err != nil {
return nil, err
}
path, err := data.CompilePath(name)
if err != nil {
return nil, err
}
s.payloadPath = path
}
if v, ok := params["topic_field"]; ok {
name, err := data.AsString(v)
if err != nil {
return nil, err
}
path, err := data.CompilePath(name)
if err != nil {
return nil, err
}
s.topicPath = path
}
if v, ok := params["default_topic"]; ok {
t, err := data.AsString(v)
if err != nil {
return nil, err
}
if t == "" {
return nil, fmt.Errorf("empty default topic is not supported")
}
s.defaultTopic = t
}
if v, ok := params["qos_field"]; ok {
name, err := data.AsString(v)
if err != nil {
return nil, err
}
path, err := data.CompilePath(name)
if err != nil {
return nil, err
}
s.qosPath = path
}
if v, ok := params["default_qos"]; ok {
q, err := data.AsInt(v)
if err != nil {
return nil, err
}
if q < 0 || q > 2 {
return nil, fmt.Errorf("unknown QoS. Qos can only be between 0 and 2")
}
s.qos = byte(q)
}
s.opts = mqtt.NewClientOptions()
s.opts.AddBroker(s.broker)
if s.user != "" {
s.opts.Username = s.user
s.opts.Password = s.password
}
s.client = mqtt.NewClient(s.opts)
if token := s.client.Connect(); token.Wait() && token.Error() != nil {
// TODO: error log
return nil, token.Error()
}
return s, nil
}