A Python application for ingesting Bluesky's Jetstream API data and writing it to DuckDB.
This project uses uv for Python package management. Install uv first:
# On macOS and Linux
curl -LsSf https://astral.sh/uv/install.sh | sh
# On Windows
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"
# Or with pip
pip install uv
- Clone the repository
- Install dependencies and create a virtual environment:
uv sync
This will automatically:
- Create a virtual environment in .venv/
- Install all project dependencies
- Install the package in editable mode
Run the ingestor with a DuckDB database file path:
uv run jetstream-ingest /path/to/jetstream.db
Optional arguments:
--batch-size: Number of messages to batch before writing (default: 1000)
Example:
uv run jetstream-ingest data/jetstream.db --batch-size 500
A Docker Compose setup is provided for easy development:
- Build and start the ingestor:
docker-compose up -d ingestor
- View logs:
docker-compose logs -f ingestor
- Query the database using Python and DuckDB:
docker-compose run --rm duckdb-cli
- Stop the services:
docker-compose down
The database file will be persisted in the ./data directory.
- Install with development dependencies:
uv sync --dev
- Run tests:
uv run pytest tests/
- Add new dependencies:
# Add a runtime dependency
uv add <package-name>
# Add a development dependency
uv add --dev <package-name>
The application writes to a DuckDB table named jetstream_messages with the following schema:
- receivedTimestamp: TIMESTAMP - when the message was received
- payload: VARCHAR - raw JSON message from Jetstream
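As a rough illustration of how one incoming message maps onto this schema, the sketch below builds a (receivedTimestamp, payload) pair from a raw JSON string using only the standard library. The helper name `to_row` and the sample message are hypothetical and are not taken from the project's source; the sample's fields are illustrative only.

```python
import json
from datetime import datetime, timezone


def to_row(raw_message: str) -> tuple[datetime, str]:
    """Pair a raw message with the time it was received.

    Hypothetical helper: the real ingestor may build rows differently.
    """
    # Check that the text parses as JSON, but keep the original string as-is,
    # since the payload column stores the raw message.
    json.loads(raw_message)
    received = datetime.now(timezone.utc)
    return received, raw_message


# Illustrative message only; real Jetstream events carry more fields.
sample = '{"did": "did:plc:example", "kind": "commit", "time_us": 1700000000000000}'
received_timestamp, payload = to_row(sample)
print(received_timestamp.isoformat(), json.loads(payload)["kind"])
```

Because the payload is stored as raw text, any structure inside it has to be parsed again at query time, which keeps ingestion simple at the cost of slightly more work when reading.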
You can query the database with the DuckDB CLI or any DuckDB client, for example from Python:
import duckdb
# Connect to the database
conn = duckdb.connect('data/jetstream.db', read_only=True)
# Query the data
result = conn.execute("""
    SELECT
        COUNT(*) AS total_messages,
        MIN(receivedTimestamp) AS first_message,
        MAX(receivedTimestamp) AS last_message
    FROM jetstream_messages
""").fetchall()
print(result)
conn.close()
Example queries:
-- Count total messages
SELECT COUNT(*) FROM jetstream_messages;
-- View recent messages
SELECT receivedTimestamp, payload
FROM jetstream_messages
ORDER BY receivedTimestamp DESC
LIMIT 10;
-- Export to Parquet (if needed)
COPY jetstream_messages TO 'output.parquet' (FORMAT PARQUET);
The application logs to stdout with INFO level by default. Monitor the logs for:
- Connection status
- Batch writing success/failure
- Error conditions
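For readers who want to reproduce this kind of output in their own tooling, here is a minimal sketch of stdout logging at INFO level with the standard `logging` module. The logger name `jetstream_ingest`, the format string, and the example messages are assumptions for illustration; the application's actual log lines may look different.

```python
import logging
import sys


def configure_logging(level: int = logging.INFO) -> logging.Logger:
    """Send log records to stdout at the given level (INFO by default)."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    logger = logging.getLogger("jetstream_ingest")  # hypothetical logger name
    logger.setLevel(level)
    logger.handlers = [handler]  # replace handlers left by a previous call
    logger.propagate = False  # avoid duplicate lines via the root logger
    return logger


log = configure_logging()
log.info("connected to Jetstream")  # connection status
log.info("wrote batch of %d messages", 1000)  # batch write success
log.error("batch write failed: %s", "example error")  # error condition
```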
The application implements:
- Automatic reconnection on connection loss
- Graceful shutdown on SIGTERM/SIGINT
- Batch retry logic for failed writes
- Proper DuckDB connection cleanup on shutdown
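The batching and retry behavior listed above can be sketched roughly as follows. This is not the project's implementation: the class `BatchBuffer`, its parameters, and the `write_batch` callback (which stands in for the actual DuckDB insert) are all hypothetical, and the exponential backoff is one possible retry policy among several.

```python
import time
from typing import Callable, List


class BatchBuffer:
    """Collect messages and hand them to `write_batch` in fixed-size batches.

    Hypothetical sketch: `write_batch` stands in for whatever performs the
    database insert in the real application.
    """

    def __init__(
        self,
        write_batch: Callable[[List[str]], None],
        batch_size: int = 1000,
        max_attempts: int = 3,
        base_delay: float = 0.5,
    ) -> None:
        self.write_batch = write_batch
        self.batch_size = batch_size
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.pending: List[str] = []

    def add(self, message: str) -> None:
        """Buffer one message and flush once the batch is full."""
        self.pending.append(message)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        """Write buffered messages, retrying with exponential backoff."""
        if not self.pending:
            return
        for attempt in range(1, self.max_attempts + 1):
            try:
                self.write_batch(self.pending)
                self.pending = []
                return
            except Exception:
                if attempt == self.max_attempts:
                    raise  # the batch stays in `pending` for the caller
                time.sleep(self.base_delay * 2 ** (attempt - 1))


# Usage: a writer that fails once, then succeeds.
written: List[List[str]] = []
calls = {"count": 0}


def flaky_writer(batch: List[str]) -> None:
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("simulated write failure")
    written.append(list(batch))


buffer = BatchBuffer(flaky_writer, batch_size=2, base_delay=0.01)
for msg in ["a", "b", "c"]:
    buffer.add(msg)
buffer.flush()  # e.g. called during shutdown to drain the remainder
print(written)
```

Calling `flush()` one last time from a SIGTERM/SIGINT handler is one way to make sure a partially filled batch is written before the process exits.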
MIT License