Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go/internal/feast/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ package metrics
// The real github.com/DataDog/datadog-go/v5/statsd.Client satisfies this interface.
type StatsdClient interface {
// Count submits an integer value for the metric called name, annotated with tags.
// NOTE(review): rate is presumably the statsd sample rate (1.0 = always send) — confirm against the DataDog client docs.
Count(name string, value int64, tags []string, rate float64) error
// Distribution submits a floating-point value for the metric called name, annotated with tags.
// NOTE(review): rate is presumably the statsd sample rate (1.0 = always send) — confirm against the DataDog client docs.
Distribution(name string, value float64, tags []string, rate float64) error
}

// NoOpStatsdClient does nothing when metrics are disabled.
// Every method discards its arguments and reports success.
type NoOpStatsdClient struct{}

// Count ignores the metric and always returns nil.
func (n *NoOpStatsdClient) Count(string, int64, []string, float64) error { return nil }

// Distribution ignores the metric and always returns nil.
func (n *NoOpStatsdClient) Distribution(string, float64, []string, float64) error { return nil }
5 changes: 5 additions & 0 deletions go/internal/feast/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ func IsMissingKeyMetricsEnabled() bool {
return strings.ToLower(os.Getenv("ENABLE_MISSING_KEY_METRICS")) == "true"
}

// IsFVMetricsEnabled reports whether feature-view level metrics should be
// emitted. It returns true when the ENABLE_FV_LEVEL_METRICS environment
// variable equals "true" (compared case-insensitively), or when
// IsMissingKeyMetricsEnabled returns true.
func IsFVMetricsEnabled() bool {
	// EqualFold compares without allocating a lower-cased copy of the value.
	if strings.EqualFold(os.Getenv("ENABLE_FV_LEVEL_METRICS"), "true") {
		return true
	}
	return IsMissingKeyMetricsEnabled()
}

