Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,25 @@ func fatalIfError(ctx context.Context, logger *slog.Logger, err error, msg strin
os.Exit(1)
}

// extractPathPrefix returns the label used to group requests by route.
//
// For most paths this is simply the first path segment:
//
//	/git/github.com/org/repo -> "git"
//	/gomod/...               -> "gomod"
//
// Paths under /api/ are broken down one level further so that object, stats
// and namespaces requests are not lumped together. A leading version segment
// (the letter "v" followed by a digit, e.g. v1 or v2beta1) is skipped:
//
//	/api/v1/object/... -> "api/object"
//	/api/object/...    -> "api/object"
//	/api/validate/...  -> "api/validate" (starts with "v" but is not a version)
//	/api, /api/v1      -> "api"
//
// An empty path or "/" yields "".
func extractPathPrefix(path string) string {
	// Only "api", an optional version and the resource name are ever
	// inspected, so there is no need to split the remainder of the path.
	// SplitN with n > 0 always returns at least one element.
	parts := strings.SplitN(strings.Trim(path, "/"), "/", 4)
	if parts[0] != "api" {
		return parts[0]
	}

	rest := parts[1:]
	if len(rest) > 0 && isAPIVersionSegment(rest[0]) {
		rest = rest[1:]
	}
	// Guard against an empty segment (e.g. "/api//object") so the label
	// never ends in a dangling slash.
	if len(rest) == 0 || rest[0] == "" {
		return "api"
	}
	return "api/" + rest[0]
}

// isAPIVersionSegment reports whether seg looks like an API version such as
// "v1" or "v2beta1": the letter 'v' immediately followed by an ASCII digit.
func isAPIVersionSegment(seg string) bool {
	return len(seg) >= 2 && seg[0] == 'v' && seg[1] >= '0' && seg[1] <= '9'
}

