-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathencode_v2.go
More file actions
150 lines (126 loc) · 3.31 KB
/
encode_v2.go
File metadata and controls
150 lines (126 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT License.
// This product includes software developed at Guance Cloud (https://www.guance.com/).
// Copyright 2021-present Guance, Inc.
package point
// EncodeV2 sets the points to be encoded. It only stores pts on the encoder;
// the encoded data is produced batch by batch by subsequent calls to Next.
//
// NOTE(review): only e.pts is replaced here. The iteration cursor
// (e.lastPtsIdx) used by the encode loops is not reset in this method —
// presumably the encoder is reset elsewhere before being reused; confirm.
func (e *Encoder) EncodeV2(pts []*Point) {
e.pts = pts
}
// Next is an iterator step: it encodes the next batch of pending points into
// the caller-prepared buf, using the encoding selected by e.enc, and returns
// the encoded bytes.
//
// The returned bool is false when nothing was produced: either all points
// have been encoded, or an error occurred (retrieve it with e.LastErr()),
// or e.enc is not one of the encodings handled below.
func (e *Encoder) Next(buf []byte) ([]byte, bool) {
	var (
		data []byte
		more bool
	)

	switch e.enc {
	case Protobuf:
		data, more = e.doEncodeProtobuf(buf)
	case LineProtocol:
		data, more = e.doEncodeLineProtocol(buf)
	case JSON:
		data, more = e.doEncodeJSON(buf)
	case PBJSON:
		data, more = e.doEncodePBJSON(buf)
	default:
		// Unrecognized encoding: data and more keep their zero values,
		// so the caller sees (nil, false).
	}

	return data, more
}
// doEncodeProtobuf collects pending points (starting at e.lastPtsIdx) until
// their accumulated PBArraySize() reaches len(buf), marshals the collected
// points into buf as one package and returns the encoded prefix of buf.
//
// It returns (nil, false) when there is nothing left to encode, or on error.
// On error e.lastErr is set: errTooSmallBuffer when the next pending point
// alone does not fit into buf, otherwise the marshal or callback error.
// If e.fn is set it is called with the number of points and the encoded
// bytes of the package; a callback error terminates the encoding.
func (e *Encoder) doEncodeProtobuf(buf []byte) ([]byte, bool) {
	var curSize int

	// clear points before current package
	if len(e.pbpts.Arr) > 0 {
		e.pbpts.Arr = e.pbpts.Arr[:0]
	}

	for _, pt := range e.pts[e.lastPtsIdx:] {
		if pt == nil {
			// A skipped point is still consumed. Without advancing the
			// cursor it would lag behind the real position and the next
			// call would re-encode points already emitted in this package.
			e.lastPtsIdx++
			continue
		}

		curSize += pt.PBArraySize()
		if curSize >= len(buf) {
			// no point added, means current point(not added) is a
			// huge point that can't fit into buf.
			if len(e.pbpts.Arr) == 0 {
				e.lastErr = errTooSmallBuffer
				return nil, false
			}

			// Close the current package; the remaining points will be
			// encoded in the next package.
			break
		}

		e.pbpts.Arr = append(e.pbpts.Arr, pt.pt)
		e.lastPtsIdx++
	}

	if len(e.pbpts.Arr) == 0 { // no points available, it's done
		return nil, false
	}

	n, err := e.pbpts.MarshalTo(buf)
	if err != nil {
		e.lastErr = err
		return nil, false
	}

	if e.fn != nil {
		if err := e.fn(len(e.pbpts.Arr), buf[:n]); err != nil {
			e.lastErr = err
			return nil, false
		}
	}

	// Count the package only once it has been fully delivered.
	e.parts++
	return buf[:n], true
}
// doEncodeLineProtocol writes pending points (starting at e.lastPtsIdx) into
// buf in line-protocol form, one point per line, until the next point would
// no longer fit, and returns the filled prefix of buf.
//
// It returns (nil, false) when nothing was written: all points are consumed,
// or an error occurred. On error e.lastErr is set: errTooSmallBuffer when
// the next pending point alone does not fit into buf, otherwise the callback
// error. A point whose LPPoint() conversion fails is skipped; its error is
// recorded in e.lastErr and encoding continues with the following points.
// If e.fn is set it is called with the number of points and the encoded
// bytes of the package; a callback error terminates the encoding.
func (e *Encoder) doEncodeLineProtocol(buf []byte) ([]byte, bool) {
	var (
		curSize int // bytes of buf filled so far
		npts    int // points written into the current package
	)

	for _, pt := range e.pts[e.lastPtsIdx:] {
		if pt == nil {
			// A skipped point is still consumed. Without advancing the
			// cursor the next call would resume too early and re-encode
			// points already emitted in this package.
			e.lastPtsIdx++
			continue
		}

		lppt, err := pt.LPPoint()
		if err != nil {
			// Record the error and drop the point; advance the cursor
			// for the same reason as for nil points.
			e.lastErr = err
			e.lastPtsIdx++
			continue
		}

		ptsize := lppt.StringSize()
		if curSize+ptsize+1 > len(buf) { // extra +1 used to store the last '\n'
			if curSize == 0 { // nothing added
				e.lastErr = errTooSmallBuffer
				return nil, false
			}

			// Buffer is full: close the current package, the remaining
			// points will be encoded in the next package.
			break
		}

		e.lpPointBuf = lppt.AppendString(e.lpPointBuf)
		copy(buf[curSize:], e.lpPointBuf[:ptsize])

		// Always add '\n' to the end of current point, this may
		// cause a _unneeded_ '\n' to the end of buf, it's ok for
		// line-protocol parsing.
		buf[curSize+ptsize] = '\n'
		curSize += ptsize + 1

		// clean buffer, next time AppendString() append from byte 0
		e.lpPointBuf = e.lpPointBuf[:0]
		e.lastPtsIdx++
		npts++
	}

	if curSize == 0 { // nothing encoded, it's done
		return nil, false
	}

	if e.fn != nil { // NOTE: encode callback error will terminate encode
		if err := e.fn(npts, buf[:curSize]); err != nil {
			e.lastErr = err
			return nil, false
		}
	}

	// Count the package only once it has been fully delivered, for both the
	// buffer-full and the end-of-points case.
	e.parts++
	return buf[:curSize], true
}
// doEncodeJSON is the JSON branch of Next. It is currently a stub: it does
// not use buf, encodes nothing and always returns (nil, false) without
// setting e.lastErr.
func (e *Encoder) doEncodeJSON(buf []byte) ([]byte, bool) {
return nil, false
}
// doEncodePBJSON is the PBJSON branch of Next. It is currently a stub: it
// does not use buf, encodes nothing and always returns (nil, false) without
// setting e.lastErr.
func (e *Encoder) doEncodePBJSON(buf []byte) ([]byte, bool) {
return nil, false
}