diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..1df36fed5 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -342,7 +342,14 @@ def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str: f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances." ) _HAS_LOGGED_FOR_SERIALIZATION_ERROR = True - return json.dumps(serialized_message) + try: + return json.dumps(serialized_message) + except Exception as json_exception: + raise AirbyteTracedException( + internal_message=f"Failed to serialize AirbyteMessage to JSON: `{json_exception}`", + failure_type=FailureType.system_error, + message="A record returned from the API failed to be serialized to JSON.", + ) from json_exception @classmethod def extract_state(cls, args: List[str]) -> Optional[Any]: diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..f4e56f936 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -856,3 +856,22 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( # There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here record_messages = list(filter(lambda message: "RECORD" in message, messages)) assert len(record_messages) == 2 + + +def test_given_non_json_serializable_type_then_raise_traced_exception( + entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock +): + """Test that types which both orjson and json cannot serialize (like complex) raise AirbyteTracedException to prevent data corruption.""" + parsed_args = Namespace( + command="read", config="config_path", state="statepath", catalog="catalogpath" + ) + record = AirbyteMessage( + record=AirbyteRecordMessage(stream="stream", data={"value": complex(1, 2)}, emitted_at=1), + type=Type.RECORD, + ) + mocker.patch.object(MockSource, "read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record]) + + with pytest.raises(AirbyteTracedException, match="failed to be serialized to JSON"): + list(entrypoint.run(parsed_args))