Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions images/dvcr-artifact/pkg/registry/imageinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
isoImageType = "iso"
)

func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, error) {
func getImageInfo(ctx context.Context, sourceReader io.ReadCloser, sourceImageSize int64) (ImageInfo, error) {
initialReadSize := syntheticHeadSize
headerBuf := make([]byte, initialReadSize)
n, err := io.ReadFull(sourceReader, headerBuf)
Expand Down Expand Up @@ -71,7 +71,7 @@ func getImageInfo(ctx context.Context, sourceReader io.ReadCloser) (ImageInfo, e
return getImageInfoVMDK(ctx, formatSourceReaders.TopReader(), headerBuf)
}

return getImageInfoStandard(ctx, formatSourceReaders, headerBuf)
return getImageInfoStandard(ctx, formatSourceReaders, headerBuf, sourceImageSize)
}

// getImageInfoVMDK obtains information about the VMDK image using a synthetic file.
Expand Down Expand Up @@ -147,7 +147,7 @@ func getImageInfoVMDK(ctx context.Context, sourceReader io.Reader, headerBuf []b
}

// getImageInfoStandard handles non-VMDK formats using the first 64MB of the file.
func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte) (ImageInfo, error) {
func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.FormatReaders, headerBuf []byte, sourceImageSize int64) (ImageInfo, error) {
var tempImageInfoFile *os.File
var err error
var bytesWrittenToTemp int64
Expand Down Expand Up @@ -198,10 +198,17 @@ func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.For
}

if imageInfo.Format != "raw" {
// It's necessary to read everything from the original image to avoid blocking.
_, err = io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader())
if err != nil {
return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err)
// VirtualSize is already populated by qemu-img info on the 64MiB temp file.
// For archived formats (gz/xz/zst) we still must drain so the upstream
// TeeReader sees the underlying compressed bytes; for plain qcow2 no
// drain is needed, which lets the main upload run at full speed.
// TODO: drop this branch once the TeeReader in inspectAndStreamSourceImage
// no longer blocks the upload pipeline.
if formatSourceReaders.Archived {
_, err = io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader())
if err != nil {
return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err)
}
}

return imageInfo, nil
Expand All @@ -226,14 +233,21 @@ func getImageInfoStandard(ctx context.Context, formatSourceReaders *importer.For
imageInfo.Format = isoImageType
}

// Count uncompressed size of source image.
n, err := io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader())
if err != nil {
return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err)
// For archived raw/iso we still need to drain to compute the uncompressed
// size; for plain raw/iso the uncompressed size equals the on-wire source
// size, so we avoid the costly full-stream drain.
// TODO: drop this branch once the TeeReader in inspectAndStreamSourceImage
// no longer blocks the upload pipeline.
if formatSourceReaders.Archived {
n, err := io.Copy(&EmptyWriter{}, formatSourceReaders.TopReader())
if err != nil {
return ImageInfo{}, fmt.Errorf("error copying to nowhere: %w", err)
}
imageInfo.VirtualSize = uint64(bytesWrittenToTemp + n)
} else {
imageInfo.VirtualSize = uint64(sourceImageSize)
}

imageInfo.VirtualSize = uint64(bytesWrittenToTemp + n)

