diff --git a/.coverage b/.coverage
new file mode 100644
index 0000000..50b7caf
Binary files /dev/null and b/.coverage differ
diff --git a/.flake8 b/.flake8
index 7ec14f0..440b900 100644
--- a/.flake8
+++ b/.flake8
@@ -1,3 +1,5 @@
 [flake8]
 ignore = E501, W503
 max-line-length = 120
+
+exclude = examples/grpc/*_pb2*.py
diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index d852563..6578b1d 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -16,25 +16,25 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        python-version: ["3.8", "3.9", "3.10"]
+        python-version: [ "3.9", "3.10", "3.11", "3.12", "3.13" ]
     steps:
-    - uses: actions/checkout@v3
-    - name: Set up Python ${{ matrix.python-version }}
-      uses: actions/setup-python@v3
-      with:
-        python-version: ${{ matrix.python-version }}
-    - name: Install dependencies
-      run: |
-        python -m pip install --upgrade pip
-        if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
-        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
-    - name: Lint with flake8
-      run: |
-        # stop the build if there are Python syntax errors or undefined names
-        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
-        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
-        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
-    - name: Test with pytest
-      run: |
-        pytest tests
+      - uses: actions/checkout@v3
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v3
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          if [ -f requirements-dev.txt ]; then pip install -r requirements-dev.txt; fi
+          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+      - name: Lint with flake8
+        run: |
+          # stop the build if there are Python syntax errors or undefined names
+          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
+          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
+          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
+      - name: Test with pytest
+        run: |
+          pytest tests
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index fdad95a..ed66175 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -15,7 +15,7 @@ fail_fast: false
 repos:
 
   - repo: https://github.com/pre-commit/mirrors-mypy
-    rev: v1.11.2
+    rev: v1.15.0
     hooks:
       - id: mypy # Run mypy type checker; info: runs with flag --missing-imports
         args: [ --config-file=mypy.ini ]
@@ -44,7 +44,7 @@ repos:
           ^.*_pb2*
       )
   - repo: https://github.com/pre-commit/pre-commit-hooks
-    rev: v4.6.0
+    rev: v5.0.0
    hooks:
      - id: check-added-large-files # Prevent giant files from being committed
        args: [ maxkb=1024 ]
@@ -67,7 +67,7 @@ repos:
        files: 'requirements/requirements.*\.txt$'
 
   - repo: https://github.com/pycqa/flake8
-    rev: 7.1.1
+    rev: 7.1.2
    hooks:
      - id: flake8
diff --git a/RELEASE.md b/RELEASE.md
index 3a0c95f..5439411 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -4,6 +4,14 @@
 
 *****************
 
+## Release ONDEWO LOGGING PYTHON 3.5.0
+
+### Improvements
+
+* Added AsyncTimer and an async logger (#21)
+
+*****************
+
 ## Release ONDEWO LOGGING PYTHON 3.4.0
 
 ### Improvements
diff --git a/envs/local.env b/envs/local.env
new file mode 100644
index 0000000..be2c1c9
--- /dev/null
+++ b/envs/local.env
@@ -0,0 +1,5 @@
+# without fluentd
+ONDEWO_LOGGING_CONFIG_FILE=ondewo/logging/config/logging-no-fluentd.yaml
+
+# with fluentd
+# ONDEWO_LOGGING_CONFIG_FILE=ondewo/logging/config/logging.yaml
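+#
+# Resolution order (see import_config() in ondewo/logging/logger.py): this
+# variable wins if the file it points to exists, then /home/ondewo/logging.yaml,
+# then the config packaged with the module.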
diff --git a/examples/__init__.py b/examples/__init__.py
new file mode 100644
index 0000000..e674230
--- /dev/null
+++ b/examples/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021-2024 ONDEWO GmbH
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/examples/grpc/__init__.py b/examples/grpc/__init__.py
new file mode 100644
index 0000000..e674230
--- /dev/null
+++ b/examples/grpc/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021-2024 ONDEWO GmbH
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/examples/grpc/grpc_client.py b/examples/grpc/grpc_client.py
new file mode 100644
index 0000000..f13b1db
--- /dev/null
+++ b/examples/grpc/grpc_client.py
@@ -0,0 +1,113 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""An example of multiprocessing concurrency with gRPC."""
+
+from __future__ import (
+    absolute_import,
+    division,
+    print_function,
+)
+
+import atexit
+import logging
+import multiprocessing
+import operator
+import sys
+import time
+
+import grpc  # type: ignore[import]
+
+import prime_pb2  # type: ignore[import]
+import prime_pb2_grpc  # type: ignore[import]
+
+_PROCESS_COUNT = 20
+_MAXIMUM_CANDIDATE = 10000
+
+# Each worker process initializes a single channel after forking.
+# It's regrettable, but to ensure that each subprocess only has to instantiate
+# a single channel to be reused across all RPCs, we use globals.
+_worker_channel_singleton = None
+_worker_stub_singleton = None
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def _shutdown_worker():
+    _LOGGER.info("Shutting worker process down.")
+    if _worker_channel_singleton is not None:
+        _worker_channel_singleton.close()
+
+
+def _initialize_worker(server_address):
+    global _worker_channel_singleton  # pylint: disable=global-statement
+    global _worker_stub_singleton  # pylint: disable=global-statement
+    # _LOGGER.info("Initializing worker process.")
+    _worker_channel_singleton = grpc.insecure_channel(server_address)
+    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
+        _worker_channel_singleton
+    )
+    atexit.register(_shutdown_worker)
+
+
+def _run_worker_query(primality_candidate):
+    # _LOGGER.info("Checking primality of %s.", primality_candidate)
+    result = _worker_stub_singleton.check(
+        prime_pb2.PrimeCandidate(candidate=primality_candidate)
+    )
+    # print(result)
+    return result
+
+
+def _calculate_primes(server_address):
+    worker_pool = multiprocessing.Pool(
+        processes=_PROCESS_COUNT,
+        initializer=_initialize_worker,
+        initargs=(server_address,),
+    )
+    check_range = range(2, _MAXIMUM_CANDIDATE)
+    primality = worker_pool.map(_run_worker_query, check_range)
+    primes = zip(check_range, map(operator.attrgetter("isPrime"), primality))
+    # _LOGGER.info(f"primes: {primes}")
+    return tuple(primes)
+
+
+def main():
+    # msg: str = "Determine the primality of the first {} integers.".format(_MAXIMUM_CANDIDATE)
+    # parser = argparse.ArgumentParser(description=msg)
+    # parser.add_argument(
+    #     "server_address",
+    #     help="The address of the server (e.g. localhost:51317)",
+    #     default="localhost:35065"
+    # )
+    # args = parser.parse_args()
+    # primes = _calculate_primes(args.server_address)
+
+    start_main = time.time()
+    for _ in range(17):  # Repeat the full multiprocessing run to get a stable timing sample
+        start_call = time.time()
+        primes = _calculate_primes("localhost:51317")
+        end_call = time.time()
+        print(f"Execution time for _calculate_primes: {end_call - start_call:.5f} seconds")
+    end_main = time.time()
+    print(primes)
+    print(f"Total execution time for main: {end_main - start_main:.5f} seconds")
+
+
+if __name__ == "__main__":
+    handler = logging.StreamHandler(sys.stdout)
+    formatter = logging.Formatter("[PID %(process)d] %(message)s")
+    handler.setFormatter(formatter)
+    _LOGGER.addHandler(handler)
+    _LOGGER.setLevel(logging.INFO)
+    main()
diff --git a/examples/grpc/grpc_multiprocessing_example_test.py b/examples/grpc/grpc_multiprocessing_example_test.py
new file mode 100644
index 0000000..c013184
--- /dev/null
+++ b/examples/grpc/grpc_multiprocessing_example_test.py
@@ -0,0 +1,166 @@
+"""Test for multiprocessing example (refactored for pytest)."""
+
+import ast
+import math
+import os
+import re
+import subprocess
+import sys
+import tempfile
+import time
+
+import pytest  # Import pytest
+
+_BINARY_DIR = os.path.realpath(
+    os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
+)
+# Assume binaries are in the same location relative to the test file.
+# If running from a different structure, these paths might need adjustment.
+_SERVER_PATH = os.path.join(_BINARY_DIR, "grpc", "grpc_server.py")
+_CLIENT_PATH = os.path.join(_BINARY_DIR, "grpc", "grpc_client.py")
+
+
+# --- Helper Functions ---
+
+def is_prime(n):
+    """Checks if a number is prime."""
+    if n < 2:
+        return False
+    # Optimization: check only divisors up to floor(sqrt(n)); isqrt avoids the
+    # float rounding issues of math.ceil(math.sqrt(n)) and handles n == 2.
+    for i in range(2, math.isqrt(n) + 1):
+        if n % i == 0:
+            return False
+    return True
+
+
+def _get_server_address(server_stream, timeout_sec=10):
+    """Reads the server stream to find the binding address."""
+    start_time = time.time()
+    while time.time() - start_time < timeout_sec:
+        # Go back to the beginning to re-read potential new output
+        server_stream.seek(0)
+        # Read all lines currently available
+        lines = server_stream.readlines()
+        for line in lines:
+            matches = re.search("Binding to '(.+)'", line)
+            if matches is not None:
+                return matches.group(1)  # Use group(1) for the captured group
+        # Avoid busy-waiting
+        time.sleep(0.1)
+    raise TimeoutError(f"Could not find server address in output within {timeout_sec}s")
+
+
+# --- Pytest Fixture for Server Management ---
+
+@pytest.fixture(scope="function")  # Use "module" scope if server can be reused across tests
+def running_server():
+    """Starts the server, yields its address, and ensures termination."""
+    # Check if server executable exists before starting
+    if not os.path.exists(_SERVER_PATH):
+        pytest.skip(f"Server executable not found at {_SERVER_PATH}")
+
+    # Using 'r+' mode allows reading and writing (needed for seek/read)
+    # Using text mode for automatic decoding/encoding
+    with tempfile.TemporaryFile(mode="r+", encoding='utf-8') as server_stdout:
+        server_process = None
+        try:
+            server_process = subprocess.Popen(
+                (sys.executable, _SERVER_PATH),
+                stdout=server_stdout,
+                stderr=subprocess.PIPE,  # Capture stderr too for debugging
+                text=True,
+                encoding='utf-8'
+            )
+            # Wait for the server to bind and print the address
+            server_address = _get_server_address(server_stdout)
+            yield server_address  # Provide the address to the test function
+
+        finally:
+            # Ensure the server process is terminated even if errors occur
+            if server_process:
+                server_process.terminate()
+                try:
+                    # Wait a bit for graceful termination
+                    stdout, stderr = server_process.communicate(timeout=5)
+                except subprocess.TimeoutExpired:
+                    # Force kill if termination takes too long
+                    server_process.kill()
+                    stdout, stderr = server_process.communicate()
+                # Optional: Check stderr for server errors during test run
+                # if stderr:
+                #     print(f"\nServer stderr:\n{stderr}")
+
+
+# --- Test Function ---
+
+def test_multiprocessing_example(running_server):
+    """Tests the client-server interaction."""
+    server_address = running_server  # Get address from the fixture
+
+    # Check if client executable exists
+    if not os.path.exists(_CLIENT_PATH):
+        pytest.skip(f"Client executable not found at {_CLIENT_PATH}")
+
+    client_process = None
+    # Use a temporary file for the client's output
+    with tempfile.TemporaryFile(mode="r+", encoding='utf-8') as client_stdout:
+        try:
+            client_process = subprocess.Popen(
+                (
+                    sys.executable,  # run the client through the same interpreter
+                    _CLIENT_PATH,
+                    server_address,  # Pass the address obtained from the fixture
+                ),
+                stdout=client_stdout,
+                stderr=subprocess.PIPE,  # Capture stderr
+                text=True,
+                encoding='utf-8'
+            )
+            # Wait for the client process to complete; stdout is redirected to
+            # the temp file, so communicate() only yields stderr here.
+            _, client_stderr_str = client_process.communicate(timeout=30)  # Add timeout
+
+            # Optional: Check client return code and stderr
+            assert client_process.returncode == 0, f"Client exited with code {client_process.returncode}. Stderr:\n{client_stderr_str}"
+            # if client_stderr_str:
+            #     print(f"\nClient stderr:\n{client_stderr_str}")
+
+            # Process the client's output
+            # Go back to the start of the temp file to read its content
+            client_stdout.seek(0)
+            client_output_lines = client_stdout.read().strip().splitlines()
+
+            # Ensure there is enough output before accessing the results line
+            assert len(client_output_lines) >= 2, "Client produced no results output"
+
+            # The client prints the results tuple on the second-to-last line,
+            # followed by a final timing line; parse the results.
+            # Use try-except for robustness against malformed output
+            try:
+                results = ast.literal_eval(client_output_lines[-2])
+            except (SyntaxError, ValueError) as e:
+                pytest.fail(f"Failed to parse client output: {e}\nOutput:\n{client_output_lines[-2]}")
+
+            # Perform assertions using pytest's assert
+            values = tuple(result[0] for result in results)
+            expected_values = tuple(range(2, 10000))
+
+            # Using tuple() for comparison as range object isn't directly comparable
+            assert values == expected_values, "Sequence of numbers processed is incorrect"
+
+            for result in results:
+                # Ensure result is a list/tuple with at least 2 elements
+                assert isinstance(result, (list, tuple)) and len(result) >= 2, f"Unexpected result format: {result}"
+                number, is_prime_result = result[0], result[1]
+                assert isinstance(number, int), f"Expected integer, got {type(number)} in {result}"
+                assert isinstance(is_prime_result, bool), f"Expected boolean, got {type(is_prime_result)} in {result}"
+                # Use the helper function to verify primality
+                assert is_prime(number) == is_prime_result, f"Primality check failed for {number}"
+
+        except subprocess.TimeoutExpired:
+            pytest.fail(f"Client process timed out. Address: {server_address}")
+        finally:
+            # Ensure client process is cleaned up if it exists (though communicate should handle this)
+            if client_process and client_process.poll() is None:
+                client_process.kill()
+                client_process.wait()
+
+# Note: The `if __name__ == "__main__": unittest.main()` block is removed.
+# Tests are run using the pytest command-line tool, e.g., `pytest your_test_file.py`
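+
+# NOTE: grpc_client.py currently ignores the server_address argument passed by
+# the fixture (its argparse block is commented out), so this test only passes
+# while the hardcoded localhost:51317 matches the running server.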
diff --git a/examples/grpc/grpc_server.py b/examples/grpc/grpc_server.py
new file mode 100644
index 0000000..4342e69
--- /dev/null
+++ b/examples/grpc/grpc_server.py
@@ -0,0 +1,134 @@
+# Copyright 2019 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""An example of multiprocess concurrency with gRPC."""
+from __future__ import (
+    absolute_import,
+    division,
+    print_function,
+)
+
+import contextlib
+import datetime
+import math
+import multiprocessing
+import socket
+import sys
+import time
+from concurrent import futures
+
+import grpc  # type: ignore[import]
+
+import prime_pb2  # type: ignore[import]
+import prime_pb2_grpc  # type: ignore[import]
+from ondewo.logging.logger import logger_console as log
+
+_ONE_DAY = datetime.timedelta(days=1)
+_PROCESS_COUNT = multiprocessing.cpu_count()
+_THREAD_CONCURRENCY = _PROCESS_COUNT
+
+
+# log = logging.getLogger(__name__)
+# handler = logging.StreamHandler(sys.stdout)
+# formatter = logging.Formatter("[PID %(process)d] %(message)s")
+# handler.setFormatter(formatter)
+# log.addHandler(handler)
+# log.setLevel(logging.DEBUG)
+
+def is_prime(n):
+    # start_time = time.time()
+    # log.debug("is prime: %s", n)
+    if n < 2:
+        return False
+    # Check divisors up to floor(sqrt(n)); isqrt sidesteps float rounding and
+    # classifies perfect squares correctly.
+    for i in range(2, math.isqrt(n) + 1):
+        if n % i == 0:
+            # log.debug("Execution time for check: %.5f seconds", time.time() - start_time)
+            return False
+    # log.debug("Execution time for check: %.5f seconds", time.time() - start_time)
+    return True
+
+
+class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
+
+    # Timer decorator is too slow
+    # @Timer(logger=log.debug, log_arguments=False, message='check if Prime: Elapsed time: {:.5f}')
+    def check(self, request, context):
+        start_time = time.time()
+        log.debug("Determining primality of %s", request.candidate)
+        result = prime_pb2.Primality(isPrime=is_prime(request.candidate))
+        # log.debug(f"{request.candidate} is prime: {result.isPrime}")
+        # print(f"{request.candidate} is prime: {result.isPrime}")
+        log.debug("Execution time for check: %.5f seconds", time.time() - start_time)
+        return result
+
+
+def _wait_forever(server):
+    try:
+        while True:
+            time.sleep(_ONE_DAY.total_seconds())
+    except KeyboardInterrupt:
+        server.stop(None)
+
+
+def _run_server(bind_address):
+    """Start a server in a subprocess."""
+    log.info("Starting new server.")
+    options = (("grpc.so_reuseport", 1),)
+
+    server = grpc.server(
+        futures.ThreadPoolExecutor(
+            max_workers=_THREAD_CONCURRENCY,
+        ),
+        options=options,
+    )
+    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
+    server.add_insecure_port(bind_address)
+    server.start()
+    _wait_forever(server)
+
+
+@contextlib.contextmanager
+def _reserve_port():
+    """Find and reserve a port for all subprocesses to use."""
+    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
+        raise RuntimeError("Failed to set SO_REUSEPORT.")
+    sock.bind(("", 0))
+    try:
+        yield sock.getsockname()[1]
+    finally:
+        sock.close()
+
+
+def main():
+    # with _reserve_port() as port:
+    bind_address = "localhost:{}".format(str(51317))
+    # bind_address = "localhost:{}".format(port)
+    log.info("Binding to '%s'", bind_address)
+    sys.stdout.flush()
+    workers = []
+    for _ in range(_PROCESS_COUNT):
+        # NOTE: It is imperative that the worker subprocesses be forked before
+        # any gRPC servers start up. See
+        # https://github.com/grpc/grpc/issues/16001 for more details.
+        worker = multiprocessing.Process(
+            target=_run_server, args=(bind_address,)
+        )
+        worker.start()
+        workers.append(worker)
+    for worker in workers:
+        worker.join()
+
+
+if __name__ == "__main__":
+    main()
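+
+# NOTE: _reserve_port() requires SO_REUSEPORT, which is not available on every
+# platform (e.g. Windows); main() above sidesteps it by binding the fixed port
+# 51317 in every worker via the grpc.so_reuseport channel option instead.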
diff --git a/examples/grpc/multiprocess_snippet.py b/examples/grpc/multiprocess_snippet.py
new file mode 100644
index 0000000..d645dc6
--- /dev/null
+++ b/examples/grpc/multiprocess_snippet.py
@@ -0,0 +1,16 @@
+from multiprocessing import Process, current_process
+
+from ondewo.logging.logger import logger_console
+
+
+def worker():
+    logger_console.info({"message": f"hello from {current_process().pid}"})
+
+
+if __name__ == "__main__":
+    procs = [Process(target=worker) for _ in range(4)]
+    for p in procs:
+        p.start()
+    for p in procs:
+        p.join()
+    print("all done")
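+
+# Expected output: four "hello from <pid>" log records (order is
+# nondeterministic), then "all done" on stdout.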
diff --git a/examples/grpc/prime.proto b/examples/grpc/prime.proto
new file mode 100644
index 0000000..4ef232f
--- /dev/null
+++ b/examples/grpc/prime.proto
@@ -0,0 +1,35 @@
+// Copyright 2019 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package prime;
+
+// A candidate integer for primality testing.
+message PrimeCandidate {
+  // The candidate.
+  int64 candidate = 1;
+}
+
+// The primality of the requested integer candidate.
+message Primality {
+  // Is the candidate prime?
+  bool isPrime = 1;
+}
+
+// Service to check primality.
+service PrimeChecker {
+  // Determines the primality of an integer.
+  rpc check (PrimeCandidate) returns (Primality) {}
+}
diff --git a/examples/grpc/prime_pb2.py b/examples/grpc/prime_pb2.py
new file mode 100644
index 0000000..1ee53ad
--- /dev/null
+++ b/examples/grpc/prime_pb2.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# NO CHECKED-IN PROTOBUF GENCODE
+# source: prime.proto
+# Protobuf Python Version: 5.29.0
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import runtime_version as _runtime_version
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+_runtime_version.ValidateProtobufRuntimeVersion(
+    _runtime_version.Domain.PUBLIC,
+    5,
+    29,
+    0,
+    '',
+    'prime.proto'
+)
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0bprime.proto\x12\x05prime\"#\n\x0ePrimeCandidate\x12\x11\n\tcandidate\x18\x01 \x01(\x03\"\x1c\n\tPrimality\x12\x0f\n\x07isPrime\x18\x01 \x01(\x08\x32\x42\n\x0cPrimeChecker\x12\x32\n\x05\x63heck\x12\x15.prime.PrimeCandidate\x1a\x10.prime.Primality\"\x00\x62\x06proto3')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'prime_pb2', _globals)
+if not _descriptor._USE_C_DESCRIPTORS:
+    DESCRIPTOR._loaded_options = None
+    _globals['_PRIMECANDIDATE']._serialized_start=22
+    _globals['_PRIMECANDIDATE']._serialized_end=57
+    _globals['_PRIMALITY']._serialized_start=59
+    _globals['_PRIMALITY']._serialized_end=87
+    _globals['_PRIMECHECKER']._serialized_start=89
+    _globals['_PRIMECHECKER']._serialized_end=155
+# @@protoc_insertion_point(module_scope)
+ """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_PrimeCheckerServicer_to_server(servicer, server): + rpc_method_handlers = { + 'check': grpc.unary_unary_rpc_method_handler( + servicer.check, + request_deserializer=prime__pb2.PrimeCandidate.FromString, + response_serializer=prime__pb2.Primality.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'prime.PrimeChecker', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('prime.PrimeChecker', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class PrimeChecker(object): + """Service to check primality. + """ + + @staticmethod + def check(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/prime.PrimeChecker/check', + prime__pb2.PrimeCandidate.SerializeToString, + prime__pb2.Primality.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) diff --git a/examples/grpc/requirements.txt b/examples/grpc/requirements.txt new file mode 100644 index 0000000..348adac --- /dev/null +++ b/examples/grpc/requirements.txt @@ -0,0 +1,4 @@ +grpcio +grpcio-tools +pytest-asyncio +pytest diff --git a/mypy.ini b/mypy.ini index be2d442..fa85ba0 100644 --- a/mypy.ini +++ b/mypy.ini @@ -14,3 +14,9 @@ ignore_missing_imports = True [mypy-wrapt] ignore_errors = True ignore_missing_imports = True +[mypy-grpc.*] +ignore_errors = True +ignore_missing_imports = True +[mypy-prime_pb2.*] +ignore_errors = True +ignore_missing_imports = True diff --git a/ondewo/logging/async_logger.py b/ondewo/logging/async_logger.py new file mode 100644 index 0000000..44cf9e4 --- /dev/null +++ b/ondewo/logging/async_logger.py @@ -0,0 +1,152 @@ +import asyncio +import logging +import logging.config +import os +import sys +from pathlib import Path +from typing import ( + Any, + Dict, + Optional, + Tuple, +) + +import aiofiles +import yaml +from dotenv import load_dotenv + +from ondewo.logging.decorators import AsyncTimer + +load_dotenv() +MODULE_NAME: str = os.getenv("MODULE_NAME", "") +GIT_REPO_NAME: str = os.getenv("GIT_REPO_NAME", "") +DOCKER_IMAGE_NAME: str = os.getenv("DOCKER_IMAGE_NAME", "") + + +class AsyncCustomLogger(logging.Logger): + GRPC_LEVEL_NUM = 25 + logging.addLevelName(GRPC_LEVEL_NUM, "GRPC") + + async def grpc(self, message_dict: Dict[str, Any], *args, **kwargs) -> None: + if self.isEnabledFor(self.GRPC_LEVEL_NUM): + await asyncio.sleep(0) # Simulate async processing + super().log(self.GRPC_LEVEL_NUM, message_dict, *args, **kwargs) + + +async def import_config() -> Dict[str, Any]: + """ + Asynchronously imports the logging configuration from a YAML file. + + The function first checks for 'logging.yaml' in the current directory. + If not found, it looks in a 'config' subdirectory relative to the + location of the current file. + + Returns: + A dictionary containing the loaded configuration. Returns an empty + dictionary if the file is not found or if there's an error parsing + the YAML. 
+ """ + config_path: Path + + if os.path.exists("./logging.yaml"): + config_path = Path("./logging.yaml") + else: + parent: Path = Path(__file__).resolve().parent + config_path = parent / "config" / "logging.yaml" + + conf: Dict[str, Any] = {} + + try: + async with aiofiles.open(config_path, mode="r") as fd: + content: str = await fd.read() + try: + loaded_config: Optional[Dict[str, Any]] = yaml.safe_load(content) + conf = loaded_config if loaded_config is not None else {} + except yaml.YAMLError as e: + print(f"Error parsing YAML file: {e}") + conf = {} + except FileNotFoundError: + print(f"Config file not found: {config_path}") + except yaml.YAMLError as e: + print(f"Error parsing YAML file: {e}") + + return conf + + +async def set_module_name( + module_name: str, + git_repo_name: str, + docker_image_name: str, + conf: Dict[str, Any] +) -> Dict[str, Any]: + conf["logging"]["formatters"]["fluent_debug"]["format"]["module_name"] = module_name + conf["logging"]["formatters"]["fluent_debug"]["format"]["git_repo_name"] = git_repo_name + conf["logging"]["formatters"]["fluent_debug"]["format"]["docker_image_name"] = docker_image_name + + conf["logging"]["formatters"]["fluent_console"]["format"]["module_name"] = module_name + conf["logging"]["formatters"]["fluent_console"]["format"]["git_repo_name"] = git_repo_name + conf["logging"]["formatters"]["fluent_console"]["format"]["docker_image_name"] = docker_image_name + + return conf + + +async def initiate_loggers(conf: Dict[str, Any]) -> Tuple[logging.Logger, ...]: + logging.setLoggerClass(AsyncCustomLogger) + logging.config.dictConfig(conf["logging"]) + + if not GIT_REPO_NAME: + logging.warning("NO GIT REPO NAME WAS GIVEN. Please set the GIT_REPO_NAME env var.") + if not DOCKER_IMAGE_NAME: + logging.warning("NO DOCKER IMAGE NAME WAS GIVEN. Please state the docker image for the logs.") + if not MODULE_NAME: + logging.warning("NO MODULE NAME WAS GIVEN. Please provide a module name for the logs.") + if not os.path.exists("./logging.yaml"): + logging.info("No logging.yaml in the root of the project, using the default config.") + + logger_root: logging.Logger = logging.getLogger("root") + logger_console: logging.Logger = logging.getLogger("console") + logger_debug: logging.Logger = logging.getLogger("debug") + + return logger_root, logger_console, logger_debug + + +async def check_python_version(logger: logging.Logger) -> None: + if sys.version_info[0] == 2: + logger.error("Python 2 is not supported. 
Please use Python 3.") + + +async def create_logs(conf: Optional[Dict[str, Any]] = None) -> Tuple[logging.Logger, ...]: + conf = conf if conf else await import_config() + conf = await set_module_name(MODULE_NAME, GIT_REPO_NAME, DOCKER_IMAGE_NAME, conf) + logger_root, logger_console, logger_debug = await initiate_loggers(conf) + await check_python_version(logger_console) + return logger_root, logger_console, logger_debug + + +def sync_create_logs(): + try: + return asyncio.run(create_logs()) + except RuntimeError: + loop = asyncio.get_event_loop() + return loop.run_until_complete(create_logs()) + + +logger_root, logger_console, logger_debug = sync_create_logs() + + +# Example usage +async def main(): + logger_root, logger_console, logger_debug = await create_logs() + logger_console.info("Asynchronous logging system initialized.") + + @AsyncTimer(logger=logger_console.debug, log_arguments=False, message="Test Elapsed: {:0.3f}") + async def my_test_function(val: int) -> int: + await asyncio.sleep(0.01) # Simulate some work + logger_console.info(f"Test value: {val}") + return val * 2 + + await my_test_function(5) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/ondewo/logging/config/logging-no-fluentd.yaml b/ondewo/logging/config/logging-no-fluentd.yaml new file mode 100644 index 0000000..de94274 --- /dev/null +++ b/ondewo/logging/config/logging-no-fluentd.yaml @@ -0,0 +1,138 @@ +logging: + version: 1 + + formatters: + brief: + format: '%(message)s' + default: + format: >- + %(asctime)s.%(msecs)03d [%(process)d|%(threadName)-8s] - %(levelname)-4s - %(message)s + datefmt: '%Y-%m-%dT%H:%M:%S' + debug: + format: >- + %(asctime)s.%(msecs)03d %(module)s:%(funcName)s():%(lineno)d - [%(processName)s|%(threadName)s] - %(levelname)s - %(message)s + datefmt: '%Y-%m-%dT%H:%M:%S' + fluent_console: + '()': fluent.handler.FluentRecordFormatter + format: + time: '%(asctime)s' + where: '[%(module)s|%(funcName)s]' + message: '%(message)s' + datefmt: '%H:%M:%S' + fluent_debug: + '()': fluent.handler.FluentRecordFormatter + format: + level: '%(levelname)s' + hostname: '%(hostname)s' + where: '%(module)s.%(funcName)s' + stack_trace: '%(exc_text)s' + message: '%(message)s' + time: '%(asctime)s.%(msecs)03d' + process: '%(processName)s:%(process)d' + thread: '%(threadName)s:%(thread)d' + datefmt: '%Y-%m-%dT%H:%M:%S' + + handlers: + console: + class: logging.StreamHandler + level: DEBUG + formatter: debug + stream: ext://sys.stdout + debug: + class: logging.StreamHandler + level: DEBUG + formatter: debug + stream: ext://sys.stdout + # fluent-console: + # class: fluent.handler.FluentHandler + # host: 172.17.0.1 + # port: 24221 + # tag: py.console.logging + # buffer_overflow_handler: overflow_handler + # formatter: fluent_console + # level: DEBUG + # fluent-async-console: + # class: fluent.asynchandler.FluentHandler + # host: 172.17.0.1 + # port: 24221 + # tag: py.console.async.logging + # buffer_overflow_handler: overflow_handler + # formatter: fluent_console + # level: DEBUG + # fluent-debug: + # class: fluent.handler.FluentHandler + # host: 172.17.0.1 + # port: 24221 + # tag: py.debug.logging + # buffer_overflow_handler: overflow_handler + # formatter: fluent_debug + # level: DEBUG + # fluent-async-debug: + # class: fluent.asynchandler.FluentHandler + # host: 172.17.0.1 + # port: 24221 + # tag: py.debug.async.logging + # buffer_overflow_handler: overflow_handler + # formatter: fluent_debug + # level: DEBUG + # fluent-elastic: + # class: fluent.handler.FluentHandler + # host: 172.17.0.1 + # port: 
diff --git a/ondewo/logging/config/logging-no-fluentd.yaml b/ondewo/logging/config/logging-no-fluentd.yaml
new file mode 100644
index 0000000..de94274
--- /dev/null
+++ b/ondewo/logging/config/logging-no-fluentd.yaml
@@ -0,0 +1,138 @@
+logging:
+  version: 1
+
+  formatters:
+    brief:
+      format: '%(message)s'
+    default:
+      format: >-
+        %(asctime)s.%(msecs)03d [%(process)d|%(threadName)-8s] - %(levelname)-4s - %(message)s
+      datefmt: '%Y-%m-%dT%H:%M:%S'
+    debug:
+      format: >-
+        %(asctime)s.%(msecs)03d %(module)s:%(funcName)s():%(lineno)d - [%(processName)s|%(threadName)s] - %(levelname)s - %(message)s
+      datefmt: '%Y-%m-%dT%H:%M:%S'
+    fluent_console:
+      '()': fluent.handler.FluentRecordFormatter
+      format:
+        time: '%(asctime)s'
+        where: '[%(module)s|%(funcName)s]'
+        message: '%(message)s'
+      datefmt: '%H:%M:%S'
+    fluent_debug:
+      '()': fluent.handler.FluentRecordFormatter
+      format:
+        level: '%(levelname)s'
+        hostname: '%(hostname)s'
+        where: '%(module)s.%(funcName)s'
+        stack_trace: '%(exc_text)s'
+        message: '%(message)s'
+        time: '%(asctime)s.%(msecs)03d'
+        process: '%(processName)s:%(process)d'
+        thread: '%(threadName)s:%(thread)d'
+      datefmt: '%Y-%m-%dT%H:%M:%S'
+
+  handlers:
+    console:
+      class: logging.StreamHandler
+      level: DEBUG
+      formatter: debug
+      stream: ext://sys.stdout
+    debug:
+      class: logging.StreamHandler
+      level: DEBUG
+      formatter: debug
+      stream: ext://sys.stdout
+    # fluent-console:
+    #   class: fluent.handler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.console.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_console
+    #   level: DEBUG
+    # fluent-async-console:
+    #   class: fluent.asynchandler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.console.async.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_console
+    #   level: DEBUG
+    # fluent-debug:
+    #   class: fluent.handler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.debug.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_debug
+    #   level: DEBUG
+    # fluent-async-debug:
+    #   class: fluent.asynchandler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.debug.async.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_debug
+    #   level: DEBUG
+    # fluent-elastic:
+    #   class: fluent.handler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.elastic.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_debug
+    #   level: DEBUG
+    # fluent-async-elastic:
+    #   class: fluent.asynchandler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.elastic.async.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_debug
+    #   level: DEBUG
+    # fluent-async-elastic-develop:
+    #   class: fluent.asynchandler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.elastic.develop.async.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_debug
+    #   level: DEBUG
+    # fluent-async-elastic-staging:
+    #   class: fluent.asynchandler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.elastic.staging.async.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_debug
+    #   level: DEBUG
+    # fluent-async-elastic-production:
+    #   class: fluent.asynchandler.FluentHandler
+    #   host: 172.17.0.1
+    #   port: 24221
+    #   tag: py.elastic.production.async.logging
+    #   buffer_overflow_handler: overflow_handler
+    #   formatter: fluent_debug
+    #   level: DEBUG
+    'none': # py2 crashes if this isn't quoted
+      class: logging.NullHandler
+
+  loggers:
+    'null':
+      handlers: [ 'none' ]
+      level: DEBUG
+      propagate: False
+    console:
+      handlers: [ console ]
+      level: DEBUG
+      propagate: False
+    debug:
+      handlers: [ debug ]
+      level: DEBUG
+      propagate: False
+    '': # root logger
+      handlers: [ debug ]
+      # handlers: [fluent-async-console, fluent-async-debug, fluent-async-elastic]
+      level: DEBUG
+      propagate: False
diff --git a/ondewo/logging/decorators.py b/ondewo/logging/decorators.py
index 2c98165..eb32c81 100644
--- a/ondewo/logging/decorators.py
+++ b/ondewo/logging/decorators.py
@@ -11,8 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import functools
+import inspect
+import json
 import time
 import traceback
 import uuid
@@ -22,6 +23,8 @@
     dataclass,
     field,
 )
+from functools import wraps
+from inspect import iscoroutinefunction
 from logging import (
     Filter,
     Logger,
@@ -30,6 +33,7 @@
 from typing import (
     Any,
     Callable,
+    Coroutine,
     Dict,
     Optional,
     TypeVar,
@@ -352,3 +356,308 @@ def __enter__(self) -> None:
     def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
         """Remove the filter from the logger when leaving the context."""
         self.logger.removeFilter(self.filter)
+
+
+def async_log_exception(logger: Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]):
+    """
+    An asynchronous decorator to log exceptions raised by an async function.
+
+    Args:
+        logger: An async function to handle the log record.
+    """
+
+    def decorator(func: Callable[..., Coroutine[Any, Any, Any]]):
+        @wraps(func)
+        async def wrapper(*args: Any, **kwargs: Any) -> Any:
+            try:
+                return await func(*args, **kwargs)
+            except Exception as e:
+                exc_type = type(e)
+                exc_val = e
+                tb = traceback.extract_tb(e.__traceback__)
+                traceback_str = "".join(traceback.format_list(tb))
+                function_name = func.__name__
+                log_data = {
+                    "tags": ["exception"],
+                    "message": str(exc_val),
+                    "traceback": traceback_str,
+                    "function": function_name,
+                    "exception_type": exc_type.__name__,
+                    "log_type": "exception",
+                }
+                await logger(log_data)
+                raise  # Re-raise the exception after logging
+
+        return wrapper
+
+    return decorator
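+
+
+# Illustrative usage (mirrors tests/test_decorators.py):
+#     @async_log_exception(my_async_logger)   # my_async_logger: async (Dict) -> None
+#     async def handler() -> None: ...
+# The record is passed to my_async_logger, then the exception is re-raised.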
+ """ + + def decorator(func: Callable[..., Coroutine[Any, Any, Any]]): + @wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return await func(*args, **kwargs) + except Exception as e: + exc_type = type(e) + exc_val = e + tb = traceback.extract_tb(e.__traceback__) + traceback_str = "".join(traceback.format_list(tb)) + function_name = func.__name__ + log_data = { + "tags": ["exception"], + "message": str(exc_val), + "traceback": traceback_str, + "function": function_name, + "exception_type": exc_type.__name__, + "log_type": "exception", + } + await logger(log_data) + raise # Re-raise the exception after logging + + return wrapper + + return decorator + + +def async_log_args_kwargs_results( + logger: Callable[[Dict[str, Any]], Coroutine[Any, Any, None]], + argument_max_length: int = 50, +): + """ + An asynchronous decorator to log arguments, keyword arguments, and the result + of an async function. + + Args: + logger: An async function that accepts a dictionary containing + the log information. + argument_max_length: The maximum length to represent arguments as strings. + """ + + def decorator(func: Callable[..., Coroutine[Any, Any, Any]]): + @wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> Any: + signature = inspect.signature(func) + bound_arguments = signature.bind(*args, **kwargs) + bound_arguments.apply_defaults() + + formatted_args = { + name: repr(value)[:argument_max_length] + for name, value in bound_arguments.arguments.items() + } + formatted_kwargs = { + name: repr(value)[:argument_max_length] for name, value in kwargs.items() + } + + log_data_before = { + "message": f"Function called: {func.__name__} with args={json.dumps(formatted_args)}, kwargs={json.dumps(formatted_kwargs)}", + "function": func.__name__, + "args": formatted_args, + "kwargs": formatted_kwargs, + "log_type": "call", + } + await logger(log_data_before) + + try: + result = await func(*args, **kwargs) + formatted_result = repr(result)[:argument_max_length] + log_data_after = { + "message": f"Function returned: {func.__name__} -> {formatted_result}", + "function": func.__name__, + "result": formatted_result, + "log_type": "return", + } + await logger(log_data_after) + return result + except Exception as e: + exc_type = type(e) + exc_val = str(e) + log_data_exception = { + "message": f"Exception in {func.__name__}: {exc_type.__name__} - {exc_val}", + "function": func.__name__, + "exception_type": exc_type.__name__, + "exception": exc_val, + "log_type": "exception", + } + await logger(log_data_exception) + raise # Re-raise the exception + + return wrapper + + return decorator + + +def async_exception_handling( + func: Callable[..., Coroutine[Any, Any, None]] +) -> Callable[..., Coroutine[Any, Any, None]]: + + def sync_wrapper(*args: Any, **kwargs: Any) -> Coroutine[Any, Any, Any]: + async def async_wrapper(*inner_args: Any, **inner_kwargs: Any) -> Any: + try: + return await func(*inner_args, **inner_kwargs) + except Exception as e: + # In a real scenario, you would likely use a proper logging mechanism + print(f"Caught exception in {func.__name__}: {e}") + # Here, we'll simulate logging by returning a dictionary + return {"tags": ["exception"], "message": str(e)} + + return async_wrapper(*args, **kwargs) + + return sync_wrapper + + +def async_log_arguments(logger_func: Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]): + """ + An asynchronous decorator to log the arguments of an async function. 
+
+
+def async_exception_handling(
+    func: Callable[..., Coroutine[Any, Any, None]]
+) -> Callable[..., Coroutine[Any, Any, None]]:
+
+    def sync_wrapper(*args: Any, **kwargs: Any) -> Coroutine[Any, Any, Any]:
+        async def async_wrapper(*inner_args: Any, **inner_kwargs: Any) -> Any:
+            try:
+                return await func(*inner_args, **inner_kwargs)
+            except Exception as e:
+                # In a real scenario, you would likely use a proper logging mechanism
+                print(f"Caught exception in {func.__name__}: {e}")
+                # Here, we'll simulate logging by returning a dictionary
+                return {"tags": ["exception"], "message": str(e)}
+
+        return async_wrapper(*args, **kwargs)
+
+    return sync_wrapper
+
+
+def async_log_arguments(logger_func: Callable[[Dict[str, Any]], Coroutine[Any, Any, None]]):
+    """
+    An asynchronous decorator to log the arguments of an async function.
+
+    Args:
+        logger_func: An async function that accepts a dictionary containing
+            the log information.
+    """
+
+    def decorator(func: Callable[..., Coroutine[Any, Any, Any]]):
+        @wraps(func)
+        async def wrapper(*args: Any, **kwargs: Any) -> Any:
+            signature = inspect.signature(func)
+            bound_arguments = signature.bind(*args, **kwargs)
+            bound_arguments.apply_defaults()
+            arguments_log = {name: str(value) for name, value in bound_arguments.arguments.items()}
+            log_data = {
+                "message": f"Function arguments: {json.dumps({'function': func.__name__, 'args': arguments_log, 'kwargs': kwargs})}",
+                "function": func.__name__,
+                "args": arguments_log,
+                "kwargs": kwargs,
+                "log_type": "arguments",
+            }
+            await logger_func(log_data)
+            return await func(*args, **kwargs)
+
+        return wrapper
+
+    return decorator
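+
+
+# AsyncTimer usage sketch (see also the example in ondewo/logging/async_logger.py):
+#     @AsyncTimer(logger=logger_console.debug, log_arguments=False, message="Elapsed: {:0.3f}")
+#     async def work() -> None: ...
+# or as an async context manager:
+#     async with AsyncTimer(name="block", logger=some_async_or_sync_logger):
+#         await work()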
{exc}" + + if self.log_arguments: + arg_reprs = [repr(arg)[:self.argument_max_length] for arg in args] + kwarg_reprs = {k: repr(v)[:self.argument_max_length] for k, v in kwargs.items()} + await self._log_async( + { + "message": f"Function {function_name} executed.", + "args": arg_reprs, + "kwargs": kwarg_reprs, + "result": repr(value)[:self.argument_max_length] + } + ) + + elapsed_time = time.perf_counter() - self._start_times.pop(thread_id, time.perf_counter()) + await self.report(elapsed_time=elapsed_time, func_name=function_name, thread_id=thread_id) + return value + + return wrapper + + async def __aenter__(self) -> "AsyncTimer": + self._context_start_time = time.perf_counter() + thread_id: int = get_ident() + await self._log_async({"message": START.format(self.name, thread_id)}) + return self + + async def __aexit__( + self, exc_type: Optional[type[BaseException]], exc_val: Optional[BaseException], + exc_tb: Optional[traceback.TracebackException] + ) -> None: + elapsed_time = time.perf_counter() - self._context_start_time + thread_id: int = get_ident() + await self.report(elapsed_time=elapsed_time, func_name=self.name, thread_id=thread_id) + if exc_type and not self.suppress_exceptions: + raise + + async def start( + self, + func: Optional[Callable[..., Any]] = None, + instance: Optional[Any] = None, + args: Optional[Any] = None, + kwargs: Optional[Any] = None + ) -> None: + """Start a new timer""" + thread_id: int = get_ident() + function_name = self._get_function_name(func) + + await self._log_async({"message": START.format(function_name, thread_id)}) + + if thread_id in self._start_times: + if self.recursive: + self.recurse_depths[thread_id] += 1 + await self._log_async({"message": f"Recursing, depth = {self.recurse_depths[thread_id]}"}) + return + else: + self._start_times[thread_id] = time.perf_counter() + + async def stop(self, func: Optional[Callable[..., Any]] = None) -> float: + """Stop the timer, and report the elapsed time""" + thread_id: int = get_ident() + + if self.recurse_depths[thread_id]: + self.recurse_depths[thread_id] -= 1 + if self.recursive: + return 0.0 + + # Calculate elapsed time + elapsed_time = time.perf_counter() - self._start_times.pop(thread_id, time.perf_counter()) + + # Report elapsed time + func_name = self._get_function_name(func) + await self.report(elapsed_time=elapsed_time, func_name=func_name, thread_id=thread_id) + + return elapsed_time + + async def report( + self, + elapsed_time: float, + func_name: Optional[str] = None, + thread_id: Optional[int] = None, + ) -> None: + name: str = func_name or CONTEXT + + message = self.message.format(elapsed_time, name, thread_id) + log: Dict[str, Any] = { + "message": message, + "duration": elapsed_time, + "tags": ["timing"], + } + await self._log_async(log) + + async def _log_async(self, log: Dict[str, Any]) -> None: + """Helper function to log synchronously or asynchronously""" + if iscoroutinefunction(self.logger): + await self.logger(log) # Async logger + else: + self.logger(log) # type:ignore # Sync logger + + def _get_function_name(self, func: Optional[Callable[..., Any]]) -> str: + """Helper function to get the function name safely""" + if func is None: + return "UNKNOWN_FUNCTION_NAME" + return getattr(func, '__name__', str(func)) diff --git a/ondewo/logging/logger.py b/ondewo/logging/logger.py index de0024b..0ef1240 100644 --- a/ondewo/logging/logger.py +++ b/ondewo/logging/logger.py @@ -113,23 +113,28 @@ def grpc(self, message_dict: Dict[str, Any], *args, **kwargs) -> None: # type: ) -def 
diff --git a/ondewo/logging/logger.py b/ondewo/logging/logger.py
index de0024b..0ef1240 100644
--- a/ondewo/logging/logger.py
+++ b/ondewo/logging/logger.py
@@ -113,23 +113,28 @@ def grpc(self, message_dict: Dict[str, Any], *args, **kwargs) -> None:  # type:
     )
 
 
-def import_config() -> Dict[str, Any]:
+def import_config(use_packaged: bool = False) -> Dict[str, Any]:
     """
     Imports the config from the yaml file.  The yaml file is taken relative to this file, so nothing
     about the python path is assumed.
 
     :param:
     :return: logging config as a dictionary
     """
-    if os.path.exists("/home/ondewo/logging.yaml"):
-        config_path: str = "/home/ondewo/logging.yaml"
+    config_path: str
+    ondewo_logging_config_file: str = os.getenv("ONDEWO_LOGGING_CONFIG_FILE", "")
+    if ondewo_logging_config_file and os.path.exists(ondewo_logging_config_file) and not use_packaged:
+        config_path = ondewo_logging_config_file
+    elif os.path.exists("/home/ondewo/logging.yaml") and not use_packaged:
+        config_path = "/home/ondewo/logging.yaml"
     else:
         parent: str = os.path.abspath(os.path.dirname(file_anchor.__file__))
         config_path = f"{parent}/config/logging.yaml"
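+    # Precedence: the ONDEWO_LOGGING_CONFIG_FILE env var first, then
+    # /home/ondewo/logging.yaml, then the config packaged with this module;
+    # use_packaged=True forces the packaged default.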
+
+    conf: Dict[str, Any]
     with open(config_path) as fd:
         conf = yaml.safe_load(fd)
-    return conf  # type: ignore
+    return conf
 
 
 def set_module_name(
@@ -145,13 +150,15 @@ def set_module_name(
     :param conf: the config of the logger
     :return: the config with module name
     """
-    conf["logging"]["formatters"]["fluent_debug"]["format"]["module_name"] = module_name
-    conf["logging"]["formatters"]["fluent_debug"]["format"]["git_repo_name"] = git_repo_name
-    conf["logging"]["formatters"]["fluent_debug"]["format"]["docker_image_name"] = docker_image_name
+    if "fluent_debug" in conf["logging"]["formatters"]:
+        conf["logging"]["formatters"]["fluent_debug"]["format"]["module_name"] = module_name
+        conf["logging"]["formatters"]["fluent_debug"]["format"]["git_repo_name"] = git_repo_name
+        conf["logging"]["formatters"]["fluent_debug"]["format"]["docker_image_name"] = docker_image_name
 
-    conf["logging"]["formatters"]["fluent_console"]["format"]["module_name"] = module_name
-    conf["logging"]["formatters"]["fluent_console"]["format"]["git_repo_name"] = git_repo_name
-    conf["logging"]["formatters"]["fluent_console"]["format"]["docker_image_name"] = docker_image_name
+    if "fluent_console" in conf["logging"]["formatters"]:
+        conf["logging"]["formatters"]["fluent_console"]["format"]["module_name"] = module_name
+        conf["logging"]["formatters"]["fluent_console"]["format"]["git_repo_name"] = git_repo_name
+        conf["logging"]["formatters"]["fluent_console"]["format"]["docker_image_name"] = docker_image_name
 
     return conf
 
@@ -198,17 +205,10 @@ def initiate_loggers(conf: Dict[str, Any]) -> Tuple[logging.Logger, ...]:
 
 
 def check_python_version(logger: logging.Logger) -> None:
-    """
-    Checks the python version. The python3 logger cant be imported to python 2.
-
-    :param:
-    :return:
-    """
+    """Checks the python version. The python3 logger can't be imported into python 2."""
     if sys.version_info[0] == 2:
         # this wont actually run because of syntax errors, but functions as a sort of documentation
-        logger.error(
-            "Looks like you imported the Python3 logger in a Python2 project. Did you mean to do that?"
-        )
+        logger.error("Looks like you imported the Python3 logger in a Python2 project. Did you mean to do that?")
 
 
 def create_logs(conf: Optional[Dict[str, Any]] = None) -> Tuple[logging.Logger, ...]:
diff --git a/ondewo/logging/version.py b/ondewo/logging/version.py
index f631007..01bd03c 100644
--- a/ondewo/logging/version.py
+++ b/ondewo/logging/version.py
@@ -1 +1 @@
-__version__ = '3.4.0'
+__version__ = '3.5.0'
diff --git a/pytest.ini b/pytest.ini
new file mode 100644
index 0000000..0102b0a
--- /dev/null
+++ b/pytest.ini
@@ -0,0 +1,2 @@
+[pytest]
+asyncio_default_fixture_loop_scope = function
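+#
+# Set explicitly to silence the pytest-asyncio deprecation warning about an
+# unset default fixture loop scope; "function" matches the fixtures in tests/.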
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 33d9561..1ba7f40 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -3,6 +3,7 @@
 coverage>=6.4
 jupyter
 pre-commit
+pytest-asyncio
 pytest-cov>=5.0.0
 pytest-timeout>=2.3.1
 pytest-xdist>=3.6.1
diff --git a/requirements-static-code-checks.txt b/requirements-static-code-checks.txt
index e9fe72f..b607421 100644
--- a/requirements-static-code-checks.txt
+++ b/requirements-static-code-checks.txt
@@ -9,3 +9,4 @@ types-mock
 types-setuptools
 types-toml
 types-PyYAML
+types-aiofiles
diff --git a/requirements.txt b/requirements.txt
index be79d54..ae52df4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
-fluent-logger>=0.10.0
+aiofiles
+fluent-logger>=0.11.1
 msgpack
 python_dotenv>=0.10.1
 PyYAML>=5.3.1
diff --git a/setup.py b/setup.py
index 2c382cb..3dabe04 100644
--- a/setup.py
+++ b/setup.py
@@ -41,11 +41,11 @@ def read_requirements(file_path: str, encoding: str = 'utf-8') -> List[str]:
         'Intended Audience :: Developers',
         'Topic :: Software Development :: Libraries',
         'License :: OSI Approved :: Apache Software License',
-        'Programming Language :: Python :: 3.8',
         'Programming Language :: Python :: 3.9',
         'Programming Language :: Python :: 3.10',
         'Programming Language :: Python :: 3.11',
         'Programming Language :: Python :: 3.12',
+        'Programming Language :: Python :: 3.13',
         'Operating System :: OS Independent',
     ],
     python_requires='>=3.8',
diff --git a/tests/test_async_logger.py b/tests/test_async_logger.py
new file mode 100644
index 0000000..2e0370b
--- /dev/null
+++ b/tests/test_async_logger.py
@@ -0,0 +1,145 @@
+import logging
+import os
+import sys
+from pathlib import Path
+from typing import Any, Dict
+from unittest.mock import patch
+
+import pytest
+import yaml
+
+from ondewo.logging.async_logger import (AsyncCustomLogger,
+                                         check_python_version, create_logs,
+                                         import_config, initiate_loggers,
+                                         set_module_name)
+
+
+@pytest.mark.asyncio
+async def test_import_config_exists_at_root(tmp_path: Path) -> None:
+    """Tests if the config is loaded correctly when 'logging.yaml' exists at the root."""
+    config_data: Dict[str, Any] = {"level": "INFO", "format": "%(asctime)s - %(levelname)s - %(message)s"}
+    config_file: Path = tmp_path / "logging.yaml"
+    with open(config_file, "w") as f:
+        yaml.dump(config_data, f)
+
+    # Change the current working directory to tmp_path for the test
+    os.chdir(tmp_path)
+    try:
+        loaded_config: Dict[str, Any] = await import_config()
+        assert loaded_config == config_data
+    finally:
+        # Change back to the original working directory
+        os.chdir(Path(__file__).parent)
+
+
+@pytest.mark.asyncio
+async def test_import_config_exists_in_subdir(tmp_path: Path) -> None:
+    """Tests if the config is loaded correctly when 'logging.yaml' exists in the 'config' subdirectory."""
+    config_data: Dict[str, Any] = {"handlers": {"console": {"level": "DEBUG"}}}
+    config_dir: Path = tmp_path / "config"
+    config_dir.mkdir()
+    config_file: Path = config_dir / "logging.yaml"
+    with open(config_file, "w") as f:
+        yaml.dump(config_data, f)
+
+    # Patch __file__ to simulate the module being in tmp_path
+    with patch("ondewo.logging.async_logger.__file__", str(tmp_path / "your_module.py")):
+        loaded_config: Dict[str, Any] = await import_config()
+        assert loaded_config == config_data
+
+
+@pytest.mark.asyncio
+async def test_import_config_not_found(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None:
+    """Tests the behavior when the 'logging.yaml' file is not found."""
+    # Patch __file__ to simulate the module being in tmp_path
+    with patch("ondewo.logging.async_logger.__file__", str(tmp_path / "your_module.py")):
+        loaded_config: Dict[str, Any] = await import_config()
+        assert loaded_config == {}
+        captured = capsys.readouterr()
+        assert f"Config file not found: {tmp_path / 'config' / 'logging.yaml'}" in captured.out
+
+
+@pytest.mark.asyncio
+async def test_import_config_empty_file_root(tmp_path: Path) -> None:
+    """Tests the behavior when 'logging.yaml' at the root is empty."""
+    config_file: Path = tmp_path / "logging.yaml"
+    config_file.write_text("")
+    os.chdir(tmp_path)
+    try:
+        loaded_config: Dict[str, Any] = await import_config()
+        assert loaded_config == {}
+    finally:
+        os.chdir(Path(__file__).parent)
+
+
+@pytest.mark.asyncio
+async def test_import_config_empty_file_subdir(tmp_path: Path) -> None:
+    """Tests the behavior when 'logging.yaml' in the subdirectory is empty."""
+    config_dir: Path = tmp_path / "config"
+    config_dir.mkdir()
+    config_file: Path = config_dir / "logging.yaml"
+    config_file.write_text("")
+    with patch("ondewo.logging.async_logger.__file__", str(tmp_path / "your_module.py")):
+        loaded_config: Dict[str, Any] = await import_config()
+        assert loaded_config == {}
+
+
+@pytest.mark.asyncio
+async def test_import_config_invalid_yaml_root(tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None:
+    """Tests the behavior when 'logging.yaml' at the root contains invalid YAML."""
+    invalid_yaml: str = "this is not valid yaml: -"
+    config_file: Path = tmp_path / "logging.yaml"
+    config_file.write_text(invalid_yaml)
+    os.chdir(tmp_path)
+    try:
+        loaded_config: Dict[str, Any] = await import_config()
+        assert loaded_config == {}
+        captured = capsys.readouterr()
+        assert "Error parsing YAML file" in captured.out
+    finally:
+        os.chdir(Path(__file__).parent)
+
+
+@pytest.mark.asyncio
+async def test_set_module_name():
+    conf = {"logging": {"formatters": {"fluent_debug": {"format": {}}, "fluent_console": {"format": {}}}}}
+    module_name = "test_module"
+    git_repo_name = "test_repo"
+    docker_image_name = "test_image"
+
+    updated_conf = await set_module_name(module_name, git_repo_name, docker_image_name, conf)
+    assert updated_conf["logging"]["formatters"]["fluent_debug"]["format"]["module_name"] == module_name
+    assert updated_conf["logging"]["formatters"]["fluent_console"]["format"]["git_repo_name"] == git_repo_name
+
+
+@pytest.mark.asyncio
+async def test_initiate_loggers():
+    conf = {"logging": {"version": 1, "loggers": {"root": {}, "console": {}, "debug": {}}}}
+    logger_root, logger_console, logger_debug = await initiate_loggers(conf)
+    assert isinstance(logger_root, logging.Logger)
+    assert isinstance(logger_console, logging.Logger)
+    assert isinstance(logger_debug, logging.Logger)
+
+
+@pytest.mark.asyncio
+async def test_check_python_version(caplog):
+    logger = logging.getLogger("test_logger")
+    await check_python_version(logger)
+    if sys.version_info[0] == 2:
+        assert "Python 2 is not supported" in caplog.text
+
+
+@pytest.mark.asyncio
+async def test_create_logs():
+    logger_root, logger_console, logger_debug = await create_logs()
+    assert isinstance(logger_root, logging.Logger)
+    assert isinstance(logger_console, logging.Logger)
+    assert isinstance(logger_debug, logging.Logger)
+
+
+@pytest.mark.asyncio
+async def test_async_custom_logger_grpc():
+    logger = AsyncCustomLogger("test_logger")
+    message_dict = {"message": "Test GRPC message"}
+    await logger.grpc(message_dict)
+    assert logger.isEnabledFor(AsyncCustomLogger.GRPC_LEVEL_NUM)
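+
+# NOTE: the final assertion appears to rely on test_create_logs() having set
+# the root logger to DEBUG earlier in this file; a freshly created Logger
+# defaults to WARNING, under which GRPC (level 25) would not be enabled.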
+
+
+@pytest.mark.asyncio
+async def test_async_function_without_exception() -> None:
+    log_store: List[Dict[str, Any]] = []
+
+    async def mock_logger(log: Dict[str, Any]) -> None:
+        log_store.append(log)
+
+    @async_exception_handling  # type: ignore
+    async def async_function_without_exception() -> str:
+        return "Function completed successfully"
+
+    result = await async_function_without_exception()  # type: ignore
+    assert result == "Function completed successfully"
+    assert not log_store
+
+
+@pytest.mark.asyncio
+async def test_async_function_without_exception2() -> None:
+    log_store: List[Dict[str, Any]] = []
+
+    async def mock_logger(log: Dict[str, Any]) -> None:
+        log_store.append(log)
+
+    @async_exception_handling  # type: ignore
+    async def async_function_without_exception() -> str:
+        return "Function completed successfully"
+
+    result = await async_function_without_exception()  # type: ignore
+    assert result == "Function completed successfully"
+    assert len(log_store) == 0
+
+
+@pytest.mark.asyncio
+async def test_async_log_arguments() -> None:
+    log_store: List[Dict[str, Any]] = []
+
+    async def mock_logger(log: Dict[str, Any]) -> None:
+        log_store.append(log)
+
+    @async_log_arguments(mock_logger)
+    async def async_function_with_args(a: int, b: int) -> int:
+        return a + b
+
+    result = await async_function_with_args(3, 5)
+    assert result == 8
+    assert len(log_store) == 1
+    assert log_store[0]["log_type"] == "arguments"
+    assert "Function arguments" in log_store[0]["message"]
+    logged_data = json.loads(log_store[0]["message"].split(": ", 1)[1])
+    assert logged_data["function"] == "async_function_with_args"
+    assert logged_data["args"] == {"a": "3", "b": "5"}
+    assert logged_data["kwargs"] == {}
+
+
+@pytest.mark.asyncio
+async def test_async_log_arguments_with_kwargs() -> None:
+    log_store: List[Dict[str, Any]] = []
+
+    async def mock_logger(log: Dict[str, Any]) -> None:
+        log_store.append(log)
+
+    @async_log_arguments(mock_logger)
+    async def async_function_with_args_kwargs(x: int, y: str = "default") -> str:
+        return f"{x}: {y}"
+
+    result = await async_function_with_args_kwargs(10, y="custom")
+    assert result == "10: custom"
+    assert len(log_store) == 1
+    assert log_store[0]["log_type"] == "arguments"
+    logged_data = json.loads(log_store[0]["message"].split(": ", 1)[1])
+    assert logged_data["args"] == {"x": "10", "y": "custom"}
+    assert logged_data["kwargs"] == {"y": "custom"}
+
+
+@pytest.mark.asyncio
+async def test_async_log_arguments_no_args() -> None:
+    log_store: List[Dict[str, Any]] = []
+
+    async def mock_logger(log: Dict[str, Any]) -> None:
+        log_store.append(log)
+
+    @async_log_arguments(mock_logger)
+    async def async_function_without_args() -> str:
+        return "No arguments here"
+
+    result = await async_function_without_args()
+    assert result == "No arguments here"
+    assert len(log_store) == 1
+    assert log_store[0]["log_type"] == "arguments"
+    logged_data = json.loads(log_store[0]["message"].split(": ", 1)[1])
+    assert logged_data["args"] == {}
+    assert logged_data["kwargs"] == {}
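+
+
+# ---------------------------------------------------------------------------
+# Editorial sketch, not part of the shipped test suite: the record shape the
+# three tests above pin down for @async_log_arguments(logger) -- exactly one
+# record per call, log_type "arguments", and a message of the form
+# "Function arguments: <json>" whose payload carries the function name plus
+# stringified args/kwargs. _collect is an invented helper name.
+async def _demo_log_arguments_payload() -> Dict[str, Any]:
+    records: List[Dict[str, Any]] = []
+
+    async def _collect(record: Dict[str, Any]) -> None:
+        records.append(record)
+
+    @async_log_arguments(_collect)
+    async def add(a: int, b: int) -> int:
+        return a + b
+
+    await add(1, 2)
+    # Recover the JSON payload exactly the way the tests above do.
+    return json.loads(records[0]["message"].split(": ", 1)[1])
+# ---------------------------------------------------------------------------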
assert "ValueError" == log_store[0]["exception_type"] + assert "failing_function" == log_store[0]["function"] + assert "log_type" in log_store[0] and log_store[0]["log_type"] == "exception" + + +@pytest.mark.asyncio +async def test_async_log_exception_decorator_no_exception() -> None: + log_store: List[Dict[str, Any]] = [] + + async def mock_logger(log: Dict[str, Any]) -> None: + log_store.append(log) + + @async_log_exception(mock_logger) + async def successful_function(): + return "Success!" + + result = await successful_function() + assert result == "Success!" + assert len(log_store) == 0 # No exception, so no log + + +@pytest.mark.asyncio +async def test_async_log_args_kwargs_results() -> None: + log_store = [] + + async def mock_logger(log: Dict[str, Any]) -> None: + log_store.append(log) + + @async_log_args_kwargs_results(logger=mock_logger) + async def async_function(a: int, b: int) -> int: + return a + b + + result = await async_function(3, 5) + assert result == 8 + assert len(log_store) == 2 # Log entry for call and return + assert log_store[0]["log_type"] == "call" + assert log_store[0]["function"] == "async_function" + assert log_store[0]["args"] == {"a": "3", "b": "5"} + assert log_store[0]["kwargs"] == {} + + assert log_store[1]["log_type"] == "return" + assert log_store[1]["function"] == "async_function" + assert log_store[1]["result"] == "8" + + +@pytest.mark.asyncio +async def test_async_log_args_kwargs_results_with_kwargs2() -> None: + log_store = [] + + async def mock_logger(log: Dict[str, Any]) -> None: + log_store.append(log) + + @async_log_args_kwargs_results(logger=mock_logger, argument_max_length=5) + async def async_function_with_kwargs(x: int, name: str = "default_name") -> str: + return f"{name}: {x}" + + result = await async_function_with_kwargs(10, name="test_long_name") + assert result == "test_long_name: 10" + assert len(log_store) == 2 + assert log_store[0]["log_type"] == "call" + assert log_store[0]["args"] == {"x": "10", "name": repr("test_long_name")[:5]} + assert log_store[0]["kwargs"] == {"name": repr("test_long_name")[:5]} + assert log_store[1]["log_type"] == "return" + assert log_store[1]["result"] == repr("test_long_name: 10")[:5] + + +@pytest.mark.asyncio +async def test_async_log_args_kwargs_results_exception() -> None: + log_store = [] + + async def mock_logger(log: Dict[str, Any]) -> None: + log_store.append(log) + + @async_log_args_kwargs_results(logger=mock_logger) + async def failing_async_function(val: int): + if val < 0: + raise ValueError("Value cannot be negative") + return val * 2 + + with pytest.raises(ValueError, match="Value cannot be negative"): + await failing_async_function(-5) + + assert len(log_store) == 2 # Log for call and exception + assert log_store[0]["log_type"] == "call" + assert log_store[0]["function"] == "failing_async_function" + assert log_store[0]["args"] == {"val": "-5"} + assert log_store[1]["log_type"] == "exception" + assert log_store[1]["function"] == "failing_async_function" + assert log_store[1]["exception_type"] == "ValueError" + assert log_store[1]["exception"] == "Value cannot be negative" + + +@pytest.mark.asyncio +async def test_async_log_simple() -> None: + """Tests the AsyncTimer decorator with a simple asynchronous function.""" + + @AsyncTimer(logger=log.debug, log_arguments=True, message="Test Elapsed: {:0.5f}") + async def my_test_function(val: int) -> int: + await asyncio.sleep(0.01) # Simulate some work + print(f"Test value: {val}") + return val * 2 + + result = await my_test_function(5) + assert 
+    assert result == 10
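+
+
+# ---------------------------------------------------------------------------
+# Editorial sketch, not part of the shipped test suite: per test_async_timer
+# and test_async_log_simple, AsyncTimer works either pre-instantiated with an
+# async logger callable or inline with log_arguments/message, where the
+# "{:0.5f}" placeholder receives the elapsed seconds. The coroutine below is
+# an invented example of timing a piece of async work.
+@AsyncTimer(logger=log.debug, log_arguments=True, message="Fetched in {:0.5f}s")
+async def _demo_timed_fetch(url: str) -> str:
+    await asyncio.sleep(0.01)  # stand-in for real I/O
+    return f"payload from {url}"
+# ---------------------------------------------------------------------------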