Skip to content

Commit 4a4acc4

Browse files
committed
feat: migrate decoders to protobuf
1 parent 5cae1b2 commit 4a4acc4

17 files changed

Lines changed: 1136 additions & 1504 deletions

File tree

input/elasticapm/internal/modeldecoder/modeldecodertest/populator.go

Lines changed: 55 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -36,54 +36,64 @@ import (
3636

3737
// Values used for populating the model structs
3838
type Values struct {
39-
Str string
40-
Int int
41-
Float float64
42-
Bool bool
43-
Time time.Time
44-
Duration time.Duration
45-
IP netip.Addr
46-
HTTPHeader http.Header
47-
LabelVal model.LabelValue
48-
NumericLabelVal model.NumericLabelValue
49-
MetricType modelpb.MetricType
39+
Str string
40+
Int int
41+
Float float64
42+
Bool bool
43+
Time time.Time
44+
Duration time.Duration
45+
IP netip.Addr
46+
HTTPHeader http.Header
47+
LabelVal model.LabelValue
48+
NumericLabelVal model.NumericLabelValue
49+
MetricType modelpb.MetricType
50+
CompressionStrategy modelpb.CompressionStrategy
5051
// N controls how many elements are added to a slice or a map
5152
N int
5253
}
5354

55+
var compressionStrategyText = map[modelpb.CompressionStrategy]string{
56+
modelpb.CompressionStrategy_COMPRESSION_STRATEGY_EXACT_MATCH: "exact_match",
57+
modelpb.CompressionStrategy_COMPRESSION_STRATEGY_SAME_KIND: "same_kind",
58+
}
59+
5460
// DefaultValues returns a Values struct initialized with non-zero values
5561
func DefaultValues() *Values {
5662
initTime, _ := time.Parse(time.RFC3339, "2020-10-10T10:00:00Z")
5763
return &Values{
58-
Str: "init",
59-
Int: 1,
60-
Float: 0.5,
61-
Bool: true,
62-
Time: initTime,
63-
Duration: time.Second,
64-
IP: netip.MustParseAddr("127.0.0.1"),
65-
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"a", "b", "c"}},
66-
LabelVal: model.LabelValue{Value: "init"},
67-
NumericLabelVal: model.NumericLabelValue{Value: 0.5},
68-
N: 3,
64+
Str: "init",
65+
Int: 1,
66+
Float: 0.5,
67+
Bool: true,
68+
Time: initTime,
69+
Duration: time.Second,
70+
IP: netip.MustParseAddr("127.0.0.1"),
71+
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"a", "b", "c"}},
72+
LabelVal: model.LabelValue{Value: "init"},
73+
NumericLabelVal: model.NumericLabelValue{Value: 0.5},
74+
MetricType: modelpb.MetricType_METRIC_TYPE_COUNTER,
75+
CompressionStrategy: modelpb.CompressionStrategy_COMPRESSION_STRATEGY_EXACT_MATCH,
76+
N: 3,
6977
}
7078
}
7179

7280
// NonDefaultValues returns a Values struct initialized with non-zero values
7381
func NonDefaultValues() *Values {
7482
updatedTime, _ := time.Parse(time.RFC3339, "2020-12-10T10:00:00Z")
7583
return &Values{
76-
Str: "overwritten",
77-
Int: 12,
78-
Float: 3.5,
79-
Bool: false,
80-
Time: updatedTime,
81-
Duration: time.Minute,
82-
IP: netip.MustParseAddr("192.168.0.1"),
83-
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"d", "e"}},
84-
LabelVal: model.LabelValue{Value: "overwritten"},
85-
NumericLabelVal: model.NumericLabelValue{Value: 3.5},
86-
N: 2,
84+
Str: "overwritten",
85+
Int: 12,
86+
Float: 3.5,
87+
Bool: false,
88+
Time: updatedTime,
89+
Duration: time.Minute,
90+
IP: netip.MustParseAddr("192.168.0.1"),
91+
HTTPHeader: http.Header{http.CanonicalHeaderKey("user-agent"): []string{"d", "e"}},
92+
LabelVal: model.LabelValue{Value: "overwritten"},
93+
NumericLabelVal: model.NumericLabelValue{Value: 3.5},
94+
MetricType: modelpb.MetricType_METRIC_TYPE_GAUGE,
95+
CompressionStrategy: modelpb.CompressionStrategy_COMPRESSION_STRATEGY_SAME_KIND,
96+
N: 2,
8797
}
8898
}
8999

