Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 89 additions & 5 deletions internal/strategy/git/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,87 @@ import (
"github.com/block/cachew/internal/logging"
)

func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository) {
// bufferedResponseWriter buffers the response up to a threshold before writing
// to the underlying writer. Once the threshold is exceeded, the buffer is flushed
// and subsequent writes go directly through. If the threshold is never exceeded,
// the caller must call commit() explicitly, or the response is discarded (allowing
// a clean fallback to a different handler).
type bufferedResponseWriter struct {
w http.ResponseWriter
headers http.Header
buf bytes.Buffer
code int
committed bool
threshold int
}

func newBufferedResponseWriter(w http.ResponseWriter, threshold int) *bufferedResponseWriter {
return &bufferedResponseWriter{
w: w,
headers: make(http.Header),
code: http.StatusOK,
threshold: threshold,
}
}

func (b *bufferedResponseWriter) Header() http.Header {
if b.committed {
return b.w.Header()
}
return b.headers
}

func (b *bufferedResponseWriter) WriteHeader(code int) {
if !b.committed {
b.code = code
}
}

func (b *bufferedResponseWriter) Write(p []byte) (int, error) {
if b.committed {
return b.w.Write(p) //nolint:wrapcheck
}
n, _ := b.buf.Write(p) // bytes.Buffer.Write never returns an error
if b.buf.Len() >= b.threshold {
b.commit()
}
return n, nil
}

// commit flushes the buffered response to the underlying writer.
func (b *bufferedResponseWriter) commit() {
if b.committed {
return
}
b.committed = true
for k, vs := range b.headers {
for _, v := range vs {
b.w.Header().Add(k, v)
}
}
b.w.WriteHeader(b.code)
_, _ = io.Copy(b.w, &b.buf) //nolint:errcheck
}

// serveFromBackend serves the request using git http-backend against the local
// mirror. It returns true if the request should be retried against upstream —
// specifically when git upload-pack rejects the request with "not our ref",
// indicating the local mirror is missing an object the client wants (typically
// due to a concurrent force-push fetch orphaning a previously-advertised commit).
func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository) bool {
ctx := r.Context()
logger := logging.FromContext(ctx)

gitPath, err := exec.LookPath("git")
if err != nil {
httputil.ErrorResponse(w, r, http.StatusInternalServerError, "git not found in PATH")
return
return false
}

absRoot, err := filepath.Abs(s.cloneManager.Config().MirrorRoot)
if err != nil {
httputil.ErrorResponse(w, r, http.StatusInternalServerError, "failed to get absolute path")
return
return false
}

host := r.PathValue("host")
Expand All @@ -57,6 +124,15 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo
slog.String("backend_path", backendPath),
slog.String("clone_path", repo.Path()))

// Buffer up to 1 MB before committing to the client. "not our ref" errors
// are always tiny (<1 KB), so if the response stays small we can detect and
// suppress the error before any bytes reach the client, allowing a clean
// fallback to upstream. Large successful pack responses exceed the threshold
// and are streamed through normally.
const fallbackThreshold = 1 << 20 // 1 MB
bw := newBufferedResponseWriter(w, fallbackThreshold)
fallback := false

repo.WithReadLock(func() error { //nolint:errcheck,gosec
var stderrBuf bytes.Buffer

Expand Down Expand Up @@ -88,16 +164,24 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo
r2.TransferEncoding = nil
}

handler.ServeHTTP(w, r2)
handler.ServeHTTP(bw, r2)

if stderrBuf.Len() > 0 {
stderr := stderrBuf.String()
logger.ErrorContext(r.Context(), "git http-backend error",
slog.String("stderr", stderrBuf.String()),
slog.String("stderr", stderr),
slog.String("path", backendPath))
if !bw.committed && strings.Contains(stderr, "not our ref") {
fallback = true
return nil
}
}

bw.commit()
return nil
})

return fallback
}

