Commit 3567f6f

HATracker: Add a local cache warmup on start
Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
1 parent f4ae1c1

3 files changed: 179 additions & 1 deletion

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 * [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
 * [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
 * [FEATURE] Ingester: Add experimental active series queried metric. #7173
+* [ENHANCEMENT] HATracker: Add a local cache warmup on startup to reduce KV store operations. #7213
 * [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
 * [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
 * [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145

pkg/ha/ha_tracker.go

Lines changed: 76 additions & 1 deletion
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"flag"
 	"fmt"
+	"maps"
 	"math/rand"
 	"slices"
 	"strings"
@@ -222,10 +223,84 @@ func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConf
 		t.client = client
 	}
 
-	t.Service = services.NewBasicService(nil, t.loop, nil)
+	t.Service = services.NewBasicService(t.syncKVStoreToLocalMap, t.loop, nil)
 	return t, nil
 }
 
+// syncKVStoreToLocalMap warms up the local cache by fetching all active entries from the KV store.
+func (c *HATracker) syncKVStoreToLocalMap(ctx context.Context) error {
+	if !c.cfg.EnableHATracker {
+		return nil
+	}
+
+	start := time.Now()
+	level.Info(c.logger).Log("msg", "starting HA tracker cache warmup")
+
+	keys, err := c.client.List(ctx, "")
+	if err != nil {
+		level.Error(c.logger).Log("msg", "failed to list keys during HA tracker cache warmup", "err", err)
+		return err
+	}
+
+	if len(keys) == 0 {
+		level.Info(c.logger).Log("msg", "HA tracker cache warmup finished", "reason", "no keys found in KV store")
+		return nil
+	}
+
+	// Build temporary maps first so electedLock is held only for the final copy.
+	tempElected := make(map[string]ReplicaDesc, len(keys))
+	tempReplicaGroups := make(map[string]map[string]struct{})
+	successCount := 0
+
+	for _, key := range keys {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+
+		val, err := c.client.Get(ctx, key)
+		if err != nil {
+			level.Warn(c.logger).Log("msg", "failed to fetch key during cache warmup", "key", key, "err", err)
+			continue
+		}
+
+		desc, ok := val.(*ReplicaDesc)
+		if !ok || desc == nil || desc.DeletedAt > 0 {
+			continue
+		}
+
+		// Keys have the form "<user>/<cluster>"; skip anything else.
+		user, cluster, keyHasSeparator := strings.Cut(key, "/")
+		if !keyHasSeparator {
+			continue
+		}
+
+		tempElected[key] = *desc
+		if tempReplicaGroups[user] == nil {
+			tempReplicaGroups[user] = make(map[string]struct{})
+		}
+		tempReplicaGroups[user][cluster] = struct{}{}
+		successCount++
+	}
+
+	c.electedLock.Lock()
+
+	// Update the local maps.
+	maps.Copy(c.elected, tempElected)
+	for user, clusters := range tempReplicaGroups {
+		if c.replicaGroups[user] == nil {
+			c.replicaGroups[user] = make(map[string]struct{})
+		}
+		for cluster := range clusters {
+			c.replicaGroups[user][cluster] = struct{}{}
+		}
+	}
+	c.electedLock.Unlock()
+
+	c.updateUserReplicaGroupCount()
+
+	level.Info(c.logger).Log("msg", "HA tracker cache warmup completed", "duration", time.Since(start), "synced_keys", successCount)
+	return nil
+}
+
 // Follows pattern used by ring for WatchKey.
 func (c *HATracker) loop(ctx context.Context) error {
 	if !c.cfg.EnableHATracker {
pkg/ha/ha_tracker_test.go

Lines changed: 102 additions & 0 deletions
@@ -758,6 +758,108 @@ func TestCheckReplicaCleanup(t *testing.T) {
 	))
 }
 
+func TestHATracker_CacheWarmupOnStart(t *testing.T) {
+	t.Parallel()
+	ctx := context.Background()
+	reg := prometheus.NewPedanticRegistry()
+
+	codec := GetReplicaDescCodec()
+	kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+	mockKV := kv.PrefixClient(kvStore, "prefix")
+
+	// CAS valid entries.
+	user1 := "user1"
+	clusterUser1 := "clusterUser1"
+	key1 := fmt.Sprintf("%s/%s", user1, clusterUser1)
+	desc1 := &ReplicaDesc{
+		Replica:    "replica-0",
+		ReceivedAt: timestamp.FromTime(time.Now()),
+	}
+
+	err := mockKV.CAS(ctx, key1, func(_ any) (any, bool, error) {
+		return desc1, true, nil
+	})
+	require.NoError(t, err)
+
+	user2 := "user2"
+	clusterUser2 := "clusterUser2"
+	key2 := fmt.Sprintf("%s/%s", user2, clusterUser2)
+	desc2 := &ReplicaDesc{
+		Replica:    "replica-0",
+		ReceivedAt: timestamp.FromTime(time.Now()),
+	}
+	err = mockKV.CAS(ctx, key2, func(_ any) (any, bool, error) {
+		return desc2, true, nil
+	})
+	require.NoError(t, err)
+
+	// CAS a deleted entry.
+	clusterDeleted := "clusterDeleted"
+	keyDeleted := fmt.Sprintf("%s/%s", user1, clusterDeleted)
+	descDeleted := &ReplicaDesc{
+		Replica:    "replica-old",
+		ReceivedAt: timestamp.FromTime(time.Now()),
+		DeletedAt:  timestamp.FromTime(time.Now()), // Marked as deleted.
+	}
+	err = mockKV.CAS(ctx, keyDeleted, func(_ any) (any, bool, error) {
+		return descDeleted, true, nil
+	})
+	require.NoError(t, err)
+
+	cfg := HATrackerConfig{
+		EnableHATracker:        true,
+		KVStore:                kv.Config{Mock: mockKV}, // Use the seeded KV.
+		UpdateTimeout:          time.Second,
+		UpdateTimeoutJitterMax: 0,
+		FailoverTimeout:        time.Second,
+	}
+
+	tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
+	require.NoError(t, err)
+
+	// Start the HA tracker.
+	require.NoError(t, services.StartAndAwaitRunning(ctx, tracker))
+	defer services.StopAndAwaitTerminated(ctx, tracker) // nolint:errcheck
+
+	tracker.electedLock.Lock()
+	// Check the local cache was updated.
+	desc1Cached, ok := tracker.elected[key1]
+	require.True(t, ok)
+	require.Equal(t, desc1.Replica, desc1Cached.Replica)
+
+	_, ok = tracker.elected[keyDeleted]
+	require.False(t, ok)
+
+	desc2Cached, ok := tracker.elected[key2]
+	require.True(t, ok)
+	require.Equal(t, desc2.Replica, desc2Cached.Replica)
+
+	// user1 should have 1 group (clusterUser1), ignoring clusterDeleted.
+	require.NotNil(t, tracker.replicaGroups[user1])
+	require.Equal(t, 1, len(tracker.replicaGroups[user1]))
+	_, hasClusterUser1 := tracker.replicaGroups[user1][clusterUser1]
+	require.True(t, hasClusterUser1)
+
+	// user2 should have 1 group (clusterUser2).
+	require.NotNil(t, tracker.replicaGroups[user2])
+	require.Equal(t, 1, len(tracker.replicaGroups[user2]))
+	_, hasClusterUser2 := tracker.replicaGroups[user2][clusterUser2]
+	require.True(t, hasClusterUser2)
+
+	tracker.electedLock.Unlock()
+
+	// Check the metric was updated.
+	require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
+# HELP cortex_ha_tracker_user_replica_group_count Number of HA replica groups tracked for each user.
+# TYPE cortex_ha_tracker_user_replica_group_count gauge
+cortex_ha_tracker_user_replica_group_count{user="user1"} 1
+cortex_ha_tracker_user_replica_group_count{user="user2"} 1
+`), "cortex_ha_tracker_user_replica_group_count",
+	))
+}
+
 func checkUserReplicaGroups(t *testing.T, duration time.Duration, c *HATracker, user string, expectedReplicaGroups int) {
 	t.Helper()
 	test.Poll(t, duration, nil, func() any {
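
Running just the new test should be a matter of `go test -run TestHATracker_CacheWarmupOnStart ./pkg/ha/` from the repository root (package path assumed from the file header above).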
