From e0624da7710227263ba1e23561aed84ec7dac7d7 Mon Sep 17 00:00:00 2001 From: PRADDZY <64481960+PRADDZY@users.noreply.github.com> Date: Wed, 20 May 2026 21:31:22 +0530 Subject: [PATCH 1/2] fix kafka avro sample decode for pre-deserialized values (cherry picked from commit 288ea29b9fe896f0b101be6347625a116686e1a9) --- .../source/messaging/common_broker_source.py | 11 +++- .../messaging/test_common_broker_source.py | 58 +++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 ingestion/tests/unit/source/messaging/test_common_broker_source.py diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index 46e3af7ca964..beff75b1e259 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -16,7 +16,7 @@ import concurrent.futures import traceback from abc import ABC -from typing import Iterable, Optional +from typing import Any, Iterable, Optional import confluent_kafka from confluent_kafka import KafkaError, KafkaException @@ -327,8 +327,11 @@ def yield_topic_sample_data( ) ) - def decode_message(self, record: bytes, schema: str, schema_type: SchemaType): + def decode_message(self, record: Any, schema: str, schema_type: SchemaType): if schema_type == SchemaType.Avro: + # DeserializingConsumer may already return decoded dict/object values. + if not isinstance(record, (bytes, bytearray, memoryview)): + return str(record) deserializer = AvroDeserializer( schema_str=schema, schema_registry_client=self.schema_registry_client ) @@ -336,7 +339,9 @@ def decode_message(self, record: bytes, schema: str, schema_type: SchemaType): if schema_type == SchemaType.Protobuf: logger.debug("Protobuf deserializing sample data is not supported") return "" - return str(record.decode("utf-8")) + if isinstance(record, (bytes, bytearray, memoryview)): + return bytes(record).decode("utf-8") + return str(record) def close(self): if self.generate_sample_data and self.consumer_client: diff --git a/ingestion/tests/unit/source/messaging/test_common_broker_source.py b/ingestion/tests/unit/source/messaging/test_common_broker_source.py new file mode 100644 index 000000000000..271dbd196177 --- /dev/null +++ b/ingestion/tests/unit/source/messaging/test_common_broker_source.py @@ -0,0 +1,58 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for common broker message decoding.""" + +from types import SimpleNamespace +from unittest.mock import patch + +from metadata.generated.schema.type.schema import SchemaType +from metadata.ingestion.source.messaging.common_broker_source import CommonBrokerSource + + +def test_decode_message_avro_skips_re_deserialization_for_decoded_values(): + source = SimpleNamespace(schema_registry_client=object()) + decoded_payload = {"event_id": "1", "status": "ok"} + + with patch( + "metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer" + ) as mock_deserializer: + result = CommonBrokerSource.decode_message(source, decoded_payload, "ignored-schema", SchemaType.Avro) + + mock_deserializer.assert_not_called() + assert result == str(decoded_payload) + + +def test_decode_message_avro_deserializes_bytes_payload(): + source = SimpleNamespace(schema_registry_client=object()) + + with patch( + "metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer" + ) as mock_deserializer: + mock_deserializer.return_value.return_value = {"event_id": "2"} + result = CommonBrokerSource.decode_message(source, b"binary-payload", "avro-schema", SchemaType.Avro) + + mock_deserializer.assert_called_once_with( + schema_str="avro-schema", + schema_registry_client=source.schema_registry_client, + ) + mock_deserializer.return_value.assert_called_once_with(b"binary-payload", None) + assert result == str({"event_id": "2"}) + + +def test_decode_message_non_avro_handles_bytes_and_decoded_values(): + source = SimpleNamespace(schema_registry_client=object()) + decoded_payload = {"foo": "bar"} + + bytes_result = CommonBrokerSource.decode_message(source, b"plain-text", "ignored-schema", SchemaType.Other) + decoded_result = CommonBrokerSource.decode_message(source, decoded_payload, "ignored-schema", SchemaType.Other) + + assert bytes_result == "plain-text" + assert decoded_result == str(decoded_payload) From 91fc6b599fa2a1424de443d4418470807df00847 Mon Sep 17 00:00:00 2001 From: PRADDZY <64481960+PRADDZY@users.noreply.github.com> Date: Thu, 21 May 2026 14:53:12 +0530 Subject: [PATCH 2/2] Format broker source Avro decode tests (cherry picked from commit 7d2a77855a90a41bd121782a94c42d3810cad659) --- .../unit/source/messaging/test_common_broker_source.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/ingestion/tests/unit/source/messaging/test_common_broker_source.py b/ingestion/tests/unit/source/messaging/test_common_broker_source.py index 271dbd196177..55e9ab61fdff 100644 --- a/ingestion/tests/unit/source/messaging/test_common_broker_source.py +++ b/ingestion/tests/unit/source/messaging/test_common_broker_source.py @@ -21,9 +21,7 @@ def test_decode_message_avro_skips_re_deserialization_for_decoded_values(): source = SimpleNamespace(schema_registry_client=object()) decoded_payload = {"event_id": "1", "status": "ok"} - with patch( - "metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer" - ) as mock_deserializer: + with patch("metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer") as mock_deserializer: result = CommonBrokerSource.decode_message(source, decoded_payload, "ignored-schema", SchemaType.Avro) mock_deserializer.assert_not_called() @@ -33,9 +31,7 @@ def test_decode_message_avro_skips_re_deserialization_for_decoded_values(): def test_decode_message_avro_deserializes_bytes_payload(): source = SimpleNamespace(schema_registry_client=object()) - with patch( - "metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer" - ) as mock_deserializer: + with patch("metadata.ingestion.source.messaging.common_broker_source.AvroDeserializer") as mock_deserializer: mock_deserializer.return_value.return_value = {"event_id": "2"} result = CommonBrokerSource.decode_message(source, b"binary-payload", "avro-schema", SchemaType.Avro)