@@ -11,6 +11,8 @@ import (
1111 "testing"
1212 "time"
1313
14+ "github.com/prometheus/prometheus/model/labels"
15+ "github.com/prometheus/prometheus/prompb"
1416 "github.com/stretchr/testify/require"
1517 "golang.org/x/sync/errgroup"
1618
@@ -246,3 +248,245 @@ func TestSingleBinaryWithMemberlistScaling(t *testing.T) {
246248 "expected all instances to have %f ring members and %f tombstones" ,
247249 expectedRingMembers , expectedTombstones )
248250}
251+
252+ func TestHATrackerWithMemberlistClusterSync (t * testing.T ) {
253+ s , err := e2e .NewScenario (networkName )
254+ require .NoError (t , err )
255+ defer s .Close ()
256+
257+ // make alert manager config dir
258+ require .NoError (t , writeFileToSharedDir (s , "alertmanager_configs" , []byte {}))
259+
260+ minio := e2edb .NewMinio (9000 , bucketName )
261+ require .NoError (t , s .StartAndWaitReady (minio ))
262+
263+ flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
264+ "-distributor.ha-tracker.enable" : "true" ,
265+ "-distributor.ha-tracker.enable-for-all-users" : "true" ,
266+ "-distributor.ha-tracker.cluster" : "cluster" ,
267+ "-distributor.ha-tracker.replica" : "__replica__" ,
268+ // Use memberlist as the KV store for the HA Tracker
269+ "-distributor.ha-tracker.store" : "memberlist" ,
270+
271+ // To fast failover
272+ "-distributor.ha-tracker.update-timeout" : "1s" ,
273+ "-distributor.ha-tracker.update-timeout-jitter-max" : "0s" ,
274+ "-distributor.ha-tracker.failover-timeout" : "2s" ,
275+
276+ // memberlist config
277+ "-ring.store" : "memberlist" ,
278+ "-memberlist.bind-port" : "8000" ,
279+ })
280+
281+ cortex1 := newSingleBinary ("cortex-1" , "" , "" , flags )
282+ cortex2 := newSingleBinary ("cortex-2" , "" , networkName + "-cortex-1:8000" , flags )
283+ cortex3 := newSingleBinary ("cortex-3" , "" , networkName + "-cortex-1:8000" , flags )
284+
285+ require .NoError (t , s .StartAndWaitReady (cortex1 ))
286+ require .NoError (t , s .StartAndWaitReady (cortex2 , cortex3 ))
287+
288+ // Ensure both Cortex instances have successfully discovered each other in the memberlist cluster.
289+ require .NoError (t , cortex1 .WaitSumMetrics (e2e .Equals (3 ), "memberlist_client_cluster_members_count" ))
290+ require .NoError (t , cortex2 .WaitSumMetrics (e2e .Equals (3 ), "memberlist_client_cluster_members_count" ))
291+ require .NoError (t , cortex3 .WaitSumMetrics (e2e .Equals (3 ), "memberlist_client_cluster_members_count" ))
292+
293+ // All Cortex servers should have 512 tokens, altogether 3 * 512.
294+ require .NoError (t , cortex1 .WaitSumMetrics (e2e .Equals (3 * 512 ), "cortex_ring_tokens_total" ))
295+ require .NoError (t , cortex2 .WaitSumMetrics (e2e .Equals (3 * 512 ), "cortex_ring_tokens_total" ))
296+ require .NoError (t , cortex3 .WaitSumMetrics (e2e .Equals (3 * 512 ), "cortex_ring_tokens_total" ))
297+
298+ now := time .Now ()
299+ userID := "user-1"
300+
301+ client1 , err := e2ecortex .NewClient (cortex1 .HTTPEndpoint (), "" , "" , "" , userID )
302+ require .NoError (t , err )
303+
304+ series , _ := generateSeries ("foo" , now , prompb.Label {Name : "__replica__" , Value : "replica0" }, prompb.Label {Name : "cluster" , Value : "cluster0" })
305+ // send to cortex1
306+ res , err := client1 .Push ([]prompb.TimeSeries {series [0 ]})
307+ require .NoError (t , err )
308+ require .Equal (t , 200 , res .StatusCode )
309+
310+ require .NoError (t , cortex1 .WaitSumMetrics (e2e .Equals (1 ), "cortex_ha_tracker_elected_replica_changes_total" ))
311+ // cortex-2 should be noticed HA reader via memberlist gossip
312+ require .NoError (t , cortex2 .WaitSumMetricsWithOptions (e2e .Equals (1 ), []string {"cortex_ha_tracker_elected_replica_changes_total" }, e2e .WaitMissingMetrics ))
313+ // cortex-3 should be noticed HA reader via memberlist gossip
314+ require .NoError (t , cortex3 .WaitSumMetricsWithOptions (e2e .Equals (1 ), []string {"cortex_ha_tracker_elected_replica_changes_total" }, e2e .WaitMissingMetrics ))
315+
316+ // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed.
317+ time .Sleep (5 * time .Second )
318+
319+ client2 , err := e2ecortex .NewClient (cortex2 .HTTPEndpoint (), "" , "" , "" , userID )
320+ require .NoError (t , err )
321+
322+ series2 , _ := generateSeries ("foo" , now .Add (time .Second * 30 ), prompb.Label {Name : "__replica__" , Value : "replica1" }, prompb.Label {Name : "cluster" , Value : "cluster0" })
323+ // send to cortex2
324+ res2 , err := client2 .Push ([]prompb.TimeSeries {series2 [0 ]})
325+ require .NoError (t , err )
326+ require .Equal (t , 200 , res2 .StatusCode )
327+
328+ // cortex2 failover to replica1
329+ require .NoError (t , cortex2 .WaitSumMetrics (e2e .Equals (2 ), "cortex_ha_tracker_elected_replica_changes_total" ))
330+ // cortex-1 should be noticed changed HA reader via memberlist gossip
331+ require .NoError (t , cortex1 .WaitSumMetrics (e2e .Equals (2 ), "cortex_ha_tracker_elected_replica_changes_total" ))
332+ // cortex-3 should be noticed changed HA reader via memberlist gossip
333+ require .NoError (t , cortex3 .WaitSumMetrics (e2e .Equals (2 ), "cortex_ha_tracker_elected_replica_changes_total" ))
334+ }
335+
336+ func TestHATrackerWithMemberlist (t * testing.T ) {
337+ s , err := e2e .NewScenario (networkName )
338+ require .NoError (t , err )
339+ defer s .Close ()
340+
341+ // make alert manager config dir
342+ require .NoError (t , writeFileToSharedDir (s , "alertmanager_configs" , []byte {}))
343+
344+ minio := e2edb .NewMinio (9000 , bucketName )
345+ require .NoError (t , s .StartAndWaitReady (minio ))
346+
347+ flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
348+ "-distributor.ha-tracker.enable" : "true" ,
349+ "-distributor.ha-tracker.enable-for-all-users" : "true" ,
350+ "-distributor.ha-tracker.cluster" : "cluster" ,
351+ "-distributor.ha-tracker.replica" : "__replica__" ,
352+ // Use memberlist as the KV store for the HA Tracker
353+ "-distributor.ha-tracker.store" : "memberlist" ,
354+
355+ // To fast failover
356+ "-distributor.ha-tracker.update-timeout" : "1s" ,
357+ "-distributor.ha-tracker.update-timeout-jitter-max" : "0s" ,
358+ "-distributor.ha-tracker.failover-timeout" : "2s" ,
359+
360+ // memberlist config
361+ "-ring.store" : "memberlist" ,
362+ "-memberlist.bind-port" : "8000" ,
363+ })
364+
365+ cortex := newSingleBinary ("cortex" , "" , "" , flags )
366+ require .NoError (t , s .StartAndWaitReady (cortex ))
367+
368+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (1 ), "memberlist_client_cluster_members_count" ))
369+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
370+
371+ now := time .Now ()
372+ numUsers := 100
373+
374+ for i := 1 ; i <= numUsers ; i ++ {
375+ userID := fmt .Sprintf ("user-%d" , i )
376+ client , err := e2ecortex .NewClient (cortex .HTTPEndpoint (), "" , "" , "" , userID )
377+ require .NoError (t , err )
378+
379+ series , _ := generateSeries ("foo" , now , prompb.Label {Name : "__replica__" , Value : "replica0" }, prompb.Label {Name : "cluster" , Value : "cluster0" })
380+ res , err := client .Push ([]prompb.TimeSeries {series [0 ]})
381+ require .NoError (t , err )
382+ require .Equal (t , 200 , res .StatusCode )
383+ }
384+
385+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (float64 (numUsers )), "cortex_ha_tracker_elected_replica_changes_total" ))
386+
387+ // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed.
388+ time .Sleep (5 * time .Second )
389+
390+ for i := 1 ; i <= numUsers ; i ++ {
391+ userID := fmt .Sprintf ("user-%d" , i )
392+ client , err := e2ecortex .NewClient (cortex .HTTPEndpoint (), "" , "" , "" , userID )
393+ require .NoError (t , err )
394+
395+ // This time, we send data from replica1 instead of replica0.
396+ series , _ := generateSeries ("foo" , now .Add (time .Second * 30 ), prompb.Label {Name : "__replica__" , Value : "replica1" }, prompb.Label {Name : "cluster" , Value : "cluster0" })
397+ res , err := client .Push ([]prompb.TimeSeries {series [0 ]})
398+ require .NoError (t , err )
399+ require .Equal (t , 200 , res .StatusCode )
400+ }
401+
402+ // Since the leader successfully failed over to replica1, the change count increments by 1 per user
403+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (float64 (numUsers * 2 )), "cortex_ha_tracker_elected_replica_changes_total" ))
404+ }
405+
406+ func TestHATrackerWithMultiKV (t * testing.T ) {
407+ s , err := e2e .NewScenario (networkName )
408+ require .NoError (t , err )
409+ defer s .Close ()
410+
411+ // make alert manager config dir
412+ require .NoError (t , writeFileToSharedDir (s , "alertmanager_configs" , []byte {}))
413+
414+ consul := e2edb .NewConsul ()
415+ minio := e2edb .NewMinio (9000 , bucketName )
416+ require .NoError (t , s .StartAndWaitReady (consul , minio ))
417+
418+ flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
419+ "-distributor.ha-tracker.enable" : "true" ,
420+ "-distributor.ha-tracker.enable-for-all-users" : "true" ,
421+ "-distributor.ha-tracker.cluster" : "cluster" ,
422+ "-distributor.ha-tracker.replica" : "__replica__" ,
423+ // Use memberlist as the KV store for the HA Tracker
424+ "-distributor.ha-tracker.store" : "multi" ,
425+
426+ // To fast failover
427+ "-distributor.ha-tracker.update-timeout" : "1s" ,
428+ "-distributor.ha-tracker.update-timeout-jitter-max" : "0s" ,
429+ "-distributor.ha-tracker.failover-timeout" : "2s" ,
430+
431+ // multi KV config
432+ "-distributor.ha-tracker.multi.primary" : "consul" ,
433+ "-distributor.ha-tracker.multi.secondary" : "memberlist" ,
434+ "-distributor.ha-tracker.consul.hostname" : consul .NetworkHTTPEndpoint (),
435+
436+ // Enable data mirroring
437+ "-distributor.ha-tracker.multi.mirror-enabled" : "true" ,
438+
439+ // memberlist config
440+ "-ring.store" : "memberlist" ,
441+ "-memberlist.bind-port" : "8000" ,
442+ })
443+
444+ cortex := newSingleBinary ("cortex" , "" , "" , flags )
445+ require .NoError (t , s .StartAndWaitReady (cortex ))
446+
447+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (1 ), "memberlist_client_cluster_members_count" ))
448+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
449+
450+ // mirror enabled
451+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (float64 (1 )), "cortex_multikv_mirror_enabled" ))
452+ // consul as primary KV Store
453+ require .NoError (t , cortex .WaitSumMetricsWithOptions (e2e .Equals (float64 (1 )), []string {"cortex_multikv_primary_store" }, e2e .WaitMissingMetrics ,
454+ e2e .WithLabelMatchers (labels .MustNewMatcher (labels .MatchEqual , "store" , "consul" ))),
455+ )
456+
457+ now := time .Now ()
458+ numUsers := 100
459+
460+ for i := 1 ; i <= numUsers ; i ++ {
461+ userID := fmt .Sprintf ("user-%d" , i )
462+ client , err := e2ecortex .NewClient (cortex .HTTPEndpoint (), "" , "" , "" , userID )
463+ require .NoError (t , err )
464+
465+ series , _ := generateSeries ("foo" , now , prompb.Label {Name : "__replica__" , Value : "replica0" }, prompb.Label {Name : "cluster" , Value : "cluster0" })
466+ res , err := client .Push ([]prompb.TimeSeries {series [0 ]})
467+ require .NoError (t , err )
468+ require .Equal (t , 200 , res .StatusCode )
469+ }
470+
471+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (float64 (numUsers )), "cortex_ha_tracker_elected_replica_changes_total" ))
472+
473+ // Wait 5 seconds to ensure the FailoverTimeout (2s) has comfortably passed.
474+ time .Sleep (5 * time .Second )
475+
476+ for i := 1 ; i <= numUsers ; i ++ {
477+ userID := fmt .Sprintf ("user-%d" , i )
478+ client , err := e2ecortex .NewClient (cortex .HTTPEndpoint (), "" , "" , "" , userID )
479+ require .NoError (t , err )
480+
481+ // This time, we send data from replica1 instead of replica0.
482+ series , _ := generateSeries ("foo" , now .Add (time .Second * 30 ), prompb.Label {Name : "__replica__" , Value : "replica1" }, prompb.Label {Name : "cluster" , Value : "cluster0" })
483+ res , err := client .Push ([]prompb.TimeSeries {series [0 ]})
484+ require .NoError (t , err )
485+ require .Equal (t , 200 , res .StatusCode )
486+ }
487+
488+ // Since the leader successfully failed over to replica1, the change count increments by 1 per user
489+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (float64 (numUsers * 2 )), "cortex_ha_tracker_elected_replica_changes_total" ))
490+ // Two keys (1 cluster with 2 replicas) per user should be written to the memberlist (secondary store)
491+ require .NoError (t , cortex .WaitSumMetrics (e2e .Equals (float64 (numUsers * 2 )), "cortex_multikv_mirror_writes_total" ))
492+ }
0 commit comments