From 9671f6697f5dd5d166567325d1ac8855b84b70ac Mon Sep 17 00:00:00 2001 From: vostres Date: Wed, 28 Jan 2026 10:57:42 +0100 Subject: [PATCH 1/9] Add content --- docs/docs/index.md | 45 +++ docs/docs/user/concepts/common.md | 179 ++++++++- docs/docs/user/concepts/consumer-producer.md | 0 docs/docs/user/concepts/consumer.md | 0 docs/docs/user/concepts/producer.md | 121 +++++- docs/docs/user/concepts/streams.md | 222 ++++++++++- docs/docs/user/deployment/kubernetes.md | 231 ++++++++++- docs/docs/user/deployment/local.md | 46 +++ docs/docs/user/extensions/large-messages.md | 105 +++++ docs/docs/user/getting-started/quick-start.md | 95 ++++- docs/docs/user/getting-started/setup.md | 155 ++++++++ docs/docs/user/monitoring.md | 367 ++++++++++++++++++ docs/docs/user/testing.md | 183 +++++++++ docs/mkdocs.yml | 2 + 14 files changed, 1717 insertions(+), 34 deletions(-) create mode 100644 docs/docs/user/concepts/consumer-producer.md create mode 100644 docs/docs/user/concepts/consumer.md diff --git a/docs/docs/index.md b/docs/docs/index.md index bc17b4db1..64677fc48 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -1 +1,46 @@ # What is streams-bootstrap? + +`streams-bootstrap` is a Java library that standardizes the development and operation of Kafka-based applications (Kafka +Streams and plain Kafka clients). + +The framework supports Apache Kafka 4.1 and Java 17. Its modules are published to Maven Central for straightforward +integration into existing projects. + +## Why use it? + +Kafka Streams and the core Kafka clients provide strong primitives for stream processing and messaging, but they do not +prescribe: + +- How to structure a full application around those primitives +- How to configure applications consistently +- How to deploy and operate these services on Kubernetes +- How to perform repeatable reprocessing and cleanup +- How to handle errors and large messages uniformly + +`streams-bootstrap` addresses these aspects by supplying: + +1. **Standardized base classes** for Kafka Streams and client applications. +2. **A common CLI/configuration contract** for all streaming services. +3. **Helm-based deployment templates** and conventions for Kubernetes. +4. **Built-in reset/clean workflows** for reprocessing and state management. +5. **Consistent error-handling** and dead-letter integration. +6. **Testing infrastructure** for local development and CI environments. +7. **Optional S3-backed serialization** for large messages. + +## Framework Architecture + +The framework uses a modular architecture with a clear separation of concerns. 
+ +### Core Modules + +- `streams-bootstrap-core`: Base classes such as `KafkaApplication`, `Runner`, and `CleanUpRunner` +- `streams-bootstrap-cli`: CLI framework based on `picocli` +- `streams-bootstrap-test`: Testing utilities (`TestApplicationRunner`, `KafkaTestClient`) +- `streams-bootstrap-large-messages`: Support for handling large Kafka messages +- `streams-bootstrap-cli-test`: Test support for CLI-based applications + +### External Dependencies + +- Apache Kafka: `kafka-streams`, `kafka-clients` +- Confluent Platform: Schema Registry and Avro SerDes +- Picocli: Command-line parsing and CLI framework diff --git a/docs/docs/user/concepts/common.md b/docs/docs/user/concepts/common.md index 4361c36f3..335f2a7e3 100644 --- a/docs/docs/user/concepts/common.md +++ b/docs/docs/user/concepts/common.md @@ -2,15 +2,184 @@ ## Application types -- App -- ConfiguredApp -- ExecutableApp +The **streams-bootstrap** framework uses a three-layer application type hierarchy: +**App → ConfiguredApp → ExecutableApp** + +--- + +### App + +The **App** represents your application logic implementation. Each application type has its own `App` interface: + +- **StreamsApp** – for Kafka Streams applications +- **ProducerApp** – for producer applications +- **ConsumerApp** – for consumer applications +- **ConsumerProducerApp** – for consumer–producer applications + +You implement the appropriate interface to define your application's behavior. + +--- + +### ConfiguredApp + +A **ConfiguredApp** pairs an `App` with its configuration. Examples include: + +- `ConfiguredConsumerApp` +- `ConfiguredProducerApp` + +This layer handles Kafka property creation, combining: + +- base configuration +- app-specific configuration +- environment variables +- runtime configuration + +--- + +### ExecutableApp + +An **ExecutableApp** is a `ConfiguredApp` with runtime configuration applied, making it ready to execute. +It can create: + +- a **Runner** for running the application +- a **CleanUpRunner** for cleanup operations + +The `KafkaApplication` base class orchestrates the creation of these components through methods such as: + +- `createConfiguredApp()` +- `createExecutableApp()` + +--- + +### Usage Pattern + +1. You implement an **App**. +2. The framework wraps it in a **ConfiguredApp**, applying the configuration. +3. Runtime configuration is then applied to create an **ExecutableApp**, which can be: + +- **run**, or +- **cleaned up**. + +--- ## Application lifecycle -- Running an application -- Cleaning an application +Applications built with streams-bootstrap follow a defined lifecycle with specific states and transitions. + +The framework manages this lifecycle through the KafkaApplication base class and provides several extension points for +customization. 
+ +| Phase | Description | Entry Point | +|----------------|--------------------------------------------------------------------------|----------------------------------------------------------| +| Initialization | Parse CLI arguments, inject environment variables, configure application | `startApplication()` or `startApplicationWithoutExit()` | +| Preparation | Execute pre-run/pre-clean hooks | `onApplicationStart()`, `prepareRun()`, `prepareClean()` | +| Execution | Run main application logic or cleanup operations | `run()`, `clean()`, `reset()` | +| Shutdown | Stop runners, close resources, cleanup | `stop()`, `close()` | + +### Running an application + +Applications built with streams-bootstrap can be started in two primary ways: + +- **Via Command Line Interface**: When packaged as a runnable JAR (for example, in a container), + the `run` command is the default entrypoint. An example invocation: + + ```bash + java -jar example-app.jar \ + run \ + --bootstrap-servers kafka:9092 \ + --input-topics input-topic \ + --output-topic output-topic \ + --schema-registry-url http://schema-registry:8081 + ``` + +- **Programmatically**: The application subclass calls `startApplication(args)` on startup. Example for a Kafka Streams + application: + +```java + public static void main(final String[] args) { + new MyStreamsApplication().startApplication(args); +} + ``` + +### Cleaning an application + +The framework provides a built-in mechanism to clean up all resources associated with an application. + +When the cleanup operation is triggered, the following resources are removed: + +| Resource Type | Description | Streams Apps | Producer Apps | +|---------------------|-----------------------------------------------------------|--------------|---------------| +| Output Topics | The main output topic of the application | ✓ | ✓ | +| Intermediate Topics | Topics for stream operations like `through()` | ✓ | N/A | +| Internal Topics | Topics for state stores or repartitioning (Kafka Streams) | ✓ | N/A | +| Consumer Groups | Consumer group metadata | ✓ | N/A | +| Schema Registry | All registered schemas | ✓ | ✓ | + +Cleanup can be triggered: + +- **Via Command Line**: Helm cleanup jobs +- **Programmatically**: + +```java +// For streams applications +try(StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()){ + cleanUpRunner. + +clean(); +} + +// For producer applications + try( +CleanUpRunner cleanUpRunner = producerApp.createCleanUpRunner()){ + cleanUpRunner. + +clean(); +} +``` + +The framework ensures that cleanup operations are idempotent, meaning they can be safely retried without causing +additional issues. ## Configuration +Kafka properties are applied in the following order (later values override earlier ones): + +1. Base configuration +2. App config from .createKafkaProperties() +3. Environment variables (`KAFKA_`) +4. Runtime args (--bootstrap-servers, etc.) +5. Serialization config from ProducerApp.defaultSerializationConfig() or StreamsApp.defaultSerializationConfig() +6. CLI overrides via --kafka-config + +The framework automatically parses environment variables with the `APP_ prefix` (configurable via `ENV_PREFIX`). +Environment variables are converted to CLI arguments: + +```text +APP_BOOTSTRAP_SERVERS → --bootstrap-servers +APP_SCHEMA_REGISTRY_URL → --schema-registry-url +APP_OUTPUT_TOPIC → --output-topic +``` + +Additionally, Kafka-specific environment variables with the `KAFKA_` prefix are automatically added to the Kafka +configuration. 
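+
+For example, the same configuration could be supplied entirely through environment variables instead of CLI arguments. This is a sketch with placeholder values; the jar name and topic are illustrative:
+
+```bash
+export APP_BOOTSTRAP_SERVERS=kafka:9092
+export APP_SCHEMA_REGISTRY_URL=http://schema-registry:8081
+export APP_OUTPUT_TOPIC=output-topic
+# KAFKA_-prefixed variables are added to the Kafka client configuration
+export KAFKA_MAX_POLL_RECORDS=500
+java -jar example-app.jar run
+```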
+ ## Command line interface + +The framework provides a unified command-line interface for application configuration. + +### CLI Commands + +- `run`: Run the application +- `clean`: Delete topics and consumer groups +- `reset`: Reset internal state and offsets (for Streams apps) + +### Common CLI Configuration Options + +- `--bootstrap-servers`: Kafka bootstrap servers (required) +- `--schema-registry-url`: URL for Avro serialization +- `--kafka-config`: Key-value Kafka configuration +- `--output-topic`: Main output topic +- `--labeled-output-topics`: Named output topics +- `--input-topics`: Input topics (for Streams apps) +- `--input-pattern`: Input topic pattern (for Streams apps) +- `--application-id`: Unique app ID (for Streams apps) diff --git a/docs/docs/user/concepts/consumer-producer.md b/docs/docs/user/concepts/consumer-producer.md new file mode 100644 index 000000000..e69de29bb diff --git a/docs/docs/user/concepts/consumer.md b/docs/docs/user/concepts/consumer.md new file mode 100644 index 000000000..e69de29bb diff --git a/docs/docs/user/concepts/producer.md b/docs/docs/user/concepts/producer.md index 307bbcaf2..aebd70b43 100644 --- a/docs/docs/user/concepts/producer.md +++ b/docs/docs/user/concepts/producer.md @@ -1,15 +1,126 @@ -# Producer apps +# Producer applications -Producer apps are applications that generate data and send it to a Kafka topic. -They can be used to produce messages from various sources, such as databases, files, or real-time events. +Producer applications are applications that generate data and send it to Kafka topics. They can be used to produce +messages from various sources, such as databases, files, or real-time events. + +The `streams-bootstrap` framework provides a structured way to build Kafka producer applications with built-in +configuration handling, command-line support, and resource lifecycle management. + +--- ## Application lifecycle -- Running an application -- Cleaning an application +### Running an application + +Producer applications are executed using the `ProducerRunner`, which runs the producer logic defined by the application. + +Unlike Kafka Streams applications, producer applications typically: + +- Run to completion and terminate automatically, or +- Run continuously when implemented as long-lived services + +The execution model is fully controlled by the producer implementation and its runnable logic. + +### Cleaning an application + +Producer applications support a dedicated `clean` command that removes Kafka-related resources created by the +application. + +```bash +java -jar my-producer-app.jar \ + --bootstrap-servers localhost:9092 \ + --output-topic my-topic \ + clean +``` + +The clean process can perform the following operations: + +- Delete output topics +- Delete registered schemas from Schema Registry +- Execute custom cleanup hooks defined by the application + +Applications can register custom cleanup logic by overriding `setupCleanUp`: + +```java + +@Override +public void setupCleanUp(final EffectiveAppConfiguration configuration) { + configuration.addCleanupHook(() -> { + // Custom cleanup logic + }); +} +``` + +--- ## Configuration +### Serialization configuration + +Producer applications define key and value serialization using the `defaultSerializationConfig()` method in their +`ProducerApp` implementation. 
+ +```java + +@Override +public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); +} +``` + +Common serializer combinations include: + +| Key Serializer | Value Serializer | Use case | +|-----------------------|--------------------------|----------------------------| +| `StringSerializer` | `StringSerializer` | Simple string messages | +| `StringSerializer` | `SpecificAvroSerializer` | Avro with schema evolution | +| `StringSerializer` | `GenericAvroSerializer` | Dynamic Avro schemas | +| `ByteArraySerializer` | `ByteArraySerializer` | Binary data | + +### Custom Kafka properties + +Producer-specific Kafka configuration can be customized by overriding `createKafkaProperties()`: + +```java + +@Override +public Map createKafkaProperties() { + return Map.of( + ProducerConfig.ACKS_CONFIG, "all", + ProducerConfig.RETRIES_CONFIG, 3, + ProducerConfig.BATCH_SIZE_CONFIG, 16384, + ProducerConfig.LINGER_MS_CONFIG, 5 + ); +} +``` + +These properties are merged with the framework defaults and CLI-provided configuration. + +### Schema Registry integration + +When the `--schema-registry-url` option is provided: + +- Schemas are registered automatically during application startup +- Schema cleanup is handled as part of the `clean` command +- Schema evolution is fully supported + +--- + ## Command line interface +Producer applications inherit standard CLI options from `KafkaApplication`. + +```text +--bootstrap-servers Kafka bootstrap servers (comma-separated) (Required) +--bootstrap-server Alias for --bootstrap-servers (Required) +--schema-registry-url URL of the Schema Registry (Optional) +--kafka-config Additional Kafka config (key=value,...) (Optional) +--output-topic Default output topic (Optional) +--labeled-output-topics Named output topics (label1=topic1,...) (Optional) +``` + +--- + ## Deployment + +TODO diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index 0924695cf..3f4091118 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -1,36 +1,218 @@ -# Streams apps +# Streams applications Streams apps are applications that process data in real-time as it flows through Kafka topics. They can be used to filter, transform, aggregate, or enrich data streams. Streams apps can also produce new messages to other topics based on the processed data. +--- + ## Application lifecycle -- Running an application -- Resetting an application -- Cleaning an application +### Running an application + +Kafka Streams applications are started via the `KafkaStreamsApplication` entry point: + +```java +public static void main(final String[] args) { + new MyStreamsApplication().startApplication(args); +} +``` + +When an application is started, the framework performs the following steps: + +- Parse command-line arguments and environment variables +- Create a `StreamsApp` instance +- Wrap it in a `ConfiguredStreamsApp` +- Convert it into an `ExecutableStreamsApp` +- Start execution using the `StreamsRunner` + +--- + +### Resetting an application + +Streams applications support a dedicated `reset` operation that clears processing state while preserving the +application definition and configuration. This is useful for reprocessing input data from the beginning. 
+ +When a reset is triggered, the following resources are affected: + +| Resource | Action | +|--------------------|-------------------------------------------| +| State stores | Cleared locally, changelog topics deleted | +| Internal topics | Deleted (e.g. repartition topics) | +| Consumer offsets | Reset to earliest for input topics | +| Output topics | Preserved | +| Application config | Preserved | + +Triggering a reset via CLI: + +```bash +java -jar my-streams-app.jar reset +``` + +Triggering a reset programmatically: + +```java +try(StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()){ + cleanUpRunner. + +reset(); +} +``` + +After a reset, the application can be started again and will reprocess all input data. + +--- + +### Cleaning an application + +The `clean` command removes Kafka-related resources created by the application: + +```bash +java -jar my-streams-app.jar clean +``` + +This includes: + +- Consumer groups +- Internal topics +- Output topics (unless explicitly preserved) + +--- ## Configuration -- Topics -- Application id -- Kafka properties -- Lifecycle hooks - - Setup - - Clean up -- Execution options - - On start - - Application server - - State listener - - Uncaught exception handler - - Closing options +### Topics + +Streams applications support flexible topic configuration: + +- `--input-topics`: Comma-separated list of input topics +- `--input-pattern`: Regex pattern for input topics +- `--output-topic`: Default output topic +- `--error-topic`: Topic for error records +- `--labeled-input-topics`: Named input topics with different message types +- `--labeled-input-patterns`: Additional labeled input topic patterns +- `--labeled-output-topics`: Named output topics with different message types + +--- + +### Application ID + +- `--application-id`: Unique Kafka Streams application ID + +If not provided, the framework generates a deterministic application ID. + +--- + +### Kafka properties + +Additional Kafka Streams configuration can be supplied using: + +- `--kafka-config ` + +The framework applies the following defaults: + +```text +processing.guarantee=exactly_once_v2 +producer.max.in.flight.requests.per.connection=1 +producer.acks=all +producer.compression.type=gzip +``` + +--- + +### Lifecycle hooks + +#### Setup + +Custom uncaught exception handling can be provided by overriding the default handler: + +```java + +@Override +private StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() { + return new MyCustomExceptionHandler(); +} +``` + +#### Clean up + +Applications can observe Kafka Streams state transitions by registering a state listener: + +```java + +@Override +private StateListener createStateListener() { + return new MyCustomStateListener(); +} +``` + +--- + +### Execution options + +#### On start + +Custom logic can be executed once Kafka Streams has fully started: + +```java + +@Override +private void onStreamsStart(final RunningStreams runningStreams) { + // Custom startup logic +} +``` + +#### Application server + +TODO + +#### State listener + +TODO + +#### Uncaught exception handler + +TODO + +#### Closing options + +TODO + +--- ## Command line interface +Streams applications inherit standard CLI options from `KafkaStreamsApplication`. 
+ +| Option | Description | Default | +|--------------------------------|-------------------------------------------|----------------| +| `--bootstrap-servers` | Kafka bootstrap servers (comma-separated) | Required | +| `--schema-registry-url` | URL of the Schema Registry | None | +| `--application-id` | Kafka Streams application ID | Auto-generated | +| `--volatile-group-instance-id` | Use volatile group instance ID | false | + +--- + ## Deployment -## Kafka streams extensions +TODO + +--- + +## Kafka Streams extensions + +The framework provides several extensions that simplify working with Kafka Streams. + +### Simple topic access + +TODO + +### Error handling + +TODO + +### Serde auto configuration + +TODO -- Simple topic access -- Error handling -- Serde auto configuration +--- diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md index 87c833da9..33e7fe175 100644 --- a/docs/docs/user/deployment/kubernetes.md +++ b/docs/docs/user/deployment/kubernetes.md @@ -1,5 +1,230 @@ # Deployment to Kubernetes -- Autoscaling -- Monitoring -- Persistence +The `streams-bootstrap` framework provides support for deploying applications to Kubernetes using Helm +charts. The charts cover Kafka Streams, producer, consumer, and producer-consumer applications and offer standardized +solutions for autoscaling, monitoring, and state persistence. + +--- + +## Core capabilities + +- **Autoscaling** – KEDA-based horizontal scaling driven by Kafka consumer lag +- **Monitoring** – JMX metrics export with Prometheus integration +- **Persistence** – Persistent volumes for Kafka Streams state stores + +--- + +## Helm charts + +The framework ships a set of Helm charts tailored to different application types: + +| Chart name | Purpose | Kubernetes workload types | +|------------------------|-----------------------------------------------|--------------------------------| +| `streams-app` | Deploy Kafka Streams applications | `Deployment`, `StatefulSet` | +| `producer-app` | Deploy Kafka Producer applications | `Deployment`, `Job`, `CronJob` | +| `consumer-app` | Deploy Kafka Consumer applications | `Deployment` | +| `consumerproducer-app` | Deploy batch / consumer–producer applications | `Deployment` | +| `*-cleanup-job` | Clean Kafka resources before deployment | `Job` (Helm hooks) | + +Cleanup charts (for example `streams-app-cleanup-job` and `producer-app-cleanup-job`) remove Kafka topics, consumer +groups, and Schema Registry subjects associated with a release before a new deployment. 
+ +--- + +## Chart repository and installation + +The Helm charts are published as a Helm repository: + +```bash +helm repo add bakdata-common https://bakdata.github.io/streams-bootstrap/ +helm repo update +``` + +A Streams application can then be installed with: + +```bash +helm install my-app bakdata-common/streams-app --values my-values.yaml +``` + +--- + +## Deployment patterns + +### Streams applications (`streams-app`) + +Streams applications support both stateless and stateful deployment modes: + +- **Deployment** + - Used for stateless applications or when state is stored externally + - Enabled when `statefulSet: false` or `persistence.enabled: false` + +- **StatefulSet** + - Used for stateful Kafka Streams applications with local state stores + - Enabled when `statefulSet: true` and `persistence.enabled: true` + - Each pod receives a dedicated `PersistentVolumeClaim` for RocksDB state + +This allows choosing between elasticity (Deployment) and stronger data locality guarantees (StatefulSet). + +--- + +### Producer applications (`producer-app`) + +Producer applications support multiple execution models: + +| Mode | Use case | Resource type | +|------------|-------------------------------------|----------------------| +| Deployment | Long-running or continuous producer | `apps/v1/Deployment` | +| Job | One-time run or backfill | `batch/v1/Job` | +| CronJob | Scheduled periodic execution | `batch/v1/CronJob` | + +--- + +### Consumer and consumer–producer applications + +- **`consumer-app`** + - Deployed as a `Deployment` + - Uses Kafka consumer groups for parallel consumption + +- **`consumerproducer-app`** + - Deployed as a `Deployment` + - Typically used for batch-style read–process–write workloads + +--- + +### Cleanup jobs + +Cleanup charts are executed as Helm hook Jobs: + +- Run as `pre-install` or `pre-upgrade` hooks +- Remove: + - Kafka topics + - Consumer groups + - Schema Registry subjects + +This ensures a clean starting point for reprocessing or redeployment scenarios. 
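+
+As an illustration of how these deployment patterns are configured, a minimal `values.yaml` for the `streams-app` chart might look as follows. The image name, servers, and topics are placeholders; the individual keys are described in the configuration reference below:
+
+```yaml
+image: registry.example.com/my-streams-app  # placeholder image
+imageTag: "1.0.0"
+
+kafka:
+  bootstrapServers: "kafka:9092"
+  inputTopics:
+    - input-topic
+  outputTopic: output-topic
+
+resources:
+  requests:
+    cpu: 200m
+    memory: 512Mi
+```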
+ +--- + +## Configuration structure + +All charts share a common configuration structure, with chart-specific extensions: + +| Section | Purpose | Examples | +|-------------------------|-------------------------------------------|-----------------------------------| +| `image`, `imageTag` | Container image configuration | `streamsApp`, `latest` | +| `kafka.*` | Kafka connection and topic configuration | `bootstrapServers`, `inputTopics` | +| `commandLine.*` | CLI arguments passed to the application | `MY_PARAM: "value"` | +| `env.*` | Additional environment variables | `MY_ENV: foo` | +| `secrets.*` | Inline secret values | Tokens, passwords | +| `secretRefs.*` | References to existing `Secret` objects | External credentials | +| `resources.*` | CPU and memory requests/limits | `requests.cpu: 200m` | +| `autoscaling.*` | KEDA autoscaling configuration | `lagThreshold`, `minReplicas` | +| `persistence.*` | Streams state-store persistence | `enabled: true`, `size: 1Gi` | +| `jmx.*`, `prometheus.*` | JMX exporter and Prometheus configuration | `jmx.enabled: true` | +| `statefulSet` | Toggle `StatefulSet` vs `Deployment` | `true` / `false` | + +--- + +## Environment variable mapping + +Helm values are translated into environment variables using a configurable prefix: + +```yaml +configurationEnvPrefix: "APP" + +commandLine: + MY_PARAM: "value" +kafka: + inputTopics: [ "input" ] + outputTopic: "output" +``` + +This results in: + +- `APP_MY_PARAM=value` +- `APP_INPUT_TOPICS=input` +- `APP_OUTPUT_TOPIC=output` + +Kafka client configuration uses the `KAFKA_` prefix: + +```yaml +kafka: + config: + max.poll.records: 500 +``` + +Becomes: + +- `KAFKA_MAX_POLL_RECORDS=500` + +--- + +## Autoscaling + +Autoscaling is implemented using Kubernetes Event-Driven Autoscaling (KEDA). When enabled, KEDA monitors Kafka consumer +lag and adjusts the number of replicas accordingly. + +Autoscaling is disabled by default. + +### Enabling autoscaling + +```yaml +autoscaling: + enabled: true + lagThreshold: "1000" + minReplicas: 0 + maxReplicas: 5 +``` + +When enabled, the chart creates a KEDA `ScaledObject` and omits a fixed `replicaCount` from the workload specification. + +### Scaling behavior + +KEDA computes the desired number of replicas as: + +``` +desiredReplicas = ceil(totalLag / lagThreshold) +``` + +subject to `minReplicas` and `maxReplicas`. + +### Integration with persistence + +When persistence is enabled for Streams applications, autoscaling targets a `StatefulSet`. Each replica receives its own +`PersistentVolumeClaim`. + +> **Note:** Scale-down operations remove pods and their PVCs. Backup and recovery strategies should be considered. + +--- + +## Monitoring + +Monitoring is based on JMX metrics and Prometheus scraping: + +- `jmx.enabled: true` enables Kafka client and Streams metrics +- `prometheus.jmx.enabled: true` adds a Prometheus JMX exporter sidecar +- Metrics are exposed on a dedicated `/metrics` endpoint + +Collected metrics include consumer lag, processing rates, and RocksDB statistics. + +--- + +## Persistence + +Persistence is configured via the `persistence.*` section (Streams applications only): + +```yaml +persistence: + enabled: true + size: 1Gi + storageClassName: standard +``` + +When enabled together with `statefulSet: true`, each pod receives a dedicated volume for local state storage. 
This +enables: + +- Faster restarts due to warm state +- Improved recovery semantics for stateful topologies + +If persistence is disabled, applications behave as stateless deployments and rely on Kafka changelogs for state +reconstruction. diff --git a/docs/docs/user/deployment/local.md b/docs/docs/user/deployment/local.md index 74626da4f..47edb9a21 100644 --- a/docs/docs/user/deployment/local.md +++ b/docs/docs/user/deployment/local.md @@ -1 +1,47 @@ # Local deployment + +Applications can be run locally for development and testing purposes. This can be done programmatically within your +code. + +### Programmatic Local Execution + +Here is an example of how to run a producer application programmatically. This is useful for simple applications or for +testing. + +```java +try(final KafkaProducerApplication app = new SimpleKafkaProducerApplication<>(() -> + new ProducerApp() { + @Override + public ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer()) { + // Producer logic + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + } +)){ + app. + +setBootstrapServers("localhost:9092"); + app. + +setOutputTopic("output-topic"); + app. + +run(); +} +``` + +### Command Line Execution + +You can also run the application from the command line by packaging it as a JAR file. + +```bash +java -jar my-producer-app.jar --bootstrap-servers localhost:9092 --output-topic my-topic run +``` diff --git a/docs/docs/user/extensions/large-messages.md b/docs/docs/user/extensions/large-messages.md index 31ce42dec..19c25bd4d 100644 --- a/docs/docs/user/extensions/large-messages.md +++ b/docs/docs/user/extensions/large-messages.md @@ -1 +1,106 @@ # Large messages + +## Overview + +The **Large Messages Extension** adds support for handling messages that exceed Kafka's size limitations by using +external storage mechanisms with automatic cleanup. +It integrates with the *streams-bootstrap* framework to transparently manage: + +- large message serialization +- large message deserialization +- external storage cleanup + +For both **producer** and **streams** applications. + +--- + +## Core Components + +### LargeMessageAppUtils + +`LargeMessageAppUtils` is the central utility for managing large message cleanup operations. +It provides factory methods for creating **topic hooks** that automatically clean up externally stored large message +files when topics are deleted. + +**Key methods:** + +- `createTopicHook(Map kafkaProperties)` + Creates a cleanup hook from Kafka properties. +- `createTopicHook(AppConfiguration configuration)` + Creates a cleanup hook from application configuration. +- `registerTopicHook()` + Registers the cleanup hook with a cleanup configuration. + +--- + +### LargeMessageProducerApp + +`LargeMessageProducerApp` extends `ProducerApp` to automatically handle cleanup of large message files for producer +applications. + +When a topic is deleted, it removes all associated large message files stored externally via the +`LargeMessageSerializer`. + +The interface automatically registers the cleanup hook inside `setupCleanUp()`: + +--- + +### LargeMessageStreamsApp + +`LargeMessageStreamsApp` extends `StreamsApp` to provide automatic cleanup for Kafka Streams applications using large +messages. 
+ +When streams topics are cleaned up, the extension ensures corresponding external large message files are also removed. + +It also registers the cleanup hook during `setupCleanUp()`: + +--- + +## Implementation Details + +### Topic Hook Mechanism + +Cleanup is implemented through the `LargeMessageTopicHook` class, which implements the `TopicHook` interface. + +When a topic is deleted, the hook's `deleted()` method is triggered, which calls `deleteAllFiles()` on the configured +`LargeMessageStoringClient`. + +--- + +### Configuration Requirements + +The large message extension requires Kafka properties needed to build an `AbstractLargeMessageConfig`. + +This configuration is used to instantiate the appropriate `LargeMessageStoringClient` for the storage backend. + +--- + +## Usage Examples + +### Producer Application + +```java +public class MyLargeMessageProducer extends KafkaProducerApplication { + @Override + public LargeMessageProducerApp createApp() { + return new LargeMessageProducerApp() { + + @Override + public ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer()) { + // Producer logic with large message support + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + // Configure LargeMessageSerializer + return new SerializerConfig(LargeMessageSerializer.class, LargeMessageSerializer.class); + } + }; + } +} + +``` diff --git a/docs/docs/user/getting-started/quick-start.md b/docs/docs/user/getting-started/quick-start.md index 754bef8a6..000272bb1 100644 --- a/docs/docs/user/getting-started/quick-start.md +++ b/docs/docs/user/getting-started/quick-start.md @@ -1 +1,94 @@ -# Quick start +# Quick Start + +This page shows how to add `streams-bootstrap` to a project and how to create and run a minimal application. + +## Prerequisites + +- Java 17 +- Apache Kafka cluster (brokers reachable from the application) +- Access to Maven Central +- `streams-bootstrap-cli` dependency (see [Setup](setup.md) for Gradle/Maven snippets) + +## Minimal Kafka Streams Application + +Create a subclass of `KafkaStreamsApplication` and implement the required methods. 
+ +```java +import com.bakdata.kafka.streams.KafkaStreamsApplication; +import com.bakdata.kafka.streams.SerdeConfig; +import com.bakdata.kafka.streams.StreamsApp; +import com.bakdata.kafka.streams.StreamsTopicConfig; +import com.bakdata.kafka.streams.kstream.KStreamX; +import com.bakdata.kafka.streams.kstream.StreamsBuilderX; +import java.util.Map; +import org.apache.kafka.common.serialization.Serdes.StringSerde; + +public class MyStreamsApplication extends KafkaStreamsApplication { + + public static void main(final String[] args) { + new MyStreamsApplication().startApplication(args); + } + + @Override + public StreamsApp createApp() { + return new StreamsApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KStreamX input = builder.streamInput(); + + // topology definition + + input.toOutputTopic(); + } + + @Override + public String getUniqueAppId(final StreamsTopicConfig topics) { + return "streams-bootstrap-app-" + topics.getOutputTopic(); + } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + + @Override + public Map createKafkaProperties() { + return Map.of( + // additional Kafka properties + ); + } + }; + } +} +``` + +## Running the Application + +### Via Command Line Interface + +When packaged as a runnable JAR (for example, in a container), the `run` command is the default entrypoint: + +```bash +java -jar my-streams-app.jar \ + run \ + --bootstrap-servers kafka:9092 \ + --input-topics input-topic \ + --output-topic output-topic \ + --schema-registry-url http://schema-registry:8081 +``` + +Additional subcommands such as `clean` and `reset` are available for lifecycle management. + +### From the `main` Method + +In the `main` method, the application subclass starts the framework via: + +```java +public static void main(final String[] args) { + new MyStreamsApplication().startApplication(args); +} +``` + +This delegates configuration loading, lifecycle handling, and shutdown to `streams-bootstrap`. + +--- diff --git a/docs/docs/user/getting-started/setup.md b/docs/docs/user/getting-started/setup.md index feae8cb57..ec916e474 100644 --- a/docs/docs/user/getting-started/setup.md +++ b/docs/docs/user/getting-started/setup.md @@ -1 +1,156 @@ # Setup + +This page describes dependency setup, configuration options, commands, and Helm-based deployment for +`streams-bootstrap`. + +## Dependencies + +### Gradle + +```gradle +implementation group: 'com.bakdata.kafka', name: 'streams-bootstrap-cli', version: '6.1.0' +``` + +With Kotlin DSL: + +```gradle +implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = "6.1.0") +``` + +### Maven + +```xml + + + com.bakdata.kafka + streams-bootstrap-cli + 6.1.0 + +``` + +For other build tools or versions, refer to the +[latest version in MvnRepository](https://mvnrepository.com/artifact/com.bakdata.kafka/streams-bootstrap/latest). + +## Kafka Streams Applications + +Create a subclass of `KafkaStreamsApplication` and provide a `StreamsApp` via `createApp()`. + +Key responsibilities of `StreamsApp`: + +- Define the topology in `buildTopology(StreamsBuilderX builder)` +- Provide a unique application id in `getUniqueAppId(StreamsTopicConfig topics)` +- Configure default key/value SerDes via `defaultSerializationConfig()` +- Optionally provide extra Kafka properties via `createKafkaProperties()` + +A complete example is shown in the Quick Start page. 
+ +### Configuration Options (Streams) + +The following CLI options are available: + +- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**) +- `--schema-registry-url`: The URL of the Schema Registry +- `--kafka-config`: Kafka Streams configuration (`[,...]`) +- `--input-topics`: List of input topics (comma-separated) +- `--input-pattern`: Pattern of input topics +- `--output-topic`: The output topic +- `--error-topic`: A topic to write errors to +- `--labeled-input-topics`: Additional labeled input topics for different message types ( + `[,...]`) +- `--labeled-input-patterns`: Additional labeled input patterns for different message types ( + `[,...]`) +- `--labeled-output-topics`: Additional labeled output topics for different message types ( + `[,...]`) +- `--application-id`: Unique application ID to use for Kafka Streams. Can also be provided by implementing + `StreamsApp#getUniqueAppId()` +- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it changes on Streams shutdown + +Additional commands: + +- `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate + topics associated with the Kafka Streams application. +- `reset`: Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams + application. + +## Kafka Producer Applications + +Create a subclass of `KafkaProducerApplication`. + +```java +import com.bakdata.kafka.producer.KafkaProducerApplication; +import com.bakdata.kafka.producer.ProducerApp; +import com.bakdata.kafka.producer.ProducerBuilder; +import com.bakdata.kafka.producer.ProducerRunnable; +import com.bakdata.kafka.producer.SerializerConfig; +import java.util.Map; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringSerializer; + +public class MyProducerApplication extends KafkaProducerApplication { + + public static void main(final String[] args) { + new MyProducerApplication().startApplication(args); + } + + @Override + public ProducerApp createApp() { + return new ProducerApp() { + @Override + public ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer()) { + // producer logic + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + + @Override + public Map createKafkaProperties() { + return Map.of( + // additional Kafka properties + ); + } + }; + } +} +``` + +### Configuration Options (Producer) + +The following CLI options are available: + +- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**) +- `--schema-registry-url`: The URL of the Schema Registry +- `--kafka-config`: Kafka producer configuration (`[,...]`) +- `--output-topic`: The output topic +- `--labeled-output-topics`: Additional labeled output topics (`[,...]`) + +Additional commands: + +- `clean`: Delete all output topics associated with the Kafka producer application. 
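+
+As a sketch, a packaged producer application could be run and later cleaned up as follows (jar name, servers, and topic are placeholders):
+
+```bash
+# Produce records to the configured output topic
+java -jar my-producer-app.jar \
+  run \
+  --bootstrap-servers localhost:9092 \
+  --schema-registry-url http://localhost:8081 \
+  --output-topic my-topic
+
+# Afterwards, delete the output topic and associated schemas
+java -jar my-producer-app.jar \
+  clean \
+  --bootstrap-servers localhost:9092 \
+  --schema-registry-url http://localhost:8081 \
+  --output-topic my-topic
+```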
+ +## Helm Charts + +For configuration and deployment to Kubernetes, use the provided Helm charts: + +- Streams applications: + - Chart: `charts/streams-app` + - Example configuration: `charts/streams-app/values.yaml` + - Cleanup job: `charts/streams-app-cleanup-job` + +- Producer applications: + - Chart: `charts/producer-app` + - Example configuration: `charts/producer-app/values.yaml` + - Cleanup job: `charts/producer-app-cleanup-job` + +To configure your streams app, you can use +the [`values.yaml`](https://github.com/bakdata/streams-bootstrap/blob/master/charts/streams-app/values.yaml) as a +starting point. +We also provide a chart +to [clean](https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app-cleanup-job) your streams app. + diff --git a/docs/docs/user/monitoring.md b/docs/docs/user/monitoring.md index 31b5a7045..c6c63b01d 100644 --- a/docs/docs/user/monitoring.md +++ b/docs/docs/user/monitoring.md @@ -1 +1,368 @@ # Monitoring + +The framework provides features for monitoring your applications. + +- **JMX Metrics Export**: Applications built with `streams-bootstrap` can expose JMX (Java Management Extensions) + metrics, which provide insights into the performance and health of the Java application and the Kafka clients. +- **Prometheus Integration**: The Helm charts are configured to work with Prometheus, a popular open-source monitoring + and alerting toolkit. This allows you to scrape the JMX metrics and visualize them in dashboards (e.g., using + Grafana). + +## Monitoring and Observability + +The Helm charts provide integrated monitoring and observability for Kafka applications using a combination of +JMX, Prometheus, Kubernetes probes, and Services. Monitoring can be tailored from lightweight setups for development +to full production stacks with Prometheus Operator. + +### Monitoring Mechanisms + +| Mechanism | Use Case | Key Values | +|--------------------------|-------------------------------------|--------------------------| +| JMX remote access | Direct debugging and inspection | `jmx.enabled` | +| Prometheus JMX exporter | Production metrics collection | `prometheus.jmx.enabled` | +| Liveness probes | Container health checks | `livenessProbe` | +| Readiness probes | Traffic readiness / rollout control | `readinessProbe` | +| Custom ports and Service | HTTP / metrics endpoints exposure | `ports`, `service.*` | + +### JMX Configuration + +JMX (Java Management Extensions) provides direct access to application metrics and management operations, typically +used for development and debugging. + +Enable JMX in `values.yaml`: + +```yaml +jmx: + enabled: true + port: 5555 + host: localhost +``` + +Parameters: + +- `jmx.enabled`: Enable JMX port for remote access (default: `false`). +- `jmx.port`: JMX port number (default: `5555`). +- `jmx.host`: Host binding for the RMI server (default: `localhost`). + +When enabled, the chart configures the JVM with flags similar to: + +```text +-Dcom.sun.management.jmxremote +-Dcom.sun.management.jmxremote.port=5555 +-Dcom.sun.management.jmxremote.local.only=false +-Dcom.sun.management.jmxremote.authenticate=false +-Dcom.sun.management.jmxremote.ssl=false +-Djava.rmi.server.hostname=localhost +``` + +> **Security note**: The default JMX configuration disables authentication and SSL. For production, use +> port-forwarding or tunnel access instead of exposing JMX externally. 
+ +Accessing JMX metrics from a local client: + +```bash +kubectl port-forward 5555:5555 +jconsole localhost:5555 +``` + +### Prometheus JMX Exporter + +For production monitoring, the Prometheus JMX Exporter runs as a sidecar container that scrapes JMX metrics from the +application and exposes them in Prometheus format. + +Enable the exporter in `values.yaml`: + +```yaml +prometheus: + jmx: + enabled: true + image: bitnami/jmx-exporter + imageTag: 1.1.0 + imagePullPolicy: Always + port: 5556 + metricRules: + - pattern: ".*" + resources: + requests: + cpu: 10m + memory: 100Mi + limits: + cpu: 100m + memory: 100Mi +``` + +Key parameters: + +- `prometheus.jmx.enabled`: Deploy JMX exporter sidecar (default: `false`). +- `prometheus.jmx.image`: Container image for the exporter (default: `bitnami/jmx-exporter`). +- `prometheus.jmx.imageTag`: Exporter image tag (default: `1.1.0`). +- `prometheus.jmx.port`: HTTP port for metrics endpoint (default: `5556`). +- `prometheus.jmx.metricRules`: JMX metric selection and mapping rules. +- `prometheus.jmx.resources`: Resource requests/limits for the exporter container. + +#### Metric Rules + +The `metricRules` section configures which JMX beans are exposed and how they are mapped to Prometheus metrics. The +default configuration uses `pattern: ".*"` to export all metrics, but production setups should restrict this to +relevant Kafka Streams/producer/consumer metrics. + +Example rule set: + +```yaml +prometheus: + jmx: + metricRules: + - pattern: "kafka.streams<>(.+):" + name: kafka_streams_$1_$5 + labels: + client_id: "$2" + $3: "$4" + - pattern: "kafka.producer<>(.+):" + name: kafka_producer_$1_$3 + labels: + client_id: "$2" +``` + +A ConfigMap containing the JMX exporter configuration is generated automatically by the chart and mounted into the +sidecar container. + +### Prometheus Integration + +#### Pod annotations + +For Prometheus instances that use pod annotations for discovery: + +```yaml +podAnnotations: + prometheus.io/scrape: "true" + prometheus.io/path: "/metrics" + prometheus.io/port: "5556" +``` + +This enables scraping the JMX exporter endpoint exposed on `prometheus.jmx.port`. + +#### Service and PodMonitor + +To expose metrics via a Service and integrate with Prometheus Operator using a `PodMonitor`: + +```yaml +service: + enabled: true + type: ClusterIP + labels: + monitoring: "true" + +ports: + - containerPort: 5556 + name: metrics + protocol: TCP + servicePort: 5556 +``` + +Example `PodMonitor`: + +```yaml +apiVersion: monitoring.coreos.com/v1 +kind: PodMonitor +metadata: + name: streams-app +spec: + selector: + matchLabels: + app.kubernetes.io/name: streams-app + podMetricsEndpoints: + - port: metrics + interval: 30s +``` + +### Health Checks + +Kubernetes uses liveness and readiness probes to determine when a pod is healthy and when it is ready to receive +traffic. + +**Liveness probes** restart containers that become unhealthy: + +```yaml +livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 +``` + +**Readiness probes** gate traffic until the application is ready: + +```yaml +readinessProbe: + httpGet: + path: /ready + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 5 + timeoutSeconds: 3 + successThreshold: 1 + failureThreshold: 3 +``` + +All standard Kubernetes probe types are supported (`httpGet`, `tcpSocket`, `exec`, and `grpc` on recent Kubernetes +versions). 
Probes are configured under the corresponding `livenessProbe` and `readinessProbe` sections in values. + +### Service and Port Configuration + +Ports and Services control how HTTP APIs and metrics endpoints are exposed: + +```yaml +service: + enabled: true + type: ClusterIP + +ports: + - containerPort: 8080 + name: http + protocol: TCP + servicePort: 80 + - containerPort: 5556 + name: metrics + protocol: TCP + servicePort: 5556 +``` + +Port mapping reference: + +- `jmx.port` → JMX remote port (default `5555`). +- `prometheus.jmx.port` → JMX exporter metrics port (default `5556`). +- Additional entries in `ports[]` → application-specific ports (e.g. HTTP APIs, custom metrics endpoints). + +### Monitoring Configuration Examples + +**Full monitoring stack** (JMX exporter, probes, Service, annotations): + +```yaml +prometheus: + jmx: + enabled: true + port: 5556 + metricRules: + - pattern: "kafka.streams<>(.+):" + name: kafka_streams_$2 + labels: + client_id: "$1" + +readinessProbe: + httpGet: + path: /ready + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 10 + +livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 60 + periodSeconds: 30 + +service: + enabled: true + type: ClusterIP + +ports: + - containerPort: 8080 + name: http + servicePort: 80 + - containerPort: 5556 + name: metrics + servicePort: 5556 + +podAnnotations: + prometheus.io/scrape: "true" + prometheus.io/path: "/metrics" + prometheus.io/port: "5556" +``` + +**Development/debug configuration with JMX only**: + +```yaml +jmx: + enabled: true + port: 5555 + host: localhost + +prometheus: + jmx: + enabled: false + +livenessProbe: + tcpSocket: + port: 5555 + initialDelaySeconds: 30 + periodSeconds: 30 +``` + +**Minimal production configuration** with annotations and resource limits: + +```yaml +prometheus: + jmx: + enabled: true + resources: + requests: + cpu: 10m + memory: 100Mi + limits: + cpu: 100m + memory: 100Mi + +podAnnotations: + prometheus.io/scrape: "true" + prometheus.io/port: "5556" +``` + +### Application-Specific Considerations + +Kafka Streams applications expose metrics under several JMX domains, including `kafka.streams`, `kafka.producer`, and +`kafka.consumer`. Commonly monitored metrics include: + +- `kafka.streams.state`: Overall application state (running, rebalancing, error). +- `kafka.streams.commit-latency-avg`: Average commit latency. +- `kafka.consumer.records-lag-max`: Maximum records lag per partition. +- `kafka.producer.record-send-rate`: Producer throughput. + +Producer and consumer applications (via `producer-app` and `consumer-app` charts) use the same `prometheus.jmx` +structure +but may differ in availability patterns (for example, Jobs vs Deployments). + +### Troubleshooting + +**Common issues**: + +- No metrics endpoint: + - Ensure `prometheus.jmx.enabled: true`. +- Connection refused on JMX port: + - Ensure `jmx.enabled: true` and the port is exposed. +- Empty metrics response: + - Review `metricRules` patterns; overly restrictive rules may filter out all metrics. +- High exporter CPU usage: + - Avoid `pattern: ".*"` in production; use targeted patterns instead. +- Pod not ready: + - Validate liveness/readiness probe configuration and the corresponding application endpoints. 
+ +**Verifying metrics export**: + +```bash +kubectl port-forward 5556:5556 +curl http://localhost:5556/metrics +``` + +**Debugging JMX connection**: + +```bash +kubectl port-forward 5555:5555 +jconsole localhost:5555 +``` + +If connection fails, verify that JMX is enabled, the port is mapped in `ports`, and the JVM has been started with the +correct JMX system properties. diff --git a/docs/docs/user/testing.md b/docs/docs/user/testing.md index f00b526a9..268c449df 100644 --- a/docs/docs/user/testing.md +++ b/docs/docs/user/testing.md @@ -1 +1,184 @@ # Testing + +The `streams-bootstrap` Testing Framework provides a comprehensive set of tools for testing Kafka Streams and Producer +applications. This framework simplifies both unit and integration testing by providing test abstractions that handle +Kafka infrastructure setup, Schema Registry integration, and consumer group verification. + +The framework supports testing with real Kafka clusters using TestContainers, mock Schema Registry for schema-aware +testing, and utilities for verifying application behavior and consumer group states. + +## Core Testing Components + +### KafkaTest Base Class + +`KafkaTest` is an abstract base class that sets up a Kafka environment using TestContainers. It provides: + +- Kafka container setup +- Access to bootstrap servers and Schema Registry +- Methods for waiting on consumer group states +- Integration with `TestSchemaRegistry` +- Creation of `KafkaTestClient` instances + +### KafkaTestClient + +`KafkaTestClient` is a fluent test client that simplifies: + +- Producing data +- Consuming records +- Admin operations +- Topic creation and verification + +### ConsumerGroupVerifier + +Provides tools to: + +- Check if a group is active or closed +- Get current group state +- Verify processing completion (lag = 0) +- Compute lag manually + +## Unit Testing with `fluent-kafka-streams-tests` + +The framework integrates with `fluent-kafka-streams-tests` for unit testing Kafka Streams topologies. + +## TestSchemaRegistry + +`TestSchemaRegistry` provides built-in support for Schema Registry in tests using a mock implementation. It creates +isolated Schema Registry instances for testing schema-aware applications. + +### Features: + +- Random scoped mock URLs to avoid collisions +- Support for custom mock URLs +- Configurable schema providers +- Compatible with Confluent’s `MockSchemaRegistry` + +### Example: + +```java +// Random scope +TestSchemaRegistry registry = new TestSchemaRegistry(); + +// Custom scope +TestSchemaRegistry registry = new TestSchemaRegistry("mock://custom-scope"); + +// Default providers +SchemaRegistryClient client = this.registry.getSchemaRegistryClient(); + +// With custom providers +List providers = List.of(new ProtobufSchemaProvider()); +SchemaRegistryClient client = this.registry.getSchemaRegistryClient(this.providers); +``` + +## Integration Testing with TestContainers + +For integration tests that require a real Kafka environment, the framework provides integration with TestContainers. + +### Single Node Kafka Testing + +`KafkaTest` provides a base class for integration tests with a single Kafka broker. + +### Multi-Node Cluster Testing + +For testing with multi-node Kafka clusters, the framework provides `ApacheKafkaContainerCluster`: + +Example usage: + +```java +ApacheKafkaContainerCluster cluster = new ApacheKafkaContainerCluster("3.4.0", 3, 2); +cluster. + +start(); + +String bootstrapServers = this.cluster.getBootstrapServers(); +// Run tests... +cluster. 
+ +stop(); +``` + +### Features: + +- Configurable broker count +- Configurable replication factor for internal topics +- Uses KRaft (no ZooKeeper) +- Waits for all brokers to be ready before returning + +## Utilities for Kafka Testing + +### KafkaTestClient Operations + +`KafkaTestClient` provides a fluent API for common Kafka operations in tests: + +#### Topic Management + +```java +KafkaTestClient client = newTestClient(); + +// Create topic with default settings (1 partition, 1 replica) +client.createTopic("my-topic"); + +// Create topic with custom settings +client.createTopic("my-topic", + KafkaTestClient.defaultTopicSettings() + .partitions(3) + .replicationFactor(1) + .build()); + +// Create topic with config +Map config = Map.of("cleanup.policy", "compact"); +client.createTopic("my-topic",settings, config); + +// Check if topic exists +boolean exists = this.client.existsTopic("my-topic"); +``` + +#### Data Production + +```java +client.send() + .withKeySerializer(new StringSerializer()) + .withValueSerializer(new StringSerializer()) + +to("topic-name",List.of( + new SimpleProducerRecord<>("key1","value1"), + new SimpleProducerRecord<>("key2","value2") + )); +``` + +#### Data Consumption + +```java +List> records = client.read() + .withKeyDeserializer(new StringDeserializer()) + .withValueDeserializer(new StringDeserializer()) + .from("topic-name", Duration.ofSeconds(10)); +``` + +## Administrative Operations + +`KafkaTestClient` provides access to administrative operations through `AdminClientX`: + +```java +try(AdminClientX admin = client.admin()){ +TopicClient topicClient = this.admin.getTopicClient(); +ConsumerGroupClient consumerGroupClient = this.admin.getConsumerGroupClient(); +} +``` + +## Consumer Group Verification + +The framework provides utilities for verifying consumer group states: + +```java +// Wait for application to become active +awaitActive(app); + +// Wait for completion of processing +awaitProcessing(app); + +// Wait for app to shut down +awaitClosed(app); +``` + +These methods ensure test reliability by validating consumer group behavior via `ConsumerGroupVerifier`. diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index f7b583021..1f1e55a32 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -31,6 +31,8 @@ nav: - Common concepts: user/concepts/common.md - Streams concepts: user/concepts/streams.md - Producer concepts: user/concepts/producer.md + - Consumer concepts: user/concepts/consumer.md + - Consumer-Producer concepts: user/concepts/consumer-producer.md - Testing: user/testing.md - Monitoring: user/monitoring.md - Extensions: From 24eca859b3cfad548ad50f3309579e7750e4afaf Mon Sep 17 00:00:00 2001 From: vostres Date: Wed, 28 Jan 2026 12:08:53 +0100 Subject: [PATCH 2/9] Apply suggestions from code review Co-authored-by: Philipp Schirmer --- docs/docs/index.md | 4 ++-- docs/docs/user/deployment/kubernetes.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/docs/index.md b/docs/docs/index.md index 64677fc48..a0fd25112 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -20,12 +20,12 @@ prescribe: `streams-bootstrap` addresses these aspects by supplying: 1. **Standardized base classes** for Kafka Streams and client applications. -2. **A common CLI/configuration contract** for all streaming services. +2. **A common CLI/configuration contract** for all Kafka applications. 3. **Helm-based deployment templates** and conventions for Kubernetes. 4. **Built-in reset/clean workflows** for reprocessing and state management. 5. 
**Consistent error-handling** and dead-letter integration. 6. **Testing infrastructure** for local development and CI environments. -7. **Optional S3-backed serialization** for large messages. +7. **Optional blob-storage-backed serialization** for large messages. ## Framework Architecture diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md index 33e7fe175..74aca8b7b 100644 --- a/docs/docs/user/deployment/kubernetes.md +++ b/docs/docs/user/deployment/kubernetes.md @@ -36,7 +36,7 @@ groups, and Schema Registry subjects associated with a release before a new depl The Helm charts are published as a Helm repository: ```bash -helm repo add bakdata-common https://bakdata.github.io/streams-bootstrap/ +helm repo add streams-bootstrap https://bakdata.github.io/streams-bootstrap/ helm repo update ``` From 30fe5b088bcb6b1217adeb4d22850f83f579d895 Mon Sep 17 00:00:00 2001 From: vostres Date: Wed, 28 Jan 2026 17:19:54 +0100 Subject: [PATCH 3/9] Implement review (part 1) --- docs/docs/index.md | 11 +- docs/docs/user/concepts/common.md | 27 ++-- docs/docs/user/concepts/producer.md | 17 --- docs/docs/user/concepts/streams.md | 44 ++----- docs/docs/user/deployment/kubernetes.md | 7 +- docs/docs/user/extensions/large-messages.md | 110 +++++----------- docs/docs/user/getting-started/quick-start.md | 10 -- docs/docs/user/getting-started/setup.md | 124 ------------------ docs/docs/user/monitoring.md | 51 ++----- docs/docs/user/testing.md | 73 ++++------- 10 files changed, 96 insertions(+), 378 deletions(-) diff --git a/docs/docs/index.md b/docs/docs/index.md index a0fd25112..e8fea159d 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -27,20 +27,15 @@ prescribe: 6. **Testing infrastructure** for local development and CI environments. 7. **Optional blob-storage-backed serialization** for large messages. -## Framework Architecture +## Architecture The framework uses a modular architecture with a clear separation of concerns. 
### Core Modules -- `streams-bootstrap-core`: Base classes such as `KafkaApplication`, `Runner`, and `CleanUpRunner` +- `streams-bootstrap-core`: Core abstractions for application lifecycle, execution, and cleanup - `streams-bootstrap-cli`: CLI framework based on `picocli` -- `streams-bootstrap-test`: Testing utilities (`TestApplicationRunner`, `KafkaTestClient`) +- `streams-bootstrap-test`: Utilities for testing streams-bootstrap applications - `streams-bootstrap-large-messages`: Support for handling large Kafka messages - `streams-bootstrap-cli-test`: Test support for CLI-based applications -### External Dependencies - -- Apache Kafka: `kafka-streams`, `kafka-clients` -- Confluent Platform: Schema Registry and Avro SerDes -- Picocli: Command-line parsing and CLI framework diff --git a/docs/docs/user/concepts/common.md b/docs/docs/user/concepts/common.md index 335f2a7e3..04bb682cb 100644 --- a/docs/docs/user/concepts/common.md +++ b/docs/docs/user/concepts/common.md @@ -2,8 +2,11 @@ ## Application types -The **streams-bootstrap** framework uses a three-layer application type hierarchy: -**App → ConfiguredApp → ExecutableApp** +In streams-bootstrap, there are three application types: + +- **App** +- **ConfiguredApp** +- **ExecutableApp** --- @@ -44,11 +47,6 @@ It can create: - a **Runner** for running the application - a **CleanUpRunner** for cleanup operations -The `KafkaApplication` base class orchestrates the creation of these components through methods such as: - -- `createConfiguredApp()` -- `createExecutableApp()` - --- ### Usage Pattern @@ -107,6 +105,8 @@ The framework provides a built-in mechanism to clean up all resources associated When the cleanup operation is triggered, the following resources are removed: +**TODO:** extend the table for new consumer apps + | Resource Type | Description | Streams Apps | Producer Apps | |---------------------|-----------------------------------------------------------|--------------|---------------| | Output Topics | The main output topic of the application | ✓ | ✓ | @@ -163,6 +163,14 @@ APP_OUTPUT_TOPIC → --output-topic Additionally, Kafka-specific environment variables with the `KAFKA_` prefix are automatically added to the Kafka configuration. +### Schema Registry integration + +When the `--schema-registry-url` option is provided: + +- Schemas are registered automatically during application startup +- Schema cleanup is handled as part of the `clean` command +- Schema evolution is fully supported + ## Command line interface The framework provides a unified command-line interface for application configuration. 
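For illustration, a typical invocation that wires the shared options together could look like the following; the jar name, broker addresses, and URLs are placeholders rather than part of the documented contract, and topic-related options are added per application type:

```bash
java -jar my-app.jar \
  --bootstrap-servers broker-1:9092,broker-2:9092 \
  --schema-registry-url http://schema-registry:8081 \
  --kafka-config max.poll.records=500,compression.type=gzip
```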
@@ -178,8 +186,3 @@ The framework provides a unified command-line interface for application configur - `--bootstrap-servers`: Kafka bootstrap servers (required) - `--schema-registry-url`: URL for Avro serialization - `--kafka-config`: Key-value Kafka configuration -- `--output-topic`: Main output topic -- `--labeled-output-topics`: Named output topics -- `--input-topics`: Input topics (for Streams apps) -- `--input-pattern`: Input topic pattern (for Streams apps) -- `--application-id`: Unique app ID (for Streams apps) diff --git a/docs/docs/user/concepts/producer.md b/docs/docs/user/concepts/producer.md index aebd70b43..9937ef757 100644 --- a/docs/docs/user/concepts/producer.md +++ b/docs/docs/user/concepts/producer.md @@ -68,15 +68,6 @@ public SerializerConfig defaultSerializationConfig() { } ``` -Common serializer combinations include: - -| Key Serializer | Value Serializer | Use case | -|-----------------------|--------------------------|----------------------------| -| `StringSerializer` | `StringSerializer` | Simple string messages | -| `StringSerializer` | `SpecificAvroSerializer` | Avro with schema evolution | -| `StringSerializer` | `GenericAvroSerializer` | Dynamic Avro schemas | -| `ByteArraySerializer` | `ByteArraySerializer` | Binary data | - ### Custom Kafka properties Producer-specific Kafka configuration can be customized by overriding `createKafkaProperties()`: @@ -96,14 +87,6 @@ public Map createKafkaProperties() { These properties are merged with the framework defaults and CLI-provided configuration. -### Schema Registry integration - -When the `--schema-registry-url` option is provided: - -- Schemas are registered automatically during application startup -- Schema cleanup is handled as part of the `clean` command -- Schema evolution is fully supported - --- ## Command line interface diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index 3f4091118..262b6c8a7 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -35,13 +35,12 @@ application definition and configuration. This is useful for reprocessing input When a reset is triggered, the following resources are affected: -| Resource | Action | -|--------------------|-------------------------------------------| -| State stores | Cleared locally, changelog topics deleted | -| Internal topics | Deleted (e.g. repartition topics) | -| Consumer offsets | Reset to earliest for input topics | -| Output topics | Preserved | -| Application config | Preserved | +| Resource | Action | +|------------------|-------------------------------------------| +| State stores | Cleared locally, changelog topics deleted | +| Internal topics | Deleted (e.g. repartition topics) | +| Consumer offsets | Reset to earliest for input topics | +| Output topics | Preserved | Triggering a reset via CLI: @@ -65,18 +64,13 @@ After a reset, the application can be started again and will reprocess all input ### Cleaning an application -The `clean` command removes Kafka-related resources created by the application: +The `clean` command performs everything that `reset` does and additionally removes the Kafka consumer groups created by +the application. 
```bash java -jar my-streams-app.jar clean ``` -This includes: - -- Consumer groups -- Internal topics -- Output topics (unless explicitly preserved) - --- ## Configuration @@ -99,8 +93,6 @@ Streams applications support flexible topic configuration: - `--application-id`: Unique Kafka Streams application ID -If not provided, the framework generates a deterministic application ID. - --- ### Kafka properties @@ -124,27 +116,11 @@ producer.compression.type=gzip #### Setup -Custom uncaught exception handling can be provided by overriding the default handler: - -```java - -@Override -private StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() { - return new MyCustomExceptionHandler(); -} -``` +TODO #### Clean up -Applications can observe Kafka Streams state transitions by registering a state listener: - -```java - -@Override -private StateListener createStateListener() { - return new MyCustomStateListener(); -} -``` +TODO --- diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md index 74aca8b7b..773bde633 100644 --- a/docs/docs/user/deployment/kubernetes.md +++ b/docs/docs/user/deployment/kubernetes.md @@ -22,13 +22,10 @@ The framework ships a set of Helm charts tailored to different application types |------------------------|-----------------------------------------------|--------------------------------| | `streams-app` | Deploy Kafka Streams applications | `Deployment`, `StatefulSet` | | `producer-app` | Deploy Kafka Producer applications | `Deployment`, `Job`, `CronJob` | -| `consumer-app` | Deploy Kafka Consumer applications | `Deployment` | -| `consumerproducer-app` | Deploy batch / consumer–producer applications | `Deployment` | +| `consumer-app` | Deploy Kafka Consumer applications | `Deployment`, `StatefulSet` | +| `consumerproducer-app` | Deploy batch / consumer–producer applications | `Deployment`, `StatefulSet` | | `*-cleanup-job` | Clean Kafka resources before deployment | `Job` (Helm hooks) | -Cleanup charts (for example `streams-app-cleanup-job` and `producer-app-cleanup-job`) remove Kafka topics, consumer -groups, and Schema Registry subjects associated with a release before a new deployment. - --- ## Chart repository and installation diff --git a/docs/docs/user/extensions/large-messages.md b/docs/docs/user/extensions/large-messages.md index 19c25bd4d..a40db4877 100644 --- a/docs/docs/user/extensions/large-messages.md +++ b/docs/docs/user/extensions/large-messages.md @@ -3,104 +3,56 @@ ## Overview The **Large Messages Extension** adds support for handling messages that exceed Kafka's size limitations by using -external storage mechanisms with automatic cleanup. +external storage mechanisms with automatic cleanup. It integrates with the *streams-bootstrap* framework to transparently manage: - large message serialization - large message deserialization -- external storage cleanup +- blob storage files cleanup -For both **producer** and **streams** applications. +For more details, see the large messages +module: [streams-bootstrap-large-messages GitHub repository](https://github.com/bakdata/streams-bootstrap/tree/master/streams-bootstrap-large-messages) ---- - -## Core Components - -### LargeMessageAppUtils - -`LargeMessageAppUtils` is the central utility for managing large message cleanup operations. -It provides factory methods for creating **topic hooks** that automatically clean up externally stored large message -files when topics are deleted. 
- -**Key methods:** - -- `createTopicHook(Map kafkaProperties)` - Creates a cleanup hook from Kafka properties. -- `createTopicHook(AppConfiguration configuration)` - Creates a cleanup hook from application configuration. -- `registerTopicHook()` - Registers the cleanup hook with a cleanup configuration. - ---- +There are two supported ways to enable cleanup for large messages: -### LargeMessageProducerApp - -`LargeMessageProducerApp` extends `ProducerApp` to automatically handle cleanup of large message files for producer -applications. - -When a topic is deleted, it removes all associated large message files stored externally via the -`LargeMessageSerializer`. - -The interface automatically registers the cleanup hook inside `setupCleanUp()`: +- Implement `LargeMessageStreamsApp` +- Register a topic cleanup hook manually --- -### LargeMessageStreamsApp - -`LargeMessageStreamsApp` extends `StreamsApp` to provide automatic cleanup for Kafka Streams applications using large -messages. - -When streams topics are cleaned up, the extension ensures corresponding external large message files are also removed. - -It also registers the cleanup hook during `setupCleanUp()`: - ---- +### Option 1: Implement `LargeMessageStreamsApp` -## Implementation Details +Use this option for Kafka Streams applications where large message cleanup should always run together with topic +cleanup. -### Topic Hook Mechanism - -Cleanup is implemented through the `LargeMessageTopicHook` class, which implements the `TopicHook` interface. - -When a topic is deleted, the hook's `deleted()` method is triggered, which calls `deleteAllFiles()` on the configured -`LargeMessageStoringClient`. - ---- +```java +public final class MyStreamsApp implements LargeMessageStreamsApp { -### Configuration Requirements + @Override + public void buildTopology(final StreamsBuilderX builder) { + // build topology here + } +} +``` -The large message extension requires Kafka properties needed to build an `AbstractLargeMessageConfig`. +### Option 2: Register a cleanup hook manually -This configuration is used to instantiate the appropriate `LargeMessageStoringClient` for the storage backend. +If cleanup should only happen conditionally or requires custom behavior, a topic hook can be registered explicitly. 
---- +```java +private final boolean largeMessageCleanupEnabled; -## Usage Examples +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { -### Producer Application + final StreamsCleanUpConfiguration cleanUp = + StreamsApp.super.setupCleanUp(configuration); -```java -public class MyLargeMessageProducer extends KafkaProducerApplication { - @Override - public LargeMessageProducerApp createApp() { - return new LargeMessageProducerApp() { - - @Override - public ProducerRunnable buildRunnable(final ProducerBuilder builder) { - return () -> { - try (final Producer producer = builder.createProducer()) { - // Producer logic with large message support - } - }; - } - - @Override - public SerializerConfig defaultSerializationConfig() { - // Configure LargeMessageSerializer - return new SerializerConfig(LargeMessageSerializer.class, LargeMessageSerializer.class); - } - }; + if (this.largeMessageCleanupEnabled) { + LargeMessageAppUtils.registerTopicHook(cleanUp, configuration); } -} + return cleanUp; +} ``` diff --git a/docs/docs/user/getting-started/quick-start.md b/docs/docs/user/getting-started/quick-start.md index 000272bb1..948d22f94 100644 --- a/docs/docs/user/getting-started/quick-start.md +++ b/docs/docs/user/getting-started/quick-start.md @@ -6,7 +6,6 @@ This page shows how to add `streams-bootstrap` to a project and how to create an - Java 17 - Apache Kafka cluster (brokers reachable from the application) -- Access to Maven Central - `streams-bootstrap-cli` dependency (see [Setup](setup.md) for Gradle/Maven snippets) ## Minimal Kafka Streams Application @@ -35,9 +34,7 @@ public class MyStreamsApplication extends KafkaStreamsApplication { @Override public void buildTopology(final StreamsBuilderX builder) { final KStreamX input = builder.streamInput(); - // topology definition - input.toOutputTopic(); } @@ -50,13 +47,6 @@ public class MyStreamsApplication extends KafkaStreamsApplication { public SerdeConfig defaultSerializationConfig() { return new SerdeConfig(StringSerde.class, StringSerde.class); } - - @Override - public Map createKafkaProperties() { - return Map.of( - // additional Kafka properties - ); - } }; } } diff --git a/docs/docs/user/getting-started/setup.md b/docs/docs/user/getting-started/setup.md index ec916e474..95eba5ce6 100644 --- a/docs/docs/user/getting-started/setup.md +++ b/docs/docs/user/getting-started/setup.md @@ -30,127 +30,3 @@ implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", vers For other build tools or versions, refer to the [latest version in MvnRepository](https://mvnrepository.com/artifact/com.bakdata.kafka/streams-bootstrap/latest). - -## Kafka Streams Applications - -Create a subclass of `KafkaStreamsApplication` and provide a `StreamsApp` via `createApp()`. - -Key responsibilities of `StreamsApp`: - -- Define the topology in `buildTopology(StreamsBuilderX builder)` -- Provide a unique application id in `getUniqueAppId(StreamsTopicConfig topics)` -- Configure default key/value SerDes via `defaultSerializationConfig()` -- Optionally provide extra Kafka properties via `createKafkaProperties()` - -A complete example is shown in the Quick Start page. 
- -### Configuration Options (Streams) - -The following CLI options are available: - -- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**) -- `--schema-registry-url`: The URL of the Schema Registry -- `--kafka-config`: Kafka Streams configuration (`[,...]`) -- `--input-topics`: List of input topics (comma-separated) -- `--input-pattern`: Pattern of input topics -- `--output-topic`: The output topic -- `--error-topic`: A topic to write errors to -- `--labeled-input-topics`: Additional labeled input topics for different message types ( - `[,...]`) -- `--labeled-input-patterns`: Additional labeled input patterns for different message types ( - `[,...]`) -- `--labeled-output-topics`: Additional labeled output topics for different message types ( - `[,...]`) -- `--application-id`: Unique application ID to use for Kafka Streams. Can also be provided by implementing - `StreamsApp#getUniqueAppId()` -- `--volatile-group-instance-id`: Whether the group instance id is volatile, i.e., it changes on Streams shutdown - -Additional commands: - -- `clean`: Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate - topics associated with the Kafka Streams application. -- `reset`: Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams - application. - -## Kafka Producer Applications - -Create a subclass of `KafkaProducerApplication`. - -```java -import com.bakdata.kafka.producer.KafkaProducerApplication; -import com.bakdata.kafka.producer.ProducerApp; -import com.bakdata.kafka.producer.ProducerBuilder; -import com.bakdata.kafka.producer.ProducerRunnable; -import com.bakdata.kafka.producer.SerializerConfig; -import java.util.Map; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.serialization.StringSerializer; - -public class MyProducerApplication extends KafkaProducerApplication { - - public static void main(final String[] args) { - new MyProducerApplication().startApplication(args); - } - - @Override - public ProducerApp createApp() { - return new ProducerApp() { - @Override - public ProducerRunnable buildRunnable(final ProducerBuilder builder) { - return () -> { - try (final Producer producer = builder.createProducer()) { - // producer logic - } - }; - } - - @Override - public SerializerConfig defaultSerializationConfig() { - return new SerializerConfig(StringSerializer.class, StringSerializer.class); - } - - @Override - public Map createKafkaProperties() { - return Map.of( - // additional Kafka properties - ); - } - }; - } -} -``` - -### Configuration Options (Producer) - -The following CLI options are available: - -- `--bootstrap-servers`, `--bootstrap-server`: List of Kafka bootstrap servers (comma-separated) (**required**) -- `--schema-registry-url`: The URL of the Schema Registry -- `--kafka-config`: Kafka producer configuration (`[,...]`) -- `--output-topic`: The output topic -- `--labeled-output-topics`: Additional labeled output topics (`[,...]`) - -Additional commands: - -- `clean`: Delete all output topics associated with the Kafka producer application. 
- -## Helm Charts - -For configuration and deployment to Kubernetes, use the provided Helm charts: - -- Streams applications: - - Chart: `charts/streams-app` - - Example configuration: `charts/streams-app/values.yaml` - - Cleanup job: `charts/streams-app-cleanup-job` - -- Producer applications: - - Chart: `charts/producer-app` - - Example configuration: `charts/producer-app/values.yaml` - - Cleanup job: `charts/producer-app-cleanup-job` - -To configure your streams app, you can use -the [`values.yaml`](https://github.com/bakdata/streams-bootstrap/blob/master/charts/streams-app/values.yaml) as a -starting point. -We also provide a chart -to [clean](https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app-cleanup-job) your streams app. - diff --git a/docs/docs/user/monitoring.md b/docs/docs/user/monitoring.md index c6c63b01d..aae1a8728 100644 --- a/docs/docs/user/monitoring.md +++ b/docs/docs/user/monitoring.md @@ -16,13 +16,12 @@ to full production stacks with Prometheus Operator. ### Monitoring Mechanisms -| Mechanism | Use Case | Key Values | -|--------------------------|-------------------------------------|--------------------------| -| JMX remote access | Direct debugging and inspection | `jmx.enabled` | -| Prometheus JMX exporter | Production metrics collection | `prometheus.jmx.enabled` | -| Liveness probes | Container health checks | `livenessProbe` | -| Readiness probes | Traffic readiness / rollout control | `readinessProbe` | -| Custom ports and Service | HTTP / metrics endpoints exposure | `ports`, `service.*` | +| Mechanism | Use Case | Key Values | +|-------------------------|-------------------------------------|--------------------------| +| JMX remote access | Direct debugging and inspection | `jmx.enabled` | +| Prometheus JMX exporter | Production metrics collection | `prometheus.jmx.enabled` | +| Liveness probes | Container health checks | `livenessProbe` | +| Readiness probes | Traffic readiness / rollout control | `readinessProbe` | ### JMX Configuration @@ -55,9 +54,6 @@ When enabled, the chart configures the JVM with flags similar to: -Djava.rmi.server.hostname=localhost ``` -> **Security note**: The default JMX configuration disables authentication and SSL. For production, use -> port-forwarding or tunnel access instead of exposing JMX externally. - Accessing JMX metrics from a local client: ```bash @@ -141,39 +137,12 @@ podAnnotations: This enables scraping the JMX exporter endpoint exposed on `prometheus.jmx.port`. -#### Service and PodMonitor - -To expose metrics via a Service and integrate with Prometheus Operator using a `PodMonitor`: - -```yaml -service: - enabled: true - type: ClusterIP - labels: - monitoring: "true" - -ports: - - containerPort: 5556 - name: metrics - protocol: TCP - servicePort: 5556 -``` +### PodMonitor -Example `PodMonitor`: +For more advanced Prometheus Operator setups, a `PodMonitor` custom resource can be deployed. 
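As a rough illustration, such a resource can look like the sketch below; the selector labels and the port name are assumptions and must match the labels and metrics port of your actual release:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: streams-app
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: streams-app
  podMetricsEndpoints:
    - port: metrics
      interval: 30s
```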
-```yaml -apiVersion: monitoring.coreos.com/v1 -kind: PodMonitor -metadata: - name: streams-app -spec: - selector: - matchLabels: - app.kubernetes.io/name: streams-app - podMetricsEndpoints: - - port: metrics - interval: 30s -``` +The `streams-bootstrap` repository provides a reference `PodMonitor` +configuration: [monitoring/pod_monitor.yaml](https://github.com/bakdata/streams-bootstrap/blob/1ff01c2f/monitoring/pod_monitor.yaml) ### Health Checks diff --git a/docs/docs/user/testing.md b/docs/docs/user/testing.md index 268c449df..b7c9ef595 100644 --- a/docs/docs/user/testing.md +++ b/docs/docs/user/testing.md @@ -9,16 +9,6 @@ testing, and utilities for verifying application behavior and consumer group sta ## Core Testing Components -### KafkaTest Base Class - -`KafkaTest` is an abstract base class that sets up a Kafka environment using TestContainers. It provides: - -- Kafka container setup -- Access to bootstrap servers and Schema Registry -- Methods for waiting on consumer group states -- Integration with `TestSchemaRegistry` -- Creation of `KafkaTestClient` instances - ### KafkaTestClient `KafkaTestClient` is a fluent test client that simplifies: @@ -39,36 +29,7 @@ Provides tools to: ## Unit Testing with `fluent-kafka-streams-tests` -The framework integrates with `fluent-kafka-streams-tests` for unit testing Kafka Streams topologies. - -## TestSchemaRegistry - -`TestSchemaRegistry` provides built-in support for Schema Registry in tests using a mock implementation. It creates -isolated Schema Registry instances for testing schema-aware applications. - -### Features: - -- Random scoped mock URLs to avoid collisions -- Support for custom mock URLs -- Configurable schema providers -- Compatible with Confluent’s `MockSchemaRegistry` - -### Example: - -```java -// Random scope -TestSchemaRegistry registry = new TestSchemaRegistry(); - -// Custom scope -TestSchemaRegistry registry = new TestSchemaRegistry("mock://custom-scope"); - -// Default providers -SchemaRegistryClient client = this.registry.getSchemaRegistryClient(); - -// With custom providers -List providers = List.of(new ProtobufSchemaProvider()); -SchemaRegistryClient client = this.registry.getSchemaRegistryClient(this.providers); -``` +The framework integrates with `fluent-kafka-streams-tests` for unit testing Kafka Streams topologies.``` ## Integration Testing with TestContainers @@ -116,18 +77,30 @@ stop(); KafkaTestClient client = newTestClient(); // Create topic with default settings (1 partition, 1 replica) -client.createTopic("my-topic"); +client. + +createTopic("my-topic"); // Create topic with custom settings -client.createTopic("my-topic", +client. + +createTopic("my-topic", KafkaTestClient.defaultTopicSettings() - .partitions(3) - .replicationFactor(1) - .build()); + . + +partitions(3) + . + +replicationFactor(1) + . + +build()); // Create topic with config Map config = Map.of("cleanup.policy", "compact"); -client.createTopic("my-topic",settings, config); +client. + +createTopic("my-topic",settings, config); // Check if topic exists boolean exists = this.client.existsTopic("my-topic"); @@ -137,8 +110,12 @@ boolean exists = this.client.existsTopic("my-topic"); ```java client.send() - .withKeySerializer(new StringSerializer()) - .withValueSerializer(new StringSerializer()) + . + +withKeySerializer(new StringSerializer()) + . 
+ +withValueSerializer(new StringSerializer()) to("topic-name",List.of( new SimpleProducerRecord<>("key1","value1"), From 1cb4a4a4e5f51ffa4d1f980a350f9504ce437f31 Mon Sep 17 00:00:00 2001 From: vostres Date: Wed, 28 Jan 2026 19:00:54 +0100 Subject: [PATCH 4/9] Implement review (part 2) --- docs/docs/user/concepts/common.md | 2 +- docs/docs/user/concepts/producer.md | 49 +++++++++++++++---------- docs/docs/user/concepts/streams.md | 4 +- docs/docs/user/deployment/kubernetes.md | 3 +- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/docs/docs/user/concepts/common.md b/docs/docs/user/concepts/common.md index 04bb682cb..a37d8eb7f 100644 --- a/docs/docs/user/concepts/common.md +++ b/docs/docs/user/concepts/common.md @@ -97,7 +97,7 @@ Applications built with streams-bootstrap can be started in two primary ways: public static void main(final String[] args) { new MyStreamsApplication().startApplication(args); } - ``` +``` ### Cleaning an application diff --git a/docs/docs/user/concepts/producer.md b/docs/docs/user/concepts/producer.md index 9937ef757..cc4163744 100644 --- a/docs/docs/user/concepts/producer.md +++ b/docs/docs/user/concepts/producer.md @@ -1,10 +1,10 @@ # Producer applications -Producer applications are applications that generate data and send it to Kafka topics. They can be used to produce -messages from various sources, such as databases, files, or real-time events. +Producer applications generate data and send it to Kafka topics. They can be used to produce messages from various +sources, such as databases, files, or real-time events. -The `streams-bootstrap` framework provides a structured way to build Kafka producer applications with built-in -configuration handling, command-line support, and resource lifecycle management. +streams-bootstrap provides a structured way to build producer applications with consistent configuration handling, +command-line support, and lifecycle management. --- @@ -21,10 +21,11 @@ Unlike Kafka Streams applications, producer applications typically: The execution model is fully controlled by the producer implementation and its runnable logic. +--- + ### Cleaning an application -Producer applications support a dedicated `clean` command that removes Kafka-related resources created by the -application. +Producer applications support a dedicated `clean` command. ```bash java -jar my-producer-app.jar \ @@ -39,17 +40,7 @@ The clean process can perform the following operations: - Delete registered schemas from Schema Registry - Execute custom cleanup hooks defined by the application -Applications can register custom cleanup logic by overriding `setupCleanUp`: - -```java - -@Override -public void setupCleanUp(final EffectiveAppConfiguration configuration) { - configuration.addCleanupHook(() -> { - // Custom cleanup logic - }); -} -``` +Applications can register custom cleanup logic by overriding `setupCleanUp`. --- @@ -68,7 +59,7 @@ public SerializerConfig defaultSerializationConfig() { } ``` -### Custom Kafka properties +### Kafka properties Producer-specific Kafka configuration can be customized by overriding `createKafkaProperties()`: @@ -85,7 +76,27 @@ public Map createKafkaProperties() { } ``` -These properties are merged with the framework defaults and CLI-provided configuration. +These properties are merged with defaults and CLI-provided configuration. 
+ +--- + +### Lifecycle hooks + +#### Clean up + +Custom cleanup logic that is not tied to Kafka topics can be registered via cleanup hooks: + +```java + +@Override +public void setupCleanUp(final EffectiveAppConfiguration configuration) { + configuration.addCleanupHook(() -> { + // Custom cleanup logic + }); +} +``` + +Topic-related cleanup should be implemented using topic hooks. --- diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index 262b6c8a7..259a03da6 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -64,8 +64,8 @@ After a reset, the application can be started again and will reprocess all input ### Cleaning an application -The `clean` command performs everything that `reset` does and additionally removes the Kafka consumer groups created by -the application. +The `clean` command performs everything that `reset` does and additionally removes the Kafka consumer groups and output +topics created by the application. ```bash java -jar my-streams-app.jar clean diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md index 773bde633..c66bf3096 100644 --- a/docs/docs/user/deployment/kubernetes.md +++ b/docs/docs/user/deployment/kubernetes.md @@ -57,7 +57,8 @@ Streams applications support both stateless and stateful deployment modes: - **StatefulSet** - Used for stateful Kafka Streams applications with local state stores - - Enabled when `statefulSet: true` and `persistence.enabled: true` + - Enabled when `statefulSet: true` + - Required when `persistence.enabled: true` - Each pod receives a dedicated `PersistentVolumeClaim` for RocksDB state This allows choosing between elasticity (Deployment) and stronger data locality guarantees (StatefulSet). 
From 7b9383387b294d11159fd15be559fe13e890c24f Mon Sep 17 00:00:00 2001 From: vostres Date: Wed, 28 Jan 2026 20:22:09 +0100 Subject: [PATCH 5/9] Wip --- docs/docs/user/concepts/streams.md | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index 259a03da6..35b81b3c3 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -97,18 +97,7 @@ Streams applications support flexible topic configuration: ### Kafka properties -Additional Kafka Streams configuration can be supplied using: - -- `--kafka-config ` - -The framework applies the following defaults: - -```text -processing.guarantee=exactly_once_v2 -producer.max.in.flight.requests.per.connection=1 -producer.acks=all -producer.compression.type=gzip -``` +TODO --- From c39e9f16f37335a762fdfeedeb13176ad778aa81 Mon Sep 17 00:00:00 2001 From: vostres Date: Fri, 6 Feb 2026 11:55:55 +0100 Subject: [PATCH 6/9] Wip --- docs/docs/user/concepts/common.md | 16 +++++++-------- docs/docs/user/testing.md | 34 ------------------------------- 2 files changed, 7 insertions(+), 43 deletions(-) diff --git a/docs/docs/user/concepts/common.md b/docs/docs/user/concepts/common.md index a37d8eb7f..726d25f67 100644 --- a/docs/docs/user/concepts/common.md +++ b/docs/docs/user/concepts/common.md @@ -105,15 +105,13 @@ The framework provides a built-in mechanism to clean up all resources associated When the cleanup operation is triggered, the following resources are removed: -**TODO:** extend the table for new consumer apps - -| Resource Type | Description | Streams Apps | Producer Apps | -|---------------------|-----------------------------------------------------------|--------------|---------------| -| Output Topics | The main output topic of the application | ✓ | ✓ | -| Intermediate Topics | Topics for stream operations like `through()` | ✓ | N/A | -| Internal Topics | Topics for state stores or repartitioning (Kafka Streams) | ✓ | N/A | -| Consumer Groups | Consumer group metadata | ✓ | N/A | -| Schema Registry | All registered schemas | ✓ | ✓ | +| Resource Type | Description | Streams Apps | Producer Apps | Consumer Apps | Consumer-Producer Apps | +|---------------------|-----------------------------------------------------------|--------------|---------------|---------------|------------------------| +| Output Topics | The main output topic of the application | ✓ | ✓ | N/A | ✓ | +| Intermediate Topics | Topics for stream operations like `through()` | ✓ | N/A | N/A | N/A | +| Internal Topics | Topics for state stores or repartitioning (Kafka Streams) | ✓ | N/A | N/A | N/A | +| Consumer Groups | Consumer group metadata | ✓ | N/A | ✓ | ✓ | +| Schema Registry | All registered schemas | ✓ | ✓ | ✓ | ✓ | Cleanup can be triggered: diff --git a/docs/docs/user/testing.md b/docs/docs/user/testing.md index b7c9ef595..46b9d37d6 100644 --- a/docs/docs/user/testing.md +++ b/docs/docs/user/testing.md @@ -31,40 +31,6 @@ Provides tools to: The framework integrates with `fluent-kafka-streams-tests` for unit testing Kafka Streams topologies.``` -## Integration Testing with TestContainers - -For integration tests that require a real Kafka environment, the framework provides integration with TestContainers. - -### Single Node Kafka Testing - -`KafkaTest` provides a base class for integration tests with a single Kafka broker. 
- -### Multi-Node Cluster Testing - -For testing with multi-node Kafka clusters, the framework provides `ApacheKafkaContainerCluster`: - -Example usage: - -```java -ApacheKafkaContainerCluster cluster = new ApacheKafkaContainerCluster("3.4.0", 3, 2); -cluster. - -start(); - -String bootstrapServers = this.cluster.getBootstrapServers(); -// Run tests... -cluster. - -stop(); -``` - -### Features: - -- Configurable broker count -- Configurable replication factor for internal topics -- Uses KRaft (no ZooKeeper) -- Waits for all brokers to be ready before returning - ## Utilities for Kafka Testing ### KafkaTestClient Operations From 7433b28f53e287c8d9f52d95518a04baa73a7231 Mon Sep 17 00:00:00 2001 From: vostres Date: Fri, 6 Feb 2026 12:06:51 +0100 Subject: [PATCH 7/9] Remove "framework" --- docs/docs/user/concepts/common.md | 12 ++++++------ docs/docs/user/concepts/streams.md | 4 ++-- docs/docs/user/deployment/kubernetes.md | 4 ++-- docs/docs/user/extensions/large-messages.md | 2 +- docs/docs/user/getting-started/quick-start.md | 2 +- docs/docs/user/monitoring.md | 2 +- docs/docs/user/testing.md | 10 +++++----- 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/docs/docs/user/concepts/common.md b/docs/docs/user/concepts/common.md index 726d25f67..b2634d8d4 100644 --- a/docs/docs/user/concepts/common.md +++ b/docs/docs/user/concepts/common.md @@ -52,7 +52,7 @@ It can create: ### Usage Pattern 1. You implement an **App**. -2. The framework wraps it in a **ConfiguredApp**, applying the configuration. +2. The system wraps it in a **ConfiguredApp**, applying the configuration. 3. Runtime configuration is then applied to create an **ExecutableApp**, which can be: - **run**, or @@ -64,7 +64,7 @@ It can create: Applications built with streams-bootstrap follow a defined lifecycle with specific states and transitions. -The framework manages this lifecycle through the KafkaApplication base class and provides several extension points for +The lifecycle is managed through the KafkaApplication base class and provides several extension points for customization. | Phase | Description | Entry Point | @@ -101,7 +101,7 @@ Applications built with streams-bootstrap can be started in two primary ways: ### Cleaning an application -The framework provides a built-in mechanism to clean up all resources associated with an application. +A built-in mechanism is provided to clean up all resources associated with an application. When the cleanup operation is triggered, the following resources are removed: @@ -135,7 +135,7 @@ clean(); } ``` -The framework ensures that cleanup operations are idempotent, meaning they can be safely retried without causing +Cleanup operations are idempotent, meaning they can be safely retried without causing additional issues. ## Configuration @@ -149,7 +149,7 @@ Kafka properties are applied in the following order (later values override earli 5. Serialization config from ProducerApp.defaultSerializationConfig() or StreamsApp.defaultSerializationConfig() 6. CLI overrides via --kafka-config -The framework automatically parses environment variables with the `APP_ prefix` (configurable via `ENV_PREFIX`). +Environment variables with the `APP_ prefix` (configurable via `ENV_PREFIX`) are automatically parsed. Environment variables are converted to CLI arguments: ```text @@ -171,7 +171,7 @@ When the `--schema-registry-url` option is provided: ## Command line interface -The framework provides a unified command-line interface for application configuration. 
+A unified command-line interface is provided for application configuration. ### CLI Commands diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index 35b81b3c3..90c7e9496 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -18,7 +18,7 @@ public static void main(final String[] args) { } ``` -When an application is started, the framework performs the following steps: +When an application is started, the following steps are performed: - Parse command-line arguments and environment variables - Create a `StreamsApp` instance @@ -166,7 +166,7 @@ TODO ## Kafka Streams extensions -The framework provides several extensions that simplify working with Kafka Streams. +Several extensions are provided that simplify working with Kafka Streams. ### Simple topic access diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md index c66bf3096..c568ac2da 100644 --- a/docs/docs/user/deployment/kubernetes.md +++ b/docs/docs/user/deployment/kubernetes.md @@ -1,6 +1,6 @@ # Deployment to Kubernetes -The `streams-bootstrap` framework provides support for deploying applications to Kubernetes using Helm +`streams-bootstrap` provides support for deploying applications to Kubernetes using Helm charts. The charts cover Kafka Streams, producer, consumer, and producer-consumer applications and offer standardized solutions for autoscaling, monitoring, and state persistence. @@ -16,7 +16,7 @@ solutions for autoscaling, monitoring, and state persistence. ## Helm charts -The framework ships a set of Helm charts tailored to different application types: +A set of Helm charts is shipped, tailored to different application types: | Chart name | Purpose | Kubernetes workload types | |------------------------|-----------------------------------------------|--------------------------------| diff --git a/docs/docs/user/extensions/large-messages.md b/docs/docs/user/extensions/large-messages.md index a40db4877..6bb2762ed 100644 --- a/docs/docs/user/extensions/large-messages.md +++ b/docs/docs/user/extensions/large-messages.md @@ -4,7 +4,7 @@ The **Large Messages Extension** adds support for handling messages that exceed Kafka's size limitations by using external storage mechanisms with automatic cleanup. -It integrates with the *streams-bootstrap* framework to transparently manage: +It integrates with *streams-bootstrap* to transparently manage: - large message serialization - large message deserialization diff --git a/docs/docs/user/getting-started/quick-start.md b/docs/docs/user/getting-started/quick-start.md index 948d22f94..129c3098c 100644 --- a/docs/docs/user/getting-started/quick-start.md +++ b/docs/docs/user/getting-started/quick-start.md @@ -71,7 +71,7 @@ Additional subcommands such as `clean` and `reset` are available for lifecycle m ### From the `main` Method -In the `main` method, the application subclass starts the framework via: +In the `main` method, the application subclass starts up via: ```java public static void main(final String[] args) { diff --git a/docs/docs/user/monitoring.md b/docs/docs/user/monitoring.md index aae1a8728..94e83e4a3 100644 --- a/docs/docs/user/monitoring.md +++ b/docs/docs/user/monitoring.md @@ -1,6 +1,6 @@ # Monitoring -The framework provides features for monitoring your applications. +Monitoring features are provided for your applications. 
- **JMX Metrics Export**: Applications built with `streams-bootstrap` can expose JMX (Java Management Extensions) metrics, which provide insights into the performance and health of the Java application and the Kafka clients. diff --git a/docs/docs/user/testing.md b/docs/docs/user/testing.md index 46b9d37d6..7d394ab16 100644 --- a/docs/docs/user/testing.md +++ b/docs/docs/user/testing.md @@ -1,10 +1,10 @@ # Testing -The `streams-bootstrap` Testing Framework provides a comprehensive set of tools for testing Kafka Streams and Producer -applications. This framework simplifies both unit and integration testing by providing test abstractions that handle +The `streams-bootstrap` Testing tools provide a comprehensive set of utilities for testing Kafka Streams and Producer +applications. These tools simplify both unit and integration testing by providing test abstractions that handle Kafka infrastructure setup, Schema Registry integration, and consumer group verification. -The framework supports testing with real Kafka clusters using TestContainers, mock Schema Registry for schema-aware +Testing is supported with real Kafka clusters using TestContainers, mock Schema Registry for schema-aware testing, and utilities for verifying application behavior and consumer group states. ## Core Testing Components @@ -29,7 +29,7 @@ Provides tools to: ## Unit Testing with `fluent-kafka-streams-tests` -The framework integrates with `fluent-kafka-streams-tests` for unit testing Kafka Streams topologies.``` +Integration is provided with `fluent-kafka-streams-tests` for unit testing Kafka Streams topologies.``` ## Utilities for Kafka Testing @@ -111,7 +111,7 @@ ConsumerGroupClient consumerGroupClient = this.admin.getConsumerGroupClient(); ## Consumer Group Verification -The framework provides utilities for verifying consumer group states: +The tools provide utilities for verifying consumer group states: ```java // Wait for application to become active From 27b14d91bd431039e39b28bba8611d31d1845712 Mon Sep 17 00:00:00 2001 From: vostres Date: Fri, 6 Feb 2026 14:45:59 +0100 Subject: [PATCH 8/9] Wip --- docs/docs/user/concepts/producer.md | 57 +++++++++++++++--- docs/docs/user/concepts/streams.md | 90 ++++++++++++++++++++++++++++- 2 files changed, 136 insertions(+), 11 deletions(-) diff --git a/docs/docs/user/concepts/producer.md b/docs/docs/user/concepts/producer.md index cc4163744..a2d644f5f 100644 --- a/docs/docs/user/concepts/producer.md +++ b/docs/docs/user/concepts/producer.md @@ -61,14 +61,22 @@ public SerializerConfig defaultSerializationConfig() { ### Kafka properties -Producer-specific Kafka configuration can be customized by overriding `createKafkaProperties()`: +#### Base configuration -```java +The following Kafka properties are configured by default for Producer applications in streams-bootstrap: + +- `max.in.flight.requests.per.connection = 1` +- `acks = all` +- `compression.type = gzip` + +#### Custom Kafka properties + +Kafka configuration can be customized by overriding `createKafkaProperties()`: +```java @Override public Map createKafkaProperties() { return Map.of( - ProducerConfig.ACKS_CONFIG, "all", ProducerConfig.RETRIES_CONFIG, 3, ProducerConfig.BATCH_SIZE_CONFIG, 16384, ProducerConfig.LINGER_MS_CONFIG, 5 @@ -82,6 +90,11 @@ These properties are merged with defaults and CLI-provided configuration. ### Lifecycle hooks +Producer applications can register cleanup logic via `setupCleanUp`. 
This method allows you to attach: + +- **Cleanup hooks** – for general cleanup logic not tied to Kafka topics +- **Topic hooks** – for reacting to topic lifecycle events (e.g. deletion) + #### Clean up Custom cleanup logic that is not tied to Kafka topics can be registered via cleanup hooks: @@ -89,16 +102,42 @@ Custom cleanup logic that is not tied to Kafka topics can be registered via clea ```java @Override -public void setupCleanUp(final EffectiveAppConfiguration configuration) { - configuration.addCleanupHook(() -> { - // Custom cleanup logic - }); +public ProducerCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return ProducerApp.super.setupCleanUp(configuration) + .registerCleanHook(() -> { + // Custom cleanup logic + }); } ``` -Topic-related cleanup should be implemented using topic hooks. +#### Topic hooks ---- +Topic hooks should be used for topic-related cleanup or side effects, such as releasing external +resources associated with a topic or logging topic deletions: + +```java +@Override +public ProducerCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return ProducerApp.super.setupCleanUp(configuration) + .registerTopicHook(new TopicHook() { + + @Override + public void deleted(final String topic) { + // Called when a managed topic is deleted + System.out.println("Deleted topic: " + topic); + } + + @Override + public void close() { + // Optional cleanup for the hook itself + } + }); +} +``` ## Command line interface diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index 90c7e9496..cbeb228fa 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -97,19 +97,105 @@ Streams applications support flexible topic configuration: ### Kafka properties -TODO +#### Base configuration + +The following Kafka properties are configured by default for Streams +applications in streams-bootstrap: + +- `processing.guarantee = exactly_once_v2` +- `producer.max.in.flight.requests.per.connection = 1` +- `producer.acks = all` +- `producer.compression.type = gzip` + +#### Custom Kafka properties + +Kafka configuration can be customized by overriding `createKafkaProperties()`: + +```java +@Override +public Map createKafkaProperties() { + return Map.of( + StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2, + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4, + StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName() + ); +} +``` --- ### Lifecycle hooks +Streams applications support the following hook types: + +- **Cleanup hooks** – for general cleanup logic not tied to Kafka topics +- **Topic hooks** – for reacting to topic lifecycle events (e.g. deletion) +- **Reset hooks** – for logic that should run only during an application reset +- #### Setup TODO #### Clean up -TODO +Use cleanup hooks for logic that is not tied to Kafka topics, such as closing external resources +or cleaning up temporary state. + +```java +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return StreamsApp.super.setupCleanUp(configuration) + .registerCleanHook(() -> { + // Custom cleanup logic + }); +} +``` + +#### Topic hooks + +Topic hooks allow Kafka Streams applications to react to Kafka topic lifecycle events, such as topic +deletion during `clean` or `reset` operations. 
+ +```java +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return StreamsApp.super.setupCleanUp(configuration) + .registerTopicHook(new TopicHook() { + @Override + public void deleted(final String topic) { + // Called when a managed topic is deleted + System.out.println("Deleted topic: " + topic); + } + @Override + public void close() { + // Optional cleanup for the hook itself + } + }); +} +``` +##### Reset hooks + +Reset hooks allow Kafka Streams applications to execute custom logic only during a reset operation. They are not invoked during a regular clean. + +--- + +###### Example: reset hook registration (Streams) + +```java +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return StreamsApp.super.setupCleanUp(configuration) + .registerResetHook(() -> { + // Custom logic executed only during reset + }); +} +``` --- From f234d690aff74825ed54c3249a118b0a745dc76e Mon Sep 17 00:00:00 2001 From: vostres Date: Mon, 9 Feb 2026 08:58:01 +0100 Subject: [PATCH 9/9] Wip --- docs/docs/user/concepts/producer.md | 16 +- docs/docs/user/concepts/streams.md | 4 +- docs/docs/user/deployment/kubernetes.md | 53 +-- docs/docs/user/extensions/large-messages.md | 2 +- docs/docs/user/testing.md | 348 +++++++++++++++----- 5 files changed, 299 insertions(+), 124 deletions(-) diff --git a/docs/docs/user/concepts/producer.md b/docs/docs/user/concepts/producer.md index a2d644f5f..fa8d04af3 100644 --- a/docs/docs/user/concepts/producer.md +++ b/docs/docs/user/concepts/producer.md @@ -141,16 +141,12 @@ public ProducerCleanUpConfiguration setupCleanUp( ## Command line interface -Producer applications inherit standard CLI options from `KafkaApplication`. - -```text ---bootstrap-servers Kafka bootstrap servers (comma-separated) (Required) ---bootstrap-server Alias for --bootstrap-servers (Required) ---schema-registry-url URL of the Schema Registry (Optional) ---kafka-config Additional Kafka config (key=value,...) (Optional) ---output-topic Default output topic (Optional) ---labeled-output-topics Named output topics (label1=topic1,...) (Optional) -``` +Producer applications inherit standard CLI options from `KafkaApplication`. The following CLI options are producer-specific: + +| Option | Description | Default | +|---------------------------|--------------------------------------------------|---------| +| `--output-topic` | Default output topic | - | +| `--labeled-output-topics` | Named output topics (`label1=topic1,...`) | - | --- diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index cbeb228fa..7a601e900 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -233,12 +233,10 @@ TODO ## Command line interface -Streams applications inherit standard CLI options from `KafkaStreamsApplication`. +Streams applications inherit standard CLI options from `KafkaStreamsApplication`. 
The following CLI options are streams-app-specific: | Option | Description | Default | |--------------------------------|-------------------------------------------|----------------| -| `--bootstrap-servers` | Kafka bootstrap servers (comma-separated) | Required | -| `--schema-registry-url` | URL of the Schema Registry | None | | `--application-id` | Kafka Streams application ID | Auto-generated | | `--volatile-group-instance-id` | Use volatile group instance ID | false | diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md index c568ac2da..68d22c88d 100644 --- a/docs/docs/user/deployment/kubernetes.md +++ b/docs/docs/user/deployment/kubernetes.md @@ -47,9 +47,9 @@ helm install my-app bakdata-common/streams-app --values my-values.yaml ## Deployment patterns -### Streams applications (`streams-app`) +### Streams, consumer and consumer–producer applications -Streams applications support both stateless and stateful deployment modes: +Streams, consumer and consumer–producer applications support both stateless and stateful deployment modes: - **Deployment** - Used for stateless applications or when state is stored externally @@ -61,31 +61,27 @@ Streams applications support both stateless and stateful deployment modes: - Required when `persistence.enabled: true` - Each pod receives a dedicated `PersistentVolumeClaim` for RocksDB state -This allows choosing between elasticity (Deployment) and stronger data locality guarantees (StatefulSet). - --- -### Producer applications (`producer-app`) - -Producer applications support multiple execution models: +### Producer applications -| Mode | Use case | Resource type | -|------------|-------------------------------------|----------------------| -| Deployment | Long-running or continuous producer | `apps/v1/Deployment` | -| Job | One-time run or backfill | `batch/v1/Job` | -| CronJob | Scheduled periodic execution | `batch/v1/CronJob` | +Producer applications support multiple execution modes depending on workload characteristics: ---- +- **Deployment** + - Used for long-running or continuous producers + - Enabled when `deployment: true` + - Supports horizontal scaling via `replicaCount` -### Consumer and consumer–producer applications +- **Job** + - Used for one-time runs or backfills + - Default when `deployment: false` and no `schedule` is provided + - Supports `restartPolicy`, `backoffLimit`, and `ttlSecondsAfterFinished` -- **`consumer-app`** - - Deployed as a `Deployment` - - Uses Kafka consumer groups for parallel consumption +- **CronJob** + - Used for scheduled, periodic execution + - Enabled when a cron expression is provided via `schedule` + - Supports `suspend`, `successfulJobsHistoryLimit`, and `failedJobsHistoryLimit` -- **`consumerproducer-app`** - - Deployed as a `Deployment` - - Typically used for batch-style read–process–write workloads --- @@ -104,22 +100,7 @@ This ensures a clean starting point for reprocessing or redeployment scenarios. 
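To make the producer execution modes described above concrete, a scheduled `producer-app` release might be configured roughly as follows; the schedule and history limits are example values, and only keys named in the mode descriptions are used:

```yaml
deployment: false            # not a long-running Deployment
schedule: "0 * * * *"        # providing a cron expression enables CronJob mode
suspend: false
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
```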
--- ## Configuration structure - -All charts share a common configuration structure, with chart-specific extensions: - -| Section | Purpose | Examples | -|-------------------------|-------------------------------------------|-----------------------------------| -| `image`, `imageTag` | Container image configuration | `streamsApp`, `latest` | -| `kafka.*` | Kafka connection and topic configuration | `bootstrapServers`, `inputTopics` | -| `commandLine.*` | CLI arguments passed to the application | `MY_PARAM: "value"` | -| `env.*` | Additional environment variables | `MY_ENV: foo` | -| `secrets.*` | Inline secret values | Tokens, passwords | -| `secretRefs.*` | References to existing `Secret` objects | External credentials | -| `resources.*` | CPU and memory requests/limits | `requests.cpu: 200m` | -| `autoscaling.*` | KEDA autoscaling configuration | `lagThreshold`, `minReplicas` | -| `persistence.*` | Streams state-store persistence | `enabled: true`, `size: 1Gi` | -| `jmx.*`, `prometheus.*` | JMX exporter and Prometheus configuration | `jmx.enabled: true` | -| `statefulSet` | Toggle `StatefulSet` vs `Deployment` | `true` / `false` | +TODO --- diff --git a/docs/docs/user/extensions/large-messages.md b/docs/docs/user/extensions/large-messages.md index 6bb2762ed..19beffc3d 100644 --- a/docs/docs/user/extensions/large-messages.md +++ b/docs/docs/user/extensions/large-messages.md @@ -2,7 +2,7 @@ ## Overview -The **Large Messages Extension** adds support for handling messages that exceed Kafka's size limitations by using +The Large Messages extension adds support for handling messages that exceed Kafka's size limitations by using external storage mechanisms with automatic cleanup. It integrates with *streams-bootstrap* to transparently manage: diff --git a/docs/docs/user/testing.md b/docs/docs/user/testing.md index 7d394ab16..2608cc125 100644 --- a/docs/docs/user/testing.md +++ b/docs/docs/user/testing.md @@ -1,127 +1,327 @@ # Testing -The `streams-bootstrap` Testing tools provide a comprehensive set of utilities for testing Kafka Streams and Producer -applications. These tools simplify both unit and integration testing by providing test abstractions that handle -Kafka infrastructure setup, Schema Registry integration, and consumer group verification. +The `streams-bootstrap` testing tools provide utilities for testing Kafka Streams, Consumer, Producer and Consumer-Producer +applications, covering both unit-level and integration-style scenarios. -Testing is supported with real Kafka clusters using TestContainers, mock Schema Registry for schema-aware -testing, and utilities for verifying application behavior and consumer group states. +They abstract common test concerns such as Kafka infrastructure setup, Schema Registry integration, application +lifecycle handling, and consumer group verification, and are designed to work with real Kafka clusters as well as +schema-aware test environments. -## Core Testing Components +## TestApplicationRunner -### KafkaTestClient +`TestApplicationRunner` is a test utility for running, configuring, and verifying Kafka applications in integration and system tests. 
-`KafkaTestClient` is a fluent test client that simplifies: +It abstracts away repetitive setup such as: +- bootstrap servers +- Schema Registry +- Kafka client configuration +- CLI argument wiring +- lifecycle commands (`run`, `clean`, `reset`) -- Producing data -- Consuming records -- Admin operations -- Topic creation and verification +Typical use cases: +- end-to-end tests +- containerized test environments +- embedded Kafka setups +- CI pipelines -### ConsumerGroupVerifier +## Typical Usage -Provides tools to: +```java +TestApplicationRunner runner = + TestApplicationRunner.create("localhost:9092") + .withSchemaRegistry() + .withStateDir(tempDir) + .withNoStateStoreCaching(); +``` + +All applications executed via this runner automatically inherit this configuration. + +Bootstrap Servers +- Passed via `--bootstrap-servers` +- Also set directly on the application instance -- Check if a group is active or closed -- Get current group state -- Verify processing completion (lag = 0) -- Compute lag manually +Kafka Configuration +- All provided Kafka properties are injected +- Passed via `--kafka-config key=value` +- Also merged into `app.setKafkaConfig(...)` -## Unit Testing with `fluent-kafka-streams-tests` +Schema Registry (optional) +- Passed via `--schema-registry-url` +- Only configured when explicitly enabled -Integration is provided with `fluent-kafka-streams-tests` for unit testing Kafka Streams topologies.``` +--- -## Utilities for Kafka Testing +### Configuring Kafka for Tests + +```java +runner = runner.withKafkaConfig(Map.of("auto.offset.reset", "earliest")); +``` -### KafkaTestClient Operations +Behavior: +- merged with existing configuration +- immutable after creation +- overrides application defaults -`KafkaTestClient` provides a fluent API for common Kafka operations in tests: +--- -#### Topic Management +#### Kafka Streams–Specific Helpers + +##### Configure State Directory ```java -KafkaTestClient client = newTestClient(); +runner = runner.withStateDir(tempDir); +``` -// Create topic with default settings (1 partition, 1 replica) -client. +Sets: +``` +state.dir = +``` -createTopic("my-topic"); +Use this to: +- isolate test runs +- avoid state leakage between tests -// Create topic with custom settings -client. +--- -createTopic("my-topic", - KafkaTestClient.defaultTopicSettings() - . +##### Disable State Store Caching -partitions(3) - . +```java +runner = runner.withNoStateStoreCaching(); +``` + +Sets: +``` +statestore.cache.max.bytes = 0 +``` + +Useful when: +- asserting exact record counts +- debugging processor behavior +- avoiding cache-related timing issues + +--- + +#### Consumer-Specific Helpers + +##### Configure Session Timeout + +```java +runner = runner.withSessionTimeout(Duration.ofSeconds(5)); +``` + +Sets: +``` +session.timeout.ms = 5000 +``` -replicationFactor(1) - . +Useful for: +- fast consumer group rebalancing +- deterministic failure testing -build()); +--- -// Create topic with config -Map config = Map.of("cleanup.policy", "compact"); -client. +### Schema Registry Support -createTopic("my-topic",settings, config); +#### Enable a Test Schema Registry -// Check if topic exists -boolean exists = this.client.existsTopic("my-topic"); +```java +runner = runner.withSchemaRegistry(); ``` -#### Data Production +Creates: +- isolated in-memory Schema Registry +- random scope to avoid collisions +- transparent integration for the application + +--- + +#### Use a Custom TestSchemaRegistry ```java -client.send() - . 
-#### Data Production
-
-```java
-client.send()
-        .withKeySerializer(new StringSerializer())
-        .withValueSerializer(new StringSerializer())
-        .to("topic-name", List.of(
-                new SimpleProducerRecord<>("key1", "value1"),
-                new SimpleProducerRecord<>("key2", "value2")
-        ));
-```
-
-#### Data Consumption
-
-```java
-List<ConsumerRecord<String, String>> records = client.read()
-        .withKeyDeserializer(new StringDeserializer())
-        .withValueDeserializer(new StringDeserializer())
-        .from("topic-name", Duration.ofSeconds(10));
-```
-
-## Administrative Operations
-
-`KafkaTestClient` provides access to administrative operations through `AdminClientX`:
-
-```java
-try (AdminClientX admin = client.admin()) {
-    TopicClient topicClient = admin.getTopicClient();
-    ConsumerGroupClient consumerGroupClient = admin.getConsumerGroupClient();
-}
-```
-
-## Consumer Group Verification
-
-The tools provide utilities for verifying consumer group states:
-
-```java
-// Wait for application to become active
-awaitActive(app);
-
-// Wait for completion of processing
-awaitProcessing(app);
-
-// Wait for app to shut down
-awaitClosed(app);
-```
-
-These methods ensure test reliability by validating consumer group behavior via `ConsumerGroupVerifier`.
+### Schema Registry Support
+
+#### Enable a Test Schema Registry
+
+```java
+runner = runner.withSchemaRegistry();
+```
+
+Creates:
+
+- isolated in-memory Schema Registry
+- random scope to avoid collisions
+- transparent integration for the application
+
+---
+
+#### Use a Custom TestSchemaRegistry
+
+```java
+TestSchemaRegistry registry = new TestSchemaRegistry();
+runner = runner.withSchemaRegistry(registry);
+```
+
+Use this when:
+
+- sharing schemas across applications
+- inspecting registered schemas during tests
+
+---
+
+### Running Applications
+
+#### CLI
+
+```java
+CompletableFuture<Integer> exitCode =
+        runner.run(app, "--some-flag");
+```
+
+- invokes `startApplicationWithoutExit`
+- returns the application exit code
+
+---
+
+#### Runnable
+
+```java
+CompletableFuture<Void> execution = runner.run(app);
+```
+
+- calls `onApplicationStart()`
+- runs the application directly
+- suitable for long-running tests
+
+---
+
+### Cleaning and Resetting Applications
+
+#### Clean
+
+```java
+runner.clean(app);
+```
+
+or
+
+```java
+runner.clean(app, "--custom-arg");
+```
+
+Used to:
+
+- delete Kafka topics
+- clean local state
+- execute cleanup hooks
+
+---
+
+#### Reset
+
+Supported for:
+
+- Streams applications
+- Consumer applications
+- Consumer–Producer applications
+
+```java
+runner.reset(streamsApp);
+```
+
+---
+
+### Consumer Group Verification
+
+```java
+ConsumerGroupVerifier verifier = runner.verify(streamsApp);
+```
+
+Allows you to:
+
+- assert consumer group existence
+- check stability
+- inspect committed offsets
+
+---
+
+### Creating Test Clients
+
+```java
+KafkaTestClient client = runner.newTestClient();
+```
+
+Provides:
+
+- AdminClient access
+- Producer/Consumer helpers
+- runtime-aware configuration
+
+---
+
+## TestApplicationTopologyFactory
+
+`TestApplicationTopologyFactory` is a test helper for Kafka Streams applications that integrates with Fluent Kafka Streams Tests.
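+For orientation, a complete unit test built on it is sketched below. `MyStreamsApp`, the record values, and the
+uppercasing transformation are illustrative placeholders; the fluent assertions come from Fluent Kafka Streams Tests,
+and the factory calls are the ones described in the following sections.
+
+```java
+class MyStreamsAppTopologyTest {
+
+    private final TestApplicationTopologyFactory factory =
+            TestApplicationTopologyFactory.withSchemaRegistry();
+
+    // Raw type used for brevity; parameterize with your key and value types as needed.
+    @RegisterExtension
+    final TestTopologyExtension topology = this.factory.createTopologyExtension(new MyStreamsApp());
+
+    @Test
+    void shouldTransformRecords() {
+        this.topology.input().add("key", "value");
+
+        this.topology.streamOutput()
+                .expectNextRecord().hasKey("key").hasValue("VALUE")
+                .expectNoMoreRecord();
+    }
+}
+```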
+The factory allows you to:
+
+- derive a `TestTopology` from a real application
+- reuse production topology and configuration
+- inject test-specific runtime settings
+
+---
+
+### Typical Usage
+
+```java
+TestApplicationTopologyFactory factory =
+        TestApplicationTopologyFactory.withSchemaRegistry();
+```
+
+or without Schema Registry:
+
+```java
+TestApplicationTopologyFactory factory = new TestApplicationTopologyFactory();
+```
+
+---
+
+### Schema Registry Support
+
+#### Automatic Schema Registry
+
+```java
+TestApplicationTopologyFactory factory =
+        TestApplicationTopologyFactory.withSchemaRegistry();
+```
+
+- random isolated scope
+- no cross-test collisions
+- safe for parallel execution
+
+---
+
+#### Custom Schema Registry
+
+```java
+TestSchemaRegistry registry = new TestSchemaRegistry();
+TestApplicationTopologyFactory factory =
+        TestApplicationTopologyFactory.withSchemaRegistry(registry);
+```
+
+---
+
+### Modifying Kafka Configuration
+
+```java
+factory = factory.with(Map.of("commit.interval.ms", 100));
+```
+
+- merged into runtime configuration
+- applies only to tests
+- does not mutate the application
+
+---
+
+### Creating a TestTopology
+
+```java
+TestTopology topology = factory.createTopology(app);
+```
+
+Execution flow:
+
+1. application prepared
+2. runtime configuration injected
+3. topology extracted
+4. `TestTopology` created
+
+---
+
+### JUnit 5 Integration
+
+```java
+TestTopologyExtension extension = factory.createTopologyExtension(app);
+```
+
+---
+
+### Accessing Kafka Properties
+
+```java
+Map<String, Object> props = factory.getKafkaProperties(app);
+```
+
+---
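+One common use of these properties is configuring serializers or serdes for test data so that they match the
+application, for example pointing an Avro serde at the test Schema Registry. The snippet below is a sketch; whether
+you need it depends on your serialization setup, and it assumes the properties contain a Schema Registry URL.
+
+```java
+Map<String, Object> props = factory.getKafkaProperties(app);
+
+// Configure a value serde with the same properties the application uses,
+// including the (test) Schema Registry URL if one is configured.
+Serde<SpecificRecord> valueSerde = new SpecificAvroSerde<>();
+valueSerde.configure(props, false);
+```
+
+---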