diff --git a/docs/docs/index.md b/docs/docs/index.md index bc17b4db1..e8fea159d 100644 --- a/docs/docs/index.md +++ b/docs/docs/index.md @@ -1 +1,41 @@ # What is streams-bootstrap? + +`streams-bootstrap` is a Java library that standardizes the development and operation of Kafka-based applications (Kafka +Streams and plain Kafka clients). + +The framework supports Apache Kafka 4.1 and Java 17. Its modules are published to Maven Central for straightforward +integration into existing projects. + +## Why use it? + +Kafka Streams and the core Kafka clients provide strong primitives for stream processing and messaging, but they do not +prescribe: + +- How to structure a full application around those primitives +- How to configure applications consistently +- How to deploy and operate these services on Kubernetes +- How to perform repeatable reprocessing and cleanup +- How to handle errors and large messages uniformly + +`streams-bootstrap` addresses these aspects by supplying: + +1. **Standardized base classes** for Kafka Streams and client applications. +2. **A common CLI/configuration contract** for all Kafka applications. +3. **Helm-based deployment templates** and conventions for Kubernetes. +4. **Built-in reset/clean workflows** for reprocessing and state management. +5. **Consistent error-handling** and dead-letter integration. +6. **Testing infrastructure** for local development and CI environments. +7. **Optional blob-storage-backed serialization** for large messages. + +## Architecture + +The framework uses a modular architecture with a clear separation of concerns. + +### Core Modules + +- `streams-bootstrap-core`: Core abstractions for application lifecycle, execution, and cleanup +- `streams-bootstrap-cli`: CLI framework based on `picocli` +- `streams-bootstrap-test`: Utilities for testing streams-bootstrap applications +- `streams-bootstrap-large-messages`: Support for handling large Kafka messages +- `streams-bootstrap-cli-test`: Test support for CLI-based applications + diff --git a/docs/docs/user/concepts/common.md b/docs/docs/user/concepts/common.md index 4361c36f3..b2634d8d4 100644 --- a/docs/docs/user/concepts/common.md +++ b/docs/docs/user/concepts/common.md @@ -2,15 +2,185 @@ ## Application types -- App -- ConfiguredApp -- ExecutableApp +In streams-bootstrap, there are three application types: + +- **App** +- **ConfiguredApp** +- **ExecutableApp** + +--- + +### App + +The **App** represents your application logic implementation. Each application type has its own `App` interface: + +- **StreamsApp** – for Kafka Streams applications +- **ProducerApp** – for producer applications +- **ConsumerApp** – for consumer applications +- **ConsumerProducerApp** – for consumer–producer applications + +You implement the appropriate interface to define your application's behavior. + +--- + +### ConfiguredApp + +A **ConfiguredApp** pairs an `App` with its configuration. Examples include: + +- `ConfiguredConsumerApp` +- `ConfiguredProducerApp` + +This layer handles Kafka property creation, combining: + +- base configuration +- app-specific configuration +- environment variables +- runtime configuration + +--- + +### ExecutableApp + +An **ExecutableApp** is a `ConfiguredApp` with runtime configuration applied, making it ready to execute. +It can create: + +- a **Runner** for running the application +- a **CleanUpRunner** for cleanup operations + +--- + +### Usage Pattern + +1. You implement an **App**. +2. The system wraps it in a **ConfiguredApp**, applying the configuration. +3. 
Runtime configuration is then applied to create an **ExecutableApp**, which can be:
+
+- **run**, or
+- **cleaned up**.
+
+---
 
 ## Application lifecycle
 
-- Running an application
-- Cleaning an application
+Applications built with streams-bootstrap follow a defined lifecycle with specific states and transitions.
+
+The lifecycle is managed through the `KafkaApplication` base class and provides several extension points for
+customization.
+
+| Phase          | Description                                                               | Entry Point                                              |
+|----------------|---------------------------------------------------------------------------|----------------------------------------------------------|
+| Initialization | Parse CLI arguments, inject environment variables, configure application | `startApplication()` or `startApplicationWithoutExit()`  |
+| Preparation    | Execute pre-run/pre-clean hooks                                           | `onApplicationStart()`, `prepareRun()`, `prepareClean()` |
+| Execution      | Run main application logic or cleanup operations                          | `run()`, `clean()`, `reset()`                            |
+| Shutdown       | Stop runners, close resources, clean up                                   | `stop()`, `close()`                                      |
+
+### Running an application
+
+Applications built with streams-bootstrap can be started in two primary ways:
+
+- **Via Command Line Interface**: When packaged as a runnable JAR (for example, in a container),
+  the `run` command is the default entrypoint. An example invocation:
+
+  ```bash
+  java -jar example-app.jar \
+    run \
+    --bootstrap-servers kafka:9092 \
+    --input-topics input-topic \
+    --output-topic output-topic \
+    --schema-registry-url http://schema-registry:8081
+  ```
+
+- **Programmatically**: The application subclass calls `startApplication(args)` on startup. Example for a Kafka Streams
+  application:
+
+```java
+public static void main(final String[] args) {
+    new MyStreamsApplication().startApplication(args);
+}
+```
+
+### Cleaning an application
+
+A built-in mechanism is provided to clean up all resources associated with an application.
+
+When the cleanup operation is triggered, the following resources are removed:
+
+| Resource Type       | Description                                                | Streams Apps | Producer Apps | Consumer Apps | Consumer-Producer Apps |
+|---------------------|------------------------------------------------------------|--------------|---------------|---------------|------------------------|
+| Output Topics       | The main output topic of the application                   | ✓            | ✓             | N/A           | ✓                      |
+| Intermediate Topics | Topics for stream operations like `through()`              | ✓            | N/A           | N/A           | N/A                    |
+| Internal Topics     | Topics for state stores or repartitioning (Kafka Streams)  | ✓            | N/A           | N/A           | N/A                    |
+| Consumer Groups     | Consumer group metadata                                     | ✓            | N/A           | ✓             | ✓                      |
+| Schema Registry     | All registered schemas                                      | ✓            | ✓             | ✓             | ✓                      |
+
+Cleanup can be triggered:
+
+- **Via Command Line**: the `clean` subcommand (this is also what the Helm cleanup jobs invoke)
+- **Programmatically**:
+
+```java
+// For streams applications
+try (StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()) {
+    cleanUpRunner.clean();
+}
+
+// For producer applications
+try (CleanUpRunner cleanUpRunner = producerApp.createCleanUpRunner()) {
+    cleanUpRunner.clean();
+}
+```
+
+Cleanup operations are idempotent, meaning they can be safely retried without causing additional issues.
 
 ## Configuration
 
+Kafka properties are applied in the following order (later values override earlier ones):
+
+1. Base configuration
+2. App configuration from `createKafkaProperties()`
+3. Environment variables (`KAFKA_` prefix)
+4. Runtime arguments (`--bootstrap-servers`, etc.)
+5. 
Serialization config from ProducerApp.defaultSerializationConfig() or StreamsApp.defaultSerializationConfig() +6. CLI overrides via --kafka-config + +Environment variables with the `APP_ prefix` (configurable via `ENV_PREFIX`) are automatically parsed. +Environment variables are converted to CLI arguments: + +```text +APP_BOOTSTRAP_SERVERS → --bootstrap-servers +APP_SCHEMA_REGISTRY_URL → --schema-registry-url +APP_OUTPUT_TOPIC → --output-topic +``` + +Additionally, Kafka-specific environment variables with the `KAFKA_` prefix are automatically added to the Kafka +configuration. + +### Schema Registry integration + +When the `--schema-registry-url` option is provided: + +- Schemas are registered automatically during application startup +- Schema cleanup is handled as part of the `clean` command +- Schema evolution is fully supported + ## Command line interface + +A unified command-line interface is provided for application configuration. + +### CLI Commands + +- `run`: Run the application +- `clean`: Delete topics and consumer groups +- `reset`: Reset internal state and offsets (for Streams apps) + +### Common CLI Configuration Options + +- `--bootstrap-servers`: Kafka bootstrap servers (required) +- `--schema-registry-url`: URL for Avro serialization +- `--kafka-config`: Key-value Kafka configuration diff --git a/docs/docs/user/concepts/consumer-producer.md b/docs/docs/user/concepts/consumer-producer.md new file mode 100644 index 000000000..e69de29bb diff --git a/docs/docs/user/concepts/consumer.md b/docs/docs/user/concepts/consumer.md new file mode 100644 index 000000000..e69de29bb diff --git a/docs/docs/user/concepts/producer.md b/docs/docs/user/concepts/producer.md index 307bbcaf2..fa8d04af3 100644 --- a/docs/docs/user/concepts/producer.md +++ b/docs/docs/user/concepts/producer.md @@ -1,15 +1,155 @@ -# Producer apps +# Producer applications -Producer apps are applications that generate data and send it to a Kafka topic. -They can be used to produce messages from various sources, such as databases, files, or real-time events. +Producer applications generate data and send it to Kafka topics. They can be used to produce messages from various +sources, such as databases, files, or real-time events. + +streams-bootstrap provides a structured way to build producer applications with consistent configuration handling, +command-line support, and lifecycle management. + +--- ## Application lifecycle -- Running an application -- Cleaning an application +### Running an application + +Producer applications are executed using the `ProducerRunner`, which runs the producer logic defined by the application. + +Unlike Kafka Streams applications, producer applications typically: + +- Run to completion and terminate automatically, or +- Run continuously when implemented as long-lived services + +The execution model is fully controlled by the producer implementation and its runnable logic. + +--- + +### Cleaning an application + +Producer applications support a dedicated `clean` command. + +```bash +java -jar my-producer-app.jar \ + --bootstrap-servers localhost:9092 \ + --output-topic my-topic \ + clean +``` + +The clean process can perform the following operations: + +- Delete output topics +- Delete registered schemas from Schema Registry +- Execute custom cleanup hooks defined by the application + +Applications can register custom cleanup logic by overriding `setupCleanUp`. 
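+
+Cleanup can also be triggered programmatically, for example in tests or embedded setups. The following is a minimal
+sketch that mirrors the runner usage shown on the common concepts page; `producerApp` stands for an executable
+producer app created elsewhere in the surrounding code.
+
+```java
+// Sketch: triggering cleanup programmatically for a producer application.
+try (CleanUpRunner cleanUpRunner = producerApp.createCleanUpRunner()) {
+    // Deletes output topics and registered schemas, then runs any registered cleanup hooks.
+    cleanUpRunner.clean();
+}
+```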
+ +--- ## Configuration +### Serialization configuration + +Producer applications define key and value serialization using the `defaultSerializationConfig()` method in their +`ProducerApp` implementation. + +```java + +@Override +public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, SpecificAvroSerializer.class); +} +``` + +### Kafka properties + +#### Base configuration + +The following Kafka properties are configured by default for Producer applications in streams-bootstrap: + +- `max.in.flight.requests.per.connection = 1` +- `acks = all` +- `compression.type = gzip` + +#### Custom Kafka properties + +Kafka configuration can be customized by overriding `createKafkaProperties()`: + +```java +@Override +public Map createKafkaProperties() { + return Map.of( + ProducerConfig.RETRIES_CONFIG, 3, + ProducerConfig.BATCH_SIZE_CONFIG, 16384, + ProducerConfig.LINGER_MS_CONFIG, 5 + ); +} +``` + +These properties are merged with defaults and CLI-provided configuration. + +--- + +### Lifecycle hooks + +Producer applications can register cleanup logic via `setupCleanUp`. This method allows you to attach: + +- **Cleanup hooks** – for general cleanup logic not tied to Kafka topics +- **Topic hooks** – for reacting to topic lifecycle events (e.g. deletion) + +#### Clean up + +Custom cleanup logic that is not tied to Kafka topics can be registered via cleanup hooks: + +```java + +@Override +public ProducerCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return ProducerApp.super.setupCleanUp(configuration) + .registerCleanHook(() -> { + // Custom cleanup logic + }); +} +``` + +#### Topic hooks + +Topic hooks should be used for topic-related cleanup or side effects, such as releasing external +resources associated with a topic or logging topic deletions: + +```java +@Override +public ProducerCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return ProducerApp.super.setupCleanUp(configuration) + .registerTopicHook(new TopicHook() { + + @Override + public void deleted(final String topic) { + // Called when a managed topic is deleted + System.out.println("Deleted topic: " + topic); + } + + @Override + public void close() { + // Optional cleanup for the hook itself + } + }); +} +``` + ## Command line interface +Producer applications inherit standard CLI options from `KafkaApplication`. The following CLI options are producer-specific: + +| Option | Description | Default | +|---------------------------|--------------------------------------------------|---------| +| `--output-topic` | Default output topic | - | +| `--labeled-output-topics` | Named output topics (`label1=topic1,...`) | - | + +--- + ## Deployment + +TODO diff --git a/docs/docs/user/concepts/streams.md b/docs/docs/user/concepts/streams.md index 0924695cf..7a601e900 100644 --- a/docs/docs/user/concepts/streams.md +++ b/docs/docs/user/concepts/streams.md @@ -1,36 +1,267 @@ -# Streams apps +# Streams applications Streams apps are applications that process data in real-time as it flows through Kafka topics. They can be used to filter, transform, aggregate, or enrich data streams. Streams apps can also produce new messages to other topics based on the processed data. 
+---
+
 ## Application lifecycle
 
-- Running an application
-- Resetting an application
-- Cleaning an application
+### Running an application
+
+Kafka Streams applications are started via the `KafkaStreamsApplication` entry point:
+
+```java
+public static void main(final String[] args) {
+    new MyStreamsApplication().startApplication(args);
+}
+```
+
+When an application is started, the following steps are performed:
+
+- Parse command-line arguments and environment variables
+- Create a `StreamsApp` instance
+- Wrap it in a `ConfiguredStreamsApp`
+- Convert it into an `ExecutableStreamsApp`
+- Start execution using the `StreamsRunner`
+
+---
+
+### Resetting an application
+
+Streams applications support a dedicated `reset` operation that clears processing state while preserving the
+application definition and configuration. This is useful for reprocessing input data from the beginning.
+
+When a reset is triggered, the following resources are affected:
+
+| Resource         | Action                                     |
+|------------------|--------------------------------------------|
+| State stores     | Cleared locally, changelog topics deleted  |
+| Internal topics  | Deleted (e.g. repartition topics)          |
+| Consumer offsets | Reset to earliest for input topics         |
+| Output topics    | Preserved                                  |
+
+Triggering a reset via CLI:
+
+```bash
+java -jar my-streams-app.jar reset
+```
+
+Triggering a reset programmatically:
+
+```java
+try (StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()) {
+    cleanUpRunner.reset();
+}
+```
+
+After a reset, the application can be started again and will reprocess all input data.
+
+---
+
+### Cleaning an application
+
+The `clean` command performs everything that `reset` does and additionally removes the Kafka consumer groups and output
+topics created by the application. 
+ +```bash +java -jar my-streams-app.jar clean +``` + +--- ## Configuration -- Topics -- Application id -- Kafka properties -- Lifecycle hooks - - Setup - - Clean up -- Execution options - - On start - - Application server - - State listener - - Uncaught exception handler - - Closing options +### Topics + +Streams applications support flexible topic configuration: + +- `--input-topics`: Comma-separated list of input topics +- `--input-pattern`: Regex pattern for input topics +- `--output-topic`: Default output topic +- `--error-topic`: Topic for error records +- `--labeled-input-topics`: Named input topics with different message types +- `--labeled-input-patterns`: Additional labeled input topic patterns +- `--labeled-output-topics`: Named output topics with different message types + +--- + +### Application ID + +- `--application-id`: Unique Kafka Streams application ID + +--- + +### Kafka properties + +#### Base configuration + +The following Kafka properties are configured by default for Streams +applications in streams-bootstrap: + +- `processing.guarantee = exactly_once_v2` +- `producer.max.in.flight.requests.per.connection = 1` +- `producer.acks = all` +- `producer.compression.type = gzip` + +#### Custom Kafka properties + +Kafka configuration can be customized by overriding `createKafkaProperties()`: + +```java +@Override +public Map createKafkaProperties() { + return Map.of( + StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2, + StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4, + StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName() + ); +} +``` + +--- + +### Lifecycle hooks + +Streams applications support the following hook types: + +- **Cleanup hooks** – for general cleanup logic not tied to Kafka topics +- **Topic hooks** – for reacting to topic lifecycle events (e.g. deletion) +- **Reset hooks** – for logic that should run only during an application reset +- +#### Setup + +TODO + +#### Clean up + +Use cleanup hooks for logic that is not tied to Kafka topics, such as closing external resources +or cleaning up temporary state. + +```java +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return StreamsApp.super.setupCleanUp(configuration) + .registerCleanHook(() -> { + // Custom cleanup logic + }); +} +``` + +#### Topic hooks + +Topic hooks allow Kafka Streams applications to react to Kafka topic lifecycle events, such as topic +deletion during `clean` or `reset` operations. + +```java +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return StreamsApp.super.setupCleanUp(configuration) + .registerTopicHook(new TopicHook() { + @Override + public void deleted(final String topic) { + // Called when a managed topic is deleted + System.out.println("Deleted topic: " + topic); + } + @Override + public void close() { + // Optional cleanup for the hook itself + } + }); +} +``` +##### Reset hooks + +Reset hooks allow Kafka Streams applications to execute custom logic only during a reset operation. They are not invoked during a regular clean. 
+ +--- + +###### Example: reset hook registration (Streams) + +```java +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + return StreamsApp.super.setupCleanUp(configuration) + .registerResetHook(() -> { + // Custom logic executed only during reset + }); +} +``` + +--- + +### Execution options + +#### On start + +Custom logic can be executed once Kafka Streams has fully started: + +```java + +@Override +private void onStreamsStart(final RunningStreams runningStreams) { + // Custom startup logic +} +``` + +#### Application server + +TODO + +#### State listener + +TODO + +#### Uncaught exception handler + +TODO + +#### Closing options + +TODO + +--- ## Command line interface +Streams applications inherit standard CLI options from `KafkaStreamsApplication`. The following CLI options are streams-app-specific: + +| Option | Description | Default | +|--------------------------------|-------------------------------------------|----------------| +| `--application-id` | Kafka Streams application ID | Auto-generated | +| `--volatile-group-instance-id` | Use volatile group instance ID | false | + +--- + ## Deployment -## Kafka streams extensions +TODO + +--- + +## Kafka Streams extensions + +Several extensions are provided that simplify working with Kafka Streams. + +### Simple topic access + +TODO + +### Error handling + +TODO + +### Serde auto configuration + +TODO -- Simple topic access -- Error handling -- Serde auto configuration +--- diff --git a/docs/docs/user/deployment/kubernetes.md b/docs/docs/user/deployment/kubernetes.md index 87c833da9..68d22c88d 100644 --- a/docs/docs/user/deployment/kubernetes.md +++ b/docs/docs/user/deployment/kubernetes.md @@ -1,5 +1,209 @@ # Deployment to Kubernetes -- Autoscaling -- Monitoring -- Persistence +`streams-bootstrap` provides support for deploying applications to Kubernetes using Helm +charts. The charts cover Kafka Streams, producer, consumer, and producer-consumer applications and offer standardized +solutions for autoscaling, monitoring, and state persistence. 
+ +--- + +## Core capabilities + +- **Autoscaling** – KEDA-based horizontal scaling driven by Kafka consumer lag +- **Monitoring** – JMX metrics export with Prometheus integration +- **Persistence** – Persistent volumes for Kafka Streams state stores + +--- + +## Helm charts + +A set of Helm charts is shipped, tailored to different application types: + +| Chart name | Purpose | Kubernetes workload types | +|------------------------|-----------------------------------------------|--------------------------------| +| `streams-app` | Deploy Kafka Streams applications | `Deployment`, `StatefulSet` | +| `producer-app` | Deploy Kafka Producer applications | `Deployment`, `Job`, `CronJob` | +| `consumer-app` | Deploy Kafka Consumer applications | `Deployment`, `StatefulSet` | +| `consumerproducer-app` | Deploy batch / consumer–producer applications | `Deployment`, `StatefulSet` | +| `*-cleanup-job` | Clean Kafka resources before deployment | `Job` (Helm hooks) | + +--- + +## Chart repository and installation + +The Helm charts are published as a Helm repository: + +```bash +helm repo add streams-bootstrap https://bakdata.github.io/streams-bootstrap/ +helm repo update +``` + +A Streams application can then be installed with: + +```bash +helm install my-app bakdata-common/streams-app --values my-values.yaml +``` + +--- + +## Deployment patterns + +### Streams, consumer and consumer–producer applications + +Streams, consumer and consumer–producer applications support both stateless and stateful deployment modes: + +- **Deployment** + - Used for stateless applications or when state is stored externally + - Enabled when `statefulSet: false` or `persistence.enabled: false` + +- **StatefulSet** + - Used for stateful Kafka Streams applications with local state stores + - Enabled when `statefulSet: true` + - Required when `persistence.enabled: true` + - Each pod receives a dedicated `PersistentVolumeClaim` for RocksDB state + +--- + +### Producer applications + +Producer applications support multiple execution modes depending on workload characteristics: + +- **Deployment** + - Used for long-running or continuous producers + - Enabled when `deployment: true` + - Supports horizontal scaling via `replicaCount` + +- **Job** + - Used for one-time runs or backfills + - Default when `deployment: false` and no `schedule` is provided + - Supports `restartPolicy`, `backoffLimit`, and `ttlSecondsAfterFinished` + +- **CronJob** + - Used for scheduled, periodic execution + - Enabled when a cron expression is provided via `schedule` + - Supports `suspend`, `successfulJobsHistoryLimit`, and `failedJobsHistoryLimit` + + +--- + +### Cleanup jobs + +Cleanup charts are executed as Helm hook Jobs: + +- Run as `pre-install` or `pre-upgrade` hooks +- Remove: + - Kafka topics + - Consumer groups + - Schema Registry subjects + +This ensures a clean starting point for reprocessing or redeployment scenarios. 
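+
+As an illustrative sketch, a cleanup job for a Streams application can be installed alongside the deployment, reusing
+the same values file; the release name is arbitrary and the chart name is assumed to follow the `*-cleanup-job`
+pattern listed in the chart table above.
+
+```bash
+# Illustrative only: run the cleanup hook Job with the same values as the application release.
+helm install my-app-clean streams-bootstrap/streams-app-cleanup-job --values my-values.yaml
+
+# Remove the cleanup release once the hook Job has completed.
+helm uninstall my-app-clean
+```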
+ +--- + +## Configuration structure +TODO + +--- + +## Environment variable mapping + +Helm values are translated into environment variables using a configurable prefix: + +```yaml +configurationEnvPrefix: "APP" + +commandLine: + MY_PARAM: "value" +kafka: + inputTopics: [ "input" ] + outputTopic: "output" +``` + +This results in: + +- `APP_MY_PARAM=value` +- `APP_INPUT_TOPICS=input` +- `APP_OUTPUT_TOPIC=output` + +Kafka client configuration uses the `KAFKA_` prefix: + +```yaml +kafka: + config: + max.poll.records: 500 +``` + +Becomes: + +- `KAFKA_MAX_POLL_RECORDS=500` + +--- + +## Autoscaling + +Autoscaling is implemented using Kubernetes Event-Driven Autoscaling (KEDA). When enabled, KEDA monitors Kafka consumer +lag and adjusts the number of replicas accordingly. + +Autoscaling is disabled by default. + +### Enabling autoscaling + +```yaml +autoscaling: + enabled: true + lagThreshold: "1000" + minReplicas: 0 + maxReplicas: 5 +``` + +When enabled, the chart creates a KEDA `ScaledObject` and omits a fixed `replicaCount` from the workload specification. + +### Scaling behavior + +KEDA computes the desired number of replicas as: + +``` +desiredReplicas = ceil(totalLag / lagThreshold) +``` + +subject to `minReplicas` and `maxReplicas`. + +### Integration with persistence + +When persistence is enabled for Streams applications, autoscaling targets a `StatefulSet`. Each replica receives its own +`PersistentVolumeClaim`. + +> **Note:** Scale-down operations remove pods and their PVCs. Backup and recovery strategies should be considered. + +--- + +## Monitoring + +Monitoring is based on JMX metrics and Prometheus scraping: + +- `jmx.enabled: true` enables Kafka client and Streams metrics +- `prometheus.jmx.enabled: true` adds a Prometheus JMX exporter sidecar +- Metrics are exposed on a dedicated `/metrics` endpoint + +Collected metrics include consumer lag, processing rates, and RocksDB statistics. + +--- + +## Persistence + +Persistence is configured via the `persistence.*` section (Streams applications only): + +```yaml +persistence: + enabled: true + size: 1Gi + storageClassName: standard +``` + +When enabled together with `statefulSet: true`, each pod receives a dedicated volume for local state storage. This +enables: + +- Faster restarts due to warm state +- Improved recovery semantics for stateful topologies + +If persistence is disabled, applications behave as stateless deployments and rely on Kafka changelogs for state +reconstruction. diff --git a/docs/docs/user/deployment/local.md b/docs/docs/user/deployment/local.md index 74626da4f..47edb9a21 100644 --- a/docs/docs/user/deployment/local.md +++ b/docs/docs/user/deployment/local.md @@ -1 +1,47 @@ # Local deployment + +Applications can be run locally for development and testing purposes. This can be done programmatically within your +code. + +### Programmatic Local Execution + +Here is an example of how to run a producer application programmatically. This is useful for simple applications or for +testing. + +```java +try(final KafkaProducerApplication app = new SimpleKafkaProducerApplication<>(() -> + new ProducerApp() { + @Override + public ProducerRunnable buildRunnable(final ProducerBuilder builder) { + return () -> { + try (final Producer producer = builder.createProducer()) { + // Producer logic + } + }; + } + + @Override + public SerializerConfig defaultSerializationConfig() { + return new SerializerConfig(StringSerializer.class, StringSerializer.class); + } + } +)){ + app. + +setBootstrapServers("localhost:9092"); + app. 
+ +setOutputTopic("output-topic"); + app. + +run(); +} +``` + +### Command Line Execution + +You can also run the application from the command line by packaging it as a JAR file. + +```bash +java -jar my-producer-app.jar --bootstrap-servers localhost:9092 --output-topic my-topic run +``` diff --git a/docs/docs/user/extensions/large-messages.md b/docs/docs/user/extensions/large-messages.md index 31ce42dec..19beffc3d 100644 --- a/docs/docs/user/extensions/large-messages.md +++ b/docs/docs/user/extensions/large-messages.md @@ -1 +1,58 @@ # Large messages + +## Overview + +The Large Messages extension adds support for handling messages that exceed Kafka's size limitations by using +external storage mechanisms with automatic cleanup. +It integrates with *streams-bootstrap* to transparently manage: + +- large message serialization +- large message deserialization +- blob storage files cleanup + +For more details, see the large messages +module: [streams-bootstrap-large-messages GitHub repository](https://github.com/bakdata/streams-bootstrap/tree/master/streams-bootstrap-large-messages) + +There are two supported ways to enable cleanup for large messages: + +- Implement `LargeMessageStreamsApp` +- Register a topic cleanup hook manually + +--- + +### Option 1: Implement `LargeMessageStreamsApp` + +Use this option for Kafka Streams applications where large message cleanup should always run together with topic +cleanup. + +```java +public final class MyStreamsApp implements LargeMessageStreamsApp { + + @Override + public void buildTopology(final StreamsBuilderX builder) { + // build topology here + } +} +``` + +### Option 2: Register a cleanup hook manually + +If cleanup should only happen conditionally or requires custom behavior, a topic hook can be registered explicitly. + +```java +private final boolean largeMessageCleanupEnabled; + +@Override +public StreamsCleanUpConfiguration setupCleanUp( + final AppConfiguration configuration) { + + final StreamsCleanUpConfiguration cleanUp = + StreamsApp.super.setupCleanUp(configuration); + + if (this.largeMessageCleanupEnabled) { + LargeMessageAppUtils.registerTopicHook(cleanUp, configuration); + } + + return cleanUp; +} +``` diff --git a/docs/docs/user/getting-started/quick-start.md b/docs/docs/user/getting-started/quick-start.md index 754bef8a6..129c3098c 100644 --- a/docs/docs/user/getting-started/quick-start.md +++ b/docs/docs/user/getting-started/quick-start.md @@ -1 +1,84 @@ -# Quick start +# Quick Start + +This page shows how to add `streams-bootstrap` to a project and how to create and run a minimal application. + +## Prerequisites + +- Java 17 +- Apache Kafka cluster (brokers reachable from the application) +- `streams-bootstrap-cli` dependency (see [Setup](setup.md) for Gradle/Maven snippets) + +## Minimal Kafka Streams Application + +Create a subclass of `KafkaStreamsApplication` and implement the required methods. 
+ +```java +import com.bakdata.kafka.streams.KafkaStreamsApplication; +import com.bakdata.kafka.streams.SerdeConfig; +import com.bakdata.kafka.streams.StreamsApp; +import com.bakdata.kafka.streams.StreamsTopicConfig; +import com.bakdata.kafka.streams.kstream.KStreamX; +import com.bakdata.kafka.streams.kstream.StreamsBuilderX; +import java.util.Map; +import org.apache.kafka.common.serialization.Serdes.StringSerde; + +public class MyStreamsApplication extends KafkaStreamsApplication { + + public static void main(final String[] args) { + new MyStreamsApplication().startApplication(args); + } + + @Override + public StreamsApp createApp() { + return new StreamsApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KStreamX input = builder.streamInput(); + // topology definition + input.toOutputTopic(); + } + + @Override + public String getUniqueAppId(final StreamsTopicConfig topics) { + return "streams-bootstrap-app-" + topics.getOutputTopic(); + } + + @Override + public SerdeConfig defaultSerializationConfig() { + return new SerdeConfig(StringSerde.class, StringSerde.class); + } + }; + } +} +``` + +## Running the Application + +### Via Command Line Interface + +When packaged as a runnable JAR (for example, in a container), the `run` command is the default entrypoint: + +```bash +java -jar my-streams-app.jar \ + run \ + --bootstrap-servers kafka:9092 \ + --input-topics input-topic \ + --output-topic output-topic \ + --schema-registry-url http://schema-registry:8081 +``` + +Additional subcommands such as `clean` and `reset` are available for lifecycle management. + +### From the `main` Method + +In the `main` method, the application subclass starts up via: + +```java +public static void main(final String[] args) { + new MyStreamsApplication().startApplication(args); +} +``` + +This delegates configuration loading, lifecycle handling, and shutdown to `streams-bootstrap`. + +--- diff --git a/docs/docs/user/getting-started/setup.md b/docs/docs/user/getting-started/setup.md index feae8cb57..95eba5ce6 100644 --- a/docs/docs/user/getting-started/setup.md +++ b/docs/docs/user/getting-started/setup.md @@ -1 +1,32 @@ # Setup + +This page describes dependency setup, configuration options, commands, and Helm-based deployment for +`streams-bootstrap`. + +## Dependencies + +### Gradle + +```gradle +implementation group: 'com.bakdata.kafka', name: 'streams-bootstrap-cli', version: '6.1.0' +``` + +With Kotlin DSL: + +```gradle +implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = "6.1.0") +``` + +### Maven + +```xml + + + com.bakdata.kafka + streams-bootstrap-cli + 6.1.0 + +``` + +For other build tools or versions, refer to the +[latest version in MvnRepository](https://mvnrepository.com/artifact/com.bakdata.kafka/streams-bootstrap/latest). diff --git a/docs/docs/user/monitoring.md b/docs/docs/user/monitoring.md index 31b5a7045..94e83e4a3 100644 --- a/docs/docs/user/monitoring.md +++ b/docs/docs/user/monitoring.md @@ -1 +1,337 @@ # Monitoring + +Monitoring features are provided for your applications. + +- **JMX Metrics Export**: Applications built with `streams-bootstrap` can expose JMX (Java Management Extensions) + metrics, which provide insights into the performance and health of the Java application and the Kafka clients. +- **Prometheus Integration**: The Helm charts are configured to work with Prometheus, a popular open-source monitoring + and alerting toolkit. 
This allows you to scrape the JMX metrics and visualize them in dashboards (e.g., using + Grafana). + +## Monitoring and Observability + +The Helm charts provide integrated monitoring and observability for Kafka applications using a combination of +JMX, Prometheus, Kubernetes probes, and Services. Monitoring can be tailored from lightweight setups for development +to full production stacks with Prometheus Operator. + +### Monitoring Mechanisms + +| Mechanism | Use Case | Key Values | +|-------------------------|-------------------------------------|--------------------------| +| JMX remote access | Direct debugging and inspection | `jmx.enabled` | +| Prometheus JMX exporter | Production metrics collection | `prometheus.jmx.enabled` | +| Liveness probes | Container health checks | `livenessProbe` | +| Readiness probes | Traffic readiness / rollout control | `readinessProbe` | + +### JMX Configuration + +JMX (Java Management Extensions) provides direct access to application metrics and management operations, typically +used for development and debugging. + +Enable JMX in `values.yaml`: + +```yaml +jmx: + enabled: true + port: 5555 + host: localhost +``` + +Parameters: + +- `jmx.enabled`: Enable JMX port for remote access (default: `false`). +- `jmx.port`: JMX port number (default: `5555`). +- `jmx.host`: Host binding for the RMI server (default: `localhost`). + +When enabled, the chart configures the JVM with flags similar to: + +```text +-Dcom.sun.management.jmxremote +-Dcom.sun.management.jmxremote.port=5555 +-Dcom.sun.management.jmxremote.local.only=false +-Dcom.sun.management.jmxremote.authenticate=false +-Dcom.sun.management.jmxremote.ssl=false +-Djava.rmi.server.hostname=localhost +``` + +Accessing JMX metrics from a local client: + +```bash +kubectl port-forward 5555:5555 +jconsole localhost:5555 +``` + +### Prometheus JMX Exporter + +For production monitoring, the Prometheus JMX Exporter runs as a sidecar container that scrapes JMX metrics from the +application and exposes them in Prometheus format. + +Enable the exporter in `values.yaml`: + +```yaml +prometheus: + jmx: + enabled: true + image: bitnami/jmx-exporter + imageTag: 1.1.0 + imagePullPolicy: Always + port: 5556 + metricRules: + - pattern: ".*" + resources: + requests: + cpu: 10m + memory: 100Mi + limits: + cpu: 100m + memory: 100Mi +``` + +Key parameters: + +- `prometheus.jmx.enabled`: Deploy JMX exporter sidecar (default: `false`). +- `prometheus.jmx.image`: Container image for the exporter (default: `bitnami/jmx-exporter`). +- `prometheus.jmx.imageTag`: Exporter image tag (default: `1.1.0`). +- `prometheus.jmx.port`: HTTP port for metrics endpoint (default: `5556`). +- `prometheus.jmx.metricRules`: JMX metric selection and mapping rules. +- `prometheus.jmx.resources`: Resource requests/limits for the exporter container. + +#### Metric Rules + +The `metricRules` section configures which JMX beans are exposed and how they are mapped to Prometheus metrics. The +default configuration uses `pattern: ".*"` to export all metrics, but production setups should restrict this to +relevant Kafka Streams/producer/consumer metrics. + +Example rule set: + +```yaml +prometheus: + jmx: + metricRules: + - pattern: "kafka.streams<>(.+):" + name: kafka_streams_$1_$5 + labels: + client_id: "$2" + $3: "$4" + - pattern: "kafka.producer<>(.+):" + name: kafka_producer_$1_$3 + labels: + client_id: "$2" +``` + +A ConfigMap containing the JMX exporter configuration is generated automatically by the chart and mounted into the +sidecar container. 
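+
+To check which metrics a rule set actually exposes, the exporter endpoint can be queried directly. A quick sketch,
+using the default exporter port from above; the pod name is a placeholder and the metric prefix matches the example
+rules:
+
+```bash
+# Forward the JMX exporter port of one application pod and list the exported Kafka Streams metrics.
+kubectl port-forward <pod-name> 5556:5556
+curl -s http://localhost:5556/metrics | grep '^kafka_streams_'
+```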
+ +### Prometheus Integration + +#### Pod annotations + +For Prometheus instances that use pod annotations for discovery: + +```yaml +podAnnotations: + prometheus.io/scrape: "true" + prometheus.io/path: "/metrics" + prometheus.io/port: "5556" +``` + +This enables scraping the JMX exporter endpoint exposed on `prometheus.jmx.port`. + +### PodMonitor + +For more advanced Prometheus Operator setups, a `PodMonitor` custom resource can be deployed. + +The `streams-bootstrap` repository provides a reference `PodMonitor` +configuration: [monitoring/pod_monitor.yaml](https://github.com/bakdata/streams-bootstrap/blob/1ff01c2f/monitoring/pod_monitor.yaml) + +### Health Checks + +Kubernetes uses liveness and readiness probes to determine when a pod is healthy and when it is ready to receive +traffic. + +**Liveness probes** restart containers that become unhealthy: + +```yaml +livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 +``` + +**Readiness probes** gate traffic until the application is ready: + +```yaml +readinessProbe: + httpGet: + path: /ready + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 5 + timeoutSeconds: 3 + successThreshold: 1 + failureThreshold: 3 +``` + +All standard Kubernetes probe types are supported (`httpGet`, `tcpSocket`, `exec`, and `grpc` on recent Kubernetes +versions). Probes are configured under the corresponding `livenessProbe` and `readinessProbe` sections in values. + +### Service and Port Configuration + +Ports and Services control how HTTP APIs and metrics endpoints are exposed: + +```yaml +service: + enabled: true + type: ClusterIP + +ports: + - containerPort: 8080 + name: http + protocol: TCP + servicePort: 80 + - containerPort: 5556 + name: metrics + protocol: TCP + servicePort: 5556 +``` + +Port mapping reference: + +- `jmx.port` → JMX remote port (default `5555`). +- `prometheus.jmx.port` → JMX exporter metrics port (default `5556`). +- Additional entries in `ports[]` → application-specific ports (e.g. HTTP APIs, custom metrics endpoints). 
+ +### Monitoring Configuration Examples + +**Full monitoring stack** (JMX exporter, probes, Service, annotations): + +```yaml +prometheus: + jmx: + enabled: true + port: 5556 + metricRules: + - pattern: "kafka.streams<>(.+):" + name: kafka_streams_$2 + labels: + client_id: "$1" + +readinessProbe: + httpGet: + path: /ready + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 10 + +livenessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 60 + periodSeconds: 30 + +service: + enabled: true + type: ClusterIP + +ports: + - containerPort: 8080 + name: http + servicePort: 80 + - containerPort: 5556 + name: metrics + servicePort: 5556 + +podAnnotations: + prometheus.io/scrape: "true" + prometheus.io/path: "/metrics" + prometheus.io/port: "5556" +``` + +**Development/debug configuration with JMX only**: + +```yaml +jmx: + enabled: true + port: 5555 + host: localhost + +prometheus: + jmx: + enabled: false + +livenessProbe: + tcpSocket: + port: 5555 + initialDelaySeconds: 30 + periodSeconds: 30 +``` + +**Minimal production configuration** with annotations and resource limits: + +```yaml +prometheus: + jmx: + enabled: true + resources: + requests: + cpu: 10m + memory: 100Mi + limits: + cpu: 100m + memory: 100Mi + +podAnnotations: + prometheus.io/scrape: "true" + prometheus.io/port: "5556" +``` + +### Application-Specific Considerations + +Kafka Streams applications expose metrics under several JMX domains, including `kafka.streams`, `kafka.producer`, and +`kafka.consumer`. Commonly monitored metrics include: + +- `kafka.streams.state`: Overall application state (running, rebalancing, error). +- `kafka.streams.commit-latency-avg`: Average commit latency. +- `kafka.consumer.records-lag-max`: Maximum records lag per partition. +- `kafka.producer.record-send-rate`: Producer throughput. + +Producer and consumer applications (via `producer-app` and `consumer-app` charts) use the same `prometheus.jmx` +structure +but may differ in availability patterns (for example, Jobs vs Deployments). + +### Troubleshooting + +**Common issues**: + +- No metrics endpoint: + - Ensure `prometheus.jmx.enabled: true`. +- Connection refused on JMX port: + - Ensure `jmx.enabled: true` and the port is exposed. +- Empty metrics response: + - Review `metricRules` patterns; overly restrictive rules may filter out all metrics. +- High exporter CPU usage: + - Avoid `pattern: ".*"` in production; use targeted patterns instead. +- Pod not ready: + - Validate liveness/readiness probe configuration and the corresponding application endpoints. + +**Verifying metrics export**: + +```bash +kubectl port-forward 5556:5556 +curl http://localhost:5556/metrics +``` + +**Debugging JMX connection**: + +```bash +kubectl port-forward 5555:5555 +jconsole localhost:5555 +``` + +If connection fails, verify that JMX is enabled, the port is mapped in `ports`, and the JVM has been started with the +correct JMX system properties. diff --git a/docs/docs/user/testing.md b/docs/docs/user/testing.md index f00b526a9..2608cc125 100644 --- a/docs/docs/user/testing.md +++ b/docs/docs/user/testing.md @@ -1 +1,327 @@ # Testing + +The `streams-bootstrap` testing tools provide utilities for testing Kafka Streams, Consumer, Producer and Consumer-Producer +applications, covering both unit-level and integration-style scenarios. 
+ +They abstract common test concerns such as Kafka infrastructure setup, Schema Registry integration, application +lifecycle handling, and consumer group verification, and are designed to work with real Kafka clusters as well as +schema-aware test environments. + +## TestApplicationRunner + +`TestApplicationRunner` is a test utility for running, configuring, and verifying Kafka applications in integration and system tests. + +It abstracts away repetitive setup such as: +- bootstrap servers +- Schema Registry +- Kafka client configuration +- CLI argument wiring +- lifecycle commands (`run`, `clean`, `reset`) + +Typical use cases: +- end-to-end tests +- containerized test environments +- embedded Kafka setups +- CI pipelines + +## Typical Usage + +```java +TestApplicationRunner runner = + TestApplicationRunner.create("localhost:9092") + .withSchemaRegistry() + .withStateDir(tempDir) + .withNoStateStoreCaching(); +``` + +All applications executed via this runner automatically inherit this configuration. + +Bootstrap Servers +- Passed via `--bootstrap-servers` +- Also set directly on the application instance + +Kafka Configuration +- All provided Kafka properties are injected +- Passed via `--kafka-config key=value` +- Also merged into `app.setKafkaConfig(...)` + +Schema Registry (optional) +- Passed via `--schema-registry-url` +- Only configured when explicitly enabled + +--- + +### Configuring Kafka for Tests + +```java +runner = runner.withKafkaConfig(Map.of("auto.offset.reset", "earliest")); +``` + +Behavior: +- merged with existing configuration +- immutable after creation +- overrides application defaults + +--- + +#### Kafka Streams–Specific Helpers + +##### Configure State Directory + +```java +runner = runner.withStateDir(tempDir); +``` + +Sets: +``` +state.dir = +``` + +Use this to: +- isolate test runs +- avoid state leakage between tests + +--- + +##### Disable State Store Caching + +```java +runner = runner.withNoStateStoreCaching(); +``` + +Sets: +``` +statestore.cache.max.bytes = 0 +``` + +Useful when: +- asserting exact record counts +- debugging processor behavior +- avoiding cache-related timing issues + +--- + +#### Consumer-Specific Helpers + +##### Configure Session Timeout + +```java +runner = runner.withSessionTimeout(Duration.ofSeconds(5)); +``` + +Sets: +``` +session.timeout.ms = 5000 +``` + +Useful for: +- fast consumer group rebalancing +- deterministic failure testing + +--- + +### Schema Registry Support + +#### Enable a Test Schema Registry + +```java +runner = runner.withSchemaRegistry(); +``` + +Creates: +- isolated in-memory Schema Registry +- random scope to avoid collisions +- transparent integration for the application + +--- + +#### Use a Custom TestSchemaRegistry + +```java +TestSchemaRegistry registry = new TestSchemaRegistry(); +runner = runner.withSchemaRegistry(registry); +``` + +Use this when: +- sharing schemas across applications +- inspecting registered schemas during tests + +--- + +### Running Applications + +#### CLI + +```java +CompletableFuture exitCode = + runner.run(app, "--some-flag"); +``` + +- invokes `startApplicationWithoutExit` +- returns application exit code + +--- + +#### Runnable + +```java +CompletableFuture execution = runner.run(app); +``` + +- calls `onApplicationStart()` +- runs application directly +- suitable for long-running tests + +--- + +### Cleaning and Resetting Applications + +#### Clean + +```java +runner.clean(app); +``` + +or + +```java +runner.clean(app, "--custom-arg"); +``` + +Used to: +- delete Kafka topics +- 
clean local state +- execute cleanup hooks + +--- + +#### Reset + +Supported for: +- Streams applications +- Consumer applications +- Consumer–Producer applications + +```java +runner.reset(streamsApp); +``` + +--- + +### Consumer Group Verification + +```java +ConsumerGroupVerifier verifier = runner.verify(streamsApp); +``` + +Allows you to: +- assert consumer group existence +- check stability +- inspect committed offsets + +--- + +### Creating Test Clients + +```java +KafkaTestClient client = runner.newTestClient(); +``` + +Provides: +- AdminClient access +- Producer/Consumer helpers +- runtime-aware configuration + +--- + +## TestApplicationTopologyFactory + +`TestApplicationTopologyFactory` is a test helper for Kafka Streams applications that integrates with Fluent Kafka Streams Tests. + +It allows you to: +- derive a `TestTopology` from a real application +- reuse production topology and configuration +- inject test-specific runtime settings + +--- + +### Typical Usage + +```java +TestApplicationTopologyFactory factory = + TestApplicationTopologyFactory.withSchemaRegistry(); +``` + +or without Schema Registry: + +```java +TestApplicationTopologyFactory factory = new TestApplicationTopologyFactory(); +``` + +--- + +### Schema Registry Support + +#### Automatic Schema Registry + +```java +TestApplicationTopologyFactory factory = + TestApplicationTopologyFactory.withSchemaRegistry(); +``` + +- random isolated scope +- no cross-test collisions +- safe for parallel execution + +--- + +#### Custom Schema Registry + +```java +TestSchemaRegistry registry = new TestSchemaRegistry(); +TestApplicationTopologyFactory factory = + TestApplicationTopologyFactory.withSchemaRegistry(registry); +``` + +--- + +### Modifying Kafka Configuration + +```java +factory = factory.with(Map.of("commit.interval.ms", 100)); +``` + +- merged into runtime configuration +- applies only to tests +- does not mutate application + +--- + +### Creating a TestTopology + +```java +TestTopology topology = factory.createTopology(app); +``` + +Execution flow: +1. application prepared +2. runtime configuration injected +3. topology extracted +4. `TestTopology` created + +--- + +### JUnit 5 Integration + +```java +TestTopologyExtension extension = factory.createTopologyExtension(app); +``` + +--- + +### Accessing Kafka Properties + +```java +Map props = factory.getKafkaProperties(app); +``` +--- diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml index f7b583021..1f1e55a32 100644 --- a/docs/mkdocs.yml +++ b/docs/mkdocs.yml @@ -31,6 +31,8 @@ nav: - Common concepts: user/concepts/common.md - Streams concepts: user/concepts/streams.md - Producer concepts: user/concepts/producer.md + - Consumer concepts: user/concepts/consumer.md + - Consumer-Producer concepts: user/concepts/consumer-producer.md - Testing: user/testing.md - Monitoring: user/monitoring.md - Extensions: