From b685fd84fafd9c1a3bf5b785e77fcad3f8d485a0 Mon Sep 17 00:00:00 2001
From: Vikrant Puppala
Date: Tue, 12 May 2026 09:13:50 +0000
Subject: [PATCH] Fix CloudFetch goroutine leak that retains Arrow buffers after Close

The cloudFetchDownloadTask goroutine sends its result on an unbuffered
channel without respecting context cancellation. When the iterator is
closed mid-download (timeout, error, or consumer dropping the result
set), in-flight goroutines that have already finished their HTTP read
sit blocked on the send forever, pinning the downloaded buffer in the Go
heap. Under bursts of large CloudFetch queries this manifests as a
multi-GiB heap plateau that only releases on process restart.

Fix: route the channel send through a helper that selects on
ctx.Done(), so cancellation via task.cancel() (already issued from
cloudIPCStreamIterator.Close) drains the goroutine and frees its buffer.

Closes #356

Signed-off-by: Vikrant Puppala
---
 internal/rows/arrowbased/batchloader.go      |  17 ++-
 internal/rows/arrowbased/batchloader_test.go | 118 +++++++++++++++++++
 2 files changed, 132 insertions(+), 3 deletions(-)

diff --git a/internal/rows/arrowbased/batchloader.go b/internal/rows/arrowbased/batchloader.go
index 2d86478..aafff8f 100644
--- a/internal/rows/arrowbased/batchloader.go
+++ b/internal/rows/arrowbased/batchloader.go
@@ -297,7 +297,7 @@ func (cft *cloudFetchDownloadTask) Run() {
 		downloadStart := time.Now()
 		data, err := fetchBatchBytes(cft.ctx, cft.link, cft.minTimeToExpiry, cft.speedThresholdMbps, cft.httpClient)
 		if err != nil {
-			cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
+			cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
 			return
 		}
 
@@ -306,7 +306,7 @@ func (cft *cloudFetchDownloadTask) Run() {
 		data.Close() //nolint:errcheck,gosec // G104: close after reading data
 		downloadMs := time.Since(downloadStart).Milliseconds()
 		if err != nil {
-			cft.resultChan <- cloudFetchDownloadTaskResult{data: nil, err: err}
+			cft.sendResult(cloudFetchDownloadTaskResult{data: nil, err: err})
 			return
 		}
 
@@ -316,10 +316,21 @@ func (cft *cloudFetchDownloadTask) Run() {
 			cft.link.RowCount,
 		)
 
-		cft.resultChan <- cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs}
+		cft.sendResult(cloudFetchDownloadTaskResult{data: bytes.NewReader(buf), err: nil, downloadMs: downloadMs})
 	}()
 }
 
