From ea995b9b4674115df92c46f3cb97183c12ba4cdb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 26 Mar 2026 17:25:46 +0000 Subject: [PATCH] fix: improve serialization error messages and add default=str fallback - Replace misleading warning message with clear, guideline-compliant text - Add json.dumps(default=str) fallback to prevent deadlocks when both orjson and json.dumps fail on non-serializable types (e.g. complex) - Catch specific TypeError instead of broad Exception for json fallback - Log technical details at DEBUG level, user-facing messages at WARNING - Add separate flag to log second fallback warning only once Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/16049 Co-Authored-By: bot_apk --- airbyte_cdk/entrypoint.py | 19 ++++++++++++++++--- unit_tests/test_entrypoint.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..c7e2c71cf 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -49,6 +49,7 @@ VALID_URL_SCHEMES = ["https"] CLOUD_DEPLOYMENT_MODE = "cloud" _HAS_LOGGED_FOR_SERIALIZATION_ERROR = False +_HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = False class AirbyteEntrypoint(object): @@ -333,16 +334,28 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str, @staticmethod def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str: global _HAS_LOGGED_FOR_SERIALIZATION_ERROR + global _HAS_LOGGED_FOR_SERIALIZATION_FALLBACK serialized_message = AirbyteMessageSerializer.dump(airbyte_message) try: return orjson.dumps(serialized_message).decode() - except Exception as exception: + except Exception as orjson_error: if not _HAS_LOGGED_FOR_SERIALIZATION_ERROR: logger.warning( - f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances." + "Record serialization fell back to slower method. Sync will continue with reduced performance." ) + logger.debug("orjson serialization error: %s", orjson_error) _HAS_LOGGED_FOR_SERIALIZATION_ERROR = True - return json.dumps(serialized_message) + try: + return json.dumps(serialized_message) + except TypeError as json_error: + if not _HAS_LOGGED_FOR_SERIALIZATION_FALLBACK: + logger.warning( + "Record contains a value that could not be serialized to JSON. " + "The value was converted to a string representation." + ) + logger.debug("json serialization error: %s", json_error) + _HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = True + return json.dumps(serialized_message, default=str) @classmethod def extract_state(cls, args: List[str]) -> Optional[Any]: diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..7fd82f75a 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -838,6 +838,10 @@ def test_handle_record_counts( def test_given_serialization_error_using_orjson_then_fallback_on_json( entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock ): + # Reset global flags to avoid test pollution + entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_ERROR = False + entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = False + parsed_args = Namespace( command="read", config="config_path", state="statepath", catalog="catalogpath" ) @@ -856,3 +860,31 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( # There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here record_messages = list(filter(lambda message: "RECORD" in message, messages)) assert len(record_messages) == 2 + + +def test_given_non_json_serializable_type_then_fallback_with_default_str( + entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock +): + """Test that types which both orjson and json cannot serialize (like complex) are handled via default=str fallback.""" + # Reset global flags to avoid test pollution + entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_ERROR = False + entrypoint_module._HAS_LOGGED_FOR_SERIALIZATION_FALLBACK = False + + parsed_args = Namespace( + command="read", config="config_path", state="statepath", catalog="catalogpath" + ) + record = AirbyteMessage( + record=AirbyteRecordMessage(stream="stream", data={"value": complex(1, 2)}, emitted_at=1), + type=Type.RECORD, + ) + mocker.patch.object(MockSource, "read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record]) + + messages = list(entrypoint.run(parsed_args)) + + record_messages = list(filter(lambda message: "RECORD" in message, messages)) + assert len(record_messages) == 1 + # Verify the complex value was converted to its string representation + parsed_record = orjson.loads(record_messages[0]) + assert parsed_record["record"]["data"]["value"] == "(1+2j)"