func newServer(
Expand Down
31 changes: 31 additions & 0 deletions cmd/cachewd/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestExtractPathPrefix(t *testing.T) {
tests := []struct {
path string
want string
}{
{"", ""},
{"/", ""},
{"/git/github.com/org/repo/info/refs", "git"},
{"/gomod/proxy.golang.org/x/mod/@latest", "gomod"},
{"/hermit/packages/go-1.22.0.tar.gz", "hermit"},
{"/api/v1/object/brew/some-key", "api/object"},
{"/api/v1/stats", "api/stats"},
{"/api/v1/namespaces", "api/namespaces"},
{"/api/object/ns/key", "api/object"},
{"/api", "api"},
{"/api/", "api"},
}
for _, tt := range tests {
t.Run(tt.path, func(t *testing.T) {
assert.Equal(t, tt.want, extractPathPrefix(tt.path))
})
}
}
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/minio/minio-go/v7 v7.0.100
github.com/open-policy-agent/opa v1.15.2
github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
go.etcd.io/bbolt v1.4.3
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0
go.opentelemetry.io/contrib/instrumentation/runtime v0.68.0
Expand All @@ -33,6 +34,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
github.com/dlclark/regexp2 v1.11.5 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand All @@ -59,6 +61,7 @@ require (
github.com/minio/md5-simd v1.1.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/philhofer/fwd v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/otlptranslator v1.0.0 // indirect
Expand Down Expand Up @@ -87,6 +90,7 @@ require (
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
google.golang.org/grpc v1.80.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.6.0 // indirect
)

Expand Down
15 changes: 15 additions & 0 deletions internal/gitclone/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -85,6 +86,19 @@ type Repository struct {
refCheckValid bool
fetchSem chan struct{}
credentialProvider CredentialProvider
lastAccessed atomic.Int64
}

// TouchAccessed records the current time as the repository's last-access
// timestamp. The value is kept as Unix nanoseconds in an atomic counter.
func (r *Repository) TouchAccessed() {
	now := time.Now()
	r.lastAccessed.Store(now.UnixNano())
}

// LastAccessed returns the time stored by the most recent write to the
// last-access counter. It returns the zero time.Time when the counter is
// still zero, i.e. no access has been recorded.
func (r *Repository) LastAccessed() time.Time {
	if nanos := r.lastAccessed.Load(); nanos != 0 {
		return time.Unix(0, nanos)
	}
	return time.Time{}
}

type Manager struct {
Expand Down Expand Up @@ -199,6 +213,7 @@ func (m *Manager) GetOrCreate(_ context.Context, upstreamURL string) (*Repositor
fetchSem: make(chan struct{}, 1),
credentialProvider: m.credentialProvider,
}
repo.lastAccessed.Store(time.Now().UnixNano())

headFile := filepath.Join(clonePath, "HEAD")
if _, err := os.Stat(headFile); err == nil {
Expand Down
87 changes: 77 additions & 10 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/alecthomas/errors"
Expand All @@ -22,6 +23,16 @@ type Config struct {
SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
}

// idledPeriodicJob stores enough information to re-arm a periodic job that was
// stopped due to queue inactivity. The fields mirror the arguments that are
// passed back to submitPeriodicJob when the queue is touched again.
type idledPeriodicJob struct {
// queue is the name of the queue the job belongs to; Touch matches on it.
queue string
// id is the job identifier within the queue.
id string
// interval is the delay between successive runs of the job.
interval time.Duration
// run is the job body to execute on each run.
run func(ctx context.Context) error
// idleTimeout is how long the queue may go untouched before the job
// stops re-arming.
idleTimeout time.Duration
}

type queueJob struct {
id string
queue string
Expand Down Expand Up @@ -49,9 +60,14 @@ type Scheduler interface {
// Jobs run concurrently across queues, but never within a queue.
Submit(queue, id string, run func(ctx context.Context) error)
// SubmitPeriodicJob submits a job to the queue that runs immediately, and then periodically after the interval.
// If idleTimeout is provided, the job stops re-arming when the queue has not been touched (via Touch) for
// longer than the timeout. Calling Touch on an idle queue re-arms all its stopped periodic jobs.
//
// Jobs run concurrently across queues, but never within a queue.
SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error)
SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout ...time.Duration)
// Touch records activity for a queue, resetting its idle timer. If the queue had periodic jobs that were
// stopped due to inactivity, they are re-scheduled.
Touch(queue string)
}

type prefixedScheduler struct {
Expand All @@ -63,8 +79,12 @@ func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Contex
p.scheduler.Submit(queue, p.prefix+id, run)
}

func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run)
// SubmitPeriodicJob forwards to the wrapped scheduler, prepending the prefix
// to the job id. The queue name, interval, run function and optional
// idleTimeout are passed through unchanged.
func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout ...time.Duration) {
p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run, idleTimeout...)
}

// Touch forwards the activity signal to the wrapped scheduler. The queue name
// is passed through unchanged; the prefix is applied to job ids only, not to
// queue names.
func (p *prefixedScheduler) Touch(queue string) {
p.scheduler.Touch(queue)
}

func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
Expand All @@ -85,11 +105,13 @@ type RootScheduler struct {
// ctx is cancelled when the scheduler is shutting down. Periodic re-arm
// goroutines select on it so they exit cleanly instead of submitting to a
// dead scheduler.
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
wg sync.WaitGroup
store ScheduleStore
metrics *schedulerMetrics
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
wg sync.WaitGroup
store ScheduleStore
metrics *schedulerMetrics
lastTouched sync.Map // queue -> *atomic.Int64 (unix nanos)
idledJobs sync.Map // jobKey -> *idledPeriodicJob
}

var _ Scheduler = &RootScheduler{}
Expand Down Expand Up @@ -173,7 +195,18 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e
q.cond.Signal()
}

