diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index 46e3af7ca964..beff75b1e259 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -16,7 +16,7 @@ import concurrent.futures import traceback from abc import ABC -from typing import Iterable, Optional +from typing import Any, Iterable, Optional import confluent_kafka from confluent_kafka import KafkaError, KafkaException @@ -327,8 +327,11 @@ def yield_topic_sample_data( ) ) - def decode_message(self, record: bytes, schema: str, schema_type: SchemaType): + def decode_message(self, record: Any, schema: str, schema_type: SchemaType): if schema_type == SchemaType.Avro: + # DeserializingConsumer may already return decoded dict/object values. + if not isinstance(record, (bytes, bytearray, memoryview)): + return str(record) deserializer = AvroDeserializer( schema_str=schema, schema_registry_client=self.schema_registry_client ) @@ -336,7 +339,9 @@ def decode_message(self, record: bytes, schema: str, schema_type: SchemaType): if schema_type == SchemaType.Protobuf: logger.debug("Protobuf deserializing sample data is not supported") return "" - return str(record.decode("utf-8")) + if isinstance(record, (bytes, bytearray, memoryview)): + return bytes(record).decode("utf-8") + return str(record) def close(self): if self.generate_sample_data and self.consumer_client: diff --git a/ingestion/tests/unit/source/messaging/test_common_broker_source.py b/ingestion/tests/unit/source/messaging/test_common_broker_source.py new file mode 100644 index 000000000000..55e9ab61fdff --- /dev/null +++ b/ingestion/tests/unit/source/messaging/test_common_broker_source.py @@ -0,0 +1,54 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for common broker message decoding.""" + +from types import SimpleNamespace +from unittest.mock import patch + +from metadata.generated.schema.type.schema import SchemaType +from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource + + +def test_decode_message_avro_skips_re_deserialization_for_decoded_values(): + source = SimpleNamespace(schema_registry_client=object()) + decoded_payload = {"event_id": "1", "status": "ok"} + + with patch("metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer") as mock_deserializer: + result = CommonBrokerSource.decode_message(source, decoded_payload, "ignored-schema", SchemaType.Avro) + + mock_deserializer.assert_not_called() + assert result == str(decoded_payload) + + +def test_decode_message_avro_deserializes_bytes_payload(): + source = SimpleNamespace(schema_registry_client=object()) + + with patch("metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer") as mock_deserializer: + mock_deserializer.return_value.return_value = {"event_id": "2"} + result = CommonBrokerSource.decode_message(source, b"binary-payload", "avro-schema", SchemaType.Avro) + + mock_deserializer.assert_called_once_with( + schema_str="avro-schema", + schema_registry_client=source.schema_registry_client, + ) + mock_deserializer.return_value.assert_called_once_with(b"binary-payload", None) + assert result == str({"event_id": "2"}) + + +def test_decode_message_non_avro_handles_bytes_and_decoded_values(): + source = SimpleNamespace(schema_registry_client=object()) + decoded_payload = {"foo": "bar"} + + bytes_result = CommonBrokerSource.decode_message(source, b"plain-text", "ignored-schema", SchemaType.Other) + decoded_result = CommonBrokerSource.decode_message(source, decoded_payload, "ignored-schema", SchemaType.Other) + + assert bytes_result == "plain-text" + assert decoded_result == str(decoded_payload)