+// sendResult delivers the download result to the consumer, or drops it once
+// the task's context is cancelled. Without this guard, a goroutine that
+// finishes after the iterator is closed blocks forever on the unbuffered
+// resultChan and pins the downloaded buffer in the heap (issue #356).
+func (cft *cloudFetchDownloadTask) sendResult(result cloudFetchDownloadTaskResult) {
+	select {
+	case cft.resultChan <- result:
+	case <-cft.ctx.Done():
+	}
+}
+
 // logCloudFetchSpeed calculates and logs download speed metrics
 func logCloudFetchSpeed(fullURL string, contentLength int64, duration time.Duration, speedThresholdMbps float64) {
 	if contentLength > 0 && duration.Seconds() > 0 {
diff --git a/internal/rows/arrowbased/batchloader_test.go b/internal/rows/arrowbased/batchloader_test.go
index 59947ff..52e7dc4 100644
--- a/internal/rows/arrowbased/batchloader_test.go
+++ b/internal/rows/arrowbased/batchloader_test.go
@@ -6,7 +6,10 @@ import (
 	"fmt"
 	"net/http"
 	"net/http/httptest"
+	"runtime"
+	"strings"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -604,3 +607,118 @@ func generateMockArrowBytes(record arrow.Record) []byte {
 	}
 	return buf.Bytes()
 }
+
+// TestCloudFetchIterator_CloseReleasesInFlightDownloads reproduces issue #356:
+// when the consumer closes the iterator while downloads are still in flight,
+// goroutines that completed their HTTP fetch get permanently blocked sending
+// to the unbuffered resultChan. They retain the downloaded buffers (Arrow
+// allocations in earlier versions, raw bytes in current code) until process
+// exit, producing a heap plateau that only releases on restart.
+//
+// The test schedules many concurrent downloads, lets them complete, and then
+// closes the iterator without consuming the queued results. After Close
+// returns, no cloudFetchDownloadTask goroutines must remain.
+func TestCloudFetchIterator_CloseReleasesInFlightDownloads(t *testing.T) {
+	arrowBytes := generateMockArrowBytes(generateArrowRecord())
+
+	// Track in-flight downloads. The server signals each request as it starts
+	// and waits on a release channel so the test can hold downloads in the
+	// queued-but-not-yet-consumed state before closing the iterator.
+	var inFlight atomic.Int64
+	release := make(chan struct{})
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		inFlight.Add(1)
+		<-release
+		w.WriteHeader(http.StatusOK)
+		_, _ = w.Write(arrowBytes)
+	}))
+	defer server.Close()
+
+	const nLinks = 20
+	links := make([]*cli_service.TSparkArrowResultLink, nLinks)
+	for i := range links {
+		links[i] = &cli_service.TSparkArrowResultLink{
+			FileLink:       server.URL,
+			ExpiryTime:     time.Now().Add(10 * time.Minute).Unix(),
+			StartRowOffset: int64(i),
+			RowCount:       1,
+		}
+	}
+
+	cfg := config.WithDefaults()
+	cfg.UseLz4Compression = false
+	cfg.MaxDownloadThreads = 10
+
+	bi, err := NewCloudBatchIterator(context.Background(), links, 0, nil, cfg, nil)
+	if !assert.Nil(t, err) {
+		// bi is unusable when construction fails; stop before the goroutine
+		// below calls into it.
+		return
+	}
+
+	// Kick off the first batch download. The iterator schedules
+	// MaxDownloadThreads concurrent fetches behind the scenes.
+	go func() { _, _ = bi.Next() }()
+
+	// Wait for all MaxDownloadThreads goroutines to be blocked inside the
+	// server handler (they've issued the GET and are waiting for the body).
+	assert.Eventually(t, func() bool {
+		return inFlight.Load() == int64(cfg.MaxDownloadThreads)
+	}, 5*time.Second, 10*time.Millisecond, "expected %d in-flight downloads", cfg.MaxDownloadThreads)
+
+	// Release the downloads so each goroutine finishes its HTTP read and
+	// attempts to send its result on the unbuffered resultChan. Only the
+	// first task's result will be read (by the Next() call above); the rest
+	// will be queued, blocked on the send.
+	close(release)
+
+	// Give the goroutines time to finish their HTTP work and reach the
+	// channel send.
+	time.Sleep(200 * time.Millisecond)
+
+	// Close the iterator without consuming the remaining batches.
+	bi.Close()
+
+	// After Close, every cloudFetchDownloadTask goroutine must exit. We don't
+	// compare against the total goroutine count because httptest keeps
+	// persistent server/transport goroutines around — we look only for our
+	// own download goroutines.
+	if !assert.Eventually(t, func() bool {
+		return countDownloadTaskGoroutines() == 0
+	}, 5*time.Second, 50*time.Millisecond) {
+		// Sample the count only after the wait has failed; a message argument
+		// would be evaluated once, before polling even starts.
+		t.Errorf("cloudFetchDownloadTask goroutines leaked after Close: have %d",
+			countDownloadTaskGoroutines())
+	}
+}
+
+// countDownloadTaskGoroutines returns the number of live goroutines whose
+// stack mentions cloudFetchDownloadTask.Run. Used to detect the leak in
+// issue #356.
+//
+// runtime.Stack separates goroutine traces with a blank line, so each trace is
+// checked on its own. Counting raw substring matches would over-report: a
+// goroutine started from Run can name it in both its closure frame and its
+// "created by" line.
+func countDownloadTaskGoroutines() int {
+	buf := make([]byte, 64*1024)
+	for {
+		n := runtime.Stack(buf, true)
+		if n < len(buf) {
+			buf = buf[:n]
+			break
+		}
+		// The dump filled the buffer and may be truncated; retry with more room.
+		buf = make([]byte, 2*len(buf))
+	}
+
+	const marker = "cloudFetchDownloadTask).Run"
+	count := 0
+	for _, trace := range strings.Split(string(buf), "\n\n") {
+		if strings.Contains(trace, marker) {
+			count++
+		}
+	}
+	return count
+}