Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Querier: Add logging for api calls in querier during an OOMKill. #7216
Comment thread
yeya24 marked this conversation as resolved.
Outdated
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
Expand Down
28 changes: 20 additions & 8 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/api_tracker"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -285,6 +286,17 @@ func NewQuerierHandler(

queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)

apiTracker := api_tracker.NewAPITracker(querierCfg.ActiveQueryTrackerDir, querierCfg.MaxConcurrent, util_log.GoKitLogToSlog(logger))
var apiHandler http.Handler
var legacyAPIHandler http.Handler
if apiTracker != nil {
apiHandler = api_tracker.NewAPIWrapper(promRouter, apiTracker)
legacyAPIHandler = api_tracker.NewAPIWrapper(legacyPromRouter, apiTracker)
} else {
apiHandler = promRouter
legacyAPIHandler = legacyPromRouter
}

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
Expand All @@ -295,10 +307,10 @@ func NewQuerierHandler(
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(promRouter)
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(apiHandler)
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(apiHandler)

// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
// https://github.com/prometheus/prometheus/pull/7125/files
Expand All @@ -310,10 +322,10 @@ func NewQuerierHandler(
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/parse_query")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyPromRouter)
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(legacyAPIHandler)
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Methods("GET").Handler(legacyAPIHandler)

if cfg.buildInfoEnabled {
router.Path(path.Join(prefix, "/api/v1/status/buildinfo")).Methods("GET").Handler(promRouter)
Expand Down
202 changes: 202 additions & 0 deletions pkg/querier/api_tracker/api_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package api_tracker

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strings"
"time"

"github.com/edsrzf/mmap-go"
)

type APITracker struct {
Comment thread
yeya24 marked this conversation as resolved.
Outdated
mmappedFile []byte
getNextIndex chan int
logger *slog.Logger
closer io.Closer
maxConcurrent int
}

var _ io.Closer = &APITracker{}

const (
apiEntrySize int = 1000
)

func parseAPIBrokenJSON(brokenJSON []byte) (string, bool) {
apis := strings.ReplaceAll(string(brokenJSON), "\x00", "")
if len(apis) > 0 {
apis = apis[:len(apis)-1] + "]"
}

if len(apis) <= 1 {
return "[]", false
}

return apis, true
}

func logUnfinishedAPIs(filename string, filesize int, logger *slog.Logger) {
if _, err := os.Stat(filename); err == nil {
fd, err := os.Open(filename)
if err != nil {
logger.Error("Failed to open API log file", "err", err)
return
}
defer fd.Close()

brokenJSON := make([]byte, filesize)
_, err = fd.Read(brokenJSON)
if err != nil {
logger.Error("Failed to read API log file", "err", err)
return
}

apis, apisExist := parseAPIBrokenJSON(brokenJSON)
if !apisExist {
return
}
logger.Info("These API calls didn't finish in prometheus' last run:", "apis", apis)
}
}

type mmappedAPIFile struct {
f io.Closer
m mmap.MMap
}

func (f *mmappedAPIFile) Close() error {
err := f.m.Unmap()
if err != nil {
err = fmt.Errorf("mmappedAPIFile: unmapping: %w", err)
}
if fErr := f.f.Close(); fErr != nil {
return errors.Join(fmt.Errorf("close mmappedAPIFile.f: %w", fErr), err)
}

return err
}

func getAPIMMappedFile(filename string, filesize int, logger *slog.Logger) ([]byte, io.Closer, error) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
if err != nil {
absPath, pathErr := filepath.Abs(filename)
if pathErr != nil {
absPath = filename
}
logger.Error("Error opening API log file", "file", absPath, "err", err)
return nil, nil, err
}

err = file.Truncate(int64(filesize))
if err != nil {
file.Close()
logger.Error("Error setting filesize.", "filesize", filesize, "err", err)
return nil, nil, err
}

fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0)
if err != nil {
file.Close()
logger.Error("Failed to mmap", "file", filename, "Attempted size", filesize, "err", err)
return nil, nil, err
}

return fileAsBytes, &mmappedAPIFile{f: file, m: fileAsBytes}, err
}

func NewAPITracker(localStoragePath string, maxConcurrent int, logger *slog.Logger) *APITracker {
if localStoragePath == "" {
return nil
}

err := os.MkdirAll(localStoragePath, 0o777)
if err != nil {
logger.Error("Failed to create directory for logging active APIs")
return nil
}

filename, filesize := filepath.Join(localStoragePath, "apis.active"), 1+maxConcurrent*apiEntrySize
logUnfinishedAPIs(filename, filesize, logger)

fileAsBytes, closer, err := getAPIMMappedFile(filename, filesize, logger)
if err != nil {
logger.Error("Unable to create mmap-ed active API log", "err", err)
return nil
}

copy(fileAsBytes, "[")
apiTracker := &APITracker{
mmappedFile: fileAsBytes,
closer: closer,
getNextIndex: make(chan int, maxConcurrent),
logger: logger,
maxConcurrent: maxConcurrent,
}

apiTracker.generateIndices(maxConcurrent)

return apiTracker
}

