Skip to content

Commit 0acd047

Browse files
authored
feat: migrate decoders to protobuf (#63)
* feat: migrate decoders to protobuf * refactor: reduce diff noise * build: update missing licenses * fix: add nil check to avoid nil dereference discovered by fuzz testing * test: remove complex type from populator and skip compression strategy in span test * test: ignore non ndjson files in jsonschema test this was conflicting with fuzz testing * lint: remove unused variables * test: add compression strategy test
1 parent 5cae1b2 commit 0acd047

20 files changed

Lines changed: 1188 additions & 1596 deletions

input/elasticapm/internal/modeldecoder/modeldecodertest/populator.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131

3232
"github.com/elastic/apm-data/input/elasticapm/internal/modeldecoder/nullable"
3333
"github.com/elastic/apm-data/model"
34-
"github.com/elastic/apm-data/model/modelpb"
3534
)
3635

3736
// Values used for populating the model structs
@@ -46,7 +45,6 @@ type Values struct {
4645
HTTPHeader http.Header
4746
LabelVal model.LabelValue
4847
NumericLabelVal model.NumericLabelValue
49-
MetricType modelpb.MetricType
5048
// N controls how many elements are added to a slice or a map
5149
N int
5250
}
@@ -328,8 +326,6 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
328326
newVal = timestamppb.New(values.Time)
329327
case time.Duration:
330328
newVal = values.Duration
331-
case modelpb.MetricType:
332-
newVal = values
333329
default:
334330
// the populator recursively iterates over struct and structPtr
335331
// calling this function for all fields;
@@ -346,7 +342,7 @@ func AssertStructValues(t *testing.T, i interface{}, isException func(string) bo
346342
assert.NotZero(t, fVal, key)
347343
return
348344
}
349-
panic(fmt.Sprintf("unhandled type %s for key %s", f.Type(), key))
345+
panic(fmt.Sprintf("unhandled type %s %s for key %s", f.Kind(), f.Type(), key))
350346
}
351347
assert.Equal(t, newVal, fVal, key)
352348
})

input/elasticapm/internal/modeldecoder/modeldecoderutil/http.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ package modeldecoderutil
2020
import (
2121
"encoding/json"
2222
"net/http"
23+
24+
"github.com/elastic/apm-data/model/modelpb"
25+
"google.golang.org/protobuf/types/known/structpb"
2326
)
2427

2528
// HTTPHeadersToMap converts h to a map[string]any, suitable for
@@ -39,6 +42,39 @@ func HTTPHeadersToMap(h http.Header) map[string]any {
3942
return m
4043
}
4144

45+
// HTTPHeadersToStructPb converts h to a *structpb.Struct, suitable for
46+
// use in modelpb.HTTP.{Request,Response}.Headers.
47+
func HTTPHeadersToStructPb(h http.Header) *structpb.Struct {
48+
if len(h) == 0 {
49+
return nil
50+
}
51+
m := make(map[string]any, len(h))
52+
for k, v := range h {
53+
arr := make([]any, 0, len(v))
54+
for _, s := range v {
55+
arr = append(arr, s)
56+
}
57+
m[k] = arr
58+
}
59+
if str, err := structpb.NewStruct(m); err == nil {
60+
return str
61+
}
62+
return nil
63+
}
64+
65+
func HTTPHeadersToModelpb(h http.Header) map[string]*modelpb.HTTPHeaderValue {
66+
if len(h) == 0 {
67+
return nil
68+
}
69+
m := make(map[string]*modelpb.HTTPHeaderValue, len(h))
70+
for k, v := range h {
71+
m[k] = &modelpb.HTTPHeaderValue{
72+
Values: v,
73+
}
74+
}
75+
return m
76+
}
77+
4278
// NormalizeHTTPRequestBody recurses through v, replacing any instance of
4379
// a json.Number with float64.
4480
//

input/elasticapm/internal/modeldecoder/modeldecoderutil/labels.go

Lines changed: 57 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,31 @@ import (
2222
"strconv"
2323

2424
"github.com/elastic/apm-data/model"
25+
"github.com/elastic/apm-data/model/modelpb"
2526
)
2627

