A production-ready, PyArrow-accelerated data ingestion and transformation framework for building scalable, enterprise-grade ETL workflows. The pipeline automates the full data lifecycle — ingestion → validation → cleaning → transformation → optimized export — using zero‑configuration defaults with full YAML-based customization.
The Data Ingestion Pipeline is designed to mirror real-world data engineering systems used in consulting and enterprise analytics environments. It emphasizes performance, data quality, configurability, and auditability, making it suitable for both analytics and machine‑learning workloads.
Primary Objectives:
- Standardize ingestion across heterogeneous data sources
- Enforce data quality and governance rules
- Produce ML- and BI-ready datasets
- Optimize storage and I/O using columnar formats
| Capability | Description | Business Value |
|---|---|---|
| Multi-format ingestion | CSV, XLSX, JSON, Parquet | Single unified ingestion layer |
| PyArrow acceleration | Columnar I/O & memory efficiency | 5–10× faster processing |
| Automated data cleaning | Missing values, duplicates, outliers | ML-ready datasets |
| YAML-driven configuration | No hard-coded logic | Easy customization & reuse |
| Data validation & QA | Schema, range, completeness checks | Data governance & trust |
| Dual export formats | Parquet + CSV | Analytics + human readability |
| Audit & reporting | JSON metadata and profiling reports | Full traceability |
📊 1M rows × 100 columns → processed in < 30 seconds
💾 70–90% storage reduction using Parquet (Snappy)
⚡ Chunk-based processing for low memory usage
📈 Automated profiling & audit reports
- Getting Started
- Configuration
- Usage
- Pipeline Architecture
- Performance Benchmarks
- Configuration Reference
- Development & Testing
- Production Deployment
- Project Structure
- Python 3.8+
- pip
```bash
pip install -r requirements.txt
# or (development mode)
pip install -e .
```

```bash
git clone https://github.com/hemanthscode/data-ingestion-pipeline.git
cd data-ingestion-pipeline
pip install -e .
```

```bash
python main.py --input data/raw/sample.csv
```

```python
from src.pipeline import DataPipeline

pipeline = DataPipeline()
report = pipeline.run("data/raw/sales.csv")
print(f"Pipeline duration: {report['duration_seconds']:.2f}s")
print(f"Final dataset shape: {report['final_shape']}")
```

The pipeline runs with zero configuration by default, using `pipeline_config.yaml`.
```yaml
cleaning:
  missing_values:
    strategy: fill_median
  outliers:
    method: iqr
    action: cap

transformation:
  categorical_encoding:
    method: onehot
  numerical_scaling:
    method: minmax
```

Available configs:

- `pipeline_config.yaml` – Production-ready defaults
- `custom_pipeline.yaml` – Override template
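To make the cleaning and transformation options above concrete, here is a minimal pandas sketch of what `fill_median`, IQR capping, one-hot encoding, and min-max scaling typically do. It is illustrative only, not the pipeline's internal implementation; the input path and the 1.5 IQR multiplier are assumptions.

```python
import pandas as pd

df = pd.read_csv("data/raw/sales.csv")  # placeholder input
num_cols = df.select_dtypes(include="number").columns

# missing_values.strategy: fill_median -> fill numeric NaNs with the column median
df[num_cols] = df[num_cols].fillna(df[num_cols].median())

# outliers.method: iqr, action: cap -> clip values outside 1.5 * IQR
for col in num_cols:
    q1, q3 = df[col].quantile([0.25, 0.75])
    spread = q3 - q1
    df[col] = df[col].clip(lower=q1 - 1.5 * spread, upper=q3 + 1.5 * spread)

# categorical_encoding.method: onehot -> expand object columns into indicator columns
df = pd.get_dummies(df, columns=df.select_dtypes(include="object").columns.tolist())

# numerical_scaling.method: minmax -> rescale each numeric column to [0, 1]
df[num_cols] = (df[num_cols] - df[num_cols].min()) / (df[num_cols].max() - df[num_cols].min())
```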
```
[1] Ingestion       → Format detection, PyArrow readers
[2] Profiling       → Data types, completeness, memory stats
[3] Validation      → Schema, ranges, uniqueness checks
[4] Cleaning        → Missing values, duplicates, outliers
[5] Transformation  → Encoding, scaling, feature engineering
[6] Export          → Parquet + CSV + audit reports
```
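As a rough sketch of how the PyArrow-based ingestion and export stages can stay chunk-oriented, the snippet below streams a CSV batch by batch and writes Snappy-compressed Parquet. It is illustrative only, not the pipeline's actual reader code; the file paths and the 16 MB block size are assumptions.

```python
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq

# Stream the CSV in fixed-size blocks instead of loading the whole file at once
reader = pa_csv.open_csv(
    "data/raw/sales.csv",
    read_options=pa_csv.ReadOptions(block_size=16 << 20),  # ~16 MB per batch
)

writer = None
for batch in reader:  # each item is a pyarrow.RecordBatch
    # ... per-chunk profiling / cleaning would happen here ...
    if writer is None:
        # Snappy-compressed Parquet, matching the pipeline's export format
        writer = pq.ParquetWriter(
            "data/processed/sales_processed.parquet", batch.schema, compression="snappy"
        )
    writer.write_batch(batch)

if writer is not None:
    writer.close()
```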
```
data/processed/
├── dataset_processed.parquet
└── dataset_processed.csv

data/reports/
├── profiling_report.json
└── validation_report.json
```
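The exported artifacts can be consumed with standard tooling. A small example, assuming the project-relative data directories and the file names shown above:

```python
import json
import pandas as pd

# Processed dataset (Parquet is read via PyArrow when it is installed)
df = pd.read_parquet("data/processed/dataset_processed.parquet")
print(df.shape)

# Validation report emitted alongside the data
with open("data/reports/validation_report.json") as fh:
    validation = json.load(fh)
print(json.dumps(validation, indent=2)[:500])  # preview the audit metadata
```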
| Dataset Size | Rows | Raw Size | Parquet Size | Runtime |
|---|---|---|---|---|
| Small | 10K | 5.2 MB | 1.1 MB | 2.1 s |
| Medium | 100K | 48 MB | 8.7 MB | 12.4 s |
| Large | 1M | 450 MB | 67 MB | 28.7 s |
Key Results:
- Up to 90% compression
- ~10× faster than pandas-only pipelines
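The compression figures are easy to sanity-check on any DataFrame. A self-contained comparison on synthetic data (sizes will vary with the real data's shape and cardinality):

```python
import os
import numpy as np
import pandas as pd

# Synthetic frame loosely mimicking a wide analytics table
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "id": np.arange(100_000),
    "value": rng.normal(size=100_000),
    "region": rng.choice(["north", "south", "east", "west"], size=100_000),
})

df.to_csv("sample.csv", index=False)
df.to_parquet("sample.parquet", compression="snappy")  # uses the PyArrow engine when installed

csv_mb = os.path.getsize("sample.csv") / 1e6
parquet_mb = os.path.getsize("sample.parquet") / 1e6
print(f"CSV: {csv_mb:.1f} MB, Parquet: {parquet_mb:.1f} MB "
      f"({1 - parquet_mb / csv_mb:.0%} smaller)")
```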
```yaml
paths:
  raw_data: data/raw
  processed_data: data/processed

ingestion:
  chunk_size: 10000
  supported_formats: [csv, xlsx, parquet, json]

quality:
  min_completeness: 0.7
  max_duplicate_ratio: 0.1
```

Full configuration is documented in `config/pipeline_config.yaml`.
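A sketch of how thresholds like `min_completeness` and `max_duplicate_ratio` can be checked against a DataFrame (illustrative only, not the pipeline's validator code; the input path is a placeholder):

```python
import pandas as pd

MIN_COMPLETENESS = 0.7     # quality.min_completeness
MAX_DUPLICATE_RATIO = 0.1  # quality.max_duplicate_ratio

df = pd.read_csv("data/raw/sales.csv")

# Completeness: share of non-null cells per column
completeness = df.notna().mean()
failing_cols = completeness[completeness < MIN_COMPLETENESS].index.tolist()

# Duplicate ratio: fully duplicated rows as a fraction of all rows
duplicate_ratio = df.duplicated().mean()

print(f"Columns below completeness threshold: {failing_cols}")
print(f"Duplicate ratio: {duplicate_ratio:.2%} (limit {MAX_DUPLICATE_RATIO:.0%})")
```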
```bash
# Full pipeline
python main.py --input data/raw/sales.csv

# Skip optional stages
python main.py --input data/raw/sales.csv --skip-validation --skip-transformation

# Custom config
python main.py --input data/raw/sales.csv --config config/custom_pipeline.yaml
```

```bash
pip install -e ".[dev]"

pytest tests/
black src/
flake8 src/
mypy src/
```

```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install -e .
CMD ["python", "main.py", "--input", "/data/input.csv"]
```

```yaml
ingestion:
  chunk_size: 50000
  max_file_size_mb: 1000

logging:
  level: INFO
```

```
data-ingestion-pipeline/
├── config/            # YAML configurations
├── src/               # Core pipeline modules
├── data/              # Sample datasets
├── notebooks/         # Jupyter demos
├── tests/             # Unit tests
├── main.py            # CLI entry point
├── requirements.txt
└── setup.py
```
- pandas ≥ 2.0
- pyarrow ≥ 12.0
- numpy ≥ 1.24
- openpyxl ≥ 3.1
PyArrow is required for production performance.