diff --git a/internal/strategy/git/backend.go b/internal/strategy/git/backend.go index 7c9d799..b22e6e6 100644 --- a/internal/strategy/git/backend.go +++ b/internal/strategy/git/backend.go @@ -19,20 +19,87 @@ import ( "github.com/block/cachew/internal/logging" ) -func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository) { +// bufferedResponseWriter buffers the response up to a threshold before writing +// to the underlying writer. Once the threshold is exceeded, the buffer is flushed +// and subsequent writes go directly through. If the threshold is never exceeded, +// the caller must call commit() explicitly, or the response is discarded (allowing +// a clean fallback to a different handler). +type bufferedResponseWriter struct { + w http.ResponseWriter + headers http.Header + buf bytes.Buffer + code int + committed bool + threshold int +} + +func newBufferedResponseWriter(w http.ResponseWriter, threshold int) *bufferedResponseWriter { + return &bufferedResponseWriter{ + w: w, + headers: make(http.Header), + code: http.StatusOK, + threshold: threshold, + } +} + +func (b *bufferedResponseWriter) Header() http.Header { + if b.committed { + return b.w.Header() + } + return b.headers +} + +func (b *bufferedResponseWriter) WriteHeader(code int) { + if !b.committed { + b.code = code + } +} + +func (b *bufferedResponseWriter) Write(p []byte) (int, error) { + if b.committed { + return b.w.Write(p) //nolint:wrapcheck + } + n, _ := b.buf.Write(p) // bytes.Buffer.Write never returns an error + if b.buf.Len() >= b.threshold { + b.commit() + } + return n, nil +} + +// commit flushes the buffered response to the underlying writer. +func (b *bufferedResponseWriter) commit() { + if b.committed { + return + } + b.committed = true + for k, vs := range b.headers { + for _, v := range vs { + b.w.Header().Add(k, v) + } + } + b.w.WriteHeader(b.code) + _, _ = io.Copy(b.w, &b.buf) //nolint:errcheck +} + +// serveFromBackend serves the request using git http-backend against the local +// mirror. It returns true if the request should be retried against upstream — +// specifically when git upload-pack rejects the request with "not our ref", +// indicating the local mirror is missing an object the client wants (typically +// due to a concurrent force-push fetch orphaning a previously-advertised commit). +func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository) bool { ctx := r.Context() logger := logging.FromContext(ctx) gitPath, err := exec.LookPath("git") if err != nil { httputil.ErrorResponse(w, r, http.StatusInternalServerError, "git not found in PATH") - return + return false } absRoot, err := filepath.Abs(s.cloneManager.Config().MirrorRoot) if err != nil { httputil.ErrorResponse(w, r, http.StatusInternalServerError, "failed to get absolute path") - return + return false } host := r.PathValue("host") @@ -57,6 +124,15 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo slog.String("backend_path", backendPath), slog.String("clone_path", repo.Path())) + // Buffer up to 1 MB before committing to the client. "not our ref" errors + // are always tiny (<1 KB), so if the response stays small we can detect and + // suppress the error before any bytes reach the client, allowing a clean + // fallback to upstream. Large successful pack responses exceed the threshold + // and are streamed through normally. + const fallbackThreshold = 1 << 20 // 1 MB + bw := newBufferedResponseWriter(w, fallbackThreshold) + fallback := false + repo.WithReadLock(func() error { //nolint:errcheck,gosec var stderrBuf bytes.Buffer @@ -88,16 +164,24 @@ func (s *Strategy) serveFromBackend(w http.ResponseWriter, r *http.Request, repo r2.TransferEncoding = nil } - handler.ServeHTTP(w, r2) + handler.ServeHTTP(bw, r2) if stderrBuf.Len() > 0 { + stderr := stderrBuf.String() logger.ErrorContext(r.Context(), "git http-backend error", - slog.String("stderr", stderrBuf.String()), + slog.String("stderr", stderr), slog.String("path", backendPath)) + if !bw.committed && strings.Contains(stderr, "not our ref") { + fallback = true + return nil + } } + bw.commit() return nil }) + + return fallback } func (s *Strategy) ensureRefsUpToDate(ctx context.Context, repo *gitclone.Repository) error { diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 9852980..e1faf70 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -216,14 +216,7 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { switch state { case gitclone.StateReady: - if isInfoRefs { - if err := s.ensureRefsUpToDate(ctx, repo); err != nil { - logger.WarnContext(ctx, "Failed to ensure refs up to date", - slog.String("error", err.Error())) - } - } - s.maybeBackgroundFetch(repo) - s.serveFromBackend(w, r, repo) + s.serveReadyRepo(w, r, repo, host, pathValue, isInfoRefs) case gitclone.StateCloning, gitclone.StateEmpty: if state == gitclone.StateEmpty { @@ -237,6 +230,50 @@ func (s *Strategy) handleRequest(w http.ResponseWriter, r *http.Request) { } } +func (s *Strategy) serveReadyRepo(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, host, pathValue string, isInfoRefs bool) { + ctx := r.Context() + logger := logging.FromContext(ctx) + + if isInfoRefs { + if err := s.ensureRefsUpToDate(ctx, repo); err != nil { + logger.WarnContext(ctx, "Failed to ensure refs up to date", + slog.String("error", err.Error())) + } + } + s.maybeBackgroundFetch(repo) + + // Buffer the request body so it can be replayed if serveFromBackend + // signals a fallback to upstream (e.g. on "not our ref"). + var bodyBytes []byte + if r.Body != nil && r.Body != http.NoBody { + var readErr error + bodyBytes, readErr = io.ReadAll(r.Body) + if readErr != nil { + logger.ErrorContext(ctx, "Failed to read request body", + slog.String("error", readErr.Error())) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + r.ContentLength = int64(len(bodyBytes)) + r.TransferEncoding = nil + } + + if s.serveFromBackend(w, r, repo) { + // The mirror is missing the requested object — most likely a commit + // that was advertised before a concurrent force-push fetch orphaned + // it. Fall back to upstream so the client is not left with an error. + logger.InfoContext(ctx, "Falling back to upstream due to 'not our ref'", + slog.String("path", pathValue)) + if bodyBytes != nil { + r.Body = io.NopCloser(bytes.NewReader(bodyBytes)) + r.ContentLength = int64(len(bodyBytes)) + r.TransferEncoding = nil + } + s.forwardToUpstream(w, r, host, pathValue) + } +} + // SpoolKeyForRequest returns the spool key for a request, or empty string if the // request is not spoolable. For POST requests, the body is hashed to differentiate // protocol v2 commands (e.g. ls-refs vs fetch) that share the same URL. The request diff --git a/internal/strategy/git/integration_test.go b/internal/strategy/git/integration_test.go index 02507aa..7633ae8 100644 --- a/internal/strategy/git/integration_test.go +++ b/internal/strategy/git/integration_test.go @@ -25,6 +25,58 @@ import ( "github.com/block/cachew/internal/strategy/git" ) +// runGit runs a git command, failing the test on error. +func runGit(t *testing.T, dir string, args ...string) { + t.Helper() + cmd := exec.Command("git", args...) + if dir != "" { + cmd.Dir = dir + } + cmd.Env = append(os.Environ(), + "GIT_AUTHOR_NAME=Test", + "GIT_AUTHOR_EMAIL=test@test.com", + "GIT_COMMITTER_NAME=Test", + "GIT_COMMITTER_EMAIL=test@test.com", + ) + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("git %v: %v\n%s", args, err, out) + } +} + +// gitRevParse returns the full SHA for the given ref in dir. +func gitRevParse(t *testing.T, dir, ref string) string { + t.Helper() + cmd := exec.Command("git", "-C", dir, "rev-parse", ref) + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("git rev-parse %s: %v\n%s", ref, err, out) + } + return strings.TrimSpace(string(out)) +} + +// pktLine encodes s as a git pkt-line. +func pktLine(s string) string { + return fmt.Sprintf("%04x%s", len(s)+4, s) +} + +// upstreamRedirectTransport rewrites all outbound requests to targetBaseURL, +// preserving the path and query, and counts each rewrite. +type upstreamRedirectTransport struct { + targetBaseURL string + inner http.RoundTripper + hits *atomic.Int32 +} + +func (t *upstreamRedirectTransport) RoundTrip(req *http.Request) (*http.Response, error) { + t.hits.Add(1) + req = req.Clone(req.Context()) + req.URL.Scheme = "http" + req.URL.Host = strings.TrimPrefix(t.targetBaseURL, "http://") + req.Host = req.URL.Host + return t.inner.RoundTrip(req) +} + // testServerWithLogging creates an httptest.Server that injects a logger into the request context. func testServerWithLogging(ctx context.Context, handler http.Handler) *httptest.Server { wrapper := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -373,3 +425,124 @@ func TestIntegrationSpoolReusesDuringClone(t *testing.T) { t.Logf("Total upstream upload-pack requests: %d (first clone: %d)", totalCount, firstCloneCount) assert.Equal(t, firstCloneCount, totalCount, "second clone should not have made additional upstream upload-pack requests") } + +// TestIntegrationNotOurRefFallsBackToUpstream reproduces the force-push race +// condition that causes "not our ref" errors and verifies that cachew falls back +// to upstream rather than returning the git protocol error to the client. +// +// The race is: +// 1. Client receives /info/refs showing commit A as the tip of a branch. +// 2. A concurrent background fetch runs, incorporating a force-push that makes +// commit A unreachable from all refs in the local mirror. +// 3. Client sends POST /git-upload-pack requesting commit A. +// 4. git upload-pack rejects the request: "not our ref ". +// +// After the fix, cachew detects the stderr error before any bytes reach the +// client and transparently forwards the request to upstream. +// +// Run with: go test -v -run TestIntegrationNotOurRefFallsBackToUpstream -tags integration -timeout 30s +func TestIntegrationNotOurRefFallsBackToUpstream(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found in PATH") + } + + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelDebug}) + tmpDir := t.TempDir() + + upstreamDir := filepath.Join(tmpDir, "upstream.git") + workDir := filepath.Join(tmpDir, "work") + clonesDir := filepath.Join(tmpDir, "clones") + + // --- Build the upstream repo with an initial commit --- + runGit(t, "", "init", "--bare", upstreamDir) + runGit(t, "", "clone", upstreamDir, workDir) + err := os.WriteFile(filepath.Join(workDir, "file.txt"), []byte("initial"), 0o644) + assert.NoError(t, err) + runGit(t, workDir, "add", ".") + runGit(t, workDir, "commit", "-m", "initial commit") + runGit(t, workDir, "push", "origin", "HEAD:main") + + orphanedSHA := gitRevParse(t, workDir, "HEAD") + t.Logf("Orphaned SHA will be: %s", orphanedSHA) + + // --- Clone upstream as the cachew mirror --- + // The path must match what cachew derives from the URL /git/local/repo: + // host="local", repoPath="repo" → clonesDir/local/repo + mirrorPath := filepath.Join(clonesDir, "local", "repo") + runGit(t, "", "clone", "--mirror", upstreamDir, mirrorPath) + + // --- Force-push a completely new history to upstream --- + // An orphan branch has no ancestors in common with the current history, + // so after fetching the mirror, orphanedSHA becomes unreachable from all refs. + runGit(t, workDir, "checkout", "--orphan", "replacement") + err = os.WriteFile(filepath.Join(workDir, "file2.txt"), []byte("replacement"), 0o644) + assert.NoError(t, err) + runGit(t, workDir, "add", ".") + runGit(t, workDir, "commit", "-m", "replacement commit") + runGit(t, workDir, "push", "--force", "origin", "replacement:main") + + // Fetch the mirror so it picks up the force-push. + // After this, orphanedSHA is in the ODB but unreachable from any ref. + runGit(t, mirrorPath, "fetch", "--prune") + + // Sanity-check: orphanedSHA is still in the ODB ... + catFile := exec.Command("git", "-C", mirrorPath, "cat-file", "-e", orphanedSHA) + assert.NoError(t, catFile.Run(), "orphaned SHA should still exist in the mirror ODB") + + // ... but is not reachable from any branch. + branchContains := exec.Command("git", "-C", mirrorPath, "branch", "--contains", orphanedSHA) + out, _ := branchContains.CombinedOutput() + assert.Equal(t, "", strings.TrimSpace(string(out)), "orphaned SHA should not be reachable from any branch") + + // --- Mock upstream: records requests, returns a minimal git flush response --- + var upstreamHits atomic.Int32 + mockUpstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upstreamHits.Add(1) + t.Logf("Mock upstream received: %s %s", r.Method, r.URL.Path) + w.Header().Set("Content-Type", "application/x-git-upload-pack-result") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("0008NAK\n")) // minimal NAK response + })) + defer mockUpstream.Close() + + // --- Set up cachew pointing at clonesDir --- + mux := http.NewServeMux() + gc := gitclone.NewManagerProvider(ctx, gitclone.Config{ + MirrorRoot: clonesDir, + FetchInterval: 24 * time.Hour, // prevent auto-fetch during the test + }, nil) + strategy, err := git.New(ctx, git.Config{}, newTestScheduler(ctx, t), nil, mux, gc, + func() (*githubapp.TokenManager, error) { return nil, nil }) //nolint:nilnil + assert.NoError(t, err) + + // Redirect all upstream proxy requests to the mock server. + var redirectHits atomic.Int32 + strategy.SetHTTPTransport(&upstreamRedirectTransport{ + targetBaseURL: mockUpstream.URL, + inner: http.DefaultTransport, + hits: &redirectHits, + }) + + cachewServer := testServerWithLogging(ctx, mux) + defer cachewServer.Close() + + // --- Send a raw upload-pack POST requesting the orphaned SHA --- + // This mirrors what a git client does after receiving /info/refs that + // advertised orphanedSHA (before the force-push fetch updated the mirror). + body := pktLine("want "+orphanedSHA+"\n") + "0000" + pktLine("done\n") + req, err := http.NewRequestWithContext(ctx, http.MethodPost, + cachewServer.URL+"/git/local/repo/git-upload-pack", + strings.NewReader(body)) + assert.NoError(t, err) + req.Header.Set("Content-Type", "application/x-git-upload-pack-request") + + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + + // After the fix: the request must have been forwarded to upstream. + // Before the fix: upstreamHits == 0 and this assertion fails. + assert.True(t, upstreamHits.Load() > 0, + "cachew should fall back to upstream when git upload-pack returns 'not our ref'") +}