-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrow_data_to_point.go
More file actions
106 lines (99 loc) · 3.16 KB
/
row_data_to_point.go
File metadata and controls
106 lines (99 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package exporter
import (
"time"
timestamppb "github.com/golang/protobuf/ptypes/timestamp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
)
// The functions in this file convert view row data into the monitoring points
// sent by the monitoring client's upload RPC calls. All functions in this file
// are copied from contrib.go.opencensus.io/exporter/stackdriver.
// newPoint converts a single view row into a monitoring point.
//
// Views aggregated with view.AggTypeLastValue become gauge points, which carry
// only the end time; every other aggregation type becomes a cumulative point
// covering the interval from start to end.
func newPoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point {
	if v.Aggregation.Type == view.AggTypeLastValue {
		// Gauge points ignore start; only the end timestamp is recorded.
		return newGaugePoint(v, row, end)
	}
	return newCumulativePoint(v, row, start, end)
}
func newCumulativePoint(v *view.View, row *view.Row, start, end time.Time) *monitoringpb.Point {
return &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
StartTime: ×tamppb.Timestamp{
Seconds: start.Unix(),
Nanos: int32(start.Nanosecond()),
},
EndTime: ×tamppb.Timestamp{
Seconds: end.Unix(),
Nanos: int32(end.Nanosecond()),
},
},
Value: newTypedValue(v, row),
}
}
func newGaugePoint(v *view.View, row *view.Row, end time.Time) *monitoringpb.Point {
gaugeTime := ×tamppb.Timestamp{
Seconds: end.Unix(),
Nanos: int32(end.Nanosecond()),
}
return &monitoringpb.Point{
Interval: &monitoringpb.TimeInterval{
EndTime: gaugeTime,
},
Value: newTypedValue(v, row),
}
}
// newTypedValue converts the aggregation data held in r into the typed value
// of a monitoring point.
//
//   - Count data is emitted as an int64 value.
//   - Sum and last-value data are emitted as int64 or double, depending on
//     whether the view's measure is an Int64Measure or a Float64Measure.
//   - Distribution data is emitted as a distribution with explicit buckets
//     whose bounds come from the view's aggregation.
//
// It returns nil when the row data type, or the measure type for sum and
// last-value data, is not one of the cases above.
func newTypedValue(vd *view.View, r *view.Row) *monitoringpb.TypedValue {
	switch data := r.Data.(type) {
	case *view.CountData:
		return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
			Int64Value: data.Value,
		}}
	case *view.SumData:
		return newScalarTypedValue(vd.Measure, data.Value)
	case *view.LastValueData:
		return newScalarTypedValue(vd.Measure, data.Value)
	case *view.DistributionData:
		explicit := &distributionpb.Distribution_BucketOptions_Explicit{
			Bounds: vd.Aggregation.Buckets,
		}
		dist := &distributionpb.Distribution{
			Count:                 data.Count,
			Mean:                  data.Mean,
			SumOfSquaredDeviation: data.SumOfSquaredDev,
			// TODO(songya): uncomment this once Stackdriver supports min/max.
			// Range: &distributionpb.Distribution_Range{
			//      Min: data.Min,
			//      Max: data.Max,
			// },
			BucketOptions: &distributionpb.Distribution_BucketOptions{
				Options: &distributionpb.Distribution_BucketOptions_ExplicitBuckets{
					ExplicitBuckets: explicit,
				},
			},
			BucketCounts: data.CountPerBucket,
		}
		return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DistributionValue{
			DistributionValue: dist,
		}}
	}
	return nil
}

// newScalarTypedValue wraps value as an int64 typed value when m is an
// Int64Measure (converting the float with int64(value)) or as a double typed
// value when m is a Float64Measure. It returns nil for any other measure type.
func newScalarTypedValue(m stats.Measure, value float64) *monitoringpb.TypedValue {
	switch m.(type) {
	case *stats.Int64Measure:
		return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_Int64Value{
			Int64Value: int64(value),
		}}
	case *stats.Float64Measure:
		return &monitoringpb.TypedValue{Value: &monitoringpb.TypedValue_DoubleValue{
			DoubleValue: value,
		}}
	}
	return nil
}