From 6107bc3d25ec98c2e48cfef049ff8e37636813f1 Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Mon, 9 Feb 2026 20:48:31 +0200 Subject: [PATCH 1/2] feat: add support for new compression algorithms --- README.md | 6 ++-- cmd/export.go | 33 +++++++++++++++---- go.mod | 12 +++++++ go.sum | 4 +++ pkg/compressor/compressor.go | 53 +++++++++++++++++++++++++++++++ pkg/compressor/compressor_test.go | 51 +++++++++++++++++++++++++++++ 6 files changed, 151 insertions(+), 8 deletions(-) create mode 100644 pkg/compressor/compressor.go create mode 100644 pkg/compressor/compressor_test.go diff --git a/README.md b/README.md index 3f2c51f..cd3bea2 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ batch-export is a tool to retrieve Ethereum event logs for specific contracts, p - Handles large block ranges by querying in smaller chunks. - Supports rate limiting for RPC requests. - Saves retrieved logs to a specified output file (default: `export.ndjson`) in NDJSON format. +- Supports file compression using **Gzip**, **Zstd**, or **Xz** (LZMA2). - Graceful shutdown on interrupt signals (Ctrl+C). ## Requirements @@ -34,7 +35,7 @@ The primary command is export. ```sh ./dist/batch-export export \ --block-range-limit=10000 \ - --compress=true \ + --compression-algo=zstd \ --end=0 \ --endpoint ``` @@ -43,7 +44,8 @@ The primary command is export. ```sh -b, --block-range-limit uint32 Max blocks per log query (default 5) - -c, --compress Compress to GZIP + -c, --compress Compress to GZIP (deprecated, use --compression-algo) + --compression-algo string Compression algorithm (gzip, zstd, xz, none) (default "gzip") --end uint End block (optional, uses latest block if 0) (default 39810670) -e, --endpoint string Ethereum RPC endpoint URL -h, --help help for export diff --git a/cmd/export.go b/cmd/export.go index f39bb50..f54fdef 100644 --- a/cmd/export.go +++ b/cmd/export.go @@ -7,10 +7,10 @@ import ( "sync" "time" + "github.com/ethersphere/batch-export/pkg/compressor" ethclient "github.com/ethersphere/batch-export/pkg/ethclientwrapper" "github.com/ethersphere/batch-export/pkg/eventfetcher" "github.com/ethersphere/batch-export/pkg/filestore" - "github.com/ethersphere/batch-export/pkg/gzipstore" "github.com/ethersphere/bee/v2/pkg/config" "github.com/ethersphere/bee/v2/pkg/util/abiutil" "github.com/spf13/cobra" @@ -91,12 +91,32 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save }() compressFunc := func() error { + algo, _ := cmd.Flags().GetString("compression-algo") if compress { - if err := gzipstore.CompressFile(outputFile, outputFile+".gzip"); err != nil { - return fmt.Errorf("error compressing file: %w", err) - } - c.log.Info("File compressed", "outputFile", outputFile+".gzip") + algo = "gzip" // backward compatibility + } + + if algo == "none" { + return nil + } + + outputExt := "" + switch algo { + case "gzip": + outputExt = ".gzip" + case "zstd": + outputExt = ".zst" + case "xz": + outputExt = ".xz" + default: + return fmt.Errorf("unsupported compression algorithm: %s", algo) + } + + compressedFile := outputFile + outputExt + if err := compressor.CompressFile(outputFile, compressedFile, algo); err != nil { + return fmt.Errorf("error compressing file: %w", err) } + c.log.Info("File compressed", "outputFile", compressedFile) return nil } @@ -138,7 +158,8 @@ The process can be interrupted at any time (Ctrl+C), and it will attempt to save cmd.Flags().IntVarP(&maxRequest, "max-request", "m", 15, "Max RPC requests/sec") cmd.Flags().Uint32VarP(&blockRangeLimit, "block-range-limit", "b", 5, "Max blocks per log query") cmd.Flags().StringVarP(&outputFile, "output", "o", "export.ndjson", "Output file path (NDJSON)") - cmd.Flags().BoolVarP(&compress, "compress", "c", false, "Compress to GZIP") + cmd.Flags().BoolVarP(&compress, "compress", "c", false, "Compress to GZIP (deprecated, use --compression-algo)") + cmd.Flags().String("compression-algo", "gzip", "Compression algorithm (gzip, zstd, xz, none)") c.root.AddCommand(cmd) diff --git a/go.mod b/go.mod index 00c838a..c29447b 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,22 @@ go 1.24.0 require ( github.com/ethereum/go-ethereum v1.14.5 github.com/ethersphere/bee/v2 v2.5.0 + github.com/klauspost/compress v1.17.6 github.com/spf13/cobra v1.9.1 golang.org/x/time v0.11.0 ) +require ( + github.com/stretchr/testify v1.8.4 + github.com/ulikunitz/xz v0.5.15 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/StackExchange/wmi v1.2.1 // indirect diff --git a/go.sum b/go.sum index 68ca7a3..f398f1f 100644 --- a/go.sum +++ b/go.sum @@ -177,6 +177,8 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/tyler-smith/go-bip39 v1.1.0 h1:5eUemwrMargf3BSLRRCalXT93Ns6pQJIjYQN2nyfOP8= github.com/tyler-smith/go-bip39 v1.1.0/go.mod h1:gUYDtqQw1JS3ZJ8UWVcGTGqqr6YIN3CWg+kkNaLt55U= +github.com/ulikunitz/xz v0.5.15 h1:9DNdB5s+SgV3bQ2ApL10xRc35ck0DuIX/isZvIk+ubY= +github.com/ulikunitz/xz v0.5.15/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= @@ -204,6 +206,8 @@ golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/pkg/compressor/compressor.go b/pkg/compressor/compressor.go new file mode 100644 index 0000000..1896c04 --- /dev/null +++ b/pkg/compressor/compressor.go @@ -0,0 +1,53 @@ +package compressor + +import ( + "compress/gzip" + "fmt" + "io" + "os" + + "github.com/klauspost/compress/zstd" + "github.com/ulikunitz/xz" +) + +// CompressFile compresses the specified input file into the specified output files using the given algorithm. +func CompressFile(inputFilePath string, outputFilePath string, algo string) error { + inputFile, err := os.Open(inputFilePath) + if err != nil { + return fmt.Errorf("failed to open input file '%s': %w", inputFilePath, err) + } + defer inputFile.Close() + + outputFile, err := os.Create(outputFilePath) + if err != nil { + return fmt.Errorf("failed to create output file '%s': %w", outputFilePath, err) + } + defer outputFile.Close() + + var writer io.WriteCloser + + switch algo { + case "gzip": + writer = gzip.NewWriter(outputFile) + case "zstd": + writer, err = zstd.NewWriter(outputFile) + if err != nil { + return fmt.Errorf("failed to create zstd writer: %w", err) + } + case "xz": + writer, err = xz.NewWriter(outputFile) + if err != nil { + return fmt.Errorf("failed to create xz writer: %w", err) + } + default: + return fmt.Errorf("unsupported compression algorithm: %s", algo) + } + defer writer.Close() + + _, err = io.Copy(writer, inputFile) + if err != nil { + return fmt.Errorf("failed to compress data: %w", err) + } + + return nil +} diff --git a/pkg/compressor/compressor_test.go b/pkg/compressor/compressor_test.go new file mode 100644 index 0000000..590e771 --- /dev/null +++ b/pkg/compressor/compressor_test.go @@ -0,0 +1,51 @@ +package compressor_test + +import ( + "fmt" + "os" + "testing" + + "github.com/ethersphere/batch-export/pkg/compressor" + "github.com/stretchr/testify/assert" +) + +func TestCompressionSizes(t *testing.T) { + // Create specific test data that resembles JSON logs + inputContent := []byte(`{"address":"0x123","block":100,"event":"Transfer","data":"0xabc"} +{"address":"0x123","block":101,"event":"Transfer","data":"0xdef"} +{"address":"0x123","block":102,"event":"Transfer","data":"0xghi"} +`) + // Repeat to get a reasonable file size + for i := 0; i < 1000; i++ { + inputContent = append(inputContent, []byte(`{"address":"0x123","block":100,"event":"Transfer","data":"0xabc"}`)...) + } + + inputFile := "test_input.json" + err := os.WriteFile(inputFile, inputContent, 0644) + assert.NoError(t, err) + defer os.Remove(inputFile) + + algos := []string{"gzip", "zstd", "xz"} + results := make(map[string]int64) + + fmt.Printf("\n--- Compression Size Comparison (Input size: %d bytes) ---\n", len(inputContent)) + + for _, algo := range algos { + outputFile := "test_output." + algo + defer os.Remove(outputFile) + + err := compressor.CompressFile(inputFile, outputFile, algo) + assert.NoError(t, err) + + info, err := os.Stat(outputFile) + assert.NoError(t, err) + + results[algo] = info.Size() + fmt.Printf("%-5s: %d bytes (%.2f%% of original)\n", algo, info.Size(), float64(info.Size())/float64(len(inputContent))*100) + } + fmt.Println("----------------------------------------------------------") + + // Verify expectations: xz should generally be smaller than gzip for this kind of data + // Note: for very small files/specific patterns, results may vary, so we just log them for the user + // But typically xz < gzip +} From 9940b4d217df7890e3a594b3a14a9e683e07afcd Mon Sep 17 00:00:00 2001 From: Calin Martinconi Date: Tue, 10 Feb 2026 13:18:42 +0200 Subject: [PATCH 2/2] fix: linter --- pkg/compressor/compressor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compressor/compressor_test.go b/pkg/compressor/compressor_test.go index 590e771..7c4ab27 100644 --- a/pkg/compressor/compressor_test.go +++ b/pkg/compressor/compressor_test.go @@ -21,7 +21,7 @@ func TestCompressionSizes(t *testing.T) { } inputFile := "test_input.json" - err := os.WriteFile(inputFile, inputContent, 0644) + err := os.WriteFile(inputFile, inputContent, 0o644) assert.NoError(t, err) defer os.Remove(inputFile)