@@ -185,8 +195,13 @@ func SetStructValues(in interface{}, values *Values, opts ...SetStructValuesOpti
185195
case reflect.Struct:
186196
switch v := f.Interface().(type) {
187197
case nullable.String:
188-
v.Set(values.Str)
198+
if key == "composite.compression_strategy" {
199+
v.Set(compressionStrategyText[values.CompressionStrategy])
200+
} else {
201+
v.Set(values.Str)
202+
}
189203
fieldVal = reflect.ValueOf(v)
204+
190205
case nullable.Int:
191206
v.Set(values.Int)
192207
fieldVal = reflect.ValueOf(v)
@@ -324,12 +339,14 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
324339
newVal = &values.Bool
325340
case http.Header:
326341
newVal = values.HTTPHeader
327-
case *timestamppb.Timestamp:
328-
newVal = timestamppb.New(values.Time)
329342
case time.Duration:
330343
newVal = values.Duration
344+
case *timestamppb.Timestamp:
345+
newVal = timestamppb.New(values.Time)
331346
case modelpb.MetricType:
332-
newVal = values
347+
newVal = values.MetricType
348+
case modelpb.CompressionStrategy:
349+
newVal = values.CompressionStrategy
333350
default:
334351
// the populator recursively iterates over struct and structPtr
335352
// calling this function for all fields;
@@ -346,7 +363,7 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
346363
assert.NotZero(t, fVal, key)
347364
return
348365
}
349-
panic(fmt.Sprintf("unhandled type %s for key %s", f.Type(), key))
366+
panic(fmt.Sprintf("unhandled type %s %s for key %s", f.Kind(), f.Type(), key))
350367
}
351368
assert.Equal(t, newVal, fVal, key)
352369
})

input/elasticapm/internal/modeldecoder/modeldecoderutil/http.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ package modeldecoderutil
2020
import (
2121
"encoding/json"
2222
"net/http"
23+
24+
"github.com/elastic/apm-data/model/modelpb"
25+
"google.golang.org/protobuf/types/known/structpb"
2326
)
2427

2528
// HTTPHeadersToMap converts h to a map[string]any, suitable for
@@ -39,6 +42,39 @@ func HTTPHeadersToMap(h http.Header) map[string]any {
3942
return m
4043
}
4144

45+
// HTTPHeadersToStructPb converts h to a *structpb.Struct, suitable for
46+
// use in modelpb.HTTP.{Request,Response}.Headers.
47+
func HTTPHeadersToStructPb(h http.Header) *structpb.Struct {
48+
if len(h) == 0 {
49+
return nil
50+
}
51+
m := make(map[string]any, len(h))
52+
for k, v := range h {
53+
arr := make([]any, 0, len(v))
54+
for _, s := range v {
55+
arr = append(arr, s)
56+
}
57+
m[k] = arr
58+
}
59+
if str, err := structpb.NewStruct(m); err == nil {
60+
return str
61+
}
62+
return nil
63+
}
64+
65+
func HTTPHeadersToModelpb(h http.Header) map[string]*modelpb.HTTPHeaderValue {
66+
if len(h) == 0 {
67+
return nil
68+
}
69+
m := make(map[string]*modelpb.HTTPHeaderValue, len(h))
70+
for k, v := range h {
71+
m[k] = &modelpb.HTTPHeaderValue{
72+
Values: v,
73+
}
74+
}
75+
return m
76+
}
77+
4278
// NormalizeHTTPRequestBody recurses through v, replacing any instance of
4379
// a json.Number with float64.
4480
//

input/elasticapm/internal/modeldecoder/modeldecoderutil/labels.go

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,31 @@ import (
2222
"strconv"
2323

2424
"github.com/elastic/apm-data/model"
25+
"github.com/elastic/apm-data/model/modelpb"
2526
)
2627

