From 5f6baf208aadedcc023287c9aae3e570cac6915b Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 18 Aug 2025 09:40:21 -0700 Subject: [PATCH 01/19] Initial implementation of GRPC server --- .pre-commit-config.yaml | 5 + flink_worker/README.md | 145 +++++++++++++ flink_worker/flink_worker/__init__.py | 8 + flink_worker/flink_worker/client.py | 195 ++++++++++++++++++ flink_worker/flink_worker/flink_worker_pb2.py | 45 ++++ .../flink_worker/flink_worker_pb2_grpc.py | 101 +++++++++ flink_worker/flink_worker/server.py | 74 +++++++ flink_worker/flink_worker/service.py | 85 ++++++++ flink_worker/flink_worker/test_service.py | 85 ++++++++ flink_worker/pyproject.toml | 59 ++++++ protos/flink_worker.proto | 27 +++ 11 files changed, 829 insertions(+) create mode 100644 flink_worker/README.md create mode 100644 flink_worker/flink_worker/__init__.py create mode 100644 flink_worker/flink_worker/client.py create mode 100644 flink_worker/flink_worker/flink_worker_pb2.py create mode 100644 flink_worker/flink_worker/flink_worker_pb2_grpc.py create mode 100644 flink_worker/flink_worker/server.py create mode 100644 flink_worker/flink_worker/service.py create mode 100644 flink_worker/flink_worker/test_service.py create mode 100644 flink_worker/pyproject.toml create mode 100644 protos/flink_worker.proto diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 084d408e..21c9b36f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -31,6 +31,11 @@ repos: hooks: - id: flake8 language_version: python3.11 + exclude: | + (?x)^( + flink_worker/.*/flink_worker_pb2\.py$| + flink_worker/.*/flink_worker_pb2_grpc\.py$ + )$ - repo: https://github.com/pycqa/isort rev: 6.0.0 hooks: diff --git a/flink_worker/README.md b/flink_worker/README.md new file mode 100644 index 00000000..0243ca80 --- /dev/null +++ b/flink_worker/README.md @@ -0,0 +1,145 @@ +# Flink Worker gRPC Service + +A Python gRPC service for processing messages in a Flink environment. + +## Features + +- **gRPC Service**: Implements the `FlinkWorkerService` with a `ProcessMessage` method +- **Message Processing**: Takes a message with payload, headers, and timestamp, plus a segment_id +- **Extensible**: The Message class can be subclassed for custom functionality +- **CLI Client**: Includes a test client for easy testing + +## Installation + +1. Create a virtual environment: +```bash +python -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate +``` + +2. Install dependencies: +```bash +pip install -e . +``` + +## Usage + +### Starting the Server + +```bash +# Start the server on default port 50051 +python -m flink_worker.server + +# Start on a custom port +python -m flink_worker.server --port 50052 + +# Start on a specific host +python -m flink_worker.server --host 0.0.0.0 --port 50051 +``` + +### Using the CLI Client + +```bash +# Basic usage with defaults +python -m flink_worker.client + +# Custom message +python -m flink_worker.client --payload "Custom message" --segment-id 42 + +# Custom headers +python -m flink_worker.client --headers '{"source": "test", "priority": "high"}' + +# Connect to remote server +python -m flink_worker.client --host 192.168.1.100 --port 50051 +``` + +### Programmatic Usage + +```python +import grpc +from flink_worker_pb2 import Message, ProcessMessageRequest +from flink_worker_pb2_grpc import FlinkWorkerServiceStub + +# Create a channel +channel = grpc.insecure_channel('localhost:50051') +stub = FlinkWorkerServiceStub(channel) + +# Create a message +message = Message( + payload=b"Hello, World!", + headers={"source": "python", "type": "test"}, + timestamp=1234567890 +) + +# Create request +request = ProcessMessageRequest( + message=message, + segment_id=1 +) + +# Process the message +response = stub.ProcessMessage(request) + +# Handle response +for msg in response.messages: + print(f"Processed message: {msg.payload}") +``` + +## API Reference + +### Message + +- `payload`: bytes - The message content +- `headers`: map - Key-value metadata +- `timestamp`: int64 - Unix timestamp + +### ProcessMessageRequest + +- `message`: Message - The message to process +- `segment_id`: uint32 - Segment identifier + +### ProcessMessageResponse + +- `messages`: repeated Message - List of processed messages + +## Development + +### Running Tests + +```bash +pytest tests/ +``` + +### Code Formatting + +```bash +black . +isort . +``` + +### Type Checking + +```bash +mypy . +``` + +## Architecture + +The service is designed to be extensible: + +1. **Base Message Class**: The protobuf-generated `Message` class can be subclassed +2. **Service Implementation**: `FlinkWorkerService` handles the gRPC calls +3. **Processing Logic**: Currently returns the original message plus a processed copy +4. **Error Handling**: Graceful error handling with proper gRPC status codes + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests +5. Submit a pull request + +## License + +MIT License - see LICENSE file for details. diff --git a/flink_worker/flink_worker/__init__.py b/flink_worker/flink_worker/__init__.py new file mode 100644 index 00000000..1d940c1c --- /dev/null +++ b/flink_worker/flink_worker/__init__.py @@ -0,0 +1,8 @@ +""" +Flink Worker gRPC Service Package + +This package provides a gRPC service for processing messages in a Flink environment. +""" + +__version__ = "0.1.0" +__all__ = ["FlinkWorkerService", "Message", "ProcessMessageRequest", "ProcessMessageResponse"] diff --git a/flink_worker/flink_worker/client.py b/flink_worker/flink_worker/client.py new file mode 100644 index 00000000..f37c3ccb --- /dev/null +++ b/flink_worker/flink_worker/client.py @@ -0,0 +1,195 @@ +""" +Flink Worker gRPC Client + +This module provides a CLI client for testing the Flink Worker gRPC service. +""" + +import argparse +import json +import logging +import sys +from typing import Dict + +import grpc + +from .flink_worker_pb2 import Message, ProcessMessageRequest +from .flink_worker_pb2_grpc import FlinkWorkerServiceStub + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +class FlinkWorkerClient: + """Client for interacting with the Flink Worker gRPC service.""" + + def __init__(self, host: str = "localhost", port: int = 50051): + """ + Initialize the client. + + Args: + host: The host address of the gRPC server + port: The port of the gRPC server + """ + self.address = f"{host}:{port}" + self.channel = None + self.stub = None + + def connect(self) -> None: + """Establish connection to the gRPC server.""" + try: + self.channel = grpc.insecure_channel(self.address) + self.stub = FlinkWorkerServiceStub(self.channel) + logger.info(f"Connected to Flink Worker service at {self.address}") + except Exception as e: + logger.error(f"Failed to connect to {self.address}: {e}") + raise + + def disconnect(self) -> None: + """Close the connection to the gRPC server.""" + if self.channel: + self.channel.close() + logger.info("Disconnected from Flink Worker service") + + def process_message( + self, + payload: bytes, + headers: Dict[str, str], + timestamp: int, + segment_id: int + ) -> None: + """ + Send a message to the service for processing. + + Args: + payload: The message payload as bytes + headers: A dictionary of string key-value pairs + timestamp: Unix timestamp + segment_id: The segment ID as an unsigned integer + """ + try: + # Create the message + message = Message( + payload=payload, + headers=headers, + timestamp=timestamp + ) + + # Create the request + request = ProcessMessageRequest( + message=message, + segment_id=segment_id + ) + + logger.info(f"Sending message for segment {segment_id}") + logger.debug(f"Payload length: {len(payload)}") + logger.debug(f"Headers: {headers}") + logger.debug(f"Timestamp: {timestamp}") + + # Make the gRPC call + response = self.stub.ProcessMessage(request) + + logger.info(f"Received {len(response.messages)} processed messages") + + # Display the results + for i, msg in enumerate(response.messages): + print(f"\n--- Processed Message {i + 1} ---") + print(f"Payload length: {len(msg.payload)}") + print(f"Headers: {dict(msg.headers)}") + print(f"Timestamp: {msg.timestamp}") + + # Try to decode payload as text if it looks like text + try: + payload_text = msg.payload.decode('utf-8') + if payload_text.isprintable(): + print(f"Payload (text): {payload_text}") + else: + print(f"Payload (hex): {msg.payload.hex()[:100]}...") + except UnicodeDecodeError: + print(f"Payload (hex): {msg.payload.hex()[:100]}...") + + except grpc.RpcError as e: + logger.error(f"gRPC error: {e.code()} - {e.details()}") + except Exception as e: + logger.error(f"Error processing message: {e}") + + +def main() -> None: + """Main entry point for the Flink Worker gRPC client.""" + parser = argparse.ArgumentParser(description="Flink Worker gRPC Client") + parser.add_argument( + "--host", + type=str, + default="localhost", + help="Host address of the gRPC server (default: localhost)" + ) + parser.add_argument( + "--port", + type=int, + default=50051, + help="Port of the gRPC server (default: 50051)" + ) + parser.add_argument( + "--payload", + type=str, + default="Hello, Flink Worker!", + help="Message payload (default: 'Hello, Flink Worker!')" + ) + parser.add_argument( + "--headers", + type=str, + default='{"source": "cli", "type": "test"}', + help="JSON string of headers (default: '{\"source\": \"cli\", \"type\": \"test\"}')" + ) + parser.add_argument( + "--timestamp", + type=int, + help="Unix timestamp (default: current time)" + ) + parser.add_argument( + "--segment-id", + type=int, + default=1, + help="Segment ID (default: 1)" + ) + + args = parser.parse_args() + + # Parse headers JSON + try: + headers = json.loads(args.headers) + if not isinstance(headers, dict): + raise ValueError("Headers must be a JSON object") + except json.JSONDecodeError as e: + logger.error(f"Invalid headers JSON: {e}") + sys.exit(1) + except ValueError as e: + logger.error(f"Invalid headers: {e}") + sys.exit(1) + + # Use current timestamp if not provided + if args.timestamp is None: + import time + args.timestamp = int(time.time()) + + # Convert payload to bytes + payload = args.payload.encode('utf-8') + + # Create and use the client + client = FlinkWorkerClient(args.host, args.port) + + try: + client.connect() + client.process_message(payload, headers, args.timestamp, args.segment_id) + except Exception as e: + logger.error(f"Client error: {e}") + sys.exit(1) + finally: + client.disconnect() + + +if __name__ == "__main__": + main() diff --git a/flink_worker/flink_worker/flink_worker_pb2.py b/flink_worker/flink_worker/flink_worker_pb2.py new file mode 100644 index 00000000..7dbb3246 --- /dev/null +++ b/flink_worker/flink_worker/flink_worker_pb2.py @@ -0,0 +1,45 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: flink_worker.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'flink_worker.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x66link_worker.proto\x12\x0c\x66link_worker\"\x92\x01\n\x07Message\x12\x0f\n\x07payload\x18\x01 \x01(\x0c\x12\x33\n\x07headers\x18\x02 \x03(\x0b\x32\".flink_worker.Message.HeadersEntry\x12\x11\n\ttimestamp\x18\x03 \x01(\x03\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"S\n\x15ProcessMessageRequest\x12&\n\x07message\x18\x01 \x01(\x0b\x32\x15.flink_worker.Message\x12\x12\n\nsegment_id\x18\x02 \x01(\r\"A\n\x16ProcessMessageResponse\x12\'\n\x08messages\x18\x01 \x03(\x0b\x32\x15.flink_worker.Message2q\n\x12\x46linkWorkerService\x12[\n\x0eProcessMessage\x12#.flink_worker.ProcessMessageRequest\x1a$.flink_worker.ProcessMessageResponseb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flink_worker_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_MESSAGE_HEADERSENTRY']._loaded_options = None + _globals['_MESSAGE_HEADERSENTRY']._serialized_options = b'8\001' + _globals['_MESSAGE']._serialized_start=37 + _globals['_MESSAGE']._serialized_end=183 + _globals['_MESSAGE_HEADERSENTRY']._serialized_start=137 + _globals['_MESSAGE_HEADERSENTRY']._serialized_end=183 + _globals['_PROCESSMESSAGEREQUEST']._serialized_start=185 + _globals['_PROCESSMESSAGEREQUEST']._serialized_end=268 + _globals['_PROCESSMESSAGERESPONSE']._serialized_start=270 + _globals['_PROCESSMESSAGERESPONSE']._serialized_end=335 + _globals['_FLINKWORKERSERVICE']._serialized_start=337 + _globals['_FLINKWORKERSERVICE']._serialized_end=450 +# @@protoc_insertion_point(module_scope) diff --git a/flink_worker/flink_worker/flink_worker_pb2_grpc.py b/flink_worker/flink_worker/flink_worker_pb2_grpc.py new file mode 100644 index 00000000..eb987799 --- /dev/null +++ b/flink_worker/flink_worker/flink_worker_pb2_grpc.py @@ -0,0 +1,101 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +from . import flink_worker_pb2 as flink__worker__pb2 + +GRPC_GENERATED_VERSION = '1.74.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in flink_worker_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class FlinkWorkerServiceStub(object): + """Flink Worker Service + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.ProcessMessage = channel.unary_unary( + '/flink_worker.FlinkWorkerService/ProcessMessage', + request_serializer=flink__worker__pb2.ProcessMessageRequest.SerializeToString, + response_deserializer=flink__worker__pb2.ProcessMessageResponse.FromString, + _registered_method=True) + + +class FlinkWorkerServiceServicer(object): + """Flink Worker Service + """ + + def ProcessMessage(self, request, context): + """Process a single message and return a list of processed messages + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_FlinkWorkerServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'ProcessMessage': grpc.unary_unary_rpc_method_handler( + servicer.ProcessMessage, + request_deserializer=flink__worker__pb2.ProcessMessageRequest.FromString, + response_serializer=flink__worker__pb2.ProcessMessageResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'flink_worker.FlinkWorkerService', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('flink_worker.FlinkWorkerService', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class FlinkWorkerService(object): + """Flink Worker Service + """ + + @staticmethod + def ProcessMessage(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/flink_worker.FlinkWorkerService/ProcessMessage', + flink__worker__pb2.ProcessMessageRequest.SerializeToString, + flink__worker__pb2.ProcessMessageResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/flink_worker/flink_worker/server.py b/flink_worker/flink_worker/server.py new file mode 100644 index 00000000..b421068c --- /dev/null +++ b/flink_worker/flink_worker/server.py @@ -0,0 +1,74 @@ +""" +Flink Worker gRPC Server Entry Point + +This module provides the main entry point for running the Flink Worker gRPC server. +""" + +import argparse +import logging +import signal +import sys +from typing import Optional + +from .service import create_server + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +def signal_handler(signum: int, frame: Optional[object]) -> None: + """Handle shutdown signals gracefully.""" + logger.info(f"Received signal {signum}, shutting down...") + sys.exit(0) + + +def main() -> None: + """Main entry point for the Flink Worker gRPC server.""" + parser = argparse.ArgumentParser(description="Flink Worker gRPC Server") + parser.add_argument( + "--port", + type=int, + default=50051, + help="Port to bind the server to (default: 50051)" + ) + parser.add_argument( + "--host", + type=str, + default="[::]", + help="Host to bind the server to (default: [::])" + ) + + args = parser.parse_args() + + # Set up signal handlers for graceful shutdown + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + try: + # Create and start the server + server = create_server(args.port) + server.start() + + logger.info(f"Flink Worker gRPC server started on {args.host}:{args.port}") + logger.info("Press Ctrl+C to stop the server") + + # Keep the server running + server.wait_for_termination() + + except KeyboardInterrupt: + logger.info("Received keyboard interrupt, shutting down...") + except Exception as e: + logger.error(f"Error starting server: {e}") + sys.exit(1) + finally: + if 'server' in locals(): + server.stop(0) + logger.info("Server stopped") + + +if __name__ == "__main__": + main() diff --git a/flink_worker/flink_worker/service.py b/flink_worker/flink_worker/service.py new file mode 100644 index 00000000..fda6c2ef --- /dev/null +++ b/flink_worker/flink_worker/service.py @@ -0,0 +1,85 @@ +""" +Flink Worker gRPC Service Implementation + +This module provides the implementation of the FlinkWorkerService gRPC service. +""" + +import logging +from concurrent.futures import ThreadPoolExecutor + +import grpc +from .flink_worker_pb2 import Message, ProcessMessageRequest, ProcessMessageResponse +from .flink_worker_pb2_grpc import FlinkWorkerServiceServicer, add_FlinkWorkerServiceServicer_to_server + +logger = logging.getLogger(__name__) + + +class FlinkWorkerService(FlinkWorkerServiceServicer): + """ + Implementation of the FlinkWorkerService gRPC service. + + This service processes messages and returns a list of processed messages. + The Message class can be subclassed to add custom functionality. + """ + + def ProcessMessage( + self, request: ProcessMessageRequest, context: grpc.ServicerContext + ) -> ProcessMessageResponse: + """ + Process a single message and return a list of processed messages. + + Args: + request: The ProcessMessageRequest containing the message and segment_id + context: The gRPC service context + + Returns: + ProcessMessageResponse containing a list of processed messages + """ + try: + message = request.message + segment_id = request.segment_id + + logger.info(f"Processing message for segment {segment_id}") + logger.debug(f"Message payload length: {len(message.payload)}") + logger.debug(f"Message headers: {message.headers}") + logger.debug(f"Message timestamp: {message.timestamp}") + + # For now, return the original message as-is + # In a real implementation, this would contain the actual processing logic + processed_messages = [message] + + # Add a simple header to indicate processing + processed_message = Message() + processed_message.CopyFrom(message) + processed_message.headers["processed"] = "true" + processed_message.headers["segment_id"] = str(segment_id) + + processed_messages.append(processed_message) + + logger.info(f"Successfully processed message for segment {segment_id}") + return ProcessMessageResponse(messages=processed_messages) + + except Exception as e: + logger.error(f"Error processing message: {e}") + context.set_code(grpc.StatusCode.INTERNAL) + context.set_details(f"Internal error: {str(e)}") + return ProcessMessageResponse(messages=[]) + + +def create_server(port: int = 50051) -> grpc.Server: + """ + Create and configure the gRPC server. + + Args: + port: The port to bind the server to + + Returns: + A configured gRPC server + """ + server = grpc.server(ThreadPoolExecutor(max_workers=10)) + add_FlinkWorkerServiceServicer_to_server(FlinkWorkerService(), server) + + # Bind to the specified port + server.add_insecure_port(f"[::]:{port}") + + return server diff --git a/flink_worker/flink_worker/test_service.py b/flink_worker/flink_worker/test_service.py new file mode 100644 index 00000000..091cabe8 --- /dev/null +++ b/flink_worker/flink_worker/test_service.py @@ -0,0 +1,85 @@ +""" +Simple test script for the Flink Worker gRPC service. + +This script tests the service by starting it in a separate thread and then +sending a test message to it. +""" + +import time + +import grpc + +from .flink_worker_pb2 import Message, ProcessMessageRequest +from .flink_worker_pb2_grpc import FlinkWorkerServiceStub +from .service import create_server + + +def test_service(): + """Test the Flink Worker service.""" + print("πŸš€ Starting Flink Worker gRPC service test...") + + # Create and start the server + server = create_server(port=50052) # Use different port to avoid conflicts + server.start() + + print("βœ… Server started on port 50052") + + # Wait a moment for server to be ready + time.sleep(1) + + try: + # Create a channel and stub + channel = grpc.insecure_channel('localhost:50052') + stub = FlinkWorkerServiceStub(channel) + + print("βœ… Connected to server") + + # Create a test message + test_message = Message( + payload=b"Hello, Flink Worker! This is a test message.", + headers={ + "source": "test_script", + "type": "greeting", + "priority": "high" + }, + timestamp=int(time.time()) + ) + + # Create the request + request = ProcessMessageRequest( + message=test_message, + segment_id=42 + ) + + print("πŸ“€ Sending test message...") + print(f" Payload: {test_message.payload}") + print(f" Headers: {dict(test_message.headers)}") + print(f" Timestamp: {test_message.timestamp}") + print(f" Segment ID: {request.segment_id}") + + # Send the message + response = stub.ProcessMessage(request) + + print("πŸ“₯ Received response:") + print(f" Number of messages: {len(response.messages)}") + + # Display each processed message + for i, msg in enumerate(response.messages): + print(f"\n--- Message {i + 1} ---") + print(f" Payload: {msg.payload}") + print(f" Headers: {dict(msg.headers)}") + print(f" Timestamp: {msg.timestamp}") + + print("\nβœ… Test completed successfully!") + + except Exception as e: + print(f"❌ Test failed: {e}") + raise + finally: + # Clean up + server.stop(0) + print("πŸ›‘ Server stopped") + + +if __name__ == "__main__": + test_service() diff --git a/flink_worker/pyproject.toml b/flink_worker/pyproject.toml new file mode 100644 index 00000000..6d906519 --- /dev/null +++ b/flink_worker/pyproject.toml @@ -0,0 +1,59 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "flink_worker" +version = "0.1.0" +description = "Flink Worker gRPC Service" +authors = [ + {name = "Sentry Team", email = "team@sentry.io"} +] +readme = "README.md" +requires-python = ">=3.8" +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", +] +dependencies = [ + "grpcio>=1.54.0", + "grpcio-tools>=1.54.0", + "protobuf>=4.21.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "pytest-asyncio>=0.21.0", + "black>=23.0.0", + "isort>=5.12.0", + "mypy>=1.0.0", +] + +[project.scripts] +flink-worker-server = "flink_worker.server:main" +flink-worker-client = "flink_worker.client:main" + +[tool.setuptools.packages.find] +where = ["."] +include = ["flink_worker*"] + +[tool.black] +line-length = 88 +target-version = ['py38'] + +[tool.isort] +profile = "black" +line_length = 88 + +[tool.mypy] +python_version = "3.8" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true diff --git a/protos/flink_worker.proto b/protos/flink_worker.proto new file mode 100644 index 00000000..34df7a68 --- /dev/null +++ b/protos/flink_worker.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package flink_worker; + +// Message type that can be subclassed +message Message { + bytes payload = 1; + map headers = 2; + int64 timestamp = 3; +} + +// Request for processing a message +message ProcessMessageRequest { + Message message = 1; + uint32 segment_id = 2; +} + +// Response containing processed messages +message ProcessMessageResponse { + repeated Message messages = 1; +} + +// Flink Worker Service +service FlinkWorkerService { + // Process a single message and return a list of processed messages + rpc ProcessMessage(ProcessMessageRequest) returns (ProcessMessageResponse); +} From b50ed9024b73ba4280996dc4e59759c892dc246b Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 18 Aug 2025 16:19:18 -0700 Subject: [PATCH 02/19] Add The java bridge --- flink_bridge/.idea/compiler.xml | 13 ++ flink_bridge/.idea/encodings.xml | 7 + flink_bridge/.idea/jarRepositories.xml | 25 ++ flink_bridge/.idea/misc.xml | 12 + flink_bridge/.idea/vcs.xml | 6 + flink_bridge/.idea/workspace.xml | 74 ++++++ flink_bridge/CLI_README.md | 184 +++++++++++++++ flink_bridge/README.md | 155 ++++++++++++ flink_bridge/dependency-reduced-pom.xml | 134 +++++++++++ flink_bridge/flink-bridge.iml | 9 + flink_bridge/input.txt | 5 + flink_bridge/pom.xml | 221 ++++++++++++++++++ flink_bridge/run-cli.sh | 60 +++++ .../com/sentry/flink_bridge/FlinkGrpcApp.java | 128 ++++++++++ .../com/sentry/flink_bridge/GrpcCliApp.java | 159 +++++++++++++ .../com/sentry/flink_bridge/GrpcClient.java | 105 +++++++++ .../sentry/flink_bridge/SimpleGrpcTest.java | 83 +++++++ flink_worker/flink_worker/service.py | 2 +- 18 files changed, 1381 insertions(+), 1 deletion(-) create mode 100644 flink_bridge/.idea/compiler.xml create mode 100644 flink_bridge/.idea/encodings.xml create mode 100644 flink_bridge/.idea/jarRepositories.xml create mode 100644 flink_bridge/.idea/misc.xml create mode 100644 flink_bridge/.idea/vcs.xml create mode 100644 flink_bridge/.idea/workspace.xml create mode 100644 flink_bridge/CLI_README.md create mode 100644 flink_bridge/README.md create mode 100644 flink_bridge/dependency-reduced-pom.xml create mode 100644 flink_bridge/flink-bridge.iml create mode 100644 flink_bridge/input.txt create mode 100644 flink_bridge/pom.xml create mode 100755 flink_bridge/run-cli.sh create mode 100644 flink_bridge/src/main/java/com/sentry/flink_bridge/FlinkGrpcApp.java create mode 100644 flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcCliApp.java create mode 100644 flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcClient.java create mode 100644 flink_bridge/src/main/java/com/sentry/flink_bridge/SimpleGrpcTest.java diff --git a/flink_bridge/.idea/compiler.xml b/flink_bridge/.idea/compiler.xml new file mode 100644 index 00000000..d5708ca4 --- /dev/null +++ b/flink_bridge/.idea/compiler.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/flink_bridge/.idea/encodings.xml b/flink_bridge/.idea/encodings.xml new file mode 100644 index 00000000..d8a72150 --- /dev/null +++ b/flink_bridge/.idea/encodings.xml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/flink_bridge/.idea/jarRepositories.xml b/flink_bridge/.idea/jarRepositories.xml new file mode 100644 index 00000000..958889ad --- /dev/null +++ b/flink_bridge/.idea/jarRepositories.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + + diff --git a/flink_bridge/.idea/misc.xml b/flink_bridge/.idea/misc.xml new file mode 100644 index 00000000..ea03147b --- /dev/null +++ b/flink_bridge/.idea/misc.xml @@ -0,0 +1,12 @@ + + + + + + + + diff --git a/flink_bridge/.idea/vcs.xml b/flink_bridge/.idea/vcs.xml new file mode 100644 index 00000000..54e4b961 --- /dev/null +++ b/flink_bridge/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + diff --git a/flink_bridge/.idea/workspace.xml b/flink_bridge/.idea/workspace.xml new file mode 100644 index 00000000..639d266d --- /dev/null +++ b/flink_bridge/.idea/workspace.xml @@ -0,0 +1,74 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 1755554494736 + + + + diff --git a/flink_bridge/CLI_README.md b/flink_bridge/CLI_README.md new file mode 100644 index 00000000..bb899fa8 --- /dev/null +++ b/flink_bridge/CLI_README.md @@ -0,0 +1,184 @@ +# Flink Bridge gRPC CLI Client + +This is a simple Java CLI application that demonstrates the gRPC client functionality by communicating with the Python FlinkWorker gRPC service. + +## Features + +- Interactive command-line interface +- Send custom messages to the gRPC service +- Built-in test message functionality +- Configurable host and port +- Real-time message processing and response display + +## Prerequisites + +- Java 24 or later +- Maven 3.6 or later +- Python gRPC server running (see setup below) + +## Quick Start + +### 1. Start the Python gRPC Server + +First, start the Python gRPC server in a separate terminal: + +```bash +cd flink_worker +source .venv/bin/activate +python -m flink_worker.server --port 50053 +``` + +### 2. Run the CLI Application + +#### Option A: Using the provided script (recommended) + +```bash +cd flink_bridge +./run-cli.sh +``` + +This script will: +- Compile the project +- Build the CLI JAR +- Run the application connecting to localhost:50053 + +#### Option B: Manual compilation and execution + +```bash +cd flink_bridge + +# Compile and build +mvn clean package + +# Run the CLI application +java -jar target/flink-bridge-cli.jar [host] [port] +``` + +### 3. Use the CLI + +Once running, you'll see an interactive prompt: + +``` +=== Flink Worker gRPC CLI Client === +Commands: + - Send a message to the gRPC service + help - Show this help message + quit/exit - Exit the application + test - Send a test message + +grpc> +``` + +## CLI Commands + +- **Any text message**: Sends the message to the gRPC service +- **`test`**: Sends a predefined test message +- **`help`**: Shows available commands +- **`quit` or `exit`**: Exits the application + +## Example Usage + +``` +grpc> Hello, this is a test message +Response received: + - Payload: Hello, this is a test message + Headers: {source=cli, timestamp=1703123456789, message_id=0} + Timestamp: 1703123456789 + - Payload: Hello, this is a test message + Headers: {source=cli, timestamp=1703123456789, message_id=0, processed=true, segment_id=0} + Timestamp: 1703123456789 + +grpc> test +Sending test message: This is a test message from the CLI client +Response received: + - Payload: This is a test message from the CLI client + Headers: {source=cli, timestamp=1703123456790, message_id=1} + Timestamp: 1703123456790 + - Payload: This is a test message from the CLI client + Headers: {source=cli, timestamp=1703123456790, message_id=1, processed=true, segment_id=1} + Timestamp: 1703123456790 + +grpc> quit +Goodbye! +``` + +## Configuration + +### Command Line Arguments + +- **Host**: First argument (default: `localhost`) +- **Port**: Second argument (default: `50053`) + +Examples: +```bash +# Connect to localhost:50053 (default) +java -jar target/flink-bridge-cli.jar + +# Connect to specific host and port +java -jar target/flink-bridge-cli.jar 192.168.1.100 50051 + +# Connect to specific host with default port +java -jar target/flink-bridge-cli.jar my-server.com +``` + +## Architecture + +The CLI application uses the same `GrpcClient` class that the Flink application uses, demonstrating: + +1. **gRPC Communication**: Direct communication with the Python service +2. **Message Processing**: Sending and receiving protobuf messages +3. **Error Handling**: Graceful error handling and user feedback +4. **Resource Management**: Proper cleanup of gRPC connections + +## Troubleshooting + +### Common Issues + +1. **"gRPC service is not available"** + - Make sure the Python server is running + - Check the host and port configuration + - Verify the server is accessible from your network + +2. **"Connection refused"** + - Check if the server is running on the specified port + - Verify firewall settings + - Ensure the server is binding to the correct interface + +3. **"Invalid port number"** + - Port must be a valid integer between 1-65535 + - Check the command line arguments + +### Debug Mode + +To see more detailed logging, you can set the log level: + +```bash +export SLF4J_SIMPLE_LOG_LEVEL=DEBUG +java -jar target/flink-bridge-cli.jar +``` + +## Development + +### Building + +```bash +mvn clean compile # Compile only +mvn clean package # Compile and package +mvn clean install # Compile, package, and install to local repo +``` + +### Project Structure + +- `GrpcCliApp.java` - Main CLI application +- `GrpcClient.java` - gRPC client abstraction +- `run-cli.sh` - Convenience script for running the CLI +- `pom.xml` - Maven configuration with CLI build profile + +### Adding New Features + +The CLI application is designed to be easily extensible. You can: + +1. Add new commands in the `runInteractiveCli` method +2. Enhance message processing in the `sendMessage` method +3. Add configuration options for different gRPC service features +4. Implement batch processing or file input capabilities diff --git a/flink_bridge/README.md b/flink_bridge/README.md new file mode 100644 index 00000000..c3db3c8e --- /dev/null +++ b/flink_bridge/README.md @@ -0,0 +1,155 @@ +# Flink gRPC Bridge Application + +This is an Apache Flink application that demonstrates integration with a gRPC service for message processing. The application reads messages from a text file source and processes them using the `FlinkWorkerService` gRPC service. + +## Architecture + +The application consists of: +- **FlinkGrpcApp**: Main Flink application that orchestrates the data flow +- **GrpcMessageProcessor**: A `ProcessFunction` that implements the `OneInputStreamProcessFunction` pattern +- **GrpcClient**: Client for communicating with the gRPC service +- **Proto-generated classes**: Auto-generated from `flink_worker.proto` + +## Prerequisites + +- Java 11 or higher +- Maven 3.6+ +- Apache Flink 2.1.0 +- The gRPC service must be running (see the `flink_worker` service) + +## Building the Application + +1. Navigate to the project directory: + ```bash + cd streams/flink_bridge + ``` + +2. Build the project: + ```bash + mvn clean compile + ``` + + This will: + - Generate Java classes from the protobuf definition + - Compile the Java source code + - Create the JAR file + +3. Package the application: + ```bash + mvn package + ``` + +## Running the Application + +### Prerequisites + +1. **Start the gRPC service** (from the `flink_worker` directory): + ```bash + cd streams/flink_worker + python -m flink_worker.server + ``` + + The service will start on port 50051 by default. + +2. **Start Flink cluster** (using the installed Flink 2.1.0): + ```bash + cd flink-2.1.0 + ./bin/start-cluster.sh + ``` + +### Submit the Job + +1. **Submit the Flink job**: + ```bash + cd flink-2.1.0 + ./bin/flink run -c com.sentry.flink_bridge.FlinkGrpcApp \ + ../streams/flink_bridge/target/flink-bridge-1.0.0.jar + ``` + +2. **Monitor the job**: + - Open Flink Web UI: http://localhost:8081 + - Check the job status and logs + +### Alternative: Run Locally + +You can also run the application locally for development/testing: + +```bash +cd streams/flink_bridge +mvn exec:java -Dexec.mainClass="com.sentry.flink_bridge.FlinkGrpcApp" +``` + +## Configuration + +### Input File + +The application reads from `input.txt` by default. You can modify the file path in `FlinkGrpcApp.java`: + +```java +new Path("input.txt") // Change this path as needed +``` + +### gRPC Service + +The gRPC service connection details can be configured in `GrpcClient.java`: + +```java +grpcClient = new GrpcClient("localhost", 50051); // Change host/port as needed +``` + +## Expected Output + +When running successfully, you should see: +1. Messages being read from the input file +2. gRPC service calls for each message +3. Processed messages printed to standard output +4. Logs showing the processing flow + +## Troubleshooting + +### Common Issues + +1. **gRPC service not available**: + - Ensure the Python gRPC service is running + - Check the port number (default: 50051) + - Verify network connectivity + +2. **Protobuf compilation errors**: + - Ensure the proto file is accessible + - Check Maven dependencies + - Run `mvn clean compile` to regenerate classes + +3. **Flink job submission errors**: + - Verify Flink cluster is running + - Check the JAR file path + - Ensure all dependencies are included + +### Logs + +The application uses SLF4J for logging. Check the Flink task manager logs for detailed information about the processing. + +## Development + +### Adding New Features + +1. **New message types**: Modify the protobuf definition and regenerate classes +2. **Additional processing**: Extend the `GrpcMessageProcessor` class +3. **Error handling**: Implement custom error handling in the process function +4. **Configuration**: Add configuration parameters for flexibility + +### Testing + +1. **Unit tests**: Add tests for individual components +2. **Integration tests**: Test the full data flow +3. **gRPC service mocking**: Use mock services for testing without external dependencies + +## Dependencies + +- **Apache Flink 2.1.0**: Core streaming framework +- **gRPC**: For service communication +- **Protobuf**: For message serialization +- **SLF4J**: For logging + +## License + +This project is part of the Sentry Streams project. diff --git a/flink_bridge/dependency-reduced-pom.xml b/flink_bridge/dependency-reduced-pom.xml new file mode 100644 index 00000000..d9067e0d --- /dev/null +++ b/flink_bridge/dependency-reduced-pom.xml @@ -0,0 +1,134 @@ + + + 4.0.0 + com.sentry + flink-bridge + Flink gRPC Bridge + 1.0.0 + A Flink application that processes messages using a gRPC service + + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + + + + maven-shade-plugin + 3.4.1 + + + cli-app + package + + shade + + + flink-bridge-cli + + + com.sentry.flink_bridge.GrpcCliApp + + + + com.sentry.flink_bridge.GrpcCliApp + + + + + + flink-app + package + + shade + + + flink-bridge-app + + + com.sentry.flink_bridge.FlinkGrpcApp + + + + com.sentry.flink_bridge.FlinkGrpcApp + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + + + compile + compile-custom + + + + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + ${project.basedir}/../protos + + + + maven-compiler-plugin + 3.11.0 + + 21 + 21 + ${maven.compiler.source} + ${maven.compiler.target} + + + + + + + xolstice-releases + https://raw.githubusercontent.com/xolstice/maven-repo/master/releases + + + + + org.apache.flink + flink-streaming-java + 2.1.0 + provided + + + org.apache.flink + flink-clients + 2.1.0 + provided + + + org.apache.flink + flink-annotations + 2.1.0 + provided + + + javax.annotation + javax.annotation-api + 1.3.2 + provided + + + + 21 + 21 + 3.25.1 + 3.25.1 + UTF-8 + 1.60.0 + 2.1.0 + + diff --git a/flink_bridge/flink-bridge.iml b/flink_bridge/flink-bridge.iml new file mode 100644 index 00000000..25f13ec6 --- /dev/null +++ b/flink_bridge/flink-bridge.iml @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/flink_bridge/input.txt b/flink_bridge/input.txt new file mode 100644 index 00000000..1d089efa --- /dev/null +++ b/flink_bridge/input.txt @@ -0,0 +1,5 @@ +Hello World +This is a test message +Another message for processing +Flink gRPC integration test +Processing stream data with external service diff --git a/flink_bridge/pom.xml b/flink_bridge/pom.xml new file mode 100644 index 00000000..a268ae93 --- /dev/null +++ b/flink_bridge/pom.xml @@ -0,0 +1,221 @@ + + + 4.0.0 + + com.sentry + flink-bridge + 1.0.0 + jar + + Flink gRPC Bridge + A Flink application that processes messages using a gRPC service + + + 21 + 21 + UTF-8 + 1.60.0 + 3.25.1 + 3.25.1 + 2.1.0 + + + + + + xolstice-releases + https://raw.githubusercontent.com/xolstice/maven-repo/master/releases + + + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + org.apache.flink + flink-clients + ${flink.version} + provided + + + + + org.apache.flink + flink-annotations + ${flink.version} + provided + + + + + + io.grpc + grpc-netty-shaded + ${grpc.version} + + + + io.grpc + grpc-protobuf + ${grpc.version} + + + + io.grpc + grpc-stub + ${grpc.version} + + + + + + io.grpc + grpc-core + ${grpc.version} + + + + io.grpc + grpc-api + ${grpc.version} + + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + + + org.slf4j + slf4j-api + 1.7.36 + + + + org.slf4j + slf4j-simple + 1.7.36 + + + + javax.annotation + javax.annotation-api + 1.3.2 + provided + + + + + + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.4.1 + + + cli-app + package + + shade + + + flink-bridge-cli + + + com.sentry.flink_bridge.GrpcCliApp + + + + + + com.sentry.flink_bridge.GrpcCliApp + + + + + + + + flink-app + package + + shade + + + flink-bridge-app + + + com.sentry.flink_bridge.FlinkGrpcApp + + + + + + com.sentry.flink_bridge.FlinkGrpcApp + + + + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + + com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier} + + grpc-java + + io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + + ${project.basedir}/../protos + + + + + compile + compile-custom + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + 21 + 21 + ${maven.compiler.source} + ${maven.compiler.target} + + + + + + diff --git a/flink_bridge/run-cli.sh b/flink_bridge/run-cli.sh new file mode 100755 index 00000000..498e81d3 --- /dev/null +++ b/flink_bridge/run-cli.sh @@ -0,0 +1,60 @@ +#!/bin/bash + +# Flink Bridge gRPC CLI Runner +# This script compiles and runs the Java CLI application + +set -e + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +echo -e "${GREEN}=== Flink Bridge gRPC CLI Runner ===${NC}" + +# Check if Maven is available +if ! command -v mvn &> /dev/null; then + echo -e "${RED}Error: Maven is not installed or not in PATH${NC}" + echo "Please install Maven and try again" + exit 1 +fi + +# Check if Java is available +if ! command -v java &> /dev/null; then + echo -e "${RED}Error: Java is not installed or not in PATH${NC}" + echo "Please install Java and try again" + exit 1 +fi + +echo -e "${YELLOW}Compiling the project...${NC}" +mvn clean compile + +echo -e "${YELLOW}Building the CLI application...${NC}" +mvn package + +# Check if the CLI JAR was created +CLI_JAR="target/flink-bridge-cli.jar" +if [ ! -f "$CLI_JAR" ]; then + echo -e "${RED}Error: CLI JAR file not found at $CLI_JAR${NC}" + exit 1 +fi + +echo -e "${GREEN}CLI application built successfully!${NC}" +echo -e "${YELLOW}Usage:${NC}" +echo " $0 [host] [port]" +echo " Default: localhost:50053" +echo "" +echo -e "${YELLOW}Make sure the Python gRPC server is running:${NC}" +echo " cd ../flink_worker && source .venv/bin/activate && python -m flink_worker.server --port 50053" +echo "" + +# Parse command line arguments +HOST=${1:-localhost} +PORT=${2:-50053} + +echo -e "${GREEN}Starting CLI application connecting to $HOST:$PORT...${NC}" +echo "" + +# Run the CLI application +java -jar "$CLI_JAR" "$HOST" "$PORT" diff --git a/flink_bridge/src/main/java/com/sentry/flink_bridge/FlinkGrpcApp.java b/flink_bridge/src/main/java/com/sentry/flink_bridge/FlinkGrpcApp.java new file mode 100644 index 00000000..50aba383 --- /dev/null +++ b/flink_bridge/src/main/java/com/sentry/flink_bridge/FlinkGrpcApp.java @@ -0,0 +1,128 @@ +package com.sentry.flink_bridge; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.protobuf.ByteString; +import flink_worker.FlinkWorker; +import org.apache.flink.datastream.api.common.Collector; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * Flink application that reads messages from a text file and processes them using a gRPC service. + * This demonstrates the integration between Apache Flink and gRPC services. + */ +public class FlinkGrpcApp { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkGrpcApp.class); + + public static void main(String[] args) throws Exception { + // Set up the streaming execution environment + ExecutionEnvironment env = ExecutionEnvironment.getInstance(); + env.setExecutionMode(RuntimeExecutionMode.STREAMING); + + // Create a data stream from a text file using Flink 2.1.0 API + NonKeyedPartitionStream textStream = env.fromSource( + DataStreamV2SourceUtils.fromData( + Arrays.asList(new String[]{ + "Hello World", + "This is a test message", + "Another message for processing", + "Flink gRPC integration test", + "Processing stream data with external service" + }) + ), + "in memory list" + ); + + // Apply the gRPC processing function + NonKeyedPartitionStream processedStream = textStream + .process(new GrpcMessageProcessor()); + + // Print the processed messages to standard output + processedStream.toSink(new WrappedSink<>(new PrintSink<>())).withName("print-sink"); + + + // Execute the Flink job + LOG.info("Starting Flink gRPC application..."); + env.execute("Flink gRPC Message Processing Job"); + } + + /** + * ProcessFunction that sends messages to the gRPC service for processing. + * This implements the OneInputStreamProcessFunction pattern as requested. + */ + public static class GrpcMessageProcessor implements OneInputStreamProcessFunction { + + private static final Logger LOG = LoggerFactory.getLogger(GrpcMessageProcessor.class); + private GrpcClient grpcClient; + + @Override + public void open(NonPartitionedContext ctx) throws Exception { + // Initialize the gRPC client + grpcClient = new GrpcClient("localhost", 50053); + LOG.info("gRPC client initialized"); + } + + @Override + public void processRecord( + String record, + Collector out, + PartitionedContext ctx + ) throws Exception { + try { + LOG.info("Processing message: {}", record); + + // Create a message for the gRPC service + FlinkWorker.Message message; + message = FlinkWorker.Message.newBuilder() + .setPayload(ByteString.copyFrom(record.getBytes(StandardCharsets.UTF_8))) + .putHeaders("source", "flink") + .putHeaders("timestamp", String.valueOf(System.currentTimeMillis())) + .setTimestamp(System.currentTimeMillis()) + .build(); + + // Create the processing request + FlinkWorker.ProcessMessageRequest request = FlinkWorker.ProcessMessageRequest.newBuilder() + .setMessage(message) + .setSegmentId(0) // Simple segment ID for now + .build(); + + // Send to gRPC service and get response + FlinkWorker.ProcessMessageResponse response = grpcClient.processMessage(request); + + // Process the response and output processed messages + for (FlinkWorker.Message processedMsg : response.getMessagesList()) { + String processedContent = new String(processedMsg.getPayload().toByteArray(), StandardCharsets.UTF_8); + LOG.info("Received processed message: {}", processedContent); + out.collect(processedContent); + + } + + } catch (Exception e) { + LOG.error("Error processing message: {}", record, e); + // In a production environment, you might want to handle errors differently + // For now, we'll just log the error and continue + } + } + + @Override + public void close() throws Exception { + if (grpcClient != null) { + grpcClient.shutdown(); + + } + } + } + +} diff --git a/flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcCliApp.java b/flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcCliApp.java new file mode 100644 index 00000000..149d5346 --- /dev/null +++ b/flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcCliApp.java @@ -0,0 +1,159 @@ +package com.sentry.flink_bridge; + +import flink_worker.FlinkWorker.Message; +import flink_worker.FlinkWorker.ProcessMessageRequest; +import flink_worker.FlinkWorker.ProcessMessageResponse; +import com.google.protobuf.ByteString; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Scanner; + +/** + * Simple CLI application that demonstrates the gRPC client functionality. + * This program allows users to send messages to the FlinkWorker gRPC service + * and see the responses. + */ +public class GrpcCliApp { + + private static final Logger LOG = LoggerFactory.getLogger(GrpcCliApp.class); + private static final String DEFAULT_HOST = "localhost"; + private static final int DEFAULT_PORT = 50053; + + public static void main(String[] args) { + // Parse command line arguments + String host = DEFAULT_HOST; + int port = DEFAULT_PORT; + + if (args.length >= 1) { + host = args[0]; + } + if (args.length >= 2) { + try { + port = Integer.parseInt(args[1]); + } catch (NumberFormatException e) { + LOG.error("Invalid port number: {}. Using default port {}", args[1], DEFAULT_PORT); + port = DEFAULT_PORT; + } + } + + LOG.info("Starting gRPC CLI client connecting to {}:{}", host, port); + + GrpcClient client = null; + try { + // Create the gRPC client + client = new GrpcClient(host, port); + + // Check if the service is available + if (!client.isAvailable()) { + LOG.error("gRPC service is not available at {}:{}", host, port); + LOG.error("Make sure the Python gRPC server is running with:"); + LOG.error("cd flink_worker && source .venv/bin/activate && python -m flink_worker.server --port {}", port); + System.exit(1); + } + + LOG.info("Successfully connected to gRPC service at {}:{}", host, port); + + // Start interactive CLI + runInteractiveCli(client); + + } catch (Exception e) { + LOG.error("Error in gRPC CLI application", e); + System.exit(1); + } finally { + if (client != null) { + client.shutdown(); + } + } + } + + /** + * Runs the interactive command-line interface. + */ + private static void runInteractiveCli(GrpcClient client) { + Scanner scanner = new Scanner(System.in); + int messageCounter = 0; + + System.out.println("\n=== Flink Worker gRPC CLI Client ==="); + System.out.println("Commands:"); + System.out.println(" - Send a message to the gRPC service"); + System.out.println(" help - Show this help message"); + System.out.println(" quit/exit - Exit the application"); + System.out.println(" test - Send a test message"); + System.out.println(); + + while (true) { + System.out.print("grpc> "); + String input = scanner.nextLine().trim(); + + if (input.isEmpty()) { + continue; + } + + if (input.equalsIgnoreCase("quit") || input.equalsIgnoreCase("exit")) { + System.out.println("Goodbye!"); + break; + } + + if (input.equalsIgnoreCase("help")) { + System.out.println("Commands:"); + System.out.println(" - Send a message to the gRPC service"); + System.out.println(" help - Show this help message"); + System.out.println(" quit/exit - Exit the application"); + System.out.println(" test - Send a test message"); + continue; + } + + if (input.equalsIgnoreCase("test")) { + input = "This is a test message from the CLI client"; + System.out.println("Sending test message: " + input); + } + + try { + // Send the message to the gRPC service + ProcessMessageResponse response = sendMessage(client, input, messageCounter++); + + // Display the response + System.out.println("Response received:"); + for (Message msg : response.getMessagesList()) { + String payload = new String(msg.getPayload().toByteArray(), StandardCharsets.UTF_8); + System.out.println(" - Payload: " + payload); + System.out.println(" Headers: " + msg.getHeadersMap()); + System.out.println(" Timestamp: " + msg.getTimestamp()); + } + + } catch (Exception e) { + LOG.error("Error processing message: {}", input, e); + System.out.println("Error: " + e.getMessage()); + } + + System.out.println(); + } + + scanner.close(); + } + + /** + * Sends a message to the gRPC service. + */ + private static ProcessMessageResponse sendMessage(GrpcClient client, String messageText, int segmentId) { + // Create the message + Message message = Message.newBuilder() + .setPayload(ByteString.copyFrom(messageText.getBytes(StandardCharsets.UTF_8))) + .putHeaders("source", "cli") + .putHeaders("timestamp", String.valueOf(System.currentTimeMillis())) + .putHeaders("message_id", String.valueOf(segmentId)) + .setTimestamp(System.currentTimeMillis()) + .build(); + + // Create the request + ProcessMessageRequest request = ProcessMessageRequest.newBuilder() + .setMessage(message) + .setSegmentId(segmentId) + .build(); + + // Send to gRPC service + return client.processMessage(request); + } +} diff --git a/flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcClient.java b/flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcClient.java new file mode 100644 index 00000000..2739b6fb --- /dev/null +++ b/flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcClient.java @@ -0,0 +1,105 @@ +package com.sentry.flink_bridge; + +import flink_worker.FlinkWorker.Message; +import flink_worker.FlinkWorker.ProcessMessageRequest; +import flink_worker.FlinkWorker.ProcessMessageResponse; +import flink_worker.FlinkWorkerServiceGrpc; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * gRPC client for communicating with the FlinkWorkerService. + * This client handles the connection and communication with the gRPC service. + */ +public class GrpcClient { + + private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class); + + private final ManagedChannel channel; + private final FlinkWorkerServiceGrpc.FlinkWorkerServiceBlockingStub blockingStub; + + /** + * Constructs a gRPC client for the specified host and port. + * + * @param host the hostname of the gRPC service + * @param port the port number of the gRPC service + */ + public GrpcClient(String host, int port) { + LOG.info("Creating gRPC client for {}:{}", host, port); + try { + // Use DNS resolver with the format dns:///host:port + String target = "dns:///" + host + ":" + port; + LOG.info("Using target: {}", target); + + ManagedChannel channel = ManagedChannelBuilder.forTarget(target) + .usePlaintext() + .maxInboundMessageSize(1024 * 1024) + .build(); + + this.channel = channel; + this.blockingStub = FlinkWorkerServiceGrpc.newBlockingStub(channel); + LOG.info("gRPC client created successfully"); + } catch (Exception e) { + LOG.error("Failed to create gRPC client for {}:{}", host, port, e); + throw new RuntimeException("Failed to create gRPC client", e); + } + } + + + + /** + * Sends a message processing request to the gRPC service. + * + * @param request the message processing request + * @return the response containing processed messages + * @throws RuntimeException if the gRPC call fails + */ + public ProcessMessageResponse processMessage(ProcessMessageRequest request) { + try { + LOG.debug("Sending request to gRPC service: {}", request); + ProcessMessageResponse response = blockingStub.processMessage(request); + LOG.debug("Received response from gRPC service: {} messages", + response.getMessagesCount()); + return response; + } catch (Exception e) { + LOG.error("Error calling gRPC service", e); + throw new RuntimeException("Failed to process message via gRPC", e); + } + } + + /** + * Shuts down the gRPC client and closes the channel. + * This method should be called when the client is no longer needed. + */ + public void shutdown() { + try { + if (channel != null && !channel.isShutdown()) { + channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + LOG.info("gRPC client shutdown completed"); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while shutting down gRPC client", e); + Thread.currentThread().interrupt(); + } + } + + /** + * Checks if the gRPC client is available and responsive. + * + * @return true if the service is available, false otherwise + */ + public boolean isAvailable() { + try { + // Try to send a simple request to check availability + // For now, we'll just check if the channel is ready + return channel != null && !channel.isShutdown() && !channel.isTerminated(); + } catch (Exception e) { + LOG.debug("Service availability check failed", e); + return false; + } + } +} diff --git a/flink_bridge/src/main/java/com/sentry/flink_bridge/SimpleGrpcTest.java b/flink_bridge/src/main/java/com/sentry/flink_bridge/SimpleGrpcTest.java new file mode 100644 index 00000000..7ce6a2d3 --- /dev/null +++ b/flink_bridge/src/main/java/com/sentry/flink_bridge/SimpleGrpcTest.java @@ -0,0 +1,83 @@ +package com.sentry.flink_bridge; + +import flink_worker.FlinkWorker.Message; +import flink_worker.FlinkWorker.ProcessMessageRequest; +import flink_worker.FlinkWorker.ProcessMessageResponse; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import flink_worker.FlinkWorkerServiceGrpc; + +import java.nio.charset.StandardCharsets; + +/** + * Simple test program to debug gRPC connection issues. + */ +public class SimpleGrpcTest { + + public static void main(String[] args) { + System.out.println("Starting simple gRPC test..."); + + String host = "127.0.0.1"; + int port = 50053; + + if (args.length >= 1) { + host = args[0]; + } + if (args.length >= 2) { + try { + port = Integer.parseInt(args[1]); + } catch (NumberFormatException e) { + System.err.println("Invalid port: " + args[1]); + System.exit(1); + } + } + + System.out.println("Connecting to " + host + ":" + port); + + ManagedChannel channel = null; + try { + // Try different connection methods + System.out.println("Method 1: Using forAddress..."); + channel = ManagedChannelBuilder.forAddress(host, port) + .usePlaintext() + .build(); + + System.out.println("Channel created successfully!"); + + // Test the connection + FlinkWorkerServiceGrpc.FlinkWorkerServiceBlockingStub stub = + FlinkWorkerServiceGrpc.newBlockingStub(channel); + + // Create a simple test message + Message message = Message.newBuilder() + .setPayload(ByteString.copyFrom("Hello from Java!".getBytes(StandardCharsets.UTF_8))) + .putHeaders("source", "test") + .setTimestamp(System.currentTimeMillis()) + .build(); + + ProcessMessageRequest request = ProcessMessageRequest.newBuilder() + .setMessage(message) + .setSegmentId(0) + .build(); + + System.out.println("Sending test message..."); + ProcessMessageResponse response = stub.processMessage(request); + + System.out.println("Success! Received response with " + response.getMessagesCount() + " messages"); + + for (Message msg : response.getMessagesList()) { + String payload = new String(msg.getPayload().toByteArray(), StandardCharsets.UTF_8); + System.out.println(" - " + payload); + } + + } catch (Exception e) { + System.err.println("Error: " + e.getMessage()); + e.printStackTrace(); + } finally { + if (channel != null) { + channel.shutdown(); + } + } + } +} diff --git a/flink_worker/flink_worker/service.py b/flink_worker/flink_worker/service.py index fda6c2ef..9b059fe9 100644 --- a/flink_worker/flink_worker/service.py +++ b/flink_worker/flink_worker/service.py @@ -47,10 +47,10 @@ def ProcessMessage( # For now, return the original message as-is # In a real implementation, this would contain the actual processing logic processed_messages = [message] - # Add a simple header to indicate processing processed_message = Message() processed_message.CopyFrom(message) + processed_message.payload = f"{message.payload} processed".encode() processed_message.headers["processed"] = "true" processed_message.headers["segment_id"] = str(segment_id) From 3ad13970242eae177b26127112cc044d82cbb490 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Mon, 18 Aug 2025 17:54:08 -0700 Subject: [PATCH 03/19] Add tests --- .vscode/settings.json | 2 +- flink_bridge/.idea/workspace.xml | 37 +--- flink_bridge/pom.xml | 41 ++++ .../com/sentry/flink_bridge/FlinkGrpcApp.java | 88 +------- .../com/sentry/flink_bridge/GrpcCliApp.java | 30 ++- .../com/sentry/flink_bridge/GrpcClient.java | 24 ++- .../flink_bridge/GrpcMessageProcessor.java | 75 +++++++ .../sentry/flink_bridge/SimpleGrpcTest.java | 22 +- flink_bridge/src/test/README.md | 132 ++++++++++++ .../com/sentry/flink_bridge/BasicTest.java | 134 +++++++++++++ .../GrpcMessageProcessorTest.java | 188 ++++++++++++++++++ .../sentry/flink_bridge/MockGrpcClient.java | 83 ++++++++ flink_bridge_readme.md | 120 +++++++++++ 13 files changed, 828 insertions(+), 148 deletions(-) create mode 100644 flink_bridge/src/main/java/com/sentry/flink_bridge/GrpcMessageProcessor.java create mode 100644 flink_bridge/src/test/README.md create mode 100644 flink_bridge/src/test/java/com/sentry/flink_bridge/BasicTest.java create mode 100644 flink_bridge/src/test/java/com/sentry/flink_bridge/GrpcMessageProcessorTest.java create mode 100644 flink_bridge/src/test/java/com/sentry/flink_bridge/MockGrpcClient.java create mode 100644 flink_bridge_readme.md diff --git a/.vscode/settings.json b/.vscode/settings.json index 96ae581f..0a62c380 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,5 +8,5 @@ "mypy-type-checker.args": [ "--strict" ], - "editor.formatOnSave": true + "editor.formatOnSave": true, } diff --git a/flink_bridge/.idea/workspace.xml b/flink_bridge/.idea/workspace.xml index 639d266d..f9619bf1 100644 --- a/flink_bridge/.idea/workspace.xml +++ b/flink_bridge/.idea/workspace.xml @@ -4,24 +4,7 @@