Last Updated: 2025-01-07
Branch: feature/configurable-output-and-partition-fix
Status: Phase 1 & 2 Complete, Phase 3 Partial
The current architecture has two critical issues:
- Memory Bloat: Loads entire partitions into memory (10+ GB), causing OOM crashes
- Retry Failure: Statement timeout occurs during `rows.Next()` iteration, outside the retry logic wrapper, so retries never happen
Implement chunked streaming architecture:
- Process rows in 10,000-row chunks (configurable)
- Stream directly to temp files via formatter → compressor pipeline
- Wrap entire streaming process in retry logic
- Upload completed temp file to S3
- Memory: Constant ~100 MB regardless of partition size
File: cmd/config.go
- Added `ChunkSize int` field to `Config` struct (line 48)
- Added validation: 100 minimum, 1,000,000 maximum (lines 252-260)
- Added error constants: `ErrChunkSizeMinimum`, `ErrChunkSizeMaximum`
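A minimal sketch of that validation, assuming the error-constant style described above (the helper name `validateChunkSize` is illustrative):

```go
package cmd

import "errors"

var (
	ErrChunkSizeMinimum = errors.New("chunk_size must be at least 100")
	ErrChunkSizeMaximum = errors.New("chunk_size must be at most 1,000,000")
)

// validateChunkSize enforces the 100–1,000,000 bounds described above.
func validateChunkSize(chunkSize int) error {
	if chunkSize < 100 {
		return ErrChunkSizeMinimum
	}
	if chunkSize > 1_000_000 {
		return ErrChunkSizeMaximum
	}
	return nil
}
```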
File: cmd/root.go
- Added `--chunk-size` flag (default: 10000) (line 202)
- Added viper binding for `chunk_size` (line 221)
- Wired into Config struct (line 288)
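The wiring roughly follows the usual cobra/viper pattern; a hedged sketch (the exact registration point and config plumbing in cmd/root.go may differ):

```go
package cmd

import (
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// registerChunkSizeFlag shows the flag → viper → Config plumbing described above.
func registerChunkSizeFlag(cmd *cobra.Command) error {
	cmd.Flags().Int("chunk-size", 10000, "number of rows processed per chunk")
	// Allow chunk_size to come from the YAML config or environment as well.
	return viper.BindPFlag("chunk_size", cmd.Flags().Lookup("chunk-size"))
}

// Later, when building the Config struct:
//   cfg.ChunkSize = viper.GetInt("chunk_size")
```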
File: cmd/schema.go (NEW FILE)
- `ColumnInfo` struct: Stores column name, data_type, udt_name
- `TableSchema` struct: Stores table name and columns
- `getTableSchema(ctx, tableName)`: Queries `information_schema.columns`
- Implements `formatters.ColumnSchema` and `formatters.TableSchema` interfaces
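A sketch of the schema query, assuming a database/sql-style connection (the struct fields mirror the list above; the real function in cmd/schema.go takes only `ctx` and `tableName`, so the explicit `db` parameter here is an assumption):

```go
package cmd

import (
	"context"
	"database/sql"
	"fmt"
)

type ColumnInfo struct {
	Name     string
	DataType string
	UDTName  string
}

type TableSchema struct {
	TableName string
	Columns   []ColumnInfo
}

func getTableSchema(ctx context.Context, db *sql.DB, tableName string) (*TableSchema, error) {
	const query = `
		SELECT column_name, data_type, udt_name
		FROM information_schema.columns
		WHERE table_name = $1
		ORDER BY ordinal_position`

	rows, err := db.QueryContext(ctx, query, tableName)
	if err != nil {
		return nil, fmt.Errorf("schema query failed for %s: %w", tableName, err)
	}
	defer rows.Close()

	schema := &TableSchema{TableName: tableName}
	for rows.Next() {
		var col ColumnInfo
		if err := rows.Scan(&col.Name, &col.DataType, &col.UDTName); err != nil {
			return nil, err
		}
		schema.Columns = append(schema.Columns, col)
	}
	return schema, rows.Err()
}
```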
File: cmd/archiver.go
- `getTempDir()`: Returns OS temp directory (lines 107-112)
- `createTempFile()`: Creates temp file with "data-archiver-*.tmp" prefix (lines 114-118)
- `cleanupTempFile(path)`: Removes temp file, ignoring errors (lines 120-125)
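These helpers are thin wrappers over the standard library; a sketch:

```go
package cmd

import "os"

// getTempDir returns the OS temp directory used for staging archive files.
func getTempDir() string {
	return os.TempDir()
}

// createTempFile creates a staging file with the "data-archiver-*.tmp" pattern.
func createTempFile() (*os.File, error) {
	return os.CreateTemp(getTempDir(), "data-archiver-*.tmp")
}

// cleanupTempFile removes a temp file, ignoring errors (it may already be gone).
func cleanupTempFile(path string) {
	if path == "" {
		return
	}
	_ = os.Remove(path)
}
```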
File: cmd/archiver.go
- Added `hash` package (line 12)
- Added `io` package (line 13)
File: cmd/formatters/formatter.go
- `TableSchema` interface: `GetColumns()`
- `ColumnSchema` interface: `GetName()`, `GetType()`
- `StreamWriter` interface: `WriteChunk(rows []map[string]interface{}) error`, `Close() error`
- `StreamingFormatter` interface: `NewWriter(w io.Writer, schema TableSchema) (StreamWriter, error)`, `Extension() string`, `MIMEType() string`
- `GetStreamingFormatter(format string)`: Factory function
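The interfaces sketched together in one place (signatures taken from the list above; the return type of `GetColumns()` is an assumption):

```go
package formatters

import "io"

type ColumnSchema interface {
	GetName() string
	GetType() string
}

type TableSchema interface {
	GetColumns() []ColumnSchema
}

// StreamWriter receives rows chunk-by-chunk and is closed once at the end.
type StreamWriter interface {
	WriteChunk(rows []map[string]interface{}) error
	Close() error
}

// StreamingFormatter builds a StreamWriter targeting an arbitrary io.Writer.
type StreamingFormatter interface {
	NewWriter(w io.Writer, schema TableSchema) (StreamWriter, error)
	Extension() string
	MIMEType() string
}
```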
File: cmd/formatters/jsonl.go
- `JSONLStreamingFormatter` struct
- `NewJSONLStreamingFormatter()`: Constructor
- `jsonlStreamWriter`: Implements StreamWriter
- `WriteChunk()`: Marshals each row to JSON, writes with newline
- `Close()`: No-op (JSONL has no footer)
- Memory: Constant - writes rows immediately
- Headers/Footers: None
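A sketch of the JSONL writer; each row is marshalled and written immediately, so there is no per-partition buffering:

```go
package formatters

import (
	"encoding/json"
	"io"
)

type jsonlStreamWriter struct {
	w io.Writer
}

func (j *jsonlStreamWriter) WriteChunk(rows []map[string]interface{}) error {
	for _, row := range rows {
		data, err := json.Marshal(row)
		if err != nil {
			return err
		}
		if _, err := j.w.Write(append(data, '\n')); err != nil {
			return err
		}
	}
	return nil
}

// Close is a no-op: JSONL has no footer to flush.
func (j *jsonlStreamWriter) Close() error { return nil }
```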
File: cmd/formatters/csv.go
- `CSVStreamingFormatter` struct
- `NewCSVStreamingFormatter()`: Constructor
- `csvStreamWriter`: Implements StreamWriter
- Stores sorted column names from schema
- Writes CSV header in `NewWriter()`
- `WriteChunk()`: Writes rows using column order
- `Close()`: Flushes CSV writer
- Memory: Constant - uses buffered CSV writer
- Headers/Footers: Header written upfront, no footer
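A sketch of the CSV writer; the header goes out when the writer is constructed, and rows are emitted in the stored column order (the constructor name and NULL rendering here are assumptions):

```go
package formatters

import (
	"encoding/csv"
	"fmt"
	"io"
)

type csvStreamWriter struct {
	w       *csv.Writer
	columns []string // sorted column names captured from the schema
}

func newCSVStreamWriter(w io.Writer, columns []string) (*csvStreamWriter, error) {
	cw := csv.NewWriter(w)
	// Header is written up front, mirroring what NewWriter() does in csv.go.
	if err := cw.Write(columns); err != nil {
		return nil, err
	}
	return &csvStreamWriter{w: cw, columns: columns}, nil
}

func (c *csvStreamWriter) WriteChunk(rows []map[string]interface{}) error {
	record := make([]string, len(c.columns))
	for _, row := range rows {
		for i, col := range c.columns {
			if v, ok := row[col]; ok && v != nil {
				record[i] = fmt.Sprintf("%v", v)
			} else {
				record[i] = "" // NULLs become empty fields
			}
		}
		if err := c.w.Write(record); err != nil {
			return err
		}
	}
	return nil
}

// Close flushes the buffered csv.Writer.
func (c *csvStreamWriter) Close() error {
	c.w.Flush()
	return c.w.Error()
}
```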
File: cmd/formatters/parquet.go
- `ParquetStreamingFormatter` struct (supports compression: snappy, zstd, gzip, lz4, none)
- `NewParquetStreamingFormatter()`: Constructor with snappy default
- `NewParquetStreamingFormatterWithCompression(compression)`: Custom compression
- `mapPostgreSQLTypeToParquetNode(udtName)`: Maps PostgreSQL types to Parquet nodes
  - Handles: int2/4/8, float4/8, bool, timestamp/tz, date, varchar, text, json/jsonb, uuid, bytea
  - Default: String type
- `parquetStreamWriter`: Implements StreamWriter
  - Uses `parquet.GenericWriter[map[string]any]`
- `WriteChunk()`: Calls `writer.Write(rows)`; Parquet handles batching
- `Close()`: Closes Parquet writer (writes footer metadata)
- Memory: Constant - Parquet library buffers row groups internally
- Headers/Footers: Schema written upfront, footer with metadata on close
File: cmd/compressors/compressor.go
- Added `NewWriter(w io.Writer, level int) io.WriteCloser` to interface
- Added `nopWriteCloser` helper type (wraps io.Writer with no-op Close)
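The interface addition and the pass-through helper, sketched (the existing batch-compression methods on the interface are elided):

```go
package compressors

import "io"

// Compressor gains a streaming constructor alongside its existing methods.
type Compressor interface {
	// ...existing batch methods elided...
	NewWriter(w io.Writer, level int) io.WriteCloser
}

// nopWriteCloser lets the "none" compressor satisfy io.WriteCloser.
type nopWriteCloser struct {
	io.Writer
}

func (nopWriteCloser) Close() error { return nil }
```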
File: cmd/compressors/zstd.go
- `NewWriter()`: Creates `zstd.NewWriter()` with encoder level and concurrency
File: cmd/compressors/gzip.go
- `NewWriter()`: Creates `gzip.NewWriterLevel()` with specified level
File: cmd/compressors/lz4.go
- `NewWriter()`: Creates `lz4.NewWriter()` with compression level option
File: cmd/compressors/none.go
- `NewWriter()`: Returns `nopWriteCloser` (pass-through)
File: cmd/archiver.go
Function: extractPartitionDataStreaming() (lines 1871-2131)
Signature:
```go
func (a *Archiver) extractPartitionDataStreaming(
	partition PartitionInfo,
	program *tea.Program,
	cache *PartitionCache,
	updateTaskStage func(string),
) (tempFilePath string, fileSize int64, md5Hash string, uncompressedSize int64, err error)
```

Pipeline Architecture:
For Parquet (internal compression):
PostgreSQL → JSON rows → formatter → hasher → tempFile
↓
MD5 hash
For JSONL/CSV (external compression):
PostgreSQL → JSON rows → formatter → compressor → hasher → tempFile
↓
MD5 hash
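A condensed, hypothetical excerpt of how the two branches are wired, assuming the interfaces above (`UsesInternalCompression()`, the compressor lookup, and `cfg.CompressionLevel` stand in for the real helpers):

```go
// Inside extractPartitionDataStreaming(); error handling trimmed.
hasher := md5.New()
sink := io.MultiWriter(tempFile, hasher) // every byte also feeds the MD5 hash

var target io.Writer = sink
var compWriter io.WriteCloser
if !formatter.UsesInternalCompression() { // JSONL/CSV: external compression
	compWriter = compressor.NewWriter(sink, cfg.CompressionLevel)
	target = compWriter
}

streamWriter, err := formatter.NewWriter(target, schema)
// ...stream chunks, then close streamWriter, compWriter (if set), and tempFile...
```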
Implementation Details:
- Schema Query (lines 1877-1883):
  - Calls `getTableSchema(ctx, partition.TableName)`
  - Returns error if schema query fails
- Temp File Creation (lines 1892-1907):
  - Creates temp file with `createTempFile()`
  - Deferred cleanup on error
- Pipeline Setup (lines 1911-1951):
  - Gets streaming formatter: `GetStreamingFormatter(config.OutputFormat)`
  - Branching logic based on `UsesInternalCompression()`:
    - Parquet: formatter → MultiWriter(tempFile, hasher)
    - JSONL/CSV: formatter → compressor → MultiWriter(tempFile, hasher)
- Chunked Row Processing (lines 1959-2075), see the loop sketch after this list:
  - Query: `SELECT row_to_json(t) FROM table t`
  - Pre-allocate chunk slice with capacity = chunkSize
  - For each row:
    - Scan JSON data
    - Unmarshal to `map[string]interface{}`
    - Append to chunk
  - When chunk full (10,000 rows):
    - Call `streamWriter.WriteChunk(chunk)`
    - Track uncompressed size (approximate)
    - Reset chunk slice (keep capacity)
    - Update progress UI
  - Write final partial chunk
- Cleanup (lines 2077-2098):
  - Close stream writer (flushes formatters, writes footers)
  - Close compressor (if used)
  - Close temp file
  - Get file size from stat
  - Get MD5 hash from hasher
- Return Values:
  - `tempFilePath`: Path to completed temp file
  - `fileSize`: Size of compressed file
  - `md5Hash`: MD5 hex string
  - `uncompressedSize`: Approximate uncompressed size (JSON)
  - `err`: Any error that occurred
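A condensed sketch of the chunked loop described above (progress reporting and size tracking trimmed; `rowsIter`, `chunkSize`, and `streamWriter` stand in for the values set up earlier in the function):

```go
// Hypothetical excerpt from the row-processing section.
chunk := make([]map[string]interface{}, 0, chunkSize)
rowCount := 0

for rowsIter.Next() {
	var raw []byte
	if err := rowsIter.Scan(&raw); err != nil {
		return "", 0, "", 0, err
	}
	var row map[string]interface{}
	if err := json.Unmarshal(raw, &row); err != nil {
		return "", 0, "", 0, err
	}
	chunk = append(chunk, row)
	rowCount++

	if len(chunk) >= chunkSize {
		if err := streamWriter.WriteChunk(chunk); err != nil {
			return "", 0, "", 0, err
		}
		chunk = chunk[:0] // reset length, keep capacity
	}
	if rowCount%100 == 0 && ctx.Err() != nil {
		return "", 0, "", 0, ctx.Err() // honor cancellation every 100 rows
	}
}
if err := rowsIter.Err(); err != nil {
	return "", 0, "", 0, err // statement timeouts surface here
}
if len(chunk) > 0 {
	if err := streamWriter.WriteChunk(chunk); err != nil {
		return "", 0, "", 0, err
	}
}
```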
Memory Characteristics:
- Chunk buffer: ~10,000 rows × ~10 KB/row = ~100 MB max
- Formatter buffer: Minimal (writes immediately)
- Compressor buffer: 4-32 MB (library-specific)
- Total: ~150 MB worst case, regardless of partition size
Error Handling:
- Context cancellation checked every 100 rows
- Errors during query, scan, unmarshal, or write return immediately
- Deferred cleanup closes writers and removes temp file on error
Location: cmd/archiver.go (NEW FUNCTION)
Name: extractPartitionDataWithRetry()
Requirements:
- Wrap `extractPartitionDataStreaming()` in retry loop
- Use `config.Database.MaxRetries` and `config.Database.RetryDelay`
- Check `isRetryableError()` for errors from:
  - Schema query failures
  - PostgreSQL query failures
  - `rows.Err()` errors (statement timeout happens here!)
- On retry:
- Log warning with attempt count
- Clean up partial temp file
- Sleep for retry delay (respect context cancellation)
- Return final error after max retries
Pseudo-code:
```go
func (a *Archiver) extractPartitionDataWithRetry(...) (...) {
	maxRetries := a.config.Database.MaxRetries
	retryDelay := time.Duration(a.config.Database.RetryDelay) * time.Second

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		tempPath, size, md5, uncompSize, err := a.extractPartitionDataStreaming(...)
		if err == nil {
			return tempPath, size, md5, uncompSize, nil
		}
		lastErr = err

		// Clean up the partial temp file from the failed attempt
		cleanupTempFile(tempPath)

		if !isRetryableError(err) {
			return "", 0, "", 0, err
		}

		if attempt < maxRetries {
			a.logger.Warn(fmt.Sprintf("Extraction failed (attempt %d/%d): %v. Retrying in %v...",
				attempt+1, maxRetries+1, err, retryDelay))
			select {
			case <-time.After(retryDelay):
				continue
			case <-a.ctx.Done():
				return "", 0, "", 0, a.ctx.Err()
			}
		}
	}
	return "", 0, "", 0, fmt.Errorf("extraction failed after %d attempts: %w", maxRetries+1, lastErr)
}
```

Location: cmd/archiver.go
Function: processSinglePartition() (around line 1080)
Current Flow:
processSinglePartition()
→ extractPartitionData() // Loads everything into memory
→ extractRowsWithProgress() // Returns []map[string]interface{}
→ formatter.Format(rows) // Formats in memory
→ return []byte
→ compressPartitionData(data) // Compresses in memory
→ uploadToS3(compressed)
New Flow:
processSinglePartition()
→ extractPartitionDataWithRetry() // Streams to temp file with retry
→ tempFilePath, size, md5, uncompSize returned
→ uploadTempFileToS3(tempFilePath, objectKey)
→ cleanupTempFile(tempFilePath)
Changes Needed:
- Replace call to `extractPartitionData()` with `extractPartitionDataWithRetry()`
- Remove call to `compressPartitionData()` (now done in streaming)
- Change `uploadToS3()` to read from temp file instead of byte slice
- Add cleanup of temp file after successful upload
- Update cache metadata saving (size, md5 already calculated)
Location: cmd/archiver.go
Function: uploadTempFileToS3() (NEW FUNCTION)
Requirements:
- Open temp file for reading
- Check file size for multipart upload threshold (100 MB)
- Use existing S3 uploader logic
- Preserve multipart upload for large files
- Close file after upload
Pseudo-code:
```go
func (a *Archiver) uploadTempFileToS3(tempFilePath, objectKey string) error {
	file, err := os.Open(tempFilePath)
	if err != nil {
		return fmt.Errorf("failed to open temp file: %w", err)
	}
	defer file.Close()

	// Size is available for the ~100 MB multipart-threshold check; the s3manager
	// uploader also splits large bodies into multipart uploads automatically.
	fileInfo, err := file.Stat()
	if err != nil {
		return fmt.Errorf("failed to stat temp file: %w", err)
	}
	_ = fileInfo.Size()

	_, err = a.s3Uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String(a.config.S3.Bucket),
		Key:    aws.String(objectKey),
		Body:   file,
	})
	if err != nil {
		return fmt.Errorf("S3 upload failed: %w", err)
	}
	return nil
}
```

File: cmd/formatters/jsonl_test.go (expand existing)
- Test streaming formatter with small chunks (10 rows)
- Test streaming formatter with large chunks (100,000 rows)
- Test chunk boundary conditions (exact chunk size, off-by-one)
- Verify output matches non-streaming formatter
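A sketch of one boundary test, assuming the `jsonlStreamWriter` shape sketched earlier (test name and row counts are illustrative):

```go
package formatters

import (
	"bytes"
	"strings"
	"testing"
)

// TestJSONLChunkBoundary checks that writing rows in exact-chunk-size batches
// produces one JSON line per row with no trailing artifacts.
func TestJSONLChunkBoundary(t *testing.T) {
	var buf bytes.Buffer
	w := &jsonlStreamWriter{w: &buf} // assumes the writer sketched earlier

	chunk := make([]map[string]interface{}, 10)
	for i := range chunk {
		chunk[i] = map[string]interface{}{"id": i}
	}

	// Two full chunks plus a final partial chunk of one row: 21 rows total.
	for _, c := range [][]map[string]interface{}{chunk, chunk, chunk[:1]} {
		if err := w.WriteChunk(c); err != nil {
			t.Fatalf("WriteChunk: %v", err)
		}
	}
	if err := w.Close(); err != nil {
		t.Fatalf("Close: %v", err)
	}

	lines := strings.Split(strings.TrimRight(buf.String(), "\n"), "\n")
	if len(lines) != 21 {
		t.Fatalf("expected 21 JSONL lines, got %d", len(lines))
	}
}
```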
File: cmd/formatters/csv_test.go (expand existing)
- Test header is written first
- Test column ordering consistency
- Test NULL handling
- Test special characters (commas, quotes, newlines)
File: cmd/formatters/parquet_test.go (expand existing)
- Test schema mapping for all PostgreSQL types
- Test internal compression (snappy, zstd, gzip, none)
- Test footer metadata integrity
File: cmd/compressors/zstd_test.go, etc. (expand existing)
- Test NewWriter() produces same output as Compress()
- Test streaming compression matches batch compression
File: cmd/archiver_streaming_test.go (NEW FILE)
- Test `extractPartitionDataStreaming()` with mock database
- Test retry logic with simulated timeout
- Test temp file creation and cleanup
- Test memory usage stays constant (use runtime.ReadMemStats)
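The memory check can compare heap usage before and after streaming a large synthetic partition; a rough sketch using `runtime.ReadMemStats` (the threshold and the `streamLargeSyntheticPartition` helper are hypothetical placeholders):

```go
package cmd

import (
	"runtime"
	"testing"
)

// heapInUse forces a GC so the reading reflects live memory, not garbage.
func heapInUse() uint64 {
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return m.HeapInuse
}

func TestStreamingMemoryStaysBounded(t *testing.T) {
	before := heapInUse()

	// Placeholder: drive extractPartitionDataStreaming() against a mock database
	// with enough rows to exceed the chunk size many times over.
	streamLargeSyntheticPartition(t)

	after := heapInUse()
	const limit = 300 << 20 // ~300 MB headroom over the ~150 MB target
	if after > before+limit {
		t.Fatalf("heap grew from %d to %d bytes; streaming should stay bounded", before, after)
	}
}
```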
File: cmd/archiver_e2e_test.go (NEW FILE)
- Test full pipeline: DB → extraction → compression → S3
- Test with docker-compose dev environment
- Verify data integrity (download, decompress, parse, compare)
- Test all three formats (JSONL, CSV, Parquet)
- Test large partitions (1M+ rows)
File: CHANGELOG.md
Add new section for v1.4.0:
## [1.4.0] - 2025-01-XX
### Changed
- **Streaming Architecture:**
- Refactored data extraction to use streaming/chunked processing
- Memory usage now constant (~150 MB) regardless of partition size
- Eliminates OOM crashes on large partitions (10+ GB)
- Data streams: PostgreSQL → formatter → compressor → temp file → S3
- Chunk size configurable via `--chunk-size` (default: 10,000 rows)
### Fixed
- **Statement Timeout Retry:**
- Retry logic now wraps entire extraction process, not just query start
- Statement timeouts during row iteration are now properly retried
- Prevents silent failures on large partition extractions
### Added
- **Streaming Formatters:**
- New streaming interfaces for JSONL, CSV, and Parquet formats
- Schema pre-query for CSV headers and Parquet type mapping
- Compression handled in streaming mode for all formats
- **Configuration:**
- New `--chunk-size` flag (100-1,000,000, default: 10,000)
- YAML config: `chunk_size`

File: README.md
Add section on streaming and memory usage:
### Memory Usage
The archiver uses a streaming architecture that maintains constant memory usage regardless of partition size:
- **Chunk Size**: 10,000 rows (configurable via `--chunk-size`)
- **Memory Footprint**: ~150 MB constant
- **Large Partitions**: No OOM crashes on multi-GB partitions
#### Tuning Chunk Size
Adjust chunk size based on average row size:
- Small rows (~1 KB): `--chunk-size 50000` (~50 MB memory)
- Medium rows (~10 KB): `--chunk-size 10000` (~100 MB memory) - default
- Large rows (~100 KB): `--chunk-size 1000` (~100 MB memory)
- Very large rows (1+ MB): `--chunk-size 100` (~100 MB memory)

Before merge, verify:
- All unit tests pass: `go test ./...`
- All linters pass: `golangci-lint run`
- Code formatted: `gofmt -w .`
- Build succeeds: `go build`
- Docker build succeeds (AMD64): `docker build --platform linux/amd64 .`
- Docker build succeeds (ARM64): `docker build --platform linux/arm64 .`
- Dev environment works: `docker compose -f docker-compose.dev.yaml up`
- Memory usage verified on large partition (use `docker stats`)
- Statement timeout retry verified (set low timeout, test on large partition)
- Data integrity verified (download S3 file, decompress, parse, compare with DB)
- Review this document to understand current state
- Implement retry wrapper (`extractPartitionDataWithRetry`)
- Integrate into main flow (modify `processSinglePartition`)
- Add temp file upload (`uploadTempFileToS3`)
- Test locally with docker-compose dev environment
- Run all tests and fix any failures
- Update CHANGELOG and README
- Commit and tag v1.4.0
- `cmd/schema.go` - Schema querying and type definitions
- `STREAMING_IMPLEMENTATION_STATUS.md` - This document
- `cmd/config.go` - Added ChunkSize config
- `cmd/root.go` - Added --chunk-size flag
- `cmd/archiver.go` - Added streaming extraction function, temp file utils
- `cmd/formatters/formatter.go` - Added streaming interfaces
- `cmd/formatters/jsonl.go` - Added streaming implementation
- `cmd/formatters/csv.go` - Added streaming implementation
- `cmd/formatters/parquet.go` - Added streaming implementation
- `cmd/compressors/compressor.go` - Added NewWriter to interface
- `cmd/compressors/zstd.go` - Added NewWriter implementation
- `cmd/compressors/gzip.go` - Added NewWriter implementation
- `cmd/compressors/lz4.go` - Added NewWriter implementation
- `cmd/compressors/none.go` - Added NewWriter implementation
┌──────────────┐
│ PostgreSQL │
└──────┬───────┘
│ SELECT * FROM partition (all rows)
↓
┌──────────────────────────────────┐
│ extractRowsWithProgress() │
│ • Loads ALL rows into memory │
│ • Returns []map[string]any │
│ • Memory: O(partition_size) │
└──────────────┬───────────────────┘
↓ All rows in memory
┌──────────────────────────────────┐
│ formatter.Format(rows) │
│ • Formats ALL rows │
│ • Returns []byte │
│ • Memory: 2x partition size │
└──────────────┬───────────────────┘
↓ Formatted data
┌──────────────────────────────────┐
│ compressor.Compress(data) │
│ • Compresses entire file │
│ • Returns []byte │
│ • Memory: 3x partition size │
└──────────────┬───────────────────┘
↓ Compressed data
┌──────────────────────────────────┐
│ uploadToS3(compressed) │
│ • Uploads from memory │
└──────────────────────────────────┘
Total Memory: ~3x partition size (10+ GB for large partitions)
┌──────────────┐
│ PostgreSQL │
└──────┬───────┘
│ SELECT row_to_json(t) FROM t
│
↓ Stream rows
┌──────────────────────────────────┐
│ extractPartitionDataStreaming() │
│ • Process in 10K row chunks │
│ • Memory: O(chunk_size) │
└──────────────┬───────────────────┘
│ Chunks
↓
┌───────────────┐
│ Chunk Buffer │ 10,000 rows
│ ~100 MB │
└───────┬───────┘
↓
┌───────────────────┐
│ StreamingFormatter│
│ • JSONL/CSV/Parq │
│ • WriteChunk() │
└───────┬───────────┘
↓
┌───────────────────┐
│ Compressor │ (if not Parquet)
│ • zstd/gzip/lz4 │
│ • ~32 MB buffer │
└───────┬───────────┘
↓
┌───────────────────┐
│ MultiWriter │
│ • Temp File │
│ • MD5 Hasher │
└───────┬───────────┘
↓
┌───────────────────┐
│ Temp File │
│ • On disk │
│ • ~partition sz │
└───────┬───────────┘
↓
┌──────────────────────────────────┐
│ uploadTempFileToS3() │
│ • Reads from disk │
│ • Multipart for large files │
└──────────────────────────────────┘
Total Memory: ~150 MB constant (chunk + compressor buffers)
If unclear on any implementation details, refer to:
- This document for architecture overview
- Inline comments in modified files
- Research report in previous conversation (if needed)
Good luck with the continuation! 🚀