From 380e48812ddeb926c79663b79ca60952431e44be Mon Sep 17 00:00:00 2001 From: vanitabhagwat Date: Tue, 2 Jun 2026 22:36:47 -0700 Subject: [PATCH 1/2] feat: Add per-feature-view metrics for online read path (latency, requests, errors, hit rate) Emit per-feature-view latency, request count, error count, and total lookup requests on every online read in the Go feature server (HTTP + gRPC). This enables per-FV hit-rate computation in Datadog and allows filtering latency/error distributions by feature view. Key changes: - Add Distribution() to StatsdClient interface - New FeatureViewReadMetrics emitter (fv_read_latency_ms, fv_read_requests, fv_read_errors) - Extend LookupMetricsAggregator with totalByFV for feature_lookup_requests - Extract FV names from request (works with fullFeatureNames=false) - New unified flag ENABLE_FV_LEVEL_METRICS (backward compat with ENABLE_MISSING_KEY_METRICS) - Instrument GetOnlineFeatures and GetOnlineFeaturesRange in both HTTP and gRPC handlers Co-Authored-By: Claude Opus 4.6 --- go/internal/feast/metrics/client.go | 4 +- go/internal/feast/metrics/config.go | 5 + go/internal/feast/metrics/fv_read_metrics.go | 65 +++++++++ .../feast/metrics/fv_read_metrics_test.go | 132 ++++++++++++++++++ go/internal/feast/metrics/lookup_metrics.go | 14 ++ .../feast/metrics/lookup_metrics_test.go | 120 ++++++++++++---- go/internal/feast/server/grpc_server.go | 46 ++++++ go/internal/feast/server/http_server.go | 66 +++++++++ go/main.go | 8 +- 9 files changed, 425 insertions(+), 35 deletions(-) create mode 100644 go/internal/feast/metrics/fv_read_metrics.go create mode 100644 go/internal/feast/metrics/fv_read_metrics_test.go diff --git a/go/internal/feast/metrics/client.go b/go/internal/feast/metrics/client.go index f0e38208fc6..f010ea2f1cc 100644 --- a/go/internal/feast/metrics/client.go +++ b/go/internal/feast/metrics/client.go @@ -4,9 +4,11 @@ package metrics // The real github.com/DataDog/datadog-go/v5/statsd.Client satisfies this interface. type StatsdClient interface { Count(name string, value int64, tags []string, rate float64) error + Distribution(name string, value float64, tags []string, rate float64) error } // NoOpStatsdClient does nothing when metrics are disabled. type NoOpStatsdClient struct{} -func (n *NoOpStatsdClient) Count(string, int64, []string, float64) error { return nil } +func (n *NoOpStatsdClient) Count(string, int64, []string, float64) error { return nil } +func (n *NoOpStatsdClient) Distribution(string, float64, []string, float64) error { return nil } diff --git a/go/internal/feast/metrics/config.go b/go/internal/feast/metrics/config.go index 316715e619f..ef85e23b424 100644 --- a/go/internal/feast/metrics/config.go +++ b/go/internal/feast/metrics/config.go @@ -12,6 +12,11 @@ func IsMissingKeyMetricsEnabled() bool { return strings.ToLower(os.Getenv("ENABLE_MISSING_KEY_METRICS")) == "true" } +func IsFVMetricsEnabled() bool { + return strings.ToLower(os.Getenv("ENABLE_FV_LEVEL_METRICS")) == "true" || + IsMissingKeyMetricsEnabled() +} + func GetOnlineStoreType(config *registry.RepoConfig) string { if storeType, ok := config.OnlineStore["type"]; ok { return fmt.Sprintf("%v", storeType) diff --git a/go/internal/feast/metrics/fv_read_metrics.go b/go/internal/feast/metrics/fv_read_metrics.go new file mode 100644 index 00000000000..020591df9d5 --- /dev/null +++ b/go/internal/feast/metrics/fv_read_metrics.go @@ -0,0 +1,65 @@ +package metrics + +import ( + "math/rand" + "os" + "strconv" +) + +type FeatureViewReadMetrics struct { + project string + onlineStore string + client StatsdClient + sampleRate float64 +} + +func NewFeatureViewReadMetrics(project, onlineStore string, client StatsdClient) *FeatureViewReadMetrics { + if client == nil { + return nil + } + + sampleRate := 1.0 + if rateStr := os.Getenv("FEAST_METRICS_SAMPLE_RATE"); rateStr != "" { + if rate, err := strconv.ParseFloat(rateStr, 64); err == nil { + if rate > 0 && rate <= 1.0 { + sampleRate = rate + } + } + } + + return &FeatureViewReadMetrics{ + project: project, + onlineStore: onlineStore, + client: client, + sampleRate: sampleRate, + } +} + +// Emit emits latency, request count, and optionally errors for each feature view. +func (m *FeatureViewReadMetrics) Emit(featureViewNames []string, latencyMs float64, hasError bool) { + if m == nil || m.client == nil { + return + } + + if m.sampleRate < 1.0 && rand.Float64() > m.sampleRate { + return + } + + baseTags := []string{ + "project:" + m.project, + "online_store_type:" + m.onlineStore, + } + + for _, fvName := range featureViewNames { + tags := make([]string, len(baseTags)+1) + copy(tags, baseTags) + tags[len(baseTags)] = "feature_view:" + fvName + + m.client.Distribution("mlpfs.featureserver.fv_read_latency_ms", latencyMs, tags, 1.0) + m.client.Count("mlpfs.featureserver.fv_read_requests", 1, tags, 1.0) + + if hasError { + m.client.Count("mlpfs.featureserver.fv_read_errors", 1, tags, 1.0) + } + } +} diff --git a/go/internal/feast/metrics/fv_read_metrics_test.go b/go/internal/feast/metrics/fv_read_metrics_test.go new file mode 100644 index 00000000000..1fc3fc64ca9 --- /dev/null +++ b/go/internal/feast/metrics/fv_read_metrics_test.go @@ -0,0 +1,132 @@ +package metrics + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFVMetrics_EmitLatencyDistribution(t *testing.T) { + fake := &fakeStatsdClient{} + m := NewFeatureViewReadMetrics("proj", "redis", fake) + + m.Emit([]string{"hotel_fv"}, 42.5, false) + + assert.Len(t, fake.distCalls, 1) + assert.Equal(t, "mlpfs.featureserver.fv_read_latency_ms", fake.distCalls[0].name) + assert.Equal(t, 42.5, fake.distCalls[0].value) + assert.Contains(t, fake.distCalls[0].tags, "project:proj") + assert.Contains(t, fake.distCalls[0].tags, "online_store_type:redis") + assert.Contains(t, fake.distCalls[0].tags, "feature_view:hotel_fv") +} + +func TestFVMetrics_EmitRequestsCount(t *testing.T) { + fake := &fakeStatsdClient{} + m := NewFeatureViewReadMetrics("proj", "redis", fake) + + m.Emit([]string{"hotel_fv"}, 10.0, false) + + requestCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_requests") + assert.Len(t, requestCalls, 1) + assert.Equal(t, int64(1), requestCalls[0].value) + assert.Contains(t, requestCalls[0].tags, "feature_view:hotel_fv") +} + +func TestFVMetrics_EmitErrors(t *testing.T) { + fake := &fakeStatsdClient{} + m := NewFeatureViewReadMetrics("proj", "redis", fake) + + m.Emit([]string{"hotel_fv"}, 100.0, true) + + errorCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_errors") + assert.Len(t, errorCalls, 1) + assert.Equal(t, int64(1), errorCalls[0].value) + assert.Contains(t, errorCalls[0].tags, "feature_view:hotel_fv") +} + +func TestFVMetrics_NoErrorsWhenSuccess(t *testing.T) { + fake := &fakeStatsdClient{} + m := NewFeatureViewReadMetrics("proj", "redis", fake) + + m.Emit([]string{"hotel_fv"}, 50.0, false) + + errorCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_errors") + assert.Len(t, errorCalls, 0) +} + +func TestFVMetrics_MultipleFeatureViews(t *testing.T) { + fake := &fakeStatsdClient{} + m := NewFeatureViewReadMetrics("proj", "valkey", fake) + + m.Emit([]string{"hotel_fv", "user_fv", "booking_fv"}, 75.0, false) + + // 3 latency distributions + assert.Len(t, fake.distCalls, 3) + + // 3 request counts + requestCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_requests") + assert.Len(t, requestCalls, 3) + + // All have same latency + for _, dc := range fake.distCalls { + assert.Equal(t, 75.0, dc.value) + } + + // Check each FV is tagged + fvTags := make(map[string]bool) + for _, dc := range fake.distCalls { + fvTags[findTag(dc.tags, "feature_view:")] = true + } + assert.True(t, fvTags["hotel_fv"]) + assert.True(t, fvTags["user_fv"]) + assert.True(t, fvTags["booking_fv"]) +} + +func TestFVMetrics_NilClient(t *testing.T) { + m := NewFeatureViewReadMetrics("proj", "redis", nil) + assert.Nil(t, m) +} + +func TestFVMetrics_NilSafe(t *testing.T) { + var m *FeatureViewReadMetrics + m.Emit([]string{"fv"}, 10.0, false) // should not panic +} + +func TestFVMetrics_Sampling(t *testing.T) { + os.Setenv("FEAST_METRICS_SAMPLE_RATE", "0.5") + defer os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") + + fake := &fakeStatsdClient{} + m := NewFeatureViewReadMetrics("proj", "redis", fake) + + assert.Equal(t, 0.5, m.sampleRate) + + // Emit many times — should skip roughly half + emitted := 0 + for i := 0; i < 100; i++ { + fake.distCalls = nil + m.Emit([]string{"fv"}, 10.0, false) + if len(fake.distCalls) > 0 { + emitted++ + } + } + + // With 50% sampling, should emit roughly 50 times (allow wide margin) + assert.Greater(t, emitted, 20) + assert.Less(t, emitted, 80) +} + +func TestIsFVMetricsEnabled(t *testing.T) { + os.Unsetenv("ENABLE_FV_LEVEL_METRICS") + os.Unsetenv("ENABLE_MISSING_KEY_METRICS") + assert.False(t, IsFVMetricsEnabled()) + + os.Setenv("ENABLE_FV_LEVEL_METRICS", "true") + assert.True(t, IsFVMetricsEnabled()) + os.Unsetenv("ENABLE_FV_LEVEL_METRICS") + + os.Setenv("ENABLE_MISSING_KEY_METRICS", "true") + assert.True(t, IsFVMetricsEnabled()) + os.Unsetenv("ENABLE_MISSING_KEY_METRICS") +} diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go index 334b1555934..84fc79801ca 100644 --- a/go/internal/feast/metrics/lookup_metrics.go +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -24,6 +24,7 @@ func extractFeatureView(featureName string) string { type LookupMetricsAggregator struct { notFound map[string]int64 nullOrExpired map[string]int64 + totalByFV map[string]int64 project string onlineStore string client StatsdClient @@ -51,6 +52,7 @@ func NewLookupMetricsAggregator( return &LookupMetricsAggregator{ notFound: make(map[string]int64), nullOrExpired: make(map[string]int64), + totalByFV: make(map[string]int64), project: project, onlineStore: onlineStore, client: client, @@ -62,6 +64,7 @@ func (m *LookupMetricsAggregator) Record(featureID string, status serving.FieldS if m == nil { return } + m.totalByFV[extractFeatureView(featureID)]++ switch status { case serving.FieldStatus_NOT_FOUND: m.notFound[featureID]++ @@ -137,4 +140,15 @@ func (m *LookupMetricsAggregator) Emit() { tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) m.client.Count("mlpfs.featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) } + + for fvName, count := range m.totalByFV { + if count == 0 { + continue + } + adjustedCount := int64(float64(count) * multiplier) + tags := make([]string, len(baseTags)+1) + copy(tags, baseTags) + tags[len(baseTags)] = "feature_view:" + fvName + m.client.Count("mlpfs.featureserver.feature_lookup_requests", adjustedCount, tags, 1.0) + } } diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index b4f1651bf52..c6046a53838 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -15,8 +15,15 @@ type metricCall struct { tags []string } +type distCall struct { + name string + value float64 + tags []string +} + type fakeStatsdClient struct { - calls []metricCall + calls []metricCall + distCalls []distCall } func (f *fakeStatsdClient) Count(name string, value int64, tags []string, rate float64) error { @@ -24,6 +31,11 @@ func (f *fakeStatsdClient) Count(name string, value int64, tags []string, rate f return nil } +func (f *fakeStatsdClient) Distribution(name string, value float64, tags []string, rate float64) error { + f.distCalls = append(f.distCalls, distCall{name: name, value: value, tags: tags}) + return nil +} + func newTestAggregator(client StatsdClient) *LookupMetricsAggregator { return NewLookupMetricsAggregator("test_project", "redis", client) } @@ -37,10 +49,15 @@ func TestAggregator_AllNotFound(t *testing.T) { agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) agg.Emit() - assert.Len(t, fake.calls, 1) - assert.Equal(t, "mlpfs.featureserver.feature_lookup_not_found", fake.calls[0].name) - assert.Equal(t, int64(3), fake.calls[0].value) - assert.Contains(t, fake.calls[0].tags, "feature:user_fv__age") + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + assert.Len(t, notFoundCalls, 1) + assert.Equal(t, int64(3), notFoundCalls[0].value) + assert.Contains(t, notFoundCalls[0].tags, "feature:user_fv__age") + + totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + assert.Len(t, totalCalls, 1) + assert.Equal(t, int64(3), totalCalls[0].value) + assert.Contains(t, totalCalls[0].tags, "feature_view:user_fv") } func TestAggregator_AllNullOrExpired(t *testing.T) { @@ -52,10 +69,14 @@ func TestAggregator_AllNullOrExpired(t *testing.T) { agg.Record("order_fv__amt", serving.FieldStatus_OUTSIDE_MAX_AGE) agg.Emit() - assert.Len(t, fake.calls, 1) - assert.Equal(t, "mlpfs.featureserver.feature_lookup_null_or_expired", fake.calls[0].name) - assert.Equal(t, int64(3), fake.calls[0].value) - assert.Contains(t, fake.calls[0].tags, "feature:order_fv__amt") + nullCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_null_or_expired") + assert.Len(t, nullCalls, 1) + assert.Equal(t, int64(3), nullCalls[0].value) + assert.Contains(t, nullCalls[0].tags, "feature:order_fv__amt") + + totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + assert.Len(t, totalCalls, 1) + assert.Equal(t, int64(3), totalCalls[0].value) } func TestAggregator_MixedStatuses(t *testing.T) { @@ -69,18 +90,22 @@ func TestAggregator_MixedStatuses(t *testing.T) { agg.Record("fv_b__f2", serving.FieldStatus_OUTSIDE_MAX_AGE) agg.Emit() - assert.Len(t, fake.calls, 2) + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + assert.Len(t, notFoundCalls, 1) + assert.Equal(t, int64(1), notFoundCalls[0].value) - callsByName := map[string]metricCall{} - for _, c := range fake.calls { - callsByName[c.name+":"+findTag(c.tags, "feature:")] = c - } + nullCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_null_or_expired") + assert.Len(t, nullCalls, 1) + assert.Equal(t, int64(2), nullCalls[0].value) - nf := callsByName["mlpfs.featureserver.feature_lookup_not_found:fv_a__f1"] - assert.Equal(t, int64(1), nf.value) - - ne := callsByName["mlpfs.featureserver.feature_lookup_null_or_expired:fv_b__f2"] - assert.Equal(t, int64(2), ne.value) + totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + assert.Len(t, totalCalls, 2) + totalByFV := map[string]int64{} + for _, c := range totalCalls { + totalByFV[findTag(c.tags, "feature_view:")] = c.value + } + assert.Equal(t, int64(2), totalByFV["fv_a"]) + assert.Equal(t, int64(3), totalByFV["fv_b"]) } func TestAggregator_AllPresent(t *testing.T) { @@ -91,7 +116,16 @@ func TestAggregator_AllPresent(t *testing.T) { agg.Record("fv__f1", serving.FieldStatus_PRESENT) agg.Emit() - assert.Len(t, fake.calls, 0) + // No not_found or null_or_expired calls + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + nullCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_null_or_expired") + assert.Len(t, notFoundCalls, 0) + assert.Len(t, nullCalls, 0) + + // But total requests should still be emitted + totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + assert.Len(t, totalCalls, 1) + assert.Equal(t, int64(2), totalCalls[0].value) } func TestAggregator_NilSafe(t *testing.T) { @@ -114,8 +148,9 @@ func TestAggregator_Tags(t *testing.T) { agg.Record("hotel_fv__price", serving.FieldStatus_NOT_FOUND) agg.Emit() - assert.Len(t, fake.calls, 1) - tags := fake.calls[0].tags + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + assert.Len(t, notFoundCalls, 1) + tags := notFoundCalls[0].tags assert.Contains(t, tags, "project:mlpfs") assert.Contains(t, tags, "online_store_type:eg-valkey") assert.Contains(t, tags, "feature:hotel_fv__price") @@ -140,14 +175,20 @@ func TestRecordFromFeatureVectors(t *testing.T) { agg.RecordFromFeatureVectors(vectors) agg.Emit() - assert.Len(t, fake.calls, 2) + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + assert.Len(t, notFoundCalls, 2) callsByFeature := map[string]int64{} - for _, c := range fake.calls { + for _, c := range notFoundCalls { callsByFeature[findTag(c.tags, "feature:")] = c.value } assert.Equal(t, int64(1), callsByFeature["fv_a__f1"]) assert.Equal(t, int64(2), callsByFeature["fv_a__f2"]) + + totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + assert.Len(t, totalCalls, 1) + assert.Equal(t, int64(4), totalCalls[0].value) + assert.Contains(t, totalCalls[0].tags, "feature_view:fv_a") } func TestRecordFromRangeFeatureVectors(t *testing.T) { @@ -167,9 +208,14 @@ func TestRecordFromRangeFeatureVectors(t *testing.T) { agg.RecordFromRangeFeatureVectors(vectors) agg.Emit() - assert.Len(t, fake.calls, 1) - assert.Equal(t, int64(2), fake.calls[0].value) - assert.Equal(t, "mlpfs.featureserver.feature_lookup_not_found", fake.calls[0].name) + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + assert.Len(t, notFoundCalls, 1) + assert.Equal(t, int64(2), notFoundCalls[0].value) + + totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + assert.Len(t, totalCalls, 1) + assert.Equal(t, int64(3), totalCalls[0].value) + assert.Contains(t, totalCalls[0].tags, "feature_view:sfv") } func TestIsMissingKeyMetricsEnabled(t *testing.T) { @@ -224,6 +270,17 @@ func findTag(tags []string, prefix string) string { return "" } +// filterCalls returns only metric calls matching the given metric name. +func filterCalls(calls []metricCall, name string) []metricCall { + var result []metricCall + for _, c := range calls { + if c.name == name { + result = append(result, c) + } + } + return result +} + func TestSampling_DefaultNoSampling(t *testing.T) { os.Unsetenv("FEAST_METRICS_SAMPLE_RATE") fake := &fakeStatsdClient{} @@ -289,8 +346,10 @@ func TestSampling_AdjustsCountsCorrectly(t *testing.T) { agg.Emit() if len(fake.calls) > 0 { emitted = true + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + assert.Len(t, notFoundCalls, 1) // With sample_rate=0.5, count of 2 should become 4 (2 / 0.5) - assert.Equal(t, int64(4), fake.calls[0].value, "Count should be adjusted by 1/sample_rate") + assert.Equal(t, int64(4), notFoundCalls[0].value, "Count should be adjusted by 1/sample_rate") break } } @@ -309,6 +368,7 @@ func TestSampling_NoAdjustmentWhenNotSampling(t *testing.T) { agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) agg.Emit() - assert.Len(t, fake.calls, 1) - assert.Equal(t, int64(2), fake.calls[0].value, "Count should not be adjusted with sample_rate=1.0") + notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + assert.Len(t, notFoundCalls, 1) + assert.Equal(t, int64(2), notFoundCalls[0].value, "Count should not be adjusted with sample_rate=1.0") } diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index d9a928c7edb..be571f5c458 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -3,6 +3,8 @@ package server import ( "context" "fmt" + "time" + "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/errors" @@ -77,6 +79,9 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques return nil, err } + fvNames := extractFVNamesFromRequest(featuresOrService.FeaturesRefs, featuresOrService.FeatureService) + t0 := time.Now() + featureVectors, err := s.fs.GetOnlineFeatures( ctx, featuresOrService.FeaturesRefs, @@ -85,8 +90,18 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques request.GetRequestContext(), request.GetFullFeatureNames()) + latencyMs := float64(time.Since(t0).Milliseconds()) + if err != nil { logSpanContext.Error().Err(err).Msg("Error getting online features") + if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, true) + } return nil, errors.GrpcFromError(err) } @@ -98,6 +113,15 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques ) agg.RecordFromFeatureVectors(featureVectors) agg.Emit() + + if len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, false) + } } resp := &serving.GetOnlineFeaturesResponse{ @@ -165,6 +189,9 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r return nil, err } + fvNames := extractFVNamesFromRequest(featuresOrService.FeaturesRefs, featuresOrService.FeatureService) + t0 := time.Now() + rangeFeatureVectors, err := s.fs.GetOnlineFeaturesRange( ctx, featuresOrService.FeaturesRefs, @@ -177,8 +204,18 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r request.GetFullFeatureNames(), ) + latencyMs := float64(time.Since(t0).Milliseconds()) + if err != nil { logSpanContext.Error().Err(err).Msg("Error getting online features range") + if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, true) + } return nil, errors.GrpcFromError(err) } @@ -190,6 +227,15 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r ) agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) agg.Emit() + + if len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, false) + } } entities := request.GetEntities() diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 73538f43dd8..873f6722c4c 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -328,6 +328,28 @@ func NewHttpServer(fs *feast.FeatureStore, loggingService *logging.LoggingServic return &HttpServer{fs: fs, loggingService: loggingService, metricsClient: metricsClient, config: config} } +func extractFVNamesFromRequest(features []string, featureService *model.FeatureService) []string { + seen := make(map[string]struct{}) + + for _, ref := range features { + if parts := strings.SplitN(ref, ":", 2); len(parts) == 2 { + seen[parts[0]] = struct{}{} + } + } + + if featureService != nil { + for _, proj := range featureService.Projections { + seen[proj.NameToUse()] = struct{}{} + } + } + + names := make([]string, 0, len(seen)) + for name := range seen { + names = append(names, name) + } + return names +} + func parseIncludeMetadata(r *http.Request) (bool, error) { q := r.URL.Query() raw := strings.TrimSpace(q.Get("includeMetadata")) @@ -408,6 +430,9 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { requestContextProto[key] = value.ToProto() } + fvNames := extractFVNamesFromRequest(request.Features, featureService) + t0 := time.Now() + featureVectors, err = s.fs.GetOnlineFeatures( ctx, request.Features, @@ -416,6 +441,8 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { requestContextProto, request.FullFeatureNames) + latencyMs := float64(time.Since(t0).Milliseconds()) + defer func() { if featureVectors != nil { go releaseCGOMemory(featureVectors) @@ -424,6 +451,14 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { if err != nil { logSpanContext.Error().Err(err).Msg("Error getting feature vector") + if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, true) + } writeJSONError(w, fmt.Errorf("Error getting feature vector: %+v", err), http.StatusInternalServerError) return } @@ -436,6 +471,15 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { ) agg.RecordFromFeatureVectors(featureVectors) agg.Emit() + + if len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, false) + } } var featureNames []string @@ -619,6 +663,9 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque return } + fvNames := extractFVNamesFromRequest(request.Features, featureService) + t0 := time.Now() + rangeFeatureVectors, err := s.fs.GetOnlineFeaturesRange( ctx, request.Features, @@ -630,6 +677,8 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque requestContextProto, request.FullFeatureNames) + latencyMs := float64(time.Since(t0).Milliseconds()) + defer func() { if rangeFeatureVectors != nil { go releaseCGORangeMemory(rangeFeatureVectors) @@ -638,6 +687,14 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque if err != nil { logSpanContext.Error().Err(err).Msg("Error getting range feature vectors") + if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, true) + } writeJSONError(w, err, http.StatusInternalServerError) return } @@ -650,6 +707,15 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque ) agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) agg.Emit() + + if len(fvNames) > 0 { + fvMetrics := metrics.NewFeatureViewReadMetrics( + s.config.Project, + metrics.GetOnlineStoreType(s.config), + s.metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, false) + } } featureNames, entities, results, err := processFeatureVectors( diff --git a/go/main.go b/go/main.go index 91a4af5d9cd..27a3fd6c7be 100644 --- a/go/main.go +++ b/go/main.go @@ -106,18 +106,18 @@ func main() { } var metricsClient metrics.StatsdClient - if metrics.IsMissingKeyMetricsEnabled() { + if metrics.IsFVMetricsEnabled() { if addr := metrics.GetStatsDAddress(); addr != "" { client, clientErr := statsd.New(addr) if clientErr != nil { - log.Error().Err(clientErr).Msg("Failed to create statsd client for missing key metrics") + log.Error().Err(clientErr).Msg("Failed to create statsd client for metrics") } else { metricsClient = client defer client.Close() - log.Info().Msg("Missing key metrics enabled") + log.Info().Msg("Feature view level metrics enabled") } } else { - log.Warn().Msg("ENABLE_MISSING_KEY_METRICS is true but DD_AGENT_HOST is not set") + log.Warn().Msg("ENABLE_FV_LEVEL_METRICS/ENABLE_MISSING_KEY_METRICS is true but DD_AGENT_HOST is not set") } } From a090d46c91b6739a08a48b7220507cd8283cebc3 Mon Sep 17 00:00:00 2001 From: vanitabhagwat Date: Tue, 2 Jun 2026 23:12:33 -0700 Subject: [PATCH 2/2] refactor: Extract metric name constants and DRY helper for FV metrics emission - Define constants for all metric names (FVReadLatencyMetric, FVReadRequestsMetric, FVReadErrorsMetric, LookupNotFoundMetric, LookupNullOrExpiredMetric, LookupRequestsMetric) - Extract emitFVReadMetrics helper into server_commons.go to eliminate 4 identical nil-check + construct + emit patterns across HTTP and gRPC handlers - Update tests to reference constants instead of string literals Co-Authored-By: Claude Opus 4.6 --- go/internal/feast/metrics/fv_read_metrics.go | 12 +++++-- .../feast/metrics/fv_read_metrics_test.go | 10 +++--- go/internal/feast/metrics/lookup_metrics.go | 12 +++++-- .../feast/metrics/lookup_metrics_test.go | 34 +++++++++--------- go/internal/feast/server/grpc_server.go | 36 +++---------------- go/internal/feast/server/http_server.go | 36 +++---------------- go/internal/feast/server/server_commons.go | 16 ++++++++- 7 files changed, 63 insertions(+), 93 deletions(-) diff --git a/go/internal/feast/metrics/fv_read_metrics.go b/go/internal/feast/metrics/fv_read_metrics.go index 020591df9d5..531336a88a4 100644 --- a/go/internal/feast/metrics/fv_read_metrics.go +++ b/go/internal/feast/metrics/fv_read_metrics.go @@ -6,6 +6,12 @@ import ( "strconv" ) +const ( + FVReadLatencyMetric = "mlpfs.featureserver.fv_read_latency_ms" + FVReadRequestsMetric = "mlpfs.featureserver.fv_read_requests" + FVReadErrorsMetric = "mlpfs.featureserver.fv_read_errors" +) + type FeatureViewReadMetrics struct { project string onlineStore string @@ -55,11 +61,11 @@ func (m *FeatureViewReadMetrics) Emit(featureViewNames []string, latencyMs float copy(tags, baseTags) tags[len(baseTags)] = "feature_view:" + fvName - m.client.Distribution("mlpfs.featureserver.fv_read_latency_ms", latencyMs, tags, 1.0) - m.client.Count("mlpfs.featureserver.fv_read_requests", 1, tags, 1.0) + m.client.Distribution(FVReadLatencyMetric, latencyMs, tags, 1.0) + m.client.Count(FVReadRequestsMetric, 1, tags, 1.0) if hasError { - m.client.Count("mlpfs.featureserver.fv_read_errors", 1, tags, 1.0) + m.client.Count(FVReadErrorsMetric, 1, tags, 1.0) } } } diff --git a/go/internal/feast/metrics/fv_read_metrics_test.go b/go/internal/feast/metrics/fv_read_metrics_test.go index 1fc3fc64ca9..10dd17743b3 100644 --- a/go/internal/feast/metrics/fv_read_metrics_test.go +++ b/go/internal/feast/metrics/fv_read_metrics_test.go @@ -14,7 +14,7 @@ func TestFVMetrics_EmitLatencyDistribution(t *testing.T) { m.Emit([]string{"hotel_fv"}, 42.5, false) assert.Len(t, fake.distCalls, 1) - assert.Equal(t, "mlpfs.featureserver.fv_read_latency_ms", fake.distCalls[0].name) + assert.Equal(t, FVReadLatencyMetric, fake.distCalls[0].name) assert.Equal(t, 42.5, fake.distCalls[0].value) assert.Contains(t, fake.distCalls[0].tags, "project:proj") assert.Contains(t, fake.distCalls[0].tags, "online_store_type:redis") @@ -27,7 +27,7 @@ func TestFVMetrics_EmitRequestsCount(t *testing.T) { m.Emit([]string{"hotel_fv"}, 10.0, false) - requestCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_requests") + requestCalls := filterCalls(fake.calls, FVReadRequestsMetric) assert.Len(t, requestCalls, 1) assert.Equal(t, int64(1), requestCalls[0].value) assert.Contains(t, requestCalls[0].tags, "feature_view:hotel_fv") @@ -39,7 +39,7 @@ func TestFVMetrics_EmitErrors(t *testing.T) { m.Emit([]string{"hotel_fv"}, 100.0, true) - errorCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_errors") + errorCalls := filterCalls(fake.calls, FVReadErrorsMetric) assert.Len(t, errorCalls, 1) assert.Equal(t, int64(1), errorCalls[0].value) assert.Contains(t, errorCalls[0].tags, "feature_view:hotel_fv") @@ -51,7 +51,7 @@ func TestFVMetrics_NoErrorsWhenSuccess(t *testing.T) { m.Emit([]string{"hotel_fv"}, 50.0, false) - errorCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_errors") + errorCalls := filterCalls(fake.calls, FVReadErrorsMetric) assert.Len(t, errorCalls, 0) } @@ -65,7 +65,7 @@ func TestFVMetrics_MultipleFeatureViews(t *testing.T) { assert.Len(t, fake.distCalls, 3) // 3 request counts - requestCalls := filterCalls(fake.calls, "mlpfs.featureserver.fv_read_requests") + requestCalls := filterCalls(fake.calls, FVReadRequestsMetric) assert.Len(t, requestCalls, 3) // All have same latency diff --git a/go/internal/feast/metrics/lookup_metrics.go b/go/internal/feast/metrics/lookup_metrics.go index 84fc79801ca..87ecf9b5072 100644 --- a/go/internal/feast/metrics/lookup_metrics.go +++ b/go/internal/feast/metrics/lookup_metrics.go @@ -10,6 +10,12 @@ import ( "github.com/feast-dev/feast/go/protos/feast/serving" ) +const ( + LookupNotFoundMetric = "mlpfs.featureserver.feature_lookup_not_found" + LookupNullOrExpiredMetric = "mlpfs.featureserver.feature_lookup_null_or_expired" + LookupRequestsMetric = "mlpfs.featureserver.feature_lookup_requests" +) + // extractFeatureView extracts the feature view name from a full feature name. // Feature names follow the format: feature_view__feature_name // Example: "hotel_fv__price" -> "hotel_fv" @@ -126,7 +132,7 @@ func (m *LookupMetricsAggregator) Emit() { copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("mlpfs.featureserver.feature_lookup_not_found", adjustedCount, tags, 1.0) + m.client.Count(LookupNotFoundMetric, adjustedCount, tags, 1.0) } for featureID, count := range m.nullOrExpired { @@ -138,7 +144,7 @@ func (m *LookupMetricsAggregator) Emit() { copy(tags, baseTags) tags[len(baseTags)] = "feature:" + featureID tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID) - m.client.Count("mlpfs.featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0) + m.client.Count(LookupNullOrExpiredMetric, adjustedCount, tags, 1.0) } for fvName, count := range m.totalByFV { @@ -149,6 +155,6 @@ func (m *LookupMetricsAggregator) Emit() { tags := make([]string, len(baseTags)+1) copy(tags, baseTags) tags[len(baseTags)] = "feature_view:" + fvName - m.client.Count("mlpfs.featureserver.feature_lookup_requests", adjustedCount, tags, 1.0) + m.client.Count(LookupRequestsMetric, adjustedCount, tags, 1.0) } } diff --git a/go/internal/feast/metrics/lookup_metrics_test.go b/go/internal/feast/metrics/lookup_metrics_test.go index c6046a53838..837dc15c1f8 100644 --- a/go/internal/feast/metrics/lookup_metrics_test.go +++ b/go/internal/feast/metrics/lookup_metrics_test.go @@ -49,12 +49,12 @@ func TestAggregator_AllNotFound(t *testing.T) { agg.Record("user_fv__age", serving.FieldStatus_NOT_FOUND) agg.Emit() - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) assert.Len(t, notFoundCalls, 1) assert.Equal(t, int64(3), notFoundCalls[0].value) assert.Contains(t, notFoundCalls[0].tags, "feature:user_fv__age") - totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + totalCalls := filterCalls(fake.calls, LookupRequestsMetric) assert.Len(t, totalCalls, 1) assert.Equal(t, int64(3), totalCalls[0].value) assert.Contains(t, totalCalls[0].tags, "feature_view:user_fv") @@ -69,12 +69,12 @@ func TestAggregator_AllNullOrExpired(t *testing.T) { agg.Record("order_fv__amt", serving.FieldStatus_OUTSIDE_MAX_AGE) agg.Emit() - nullCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_null_or_expired") + nullCalls := filterCalls(fake.calls, LookupNullOrExpiredMetric) assert.Len(t, nullCalls, 1) assert.Equal(t, int64(3), nullCalls[0].value) assert.Contains(t, nullCalls[0].tags, "feature:order_fv__amt") - totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + totalCalls := filterCalls(fake.calls, LookupRequestsMetric) assert.Len(t, totalCalls, 1) assert.Equal(t, int64(3), totalCalls[0].value) } @@ -90,15 +90,15 @@ func TestAggregator_MixedStatuses(t *testing.T) { agg.Record("fv_b__f2", serving.FieldStatus_OUTSIDE_MAX_AGE) agg.Emit() - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) assert.Len(t, notFoundCalls, 1) assert.Equal(t, int64(1), notFoundCalls[0].value) - nullCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_null_or_expired") + nullCalls := filterCalls(fake.calls, LookupNullOrExpiredMetric) assert.Len(t, nullCalls, 1) assert.Equal(t, int64(2), nullCalls[0].value) - totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + totalCalls := filterCalls(fake.calls, LookupRequestsMetric) assert.Len(t, totalCalls, 2) totalByFV := map[string]int64{} for _, c := range totalCalls { @@ -117,13 +117,13 @@ func TestAggregator_AllPresent(t *testing.T) { agg.Emit() // No not_found or null_or_expired calls - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") - nullCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_null_or_expired") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) + nullCalls := filterCalls(fake.calls, LookupNullOrExpiredMetric) assert.Len(t, notFoundCalls, 0) assert.Len(t, nullCalls, 0) // But total requests should still be emitted - totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + totalCalls := filterCalls(fake.calls, LookupRequestsMetric) assert.Len(t, totalCalls, 1) assert.Equal(t, int64(2), totalCalls[0].value) } @@ -148,7 +148,7 @@ func TestAggregator_Tags(t *testing.T) { agg.Record("hotel_fv__price", serving.FieldStatus_NOT_FOUND) agg.Emit() - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) assert.Len(t, notFoundCalls, 1) tags := notFoundCalls[0].tags assert.Contains(t, tags, "project:mlpfs") @@ -175,7 +175,7 @@ func TestRecordFromFeatureVectors(t *testing.T) { agg.RecordFromFeatureVectors(vectors) agg.Emit() - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) assert.Len(t, notFoundCalls, 2) callsByFeature := map[string]int64{} @@ -185,7 +185,7 @@ func TestRecordFromFeatureVectors(t *testing.T) { assert.Equal(t, int64(1), callsByFeature["fv_a__f1"]) assert.Equal(t, int64(2), callsByFeature["fv_a__f2"]) - totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + totalCalls := filterCalls(fake.calls, LookupRequestsMetric) assert.Len(t, totalCalls, 1) assert.Equal(t, int64(4), totalCalls[0].value) assert.Contains(t, totalCalls[0].tags, "feature_view:fv_a") @@ -208,11 +208,11 @@ func TestRecordFromRangeFeatureVectors(t *testing.T) { agg.RecordFromRangeFeatureVectors(vectors) agg.Emit() - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) assert.Len(t, notFoundCalls, 1) assert.Equal(t, int64(2), notFoundCalls[0].value) - totalCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_requests") + totalCalls := filterCalls(fake.calls, LookupRequestsMetric) assert.Len(t, totalCalls, 1) assert.Equal(t, int64(3), totalCalls[0].value) assert.Contains(t, totalCalls[0].tags, "feature_view:sfv") @@ -346,7 +346,7 @@ func TestSampling_AdjustsCountsCorrectly(t *testing.T) { agg.Emit() if len(fake.calls) > 0 { emitted = true - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) assert.Len(t, notFoundCalls, 1) // With sample_rate=0.5, count of 2 should become 4 (2 / 0.5) assert.Equal(t, int64(4), notFoundCalls[0].value, "Count should be adjusted by 1/sample_rate") @@ -368,7 +368,7 @@ func TestSampling_NoAdjustmentWhenNotSampling(t *testing.T) { agg.Record("fv__f1", serving.FieldStatus_NOT_FOUND) agg.Emit() - notFoundCalls := filterCalls(fake.calls, "mlpfs.featureserver.feature_lookup_not_found") + notFoundCalls := filterCalls(fake.calls, LookupNotFoundMetric) assert.Len(t, notFoundCalls, 1) assert.Equal(t, int64(2), notFoundCalls[0].value, "Count should not be adjusted with sample_rate=1.0") } diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index be571f5c458..eb93054f07a 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -94,14 +94,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques if err != nil { logSpanContext.Error().Err(err).Msg("Error getting online features") - if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, true) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, true) return nil, errors.GrpcFromError(err) } @@ -114,14 +107,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques agg.RecordFromFeatureVectors(featureVectors) agg.Emit() - if len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, false) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, false) } resp := &serving.GetOnlineFeaturesResponse{ @@ -208,14 +194,7 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r if err != nil { logSpanContext.Error().Err(err).Msg("Error getting online features range") - if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, true) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, true) return nil, errors.GrpcFromError(err) } @@ -228,14 +207,7 @@ func (s *grpcServingServiceServer) GetOnlineFeaturesRange(ctx context.Context, r agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) agg.Emit() - if len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, false) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, false) } entities := request.GetEntities() diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index 873f6722c4c..0f764360d1d 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -451,14 +451,7 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { if err != nil { logSpanContext.Error().Err(err).Msg("Error getting feature vector") - if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, true) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, true) writeJSONError(w, fmt.Errorf("Error getting feature vector: %+v", err), http.StatusInternalServerError) return } @@ -472,14 +465,7 @@ func (s *HttpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { agg.RecordFromFeatureVectors(featureVectors) agg.Emit() - if len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, false) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, false) } var featureNames []string @@ -687,14 +673,7 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque if err != nil { logSpanContext.Error().Err(err).Msg("Error getting range feature vectors") - if s.metricsClient != nil && s.config != nil && len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, true) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, true) writeJSONError(w, err, http.StatusInternalServerError) return } @@ -708,14 +687,7 @@ func (s *HttpServer) getOnlineFeaturesRange(w http.ResponseWriter, r *http.Reque agg.RecordFromRangeFeatureVectors(rangeFeatureVectors) agg.Emit() - if len(fvNames) > 0 { - fvMetrics := metrics.NewFeatureViewReadMetrics( - s.config.Project, - metrics.GetOnlineStoreType(s.config), - s.metricsClient, - ) - fvMetrics.Emit(fvNames, latencyMs, false) - } + emitFVReadMetrics(s.metricsClient, s.config, fvNames, latencyMs, false) } featureNames, entities, results, err := processFeatureVectors( diff --git a/go/internal/feast/server/server_commons.go b/go/internal/feast/server/server_commons.go index d4a6c366818..075db6ad9d4 100644 --- a/go/internal/feast/server/server_commons.go +++ b/go/internal/feast/server/server_commons.go @@ -1,11 +1,13 @@ package server import ( - "github.com/prometheus/client_golang/prometheus/promhttp" "net/http" "os" "github.com/DataDog/dd-trace-go/v2/ddtrace/tracer" + "github.com/feast-dev/feast/go/internal/feast/metrics" + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" ) @@ -21,6 +23,18 @@ func LogWithSpanContext(span *tracer.Span) zerolog.Logger { return logger } +func emitFVReadMetrics(metricsClient metrics.StatsdClient, config *registry.RepoConfig, fvNames []string, latencyMs float64, hasError bool) { + if metricsClient == nil || config == nil || len(fvNames) == 0 { + return + } + fvMetrics := metrics.NewFeatureViewReadMetrics( + config.Project, + metrics.GetOnlineStoreType(config), + metricsClient, + ) + fvMetrics.Emit(fvNames, latencyMs, hasError) +} + func CommonHttpHandlers(s *HttpServer, healthCheckHandler http.HandlerFunc) []Handler { return []Handler{ {