web3-etl is an extensible Extract-Transform-Load (ETL) pipeline that reads on-chain event logs emitted by any EVM-compatible smart contract, enriches them with useful metadata, and stores the results in either CSV files or a MySQL database.
The project is heavily inspired by chainbound/apollo but adds:
- Chunk-based log retrieval (no full block downloads)
- Optional event filtering at the RPC level
- Pluggable sinks: CSV or MySQL
- Automatic resume via a progress file
- A lightweight REST API to launch and monitor indexing jobs at runtime
## Table of Contents

- Project Layout
- Features
- Configuration
- Quick Start (CLI)
- REST API
- Storage Back-ends
- Resume Capability
- Logging & Retry
- Manual Test Checklist
- Roadmap
- License
## Project Layout

```text
etl-blockchain/
├── cmd/
│   ├── indexer.go        # CLI launcher
│   └── api.go            # REST server bootstrap
├── internal/
│   ├── api/              # Router, handlers, DTOs
│   ├── config/           # YAML loader & validation
│   ├── indexer/          # Main orchestrator
│   ├── parser/           # ABI decoding & enrichment
│   ├── rpc/              # Resilient Ethereum RPC client
│   └── sink/             # CSV / MySQL back-ends
├── abi/                  # Contract ABIs referenced in the config
├── data/                 # Generated CSV files (git-ignored)
├── config.yaml.example
└── README.md             # You are here 👋
```
## Features

- Chunked Log Scanning – Reads logs in fixed-size block windows to avoid timeouts and memory spikes.
- Event Filtering – Specify a list of event names per contract; the RPC node returns only the topics you care about.
- Multi-contract Support – Index as many contracts as you wish in a single run.
- Enrichment Layer – Each record is augmented with the event name, tx hash, block number, timestamp, sender, etc.
- Pluggable Sinks – Out-of-the-box support for CSV and MySQL; new sinks can be added by implementing a tiny interface (see the sketch under Storage Back-ends).
- Progress Tracking – The last processed block is stored in `.progress.json`; crashes or restarts continue where they left off.
- REST API – Trigger long-running indexing jobs programmatically and query their status.
- Observability – Structured logging, retries with back-off, and human-readable progress bars.
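The first two features combine in the scanner's inner loop. Below is a minimal sketch of a chunked, topic-filtered scan, assuming go-ethereum's `ethclient`; function and variable names are illustrative, not the project's actual internals:

```go
// A sketch of chunk-based log retrieval with RPC-level event filtering.
// Assumes github.com/ethereum/go-ethereum; names are illustrative.
package indexer

import (
	"context"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethclient"
)

func scanChunks(ctx context.Context, client *ethclient.Client,
	contract common.Address, from, to, chunk uint64) ([]types.Log, error) {

	if chunk == 0 {
		chunk = 1 // guard against a zero-size window
	}
	// topic0 is the keccak-256 hash of the event signature; sending it in
	// the filter makes the node return only matching logs.
	topic0 := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)"))

	var all []types.Log
	for start := from; start <= to; start += chunk {
		end := start + chunk - 1
		if end > to {
			end = to
		}
		q := ethereum.FilterQuery{
			FromBlock: new(big.Int).SetUint64(start),
			ToBlock:   new(big.Int).SetUint64(end),
			Addresses: []common.Address{contract},
			Topics:    [][]common.Hash{{topic0}},
		}
		logs, err := client.FilterLogs(ctx, q)
		if err != nil {
			return nil, err // the real loop retries with back-off
		}
		all = append(all, logs...)
		log.Printf("✓ %d → %d | events: %d", start, end, len(logs))
	}
	return all, nil
}
```

Filtering by topic0 at the node means each window ships back only the events you asked for, which is what keeps the windows cheap.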
## Configuration

All runtime options live in `config.yaml` (copy `config.yaml.example` and customise):

```yaml
rpc_url: "https://mainnet.infura.io/v3/YOUR_KEY"
start_block: 12345678
chunk_size: 1000            # Optional – window size in blocks

contracts:
  - name: USDC              # Human-friendly label
    address: "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
    abi: "./abi/token.json"
    events:                 # Optional – filter only these events
      - Transfer

storage:
  type: "csv"               # "csv" or "mysql"
  mysql:
    dsn: "user:pass@tcp(127.0.0.1:3306)/mydb"
  csv:
    output_dir: "./data"    # Folder must exist

retry:
  attempts: 3
  delay_ms: 1500
```
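For reference, a sketch of how this file might map onto Go structs (illustrative names; assumes `gopkg.in/yaml.v3`, one common choice for the YAML loader in `internal/config`):

```go
// A sketch of config structs matching the YAML above. The struct and
// field names are assumptions, not the project's actual types.
package config

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v3"
)

type Contract struct {
	Name    string   `yaml:"name"`
	Address string   `yaml:"address"`
	ABI     string   `yaml:"abi"`
	Events  []string `yaml:"events,omitempty"` // empty = index all events
}

type Config struct {
	RPCURL     string     `yaml:"rpc_url"`
	StartBlock uint64     `yaml:"start_block"`
	ChunkSize  uint64     `yaml:"chunk_size"`
	Contracts  []Contract `yaml:"contracts"`
	Storage    struct {
		Type  string `yaml:"type"` // "csv" or "mysql"
		MySQL struct {
			DSN string `yaml:"dsn"`
		} `yaml:"mysql"`
		CSV struct {
			OutputDir string `yaml:"output_dir"`
		} `yaml:"csv"`
	} `yaml:"storage"`
	Retry struct {
		Attempts int `yaml:"attempts"`
		DelayMs  int `yaml:"delay_ms"`
	} `yaml:"retry"`
}

// Load reads and minimally validates a config file.
func Load(path string) (*Config, error) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var c Config
	if err := yaml.Unmarshal(raw, &c); err != nil {
		return nil, err
	}
	if c.RPCURL == "" {
		return nil, fmt.Errorf("rpc_url is required")
	}
	return &c, nil
}
```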
## Quick Start (CLI)

```bash
# Dependencies
go mod download

# Copy & edit configuration
cp config.yaml.example config.yaml
vim config.yaml

# Run the indexer
go run cmd/indexer.go --config=config.yaml
```

CLI flags override the YAML file:
```text
--config         Path to configuration file (default: ./config.yaml)
--start-block    First block to scan (uint64)
--rpc-url        Alternative RPC endpoint
--storage-type   "csv" or "mysql"
```

## REST API

The HTTP server (default port 8080) lets you create, inspect, and cancel jobs.
| Verb | Endpoint | Purpose |
|---|---|---|
| POST | `/jobs` | Launch a new indexing job |
| GET | `/jobs/{job_id}` | Get real-time status of a job |
| DELETE | `/jobs/{job_id}` | (Optional) Cancel a running job |
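Jobs can be created from any HTTP client. A quick Go sketch using only the standard library; the payload fields mirror the curl example that follows (there is no published client package):

```go
// A sketch of creating a job via POST /jobs. The JSON fields mirror the
// curl example in this README; nothing here is a generated client.
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	body := []byte(`{
	  "rpc_url": "https://mainnet.infura.io/v3/YOUR_KEY",
	  "start_block": 16460000,
	  "contracts": [{
	    "name": "USDC",
	    "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",
	    "abi": "./abi/token.json",
	    "events": ["Transfer"]
	  }],
	  "storage": {"type": "csv", "csv": {"output_dir": "./data"}}
	}`)

	resp, err := http.Post("http://localhost:8080/jobs",
		"application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	out, _ := io.ReadAll(resp.Body)
	fmt.Println(string(out)) // e.g. {"job_id":"…"}
}
```

The same request from the shell: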
```bash
curl -X POST http://localhost:8080/jobs \
-H 'Content-Type: application/json' \
-d '{
"rpc_url": "https://mainnet.infura.io/v3/YOUR_KEY",
"start_block": 16460000,
"contracts": [{
"name": "USDC",
"address": "0xa0b8…e6eb48",
"abi": "./abi/token.json",
"events": ["Transfer"]
}],
"storage": { "type": "csv", "csv": { "output_dir": "./data" } }
}'
```

The response contains a UUID:

```json
{ "job_id": "1b0dbe6e-2f1c-4758-ad7d-f5021f3ab206" }
```

Check a job's status:

```bash
curl http://localhost:8080/jobs/1b0dbe6e-2f1c-4758-ad7d-f5021f3ab206
```

## Storage Back-ends

### CSV

- One file per contract/event pair, named `<ContractName>_<EventName>.csv` (e.g. `USDC_Transfer.csv`).
- Headers are auto-generated on first write.
- Ideal for analytics pipelines or quick Excel exploration.
### MySQL

- One table per event: `event_<event_name>` (e.g. `event_transfer`).
- Column types are inferred from the ABI parameters.
- Perfect for dashboards and ad-hoc SQL queries.
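Regardless of back-end, sinks share one small interface. A sketch of what it might look like (hypothetical shape; the real definition lives in `internal/sink`):

```go
// A sketch of the pluggable sink contract. Type and method names are
// assumptions, not the project's actual interface.
package sink

import "context"

// Record is one decoded, enriched event row.
type Record struct {
	Contract  string
	Event     string
	TxHash    string
	Block     uint64
	Timestamp int64
	Fields    map[string]any // decoded ABI parameters
}

// Sink receives enriched records and persists them somewhere.
type Sink interface {
	// Write persists a batch of records; it should be safe to retry.
	Write(ctx context.Context, records []Record) error
	// Close flushes buffers and releases resources.
	Close() error
}
```

A webhook or S3 sink from the roadmap would slot in by implementing the same two methods.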
## Resume Capability

The indexer writes the most recent processed block to `.progress.json`. On restart it resumes from that point, so already-indexed ranges are not re-scanned and no manual intervention is needed after a crash.
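A sketch of how such a checkpoint can be written crash-safely (illustrative; the assumption here is that `.progress.json` holds a single JSON object with the last processed block):

```go
// A sketch of crash-safe progress tracking via write-temp-then-rename.
// The file layout and function names are assumptions.
package indexer

import (
	"encoding/json"
	"os"
)

type progress struct {
	LastBlock uint64 `json:"last_block"`
}

func saveProgress(path string, block uint64) error {
	data, err := json.Marshal(progress{LastBlock: block})
	if err != nil {
		return err
	}
	tmp := path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o644); err != nil {
		return err
	}
	// Rename is atomic on POSIX filesystems, so a crash never leaves a
	// half-written progress file behind.
	return os.Rename(tmp, path)
}

func loadProgress(path string) (uint64, bool) {
	data, err := os.ReadFile(path)
	if err != nil {
		return 0, false // no checkpoint yet: start from start_block
	}
	var p progress
	if err := json.Unmarshal(data, &p); err != nil {
		return 0, false
	}
	return p.LastBlock, true
}
```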
## Logging & Retry

- Structured logs via `logrus` (or `zap`).
- Automatic retries with configurable attempts/delay for transient RPC and sink errors.
- Concise progress output:

```text
✓ 182000 → 182999 | events: 48 | 1.3 s
```
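A sketch of the retry shape implied by the `retry` settings: up to `attempts` tries, starting at `delay_ms` between them (the helper name and the doubling back-off scheme are assumptions):

```go
// A sketch of retry-with-back-off driven by retry.attempts and
// retry.delay_ms. The doubling back-off is an assumption.
package rpc

import (
	"context"
	"fmt"
	"time"
)

// withRetry runs fn up to attempts times, sleeping delay between tries
// and doubling it after each failure.
func withRetry(ctx context.Context, attempts int, delay time.Duration,
	fn func() error) error {

	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the last attempt
		}
		select {
		case <-time.After(delay):
			delay *= 2 // simple exponential back-off
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}
```

With the example config this would be called as `withRetry(ctx, 3, 1500*time.Millisecond, fn)`.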
## Manual Test Checklist

- Index Transfer events for USDC on mainnet.
- Confirm decoded data and CSV persistence.
- Switch the sink to MySQL and validate inserts.
- Simulate a network failure; verify retry and resume logic.
## Roadmap

- Webhook sink (push events to external HTTP endpoints)
- Parallel indexing of multiple contracts/events
- Prometheus metrics
- Automatic upload of generated CSVs to S3
- Read-only REST API to serve indexed data
## License

Distributed under the MIT License.