func newAPIJSONEntry(entryMap map[string]interface{}, logger *slog.Logger) []byte {
jsonEntry, err := json.Marshal(entryMap)
if err != nil {
logger.Error("Cannot create json of API entry", "entry", entryMap)
return []byte{}
}

if len(jsonEntry) > apiEntrySize {
newEntryMap := make(map[string]interface{})
newEntryMap["timestamp_sec"] = time.Now().Unix()
jsonEntry, err = json.Marshal(newEntryMap)
if err != nil {
logger.Error("Cannot create json of API entry", "entry", newEntryMap)
return []byte{}
}
}

return jsonEntry
}

func (tracker *APITracker) generateIndices(maxConcurrent int) {
for i := 0; i < maxConcurrent; i++ {
tracker.getNextIndex <- 1 + (i * apiEntrySize)
}
}

func (tracker *APITracker) Delete(insertIndex int) {
copy(tracker.mmappedFile[insertIndex:], strings.Repeat("\x00", apiEntrySize))
tracker.getNextIndex <- insertIndex
}

func (tracker *APITracker) Insert(ctx context.Context, entryMap map[string]interface{}) (int, error) {
select {
case i := <-tracker.getNextIndex:
fileBytes := tracker.mmappedFile
entry := newAPIJSONEntry(entryMap, tracker.logger)
start, end := i, i+apiEntrySize

copy(fileBytes[start:], entry)
copy(fileBytes[end-1:], ",")
return i, nil
case <-ctx.Done():
return 0, ctx.Err()
}
}

func (tracker *APITracker) Close() error {
if tracker == nil || tracker.closer == nil {
return nil
}
if err := tracker.closer.Close(); err != nil {
return fmt.Errorf("close APITracker.closer: %w", err)
}
return nil
}
129 changes: 129 additions & 0 deletions pkg/querier/api_tracker/api_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package api_tracker

import (
"context"
"log/slog"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAPITracker(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "api-tracker-test")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
tracker := NewAPITracker(tmpDir, 10, logger)
require.NotNil(t, tracker)
defer tracker.Close()

ctx := context.Background()
insertIndex, err := tracker.Insert(ctx, make(map[string]interface{}))
require.NoError(t, err)
assert.Greater(t, insertIndex, 0)

tracker.Delete(insertIndex)
}

func TestAPITrackerLogUnfinished(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "api-tracker-test")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

filename := filepath.Join(tmpDir, "apis.active")
content := `[{"path":"/api/v1/series","method":"GET","timestamp_sec":1234567890},`
err = os.WriteFile(filename, []byte(content), 0644)
require.NoError(t, err)

var logOutput strings.Builder
logger := slog.New(slog.NewTextHandler(&logOutput, nil))

tracker := NewAPITracker(tmpDir, 10, logger)
require.NotNil(t, tracker)
defer tracker.Close()
output := logOutput.String()
assert.Contains(t, output, "These API calls didn't finish in prometheus' last run")
assert.Contains(t, output, "/api/v1/series")
}

func TestAPITrackerNilDirectory(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
tracker := NewAPITracker("", 10, logger)
assert.Nil(t, tracker)
}

func TestAPIWrapper(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "api-wrapper-test")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
tracker := NewAPITracker(tmpDir, 10, logger)
require.NotNil(t, tracker)
defer tracker.Close()

handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

wrapper := NewAPIWrapper(handler, tracker)

req := httptest.NewRequest("GET", "/api/v1/series?match[]=up", nil)
rr := httptest.NewRecorder()
wrapper.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
}

func TestAPIWrapperNilTracker(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

wrapper := NewAPIWrapper(handler, nil)

req := httptest.NewRequest("GET", "/api/v1/series?match[]=up", nil)
rr := httptest.NewRecorder()
wrapper.ServeHTTP(rr, req)
assert.Equal(t, http.StatusOK, rr.Code)
}

func TestAPITrackerAboveMaxConcurrency(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "api-tracker-test")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

tracker := NewAPITracker(tmpDir, 2, logger)
require.NotNil(t, tracker)
defer tracker.Close()
ctx := context.Background()

index1, err := tracker.Insert(ctx, make(map[string]interface{}))
require.NoError(t, err)

index2, err := tracker.Insert(ctx, make(map[string]interface{}))
require.NoError(t, err)

ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()

_, err = tracker.Insert(ctx, make(map[string]interface{}))
assert.Error(t, err) // Should timeout

tracker.Delete(index1)
ctx = context.Background()
index3, err := tracker.Insert(ctx, make(map[string]interface{}))
require.NoError(t, err)

tracker.Delete(index2)
tracker.Delete(index3)
}
Loading
Loading