diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 084d408e..21c9b36f 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -31,6 +31,11 @@ repos:
hooks:
- id: flake8
language_version: python3.11
+ exclude: |
+ (?x)^(
+ flink_worker/.*/flink_worker_pb2\.py$|
+ flink_worker/.*/flink_worker_pb2_grpc\.py$
+ )$
- repo: https://github.com/pycqa/isort
rev: 6.0.0
hooks:
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 96ae581f..0a62c380 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -8,5 +8,5 @@
"mypy-type-checker.args": [
"--strict"
],
- "editor.formatOnSave": true
+ "editor.formatOnSave": true,
}
diff --git a/flink_bridge/.idea/compiler.xml b/flink_bridge/.idea/compiler.xml
new file mode 100644
index 00000000..d5708ca4
--- /dev/null
+++ b/flink_bridge/.idea/compiler.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink_bridge/.idea/encodings.xml b/flink_bridge/.idea/encodings.xml
new file mode 100644
index 00000000..d8a72150
--- /dev/null
+++ b/flink_bridge/.idea/encodings.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
diff --git a/flink_bridge/.idea/jarRepositories.xml b/flink_bridge/.idea/jarRepositories.xml
new file mode 100644
index 00000000..958889ad
--- /dev/null
+++ b/flink_bridge/.idea/jarRepositories.xml
@@ -0,0 +1,25 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/flink_bridge/.idea/misc.xml b/flink_bridge/.idea/misc.xml
new file mode 100644
index 00000000..ea03147b
--- /dev/null
+++ b/flink_bridge/.idea/misc.xml
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+
+
diff --git a/flink_bridge/.idea/vcs.xml b/flink_bridge/.idea/vcs.xml
new file mode 100644
index 00000000..54e4b961
--- /dev/null
+++ b/flink_bridge/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/flink_bridge/.idea/workspace.xml b/flink_bridge/.idea/workspace.xml
new file mode 100644
index 00000000..f9619bf1
--- /dev/null
+++ b/flink_bridge/.idea/workspace.xml
@@ -0,0 +1,57 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ {
+ "associatedIndex": 2
+}
+
+
+
+
+
+ {
+ "keyToString": {
+ "RunOnceActivity.ShowReadmeOnStart": "true",
+ "kotlin-language-version-configured": "true",
+ "last_opened_file_path": "/Users/filippopacifici/code/streams/flink_bridge/pom.xml"
+ }
+}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 1755554494736
+
+
+ 1755554494736
+
+
+
+
diff --git a/flink_bridge/CLI_README.md b/flink_bridge/CLI_README.md
new file mode 100644
index 00000000..bb899fa8
--- /dev/null
+++ b/flink_bridge/CLI_README.md
@@ -0,0 +1,184 @@
+# Flink Bridge gRPC CLI Client
+
+This is a simple Java CLI application that demonstrates the gRPC client functionality by communicating with the Python FlinkWorker gRPC service.
+
+## Features
+
+- Interactive command-line interface
+- Send custom messages to the gRPC service
+- Built-in test message functionality
+- Configurable host and port
+- Real-time message processing and response display
+
+## Prerequisites
+
+- Java 21 or later (matching the compiler settings in `pom.xml`)
+- Maven 3.6 or later
+- Python gRPC server running (see setup below)
+
+## Quick Start
+
+### 1. Start the Python gRPC Server
+
+First, start the Python gRPC server in a separate terminal:
+
+```bash
+cd flink_worker
+source .venv/bin/activate
+python -m flink_worker.server --port 50053
+```
+
+### 2. Run the CLI Application
+
+#### Option A: Using the provided script (recommended)
+
+```bash
+cd flink_bridge
+./run-cli.sh
+```
+
+This script will:
+- Compile the project
+- Build the CLI JAR
+- Run the application connecting to localhost:50053
+
+#### Option B: Manual compilation and execution
+
+```bash
+cd flink_bridge
+
+# Compile and build
+mvn clean package
+
+# Run the CLI application
+java -jar target/flink-bridge-cli.jar [host] [port]
+```
+
+### 3. Use the CLI
+
+Once running, you'll see an interactive prompt:
+
+```
+=== Flink Worker gRPC CLI Client ===
+Commands:
+  <message>  - Send a message to the gRPC service
+  help       - Show this help message
+  quit/exit  - Exit the application
+  test       - Send a test message
+
+grpc>
+```
+
+## CLI Commands
+
+- **Any text message**: Sends the message to the gRPC service
+- **`test`**: Sends a predefined test message
+- **`help`**: Shows available commands
+- **`quit` or `exit`**: Exits the application
+
+## Example Usage
+
+```
+grpc> Hello, this is a test message
+Response received:
+ - Payload: Hello, this is a test message
+ Headers: {source=cli, timestamp=1703123456789, message_id=0}
+ Timestamp: 1703123456789
+ - Payload: Hello, this is a test message
+ Headers: {source=cli, timestamp=1703123456789, message_id=0, processed=true, segment_id=0}
+ Timestamp: 1703123456789
+
+grpc> test
+Sending test message: This is a test message from the CLI client
+Response received:
+ - Payload: This is a test message from the CLI client
+ Headers: {source=cli, timestamp=1703123456790, message_id=1}
+ Timestamp: 1703123456790
+ - Payload: This is a test message from the CLI client
+ Headers: {source=cli, timestamp=1703123456790, message_id=1, processed=true, segment_id=1}
+ Timestamp: 1703123456790
+
+grpc> quit
+Goodbye!
+```
+
+## Configuration
+
+### Command Line Arguments
+
+- **Host**: First argument (default: `localhost`)
+- **Port**: Second argument (default: `50053`)
+
+Examples:
+```bash
+# Connect to localhost:50053 (default)
+java -jar target/flink-bridge-cli.jar
+
+# Connect to specific host and port
+java -jar target/flink-bridge-cli.jar 192.168.1.100 50051
+
+# Connect to specific host with default port
+java -jar target/flink-bridge-cli.jar my-server.com
+```
+
+## Architecture
+
+The CLI application uses the same `GrpcClient` class that the Flink application uses, demonstrating the following (a short sketch follows the list):
+
+1. **gRPC Communication**: Direct communication with the Python service
+2. **Message Processing**: Sending and receiving protobuf messages
+3. **Error Handling**: Graceful error handling and user feedback
+4. **Resource Management**: Proper cleanup of gRPC connections
+
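+For reference, a minimal sketch of that lifecycle, built from the `GrpcClient` methods shown in `GrpcClient.java` (the payload text and port here are illustrative):
+
+```java
+import flink_worker.FlinkWorker.Message;
+import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+GrpcClient client = new GrpcClient("localhost", 50053);
+try {
+    // Fail fast if the channel is not usable.
+    if (!client.isAvailable()) {
+        throw new IllegalStateException("gRPC service is not reachable");
+    }
+    // Build a protobuf Message and send it through segment 0.
+    Message message = Message.newBuilder()
+            .setPayload(ByteString.copyFrom("hello".getBytes(StandardCharsets.UTF_8)))
+            .setTimestamp(System.currentTimeMillis())
+            .build();
+    List<Message> replies = client.processMessage(message, 0);
+    replies.forEach(reply -> System.out.println(reply.getHeadersMap()));
+} finally {
+    client.shutdown(); // always release the channel
+}
+```
+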
+## Troubleshooting
+
+### Common Issues
+
+1. **"gRPC service is not available"**
+ - Make sure the Python server is running
+ - Check the host and port configuration
+ - Verify the server is accessible from your network
+
+2. **"Connection refused"**
+ - Check if the server is running on the specified port
+ - Verify firewall settings
+ - Ensure the server is binding to the correct interface
+
+3. **"Invalid port number"**
+ - Port must be a valid integer between 1 and 65535
+ - Check the command line arguments
+
+### Debug Mode
+
+To see more detailed logging, raise the slf4j-simple log level via its system property:
+
+```bash
+java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar target/flink-bridge-cli.jar
+```
+
+## Development
+
+### Building
+
+```bash
+mvn clean compile # Compile only
+mvn clean package # Compile and package
+mvn clean install # Compile, package, and install to local repo
+```
+
+### Project Structure
+
+- `GrpcCliApp.java` - Main CLI application
+- `GrpcClient.java` - gRPC client abstraction
+- `run-cli.sh` - Convenience script for running the CLI
+- `pom.xml` - Maven configuration with CLI build profile
+
+### Adding New Features
+
+The CLI application is designed to be easily extensible (see the sketch after this list). You can:
+
+1. Add new commands in the `runInteractiveCli` method
+2. Enhance message processing in the `sendMessage` method
+3. Add configuration options for different gRPC service features
+4. Implement batch processing or file input capabilities
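+
+For instance, a hypothetical `ping` command could be added next to the existing `test` handler in `runInteractiveCli` (a sketch; the command name is illustrative):
+
+```java
+// Inside the while loop of runInteractiveCli, alongside the other commands:
+if (input.equalsIgnoreCase("ping")) {
+    // Reuse the client's availability check instead of sending a real message.
+    System.out.println(client.isAvailable() ? "Service reachable" : "Service unavailable");
+    continue;
+}
+```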
diff --git a/flink_bridge/PIPELINE_PARSER_README.md b/flink_bridge/PIPELINE_PARSER_README.md
new file mode 100644
index 00000000..d0abc0a5
--- /dev/null
+++ b/flink_bridge/PIPELINE_PARSER_README.md
@@ -0,0 +1,125 @@
+# Pipeline Parser
+
+This module provides Java classes for parsing YAML pipeline configuration files produced by the FlinkPipelineAdapter.
+
+## Overview
+
+The pipeline parser consists of three main classes:
+- `Pipeline`: A POJO representing the parsed pipeline, including its sources, steps, and sinks
+- `PipelineStep`: A POJO representing a single pipeline step
+- `PipelineParser`: A utility class for parsing YAML files into `Pipeline` objects
+
+## YAML Format
+
+The parser expects a YAML map with `sources`, `steps`, and `sinks` sections; the `steps` section is a list of steps with the following structure:
+
+```yaml
+- config: {}
+  step_name: parser
+- config:
+    batch_size: 2
+    batch_timedelta: "18c0a70726f6a6"
+    seconds: 20
+  step_name: mybatchk
+```
+
+Each step must have:
+- `step_name`: A string identifying the step
+- `config`: A map containing configuration parameters (can be empty)
+
+## Usage
+
+### Basic Parsing
+
+```java
+import io.sentry.flink_bridge.Pipeline;
+import io.sentry.flink_bridge.PipelineParser;
+import io.sentry.flink_bridge.PipelineStep;
+import java.util.List;
+
+// Create parser
+PipelineParser parser = new PipelineParser();
+
+// Parse from file
+Pipeline pipeline = parser.parseFile("pipeline_config.yaml");
+List<PipelineStep> steps = pipeline.getSteps();
+
+// Parse from a string
+String yamlContent = "...";
+List<PipelineStep> stepsFromString = parser.parseString(yamlContent).getSteps();
+
+// Access step information
+for (PipelineStep step : steps) {
+ System.out.println("Step: " + step.getStepName());
+ System.out.println("Config: " + step.getConfig());
+}
+```
+
+### Working with PipelineStep Objects
+
+```java
+PipelineStep step = steps.get(0);
+
+// Get step name
+String stepName = step.getStepName();
+
+// Get configuration
+Map<String, Object> config = step.getConfig();
+
+// Access specific config values
+if (config.containsKey("batch_size")) {
+ Integer batchSize = (Integer) config.get("batch_size");
+ System.out.println("Batch size: " + batchSize);
+}
+```
+
+## Dependencies
+
+The parser requires the SnakeYAML library, which is already included in the project's `pom.xml`:
+
+```xml
+<dependency>
+    <groupId>org.yaml</groupId>
+    <artifactId>snakeyaml</artifactId>
+    <version>2.2</version>
+</dependency>
+```
+
+## Error Handling
+
+The parser provides comprehensive error handling:
+
+- `FileNotFoundException`: Thrown when the specified file doesn't exist
+- `RuntimeException`: Thrown for parsing errors, empty YAML, or other issues
+
+```java
+try {
+ Pipeline pipeline = parser.parseFile("config.yaml");
+ // Process pipeline.getSteps()...
+} catch (FileNotFoundException e) {
+ System.err.println("File not found: " + e.getMessage());
+} catch (RuntimeException e) {
+ System.err.println("Parsing error: " + e.getMessage());
+}
+```
+
+## Examples
+
+See `PipelineParserExample.java` for complete usage examples including:
+- File parsing
+- String parsing
+- Business logic processing
+
+## Testing
+
+Run the tests with:
+
+```bash
+mvn test -Dtest=PipelineParserTest
+```
+
+The tests verify:
+- Correct parsing of valid YAML
+- Error handling for invalid YAML
+- Edge cases like empty content
+
+## Integration with FlinkPipelineAdapter
+
+This parser is designed to work with the YAML output from the Python `FlinkPipelineAdapter` class. The adapter produces YAML descriptions that can be directly consumed by this Java parser, enabling seamless integration between Python pipeline generation and Java pipeline execution.
diff --git a/flink_bridge/README.md b/flink_bridge/README.md
new file mode 100644
index 00000000..c3db3c8e
--- /dev/null
+++ b/flink_bridge/README.md
@@ -0,0 +1,155 @@
+# Flink gRPC Bridge Application
+
+This is an Apache Flink application that demonstrates integration with a gRPC service for message processing. The application reads messages from a Kafka source defined in a YAML pipeline configuration and processes them using the `FlinkWorkerService` gRPC service.
+
+## Architecture
+
+The application consists of:
+- **FlinkGrpcApp**: Main Flink application that orchestrates the data flow
+- **GrpcMessageProcessor**: A process function that implements the `OneInputEventTimeStreamProcessFunction` pattern
+- **GrpcClient**: Client for communicating with the gRPC service
+- **Proto-generated classes**: Auto-generated from `flink_worker.proto`
+
+## Prerequisites
+
+- Java 11 or higher
+- Maven 3.6+
+- Apache Flink 2.1.0
+- The gRPC service must be running (see the `flink_worker` service)
+
+## Building the Application
+
+1. Navigate to the project directory:
+ ```bash
+ cd streams/flink_bridge
+ ```
+
+2. Build the project:
+ ```bash
+ mvn clean compile
+ ```
+
+ This will:
+ - Generate Java classes from the protobuf definition
+ - Compile the Java source code
+ - Create the JAR file
+
+3. Package the application:
+ ```bash
+ mvn package
+ ```
+
+## Running the Application
+
+### Prerequisites
+
+1. **Start the gRPC service** (from the `flink_worker` directory):
+ ```bash
+ cd streams/flink_worker
+ python -m flink_worker.server
+ ```
+
+ The service will start on port 50051 by default.
+
+2. **Start Flink cluster** (using the installed Flink 2.1.0):
+ ```bash
+ cd flink-2.1.0
+ ./bin/start-cluster.sh
+ ```
+
+### Submit the Job
+
+1. **Submit the Flink job**:
+ ```bash
+ cd flink-2.1.0
+ ./bin/flink run -c io.sentry.flink_bridge.FlinkGrpcApp \
+ ../streams/flink_bridge/target/flink-bridge-app.jar
+ ```
+
+2. **Monitor the job**:
+ - Open Flink Web UI: http://localhost:8081
+ - Check the job status and logs
+
+### Alternative: Run Locally
+
+You can also run the application locally for development/testing:
+
+```bash
+cd streams/flink_bridge
+mvn exec:java -Dexec.mainClass="io.sentry.flink_bridge.FlinkGrpcApp"
+```
+
+## Configuration
+
+### Pipeline Configuration
+
+The application reads its source, processing steps, and sinks from a YAML pipeline configuration file passed via the `--pipeline-name` argument:
+
+```bash
+./bin/flink run -c io.sentry.flink_bridge.FlinkGrpcApp target/flink-bridge-app.jar --pipeline-name pipeline_config.yaml
+```
+
+### gRPC Service
+
+The gRPC service connection details can be configured in `GrpcClient.java`:
+
+```java
+grpcClient = new GrpcClient("localhost", 50051); // Change host/port as needed
+```
+
+## Expected Output
+
+When running successfully, you should see:
+1. Messages being read from the configured Kafka topic
+2. gRPC service calls for each message
+3. Processed messages printed to standard output
+4. Logs showing the processing flow
+
+## Troubleshooting
+
+### Common Issues
+
+1. **gRPC service not available**:
+ - Ensure the Python gRPC service is running
+ - Check the port number (default: 50051)
+ - Verify network connectivity
+
+2. **Protobuf compilation errors**:
+ - Ensure the proto file is accessible
+ - Check Maven dependencies
+ - Run `mvn clean compile` to regenerate classes
+
+3. **Flink job submission errors**:
+ - Verify Flink cluster is running
+ - Check the JAR file path
+ - Ensure all dependencies are included
+
+### Logs
+
+The application uses SLF4J for logging. Check the Flink task manager logs for detailed information about the processing.
+
+## Development
+
+### Adding New Features
+
+1. **New message types**: Modify the protobuf definition and regenerate classes
+2. **Additional processing**: Extend the `GrpcMessageProcessor` class
+3. **Error handling**: Implement custom error handling in the process function (see the sketch after this list)
+4. **Configuration**: Add configuration parameters for flexibility
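+
+As a sketch of the error-handling point above: `GrpcMessageProcessor.processRecord` currently logs failures and drops the record; one possible alternative (the `processing_error` header is illustrative, not part of the codebase) is to tag failed records and forward them instead:
+
+```java
+try {
+    List<FlinkWorker.Message> processed = grpcClient.processMessage(record.toProto(), segment_id);
+    for (FlinkWorker.Message msg : processed) {
+        out.collect(new Message(msg));
+    }
+} catch (Exception e) {
+    // Hypothetical dead-letter style handling: mark the record and keep it flowing.
+    record.getHeaders().put("processing_error", e.getClass().getSimpleName());
+    out.collect(record);
+}
+```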
+
+### Testing
+
+1. **Unit tests**: Add tests for individual components
+2. **Integration tests**: Test the full data flow
+3. **gRPC service mocking**: Use mock services for testing without external dependencies
+
+## Dependencies
+
+- **Apache Flink 2.1.0**: Core streaming framework
+- **gRPC**: For service communication
+- **Protobuf**: For message serialization
+- **SLF4J**: For logging
+
+## License
+
+This project is part of the Sentry Streams project.
diff --git a/flink_bridge/demo_parser.sh b/flink_bridge/demo_parser.sh
new file mode 100755
index 00000000..757a0497
--- /dev/null
+++ b/flink_bridge/demo_parser.sh
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+echo "=== Pipeline Parser Demo ==="
+echo ""
+
+# Compile the project
+echo "1. Compiling the project..."
+mvn compile -q
+if [ $? -eq 0 ]; then
+ echo " ✓ Compilation successful"
+else
+ echo " ✗ Compilation failed"
+ exit 1
+fi
+
+echo ""
+
+# Run the tests
+echo "2. Running tests..."
+mvn test -Dtest=PipelineParserTest -q
+if [ $? -eq 0 ]; then
+ echo " ✓ All tests passed"
+else
+ echo " ✗ Some tests failed"
+fi
+
+echo ""
+
+# Show the example YAML file
+echo "3. Example YAML file (example_pipeline.yaml):"
+cat example_pipeline.yaml
+
+echo ""
+
+# Show how to run the example
+echo "4. To run the parser example:"
+echo " mvn exec:java -Dexec.mainClass=\"io.sentry.flink_bridge.PipelineParserExample\""
+echo ""
+echo " Or compile and run manually:"
+echo " mvn package"
+echo " java -cp target/flink-bridge-app.jar io.sentry.flink_bridge.PipelineParserExample"
+
+echo ""
+echo "=== Demo Complete ==="
diff --git a/flink_bridge/dependency-reduced-pom.xml b/flink_bridge/dependency-reduced-pom.xml
new file mode 100644
index 00000000..c2f4ccc8
--- /dev/null
+++ b/flink_bridge/dependency-reduced-pom.xml
@@ -0,0 +1,235 @@
+
+
+ 4.0.0
+ io.sentry
+ flink-bridge
+ Flink gRPC Bridge
+ 1.0.0
+ A Flink application that processes messages using a gRPC service
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.7.0
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.6.0
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ target/generated-sources/protobuf/java
+ target/generated-sources/protobuf/grpc-java
+
+
+
+
+
+
+ maven-shade-plugin
+ 3.4.1
+
+
+ cli-app
+ package
+
+ shade
+
+
+ flink-bridge-cli
+
+
+ io.sentry.flink_bridge.GrpcCliApp
+
+
+
+ io.sentry.flink_bridge.GrpcCliApp
+
+
+
+
+
+ flink-app
+ package
+
+ shade
+
+
+ flink-bridge-app
+
+
+ io.sentry.flink_bridge.FlinkGrpcApp
+
+
+
+ io.sentry.flink_bridge.FlinkGrpcApp
+
+
+
+
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+
+
+ compile
+ compile-custom
+
+
+
+
+ com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ ${project.basedir}/../protos
+
+
+
+ maven-compiler-plugin
+ 3.11.0
+
+ 21
+ 21
+ ${maven.compiler.source}
+ ${maven.compiler.target}
+
+
+
+ maven-surefire-plugin
+ 3.1.2
+
+
+ **/*Test.java
+
+
+
+
+
+
+
+ xolstice-releases
+ https://raw.githubusercontent.com/xolstice/maven-repo/master/releases
+
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ 2.1.0
+ provided
+
+
+ org.apache.flink
+ flink-clients
+ 2.1.0
+ provided
+
+
+ org.apache.flink
+ flink-annotations
+ 2.1.0
+ provided
+
+
+ javax.annotation
+ javax.annotation-api
+ 1.3.2
+ provided
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.0
+ test
+
+
+ junit-jupiter-api
+ org.junit.jupiter
+
+
+ junit-jupiter-params
+ org.junit.jupiter
+
+
+ junit-jupiter-engine
+ org.junit.jupiter
+
+
+
+
+ org.apache.flink
+ flink-test-utils
+ 2.1.0
+ test
+
+
+ flink-table-common
+ org.apache.flink
+
+
+ flink-test-utils-junit
+ org.apache.flink
+
+
+ flink-runtime
+ org.apache.flink
+
+
+ flink-core
+ org.apache.flink
+
+
+ flink-rpc-akka-loader
+ org.apache.flink
+
+
+ assertj-core
+ org.assertj
+
+
+ flink-streaming-java
+ org.apache.flink
+
+
+ flink-statebackend-rocksdb
+ org.apache.flink
+
+
+ flink-statebackend-changelog
+ org.apache.flink
+
+
+ flink-dstl-dfs
+ org.apache.flink
+
+
+ curator-test
+ org.apache.curator
+
+
+
+
+
+ 21
+ 3.25.1
+ 2.1.0
+ 21
+ 3.25.1
+ UTF-8
+ 1.60.0
+
+
diff --git a/flink_bridge/example_pipeline.yaml b/flink_bridge/example_pipeline.yaml
new file mode 100644
index 00000000..c28b7736
--- /dev/null
+++ b/flink_bridge/example_pipeline.yaml
@@ -0,0 +1,12 @@
+- config: {}
+  step_name: parser
+- config:
+    batch_size: 2
+    batch_timedelta: "18c0a70726f6a6"
+    seconds: 20
+  step_name: mybatchk
+- config:
+    window_size: 300
+    slide_interval: 60
+    aggregation_type: "sum"
+  step_name: window_aggregator
diff --git a/flink_bridge/flink-bridge.iml b/flink_bridge/flink-bridge.iml
new file mode 100644
index 00000000..25f13ec6
--- /dev/null
+++ b/flink_bridge/flink-bridge.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
diff --git a/flink_bridge/input.txt b/flink_bridge/input.txt
new file mode 100644
index 00000000..1d089efa
--- /dev/null
+++ b/flink_bridge/input.txt
@@ -0,0 +1,5 @@
+Hello World
+This is a test message
+Another message for processing
+Flink gRPC integration test
+Processing stream data with external service
diff --git a/flink_bridge/pom.xml b/flink_bridge/pom.xml
new file mode 100644
index 00000000..ce5a046c
--- /dev/null
+++ b/flink_bridge/pom.xml
@@ -0,0 +1,287 @@
+
+
+ 4.0.0
+
+ io.sentry
+ flink-bridge
+ 1.0.0
+ jar
+
+ Flink gRPC Bridge
+ A Flink application that processes messages using a gRPC service
+
+
+ 21
+ 21
+ UTF-8
+ 1.60.0
+ 3.25.1
+ 3.25.1
+ 2.1.0
+
+
+
+
+
+ xolstice-releases
+ https://raw.githubusercontent.com/xolstice/maven-repo/master/releases
+
+
+
+
+
+
+ org.apache.flink
+ flink-streaming-java
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-connector-base
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+ provided
+
+
+
+
+ org.apache.flink
+ flink-annotations
+ ${flink.version}
+ provided
+
+
+
+ org.apache.flink
+ flink-connector-kafka
+ 4.0.1-2.0
+
+
+
+
+ io.grpc
+ grpc-netty-shaded
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+
+
+
+ io.grpc
+ grpc-core
+ ${grpc.version}
+
+
+
+ io.grpc
+ grpc-api
+ ${grpc.version}
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+ ${protobuf.version}
+
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.36
+
+
+
+ org.slf4j
+ slf4j-simple
+ 1.7.36
+
+
+
+ javax.annotation
+ javax.annotation-api
+ 1.3.2
+ provided
+
+
+
+
+ org.yaml
+ snakeyaml
+ 2.2
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.0
+ test
+
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.7.0
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 3.6.0
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ target/generated-sources/protobuf/java
+ target/generated-sources/protobuf/grpc-java
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.4.1
+
+
+ cli-app
+ package
+
+ shade
+
+
+ flink-bridge-cli
+
+
+ io.sentry.flink_bridge.GrpcCliApp
+
+
+
+
+
+ io.sentry.flink_bridge.GrpcCliApp
+
+
+
+
+
+
+
+ flink-app
+ package
+
+ shade
+
+
+ flink-bridge-app
+
+
+ io.sentry.flink_bridge.FlinkGrpcApp
+
+
+
+
+
+ io.sentry.flink_bridge.FlinkGrpcApp
+
+
+
+
+
+
+
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.6.1
+
+
+ com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+
+ grpc-java
+
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+
+ ${project.basedir}/../protos
+
+
+
+
+ compile
+ compile-custom
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.11.0
+
+ 21
+ 21
+ ${maven.compiler.source}
+ ${maven.compiler.target}
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.1.2
+
+
+ **/*Test.java
+
+
+
+
+
+
diff --git a/flink_bridge/run-cli.sh b/flink_bridge/run-cli.sh
new file mode 100755
index 00000000..498e81d3
--- /dev/null
+++ b/flink_bridge/run-cli.sh
@@ -0,0 +1,60 @@
+#!/bin/bash
+
+# Flink Bridge gRPC CLI Runner
+# This script compiles and runs the Java CLI application
+
+set -e
+
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+NC='\033[0m' # No Color
+
+echo -e "${GREEN}=== Flink Bridge gRPC CLI Runner ===${NC}"
+
+# Check if Maven is available
+if ! command -v mvn &> /dev/null; then
+ echo -e "${RED}Error: Maven is not installed or not in PATH${NC}"
+ echo "Please install Maven and try again"
+ exit 1
+fi
+
+# Check if Java is available
+if ! command -v java &> /dev/null; then
+ echo -e "${RED}Error: Java is not installed or not in PATH${NC}"
+ echo "Please install Java and try again"
+ exit 1
+fi
+
+echo -e "${YELLOW}Compiling the project...${NC}"
+mvn clean compile
+
+echo -e "${YELLOW}Building the CLI application...${NC}"
+mvn package
+
+# Check if the CLI JAR was created
+CLI_JAR="target/flink-bridge-cli.jar"
+if [ ! -f "$CLI_JAR" ]; then
+ echo -e "${RED}Error: CLI JAR file not found at $CLI_JAR${NC}"
+ exit 1
+fi
+
+echo -e "${GREEN}CLI application built successfully!${NC}"
+echo -e "${YELLOW}Usage:${NC}"
+echo " $0 [host] [port]"
+echo " Default: localhost:50053"
+echo ""
+echo -e "${YELLOW}Make sure the Python gRPC server is running:${NC}"
+echo " cd ../flink_worker && source .venv/bin/activate && python -m flink_worker.server --port 50053"
+echo ""
+
+# Parse command line arguments
+HOST=${1:-localhost}
+PORT=${2:-50053}
+
+echo -e "${GREEN}Starting CLI application connecting to $HOST:$PORT...${NC}"
+echo ""
+
+# Run the CLI application
+java -jar "$CLI_JAR" "$HOST" "$PORT"
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/CustomPostProcessor.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/CustomPostProcessor.java
new file mode 100644
index 00000000..45e5b912
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/CustomPostProcessor.java
@@ -0,0 +1,54 @@
+package io.sentry.flink_bridge;
+
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.flink.api.common.state.ValueStateDeclaration;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarations;
+import org.apache.flink.api.common.state.v2.ListState;
+import org.apache.flink.api.common.state.v2.ValueState;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.datastream.api.common.Collector;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Custom processing function that adds logging after gRPC processing.
+ * This implements the OneInputStreamProcessFunction pattern for Flink
+ * DataStream API.
+ */
+public class CustomPostProcessor implements OneInputStreamProcessFunction<Message, Long> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CustomPostProcessor.class);
+ static final ValueStateDeclaration<Long> VALUE_STATE_DECLARATION = StateDeclarations
+ .valueState("example-list-state", TypeDescriptors.LONG);
+
+ // static final ValueStateDeclaration VALUE_STATE_DECLARATION =
+ // StateDeclarations
+ // .valueStateBuilder("example-list-state", TypeDescriptors.LONG).build();
+
+ @Override
+ public Set<StateDeclaration> usesStates() {
+ return Collections.singleton(VALUE_STATE_DECLARATION);
+ }
+
+ @Override
+ public void processRecord(Message record, Collector<Long> output, PartitionedContext<Long> ctx)
+ throws Exception {
+
+ ValueState<Long> state = ctx.getStateManager().getState(VALUE_STATE_DECLARATION);
+ long stateVal = 0;
+ if (state.value() != null) {
+ stateVal = state.value();
+ }
+ state.update(stateVal + record.getPayload().length);
+ stateVal = state.value();
+ LOG.info("KEY {} Dumping message size: {}", ctx.getStateManager().getCurrentKey(), stateVal);
+ output.collect(stateVal);
+ }
+}
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/FlinkGrpcApp.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/FlinkGrpcApp.java
new file mode 100644
index 00000000..8ad878b9
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/FlinkGrpcApp.java
@@ -0,0 +1,151 @@
+package io.sentry.flink_bridge;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SinkUtils;
+import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
+import org.apache.flink.api.connector.dsv2.WrappedSink;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.datastream.api.ExecutionEnvironment;
+import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
+import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
+import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeWatermarkGeneratorBuilder;
+import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
+import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
+import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.util.ParameterTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * Flink application that reads messages from a text file and processes them
+ * using a gRPC service.
+ * This demonstrates the integration between Apache Flink and gRPC services.
+ */
+class KeyGenerator implements KeySelector<Message, String> {
+ @Override
+ public String getKey(Message message) {
+ return String.valueOf(message.getPayload().length % 4);
+ }
+}
+
+class MessageSerializer implements SerializationSchema<Message> {
+ @Override
+ public byte[] serialize(Message message) {
+ return message.getPayload();
+ }
+}
+
+public class FlinkGrpcApp {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkGrpcApp.class);
+
+ public static void main(String[] args) throws Exception {
+ ParameterTool parameters = ParameterTool.fromArgs(args);
+ String pipelineConfigFile = parameters.getRequired("pipeline-name");
+ PipelineParser parser = new PipelineParser();
+ Pipeline pipeline = parser.parseFile(pipelineConfigFile);
+ List<PipelineStep> steps = pipeline.getSteps();
+
+ List data = TestData.getMetrics();
+
+ // Set up the streaming execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getInstance();
+ env.setExecutionMode(RuntimeExecutionMode.STREAMING);
+
+ Source source = pipeline.getSource();
+
+ String bootstrapServers = String.join(",", (List<String>) source.getConfig().get("bootstrap_servers"));
+ KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
+ .setBootstrapServers(bootstrapServers)
+ .setTopics((String) source.getConfig().get("stream_name"))
+ .setGroupId("flink-grpc-group")
+ .setStartingOffsets(OffsetsInitializer.latest())
+ .setValueOnlyDeserializer(new SimpleStringSchema())
+ .build();
+
+ LOG.info("Kafka source: {} conencts to {}", kafkaSource, bootstrapServers);
+ // Create a data stream from a text file using Flink 2.1.0 API
+ // NonKeyedPartitionStream textStream = env.fromSource(
+ // DataStreamV2SourceUtils.fromData(data), "in memory list");
+
+ NonKeyedPartitionStream<String> textStream = env
+ .fromSource(
+ DataStreamV2SourceUtils.wrapSource(kafkaSource),
+ "kafka-source");
+
+ KeyedPartitionStream<String, Message> messageStream = textStream
+ .process(new StringDeserializer("my_pipeline"))
+ .keyBy(new KeyGenerator());
+
+ EventTimeWatermarkGeneratorBuilder<Message> watermarkBuilder = EventTimeExtension
+ .newWatermarkGeneratorBuilder(Message::getTimestamp)
+ .withIdleness(Duration.ofSeconds(10))
+ .withMaxOutOfOrderTime(Duration.ofSeconds(4)) // set max out-of-order time
+ .periodicWatermark(Duration.ofMillis(2000));
+
+ KeyedPartitionStream<String, Message> delayedStream = messageStream
+ .process(new WatermarkEmitter())
+ .withName("message-delay-generator")
+ .keyBy(new KeyGenerator());
+
+ // Apply the gRPC processing function
+ KeyedPartitionStream<String, Message> watermarkedStream = delayedStream
+ .process(watermarkBuilder.buildAsProcessFunction())
+ .withName("watermark-generator")
+ .keyBy(new KeyGenerator());
+
+ //////////////// APPLICATION LOGIC ////////////////
+
+ KeyedPartitionStream<String, Message> processedStream = watermarkedStream;
+ for (int i = 0; i < steps.size(); i++) {
+ LOG.info("Applying step {} at index {}", steps.get(i).getStepName(), i);
+ processedStream = processedStream
+ .process(EventTimeExtension.wrapProcessFunction(new GrpcMessageProcessor(i)))
+ .withName(String.format("processor-%s-%d", steps.get(i).getStepName(), i))
+ .keyBy(new KeyGenerator());
+ }
+
+ for (Sink sink : pipeline.getSinks().values()) {
+ String sinkBootstrapServers = String.join(",",
+ (List<String>) sink.getConfig().get("bootstrap_servers"));
+ KafkaSink<Message> kafkaSink = KafkaSink.<Message>builder()
+ .setBootstrapServers(sinkBootstrapServers)
+ .setRecordSerializer(KafkaRecordSerializationSchema.<Message>builder()
+ .setTopic((String) sink.getConfig().get("stream_name"))
+ .setValueSerializationSchema(new MessageSerializer())
+ .build())
+ .build();
+
+ processedStream.toSink(DataStreamV2SinkUtils.wrapSink(kafkaSink)).withName(sink.getName());
+ }
+
+ // Add custom post-processing function after gRPC processing
+ // NonKeyedPartitionStream postProcessedStream = processedStream
+ // .process(BuiltinFuncs.window(
+ // WindowStrategy.tumbling(Duration.ofSeconds(2),
+ // WindowStrategy.EVENT_TIME),
+ // new WindowProcessing()));
+
+ // NonKeyedPartitionStream serializedStream =
+ // postProcessedStream.process(new StringSerializer());
+
+ // Print the processed messages to standard output
+ // processedStream.toSink(new WrappedSink<>(new
+ // PrintSink<>())).withName("print-sink");
+
+ // Execute the Flink job
+ LOG.info("Starting Flink gRPC application...");
+ env.execute("Flink gRPC Message Processing Job");
+ }
+
+}
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcCliApp.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcCliApp.java
new file mode 100644
index 00000000..a9cf51f2
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcCliApp.java
@@ -0,0 +1,155 @@
+package io.sentry.flink_bridge;
+
+import flink_worker.FlinkWorker.Message;
+import flink_worker.FlinkWorker.ProcessMessageRequest;
+import flink_worker.FlinkWorker.ProcessMessageResponse;
+import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Scanner;
+import java.util.List;
+
+/**
+ * Simple CLI application that demonstrates the gRPC client functionality.
+ * This program allows users to send messages to the FlinkWorker gRPC service
+ * and see the responses.
+ */
+public class GrpcCliApp {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GrpcCliApp.class);
+ private static final String DEFAULT_HOST = "localhost";
+ private static final int DEFAULT_PORT = 50053;
+
+ public static void main(String[] args) {
+ // Parse command line arguments
+ String host = DEFAULT_HOST;
+ int port = DEFAULT_PORT;
+
+ if (args.length >= 1) {
+ host = args[0];
+ }
+ if (args.length >= 2) {
+ try {
+ port = Integer.parseInt(args[1]);
+ } catch (NumberFormatException e) {
+ LOG.error("Invalid port number: {}. Using default port {}", args[1], DEFAULT_PORT);
+ port = DEFAULT_PORT;
+ }
+ }
+
+ LOG.info("Starting gRPC CLI client connecting to {}:{}", host, port);
+
+ GrpcClient client = null;
+ try {
+ // Create the gRPC client
+ client = new GrpcClient(host, port);
+
+ // Check if the service is available
+ if (!client.isAvailable()) {
+ LOG.error("gRPC service is not available at {}:{}", host, port);
+ LOG.error("Make sure the Python gRPC server is running with:");
+ LOG.error("cd flink_worker && source .venv/bin/activate && python -m flink_worker.server --port {}",
+ port);
+ System.exit(1);
+ }
+
+ LOG.info("Successfully connected to gRPC service at {}:{}", host, port);
+
+ // Start interactive CLI
+ runInteractiveCli(client);
+
+ } catch (Exception e) {
+ LOG.error("Error in gRPC CLI application", e);
+ System.exit(1);
+ } finally {
+ if (client != null) {
+ client.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Runs the interactive command-line interface.
+ */
+ private static void runInteractiveCli(GrpcClient client) {
+ Scanner scanner = new Scanner(System.in);
+ int messageCounter = 0;
+
+ System.out.println("\n=== Flink Worker gRPC CLI Client ===");
+ System.out.println("Commands:");
+ System.out.println(" - Send a message to the gRPC service");
+ System.out.println(" help - Show this help message");
+ System.out.println(" quit/exit - Exit the application");
+ System.out.println(" test - Send a test message");
+ System.out.println();
+
+ while (true) {
+ System.out.print("grpc> ");
+ String input = scanner.nextLine().trim();
+
+ if (input.isEmpty()) {
+ continue;
+ }
+
+ if (input.equalsIgnoreCase("quit") || input.equalsIgnoreCase("exit")) {
+ System.out.println("Goodbye!");
+ break;
+ }
+
+ if (input.equalsIgnoreCase("help")) {
+ System.out.println("Commands:");
+ System.out.println(" - Send a message to the gRPC service");
+ System.out.println(" help - Show this help message");
+ System.out.println(" quit/exit - Exit the application");
+ System.out.println(" test - Send a test message");
+ continue;
+ }
+
+ if (input.equalsIgnoreCase("test")) {
+ input = "This is a test message from the CLI client";
+ System.out.println("Sending test message: " + input);
+ }
+
+ try {
+ // Send the message to the gRPC service
+ List<Message> processedMessages = sendMessage(client, input, messageCounter++);
+
+ // Display the response
+ System.out.println("Response received:");
+ for (Message msg : processedMessages) {
+ String payload = new String(msg.getPayload().toByteArray(), StandardCharsets.UTF_8);
+ System.out.println(" - Payload: " + payload);
+ System.out.println(" Headers: " + msg.getHeadersMap());
+ System.out.println(" Timestamp: " + msg.getTimestamp());
+ }
+
+ } catch (Exception e) {
+ LOG.error("Error processing message: {}", input, e);
+ System.out.println("Error: " + e.getMessage());
+ }
+
+ System.out.println();
+ }
+
+ scanner.close();
+ }
+
+ /**
+ * Sends a message to the gRPC service.
+ */
+ private static List<Message> sendMessage(GrpcClient client, String messageText, int segmentId) {
+ // Create the message
+ Message message = Message.newBuilder()
+ .setPayload(ByteString.copyFrom(messageText.getBytes(StandardCharsets.UTF_8)))
+ .putHeaders("source", "cli")
+ .putHeaders("timestamp", String.valueOf(System.currentTimeMillis()))
+ .putHeaders("message_id", String.valueOf(segmentId))
+ .setTimestamp(System.currentTimeMillis())
+ .build();
+
+ // Send to gRPC service
+ return client.processMessage(message, 0);
+ }
+}
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcClient.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcClient.java
new file mode 100644
index 00000000..fe9b4930
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcClient.java
@@ -0,0 +1,210 @@
+package io.sentry.flink_bridge;
+
+import flink_worker.FlinkWorker.Message;
+import flink_worker.FlinkWorker.ProcessMessageRequest;
+import flink_worker.FlinkWorker.ProcessMessageResponse;
+import flink_worker.FlinkWorker.ProcessWatermarkRequest;
+import flink_worker.FlinkWorker.AddToWindowRequest;
+import flink_worker.FlinkWorker.TriggerWindowRequest;
+import flink_worker.FlinkWorker.WindowIdentifier;
+import flink_worker.FlinkWorkerServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * gRPC client for communicating with the FlinkWorkerService.
+ * This client handles the connection and communication with the gRPC service.
+ */
+public class GrpcClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);
+
+ private final ManagedChannel channel;
+ private final FlinkWorkerServiceGrpc.FlinkWorkerServiceBlockingStub blockingStub;
+
+ /**
+ * Constructs a gRPC client for the specified host and port.
+ *
+ * @param host the hostname of the gRPC service
+ * @param port the port number of the gRPC service
+ */
+ public GrpcClient(String host, int port) {
+ LOG.info("Creating gRPC client for {}:{}", host, port);
+ try {
+ // Use DNS resolver with the format dns:///host:port
+ String target = "dns:///" + host + ":" + port;
+ LOG.info("Using target: {}", target);
+
+ ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
+ .usePlaintext()
+ .maxInboundMessageSize(1024 * 1024)
+ .build();
+
+ this.channel = channel;
+ this.blockingStub = FlinkWorkerServiceGrpc.newBlockingStub(channel);
+ LOG.info("gRPC client created successfully");
+ } catch (Exception e) {
+ LOG.error("Failed to create gRPC client for {}:{}", host, port, e);
+ throw new RuntimeException("Failed to create gRPC client", e);
+ }
+ }
+
+ /**
+ * Sends a message processing request to the gRPC service.
+ *
+ * @param message the message to process
+ * @return a list of processed messages
+ * @throws RuntimeException if the gRPC call fails
+ */
+ public List<Message> processMessage(Message message, int segment_id) {
+ try {
+ // Construct the request internally
+ ProcessMessageRequest request = ProcessMessageRequest.newBuilder()
+ .setMessage(message)
+ .setSegmentId(segment_id)
+ .build();
+
+ LOG.debug("Sending request to gRPC service: {}", request);
+ ProcessMessageResponse response = blockingStub.processMessage(request);
+ LOG.debug("Received response from gRPC service: {} messages",
+ response.getMessagesCount());
+ return response.getMessagesList();
+ } catch (Exception e) {
+ LOG.error("Error calling gRPC service", e);
+ throw new RuntimeException("Failed to process message via gRPC", e);
+ }
+ }
+
+ /**
+ * Sends a watermark processing request to the gRPC service.
+ *
+ * @param timestamp the watermark timestamp
+ * @param headers optional headers for the watermark
+ * @param segmentId the segment ID for the watermark
+ * @return a list of processed messages
+ * @throws RuntimeException if the gRPC call fails
+ */
+ public List<Message> processWatermark(long timestamp, java.util.Map<String, String> headers, int segment_id) {
+ try {
+ // Construct the request internally
+ ProcessWatermarkRequest request = ProcessWatermarkRequest.newBuilder()
+ .setTimestamp(timestamp)
+ .putAllHeaders(headers != null ? headers : new java.util.HashMap<>())
+ .setSegmentId(segment_id)
+ .build();
+
+ LOG.debug("Sending watermark request to gRPC service: {}", request);
+ ProcessMessageResponse response = blockingStub.processWatermark(request);
+ LOG.debug("Received watermark response from gRPC service: {} messages",
+ response.getMessagesCount());
+ return response.getMessagesList();
+ } catch (Exception e) {
+ LOG.error("Error calling gRPC service for watermark", e);
+ throw new RuntimeException("Failed to process watermark via gRPC", e);
+ }
+ }
+
+ /**
+ * Adds a message to a window.
+ *
+ * @param message the message to add to the window
+ * @param segmentId the segment ID
+ * @param partitionKey the partition key for the window
+ * @param windowStartTime the window start time
+ * @throws RuntimeException if the gRPC call fails
+ */
+ public void addToWindow(Message message, int segmentId, String partitionKey, long windowStartTime) {
+ try {
+ // Construct the window identifier
+ WindowIdentifier windowId = WindowIdentifier.newBuilder()
+ .setPartitionKey(partitionKey)
+ .setWindowStartTime(windowStartTime)
+ .build();
+
+ // Construct the request
+ AddToWindowRequest request = AddToWindowRequest.newBuilder()
+ .setMessage(message)
+ .setSegmentId(segmentId)
+ .setWindowId(windowId)
+ .build();
+
+ LOG.debug("Sending add to window request: {}", request);
+ blockingStub.addToWindow(request);
+ LOG.debug("Successfully added message to window");
+ } catch (Exception e) {
+ LOG.error("Error adding message to window", e);
+ throw new RuntimeException("Failed to add message to window via gRPC", e);
+ }
+ }
+
+ /**
+ * Triggers a window and returns the accumulated messages.
+ *
+ * @param segmentId the segment ID
+ * @param partitionKey the partition key for the window
+ * @param windowStartTime the window start time
+ * @return a list of accumulated messages from the window
+ * @throws RuntimeException if the gRPC call fails
+ */
+ public List<Message> triggerWindow(int segmentId, String partitionKey, long windowStartTime) {
+ try {
+ // Construct the window identifier
+ WindowIdentifier windowId = WindowIdentifier.newBuilder()
+ .setPartitionKey(partitionKey)
+ .setWindowStartTime(windowStartTime)
+ .build();
+
+ // Construct the request
+ TriggerWindowRequest request = TriggerWindowRequest.newBuilder()
+ .setWindowId(windowId)
+ .setSegmentId(segmentId)
+ .build();
+
+ LOG.debug("Sending trigger window request: {}", request);
+ ProcessMessageResponse response = blockingStub.triggerWindow(request);
+ LOG.debug("Received trigger window response: {} messages",
+ response.getMessagesCount());
+ return response.getMessagesList();
+ } catch (Exception e) {
+ LOG.error("Error triggering window", e);
+ throw new RuntimeException("Failed to trigger window via gRPC", e);
+ }
+ }
+
+ /**
+ * Shuts down the gRPC client and closes the channel.
+ * This method should be called when the client is no longer needed.
+ */
+ public void shutdown() {
+ try {
+ if (channel != null && !channel.isShutdown()) {
+ channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ LOG.info("gRPC client shutdown completed");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while shutting down gRPC client", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Checks if the gRPC client is available and responsive.
+ *
+ * @return true if the service is available, false otherwise
+ */
+ public boolean isAvailable() {
+ try {
+ // Try to send a simple request to check availability
+ // For now, we'll just check if the channel is ready
+ return channel != null && !channel.isShutdown() && !channel.isTerminated();
+ } catch (Exception e) {
+ LOG.debug("Service availability check failed", e);
+ return false;
+ }
+ }
+}
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcMessageProcessor.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcMessageProcessor.java
new file mode 100644
index 00000000..5bcb0490
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/GrpcMessageProcessor.java
@@ -0,0 +1,109 @@
+package io.sentry.flink_bridge;
+
+import org.apache.flink.datastream.api.context.NonPartitionedContext;
+import org.apache.flink.datastream.api.context.PartitionedContext;
+import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
+import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
+import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.datastream.api.common.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import flink_worker.FlinkWorker;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ProcessFunction that sends messages to the gRPC service for processing.
+ * This implements the OneInputStreamProcessFunction pattern for Flink
+ * DataStream API.
+ */
+public class GrpcMessageProcessor implements OneInputEventTimeStreamProcessFunction<Message, Message> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GrpcMessageProcessor.class);
+ protected GrpcClient grpcClient;
+ private int segment_id;
+ private EventTimeManager eventTimeManager;
+
+ public GrpcMessageProcessor(int segment_id) {
+ this.segment_id = segment_id;
+ }
+
+ @Override
+ public void open(NonPartitionedContext<Message> ctx) throws Exception {
+ // Initialize the gRPC client
+ grpcClient = new GrpcClient("localhost", 50051);
+ LOG.info("gRPC client initialized");
+ }
+
+ @Override
+ public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) {
+ // get event time manager instance
+ this.eventTimeManager = eventTimeManager;
+ }
+
+ @Override
+ public void processRecord(
+ Message record,
+ Collector<Message> out,
+ PartitionedContext<Message> ctx) throws Exception {
+ try {
+ LOG.info("Processing message for TS {}", record.getTimestamp());
+
+ // Send to gRPC service and get response
+ List<FlinkWorker.Message> processedMessages = grpcClient.processMessage(record.toProto(), this.segment_id);
+
+ // Process the response and output processed messages
+ for (FlinkWorker.Message processedMsg : processedMessages) {
+ // String processedContent = new String(processedMsg.getPayload().toByteArray(),
+ // StandardCharsets.UTF_8);
+ // LOG.info("Received processed message: {}", processedMsg);
+ out.collect(new Message(processedMsg));
+
+ }
+
+ } catch (Exception e) {
+ LOG.error("Error processing message: {}", record, e);
+ // In a production environment, you might want to handle errors differently
+ // For now, we'll just log the error and continue
+ }
+ }
+
+ @Override
+ public void onEventTimer(
+ long timestamp,
+ Collector<Message> output,
+ PartitionedContext<Message> ctx) {
+ // write your event timer callback here
+ LOG.info("Received trigger for time {}", timestamp);
+ }
+
+ @Override
+ public void onEventTimeWatermark(
+ long watermarkTimestamp,
+ Collector<Message> output,
+ NonPartitionedContext<Message> ctx)
+ throws Exception {
+ // sense event time watermark arrival
+ Map<String, String> headers = new HashMap<>();
+ headers.put("job_name", ctx.getJobInfo().getJobName());
+ List<FlinkWorker.Message> processedMessages = grpcClient.processWatermark(watermarkTimestamp, headers,
+ this.segment_id);
+
+ for (FlinkWorker.Message processedMsg : processedMessages) {
+ output.collect(new Message(processedMsg));
+ }
+
+ LOG.info("Received watermark for time {}", watermarkTimestamp);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (grpcClient != null) {
+ grpcClient.shutdown();
+
+ }
+ }
+}
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/Message.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/Message.java
new file mode 100644
index 00000000..1b9a7d56
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/Message.java
@@ -0,0 +1,121 @@
+package io.sentry.flink_bridge;
+
+import flink_worker.FlinkWorker;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * POJO Message class that follows Flink's serialization rules.
+ * This class can be serialized by Flink's POJO serializer.
+ */
+public class Message {
+
+ private byte[] payload;
+ private Map<String, String> headers;
+ private long timestamp;
+
+ /**
+ * Default constructor required by Flink's POJO serializer.
+ */
+ public Message() {
+ this.payload = new byte[0];
+ this.headers = new HashMap<>();
+ this.timestamp = 0L;
+ }
+
+ /**
+ * Constructor with all fields.
+ *
+ * @param payload the message payload
+ * @param headers the message headers
+ * @param timestamp the message timestamp
+ */
+ public Message(byte[] payload, Map<String, String> headers, long timestamp) {
+ this.payload = payload != null ? payload : new byte[0];
+ this.headers = headers != null ? new HashMap<>(headers) : new HashMap<>();
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Constructor that creates Message from FlinkWorker.Message.
+ *
+ * @param protoMessage the protobuf message to convert from
+ */
+ public Message(FlinkWorker.Message protoMessage) {
+ this.payload = protoMessage.getPayload().toByteArray();
+ this.headers = new HashMap<>(protoMessage.getHeadersMap());
+ this.timestamp = protoMessage.getTimestamp();
+ }
+
+ /**
+ * Serialization method that produces FlinkWorker.Message.
+ *
+ * @return the protobuf message
+ */
+ public FlinkWorker.Message toProto() {
+ FlinkWorker.Message.Builder builder = FlinkWorker.Message.newBuilder();
+ builder.setPayload(com.google.protobuf.ByteString.copyFrom(this.payload));
+ builder.putAllHeaders(this.headers);
+ builder.setTimestamp(this.timestamp);
+ return builder.build();
+ }
+
+ // Getters and setters required by Flink's POJO serializer
+
+ public byte[] getPayload() {
+ return payload;
+ }
+
+ public void setPayload(byte[] payload) {
+ this.payload = payload != null ? payload : new byte[0];
+ }
+
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Map<String, String> headers) {
+ this.headers = headers != null ? new HashMap<>(headers) : new HashMap<>();
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ Message message = (Message) obj;
+
+ if (timestamp != message.timestamp)
+ return false;
+ if (!java.util.Arrays.equals(payload, message.payload))
+ return false;
+ return headers.equals(message.headers);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = java.util.Arrays.hashCode(payload);
+ result = 31 * result + headers.hashCode();
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Message{" +
+ "payload=" + java.util.Arrays.toString(payload) +
+ ", headers=" + headers +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/Pipeline.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/Pipeline.java
new file mode 100644
index 00000000..ecb7a4b2
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/Pipeline.java
@@ -0,0 +1,125 @@
+package io.sentry.flink_bridge;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents a complete pipeline configuration.
+ * Contains sources, steps, and sinks.
+ */
+public class Pipeline {
+ private Map<String, Source> sources;
+ private List<PipelineStep> steps;
+ private Map<String, Sink> sinks;
+
+ /**
+ * Default constructor required for YAML deserialization.
+ */
+ public Pipeline() {
+ }
+
+ /**
+ * Constructor with parameters.
+ *
+ * @param sources the map of source configurations
+ * @param steps the list of pipeline steps
+ * @param sinks the map of sink configurations
+ */
+ public Pipeline(Map<String, Source> sources, List<PipelineStep> steps, Map<String, Sink> sinks) {
+ this.sources = sources;
+ this.steps = steps;
+ this.sinks = sinks;
+ }
+
+ /**
+ * Gets the sources map.
+ *
+ * @return the sources map
+ */
+ public Map<String, Source> getSources() {
+ return sources;
+ }
+
+ public Source getSource() {
+ assert sources.size() == 1;
+ return sources.values().iterator().next();
+ }
+
+ /**
+ * Sets the sources map.
+ *
+ * @param sources the sources map to set
+ */
+ public void setSources(Map<String, Source> sources) {
+ this.sources = sources;
+ }
+
+ /**
+ * Gets the steps list.
+ *
+ * @return the steps list
+ */
+ public List<PipelineStep> getSteps() {
+ return steps;
+ }
+
+ /**
+ * Sets the steps list.
+ *
+ * @param steps the steps list to set
+ */
+ public void setSteps(List<PipelineStep> steps) {
+ this.steps = steps;
+ }
+
+ /**
+ * Gets the sinks map.
+ *
+ * @return the sinks map
+ */
+ public Map<String, Sink> getSinks() {
+ return sinks;
+ }
+
+ /**
+ * Sets the sinks map.
+ *
+ * @param sinks the sinks map to set
+ */
+ public void setSinks(Map<String, Sink> sinks) {
+ this.sinks = sinks;
+ }
+
+ @Override
+ public String toString() {
+ return "Pipeline{" +
+ "sources=" + sources +
+ ", steps=" + steps +
+ ", sinks=" + sinks +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Pipeline that = (Pipeline) o;
+
+ if (sources != null ? !sources.equals(that.sources) : that.sources != null)
+ return false;
+ if (steps != null ? !steps.equals(that.steps) : that.steps != null)
+ return false;
+ return sinks != null ? sinks.equals(that.sinks) : that.sinks == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = sources != null ? sources.hashCode() : 0;
+ result = 31 * result + (steps != null ? steps.hashCode() : 0);
+ result = 31 * result + (sinks != null ? sinks.hashCode() : 0);
+ return result;
+ }
+}
diff --git a/flink_bridge/src/main/java/io/sentry/flink_bridge/PipelineParser.java b/flink_bridge/src/main/java/io/sentry/flink_bridge/PipelineParser.java
new file mode 100644
index 00000000..922af4a7
--- /dev/null
+++ b/flink_bridge/src/main/java/io/sentry/flink_bridge/PipelineParser.java
@@ -0,0 +1,183 @@
+package io.sentry.flink_bridge;
+
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Parser for YAML pipeline configuration files.
+ * Reads YAML files and converts them to Pipeline objects.
+ * Supports the new format with sources, steps, and sinks.
+ */
+public class PipelineParser {
+
+ private final Yaml yaml;
+
+ /**
+ * Default constructor that initializes the YAML parser.
+ */
+ public PipelineParser() {
+ this.yaml = new Yaml();
+ }
+
+ /**
+ * Parses a YAML file and returns a Pipeline object.
+ *
+ * @param filename the path to the YAML file
+ * @return a Pipeline object
+ * @throws FileNotFoundException if the file is not found
+ * @throws RuntimeException if there's an error parsing the YAML
+ */
+ public Pipeline parseFile(String filename) throws FileNotFoundException {
+ try (FileInputStream inputStream = new FileInputStream(new File(filename))) {
+ return parseInputStream(inputStream);
+ } catch (FileNotFoundException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Error parsing YAML file: " + filename, e);
+ }
+ }
+
+ /**
+ * Parses YAML content from an InputStream and returns a Pipeline object.
+ *
+ * @param inputStream the InputStream containing YAML content
+ * @return a Pipeline object
+ * @throws RuntimeException if there's an error parsing the YAML
+ */
+ public Pipeline parseInputStream(InputStream inputStream) {
+ try {
+ Object rawPipeline = yaml.load(inputStream);
+
+ if (rawPipeline == null) {
+ throw new RuntimeException("YAML file is empty or could not be parsed");
+ }
+
+ if (!(rawPipeline instanceof Map)) {
+ throw new RuntimeException("YAML file must contain a map with sources, steps, and/or sinks");
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> pipelineMap = (Map<String, Object>) rawPipeline;
+
+ return parsePipeline(pipelineMap);
+ } catch (Exception e) {
+ throw new RuntimeException("Error parsing YAML content", e);
+ }
+ }
+
+ /**
+ * Parses the pipeline configuration.
+ *
+ * @param pipelineMap the parsed YAML map
+ * @return a Pipeline object
+ */
+ private Pipeline parsePipeline(Map<String, Object> pipelineMap) {
+ Map<String, Source> sources = parseSources(pipelineMap.get("sources"));
+ List<PipelineStep> steps = parseSteps(pipelineMap.get("steps"));
+ Map<String, Sink> sinks = parseSinks(pipelineMap.get("sinks"));
+
+ return new Pipeline(sources, steps, sinks);
+ }
+
+ /**
+ * Parses sources from the raw YAML data.
+ *
+ * @param rawSources the raw sources data
+ * @return a map of source configurations
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, Source> parseSources(Object rawSources) {
+ Map<String, Source> sources = new HashMap<>();
+
+ if (rawSources == null) {
+ return sources;
+ }
+
+ Map<String, Object> sourcesMap = (Map<String, Object>) rawSources;
+ for (Map.Entry<String, Object> entry : sourcesMap.entrySet()) {
+ String sourceName = entry.getKey();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> sourceConfig = (Map<String, Object>) entry.getValue();
+
+ if (sourceConfig == null) {
+ sourceConfig = new HashMap<>();
+ }
+
+ sources.put(sourceName, new Source(sourceName, sourceConfig));
+ }
+
+ return sources;
+ }
+
+ /**
+ * Parses sinks from the raw YAML data.
+ *
+ * @param rawSinks the raw sinks data
+ * @return a map of sink configurations
+ */
+ @SuppressWarnings("unchecked")
+ private Map<String, Sink> parseSinks(Object rawSinks) {
+ Map<String, Sink> sinks = new HashMap<>();
+
+ if (rawSinks == null) {
+ return sinks;
+ }
+
+ Map<String, Object> sinksMap = (Map<String, Object>) rawSinks;
+ for (Map.Entry<String, Object> entry : sinksMap.entrySet()) {
+ String sinkName = entry.getKey();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> sinkConfig = (Map<String, Object>) entry.getValue();
+
+ if (sinkConfig == null) {
+ sinkConfig = new HashMap<>();
+ }
+
+ sinks.put(sinkName, new Sink(sinkName, sinkConfig));
+ }
+
+ return sinks;
+ }
+
+ /**
+ * Parses steps from the raw YAML data.
+ *
+ * @param rawSteps the raw steps data
+ * @return a list of PipelineStep objects
+ */
+ @SuppressWarnings("unchecked")
+ private List<PipelineStep> parseSteps(Object rawSteps) {
+ List<PipelineStep> steps = new ArrayList<>();
+
+ if (rawSteps == null) {
+ return steps;
+ }
+
+ List