func (s *Strategy) ensureRefsUpToDate(ctx context.Context, repo *gitclone.Repository) error {
Expand Down
53 changes: 45 additions & 8 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,7 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {

switch state {
case gitclone.StateReady:
if isInfoRefs {
if err := s.ensureRefsUpToDate(ctx, repo); err != nil {
logger.WarnContext(ctx, "Failed to ensure refs up to date",
slog.String("error", err.Error()))
}
}
s.maybeBackgroundFetch(repo)
s.serveFromBackend(w, r, repo)
s.serveReadyRepo(w, r, repo, host, pathValue, isInfoRefs)

case gitclone.StateCloning, gitclone.StateEmpty:
if state == gitclone.StateEmpty {
Expand All @@ -237,6 +230,50 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) {
}
}

func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, host, pathValue string, isInfoRefs bool) {
ctx := r.Context()
logger := logging.FromContext(ctx)

if isInfoRefs {
if err := s.ensureRefsUpToDate(ctx, repo); err != nil {
logger.WarnContext(ctx, "Failed to ensure refs up to date",
slog.String("error", err.Error()))
}
}
s.maybeBackgroundFetch(repo)

// Buffer the request body so it can be replayed if serveFromBackend
// signals a fallback to upstream (e.g. on "not our ref").
var bodyBytes []byte
if r.Body != nil && r.Body != http.NoBody {
var readErr error
bodyBytes, readErr = io.ReadAll(r.Body)
if readErr != nil {
logger.ErrorContext(ctx, "Failed to read request body",
slog.String("error", readErr.Error()))
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
r.ContentLength = int64(len(bodyBytes))
r.TransferEncoding = nil
}

if s.serveFromBackend(w, r, repo) {
// The mirror is missing the requested object — most likely a commit
// that was advertised before a concurrent force-push fetch orphaned
// it. Fall back to upstream so the client is not left with an error.
logger.InfoContext(ctx, "Falling back to upstream due to 'not our ref'",
slog.String("path", pathValue))
if bodyBytes != nil {
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
r.ContentLength = int64(len(bodyBytes))
r.TransferEncoding = nil
}
s.forwardToUpstream(w, r, host, pathValue)
}
}

// SpoolKeyForRequest returns the spool key for a request, or empty string if the
// request is not spoolable. For POST requests, the body is hashed to differentiate
// protocol v2 commands (e.g. ls-refs vs fetch) that share the same URL. The request
Expand Down
173 changes: 173 additions & 0 deletions internal/strategy/git/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,58 @@ import (
"github.com/block/cachew/internal/strategy/git"
)

// runGit runs a git command, failing the test on error.
func runGit(t *testing.T, dir string, args ...string) {
t.Helper()
cmd := exec.Command("git", args...)
if dir != "" {
cmd.Dir = dir
}
cmd.Env = append(os.Environ(),
"GIT_AUTHOR_NAME=Test",
"GIT_AUTHOR_EMAIL=test@test.com",
"GIT_COMMITTER_NAME=Test",
"GIT_COMMITTER_EMAIL=test@test.com",
)
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("git %v: %v\n%s", args, err, out)
}
}

// gitRevParse returns the full SHA for the given ref in dir.
func gitRevParse(t *testing.T, dir, ref string) string {
t.Helper()
cmd := exec.Command("git", "-C", dir, "rev-parse", ref)
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("git rev-parse %s: %v\n%s", ref, err, out)
}
return strings.TrimSpace(string(out))
}

// pktLine encodes s as a git pkt-line.
func pktLine(s string) string {
return fmt.Sprintf("%04x%s", len(s)+4, s)
}

// upstreamRedirectTransport rewrites all outbound requests to targetBaseURL,
// preserving the path and query, and counts each rewrite.
type upstreamRedirectTransport struct {
targetBaseURL string
inner http.RoundTripper
hits *atomic.Int32
}

func (t *upstreamRedirectTransport) RoundTrip(req *http.Request) (*http.Response, error) {
t.hits.Add(1)
req = req.Clone(req.Context())
req.URL.Scheme = "http"
req.URL.Host = strings.TrimPrefix(t.targetBaseURL, "http://")
req.Host = req.URL.Host
return t.inner.RoundTrip(req)
}

// testServerWithLogging creates an httptest.Server that injects a logger into the request context.
func testServerWithLogging(ctx context.Context, handler http.Handler) *httptest.Server {
wrapper := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -373,3 +425,124 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) {
t.Logf("Total upstream upload-pack requests: %d (first clone: %d)", totalCount, firstCloneCount)
assert.Equal(t, firstCloneCount, totalCount, "second clone should not have made additional upstream upload-pack requests")
}

// TestIntegrationNotOurRefFallsBackToUpstream reproduces the force-push race
// condition that causes "not our ref" errors and verifies that cachew falls back
// to upstream rather than returning the git protocol error to the client.
//
// The race is:
// 1. Client receives /info/refs showing commit A as the tip of a branch.
// 2. A concurrent background fetch runs, incorporating a force-push that makes
// commit A unreachable from all refs in the local mirror.
// 3. Client sends POST /git-upload-pack requesting commit A.
// 4. git upload-pack rejects the request: "not our ref <A>".
//
// After the fix, cachew detects the stderr error before any bytes reach the
// client and transparently forwards the request to upstream.
//
// Run with: go test -v -run TestIntegrationNotOurRefFallsBackToUpstream -tags integration -timeout 30s
func TestIntegrationNotOurRefFallsBackToUpstream(t *testing.T) {
if _, err := exec.LookPath("git"); err != nil {
t.Skip("git not found in PATH")
}

_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelDebug})
tmpDir := t.TempDir()

