Skip to content

Commit bed59f8

Browse files
committed
merge in platform log changes
1 parent 4767c1f commit bed59f8

3 files changed

Lines changed: 170 additions & 19 deletions

File tree

collector/receiver/telemetryapireceiver/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ replace github.com/open-telemetry/opentelemetry-lambda/collector => ../../
66

77
require (
88
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
9+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.138.0
910
github.com/open-telemetry/opentelemetry-lambda/collector v0.98.0
1011
github.com/stretchr/testify v1.11.1
1112
go.opentelemetry.io/collector/component v1.44.0
@@ -21,6 +22,7 @@ require (
2122
)
2223

2324
require (
25+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2426
github.com/gobwas/glob v0.2.3 // indirect
2527
go.yaml.in/yaml/v3 v3.0.4 // indirect
2628
)
@@ -43,7 +45,7 @@ require (
4345
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
4446
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
4547
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
46-
go.opentelemetry.io/collector/confmap/xconfmap v0.138.0
48+
go.opentelemetry.io/collector/confmap/xconfmap v0.138.0 // indirect
4749
go.opentelemetry.io/collector/consumer/consumererror v0.138.0 // indirect
4850
go.opentelemetry.io/collector/consumer/xconsumer v0.138.0 // indirect
4951
go.opentelemetry.io/collector/featuregate v1.44.0 // indirect

collector/receiver/telemetryapireceiver/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
2+
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
13
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
24
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
35
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -48,6 +50,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
4850
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
4951
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8=
5052
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
53+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.138.0 h1:34HE7sAjlXlzL1HAbDxOBKFdU3tTQcmgFVvjnts67DA=
54+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.138.0/go.mod h1:XzBJKpG3Gi3GMyWF+7NgVl219PaGTl4+RaNo8f8KAZs=
5155
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5256
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
5357
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=

collector/receiver/telemetryapireceiver/receiver.go

Lines changed: 163 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ import (
3333
"go.opentelemetry.io/collector/consumer"
3434
"go.opentelemetry.io/collector/pdata/pcommon"
3535
"go.opentelemetry.io/collector/pdata/plog"
36+
"go.opentelemetry.io/collector/pdata/pmetric"
3637
"go.opentelemetry.io/collector/pdata/ptrace"
3738
"go.opentelemetry.io/collector/receiver"
3839
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
40+
semconv2 "go.opentelemetry.io/otel/semconv/v1.24.0"
3941
"go.uber.org/zap"
4042

4143
"github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi"
@@ -44,6 +46,10 @@ import (
4446
const (
4547
initialQueueSize = 5
4648
scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi"
49+
telemetrySuccessStatus = "success"
50+
telemetryFailureStatus = "failure"
51+
telemetryErrorStatus = "error"
52+
telemetryTimeoutStatus = "timeout"
4753
platformReportLogFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB"
4854
platformStartLogFmt = "START RequestId: %s Version: %s"
4955
platformRuntimeDoneLogFmt = "END RequestId: %s Version: %s"
@@ -63,6 +69,7 @@ type telemetryAPIReceiver struct {
6369
logger *zap.Logger
6470
queue *queue.Queue // queue is a synchronous queue and is used to put the received log events to be dispatched later
6571
nextTraces consumer.Traces
72+
nextMetrics consumer.Metrics
6673
nextLogs consumer.Logs
6774
lastPlatformStartTime string
6875
lastPlatformEndTime string
@@ -71,6 +78,8 @@ type telemetryAPIReceiver struct {
7178
types []telemetryapi.EventType
7279
resource pcommon.Resource
7380
faasFunctionVersion string
81+
faasName string
82+
faaSMetricBuilders *FaaSMetricBuilders
7483
currentFaasInvocationID string
7584
logReport bool
7685
}
@@ -145,10 +154,31 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
145154
case string(telemetryapi.PlatformInitStart):
146155
r.logger.Info(fmt.Sprintf("Init start: %s", r.lastPlatformStartTime), zap.Any("event", el))
147156
r.lastPlatformStartTime = el.Time
157+
158+
if record, ok := el.Record.(map[string]any); ok {
159+
functionName, _ := record["functionName"].(string)
160+
if functionName != "" {
161+
r.faasName = functionName
162+
}
163+
}
148164
// Function initialization completed.
149165
case string(telemetryapi.PlatformInitRuntimeDone):
150166
r.logger.Info(fmt.Sprintf("Init end: %s", r.lastPlatformEndTime), zap.Any("event", el))
151167
r.lastPlatformEndTime = el.Time
168+
169+
if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 {
170+
if record, ok := el.Record.(map[string]any); ok {
171+
if td, err := r.createPlatformInitSpan(record, r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil {
172+
err := r.nextTraces.ConsumeTraces(context.Background(), td)
173+
if err == nil {
174+
r.lastPlatformEndTime = ""
175+
r.lastPlatformStartTime = ""
176+
} else {
177+
r.logger.Error("error receiving traces", zap.Error(err))
178+
}
179+
}
180+
}
181+
}
152182
}
153183
// TODO: add support for additional events, see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html
154184
// A report of function initialization.
@@ -170,15 +200,13 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
170200
// Lambda dropped log entries.
171201
// case "platform.logsDropped":
172202
}
173-
if len(r.lastPlatformStartTime) > 0 && len(r.lastPlatformEndTime) > 0 {
174-
if td, err := r.createPlatformInitSpan(r.lastPlatformStartTime, r.lastPlatformEndTime); err == nil {
175-
if r.nextTraces != nil {
176-
err := r.nextTraces.ConsumeTraces(context.Background(), td)
177-
if err == nil {
178-
r.lastPlatformEndTime = ""
179-
r.lastPlatformStartTime = ""
180-
} else {
181-
r.logger.Error("error receiving traces", zap.Error(err))
203+
// Metrics
204+
if r.nextMetrics != nil {
205+
if metrics, err := r.createMetrics(slice); err == nil {
206+
if metrics.MetricCount() > 0 {
207+
err := r.nextMetrics.ConsumeMetrics(context.Background(), metrics)
208+
if err != nil {
209+
r.logger.Error("error receiving metrics", zap.Error(err))
182210
}
183211
}
184212
}
@@ -209,6 +237,91 @@ func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{})
209237
return ""
210238
}
211239

240+
func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, error) {
241+
metric := pmetric.NewMetrics()
242+
resourceMetric := metric.ResourceMetrics().AppendEmpty()
243+
r.resource.CopyTo(resourceMetric.Resource())
244+
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
245+
scopeMetric.Scope().SetName(scopeName)
246+
scopeMetric.SetSchemaUrl(semconv2.SchemaURL)
247+
248+
for _, el := range slice {
249+
r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el))
250+
record, ok := el.Record.(map[string]any)
251+
if !ok {
252+
continue
253+
}
254+
ts, err := time.Parse(time.RFC3339, el.Time)
255+
if err != nil {
256+
continue
257+
}
258+
259+
switch el.Type {
260+
case string(telemetryapi.PlatformInitStart):
261+
r.faaSMetricBuilders.coldstartsMetric.Add(1)
262+
r.faaSMetricBuilders.coldstartsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
263+
case string(telemetryapi.PlatformInitReport):
264+
metrics, ok := record["metrics"].(map[string]any)
265+
if !ok {
266+
continue
267+
}
268+
269+
status, _ := metrics["status"].(string)
270+
if status == telemetryFailureStatus || status == telemetryErrorStatus {
271+
r.faaSMetricBuilders.errorsMetric.Add(1)
272+
r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
273+
} else if status == telemetryTimeoutStatus {
274+
r.faaSMetricBuilders.timeoutsMetric.Add(1)
275+
r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
276+
}
277+
278+
durationMs, ok := metrics["durationMs"].(float64)
279+
if !ok {
280+
continue
281+
}
282+
283+
r.faaSMetricBuilders.initDurationMetric.Record(durationMs / 1000.0)
284+
r.faaSMetricBuilders.initDurationMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
285+
case string(telemetryapi.PlatformReport):
286+
metrics, ok := record["metrics"].(map[string]any)
287+
if !ok {
288+
continue
289+
}
290+
291+
maxMemoryUsedMb, ok := metrics["maxMemoryUsedMB"].(float64)
292+
if ok {
293+
r.faaSMetricBuilders.memUsageMetric.Record(maxMemoryUsedMb * 1000000.0)
294+
r.faaSMetricBuilders.memUsageMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
295+
}
296+
case string(telemetryapi.PlatformRuntimeDone):
297+
status, _ := record["status"].(string)
298+
299+
if status == telemetrySuccessStatus {
300+
r.faaSMetricBuilders.invocationsMetric.Add(1)
301+
r.faaSMetricBuilders.invocationsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
302+
} else if status == telemetryFailureStatus || status == telemetryErrorStatus {
303+
r.faaSMetricBuilders.errorsMetric.Add(1)
304+
r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
305+
} else if status == telemetryTimeoutStatus {
306+
r.faaSMetricBuilders.timeoutsMetric.Add(1)
307+
r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
308+
}
309+
310+
metrics, ok := record["metrics"].(map[string]any)
311+
if !ok {
312+
continue
313+
}
314+
315+
durationMs, ok := metrics["durationMs"].(float64)
316+
if ok {
317+
r.faaSMetricBuilders.invokeDurationMetric.Record(durationMs / 1000.0)
318+
r.faaSMetricBuilders.invokeDurationMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
319+
}
320+
}
321+
}
322+
return metric, nil
323+
}
324+
212325
func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
213326
log := plog.NewLogs()
214327
resourceLog := log.ResourceLogs().AppendEmpty()
@@ -456,11 +569,15 @@ func (r *telemetryAPIReceiver) registerTracesConsumer(next consumer.Traces) {
456569
r.nextTraces = next
457570
}
458571

572+
func (r *telemetryAPIReceiver) registerMetricsConsumer(next consumer.Metrics) {
573+
r.nextMetrics = next
574+
}
575+
459576
func (r *telemetryAPIReceiver) registerLogsConsumer(next consumer.Logs) {
460577
r.nextLogs = next
461578
}
462579

463-
func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace.Traces, error) {
580+
func (r *telemetryAPIReceiver) createPlatformInitSpan(record map[string]any, start, end string) (ptrace.Traces, error) {
464581
traceData := ptrace.NewTraces()
465582
rs := traceData.ResourceSpans().AppendEmpty()
466583
r.resource.CopyTo(rs.Resource())
@@ -470,7 +587,7 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace
470587
span := ss.Spans().AppendEmpty()
471588
span.SetTraceID(newTraceID())
472589
span.SetSpanID(newSpanID())
473-
span.SetName("platform.initRuntimeDone")
590+
span.SetName(fmt.Sprintf("init %s", r.faasName))
474591
span.SetKind(ptrace.SpanKindInternal)
475592
span.Attributes().PutBool(semconv.AttributeFaaSColdstart, true)
476593
startTime, err := time.Parse(time.RFC3339, start)
@@ -483,9 +600,36 @@ func (r *telemetryAPIReceiver) createPlatformInitSpan(start, end string) (ptrace
483600
return ptrace.Traces{}, err
484601
}
485602
span.SetEndTimestamp(pcommon.NewTimestampFromTime(endTime))
603+
604+
status, _ := record["status"].(string)
605+
if status != "" && status != "success" {
606+
span.Status().SetCode(ptrace.StatusCodeError)
607+
errorType, _ := record["errorType"].(string)
608+
if errorType != "" {
609+
span.Attributes().PutStr(semconv.AttributeErrorType, errorType)
610+
} else {
611+
span.Attributes().PutStr(semconv.AttributeErrorType, status)
612+
}
613+
}
486614
return traceData, nil
487615
}
488616

617+
func getMetricsTemporality(cfg *Config) pmetric.AggregationTemporality {
618+
temporality := strings.ToLower(cfg.MetricsTemporality)
619+
if temporality == "" {
620+
temporality = os.Getenv("OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE")
621+
}
622+
623+
switch temporality {
624+
case "delta":
625+
return pmetric.AggregationTemporalityDelta
626+
case "cumulative":
627+
return pmetric.AggregationTemporalityCumulative
628+
default:
629+
return pmetric.AggregationTemporalityCumulative
630+
}
631+
}
632+
489633
func newTelemetryAPIReceiver(
490634
cfg *Config,
491635
set receiver.Settings,
@@ -527,13 +671,14 @@ func newTelemetryAPIReceiver(
527671
}
528672

529673
return &telemetryAPIReceiver{
530-
logger: set.Logger,
531-
queue: queue.New(initialQueueSize),
532-
extensionID: cfg.extensionID,
533-
port: cfg.Port,
534-
types: subscribedTypes,
535-
resource: r,
536-
logReport: cfg.LogReport,
674+
logger: set.Logger,
675+
queue: queue.New(initialQueueSize),
676+
extensionID: cfg.extensionID,
677+
port: cfg.Port,
678+
types: subscribedTypes,
679+
resource: r,
680+
faaSMetricBuilders: NewFaaSMetricBuilders(pcommon.NewTimestampFromTime(time.Now()), getMetricsTemporality(cfg)),
681+
logReport: cfg.LogReport,
537682
}, nil
538683
}
539684

0 commit comments

Comments
 (0)