func GetOnlineStoreType(config *registry.RepoConfig) string {
if storeType, ok := config.OnlineStore["type"]; ok {
return fmt.Sprintf("%v", storeType)
Expand Down
71 changes: 71 additions & 0 deletions go/internal/feast/metrics/fv_read_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package metrics

import (
"math/rand"
"os"
"strconv"
)

// Metric names emitted per feature view by FeatureViewReadMetrics.Emit.
const (
// FVReadLatencyMetric is sent as a distribution carrying the read latency in milliseconds.
FVReadLatencyMetric = "mlpfs.featureserver.fv_read_latency_ms"
// FVReadRequestsMetric is sent as a count of 1 for every feature view in an emitted read.
FVReadRequestsMetric = "mlpfs.featureserver.fv_read_requests"
// FVReadErrorsMetric is sent as a count of 1 per feature view, only when the read is reported as failed.
FVReadErrorsMetric = "mlpfs.featureserver.fv_read_errors"
)

// FeatureViewReadMetrics emits per-feature-view read metrics through a StatsdClient.
type FeatureViewReadMetrics struct {
// project is attached to every metric as the "project:" tag.
project string
// onlineStore is attached to every metric as the "online_store_type:" tag.
onlineStore string
// client receives the Distribution and Count calls made by Emit.
client StatsdClient
// sampleRate is the fraction of Emit calls that are actually reported;
// Emit skips the random draw entirely when it is 1.0.
sampleRate float64
}

// NewFeatureViewReadMetrics builds a FeatureViewReadMetrics that tags its
// metrics with project and onlineStore and reports through client.
//
// It returns nil when client is nil. The sample rate defaults to 1.0 and is
// overridden by the FEAST_METRICS_SAMPLE_RATE environment variable only when
// that variable parses as a float in the range (0, 1]; any other value is
// ignored.
func NewFeatureViewReadMetrics(project, onlineStore string, client StatsdClient) *FeatureViewReadMetrics {
	if client == nil {
		return nil
	}

	fvm := &FeatureViewReadMetrics{
		project:     project,
		onlineStore: onlineStore,
		client:      client,
		sampleRate:  1.0,
	}

	raw := os.Getenv("FEAST_METRICS_SAMPLE_RATE")
	if raw == "" {
		return fvm
	}

	// The range check is written in positive form so that a parsed NaN is
	// rejected (every comparison with NaN is false).
	parsed, err := strconv.ParseFloat(raw, 64)
	if err == nil && parsed > 0 && parsed <= 1.0 {
		fvm.sampleRate = parsed
	}
	return fvm
}

// Emit reports one read of the given feature views.
//
// For every name in featureViewNames it sends a latency distribution
// (FVReadLatencyMetric) and a request count of 1 (FVReadRequestsMetric); when
// hasError is true it also sends an error count of 1 (FVReadErrorsMetric).
// Each metric is tagged with project, online_store_type and feature_view, and
// the same latencyMs is attributed to every listed feature view.
//
// Emit is a no-op on a nil receiver or a nil client. When sampleRate is below
// 1.0 the whole call is skipped whenever a random draw exceeds sampleRate.
//
// NOTE(review): after local sampling the counts are still sent as 1 with a
// rate of 1.0, so aggregated request/error totals will be roughly sampleRate
// times the true volume — confirm whether the counts should be scaled.
func (m *FeatureViewReadMetrics) Emit(featureViewNames []string, latencyMs float64, hasError bool) {
	if m == nil || m.client == nil {
		return
	}

	if m.sampleRate < 1.0 && rand.Float64() > m.sampleRate {
		return
	}

	baseTags := []string{
		"project:" + m.project,
		"online_store_type:" + m.onlineStore,
	}

	for _, fvName := range featureViewNames {
		// Each feature view gets its own tag slice so that calls never share a
		// backing array.
		tags := make([]string, 0, len(baseTags)+1)
		tags = append(tags, baseTags...)
		tags = append(tags, "feature_view:"+fvName)

		// Metric emission is best-effort: a failing statsd call must not
		// affect the read path, so client errors are deliberately discarded.
		_ = m.client.Distribution(FVReadLatencyMetric, latencyMs, tags, 1.0)
		_ = m.client.Count(FVReadRequestsMetric, 1, tags, 1.0)

		if hasError {
			_ = m.client.Count(FVReadErrorsMetric, 1, tags, 1.0)
		}
	}
}
132 changes: 132 additions & 0 deletions go/internal/feast/metrics/fv_read_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package metrics

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
)

// TestFVMetrics_EmitLatencyDistribution checks that emitting for a single
// feature view records exactly one latency distribution with the expected
// name, value and tags.
func TestFVMetrics_EmitLatencyDistribution(t *testing.T) {
	statsd := &fakeStatsdClient{}
	fvMetrics := NewFeatureViewReadMetrics("proj", "redis", statsd)

	fvMetrics.Emit([]string{"hotel_fv"}, 42.5, false)

	assert.Len(t, statsd.distCalls, 1)

	got := statsd.distCalls[0]
	assert.Equal(t, FVReadLatencyMetric, got.name)
	assert.Equal(t, 42.5, got.value)
	for _, wantTag := range []string{
		"project:proj",
		"online_store_type:redis",
		"feature_view:hotel_fv",
	} {
		assert.Contains(t, got.tags, wantTag)
	}
}

// TestFVMetrics_EmitRequestsCount checks that a single-view emit records one
// request count of 1 tagged with the feature view.
func TestFVMetrics_EmitRequestsCount(t *testing.T) {
	statsd := &fakeStatsdClient{}
	fvMetrics := NewFeatureViewReadMetrics("proj", "redis", statsd)

	fvMetrics.Emit([]string{"hotel_fv"}, 10.0, false)

	counts := filterCalls(statsd.calls, FVReadRequestsMetric)
	assert.Len(t, counts, 1)

	first := counts[0]
	assert.Equal(t, int64(1), first.value)
	assert.Contains(t, first.tags, "feature_view:hotel_fv")
}

// TestFVMetrics_EmitErrors checks that an emit flagged as failed records one
// error count of 1 tagged with the feature view.
func TestFVMetrics_EmitErrors(t *testing.T) {
	statsd := &fakeStatsdClient{}
	fvMetrics := NewFeatureViewReadMetrics("proj", "redis", statsd)

	fvMetrics.Emit([]string{"hotel_fv"}, 100.0, true)

	failures := filterCalls(statsd.calls, FVReadErrorsMetric)
	assert.Len(t, failures, 1)

	first := failures[0]
	assert.Equal(t, int64(1), first.value)
	assert.Contains(t, first.tags, "feature_view:hotel_fv")
}

// TestFVMetrics_NoErrorsWhenSuccess checks that a successful emit records no
// error count at all.
func TestFVMetrics_NoErrorsWhenSuccess(t *testing.T) {
	statsd := &fakeStatsdClient{}
	fvMetrics := NewFeatureViewReadMetrics("proj", "redis", statsd)

	fvMetrics.Emit([]string{"hotel_fv"}, 50.0, false)

	failures := filterCalls(statsd.calls, FVReadErrorsMetric)
	assert.Len(t, failures, 0)
}

// TestFVMetrics_MultipleFeatureViews checks that one emit covering three
// feature views records a latency distribution and a request count for each,
// all with the same latency and each tagged with its own feature view.
func TestFVMetrics_MultipleFeatureViews(t *testing.T) {
	statsd := &fakeStatsdClient{}
	fvMetrics := NewFeatureViewReadMetrics("proj", "valkey", statsd)

	views := []string{"hotel_fv", "user_fv", "booking_fv"}
	fvMetrics.Emit(views, 75.0, false)

	// One latency distribution per feature view.
	assert.Len(t, statsd.distCalls, 3)

	// One request count per feature view.
	counts := filterCalls(statsd.calls, FVReadRequestsMetric)
	assert.Len(t, counts, 3)

	// Every distribution carries the shared latency.
	for _, call := range statsd.distCalls {
		assert.Equal(t, 75.0, call.value)
	}

	// Every requested feature view shows up as a tag on some distribution.
	seen := make(map[string]bool)
	for _, call := range statsd.distCalls {
		seen[findTag(call.tags, "feature_view:")] = true
	}
	assert.True(t, seen["hotel_fv"])
	assert.True(t, seen["user_fv"])
	assert.True(t, seen["booking_fv"])
}

func TestFVMetrics_NilClient(t *testing.T) {
m := NewFeatureViewReadMetrics("proj", "redis", nil)
assert.Nil(t, m)
}

// TestFVMetrics_NilSafe checks that Emit can be called on a nil receiver; the
// test passes as long as the call does not panic.
func TestFVMetrics_NilSafe(t *testing.T) {
	(*FeatureViewReadMetrics)(nil).Emit([]string{"fv"}, 10.0, false)
}

func TestFVMetrics_Sampling(t *testing.T) {
os.Setenv("FEAST_METRICS_SAMPLE_RATE", "0.5")
defer os.Unsetenv("FEAST_METRICS_SAMPLE_RATE")

fake := &fakeStatsdClient{}
m := NewFeatureViewReadMetrics("proj", "redis", fake)

assert.Equal(t, 0.5, m.sampleRate)

// Emit many times — should skip roughly half
emitted := 0
for i := 0; i < 100; i++ {
fake.distCalls = nil
m.Emit([]string{"fv"}, 10.0, false)
if len(fake.distCalls) > 0 {
emitted++
}
}

// With 50% sampling, should emit roughly 50 times (allow wide margin)
assert.Greater(t, emitted, 20)
assert.Less(t, emitted, 80)
}

// TestIsFVMetricsEnabled checks that IsFVMetricsEnabled is false when neither
// flag is set and true when either ENABLE_FV_LEVEL_METRICS or
// ENABLE_MISSING_KEY_METRICS is "true".
func TestIsFVMetricsEnabled(t *testing.T) {
	const (
		fvFlag         = "ENABLE_FV_LEVEL_METRICS"
		missingKeyFlag = "ENABLE_MISSING_KEY_METRICS"
	)

	// t.Setenv registers a cleanup that restores the pre-test value, so the
	// test no longer leaks environment changes into other tests; the
	// variables are then cleared to start from a known state.
	for _, key := range []string{fvFlag, missingKeyFlag} {
		t.Setenv(key, "")
		os.Unsetenv(key)
	}
	assert.False(t, IsFVMetricsEnabled())

	t.Setenv(fvFlag, "true")
	assert.True(t, IsFVMetricsEnabled())
	os.Unsetenv(fvFlag)

	t.Setenv(missingKeyFlag, "true")
	assert.True(t, IsFVMetricsEnabled())
	os.Unsetenv(missingKeyFlag)
}
24 changes: 22 additions & 2 deletions go/internal/feast/metrics/lookup_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"github.com/feast-dev/feast/go/protos/feast/serving"
)

