Skip to content

Commit 915b1a6

Browse files
committed
tar/asm: add NewInputTarStreamWithDone + tests
Refactor tar disassembly into a shared `runInputTarStream` core, a
`runInputTarStreamGoroutine` wrapper, and a `newInputTarStreamCommon` setup
helper. Add `NewInputTarStreamWithDone`, which returns an `io.ReadCloser`
together with a `done` channel that is signaled when the internal goroutine
finishes (or fails), including the final padding-draining phase. Preserve the
existing `NewInputTarStream` behavior and API.

Add unit tests covering:
- successful full reads
- early consumer close
- packer error propagation
- underlying reader failure while the tar-split goroutine is still running

Motivation: callers need a reliable way to (1) abort consumption when they
fail early and (2) block until the background goroutine has terminated so the
underlying input reader can be safely released/reused. The wrapper guarantees
the protocol (close pipe + send exactly one done value) even on panics, while
preserving prompt termination when the consumer closes early.

Relates: containers/container-libs#148

Signed-off-by: Jan Kaluza <jkaluza@redhat.com>
1 parent e93e901 commit 915b1a6

2 files changed

Lines changed: 560 additions & 119 deletions

File tree

tar/asm/disassemble.go

Lines changed: 201 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,168 @@
11
package asm
22

33
import (
4+
"errors"
45
"io"
56

67
"github.com/vbatts/tar-split/archive/tar"
78
"github.com/vbatts/tar-split/tar/storage"
89
)
910

