Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Supported events:

| Field | Default | Description |
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 4325 | HTTP server port to receive Telemetry API data. |
| `port` | 0 (dynamically determined by OS) | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |


Expand Down
2 changes: 1 addition & 1 deletion collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
defaultPort = 0
platform = "platform"
function = "function"
extension = "extension"
Expand Down
56 changes: 40 additions & 16 deletions collector/receiver/telemetryapireceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"context"
crand "crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
Expand Down Expand Up @@ -84,29 +86,55 @@ type telemetryAPIReceiver struct {
logReport bool
}

func (r *telemetryAPIReceiver) bindListener() (net.Listener, string, error) {
listenerAddr := listenOnAddress()
l, err := net.Listen("tcp", listenerAddr+":"+strconv.Itoa(r.port))
if err != nil {
return nil, "", err
}
addr := fmt.Sprintf("%s:%d", listenerAddr, l.Addr().(*net.TCPAddr).Port)
return l, addr, nil
}

func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
address := listenOnAddress(r.port)
r.logger.Info("Listening for requests", zap.String("address", address))
if len(r.types) == 0 {
return fmt.Errorf("no telemetry event types provided")
}

listener, address, err := r.bindListener()
if err != nil {
return fmt.Errorf("failed to find available port: %w", err)
}
r.logger.Info("Starting telemetry API listener", zap.String("address", address))

mux := http.NewServeMux()
mux.HandleFunc("/", r.httpHandler)
r.httpServer = &http.Server{Addr: address, Handler: mux}
go func() {
_ = r.httpServer.ListenAndServe()
err := r.httpServer.Serve(listener)
if !errors.Is(err, http.ErrServerClosed) {
r.logger.Error("Unexpected stop on HTTP Server", zap.Error(err))
} else {
r.logger.Info("HTTP server closed", zap.Error(err))
}
}()

telemetryClient := telemetryapi.NewClient(r.logger)
if len(r.types) > 0 {
_, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address))
if err != nil {
r.logger.Info("Listening for requests", zap.String("address", address), zap.String("extensionID", r.extensionID))
return err
}
if _, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address)); err != nil {
r.logger.Error("Failed to subscribe to telemetry", zap.Error(err))
_ = r.Shutdown(ctx)
return err
}
r.logger.Info("Successfully subscribed to telemetry", zap.String("address", address))
return nil
}

func (r *telemetryAPIReceiver) Shutdown(ctx context.Context) error {
if r.httpServer != nil {
if err := r.httpServer.Shutdown(ctx); err != nil {
return err
}
}
return nil
}
Comment thread
wpessers marked this conversation as resolved.

Expand Down Expand Up @@ -727,14 +755,10 @@ func newTelemetryAPIReceiver(
}, nil
}

func listenOnAddress(port int) string {
func listenOnAddress() string {
envAwsLocal, ok := os.LookupEnv("AWS_SAM_LOCAL")
var addr string
if ok && envAwsLocal == "true" {
addr = ":" + strconv.Itoa(port)
} else {
addr = "sandbox.localdomain:" + strconv.Itoa(port)
return ""
}

return addr
return "sandbox.localdomain"
}
33 changes: 29 additions & 4 deletions collector/receiver/telemetryapireceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func TestListenOnAddress(t *testing.T) {
{
desc: "listen on address without AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
addr := listenOnAddress(4325)
require.EqualValues(t, "sandbox.localdomain:4325", addr)
addr := listenOnAddress()
require.EqualValues(t, "sandbox.localdomain", addr)
},
},
{
desc: "listen on address with AWS_SAM_LOCAL env variable",
testFunc: func(t *testing.T) {
t.Setenv("AWS_SAM_LOCAL", "true")
addr := listenOnAddress(4325)
require.EqualValues(t, ":4325", addr)
addr := listenOnAddress()
require.EqualValues(t, "", addr)
},
},
}
Expand All @@ -56,6 +56,31 @@ func TestListenOnAddress(t *testing.T) {
}
}

func TestBindListener(t *testing.T) {
t.Setenv("AWS_SAM_LOCAL", "true")

t.Run("dynamic port allocation", func(t *testing.T) {
r, err := newTelemetryAPIReceiver(&Config{Port: 0}, receivertest.NewNopSettings(Type))
require.NoError(t, err)
listener, addr, err := r.bindListener()
require.NoError(t, err)
require.NotEmpty(t, addr)
require.NotNil(t, listener)
t.Cleanup(func() { require.NoError(t, listener.Close()) })
require.Contains(t, addr, ":")
})

t.Run("specific port", func(t *testing.T) {
r, err := newTelemetryAPIReceiver(&Config{Port: 4325}, receivertest.NewNopSettings(Type))
require.NoError(t, err)
listener, addr, err := r.bindListener()
require.NoError(t, err)
require.NotNil(t, listener)
t.Cleanup(func() { require.NoError(t, listener.Close()) })
require.Contains(t, addr, ":4325")
})
}

type mockConsumer struct {
consumed int
}
Expand Down
Loading