Merged
15 changes: 13 additions & 2 deletions api/dbv1/models.go

Some generated files are not rendered by default.

187 changes: 174 additions & 13 deletions api/metrics_middleware.go
@@ -3,12 +3,15 @@ package api
import (
"context"
"runtime"
"sync"
"time"

"api.audius.co/hll"
"api.audius.co/utils"
"github.com/axiomhq/hyperloglog"
"github.com/gofiber/fiber/v2"
fiberutils "github.com/gofiber/fiber/v2/utils"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/maypok86/otter"
"go.uber.org/zap"
@@ -22,9 +25,11 @@ type MetricsCollector struct
flushTimer *time.Ticker
stopCh chan struct{}

appMetrics otter.Cache[string, *AppMetricsData]
routeMetrics otter.Cache[string, *RouteMetricsData]
countMetrics *hll.HLL
appMetrics otter.Cache[string, *AppMetricsData]
routeMetrics otter.Cache[string, *RouteMetricsData]
countMetrics *hll.HLL
appUniqueMetrics map[string]*hll.HLL
appUniqueMu sync.RWMutex
}

// AppMetricsData holds request count data for a specific app identifier
@@ -75,13 +80,14 @@ func NewMetricsCollector(logger *zap.Logger, writePool *pgxpool.Pool) *MetricsCollector {
}

collector := &MetricsCollector{
logger: logger.With(zap.String("component", "MetricsCollector")),
writePool: writePool,
appMetrics: appMetricsCache,
routeMetrics: routeMetricsCache,
countMetrics: countMetricsAggregator,
flushTimer: time.NewTicker(flushTimer),
stopCh: make(chan struct{}),
logger: logger.With(zap.String("component", "MetricsCollector")),
writePool: writePool,
appMetrics: appMetricsCache,
routeMetrics: routeMetricsCache,
countMetrics: countMetricsAggregator,
appUniqueMetrics: make(map[string]*hll.HLL),
flushTimer: time.NewTicker(flushTimer),
stopCh: make(chan struct{}),
}

// Start the flush routine
@@ -97,11 +103,14 @@ func (rmc *MetricsCollector) Middleware() fiber.Handler {

apiKey := c.Query("api_key")
appName := c.Query("app_name")
ipAddress := utils.GetIP(c)

// Only record if we have some identifier
if apiKey != "" || appName != "" {
rmc.recordAppMetric(
fiberutils.CopyString(apiKey),
fiberutils.CopyString(appName),
ipAddress,
)
}

@@ -115,8 +124,7 @@ func (rmc *MetricsCollector) Middleware() fiber.Handler {
)
}

// Extract IP address for unique tracking
ipAddress := utils.GetIP(c)
// Extract IP address for unique tracking (global)
if ipAddress != "" {
rmc.recordCountMetric(ipAddress)
}
@@ -126,7 +134,7 @@ func (rmc *MetricsCollector) Middleware() fiber.Handler {
}

// Increments the request count for a given app identifier
func (rmc *MetricsCollector) recordAppMetric(apiKey, appName string) {
func (rmc *MetricsCollector) recordAppMetric(apiKey, appName, ipAddress string) {
// Prioritize api_key over app_name as identifier
identifier := apiKey
if identifier == "" {
@@ -147,6 +155,26 @@ func (rmc *MetricsCollector) recordAppMetric(apiKey, appName string) {
data.RequestCount++
data.LastSeen = lastSeen
rmc.appMetrics.Set(identifier, data)

// Record IP address to app-specific HLL sketch for unique user tracking
if ipAddress != "" {
rmc.appUniqueMu.Lock()
appHLL, exists := rmc.appUniqueMetrics[identifier]
if !exists {
// Create new HLL instance for this app
var err error
appHLL, err = hll.NewHLL(rmc.logger, rmc.writePool, "api_metrics_apps_unique", 12)
if err != nil {
rmc.logger.Error("Failed to create app unique HLL", zap.Error(err), zap.String("identifier", identifier))
rmc.appUniqueMu.Unlock()
return
}
rmc.appUniqueMetrics[identifier] = appHLL
}
rmc.appUniqueMu.Unlock()

appHLL.Record(ipAddress)
}
}

// Increments the request count for a given route pattern
@@ -210,6 +238,23 @@ func (rmc *MetricsCollector) flushMetrics() {
// Get HLL sketch copy
currentHLL, currentTotalRequests := rmc.countMetrics.GetSketchCopy()

type AppUniqueData struct {
Identifier string
Sketch *hyperloglog.Sketch
TotalCount int64
}
appUniqueData := make(map[string]*AppUniqueData)
rmc.appUniqueMu.Lock()
for identifier, appHLL := range rmc.appUniqueMetrics {
sketchCopy, totalCount := appHLL.GetSketchCopy()
appUniqueData[identifier] = &AppUniqueData{
Identifier: identifier,
Sketch: sketchCopy,
TotalCount: totalCount,
}
}
rmc.appUniqueMu.Unlock()

// Begin transaction
tx, err := rmc.writePool.Begin(ctx)
if err != nil {
@@ -295,6 +340,122 @@ func (rmc *MetricsCollector) flushMetrics() {
}
}

// Flush app unique metrics
if len(appUniqueData) > 0 {
appUniqueUpserted := 0
for _, data := range appUniqueData {
if data.Sketch == nil {
continue
}

// Clone the sketch to avoid modifying the original
newSketch := data.Sketch.Clone()
if newSketch == nil {
continue
}

var existingSketchData []byte
var existingCount int64
query := `
SELECT hll_sketch, total_count
FROM api_metrics_apps_unique
WHERE date = $1 AND app_name = $2
FOR UPDATE`
err = tx.QueryRow(ctx, query, date, data.Identifier).Scan(&existingSketchData, &existingCount)

if err != nil && err != pgx.ErrNoRows {
rmc.logger.Error("Failed to query existing app unique metrics",
zap.Error(err),
zap.String("identifier", data.Identifier))
continue
}

var finalSketchData []byte
var finalTotalCount int64
var finalUniqueCount int64

if err == pgx.ErrNoRows {
// No existing row - use new sketch as-is
var marshalErr error
finalSketchData, marshalErr = newSketch.MarshalBinary()
if marshalErr != nil {
rmc.logger.Error("Failed to marshal new sketch",
zap.Error(marshalErr),
zap.String("identifier", data.Identifier))
continue
}
finalTotalCount = data.TotalCount
finalUniqueCount = int64(newSketch.Estimate())
} else {
// Row exists - merge sketches
if existingSketchData != nil {
// Merge with existing sketch
existingSketch, unmarshalErr := hll.UnmarshalSketch(existingSketchData, 12)
if unmarshalErr != nil {
rmc.logger.Error("Failed to unmarshal existing sketch",
zap.Error(unmarshalErr),
zap.String("identifier", data.Identifier))
continue
}

if mergeErr := existingSketch.Merge(newSketch); mergeErr != nil {
rmc.logger.Error("Failed to merge sketches",
zap.Error(mergeErr),
zap.String("identifier", data.Identifier))
continue
}

var marshalErr error
finalSketchData, marshalErr = existingSketch.MarshalBinary()
if marshalErr != nil {
rmc.logger.Error("Failed to marshal merged sketch",
zap.Error(marshalErr),
zap.String("identifier", data.Identifier))
continue
}
finalUniqueCount = int64(existingSketch.Estimate())
} else {
// Row exists but sketch is NULL - use new sketch
var marshalErr error
finalSketchData, marshalErr = newSketch.MarshalBinary()
if marshalErr != nil {
rmc.logger.Error("Failed to marshal new sketch",
zap.Error(marshalErr),
zap.String("identifier", data.Identifier))
continue
}
finalUniqueCount = int64(newSketch.Estimate())
}
finalTotalCount = existingCount + data.TotalCount
}

// Use INSERT ... ON CONFLICT for atomic upsert (same pattern as api_metrics_apps)
upsertQuery := `
INSERT INTO api_metrics_apps_unique (date, app_name, hll_sketch, total_count, unique_count, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
ON CONFLICT (date, app_name)
DO UPDATE SET
hll_sketch = EXCLUDED.hll_sketch,
total_count = EXCLUDED.total_count,
unique_count = EXCLUDED.unique_count,
updated_at = NOW()`

_, err = tx.Exec(ctx, upsertQuery, date, data.Identifier, finalSketchData, finalTotalCount, finalUniqueCount)
if err != nil {
rmc.logger.Error("Failed to upsert app unique metrics",
zap.Error(err),
zap.String("identifier", data.Identifier))
continue
}

appUniqueUpserted++
}

rmc.logger.Debug("Flushed app unique metrics",
zap.Int("upserted", appUniqueUpserted),
zap.Int("total", len(appUniqueData)))
}

// Commit transaction
if err := tx.Commit(ctx); err != nil {
rmc.logger.Error("Failed to commit metrics transaction", zap.Error(err))
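The api/dbv1/models.go change above (not rendered here) presumably adds the model for api_metrics_apps_unique; from the upsert query the table appears to carry date, app_name, hll_sketch, total_count, unique_count, created_at, and updated_at columns with a unique key on (date, app_name). One subtlety of this design: per-day unique_count values cannot simply be summed over a date range, because the same IP can appear on several days; a range-wide figure requires merging the stored sketches first. Below is a minimal read-side sketch of that merge, not part of this PR; the helper name is hypothetical, the table and column names follow the upsert above, and the hll.UnmarshalSketch signature and precision (12) are assumed from the calls in this diff.

package api

import (
	"context"
	"time"

	"api.audius.co/hll"
	"github.com/axiomhq/hyperloglog"
	"github.com/jackc/pgx/v5/pgxpool"
)

// estimateUniquesForRange merges the per-day HLL sketches stored by the flush
// above to estimate unique IPs for one app across a date window.
func estimateUniquesForRange(ctx context.Context, pool *pgxpool.Pool, appName string, from, to time.Time) (uint64, error) {
	rows, err := pool.Query(ctx,
		`SELECT hll_sketch
		   FROM api_metrics_apps_unique
		  WHERE app_name = $1 AND date BETWEEN $2 AND $3`,
		appName, from, to)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	var merged *hyperloglog.Sketch
	for rows.Next() {
		var raw []byte
		if err := rows.Scan(&raw); err != nil {
			return 0, err
		}
		if raw == nil {
			// Row exists but sketch is NULL; nothing to merge.
			continue
		}
		sketch, err := hll.UnmarshalSketch(raw, 12)
		if err != nil {
			return 0, err
		}
		if merged == nil {
			merged = sketch
			continue
		}
		if err := merged.Merge(sketch); err != nil {
			return 0, err
		}
	}
	if err := rows.Err(); err != nil {
		return 0, err
	}
	if merged == nil {
		return 0, nil
	}
	return merged.Estimate(), nil
}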
1 change: 1 addition & 0 deletions api/server.go
@@ -524,6 +524,7 @@ func NewApiServer(config config.Config) *ApiServer {
g.Get("/metrics/total_artists", app.v1MetricsTotalArtists)
g.Get("/metrics/total_wallets", app.v1MetricsTotalWallets)
g.Get("/metrics/aggregates/apps/:time_range", app.v1MetricsApps)
g.Get("/metrics/aggregates/apps/:time_range/unique", app.v1MetricsAppsUnique)
g.Get("/metrics/aggregates/routes/:time_range", app.v1MetricsRoutes)
g.Get("/metrics/aggregates/routes/trailing/:time_range", app.v1MetricsRoutesTrailing)

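The handler behind the new /metrics/aggregates/apps/:time_range/unique route is not part of this diff. The following is a minimal sketch of one possible implementation, assuming the receiver is the *ApiServer shown above, that it exposes a pgx read pool as a hypothetical app.pool field, and that :time_range maps onto a trailing window of days; it returns the stored per-day counts as-is rather than merging sketches across days.

package api

import (
	"time"

	"github.com/gofiber/fiber/v2"
)

// v1MetricsAppsUnique is a hypothetical sketch, not the implementation merged
// in this PR. It reads the per-day rows written by the metrics flush and
// returns them as JSON.
func (app *ApiServer) v1MetricsAppsUnique(c *fiber.Ctx) error {
	windows := map[string]int{"week": 7, "month": 30, "year": 365}
	days, ok := windows[c.Params("time_range")]
	if !ok {
		return fiber.NewError(fiber.StatusBadRequest, "unknown time_range")
	}

	rows, err := app.pool.Query(c.UserContext(),
		`SELECT date, app_name, unique_count, total_count
		   FROM api_metrics_apps_unique
		  WHERE date >= CURRENT_DATE - $1::int
		  ORDER BY date DESC, unique_count DESC`,
		days)
	if err != nil {
		return err
	}
	defer rows.Close()

	type appUniqueRow struct {
		Date        time.Time `json:"date"`
		AppName     string    `json:"app_name"`
		UniqueCount int64     `json:"unique_count"`
		TotalCount  int64     `json:"total_count"`
	}
	var data []appUniqueRow
	for rows.Next() {
		var r appUniqueRow
		if err := rows.Scan(&r.Date, &r.AppName, &r.UniqueCount, &r.TotalCount); err != nil {
			return err
		}
		data = append(data, r)
	}
	if err := rows.Err(); err != nil {
		return err
	}
	return c.JSON(fiber.Map{"data": data})
}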