// Metric names emitted by LookupMetricsAggregator.Emit.
const (
// LookupNotFoundMetric is counted per feature for lookups recorded with a NOT_FOUND status.
LookupNotFoundMetric = "mlpfs.featureserver.feature_lookup_not_found"
// LookupNullOrExpiredMetric is counted per feature from the aggregator's nullOrExpired tally.
LookupNullOrExpiredMetric = "mlpfs.featureserver.feature_lookup_null_or_expired"
// LookupRequestsMetric is counted per feature view and covers every recorded lookup, whatever its status.
LookupRequestsMetric = "mlpfs.featureserver.feature_lookup_requests"
)

// extractFeatureView extracts the feature view name from a full feature name.
// Feature names follow the format: feature_view__feature_name
// Example: "hotel_fv__price" -> "hotel_fv"
Expand All @@ -24,6 +30,7 @@ func extractFeatureView(featureName string) string {
type LookupMetricsAggregator struct {
notFound map[string]int64
nullOrExpired map[string]int64
totalByFV map[string]int64
project string
onlineStore string
client StatsdClient
Expand Down Expand Up @@ -51,6 +58,7 @@ func NewLookupMetricsAggregator(
return &LookupMetricsAggregator{
notFound: make(map[string]int64),
nullOrExpired: make(map[string]int64),
totalByFV: make(map[string]int64),
project: project,
onlineStore: onlineStore,
client: client,
Expand All @@ -62,6 +70,7 @@ func (m *LookupMetricsAggregator) Record(featureID string, status serving.FieldS
if m == nil {
return
}
m.totalByFV[extractFeatureView(featureID)]++
switch status {
case serving.FieldStatus_NOT_FOUND:
m.notFound[featureID]++
Expand Down Expand Up @@ -123,7 +132,7 @@ func (m *LookupMetricsAggregator) Emit() {
copy(tags, baseTags)
tags[len(baseTags)] = "feature:" + featureID
tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID)
m.client.Count("mlpfs.featureserver.feature_lookup_not_found", adjustedCount, tags, 1.0)
m.client.Count(LookupNotFoundMetric, adjustedCount, tags, 1.0)
}

for featureID, count := range m.nullOrExpired {
Expand All @@ -135,6 +144,17 @@ func (m *LookupMetricsAggregator) Emit() {
copy(tags, baseTags)
tags[len(baseTags)] = "feature:" + featureID
tags[len(baseTags)+1] = "feature_view:" + extractFeatureView(featureID)
m.client.Count("mlpfs.featureserver.feature_lookup_null_or_expired", adjustedCount, tags, 1.0)
m.client.Count(LookupNullOrExpiredMetric, adjustedCount, tags, 1.0)
}

for fvName, count := range m.totalByFV {
if count == 0 {
continue
}
adjustedCount := int64(float64(count) * multiplier)
tags := make([]string, len(baseTags)+1)
copy(tags, baseTags)
tags[len(baseTags)] = "feature_view:" + fvName
m.client.Count(LookupRequestsMetric, adjustedCount, tags, 1.0)
}
}
Loading
Loading