2728
// GlobalLabelsFrom populates the Labels and NumericLabels from global labels
2829
// in the metadata object.
29-
func GlobalLabelsFrom(from map[string]any, to *model.APMEvent) {
30+
func GlobalLabelsFrom(from map[string]any, to *modelpb.APMEvent) {
31+
to.NumericLabels = make(modelpb.NumericLabels)
32+
to.Labels = make(modelpb.Labels)
33+
MergeLabels(from, to)
34+
for k, v := range to.Labels {
35+
v.Global = true
36+
to.Labels[k] = v
37+
}
38+
for k, v := range to.NumericLabels {
39+
v.Global = true
40+
to.NumericLabels[k] = v
41+
}
42+
}
43+
44+
// GlobalLabelsFrom populates the Labels and NumericLabels from global labels
45+
// in the metadata object.
46+
func GlobalLabelsFromOld(from map[string]any, to *model.APMEvent) {
3047
to.NumericLabels = make(model.NumericLabels)
3148
to.Labels = make(model.Labels)
32-
MergeLabels(from, to)
49+
MergeLabelsOld(from, to)
3350
for k, v := range to.Labels {
3451
v.Global = true
3552
to.Labels[k] = v
@@ -44,7 +61,7 @@ func GlobalLabelsFrom(from map[string]any, to *model.APMEvent) {
4461
// combining event-specific labels onto (metadata) global labels.
4562
//
4663
// If eventLabels is non-nil, it is first cloned.
47-
func MergeLabels(eventLabels map[string]any, to *model.APMEvent) {
64+
func MergeLabelsOld(eventLabels map[string]any, to *model.APMEvent) {
4865
if to.NumericLabels == nil {
4966
to.NumericLabels = make(model.NumericLabels)
5067
}
@@ -54,14 +71,47 @@ func MergeLabels(eventLabels map[string]any, to *model.APMEvent) {
5471
for k, v := range eventLabels {
5572
switch v := v.(type) {
5673
case string:
57-
to.Labels.Set(k, v)
74+
model.Labels(to.Labels).Set(k, v)
75+
case bool:
76+
model.Labels(to.Labels).Set(k, strconv.FormatBool(v))
77+
case float64:
78+
model.NumericLabels(to.NumericLabels).Set(k, v)
79+
case json.Number:
80+
if floatVal, err := v.Float64(); err == nil {
81+
model.NumericLabels(to.NumericLabels).Set(k, floatVal)
82+
}
83+
}
84+
}
85+
if len(to.NumericLabels) == 0 {
86+
to.NumericLabels = nil
87+
}
88+
if len(to.Labels) == 0 {
89+
to.Labels = nil
90+
}
91+
}
92+
93+
// MergeLabels merges eventLabels into the APMEvent. This is used for
94+
// combining event-specific labels onto (metadata) global labels.
95+
//
96+
// If eventLabels is non-nil, it is first cloned.
97+
func MergeLabels(eventLabels map[string]any, to *modelpb.APMEvent) {
98+
if to.NumericLabels == nil {
99+
to.NumericLabels = make(modelpb.NumericLabels)
100+
}
101+
if to.Labels == nil {
102+
to.Labels = make(modelpb.Labels)
103+
}
104+
for k, v := range eventLabels {
105+
switch v := v.(type) {
106+
case string:
107+
modelpb.Labels(to.Labels).Set(k, v)
58108
case bool:
59-
to.Labels.Set(k, strconv.FormatBool(v))
109+
modelpb.Labels(to.Labels).Set(k, strconv.FormatBool(v))
60110
case float64:
61-
to.NumericLabels.Set(k, v)
111+
modelpb.NumericLabels(to.NumericLabels).Set(k, v)
62112
case json.Number:
63113
if floatVal, err := v.Float64(); err == nil {
64-
to.NumericLabels.Set(k, floatVal)
114+
modelpb.NumericLabels(to.NumericLabels).Set(k, floatVal)
65115
}
66116
}
67117
}

input/elasticapm/internal/modeldecoder/modeldecoderutil/metrics.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package modeldecoderutil
2020
import (
2121
"time"
2222

23-
"github.com/elastic/apm-data/model"
23+
"github.com/elastic/apm-data/model/modelpb"
24+
"google.golang.org/protobuf/types/known/durationpb"
2425
)
2526

2627
// SetInternalMetrics extracts well-known internal metrics from event.Metricset.Samples,
@@ -33,7 +34,7 @@ import (
3334
// SetInternalMetrics returns true if any known metric samples were found, and false
3435
// otherwise. If no known metric samples were found, the caller may opt to omit the
3536
// metricset altogether.
36-
func SetInternalMetrics(event *model.APMEvent) bool {
37+
func SetInternalMetrics(event *modelpb.APMEvent) bool {
3738
if event.Transaction == nil {
3839
// Not an internal metricset.
3940
return false
@@ -43,10 +44,16 @@ func SetInternalMetrics(event *model.APMEvent) bool {
4344
for _, v := range event.Metricset.Samples {
4445
switch v.Name {
4546
case "span.self_time.count":
46-
event.Span.SelfTime.Count = int(v.Value)
47+
if event.Span.SelfTime == nil {
48+
event.Span.SelfTime = &modelpb.AggregatedDuration{}
49+
}
50+
event.Span.SelfTime.Count = int64(v.Value)
4751
haveMetrics = true
4852
case "span.self_time.sum.us":
49-
event.Span.SelfTime.Sum = time.Duration(v.Value * 1000)
53+
if event.Span.SelfTime == nil {
54+
event.Span.SelfTime = &modelpb.AggregatedDuration{}
55+
}
56+
event.Span.SelfTime.Sum = durationpb.New(time.Duration(v.Value * 1000))
5057
haveMetrics = true
5158
}
5259
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package modeldecoderutil
19+
20+
import (
21+
"google.golang.org/protobuf/types/known/structpb"
22+
)
23+
24+
func ToStruct(m map[string]any) *structpb.Struct {
25+
if str, err := structpb.NewStruct(m); err == nil {
26+
return str
27+
}
28+
return nil
29+
}
30+
31+
func ToValue(a any) *structpb.Value {
32+
if v, err := structpb.NewValue(a); err == nil {
33+
return v
34+
}
35+
return nil
36+
}
37+
38+
func NormalizeMap(m map[string]any) map[string]any {
39+
if v := NormalizeHTTPRequestBody(m); v != nil {
40+
return v.(map[string]any)
41+
}
42+
return nil
43+
}

0 commit comments

Comments
 (0)