return imageInfo, nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions images/dvcr-artifact/pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (p DataProcessor) inspectAndStreamSourceImage(
defer imageInfoWriter.Close()

klog.Infoln("Streaming from the source")
doneSize, err := io.Copy(streamWriter, io.TeeReader(sourceImageReader, imageInfoWriter))
doneSize, err := io.Copy(streamWriter, nonBlockingTeeReader(sourceImageReader, imageInfoWriter))
if err != nil {
return fmt.Errorf("error copying from the source: %w", err)
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func (p DataProcessor) inspectAndStreamSourceImage(
errsGroup.Go(func() error {
defer imageInfoReader.Close()

info, err := getImageInfo(ctx, imageInfoReader)
info, err := getImageInfo(ctx, imageInfoReader, int64(sourceImageSize))
if err != nil {
return err
}
Expand Down
51 changes: 51 additions & 0 deletions images/dvcr-artifact/pkg/registry/teereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2026 Flant JSC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package registry

import (
"io"
"sync/atomic"
)

// nonBlockingTeeReader wraps r in an io.Reader that mirrors every chunk it
// reads into w, similar to io.TeeReader, except that w is a best-effort side
// channel: an error from w is never reported to the caller of Read.
//
// The first failed Write (presumably io.ErrClosedPipe when the consumer of a
// pipe has closed its end — confirm against the caller) permanently disables
// mirroring; later reads go straight through to r without touching w. The
// write error itself is deliberately dropped.
//
// Note: Write is still invoked synchronously from Read, so a w that is slow
// but not failing delays the read side for the duration of each Write.
func nonBlockingTeeReader(r io.Reader, w io.Writer) io.Reader {
	tee := new(nonBlockingTee)
	tee.r = r
	tee.w = w
	return tee
}

// nonBlockingTee is the io.Reader implementation returned by
// nonBlockingTeeReader.
type nonBlockingTee struct {
	// r is the source that Read pulls data from.
	r io.Reader
	// w receives a copy of everything read from r until it fails once.
	w io.Writer
	// wDone is set after the first failed Write; once true, w is skipped.
	wDone atomic.Bool
}

// Compile-time check that *nonBlockingTee satisfies io.Reader.
var _ io.Reader = (*nonBlockingTee)(nil)

// Read reads from the underlying reader and returns its result unchanged.
// When data was read and mirroring is still active, the same bytes are
// written to w; a write error only switches mirroring off.
func (t *nonBlockingTee) Read(p []byte) (int, error) {
	n, err := t.r.Read(p)

	// Nothing to mirror, or the mirror has already been abandoned.
	if n <= 0 || t.wDone.Load() {
		return n, err
	}

	if _, writeErr := t.w.Write(p[:n]); writeErr != nil {
		// Best-effort sink: remember the failure and stop mirroring.
		t.wDone.Store(true)
	}

	return n, err
}
101 changes: 101 additions & 0 deletions images/dvcr-artifact/pkg/registry/teereader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2026 Flant JSC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package registry

import (
"bytes"
"errors"
"io"
"strings"
"testing"
)

// errWriter is an io.Writer test double: every Write is rejected with err.
type errWriter struct {
	// err is returned from each Write call.
	err error
	// written counts Write invocations (calls, not bytes).
	written int
}

// Write records the call, accepts no bytes, and returns the configured error.
func (w *errWriter) Write(_ []byte) (n int, err error) {
	w.written++
	return 0, w.err
}

// TestNonBlockingTeeReader_MirrorsWritesUntilEOF covers the healthy-writer
// path: the caller receives the full payload and the sink receives an
// identical copy.
func TestNonBlockingTeeReader_MirrorsWritesUntilEOF(t *testing.T) {
	const payload = "hello world"

	sink := new(bytes.Buffer)
	tee := nonBlockingTeeReader(strings.NewReader(payload), sink)

	got, err := io.ReadAll(tee)
	if err != nil {
		t.Fatalf("read: unexpected error: %v", err)
	}

	// The read side must see the source verbatim.
	if string(got) != payload {
		t.Fatalf("read returned %q, want %q", string(got), payload)
	}

	// The mirror must have received exactly the same bytes.
	if mirrored := sink.String(); mirrored != payload {
		t.Fatalf("sink got %q, want %q", mirrored, payload)
	}
}

// TestNonBlockingTeeReader_WriteErrorDoesNotPropagate verifies that a sink
// which rejects every write neither surfaces an error on the read side nor
// truncates the data returned to the reader.
func TestNonBlockingTeeReader_WriteErrorDoesNotPropagate(t *testing.T) {
	const payload = "abcdefghij"

	failing := &errWriter{err: io.ErrClosedPipe}
	tee := nonBlockingTeeReader(strings.NewReader(payload), failing)

	got, err := io.ReadAll(tee)
	if err != nil {
		t.Fatalf("read: unexpected error: %v", err)
	}
	if string(got) != payload {
		t.Fatalf("read returned %q, want all input", string(got))
	}
}

// TestNonBlockingTeeReader_StopsWritingAfterFirstWriteError verifies that the
// sink is invoked exactly once when its first Write fails, and that reads
// issued afterwards still deliver the remaining source data.
func TestNonBlockingTeeReader_StopsWritingAfterFirstWriteError(t *testing.T) {
	const (
		head = "first-"
		tail = "second"
	)

	// Two separate readers make the source deliver its data in two chunks,
	// so the second chunk is read only after the writer has already failed.
	source := io.MultiReader(strings.NewReader(head), strings.NewReader(tail))
	failing := &errWriter{err: errors.New("boom")}
	tee := nonBlockingTeeReader(source, failing)

	chunk := make([]byte, 6)
	if _, err := io.ReadFull(tee, chunk); err != nil {
		t.Fatalf("first read: %v", err)
	}
	if string(chunk) != head {
		t.Fatalf("first chunk got %q, want %q", string(chunk), head)
	}
	if failing.written != 1 {
		t.Fatalf("writer should have been called once before failure, got %d", failing.written)
	}

	remainder, err := io.ReadAll(tee)
	if err != nil {
		t.Fatalf("rest read: %v", err)
	}
	if string(remainder) != tail {
		t.Fatalf("rest got %q, want %q", string(remainder), tail)
	}

	// The call counter must be unchanged: the failed writer is never retried.
	if failing.written != 1 {
		t.Fatalf("writer must not be called after failure, got %d calls total", failing.written)
	}
}
Loading