upstreamDir := filepath.Join(tmpDir, "upstream.git")
workDir := filepath.Join(tmpDir, "work")
clonesDir := filepath.Join(tmpDir, "clones")

// --- Build the upstream repo with an initial commit ---
runGit(t, "", "init", "--bare", upstreamDir)
runGit(t, "", "clone", upstreamDir, workDir)
err := os.WriteFile(filepath.Join(workDir, "file.txt"), []byte("initial"), 0o644)
assert.NoError(t, err)
runGit(t, workDir, "add", ".")
runGit(t, workDir, "commit", "-m", "initial commit")
runGit(t, workDir, "push", "origin", "HEAD:main")

orphanedSHA := gitRevParse(t, workDir, "HEAD")
t.Logf("Orphaned SHA will be: %s", orphanedSHA)

// --- Clone upstream as the cachew mirror ---
// The path must match what cachew derives from the URL /git/local/repo:
// host="local", repoPath="repo" → clonesDir/local/repo
mirrorPath := filepath.Join(clonesDir, "local", "repo")
runGit(t, "", "clone", "--mirror", upstreamDir, mirrorPath)

// --- Force-push a completely new history to upstream ---
// An orphan branch has no ancestors in common with the current history,
// so after fetching the mirror, orphanedSHA becomes unreachable from all refs.
runGit(t, workDir, "checkout", "--orphan", "replacement")
err = os.WriteFile(filepath.Join(workDir, "file2.txt"), []byte("replacement"), 0o644)
assert.NoError(t, err)
runGit(t, workDir, "add", ".")
runGit(t, workDir, "commit", "-m", "replacement commit")
runGit(t, workDir, "push", "--force", "origin", "replacement:main")

// Fetch the mirror so it picks up the force-push.
// After this, orphanedSHA is in the ODB but unreachable from any ref.
runGit(t, mirrorPath, "fetch", "--prune")

// Sanity-check: orphanedSHA is still in the ODB ...
catFile := exec.Command("git", "-C", mirrorPath, "cat-file", "-e", orphanedSHA)
assert.NoError(t, catFile.Run(), "orphaned SHA should still exist in the mirror ODB")

// ... but is not reachable from any branch.
branchContains := exec.Command("git", "-C", mirrorPath, "branch", "--contains", orphanedSHA)
out, _ := branchContains.CombinedOutput()
assert.Equal(t, "", strings.TrimSpace(string(out)), "orphaned SHA should not be reachable from any branch")

// --- Mock upstream: records requests, returns a minimal git flush response ---
var upstreamHits atomic.Int32
mockUpstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upstreamHits.Add(1)
t.Logf("Mock upstream received: %s %s", r.Method, r.URL.Path)
w.Header().Set("Content-Type", "application/x-git-upload-pack-result")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("0008NAK\n")) // minimal NAK response
}))
defer mockUpstream.Close()

// --- Set up cachew pointing at clonesDir ---
mux := http.NewServeMux()
gc := gitclone.NewManagerProvider(ctx, gitclone.Config{
MirrorRoot: clonesDir,
FetchInterval: 24 * time.Hour, // prevent auto-fetch during the test
}, nil)
strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc,
func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil
assert.NoError(t, err)

// Redirect all upstream proxy requests to the mock server.
var redirectHits atomic.Int32
strategy.SetHTTPTransport(&upstreamRedirectTransport{
targetBaseURL: mockUpstream.URL,
inner: http.DefaultTransport,
hits: &redirectHits,
})

cachewServer := testServerWithLogging(ctx, mux)
defer cachewServer.Close()

// --- Send a raw upload-pack POST requesting the orphaned SHA ---
// This mirrors what a git client does after receiving /info/refs that
// advertised orphanedSHA (before the force-push fetch updated the mirror).
body := pktLine("want "+orphanedSHA+"\n") + "0000" + pktLine("done\n")
req, err := http.NewRequestWithContext(ctx, http.MethodPost,
cachewServer.URL+"/git/local/repo/git-upload-pack",
strings.NewReader(body))
assert.NoError(t, err)
req.Header.Set("Content-Type", "application/x-git-upload-pack-request")

resp, err := http.DefaultClient.Do(req)
assert.NoError(t, err)
defer resp.Body.Close()
_, _ = io.Copy(io.Discard, resp.Body)

// After the fix: the request must have been forwarded to upstream.
// Before the fix: upstreamHits == 0 and this assertion fails.
assert.True(t, upstreamHits.Load() > 0,
"cachew should fall back to upstream when git upload-pack returns 'not our ref'")
}