10-
// NewInputTarStream wraps the Reader stream of a tar archive and provides a
11-
// Reader stream of the same.
11+
// runInputTarStreamGoroutine is the goroutine entrypoint.
1212
//
13-
// In the middle it will pack the segments and file metadata to storage.Packer
14-
// `p`.
13+
// It centralizes the goroutine protocol so the core parsing logic can be
14+
// written as ordinary Go code that just "returns an error".
1515
//
16-
// The the storage.FilePutter is where payload of files in the stream are
17-
// stashed. If this stashing is not needed, you can provide a nil
18-
// storage.FilePutter. Since the checksumming is still needed, then a default
19-
// of NewDiscardFilePutter will be used internally
20-
func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) {
16+
// Protocol guarantees:
17+
// - pW is always closed exactly once (CloseWithError(nil) == Close()).
18+
// - if done != nil, exactly one value is sent (nil on success, non-nil on failure).
19+
// - panics are converted into a non-nil error (and the panic is rethrown).
20+
func runInputTarStreamGoroutine(outputRdr io.Reader, pW *io.PipeWriter, p storage.Packer, fp storage.FilePutter, done chan<- error) {
21+
// Default to a non-nil error so a panic can't accidentally look like success.
22+
err := errors.New("panic in runInputTarStream")
23+
defer func() {
24+
// CloseWithError(nil) is equivalent to Close().
25+
pW.CloseWithError(err)
26+
27+
if done != nil {
28+
done <- err
29+
}
30+
31+
// Preserve panic semantics while still ensuring the protocol above runs.
32+
if r := recover(); r != nil {
33+
panic(r)
34+
}
35+
}()
36+
37+
err = runInputTarStream(outputRdr, p, fp)
38+
}
39+
40+
// runInputTarStream drives tar-split parsing.
41+
//
42+
// It reads a tar stream from outputRdr and records tar-split metadata into the
43+
// provided storage.Packer.
44+
//
45+
// Abort behavior: if the consumer closes the read end early, the tee reader will
46+
// stop producing bytes (due to pipe write failure) and tar parsing will return
47+
// an error. We propagate that error so the goroutine terminates promptly rather
48+
// than draining the input stream for no benefit.
49+
func runInputTarStream(outputRdr io.Reader, p storage.Packer, fp storage.FilePutter) error {
50+
tr := tar.NewReader(outputRdr)
51+
tr.RawAccounting = true
52+
53+
for {
54+
hdr, err := tr.Next()
55+
if err != nil {
56+
if err != io.EOF {
57+
return err
58+
}
59+
// Even when EOF is reached, there is often 1024 null bytes at the end
60+
// of an archive. Collect them too.
61+
if b := tr.RawBytes(); len(b) > 0 {
62+
if _, err := p.AddEntry(storage.Entry{
63+
Type: storage.SegmentType,
64+
Payload: b,
65+
}); err != nil {
66+
return err
67+
}
68+
}
69+
break // Not return: we still need to drain any additional padding.
70+
}
71+
if hdr == nil {
72+
break // Not return: we still need to drain any additional padding.
73+
}
74+
75+
if b := tr.RawBytes(); len(b) > 0 {
76+
if _, err := p.AddEntry(storage.Entry{
77+
Type: storage.SegmentType,
78+
Payload: b,
79+
}); err != nil {
80+
return err
81+
}
82+
}
83+
84+
var csum []byte
85+
if hdr.Size > 0 {
86+
_, csum, err = fp.Put(hdr.Name, tr)
87+
if err != nil {
88+
return err
89+
}
90+
}
91+
92+
entry := storage.Entry{
93+
Type: storage.FileType,
94+
Size: hdr.Size,
95+
Payload: csum,
96+
}
97+
// For proper marshalling of non-utf8 characters
98+
entry.SetName(hdr.Name)
99+
100+
// File entries added, regardless of size
101+
if _, err := p.AddEntry(entry); err != nil {
102+
return err
103+
}
104+
105+
if b := tr.RawBytes(); len(b) > 0 {
106+
if _, err := p.AddEntry(storage.Entry{
107+
Type: storage.SegmentType,
108+
Payload: b,
109+
}); err != nil {
110+
return err
111+
}
112+
}
113+
}
114+
115+
// It is allowable, and not uncommon that there is further padding on
116+
// the end of an archive, apart from the expected 1024 null bytes. We
117+
// do this in chunks rather than in one go to avoid cases where a
118+
// maliciously crafted tar file tries to trick us into reading many GBs
119+
// into memory.
120+
const paddingChunkSize = 1024 * 1024
121+
var paddingChunk [paddingChunkSize]byte
122+
for {
123+
n, err := outputRdr.Read(paddingChunk[:])
124+
if n != 0 {
125+
if _, aerr := p.AddEntry(storage.Entry{
126+
Type: storage.SegmentType,
127+
Payload: paddingChunk[:n],
128+
}); aerr != nil {
129+
return aerr
130+
}
131+
}
132+
if err != nil {
133+
if err == io.EOF {
134+
break
135+
}
136+
return err
137+
}
138+
}
139+
140+
return nil
141+
}
142+
143+
// newInputTarStreamCommon sets up the shared plumbing for NewInputTarStream and
144+
// NewInputTarStreamWithDone.
145+
//
146+
// It constructs an io.Pipe and an io.TeeReader such that:
147+
//
148+
// - The caller reads tar bytes from the returned *io.PipeReader.
149+
// - The background goroutine simultaneously reads the same stream from the
150+
// TeeReader to perform tar-split parsing and metadata packing.
151+
//
152+
// Abort and synchronization semantics:
153+
//
154+
// - Closing the returned PipeReader causes the TeeReader to fail its write to
155+
// the pipe, which in turn causes the background goroutine to exit promptly.
156+
// - If withDone is true, a done channel is returned that receives exactly one
157+
// error value (nil on success) once the background goroutine has fully
158+
// terminated. This allows callers to safely wait until the input reader `r`
159+
// is no longer in use.
160+
func newInputTarStreamCommon(
161+
r io.Reader,
162+
p storage.Packer,
163+
fp storage.FilePutter,
164+
withDone bool,
165+
) (pr *io.PipeReader, done <-chan error) {
21166
// What to do here... folks will want their own access to the Reader that is
22167
// their tar archive stream, but we'll need that same stream to use our
23168
// forked 'archive/tar'.
@@ -34,123 +179,60 @@ func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io
34179
// only read what the outputRdr Read's. Since Tar archives have padding on
35180
// the end, we want to be the one reading the padding, even if the user's
36181
// `archive/tar` doesn't care.
37-
pR, pW := io.Pipe()
38-
outputRdr := io.TeeReader(r, pW)
182+
pr, pw := io.Pipe()
39183

40-
// we need a putter that will generate the crc64 sums of file payloads
184+
// We need a putter that will generate the crc64 sums of file payloads.
41185
if fp == nil {
42186
fp = storage.NewDiscardFilePutter()
43187
}
44188

45-
go func() {
46-
tr := tar.NewReader(outputRdr)
47-
tr.RawAccounting = true
48-
for {
49-
hdr, err := tr.Next()
50-
if err != nil {
51-
if err != io.EOF {
52-
pW.CloseWithError(err)
53-
return
54-
}
55-
// even when an EOF is reached, there is often 1024 null bytes on
56-
// the end of an archive. Collect them too.
57-
if b := tr.RawBytes(); len(b) > 0 {
58-
_, err := p.AddEntry(storage.Entry{
59-
Type: storage.SegmentType,
60-
Payload: b,
61-
})
62-
if err != nil {
63-
pW.CloseWithError(err)
64-
return
65-
}
66-
}
67-
break // not return. We need the end of the reader.
68-
}
69-
if hdr == nil {
70-
break // not return. We need the end of the reader.
71-
}
72-
73-
if b := tr.RawBytes(); len(b) > 0 {
74-
_, err := p.AddEntry(storage.Entry{
75-
Type: storage.SegmentType,
76-
Payload: b,
77-
})
78-
if err != nil {
79-
pW.CloseWithError(err)
80-
return
81-
}
82-
}
189+
outputRdr := io.TeeReader(r, pw)
83190

84-
var csum []byte
85-
if hdr.Size > 0 {
86-
var err error
87-
_, csum, err = fp.Put(hdr.Name, tr)
88-
if err != nil {
89-
pW.CloseWithError(err)
90-
return
91-
}
92-
}
93-
94-
entry := storage.Entry{
95-
Type: storage.FileType,
96-
Size: hdr.Size,
97-
Payload: csum,
98-
}
99-
// For proper marshalling of non-utf8 characters
100-
entry.SetName(hdr.Name)
101-
102-
// File entries added, regardless of size
103-
_, err = p.AddEntry(entry)
104-
if err != nil {
105-
pW.CloseWithError(err)
106-
return
107-
}
191+
if withDone {
192+
ch := make(chan error, 1)
193+
done = ch
194+
go runInputTarStreamGoroutine(outputRdr, pw, p, fp, ch)
195+
return pr, done
196+
}
108197

109-
if b := tr.RawBytes(); len(b) > 0 {
110-
_, err = p.AddEntry(storage.Entry{
111-
Type: storage.SegmentType,
112-
Payload: b,
113-
})
114-
if err != nil {
115-
pW.CloseWithError(err)
116-
return
117-
}
118-
}
119-
}
198+
go runInputTarStreamGoroutine(outputRdr, pw, p, fp, nil)
199+
return pr, nil
200+
}
120201

121-
// It is allowable, and not uncommon that there is further padding on
122-
// the end of an archive, apart from the expected 1024 null bytes. We
123-
// do this in chunks rather than in one go to avoid cases where a
124-
// maliciously crafted tar file tries to trick us into reading many GBs
125-
// into memory.
126-
const paddingChunkSize = 1024 * 1024
127-
var paddingChunk [paddingChunkSize]byte
128-
for {
129-
var isEOF bool
130-
n, err := outputRdr.Read(paddingChunk[:])
131-
if err != nil {
132-
if err != io.EOF {
133-
pW.CloseWithError(err)
134-
return
135-
}
136-
isEOF = true
137-
}
138-
if n != 0 {
139-
_, err = p.AddEntry(storage.Entry{
140-
Type: storage.SegmentType,
141-
Payload: paddingChunk[:n],
142-
})
143-
if err != nil {
144-
pW.CloseWithError(err)
145-
return
146-
}
147-
}
148-
if isEOF {
149-
break
150-
}
151-
}
152-
pW.Close()
153-
}()
202+
// NewInputTarStream wraps the Reader stream of a tar archive and provides a
203+
// Reader stream of the same.
204+
//
205+
// In the middle it will pack the segments and file metadata to storage.Packer
206+
// `p`.
207+
//
208+
// The storage.FilePutter is where payload of files in the stream are
209+
// stashed. If this stashing is not needed, you can provide a nil
210+
// storage.FilePutter. Since the checksumming is still needed, then a default
211+
// of NewDiscardFilePutter will be used internally
212+
//
213+
// If callers need to be able to abort early and/or wait for goroutine termination,
214+
// prefer NewInputTarStreamWithDone.
215+
//
216+
// Deprecated: Use NewInputTarStreamWithDone instead.
217+
func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) {
218+
pr, _ := newInputTarStreamCommon(r, p, fp, false)
219+
return pr, nil
220+
}
154221

155-
return pR, nil
222+
// NewInputTarStreamWithDone wraps the Reader stream of a tar archive and provides a
223+
// Reader stream of the same.
224+
//
225+
// In the middle it will pack the segments and file metadata to storage.Packer `p`.
226+
//
227+
// It also returns a done channel that will receive exactly one error value
228+
// (nil on success) when the internal goroutine has fully completed parsing
229+
// the tar stream (including the final paddingChunk draining loop) and has
230+
// finished writing all entries to `p`.
231+
//
232+
// The returned reader is an io.ReadCloser so callers can stop early; closing it
233+
// aborts the pipe so the internal goroutine can terminate promptly (rather than
234+
// hanging on a blocked pipe write).
235+
func NewInputTarStreamWithDone(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.ReadCloser, <-chan error, error) {
236+
pr, done := newInputTarStreamCommon(r, p, fp, true)
237+
return pr, done, nil
156238
}

0 commit comments

Comments
 (0)