Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 30 additions & 53 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
# limitations under the License.
#

"""Event Gate Lambda function implementation."""
"""
This module contains the AWS Lambda entry point for the EventGate service.
"""

import json
import logging
import os
Expand All @@ -24,6 +27,7 @@
import boto3
from botocore.exceptions import BotoCoreError, NoCredentialsError

from src.handlers.handler_api import HandlerApi
from src.handlers.handler_token import HandlerToken
from src.handlers.handler_topic import HandlerTopic
from src.handlers.handler_health import HandlerHealth
Expand All @@ -32,14 +36,15 @@
from src.writers import writer_eventbridge, writer_kafka, writer_postgres
from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV

PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))

# Initialize logger
logger = logging.getLogger(__name__)
root_logger = logging.getLogger()
if not root_logger.handlers:
root_logger.addHandler(logging.StreamHandler())

log_level = os.environ.get("LOG_LEVEL", "INFO")
logger.setLevel(log_level)
if not logger.handlers:
logger.addHandler(logging.StreamHandler())
root_logger.setLevel(log_level)
logger = logging.getLogger(__name__)
logger.debug("Initialized logger with level %s", log_level)

# Load main configuration
Expand All @@ -50,11 +55,6 @@
config = json.load(file)
logger.debug("Loaded main configuration")

# Load API definition
with open(os.path.join(PROJECT_ROOT, "api.yaml"), "r", encoding="utf-8") as file:
API = file.read()
logger.debug("Loaded API definition")

# Initialize S3 client with SSL verification
try:
ssl_verify = config.get(SSL_CA_BUNDLE_KEY, True)
Expand All @@ -64,41 +64,33 @@
logger.exception("Failed to initialize AWS S3 client")
raise RuntimeError("AWS S3 client initialization failed") from exc

# Load access configuration
ACCESS: Dict[str, list[str]] = {}
if config["access_config"].startswith("s3://"):
name_parts = config["access_config"].split("/")
BUCKET_NAME = name_parts[2]
BUCKET_OBJECT_KEY = "/".join(name_parts[3:])
ACCESS = json.loads(aws_s3.Bucket(BUCKET_NAME).Object(BUCKET_OBJECT_KEY).get()["Body"].read().decode("utf-8"))
else:
with open(config["access_config"], "r", encoding="utf-8") as file:
ACCESS = json.load(file)
logger.debug("Loaded access configuration")

# Initialize token handler and load token public keys
handler_token = HandlerToken(config).load_public_keys()

# Initialize EventGate writers
writer_eventbridge.init(logger, config)
writer_kafka.init(logger, config)
writer_postgres.init(logger)

# Initialize topic handler and load topic schemas
handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token).load_topic_schemas()

# Initialize health handler
# Initialize EventGate handlers
handler_token = HandlerToken(config).load_public_keys()
Comment thread
oto-macenauer-absa marked this conversation as resolved.
Outdated
handler_topic = HandlerTopic(CONF_DIR, config, aws_s3, handler_token).load_access_config().load_topic_schemas()
Comment thread
oto-macenauer-absa marked this conversation as resolved.
Outdated
handler_health = HandlerHealth()
handler_api = HandlerApi().load_api_definition()
Comment thread
oto-macenauer-absa marked this conversation as resolved.
Outdated


def get_api() -> Dict[str, Any]:
"""Return the OpenAPI specification text."""
return {"statusCode": 200, "body": API}
# Route to handler function mapping
ROUTE_MAP: Dict[str, Any] = {
"/api": lambda _: handler_api.get_api(),
"/token": lambda _: handler_token.get_token_provider_info(),
"/health": lambda _: handler_health.get_health(),
"/topics": lambda _: handler_topic.get_topics_list(),
"/topics/{topic_name}": handler_topic.handle_request,
"/terminate": lambda _: sys.exit("TERMINATING"),
}
Comment thread
tmikula-dev marked this conversation as resolved.


def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any]:
"""
AWS Lambda entry point. Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.

Args:
event: The event data from API Gateway.
_context: The mandatory context argument for AWS Lambda invocation (unused).
Expand All @@ -109,26 +101,11 @@ def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any
"""
try:
resource = event.get("resource", "").lower()
if resource == "/api":
return get_api()
if resource == "/token":
return handler_token.get_token_provider_info()
if resource == "/health":
return handler_health.get_health()
if resource == "/topics":
return handler_topic.get_topics_list()
if resource == "/topics/{topic_name}":
method = event.get("httpMethod")
if method == "GET":
return handler_topic.get_topic_schema(event["pathParameters"]["topic_name"].lower())
if method == "POST":
return handler_topic.post_topic_message(
event["pathParameters"]["topic_name"].lower(),
json.loads(event["body"]),
handler_token.extract_token(event.get("headers", {})),
)
if resource == "/terminate":
sys.exit("TERMINATING")
route_function = ROUTE_MAP.get(resource)

if route_function:
return route_function(event)

return build_error_response(404, "route", "Resource not found")
except (KeyError, json.JSONDecodeError, ValueError, AttributeError, TypeError, RuntimeError) as request_exc:
logger.exception("Request processing error: %s", request_exc)
Expand Down
63 changes: 63 additions & 0 deletions src/handlers/handler_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#
# Copyright 2026 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module provides the HandlerApi class for serving the OpenAPI specification.
"""

import logging
import os
from typing import Dict, Any

logger = logging.getLogger(__name__)


class HandlerApi:
"""
HandlerApi manages the OpenAPI specification endpoint.
"""

def __init__(self):
self.api_spec: str = ""

def load_api_definition(self) -> "HandlerApi":
"""
Load the OpenAPI specification from api.yaml file.

