-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproject_data.go
More file actions
159 lines (141 loc) · 5.23 KB
/
project_data.go
File metadata and controls
159 lines (141 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package exporter
import (
"fmt"
"time"
"go.opencensus.io/tag"
"google.golang.org/api/support/bundler"
metricpb "google.golang.org/genproto/googleapis/api/metric"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
// MaxTimeSeriesPerUpload is the maximum number of time series that stackdriver accepts in a
// single upload. makeReq stops adding time series to a request once it holds this many.
// Only tests may change this value.
var MaxTimeSeriesPerUpload = 200
// projectData contains per-project data in the exporter. It should be created by
// newProjectData(), which also wires up the bundler.
type projectData struct {
// parent is the exporter this project data belongs to. Its client, context, options and
// onError callback are used when uploading.
parent *StatsExporter
// projectID identifies the project; requests are named "projects/<projectID>".
projectID string
// We make a bundler for each project because calls to the monitoring RPC can be grouped only
// at the project level.
bndler expBundler
}
// expBundler wraps the bundler behind an interface, and newExpBundler wraps its maker, so that
// both can be substituted for testing purposes.
type expBundler interface {
// Add hands one item to the bundler.
// NOTE(review): presumably mirrors bundler.Bundler.Add(item, size) - confirm against the
// bundler package.
Add(interface{}, int) error
// Flush is expected to push out anything buffered so far.
// NOTE(review): semantics come from the concrete bundler, which is not visible here.
Flush()
}
// newExpBundler is the maker that newProjectData calls to obtain a bundler. It is a variable so
// that it can be replaced; by default it is defaultNewExpBundler.
var newExpBundler = defaultNewExpBundler
// defaultNewExpBundler is the default maker behind newExpBundler. It builds a bundler of
// *RowData items that hands each bundle to uploader.
//
// Bundler options are plain struct fields, and an interface cannot expose fields, so the option
// set-up has to happen here, inside the maker, before the bundler is returned as an expBundler.
// A threshold that is zero or negative is treated as "not provided" and leaves the corresponding
// bundler field untouched.
func defaultNewExpBundler(uploader func(interface{}), delayThreshold time.Duration, countThreshold int) expBundler {
	b := bundler.NewBundler((*RowData)(nil), uploader)

	// Only override the fields the user actually supplied.
	if delayThreshold > 0 {
		b.DelayThreshold = delayThreshold
	}
	if countThreshold > 0 {
		b.BundleCountThreshold = countThreshold
	}
	return b
}
// newProjectData creates the per-project state for projectID, owned by exporter e.
//
// The bundler is attached after the struct exists because its upload callback is a method of
// the very projectData being built. Bundler thresholds are taken from the exporter's options.
func (e *StatsExporter) newProjectData(projectID string) *projectData {
	pd := new(projectData)
	pd.parent = e
	pd.projectID = projectID
	pd.bndler = newExpBundler(
		pd.uploadRowData,
		e.opts.BundleDelayThreshold,
		e.opts.BundleCountThreshold,
	)
	return pd
}
// uploadRowData is the callback invoked by the bundler. bundle must be a []*RowData; the rows
// are uploaded in as many create-time-series requests as makeReq needs to cover them all.
//
// Errors are not returned. A failed RPC is reported through the exporter's onError together
// with the rows that were part of that failed request.
func (pd *projectData) uploadRowData(bundle interface{}) {
	exporter := pd.parent
	pending := bundle.([]*RowData)

	for len(pending) != 0 {
		// sent holds the rows packed into req (kept only for error reporting); rest holds the
		// rows makeReq has not looked at yet, which become the input of the next round.
		req, sent, rest := pd.makeReq(pending)
		pending = rest

		if req == nil {
			// Nothing usable in this round, so there is no RPC to perform.
			continue
		}

		err := exporter.client.CreateTimeSeries(exporter.ctx, req)
		if err == nil {
			continue
		}
		// Report every row that was in the failed request.
		exporter.onError(
			fmt.Errorf("RPC call to create time series failed for project %s: %v", pd.projectID, err),
			sent...,
		)
	}
}
// makeReq creates a request that's suitable to be passed to the create time series RPC call.
//
// reqRds contains the rows that are included in req. Its main use is to be returned to users if
// creating the time series fails. (We don't want users to investigate the structure of the time
// series.) remainingRds contains the rows that were not examined at all by this call because of
// the length limitation of a request (MaxTimeSeriesPerUpload). Another call of makeReq() with
// remainingRds will handle (some of) those rows. When req is nil there is nothing to request and
// reqRds is empty as well.
//
// Some rows in rds may fail while being converted to time series. In that case makeReq() calls
// the exporter's onError() directly for that row, skips it, and does not propagate the error to
// the caller.
//
// An empty rds yields (nil, nil, nil).
func (pd *projectData) makeReq(rds []*RowData) (req *monitoringpb.CreateTimeSeriesRequest, reqRds, remainingRds []*RowData) {
	// Without this guard the rds[i+1:] expression below would evaluate rds[1:] on a zero-length
	// slice and panic.
	if len(rds) == 0 {
		return nil, nil, nil
	}

	exp := pd.parent
	var timeSeries []*monitoringpb.TimeSeries

	// i and rd are declared outside the loop because i is needed afterwards to find where the
	// unprocessed rows start.
	var i int
	var rd *RowData
	for i, rd = range rds {
		pt := newPoint(rd.View, rd.Row, rd.Start, rd.End)
		if pt.Value == nil {
			err := fmt.Errorf("inconsistent data found in view %s", rd.View.Name)
			exp.onError(err, rd)
			continue
		}
		resource, err := exp.makeResource(rd)
		if err != nil {
			newErr := fmt.Errorf("failed to construct resource of view %s: %v", rd.View.Name, err)
			exp.onError(newErr, rd)
			continue
		}

		ts := &monitoringpb.TimeSeries{
			Metric: &metricpb.Metric{
				Type:   rd.View.Name,
				Labels: exp.makeLabels(rd.Row.Tags),
			},
			Resource: resource,
			Points:   []*monitoringpb.Point{pt},
		}
		// timeSeries and reqRds grow together so that they always describe the same rows.
		timeSeries = append(timeSeries, ts)
		reqRds = append(reqRds, rd)
		// Don't grow timeSeries over the limit.
		if len(timeSeries) == MaxTimeSeriesPerUpload {
			break
		}
	}

	// i is the last index processed (converted or rejected), so the untouched rows start at i+1.
	remainingRds = rds[i+1:]
	if len(timeSeries) == 0 {
		// Every examined row was rejected; there is nothing to send.
		return nil, reqRds, remainingRds
	}
	req = &monitoringpb.CreateTimeSeriesRequest{
		Name:       fmt.Sprintf("projects/%s", pd.projectID),
		TimeSeries: timeSeries,
	}
	return req, reqRds, remainingRds
}
// makeLabels constructs the label map that is ready to be uploaded to stackdriver for a row
// carrying the given tags.
//
// The map is assembled in three steps: the exporter's default labels are copied in, the row's
// tags are laid on top (so a tag wins over a default label with the same key), and finally every
// key listed as unexported in the options is removed.
func (e *StatsExporter) makeLabels(tags []tag.Tag) map[string]string {
	defaults := e.opts.DefaultLabels
	result := make(map[string]string, len(defaults)+len(tags))

	for name, value := range defaults {
		result[name] = value
	}
	// Tags are written after the defaults, so on a key collision the tag's value is kept.
	for _, t := range tags {
		result[t.Key.Name()] = t.Value
	}
	// Some labels are not meant to be exported; drop them regardless of where they came from.
	for _, name := range e.opts.UnexportedLabels {
		delete(result, name)
	}
	return result
}