The `EventType` enum defines all possible event types as strings. Domain events are Pydantic `BaseModel` subclasses, usable for both MongoDB storage and Kafka transport. FastStream serializes them to JSON when publishing and deserializes incoming messages back into typed models when consuming. The mappings module routes events to the correct Kafka topics.
This design eliminates duplication between "domain events" and "Kafka events" by making the domain event the single source of truth.
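As a rough sketch of how these pieces fit together (the enum values, event fields, and topic names below are assumptions for illustration; the real definitions live in `domain/enums/events.py`, `domain/events/typed.py`, and `infrastructure/kafka/mappings.py`):

```python
from enum import Enum

from pydantic import BaseModel


class EventType(str, Enum):
    # Illustrative values only; the real enum covers all event types
    EXECUTION_REQUESTED = "execution_requested"
    EXECUTION_COMPLETED = "execution_completed"


class ExecutionRequestedEvent(BaseModel):
    # Field names here are assumptions, not the project's actual schema
    event_type: EventType = EventType.EXECUTION_REQUESTED
    execution_id: str
    script: str


# mappings.py routes each event type to its Kafka topic (topic names assumed)
EVENT_TYPE_TO_TOPIC: dict[EventType, str] = {
    EventType.EXECUTION_REQUESTED: "execution-requests",
    EventType.EXECUTION_COMPLETED: "execution-results",
}
```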
Earlier designs maintained separate domain and Kafka event classes, arguing that …
The unified approach addresses these issues:
- **Single definition**: Each event is defined once in `domain/events/typed.py`
Since `BaseEvent` is a plain Pydantic model, FastStream handles serialization and deserialization transparently — publishing calls `model.model_dump_json()` under the hood, and subscribers receive typed model instances from the incoming JSON.
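Because serialization is plain Pydantic JSON, the wire format is easy to reason about. Using the sketch above (field values are arbitrary), a round trip looks like this:

```python
evt = ExecutionRequestedEvent(execution_id="abc123", script="print('hi')")

payload = evt.model_dump_json()  # roughly what ends up on the Kafka topic
restored = ExecutionRequestedEvent.model_validate_json(payload)

assert restored == evt  # Pydantic models compare field-by-field
```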
## Topic routing
When publishing events, the `UnifiedProducer`:
1. Persists the event to MongoDB via `EventRepository`
2. Looks up the topic via `EVENT_TYPE_TO_TOPIC`
3. Publishes the Pydantic model to Kafka through `broker.publish()` (FastStream handles JSON serialization)
The producer handles both storage in MongoDB and publishing to Kafka in a single flow.
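A minimal sketch of that flow, assuming an `EventRepository` with an async `insert` method and an event that carries an `event_type` field (the real implementation in `events/core/producer.py` is richer):

```python
from pydantic import BaseModel
from faststream.kafka import KafkaBroker

# Import path assumed from the repository layout described above
from app.infrastructure.kafka.mappings import EVENT_TYPE_TO_TOPIC


class UnifiedProducer:
    """Sketch only; attribute and method names are assumptions."""

    def __init__(self, repository, broker: KafkaBroker) -> None:
        self._repository = repository  # EventRepository-like object
        self._broker = broker

    async def produce(self, event: BaseModel) -> None:
        await self._repository.insert(event)           # 1. persist to MongoDB
        topic = EVENT_TYPE_TO_TOPIC[event.event_type]   # 2. look up the topic
        await self._broker.publish(event, topic)        # 3. FastStream serializes to JSON
```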
| File | Description |
| --- | --- |
| [`domain/enums/events.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/enums/events.py) | `EventType` enum with all event type values |
| [`domain/events/typed.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/events/typed.py) | All domain event classes and `DomainEvent` union |
| [`infrastructure/kafka/mappings.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/infrastructure/kafka/mappings.py) | Event-to-topic routing and helper functions |
| [`events/core/producer.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/core/producer.py) | `UnifiedProducer`: persists to MongoDB, then publishes to Kafka |
| [`tests/unit/domain/events/test_event_schema_coverage.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/tests/unit/domain/events/test_event_schema_coverage.py) | Validates correspondence between enum and event classes |
- `events/core/producer.py`: UnifiedProducer (persists to MongoDB, publishes to Kafka)
- `events/handlers.py`: FastStream subscriber registrations for all workers
All events are Pydantic models with strict typing. FastStream handles JSON serialization natively — the producer publishes Pydantic instances directly via `broker.publish()`, and subscribers receive typed model instances. The mappings module routes each event type to its destination topic via `EVENT_TYPE_TO_TOPIC`. Pydantic validation on both ends ensures structural agreement between producers and consumers.
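On the consuming side, a subscriber declares the event model as its parameter type and FastStream validates the incoming JSON into it. A minimal sketch (broker address, topic name, handler, and field names are assumptions):

```python
from faststream.kafka import KafkaBroker

# Import path assumed from the repository layout described above
from app.domain.events.typed import ExecutionRequestedEvent

broker = KafkaBroker("kafka:9092")


@broker.subscriber("execution-requests")
async def on_execution_requested(event: ExecutionRequestedEvent) -> None:
    # The handler receives a validated Pydantic instance, not a raw dict
    print(event.execution_id)
```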
The backend manages MongoDB collection schemas — indexes, validators, and TTL policies. These are initialized at process start, whether the process is the main API or a standalone worker.
Kafka event serialization is handled entirely by FastStream with Pydantic JSON; there is no schema registry involved. See [Event System Design](../architecture/event-system-design.md) for details on event serialization.
## MongoDB schema
Other migrations create indexes for user settings snapshots, replay sessions, …
Repositories don't create their own indexes — they only read and write. This separation keeps startup behavior predictable and prevents the same index being created from multiple code paths.
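For illustration, a migration that creates indexes and records itself might look roughly like this; everything except the `schema_versions` collection name is an assumption:

```python
from motor.motor_asyncio import AsyncIOMotorDatabase


async def apply_executions_indexes(db: AsyncIOMotorDatabase) -> None:
    version = "002_executions_indexes"  # hypothetical migration id
    if await db["schema_versions"].find_one({"version": version}):
        return  # already applied; migrations are additive and idempotent

    await db["executions"].create_index([("user_id", 1), ("created_at", -1)])
    await db["executions"].create_index("expires_at", expireAfterSeconds=0)  # TTL policy
    await db["schema_versions"].insert_one({"version": version})
```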
## Startup sequence
During API startup, the `lifespan` function in `dishka_lifespan.py` initializes Beanie with the MongoDB client, then resolves the `KafkaBroker` from DI, registers FastStream subscribers, sets up Dishka integration, and starts the broker. Workers follow the same pattern: they connect to MongoDB, initialize Beanie, register their subscribers on the broker, and start consuming.
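A stripped-down sketch of that order, assuming FastAPI, Motor, Beanie, and a FastStream `KafkaBroker` (connection strings are placeholders; the Dishka wiring and the real document and subscriber registrations are omitted):

```python
from contextlib import asynccontextmanager

from beanie import init_beanie
from fastapi import FastAPI
from faststream.kafka import KafkaBroker
from motor.motor_asyncio import AsyncIOMotorClient

broker = KafkaBroker("kafka:9092")


@asynccontextmanager
async def lifespan(app: FastAPI):
    client = AsyncIOMotorClient("mongodb://mongo:27017")
    await init_beanie(database=client["integr8scode"], document_models=[])  # real models go here
    # ...register FastStream subscribers and Dishka providers here...
    await broker.start()
    try:
        yield
    finally:
        await broker.close()
        client.close()


app = FastAPI(lifespan=lifespan)
```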
## Local development
To force a specific MongoDB migration to run again, delete its document from `schema_versions`. To start fresh, point the app at a new database. Migrations are designed to be additive; the system doesn't support automatic rollbacks. If you need to undo a migration in production, you'll have to drop indexes or modify validators manually.
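For example, forcing one migration to run again from an async shell might look like this; the `version` field name and value are assumptions about the document shape:

```python
from motor.motor_asyncio import AsyncIOMotorClient


async def rerun_migration() -> None:
    db = AsyncIOMotorClient("mongodb://localhost:27017")["integr8scode"]  # db name assumed
    await db["schema_versions"].delete_one({"version": "002_executions_indexes"})
    # On the next startup the app detects the missing record and re-applies the migration.
```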
`docs/getting-started.md`
This guide walks you through running Integr8sCode locally and executing your first …
## What you're deploying
The full stack includes a Svelte frontend, FastAPI backend, MongoDB, Redis, Kafka, and seven background workers. Sounds like a lot, but `docker compose` handles all of it. First startup takes a few minutes to pull images and initialize services; subsequent starts are much faster.