Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func main() {

schedulerProvider := jobscheduler.NewProvider(ctx, globalConfig.SchedulerConfig)

cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider)
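// Installed via setMetaStore once newMux (config.Load) has returned the metadata backend.
// NOTE(review): git.New invokes this provider while the strategy is being constructed; confirm the setter runs before then, otherwise the "not yet initialised" error is memoised.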
metaStoreProvider, setMetaStore := metadatadb.NewLazyStoreProvider()

cr, mr, sr := newRegistries(schedulerProvider, gitManagerProvider, tokenManagerProvider, s3ClientProvider, metaStoreProvider)

// Commands
switch { //nolint:gocritic
Expand All @@ -90,8 +93,11 @@ func main() {
return
}

mux, err := newMux(ctx, cr, mr, sr, providersConfigHCL, envars)
mux, metadataBackend, err := newMux(ctx, cr, mr, sr, providersConfigHCL, envars)
fatalIfError(ctx, logger, err, "Failed to load config")
if metadataBackend != nil {
setMetaStore(metadatadb.New(ctx, metadataBackend))
}

metricsClient, err := metrics.New(ctx, globalConfig.MetricsConfig)
fatalIfError(ctx, logger, err, "Failed to create metrics client")
Expand Down Expand Up @@ -126,6 +132,7 @@ func newRegistries(
cloneManagerProvider gitclone.ManagerProvider,
tokenManagerProvider githubapp.TokenManagerProvider,
s3ClientProvider s3client.ClientProvider,
metaStoreProvider metadatadb.StoreProvider,
) (
*cache.Registry,
*metadatadb.Registry,
Expand All @@ -147,7 +154,7 @@ func newRegistries(
strategy.RegisterHermit(sr)
strategy.RegisterHost(sr)
strategy.RegisterHTTPProxy(sr)
git.Register(sr, scheduler, cloneManagerProvider, tokenManagerProvider)
git.Register(sr, scheduler, cloneManagerProvider, tokenManagerProvider, metaStoreProvider)
gomod.Register(sr, cloneManagerProvider)

return cr, mr, sr
Expand All @@ -166,7 +173,7 @@ func printSchema(kctx *kong.Context, cr *cache.Registry, mr *metadatadb.Registry
}
}

func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string) (http.Handler, error) {
func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr *strategy.Registry, providersConfigHCL *hcl.AST, vars map[string]string) (http.Handler, metadatadb.Backend, error) {
mux := http.NewServeMux()

mux.HandleFunc("GET /_liveness", func(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -209,13 +216,13 @@ func newMux(ctx context.Context, cr *cache.Registry, mr *metadatadb.Registry, sr
http.DefaultServeMux.ServeHTTP(w, r)
}))

handler, _, loaded, err := config.Load(ctx, cr, mr, sr, providersConfigHCL, mux, vars)
handler, metadataBackend, loaded, err := config.Load(ctx, cr, mr, sr, providersConfigHCL, mux, vars)
if err != nil {
return nil, errors.Errorf("load config: %w", err)
return nil, nil, errors.Errorf("load config: %w", err)
}
readiers = loaded

return handler, nil
return handler, metadataBackend, nil
}

func fatalIfError(ctx context.Context, logger *slog.Logger, err error, msg string) {
Expand Down
23 changes: 23 additions & 0 deletions internal/metadatadb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"log/slog"
"slices"
"sync"
"sync/atomic"

"github.com/alecthomas/errors"

Expand Down Expand Up @@ -75,6 +76,28 @@ type Namespace struct {
name string
}

// StoreProvider lazily resolves the *Store for components that are constructed
// before the Store exists. It returns the resolved Store, or an error when the
// Store cannot be resolved. A nil provider means no metadata is configured;
// callers must tolerate this by checking for nil before invoking it.
type StoreProvider func() (*Store, error)

// NewLazyStoreProvider builds a StoreProvider whose *Store is supplied later
// through the returned setter.
//
// The provider is memoised with sync.OnceValues: its first invocation reads
// whatever the setter has installed so far and that outcome is returned on
// every later call. The setter must therefore run before the provider is first
// invoked; if it has not, the provider yields an error and keeps yielding that
// same error for the lifetime of the process, even if the setter is called
// afterwards. This follows the sync.OnceValues provider pattern used in
// gitclone, githubapp, and s3client.
func NewLazyStoreProvider() (StoreProvider, func(*Store)) {
	// slot holds the installed Store; the atomic pointer lets the setter and
	// the provider run on different goroutines without extra locking.
	slot := new(atomic.Pointer[Store])

	resolve := func() (*Store, error) {
		if installed := slot.Load(); installed != nil {
			return installed, nil
		}
		return nil, errors.New("metadata store not yet initialised")
	}

	return sync.OnceValues(resolve), slot.Store
}

// Flush forces an immediate sync with the backend.
func (n *Namespace) Flush(ctx context.Context) error {
return errors.Wrap(n.backend.Flush(ctx, n.name), "flush namespace")
Expand Down
33 changes: 33 additions & 0 deletions internal/metadatadb/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package metadatadb_test

import (
"context"
"testing"

"github.com/alecthomas/assert/v2"

"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/metadatadb"
)

// TestLazyStoreProviderResolvesAfterSet checks that once a Store has been
// installed through the setter, the provider returns that same Store on its
// first call and on the call after it.
func TestLazyStoreProviderResolvesAfterSet(t *testing.T) {
	_, ctx := logging.Configure(context.Background(), logging.Config{})
	resolve, install := metadatadb.NewLazyStoreProvider()
	want := metadatadb.New(ctx, metadatadb.NewMemoryBackend())
	install(want)

	// The first call resolves the store; the second returns the memoised value.
	for call := 0; call < 2; call++ {
		got, err := resolve()
		assert.NoError(t, err)
		assert.Equal(t, want, got)
	}
}

// TestLazyStoreProviderErrorsBeforeSet checks that invoking the provider
// before any Store has been installed reports an error.
func TestLazyStoreProviderErrorsBeforeSet(t *testing.T) {
	// The setter is deliberately discarded so nothing is ever installed.
	resolve, _ := metadatadb.NewLazyStoreProvider()

	_, resolveErr := resolve()
	assert.Error(t, resolveErr)
}
37 changes: 35 additions & 2 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ import (
"github.com/block/cachew/internal/githubapp"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/metadatadb"
"github.com/block/cachew/internal/snapshot"
"github.com/block/cachew/internal/strategy"
)

func Register(r *strategy.Registry, scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider) {
func Register(r *strategy.Registry, scheduler jobscheduler.Provider, cloneManagerProvider gitclone.ManagerProvider, tokenManagerProvider githubapp.TokenManagerProvider, metadataStoreProvider metadatadb.StoreProvider) {
strategy.Register(r, "git", "Caches Git repositories, including tarball snapshots.", func(ctx context.Context, config Config, cache cache.Cache, mux strategy.Mux) (*Strategy, error) {
return New(ctx, config, scheduler, cache, mux, cloneManagerProvider, tokenManagerProvider)
return New(ctx, config, scheduler, cache, mux, cloneManagerProvider, tokenManagerProvider, metadataStoreProvider)
})
}

Expand Down Expand Up @@ -59,6 +60,7 @@ type Strategy struct {
coldSnapshotMu sync.Map // keyed by upstream URL, values are *coldSnapshotEntry
deferredRestoreOnce sync.Map // keyed by upstream URL, ensures at most one deferred restore per repo
metrics *gitMetrics
repoCounts *RepoCounts
ready atomic.Bool
}

Expand All @@ -70,6 +72,7 @@ func New(
mux strategy.Mux,
cloneManagerProvider gitclone.ManagerProvider,
tokenManagerProvider githubapp.TokenManagerProvider,
metadataStoreProvider metadatadb.StoreProvider,
) (*Strategy, error) {
if _, err := exec.LookPath("git"); err != nil {
return nil, errors.New("git is required but not found in PATH")
Expand Down Expand Up @@ -117,6 +120,19 @@ func New(
return nil, errors.Wrap(err, "failed to create scheduler")
}

var repoCounts *RepoCounts
if metadataStoreProvider != nil {
store, err := metadataStoreProvider()
if err != nil {
return nil, errors.Wrap(err, "resolve metadata store")
}
if store != nil {
repoCounts = NewRepoCounts(store.Namespace("git"))
logger.InfoContext(ctx, "Per-repo clone histogram enabled",
"retention_days", repoCounts.retentionDays)
}
}

m := newGitMetrics()

s := &Strategy{
Expand All @@ -129,6 +145,16 @@ func New(
spools: make(map[string]*RepoSpools),
tokenManager: tokenManager,
metrics: m,
repoCounts: repoCounts,
}

if s.repoCounts != nil {
s.scheduler.SubmitPeriodicJob("repo-counts-reaper", "reap-repo-counts", defaultRepoCountsReapInterval, func(ctx context.Context) error {
if deleted := s.repoCounts.Reap(); deleted > 0 {
logging.FromContext(ctx).InfoContext(ctx, "Reaped stale repo clone counts", "deleted", deleted)
}
return nil
})
}
// Run startup fetches in the background so the HTTP listener (and
// /_liveness) come up immediately. /_readiness gates on Ready() so the
Expand Down Expand Up @@ -301,6 +327,13 @@ func (s *Strategy) handleGitRequest(w http.ResponseWriter, r *http.Request, host
return
}

// Increment after GetOrCreate so unvalidated URLs can't bloat the keyspace.
if isClone, cerr := RequestIsClone(pathValue, r); cerr != nil {
logger.WarnContext(ctx, "Failed to inspect upload-pack body for clone counting", "error", cerr)
} else if isClone {
s.repoCounts.IncrementClone(upstreamURL)
}

state := repo.State()
isInfoRefs := strings.HasSuffix(pathValue, "/info/refs")

Expand Down
41 changes: 34 additions & 7 deletions internal/strategy/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/block/cachew/internal/githubapp"
"github.com/block/cachew/internal/jobscheduler"
"github.com/block/cachew/internal/logging"
"github.com/block/cachew/internal/metadatadb"
"github.com/block/cachew/internal/strategy/git"
)

Expand Down Expand Up @@ -98,7 +99,7 @@ func TestNew(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mux := newTestMux()
cm := gitclone.NewManagerProvider(ctx, tt.config, nil)
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
if tt.wantError != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.wantError)
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestNewWithExistingCloneOnDisk(t *testing.T) {
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)
}

Expand All @@ -220,7 +221,7 @@ func TestNewIsReadyAfterWarm(t *testing.T) {
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil)
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
s, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

deadline := time.Now().Add(5 * time.Second)
Expand Down Expand Up @@ -249,7 +250,7 @@ func TestIntegrationWithMockUpstream(t *testing.T) {
MirrorRoot: tmpDir,
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

// Verify handlers exist
Expand Down Expand Up @@ -278,7 +279,7 @@ func TestNewMissingGitBinary(t *testing.T) {
MirrorRoot: filepath.Join(tmpDir, "clones"),
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.Error(t, err)
assert.Contains(t, err.Error(), "git")
}
Expand All @@ -302,7 +303,7 @@ func TestNewMissingSnapshotBinaries(t *testing.T) {
MirrorRoot: filepath.Join(tmpDir, "clones-missing-tar"),
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.Error(t, err)
assert.Contains(t, err.Error(), "tar")
})
Expand All @@ -318,12 +319,38 @@ func TestNewMissingSnapshotBinaries(t *testing.T) {
MirrorRoot: filepath.Join(tmpDir, "clones-missing-zstd"),
FetchInterval: 15,
}, nil)
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err := git.New(ctx, git.Config{SnapshotInterval: 1}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.Error(t, err)
assert.Contains(t, err.Error(), "zstd")
})
}

// TestNewWithMetadataProvider constructs the git strategy with a counting
// metadata store provider and asserts that construction succeeds and that the
// provider was consulted exactly once. What the resulting RepoCounts does is
// covered in repocounts_test.go.
func TestNewWithMetadataProvider(t *testing.T) {
	_, ctx := logging.Configure(context.Background(), logging.Config{})

	store := metadatadb.New(ctx, metadatadb.NewMemoryBackend())
	mux := newTestMux()
	cloneManagers := gitclone.NewManagerProvider(ctx, gitclone.Config{
		MirrorRoot:    filepath.Join(t.TempDir(), "clones"),
		FetchInterval: 15,
	}, nil)

	// No GitHub App token manager is configured for this test.
	noTokenManager := func() (*githubapp.TokenManager, error) {
		return nil, nil //nolint:nilnil
	}

	// Count invocations so the test can pin how often New resolves the store.
	calls := 0
	countingProvider := func() (*metadatadb.Store, error) {
		calls++
		return store, nil
	}

	_, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, cloneManagers, noTokenManager, countingProvider)
	assert.NoError(t, err)
	assert.Equal(t, 1, calls, "metadata store provider should be invoked exactly once")
}

func TestParseGitRefs(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{})
_ = ctx
Expand Down
10 changes: 5 additions & 5 deletions internal/strategy/git/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestIntegrationGitCloneViaProxy(t *testing.T) {
mux := http.NewServeMux()
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

// Start a test server with logging middleware
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestIntegrationGitFetchViaProxy(t *testing.T) {
mux := http.NewServeMux()
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestIntegrationPushForwardsToUpstream(t *testing.T) {
}, nil)
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
_, err = git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

server := testServerWithLogging(ctx, mux)
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) {
}, nil)
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

strategy.SetHTTPTransport(&countingTransport{
Expand Down Expand Up @@ -522,7 +522,7 @@ func TestIntegrationNotOurRefFallsBackToUpstream(t *testing.T) {
memCache, err := cache.NewMemory(ctx, cache.MemoryConfig{MaxTTL: time.Hour})
assert.NoError(t, err)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), memCache, mux, gc,
func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)

// Redirect all upstream proxy requests to the mock server.
Expand Down
4 changes: 2 additions & 2 deletions internal/strategy/git/repack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestRepackInterval(t *testing.T) {
}, nil)
s, err := git.New(ctx, git.Config{
RepackInterval: tt.repackInterval,
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)
assert.True(t, s != nil)
})
Expand All @@ -65,7 +65,7 @@ func TestRepackScheduledForExistingRepos(t *testing.T) {
}, nil)
s, err := git.New(ctx, git.Config{
RepackInterval: 24 * time.Hour,
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
}, newTestScheduler(ctx, t), nil, mux, cm, func() (*githubapp.TokenManager, error) { return nil, nil }, nil) //nolint:nilnil
assert.NoError(t, err)
assert.True(t, s != nil)
}
Loading