Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 118 additions & 4 deletions distribution/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,26 @@ import (
"sort"
"sync"
"sync/atomic"

"github.com/cockroachdb/errors"
)

// Route represents a mapping from a key range to a raft group.
// Ranges are right half-open intervals: [Start, End). Start is inclusive and
// End is exclusive. A nil End denotes an unbounded interval extending to
// positive infinity.
type Route struct {
// RouteID is the durable identifier assigned by route catalog.
// Zero means ephemeral/non-catalog routes.
RouteID uint64
// Start marks the inclusive beginning of the range.
Start []byte
// End marks the exclusive end of the range. nil means unbounded.
End []byte
// GroupID identifies the raft group for the range starting at Start.
GroupID uint64
// State tracks control-plane state for this route.
State RouteState
// Load tracks the number of accesses served by this range.
Load uint64
}
Expand All @@ -26,11 +33,20 @@ type Route struct {
type Engine struct {
mu sync.RWMutex
routes []Route
catalogVersion uint64
ts uint64
hotspotThreshold uint64
}

const defaultGroupID uint64 = 1
const minRouteCountForOrderValidation = 2

var (
ErrEngineSnapshotVersionStale = errors.New("engine snapshot version is stale")
ErrEngineSnapshotDuplicateID = errors.New("engine snapshot has duplicate route id")
ErrEngineSnapshotRouteOverlap = errors.New("engine snapshot has overlapping routes")
ErrEngineSnapshotRouteOrder = errors.New("engine snapshot has invalid route order")
)

// NewEngine creates an Engine with no hotspot splitting.
func NewEngine() *Engine {
Expand All @@ -52,12 +68,47 @@ func NewEngineWithThreshold(threshold uint64) *Engine {
return &Engine{routes: make([]Route, 0), hotspotThreshold: threshold}
}

// Version returns current route catalog version applied to the engine.
func (e *Engine) Version() uint64 {
e.mu.RLock()
defer e.mu.RUnlock()
return e.catalogVersion
}

// ApplySnapshot atomically replaces all in-memory routes with the provided
// catalog snapshot when the snapshot version is newer.
func (e *Engine) ApplySnapshot(snapshot CatalogSnapshot) error {
e.mu.Lock()
defer e.mu.Unlock()

if snapshot.Version < e.catalogVersion {
return errors.WithStack(ErrEngineSnapshotVersionStale)
Comment thread
bootjp marked this conversation as resolved.
Outdated
}
if snapshot.Version == e.catalogVersion {
return nil
}

routes, err := routesFromCatalog(snapshot.Routes)
if err != nil {
return err
}
Comment thread
bootjp marked this conversation as resolved.

e.routes = routes
e.catalogVersion = snapshot.Version
return nil
}
Comment thread
bootjp marked this conversation as resolved.
Comment thread
bootjp marked this conversation as resolved.

// UpdateRoute registers or updates a route for the given key range.
// Routes are stored sorted by Start.
func (e *Engine) UpdateRoute(start, end []byte, group uint64) {
e.mu.Lock()
defer e.mu.Unlock()
e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group})
e.routes = append(e.routes, Route{
Start: start,
End: end,
GroupID: group,
State: RouteStateActive,
})
sort.Slice(e.routes, func(i, j int) bool {
return bytes.Compare(e.routes[i].Start, e.routes[j].Start) < 0
})
Expand Down Expand Up @@ -116,7 +167,14 @@ func (e *Engine) Stats() []Route {
defer e.mu.RUnlock()
stats := make([]Route, len(e.routes))
for i, r := range e.routes {
stats[i] = Route{Start: cloneBytes(r.Start), End: cloneBytes(r.End), GroupID: r.GroupID, Load: atomic.LoadUint64(&e.routes[i].Load)}
stats[i] = Route{
RouteID: r.RouteID,
Start: cloneBytes(r.Start),
End: cloneBytes(r.End),
GroupID: r.GroupID,
State: r.State,
Load: atomic.LoadUint64(&e.routes[i].Load),
}
}
return stats
}
Expand All @@ -143,9 +201,11 @@ func (e *Engine) GetIntersectingRoutes(start, end []byte) []Route {
}
// Route intersects with scan range
result = append(result, Route{
RouteID: r.RouteID,
Start: cloneBytes(r.Start),
End: cloneBytes(r.End),
GroupID: r.GroupID,
State: r.State,
Load: atomic.LoadUint64(&r.Load),
})
}
Expand Down Expand Up @@ -182,14 +242,68 @@ func (e *Engine) splitRange(idx int) {
e.routes[idx].Load = 0
return
}
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID}
right := Route{Start: mid, End: r.End, GroupID: r.GroupID}
left := Route{Start: r.Start, End: mid, GroupID: r.GroupID, State: RouteStateActive}
right := Route{Start: mid, End: r.End, GroupID: r.GroupID, State: RouteStateActive}
Comment thread
bootjp marked this conversation as resolved.
Outdated
// replace the range at idx with left and right in an idiomatic manner
e.routes = append(e.routes[:idx+1], e.routes[idx:]...)
e.routes[idx] = left
e.routes[idx+1] = right
}

