2 changes: 1 addition & 1 deletion .github/actions/e2e-boot/action.yml
@@ -28,7 +28,7 @@ runs:
IMAGE_TAG='"$IMAGE_TAG"' docker compose pull --quiet 2>&1
echo "--- pull done, starting infra ---"
docker compose up -d --no-build \
mongo redis shared-ca zookeeper-certgen zookeeper kafka schema-registry 2>&1
mongo redis shared-ca zookeeper-certgen zookeeper kafka 2>&1
echo $? > /tmp/infra-pull.exit
' > /tmp/infra-pull.log 2>&1 &
echo $! > /tmp/infra-pull.pid
3 changes: 1 addition & 2 deletions .github/workflows/stack-tests.yml
@@ -30,7 +30,6 @@ env:
REDIS_IMAGE: redis:7-alpine
KAFKA_IMAGE: confluentinc/cp-kafka:7.8.2
ZOOKEEPER_IMAGE: confluentinc/cp-zookeeper:7.8.2
SCHEMA_REGISTRY_IMAGE: confluentinc/cp-schema-registry:7.8.2
K3S_VERSION: v1.32.11+k3s1
K3S_INSTALL_SHA256: d75e014f2d2ab5d30a318efa5c326f3b0b7596f194afcff90fa7a7a91166d5f7

@@ -314,7 +313,7 @@ jobs:
run: |
mkdir -p logs
docker compose logs --timestamps > logs/docker-compose.log 2>&1
for svc in backend mongo redis kafka zookeeper schema-registry \
for svc in backend mongo redis kafka zookeeper \
coordinator k8s-worker pod-monitor result-processor \
saga-orchestrator event-replay dlq-processor; do
docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true
2 changes: 1 addition & 1 deletion deploy.sh
@@ -193,7 +193,7 @@ cmd_infra() {

# Start only infrastructure services (no app, no workers, no observability)
# zookeeper-certgen is needed for kafka to start
docker compose up -d zookeeper-certgen mongo redis zookeeper kafka schema-registry $WAIT_FLAG $WAIT_TIMEOUT_FLAG
docker compose up -d zookeeper-certgen mongo redis zookeeper kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG

print_success "Infrastructure services started"
docker compose ps
28 changes: 0 additions & 28 deletions docker-compose.yaml
@@ -100,8 +100,6 @@ services:
condition: service_healthy
kafka:
condition: service_healthy
schema-registry:
condition: service_healthy
volumes:
- ./backend/app:/app/app
- ./backend/workers:/app/workers
@@ -327,40 +325,17 @@ services:
retries: 15
start_period: 3s

schema-registry:
image: confluentinc/cp-schema-registry:7.8.2
container_name: schema-registry
depends_on:
kafka:
condition: service_healthy
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
networks:
- app-network
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8081/config"]
interval: 3s
timeout: 5s
retries: 15
start_period: 5s

kafdrop:
image: obsidiandynamics/kafdrop:3.31.0
container_name: kafdrop
profiles: ["debug"]
depends_on:
- kafka
- schema-registry
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: kafka:29092
JVM_OPTS: "-Xms256M -Xmx512M"
SCHEMAREGISTRY_CONNECT: http://schema-registry:8081
networks:
- app-network

@@ -378,11 +353,8 @@ services:
condition: service_completed_successfully
kafka:
condition: service_healthy
schema-registry:
condition: service_healthy
environment:
- KAFKA_BOOTSTRAP_SERVERS=kafka:29092
- SCHEMA_REGISTRY_URL=http://schema-registry:8081
volumes:
- ./backend/config.toml:/app/config.toml:ro
- ./backend/secrets.toml:/app/secrets.toml:ro
27 changes: 12 additions & 15 deletions docs/architecture/event-system-design.md
@@ -4,7 +4,7 @@ This document explains how events flow through the system and how domain events

## The unified event model

Events in Integr8sCode use a unified design where domain events are directly Avro-serializable:
Events in Integr8sCode use a unified design where domain events are plain Pydantic models serialized as JSON:

```mermaid
graph LR
@@ -13,7 +13,7 @@ graph LR
end

subgraph "Domain Layer"
DE[Domain Events<br/>typed.py<br/>extends AvroBase]
DE[Domain Events<br/>typed.py<br/>extends BaseModel]
end

subgraph "Infrastructure"
@@ -26,7 +26,7 @@ graph LR
DE --> MongoDB[(MongoDB)]
```

The `EventType` enum defines all possible event types as strings. Domain events are Pydantic models that extend `AvroBase` (from `pydantic-avro`), making them both usable for MongoDB storage and Avro-serializable for Kafka. The mappings module routes events to the correct Kafka topics.
The `EventType` enum defines all possible event types as strings. Domain events are Pydantic `BaseModel` subclasses, making them usable for both MongoDB storage and Kafka transport. FastStream serializes them to JSON when publishing and deserializes incoming messages when consuming. The mappings module routes events to the correct Kafka topics.

This design eliminates duplication between "domain events" and "Kafka events" by making the domain event the single source of truth.
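
To make the shape concrete, here is a minimal sketch; the event class, field names, and topic name are illustrative stand-ins rather than the project's actual definitions:

```python
from datetime import datetime, timezone

from pydantic import BaseModel, Field


class ExecutionRequestedEvent(BaseModel):
    """Illustrative domain event: a plain Pydantic model, no Avro base class."""
    event_type: str = "execution.requested"
    execution_id: str
    script: str
    stored_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


# Stand-in for the EVENT_TYPE_TO_TOPIC mapping in infrastructure/kafka/mappings.py
EVENT_TYPE_TO_TOPIC = {"execution.requested": "execution-events"}

event = ExecutionRequestedEvent(execution_id="abc123", script="print('hi')")
topic = EVENT_TYPE_TO_TOPIC[event.event_type]  # route by event type
payload = event.model_dump_json()              # plain JSON on the wire, no schema registry
```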

@@ -42,7 +42,7 @@ Earlier designs maintained separate domain and Kafka event classes, arguing that
The unified approach addresses these issues:

- **Single definition**: Each event is defined once in `domain/events/typed.py`
- **Avro-compatible**: `BaseEvent` extends `AvroBase`, enabling automatic schema generation
- **JSON-native**: `BaseEvent` extends Pydantic `BaseModel`; FastStream serializes to JSON automatically
- **Storage-ready**: Events include storage fields (`stored_at`, `ttl_expires_at`) that MongoDB uses
- **Topic routing**: The `EVENT_TYPE_TO_TOPIC` mapping in `infrastructure/kafka/mappings.py` handles routing

@@ -92,12 +92,12 @@ sequenceDiagram

This approach is more performant than trying each union member until one validates. The discriminator tells Pydantic exactly which class to use.
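
A small sketch of the discriminated-union idea, with made-up event classes standing in for the real ones:

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class ExecutionRequested(BaseModel):
    event_type: Literal["execution.requested"]
    execution_id: str


class ExecutionCompleted(BaseModel):
    event_type: Literal["execution.completed"]
    execution_id: str
    exit_code: int


# The discriminator tells Pydantic which class to instantiate without trial validation.
DomainEvent = Annotated[
    Union[ExecutionRequested, ExecutionCompleted],
    Field(discriminator="event_type"),
]

adapter = TypeAdapter(DomainEvent)
event = adapter.validate_json(
    '{"event_type": "execution.completed", "execution_id": "e1", "exit_code": 0}'
)
assert isinstance(event, ExecutionCompleted)
```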

## BaseEvent and AvroBase
## BaseEvent

The `BaseEvent` class provides common fields for all events and inherits from `AvroBase` for Avro schema generation:
The `BaseEvent` class provides common fields for all events:

```python
class BaseEvent(AvroBase):
class BaseEvent(BaseModel):
"""Base fields for all domain events."""
model_config = ConfigDict(from_attributes=True)

@@ -111,10 +111,7 @@ class BaseEvent(AvroBase):
ttl_expires_at: datetime = Field(default_factory=...)
```

The `AvroBase` inheritance enables:
- Automatic Avro schema generation via `BaseEvent.avro_schema()`
- Serialization through the Schema Registry
- Forward compatibility checking
Since `BaseEvent` is a plain Pydantic model, FastStream handles serialization and deserialization transparently — publishing calls `model.model_dump_json()` under the hood, and subscribers receive typed model instances from the incoming JSON.
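
A hedged sketch of that round trip with FastStream's Kafka broker; the topic name, bootstrap address, and event fields are assumptions:

```python
from faststream.kafka import KafkaBroker
from pydantic import BaseModel

broker = KafkaBroker("kafka:29092")


class ExecutionRequested(BaseModel):
    execution_id: str
    script: str


@broker.subscriber("execution-events")
async def on_execution_requested(event: ExecutionRequested) -> None:
    # FastStream parses the incoming JSON into the annotated Pydantic model.
    print(event.execution_id)


async def request_execution() -> None:
    # Publishing a model instance; FastStream serializes it to JSON.
    await broker.publish(
        ExecutionRequested(execution_id="e1", script="print(1)"),
        topic="execution-events",
    )
```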

## Topic routing

@@ -180,9 +177,9 @@ graph TB
```

When publishing events, the `UnifiedProducer`:
1. Looks up the topic via `EVENT_TYPE_TO_TOPIC`
2. Serializes the event using the Schema Registry
3. Publishes to Kafka
1. Persists the event to MongoDB via `EventRepository`
2. Looks up the topic via `EVENT_TYPE_TO_TOPIC`
3. Publishes the Pydantic model to Kafka through `broker.publish()` (FastStream handles JSON serialization)

The producer handles both storage in MongoDB and publishing to Kafka in a single flow.
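
Roughly, that flow amounts to the following sketch; the repository and broker interfaces are hypothetical stand-ins, not the actual `UnifiedProducer` API:

```python
class ProducerSketch:
    """Persist-then-publish flow in miniature."""

    def __init__(self, repository, broker, topic_map):
        self.repository = repository  # MongoDB-backed event store
        self.broker = broker          # FastStream KafkaBroker
        self.topic_map = topic_map    # EVENT_TYPE_TO_TOPIC-style mapping

    async def produce(self, event) -> None:
        await self.repository.insert(event)            # 1. persist the event document
        topic = self.topic_map[event.event_type]       # 2. route by event type
        await self.broker.publish(event, topic=topic)  # 3. publish as JSON
```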

@@ -193,7 +190,7 @@ The producer handles both storage in MongoDB and publishing to Kafka in a single
| [`domain/enums/events.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/enums/events.py) | `EventType` enum with all event type values |
| [`domain/events/typed.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/events/typed.py) | All domain event classes and `DomainEvent` union |
| [`infrastructure/kafka/mappings.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/infrastructure/kafka/mappings.py) | Event-to-topic routing and helper functions |
| [`services/kafka_event_service.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/kafka_event_service.py) | Publishes events to both MongoDB and Kafka |
| [`events/core/producer.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/core/producer.py) | UnifiedProducer — persists to MongoDB then publishes to Kafka |
| [`tests/unit/domain/events/test_event_schema_coverage.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/tests/unit/domain/events/test_event_schema_coverage.py) | Validates correspondence between enum and event classes |

## Related docs
8 changes: 4 additions & 4 deletions docs/architecture/kafka-topic-architecture.md
@@ -147,9 +147,9 @@ Admins can:

Key files:

- `domain/events/typed.py` — all Pydantic event models (extends `AvroBase` for Avro serialization)
- `domain/events/typed.py` — all Pydantic event models (plain `BaseModel` subclasses)
- `infrastructure/kafka/mappings.py` — event-to-topic routing and helper functions
- `events/schema/schema_registry.py` — schema manager
- `events/core/{producer,consumer,dispatcher}.py` — unified Kafka plumbing
- `events/core/producer.py` — UnifiedProducer (persists to MongoDB, publishes to Kafka)
- `events/handlers.py` — FastStream subscriber registrations for all workers

All events are Pydantic models with *strict typing* that extend `AvroBase` for Avro schema generation. The mappings module routes each event type to its destination topic via `EVENT_TYPE_TO_TOPIC`. Schema Registry integration ensures producers and consumers agree on structure, catching incompatible changes *before* runtime failures. The unified producer and consumer classes handle serialization, error handling, and observability.
All events are Pydantic models with strict typing. FastStream handles JSON serialization natively — the producer publishes Pydantic instances directly via `broker.publish()`, and subscribers receive typed model instances. The mappings module routes each event type to its destination topic via `EVENT_TYPE_TO_TOPIC`. Pydantic validation on both ends ensures structural agreement between producers and consumers.
8 changes: 3 additions & 5 deletions docs/architecture/overview.md
@@ -17,7 +17,7 @@ For details on specific components, see:
![System diagram](/assets/images/system_diagram.png)

The SPA hits the frontend, which proxies to the API over HTTPS; the API
serves both REST and SSE. Kafka carries events with Schema Registry and Zookeeper backing it; kafka-
serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) with Zookeeper backing it; kafka-
init seeds topics. All workers are separate containers subscribed to Kafka; the k8s-worker talks to the
Kubernetes API to run code, the pod-monitor watches pods, the result-processor writes results to Mongo
and nudges Redis for SSE fanout, and the saga-orchestrator coordinates long flows with Mongo and Redis.
@@ -44,7 +44,6 @@ graph LR
Mongo[(MongoDB)]
Redis[(Redis)]
Kafka[Kafka]
Schema["Schema Registry"]
K8s["Kubernetes API"]
OTel["OTel Collector"]
VM["VictoriaMetrics"]
@@ -63,7 +62,6 @@ graph LR
Repos --> Mongo
Services <-->|"keys + SSE bus"| Redis
Events <-->|"produce/consume"| Kafka
Events ---|"subjects/IDs"| Schema
Services -->|"read limits"| K8s

%% Telemetry edges
@@ -76,11 +74,11 @@ graph LR
- **Routers**: REST + SSE endpoints
- **DI (Dishka)**: Dependency injection & providers
- **Services**: Execution, Events, SSE, Idempotency, Notifications, User Settings, Rate Limit, Saved Scripts, Replay, Saga API
- **Kafka Layer**: Producer, Consumer, Dispatcher, EventStore, SchemaRegistryManager
- **Kafka Layer**: UnifiedProducer, FastStream subscribers, EventStore

FastAPI under Uvicorn exposes REST and SSE routes, with middleware and DI wiring the core services.
Those services use Mongo-backed repositories for state and a unified Kafka layer to publish and react
to events, with the schema registry ensuring compatibility. Redis handles rate limiting and SSE fanout.
to events. FastStream handles Pydantic JSON serialization for all Kafka messages. Redis handles rate limiting and SSE fanout.
Telemetry flows through the OpenTelemetry Collector—metrics to VictoriaMetrics for Grafana and traces
to Jaeger. Kubernetes interactions are read via the API. This view focuses on the app’s building blocks;
event workers live in the system diagram.
2 changes: 1 addition & 1 deletion docs/architecture/user-settings-events.md
@@ -13,7 +13,7 @@ notifications, or editor settings. This eliminates branching in both publishing
```

The `changed_fields` list identifies which settings changed. Typed fields (`theme`, `notifications`, `editor`, etc.)
contain the new values in Avro-compatible form.
contain the new values as Pydantic model fields.
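
A hypothetical shape matching that description (field names are assumed, not copied from the code):

```python
from pydantic import BaseModel


class Theme(BaseModel):
    mode: str  # e.g. "dark"


class UserSettingsChangedEvent(BaseModel):
    user_id: str
    changed_fields: list[str]   # e.g. ["theme"]
    theme: Theme | None = None  # populated only when "theme" is in changed_fields
```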

## TypeAdapter pattern

23 changes: 6 additions & 17 deletions docs/components/schema-manager.md
@@ -1,6 +1,8 @@
# Schema management

The backend manages two kinds of schemas: MongoDB collections with indexes and validators, and Kafka event schemas in Avro format with a Confluent Schema Registry. Both are initialized at process start, whether the process is the main API or a standalone worker.
The backend manages MongoDB collection schemas — indexes, validators, and TTL policies. These are initialized at process start, whether the process is the main API or a standalone worker.

Kafka event serialization is handled entirely by FastStream with Pydantic JSON; there is no schema registry involved. See [Event System Design](../architecture/event-system-design.md) for details on event serialization.

## MongoDB schema

@@ -14,32 +16,19 @@ Other migrations create indexes for user settings snapshots, replay sessions, no

Repositories don't create their own indexes — they only read and write. This separation keeps startup behavior predictable and prevents the same index being created from multiple code paths.
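
As a sketch, an additive migration in this style might look like the following; the collection names and version marker are illustrative:

```python
from motor.motor_asyncio import AsyncIOMotorDatabase


async def migration_0007_execution_ttl(db: AsyncIOMotorDatabase) -> None:
    # Skip if this migration was already recorded in schema_versions.
    if await db["schema_versions"].find_one({"_id": "0007_execution_ttl"}):
        return
    # Additive change: a TTL index that expires documents at ttl_expires_at.
    await db["executions"].create_index("ttl_expires_at", expireAfterSeconds=0)
    await db["schema_versions"].insert_one({"_id": "0007_execution_ttl"})
```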

## Kafka schema registry

The `SchemaRegistryManager` class in `app/events/schema/schema_registry.py` handles Avro serialization for Kafka events. All registry operations are async and must be awaited. The manager connects to a Confluent Schema Registry and registers schemas for all event types at startup via `await initialize_schemas()`.

All event classes in `domain/events/typed.py` extend `AvroBase` (from `pydantic-avro`), enabling automatic Avro schema generation. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.

Serialization and deserialization are async — `await serialize_event(event)` and `await deserialize_event(data, topic)` must be awaited. The wire format follows Confluent conventions: a magic byte, four-byte schema id, then the Avro binary payload. The underlying `python-schema-registry-client` library handles schema registration caching internally. The manager maintains a bidirectional cache between schema ids and Python event classes for deserialization. When deserializing, it reads the schema id from the message header, looks up the corresponding event class, deserializes the Avro payload to a dict, and hydrates the Pydantic model.

For test isolation, the manager supports an optional `SCHEMA_SUBJECT_PREFIX` environment variable. Setting this to something like `test.session123.` prefixes all subject names, preventing test runs from polluting production schemas or interfering with each other.

## Startup sequence

During API startup, the `lifespan` function in `dishka_lifespan.py` gets the database from the DI container, creates a `SchemaManager`, and calls `await apply_all()`. It does the same for `SchemaRegistryManager`, calling `await initialize_schemas()` to register all event types (async — must be awaited). Workers like the saga orchestrator and event replay service follow the same pattern — they connect to MongoDB, run schema migrations, and await schema registry initialization before starting their main loops.
During API startup, the `lifespan` function in `dishka_lifespan.py` initializes Beanie with the MongoDB client, then resolves the `KafkaBroker` from DI, registers FastStream subscribers, sets up Dishka integration, and starts the broker. Workers follow the same pattern — they connect to MongoDB, initialize Beanie, register their subscribers on the broker, and start consuming.
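
A condensed sketch of that startup order, with approximate names and connection strings:

```python
from contextlib import asynccontextmanager

from beanie import init_beanie
from fastapi import FastAPI
from faststream.kafka import KafkaBroker
from motor.motor_asyncio import AsyncIOMotorClient


@asynccontextmanager
async def lifespan(app: FastAPI):
    client = AsyncIOMotorClient("mongodb://mongo:27017")
    await init_beanie(database=client["integr8scode"], document_models=[])  # register ODM models here
    broker = KafkaBroker("kafka:29092")
    # ...register FastStream subscribers and Dishka providers against the broker/app...
    await broker.start()
    try:
        yield
    finally:
        await broker.close()
        client.close()
```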

## Local development

To force a specific MongoDB migration to run again, delete its document from `schema_versions`. To start fresh, point the app at a new database. Migrations are designed to be additive; the system doesn't support automatic rollbacks. If you need to undo a migration in production, you'll have to drop indexes or modify validators manually.

For Kafka schemas, the registry keeps all versions. If you break compatibility and need to start over, delete the subject from the registry (either via REST API or the registry's UI if available) and let the app re-register on next startup.

## Key files

| File | Purpose |
|--------------------------------------------------------------------------------------------------------------------------------|----------------------------|
| [`schema_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/db/schema/schema_manager.py) | MongoDB migrations |
| [`schema_registry.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/schema/schema_registry.py) | Kafka Avro serialization |
| [`typed.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/events/typed.py) | Domain events (extend AvroBase) |
| [`typed.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/events/typed.py) | Domain events (Pydantic BaseModel) |
| [`mappings.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/infrastructure/kafka/mappings.py) | Event-to-topic routing |
| [`dishka_lifespan.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/dishka_lifespan.py) | Startup initialization |
| [`dishka_lifespan.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/core/dishka_lifespan.py) | Startup initialization |
2 changes: 1 addition & 1 deletion docs/getting-started.md
@@ -4,7 +4,7 @@ This guide walks you through running Integr8sCode locally and executing your fir

## What you're deploying

The full stack includes a Svelte frontend, FastAPI backend, MongoDB, Redis, Kafka with Schema Registry, and seven background workers. Sounds like a lot, but `docker compose` handles all of it. First startup takes a few minutes to pull images and initialize services; subsequent starts are much faster.
The full stack includes a Svelte frontend, FastAPI backend, MongoDB, Redis, Kafka, and seven background workers. Sounds like a lot, but `docker compose` handles all of it. First startup takes a few minutes to pull images and initialize services; subsequent starts are much faster.

## Start the stack

2 changes: 1 addition & 1 deletion docs/operations/cicd.md
@@ -177,7 +177,7 @@ This action kicks off three slow tasks that can overlap:

1. **GHCR login** using `docker/login-action@v3`
2. **Background image pull + infra pre-warm** — pulls all compose images then starts infrastructure services
(mongo, redis, kafka, zookeeper, schema-registry) in a background `nohup` process. The exit status is persisted
(mongo, redis, kafka, zookeeper) in a background `nohup` process. The exit status is persisted
to `/tmp/infra-pull.exit` so the next action can check for failures.
3. **k3s install** — downloads and installs a pinned k3s version with SHA256 checksum verification (see
[supply-chain hardening](#supply-chain-hardening) below)