Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 97 additions & 23 deletions api/v1_metrics_apps_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,19 @@ type AppUniqueMetric struct {
UniqueCount int64 `json:"unique_count"`
}

// TimestampedAppUniqueMetric is an AppUniqueMetric annotated with the time
// bucket it was aggregated into, used for bucketed (day/week/month) responses.
type TimestampedAppUniqueMetric struct {
	AppUniqueMetric
	// Timestamp is the bucket label as text. It is populated from the SQL
	// "bucket" column (date / date_trunc('week'|'month', date) cast to
	// date::text), so it is an ISO date string like "2024-01-01".
	Timestamp string `json:"timestamp"`
}

// GetMetricsAppsUniqueQueryParams holds the query-string parameters accepted
// by the v1MetricsAppsUnique handler.
type GetMetricsAppsUniqueQueryParams struct {
	// Limit caps the number of rows returned after merging/sorting
	// (1..1000, default 100).
	Limit int `query:"limit" default:"100" validate:"min=1,max=1000"`
	// BucketSize selects the aggregation bucket for results.
	// NOTE(review): "year" passes validation here, but the handler's
	// bucket switch only implements "week" and "month" and treats
	// everything else as "day" — confirm whether year bucketing is intended.
	BucketSize string `query:"bucket_size" default:"day" validate:"oneof=day week month year"`
	// AppName, when non-empty, filters results to a single app. It is
	// matched against the api_metrics_apps_unique.app_name column, which
	// may hold either an api_key or an app name (see handler comment).
	AppName string `query:"app_name"`
}