Returns:
HandlerApi: The current instance with loaded API definition.
"""
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
api_path = os.path.join(project_root, "api.yaml")

with open(api_path, "r", encoding="utf-8") as file:
self.api_spec = file.read()
logger.debug("Loaded API definition from %s", api_path)
return self

def get_api(self) -> Dict[str, Any]:
"""
Return the OpenAPI specification.

Returns:
Dict[str, Any]: API Gateway response with OpenAPI spec.
"""
logger.debug("Handling GET API")
Comment thread
oto-macenauer-absa marked this conversation as resolved.
return {
"statusCode": 200,
"headers": {"Content-Type": "application/yaml"},
"body": self.api_spec,
}
58 changes: 52 additions & 6 deletions src/handlers/handler_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from typing import Dict, Any

import jwt
from boto3.resources.base import ServiceResource
from jsonschema import validate
from jsonschema.exceptions import ValidationError

Expand All @@ -31,21 +32,44 @@
from src.writers import writer_eventbridge, writer_kafka, writer_postgres

logger = logging.getLogger(__name__)
log_level = os.environ.get("LOG_LEVEL", "INFO")
logger.setLevel(log_level)


class HandlerTopic:
"""
HandlerTopic manages topic schemas, access control, and message posting.
"""

def __init__(self, conf_dir: str, access_config: Dict[str, list[str]], handler_token: HandlerToken):
def __init__(self, conf_dir: str, config: Dict[str, Any], aws_s3: ServiceResource, handler_token: HandlerToken):
self.conf_dir = conf_dir
self.access_config = access_config
self.config = config
self.aws_s3 = aws_s3
self.handler_token = handler_token
self.access_config: Dict[str, list[str]] = {}
self.topics: Dict[str, Dict[str, Any]] = {}

def load_access_config(self) -> "HandlerTopic":
"""
Load access control configuration from S3 or local file.
Returns:
HandlerTopic: The current instance with loaded access config.
"""
access_path = self.config["access_config"]
logger.debug("Loading access configuration from %s", access_path)

if access_path.startswith("s3://"):
name_parts = access_path.split("/")
bucket_name = name_parts[2]
bucket_object_key = "/".join(name_parts[3:])
self.access_config = json.loads(
self.aws_s3.Bucket(bucket_name).Object(bucket_object_key).get()["Body"].read().decode("utf-8")
)
else:
with open(access_path, "r", encoding="utf-8") as file:
self.access_config = json.load(file)

logger.debug("Loaded access configuration")
return self

def load_topic_schemas(self) -> "HandlerTopic":
"""
Load topic schemas from configuration files.
Expand Down Expand Up @@ -78,7 +102,29 @@ def get_topics_list(self) -> Dict[str, Any]:
"body": json.dumps(list(self.topics)),
}

def get_topic_schema(self, topic_name: str) -> Dict[str, Any]:
def handle_request(self, event: Dict[str, Any]) -> Dict[str, Any]:
"""
Handle GET/POST requests for /topics/{topic_name} resource.

Args:
event: The API Gateway event containing path parameters, method, body, and headers.
Returns:
Dict[str, Any]: API Gateway response.
"""
topic_name = event["pathParameters"]["topic_name"].lower()
method = event.get("httpMethod")

if method == "GET":
return self._get_topic_schema(topic_name)
if method == "POST":
return self._post_topic_message(
topic_name,
json.loads(event["body"]),
self.handler_token.extract_token(event.get("headers", {})),
)
return build_error_response(404, "route", "Resource not found")
Comment thread
tmikula-dev marked this conversation as resolved.

def _get_topic_schema(self, topic_name: str) -> Dict[str, Any]:
"""
Return the JSON schema for a specific topic.
Args:
Expand All @@ -97,7 +143,7 @@ def get_topic_schema(self, topic_name: str) -> Dict[str, Any]:
"body": json.dumps(self.topics[topic_name]),
}

def post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
def _post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
"""
Validate auth and schema; dispatch message to all writers.
Args:
Expand Down
48 changes: 48 additions & 0 deletions tests/handlers/test_handler_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Copyright 2026 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest
from unittest.mock import patch, mock_open

from src.handlers.handler_api import HandlerApi

Comment thread
coderabbitai[bot] marked this conversation as resolved.

def test_load_api_definition_success():
"""Test successful loading of API definition."""
mock_content = "openapi: 3.0.0\ninfo:\n title: Test API"
with patch("builtins.open", mock_open(read_data=mock_content)):
handler = HandlerApi().load_api_definition()
assert handler.api_spec == mock_content


def test_load_api_definition_file_not_found():
"""Test that FileNotFoundError is raised when api.yaml doesn't exist."""
with patch("builtins.open", side_effect=FileNotFoundError("api.yaml not found")):
handler = HandlerApi()
with pytest.raises(FileNotFoundError):
handler.load_api_definition()


def test_get_api_returns_correct_response():
"""Test get_api returns correct response structure."""
mock_content = "openapi: 3.0.0"
with patch("builtins.open", mock_open(read_data=mock_content)):
handler = HandlerApi().load_api_definition()
response = handler.get_api()

assert response["statusCode"] == 200
assert response["headers"]["Content-Type"] == "application/yaml"
assert response["body"] == mock_content
Loading