diff --git a/.github/actions/e2e-boot/action.yml b/.github/actions/e2e-boot/action.yml
index 01850ea1..39d10df9 100644
--- a/.github/actions/e2e-boot/action.yml
+++ b/.github/actions/e2e-boot/action.yml
@@ -28,7 +28,7 @@ runs:
IMAGE_TAG='"$IMAGE_TAG"' docker compose pull --quiet 2>&1
echo "--- pull done, starting infra ---"
docker compose up -d --no-build \
- mongo redis shared-ca zookeeper-certgen zookeeper kafka schema-registry 2>&1
+ mongo redis shared-ca zookeeper-certgen zookeeper kafka 2>&1
echo $? > /tmp/infra-pull.exit
' > /tmp/infra-pull.log 2>&1 &
echo $! > /tmp/infra-pull.pid
diff --git a/.github/workflows/stack-tests.yml b/.github/workflows/stack-tests.yml
index 373565ab..ff5e9649 100644
--- a/.github/workflows/stack-tests.yml
+++ b/.github/workflows/stack-tests.yml
@@ -30,7 +30,6 @@ env:
REDIS_IMAGE: redis:7-alpine
KAFKA_IMAGE: confluentinc/cp-kafka:7.8.2
ZOOKEEPER_IMAGE: confluentinc/cp-zookeeper:7.8.2
- SCHEMA_REGISTRY_IMAGE: confluentinc/cp-schema-registry:7.8.2
K3S_VERSION: v1.32.11+k3s1
K3S_INSTALL_SHA256: d75e014f2d2ab5d30a318efa5c326f3b0b7596f194afcff90fa7a7a91166d5f7
@@ -314,7 +313,7 @@ jobs:
run: |
mkdir -p logs
docker compose logs --timestamps > logs/docker-compose.log 2>&1
- for svc in backend mongo redis kafka zookeeper schema-registry \
+ for svc in backend mongo redis kafka zookeeper \
coordinator k8s-worker pod-monitor result-processor \
saga-orchestrator event-replay dlq-processor; do
docker compose logs --timestamps "$svc" > "logs/$svc.log" 2>&1 || true
diff --git a/deploy.sh b/deploy.sh
index a7dc8bec..8f20d0cb 100755
--- a/deploy.sh
+++ b/deploy.sh
@@ -193,7 +193,7 @@ cmd_infra() {
# Start only infrastructure services (no app, no workers, no observability)
# zookeeper-certgen is needed for kafka to start
- docker compose up -d zookeeper-certgen mongo redis zookeeper kafka schema-registry $WAIT_FLAG $WAIT_TIMEOUT_FLAG
+ docker compose up -d zookeeper-certgen mongo redis zookeeper kafka $WAIT_FLAG $WAIT_TIMEOUT_FLAG
print_success "Infrastructure services started"
docker compose ps
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 80c215c8..78ca0763 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -100,8 +100,6 @@ services:
condition: service_healthy
kafka:
condition: service_healthy
- schema-registry:
- condition: service_healthy
volumes:
- ./backend/app:/app/app
- ./backend/workers:/app/workers
@@ -327,40 +325,17 @@ services:
retries: 15
start_period: 3s
- schema-registry:
- image: confluentinc/cp-schema-registry:7.8.2
- container_name: schema-registry
- depends_on:
- kafka:
- condition: service_healthy
- ports:
- - "8081:8081"
- environment:
- SCHEMA_REGISTRY_HOST_NAME: schema-registry
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
- SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
- networks:
- - app-network
- healthcheck:
- test: ["CMD", "curl", "-f", "http://localhost:8081/config"]
- interval: 3s
- timeout: 5s
- retries: 15
- start_period: 5s
-
kafdrop:
image: obsidiandynamics/kafdrop:3.31.0
container_name: kafdrop
profiles: ["debug"]
depends_on:
- kafka
- - schema-registry
ports:
- "9000:9000"
environment:
KAFKA_BROKERCONNECT: kafka:29092
JVM_OPTS: "-Xms256M -Xmx512M"
- SCHEMAREGISTRY_CONNECT: http://schema-registry:8081
networks:
- app-network
@@ -378,11 +353,8 @@ services:
condition: service_completed_successfully
kafka:
condition: service_healthy
- schema-registry:
- condition: service_healthy
environment:
- KAFKA_BOOTSTRAP_SERVERS=kafka:29092
- - SCHEMA_REGISTRY_URL=http://schema-registry:8081
volumes:
- ./backend/config.toml:/app/config.toml:ro
- ./backend/secrets.toml:/app/secrets.toml:ro
diff --git a/docs/architecture/event-system-design.md b/docs/architecture/event-system-design.md
index 5e1c2787..ab0d2c8c 100644
--- a/docs/architecture/event-system-design.md
+++ b/docs/architecture/event-system-design.md
@@ -4,7 +4,7 @@ This document explains how events flow through the system and how domain events
## The unified event model
-Events in Integr8sCode use a unified design where domain events are directly Avro-serializable:
+Events in Integr8sCode use a unified design where domain events are plain Pydantic models serialized as JSON:
```mermaid
graph LR
@@ -13,7 +13,7 @@ graph LR
end
subgraph "Domain Layer"
- DE[Domain Events<br/>typed.py<br/>extends AvroBase]
+ DE[Domain Events<br/>typed.py<br/>extends BaseModel]
end
subgraph "Infrastructure"
@@ -26,7 +26,7 @@ graph LR
DE --> MongoDB[(MongoDB)]
```
-The `EventType` enum defines all possible event types as strings. Domain events are Pydantic models that extend `AvroBase` (from `pydantic-avro`), making them both usable for MongoDB storage and Avro-serializable for Kafka. The mappings module routes events to the correct Kafka topics.
+The `EventType` enum defines all possible event types as strings. Domain events are Pydantic `BaseModel` subclasses, making them usable for both MongoDB storage and Kafka transport. FastStream serializes them to JSON when publishing and deserializes them back into typed models when consuming. The mappings module routes events to the correct Kafka topics.
This design eliminates duplication between "domain events" and "Kafka events" by making the domain event the single source of truth.
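+
+As a rough illustration, a domain event is just a Pydantic model that can be written to MongoDB and published to Kafka unchanged. The class and fields below are invented for the example; the real definitions live in `domain/events/typed.py`:
+
+```python
+from datetime import datetime, timezone
+from uuid import uuid4
+
+from pydantic import BaseModel, Field
+
+
+class ExampleRequestedEvent(BaseModel):
+    # hypothetical event, for illustration only
+    event_id: str = Field(default_factory=lambda: str(uuid4()))
+    event_type: str = "example.requested"  # in practice a value from the EventType enum
+    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+    resource_id: str = ""                  # made-up payload field
+
+
+event = ExampleRequestedEvent(resource_id="abc")
+event.model_dump()       # dict suitable for a MongoDB insert
+event.model_dump_json()  # JSON payload as it would travel over Kafka
+```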
@@ -42,7 +42,7 @@ Earlier designs maintained separate domain and Kafka event classes, arguing that
The unified approach addresses these issues:
- **Single definition**: Each event is defined once in `domain/events/typed.py`
-- **Avro-compatible**: `BaseEvent` extends `AvroBase`, enabling automatic schema generation
+- **JSON-native**: `BaseEvent` extends Pydantic `BaseModel`; FastStream serializes to JSON automatically
- **Storage-ready**: Events include storage fields (`stored_at`, `ttl_expires_at`) that MongoDB uses
- **Topic routing**: The `EVENT_TYPE_TO_TOPIC` mapping in `infrastructure/kafka/mappings.py` handles routing
@@ -92,12 +92,12 @@ sequenceDiagram
This approach is more performant than trying each union member until one validates. The discriminator tells Pydantic exactly which class to use.
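+
+A minimal sketch of that pattern follows; the class names and the `event_type` discriminator values are invented for the example and are not the project's actual definitions:
+
+```python
+from typing import Annotated, Literal, Union
+
+from pydantic import BaseModel, Field, TypeAdapter
+
+
+class StartedEvent(BaseModel):
+    event_type: Literal["example.started"] = "example.started"
+
+
+class CompletedEvent(BaseModel):
+    event_type: Literal["example.completed"] = "example.completed"
+
+
+# Discriminated union: Pydantic picks the concrete class from the event_type value
+ExampleEvent = Annotated[Union[StartedEvent, CompletedEvent], Field(discriminator="event_type")]
+
+adapter = TypeAdapter(ExampleEvent)
+event = adapter.validate_json('{"event_type": "example.completed"}')  # -> CompletedEvent
+```
+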
-## BaseEvent and AvroBase
+## BaseEvent
-The `BaseEvent` class provides common fields for all events and inherits from `AvroBase` for Avro schema generation:
+The `BaseEvent` class provides common fields for all events:
```python
-class BaseEvent(AvroBase):
+class BaseEvent(BaseModel):
"""Base fields for all domain events."""
model_config = ConfigDict(from_attributes=True)
@@ -111,10 +111,7 @@ class BaseEvent(AvroBase):
ttl_expires_at: datetime = Field(default_factory=...)
```
-The `AvroBase` inheritance enables:
-- Automatic Avro schema generation via `BaseEvent.avro_schema()`
-- Serialization through the Schema Registry
-- Forward compatibility checking
+Since `BaseEvent` is a plain Pydantic model, FastStream handles serialization and deserialization transparently — publishing calls `model.model_dump_json()` under the hood, and subscribers receive typed model instances from the incoming JSON.
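+
+A small self-contained sketch of what this looks like in FastStream terms (the broker address, topic name, and event class are illustrative, not the project's wiring):
+
+```python
+from faststream.kafka import KafkaBroker
+from pydantic import BaseModel
+
+broker = KafkaBroker("kafka:29092")
+
+
+class ExampleEvent(BaseModel):  # stand-in for a real domain event
+    event_id: str
+    payload: str
+
+
+@broker.subscriber("example_topic")
+async def on_example(event: ExampleEvent) -> None:
+    # FastStream validates the incoming JSON into the annotated model
+    print(event.event_id)
+
+
+async def publish_example(event: ExampleEvent) -> None:
+    # the Pydantic instance goes over the wire as JSON
+    await broker.publish(event, topic="example_topic")
+```
+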
## Topic routing
@@ -180,9 +177,9 @@ graph TB
```
When publishing events, the `UnifiedProducer`:
-1. Looks up the topic via `EVENT_TYPE_TO_TOPIC`
-2. Serializes the event using the Schema Registry
-3. Publishes to Kafka
+1. Persists the event to MongoDB via `EventRepository`
+2. Looks up the topic via `EVENT_TYPE_TO_TOPIC`
+3. Publishes the Pydantic model to Kafka through `broker.publish()` (FastStream handles JSON serialization)
The producer handles both storage in MongoDB and publishing to Kafka in a single flow.
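+
+In condensed form the flow looks roughly like this; attribute and method names are approximations, and the real implementation lives in `events/core/producer.py`:
+
+```python
+# module path assumed from the repository layout
+from app.infrastructure.kafka.mappings import EVENT_TYPE_TO_TOPIC
+
+
+async def produce(repository, broker, event) -> None:
+    await repository.insert_event(event)           # 1. persist to MongoDB (method name assumed)
+    topic = EVENT_TYPE_TO_TOPIC[event.event_type]  # 2. route by event type
+    await broker.publish(event, topic=topic)       # 3. FastStream serializes the model to JSON
+```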
@@ -193,7 +190,7 @@ The producer handles both storage in MongoDB and publishing to Kafka in a single
| [`domain/enums/events.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/enums/events.py) | `EventType` enum with all event type values |
| [`domain/events/typed.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/events/typed.py) | All domain event classes and `DomainEvent` union |
| [`infrastructure/kafka/mappings.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/infrastructure/kafka/mappings.py) | Event-to-topic routing and helper functions |
-| [`services/kafka_event_service.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/services/kafka_event_service.py) | Publishes events to both MongoDB and Kafka |
+| [`events/core/producer.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/core/producer.py) | UnifiedProducer — persists to MongoDB then publishes to Kafka |
| [`tests/unit/domain/events/test_event_schema_coverage.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/tests/unit/domain/events/test_event_schema_coverage.py) | Validates correspondence between enum and event classes |
## Related docs
diff --git a/docs/architecture/kafka-topic-architecture.md b/docs/architecture/kafka-topic-architecture.md
index 8b17855b..6c59ac09 100644
--- a/docs/architecture/kafka-topic-architecture.md
+++ b/docs/architecture/kafka-topic-architecture.md
@@ -147,9 +147,9 @@ Admins can:
Key files:
-- `domain/events/typed.py` — all Pydantic event models (extends `AvroBase` for Avro serialization)
+- `domain/events/typed.py` — all Pydantic event models (plain `BaseModel` subclasses)
- `infrastructure/kafka/mappings.py` — event-to-topic routing and helper functions
-- `events/schema/schema_registry.py` — schema manager
-- `events/core/{producer,consumer,dispatcher}.py` — unified Kafka plumbing
+- `events/core/producer.py` — UnifiedProducer (persists to MongoDB, publishes to Kafka)
+- `events/handlers.py` — FastStream subscriber registrations for all workers
-All events are Pydantic models with *strict typing* that extend `AvroBase` for Avro schema generation. The mappings module routes each event type to its destination topic via `EVENT_TYPE_TO_TOPIC`. Schema Registry integration ensures producers and consumers agree on structure, catching incompatible changes *before* runtime failures. The unified producer and consumer classes handle serialization, error handling, and observability.
+All events are Pydantic models with strict typing. FastStream handles JSON serialization natively — the producer publishes Pydantic instances directly via `broker.publish()`, and subscribers receive typed model instances. The mappings module routes each event type to its destination topic via `EVENT_TYPE_TO_TOPIC`. Pydantic validation on both ends ensures structural agreement between producers and consumers.
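+
+As a rough sketch of a subscriber registration (the topic name and handler are hypothetical; the real registrations live in `events/handlers.py`):
+
+```python
+from faststream.kafka import KafkaBroker
+from pydantic import BaseModel
+
+broker = KafkaBroker("kafka:29092")
+
+
+class ExampleEvent(BaseModel):  # stand-in for a domain event from typed.py
+    event_id: str
+
+
+@broker.subscriber("example_topic")  # hypothetical topic
+async def handle_example(event: ExampleEvent) -> None:
+    # the raw JSON from Kafka is validated into the typed model before this runs
+    ...
+```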
diff --git a/docs/architecture/overview.md b/docs/architecture/overview.md
index 8d85e7bd..7498135c 100644
--- a/docs/architecture/overview.md
+++ b/docs/architecture/overview.md
@@ -17,7 +17,7 @@ For details on specific components, see:

The SPA hits the frontend, which proxies to the API over HTTPS; the API
-serves both REST and SSE. Kafka carries events with Schema Registry and Zookeeper backing it; kafka-
+serves both REST and SSE. Kafka carries events as JSON (serialized by FastStream) with Zookeeper backing it; kafka-
init seeds topics. All workers are separate containers subscribed to Kafka; the k8s-worker talks to the
Kubernetes API to run code, the pod-monitor watches pods, the result-processor writes results to Mongo
and nudges Redis for SSE fanout, and the saga-orchestrator coordinates long flows with Mongo and Redis.
@@ -44,7 +44,6 @@ graph LR
Mongo[(MongoDB)]
Redis[(Redis)]
Kafka[Kafka]
- Schema["Schema Registry"]
K8s["Kubernetes API"]
OTel["OTel Collector"]
VM["VictoriaMetrics"]
@@ -63,7 +62,6 @@ graph LR
Repos --> Mongo
Services <-->|"keys + SSE bus"| Redis
Events <-->|"produce/consume"| Kafka
- Events ---|"subjects/IDs"| Schema
Services -->|"read limits"| K8s
%% Telemetry edges
@@ -76,11 +74,11 @@ graph LR
- **Routers**: REST + SSE endpoints
- **DI (Dishka)**: Dependency injection & providers
- **Services**: Execution, Events, SSE, Idempotency, Notifications, User Settings, Rate Limit, Saved Scripts, Replay, Saga API
- - **Kafka Layer**: Producer, Consumer, Dispatcher, EventStore, SchemaRegistryManager
+ - **Kafka Layer**: UnifiedProducer, FastStream subscribers, EventStore
FastAPI under Uvicorn exposes REST and SSE routes, with middleware and DI wiring the core services.
Those services use Mongo-backed repositories for state and a unified Kafka layer to publish and react
-to events, with the schema registry ensuring compatibility. Redis handles rate limiting and SSE fanout.
+to events. FastStream handles Pydantic JSON serialization for all Kafka messages. Redis handles rate limiting and SSE fanout.
Telemetry flows through the OpenTelemetry Collector—metrics to VictoriaMetrics for Grafana and traces
to Jaeger. Kubernetes interactions are read via the API. This view focuses on the app’s building blocks;
event workers live in the system diagram.
diff --git a/docs/architecture/user-settings-events.md b/docs/architecture/user-settings-events.md
index 8a6b9378..c6d270e0 100644
--- a/docs/architecture/user-settings-events.md
+++ b/docs/architecture/user-settings-events.md
@@ -13,7 +13,7 @@ notifications, or editor settings. This eliminates branching in both publishing
```
The `changed_fields` list identifies which settings changed. Typed fields (`theme`, `notifications`, `editor`, etc.)
-contain the new values in Avro-compatible form.
+contain the new values as Pydantic model fields.
## TypeAdapter pattern
diff --git a/docs/components/schema-manager.md b/docs/components/schema-manager.md
index d6c05c7a..56b3e552 100644
--- a/docs/components/schema-manager.md
+++ b/docs/components/schema-manager.md
@@ -1,6 +1,8 @@
# Schema management
-The backend manages two kinds of schemas: MongoDB collections with indexes and validators, and Kafka event schemas in Avro format with a Confluent Schema Registry. Both are initialized at process start, whether the process is the main API or a standalone worker.
+The backend manages MongoDB collection schemas — indexes, validators, and TTL policies. These are initialized at process start, whether the process is the main API or a standalone worker.
+
+Kafka event serialization is handled entirely by FastStream with Pydantic JSON; there is no schema registry involved. See [Event System Design](../architecture/event-system-design.md) for details on event serialization.
## MongoDB schema
@@ -14,32 +16,19 @@ Other migrations create indexes for user settings snapshots, replay sessions, no
Repositories don't create their own indexes — they only read and write. This separation keeps startup behavior predictable and prevents the same index being created from multiple code paths.
-## Kafka schema registry
-
-The `SchemaRegistryManager` class in `app/events/schema/schema_registry.py` handles Avro serialization for Kafka events. All registry operations are async and must be awaited. The manager connects to a Confluent Schema Registry and registers schemas for all event types at startup via `await initialize_schemas()`.
-
-All event classes in `domain/events/typed.py` extend `AvroBase` (from `pydantic-avro`), enabling automatic Avro schema generation. The manager registers these schemas with subjects named after the class (like `ExecutionRequestedEvent-value`) and sets FORWARD compatibility, meaning new schemas can add fields but not remove required ones. This allows producers to be upgraded before consumers without breaking deserialization.
-
-Serialization and deserialization are async — `await serialize_event(event)` and `await deserialize_event(data, topic)` must be awaited. The wire format follows Confluent conventions: a magic byte, four-byte schema id, then the Avro binary payload. The underlying `python-schema-registry-client` library handles schema registration caching internally. The manager maintains a bidirectional cache between schema ids and Python event classes for deserialization. When deserializing, it reads the schema id from the message header, looks up the corresponding event class, deserializes the Avro payload to a dict, and hydrates the Pydantic model.
-
-For test isolation, the manager supports an optional `SCHEMA_SUBJECT_PREFIX` environment variable. Setting this to something like `test.session123.` prefixes all subject names, preventing test runs from polluting production schemas or interfering with each other.
-
## Startup sequence
-During API startup, the `lifespan` function in `dishka_lifespan.py` gets the database from the DI container, creates a `SchemaManager`, and calls `await apply_all()`. It does the same for `SchemaRegistryManager`, calling `await initialize_schemas()` to register all event types (async — must be awaited). Workers like the saga orchestrator and event replay service follow the same pattern — they connect to MongoDB, run schema migrations, and await schema registry initialization before starting their main loops.
+During API startup, the `lifespan` function in `dishka_lifespan.py` initializes Beanie with the MongoDB client, then resolves the `KafkaBroker` from DI, registers FastStream subscribers, sets up Dishka integration, and starts the broker. Workers follow the same pattern — they connect to MongoDB, initialize Beanie, register their subscribers on the broker, and start consuming.
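+
+A compressed sketch of that order, with approximate names (the real wiring lives in `dishka_lifespan.py`):
+
+```python
+from beanie import init_beanie
+from faststream.kafka import KafkaBroker
+from motor.motor_asyncio import AsyncIOMotorDatabase
+
+
+async def start_messaging(db: AsyncIOMotorDatabase, broker: KafkaBroker, document_models: list) -> None:
+    # 1. register the project's Beanie document models against the MongoDB database
+    await init_beanie(database=db, document_models=document_models)
+    # 2. FastStream subscribers are registered on the broker before it starts
+    # 3. start the broker so it can consume and publish
+    await broker.start()
+```
+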
## Local development
To force a specific MongoDB migration to run again, delete its document from `schema_versions`. To start fresh, point the app at a new database. Migrations are designed to be additive; the system doesn't support automatic rollbacks. If you need to undo a migration in production, you'll have to drop indexes or modify validators manually.
-For Kafka schemas, the registry keeps all versions. If you break compatibility and need to start over, delete the subject from the registry (either via REST API or the registry's UI if available) and let the app re-register on next startup.
-
## Key files
| File | Purpose |
|--------------------------------------------------------------------------------------------------------------------------------|----------------------------|
| [`schema_manager.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/db/schema/schema_manager.py) | MongoDB migrations |
-| [`schema_registry.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/schema/schema_registry.py) | Kafka Avro serialization |
-| [`typed.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/events/typed.py) | Domain events (extend AvroBase) |
+| [`typed.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/domain/events/typed.py) | Domain events (Pydantic BaseModel) |
| [`mappings.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/infrastructure/kafka/mappings.py) | Event-to-topic routing |
-| [`dishka_lifespan.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/dishka_lifespan.py) | Startup initialization |
+| [`dishka_lifespan.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/core/dishka_lifespan.py) | Startup initialization |
diff --git a/docs/getting-started.md b/docs/getting-started.md
index 0b9348b3..64769e04 100644
--- a/docs/getting-started.md
+++ b/docs/getting-started.md
@@ -4,7 +4,7 @@ This guide walks you through running Integr8sCode locally and executing your fir
## What you're deploying
-The full stack includes a Svelte frontend, FastAPI backend, MongoDB, Redis, Kafka with Schema Registry, and seven background workers. Sounds like a lot, but `docker compose` handles all of it. First startup takes a few minutes to pull images and initialize services; subsequent starts are much faster.
+The full stack includes a Svelte frontend, FastAPI backend, MongoDB, Redis, Kafka, and seven background workers. Sounds like a lot, but `docker compose` handles all of it. First startup takes a few minutes to pull images and initialize services; subsequent starts are much faster.
## Start the stack
diff --git a/docs/operations/cicd.md b/docs/operations/cicd.md
index 0db2fee6..68edba42 100644
--- a/docs/operations/cicd.md
+++ b/docs/operations/cicd.md
@@ -177,7 +177,7 @@ This action kicks off three slow tasks that can overlap:
1. **GHCR login** using `docker/login-action@v3`
2. **Background image pull + infra pre-warm** — pulls all compose images then starts infrastructure services
- (mongo, redis, kafka, zookeeper, schema-registry) in a background `nohup` process. The exit status is persisted
+ (mongo, redis, kafka, zookeeper) in a background `nohup` process. The exit status is persisted
to `/tmp/infra-pull.exit` so the next action can check for failures.
3. **k3s install** — downloads and installs a pinned k3s version with SHA256 checksum verification (see
[supply-chain hardening](#supply-chain-hardening) below)
diff --git a/docs/operations/deployment.md b/docs/operations/deployment.md
index d5a19adf..31ac8bd7 100644
--- a/docs/operations/deployment.md
+++ b/docs/operations/deployment.md
@@ -47,7 +47,7 @@ with health checks and dependency ordering, so containers start in the correct s
./deploy.sh dev
```
-This brings up MongoDB, Redis, Kafka with Zookeeper and Schema Registry, all seven workers, the backend API, and the
+This brings up MongoDB, Redis, Kafka with Zookeeper, all seven workers, the backend API, and the
frontend. Two initialization containers run automatically: `kafka-init` creates required Kafka topics, and `user-seed`
populates the database with default user accounts.
@@ -136,7 +136,6 @@ services define healthchecks in `docker-compose.yaml`:
| Redis | `redis-cli ping` |
| Backend | `curl /api/v1/health/live` |
| Kafka | `kafka-broker-api-versions` |
-| Schema Registry | `curl /config` |
| Zookeeper | `echo ruok \| nc localhost 2181 \| grep imok` |
Services without explicit healthchecks (workers, Grafana, Kafdrop) are considered "started" when their container is
@@ -162,7 +161,7 @@ The Helm chart organizes templates by function:
| `templates/rbac/` | ServiceAccount, Role, RoleBinding |
| `templates/secrets/` | Kubeconfig and Kafka JAAS |
| `templates/configmaps/` | TOML configuration and environment |
-| `templates/infrastructure/` | Zookeeper, Kafka, Schema Registry, Jaeger |
+| `templates/infrastructure/` | Zookeeper, Kafka, Jaeger |
| `templates/app/` | Backend and Frontend deployments |
| `templates/workers/` | All seven worker deployments |
| `templates/jobs/` | Kafka topic init and user seed |
@@ -252,7 +251,7 @@ infrastructure:
### Post-install jobs
-Two Helm hooks run after the main deployment completes. The kafka-init job waits for Kafka and Schema Registry to become
+Two Helm hooks run after the main deployment completes. The kafka-init job waits for Kafka to become
healthy, then creates all required topics using the `scripts/create_topics.py` module. Topics are created with the
prefix defined in settings (default `pref`) to avoid conflicts with Kubernetes-generated environment variables.
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index fd553ff1..766d7396 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -76,7 +76,6 @@ For production, mount `secrets.toml` from a Kubernetes Secret at `/app/secrets.t
| Key | Description | Default |
|-----|-------------|---------|
| `KAFKA_BOOTSTRAP_SERVERS` | Broker addresses (comma-separated) | `kafka:29092` |
- | `SCHEMA_REGISTRY_URL` | Confluent Schema Registry URL | `http://schema-registry:8081` |
| `ENABLE_EVENT_STREAMING` | Enable Kafka events | `false` |
| `EVENT_RETENTION_DAYS` | Days to retain events in MongoDB | `30` |
| `KAFKA_CONSUMER_GROUP_ID` | Consumer group ID | `integr8scode-backend` |
diff --git a/docs/testing/kafka-test-stability.md b/docs/testing/kafka-test-stability.md
index 99d17fdb..18ff7991 100644
--- a/docs/testing/kafka-test-stability.md
+++ b/docs/testing/kafka-test-stability.md
@@ -1,72 +1,19 @@
# Kafka test stability
-## The problem
+## Producer lifecycle
-When running tests in parallel with `pytest-xdist`, you might encounter sporadic crashes:
+The project uses aiokafka (`aiokafka==0.12.0`) via FastStream as the Kafka backend. `UnifiedProducer` is a thin wrapper that delegates to `await self._broker.publish()` — it has no internal Kafka client and no `start()`/`stop()` methods of its own. The broker lifecycle is managed externally (by FastStream in the API process, or by each worker's startup code), so the confluent-kafka `librdkafka` race conditions that plague many Python Kafka test suites do not apply here.
-```text
-Fatal Python error: Aborted
-```
-
-The stack trace typically points to `confluent_kafka` operations during producer initialization. This isn't a bug in
-the application code—it's a known race condition in the underlying `librdkafka` C library.
-
-## Why it happens
-
-The `confluent-kafka-python` library wraps `librdkafka`, a high-performance C library. When multiple processes or
-threads create Kafka `Producer` instances simultaneously, they can trigger a race condition in `librdkafka`'s internal
-initialization. This manifests as random `SIGABRT` signals, crashes in `rd_kafka_broker_destroy_final`, or flaky CI
-failures that pass on retry.
-
-## The fix
-
-Serialize `Producer` initialization using a global threading lock. In
-[
-`app/events/core/producer.py`](https://github.com/HardMax71/Integr8sCode/blob/main/backend/app/events/core/producer.py):
+In test fixtures, construct the producer and yield it directly:
```python
---8<-- "backend/app/events/core/producer.py:22:24"
+@pytest.fixture(scope="function")
+async def producer():
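+    # broker, event_repository, logger, settings, event_metrics are provided by the surrounding test setup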
+ p = UnifiedProducer(broker, event_repository, logger, settings, event_metrics)
+ yield p
```
-The lock is process-global, so all `UnifiedProducer` instances serialize their initialization. This adds negligible
-overhead in production (producers are created once at startup) while eliminating the race condition in tests.
-
-## Related issues
-
-These GitHub issues document the underlying problem:
-
-| Issue | Description |
-|---------------------------------------------------------------------------------------------------|---------------------------------------------------------|
-| [confluent-kafka-python#1797](https://github.com/confluentinc/confluent-kafka-python/issues/1797) | Segfaults in multithreaded/asyncio pytest environments |
-| [confluent-kafka-python#1761](https://github.com/confluentinc/confluent-kafka-python/issues/1761) | Segfault on garbage collection in multithreaded context |
-| [librdkafka#3608](https://github.com/confluentinc/librdkafka/issues/3608) | Crash in `rd_kafka_broker_destroy_final` |
-
-## Alternative approaches
-
-If you still encounter issues:
-
-1. **Reduce parallelism** - Run Kafka-dependent tests with fewer workers: `pytest -n 2` instead of `-n auto`
-
-2. **Isolate Kafka tests** - Mark Kafka tests and run them separately:
- ```python
- @pytest.mark.kafka
- def test_producer_sends_message():
- ...
- ```
- ```bash
- pytest -m "not kafka" -n auto # parallel
- pytest -m kafka -n 1 # sequential
- ```
-
-3. **Use fixtures carefully** - Ensure producer fixtures are properly scoped and cleaned up:
- ```python
- @pytest.fixture(scope="function")
- async def producer():
- p = UnifiedProducer(config, schema_registry)
- await p.start()
- yield p
- await p.stop() # Always clean up
- ```
+No explicit cleanup is needed — the broker is started and stopped separately.
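+
+If a suite does need to manage the broker itself, one possible shape is a session-scoped fixture; this is a sketch under assumed names, not the project's actual conftest:
+
+```python
+import pytest_asyncio
+from faststream.kafka import KafkaBroker
+
+
+@pytest_asyncio.fixture(scope="session")
+async def broker():
+    b = KafkaBroker("localhost:9092")  # bootstrap address depends on the test environment
+    await b.start()
+    yield b
+    await b.close()
+```
+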
## Consumer teardown delays
diff --git a/helm/integr8scode/templates/_helpers.tpl b/helm/integr8scode/templates/_helpers.tpl
index e9c4122b..b145728e 100644
--- a/helm/integr8scode/templates/_helpers.tpl
+++ b/helm/integr8scode/templates/_helpers.tpl
@@ -120,13 +120,6 @@ Kafka bootstrap servers
{{- printf "%s-kafka:29092" .Release.Name }}
{{- end }}
-{{/*
-Schema Registry URL
-*/}}
-{{- define "integr8scode.schemaRegistryUrl" -}}
-{{- printf "http://%s-schema-registry:8081" .Release.Name }}
-{{- end }}
-
{{/*
Jaeger host
*/}}
diff --git a/helm/integr8scode/templates/infrastructure/schema-registry.yaml b/helm/integr8scode/templates/infrastructure/schema-registry.yaml
deleted file mode 100644
index ee23cad0..00000000
--- a/helm/integr8scode/templates/infrastructure/schema-registry.yaml
+++ /dev/null
@@ -1,78 +0,0 @@
-{{- if .Values.infrastructure.schemaRegistry.enabled }}
-apiVersion: apps/v1
-kind: Deployment
-metadata:
- name: {{ include "integr8scode.fullname" . }}-schema-registry
- namespace: {{ .Release.Namespace }}
- labels:
- {{- include "integr8scode.labels" . | nindent 4 }}
- app.kubernetes.io/component: schema-registry
-spec:
- replicas: 1
- selector:
- matchLabels:
- app: {{ include "integr8scode.fullname" . }}-schema-registry
- template:
- metadata:
- labels:
- app: {{ include "integr8scode.fullname" . }}-schema-registry
- {{- include "integr8scode.labels" . | nindent 8 }}
- app.kubernetes.io/component: schema-registry
- spec:
- automountServiceAccountToken: false
- containers:
- - name: schema-registry
- image: {{ .Values.infrastructure.schemaRegistry.image }}
- imagePullPolicy: {{ .Values.global.imagePullPolicy | default "IfNotPresent" }}
- # CRITICAL: Unset SCHEMA_REGISTRY_PORT to prevent Confluent image conflict
- command:
- - bash
- - -c
- - |
- unset SCHEMA_REGISTRY_PORT
- /etc/confluent/docker/run
- ports:
- - containerPort: 8081
- name: http
- env:
- - name: SCHEMA_REGISTRY_HOST_NAME
- value: "{{ include "integr8scode.fullname" . }}-schema-registry"
- - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
- value: "{{ include "integr8scode.fullname" . }}-kafka:29092"
- - name: SCHEMA_REGISTRY_LISTENERS
- value: "http://0.0.0.0:8081"
- - name: SCHEMA_REGISTRY_HEAP_OPTS
- value: {{ .Values.infrastructure.schemaRegistry.heapOpts | default "-Xms256M -Xmx256M" | quote }}
- resources:
- {{- toYaml .Values.infrastructure.schemaRegistry.resources | nindent 10 }}
- readinessProbe:
- httpGet:
- path: /config
- port: 8081
- initialDelaySeconds: 20
- periodSeconds: 10
- timeoutSeconds: 5
- livenessProbe:
- httpGet:
- path: /config
- port: 8081
- initialDelaySeconds: 30
- periodSeconds: 30
- timeoutSeconds: 5
----
-apiVersion: v1
-kind: Service
-metadata:
- name: {{ include "integr8scode.fullname" . }}-schema-registry
- namespace: {{ .Release.Namespace }}
- labels:
- {{- include "integr8scode.labels" . | nindent 4 }}
- app.kubernetes.io/component: schema-registry
-spec:
- selector:
- app: {{ include "integr8scode.fullname" . }}-schema-registry
- ports:
- - name: http
- port: 8081
- targetPort: 8081
-{{- end }}
diff --git a/helm/integr8scode/templates/jobs/kafka-init.yaml b/helm/integr8scode/templates/jobs/kafka-init.yaml
index d267e66a..202c7304 100644
--- a/helm/integr8scode/templates/jobs/kafka-init.yaml
+++ b/helm/integr8scode/templates/jobs/kafka-init.yaml
@@ -38,19 +38,6 @@ spec:
sleep 5
done
echo "Kafka is ready!"
- # Wait for Schema Registry to be ready
- - name: wait-for-schema-registry
- image: busybox:1.36
- command:
- - sh
- - -c
- - |
- echo "Waiting for Schema Registry to be ready..."
- until wget -q -O- http://{{ include "integr8scode.fullname" . }}-schema-registry:8081/config > /dev/null 2>&1; do
- echo "Schema Registry not ready, waiting 5 seconds..."
- sleep 5
- done
- echo "Schema Registry is ready!"
containers:
- name: kafka-init
image: {{ .Values.kafkaInit.image | default (include "integr8scode.backendImage" .) }}
@@ -64,8 +51,6 @@ spec:
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: {{ include "integr8scode.kafkaBootstrapServers" . | quote }}
- - name: SCHEMA_REGISTRY_URL
- value: {{ include "integr8scode.schemaRegistryUrl" . | quote }}
- name: LOG_LEVEL
value: "INFO"
resources:
diff --git a/helm/integr8scode/templates/secrets/env-secret.yaml b/helm/integr8scode/templates/secrets/env-secret.yaml
index c331f0b5..1d826f1a 100644
--- a/helm/integr8scode/templates/secrets/env-secret.yaml
+++ b/helm/integr8scode/templates/secrets/env-secret.yaml
@@ -18,7 +18,6 @@ stringData:
# Kafka configuration
KAFKA_BOOTSTRAP_SERVERS: {{ include "integr8scode.kafkaBootstrapServers" . | quote }}
- SCHEMA_REGISTRY_URL: {{ include "integr8scode.schemaRegistryUrl" . | quote }}
# MongoDB (contains credentials - URL-encoded)
MONGODB_URL: {{ include "integr8scode.mongodbUrl" . | quote }}
diff --git a/helm/integr8scode/values-prod.yaml b/helm/integr8scode/values-prod.yaml
index 77a9919c..af592206 100644
--- a/helm/integr8scode/values-prod.yaml
+++ b/helm/integr8scode/values-prod.yaml
@@ -128,16 +128,6 @@ infrastructure:
memory: "2Gi"
cpu: "2000m"
- schemaRegistry:
- heapOpts: "-Xms512M -Xmx512M"
- resources:
- requests:
- memory: "512Mi"
- cpu: "200m"
- limits:
- memory: "1Gi"
- cpu: "1000m"
-
jaeger:
resources:
requests:
diff --git a/helm/integr8scode/values.yaml b/helm/integr8scode/values.yaml
index b697243c..856dde9a 100644
--- a/helm/integr8scode/values.yaml
+++ b/helm/integr8scode/values.yaml
@@ -304,21 +304,6 @@ infrastructure:
cpu: "1000m"
ephemeral-storage: "512Mi"
- # Schema Registry
- schemaRegistry:
- enabled: true
- image: confluentinc/cp-schema-registry:7.8.2
- heapOpts: "-Xms256M -Xmx256M"
- resources:
- requests:
- memory: "256Mi"
- cpu: "100m"
- ephemeral-storage: "128Mi"
- limits:
- memory: "512Mi"
- cpu: "500m"
- ephemeral-storage: "256Mi"
-
# Jaeger (distributed tracing)
jaeger:
enabled: true