func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
// SubmitPeriodicJob schedules run on the given queue and re-arms it after
// every interval. Only the first idleTimeout value, if any, is used. When it
// is positive the queue is marked as freshly touched before the job is
// submitted, so the idle timer starts from the moment of submission.
func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout ...time.Duration) {
	// A zero timeout means no idle timeout was requested.
	timeout := time.Duration(0)
	if len(idleTimeout) != 0 {
		timeout = idleTimeout[0]
	}

	if idleTrackingEnabled := timeout > 0; idleTrackingEnabled {
		q.touchQueue(queue)
	}

	q.submitPeriodicJob(queue, id, interval, run, timeout)
}

func (q *RootScheduler) submitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error, idleTimeout time.Duration) {
if q.ctx.Err() != nil {
return
}
Expand All @@ -192,7 +225,14 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
// to wake and submit to a dead scheduler. The new pod's
// warmExistingRepos re-registers periodic jobs on startup.
go q.sleepThenSubmit(interval, func() {
q.SubmitPeriodicJob(queue, id, interval, run)
if idleTimeout > 0 && q.isQueueIdle(queue, idleTimeout) {
logging.FromContext(ctx).InfoContext(ctx, "Periodic job idled out", "queue", queue, "job", id)
q.idledJobs.Store(key, &idledPeriodicJob{
queue: queue, id: id, interval: interval, run: run, idleTimeout: idleTimeout,
})
return
}
q.submitPeriodicJob(queue, id, interval, run, idleTimeout)
})
return errors.WithStack(err)
})
Expand All @@ -204,6 +244,33 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
go q.sleepThenSubmit(delay, submit)
}

// Touch records activity for a queue, resetting its idle timer. If the queue
// had periodic jobs that were stopped due to inactivity, they are re-scheduled.
//
// Each idled job is claimed with LoadAndDelete before it is re-submitted.
// When several goroutines touch the same queue at once they can all observe
// the same entry during Range; only the caller that actually removed the
// entry re-arms the job, so it is never scheduled more than once.
func (q *RootScheduler) Touch(queue string) {
	q.touchQueue(queue)
	q.idledJobs.Range(func(key, value any) bool {
		job, ok := value.(*idledPeriodicJob)
		if !ok || job.queue != queue {
			return true
		}
		// Atomically take ownership of the entry; a concurrent Touch that
		// lost the race sees claimed == false and leaves the job alone.
		if _, claimed := q.idledJobs.LoadAndDelete(key); claimed {
			q.submitPeriodicJob(job.queue, job.id, job.interval, job.run, job.idleTimeout)
		}
		return true
	})
}

// touchQueue stamps queue with the current time (Unix nanoseconds) in
// lastTouched.
//
// The common case, where the queue is already known, is served by a plain
// Load so no new counter is allocated per call. When the queue is seen for
// the first time the counter is initialised with the timestamp before it is
// published, so a concurrent isQueueIdle never observes a zero timestamp and
// mistakes a freshly touched queue for an idle one.
func (q *RootScheduler) touchQueue(queue string) {
	now := time.Now().UnixNano()
	if val, ok := q.lastTouched.Load(queue); ok {
		val.(*atomic.Int64).Store(now)
		return
	}

	stamp := new(atomic.Int64)
	stamp.Store(now)
	// Another goroutine may have registered the queue in the meantime; in
	// that case update the counter that won instead of the discarded one.
	if val, loaded := q.lastTouched.LoadOrStore(queue, stamp); loaded {
		val.(*atomic.Int64).Store(now)
	}
}

// isQueueIdle reports whether more than timeout has elapsed since queue was
// last touched. A queue with no recorded touch is considered idle.
func (q *RootScheduler) isQueueIdle(queue string, timeout time.Duration) bool {
	entry, found := q.lastTouched.Load(queue)
	if found {
		lastTouch := time.Unix(0, entry.(*atomic.Int64).Load())
		return time.Since(lastTouch) > timeout
	}
	return true
}

// sleepThenSubmit waits for d, then runs fn — unless the scheduler is
// shutting down, in which case it returns immediately.
func (q *RootScheduler) sleepThenSubmit(d time.Duration, fn func()) {
Expand Down
32 changes: 32 additions & 0 deletions internal/jobscheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,38 @@ func TestJobSchedulerPeriodicJob(t *testing.T) {
}, "periodic job should execute multiple times")
}

