A Symfony 7 CLI application that imports and processes coffee feed data, using either synchronous or asynchronous messaging depending on the --async flag.
Get the app running in 3 commands:
docker compose up -d # Start app and background worker
docker compose exec app php bin/console app:import-coffee-feed coffee_feed.jsonl --async -vvv # Import data
docker compose exec app sqlite3 var/data.db "SELECT COUNT(*) FROM coffee_beans;" # Verify import
This project processes coffee feed data from a local JSONL file and stores it in a SQLite database. It uses Symfony's Messenger component for asynchronous message handling and includes test coverage with PHPUnit and Behat.
Purpose: The application efficiently ingests large coffee product feeds by decoupling the read (JSONL parsing) from write (database insertion) operations. Data is chunked into 20-item batches and processed either synchronously for immediate results or asynchronously via a background queue worker for better resource utilization.
- Framework: Symfony 7
- PHP Version: 8.4+
- Database: SQLite
- Messaging: Symfony Messenger with Doctrine transport
- Testing: PHPUnit 13, Behat (BDD)
- Containerization: Docker (Alpine-based multi-stage build)
├── src/
│ ├── Command/ # CLI commands
│ ├── Entity/ # Database entities (CoffeeBean, etc.)
│ ├── Message/ # Message classes
│ ├── MessageHandler/ # Message handlers
│ ├── Repository/ # Doctrine repositories
│ └── Kernel.php
├── tests/ # Test suites
├── config/ # Configuration files
├── docker-compose.yaml # Docker Compose setup
├── Dockerfile # Multi-stage Docker build
├── coffee_feed.jsonl # Input data file
└── composer.json
- Docker
Build and run with Docker Compose:
docker compose up -d
This will:
- Build the multi-stage Docker image
- Set up the environment
- Automatically create the SQLite database via scripts/entrypoint.sh
- Start the app container (main application)
- Start the queue_worker container (dedicated background async worker that runs automatically)
The docker-compose.yaml includes two services:
- app: Main application container for running commands, tests, and direct queries. Keep it running in the background to exec into.
- queue_worker: Dedicated background worker container that continuously processes async messages. Runs the messenger:consume async command automatically with a memory limit (128 MB) and a batch limit (10 messages) to prevent resource exhaustion.
Both services share the same SQLite database via a named volume (data_storage).
The coffee_feed.jsonl file contains one JSON object per line, each representing a coffee product:
{"sku":"ARABICA-001","name":"Ethiopian Yirgacheffe","in_stock":true,"origin":"Ethiopia","roast":"Medium","roast_date":"2024-01-15"}
{"sku":"ROBUSTA-042","name":"Vietnamese Robusta","in_stock":false,"origin":"Vietnam","roast":"Dark","roast_date":"2024-01-10"}
{"sku":"BLEND-007","name":"Dawn Espresso Blend","in_stock":true,"origin":"Brazil/Colombia","roast":"Medium","roast_date":"2024-01-20"}
Expected fields: sku, name, in_stock, origin, roast, roast_date
The import command processes these records in 20-item chunks to maintain consistent memory usage regardless of file size.
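As a minimal sketch of how one feed line could be decoded and checked against the expected fields, the snippet below uses plain PHP. The helper name parseFeedLine() and the decision to reject records with missing fields are assumptions for illustration; the project's own validation may differ.

```php
<?php

declare(strict_types=1);

/** Field names taken from the "Expected fields" list above. */
const EXPECTED_FIELDS = ['sku', 'name', 'in_stock', 'origin', 'roast', 'roast_date'];

/**
 * Decode one JSONL line and check that every expected field is present.
 * parseFeedLine() is a hypothetical helper, not part of the project.
 *
 * @return array<string, mixed>
 * @throws JsonException            when the line is not valid JSON
 * @throws InvalidArgumentException when the line is not an object or a field is missing
 */
function parseFeedLine(string $line): array
{
    $record = json_decode($line, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($record)) {
        throw new InvalidArgumentException('Each line must contain a JSON object.');
    }

    $missing = array_diff(EXPECTED_FIELDS, array_keys($record));
    if ($missing !== []) {
        throw new InvalidArgumentException('Missing fields: ' . implode(', ', $missing));
    }

    return $record;
}

// Usage with the first sample line from the feed.
$record = parseFeedLine(
    '{"sku":"ARABICA-001","name":"Ethiopian Yirgacheffe","in_stock":true,'
    . '"origin":"Ethiopia","roast":"Medium","roast_date":"2024-01-15"}'
);
echo $record['sku'], ' / ', $record['name'], PHP_EOL;
```

Throwing on a bad line is only one option; a real import might instead log and skip the record so that one malformed line does not abort the whole feed.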
Run the import command to process the coffee feed:
Asynchronous (queued for background processing):
docker compose exec app php bin/console app:import-coffee-feed coffee_feed.jsonl --async -vvv
Synchronous (immediate processing):
docker compose exec app php bin/console app:import-coffee-feed coffee_feed.jsonl -vvv
This command:
- Ensures the coffee_beans table exists (creates it if needed)
- Reads the coffee_feed.jsonl file in 20-item chunks
- Dispatches messages for each chunk (async or sync based on the --async flag)
- Stores data in the database via the message handlers
When using the --async flag, messages are automatically processed by the queue_worker container. To monitor progress:
docker compose logs queue_worker -f # View worker logs in real-time
docker compose ps # Check if worker is running
The worker will continue processing messages in the background until the queue is empty. No additional command is needed.
After importing, verify the data was stored correctly:
Direct SQLite access (recommended):
Count total records:
docker compose exec app sqlite3 var/data.db "SELECT COUNT(*) as total FROM coffee_beans;"
Query specific records:
docker compose exec app sqlite3 var/data.db "SELECT sku, name, in_stock FROM coffee_beans LIMIT 10;"
Run the PHPUnit suite:
docker compose exec app vendor/bin/phpunit
Run the Behat suite:
docker compose exec app vendor/bin/behat
The database is created with a two-tier approach for safety and flexibility:
1. Docker startup (entrypoint):
When docker compose up runs, scripts/entrypoint.sh executes scripts/init.sql to create the initial SQLite database if it doesn't exist. This ensures the database file and base structure are ready.
2. Runtime safeguard (import command):
Each time you run the import command, ImportCoffeeFeedCommand calls ensureSchemaExists() which verifies the coffee_beans table exists. If it's missing (e.g., fresh database or corrupted state), it's recreated automatically.
Result: The coffee_beans table is guaranteed to exist before any data import, preventing import failures due to missing schemas.
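The runtime safeguard can be sketched with an idempotent CREATE TABLE IF NOT EXISTS statement. This is an illustration under assumptions, not the project's ensureSchemaExists(): it uses plain PDO with an in-memory database, and the column types and constraints are guesses based on the feed fields.

```php
<?php

declare(strict_types=1);

/**
 * Create the coffee_beans table when it is missing.
 * Column names mirror the feed fields; types and constraints are assumptions.
 */
function ensureSchemaExists(PDO $pdo): void
{
    $pdo->exec(
        'CREATE TABLE IF NOT EXISTS coffee_beans (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sku TEXT NOT NULL UNIQUE,
            name TEXT NOT NULL,
            in_stock INTEGER NOT NULL,
            origin TEXT,
            roast TEXT,
            roast_date TEXT
        )'
    );
}

// In-memory database so the sketch leaves no files behind.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

ensureSchemaExists($pdo);
ensureSchemaExists($pdo); // calling it again is harmless: the table already exists

$exists = $pdo->query(
    "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'coffee_beans'"
)->fetchColumn();

echo $exists ? 'coffee_beans table is present' : 'coffee_beans table is missing', PHP_EOL;
```

Because the statement is idempotent, it is safe to run on every import, which is what makes the second tier cheap.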
Application configuration via .env files:
- .env: Shared environment configuration
- .env.dev: Development-specific settings
- .env.test: Test environment settings
Key variables:
APP_ENV=dev # Environment: dev or test
APP_DEBUG=true # Enable debug mode
DATABASE_URL=sqlite:///%kernel.project_dir%/var/data.db?timeout=5&journal_mode=wal # SQLite with WAL mode
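MESSENGER_TRANSPORT_DSN=doctrine://default # Use Doctrine for message queue
WAL mode (journal_mode=wal) improves SQLite concurrency: readers are not blocked while a write is in progress, which matters when the async worker and the main app access the database at the same time. SQLite still allows only one writer at a time.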
The application uses SQLite. Database files are stored in the var/ directory.
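To see what WAL mode looks like at the SQLite level, the following sketch enables it on a throwaway database file with plain PDO. It does not show how Doctrine applies the DATABASE_URL parameters; the temporary file and the 5000 ms busy timeout are assumptions chosen for illustration.

```php
<?php

declare(strict_types=1);

// WAL needs a real file; an in-memory database cannot use it.
$path = tempnam(sys_get_temp_dir(), 'wal');
$pdo = new PDO('sqlite:' . $path);
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

// The PRAGMA returns the journal mode that is in effect after the call.
$mode = $pdo->query('PRAGMA journal_mode = WAL')->fetchColumn();

// Wait for a lock instead of failing immediately (illustrative value, in milliseconds).
$pdo->exec('PRAGMA busy_timeout = 5000');

echo 'journal_mode: ', $mode, PHP_EOL;

// Close the connection and remove the database plus any WAL side files.
$pdo = null;
foreach ([$path, $path . '-wal', $path . '-shm'] as $file) {
    if (is_file($file)) {
        unlink($file);
    }
}
```

The journal mode is stored in the database file, so once WAL is enabled it stays in effect for later connections, including those opened by a separate worker process.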
The application uses a CQRS (Command Query Responsibility Segregation) pattern combined with Event-Driven architecture:
Query Side (Read):
ImportCoffeeFeedCommand: Reads JSONL file via streaming generator, chunks data, dispatches events
Command Side (Write):
ImportCoffeeChunkHandler: Subscribes to chunk events and persists data to the database
Separation & Decoupling:
- File reading (query) is completely isolated from database writing (command)
- Communication flows through Symfony Messenger as the event bus
- Enables independent scaling of read and write operations
- ImportCoffeeFeedCommand: Streams JSONL and dispatches ImportCoffeeChunkMessage events
- ImportCoffeeChunkMessage: Event payload containing a batch of coffee bean data
- ImportCoffeeChunkHandler: Event handler that subscribes and processes writes
- Synchronous (default): Events processed immediately within the same command via the sync transport
- Asynchronous (--async flag): Events dispatched to a queue and processed by background workers
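The message and handler roles listed above can be sketched in plain PHP without the framework. The class names come from this README, but their constructor shapes, the three-column insert, and the direct PDO access are assumptions. In a Symfony project the handler would typically be registered with Messenger (for example via the #[AsMessageHandler] attribute) rather than invoked by hand.

```php
<?php

declare(strict_types=1);

/** Payload carrying one batch of decoded feed records (shape is an assumption). */
final class ImportCoffeeChunkMessage
{
    /** @param list<array<string, mixed>> $records */
    public function __construct(public readonly array $records)
    {
    }
}

/** Handler sketch: writes a whole chunk with a single multi-row INSERT. */
final class ImportCoffeeChunkHandler
{
    public function __construct(private readonly PDO $pdo)
    {
    }

    public function __invoke(ImportCoffeeChunkMessage $message): void
    {
        if ($message->records === []) {
            return; // nothing to write
        }

        // Build one "(?, ?, ?)" group per record; only three columns for brevity.
        $placeholders = [];
        $values = [];
        foreach ($message->records as $record) {
            $placeholders[] = '(?, ?, ?)';
            $values[] = $record['sku'];
            $values[] = $record['name'];
            $values[] = (int) $record['in_stock'];
        }

        $sql = 'INSERT INTO coffee_beans (sku, name, in_stock) VALUES '
            . implode(', ', $placeholders);
        $this->pdo->prepare($sql)->execute($values);
    }
}

// Usage: calling the handler directly stands in for synchronous dispatch.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE coffee_beans (sku TEXT, name TEXT, in_stock INTEGER)');

$handler = new ImportCoffeeChunkHandler($pdo);
$handler(new ImportCoffeeChunkMessage([
    ['sku' => 'ARABICA-001', 'name' => 'Ethiopian Yirgacheffe', 'in_stock' => true],
    ['sku' => 'ROBUSTA-042', 'name' => 'Vietnamese Robusta', 'in_stock' => false],
]));

$count = (int) $pdo->query('SELECT COUNT(*) FROM coffee_beans')->fetchColumn();
echo $count, ' rows inserted', PHP_EOL;
```

Keeping the message a plain immutable value object means the same payload works whether it is handled in-process or serialized onto a queue.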
The implementation prioritizes memory efficiency for handling large datasets:
- Streaming Generator Pattern: The streamJsonInChunks() method uses PHP generators to read the JSONL file line-by-line, so only the current line and the chunk being built are held in memory instead of the entire file
- Chunked Processing: Data is processed in fixed 20-item batches. This prevents memory from accumulating when processing large feeds
- Async Queue Offload: With the --async flag, messages are immediately dispatched to the queue (database transport) rather than held in memory waiting for processing
- Bulk Inserts: The handler performs bulk inserts via bulkInsert() instead of individual row inserts, reducing round-trips and memory overhead
- Decoupled Read/Write: The command releases each chunk after dispatching its event, allowing garbage collection between iterations. In async mode it does not wait for handler completion
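The streaming and chunking points above can be illustrated with a small generator. This is a sketch under assumptions: the function borrows the name streamJsonInChunks() from this README, but its signature, the blank-line handling, and the synthetic 45-record feed are invented for the example.

```php
<?php

declare(strict_types=1);

/**
 * Lazily read a JSONL file and yield decoded records in fixed-size chunks.
 * The signature is an assumption; the project's streamJsonInChunks() may differ.
 *
 * @return Generator<int, list<array<string, mixed>>>
 */
function streamJsonInChunks(string $path, int $chunkSize = 20): Generator
{
    $handle = fopen($path, 'rb');
    if ($handle === false) {
        throw new RuntimeException("Cannot open {$path}");
    }

    try {
        $chunk = [];
        while (($line = fgets($handle)) !== false) {
            $line = trim($line);
            if ($line === '') {
                continue; // skip blank lines
            }
            $chunk[] = json_decode($line, true, 512, JSON_THROW_ON_ERROR);
            if (count($chunk) === $chunkSize) {
                yield $chunk;
                $chunk = []; // start a fresh batch so the old one can be freed
            }
        }
        if ($chunk !== []) {
            yield $chunk; // final partial chunk
        }
    } finally {
        fclose($handle);
    }
}

// Usage: write 45 synthetic records to a temporary file, then stream them back.
$path = tempnam(sys_get_temp_dir(), 'feed');
$lines = [];
for ($i = 1; $i <= 45; $i++) {
    $lines[] = json_encode(['sku' => sprintf('SKU-%03d', $i)]);
}
file_put_contents($path, implode("\n", $lines) . "\n");

$sizes = [];
foreach (streamJsonInChunks($path) as $chunk) {
    $sizes[] = count($chunk); // a real command would dispatch a message here
}
unlink($path);

echo implode(', ', $sizes), PHP_EOL; // prints "20, 20, 5"
```

Peak memory depends on the chunk size rather than the file size, since the generator never holds more than one batch at a time.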
- CoffeeBean Entity: Represents a coffee product (SKU, name, stock status, origin, roast, etc.)
The Dockerfile uses a multi-stage build strategy:
- Builder Stage: Compiles PHP extensions and installs dependencies
- Runtime Stage: Minimal image with only runtime requirements
This approach reduces the final image size while maintaining all necessary functionality.
Messages not being processed:
- Check the worker container is running: docker compose ps
- View worker logs: docker compose logs queue_worker -f
- The async worker runs automatically in the background when you use the --async flag
Database locked or corrupted:
Reset by removing the volume (the database will be recreated on next startup):
docker compose down -v
docker compose up -d
Container fails to start or is stuck:
docker compose down -v # Remove volumes and containers
docker compose up --build # Rebuild and start fresh
Permission errors in var/ directory:
docker compose exec app chown -R www-data:www-data var/
Proprietary
This project was developed under human direction with AI assistance (Claude/Gemini) as a tool for code implementation, testing setup, and documentation. All architectural decisions and technical direction were made by the developer.
This is an assessment project. Refer to the project guidelines for contribution policies.