Commit 3dec678

feat: add EventSubscriptionCoordinator for dual-path architecture
Add new streaming package with coordinator pattern:
- EventSubscriptionCoordinator: Manages Pub/Sub subscription lifecycle
- EventCoordinatorManager: Coordinates multiple subscriptions
- EventHandler Protocol: Interface for event processing

Features:
- Bridge sync Pub/Sub callbacks to async handlers
- Graceful start/stop with queue draining
- Prometheus metrics (reusing queue metrics with coordinator labels)
- Support for dual-path: real-time + batched processing

Example usage:
```python
# Real-time processing
realtime = EventSubscriptionCoordinator(
    subscription="events-realtime",
    handler=process_immediately,
)

# Batched archival
archive = EventSubscriptionCoordinator(
    subscription="events-archive",
    handler=event_loader.add,
)

manager = EventCoordinatorManager([realtime, archive])
await manager.start_all()
```

Tests:
- 4 new unit tests (all passing)
- All 242 tests passing
- Fixed missing pytest-asyncio dependency

Documentation:
- Full docstrings with examples
- README: Added dual-path architecture section
- README: Removed all Firestore references
- README: Updated roadmap with completed features
1 parent 1330c2b commit 3dec678
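The "bridge sync Pub/Sub callbacks to async handlers" feature is not itself shown in this diff. As a rough illustration only, the standard stdlib way to hand a message from a subscriber thread to an asyncio handler is `asyncio.run_coroutine_threadsafe`; everything below (`CallbackBridge`, `on_message`, the handler signature) is hypothetical and not taken from the eventkit source:

```python
import asyncio
import threading
from concurrent.futures import Future
from typing import Awaitable, Callable

class CallbackBridge:
    """Hypothetical sketch: Pub/Sub delivers messages on its own gRPC threads,
    so a sync callback must hand each message to the asyncio loop that runs
    the async handler."""

    def __init__(self, handler: Callable[[bytes], Awaitable[None]],
                 loop: asyncio.AbstractEventLoop) -> None:
        self._handler = handler
        self._loop = loop

    def on_message(self, data: bytes) -> Future:
        # Called from a non-asyncio thread (e.g., a subscriber thread).
        # Schedules the async handler on the coordinator's event loop.
        return asyncio.run_coroutine_threadsafe(self._handler(data), self._loop)

async def main() -> list[bytes]:
    received: list[bytes] = []

    async def handler(data: bytes) -> None:
        received.append(data)

    bridge = CallbackBridge(handler, asyncio.get_running_loop())

    # Simulate the subscriber thread invoking the sync callback.
    t = threading.Thread(target=bridge.on_message, args=(b'{"event": "identify"}',))
    t.start()
    t.join()
    await asyncio.sleep(0.05)  # let the scheduled coroutine run
    return received

print(asyncio.run(main()))  # [b'{"event": "identify"}']
```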

7 files changed: 493 additions, 25 deletions

README.md: 52 additions, 25 deletions
````diff
@@ -11,7 +11,7 @@ Event ingestion and processing primitives for Python.
 - **Flexible ingestion** - Accept any JSON payload with Segment-compatible API
 - **Stream-based routing** - Separate processing pipelines by event type for isolation and scalability
 - **Adapter pattern** - Pluggable validators for multiple event formats and sources
-- **Pluggable storage** - Write to GCS + BigQuery (default), Firestore, or implement custom backends
+- **Pluggable storage** - Write to GCS + BigQuery (default) or implement custom backends
 - **Error handling** - Built-in dead letter queue for validation failures and retries
 - **Type-safe** - Full Pydantic v2 validation with strict typing throughout
 - **Async-first** - Built on FastAPI with async/await for high throughput
@@ -46,7 +46,6 @@ settings = Settings(
     gcp_gcs_bucket="your-events-bucket",
     gcp_bigquery_dataset="events",
     gcp_bigquery_table="raw_events",
-    eventkit_event_store="gcs"  # or "firestore" for Firestore mode
 )

 # Add eventkit routes
@@ -84,7 +83,7 @@ curl -X POST http://localhost:8000/api/v1/identify \
 └─────────────────────────────────────────────────────────┘

 ┌─────────────────────────────────────────────────────────┐
-│ Processing Pipeline
+│ Processing Pipeline                                     │
 │ • Adapters - Validate & normalize to typed events       │
 │ • Validators - Composable field checks                  │
 │ • Sequencer - Hash-based routing for consistency        │
@@ -94,9 +93,8 @@ curl -X POST http://localhost:8000/api/v1/identify \
 ┌─────────────────────────────────────────────────────────┐
 │ Storage Layer (Pluggable)                               │
 │ • GCS + BigQuery - Production data warehouse (default)  │
-│ • Firestore - Managed, serverless (dev/testing)         │
 │ • Custom - Implement EventStore protocol                │
-
+│                                                         │
 │ Warehouse Loader (Background Service)                   │
 │ • BigQueryLoader - Batch load GCS → BigQuery            │
 │ • Custom - Implement WarehouseLoader protocol           │
@@ -115,6 +113,43 @@ curl -X POST http://localhost:8000/api/v1/identify \
 | **Event Store** | Persist events to storage | Interface for multiple backends |
 | **Error Store** | Dead letter queue for failures | Never lose data, debug later |
 | **Warehouse Loader** | Load events to data warehouse | Background service for batch loading |
+| **EventSubscriptionCoordinator** | Pub/Sub subscription management | Enables dual-path architecture |
+
+---
+
+### Dual-Path Architecture (Advanced)
+
+For applications requiring both real-time processing and cost-optimized storage, `eventkit` supports consuming the same event stream through multiple coordinators:
+
+```python
+from eventkit.streaming import EventSubscriptionCoordinator, EventCoordinatorManager
+
+# Path 1: Real-time processing (<1s latency)
+realtime_coordinator = EventSubscriptionCoordinator(
+    project_id="my-project",
+    subscription="events-realtime",
+    handler=process_immediately,  # e.g., profile updates
+)
+
+# Path 2: Batched archival (5-10 min latency, cost-optimized)
+archive_coordinator = EventSubscriptionCoordinator(
+    project_id="my-project",
+    subscription="events-archive",
+    handler=event_loader.add,  # Batches to GCS
+)
+
+# Manage both
+manager = EventCoordinatorManager([
+    realtime_coordinator,
+    archive_coordinator,
+])
+
+await manager.start_all()
+```
+
+**Use cases**:
+- Real-time: Profile updates, personalization, live dashboards
+- Archive: Analytics, compliance, data warehousing

 ## Design Philosophy

@@ -305,17 +340,6 @@ python -m scripts.run_bigquery_loader

 See `scripts/bigquery/README.md` and `specs/gcs-bigquery-storage/` for full details.

-### Firestore (Development/Testing)
-
-Managed, serverless NoSQL database. Good for development and moderate throughput.
-
-```python
-settings = Settings(
-    gcp_project_id="my-project",
-    eventkit_event_store="firestore",
-)
-```
-
 ### Custom Storage

 Implement the `EventStore` protocol for any backend:
@@ -419,15 +443,15 @@ See [LOCAL_DEV.md](LOCAL_DEV.md) for detailed local development instructions.

 **Quick Start:**
 ```bash
-# Start Firestore emulator
+# Start GCS emulator (for local development)
 docker-compose up -d

 # Install dependencies
 uv sync

 # Run API server
-export FIRESTORE_EMULATOR_HOST="localhost:8080"
 export GCP_PROJECT_ID="test-project"
+export GCP_GCS_BUCKET="test-events"
 uv run uvicorn eventkit.api.app:app --reload

 # Run tests
@@ -454,19 +478,22 @@ uv run ruff format src/
 ### Core (v0.x)
 - [x] Composable validators (required fields, types, timestamps)
 - [x] Segment-compatible adapter with ValidationPipeline
-- [ ] Collection API with stream routing
-- [ ] Hash-based sequencer for consistent ordering
-- [ ] Firestore storage backend (in progress)
-- [ ] Error handling and dead letter queue
-- [ ] Structured logging
+- [x] Collection API with stream routing
+- [x] GCS + BigQuery storage backend
+- [x] Ring buffer with Write-Ahead Log
+- [x] Pub/Sub queue integration
+- [x] Event batching and loading
+- [x] Prometheus metrics
+- [x] EventSubscriptionCoordinator (dual-path architecture)
+- [x] Hash-based sequencer for consistent ordering
+- [ ] Error handling and dead letter queue (ErrorStore protocol exists, needs implementation)
 - [ ] Performance benchmarks (10k+ events/sec)

 ### v1.0
-- [ ] ClickHouse storage backend
-- [ ] Pub/Sub integration for async processing
 - [ ] OpenAPI spec and generated clients
 - [ ] Comprehensive examples and documentation
 - [ ] Production deployment guides (Cloud Run, GKE, ECS)
+- [ ] S3 + Snowflake/Redshift storage adapters

 ### Future Ecosystem

````

src/eventkit/streaming/__init__.py: 10 additions, 0 deletions
```diff
@@ -0,0 +1,10 @@
+"""Event streaming coordination for Pub/Sub subscriptions."""
+
+from eventkit.streaming.coordinator import EventHandler, EventSubscriptionCoordinator
+from eventkit.streaming.manager import EventCoordinatorManager
+
+__all__ = [
+    "EventHandler",
+    "EventSubscriptionCoordinator",
+    "EventCoordinatorManager",
+]
```
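The package exports `EventCoordinatorManager` to coordinate several coordinators at once, though its internals are not shown in this commit. As a design sketch only, with assumed behavior (concurrent start, reverse-order stop) and toy stand-in classes rather than the real API:

```python
import asyncio
from typing import Iterable

class ToyCoordinator:
    """Stand-in for EventSubscriptionCoordinator with a start/stop lifecycle."""

    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self.log = log

    async def start(self) -> None:
        self.log.append(f"start:{self.name}")

    async def stop(self) -> None:
        self.log.append(f"stop:{self.name}")

class Manager:
    """Starts all coordinators concurrently; stops them in reverse order."""

    def __init__(self, coordinators: Iterable[ToyCoordinator]) -> None:
        self._coordinators = list(coordinators)

    async def start_all(self) -> None:
        await asyncio.gather(*(c.start() for c in self._coordinators))

    async def stop_all(self) -> None:
        # Reverse order so downstream consumers drain before upstream ones.
        for c in reversed(self._coordinators):
            await c.stop()

async def demo() -> list:
    log: list = []
    manager = Manager([
        ToyCoordinator("events-realtime", log),
        ToyCoordinator("events-archive", log),
    ])
    await manager.start_all()
    await manager.stop_all()
    return log

print(asyncio.run(demo()))
```

Whether the real manager stops coordinators in reverse order is an assumption here; the commit only documents `start_all()`.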