2728
// GlobalLabelsFrom populates the Labels and NumericLabels from global labels
2829
// in the metadata object.
29-
func GlobalLabelsFrom(from map[string]any, to *model.APMEvent) {
30+
func GlobalLabelsFrom(from map[string]any, to *modelpb.APMEvent) {
31+
to.NumericLabels = make(modelpb.NumericLabels)
32+
to.Labels = make(modelpb.Labels)
33+
MergeLabels(from, to)
34+
for k, v := range to.Labels {
35+
v.Global = true
36+
to.Labels[k] = v
37+
}
38+
for k, v := range to.NumericLabels {
39+
v.Global = true
40+
to.NumericLabels[k] = v
41+
}
42+
}
43+
44+
// GlobalLabelsFrom populates the Labels and NumericLabels from global labels
45+
// in the metadata object.
46+
func GlobalLabelsFromOld(from map[string]any, to *model.APMEvent) {
3047
to.NumericLabels = make(model.NumericLabels)
3148
to.Labels = make(model.Labels)
32-
MergeLabels(from, to)
49+
MergeLabelsOld(from, to)
3350
for k, v := range to.Labels {
3451
v.Global = true
3552
to.Labels[k] = v
@@ -44,7 +61,7 @@ func GlobalLabelsFrom(from map[string]any, to *model.APMEvent) {
4461
// combining event-specific labels onto (metadata) global labels.
4562
//
4663
// If eventLabels is non-nil, it is first cloned.
47-
func MergeLabels(eventLabels map[string]any, to *model.APMEvent) {
64+
func MergeLabelsOld(eventLabels map[string]any, to *model.APMEvent) {
4865
if to.NumericLabels == nil {
4966
to.NumericLabels = make(model.NumericLabels)
5067
}
@@ -54,14 +71,47 @@ func MergeLabels(eventLabels map[string]any, to *model.APMEvent) {
5471
for k, v := range eventLabels {
5572
switch v := v.(type) {
5673
case string:
57-
to.Labels.Set(k, v)
74+
model.Labels(to.Labels).Set(k, v)
75+
case bool:
76+
model.Labels(to.Labels).Set(k, strconv.FormatBool(v))
77+
case float64:
78+
model.NumericLabels(to.NumericLabels).Set(k, v)
79+
case json.Number:
80+
if floatVal, err := v.Float64(); err == nil {
81+
model.NumericLabels(to.NumericLabels).Set(k, floatVal)
82+
}
83+
}
84+
}
85+
if len(to.NumericLabels) == 0 {
86+
to.NumericLabels = nil
87+
}
88+
if len(to.Labels) == 0 {
89+
to.Labels = nil
90+
}
91+
}
92+
93+
// MergeLabels merges eventLabels into the APMEvent. This is used for
94+
// combining event-specific labels onto (metadata) global labels.
95+
//
96+
// If eventLabels is non-nil, it is first cloned.
97+
func MergeLabels(eventLabels map[string]any, to *modelpb.APMEvent) {
98+
if to.NumericLabels == nil {
99+
to.NumericLabels = make(modelpb.NumericLabels)
100+
}
101+
if to.Labels == nil {
102+
to.Labels = make(modelpb.Labels)
103+
}
104+
for k, v := range eventLabels {
105+
switch v := v.(type) {
106+
case string:
107+
modelpb.Labels(to.Labels).Set(k, v)
58108
case bool:
59-
to.Labels.Set(k, strconv.FormatBool(v))
109+
modelpb.Labels(to.Labels).Set(k, strconv.FormatBool(v))
60110
case float64:
61-
to.NumericLabels.Set(k, v)
111+
modelpb.NumericLabels(to.NumericLabels).Set(k, v)
62112
case json.Number:
63113
if floatVal, err := v.Float64(); err == nil {
64-
to.NumericLabels.Set(k, floatVal)
114+
modelpb.NumericLabels(to.NumericLabels).Set(k, floatVal)
65115
}
66116
}
67117
}

input/elasticapm/internal/modeldecoder/modeldecoderutil/metrics.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package modeldecoderutil
2020
import (
2121
"time"
2222

23-
"github.com/elastic/apm-data/model"
23+
"github.com/elastic/apm-data/model/modelpb"
24+
"google.golang.org/protobuf/types/known/durationpb"
2425
)
2526

2627
// SetInternalMetrics extracts well-known internal metrics from event.Metricset.Samples,
@@ -33,7 +34,7 @@ import (
3334
// SetInternalMetrics returns true if any known metric samples were found, and false
3435
// otherwise. If no known metric samples were found, the caller may opt to omit the
3536
// metricset altogether.
36-
func SetInternalMetrics(event *model.APMEvent) bool {
37+
func SetInternalMetrics(event *modelpb.APMEvent) bool {
3738
if event.Transaction == nil {
3839
// Not an internal metricset.
3940
return false
@@ -43,10 +44,16 @@ func SetInternalMetrics(event *model.APMEvent) bool {
4344
for _, v := range event.Metricset.Samples {
4445
switch v.Name {
4546
case "span.self_time.count":
46-
event.Span.SelfTime.Count = int(v.Value)
47+
if event.Span.SelfTime == nil {
48+
event.Span.SelfTime = &modelpb.AggregatedDuration{}
49+
}
50+
event.Span.SelfTime.Count = int64(v.Value)
4751
haveMetrics = true
4852
case "span.self_time.sum.us":
49-
event.Span.SelfTime.Sum = time.Duration(v.Value * 1000)
53+
if event.Span.SelfTime == nil {
54+
event.Span.SelfTime = &modelpb.AggregatedDuration{}
55+
}
56+
event.Span.SelfTime.Sum = durationpb.New(time.Duration(v.Value * 1000))
5057
haveMetrics = true
5158
}
5259
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package modeldecoderutil
2+
3+
import (
4+
"google.golang.org/protobuf/types/known/structpb"
5+
)
6+
7+
func ToStruct(m map[string]any) *structpb.Struct {
8+
if str, err := structpb.NewStruct(m); err == nil {
9+
return str
10+
}
11+
return nil
12+
}
13+
14+
func ToValue(a any) *structpb.Value {
15+
if v, err := structpb.NewValue(a); err == nil {
16+
return v
17+
}
18+
return nil
19+
}
20+
21+
func NormalizeMap(m map[string]any) map[string]any {
22+
v := NormalizeHTTPRequestBody(m)
23+
return v.(map[string]any)
24+
}

0 commit comments

Comments
 (0)