func (app *ApiServer) v1MetricsAppsUnique(c *fiber.Ctx) error {
queryParams := GetMetricsAppsQueryParams{}
queryParams := GetMetricsAppsUniqueQueryParams{}
if err := app.ParseAndValidateQueryParams(c, &queryParams); err != nil {
return err
}
Expand All @@ -25,6 +36,10 @@ func (app *ApiServer) v1MetricsAppsUnique(c *fiber.Ctx) error {
}

var dateRangeClause string
var bucketClause string
var orderBy string
var appNameFilter string

switch routeParams.TimeRange {
case "week":
dateRangeClause = "date >= CURRENT_DATE - INTERVAL '7 days' AND date < CURRENT_DATE"
Expand All @@ -36,8 +51,33 @@ func (app *ApiServer) v1MetricsAppsUnique(c *fiber.Ctx) error {
dateRangeClause = "date < CURRENT_DATE"
}

switch queryParams.BucketSize {
case "week":
bucketClause = "date_trunc('week', date)::date::text AS bucket"
orderBy = "bucket ASC, api_metrics_apps_unique.app_name"
// Exclude current incomplete week
dateRangeClause = dateRangeClause + " AND date_trunc('week', date) < date_trunc('week', CURRENT_DATE)"
case "month":
bucketClause = "date_trunc('month', date)::date::text AS bucket"
orderBy = "bucket ASC, api_metrics_apps_unique.app_name"
// Exclude current incomplete month
dateRangeClause = dateRangeClause + " AND date_trunc('month', date) < date_trunc('month', CURRENT_DATE)"
default: // day
bucketClause = "date::text AS bucket"
orderBy = "bucket ASC, api_metrics_apps_unique.app_name"
}

// Add app_name filter if provided
// Filter by identifier (app_name column) which could be api_key or app_name
if queryParams.AppName != "" {
appNameFilter = " AND api_metrics_apps_unique.app_name = $1"
}

whereClause := dateRangeClause + appNameFilter

sql := fmt.Sprintf(`
SELECT
%s,
COALESCE(developer_apps.name, api_metrics_apps_unique.app_name) AS name,
api_metrics_apps_unique.app_name AS identifier,
hll_sketch,
Expand All @@ -46,16 +86,23 @@ func (app *ApiServer) v1MetricsAppsUnique(c *fiber.Ctx) error {
FROM api_metrics_apps_unique
LEFT JOIN developer_apps ON developer_apps.address = api_metrics_apps_unique.app_name
WHERE %s
ORDER BY api_metrics_apps_unique.app_name
`, dateRangeClause)

rows, err := app.pool.Query(c.Context(), sql)
ORDER BY %s
`, bucketClause, whereClause, orderBy)

var rows pgx.Rows
var err error
if queryParams.AppName != "" {
rows, err = app.pool.Query(c.Context(), sql, queryParams.AppName)
} else {
rows, err = app.pool.Query(c.Context(), sql)
}
if err != nil {
return fmt.Errorf("failed to query app unique metrics: %w", err)
}
defer rows.Close()

type metricRow struct {
Bucket string `db:"bucket"`
Name string `db:"name"`
Identifier string `db:"identifier"`
HllSketch []byte `db:"hll_sketch"`
Expand All @@ -68,41 +115,68 @@ func (app *ApiServer) v1MetricsAppsUnique(c *fiber.Ctx) error {
return fmt.Errorf("failed to collect app unique metrics: %w", err)
}

appMap := make(map[string][]hll.SketchRow)
// Group rows by (bucket, identifier) tuple
type bucketAppKey struct {
Bucket string
Identifier string
}
bucketAppMap := make(map[bucketAppKey][]hll.SketchRow)
nameMap := make(map[string]string) // Maps identifier to display name
bucketOrder := []string{}
seenBuckets := make(map[string]bool)

for _, row := range metricRows {
appMap[row.Identifier] = append(appMap[row.Identifier], hll.SketchRow{
key := bucketAppKey{Bucket: row.Bucket, Identifier: row.Identifier}
bucketAppMap[key] = append(bucketAppMap[key], hll.SketchRow{
SketchData: row.HllSketch,
UniqueCount: row.UniqueCount,
TotalCount: row.TotalCount,
})
// Store the display name for this identifier
nameMap[row.Identifier] = row.Name
}

result := make([]AppUniqueMetric, 0, len(appMap))
for identifier, sketchRows := range appMap {
merged, err := hll.MergeSketches(sketchRows, 12)
if err != nil {
return fmt.Errorf("failed to merge sketches for identifier %s: %w", identifier, err)
if !seenBuckets[row.Bucket] {
bucketOrder = append(bucketOrder, row.Bucket)
seenBuckets[row.Bucket] = true
}
}

// Use the display name from the join, or fall back to identifier
displayName := nameMap[identifier]
if displayName == "" {
displayName = identifier
// Merge sketches for each (bucket, app) combination
result := make([]TimestampedAppUniqueMetric, 0)
for _, bucket := range bucketOrder {
for key, sketchRows := range bucketAppMap {
if key.Bucket != bucket {
continue
}
merged, err := hll.MergeSketches(sketchRows, 12)
if err != nil {
return fmt.Errorf("failed to merge sketches for bucket %s, identifier %s: %w", bucket, key.Identifier, err)
}

// Use the display name from the join, or fall back to identifier
displayName := nameMap[key.Identifier]
if displayName == "" {
displayName = key.Identifier
}

result = append(result, TimestampedAppUniqueMetric{
Timestamp: bucket,
AppUniqueMetric: AppUniqueMetric{
Name: displayName,
UniqueCount: int64(merged.UniqueCount),
},
})
}

result = append(result, AppUniqueMetric{
Name: displayName,
UniqueCount: int64(merged.UniqueCount),
})
}

// Sort by timestamp then by unique count descending
sort.Slice(result, func(i, j int) bool {
if result[i].Timestamp != result[j].Timestamp {
return result[i].Timestamp < result[j].Timestamp
}
return result[i].UniqueCount > result[j].UniqueCount
})

// Apply limit
if len(result) > queryParams.Limit {
result = result[:queryParams.Limit]
}
Expand Down
38 changes: 29 additions & 9 deletions api/v1_metrics_apps_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,31 @@ func TestMetricsAppsUnique(t *testing.T) {
app := testAppWithFixtures(t)

tests := []struct {
name string
timeRange string
limit int
name string
timeRange string
bucketSize string
limit int
}{
{"all_time", "all_time", 100},
{"month", "month", 50},
{"week", "week", 25},
{"year", "year", 100},
{"all_time_day", "all_time", "day", 100},
{"all_time_week", "all_time", "week", 100},
{"all_time_month", "all_time", "month", 100},
{"month_day", "month", "day", 50},
{"week_day", "week", "day", 25},
{"year_month", "year", "month", 100},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
url := "/v1/metrics/aggregates/apps/" + tt.timeRange + "/unique"
url := "/v1/metrics/aggregates/apps/" + tt.timeRange + "/unique?bucket_size=" + tt.bucketSize
if tt.limit != 100 {
url += fmt.Sprintf("?limit=%d", tt.limit)
url += fmt.Sprintf("&limit=%d", tt.limit)
}

var response struct {
Data []struct {
Name string `json:"name"`
UniqueCount int64 `json:"unique_count"`
Timestamp string `json:"timestamp"`
} `json:"data"`
}

Expand All @@ -42,6 +46,22 @@ func TestMetricsAppsUnique(t *testing.T) {
for _, appMetric := range response.Data {
assert.NotEmpty(t, appMetric.Name, "App name should not be empty")
assert.GreaterOrEqual(t, appMetric.UniqueCount, int64(0), "Unique count should be non-negative")
assert.NotEmpty(t, appMetric.Timestamp, "Timestamp should not be empty")
}

// Verify data is ordered chronologically
if len(response.Data) > 1 {
for i := 1; i < len(response.Data); i++ {
if response.Data[i-1].Timestamp == response.Data[i].Timestamp {
// Same timestamp, verify ordering by unique count descending
assert.GreaterOrEqual(t, response.Data[i-1].UniqueCount, response.Data[i].UniqueCount,
"Data with same timestamp should be ordered by unique count descending")
} else {
// Different timestamps, verify chronological order
assert.LessOrEqual(t, response.Data[i-1].Timestamp, response.Data[i].Timestamp,
"Data should be ordered by timestamp ascending")
}
}
}

assert.LessOrEqual(t, len(response.Data), tt.limit, "Response should not exceed limit")
Expand Down