diff --git a/images/dvcr-artifact/pkg/registry/imageinfo.go b/images/dvcr-artifact/pkg/registry/imageinfo.go index c55095f937..0ec0369b67 100644 --- a/images/dvcr-artifact/pkg/registry/imageinfo.go +++ b/images/dvcr-artifact/pkg/registry/imageinfo.go @@ -43,7 +43,7 @@ const ( isoImageType = "iso" ) -func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, error) { +func getImageInfo(ctx context.Context, sourceReader io.ReadCloser, sourceImageSize int64) (ImageInfo, error) { initialReadSize := syntheticHeadSize headerBuf := make([]byte, initialReadSize) n, err := io.ReadFull(sourceReader, headerBuf) @@ -71,7 +71,7 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e return getImageInfoVMDK(ctx, formatSourceReaders.TopReader(), headerBuf) } - return getImageInfoStandard(ctx, formatSourceReaders, headerBuf) + return getImageInfoStandard(ctx, formatSourceReaders, headerBuf, sourceImageSize) } // getImageInfoVMDK obtains information about the VMDK image using a synthetic file. @@ -147,7 +147,7 @@ func getImageInfoVMDK(ctx context.Context, sourceReader io.Reader, headerBuf []b } // getImageInfoStandard handles non-VMDK formats using the first 64MB of the file. -func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte) (ImageInfo, error) { +func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte, sourceImageSize int64) (ImageInfo, error) { var tempImageInfoFile *os.File var err error var bytesWrittenToTemp int64 @@ -198,10 +198,17 @@ func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.For } if imageInfo.Format != "raw" { - // It's necessary to read everything from the original image to avoid blocking. - _, err = io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) - if err != nil { - return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) + // VirtualSize is already populated by qemu-img info on the 64MiB temp file. + // For archived formats (gz/xz/zst) we still must drain so the upstream + // TeeReader sees the underlying compressed bytes; for plain qcow2 no + // drain is needed, which lets the main upload run at full speed. + // TODO: drop this branch once the TeeReader in inspectAndStreamSourceImage + // no longer blocks the upload pipeline. + if formatSourceReaders.Archived { + _, err = io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) + if err != nil { + return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) + } } return imageInfo, nil @@ -226,14 +233,21 @@ func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.For imageInfo.Format = isoImageType } - // Count uncompressed size of source image. - n, err := io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) - if err != nil { - return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) + // For archived raw/iso we still need to drain to compute the uncompressed + // size; for plain raw/iso the uncompressed size equals the on-wire source + // size, so we avoid the costly full-stream drain. + // TODO: drop this branch once the TeeReader in inspectAndStreamSourceImage + // no longer blocks the upload pipeline. + if formatSourceReaders.Archived { + n, err := io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader()) + if err != nil { + return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err) + } + imageInfo.VirtualSize = uint64(bytesWrittenToTemp + n) + } else { + imageInfo.VirtualSize = uint64(sourceImageSize) } - imageInfo.VirtualSize = uint64(bytesWrittenToTemp + n) - return imageInfo, nil } } diff --git a/images/dvcr-artifact/pkg/registry/registry.go b/images/dvcr-artifact/pkg/registry/registry.go index 3f33fa406f..a8b0ef90e2 100644 --- a/images/dvcr-artifact/pkg/registry/registry.go +++ b/images/dvcr-artifact/pkg/registry/registry.go @@ -277,7 +277,7 @@ func (p DataProcessor) inspectAndStreamSourceImage( defer imageInfoWriter.Close() klog.Infoln("Streaming from the source") - doneSize, err := io.Copy(streamWriter, io.TeeReader(sourceImageReader, imageInfoWriter)) + doneSize, err := io.Copy(streamWriter, nonBlockingTeeReader(sourceImageReader, imageInfoWriter)) if err != nil { return fmt.Errorf("error copying from the source: %w", err) } @@ -306,7 +306,7 @@ func (p DataProcessor) inspectAndStreamSourceImage( errsGroup.Go(func() error { defer imageInfoReader.Close() - info, err := getImageInfo(ctx, imageInfoReader) + info, err := getImageInfo(ctx, imageInfoReader, int64(sourceImageSize)) if err != nil { return err } diff --git a/images/dvcr-artifact/pkg/registry/teereader.go b/images/dvcr-artifact/pkg/registry/teereader.go new file mode 100644 index 0000000000..afc40d1c84 --- /dev/null +++ b/images/dvcr-artifact/pkg/registry/teereader.go @@ -0,0 +1,51 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "io" + "sync/atomic" +) + +// nonBlockingTeeReader returns an io.Reader that copies what it reads from r +// into w, but, unlike io.TeeReader, never lets a slow or failed w block or +// fail the read side. +// +// As soon as a write to w returns an error (typically io.ErrClosedPipe once +// the inspect side has finished and closed its end of the pipe), the writer +// is marked done and all subsequent reads bypass w entirely. The error from w +// is intentionally discarded: w is best-effort and must not propagate failures +// to the main upload pipeline. +func nonBlockingTeeReader(r io.Reader, w io.Writer) io.Reader { + return &nonBlockingTee{r: r, w: w} +} + +type nonBlockingTee struct { + r io.Reader + w io.Writer + wDone atomic.Bool +} + +func (t *nonBlockingTee) Read(p []byte) (int, error) { + n, err := t.r.Read(p) + if n > 0 && !t.wDone.Load() { + if _, werr := t.w.Write(p[:n]); werr != nil { + t.wDone.Store(true) + } + } + return n, err +} diff --git a/images/dvcr-artifact/pkg/registry/teereader_test.go b/images/dvcr-artifact/pkg/registry/teereader_test.go new file mode 100644 index 0000000000..3b95e4e512 --- /dev/null +++ b/images/dvcr-artifact/pkg/registry/teereader_test.go @@ -0,0 +1,101 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package registry + +import ( + "bytes" + "errors" + "io" + "strings" + "testing" +) + +// errWriter fails every Write call with a fixed error. +type errWriter struct { + err error + written int +} + +func (w *errWriter) Write(p []byte) (int, error) { + w.written++ + return 0, w.err +} + +func TestNonBlockingTeeReader_MirrorsWritesUntilEOF(t *testing.T) { + src := strings.NewReader("hello world") + var sink bytes.Buffer + + r := nonBlockingTeeReader(src, &sink) + out, err := io.ReadAll(r) + if err != nil { + t.Fatalf("read: unexpected error: %v", err) + } + if string(out) != "hello world" { + t.Fatalf("read returned %q, want %q", string(out), "hello world") + } + if sink.String() != "hello world" { + t.Fatalf("sink got %q, want %q", sink.String(), "hello world") + } +} + +func TestNonBlockingTeeReader_WriteErrorDoesNotPropagate(t *testing.T) { + src := strings.NewReader("abcdefghij") + w := &errWriter{err: io.ErrClosedPipe} + + r := nonBlockingTeeReader(src, w) + out, err := io.ReadAll(r) + if err != nil { + t.Fatalf("read: unexpected error: %v", err) + } + if string(out) != "abcdefghij" { + t.Fatalf("read returned %q, want all input", string(out)) + } +} + +func TestNonBlockingTeeReader_StopsWritingAfterFirstWriteError(t *testing.T) { + // Use a reader that returns data in two chunks so we can observe that + // after the first write fails, the writer is no longer called. + src := io.MultiReader( + strings.NewReader("first-"), + strings.NewReader("second"), + ) + w := &errWriter{err: errors.New("boom")} + + r := nonBlockingTeeReader(src, w) + + buf := make([]byte, 6) + if _, err := io.ReadFull(r, buf); err != nil { + t.Fatalf("first read: %v", err) + } + if string(buf) != "first-" { + t.Fatalf("first chunk got %q, want %q", string(buf), "first-") + } + if w.written != 1 { + t.Fatalf("writer should have been called once before failure, got %d", w.written) + } + + rest, err := io.ReadAll(r) + if err != nil { + t.Fatalf("rest read: %v", err) + } + if string(rest) != "second" { + t.Fatalf("rest got %q, want %q", string(rest), "second") + } + if w.written != 1 { + t.Fatalf("writer must not be called after failure, got %d calls total", w.written) + } +}