// TestJobSchedulerPeriodicJobStopsOnIdle verifies that a periodic job
// submitted with an idle timeout is not re-armed once its queue has gone
// untouched for longer than that timeout.
func TestJobSchedulerPeriodicJobStopsOnIdle(t *testing.T) {
	_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 1})
	assert.NoError(t, err)
	t.Cleanup(func() { scheduler.Close() })

	var executions atomic.Int32
	done := make(chan struct{})

	// Submit with a nanosecond idle timeout. The initial SubmitPeriodicJob
	// touches the queue so the first execution runs. By the time the re-arm
	// check fires, the nanosecond timeout has elapsed and the job stops.
	scheduler.SubmitPeriodicJob("queue1", "idle-job", time.Nanosecond, func(_ context.Context) error {
		// Signal only on the first run. If the job is wrongly re-armed, a
		// second unconditional close(done) would panic inside the worker
		// goroutine and abort the test binary; counting the run instead
		// lets the assertion below report the regression.
		if executions.Add(1) == 1 {
			close(done)
		}
		return nil
	}, time.Nanosecond)

	<-done

	// Cancel and drain the scheduler. If the job was re-armed, the worker
	// would execute it before Wait returns because concurrency is 1.
	cancel()
	scheduler.Wait()

	assert.Equal(t, int32(1), executions.Load(),
		"periodic job should not re-arm after idle timeout")
}

func TestJobSchedulerPeriodicJobWithError(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
Expand Down
7 changes: 7 additions & 0 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Config struct {
RepackInterval time.Duration `hcl:"repack-interval,optional" help:"How often to run full repack. 0 disables." default:"0"`
ZstdThreads int `hcl:"zstd-threads,optional" help:"Threads for zstd compression/decompression. 0 = all CPU cores; useful for short-lived CLI invocations but risky on a long-running server where multiple snapshot/restore operations can run concurrently." default:"4"`
BundleCacheTTL time.Duration `hcl:"bundle-cache-ttl,optional" help:"TTL of cached server-side git bundles." default:"2h"`
IdleTimeout time.Duration `hcl:"idle-timeout,optional" help:"Stop periodic jobs for repos with no client requests for this duration. 0 disables." default:"72h"`
}

type Strategy struct {
Expand Down Expand Up @@ -197,6 +198,11 @@ func (s *Strategy) Ready() bool {
return s.ready.Load()
}

// touchRepo records client activity for repo: it updates the repository's
// last-accessed timestamp and touches the scheduler queue named after the
// repository's upstream URL.
func (s *Strategy) touchRepo(repo *gitclone.Repository) {
repo.TouchAccessed()
// The scheduler queue is keyed by the repo's upstream URL.
s.scheduler.Touch(repo.UpstreamURL())
}

// SetMetadataStore enables the per-repo clone histogram and schedules its
// daily reaper. Called by config.Load after the metadata backend is built.
func (s *Strategy) SetMetadataStore(store *metadatadb.Store) {
Expand Down Expand Up @@ -332,6 +338,7 @@ func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
s.touchRepo(repo)

// Increment after GetOrCreate so unvalidated URLs can't bloat the keyspace.
if isClone, cerr := RequestIsClone(pathValue, r); cerr != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/strategy/git/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (s *Strategy) handleEnsureRefs(w http.ResponseWriter, r *http.Request, host
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
s.touchRepo(repo)

if repo.State() != gitclone.StateReady {
if err := s.ensureCloneReady(ctx, repo); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/strategy/git/repack.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ func (s *Strategy) scheduleRepackJobs(repo *gitclone.Repository) {
}
s.metrics.recordOperation(ctx, "repack", status, time.Since(start))
return errors.Wrap(err, "repack")
})
}, s.config.IdleTimeout)
}
Loading