func routesFromCatalog(routes []RouteDescriptor) ([]Route, error) {
if len(routes) == 0 {
return []Route{}, nil
}

out := make([]Route, 0, len(routes))
Comment thread
bootjp marked this conversation as resolved.
Outdated
seen := make(map[uint64]struct{}, len(routes))
for _, rd := range routes {
if err := validateRouteDescriptor(rd); err != nil {
return nil, err
}
if _, exists := seen[rd.RouteID]; exists {
return nil, errors.WithStack(ErrEngineSnapshotDuplicateID)
}
seen[rd.RouteID] = struct{}{}
out = append(out, Route{
RouteID: rd.RouteID,
Start: cloneBytes(rd.Start),
End: cloneBytes(rd.End),
GroupID: rd.GroupID,
State: rd.State,
Comment thread
bootjp marked this conversation as resolved.
})
}

sort.Slice(out, func(i, j int) bool {
return bytes.Compare(out[i].Start, out[j].Start) < 0
})
Comment thread
bootjp marked this conversation as resolved.
if err := validateRouteOrder(out); err != nil {
return nil, err
}
return out, nil
}

func validateRouteOrder(routes []Route) error {
if len(routes) < minRouteCountForOrderValidation {
return nil
}
for i := 1; i < len(routes); i++ {
prev := routes[i-1]
curr := routes[i]

if bytes.Compare(prev.Start, curr.Start) >= 0 {
return errors.WithStack(ErrEngineSnapshotRouteOrder)
}
Comment thread
bootjp marked this conversation as resolved.
if prev.End == nil {
return errors.WithStack(ErrEngineSnapshotRouteOrder)
}
Comment thread
bootjp marked this conversation as resolved.
Comment thread
bootjp marked this conversation as resolved.
Comment thread
bootjp marked this conversation as resolved.
if bytes.Compare(prev.End, curr.Start) > 0 {
return errors.WithStack(ErrEngineSnapshotRouteOverlap)
}
}
return nil
}

func cloneBytes(b []byte) []byte {
if b == nil {
return nil
Expand Down
103 changes: 103 additions & 0 deletions distribution/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"sync"
"testing"

"github.com/cockroachdb/errors"
)

func TestEngineRouteLookup(t *testing.T) {
Expand Down Expand Up @@ -231,3 +233,104 @@ func TestEngineGetIntersectingRoutes(t *testing.T) {
})
}
}

func TestEngineApplySnapshot_ReplacesRoutesAndVersion(t *testing.T) {
e := NewEngine()
e.UpdateRoute([]byte("a"), []byte("z"), 1)

if got := e.Version(); got != 0 {
t.Fatalf("expected initial version 0, got %d", got)
}

err := e.ApplySnapshot(CatalogSnapshot{
Version: 1,
Routes: []RouteDescriptor{
{RouteID: 10, Start: []byte(""), End: []byte("m"), GroupID: 1, State: RouteStateActive},
{RouteID: 11, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateWriteFenced},
},
})
if err != nil {
t.Fatalf("apply snapshot: %v", err)
}

if got := e.Version(); got != 1 {
t.Fatalf("expected version 1, got %d", got)
}

stats := e.Stats()
if len(stats) != 2 {
t.Fatalf("expected 2 routes, got %d", len(stats))
}
if stats[0].RouteID != 10 || stats[0].State != RouteStateActive {
t.Fatalf("unexpected first route metadata: %+v", stats[0])
}
if stats[1].RouteID != 11 || stats[1].State != RouteStateWriteFenced {
t.Fatalf("unexpected second route metadata: %+v", stats[1])
}
}

func TestEngineApplySnapshot_RejectsOldVersion(t *testing.T) {
e := NewEngine()

if err := e.ApplySnapshot(CatalogSnapshot{
Version: 2,
Routes: []RouteDescriptor{
{RouteID: 1, Start: []byte(""), End: nil, GroupID: 1, State: RouteStateActive},
},
}); err != nil {
Comment thread
bootjp marked this conversation as resolved.
t.Fatalf("first apply snapshot: %v", err)
}

err := e.ApplySnapshot(CatalogSnapshot{
Version: 1,
Routes: []RouteDescriptor{
{RouteID: 2, Start: []byte(""), End: nil, GroupID: 9, State: RouteStateActive},
},
})
if !errors.Is(err, ErrEngineSnapshotVersionStale) {
t.Fatalf("expected ErrEngineSnapshotVersionStale, got %v", err)
}

route, ok := e.GetRoute([]byte("k"))
if !ok {
t.Fatal("expected route after stale apply")
}
if route.GroupID != 1 || route.RouteID != 1 {
t.Fatalf("expected route to remain unchanged, got %+v", route)
}
}

func TestEngineApplySnapshot_LookupBehavior(t *testing.T) {
e := NewEngine()
err := e.ApplySnapshot(CatalogSnapshot{
Version: 1,
Routes: []RouteDescriptor{
{RouteID: 1, Start: []byte("a"), End: []byte("m"), GroupID: 1, State: RouteStateActive},
{RouteID: 2, Start: []byte("m"), End: nil, GroupID: 2, State: RouteStateActive},
},
})
if err != nil {
t.Fatalf("apply snapshot: %v", err)
}

cases := []struct {
key []byte
group uint64
expect bool
}{
{[]byte("0"), 0, false},
{[]byte("a"), 1, true},
{[]byte("b"), 1, true},
{[]byte("m"), 2, true},
{[]byte("x"), 2, true},
}
for _, c := range cases {
r, ok := e.GetRoute(c.key)
if ok != c.expect {
t.Fatalf("key %q expected ok=%v, got %v", c.key, c.expect, ok)
}
if ok && r.GroupID != c.group {
t.Fatalf("key %q expected group %d, got %d", c.key, c.group, r.GroupID)
}
}
}
Comment thread
bootjp marked this conversation as resolved.
Loading