From 964643606e635c224bd94e2002afafb9b6a9bf11 Mon Sep 17 00:00:00 2001 From: Oliver Marshall Date: Wed, 4 Mar 2026 14:44:45 +0000 Subject: [PATCH] feat(debezium): add full e2e demo using docker-compose Co-Authored-By: Claude Opus 4.6 --- debezium/README.md | 31 +++-- debezium/cdc-demo/README.md | 62 ++++++++++ debezium/cdc-demo/dc_config.yaml | 32 +++++ debezium/cdc-demo/docker-compose.yaml | 148 ++++++++++++++++++++++++ debezium/cdc-demo/register-connector.sh | 30 +++++ 5 files changed, 295 insertions(+), 8 deletions(-) create mode 100644 debezium/cdc-demo/README.md create mode 100644 debezium/cdc-demo/dc_config.yaml create mode 100644 debezium/cdc-demo/docker-compose.yaml create mode 100755 debezium/cdc-demo/register-connector.sh diff --git a/debezium/README.md b/debezium/README.md index 7d024e0..72a2fcb 100644 --- a/debezium/README.md +++ b/debezium/README.md @@ -33,6 +33,20 @@ mise run demo # Installs MariaDB, starts it, runs CDC The module also includes a Debezium Server sink connector for deployment with standalone Debezium Server. +### [cdc-demo](./cdc-demo/) + +**Docker Compose end-to-end demo** - Full stack with Postgres, Debezium Connect, Kafka, and XTDB's built-in `debezium-cdc` module. 
+ +- Postgres source (WAL-based CDC) +- Kafka as the message bus +- Uses XTDB's native CDC ingestion (no custom code) +- Fully self-contained via docker-compose + +```bash +cd cdc-demo +docker compose up -d +``` + ## Key Concepts ### Schema-less Ingestion @@ -63,14 +77,15 @@ The demos handle both full Debezium envelope format and the flattened format (vi ## Comparison -| Feature | Static JSON | Live CDC (debezium-xtdb) | -|---------|-------------|--------------------------| -| Real database | No | Yes (MySQL/MariaDB) | -| Kafka required | No | No | -| CDC engine | None | Debezium Embedded | -| Latency | N/A | Sub-second | -| Setup complexity | Minimal | Medium (MariaDB install) | -| Best for | Learning | Development/Testing | +| Feature | Static JSON | Live CDC (debezium-xtdb) | Docker Compose (cdc-demo) | +|---------|-------------|--------------------------|---------------------------| +| Real database | No | Yes (MySQL/MariaDB) | Yes (Postgres) | +| Kafka required | No | No | Yes | +| CDC engine | None | Debezium Embedded | Debezium Connect | +| XTDB ingestion | Custom (JDBC) | Custom (JDBC) | Built-in `debezium-cdc` | +| Latency | N/A | Sub-second | Sub-second | +| Setup complexity | Minimal | Medium (MariaDB install) | Medium (Docker) | +| Best for | Learning | Development/Testing | Production-like demo | ## Architecture diff --git a/debezium/cdc-demo/README.md b/debezium/cdc-demo/README.md new file mode 100644 index 0000000..f903fe6 --- /dev/null +++ b/debezium/cdc-demo/README.md @@ -0,0 +1,62 @@ +# Debezium CDC Demo + +Demonstrates end-to-end Change Data Capture from Postgres into XTDB via Debezium and Kafka. + +## Prerequisites + +Build the Docker image (includes the debezium module): + +```bash +./docker/scripts/build-aws-image.sh --clean +``` + +## Start the stack + +```bash +cd debezium/cdc-demo +docker compose up -d +``` + +Wait ~30s for all services to stabilise. 
+Check status with `docker compose ps` — `debezium-init` and `minio-setup` will show as exited (expected, they're one-shot).
+
+## Run the demo
+
+To submit transactions to Postgres:
+
+```bash
+PGPASSWORD=postgres psql -h localhost -p 5434 -U postgres -d sourcedb
+```
+
+To watch messages on the kafka topic:
+
+```bash
+docker compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic sourcedb.public.cdc_demo --from-beginning 2>/dev/null | jq .
+```
+
+To watch for changes in XTDB:
+```bash
+watch -n 0.5 "psql -h localhost -p 5433 -U xtdb -d xtdb -c \"SELECT *, _valid_time FROM public.cdc_demo FOR ALL VALID_TIME ORDER BY _id, _valid_from\""
+```
+
+### 1. Create a table and insert data in Postgres
+
+```sql
+CREATE TABLE cdc_demo (_id INT PRIMARY KEY, name TEXT, email TEXT);
+INSERT INTO cdc_demo VALUES (1, 'Alice', 'alice@example.com');
+INSERT INTO cdc_demo VALUES (2, 'Bob', 'bob@example.com');
+```
+
+### 2. More mutations
+
+```sql
+UPDATE cdc_demo SET email = 'alice-new@example.com' WHERE _id = 1;
+DELETE FROM cdc_demo WHERE _id = 2;
+INSERT INTO cdc_demo VALUES (3, 'Charlie', 'charlie@example.com');
+```
+
+## Tear down
+
+```bash
+docker compose down -v
+```
diff --git a/debezium/cdc-demo/dc_config.yaml b/debezium/cdc-demo/dc_config.yaml
new file mode 100644
index 0000000..393f602
--- /dev/null
+++ b/debezium/cdc-demo/dc_config.yaml
@@ -0,0 +1,32 @@
+server:  # pgwire listener; the `xtdb` service in docker-compose.yaml publishes it on host port 5433
+  host: '*'
+  port: 5432
+
+flightSql:  # the `xtdb` service in docker-compose.yaml publishes this port as 9832 on the host
+  host: '*'
+  port: 9832
+
+logClusters:
+  kafkaCluster: !Kafka  # name referenced by `log.cluster` below and by `--kafka-cluster` in docker-compose.yaml
+    bootstrapServers: !Env KAFKA_BOOTSTRAP_SERVERS  # !Env values are set under `environment:` in docker-compose.yaml
+
+log: !Kafka
+  cluster: kafkaCluster
+  topic: !Env XTDB_LOG_TOPIC
+
+storage: !Remote
+  objectStore: !S3
+    bucket: !Env XTDB_S3_BUCKET
+    prefix: "xtdb-object-store"
+    endpoint: !Env XTDB_S3_ENDPOINT  # docker-compose.yaml points this at the MinIO container
+    credentials:
+      accessKey: !Env ACCESS_KEY
+      secretKey: !Env SECRET_KEY
+    pathStyleAccessEnabled: true
+
+diskCache:
+  path: /var/lib/xtdb/buffers
+
+healthz:  # the `xtdb` service in docker-compose.yaml publishes this on host port 8081
+  host: '*'
+  port: 8080
diff --git a/debezium/cdc-demo/docker-compose.yaml
b/debezium/cdc-demo/docker-compose.yaml
new file mode 100644
index 0000000..1949148
--- /dev/null
+++ b/debezium/cdc-demo/docker-compose.yaml
@@ -0,0 +1,160 @@
+# End-to-end CDC demo stack: Postgres -> Debezium Connect -> Kafka -> XTDB (object store: MinIO).
+# No top-level `version:` key: Docker Compose v2 ignores it and warns that it is obsolete.
+services:
+  # S3-compatible object store used by both XTDB nodes
+  minio:
+    image: minio/minio
+    ports:
+      - "9000:9000"
+      - "8090:8090"
+    environment:
+      - MINIO_ROOT_USER=minioadmin
+      - MINIO_ROOT_PASSWORD=minioadmin
+    volumes:
+      - minio_data:/data
+    command: server /data --console-address ":8090"
+    healthcheck:
+      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
+      interval: 5s
+      timeout: 5s
+      retries: 3
+
+  # a one-shot service simply to create the bucket
+  minio-setup:
+    image: minio/mc
+    depends_on:
+      minio:
+        condition: service_healthy
+    entrypoint: >
+      /bin/sh -c "
+      mc alias set myminio http://minio:9000 minioadmin minioadmin &&
+      mc mb --ignore-existing myminio/xtdb-bucket &&
+      echo 'MinIO bucket created successfully!'"
+
+  # single-node Kafka in KRaft mode (combined broker and controller)
+  kafka:
+    image: confluentinc/cp-kafka:7.7.1
+    expose:
+      - 9092
+      - 9999
+      - 29092
+      - 29093
+    ports:
+      - "29092:29092"
+      - "29093:29093"
+    environment:
+      CLUSTER_ID: "1"
+      KAFKA_PROCESS_ROLES: "broker,controller"
+      KAFKA_NODE_ID: "1"
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
+      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
+      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
+      KAFKA_LISTENERS: "PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092"
+      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
+      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
+
+  # source postgres
+  postgres:
+    image: postgres:17-alpine
+    ports:
+      - "5434:5432"
+    environment:
+      POSTGRES_USER: postgres
+      POSTGRES_PASSWORD: postgres
+      POSTGRES_DB: sourcedb
+    # logical decoding (used by the connector's pgoutput plugin) needs wal_level=logical
+    command: ["postgres", "-c", "wal_level=logical"]
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U postgres"]
+      interval: 5s
+      timeout: 5s
+      retries: 5
+
+  # captures PG WAL → Kafka
+  debezium-connect:
+    image: quay.io/debezium/connect:3.0
+    depends_on:
+      kafka:
+        condition: service_started
+      postgres:
+        condition: service_healthy
+    ports:
+      - "8083:8083"
+    environment:
+      BOOTSTRAP_SERVERS: kafka:29092
+      GROUP_ID: "1"
+      CONFIG_STORAGE_TOPIC: debezium_configs
+      OFFSET_STORAGE_TOPIC: debezium_offsets
+      STATUS_STORAGE_TOPIC: debezium_statuses
+
+  # one-shot service that registers the Postgres connector via register-connector.sh
+  debezium-init:
+    image: curlimages/curl
+    depends_on:
+      - debezium-connect
+      - postgres
+    volumes:
+      - ./register-connector.sh:/register-connector.sh:ro
+    entrypoint: ["/bin/sh", "/register-connector.sh"]
+
+  # main XTDB node (queryable via pgwire on :5433)
+  xtdb:
+    image: xtdb/xtdb-aws:latest
+    depends_on:
+      kafka:
+        condition: service_started
+      # wait for the bucket to exist, not merely for the setup container to start
+      minio-setup:
+        condition: service_completed_successfully
+    expose:
+      - 8080
+      - 5432
+      - 9832
+    ports:
+      - "8081:8080"
+      - "5433:5432"
+      - "9832:9832"
+    environment:
+      AWS_REGION: "aws-iso-global"
+      AWS_S3_FORCE_PATH_STYLE: "true"
+      AWS_S3_USE_VIRTUAL_HOSTING: "false"
+      XTDB_NODE_ID: "xt-node-1"
+      KAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
+      XTDB_LOG_TOPIC: "xt_log"
+      XTDB_S3_BUCKET: "xtdb-bucket"
+      XTDB_S3_ENDPOINT: "http://minio:9000"
+      ACCESS_KEY: "minioadmin"
+      SECRET_KEY: "minioadmin"
+    command: ["-f", "/config/dc_config.yaml"]
+    volumes:
+      - ./dc_config.yaml:/config/dc_config.yaml
+
+  # XTDB CDC node
+  xtdb-cdc:
+    image: xtdb/xtdb-aws:latest
+    depends_on:
+      kafka:
+        condition: service_started
+      # wait for the bucket to exist, not merely for the setup container to start
+      minio-setup:
+        condition: service_completed_successfully
+      debezium-init:
+        condition: service_started
+    environment:
+      AWS_REGION: "aws-iso-global"
+      AWS_S3_FORCE_PATH_STYLE: "true"
+      AWS_S3_USE_VIRTUAL_HOSTING: "false"
+      XTDB_NODE_ID: "xt-cdc-node"
+      KAFKA_BOOTSTRAP_SERVERS: "kafka:29092"
+      XTDB_LOG_TOPIC: "xt_log"
+      XTDB_S3_BUCKET: "xtdb-bucket"
+      XTDB_S3_ENDPOINT: "http://minio:9000"
+      ACCESS_KEY: "minioadmin"
+      SECRET_KEY: "minioadmin"
+    command: ["debezium-cdc", "--kafka-cluster", "kafkaCluster", "-t", "sourcedb.public.cdc_demo", "-f", "/config/dc_config.yaml"]
+    volumes:
+      - ./dc_config.yaml:/config/dc_config.yaml
+
+volumes:
+  minio_data:
diff --git a/debezium/cdc-demo/register-connector.sh b/debezium/cdc-demo/register-connector.sh
new file mode 100755
index 0000000..94884a5
--- /dev/null
+++ b/debezium/cdc-demo/register-connector.sh
@@ -0,0 +1,32 @@
+#!/bin/sh
+# Registers (or updates) the Debezium Postgres connector once Kafka Connect is reachable.
+set -e
+
+CONNECT_URL="http://debezium-connect:8083"
+CONNECTOR_NAME="sourcedb-connector"
+
+echo "Waiting for Debezium Connect to be ready..."
+until curl -sf "$CONNECT_URL/connectors" > /dev/null 2>&1; do
+  sleep 2
+done
+echo "Debezium Connect is ready."
+
+# PUT /connectors/<name>/config creates the connector or updates it in place, so re-running
+# this script against a Connect cluster that already has the connector does not fail with 409.
+# -S keeps curl's error message visible when -f turns an HTTP error into a non-zero exit.
+curl -fsS -X PUT "$CONNECT_URL/connectors/$CONNECTOR_NAME/config" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
+    "database.hostname": "postgres",
+    "database.port": "5432",
+    "database.user": "postgres",
+    "database.password": "postgres",
+    "database.dbname": "sourcedb",
+    "topic.prefix": "sourcedb",
+    "schema.include.list": "public",
+    "plugin.name": "pgoutput"
+  }'
+
+echo ""
+echo "Connector registered successfully."